""" InsightFlow Phase 7 Task 3: 数据安全与合规模块 Security Manager - 端到端加密、数据脱敏、审计日志 """ import base64 import hashlib import json import re import secrets import sqlite3 from dataclasses import asdict, dataclass, field from datetime import datetime, timedelta from enum import Enum from typing import Any # 加密相关 try: from cryptography.fernet import Fernet from cryptography.hazmat.primitives import hashes from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC CRYPTO_AVAILABLE = True except ImportError: CRYPTO_AVAILABLE = False print("Warning: cryptography not available, encryption features disabled") class AuditActionType(Enum): """审计动作类型""" CREATE = "create" READ = "read" UPDATE = "update" DELETE = "delete" LOGIN = "login" LOGOUT = "logout" EXPORT = "export" IMPORT = "import" SHARE = "share" PERMISSION_CHANGE = "permission_change" ENCRYPTION_ENABLE = "encryption_enable" ENCRYPTION_DISABLE = "encryption_disable" DATA_MASKING = "data_masking" API_KEY_CREATE = "api_key_create" API_KEY_REVOKE = "api_key_revoke" WORKFLOW_TRIGGER = "workflow_trigger" WEBHOOK_SEND = "webhook_send" BOT_MESSAGE = "bot_message" class DataSensitivityLevel(Enum): """数据敏感度级别""" PUBLIC = "public" # 公开 INTERNAL = "internal" # 内部 CONFIDENTIAL = "confidential" # 机密 SECRET = "secret" # 绝密 class MaskingRuleType(Enum): """脱敏规则类型""" PHONE = "phone" # 手机号 EMAIL = "email" # 邮箱 ID_CARD = "id_card" # 身份证号 BANK_CARD = "bank_card" # 银行卡号 NAME = "name" # 姓名 ADDRESS = "address" # 地址 CUSTOM = "custom" # 自定义 @dataclass class AuditLog: """审计日志条目""" id: str action_type: str user_id: str | None = None user_ip: str | None = None user_agent: str | None = None resource_type: str | None = None # project, entity, transcript, etc. 
resource_id: str | None = None action_details: str | None = None # JSON string before_value: str | None = None after_value: str | None = None success: bool = True error_message: str | None = None created_at: str = field(default_factory=lambda: datetime.now().isoformat()) def to_dict(self) -> dict[str, Any]: return asdict(self) @dataclass class EncryptionConfig: """加密配置""" id: str project_id: str is_enabled: bool = False encryption_type: str = "aes-256-gcm" # aes-256-gcm, chacha20-poly1305 key_derivation: str = "pbkdf2" # pbkdf2, argon2 master_key_hash: str | None = None # 主密钥哈希(用于验证) salt: str | None = None created_at: str = field(default_factory=lambda: datetime.now().isoformat()) updated_at: str = field(default_factory=lambda: datetime.now().isoformat()) def to_dict(self) -> dict[str, Any]: return asdict(self) @dataclass class MaskingRule: """脱敏规则""" id: str project_id: str name: str rule_type: str # phone, email, id_card, bank_card, name, address, custom pattern: str # 正则表达式 replacement: str # 替换模板,如 "****" is_active: bool = True priority: int = 0 description: str | None = None created_at: str = field(default_factory=lambda: datetime.now().isoformat()) updated_at: str = field(default_factory=lambda: datetime.now().isoformat()) def to_dict(self) -> dict[str, Any]: return asdict(self) @dataclass class DataAccessPolicy: """数据访问策略""" id: str project_id: str name: str description: str | None = None allowed_users: str | None = None # JSON array of user IDs allowed_roles: str | None = None # JSON array of roles allowed_ips: str | None = None # JSON array of IP patterns time_restrictions: str | None = None # JSON: {"start_time": "09:00", "end_time": "18:00"} max_access_count: int | None = None # 最大访问次数 require_approval: bool = False is_active: bool = True created_at: str = field(default_factory=lambda: datetime.now().isoformat()) updated_at: str = field(default_factory=lambda: datetime.now().isoformat()) def to_dict(self) -> dict[str, Any]: return asdict(self) 
@dataclass
class AccessRequest:
    """An access request, used for policies that require approval."""

    id: str
    policy_id: str
    user_id: str
    request_reason: str | None = None
    status: str = "pending"  # pending, approved, rejected, expired
    # Filled by approve_access_request; reject_access_request also stores the
    # rejecting user here.
    approved_by: str | None = None
    approved_at: str | None = None
    expires_at: str | None = None
    created_at: str = field(default_factory=lambda: datetime.now().isoformat())

    def to_dict(self) -> dict[str, Any]:
        """Return the request's fields as a plain dict."""
        return asdict(self)


