Introduction
Every infrastructure engineer eventually hits the limits of bash scripts. Python bridges the gap between simple automation and complex orchestration—readable enough for quick scripts, powerful enough for production tools.
This guide covers practical Python patterns I use daily: API integrations, system administration, log parsing, and building command-line tools.
Setting Up for Infrastructure Work
Virtual Environments
Always isolate your projects:
# Create project
mkdir infra-scripts && cd infra-scripts
python3 -m venv venv
source venv/bin/activate
# Core infrastructure libraries
pip install requests paramiko boto3 azure-identity pyyaml click rich
pip freeze > requirements.txt
Project Structure
infra-scripts/
├── src/
│ ├── __init__.py
│ ├── cli.py
│ ├── api/
│ │ ├── __init__.py
│ │ ├── zabbix.py
│ │ └── azure.py
│ ├── utils/
│ │ ├── __init__.py
│ │ ├── ssh.py
│ │ └── logging.py
│ └── tasks/
│ ├── __init__.py
│ ├── backup.py
│ └── cleanup.py
├── tests/
├── requirements.txt
└── pyproject.toml
API Integrations
Base API Client
# src/api/base.py
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
import logging
logger = logging.getLogger(__name__)
class APIClient:
    """Base API client with retry logic and error handling.

    Retries apply only to idempotent HTTP methods — POST is deliberately
    excluded so a timed-out write is never silently re-sent.
    """

    def __init__(self, base_url: str, timeout: int = 30):
        """
        Args:
            base_url: Root URL of the API; a trailing slash is stripped.
            timeout: Default per-request timeout in seconds.
        """
        self.base_url = base_url.rstrip('/')
        self.timeout = timeout
        self.session = self._create_session()

    def _create_session(self) -> requests.Session:
        """Build a session with a retry adapter mounted for both schemes."""
        session = requests.Session()
        # Retry transient server-side failures and rate limiting with
        # exponential backoff (1s, 2s, 4s).
        retry_strategy = Retry(
            total=3,
            backoff_factor=1,
            status_forcelist=[429, 500, 502, 503, 504],
            allowed_methods=["HEAD", "GET", "PUT", "DELETE", "OPTIONS", "TRACE"]
        )
        adapter = HTTPAdapter(max_retries=retry_strategy)
        session.mount("http://", adapter)
        session.mount("https://", adapter)
        return session

    def _request(self, method: str, endpoint: str, **kwargs) -> dict:
        """Issue a request and return the decoded JSON body ({} if empty).

        Raises:
            requests.exceptions.HTTPError: on 4xx/5xx responses.
            requests.exceptions.RequestException: on connection/timeout errors.
        """
        url = f"{self.base_url}/{endpoint.lstrip('/')}"
        kwargs.setdefault('timeout', self.timeout)
        try:
            response = self.session.request(method, url, **kwargs)
            response.raise_for_status()
            return response.json() if response.content else {}
        except requests.exceptions.HTTPError as e:
            # Lazy %-formatting: the message is only built if the record
            # is actually emitted (logging best practice).
            logger.error("HTTP error: %s - %s",
                         e.response.status_code, e.response.text)
            raise
        except requests.exceptions.RequestException as e:
            logger.error("Request failed: %s", e)
            raise

    def get(self, endpoint: str, **kwargs) -> dict:
        return self._request('GET', endpoint, **kwargs)

    def post(self, endpoint: str, **kwargs) -> dict:
        return self._request('POST', endpoint, **kwargs)

    def put(self, endpoint: str, **kwargs) -> dict:
        return self._request('PUT', endpoint, **kwargs)

    def delete(self, endpoint: str, **kwargs) -> dict:
        return self._request('DELETE', endpoint, **kwargs)
