ā±ļø 80 min

Model Monitoring & MLOps Best Practices

Track ML systems in production and maintain reliability

Why Monitor ML Models?

Unlike traditional software, ML models degrade over time even without code changes.

**What Can Go Wrong:**

**Data Drift**: Input distribution changes
- Feature values shift
- New categories appear
- Data quality degrades

**Concept Drift**: Relationship between features and target changes
- User behavior evolves
- Market conditions change
- Seasonality effects

**Model Performance**: Accuracy decreases
- Outdated patterns
- New edge cases
- Competition dynamics

**Operational Issues:**
- High latency
- Memory leaks
- API errors
- Infrastructure failures

**Monitoring Layers:**
1. **Model Performance**: Accuracy, precision, recall
2. **Data Quality**: Missing values, outliers, distributions
3. **System Health**: Latency, throughput, errors
4. **Business Metrics**: Revenue impact, user satisfaction
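A common way to quantify the data drift described above is the Population Stability Index (PSI), which compares binned feature distributions between a baseline and a recent window. Below is a minimal NumPy sketch; the function name is ours, and the usual rule-of-thumb reading (below 0.1 stable, above 0.25 significant drift) is a convention, not a universal threshold:

```python
import numpy as np

def population_stability_index(baseline, current, bins=10):
    """PSI between two 1-D samples of one feature.

    Bin edges come from the baseline distribution; a small epsilon
    avoids log(0) in empty bins. Values outside the baseline range
    are dropped by np.histogram, which is acceptable for a sketch.
    """
    edges = np.histogram_bin_edges(baseline, bins=bins)
    base_pct = np.histogram(baseline, bins=edges)[0] / len(baseline)
    curr_pct = np.histogram(current, bins=edges)[0] / len(current)
    eps = 1e-6
    base_pct = np.clip(base_pct, eps, None)
    curr_pct = np.clip(curr_pct, eps, None)
    return float(np.sum((curr_pct - base_pct) * np.log(curr_pct / base_pct)))

rng = np.random.default_rng(0)
baseline = rng.normal(0, 1, 5000)
stable = rng.normal(0, 1, 5000)
shifted = rng.normal(0.5, 1, 5000)  # mean shift, like a drifting feature
print(f"PSI (no drift):  {population_stability_index(baseline, stable):.3f}")
print(f"PSI (mean +0.5): {population_stability_index(baseline, shifted):.3f}")
```

Unlike a mean-only check, PSI also reacts when the variance or shape of a feature changes while the mean stays put.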

Monitoring Implementation

Implement comprehensive model monitoring:

python
import numpy as np
from datetime import datetime
from typing import Dict, List
import json
from collections import deque
import logging

class ModelMonitor:
    """Monitor ML model predictions and performance"""
    
    def __init__(self, window_size=1000):
        self.window_size = window_size
        
        # Prediction history
        self.predictions = deque(maxlen=window_size)
        self.features = deque(maxlen=window_size)
        self.actuals = deque(maxlen=window_size)
        
        # Statistics
        self.feature_stats = {}
        self.baseline_stats = {}
        
        # Alerts
        self.alerts = []
        
        logging.basicConfig(level=logging.INFO)
        self.logger = logging.getLogger(__name__)
    
    def set_baseline(self, X_train):
        """Set baseline statistics from training data"""
        self.baseline_stats = {
            'feature_means': np.mean(X_train, axis=0).tolist(),
            'feature_stds': np.std(X_train, axis=0).tolist(),
            'feature_mins': np.min(X_train, axis=0).tolist(),
            'feature_maxs': np.max(X_train, axis=0).tolist(),
        }
        self.logger.info("Baseline statistics set")
    
    def log_prediction(self, features, prediction, actual=None):
        """Log a prediction and its features"""
        self.predictions.append({
            'prediction': prediction,
            'timestamp': datetime.now().isoformat(),
            'actual': actual
        })
        self.features.append(features)
        if actual is not None:
            self.actuals.append(actual)
    
    def check_data_drift(self, threshold=2.0):
        """Detect data drift using statistical distance"""
        if not self.baseline_stats or len(self.features) < 100:
            return None
        
        recent_features = np.array(list(self.features)[-100:])
        current_means = np.mean(recent_features, axis=0)
        current_stds = np.std(recent_features, axis=0)
        
        # Calculate drift score (normalized difference)
        drift_scores = []
        for i in range(len(current_means)):
            baseline_mean = self.baseline_stats['feature_means'][i]
            baseline_std = self.baseline_stats['feature_stds'][i]
            
            if baseline_std > 0:
                drift = abs(current_means[i] - baseline_mean) / baseline_std
                drift_scores.append(drift)
        
        max_drift = max(drift_scores) if drift_scores else 0.0
        drift_detected = max_drift > threshold
        
        if drift_detected:
            alert = {
                'type': 'data_drift',
                'severity': 'high' if max_drift > 3.0 else 'medium',
                'message': f'Data drift detected (score: {max_drift:.2f})',
                'timestamp': datetime.now().isoformat()
            }
            self.alerts.append(alert)
            self.logger.warning(alert['message'])
        
        return {
            'drift_detected': drift_detected,
            'max_drift_score': max_drift,
            'feature_drift_scores': drift_scores
        }
    
    def calculate_performance(self):
        """Calculate model performance metrics.

        Assumes labeled predictions are the most recent ones, so the
        tail of the prediction window lines up with the actuals.
        """
        if len(self.actuals) < 10:
            return None
        
        # deques don't support slicing, so convert to a list first
        predictions = [p['prediction'] for p in list(self.predictions)[-len(self.actuals):]]
        actuals = list(self.actuals)
        
        # Accuracy
        correct = sum(1 for p, a in zip(predictions, actuals) if p == a)
        accuracy = correct / len(actuals)
        
        # Confusion matrix (for binary classification)
        tp = sum(1 for p, a in zip(predictions, actuals) if p == 1 and a == 1)
        tn = sum(1 for p, a in zip(predictions, actuals) if p == 0 and a == 0)
        fp = sum(1 for p, a in zip(predictions, actuals) if p == 1 and a == 0)
        fn = sum(1 for p, a in zip(predictions, actuals) if p == 0 and a == 1)
        
        # Metrics
        precision = tp / (tp + fp) if (tp + fp) > 0 else 0
        recall = tp / (tp + fn) if (tp + fn) > 0 else 0
        f1 = 2 * precision * recall / (precision + recall) if (precision + recall) > 0 else 0
        
        return {
            'accuracy': accuracy,
            'precision': precision,
            'recall': recall,
            'f1_score': f1,
            'confusion_matrix': {'tp': tp, 'tn': tn, 'fp': fp, 'fn': fn}
        }
    
    def check_performance_degradation(self, baseline_accuracy=0.9, threshold=0.05):
        """Check if model performance has degraded"""
        metrics = self.calculate_performance()
        if metrics is None:
            return None
        
        accuracy = metrics['accuracy']
        degradation = baseline_accuracy - accuracy
        
        if degradation > threshold:
            alert = {
                'type': 'performance_degradation',
                'severity': 'critical' if degradation > 0.1 else 'high',
                'message': f'Model accuracy dropped to {accuracy:.2%} (baseline: {baseline_accuracy:.2%})',
                'timestamp': datetime.now().isoformat()
            }
            self.alerts.append(alert)
            self.logger.error(alert['message'])
        
        return {
            'current_accuracy': accuracy,
            'baseline_accuracy': baseline_accuracy,
            'degradation': degradation,
            'alert_triggered': degradation > threshold
        }
    
    def get_dashboard_metrics(self):
        """Get all metrics for monitoring dashboard"""
        return {
            'total_predictions': len(self.predictions),
            'window_fill': len(self.predictions) / self.window_size,  # fraction of rolling window populated
            'recent_performance': self.calculate_performance(),
            'drift_analysis': self.check_data_drift(),
            'active_alerts': len([a for a in self.alerts if a['severity'] in ['high', 'critical']]),
            'timestamp': datetime.now().isoformat()
        }

# Example usage
print("Model Monitoring System")
print("=" * 60)

# Initialize monitor
monitor = ModelMonitor(window_size=1000)

# Set baseline from training data
X_train = np.random.randn(1000, 10)
monitor.set_baseline(X_train)

# Simulate predictions
print("\nSimulating predictions...")
for i in range(150):
    # Normal data
    if i < 100:
        features = np.random.randn(10)
    # Drifted data
    else:
        features = np.random.randn(10) + 0.5
    
    prediction = np.random.randint(0, 2)
    actual = np.random.randint(0, 2)
    
    monitor.log_prediction(features, prediction, actual)

# Check drift
drift_analysis = monitor.check_data_drift()
print(f"\nData Drift Analysis:")
print(f"  Drift detected: {drift_analysis['drift_detected']}")
print(f"  Max drift score: {drift_analysis['max_drift_score']:.2f}")

# Check performance
perf_check = monitor.check_performance_degradation(baseline_accuracy=0.90)
if perf_check:
    print(f"\nPerformance Check:")
    print(f"  Current accuracy: {perf_check['current_accuracy']:.2%}")
    print(f"  Degradation: {perf_check['degradation']:.2%}")

# Dashboard metrics
dashboard = monitor.get_dashboard_metrics()
print(f"\nDashboard Metrics:")
print(f"  Total predictions: {dashboard['total_predictions']}")
print(f"  Active alerts: {dashboard['active_alerts']}")

# Show alerts
if monitor.alerts:
    print(f"\nActive Alerts:")
    for alert in monitor.alerts[-3:]:
        print(f"  [{alert['severity'].upper()}] {alert['message']}")

print("\n✓ Monitoring system operational!")
Output:
Model Monitoring System
============================================================

Simulating predictions...

Data Drift Analysis:
  Drift detected: True
  Max drift score: 2.45

Performance Check:
  Current accuracy: 48.67%
  Degradation: 41.33%

Dashboard Metrics:
  Total predictions: 150
  Active alerts: 2

Active Alerts:
  [HIGH] Data drift detected (score: 2.45)
  [CRITICAL] Model accuracy dropped to 48.67% (baseline: 90.00%)

✓ Monitoring system operational!
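The drift check in `ModelMonitor` compares only feature means against the baseline, so it misses drift that changes a distribution's shape but not its center. A two-sample Kolmogorov-Smirnov statistic compares whole distributions; the self-contained NumPy version below computes the same statistic as SciPy's `ks_2samp` (minus the p-value) and is a sketch, not part of the monitor above:

```python
import numpy as np

def ks_statistic(sample_a, sample_b):
    """Two-sample KS statistic: max gap between empirical CDFs."""
    a = np.sort(np.asarray(sample_a))
    b = np.sort(np.asarray(sample_b))
    both = np.concatenate([a, b])
    # Evaluate both empirical CDFs at every observed point
    cdf_a = np.searchsorted(a, both, side='right') / len(a)
    cdf_b = np.searchsorted(b, both, side='right') / len(b)
    return float(np.max(np.abs(cdf_a - cdf_b)))

rng = np.random.default_rng(42)
baseline = rng.normal(0, 1, 1000)
stable = rng.normal(0, 1, 1000)
drifted = rng.normal(0.5, 1, 1000)  # same +0.5 shift as the simulation
print(f"KS (same dist):    {ks_statistic(baseline, stable):.3f}")
print(f"KS (mean shifted): {ks_statistic(baseline, drifted):.3f}")
```

In practice you would run this per feature on the same rolling windows `ModelMonitor` already keeps, and alert when the statistic exceeds a threshold chosen from the sample sizes.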

Observability Stack

**Monitoring Tools:**

| Tool | Purpose | Best For |
|------|---------|----------|
| **Prometheus** | Metrics collection | System metrics, custom metrics |
| **Grafana** | Visualization | Dashboards, alerting |
| **ELK Stack** | Log aggregation | Application logs, debugging |
| **Jaeger** | Distributed tracing | Request tracing, latency analysis |
| **WhyLabs** | ML monitoring | Data drift, model performance |
| **Evidently** | ML observability | Model validation, reporting |

**Metrics to Track:**

**Model Metrics:**
- Accuracy, Precision, Recall, F1
- Prediction distribution
- Confidence scores
- Feature importance changes

**Data Metrics:**
- Feature distributions
- Missing value rates
- Data schema changes
- Outlier frequency

**System Metrics:**
- Request latency (p50, p95, p99)
- Throughput (requests/second)
- Error rates (4xx, 5xx)
- CPU/Memory usage
- GPU utilization

**Business Metrics:**
- User engagement
- Revenue impact
- Cost per prediction
- Model ROI
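The latency percentiles listed under system metrics (p50, p95, p99) can be tracked with the same rolling-window idea as `ModelMonitor`; here is a minimal sketch, where the class name and the 200 ms p99 SLO are illustrative choices:

```python
import numpy as np
from collections import deque

class LatencyTracker:
    """Rolling-window request latency percentiles with a p99 SLO check."""

    def __init__(self, window_size=1000, p99_slo_ms=200.0):
        self.samples = deque(maxlen=window_size)
        self.p99_slo_ms = p99_slo_ms

    def record(self, latency_ms):
        self.samples.append(latency_ms)

    def percentiles(self):
        arr = np.array(self.samples)
        p50, p95, p99 = np.percentile(arr, [50, 95, 99])
        return {
            'p50_ms': float(p50),
            'p95_ms': float(p95),
            'p99_ms': float(p99),
            'slo_breached': bool(p99 > self.p99_slo_ms),
        }

tracker = LatencyTracker(p99_slo_ms=200.0)
rng = np.random.default_rng(7)
# Latencies are typically right-skewed, so simulate with a lognormal
for latency in rng.lognormal(mean=3.5, sigma=0.5, size=500):
    tracker.record(latency)
stats = tracker.percentiles()
print(f"p50={stats['p50_ms']:.1f}ms  p95={stats['p95_ms']:.1f}ms  "
      f"p99={stats['p99_ms']:.1f}ms  SLO breached: {stats['slo_breached']}")
```

In production these percentiles would be exported to Prometheus and plotted in Grafana rather than computed ad hoc; the point here is that tail percentiles, not averages, are what the tools in the table alert on.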

MLOps Best Practices

**1. Model Versioning**
- Version control for code, data, and models
- Use DVC, MLflow, or Weights & Biases
- Track hyperparameters and metrics
- Enable reproducibility

**2. CI/CD for ML**

```yaml
# Example GitHub Actions workflow
name: ML Pipeline
on: [push]
jobs:
  test:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v2
      - name: Run tests
        run: pytest tests/
      - name: Train model
        run: python train.py
      - name: Validate model
        run: python validate.py
      - name: Deploy model
        if: github.ref == 'refs/heads/main'
        run: python deploy.py
```

**3. Automated Retraining**
- Monitor performance triggers
- Schedule periodic retraining
- A/B test new models
- Gradual rollout (canary deployment)

**4. Model Governance**
- Document model decisions
- Track model lineage
- Audit trail for compliance
- Access control and security

**5. Incident Response**
- Automated alerts
- Rollback procedures
- On-call rotation
- Post-mortem analysis

**6. Feature Store**
- Centralize feature definitions
- Ensure train-serve consistency
- Enable feature reuse
- Version feature pipelines

**Tools & Platforms:**
- **Kubeflow**: Kubernetes-native ML workflows
- **MLflow**: Experiment tracking, model registry
- **Feast**: Feature store
- **Seldon Core**: Model serving on Kubernetes
- **Airflow**: Workflow orchestration
- **DVC**: Data version control
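The gradual rollout (canary deployment) practice above can be sketched with nothing more than a stable hash of a request key: each user is deterministically pinned to a variant while the canary percentage ramps up. The `route` helper and its percentages are illustrative, not any specific platform's API:

```python
import hashlib

def route(request_id: str, canary_percent: int) -> str:
    """Deterministically route a request to 'canary' or 'stable'.

    Hashing a stable key (user or request ID) into one of 100 buckets
    keeps each caller on the same variant as the rollout ramps up
    (e.g. 5% -> 25% -> 50% -> 100%), which makes metrics comparable.
    """
    bucket = int(hashlib.sha256(request_id.encode()).hexdigest(), 16) % 100
    return 'canary' if bucket < canary_percent else 'stable'

# At 10% canary traffic, roughly one in ten users sees the new model
assignments = [route(f"user-{i}", canary_percent=10) for i in range(1000)]
share = assignments.count('canary') / len(assignments)
print(f"canary share: {share:.1%}")
```

If the canary's monitored metrics degrade, setting the percentage back to zero is the rollback: every user hashes back to the stable model with no redeploy.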

Production Checklist

**Pre-Deployment:**
- [ ] Model performance validated on test set
- [ ] Edge cases and failure modes identified
- [ ] Inference latency meets requirements
- [ ] Model size optimized
- [ ] Security audit completed
- [ ] Documentation written
- [ ] API contract defined
- [ ] Error handling implemented

**Deployment:**
- [ ] Health check endpoint working
- [ ] Logging configured
- [ ] Monitoring dashboards created
- [ ] Alerting rules set up
- [ ] Load testing completed
- [ ] Backup and rollback plan ready
- [ ] Gradual rollout strategy defined
- [ ] On-call team notified

**Post-Deployment:**
- [ ] Monitor initial performance
- [ ] Validate business metrics
- [ ] Check for data drift
- [ ] Review error logs
- [ ] Gather user feedback
- [ ] Document lessons learned
- [ ] Plan next iteration

**Ongoing:**
- [ ] Weekly performance reviews
- [ ] Monthly retraining evaluation
- [ ] Quarterly model audits
- [ ] Continuous monitoring of drift
- [ ] Regular security updates
- [ ] Cost optimization reviews
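The "health check endpoint working" item usually means distinguishing liveness (process is up) from readiness (model is loaded and can actually predict). A minimal sketch of the readiness side, with illustrative check names and a dummy model standing in for a real one:

```python
from datetime import datetime, timezone

def health_check(model=None, run_inference=True):
    """Readiness-style health check: report each dependency separately."""
    checks = {'model_loaded': model is not None}
    if checks['model_loaded'] and run_inference:
        try:
            model.predict([[0.0]])  # smoke test on a dummy input
            checks['inference_ok'] = True
        except Exception:
            checks['inference_ok'] = False
    status = 'healthy' if all(checks.values()) else 'unhealthy'
    return {
        'status': status,
        'checks': checks,
        'timestamp': datetime.now(timezone.utc).isoformat(),
    }

class DummyModel:
    def predict(self, X):
        return [0 for _ in X]

print(health_check(DummyModel()))  # status: healthy
print(health_check(None))          # status: unhealthy (model not loaded)
```

Serving this dict from a `/healthz` route lets orchestrators like Kubernetes pull a failing replica out of rotation before users see errors; reporting per-check results makes the failure cause visible in logs.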
