Files
insightflow/backend/main.py
2026-02-18 18:03:01 +08:00

981 lines
31 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""
InsightFlow Backend - Phase 3 (Memory & Growth)
Knowledge Growth: Multi-file fusion + Entity Alignment + Document Import
ASR: Alibaba Cloud Tingwu + OSS
"""
import os
import json
import httpx
import uuid
import re
from fastapi import FastAPI, File, UploadFile, HTTPException, Form
from fastapi.middleware.cors import CORSMiddleware
from fastapi.staticfiles import StaticFiles
from pydantic import BaseModel
from typing import List, Optional
from datetime import datetime
# Import clients
# Each optional integration is imported defensively; the matching
# *_AVAILABLE flag records whether the import succeeded so the rest of the
# module can degrade instead of failing at import time.
try:
    from oss_uploader import get_oss_uploader
except ImportError:
    OSS_AVAILABLE = False
else:
    OSS_AVAILABLE = True

try:
    from tingwu_client import TingwuClient
except ImportError:
    TINGWU_AVAILABLE = False
else:
    TINGWU_AVAILABLE = True

try:
    from db_manager import get_db_manager, Project, Entity, EntityMention
except ImportError:
    DB_AVAILABLE = False
else:
    DB_AVAILABLE = True

try:
    from document_processor import DocumentProcessor
except ImportError:
    DOC_PROCESSOR_AVAILABLE = False
else:
    DOC_PROCESSOR_AVAILABLE = True

try:
    from entity_aligner import EntityAligner
except ImportError:
    ALIGNER_AVAILABLE = False
else:
    ALIGNER_AVAILABLE = True
app = FastAPI(title="InsightFlow", version="0.3.0")

# Allowed CORS origins come from the comma-separated CORS_ALLOW_ORIGINS
# environment variable; when it is unset (or empty) any origin is allowed.
_cors_origins = [
    origin.strip()
    for origin in os.getenv("CORS_ALLOW_ORIGINS", "*").split(",")
    if origin.strip()
] or ["*"]

app.add_middleware(
    CORSMiddleware,
    allow_origins=_cors_origins,
    # A wildcard origin must not be combined with credentials: doing so lets
    # any website issue credentialed cross-origin requests. Credentials are
    # therefore only enabled when explicit origins are configured.
    allow_credentials="*" not in _cors_origins,
    allow_methods=["*"],
    allow_headers=["*"],
)
# Models
class EntityModel(BaseModel):
    """Entity as serialized in upload/analysis responses."""
    id: str
    name: str
    # Free-form type label; the LLM prompt in this file asks for
    # PROJECT/TECH/PERSON/ORG/OTHER, but no validation is applied here.
    type: str
    definition: Optional[str] = ""
    aliases: List[str] = []
class TranscriptSegment(BaseModel):
    """One segment of a transcription result."""
    start: float  # presumably an offset in seconds — TODO confirm against the ASR client
    end: float  # presumably an offset in seconds — TODO confirm against the ASR client
    text: str
    speaker: Optional[str] = "Speaker A"
class AnalysisResult(BaseModel):
    """Response body of the audio upload endpoint."""
    transcript_id: str
    project_id: str
    segments: List[TranscriptSegment]
    entities: List[EntityModel]
    full_text: str
    created_at: str  # ISO-8601 timestamp string produced with datetime.isoformat()
class ProjectCreate(BaseModel):
    """Request body for creating a project."""
    name: str
    description: str = ""
class EntityUpdate(BaseModel):
    """Request body for a partial entity update; fields left as None are not changed."""
    name: Optional[str] = None
    type: Optional[str] = None
    definition: Optional[str] = None
    aliases: Optional[List[str]] = None
class RelationCreate(BaseModel):
    """Request body describing a relation between two entities.

    Also used as the body of the relation update endpoint, which only reads
    ``relation_type`` and ``evidence``.
    """
    source_entity_id: str
    target_entity_id: str
    relation_type: str
    evidence: Optional[str] = ""
class TranscriptUpdate(BaseModel):
    """Request body carrying the corrected full text of a transcript."""
    full_text: str
class EntityMergeRequest(BaseModel):
    """Request body naming the two entities to merge (source is merged into target)."""
    source_entity_id: str
    target_entity_id: str
class GlossaryTermCreate(BaseModel):
    """Request body for adding a term to a project's glossary."""
    term: str
    pronunciation: Optional[str] = ""
# API Keys
# Kimi LLM credentials/endpoint, read once from the environment at import time.
# An empty API key disables LLM-based entity extraction.
KIMI_API_KEY = os.environ.get("KIMI_API_KEY", "")
KIMI_BASE_URL = os.environ.get("KIMI_BASE_URL", "https://api.kimi.com/coding")
# Phase 3: Entity Aligner singleton
_aligner = None


