Files
insightflow/backend/db_manager.py
OpenClaw Bot 7b67f3756e Phase 5: 实体属性扩展功能
- 数据库层:
  - 新增 entity_attributes 表存储自定义属性
  - 新增 attribute_templates 表管理属性模板
  - 新增 attribute_history 表记录属性变更历史

- 后端 API:
  - GET/POST /api/v1/projects/{id}/attribute-templates - 属性模板管理
  - GET/POST/PUT/DELETE /api/v1/entities/{id}/attributes - 实体属性 CRUD
  - GET /api/v1/entities/{id}/attributes/history - 属性变更历史
  - GET /api/v1/projects/{id}/entities/search-by-attributes - 属性筛选搜索

- 前端 UI:
  - 实体详情面板添加属性展示
  - 属性编辑表单(支持文本、数字、日期、单选、多选)
  - 属性模板管理界面
  - 属性变更历史查看
  - 知识库实体卡片显示属性预览
  - 属性筛选搜索栏
2026-02-20 00:10:49 +08:00

1785 lines
64 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""
InsightFlow Database Manager - Phase 3
处理项目、实体、关系的持久化
支持文档类型和多文件融合
"""
import os
import json
import sqlite3
from datetime import datetime
from typing import List, Dict, Optional, Tuple
from dataclasses import dataclass
DB_PATH = os.getenv("DB_PATH", "/app/data/insightflow.db")
@dataclass
class Project:
id: str
name: str
description: str = ""
created_at: str = ""
updated_at: str = ""
@dataclass
class Entity:
id: str
project_id: str
name: str
type: str
definition: str = ""
canonical_name: str = ""
aliases: List[str] = None
attributes: Dict = None # Phase 5: 实体属性
def __post_init__(self):
if self.aliases is None:
self.aliases = []
if self.attributes is None:
self.attributes = {}
@dataclass
class AttributeTemplate:
"""属性模板定义"""
id: str
project_id: str
name: str
type: str # text, number, date, select, multiselect, boolean
options: List[str] = None # 用于 select/multiselect
default_value: str = ""
description: str = ""
is_required: bool = False
display_order: int = 0
created_at: str = ""
updated_at: str = ""
def __post_init__(self):
if self.options is None:
self.options = []
@dataclass
class EntityAttribute:
"""实体属性值"""
id: str
entity_id: str
template_id: str
value: str
template_name: str = "" # 关联查询时填充
template_type: str = "" # 关联查询时填充
created_at: str = ""
updated_at: str = ""
@dataclass
class AttributeHistory:
"""属性变更历史"""
id: str
entity_id: str
template_id: str
template_name: str = ""
old_value: str = ""
new_value: str = ""
changed_by: str = ""
changed_at: str = ""
change_reason: str = ""
@dataclass
class EntityMention:
id: str
entity_id: str
transcript_id: str
start_pos: int
end_pos: int
text_snippet: str
confidence: float = 1.0
class DatabaseManager:
def __init__(self, db_path: str = DB_PATH):
self.db_path = db_path
os.makedirs(os.path.dirname(db_path), exist_ok=True)
self.init_db()
def get_conn(self):
conn = sqlite3.connect(self.db_path)
conn.row_factory = sqlite3.Row
return conn
def init_db(self):
"""初始化数据库表"""
with open(os.path.join(os.path.dirname(__file__), 'schema.sql'), 'r') as f:
schema = f.read()
conn = self.get_conn()
conn.executescript(schema)
conn.commit()
conn.close()
# Project operations
def create_project(self, project_id: str, name: str, description: str = "") -> Project:
conn = self.get_conn()
now = datetime.now().isoformat()
conn.execute(
"INSERT INTO projects (id, name, description, created_at, updated_at) VALUES (?, ?, ?, ?, ?)",
(project_id, name, description, now, now)
)
conn.commit()
conn.close()
return Project(id=project_id, name=name, description=description, created_at=now, updated_at=now)
def get_project(self, project_id: str) -> Optional[Project]:
conn = self.get_conn()
row = conn.execute("SELECT * FROM projects WHERE id = ?", (project_id,)).fetchone()
conn.close()
if row:
return Project(**dict(row))
return None
def list_projects(self) -> List[Project]:
conn = self.get_conn()
rows = conn.execute("SELECT * FROM projects ORDER BY updated_at DESC").fetchall()
conn.close()
return [Project(**dict(r)) for r in rows]
# Entity operations
def create_entity(self, entity: Entity) -> Entity:
conn = self.get_conn()
conn.execute(
"""INSERT INTO entities (id, project_id, name, canonical_name, type, definition, aliases, created_at, updated_at)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)""",
(entity.id, entity.project_id, entity.name, entity.canonical_name, entity.type,
entity.definition, json.dumps(entity.aliases), datetime.now().isoformat(), datetime.now().isoformat())
)
conn.commit()
conn.close()
return entity
def get_entity_by_name(self, project_id: str, name: str) -> Optional[Entity]:
"""通过名称查找实体(用于对齐)"""
conn = self.get_conn()
row = conn.execute(
"SELECT * FROM entities WHERE project_id = ? AND (name = ? OR canonical_name = ? OR aliases LIKE ?)",
(project_id, name, name, f'%"{name}"%')
).fetchone()
conn.close()
if row:
data = dict(row)
data['aliases'] = json.loads(data['aliases']) if data['aliases'] else []
return Entity(**data)
return None
def find_similar_entities(self, project_id: str, name: str, threshold: float = 0.8) -> List[Entity]:
"""查找相似实体(简单实现,生产可用 embedding"""
# TODO: 使用 embedding 或模糊匹配
# 现在简单返回包含相同关键词的实体
conn = self.get_conn()
rows = conn.execute(
"SELECT * FROM entities WHERE project_id = ? AND name LIKE ?",
(project_id, f"%{name}%")
).fetchall()
conn.close()
entities = []
for row in rows:
data = dict(row)
data['aliases'] = json.loads(data['aliases']) if data['aliases'] else []
entities.append(Entity(**data))
return entities
def merge_entities(self, target_id: str, source_id: str) -> Entity:
"""合并两个实体(实体对齐)"""
conn = self.get_conn()
# 获取两个实体
target = conn.execute("SELECT * FROM entities WHERE id = ?", (target_id,)).fetchone()
source = conn.execute("SELECT * FROM entities WHERE id = ?", (source_id,)).fetchone()
if not target or not source:
conn.close()
raise ValueError("Entity not found")
# 合并别名
target_aliases = set(json.loads(target['aliases']) if target['aliases'] else [])
target_aliases.add(source['name'])
target_aliases.update(json.loads(source['aliases']) if source['aliases'] else [])
# 更新目标实体
conn.execute(
"UPDATE entities SET aliases = ?, updated_at = ? WHERE id = ?",
(json.dumps(list(target_aliases)), datetime.now().isoformat(), target_id)
)
# 更新提及记录
conn.execute(
"UPDATE entity_mentions SET entity_id = ? WHERE entity_id = ?",
(target_id, source_id)
)
# 更新关系 - source 作为 source_entity_id
conn.execute(
"UPDATE entity_relations SET source_entity_id = ? WHERE source_entity_id = ?",
(target_id, source_id)
)
# 更新关系 - source 作为 target_entity_id
conn.execute(
"UPDATE entity_relations SET target_entity_id = ? WHERE target_entity_id = ?",
(target_id, source_id)
)
# 删除源实体
conn.execute("DELETE FROM entities WHERE id = ?", (source_id,))
conn.commit()
conn.close()
return self.get_entity(target_id)
def get_entity(self, entity_id: str) -> Optional[Entity]:
conn = self.get_conn()
row = conn.execute("SELECT * FROM entities WHERE id = ?", (entity_id,)).fetchone()
conn.close()
if row:
data = dict(row)
data['aliases'] = json.loads(data['aliases']) if data['aliases'] else []
return Entity(**data)
return None
def list_project_entities(self, project_id: str) -> List[Entity]:
conn = self.get_conn()
rows = conn.execute(
"SELECT * FROM entities WHERE project_id = ? ORDER BY updated_at DESC",
(project_id,)
).fetchall()
conn.close()
entities = []
for row in rows:
data = dict(row)
data['aliases'] = json.loads(data['aliases']) if data['aliases'] else []
entities.append(Entity(**data))
return entities
# Mention operations
def add_mention(self, mention: EntityMention) -> EntityMention:
conn = self.get_conn()
conn.execute(
"""INSERT INTO entity_mentions (id, entity_id, transcript_id, start_pos, end_pos, text_snippet, confidence)
VALUES (?, ?, ?, ?, ?, ?, ?)""",
(mention.id, mention.entity_id, mention.transcript_id, mention.start_pos,
mention.end_pos, mention.text_snippet, mention.confidence)
)
conn.commit()
conn.close()
return mention
def get_entity_mentions(self, entity_id: str) -> List[EntityMention]:
conn = self.get_conn()
rows = conn.execute(
"SELECT * FROM entity_mentions WHERE entity_id = ? ORDER BY transcript_id, start_pos",
(entity_id,)
).fetchall()
conn.close()
return [EntityMention(**dict(r)) for r in rows]
# Transcript operations
def save_transcript(self, transcript_id: str, project_id: str, filename: str, full_text: str, transcript_type: str = "audio"):
"""保存转录记录"""
conn = self.get_conn()
now = datetime.now().isoformat()
conn.execute(
"INSERT INTO transcripts (id, project_id, filename, full_text, type, created_at) VALUES (?, ?, ?, ?, ?, ?)",
(transcript_id, project_id, filename, full_text, transcript_type, now)
)
conn.commit()
conn.close()
def get_transcript(self, transcript_id: str) -> Optional[dict]:
"""获取转录记录"""
conn = self.get_conn()
row = conn.execute("SELECT * FROM transcripts WHERE id = ?", (transcript_id,)).fetchone()
conn.close()
if row:
return dict(row)
return None
def list_project_transcripts(self, project_id: str) -> List[dict]:
"""列出项目的所有转录"""
conn = self.get_conn()
rows = conn.execute(
"SELECT * FROM transcripts WHERE project_id = ? ORDER BY created_at DESC",
(project_id,)
).fetchall()
conn.close()
return [dict(r) for r in rows]
# Relation operations
def create_relation(self, project_id: str, source_entity_id: str, target_entity_id: str,
relation_type: str = "related", evidence: str = "", transcript_id: str = ""):
"""创建实体关系"""
conn = self.get_conn()
relation_id = str(uuid.uuid4())[:8]
now = datetime.now().isoformat()
conn.execute(
"""INSERT INTO entity_relations
(id, project_id, source_entity_id, target_entity_id, relation_type, evidence, transcript_id, created_at)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)""",
(relation_id, project_id, source_entity_id, target_entity_id, relation_type, evidence, transcript_id, now)
)
conn.commit()
conn.close()
return relation_id
def get_entity_relations(self, entity_id: str) -> List[dict]:
"""获取实体的所有关系"""
conn = self.get_conn()
rows = conn.execute(
"""SELECT * FROM entity_relations
WHERE source_entity_id = ? OR target_entity_id = ?
ORDER BY created_at DESC""",
(entity_id, entity_id)
).fetchall()
conn.close()
return [dict(r) for r in rows]
def list_project_relations(self, project_id: str) -> List[dict]:
"""列出项目的所有关系"""
conn = self.get_conn()
rows = conn.execute(
"SELECT * FROM entity_relations WHERE project_id = ? ORDER BY created_at DESC",
(project_id,)
).fetchall()
conn.close()
return [dict(r) for r in rows]
def update_entity(self, entity_id: str, **kwargs) -> Entity:
"""更新实体信息"""
conn = self.get_conn()
# 构建更新字段
allowed_fields = ['name', 'type', 'definition', 'canonical_name']
updates = []
values = []
for field in allowed_fields:
if field in kwargs:
updates.append(f"{field} = ?")
values.append(kwargs[field])
# 处理别名
if 'aliases' in kwargs:
updates.append("aliases = ?")
values.append(json.dumps(kwargs['aliases']))
if not updates:
conn.close()
return self.get_entity(entity_id)
updates.append("updated_at = ?")
values.append(datetime.now().isoformat())
values.append(entity_id)
query = f"UPDATE entities SET {', '.join(updates)} WHERE id = ?"
conn.execute(query, values)
conn.commit()
conn.close()
return self.get_entity(entity_id)
def delete_entity(self, entity_id: str):
"""删除实体及其关联数据"""
conn = self.get_conn()
# 删除提及记录
conn.execute("DELETE FROM entity_mentions WHERE entity_id = ?", (entity_id,))
# 删除关系
conn.execute("DELETE FROM entity_relations WHERE source_entity_id = ? OR target_entity_id = ?",
(entity_id, entity_id))
# 删除实体
conn.execute("DELETE FROM entities WHERE id = ?", (entity_id,))
conn.commit()
conn.close()
def delete_relation(self, relation_id: str):
"""删除关系"""
conn = self.get_conn()
conn.execute("DELETE FROM entity_relations WHERE id = ?", (relation_id,))
conn.commit()
conn.close()
def update_relation(self, relation_id: str, **kwargs) -> dict:
"""更新关系"""
conn = self.get_conn()
allowed_fields = ['relation_type', 'evidence']
updates = []
values = []
for field in allowed_fields:
if field in kwargs:
updates.append(f"{field} = ?")
values.append(kwargs[field])
if updates:
query = f"UPDATE entity_relations SET {', '.join(updates)} WHERE id = ?"
values.append(relation_id)
conn.execute(query, values)
conn.commit()
row = conn.execute("SELECT * FROM entity_relations WHERE id = ?", (relation_id,)).fetchone()
conn.close()
return dict(row) if row else None
def update_transcript(self, transcript_id: str, full_text: str) -> dict:
"""更新转录文本"""
conn = self.get_conn()
now = datetime.now().isoformat()
conn.execute(
"UPDATE transcripts SET full_text = ?, updated_at = ? WHERE id = ?",
(full_text, now, transcript_id)
)
conn.commit()
row = conn.execute("SELECT * FROM transcripts WHERE id = ?", (transcript_id,)).fetchone()
conn.close()
return dict(row) if row else None
# Phase 3: Glossary operations
def add_glossary_term(self, project_id: str, term: str, pronunciation: str = "") -> str:
"""添加术语到术语表"""
conn = self.get_conn()
# 检查是否已存在
existing = conn.execute(
"SELECT * FROM glossary WHERE project_id = ? AND term = ?",
(project_id, term)
).fetchone()
if existing:
# 更新频率
conn.execute(
"UPDATE glossary SET frequency = frequency + 1 WHERE id = ?",
(existing['id'],)
)
conn.commit()
conn.close()
return existing['id']
term_id = str(uuid.uuid4())[:8]
conn.execute(
"INSERT INTO glossary (id, project_id, term, pronunciation, frequency) VALUES (?, ?, ?, ?, ?)",
(term_id, project_id, term, pronunciation, 1)
)
conn.commit()
conn.close()
return term_id
def list_glossary(self, project_id: str) -> List[dict]:
"""列出项目术语表"""
conn = self.get_conn()
rows = conn.execute(
"SELECT * FROM glossary WHERE project_id = ? ORDER BY frequency DESC",
(project_id,)
).fetchall()
conn.close()
return [dict(r) for r in rows]
def delete_glossary_term(self, term_id: str):
"""删除术语"""
conn = self.get_conn()
conn.execute("DELETE FROM glossary WHERE id = ?", (term_id,))
conn.commit()
conn.close()
# Phase 3: Get all entities for embedding
def get_all_entities_for_embedding(self, project_id: str) -> List[Entity]:
"""获取所有实体用于 embedding 计算"""
return self.list_project_entities(project_id)
# Phase 4: Agent & Provenance methods
def get_relation_with_details(self, relation_id: str) -> Optional[dict]:
"""获取关系详情,包含源文档信息"""
conn = self.get_conn()
row = conn.execute(
"""SELECT r.*,
s.name as source_name, t.name as target_name,
tr.filename as transcript_filename, tr.full_text as transcript_text
FROM entity_relations r
JOIN entities s ON r.source_entity_id = s.id
JOIN entities t ON r.target_entity_id = t.id
LEFT JOIN transcripts tr ON r.transcript_id = tr.id
WHERE r.id = ?""",
(relation_id,)
).fetchone()
conn.close()
if row:
return dict(row)
return None
def get_entity_with_mentions(self, entity_id: str) -> Optional[dict]:
"""获取实体详情及所有提及位置"""
conn = self.get_conn()
# 获取实体信息
entity_row = conn.execute(
"SELECT * FROM entities WHERE id = ?", (entity_id,)
).fetchone()
if not entity_row:
conn.close()
return None
entity = dict(entity_row)
entity['aliases'] = json.loads(entity['aliases']) if entity['aliases'] else []
# 获取提及位置
mentions = conn.execute(
"""SELECT m.*, t.filename, t.created_at as transcript_date
FROM entity_mentions m
JOIN transcripts t ON m.transcript_id = t.id
WHERE m.entity_id = ?
ORDER BY t.created_at, m.start_pos""",
(entity_id,)
).fetchall()
entity['mentions'] = [dict(m) for m in mentions]
entity['mention_count'] = len(mentions)
# 获取相关关系
relations = conn.execute(
"""SELECT r.*,
s.name as source_name, t.name as target_name
FROM entity_relations r
JOIN entities s ON r.source_entity_id = s.id
JOIN entities t ON r.target_entity_id = t.id
WHERE r.source_entity_id = ? OR r.target_entity_id = ?
ORDER BY r.created_at DESC""",
(entity_id, entity_id)
).fetchall()
entity['relations'] = [dict(r) for r in relations]
conn.close()
return entity
def search_entities(self, project_id: str, query: str) -> List[Entity]:
"""搜索实体"""
conn = self.get_conn()
rows = conn.execute(
"""SELECT * FROM entities
WHERE project_id = ? AND
(name LIKE ? OR definition LIKE ? OR aliases LIKE ?)
ORDER BY name""",
(project_id, f'%{query}%', f'%{query}%', f'%{query}%')
).fetchall()
conn.close()
entities = []
for row in rows:
data = dict(row)
data['aliases'] = json.loads(data['aliases']) if data['aliases'] else []
entities.append(Entity(**data))
return entities
def get_project_summary(self, project_id: str) -> dict:
"""获取项目摘要信息,用于 RAG 上下文"""
conn = self.get_conn()
# 项目基本信息
project = conn.execute(
"SELECT * FROM projects WHERE id = ?", (project_id,)
).fetchone()
# 统计信息
entity_count = conn.execute(
"SELECT COUNT(*) as count FROM entities WHERE project_id = ?",
(project_id,)
).fetchone()['count']
transcript_count = conn.execute(
"SELECT COUNT(*) as count FROM transcripts WHERE project_id = ?",
(project_id,)
).fetchone()['count']
relation_count = conn.execute(
"SELECT COUNT(*) as count FROM entity_relations WHERE project_id = ?",
(project_id,)
).fetchone()['count']
# 获取最近的转录文本片段
recent_transcripts = conn.execute(
"""SELECT filename, full_text, created_at
FROM transcripts
WHERE project_id = ?
ORDER BY created_at DESC
LIMIT 5""",
(project_id,)
).fetchall()
# 获取高频实体
top_entities = conn.execute(
"""SELECT e.name, e.type, e.definition, COUNT(m.id) as mention_count
FROM entities e
LEFT JOIN entity_mentions m ON e.id = m.entity_id
WHERE e.project_id = ?
GROUP BY e.id
ORDER BY mention_count DESC
LIMIT 10""",
(project_id,)
).fetchall()
conn.close()
return {
'project': dict(project) if project else {},
'statistics': {
'entity_count': entity_count,
'transcript_count': transcript_count,
'relation_count': relation_count
},
'recent_transcripts': [dict(t) for t in recent_transcripts],
'top_entities': [dict(e) for e in top_entities]
}
# Phase 5: Timeline operations
def get_project_timeline(self, project_id: str, entity_id: str = None, start_date: str = None, end_date: str = None) -> List[dict]:
"""获取项目时间线数据 - 按时间顺序的实体提及和事件"""
conn = self.get_conn()
# 构建查询条件
conditions = ["t.project_id = ?"]
params = [project_id]
if entity_id:
conditions.append("m.entity_id = ?")
params.append(entity_id)
if start_date:
conditions.append("t.created_at >= ?")
params.append(start_date)
if end_date:
conditions.append("t.created_at <= ?")
params.append(end_date)
where_clause = " AND ".join(conditions)
# 获取实体提及时间线
mentions = conn.execute(
f"""SELECT m.*, e.name as entity_name, e.type as entity_type, e.definition,
t.filename, t.created_at as event_date, t.type as source_type
FROM entity_mentions m
JOIN entities e ON m.entity_id = e.id
JOIN transcripts t ON m.transcript_id = t.id
WHERE {where_clause}
ORDER BY t.created_at, m.start_pos""",
params
).fetchall()
# 获取关系创建时间线
relation_conditions = ["r.project_id = ?"]
relation_params = [project_id]
if start_date:
relation_conditions.append("r.created_at >= ?")
relation_params.append(start_date)
if end_date:
relation_conditions.append("r.created_at <= ?")
relation_params.append(end_date)
relation_where = " AND ".join(relation_conditions)
relations = conn.execute(
f"""SELECT r.*,
s.name as source_name, t.name as target_name,
tr.filename, r.created_at as event_date
FROM entity_relations r
JOIN entities s ON r.source_entity_id = s.id
JOIN entities t ON r.target_entity_id = t.id
LEFT JOIN transcripts tr ON r.transcript_id = tr.id
WHERE {relation_where}
ORDER BY r.created_at""",
relation_params
).fetchall()
conn.close()
# 合并并格式化时间线事件
timeline_events = []
for m in mentions:
timeline_events.append({
'id': m['id'],
'type': 'mention',
'event_date': m['event_date'],
'entity_id': m['entity_id'],
'entity_name': m['entity_name'],
'entity_type': m['entity_type'],
'entity_definition': m['definition'],
'text_snippet': m['text_snippet'],
'confidence': m['confidence'],
'source': {
'transcript_id': m['transcript_id'],
'filename': m['filename'],
'type': m['source_type']
}
})
for r in relations:
timeline_events.append({
'id': r['id'],
'type': 'relation',
'event_date': r['event_date'],
'relation_type': r['relation_type'],
'source_entity': r['source_name'],
'target_entity': r['target_name'],
'evidence': r['evidence'],
'source': {
'transcript_id': r.get('transcript_id'),
'filename': r['filename']
}
})
# 按时间排序
timeline_events.sort(key=lambda x: x['event_date'])
return timeline_events
def get_entity_timeline_summary(self, project_id: str) -> dict:
"""获取项目实体时间线摘要统计"""
conn = self.get_conn()
# 按日期统计提及数量
daily_stats = conn.execute(
"""SELECT DATE(t.created_at) as date, COUNT(*) as count
FROM entity_mentions m
JOIN transcripts t ON m.transcript_id = t.id
WHERE t.project_id = ?
GROUP BY DATE(t.created_at)
ORDER BY date""",
(project_id,)
).fetchall()
# 按实体统计提及数量
entity_stats = conn.execute(
"""SELECT e.name, e.type, COUNT(m.id) as mention_count,
MIN(t.created_at) as first_mentioned,
MAX(t.created_at) as last_mentioned
FROM entities e
LEFT JOIN entity_mentions m ON e.id = m.entity_id
LEFT JOIN transcripts t ON m.transcript_id = t.id
WHERE e.project_id = ?
GROUP BY e.id
ORDER BY mention_count DESC
LIMIT 20""",
(project_id,)
).fetchall()
# 获取活跃时间段
active_periods = conn.execute(
"""SELECT
DATE(t.created_at) as date,
COUNT(DISTINCT m.entity_id) as active_entities,
COUNT(m.id) as total_mentions
FROM transcripts t
LEFT JOIN entity_mentions m ON t.id = m.transcript_id
WHERE t.project_id = ?
GROUP BY DATE(t.created_at)
ORDER BY date""",
(project_id,)
).fetchall()
conn.close()
return {
'daily_activity': [dict(d) for d in daily_stats],
'top_entities': [dict(e) for e in entity_stats],
'active_periods': [dict(a) for a in active_periods]
}
# Phase 5: Attribute Template operations
def create_attribute_template(self, project_id: str, template_data: dict) -> dict:
"""创建属性模板"""
conn = self.get_conn()
template_id = str(uuid.uuid4())[:8]
now = datetime.now().isoformat()
conn.execute(
"""INSERT INTO attribute_templates
(id, project_id, name, type, description, options, is_required, default_value, sort_order, created_at, updated_at)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""",
(template_id, project_id, template_data['name'], template_data['type'],
template_data.get('description', ''),
json.dumps(template_data.get('options', [])) if template_data.get('options') else None,
1 if template_data.get('is_required') else 0,
template_data.get('default_value'),
template_data.get('sort_order', 0),
now, now)
)
conn.commit()
conn.close()
return self.get_attribute_template(template_id)
def get_attribute_template(self, template_id: str) -> Optional[dict]:
"""获取属性模板详情"""
conn = self.get_conn()
row = conn.execute(
"SELECT * FROM attribute_templates WHERE id = ?",
(template_id,)
).fetchone()
conn.close()
if row:
data = dict(row)
if data.get('options'):
data['options'] = json.loads(data['options'])
return data
return None
def list_attribute_templates(self, project_id: str) -> List[dict]:
"""列出项目的所有属性模板"""
conn = self.get_conn()
rows = conn.execute(
"""SELECT * FROM attribute_templates
WHERE project_id = ?
ORDER BY sort_order, created_at""",
(project_id,)
).fetchall()
conn.close()
templates = []
for row in rows:
data = dict(row)
if data.get('options'):
data['options'] = json.loads(data['options'])
templates.append(data)
return templates
def update_attribute_template(self, template_id: str, template_data: dict) -> Optional[dict]:
"""更新属性模板"""
conn = self.get_conn()
now = datetime.now().isoformat()
allowed_fields = ['name', 'type', 'description', 'is_required', 'default_value', 'sort_order']
updates = []
values = []
for field in allowed_fields:
if field in template_data:
updates.append(f"{field} = ?")
if field == 'is_required':
values.append(1 if template_data[field] else 0)
else:
values.append(template_data[field])
if 'options' in template_data:
updates.append("options = ?")
values.append(json.dumps(template_data['options']) if template_data['options'] else None)
if not updates:
conn.close()
return self.get_attribute_template(template_id)
updates.append("updated_at = ?")
values.append(now)
values.append(template_id)
query = f"UPDATE attribute_templates SET {', '.join(updates)} WHERE id = ?"
conn.execute(query, values)
conn.commit()
conn.close()
return self.get_attribute_template(template_id)
def delete_attribute_template(self, template_id: str):
"""删除属性模板"""
conn = self.get_conn()
conn.execute("DELETE FROM attribute_templates WHERE id = ?", (template_id,))
conn.commit()
conn.close()
# Phase 5: Entity Attribute operations
def get_entity_attributes(self, entity_id: str) -> List[dict]:
"""获取实体的所有属性"""
conn = self.get_conn()
rows = conn.execute(
"""SELECT a.*, t.name as template_name, t.description as template_description
FROM entity_attributes a
LEFT JOIN attribute_templates t ON a.template_id = t.id
WHERE a.entity_id = ?
ORDER BY t.sort_order, a.created_at""",
(entity_id,)
).fetchall()
conn.close()
attributes = []
for row in rows:
data = dict(row)
if data.get('options'):
data['options'] = json.loads(data['options'])
# 解析 value 根据 type
if data['type'] == 'number' and data['value']:
try:
data['value'] = float(data['value'])
except:
pass
elif data['type'] == 'multiselect' and data['value']:
try:
data['value'] = json.loads(data['value'])
except:
pass
attributes.append(data)
return attributes
def get_entity_attribute(self, attribute_id: str) -> Optional[dict]:
"""获取单个属性详情"""
conn = self.get_conn()
row = conn.execute(
"SELECT * FROM entity_attributes WHERE id = ?",
(attribute_id,)
).fetchone()
conn.close()
if row:
data = dict(row)
if data.get('options'):
data['options'] = json.loads(data['options'])
return data
return None
def set_entity_attribute(self, entity_id: str, attr_data: dict, changed_by: str = "system") -> dict:
"""设置实体属性值(创建或更新)"""
conn = self.get_conn()
now = datetime.now().isoformat()
# 检查是否已存在
existing = conn.execute(
"SELECT * FROM entity_attributes WHERE entity_id = ? AND name = ?",
(entity_id, attr_data['name'])
).fetchone()
# 处理 value 存储
value = attr_data['value']
if attr_data['type'] == 'multiselect' and isinstance(value, list):
value = json.dumps(value)
elif value is not None:
value = str(value)
if existing:
# 记录历史
old_value = existing['value']
conn.execute(
"""INSERT INTO attribute_history
(id, entity_id, attribute_name, old_value, new_value, changed_by, changed_at, change_reason)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)""",
(str(uuid.uuid4())[:8], entity_id, attr_data['name'], old_value, value,
changed_by, now, attr_data.get('change_reason', ''))
)
# 更新属性
conn.execute(
"""UPDATE entity_attributes
SET value = ?, type = ?, options = ?, updated_at = ?
WHERE id = ?""",
(value, attr_data['type'],
json.dumps(attr_data.get('options', [])) if attr_data.get('options') else existing['options'],
now, existing['id'])
)
attribute_id = existing['id']
else:
# 创建新属性
attribute_id = str(uuid.uuid4())[:8]
conn.execute(
"""INSERT INTO entity_attributes
(id, entity_id, template_id, name, type, value, options, created_at, updated_at)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)""",
(attribute_id, entity_id, attr_data.get('template_id'),
attr_data['name'], attr_data['type'], value,
json.dumps(attr_data.get('options', [])) if attr_data.get('options') else None,
now, now)
)
# 记录历史
conn.execute(
"""INSERT INTO attribute_history
(id, entity_id, attribute_name, old_value, new_value, changed_by, changed_at, change_reason)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)""",
(str(uuid.uuid4())[:8], entity_id, attr_data['name'], None, value,
changed_by, now, attr_data.get('change_reason', '创建属性'))
)
conn.commit()
conn.close()
return self.get_entity_attribute(attribute_id)
def update_entity_attribute(self, attribute_id: str, attr_data: dict, changed_by: str = "system") -> Optional[dict]:
"""更新实体属性"""
conn = self.get_conn()
now = datetime.now().isoformat()
existing = conn.execute(
"SELECT * FROM entity_attributes WHERE id = ?",
(attribute_id,)
).fetchone()
if not existing:
conn.close()
return None
# 处理 value
value = attr_data.get('value')
if value is not None:
if attr_data.get('type', existing['type']) == 'multiselect' and isinstance(value, list):
value = json.dumps(value)
else:
value = str(value)
# 记录历史
conn.execute(
"""INSERT INTO attribute_history
(id, entity_id, attribute_name, old_value, new_value, changed_by, changed_at, change_reason)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)""",
(str(uuid.uuid4())[:8], existing['entity_id'], existing['name'],
existing['value'], value, changed_by, now,
attr_data.get('change_reason', ''))
)
# 更新字段
updates = []
values = []
if 'value' in attr_data:
updates.append("value = ?")
values.append(value)
if 'type' in attr_data:
updates.append("type = ?")
values.append(attr_data['type'])
if 'options' in attr_data:
updates.append("options = ?")
values.append(json.dumps(attr_data['options']) if attr_data['options'] else None)
if updates:
updates.append("updated_at = ?")
values.append(now)
values.append(attribute_id)
query = f"UPDATE entity_attributes SET {', '.join(updates)} WHERE id = ?"
conn.execute(query, values)
conn.commit()
conn.close()
return self.get_entity_attribute(attribute_id)
def delete_entity_attribute(self, attribute_id: str, changed_by: str = "system"):
"""删除实体属性"""
conn = self.get_conn()
existing = conn.execute(
"SELECT * FROM entity_attributes WHERE id = ?",
(attribute_id,)
).fetchone()
if existing:
# 记录历史
conn.execute(
"""INSERT INTO attribute_history
(id, entity_id, attribute_name, old_value, new_value, changed_by, changed_at, change_reason)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)""",
(str(uuid.uuid4())[:8], existing['entity_id'], existing['name'],
existing['value'], None, changed_by, datetime.now().isoformat(), '删除属性')
)
conn.execute("DELETE FROM entity_attributes WHERE id = ?", (attribute_id,))
conn.commit()
conn.close()
def get_attribute_history(self, entity_id: str, attribute_name: str = None) -> List[dict]:
"""获取属性变更历史"""
conn = self.get_conn()
if attribute_name:
rows = conn.execute(
"""SELECT * FROM attribute_history
WHERE entity_id = ? AND attribute_name = ?
ORDER BY changed_at DESC""",
(entity_id, attribute_name)
).fetchall()
else:
rows = conn.execute(
"""SELECT * FROM attribute_history
WHERE entity_id = ?
ORDER BY changed_at DESC""",
(entity_id,)
).fetchall()
conn.close()
return [dict(r) for r in rows]
def search_entities_by_attributes(self, project_id: str, filters: List[dict]) -> List[Entity]:
"""根据属性筛选搜索实体"""
conn = self.get_conn()
# 基础查询
base_query = "SELECT DISTINCT e.* FROM entities e"
where_conditions = ["e.project_id = ?"]
params = [project_id]
# 为每个过滤条件添加 JOIN
join_clauses = []
for i, f in enumerate(filters):
alias = f"a{i}"
join_clauses.append(
f"JOIN entity_attributes {alias} ON e.id = {alias}.entity_id AND {alias}.name = ?"
)
params.append(f['name'])
operator = f.get('operator', 'eq')
if operator == 'eq':
where_conditions.append(f"{alias}.value = ?")
params.append(str(f['value']))
elif operator == 'contains':
where_conditions.append(f"{alias}.value LIKE ?")
params.append(f"%{f['value']}%")
elif operator == 'gt':
where_conditions.append(f"CAST({alias}.value AS REAL) > ?")
params.append(float(f['value']))
elif operator == 'lt':
where_conditions.append(f"CAST({alias}.value AS REAL) < ?")
params.append(float(f['value']))
query = base_query + " " + " ".join(join_clauses) + " WHERE " + " AND ".join(where_conditions)
rows = conn.execute(query, params).fetchall()
conn.close()
entities = []
for row in rows:
data = dict(row)
data['aliases'] = json.loads(data['aliases']) if data['aliases'] else []
entities.append(Entity(**data))
return entities
def get_transcript_context(self, transcript_id: str, position: int, context_chars: int = 200) -> str:
"""获取转录文本的上下文"""
conn = self.get_conn()
row = conn.execute(
"SELECT full_text FROM transcripts WHERE id = ?",
(transcript_id,)
).fetchone()
conn.close()
if not row:
return ""
text = row['full_text']
start = max(0, position - context_chars)
end = min(len(text), position + context_chars)
return text[start:end]
# ==================== Phase 5: 实体属性管理 ====================
# ---- 属性模板管理 ----
def create_attribute_template(self, template: AttributeTemplate) -> AttributeTemplate:
"""创建属性模板"""
conn = self.get_conn()
now = datetime.now().isoformat()
conn.execute(
"""INSERT INTO attribute_templates
(id, project_id, name, type, options, default_value, description, is_required, display_order, created_at, updated_at)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""",
(template.id, template.project_id, template.name, template.type,
json.dumps(template.options) if template.options else None,
template.default_value, template.description, template.is_required,
template.display_order, now, now)
)
conn.commit()
conn.close()
return template
def get_attribute_template(self, template_id: str) -> Optional[AttributeTemplate]:
"""获取属性模板"""
conn = self.get_conn()
row = conn.execute(
"SELECT * FROM attribute_templates WHERE id = ?",
(template_id,)
).fetchone()
conn.close()
if row:
data = dict(row)
data['options'] = json.loads(data['options']) if data['options'] else []
return AttributeTemplate(**data)
return None
def list_attribute_templates(self, project_id: str) -> List[AttributeTemplate]:
"""列出项目的所有属性模板"""
conn = self.get_conn()
rows = conn.execute(
"""SELECT * FROM attribute_templates
WHERE project_id = ?
ORDER BY display_order, created_at""",
(project_id,)
).fetchall()
conn.close()
templates = []
for row in rows:
data = dict(row)
data['options'] = json.loads(data['options']) if data['options'] else []
templates.append(AttributeTemplate(**data))
return templates
def update_attribute_template(self, template_id: str, **kwargs) -> Optional[AttributeTemplate]:
"""更新属性模板"""
conn = self.get_conn()
allowed_fields = ['name', 'type', 'options', 'default_value',
'description', 'is_required', 'display_order']
updates = []
values = []
for field in allowed_fields:
if field in kwargs:
updates.append(f"{field} = ?")
if field == 'options':
values.append(json.dumps(kwargs[field]) if kwargs[field] else None)
else:
values.append(kwargs[field])
if not updates:
conn.close()
return self.get_attribute_template(template_id)
updates.append("updated_at = ?")
values.append(datetime.now().isoformat())
values.append(template_id)
query = f"UPDATE attribute_templates SET {', '.join(updates)} WHERE id = ?"
conn.execute(query, values)
conn.commit()
conn.close()
return self.get_attribute_template(template_id)
def delete_attribute_template(self, template_id: str):
"""删除属性模板(会级联删除相关属性值)"""
conn = self.get_conn()
conn.execute("DELETE FROM attribute_templates WHERE id = ?", (template_id,))
conn.commit()
conn.close()
# ---- 实体属性值管理 ----
def set_entity_attribute(self, attr: EntityAttribute,
changed_by: str = "system",
change_reason: str = "") -> EntityAttribute:
"""设置实体属性值,自动记录历史"""
conn = self.get_conn()
now = datetime.now().isoformat()
# 获取旧值
old_row = conn.execute(
"SELECT value FROM entity_attributes WHERE entity_id = ? AND template_id = ?",
(attr.entity_id, attr.template_id)
).fetchone()
old_value = old_row['value'] if old_row else None
# 记录变更历史
if old_value != attr.value:
conn.execute(
"""INSERT INTO attribute_history
(id, entity_id, template_id, old_value, new_value, changed_by, changed_at, change_reason)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)""",
(str(uuid.uuid4())[:8], attr.entity_id, attr.template_id,
old_value, attr.value, changed_by, now, change_reason)
)
# 插入或更新属性值
conn.execute(
"""INSERT OR REPLACE INTO entity_attributes
(id, entity_id, template_id, value, created_at, updated_at)
VALUES (
COALESCE((SELECT id FROM entity_attributes WHERE entity_id = ? AND template_id = ?), ?),
?, ?, ?,
COALESCE((SELECT created_at FROM entity_attributes WHERE entity_id = ? AND template_id = ?), ?),
?
)""",
(attr.entity_id, attr.template_id, attr.id,
attr.entity_id, attr.template_id, attr.value,
attr.entity_id, attr.template_id, now, now)
)
conn.commit()
conn.close()
return attr
def get_entity_attributes(self, entity_id: str) -> List[EntityAttribute]:
"""获取实体的所有属性值"""
conn = self.get_conn()
rows = conn.execute(
"""SELECT ea.*, at.name as template_name, at.type as template_type
FROM entity_attributes ea
JOIN attribute_templates at ON ea.template_id = at.id
WHERE ea.entity_id = ?
ORDER BY at.display_order""",
(entity_id,)
).fetchall()
conn.close()
return [EntityAttribute(**dict(r)) for r in rows]
def get_entity_with_attributes(self, entity_id: str) -> Optional[Entity]:
"""获取实体详情,包含属性"""
entity = self.get_entity(entity_id)
if not entity:
return None
# 获取属性
attrs = self.get_entity_attributes(entity_id)
entity.attributes = {
attr.template_name: {
'value': attr.value,
'type': attr.template_type,
'template_id': attr.template_id
}
for attr in attrs
}
return entity
def delete_entity_attribute(self, entity_id: str, template_id: str,
changed_by: str = "system", change_reason: str = ""):
"""删除实体属性值"""
conn = self.get_conn()
# 获取旧值
old_row = conn.execute(
"SELECT value FROM entity_attributes WHERE entity_id = ? AND template_id = ?",
(entity_id, template_id)
).fetchone()
if old_row:
# 记录删除历史
conn.execute(
"""INSERT INTO attribute_history
(id, entity_id, template_id, old_value, new_value, changed_by, changed_at, change_reason)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)""",
(str(uuid.uuid4())[:8], entity_id, template_id,
old_row['value'], None, changed_by, datetime.now().isoformat(),
change_reason or "属性删除")
)
# 删除属性
conn.execute(
"DELETE FROM entity_attributes WHERE entity_id = ? AND template_id = ?",
(entity_id, template_id)
)
conn.commit()
conn.close()
# ---- 属性历史管理 ----
def get_attribute_history(self, entity_id: str = None,
template_id: str = None,
limit: int = 50) -> List[AttributeHistory]:
"""获取属性变更历史"""
conn = self.get_conn()
conditions = []
params = []
if entity_id:
conditions.append("ah.entity_id = ?")
params.append(entity_id)
if template_id:
conditions.append("ah.template_id = ?")
params.append(template_id)
where_clause = " AND ".join(conditions) if conditions else "1=1"
rows = conn.execute(
f"""SELECT ah.*, at.name as template_name
FROM attribute_history ah
JOIN attribute_templates at ON ah.template_id = at.id
WHERE {where_clause}
ORDER BY ah.changed_at DESC
LIMIT ?""",
params + [limit]
).fetchall()
conn.close()
return [AttributeHistory(**dict(r)) for r in rows]
def search_entities_by_attributes(self, project_id: str,
attribute_filters: Dict[str, str]) -> List[Entity]:
"""根据属性筛选搜索实体"""
conn = self.get_conn()
# 获取项目所有实体
entities = self.list_project_entities(project_id)
if not attribute_filters:
return entities
# 获取所有实体的属性
entity_ids = [e.id for e in entities]
if not entity_ids:
return []
# 构建查询条件
placeholders = ','.join(['?' for _ in entity_ids])
rows = conn.execute(
f"""SELECT ea.*, at.name as template_name
FROM entity_attributes ea
JOIN attribute_templates at ON ea.template_id = at.id
WHERE ea.entity_id IN ({placeholders})""",
entity_ids
).fetchall()
conn.close()
# 按实体ID分组属性
entity_attrs = {}
for row in rows:
eid = row['entity_id']
if eid not in entity_attrs:
entity_attrs[eid] = {}
entity_attrs[eid][row['template_name']] = row['value']
# 过滤实体
filtered = []
for entity in entities:
attrs = entity_attrs.get(entity.id, {})
match = True
for attr_name, attr_value in attribute_filters.items():
if attrs.get(attr_name) != attr_value:
match = False
break
if match:
entity.attributes = attrs
filtered.append(entity)
return filtered
# Singleton instance
_db_manager = None
def get_db_manager() -> DatabaseManager:
global _db_manager
if _db_manager is None:
_db_manager = DatabaseManager()
return _db_manager
end = min(len(text), position + context_chars)
return text[start:end]
# ==================== Phase 5: 实体属性管理 ====================
# ---- 属性模板管理 ----
def create_attribute_template(self, template: AttributeTemplate) -> AttributeTemplate:
"""创建属性模板"""
conn = self.get_conn()
now = datetime.now().isoformat()
conn.execute(
"""INSERT INTO attribute_templates
(id, project_id, name, type, options, default_value, description, is_required, display_order, created_at, updated_at)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""",
(template.id, template.project_id, template.name, template.type,
json.dumps(template.options) if template.options else None,
template.default_value, template.description, template.is_required,
template.display_order, now, now)
)
conn.commit()
conn.close()
return template
def get_attribute_template(self, template_id: str) -> Optional[AttributeTemplate]:
"""获取属性模板"""
conn = self.get_conn()
row = conn.execute(
"SELECT * FROM attribute_templates WHERE id = ?",
(template_id,)
).fetchone()
conn.close()
if row:
data = dict(row)
data['options'] = json.loads(data['options']) if data['options'] else []
return AttributeTemplate(**data)
return None
def list_attribute_templates(self, project_id: str) -> List[AttributeTemplate]:
"""列出项目的所有属性模板"""
conn = self.get_conn()
rows = conn.execute(
"""SELECT * FROM attribute_templates
WHERE project_id = ?
ORDER BY display_order, created_at""",
(project_id,)
).fetchall()
conn.close()
templates = []
for row in rows:
data = dict(row)
data['options'] = json.loads(data['options']) if data['options'] else []
templates.append(AttributeTemplate(**data))
return templates
def update_attribute_template(self, template_id: str, **kwargs) -> Optional[AttributeTemplate]:
"""更新属性模板"""
conn = self.get_conn()
allowed_fields = ['name', 'type', 'options', 'default_value',
'description', 'is_required', 'display_order']
updates = []
values = []
for field in allowed_fields:
if field in kwargs:
updates.append(f"{field} = ?")
if field == 'options':
values.append(json.dumps(kwargs[field]) if kwargs[field] else None)
else:
values.append(kwargs[field])
if not updates:
conn.close()
return self.get_attribute_template(template_id)
updates.append("updated_at = ?")
values.append(datetime.now().isoformat())
values.append(template_id)
query = f"UPDATE attribute_templates SET {', '.join(updates)} WHERE id = ?"
conn.execute(query, values)
conn.commit()
conn.close()
return self.get_attribute_template(template_id)
def delete_attribute_template(self, template_id: str):
"""删除属性模板(会级联删除相关属性值)"""
conn = self.get_conn()
conn.execute("DELETE FROM attribute_templates WHERE id = ?", (template_id,))
conn.commit()
conn.close()
# ---- 实体属性值管理 ----
def set_entity_attribute(self, attr: EntityAttribute,
changed_by: str = "system",
change_reason: str = "") -> EntityAttribute:
"""设置实体属性值,自动记录历史"""
conn = self.get_conn()
now = datetime.now().isoformat()
# 获取旧值
old_row = conn.execute(
"SELECT value FROM entity_attributes WHERE entity_id = ? AND template_id = ?",
(attr.entity_id, attr.template_id)
).fetchone()
old_value = old_row['value'] if old_row else None
# 记录变更历史
if old_value != attr.value:
conn.execute(
"""INSERT INTO attribute_history
(id, entity_id, template_id, old_value, new_value, changed_by, changed_at, change_reason)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)""",
(str(uuid.uuid4())[:8], attr.entity_id, attr.template_id,
old_value, attr.value, changed_by, now, change_reason)
)
# 插入或更新属性值
conn.execute(
"""INSERT OR REPLACE INTO entity_attributes
(id, entity_id, template_id, value, created_at, updated_at)
VALUES (
COALESCE((SELECT id FROM entity_attributes WHERE entity_id = ? AND template_id = ?), ?),
?, ?, ?,
COALESCE((SELECT created_at FROM entity_attributes WHERE entity_id = ? AND template_id = ?), ?),
?
)""",
(attr.entity_id, attr.template_id, attr.id,
attr.entity_id, attr.template_id, attr.value,
attr.entity_id, attr.template_id, now, now)
)
conn.commit()
conn.close()
return attr
def get_entity_attributes(self, entity_id: str) -> List[EntityAttribute]:
"""获取实体的所有属性值"""
conn = self.get_conn()
rows = conn.execute(
"""SELECT ea.*, at.name as template_name, at.type as template_type
FROM entity_attributes ea
JOIN attribute_templates at ON ea.template_id = at.id
WHERE ea.entity_id = ?
ORDER BY at.display_order""",
(entity_id,)
).fetchall()
conn.close()
return [EntityAttribute(**dict(r)) for r in rows]
def get_entity_with_attributes(self, entity_id: str) -> Optional[Entity]:
"""获取实体详情,包含属性"""
entity = self.get_entity(entity_id)
if not entity:
return None
# 获取属性
attrs = self.get_entity_attributes(entity_id)
entity.attributes = {
attr.template_name: {
'value': attr.value,
'type': attr.template_type,
'template_id': attr.template_id
}
for attr in attrs
}
return entity
def delete_entity_attribute(self, entity_id: str, template_id: str,
changed_by: str = "system", change_reason: str = ""):
"""删除实体属性值"""
conn = self.get_conn()
# 获取旧值
old_row = conn.execute(
"SELECT value FROM entity_attributes WHERE entity_id = ? AND template_id = ?",
(entity_id, template_id)
).fetchone()
if old_row:
# 记录删除历史
conn.execute(
"""INSERT INTO attribute_history
(id, entity_id, template_id, old_value, new_value, changed_by, changed_at, change_reason)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)""",
(str(uuid.uuid4())[:8], entity_id, template_id,
old_row['value'], None, changed_by, datetime.now().isoformat(),
change_reason or "属性删除")
)
# 删除属性
conn.execute(
"DELETE FROM entity_attributes WHERE entity_id = ? AND template_id = ?",
(entity_id, template_id)
)
conn.commit()
conn.close()
# ---- 属性历史管理 ----
def get_attribute_history(self, entity_id: str = None,
template_id: str = None,
limit: int = 50) -> List[AttributeHistory]:
"""获取属性变更历史"""
conn = self.get_conn()
conditions = []
params = []
if entity_id:
conditions.append("ah.entity_id = ?")
params.append(entity_id)
if template_id:
conditions.append("ah.template_id = ?")
params.append(template_id)
where_clause = " AND ".join(conditions) if conditions else "1=1"
rows = conn.execute(
f"""SELECT ah.*, at.name as template_name
FROM attribute_history ah
JOIN attribute_templates at ON ah.template_id = at.id
WHERE {where_clause}
ORDER BY ah.changed_at DESC
LIMIT ?""",
params + [limit]
).fetchall()
conn.close()
return [AttributeHistory(**dict(r)) for r in rows]
def search_entities_by_attributes(self, project_id: str,
attribute_filters: Dict[str, str]) -> List[Entity]:
"""根据属性筛选搜索实体"""
conn = self.get_conn()
# 获取项目所有实体
entities = self.list_project_entities(project_id)
if not attribute_filters:
return entities
# 获取所有实体的属性
entity_ids = [e.id for e in entities]
if not entity_ids:
return []
# 构建查询条件
placeholders = ','.join(['?' for _ in entity_ids])
rows = conn.execute(
f"""SELECT ea.*, at.name as template_name
FROM entity_attributes ea
JOIN attribute_templates at ON ea.template_id = at.id
WHERE ea.entity_id IN ({placeholders})""",
entity_ids
).fetchall()
conn.close()
# 按实体ID分组属性
entity_attrs = {}
for row in rows:
eid = row['entity_id']
if eid not in entity_attrs:
entity_attrs[eid] = {}
entity_attrs[eid][row['template_name']] = row['value']
# 过滤实体
filtered = []
for entity in entities:
attrs = entity_attrs.get(entity.id, {})
match = True
for attr_name, attr_value in attribute_filters.items():
if attrs.get(attr_name) != attr_value:
match = False
break
if match:
entity.attributes = attrs
filtered.append(entity)
return filtered
# Singleton instance
_db_manager = None
def get_db_manager() -> DatabaseManager:
global _db_manager
if _db_manager is None:
_db_manager = DatabaseManager()
return _db_manager