#!/usr/bin/env python3 """ InsightFlow Database Manager - Phase 5 处理项目、实体、关系的持久化 支持实体属性扩展 """ import json import os import sqlite3 import uuid from dataclasses import dataclass from datetime import datetime DB_PATH = os.getenv("DB_PATH", "/app/data/insightflow.db") # Constants UUID_LENGTH = 8 # UUID 截断长度 @dataclass class Project: id: str name: str description: str = "" created_at: str = "" updated_at: str = "" @dataclass class Entity: id: str project_id: str name: str type: str definition: str = "" canonical_name: str = "" aliases: list[str] = None embedding: str = "" # Phase 3: 实体嵌入向量 attributes: dict = None # Phase 5: 实体属性 created_at: str = "" updated_at: str = "" def __post_init__(self) -> None: if self.aliases is None: self.aliases = [] if self.attributes is None: self.attributes = {} @dataclass class AttributeTemplate: """属性模板定义""" id: str project_id: str name: str type: str # text, number, date, select, multiselect, boolean options: list[str] = None # 用于 select/multiselect default_value: str = "" description: str = "" is_required: bool = False sort_order: int = 0 created_at: str = "" updated_at: str = "" def __post_init__(self) -> None: if self.options is None: self.options = [] @dataclass class EntityAttribute: """实体属性值""" id: str entity_id: str template_id: str | None = None name: str = "" # 属性名称 type: str = "text" # 属性类型 value: str = "" options: list[str] = None # 选项列表 template_name: str = "" # 关联查询时填充 template_type: str = "" # 关联查询时填充 created_at: str = "" updated_at: str = "" def __post_init__(self) -> None: if self.options is None: self.options = [] @dataclass class AttributeHistory: """属性变更历史""" id: str entity_id: str attribute_name: str = "" # 属性名称 old_value: str = "" new_value: str = "" changed_by: str = "" changed_at: str = "" change_reason: str = "" @dataclass class EntityMention: id: str entity_id: str transcript_id: str start_pos: int end_pos: int text_snippet: str confidence: float = 1.0 class DatabaseManager: def __init__(self, db_path: str = DB_PATH) -> None: self.db_path = db_path os.makedirs(os.path.dirname(db_path), exist_ok=True) self.init_db() def get_conn(self) -> None: conn = sqlite3.connect(self.db_path) conn.row_factory = sqlite3.Row return conn def init_db(self) -> None: """初始化数据库表""" with open(os.path.join(os.path.dirname(__file__), "schema.sql")) as f: schema = f.read() conn = self.get_conn() conn.executescript(schema) conn.commit() conn.close() # ==================== Project Operations ==================== def create_project(self, project_id: str, name: str, description: str = "") -> Project: conn = self.get_conn() now = datetime.now().isoformat() conn.execute( """INSERT INTO projects (id, name, description, created_at, updated_at) VALUES (?, ?, ?, ?, ?)""", (project_id, name, description, now, now), ) conn.commit() conn.close() return Project( id=project_id, name=name, description=description, created_at=now, updated_at=now, ) def get_project(self, project_id: str) -> Project | None: conn = self.get_conn() row = conn.execute("SELECT * FROM projects WHERE id = ?", (project_id,)).fetchone() conn.close() if row: return Project(**dict(row)) return None def list_projects(self) -> list[Project]: conn = self.get_conn() rows = conn.execute("SELECT * FROM projects ORDER BY updated_at DESC").fetchall() conn.close() return [Project(**dict(r)) for r in rows] # ==================== Entity Operations ==================== def create_entity(self, entity: Entity) -> Entity: conn = self.get_conn() conn.execute( """INSERT INTO entities (id, project_id, name, canonical_name, type, definition, aliases, created_at, updated_at) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)""", ( entity.id, entity.project_id, entity.name, entity.canonical_name, entity.type, entity.definition, json.dumps(entity.aliases), datetime.now().isoformat(), datetime.now().isoformat(), ), ) conn.commit() conn.close() return entity def get_entity_by_name(self, project_id: str, name: str) -> Entity | None: """通过名称查找实体(用于对齐)""" conn = self.get_conn() row = conn.execute( """SELECT * FROM entities WHERE project_id = ? AND (name = ? OR canonical_name = ? OR aliases LIKE ?)""", (project_id, name, name, f'%"{name}"%'), ).fetchone() conn.close() if row: data = dict(row) data["aliases"] = json.loads(data["aliases"]) if data["aliases"] else [] return Entity(**data) return None def find_similar_entities( self, project_id: str, name: str, threshold: float = 0.8, ) -> list[Entity]: """查找相似实体""" conn = self.get_conn() rows = conn.execute( "SELECT * FROM entities WHERE project_id = ? AND name LIKE ?", (project_id, f"%{name}%"), ).fetchall() conn.close() entities = [] for row in rows: data = dict(row) data["aliases"] = json.loads(data["aliases"]) if data["aliases"] else [] entities.append(Entity(**data)) return entities def merge_entities(self, target_id: str, source_id: str) -> Entity: """合并两个实体""" conn = self.get_conn() target = conn.execute("SELECT * FROM entities WHERE id = ?", (target_id,)).fetchone() source = conn.execute("SELECT * FROM entities WHERE id = ?", (source_id,)).fetchone() if not target or not source: conn.close() raise ValueError("Entity not found") target_aliases = set(json.loads(target["aliases"]) if target["aliases"] else []) target_aliases.add(source["name"]) target_aliases.update(json.loads(source["aliases"]) if source["aliases"] else []) conn.execute( "UPDATE entities SET aliases = ?, updated_at = ? WHERE id = ?", (json.dumps(list(target_aliases)), datetime.now().isoformat(), target_id), ) conn.execute( "UPDATE entity_mentions SET entity_id = ? WHERE entity_id = ?", (target_id, source_id), ) conn.execute( "UPDATE entity_relations SET source_entity_id = ? WHERE source_entity_id = ?", (target_id, source_id), ) conn.execute( "UPDATE entity_relations SET target_entity_id = ? WHERE target_entity_id = ?", (target_id, source_id), ) conn.execute("DELETE FROM entities WHERE id = ?", (source_id,)) conn.commit() conn.close() return self.get_entity(target_id) def get_entity(self, entity_id: str) -> Entity | None: conn = self.get_conn() row = conn.execute("SELECT * FROM entities WHERE id = ?", (entity_id,)).fetchone() conn.close() if row: data = dict(row) data["aliases"] = json.loads(data["aliases"]) if data["aliases"] else [] return Entity(**data) return None def list_project_entities(self, project_id: str) -> list[Entity]: conn = self.get_conn() rows = conn.execute( "SELECT * FROM entities WHERE project_id = ? ORDER BY updated_at DESC", (project_id,), ).fetchall() conn.close() entities = [] for row in rows: data = dict(row) data["aliases"] = json.loads(data["aliases"]) if data["aliases"] else [] entities.append(Entity(**data)) return entities def update_entity(self, entity_id: str, **kwargs) -> Entity: """更新实体信息""" conn = self.get_conn() allowed_fields = ["name", "type", "definition", "canonical_name"] updates = [] values = [] for field in allowed_fields: if field in kwargs: updates.append(f"{field} = ?") values.append(kwargs[field]) if "aliases" in kwargs: updates.append("aliases = ?") values.append(json.dumps(kwargs["aliases"])) if not updates: conn.close() return self.get_entity(entity_id) updates.append("updated_at = ?") values.append(datetime.now().isoformat()) values.append(entity_id) query = f"UPDATE entities SET {', '.join(updates)} WHERE id = ?" conn.execute(query, values) conn.commit() conn.close() return self.get_entity(entity_id) def delete_entity(self, entity_id: str) -> None: """删除实体及其关联数据""" conn = self.get_conn() conn.execute("DELETE FROM entity_mentions WHERE entity_id = ?", (entity_id,)) conn.execute( "DELETE FROM entity_relations WHERE source_entity_id = ? OR target_entity_id = ?", (entity_id, entity_id), ) conn.execute("DELETE FROM entity_attributes WHERE entity_id = ?", (entity_id,)) conn.execute("DELETE FROM entities WHERE id = ?", (entity_id,)) conn.commit() conn.close() # ==================== Mention Operations ==================== def add_mention(self, mention: EntityMention) -> EntityMention: conn = self.get_conn() conn.execute( """INSERT INTO entity_mentions (id, entity_id, transcript_id, start_pos, end_pos, text_snippet, confidence) VALUES (?, ?, ?, ?, ?, ?, ?)""", ( mention.id, mention.entity_id, mention.transcript_id, mention.start_pos, mention.end_pos, mention.text_snippet, mention.confidence, ), ) conn.commit() conn.close() return mention def get_entity_mentions(self, entity_id: str) -> list[EntityMention]: conn = self.get_conn() rows = conn.execute( "SELECT * FROM entity_mentions WHERE entity_id = ? ORDER BY transcript_id, start_pos", (entity_id,), ).fetchall() conn.close() return [EntityMention(**dict(r)) for r in rows] # ==================== Transcript Operations ==================== def save_transcript( self, transcript_id: str, project_id: str, filename: str, full_text: str, transcript_type: str = "audio", ) -> None: conn = self.get_conn() now = datetime.now().isoformat() conn.execute( """INSERT INTO transcripts (id, project_id, filename, full_text, type, created_at) VALUES (?, ?, ?, ?, ?, ?)""", (transcript_id, project_id, filename, full_text, transcript_type, now), ) conn.commit() conn.close() def get_transcript(self, transcript_id: str) -> dict | None: conn = self.get_conn() row = conn.execute("SELECT * FROM transcripts WHERE id = ?", (transcript_id,)).fetchone() conn.close() return dict(row) if row else None def list_project_transcripts(self, project_id: str) -> list[dict]: conn = self.get_conn() rows = conn.execute( "SELECT * FROM transcripts WHERE project_id = ? ORDER BY created_at DESC", (project_id,), ).fetchall() conn.close() return [dict(r) for r in rows] def update_transcript(self, transcript_id: str, full_text: str) -> dict: conn = self.get_conn() now = datetime.now().isoformat() conn.execute( "UPDATE transcripts SET full_text = ?, updated_at = ? WHERE id = ?", (full_text, now, transcript_id), ) conn.commit() row = conn.execute("SELECT * FROM transcripts WHERE id = ?", (transcript_id,)).fetchone() conn.close() return dict(row) if row else None # ==================== Relation Operations ==================== def create_relation( self, project_id: str, source_entity_id: str, target_entity_id: str, relation_type: str = "related", evidence: str = "", transcript_id: str = "", ) -> None: conn = self.get_conn() relation_id = str(uuid.uuid4())[:UUID_LENGTH] now = datetime.now().isoformat() conn.execute( """INSERT INTO entity_relations (id, project_id, source_entity_id, target_entity_id, relation_type, evidence, transcript_id, created_at) VALUES (?, ?, ?, ?, ?, ?, ?, ?)""", ( relation_id, project_id, source_entity_id, target_entity_id, relation_type, evidence, transcript_id, now, ), ) conn.commit() conn.close() return relation_id def get_entity_relations(self, entity_id: str) -> list[dict]: conn = self.get_conn() rows = conn.execute( """SELECT * FROM entity_relations WHERE source_entity_id = ? OR target_entity_id = ? ORDER BY created_at DESC""", (entity_id, entity_id), ).fetchall() conn.close() return [dict(r) for r in rows] def list_project_relations(self, project_id: str) -> list[dict]: conn = self.get_conn() rows = conn.execute( "SELECT * FROM entity_relations WHERE project_id = ? ORDER BY created_at DESC", (project_id,), ).fetchall() conn.close() return [dict(r) for r in rows] def update_relation(self, relation_id: str, **kwargs) -> dict: conn = self.get_conn() allowed_fields = ["relation_type", "evidence"] updates = [] values = [] for field in allowed_fields: if field in kwargs: updates.append(f"{field} = ?") values.append(kwargs[field]) if updates: query = f"UPDATE entity_relations SET {', '.join(updates)} WHERE id = ?" values.append(relation_id) conn.execute(query, values) conn.commit() row = conn.execute( "SELECT * FROM entity_relations WHERE id = ?", (relation_id,), ).fetchone() conn.close() return dict(row) if row else None def delete_relation(self, relation_id: str) -> None: conn = self.get_conn() conn.execute("DELETE FROM entity_relations WHERE id = ?", (relation_id,)) conn.commit() conn.close() # ==================== Glossary Operations ==================== def add_glossary_term(self, project_id: str, term: str, pronunciation: str = "") -> str: conn = self.get_conn() existing = conn.execute( "SELECT * FROM glossary WHERE project_id = ? AND term = ?", (project_id, term), ).fetchone() if existing: conn.execute( "UPDATE glossary SET frequency = frequency + 1 WHERE id = ?", (existing["id"],), ) conn.commit() conn.close() return existing["id"] term_id = str(uuid.uuid4())[:UUID_LENGTH] conn.execute( """INSERT INTO glossary (id, project_id, term, pronunciation, frequency) VALUES (?, ?, ?, ?, ?)""", (term_id, project_id, term, pronunciation, 1), ) conn.commit() conn.close() return term_id def list_glossary(self, project_id: str) -> list[dict]: conn = self.get_conn() rows = conn.execute( "SELECT * FROM glossary WHERE project_id = ? ORDER BY frequency DESC", (project_id,), ).fetchall() conn.close() return [dict(r) for r in rows] def delete_glossary_term(self, term_id: str) -> None: conn = self.get_conn() conn.execute("DELETE FROM glossary WHERE id = ?", (term_id,)) conn.commit() conn.close() # ==================== Phase 4: Agent & Provenance ==================== def get_relation_with_details(self, relation_id: str) -> dict | None: conn = self.get_conn() row = conn.execute( """SELECT r.*, s.name as source_name, t.name as target_name, tr.filename as transcript_filename, tr.full_text as transcript_text FROM entity_relations r JOIN entities s ON r.source_entity_id = s.id JOIN entities t ON r.target_entity_id = t.id LEFT JOIN transcripts tr ON r.transcript_id = tr.id WHERE r.id = ?""", (relation_id,), ).fetchone() conn.close() return dict(row) if row else None def get_entity_with_mentions(self, entity_id: str) -> dict | None: conn = self.get_conn() entity_row = conn.execute("SELECT * FROM entities WHERE id = ?", (entity_id,)).fetchone() if not entity_row: conn.close() return None entity = dict(entity_row) entity["aliases"] = json.loads(entity["aliases"]) if entity["aliases"] else [] mentions = conn.execute( """SELECT m.*, t.filename, t.created_at as transcript_date FROM entity_mentions m JOIN transcripts t ON m.transcript_id = t.id WHERE m.entity_id = ? ORDER BY t.created_at, m.start_pos""", (entity_id,), ).fetchall() entity["mentions"] = [dict(m) for m in mentions] entity["mention_count"] = len(mentions) relations = conn.execute( """SELECT r.*, s.name as source_name, t.name as target_name FROM entity_relations r JOIN entities s ON r.source_entity_id = s.id JOIN entities t ON r.target_entity_id = t.id WHERE r.source_entity_id = ? OR r.target_entity_id = ? ORDER BY r.created_at DESC""", (entity_id, entity_id), ).fetchall() entity["relations"] = [dict(r) for r in relations] conn.close() return entity def search_entities(self, project_id: str, query: str) -> list[Entity]: conn = self.get_conn() rows = conn.execute( """SELECT * FROM entities WHERE project_id = ? AND (name LIKE ? OR definition LIKE ? OR aliases LIKE ?) ORDER BY name""", (project_id, f"%{query}%", f"%{query}%", f"%{query}%"), ).fetchall() conn.close() entities = [] for row in rows: data = dict(row) data["aliases"] = json.loads(data["aliases"]) if data["aliases"] else [] entities.append(Entity(**data)) return entities def get_project_summary(self, project_id: str) -> dict: conn = self.get_conn() project = conn.execute("SELECT * FROM projects WHERE id = ?", (project_id,)).fetchone() entity_count = conn.execute( "SELECT COUNT(*) as count FROM entities WHERE project_id = ?", (project_id,), ).fetchone()["count"] transcript_count = conn.execute( "SELECT COUNT(*) as count FROM transcripts WHERE project_id = ?", (project_id,), ).fetchone()["count"] relation_count = conn.execute( "SELECT COUNT(*) as count FROM entity_relations WHERE project_id = ?", (project_id,), ).fetchone()["count"] recent_transcripts = conn.execute( """SELECT filename, full_text, created_at FROM transcripts WHERE project_id = ? ORDER BY created_at DESC LIMIT 5""", (project_id,), ).fetchall() top_entities = conn.execute( """SELECT e.name, e.type, e.definition, COUNT(m.id) as mention_count FROM entities e LEFT JOIN entity_mentions m ON e.id = m.entity_id WHERE e.project_id = ? GROUP BY e.id ORDER BY mention_count DESC LIMIT 10""", (project_id,), ).fetchall() conn.close() return { "project": dict(project) if project else {}, "statistics": { "entity_count": entity_count, "transcript_count": transcript_count, "relation_count": relation_count, }, "recent_transcripts": [dict(t) for t in recent_transcripts], "top_entities": [dict(e) for e in top_entities], } def get_transcript_context( self, transcript_id: str, position: int, context_chars: int = 200, ) -> str: conn = self.get_conn() row = conn.execute( "SELECT full_text FROM transcripts WHERE id = ?", (transcript_id,), ).fetchone() conn.close() if not row: return "" text = row["full_text"] start = max(0, position - context_chars) end = min(len(text), position + context_chars) return text[start:end] # ==================== Phase 5: Timeline Operations ==================== def get_project_timeline( self, project_id: str, entity_id: str = None, start_date: str = None, end_date: str = None, ) -> list[dict]: conn = self.get_conn() conditions = ["t.project_id = ?"] params = [project_id] if entity_id: conditions.append("m.entity_id = ?") params.append(entity_id) if start_date: conditions.append("t.created_at >= ?") params.append(start_date) if end_date: conditions.append("t.created_at <= ?") params.append(end_date) where_clause = " AND ".join(conditions) mentions = conn.execute( f"""SELECT m.*, e.name as entity_name, e.type as entity_type, e.definition, t.filename, t.created_at as event_date, t.type as source_type FROM entity_mentions m JOIN entities e ON m.entity_id = e.id JOIN transcripts t ON m.transcript_id = t.id WHERE {where_clause} ORDER BY t.created_at, m.start_pos""", params, ).fetchall() timeline_events = [] for m in mentions: timeline_events.append( { "id": m["id"], "type": "mention", "event_date": m["event_date"], "entity_id": m["entity_id"], "entity_name": m["entity_name"], "entity_type": m["entity_type"], "text_snippet": m["text_snippet"], "confidence": m["confidence"], "source": { "transcript_id": m["transcript_id"], "filename": m["filename"], "type": m["source_type"], }, }, ) conn.close() timeline_events.sort(key=lambda x: x["event_date"]) return timeline_events def get_entity_timeline_summary(self, project_id: str) -> dict: conn = self.get_conn() daily_stats = conn.execute( """SELECT DATE(t.created_at) as date, COUNT(*) as count FROM entity_mentions m JOIN transcripts t ON m.transcript_id = t.id WHERE t.project_id = ? GROUP BY DATE(t.created_at) ORDER BY date""", (project_id,), ).fetchall() entity_stats = conn.execute( """SELECT e.name, e.type, COUNT(m.id) as mention_count, MIN(t.created_at) as first_mentioned, MAX(t.created_at) as last_mentioned FROM entities e LEFT JOIN entity_mentions m ON e.id = m.entity_id LEFT JOIN transcripts t ON m.transcript_id = t.id WHERE e.project_id = ? GROUP BY e.id ORDER BY mention_count DESC LIMIT 20""", (project_id,), ).fetchall() conn.close() return { "daily_activity": [dict(d) for d in daily_stats], "top_entities": [dict(e) for e in entity_stats], } # ==================== Phase 5: Entity Attributes ==================== def create_attribute_template(self, template: AttributeTemplate) -> AttributeTemplate: conn = self.get_conn() now = datetime.now().isoformat() conn.execute( """INSERT INTO attribute_templates (id, project_id, name, type, options, default_value, description, is_required, sort_order, created_at, updated_at) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""", ( template.id, template.project_id, template.name, template.type, json.dumps(template.options) if template.options else None, template.default_value, template.description, template.is_required, template.sort_order, now, now, ), ) conn.commit() conn.close() return template def get_attribute_template(self, template_id: str) -> AttributeTemplate | None: conn = self.get_conn() row = conn.execute( "SELECT * FROM attribute_templates WHERE id = ?", (template_id,), ).fetchone() conn.close() if row: data = dict(row) data["options"] = json.loads(data["options"]) if data["options"] else [] return AttributeTemplate(**data) return None def list_attribute_templates(self, project_id: str) -> list[AttributeTemplate]: conn = self.get_conn() rows = conn.execute( """SELECT * FROM attribute_templates WHERE project_id = ? ORDER BY sort_order, created_at""", (project_id,), ).fetchall() conn.close() templates = [] for row in rows: data = dict(row) data["options"] = json.loads(data["options"]) if data["options"] else [] templates.append(AttributeTemplate(**data)) return templates def update_attribute_template(self, template_id: str, **kwargs) -> AttributeTemplate | None: conn = self.get_conn() allowed_fields = [ "name", "type", "options", "default_value", "description", "is_required", "sort_order", ] updates = [] values = [] for field in allowed_fields: if field in kwargs: updates.append(f"{field} = ?") if field == "options": values.append(json.dumps(kwargs[field]) if kwargs[field] else None) else: values.append(kwargs[field]) if updates: updates.append("updated_at = ?") values.append(datetime.now().isoformat()) values.append(template_id) query = f"UPDATE attribute_templates SET {', '.join(updates)} WHERE id = ?" conn.execute(query, values) conn.commit() conn.close() return self.get_attribute_template(template_id) def delete_attribute_template(self, template_id: str) -> None: conn = self.get_conn() conn.execute("DELETE FROM attribute_templates WHERE id = ?", (template_id,)) conn.commit() conn.close() def set_entity_attribute( self, attr: EntityAttribute, changed_by: str = "system", change_reason: str = "", ) -> EntityAttribute: conn = self.get_conn() now = datetime.now().isoformat() old_row = conn.execute( "SELECT value FROM entity_attributes WHERE entity_id = ? AND template_id = ?", (attr.entity_id, attr.template_id), ).fetchone() old_value = old_row["value"] if old_row else None if old_value != attr.value: conn.execute( """INSERT INTO attribute_history (id, entity_id, template_id, old_value, new_value, changed_by, changed_at, change_reason) VALUES (?, ?, ?, ?, ?, ?, ?, ?)""", ( str(uuid.uuid4())[:UUID_LENGTH], attr.entity_id, attr.template_id, old_value, attr.value, changed_by, now, change_reason, ), ) conn.execute( """INSERT OR REPLACE INTO entity_attributes (id, entity_id, template_id, value, created_at, updated_at) VALUES ( COALESCE( (SELECT id FROM entity_attributes WHERE entity_id = ? AND template_id = ?), ? ), ?, ?, ?, COALESCE( (SELECT created_at FROM entity_attributes WHERE entity_id = ? AND template_id = ?), ? ), ?)""", ( attr.entity_id, attr.template_id, attr.id, attr.entity_id, attr.template_id, attr.value, attr.entity_id, attr.template_id, now, now, ), ) conn.commit() conn.close() return attr def get_entity_attributes(self, entity_id: str) -> list[EntityAttribute]: conn = self.get_conn() rows = conn.execute( """SELECT ea.*, at.name as template_name, at.type as template_type FROM entity_attributes ea LEFT JOIN attribute_templates at ON ea.template_id = at.id WHERE ea.entity_id = ? ORDER BY ea.created_at""", (entity_id,), ).fetchall() conn.close() return [EntityAttribute(**dict(r)) for r in rows] def get_entity_with_attributes(self, entity_id: str) -> Entity | None: entity = self.get_entity(entity_id) if not entity: return None attrs = self.get_entity_attributes(entity_id) entity.attributes = { attr.template_name: { "value": attr.value, "type": attr.template_type, "template_id": attr.template_id, } for attr in attrs } return entity def delete_entity_attribute( self, entity_id: str, template_id: str, changed_by: str = "system", change_reason: str = "", ) -> None: conn = self.get_conn() old_row = conn.execute( """SELECT value FROM entity_attributes WHERE entity_id = ? AND template_id = ?""", (entity_id, template_id), ).fetchone() if old_row: conn.execute( """INSERT INTO attribute_history (id, entity_id, template_id, old_value, new_value, changed_by, changed_at, change_reason) VALUES (?, ?, ?, ?, ?, ?, ?, ?)""", ( str(uuid.uuid4())[:UUID_LENGTH], entity_id, template_id, old_row["value"], None, changed_by, datetime.now().isoformat(), change_reason or "属性删除", ), ) conn.execute( "DELETE FROM entity_attributes WHERE entity_id = ? AND template_id = ?", (entity_id, template_id), ) conn.commit() conn.close() def get_attribute_history( self, entity_id: str = None, template_id: str = None, limit: int = 50, ) -> list[AttributeHistory]: conn = self.get_conn() conditions = [] params = [] if entity_id: conditions.append("ah.entity_id = ?") params.append(entity_id) if template_id: conditions.append("ah.template_id = ?") params.append(template_id) where_clause = " AND ".join(conditions) if conditions else "1 = 1" rows = conn.execute( f"""SELECT ah.* FROM attribute_history ah WHERE {where_clause} ORDER BY ah.changed_at DESC LIMIT ?""", params + [limit], ).fetchall() conn.close() return [AttributeHistory(**dict(r)) for r in rows] def search_entities_by_attributes( self, project_id: str, attribute_filters: dict[str, str], ) -> list[Entity]: entities = self.list_project_entities(project_id) if not attribute_filters: return entities entity_ids = [e.id for e in entities] if not entity_ids: return [] conn = self.get_conn() placeholders = ", ".join(["?" for _ in entity_ids]) rows = conn.execute( f"""SELECT ea.*, at.name as template_name FROM entity_attributes ea JOIN attribute_templates at ON ea.template_id = at.id WHERE ea.entity_id IN ({placeholders})""", entity_ids, ).fetchall() conn.close() entity_attrs = {} for row in rows: eid = row["entity_id"] if eid not in entity_attrs: entity_attrs[eid] = {} entity_attrs[eid][row["template_name"]] = row["value"] filtered = [] for entity in entities: attrs = entity_attrs.get(entity.id, {}) match = True for attr_name, attr_value in attribute_filters.items(): if attrs.get(attr_name) != attr_value: match = False break if match: entity.attributes = attrs filtered.append(entity) return filtered # ==================== Phase 7: Multimodal Support ==================== def create_video( self, video_id: str, project_id: str, filename: str, duration: float = 0, fps: float = 0, resolution: dict = None, audio_transcript_id: str = None, full_ocr_text: str = "", extracted_entities: list[dict] = None, extracted_relations: list[dict] = None, ) -> str: """创建视频记录""" conn = self.get_conn() now = datetime.now().isoformat() conn.execute( """INSERT INTO videos (id, project_id, filename, duration, fps, resolution, audio_transcript_id, full_ocr_text, extracted_entities, extracted_relations, status, created_at, updated_at) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""", ( video_id, project_id, filename, duration, fps, json.dumps(resolution) if resolution else None, audio_transcript_id, full_ocr_text, json.dumps(extracted_entities or []), json.dumps(extracted_relations or []), "completed", now, now, ), ) conn.commit() conn.close() return video_id def get_video(self, video_id: str) -> dict | None: """获取视频信息""" conn = self.get_conn() row = conn.execute("SELECT * FROM videos WHERE id = ?", (video_id,)).fetchone() conn.close() if row: data = dict(row) data["resolution"] = json.loads(data["resolution"]) if data["resolution"] else None data["extracted_entities"] = ( json.loads(data["extracted_entities"]) if data["extracted_entities"] else [] ) data["extracted_relations"] = ( json.loads(data["extracted_relations"]) if data["extracted_relations"] else [] ) return data return None def list_project_videos(self, project_id: str) -> list[dict]: """获取项目的所有视频""" conn = self.get_conn() rows = conn.execute( "SELECT * FROM videos WHERE project_id = ? ORDER BY created_at DESC", (project_id,), ).fetchall() conn.close() videos = [] for row in rows: data = dict(row) data["resolution"] = json.loads(data["resolution"]) if data["resolution"] else None data["extracted_entities"] = ( json.loads(data["extracted_entities"]) if data["extracted_entities"] else [] ) data["extracted_relations"] = ( json.loads(data["extracted_relations"]) if data["extracted_relations"] else [] ) videos.append(data) return videos def create_video_frame( self, frame_id: str, video_id: str, frame_number: int, timestamp: float, image_url: str = None, ocr_text: str = None, extracted_entities: list[dict] = None, ) -> str: """创建视频帧记录""" conn = self.get_conn() now = datetime.now().isoformat() conn.execute( """INSERT INTO video_frames (id, video_id, frame_number, timestamp, image_url, ocr_text, extracted_entities, created_at) VALUES (?, ?, ?, ?, ?, ?, ?, ?)""", ( frame_id, video_id, frame_number, timestamp, image_url, ocr_text, json.dumps(extracted_entities or []), now, ), ) conn.commit() conn.close() return frame_id def get_video_frames(self, video_id: str) -> list[dict]: """获取视频的所有帧""" conn = self.get_conn() rows = conn.execute( """SELECT * FROM video_frames WHERE video_id = ? ORDER BY timestamp""", (video_id,), ).fetchall() conn.close() frames = [] for row in rows: data = dict(row) data["extracted_entities"] = ( json.loads(data["extracted_entities"]) if data["extracted_entities"] else [] ) frames.append(data) return frames def create_image( self, image_id: str, project_id: str, filename: str, ocr_text: str = "", description: str = "", extracted_entities: list[dict] = None, extracted_relations: list[dict] = None, ) -> str: """创建图片记录""" conn = self.get_conn() now = datetime.now().isoformat() conn.execute( """INSERT INTO images (id, project_id, filename, ocr_text, description, extracted_entities, extracted_relations, status, created_at, updated_at) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""", ( image_id, project_id, filename, ocr_text, description, json.dumps(extracted_entities or []), json.dumps(extracted_relations or []), "completed", now, now, ), ) conn.commit() conn.close() return image_id def get_image(self, image_id: str) -> dict | None: """获取图片信息""" conn = self.get_conn() row = conn.execute("SELECT * FROM images WHERE id = ?", (image_id,)).fetchone() conn.close() if row: data = dict(row) data["extracted_entities"] = ( json.loads(data["extracted_entities"]) if data["extracted_entities"] else [] ) data["extracted_relations"] = ( json.loads(data["extracted_relations"]) if data["extracted_relations"] else [] ) return data return None def list_project_images(self, project_id: str) -> list[dict]: """获取项目的所有图片""" conn = self.get_conn() rows = conn.execute( "SELECT * FROM images WHERE project_id = ? ORDER BY created_at DESC", (project_id,), ).fetchall() conn.close() images = [] for row in rows: data = dict(row) data["extracted_entities"] = ( json.loads(data["extracted_entities"]) if data["extracted_entities"] else [] ) data["extracted_relations"] = ( json.loads(data["extracted_relations"]) if data["extracted_relations"] else [] ) images.append(data) return images def create_multimodal_mention( self, mention_id: str, project_id: str, entity_id: str, modality: str, source_id: str, source_type: str, text_snippet: str = "", confidence: float = 1.0, ) -> str: """创建多模态实体提及记录""" conn = self.get_conn() now = datetime.now().isoformat() conn.execute( """INSERT OR REPLACE INTO multimodal_mentions (id, project_id, entity_id, modality, source_id, source_type, text_snippet, confidence, created_at) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)""", ( mention_id, project_id, entity_id, modality, source_id, source_type, text_snippet, confidence, now, ), ) conn.commit() conn.close() return mention_id def get_entity_multimodal_mentions(self, entity_id: str) -> list[dict]: """获取实体的多模态提及""" conn = self.get_conn() rows = conn.execute( """SELECT m.*, e.name as entity_name FROM multimodal_mentions m JOIN entities e ON m.entity_id = e.id WHERE m.entity_id = ? ORDER BY m.created_at DESC""", (entity_id,), ).fetchall() conn.close() return [dict(r) for r in rows] def get_project_multimodal_mentions(self, project_id: str, modality: str = None) -> list[dict]: """获取项目的多模态提及""" conn = self.get_conn() if modality: rows = conn.execute( """SELECT m.*, e.name as entity_name FROM multimodal_mentions m JOIN entities e ON m.entity_id = e.id WHERE m.project_id = ? AND m.modality = ? ORDER BY m.created_at DESC""", (project_id, modality), ).fetchall() else: rows = conn.execute( """SELECT m.*, e.name as entity_name FROM multimodal_mentions m JOIN entities e ON m.entity_id = e.id WHERE m.project_id = ? ORDER BY m.created_at DESC""", (project_id,), ).fetchall() conn.close() return [dict(r) for r in rows] def create_multimodal_entity_link( self, link_id: str, entity_id: str, linked_entity_id: str, link_type: str, confidence: float = 1.0, evidence: str = "", modalities: list[str] = None, ) -> str: """创建多模态实体关联""" conn = self.get_conn() now = datetime.now().isoformat() conn.execute( """INSERT OR REPLACE INTO multimodal_entity_links (id, entity_id, linked_entity_id, link_type, confidence, evidence, modalities, created_at) VALUES (?, ?, ?, ?, ?, ?, ?, ?)""", ( link_id, entity_id, linked_entity_id, link_type, confidence, evidence, json.dumps(modalities or []), now, ), ) conn.commit() conn.close() return link_id def get_entity_multimodal_links(self, entity_id: str) -> list[dict]: """获取实体的多模态关联""" conn = self.get_conn() rows = conn.execute( """SELECT l.*, e1.name as entity_name, e2.name as linked_entity_name FROM multimodal_entity_links l JOIN entities e1 ON l.entity_id = e1.id JOIN entities e2 ON l.linked_entity_id = e2.id WHERE l.entity_id = ? OR l.linked_entity_id = ?""", (entity_id, entity_id), ).fetchall() conn.close() links = [] for row in rows: data = dict(row) data["modalities"] = json.loads(data["modalities"]) if data["modalities"] else [] links.append(data) return links def get_project_multimodal_stats(self, project_id: str) -> dict: """获取项目多模态统计信息""" conn = self.get_conn() stats = { "video_count": 0, "image_count": 0, "multimodal_entity_count": 0, "cross_modal_links": 0, "modality_distribution": {}, } # 视频数量 row = conn.execute( "SELECT COUNT(*) as count FROM videos WHERE project_id = ?", (project_id,), ).fetchone() stats["video_count"] = row["count"] # 图片数量 row = conn.execute( "SELECT COUNT(*) as count FROM images WHERE project_id = ?", (project_id,), ).fetchone() stats["image_count"] = row["count"] # 多模态实体数量 row = conn.execute( """SELECT COUNT(DISTINCT entity_id) as count FROM multimodal_mentions WHERE project_id = ?""", (project_id,), ).fetchone() stats["multimodal_entity_count"] = row["count"] # 跨模态关联数量 row = conn.execute( """SELECT COUNT(*) as count FROM multimodal_entity_links WHERE entity_id IN (SELECT id FROM entities WHERE project_id = ?)""", (project_id,), ).fetchone() stats["cross_modal_links"] = row["count"] # 模态分布 for modality in ["audio", "video", "image", "document"]: row = conn.execute( """SELECT COUNT(*) as count FROM multimodal_mentions WHERE project_id = ? AND modality = ?""", (project_id, modality), ).fetchone() stats["modality_distribution"][modality] = row["count"] conn.close() return stats # Singleton instance _db_manager = None def get_db_manager() -> DatabaseManager: global _db_manager if _db_manager is None: _db_manager = DatabaseManager() return _db_manager