def get_aligner():
    """Return the shared EntityAligner, creating it on first use.

    Returns:
        The cached aligner instance, or ``None`` when the entity_aligner
        module could not be imported.
    """
    global _aligner
    if _aligner is not None or not ALIGNER_AVAILABLE:
        return _aligner
    _aligner = EntityAligner()
    return _aligner
# Phase 3: Document Processor singleton
_doc_processor = None


def get_doc_processor():
    """Return the shared DocumentProcessor, creating it on first use.

    Returns:
        The cached processor instance, or ``None`` when the
        document_processor module could not be imported.
    """
    global _doc_processor
    if _doc_processor is not None or not DOC_PROCESSOR_AVAILABLE:
        return _doc_processor
    _doc_processor = DocumentProcessor()
    return _doc_processor
# Phase 2: Entity Edit API
@app.put("/api/v1/entities/{entity_id}")
async def update_entity(entity_id: str, update: EntityUpdate):
    """Update an entity's name, type, definition and/or aliases.

    Only fields that are not ``None`` in the request body are forwarded to
    the database layer.

    Raises:
        HTTPException: 500 when the database module is unavailable,
            404 when no entity has the given id.
    """
    if not DB_AVAILABLE:
        raise HTTPException(status_code=500, detail="Database not available")

    db = get_db_manager()
    if not db.get_entity(entity_id):
        raise HTTPException(status_code=404, detail="Entity not found")

    # Keep only the fields the client actually supplied.
    changes = {}
    for field_name, value in update.dict().items():
        if value is not None:
            changes[field_name] = value

    saved = db.update_entity(entity_id, **changes)
    return {
        "id": saved.id,
        "name": saved.name,
        "type": saved.type,
        "definition": saved.definition,
        "aliases": saved.aliases,
    }
@app.delete("/api/v1/entities/{entity_id}")
async def delete_entity(entity_id: str):
    """Delete an entity by id.

    Raises:
        HTTPException: 500 when the database module is unavailable,
            404 when no entity has the given id.
    """
    if not DB_AVAILABLE:
        raise HTTPException(status_code=500, detail="Database not available")

    db = get_db_manager()
    if not db.get_entity(entity_id):
        raise HTTPException(status_code=404, detail="Entity not found")

    db.delete_entity(entity_id)
    confirmation = f"Entity {entity_id} deleted"
    return {"success": True, "message": confirmation}
@app.post("/api/v1/entities/{entity_id}/merge")
async def merge_entities_endpoint(entity_id: str, merge_req: EntityMergeRequest):
    """Merge the source entity into the target entity.

    Both entity ids are taken from the request body; the ``entity_id`` path
    parameter is part of the route but is not otherwise used.

    Returns:
        ``{"success": True, "merged_entity": {...}}`` describing the entity
        returned by ``db.merge_entities``.

    Raises:
        HTTPException: 500 when the database module is unavailable,
            400 when source and target are the same entity,
            404 when either entity does not exist.
    """
    if not DB_AVAILABLE:
        raise HTTPException(status_code=500, detail="Database not available")

    # A self-merge has no meaningful result and, depending on how the
    # database layer disposes of the source entity, could destroy it.
    if merge_req.source_entity_id == merge_req.target_entity_id:
        raise HTTPException(status_code=400, detail="Cannot merge an entity with itself")

    db = get_db_manager()

    # Both entities must exist before merging.
    source = db.get_entity(merge_req.source_entity_id)
    target = db.get_entity(merge_req.target_entity_id)
    if not source or not target:
        raise HTTPException(status_code=404, detail="Entity not found")

    # Argument order is (target, source).
    result = db.merge_entities(merge_req.target_entity_id, merge_req.source_entity_id)
    return {
        "success": True,
        "merged_entity": {
            "id": result.id,
            "name": result.name,
            "type": result.type,
            "definition": result.definition,
            "aliases": result.aliases,
        },
    }
# Phase 2: Relation Edit API
@app.post("/api/v1/projects/{project_id}/relations")
async def create_relation_endpoint(project_id: str, relation: RelationCreate):
    """Create a relation between two existing entities.

    Raises:
        HTTPException: 500 when the database module is unavailable,
            404 when the source or target entity does not exist.
    """
    if not DB_AVAILABLE:
        raise HTTPException(status_code=500, detail="Database not available")

    db = get_db_manager()

    # Both endpoints of the relation must exist.
    source_entity = db.get_entity(relation.source_entity_id)
    target_entity = db.get_entity(relation.target_entity_id)
    if not (source_entity and target_entity):
        raise HTTPException(status_code=404, detail="Source or target entity not found")

    new_relation_id = db.create_relation(
        project_id=project_id,
        source_entity_id=relation.source_entity_id,
        target_entity_id=relation.target_entity_id,
        relation_type=relation.relation_type,
        evidence=relation.evidence,
    )

    response = {
        "id": new_relation_id,
        "source_id": relation.source_entity_id,
        "target_id": relation.target_entity_id,
        "type": relation.relation_type,
        "success": True,
    }
    return response
