Skip to content

Python API

### Logging (continued)
def setup_logging():
    """Configure logging for the API helpers and return their logger.

    Calls ``logging.basicConfig`` with level INFO, a
    ``time - name - level - message`` record format, and two handlers: a
    file handler writing to ``ansible_api.log`` and a default stream handler.

    Returns:
        The logger registered under the name ``ansible_api``.
    """
    record_format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s'

    # Build the handlers up front, file first, then hand both to basicConfig.
    file_handler = logging.FileHandler('ansible_api.log')
    stream_handler = logging.StreamHandler()

    logging.basicConfig(
        handlers=[file_handler, stream_handler],
        format=record_format,
        level=logging.INFO,
    )

    return logging.getLogger('ansible_api')

# Module-level logger used by the helpers below. Because setup_logging() is
# called here, logging is configured as a side effect of loading this module.
logger = setup_logging()

def log_playbook_execution(result):
    """Write a summary of a finished run to the module logger.

    Args:
        result: Object exposing ``events`` (an iterable of dicts, each with
            an ``'event'`` key and, for the two event types handled here, an
            ``'event_data'`` dict containing ``'task'``) and ``status``.

    Task starts are logged at INFO, task failures at ERROR; every other
    event type is ignored. The final line reports ``result.status``.
    """
    logger.info("Playbook execution started")

    for entry in result.events:
        kind = entry['event']

        if kind == 'playbook_on_task_start':
            logger.info(f"Task started: {entry['event_data']['task']}")
            continue

        if kind == 'runner_on_failed':
            logger.error(f"Task failed: {entry['event_data']['task']}")

    logger.info(f"Playbook execution completed with status: {result.status}")

Advanced Integration

Parallel Execution

from concurrent.futures import ThreadPoolExecutor
import ansible_runner

def run_parallel_playbooks(playbook_configs, max_workers=5):
    """Run several ansible-runner invocations concurrently on a thread pool.

    Args:
        playbook_configs: Iterable of dicts; each one is unpacked as the
            keyword arguments of a single ``ansible_runner.run`` call.
        max_workers: Maximum number of worker threads. Defaults to 5, the
            value that used to be hard-coded.

    Returns:
        A list with one summary dict per config, in the same order as
        ``playbook_configs``. A run that completed yields
        ``{'status', 'rc', 'stats'}``; a run that raised yields
        ``{'status': 'failed', 'error': <str(exception)>}``.
    """
    def execute_playbook(config):
        # Unpacking happens inside the worker so that a malformed config is
        # reported as a failed entry instead of aborting the whole batch.
        return ansible_runner.run(**config)

    def summarize(future):
        # Reading the result attributes stays inside the try as well, so one
        # broken run can never prevent the remaining summaries.
        try:
            outcome = future.result()
            return {
                'status': outcome.status,
                'rc': outcome.rc,
                'stats': outcome.stats,
            }
        except Exception as exc:
            return {
                'status': 'failed',
                'error': str(exc),
            }

    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # Submit everything first so the runs overlap, then collect the
        # outcomes in submission order.
        futures = [
            executor.submit(execute_playbook, config)
            for config in playbook_configs
        ]
        return [summarize(future) for future in futures]

Event Handling

