Skip to content

Load Balancing and Traffic Management

Complete documentation of Orchestry's load balancing system, Nginx integration, and traffic management capabilities.

Overview

Orchestry uses Nginx as a dynamic load balancer to distribute traffic across application instances. The system provides:

  • Dynamic Configuration: Real-time updates without service interruption
  • Health-Aware Routing: Traffic only to healthy instances
  • Multiple Load Balancing Algorithms: Round-robin, least connections, IP hash
  • SSL Termination: HTTPS support with automatic certificate management
  • Connection Pooling: Efficient upstream connection management
  • Circuit Breaking: Protection against cascading failures
  • Request Routing: Path-based and header-based routing rules

Architecture Overview

Load Balancing Flow

┌─────────────────────┐    ┌─────────────────────┐    ┌─────────────────────┐
│    Client Request   │    │   Nginx Proxy       │    │  Application        │
│                     │    │                     │    │  Instances          │
│  • HTTP/HTTPS       │───►│  • Load balancing   │───►│                     │
│  • WebSocket        │    │  • Health checks    │    │  • Instance 1       │
│  • API calls        │    │  • SSL termination  │    │  • Instance 2       │
└─────────────────────┘    │  • Request routing  │    │  • Instance N       │
                           └─────────────────────┘    └─────────────────────┘
                           ┌─────────────────────┐
                           │   Configuration     │
                           │   Management        │
                           │                     │
                           │  • Dynamic updates  │
                           │  • Health status    │
                           │  • Routing rules    │
                           └─────────────────────┘

Component Integration

class LoadBalancingSystem:
    """Orchestrates load balancing components.

    Holds one instance each of the Nginx manager, upstream manager, health
    monitor, SSL manager and metrics collector, and sequences the calls that
    span several of them.
    """

    def __init__(self):
        # Components are only constructed here; initialize() starts them.
        self.nginx_manager = NginxManager()
        self.upstream_manager = UpstreamManager()
        self.health_monitor = HealthMonitor()
        self.ssl_manager = SSLManager()
        # Constructed but not used by any method visible in this class.
        self.metrics_collector = LoadBalancerMetrics()

    async def initialize(self):
        """Initialize the load balancing system.

        Awaits each component in turn: Nginx manager, upstream manager,
        health monitor, then SSL manager.
        """
        await self.nginx_manager.initialize()
        # NOTE(review): UpstreamManager as shown in this document defines no
        # initialize() method - confirm it exists in the real implementation.
        await self.upstream_manager.initialize()
        await self.health_monitor.start()
        await self.ssl_manager.initialize()

    async def update_application_routing(self, app_name: str, instances: List[InstanceRecord]):
        """Update routing configuration for an application.

        Args:
            app_name: Name of the application whose routing is updated.
            instances: Instance records to route traffic to.

        The steps run strictly in order: generate the upstream config, hand
        it to the Nginx manager, reload Nginx, then refresh the health
        monitor's targets.
        """
        # Generate upstream configuration
        # NOTE(review): UpstreamManager as shown in this document exposes
        # update_upstream(), not generate_upstream_config() - confirm which
        # component is meant to render the config.
        upstream_config = await self.upstream_manager.generate_upstream_config(app_name, instances)

        # Update Nginx configuration
        await self.nginx_manager.update_upstream(app_name, upstream_config)

        # Reload Nginx configuration
        await self.nginx_manager.reload_config()

        # Update health monitoring
        await self.health_monitor.update_targets(app_name, instances)

Nginx Configuration Management

Dynamic Configuration Generator