@app.delete("/api/v1/relations/{relation_id}")
async def delete_relation(relation_id: str):
    """Delete a relation by id.

    The relation's existence is not checked before deletion.

    Raises:
        HTTPException: 500 when the database module is unavailable.
    """
    if not DB_AVAILABLE:
        raise HTTPException(status_code=500, detail="Database not available")

    get_db_manager().delete_relation(relation_id)
    confirmation = f"Relation {relation_id} deleted"
    return {"success": True, "message": confirmation}
@app.put("/api/v1/relations/{relation_id}")
async def update_relation(relation_id: str, relation: RelationCreate):
    """Update the type and evidence of an existing relation.

    Only ``relation_type`` and ``evidence`` from the request body are used;
    the entity ids in the body are ignored.

    Raises:
        HTTPException: 500 when the database module is unavailable,
            404 when the database layer returns no updated relation.
    """
    if not DB_AVAILABLE:
        raise HTTPException(status_code=500, detail="Database not available")

    db = get_db_manager()
    updated = db.update_relation(
        relation_id=relation_id,
        relation_type=relation.relation_type,
        evidence=relation.evidence,
    )
    # A falsy result is treated as "no such relation"; subscripting it below
    # would otherwise surface as an unhandled server error.
    if not updated:
        raise HTTPException(status_code=404, detail="Relation not found")

    return {
        "id": relation_id,
        "type": updated["relation_type"],
        "evidence": updated["evidence"],
        "success": True,
    }
# Phase 2: Transcript Edit API
@app.get("/api/v1/transcripts/{transcript_id}")
async def get_transcript(transcript_id: str):
    """Return the stored transcript record for the given id.

    Raises:
        HTTPException: 500 when the database module is unavailable,
            404 when the transcript does not exist.
    """
    if not DB_AVAILABLE:
        raise HTTPException(status_code=500, detail="Database not available")

    record = get_db_manager().get_transcript(transcript_id)
    if record:
        return record
    raise HTTPException(status_code=404, detail="Transcript not found")
@app.put("/api/v1/transcripts/{transcript_id}")
async def update_transcript(transcript_id: str, update: TranscriptUpdate):
    """Replace a transcript's full text (manual correction).

    Raises:
        HTTPException: 500 when the database module is unavailable,
            404 when the transcript does not exist.
    """
    if not DB_AVAILABLE:
        raise HTTPException(status_code=500, detail="Database not available")

    db = get_db_manager()
    if not db.get_transcript(transcript_id):
        raise HTTPException(status_code=404, detail="Transcript not found")

    saved = db.update_transcript(transcript_id, update.full_text)
    response = {
        "id": transcript_id,
        "full_text": saved["full_text"],
        "updated_at": saved["updated_at"],
        "success": True,
    }
    return response
# Phase 2: Manual Entity Creation
class ManualEntityCreate(BaseModel):
    """Request body for creating an entity by hand, optionally from a text selection."""
    name: str
    type: str = "OTHER"
    definition: str = ""
    # Optional location of the selected text; a mention is only stored when
    # transcript_id, start_pos and end_pos are all provided.
    transcript_id: Optional[str] = None
    start_pos: Optional[int] = None  # character offset into the transcript's full text
    end_pos: Optional[int] = None  # character offset into the transcript's full text
@app.post("/api/v1/projects/{project_id}/entities")
async def create_manual_entity(project_id: str, entity: ManualEntityCreate):
    """Create an entity by hand (e.g. from a text selection).

    If an entity with the same name already exists in the project it is
    returned with ``"existed": True`` and nothing is created. When the
    request carries a transcript id plus start/end offsets and the transcript
    exists, a mention with a +/-20 character snippet is stored as well.

    Raises:
        HTTPException: 500 when the database module is unavailable.
    """
    if not DB_AVAILABLE:
        raise HTTPException(status_code=500, detail="Database not available")

    db = get_db_manager()

    # Reuse an entity with the same name instead of creating a duplicate.
    duplicate = db.get_entity_by_name(project_id, entity.name)
    if duplicate:
        return {
            "id": duplicate.id,
            "name": duplicate.name,
            "type": duplicate.type,
            "existed": True,
        }

    entity_id = str(uuid.uuid4())[:8]
    created = db.create_entity(
        Entity(
            id=entity_id,
            project_id=project_id,
            name=entity.name,
            type=entity.type,
            definition=entity.definition,
        )
    )

    # Store a mention only when the selection is fully described.
    has_selection = (
        entity.transcript_id
        and entity.start_pos is not None
        and entity.end_pos is not None
    )
    if has_selection:
        transcript = db.get_transcript(entity.transcript_id)
        if transcript:
            text = transcript["full_text"]
            snippet_start = max(0, entity.start_pos - 20)
            snippet_end = min(len(text), entity.end_pos + 20)
            db.add_mention(
                EntityMention(
                    id=str(uuid.uuid4())[:8],
                    entity_id=entity_id,
                    transcript_id=entity.transcript_id,
                    start_pos=entity.start_pos,
                    end_pos=entity.end_pos,
                    text_snippet=text[snippet_start:snippet_end],
                    confidence=1.0,
                )
            )

    return {
        "id": created.id,
        "name": created.name,
        "type": created.type,
        "definition": created.definition,
        "success": True,
    }
