Python Automation for Infrastructure Engineers

Introduction

Every infrastructure engineer eventually hits the limits of bash scripts. Python bridges the gap between simple automation and complex orchestration—readable enough for quick scripts, powerful enough for production tools.

This guide covers practical Python patterns I use daily: API integrations, system administration, log parsing, and building command-line tools.

Setting Up for Infrastructure Work

Virtual Environments

Always isolate your projects:

# Create and enter the project directory
mkdir infra-scripts && cd infra-scripts
python3 -m venv venv
source venv/bin/activate

# Core infrastructure libraries: HTTP (requests), SSH (paramiko),
# cloud SDKs (boto3, azure-identity), config (pyyaml), CLI/output (click, rich)
pip install requests paramiko boto3 azure-identity pyyaml click rich
pip freeze > requirements.txt

Project Structure

infra-scripts/
├── src/
│   ├── __init__.py
│   ├── cli.py
│   ├── api/
│   │   ├── __init__.py
│   │   ├── zabbix.py
│   │   └── azure.py
│   ├── utils/
│   │   ├── __init__.py
│   │   ├── ssh.py
│   │   └── logging.py
│   └── tasks/
│       ├── __init__.py
│       ├── backup.py
│       └── cleanup.py
├── tests/
├── requirements.txt
└── pyproject.toml

API Integrations

Base API Client

# src/api/base.py
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
import logging

logger = logging.getLogger(__name__)

class APIClient:
    """Base API client with retry logic and error handling.

    A retrying HTTPAdapter is mounted on both schemes; only idempotent
    HTTP methods are retried automatically.
    """

    def __init__(self, base_url: str, timeout: int = 30):
        self.base_url = base_url.rstrip('/')
        self.timeout = timeout
        self.session = self._create_session()

    def _create_session(self) -> requests.Session:
        """Build a Session that retries transient server / rate-limit errors."""
        retries = Retry(
            total=3,
            backoff_factor=1,
            status_forcelist=[429, 500, 502, 503, 504],
            # POST is deliberately absent: it is not safe to retry blindly.
            allowed_methods=["HEAD", "GET", "PUT", "DELETE", "OPTIONS", "TRACE"]
        )
        session = requests.Session()
        adapter = HTTPAdapter(max_retries=retries)
        for scheme in ("http://", "https://"):
            session.mount(scheme, adapter)
        return session

    def _request(self, method: str, endpoint: str, **kwargs) -> dict:
        """Send one request; log and re-raise any failure."""
        kwargs.setdefault('timeout', self.timeout)
        url = f"{self.base_url}/{endpoint.lstrip('/')}"

        try:
            response = self.session.request(method, url, **kwargs)
            response.raise_for_status()
            # Some endpoints legitimately return an empty body.
            return response.json() if response.content else {}
        except requests.exceptions.HTTPError as e:
            logger.error(f"HTTP error: {e.response.status_code} - {e.response.text}")
            raise
        except requests.exceptions.RequestException as e:
            logger.error(f"Request failed: {e}")
            raise

    def get(self, endpoint: str, **kwargs) -> dict:
        """GET the endpoint and return the decoded JSON body."""
        return self._request('GET', endpoint, **kwargs)

    def post(self, endpoint: str, **kwargs) -> dict:
        """POST to the endpoint and return the decoded JSON body."""
        return self._request('POST', endpoint, **kwargs)

    def put(self, endpoint: str, **kwargs) -> dict:
        """PUT to the endpoint and return the decoded JSON body."""
        return self._request('PUT', endpoint, **kwargs)

    def delete(self, endpoint: str, **kwargs) -> dict:
        """DELETE the endpoint and return the decoded JSON body."""
        return self._request('DELETE', endpoint, **kwargs)

Zabbix API Client

# src/api/zabbix.py
from .base import APIClient

