290 lines
8.5 KiB
Python
290 lines
8.5 KiB
Python
#!/usr/bin/env python3
|
||
"""
|
||
InsightFlow Backend - Phase 3 (Production Ready)
|
||
Knowledge Growth: Multi-file fusion + Entity Alignment
|
||
ASR: 阿里云听悟 + OSS
|
||
"""
|
||
|
||
import os
|
||
import json
|
||
import httpx
|
||
import uuid
|
||
from fastapi import FastAPI, File, UploadFile, HTTPException, Form
|
||
from fastapi.middleware.cors import CORSMiddleware
|
||
from fastapi.staticfiles import StaticFiles
|
||
from pydantic import BaseModel
|
||
from typing import List, Optional
|
||
from datetime import datetime
|
||
|
||
# Import clients
# Each integration below is optional: a failed import only clears the matching
# *_AVAILABLE flag, and the code in this module checks that flag before it
# touches the corresponding client.
try:
    from oss_uploader import get_oss_uploader
    OSS_AVAILABLE = True
except ImportError:
    OSS_AVAILABLE = False

try:
    from tingwu_client import TingwuClient
    TINGWU_AVAILABLE = True
except ImportError:
    TINGWU_AVAILABLE = False

try:
    from db_manager import get_db_manager, Project, Entity, EntityMention
    DB_AVAILABLE = True
except ImportError:
    DB_AVAILABLE = False
|
||
|
||
# Application instance. Keep the version in sync with the value returned by
# the /health endpoint.
app = FastAPI(title="InsightFlow", version="0.3.0")

# CORS: any origin may call the API with any method and headers.
# Credentials (cookies / Authorization on cross-origin requests) are NOT
# enabled: FastAPI's CORS documentation states that allow_origins,
# allow_methods and allow_headers must all be listed explicitly when
# allow_credentials is True, so the previous wildcard + credentials
# combination was invalid. To support credentialed cross-origin clients,
# replace the wildcards with explicit values and re-enable the flag.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=False,
    allow_methods=["*"],
    allow_headers=["*"],
)
|
||
|
||
# Models
|
||
class EntityModel(BaseModel):
    """Entity as serialized in API responses."""

    id: str
    name: str
    # Entity category. The extraction prompt in this module asks the LLM for
    # one of PROJECT/TECH/PERSON/ORG/OTHER, but the value is not validated here.
    type: str
    # Short description of the entity; may be None or empty.
    definition: Optional[str] = ""
    # Alternative names for the same entity.
    aliases: List[str] = []
|
||
|
||
class TranscriptSegment(BaseModel):
    """One timed piece of a transcript."""

    # Segment boundaries — presumably seconds from the start of the audio
    # (the mock transcript uses 0.0 and 5.0); TODO confirm against Tingwu output.
    start: float
    end: float
    text: str
    speaker: Optional[str] = "Speaker A"
|
||
|
||
class AnalysisResult(BaseModel):
    """Response body of the audio upload endpoint."""

    transcript_id: str
    project_id: str
    segments: List[TranscriptSegment]
    entities: List[EntityModel]
    full_text: str
    # Timestamp string; the upload endpoint fills it with
    # datetime.now().isoformat().
    created_at: str
|
||
|
||
class ProjectCreate(BaseModel):
    """Request body for creating a project."""

    name: str
    description: str = ""
