AI Governance: Compliance, Security, and Risk Management
Introduction
AI governance ensures models meet regulatory requirements, security standards, and organizational policies. This guide covers compliance frameworks, risk management, security controls, and audit practices.
Governance Framework Components
- Policy Definition: Standards, guidelines, procedures
- Risk Assessment: Identify and mitigate AI-specific risks
- Compliance Monitoring: Regulatory alignment (GDPR, CCPA, HIPAA)
- Security Controls: Access, encryption, data protection
- Audit & Accountability: Logging, traceability, reviews
Regulatory Compliance
GDPR Requirements
class GDPRCompliantModel:
    """Wrap a model so every prediction is explained and logged per user.

    Each prediction is stored with its explanation (GDPR Article 22) and
    all records of a user can be removed on request (GDPR Article 17).
    """

    def __init__(self, model, explainer):
        """Store the collaborators and start with an empty prediction log.

        Args:
            model: Object exposing ``predict(X)`` and ``predict_proba``.
            explainer: Object exposing ``explain_instance(row, predict_fn)``
                whose result offers ``as_list()``.
        """
        self.model = model
        self.explainer = explainer
        # Held in memory only; nothing here persists the records.
        self.prediction_log = []

    @staticmethod
    def _hash_input(X):
        """Return a SHA-256 hex digest fingerprinting the complete input."""
        import hashlib

        # NumPy abbreviates str() of large arrays with "...", so hashing the
        # text form could give different inputs the same digest. Hash the raw
        # buffer (plus shape/dtype) whenever the input offers one.
        if hasattr(X, "tobytes"):
            header = f"{getattr(X, 'shape', None)}|{getattr(X, 'dtype', None)}|"
            digest = hashlib.sha256(header.encode())
            digest.update(X.tobytes())
            return digest.hexdigest()
        return hashlib.sha256(str(X).encode()).hexdigest()

    def predict_with_explanation(self, X, user_id):
        """Provide prediction with explanation (GDPR Article 22).

        Only the first row of ``X`` is explained and logged, so callers
        should pass a single-instance batch.

        Args:
            X: Indexable batch of feature rows accepted by the model.
            user_id: Identifier of the data subject the prediction concerns.

        Returns:
            Tuple ``(prediction, explanation)`` exactly as produced by the
            model and the explainer.
        """
        from datetime import datetime, timezone

        prediction = self.model.predict(X)
        explanation = self.explainer.explain_instance(
            X[0], self.model.predict_proba
        )
        # Naive UTC, i.e. the same text format datetime.utcnow() produced,
        # without relying on that deprecated call.
        logged_at = datetime.now(timezone.utc).replace(tzinfo=None)
        self.prediction_log.append(
            {
                "user_id": user_id,
                "timestamp": logged_at.isoformat(),
                "prediction": prediction[0],
                "explanation": explanation.as_list(),
                "data_hash": self._hash_input(X),
            }
        )
        return prediction, explanation

    def delete_user_data(self, user_id):
        """Right to be forgotten (GDPR Article 17).

        Args:
            user_id: Identifier whose logged predictions must be erased.

        Returns:
            Number of log records that were removed, so the caller can
            confirm the erasure.
        """
        kept = [
            record for record in self.prediction_log
            if record["user_id"] != user_id
        ]
        removed = len(self.prediction_log) - len(kept)
        self.prediction_log = kept
        return removed