class SecurityManager:
    """SQLite-backed security manager.

    Covers audit logging, password-based encryption, regex data masking and
    data-access policies with an approval workflow. Every public method opens
    its own short-lived connection to ``db_path`` and always closes it, even
    when a query raises.
    """

    # Built-in masking rules. create_masking_rule uses them when no explicit
    # pattern is given; each pattern/replacement pair is applied with re.sub.
    # NOTE(review): in the ADDRESS pattern the alternation binds loosely, so
    # group 2 is either "<han chars>" followed by the road suffix, or one of
    # the other three suffix characters on its own. It is left unchanged
    # because rules created from it are already persisted.
    DEFAULT_MASKING_RULES = {
        MaskingRuleType.PHONE: {"pattern": r"(\d{3})\d{4}(\d{4})", "replacement": r"\1****\2"},
        MaskingRuleType.EMAIL: {"pattern": r"(\w{1,3})\w+(@\w+\.\w+)", "replacement": r"\1***\2"},
        MaskingRuleType.ID_CARD: {"pattern": r"(\d{6})\d{8}(\d{4})", "replacement": r"\1********\2"},
        MaskingRuleType.BANK_CARD: {"pattern": r"(\d{4})\d+(\d{4})", "replacement": r"\1 **** **** \2"},
        MaskingRuleType.NAME: {"pattern": r"([\u4e00-\u9fa5])[\u4e00-\u9fa5]+", "replacement": r"\1**"},
        MaskingRuleType.ADDRESS: {
            "pattern": r"([\u4e00-\u9fa5]{2,})([\u4e00-\u9fa5]+路|街|巷|号)(.+)",
            "replacement": r"\1\2***",
        },
    }

    def __init__(self, db_path: str = "insightflow.db"):
        """Bind the manager to ``db_path`` and create missing tables/indexes."""
        self.db_path = db_path
        # Compiled masking regexes, keyed by pattern text (used by _mask_with_rules).
        self._compiled_patterns: dict[str, re.Pattern] = {}
        self._init_db()

    def _connect(self) -> "contextlib.closing[sqlite3.Connection]":
        """Open a connection to ``db_path`` wrapped so a ``with`` block always closes it.

        Callers must still call ``conn.commit()``: closing without committing
        discards pending writes.
        """
        import contextlib

        return contextlib.closing(sqlite3.connect(self.db_path))

    # ==================== Row mapping ====================

    @staticmethod
    def _row_to_audit_log(row: tuple) -> AuditLog:
        """Build an AuditLog from a ``SELECT *`` row of audit_logs."""
        return AuditLog(
            id=row[0],
            action_type=row[1],
            user_id=row[2],
            user_ip=row[3],
            user_agent=row[4],
            resource_type=row[5],
            resource_id=row[6],
            action_details=row[7],
            before_value=row[8],
            after_value=row[9],
            success=bool(row[10]),
            error_message=row[11],
            created_at=row[12],
        )

    @staticmethod
    def _row_to_encryption_config(row: tuple) -> EncryptionConfig:
        """Build an EncryptionConfig from a ``SELECT *`` row of encryption_configs."""
        return EncryptionConfig(
            id=row[0],
            project_id=row[1],
            is_enabled=bool(row[2]),
            encryption_type=row[3],
            key_derivation=row[4],
            master_key_hash=row[5],
            salt=row[6],
            created_at=row[7],
            updated_at=row[8],
        )

    @staticmethod
    def _row_to_masking_rule(row: tuple) -> MaskingRule:
        """Build a MaskingRule from a ``SELECT *`` row of masking_rules."""
        return MaskingRule(
            id=row[0],
            project_id=row[1],
            name=row[2],
            rule_type=row[3],
            pattern=row[4],
            replacement=row[5],
            is_active=bool(row[6]),
            priority=row[7],
            description=row[8],
            created_at=row[9],
            updated_at=row[10],
        )

    @staticmethod
    def _row_to_policy(row: tuple) -> DataAccessPolicy:
        """Build a DataAccessPolicy from a ``SELECT *`` row of data_access_policies."""
        return DataAccessPolicy(
            id=row[0],
            project_id=row[1],
            name=row[2],
            description=row[3],
            allowed_users=row[4],
            allowed_roles=row[5],
            allowed_ips=row[6],
            time_restrictions=row[7],
            max_access_count=row[8],
            require_approval=bool(row[9]),
            is_active=bool(row[10]),
            created_at=row[11],
            updated_at=row[12],
        )

    @staticmethod
    def _row_to_access_request(row: tuple) -> AccessRequest:
        """Build an AccessRequest from a ``SELECT *`` row of access_requests."""
        return AccessRequest(
            id=row[0],
            policy_id=row[1],
            user_id=row[2],
            request_reason=row[3],
            status=row[4],
            approved_by=row[5],
            approved_at=row[6],
            expires_at=row[7],
            created_at=row[8],
        )

    # ==================== Schema ====================

    def _init_db(self) -> None:
        """Create the security tables and their indexes if they do not exist."""
        with self._connect() as conn:
            cursor = conn.cursor()

            # Audit log
            cursor.execute("""
                CREATE TABLE IF NOT EXISTS audit_logs (
                    id TEXT PRIMARY KEY,
                    action_type TEXT NOT NULL,
                    user_id TEXT,
                    user_ip TEXT,
                    user_agent TEXT,
                    resource_type TEXT,
                    resource_id TEXT,
                    action_details TEXT,
                    before_value TEXT,
                    after_value TEXT,
                    success INTEGER DEFAULT 1,
                    error_message TEXT,
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
                )
            """)

            # Encryption configuration
            cursor.execute("""
                CREATE TABLE IF NOT EXISTS encryption_configs (
                    id TEXT PRIMARY KEY,
                    project_id TEXT NOT NULL,
                    is_enabled INTEGER DEFAULT 0,
                    encryption_type TEXT DEFAULT 'aes-256-gcm',
                    key_derivation TEXT DEFAULT 'pbkdf2',
                    master_key_hash TEXT,
                    salt TEXT,
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    FOREIGN KEY (project_id) REFERENCES projects(id)
                )
            """)

            # Masking rules
            cursor.execute("""
                CREATE TABLE IF NOT EXISTS masking_rules (
                    id TEXT PRIMARY KEY,
                    project_id TEXT NOT NULL,
                    name TEXT NOT NULL,
                    rule_type TEXT NOT NULL,
                    pattern TEXT NOT NULL,
                    replacement TEXT NOT NULL,
                    is_active INTEGER DEFAULT 1,
                    priority INTEGER DEFAULT 0,
                    description TEXT,
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    FOREIGN KEY (project_id) REFERENCES projects(id)
                )
            """)

            # Data access policies
            cursor.execute("""
                CREATE TABLE IF NOT EXISTS data_access_policies (
                    id TEXT PRIMARY KEY,
                    project_id TEXT NOT NULL,
                    name TEXT NOT NULL,
                    description TEXT,
                    allowed_users TEXT,
                    allowed_roles TEXT,
                    allowed_ips TEXT,
                    time_restrictions TEXT,
                    max_access_count INTEGER,
                    require_approval INTEGER DEFAULT 0,
                    is_active INTEGER DEFAULT 1,
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    FOREIGN KEY (project_id) REFERENCES projects(id)
                )
            """)

            # Access requests
            cursor.execute("""
                CREATE TABLE IF NOT EXISTS access_requests (
                    id TEXT PRIMARY KEY,
                    policy_id TEXT NOT NULL,
                    user_id TEXT NOT NULL,
                    request_reason TEXT,
                    status TEXT DEFAULT 'pending',
                    approved_by TEXT,
                    approved_at TIMESTAMP,
                    expires_at TIMESTAMP,
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    FOREIGN KEY (policy_id) REFERENCES data_access_policies(id)
                )
            """)

            # Indexes
            for index_ddl in (
                "CREATE INDEX IF NOT EXISTS idx_audit_logs_user ON audit_logs(user_id)",
                "CREATE INDEX IF NOT EXISTS idx_audit_logs_resource ON audit_logs(resource_type, resource_id)",
                "CREATE INDEX IF NOT EXISTS idx_audit_logs_action ON audit_logs(action_type)",
                "CREATE INDEX IF NOT EXISTS idx_audit_logs_created ON audit_logs(created_at)",
                "CREATE INDEX IF NOT EXISTS idx_encryption_project ON encryption_configs(project_id)",
                "CREATE INDEX IF NOT EXISTS idx_masking_project ON masking_rules(project_id)",
                "CREATE INDEX IF NOT EXISTS idx_access_policy_project ON data_access_policies(project_id)",
            ):
                cursor.execute(index_ddl)

            conn.commit()

    def _generate_id(self) -> str:
        """Return a random 32-character hex identifier."""
        return secrets.token_hex(16)

    # ==================== Audit log ====================

    def log_audit(
        self,
        action_type: AuditActionType,
        user_id: str | None = None,
        user_ip: str | None = None,
        user_agent: str | None = None,
        resource_type: str | None = None,
        resource_id: str | None = None,
        action_details: dict | None = None,
        before_value: str | None = None,
        after_value: str | None = None,
        success: bool = True,
        error_message: str | None = None,
    ) -> AuditLog:
        """Persist one audit-log entry and return it.

        ``action_details`` is stored as a JSON string; an empty or missing dict
        is stored as NULL.
        """
        log = AuditLog(
            id=self._generate_id(),
            action_type=action_type.value,
            user_id=user_id,
            user_ip=user_ip,
            user_agent=user_agent,
            resource_type=resource_type,
            resource_id=resource_id,
            action_details=json.dumps(action_details) if action_details else None,
            before_value=before_value,
            after_value=after_value,
            success=success,
            error_message=error_message,
        )

        with self._connect() as conn:
            conn.execute(
                """
                INSERT INTO audit_logs
                (id, action_type, user_id, user_ip, user_agent, resource_type, resource_id,
                 action_details, before_value, after_value, success, error_message, created_at)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
                """,
                (
                    log.id,
                    log.action_type,
                    log.user_id,
                    log.user_ip,
                    log.user_agent,
                    log.resource_type,
                    log.resource_id,
                    log.action_details,
                    log.before_value,
                    log.after_value,
                    int(log.success),
                    log.error_message,
                    log.created_at,
                ),
            )
            conn.commit()

        return log

    def get_audit_logs(
        self,
        user_id: str | None = None,
        resource_type: str | None = None,
        resource_id: str | None = None,
        action_type: str | None = None,
        start_time: str | None = None,
        end_time: str | None = None,
        success: bool | None = None,
        limit: int = 100,
        offset: int = 0,
    ) -> list[AuditLog]:
        """Query audit logs, newest first.

        Each non-empty filter narrows the result. ``start_time`` and
        ``end_time`` are inclusive and compared as strings against
        ``created_at``. ``success`` filters only when it is not None.
        """
        # Column expressions are fixed literals; only the values are bound.
        optional_filters = (
            ("user_id = ?", user_id),
            ("resource_type = ?", resource_type),
            ("resource_id = ?", resource_id),
            ("action_type = ?", action_type),
            ("created_at >= ?", start_time),
            ("created_at <= ?", end_time),
        )
        query = "SELECT * FROM audit_logs WHERE 1=1"
        params: list[Any] = []
        for clause, value in optional_filters:
            if value:  # empty strings mean "no filter"
                query += f" AND {clause}"
                params.append(value)
        if success is not None:
            query += " AND success = ?"
            params.append(int(success))
        query += " ORDER BY created_at DESC LIMIT ? OFFSET ?"
        params.extend([limit, offset])

        with self._connect() as conn:
            rows = conn.execute(query, params).fetchall()

        return [self._row_to_audit_log(row) for row in rows]

    def get_audit_stats(self, start_time: str | None = None, end_time: str | None = None) -> dict[str, Any]:
        """Aggregate audit-log counts, optionally within an inclusive time range.

        Returns:
            A dict with ``total_actions``, ``success_count``, ``failure_count``
            and ``action_breakdown`` mapping each action type to
            ``{"success": n, "failure": m}``.
        """
        query = "SELECT action_type, success, COUNT(*) FROM audit_logs WHERE 1=1"
        params: list[Any] = []
        if start_time:
            query += " AND created_at >= ?"
            params.append(start_time)
        if end_time:
            query += " AND created_at <= ?"
            params.append(end_time)
        query += " GROUP BY action_type, success"

        with self._connect() as conn:
            rows = conn.execute(query, params).fetchall()

        stats: dict[str, Any] = {"total_actions": 0, "success_count": 0, "failure_count": 0, "action_breakdown": {}}
        for action_type, success, count in rows:
            outcome = "success" if success else "failure"
            stats["total_actions"] += count
            stats[f"{outcome}_count"] += count
            breakdown = stats["action_breakdown"].setdefault(action_type, {"success": 0, "failure": 0})
            breakdown[outcome] += count

        return stats

    # ==================== Encryption ====================
    # NOTE(review): configs are labelled "aes-256-gcm", but encrypt_data uses
    # Fernet, whose specification is AES-128-CBC with HMAC-SHA256. The label
    # is descriptive only and is kept for compatibility with stored rows.

    def _derive_key(self, password: str, salt: bytes) -> bytes:
        """Derive a Fernet key from ``password`` and ``salt``.

        Uses PBKDF2-HMAC-SHA256 with 100,000 iterations and returns the 32
        derived bytes urlsafe-base64 encoded. The iteration count is part of
        the key format; changing it would invalidate existing keys.

        Raises:
            RuntimeError: if the cryptography library is unavailable.
        """
        if not CRYPTO_AVAILABLE:
            raise RuntimeError("cryptography library not available")

        kdf = PBKDF2HMAC(
            algorithm=hashes.SHA256(),
            length=32,
            salt=salt,
            iterations=100000,
        )
        return base64.urlsafe_b64encode(kdf.derive(password.encode()))

    def enable_encryption(self, project_id: str, master_password: str) -> EncryptionConfig:
        """Enable encryption for a project, creating or replacing its config.

        A fresh salt is generated each time. The SHA-256 of the derived key is
        stored so the password can be verified later. An ENCRYPTION_ENABLE
        audit entry is written.

        Raises:
            RuntimeError: if the cryptography library is unavailable.
        """
        if not CRYPTO_AVAILABLE:
            raise RuntimeError("cryptography library not available")

        salt = secrets.token_hex(16)
        key = self._derive_key(master_password, salt.encode())
        key_hash = hashlib.sha256(key).hexdigest()

        config = EncryptionConfig(
            id=self._generate_id(),
            project_id=project_id,
            is_enabled=True,
            encryption_type="aes-256-gcm",
            key_derivation="pbkdf2",
            master_key_hash=key_hash,
            salt=salt,
        )

        with self._connect() as conn:
            cursor = conn.cursor()
            cursor.execute("SELECT id, created_at FROM encryption_configs WHERE project_id = ?", (project_id,))
            existing = cursor.fetchone()

            if existing:
                cursor.execute(
                    """
                    UPDATE encryption_configs
                    SET is_enabled = 1, encryption_type = ?, key_derivation = ?,
                        master_key_hash = ?, salt = ?, updated_at = ?
                    WHERE project_id = ?
                    """,
                    (
                        config.encryption_type,
                        config.key_derivation,
                        config.master_key_hash,
                        config.salt,
                        config.updated_at,
                        project_id,
                    ),
                )
                # Report the identity and creation time of the row actually updated.
                config.id = existing[0]
                config.created_at = existing[1] or config.created_at
            else:
                cursor.execute(
                    """
                    INSERT INTO encryption_configs
                    (id, project_id, is_enabled, encryption_type, key_derivation,
                     master_key_hash, salt, created_at, updated_at)
                    VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
                    """,
                    (
                        config.id,
                        config.project_id,
                        int(config.is_enabled),
                        config.encryption_type,
                        config.key_derivation,
                        config.master_key_hash,
                        config.salt,
                        config.created_at,
                        config.updated_at,
                    ),
                )
            conn.commit()

        # Logged after the connection above is closed, so the audit write does
        # not contend with it.
        self.log_audit(
            action_type=AuditActionType.ENCRYPTION_ENABLE,
            resource_type="project",
            resource_id=project_id,
            action_details={"encryption_type": config.encryption_type},
        )

        return config

    def disable_encryption(self, project_id: str, master_password: str) -> bool:
        """Disable encryption for a project after verifying the master password.

        Returns:
            False if the password does not verify; otherwise True, after
            writing an ENCRYPTION_DISABLE audit entry.
        """
        if not self.verify_encryption_password(project_id, master_password):
            return False

        with self._connect() as conn:
            conn.execute(
                """
                UPDATE encryption_configs
                SET is_enabled = 0, updated_at = ?
                WHERE project_id = ?
                """,
                (datetime.now().isoformat(), project_id),
            )
            conn.commit()

        self.log_audit(action_type=AuditActionType.ENCRYPTION_DISABLE, resource_type="project", resource_id=project_id)

        return True

    def verify_encryption_password(self, project_id: str, password: str) -> bool:
        """Return whether ``password`` matches the project's stored master key hash.

        Returns False when cryptography is unavailable, when no config exists,
        or when the stored hash or salt is missing.
        """
        if not CRYPTO_AVAILABLE:
            return False

        with self._connect() as conn:
            row = conn.execute(
                "SELECT master_key_hash, salt FROM encryption_configs WHERE project_id = ?", (project_id,)
            ).fetchone()

        if not row:
            return False

        stored_hash, salt = row
        if not stored_hash or not salt:
            return False

        key = self._derive_key(password, salt.encode())
        key_hash = hashlib.sha256(key).hexdigest()
        # Constant-time comparison avoids leaking how much of the hash matched.
        return secrets.compare_digest(key_hash.encode(), str(stored_hash).encode())

    def get_encryption_config(self, project_id: str) -> EncryptionConfig | None:
        """Return the project's encryption config, or None if it has none."""
        with self._connect() as conn:
            row = conn.execute("SELECT * FROM encryption_configs WHERE project_id = ?", (project_id,)).fetchone()

        return self._row_to_encryption_config(row) if row else None

    def encrypt_data(self, data: str, password: str, salt: str | None = None) -> tuple[str, str]:
        """Encrypt ``data`` with a key derived from ``password`` and ``salt``.

        A random hex salt is generated when none is given.

        Returns:
            ``(ciphertext, salt)``. The ciphertext is the Fernet token base64
            encoded once more; that extra layer is kept for compatibility with
            data already written.

        Raises:
            RuntimeError: if the cryptography library is unavailable.
        """
        if not CRYPTO_AVAILABLE:
            raise RuntimeError("cryptography library not available")

        if salt is None:
            salt = secrets.token_hex(16)

        key = self._derive_key(password, salt.encode())
        f = Fernet(key)
        encrypted = f.encrypt(data.encode())
        return base64.b64encode(encrypted).decode(), salt

    def decrypt_data(self, encrypted_data: str, password: str, salt: str) -> str:
        """Reverse encrypt_data: strip the outer base64 and decrypt the Fernet token.

        Raises:
            RuntimeError: if the cryptography library is unavailable.
        """
        if not CRYPTO_AVAILABLE:
            raise RuntimeError("cryptography library not available")

        key = self._derive_key(password, salt.encode())
        f = Fernet(key)
        decrypted = f.decrypt(base64.b64decode(encrypted_data))
        return decrypted.decode()

    # ==================== Data masking ====================

    def create_masking_rule(
        self,
        project_id: str,
        name: str,
        rule_type: MaskingRuleType,
        pattern: str | None = None,
        replacement: str | None = None,
        description: str | None = None,
        priority: int = 0,
    ) -> MaskingRule:
        """Create and persist a masking rule, then write a DATA_MASKING audit entry.

        When ``pattern`` is omitted and ``rule_type`` has a built-in rule, the
        built-in pattern (and, unless given, replacement) is used. Otherwise
        ``replacement`` defaults to ``"****"``. An empty pattern is stored
        as-is but skipped by apply_masking.
        """
        if rule_type in self.DEFAULT_MASKING_RULES and not pattern:
            default = self.DEFAULT_MASKING_RULES[rule_type]
            pattern = default["pattern"]
            replacement = replacement or default["replacement"]

        rule = MaskingRule(
            id=self._generate_id(),
            project_id=project_id,
            name=name,
            rule_type=rule_type.value,
            pattern=pattern or "",
            replacement=replacement or "****",
            description=description,
            priority=priority,
        )

        with self._connect() as conn:
            conn.execute(
                """
                INSERT INTO masking_rules
                (id, project_id, name, rule_type, pattern, replacement,
                 is_active, priority, description, created_at, updated_at)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
                """,
                (
                    rule.id,
                    rule.project_id,
                    rule.name,
                    rule.rule_type,
                    rule.pattern,
                    rule.replacement,
                    int(rule.is_active),
                    rule.priority,
                    rule.description,
                    rule.created_at,
                    rule.updated_at,
                ),
            )
            conn.commit()

        self.log_audit(
            action_type=AuditActionType.DATA_MASKING,
            resource_type="project",
            resource_id=project_id,
            action_details={"action": "create_rule", "rule_name": name},
        )

        return rule

    def get_masking_rules(self, project_id: str, active_only: bool = True) -> list[MaskingRule]:
        """Return a project's masking rules, highest priority first."""
        query = "SELECT * FROM masking_rules WHERE project_id = ?"
        if active_only:
            query += " AND is_active = 1"
        query += " ORDER BY priority DESC"

        with self._connect() as conn:
            rows = conn.execute(query, [project_id]).fetchall()

        return [self._row_to_masking_rule(row) for row in rows]

    def update_masking_rule(self, rule_id: str, **kwargs) -> MaskingRule | None:
        """Update selected fields of a masking rule.

        Only ``name``, ``pattern``, ``replacement``, ``is_active``, ``priority``
        and ``description`` are accepted; other keyword arguments are ignored.
        ``updated_at`` is refreshed whenever at least one field is updated.

        Returns:
            The rule as stored after the update, or None when no accepted field
            was given or the rule does not exist.
        """
        allowed_fields = ("name", "pattern", "replacement", "is_active", "priority", "description")

        set_clauses: list[str] = []
        params: list[Any] = []
        for key, value in kwargs.items():
            if key in allowed_fields:
                # Column names come only from the fixed whitelist above, so
                # interpolating them into the SQL text is safe.
                set_clauses.append(f"{key} = ?")
                params.append(int(value) if key == "is_active" else value)

        if not set_clauses:
            return None

        set_clauses.append("updated_at = ?")
        params.append(datetime.now().isoformat())
        params.append(rule_id)

        with self._connect() as conn:
            conn.execute(f"UPDATE masking_rules SET {', '.join(set_clauses)} WHERE id = ?", params)
            conn.commit()
            row = conn.execute("SELECT * FROM masking_rules WHERE id = ?", (rule_id,)).fetchone()

        return self._row_to_masking_rule(row) if row else None

    def delete_masking_rule(self, rule_id: str) -> bool:
        """Delete a masking rule; return True if a row was removed."""
        with self._connect() as conn:
            cursor = conn.execute("DELETE FROM masking_rules WHERE id = ?", (rule_id,))
            deleted = cursor.rowcount > 0
            conn.commit()
        return deleted

    def _mask_with_rules(
        self, text: str, rules: list[MaskingRule], rule_types: list[MaskingRuleType] | None = None
    ) -> str:
        """Apply ``rules`` in order to ``text`` and return the masked text.

        When ``rule_types`` is given, only rules of those types are applied.
        Rules with an empty pattern, an unknown stored type (when filtering),
        an invalid regex or an invalid replacement template are skipped.
        """
        masked_text = text
        for rule in rules:
            # An empty regex matches between every character, so re.sub would
            # splice the replacement throughout the text.
            if not rule.pattern:
                continue

            if rule_types:
                try:
                    selected = MaskingRuleType(rule.rule_type) in rule_types
                except ValueError:
                    selected = False  # an unknown stored type cannot match a requested type
                if not selected:
                    continue

            try:
                compiled = self._compiled_patterns.get(rule.pattern)
                if compiled is None:
                    compiled = re.compile(rule.pattern)
                    self._compiled_patterns[rule.pattern] = compiled
                masked_text = compiled.sub(rule.replacement, masked_text)
            except re.error:
                # Invalid pattern or replacement template: skip this rule.
                continue

        return masked_text

    def apply_masking(self, text: str, project_id: str, rule_types: list[MaskingRuleType] | None = None) -> str:
        """Mask ``text`` with the project's active rules (highest priority first).

        If ``rule_types`` is given, only rules of those types are applied.
        """
        rules = self.get_masking_rules(project_id)
        if not rules:
            return text
        return self._mask_with_rules(text, rules, rule_types)

    def apply_masking_to_entity(self, entity_data: dict[str, Any], project_id: str) -> dict[str, Any]:
        """Return a shallow copy of ``entity_data`` with sensitive string fields masked.

        The fields ``name``, ``definition``, ``description`` and ``value`` are
        masked when present and of type str. Rules are loaded once per call.
        """
        masked_data = entity_data.copy()
        rules = self.get_masking_rules(project_id)
        if not rules:
            return masked_data

        for field_name in ("name", "definition", "description", "value"):
            value = masked_data.get(field_name)
            if isinstance(value, str):
                masked_data[field_name] = self._mask_with_rules(value, rules)

        return masked_data

    # ==================== Data access policies ====================

    def create_access_policy(
        self,
        project_id: str,
        name: str,
        description: str | None = None,
        allowed_users: list[str] | None = None,
        allowed_roles: list[str] | None = None,
        allowed_ips: list[str] | None = None,
        time_restrictions: dict | None = None,
        max_access_count: int | None = None,
        require_approval: bool = False,
    ) -> DataAccessPolicy:
        """Create and persist a data-access policy.

        List and dict arguments are stored as JSON strings; empty ones are
        stored as NULL (i.e. "no restriction").
        """
        policy = DataAccessPolicy(
            id=self._generate_id(),
            project_id=project_id,
            name=name,
            description=description,
            allowed_users=json.dumps(allowed_users) if allowed_users else None,
            allowed_roles=json.dumps(allowed_roles) if allowed_roles else None,
            allowed_ips=json.dumps(allowed_ips) if allowed_ips else None,
            time_restrictions=json.dumps(time_restrictions) if time_restrictions else None,
            max_access_count=max_access_count,
            require_approval=require_approval,
        )

        with self._connect() as conn:
            conn.execute(
                """
                INSERT INTO data_access_policies
                (id, project_id, name, description, allowed_users, allowed_roles,
                 allowed_ips, time_restrictions, max_access_count, require_approval,
                 is_active, created_at, updated_at)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
                """,
                (
                    policy.id,
                    policy.project_id,
                    policy.name,
                    policy.description,
                    policy.allowed_users,
                    policy.allowed_roles,
                    policy.allowed_ips,
                    policy.time_restrictions,
                    policy.max_access_count,
                    int(policy.require_approval),
                    int(policy.is_active),
                    policy.created_at,
                    policy.updated_at,
                ),
            )
            conn.commit()

        return policy

    def get_access_policies(self, project_id: str, active_only: bool = True) -> list[DataAccessPolicy]:
        """Return a project's data-access policies (only active ones by default)."""
        query = "SELECT * FROM data_access_policies WHERE project_id = ?"
        if active_only:
            query += " AND is_active = 1"

        with self._connect() as conn:
            rows = conn.execute(query, [project_id]).fetchall()

        return [self._row_to_policy(row) for row in rows]

    @staticmethod
    def _in_time_window(current: str, start: str, end: str) -> bool:
        """Return whether ``current`` lies in the inclusive window [start, end].

        Values are compared as strings, so they must be zero-padded "HH:MM".
        A window whose start is later than its end (e.g. 22:00-06:00) wraps
        past midnight.
        """
        if start <= end:
            return start <= current <= end
        return current >= start or current <= end

    def check_access_permission(
        self, policy_id: str, user_id: str, user_ip: str | None = None
    ) -> tuple[bool, str | None]:
        """Check whether ``user_id`` (connecting from ``user_ip``) may access under a policy.

        Checks, in order:
          1. The policy exists and is active.
          2. User whitelist.
          3. IP whitelist (a missing IP fails it).
          4. Time-of-day and weekday restrictions (``days_of_week`` uses
             ``datetime.weekday()`` numbering).
          5. When approval is required, an approved and unexpired access request.

        NOTE: ``allowed_roles`` and ``max_access_count`` are stored on the
        policy but are not enforced here.

        Returns:
            ``(True, None)`` if access is allowed, else ``(False, reason)``.
        """
        with self._connect() as conn:
            row = conn.execute(
                "SELECT * FROM data_access_policies WHERE id = ? AND is_active = 1", (policy_id,)
            ).fetchone()

        if not row:
            return False, "Policy not found or inactive"

        policy = self._row_to_policy(row)

        # User whitelist
        if policy.allowed_users and user_id not in json.loads(policy.allowed_users):
            return False, "User not in allowed list"

        # IP whitelist. A caller that supplies no IP cannot satisfy a
        # whitelist, so it is denied rather than bypassing the check.
        if policy.allowed_ips:
            allowed_ips = json.loads(policy.allowed_ips)
            if not user_ip or not any(self._match_ip_pattern(user_ip, p) for p in allowed_ips):
                return False, "IP not in allowed list"

        # Time restrictions
        if policy.time_restrictions:
            restrictions = json.loads(policy.time_restrictions)
            now = datetime.now()

            if "start_time" in restrictions and "end_time" in restrictions:
                current_time = now.strftime("%H:%M")
                if not self._in_time_window(current_time, restrictions["start_time"], restrictions["end_time"]):
                    return False, "Access not allowed at this time"

            if "days_of_week" in restrictions and now.weekday() not in restrictions["days_of_week"]:
                return False, "Access not allowed on this day"

        # Approval requirement: need an approved, unexpired request.
        if policy.require_approval:
            with self._connect() as conn:
                request = conn.execute(
                    """
                    SELECT * FROM access_requests
                    WHERE policy_id = ? AND user_id = ? AND status = 'approved'
                    AND (expires_at IS NULL OR expires_at > ?)
                    """,
                    (policy_id, user_id, datetime.now().isoformat()),
                ).fetchone()

            if not request:
                return False, "Access requires approval"

        return True, None

    def _match_ip_pattern(self, ip: str, pattern: str) -> bool:
        """Return whether ``ip`` matches ``pattern``.

        ``pattern`` is either a CIDR network (contains "/") or a single address.
        Addresses are compared as parsed IP objects, so equivalent spellings
        (e.g. compressed IPv6) match. If parsing fails, plain string equality
        is used instead.
        """
        import ipaddress

        try:
            address = ipaddress.ip_address(ip)
            if "/" in pattern:
                return address in ipaddress.ip_network(pattern, strict=False)
            return address == ipaddress.ip_address(pattern)
        except ValueError:
            return ip == pattern

    def create_access_request(
        self, policy_id: str, user_id: str, request_reason: str | None = None, expires_hours: int = 24
    ) -> AccessRequest:
        """Create a pending access request that expires after ``expires_hours``."""
        request = AccessRequest(
            id=self._generate_id(),
            policy_id=policy_id,
            user_id=user_id,
            request_reason=request_reason,
            expires_at=(datetime.now() + timedelta(hours=expires_hours)).isoformat(),
        )

        with self._connect() as conn:
            conn.execute(
                """
                INSERT INTO access_requests
                (id, policy_id, user_id, request_reason, status, expires_at, created_at)
                VALUES (?, ?, ?, ?, ?, ?, ?)
                """,
                (
                    request.id,
                    request.policy_id,
                    request.user_id,
                    request.request_reason,
                    request.status,
                    request.expires_at,
                    request.created_at,
                ),
            )
            conn.commit()

        return request

    def approve_access_request(
        self, request_id: str, approved_by: str, expires_hours: int = 24
    ) -> AccessRequest | None:
        """Mark a request approved and reset its expiry to ``expires_hours`` from now.

        NOTE(review): the current status is not checked, so a previously
        rejected request can still be approved.

        Returns:
            The updated request, or None if ``request_id`` does not exist.
        """
        now = datetime.now()
        approved_at = now.isoformat()
        expires_at = (now + timedelta(hours=expires_hours)).isoformat()

        with self._connect() as conn:
            conn.execute(
                """
                UPDATE access_requests
                SET status = 'approved', approved_by = ?, approved_at = ?, expires_at = ?
                WHERE id = ?
                """,
                (approved_by, approved_at, expires_at, request_id),
            )
            conn.commit()
            row = conn.execute("SELECT * FROM access_requests WHERE id = ?", (request_id,)).fetchone()

        return self._row_to_access_request(row) if row else None

    def reject_access_request(self, request_id: str, rejected_by: str) -> AccessRequest | None:
        """Mark a request rejected, recording ``rejected_by`` in ``approved_by``.

        Returns:
            The updated request, or None if ``request_id`` does not exist.
        """
        with self._connect() as conn:
            conn.execute(
                """
                UPDATE access_requests
                SET status = 'rejected', approved_by = ?
                WHERE id = ?
                """,
                (rejected_by, request_id),
            )
            conn.commit()
            row = conn.execute("SELECT * FROM access_requests WHERE id = ?", (request_id,)).fetchone()

        return self._row_to_access_request(row) if row else None


# Shared SecurityManager instance, created lazily by get_security_manager().
_security_manager = None


def get_security_manager(db_path: str = "insightflow.db") -> SecurityManager:
    """Return the shared SecurityManager, creating it on the first call.

    ``db_path`` is only used on the first call; later calls return the
    existing instance regardless of the argument.
    """
    global _security_manager
    if _security_manager is None:
        _security_manager = SecurityManager(db_path)
    return _security_manager