# Enterprise Security

Advanced security frameworks and compliance features for enterprise deployments.

Implement comprehensive security frameworks and compliance measures for enterprise CoAI.Dev deployments. This guide covers advanced security controls, compliance frameworks, and enterprise-grade protection mechanisms.

## Overview
Enterprise security encompasses:
- 🔐 Identity & Access Management: Multi-factor authentication and role-based access control
- 🛡️ Data Protection: Encryption, data loss prevention, and privacy controls
- 📋 Compliance: SOC2, GDPR, HIPAA, and industry-specific requirements
- 🚨 Threat Detection: Advanced security monitoring and incident response
- 🔒 Zero Trust Architecture: Network segmentation and micro-segmentation
> **Security-First Approach:** Enterprise security requires a multi-layered approach combining technical controls, process improvements, and continuous monitoring to protect against evolving threats.
## Identity and Access Management

### Multi-Factor Authentication (MFA)

#### Time-based One-Time Password (TOTP)
```python
# mfa_totp.py
import pyotp
import qrcode
from io import BytesIO
import base64


class TOTPManager:
    def __init__(self):
        self.app_name = "CoAI.Dev"
        self.issuer = "chatnio.com"

    def generate_secret(self, user_email):
        """Generate TOTP secret for user"""
        secret = pyotp.random_base32()

        # Generate QR code from the otpauth:// provisioning URI
        totp_uri = pyotp.totp.TOTP(secret).provisioning_uri(
            name=user_email,
            issuer_name=self.issuer
        )

        qr = qrcode.QRCode(version=1, box_size=10, border=5)
        qr.add_data(totp_uri)
        qr.make(fit=True)

        img = qr.make_image(fill_color="black", back_color="white")
        buffer = BytesIO()
        img.save(buffer, format='PNG')
        qr_code = base64.b64encode(buffer.getvalue()).decode()

        return {
            "secret": secret,
            "qr_code": qr_code,
            "manual_entry_key": secret,
            "uri": totp_uri
        }

    def verify_token(self, secret, token):
        """Verify TOTP token, allowing one time step of clock drift"""
        totp = pyotp.TOTP(secret)
        return totp.verify(token, valid_window=1)

    def generate_backup_codes(self, count=10):
        """Generate single-use backup codes"""
        import secrets
        import string

        codes = []
        for _ in range(count):
            code = ''.join(secrets.choice(string.ascii_uppercase + string.digits)
                           for _ in range(8))
            codes.append(f"{code[:4]}-{code[4:]}")
        return codes


# Usage example
mfa = TOTPManager()
user_email = "user@company.com"

# Set up MFA for the user
mfa_setup = mfa.generate_secret(user_email)
print(f"Secret: {mfa_setup['secret']}")
print(f"QR Code: data:image/png;base64,{mfa_setup['qr_code']}")

# Generate backup codes
backup_codes = mfa.generate_backup_codes()
print(f"Backup codes: {backup_codes}")

# Verify a token
token = input("Enter TOTP token: ")
is_valid = mfa.verify_token(mfa_setup['secret'], token)
print(f"Token valid: {is_valid}")
```
Configuration in CoAI.Dev:
```yaml
# mfa-config.yaml
security:
  mfa:
    enabled: true
    required_for_admin: true
    required_for_api: false
    totp:
      issuer: "CoAI.Dev Enterprise"
      algorithm: "sha1"
      digits: 6
      period: 30
    backup_codes:
      enabled: true
      count: 10
      length: 8
```
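To illustrate how these settings could be enforced at sign-in, here is a minimal sketch of a second login step that requires a TOTP code for admin accounts. It reuses the `TOTPManager` class from the example above; the `load_security_config` helper and the `user.is_admin` / `user.totp_secret` attributes are illustrative assumptions, not part of the CoAI.Dev API.

```python
# login_mfa_check.py — illustrative sketch, not the CoAI.Dev login implementation
import yaml

# Assumes the TOTPManager class above lives in mfa_totp.py (demo code factored out)
from mfa_totp import TOTPManager


def load_security_config(path="mfa-config.yaml"):
    """Load the MFA settings shown above (hypothetical helper)."""
    with open(path, "r") as f:
        return yaml.safe_load(f)["security"]["mfa"]


def complete_login(user, totp_code: str) -> bool:
    """Second login step: enforce TOTP for admins when MFA is enabled.

    `user` is assumed to expose `is_admin` and `totp_secret` attributes.
    """
    mfa_cfg = load_security_config()
    if not mfa_cfg["enabled"]:
        return True  # MFA disabled globally

    if user.is_admin and mfa_cfg["required_for_admin"]:
        if not user.totp_secret:
            raise PermissionError("Admin accounts must enroll in MFA first")
        return TOTPManager().verify_token(user.totp_secret, totp_code)

    # Non-admin users: verify only if they have already enrolled
    if user.totp_secret:
        return TOTPManager().verify_token(user.totp_secret, totp_code)
    return True
```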
### Role-Based Access Control (RBAC)

#### Granular Permission System

**Define Roles and Permissions**
```yaml
# rbac-config.yaml
roles:
  super_admin:
    description: "Full system access"
    permissions:
      - "system:*"
      - "users:*"
      - "channels:*"
      - "models:*"
      - "billing:*"
      - "analytics:*"
      - "security:*"

  organization_admin:
    description: "Organization-level administration"
    permissions:
      - "org:manage"
      - "users:create"
      - "users:update"
      - "users:view"
      - "channels:view"
      - "models:view"
      - "billing:view"
      - "analytics:view"

  channel_manager:
    description: "AI channel management"
    permissions:
      - "channels:create"
      - "channels:update"
      - "channels:delete"
      - "channels:view"
      - "models:create"
      - "models:update"
      - "models:view"

  billing_manager:
    description: "Billing and subscription management"
    permissions:
      - "billing:create"
      - "billing:update"
      - "billing:view"
      - "analytics:billing"
      - "users:billing"

  user_support:
    description: "User support and assistance"
    permissions:
      - "users:view"
      - "users:support"
      - "conversations:view"
      - "analytics:usage"

  end_user:
    description: "Regular platform user"
    permissions:
      - "chat:use"
      - "models:use"
      - "files:upload"
      - "conversations:manage"

permissions:
  system:
    - name: "system:config"
      description: "Modify system configuration"
    - name: "system:maintenance"
      description: "Perform system maintenance"
    - name: "system:logs"
      description: "Access system logs"

  users:
    - name: "users:create"
      description: "Create new users"
    - name: "users:update"
      description: "Update user information"
    - name: "users:delete"
      description: "Delete users"
    - name: "users:view"
      description: "View user information"
    - name: "users:impersonate"
      description: "Impersonate other users"

  channels:
    - name: "channels:create"
      description: "Create AI channels"
    - name: "channels:update"
      description: "Update channel configuration"
    - name: "channels:delete"
      description: "Delete channels"
    - name: "channels:view"
      description: "View channel information"

  models:
    - name: "models:create"
      description: "Add new models"
    - name: "models:update"
      description: "Update model configuration"
    - name: "models:delete"
      description: "Remove models"
    - name: "models:view"
      description: "View model information"
    - name: "models:use"
      description: "Use models for inference"
```
**Implement Permission Checking**
```python
# rbac_system.py
import fnmatch
from typing import Dict, List, Set

import yaml


class Permission:
    def __init__(self, resource: str, action: str, scope: str = "global"):
        self.resource = resource
        self.action = action
        self.scope = scope
        self.permission_string = f"{resource}:{action}"
        if scope != "global":
            self.permission_string += f":{scope}"

    def __str__(self):
        return self.permission_string

    def __eq__(self, other):
        return str(self) == str(other)

    def __hash__(self):
        return hash(str(self))


class Role:
    def __init__(self, name: str, permissions: List[str], description: str = ""):
        self.name = name
        self.description = description
        self.permissions = set()

        for perm_str in permissions:
            if "*" in perm_str:
                # Keep wildcard permissions as raw strings
                self.permissions.add(perm_str)
            else:
                parts = perm_str.split(":")
                resource = parts[0]
                action = parts[1] if len(parts) > 1 else "*"
                scope = parts[2] if len(parts) > 2 else "global"
                self.permissions.add(Permission(resource, action, scope))

    def has_permission(self, permission: Permission) -> bool:
        """Check if role has specific permission"""
        # Direct permission match
        if permission in self.permissions:
            return True

        # Check wildcard permissions
        for perm in self.permissions:
            if isinstance(perm, str) and "*" in perm:
                if self._matches_wildcard(str(permission), perm):
                    return True

        return False

    def _matches_wildcard(self, permission_str: str, wildcard_pattern: str) -> bool:
        """Check if permission matches wildcard pattern"""
        return fnmatch.fnmatch(permission_str, wildcard_pattern)


class User:
    def __init__(self, user_id: str, username: str, roles: List[str] = None):
        self.user_id = user_id
        self.username = username
        self.roles = roles or []
        self.direct_permissions = set()
        self.organization_id = None
        self.team_id = None

    def add_role(self, role_name: str):
        """Add role to user"""
        if role_name not in self.roles:
            self.roles.append(role_name)

    def remove_role(self, role_name: str):
        """Remove role from user"""
        if role_name in self.roles:
            self.roles.remove(role_name)

    def add_permission(self, permission: Permission):
        """Add direct permission to user"""
        self.direct_permissions.add(permission)


class RBACSystem:
    def __init__(self):
        self.roles: Dict[str, Role] = {}
        self.users: Dict[str, User] = {}

    def define_role(self, name: str, permissions: List[str], description: str = ""):
        """Define a new role"""
        self.roles[name] = Role(name, permissions, description)

    def assign_role(self, user_id: str, role_name: str):
        """Assign role to user"""
        if user_id not in self.users:
            raise ValueError(f"User {user_id} not found")
        if role_name not in self.roles:
            raise ValueError(f"Role {role_name} not found")
        self.users[user_id].add_role(role_name)

    def create_user(self, user_id: str, username: str, roles: List[str] = None):
        """Create new user"""
        self.users[user_id] = User(user_id, username, roles)

    def check_permission(self, user_id: str, permission: Permission) -> bool:
        """Check if user has permission"""
        user = self.users.get(user_id)
        if not user:
            return False

        # Check direct permissions
        if permission in user.direct_permissions:
            return True

        # Check role-based permissions
        for role_name in user.roles:
            role = self.roles.get(role_name)
            if role and role.has_permission(permission):
                return True

        return False

    def get_user_permissions(self, user_id: str) -> Set[Permission]:
        """Get all permissions for user"""
        user = self.users.get(user_id)
        if not user:
            return set()

        permissions = set(user.direct_permissions)
        for role_name in user.roles:
            role = self.roles.get(role_name)
            if role:
                permissions.update(role.permissions)

        return permissions

    def require_permission(self, permission_str: str):
        """Decorator for permission checking"""
        def decorator(func):
            def wrapper(*args, **kwargs):
                # Extract user_id from the request context
                user_id = kwargs.get('user_id') or getattr(args[0], 'user_id', None)
                if not user_id:
                    raise Exception("User ID not found in request context")

                parts = permission_str.split(":")
                permission = Permission(parts[0], parts[1],
                                        parts[2] if len(parts) > 2 else "global")

                if not self.check_permission(user_id, permission):
                    raise Exception(f"Access denied: Missing permission {permission_str}")

                return func(*args, **kwargs)
            return wrapper
        return decorator


# Usage example
rbac = RBACSystem()

# Load roles from configuration
with open("rbac-config.yaml", "r") as f:
    config = yaml.safe_load(f)

for role_name, role_config in config["roles"].items():
    rbac.define_role(
        role_name,
        role_config["permissions"],
        role_config["description"]
    )

# Create users
rbac.create_user("admin1", "admin@company.com", ["super_admin"])
rbac.create_user("user1", "user@company.com", ["end_user"])
rbac.create_user("support1", "support@company.com", ["user_support"])

# Check permissions
admin_permission = Permission("system", "config")
print(f"Admin has system config permission: {rbac.check_permission('admin1', admin_permission)}")

user_permission = Permission("chat", "use")
print(f"User has chat permission: {rbac.check_permission('user1', user_permission)}")

# Use the decorator for API endpoints
@rbac.require_permission("users:create")
def create_user_api(user_id, new_user_data):
    """API endpoint that requires the users:create permission"""
    print(f"Creating user: {new_user_data}")
    return {"status": "success"}

# Test API call
try:
    create_user_api(user_id="admin1", new_user_data={"username": "newuser"})
    print("User creation successful")
except Exception as e:
    print(f"User creation failed: {e}")
## Data Protection and Privacy

### Data Loss Prevention (DLP)

#### Real-time Content Scanning
```python
# dlp_scanner.py
import hashlib
import re
from datetime import datetime, timezone
from enum import Enum
from typing import Dict, List


class SensitivityLevel(Enum):
    PUBLIC = "public"
    INTERNAL = "internal"
    CONFIDENTIAL = "confidential"
    RESTRICTED = "restricted"


class DLPRule:
    def __init__(self, name: str, pattern: str, sensitivity: SensitivityLevel,
                 action: str = "block", description: str = ""):
        self.name = name
        self.pattern = re.compile(pattern, re.IGNORECASE)
        self.sensitivity = sensitivity
        self.action = action  # block, warn, log, redact
        self.description = description

    def scan(self, text: str) -> List[Dict]:
        """Scan text for pattern matches"""
        matches = []
        for match in self.pattern.finditer(text):
            matches.append({
                "rule_name": self.name,
                "match": match.group(),
                "start": match.start(),
                "end": match.end(),
                "sensitivity": self.sensitivity.value,
                "action": self.action
            })
        return matches


class DLPScanner:
    def __init__(self):
        self.rules: List[DLPRule] = []
        self.scan_results = []

        # Load default rules
        self._load_default_rules()

    def _load_default_rules(self):
        """Load default DLP rules"""
        # Credit card numbers
        self.add_rule(DLPRule(
            "credit_card",
            r"\b(?:\d{4}[-\s]?){3}\d{4}\b",
            SensitivityLevel.RESTRICTED,
            "block",
            "Credit card number detection"
        ))

        # Social Security Numbers
        self.add_rule(DLPRule(
            "ssn",
            r"\b\d{3}-\d{2}-\d{4}\b",
            SensitivityLevel.RESTRICTED,
            "block",
            "Social Security Number detection"
        ))

        # Email addresses
        self.add_rule(DLPRule(
            "email",
            r"\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b",
            SensitivityLevel.INTERNAL,
            "warn",
            "Email address detection"
        ))

        # Phone numbers
        self.add_rule(DLPRule(
            "phone",
            r"\b\+?1?[-.\s]?\(?[0-9]{3}\)?[-.\s]?[0-9]{3}[-.\s]?[0-9]{4}\b",
            SensitivityLevel.INTERNAL,
            "warn",
            "Phone number detection"
        ))

        # API keys (generic pattern)
        self.add_rule(DLPRule(
            "api_key",
            r"\b[A-Za-z0-9]{32,}\b",
            SensitivityLevel.CONFIDENTIAL,
            "block",
            "Potential API key detection"
        ))

        # IP addresses
        self.add_rule(DLPRule(
            "ip_address",
            r"\b(?:[0-9]{1,3}\.){3}[0-9]{1,3}\b",
            SensitivityLevel.INTERNAL,
            "log",
            "IP address detection"
        ))

    def add_rule(self, rule: DLPRule):
        """Add DLP rule"""
        self.rules.append(rule)

    def scan_text(self, text: str, user_id: str = None) -> Dict:
        """Scan text for sensitive content"""
        all_matches = []
        for rule in self.rules:
            matches = rule.scan(text)
            all_matches.extend(matches)

        # Determine the overall action (most restrictive wins)
        actions = [match["action"] for match in all_matches]
        if "block" in actions:
            overall_action = "block"
        elif "warn" in actions:
            overall_action = "warn"
        elif "log" in actions:
            overall_action = "log"
        else:
            overall_action = "allow"

        result = {
            "scan_id": hashlib.md5(text.encode()).hexdigest(),
            "user_id": user_id,
            "text_length": len(text),
            "matches": all_matches,
            "action": overall_action,
            "timestamp": datetime.now(timezone.utc).isoformat()
        }

        self.scan_results.append(result)
        return result

    def redact_content(self, text: str) -> str:
        """Redact sensitive content from text"""
        redacted_text = text
        for rule in self.rules:
            if rule.action in ["redact", "block"]:
                redacted_text = rule.pattern.sub("***REDACTED***", redacted_text)
        return redacted_text

    def get_scan_statistics(self) -> Dict:
        """Get scanning statistics"""
        total_scans = len(self.scan_results)
        blocked_scans = len([r for r in self.scan_results if r["action"] == "block"])
        warned_scans = len([r for r in self.scan_results if r["action"] == "warn"])

        rule_stats = {}
        for result in self.scan_results:
            for match in result["matches"]:
                rule_name = match["rule_name"]
                rule_stats[rule_name] = rule_stats.get(rule_name, 0) + 1

        return {
            "total_scans": total_scans,
            "blocked_scans": blocked_scans,
            "warned_scans": warned_scans,
            "rule_statistics": rule_stats,
            "block_rate": blocked_scans / max(total_scans, 1) * 100
        }


# Usage example
dlp = DLPScanner()

# Add a custom rule for company-specific data
dlp.add_rule(DLPRule(
    "employee_id",
    r"\bEMP\d{6}\b",
    SensitivityLevel.CONFIDENTIAL,
    "warn",
    "Employee ID detection"
))

# Scan text
test_text = """
Hello, my credit card number is 4532-1234-5678-9012.
You can reach me at john.doe@company.com or call 555-123-4567.
My employee ID is EMP123456.
"""

result = dlp.scan_text(test_text, user_id="user123")
print(f"Scan result: {result}")

# Redact sensitive content
redacted = dlp.redact_content(test_text)
print(f"Redacted text: {redacted}")

# Get statistics
stats = dlp.get_scan_statistics()
print(f"DLP Statistics: {stats}")
## Compliance Frameworks

### SOC 2 Compliance

#### Trust Service Criteria Implementation
```yaml
# soc2-controls.yaml
soc2_controls:
  security:
    CC6.1:
      title: "Logical and Physical Access Controls"
      implementation:
        - Multi-factor authentication for all admin accounts
        - Role-based access control (RBAC)
        - Regular access reviews and deprovisioning
        - Physical security for data centers
      evidence:
        - Access control matrix
        - MFA configuration screenshots
        - Access review reports
        - Data center security certificates

    CC6.2:
      title: "System Access Monitoring"
      implementation:
        - Real-time access monitoring
        - Failed login attempt tracking
        - Privileged access logging
        - Automated alerting for suspicious activity
      evidence:
        - Security monitoring dashboard
        - Alert configuration
        - Incident response logs

    CC6.3:
      title: "Access Rights Management"
      implementation:
        - Automated user provisioning/deprovisioning
        - Regular access rights reviews
        - Separation of duties enforcement
        - Principle of least privilege
      evidence:
        - User access reports
        - Segregation of duties matrix
        - Access request approval workflows

  availability:
    A1.1:
      title: "System Availability"
      implementation:
        - 99.9% uptime SLA
        - Redundant infrastructure
        - Automated failover
        - Load balancing
      evidence:
        - Uptime monitoring reports
        - Infrastructure diagrams
        - Disaster recovery test results

    A1.2:
      title: "System Monitoring"
      implementation:
        - 24/7 system monitoring
        - Performance metrics tracking
        - Capacity planning
        - Proactive alerting
      evidence:
        - Monitoring dashboard screenshots
        - Performance reports
        - Capacity planning documents

  processing_integrity:
    PI1.1:
      title: "Data Processing Integrity"
      implementation:
        - Input validation and sanitization
        - Error handling and logging
        - Data quality checks
        - Processing workflow controls
      evidence:
        - Code review reports
        - Data validation test results
        - Error handling documentation

  confidentiality:
    C1.1:
      title: "Confidential Information Protection"
      implementation:
        - Data encryption at rest and in transit
        - Data classification system
        - Secure data disposal
        - Confidentiality agreements
      evidence:
        - Encryption configuration
        - Data classification policy
        - Secure disposal procedures
        - Signed NDAs

  privacy:
    P1.1:
      title: "Privacy Notice and Consent"
      implementation:
        - Clear privacy policy
        - Consent management system
        - Data subject rights procedures
        - Privacy impact assessments
      evidence:
        - Privacy policy documentation
        - Consent records
        - Data subject request logs
        - PIA reports
```
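The control mapping above can double as an audit-preparation checklist. The sketch below (assuming the file is saved as `soc2-controls.yaml`) walks the YAML and prints the evidence items to collect for each control; it is a reporting aid under those assumptions, not an assessment tool.

```python
# soc2_checklist.py — illustrative evidence checklist generator
import yaml

with open("soc2-controls.yaml", "r") as f:
    controls = yaml.safe_load(f)["soc2_controls"]

total_evidence = 0
for criterion, control_set in controls.items():
    print(f"\n== {criterion.upper()} ==")
    for control_id, control in control_set.items():
        print(f"{control_id}: {control['title']}")
        for item in control["evidence"]:
            total_evidence += 1
            print(f"  [ ] {item}")

print(f"\n{total_evidence} evidence items to collect before the audit window")
```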
### GDPR Compliance Implementation
```python
# gdpr_compliance.py
from datetime import datetime, timedelta
from typing import Dict, List


class GDPRComplianceManager:
    def __init__(self):
        self.consent_records = {}
        self.data_processing_activities = []
        self.data_subject_requests = []
        self.breach_incidents = []

    def record_consent(self, user_id: str, consent_type: str,
                       purpose: str, legal_basis: str) -> str:
        """Record user consent according to GDPR requirements"""
        consent_id = f"consent_{user_id}_{datetime.utcnow().timestamp()}"

        consent_record = {
            "consent_id": consent_id,
            "user_id": user_id,
            "consent_type": consent_type,
            "purpose": purpose,
            "legal_basis": legal_basis,
            "timestamp": datetime.utcnow().isoformat(),
            "ip_address": "192.168.1.1",     # Would come from the request
            "user_agent": "Mozilla/5.0...",  # Would come from the request
            "consent_given": True,
            "consent_method": "explicit",
            "granular": True
        }

        self.consent_records[consent_id] = consent_record
        return consent_id

    def withdraw_consent(self, user_id: str, consent_id: str) -> bool:
        """Allow a user to withdraw consent"""
        if consent_id in self.consent_records:
            self.consent_records[consent_id]["consent_given"] = False
            self.consent_records[consent_id]["withdrawal_timestamp"] = datetime.utcnow().isoformat()

            # Trigger a data processing review
            self._review_data_processing(user_id)
            return True
        return False

    def handle_data_subject_request(self, user_id: str, request_type: str,
                                    details: str = None) -> str:
        """Handle GDPR data subject requests"""
        request_id = f"dsr_{user_id}_{datetime.utcnow().timestamp()}"

        request = {
            "request_id": request_id,
            "user_id": user_id,
            "request_type": request_type,  # access, rectification, erasure, portability, restriction
            "details": details,
            "received_date": datetime.utcnow().isoformat(),
            "status": "received",
            "due_date": (datetime.utcnow() + timedelta(days=30)).isoformat()
        }

        self.data_subject_requests.append(request)

        # Trigger the appropriate action based on the request type
        if request_type == "access":
            self._process_access_request(request_id, user_id)
        elif request_type == "erasure":
            self._process_erasure_request(request_id, user_id)
        elif request_type == "portability":
            self._process_portability_request(request_id, user_id)

        return request_id

    def _process_access_request(self, request_id: str, user_id: str):
        """Process a data access request (Article 15)"""
        # Collect all data about the user
        user_data = {
            "personal_data": self._get_user_personal_data(user_id),
            "processing_purposes": self._get_processing_purposes(user_id),
            "data_categories": self._get_data_categories(user_id),
            "recipients": self._get_data_recipients(user_id),
            "retention_periods": self._get_retention_periods(user_id),
            "rights_information": self._get_rights_information(),
            "data_source": self._get_data_source(user_id)
        }

        # Update the request status
        for request in self.data_subject_requests:
            if request["request_id"] == request_id:
                request["status"] = "completed"
                request["response_data"] = user_data
                request["completed_date"] = datetime.utcnow().isoformat()
                break

    def _process_erasure_request(self, request_id: str, user_id: str):
        """Process a right-to-be-forgotten request (Article 17)"""
        # Check whether erasure is legally permitted
        erasure_conditions = self._check_erasure_conditions(user_id)

        if erasure_conditions["can_erase"]:
            # Perform data erasure
            erased_data = self._erase_user_data(user_id)

            # Update the request
            for request in self.data_subject_requests:
                if request["request_id"] == request_id:
                    request["status"] = "completed"
                    request["action_taken"] = "data_erased"
                    request["erased_data"] = erased_data
                    request["completed_date"] = datetime.utcnow().isoformat()
                    break
        else:
            # Explain why the data cannot be erased
            for request in self.data_subject_requests:
                if request["request_id"] == request_id:
                    request["status"] = "declined"
                    request["decline_reason"] = erasure_conditions["reason"]
                    request["completed_date"] = datetime.utcnow().isoformat()
                    break

    def _process_portability_request(self, request_id: str, user_id: str):
        """Process a data portability request (Article 20)"""
        portable_data = self._get_portable_data(user_id)

        # Export data in a machine-readable format
        export_data = {
            "user_profile": portable_data["profile"],
            "conversations": portable_data["conversations"],
            "preferences": portable_data["preferences"],
            "usage_history": portable_data["usage_history"]
        }

        # Update the request
        for request in self.data_subject_requests:
            if request["request_id"] == request_id:
                request["status"] = "completed"
                request["export_data"] = export_data
                request["export_format"] = "JSON"
                request["completed_date"] = datetime.utcnow().isoformat()
                break

    def report_data_breach(self, breach_details: Dict) -> str:
        """Report a data breach according to GDPR Article 33"""
        breach_id = f"breach_{datetime.utcnow().timestamp()}"

        breach_record = {
            "breach_id": breach_id,
            "detected_date": datetime.utcnow().isoformat(),
            "reported_date": datetime.utcnow().isoformat(),
            "breach_type": breach_details["type"],  # confidentiality, integrity, availability
            "affected_individuals": breach_details.get("affected_individuals", 0),
            "personal_data_involved": breach_details.get("personal_data_categories", []),
            "likely_consequences": breach_details.get("consequences", ""),
            "measures_taken": breach_details.get("measures", ""),
            "notification_required": self._assess_notification_requirement(breach_details),
            "supervisory_authority_notified": False,
            "individuals_notified": False
        }

        self.breach_incidents.append(breach_record)

        # Schedule notification if it is required within 72 hours
        if breach_record["notification_required"]:
            self._schedule_breach_notification(breach_id)

        return breach_id

    def _assess_notification_requirement(self, breach_details: Dict) -> bool:
        """Assess whether breach notification is required"""
        # High-risk indicators
        high_risk_factors = [
            breach_details.get("involves_special_categories", False),
            breach_details.get("affects_vulnerable_individuals", False),
            breach_details.get("large_scale", False),
            breach_details.get("identity_theft_risk", False),
            breach_details.get("financial_loss_risk", False)
        ]
        return any(high_risk_factors) or breach_details.get("affected_individuals", 0) > 100

    def generate_privacy_impact_assessment(self, processing_activity: Dict) -> Dict:
        """Generate a Data Protection Impact Assessment (DPIA)"""
        pia = {
            "pia_id": f"pia_{datetime.utcnow().timestamp()}",
            "processing_activity": processing_activity,
            "necessity_assessment": self._assess_necessity(processing_activity),
            "proportionality_assessment": self._assess_proportionality(processing_activity),
            "risk_assessment": self._assess_privacy_risks(processing_activity),
            "safeguards": self._identify_safeguards(processing_activity),
            "consultation_required": self._check_consultation_requirement(processing_activity),
            "completed_date": datetime.utcnow().isoformat()
        }
        return pia

    def _get_user_personal_data(self, user_id: str) -> Dict:
        """Collect all personal data for a user"""
        # This would integrate with the actual data stores
        return {
            "profile": {"name": "John Doe", "email": "john@example.com"},
            "conversations": ["conversation1", "conversation2"],
            "usage_logs": ["log1", "log2"],
            "preferences": {"theme": "dark", "language": "en"}
        }

    def _erase_user_data(self, user_id: str) -> Dict:
        """Erase user data while respecting legal obligations"""
        erased_categories = []

        # Erase profile data
        erased_categories.append("profile_data")

        # Erase conversation history
        erased_categories.append("conversation_history")

        # Anonymize usage logs (kept for statistical purposes)
        erased_categories.append("anonymized_usage_logs")

        # Note: some data may need to be retained for legal/regulatory reasons
        return {
            "erased_categories": erased_categories,
            "erasure_date": datetime.utcnow().isoformat(),
            "verification_hash": "hash_of_erasure_operation"
        }

    # --- Placeholder hooks: wire these to your real data stores and workflows ---

    def _review_data_processing(self, user_id: str):
        """Re-evaluate ongoing processing after a consent withdrawal (placeholder)."""

    def _schedule_breach_notification(self, breach_id: str):
        """Queue supervisory-authority notification within the 72-hour window (placeholder)."""

    def _get_processing_purposes(self, user_id: str) -> List[str]:
        return ["service_provision", "analytics"]

    def _get_data_categories(self, user_id: str) -> List[str]:
        return ["profile", "usage", "conversations"]

    def _get_data_recipients(self, user_id: str) -> List[str]:
        return ["internal_teams", "ai_model_providers"]

    def _get_retention_periods(self, user_id: str) -> Dict:
        return {"profile": "account lifetime", "usage_logs": "12 months"}

    def _get_rights_information(self) -> List[str]:
        return ["access", "rectification", "erasure", "portability", "restriction", "objection"]

    def _get_data_source(self, user_id: str) -> str:
        return "collected directly from the data subject"

    def _check_erasure_conditions(self, user_id: str) -> Dict:
        return {"can_erase": True, "reason": ""}

    def _get_portable_data(self, user_id: str) -> Dict:
        return {"profile": {}, "conversations": [], "preferences": {}, "usage_history": []}

    def _assess_necessity(self, processing_activity: Dict) -> str:
        return "pending review"

    def _assess_proportionality(self, processing_activity: Dict) -> str:
        return "pending review"

    def _assess_privacy_risks(self, processing_activity: Dict) -> List[str]:
        return []

    def _identify_safeguards(self, processing_activity: Dict) -> List[str]:
        return []

    def _check_consultation_requirement(self, processing_activity: Dict) -> bool:
        return processing_activity.get("automated_decision_making", False)


# Usage example
gdpr_manager = GDPRComplianceManager()

# Record user consent
consent_id = gdpr_manager.record_consent(
    user_id="user123",
    consent_type="marketing",
    purpose="Send promotional emails about new AI features",
    legal_basis="consent"
)
print(f"Recorded consent: {consent_id}")

# Handle a data subject access request
request_id = gdpr_manager.handle_data_subject_request(
    user_id="user123",
    request_type="access",
    details="I want to see all my personal data"
)
print(f"Processed access request: {request_id}")

# Report a data breach
breach_id = gdpr_manager.report_data_breach({
    "type": "confidentiality",
    "affected_individuals": 150,
    "personal_data_categories": ["names", "email_addresses"],
    "consequences": "Potential spam emails",
    "measures": "Patched vulnerability, notified affected users"
})
print(f"Reported breach: {breach_id}")

# Generate a DPIA for a new AI feature
pia = gdpr_manager.generate_privacy_impact_assessment({
    "name": "AI Conversation Analysis",
    "purpose": "Improve response quality",
    "data_types": ["conversation_content", "user_preferences"],
    "automated_decision_making": True
})
print(f"Generated PIA: {pia['pia_id']}")
```
Enterprise security requires a comprehensive approach combining technical controls, compliance frameworks, and operational procedures. Start with identity management and access controls, then implement data protection measures and compliance monitoring to create a robust security posture for your CoAI.Dev deployment.