AI Model Monitoring: Drift Detection and Performance Tracking
Introduction
Production ML models degrade over time as data drift, concept drift, and shifting real-world patterns erode the assumptions they were trained under. This guide covers monitoring strategies, drift detection methods, performance tracking, and automated remediation.
Types of Model Drift
Data Drift (Covariate Shift)
The distribution of input features changes over time while the learned relationship between features and target stays valid.
from scipy.stats import ks_2samp

def detect_data_drift(baseline_data, current_data, features, threshold=0.05):
    """Kolmogorov-Smirnov test for distribution change per feature."""
    drift_detected = {}
    for feature in features:
        statistic, p_value = ks_2samp(
            baseline_data[feature],
            current_data[feature]
        )
        # Cast NumPy scalars to plain Python types so results
        # serialize cleanly (e.g., via jsonify in the API below).
        drift_detected[feature] = {
            "statistic": float(statistic),
            "p_value": float(p_value),
            "drift": bool(p_value < threshold)
        }
    return drift_detected
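A minimal usage sketch with synthetic data (the DataFrames and the simulated income shift are illustrative assumptions, not part of the monitor itself):

import numpy as np
import pandas as pd

rng = np.random.default_rng(42)
baseline = pd.DataFrame({"income": rng.normal(50_000, 10_000, 5_000)})
current = pd.DataFrame({"income": rng.normal(55_000, 10_000, 5_000)})  # simulated shift

report = detect_data_drift(baseline, current, features=["income"])
print(report["income"])  # the KS test should flag the shifted mean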
Concept Drift
The relationship between features and the target changes, even when the input distribution looks stable, so the model's learned mapping no longer holds.
def detect_concept_drift(model, X_baseline, y_baseline, X_current, y_current, threshold=0.05):
    """Compare model performance across time periods"""
    baseline_accuracy = model.score(X_baseline, y_baseline)
    current_accuracy = model.score(X_current, y_current)
    accuracy_drop = baseline_accuracy - current_accuracy
    return {
        "baseline_accuracy": baseline_accuracy,
        "current_accuracy": current_accuracy,
        "accuracy_drop": accuracy_drop,
        "drift_detected": accuracy_drop > threshold
    }
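A self-contained usage sketch (the two synthetic "time windows" and the heavier label noise in the second window are assumptions made purely to demonstrate the check):

from sklearn.datasets import make_classification
from sklearn.linear_model import LogisticRegression

# Illustrative data only: same feature structure, but the later window
# carries much more label noise, mimicking a changed feature-target mapping.
X_q1, y_q1 = make_classification(n_samples=2_000, flip_y=0.01, random_state=0)
X_q3, y_q3 = make_classification(n_samples=2_000, flip_y=0.20, random_state=0)

clf = LogisticRegression(max_iter=1_000).fit(X_q1, y_q1)
result = detect_concept_drift(clf, X_q1, y_q1, X_q3, y_q3, threshold=0.03)
print(result["drift_detected"])  # likely True: noisier labels degrade accuracy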
Azure ML Data Drift Monitoring
Configure Monitor
from azure.ai.ml import MLClient
from azure.ai.ml.entities import DataDriftMonitor
from azure.identity import DefaultAzureCredential

ml_client = MLClient.from_config(credential=DefaultAzureCredential())

monitor = DataDriftMonitor(
    name="credit-model-drift-monitor",
    endpoint_name="credit-risk-endpoint",
    deployment_name="production",
    baseline_dataset="azureml:baseline-data:1",
    target_dataset="azureml:production-data:1",
    features=["income", "debt_ratio", "credit_score", "employment_length"],
    compute="monitoring-cluster",
    frequency="Day",
    alert_enabled=True,
    alert_threshold=0.1
)

ml_client.data_drift_monitors.begin_create_or_update(monitor)
Statistical Drift Detection Methods
Population Stability Index (PSI)
import numpy as np

def calculate_psi(baseline, current, bins=10):
    """Calculate PSI between baseline and current distributions"""
    baseline_counts, bin_edges = np.histogram(baseline, bins=bins)
    current_counts, _ = np.histogram(current, bins=bin_edges)
    baseline_pct = baseline_counts / len(baseline)
    current_pct = current_counts / len(current)
    # Avoid division by zero
    baseline_pct = np.where(baseline_pct == 0, 0.0001, baseline_pct)
    current_pct = np.where(current_pct == 0, 0.0001, current_pct)
    psi = np.sum((current_pct - baseline_pct) * np.log(current_pct / baseline_pct))
    interpretation = (
        "stable" if psi < 0.1
        else "moderate drift" if psi < 0.25
        else "significant drift"
    )
    return {"psi": psi, "interpretation": interpretation}
Wasserstein Distance
from scipy.stats import wasserstein_distance

def calculate_wasserstein(baseline, current):
    """Calculate earth mover's distance between two samples"""
    return wasserstein_distance(baseline, current)
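Unlike the KS statistic or PSI, this distance is expressed in the feature's own units, so a threshold that works for income will not work for a ratio. One common adjustment (an assumption here, not prescribed above) is to normalize by the baseline's spread:

import numpy as np

def calculate_normalized_wasserstein(baseline, current):
    # Scale by the baseline standard deviation so a single
    # threshold can apply across features of different units.
    scale = np.std(baseline)
    return wasserstein_distance(baseline, current) / scale if scale > 0 else 0.0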
Performance Metrics Tracking
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score
import pandas as pd

class ModelPerformanceTracker:
    def __init__(self):
        self.metrics_history = []

    def log_metrics(self, y_true, y_pred, timestamp):
        """Track classification metrics over time"""
        metrics = {
            "timestamp": timestamp,
            "accuracy": accuracy_score(y_true, y_pred),
            "precision": precision_score(y_true, y_pred, average='weighted'),
            "recall": recall_score(y_true, y_pred, average='weighted'),
            "f1": f1_score(y_true, y_pred, average='weighted')
        }
        self.metrics_history.append(metrics)
        return metrics

    def detect_performance_degradation(self, window_size=30, threshold=0.05):
        """Compare recent performance to historical baseline"""
        df = pd.DataFrame(self.metrics_history)
        if len(df) < window_size * 2:
            return {"degradation_detected": False, "message": "Insufficient data"}
        baseline = df.head(window_size)["accuracy"].mean()
        recent = df.tail(window_size)["accuracy"].mean()
        degradation = baseline - recent
        return {
            "degradation_detected": degradation > threshold,
            "baseline_accuracy": baseline,
            "recent_accuracy": recent,
            "degradation": degradation
        }
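A minimal usage sketch, where the daily batches are synthetic stand-ins for real scored traffic and the decay rate is an assumption chosen to trip the detector:

import numpy as np
from datetime import datetime, timedelta

tracker = ModelPerformanceTracker()
rng = np.random.default_rng(1)
start = datetime(2024, 1, 1)
for day in range(60):
    y_true = rng.integers(0, 2, 200)
    # Simulate slowly decaying accuracy: more predictions flipped over time.
    flip = rng.random(200) < (0.05 + 0.002 * day)
    y_pred = np.where(flip, 1 - y_true, y_true)
    tracker.log_metrics(y_true, y_pred, timestamp=start + timedelta(days=day))

print(tracker.detect_performance_degradation(window_size=30))  # likely flags degradation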
Real-Time Monitoring Dashboard
Azure Application Insights
from applicationinsights import TelemetryClient

telemetry = TelemetryClient('<instrumentation-key>')

def log_prediction(features, prediction, confidence, actual=None):
    """Log prediction details for monitoring"""
    # Event properties become string custom dimensions; keep the
    # numeric confidence in track_metric for aggregation.
    properties = {
        "prediction": str(prediction),
        "confidence": str(confidence),
        "feature_hash": str(hash(str(features)))
    }
    if actual is not None:
        properties["actual"] = str(actual)
        properties["correct"] = str(prediction == actual)
    telemetry.track_event("model_prediction", properties)
    telemetry.track_metric("prediction_confidence", confidence)
    telemetry.flush()
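Called from the scoring path, for example (the feature payload and label values are illustrative assumptions):

log_prediction(
    features={"income": 52_000, "debt_ratio": 0.31},
    prediction="approve",
    confidence=0.87,
    actual="approve"  # pass ground truth later, once labels arrive
)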
Custom Monitoring API
from flask import Flask, request, jsonify

app = Flask(__name__)
tracker = ModelPerformanceTracker()

@app.route("/monitor/metrics", methods=["POST"])
def log_metrics():
    data = request.json
    metrics = tracker.log_metrics(
        y_true=data["y_true"],
        y_pred=data["y_pred"],
        timestamp=data["timestamp"]
    )
    return jsonify(metrics)

@app.route("/monitor/drift", methods=["POST"])
def check_drift():
    data = request.json
    drift = detect_data_drift(
        baseline_data=data["baseline"],
        current_data=data["current"],
        features=data["features"]
    )
    return jsonify(drift)
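A client-side sketch of the drift endpoint (the localhost URL and payload values are assumptions; real checks need far more than a handful of samples):

import requests

resp = requests.post(
    "http://localhost:5000/monitor/drift",
    json={
        "baseline": {"income": [48_000, 51_000, 52_500, 49_800]},
        "current": {"income": [55_000, 58_200, 57_100, 56_400]},
        "features": ["income"],
    },
)
print(resp.json())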
Alerting and Notifications
Azure Logic Apps Integration
from datetime import datetime

import requests

def send_drift_alert(drift_info):
    """Trigger Logic App workflow for alerting"""
    logic_app_url = "https://<logic-app-url>/triggers/manual/paths/invoke"
    payload = {
        "alert_type": "data_drift",
        "severity": "high" if drift_info["drift_detected"] else "low",
        "details": drift_info,
        "timestamp": datetime.utcnow().isoformat()
    }
    response = requests.post(logic_app_url, json=payload)
    return response.status_code
Email Notifications
from azure.communication.email import EmailClient

def send_performance_alert(metrics):
    """Send email alert for performance degradation"""
    email_client = EmailClient.from_connection_string("<connection-string>")
    message = {
        "senderAddress": "alerts@contoso.com",
        "recipients": {
            "to": [{"address": "ml-team@contoso.com"}]
        },
        "content": {
            "subject": "Model Performance Degradation Detected",
            "plainText": (
                f"Recent accuracy: {metrics['recent_accuracy']:.2f}\n"
                f"Baseline accuracy: {metrics['baseline_accuracy']:.2f}\n"
                f"Degradation: {metrics['degradation']:.2f}"
            )
        }
    }
    email_client.begin_send(message)
Automated Retraining Workflow
from azure.ai.ml import MLClient, load_job

def trigger_retraining(ml_client, drift_threshold=0.1):
    """Automatically retrain model when drift exceeds threshold"""
    # Check the latest drift report
    drift_report = ml_client.data_drift_monitors.get("credit-model-drift-monitor")
    if drift_report.metrics["drift_coefficient"] > drift_threshold:
        # Submit the retraining pipeline defined in YAML
        pipeline_job = ml_client.jobs.create_or_update(
            load_job("./retraining-pipeline.yml")
        )
        print(f"Retraining triggered: {pipeline_job.name}")
        return pipeline_job
    return None
Monitoring Best Practices
- Establish baseline metrics from validation set
- Monitor both technical and business metrics
- Set appropriate alert thresholds (avoid false alarms)
- Log prediction inputs and outputs
- Track latency and throughput (see the sketch after this list)
- Monitor resource utilization (CPU, memory)
- Implement canary deployments for new models
- Version monitoring configurations
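As a sketch of the latency-tracking point above (the decorator, the stand-in predict function, and the metric names are illustrative assumptions, not a specific library's API):

import time
import numpy as np
from functools import wraps

def track_latency(fn):
    """Record wall-clock latency per call; inspect percentiles on demand."""
    latencies_ms = []

    @wraps(fn)
    def wrapper(*args, **kwargs):
        start = time.perf_counter()
        try:
            return fn(*args, **kwargs)
        finally:
            latencies_ms.append((time.perf_counter() - start) * 1_000)

    wrapper.latencies_ms = latencies_ms
    return wrapper

@track_latency
def predict(features):
    time.sleep(0.01)  # stand-in for real model inference
    return "approve"

for _ in range(100):
    predict({"income": 50_000})

print(f"p95 latency: {np.percentile(predict.latencies_ms, 95):.1f} ms")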
Troubleshooting
| Issue | Cause | Resolution |
|---|---|---|
| False drift alerts | High variance in data | Increase window size; adjust threshold |
| Missed drift | Threshold too high | Lower alert threshold; use multiple metrics |
| Performance drop | Concept drift | Retrain with recent labeled data |
| High monitoring cost | Too frequent checks | Reduce monitoring frequency |
Key Takeaways
Effective model monitoring detects drift early, tracks performance trends, and triggers automated remediation to maintain model reliability.