Zabbix API Client
# src/api/zabbix.py
from .base import APIClient
class ZabbixClient(APIClient):
    """Zabbix JSON-RPC API client for monitoring automation."""

    def __init__(self, url: str, user: str, password: str):
        # All Zabbix API traffic goes through the single JSON-RPC endpoint.
        super().__init__(f"{url}/api_jsonrpc.php")
        self.auth_token = None
        self._authenticate(user, password)

    def _authenticate(self, user: str, password: str):
        """Log in via user.login and store the session token for later calls."""
        response = self.post('', json={
            "jsonrpc": "2.0",
            "method": "user.login",
            "params": {
                "username": user,
                "password": password
            },
            "id": 1
        })
        self.auth_token = response.get('result')
        if not self.auth_token:
            # Fail fast with a clear message instead of sending auth=None
            # on every subsequent call.
            raise RuntimeError(f"Zabbix login failed: {response.get('error')}")

    def _call(self, method: str, params: dict = None) -> dict:
        """Invoke a Zabbix API method and return its 'result' payload."""
        payload = {
            "jsonrpc": "2.0",
            "method": method,
            "params": params or {},
            "auth": self.auth_token,
            "id": 1
        }
        return self.post('', json=payload).get('result', [])

    def _get_group_id(self, group_name: str) -> list:
        """Resolve a host-group name to its group ID(s).

        This was referenced by get_hosts() but never defined, so any call
        with group_name raised AttributeError.
        """
        groups = self._call("hostgroup.get", {
            "output": ["groupid"],
            "filter": {"name": [group_name]}
        })
        return [g["groupid"] for g in groups]

    def get_hosts(self, group_name: str = None) -> list:
        """List hosts, optionally limited to one host group."""
        params = {
            "output": ["hostid", "host", "name", "status"],
            "selectInterfaces": ["ip"],
            "selectGroups": ["name"]
        }
        if group_name:
            params["groupids"] = self._get_group_id(group_name)
        return self._call("host.get", params)

    def get_problems(self, severity_min: int = 3) -> list:
        """Active problems at or above severity_min (0-5 scale), newest first."""
        return self._call("problem.get", {
            "output": "extend",
            "selectHosts": ["host", "name"],
            "severities": list(range(severity_min, 6)),
            "recent": True,
            "sortfield": ["eventid"],
            "sortorder": "DESC"
        })

    def acknowledge_problem(self, event_id: str, message: str):
        """Acknowledge a problem event and attach a comment."""
        return self._call("event.acknowledge", {
            "eventids": event_id,
            "action": 6,  # bitmask: acknowledge (2) + add message (4)
            "message": message
        })

    def create_maintenance(self, name: str, host_ids: list,
                           duration_hours: int = 2) -> str:
        """Create a maintenance window for the given hosts, starting now."""
        import time
        now = int(time.time())
        return self._call("maintenance.create", {
            "name": name,
            "active_since": now,
            "active_till": now + (duration_hours * 3600),
            "hostids": host_ids,
            "timeperiods": [{
                "timeperiod_type": 0,  # one-time period
                "start_date": now,
                "period": duration_hours * 3600
            }]
        })
# Usage example
if __name__ == "__main__":
    client = ZabbixClient(
        url="https://zabbix.yourorg.com",
        user="api-user",
        password="secret"
    )

    # Report every problem at severity "High" (4) or above.
    for problem in client.get_problems(severity_min=4):
        host = problem['hosts'][0]['name']
        print(f"[{problem['severity']}] {problem['name']} on {host}")
