Database Schema

Complete documentation of Orchestry's database schema, data models, and persistence layer.

Overview

Orchestry uses PostgreSQL as its primary datastore with an optional read replica for scalability. The database schema is designed for:

  • High Performance: Optimized queries with proper indexing
  • Scalability: Support for large numbers of applications and metrics
  • Consistency: ACID transactions for critical operations
  • Auditability: Complete event trail for all operations
  • Extensibility: Schema designed for future enhancements

Database Architecture

Primary-Replica Setup

┌─────────────────────────┐    ┌─────────────────────────┐
│    Primary Database     │    │    Replica Database     │
│                         │    │                         │
│  • Read/Write           │───►│  • Read Only            │
│  • Real-time data       │    │  • Analytics queries    │
│  • Critical operations  │    │  • Reporting            │
│  • Schema changes       │    │  • Backup source        │
└─────────────────────────┘    └─────────────────────────┘

Connection Management

class DatabaseManager:
    """Database connection and query management."""

    def __init__(self, config):
        self.primary_pool = None    # Read/write operations
        self.replica_pool = None    # Read-only operations
        self.config = config

    def get_write_connection(self):
        """Acquire a connection for write operations.

        Returns the pool's acquire context, so callers can use
        `async with db.get_write_connection() as conn:` (auto-release)
        or `conn = await db.get_write_connection()` (manual release).
        """
        return self.primary_pool.acquire()

    def get_read_connection(self):
        """Acquire a connection for read operations, preferring the replica."""
        if self.replica_pool and self.config.get('prefer_replica'):
            return self.replica_pool.acquire()
        return self.primary_pool.acquire()
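
A minimal usage sketch, assuming the pools have already been created (see the DatabasePool class later in this document); the acquire context releases the connection automatically:

# Hypothetical helper, not part of the Orchestry API: touch an
# application's updated_at timestamp using a pooled write connection.
async def touch_app(db: DatabaseManager, app_name: str) -> None:
    async with db.get_write_connection() as conn:
        await conn.execute(
            "UPDATE applications SET updated_at = NOW() WHERE name = $1",
            app_name,
        )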

Core Tables

Applications Table

Stores application specifications and metadata.

CREATE TABLE applications (
    -- Primary identification
    name VARCHAR(253) PRIMARY KEY,              -- DNS-compatible app name

    -- Application specification
    spec JSONB NOT NULL,                        -- Complete app specification
    status VARCHAR(50) NOT NULL DEFAULT 'registered', -- Current status

    -- Timestamps
    created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
    updated_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),

    -- Scaling information
    replicas INTEGER DEFAULT 0,                 -- Current replica count
    desired_replicas INTEGER DEFAULT 0,         -- Desired replica count
    last_scaled_at TIMESTAMP WITH TIME ZONE,    -- Last scaling operation

    -- Configuration
    mode VARCHAR(20) DEFAULT 'auto',            -- Scaling mode (auto/manual)

    -- Constraints
    CONSTRAINT valid_status CHECK (status IN (
        'registered', 'starting', 'running', 'stopping', 'stopped', 'error', 'updating'
    )),
    CONSTRAINT valid_mode CHECK (mode IN ('auto', 'manual')),
    CONSTRAINT valid_replicas CHECK (replicas >= 0),
    CONSTRAINT valid_desired_replicas CHECK (desired_replicas >= 0)
);

-- Indexes for performance
CREATE INDEX idx_applications_status ON applications(status);
CREATE INDEX idx_applications_mode ON applications(mode);
CREATE INDEX idx_applications_updated_at ON applications(updated_at DESC);
-- Expression index for image lookups. Note: ->> returns text, so a btree
-- expression index applies; GIN has no default operator class for scalar text.
CREATE INDEX idx_applications_spec_image ON applications ((spec->'spec'->>'image'));
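
The expression index above supports equality lookups on the container image. A short sketch of the kind of query it serves (the helper name is illustrative):

# Sketch: find applications running a given image; the WHERE expression
# matches idx_applications_spec_image, so this can use an index scan.
async def apps_by_image(conn, image: str) -> list:
    rows = await conn.fetch(
        "SELECT name, status FROM applications "
        "WHERE spec->'spec'->>'image' = $1",
        image,
    )
    return [dict(r) for r in rows]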

Application Status Values:

Status      Description
----------  ------------------------------------------
registered  Application spec stored, not yet started
starting    Containers being created
running     Application running normally
stopping    Graceful shutdown in progress
stopped     Application stopped
error       Error state, manual intervention needed
updating    Configuration or image update in progress
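
The controller moves applications through these states; below is a minimal sketch of guarded status transitions (the transition map is an illustrative assumption, not Orchestry's authoritative state machine):

# Illustrative transition map; the real controller may permit more edges.
ALLOWED_TRANSITIONS = {
    'registered': {'starting'},
    'starting':   {'running', 'error'},
    'running':    {'stopping', 'updating', 'error'},
    'updating':   {'running', 'error'},
    'stopping':   {'stopped', 'error'},
    'stopped':    {'starting'},
    'error':      {'starting', 'stopped'},
}

async def set_app_status(conn, app_name: str, new_status: str) -> bool:
    """Apply a status change if it is a legal transition.

    Must run inside a transaction (FOR UPDATE locks the row).
    """
    row = await conn.fetchrow(
        "SELECT status FROM applications WHERE name = $1 FOR UPDATE",
        app_name,
    )
    if row is None or new_status not in ALLOWED_TRANSITIONS[row['status']]:
        return False
    await conn.execute(
        "UPDATE applications SET status = $2, updated_at = NOW() WHERE name = $1",
        app_name, new_status,
    )
    return True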

Instances Table

Tracks individual container instances for each application.

CREATE TABLE instances (
    -- Primary key
    id SERIAL PRIMARY KEY,

    -- Application relationship
    app_name VARCHAR(253) NOT NULL REFERENCES applications(name) ON DELETE CASCADE,

    -- Container identification
    container_id VARCHAR(128) UNIQUE NOT NULL,  -- Docker container ID
    container_name VARCHAR(253) NOT NULL,       -- Human-readable container name
    replica_index INTEGER NOT NULL,             -- Replica number (0, 1, 2, ...)

    -- Network configuration
    ip INET NOT NULL,                           -- Container IP address
    port INTEGER NOT NULL,                      -- Primary application port

    -- Status information
    status VARCHAR(50) NOT NULL DEFAULT 'starting',
    health_status VARCHAR(20) DEFAULT 'unknown', -- Health check status

    -- Timestamps
    created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
    updated_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
    started_at TIMESTAMP WITH TIME ZONE,        -- When container started
    last_health_check TIMESTAMP WITH TIME ZONE,

    -- Failure tracking
    failure_count INTEGER DEFAULT 0,
    consecutive_failures INTEGER DEFAULT 0,
    last_failure_at TIMESTAMP WITH TIME ZONE,

    -- Resource usage (latest values)
    cpu_percent REAL DEFAULT 0,
    memory_percent REAL DEFAULT 0,
    memory_usage_bytes BIGINT DEFAULT 0,

    -- Constraints
    CONSTRAINT valid_status CHECK (status IN (
        'starting', 'running', 'stopping', 'stopped', 'error', 'draining'
    )),
    CONSTRAINT valid_health_status CHECK (health_status IN (
        'unknown', 'healthy', 'unhealthy', 'starting'
    )),
    CONSTRAINT valid_replica_index CHECK (replica_index >= 0),
    CONSTRAINT valid_port CHECK (port > 0 AND port <= 65535),
    CONSTRAINT valid_failure_count CHECK (failure_count >= 0),
    CONSTRAINT valid_cpu_percent CHECK (cpu_percent >= 0),
    CONSTRAINT valid_memory_percent CHECK (memory_percent >= 0 AND memory_percent <= 100),

    -- Unique constraint for app + replica
    UNIQUE(app_name, replica_index)
);

-- Indexes for performance
CREATE INDEX idx_instances_app_name ON instances(app_name);
CREATE INDEX idx_instances_status ON instances(status);
CREATE INDEX idx_instances_health_status ON instances(health_status);
CREATE INDEX idx_instances_container_id ON instances(container_id);
CREATE INDEX idx_instances_updated_at ON instances(updated_at DESC);
CREATE INDEX idx_instances_app_status ON instances(app_name, status);
CREATE INDEX idx_instances_health_check ON instances(last_health_check DESC);

Instance Status Values:

Status    Description
--------  --------------------------------------------------------
starting  Container is being created/started
running   Container is running normally
stopping  Container is being gracefully stopped
stopped   Container has stopped
error     Container in error state
draining  Container marked for removal, not receiving new traffic

Events Table

Comprehensive audit trail of all system events.

CREATE TABLE events (
    -- Primary key
    id SERIAL PRIMARY KEY,

    -- Event identification
    event_type VARCHAR(50) NOT NULL,            -- Type of event
    event_category VARCHAR(30) NOT NULL DEFAULT 'application', -- Event category

    -- Associated resources
    app_name VARCHAR(253) REFERENCES applications(name) ON DELETE SET NULL,
    container_id VARCHAR(128),                  -- Optional container reference

    -- Event details
    message TEXT NOT NULL,                      -- Human-readable message
    details JSONB,                              -- Structured event data
    severity VARCHAR(20) DEFAULT 'info',       -- Event severity level

    -- Timestamps
    timestamp TIMESTAMP WITH TIME ZONE DEFAULT NOW(),

    -- Source information
    source VARCHAR(50) DEFAULT 'controller',    -- Component that generated event
    node_id VARCHAR(100),                       -- Cluster node ID (if applicable)

    -- Constraints
    CONSTRAINT valid_event_type CHECK (event_type IN (
        'registration', 'scaling', 'health', 'config', 'deployment', 
        'error', 'warning', 'network', 'resource', 'security'
    )),
    CONSTRAINT valid_event_category CHECK (event_category IN (
        'application', 'system', 'cluster', 'security', 'performance'
    )),
    CONSTRAINT valid_severity CHECK (severity IN (
        'debug', 'info', 'warning', 'error', 'critical'
    ))
);

-- Indexes for efficient querying
CREATE INDEX idx_events_app_name_timestamp ON events(app_name, timestamp DESC);
CREATE INDEX idx_events_type_timestamp ON events(event_type, timestamp DESC);
CREATE INDEX idx_events_timestamp ON events(timestamp DESC);
CREATE INDEX idx_events_severity ON events(severity, timestamp DESC);
CREATE INDEX idx_events_category ON events(event_category, timestamp DESC);
CREATE INDEX idx_events_container ON events(container_id, timestamp DESC);

-- Partial index for recent events (performance optimization).
-- Note: partial-index predicates must be immutable, so a rolling NOW()
-- cutoff is rejected by PostgreSQL; use a fixed cutoff and recreate the
-- index periodically (or rely on idx_events_timestamp).
CREATE INDEX idx_events_recent ON events(timestamp DESC)
WHERE timestamp > '2024-01-01 00:00:00+00';

Event Types:

Type          Description             Example Details
------------  ----------------------  ---------------------------------------------
registration  App registered/updated  {"image": "nginx:alpine", "replicas": 3}
scaling       Scaling operation       {"from": 2, "to": 5, "reason": "high_cpu"}
health        Health status change    {"status": "unhealthy", "failures": 3}
config        Configuration change    {"field": "replicas", "old": 2, "new": 3}
deployment    Deployment operation    {"action": "deploy", "version": "v1.2.0"}
error         Error occurred          {"error": "ImagePullBackOff", "code": "E001"}
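
A sketch of how an event row might be appended (the helper name is illustrative; asyncpg accepts a JSON string for the JSONB column):

import json

async def record_event(conn, event_type: str, message: str,
                       app_name: str = None, details: dict = None,
                       severity: str = 'info') -> None:
    # event_type and severity must satisfy the CHECK constraints above.
    await conn.execute(
        """
        INSERT INTO events (event_type, app_name, message, details, severity)
        VALUES ($1, $2, $3, $4::jsonb, $5)
        """,
        event_type, app_name, message,
        json.dumps(details) if details is not None else None,
        severity,
    )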

Metrics Table (Time Series)

Historical performance and resource metrics.

CREATE TABLE metrics (
    -- Primary key
    id SERIAL PRIMARY KEY,

    -- Metric identification
    app_name VARCHAR(253) NOT NULL REFERENCES applications(name) ON DELETE CASCADE,
    metric_type VARCHAR(50) NOT NULL,           -- Type of metric

    -- Time series data
    timestamp TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
    value REAL NOT NULL,                        -- Metric value

    -- Additional metadata
    labels JSONB,                               -- Key-value labels
    unit VARCHAR(20),                           -- Metric unit (%, bytes, ms, etc.)

    -- Aggregation level
    aggregation_level VARCHAR(20) DEFAULT 'instance', -- instance, app, system

    -- Constraints
    CONSTRAINT valid_metric_type CHECK (metric_type IN (
        'cpu_percent', 'memory_percent', 'memory_bytes', 'rps', 
        'latency_p50', 'latency_p95', 'latency_p99', 'active_connections',
        'error_rate', 'response_time', 'queue_length', 'disk_usage'
    )),
    CONSTRAINT valid_aggregation_level CHECK (aggregation_level IN (
        'instance', 'app', 'system'
    ))
);

-- Time series indexes (critical for performance)
CREATE INDEX idx_metrics_app_timestamp ON metrics(app_name, timestamp DESC);
CREATE INDEX idx_metrics_type_timestamp ON metrics(metric_type, timestamp DESC);
CREATE INDEX idx_metrics_timestamp ON metrics(timestamp DESC);
CREATE INDEX idx_metrics_app_type_time ON metrics(app_name, metric_type, timestamp DESC);

-- Partitioning for large datasets (optional). Note: this requires the table
-- to be declared with PARTITION BY RANGE (timestamp) rather than the plain
-- CREATE TABLE above.
-- CREATE TABLE metrics_2024_01 PARTITION OF metrics
-- FOR VALUES FROM ('2024-01-01') TO ('2024-02-01');
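
Metric samples arrive at high volume, so batched writes matter. A sketch using asyncpg's COPY support (the helper and batch shape are assumptions):

from datetime import datetime, timezone

async def write_metrics_batch(conn, app_name: str, samples: list) -> None:
    """Bulk-insert one scrape cycle of (metric_type, value) samples.

    COPY is much faster than row-at-a-time INSERTs for time-series data.
    """
    now = datetime.now(timezone.utc)
    records = [(app_name, mtype, now, value) for mtype, value in samples]
    await conn.copy_records_to_table(
        'metrics',
        records=records,
        columns=['app_name', 'metric_type', 'timestamp', 'value'],
    )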

Scaling Policies Table

Stores scaling policies and configurations.

CREATE TABLE scaling_policies (
    -- Primary key
    app_name VARCHAR(253) PRIMARY KEY REFERENCES applications(name) ON DELETE CASCADE,

    -- Basic scaling parameters
    min_replicas INTEGER NOT NULL DEFAULT 1,
    max_replicas INTEGER NOT NULL DEFAULT 5,

    -- Performance targets
    target_rps_per_replica INTEGER DEFAULT 50,
    max_p95_latency_ms INTEGER DEFAULT 250,
    max_cpu_percent REAL DEFAULT 70.0,
    max_memory_percent REAL DEFAULT 75.0,
    max_connections_per_replica INTEGER DEFAULT 100,

    -- Scaling behavior
    scale_out_threshold_pct INTEGER DEFAULT 80,
    scale_in_threshold_pct INTEGER DEFAULT 30,
    cooldown_seconds INTEGER DEFAULT 180,
    evaluation_window_seconds INTEGER DEFAULT 60,

    -- Advanced settings
    max_scale_out_step INTEGER DEFAULT 0,       -- 0 means no limit
    max_scale_in_step INTEGER DEFAULT 1,
    stabilization_window_seconds INTEGER DEFAULT 300,

    -- Metric weights (for future use)
    cpu_weight REAL DEFAULT 1.0,
    memory_weight REAL DEFAULT 1.0,
    rps_weight REAL DEFAULT 1.0,
    latency_weight REAL DEFAULT 1.5,
    connection_weight REAL DEFAULT 0.8,

    -- Timestamps
    created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
    updated_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),

    -- Constraints
    CONSTRAINT valid_replica_bounds CHECK (min_replicas <= max_replicas),
    CONSTRAINT valid_min_replicas CHECK (min_replicas >= 0),
    CONSTRAINT valid_max_replicas CHECK (max_replicas >= 1),
    CONSTRAINT valid_thresholds CHECK (scale_in_threshold_pct < scale_out_threshold_pct),
    CONSTRAINT valid_percentages CHECK (
        scale_out_threshold_pct BETWEEN 1 AND 100 AND
        scale_in_threshold_pct BETWEEN 1 AND 100
    ),
    CONSTRAINT valid_weights CHECK (
        cpu_weight >= 0 AND memory_weight >= 0 AND rps_weight >= 0 AND
        latency_weight >= 0 AND connection_weight >= 0
    )
);

-- Index for policy lookups
CREATE INDEX idx_scaling_policies_updated_at ON scaling_policies(updated_at DESC);
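
The threshold fields drive scaling decisions roughly as follows; a simplified sketch (the real engine also applies cooldowns, step limits, and the metric weights):

def desired_replicas(policy: dict, current: int, utilization_pct: float) -> int:
    """Pick a replica target from a blended 0-100 utilization figure.

    `utilization_pct` is assumed to be computed elsewhere from the
    weighted metrics; this sketch only applies thresholds and bounds.
    """
    if utilization_pct >= policy['scale_out_threshold_pct']:
        target = current + 1
    elif utilization_pct <= policy['scale_in_threshold_pct']:
        target = current - 1
    else:
        target = current
    return max(policy['min_replicas'], min(policy['max_replicas'], target))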

Health Checks Table

Configuration and status of health checks.

CREATE TABLE health_checks (
    -- Primary key
    id SERIAL PRIMARY KEY,

    -- Application relationship
    app_name VARCHAR(253) NOT NULL REFERENCES applications(name) ON DELETE CASCADE,

    -- Health check configuration
    protocol VARCHAR(10) NOT NULL DEFAULT 'HTTP',
    path VARCHAR(500),                          -- HTTP path
    port INTEGER NOT NULL,
    method VARCHAR(10) DEFAULT 'GET',           -- HTTP method
    headers JSONB,                              -- HTTP headers
    body TEXT,                                  -- Request body
    expected_status_codes INTEGER[] DEFAULT ARRAY[200],

    -- Timing configuration
    initial_delay_seconds INTEGER DEFAULT 30,
    period_seconds INTEGER DEFAULT 30,
    timeout_seconds INTEGER DEFAULT 5,
    failure_threshold INTEGER DEFAULT 3,
    success_threshold INTEGER DEFAULT 1,

    -- Status
    enabled BOOLEAN DEFAULT true,

    -- Timestamps
    created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
    updated_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),

    -- Constraints
    CONSTRAINT valid_protocol CHECK (protocol IN ('HTTP', 'TCP')),
    CONSTRAINT valid_method CHECK (method IN ('GET', 'POST', 'PUT', 'HEAD')),
    CONSTRAINT valid_port CHECK (port > 0 AND port <= 65535),
    CONSTRAINT valid_timings CHECK (
        initial_delay_seconds >= 0 AND
        period_seconds > 0 AND
        timeout_seconds > 0 AND
        failure_threshold > 0 AND
        success_threshold > 0
    ),

    -- Unique health check per app
    UNIQUE(app_name)
);

-- Index for health check lookups. The UNIQUE(app_name) constraint already
-- creates an index on app_name, so only `enabled` needs its own index.
CREATE INDEX idx_health_checks_enabled ON health_checks(enabled);
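
A sketch of how one of these rows might drive an HTTP probe, using aiohttp (illustrative only; TCP checks and the failure/success threshold counters are omitted, and `check` is assumed to be a decoded row dict):

import asyncio
import aiohttp

async def probe_http(check: dict, instance_ip: str) -> bool:
    """Run a single HTTP health probe; returns True on success."""
    url = f"http://{instance_ip}:{check['port']}{check['path'] or '/'}"
    timeout = aiohttp.ClientTimeout(total=check['timeout_seconds'])
    try:
        async with aiohttp.ClientSession(timeout=timeout) as session:
            async with session.request(
                check['method'] or 'GET', url,
                headers=check['headers'] or None,  # assumed decoded to a dict
            ) as resp:
                return resp.status in (check['expected_status_codes'] or [200])
    except (aiohttp.ClientError, asyncio.TimeoutError):
        return False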

Data Models (Python)

Application Record

import json
from dataclasses import dataclass
from typing import Any, Dict, Optional

@dataclass
class AppRecord:
    """Application record data model."""
    name: str
    spec: Dict[str, Any]
    status: str
    created_at: float
    updated_at: float
    replicas: int = 0
    desired_replicas: int = 0
    last_scaled_at: Optional[float] = None
    mode: str = 'auto'

    def to_dict(self) -> Dict[str, Any]:
        """Convert to dictionary for JSON serialization."""
        return {
            'name': self.name,
            'spec': self.spec,
            'status': self.status,
            'created_at': self.created_at,
            'updated_at': self.updated_at,
            'replicas': self.replicas,
            'desired_replicas': self.desired_replicas,
            'last_scaled_at': self.last_scaled_at,
            'mode': self.mode
        }

    @classmethod
    def from_db_row(cls, row) -> 'AppRecord':
        """Create from database row."""
        return cls(
            name=row['name'],
            spec=json.loads(row['spec']) if isinstance(row['spec'], str) else row['spec'],
            status=row['status'],
            created_at=row['created_at'].timestamp(),
            updated_at=row['updated_at'].timestamp(),
            replicas=row['replicas'],
            desired_replicas=row.get('desired_replicas', 0),
            last_scaled_at=row['last_scaled_at'].timestamp() if row['last_scaled_at'] else None,
            mode=row['mode']
        )

Instance Record

@dataclass
class InstanceRecord:
    """Container instance record data model."""
    id: Optional[int]
    app_name: str
    container_id: str
    container_name: str
    replica_index: int
    ip: str
    port: int
    status: str
    health_status: str
    created_at: float
    updated_at: float
    started_at: Optional[float] = None
    last_health_check: Optional[float] = None
    failure_count: int = 0
    consecutive_failures: int = 0
    last_failure_at: Optional[float] = None
    cpu_percent: float = 0.0
    memory_percent: float = 0.0
    memory_usage_bytes: int = 0

    def is_healthy(self) -> bool:
        """Check if instance is healthy."""
        return (self.status == 'running' and 
                self.health_status == 'healthy' and
                self.consecutive_failures < 3)

    def is_ready_for_traffic(self) -> bool:
        """Check if instance can receive traffic."""
        return (self.status == 'running' and 
                self.health_status in ['healthy', 'starting'] and
                self.consecutive_failures < 5)
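
For example, the proxy layer can use is_ready_for_traffic() to build its upstream list (a one-line sketch):

# Sketch: derive upstream targets from a list of InstanceRecord objects.
def upstream_targets(instances: list) -> list:
    return [f"{i.ip}:{i.port}" for i in instances if i.is_ready_for_traffic()]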

Event Record

@dataclass
class EventRecord:
    """System event record data model."""
    id: Optional[int]
    event_type: str
    event_category: str
    app_name: Optional[str]
    container_id: Optional[str]
    message: str
    details: Optional[Dict[str, Any]]
    severity: str
    timestamp: float
    source: str = 'controller'
    node_id: Optional[str] = None

    def to_dict(self) -> Dict[str, Any]:
        """Convert to dictionary for API responses."""
        return {
            'id': self.id,
            'event_type': self.event_type,
            'event_category': self.event_category,
            'app_name': self.app_name,
            'container_id': self.container_id,
            'message': self.message,
            'details': self.details,
            'severity': self.severity,
            'timestamp': self.timestamp,
            'source': self.source,
            'node_id': self.node_id
        }

Database Operations

Connection Management

import asyncio
import logging

import asyncpg

logger = logging.getLogger(__name__)


class DatabasePool:
    """Database connection pool manager."""

    def __init__(self, config: Dict[str, Any]):
        self.config = config
        self.primary_pool = None
        self.replica_pool = None
        self._locks = {
            'primary': asyncio.Lock(),
            'replica': asyncio.Lock()
        }

    async def initialize_primary(self):
        """Initialize primary database connection pool."""
        async with self._locks['primary']:
            if self.primary_pool is None:
                self.primary_pool = await asyncpg.create_pool(
                    host=self.config['primary']['host'],
                    port=self.config['primary']['port'],
                    user=self.config['primary']['user'],
                    password=self.config['primary']['password'],
                    database=self.config['primary']['database'],
                    min_size=self.config.get('pool_min_size', 5),
                    max_size=self.config.get('pool_max_size', 20),
                    command_timeout=self.config.get('command_timeout', 30),
                    server_settings={
                        'application_name': 'orchestry_controller',
                        'jit': 'off'  # Disable JIT for better predictability
                    }
                )

    async def initialize_replica(self):
        """Initialize replica database connection pool."""
        if not self.config.get('replica', {}).get('enabled', False):
            return

        async with self._locks['replica']:
            if self.replica_pool is None:
                try:
                    self.replica_pool = await asyncpg.create_pool(
                        host=self.config['replica']['host'],
                        port=self.config['replica']['port'],
                        user=self.config['replica']['user'],
                        password=self.config['replica']['password'],
                        database=self.config['replica']['database'],
                        min_size=self.config.get('pool_min_size', 3),
                        max_size=self.config.get('pool_max_size', 10),
                        command_timeout=self.config.get('command_timeout', 30),
                        server_settings={
                            'application_name': 'orchestry_controller_read',
                            'default_transaction_isolation': 'repeatable_read'
                        }
                    )
                except Exception as e:
                    logger.warning(f"Failed to initialize replica pool: {e}")

Query Patterns

class ApplicationQueries:
    """Optimized queries for application operations."""

    @staticmethod
    async def get_app_with_instances(conn, app_name: str) -> Optional[Dict[str, Any]]:
        """Get application with all instances in a single query."""
        query = """
        SELECT 
            a.*,
            COALESCE(
                json_agg(
                    json_build_object(
                        'id', i.id,
                        'container_id', i.container_id,
                        'container_name', i.container_name,
                        'replica_index', i.replica_index,
                        'ip', i.ip,
                        'port', i.port,
                        'status', i.status,
                        'health_status', i.health_status,
                        'cpu_percent', i.cpu_percent,
                        'memory_percent', i.memory_percent,
                        'failure_count', i.failure_count,
                        'last_health_check', EXTRACT(EPOCH FROM i.last_health_check)
                    ) ORDER BY i.replica_index
                ) FILTER (WHERE i.id IS NOT NULL),
                '[]'::json
            ) as instances
        FROM applications a
        LEFT JOIN instances i ON a.name = i.app_name AND i.status != 'stopped'
        WHERE a.name = $1
        GROUP BY a.name
        """

        row = await conn.fetchrow(query, app_name)
        if not row:
            return None

        # json_agg returns JSON, which asyncpg yields as a string unless a
        # codec is registered, so decode before use. The aggregated objects
        # carry only a subset of instance fields, so return them as dicts.
        instances = row['instances']
        if isinstance(instances, str):
            instances = json.loads(instances)

        return {
            'app': AppRecord.from_db_row(row),
            'instances': instances
        }

    @staticmethod
    async def get_apps_summary(conn, status_filter: Optional[str] = None) -> List[Dict[str, Any]]:
        """Get summary of all applications with instance counts."""
        conditions = []
        params = []

        if status_filter:
            conditions.append("a.status = $1")
            params.append(status_filter)

        where_clause = f"WHERE {' AND '.join(conditions)}" if conditions else ""

        query = f"""
        SELECT 
            a.name,
            a.status,
            a.replicas,
            a.desired_replicas,
            a.mode,
            a.created_at,
            a.updated_at,
            a.last_scaled_at,
            COUNT(i.id) FILTER (WHERE i.status = 'running') as running_instances,
            COUNT(i.id) FILTER (WHERE i.health_status = 'healthy') as healthy_instances,
            AVG(i.cpu_percent) FILTER (WHERE i.status = 'running') as avg_cpu_percent,
            AVG(i.memory_percent) FILTER (WHERE i.status = 'running') as avg_memory_percent
        FROM applications a
        LEFT JOIN instances i ON a.name = i.app_name
        {where_clause}
        GROUP BY a.name, a.status, a.replicas, a.desired_replicas, a.mode, 
                 a.created_at, a.updated_at, a.last_scaled_at
        ORDER BY a.name
        """

        rows = await conn.fetch(query, *params)
        return [dict(row) for row in rows]

Metrics Aggregation

class MetricsQueries:
    """Optimized queries for metrics operations."""

    @staticmethod
    async def get_app_metrics_window(conn, app_name: str, window_minutes: int = 5) -> Dict[str, Any]:
        """Get aggregated metrics for an application over a time window."""
        # The window is passed as a bind parameter via make_interval()
        # rather than interpolated into the SQL string.
        query = """
        SELECT 
            metric_type,
            AVG(value) as avg_value,
            MIN(value) as min_value,
            MAX(value) as max_value,
            PERCENTILE_CONT(0.5) WITHIN GROUP (ORDER BY value) as p50_value,
            PERCENTILE_CONT(0.95) WITHIN GROUP (ORDER BY value) as p95_value,
            COUNT(*) as data_points
        FROM metrics
        WHERE app_name = $1
          AND timestamp >= NOW() - make_interval(mins => $2)
          AND metric_type IN ('cpu_percent', 'memory_percent', 'rps', 'latency_p95', 'active_connections')
        GROUP BY metric_type
        """

        rows = await conn.fetch(query, app_name, window_minutes)

        metrics = {}
        for row in rows:
            metrics[row['metric_type']] = {
                'avg': float(row['avg_value']) if row['avg_value'] else 0,
                'min': float(row['min_value']) if row['min_value'] else 0,
                'max': float(row['max_value']) if row['max_value'] else 0,
                'p50': float(row['p50_value']) if row['p50_value'] else 0,
                'p95': float(row['p95_value']) if row['p95_value'] else 0,
                'data_points': row['data_points']
            }

        return metrics

    @staticmethod
    async def cleanup_old_metrics(conn, retention_hours: int = 168) -> int:
        """Clean up metrics older than the retention period."""
        # The cutoff is a bind parameter; interpolating it into the SQL
        # string would invite injection and defeat plan caching.
        result = await conn.execute(
            "DELETE FROM metrics WHERE timestamp < NOW() - make_interval(hours => $1)",
            retention_hours,
        )
        # asyncpg returns a status tag such as 'DELETE 42'
        return int(result.split()[1])

Performance Optimization

Indexing Strategy

-- Composite indexes for common query patterns
CREATE INDEX idx_instances_app_status_health ON instances(app_name, status, health_status);
CREATE INDEX idx_events_app_type_severity ON events(app_name, event_type, severity, timestamp DESC);
-- A recent-data partial index must have an immutable predicate, so a rolling
-- NOW() cutoff is not allowed; use a fixed cutoff and recreate it periodically
CREATE INDEX idx_metrics_app_type_recent ON metrics(app_name, metric_type, timestamp DESC)
WHERE timestamp > '2024-01-01 00:00:00+00';

-- Partial indexes for active data
CREATE INDEX idx_applications_active ON applications(name, status, updated_at)
WHERE status IN ('running', 'starting', 'updating');

CREATE INDEX idx_instances_active ON instances(app_name, status, health_status, updated_at)
WHERE status IN ('running', 'starting');

Query Optimization

class QueryOptimizer:
    """Database query optimization utilities."""

    @staticmethod
    async def explain_query(conn, query: str, params: List[Any] = None) -> str:
        """Get query execution plan."""
        explain_query = f"EXPLAIN (ANALYZE, BUFFERS, FORMAT JSON) {query}"
        result = await conn.fetch(explain_query, *(params or []))
        plan = result[0]['QUERY PLAN']
        # asyncpg returns json columns as strings unless a codec is registered
        if isinstance(plan, str):
            plan = json.loads(plan)
        return json.dumps(plan, indent=2)

    @staticmethod
    async def get_slow_queries(conn, min_duration_ms: int = 1000) -> List[Dict[str, Any]]:
        """Get slow queries from pg_stat_statements."""
        query = """
        SELECT 
            query,
            calls,
            total_time,
            mean_time,
            stddev_time,
            rows,
            100.0 * shared_blks_hit / nullif(shared_blks_hit + shared_blks_read, 0) AS hit_percent
        FROM pg_stat_statements
        WHERE mean_time > $1
        ORDER BY mean_time DESC
        LIMIT 20
        """

        rows = await conn.fetch(query, min_duration_ms)
        return [dict(row) for row in rows]

Connection Pool Tuning

# Optimal pool configuration
POOL_CONFIG = {
    'primary': {
        'min_size': 5,          # Always keep 5 connections open
        'max_size': 20,         # Maximum 20 connections
        'command_timeout': 30,   # 30 second query timeout
        'max_queries': 50000,    # Recycle connections after 50k queries
        'max_inactive_time': 300 # Close inactive connections after 5 minutes
    },
    'replica': {
        'min_size': 3,          # Fewer connections for read replica
        'max_size': 10,         # Lower maximum for read workload
        'command_timeout': 60,   # Longer timeout for analytical queries
        'max_queries': 100000,   # Recycle less frequently
        'max_inactive_time': 600 # Keep connections longer for batch operations
    }
}
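
asyncpg's keyword names differ slightly from the keys above; a sketch of translating one POOL_CONFIG entry when creating a pool (max_inactive_time maps to asyncpg's max_inactive_connection_lifetime, and the DSN here is a placeholder):

import asyncpg

async def make_pool(dsn: str, cfg: dict) -> asyncpg.Pool:
    return await asyncpg.create_pool(
        dsn,
        min_size=cfg['min_size'],
        max_size=cfg['max_size'],
        command_timeout=cfg['command_timeout'],
        max_queries=cfg['max_queries'],
        max_inactive_connection_lifetime=cfg['max_inactive_time'],
    )

# e.g. pool = await make_pool("postgresql://orchestry@db/orchestry",
#                             POOL_CONFIG['primary'])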

Data Retention and Archival

Automatic Cleanup

-- Function to clean up old data
CREATE OR REPLACE FUNCTION cleanup_old_data()
RETURNS TABLE(table_name TEXT, deleted_count INTEGER) AS $$
BEGIN
    -- Clean up old events (keep 30 days)
    DELETE FROM events WHERE timestamp < NOW() - INTERVAL '30 days';
    GET DIAGNOSTICS deleted_count = ROW_COUNT;
    table_name := 'events';
    RETURN NEXT;

    -- Clean up old metrics (keep 7 days for instance level, 30 days for app level)
    DELETE FROM metrics 
    WHERE timestamp < NOW() - INTERVAL '7 days' 
      AND aggregation_level = 'instance';
    GET DIAGNOSTICS deleted_count = ROW_COUNT;
    table_name := 'metrics (instance)';
    RETURN NEXT;

    DELETE FROM metrics 
    WHERE timestamp < NOW() - INTERVAL '30 days' 
      AND aggregation_level = 'app';
    GET DIAGNOSTICS deleted_count = ROW_COUNT;
    table_name := 'metrics (app)';
    RETURN NEXT;

    -- Clean up stopped instances (keep 7 days)
    DELETE FROM instances 
    WHERE status = 'stopped' 
      AND updated_at < NOW() - INTERVAL '7 days';
    GET DIAGNOSTICS deleted_count = ROW_COUNT;
    table_name := 'instances (stopped)';
    RETURN NEXT;
END;
$$ LANGUAGE plpgsql;

-- Schedule cleanup (requires pg_cron extension)
SELECT cron.schedule('cleanup-old-data', '0 2 * * *', 'SELECT cleanup_old_data();');
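
If pg_cron is not available, the same function can be driven from the controller; a sketch of an application-side periodic task (interval and function name follow the SQL above):

import asyncio
import logging

logger = logging.getLogger(__name__)

async def cleanup_loop(pool, interval_seconds: int = 86400):
    """Run cleanup_old_data() once a day from the controller process."""
    while True:
        try:
            async with pool.acquire() as conn:
                rows = await conn.fetch("SELECT * FROM cleanup_old_data();")
                for row in rows:
                    logger.info("cleanup: %s -> %s rows deleted",
                                row['table_name'], row['deleted_count'])
        except Exception:
            logger.exception("scheduled cleanup failed")
        await asyncio.sleep(interval_seconds)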

Archival Strategy

class DataArchival(DatabaseManager):
    """Data archival and retention management (reuses the connection
    helpers from DatabaseManager above)."""

    async def archive_old_metrics(self, cutoff_date: datetime, archive_table: str):
        """Archive old metrics to separate table."""
        async with self.get_write_connection() as conn:
            # Create archive table if it doesn't exist
            await conn.execute(f"""
                CREATE TABLE IF NOT EXISTS {archive_table} 
                (LIKE metrics INCLUDING ALL)
            """)

            # Move old data to archive
            await conn.execute(f"""
                WITH moved_data AS (
                    DELETE FROM metrics 
                    WHERE timestamp < $1 
                    RETURNING *
                )
                INSERT INTO {archive_table} 
                SELECT * FROM moved_data
            """, cutoff_date)

    async def compress_old_events(self, days_old: int = 90) -> int:
        """Compress old events into hourly rollups to reduce storage."""
        async with self.get_write_connection() as conn:
            async with conn.transaction():
                # Aggregate old events by hour. Per-severity counts and the
                # five most recent sample messages are computed in separate
                # CTEs, then joined on the rollup key. (The events_compressed
                # table is assumed to exist with these columns.)
                await conn.execute("""
                    WITH old_events AS (
                        SELECT date_trunc('hour', timestamp) AS hour_bucket,
                               app_name, event_type, severity, message, timestamp
                        FROM events
                        WHERE timestamp < NOW() - make_interval(days => $1)
                    ),
                    severities AS (
                        SELECT hour_bucket, app_name, event_type,
                               SUM(cnt) AS event_count,
                               jsonb_object_agg(severity, cnt) AS severity_counts
                        FROM (
                            SELECT hour_bucket, app_name, event_type, severity,
                                   COUNT(*) AS cnt
                            FROM old_events
                            GROUP BY 1, 2, 3, 4
                        ) per_severity
                        GROUP BY 1, 2, 3
                    ),
                    samples AS (
                        SELECT hour_bucket, app_name, event_type,
                               (array_agg(message ORDER BY timestamp DESC))[1:5]
                                   AS sample_messages
                        FROM old_events
                        GROUP BY 1, 2, 3
                    )
                    INSERT INTO events_compressed (
                        hour_bucket, app_name, event_type, event_count,
                        severity_counts, sample_messages
                    )
                    SELECT s.hour_bucket, s.app_name, s.event_type,
                           s.event_count, s.severity_counts, m.sample_messages
                    FROM severities s
                    JOIN samples m
                      ON m.hour_bucket = s.hour_bucket
                     AND m.app_name IS NOT DISTINCT FROM s.app_name
                     AND m.event_type = s.event_type
                """, days_old)

                # Delete the originals after compression, in the same
                # transaction as the rollup insert.
                result = await conn.execute(
                    "DELETE FROM events WHERE timestamp < NOW() - make_interval(days => $1)",
                    days_old,
                )
                return int(result.split()[1])

Next Steps: Learn about Load Balancing and Nginx integration.