class CustomEventHandler:
    """Collect per-task status and failure details from ansible-runner events.

    After a run, ``tasks`` maps task name to a dict with ``'start_time'``,
    ``'status'`` (``'running'``, ``'completed'`` or ``'failed'``) and, once
    the task has finished, ``'end_time'``. ``failures`` is a list of
    ``{'task', 'host', 'message'}`` dicts, one per failed-task event.
    """

    def __init__(self):
        self.tasks = {}
        self.failures = []

    def handle_event(self, event):
        """Update the bookkeeping for a single event.

        Args:
            event: Event dict with an ``'event'`` type and, for the handled
                types, an ``'event_data'`` dict.

        Returns:
            True, always. ansible-runner keeps an event only when its
            ``event_handler`` callback returns a truthy value; returning
            None would drop every event from the run's artifacts.
        """
        kind = event['event']

        if kind == 'playbook_on_task_start':
            task_name = event['event_data']['task']
            self.tasks[task_name] = {
                'start_time': time.time(),
                'status': 'running'
            }

        elif kind == 'runner_on_ok':
            entry = self.tasks.get(event['event_data']['task'])
            # A task that already failed on some host stays 'failed'; a later
            # success on another host must not hide that.
            if entry is not None and entry['status'] != 'failed':
                entry.update({
                    'end_time': time.time(),
                    'status': 'completed'
                })

        elif kind == 'runner_on_failed':
            data = event['event_data']
            self.failures.append({
                'task': data['task'],
                'host': data['host'],
                'message': data['res'].get('msg', 'No error message')
            })
            # Previously a failed task was left at 'running' forever.
            entry = self.tasks.get(data['task'])
            if entry is not None:
                entry.update({
                    'end_time': time.time(),
                    'status': 'failed'
                })

        return True

def run_with_custom_handler(playbook='site.yml', inventory='inventory.yml'):
    """Run a playbook with a CustomEventHandler attached and summarize it.

    Args:
        playbook: Playbook passed to ``ansible_runner.run``. Defaults to
            ``'site.yml'``, the value that used to be hard-coded.
        inventory: Inventory passed to ``ansible_runner.run``. Defaults to
            ``'inventory.yml'``, the value that used to be hard-coded.

    Returns:
        Dict with the handler's ``tasks`` and ``failures`` collections and
        the run's ``status``.
    """
    handler = CustomEventHandler()

    result = ansible_runner.run(
        playbook=playbook,
        inventory=inventory,
        event_handler=handler.handle_event
    )

    return {
        'tasks': handler.tasks,
        'failures': handler.failures,
        'status': result.status
    }

Inventory Management

class DynamicInventoryManager:
    """Build and edit an in-memory Ansible inventory and dump it to YAML."""

    def __init__(self):
        self.loader = DataLoader()
        self.inventory = InventoryManager(loader=self.loader)

    def add_host(self, hostname, group_name, variables=None):
        """Add ``hostname`` to ``group_name``, creating the group if needed.

        Args:
            hostname: Name of the host to add.
            group_name: Group the host is placed in.
            variables: Optional mapping of host variables to set.
        """
        # ``groups`` is the name -> group mapping (write_inventory iterates
        # the same mapping). NOTE(review): InventoryManager does not appear
        # to offer get_group(); confirm against the installed ansible-core.
        if group_name not in self.inventory.groups:
            self.inventory.add_group(group_name)

        self.inventory.add_host(hostname, group=group_name)

        if variables:
            # NOTE(review): InventoryManager does not appear to offer
            # set_variable() either, so variables are set on the host object
            # itself - confirm against the installed ansible-core.
            host = self.inventory.get_host(hostname)
            for key, value in variables.items():
                host.set_variable(key, value)

    def remove_host(self, hostname):
        """Remove ``hostname`` from the inventory; unknown hosts are ignored."""
        host = self.inventory.get_host(hostname)
        if host is None:
            return

        # NOTE(review): removal seems to live only on the InventoryData
        # object wrapped in the private ``_inventory`` attribute, and it
        # takes the host object rather than its name - confirm against the
        # installed ansible-core before relying on this private access.
        self.inventory._inventory.remove_host(host)

    def get_host_vars(self, hostname):
        """Return the variables of ``hostname``, or ``{}`` if it is unknown."""
        host = self.inventory.get_host(hostname)
        if host:
            return host.get_vars()
        return {}

    def write_inventory(self, path):
        """Dump every group (host names and group vars) to ``path`` as YAML.

        Host-level variables are not written.
        """
        # NOTE(review): Ansible's own YAML inventory format appears to expect
        # ``hosts`` to be a mapping rather than a list - confirm the intended
        # consumer before feeding this file back to Ansible.
        inventory_data = {
            group.name: {
                'hosts': [host.name for host in group.hosts],
                'vars': group.vars
            }
            for group in self.inventory.groups.values()
        }

        with open(path, 'w') as f:
            yaml.dump(inventory_data, f)