|
||
|
||
# API Keys
|
||
# Kimi API key, read from the environment. When it is empty,
# extract_entities_with_llm() returns no entities instead of calling the API.
KIMI_API_KEY = os.getenv("KIMI_API_KEY", "")
# Base URL of the Kimi API; the caller appends "/v1/chat/completions".
KIMI_BASE_URL = "https://api.kimi.com/coding"
|
||
|
||
def transcribe_audio(audio_data: bytes, filename: str) -> dict:
    """Transcribe audio by uploading it to OSS and running Tingwu on the URL.

    This is deliberately best-effort: whenever a backend is unavailable or
    raises, the failure is printed and the canned result of
    ``mock_transcribe()`` is returned instead of propagating the error.

    Args:
        audio_data: Raw bytes of the audio file.
        filename: File name forwarded to the OSS uploader.

    Returns:
        The dict returned by ``TingwuClient.transcribe`` (it must contain a
        ``"segments"`` entry, otherwise the mock is used), or the mock result.
    """
    # Check both backends up front so that nothing is uploaded to OSS when
    # the transcription step could not run anyway.
    if not OSS_AVAILABLE:
        print("OSS not available, using mock")
        return mock_transcribe()
    if not TINGWU_AVAILABLE:
        print("Tingwu not available, using mock")
        return mock_transcribe()

    # 1. Upload to OSS
    try:
        uploader = get_oss_uploader()
        audio_url, object_name = uploader.upload_audio(audio_data, filename)
    except Exception as e:  # best-effort boundary: fall back to the mock
        print(f"OSS upload failed: {e}")
        return mock_transcribe()
    print(f"Uploaded to OSS: {object_name}")

    # 2. Transcribe with Tingwu
    try:
        result = TingwuClient().transcribe(audio_url)
        # Reading result['segments'] stays inside the try so that a result
        # without segments also falls back to the mock.
        print(f"Transcription complete: {len(result['segments'])} segments")
    except Exception as e:  # best-effort boundary: fall back to the mock
        print(f"Tingwu failed: {e}")
        return mock_transcribe()
    return result
|
||
|
||
def mock_transcribe() -> dict:
    """Build the canned transcript used when real transcription is unavailable.

    Returns:
        A new dict with ``"full_text"`` and a one-element ``"segments"`` list
        whose segment carries the same text, spans 0.0-5.0 and is attributed
        to "Speaker A".
    """
    sentence = "我们今天讨论 Project Alpha 的进度,K8s 集群已经部署完成。"
    only_segment = dict(start=0.0, end=5.0, text=sentence, speaker="Speaker A")
    return dict(full_text=sentence, segments=[only_segment])
|
||
|
||
def extract_entities_with_llm(text: str) -> List[dict]:
    """Extract key entities from meeting text through the Kimi chat API.

    Only the first 3000 characters of *text* are sent. The call is
    best-effort: any request or parsing failure is printed and an empty list
    is returned.

    Args:
        text: Transcript text to analyse.

    Returns:
        A list of dicts, each with a non-empty string ``"name"`` and whatever
        other keys the model returned (the prompt asks for ``"type"`` and
        ``"definition"``). Empty when there is no text, no API key, or the
        call fails.
    """
    if not text or not KIMI_API_KEY:
        return []

    prompt = f"""从以下会议文本中提取关键实体,以 JSON 格式返回:

文本:{text[:3000]}

要求:
1. 每个实体包含:name(名称), type(类型: PROJECT/TECH/PERSON/ORG/OTHER), definition(一句话定义)
2. 只返回 JSON 数组

示例:[{{"name": "Project Alpha", "type": "PROJECT", "definition": "核心项目"}}]
"""

    try:
        response = httpx.post(
            f"{KIMI_BASE_URL}/v1/chat/completions",
            headers={"Authorization": f"Bearer {KIMI_API_KEY}", "Content-Type": "application/json"},
            json={"model": "k2p5", "messages": [{"role": "user", "content": prompt}], "temperature": 0.1},
            timeout=60.0,
        )
        response.raise_for_status()
        content = response.json()["choices"][0]["message"]["content"]
        return _parse_entity_array(content)
    except Exception as e:  # best-effort boundary: extraction must not fail the upload
        print(f"LLM extraction failed: {e}")

    return []


def _parse_entity_array(content) -> List[dict]:
    """Return the usable entity dicts from the first JSON array in *content*."""
    if not isinstance(content, str):
        return []

    # Try to decode a JSON value at every '[' instead of using a regex: a
    # non-greedy pattern stops at the first ']' and therefore truncates
    # arrays that contain nested brackets or ']' inside a string.
    decoder = json.JSONDecoder()
    start = content.find("[")
    while start != -1:
        try:
            parsed, _ = decoder.raw_decode(content, start)
        except ValueError:
            parsed = None
        if isinstance(parsed, list):
            # Keep only items the caller can use: it indexes item["name"].
            entities = [
                item
                for item in parsed
                if isinstance(item, dict)
                and isinstance(item.get("name"), str)
                and item["name"].strip()
            ]
            if entities:
                return entities
        start = content.find("[", start + 1)
    return []