class NginxConfigGenerator:
    """Generates Nginx configuration files dynamically.

    Templates are loaded with Jinja2 from ``configs/nginx/``. Tunables are
    read from the ``config`` mapping given to the constructor; every key has
    a built-in default, so an empty mapping is valid.
    """

    # Nginx does not accept the ``backup`` server parameter together with
    # these balancing methods, so it is never emitted when one is selected.
    _NO_BACKUP_METHODS = ('ip_hash', 'hash', 'random')

    def __init__(self, config: Dict[str, Any]):
        """Store ``config`` and set up the Jinja2 template environment."""
        self.config = config
        self.template_loader = jinja2.FileSystemLoader('configs/nginx/')
        self.template_env = jinja2.Environment(loader=self.template_loader)

    async def generate_main_config(self, applications: List[str]) -> str:
        """Generate main Nginx configuration.

        Args:
            applications: Application names whose upstream/server files the
                main config should include.

        Returns:
            The rendered ``nginx-main.conf`` template.
        """
        template = self.template_env.get_template('nginx-main.conf')
        setting = self.config.get

        return template.render(
            worker_processes=setting('worker_processes', 'auto'),
            worker_connections=setting('worker_connections', 1024),
            keepalive_timeout=setting('keepalive_timeout', 65),
            client_max_body_size=setting('client_max_body_size', '10m'),
            proxy_connect_timeout=setting('proxy_connect_timeout', '60s'),
            proxy_send_timeout=setting('proxy_send_timeout', '60s'),
            proxy_read_timeout=setting('proxy_read_timeout', '60s'),
            proxy_buffer_size=setting('proxy_buffer_size', '4k'),
            proxy_buffers=setting('proxy_buffers', '8 4k'),
            applications=applications,
            timestamp=datetime.now().isoformat()
        )

    async def generate_upstream_config(self, app_name: str, instances: "List[InstanceRecord]",
                                     lb_method: str = 'round_robin') -> str:
        """Generate upstream configuration for an application.

        Args:
            app_name: Application name; the upstream is ``<app_name>_upstream``.
            instances: Candidate instances. Only those that are ``running``
                and ``healthy`` are rendered.
            lb_method: Balancing method name understood by the template.

        Returns:
            The rendered ``upstream.conf`` template.
        """
        template = self.template_env.get_template('upstream.conf')
        servers = self._build_server_entries(instances, lb_method)

        return template.render(
            app_name=app_name,
            lb_method=lb_method,
            servers=servers,
            keepalive=self.config.get('upstream_keepalive', 32),
            keepalive_requests=self.config.get('keepalive_requests', 100),
            keepalive_timeout=self.config.get('upstream_keepalive_timeout', '60s')
        )

    def _build_server_entries(self, instances: "List[InstanceRecord]",
                              lb_method: str = 'round_robin') -> List[Dict[str, Any]]:
        """Build the per-server dicts consumed by the upstream template.

        An instance with recent failures is demoted to ``backup`` only when
        that leaves a valid upstream: the balancing method must allow
        ``backup`` and at least one failure-free instance must remain as a
        primary. Otherwise such instances stay primaries and rely on their
        reduced weight alone.
        """
        healthy_instances = [
            instance for instance in instances
            if instance.status == 'running' and instance.health_status == 'healthy'
        ]

        has_clean_primary = any(
            not instance.consecutive_failures > 0 for instance in healthy_instances
        )
        backup_allowed = has_clean_primary and lb_method not in self._NO_BACKUP_METHODS

        max_fails = self.config.get('max_fails', 3)
        fail_timeout = self.config.get('fail_timeout', '30s')

        servers = []
        for instance in healthy_instances:
            server_config = {
                'address': f"{instance.ip}:{instance.port}",
                'weight': self._calculate_server_weight(instance),
                'max_fails': max_fails,
                'fail_timeout': fail_timeout,
                'max_conns': self._calculate_max_connections(instance)
            }

            # The key is left out entirely (rather than set to False) when
            # the server is not a backup.
            if backup_allowed and instance.consecutive_failures > 0:
                server_config['backup'] = True

            servers.append(server_config)

        # NOTE: an empty list is returned when nothing is healthy; the
        # template decides how to render an upstream without servers.
        return servers

    async def generate_server_config(self, app_spec: Dict[str, Any]) -> str:
        """Generate server block configuration for an application.

        Args:
            app_spec: Application spec; ``metadata.name`` and ``spec`` are
                required, ``spec.networking`` is optional.

        Returns:
            The rendered ``server.conf`` template.

        Raises:
            KeyError: If ``metadata``, ``metadata.name`` or ``spec`` is missing.
        """
        template = self.template_env.get_template('server.conf')

        app_name = app_spec['metadata']['name']
        networking = app_spec['spec'].get('networking', {})

        # Certificate locations are configurable so they can be pointed at
        # the directory the certificates are actually provisioned into.
        # NOTE(review): SSLManager defaults to /etc/ssl/orchestry/{certs,private};
        # set ssl_cert_dir / ssl_key_dir accordingly if that default is used.
        cert_dir = self.config.get('ssl_cert_dir', '/etc/ssl/certs')
        key_dir = self.config.get('ssl_key_dir', '/etc/ssl/private')

        server_config = {
            'app_name': app_name,
            'listen_port': networking.get('external_port', 80),
            'server_name': networking.get('domain', f"{app_name}.orchestry.local"),
            'ssl_enabled': networking.get('ssl', {}).get('enabled', False),
            'ssl_cert_path': f"{cert_dir}/{app_name}.crt",
            'ssl_key_path': f"{key_dir}/{app_name}.key",
            'proxy_pass': f"http://{app_name}_upstream",
            'access_log': f"/var/log/nginx/{app_name}_access.log",
            'error_log': f"/var/log/nginx/{app_name}_error.log",
            'client_max_body_size': networking.get('max_body_size', '10m'),
            'proxy_timeout': networking.get('timeout', '60s')
        }

        return template.render(
            **server_config,
            custom_headers=networking.get('headers', {}),
            location_rules=networking.get('locations', []),
            health_check_path='/_health',
            status_check_path='/_status'
        )

    def _calculate_server_weight(self, instance: "InstanceRecord") -> int:
        """Calculate server weight based on performance metrics.

        Starts at 1, adds up to 3 for low CPU/memory usage, subtracts for
        high usage and for each consecutive failure. Never returns less
        than 1.
        """
        weight = 1

        # CPU headroom counts double compared with memory headroom.
        if instance.cpu_percent < 30:
            weight += 2
        elif instance.cpu_percent > 70:
            weight -= 1

        if instance.memory_percent < 50:
            weight += 1
        elif instance.memory_percent > 80:
            weight -= 1

        if instance.consecutive_failures > 0:
            weight -= instance.consecutive_failures

        return max(1, weight)

    def _calculate_max_connections(self, instance: "InstanceRecord") -> int:
        """Calculate maximum connections for an instance.

        Memory pressure (>80%) halves the configured limit; otherwise CPU
        pressure (>80%) reduces it to 70%; otherwise the limit is unchanged.
        """
        base_limit = self.config.get('default_max_conns', 100)

        if instance.memory_percent > 80:
            return int(base_limit * 0.5)
        if instance.cpu_percent > 80:
            return int(base_limit * 0.7)
        return base_limit

Nginx Template System

Main Configuration Template (nginx-main.conf):

# Orchestry Nginx Configuration
# Generated at: {{ timestamp }}
#
# Top-level configuration. Values in double braces are filled in by the
# configuration generator; per-application upstream and server blocks live in
# separate files that are included near the end of the http block.

user nginx;
worker_processes {{ worker_processes }};
error_log /var/log/nginx/error.log warn;
pid /var/run/nginx.pid;

events {
    worker_connections {{ worker_connections }};
    use epoll;
    multi_accept on;
}

http {
    # Basic settings
    include /etc/nginx/mime.types;
    default_type application/octet-stream;

    # Logging format (adds request time and upstream timings to the usual
    # combined fields)
    log_format main '$remote_addr - $remote_user [$time_local] "$request" '
                    '$status $body_bytes_sent "$http_referer" '
                    '"$http_user_agent" "$http_x_forwarded_for" '
                    'rt=$request_time uct="$upstream_connect_time" '
                    'uht="$upstream_header_time" urt="$upstream_response_time"';

    # Performance optimizations
    sendfile on;
    tcp_nopush on;
    tcp_nodelay on;
    keepalive_timeout {{ keepalive_timeout }};
    types_hash_max_size 2048;
    client_max_body_size {{ client_max_body_size }};

    # Proxy settings
    proxy_connect_timeout {{ proxy_connect_timeout }};
    proxy_send_timeout {{ proxy_send_timeout }};
    proxy_read_timeout {{ proxy_read_timeout }};
    proxy_buffer_size {{ proxy_buffer_size }};
    proxy_buffers {{ proxy_buffers }};
    # NOTE(review): the two sizes below are fixed while proxy_buffer_size and
    # proxy_buffers above are templated; keep them compatible when overriding
    # the buffer settings.
    proxy_busy_buffers_size 8k;
    proxy_temp_file_write_size 8k;

    # Compression
    gzip on;
    gzip_vary on;
    gzip_min_length 1024;
    gzip_proxied any;
    gzip_comp_level 6;
    # RSS feeds are served as application/rss+xml (not application/xml+rss).
    gzip_types
        text/plain
        text/css
        text/xml
        text/javascript
        application/json
        application/javascript
        application/rss+xml
        application/atom+xml
        image/svg+xml;

    # Rate limiting zones (keyed by client address)
    limit_req_zone $binary_remote_addr zone=api:10m rate=10r/s;
    limit_req_zone $binary_remote_addr zone=login:10m rate=1r/s;

    # Connection limiting
    limit_conn_zone $binary_remote_addr zone=conn_limit_per_ip:10m;

    # Include upstream configurations
    {% for app in applications %}
    include /etc/nginx/conf.d/{{ app }}_upstream.conf;
    {% endfor %}

    # Include server configurations
    {% for app in applications %}
    include /etc/nginx/conf.d/{{ app }}_server.conf;
    {% endfor %}

    # Default server (catch-all)
    include /etc/nginx/conf.d/default.conf;
}

