- Add backend API endpoints for timeline data:
  - GET /api/v1/projects/{id}/timeline
  - GET /api/v1/projects/{id}/timeline/summary
  - GET /api/v1/entities/{id}/timeline
- Add database methods for timeline queries in db_manager.py
- Add frontend timeline view:
  - New sidebar button for timeline view
  - Timeline panel with date-grouped events
  - Visual distinction between mentions and relations
  - Entity filter dropdown
  - Statistics cards
  - Interactive event cards
- Update STATUS.md with Phase 5 progress
- Add view switching functions (switchView, switchKBTab)
- Add knowledge base loading functions
786 lines
27 KiB
Python
786 lines
27 KiB
Python
#!/usr/bin/env python3
|
||
"""
|
||
InsightFlow Database Manager - Phase 3
|
||
处理项目、实体、关系的持久化
|
||
支持文档类型和多文件融合
|
||
"""
|
||
|
||
import json
import os
import sqlite3
import uuid
from contextlib import contextmanager
from dataclasses import dataclass, fields
from datetime import datetime
from typing import Dict, List, Optional, Tuple
|
||
|
||
# Path of the SQLite database file; taken from the DB_PATH environment
# variable when it is set, otherwise the default below is used.
DB_PATH = os.getenv("DB_PATH", "/app/data/insightflow.db")
|
||
|
||
@dataclass
class Project:
    """A project record: identifier, display name and bookkeeping timestamps."""

    # Required identity fields.
    id: str
    name: str
    # Optional free-form description.
    description: str = ""
    # Timestamps are kept as strings; empty until populated.
    created_at: str = ""
    updated_at: str = ""
|
||
|
||
@dataclass
class Entity:
    """An entity that belongs to a project.

    ``created_at`` and ``updated_at`` mirror the timestamp columns that
    ``DatabaseManager.create_entity`` writes to the ``entities`` table, so a
    full table row can be passed to the constructor as keyword arguments
    without raising ``TypeError``. Both default to an empty string, which
    keeps existing positional and keyword callers working unchanged.
    """

    id: str
    project_id: str
    name: str
    type: str
    definition: str = ""
    canonical_name: str = ""
    # None is accepted and normalised to a fresh list in __post_init__, so
    # instances never share one mutable default.
    aliases: Optional[List[str]] = None
    created_at: str = ""
    updated_at: str = ""

    def __post_init__(self):
        """Replace a missing alias list with a new empty list."""
        if self.aliases is None:
            self.aliases = []
|
||
|
||
@dataclass
class EntityMention:
    """One occurrence of an entity inside a transcript."""

    # Identifiers linking the mention to its entity and transcript.
    id: str
    entity_id: str
    transcript_id: str
    # Start/end positions of the mention and the stored text snippet.
    start_pos: int
    end_pos: int
    text_snippet: str
    # Confidence value attached to the mention; defaults to 1.0.
    confidence: float = 1.0
