Files
insightflow/backend/db_manager.py
OpenClaw Bot 797ca58e8e Phase 7 Task 7: 插件与集成系统
- 创建 plugin_manager.py 模块
  - PluginManager: 插件管理主类
  - ChromeExtensionHandler: Chrome 插件处理
  - BotHandler: 飞书/钉钉/Slack 机器人处理
  - WebhookIntegration: Zapier/Make Webhook 集成
  - WebDAVSync: WebDAV 同步管理

- 创建完整的 Chrome 扩展代码
  - manifest.json, background.js, content.js, content.css
  - popup.html/js: 弹出窗口界面
  - options.html/js: 设置页面
  - 支持网页剪藏、选中文本保存、项目选择

- 更新 schema.sql 添加插件相关数据库表
  - plugins: 插件配置表
  - bot_sessions: 机器人会话表
  - webhook_endpoints: Webhook 端点表
  - webdav_syncs: WebDAV 同步配置表
  - plugin_activity_logs: 插件活动日志表

- 更新 main.py 添加插件相关 API 端点
  - GET/POST /api/v1/plugins - 插件管理
  - POST /api/v1/plugins/chrome/clip - Chrome 插件保存网页
  - POST /api/v1/bots/webhook/{platform} - 接收机器人消息
  - GET /api/v1/bots/sessions - 机器人会话列表
  - POST /api/v1/webhook-endpoints - 创建 Webhook 端点
  - POST /webhook/{type}/{token} - 接收外部 Webhook
  - POST /api/v1/webdav-syncs - WebDAV 同步配置
  - POST /api/v1/webdav-syncs/{id}/test - 测试 WebDAV 连接
  - POST /api/v1/webdav-syncs/{id}/sync - 触发 WebDAV 同步

- 更新 requirements.txt 添加插件依赖
  - beautifulsoup4: HTML 解析
  - webdavclient3: WebDAV 客户端

- 更新 STATUS.md 和 README.md 开发进度
2026-02-23 12:09:15 +08:00

1194 lines
45 KiB
Python

#!/usr/bin/env python3
"""
InsightFlow Database Manager - Phase 5
处理项目、实体、关系的持久化
支持实体属性扩展
"""
import os
import json
import sqlite3
import uuid
from datetime import datetime
from typing import List, Dict, Optional, Tuple
from dataclasses import dataclass
# Path of the SQLite database file; taken from the DB_PATH environment
# variable when set, otherwise the default below is used.
DB_PATH = os.getenv("DB_PATH", "/app/data/insightflow.db")
@dataclass
class Project:
    """A project record mirroring one row of the ``projects`` table."""
    id: str
    name: str
    description: str = ""
    created_at: str = ""  # ISO-8601 text, as written by DatabaseManager.create_project
    updated_at: str = ""  # ISO-8601 text, as written by DatabaseManager.create_project
@dataclass
class Entity:
    """An entity belonging to a project, with optional aliases and attributes."""
    id: str
    project_id: str
    name: str
    type: str
    definition: str = ""
    canonical_name: str = ""
    aliases: List[str] = None
    embedding: str = ""  # Phase 3: entity embedding vector
    attributes: Dict = None  # Phase 5: entity attributes
    created_at: str = ""
    updated_at: str = ""

    def __post_init__(self):
        # None means "not supplied"; give each instance its own fresh container.
        self.aliases = [] if self.aliases is None else self.aliases
        self.attributes = {} if self.attributes is None else self.attributes
@dataclass
class AttributeTemplate:
    """Attribute template definition."""
    id: str
    project_id: str
    name: str
    type: str  # text, number, date, select, multiselect, boolean
    options: List[str] = None  # used by select/multiselect
    default_value: str = ""
    description: str = ""
    is_required: bool = False
    sort_order: int = 0
    created_at: str = ""
    updated_at: str = ""

    def __post_init__(self):
        # None means "not supplied"; give each instance its own list.
        self.options = [] if self.options is None else self.options
@dataclass
class EntityAttribute:
    """Value of one attribute attached to an entity."""
    id: str
    entity_id: str
    template_id: Optional[str] = None
    name: str = ""  # attribute name
    type: str = "text"  # attribute type
    value: str = ""
    options: List[str] = None  # list of options
    template_name: str = ""  # filled in by joined queries
    template_type: str = ""  # filled in by joined queries
    created_at: str = ""
    updated_at: str = ""

    def __post_init__(self):
        # None means "not supplied"; give each instance its own list.
        self.options = [] if self.options is None else self.options
@dataclass
class AttributeHistory:
    """One entry of the attribute change log.

    ``template_id`` is declared because DatabaseManager writes that column into
    ``attribute_history``; without the field, building an instance from a full
    row fails with an unexpected-keyword TypeError. It is appended last so that
    existing positional construction keeps working.
    """
    id: str
    entity_id: str
    attribute_name: str = ""  # attribute name
    old_value: str = ""
    new_value: str = ""
    changed_by: str = ""
    changed_at: str = ""
    change_reason: str = ""
    template_id: Optional[str] = None  # template the changed value belongs to
@dataclass
class EntityMention:
    """One occurrence of an entity inside a transcript (``entity_mentions`` row)."""
    id: str
    entity_id: str
    transcript_id: str
    start_pos: int  # presumably an offset into the transcript text — TODO confirm unit
    end_pos: int  # presumably the matching end offset — TODO confirm inclusive/exclusive
    text_snippet: str
    confidence: float = 1.0
class DatabaseManager:
def __init__(self, db_path: str = DB_PATH):
    """Remember the database location, create its directory and load the schema.

    Args:
        db_path: Path of the SQLite database file.
    """
    self.db_path = db_path
    parent_dir = os.path.dirname(db_path)
    # A bare filename has an empty dirname, and os.makedirs("") raises
    # FileNotFoundError, so only create the directory when there is one.
    if parent_dir:
        os.makedirs(parent_dir, exist_ok=True)
    self.init_db()
def get_conn(self):
    """Open a new SQLite connection whose rows can be read by column name."""
    connection = sqlite3.connect(self.db_path)
    connection.row_factory = sqlite3.Row
    return connection
def init_db(self):
    """Initialise the database tables by running ``schema.sql``.

    The schema file is looked up next to this module. It is decoded as UTF-8
    explicitly so that non-ASCII content does not depend on the platform's
    default encoding, and the connection is closed even if the script fails.
    """
    schema_path = os.path.join(os.path.dirname(__file__), 'schema.sql')
    with open(schema_path, 'r', encoding='utf-8') as f:
        schema = f.read()
    conn = self.get_conn()
    try:
        conn.executescript(schema)
        conn.commit()
    finally:
        conn.close()