HIPAA Compliance for Healthcare AI
from azure.keyvault.secrets import SecretClient
from cryptography.fernet import Fernet
class HIPAACompliantDataHandler:
    """Handle protected health information (PHI) securely.

    Encrypts and decrypts PHI with a Fernet key fetched from a key vault
    and writes an audit log line for every PHI access.
    """

    def __init__(self, key_vault_client):
        """Fetch the PHI encryption key and build the cipher.

        Args:
            key_vault_client: Client exposing ``get_secret(name)`` whose
                result has a string ``value``. The secret
                ``phi-encryption-key`` must hold a Fernet key.
        """
        self.key_vault = key_vault_client
        encryption_key = self.key_vault.get_secret("phi-encryption-key").value
        self.cipher = Fernet(encryption_key.encode())

    def encrypt_phi(self, data):
        """Encrypt PHI at rest.

        Args:
            data: Plain-text string to protect.

        Returns:
            The encrypted token as a string.
        """
        token = self.cipher.encrypt(data.encode())
        return token.decode()

    def decrypt_phi(self, encrypted_data):
        """Decrypt PHI for authorized access.

        Args:
            encrypted_data: Token string produced by ``encrypt_phi``.

        Returns:
            The original plain-text string.
        """
        plain = self.cipher.decrypt(encrypted_data.encode())
        return plain.decode()

    @staticmethod
    def _request_remote_addr():
        """Return the client IP of the active Flask request, else None."""
        try:
            from flask import has_request_context, request
        except ImportError:
            return None
        return request.remote_addr if has_request_context() else None

    def audit_phi_access(self, user_id, data_id, action, ip_address=None):
        """Log all PHI access (HIPAA audit requirements).

        Args:
            user_id: Who accessed the data.
            data_id: Which PHI record was accessed.
            action: What was done (for example ``"read"``).
            ip_address: Caller's IP address. When omitted it is taken from
                the active Flask request, or recorded as ``None`` when the
                call happens outside a request (batch job, test, ...).

        Returns:
            The audit entry dict that was logged.
        """
        from datetime import datetime, timezone
        import logging

        if ip_address is None:
            ip_address = self._request_remote_addr()
        # Naive UTC, i.e. the same text format datetime.utcnow() produced.
        accessed_at = datetime.now(timezone.utc).replace(tzinfo=None)
        audit_entry = {
            "timestamp": accessed_at.isoformat(),
            "user_id": user_id,
            "data_id": data_id,
            "action": action,
            "ip_address": ip_address,
        }
        logging.info("PHI Access: %s", audit_entry)
        return audit_entry
Risk Assessment Framework
AI Risk Categories
| Risk Category | Examples | Mitigation |
|---|---|---|
| Bias & Fairness | Discriminatory outcomes | Fairness testing, diverse datasets |
| Privacy | Data leakage, re-identification | Anonymization, differential privacy |
| Security | Adversarial attacks, model theft | Input validation, rate limiting |
| Transparency | Black-box decisions | Explainability tools, model cards |
| Safety | Unsafe recommendations | Human review, confidence thresholds |
Risk Assessment Matrix
class AIRiskAssessment:
    """Score AI risks on a likelihood x impact matrix and keep a registry."""

    # Inclusive upper score bound per level, checked in order; any score
    # above the last bound is "Critical".
    _LEVEL_BOUNDS = ((5, "Low"), (12, "Medium"), (20, "High"))

    def __init__(self):
        """Start with an empty risk registry."""
        self.risk_registry = []

    def assess_risk(self, model_name, risk_category, likelihood, impact):
        """Score one risk, record it in the registry, and return it.

        Args:
            model_name: Name of the model being assessed.
            risk_category: Category label (for example ``"Privacy"``).
            likelihood: 1-5 (rare to almost certain).
            impact: 1-5 (negligible to catastrophic).

        Returns:
            The registry entry: the inputs plus ``risk_score``
            (likelihood * impact), ``risk_level`` and ``assessment_date``.

        Raises:
            ValueError: If ``likelihood`` or ``impact`` is outside 1-5.
                Nothing is added to the registry in that case.
        """
        from datetime import datetime, timezone

        # Reject ratings outside the matrix before they reach the registry;
        # a score built from them would be meaningless.
        for label, rating in (("likelihood", likelihood), ("impact", impact)):
            if not 1 <= rating <= 5:
                raise ValueError(
                    f"{label} must be between 1 and 5, got {rating!r}"
                )

        risk_score = likelihood * impact
        # Naive UTC, i.e. the same text format datetime.utcnow() produced.
        assessed_at = datetime.now(timezone.utc).replace(tzinfo=None)
        risk = {
            "model_name": model_name,
            "category": risk_category,
            "likelihood": likelihood,
            "impact": impact,
            "risk_score": risk_score,
            "risk_level": self._categorize_risk(risk_score),
            "assessment_date": assessed_at.isoformat(),
        }
        self.risk_registry.append(risk)
        return risk

    def _categorize_risk(self, score):
        """Map a score to "Low" (<=5), "Medium" (<=12), "High" (<=20) or "Critical"."""
        for upper_bound, level in self._LEVEL_BOUNDS:
            if score <= upper_bound:
                return level
        return "Critical"

    def get_high_risks(self):
        """Return the registry entries rated High or Critical."""
        return [
            entry for entry in self.risk_registry
            if entry["risk_level"] in ("High", "Critical")
        ]
