""" InsightFlow Phase 8 - 企业级功能管理模块 功能: 1. SSO/SAML 单点登录(企业微信、钉钉、飞书、Okta) 2. SCIM 用户目录同步 3. 审计日志导出(SOC2/ISO27001 合规) 4. 数据保留策略(自动归档、数据删除) 作者: InsightFlow Team """ import sqlite3 import json import uuid import hashlib import base64 import xml.etree.ElementTree as ET from datetime import datetime, timedelta from typing import Optional, List, Dict, Any, Tuple from dataclasses import dataclass, asdict from enum import Enum import logging import re logger = logging.getLogger(__name__) class SSOProvider(str, Enum): """SSO 提供商类型""" WECHAT_WORK = "wechat_work" # 企业微信 DINGTALK = "dingtalk" # 钉钉 FEISHU = "feishu" # 飞书 OKTA = "okta" # Okta AZURE_AD = "azure_ad" # Azure AD GOOGLE = "google" # Google Workspace CUSTOM_SAML = "custom_saml" # 自定义 SAML class SSOStatus(str, Enum): """SSO 配置状态""" DISABLED = "disabled" # 未启用 PENDING = "pending" # 待配置 ACTIVE = "active" # 已启用 ERROR = "error" # 配置错误 class SCIMSyncStatus(str, Enum): """SCIM 同步状态""" IDLE = "idle" # 空闲 SYNCING = "syncing" # 同步中 SUCCESS = "success" # 同步成功 FAILED = "failed" # 同步失败 class AuditLogExportFormat(str, Enum): """审计日志导出格式""" JSON = "json" CSV = "csv" PDF = "pdf" XLSX = "xlsx" class DataRetentionAction(str, Enum): """数据保留策略动作""" ARCHIVE = "archive" # 归档 DELETE = "delete" # 删除 ANONYMIZE = "anonymize" # 匿名化 class ComplianceStandard(str, Enum): """合规标准""" SOC2 = "soc2" ISO27001 = "iso27001" GDPR = "gdpr" HIPAA = "hipaa" PCI_DSS = "pci_dss" @dataclass class SSOConfig: """SSO 配置数据类""" id: str tenant_id: str provider: str # SSO 提供商 status: str # 状态 entity_id: Optional[str] # SAML Entity ID sso_url: Optional[str] # SAML SSO URL slo_url: Optional[str] # SAML SLO URL certificate: Optional[str] # SAML 证书 (X.509) metadata_url: Optional[str] # SAML 元数据 URL metadata_xml: Optional[str] # SAML 元数据 XML # OAuth/OIDC 配置 client_id: Optional[str] client_secret: Optional[str] authorization_url: Optional[str] token_url: Optional[str] userinfo_url: Optional[str] scopes: List[str] # 属性映射 attribute_mapping: Dict[str, str] # 如 {"email": 
"user.mail", "name": "user.name"} # 其他配置 auto_provision: bool # 自动创建用户 default_role: str # 默认角色 domain_restriction: List[str] # 允许的邮箱域名 created_at: datetime updated_at: datetime last_tested_at: Optional[datetime] last_error: Optional[str] @dataclass class SCIMConfig: """SCIM 配置数据类""" id: str tenant_id: str provider: str # 身份提供商 status: str # SCIM 服务端配置 scim_base_url: str # SCIM 服务端地址 scim_token: str # SCIM 访问令牌 # 同步配置 sync_interval_minutes: int # 同步间隔(分钟) last_sync_at: Optional[datetime] last_sync_status: Optional[str] last_sync_error: Optional[str] last_sync_users_count: int # 属性映射 attribute_mapping: Dict[str, str] # 同步规则 sync_rules: Dict[str, Any] # 过滤规则、转换规则等 created_at: datetime updated_at: datetime @dataclass class SCIMUser: """SCIM 用户数据类""" id: str tenant_id: str external_id: str # 外部系统 ID user_name: str email: str display_name: Optional[str] given_name: Optional[str] family_name: Optional[str] active: bool groups: List[str] raw_data: Dict[str, Any] # 原始 SCIM 数据 synced_at: datetime created_at: datetime updated_at: datetime @dataclass class AuditLogExport: """审计日志导出记录""" id: str tenant_id: str export_format: str start_date: datetime end_date: datetime filters: Dict[str, Any] # 过滤条件 compliance_standard: Optional[str] status: str # pending/processing/completed/failed file_path: Optional[str] file_size: Optional[int] record_count: Optional[int] checksum: Optional[str] # 文件校验和 downloaded_by: Optional[str] downloaded_at: Optional[datetime] expires_at: Optional[datetime] # 文件过期时间 created_by: str created_at: datetime completed_at: Optional[datetime] error_message: Optional[str] @dataclass class DataRetentionPolicy: """数据保留策略""" id: str tenant_id: str name: str description: Optional[str] resource_type: str # project/transcript/entity/audit_log/user_data retention_days: int # 保留天数 action: str # archive/delete/anonymize # 条件 conditions: Dict[str, Any] # 触发条件 # 执行配置 auto_execute: bool # 自动执行 execute_at: Optional[str] # 执行时间 (cron 表达式) notify_before_days: int # 提前通知天数 # 
@dataclass
class DataRetentionJob:
    """One execution of a data-retention policy."""

    id: str
    policy_id: str
    tenant_id: str
    status: str                         # pending/running/completed/failed
    started_at: Optional[datetime]
    completed_at: Optional[datetime]
    affected_records: int
    archived_records: int
    deleted_records: int
    error_count: int
    details: Dict[str, Any]
    created_at: datetime


@dataclass
class SAMLAuthRequest:
    """An outbound SAML authentication request awaiting a response."""

    id: str
    tenant_id: str
    sso_config_id: str
    request_id: str                     # SAML Request ID
    relay_state: Optional[str]
    created_at: datetime
    expires_at: datetime
    used: bool
    used_at: Optional[datetime]


@dataclass
class SAMLAuthResponse:
    """A received SAML authentication response."""

    id: str
    request_id: str
    tenant_id: str
    user_id: Optional[str]
    email: Optional[str]
    name: Optional[str]
    attributes: Dict[str, Any]
    session_index: Optional[str]
    processed: bool
    processed_at: Optional[datetime]
    created_at: datetime