class ZabbixClient(APIClient):
    """Zabbix JSON-RPC API client for monitoring automation."""

    def __init__(self, url: str, user: str, password: str):
        super().__init__(f"{url}/api_jsonrpc.php")
        self.auth_token = None
        self._authenticate(user, password)

    def _authenticate(self, user: str, password: str):
        """Log in and store the session token used by subsequent calls.

        Raises:
            RuntimeError: if Zabbix rejects the credentials.
        """
        response = self.post('', json={
            "jsonrpc": "2.0",
            "method": "user.login",
            "params": {
                "username": user,
                "password": password
            },
            "id": 1
        })
        self.auth_token = response.get('result')
        if not self.auth_token:
            # On failure Zabbix returns an 'error' object instead of 'result';
            # fail fast rather than sending every later call with auth=None.
            raise RuntimeError(f"Zabbix authentication failed: {response.get('error')}")

    def _call(self, method: str, params: "dict | None" = None) -> dict:
        """Invoke one Zabbix API method and return its 'result' payload."""
        payload = {
            "jsonrpc": "2.0",
            "method": method,
            "params": params or {},
            "auth": self.auth_token,
            "id": 1
        }
        return self.post('', json=payload).get('result', [])

    def _get_group_id(self, group_name: str) -> list:
        """Resolve a host-group name to its group ID list (for 'groupids')."""
        groups = self._call("hostgroup.get", {
            "output": ["groupid"],
            "filter": {"name": [group_name]}
        })
        return [g["groupid"] for g in groups]

    def get_hosts(self, group_name: str = None) -> list:
        """List hosts (optionally restricted to one host group by name)."""
        params = {
            "output": ["hostid", "host", "name", "status"],
            "selectInterfaces": ["ip"],
            "selectGroups": ["name"]
        }
        if group_name:
            params["groupids"] = self._get_group_id(group_name)
        return self._call("host.get", params)

    def get_problems(self, severity_min: int = 3) -> list:
        """Return recent problems at or above the given severity (0-5)."""
        return self._call("problem.get", {
            "output": "extend",
            "selectHosts": ["host", "name"],
            "severities": list(range(severity_min, 6)),
            "recent": True,
            "sortfield": ["eventid"],
            "sortorder": "DESC"
        })

    def acknowledge_problem(self, event_id: str, message: str):
        """Acknowledge a problem event and attach a message."""
        return self._call("event.acknowledge", {
            "eventids": event_id,
            "action": 6,  # bitmask: 2 (acknowledge) + 4 (add message)
            "message": message
        })

    def create_maintenance(self, name: str, host_ids: list,
                          duration_hours: int = 2) -> dict:
        """Create a maintenance window starting now; returns the API result."""
        import time
        now = int(time.time())
        return self._call("maintenance.create", {
            "name": name,
            "active_since": now,
            "active_till": now + (duration_hours * 3600),
            "hostids": host_ids,
            "timeperiods": [{
                "timeperiod_type": 0,  # one-time-only window
                "start_date": now,
                "period": duration_hours * 3600
            }]
        })


# Usage example
if __name__ == "__main__":
    client = ZabbixClient(
        url="https://zabbix.yourorg.com",
        user="api-user",
        password="secret"
    )

    # Show every problem at severity High (4) or above.
    for problem in client.get_problems(severity_min=4):
        host = problem['hosts'][0]['name']
        print(f"[{problem['severity']}] {problem['name']} on {host}")

Azure Resource Management

# src/api/azure.py
from azure.identity import DefaultAzureCredential
from azure.mgmt.compute import ComputeManagementClient
from azure.mgmt.network import NetworkManagementClient
from azure.mgmt.resource import ResourceManagementClient
from typing import Iterator, Optional
import logging

logger = logging.getLogger(__name__)

class AzureClient:
    """Azure infrastructure management client."""

    def __init__(self, subscription_id: str):
        self.subscription_id = subscription_id
        self.credential = DefaultAzureCredential()

        self.compute = ComputeManagementClient(self.credential, subscription_id)
        self.network = NetworkManagementClient(self.credential, subscription_id)
        self.resource = ResourceManagementClient(self.credential, subscription_id)

    @staticmethod
    def _resource_group_of(resource_id: str) -> str:
        """Extract the resource-group segment (index 4) of an ARM resource ID."""
        return resource_id.split('/')[4]

    def list_vms(self, resource_group: str = None) -> Iterator[dict]:
        """Yield a summary dict per VM, optionally scoped to one resource group."""
        if resource_group:
            source = self.compute.virtual_machines.list(resource_group)
        else:
            source = self.compute.virtual_machines.list_all()

        for vm in source:
            yield {
                "name": vm.name,
                "location": vm.location,
                "size": vm.hardware_profile.vm_size,
                "os_type": vm.storage_profile.os_disk.os_type,
                "tags": vm.tags or {},
                "resource_group": self._resource_group_of(vm.id)
            }

    def get_vm_status(self, resource_group: str, vm_name: str) -> str:
        """Return the VM power state ('running', 'deallocated', ...)."""
        view = self.compute.virtual_machines.instance_view(
            resource_group, vm_name
        )
        return next(
            (s.code.split('/')[-1] for s in view.statuses
             if s.code.startswith('PowerState/')),
            'unknown'
        )

    def start_vm(self, resource_group: str, vm_name: str):
        """Start a VM asynchronously (returns a long-running-operation poller)."""
        logger.info(f"Starting VM {vm_name}")
        return self.compute.virtual_machines.begin_start(
            resource_group, vm_name
        )

    def stop_vm(self, resource_group: str, vm_name: str,
                deallocate: bool = True):
        """Stop a VM; deallocating releases the compute so billing stops."""
        logger.info(f"Stopping VM {vm_name}")
        op = (self.compute.virtual_machines.begin_deallocate
              if deallocate
              else self.compute.virtual_machines.begin_power_off)
        return op(resource_group, vm_name)

    def resize_vm(self, resource_group: str, vm_name: str, new_size: str):
        """Change the VM size (Azure restarts the VM to apply it)."""
        vm = self.compute.virtual_machines.get(resource_group, vm_name)
        vm.hardware_profile.vm_size = new_size
        return self.compute.virtual_machines.begin_create_or_update(
            resource_group, vm_name, vm
        )

    def get_unattached_disks(self) -> Iterator[dict]:
        """Yield managed disks no VM references (candidates for cleanup)."""
        for disk in self.compute.disks.list():
            if disk.managed_by is not None:
                continue
            yield {
                "name": disk.name,
                "size_gb": disk.disk_size_gb,
                "location": disk.location,
                "resource_group": self._resource_group_of(disk.id)
            }