# ==================== Project Operations ====================
def create_project(self, project_id: str, name: str, description: str = "") -> Project:
    """Insert a project row and return it as a Project.

    ``created_at`` and ``updated_at`` are both set to the current local time.
    """
    db = self.get_conn()
    stamp = datetime.now().isoformat()
    record = Project(id=project_id, name=name, description=description, created_at=stamp, updated_at=stamp)
    db.execute(
        "INSERT INTO projects (id, name, description, created_at, updated_at) VALUES (?, ?, ?, ?, ?)",
        (record.id, record.name, record.description, record.created_at, record.updated_at)
    )
    db.commit()
    db.close()
    return record
def get_project(self, project_id: str) -> Optional[Project]:
    """Return the project with the given id, or None when it does not exist."""
    db = self.get_conn()
    record = db.execute("SELECT * FROM projects WHERE id = ?", (project_id,)).fetchone()
    db.close()
    return Project(**dict(record)) if record else None
def list_projects(self) -> List[Project]:
    """Return every project, most recently updated first."""
    db = self.get_conn()
    cursor = db.execute("SELECT * FROM projects ORDER BY updated_at DESC")
    records = cursor.fetchall()
    db.close()
    projects = []
    for record in records:
        projects.append(Project(**dict(record)))
    return projects
# ==================== Entity Operations ====================
def create_entity(self, entity: Entity) -> Entity:
    """Insert *entity* into the ``entities`` table and return the same object.

    Only id, project_id, name, canonical_name, type, definition and aliases
    (serialised as a JSON array) are written. Both timestamp columns receive
    the same value, so a fresh row has ``created_at == updated_at``. The
    connection is closed even when the insert fails.
    """
    now = datetime.now().isoformat()
    conn = self.get_conn()
    try:
        conn.execute(
            """INSERT INTO entities (id, project_id, name, canonical_name, type, definition, aliases, created_at, updated_at)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)""",
            (entity.id, entity.project_id, entity.name, entity.canonical_name, entity.type,
             entity.definition, json.dumps(entity.aliases), now, now)
        )
        conn.commit()
    finally:
        conn.close()
    return entity
def get_entity_by_name(self, project_id: str, name: str) -> Optional[Entity]:
    """Find an entity by exact name, canonical name or alias (used for alignment).

    Aliases are stored as a JSON array written by ``json.dumps`` with its
    default ASCII escaping, so the alias pattern is built from
    ``json.dumps(name)``; a raw non-ASCII name would never match the stored
    ``\\uXXXX`` form. LIKE wildcards inside the name are escaped so that they
    only match themselves.

    Returns:
        The first matching Entity, or None when nothing matches.
    """
    # json.dumps gives the quoted, escaped form exactly as it sits in the array.
    encoded_alias = json.dumps(name)
    # Backslash first, so the escapes added for % and _ are not doubled.
    for special in ("\\", "%", "_"):
        encoded_alias = encoded_alias.replace(special, "\\" + special)
    conn = self.get_conn()
    try:
        row = conn.execute(
            "SELECT * FROM entities WHERE project_id = ? AND "
            "(name = ? OR canonical_name = ? OR aliases LIKE ? ESCAPE '\\')",
            (project_id, name, name, f"%{encoded_alias}%")
        ).fetchone()
    finally:
        conn.close()
    if not row:
        return None
    data = dict(row)
    data['aliases'] = json.loads(data['aliases']) if data['aliases'] else []
    return Entity(**data)
def find_similar_entities(self, project_id: str, name: str, threshold: float = 0.8) -> List[Entity]:
    """Return the project's entities whose name contains *name*.

    ``threshold`` is accepted but not used by this implementation.
    """
    db = self.get_conn()
    matches = db.execute(
        "SELECT * FROM entities WHERE project_id = ? AND name LIKE ?",
        (project_id, f"%{name}%")
    ).fetchall()
    db.close()

    def to_entity(record):
        fields = dict(record)
        fields['aliases'] = json.loads(fields['aliases']) if fields['aliases'] else []
        return Entity(**fields)

    return [to_entity(record) for record in matches]
def merge_entities(self, target_id: str, source_id: str) -> Entity:
    """Merge the source entity into the target entity.

    The source's name and aliases are appended to the target's aliases
    (order-preserving, without duplicates), mentions and relations are
    re-pointed to the target, and the source row is deleted.

    Raises:
        ValueError: if either entity is missing, or if both ids are equal.
            Merging an entity into itself would otherwise delete it.
    """
    if target_id == source_id:
        raise ValueError("Cannot merge an entity into itself")
    conn = self.get_conn()
    try:
        target = conn.execute("SELECT * FROM entities WHERE id = ?", (target_id,)).fetchone()
        source = conn.execute("SELECT * FROM entities WHERE id = ?", (source_id,)).fetchone()
        if not target or not source:
            raise ValueError("Entity not found")
        target_aliases = json.loads(target['aliases']) if target['aliases'] else []
        source_aliases = json.loads(source['aliases']) if source['aliases'] else []
        # dict.fromkeys de-duplicates while keeping a deterministic order.
        merged_aliases = list(dict.fromkeys([*target_aliases, source['name'], *source_aliases]))
        conn.execute(
            "UPDATE entities SET aliases = ?, updated_at = ? WHERE id = ?",
            (json.dumps(merged_aliases), datetime.now().isoformat(), target_id)
        )
        conn.execute("UPDATE entity_mentions SET entity_id = ? WHERE entity_id = ?", (target_id, source_id))
        conn.execute("UPDATE entity_relations SET source_entity_id = ? WHERE source_entity_id = ?", (target_id, source_id))
        conn.execute("UPDATE entity_relations SET target_entity_id = ? WHERE target_entity_id = ?", (target_id, source_id))
        conn.execute("DELETE FROM entities WHERE id = ?", (source_id,))
        conn.commit()
    finally:
        # Closing without commit discards a partially applied merge.
        conn.close()
    return self.get_entity(target_id)
def get_entity(self, entity_id: str) -> Optional[Entity]:
    """Return the entity with the given id, or None when it does not exist."""
    db = self.get_conn()
    record = db.execute("SELECT * FROM entities WHERE id = ?", (entity_id,)).fetchone()
    db.close()
    if not record:
        return None
    fields = dict(record)
    # aliases is stored as JSON text; an empty or NULL column means no aliases.
    fields['aliases'] = json.loads(fields['aliases']) if fields['aliases'] else []
    return Entity(**fields)
def list_project_entities(self, project_id: str) -> List[Entity]:
    """Return all entities of a project, most recently updated first."""
    db = self.get_conn()
    records = db.execute(
        "SELECT * FROM entities WHERE project_id = ? ORDER BY updated_at DESC",
        (project_id,)
    ).fetchall()
    db.close()

    def to_entity(record):
        fields = dict(record)
        fields['aliases'] = json.loads(fields['aliases']) if fields['aliases'] else []
        return Entity(**fields)

    return [to_entity(record) for record in records]