def transcribe_audio(audio_data: bytes, filename: str) -> dict:
    """Transcribe audio: upload it to OSS, then run Tingwu transcription.

    Any failure (missing client module or an exception during upload or
    transcription) is logged with ``print`` and the mock transcript is
    returned instead.

    Returns:
        The dict returned by ``TingwuClient.transcribe`` or, on any failure,
        by ``mock_transcribe``.
    """
    # Step 1: upload to OSS.
    if not OSS_AVAILABLE:
        print("OSS not available, using mock")
        return mock_transcribe()

    try:
        audio_url, object_name = get_oss_uploader().upload_audio(audio_data, filename)
        print(f"Uploaded to OSS: {object_name}")
    except Exception as e:
        print(f"OSS upload failed: {e}")
        return mock_transcribe()

    # Step 2: Tingwu transcription (checked only after the upload succeeded).
    if not TINGWU_AVAILABLE:
        print("Tingwu not available, using mock")
        return mock_transcribe()

    try:
        transcription = TingwuClient().transcribe(audio_url)
        print(f"Transcription complete: {len(transcription['segments'])} segments")
        return transcription
    except Exception as e:
        print(f"Tingwu failed: {e}")
        return mock_transcribe()
def mock_transcribe() -> dict:
    """Return a fixed, single-segment mock transcription result."""
    sample_text = "我们今天讨论 Project Alpha 的进度K8s 集群已经部署完成。"
    only_segment = {
        "start": 0.0,
        "end": 5.0,
        "text": sample_text,
        "speaker": "Speaker A",
    }
    return {"full_text": sample_text, "segments": [only_segment]}
def _parse_extraction_payload(content: str) -> tuple[List[dict], List[dict]]:
    """Decode the first JSON object in an LLM reply into (entities, relations)."""
    start = content.find("{")
    if start == -1:
        return [], []
    # raw_decode parses one JSON value starting at `start` and ignores any
    # trailing text (closing markdown fences, commentary, ...).
    data, _ = json.JSONDecoder().raw_decode(content, start)
    if not isinstance(data, dict):
        return [], []
    entities = data.get("entities")
    relations = data.get("relations")
    return (
        entities if isinstance(entities, list) else [],
        relations if isinstance(relations, list) else [],
    )


def extract_entities_with_llm(text: str) -> tuple[List[dict], List[dict]]:
    """Extract entities and relations from text using the Kimi chat API.

    Only the first 3000 characters of ``text`` are sent to the model. The
    reply is expected to contain a JSON object with ``entities`` and
    ``relations`` lists; surrounding text is tolerated.

    Args:
        text: Transcript or document text to analyse.

    Returns:
        ``(entities, relations)`` as lists of dicts. Both are empty when no
        API key is configured, ``text`` is empty, the request fails, or the
        reply cannot be parsed (failures are logged with ``print``).
    """
    if not KIMI_API_KEY or not text:
        return [], []

    prompt = f"""从以下会议文本中提取关键实体和它们之间的关系,以 JSON 格式返回:
文本:{text[:3000]}
要求:
1. entities: 每个实体包含 name(名称), type(类型: PROJECT/TECH/PERSON/ORG/OTHER), definition(一句话定义)
2. relations: 每个关系包含 source(源实体名), target(目标实体名), type(关系类型: belongs_to/works_with/depends_on/mentions/related)
3. 只返回 JSON 对象,格式: {{"entities": [...], "relations": [...]}}
示例:
{{
"entities": [
{{"name": "Project Alpha", "type": "PROJECT", "definition": "核心项目"}},
{{"name": "K8s", "type": "TECH", "definition": "Kubernetes容器编排平台"}}
],
"relations": [
{{"source": "Project Alpha", "target": "K8s", "type": "depends_on"}}
]
}}
"""
    try:
        response = httpx.post(
            f"{KIMI_BASE_URL}/v1/chat/completions",
            headers={"Authorization": f"Bearer {KIMI_API_KEY}", "Content-Type": "application/json"},
            json={"model": "k2p5", "messages": [{"role": "user", "content": prompt}], "temperature": 0.1},
            timeout=60.0
        )
        response.raise_for_status()
        content = response.json()["choices"][0]["message"]["content"]
        # The previous pattern r'\{{.*?\}}' only matched literal double braces,
        # so ordinary JSON replies were never parsed.
        return _parse_extraction_payload(content)
    except Exception as e:
        # Best effort by design: extraction failures must not fail the upload.
        print(f"LLM extraction failed: {e}")
    return [], []