# Usage example
if __name__ == "__main__":
    client = AzureClient(subscription_id="your-sub-id")

    # Report every VM that is currently deallocated.
    for vm in client.list_vms():
        state = client.get_vm_status(vm['resource_group'], vm['name'])
        if state == 'deallocated':
            print(f"Stopped: {vm['name']} ({vm['size']})")

SSH Automation

Parallel Command Execution

# src/utils/ssh.py
import paramiko
from concurrent.futures import ThreadPoolExecutor, as_completed
from dataclasses import dataclass
from typing import List, Tuple
import logging

logger = logging.getLogger(__name__)

@dataclass
class SSHResult:
    """Outcome of running one command on one remote host."""
    hostname: str
    exit_code: int          # -1 when the SSH connection itself failed
    stdout: str
    stderr: str
    success: bool = True    # True only when exit_code == 0
    error: "str | None" = None  # connection/execution error message, if any

class SSHClient:
    """SSH client for remote command execution.

    Uses one username and (optionally) one private key for every host.
    """

    def __init__(self, key_path: str = None, username: str = 'root'):
        # Expand '~' up front — paramiko's key_filename does not expand it,
        # so paths like "~/.ssh/id_rsa" would otherwise fail to open.
        from os.path import expanduser
        self.key_path = expanduser(key_path) if key_path else key_path
        self.username = username

    def execute(self, hostname: str, command: str,
                timeout: int = 30) -> SSHResult:
        """Execute a command on one host; never raises, returns an SSHResult."""
        client = paramiko.SSHClient()
        # NOTE(review): AutoAddPolicy trusts unknown host keys on first
        # contact — convenient for lab automation, risky in production.
        client.set_missing_host_key_policy(paramiko.AutoAddPolicy())

        try:
            connect_kwargs = {
                'hostname': hostname,
                'username': self.username,
                'timeout': 10  # connect timeout, separate from command timeout
            }
            if self.key_path:
                connect_kwargs['key_filename'] = self.key_path

            client.connect(**connect_kwargs)

            _stdin, stdout, stderr = client.exec_command(
                command, timeout=timeout
            )
            # Blocks until the remote command finishes.
            exit_code = stdout.channel.recv_exit_status()

            return SSHResult(
                hostname=hostname,
                exit_code=exit_code,
                stdout=stdout.read().decode().strip(),
                stderr=stderr.read().decode().strip(),
                success=exit_code == 0
            )
        except Exception as e:
            logger.error(f"SSH to {hostname} failed: {e}")
            return SSHResult(
                hostname=hostname,
                exit_code=-1,
                stdout='',
                stderr='',
                success=False,
                error=str(e)
            )
        finally:
            client.close()

    def execute_parallel(self, hosts: List[str], command: str,
                        max_workers: int = 10) -> List[SSHResult]:
        """Run the same command on many hosts concurrently.

        Results are returned in completion order, not input order.
        """
        results = []

        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            futures = {
                executor.submit(self.execute, host, command): host
                for host in hosts
            }

            for future in as_completed(futures):
                results.append(future.result())

        return results