|
||
|
||
def align_entity(project_id: str, name: str, db) -> Optional["Entity"]:
    """Find the existing project entity that *name* should be aligned to.

    An exact name match from ``db.get_entity_by_name`` wins; otherwise the
    first result of ``db.find_similar_entities`` is used.

    The return annotation is a string forward reference on purpose:
    ``Entity`` comes from an optional import, and an evaluated annotation
    would raise NameError at import time when that import failed.

    Args:
        project_id: Project whose entities are searched.
        name: Entity name to look up.
        db: Object providing ``get_entity_by_name(project_id, name)`` and
            ``find_similar_entities(project_id, name)``.

    Returns:
        The matching entity, or None when nothing matches.
    """
    exact = db.get_entity_by_name(project_id, name)
    if exact:
        return exact

    candidates = db.find_similar_entities(project_id, name)
    return candidates[0] if candidates else None
|
||
|
||
# API Endpoints
|
||
|
||
@app.post("/api/v1/projects", response_model=dict)
async def create_project(project: ProjectCreate):
    """Create a project under a freshly generated 8-character id.

    Args:
        project: Name and optional description of the new project.

    Returns:
        Dict with the ``id``, ``name`` and ``description`` of the stored project.

    Raises:
        HTTPException: 500 when the database layer could not be imported.
    """
    if DB_AVAILABLE:
        manager = get_db_manager()
        # First 8 hex digits of a random UUID (same characters as str(uuid)[:8]).
        short_id = uuid.uuid4().hex[:8]
        created = manager.create_project(short_id, project.name, project.description)
        return {
            "id": created.id,
            "name": created.name,
            "description": created.description,
        }

    raise HTTPException(status_code=500, detail="Database not available")
|
||
|
||
@app.get("/api/v1/projects")
async def list_projects():
    """List every project as an ``id`` / ``name`` / ``description`` dict.

    Returns:
        A list of project dicts; empty when the database layer is unavailable.
    """
    if not DB_AVAILABLE:
        return []

    summaries = []
    for proj in get_db_manager().list_projects():
        summaries.append(
            {"id": proj.id, "name": proj.name, "description": proj.description}
        )
    return summaries