class EnterpriseManager:
    """Manager for enterprise features: SSO, SCIM, audit export, retention."""

    # Default attribute mappings per provider.
    DEFAULT_ATTRIBUTE_MAPPING = {
        SSOProvider.WECHAT_WORK: {
            "email": "email",
            "name": "name",
            "department": "department",
            "position": "position",
        },
        SSOProvider.DINGTALK: {
            "email": "email",
            "name": "name",
            "department": "department",
            "job_title": "title",
        },
        SSOProvider.FEISHU: {
            "email": "email",
            "name": "name",
            "department": "department",
            "employee_no": "employee_no",
        },
        SSOProvider.OKTA: {
            "email": "user.email",
            "name": "user.firstName + ' ' + user.lastName",
            "first_name": "user.firstName",
            "last_name": "user.lastName",
            "groups": "groups",
        },
    }

    # Field whitelists per compliance standard (used by audit export).
    COMPLIANCE_FIELDS = {
        ComplianceStandard.SOC2: [
            "timestamp", "user_id", "user_email", "action", "resource_type",
            "resource_id", "ip_address", "user_agent", "success", "details",
        ],
        ComplianceStandard.ISO27001: [
            "timestamp", "user_id", "action", "resource_type", "resource_id",
            "classification", "access_type", "result", "justification",
        ],
        ComplianceStandard.GDPR: [
            "timestamp", "user_id", "action", "data_subject_id",
            "data_category", "processing_purpose", "legal_basis",
            "retention_period",
        ],
    }

    def __init__(self, db_path: str = "insightflow.db"):
        self.db_path = db_path
        self._init_db()

    def _get_connection(self) -> sqlite3.Connection:
        """Open a new connection with row access by column name."""
        conn = sqlite3.connect(self.db_path)
        conn.row_factory = sqlite3.Row
        return conn

    def _init_db(self):
        """Create all enterprise tables and indexes if they do not exist."""
        conn = self._get_connection()
        try:
            cursor = conn.cursor()

            # SSO configuration table.
            # NOTE(review): ON DELETE CASCADE requires PRAGMA foreign_keys=ON,
            # which sqlite3 does not enable by default — verify at call sites.
            cursor.execute("""
                CREATE TABLE IF NOT EXISTS sso_configs (
                    id TEXT PRIMARY KEY,
                    tenant_id TEXT NOT NULL,
                    provider TEXT NOT NULL,
                    status TEXT DEFAULT 'disabled',
                    entity_id TEXT,
                    sso_url TEXT,
                    slo_url TEXT,
                    certificate TEXT,
                    metadata_url TEXT,
                    metadata_xml TEXT,
                    client_id TEXT,
                    client_secret TEXT,
                    authorization_url TEXT,
                    token_url TEXT,
                    userinfo_url TEXT,
                    scopes TEXT DEFAULT '["openid", "email", "profile"]',
                    attribute_mapping TEXT DEFAULT '{}',
                    auto_provision INTEGER DEFAULT 1,
                    default_role TEXT DEFAULT 'member',
                    domain_restriction TEXT DEFAULT '[]',
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    last_tested_at TIMESTAMP,
                    last_error TEXT,
                    FOREIGN KEY (tenant_id) REFERENCES tenants(id) ON DELETE CASCADE
                )
            """)

            # SAML authentication request table.
            cursor.execute("""
                CREATE TABLE IF NOT EXISTS saml_auth_requests (
                    id TEXT PRIMARY KEY,
                    tenant_id TEXT NOT NULL,
                    sso_config_id TEXT NOT NULL,
                    request_id TEXT NOT NULL UNIQUE,
                    relay_state TEXT,
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    expires_at TIMESTAMP NOT NULL,
                    used INTEGER DEFAULT 0,
                    used_at TIMESTAMP,
                    FOREIGN KEY (tenant_id) REFERENCES tenants(id) ON DELETE CASCADE,
                    FOREIGN KEY (sso_config_id) REFERENCES sso_configs(id) ON DELETE CASCADE
                )
            """)

            # SAML authentication response table.
            cursor.execute("""
                CREATE TABLE IF NOT EXISTS saml_auth_responses (
                    id TEXT PRIMARY KEY,
                    request_id TEXT NOT NULL,
                    tenant_id TEXT NOT NULL,
                    user_id TEXT,
                    email TEXT,
                    name TEXT,
                    attributes TEXT DEFAULT '{}',
                    session_index TEXT,
                    processed INTEGER DEFAULT 0,
                    processed_at TIMESTAMP,
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    FOREIGN KEY (request_id) REFERENCES saml_auth_requests(request_id) ON DELETE CASCADE,
                    FOREIGN KEY (tenant_id) REFERENCES tenants(id) ON DELETE CASCADE
                )
            """)

            # SCIM configuration table.
            cursor.execute("""
                CREATE TABLE IF NOT EXISTS scim_configs (
                    id TEXT PRIMARY KEY,
                    tenant_id TEXT NOT NULL,
                    provider TEXT NOT NULL,
                    status TEXT DEFAULT 'disabled',
                    scim_base_url TEXT,
                    scim_token TEXT,
                    sync_interval_minutes INTEGER DEFAULT 60,
                    last_sync_at TIMESTAMP,
                    last_sync_status TEXT,
                    last_sync_error TEXT,
                    last_sync_users_count INTEGER DEFAULT 0,
                    attribute_mapping TEXT DEFAULT '{}',
                    sync_rules TEXT DEFAULT '{}',
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    FOREIGN KEY (tenant_id) REFERENCES tenants(id) ON DELETE CASCADE
                )
            """)

            # SCIM user mirror table.
            cursor.execute("""
                CREATE TABLE IF NOT EXISTS scim_users (
                    id TEXT PRIMARY KEY,
                    tenant_id TEXT NOT NULL,
                    external_id TEXT NOT NULL,
                    user_name TEXT NOT NULL,
                    email TEXT NOT NULL,
                    display_name TEXT,
                    given_name TEXT,
                    family_name TEXT,
                    active INTEGER DEFAULT 1,
                    groups TEXT DEFAULT '[]',
                    raw_data TEXT DEFAULT '{}',
                    synced_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    FOREIGN KEY (tenant_id) REFERENCES tenants(id) ON DELETE CASCADE,
                    UNIQUE(tenant_id, external_id)
                )
            """)

            # Audit-log export table.
            cursor.execute("""
                CREATE TABLE IF NOT EXISTS audit_log_exports (
                    id TEXT PRIMARY KEY,
                    tenant_id TEXT NOT NULL,
                    export_format TEXT NOT NULL,
                    start_date TIMESTAMP NOT NULL,
                    end_date TIMESTAMP NOT NULL,
                    filters TEXT DEFAULT '{}',
                    compliance_standard TEXT,
                    status TEXT DEFAULT 'pending',
                    file_path TEXT,
                    file_size INTEGER,
                    record_count INTEGER,
                    checksum TEXT,
                    downloaded_by TEXT,
                    downloaded_at TIMESTAMP,
                    expires_at TIMESTAMP,
                    created_by TEXT NOT NULL,
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    completed_at TIMESTAMP,
                    error_message TEXT,
                    FOREIGN KEY (tenant_id) REFERENCES tenants(id) ON DELETE CASCADE
                )
            """)

            # Data-retention policy table.
            cursor.execute("""
                CREATE TABLE IF NOT EXISTS data_retention_policies (
                    id TEXT PRIMARY KEY,
                    tenant_id TEXT NOT NULL,
                    name TEXT NOT NULL,
                    description TEXT,
                    resource_type TEXT NOT NULL,
                    retention_days INTEGER NOT NULL,
                    action TEXT NOT NULL,
                    conditions TEXT DEFAULT '{}',
                    auto_execute INTEGER DEFAULT 0,
                    execute_at TEXT,
                    notify_before_days INTEGER DEFAULT 7,
                    archive_location TEXT,
                    archive_encryption INTEGER DEFAULT 1,
                    is_active INTEGER DEFAULT 1,
                    last_executed_at TIMESTAMP,
                    last_execution_result TEXT,
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    FOREIGN KEY (tenant_id) REFERENCES tenants(id) ON DELETE CASCADE
                )
            """)

            # Data-retention job table.
            cursor.execute("""
                CREATE TABLE IF NOT EXISTS data_retention_jobs (
                    id TEXT PRIMARY KEY,
                    policy_id TEXT NOT NULL,
                    tenant_id TEXT NOT NULL,
                    status TEXT DEFAULT 'pending',
                    started_at TIMESTAMP,
                    completed_at TIMESTAMP,
                    affected_records INTEGER DEFAULT 0,
                    archived_records INTEGER DEFAULT 0,
                    deleted_records INTEGER DEFAULT 0,
                    error_count INTEGER DEFAULT 0,
                    details TEXT DEFAULT '{}',
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    FOREIGN KEY (policy_id) REFERENCES data_retention_policies(id) ON DELETE CASCADE,
                    FOREIGN KEY (tenant_id) REFERENCES tenants(id) ON DELETE CASCADE
                )
            """)

            # Indexes.
            index_statements = (
                "CREATE INDEX IF NOT EXISTS idx_sso_tenant ON sso_configs(tenant_id)",
                "CREATE INDEX IF NOT EXISTS idx_sso_provider ON sso_configs(provider)",
                "CREATE INDEX IF NOT EXISTS idx_saml_requests_config ON saml_auth_requests(sso_config_id)",
                "CREATE INDEX IF NOT EXISTS idx_saml_requests_expires ON saml_auth_requests(expires_at)",
                "CREATE INDEX IF NOT EXISTS idx_saml_responses_request ON saml_auth_responses(request_id)",
                "CREATE INDEX IF NOT EXISTS idx_scim_config_tenant ON scim_configs(tenant_id)",
                "CREATE INDEX IF NOT EXISTS idx_scim_users_tenant ON scim_users(tenant_id)",
                "CREATE INDEX IF NOT EXISTS idx_scim_users_external ON scim_users(external_id)",
                "CREATE INDEX IF NOT EXISTS idx_audit_export_tenant ON audit_log_exports(tenant_id)",
                "CREATE INDEX IF NOT EXISTS idx_audit_export_status ON audit_log_exports(status)",
                "CREATE INDEX IF NOT EXISTS idx_retention_tenant ON data_retention_policies(tenant_id)",
                "CREATE INDEX IF NOT EXISTS idx_retention_type ON data_retention_policies(resource_type)",
                "CREATE INDEX IF NOT EXISTS idx_retention_jobs_policy ON data_retention_jobs(policy_id)",
                "CREATE INDEX IF NOT EXISTS idx_retention_jobs_status ON data_retention_jobs(status)",
            )
            for stmt in index_statements:
                cursor.execute(stmt)

            conn.commit()
            logger.info("Enterprise tables initialized successfully")
        except Exception as e:
            logger.error(f"Error initializing enterprise tables: {e}")
            raise
        finally:
            conn.close()

    # ==================== SSO/SAML management ====================

    def create_sso_config(self, tenant_id: str, provider: str,
                          entity_id: Optional[str] = None,
                          sso_url: Optional[str] = None,
                          slo_url: Optional[str] = None,
                          certificate: Optional[str] = None,
                          metadata_url: Optional[str] = None,
                          metadata_xml: Optional[str] = None,
                          client_id: Optional[str] = None,
                          client_secret: Optional[str] = None,
                          authorization_url: Optional[str] = None,
                          token_url: Optional[str] = None,
                          userinfo_url: Optional[str] = None,
                          scopes: Optional[List[str]] = None,
                          attribute_mapping: Optional[Dict[str, str]] = None,
                          auto_provision: bool = True,
                          default_role: str = "member",
                          domain_restriction: Optional[List[str]] = None) -> SSOConfig:
        """Create and persist an SSO configuration for a tenant."""
        conn = self._get_connection()
        try:
            config_id = str(uuid.uuid4())
            now = datetime.now()

            # Fall back to the provider's default attribute mapping.
            if attribute_mapping is None and provider in self.DEFAULT_ATTRIBUTE_MAPPING:
                attribute_mapping = self.DEFAULT_ATTRIBUTE_MAPPING[SSOProvider(provider)]

            config = SSOConfig(
                id=config_id,
                tenant_id=tenant_id,
                provider=provider,
                status=SSOStatus.PENDING.value,
                entity_id=entity_id,
                sso_url=sso_url,
                slo_url=slo_url,
                certificate=certificate,
                metadata_url=metadata_url,
                metadata_xml=metadata_xml,
                client_id=client_id,
                client_secret=client_secret,
                authorization_url=authorization_url,
                token_url=token_url,
                userinfo_url=userinfo_url,
                scopes=scopes or ["openid", "email", "profile"],
                attribute_mapping=attribute_mapping or {},
                auto_provision=auto_provision,
                default_role=default_role,
                domain_restriction=domain_restriction or [],
                created_at=now,
                updated_at=now,
                last_tested_at=None,
                last_error=None,
            )

            cursor = conn.cursor()
            cursor.execute("""
                INSERT INTO sso_configs
                (id, tenant_id, provider, status, entity_id, sso_url, slo_url,
                 certificate, metadata_url, metadata_xml, client_id, client_secret,
                 authorization_url, token_url, userinfo_url, scopes,
                 attribute_mapping, auto_provision, default_role,
                 domain_restriction, created_at, updated_at)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            """, (
                config.id, config.tenant_id, config.provider, config.status,
                config.entity_id, config.sso_url, config.slo_url,
                config.certificate, config.metadata_url, config.metadata_xml,
                config.client_id, config.client_secret, config.authorization_url,
                config.token_url, config.userinfo_url,
                json.dumps(config.scopes), json.dumps(config.attribute_mapping),
                int(config.auto_provision), config.default_role,
                json.dumps(config.domain_restriction),
                config.created_at, config.updated_at,
            ))
            conn.commit()
            logger.info(f"SSO config created: {config_id} for tenant {tenant_id}")
            return config
        except Exception as e:
            conn.rollback()
            logger.error(f"Error creating SSO config: {e}")
            raise
        finally:
            conn.close()