def update_entity(self, entity_id: str, **kwargs) -> Entity:
    """Update entity fields and return the entity as re-read from the database.

    Recognised keyword arguments are ``name``, ``type``, ``definition``,
    ``canonical_name`` and ``aliases`` (stored as JSON); anything else is
    ignored. When nothing recognised is passed, no write happens.
    """
    db = self.get_conn()
    assignments = []
    params = []
    for column in ('name', 'type', 'definition', 'canonical_name'):
        if column not in kwargs:
            continue
        assignments.append(f"{column} = ?")
        params.append(kwargs[column])
    if 'aliases' in kwargs:
        assignments.append("aliases = ?")
        params.append(json.dumps(kwargs['aliases']))
    if assignments:
        assignments.append("updated_at = ?")
        params.extend([datetime.now().isoformat(), entity_id])
        db.execute(f"UPDATE entities SET {', '.join(assignments)} WHERE id = ?", params)
        db.commit()
    db.close()
    return self.get_entity(entity_id)
def delete_entity(self, entity_id: str):
    """Delete an entity together with the rows that reference it.

    Removes the entity's mentions, relations (either direction), attribute
    values, multimodal mentions and multimodal links (either side), then the
    entity row itself. The multimodal tables are included because orphaned
    rows there would still be counted by the multimodal statistics.
    Everything is committed together; the connection is closed even when a
    statement fails, which discards the uncommitted deletions.
    """
    conn = self.get_conn()
    try:
        conn.execute("DELETE FROM entity_mentions WHERE entity_id = ?", (entity_id,))
        conn.execute("DELETE FROM entity_relations WHERE source_entity_id = ? OR target_entity_id = ?", (entity_id, entity_id))
        conn.execute("DELETE FROM entity_attributes WHERE entity_id = ?", (entity_id,))
        conn.execute("DELETE FROM multimodal_mentions WHERE entity_id = ?", (entity_id,))
        conn.execute(
            "DELETE FROM multimodal_entity_links WHERE entity_id = ? OR linked_entity_id = ?",
            (entity_id, entity_id)
        )
        conn.execute("DELETE FROM entities WHERE id = ?", (entity_id,))
        conn.commit()
    finally:
        conn.close()
# ==================== Mention Operations ====================
def add_mention(self, mention: EntityMention) -> EntityMention:
    """Insert a mention row and return the mention that was passed in."""
    row_values = (
        mention.id, mention.entity_id, mention.transcript_id, mention.start_pos,
        mention.end_pos, mention.text_snippet, mention.confidence,
    )
    db = self.get_conn()
    db.execute(
        """INSERT INTO entity_mentions (id, entity_id, transcript_id, start_pos, end_pos, text_snippet, confidence)
VALUES (?, ?, ?, ?, ?, ?, ?)""",
        row_values
    )
    db.commit()
    db.close()
    return mention
def get_entity_mentions(self, entity_id: str) -> List[EntityMention]:
    """Return an entity's mentions ordered by transcript id, then start position."""
    db = self.get_conn()
    records = db.execute(
        "SELECT * FROM entity_mentions WHERE entity_id = ? ORDER BY transcript_id, start_pos",
        (entity_id,)
    ).fetchall()
    db.close()
    mentions = []
    for record in records:
        mentions.append(EntityMention(**dict(record)))
    return mentions
# ==================== Transcript Operations ====================
def save_transcript(self, transcript_id: str, project_id: str, filename: str, full_text: str, transcript_type: str = "audio"):
    """Store a new transcript row stamped with the current local time."""
    insert_sql = "INSERT INTO transcripts (id, project_id, filename, full_text, type, created_at) VALUES (?, ?, ?, ?, ?, ?)"
    db = self.get_conn()
    created = datetime.now().isoformat()
    db.execute(insert_sql, (transcript_id, project_id, filename, full_text, transcript_type, created))
    db.commit()
    db.close()
def get_transcript(self, transcript_id: str) -> Optional[dict]:
    """Return the transcript row as a dict, or None when the id is unknown."""
    db = self.get_conn()
    found = db.execute("SELECT * FROM transcripts WHERE id = ?", (transcript_id,)).fetchone()
    db.close()
    if not found:
        return None
    return dict(found)
def list_project_transcripts(self, project_id: str) -> List[dict]:
    """Return a project's transcripts as dicts, newest first."""
    db = self.get_conn()
    records = db.execute(
        "SELECT * FROM transcripts WHERE project_id = ? ORDER BY created_at DESC",
        (project_id,)
    ).fetchall()
    db.close()
    return list(map(dict, records))
def update_transcript(self, transcript_id: str, full_text: str) -> dict:
    """Replace a transcript's text and return the refreshed row.

    Returns None when no transcript has the given id.
    """
    db = self.get_conn()
    stamp = datetime.now().isoformat()
    db.execute(
        "UPDATE transcripts SET full_text = ?, updated_at = ? WHERE id = ?",
        (full_text, stamp, transcript_id)
    )
    db.commit()
    refreshed = db.execute("SELECT * FROM transcripts WHERE id = ?", (transcript_id,)).fetchone()
    db.close()
    if refreshed:
        return dict(refreshed)
    return None
# ==================== Relation Operations ====================
def create_relation(self, project_id: str, source_entity_id: str, target_entity_id: str,
                    relation_type: str = "related", evidence: str = "", transcript_id: str = ""):
    """Insert a relation between two entities and return its generated id.

    The id is the first eight hexadecimal characters of a random UUID.
    """
    db = self.get_conn()
    new_id = uuid.uuid4().hex[:8]
    created = datetime.now().isoformat()
    row_values = (new_id, project_id, source_entity_id, target_entity_id, relation_type, evidence, transcript_id, created)
    db.execute(
        """INSERT INTO entity_relations
(id, project_id, source_entity_id, target_entity_id, relation_type, evidence, transcript_id, created_at)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)""",
        row_values
    )
    db.commit()
    db.close()
    return new_id
def get_entity_relations(self, entity_id: str) -> List[dict]:
    """Return relations where the entity is source or target, newest first."""
    db = self.get_conn()
    records = db.execute(
        """SELECT * FROM entity_relations
WHERE source_entity_id = ? OR target_entity_id = ?
ORDER BY created_at DESC""",
        (entity_id, entity_id)
    ).fetchall()
    db.close()
    return list(map(dict, records))
def list_project_relations(self, project_id: str) -> List[dict]:
    """Return all relations of a project as dicts, newest first."""
    db = self.get_conn()
    records = db.execute(
        "SELECT * FROM entity_relations WHERE project_id = ? ORDER BY created_at DESC",
        (project_id,)
    ).fetchall()
    db.close()
    return list(map(dict, records))
