MLOps: Machine Learning Operations and Pipelines
Executive Summary
MLOps operationalizes the entire machine learning lifecycle (data ingestion, experimentation, training, deployment, monitoring, and governance) through automation, observability, repeatability, and continuous improvement. This blueprint implements Azure Machine Learning plus supporting OSS tooling (MLflow, DVC, GitHub Actions, Terraform/Bicep) to achieve production-grade CI/CD, reproducible pipelines, automated retraining, safe rollout strategies, and telemetry-driven quality management. Target outcomes: <30 min pipeline lead time, rollback in <5 min, drift detection in <24h, 100% reproducibility hash parity, model promotion SLA <2h, and defect escape rate <5% of validation issues.
Introduction
Traditional ML workflows stall in production due to manual handoffs, hidden dependencies, inconsistent feature engineering, and weak monitoring. MLOps removes these bottlenecks by enforcing infrastructure-as-code, declarative pipeline specs, artifact versioning, and continuous validation gates that keep models performant and compliant under dynamic data conditions.
Reference Architecture (Textual Diagram)
┌─────────────────────────────────────────────────────────────────────────────────┐
│                          MLOps Reference Architecture                            │
├────────────────┬─────────────────────┬────────────────────┬─────────────────────┤
│ Data Layer     │ Experimentation     │ Training & Build   │ Deployment & Ops    │
│ Ingestion      │ Notebook/VSCode     │ Pipeline (AzureML) │ Online/Batch EPs    │
│ Validation     │ Tracking (MLflow)   │ Artifact Store     │ Canary / Blue-Green │
│ Versioning(DVC)│ Metrics Registry    │ Model Registry     │ Feature Store Sync  │
├────────────────┴─────────────────────┴────────────────────┴─────────────────────┤
│ Monitoring & Drift: App Insights + Custom Metrics + Data/Concept Drift Jobs     │
│ Governance: Model Card, Lineage, Approval Workflow, Policy Enforcement          │
└─────────────────────────────────────────────────────────────────────────────────┘
Lifecycle Stages & Key Controls
| Stage | Goal | Automation | Artifacts | KPIs |
|---|---|---|---|---|
| Data Ingestion | Reliable fresh data | Scheduled ETL + validation | Raw & cleaned datasets (DVC) | Data freshness, validation pass % |
| Feature Engineering | Consistency & reusability | Feature store sync jobs | Feature definitions, transformation scripts | Feature reuse %, calc latency |
| Training | Reproducible model builds | Azure ML pipeline + MLflow logging | Model binary, metrics, params, environment spec | Build time, accuracy, F1, cost/run |
| Evaluation | Quality + risk gating | Automated test harness | Test report, fairness & drift prelim | Test pass rate, disparity deltas |
| Deployment | Safe promotion | Blue-green rollout, canary traffic | Live endpoint versions, deployment manifest | Time-to-live, rollback time |
| Monitoring | Detect issues early | Scheduled metric collection | Latency, accuracy, drift logs | p95 latency, drift score, alert MTTR |
| Retraining | Adapt to data changes | Cron + event triggers | Versioned retrained models | Retrain frequency, performance delta |
| Governance | Compliance & traceability | Auto model card & lineage | Model card, approval log, lineage map | Audit completeness %, approval SLA |
Foundational Reproducibility (Environment + Hashes)
# Create deterministic conda environment export
conda env export --no-builds > env-lock.yml
# Hash training code & data sample for lineage
shasum -a 256 src/train.py data/sample_head.csv > build_provenance.sha
Store env-lock.yml and build_provenance.sha with model artifacts to guarantee reproducibility.
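A minimal verification sketch (the sha256_of and verify_provenance helpers are illustrative, assuming the file names above) that recomputes the recorded hashes before training and aborts on mismatch:
import hashlib
import sys

def sha256_of(path: str) -> str:
    """Return the SHA-256 hex digest of a file, streamed in chunks."""
    h = hashlib.sha256()
    with open(path, "rb") as fh:
        for chunk in iter(lambda: fh.read(8192), b""):
            h.update(chunk)
    return h.hexdigest()

def verify_provenance(manifest_path: str = "build_provenance.sha") -> None:
    """Compare recorded shasum entries ('<hash>  <path>') against the current files."""
    with open(manifest_path) as fh:
        for line in fh:
            recorded_hash, _, path = line.strip().partition("  ")
            if sha256_of(path) != recorded_hash:
                sys.exit(f"Provenance mismatch for {path}; refusing to train.")

if __name__ == "__main__":
    verify_provenance()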
MLOps Lifecycle (Expanded)
- Data Engineering → ingestion, schema checks, null & range validation, distribution profiling.
- Feature Engineering → deterministic transformations, versioned code, caching expensive derived features.
- Experimentation → parameter sweeps logged to MLflow; narrow candidate set via multi-objective metrics.
- Training & Evaluation → orchestrated pipeline; fairness & performance tests; generate model card stub.
- Packaging & Deployment → environment image build, vulnerability scan, progressive rollout.
- Monitoring → real-time endpoint telemetry + scheduled drift & performance jobs.
- Continuous Retraining → triggered by drift or periodic schedule; compare candidate vs champion.
- Governance & Retirement → approval workflow, lineage archive, decommission low-value models.
Azure Machine Learning Pipelines
Pipeline Components
from azure.ai.ml import MLClient, command, Input, Output
from azure.ai.ml.dsl import pipeline
from azure.identity import DefaultAzureCredential
ml_client = MLClient.from_config(credential=DefaultAzureCredential())
# Define training component
train_component = command(
name="train_model",
display_name="Train ML Model",
code="./src",
command="python train.py --data ${{inputs.training_data}} --model ${{outputs.model_output}}",
environment="azureml:sklearn-env@latest",
inputs={
"training_data": Input(type="uri_folder")
},
outputs={
"model_output": Output(type="uri_folder")
}
)
# Define evaluation component
eval_component = command(
name="evaluate_model",
code="./src",
command="python evaluate.py --model ${{inputs.model_input}} --test_data ${{inputs.test_data}}",
environment="azureml:sklearn-env@latest",
inputs={
"model_input": Input(type="uri_folder"),
"test_data": Input(type="uri_folder")
}
)
Pipeline Definition
@pipeline(
name="training_pipeline",
description="End-to-end training pipeline"
)
def ml_training_pipeline(training_data, test_data):
train_job = train_component(training_data=training_data)
eval_job = eval_component(
model_input=train_job.outputs.model_output,
test_data=test_data
)
return {
"trained_model": train_job.outputs.model_output
}
# Create and submit pipeline
pipeline_job = ml_training_pipeline(
training_data=Input(path="azureml:train-dataset:1"),
test_data=Input(path="azureml:test-dataset:1")
)
submitted_job = ml_client.jobs.create_or_update(pipeline_job)
Model Versioning & Experiment Tracking
MLflow Tracking (Parameters, Metrics, Artifacts)
import mlflow
import mlflow.sklearn
from sklearn.ensemble import RandomForestClassifier

# Point MLflow at the Azure ML workspace tracking server (ml_client from the pipeline setup above)
mlflow.set_tracking_uri(ml_client.workspaces.get(ml_client.workspace_name).mlflow_tracking_uri)
mlflow.set_experiment("credit-risk-model")

with mlflow.start_run():
    # Log parameters
    mlflow.log_param("max_depth", 5)
    mlflow.log_param("n_estimators", 100)
    # Train model (X_train/y_train/X_test/y_test come from a prior data-prep step)
    model = RandomForestClassifier(max_depth=5, n_estimators=100)
    model.fit(X_train, y_train)
    # Log metrics
    accuracy = model.score(X_test, y_test)
    mlflow.log_metric("accuracy", accuracy)
    # Log model
    mlflow.sklearn.log_model(model, "model")
    # Log artifacts
    mlflow.log_artifact("feature_importance.png")
Model Registry (Promotion Workflow)
from azure.ai.ml.entities import Model
model = Model(
path="./model",
type="mlflow_model",
name="credit-risk-classifier",
version="1.0",
description="Random Forest classifier for credit risk",
tags={"framework": "sklearn", "task": "classification"}
)
ml_client.models.create_or_update(model)
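Azure ML v2 has no built-in stage field on registered models, so a common convention, assumed here, is to record promotion state in tags; promote_model is an illustrative helper:
def promote_model(ml_client, name: str, version: str, stage: str = "staging"):
    """Fetch a registered model, stamp a stage tag, and write it back (tag-based promotion)."""
    registered = ml_client.models.get(name=name, version=version)
    registered.tags = {**(registered.tags or {}), "stage": stage, "approved_by": "ci-pipeline"}
    return ml_client.models.create_or_update(registered)

promote_model(ml_client, "credit-risk-classifier", "1.0", stage="production")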
Continuous Training & Retraining Triggers
Scheduled Pipeline (Time-based)
from azure.ai.ml.entities import CronTrigger, JobSchedule

schedule = JobSchedule(
    name="weekly_retraining",
    trigger=CronTrigger(expression="0 0 * * 1"),  # every Monday at 00:00
    create_job=pipeline_job
)
ml_client.schedules.begin_create_or_update(schedule)
Data-Triggered Retraining (Event-Based)
# Sketch of an Event Grid-triggered handler (e.g., hosted in an Azure Function
# subscribed to Blob Created events); submits the retraining pipeline on new data.
def on_new_data(event):
    if event["eventType"] == "Microsoft.Storage.BlobCreated":
        # Submit retraining pipeline
        ml_client.jobs.create_or_update(pipeline_job)
Deployment Strategies & Patterns
Online Endpoints (Managed Inference)
from azure.ai.ml.entities import ManagedOnlineEndpoint, ManagedOnlineDeployment
# Create endpoint
endpoint = ManagedOnlineEndpoint(
name="credit-risk-endpoint",
description="Credit risk prediction endpoint",
auth_mode="key"
)
ml_client.online_endpoints.begin_create_or_update(endpoint)
# Create deployment
deployment = ManagedOnlineDeployment(
name="blue",
endpoint_name="credit-risk-endpoint",
model=model,
instance_type="Standard_DS2_v2",
instance_count=1,
environment_variables={
"ENABLE_MONITORING": "true"
}
)
ml_client.online_deployments.begin_create_or_update(deployment)
Blue-Green Deployment & Canary Traffic Shifting
# Deploy new version (green)
green_deployment = ManagedOnlineDeployment(
name="green",
endpoint_name="credit-risk-endpoint",
model=new_model,
instance_type="Standard_DS2_v2",
instance_count=1
)
ml_client.online_deployments.begin_create_or_update(green_deployment)
# Gradually shift traffic
endpoint.traffic = {"blue": 90, "green": 10}
ml_client.online_endpoints.begin_create_or_update(endpoint)
# After validation, switch fully
endpoint.traffic = {"blue": 0, "green": 100}
ml_client.online_endpoints.begin_create_or_update(endpoint)
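A sketch of automating the ramp above with a metric gate between steps; gate_passes is a hypothetical helper that would query latency, error rate, and the canary accuracy delta:
import time

def ramp_traffic(ml_client, endpoint_name: str, steps=(10, 25, 50, 100), wait_s=600):
    """Shift traffic to 'green' in stages, rolling back if the (hypothetical) gate fails."""
    endpoint = ml_client.online_endpoints.get(endpoint_name)
    for pct in steps:
        endpoint.traffic = {"blue": 100 - pct, "green": pct}
        ml_client.online_endpoints.begin_create_or_update(endpoint).result()
        time.sleep(wait_s)                    # observation window
        if not gate_passes(endpoint_name):    # hypothetical metric gate
            endpoint.traffic = {"blue": 100, "green": 0}
            ml_client.online_endpoints.begin_create_or_update(endpoint).result()
            return False
    return True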
Batch Inference & Offline Scoring
from azure.ai.ml.entities import BatchEndpoint, BatchDeployment
batch_endpoint = BatchEndpoint(
name="batch-scoring",
description="Batch scoring endpoint"
)
batch_deployment = BatchDeployment(
name="production",
endpoint_name="batch-scoring",
model=model,
compute="batch-cluster",
instance_count=4,
max_concurrency_per_instance=2
)
ml_client.batch_endpoints.begin_create_or_update(batch_endpoint).result()
ml_client.batch_deployments.begin_create_or_update(batch_deployment).result()
# Invoke batch job
job = ml_client.batch_endpoints.invoke(
endpoint_name="batch-scoring",
deployment_name="production",
input=Input(path="azureml:batch-input-data:1")
)
Model Monitoring (Performance, Drift, Quality)
Data Drift Detection (Statistical Tests)
# Note: model monitoring entity names differ across SDK versions (v2 uses
# MonitorSchedule/MonitorDefinition with drift signals); treat this block as schematic.
from azure.ai.ml.entities import DataDriftMonitor
monitor = DataDriftMonitor(
name="credit-data-drift",
endpoint_name="credit-risk-endpoint",
deployment_name="blue",
baseline_dataset=baseline_dataset,
target_dataset=production_dataset,
features=["income", "debt_ratio", "credit_score"],
frequency="Week",
alert_enabled=True
)
ml_client.data_drift_monitors.begin_create_or_update(monitor)
Performance & Resource Tracking
Add accuracy & error rate logging with MLflow, plus system metrics via Azure Monitor queries. Incorporate concept drift by comparing rolling window performance (e.g., last 7 days vs baseline).
import numpy as np

def rolling_performance(y_true_history, y_pred_history, window=500):
    """Log accuracy over the most recent `window` predictions."""
    y_true = np.asarray(y_true_history[-window:])
    y_pred = np.asarray(y_pred_history[-window:])
    acc = float((y_true == y_pred).mean())
    mlflow.log_metric("rolling_accuracy", acc)
    return acc
from datetime import timedelta
from azure.monitor.query import MetricsQueryClient

metrics_client = MetricsQueryClient(credential=DefaultAzureCredential())
# Query endpoint metrics for the last hour
response = metrics_client.query_resource(
    resource_uri=endpoint_resource_id,
    metric_names=["RequestLatency", "RequestsPerSecond"],
    timespan=timedelta(hours=1)
)
for metric in response.metrics:
print(f"{metric.name}: {metric.timeseries[0].data}")
CI/CD Integration (Azure DevOps & GitHub Actions)
Azure DevOps Pipeline (Training + Deployment Stages)
# azure-pipelines.yml
trigger:
branches:
include:
- main
pool:
vmImage: 'ubuntu-latest'
stages:
- stage: Train
jobs:
- job: TrainModel
steps:
- task: AzureCLI@2
inputs:
azureSubscription: 'ML-Service-Connection'
scriptType: 'bash'
scriptLocation: 'inlineScript'
inlineScript: |
az ml job create --file pipeline.yml
- stage: Deploy
dependsOn: Train
jobs:
- job: DeployModel
steps:
- task: AzureCLI@2
inputs:
azureSubscription: 'ML-Service-Connection'
scriptType: 'bash'
scriptLocation: 'inlineScript'
inlineScript: |
az ml online-deployment create --file deployment.yml
GitHub Actions Workflow (Infra + Pipeline)
name: MLOps Pipeline
on:
push:
branches: [main]
jobs:
train-and-deploy:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- name: Azure Login
uses: azure/login@v1
with:
creds: ${{ secrets.AZURE_CREDENTIALS }}
- name: Submit Training Pipeline
run: |
az ml job create --file pipeline.yml --resource-group ${{ secrets.RESOURCE_GROUP }} --workspace-name ${{ secrets.WORKSPACE_NAME }}
- name: Deploy Model
run: |
az ml online-endpoint update --name credit-risk-endpoint --resource-group ${{ secrets.RESOURCE_GROUP }} --workspace-name ${{ secrets.WORKSPACE_NAME }} --traffic "blue=100"
Testing Strategy (Unit, Integration, Validation)
| Layer | Scope | Example Tests | Tooling |
|---|---|---|---|
| Unit | Pure functions (feature transforms) | Null handling, scaling correctness | pytest |
| Data Validation | Schema & distribution | Column presence, drift thresholds | Great Expectations |
| Model Quality | Metrics & fairness | Min accuracy, parity diff limits | pytest + custom asserts |
| Integration | Pipeline artifact wiring | Output existence, registry publish | pytest + CLI |
| Deployment | Endpoint health & latency | 95th percentile < target | k6 / locust |
| Monitoring | Alerting rules fire | Simulated drift triggers alerts | synthetic events |
Sample Pytest for Metric Gate
import os
from mlflow.tracking import MlflowClient

def test_model_quality():
    # The CI pipeline is assumed to supply the candidate run id via MLFLOW_RUN_ID
    metrics = MlflowClient().get_run(os.environ["MLFLOW_RUN_ID"]).data.metrics
    assert metrics["accuracy"] >= 0.85
    assert metrics["parity_diff"] <= 0.08
Infrastructure as Code (Bicep & Terraform)
Bicep Workspace Snippet
resource aml 'Microsoft.MachineLearningServices/workspaces@2023-04-01' = {
name: 'ml-workspace'
location: resourceGroup().location
properties: {
friendlyName: 'Prod ML'
keyVault: kv.id
storageAccount: sa.id
applicationInsights: ai.id
}
}
Terraform Workspace Snippet
resource "azurerm_machine_learning_workspace" "ws" {
name = "ml-workspace"
location = azurerm_resource_group.rg.location
resource_group_name = azurerm_resource_group.rg.name
sku_name = "Basic"
}
Feature Store Pattern (Simplified)
Design principle: consistent offline & online computation. Cache feature transformations; attach metadata (owner, refresh cadence).
class FeatureStore:
def __init__(self, registry):
self.registry = registry
def compute_credit_utilization(self, df):
feat = df['current_balance'] / (df['credit_limit'] + 1e-6)
self.registry.log('credit_utilization', 'v1', 'daily')
return feat
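A usage sketch with a toy in-memory registry (InMemoryRegistry and the sample data are illustrative; a real registry would persist owner, cadence, and version metadata):
import pandas as pd

class InMemoryRegistry:
    """Toy registry that records feature metadata in memory."""
    def __init__(self):
        self.entries = []
    def log(self, name, version, cadence):
        self.entries.append({"name": name, "version": version, "cadence": cadence})

df = pd.DataFrame({"current_balance": [1200.0, 450.0], "credit_limit": [5000.0, 2000.0]})
store = FeatureStore(InMemoryRegistry())
df["credit_utilization"] = store.compute_credit_utilization(df)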
Canary Analysis Script
import mlflow
from sklearn.metrics import accuracy_score

def canary_diff(champion_preds, canary_preds, y_true):
    """Log the canary-vs-champion accuracy delta for the promotion gate."""
    champ_acc = accuracy_score(y_true, champion_preds)
    can_acc = accuracy_score(y_true, canary_preds)
    diff = can_acc - champ_acc
    mlflow.log_metric('canary_accuracy_delta', diff)
    return diff
Promotion Criteria
| Metric | Threshold | Rationale |
|---|---|---|
| Accuracy Delta | >= -0.01 | Canary not worse than champion |
| Latency p95 | < 350ms | Performance acceptable |
| Error Rate | < 2% | Stability maintained |
| Drift Score | < 0.05 | Data distribution stable |
| Fairness Parity Diff | < 0.08 | Equity preserved |
Best Practices
- Version everything: data, code, models, environments
- Automate testing: unit tests, integration tests, model validation
- Use feature stores for consistent features
- Implement gradual rollout for deployments
- Monitor both model performance and system metrics
- Establish rollback procedures
- Document pipeline dependencies
- Use separate environments (dev, staging, prod)
Troubleshooting
| Issue | Cause | Resolution | Preventive Action |
|---|---|---|---|
| Pipeline failure | Dependency conflict | Pin versions, rebuild env | Automated env lock file |
| Deployment timeout | Insufficient resources | Scale instances | Autoscaling rules |
| Data drift alert | Distribution shift | Retrain model | Implement early detection |
| High latency | Inefficient preprocessing | Optimize transforms | Profile & cache features |
| Model not reproducible | Missing env spec | Capture env-lock.yml | Enforce pipeline check |
| Failed canary | Poor generalization | Rollback traffic | Expand validation set |
| Registry version conflict | Duplicate names | Increment semantic version | Pre-deploy naming check |
| Metrics missing | Logging skipped | Add logging wrapper | CI lint on instrumentation |
Key Takeaways
Effective MLOps combines automation, versioning, observability, governance, and controlled rollouts to deliver reliable, adaptable ML systems at scale.
References
- Azure ML Pipelines
- Deploy Online Endpoints
- MLflow Tracking
- Data Versioning with DVC
- Azure Monitor Metrics
- Great Expectations
- Terraform AzureRM Provider
Advanced Pipeline Patterns
Parallel & Conditional Steps (Hyperparameter + Conditional Promotion)
from azure.ai.ml.dsl import pipeline

@pipeline(name="advanced_training_pipeline")
def advanced_pipeline(train_data, test_data):
    # Fan out training across hyperparameter values; independent steps run
    # concurrently in Azure ML (a SweepJob is the managed alternative).
    sweep_jobs = []
    for depth in [3, 5, 7]:
        # Assumes train_component exposes a max_depth input
        job = train_component(training_data=train_data, max_depth=depth)
        sweep_jobs.append(job)
    eval_jobs = [
        eval_component(model_input=j.outputs.model_output, test_data=test_data)
        for j in sweep_jobs
    ]
    # Pseudo: select the best candidate by comparing evaluation metrics
    best = sweep_jobs[0]  # placeholder for real comparison logic
    return {"champion_model": best.outputs.model_output}
Dynamic Branching Logic (Metric Gate)
def should_promote(metrics: dict):
return (
metrics["accuracy"] >= 0.86 and
metrics["fairness_parity"] <= 0.08 and
metrics["latency_p95"] < 350
)
Data & Feature Versioning (DVC + Lineage)
# Initialize DVC and remote
dvc init
dvc remote add origin azure://myblobcontainer/dvcstore
# Track raw and processed datasets
dvc add data/raw/credit.csv
dvc add data/processed/credit_clean.parquet
# Generate pipeline stages
dvc stage add -n preprocess -d src/preprocess.py -d data/raw/credit.csv -o data/processed/credit_clean.parquet \
python src/preprocess.py --in data/raw/credit.csv --out data/processed/credit_clean.parquet
# Push artifacts & metadata
git add .
git commit -m "Add data lineage"
dvc push
Lineage binding combines DVC file hashes + code commit + environment lock to produce a composite provenance record:
import hashlib, json, os

def composite_lineage(files):
    """Hash the contents of all lineage inputs into a single digest."""
    h = hashlib.sha256()
    for f in files:
        with open(f, 'rb') as fh:
            h.update(fh.read())
    return h.hexdigest()

record = {
    "data_hash": composite_lineage(["data/processed/credit_clean.parquet.dvc"]),
    "code_commit": os.getenv("GIT_COMMIT"),
    "env_lock": open("env-lock.yml").read()
}
with open("lineage.json", "w") as out:
    out.write(json.dumps(record, indent=2))
Concept & Data Drift Deep Dive
Combine statistical tests (KS, PSI) + embedding shift for text features.
from scipy.stats import ks_2samp
import numpy as np

def population_stability_index(expected, actual, bins=10):
    """PSI computed on shared bin edges derived from the expected (baseline) data."""
    edges = np.histogram_bin_edges(expected, bins=bins)
    expected_pct = np.histogram(expected, bins=edges)[0] / len(expected)
    actual_pct = np.histogram(actual, bins=edges)[0] / len(actual)
    psi = 0.0
    for e, a in zip(expected_pct, actual_pct):
        if e == 0 or a == 0:
            continue  # skip empty bins to avoid log(0)
        psi += (a - e) * np.log(a / e)
    return psi

def drift_report(baseline, production):
    report = {}
    for col in baseline.columns:
        ks_p = ks_2samp(baseline[col], production[col]).pvalue
        psi = population_stability_index(baseline[col], production[col])
        report[col] = {"ks_p": ks_p, "psi": psi}
    return report
Threshold Guidelines: KS p-value < 0.05 or PSI > 0.2 triggers retraining candidate evaluation.
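A short helper (illustrative; baseline_df and production_df are placeholder names) that applies drift_report above and flags columns breaching these thresholds:
def flag_drifted_columns(report, ks_alpha=0.05, psi_threshold=0.2):
    """Return columns whose KS p-value or PSI breaches the guideline thresholds."""
    return [col for col, stats in report.items()
            if stats["ks_p"] < ks_alpha or stats["psi"] > psi_threshold]

# drifted = flag_drifted_columns(drift_report(baseline_df, production_df))
# Any flagged column makes the model a retraining candidate.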
Retraining Orchestration Logic
def evaluate_retraining_trigger(drift_scores, perf_delta, time_since_last):
if any(d['psi'] > 0.2 for d in drift_scores.values()):
return "drift"
if perf_delta < -0.03: # accuracy dropped more than 3%
return "performance"
if time_since_last.days >= 30:
return "stale"
return None
When the trigger reason is not None, submit the pipeline with a trigger_reason metadata label for audit, as sketched below.
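One way to carry that label (an assumed convention; drift_scores, perf_delta, and time_since_last come from the monitoring jobs) is to stamp the trigger reason onto the pipeline job's tags before submission:
reason = evaluate_retraining_trigger(drift_scores, perf_delta, time_since_last)
if reason is not None:
    # Tag the job so the audit trail records why retraining was triggered
    pipeline_job.tags = {**(pipeline_job.tags or {}), "trigger_reason": reason}
    ml_client.jobs.create_or_update(pipeline_job)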
Security & Compliance Controls
| Control | Implementation | Tooling |
|---|---|---|
| Secrets Management | Key Vault references in workspace | Azure Key Vault |
| Network Isolation | Private endpoints + VNet injection | Azure ML VNet |
| Image Vulnerability Scan | Scan base container before deploy | Trivy / Defender |
| RBAC Segmentation | Least privilege roles (Reader, Data Scientist, Approver) | Azure RBAC |
| Policy Enforcement | Deny public endpoints, enforce encryption | Azure Policy |
| Audit Logging | Centralized actions (deploy, approve, rollback) | Log Analytics + KQL |
Sample KQL (Deployment Approvals)
AzureActivity
| where OperationName =~ "Create Or Update Deployment"
| project TimeGenerated, Caller, ResultType, CorrelationId
Cost Optimization Strategies
| Area | Strategy | Impact |
|---|---|---|
| Compute | Use spot/low-priority for batch training | 30–70% savings |
| Autoscaling | Scale out on p95 latency > threshold | Avoid over-provision |
| Feature Engineering | Cache expensive aggregations | Reduce pipeline run time |
| Model Selection | Early stop underperforming sweeps | Lower GPU hours |
| Storage | Lifecycle policies for old artifacts | Lower storage costs |
| Monitoring | Sample inference logging (e.g., 5%) | Control telemetry spend |
Maturity Model
| Level | Characteristics | Focus |
|---|---|---|
| 1 Ad-hoc | Manual scripts, no tracking | Establish versioning |
| 2 Repeatable | Some pipeline jobs, partial logging | Metric standardization |
| 3 Defined | CI/CD, registry, monitoring baseline | Quality gates |
| 4 Managed | Drift detection, retraining triggers | SLA enforcement |
| 5 Optimizing | A/B experimentation, cost governance | Continuous improvement |
| 6 Autonomous | Self-healing pipelines, adaptive retraining | Policy-driven optimization |
KPI Catalog
| KPI | Definition | Target |
|---|---|---|
| Lead Time | Commit → Production deploy time | < 2h |
| Deployment Frequency | Successful prod releases / week | ≥ 5 |
| Change Failure Rate | Failed deploys requiring rollback | < 5% |
| MTTR | Mean time to restore after incident | < 30 min |
| Drift Reaction Time | Drift detection → retraining start | < 24h |
| Reproducibility Score | % runs reproduce metrics | 100% |
| Fairness Regression | Parity change vs baseline | ≤ 0.02 |
| Cost per 1000 Predictions | Inference total / volume | Trending ↓ |
Extended Best Practices & Anti-Patterns
| Do | Reason | Avoid | Risk |
|---|---|---|---|
| Hash lineage artifacts | Ensures reproducibility | Rely on timestamps | Ambiguous provenance |
| Implement composite gates | Prevent regressions | Single-metric gating | Hidden bias/perf drop |
| Separate dev/stage/prod | Contained risk | Testing in prod | Customer impact |
| Automate rollback | Fast recovery | Manual redeploy | Extended outage |
| Document feature semantics | Maintain clarity | Tribal knowledge | Rework & errors |
| Monitor fairness + drift | Early issue detection | Only accuracy | Ethical/regulatory risk |
Environment Drift Detection
import subprocess

def current_env_spec():
    """Currently installed packages as sorted 'name==version' strings."""
    out = subprocess.check_output(["pip", "freeze"]).decode()
    return sorted(out.splitlines())

# Heuristic comparison: flag any installed pin that does not appear anywhere
# in the conda lock file (which lists pip packages as '- name==version').
baseline = open("env-lock.yml").read()
drift = [p for p in current_env_spec() if p not in baseline]
if drift:
    print("Environment drift detected:", drift)
Scaling & Performance Engineering
- Use load testing (k6, locust) pre-promotion.
- Profile endpoints (CPU, memory) with sampling profiler.
- Apply autoscaling on p95 latency + queue depth signals.
- Use model quantization or distillation for heavy architectures.
- Employ warm pools to reduce cold start latency.
Rollback & Disaster Recovery
Rollback trigger conditions: error rate spike > 3x baseline, latency p95 > SLA + 25%, critical metric regression > 2%. Execute a traffic shift to the previous stable deployment and open an incident ticket using the root cause template.
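A rollback automation sketch under those trigger conditions; open_incident_ticket is a placeholder for the ticketing integration:
def rollback(ml_client, endpoint_name: str, stable_deployment: str = "blue"):
    """Shift all traffic back to the stable deployment and raise an incident."""
    endpoint = ml_client.online_endpoints.get(endpoint_name)
    endpoint.traffic = {name: (100 if name == stable_deployment else 0)
                        for name in endpoint.traffic}
    ml_client.online_endpoints.begin_create_or_update(endpoint).result()
    open_incident_ticket(endpoint_name)

def open_incident_ticket(endpoint_name: str):
    print(f"INCIDENT: rollback executed on {endpoint_name}")  # placeholder hook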
FAQs
| Question | Answer |
|---|---|
| How do I decide retraining cadence? | Combine drift metrics, performance decay, and business cycle (e.g., monthly) |
| What if fairness improves but accuracy drops? | Evaluate net business impact; may accept trade-off if accuracy within tolerance |
| How are secrets rotated? | Managed via Key Vault with quarterly rotation policy and automated pipeline update |
| How do I handle large feature sets? | Prune low-importance features; maintain feature importance registry; apply sparse techniques |
| Should I store raw data forever? | Retain per compliance & privacy rules; apply lifecycle deletion policies |
| How to manage multi-model ensembles? | Track component models separately; create ensemble manifest with versions |
| What triggers rollback automatically? | Canary delta beyond thresholds, spike in error, SLA violation events |
| How to audit lineage quickly? | Query combined lineage JSON + registry metadata via reporting script |
Additional References
- Azure ML Responsible AI
- Azure Policy for ML
- Key Vault Integration
- Autoscaling Patterns
- PSI Calculation
Service Levels (SLO / SLA / Error Budgets)
Define clear reliability expectations for ML serving:
| Dimension | SLI (Metric) | SLO Target | SLA (External) | Error Budget |
|---|---|---|---|---|
| Availability | Successful request ratio | 99.5% monthly | 99.0% | 0.5% downtime or failed calls |
| Latency | p95 response time | < 350 ms | < 400 ms | 50 ms headroom |
| Correctness | Rolling accuracy vs baseline | ≥ baseline -1% | Baseline -2% | 1% degradation allowance |
| Freshness | Max model age | < 30 days | < 45 days | 15 day buffer |
| Fairness | Parity diff (worst group) | ≤ 0.08 | ≤ 0.10 | 0.02 tolerance |
Error budget usage informs release velocity; if the budget is exhausted, freeze feature changes and prioritize reliability and drift fixes.
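A small calculation sketch for the availability row: given monthly request counts, it reports the fraction of error budget still unspent (the function name and example figures are illustrative).
def error_budget_remaining(successful: int, total: int, slo: float = 0.995) -> float:
    """Fraction of the monthly availability error budget still unspent."""
    if total == 0:
        return 1.0
    failure_rate = 1 - successful / total
    budget = 1 - slo                      # e.g. 0.5% of requests may fail
    return max(0.0, 1 - failure_rate / budget)

# 99.7% success against a 99.5% SLO leaves 40% of the budget
print(error_budget_remaining(successful=99_700, total=100_000))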
Incident Response Runbook (Template)
- Triage: Identify alert type (latency, drift, fairness, correctness).
- Scope: Determine impacted endpoints, traffic %, user segments.
- Mitigation: Apply rollback (traffic shift), scale resources, or disable risky feature flags.
- Diagnosis: Pull KQL queries (latency, drift, fairness) + recent deployment diff.
- Communication: Post status update (Internal channel + status page if SLA breach).
- Resolution: Implement fix (hot patch, retrain, config tune).
- Postmortem: Document root cause + action items (automation, tests, guardrails).
Root Cause Template (Excerpt)
Issue: High latency spike post canary promotion
Cause: Missing warm-up routine; model cold start + autoscaling delay
Impact: p95 latency 780ms for 12 minutes (SLO breach)
Actions: Add warm pool, pre-load embeddings cache, adjust autoscaling thresholds
Preventive Controls: Performance regression test, synthetic load pre-promotion
Enhanced Monitoring (KQL Queries)
Latency & Error Trend
AppRequests
| where Name == "credit-risk-endpoint"
| summarize p95Latency=percentile(DurationMs, 95), errorRate=todouble(countif(toint(ResultCode) >= 500)) / count() by bin(TimeGenerated, 5m)
| render timechart
Drift & Fairness Metrics (Custom Logs)
CustomMetrics
| where MetricName in ("psi_income", "psi_credit_score", "fairness_parity_diff")
| project TimeGenerated, MetricName, MetricValue
| summarize avg(MetricValue) by MetricName, bin(TimeGenerated, 1h)
Canary vs Champion Comparison
CustomMetrics
| where MetricName in ("champ_accuracy", "canary_accuracy")
| summarize champ=avgif(MetricValue, MetricName=="champ_accuracy"), canary=avgif(MetricValue, MetricName=="canary_accuracy") by bin(TimeGenerated, 30m)
| extend delta = canary - champ
Shadow & Bandit Deployment Strategies
| Strategy | Description | Use Case | Risk |
|---|---|---|---|
| Shadow Testing | New model receives mirrored traffic; responses not returned to user | Non-regression validation | Resource overhead |
| Canary | Partial traffic routed; metrics compared | Gradual rollout | Needs gating discipline |
| A/B (Fixed Split) | Two versions share traffic; track uplift | UI/content or model improvements | Longer evaluation window |
| Multi-Armed Bandit | Adaptive traffic based on live reward | Optimize conversions dynamically | Complex reward design |
Shadow testing establishes safety; bandits balance exploration and exploitation under controlled fairness constraints.
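A toy epsilon-greedy allocator to illustrate the bandit idea; production systems typically use Thompson sampling plus guardrails (latency, fairness), and the reward bookkeeping here is an assumption:
import random

def choose_deployment(reward_estimates: dict, epsilon: float = 0.1) -> str:
    """Pick a deployment: explore with probability epsilon, otherwise exploit the best."""
    if random.random() < epsilon:
        return random.choice(list(reward_estimates))
    return max(reward_estimates, key=reward_estimates.get)

def update_reward(reward_estimates: dict, counts: dict, deployment: str, reward: float):
    """Incremental mean update of the observed reward for a deployment."""
    counts[deployment] = counts.get(deployment, 0) + 1
    prev = reward_estimates.get(deployment, 0.0)
    reward_estimates[deployment] = prev + (reward - prev) / counts[deployment]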
Promotion Gate Script (Composite Validation)
import mlflow

def promotion_gate(metrics):
checks = {
"accuracy": metrics["accuracy"] >= 0.85,
"fairness": metrics["parity_diff"] <= 0.08,
"latency": metrics["p95_latency_ms"] < 350,
"drift": metrics["avg_psi"] < 0.2,
"error_rate": metrics["error_rate"] < 0.02
}
failed = [k for k,v in checks.items() if not v]
if failed:
mlflow.log_param("promotion_gate", f"FAILED:{','.join(failed)}")
return False
mlflow.log_param("promotion_gate", "PASSED")
return True
Model Explanation Integration (SHAP Gate)
import mlflow
import numpy as np
import shap

def shap_consistency(model, X_sample, top_n=5):
    """Log the top-N features by mean |SHAP| value and return them."""
    explainer = shap.TreeExplainer(model)
    vals = explainer.shap_values(X_sample)
    mean_abs = np.abs(vals).mean(axis=0)
    top_features = list(X_sample.columns[np.argsort(mean_abs)[-top_n:]])
    with open("shap_top_features.txt", "w") as fh:
        fh.write("\n".join(top_features))
    mlflow.log_artifact("shap_top_features.txt")  # log after the file is written
    return top_features
The gate can enforce explanation stability (e.g., expected core features remain in the top list across versions); a comparison sketch follows.
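A stability-check sketch, assuming a minimum Jaccard overlap (the 0.6 floor is an assumption) between the champion's and candidate's top SHAP features:
def feature_stability_gate(top_prev, top_new, min_overlap: float = 0.6) -> bool:
    """Pass when the Jaccard overlap of top SHAP feature sets meets the floor."""
    prev, new = set(top_prev), set(top_new)
    overlap = len(prev & new) / len(prev | new) if (prev | new) else 1.0
    return overlap >= min_overlap

# e.g. feature_stability_gate(shap_consistency(champion, X), shap_consistency(candidate, X))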
Pipeline Optimization Techniques
- Cache intermediate feature sets for iterative experiments (see the caching sketch after this list).
- Use lightweight models for early sweep pruning before heavy architectures.
- Parallelize data validation + feature extraction stages.
- Adopt container layer caching (shared base image for Python deps).
- Use distributed training only beyond dataset size / complexity thresholds.
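For the caching point above, a minimal parquet-backed cache sketch (the .feature_cache/ layout and cached_features helper are assumptions):
import os
import pandas as pd

def cached_features(name: str, build_fn, cache_dir: str = ".feature_cache") -> pd.DataFrame:
    """Return a cached feature set if present; otherwise build, persist, and return it."""
    os.makedirs(cache_dir, exist_ok=True)
    path = os.path.join(cache_dir, f"{name}.parquet")
    if os.path.exists(path):
        return pd.read_parquet(path)   # reuse previously computed features
    df = build_fn()                    # expensive transformation
    df.to_parquet(path)
    return df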
Expanded Troubleshooting Matrix (Additions)
| Issue | Cause | Resolution | Preventive Action |
|---|---|---|---|
| Fairness regression | Distribution shift in subgroup | Retrain with reweight / constraints | Continuous subgroup monitoring |
| Cost spike | Unbounded sweep search | Cap trials + early stopping | Budget-aware scheduler |
| Metrics delay | Telemetry ingestion lag | Investigate Log Analytics latency | Implement buffering & retry |
| Env drift | Untracked dependency added | Rebuild from lock, update lock file | Immutable build images |
| Stale canary | Canary not promoted or rolled back | Review decision gates | Set max canary duration SLA |
| Slow retraining | Inefficient data loading | Optimize IO, parquet usage | Profile pipeline steps |
| Registry clutter | Excess obsolete versions | Archive & purge policy | Scheduled cleanup job |
Security & Compliance Checklist
- Key Vault secrets mounted via URI references
- Private endpoint enabled for workspace & storage
- All deployments use approved base images (scanned)
- RBAC roles reviewed quarterly
- Policies enforce HTTPS & encryption at rest
- Lineage JSON generated & stored per version
- Fairness & drift metrics logged hourly
- Promotion gate result persisted
Quality Scoreboard (Example)
| Version | Accuracy | Fairness Diff | Drift PSI | Latency p95 | Status |
|---|---|---|---|---|---|
| 1.0 | 0.872 | 0.05 | 0.08 | 310ms | Champion |
| 1.1 | 0.879 | 0.06 | 0.07 | 305ms | Shadow |
| 1.2 | 0.881 | 0.05 | 0.09 | 315ms | Canary |
| 1.3 | 0.890 | 0.07 | 0.06 | 298ms | Candidate |
Track evolution across versions; degradation detection triggers investigation before user impact.
Final Thoughts
Robust MLOps transforms model delivery from artisanal bursts to engineered, measurable, and continuously improving systemsβbalancing velocity, reliability, ethics, and cost efficiency.