Security Controls
Role-Based Access Control (RBAC)
from azure.ai.ml import MLClient
from azure.mgmt.authorization import AuthorizationManagementClient
def assign_ml_permissions(subscription_id, resource_group, workspace, user_email, role):
    """Assign RBAC role for Azure ML workspace.

    Args:
        subscription_id: Azure subscription that owns the workspace.
        resource_group: Resource group containing the workspace.
        workspace: Azure ML workspace name.
        user_email: Value sent as the assignment's ``principal_id``.
            NOTE(review): Azure role assignments identify the principal by
            its directory object ID, not by e-mail address -- confirm what
            callers pass here.
        role: One of ``"data_scientist"``, ``"compute_operator"``,
            ``"reader"``.

    Returns:
        The role assignment object returned by the service.

    Raises:
        KeyError: If ``role`` is not one of the supported aliases.
        LookupError: If the role cannot be found at the workspace scope.
    """
    import uuid

    role_names = {
        "data_scientist": "AzureML Data Scientist",
        "compute_operator": "AzureML Compute Operator",
        "reader": "Reader",
    }
    # Validate first so a typo fails before any credential or network use.
    if role not in role_names:
        raise KeyError(
            f"Unknown role {role!r}; expected one of {sorted(role_names)}"
        )
    role_name = role_names[role]

    from azure.identity import DefaultAzureCredential

    auth_client = AuthorizationManagementClient(
        credential=DefaultAzureCredential(),
        subscription_id=subscription_id,
    )
    scope = (
        f"/subscriptions/{subscription_id}/resourceGroups/{resource_group}"
        f"/providers/Microsoft.MachineLearningServices/workspaces/{workspace}"
    )

    # A role definition ID ends in the role's GUID, not its display name,
    # so look the definition up by name instead of formatting the name
    # into the ID.
    definitions = list(
        auth_client.role_definitions.list(
            scope, filter=f"roleName eq '{role_name}'"
        )
    )
    if not definitions:
        raise LookupError(f"Role definition {role_name!r} not found at {scope}")

    # Assign role
    return auth_client.role_assignments.create(
        scope=scope,
        role_assignment_name=str(uuid.uuid4()),
        parameters={
            "role_definition_id": definitions[0].id,
            "principal_id": user_email,
        },
    )
Model Encryption
from azure.keyvault.keys.crypto import CryptographyClient, EncryptionAlgorithm
def encrypt_model(model_bytes, key_vault_url, key_name):
    """Encrypt model before storage.

    The bytes are encrypted with RSA-OAEP using a key held in Azure Key
    Vault.

    NOTE(review): RSA-OAEP encrypts a single block whose maximum size
    depends on the key (roughly 200 bytes for a 2048-bit key), so a
    serialized model of realistic size is likely to be rejected by the
    service. Envelope encryption (encrypt the model with a symmetric data
    key and wrap only that key with Key Vault) is the usual remedy; it is
    not applied here because it would change the returned format.

    Args:
        model_bytes: Raw bytes to encrypt.
        key_vault_url: URL of the Key Vault holding the key.
        key_name: Name of the RSA key inside the vault.

    Returns:
        The ciphertext bytes produced by the encrypt operation.
    """
    # Imported here because no snippet in this file imports these names.
    from azure.identity import DefaultAzureCredential
    from azure.keyvault.keys import KeyClient

    credential = DefaultAzureCredential()
    key_client = KeyClient(vault_url=key_vault_url, credential=credential)
    key = key_client.get_key(key_name)

    crypto_client = CryptographyClient(key, credential=credential)
    result = crypto_client.encrypt(EncryptionAlgorithm.rsa_oaep, model_bytes)
    return result.ciphertext