def update_relation(self, relation_id: str, **kwargs) -> dict:
    """Update ``relation_type`` and/or ``evidence`` of a relation.

    Other keyword arguments are ignored. Returns the row as it is after the
    update, or None when the relation does not exist.
    """
    db = self.get_conn()
    present = [column for column in ('relation_type', 'evidence') if column in kwargs]
    if present:
        assignments = ', '.join(f"{column} = ?" for column in present)
        params = [kwargs[column] for column in present]
        params.append(relation_id)
        db.execute(f"UPDATE entity_relations SET {assignments} WHERE id = ?", params)
        db.commit()
    refreshed = db.execute("SELECT * FROM entity_relations WHERE id = ?", (relation_id,)).fetchone()
    db.close()
    if refreshed:
        return dict(refreshed)
    return None
def delete_relation(self, relation_id: str):
    """Delete the relation with the given id (no error when it is absent)."""
    db = self.get_conn()
    params = (relation_id,)
    db.execute("DELETE FROM entity_relations WHERE id = ?", params)
    db.commit()
    db.close()
# ==================== Glossary Operations ====================
def add_glossary_term(self, project_id: str, term: str, pronunciation: str = "") -> str:
    """Add a glossary term, or bump its frequency when it already exists.

    Returns the id of the existing or newly created glossary row.
    """
    db = self.get_conn()
    known = db.execute(
        "SELECT * FROM glossary WHERE project_id = ? AND term = ?",
        (project_id, term)
    ).fetchone()
    if known:
        term_id = known['id']
        db.execute("UPDATE glossary SET frequency = frequency + 1 WHERE id = ?", (term_id,))
    else:
        term_id = uuid.uuid4().hex[:8]
        db.execute(
            "INSERT INTO glossary (id, project_id, term, pronunciation, frequency) VALUES (?, ?, ?, ?, ?)",
            (term_id, project_id, term, pronunciation, 1)
        )
    db.commit()
    db.close()
    return term_id
def list_glossary(self, project_id: str) -> List[dict]:
    """Return a project's glossary terms as dicts, most frequent first."""
    db = self.get_conn()
    records = db.execute(
        "SELECT * FROM glossary WHERE project_id = ? ORDER BY frequency DESC",
        (project_id,)
    ).fetchall()
    db.close()
    return list(map(dict, records))
def delete_glossary_term(self, term_id: str):
    """Delete the glossary row with the given id (no error when it is absent)."""
    db = self.get_conn()
    params = (term_id,)
    db.execute("DELETE FROM glossary WHERE id = ?", params)
    db.commit()
    db.close()
# ==================== Phase 4: Agent & Provenance ====================
def get_relation_with_details(self, relation_id: str) -> Optional[dict]:
    """Return a relation joined with entity names and its source transcript.

    The result adds ``source_name``, ``target_name``, ``transcript_filename``
    and ``transcript_text`` to the relation columns. The transcript columns
    are None when no transcript matches; the whole result is None when the
    relation or one of its entities cannot be found.
    """
    detail_sql = """SELECT r.*,
s.name as source_name, t.name as target_name,
tr.filename as transcript_filename, tr.full_text as transcript_text
FROM entity_relations r
JOIN entities s ON r.source_entity_id = s.id
JOIN entities t ON r.target_entity_id = t.id
LEFT JOIN transcripts tr ON r.transcript_id = tr.id
WHERE r.id = ?"""
    db = self.get_conn()
    found = db.execute(detail_sql, (relation_id,)).fetchone()
    db.close()
    if not found:
        return None
    return dict(found)
def get_entity_with_mentions(self, entity_id: str) -> Optional[dict]:
    """Return an entity dict enriched with its mentions and relations.

    Adds ``mentions`` (joined with transcript filename and date),
    ``mention_count`` and ``relations`` (joined with both entity names) to
    the entity columns. Returns None when the entity does not exist.
    """
    db = self.get_conn()
    record = db.execute("SELECT * FROM entities WHERE id = ?", (entity_id,)).fetchone()
    if not record:
        db.close()
        return None

    result = dict(record)
    result['aliases'] = json.loads(result['aliases']) if result['aliases'] else []

    mention_rows = db.execute(
        """SELECT m.*, t.filename, t.created_at as transcript_date
FROM entity_mentions m
JOIN transcripts t ON m.transcript_id = t.id
WHERE m.entity_id = ? ORDER BY t.created_at, m.start_pos""",
        (entity_id,)
    ).fetchall()
    result['mentions'] = list(map(dict, mention_rows))
    result['mention_count'] = len(mention_rows)

    relation_rows = db.execute(
        """SELECT r.*, s.name as source_name, t.name as target_name
FROM entity_relations r
JOIN entities s ON r.source_entity_id = s.id
JOIN entities t ON r.target_entity_id = t.id
WHERE r.source_entity_id = ? OR r.target_entity_id = ?
ORDER BY r.created_at DESC""",
        (entity_id, entity_id)
    ).fetchall()
    result['relations'] = list(map(dict, relation_rows))

    db.close()
    return result
def search_entities(self, project_id: str, query: str) -> List[Entity]:
    """Search a project's entities by substring of name, definition or alias.

    Aliases are stored as a JSON array written by ``json.dumps`` with its
    default ASCII escaping, so the alias column is searched with the
    JSON-escaped form of *query*; a raw non-ASCII query could never match
    the stored ``\\uXXXX`` sequences. Name and definition are searched with
    the raw query. Results are ordered by name.
    """
    like_raw = f'%{query}%'
    # Strip the surrounding quotes: only the escaped text is needed here.
    like_alias = f'%{json.dumps(query)[1:-1]}%'
    conn = self.get_conn()
    try:
        rows = conn.execute(
            """SELECT * FROM entities
WHERE project_id = ? AND
(name LIKE ? OR definition LIKE ? OR aliases LIKE ?)
ORDER BY name""",
            (project_id, like_raw, like_raw, like_alias)
        ).fetchall()
    finally:
        conn.close()
    entities = []
    for row in rows:
        data = dict(row)
        data['aliases'] = json.loads(data['aliases']) if data['aliases'] else []
        entities.append(Entity(**data))
    return entities