Testing and Debugging

Mock Testing

from unittest.mock import patch, MagicMock

class TestAnsibleRunner:
    """Unit tests that run the Ansible helpers against mocked backends."""

    @patch('ansible_runner.run')
    def test_playbook_execution(self, mock_run):
        """run_playbook should surface the runner's status and return code."""
        mock_result = MagicMock()
        mock_result.status = 'successful'
        mock_result.rc = 0
        mock_run.return_value = mock_result

        result = run_playbook('test.yml', 'inventory.yml')

        assert result['status'] == 'successful'
        assert result['rc'] == 0

    # NOTE(review): this patches the class on ansible.inventory.manager. If
    # the module defining get_inventory imported the class by name
    # (``from ... import InventoryManager``), the patch will not reach it -
    # confirm and patch the name where it is looked up if so.
    @patch('ansible.inventory.manager.InventoryManager')
    def test_inventory_management(self, mock_inventory):
        """get_inventory should expose the hosts reported by the manager."""
        # ``name`` given to the MagicMock constructor names the mock itself
        # (it shows up in its repr) and does NOT create a ``.name``
        # attribute, so the attribute has to be assigned after creation.
        mock_host = MagicMock(groups=[], vars={})
        mock_host.name = 'host1'
        mock_inventory.return_value.get_hosts.return_value = [mock_host]

        inventory = get_inventory()
        hosts = inventory.get_hosts()

        assert len(hosts) == 1
        assert hosts[0].name == 'host1'

Debugging Tools

class AnsibleDebugger:
    """Record ansible-runner events and derive simple timing statistics.

    ``event_log`` keeps every event received, ``task_timings`` maps task
    name to the wall-clock time at which the task started, and
    ``host_stats`` maps host name to a list of ``{'task', 'duration',
    'status'}`` entries for tasks that finished OK on that host.
    """

    def __init__(self):
        self.event_log = []
        self.task_timings = {}
        self.host_stats = {}

    def event_handler(self, event):
        """Record one event and update the timing tables.

        Returns:
            True, always. ansible-runner keeps an event only when its
            ``event_handler`` callback returns a truthy value; returning
            None would drop every event from the run's artifacts.
        """
        self.event_log.append(event)
        kind = event['event']

        if kind == 'playbook_on_task_start':
            task = event['event_data']['task']
            self.task_timings[task] = time.time()

        elif kind == 'runner_on_ok':
            task = event['event_data']['task']
            host = event['event_data']['host']

            # Only tasks whose start was observed can be timed.
            if task in self.task_timings:
                duration = time.time() - self.task_timings[task]
                self.host_stats.setdefault(host, []).append({
                    'task': task,
                    'duration': duration,
                    'status': 'ok'
                })

        return True

    def generate_report(self):
        """Return the event count together with both timing tables."""
        return {
            'total_events': len(self.event_log),
            'task_timings': self.task_timings,
            'host_stats': self.host_stats
        }

def debug_playbook_execution(playbook='site.yml', inventory='inventory.yml'):
    """Run a playbook with an AnsibleDebugger attached and print its report.

    Args:
        playbook: Playbook passed to ``ansible_runner.run``. Defaults to
            ``'site.yml'``, the value that used to be hard-coded.
        inventory: Inventory passed to ``ansible_runner.run``. Defaults to
            ``'inventory.yml'``, the value that used to be hard-coded.

    Returns:
        A ``(result, report)`` tuple: the object returned by
        ``ansible_runner.run`` (called with ``debug=True``) and the dict
        produced by the debugger's ``generate_report``.
    """
    debugger = AnsibleDebugger()

    result = ansible_runner.run(
        playbook=playbook,
        inventory=inventory,
        event_handler=debugger.event_handler,
        debug=True
    )

    report = debugger.generate_report()
    print(f"Execution Report:\n{json.dumps(report, indent=2)}")

    return result, report