Azure Resource Management
# src/api/azure.py
from azure.identity import DefaultAzureCredential
from azure.mgmt.compute import ComputeManagementClient
from azure.mgmt.network import NetworkManagementClient
from azure.mgmt.resource import ResourceManagementClient
from typing import Iterator, Optional
import logging
logger = logging.getLogger(__name__)
class AzureClient:
    """Azure infrastructure management client.

    Credentials are resolved by DefaultAzureCredential (environment
    variables, managed identity, Azure CLI login, ...).
    """

    def __init__(self, subscription_id: str):
        self.subscription_id = subscription_id
        self.credential = DefaultAzureCredential()
        self.compute = ComputeManagementClient(
            self.credential, subscription_id
        )
        self.network = NetworkManagementClient(
            self.credential, subscription_id
        )
        self.resource = ResourceManagementClient(
            self.credential, subscription_id
        )

    def list_vms(self, resource_group: Optional[str] = None) -> Iterator[dict]:
        """List all VMs with their details.

        Args:
            resource_group: Restrict to one resource group; None lists
                the whole subscription.

        Yields:
            dict with name/location/size/os_type/tags/resource_group.
        """
        if resource_group:
            vms = self.compute.virtual_machines.list(resource_group)
        else:
            vms = self.compute.virtual_machines.list_all()
        for vm in vms:
            yield {
                "name": vm.name,
                "location": vm.location,
                "size": vm.hardware_profile.vm_size,
                "os_type": vm.storage_profile.os_disk.os_type,
                "tags": vm.tags or {},
                # IDs look like /subscriptions/<sub>/resourceGroups/<rg>/...
                # so index 4 is the resource-group name.
                "resource_group": vm.id.split('/')[4]
            }

    def get_vm_status(self, resource_group: str, vm_name: str) -> str:
        """Return the VM power state ('running', 'deallocated', ...) or 'unknown'."""
        instance = self.compute.virtual_machines.instance_view(
            resource_group, vm_name
        )
        for status in instance.statuses:
            if status.code.startswith('PowerState/'):
                return status.code.split('/')[-1]
        return 'unknown'

    def start_vm(self, resource_group: str, vm_name: str):
        """Start a VM asynchronously (returns the LRO poller)."""
        logger.info("Starting VM %s", vm_name)
        return self.compute.virtual_machines.begin_start(
            resource_group, vm_name
        )

    def stop_vm(self, resource_group: str, vm_name: str,
                deallocate: bool = True):
        """Stop a VM.

        Deallocating releases the hardware and stops compute charges;
        a plain power-off keeps the allocation (and the billing).
        """
        logger.info("Stopping VM %s", vm_name)
        if deallocate:
            return self.compute.virtual_machines.begin_deallocate(
                resource_group, vm_name
            )
        return self.compute.virtual_machines.begin_power_off(
            resource_group, vm_name
        )

    def resize_vm(self, resource_group: str, vm_name: str, new_size: str):
        """Resize a VM (the platform restarts it as part of the operation)."""
        vm = self.compute.virtual_machines.get(resource_group, vm_name)
        vm.hardware_profile.vm_size = new_size
        return self.compute.virtual_machines.begin_create_or_update(
            resource_group, vm_name, vm
        )

    def get_unattached_disks(self) -> Iterator[dict]:
        """Yield managed disks not attached to any VM (managed_by is None)."""
        for disk in self.compute.disks.list():
            if disk.managed_by is None:
                yield {
                    "name": disk.name,
                    "size_gb": disk.disk_size_gb,
                    "location": disk.location,
                    "resource_group": disk.id.split('/')[4]
                }
# Usage example
if __name__ == "__main__":
    client = AzureClient(subscription_id="your-sub-id")

    # Report every VM that is currently deallocated.
    for vm_info in client.list_vms():
        group = vm_info['resource_group']
        if client.get_vm_status(group, vm_info['name']) == 'deallocated':
            print(f"Stopped: {vm_info['name']} ({vm_info['size']})")
SSH Automation
Parallel Command Execution
# src/utils/ssh.py
import paramiko
from concurrent.futures import ThreadPoolExecutor, as_completed
from dataclasses import dataclass
from typing import List, Tuple
import logging
logger = logging.getLogger(__name__)
@dataclass
class SSHResult:
    """Outcome of a single remote command execution."""
    hostname: str
    exit_code: int  # -1 when the SSH connection/exec itself failed
    stdout: str
    stderr: str
    success: bool = True
    # NOTE(review): default is None, so the annotation should really be
    # Optional[str] (Optional is not imported in this module).
    error: str = None