Policy Enforcement
Automated Policy Checks
class ModelGovernancePolicy:
    """Check model metadata against configurable governance thresholds."""

    # Defaults applied when the constructor receives no overrides.
    DEFAULT_POLICIES = {
        "min_accuracy": 0.85,
        "max_bias_disparity": 0.1,
        "require_explainability": True,
        "require_documentation": True,
        "max_data_age_days": 180,
    }

    def __init__(self, policies=None):
        """Build the active policy set.

        Args:
            policies: Optional mapping of overrides merged over
                ``DEFAULT_POLICIES``; omit it to use the defaults.
        """
        # Fresh dict per instance so edits never leak into the defaults.
        self.policies = {**self.DEFAULT_POLICIES, **(policies or {})}

    def validate_model(self, model_metadata):
        """Check if model meets governance policies.

        Args:
            model_metadata: Mapping with ``accuracy``, ``bias_disparity``,
                ``explainer``, ``model_card`` and ``training_date`` (an ISO
                8601 string; a value without a UTC offset is read as UTC).
                A missing metric is reported as a violation rather than
                raising.

        Returns:
            Dict with ``compliant`` (bool) and ``violations`` (list of
            human-readable messages, empty when compliant).
        """
        from datetime import datetime, timezone

        policies = self.policies
        violations = []

        accuracy = model_metadata.get("accuracy")
        if accuracy is None:
            violations.append("Accuracy metric missing")
        elif accuracy < policies["min_accuracy"]:
            violations.append(f"Accuracy {accuracy} below threshold")

        bias_disparity = model_metadata.get("bias_disparity")
        if bias_disparity is None:
            violations.append("Bias disparity metric missing")
        elif bias_disparity > policies["max_bias_disparity"]:
            violations.append(f"Bias disparity {bias_disparity} exceeds limit")

        if policies["require_explainability"] and not model_metadata.get("explainer"):
            violations.append("Explainability not implemented")

        if policies["require_documentation"] and not model_metadata.get("model_card"):
            violations.append("Model card documentation missing")

        training_date = model_metadata.get("training_date")
        if training_date is None:
            violations.append("Training date missing")
        else:
            trained_at = datetime.fromisoformat(training_date)
            # Naive and aware datetimes cannot be subtracted from each
            # other, so normalise an offset-less date to UTC first.
            if trained_at.tzinfo is None:
                trained_at = trained_at.replace(tzinfo=timezone.utc)
            data_age = (datetime.now(timezone.utc) - trained_at).days
            if data_age > policies["max_data_age_days"]:
                violations.append(
                    f"Training data is {data_age} days old "
                    f"(max: {policies['max_data_age_days']})"
                )

        return {
            "compliant": not violations,
            "violations": violations,
        }