def align_entity(project_id: str, name: str, db, definition: str = "") -> Optional['Entity']:
    """Find an existing project entity that corresponds to ``name``.

    Lookup order: exact name match, then the embedding-based aligner (when
    available), then the database's simple similarity search.

    Returns:
        The matching entity, or ``None`` when nothing matches.
    """
    # 1. Exact name match.
    exact_match = db.get_entity_by_name(project_id, name)
    if exact_match:
        return exact_match

    # 2. Embedding-based alignment, if the aligner is available.
    aligner = get_aligner()
    embedded_match = (
        aligner.find_similar_entity(project_id, name, definition) if aligner else None
    )
    if embedded_match:
        return embedded_match

    # 3. Fall back to simple similarity matching; take the first candidate.
    candidates = db.find_similar_entities(project_id, name)
    return candidates[0] if candidates else None
# API Endpoints
@app.post("/api/v1/projects", response_model=dict)
async def create_project(project: ProjectCreate):
    """Create a new project with a generated 8-character id.

    Raises:
        HTTPException: 500 when the database module is unavailable.
    """
    if not DB_AVAILABLE:
        raise HTTPException(status_code=500, detail="Database not available")

    new_id = str(uuid.uuid4())[:8]
    created = get_db_manager().create_project(new_id, project.name, project.description)
    return {
        "id": created.id,
        "name": created.name,
        "description": created.description,
    }
@app.get("/api/v1/projects")
async def list_projects():
    """List all projects; returns an empty list when the database is unavailable."""
    if not DB_AVAILABLE:
        return []

    summaries = []
    for proj in get_db_manager().list_projects():
        summaries.append(
            {"id": proj.id, "name": proj.name, "description": proj.description}
        )
    return summaries
def _record_mentions(db, full_text: str, name: str, entity_id: str, transcript_id: str) -> None:
    """Store one EntityMention for every occurrence of ``name`` in ``full_text``."""
    pos = full_text.find(name)
    while pos != -1:
        end = pos + len(name)
        db.add_mention(EntityMention(
            id=str(uuid.uuid4())[:8],
            entity_id=entity_id,
            transcript_id=transcript_id,
            start_pos=pos,
            end_pos=end,
            # Keep up to 20 characters of context on each side.
            text_snippet=full_text[max(0, pos - 20):min(len(full_text), end + 20)],
            confidence=1.0
        ))
        # Advance by one character so overlapping occurrences are found too.
        pos = full_text.find(name, pos + 1)


def _persist_extraction(db, project_id: str, transcript_id: str, full_text: str,
                        raw_entities: List[dict], raw_relations: List[dict]) -> List[EntityModel]:
    """Align/create extracted entities, store their mentions and relations.

    LLM output is untrusted: items that are not dicts, entities without a
    non-empty string ``name`` and repeated names are skipped, so they cannot
    raise or produce duplicate/bogus mentions.

    Returns:
        One EntityModel per distinct entity touched, in first-seen order.
    """
    aligned_entities = []
    seen_entity_ids = set()
    entity_name_to_id = {}  # extracted name -> stored entity id, for relation mapping

    for raw_ent in raw_entities:
        if not isinstance(raw_ent, dict):
            continue
        name = raw_ent.get("name")
        if not isinstance(name, str) or not name or name in entity_name_to_id:
            continue
        definition = raw_ent.get("definition") or ""

        existing = align_entity(project_id, name, db, definition)
        if existing:
            ent_model = EntityModel(
                id=existing.id,
                name=existing.name,
                type=existing.type,
                definition=existing.definition,
                aliases=existing.aliases
            )
        else:
            new_ent = db.create_entity(Entity(
                id=str(uuid.uuid4())[:8],
                project_id=project_id,
                name=name,
                type=raw_ent.get("type") or "OTHER",
                definition=definition
            ))
            ent_model = EntityModel(
                id=new_ent.id,
                name=new_ent.name,
                type=new_ent.type,
                definition=new_ent.definition
            )

        entity_name_to_id[name] = ent_model.id
        # Different surface names may align to the same entity; report it once.
        if ent_model.id not in seen_entity_ids:
            seen_entity_ids.add(ent_model.id)
            aligned_entities.append(ent_model)

        _record_mentions(db, full_text, name, ent_model.id, transcript_id)

    # Store relations whose endpoints were both resolved above.
    for rel in raw_relations:
        if not isinstance(rel, dict):
            continue
        source_id = entity_name_to_id.get(rel.get("source", ""))
        target_id = entity_name_to_id.get(rel.get("target", ""))
        if source_id and target_id:
            db.create_relation(
                project_id=project_id,
                source_entity_id=source_id,
                target_entity_id=target_id,
                relation_type=rel.get("type", "related"),
                evidence=full_text[:200],
                transcript_id=transcript_id
            )

    return aligned_entities