def get_project_summary(self, project_id: str) -> dict:
    """Collect an overview of a project.

    Returns a dict with ``project`` (the project row, or ``{}`` when
    missing), ``statistics`` (entity, transcript and relation counts),
    ``recent_transcripts`` (up to 5, newest first) and ``top_entities``
    (up to 10, by number of mentions).
    """
    db = self.get_conn()
    by_project = (project_id,)

    project_row = db.execute("SELECT * FROM projects WHERE id = ?", by_project).fetchone()

    count_queries = {
        'entity_count': "SELECT COUNT(*) as count FROM entities WHERE project_id = ?",
        'transcript_count': "SELECT COUNT(*) as count FROM transcripts WHERE project_id = ?",
        'relation_count': "SELECT COUNT(*) as count FROM entity_relations WHERE project_id = ?",
    }
    statistics = {}
    for label, count_sql in count_queries.items():
        statistics[label] = db.execute(count_sql, by_project).fetchone()['count']

    transcript_rows = db.execute(
        """SELECT filename, full_text, created_at FROM transcripts
WHERE project_id = ? ORDER BY created_at DESC LIMIT 5""",
        by_project
    ).fetchall()

    entity_rows = db.execute(
        """SELECT e.name, e.type, e.definition, COUNT(m.id) as mention_count
FROM entities e
LEFT JOIN entity_mentions m ON e.id = m.entity_id
WHERE e.project_id = ?
GROUP BY e.id ORDER BY mention_count DESC LIMIT 10""",
        by_project
    ).fetchall()
    db.close()

    summary = {}
    summary['project'] = dict(project_row) if project_row else {}
    summary['statistics'] = statistics
    summary['recent_transcripts'] = list(map(dict, transcript_rows))
    summary['top_entities'] = list(map(dict, entity_rows))
    return summary
def get_transcript_context(self, transcript_id: str, position: int, context_chars: int = 200) -> str:
    """Return the transcript text surrounding a character position.

    Args:
        transcript_id: Transcript to read.
        position: Index into the transcript's ``full_text``.
        context_chars: Number of characters to include on each side.

    Returns:
        The slice ``[position - context_chars, position + context_chars)``
        clamped to the text bounds; ``""`` when the transcript does not
        exist or has no text stored.
    """
    conn = self.get_conn()
    try:
        row = conn.execute("SELECT full_text FROM transcripts WHERE id = ?", (transcript_id,)).fetchone()
    finally:
        conn.close()
    if not row:
        return ""
    # full_text can be NULL in the database; treat that as empty text
    # instead of failing in len().
    text = row['full_text'] or ""
    start = max(0, position - context_chars)
    end = min(len(text), position + context_chars)
    return text[start:end]
# ==================== Phase 5: Timeline Operations ====================
def get_project_timeline(self, project_id: str, entity_id: str = None, start_date: str = None, end_date: str = None) -> List[dict]:
    """Return a project's mention events in chronological order.

    Optional filters restrict the result to one entity and/or to transcripts
    created within ``[start_date, end_date]``. Each event is a dict with
    type ``'mention'``, the transcript creation date as ``event_date``,
    entity details, the snippet, the confidence and a ``source`` sub-dict.
    """
    db = self.get_conn()
    optional_filters = (
        ("m.entity_id = ?", entity_id),
        ("t.created_at >= ?", start_date),
        ("t.created_at <= ?", end_date),
    )
    conditions = ["t.project_id = ?"]
    params = [project_id]
    for clause, value in optional_filters:
        if value:
            conditions.append(clause)
            params.append(value)
    where_clause = " AND ".join(conditions)

    mentions = db.execute(
        f"""SELECT m.*, e.name as entity_name, e.type as entity_type, e.definition,
t.filename, t.created_at as event_date, t.type as source_type
FROM entity_mentions m
JOIN entities e ON m.entity_id = e.id
JOIN transcripts t ON m.transcript_id = t.id
WHERE {where_clause} ORDER BY t.created_at, m.start_pos""",
        params
    ).fetchall()

    timeline_events = [
        {
            'id': m['id'],
            'type': 'mention',
            'event_date': m['event_date'],
            'entity_id': m['entity_id'],
            'entity_name': m['entity_name'],
            'entity_type': m['entity_type'],
            'text_snippet': m['text_snippet'],
            'confidence': m['confidence'],
            'source': {'transcript_id': m['transcript_id'], 'filename': m['filename'], 'type': m['source_type']}
        }
        for m in mentions
    ]
    db.close()
    timeline_events.sort(key=lambda event: event['event_date'])
    return timeline_events
def get_entity_timeline_summary(self, project_id: str) -> dict:
    """Summarise mention activity of a project.

    Returns ``daily_activity`` (mention count per transcript date, ascending)
    and ``top_entities`` (up to 20 entities by mention count with their first
    and last mention dates).
    """
    daily_sql = """SELECT DATE(t.created_at) as date, COUNT(*) as count
FROM entity_mentions m
JOIN transcripts t ON m.transcript_id = t.id
WHERE t.project_id = ? GROUP BY DATE(t.created_at) ORDER BY date"""
    entity_sql = """SELECT e.name, e.type, COUNT(m.id) as mention_count,
MIN(t.created_at) as first_mentioned,
MAX(t.created_at) as last_mentioned
FROM entities e
LEFT JOIN entity_mentions m ON e.id = m.entity_id
LEFT JOIN transcripts t ON m.transcript_id = t.id
WHERE e.project_id = ?
GROUP BY e.id ORDER BY mention_count DESC LIMIT 20"""
    db = self.get_conn()
    daily_rows = db.execute(daily_sql, (project_id,)).fetchall()
    entity_rows = db.execute(entity_sql, (project_id,)).fetchall()
    db.close()
    summary = {}
    summary['daily_activity'] = list(map(dict, daily_rows))
    summary['top_entities'] = list(map(dict, entity_rows))
    return summary
# ==================== Phase 5: Entity Attributes ====================
def create_attribute_template(self, template: AttributeTemplate) -> AttributeTemplate:
    """Insert an attribute template and return the template that was passed in.

    ``options`` is stored as JSON text, or NULL when the list is empty.
    """
    db = self.get_conn()
    stamp = datetime.now().isoformat()
    options_json = json.dumps(template.options) if template.options else None
    row_values = (
        template.id, template.project_id, template.name, template.type,
        options_json,
        template.default_value, template.description, template.is_required,
        template.sort_order, stamp, stamp,
    )
    db.execute(
        """INSERT INTO attribute_templates
(id, project_id, name, type, options, default_value, description, is_required, sort_order, created_at, updated_at)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""",
        row_values
    )
    db.commit()
    db.close()
    return template
def get_attribute_template(self, template_id: str) -> Optional[AttributeTemplate]:
    """Return the attribute template with the given id, or None when absent."""
    db = self.get_conn()
    record = db.execute("SELECT * FROM attribute_templates WHERE id = ?", (template_id,)).fetchone()
    db.close()
    if not record:
        return None
    fields = dict(record)
    # options is stored as JSON text; NULL or empty means no options.
    fields['options'] = json.loads(fields['options']) if fields['options'] else []
    return AttributeTemplate(**fields)
def list_attribute_templates(self, project_id: str) -> List[AttributeTemplate]:
    """Return a project's attribute templates ordered by sort order, then creation time."""
    db = self.get_conn()
    records = db.execute(
        """SELECT * FROM attribute_templates WHERE project_id = ?
ORDER BY sort_order, created_at""",
        (project_id,)
    ).fetchall()
    db.close()

    def to_template(record):
        fields = dict(record)
        fields['options'] = json.loads(fields['options']) if fields['options'] else []
        return AttributeTemplate(**fields)

    return [to_template(record) for record in records]
