#!/usr/bin/env python3 """ InsightFlow Backend - Phase 3 (Production Ready) Knowledge Growth: Multi-file fusion + Entity Alignment ASR: 阿里云听悟 + OSS """ import os import json import httpx import uuid from fastapi import FastAPI, File, UploadFile, HTTPException, Form from fastapi.middleware.cors import CORSMiddleware from fastapi.staticfiles import StaticFiles from pydantic import BaseModel from typing import List, Optional from datetime import datetime # Import clients try: from oss_uploader import get_oss_uploader OSS_AVAILABLE = True except ImportError: OSS_AVAILABLE = False try: from tingwu_client import TingwuClient TINGWU_AVAILABLE = True except ImportError: TINGWU_AVAILABLE = False try: from db_manager import get_db_manager, Project, Entity, EntityMention DB_AVAILABLE = True except ImportError: DB_AVAILABLE = False app = FastAPI(title="InsightFlow", version="0.3.0") app.add_middleware( CORSMiddleware, allow_origins=["*"], allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) # Models class EntityModel(BaseModel): id: str name: str type: str definition: Optional[str] = "" aliases: List[str] = [] class TranscriptSegment(BaseModel): start: float end: float text: str speaker: Optional[str] = "Speaker A" class AnalysisResult(BaseModel): transcript_id: str project_id: str segments: List[TranscriptSegment] entities: List[EntityModel] full_text: str created_at: str class ProjectCreate(BaseModel): name: str description: str = "" class EntityUpdate(BaseModel): name: Optional[str] = None type: Optional[str] = None definition: Optional[str] = None aliases: Optional[List[str]] = None class RelationCreate(BaseModel): source_entity_id: str target_entity_id: str relation_type: str evidence: Optional[str] = "" class TranscriptUpdate(BaseModel): full_text: str class EntityMergeRequest(BaseModel): source_entity_id: str target_entity_id: str # API Keys KIMI_API_KEY = os.getenv("KIMI_API_KEY", "") KIMI_BASE_URL = "https://api.kimi.com/coding" # Phase 2: Entity Edit API @app.put("/api/v1/entities/{entity_id}") async def update_entity(entity_id: str, update: EntityUpdate): """更新实体信息(名称、类型、定义、别名)""" if not DB_AVAILABLE: raise HTTPException(status_code=500, detail="Database not available") db = get_db_manager() entity = db.get_entity(entity_id) if not entity: raise HTTPException(status_code=404, detail="Entity not found") # 更新字段 update_data = {k: v for k, v in update.dict().items() if v is not None} updated = db.update_entity(entity_id, **update_data) return { "id": updated.id, "name": updated.name, "type": updated.type, "definition": updated.definition, "aliases": updated.aliases } @app.delete("/api/v1/entities/{entity_id}") async def delete_entity(entity_id: str): """删除实体""" if not DB_AVAILABLE: raise HTTPException(status_code=500, detail="Database not available") db = get_db_manager() entity = db.get_entity(entity_id) if not entity: raise HTTPException(status_code=404, detail="Entity not found") db.delete_entity(entity_id) return {"success": True, "message": f"Entity {entity_id} deleted"} @app.post("/api/v1/entities/{entity_id}/merge") async def merge_entities_endpoint(entity_id: str, merge_req: EntityMergeRequest): """合并两个实体""" if not DB_AVAILABLE: raise HTTPException(status_code=500, detail="Database not available") db = get_db_manager() # 验证两个实体都存在 source = db.get_entity(merge_req.source_entity_id) target = db.get_entity(merge_req.target_entity_id) if not source or not target: raise HTTPException(status_code=404, detail="Entity not found") result = db.merge_entities(merge_req.target_entity_id, merge_req.source_entity_id) return { "success": True, "merged_entity": { "id": result.id, "name": result.name, "type": result.type, "definition": result.definition, "aliases": result.aliases } } # Phase 2: Relation Edit API @app.post("/api/v1/projects/{project_id}/relations") async def create_relation_endpoint(project_id: str, relation: RelationCreate): """创建新的实体关系""" if not DB_AVAILABLE: raise HTTPException(status_code=500, detail="Database not available") db = get_db_manager() # 验证实体存在 source = db.get_entity(relation.source_entity_id) target = db.get_entity(relation.target_entity_id) if not source or not target: raise HTTPException(status_code=404, detail="Source or target entity not found") relation_id = db.create_relation( project_id=project_id, source_entity_id=relation.source_entity_id, target_entity_id=relation.target_entity_id, relation_type=relation.relation_type, evidence=relation.evidence ) return { "id": relation_id, "source_id": relation.source_entity_id, "target_id": relation.target_entity_id, "type": relation.relation_type, "success": True } @app.delete("/api/v1/relations/{relation_id}") async def delete_relation(relation_id: str): """删除关系""" if not DB_AVAILABLE: raise HTTPException(status_code=500, detail="Database not available") db = get_db_manager() db.delete_relation(relation_id) return {"success": True, "message": f"Relation {relation_id} deleted"} @app.put("/api/v1/relations/{relation_id}") async def update_relation(relation_id: str, relation: RelationCreate): """更新关系""" if not DB_AVAILABLE: raise HTTPException(status_code=500, detail="Database not available") db = get_db_manager() updated = db.update_relation( relation_id=relation_id, relation_type=relation.relation_type, evidence=relation.evidence ) return { "id": relation_id, "type": updated["relation_type"], "evidence": updated["evidence"], "success": True } # Phase 2: Transcript Edit API @app.get("/api/v1/transcripts/{transcript_id}") async def get_transcript(transcript_id: str): """获取转录详情""" if not DB_AVAILABLE: raise HTTPException(status_code=500, detail="Database not available") db = get_db_manager() transcript = db.get_transcript(transcript_id) if not transcript: raise HTTPException(status_code=404, detail="Transcript not found") return transcript @app.put("/api/v1/transcripts/{transcript_id}") async def update_transcript(transcript_id: str, update: TranscriptUpdate): """更新转录文本(人工修正)""" if not DB_AVAILABLE: raise HTTPException(status_code=500, detail="Database not available") db = get_db_manager() transcript = db.get_transcript(transcript_id) if not transcript: raise HTTPException(status_code=404, detail="Transcript not found") updated = db.update_transcript(transcript_id, update.full_text) return { "id": transcript_id, "full_text": updated["full_text"], "updated_at": updated["updated_at"], "success": True } # Phase 2: Manual Entity Creation class ManualEntityCreate(BaseModel): name: str type: str = "OTHER" definition: str = "" transcript_id: Optional[str] = None start_pos: Optional[int] = None end_pos: Optional[int] = None @app.post("/api/v1/projects/{project_id}/entities") async def create_manual_entity(project_id: str, entity: ManualEntityCreate): """手动创建实体(划词新建)""" if not DB_AVAILABLE: raise HTTPException(status_code=500, detail="Database not available") db = get_db_manager() # 检查是否已存在 existing = db.get_entity_by_name(project_id, entity.name) if existing: return { "id": existing.id, "name": existing.name, "type": existing.type, "existed": True } entity_id = str(uuid.uuid4())[:8] new_entity = db.create_entity(Entity( id=entity_id, project_id=project_id, name=entity.name, type=entity.type, definition=entity.definition )) # 如果有提及位置信息,保存提及 if entity.transcript_id and entity.start_pos is not None and entity.end_pos is not None: transcript = db.get_transcript(entity.transcript_id) if transcript: text = transcript["full_text"] mention = EntityMention( id=str(uuid.uuid4())[:8], entity_id=entity_id, transcript_id=entity.transcript_id, start_pos=entity.start_pos, end_pos=entity.end_pos, text_snippet=text[max(0, entity.start_pos-20):min(len(text), entity.end_pos+20)], confidence=1.0 ) db.add_mention(mention) return { "id": new_entity.id, "name": new_entity.name, "type": new_entity.type, "definition": new_entity.definition, "success": True } def transcribe_audio(audio_data: bytes, filename: str) -> dict: """转录音频:OSS上传 + 听悟转录""" # 1. 上传 OSS if not OSS_AVAILABLE: print("OSS not available, using mock") return mock_transcribe() try: uploader = get_oss_uploader() audio_url, object_name = uploader.upload_audio(audio_data, filename) print(f"Uploaded to OSS: {object_name}") except Exception as e: print(f"OSS upload failed: {e}") return mock_transcribe() # 2. 听悟转录 if not TINGWU_AVAILABLE: print("Tingwu not available, using mock") return mock_transcribe() try: client = TingwuClient() result = client.transcribe(audio_url) print(f"Transcription complete: {len(result['segments'])} segments") return result except Exception as e: print(f"Tingwu failed: {e}") return mock_transcribe() def mock_transcribe() -> dict: """Mock 转录结果""" return { "full_text": "我们今天讨论 Project Alpha 的进度,K8s 集群已经部署完成。", "segments": [ {"start": 0.0, "end": 5.0, "text": "我们今天讨论 Project Alpha 的进度,K8s 集群已经部署完成。", "speaker": "Speaker A"} ] } def extract_entities_with_llm(text: str) -> tuple[List[dict], List[dict]]: """使用 Kimi API 提取实体和关系 Returns: (entities, relations): 实体列表和关系列表 """ if not KIMI_API_KEY or not text: return [], [] prompt = f"""从以下会议文本中提取关键实体和它们之间的关系,以 JSON 格式返回: 文本:{text[:3000]} 要求: 1. entities: 每个实体包含 name(名称), type(类型: PROJECT/TECH/PERSON/ORG/OTHER), definition(一句话定义) 2. relations: 每个关系包含 source(源实体名), target(目标实体名), type(关系类型: belongs_to/works_with/depends_on/mentions/related) 3. 只返回 JSON 对象,格式: {{"entities": [...], "relations": [...]}} 示例: {{ "entities": [ {{"name": "Project Alpha", "type": "PROJECT", "definition": "核心项目"}}, {{"name": "K8s", "type": "TECH", "definition": "Kubernetes容器编排平台"}} ], "relations": [ {{"source": "Project Alpha", "target": "K8s", "type": "depends_on"}} ] }} """ try: response = httpx.post( f"{KIMI_BASE_URL}/v1/chat/completions", headers={"Authorization": f"Bearer {KIMI_API_KEY}", "Content-Type": "application/json"}, json={"model": "k2p5", "messages": [{"role": "user", "content": prompt}], "temperature": 0.1}, timeout=60.0 ) response.raise_for_status() result = response.json() content = result["choices"][0]["message"]["content"] import re json_match = re.search(r'\{{.*?\}}', content, re.DOTALL) if json_match: data = json.loads(json_match.group()) return data.get("entities", []), data.get("relations", []) except Exception as e: print(f"LLM extraction failed: {e}") return [], [] def align_entity(project_id: str, name: str, db) -> Optional[Entity]: """实体对齐""" existing = db.get_entity_by_name(project_id, name) if existing: return existing similar = db.find_similar_entities(project_id, name) if similar: return similar[0] return None # API Endpoints @app.post("/api/v1/projects", response_model=dict) async def create_project(project: ProjectCreate): """创建新项目""" if not DB_AVAILABLE: raise HTTPException(status_code=500, detail="Database not available") db = get_db_manager() project_id = str(uuid.uuid4())[:8] p = db.create_project(project_id, project.name, project.description) return {"id": p.id, "name": p.name, "description": p.description} @app.get("/api/v1/projects") async def list_projects(): """列出所有项目""" if not DB_AVAILABLE: return [] db = get_db_manager() projects = db.list_projects() return [{"id": p.id, "name": p.name, "description": p.description} for p in projects] @app.post("/api/v1/projects/{project_id}/upload", response_model=AnalysisResult) async def upload_audio(project_id: str, file: UploadFile = File(...)): """上传音频到指定项目""" if not DB_AVAILABLE: raise HTTPException(status_code=500, detail="Database not available") db = get_db_manager() project = db.get_project(project_id) if not project: raise HTTPException(status_code=404, detail="Project not found") content = await file.read() # 转录 print(f"Processing: {file.filename}") tw_result = transcribe_audio(content, file.filename) # 提取实体和关系 print("Extracting entities and relations...") raw_entities, raw_relations = extract_entities_with_llm(tw_result["full_text"]) # 保存转录记录 transcript_id = str(uuid.uuid4())[:8] db.save_transcript( transcript_id=transcript_id, project_id=project_id, filename=file.filename, full_text=tw_result["full_text"] ) # 实体对齐并保存 aligned_entities = [] entity_name_to_id = {} # 用于关系映射 for raw_ent in raw_entities: existing = align_entity(project_id, raw_ent["name"], db) if existing: ent_model = EntityModel( id=existing.id, name=existing.name, type=existing.type, definition=existing.definition, aliases=existing.aliases ) entity_name_to_id[raw_ent["name"]] = existing.id else: new_ent = db.create_entity(Entity( id=str(uuid.uuid4())[:8], project_id=project_id, name=raw_ent["name"], type=raw_ent.get("type", "OTHER"), definition=raw_ent.get("definition", "") )) ent_model = EntityModel( id=new_ent.id, name=new_ent.name, type=new_ent.type, definition=new_ent.definition ) entity_name_to_id[raw_ent["name"]] = new_ent.id aligned_entities.append(ent_model) # 保存实体提及位置 full_text = tw_result["full_text"] name = raw_ent["name"] start_pos = 0 while True: pos = full_text.find(name, start_pos) if pos == -1: break mention = EntityMention( id=str(uuid.uuid4())[:8], entity_id=entity_name_to_id[name], transcript_id=transcript_id, start_pos=pos, end_pos=pos + len(name), text_snippet=full_text[max(0, pos-20):min(len(full_text), pos+len(name)+20)], confidence=1.0 ) db.add_mention(mention) start_pos = pos + 1 # 保存关系 for rel in raw_relations: source_id = entity_name_to_id.get(rel.get("source", "")) target_id = entity_name_to_id.get(rel.get("target", "")) if source_id and target_id: db.create_relation( project_id=project_id, source_entity_id=source_id, target_entity_id=target_id, relation_type=rel.get("type", "related"), evidence=tw_result["full_text"][:200], transcript_id=transcript_id ) # 构建片段 segments = [TranscriptSegment(**seg) for seg in tw_result["segments"]] return AnalysisResult( transcript_id=transcript_id, project_id=project_id, segments=segments, entities=aligned_entities, full_text=tw_result["full_text"], created_at=datetime.now().isoformat() ) @app.get("/api/v1/projects/{project_id}/entities") async def get_project_entities(project_id: str): """获取项目的全局实体列表""" if not DB_AVAILABLE: return [] db = get_db_manager() entities = db.list_project_entities(project_id) return [{"id": e.id, "name": e.name, "type": e.type, "definition": e.definition} for e in entities] @app.get("/api/v1/projects/{project_id}/relations") async def get_project_relations(project_id: str): """获取项目的实体关系列表""" if not DB_AVAILABLE: return [] db = get_db_manager() relations = db.list_project_relations(project_id) # 获取实体名称映射 entities = db.list_project_entities(project_id) entity_map = {e.id: e.name for e in entities} return [{ "id": r["id"], "source_id": r["source_entity_id"], "source_name": entity_map.get(r["source_entity_id"], "Unknown"), "target_id": r["target_entity_id"], "target_name": entity_map.get(r["target_entity_id"], "Unknown"), "type": r["relation_type"], "evidence": r["evidence"] } for r in relations] @app.get("/api/v1/projects/{project_id}/transcripts") async def get_project_transcripts(project_id: str): """获取项目的转录列表""" if not DB_AVAILABLE: return [] db = get_db_manager() transcripts = db.list_project_transcripts(project_id) return [{ "id": t["id"], "filename": t["filename"], "created_at": t["created_at"], "preview": t["full_text"][:100] + "..." if len(t["full_text"]) > 100 else t["full_text"] } for t in transcripts] @app.get("/api/v1/entities/{entity_id}/mentions") async def get_entity_mentions(entity_id: str): """获取实体的所有提及位置""" if not DB_AVAILABLE: return [] db = get_db_manager() mentions = db.get_entity_mentions(entity_id) return [{ "id": m.id, "transcript_id": m.transcript_id, "start_pos": m.start_pos, "end_pos": m.end_pos, "text_snippet": m.text_snippet, "confidence": m.confidence } for m in mentions] @app.post("/api/v1/entities/{entity_id}/merge") async def merge_entities_endpoint(entity_id: str, merge_req: EntityMergeRequest): """合并两个实体""" if not DB_AVAILABLE: raise HTTPException(status_code=500, detail="Database not available") db = get_db_manager() # 验证两个实体都存在 source = db.get_entity(merge_req.source_entity_id) target = db.get_entity(merge_req.target_entity_id) if not source or not target: raise HTTPException(status_code=404, detail="Entity not found") result = db.merge_entities(merge_req.target_entity_id, merge_req.source_entity_id) return { "success": True, "merged_entity": { "id": result.id, "name": result.name, "type": result.type, "definition": result.definition, "aliases": result.aliases } } # Health check @app.get("/health") async def health_check(): return { "status": "ok", "version": "0.3.0", "oss_available": OSS_AVAILABLE, "tingwu_available": TINGWU_AVAILABLE, "db_available": DB_AVAILABLE } # Serve frontend app.mount("/", StaticFiles(directory="frontend", html=True), name="frontend") if __name__ == "__main__": import uvicorn uvicorn.run(app, host="0.0.0.0", port=8000)