# Usage example
if __name__ == "__main__":
    ssh = SSHClient(key_path="~/.ssh/id_rsa", username="ansible")

    # Three production web hosts, checked concurrently.
    hosts = [f"web-0{n}.prod.yourorg.com" for n in (1, 2, 3)]

    for result in ssh.execute_parallel(hosts, "uptime"):
        if result.success:
            print(f"✓ {result.hostname}: {result.stdout}")
        else:
            print(f"✗ {result.hostname}: {result.error}")

Log Parsing and Analysis

Structured Log Parser

# src/utils/logs.py
import re
from datetime import datetime
from collections import defaultdict
from typing import Iterator, Dict, Any
import gzip

class LogParser:
    """Parse and analyze log files.

    An instance is bound to one of the named PATTERNS at construction;
    individual calls may override the pattern without mutating the instance.
    """

    # Common log patterns
    PATTERNS = {
        'nginx_access': re.compile(
            r'(?P<ip>\S+) - \S+ \[(?P<time>[^\]]+)\] '
            r'"(?P<method>\S+) (?P<path>\S+) \S+" '
            r'(?P<status>\d+) (?P<size>\d+)'
        ),
        'syslog': re.compile(
            r'(?P<timestamp>\S+ \S+ \S+) (?P<host>\S+) '
            r'(?P<program>\S+?)\[?(?P<pid>\d+)?\]?: (?P<message>.+)'
        ),
        'auth': re.compile(
            r'(?P<timestamp>\S+ \S+ \S+) (?P<host>\S+) '
            r'sshd\[(?P<pid>\d+)\]: (?P<action>Accepted|Failed) '
            r'(?P<method>\S+) for (?P<user>\S+) from (?P<ip>\S+)'
        )
    }

    def __init__(self, pattern_name: str = 'nginx_access'):
        self.pattern = self.PATTERNS[pattern_name]

    def parse_file(self, filepath: str, pattern=None) -> Iterator[Dict[str, Any]]:
        """Parse a (possibly gzipped) log file and yield structured entries.

        Args:
            filepath: Path to the log; '.gz' files are decompressed on the fly.
            pattern: Optional compiled regex overriding the instance pattern
                for this call only (leaves self.pattern untouched).
        """
        regex = pattern if pattern is not None else self.pattern
        opener = gzip.open if filepath.endswith('.gz') else open

        with opener(filepath, 'rt') as f:
            for line in f:
                match = regex.match(line.strip())
                if match:
                    yield match.groupdict()

    def analyze_nginx(self, filepath: str) -> Dict[str, Any]:
        """Analyze an nginx access log: totals, status codes, top paths/IPs."""
        stats = {
            'total_requests': 0,
            'status_codes': defaultdict(int),
            'top_paths': defaultdict(int),
            'top_ips': defaultdict(int),
            'errors': []
        }

        for entry in self.parse_file(filepath):
            stats['total_requests'] += 1
            stats['status_codes'][entry['status']] += 1
            stats['top_paths'][entry['path']] += 1
            stats['top_ips'][entry['ip']] += 1

            # 4xx/5xx entries are collected verbatim for inspection.
            if entry['status'].startswith(('4', '5')):
                stats['errors'].append(entry)

        # Convert counters to top-10 (path, count) / (ip, count) lists.
        stats['top_paths'] = sorted(
            stats['top_paths'].items(),
            key=lambda x: x[1],
            reverse=True
        )[:10]
        stats['top_ips'] = sorted(
            stats['top_ips'].items(),
            key=lambda x: x[1],
            reverse=True
        )[:10]

        return stats

    def find_failed_logins(self, filepath: str) -> Iterator[Dict]:
        """Find failed SSH login attempts using the 'auth' pattern."""
        # Pass the auth pattern explicitly instead of assigning it to
        # self.pattern — the previous code permanently switched the
        # instance's pattern as a side effect, breaking later calls.
        for entry in self.parse_file(filepath, pattern=self.PATTERNS['auth']):
            if entry.get('action') == 'Failed':
                yield entry


# Usage
if __name__ == "__main__":
    # Summarize the default nginx access log.
    log_parser = LogParser('nginx_access')
    report = log_parser.analyze_nginx('/var/log/nginx/access.log')

    print(f"Total requests: {report['total_requests']}")
    print(f"\nStatus codes:")
    for status_code, hits in report['status_codes'].items():
        print(f"  {status_code}: {hits}")

Building CLI Tools

Click-based CLI

# src/cli.py
import os

import click
from rich.console import Console
from rich.table import Table

from .api.azure import AzureClient
from .api.zabbix import ZabbixClient