def update_attribute_template(self, template_id: str, **kwargs) -> Optional[AttributeTemplate]:
    """Update fields of an attribute template and return it as re-read.

    Recognised keyword arguments: ``name``, ``type``, ``options`` (stored as
    JSON, or NULL when empty), ``default_value``, ``description``,
    ``is_required`` and ``sort_order``. Anything else is ignored.
    """
    db = self.get_conn()
    columns = ('name', 'type', 'options', 'default_value', 'description', 'is_required', 'sort_order')
    assignments = []
    params = []
    for column in columns:
        if column not in kwargs:
            continue
        supplied = kwargs[column]
        if column == 'options':
            supplied = json.dumps(supplied) if supplied else None
        assignments.append(f"{column} = ?")
        params.append(supplied)
    if assignments:
        assignments.append("updated_at = ?")
        params.extend([datetime.now().isoformat(), template_id])
        db.execute(f"UPDATE attribute_templates SET {', '.join(assignments)} WHERE id = ?", params)
        db.commit()
    db.close()
    return self.get_attribute_template(template_id)
def delete_attribute_template(self, template_id: str):
    """Delete the attribute template with the given id (no error when absent)."""
    db = self.get_conn()
    params = (template_id,)
    db.execute("DELETE FROM attribute_templates WHERE id = ?", params)
    db.commit()
    db.close()
def set_entity_attribute(self, attr: EntityAttribute, changed_by: str = "system", change_reason: str = "") -> EntityAttribute:
    """Create or overwrite an entity's attribute value, logging the change.

    A row is written to ``attribute_history`` whenever the stored value
    differs from ``attr.value`` (a missing row counts as None). The attribute
    row is then written with INSERT OR REPLACE, reusing the id and
    ``created_at`` of an existing row for the same entity and template.
    Returns the attribute that was passed in.
    """
    db = self.get_conn()
    stamp = datetime.now().isoformat()
    key = (attr.entity_id, attr.template_id)

    existing = db.execute(
        "SELECT value FROM entity_attributes WHERE entity_id = ? AND template_id = ?",
        key
    ).fetchone()
    previous = existing['value'] if existing else None

    if previous != attr.value:
        history_values = (uuid.uuid4().hex[:8], *key, previous, attr.value, changed_by, stamp, change_reason)
        db.execute(
            """INSERT INTO attribute_history
(id, entity_id, template_id, old_value, new_value, changed_by, changed_at, change_reason)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)""",
            history_values
        )

    # Parameter order follows the placeholders: id lookup, row values,
    # created_at lookup, updated_at.
    upsert_values = (*key, attr.id, *key, attr.value, *key, stamp, stamp)
    db.execute(
        """INSERT OR REPLACE INTO entity_attributes
(id, entity_id, template_id, value, created_at, updated_at)
VALUES (
COALESCE((SELECT id FROM entity_attributes WHERE entity_id = ? AND template_id = ?), ?),
?, ?, ?,
COALESCE((SELECT created_at FROM entity_attributes WHERE entity_id = ? AND template_id = ?), ?),
?)""",
        upsert_values
    )
    db.commit()
    db.close()
    return attr
def get_entity_attributes(self, entity_id: str) -> List[EntityAttribute]:
    """Return an entity's attribute values with template name and type filled in.

    Rows are ordered by creation time; the template columns come from a LEFT
    JOIN, so they are None when the template no longer exists.
    """
    db = self.get_conn()
    records = db.execute(
        """SELECT ea.*, at.name as template_name, at.type as template_type
FROM entity_attributes ea
LEFT JOIN attribute_templates at ON ea.template_id = at.id
WHERE ea.entity_id = ? ORDER BY ea.created_at""",
        (entity_id,)
    ).fetchall()
    db.close()
    attributes = []
    for record in records:
        attributes.append(EntityAttribute(**dict(record)))
    return attributes
def get_entity_with_attributes(self, entity_id: str) -> Optional[Entity]:
    """Return an entity whose ``attributes`` dict is filled from the database.

    Attributes are keyed by template name; each value holds ``value``,
    ``type`` and ``template_id``. Returns None when the entity is missing.
    """
    entity = self.get_entity(entity_id)
    if not entity:
        return None
    by_template = {}
    for attr in self.get_entity_attributes(entity_id):
        by_template[attr.template_name] = {
            'value': attr.value,
            'type': attr.template_type,
            'template_id': attr.template_id,
        }
    entity.attributes = by_template
    return entity
def delete_entity_attribute(self, entity_id: str, template_id: str, changed_by: str = "system", change_reason: str = ""):
    """Delete an entity's attribute value and log the removal.

    When a value exists, a history row with ``new_value`` NULL is written
    first; an empty ``change_reason`` is replaced by the default reason text.
    """
    db = self.get_conn()
    key = (entity_id, template_id)
    existing = db.execute(
        "SELECT value FROM entity_attributes WHERE entity_id = ? AND template_id = ?",
        key
    ).fetchone()
    if existing:
        history_values = (
            uuid.uuid4().hex[:8], *key,
            existing['value'], None, changed_by, datetime.now().isoformat(), change_reason or "属性删除",
        )
        db.execute(
            """INSERT INTO attribute_history
(id, entity_id, template_id, old_value, new_value, changed_by, changed_at, change_reason)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)""",
            history_values
        )
    db.execute(
        "DELETE FROM entity_attributes WHERE entity_id = ? AND template_id = ?",
        key
    )
    db.commit()
    db.close()
def get_attribute_history(self, entity_id: str = None, template_id: str = None, limit: int = 50) -> List[AttributeHistory]:
    """Return attribute change-log entries, newest first.

    Args:
        entity_id: Restrict to one entity when given.
        template_id: Restrict to one attribute template when given.
        limit: Maximum number of entries to return.

    Only columns that AttributeHistory declares are passed to its
    constructor. The table carries columns (such as ``template_id``, written
    by the insert statements in this class) that the dataclass may not
    declare, and passing them through would raise TypeError.
    """
    from dataclasses import fields  # local import: used by this method only

    conditions = []
    params = []
    if entity_id:
        conditions.append("ah.entity_id = ?")
        params.append(entity_id)
    if template_id:
        conditions.append("ah.template_id = ?")
        params.append(template_id)
    where_clause = " AND ".join(conditions) if conditions else "1=1"
    conn = self.get_conn()
    try:
        rows = conn.execute(
            f"""SELECT ah.*
FROM attribute_history ah
WHERE {where_clause}
ORDER BY ah.changed_at DESC LIMIT ?""",
            params + [limit]
        ).fetchall()
    finally:
        conn.close()
    known = {f.name for f in fields(AttributeHistory)}
    return [
        AttributeHistory(**{column: row[column] for column in row.keys() if column in known})
        for row in rows
    ]