class SSHClient:
    """SSH client for remote command execution."""

    def __init__(self, key_path: str = None, username: str = 'root'):
        """
        Args:
            key_path: Private key file. '~' is expanded here because
                paramiko's key_filename does not expand it, so paths
                like '~/.ssh/id_rsa' would otherwise fail to open.
            username: Remote login user.
        """
        import os.path
        self.key_path = os.path.expanduser(key_path) if key_path else key_path
        self.username = username

    def execute(self, hostname: str, command: str,
                timeout: int = 30) -> SSHResult:
        """Execute command on remote host.

        Never raises: connection or execution failures are reported via
        SSHResult(success=False, error=...).
        """
        client = paramiko.SSHClient()
        # NOTE: auto-accepting unknown host keys is convenient for fleet
        # automation but disables MITM protection; pin keys in production.
        client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        try:
            connect_kwargs = {
                'hostname': hostname,
                'username': self.username,
                'timeout': 10  # connect timeout, separate from exec timeout
            }
            if self.key_path:
                connect_kwargs['key_filename'] = self.key_path
            client.connect(**connect_kwargs)
            stdin, stdout, stderr = client.exec_command(
                command, timeout=timeout
            )
            # Blocks until the remote command finishes.
            exit_code = stdout.channel.recv_exit_status()
            return SSHResult(
                hostname=hostname,
                exit_code=exit_code,
                stdout=stdout.read().decode().strip(),
                stderr=stderr.read().decode().strip(),
                success=exit_code == 0
            )
        except Exception as e:
            logger.error("SSH to %s failed: %s", hostname, e)
            return SSHResult(
                hostname=hostname,
                exit_code=-1,
                stdout='',
                stderr='',
                success=False,
                error=str(e)
            )
        finally:
            client.close()

    def execute_parallel(self, hosts: List[str], command: str,
                         max_workers: int = 10) -> List[SSHResult]:
        """Execute command on multiple hosts in parallel.

        Results are appended as they complete, so ordering may differ
        from the input host list.
        """
        results = []
        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            futures = {
                executor.submit(self.execute, host, command): host
                for host in hosts
            }
            for future in as_completed(futures):
                results.append(future.result())
        return results
# Usage example
if __name__ == "__main__":
    ssh = SSHClient(key_path="~/.ssh/id_rsa", username="ansible")
    hosts = [
        "web-01.prod.yourorg.com",
        "web-02.prod.yourorg.com",
        "web-03.prod.yourorg.com"
    ]
    # One line per host: tick with uptime on success, cross with the error.
    for outcome in ssh.execute_parallel(hosts, "uptime"):
        if outcome.success:
            print(f"✓ {outcome.hostname}: {outcome.stdout}")
        else:
            print(f"✗ {outcome.hostname}: {outcome.error}")