Performance Optimization

Caching

from functools import lru_cache
import redis

class AnsibleCache:
    """Cache host facts in Redis and memoize host variables in-process.

    Args:
        host: Redis host (default ``'localhost'``).
        port: Redis port (default ``6379``).
        db: Redis database index (default ``0``).
        inventory: Object exposing ``get_host(name)`` whose result has
            ``get_vars()``. Only needed for ``get_host_vars``; previously the
            attribute was read but never assigned.
        facts_ttl: Lifetime in seconds of a cached facts entry (default
            ``3600``).

    All defaults reproduce the values that used to be hard-coded.
    """

    def __init__(self, host='localhost', port=6379, db=0, inventory=None,
                 facts_ttl=3600):
        self.redis = redis.Redis(host=host, port=port, db=db)
        self.inventory = inventory
        self.facts_ttl = facts_ttl
        # One LRU per instance. Decorating the method itself would create a
        # single class-wide cache keyed on ``self``, keeping every instance
        # alive and sharing the 128 slots between all of them.
        self._cached_host_vars = lru_cache(maxsize=128)(self._load_host_vars)

    def set_facts(self, host, facts):
        """Store ``facts`` for ``host`` as JSON, expiring after ``facts_ttl``."""
        key = f"ansible_facts:{host}"
        self.redis.setex(key, self.facts_ttl, json.dumps(facts))

    def get_facts(self, host):
        """Return the cached facts for ``host``, or None when absent."""
        key = f"ansible_facts:{host}"
        facts = self.redis.get(key)
        return json.loads(facts) if facts else None

    def get_host_vars(self, host):
        """Return the inventory variables of ``host`` (memoized per instance).

        Raises:
            AttributeError: If no inventory was supplied, or the inventory
                does not know ``host``.
        """
        return self._cached_host_vars(host)

    def _load_host_vars(self, host):
        # Uncached lookup backing get_host_vars.
        if self.inventory is None:
            raise AttributeError(
                "AnsibleCache has no inventory; pass inventory=... to use "
                "get_host_vars()"
            )
        return self.inventory.get_host(host).get_vars()

Resource Management

class ResourceManager:
    """Throttle concurrent ansible-runner executions and track their state.

    ``running_tasks`` maps a generated task id to a record dict holding
    ``'name'``, ``'status'`` (``'running'``, ``'completed'`` or
    ``'failed'``), ``'start_time'`` and, once finished, ``'end_time'`` plus
    either ``'result'`` or ``'error'``. Records are kept after completion.
    """

    def __init__(self, max_concurrent=5):
        self.semaphore = asyncio.Semaphore(max_concurrent)
        self.running_tasks = {}

    async def run_playbook(self, name, config):
        """Run one playbook in a worker thread once a slot is free.

        Args:
            name: Label stored on the task record (it was previously
                accepted but ignored).
            config: Dict unpacked as the keyword arguments of
                ``ansible_runner.run``.

        Returns:
            The object returned by ``ansible_runner.run``.

        Raises:
            Exception: Whatever the run raised, re-raised after the record
                has been marked ``'failed'``.
        """
        async with self.semaphore:
            task_id = str(uuid.uuid4())
            record = {
                'name': name,
                'status': 'running',
                'start_time': time.time()
            }
            self.running_tasks[task_id] = record

            try:
                result = await asyncio.to_thread(
                    ansible_runner.run,
                    **config
                )

                record.update({
                    'status': 'completed',
                    'end_time': time.time(),
                    'result': {
                        'status': result.status,
                        'rc': result.rc
                    }
                })

                return result

            except Exception as e:
                # Failed records get an end_time too, so durations can be
                # computed the same way for both outcomes.
                record.update({
                    'status': 'failed',
                    'end_time': time.time(),
                    'error': str(e)
                })
                raise