def search_entities_by_attributes(self, project_id: str, attribute_filters: Dict[str, str]) -> List[Entity]:
    """Return the project's entities whose attributes equal every given filter.

    Args:
        project_id: Project to search in.
        attribute_filters: Mapping of template name to the required value.
            When empty, all entities of the project are returned.

    Matching entities get ``attributes`` set to a flat ``{template_name:
    value}`` dict. Attribute rows are selected through a join on the
    project rather than an ``IN (...)`` list with one bound parameter per
    entity, which could exceed SQLite's limit on host parameters.
    """
    entities = self.list_project_entities(project_id)
    if not attribute_filters:
        return entities
    if not entities:
        return []
    conn = self.get_conn()
    try:
        rows = conn.execute(
            "SELECT ea.entity_id, ea.value, at.name as template_name "
            "FROM entity_attributes ea "
            "JOIN attribute_templates at ON ea.template_id = at.id "
            "JOIN entities e ON ea.entity_id = e.id "
            "WHERE e.project_id = ?",
            (project_id,)
        ).fetchall()
    finally:
        conn.close()
    entity_attrs = {}
    for row in rows:
        entity_attrs.setdefault(row['entity_id'], {})[row['template_name']] = row['value']
    filtered = []
    for entity in entities:
        attrs = entity_attrs.get(entity.id, {})
        if all(attrs.get(name) == wanted for name, wanted in attribute_filters.items()):
            entity.attributes = attrs
            filtered.append(entity)
    return filtered
# ==================== Phase 7: Multimodal Support ====================
def create_video(self, video_id: str, project_id: str, filename: str,
                 duration: float = 0, fps: float = 0, resolution: Dict = None,
                 audio_transcript_id: str = None, full_ocr_text: str = "",
                 extracted_entities: List[Dict] = None,
                 extracted_relations: List[Dict] = None) -> str:
    """Create a video record and return its id.

    ``resolution`` is stored as JSON (NULL when falsy); the extracted entity
    and relation lists are stored as JSON arrays. The status column is set
    to ``'completed'``.
    """
    db = self.get_conn()
    stamp = datetime.now().isoformat()
    resolution_json = json.dumps(resolution) if resolution else None
    entities_json = json.dumps(extracted_entities or [])
    relations_json = json.dumps(extracted_relations or [])
    row_values = (
        video_id, project_id, filename, duration, fps,
        resolution_json,
        audio_transcript_id, full_ocr_text,
        entities_json,
        relations_json,
        'completed', stamp, stamp,
    )
    db.execute(
        """INSERT INTO videos
(id, project_id, filename, duration, fps, resolution,
audio_transcript_id, full_ocr_text, extracted_entities,
extracted_relations, status, created_at, updated_at)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""",
        row_values
    )
    db.commit()
    db.close()
    return video_id
def get_video(self, video_id: str) -> Optional[Dict]:
    """Return a video record with its JSON columns decoded, or None when absent.

    ``resolution`` decodes to None when unset; the extracted entity and
    relation columns decode to ``[]`` when unset.
    """
    db = self.get_conn()
    record = db.execute(
        "SELECT * FROM videos WHERE id = ?", (video_id,)
    ).fetchone()
    db.close()
    if not record:
        return None
    video = dict(record)
    video['resolution'] = json.loads(video['resolution']) if video['resolution'] else None
    for column in ('extracted_entities', 'extracted_relations'):
        video[column] = json.loads(video[column]) if video[column] else []
    return video
def list_project_videos(self, project_id: str) -> List[Dict]:
    """Return all videos of a project, newest first, with JSON columns decoded."""
    db = self.get_conn()
    records = db.execute(
        "SELECT * FROM videos WHERE project_id = ? ORDER BY created_at DESC",
        (project_id,)
    ).fetchall()
    db.close()

    def decode(record):
        video = dict(record)
        video['resolution'] = json.loads(video['resolution']) if video['resolution'] else None
        for column in ('extracted_entities', 'extracted_relations'):
            video[column] = json.loads(video[column]) if video[column] else []
        return video

    return [decode(record) for record in records]
def create_video_frame(self, frame_id: str, video_id: str, frame_number: int,
                       timestamp: float, image_url: str = None,
                       ocr_text: str = None, extracted_entities: List[Dict] = None) -> str:
    """Create a video frame record and return its id.

    ``extracted_entities`` is stored as a JSON array (``[]`` when falsy).
    """
    db = self.get_conn()
    created = datetime.now().isoformat()
    entities_json = json.dumps(extracted_entities or [])
    row_values = (frame_id, video_id, frame_number, timestamp, image_url, ocr_text, entities_json, created)
    db.execute(
        """INSERT INTO video_frames
(id, video_id, frame_number, timestamp, image_url, ocr_text, extracted_entities, created_at)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)""",
        row_values
    )
    db.commit()
    db.close()
    return frame_id
def get_video_frames(self, video_id: str) -> List[Dict]:
    """Return all frames of a video ordered by timestamp, entities decoded from JSON."""
    db = self.get_conn()
    records = db.execute(
        """SELECT * FROM video_frames WHERE video_id = ? ORDER BY timestamp""",
        (video_id,)
    ).fetchall()
    db.close()

    def decode(record):
        frame = dict(record)
        frame['extracted_entities'] = json.loads(frame['extracted_entities']) if frame['extracted_entities'] else []
        return frame

    return [decode(record) for record in records]
def create_image(self, image_id: str, project_id: str, filename: str,
                 ocr_text: str = "", description: str = "",
                 extracted_entities: List[Dict] = None,
                 extracted_relations: List[Dict] = None) -> str:
    """Create an image record and return its id.

    The extracted entity and relation lists are stored as JSON arrays
    (``[]`` when falsy); the status column is set to ``'completed'``.
    """
    db = self.get_conn()
    stamp = datetime.now().isoformat()
    entities_json = json.dumps(extracted_entities or [])
    relations_json = json.dumps(extracted_relations or [])
    row_values = (
        image_id, project_id, filename, ocr_text, description,
        entities_json,
        relations_json,
        'completed', stamp, stamp,
    )
    db.execute(
        """INSERT INTO images
(id, project_id, filename, ocr_text, description,
extracted_entities, extracted_relations, status, created_at, updated_at)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""",
        row_values
    )
    db.commit()
    db.close()
    return image_id
def get_image(self, image_id: str) -> Optional[Dict]:
    """Return an image record with its JSON columns decoded, or None when absent."""
    db = self.get_conn()
    record = db.execute(
        "SELECT * FROM images WHERE id = ?", (image_id,)
    ).fetchone()
    db.close()
    if not record:
        return None
    image = dict(record)
    for column in ('extracted_entities', 'extracted_relations'):
        image[column] = json.loads(image[column]) if image[column] else []
    return image