sso_url=sso_url, slo_url=slo_url, certificate=certificate, metadata_url=metadata_url, metadata_xml=metadata_xml, client_id=client_id, client_secret=client_secret, authorization_url=authorization_url, token_url=token_url, userinfo_url=userinfo_url, scopes=scopes or ["openid", "email", "profile"], attribute_mapping=attribute_mapping or {}, auto_provision=auto_provision, default_role=default_role, domain_restriction=domain_restriction or [], created_at=now, updated_at=now, last_tested_at=None, last_error=None ) cursor = conn.cursor() cursor.execute(""" INSERT INTO sso_configs (id, tenant_id, provider, status, entity_id, sso_url, slo_url, certificate, metadata_url, metadata_xml, client_id, client_secret, authorization_url, token_url, userinfo_url, scopes, attribute_mapping, auto_provision, default_role, domain_restriction, created_at, updated_at) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) """, ( config.id, config.tenant_id, config.provider, config.status, config.entity_id, config.sso_url, config.slo_url, config.certificate, config.metadata_url, config.metadata_xml, config.client_id, config.client_secret, config.authorization_url, config.token_url, config.userinfo_url, json.dumps(config.scopes), json.dumps(config.attribute_mapping), int(config.auto_provision), config.default_role, json.dumps(config.domain_restriction), config.created_at, config.updated_at )) conn.commit() logger.info(f"SSO config created: {config_id} for tenant {tenant_id}") return config except Exception as e: conn.rollback() logger.error(f"Error creating SSO config: {e}") raise finally: conn.close() def get_sso_config(self, config_id: str) -> Optional[SSOConfig]: """获取 SSO 配置""" conn = self._get_connection() try: cursor = conn.cursor() cursor.execute("SELECT * FROM sso_configs WHERE id = ?", (config_id,)) row = cursor.fetchone() if row: return self._row_to_sso_config(row) return None finally: conn.close() def get_tenant_sso_config(self, tenant_id: str, provider: 
Optional[str] = None) -> Optional[SSOConfig]: """获取租户的 SSO 配置""" conn = self._get_connection() try: cursor = conn.cursor() if provider: cursor.execute(""" SELECT * FROM sso_configs WHERE tenant_id = ? AND provider = ? ORDER BY created_at DESC LIMIT 1 """, (tenant_id, provider)) else: cursor.execute(""" SELECT * FROM sso_configs WHERE tenant_id = ? AND status = 'active' ORDER BY created_at DESC LIMIT 1 """, (tenant_id,)) row = cursor.fetchone() if row: return self._row_to_sso_config(row) return None finally: conn.close() def update_sso_config(self, config_id: str, **kwargs) -> Optional[SSOConfig]: """更新 SSO 配置""" conn = self._get_connection() try: config = self.get_sso_config(config_id) if not config: return None updates = [] params = [] allowed_fields = ['entity_id', 'sso_url', 'slo_url', 'certificate', 'metadata_url', 'metadata_xml', 'client_id', 'client_secret', 'authorization_url', 'token_url', 'userinfo_url', 'scopes', 'attribute_mapping', 'auto_provision', 'default_role', 'domain_restriction', 'status'] for key, value in kwargs.items(): if key in allowed_fields: updates.append(f"{key} = ?") if key in ['scopes', 'attribute_mapping', 'domain_restriction']: params.append(json.dumps(value) if value else '[]') elif key == 'auto_provision': params.append(int(value)) else: params.append(value) if not updates: return config updates.append("updated_at = ?") params.append(datetime.now()) params.append(config_id) cursor = conn.cursor() cursor.execute(f""" UPDATE sso_configs SET {', '.join(updates)} WHERE id = ? 
""", params) conn.commit() return self.get_sso_config(config_id) finally: conn.close() def delete_sso_config(self, config_id: str) -> bool: """删除 SSO 配置""" conn = self._get_connection() try: cursor = conn.cursor() cursor.execute("DELETE FROM sso_configs WHERE id = ?", (config_id,)) conn.commit() return cursor.rowcount > 0 finally: conn.close() def list_sso_configs(self, tenant_id: str) -> List[SSOConfig]: """列出租户的所有 SSO 配置""" conn = self._get_connection() try: cursor = conn.cursor() cursor.execute(""" SELECT * FROM sso_configs WHERE tenant_id = ? ORDER BY created_at DESC """, (tenant_id,)) rows = cursor.fetchall() return [self._row_to_sso_config(row) for row in rows] finally: conn.close() def generate_saml_metadata(self, config_id: str, base_url: str) -> str: """生成 SAML Service Provider 元数据""" config = self.get_sso_config(config_id) if not config: raise ValueError(f"SSO config {config_id} not found") # 生成 SP 实体 ID sp_entity_id = f"{base_url}/api/v1/sso/saml/{config.tenant_id}" acs_url = f"{base_url}/api/v1/sso/saml/{config.tenant_id}/acs" slo_url = f"{base_url}/api/v1/sso/saml/{config.tenant_id}/slo" # 生成 X.509 证书(简化实现,实际应该生成真实的密钥对) cert = config.certificate or self._generate_self_signed_cert() metadata = f""" {cert} InsightFlow InsightFlow {base_url} """ return metadata def create_saml_auth_request(self, tenant_id: str, config_id: str, relay_state: Optional[str] = None) -> SAMLAuthRequest: """创建 SAML 认证请求""" conn = self._get_connection() try: request_id = f"_{uuid.uuid4().hex}" now = datetime.now() expires = now + timedelta(minutes=10) auth_request = SAMLAuthRequest( id=str(uuid.uuid4()), tenant_id=tenant_id, sso_config_id=config_id, request_id=request_id, relay_state=relay_state, created_at=now, expires_at=expires, used=False, used_at=None ) cursor = conn.cursor() cursor.execute(""" INSERT INTO saml_auth_requests (id, tenant_id, sso_config_id, request_id, relay_state, created_at, expires_at, used) VALUES (?, ?, ?, ?, ?, ?, ?, ?) 
""", ( auth_request.id, auth_request.tenant_id, auth_request.sso_config_id, auth_request.request_id, auth_request.relay_state, auth_request.created_at, auth_request.expires_at, int(auth_request.used) )) conn.commit() return auth_request finally: conn.close() def get_saml_auth_request(self, request_id: str) -> Optional[SAMLAuthRequest]: """获取 SAML 认证请求""" conn = self._get_connection() try: cursor = conn.cursor() cursor.execute(""" SELECT * FROM saml_auth_requests WHERE request_id = ? """, (request_id,)) row = cursor.fetchone() if row: return self._row_to_saml_request(row) return None finally: conn.close() def process_saml_response(self, request_id: str, saml_response: str) -> Optional[SAMLAuthResponse]: """处理 SAML 响应""" # 这里应该实现实际的 SAML 响应解析 # 简化实现:假设响应已经验证并解析 conn = self._get_connection() try: # 解析 SAML Response(简化) # 实际应该使用 python-saml 或类似库 attributes = self._parse_saml_response(saml_response) auth_response = SAMLAuthResponse( id=str(uuid.uuid4()), request_id=request_id, tenant_id="", # 从 request 获取 user_id=None, email=attributes.get("email"), name=attributes.get("name"), attributes=attributes, session_index=attributes.get("session_index"), processed=False, processed_at=None, created_at=datetime.now() ) cursor = conn.cursor() cursor.execute(""" INSERT INTO saml_auth_responses (id, request_id, tenant_id, user_id, email, name, attributes, session_index, processed, created_at) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?) 
""", ( auth_response.id, auth_response.request_id, auth_response.tenant_id, auth_response.user_id, auth_response.email, auth_response.name, json.dumps(auth_response.attributes), auth_response.session_index, int(auth_response.processed), auth_response.created_at )) conn.commit() return auth_response finally: conn.close() def _parse_saml_response(self, saml_response: str) -> Dict[str, Any]: """解析 SAML 响应(简化实现)""" # 实际应该使用 python-saml 库解析 # 这里返回模拟数据 return { "email": "user@example.com", "name": "Test User", "session_index": f"_{uuid.uuid4().hex}" } def _generate_self_signed_cert(self) -> str: """生成自签名证书(简化实现)""" # 实际应该使用 cryptography 库生成 return "MIICpDCCAYwCCQDU+pQ4nEHXqzANBgkqhkiG9w0BAQsFADAUMRIwEAYDVQQDDAlsb2NhbGhvc3QwHhcNMjQwMTAxMDAwMDAwWhcNMjUwMTAxMDAwMDAwWjAUMRIwEAYDVQQDDAlsb2NhbGhvc3QwggEiMA0GCSqGSIb3DQEBAQUAA4IBDwAwggEKAoIBAQC..." # ==================== SCIM 用户目录同步 ==================== def create_scim_config(self, tenant_id: str, provider: str, scim_base_url: str, scim_token: str, sync_interval_minutes: int = 60, attribute_mapping: Optional[Dict[str, str]] = None, sync_rules: Optional[Dict[str, Any]] = None) -> SCIMConfig: """创建 SCIM 配置""" conn = self._get_connection() try: config_id = str(uuid.uuid4()) now = datetime.now() config = SCIMConfig( id=config_id, tenant_id=tenant_id, provider=provider, status="disabled", scim_base_url=scim_base_url, scim_token=scim_token, sync_interval_minutes=sync_interval_minutes, last_sync_at=None, last_sync_status=None, last_sync_error=None, last_sync_users_count=0, attribute_mapping=attribute_mapping or {}, sync_rules=sync_rules or {}, created_at=now, updated_at=now ) cursor = conn.cursor() cursor.execute(""" INSERT INTO scim_configs (id, tenant_id, provider, status, scim_base_url, scim_token, sync_interval_minutes, attribute_mapping, sync_rules, created_at, updated_at) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) 
""", ( config.id, config.tenant_id, config.provider, config.status, config.scim_base_url, config.scim_token, config.sync_interval_minutes, json.dumps(config.attribute_mapping), json.dumps(config.sync_rules), config.created_at, config.updated_at )) conn.commit() logger.info(f"SCIM config created: {config_id} for tenant {tenant_id}") return config except Exception as e: conn.rollback() logger.error(f"Error creating SCIM config: {e}") raise finally: conn.close() def get_scim_config(self, config_id: str) -> Optional[SCIMConfig]: """获取 SCIM 配置""" conn = self._get_connection() try: cursor = conn.cursor() cursor.execute("SELECT * FROM scim_configs WHERE id = ?", (config_id,)) row = cursor.fetchone() if row: return self._row_to_scim_config(row) return None finally: conn.close() def get_tenant_scim_config(self, tenant_id: str) -> Optional[SCIMConfig]: """获取租户的 SCIM 配置""" conn = self._get_connection() try: cursor = conn.cursor() cursor.execute(""" SELECT * FROM scim_configs WHERE tenant_id = ? ORDER BY created_at DESC LIMIT 1 """, (tenant_id,)) row = cursor.fetchone() if row: return self._row_to_scim_config(row) return None finally: conn.close() def update_scim_config(self, config_id: str, **kwargs) -> Optional[SCIMConfig]: """更新 SCIM 配置""" conn = self._get_connection() try: config = self.get_scim_config(config_id) if not config: return None updates = [] params = [] allowed_fields = ['scim_base_url', 'scim_token', 'sync_interval_minutes', 'attribute_mapping', 'sync_rules', 'status'] for key, value in kwargs.items(): if key in allowed_fields: updates.append(f"{key} = ?") if key in ['attribute_mapping', 'sync_rules']: params.append(json.dumps(value) if value else '{}') else: params.append(value) if not updates: return config updates.append("updated_at = ?") params.append(datetime.now()) params.append(config_id) cursor = conn.cursor() cursor.execute(f""" UPDATE scim_configs SET {', '.join(updates)} WHERE id = ? 
""", params) conn.commit() return self.get_scim_config(config_id) finally: conn.close() def sync_scim_users(self, config_id: str) -> Dict[str, Any]: """执行 SCIM 用户同步""" config = self.get_scim_config(config_id) if not config: raise ValueError(f"SCIM config {config_id} not found") conn = self._get_connection() try: now = datetime.now() # 更新同步状态 cursor = conn.cursor() cursor.execute(""" UPDATE scim_configs SET status = 'syncing', last_sync_at = ? WHERE id = ? """, (now, config_id)) conn.commit() try: # 模拟从 SCIM 服务端获取用户 # 实际应该使用 HTTP 请求获取 users = self._fetch_scim_users(config) synced_count = 0 for user_data in users: self._upsert_scim_user(conn, config.tenant_id, user_data) synced_count += 1 # 更新同步状态 cursor.execute(""" UPDATE scim_configs SET status = 'active', last_sync_status = 'success', last_sync_error = NULL, last_sync_users_count = ? WHERE id = ? """, (synced_count, config_id)) conn.commit() return { "success": True, "synced_count": synced_count, "timestamp": now.isoformat() } except Exception as e: cursor.execute(""" UPDATE scim_configs SET status = 'error', last_sync_status = 'failed', last_sync_error = ? WHERE id = ? 
""", (str(e), config_id)) conn.commit() return { "success": False, "error": str(e), "timestamp": now.isoformat() } finally: conn.close() def _fetch_scim_users(self, config: SCIMConfig) -> List[Dict[str, Any]]: """从 SCIM 服务端获取用户(模拟实现)""" # 实际应该使用 HTTP 请求获取 # GET {scim_base_url}/Users return [] def _upsert_scim_user(self, conn: sqlite3.Connection, tenant_id: str, user_data: Dict[str, Any]): """插入或更新 SCIM 用户""" cursor = conn.cursor() external_id = user_data.get("id") user_name = user_data.get("userName", "") email = user_data.get("emails", [{}])[0].get("value", "") display_name = user_data.get("displayName") name = user_data.get("name", {}) given_name = name.get("givenName") family_name = name.get("familyName") active = user_data.get("active", True) groups = [g.get("value") for g in user_data.get("groups", [])] cursor.execute(""" INSERT INTO scim_users (id, tenant_id, external_id, user_name, email, display_name, given_name, family_name, active, groups, raw_data, synced_at) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) ON CONFLICT(tenant_id, external_id) DO UPDATE SET user_name = excluded.user_name, email = excluded.email, display_name = excluded.display_name, given_name = excluded.given_name, family_name = excluded.family_name, active = excluded.active, groups = excluded.groups, raw_data = excluded.raw_data, synced_at = excluded.synced_at, updated_at = CURRENT_TIMESTAMP """, ( str(uuid.uuid4()), tenant_id, external_id, user_name, email, display_name, given_name, family_name, int(active), json.dumps(groups), json.dumps(user_data), datetime.now() )) def list_scim_users(self, tenant_id: str, active_only: bool = True) -> List[SCIMUser]: """列出 SCIM 用户""" conn = self._get_connection() try: cursor = conn.cursor() query = "SELECT * FROM scim_users WHERE tenant_id = ?" 
params = [tenant_id] if active_only: query += " AND active = 1" query += " ORDER BY synced_at DESC" cursor.execute(query, params) rows = cursor.fetchall() return [self._row_to_scim_user(row) for row in rows] finally: conn.close() # ==================== 审计日志导出 ==================== def create_audit_export(self, tenant_id: str, export_format: str, start_date: datetime, end_date: datetime, created_by: str, filters: Optional[Dict[str, Any]] = None, compliance_standard: Optional[str] = None) -> AuditLogExport: """创建审计日志导出任务""" conn = self._get_connection() try: export_id = str(uuid.uuid4()) now = datetime.now() # 默认7天后过期 expires_at = now + timedelta(days=7) export = AuditLogExport( id=export_id, tenant_id=tenant_id, export_format=export_format, start_date=start_date, end_date=end_date, filters=filters or {}, compliance_standard=compliance_standard, status="pending", file_path=None, file_size=None, record_count=None, checksum=None, downloaded_by=None, downloaded_at=None, expires_at=expires_at, created_by=created_by, created_at=now, completed_at=None, error_message=None ) cursor = conn.cursor() cursor.execute(""" INSERT INTO audit_log_exports (id, tenant_id, export_format, start_date, end_date, filters, compliance_standard, status, expires_at, created_by, created_at) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) 
""", ( export.id, export.tenant_id, export.export_format, export.start_date, export.end_date, json.dumps(export.filters), export.compliance_standard, export.status, export.expires_at, export.created_by, export.created_at )) conn.commit() logger.info(f"Audit export created: {export_id}") return export except Exception as e: conn.rollback() logger.error(f"Error creating audit export: {e}") raise finally: conn.close() def process_audit_export(self, export_id: str, db_manager=None) -> Optional[AuditLogExport]: """处理审计日志导出任务""" export = self.get_audit_export(export_id) if not export: return None conn = self._get_connection() try: # 更新状态为处理中 cursor = conn.cursor() cursor.execute(""" UPDATE audit_log_exports SET status = 'processing' WHERE id = ? """, (export_id,)) conn.commit() try: # 获取审计日志数据 logs = self._fetch_audit_logs( export.tenant_id, export.start_date, export.end_date, export.filters, db_manager ) # 根据合规标准过滤字段 if export.compliance_standard: logs = self._apply_compliance_filter(logs, export.compliance_standard) # 生成导出文件 file_path, file_size, checksum = self._generate_export_file( export_id, logs, export.export_format ) now = datetime.now() # 更新导出记录 cursor.execute(""" UPDATE audit_log_exports SET status = 'completed', file_path = ?, file_size = ?, record_count = ?, checksum = ?, completed_at = ? WHERE id = ? """, (file_path, file_size, len(logs), checksum, now, export_id)) conn.commit() return self.get_audit_export(export_id) except Exception as e: cursor.execute(""" UPDATE audit_log_exports SET status = 'failed', error_message = ? WHERE id = ? 
""", (str(e), export_id)) conn.commit() raise finally: conn.close() def _fetch_audit_logs(self, tenant_id: str, start_date: datetime, end_date: datetime, filters: Dict[str, Any], db_manager=None) -> List[Dict[str, Any]]: """获取审计日志数据""" if db_manager is None: return [] # 使用 db_manager 获取审计日志 # 这里简化实现 return [] def _apply_compliance_filter(self, logs: List[Dict[str, Any]], standard: str) -> List[Dict[str, Any]]: """应用合规标准字段过滤""" fields = self.COMPLIANCE_FIELDS.get(ComplianceStandard(standard), []) if not fields: return logs filtered_logs = [] for log in logs: filtered_log = {k: v for k, v in log.items() if k in fields} filtered_logs.append(filtered_log) return filtered_logs def _generate_export_file(self, export_id: str, logs: List[Dict[str, Any]], format: str) -> Tuple[str, int, str]: """生成导出文件""" import os import hashlib export_dir = "/tmp/insightflow/exports" os.makedirs(export_dir, exist_ok=True) file_path = f"{export_dir}/audit_export_{export_id}.{format}" if format == "json": content = json.dumps(logs, ensure_ascii=False, indent=2) with open(file_path, "w", encoding="utf-8") as f: f.write(content) elif format == "csv": import csv if logs: with open(file_path, "w", newline="", encoding="utf-8") as f: writer = csv.DictWriter(f, fieldnames=logs[0].keys()) writer.writeheader() writer.writerows(logs) else: # 其他格式暂不支持 content = json.dumps(logs, ensure_ascii=False) with open(file_path, "w", encoding="utf-8") as f: f.write(content) file_size = os.path.getsize(file_path) # 计算校验和 with open(file_path, "rb") as f: checksum = hashlib.sha256(f.read()).hexdigest() return file_path, file_size, checksum def get_audit_export(self, export_id: str) -> Optional[AuditLogExport]: """获取审计日志导出记录""" conn = self._get_connection() try: cursor = conn.cursor() cursor.execute("SELECT * FROM audit_log_exports WHERE id = ?", (export_id,)) row = cursor.fetchone() if row: return self._row_to_audit_export(row) return None finally: conn.close() def list_audit_exports(self, tenant_id: str, limit: 
int = 100) -> List[AuditLogExport]: """列出审计日志导出记录""" conn = self._get_connection() try: cursor = conn.cursor() cursor.execute(""" SELECT * FROM audit_log_exports WHERE tenant_id = ? ORDER BY created_at DESC LIMIT ? """, (tenant_id, limit)) rows = cursor.fetchall() return [self._row_to_audit_export(row) for row in rows] finally: conn.close() def mark_export_downloaded(self, export_id: str, downloaded_by: str) -> bool: """标记导出文件已下载""" conn = self._get_connection() try: cursor = conn.cursor() cursor.execute(""" UPDATE audit_log_exports SET downloaded_by = ?, downloaded_at = ? WHERE id = ? """, (downloaded_by, datetime.now(), export_id)) conn.commit() return cursor.rowcount > 0 finally: conn.close() # ==================== 数据保留策略 ==================== def create_retention_policy(self, tenant_id: str, name: str, resource_type: str, retention_days: int, action: str, description: Optional[str] = None, conditions: Optional[Dict[str, Any]] = None, auto_execute: bool = False, execute_at: Optional[str] = None, notify_before_days: int = 7, archive_location: Optional[str] = None, archive_encryption: bool = True) -> DataRetentionPolicy: """创建数据保留策略""" conn = self._get_connection() try: policy_id = str(uuid.uuid4()) now = datetime.now() policy = DataRetentionPolicy( id=policy_id, tenant_id=tenant_id, name=name, description=description, resource_type=resource_type, retention_days=retention_days, action=action, conditions=conditions or {}, auto_execute=auto_execute, execute_at=execute_at, notify_before_days=notify_before_days, archive_location=archive_location, archive_encryption=archive_encryption, is_active=True, last_executed_at=None, last_execution_result=None, created_at=now, updated_at=now ) cursor = conn.cursor() cursor.execute(""" INSERT INTO data_retention_policies (id, tenant_id, name, description, resource_type, retention_days, action, conditions, auto_execute, execute_at, notify_before_days, archive_location, archive_encryption, is_active, created_at, updated_at) VALUES 
(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) """, ( policy.id, policy.tenant_id, policy.name, policy.description, policy.resource_type, policy.retention_days, policy.action, json.dumps(policy.conditions), int(policy.auto_execute), policy.execute_at, policy.notify_before_days, policy.archive_location, int(policy.archive_encryption), int(policy.is_active), policy.created_at, policy.updated_at )) conn.commit() logger.info(f"Retention policy created: {policy_id}") return policy except Exception as e: conn.rollback() logger.error(f"Error creating retention policy: {e}") raise finally: conn.close() def get_retention_policy(self, policy_id: str) -> Optional[DataRetentionPolicy]: """获取数据保留策略""" conn = self._get_connection() try: cursor = conn.cursor() cursor.execute("SELECT * FROM data_retention_policies WHERE id = ?", (policy_id,)) row = cursor.fetchone() if row: return self._row_to_retention_policy(row) return None finally: conn.close() def list_retention_policies(self, tenant_id: str, resource_type: Optional[str] = None) -> List[DataRetentionPolicy]: """列出数据保留策略""" conn = self._get_connection() try: cursor = conn.cursor() query = "SELECT * FROM data_retention_policies WHERE tenant_id = ?" params = [tenant_id] if resource_type: query += " AND resource_type = ?" 
params.append(resource_type) query += " ORDER BY created_at DESC" cursor.execute(query, params) rows = cursor.fetchall() return [self._row_to_retention_policy(row) for row in rows] finally: conn.close() def update_retention_policy(self, policy_id: str, **kwargs) -> Optional[DataRetentionPolicy]: """更新数据保留策略""" conn = self._get_connection() try: policy = self.get_retention_policy(policy_id) if not policy: return None updates = [] params = [] allowed_fields = ['name', 'description', 'retention_days', 'action', 'conditions', 'auto_execute', 'execute_at', 'notify_before_days', 'archive_location', 'archive_encryption', 'is_active'] for key, value in kwargs.items(): if key in allowed_fields: updates.append(f"{key} = ?") if key == 'conditions': params.append(json.dumps(value) if value else '{}') elif key in ['auto_execute', 'archive_encryption', 'is_active']: params.append(int(value)) else: params.append(value) if not updates: return policy updates.append("updated_at = ?") params.append(datetime.now()) params.append(policy_id) cursor = conn.cursor() cursor.execute(f""" UPDATE data_retention_policies SET {', '.join(updates)} WHERE id = ? 
""", params) conn.commit() return self.get_retention_policy(policy_id) finally: conn.close() def delete_retention_policy(self, policy_id: str) -> bool: """删除数据保留策略""" conn = self._get_connection() try: cursor = conn.cursor() cursor.execute("DELETE FROM data_retention_policies WHERE id = ?", (policy_id,)) conn.commit() return cursor.rowcount > 0 finally: conn.close() def execute_retention_policy(self, policy_id: str) -> DataRetentionJob: """执行数据保留策略""" policy = self.get_retention_policy(policy_id) if not policy: raise ValueError(f"Retention policy {policy_id} not found") conn = self._get_connection() try: job_id = str(uuid.uuid4()) now = datetime.now() job = DataRetentionJob( id=job_id, policy_id=policy_id, tenant_id=policy.tenant_id, status="running", started_at=now, completed_at=None, affected_records=0, archived_records=0, deleted_records=0, error_count=0, details={}, created_at=now ) cursor = conn.cursor() cursor.execute(""" INSERT INTO data_retention_jobs (id, policy_id, tenant_id, status, started_at, created_at) VALUES (?, ?, ?, ?, ?, ?) """, (job.id, job.policy_id, job.tenant_id, job.status, job.started_at, job.created_at)) conn.commit() try: # 计算截止日期 cutoff_date = now - timedelta(days=policy.retention_days) # 根据资源类型执行不同的处理 if policy.resource_type == "audit_log": result = self._retain_audit_logs(conn, policy, cutoff_date) elif policy.resource_type == "project": result = self._retain_projects(conn, policy, cutoff_date) elif policy.resource_type == "transcript": result = self._retain_transcripts(conn, policy, cutoff_date) else: result = {"affected": 0, "archived": 0, "deleted": 0, "errors": 0} # 更新任务状态 cursor.execute(""" UPDATE data_retention_jobs SET status = 'completed', completed_at = ?, affected_records = ?, archived_records = ?, deleted_records = ?, error_count = ?, details = ? WHERE id = ? 
""", ( datetime.now(), result.get("affected", 0), result.get("archived", 0), result.get("deleted", 0), result.get("errors", 0), json.dumps(result), job_id )) # 更新策略最后执行时间 cursor.execute(""" UPDATE data_retention_policies SET last_executed_at = ?, last_execution_result = 'success' WHERE id = ? """, (datetime.now(), policy_id)) conn.commit() except Exception as e: cursor.execute(""" UPDATE data_retention_jobs SET status = 'failed', completed_at = ?, error_count = 1, details = ? WHERE id = ? """, (datetime.now(), json.dumps({"error": str(e)}), job_id)) cursor.execute(""" UPDATE data_retention_policies SET last_executed_at = ?, last_execution_result = ? WHERE id = ? """, (datetime.now(), str(e), policy_id)) conn.commit() raise return self.get_retention_job(job_id) finally: conn.close() def _retain_audit_logs(self, conn: sqlite3.Connection, policy: DataRetentionPolicy, cutoff_date: datetime) -> Dict[str, int]: """保留审计日志""" cursor = conn.cursor() # 获取符合条件的记录数 cursor.execute(""" SELECT COUNT(*) as count FROM audit_logs WHERE created_at < ? """, (cutoff_date,)) count = cursor.fetchone()['count'] if policy.action == DataRetentionAction.DELETE.value: cursor.execute(""" DELETE FROM audit_logs WHERE created_at < ? 
""", (cutoff_date,)) deleted = cursor.rowcount return {"affected": count, "archived": 0, "deleted": deleted, "errors": 0} elif policy.action == DataRetentionAction.ARCHIVE.value: # 归档逻辑(简化实现) archived = count return {"affected": count, "archived": archived, "deleted": 0, "errors": 0} return {"affected": 0, "archived": 0, "deleted": 0, "errors": 0} def _retain_projects(self, conn: sqlite3.Connection, policy: DataRetentionPolicy, cutoff_date: datetime) -> Dict[str, int]: """保留项目数据""" # 简化实现 return {"affected": 0, "archived": 0, "deleted": 0, "errors": 0} def _retain_transcripts(self, conn: sqlite3.Connection, policy: DataRetentionPolicy, cutoff_date: datetime) -> Dict[str, int]: """保留转录数据""" # 简化实现 return {"affected": 0, "archived": 0, "deleted": 0, "errors": 0} def get_retention_job(self, job_id: str) -> Optional[DataRetentionJob]: """获取数据保留任务""" conn = self._get_connection() try: cursor = conn.cursor() cursor.execute("SELECT * FROM data_retention_jobs WHERE id = ?", (job_id,)) row = cursor.fetchone() if row: return self._row_to_retention_job(row) return None finally: conn.close() def list_retention_jobs(self, policy_id: str, limit: int = 100) -> List[DataRetentionJob]: """列出数据保留任务""" conn = self._get_connection() try: cursor = conn.cursor() cursor.execute(""" SELECT * FROM data_retention_jobs WHERE policy_id = ? ORDER BY created_at DESC LIMIT ? 
""", (policy_id, limit)) rows = cursor.fetchall() return [self._row_to_retention_job(row) for row in rows] finally: conn.close() # ==================== 辅助方法 ==================== def _row_to_sso_config(self, row: sqlite3.Row) -> SSOConfig: """数据库行转换为 SSOConfig 对象""" return SSOConfig( id=row['id'], tenant_id=row['tenant_id'], provider=row['provider'], status=row['status'], entity_id=row['entity_id'], sso_url=row['sso_url'], slo_url=row['slo_url'], certificate=row['certificate'], metadata_url=row['metadata_url'], metadata_xml=row['metadata_xml'], client_id=row['client_id'], client_secret=row['client_secret'], authorization_url=row['authorization_url'], token_url=row['token_url'], userinfo_url=row['userinfo_url'], scopes=json.loads(row['scopes'] or '["openid", "email", "profile"]'), attribute_mapping=json.loads(row['attribute_mapping'] or '{}'), auto_provision=bool(row['auto_provision']), default_role=row['default_role'], domain_restriction=json.loads(row['domain_restriction'] or '[]'), created_at=datetime.fromisoformat(row['created_at']) if isinstance(row['created_at'], str) else row['created_at'], updated_at=datetime.fromisoformat(row['updated_at']) if isinstance(row['updated_at'], str) else row['updated_at'], last_tested_at=datetime.fromisoformat(row['last_tested_at']) if row['last_tested_at'] and isinstance(row['last_tested_at'], str) else row['last_tested_at'], last_error=row['last_error'] ) def _row_to_saml_request(self, row: sqlite3.Row) -> SAMLAuthRequest: """数据库行转换为 SAMLAuthRequest 对象""" return SAMLAuthRequest( id=row['id'], tenant_id=row['tenant_id'], sso_config_id=row['sso_config_id'], request_id=row['request_id'], relay_state=row['relay_state'], created_at=datetime.fromisoformat(row['created_at']) if isinstance(row['created_at'], str) else row['created_at'], expires_at=datetime.fromisoformat(row['expires_at']) if isinstance(row['expires_at'], str) else row['expires_at'], used=bool(row['used']), used_at=datetime.fromisoformat(row['used_at']) if 
row['used_at'] and isinstance(row['used_at'], str) else row['used_at'] ) def _row_to_scim_config(self, row: sqlite3.Row) -> SCIMConfig: """数据库行转换为 SCIMConfig 对象""" return SCIMConfig( id=row['id'], tenant_id=row['tenant_id'], provider=row['provider'], status=row['status'], scim_base_url=row['scim_base_url'], scim_token=row['scim_token'], sync_interval_minutes=row['sync_interval_minutes'], last_sync_at=datetime.fromisoformat(row['last_sync_at']) if row['last_sync_at'] and isinstance(row['last_sync_at'], str) else row['last_sync_at'], last_sync_status=row['last_sync_status'], last_sync_error=row['last_sync_error'], last_sync_users_count=row['last_sync_users_count'], attribute_mapping=json.loads(row['attribute_mapping'] or '{}'), sync_rules=json.loads(row['sync_rules'] or '{}'), created_at=datetime.fromisoformat(row['created_at']) if isinstance(row['created_at'], str) else row['created_at'], updated_at=datetime.fromisoformat(row['updated_at']) if isinstance(row['updated_at'], str) else row['updated_at'] ) def _row_to_scim_user(self, row: sqlite3.Row) -> SCIMUser: """数据库行转换为 SCIMUser 对象""" return SCIMUser( id=row['id'], tenant_id=row['tenant_id'], external_id=row['external_id'], user_name=row['user_name'], email=row['email'], display_name=row['display_name'], given_name=row['given_name'], family_name=row['family_name'], active=bool(row['active']), groups=json.loads(row['groups'] or '[]'), raw_data=json.loads(row['raw_data'] or '{}'), synced_at=datetime.fromisoformat(row['synced_at']) if isinstance(row['synced_at'], str) else row['synced_at'], created_at=datetime.fromisoformat(row['created_at']) if isinstance(row['created_at'], str) else row['created_at'], updated_at=datetime.fromisoformat(row['updated_at']) if isinstance(row['updated_at'], str) else row['updated_at'] ) def _row_to_audit_export(self, row: sqlite3.Row) -> AuditLogExport: """数据库行转换为 AuditLogExport 对象""" return AuditLogExport( id=row['id'], tenant_id=row['tenant_id'], export_format=row['export_format'], 
start_date=datetime.fromisoformat(row['start_date']) if isinstance(row['start_date'], str) else row['start_date'], end_date=datetime.fromisoformat(row['end_date']) if isinstance(row['end_date'], str) else row['end_date'], filters=json.loads(row['filters'] or '{}'), compliance_standard=row['compliance_standard'], status=row['status'], file_path=row['file_path'], file_size=row['file_size'], record_count=row['record_count'], checksum=row['checksum'], downloaded_by=row['downloaded_by'], downloaded_at=datetime.fromisoformat(row['downloaded_at']) if row['downloaded_at'] and isinstance(row['downloaded_at'], str) else row['downloaded_at'], expires_at=datetime.fromisoformat(row['expires_at']) if isinstance(row['expires_at'], str) else row['expires_at'], created_by=row['created_by'], created_at=datetime.fromisoformat(row['created_at']) if isinstance(row['created_at'], str) else row['created_at'], completed_at=datetime.fromisoformat(row['completed_at']) if row['completed_at'] and isinstance(row['completed_at'], str) else row['completed_at'], error_message=row['error_message'] ) def _row_to_retention_policy(self, row: sqlite3.Row) -> DataRetentionPolicy: """数据库行转换为 DataRetentionPolicy 对象""" return DataRetentionPolicy( id=row['id'], tenant_id=row['tenant_id'], name=row['name'], description=row['description'], resource_type=row['resource_type'], retention_days=row['retention_days'], action=row['action'], conditions=json.loads(row['conditions'] or '{}'), auto_execute=bool(row['auto_execute']), execute_at=row['execute_at'], notify_before_days=row['notify_before_days'], archive_location=row['archive_location'], archive_encryption=bool(row['archive_encryption']), is_active=bool(row['is_active']), last_executed_at=datetime.fromisoformat(row['last_executed_at']) if row['last_executed_at'] and isinstance(row['last_executed_at'], str) else row['last_executed_at'], last_execution_result=row['last_execution_result'], created_at=datetime.fromisoformat(row['created_at']) if 
isinstance(row['created_at'], str) else row['created_at'], updated_at=datetime.fromisoformat(row['updated_at']) if isinstance(row['updated_at'], str) else row['updated_at'] ) def _row_to_retention_job(self, row: sqlite3.Row) -> DataRetentionJob: """数据库行转换为 DataRetentionJob 对象""" return DataRetentionJob( id=row['id'], policy_id=row['policy_id'], tenant_id=row['tenant_id'], status=row['status'], started_at=datetime.fromisoformat(row['started_at']) if row['started_at'] and isinstance(row['started_at'], str) else row['started_at'], completed_at=datetime.fromisoformat(row['completed_at']) if row['completed_at'] and isinstance(row['completed_at'], str) else row['completed_at'], affected_records=row['affected_records'], archived_records=row['archived_records'], deleted_records=row['deleted_records'], error_count=row['error_count'], details=json.loads(row['details'] or '{}'), created_at=datetime.fromisoformat(row['created_at']) if isinstance(row['created_at'], str) else row['created_at'] ) # 全局实例 _enterprise_manager = None def get_enterprise_manager(db_path: str = "insightflow.db") -> EnterpriseManager: """获取 EnterpriseManager 单例""" global _enterprise_manager if _enterprise_manager is None: _enterprise_manager = EnterpriseManager(db_path) return _enterprise_manager