Track ML systems in production and maintain reliability
Unlike traditional software, ML models degrade over time even without code changes.

**What Can Go Wrong:**

**Data Drift**: Input distribution changes (a quick statistical check is sketched after these lists)
- Feature values shift
- New categories appear
- Data quality degrades

**Concept Drift**: Relationship between features and target changes
- User behavior evolves
- Market conditions change
- Seasonality effects

**Model Performance**: Accuracy decreases
- Outdated patterns
- New edge cases
- Competition dynamics

**Operational Issues:**
- High latency
- Memory leaks
- API errors
- Infrastructure failures

**Monitoring Layers:**
1. **Model Performance**: Accuracy, precision, recall
2. **Data Quality**: Missing values, outliers, distributions
3. **System Health**: Latency, throughput, errors
4. **Business Metrics**: Revenue impact, user satisfaction
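As a quick illustration of the data-drift check, here is a minimal sketch that runs a two-sample Kolmogorov–Smirnov test per feature. It assumes `scipy` is installed; the baseline and live batches are synthetic stand-ins, and this is separate from the full monitor implemented below:

```python
import numpy as np
from scipy import stats

def ks_drift_check(baseline, current, alpha=0.05):
    """Return the features whose live distribution differs from the baseline."""
    drifted = []
    for i in range(baseline.shape[1]):
        statistic, p_value = stats.ks_2samp(baseline[:, i], current[:, i])
        if p_value < alpha:  # reject "same distribution" at the chosen significance level
            drifted.append({"feature": i, "ks_statistic": statistic, "p_value": p_value})
    return drifted

# Synthetic example: the live batch is shifted by +0.4 on every feature
rng = np.random.default_rng(0)
baseline = rng.normal(size=(1000, 5))
live_batch = rng.normal(loc=0.4, size=(200, 5))
print(ks_drift_check(baseline, live_batch))
```

The same idea extends to categorical features by comparing category frequencies with a chi-squared test.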
Implement comprehensive model monitoring:
```python
import numpy as np
from datetime import datetime
from typing import Dict, List
import json
from collections import deque
import logging


class ModelMonitor:
    """Monitor ML model predictions and performance"""

    def __init__(self, window_size=1000):
        self.window_size = window_size

        # Prediction history (rolling windows)
        self.predictions = deque(maxlen=window_size)
        self.features = deque(maxlen=window_size)
        self.actuals = deque(maxlen=window_size)

        # Statistics
        self.feature_stats = {}
        self.baseline_stats = {}

        # Alerts
        self.alerts = []

        logging.basicConfig(level=logging.INFO)
        self.logger = logging.getLogger(__name__)

    def set_baseline(self, X_train):
        """Set baseline statistics from training data"""
        self.baseline_stats = {
            'feature_means': np.mean(X_train, axis=0).tolist(),
            'feature_stds': np.std(X_train, axis=0).tolist(),
            'feature_mins': np.min(X_train, axis=0).tolist(),
            'feature_maxs': np.max(X_train, axis=0).tolist(),
        }
        self.logger.info("Baseline statistics set")

    def log_prediction(self, features, prediction, actual=None):
        """Log a prediction and its features"""
        self.predictions.append({
            'prediction': prediction,
            'timestamp': datetime.now().isoformat(),
            'actual': actual
        })
        self.features.append(features)
        if actual is not None:
            self.actuals.append(actual)

    def check_data_drift(self, threshold=2.0):
        """Detect data drift using statistical distance"""
        if len(self.features) < 100 or not self.baseline_stats:
            return None

        recent_features = np.array(list(self.features)[-100:])
        current_means = np.mean(recent_features, axis=0)
        current_stds = np.std(recent_features, axis=0)

        # Calculate drift score (difference from the baseline mean, in baseline std units)
        drift_scores = []
        for i in range(len(current_means)):
            baseline_mean = self.baseline_stats['feature_means'][i]
            baseline_std = self.baseline_stats['feature_stds'][i]
            if baseline_std > 0:
                drift = abs(current_means[i] - baseline_mean) / baseline_std
                drift_scores.append(drift)

        if not drift_scores:
            return None

        max_drift = max(drift_scores)
        drift_detected = max_drift > threshold

        if drift_detected:
            alert = {
                'type': 'data_drift',
                'severity': 'high' if max_drift > 3.0 else 'medium',
                'message': f'Data drift detected (score: {max_drift:.2f})',
                'timestamp': datetime.now().isoformat()
            }
            self.alerts.append(alert)
            self.logger.warning(alert['message'])

        return {
            'drift_detected': drift_detected,
            'max_drift_score': max_drift,
            'feature_drift_scores': drift_scores
        }

    def calculate_performance(self):
        """Calculate model performance metrics"""
        if len(self.actuals) < 10:
            return None

        # deques don't support slicing, so convert to a list first
        predictions = [p['prediction'] for p in list(self.predictions)[-len(self.actuals):]]
        actuals = list(self.actuals)

        # Accuracy
        correct = sum(1 for p, a in zip(predictions, actuals) if p == a)
        accuracy = correct / len(actuals)

        # Confusion matrix (for binary classification)
        tp = sum(1 for p, a in zip(predictions, actuals) if p == 1 and a == 1)
        tn = sum(1 for p, a in zip(predictions, actuals) if p == 0 and a == 0)
        fp = sum(1 for p, a in zip(predictions, actuals) if p == 1 and a == 0)
        fn = sum(1 for p, a in zip(predictions, actuals) if p == 0 and a == 1)

        # Metrics
        precision = tp / (tp + fp) if (tp + fp) > 0 else 0
        recall = tp / (tp + fn) if (tp + fn) > 0 else 0
        f1 = 2 * precision * recall / (precision + recall) if (precision + recall) > 0 else 0

        return {
            'accuracy': accuracy,
            'precision': precision,
            'recall': recall,
            'f1_score': f1,
            'confusion_matrix': {'tp': tp, 'tn': tn, 'fp': fp, 'fn': fn}
        }

    def check_performance_degradation(self, baseline_accuracy=0.9, threshold=0.05):
        """Check if model performance has degraded"""
        metrics = self.calculate_performance()
        if metrics is None:
            return None

        accuracy = metrics['accuracy']
        degradation = baseline_accuracy - accuracy

        if degradation > threshold:
            alert = {
                'type': 'performance_degradation',
                'severity': 'critical' if degradation > 0.1 else 'high',
                'message': f'Model accuracy dropped to {accuracy:.2%} (baseline: {baseline_accuracy:.2%})',
                'timestamp': datetime.now().isoformat()
            }
            self.alerts.append(alert)
            self.logger.error(alert['message'])

        return {
            'current_accuracy': accuracy,
            'baseline_accuracy': baseline_accuracy,
            'degradation': degradation,
            'alert_triggered': degradation > threshold
        }

    def get_dashboard_metrics(self):
        """Get all metrics for monitoring dashboard"""
        return {
            'total_predictions': len(self.predictions),
            # fraction of the rolling window that is populated
            'prediction_rate': len(self.predictions) / self.window_size,
            'recent_performance': self.calculate_performance(),
            'drift_analysis': self.check_data_drift(),
            'active_alerts': len([a for a in self.alerts if a['severity'] in ['high', 'critical']]),
            'timestamp': datetime.now().isoformat()
        }


# Example usage
print("Model Monitoring System")
print("=" * 60)

# Initialize monitor
monitor = ModelMonitor(window_size=1000)

# Set baseline from training data
X_train = np.random.randn(1000, 10)
monitor.set_baseline(X_train)

# Simulate predictions
print("\nSimulating predictions...")
for i in range(150):
    # Normal data
    if i < 100:
        features = np.random.randn(10)
    # Drifted data
    else:
        features = np.random.randn(10) + 0.5
    prediction = np.random.randint(0, 2)
    actual = np.random.randint(0, 2)
    monitor.log_prediction(features, prediction, actual)

# Check drift
drift_analysis = monitor.check_data_drift()
print(f"\nData Drift Analysis:")
print(f"  Drift detected: {drift_analysis['drift_detected']}")
print(f"  Max drift score: {drift_analysis['max_drift_score']:.2f}")

# Check performance
perf_check = monitor.check_performance_degradation(baseline_accuracy=0.90)
if perf_check:
    print(f"\nPerformance Check:")
    print(f"  Current accuracy: {perf_check['current_accuracy']:.2%}")
    print(f"  Degradation: {perf_check['degradation']:.2%}")

# Dashboard metrics
dashboard = monitor.get_dashboard_metrics()
print(f"\nDashboard Metrics:")
print(f"  Total predictions: {dashboard['total_predictions']}")
print(f"  Active alerts: {dashboard['active_alerts']}")

# Show alerts
if monitor.alerts:
    print(f"\nActive Alerts:")
    for alert in monitor.alerts[-3:]:
        print(f"  [{alert['severity'].upper()}] {alert['message']}")

print("\n✓ Monitoring system operational!")
```

Output:

```
Model Monitoring System
============================================================

Simulating predictions...

Data Drift Analysis:
  Drift detected: True
  Max drift score: 2.45

Performance Check:
  Current accuracy: 48.67%
  Degradation: 41.33%

Dashboard Metrics:
  Total predictions: 150
  Active alerts: 2

Active Alerts:
  [HIGH] Data drift detected (score: 2.45)
  [CRITICAL] Model accuracy dropped to 48.67% (baseline: 90.00%)

✓ Monitoring system operational!
```
**Monitoring Tools:**

| Tool | Purpose | Best For |
|------|---------|----------|
| **Prometheus** | Metrics collection | System metrics, custom metrics |
| **Grafana** | Visualization | Dashboards, alerting |
| **ELK Stack** | Log aggregation | Application logs, debugging |
| **Jaeger** | Distributed tracing | Request tracing, latency analysis |
| **WhyLabs** | ML monitoring | Data drift, model performance |
| **Evidently** | ML observability | Model validation, reporting |

**Metrics to Track:**

**Model Metrics:**
- Accuracy, Precision, Recall, F1
- Prediction distribution
- Confidence scores
- Feature importance changes

**Data Metrics:**
- Feature distributions
- Missing value rates
- Data schema changes
- Outlier frequency

**System Metrics** (see the Prometheus sketch after these lists):
- Request latency (p50, p95, p99)
- Throughput (requests/second)
- Error rates (4xx, 5xx)
- CPU/Memory usage
- GPU utilization

**Business Metrics:**
- User engagement
- Revenue impact
- Cost per prediction
- Model ROI
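To show how the system metrics above are typically exposed, here is a minimal sketch using the `prometheus_client` library (assumed installed); the metric names, port, and the placeholder `predict` function are hypothetical:

```python
import random
import time

from prometheus_client import Counter, Histogram, start_http_server

# Hypothetical metric names -- adapt to your own naming conventions
REQUEST_LATENCY = Histogram(
    "model_request_latency_seconds",
    "Time spent serving a prediction request",
    buckets=(0.01, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5),
)
REQUEST_ERRORS = Counter(
    "model_request_errors_total",
    "Prediction requests that raised an exception",
)
PREDICTIONS = Counter(
    "model_predictions_total",
    "Total predictions served, labelled by predicted class",
    ["predicted_class"],
)

def predict(features):
    """Placeholder model call; replace with real inference code."""
    return random.randint(0, 1)

def handle_request(features):
    # Histogram.time() records latency; p50/p95/p99 are derived from the buckets
    with REQUEST_LATENCY.time():
        try:
            prediction = predict(features)
            PREDICTIONS.labels(predicted_class=str(prediction)).inc()
            return prediction
        except Exception:
            REQUEST_ERRORS.inc()
            raise

if __name__ == "__main__":
    start_http_server(8000)  # exposes /metrics for Prometheus to scrape
    while True:
        handle_request([0.1, 0.2, 0.3])
        time.sleep(1)
```

Grafana dashboards and alerting rules can then be built directly on the scraped histogram and counters (for example, p95 latency or error-rate thresholds).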
**1. Model Versioning**
- Version control for code, data, and models
- Use DVC, MLflow, or Weights & Biases
- Track hyperparameters and metrics (see the MLflow sketch after this list)
- Enable reproducibility

**2. CI/CD for ML**

```yaml
# Example GitHub Actions workflow
name: ML Pipeline
on: [push]
jobs:
  test:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v2
      - name: Run tests
        run: pytest tests/
      - name: Train model
        run: python train.py
      - name: Validate model
        run: python validate.py
      - name: Deploy model
        if: github.ref == 'refs/heads/main'
        run: python deploy.py
```

**3. Automated Retraining**
- Monitor performance triggers
- Schedule periodic retraining
- A/B test new models
- Gradual rollout (canary deployment)

**4. Model Governance**
- Document model decisions
- Track model lineage
- Audit trail for compliance
- Access control and security

**5. Incident Response**
- Automated alerts
- Rollback procedures
- On-call rotation
- Post-mortem analysis

**6. Feature Store**
- Centralize feature definitions
- Ensure train-serve consistency
- Enable feature reuse
- Version feature pipelines

**Tools & Platforms:**
- **Kubeflow**: Kubernetes-native ML workflows
- **MLflow**: Experiment tracking, model registry
- **Feast**: Feature store
- **Seldon Core**: Model serving on Kubernetes
- **Airflow**: Workflow orchestration
- **DVC**: Data version control
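To make the versioning point concrete, here is a minimal sketch of logging a run and registering a model with MLflow. The experiment name, hyperparameters, and registry name are hypothetical, it assumes `mlflow` and `scikit-learn` are installed, and exact API details vary slightly across MLflow versions:

```python
import mlflow
import mlflow.sklearn
from sklearn.datasets import make_classification
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score
from sklearn.model_selection import train_test_split

mlflow.set_experiment("churn-model")  # hypothetical experiment name

X, y = make_classification(n_samples=1000, n_features=10, random_state=42)
X_train, X_test, y_train, y_test = train_test_split(X, y, random_state=42)

with mlflow.start_run():
    params = {"n_estimators": 100, "max_depth": 5}
    model = RandomForestClassifier(**params, random_state=42).fit(X_train, y_train)

    # Track hyperparameters and metrics alongside the model artifact
    mlflow.log_params(params)
    mlflow.log_metric("accuracy", accuracy_score(y_test, model.predict(X_test)))

    # Registering a named version requires a tracking server with a model-registry backend
    mlflow.sklearn.log_model(model, "model", registered_model_name="churn-classifier")
```

The run ID plus the registered model version give the lineage needed for the governance and rollback practices above.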
**Pre-Deployment:**
- [ ] Model performance validated on test set
- [ ] Edge cases and failure modes identified
- [ ] Inference latency meets requirements
- [ ] Model size optimized
- [ ] Security audit completed
- [ ] Documentation written
- [ ] API contract defined
- [ ] Error handling implemented

**Deployment:**
- [ ] Health check endpoint working (see the sketch after this checklist)
- [ ] Logging configured
- [ ] Monitoring dashboards created
- [ ] Alerting rules set up
- [ ] Load testing completed
- [ ] Backup and rollback plan ready
- [ ] Gradual rollout strategy defined
- [ ] On-call team notified

**Post-Deployment:**
- [ ] Monitor initial performance
- [ ] Validate business metrics
- [ ] Check for data drift
- [ ] Review error logs
- [ ] Gather user feedback
- [ ] Document lessons learned
- [ ] Plan next iteration

**Ongoing:**
- [ ] Weekly performance reviews
- [ ] Monthly retraining evaluation
- [ ] Quarterly model audits
- [ ] Continuous monitoring of drift
- [ ] Regular security updates
- [ ] Cost optimization reviews
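For the "health check endpoint" item, a minimal sketch with FastAPI (assumed installed); the route names and the readiness criterion are hypothetical:

```python
from fastapi import FastAPI, Response, status

app = FastAPI()
model = None  # set once the model artifact has been loaded


@app.on_event("startup")
def load_model():
    global model
    # Placeholder: load your real model artifact here
    model = object()


@app.get("/health")
def health():
    """Liveness probe: the process is up and able to answer."""
    return {"status": "ok"}


@app.get("/ready")
def ready(response: Response):
    """Readiness probe: only report ready once the model is loaded."""
    if model is None:
        response.status_code = status.HTTP_503_SERVICE_UNAVAILABLE
        return {"status": "loading"}
    return {"status": "ready"}
```

Run it with `uvicorn health_app:app`; Kubernetes liveness and readiness probes (or a load balancer) can then point at `/health` and `/ready`.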