Log Parsing and Analysis
Structured Log Parser
# src/utils/logs.py
import re
from datetime import datetime
from collections import defaultdict
from typing import Iterator, Dict, Any
import gzip
class LogParser:
    """Parse and analyze log files (plain or gzip-compressed)."""

    # Common log patterns
    PATTERNS = {
        'nginx_access': re.compile(
            r'(?P<ip>\S+) - \S+ \[(?P<time>[^\]]+)\] '
            r'"(?P<method>\S+) (?P<path>\S+) \S+" '
            r'(?P<status>\d+) (?P<size>\d+)'
        ),
        'syslog': re.compile(
            r'(?P<timestamp>\S+ \S+ \S+) (?P<host>\S+) '
            r'(?P<program>\S+?)\[?(?P<pid>\d+)?\]?: (?P<message>.+)'
        ),
        'auth': re.compile(
            r'(?P<timestamp>\S+ \S+ \S+) (?P<host>\S+) '
            r'sshd\[(?P<pid>\d+)\]: (?P<action>Accepted|Failed) '
            r'(?P<method>\S+) for (?P<user>\S+) from (?P<ip>\S+)'
        )
    }

    def __init__(self, pattern_name: str = 'nginx_access'):
        self.pattern = self.PATTERNS[pattern_name]

    def parse_file(self, filepath: str,
                   pattern_name: str = None) -> Iterator[Dict[str, Any]]:
        """Parse log file and yield structured entries.

        Args:
            filepath: Log file path; '.gz' files are decompressed on the fly.
            pattern_name: Per-call pattern override; defaults to the
                pattern chosen at construction time.
        """
        pattern = self.PATTERNS[pattern_name] if pattern_name else self.pattern
        opener = gzip.open if filepath.endswith('.gz') else open
        with opener(filepath, 'rt') as f:
            for line in f:
                match = pattern.match(line.strip())
                if match:
                    yield match.groupdict()

    def analyze_nginx(self, filepath: str) -> Dict[str, Any]:
        """Aggregate stats over an nginx access log.

        Returns:
            dict with total_requests, status_codes (code -> count),
            top_paths / top_ips (top 10 as (value, count) lists), and
            errors (raw entries whose status is 4xx/5xx).
        """
        stats = {
            'total_requests': 0,
            'status_codes': defaultdict(int),
            'top_paths': defaultdict(int),
            'top_ips': defaultdict(int),
            'errors': []
        }
        for entry in self.parse_file(filepath):
            stats['total_requests'] += 1
            stats['status_codes'][entry['status']] += 1
            stats['top_paths'][entry['path']] += 1
            stats['top_ips'][entry['ip']] += 1
            if entry['status'].startswith(('4', '5')):
                stats['errors'].append(entry)
        # Convert counters to top-10 (key, count) lists, busiest first
        stats['top_paths'] = sorted(
            stats['top_paths'].items(),
            key=lambda x: x[1],
            reverse=True
        )[:10]
        stats['top_ips'] = sorted(
            stats['top_ips'].items(),
            key=lambda x: x[1],
            reverse=True
        )[:10]
        return stats

    def find_failed_logins(self, filepath: str) -> Iterator[Dict]:
        """Find failed SSH login attempts.

        Fix: this used to reassign self.pattern to the 'auth' pattern
        permanently, silently corrupting every later parse_file /
        analyze_nginx call on the same instance. The override is now
        scoped to this call only.
        """
        for entry in self.parse_file(filepath, 'auth'):
            if entry.get('action') == 'Failed':
                yield entry
# Usage
if __name__ == "__main__":
    nginx_parser = LogParser('nginx_access')
    report = nginx_parser.analyze_nginx('/var/log/nginx/access.log')

    print(f"Total requests: {report['total_requests']}")
    print("\nStatus codes:")
    for status_code, hits in report['status_codes'].items():
        print(f" {status_code}: {hits}")
Building CLI Tools
Click-based CLI
# src/cli.py
import os

import click
from rich.console import Console
from rich.table import Table

from .api.azure import AzureClient
from .api.zabbix import ZabbixClient
console = Console()
@click.group()
@click.option('--verbose', '-v', is_flag=True, help='Verbose output')
@click.pass_context
def cli(ctx, verbose):
    """Infrastructure automation toolkit."""
    # Root command group: stash shared flags on the click context so
    # subcommands can read them via ctx.obj.
    ctx.ensure_object(dict)
    ctx.obj['verbose'] = verbose
@cli.group()
def zabbix():
    """Zabbix monitoring commands."""
    # Container group only; subcommands attach via @zabbix.command(...).
    pass
