AI Governance: Compliance, Security, and Risk Management

Introduction

AI governance ensures models meet regulatory requirements, security standards, and organizational policies. This guide covers compliance frameworks, risk management, security controls, and audit practices.

Governance Framework Components

  1. Policy Definition: Standards, guidelines, procedures
  2. Risk Assessment: Identify and mitigate AI-specific risks
  3. Compliance Monitoring: Regulatory alignment (GDPR, CCPA, HIPAA)
  4. Security Controls: Access, encryption, data protection
  5. Audit & Accountability: Logging, traceability, reviews
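
A minimal sketch of how these components might come together as a per-model governance manifest (the field names below are illustrative, not a standard schema):

governance_manifest = {
    "model_name": "credit-risk-scorer",
    "policy": {"owner": "ml-platform-team", "review_cycle_days": 90},
    "risk_assessment": {"last_reviewed": "2024-01-15", "risk_level": "Medium"},
    "compliance": {"frameworks": ["GDPR", "CCPA"], "dpia_completed": True},
    "security": {"rbac_enabled": True, "encryption_at_rest": True},
    "audit": {"log_destination": "audit-logs", "retention_days": 2555}
}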

Regulatory Compliance

GDPR Requirements

import hashlib
from datetime import datetime

class GDPRCompliantModel:
    """Ensure model complies with GDPR right to explanation"""
    
    def __init__(self, model, explainer):
        self.model = model
        self.explainer = explainer
        self.prediction_log = []
    
    def predict_with_explanation(self, X, user_id):
        """Provide prediction with explanation (GDPR Article 22)"""
        prediction = self.model.predict(X)
        explanation = self.explainer.explain_instance(X[0], self.model.predict_proba)
        
        record = {
            "user_id": user_id,
            "timestamp": datetime.utcnow().isoformat(),
            "prediction": prediction[0],
            "explanation": explanation.as_list(),
            "data_hash": hashlib.sha256(str(X).encode()).hexdigest()
        }
        
        self.prediction_log.append(record)
        return prediction, explanation
    
    def delete_user_data(self, user_id):
        """Right to be forgotten (GDPR Article 17)"""
        self.prediction_log = [
            record for record in self.prediction_log 
            if record["user_id"] != user_id
        ]
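
A brief usage sketch, assuming a LIME-style explainer and a scikit-learn classifier; the training data (X_train, y_train, feature_names) and the new instance X_new are placeholders:

from lime.lime_tabular import LimeTabularExplainer
from sklearn.ensemble import RandomForestClassifier

# Assumes X_train, y_train, feature_names, and X_new already exist
clf = RandomForestClassifier().fit(X_train, y_train)
explainer = LimeTabularExplainer(X_train, feature_names=feature_names, mode="classification")

gdpr_model = GDPRCompliantModel(model=clf, explainer=explainer)
prediction, explanation = gdpr_model.predict_with_explanation(X_new, user_id="user-123")

# Right to erasure: remove every logged prediction tied to this user
gdpr_model.delete_user_data("user-123")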

HIPAA Compliance for Healthcare AI

import logging
from datetime import datetime

from azure.keyvault.secrets import SecretClient
from cryptography.fernet import Fernet

class HIPAACompliantDataHandler:
    """Handle protected health information (PHI) securely"""
    
    def __init__(self, key_vault_client):
        self.key_vault = key_vault_client
        encryption_key = self.key_vault.get_secret("phi-encryption-key").value
        self.cipher = Fernet(encryption_key.encode())
    
    def encrypt_phi(self, data):
        """Encrypt PHI at rest"""
        return self.cipher.encrypt(data.encode()).decode()
    
    def decrypt_phi(self, encrypted_data):
        """Decrypt PHI for authorized access"""
        return self.cipher.decrypt(encrypted_data.encode()).decode()
    
    def audit_phi_access(self, user_id, data_id, action, ip_address=None):
        """Log all PHI access (HIPAA audit requirements)"""
        audit_entry = {
            "timestamp": datetime.utcnow().isoformat(),
            "user_id": user_id,
            "data_id": data_id,
            "action": action,
            # Caller IP, e.g. flask.request.remote_addr when invoked from a web handler
            "ip_address": ip_address
        }
        logging.info(f"PHI Access: {audit_entry}")
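
A short usage sketch, assuming the Key Vault secret phi-encryption-key already holds a valid Fernet key and that DefaultAzureCredential can authenticate; the vault URL and identifiers are placeholders:

from azure.identity import DefaultAzureCredential
from azure.keyvault.secrets import SecretClient

credential = DefaultAzureCredential()
secret_client = SecretClient(vault_url="https://<vault-name>.vault.azure.net", credential=credential)

handler = HIPAACompliantDataHandler(secret_client)
ciphertext = handler.encrypt_phi("patient: Jane Doe, dx: E11.9")
plaintext = handler.decrypt_phi(ciphertext)
handler.audit_phi_access(user_id="clinician-42", data_id="record-7", action="decrypt", ip_address="10.0.0.5")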

Risk Assessment Framework

AI Risk Categories

Risk Category   | Examples                         | Mitigation
Bias & Fairness | Discriminatory outcomes          | Fairness testing, diverse datasets
Privacy         | Data leakage, re-identification  | Anonymization, differential privacy
Security        | Adversarial attacks, model theft | Input validation, rate limiting
Transparency    | Black-box decisions              | Explainability tools, model cards
Safety          | Unsafe recommendations           | Human review, confidence thresholds
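
As a concrete example of the "fairness testing" mitigation above, a minimal demographic-parity check might look like this (the 0.1 threshold matches the max_bias_disparity policy used later in this guide; the data is illustrative):

import numpy as np

def demographic_parity_gap(y_pred, sensitive_attr):
    """Largest difference in positive-prediction rates across groups."""
    rates = {
        group: float(np.mean(y_pred[sensitive_attr == group]))
        for group in np.unique(sensitive_attr)
    }
    return max(rates.values()) - min(rates.values()), rates

y_pred = np.array([1, 0, 1, 1, 0, 1, 0, 0])
groups = np.array(["A", "A", "A", "B", "B", "B", "B", "A"])

gap, rates = demographic_parity_gap(y_pred, groups)
if gap > 0.1:
    print(f"Fairness check failed: gap={gap:.2f}, rates={rates}")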

Risk Assessment Matrix

from datetime import datetime

class AIRiskAssessment:
    def __init__(self):
        self.risk_registry = []
    
    def assess_risk(self, model_name, risk_category, likelihood, impact):
        """
        likelihood: 1-5 (rare to almost certain)
        impact: 1-5 (negligible to catastrophic)
        """
        risk_score = likelihood * impact
        risk_level = self._categorize_risk(risk_score)
        
        risk = {
            "model_name": model_name,
            "category": risk_category,
            "likelihood": likelihood,
            "impact": impact,
            "risk_score": risk_score,
            "risk_level": risk_level,
            "assessment_date": datetime.utcnow().isoformat()
        }
        
        self.risk_registry.append(risk)
        return risk
    
    def _categorize_risk(self, score):
        if score <= 5:
            return "Low"
        elif score <= 12:
            return "Medium"
        elif score <= 20:
            return "High"
        else:
            return "Critical"
    
    def get_high_risks(self):
        """Return models with high or critical risk"""
        return [r for r in self.risk_registry if r["risk_level"] in ["High", "Critical"]]
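
For example, registering risks for a hypothetical model and pulling the items that need escalation:

assessor = AIRiskAssessment()

# Likelihood 4 ("likely") x impact 4 ("major") = 16, which falls in the "High" band
assessor.assess_risk("churn-predictor", "Privacy", likelihood=4, impact=4)
assessor.assess_risk("churn-predictor", "Transparency", likelihood=2, impact=2)

for risk in assessor.get_high_risks():
    print(risk["model_name"], risk["category"], risk["risk_level"])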

Security Controls

Role-Based Access Control (RBAC)

import uuid

from azure.identity import DefaultAzureCredential
from azure.mgmt.authorization import AuthorizationManagementClient
from azure.mgmt.authorization.models import RoleAssignmentCreateParameters

def assign_ml_permissions(subscription_id, resource_group, workspace, principal_object_id, role):
    """Assign an RBAC role on an Azure ML workspace.

    principal_object_id is the Azure AD object ID of the user, group, or
    service principal (not an email address).
    """
    role_definitions = {
        "data_scientist": "AzureML Data Scientist",
        "compute_operator": "AzureML Compute Operator",
        "reader": "Reader"
    }

    auth_client = AuthorizationManagementClient(
        credential=DefaultAzureCredential(),
        subscription_id=subscription_id
    )

    scope = (
        f"/subscriptions/{subscription_id}/resourceGroups/{resource_group}"
        f"/providers/Microsoft.MachineLearningServices/workspaces/{workspace}"
    )

    # Resolve the role definition resource ID (GUID-based) from its display name
    role_definition = next(iter(
        auth_client.role_definitions.list(scope, filter=f"roleName eq '{role_definitions[role]}'")
    ))

    # Assign the role at workspace scope
    auth_client.role_assignments.create(
        scope=scope,
        role_assignment_name=str(uuid.uuid4()),
        parameters=RoleAssignmentCreateParameters(
            role_definition_id=role_definition.id,
            principal_id=principal_object_id
        )
    )

Model Encryption

from azure.identity import DefaultAzureCredential
from azure.keyvault.keys import KeyClient
from azure.keyvault.keys.crypto import CryptographyClient, EncryptionAlgorithm

def encrypt_model(model_bytes, key_vault_url, key_name):
    """Encrypt model bytes with a Key Vault RSA key before storage.

    RSA-OAEP only handles small payloads, so in practice encrypt a symmetric
    data key this way and use that key to encrypt the model file itself
    (envelope encryption).
    """
    credential = DefaultAzureCredential()
    key_client = KeyClient(vault_url=key_vault_url, credential=credential)
    key = key_client.get_key(key_name)
    
    crypto_client = CryptographyClient(key, credential=credential)
    result = crypto_client.encrypt(EncryptionAlgorithm.rsa_oaep, model_bytes)
    
    return result.ciphertext
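
The matching decryption path, sketched with the same assumptions (decryption is performed by the Key Vault CryptographyClient, so the private key material never leaves the vault):

def decrypt_model(ciphertext, key_vault_url, key_name):
    """Decrypt a model blob previously encrypted with encrypt_model"""
    credential = DefaultAzureCredential()
    key_client = KeyClient(vault_url=key_vault_url, credential=credential)
    key = key_client.get_key(key_name)

    crypto_client = CryptographyClient(key, credential=credential)
    result = crypto_client.decrypt(EncryptionAlgorithm.rsa_oaep, ciphertext)

    return result.plaintext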

Policy Enforcement

Automated Policy Checks

from datetime import datetime

class ModelGovernancePolicy:
    def __init__(self):
        self.policies = {
            "min_accuracy": 0.85,
            "max_bias_disparity": 0.1,
            "require_explainability": True,
            "require_documentation": True,
            "max_data_age_days": 180
        }
    
    def validate_model(self, model_metadata):
        """Check if model meets governance policies"""
        violations = []
        
        if model_metadata["accuracy"] < self.policies["min_accuracy"]:
            violations.append(f"Accuracy {model_metadata['accuracy']} below threshold")
        
        if model_metadata["bias_disparity"] > self.policies["max_bias_disparity"]:
            violations.append(f"Bias disparity {model_metadata['bias_disparity']} exceeds limit")
        
        if self.policies["require_explainability"] and not model_metadata.get("explainer"):
            violations.append("Explainability not implemented")
        
        if self.policies["require_documentation"] and not model_metadata.get("model_card"):
            violations.append("Model card documentation missing")
        
        data_age = (datetime.utcnow() - datetime.fromisoformat(model_metadata["training_date"])).days
        if data_age > self.policies["max_data_age_days"]:
            violations.append(f"Training data is {data_age} days old (max: {self.policies['max_data_age_days']})")
        
        return {
            "compliant": len(violations) == 0,
            "violations": violations
        }
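
Run as a gate in a registration or deployment pipeline, the check might be wired up like this (the metadata fields are whatever your pipeline records for a candidate model; the values are illustrative):

from datetime import datetime, timedelta

policy = ModelGovernancePolicy()

candidate = {
    "accuracy": 0.88,
    "bias_disparity": 0.04,
    "explainer": "shap",
    "model_card": "docs/model_card.md",
    "training_date": (datetime.utcnow() - timedelta(days=30)).isoformat()
}

result = policy.validate_model(candidate)
if not result["compliant"]:
    raise RuntimeError(f"Governance gate failed: {result['violations']}")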

Audit Trail Implementation

import json
import uuid
from datetime import datetime

from azure.storage.blob import BlobServiceClient

class ModelAuditTrail:
    def __init__(self, connection_string, container_name):
        self.blob_service = BlobServiceClient.from_connection_string(connection_string)
        self.container = self.blob_service.get_container_client(container_name)
    
    def log_event(self, event_type, model_name, user_id, details):
        """Create immutable audit log entry"""
        audit_entry = {
            "event_id": str(uuid.uuid4()),
            "timestamp": datetime.utcnow().isoformat(),
            "event_type": event_type,
            "model_name": model_name,
            "user_id": user_id,
            "details": details
        }
        
        blob_name = f"audit/{event_type}/{datetime.utcnow().date()}/{audit_entry['event_id']}.json"
        blob_client = self.container.get_blob_client(blob_name)
        blob_client.upload_blob(json.dumps(audit_entry), overwrite=False)
        
        return audit_entry
    
    def query_audit_log(self, start_date, end_date, event_type=None, model_name=None):
        """Query audit logs for compliance reporting"""
        blobs = self.container.list_blobs(name_starts_with="audit/")
        
        results = []
        for blob in blobs:
            blob_client = self.container.get_blob_client(blob.name)
            data = json.loads(blob_client.download_blob().readall())
            
            if start_date <= data["timestamp"] <= end_date:
                if event_type and data["event_type"] != event_type:
                    continue
                if model_name and data["model_name"] != model_name:
                    continue
                results.append(data)
        
        return results
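
For instance, recording a deployment event and then pulling the last quarter's deployments for a compliance report (the connection string and container name are placeholders):

from datetime import datetime, timedelta

audit = ModelAuditTrail("<connection-string>", "audit-logs")

audit.log_event(
    event_type="model_deployment",
    model_name="churn-predictor",
    user_id="mlops-engineer-7",
    details={"version": "3.2.0", "endpoint": "prod-scoring"}
)

deployments = audit.query_audit_log(
    start_date=(datetime.utcnow() - timedelta(days=90)).isoformat(),
    end_date=datetime.utcnow().isoformat(),
    event_type="model_deployment"
)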

Governance Dashboard

from datetime import datetime, timedelta

from flask import Flask, render_template

app = Flask(__name__)
audit_trail = ModelAuditTrail("<connection-string>", "audit-logs")
risk_assessment = AIRiskAssessment()
policy_checker = ModelGovernancePolicy()

@app.route("/governance/dashboard")
def dashboard():
    """Display governance metrics"""
    high_risks = risk_assessment.get_high_risks()
    recent_audits = audit_trail.query_audit_log(
        start_date=(datetime.utcnow() - timedelta(days=30)).isoformat(),
        end_date=datetime.utcnow().isoformat()
    )
    
    return render_template("dashboard.html", 
                           high_risks=high_risks,
                           audit_count=len(recent_audits))

Best Practices

  • Establish clear AI governance policies before deployment
  • Implement layered security (defense in depth)
  • Conduct regular risk assessments
  • Maintain comprehensive audit trails
  • Automate policy enforcement where possible
  • Train teams on compliance requirements
  • Establish incident response procedures
  • Commission regular third-party security audits

Troubleshooting

Issue               | Cause                | Resolution
Policy violations   | Unclear requirements | Document policies; automate checks
Audit gaps          | Incomplete logging   | Implement comprehensive audit trail
Compliance failures | Regulatory changes   | Regular compliance reviews
Access breaches     | Weak controls        | Enforce RBAC; MFA; monitoring

Key Takeaways

Effective AI governance requires proactive risk management, comprehensive security controls, continuous compliance monitoring, and transparent audit practices.