@app.post("/api/v1/projects/{project_id}/upload", response_model=AnalysisResult)
async def upload_audio(project_id: str, file: UploadFile = File(...)):
    """Upload an audio file to a project, transcribe it and extract knowledge.

    The audio is transcribed, entities/relations are extracted with the LLM,
    the transcript is saved, and entities are aligned against the project's
    existing entities before mentions and relations are stored.

    Raises:
        HTTPException: 500 when the database module is unavailable,
            404 when the project does not exist.
    """
    if not DB_AVAILABLE:
        raise HTTPException(status_code=500, detail="Database not available")

    db = get_db_manager()
    project = db.get_project(project_id)
    if not project:
        raise HTTPException(status_code=404, detail="Project not found")

    content = await file.read()

    # NOTE(review): transcribe_audio and extract_entities_with_llm are
    # synchronous calls made from this async handler.
    print(f"Processing: {file.filename}")
    tw_result = transcribe_audio(content, file.filename)
    full_text = tw_result["full_text"]

    print("Extracting entities and relations...")
    raw_entities, raw_relations = extract_entities_with_llm(full_text)

    # Save the transcript record.
    transcript_id = str(uuid.uuid4())[:8]
    db.save_transcript(
        transcript_id=transcript_id,
        project_id=project_id,
        filename=file.filename,
        full_text=full_text
    )

    aligned_entities = _persist_extraction(
        db, project_id, transcript_id, full_text, raw_entities, raw_relations
    )

    segments = [TranscriptSegment(**seg) for seg in tw_result["segments"]]
    return AnalysisResult(
        transcript_id=transcript_id,
        project_id=project_id,
        segments=segments,
        entities=aligned_entities,
        full_text=full_text,
        created_at=datetime.now().isoformat()
    )


# Phase 3: Document Upload API
@app.post("/api/v1/projects/{project_id}/upload-document")
async def upload_document(project_id: str, file: UploadFile = File(...)):
    """Upload a PDF/DOCX document to a project and extract knowledge from it.

    The document text is stored as a transcript of type ``"document"``, then
    entities/relations are extracted, aligned and stored exactly as for audio.

    Raises:
        HTTPException: 500 when the database module or document processor is
            unavailable, 404 when the project does not exist, 400 when the
            document cannot be processed.
    """
    if not DB_AVAILABLE:
        raise HTTPException(status_code=500, detail="Database not available")
    if not DOC_PROCESSOR_AVAILABLE:
        raise HTTPException(status_code=500, detail="Document processor not available")

    db = get_db_manager()
    project = db.get_project(project_id)
    if not project:
        raise HTTPException(status_code=404, detail="Project not found")

    content = await file.read()

    # Convert the document to plain text.
    processor = get_doc_processor()
    try:
        result = processor.process(content, file.filename)
    except Exception as e:
        raise HTTPException(status_code=400, detail=f"Document processing failed: {str(e)}")
    full_text = result["text"]

    # Save the document as a transcript record.
    transcript_id = str(uuid.uuid4())[:8]
    db.save_transcript(
        transcript_id=transcript_id,
        project_id=project_id,
        filename=file.filename,
        full_text=full_text,
        transcript_type="document"
    )

    raw_entities, raw_relations = extract_entities_with_llm(full_text)
    aligned_entities = _persist_extraction(
        db, project_id, transcript_id, full_text, raw_entities, raw_relations
    )

    return {
        "transcript_id": transcript_id,
        "project_id": project_id,
        "filename": file.filename,
        "text_length": len(full_text),
        "entities": [e.dict() for e in aligned_entities],
        "created_at": datetime.now().isoformat()
    }
