Python API
### Logging (continued)
def setup_logging():
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler('ansible_api.log'),
logging.StreamHandler()
]
)
logger = logging.getLogger('ansible_api')
return logger
logger = setup_logging()
def log_playbook_execution(result):
logger.info(f"Playbook execution started")
for event in result.events:
if event['event'] == 'playbook_on_task_start':
logger.info(f"Task started: {event['event_data']['task']}")
elif event['event'] == 'runner_on_failed':
logger.error(f"Task failed: {event['event_data']['task']}")
logger.info(f"Playbook execution completed with status: {result.status}")
Advanced Integration
Parallel Execution
from concurrent.futures import ThreadPoolExecutor
import ansible_runner
def run_parallel_playbooks(playbook_configs):
def execute_playbook(config):
return ansible_runner.run(**config)
with ThreadPoolExecutor(max_workers=5) as executor:
futures = [
executor.submit(execute_playbook, config)
for config in playbook_configs
]
results = []
for future in futures:
try:
result = future.result()
results.append({
'status': result.status,
'rc': result.rc,
'stats': result.stats
})
except Exception as e:
results.append({
'status': 'failed',
'error': str(e)
})
return results
Event Handling
class CustomEventHandler:
def __init__(self):
self.tasks = {}
self.failures = []
def handle_event(self, event):
if event['event'] == 'playbook_on_task_start':
task_name = event['event_data']['task']
self.tasks[task_name] = {
'start_time': time.time(),
'status': 'running'
}
elif event['event'] == 'runner_on_ok':
task_name = event['event_data']['task']
if task_name in self.tasks:
self.tasks[task_name].update({
'end_time': time.time(),
'status': 'completed'
})
elif event['event'] == 'runner_on_failed':
self.failures.append({
'task': event['event_data']['task'],
'host': event['event_data']['host'],
'message': event['event_data']['res'].get('msg', 'No error message')
})
def run_with_custom_handler():
handler = CustomEventHandler()
result = ansible_runner.run(
playbook='site.yml',
inventory='inventory.yml',
event_handler=handler.handle_event
)
return {
'tasks': handler.tasks,
'failures': handler.failures,
'status': result.status
}
Inventory Management
class DynamicInventoryManager:
def __init__(self):
self.loader = DataLoader()
self.inventory = InventoryManager(loader=self.loader)
def add_host(self, hostname, group_name, variables=None):
if not self.inventory.get_group(group_name):
self.inventory.add_group(group_name)
self.inventory.add_host(hostname, group=group_name)
if variables:
for key, value in variables.items():
self.inventory.set_variable(hostname, key, value)
def remove_host(self, hostname):
self.inventory.remove_host(hostname)
def get_host_vars(self, hostname):
host = self.inventory.get_host(hostname)
if host:
return host.get_vars()
return {}
def write_inventory(self, path):
inventory_data = {}
for group in self.inventory.groups.values():
inventory_data[group.name] = {
'hosts': [host.name for host in group.hosts],
'vars': group.vars
}
with open(path, 'w') as f:
yaml.dump(inventory_data, f)
Testing and Debugging
Mock Testing
from unittest.mock import patch, MagicMock
class TestAnsibleRunner:
@patch('ansible_runner.run')
def test_playbook_execution(self, mock_run):
mock_result = MagicMock()
mock_result.status = 'successful'
mock_result.rc = 0
mock_run.return_value = mock_result
result = run_playbook('test.yml', 'inventory.yml')
assert result['status'] == 'successful'
assert result['rc'] == 0
@patch('ansible.inventory.manager.InventoryManager')
def test_inventory_management(self, mock_inventory):
mock_inventory.return_value.get_hosts.return_value = [
MagicMock(name='host1', groups=[], vars={})
]
inventory = get_inventory()
hosts = inventory.get_hosts()
assert len(hosts) == 1
assert hosts[0].name == 'host1'
class AnsibleDebugger:
def __init__(self):
self.event_log = []
self.task_timings = {}
self.host_stats = {}
def event_handler(self, event):
self.event_log.append(event)
if event['event'] == 'playbook_on_task_start':
task = event['event_data']['task']
self.task_timings[task] = time.time()
elif event['event'] == 'runner_on_ok':
task = event['event_data']['task']
host = event['event_data']['host']
if task in self.task_timings:
duration = time.time() - self.task_timings[task]
if host not in self.host_stats:
self.host_stats[host] = []
self.host_stats[host].append({
'task': task,
'duration': duration,
'status': 'ok'
})
def generate_report(self):
return {
'total_events': len(self.event_log),
'task_timings': self.task_timings,
'host_stats': self.host_stats
}
def debug_playbook_execution():
debugger = AnsibleDebugger()
result = ansible_runner.run(
playbook='site.yml',
inventory='inventory.yml',
event_handler=debugger.event_handler,
debug=True
)
report = debugger.generate_report()
print(f"Execution Report:\n{json.dumps(report, indent=2)}")
return result, report
Caching
from functools import lru_cache
import redis
class AnsibleCache:
def __init__(self):
self.redis = redis.Redis(host='localhost', port=6379, db=0)
def set_facts(self, host, facts):
key = f"ansible_facts:{host}"
self.redis.setex(key, 3600, json.dumps(facts))
def get_facts(self, host):
key = f"ansible_facts:{host}"
facts = self.redis.get(key)
return json.loads(facts) if facts else None
@lru_cache(maxsize=128)
def get_host_vars(self, host):
return self.inventory.get_host(host).get_vars()
Resource Management
class ResourceManager:
def __init__(self, max_concurrent=5):
self.semaphore = asyncio.Semaphore(max_concurrent)
self.running_tasks = {}
async def run_playbook(self, name, config):
async with self.semaphore:
task_id = str(uuid.uuid4())
self.running_tasks[task_id] = {
'status': 'running',
'start_time': time.time()
}
try:
result = await asyncio.to_thread(
ansible_runner.run,
**config
)
self.running_tasks[task_id].update({
'status': 'completed',
'end_time': time.time(),
'result': {
'status': result.status,
'rc': result.rc
}
})
return result
except Exception as e:
self.running_tasks[task_id].update({
'status': 'failed',
'error': str(e)
})
raise