def list_project_images(self, project_id: str) -> List[Dict]:
    """Return all images of a project, newest first, with JSON columns decoded."""
    db = self.get_conn()
    records = db.execute(
        "SELECT * FROM images WHERE project_id = ? ORDER BY created_at DESC",
        (project_id,)
    ).fetchall()
    db.close()

    def decode(record):
        image = dict(record)
        for column in ('extracted_entities', 'extracted_relations'):
            image[column] = json.loads(image[column]) if image[column] else []
        return image

    return [decode(record) for record in records]
def create_multimodal_mention(self, mention_id: str, project_id: str,
                              entity_id: str, modality: str, source_id: str,
                              source_type: str, text_snippet: str = "",
                              confidence: float = 1.0) -> str:
    """Create (or replace) a multimodal entity mention and return its id."""
    db = self.get_conn()
    created = datetime.now().isoformat()
    row_values = (
        mention_id, project_id, entity_id, modality, source_id,
        source_type, text_snippet, confidence, created,
    )
    db.execute(
        """INSERT OR REPLACE INTO multimodal_mentions
(id, project_id, entity_id, modality, source_id, source_type,
text_snippet, confidence, created_at)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)""",
        row_values
    )
    db.commit()
    db.close()
    return mention_id
def get_entity_multimodal_mentions(self, entity_id: str) -> List[Dict]:
    """Return an entity's multimodal mentions, newest first, with the entity name."""
    db = self.get_conn()
    records = db.execute(
        """SELECT m.*, e.name as entity_name
FROM multimodal_mentions m
JOIN entities e ON m.entity_id = e.id
WHERE m.entity_id = ? ORDER BY m.created_at DESC""",
        (entity_id,)
    ).fetchall()
    db.close()
    return list(map(dict, records))
def get_project_multimodal_mentions(self, project_id: str,
                                    modality: str = None) -> List[Dict]:
    """Return a project's multimodal mentions, newest first.

    When *modality* is given (and truthy) only mentions of that modality are
    returned. Each row carries the entity name as ``entity_name``.
    """
    if modality:
        mention_sql = """SELECT m.*, e.name as entity_name
FROM multimodal_mentions m
JOIN entities e ON m.entity_id = e.id
WHERE m.project_id = ? AND m.modality = ?
ORDER BY m.created_at DESC"""
        params = (project_id, modality)
    else:
        mention_sql = """SELECT m.*, e.name as entity_name
FROM multimodal_mentions m
JOIN entities e ON m.entity_id = e.id
WHERE m.project_id = ? ORDER BY m.created_at DESC"""
        params = (project_id,)
    db = self.get_conn()
    records = db.execute(mention_sql, params).fetchall()
    db.close()
    return list(map(dict, records))
def create_multimodal_entity_link(self, link_id: str, entity_id: str,
                                  linked_entity_id: str, link_type: str,
                                  confidence: float = 1.0,
                                  evidence: str = "",
                                  modalities: List[str] = None) -> str:
    """Create (or replace) a multimodal link between two entities; return its id.

    ``modalities`` is stored as a JSON array (``[]`` when falsy).
    """
    db = self.get_conn()
    created = datetime.now().isoformat()
    modalities_json = json.dumps(modalities or [])
    row_values = (
        link_id, entity_id, linked_entity_id, link_type, confidence,
        evidence, modalities_json, created,
    )
    db.execute(
        """INSERT OR REPLACE INTO multimodal_entity_links
(id, entity_id, linked_entity_id, link_type, confidence,
evidence, modalities, created_at)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)""",
        row_values
    )
    db.commit()
    db.close()
    return link_id
def get_entity_multimodal_links(self, entity_id: str) -> List[Dict]:
    """Return multimodal links in which the entity takes part on either side.

    Each row carries both entity names and has ``modalities`` decoded from
    JSON (``[]`` when unset).
    """
    db = self.get_conn()
    records = db.execute(
        """SELECT l.*, e1.name as entity_name, e2.name as linked_entity_name
FROM multimodal_entity_links l
JOIN entities e1 ON l.entity_id = e1.id
JOIN entities e2 ON l.linked_entity_id = e2.id
WHERE l.entity_id = ? OR l.linked_entity_id = ?""",
        (entity_id, entity_id)
    ).fetchall()
    db.close()

    def decode(record):
        link = dict(record)
        link['modalities'] = json.loads(link['modalities']) if link['modalities'] else []
        return link

    return [decode(record) for record in records]
def get_project_multimodal_stats(self, project_id: str) -> Dict:
    """Return multimodal statistics of a project.

    Returns:
        A dict with ``video_count``, ``image_count``,
        ``multimodal_entity_count`` (distinct entities that have multimodal
        mentions), ``cross_modal_links`` (links whose ``entity_id`` belongs
        to the project) and ``modality_distribution`` (mention count per
        modality).

    ``modality_distribution`` always contains ``audio``, ``video``, ``image``
    and ``document`` (0 when unused). Any other modality present in the data
    is reported as well; the counts come from one grouped query instead of
    one query per hard-coded modality. The connection is closed even when a
    query fails.
    """
    conn = self.get_conn()
    try:
        def count(sql):
            return conn.execute(sql, (project_id,)).fetchone()['count']

        stats = {
            'video_count': count("SELECT COUNT(*) as count FROM videos WHERE project_id = ?"),
            'image_count': count("SELECT COUNT(*) as count FROM images WHERE project_id = ?"),
            'multimodal_entity_count': count(
                "SELECT COUNT(DISTINCT entity_id) as count "
                "FROM multimodal_mentions WHERE project_id = ?"
            ),
            'cross_modal_links': count(
                "SELECT COUNT(*) as count FROM multimodal_entity_links "
                "WHERE entity_id IN (SELECT id FROM entities WHERE project_id = ?)"
            ),
        }
        # Seed the well-known modalities so callers can rely on these keys.
        distribution = {modality: 0 for modality in ('audio', 'video', 'image', 'document')}
        grouped = conn.execute(
            "SELECT modality, COUNT(*) as count FROM multimodal_mentions "
            "WHERE project_id = ? AND modality IS NOT NULL GROUP BY modality",
            (project_id,)
        ).fetchall()
        for row in grouped:
            distribution[row['modality']] = row['count']
        stats['modality_distribution'] = distribution
    finally:
        conn.close()
    return stats
# Process-wide DatabaseManager, created on first use by get_db_manager().
_db_manager = None


def get_db_manager() -> DatabaseManager:
    """Return the shared DatabaseManager, creating it on the first call."""
    global _db_manager
    if _db_manager is not None:
        return _db_manager
    _db_manager = DatabaseManager()
    return _db_manager