# Phase 3: Knowledge Base API
@app.get("/api/v1/projects/{project_id}/knowledge-base")
async def get_knowledge_base(project_id: str):
    """Return a project's knowledge base: entities, relations, glossary, transcripts.

    Each entity is annotated with its mention count and the ids of the
    transcripts it appears in; relations carry resolved entity names
    (``"Unknown"`` when an endpoint is not among the project's entities).

    Raises:
        HTTPException: 500 when the database module is unavailable,
            404 when the project does not exist.
    """
    if not DB_AVAILABLE:
        raise HTTPException(status_code=500, detail="Database not available")

    db = get_db_manager()
    project = db.get_project(project_id)
    if not project:
        raise HTTPException(status_code=404, detail="Project not found")

    entities = db.list_project_entities(project_id)
    relations = db.list_project_relations(project_id)
    transcripts = db.list_project_transcripts(project_id)
    glossary = db.list_glossary(project_id)

    # Per-entity mention statistics (one mentions query per entity).
    entity_stats = {}
    for ent in entities:
        ent_mentions = db.get_entity_mentions(ent.id)
        entity_stats[ent.id] = {
            "mention_count": len(ent_mentions),
            "transcript_ids": list({m.transcript_id for m in ent_mentions}),
        }

    # Entity id -> name, used to label relation endpoints.
    name_by_id = {ent.id: ent.name for ent in entities}

    entity_items = []
    for ent in entities:
        stats = entity_stats[ent.id]
        entity_items.append({
            "id": ent.id,
            "name": ent.name,
            "type": ent.type,
            "definition": ent.definition,
            "aliases": ent.aliases,
            "mention_count": stats["mention_count"],
            "appears_in": stats["transcript_ids"],
        })

    relation_items = []
    for rel in relations:
        relation_items.append({
            "id": rel["id"],
            "source_id": rel["source_entity_id"],
            "source_name": name_by_id.get(rel["source_entity_id"], "Unknown"),
            "target_id": rel["target_entity_id"],
            "target_name": name_by_id.get(rel["target_entity_id"], "Unknown"),
            "type": rel["relation_type"],
            "evidence": rel["evidence"],
        })

    glossary_items = []
    for term in glossary:
        glossary_items.append({
            "id": term["id"],
            "term": term["term"],
            "pronunciation": term["pronunciation"],
            "frequency": term["frequency"],
        })

    transcript_items = []
    for tr in transcripts:
        transcript_items.append({
            "id": tr["id"],
            "filename": tr["filename"],
            "type": tr.get("type", "audio"),
            "created_at": tr["created_at"],
        })

    return {
        "project": {
            "id": project.id,
            "name": project.name,
            "description": project.description,
        },
        "stats": {
            "entity_count": len(entities),
            "relation_count": len(relations),
            "transcript_count": len(transcripts),
            "glossary_count": len(glossary),
        },
        "entities": entity_items,
        "relations": relation_items,
        "glossary": glossary_items,
        "transcripts": transcript_items,
    }
# Phase 3: Glossary API
@app.post("/api/v1/projects/{project_id}/glossary")
async def add_glossary_term(project_id: str, term: GlossaryTermCreate):
    """Add a term to a project's glossary.

    Raises:
        HTTPException: 500 when the database module is unavailable,
            404 when the project does not exist.
    """
    if not DB_AVAILABLE:
        raise HTTPException(status_code=500, detail="Database not available")

    db = get_db_manager()
    if not db.get_project(project_id):
        raise HTTPException(status_code=404, detail="Project not found")

    new_term_id = db.add_glossary_term(
        project_id=project_id,
        term=term.term,
        pronunciation=term.pronunciation,
    )
    response = {
        "id": new_term_id,
        "term": term.term,
        "pronunciation": term.pronunciation,
        "success": True,
    }
    return response
@app.get("/api/v1/projects/{project_id}/glossary")
async def get_glossary(project_id: str):
    """Return the glossary of a project as provided by the database layer.

    Raises:
        HTTPException: 500 when the database module is unavailable.
    """
    if not DB_AVAILABLE:
        raise HTTPException(status_code=500, detail="Database not available")

    return get_db_manager().list_glossary(project_id)
@app.delete("/api/v1/glossary/{term_id}")
async def delete_glossary_term(term_id: str):
    """Delete a glossary term by id.

    Raises:
        HTTPException: 500 when the database module is unavailable.
    """
    if not DB_AVAILABLE:
        raise HTTPException(status_code=500, detail="Database not available")

    get_db_manager().delete_glossary_term(term_id)
    return {"success": True}