|
||
|
||
@app.post("/api/v1/projects/{project_id}/upload", response_model=AnalysisResult)
async def upload_audio(project_id: str, file: UploadFile = File(...)):
    """Transcribe an uploaded audio file and align its entities to a project.

    Steps: read the upload, transcribe it, extract entities from the full
    text with the LLM, then reuse a matching project entity or create a new
    one for each extracted entity.

    Args:
        project_id: Id of the project the audio belongs to.
        file: Uploaded audio file.

    Returns:
        AnalysisResult with the transcript segments, the aligned entities
        (each entity at most once), the full text and a creation timestamp.

    Raises:
        HTTPException: 500 when the database layer is unavailable, 404 when
            the project does not exist.
    """
    # Local import keeps this block self-contained; fastapi is already a
    # dependency of this module.
    from fastapi.concurrency import run_in_threadpool

    if not DB_AVAILABLE:
        raise HTTPException(status_code=500, detail="Database not available")

    db = get_db_manager()
    project = db.get_project(project_id)
    if not project:
        raise HTTPException(status_code=404, detail="Project not found")

    content = await file.read()

    # Transcribe. Upload + transcription are blocking network calls, so they
    # run in the threadpool instead of stalling the event loop.
    print(f"Processing: {file.filename}")
    tw_result = await run_in_threadpool(transcribe_audio, content, file.filename)

    # Extract entities (blocking HTTP call with a 60 s timeout, same reasoning).
    print("Extracting entities...")
    raw_entities = await run_in_threadpool(
        extract_entities_with_llm, tw_result["full_text"]
    )

    # Align entities against the project's existing ones.
    aligned_entities = []
    seen_ids = set()
    for raw_ent in raw_entities:
        # LLM output is untrusted: skip anything without a usable name
        # instead of failing the whole upload with a KeyError.
        ent_name = raw_ent.get("name") if isinstance(raw_ent, dict) else None
        if not ent_name:
            continue

        existing = align_entity(project_id, ent_name, db)

        if existing:
            ent_model = EntityModel(
                id=existing.id,
                name=existing.name,
                type=existing.type,
                definition=existing.definition,
                # NOTE(review): guards against a None aliases value; confirm
                # whether db_manager can actually return None here.
                aliases=existing.aliases or [],
            )
        else:
            new_ent = db.create_entity(Entity(
                id=str(uuid.uuid4())[:8],
                project_id=project_id,
                name=ent_name,
                # `or` also covers an explicit JSON null from the model.
                type=raw_ent.get("type") or "OTHER",
                definition=raw_ent.get("definition") or "",
            ))
            ent_model = EntityModel(
                id=new_ent.id,
                name=new_ent.name,
                type=new_ent.type,
                definition=new_ent.definition,
            )

        # The model may mention the same entity several times; report it once.
        if ent_model.id in seen_ids:
            continue
        seen_ids.add(ent_model.id)
        aligned_entities.append(ent_model)

    # Build segments
    segments = [TranscriptSegment(**seg) for seg in tw_result["segments"]]

    transcript_id = str(uuid.uuid4())[:8]

    return AnalysisResult(
        transcript_id=transcript_id,
        project_id=project_id,
        segments=segments,
        entities=aligned_entities,
        full_text=tw_result["full_text"],
        created_at=datetime.now().isoformat(),
    )
|
||
|
||
@app.get("/api/v1/projects/{project_id}/entities")
async def get_project_entities(project_id: str):
    """Return the project-wide entity list.

    Args:
        project_id: Id of the project whose entities are listed.

    Returns:
        A list of dicts with ``id``, ``name``, ``type`` and ``definition``;
        empty when the database layer is unavailable.
    """
    if not DB_AVAILABLE:
        return []

    exported = ("id", "name", "type", "definition")
    rows = get_db_manager().list_project_entities(project_id)
    return [{field: getattr(row, field) for field in exported} for row in rows]
|
||
|
||
@app.post("/api/v1/entities/{entity_id}/merge")
async def merge_entities(entity_id: str, target_entity_id: str):
    """Merge two entities through the database layer.

    Args:
        entity_id: Entity id taken from the URL path.
        target_entity_id: Second entity id; it is not part of the path, so
            FastAPI reads it from the query string.

    Returns:
        ``{"success": True, "merged_entity": {...}}`` with the id and name of
        the entity returned by ``db.merge_entities``.

    Raises:
        HTTPException: 500 when the database layer is unavailable.
    """
    if not DB_AVAILABLE:
        raise HTTPException(status_code=500, detail="Database not available")

    # NOTE(review): target id is passed first, then the path id — presumably
    # (target, source); confirm against db_manager.merge_entities.
    merged = get_db_manager().merge_entities(target_entity_id, entity_id)
    summary = {"id": merged.id, "name": merged.name}
    return {"success": True, "merged_entity": summary}
|
||
|
||
# Health check
|
||
@app.get("/health")
async def health_check():
    """Report service status, version and which optional backends were imported."""
    availability = {
        "oss_available": OSS_AVAILABLE,
        "tingwu_available": TINGWU_AVAILABLE,
        "db_available": DB_AVAILABLE,
    }
    return {"status": "ok", "version": "0.3.0", **availability}
|
||
|
||
# Serve frontend
# Static files from the relative "frontend" directory are mounted at the root
# path, after every API route above has been registered.
# NOTE(review): a mount at "/" presumably shadows any route added after this
# line — confirm before registering new endpoints below it.
app.mount("/", StaticFiles(directory="frontend", html=True), name="frontend")

# Script entry point: run the app with uvicorn on port 8000. uvicorn is
# imported here so it is only needed when the module is executed directly.
if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)
|