Audit Trail Implementation
import json
from azure.storage.blob import BlobServiceClient
class ModelAuditTrail:
    """Audit log that stores one JSON blob per event in a blob container."""

    def __init__(self, connection_string, container_name):
        """Connect to the storage account and select the audit container.

        Args:
            connection_string: Azure Storage connection string.
            container_name: Name of the container holding the audit blobs.
        """
        self.blob_service = BlobServiceClient.from_connection_string(connection_string)
        self.container = self.blob_service.get_container_client(container_name)

    @staticmethod
    def _iso_bound(value):
        """Return a query bound as a naive-UTC ISO string; strings pass through."""
        from datetime import datetime, timezone

        if isinstance(value, datetime):
            if value.tzinfo is not None:
                value = value.astimezone(timezone.utc).replace(tzinfo=None)
            return value.isoformat()
        return value

    def log_event(self, event_type, model_name, user_id, details):
        """Create a write-once audit log entry.

        The blob is uploaded with ``overwrite=False``, so an existing entry
        is never replaced by this method. True immutability additionally
        needs an immutability policy on the container itself.

        Args:
            event_type: Event category; also used as a folder in the blob path.
            model_name: Model the event concerns.
            user_id: User who triggered the event.
            details: JSON-serialisable payload describing the event.

        Returns:
            The audit entry dict that was stored.
        """
        from datetime import datetime, timezone
        import uuid

        # Read the clock once: the date folder in the blob path and the
        # stored timestamp must agree even when logging around midnight.
        # Naive UTC keeps the text format datetime.utcnow() produced.
        now = datetime.now(timezone.utc).replace(tzinfo=None)
        audit_entry = {
            "event_id": str(uuid.uuid4()),
            "timestamp": now.isoformat(),
            "event_type": event_type,
            "model_name": model_name,
            "user_id": user_id,
            "details": details,
        }
        blob_name = f"audit/{event_type}/{now.date()}/{audit_entry['event_id']}.json"
        blob_client = self.container.get_blob_client(blob_name)
        blob_client.upload_blob(json.dumps(audit_entry), overwrite=False)
        return audit_entry

    def query_audit_log(self, start_date, end_date, event_type=None, model_name=None):
        """Query audit logs for compliance reporting.

        Args:
            start_date: Inclusive lower bound, as an ISO 8601 string or a
                ``datetime``.
            end_date: Inclusive upper bound, same forms as ``start_date``.
            event_type: Only return events of this type when given.
            model_name: Only return events for this model when given.

        Returns:
            List of matching audit entry dicts.
        """
        lower = self._iso_bound(start_date)
        upper = self._iso_bound(end_date)
        # log_event files blobs under audit/<event_type>/, so a type filter
        # can narrow the listing instead of downloading every blob.
        prefix = f"audit/{event_type}/" if event_type else "audit/"

        results = []
        for blob in self.container.list_blobs(name_starts_with=prefix):
            blob_client = self.container.get_blob_client(blob.name)
            data = json.loads(blob_client.download_blob().readall())
            # ISO 8601 strings in one format sort chronologically.
            if not lower <= data["timestamp"] <= upper:
                continue
            if event_type and data["event_type"] != event_type:
                continue
            if model_name and data["model_name"] != model_name:
                continue
            results.append(data)
        return results
Governance Dashboard
from flask import Flask, render_template
import pandas as pd
# Flask application that serves the governance dashboard.
app = Flask(__name__)

# Module-level governance services shared by the views below.
risk_assessment = AIRiskAssessment()
policy_checker = ModelGovernancePolicy()
audit_trail = ModelAuditTrail(
    # Placeholder value: supply the real connection string from secure
    # configuration rather than committing it to source control.
    connection_string="<connection-string>",
    container_name="audit-logs",
)
@app.route("/governance/dashboard")
def dashboard():
    """Display governance metrics.

    Renders ``dashboard.html`` with the High/Critical entries of the risk
    registry and the number of audit events from the last 30 days.
    """
    from datetime import datetime, timedelta, timezone

    high_risks = risk_assessment.get_high_risks()

    # Read the clock once so both ends of the 30-day window come from the
    # same instant. Naive UTC keeps the ISO text format that
    # datetime.utcnow() produced, which the audit log compares against.
    now = datetime.now(timezone.utc).replace(tzinfo=None)
    recent_audits = audit_trail.query_audit_log(
        start_date=(now - timedelta(days=30)).isoformat(),
        end_date=now.isoformat(),
    )
    return render_template(
        "dashboard.html",
        high_risks=high_risks,
        audit_count=len(recent_audits),
    )
Best Practices
- Establish clear AI governance policies before deployment
- Implement layered security (defense in depth)
- Conduct regular risk assessments
- Maintain comprehensive audit trails
- Automate policy enforcement where possible
- Train teams on compliance requirements
- Establish incident response procedures
- Schedule regular third-party security audits
Troubleshooting
| Issue | Cause | Resolution |
|---|---|---|
| Policy violations | Unclear requirements | Document policies; automate checks |
| Audit gaps | Incomplete logging | Implement comprehensive audit trail |
| Compliance failures | Regulatory changes | Regular compliance reviews |
| Access breaches | Weak controls | Enforce RBAC; MFA; monitoring |
Key Takeaways
Effective AI governance requires proactive risk management, comprehensive security controls, continuous compliance monitoring, and transparent audit practices.