# Extensions and Plugins
Guide for extending Orchestry functionality through plugins, custom components, and integration points.
## Overview

Orchestry is designed with extensibility in mind and provides multiple extension points:
- Custom Scalers: Implement domain-specific scaling algorithms
- Health Check Providers: Add custom health check protocols
- Load Balancer Integrations: Support additional load balancers beyond Nginx
- Metrics Exporters: Export metrics to various monitoring systems
- Event Handlers: React to system events with custom logic
- Authentication Providers: Integrate with enterprise authentication systems
- Storage Backends: Use alternative storage systems for state management
## Plugin Architecture

### Plugin System Design
```python
from abc import ABC, abstractmethod
from typing import Any, Dict, List, Optional
import logging


class Plugin(ABC):
    """Base class for all Orchestry plugins."""

    def __init__(self, config: Dict[str, Any]):
        self.config = config
        self.logger = logging.getLogger(f"plugin.{self.__class__.__name__}")
        self._enabled = config.get('enabled', True)

    @property
    def name(self) -> str:
        """Plugin name."""
        return self.__class__.__name__.lower().replace('plugin', '')

    @property
    def version(self) -> str:
        """Plugin version."""
        return getattr(self, '__version__', '1.0.0')

    @property
    def enabled(self) -> bool:
        """Check if plugin is enabled."""
        return self._enabled

    @abstractmethod
    async def initialize(self) -> bool:
        """Initialize the plugin."""
        pass

    @abstractmethod
    async def cleanup(self) -> None:
        """Clean up plugin resources."""
        pass

    def validate_config(self) -> List[str]:
        """Validate plugin configuration. Return a list of errors."""
        return []


class PluginManager:
    """Manages plugin lifecycle and registration."""

    def __init__(self):
        self.plugins: Dict[str, Plugin] = {}
        self.plugin_registry: Dict[str, type] = {}

    def register_plugin(self, plugin_class: type) -> None:
        """Register a plugin class."""
        plugin_name = plugin_class.__name__.lower().replace('plugin', '')
        self.plugin_registry[plugin_name] = plugin_class

    async def load_plugin(self, plugin_name: str, config: Dict[str, Any]) -> bool:
        """Load and initialize a plugin."""
        if plugin_name not in self.plugin_registry:
            raise ValueError(f"Plugin '{plugin_name}' not registered")

        plugin_class = self.plugin_registry[plugin_name]
        plugin = plugin_class(config)

        # Validate configuration before initializing
        config_errors = plugin.validate_config()
        if config_errors:
            raise ValueError(f"Plugin configuration errors: {config_errors}")

        # Initialize the plugin
        try:
            success = await plugin.initialize()
            if success:
                self.plugins[plugin_name] = plugin
                logging.info(f"Successfully loaded plugin: {plugin_name}")
                return True
            else:
                logging.error(f"Failed to initialize plugin: {plugin_name}")
                return False
        except Exception as e:
            logging.error(f"Error loading plugin {plugin_name}: {e}")
            return False

    async def unload_plugin(self, plugin_name: str) -> None:
        """Unload a plugin."""
        if plugin_name in self.plugins:
            plugin = self.plugins[plugin_name]
            await plugin.cleanup()
            del self.plugins[plugin_name]
            logging.info(f"Unloaded plugin: {plugin_name}")

    def get_plugin(self, plugin_name: str) -> Optional[Plugin]:
        """Get a loaded plugin by name."""
        return self.plugins.get(plugin_name)

    def list_plugins(self) -> List[str]:
        """List all loaded plugins."""
        return list(self.plugins.keys())
```
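A minimal lifecycle sketch using the classes above; `GreetingPlugin` is a hypothetical plugin written only for this example:

```python
import asyncio


class GreetingPlugin(Plugin):
    """Hypothetical plugin used only to illustrate the lifecycle."""

    async def initialize(self) -> bool:
        self.logger.info("greeting plugin ready")
        return True

    async def cleanup(self) -> None:
        pass


async def main():
    manager = PluginManager()
    manager.register_plugin(GreetingPlugin)        # registered as "greeting"
    await manager.load_plugin('greeting', {'enabled': True})
    print(manager.list_plugins())                  # ['greeting']
    await manager.unload_plugin('greeting')

asyncio.run(main())
```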
## Custom Scalers

### Scaler Plugin Interface
```python
from abc import abstractmethod
from dataclasses import dataclass
from typing import Any, Dict, List, Optional


@dataclass
class ScalingDecision:
    """Represents a scaling decision made by a scaler."""
    app_name: str
    current_replicas: int
    target_replicas: int
    reason: str
    confidence: float  # 0.0 to 1.0
    metadata: Dict[str, Any]


class CustomScaler(Plugin):
    """Base class for custom scaling algorithms."""

    @abstractmethod
    async def should_scale(self, app_name: str, metrics: Dict[str, Any],
                           current_replicas: int) -> Optional[ScalingDecision]:
        """Determine if scaling is needed."""
        pass

    @abstractmethod
    def get_required_metrics(self) -> List[str]:
        """Return the list of metrics this scaler requires."""
        pass


# Example: Custom ML-based scaler
class MLScalerPlugin(CustomScaler):
    """Machine learning-based auto scaler."""

    def __init__(self, config: Dict[str, Any]):
        super().__init__(config)
        self.model_path = config.get('model_path')
        self.prediction_window = config.get('prediction_window', 300)  # 5 minutes
        self.model = None

    async def initialize(self) -> bool:
        """Load the ML model."""
        try:
            import joblib
            self.model = joblib.load(self.model_path)
            self.logger.info(f"Loaded ML model from {self.model_path}")
            return True
        except Exception as e:
            self.logger.error(f"Failed to load ML model: {e}")
            return False

    async def cleanup(self) -> None:
        """Release the model."""
        self.model = None

    def get_required_metrics(self) -> List[str]:
        """Metrics required for ML prediction."""
        return [
            'cpu_percent',
            'memory_percent',
            'rps',
            'latency_p95',
            'active_connections',
            'error_rate'
        ]

    async def should_scale(self, app_name: str, metrics: Dict[str, Any],
                           current_replicas: int) -> Optional[ScalingDecision]:
        """Use the ML model to predict scaling needs."""
        if not self.model:
            return None
        try:
            # Prepare the feature vector
            features = self._prepare_features(metrics, current_replicas)

            # Predict the optimal replica count, clamped to [1, 20]
            predicted_replicas = self.model.predict([features])[0]
            predicted_replicas = max(1, min(20, int(round(predicted_replicas))))

            # Calculate confidence for the prediction
            confidence = self._calculate_confidence(features)

            if predicted_replicas != current_replicas and confidence > 0.7:
                return ScalingDecision(
                    app_name=app_name,
                    current_replicas=current_replicas,
                    target_replicas=predicted_replicas,
                    reason=f"ML prediction: {predicted_replicas} replicas (confidence: {confidence:.2f})",
                    confidence=confidence,
                    metadata={
                        'model_prediction': predicted_replicas,
                        'features': dict(zip(self.get_required_metrics() + ['current_replicas'], features)),
                        'scaler_type': 'ml'
                    }
                )
        except Exception as e:
            self.logger.error(f"ML scaling prediction failed: {e}")
        return None

    def _prepare_features(self, metrics: Dict[str, Any], current_replicas: int) -> List[float]:
        """Prepare the feature vector for the ML model."""
        features = []
        for metric_name in self.get_required_metrics():
            metric_data = metrics.get(metric_name, {})
            # Use the average value, defaulting to 0 if not available
            value = metric_data.get('avg', 0) if isinstance(metric_data, dict) else metric_data
            features.append(float(value))
        features.append(float(current_replicas))
        return features

    def _calculate_confidence(self, features: List[float]) -> float:
        """Calculate prediction confidence."""
        # This would use a model-specific confidence calculation,
        # e.g. prediction probabilities from ensemble models.
        return 0.8  # Placeholder


# Example: Time-based scaler
class TimeBasedScalerPlugin(CustomScaler):
    """Scales based on time patterns and scheduled events."""

    def __init__(self, config: Dict[str, Any]):
        super().__init__(config)
        self.schedule = config.get('schedule', {})
        # Schedule format: {"09:00": 5, "18:00": 2, "22:00": 1}

    async def initialize(self) -> bool:
        """No initialization required."""
        return True

    async def cleanup(self) -> None:
        """No cleanup needed."""
        pass

    def get_required_metrics(self) -> List[str]:
        """No specific metrics required."""
        return []

    async def should_scale(self, app_name: str, metrics: Dict[str, Any],
                           current_replicas: int) -> Optional[ScalingDecision]:
        """Scale based on the time schedule."""
        from datetime import datetime

        current_time = datetime.now().strftime("%H:%M")

        # Find the most recent schedule entry at or before the current time.
        # Before the first entry of the day, no scheduled target applies.
        target_replicas = None
        for time_str, replicas in sorted(self.schedule.items()):
            if current_time >= time_str:
                target_replicas = replicas

        if target_replicas is not None and target_replicas != current_replicas:
            return ScalingDecision(
                app_name=app_name,
                current_replicas=current_replicas,
                target_replicas=target_replicas,
                reason=f"Scheduled scaling at {current_time}: {target_replicas} replicas",
                confidence=1.0,
                metadata={
                    'schedule_time': current_time,
                    'scaler_type': 'time_based'
                }
            )
        return None
```
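The scaler interface leaves arbitration between multiple scalers to the controller. One plausible policy, shown here as a sketch rather than Orchestry's actual algorithm, is to gather every decision and act on the most confident one:

```python
async def pick_scaling_decision(scalers, app_name, metrics, current_replicas):
    """Collect decisions from all enabled scalers and return the most
    confident one.

    This arbitration policy is an assumption for illustration, not the
    controller's built-in behavior.
    """
    decisions = []
    for scaler in scalers:
        if not scaler.enabled:
            continue
        decision = await scaler.should_scale(app_name, metrics, current_replicas)
        if decision is not None:
            decisions.append(decision)
    return max(decisions, key=lambda d: d.confidence) if decisions else None
```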
## Custom Health Check Providers

### Health Check Plugin Interface
```python
import asyncio
import json
import time
from abc import abstractmethod
from dataclasses import dataclass
from enum import Enum
from typing import Any, Dict


class HealthStatus(Enum):
    """Health check status values."""
    HEALTHY = "healthy"
    UNHEALTHY = "unhealthy"
    UNKNOWN = "unknown"


@dataclass
class HealthCheckResult:
    """Health check result."""
    status: HealthStatus
    response_time_ms: float
    message: str
    details: Dict[str, Any]
    timestamp: float


class HealthCheckProvider(Plugin):
    """Base class for custom health check providers."""

    @abstractmethod
    async def check_health(self, instance: 'InstanceRecord') -> HealthCheckResult:
        """Perform a health check on an instance."""
        pass

    @abstractmethod
    def get_protocol(self) -> str:
        """Return the protocol this provider handles."""
        pass


# Example: gRPC health check provider
class GRPCHealthCheckPlugin(HealthCheckProvider):
    """gRPC health check implementation."""

    def __init__(self, config: Dict[str, Any]):
        super().__init__(config)
        self.timeout = config.get('timeout', 5.0)
        self.service_name = config.get('service_name', '')

    async def initialize(self) -> bool:
        """Import the gRPC health-checking libraries."""
        try:
            import grpc
            from grpc_health.v1 import health_pb2, health_pb2_grpc
            self.grpc = grpc
            self.health_pb2 = health_pb2
            self.health_pb2_grpc = health_pb2_grpc
            return True
        except ImportError as e:
            self.logger.error(f"gRPC libraries not available: {e}")
            return False

    async def cleanup(self) -> None:
        """No cleanup needed."""
        pass

    def get_protocol(self) -> str:
        """Return protocol name."""
        return "grpc"

    async def check_health(self, instance) -> HealthCheckResult:
        """Perform a gRPC health check."""
        start_time = time.time()
        try:
            # Create a gRPC channel to the instance
            channel = self.grpc.aio.insecure_channel(f"{instance.ip}:{instance.port}")
            stub = self.health_pb2_grpc.HealthStub(channel)

            # Build the health check request
            request = self.health_pb2.HealthCheckRequest(service=self.service_name)

            # Perform the health check with a timeout
            response = await asyncio.wait_for(
                stub.Check(request),
                timeout=self.timeout
            )
            response_time = (time.time() - start_time) * 1000

            # Map the gRPC health status to our status
            if response.status == self.health_pb2.HealthCheckResponse.SERVING:
                status = HealthStatus.HEALTHY
                message = "Service is serving"
            else:
                status = HealthStatus.UNHEALTHY
                message = f"Service status: {response.status}"

            await channel.close()

            return HealthCheckResult(
                status=status,
                response_time_ms=response_time,
                message=message,
                details={
                    'grpc_status': response.status,
                    'service_name': self.service_name
                },
                timestamp=time.time()
            )
        except asyncio.TimeoutError:
            return HealthCheckResult(
                status=HealthStatus.UNHEALTHY,
                response_time_ms=(time.time() - start_time) * 1000,
                message="Health check timeout",
                details={'error': 'timeout'},
                timestamp=time.time()
            )
        except Exception as e:
            return HealthCheckResult(
                status=HealthStatus.UNHEALTHY,
                response_time_ms=(time.time() - start_time) * 1000,
                message=f"Health check failed: {str(e)}",
                details={'error': str(e)},
                timestamp=time.time()
            )


# Example: Redis health check provider
class RedisHealthCheckPlugin(HealthCheckProvider):
    """Redis-based health check for applications that publish status to Redis."""

    def __init__(self, config: Dict[str, Any]):
        super().__init__(config)
        self.redis_url = config.get('redis_url', 'redis://localhost:6379')
        self.key_prefix = config.get('key_prefix', 'health:')
        self.redis_client = None

    async def initialize(self) -> bool:
        """Initialize the Redis connection."""
        try:
            import aioredis
            self.redis_client = aioredis.from_url(self.redis_url)
            await self.redis_client.ping()
            return True
        except Exception as e:
            self.logger.error(f"Failed to connect to Redis: {e}")
            return False

    async def cleanup(self) -> None:
        """Close the Redis connection."""
        if self.redis_client:
            await self.redis_client.close()

    def get_protocol(self) -> str:
        """Return protocol name."""
        return "redis"

    async def check_health(self, instance) -> HealthCheckResult:
        """Check health by looking up the instance's status in Redis."""
        start_time = time.time()
        try:
            # Look up the health status published by the instance
            health_key = f"{self.key_prefix}{instance.app_name}:{instance.container_id}"
            status_data = await self.redis_client.get(health_key)
            response_time = (time.time() - start_time) * 1000

            if status_data:
                status_info = json.loads(status_data)
                last_update = status_info.get('timestamp', 0)

                # Only trust status that is recent (within the last 60 seconds)
                if time.time() - last_update < 60:
                    status = HealthStatus.HEALTHY if status_info.get('healthy', False) else HealthStatus.UNHEALTHY
                    message = status_info.get('message', 'Redis health check')
                else:
                    status = HealthStatus.UNKNOWN
                    message = "Stale health data in Redis"
            else:
                status = HealthStatus.UNKNOWN
                message = "No health data found in Redis"

            return HealthCheckResult(
                status=status,
                response_time_ms=response_time,
                message=message,
                details={'redis_key': health_key},
                timestamp=time.time()
            )
        except Exception as e:
            return HealthCheckResult(
                status=HealthStatus.UNHEALTHY,
                response_time_ms=(time.time() - start_time) * 1000,
                message=f"Redis health check failed: {str(e)}",
                details={'error': str(e)},
                timestamp=time.time()
            )
```
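For the Redis provider to report HEALTHY, each application instance must publish its own status under the key the plugin reads (`health:<app_name>:<container_id>` with the default prefix). A minimal publisher sketch, assuming the same `aioredis`-style client:

```python
import json
import time


async def publish_health(redis_client, app_name: str, container_id: str,
                         healthy: bool, message: str = "ok") -> None:
    """Write the JSON payload that RedisHealthCheckPlugin expects.

    The 60-second TTL mirrors the plugin's staleness window, so crashed
    instances naturally fall back to UNKNOWN.
    """
    key = f"health:{app_name}:{container_id}"
    payload = json.dumps({
        'healthy': healthy,
        'message': message,
        'timestamp': time.time()  # plugin treats data older than 60s as stale
    })
    await redis_client.set(key, payload, ex=60)
```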
## Metrics Exporters

### Metrics Plugin Interface
```python
import time
from abc import abstractmethod
from typing import Any, Dict, List


class MetricsExporter(Plugin):
    """Base class for metrics exporters."""

    @abstractmethod
    async def export_metrics(self, metrics: List[Dict[str, Any]]) -> bool:
        """Export metrics to an external system."""
        pass

    @abstractmethod
    def get_supported_formats(self) -> List[str]:
        """Return the list of supported metric formats."""
        pass


# Example: Prometheus exporter
class PrometheusExporterPlugin(MetricsExporter):
    """Export metrics to Prometheus via the Pushgateway."""

    def __init__(self, config: Dict[str, Any]):
        super().__init__(config)
        self.gateway_url = config.get('gateway_url', 'http://localhost:9091')
        self.job_name = config.get('job_name', 'orchestry')
        self.instance_id = config.get('instance_id', 'orchestry-controller')

    async def initialize(self) -> bool:
        """Initialize the Prometheus client and metric objects."""
        try:
            from prometheus_client import CollectorRegistry, Gauge, Counter, push_to_gateway
            self.registry = CollectorRegistry()
            self.push_to_gateway = push_to_gateway

            # Create metric objects
            self.metrics = {
                'app_replicas': Gauge('orchestry_app_replicas', 'Number of replicas', ['app_name'], registry=self.registry),
                'app_healthy_instances': Gauge('orchestry_app_healthy_instances', 'Healthy instances', ['app_name'], registry=self.registry),
                'app_cpu_percent': Gauge('orchestry_app_cpu_percent', 'CPU usage percentage', ['app_name'], registry=self.registry),
                'app_memory_percent': Gauge('orchestry_app_memory_percent', 'Memory usage percentage', ['app_name'], registry=self.registry),
                'app_rps': Gauge('orchestry_app_rps', 'Requests per second', ['app_name'], registry=self.registry),
                'scaling_events': Counter('orchestry_scaling_events_total', 'Scaling events', ['app_name', 'direction'], registry=self.registry),
                'health_check_failures': Counter('orchestry_health_check_failures_total', 'Health check failures', ['app_name'], registry=self.registry)
            }
            return True
        except ImportError as e:
            self.logger.error(f"Prometheus client not available: {e}")
            return False

    async def cleanup(self) -> None:
        """No cleanup needed."""
        pass

    def get_supported_formats(self) -> List[str]:
        """Supported formats."""
        return ['prometheus']

    async def export_metrics(self, metrics: List[Dict[str, Any]]) -> bool:
        """Export metrics to the Prometheus Pushgateway."""
        try:
            # Group metrics by application
            app_metrics = {}
            for metric in metrics:
                app_name = metric.get('app_name')
                if not app_name:
                    continue
                if app_name not in app_metrics:
                    app_metrics[app_name] = {}
                app_metrics[app_name][metric['metric_type']] = metric['value']

            # Update the Prometheus gauges
            for app_name, metric_data in app_metrics.items():
                for metric_type, value in metric_data.items():
                    if metric_type == 'replicas':
                        self.metrics['app_replicas'].labels(app_name=app_name).set(value)
                    elif metric_type == 'healthy_instances':
                        self.metrics['app_healthy_instances'].labels(app_name=app_name).set(value)
                    elif metric_type == 'cpu_percent':
                        self.metrics['app_cpu_percent'].labels(app_name=app_name).set(value)
                    elif metric_type == 'memory_percent':
                        self.metrics['app_memory_percent'].labels(app_name=app_name).set(value)
                    elif metric_type == 'rps':
                        self.metrics['app_rps'].labels(app_name=app_name).set(value)

            # Push to the gateway (note: push_to_gateway is a blocking call)
            self.push_to_gateway(
                self.gateway_url,
                job=self.job_name,
                registry=self.registry,
                grouping_key={'instance': self.instance_id}
            )
            return True
        except Exception as e:
            self.logger.error(f"Failed to export metrics to Prometheus: {e}")
            return False


# Example: InfluxDB exporter
class InfluxDBExporterPlugin(MetricsExporter):
    """Export metrics to InfluxDB."""

    def __init__(self, config: Dict[str, Any]):
        super().__init__(config)
        self.url = config.get('url', 'http://localhost:8086')
        self.token = config.get('token')
        self.org = config.get('org')
        self.bucket = config.get('bucket', 'orchestry')
        self.client = None

    async def initialize(self) -> bool:
        """Initialize the InfluxDB client."""
        try:
            from influxdb_client.client.influxdb_client_async import InfluxDBClientAsync
            self.client = InfluxDBClientAsync(
                url=self.url,
                token=self.token,
                org=self.org
            )
            return True
        except ImportError as e:
            self.logger.error(f"InfluxDB client not available: {e}")
            return False

    async def cleanup(self) -> None:
        """Close the InfluxDB client."""
        if self.client:
            await self.client.close()

    def get_supported_formats(self) -> List[str]:
        """Supported formats."""
        return ['influxdb']

    async def export_metrics(self, metrics: List[Dict[str, Any]]) -> bool:
        """Export metrics to InfluxDB."""
        try:
            write_api = self.client.write_api()
            points = []

            for metric in metrics:
                # Convert to the InfluxDB point structure
                point = {
                    'measurement': metric['metric_type'],
                    'tags': {
                        'app_name': metric.get('app_name', ''),
                        'source': 'orchestry'
                    },
                    'fields': {
                        'value': float(metric['value'])
                    },
                    'time': int(metric.get('timestamp', time.time()) * 1000000000)  # nanoseconds
                }

                # Add additional labels as tags
                if 'labels' in metric:
                    point['tags'].update(metric['labels'])

                points.append(point)

            # Write the points to InfluxDB
            await write_api.write(bucket=self.bucket, record=points)
            return True
        except Exception as e:
            self.logger.error(f"Failed to export metrics to InfluxDB: {e}")
            return False
```
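Exporters are usually invoked on a timer rather than per-metric. A minimal periodic export loop, assuming a `collect_metrics()` coroutine (not shown in this guide) that returns the list-of-dicts format used above:

```python
import asyncio


async def export_loop(exporters, collect_metrics, interval: int = 30):
    """Periodically push collected metrics through every enabled exporter.

    `collect_metrics` is an assumed coroutine returning
    [{'app_name': ..., 'metric_type': ..., 'value': ..., 'timestamp': ...}, ...].
    """
    while True:
        metrics = await collect_metrics()
        for exporter in exporters:
            if exporter.enabled:
                await exporter.export_metrics(metrics)
        await asyncio.sleep(interval)
```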
## Event Handlers

### Event Handler Plugin Interface
```python
from abc import abstractmethod
from dataclasses import dataclass
from enum import Enum
from typing import Any, Dict, List, Optional


class EventType(Enum):
    """System event types."""
    APPLICATION_DEPLOYED = "application_deployed"
    APPLICATION_SCALED = "application_scaled"
    APPLICATION_REMOVED = "application_removed"
    INSTANCE_STARTED = "instance_started"
    INSTANCE_STOPPED = "instance_stopped"
    HEALTH_CHECK_FAILED = "health_check_failed"
    SCALING_DECISION = "scaling_decision"


@dataclass
class SystemEvent:
    """System event data."""
    event_type: EventType
    app_name: str
    timestamp: float
    data: Dict[str, Any]
    source: str = "controller"


class EventHandler(Plugin):
    """Base class for event handlers."""

    @abstractmethod
    async def handle_event(self, event: SystemEvent) -> None:
        """Handle a system event."""
        pass

    @abstractmethod
    def get_supported_events(self) -> List[EventType]:
        """Return the list of event types this handler supports."""
        pass


# Example: Slack notification handler
class SlackNotificationPlugin(EventHandler):
    """Send Slack notifications for important events."""

    def __init__(self, config: Dict[str, Any]):
        super().__init__(config)
        self.webhook_url = config.get('webhook_url')
        self.channel = config.get('channel', '#alerts')
        self.severity_levels = config.get('severity_levels', ['error', 'warning'])

    async def initialize(self) -> bool:
        """A webhook URL is required."""
        return self.webhook_url is not None

    async def cleanup(self) -> None:
        """No cleanup needed."""
        pass

    def get_supported_events(self) -> List[EventType]:
        """Events this handler reacts to."""
        return [
            EventType.APPLICATION_DEPLOYED,
            EventType.APPLICATION_SCALED,
            EventType.HEALTH_CHECK_FAILED,
            EventType.SCALING_DECISION
        ]

    async def handle_event(self, event: SystemEvent) -> None:
        """Send a Slack notification for the event."""
        try:
            message = self._format_message(event)
            if message:
                await self._send_slack_message(message)
        except Exception as e:
            self.logger.error(f"Failed to send Slack notification: {e}")

    def _format_message(self, event: SystemEvent) -> Optional[str]:
        """Format the event as a Slack message."""
        if event.event_type == EventType.APPLICATION_DEPLOYED:
            return f"🚀 Application `{event.app_name}` deployed successfully"
        elif event.event_type == EventType.APPLICATION_SCALED:
            old_replicas = event.data.get('old_replicas', 0)
            new_replicas = event.data.get('new_replicas', 0)
            direction = "up" if new_replicas > old_replicas else "down"
            return f"📈 Application `{event.app_name}` scaled {direction}: {old_replicas} → {new_replicas} replicas"
        elif event.event_type == EventType.HEALTH_CHECK_FAILED:
            instance_id = event.data.get('instance_id', 'unknown')
            return f"⚠️ Health check failed for `{event.app_name}` instance `{instance_id}`"
        elif event.event_type == EventType.SCALING_DECISION:
            reason = event.data.get('reason', 'Unknown reason')
            return f"🔄 Scaling decision for `{event.app_name}`: {reason}"
        return None

    async def _send_slack_message(self, message: str) -> None:
        """Post the message to the Slack webhook."""
        import aiohttp

        payload = {
            'channel': self.channel,
            'text': message,
            'username': 'Orchestry',
            'icon_emoji': ':robot_face:'
        }
        async with aiohttp.ClientSession() as session:
            async with session.post(self.webhook_url, json=payload) as response:
                if response.status != 200:
                    self.logger.error(f"Slack API error: {response.status}")
```
## Load Balancer Integrations

### Load Balancer Plugin Interface
```python
from abc import abstractmethod
from typing import Any, Dict, List


class LoadBalancer(Plugin):
    """Base class for load balancer integrations."""

    @abstractmethod
    async def add_upstream(self, app_name: str, instances: List['InstanceRecord']) -> bool:
        """Add upstream configuration for an application."""
        pass

    @abstractmethod
    async def remove_upstream(self, app_name: str) -> bool:
        """Remove upstream configuration."""
        pass

    @abstractmethod
    async def update_upstream(self, app_name: str, instances: List['InstanceRecord']) -> bool:
        """Update upstream configuration."""
        pass

    @abstractmethod
    async def get_upstream_status(self, app_name: str) -> Dict[str, Any]:
        """Get upstream status information."""
        pass


# Example: HAProxy integration
class HAProxyPlugin(LoadBalancer):
    """HAProxy load balancer integration."""

    def __init__(self, config: Dict[str, Any]):
        super().__init__(config)
        self.config_path = config.get('config_path', '/etc/haproxy/haproxy.cfg')
        self.stats_url = config.get('stats_url', 'http://localhost:8404/stats')
        self.reload_command = config.get('reload_command', 'systemctl reload haproxy')

    async def initialize(self) -> bool:
        """Check that HAProxy is available."""
        try:
            import subprocess
            result = subprocess.run(['haproxy', '-v'], capture_output=True)
            return result.returncode == 0
        except FileNotFoundError:
            self.logger.error("HAProxy not found")
            return False

    async def cleanup(self) -> None:
        """No cleanup needed."""
        pass

    async def add_upstream(self, app_name: str, instances: List['InstanceRecord']) -> bool:
        """Add an HAProxy backend for the application."""
        try:
            config = await self._generate_backend_config(app_name, instances)
            await self._update_haproxy_config(app_name, config)
            await self._reload_haproxy()
            return True
        except Exception as e:
            self.logger.error(f"Failed to add HAProxy upstream: {e}")
            return False

    async def remove_upstream(self, app_name: str) -> bool:
        """Remove an HAProxy backend."""
        try:
            await self._remove_backend_config(app_name)
            await self._reload_haproxy()
            return True
        except Exception as e:
            self.logger.error(f"Failed to remove HAProxy upstream: {e}")
            return False

    async def update_upstream(self, app_name: str, instances: List['InstanceRecord']) -> bool:
        """Update an HAProxy backend by regenerating its configuration."""
        return await self.add_upstream(app_name, instances)

    async def get_upstream_status(self, app_name: str) -> Dict[str, Any]:
        """Get HAProxy backend status from the stats endpoint."""
        try:
            import aiohttp
            async with aiohttp.ClientSession() as session:
                async with session.get(f"{self.stats_url};csv") as response:
                    csv_data = await response.text()
                    return self._parse_haproxy_stats(csv_data, app_name)
        except Exception as e:
            self.logger.error(f"Failed to get HAProxy status: {e}")
            return {}

    async def _generate_backend_config(self, app_name: str, instances: List['InstanceRecord']) -> str:
        """Generate the HAProxy backend configuration."""
        config_lines = [
            f"backend {app_name}_backend",
            "    balance roundrobin",
            "    option httpchk GET /health"
        ]
        for i, instance in enumerate(instances):
            # Only route traffic to running, healthy instances
            if instance.status == 'running' and instance.health_status == 'healthy':
                config_lines.append(
                    f"    server {app_name}-{i} {instance.ip}:{instance.port} check"
                )
        return '\n'.join(config_lines)

    # The _update_haproxy_config, _remove_backend_config, _reload_haproxy,
    # and _parse_haproxy_stats helpers are omitted here for brevity.
```
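For an app named `web` with two healthy instances (addresses assumed for illustration), `_generate_backend_config` returns a backend stanza like:

```
backend web_backend
    balance roundrobin
    option httpchk GET /health
    server web-0 10.0.0.5:8000 check
    server web-1 10.0.0.6:8000 check
```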
## Plugin Configuration

### Configuration Management
```python
# Plugin configuration in the main Orchestry config
PLUGIN_CONFIG = {
    'plugins': {
        'ml_scaler': {
            'enabled': True,
            'model_path': '/opt/orchestry/models/scaling_model.pkl',
            'prediction_window': 300,
            'confidence_threshold': 0.7
        },
        'prometheus_exporter': {
            'enabled': True,
            'gateway_url': 'http://localhost:9091',
            'job_name': 'orchestry',
            'export_interval': 30
        },
        'slack_notifications': {
            'enabled': True,
            'webhook_url': 'https://hooks.slack.com/services/...',
            'channel': '#orchestry-alerts',
            'severity_levels': ['error', 'warning']
        },
        'grpc_health_check': {
            'enabled': True,
            'timeout': 5.0,
            'service_name': 'health'
        }
    }
}


class ExtensibleController:
    """Controller with plugin support."""

    def __init__(self, config: Dict[str, Any]):
        self.config = config
        self.plugin_manager = PluginManager()
        self.custom_scalers: List[CustomScaler] = []
        self.health_check_providers: Dict[str, HealthCheckProvider] = {}
        self.metrics_exporters: List[MetricsExporter] = []
        self.event_handlers: List[EventHandler] = []

    async def initialize_plugins(self):
        """Initialize all configured plugins."""
        plugin_configs = self.config.get('plugins', {})
        for plugin_name, plugin_config in plugin_configs.items():
            if plugin_config.get('enabled', False):
                try:
                    await self.plugin_manager.load_plugin(plugin_name, plugin_config)
                    plugin = self.plugin_manager.get_plugin(plugin_name)
                    if plugin is None:  # load failed; skip registration
                        continue

                    # Register the plugin with the appropriate manager
                    if isinstance(plugin, CustomScaler):
                        self.custom_scalers.append(plugin)
                    elif isinstance(plugin, HealthCheckProvider):
                        self.health_check_providers[plugin.get_protocol()] = plugin
                    elif isinstance(plugin, MetricsExporter):
                        self.metrics_exporters.append(plugin)
                    elif isinstance(plugin, EventHandler):
                        self.event_handlers.append(plugin)
                except Exception as e:
                    logging.error(f"Failed to load plugin {plugin_name}: {e}")

    async def emit_event(self, event: SystemEvent):
        """Emit an event to all registered handlers."""
        for handler in self.event_handlers:
            if event.event_type in handler.get_supported_events():
                try:
                    await handler.handle_event(event)
                except Exception as e:
                    logging.error(f"Event handler error: {e}")

    async def export_metrics(self, metrics: List[Dict[str, Any]]):
        """Export metrics using all registered exporters."""
        for exporter in self.metrics_exporters:
            try:
                await exporter.export_metrics(metrics)
            except Exception as e:
                logging.error(f"Metrics export error: {e}")
```
## Related Documentation
### User Guide (`docs/user-guide/`)
- Quick Start - Get up and running quickly
- CLI Reference - Complete command-line interface documentation
- Application Specification - YAML/JSON app configuration format
- API Reference - REST API endpoints and usage
- Configuration - System configuration options
- Troubleshooting - Common issues and solutions
### Developer Guide (`docs/developer-guide/`)
- Architecture - System design and component overview
- Components - Detailed component documentation
- Scaling - Auto-scaling algorithms and implementation
- Database - Schema, queries, and data management
- Health Monitoring - Health checks and failure detection
- Load Balancing - Nginx integration and traffic management
- Development - Development environment and workflows
- Extensions - Plugin system and extensibility
### Examples (`docs/examples/`)
- Applications - Sample application configurations
### Main Documentation (`docs/`)
- README.md - Documentation overview and navigation
Together, these pages cover both deploying applications on Orchestry and understanding or extending its internals, with practical examples, troubleshooting guides, and implementation details for deployment and ongoing maintenance.