Scaling Algorithm¶
Deep dive into Orchestry's multi-metric auto-scaling algorithm, policies, and decision-making process.
Overview¶
Orchestry uses a multi-metric scaling algorithm that weighs CPU utilization, memory usage, request rate, response latency, and active connections when deciding whether to add or remove replicas. The algorithm is designed to be predictive, stable, and configurable.
Core Algorithm¶
Scaling Decision Flow¶
graph TD
A[Collect Metrics] --> B[Calculate Scale Factors]
B --> C{In Cooldown?}
C -->|Yes| D[No Scaling]
C -->|No| E[Evaluate Thresholds]
E --> F{Any Metric > Scale Out Threshold?}
F -->|Yes| G[Calculate Scale Out Target]
F -->|No| H{All Metrics < Scale In Threshold?}
H -->|Yes| I[Calculate Scale In Target]
H -->|No| J[No Scaling Needed]
G --> K[Apply Constraints]
I --> K
K --> L[Execute Scaling]
L --> M[Log Decision]
D --> M
J --> M
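Putting the flow above into code, one pass of the per-application evaluation loop might look like the sketch below. It reuses the helpers described on the rest of this page (aggregate_metrics, calculate_scale_factors, is_in_cooldown, should_scale_out, should_scale_in, the target calculators, and execute_scaling); the surrounding wiring is illustrative, not the exact implementation.

def evaluate_app(self, app_name: str, policy: ScalingPolicy) -> None:
    """Illustrative sketch of one pass through the scaling decision flow."""
    # 1. Collect and aggregate metrics over the policy window
    metrics = self.aggregate_metrics(app_name, policy.window_seconds)
    factors = self.calculate_scale_factors(metrics, policy)

    # 2. Respect the cooldown period before considering any change
    if self.is_in_cooldown(app_name, policy.cooldown_seconds):
        return

    current = metrics.healthy_replicas
    target = current

    # 3. Evaluate thresholds: any hot metric scales out; all-cold metrics scale in
    scale_out, reason, _ = self.should_scale_out(factors, policy)
    if scale_out:
        target = self.calculate_scale_out_target(current, factors, policy)
    else:
        scale_in, reason = self.should_scale_in(factors, policy, current)
        if scale_in:
            target = self.calculate_scale_in_target(current, factors, policy)

    # 4. Execute and record the decision if the target changed
    if target != current:
        self.execute_scaling(app_name, target)  # assumed to be awaited elsewhere; sketch only
        self.update_cooldown_timer(app_name)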
Scale Factor Calculation¶
Each metric is normalized to a scale factor representing resource pressure:
def calculate_scale_factors(self, metrics: ScalingMetrics, policy: ScalingPolicy) -> Dict[str, float]:
    """
    Calculate normalized scale factors for each metric.

    A factor of 1.0 means the metric is at its target.
    A factor > 1.0 indicates pressure (potential scale out).
    A factor < 1.0 indicates underutilization (potential scale in).
    """
    factors = {}

    # CPU Utilization Factor
    # Target: policy.max_cpu_percent (default: 70%)
    if policy.max_cpu_percent > 0:
        factors['cpu'] = metrics.cpu_percent / policy.max_cpu_percent

    # Memory Utilization Factor
    # Target: policy.max_memory_percent (default: 75%)
    if policy.max_memory_percent > 0:
        factors['memory'] = metrics.memory_percent / policy.max_memory_percent

    # Requests Per Second Factor
    # Target: policy.target_rps_per_replica RPS per healthy replica
    if policy.target_rps_per_replica > 0 and metrics.healthy_replicas > 0:
        current_rps_per_replica = metrics.rps / metrics.healthy_replicas
        factors['rps'] = current_rps_per_replica / policy.target_rps_per_replica

    # Response Latency Factor
    # Target: policy.max_p95_latency_ms (default: 250ms)
    if policy.max_p95_latency_ms > 0:
        factors['latency'] = metrics.p95_latency_ms / policy.max_p95_latency_ms

    # Connection Count Factor
    # Target: policy.max_conn_per_replica connections per healthy replica
    if policy.max_conn_per_replica > 0 and metrics.healthy_replicas > 0:
        current_conn_per_replica = metrics.active_connections / metrics.healthy_replicas
        factors['connections'] = current_conn_per_replica / policy.max_conn_per_replica

    return factors
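As a quick illustration, suppose an application with 2 healthy replicas reports 49% CPU, 45% memory, 120 RPS, 150 ms p95 latency, and 90 active connections under the default policy. The numbers below are hypothetical, and autoscaler stands in for an instance of the scaler class; the field names follow how ScalingMetrics is used on this page:

policy = ScalingPolicy()        # defaults: 70% CPU, 75% memory, 50 RPS/replica, 250 ms p95, 80 conn/replica
metrics = ScalingMetrics(
    cpu_percent=49.0,
    memory_percent=45.0,
    rps=120.0,
    p95_latency_ms=150.0,
    active_connections=90,
    healthy_replicas=2,
)

factors = autoscaler.calculate_scale_factors(metrics, policy)
# cpu:         49 / 70         = 0.70
# memory:      45 / 75         = 0.60
# rps:         (120 / 2) / 50  = 1.20  <- only factor above the 0.80 scale-out threshold
# latency:     150 / 250       = 0.60
# connections: (90 / 2) / 80   = 0.56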
Scaling Policies¶
Policy Structure¶
@dataclass
class ScalingPolicy:
    """Comprehensive scaling policy configuration."""

    # Replica bounds
    min_replicas: int = 1
    max_replicas: int = 5

    # Performance targets
    target_rps_per_replica: int = 50   # Target requests/sec per replica
    max_p95_latency_ms: int = 250      # Maximum 95th percentile latency
    max_conn_per_replica: int = 80     # Maximum connections per replica
    max_cpu_percent: float = 70.0      # Maximum CPU utilization %
    max_memory_percent: float = 75.0   # Maximum memory utilization %

    # Scaling thresholds
    scale_out_threshold_pct: int = 80  # Scale out when factor > 80% of target
    scale_in_threshold_pct: int = 30   # Scale in when factor < 30% of target

    # Timing configuration
    window_seconds: int = 60           # Metrics evaluation window
    cooldown_seconds: int = 180        # Minimum time between scaling events

    # Advanced settings
    max_scale_out_step: int = 0        # Max replicas to add (0 = unlimited)
    max_scale_in_step: int = 1         # Max replicas to remove per operation
    stabilization_window_seconds: int = 300  # Time to wait for stability

    # Metric weights (future enhancement)
    cpu_weight: float = 1.0
    memory_weight: float = 1.0
    rps_weight: float = 1.0
    latency_weight: float = 1.5        # Latency gets higher priority
    connection_weight: float = 0.8
Policy Examples¶
Latency-Sensitive Application¶
latency_sensitive_policy = ScalingPolicy(
    min_replicas=3,                    # Always maintain 3 replicas
    max_replicas=20,                   # Can scale up to 20
    target_rps_per_replica=30,         # Lower RPS target
    max_p95_latency_ms=100,            # Strict 100ms latency limit
    max_cpu_percent=60.0,              # Conservative CPU target
    scale_out_threshold_pct=70,        # Scale out early
    scale_in_threshold_pct=20,         # Scale in conservatively
    cooldown_seconds=120,              # Faster scaling decisions
    stabilization_window_seconds=180   # Shorter stability window
)
Batch Processing Application¶
batch_processing_policy = ScalingPolicy(
    min_replicas=1,                    # Keep a single replica at idle
    max_replicas=50,                   # High maximum for bursts
    target_rps_per_replica=200,        # Higher RPS tolerance
    max_p95_latency_ms=2000,           # More latency tolerance
    max_cpu_percent=90.0,              # High CPU utilization OK
    max_memory_percent=85.0,           # High memory utilization OK
    scale_out_threshold_pct=85,        # Scale out later
    scale_in_threshold_pct=15,         # Scale in only when mostly idle
    cooldown_seconds=300,              # Slower scaling decisions
    max_scale_out_step=5               # Scale out in larger steps
)
Development Environment¶
dev_policy = ScalingPolicy(
    min_replicas=1,
    max_replicas=3,                    # Limited scaling for dev
    target_rps_per_replica=10,         # Low traffic expected
    max_p95_latency_ms=1000,           # Relaxed latency
    max_cpu_percent=80.0,
    scale_out_threshold_pct=90,        # Scale out very late
    scale_in_threshold_pct=10,         # Scale in only when nearly idle
    cooldown_seconds=600,              # Long cooldown for stability
    stabilization_window_seconds=600
)
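If you keep per-environment policies like these side by side, a simple mapping makes it easy to pick one at deploy time. The POLICIES dictionary and select_policy helper below are illustrative conveniences, not part of Orchestry's API:

# Hypothetical helper for choosing a policy by environment name
POLICIES = {
    "latency-sensitive": latency_sensitive_policy,
    "batch": batch_processing_policy,
    "dev": dev_policy,
}

def select_policy(environment: str) -> ScalingPolicy:
    """Return the policy for an environment, falling back to library defaults."""
    return POLICIES.get(environment, ScalingPolicy())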
Scaling Decision Logic¶
Scale Out Decision¶
def should_scale_out(self, scale_factors: Dict[str, float], policy: ScalingPolicy) -> Tuple[bool, str, List[str]]:
    """
    Determine if scale out is needed.

    Scale out if ANY metric exceeds the scale out threshold.
    This ensures responsive scaling when any resource becomes constrained.
    """
    threshold = policy.scale_out_threshold_pct / 100.0
    triggered_metrics = []

    for metric_name, factor in scale_factors.items():
        if factor > threshold:
            triggered_metrics.append(metric_name)

    if triggered_metrics:
        # Find the most constrained metric
        max_metric = max(scale_factors.items(), key=lambda x: x[1])
        reason = f"Scale out triggered by {max_metric[0]}: {max_metric[1]:.2f}x target"
        return True, reason, triggered_metrics

    return False, "All metrics below scale out threshold", []
Scale In Decision¶
def should_scale_in(self, scale_factors: Dict[str, float], policy: ScalingPolicy, current_replicas: int) -> Tuple[bool, str]:
    """
    Determine if scale in is needed.

    Scale in only if ALL metrics are below the scale in threshold.
    This ensures we don't scale in prematurely while any resource is under pressure.
    """
    if current_replicas <= policy.min_replicas:
        return False, f"Already at minimum replicas ({policy.min_replicas})"

    threshold = policy.scale_in_threshold_pct / 100.0

    for metric_name, factor in scale_factors.items():
        if factor > threshold:
            return False, f"Cannot scale in: {metric_name} factor {factor:.2f} > threshold {threshold:.2f}"

    # All metrics are below threshold
    max_factor = max(scale_factors.values()) if scale_factors else 0
    reason = f"Scale in: All metrics below threshold (max factor: {max_factor:.2f})"
    return True, reason
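The two checks are deliberately asymmetric: one hot metric is enough to scale out, but every metric must be cold before scaling in. With the default thresholds (80% out, 30% in) and the hypothetical factors below (autoscaler again stands in for a scaler instance):

factors = {'cpu': 0.55, 'memory': 0.40, 'rps': 0.95, 'latency': 0.50, 'connections': 0.35}
policy = ScalingPolicy()  # scale_out_threshold_pct=80, scale_in_threshold_pct=30

out, reason, triggered = autoscaler.should_scale_out(factors, policy)
# out == True, triggered == ['rps']  (0.95 > 0.80)

ok, reason = autoscaler.should_scale_in(factors, policy, current_replicas=4)
# ok == False: 'cpu' factor 0.55 > threshold 0.30, so scale in is blocked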
Target Replica Calculation¶
Scale Out Target¶
def calculate_scale_out_target(self, current_replicas: int, scale_factors: Dict[str, float],
                               policy: ScalingPolicy) -> int:
    """
    Calculate target replicas for scale out operation.

    Uses the highest scale factor to determine how many replicas are needed
    to bring all metrics back within acceptable ranges.
    """
    if not scale_factors:
        return current_replicas + 1

    # Find the metric with highest pressure
    max_factor = max(scale_factors.values())

    # Calculate theoretical target based on max factor
    # If factor is 1.5, we need 50% more capacity
    theoretical_target = math.ceil(current_replicas * max_factor)

    # Apply step limits
    if policy.max_scale_out_step > 0:
        max_increase = policy.max_scale_out_step
        theoretical_target = min(theoretical_target, current_replicas + max_increase)

    # Conservative scaling: don't increase by more than 100% at once
    max_conservative = current_replicas * 2
    theoretical_target = min(theoretical_target, max_conservative)

    # Apply absolute maximum
    final_target = min(theoretical_target, policy.max_replicas)

    # Ensure we're actually scaling out
    return max(final_target, current_replicas + 1)
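For example, with 4 replicas and a worst-case factor of 1.6 under the default policy limits, the constraints apply like this:

# 4 replicas, worst factor 1.6 (e.g. RPS at 160% of the per-replica target)
# theoretical_target = ceil(4 * 1.6) = 7
# max_scale_out_step = 0 (unlimited) -> no step cap applied
# conservative cap   = 4 * 2 = 8     -> 7 stays
# max_replicas       = 5 (default)   -> clamped to 5
target = autoscaler.calculate_scale_out_target(4, {'rps': 1.6}, ScalingPolicy())
assert target == 5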
Scale In Target¶
def calculate_scale_in_target(self, current_replicas: int, scale_factors: Dict[str, float],
                              policy: ScalingPolicy) -> int:
    """
    Calculate target replicas for scale in operation.

    Uses a conservative approach - remove one replica at a time unless
    utilization is extremely low.
    """
    if current_replicas <= policy.min_replicas:
        return current_replicas

    # Default: scale in by 1 replica
    target = current_replicas - 1

    # If utilization is very low, consider scaling in more aggressively
    if scale_factors:
        max_factor = max(scale_factors.values())
        if max_factor < 0.1:  # Less than 10% utilization
            # Can scale in more aggressively
            theoretical_target = math.floor(current_replicas * max_factor * 2)  # 2x buffer
            target = max(theoretical_target, policy.min_replicas)

    # Apply step limits
    if policy.max_scale_in_step > 0:
        max_decrease = policy.max_scale_in_step
        target = max(target, current_replicas - max_decrease)

    # Ensure we don't go below minimum
    return max(target, policy.min_replicas)
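The same constraints apply on the way down. With 6 replicas, a worst-case factor of 0.08, and the default policy (min_replicas=1, max_scale_in_step=1), the step limit keeps the change to one replica per pass:

policy = ScalingPolicy()  # min_replicas=1, max_scale_in_step=1
target = autoscaler.calculate_scale_in_target(6, {'cpu': 0.08, 'rps': 0.05}, policy)
# aggressive path: floor(6 * 0.08 * 2) = 0 -> clamped to min_replicas = 1
# step limit:      max(1, 6 - 1)       = 5 -> only one replica removed this pass
assert target == 5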
Cooldown and Stability¶
Cooldown Management¶
def is_in_cooldown(self, app_name: str, cooldown_seconds: int) -> bool:
    """
    Check if application is in cooldown period.

    Cooldown prevents rapid scaling oscillations by enforcing a
    minimum time between scaling operations.
    """
    last_scale_time = self.last_scale_time.get(app_name, 0)
    return time.time() - last_scale_time < cooldown_seconds


def update_cooldown_timer(self, app_name: str):
    """Update the last scaling time for cooldown calculation."""
    self.last_scale_time[app_name] = time.time()
Stability Window¶
def is_system_stable(self, app_name: str, policy: ScalingPolicy) -> bool:
    """
    Check if the system has been stable for the required period.

    The stability window ensures the system has had time to adapt to
    previous scaling decisions before making new ones.
    """
    # Get recent scaling events
    recent_events = self.get_recent_scaling_events(
        app_name,
        since=time.time() - policy.stabilization_window_seconds
    )

    # System is stable if no scaling events in the stability window
    return len(recent_events) == 0
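Both gates sit in front of every decision: a scaling action is only considered when the app is out of cooldown and no scaling events fall inside the stabilization window. A minimal guard combining the two helpers above might look like this (illustrative sketch):

def can_scale_now(self, app_name: str, policy: ScalingPolicy) -> bool:
    """Illustrative guard combining the cooldown and stability checks."""
    if self.is_in_cooldown(app_name, policy.cooldown_seconds):
        return False  # too soon after the last scaling operation
    if not self.is_system_stable(app_name, policy):
        return False  # still settling from a recent scaling event
    return True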
Metrics Collection and Analysis¶
Metrics Aggregation¶
def aggregate_metrics(self, app_name: str, window_seconds: int) -> ScalingMetrics:
    """
    Aggregate metrics over the specified time window.

    Uses different aggregation methods for different metrics:
    - CPU/Memory: Average over window
    - RPS: Rate calculation over window
    - Latency: 95th percentile over window
    - Connections: Current value (latest)
    """
    cutoff_time = time.time() - window_seconds
    history = self.metrics_history[app_name]

    # Filter metrics within window
    cpu_points = [p for p in history['cpu'] if p.timestamp >= cutoff_time]
    memory_points = [p for p in history['memory'] if p.timestamp >= cutoff_time]
    rps_points = [p for p in history['rps'] if p.timestamp >= cutoff_time]
    latency_points = [p for p in history['latency'] if p.timestamp >= cutoff_time]
    connection_points = [p for p in history['connections'] if p.timestamp >= cutoff_time]
    replica_points = [p for p in history['healthy_replicas'] if p.timestamp >= cutoff_time]

    # Calculate aggregated values
    metrics = ScalingMetrics()

    if cpu_points:
        metrics.cpu_percent = statistics.mean([p.value for p in cpu_points])

    if memory_points:
        metrics.memory_percent = statistics.mean([p.value for p in memory_points])

    if rps_points and len(rps_points) >= 2:
        # Calculate rate from first and last points
        first_point = rps_points[0]
        last_point = rps_points[-1]
        time_diff = last_point.timestamp - first_point.timestamp
        if time_diff > 0:
            metrics.rps = (last_point.value - first_point.value) / time_diff

    if latency_points:
        # Use 95th percentile
        latencies = sorted([p.value for p in latency_points])
        percentile_idx = int(0.95 * len(latencies))
        metrics.p95_latency_ms = latencies[percentile_idx]

    if connection_points:
        # Use latest value
        metrics.active_connections = connection_points[-1].value

    if replica_points:
        # Use latest healthy replica count
        metrics.healthy_replicas = int(replica_points[-1].value)

    metrics.total_replicas = self.get_total_replicas(app_name)
    return metrics
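The aggregation code above assumes timestamped samples and an aggregate snapshot roughly shaped like the dataclasses below. The field set is inferred from how the types are used on this page, so treat this as a reference sketch rather than the canonical definitions:

from dataclasses import dataclass

@dataclass
class MetricPoint:
    """A single timestamped sample for one metric."""
    timestamp: float   # Unix time the sample was taken
    value: float       # Sample value (percent, count, ms, ...)

@dataclass
class ScalingMetrics:
    """Aggregated view of an application over the evaluation window."""
    cpu_percent: float = 0.0
    memory_percent: float = 0.0
    rps: float = 0.0
    p95_latency_ms: float = 0.0
    active_connections: float = 0.0
    healthy_replicas: int = 0
    total_replicas: int = 0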
Smoothing and Filtering¶
def apply_smoothing(self, current_metrics: ScalingMetrics,
                    previous_metrics: ScalingMetrics, alpha: float = 0.3) -> ScalingMetrics:
    """
    Apply exponential smoothing to reduce noise in metrics.

    Uses an exponentially weighted moving average:
    smoothed_value = alpha * current + (1 - alpha) * previous
    """
    if not previous_metrics:
        return current_metrics

    smoothed = ScalingMetrics()
    smoothed.cpu_percent = alpha * current_metrics.cpu_percent + (1 - alpha) * previous_metrics.cpu_percent
    smoothed.memory_percent = alpha * current_metrics.memory_percent + (1 - alpha) * previous_metrics.memory_percent
    smoothed.rps = alpha * current_metrics.rps + (1 - alpha) * previous_metrics.rps
    smoothed.p95_latency_ms = alpha * current_metrics.p95_latency_ms + (1 - alpha) * previous_metrics.p95_latency_ms
    smoothed.active_connections = current_metrics.active_connections  # Don't smooth discrete values
    smoothed.healthy_replicas = current_metrics.healthy_replicas
    smoothed.total_replicas = current_metrics.total_replicas
    return smoothed
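With the default alpha of 0.3, a sudden spike is damped rather than acted on immediately. For example:

# previous smoothed CPU: 50%, new raw sample: 90%
smoothed_cpu = 0.3 * 90.0 + 0.7 * 50.0   # = 62.0%
# a one-off spike moves the smoothed value by less than a third of the jump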
Advanced Features¶
Predictive Scaling¶
def predict_future_load(self, app_name: str, lookahead_minutes: int = 5) -> ScalingMetrics:
    """
    Predict future load based on historical trends.

    Uses linear regression on recent metrics to predict future values.
    This enables proactive scaling before resources become constrained.
    """
    history = self.metrics_history[app_name]
    current_time = time.time()
    lookback_seconds = lookahead_minutes * 60 * 4  # Look back 4x the prediction window

    # Get recent RPS data points
    rps_points = [p for p in history['rps']
                  if p.timestamp >= current_time - lookback_seconds]

    if len(rps_points) < 10:  # Need sufficient data
        return self.get_current_metrics(app_name)

    # Simple linear trend calculation (least squares)
    x_values = [p.timestamp - current_time for p in rps_points]
    y_values = [p.value for p in rps_points]

    # Calculate trend (slope)
    n = len(x_values)
    sum_x = sum(x_values)
    sum_y = sum(y_values)
    sum_xy = sum(x * y for x, y in zip(x_values, y_values))
    sum_x2 = sum(x * x for x in x_values)

    denominator = n * sum_x2 - sum_x * sum_x
    if denominator == 0:  # All samples share one timestamp; no trend to fit
        return self.get_current_metrics(app_name)

    slope = (n * sum_xy - sum_x * sum_y) / denominator
    intercept = (sum_y - slope * sum_x) / n

    # Predict future RPS
    future_time = lookahead_minutes * 60
    predicted_rps = slope * future_time + intercept

    # Create predicted metrics
    current_metrics = self.get_current_metrics(app_name)
    predicted_metrics = ScalingMetrics(
        rps=max(predicted_rps, 0),  # RPS can't be negative
        cpu_percent=current_metrics.cpu_percent,
        memory_percent=current_metrics.memory_percent,
        p95_latency_ms=current_metrics.p95_latency_ms,
        active_connections=current_metrics.active_connections,
        healthy_replicas=current_metrics.healthy_replicas,
        total_replicas=current_metrics.total_replicas
    )
    return predicted_metrics
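The predicted snapshot can be pushed through the same decision path as live metrics, so capacity is added before the trend crosses a threshold. A sketch reusing the helpers above (not the exact implementation):

def evaluate_predictive(self, app_name: str, policy: ScalingPolicy) -> None:
    """Illustrative proactive check using the lookahead prediction."""
    predicted = self.predict_future_load(app_name, lookahead_minutes=5)
    factors = self.calculate_scale_factors(predicted, policy)

    scale_out, reason, _ = self.should_scale_out(factors, policy)
    if scale_out and not self.is_in_cooldown(app_name, policy.cooldown_seconds):
        current = predicted.healthy_replicas
        target = self.calculate_scale_out_target(current, factors, policy)
        self.execute_scaling(app_name, target)  # sketch; the real call may be async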
Custom Scaling Strategies¶
class ScalingStrategy:
    """Base class for custom scaling strategies."""

    def should_scale(self, metrics: ScalingMetrics, policy: ScalingPolicy) -> ScalingDecision:
        raise NotImplementedError


class ConservativeStrategy(ScalingStrategy):
    """Conservative scaling strategy - prefers stability over responsiveness."""

    def should_scale(self, metrics: ScalingMetrics, policy: ScalingPolicy) -> ScalingDecision:
        # Require multiple metrics to be over threshold
        scale_factors = calculate_scale_factors(metrics, policy)
        over_threshold = sum(1 for f in scale_factors.values()
                             if f > policy.scale_out_threshold_pct / 100)
        if over_threshold >= 2:  # Require at least 2 metrics
            return ScalingDecision(should_scale=True, direction='out')

        # Scale in only if all metrics are very low (half the normal scale-in threshold)
        if all(f < policy.scale_in_threshold_pct / 200 for f in scale_factors.values()):
            return ScalingDecision(should_scale=True, direction='in')

        return ScalingDecision(should_scale=False)


class AggressiveStrategy(ScalingStrategy):
    """Aggressive scaling strategy - prioritizes performance over cost."""

    def __init__(self):
        self.low_utilization_since = None  # When utilization first dropped below the scale-in level

    def should_scale(self, metrics: ScalingMetrics, policy: ScalingPolicy) -> ScalingDecision:
        scale_factors = calculate_scale_factors(metrics, policy)

        # Scale out if any metric is merely approaching the threshold (at 2/3 of it)
        if any(f > policy.scale_out_threshold_pct / 150 for f in scale_factors.values()):
            self.low_utilization_since = None
            return ScalingDecision(should_scale=True, direction='out')

        # Scale in only if all metrics stay very low for an extended period
        if all(f < policy.scale_in_threshold_pct / 100 for f in scale_factors.values()):
            if self.low_utilization_since is None:
                self.low_utilization_since = time.time()
            elif time.time() - self.low_utilization_since > 600:  # 10 minutes
                return ScalingDecision(should_scale=True, direction='in')
        else:
            self.low_utilization_since = None

        return ScalingDecision(should_scale=False)
Performance Optimization¶
Efficient Metrics Storage¶
class RingBuffer:
    """Memory-efficient ring buffer for metrics storage."""

    def __init__(self, max_size: int = 1000):
        self.max_size = max_size
        self.buffer = [None] * max_size
        self.head = 0
        self.size = 0

    def append(self, item):
        self.buffer[self.head] = item
        self.head = (self.head + 1) % self.max_size
        if self.size < self.max_size:
            self.size += 1

    def get_recent(self, count: int):
        if count >= self.size:
            # Return all items, oldest first
            if self.size < self.max_size:
                return [item for item in self.buffer[:self.size] if item is not None]
            else:
                return self.buffer[self.head:] + self.buffer[:self.head]
        else:
            # Return the most recent `count` items, oldest first
            items = []
            pos = (self.head - 1) % self.max_size
            for _ in range(count):
                if self.buffer[pos] is not None:
                    items.append(self.buffer[pos])
                pos = (pos - 1) % self.max_size
            return list(reversed(items))
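In practice each collector keeps one buffer per metric and appends a sample on every scrape. A small illustrative example, using the MetricPoint shape sketched earlier:

import time

cpu_history = RingBuffer(max_size=1000)

# Append one sample per scrape
cpu_history.append(MetricPoint(timestamp=time.time(), value=63.0))
cpu_history.append(MetricPoint(timestamp=time.time(), value=71.5))

# Fetch the last few samples, oldest first, for windowed aggregation
recent = cpu_history.get_recent(60)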
Batch Scaling Operations¶
async def batch_scale_operations(self, scaling_decisions: List[Tuple[str, int]]):
    """
    Execute multiple scaling operations in parallel.

    Improves performance when scaling multiple applications simultaneously.
    """
    tasks = []
    for app_name, target_replicas in scaling_decisions:
        task = asyncio.create_task(self.execute_scaling(app_name, target_replicas))
        tasks.append(task)

    # Execute all scaling operations concurrently
    results = await asyncio.gather(*tasks, return_exceptions=True)

    # Handle results and exceptions
    for i, result in enumerate(results):
        app_name, target_replicas = scaling_decisions[i]
        if isinstance(result, Exception):
            logger.error(f"Scaling failed for {app_name}: {result}")
        else:
            logger.info(f"Successfully scaled {app_name} to {target_replicas} replicas")
Monitoring and Debugging¶
Scaling Decision Logging¶
def log_scaling_decision(self, app_name: str, decision: ScalingDecision):
    """Log detailed scaling decision for debugging and analysis."""
    log_data = {
        'app_name': app_name,
        'timestamp': time.time(),
        'should_scale': decision.should_scale,
        'current_replicas': decision.current_replicas,
        'target_replicas': decision.target_replicas,
        'reason': decision.reason,
        'triggered_by': decision.triggered_by,
        'metrics': {
            'cpu_percent': decision.metrics.cpu_percent,
            'memory_percent': decision.metrics.memory_percent,
            'rps': decision.metrics.rps,
            'latency_p95_ms': decision.metrics.p95_latency_ms,
            'active_connections': decision.metrics.active_connections
        } if decision.metrics else None,
        'scale_factors': self.last_scale_factors.get(app_name, {})
    }

    # Store in database for analysis
    asyncio.create_task(
        self.state_store.log_event(
            app_name=app_name,
            event_type='scaling_decision',
            message=decision.reason,
            details=log_data
        )
    )
Performance Metrics¶
def get_scaling_performance_metrics(self, app_name: str) -> dict:
    """Get performance metrics for the scaling algorithm."""
    decisions = self.get_recent_scaling_decisions(app_name, hours=24)

    total_decisions = len(decisions)
    scale_out_decisions = len([d for d in decisions if d.target_replicas > d.current_replicas])
    scale_in_decisions = len([d for d in decisions if d.target_replicas < d.current_replicas])
    no_scale_decisions = total_decisions - scale_out_decisions - scale_in_decisions

    # Calculate average response time to scaling needs
    response_times = []
    for decision in decisions:
        if decision.should_scale:
            # Time from when scaling was needed to when it was decided
            response_time = self.calculate_response_time(app_name, decision.timestamp)
            if response_time:
                response_times.append(response_time)

    return {
        'total_decisions': total_decisions,
        'scale_out_count': scale_out_decisions,
        'scale_in_count': scale_in_decisions,
        'no_scale_count': no_scale_decisions,
        'avg_response_time_seconds': statistics.mean(response_times) if response_times else 0,
        'scaling_efficiency': self.calculate_scaling_efficiency(app_name),
        'oscillation_rate': self.calculate_oscillation_rate(app_name)
    }
Next Steps: Learn about Health Monitoring and how health checks integrate with scaling decisions.