diff --git a/backend/__pycache__/db_manager.cpython-312.pyc b/backend/__pycache__/db_manager.cpython-312.pyc index b06fe2e..1f0203c 100644 Binary files a/backend/__pycache__/db_manager.cpython-312.pyc and b/backend/__pycache__/db_manager.cpython-312.pyc differ diff --git a/backend/__pycache__/main.cpython-312.pyc b/backend/__pycache__/main.cpython-312.pyc index 96bd360..31b5d48 100644 Binary files a/backend/__pycache__/main.cpython-312.pyc and b/backend/__pycache__/main.cpython-312.pyc differ diff --git a/backend/db_manager.py b/backend/db_manager.py index aec803e..1b0441c 100644 --- a/backend/db_manager.py +++ b/backend/db_manager.py @@ -1,13 +1,14 @@ #!/usr/bin/env python3 """ -InsightFlow Database Manager - Phase 3 +InsightFlow Database Manager - Phase 5 处理项目、实体、关系的持久化 -支持文档类型和多文件融合 +支持实体属性扩展 """ import os import json import sqlite3 +import uuid from datetime import datetime from typing import List, Dict, Optional, Tuple from dataclasses import dataclass @@ -114,7 +115,8 @@ class DatabaseManager: conn.commit() conn.close() - # Project operations + # ==================== Project Operations ==================== + def create_project(self, project_id: str, name: str, description: str = "") -> Project: conn = self.get_conn() now = datetime.now().isoformat() @@ -140,7 +142,8 @@ class DatabaseManager: conn.close() return [Project(**dict(r)) for r in rows] - # Entity operations + # ==================== Entity Operations ==================== + def create_entity(self, entity: Entity) -> Entity: conn = self.get_conn() conn.execute( @@ -168,9 +171,7 @@ class DatabaseManager: return None def find_similar_entities(self, project_id: str, name: str, threshold: float = 0.8) -> List[Entity]: - """查找相似实体(简单实现,生产可用 embedding)""" - # TODO: 使用 embedding 或模糊匹配 - # 现在简单返回包含相同关键词的实体 + """查找相似实体""" conn = self.get_conn() rows = conn.execute( "SELECT * FROM entities WHERE project_id = ? AND name LIKE ?", @@ -186,10 +187,9 @@ class DatabaseManager: return entities def merge_entities(self, target_id: str, source_id: str) -> Entity: - """合并两个实体(实体对齐)""" + """合并两个实体""" conn = self.get_conn() - # 获取两个实体 target = conn.execute("SELECT * FROM entities WHERE id = ?", (target_id,)).fetchone() source = conn.execute("SELECT * FROM entities WHERE id = ?", (source_id,)).fetchone() @@ -197,41 +197,21 @@ class DatabaseManager: conn.close() raise ValueError("Entity not found") - # 合并别名 target_aliases = set(json.loads(target['aliases']) if target['aliases'] else []) target_aliases.add(source['name']) target_aliases.update(json.loads(source['aliases']) if source['aliases'] else []) - # 更新目标实体 conn.execute( "UPDATE entities SET aliases = ?, updated_at = ? WHERE id = ?", (json.dumps(list(target_aliases)), datetime.now().isoformat(), target_id) ) - - # 更新提及记录 - conn.execute( - "UPDATE entity_mentions SET entity_id = ? WHERE entity_id = ?", - (target_id, source_id) - ) - - # 更新关系 - source 作为 source_entity_id - conn.execute( - "UPDATE entity_relations SET source_entity_id = ? WHERE source_entity_id = ?", - (target_id, source_id) - ) - - # 更新关系 - source 作为 target_entity_id - conn.execute( - "UPDATE entity_relations SET target_entity_id = ? WHERE target_entity_id = ?", - (target_id, source_id) - ) - - # 删除源实体 + conn.execute("UPDATE entity_mentions SET entity_id = ? WHERE entity_id = ?", (target_id, source_id)) + conn.execute("UPDATE entity_relations SET source_entity_id = ? WHERE source_entity_id = ?", (target_id, source_id)) + conn.execute("UPDATE entity_relations SET target_entity_id = ? WHERE target_entity_id = ?", (target_id, source_id)) conn.execute("DELETE FROM entities WHERE id = ?", (source_id,)) conn.commit() conn.close() - return self.get_entity(target_id) def get_entity(self, entity_id: str) -> Optional[Entity]: @@ -259,7 +239,49 @@ class DatabaseManager: entities.append(Entity(**data)) return entities - # Mention operations + def update_entity(self, entity_id: str, **kwargs) -> Entity: + """更新实体信息""" + conn = self.get_conn() + + allowed_fields = ['name', 'type', 'definition', 'canonical_name'] + updates = [] + values = [] + + for field in allowed_fields: + if field in kwargs: + updates.append(f"{field} = ?") + values.append(kwargs[field]) + + if 'aliases' in kwargs: + updates.append("aliases = ?") + values.append(json.dumps(kwargs['aliases'])) + + if not updates: + conn.close() + return self.get_entity(entity_id) + + updates.append("updated_at = ?") + values.append(datetime.now().isoformat()) + values.append(entity_id) + + query = f"UPDATE entities SET {', '.join(updates)} WHERE id = ?" + conn.execute(query, values) + conn.commit() + conn.close() + return self.get_entity(entity_id) + + def delete_entity(self, entity_id: str): + """删除实体及其关联数据""" + conn = self.get_conn() + conn.execute("DELETE FROM entity_mentions WHERE entity_id = ?", (entity_id,)) + conn.execute("DELETE FROM entity_relations WHERE source_entity_id = ? OR target_entity_id = ?", (entity_id, entity_id)) + conn.execute("DELETE FROM entity_attributes WHERE entity_id = ?", (entity_id,)) + conn.execute("DELETE FROM entities WHERE id = ?", (entity_id,)) + conn.commit() + conn.close() + + # ==================== Mention Operations ==================== + def add_mention(self, mention: EntityMention) -> EntityMention: conn = self.get_conn() conn.execute( @@ -280,10 +302,10 @@ class DatabaseManager: ).fetchall() conn.close() return [EntityMention(**dict(r)) for r in rows] - - # Transcript operations + + # ==================== Transcript Operations ==================== + def save_transcript(self, transcript_id: str, project_id: str, filename: str, full_text: str, transcript_type: str = "audio"): - """保存转录记录""" conn = self.get_conn() now = datetime.now().isoformat() conn.execute( @@ -294,16 +316,12 @@ class DatabaseManager: conn.close() def get_transcript(self, transcript_id: str) -> Optional[dict]: - """获取转录记录""" conn = self.get_conn() row = conn.execute("SELECT * FROM transcripts WHERE id = ?", (transcript_id,)).fetchone() conn.close() - if row: - return dict(row) - return None + return dict(row) if row else None def list_project_transcripts(self, project_id: str) -> List[dict]: - """列出项目的所有转录""" conn = self.get_conn() rows = conn.execute( "SELECT * FROM transcripts WHERE project_id = ? ORDER BY created_at DESC", @@ -312,10 +330,22 @@ class DatabaseManager: conn.close() return [dict(r) for r in rows] - # Relation operations + def update_transcript(self, transcript_id: str, full_text: str) -> dict: + conn = self.get_conn() + now = datetime.now().isoformat() + conn.execute( + "UPDATE transcripts SET full_text = ?, updated_at = ? WHERE id = ?", + (full_text, now, transcript_id) + ) + conn.commit() + row = conn.execute("SELECT * FROM transcripts WHERE id = ?", (transcript_id,)).fetchone() + conn.close() + return dict(row) if row else None + + # ==================== Relation Operations ==================== + def create_relation(self, project_id: str, source_entity_id: str, target_entity_id: str, relation_type: str = "related", evidence: str = "", transcript_id: str = ""): - """创建实体关系""" conn = self.get_conn() relation_id = str(uuid.uuid4())[:8] now = datetime.now().isoformat() @@ -330,7 +360,6 @@ class DatabaseManager: return relation_id def get_entity_relations(self, entity_id: str) -> List[dict]: - """获取实体的所有关系""" conn = self.get_conn() rows = conn.execute( """SELECT * FROM entity_relations @@ -342,7 +371,6 @@ class DatabaseManager: return [dict(r) for r in rows] def list_project_relations(self, project_id: str) -> List[dict]: - """列出项目的所有关系""" conn = self.get_conn() rows = conn.execute( "SELECT * FROM entity_relations WHERE project_id = ? ORDER BY created_at DESC", @@ -351,68 +379,8 @@ class DatabaseManager: conn.close() return [dict(r) for r in rows] - def update_entity(self, entity_id: str, **kwargs) -> Entity: - """更新实体信息""" - conn = self.get_conn() - - # 构建更新字段 - allowed_fields = ['name', 'type', 'definition', 'canonical_name'] - updates = [] - values = [] - - for field in allowed_fields: - if field in kwargs: - updates.append(f"{field} = ?") - values.append(kwargs[field]) - - # 处理别名 - if 'aliases' in kwargs: - updates.append("aliases = ?") - values.append(json.dumps(kwargs['aliases'])) - - if not updates: - conn.close() - return self.get_entity(entity_id) - - updates.append("updated_at = ?") - values.append(datetime.now().isoformat()) - values.append(entity_id) - - query = f"UPDATE entities SET {', '.join(updates)} WHERE id = ?" - conn.execute(query, values) - conn.commit() - conn.close() - - return self.get_entity(entity_id) - - def delete_entity(self, entity_id: str): - """删除实体及其关联数据""" - conn = self.get_conn() - - # 删除提及记录 - conn.execute("DELETE FROM entity_mentions WHERE entity_id = ?", (entity_id,)) - - # 删除关系 - conn.execute("DELETE FROM entity_relations WHERE source_entity_id = ? OR target_entity_id = ?", - (entity_id, entity_id)) - - # 删除实体 - conn.execute("DELETE FROM entities WHERE id = ?", (entity_id,)) - - conn.commit() - conn.close() - - def delete_relation(self, relation_id: str): - """删除关系""" - conn = self.get_conn() - conn.execute("DELETE FROM entity_relations WHERE id = ?", (relation_id,)) - conn.commit() - conn.close() - def update_relation(self, relation_id: str, **kwargs) -> dict: - """更新关系""" conn = self.get_conn() - allowed_fields = ['relation_type', 'evidence'] updates = [] values = [] @@ -430,42 +398,25 @@ class DatabaseManager: row = conn.execute("SELECT * FROM entity_relations WHERE id = ?", (relation_id,)).fetchone() conn.close() - return dict(row) if row else None - def update_transcript(self, transcript_id: str, full_text: str) -> dict: - """更新转录文本""" + def delete_relation(self, relation_id: str): conn = self.get_conn() - now = datetime.now().isoformat() - - conn.execute( - "UPDATE transcripts SET full_text = ?, updated_at = ? WHERE id = ?", - (full_text, now, transcript_id) - ) + conn.execute("DELETE FROM entity_relations WHERE id = ?", (relation_id,)) conn.commit() - - row = conn.execute("SELECT * FROM transcripts WHERE id = ?", (transcript_id,)).fetchone() conn.close() - - return dict(row) if row else None - # Phase 3: Glossary operations + # ==================== Glossary Operations ==================== + def add_glossary_term(self, project_id: str, term: str, pronunciation: str = "") -> str: - """添加术语到术语表""" conn = self.get_conn() - - # 检查是否已存在 existing = conn.execute( "SELECT * FROM glossary WHERE project_id = ? AND term = ?", (project_id, term) ).fetchone() if existing: - # 更新频率 - conn.execute( - "UPDATE glossary SET frequency = frequency + 1 WHERE id = ?", - (existing['id'],) - ) + conn.execute("UPDATE glossary SET frequency = frequency + 1 WHERE id = ?", (existing['id'],)) conn.commit() conn.close() return existing['id'] @@ -480,7 +431,6 @@ class DatabaseManager: return term_id def list_glossary(self, project_id: str) -> List[dict]: - """列出项目术语表""" conn = self.get_conn() rows = conn.execute( "SELECT * FROM glossary WHERE project_id = ? ORDER BY frequency DESC", @@ -490,20 +440,14 @@ class DatabaseManager: return [dict(r) for r in rows] def delete_glossary_term(self, term_id: str): - """删除术语""" conn = self.get_conn() conn.execute("DELETE FROM glossary WHERE id = ?", (term_id,)) conn.commit() conn.close() - # Phase 3: Get all entities for embedding - def get_all_entities_for_embedding(self, project_id: str) -> List[Entity]: - """获取所有实体用于 embedding 计算""" - return self.list_project_entities(project_id) + # ==================== Phase 4: Agent & Provenance ==================== - # Phase 4: Agent & Provenance methods def get_relation_with_details(self, relation_id: str) -> Optional[dict]: - """获取关系详情,包含源文档信息""" conn = self.get_conn() row = conn.execute( """SELECT r.*, @@ -517,19 +461,11 @@ class DatabaseManager: (relation_id,) ).fetchone() conn.close() - if row: - return dict(row) - return None + return dict(row) if row else None def get_entity_with_mentions(self, entity_id: str) -> Optional[dict]: - """获取实体详情及所有提及位置""" conn = self.get_conn() - - # 获取实体信息 - entity_row = conn.execute( - "SELECT * FROM entities WHERE id = ?", (entity_id,) - ).fetchone() - + entity_row = conn.execute("SELECT * FROM entities WHERE id = ?", (entity_id,)).fetchone() if not entity_row: conn.close() return None @@ -537,23 +473,18 @@ class DatabaseManager: entity = dict(entity_row) entity['aliases'] = json.loads(entity['aliases']) if entity['aliases'] else [] - # 获取提及位置 mentions = conn.execute( """SELECT m.*, t.filename, t.created_at as transcript_date FROM entity_mentions m JOIN transcripts t ON m.transcript_id = t.id - WHERE m.entity_id = ? - ORDER BY t.created_at, m.start_pos""", + WHERE m.entity_id = ? ORDER BY t.created_at, m.start_pos""", (entity_id,) ).fetchall() - entity['mentions'] = [dict(m) for m in mentions] entity['mention_count'] = len(mentions) - # 获取相关关系 relations = conn.execute( - """SELECT r.*, - s.name as source_name, t.name as target_name + """SELECT r.*, s.name as source_name, t.name as target_name FROM entity_relations r JOIN entities s ON r.source_entity_id = s.id JOIN entities t ON r.target_entity_id = t.id @@ -561,14 +492,12 @@ class DatabaseManager: ORDER BY r.created_at DESC""", (entity_id, entity_id) ).fetchall() - entity['relations'] = [dict(r) for r in relations] conn.close() return entity def search_entities(self, project_id: str, query: str) -> List[Entity]: - """搜索实体""" conn = self.get_conn() rows = conn.execute( """SELECT * FROM entities @@ -587,49 +516,33 @@ class DatabaseManager: return entities def get_project_summary(self, project_id: str) -> dict: - """获取项目摘要信息,用于 RAG 上下文""" conn = self.get_conn() + project = conn.execute("SELECT * FROM projects WHERE id = ?", (project_id,)).fetchone() - # 项目基本信息 - project = conn.execute( - "SELECT * FROM projects WHERE id = ?", (project_id,) - ).fetchone() - - # 统计信息 entity_count = conn.execute( - "SELECT COUNT(*) as count FROM entities WHERE project_id = ?", - (project_id,) + "SELECT COUNT(*) as count FROM entities WHERE project_id = ?", (project_id,) ).fetchone()['count'] transcript_count = conn.execute( - "SELECT COUNT(*) as count FROM transcripts WHERE project_id = ?", - (project_id,) + "SELECT COUNT(*) as count FROM transcripts WHERE project_id = ?", (project_id,) ).fetchone()['count'] relation_count = conn.execute( - "SELECT COUNT(*) as count FROM entity_relations WHERE project_id = ?", - (project_id,) + "SELECT COUNT(*) as count FROM entity_relations WHERE project_id = ?", (project_id,) ).fetchone()['count'] - # 获取最近的转录文本片段 recent_transcripts = conn.execute( - """SELECT filename, full_text, created_at - FROM transcripts - WHERE project_id = ? - ORDER BY created_at DESC - LIMIT 5""", + """SELECT filename, full_text, created_at FROM transcripts + WHERE project_id = ? ORDER BY created_at DESC LIMIT 5""", (project_id,) ).fetchall() - # 获取高频实体 top_entities = conn.execute( """SELECT e.name, e.type, e.definition, COUNT(m.id) as mention_count FROM entities e LEFT JOIN entity_mentions m ON e.id = m.entity_id WHERE e.project_id = ? - GROUP BY e.id - ORDER BY mention_count DESC - LIMIT 10""", + GROUP BY e.id ORDER BY mention_count DESC LIMIT 10""", (project_id,) ).fetchall() @@ -645,74 +558,49 @@ class DatabaseManager: 'recent_transcripts': [dict(t) for t in recent_transcripts], 'top_entities': [dict(e) for e in top_entities] } - - # Phase 5: Timeline operations + + def get_transcript_context(self, transcript_id: str, position: int, context_chars: int = 200) -> str: + conn = self.get_conn() + row = conn.execute("SELECT full_text FROM transcripts WHERE id = ?", (transcript_id,)).fetchone() + conn.close() + if not row: + return "" + text = row['full_text'] + start = max(0, position - context_chars) + end = min(len(text), position + context_chars) + return text[start:end] + + # ==================== Phase 5: Timeline Operations ==================== + def get_project_timeline(self, project_id: str, entity_id: str = None, start_date: str = None, end_date: str = None) -> List[dict]: - """获取项目时间线数据 - 按时间顺序的实体提及和事件""" conn = self.get_conn() - # 构建查询条件 conditions = ["t.project_id = ?"] params = [project_id] if entity_id: conditions.append("m.entity_id = ?") params.append(entity_id) - if start_date: conditions.append("t.created_at >= ?") params.append(start_date) - if end_date: conditions.append("t.created_at <= ?") params.append(end_date) where_clause = " AND ".join(conditions) - # 获取实体提及时间线 mentions = conn.execute( f"""SELECT m.*, e.name as entity_name, e.type as entity_type, e.definition, t.filename, t.created_at as event_date, t.type as source_type FROM entity_mentions m JOIN entities e ON m.entity_id = e.id JOIN transcripts t ON m.transcript_id = t.id - WHERE {where_clause} - ORDER BY t.created_at, m.start_pos""", + WHERE {where_clause} ORDER BY t.created_at, m.start_pos""", params ).fetchall() - # 获取关系创建时间线 - relation_conditions = ["r.project_id = ?"] - relation_params = [project_id] - - if start_date: - relation_conditions.append("r.created_at >= ?") - relation_params.append(start_date) - - if end_date: - relation_conditions.append("r.created_at <= ?") - relation_params.append(end_date) - - relation_where = " AND ".join(relation_conditions) - - relations = conn.execute( - f"""SELECT r.*, - s.name as source_name, t.name as target_name, - tr.filename, r.created_at as event_date - FROM entity_relations r - JOIN entities s ON r.source_entity_id = s.id - JOIN entities t ON r.target_entity_id = t.id - LEFT JOIN transcripts tr ON r.transcript_id = tr.id - WHERE {relation_where} - ORDER BY r.created_at""", - relation_params - ).fetchall() - - conn.close() - - # 合并并格式化时间线事件 timeline_events = [] - for m in mentions: timeline_events.append({ 'id': m['id'], @@ -721,52 +609,26 @@ class DatabaseManager: 'entity_id': m['entity_id'], 'entity_name': m['entity_name'], 'entity_type': m['entity_type'], - 'entity_definition': m['definition'], 'text_snippet': m['text_snippet'], 'confidence': m['confidence'], - 'source': { - 'transcript_id': m['transcript_id'], - 'filename': m['filename'], - 'type': m['source_type'] - } + 'source': {'transcript_id': m['transcript_id'], 'filename': m['filename'], 'type': m['source_type']} }) - for r in relations: - timeline_events.append({ - 'id': r['id'], - 'type': 'relation', - 'event_date': r['event_date'], - 'relation_type': r['relation_type'], - 'source_entity': r['source_name'], - 'target_entity': r['target_name'], - 'evidence': r['evidence'], - 'source': { - 'transcript_id': r.get('transcript_id'), - 'filename': r['filename'] - } - }) - - # 按时间排序 + conn.close() timeline_events.sort(key=lambda x: x['event_date']) - return timeline_events def get_entity_timeline_summary(self, project_id: str) -> dict: - """获取项目实体时间线摘要统计""" conn = self.get_conn() - # 按日期统计提及数量 daily_stats = conn.execute( """SELECT DATE(t.created_at) as date, COUNT(*) as count FROM entity_mentions m JOIN transcripts t ON m.transcript_id = t.id - WHERE t.project_id = ? - GROUP BY DATE(t.created_at) - ORDER BY date""", + WHERE t.project_id = ? GROUP BY DATE(t.created_at) ORDER BY date""", (project_id,) ).fetchall() - # 按实体统计提及数量 entity_stats = conn.execute( """SELECT e.name, e.type, COUNT(m.id) as mention_count, MIN(t.created_at) as first_mentioned, @@ -775,23 +637,7 @@ class DatabaseManager: LEFT JOIN entity_mentions m ON e.id = m.entity_id LEFT JOIN transcripts t ON m.transcript_id = t.id WHERE e.project_id = ? - GROUP BY e.id - ORDER BY mention_count DESC - LIMIT 20""", - (project_id,) - ).fetchall() - - # 获取活跃时间段 - active_periods = conn.execute( - """SELECT - DATE(t.created_at) as date, - COUNT(DISTINCT m.entity_id) as active_entities, - COUNT(m.id) as total_mentions - FROM transcripts t - LEFT JOIN entity_mentions m ON t.id = m.transcript_id - WHERE t.project_id = ? - GROUP BY DATE(t.created_at) - ORDER BY date""", + GROUP BY e.id ORDER BY mention_count DESC LIMIT 20""", (project_id,) ).fetchall() @@ -799,56 +645,42 @@ class DatabaseManager: return { 'daily_activity': [dict(d) for d in daily_stats], - 'top_entities': [dict(e) for e in entity_stats], - 'active_periods': [dict(a) for a in active_periods] + 'top_entities': [dict(e) for e in entity_stats] } - - # Phase 5: Attribute Template operations - def create_attribute_template(self, project_id: str, template_data: dict) -> dict: - """创建属性模板""" + + # ==================== Phase 5: Entity Attributes ==================== + + def create_attribute_template(self, template: AttributeTemplate) -> AttributeTemplate: conn = self.get_conn() - template_id = str(uuid.uuid4())[:8] now = datetime.now().isoformat() - conn.execute( """INSERT INTO attribute_templates - (id, project_id, name, type, description, options, is_required, default_value, sort_order, created_at, updated_at) + (id, project_id, name, type, options, default_value, description, is_required, display_order, created_at, updated_at) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""", - (template_id, project_id, template_data['name'], template_data['type'], - template_data.get('description', ''), - json.dumps(template_data.get('options', [])) if template_data.get('options') else None, - 1 if template_data.get('is_required') else 0, - template_data.get('default_value'), - template_data.get('sort_order', 0), - now, now) + (template.id, template.project_id, template.name, template.type, + json.dumps(template.options) if template.options else None, + template.default_value, template.description, template.is_required, + template.display_order, now, now) ) conn.commit() conn.close() - return self.get_attribute_template(template_id) + return template - def get_attribute_template(self, template_id: str) -> Optional[dict]: - """获取属性模板详情""" + def get_attribute_template(self, template_id: str) -> Optional[AttributeTemplate]: conn = self.get_conn() - row = conn.execute( - "SELECT * FROM attribute_templates WHERE id = ?", - (template_id,) - ).fetchone() + row = conn.execute("SELECT * FROM attribute_templates WHERE id = ?", (template_id,)).fetchone() conn.close() - if row: data = dict(row) - if data.get('options'): - data['options'] = json.loads(data['options']) - return data + data['options'] = json.loads(data['options']) if data['options'] else [] + return AttributeTemplate(**data) return None - def list_attribute_templates(self, project_id: str) -> List[dict]: - """列出项目的所有属性模板""" + def list_attribute_templates(self, project_id: str) -> List[AttributeTemplate]: conn = self.get_conn() rows = conn.execute( - """SELECT * FROM attribute_templates - WHERE project_id = ? - ORDER BY sort_order, created_at""", + """SELECT * FROM attribute_templates WHERE project_id = ? + ORDER BY display_order, created_at""", (project_id,) ).fetchall() conn.close() @@ -856,444 +688,51 @@ class DatabaseManager: templates = [] for row in rows: data = dict(row) - if data.get('options'): - data['options'] = json.loads(data['options']) - templates.append(data) + data['options'] = json.loads(data['options']) if data['options'] else [] + templates.append(AttributeTemplate(**data)) return templates - def update_attribute_template(self, template_id: str, template_data: dict) -> Optional[dict]: - """更新属性模板""" + def update_attribute_template(self, template_id: str, **kwargs) -> Optional[AttributeTemplate]: conn = self.get_conn() - now = datetime.now().isoformat() - - allowed_fields = ['name', 'type', 'description', 'is_required', 'default_value', 'sort_order'] + allowed_fields = ['name', 'type', 'options', 'default_value', 'description', 'is_required', 'display_order'] updates = [] values = [] for field in allowed_fields: - if field in template_data: + if field in kwargs: updates.append(f"{field} = ?") - if field == 'is_required': - values.append(1 if template_data[field] else 0) + if field == 'options': + values.append(json.dumps(kwargs[field]) if kwargs[field] else None) else: - values.append(template_data[field]) - - if 'options' in template_data: - updates.append("options = ?") - values.append(json.dumps(template_data['options']) if template_data['options'] else None) - - if not updates: - conn.close() - return self.get_attribute_template(template_id) - - updates.append("updated_at = ?") - values.append(now) - values.append(template_id) - - query = f"UPDATE attribute_templates SET {', '.join(updates)} WHERE id = ?" - conn.execute(query, values) - conn.commit() - conn.close() - - return self.get_attribute_template(template_id) - - def delete_attribute_template(self, template_id: str): - """删除属性模板""" - conn = self.get_conn() - conn.execute("DELETE FROM attribute_templates WHERE id = ?", (template_id,)) - conn.commit() - conn.close() - - # Phase 5: Entity Attribute operations - def get_entity_attributes(self, entity_id: str) -> List[dict]: - """获取实体的所有属性""" - conn = self.get_conn() - rows = conn.execute( - """SELECT a.*, t.name as template_name, t.description as template_description - FROM entity_attributes a - LEFT JOIN attribute_templates t ON a.template_id = t.id - WHERE a.entity_id = ? - ORDER BY t.sort_order, a.created_at""", - (entity_id,) - ).fetchall() - conn.close() - - attributes = [] - for row in rows: - data = dict(row) - if data.get('options'): - data['options'] = json.loads(data['options']) - # 解析 value 根据 type - if data['type'] == 'number' and data['value']: - try: - data['value'] = float(data['value']) - except: - pass - elif data['type'] == 'multiselect' and data['value']: - try: - data['value'] = json.loads(data['value']) - except: - pass - attributes.append(data) - return attributes - - def get_entity_attribute(self, attribute_id: str) -> Optional[dict]: - """获取单个属性详情""" - conn = self.get_conn() - row = conn.execute( - "SELECT * FROM entity_attributes WHERE id = ?", - (attribute_id,) - ).fetchone() - conn.close() - - if row: - data = dict(row) - if data.get('options'): - data['options'] = json.loads(data['options']) - return data - return None - - def set_entity_attribute(self, entity_id: str, attr_data: dict, changed_by: str = "system") -> dict: - """设置实体属性值(创建或更新)""" - conn = self.get_conn() - now = datetime.now().isoformat() - - # 检查是否已存在 - existing = conn.execute( - "SELECT * FROM entity_attributes WHERE entity_id = ? AND name = ?", - (entity_id, attr_data['name']) - ).fetchone() - - # 处理 value 存储 - value = attr_data['value'] - if attr_data['type'] == 'multiselect' and isinstance(value, list): - value = json.dumps(value) - elif value is not None: - value = str(value) - - if existing: - # 记录历史 - old_value = existing['value'] - conn.execute( - """INSERT INTO attribute_history - (id, entity_id, attribute_name, old_value, new_value, changed_by, changed_at, change_reason) - VALUES (?, ?, ?, ?, ?, ?, ?, ?)""", - (str(uuid.uuid4())[:8], entity_id, attr_data['name'], old_value, value, - changed_by, now, attr_data.get('change_reason', '')) - ) - - # 更新属性 - conn.execute( - """UPDATE entity_attributes - SET value = ?, type = ?, options = ?, updated_at = ? - WHERE id = ?""", - (value, attr_data['type'], - json.dumps(attr_data.get('options', [])) if attr_data.get('options') else existing['options'], - now, existing['id']) - ) - attribute_id = existing['id'] - else: - # 创建新属性 - attribute_id = str(uuid.uuid4())[:8] - conn.execute( - """INSERT INTO entity_attributes - (id, entity_id, template_id, name, type, value, options, created_at, updated_at) - VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)""", - (attribute_id, entity_id, attr_data.get('template_id'), - attr_data['name'], attr_data['type'], value, - json.dumps(attr_data.get('options', [])) if attr_data.get('options') else None, - now, now) - ) - - # 记录历史 - conn.execute( - """INSERT INTO attribute_history - (id, entity_id, attribute_name, old_value, new_value, changed_by, changed_at, change_reason) - VALUES (?, ?, ?, ?, ?, ?, ?, ?)""", - (str(uuid.uuid4())[:8], entity_id, attr_data['name'], None, value, - changed_by, now, attr_data.get('change_reason', '创建属性')) - ) - - conn.commit() - conn.close() - return self.get_entity_attribute(attribute_id) - - def update_entity_attribute(self, attribute_id: str, attr_data: dict, changed_by: str = "system") -> Optional[dict]: - """更新实体属性""" - conn = self.get_conn() - now = datetime.now().isoformat() - - existing = conn.execute( - "SELECT * FROM entity_attributes WHERE id = ?", - (attribute_id,) - ).fetchone() - - if not existing: - conn.close() - return None - - # 处理 value - value = attr_data.get('value') - if value is not None: - if attr_data.get('type', existing['type']) == 'multiselect' and isinstance(value, list): - value = json.dumps(value) - else: - value = str(value) - - # 记录历史 - conn.execute( - """INSERT INTO attribute_history - (id, entity_id, attribute_name, old_value, new_value, changed_by, changed_at, change_reason) - VALUES (?, ?, ?, ?, ?, ?, ?, ?)""", - (str(uuid.uuid4())[:8], existing['entity_id'], existing['name'], - existing['value'], value, changed_by, now, - attr_data.get('change_reason', '')) - ) - - # 更新字段 - updates = [] - values = [] - - if 'value' in attr_data: - updates.append("value = ?") - values.append(value) - - if 'type' in attr_data: - updates.append("type = ?") - values.append(attr_data['type']) - - if 'options' in attr_data: - updates.append("options = ?") - values.append(json.dumps(attr_data['options']) if attr_data['options'] else None) + values.append(kwargs[field]) if updates: updates.append("updated_at = ?") - values.append(now) - values.append(attribute_id) - - query = f"UPDATE entity_attributes SET {', '.join(updates)} WHERE id = ?" + values.append(datetime.now().isoformat()) + values.append(template_id) + query = f"UPDATE attribute_templates SET {', '.join(updates)} WHERE id = ?" conn.execute(query, values) conn.commit() conn.close() - return self.get_entity_attribute(attribute_id) - - def delete_entity_attribute(self, attribute_id: str, changed_by: str = "system"): - """删除实体属性""" - conn = self.get_conn() - - existing = conn.execute( - "SELECT * FROM entity_attributes WHERE id = ?", - (attribute_id,) - ).fetchone() - - if existing: - # 记录历史 - conn.execute( - """INSERT INTO attribute_history - (id, entity_id, attribute_name, old_value, new_value, changed_by, changed_at, change_reason) - VALUES (?, ?, ?, ?, ?, ?, ?, ?)""", - (str(uuid.uuid4())[:8], existing['entity_id'], existing['name'], - existing['value'], None, changed_by, datetime.now().isoformat(), '删除属性') - ) - - conn.execute("DELETE FROM entity_attributes WHERE id = ?", (attribute_id,)) - conn.commit() - - conn.close() - - def get_attribute_history(self, entity_id: str, attribute_name: str = None) -> List[dict]: - """获取属性变更历史""" - conn = self.get_conn() - - if attribute_name: - rows = conn.execute( - """SELECT * FROM attribute_history - WHERE entity_id = ? AND attribute_name = ? - ORDER BY changed_at DESC""", - (entity_id, attribute_name) - ).fetchall() - else: - rows = conn.execute( - """SELECT * FROM attribute_history - WHERE entity_id = ? - ORDER BY changed_at DESC""", - (entity_id,) - ).fetchall() - - conn.close() - return [dict(r) for r in rows] - - def search_entities_by_attributes(self, project_id: str, filters: List[dict]) -> List[Entity]: - """根据属性筛选搜索实体""" - conn = self.get_conn() - - # 基础查询 - base_query = "SELECT DISTINCT e.* FROM entities e" - where_conditions = ["e.project_id = ?"] - params = [project_id] - - # 为每个过滤条件添加 JOIN - join_clauses = [] - for i, f in enumerate(filters): - alias = f"a{i}" - join_clauses.append( - f"JOIN entity_attributes {alias} ON e.id = {alias}.entity_id AND {alias}.name = ?" - ) - params.append(f['name']) - - operator = f.get('operator', 'eq') - if operator == 'eq': - where_conditions.append(f"{alias}.value = ?") - params.append(str(f['value'])) - elif operator == 'contains': - where_conditions.append(f"{alias}.value LIKE ?") - params.append(f"%{f['value']}%") - elif operator == 'gt': - where_conditions.append(f"CAST({alias}.value AS REAL) > ?") - params.append(float(f['value'])) - elif operator == 'lt': - where_conditions.append(f"CAST({alias}.value AS REAL) < ?") - params.append(float(f['value'])) - - query = base_query + " " + " ".join(join_clauses) + " WHERE " + " AND ".join(where_conditions) - - rows = conn.execute(query, params).fetchall() - conn.close() - - entities = [] - for row in rows: - data = dict(row) - data['aliases'] = json.loads(data['aliases']) if data['aliases'] else [] - entities.append(Entity(**data)) - return entities - - def get_transcript_context(self, transcript_id: str, position: int, context_chars: int = 200) -> str: - """获取转录文本的上下文""" - conn = self.get_conn() - row = conn.execute( - "SELECT full_text FROM transcripts WHERE id = ?", - (transcript_id,) - ).fetchone() - conn.close() - - if not row: - return "" - - text = row['full_text'] - start = max(0, position - context_chars) - end = min(len(text), position + context_chars) - return text[start:end] - - # ==================== Phase 5: 实体属性管理 ==================== - - # ---- 属性模板管理 ---- - - def create_attribute_template(self, template: AttributeTemplate) -> AttributeTemplate: - """创建属性模板""" - conn = self.get_conn() - now = datetime.now().isoformat() - conn.execute( - """INSERT INTO attribute_templates - (id, project_id, name, type, options, default_value, description, is_required, display_order, created_at, updated_at) - VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""", - (template.id, template.project_id, template.name, template.type, - json.dumps(template.options) if template.options else None, - template.default_value, template.description, template.is_required, - template.display_order, now, now) - ) - conn.commit() - conn.close() - return template - - def get_attribute_template(self, template_id: str) -> Optional[AttributeTemplate]: - """获取属性模板""" - conn = self.get_conn() - row = conn.execute( - "SELECT * FROM attribute_templates WHERE id = ?", - (template_id,) - ).fetchone() - conn.close() - if row: - data = dict(row) - data['options'] = json.loads(data['options']) if data['options'] else [] - return AttributeTemplate(**data) - return None - - def list_attribute_templates(self, project_id: str) -> List[AttributeTemplate]: - """列出项目的所有属性模板""" - conn = self.get_conn() - rows = conn.execute( - """SELECT * FROM attribute_templates - WHERE project_id = ? - ORDER BY display_order, created_at""", - (project_id,) - ).fetchall() - conn.close() - - templates = [] - for row in rows: - data = dict(row) - data['options'] = json.loads(data['options']) if data['options'] else [] - templates.append(AttributeTemplate(**data)) - return templates - - def update_attribute_template(self, template_id: str, **kwargs) -> Optional[AttributeTemplate]: - """更新属性模板""" - conn = self.get_conn() - - allowed_fields = ['name', 'type', 'options', 'default_value', - 'description', 'is_required', 'display_order'] - updates = [] - values = [] - - for field in allowed_fields: - if field in kwargs: - updates.append(f"{field} = ?") - if field == 'options': - values.append(json.dumps(kwargs[field]) if kwargs[field] else None) - else: - values.append(kwargs[field]) - - if not updates: - conn.close() - return self.get_attribute_template(template_id) - - updates.append("updated_at = ?") - values.append(datetime.now().isoformat()) - values.append(template_id) - - query = f"UPDATE attribute_templates SET {', '.join(updates)} WHERE id = ?" - conn.execute(query, values) - conn.commit() - conn.close() - return self.get_attribute_template(template_id) def delete_attribute_template(self, template_id: str): - """删除属性模板(会级联删除相关属性值)""" conn = self.get_conn() conn.execute("DELETE FROM attribute_templates WHERE id = ?", (template_id,)) conn.commit() conn.close() - # ---- 实体属性值管理 ---- - - def set_entity_attribute(self, attr: EntityAttribute, - changed_by: str = "system", - change_reason: str = "") -> EntityAttribute: - """设置实体属性值,自动记录历史""" + def set_entity_attribute(self, attr: EntityAttribute, changed_by: str = "system", change_reason: str = "") -> EntityAttribute: conn = self.get_conn() now = datetime.now().isoformat() - # 获取旧值 old_row = conn.execute( "SELECT value FROM entity_attributes WHERE entity_id = ? AND template_id = ?", (attr.entity_id, attr.template_id) ).fetchone() old_value = old_row['value'] if old_row else None - # 记录变更历史 if old_value != attr.value: conn.execute( """INSERT INTO attribute_history @@ -1303,7 +742,6 @@ class DatabaseManager: old_value, attr.value, changed_by, now, change_reason) ) - # 插入或更新属性值 conn.execute( """INSERT OR REPLACE INTO entity_attributes (id, entity_id, template_id, value, created_at, updated_at) @@ -1311,8 +749,7 @@ class DatabaseManager: COALESCE((SELECT id FROM entity_attributes WHERE entity_id = ? AND template_id = ?), ?), ?, ?, ?, COALESCE((SELECT created_at FROM entity_attributes WHERE entity_id = ? AND template_id = ?), ?), - ? - )""", + ?)""", (attr.entity_id, attr.template_id, attr.id, attr.entity_id, attr.template_id, attr.value, attr.entity_id, attr.template_id, now, now) @@ -1323,85 +760,58 @@ class DatabaseManager: return attr def get_entity_attributes(self, entity_id: str) -> List[EntityAttribute]: - """获取实体的所有属性值""" conn = self.get_conn() rows = conn.execute( """SELECT ea.*, at.name as template_name, at.type as template_type FROM entity_attributes ea JOIN attribute_templates at ON ea.template_id = at.id - WHERE ea.entity_id = ? - ORDER BY at.display_order""", + WHERE ea.entity_id = ? ORDER BY at.display_order""", (entity_id,) ).fetchall() conn.close() - return [EntityAttribute(**dict(r)) for r in rows] def get_entity_with_attributes(self, entity_id: str) -> Optional[Entity]: - """获取实体详情,包含属性""" entity = self.get_entity(entity_id) if not entity: return None - - # 获取属性 attrs = self.get_entity_attributes(entity_id) entity.attributes = { - attr.template_name: { - 'value': attr.value, - 'type': attr.template_type, - 'template_id': attr.template_id - } + attr.template_name: {'value': attr.value, 'type': attr.template_type, 'template_id': attr.template_id} for attr in attrs } - return entity - def delete_entity_attribute(self, entity_id: str, template_id: str, - changed_by: str = "system", change_reason: str = ""): - """删除实体属性值""" + def delete_entity_attribute(self, entity_id: str, template_id: str, changed_by: str = "system", change_reason: str = ""): conn = self.get_conn() - - # 获取旧值 old_row = conn.execute( "SELECT value FROM entity_attributes WHERE entity_id = ? AND template_id = ?", (entity_id, template_id) ).fetchone() if old_row: - # 记录删除历史 conn.execute( """INSERT INTO attribute_history (id, entity_id, template_id, old_value, new_value, changed_by, changed_at, change_reason) VALUES (?, ?, ?, ?, ?, ?, ?, ?)""", (str(uuid.uuid4())[:8], entity_id, template_id, - old_row['value'], None, changed_by, datetime.now().isoformat(), - change_reason or "属性删除") + old_row['value'], None, changed_by, datetime.now().isoformat(), change_reason or "属性删除") ) - - # 删除属性 conn.execute( "DELETE FROM entity_attributes WHERE entity_id = ? AND template_id = ?", (entity_id, template_id) ) conn.commit() - conn.close() - # ---- 属性历史管理 ---- - - def get_attribute_history(self, entity_id: str = None, - template_id: str = None, - limit: int = 50) -> List[AttributeHistory]: - """获取属性变更历史""" + def get_attribute_history(self, entity_id: str = None, template_id: str = None, limit: int = 50) -> List[AttributeHistory]: conn = self.get_conn() - conditions = [] params = [] if entity_id: conditions.append("ah.entity_id = ?") params.append(entity_id) - if template_id: conditions.append("ah.template_id = ?") params.append(template_id) @@ -1413,31 +823,22 @@ class DatabaseManager: FROM attribute_history ah JOIN attribute_templates at ON ah.template_id = at.id WHERE {where_clause} - ORDER BY ah.changed_at DESC - LIMIT ?""", + ORDER BY ah.changed_at DESC LIMIT ?""", params + [limit] ).fetchall() conn.close() - return [AttributeHistory(**dict(r)) for r in rows] - def search_entities_by_attributes(self, project_id: str, - attribute_filters: Dict[str, str]) -> List[Entity]: - """根据属性筛选搜索实体""" - conn = self.get_conn() - - # 获取项目所有实体 + def search_entities_by_attributes(self, project_id: str, attribute_filters: Dict[str, str]) -> List[Entity]: entities = self.list_project_entities(project_id) - if not attribute_filters: return entities - # 获取所有实体的属性 entity_ids = [e.id for e in entities] if not entity_ids: return [] - # 构建查询条件 + conn = self.get_conn() placeholders = ','.join(['?' for _ in entity_ids]) rows = conn.execute( f"""SELECT ea.*, at.name as template_name @@ -1446,10 +847,8 @@ class DatabaseManager: WHERE ea.entity_id IN ({placeholders})""", entity_ids ).fetchall() - conn.close() - # 按实体ID分组属性 entity_attrs = {} for row in rows: eid = row['entity_id'] @@ -1457,7 +856,6 @@ class DatabaseManager: entity_attrs[eid] = {} entity_attrs[eid][row['template_name']] = row['value'] - # 过滤实体 filtered = [] for entity in entities: attrs = entity_attrs.get(entity.id, {}) @@ -1469,314 +867,12 @@ class DatabaseManager: if match: entity.attributes = attrs filtered.append(entity) - return filtered # Singleton instance _db_manager = None - -def get_db_manager() -> DatabaseManager: - global _db_manager - if _db_manager is None: - _db_manager = DatabaseManager() - return _db_manager - end = min(len(text), position + context_chars) - return text[start:end] - - # ==================== Phase 5: 实体属性管理 ==================== - - # ---- 属性模板管理 ---- - - def create_attribute_template(self, template: AttributeTemplate) -> AttributeTemplate: - """创建属性模板""" - conn = self.get_conn() - now = datetime.now().isoformat() - conn.execute( - """INSERT INTO attribute_templates - (id, project_id, name, type, options, default_value, description, is_required, display_order, created_at, updated_at) - VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""", - (template.id, template.project_id, template.name, template.type, - json.dumps(template.options) if template.options else None, - template.default_value, template.description, template.is_required, - template.display_order, now, now) - ) - conn.commit() - conn.close() - return template - - def get_attribute_template(self, template_id: str) -> Optional[AttributeTemplate]: - """获取属性模板""" - conn = self.get_conn() - row = conn.execute( - "SELECT * FROM attribute_templates WHERE id = ?", - (template_id,) - ).fetchone() - conn.close() - if row: - data = dict(row) - data['options'] = json.loads(data['options']) if data['options'] else [] - return AttributeTemplate(**data) - return None - - def list_attribute_templates(self, project_id: str) -> List[AttributeTemplate]: - """列出项目的所有属性模板""" - conn = self.get_conn() - rows = conn.execute( - """SELECT * FROM attribute_templates - WHERE project_id = ? - ORDER BY display_order, created_at""", - (project_id,) - ).fetchall() - conn.close() - - templates = [] - for row in rows: - data = dict(row) - data['options'] = json.loads(data['options']) if data['options'] else [] - templates.append(AttributeTemplate(**data)) - return templates - - def update_attribute_template(self, template_id: str, **kwargs) -> Optional[AttributeTemplate]: - """更新属性模板""" - conn = self.get_conn() - - allowed_fields = ['name', 'type', 'options', 'default_value', - 'description', 'is_required', 'display_order'] - updates = [] - values = [] - - for field in allowed_fields: - if field in kwargs: - updates.append(f"{field} = ?") - if field == 'options': - values.append(json.dumps(kwargs[field]) if kwargs[field] else None) - else: - values.append(kwargs[field]) - - if not updates: - conn.close() - return self.get_attribute_template(template_id) - - updates.append("updated_at = ?") - values.append(datetime.now().isoformat()) - values.append(template_id) - - query = f"UPDATE attribute_templates SET {', '.join(updates)} WHERE id = ?" - conn.execute(query, values) - conn.commit() - conn.close() - - return self.get_attribute_template(template_id) - - def delete_attribute_template(self, template_id: str): - """删除属性模板(会级联删除相关属性值)""" - conn = self.get_conn() - conn.execute("DELETE FROM attribute_templates WHERE id = ?", (template_id,)) - conn.commit() - conn.close() - - # ---- 实体属性值管理 ---- - - def set_entity_attribute(self, attr: EntityAttribute, - changed_by: str = "system", - change_reason: str = "") -> EntityAttribute: - """设置实体属性值,自动记录历史""" - conn = self.get_conn() - now = datetime.now().isoformat() - - # 获取旧值 - old_row = conn.execute( - "SELECT value FROM entity_attributes WHERE entity_id = ? AND template_id = ?", - (attr.entity_id, attr.template_id) - ).fetchone() - old_value = old_row['value'] if old_row else None - - # 记录变更历史 - if old_value != attr.value: - conn.execute( - """INSERT INTO attribute_history - (id, entity_id, template_id, old_value, new_value, changed_by, changed_at, change_reason) - VALUES (?, ?, ?, ?, ?, ?, ?, ?)""", - (str(uuid.uuid4())[:8], attr.entity_id, attr.template_id, - old_value, attr.value, changed_by, now, change_reason) - ) - - # 插入或更新属性值 - conn.execute( - """INSERT OR REPLACE INTO entity_attributes - (id, entity_id, template_id, value, created_at, updated_at) - VALUES ( - COALESCE((SELECT id FROM entity_attributes WHERE entity_id = ? AND template_id = ?), ?), - ?, ?, ?, - COALESCE((SELECT created_at FROM entity_attributes WHERE entity_id = ? AND template_id = ?), ?), - ? - )""", - (attr.entity_id, attr.template_id, attr.id, - attr.entity_id, attr.template_id, attr.value, - attr.entity_id, attr.template_id, now, now) - ) - - conn.commit() - conn.close() - return attr - - def get_entity_attributes(self, entity_id: str) -> List[EntityAttribute]: - """获取实体的所有属性值""" - conn = self.get_conn() - rows = conn.execute( - """SELECT ea.*, at.name as template_name, at.type as template_type - FROM entity_attributes ea - JOIN attribute_templates at ON ea.template_id = at.id - WHERE ea.entity_id = ? - ORDER BY at.display_order""", - (entity_id,) - ).fetchall() - conn.close() - - return [EntityAttribute(**dict(r)) for r in rows] - - def get_entity_with_attributes(self, entity_id: str) -> Optional[Entity]: - """获取实体详情,包含属性""" - entity = self.get_entity(entity_id) - if not entity: - return None - - # 获取属性 - attrs = self.get_entity_attributes(entity_id) - entity.attributes = { - attr.template_name: { - 'value': attr.value, - 'type': attr.template_type, - 'template_id': attr.template_id - } - for attr in attrs - } - - return entity - - def delete_entity_attribute(self, entity_id: str, template_id: str, - changed_by: str = "system", change_reason: str = ""): - """删除实体属性值""" - conn = self.get_conn() - - # 获取旧值 - old_row = conn.execute( - "SELECT value FROM entity_attributes WHERE entity_id = ? AND template_id = ?", - (entity_id, template_id) - ).fetchone() - - if old_row: - # 记录删除历史 - conn.execute( - """INSERT INTO attribute_history - (id, entity_id, template_id, old_value, new_value, changed_by, changed_at, change_reason) - VALUES (?, ?, ?, ?, ?, ?, ?, ?)""", - (str(uuid.uuid4())[:8], entity_id, template_id, - old_row['value'], None, changed_by, datetime.now().isoformat(), - change_reason or "属性删除") - ) - - # 删除属性 - conn.execute( - "DELETE FROM entity_attributes WHERE entity_id = ? AND template_id = ?", - (entity_id, template_id) - ) - conn.commit() - - conn.close() - - # ---- 属性历史管理 ---- - - def get_attribute_history(self, entity_id: str = None, - template_id: str = None, - limit: int = 50) -> List[AttributeHistory]: - """获取属性变更历史""" - conn = self.get_conn() - - conditions = [] - params = [] - - if entity_id: - conditions.append("ah.entity_id = ?") - params.append(entity_id) - - if template_id: - conditions.append("ah.template_id = ?") - params.append(template_id) - - where_clause = " AND ".join(conditions) if conditions else "1=1" - - rows = conn.execute( - f"""SELECT ah.*, at.name as template_name - FROM attribute_history ah - JOIN attribute_templates at ON ah.template_id = at.id - WHERE {where_clause} - ORDER BY ah.changed_at DESC - LIMIT ?""", - params + [limit] - ).fetchall() - conn.close() - - return [AttributeHistory(**dict(r)) for r in rows] - - def search_entities_by_attributes(self, project_id: str, - attribute_filters: Dict[str, str]) -> List[Entity]: - """根据属性筛选搜索实体""" - conn = self.get_conn() - - # 获取项目所有实体 - entities = self.list_project_entities(project_id) - - if not attribute_filters: - return entities - - # 获取所有实体的属性 - entity_ids = [e.id for e in entities] - if not entity_ids: - return [] - - # 构建查询条件 - placeholders = ','.join(['?' for _ in entity_ids]) - rows = conn.execute( - f"""SELECT ea.*, at.name as template_name - FROM entity_attributes ea - JOIN attribute_templates at ON ea.template_id = at.id - WHERE ea.entity_id IN ({placeholders})""", - entity_ids - ).fetchall() - - conn.close() - - # 按实体ID分组属性 - entity_attrs = {} - for row in rows: - eid = row['entity_id'] - if eid not in entity_attrs: - entity_attrs[eid] = {} - entity_attrs[eid][row['template_name']] = row['value'] - - # 过滤实体 - filtered = [] - for entity in entities: - attrs = entity_attrs.get(entity.id, {}) - match = True - for attr_name, attr_value in attribute_filters.items(): - if attrs.get(attr_name) != attr_value: - match = False - break - if match: - entity.attributes = attrs - filtered.append(entity) - - return filtered - - -# Singleton instance -_db_manager = None - - def get_db_manager() -> DatabaseManager: global _db_manager if _db_manager is None: