Files
insightflow/backend/main.py
OpenClaw Bot c557cc52c4 Phase 7 Task 4: 协作与共享模块
- 创建 collaboration_manager.py 协作管理模块
  - CollaborationManager: 协作管理主类
  - 项目分享链接管理 - 支持只读/评论/编辑/管理员权限
  - 评论和批注系统 - 支持实体、关系、转录文本评论
  - 变更历史追踪 - 记录所有数据操作变更
  - 团队成员管理 - 支持多角色权限控制

- 更新 schema.sql 添加协作相关数据库表
  - project_shares: 项目分享表
  - comments: 评论表
  - change_history: 变更历史表
  - team_members: 团队成员表

- 更新 main.py 添加协作相关 API 端点
  - 项目分享相关端点
  - 评论和批注相关端点
  - 变更历史相关端点
  - 团队成员管理端点

- 更新 README.md 和 STATUS.md
2026-02-24 00:13:09 +08:00

3134 lines
100 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""
InsightFlow Backend - Phase 3 (Memory & Growth)
Knowledge Growth: Multi-file fusion + Entity Alignment + Document Import
ASR: 阿里云听悟 + OSS
"""
import os
import sys
import json
import httpx
import uuid
import re
import io
from fastapi import FastAPI, File, UploadFile, HTTPException, Form
from fastapi.middleware.cors import CORSMiddleware
from fastapi.staticfiles import StaticFiles
from pydantic import BaseModel
from typing import List, Optional, Union
from datetime import datetime
# Add backend directory to path for imports
backend_dir = os.path.dirname(os.path.abspath(__file__))
if backend_dir not in sys.path:
sys.path.insert(0, backend_dir)
# Import clients
try:
from oss_uploader import get_oss_uploader
OSS_AVAILABLE = True
except ImportError:
OSS_AVAILABLE = False
try:
from tingwu_client import TingwuClient
TINGWU_AVAILABLE = True
except ImportError:
TINGWU_AVAILABLE = False
try:
from db_manager import get_db_manager, Project, Entity, EntityMention
DB_AVAILABLE = True
except ImportError as e:
print(f"DB import error: {e}")
DB_AVAILABLE = False
try:
from document_processor import DocumentProcessor
DOC_PROCESSOR_AVAILABLE = True
except ImportError:
DOC_PROCESSOR_AVAILABLE = False
try:
from entity_aligner import EntityAligner
ALIGNER_AVAILABLE = True
except ImportError:
ALIGNER_AVAILABLE = False
try:
from llm_client import get_llm_client, ChatMessage
LLM_CLIENT_AVAILABLE = True
except ImportError:
LLM_CLIENT_AVAILABLE = False
try:
from knowledge_reasoner import get_knowledge_reasoner, KnowledgeReasoner, ReasoningType
REASONER_AVAILABLE = True
except ImportError:
REASONER_AVAILABLE = False
try:
from export_manager import get_export_manager, ExportEntity, ExportRelation, ExportTranscript
EXPORT_AVAILABLE = True
except ImportError:
EXPORT_AVAILABLE = False
try:
from neo4j_manager import get_neo4j_manager, sync_project_to_neo4j, NEO4J_AVAILABLE
except ImportError:
NEO4J_AVAILABLE = False
try:
from collaboration_manager import get_collaboration_manager, CollaborationManager
COLLABORATION_AVAILABLE = True
except ImportError as e:
print(f"Collaboration import error: {e}")
COLLABORATION_AVAILABLE = False
app = FastAPI(title="InsightFlow", version="0.3.0")
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# Models
class EntityModel(BaseModel):
id: str
name: str
type: str
definition: Optional[str] = ""
aliases: List[str] = []
class TranscriptSegment(BaseModel):
start: float
end: float
text: str
speaker: Optional[str] = "Speaker A"
class AnalysisResult(BaseModel):
transcript_id: str
project_id: str
segments: List[TranscriptSegment]
entities: List[EntityModel]
full_text: str
created_at: str
class ProjectCreate(BaseModel):
name: str
description: str = ""
class EntityUpdate(BaseModel):
name: Optional[str] = None
type: Optional[str] = None
definition: Optional[str] = None
aliases: Optional[List[str]] = None
class RelationCreate(BaseModel):
source_entity_id: str
target_entity_id: str
relation_type: str
evidence: Optional[str] = ""
class TranscriptUpdate(BaseModel):
full_text: str
class AgentQuery(BaseModel):
query: str
stream: bool = False
class AgentCommand(BaseModel):
command: str
class EntityMergeRequest(BaseModel):
source_entity_id: str
target_entity_id: str
class GlossaryTermCreate(BaseModel):
term: str
pronunciation: Optional[str] = ""
# Phase 7: 协作与共享 - 请求模型
class ShareLinkCreate(BaseModel):
permission: str = "read_only" # read_only, comment, edit, admin
expires_in_days: Optional[int] = None
max_uses: Optional[int] = None
password: Optional[str] = None
allow_download: bool = False
allow_export: bool = False
class ShareLinkVerify(BaseModel):
token: str
password: Optional[str] = None
class CommentCreate(BaseModel):
target_type: str # entity, relation, transcript, project
target_id: str
parent_id: Optional[str] = None
content: str
mentions: Optional[List[str]] = None
class CommentUpdate(BaseModel):
content: str
class CommentResolve(BaseModel):
resolved: bool
class TeamMemberInvite(BaseModel):
user_id: str
user_name: str
user_email: str
role: str = "viewer" # owner, admin, editor, viewer, commenter
class TeamMemberRoleUpdate(BaseModel):
role: str
# API Keys
KIMI_API_KEY = os.getenv("KIMI_API_KEY", "")
KIMI_BASE_URL = os.getenv("KIMI_BASE_URL", "https://api.kimi.com/coding")
# Phase 3: Entity Aligner singleton
_aligner = None
def get_aligner():
global _aligner
if _aligner is None and ALIGNER_AVAILABLE:
_aligner = EntityAligner()
return _aligner
# Phase 3: Document Processor singleton
_doc_processor = None
def get_doc_processor():
global _doc_processor
if _doc_processor is None and DOC_PROCESSOR_AVAILABLE:
_doc_processor = DocumentProcessor()
return _doc_processor
# Phase 7: Collaboration Manager singleton
_collaboration_manager = None
def get_collab_manager():
global _collaboration_manager
if _collaboration_manager is None and COLLABORATION_AVAILABLE:
db = get_db_manager() if DB_AVAILABLE else None
_collaboration_manager = get_collaboration_manager(db)
return _collaboration_manager
# Phase 2: Entity Edit API
@app.put("/api/v1/entities/{entity_id}")
async def update_entity(entity_id: str, update: EntityUpdate):
"""更新实体信息(名称、类型、定义、别名)"""
if not DB_AVAILABLE:
raise HTTPException(status_code=500, detail="Database not available")
db = get_db_manager()
entity = db.get_entity(entity_id)
if not entity:
raise HTTPException(status_code=404, detail="Entity not found")
# 更新字段
update_data = {k: v for k, v in update.dict().items() if v is not None}
updated = db.update_entity(entity_id, **update_data)
return {
"id": updated.id,
"name": updated.name,
"type": updated.type,
"definition": updated.definition,
"aliases": updated.aliases
}
@app.delete("/api/v1/entities/{entity_id}")
async def delete_entity(entity_id: str):
"""删除实体"""
if not DB_AVAILABLE:
raise HTTPException(status_code=500, detail="Database not available")
db = get_db_manager()
entity = db.get_entity(entity_id)
if not entity:
raise HTTPException(status_code=404, detail="Entity not found")
db.delete_entity(entity_id)
return {"success": True, "message": f"Entity {entity_id} deleted"}
@app.post("/api/v1/entities/{entity_id}/merge")
async def merge_entities_endpoint(entity_id: str, merge_req: EntityMergeRequest):
"""合并两个实体"""
if not DB_AVAILABLE:
raise HTTPException(status_code=500, detail="Database not available")
db = get_db_manager()
# 验证两个实体都存在
source = db.get_entity(merge_req.source_entity_id)
target = db.get_entity(merge_req.target_entity_id)
if not source or not target:
raise HTTPException(status_code=404, detail="Entity not found")
result = db.merge_entities(merge_req.target_entity_id, merge_req.source_entity_id)
return {
"success": True,
"merged_entity": {
"id": result.id,
"name": result.name,
"type": result.type,
"definition": result.definition,
"aliases": result.aliases
}
}
# Phase 2: Relation Edit API
@app.post("/api/v1/projects/{project_id}/relations")
async def create_relation_endpoint(project_id: str, relation: RelationCreate):
"""创建新的实体关系"""
if not DB_AVAILABLE:
raise HTTPException(status_code=500, detail="Database not available")
db = get_db_manager()
# 验证实体存在
source = db.get_entity(relation.source_entity_id)
target = db.get_entity(relation.target_entity_id)
if not source or not target:
raise HTTPException(status_code=404, detail="Source or target entity not found")
relation_id = db.create_relation(
project_id=project_id,
source_entity_id=relation.source_entity_id,
target_entity_id=relation.target_entity_id,
relation_type=relation.relation_type,
evidence=relation.evidence
)
return {
"id": relation_id,
"source_id": relation.source_entity_id,
"target_id": relation.target_entity_id,
"type": relation.relation_type,
"success": True
}
@app.delete("/api/v1/relations/{relation_id}")
async def delete_relation(relation_id: str):
"""删除关系"""
if not DB_AVAILABLE:
raise HTTPException(status_code=500, detail="Database not available")
db = get_db_manager()
db.delete_relation(relation_id)
return {"success": True, "message": f"Relation {relation_id} deleted"}
@app.put("/api/v1/relations/{relation_id}")
async def update_relation(relation_id: str, relation: RelationCreate):
"""更新关系"""
if not DB_AVAILABLE:
raise HTTPException(status_code=500, detail="Database not available")
db = get_db_manager()
updated = db.update_relation(
relation_id=relation_id,
relation_type=relation.relation_type,
evidence=relation.evidence
)
return {
"id": relation_id,
"type": updated["relation_type"],
"evidence": updated["evidence"],
"success": True
}
# Phase 2: Transcript Edit API
@app.get("/api/v1/transcripts/{transcript_id}")
async def get_transcript(transcript_id: str):
"""获取转录详情"""
if not DB_AVAILABLE:
raise HTTPException(status_code=500, detail="Database not available")
db = get_db_manager()
transcript = db.get_transcript(transcript_id)
if not transcript:
raise HTTPException(status_code=404, detail="Transcript not found")
return transcript
@app.put("/api/v1/transcripts/{transcript_id}")
async def update_transcript(transcript_id: str, update: TranscriptUpdate):
"""更新转录文本(人工修正)"""
if not DB_AVAILABLE:
raise HTTPException(status_code=500, detail="Database not available")
db = get_db_manager()
transcript = db.get_transcript(transcript_id)
if not transcript:
raise HTTPException(status_code=404, detail="Transcript not found")
updated = db.update_transcript(transcript_id, update.full_text)
return {
"id": transcript_id,
"full_text": updated["full_text"],
"updated_at": updated["updated_at"],
"success": True
}
# Phase 2: Manual Entity Creation
class ManualEntityCreate(BaseModel):
name: str
type: str = "OTHER"
definition: str = ""
transcript_id: Optional[str] = None
start_pos: Optional[int] = None
end_pos: Optional[int] = None
@app.post("/api/v1/projects/{project_id}/entities")
async def create_manual_entity(project_id: str, entity: ManualEntityCreate):
"""手动创建实体(划词新建)"""
if not DB_AVAILABLE:
raise HTTPException(status_code=500, detail="Database not available")
db = get_db_manager()
# 检查是否已存在
existing = db.get_entity_by_name(project_id, entity.name)
if existing:
return {
"id": existing.id,
"name": existing.name,
"type": existing.type,
"existed": True
}
entity_id = str(uuid.uuid4())[:8]
new_entity = db.create_entity(Entity(
id=entity_id,
project_id=project_id,
name=entity.name,
type=entity.type,
definition=entity.definition
))
# 如果有提及位置信息,保存提及
if entity.transcript_id and entity.start_pos is not None and entity.end_pos is not None:
transcript = db.get_transcript(entity.transcript_id)
if transcript:
text = transcript["full_text"]
mention = EntityMention(
id=str(uuid.uuid4())[:8],
entity_id=entity_id,
transcript_id=entity.transcript_id,
start_pos=entity.start_pos,
end_pos=entity.end_pos,
text_snippet=text[max(0, entity.start_pos-20):min(len(text), entity.end_pos+20)],
confidence=1.0
)
db.add_mention(mention)
return {
"id": new_entity.id,
"name": new_entity.name,
"type": new_entity.type,
"definition": new_entity.definition,
"success": True
}
def transcribe_audio(audio_data: bytes, filename: str) -> dict:
"""转录音频OSS上传 + 听悟转录"""
# 1. 上传 OSS
if not OSS_AVAILABLE:
print("OSS not available, using mock")
return mock_transcribe()
try:
uploader = get_oss_uploader()
audio_url, object_name = uploader.upload_audio(audio_data, filename)
print(f"Uploaded to OSS: {object_name}")
except Exception as e:
print(f"OSS upload failed: {e}")
return mock_transcribe()
# 2. 听悟转录
if not TINGWU_AVAILABLE:
print("Tingwu not available, using mock")
return mock_transcribe()
try:
client = TingwuClient()
result = client.transcribe(audio_url)
print(f"Transcription complete: {len(result['segments'])} segments")
return result
except Exception as e:
print(f"Tingwu failed: {e}")
return mock_transcribe()
def mock_transcribe() -> dict:
"""Mock 转录结果"""
return {
"full_text": "我们今天讨论 Project Alpha 的进度K8s 集群已经部署完成。",
"segments": [
{"start": 0.0, "end": 5.0, "text": "我们今天讨论 Project Alpha 的进度K8s 集群已经部署完成。", "speaker": "Speaker A"}
]
}
def extract_entities_with_llm(text: str) -> tuple[List[dict], List[dict]]:
"""使用 Kimi API 提取实体和关系
Returns:
(entities, relations): 实体列表和关系列表
"""
if not KIMI_API_KEY or not text:
return [], []
prompt = f"""从以下会议文本中提取关键实体和它们之间的关系,以 JSON 格式返回:
文本:{text[:3000]}
要求:
1. entities: 每个实体包含 name(名称), type(类型: PROJECT/TECH/PERSON/ORG/OTHER), definition(一句话定义)
2. relations: 每个关系包含 source(源实体名), target(目标实体名), type(关系类型: belongs_to/works_with/depends_on/mentions/related)
3. 只返回 JSON 对象,格式: {{"entities": [...], "relations": [...]}}
示例:
{{
"entities": [
{{"name": "Project Alpha", "type": "PROJECT", "definition": "核心项目"}},
{{"name": "K8s", "type": "TECH", "definition": "Kubernetes容器编排平台"}}
],
"relations": [
{{"source": "Project Alpha", "target": "K8s", "type": "depends_on"}}
]
}}
"""
try:
response = httpx.post(
f"{KIMI_BASE_URL}/v1/chat/completions",
headers={"Authorization": f"Bearer {KIMI_API_KEY}", "Content-Type": "application/json"},
json={"model": "k2p5", "messages": [{"role": "user", "content": prompt}], "temperature": 0.1},
timeout=60.0
)
response.raise_for_status()
result = response.json()
content = result["choices"][0]["message"]["content"]
import re
json_match = re.search(r'\{{.*?\}}', content, re.DOTALL)
if json_match:
data = json.loads(json_match.group())
return data.get("entities", []), data.get("relations", [])
except Exception as e:
print(f"LLM extraction failed: {e}")
return [], []
def align_entity(project_id: str, name: str, db, definition: str = "") -> Optional['Entity']:
"""实体对齐 - Phase 3: 使用 embedding 对齐"""
# 1. 首先尝试精确匹配
existing = db.get_entity_by_name(project_id, name)
if existing:
return existing
# 2. 使用 embedding 对齐(如果可用)
aligner = get_aligner()
if aligner:
similar = aligner.find_similar_entity(project_id, name, definition)
if similar:
return similar
# 3. 回退到简单相似度匹配
similar = db.find_similar_entities(project_id, name)
if similar:
return similar[0]
return None
# API Endpoints
@app.post("/api/v1/projects", response_model=dict)
async def create_project(project: ProjectCreate):
"""创建新项目"""
if not DB_AVAILABLE:
raise HTTPException(status_code=500, detail="Database not available")
db = get_db_manager()
project_id = str(uuid.uuid4())[:8]
p = db.create_project(project_id, project.name, project.description)
return {"id": p.id, "name": p.name, "description": p.description}
@app.get("/api/v1/projects")
async def list_projects():
"""列出所有项目"""
if not DB_AVAILABLE:
return []
db = get_db_manager()
projects = db.list_projects()
return [{"id": p.id, "name": p.name, "description": p.description} for p in projects]
@app.post("/api/v1/projects/{project_id}/upload", response_model=AnalysisResult)
async def upload_audio(project_id: str, file: UploadFile = File(...)):
"""上传音频到指定项目 - Phase 3: 支持多文件融合"""
if not DB_AVAILABLE:
raise HTTPException(status_code=500, detail="Database not available")
db = get_db_manager()
project = db.get_project(project_id)
if not project:
raise HTTPException(status_code=404, detail="Project not found")
content = await file.read()
# 转录
print(f"Processing: {file.filename}")
tw_result = transcribe_audio(content, file.filename)
# 提取实体和关系
print("Extracting entities and relations...")
raw_entities, raw_relations = extract_entities_with_llm(tw_result["full_text"])
# 保存转录记录
transcript_id = str(uuid.uuid4())[:8]
db.save_transcript(
transcript_id=transcript_id,
project_id=project_id,
filename=file.filename,
full_text=tw_result["full_text"]
)
# 实体对齐并保存 - Phase 3: 使用增强对齐
aligned_entities = []
entity_name_to_id = {} # 用于关系映射
for raw_ent in raw_entities:
existing = align_entity(project_id, raw_ent["name"], db, raw_ent.get("definition", ""))
if existing:
ent_model = EntityModel(
id=existing.id,
name=existing.name,
type=existing.type,
definition=existing.definition,
aliases=existing.aliases
)
entity_name_to_id[raw_ent["name"]] = existing.id
else:
new_ent = db.create_entity(Entity(
id=str(uuid.uuid4())[:8],
project_id=project_id,
name=raw_ent["name"],
type=raw_ent.get("type", "OTHER"),
definition=raw_ent.get("definition", "")
))
ent_model = EntityModel(
id=new_ent.id,
name=new_ent.name,
type=new_ent.type,
definition=new_ent.definition
)
entity_name_to_id[raw_ent["name"]] = new_ent.id
aligned_entities.append(ent_model)
# 保存实体提及位置
full_text = tw_result["full_text"]
name = raw_ent["name"]
start_pos = 0
while True:
pos = full_text.find(name, start_pos)
if pos == -1:
break
mention = EntityMention(
id=str(uuid.uuid4())[:8],
entity_id=entity_name_to_id[name],
transcript_id=transcript_id,
start_pos=pos,
end_pos=pos + len(name),
text_snippet=full_text[max(0, pos-20):min(len(full_text), pos+len(name)+20)],
confidence=1.0
)
db.add_mention(mention)
start_pos = pos + 1
# 保存关系
for rel in raw_relations:
source_id = entity_name_to_id.get(rel.get("source", ""))
target_id = entity_name_to_id.get(rel.get("target", ""))
if source_id and target_id:
db.create_relation(
project_id=project_id,
source_entity_id=source_id,
target_entity_id=target_id,
relation_type=rel.get("type", "related"),
evidence=tw_result["full_text"][:200],
transcript_id=transcript_id
)
# 构建片段
segments = [TranscriptSegment(**seg) for seg in tw_result["segments"]]
return AnalysisResult(
transcript_id=transcript_id,
project_id=project_id,
segments=segments,
entities=aligned_entities,
full_text=tw_result["full_text"],
created_at=datetime.now().isoformat()
)
# Phase 3: Document Upload API
@app.post("/api/v1/projects/{project_id}/upload-document")
async def upload_document(project_id: str, file: UploadFile = File(...)):
"""上传 PDF/DOCX 文档到指定项目"""
if not DB_AVAILABLE:
raise HTTPException(status_code=500, detail="Database not available")
if not DOC_PROCESSOR_AVAILABLE:
raise HTTPException(status_code=500, detail="Document processor not available")
db = get_db_manager()
project = db.get_project(project_id)
if not project:
raise HTTPException(status_code=404, detail="Project not found")
content = await file.read()
# 处理文档
processor = get_doc_processor()
try:
result = processor.process(content, file.filename)
except Exception as e:
raise HTTPException(status_code=400, detail=f"Document processing failed: {str(e)}")
# 保存文档转录记录
transcript_id = str(uuid.uuid4())[:8]
db.save_transcript(
transcript_id=transcript_id,
project_id=project_id,
filename=file.filename,
full_text=result["text"],
transcript_type="document"
)
# 提取实体和关系
raw_entities, raw_relations = extract_entities_with_llm(result["text"])
# 实体对齐并保存
aligned_entities = []
entity_name_to_id = {}
for raw_ent in raw_entities:
existing = align_entity(project_id, raw_ent["name"], db, raw_ent.get("definition", ""))
if existing:
entity_name_to_id[raw_ent["name"]] = existing.id
aligned_entities.append(EntityModel(
id=existing.id,
name=existing.name,
type=existing.type,
definition=existing.definition,
aliases=existing.aliases
))
else:
new_ent = db.create_entity(Entity(
id=str(uuid.uuid4())[:8],
project_id=project_id,
name=raw_ent["name"],
type=raw_ent.get("type", "OTHER"),
definition=raw_ent.get("definition", "")
))
entity_name_to_id[raw_ent["name"]] = new_ent.id
aligned_entities.append(EntityModel(
id=new_ent.id,
name=new_ent.name,
type=new_ent.type,
definition=new_ent.definition
))
# 保存实体提及位置
full_text = result["text"]
name = raw_ent["name"]
start_pos = 0
while True:
pos = full_text.find(name, start_pos)
if pos == -1:
break
mention = EntityMention(
id=str(uuid.uuid4())[:8],
entity_id=entity_name_to_id[name],
transcript_id=transcript_id,
start_pos=pos,
end_pos=pos + len(name),
text_snippet=full_text[max(0, pos-20):min(len(full_text), pos+len(name)+20)],
confidence=1.0
)
db.add_mention(mention)
start_pos = pos + 1
# 保存关系
for rel in raw_relations:
source_id = entity_name_to_id.get(rel.get("source", ""))
target_id = entity_name_to_id.get(rel.get("target", ""))
if source_id and target_id:
db.create_relation(
project_id=project_id,
source_entity_id=source_id,
target_entity_id=target_id,
relation_type=rel.get("type", "related"),
evidence=result["text"][:200],
transcript_id=transcript_id
)
return {
"transcript_id": transcript_id,
"project_id": project_id,
"filename": file.filename,
"text_length": len(result["text"]),
"entities": [e.dict() for e in aligned_entities],
"created_at": datetime.now().isoformat()
}
# Phase 3: Knowledge Base API
@app.get("/api/v1/projects/{project_id}/knowledge-base")
async def get_knowledge_base(project_id: str):
"""获取项目知识库 - 包含所有实体、关系、术语表"""
if not DB_AVAILABLE:
raise HTTPException(status_code=500, detail="Database not available")
db = get_db_manager()
project = db.get_project(project_id)
if not project:
raise HTTPException(status_code=404, detail="Project not found")
# 获取所有实体
entities = db.list_project_entities(project_id)
# 获取所有关系
relations = db.list_project_relations(project_id)
# 获取所有转录
transcripts = db.list_project_transcripts(project_id)
# 获取术语表
glossary = db.list_glossary(project_id)
# 构建实体统计和属性
entity_stats = {}
entity_attributes = {}
for ent in entities:
mentions = db.get_entity_mentions(ent.id)
entity_stats[ent.id] = {
"mention_count": len(mentions),
"transcript_ids": list(set([m.transcript_id for m in mentions]))
}
# Phase 5: 获取实体属性
attrs = db.get_entity_attributes(ent.id)
entity_attributes[ent.id] = attrs
# 构建实体名称映射
entity_map = {e.id: e.name for e in entities}
return {
"project": {
"id": project.id,
"name": project.name,
"description": project.description
},
"stats": {
"entity_count": len(entities),
"relation_count": len(relations),
"transcript_count": len(transcripts),
"glossary_count": len(glossary)
},
"entities": [
{
"id": e.id,
"name": e.name,
"type": e.type,
"definition": e.definition,
"aliases": e.aliases,
"mention_count": entity_stats.get(e.id, {}).get("mention_count", 0),
"appears_in": entity_stats.get(e.id, {}).get("transcript_ids", []),
"attributes": entity_attributes.get(e.id, []) # Phase 5: 包含属性
}
for e in entities
],
"relations": [
{
"id": r["id"],
"source_id": r["source_entity_id"],
"source_name": entity_map.get(r["source_entity_id"], "Unknown"),
"target_id": r["target_entity_id"],
"target_name": entity_map.get(r["target_entity_id"], "Unknown"),
"type": r["relation_type"],
"evidence": r["evidence"]
}
for r in relations
],
"glossary": [
{
"id": g["id"],
"term": g["term"],
"pronunciation": g["pronunciation"],
"frequency": g["frequency"]
}
for g in glossary
],
"transcripts": [
{
"id": t["id"],
"filename": t["filename"],
"type": t.get("type", "audio"),
"created_at": t["created_at"]
}
for t in transcripts
]
}
# Phase 3: Glossary API
@app.post("/api/v1/projects/{project_id}/glossary")
async def add_glossary_term(project_id: str, term: GlossaryTermCreate):
"""添加术语到项目术语表"""
if not DB_AVAILABLE:
raise HTTPException(status_code=500, detail="Database not available")
db = get_db_manager()
project = db.get_project(project_id)
if not project:
raise HTTPException(status_code=404, detail="Project not found")
term_id = db.add_glossary_term(
project_id=project_id,
term=term.term,
pronunciation=term.pronunciation
)
return {
"id": term_id,
"term": term.term,
"pronunciation": term.pronunciation,
"success": True
}
@app.get("/api/v1/projects/{project_id}/glossary")
async def get_glossary(project_id: str):
"""获取项目术语表"""
if not DB_AVAILABLE:
raise HTTPException(status_code=500, detail="Database not available")
db = get_db_manager()
glossary = db.list_glossary(project_id)
return glossary
@app.delete("/api/v1/glossary/{term_id}")
async def delete_glossary_term(term_id: str):
"""删除术语"""
if not DB_AVAILABLE:
raise HTTPException(status_code=500, detail="Database not available")
db = get_db_manager()
db.delete_glossary_term(term_id)
return {"success": True}
# Phase 3: Entity Alignment API
@app.post("/api/v1/projects/{project_id}/align-entities")
async def align_project_entities(project_id: str, threshold: float = 0.85):
"""运行实体对齐算法,合并相似实体"""
if not DB_AVAILABLE:
raise HTTPException(status_code=500, detail="Database not available")
aligner = get_aligner()
if not aligner:
raise HTTPException(status_code=500, detail="Entity aligner not available")
db = get_db_manager()
entities = db.list_project_entities(project_id)
merged_count = 0
merged_pairs = []
# 使用 embedding 对齐
for i, entity in enumerate(entities):
# 跳过已合并的实体
existing = db.get_entity(entity.id)
if not existing:
continue
similar = aligner.find_similar_entity(
project_id,
entity.name,
entity.definition,
exclude_id=entity.id,
threshold=threshold
)
if similar:
# 合并实体
db.merge_entities(similar.id, entity.id)
merged_count += 1
merged_pairs.append({
"source": entity.name,
"target": similar.name
})
return {
"success": True,
"merged_count": merged_count,
"merged_pairs": merged_pairs
}
@app.get("/api/v1/projects/{project_id}/entities")
async def get_project_entities(project_id: str):
"""获取项目的全局实体列表"""
if not DB_AVAILABLE:
return []
db = get_db_manager()
entities = db.list_project_entities(project_id)
return [{"id": e.id, "name": e.name, "type": e.type, "definition": e.definition, "aliases": e.aliases} for e in entities]
@app.get("/api/v1/projects/{project_id}/relations")
async def get_project_relations(project_id: str):
"""获取项目的实体关系列表"""
if not DB_AVAILABLE:
return []
db = get_db_manager()
relations = db.list_project_relations(project_id)
# 获取实体名称映射
entities = db.list_project_entities(project_id)
entity_map = {e.id: e.name for e in entities}
return [{
"id": r["id"],
"source_id": r["source_entity_id"],
"source_name": entity_map.get(r["source_entity_id"], "Unknown"),
"target_id": r["target_entity_id"],
"target_name": entity_map.get(r["target_entity_id"], "Unknown"),
"type": r["relation_type"],
"evidence": r["evidence"]
} for r in relations]
@app.get("/api/v1/projects/{project_id}/transcripts")
async def get_project_transcripts(project_id: str):
"""获取项目的转录列表"""
if not DB_AVAILABLE:
return []
db = get_db_manager()
transcripts = db.list_project_transcripts(project_id)
return [{
"id": t["id"],
"filename": t["filename"],
"type": t.get("type", "audio"),
"created_at": t["created_at"],
"preview": t["full_text"][:100] + "..." if len(t["full_text"]) > 100 else t["full_text"]
} for t in transcripts]
@app.get("/api/v1/entities/{entity_id}/mentions")
async def get_entity_mentions(entity_id: str):
"""获取实体的所有提及位置"""
if not DB_AVAILABLE:
return []
db = get_db_manager()
mentions = db.get_entity_mentions(entity_id)
return [{
"id": m.id,
"transcript_id": m.transcript_id,
"start_pos": m.start_pos,
"end_pos": m.end_pos,
"text_snippet": m.text_snippet,
"confidence": m.confidence
} for m in mentions]
# Health check
@app.get("/health")
async def health_check():
return {
"status": "ok",
"version": "0.6.0",
"phase": "Phase 5 - Knowledge Reasoning",
"oss_available": OSS_AVAILABLE,
"tingwu_available": TINGWU_AVAILABLE,
"db_available": DB_AVAILABLE,
"doc_processor_available": DOC_PROCESSOR_AVAILABLE,
"aligner_available": ALIGNER_AVAILABLE,
"llm_client_available": LLM_CLIENT_AVAILABLE,
"reasoner_available": REASONER_AVAILABLE
}
# ==================== Phase 4: Agent 助手 API ====================
@app.post("/api/v1/projects/{project_id}/agent/query")
async def agent_query(project_id: str, query: AgentQuery):
"""Agent RAG 问答"""
if not DB_AVAILABLE or not LLM_CLIENT_AVAILABLE:
raise HTTPException(status_code=500, detail="Service not available")
db = get_db_manager()
llm = get_llm_client()
project = db.get_project(project_id)
if not project:
raise HTTPException(status_code=404, detail="Project not found")
# 获取项目上下文
project_context = db.get_project_summary(project_id)
# 构建上下文
context_parts = []
for t in project_context.get('recent_transcripts', []):
context_parts.append(f"{t['filename']}\n{t['full_text'][:1000]}")
context = "\n\n".join(context_parts)
if query.stream:
from fastapi.responses import StreamingResponse
import json
async def stream_response():
messages = [
ChatMessage(role="system", content="你是一个专业的项目分析助手,擅长从会议记录中提取洞察。"),
ChatMessage(role="user", content=f"""基于以下项目信息回答问题:
## 项目信息
{json.dumps(project_context, ensure_ascii=False, indent=2)}
## 相关上下文
{context[:4000]}
## 用户问题
{query.query}
请用中文回答,保持简洁专业。如果信息不足,请明确说明。""")
]
async for chunk in llm.chat_stream(messages):
yield f"data: {json.dumps({'content': chunk})}\n\n"
yield "data: [DONE]\n\n"
return StreamingResponse(stream_response(), media_type="text/event-stream")
else:
answer = await llm.rag_query(query.query, context, project_context)
return {"answer": answer, "project_id": project_id}
@app.post("/api/v1/projects/{project_id}/agent/command")
async def agent_command(project_id: str, command: AgentCommand):
"""Agent 指令执行 - 解析并执行自然语言指令"""
if not DB_AVAILABLE or not LLM_CLIENT_AVAILABLE:
raise HTTPException(status_code=500, detail="Service not available")
db = get_db_manager()
llm = get_llm_client()
project = db.get_project(project_id)
if not project:
raise HTTPException(status_code=404, detail="Project not found")
# 获取项目上下文
project_context = db.get_project_summary(project_id)
# 解析指令
parsed = await llm.agent_command(command.command, project_context)
intent = parsed.get("intent", "unknown")
params = parsed.get("params", {})
result = {"intent": intent, "explanation": parsed.get("explanation", "")}
# 执行指令
if intent == "merge_entities":
# 合并实体
source_names = params.get("source_names", [])
target_name = params.get("target_name", "")
target_entity = None
source_entities = []
# 查找目标实体
for e in project_context.get("top_entities", []):
if e["name"] == target_name or target_name in e["name"]:
target_entity = db.get_entity_by_name(project_id, e["name"])
break
# 查找源实体
for name in source_names:
for e in project_context.get("top_entities", []):
if e["name"] == name or name in e["name"]:
ent = db.get_entity_by_name(project_id, e["name"])
if ent and (not target_entity or ent.id != target_entity.id):
source_entities.append(ent)
break
merged = []
if target_entity:
for source in source_entities:
try:
db.merge_entities(target_entity.id, source.id)
merged.append(source.name)
except Exception as e:
print(f"Merge failed: {e}")
result["action"] = "merge_entities"
result["target"] = target_entity.name if target_entity else None
result["merged"] = merged
result["success"] = len(merged) > 0
elif intent == "answer_question":
# 问答 - 调用 RAG
answer = await llm.rag_query(params.get("question", command.command), "", project_context)
result["action"] = "answer"
result["answer"] = answer
elif intent == "edit_entity":
# 编辑实体
entity_name = params.get("entity_name", "")
field = params.get("field", "")
value = params.get("value", "")
entity = db.get_entity_by_name(project_id, entity_name)
if entity:
updated = db.update_entity(entity.id, **{field: value})
result["action"] = "edit_entity"
result["entity"] = {"id": updated.id, "name": updated.name} if updated else None
result["success"] = updated is not None
else:
result["success"] = False
result["error"] = "Entity not found"
else:
result["action"] = "none"
result["message"] = "无法理解的指令,请尝试:\n- 合并实体:把所有'客户端'合并到'App'\n- 提问:张总对项目的态度如何?\n- 编辑:修改'K8s'的定义为..."
return result
@app.get("/api/v1/projects/{project_id}/agent/suggest")
async def agent_suggest(project_id: str):
"""获取 Agent 建议 - 基于项目数据提供洞察"""
if not DB_AVAILABLE or not LLM_CLIENT_AVAILABLE:
raise HTTPException(status_code=500, detail="Service not available")
db = get_db_manager()
llm = get_llm_client()
project_context = db.get_project_summary(project_id)
# 生成建议
prompt = f"""基于以下项目数据提供3-5条分析建议
{json.dumps(project_context, ensure_ascii=False, indent=2)}
请提供:
1. 数据洞察发现
2. 建议的操作(如合并相似实体、补充定义等)
3. 值得关注的关键信息
返回 JSON 格式:{{"suggestions": [{{"type": "insight|action", "title": "...", "description": "..."}}]}}"""
messages = [ChatMessage(role="user", content=prompt)]
content = await llm.chat(messages, temperature=0.3)
import re
json_match = re.search(r'\{{.*?\}}', content, re.DOTALL)
if json_match:
try:
data = json.loads(json_match.group())
return data
except:
pass
return {"suggestions": []}
# ==================== Phase 4: 知识溯源 API ====================
@app.get("/api/v1/relations/{relation_id}/provenance")
async def get_relation_provenance(relation_id: str):
"""获取关系的知识溯源信息"""
if not DB_AVAILABLE:
raise HTTPException(status_code=500, detail="Database not available")
db = get_db_manager()
relation = db.get_relation_with_details(relation_id)
if not relation:
raise HTTPException(status_code=404, detail="Relation not found")
return {
"relation_id": relation_id,
"source": relation.get("source_name"),
"target": relation.get("target_name"),
"type": relation.get("relation_type"),
"evidence": relation.get("evidence"),
"transcript": {
"id": relation.get("transcript_id"),
"filename": relation.get("transcript_filename"),
} if relation.get("transcript_id") else None
}
@app.get("/api/v1/entities/{entity_id}/details")
async def get_entity_details(entity_id: str):
"""获取实体详情,包含所有提及位置"""
if not DB_AVAILABLE:
raise HTTPException(status_code=500, detail="Database not available")
db = get_db_manager()
entity = db.get_entity_with_mentions(entity_id)
if not entity:
raise HTTPException(status_code=404, detail="Entity not found")
return entity
@app.get("/api/v1/entities/{entity_id}/evolution")
async def get_entity_evolution(entity_id: str):
"""分析实体的演变和态度变化"""
if not DB_AVAILABLE or not LLM_CLIENT_AVAILABLE:
raise HTTPException(status_code=500, detail="Service not available")
db = get_db_manager()
llm = get_llm_client()
entity = db.get_entity_with_mentions(entity_id)
if not entity:
raise HTTPException(status_code=404, detail="Entity not found")
# 分析演变
analysis = await llm.analyze_entity_evolution(entity["name"], entity.get("mentions", []))
return {
"entity_id": entity_id,
"entity_name": entity["name"],
"mention_count": entity.get("mention_count", 0),
"analysis": analysis,
"timeline": [
{
"date": m.get("transcript_date"),
"snippet": m.get("text_snippet"),
"transcript_id": m.get("transcript_id"),
"filename": m.get("filename")
}
for m in entity.get("mentions", [])
]
}
# ==================== Phase 4: 实体管理增强 API ====================
@app.get("/api/v1/projects/{project_id}/entities/search")
async def search_entities(project_id: str, q: str):
"""搜索实体"""
if not DB_AVAILABLE:
raise HTTPException(status_code=500, detail="Database not available")
db = get_db_manager()
entities = db.search_entities(project_id, q)
return [{"id": e.id, "name": e.name, "type": e.type, "definition": e.definition} for e in entities]
# ==================== Phase 5: 时间线视图 API ====================
@app.get("/api/v1/projects/{project_id}/timeline")
async def get_project_timeline(
project_id: str,
entity_id: str = None,
start_date: str = None,
end_date: str = None
):
"""获取项目时间线 - 按时间顺序的实体提及和关系事件"""
if not DB_AVAILABLE:
raise HTTPException(status_code=500, detail="Database not available")
db = get_db_manager()
project = db.get_project(project_id)
if not project:
raise HTTPException(status_code=404, detail="Project not found")
timeline = db.get_project_timeline(project_id, entity_id, start_date, end_date)
return {
"project_id": project_id,
"events": timeline,
"total_count": len(timeline)
}
@app.get("/api/v1/projects/{project_id}/timeline/summary")
async def get_timeline_summary(project_id: str):
"""获取项目时间线摘要统计"""
if not DB_AVAILABLE:
raise HTTPException(status_code=500, detail="Database not available")
db = get_db_manager()
project = db.get_project(project_id)
if not project:
raise HTTPException(status_code=404, detail="Project not found")
summary = db.get_entity_timeline_summary(project_id)
return {
"project_id": project_id,
"project_name": project.name,
**summary
}
@app.get("/api/v1/entities/{entity_id}/timeline")
async def get_entity_timeline(entity_id: str):
"""获取单个实体的时间线"""
if not DB_AVAILABLE:
raise HTTPException(status_code=500, detail="Database not available")
db = get_db_manager()
entity = db.get_entity(entity_id)
if not entity:
raise HTTPException(status_code=404, detail="Entity not found")
timeline = db.get_project_timeline(entity.project_id, entity_id)
return {
"entity_id": entity_id,
"entity_name": entity.name,
"entity_type": entity.type,
"events": timeline,
"total_count": len(timeline)
}
# ==================== Phase 5: 知识推理与问答增强 API ====================
class ReasoningQuery(BaseModel):
query: str
reasoning_depth: str = "medium" # shallow/medium/deep
stream: bool = False
@app.post("/api/v1/projects/{project_id}/reasoning/query")
async def reasoning_query(project_id: str, query: ReasoningQuery):
"""
增强问答 - 基于知识推理的智能问答
支持多种推理类型:
- 因果推理:分析原因和影响
- 对比推理:比较实体间的异同
- 时序推理:分析时间线和演变
- 关联推理:发现隐含关联
"""
if not DB_AVAILABLE or not REASONER_AVAILABLE:
raise HTTPException(status_code=500, detail="Knowledge reasoner not available")
db = get_db_manager()
reasoner = get_knowledge_reasoner()
project = db.get_project(project_id)
if not project:
raise HTTPException(status_code=404, detail="Project not found")
# 获取项目上下文
project_context = db.get_project_summary(project_id)
# 获取知识图谱数据
entities = db.list_project_entities(project_id)
relations = db.list_project_relations(project_id)
graph_data = {
"entities": [{"id": e.id, "name": e.name, "type": e.type, "definition": e.definition} for e in entities],
"relations": relations
}
# 执行增强问答
result = await reasoner.enhanced_qa(
query=query.query,
project_context=project_context,
graph_data=graph_data,
reasoning_depth=query.reasoning_depth
)
return {
"answer": result.answer,
"reasoning_type": result.reasoning_type.value,
"confidence": result.confidence,
"evidence": result.evidence,
"knowledge_gaps": result.gaps,
"project_id": project_id
}
@app.post("/api/v1/projects/{project_id}/reasoning/inference-path")
async def find_inference_path(
project_id: str,
start_entity: str,
end_entity: str
):
"""
发现两个实体之间的推理路径
在知识图谱中搜索从 start_entity 到 end_entity 的路径
"""
if not DB_AVAILABLE or not REASONER_AVAILABLE:
raise HTTPException(status_code=500, detail="Knowledge reasoner not available")
db = get_db_manager()
reasoner = get_knowledge_reasoner()
project = db.get_project(project_id)
if not project:
raise HTTPException(status_code=404, detail="Project not found")
# 获取知识图谱数据
entities = db.list_project_entities(project_id)
relations = db.list_project_relations(project_id)
graph_data = {
"entities": [{"id": e.id, "name": e.name, "type": e.type} for e in entities],
"relations": relations
}
# 查找推理路径
paths = reasoner.find_inference_paths(start_entity, end_entity, graph_data)
return {
"start_entity": start_entity,
"end_entity": end_entity,
"paths": [
{
"path": path.path,
"strength": path.strength,
"path_description": " -> ".join([p["entity"] for p in path.path])
}
for path in paths[:5] # 最多返回5条路径
],
"total_paths": len(paths)
}
class SummaryRequest(BaseModel):
summary_type: str = "comprehensive" # comprehensive/executive/technical/risk
@app.post("/api/v1/projects/{project_id}/reasoning/summary")
async def project_summary(project_id: str, req: SummaryRequest):
"""
项目智能总结
根据类型生成不同侧重点的总结:
- comprehensive: 全面总结
- executive: 高管摘要
- technical: 技术总结
- risk: 风险分析
"""
if not DB_AVAILABLE or not REASONER_AVAILABLE:
raise HTTPException(status_code=500, detail="Knowledge reasoner not available")
db = get_db_manager()
reasoner = get_knowledge_reasoner()
project = db.get_project(project_id)
if not project:
raise HTTPException(status_code=404, detail="Project not found")
# 获取项目上下文
project_context = db.get_project_summary(project_id)
# 获取知识图谱数据
entities = db.list_project_entities(project_id)
relations = db.list_project_relations(project_id)
graph_data = {
"entities": [{"id": e.id, "name": e.name, "type": e.type} for e in entities],
"relations": relations
}
# 生成总结
summary = await reasoner.summarize_project(
project_context=project_context,
graph_data=graph_data,
summary_type=req.summary_type
)
return {
"project_id": project_id,
"summary_type": req.summary_type,
**summary
**summary
}
# ==================== Phase 5: 实体属性扩展 API ====================
class AttributeTemplateCreate(BaseModel):
name: str
type: str # text, number, date, select, multiselect, boolean
options: Optional[List[str]] = None
default_value: Optional[str] = ""
description: Optional[str] = ""
is_required: bool = False
sort_order: int = 0
class AttributeTemplateUpdate(BaseModel):
name: Optional[str] = None
type: Optional[str] = None
options: Optional[List[str]] = None
default_value: Optional[str] = None
description: Optional[str] = None
is_required: Optional[bool] = None
sort_order: Optional[int] = None
class EntityAttributeSet(BaseModel):
name: str
type: str
value: Optional[Union[str, int, float, List[str]]] = None
template_id: Optional[str] = None
options: Optional[List[str]] = None
change_reason: Optional[str] = ""
class EntityAttributeBatchSet(BaseModel):
attributes: List[EntityAttributeSet]
change_reason: Optional[str] = ""
# 属性模板管理 API
@app.post("/api/v1/projects/{project_id}/attribute-templates")
async def create_attribute_template_endpoint(project_id: str, template: AttributeTemplateCreate):
"""创建属性模板"""
if not DB_AVAILABLE:
raise HTTPException(status_code=500, detail="Database not available")
from db_manager import AttributeTemplate
db = get_db_manager()
project = db.get_project(project_id)
if not project:
raise HTTPException(status_code=404, detail="Project not found")
new_template = AttributeTemplate(
id=str(uuid.uuid4())[:8],
project_id=project_id,
name=template.name,
type=template.type,
options=template.options or [],
default_value=template.default_value or "",
description=template.description or "",
is_required=template.is_required,
sort_order=template.sort_order
)
db.create_attribute_template(new_template)
return {
"id": new_template.id,
"name": new_template.name,
"type": new_template.type,
"success": True
}
@app.get("/api/v1/projects/{project_id}/attribute-templates")
async def list_attribute_templates_endpoint(project_id: str):
"""列出项目的所有属性模板"""
if not DB_AVAILABLE:
raise HTTPException(status_code=500, detail="Database not available")
db = get_db_manager()
templates = db.list_attribute_templates(project_id)
return [
{
"id": t.id,
"name": t.name,
"type": t.type,
"options": t.options,
"default_value": t.default_value,
"description": t.description,
"is_required": t.is_required,
"sort_order": t.sort_order
}
for t in templates
]
@app.get("/api/v1/attribute-templates/{template_id}")
async def get_attribute_template_endpoint(template_id: str):
"""获取属性模板详情"""
if not DB_AVAILABLE:
raise HTTPException(status_code=500, detail="Database not available")
db = get_db_manager()
template = db.get_attribute_template(template_id)
if not template:
raise HTTPException(status_code=404, detail="Template not found")
return {
"id": template.id,
"name": template.name,
"type": template.type,
"options": template.options,
"default_value": template.default_value,
"description": template.description,
"is_required": template.is_required,
"sort_order": template.sort_order
}
@app.put("/api/v1/attribute-templates/{template_id}")
async def update_attribute_template_endpoint(template_id: str, update: AttributeTemplateUpdate):
"""更新属性模板"""
if not DB_AVAILABLE:
raise HTTPException(status_code=500, detail="Database not available")
db = get_db_manager()
template = db.get_attribute_template(template_id)
if not template:
raise HTTPException(status_code=404, detail="Template not found")
update_data = {k: v for k, v in update.dict().items() if v is not None}
updated = db.update_attribute_template(template_id, **update_data)
return {
"id": updated.id,
"name": updated.name,
"type": updated.type,
"success": True
}
@app.delete("/api/v1/attribute-templates/{template_id}")
async def delete_attribute_template_endpoint(template_id: str):
"""删除属性模板"""
if not DB_AVAILABLE:
raise HTTPException(status_code=500, detail="Database not available")
db = get_db_manager()
db.delete_attribute_template(template_id)
return {"success": True, "message": f"Template {template_id} deleted"}
# 实体属性值管理 API
@app.post("/api/v1/entities/{entity_id}/attributes")
async def set_entity_attribute_endpoint(entity_id: str, attr: EntityAttributeSet):
"""设置实体属性值"""
if not DB_AVAILABLE:
raise HTTPException(status_code=500, detail="Database not available")
db = get_db_manager()
entity = db.get_entity(entity_id)
if not entity:
raise HTTPException(status_code=404, detail="Entity not found")
# 验证类型
valid_types = ['text', 'number', 'date', 'select', 'multiselect']
if attr.type not in valid_types:
raise HTTPException(status_code=400, detail=f"Invalid type. Must be one of: {valid_types}")
# 处理 value
value = attr.value
if attr.type == 'multiselect' and isinstance(value, list):
value = json.dumps(value)
elif value is not None:
value = str(value)
# 处理 options
options = attr.options
if options:
options = json.dumps(options)
# 检查是否已存在
conn = db.get_conn()
existing = conn.execute(
"SELECT * FROM entity_attributes WHERE entity_id = ? AND name = ?",
(entity_id, attr.name)
).fetchone()
now = datetime.now().isoformat()
if existing:
# 记录历史
conn.execute(
"""INSERT INTO attribute_history
(id, entity_id, attribute_name, old_value, new_value, changed_by, changed_at, change_reason)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)""",
(str(uuid.uuid4())[:8], entity_id, attr.name, existing['value'], value,
"user", now, attr.change_reason or "")
)
# 更新
conn.execute(
"""UPDATE entity_attributes
SET value = ?, type = ?, options = ?, updated_at = ?
WHERE id = ?""",
(value, attr.type, options, now, existing['id'])
)
attr_id = existing['id']
else:
# 创建
attr_id = str(uuid.uuid4())[:8]
conn.execute(
"""INSERT INTO entity_attributes
(id, entity_id, template_id, name, type, value, options, created_at, updated_at)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)""",
(attr_id, entity_id, attr.template_id, attr.name, attr.type, value, options, now, now)
)
# 记录历史
conn.execute(
"""INSERT INTO attribute_history
(id, entity_id, attribute_name, old_value, new_value, changed_by, changed_at, change_reason)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)""",
(str(uuid.uuid4())[:8], entity_id, attr.name, None, value,
"user", now, attr.change_reason or "创建属性")
)
conn.commit()
conn.close()
return {
"id": attr_id,
"entity_id": entity_id,
"name": attr.name,
"type": attr.type,
"value": attr.value,
"success": True
}
@app.post("/api/v1/entities/{entity_id}/attributes/batch")
async def batch_set_entity_attributes_endpoint(entity_id: str, batch: EntityAttributeBatchSet):
"""批量设置实体属性值"""
if not DB_AVAILABLE:
raise HTTPException(status_code=500, detail="Database not available")
from db_manager import EntityAttribute
db = get_db_manager()
entity = db.get_entity(entity_id)
if not entity:
raise HTTPException(status_code=404, detail="Entity not found")
results = []
for attr_data in batch.attributes:
template = db.get_attribute_template(attr_data.template_id)
if template:
new_attr = EntityAttribute(
id=str(uuid.uuid4())[:8],
entity_id=entity_id,
template_id=attr_data.template_id,
value=attr_data.value
)
db.set_entity_attribute(new_attr, changed_by="user",
change_reason=batch.change_reason or "批量更新")
results.append({
"template_id": attr_data.template_id,
"template_name": template.name,
"value": attr_data.value
})
return {
"entity_id": entity_id,
"updated_count": len(results),
"attributes": results,
"success": True
}
@app.get("/api/v1/entities/{entity_id}/attributes")
async def get_entity_attributes_endpoint(entity_id: str):
"""获取实体的所有属性值"""
if not DB_AVAILABLE:
raise HTTPException(status_code=500, detail="Database not available")
db = get_db_manager()
entity = db.get_entity(entity_id)
if not entity:
raise HTTPException(status_code=404, detail="Entity not found")
attrs = db.get_entity_attributes(entity_id)
return [
{
"id": a.id,
"template_id": a.template_id,
"template_name": a.template_name,
"template_type": a.template_type,
"value": a.value
}
for a in attrs
]
@app.delete("/api/v1/entities/{entity_id}/attributes/{template_id}")
async def delete_entity_attribute_endpoint(entity_id: str, template_id: str,
reason: Optional[str] = ""):
"""删除实体属性值"""
if not DB_AVAILABLE:
raise HTTPException(status_code=500, detail="Database not available")
db = get_db_manager()
db.delete_entity_attribute(entity_id, template_id,
changed_by="user", change_reason=reason)
return {"success": True, "message": "Attribute deleted"}
# 属性历史 API
@app.get("/api/v1/entities/{entity_id}/attributes/history")
async def get_entity_attribute_history_endpoint(entity_id: str, limit: int = 50):
"""获取实体的属性变更历史"""
if not DB_AVAILABLE:
raise HTTPException(status_code=500, detail="Database not available")
db = get_db_manager()
history = db.get_attribute_history(entity_id=entity_id, limit=limit)
return [
{
"id": h.id,
"attribute_name": h.attribute_name,
"old_value": h.old_value,
"new_value": h.new_value,
"changed_by": h.changed_by,
"changed_at": h.changed_at,
"change_reason": h.change_reason
}
for h in history
]
@app.get("/api/v1/attribute-templates/{template_id}/history")
async def get_template_history_endpoint(template_id: str, limit: int = 50):
"""获取属性模板的所有变更历史(跨实体)"""
if not DB_AVAILABLE:
raise HTTPException(status_code=500, detail="Database not available")
db = get_db_manager()
history = db.get_attribute_history(template_id=template_id, limit=limit)
return [
{
"id": h.id,
"entity_id": h.entity_id,
"template_name": h.template_name,
"old_value": h.old_value,
"new_value": h.new_value,
"changed_by": h.changed_by,
"changed_at": h.changed_at,
"change_reason": h.change_reason
}
for h in history
]
# 属性筛选搜索 API
@app.get("/api/v1/projects/{project_id}/entities/search-by-attributes")
async def search_entities_by_attributes_endpoint(
project_id: str,
attribute_filter: Optional[str] = None # JSON 格式: {"职位": "经理", "部门": "技术部"}
):
"""根据属性筛选搜索实体"""
if not DB_AVAILABLE:
raise HTTPException(status_code=500, detail="Database not available")
db = get_db_manager()
project = db.get_project(project_id)
if not project:
raise HTTPException(status_code=404, detail="Project not found")
filters = {}
if attribute_filter:
try:
filters = json.loads(attribute_filter)
except json.JSONDecodeError:
raise HTTPException(status_code=400, detail="Invalid attribute_filter JSON")
entities = db.search_entities_by_attributes(project_id, filters)
return [
{
"id": e.id,
"name": e.name,
"type": e.type,
"definition": e.definition,
"attributes": e.attributes
}
for e in entities
]
# ==================== 导出功能 API ====================
from fastapi.responses import StreamingResponse, FileResponse
@app.get("/api/v1/projects/{project_id}/export/graph-svg")
async def export_graph_svg_endpoint(project_id: str):
"""导出知识图谱为 SVG"""
if not DB_AVAILABLE or not EXPORT_AVAILABLE:
raise HTTPException(status_code=500, detail="Export functionality not available")
db = get_db_manager()
project = db.get_project(project_id)
if not project:
raise HTTPException(status_code=404, detail="Project not found")
# 获取项目数据
entities_data = db.get_project_entities(project_id)
relations_data = db.get_project_relations(project_id)
# 转换为导出格式
entities = []
for e in entities_data:
attrs = db.get_entity_attributes(e.id)
entities.append(ExportEntity(
id=e.id,
name=e.name,
type=e.type,
definition=e.definition or "",
aliases=json.loads(e.aliases) if e.aliases else [],
mention_count=e.mention_count,
attributes={a.template_name: a.value for a in attrs}
))
relations = []
for r in relations_data:
relations.append(ExportRelation(
id=r.id,
source=r.source_name,
target=r.target_name,
relation_type=r.relation_type,
confidence=r.confidence,
evidence=r.evidence or ""
))
export_mgr = get_export_manager()
svg_content = export_mgr.export_knowledge_graph_svg(project_id, entities, relations)
return StreamingResponse(
io.BytesIO(svg_content.encode('utf-8')),
media_type="image/svg+xml",
headers={"Content-Disposition": f"attachment; filename=insightflow-graph-{project_id}.svg"}
)
@app.get("/api/v1/projects/{project_id}/export/graph-png")
async def export_graph_png_endpoint(project_id: str):
"""导出知识图谱为 PNG"""
if not DB_AVAILABLE or not EXPORT_AVAILABLE:
raise HTTPException(status_code=500, detail="Export functionality not available")
db = get_db_manager()
project = db.get_project(project_id)
if not project:
raise HTTPException(status_code=404, detail="Project not found")
# 获取项目数据
entities_data = db.get_project_entities(project_id)
relations_data = db.get_project_relations(project_id)
# 转换为导出格式
entities = []
for e in entities_data:
attrs = db.get_entity_attributes(e.id)
entities.append(ExportEntity(
id=e.id,
name=e.name,
type=e.type,
definition=e.definition or "",
aliases=json.loads(e.aliases) if e.aliases else [],
mention_count=e.mention_count,
attributes={a.template_name: a.value for a in attrs}
))
relations = []
for r in relations_data:
relations.append(ExportRelation(
id=r.id,
source=r.source_name,
target=r.target_name,
relation_type=r.relation_type,
confidence=r.confidence,
evidence=r.evidence or ""
))
export_mgr = get_export_manager()
png_bytes = export_mgr.export_knowledge_graph_png(project_id, entities, relations)
return StreamingResponse(
io.BytesIO(png_bytes),
media_type="image/png",
headers={"Content-Disposition": f"attachment; filename=insightflow-graph-{project_id}.png"}
)
@app.get("/api/v1/projects/{project_id}/export/entities-excel")
async def export_entities_excel_endpoint(project_id: str):
"""导出实体数据为 Excel"""
if not DB_AVAILABLE or not EXPORT_AVAILABLE:
raise HTTPException(status_code=500, detail="Export functionality not available")
db = get_db_manager()
project = db.get_project(project_id)
if not project:
raise HTTPException(status_code=404, detail="Project not found")
# 获取实体数据
entities_data = db.get_project_entities(project_id)
entities = []
for e in entities_data:
attrs = db.get_entity_attributes(e.id)
entities.append(ExportEntity(
id=e.id,
name=e.name,
type=e.type,
definition=e.definition or "",
aliases=json.loads(e.aliases) if e.aliases else [],
mention_count=e.mention_count,
attributes={a.template_name: a.value for a in attrs}
))
export_mgr = get_export_manager()
excel_bytes = export_mgr.export_entities_excel(entities)
return StreamingResponse(
io.BytesIO(excel_bytes),
media_type="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
headers={"Content-Disposition": f"attachment; filename=insightflow-entities-{project_id}.xlsx"}
)
@app.get("/api/v1/projects/{project_id}/export/entities-csv")
async def export_entities_csv_endpoint(project_id: str):
"""导出实体数据为 CSV"""
if not DB_AVAILABLE or not EXPORT_AVAILABLE:
raise HTTPException(status_code=500, detail="Export functionality not available")
db = get_db_manager()
project = db.get_project(project_id)
if not project:
raise HTTPException(status_code=404, detail="Project not found")
# 获取实体数据
entities_data = db.get_project_entities(project_id)
entities = []
for e in entities_data:
attrs = db.get_entity_attributes(e.id)
entities.append(ExportEntity(
id=e.id,
name=e.name,
type=e.type,
definition=e.definition or "",
aliases=json.loads(e.aliases) if e.aliases else [],
mention_count=e.mention_count,
attributes={a.template_name: a.value for a in attrs}
))
export_mgr = get_export_manager()
csv_content = export_mgr.export_entities_csv(entities)
return StreamingResponse(
io.BytesIO(csv_content.encode('utf-8')),
media_type="text/csv",
headers={"Content-Disposition": f"attachment; filename=insightflow-entities-{project_id}.csv"}
)
@app.get("/api/v1/projects/{project_id}/export/relations-csv")
async def export_relations_csv_endpoint(project_id: str):
"""导出关系数据为 CSV"""
if not DB_AVAILABLE or not EXPORT_AVAILABLE:
raise HTTPException(status_code=500, detail="Export functionality not available")
db = get_db_manager()
project = db.get_project(project_id)
if not project:
raise HTTPException(status_code=404, detail="Project not found")
# 获取关系数据
relations_data = db.get_project_relations(project_id)
relations = []
for r in relations_data:
relations.append(ExportRelation(
id=r.id,
source=r.source_name,
target=r.target_name,
relation_type=r.relation_type,
confidence=r.confidence,
evidence=r.evidence or ""
))
export_mgr = get_export_manager()
csv_content = export_mgr.export_relations_csv(relations)
return StreamingResponse(
io.BytesIO(csv_content.encode('utf-8')),
media_type="text/csv",
headers={"Content-Disposition": f"attachment; filename=insightflow-relations-{project_id}.csv"}
)
@app.get("/api/v1/projects/{project_id}/export/report-pdf")
async def export_report_pdf_endpoint(project_id: str):
"""导出项目报告为 PDF"""
if not DB_AVAILABLE or not EXPORT_AVAILABLE:
raise HTTPException(status_code=500, detail="Export functionality not available")
db = get_db_manager()
project = db.get_project(project_id)
if not project:
raise HTTPException(status_code=404, detail="Project not found")
# 获取项目数据
entities_data = db.get_project_entities(project_id)
relations_data = db.get_project_relations(project_id)
transcripts_data = db.get_project_transcripts(project_id)
# 转换为导出格式
entities = []
for e in entities_data:
attrs = db.get_entity_attributes(e.id)
entities.append(ExportEntity(
id=e.id,
name=e.name,
type=e.type,
definition=e.definition or "",
aliases=json.loads(e.aliases) if e.aliases else [],
mention_count=e.mention_count,
attributes={a.template_name: a.value for a in attrs}
))
relations = []
for r in relations_data:
relations.append(ExportRelation(
id=r.id,
source=r.source_name,
target=r.target_name,
relation_type=r.relation_type,
confidence=r.confidence,
evidence=r.evidence or ""
))
transcripts = []
for t in transcripts_data:
segments = json.loads(t.segments) if t.segments else []
transcripts.append(ExportTranscript(
id=t.id,
name=t.name,
type=t.type,
content=t.full_text or "",
segments=segments,
entity_mentions=[]
))
# 获取项目总结
summary = ""
if REASONER_AVAILABLE:
try:
reasoner = get_knowledge_reasoner()
summary_result = reasoner.generate_project_summary(project_id, db)
summary = summary_result.get("summary", "")
except:
pass
export_mgr = get_export_manager()
pdf_bytes = export_mgr.export_project_report_pdf(
project_id, project.name, entities, relations, transcripts, summary
)
return StreamingResponse(
io.BytesIO(pdf_bytes),
media_type="application/pdf",
headers={"Content-Disposition": f"attachment; filename=insightflow-report-{project_id}.pdf"}
)
@app.get("/api/v1/projects/{project_id}/export/project-json")
async def export_project_json_endpoint(project_id: str):
"""导出完整项目数据为 JSON"""
if not DB_AVAILABLE or not EXPORT_AVAILABLE:
raise HTTPException(status_code=500, detail="Export functionality not available")
db = get_db_manager()
project = db.get_project(project_id)
if not project:
raise HTTPException(status_code=404, detail="Project not found")
# 获取项目数据
entities_data = db.get_project_entities(project_id)
relations_data = db.get_project_relations(project_id)
transcripts_data = db.get_project_transcripts(project_id)
# 转换为导出格式
entities = []
for e in entities_data:
attrs = db.get_entity_attributes(e.id)
entities.append(ExportEntity(
id=e.id,
name=e.name,
type=e.type,
definition=e.definition or "",
aliases=json.loads(e.aliases) if e.aliases else [],
mention_count=e.mention_count,
attributes={a.template_name: a.value for a in attrs}
))
relations = []
for r in relations_data:
relations.append(ExportRelation(
id=r.id,
source=r.source_name,
target=r.target_name,
relation_type=r.relation_type,
confidence=r.confidence,
evidence=r.evidence or ""
))
transcripts = []
for t in transcripts_data:
segments = json.loads(t.segments) if t.segments else []
transcripts.append(ExportTranscript(
id=t.id,
name=t.name,
type=t.type,
content=t.full_text or "",
segments=segments,
entity_mentions=[]
))
export_mgr = get_export_manager()
json_content = export_mgr.export_project_json(
project_id, project.name, entities, relations, transcripts
)
return StreamingResponse(
io.BytesIO(json_content.encode('utf-8')),
media_type="application/json",
headers={"Content-Disposition": f"attachment; filename=insightflow-project-{project_id}.json"}
)
@app.get("/api/v1/transcripts/{transcript_id}/export/markdown")
async def export_transcript_markdown_endpoint(transcript_id: str):
"""导出转录文本为 Markdown"""
if not DB_AVAILABLE or not EXPORT_AVAILABLE:
raise HTTPException(status_code=500, detail="Export functionality not available")
db = get_db_manager()
transcript = db.get_transcript(transcript_id)
if not transcript:
raise HTTPException(status_code=404, detail="Transcript not found")
# 获取实体提及
mentions = db.get_transcript_entity_mentions(transcript_id)
# 获取项目实体用于映射
entities_data = db.get_project_entities(transcript.project_id)
entities_map = {e.id: ExportEntity(
id=e.id,
name=e.name,
type=e.type,
definition=e.definition or "",
aliases=json.loads(e.aliases) if e.aliases else [],
mention_count=e.mention_count,
attributes={}
) for e in entities_data}
segments = json.loads(transcript.segments) if transcript.segments else []
export_transcript = ExportTranscript(
id=transcript.id,
name=transcript.name,
type=transcript.type,
content=transcript.full_text or "",
segments=segments,
entity_mentions=[{
"entity_id": m.entity_id,
"entity_name": m.entity_name,
"position": m.position,
"context": m.context
} for m in mentions]
)
export_mgr = get_export_manager()
markdown_content = export_mgr.export_transcript_markdown(export_transcript, entities_map)
return StreamingResponse(
io.BytesIO(markdown_content.encode('utf-8')),
media_type="text/markdown",
headers={"Content-Disposition": f"attachment; filename=insightflow-transcript-{transcript_id}.md"}
)
# ==================== Neo4j Graph Database API ====================
class Neo4jSyncRequest(BaseModel):
project_id: str
class PathQueryRequest(BaseModel):
source_entity_id: str
target_entity_id: str
max_depth: int = 10
class GraphQueryRequest(BaseModel):
entity_ids: List[str]
depth: int = 1
@app.get("/api/v1/neo4j/status")
async def neo4j_status():
"""获取 Neo4j 连接状态"""
if not NEO4J_AVAILABLE:
return {
"available": False,
"connected": False,
"message": "Neo4j driver not installed"
}
try:
manager = get_neo4j_manager()
connected = manager.is_connected()
return {
"available": True,
"connected": connected,
"uri": manager.uri if connected else None,
"message": "Connected" if connected else "Not connected"
}
except Exception as e:
return {
"available": True,
"connected": False,
"message": str(e)
}
@app.post("/api/v1/neo4j/sync")
async def neo4j_sync_project(request: Neo4jSyncRequest):
"""同步项目数据到 Neo4j"""
if not NEO4J_AVAILABLE:
raise HTTPException(status_code=503, detail="Neo4j not available")
if not DB_AVAILABLE:
raise HTTPException(status_code=500, detail="Database not available")
manager = get_neo4j_manager()
if not manager.is_connected():
raise HTTPException(status_code=503, detail="Neo4j not connected")
db = get_db_manager()
project = db.get_project(request.project_id)
if not project:
raise HTTPException(status_code=404, detail="Project not found")
# 获取项目所有实体
entities = db.get_project_entities(request.project_id)
entities_data = []
for e in entities:
entities_data.append({
"id": e.id,
"name": e.name,
"type": e.type,
"definition": e.definition,
"aliases": json.loads(e.aliases) if e.aliases else [],
"properties": e.attributes if hasattr(e, 'attributes') else {}
})
# 获取项目所有关系
relations = db.get_project_relations(request.project_id)
relations_data = []
for r in relations:
relations_data.append({
"id": r.id,
"source_entity_id": r.source_entity_id,
"target_entity_id": r.target_entity_id,
"relation_type": r.relation_type,
"evidence": r.evidence,
"properties": {}
})
# 同步到 Neo4j
sync_project_to_neo4j(
project_id=request.project_id,
project_name=project.name,
entities=entities_data,
relations=relations_data
)
return {
"success": True,
"project_id": request.project_id,
"entities_synced": len(entities_data),
"relations_synced": len(relations_data),
"message": f"Synced {len(entities_data)} entities and {len(relations_data)} relations to Neo4j"
}
@app.get("/api/v1/projects/{project_id}/graph/stats")
async def get_graph_stats(project_id: str):
"""获取项目图统计信息"""
if not NEO4J_AVAILABLE:
raise HTTPException(status_code=503, detail="Neo4j not available")
manager = get_neo4j_manager()
if not manager.is_connected():
raise HTTPException(status_code=503, detail="Neo4j not connected")
stats = manager.get_graph_stats(project_id)
return stats
@app.post("/api/v1/graph/shortest-path")
async def find_shortest_path(request: PathQueryRequest):
"""查找两个实体之间的最短路径"""
if not NEO4J_AVAILABLE:
raise HTTPException(status_code=503, detail="Neo4j not available")
manager = get_neo4j_manager()
if not manager.is_connected():
raise HTTPException(status_code=503, detail="Neo4j not connected")
path = manager.find_shortest_path(
request.source_entity_id,
request.target_entity_id,
request.max_depth
)
if not path:
return {
"found": False,
"message": "No path found between entities"
}
return {
"found": True,
"path": {
"nodes": path.nodes,
"relationships": path.relationships,
"length": path.length
}
}
@app.post("/api/v1/graph/paths")
async def find_all_paths(request: PathQueryRequest):
"""查找两个实体之间的所有路径"""
if not NEO4J_AVAILABLE:
raise HTTPException(status_code=503, detail="Neo4j not available")
manager = get_neo4j_manager()
if not manager.is_connected():
raise HTTPException(status_code=503, detail="Neo4j not connected")
paths = manager.find_all_paths(
request.source_entity_id,
request.target_entity_id,
request.max_depth
)
return {
"count": len(paths),
"paths": [
{
"nodes": p.nodes,
"relationships": p.relationships,
"length": p.length
}
for p in paths
]
}
@app.get("/api/v1/entities/{entity_id}/neighbors")
async def get_entity_neighbors(
entity_id: str,
relation_type: str = None,
limit: int = 50
):
"""获取实体的邻居节点"""
if not NEO4J_AVAILABLE:
raise HTTPException(status_code=503, detail="Neo4j not available")
manager = get_neo4j_manager()
if not manager.is_connected():
raise HTTPException(status_code=503, detail="Neo4j not connected")
neighbors = manager.find_neighbors(entity_id, relation_type, limit)
return {
"entity_id": entity_id,
"count": len(neighbors),
"neighbors": neighbors
}
@app.get("/api/v1/entities/{entity_id1}/common-neighbors/{entity_id2}")
async def get_common_neighbors(entity_id1: str, entity_id2: str):
"""获取两个实体的共同邻居"""
if not NEO4J_AVAILABLE:
raise HTTPException(status_code=503, detail="Neo4j not available")
manager = get_neo4j_manager()
if not manager.is_connected():
raise HTTPException(status_code=503, detail="Neo4j not connected")
common = manager.find_common_neighbors(entity_id1, entity_id2)
return {
"entity_id1": entity_id1,
"entity_id2": entity_id2,
"count": len(common),
"common_neighbors": common
}
@app.get("/api/v1/projects/{project_id}/graph/centrality")
async def get_centrality_analysis(
project_id: str,
metric: str = "degree"
):
"""获取中心性分析结果"""
if not NEO4J_AVAILABLE:
raise HTTPException(status_code=503, detail="Neo4j not available")
manager = get_neo4j_manager()
if not manager.is_connected():
raise HTTPException(status_code=503, detail="Neo4j not connected")
rankings = manager.find_central_entities(project_id, metric)
return {
"metric": metric,
"count": len(rankings),
"rankings": [
{
"entity_id": r.entity_id,
"entity_name": r.entity_name,
"score": r.score,
"rank": r.rank
}
for r in rankings
]
}
@app.get("/api/v1/projects/{project_id}/graph/communities")
async def get_communities(project_id: str):
"""获取社区发现结果"""
if not NEO4J_AVAILABLE:
raise HTTPException(status_code=503, detail="Neo4j not available")
manager = get_neo4j_manager()
if not manager.is_connected():
raise HTTPException(status_code=503, detail="Neo4j not connected")
communities = manager.detect_communities(project_id)
return {
"count": len(communities),
"communities": [
{
"community_id": c.community_id,
"size": c.size,
"density": c.density,
"nodes": c.nodes
}
for c in communities
]
}
@app.post("/api/v1/graph/subgraph")
async def get_subgraph(request: GraphQueryRequest):
"""获取子图"""
if not NEO4J_AVAILABLE:
raise HTTPException(status_code=503, detail="Neo4j not available")
manager = get_neo4j_manager()
if not manager.is_connected():
raise HTTPException(status_code=503, detail="Neo4j not connected")
subgraph = manager.get_subgraph(request.entity_ids, request.depth)
return subgraph
# ==========================================
# Phase 7: 协作与共享 API
# ==========================================
# ----- 项目分享 -----
@app.post("/api/v1/projects/{project_id}/shares")
async def create_share_link(project_id: str, request: ShareLinkCreate, created_by: str = "current_user"):
"""创建项目分享链接"""
if not COLLABORATION_AVAILABLE:
raise HTTPException(status_code=503, detail="Collaboration module not available")
manager = get_collab_manager()
share = manager.create_share_link(
project_id=project_id,
created_by=created_by,
permission=request.permission,
expires_in_days=request.expires_in_days,
max_uses=request.max_uses,
password=request.password,
allow_download=request.allow_download,
allow_export=request.allow_export
)
return {
"id": share.id,
"token": share.token,
"permission": share.permission,
"created_at": share.created_at,
"expires_at": share.expires_at,
"max_uses": share.max_uses,
"share_url": f"/share/{share.token}"
}
@app.get("/api/v1/projects/{project_id}/shares")
async def list_project_shares(project_id: str):
"""列出项目的所有分享链接"""
if not COLLABORATION_AVAILABLE:
raise HTTPException(status_code=503, detail="Collaboration module not available")
manager = get_collab_manager()
shares = manager.list_project_shares(project_id)
return {
"shares": [
{
"id": s.id,
"token": s.token,
"permission": s.permission,
"created_at": s.created_at,
"expires_at": s.expires_at,
"use_count": s.use_count,
"max_uses": s.max_uses,
"is_active": s.is_active,
"has_password": s.password_hash is not None,
"allow_download": s.allow_download,
"allow_export": s.allow_export
}
for s in shares
]
}
@app.post("/api/v1/shares/verify")
async def verify_share_link(request: ShareLinkVerify):
"""验证分享链接"""
if not COLLABORATION_AVAILABLE:
raise HTTPException(status_code=503, detail="Collaboration module not available")
manager = get_collab_manager()
share = manager.validate_share_token(request.token, request.password)
if not share:
raise HTTPException(status_code=401, detail="Invalid or expired share link")
# 增加使用次数
manager.increment_share_usage(request.token)
return {
"valid": True,
"project_id": share.project_id,
"permission": share.permission,
"allow_download": share.allow_download,
"allow_export": share.allow_export
}
@app.get("/api/v1/shares/{token}/access")
async def access_shared_project(token: str, password: Optional[str] = None):
"""通过分享链接访问项目"""
if not COLLABORATION_AVAILABLE:
raise HTTPException(status_code=503, detail="Collaboration module not available")
manager = get_collab_manager()
share = manager.validate_share_token(token, password)
if not share:
raise HTTPException(status_code=401, detail="Invalid or expired share link")
# 增加使用次数
manager.increment_share_usage(token)
# 获取项目信息
if not DB_AVAILABLE:
raise HTTPException(status_code=503, detail="Database not available")
db = get_db_manager()
project = db.get_project(share.project_id)
if not project:
raise HTTPException(status_code=404, detail="Project not found")
return {
"project": {
"id": project.id,
"name": project.name,
"description": project.description,
"created_at": project.created_at
},
"permission": share.permission,
"allow_download": share.allow_download,
"allow_export": share.allow_export
}
@app.delete("/api/v1/shares/{share_id}")
async def revoke_share_link(share_id: str, revoked_by: str = "current_user"):
"""撤销分享链接"""
if not COLLABORATION_AVAILABLE:
raise HTTPException(status_code=503, detail="Collaboration module not available")
manager = get_collab_manager()
success = manager.revoke_share_link(share_id, revoked_by)
if not success:
raise HTTPException(status_code=404, detail="Share link not found")
return {"success": True, "message": "Share link revoked"}
# ----- 评论和批注 -----
@app.post("/api/v1/projects/{project_id}/comments")
async def add_comment(project_id: str, request: CommentCreate, author: str = "current_user", author_name: str = "User"):
"""添加评论"""
if not COLLABORATION_AVAILABLE:
raise HTTPException(status_code=503, detail="Collaboration module not available")
manager = get_collab_manager()
comment = manager.add_comment(
project_id=project_id,
target_type=request.target_type,
target_id=request.target_id,
author=author,
author_name=author_name,
content=request.content,
parent_id=request.parent_id,
mentions=request.mentions
)
return {
"id": comment.id,
"target_type": comment.target_type,
"target_id": comment.target_id,
"parent_id": comment.parent_id,
"author": comment.author,
"author_name": comment.author_name,
"content": comment.content,
"created_at": comment.created_at,
"resolved": comment.resolved
}
@app.get("/api/v1/{target_type}/{target_id}/comments")
async def get_comments(target_type: str, target_id: str, include_resolved: bool = True):
"""获取评论列表"""
if not COLLABORATION_AVAILABLE:
raise HTTPException(status_code=503, detail="Collaboration module not available")
manager = get_collab_manager()
comments = manager.get_comments(target_type, target_id, include_resolved)
return {
"count": len(comments),
"comments": [
{
"id": c.id,
"parent_id": c.parent_id,
"author": c.author,
"author_name": c.author_name,
"content": c.content,
"created_at": c.created_at,
"updated_at": c.updated_at,
"resolved": c.resolved,
"resolved_by": c.resolved_by,
"resolved_at": c.resolved_at
}
for c in comments
]
}
@app.get("/api/v1/projects/{project_id}/comments")
async def get_project_comments(project_id: str, limit: int = 50, offset: int = 0):
"""获取项目下的所有评论"""
if not COLLABORATION_AVAILABLE:
raise HTTPException(status_code=503, detail="Collaboration module not available")
manager = get_collab_manager()
comments = manager.get_project_comments(project_id, limit, offset)
return {
"count": len(comments),
"comments": [
{
"id": c.id,
"target_type": c.target_type,
"target_id": c.target_id,
"parent_id": c.parent_id,
"author": c.author,
"author_name": c.author_name,
"content": c.content,
"created_at": c.created_at,
"resolved": c.resolved
}
for c in comments
]
}
@app.put("/api/v1/comments/{comment_id}")
async def update_comment(comment_id: str, request: CommentUpdate, updated_by: str = "current_user"):
"""更新评论"""
if not COLLABORATION_AVAILABLE:
raise HTTPException(status_code=503, detail="Collaboration module not available")
manager = get_collab_manager()
comment = manager.update_comment(comment_id, request.content, updated_by)
if not comment:
raise HTTPException(status_code=404, detail="Comment not found or not authorized")
return {
"id": comment.id,
"content": comment.content,
"updated_at": comment.updated_at
}
@app.post("/api/v1/comments/{comment_id}/resolve")
async def resolve_comment(comment_id: str, resolved_by: str = "current_user"):
"""标记评论为已解决"""
if not COLLABORATION_AVAILABLE:
raise HTTPException(status_code=503, detail="Collaboration module not available")
manager = get_collab_manager()
success = manager.resolve_comment(comment_id, resolved_by)
if not success:
raise HTTPException(status_code=404, detail="Comment not found")
return {"success": True, "message": "Comment resolved"}
@app.delete("/api/v1/comments/{comment_id}")
async def delete_comment(comment_id: str, deleted_by: str = "current_user"):
"""删除评论"""
if not COLLABORATION_AVAILABLE:
raise HTTPException(status_code=503, detail="Collaboration module not available")
manager = get_collab_manager()
success = manager.delete_comment(comment_id, deleted_by)
if not success:
raise HTTPException(status_code=404, detail="Comment not found or not authorized")
return {"success": True, "message": "Comment deleted"}
# ----- 变更历史 -----
@app.get("/api/v1/projects/{project_id}/history")
async def get_change_history(
project_id: str,
entity_type: Optional[str] = None,
entity_id: Optional[str] = None,
limit: int = 50,
offset: int = 0
):
"""获取变更历史"""
if not COLLABORATION_AVAILABLE:
raise HTTPException(status_code=503, detail="Collaboration module not available")
manager = get_collab_manager()
records = manager.get_change_history(project_id, entity_type, entity_id, limit, offset)
return {
"count": len(records),
"history": [
{
"id": r.id,
"change_type": r.change_type,
"entity_type": r.entity_type,
"entity_id": r.entity_id,
"entity_name": r.entity_name,
"changed_by": r.changed_by,
"changed_by_name": r.changed_by_name,
"changed_at": r.changed_at,
"old_value": r.old_value,
"new_value": r.new_value,
"description": r.description,
"reverted": r.reverted
}
for r in records
]
}
@app.get("/api/v1/projects/{project_id}/history/stats")
async def get_change_history_stats(project_id: str):
"""获取变更统计"""
if not COLLABORATION_AVAILABLE:
raise HTTPException(status_code=503, detail="Collaboration module not available")
manager = get_collab_manager()
stats = manager.get_change_stats(project_id)
return stats
@app.get("/api/v1/{entity_type}/{entity_id}/versions")
async def get_entity_versions(entity_type: str, entity_id: str):
"""获取实体版本历史"""
if not COLLABORATION_AVAILABLE:
raise HTTPException(status_code=503, detail="Collaboration module not available")
manager = get_collab_manager()
records = manager.get_entity_version_history(entity_type, entity_id)
return {
"count": len(records),
"versions": [
{
"id": r.id,
"change_type": r.change_type,
"changed_by": r.changed_by,
"changed_by_name": r.changed_by_name,
"changed_at": r.changed_at,
"old_value": r.old_value,
"new_value": r.new_value,
"description": r.description
}
for r in records
]
}
@app.post("/api/v1/history/{record_id}/revert")
async def revert_change(record_id: str, reverted_by: str = "current_user"):
"""回滚变更"""
if not COLLABORATION_AVAILABLE:
raise HTTPException(status_code=503, detail="Collaboration module not available")
manager = get_collab_manager()
success = manager.revert_change(record_id, reverted_by)
if not success:
raise HTTPException(status_code=404, detail="Change record not found or already reverted")
return {"success": True, "message": "Change reverted"}
# ----- 团队成员 -----
@app.post("/api/v1/projects/{project_id}/members")
async def invite_team_member(project_id: str, request: TeamMemberInvite, invited_by: str = "current_user"):
"""邀请团队成员"""
if not COLLABORATION_AVAILABLE:
raise HTTPException(status_code=503, detail="Collaboration module not available")
manager = get_collab_manager()
member = manager.add_team_member(
project_id=project_id,
user_id=request.user_id,
user_name=request.user_name,
user_email=request.user_email,
role=request.role,
invited_by=invited_by
)
return {
"id": member.id,
"user_id": member.user_id,
"user_name": member.user_name,
"user_email": member.user_email,
"role": member.role,
"joined_at": member.joined_at,
"permissions": member.permissions
}
@app.get("/api/v1/projects/{project_id}/members")
async def list_team_members(project_id: str):
"""列出团队成员"""
if not COLLABORATION_AVAILABLE:
raise HTTPException(status_code=503, detail="Collaboration module not available")
manager = get_collab_manager()
members = manager.get_team_members(project_id)
return {
"count": len(members),
"members": [
{
"id": m.id,
"user_id": m.user_id,
"user_name": m.user_name,
"user_email": m.user_email,
"role": m.role,
"joined_at": m.joined_at,
"last_active_at": m.last_active_at,
"permissions": m.permissions
}
for m in members
]
}
@app.put("/api/v1/members/{member_id}/role")
async def update_member_role(member_id: str, request: TeamMemberRoleUpdate, updated_by: str = "current_user"):
"""更新成员角色"""
if not COLLABORATION_AVAILABLE:
raise HTTPException(status_code=503, detail="Collaboration module not available")
manager = get_collab_manager()
success = manager.update_member_role(member_id, request.role, updated_by)
if not success:
raise HTTPException(status_code=404, detail="Member not found")
return {"success": True, "message": "Member role updated"}
@app.delete("/api/v1/members/{member_id}")
async def remove_team_member(member_id: str, removed_by: str = "current_user"):
"""移除团队成员"""
if not COLLABORATION_AVAILABLE:
raise HTTPException(status_code=503, detail="Collaboration module not available")
manager = get_collab_manager()
success = manager.remove_team_member(member_id, removed_by)
if not success:
raise HTTPException(status_code=404, detail="Member not found")
return {"success": True, "message": "Member removed"}
@app.get("/api/v1/projects/{project_id}/permissions")
async def check_project_permissions(project_id: str, user_id: str = "current_user"):
"""检查用户权限"""
if not COLLABORATION_AVAILABLE:
raise HTTPException(status_code=503, detail="Collaboration module not available")
manager = get_collab_manager()
members = manager.get_team_members(project_id)
user_member = None
for m in members:
if m.user_id == user_id:
user_member = m
break
if not user_member:
return {
"has_access": False,
"role": None,
"permissions": []
}
return {
"has_access": True,
"role": user_member.role,
"permissions": user_member.permissions
}
# Serve frontend - MUST be last to not override API routes
app.mount("/", StaticFiles(directory="frontend", html=True), name="frontend")
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000)