AI Model Monitoring: Drift Detection and Performance Tracking

Introduction

Production ML models degrade over time as data drift, concept drift, and changing real-world conditions erode the assumptions made at training time. This guide covers monitoring strategies, drift detection methods, performance tracking, and automated remediation.

Types of Model Drift

Data Drift (Covariate Shift)

The distribution of input features changes over time while the relationship between features and target stays the same. A two-sample Kolmogorov-Smirnov test per feature flags distributions that have shifted:

from scipy.stats import ks_2samp

def detect_data_drift(baseline_data, current_data, features, threshold=0.05):
    """Two-sample Kolmogorov-Smirnov test per feature for distribution change"""
    drift_detected = {}
    
    for feature in features:
        statistic, p_value = ks_2samp(
            baseline_data[feature],
            current_data[feature]
        )
        
        # Cast to built-in types so the report stays JSON-serializable (used by the monitoring API below)
        drift_detected[feature] = {
            "statistic": float(statistic),
            "p_value": float(p_value),
            "drift": bool(p_value < threshold)
        }
    
    return drift_detected
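
A minimal usage sketch; the DataFrames and values below are hypothetical placeholders for data you would pull from a feature store or prediction logs:

import pandas as pd

# Hypothetical samples; real monitoring would use far larger windows of data
baseline_df = pd.DataFrame({
    "income": [42000, 55000, 61000, 48000, 50000],
    "debt_ratio": [0.31, 0.22, 0.40, 0.28, 0.35]
})
current_df = pd.DataFrame({
    "income": [39000, 70000, 65000, 52000, 58000],
    "debt_ratio": [0.35, 0.21, 0.45, 0.30, 0.38]
})

report = detect_data_drift(baseline_df, current_df, features=["income", "debt_ratio"])
for feature, result in report.items():
    status = "drift" if result["drift"] else "stable"
    print(f"{feature}: {status} (p={result['p_value']:.3f})")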

Concept Drift

The relationship between features and the target changes, so predictions degrade even when input distributions look stable. A simple check is to compare model performance on labeled data from two time periods:

def detect_concept_drift(model, X_baseline, y_baseline, X_current, y_current, threshold=0.05):
    """Compare model performance across time periods"""
    baseline_accuracy = model.score(X_baseline, y_baseline)
    current_accuracy = model.score(X_current, y_current)
    
    accuracy_drop = baseline_accuracy - current_accuracy
    
    return {
        "baseline_accuracy": baseline_accuracy,
        "current_accuracy": current_accuracy,
        "accuracy_drop": accuracy_drop,
        "drift_detected": accuracy_drop > threshold
    }

Azure ML Data Drift Monitoring

Configure Monitor

from azure.ai.ml import MLClient
from azure.ai.ml.entities import DataDriftMonitor
from azure.identity import DefaultAzureCredential

ml_client = MLClient.from_config(credential=DefaultAzureCredential())

monitor = DataDriftMonitor(
    name="credit-model-drift-monitor",
    endpoint_name="credit-risk-endpoint",
    deployment_name="production",
    baseline_dataset="azureml:baseline-data:1",
    target_dataset="azureml:production-data:1",
    features=["income", "debt_ratio", "credit_score", "employment_length"],
    compute="monitoring-cluster",
    frequency="Day",
    alert_enabled=True,
    alert_threshold=0.1
)

ml_client.data_drift_monitors.begin_create_or_update(monitor)

Statistical Drift Detection Methods

Population Stability Index (PSI)

import numpy as np

def calculate_psi(baseline, current, bins=10):
    """Calculate PSI between baseline and current distributions"""
    baseline_counts, bin_edges = np.histogram(baseline, bins=bins)
    current_counts, _ = np.histogram(current, bins=bin_edges)
    
    baseline_pct = baseline_counts / len(baseline)
    current_pct = current_counts / len(current)
    
    # Avoid division by zero
    baseline_pct = np.where(baseline_pct == 0, 0.0001, baseline_pct)
    current_pct = np.where(current_pct == 0, 0.0001, current_pct)
    
    psi = np.sum((current_pct - baseline_pct) * np.log(current_pct / baseline_pct))
    
    interpretation = "stable" if psi < 0.1 else "moderate drift" if psi < 0.25 else "significant drift"
    
    return {"psi": psi, "interpretation": interpretation}

Wasserstein Distance

from scipy.stats import wasserstein_distance

def calculate_wasserstein(baseline, current):
    """Calculate earth mover's distance"""
    distance = wasserstein_distance(baseline, current)
    return distance
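
Because the raw Wasserstein distance is expressed in the feature's own units, values are not directly comparable across features or easy to threshold. A common workaround, shown here as a sketch rather than a fixed recipe, is to scale the distance by the baseline standard deviation:

import numpy as np
from scipy.stats import wasserstein_distance

def normalized_wasserstein(baseline, current):
    """Scale-free drift score: Wasserstein distance divided by the baseline standard deviation"""
    scale = np.std(baseline)
    if scale == 0:
        # Constant baseline: any variation in the current data counts as drift
        return 0.0 if np.std(current) == 0 else float("inf")
    return wasserstein_distance(baseline, current) / scale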

Performance Metrics Tracking

from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score
import pandas as pd

class ModelPerformanceTracker:
    def __init__(self):
        self.metrics_history = []
    
    def log_metrics(self, y_true, y_pred, timestamp):
        """Track classification metrics over time"""
        metrics = {
            "timestamp": timestamp,
            "accuracy": accuracy_score(y_true, y_pred),
            "precision": precision_score(y_true, y_pred, average='weighted'),
            "recall": recall_score(y_true, y_pred, average='weighted'),
            "f1": f1_score(y_true, y_pred, average='weighted')
        }
        self.metrics_history.append(metrics)
        return metrics
    
    def detect_performance_degradation(self, window_size=30, threshold=0.05):
        """Compare recent performance to historical baseline"""
        df = pd.DataFrame(self.metrics_history)
        
        if len(df) < window_size * 2:
            return {"degradation_detected": False, "message": "Insufficient data"}
        
        baseline = df.head(window_size)["accuracy"].mean()
        recent = df.tail(window_size)["accuracy"].mean()
        
        degradation = baseline - recent
        
        return {
            "degradation_detected": degradation > threshold,
            "baseline_accuracy": baseline,
            "recent_accuracy": recent,
            "degradation": degradation
        }
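
A brief usage sketch with hypothetical labels and predictions; in production the tracker would be fed from a labeling or feedback pipeline:

from datetime import datetime, timezone

tracker = ModelPerformanceTracker()

# Hypothetical ground truth and predictions for one scored batch
y_true = [1, 0, 1, 1, 0, 1, 0, 0]
y_pred = [1, 0, 0, 1, 0, 1, 1, 0]

print(tracker.log_metrics(y_true, y_pred, timestamp=datetime.now(timezone.utc).isoformat()))
print(tracker.detect_performance_degradation(window_size=30))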

Real-Time Monitoring Dashboard

Azure Application Insights

from applicationinsights import TelemetryClient
import hashlib

telemetry = TelemetryClient('<instrumentation-key>')

def log_prediction(features, prediction, confidence, actual=None):
    """Log prediction details for monitoring"""
    properties = {
        "prediction": str(prediction),
        "confidence": confidence,
        # Stable digest for correlating identical inputs (built-in hash() is randomized per process)
        "feature_hash": hashlib.sha256(str(features).encode()).hexdigest()
    }
    
    if actual is not None:
        properties["actual"] = str(actual)
        properties["correct"] = prediction == actual
    
    telemetry.track_event("model_prediction", properties)
    telemetry.track_metric("prediction_confidence", confidence)
    
    telemetry.flush()

Custom Monitoring API

from flask import Flask, request, jsonify
import numpy as np

app = Flask(__name__)
tracker = ModelPerformanceTracker()

@app.route("/monitor/metrics", methods=["POST"])
def log_metrics():
    data = request.json
    metrics = tracker.log_metrics(
        y_true=data["y_true"],
        y_pred=data["y_pred"],
        timestamp=data["timestamp"]
    )
    return jsonify(metrics)

@app.route("/monitor/drift", methods=["POST"])
def check_drift():
    data = request.json
    drift = detect_data_drift(
        baseline_data=data["baseline"],
        current_data=data["current"],
        features=data["features"]
    )
    return jsonify(drift)
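
For a quick local test of the endpoints above, the calls below assume the Flask app is running locally on its default port; the payloads are hypothetical:

import requests

metrics_payload = {
    "y_true": [1, 0, 1, 1],
    "y_pred": [1, 0, 0, 1],
    "timestamp": "2024-01-01T00:00:00Z"
}
response = requests.post("http://localhost:5000/monitor/metrics", json=metrics_payload, timeout=10)
print(response.json())

drift_payload = {
    "baseline": {"income": [42000, 55000, 61000, 48000]},
    "current": {"income": [39000, 70000, 65000, 52000]},
    "features": ["income"]
}
response = requests.post("http://localhost:5000/monitor/drift", json=drift_payload, timeout=10)
print(response.json())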

Alerting and Notifications

Azure Logic Apps Integration

import requests
from datetime import datetime, timezone

def send_drift_alert(drift_info):
    """Trigger Logic App workflow for alerting"""
    logic_app_url = "https://<logic-app-url>/triggers/manual/paths/invoke"
    
    payload = {
        "alert_type": "data_drift",
        "severity": "high" if drift_info["drift_detected"] else "low",
        "details": drift_info,
        "timestamp": datetime.now(timezone.utc).isoformat()
    }
    
    response = requests.post(logic_app_url, json=payload, timeout=30)
    return response.status_code

Email Notifications

from azure.communication.email import EmailClient

def send_performance_alert(metrics):
    """Send email alert for performance degradation"""
    email_client = EmailClient.from_connection_string("<connection-string>")
    
    message = {
        "senderAddress": "alerts@contoso.com",
        "recipients": {
            "to": [{"address": "ml-team@contoso.com"}]
        },
        "content": {
            "subject": "Model Performance Degradation Detected",
            "plainText": f"Recent accuracy: {metrics['recent_accuracy']:.2f}\n"
                         f"Baseline accuracy: {metrics['baseline_accuracy']:.2f}\n"
                         f"Degradation: {metrics['degradation']:.2f}"
        }
    }
    
    email_client.begin_send(message)

Automated Retraining Workflow

from azure.ai.ml import MLClient, load_job

def trigger_retraining(ml_client, drift_threshold=0.1):
    """Automatically retrain model when drift exceeds threshold"""
    
    # Check the latest drift report
    drift_report = ml_client.data_drift_monitors.get("credit-model-drift-monitor")
    
    if drift_report.metrics["drift_coefficient"] > drift_threshold:
        # Submit the retraining pipeline defined in YAML
        pipeline_job = ml_client.jobs.create_or_update(
            load_job("./retraining-pipeline.yml")
        )
        
        print(f"Retraining triggered: {pipeline_job.name}")
        return pipeline_job
    
    return None

Monitoring Best Practices

  • Establish baseline metrics from a held-out validation set
  • Monitor both technical and business metrics
  • Set alert thresholds carefully to avoid alert fatigue from false alarms
  • Log prediction inputs and outputs
  • Track latency and throughput (see the sketch after this list)
  • Monitor resource utilization (CPU, memory)
  • Use canary deployments to validate new model versions before full rollout
  • Version monitoring configurations
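
As an illustration of the latency and throughput point above, here is a minimal sketch that wraps a prediction call with timing and a rolling request counter; the class, names, and window size are assumptions, not part of any specific SDK:

import time
from collections import deque

class LatencyMonitor:
    """Tracks per-request latency and request volume over a rolling time window"""

    def __init__(self, window_seconds=60):
        self.window_seconds = window_seconds
        self.request_times = deque()   # timestamps of recent requests
        self.latencies_ms = deque()    # matching latencies in milliseconds

    def record(self, latency_ms):
        now = time.time()
        self.request_times.append(now)
        self.latencies_ms.append(latency_ms)
        # Drop entries that have fallen out of the rolling window
        while self.request_times and now - self.request_times[0] > self.window_seconds:
            self.request_times.popleft()
            self.latencies_ms.popleft()

    def stats(self):
        if not self.latencies_ms:
            return {"requests_per_window": 0, "avg_latency_ms": None}
        return {
            "requests_per_window": len(self.request_times),
            "avg_latency_ms": sum(self.latencies_ms) / len(self.latencies_ms)
        }

monitor = LatencyMonitor()

def timed_predict(model, features):
    start = time.perf_counter()
    prediction = model.predict(features)   # hypothetical model object
    monitor.record((time.perf_counter() - start) * 1000)
    return prediction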

Troubleshooting

Issue                | Cause                 | Resolution
False drift alerts   | High variance in data | Increase window size; adjust threshold
Missed drift         | Threshold too high    | Lower alert threshold; use multiple metrics
Performance drop     | Concept drift         | Retrain with recent labeled data
High monitoring cost | Too frequent checks   | Reduce monitoring frequency

Key Takeaways

Effective model monitoring detects drift early, tracks performance trends, and triggers automated remediation to maintain model reliability.