Upstream Configuration Template (upstream.conf):

# Upstream configuration for {{ app_name }}
#
# One "server" line is rendered per entry in "servers". Round robin is the
# Nginx default, so no directive is emitted for it (or for any method name
# not listed below).
upstream {{ app_name }}_upstream {
    {% if lb_method == 'least_conn' %}
    least_conn;
    {% elif lb_method == 'ip_hash' %}
    ip_hash;
    {% elif lb_method == 'hash' %}
    hash $request_uri consistent;
    {% endif %}

    {# "backup" is rejected by Nginx together with ip_hash/hash, so it is only rendered for the other methods. #}
    {% for server in servers %}
    server {{ server.address }}{% if server.weight != 1 %} weight={{ server.weight }}{% endif %}{% if server.max_fails %} max_fails={{ server.max_fails }}{% endif %}{% if server.fail_timeout %} fail_timeout={{ server.fail_timeout }}{% endif %}{% if server.max_conns %} max_conns={{ server.max_conns }}{% endif %}{% if server.backup and lb_method not in ('ip_hash', 'hash') %} backup{% endif %};
    {% else %}
    # No healthy instances are available. An upstream without any server
    # line is a configuration error that would fail the whole reload, so a
    # permanently-down placeholder keeps the file valid until instances return.
    server 127.0.0.1:65535 down;
    {% endfor %}

    # Connection pooling
    keepalive {{ keepalive }};
    keepalive_requests {{ keepalive_requests }};
    keepalive_timeout {{ keepalive_timeout }};
}

Server Configuration Template (server.conf):

# Server configuration for {{ app_name }}
#
# One server block per application: optional TLS, a local health endpoint,
# a loopback-only status endpoint, any custom locations from the app spec,
# and a catch-all location that proxies to the application's upstream.
server {
    listen {{ listen_port }}{% if ssl_enabled %} ssl http2{% endif %};
    server_name {{ server_name }};

    {% if ssl_enabled %}
    # SSL configuration
    ssl_certificate {{ ssl_cert_path }};
    ssl_certificate_key {{ ssl_key_path }};
    ssl_protocols TLSv1.2 TLSv1.3;
    ssl_ciphers ECDHE-RSA-AES128-GCM-SHA256:ECDHE-RSA-AES256-GCM-SHA384;
    ssl_prefer_server_ciphers off;
    ssl_session_cache shared:SSL:10m;
    ssl_session_timeout 10m;
    {% endif %}

    # Logging
    access_log {{ access_log }} main;
    error_log {{ error_log }};

    # Basic settings
    client_max_body_size {{ client_max_body_size }};

    # Custom headers
    {% for header, value in custom_headers.items() %}
    add_header {{ header }} "{{ value }}" always;
    {% endfor %}

    # Health check endpoint (answered by Nginx itself, not proxied)
    location {{ health_check_path }} {
        access_log off;
        # default_type sets the response's Content-Type; add_header would
        # append a second Content-Type header instead of replacing it.
        default_type text/plain;
        return 200 "healthy\n";
    }

    # Status endpoint (loopback only)
    location {{ status_check_path }} {
        access_log off;
        stub_status on;
        allow 127.0.0.1;
        deny all;
    }

    # Custom location rules
    {% for location in location_rules %}
    location {{ location.path }} {
        {% for directive in location.directives %}
        {{ directive }};
        {% endfor %}
    }
    {% endfor %}

    # Main application proxy
    location / {
        # Rate limiting (zones are declared in the main configuration)
        limit_req zone=api burst=20 nodelay;
        limit_conn conn_limit_per_ip 20;

        # Proxy headers
        proxy_pass {{ proxy_pass }};
        proxy_http_version 1.1;
        proxy_set_header Upgrade $http_upgrade;
        # NOTE(review): "Connection: upgrade" is sent on every request, not
        # only WebSocket handshakes; confirm this does not defeat the
        # upstream keepalive pool.
        proxy_set_header Connection 'upgrade';
        proxy_set_header Host $host;
        proxy_set_header X-Real-IP $remote_addr;
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
        proxy_set_header X-Forwarded-Proto $scheme;
        proxy_set_header X-Forwarded-Host $server_name;
        proxy_set_header X-Forwarded-Port $server_port;

        # Timeouts
        proxy_connect_timeout {{ proxy_timeout }};
        proxy_send_timeout {{ proxy_timeout }};
        proxy_read_timeout {{ proxy_timeout }};

        # Buffering
        proxy_buffering on;
        proxy_buffer_size 4k;
        proxy_buffers 8 4k;

        # Cache control
        proxy_cache_bypass $http_upgrade;
        proxy_no_cache $http_upgrade;
    }
}

Upstream Management

Dynamic Upstream Updates

class UpstreamManager:
    """Manages Nginx upstream configurations dynamically.

    Each application has its own ``<app_name>_upstream.conf`` file inside
    ``nginx_config_dir``. Every method that rewrites such a file holds
    ``upstream_lock`` so that concurrent updates cannot interleave, and keeps
    ``active_upstreams`` in step with what was written.
    """

    def __init__(self, nginx_config_dir: str = "/etc/nginx/conf.d"):
        self.config_dir = nginx_config_dir
        self.config_generator = NginxConfigGenerator({})
        # app name -> "ip:port" addresses most recently written for it
        self.active_upstreams: Dict[str, List[str]] = {}
        self.upstream_lock = asyncio.Lock()

    def _config_path(self, app_name: str) -> str:
        """Return the path of the upstream file for ``app_name``."""
        return os.path.join(self.config_dir, f"{app_name}_upstream.conf")

    @staticmethod
    def _parse_server_line(line: str):
        """Parse one ``server ...;`` line into a dict, or return ``None``.

        The trailing ``;`` (and any trailing ``#`` comment) is removed before
        tokenising, so the last parameter on the line is recognised too.
        Parameters that are absent get Nginx's own defaults.

        Raises:
            ValueError: If a numeric parameter does not hold an integer.
        """
        body = line.split('#', 1)[0].strip().rstrip(';')
        tokens = body.split()
        if len(tokens) < 2 or tokens[0] != 'server':
            return None

        server_info = {
            'address': tokens[1],
            'weight': 1,
            'max_fails': 1,         # Nginx default when not specified
            'fail_timeout': '10s',  # Nginx default when not specified
            'max_conns': 0,         # 0 means "no limit" in Nginx
            'backup': False,
            'down': False
        }

        for token in tokens[2:]:
            key, has_value, value = token.partition('=')
            if has_value and key in ('weight', 'max_fails', 'max_conns'):
                server_info[key] = int(value)
            elif has_value and key == 'fail_timeout':
                server_info[key] = value
            elif token in ('backup', 'down'):
                server_info[token] = True

        return server_info

    @staticmethod
    def _format_server_line(server_address: str, weight: int = 1, max_fails: int = 3) -> str:
        """Render a ``server`` line, omitting parameters equal to Nginx's defaults.

        Nginx defaults both ``weight`` and ``max_fails`` to 1, so any other
        value has to be written out to take effect.
        """
        server_line = f"    server {server_address}"
        if weight != 1:
            server_line += f" weight={weight}"
        if max_fails != 1:
            server_line += f" max_fails={max_fails}"
        return server_line + ";"

    @staticmethod
    def _drop_server(lines: List[str], server_address: str) -> List[str]:
        """Return ``lines`` without the server line whose address is exactly ``server_address``."""
        kept = []
        for line in lines:
            parsed = None
            if line.strip().startswith('server'):
                try:
                    parsed = UpstreamManager._parse_server_line(line)
                except ValueError:
                    # Unparseable parameters: leave the line untouched.
                    parsed = None
            if parsed is not None and parsed['address'] == server_address:
                continue
            kept.append(line)
        return kept

    async def update_upstream(self, app_name: str, instances: "List[InstanceRecord]",
                            lb_method: str = 'round_robin') -> bool:
        """Update upstream configuration for an application.

        Renders the upstream file from ``instances``, writes it, and logs
        which server addresses were added or removed since the last write.

        Returns:
            True on success, False if rendering or writing failed (the error
            is logged, not raised).
        """
        async with self.upstream_lock:
            try:
                config_content = await self.config_generator.generate_upstream_config(
                    app_name, instances, lb_method
                )

                async with aiofiles.open(self._config_path(app_name), 'w') as f:
                    await f.write(config_content)

                # Same health filter the generator applies when rendering.
                current_servers = [
                    f"{instance.ip}:{instance.port}" for instance in instances
                    if instance.status == 'running' and instance.health_status == 'healthy'
                ]

                previous = set(self.active_upstreams.get(app_name, []))
                current = set(current_servers)
                added_servers = current - previous
                removed_servers = previous - current

                if added_servers:
                    logger.info(f"Added servers to {app_name}: {added_servers}")
                if removed_servers:
                    logger.info(f"Removed servers from {app_name}: {removed_servers}")

                self.active_upstreams[app_name] = current_servers
                return True

            except Exception as e:
                logger.error(f"Failed to update upstream for {app_name}: {e}")
                return False

    async def add_server_to_upstream(self, app_name: str, server_address: str,
                                   weight: int = 1, max_fails: int = 3) -> bool:
        """Add a single server to an existing upstream.

        The new ``server`` line is inserted just before the closing brace of
        the application's upstream block. Adding an address that is already
        present is a no-op.

        Returns:
            True if the server is present afterwards; False if the file or
            the upstream block is missing, or on any I/O error (logged).
        """
        async with self.upstream_lock:
            try:
                config_file = self._config_path(app_name)

                if not os.path.exists(config_file):
                    logger.warning(f"Upstream config for {app_name} does not exist")
                    return False

                async with aiofiles.open(config_file, 'r') as f:
                    content = await f.read()

                lines = content.split('\n')

                # Locate the closing brace of this application's upstream block.
                block_header = f'upstream {app_name}_upstream'
                inside_upstream = False
                closing_index = None
                already_present = False
                for index, line in enumerate(lines):
                    stripped = line.strip()
                    if not inside_upstream:
                        if block_header in line and not stripped.startswith('#'):
                            inside_upstream = True
                        continue
                    if stripped == '}':
                        closing_index = index
                        break
                    if stripped.startswith('server'):
                        parsed = self._parse_server_line(line)
                        if parsed is not None and parsed['address'] == server_address:
                            already_present = True

                if closing_index is None:
                    logger.error(f"Failed to add server to {app_name}: upstream block not found")
                    return False

                if not already_present:
                    lines.insert(
                        closing_index,
                        self._format_server_line(server_address, weight, max_fails)
                    )
                    async with aiofiles.open(config_file, 'w') as f:
                        await f.write('\n'.join(lines))

                tracked = self.active_upstreams.setdefault(app_name, [])
                if server_address not in tracked:
                    tracked.append(server_address)

                return True

            except Exception as e:
                logger.error(f"Failed to add server to {app_name}: {e}")
                return False

    async def remove_server_from_upstream(self, app_name: str, server_address: str) -> bool:
        """Remove a server from an existing upstream.

        Only the line whose address equals ``server_address`` exactly is
        removed, so ``10.0.0.1:80`` does not also remove ``10.0.0.1:8080``.

        Returns:
            True if the file exists and no longer lists the server (including
            when it was not listed to begin with); False if the file is
            missing or on any I/O error (logged).
        """
        async with self.upstream_lock:
            try:
                config_file = self._config_path(app_name)

                if not os.path.exists(config_file):
                    return False

                async with aiofiles.open(config_file, 'r') as f:
                    content = await f.read()

                lines = content.split('\n')
                filtered_lines = self._drop_server(lines, server_address)

                if len(filtered_lines) != len(lines):
                    async with aiofiles.open(config_file, 'w') as f:
                        await f.write('\n'.join(filtered_lines))

                tracked = self.active_upstreams.get(app_name)
                if tracked and server_address in tracked:
                    tracked.remove(server_address)

                return True

            except Exception as e:
                logger.error(f"Failed to remove server from {app_name}: {e}")
                return False

    async def get_upstream_status(self, app_name: str) -> Dict[str, Any]:
        """Get current upstream status and server information.

        Returns:
            The status dict, or an empty dict if anything went wrong (the
            error is logged).
        """
        try:
            # Use Nginx Plus API if available, otherwise parse config
            # NOTE(review): _has_nginx_plus_api and
            # _get_upstream_status_from_api are not defined in this class as
            # shown; until they exist this call fails and {} is returned.
            if self._has_nginx_plus_api():
                return await self._get_upstream_status_from_api(app_name)
            else:
                return await self._get_upstream_status_from_config(app_name)

        except Exception as e:
            logger.error(f"Failed to get upstream status for {app_name}: {e}")
            return {}

    async def _get_upstream_status_from_config(self, app_name: str) -> Dict[str, Any]:
        """Get upstream status by parsing configuration file.

        Returns:
            A dict with the upstream name, the parsed servers and counts of
            total / not-down / backup servers, or ``{'error': ...}`` if the
            file does not exist.
        """
        config_file = self._config_path(app_name)

        if not os.path.exists(config_file):
            return {'error': 'Upstream configuration not found'}

        async with aiofiles.open(config_file, 'r') as f:
            content = await f.read()

        servers = []
        for line in content.split('\n'):
            if not line.strip().startswith('server'):
                continue
            server_info = self._parse_server_line(line)
            if server_info is not None:
                servers.append(server_info)

        return {
            'upstream': f"{app_name}_upstream",
            'servers': servers,
            'total_servers': len(servers),
            'active_servers': sum(1 for s in servers if not s['down']),
            'backup_servers': sum(1 for s in servers if s['backup'])
        }

Load Balancing Algorithms

class LoadBalancingAlgorithm(Enum):
    """Available load balancing algorithms.

    Each value is the method name used elsewhere in the configuration.
    """
    ROUND_ROBIN = "round_robin"
    LEAST_CONNECTIONS = "least_conn"
    IP_HASH = "ip_hash"
    HASH = "hash"
    WEIGHTED_ROUND_ROBIN = "weighted_round_robin"
    LEAST_TIME = "least_time"

class LoadBalancingStrategy:
    """Determines optimal load balancing strategy for applications."""

    @staticmethod
    def recommend_algorithm(app_spec: Dict[str, Any],
                          performance_metrics: Dict[str, Any]) -> LoadBalancingAlgorithm:
        """Recommend load balancing algorithm based on application characteristics.

        The checks run in priority order and the first match wins:
        session affinity, stateful, high request rate, high response-time
        variance, then round robin as the fallback.

        Args:
            app_spec: Application spec; only ``spec.session_affinity`` and
                ``spec.stateful`` are read, both optional.
            performance_metrics: Optional ``avg_rps`` and
                ``response_time_variance`` values.

        Returns:
            The recommended algorithm. Note that ``LEAST_TIME`` maps to an
            Nginx Plus directive (see ``get_nginx_directive``).
        """
        spec = app_spec.get('spec', {})

        # Session affinity takes precedence over everything else.
        if spec.get('session_affinity', False):
            return LoadBalancingAlgorithm.IP_HASH

        # For applications with stateful operations
        if spec.get('stateful', False):
            return LoadBalancingAlgorithm.HASH

        # For high-throughput APIs
        if performance_metrics.get('avg_rps', 0) > 1000:
            return LoadBalancingAlgorithm.LEAST_CONNECTIONS

        # For applications with varying response times
        if performance_metrics.get('response_time_variance', 0) > 100:
            return LoadBalancingAlgorithm.LEAST_TIME

        return LoadBalancingAlgorithm.ROUND_ROBIN

    @staticmethod
    def get_nginx_directive(algorithm: LoadBalancingAlgorithm,
                          params: Dict[str, Any] = None) -> str:
        """Get Nginx directive for load balancing algorithm.

        Args:
            algorithm: The algorithm to translate.
            params: Optional settings for ``HASH``: ``hash_key`` (default
                ``$request_uri``) and ``consistent`` (default True).

        Returns:
            The directive including its trailing semicolon, or an empty
            string when no directive is needed (round robin and weighted
            round robin use Nginx's default behaviour).
        """
        params = params or {}

        if algorithm == LoadBalancingAlgorithm.LEAST_CONNECTIONS:
            return "least_conn;"
        if algorithm == LoadBalancingAlgorithm.IP_HASH:
            return "ip_hash;"
        if algorithm == LoadBalancingAlgorithm.HASH:
            # Join only the parts that are present so that disabling
            # "consistent" does not leave a stray space before the semicolon.
            parts = ["hash", params.get('hash_key', '$request_uri')]
            if params.get('consistent', True):
                parts.append("consistent")
            return " ".join(parts) + ";"
        if algorithm == LoadBalancingAlgorithm.LEAST_TIME:
            # Nginx Plus feature
            return "least_time header;"
        return ""  # Round robin is default, no directive needed

SSL/TLS Management

Certificate Management

class SSLManager:
    """Manages SSL certificates for applications.

    Certificates are stored as ``<cert_dir>/certs/<app_name>.crt`` with the
    matching key at ``<cert_dir>/private/<app_name>.key``.
    """

    def __init__(self, cert_dir: str = "/etc/ssl/orchestry", config: Dict[str, Any] = None):
        """
        Args:
            cert_dir: Root directory for certificates and keys.
            config: Optional settings. ``ssl_provider`` selects
                ``'letsencrypt'`` (anything else means self-signed) and
                ``ssl_email`` is the contact address passed to certbot.
        """
        self.cert_dir = cert_dir
        # The provisioning methods read self.config, so it must always exist.
        self.config = config if config is not None else {}
        self.ca_client = None  # ACME client for Let's Encrypt

    async def initialize(self):
        """Initialize SSL management by creating the certificate directories."""
        os.makedirs(self.cert_dir, exist_ok=True)
        os.makedirs(f"{self.cert_dir}/private", mode=0o700, exist_ok=True)
        os.makedirs(f"{self.cert_dir}/certs", exist_ok=True)

    async def provision_certificate(self, domain: str, app_name: str) -> bool:
        """Provision SSL certificate for a domain.

        Does nothing if a certificate for ``app_name`` already exists and is
        not about to expire. Otherwise obtains one from Let's Encrypt when
        ``ssl_provider`` is ``'letsencrypt'``, or generates a self-signed one.

        Returns:
            True if a usable certificate is in place afterwards, else False
            (failures are logged, not raised).
        """
        try:
            cert_path = f"{self.cert_dir}/certs/{app_name}.crt"
            key_path = f"{self.cert_dir}/private/{app_name}.key"

            if await self._is_certificate_valid(cert_path, domain):
                logger.info(f"Valid certificate already exists for {domain}")
                return True

            if self.config.get('ssl_provider') == 'letsencrypt':
                return await self._provision_letsencrypt_certificate(domain, cert_path, key_path)

            # Generate self-signed certificate for development
            return await self._generate_self_signed_certificate(domain, cert_path, key_path)

        except Exception as e:
            logger.error(f"Failed to provision certificate for {domain}: {e}")
            return False

    async def _provision_letsencrypt_certificate(self, domain: str, cert_path: str, key_path: str) -> bool:
        """Provision certificate from Let's Encrypt by running certbot.

        Returns:
            True if certbot exited with status 0, else False (stderr is logged).
        """
        try:
            # NOTE(review): confirm that "certbot certonly" honours
            # --cert-path/--key-path for the files it writes; if it does not,
            # the issued files must be copied or linked to cert_path/key_path.
            cmd = [
                'certbot', 'certonly',
                '--webroot',
                '--webroot-path', '/var/www/html',
                '--email', self.config.get('ssl_email', 'admin@example.com'),
                '--agree-tos',
                '--non-interactive',
                '--domain', domain,
                '--cert-path', cert_path,
                '--key-path', key_path
            ]

            process = await asyncio.create_subprocess_exec(
                *cmd,
                stdout=asyncio.subprocess.PIPE,
                stderr=asyncio.subprocess.PIPE
            )

            _stdout, stderr = await process.communicate()

            if process.returncode == 0:
                logger.info(f"Successfully provisioned Let's Encrypt certificate for {domain}")
                return True

            logger.error(f"Certbot failed: {stderr.decode()}")
            return False

        except Exception as e:
            logger.error(f"Let's Encrypt provisioning failed: {e}")
            return False

    async def _generate_self_signed_certificate(self, domain: str, cert_path: str, key_path: str) -> bool:
        """Generate self-signed certificate for development.

        Runs ``openssl genrsa`` and then ``openssl req -x509``; the second
        step only starts after the first has finished successfully, because
        it reads the key file the first one writes.
        """
        try:
            key_cmd = [
                'openssl', 'genrsa',
                '-out', key_path,
                '2048'
            ]

            key_process = await asyncio.create_subprocess_exec(*key_cmd)
            await key_process.communicate()
            if key_process.returncode != 0:
                logger.error(f"Private key generation failed for {domain}")
                return False

            cert_cmd = [
                'openssl', 'req',
                '-new', '-x509',
                '-key', key_path,
                '-out', cert_path,
                '-days', '365',
                '-subj', f'/CN={domain}'
            ]

            cert_process = await asyncio.create_subprocess_exec(*cert_cmd)
            await cert_process.communicate()

            if cert_process.returncode == 0:
                logger.info(f"Generated self-signed certificate for {domain}")
                return True

            logger.error(f"Self-signed certificate generation failed for {domain}")
            return False

        except Exception as e:
            logger.error(f"Self-signed certificate generation failed: {e}")
            return False

    @staticmethod
    def _parse_not_after(openssl_output: str):
        """Return the ``notAfter`` timestamp from ``openssl x509 -dates`` output.

        The result is a naive datetime in the timezone openssl printed
        (GMT), or ``None`` if the output contains no ``notAfter=`` line.
        """
        for line in openssl_output.splitlines():
            line = line.strip()
            if line.startswith('notAfter='):
                expire_str = line.split('=', 1)[1].strip()
                return datetime.strptime(expire_str, '%b %d %H:%M:%S %Y %Z')
        return None

    @staticmethod
    def _extract_common_name(subject_line: str):
        """Return the CN from ``openssl x509 -subject`` output, or ``None``.

        Handles both the comma-separated (``CN = name``) and the
        slash-separated (``/CN=name``) subject formats.
        """
        import re

        match = re.search(r'(?<![A-Za-z])CN\s*=\s*([^,/\n]+)', subject_line)
        return match.group(1).strip() if match else None

    async def _get_certificate_domain(self, cert_path: str) -> str:
        """Return the common name of the certificate at ``cert_path``.

        Raises:
            ValueError: If openssl fails or the subject has no CN.
        """
        process = await asyncio.create_subprocess_exec(
            'openssl', 'x509', '-in', cert_path, '-noout', '-subject',
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE
        )
        stdout, _stderr = await process.communicate()

        domain = None
        if process.returncode == 0:
            domain = self._extract_common_name(stdout.decode())
        if not domain:
            raise ValueError(f"Could not determine domain for {cert_path}")
        return domain

    async def _is_certificate_valid(self, cert_path: str, domain: str) -> bool:
        """Check if certificate exists and is valid.

        "Valid" means the file exists, openssl can read it, and it does not
        expire within the next 30 days. ``domain`` is accepted for interface
        symmetry but is not checked against the certificate.
        """
        from datetime import timezone

        if not os.path.exists(cert_path):
            return False

        try:
            cmd = [
                'openssl', 'x509',
                '-in', cert_path,
                '-noout',
                '-dates'
            ]

            process = await asyncio.create_subprocess_exec(
                *cmd,
                stdout=asyncio.subprocess.PIPE,
                stderr=asyncio.subprocess.PIPE
            )

            stdout, _stderr = await process.communicate()

            if process.returncode != 0:
                return False

            expire_date = self._parse_not_after(stdout.decode())
            if expire_date is None:
                # No expiry information: do not claim the certificate is fine.
                return False

            # openssl reports GMT, so compare against the current UTC time
            # (made naive to match the parsed value).
            now_utc = datetime.now(timezone.utc).replace(tzinfo=None)
            return expire_date - now_utc >= timedelta(days=30)

        except Exception as e:
            logger.error(f"Certificate validation failed: {e}")
            return False

    async def renew_certificates(self) -> Dict[str, bool]:
        """Renew expiring certificates.

        Walks every ``.crt`` file in the certificate directory and
        re-provisions those that are missing, unreadable or expiring.

        Returns:
            A mapping of domain to outcome (True = still valid or renewed).
            If a certificate could not be processed at all, it is keyed by
            application name instead and mapped to False.
        """
        renewal_results = {}

        cert_dir = f"{self.cert_dir}/certs"
        if not os.path.exists(cert_dir):
            return renewal_results

        for cert_file in os.listdir(cert_dir):
            if not cert_file.endswith('.crt'):
                continue

            app_name = cert_file[:-4]  # Remove .crt extension
            cert_path = os.path.join(cert_dir, cert_file)

            try:
                domain = await self._get_certificate_domain(cert_path)

                if await self._is_certificate_valid(cert_path, domain):
                    renewal_results[domain] = True  # Still valid
                    continue

                logger.info(f"Renewing certificate for {domain}")
                renewal_results[domain] = await self.provision_certificate(domain, app_name)

            except Exception as e:
                logger.error(f"Certificate renewal failed for {app_name}: {e}")
                renewal_results[app_name] = False

        return renewal_results

Traffic Routing and Rules

Advanced Routing Configuration

class TrafficRouter:
    """Manages advanced traffic routing rules.

    Rules are stored per application in ``routing_rules``. Every change
    re-renders that application's rules as Nginx ``location`` blocks and hands
    them to ``_inject_location_blocks``.

    NOTE(review): ``_inject_location_blocks`` is not defined in this class as
    shown; it has to be provided elsewhere (subclass or mixin) — confirm.
    """

    def __init__(self):
        # Application name -> rules, in the order they were added.
        self.routing_rules: Dict[str, List["RoutingRule"]] = {}

    async def add_routing_rule(self, app_name: str, rule: "RoutingRule"):
        """Register ``rule`` for ``app_name`` and re-render its Nginx routing."""
        self.routing_rules.setdefault(app_name, []).append(rule)
        await self._update_nginx_routing(app_name)

    async def remove_routing_rule(self, app_name: str, rule_id: str):
        """Drop every rule of ``app_name`` whose ``id`` equals ``rule_id``.

        Unknown applications are ignored. For a known application the routing
        is re-rendered even when no rule matched.
        """
        if app_name not in self.routing_rules:
            return

        self.routing_rules[app_name] = [
            rule for rule in self.routing_rules[app_name]
            if rule.id != rule_id
        ]
        await self._update_nginx_routing(app_name)

    async def _update_nginx_routing(self, app_name: str):
        """Render the application's enabled rules and push them to Nginx."""
        # Rules flagged ``enabled=False`` stay registered but are not rendered.
        location_blocks = [
            self._generate_location_block(rule)
            for rule in self.routing_rules.get(app_name, [])
            if rule.enabled
        ]

        await self._inject_location_blocks(app_name, location_blocks)

    def _generate_location_block(self, rule: "RoutingRule") -> str:
        """Return the Nginx ``location`` block text for ``rule``.

        The block consists of, in order: header checks, the optional
        weight-based split, the optional rate limit, response headers, the
        ``proxy_pass`` target and the standard forwarding headers. Each
        directive is indented by four spaces and the text ends with a newline.
        """
        if rule.type == RoutingType.REGEX:
            location = f'location ~ {rule.pattern}'
        elif rule.type == RoutingType.EXACT:
            location = f'location = {rule.path}'
        else:
            # PATH, PREFIX and anything unrecognised use a plain prefix match.
            location = f'location {rule.path}'

        directives = []

        # Header-based routing: requests that fail the check get a 404.
        if rule.headers:
            for header, value in rule.headers.items():
                # Nginx exposes request headers as $http_<lowercased_name>
                # with dashes turned into underscores.
                variable = f'$http_{header.lower().replace("-", "_")}'
                if rule.header_match_type == HeaderMatchType.EXACT:
                    directives.append(f'if ({variable} != "{value}") {{ return 404; }}')
                elif rule.header_match_type == HeaderMatchType.REGEX:
                    directives.append(f'if ({variable} !~ "{value}") {{ return 404; }}')
                elif rule.header_match_type == HeaderMatchType.EXISTS:
                    # Only presence matters; the configured value is ignored.
                    directives.append(f'if ({variable} = "") {{ return 404; }}')

        # Weight-based routing (A/B testing).
        # NOTE(review): the Nginx documentation lists `split_clients` as valid
        # in the `http` context only, so this part likely has to be hoisted
        # out of the location block by whoever assembles the final config.
        if rule.weight_percentage and rule.weight_percentage < 100:
            directives.append('split_clients $request_id $variant {')
            directives.append(f'    {rule.weight_percentage}% "primary";')
            directives.append('    * "secondary";')
            directives.append('}')
            directives.append(f'if ($variant = "secondary") {{ proxy_pass {rule.secondary_upstream}; }}')

        # Rate limiting. The directive must be terminated with ';' and carries
        # `nodelay` when the configuration asks for it.
        if rule.rate_limit:
            limit_req = f'limit_req zone={rule.rate_limit.zone} burst={rule.rate_limit.burst}'
            if rule.rate_limit.nodelay:
                limit_req += ' nodelay'
            directives.append(f'{limit_req};')

        # Custom response headers.
        if rule.response_headers:
            for header, value in rule.response_headers.items():
                directives.append(f'add_header {header} "{value}" always;')

        # Proxy target: explicit upstream, else the application's default one.
        if rule.upstream:
            directives.append(f'proxy_pass {rule.upstream};')
        else:
            directives.append(f'proxy_pass http://{rule.app_name}_upstream;')

        # Standard forwarding headers.
        directives.extend([
            'proxy_set_header Host $host;',
            'proxy_set_header X-Real-IP $remote_addr;',
            'proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;',
            'proxy_set_header X-Forwarded-Proto $scheme;'
        ])

        lines = [f'{location} {{']
        lines.extend(f'    {directive}' for directive in directives)
        lines.append('}')
        return '\n'.join(lines) + '\n'

class RoutingType(Enum):
    """Types of routing rules (how the Nginx location is matched)."""
    PATH = "path"      # rendered as `location <path>`
    REGEX = "regex"    # rendered as `location ~ <pattern>`
    EXACT = "exact"    # rendered as `location = <path>`
    PREFIX = "prefix"  # rendered the same way as PATH by the router


class HeaderMatchType(Enum):
    """Header matching types."""
    EXACT = "exact"    # header value must equal the configured value
    REGEX = "regex"    # header value must match the configured regex
    EXISTS = "exists"  # presumably: header only has to be present — confirm


@dataclass
class RateLimitConfig:
    """Rate limiting configuration."""
    zone: str             # name used for `limit_req zone=<zone>`
    burst: int = 10       # value used for `limit_req ... burst=<burst>`
    nodelay: bool = True  # presumably maps to limit_req's `nodelay` — confirm


# Defined after the enums and RateLimitConfig: its annotations and the
# `header_match_type` default are evaluated when the class is created, so
# those names must already exist.
@dataclass
class RoutingRule:
    """Traffic routing rule configuration."""
    id: str                    # compared against `rule_id` when removing a rule
    app_name: str              # used to build the default upstream name
    type: RoutingType          # selects the form of the location line
    path: Optional[str] = None     # location path for non-regex rules
    pattern: Optional[str] = None  # regex used when type is REGEX
    # Request header name -> expected value; non-matching requests get a 404.
    headers: Optional[Dict[str, str]] = None
    header_match_type: HeaderMatchType = HeaderMatchType.EXACT
    # Share (percent) of requests labelled "primary"; the split is only
    # rendered when this is set and below 100.
    weight_percentage: Optional[int] = None
    secondary_upstream: Optional[str] = None  # proxy target for "secondary"
    rate_limit: Optional[RateLimitConfig] = None
    # Response header name -> value, rendered as `add_header ... always`.
    response_headers: Optional[Dict[str, str]] = None
    # Explicit proxy_pass target; falls back to http://<app_name>_upstream.
    upstream: Optional[str] = None
    priority: int = 100   # presumably orders rules — TODO confirm with consumer
    enabled: bool = True  # presumably lets a rule be switched off — confirm

Monitoring and Metrics

Load Balancer Metrics

class LoadBalancerMetrics:
    """Collects and reports load balancer metrics.

    NOTE(review): ``_get_connection_metrics`` and ``_get_request_metrics`` are
    called by ``collect_nginx_metrics`` but are not defined in this class as
    shown; they have to be provided elsewhere — confirm.
    """

    # Endpoint queried for the basic Nginx status text; override per instance
    # via the constructor when Nginx is not reachable on localhost.
    status_url = 'http://localhost/nginx_status'

    def __init__(self, active_applications: Optional[List[str]] = None,
                 status_url: Optional[str] = None):
        """Create a collector.

        Args:
            active_applications: Names of the applications whose upstream
                metrics are collected. Defaults to no applications.
            status_url: Status endpoint to query instead of the class default.
        """
        self.active_applications: List[str] = list(active_applications or [])
        if status_url is not None:
            self.status_url = status_url

    async def collect_nginx_metrics(self) -> Dict[str, Any]:
        """Collect metrics from Nginx.

        Returns:
            The basic status fields at the top level, one
            ``upstream_<app_name>`` entry per active application, plus the
            ``connections`` and ``requests`` entries.
        """
        metrics: Dict[str, Any] = {}

        # Basic Nginx metrics (empty when the status endpoint is unavailable).
        metrics.update(await self._get_nginx_status())

        # Upstream metrics
        for app_name in self.active_applications:
            metrics[f"upstream_{app_name}"] = await self._get_upstream_metrics(app_name)

        # Connection metrics
        metrics["connections"] = await self._get_connection_metrics()

        # Request metrics
        metrics["requests"] = await self._get_request_metrics()

        return metrics

    async def _get_nginx_status(self) -> Dict[str, Any]:
        """Fetch and parse the basic Nginx status metrics.

        Returns:
            The parsed status fields, or an empty dict when the request fails,
            the server answers with an error status, or the body cannot be
            parsed (the failure is logged).
        """
        try:
            async with aiohttp.ClientSession() as session:
                async with session.get(self.status_url) as response:
                    # Do not try to parse an error page as status output.
                    response.raise_for_status()
                    text = await response.text()

            return self._parse_stub_status(text)

        except Exception as e:
            # Best effort: metrics collection must not fail the caller.
            logger.error(f"Failed to get Nginx status: {e}")
            return {}

    @staticmethod
    def _parse_stub_status(text: str) -> Dict[str, Any]:
        """Parse the four-line Nginx status text into a metrics dict.

        Expected layout::

            Active connections: 291
            server accepts handled requests
             16630948 16630948 31070465
            Reading: 6 Writing: 179 Waiting: 106

        Raises:
            ValueError: If the text does not have that layout.
            IndexError: If a line is missing expected fields.
        """
        lines = text.strip().splitlines()
        if len(lines) < 4:
            raise ValueError(
                f"unexpected Nginx status output: expected 4 lines, got {len(lines)}"
            )

        # Active connections
        active_connections = int(lines[0].split(':')[1].strip())

        # Server statistics
        server_stats = lines[2].split()
        accepts = int(server_stats[0])
        handled = int(server_stats[1])
        requests = int(server_stats[2])

        # "Reading: R Writing: W Waiting: Q" -> values sit at odd positions.
        conn_stats = lines[3].split()
        reading = int(conn_stats[1])
        writing = int(conn_stats[3])
        waiting = int(conn_stats[5])

        return {
            'active_connections': active_connections,
            'total_accepts': accepts,
            'total_handled': handled,
            'total_requests': requests,
            'reading': reading,
            'writing': writing,
            'waiting': waiting,
            # Guard against division by zero right after a restart.
            'requests_per_connection': requests / handled if handled > 0 else 0
        }

    async def _get_upstream_metrics(self, app_name: str) -> Dict[str, Any]:
        """Get metrics for a specific upstream.

        Currently a placeholder: returns the metric structure with zeroed
        values regardless of ``app_name``.
        """
        # This would typically use Nginx Plus API or parse access logs
        # For now, return mock data structure
        return {
            'total_servers': 0,
            'active_servers': 0,
            'requests_per_second': 0,
            'response_time_avg': 0,
            'response_time_p95': 0,
            'error_rate': 0,
            'server_stats': []
        }

Next Steps: Learn about Development Setup and contribution guidelines.