console = Console()

@click.group()
@click.option('--verbose', '-v', is_flag=True, help='Verbose output')
@click.pass_context
def cli(ctx, verbose):
    """Infrastructure automation toolkit."""
    # Share the flag with sub-commands via the click context object.
    ctx.ensure_object(dict)
    ctx.obj['verbose'] = verbose

@cli.group()
def zabbix():
    """Zabbix monitoring commands."""
    pass  # container group only; sub-commands attach below

@zabbix.command('problems')
@click.option('--severity', '-s', default=3, help='Minimum severity')
def zabbix_problems(severity):
    """List active Zabbix problems in a table.

    Reads credentials from ZABBIX_URL / ZABBIX_USER / ZABBIX_PASSWORD.
    """
    client = ZabbixClient(
        url=os.environ['ZABBIX_URL'],
        user=os.environ['ZABBIX_USER'],
        password=os.environ['ZABBIX_PASSWORD']
    )

    problems = client.get_problems(severity_min=severity)

    if not problems:
        console.print("[green]No active problems![/green]")
        return

    table = Table(title="Active Problems")
    table.add_column("Severity", style="cyan")
    table.add_column("Host", style="magenta")
    table.add_column("Problem", style="white")

    # Zabbix reports severity as a string. Note: 'orange' is not a valid
    # rich color name (rendering it raises) — use 'orange3' for High.
    severity_colors = {
        '3': 'yellow', '4': 'orange3', '5': 'red'
    }

    for p in problems:
        sev = p['severity']
        color = severity_colors.get(sev, 'white')
        table.add_row(
            f"[{color}]{sev}[/{color}]",
            p['hosts'][0]['name'] if p['hosts'] else 'N/A',
            p['name']
        )

    console.print(table)

@cli.group()
def azure():
    """Azure resource commands."""
    pass  # container group only; sub-commands attach below

@azure.command('vms')
@click.option('--resource-group', '-g', help='Filter by resource group')
@click.option('--stopped', is_flag=True, help='Show only stopped VMs')
def azure_vms(resource_group, stopped):
    """List Azure VMs."""
    client = AzureClient(os.environ['AZURE_SUBSCRIPTION_ID'])

    table = Table(title="Azure Virtual Machines")
    for header, column_style in (("Name", "cyan"), ("Size", "magenta"),
                                 ("Status", "green"), ("Resource Group", None)):
        table.add_column(header, style=column_style)

    for vm in client.list_vms(resource_group):
        # One extra API call per VM to fetch its live power state.
        state = client.get_vm_status(vm['resource_group'], vm['name'])

        if stopped and state != 'deallocated':
            continue

        color = "green" if state == "running" else "red"
        table.add_row(
            vm['name'],
            vm['size'],
            f"[{color}]{state}[/{color}]",
            vm['resource_group']
        )

    console.print(table)

@azure.command('cleanup-disks')
@click.option('--dry-run', is_flag=True, help='Show what would be deleted')
def cleanup_disks(dry_run):
    """Find and optionally delete orphaned disks."""
    client = AzureClient(os.environ['AZURE_SUBSCRIPTION_ID'])

    # Materialize the generator so we can count and total before printing.
    orphans = list(client.get_unattached_disks())

    if not orphans:
        console.print("[green]No orphaned disks found![/green]")
        return

    # Total reclaimable capacity across all unattached disks.
    total_size = sum(d['size_gb'] for d in orphans)
    console.print(f"Found {len(orphans)} orphaned disks ({total_size} GB)")

    for disk in orphans:
        console.print(f"  - {disk['name']} ({disk['size_gb']} GB)")

    if dry_run:
        console.print("\n[yellow]Dry run - no changes made[/yellow]")
    # NOTE(review): the non-dry-run path currently deletes nothing — actual
    # disk deletion still needs to be implemented before this command lives
    # up to its help text.

if __name__ == '__main__':
    cli()  # click dispatches to the selected sub-command

Lessons Learned

  1. Start with logging. Every script should have proper logging from day one.
  2. Handle failures gracefully. Network calls fail. SSH connections timeout. Plan for it.
  3. Use type hints. They make code readable and catch bugs early.
  4. Build reusable clients. API clients pay dividends across multiple scripts.
  5. Add --dry-run everywhere. Test destructive operations before running them.

Conclusion

Python transforms infrastructure work from repetitive tasks into reusable tools. Start with simple scripts, refactor into modules, and build toward a personal toolkit that makes you more effective.

The examples here are starting points—adapt them to your environment, add error handling, and iterate based on real-world usage.

Resources