# Phase 3: Entity Alignment API
@app.post("/api/v1/projects/{project_id}/align-entities")
async def align_project_entities(project_id: str, threshold: float = 0.85):
    """Run entity alignment over a project and merge similar entities.

    Each entity that still exists is compared against the others through the
    aligner; when a similar entity is found, the current one is merged into it.

    Args:
        project_id: Project whose entities are aligned.
        threshold: Similarity threshold passed through to the aligner.

    Raises:
        HTTPException: 500 when the database module or the entity aligner is
            unavailable.
    """
    if not DB_AVAILABLE:
        raise HTTPException(status_code=500, detail="Database not available")

    aligner = get_aligner()
    if not aligner:
        raise HTTPException(status_code=500, detail="Entity aligner not available")

    db = get_db_manager()
    merged_pairs = []
    merged_count = 0

    for candidate in db.list_project_entities(project_id):
        # Entities merged away earlier in this run no longer exist; skip them.
        if not db.get_entity(candidate.id):
            continue

        match = aligner.find_similar_entity(
            project_id,
            candidate.name,
            candidate.definition,
            exclude_id=candidate.id,
            threshold=threshold,
        )
        if not match:
            continue

        # Merge the current entity into its match (target first, source second).
        db.merge_entities(match.id, candidate.id)
        merged_count += 1
        merged_pairs.append({"source": candidate.name, "target": match.name})

    return {
        "success": True,
        "merged_count": merged_count,
        "merged_pairs": merged_pairs,
    }
@app.get("/api/v1/projects/{project_id}/entities")
async def get_project_entities(project_id: str):
    """List a project's entities; returns an empty list when the database is unavailable."""
    if not DB_AVAILABLE:
        return []

    listing = []
    for ent in get_db_manager().list_project_entities(project_id):
        listing.append({
            "id": ent.id,
            "name": ent.name,
            "type": ent.type,
            "definition": ent.definition,
            "aliases": ent.aliases,
        })
    return listing
@app.get("/api/v1/projects/{project_id}/relations")
async def get_project_relations(project_id: str):
    """List a project's relations with resolved entity names.

    Endpoint names fall back to ``"Unknown"`` when the entity is not among the
    project's entities. Returns an empty list when the database is unavailable.
    """
    if not DB_AVAILABLE:
        return []

    db = get_db_manager()
    relations = db.list_project_relations(project_id)

    # Entity id -> name lookup for labelling relation endpoints.
    name_by_id = {}
    for ent in db.list_project_entities(project_id):
        name_by_id[ent.id] = ent.name

    listing = []
    for rel in relations:
        listing.append({
            "id": rel["id"],
            "source_id": rel["source_entity_id"],
            "source_name": name_by_id.get(rel["source_entity_id"], "Unknown"),
            "target_id": rel["target_entity_id"],
            "target_name": name_by_id.get(rel["target_entity_id"], "Unknown"),
            "type": rel["relation_type"],
            "evidence": rel["evidence"],
        })
    return listing
@app.get("/api/v1/projects/{project_id}/transcripts")
async def get_project_transcripts(project_id: str):
    """List a project's transcripts with a preview of at most 100 characters.

    Texts longer than 100 characters are cut and suffixed with ``"..."``.
    Returns an empty list when the database is unavailable.
    """
    if not DB_AVAILABLE:
        return []

    def _preview(text):
        # Truncate only when the text exceeds the preview length.
        if len(text) > 100:
            return text[:100] + "..."
        return text

    listing = []
    for tr in get_db_manager().list_project_transcripts(project_id):
        listing.append({
            "id": tr["id"],
            "filename": tr["filename"],
            "type": tr.get("type", "audio"),
            "created_at": tr["created_at"],
            "preview": _preview(tr["full_text"]),
        })
    return listing
@app.get("/api/v1/entities/{entity_id}/mentions")
async def get_entity_mentions(entity_id: str):
    """List every recorded mention of an entity.

    Returns an empty list when the database is unavailable.
    """
    if not DB_AVAILABLE:
        return []

    listing = []
    for mention in get_db_manager().get_entity_mentions(entity_id):
        listing.append({
            "id": mention.id,
            "transcript_id": mention.transcript_id,
            "start_pos": mention.start_pos,
            "end_pos": mention.end_pos,
            "text_snippet": mention.text_snippet,
            "confidence": mention.confidence,
        })
    return listing
# Health check
@app.get("/health")
async def health_check():
    """Report service status and which optional integrations were importable."""
    availability = {
        "oss_available": OSS_AVAILABLE,
        "tingwu_available": TINGWU_AVAILABLE,
        "db_available": DB_AVAILABLE,
        "doc_processor_available": DOC_PROCESSOR_AVAILABLE,
        "aligner_available": ALIGNER_AVAILABLE,
    }
    return {
        "status": "ok",
        "version": "0.3.0",
        "phase": "Phase 3 - Memory & Growth",
        **availability,
    }
# Serve frontend
# Mounted last, at "/", so the API routes registered above are matched first.
# The "frontend" directory is resolved relative to the working directory.
_frontend_files = StaticFiles(directory="frontend", html=True)
app.mount("/", _frontend_files, name="frontend")

if __name__ == "__main__":
    import uvicorn

    # Development entry point: serve on all interfaces, port 8000.
    uvicorn.run(app, host="0.0.0.0", port=8000)