@zabbix.command('problems')
@click.option('--severity', '-s', default=3, help='Minimum severity')
def zabbix_problems(severity):
    """List active Zabbix problems."""
    # Credentials come from the environment; requires `import os` at
    # module level (it was missing in the original file).
    client = ZabbixClient(
        url=os.environ['ZABBIX_URL'],
        user=os.environ['ZABBIX_USER'],
        password=os.environ['ZABBIX_PASSWORD']
    )
    problems = client.get_problems(severity_min=severity)
    if not problems:
        console.print("[green]No active problems![/green]")
        return

    table = Table(title="Active Problems")
    table.add_column("Severity", style="cyan")
    table.add_column("Host", style="magenta")
    table.add_column("Problem", style="white")

    # 'orange' is not a valid rich color name; 'orange1' is.
    severity_colors = {
        '3': 'yellow', '4': 'orange1', '5': 'red'
    }
    for p in problems:
        sev = p['severity']
        color = severity_colors.get(sev, 'white')
        table.add_row(
            f"[{color}]{sev}[/{color}]",
            p['hosts'][0]['name'] if p['hosts'] else 'N/A',
            p['name']
        )
    console.print(table)
@cli.group()
def azure():
    """Azure resource commands."""
    # Container group only; subcommands attach via @azure.command(...).
    pass
@azure.command('vms')
@click.option('--resource-group', '-g', help='Filter by resource group')
@click.option('--stopped', is_flag=True, help='Show only stopped VMs')
def azure_vms(resource_group, stopped):
    """List Azure VMs."""
    client = AzureClient(os.environ['AZURE_SUBSCRIPTION_ID'])

    table = Table(title="Azure Virtual Machines")
    # Build columns data-driven; the last column keeps the default style.
    for header, col_style in (("Name", "cyan"), ("Size", "magenta"),
                              ("Status", "green"), ("Resource Group", None)):
        table.add_column(header, style=col_style)

    for vm in client.list_vms(resource_group):
        power_state = client.get_vm_status(vm['resource_group'], vm['name'])
        # With --stopped, hide anything that is not deallocated.
        if stopped and power_state != 'deallocated':
            continue
        status_style = "green" if power_state == "running" else "red"
        table.add_row(
            vm['name'],
            vm['size'],
            f"[{status_style}]{power_state}[/{status_style}]",
            vm['resource_group']
        )
    console.print(table)
@azure.command('cleanup-disks')
@click.option('--dry-run', is_flag=True, help='Show what would be deleted')
def cleanup_disks(dry_run):
    """Find and optionally delete orphaned disks."""
    client = AzureClient(os.environ['AZURE_SUBSCRIPTION_ID'])
    orphans = list(client.get_unattached_disks())
    if not orphans:
        console.print("[green]No orphaned disks found![/green]")
        return

    total_size = sum(d['size_gb'] for d in orphans)
    console.print(f"Found {len(orphans)} orphaned disks ({total_size} GB)")
    for disk in orphans:
        console.print(f" - {disk['name']} ({disk['size_gb']} GB)")

    if dry_run:
        console.print("\n[yellow]Dry run - no changes made[/yellow]")
        return

    # Fix: the original never deleted anything even without --dry-run.
    # Deletion is destructive, so require explicit confirmation first.
    if not click.confirm(f"Delete all {len(orphans)} disks?"):
        console.print("[yellow]Aborted - no changes made[/yellow]")
        return
    for disk in orphans:
        console.print(f"Deleting {disk['name']}...")
        client.compute.disks.begin_delete(
            disk['resource_group'], disk['name']
        ).result()  # wait for each LRO so failures surface here
# Entry point when the module is executed directly.
if __name__ == '__main__':
    cli()
Lessons Learned
- Start with logging. Every script should have proper logging from day one.
- Handle failures gracefully. Network calls fail. SSH connections timeout. Plan for it.
- Use type hints. They make code readable and catch bugs early.
- Build reusable clients. API clients pay dividends across multiple scripts.
- Add --dry-run everywhere. Test destructive operations before running them.
Conclusion
Python transforms infrastructure work from repetitive tasks into reusable tools. Start with simple scripts, refactor into modules, and build toward a personal toolkit that makes you more effective.
The examples here are starting points—adapt them to your environment, add error handling, and iterate based on real-world usage.