|
||
|
||
class DatabaseManager:
    """SQLite persistence layer for InsightFlow.

    Stores projects, entities, entity mentions, transcripts, entity
    relations and glossary terms. Every public method opens its own
    short-lived connection and closes it before returning, including when a
    statement raises.
    """

    def __init__(self, db_path: str = DB_PATH):
        """Remember the database path, create its directory and apply the schema.

        Args:
            db_path: Location of the SQLite database file.
        """
        self.db_path = db_path
        # os.path.dirname() returns "" for a bare filename and
        # os.makedirs("") raises, so only create a directory when one is named.
        parent_dir = os.path.dirname(db_path)
        if parent_dir:
            os.makedirs(parent_dir, exist_ok=True)
        self.init_db()

    def get_conn(self):
        """Open a new connection whose rows can be indexed by column name."""
        conn = sqlite3.connect(self.db_path)
        conn.row_factory = sqlite3.Row
        return conn

    @contextmanager
    def _session(self, commit: bool = False):
        """Yield a fresh connection and always close it afterwards.

        Args:
            commit: Commit once the ``with`` body finishes without raising.
                When the body raises, nothing is committed before the
                connection is closed.
        """
        conn = self.get_conn()
        try:
            yield conn
            if commit:
                conn.commit()
        finally:
            conn.close()

    @staticmethod
    def _known_columns(model, row) -> dict:
        """Return ``row`` as a dict limited to the dataclass fields of ``model``.

        ``SELECT *`` can return columns the dataclass does not declare
        (``create_entity`` writes ``created_at``/``updated_at``, for
        example); passing those to the constructor would raise TypeError.
        """
        wanted = {f.name for f in fields(model)}
        return {key: value for key, value in dict(row).items() if key in wanted}

    def _row_to_entity(self, row) -> Entity:
        """Build an Entity from an ``entities`` row, decoding the JSON alias list."""
        data = self._known_columns(Entity, row)
        raw_aliases = data.get('aliases')
        data['aliases'] = json.loads(raw_aliases) if raw_aliases else []
        return Entity(**data)

    def init_db(self):
        """Create the database tables by running ``schema.sql`` next to this module."""
        schema_path = os.path.join(os.path.dirname(__file__), 'schema.sql')
        # Explicit encoding so the schema reads the same on every platform.
        with open(schema_path, 'r', encoding='utf-8') as f:
            schema = f.read()

        with self._session(commit=True) as conn:
            conn.executescript(schema)

    # Project operations
    def create_project(self, project_id: str, name: str, description: str = "") -> Project:
        """Insert a project row and return it as a Project."""
        now = datetime.now().isoformat()
        with self._session(commit=True) as conn:
            conn.execute(
                "INSERT INTO projects (id, name, description, created_at, updated_at) VALUES (?, ?, ?, ?, ?)",
                (project_id, name, description, now, now)
            )
        return Project(id=project_id, name=name, description=description, created_at=now, updated_at=now)

    def get_project(self, project_id: str) -> Optional[Project]:
        """Return the project with this id, or None when there is none."""
        with self._session() as conn:
            row = conn.execute("SELECT * FROM projects WHERE id = ?", (project_id,)).fetchone()
        if row:
            return Project(**self._known_columns(Project, row))
        return None

    def list_projects(self) -> List[Project]:
        """Return all projects, most recently updated first."""
        with self._session() as conn:
            rows = conn.execute("SELECT * FROM projects ORDER BY updated_at DESC").fetchall()
        return [Project(**self._known_columns(Project, r)) for r in rows]

    # Entity operations
    def create_entity(self, entity: Entity) -> Entity:
        """Insert ``entity`` and return the same object.

        The alias list is stored as JSON text.
        """
        # One timestamp so created_at and updated_at are identical on insert.
        now = datetime.now().isoformat()
        with self._session(commit=True) as conn:
            conn.execute(
                """INSERT INTO entities (id, project_id, name, canonical_name, type, definition, aliases, created_at, updated_at)
                   VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)""",
                (entity.id, entity.project_id, entity.name, entity.canonical_name, entity.type,
                 entity.definition, json.dumps(entity.aliases), now, now)
            )
        return entity

    def get_entity_by_name(self, project_id: str, name: str) -> Optional[Entity]:
        """Find an entity by name, canonical name or alias (used for alignment).

        Returns the first matching row as an Entity, or None.
        """
        # Aliases are stored with json.dumps, which escapes non-ASCII text as
        # \uXXXX. Encoding the needle the same way lets the LIKE match find
        # non-ASCII aliases too; for plain ASCII names this is just "name".
        alias_pattern = f"%{json.dumps(name)}%"
        with self._session() as conn:
            row = conn.execute(
                "SELECT * FROM entities WHERE project_id = ? AND (name = ? OR canonical_name = ? OR aliases LIKE ?)",
                (project_id, name, name, alias_pattern)
            ).fetchone()
        if row:
            return self._row_to_entity(row)
        return None

    def find_similar_entities(self, project_id: str, name: str, threshold: float = 0.8) -> List[Entity]:
        """Find similar entities (simple implementation).

        Currently returns the project's entities whose name contains
        ``name``. ``threshold`` is accepted but not used yet.
        """
        # TODO: use embeddings or fuzzy matching
        with self._session() as conn:
            rows = conn.execute(
                "SELECT * FROM entities WHERE project_id = ? AND name LIKE ?",
                (project_id, f"%{name}%")
            ).fetchall()
        return [self._row_to_entity(row) for row in rows]

    def merge_entities(self, target_id: str, source_id: str) -> Entity:
        """Merge the source entity into the target entity (entity alignment).

        The source's name and aliases become aliases of the target, mentions
        and relations that point at the source are re-pointed to the target,
        and the source row is deleted. All changes are committed together.

        Raises:
            ValueError: If either entity does not exist, or if both ids are
                the same (the merge would otherwise delete the entity).
        """
        if target_id == source_id:
            raise ValueError("Cannot merge an entity into itself")

        with self._session(commit=True) as conn:
            # Load both entities
            target = conn.execute("SELECT * FROM entities WHERE id = ?", (target_id,)).fetchone()
            source = conn.execute("SELECT * FROM entities WHERE id = ?", (source_id,)).fetchone()

            if not target or not source:
                raise ValueError("Entity not found")

            # Merge aliases
            merged_aliases = set(json.loads(target['aliases']) if target['aliases'] else [])
            merged_aliases.add(source['name'])
            merged_aliases.update(json.loads(source['aliases']) if source['aliases'] else [])

            # Update the target entity; sorted so the stored order is stable.
            conn.execute(
                "UPDATE entities SET aliases = ?, updated_at = ? WHERE id = ?",
                (json.dumps(sorted(merged_aliases, key=str)), datetime.now().isoformat(), target_id)
            )

            # Re-point mentions
            conn.execute(
                "UPDATE entity_mentions SET entity_id = ? WHERE entity_id = ?",
                (target_id, source_id)
            )

            # Re-point relations where the source was the relation's source
            conn.execute(
                "UPDATE entity_relations SET source_entity_id = ? WHERE source_entity_id = ?",
                (target_id, source_id)
            )

            # Re-point relations where the source was the relation's target
            conn.execute(
                "UPDATE entity_relations SET target_entity_id = ? WHERE target_entity_id = ?",
                (target_id, source_id)
            )

            # Delete the source entity
            conn.execute("DELETE FROM entities WHERE id = ?", (source_id,))

        return self.get_entity(target_id)

    def get_entity(self, entity_id: str) -> Optional[Entity]:
        """Return the entity with this id, or None when there is none."""
        with self._session() as conn:
            row = conn.execute("SELECT * FROM entities WHERE id = ?", (entity_id,)).fetchone()
        if row:
            return self._row_to_entity(row)
        return None

    def list_project_entities(self, project_id: str) -> List[Entity]:
        """Return the project's entities, most recently updated first."""
        with self._session() as conn:
            rows = conn.execute(
                "SELECT * FROM entities WHERE project_id = ? ORDER BY updated_at DESC",
                (project_id,)
            ).fetchall()
        return [self._row_to_entity(row) for row in rows]

    # Mention operations
    def add_mention(self, mention: EntityMention) -> EntityMention:
        """Insert ``mention`` and return the same object."""
        with self._session(commit=True) as conn:
            conn.execute(
                """INSERT INTO entity_mentions (id, entity_id, transcript_id, start_pos, end_pos, text_snippet, confidence)
                   VALUES (?, ?, ?, ?, ?, ?, ?)""",
                (mention.id, mention.entity_id, mention.transcript_id, mention.start_pos,
                 mention.end_pos, mention.text_snippet, mention.confidence)
            )
        return mention

    def get_entity_mentions(self, entity_id: str) -> List[EntityMention]:
        """Return an entity's mentions ordered by transcript id, then start position."""
        with self._session() as conn:
            rows = conn.execute(
                "SELECT * FROM entity_mentions WHERE entity_id = ? ORDER BY transcript_id, start_pos",
                (entity_id,)
            ).fetchall()
        return [EntityMention(**self._known_columns(EntityMention, r)) for r in rows]

    # Transcript operations
    def save_transcript(self, transcript_id: str, project_id: str, filename: str, full_text: str, transcript_type: str = "audio"):
        """Save a transcript record."""
        now = datetime.now().isoformat()
        with self._session(commit=True) as conn:
            conn.execute(
                "INSERT INTO transcripts (id, project_id, filename, full_text, type, created_at) VALUES (?, ?, ?, ?, ?, ?)",
                (transcript_id, project_id, filename, full_text, transcript_type, now)
            )

    def get_transcript(self, transcript_id: str) -> Optional[dict]:
        """Return a transcript row as a dict, or None when there is none."""
        with self._session() as conn:
            row = conn.execute("SELECT * FROM transcripts WHERE id = ?", (transcript_id,)).fetchone()
        return dict(row) if row else None

    def list_project_transcripts(self, project_id: str) -> List[dict]:
        """List all transcripts of a project, newest first."""
        with self._session() as conn:
            rows = conn.execute(
                "SELECT * FROM transcripts WHERE project_id = ? ORDER BY created_at DESC",
                (project_id,)
            ).fetchall()
        return [dict(r) for r in rows]

    # Relation operations
    def create_relation(self, project_id: str, source_entity_id: str, target_entity_id: str,
                        relation_type: str = "related", evidence: str = "", transcript_id: str = "") -> str:
        """Create a relation between two entities.

        Returns:
            The generated relation id (first 8 characters of a UUID4).
        """
        relation_id = str(uuid.uuid4())[:8]
        now = datetime.now().isoformat()
        with self._session(commit=True) as conn:
            conn.execute(
                """INSERT INTO entity_relations
                   (id, project_id, source_entity_id, target_entity_id, relation_type, evidence, transcript_id, created_at)
                   VALUES (?, ?, ?, ?, ?, ?, ?, ?)""",
                (relation_id, project_id, source_entity_id, target_entity_id, relation_type, evidence, transcript_id, now)
            )
        return relation_id

    def get_entity_relations(self, entity_id: str) -> List[dict]:
        """Return every relation in which the entity is source or target, newest first."""
        with self._session() as conn:
            rows = conn.execute(
                """SELECT * FROM entity_relations
                   WHERE source_entity_id = ? OR target_entity_id = ?
                   ORDER BY created_at DESC""",
                (entity_id, entity_id)
            ).fetchall()
        return [dict(r) for r in rows]

    def list_project_relations(self, project_id: str) -> List[dict]:
        """List all relations of a project, newest first."""
        with self._session() as conn:
            rows = conn.execute(
                "SELECT * FROM entity_relations WHERE project_id = ? ORDER BY created_at DESC",
                (project_id,)
            ).fetchall()
        return [dict(r) for r in rows]

    def update_entity(self, entity_id: str, **kwargs) -> Optional[Entity]:
        """Update an entity's fields.

        Only ``name``, ``type``, ``definition``, ``canonical_name`` and
        ``aliases`` are honoured; other keyword arguments are ignored.

        Returns:
            The entity as stored after the update, or None if it does not exist.
        """
        # Column names come from this fixed list, never from caller input,
        # so building the SET clause with an f-string is safe.
        allowed_fields = ['name', 'type', 'definition', 'canonical_name']
        updates = []
        values = []

        for column in allowed_fields:
            if column in kwargs:
                updates.append(f"{column} = ?")
                values.append(kwargs[column])

        # Aliases are stored as JSON text
        if 'aliases' in kwargs:
            updates.append("aliases = ?")
            values.append(json.dumps(kwargs['aliases']))

        if not updates:
            return self.get_entity(entity_id)

        updates.append("updated_at = ?")
        values.append(datetime.now().isoformat())
        values.append(entity_id)

        query = f"UPDATE entities SET {', '.join(updates)} WHERE id = ?"
        with self._session(commit=True) as conn:
            conn.execute(query, values)

        return self.get_entity(entity_id)

    def delete_entity(self, entity_id: str):
        """Delete an entity together with its mentions and relations."""
        with self._session(commit=True) as conn:
            # Delete mentions
            conn.execute("DELETE FROM entity_mentions WHERE entity_id = ?", (entity_id,))

            # Delete relations
            conn.execute("DELETE FROM entity_relations WHERE source_entity_id = ? OR target_entity_id = ?",
                         (entity_id, entity_id))

            # Delete the entity itself
            conn.execute("DELETE FROM entities WHERE id = ?", (entity_id,))

    def delete_relation(self, relation_id: str):
        """Delete a relation."""
        with self._session(commit=True) as conn:
            conn.execute("DELETE FROM entity_relations WHERE id = ?", (relation_id,))

    def update_relation(self, relation_id: str, **kwargs) -> Optional[dict]:
        """Update a relation's ``relation_type`` and/or ``evidence``.

        Returns:
            The relation row as a dict after the update, or None if it does not exist.
        """
        allowed_fields = ['relation_type', 'evidence']
        updates = []
        values = []

        for column in allowed_fields:
            if column in kwargs:
                updates.append(f"{column} = ?")
                values.append(kwargs[column])

        with self._session() as conn:
            if updates:
                query = f"UPDATE entity_relations SET {', '.join(updates)} WHERE id = ?"
                values.append(relation_id)
                conn.execute(query, values)
                conn.commit()

            row = conn.execute("SELECT * FROM entity_relations WHERE id = ?", (relation_id,)).fetchone()

        return dict(row) if row else None

    def update_transcript(self, transcript_id: str, full_text: str) -> Optional[dict]:
        """Replace a transcript's text and return the updated row (None if missing)."""
        now = datetime.now().isoformat()

        with self._session() as conn:
            conn.execute(
                "UPDATE transcripts SET full_text = ?, updated_at = ? WHERE id = ?",
                (full_text, now, transcript_id)
            )
            conn.commit()

            row = conn.execute("SELECT * FROM transcripts WHERE id = ?", (transcript_id,)).fetchone()

        return dict(row) if row else None

    # Phase 3: Glossary operations
    def add_glossary_term(self, project_id: str, term: str, pronunciation: str = "") -> str:
        """Add a term to the project's glossary.

        If the term already exists for the project its frequency is
        incremented instead of inserting a duplicate.

        Returns:
            The id of the existing or newly created glossary row.
        """
        with self._session(commit=True) as conn:
            # Check whether the term already exists
            existing = conn.execute(
                "SELECT * FROM glossary WHERE project_id = ? AND term = ?",
                (project_id, term)
            ).fetchone()

            if existing:
                # Bump the frequency
                conn.execute(
                    "UPDATE glossary SET frequency = frequency + 1 WHERE id = ?",
                    (existing['id'],)
                )
                return existing['id']

            term_id = str(uuid.uuid4())[:8]
            conn.execute(
                "INSERT INTO glossary (id, project_id, term, pronunciation, frequency) VALUES (?, ?, ?, ?, ?)",
                (term_id, project_id, term, pronunciation, 1)
            )
            return term_id

    def list_glossary(self, project_id: str) -> List[dict]:
        """List the project's glossary, most frequent terms first."""
        with self._session() as conn:
            rows = conn.execute(
                "SELECT * FROM glossary WHERE project_id = ? ORDER BY frequency DESC",
                (project_id,)
            ).fetchall()
        return [dict(r) for r in rows]

    def delete_glossary_term(self, term_id: str):
        """Delete a glossary term."""
        with self._session(commit=True) as conn:
            conn.execute("DELETE FROM glossary WHERE id = ?", (term_id,))

    # Phase 3: Get all entities for embedding
    def get_all_entities_for_embedding(self, project_id: str) -> List[Entity]:
        """Return all entities of the project for embedding computation."""
        return self.list_project_entities(project_id)

    # Phase 4: Agent & Provenance methods
    def get_relation_with_details(self, relation_id: str) -> Optional[dict]:
        """Return a relation with entity names and source transcript details.

        The transcript columns are None when the relation has no matching
        transcript (LEFT JOIN). Returns None if the relation is not found.
        """
        with self._session() as conn:
            row = conn.execute(
                """SELECT r.*,
                          s.name as source_name, t.name as target_name,
                          tr.filename as transcript_filename, tr.full_text as transcript_text
                   FROM entity_relations r
                   JOIN entities s ON r.source_entity_id = s.id
                   JOIN entities t ON r.target_entity_id = t.id
                   LEFT JOIN transcripts tr ON r.transcript_id = tr.id
                   WHERE r.id = ?""",
                (relation_id,)
            ).fetchone()
        return dict(row) if row else None

    def get_entity_with_mentions(self, entity_id: str) -> Optional[dict]:
        """Return an entity's details with all of its mentions and relations.

        The result is the entity row as a dict, with ``aliases`` decoded and
        the extra keys ``mentions``, ``mention_count`` and ``relations``.
        Returns None if the entity does not exist.
        """
        with self._session() as conn:
            # Entity row
            entity_row = conn.execute(
                "SELECT * FROM entities WHERE id = ?", (entity_id,)
            ).fetchone()

            if not entity_row:
                return None

            entity = dict(entity_row)
            entity['aliases'] = json.loads(entity['aliases']) if entity['aliases'] else []

            # Mention positions
            mentions = conn.execute(
                """SELECT m.*, t.filename, t.created_at as transcript_date
                   FROM entity_mentions m
                   JOIN transcripts t ON m.transcript_id = t.id
                   WHERE m.entity_id = ?
                   ORDER BY t.created_at, m.start_pos""",
                (entity_id,)
            ).fetchall()

            entity['mentions'] = [dict(m) for m in mentions]
            entity['mention_count'] = len(mentions)

            # Related relations
            relations = conn.execute(
                """SELECT r.*,
                          s.name as source_name, t.name as target_name
                   FROM entity_relations r
                   JOIN entities s ON r.source_entity_id = s.id
                   JOIN entities t ON r.target_entity_id = t.id
                   WHERE r.source_entity_id = ? OR r.target_entity_id = ?
                   ORDER BY r.created_at DESC""",
                (entity_id, entity_id)
            ).fetchall()

            entity['relations'] = [dict(r) for r in relations]

        return entity

    def search_entities(self, project_id: str, query: str) -> List[Entity]:
        """Search a project's entities by name, definition or alias substring."""
        text_pattern = f'%{query}%'
        # Aliases are stored with json.dumps, which escapes non-ASCII text as
        # \uXXXX; encode the query the same way (minus the surrounding
        # quotes) so non-ASCII queries can match. ASCII queries are unchanged.
        alias_pattern = f'%{json.dumps(query)[1:-1]}%'
        with self._session() as conn:
            rows = conn.execute(
                """SELECT * FROM entities
                   WHERE project_id = ? AND
                         (name LIKE ? OR definition LIKE ? OR aliases LIKE ?)
                   ORDER BY name""",
                (project_id, text_pattern, text_pattern, alias_pattern)
            ).fetchall()
        return [self._row_to_entity(row) for row in rows]

    def get_project_summary(self, project_id: str) -> dict:
        """Return a project summary, used as RAG context.

        Returns:
            A dict with ``project`` (row dict, or {} if missing),
            ``statistics`` (entity/transcript/relation counts),
            ``recent_transcripts`` (up to 5, newest first) and
            ``top_entities`` (up to 10, by mention count).
        """
        with self._session() as conn:
            # Basic project information
            project = conn.execute(
                "SELECT * FROM projects WHERE id = ?", (project_id,)
            ).fetchone()

            # Counts
            entity_count = conn.execute(
                "SELECT COUNT(*) as count FROM entities WHERE project_id = ?",
                (project_id,)
            ).fetchone()['count']

            transcript_count = conn.execute(
                "SELECT COUNT(*) as count FROM transcripts WHERE project_id = ?",
                (project_id,)
            ).fetchone()['count']

            relation_count = conn.execute(
                "SELECT COUNT(*) as count FROM entity_relations WHERE project_id = ?",
                (project_id,)
            ).fetchone()['count']

            # Most recent transcripts
            recent_transcripts = conn.execute(
                """SELECT filename, full_text, created_at
                   FROM transcripts
                   WHERE project_id = ?
                   ORDER BY created_at DESC
                   LIMIT 5""",
                (project_id,)
            ).fetchall()

            # Most frequently mentioned entities
            top_entities = conn.execute(
                """SELECT e.name, e.type, e.definition, COUNT(m.id) as mention_count
                   FROM entities e
                   LEFT JOIN entity_mentions m ON e.id = m.entity_id
                   WHERE e.project_id = ?
                   GROUP BY e.id
                   ORDER BY mention_count DESC
                   LIMIT 10""",
                (project_id,)
            ).fetchall()

        return {
            'project': dict(project) if project else {},
            'statistics': {
                'entity_count': entity_count,
                'transcript_count': transcript_count,
                'relation_count': relation_count
            },
            'recent_transcripts': [dict(t) for t in recent_transcripts],
            'top_entities': [dict(e) for e in top_entities]
        }

    # Phase 5: Timeline operations
    def get_project_timeline(self, project_id: str, entity_id: Optional[str] = None,
                             start_date: Optional[str] = None, end_date: Optional[str] = None) -> List[dict]:
        """Return the project's timeline: entity mentions and relation events in date order.

        Mention events are dated by their transcript's ``created_at``;
        relation events by the relation's own ``created_at``.

        Args:
            project_id: Project whose events are returned.
            entity_id: When given, keep only mentions of this entity and
                relations in which it is the source or the target.
            start_date: Inclusive lower bound, compared against the stored
                timestamp strings.
            end_date: Inclusive upper bound, compared the same way.

        Returns:
            A list of event dicts with ``type`` set to ``'mention'`` or
            ``'relation'``, sorted by ``event_date``.
        """
        # Build both WHERE clauses side by side. Only fixed SQL fragments are
        # interpolated; every value is bound as a parameter.
        mention_conditions = ["t.project_id = ?"]
        mention_params = [project_id]
        relation_conditions = ["r.project_id = ?"]
        relation_params = [project_id]

        if entity_id:
            mention_conditions.append("m.entity_id = ?")
            mention_params.append(entity_id)
            # Apply the entity filter to relations too, so a filtered
            # timeline does not include relations between other entities.
            relation_conditions.append("(r.source_entity_id = ? OR r.target_entity_id = ?)")
            relation_params.extend([entity_id, entity_id])

        if start_date:
            mention_conditions.append("t.created_at >= ?")
            mention_params.append(start_date)
            relation_conditions.append("r.created_at >= ?")
            relation_params.append(start_date)

        if end_date:
            mention_conditions.append("t.created_at <= ?")
            mention_params.append(end_date)
            relation_conditions.append("r.created_at <= ?")
            relation_params.append(end_date)

        where_clause = " AND ".join(mention_conditions)
        relation_where = " AND ".join(relation_conditions)

        with self._session() as conn:
            # Entity mention timeline
            mentions = conn.execute(
                f"""SELECT m.*, e.name as entity_name, e.type as entity_type, e.definition,
                           t.filename, t.created_at as event_date, t.type as source_type
                    FROM entity_mentions m
                    JOIN entities e ON m.entity_id = e.id
                    JOIN transcripts t ON m.transcript_id = t.id
                    WHERE {where_clause}
                    ORDER BY t.created_at, m.start_pos""",
                mention_params
            ).fetchall()

            # Relation creation timeline
            relations = conn.execute(
                f"""SELECT r.*,
                           s.name as source_name, t.name as target_name,
                           tr.filename, r.created_at as event_date
                    FROM entity_relations r
                    JOIN entities s ON r.source_entity_id = s.id
                    JOIN entities t ON r.target_entity_id = t.id
                    LEFT JOIN transcripts tr ON r.transcript_id = tr.id
                    WHERE {relation_where}
                    ORDER BY r.created_at""",
                relation_params
            ).fetchall()

        # Merge and format the timeline events
        timeline_events = []

        for m in mentions:
            timeline_events.append({
                'id': m['id'],
                'type': 'mention',
                'event_date': m['event_date'],
                'entity_id': m['entity_id'],
                'entity_name': m['entity_name'],
                'entity_type': m['entity_type'],
                'entity_definition': m['definition'],
                'text_snippet': m['text_snippet'],
                'confidence': m['confidence'],
                'source': {
                    'transcript_id': m['transcript_id'],
                    'filename': m['filename'],
                    'type': m['source_type']
                }
            })

        for r in relations:
            timeline_events.append({
                'id': r['id'],
                'type': 'relation',
                'event_date': r['event_date'],
                'relation_type': r['relation_type'],
                'source_entity': r['source_name'],
                'target_entity': r['target_name'],
                'evidence': r['evidence'],
                'source': {
                    # sqlite3.Row has no .get(); index by column name instead.
                    'transcript_id': r['transcript_id'],
                    'filename': r['filename']
                }
            })

        # Sort by date; a missing date sorts first instead of raising when
        # None is compared with a string.
        timeline_events.sort(key=lambda event: event['event_date'] or "")

        return timeline_events

    def get_entity_timeline_summary(self, project_id: str) -> dict:
        """Return summary statistics for the project's entity timeline.

        Returns:
            A dict with ``daily_activity`` (mention count per date),
            ``top_entities`` (up to 20 entities with mention count and
            first/last mention timestamps) and ``active_periods`` (per date:
            distinct entities mentioned and total mentions).
        """
        with self._session() as conn:
            # Mentions per day
            daily_stats = conn.execute(
                """SELECT DATE(t.created_at) as date, COUNT(*) as count
                   FROM entity_mentions m
                   JOIN transcripts t ON m.transcript_id = t.id
                   WHERE t.project_id = ?
                   GROUP BY DATE(t.created_at)
                   ORDER BY date""",
                (project_id,)
            ).fetchall()

            # Mentions per entity
            entity_stats = conn.execute(
                """SELECT e.name, e.type, COUNT(m.id) as mention_count,
                          MIN(t.created_at) as first_mentioned,
                          MAX(t.created_at) as last_mentioned
                   FROM entities e
                   LEFT JOIN entity_mentions m ON e.id = m.entity_id
                   LEFT JOIN transcripts t ON m.transcript_id = t.id
                   WHERE e.project_id = ?
                   GROUP BY e.id
                   ORDER BY mention_count DESC
                   LIMIT 20""",
                (project_id,)
            ).fetchall()

            # Active periods
            active_periods = conn.execute(
                """SELECT
                       DATE(t.created_at) as date,
                       COUNT(DISTINCT m.entity_id) as active_entities,
                       COUNT(m.id) as total_mentions
                   FROM transcripts t
                   LEFT JOIN entity_mentions m ON t.id = m.transcript_id
                   WHERE t.project_id = ?
                   GROUP BY DATE(t.created_at)
                   ORDER BY date""",
                (project_id,)
            ).fetchall()

        return {
            'daily_activity': [dict(d) for d in daily_stats],
            'top_entities': [dict(e) for e in entity_stats],
            'active_periods': [dict(a) for a in active_periods]
        }

    def get_transcript_context(self, transcript_id: str, position: int, context_chars: int = 200) -> str:
        """Return the transcript text surrounding ``position``.

        Up to ``context_chars`` characters are taken on each side, clamped to
        the bounds of the text. Returns "" when the transcript does not exist
        or has no text.
        """
        with self._session() as conn:
            row = conn.execute(
                "SELECT full_text FROM transcripts WHERE id = ?",
                (transcript_id,)
            ).fetchone()

        if not row:
            return ""

        # A NULL full_text would make len() raise; treat it as empty text.
        text = row['full_text'] or ""
        start = max(0, position - context_chars)
        end = min(len(text), position + context_chars)
        return text[start:end]
|
||
|
||
|
||
# Module-wide DatabaseManager instance, created on first request.
_db_manager = None


def get_db_manager() -> DatabaseManager:
    """Return the shared DatabaseManager, constructing it on the first call."""
    global _db_manager
    if _db_manager is not None:
        return _db_manager
    _db_manager = DatabaseManager()
    return _db_manager
|