Files
insightflow/backend/main.py
2026-02-17 18:12:11 +08:00

290 lines
8.5 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""
InsightFlow Backend - Phase 3 (Production Ready)
Knowledge Growth: Multi-file fusion + Entity Alignment
ASR: 阿里云听悟 + OSS
"""
import os
import json
import httpx
import uuid
from fastapi import FastAPI, File, UploadFile, HTTPException, Form
from fastapi.middleware.cors import CORSMiddleware
from fastapi.staticfiles import StaticFiles
from pydantic import BaseModel
from typing import List, Optional
from datetime import datetime
# Optional integrations. Each import is attempted on its own, and the matching
# *_AVAILABLE flag records whether it succeeded so the rest of the module can
# branch on the flag instead of failing at import time.
try:
    from oss_uploader import get_oss_uploader
except ImportError:
    OSS_AVAILABLE = False
else:
    OSS_AVAILABLE = True

try:
    from tingwu_client import TingwuClient
except ImportError:
    TINGWU_AVAILABLE = False
else:
    TINGWU_AVAILABLE = True

try:
    from db_manager import get_db_manager, Project, Entity, EntityMention
except ImportError:
    DB_AVAILABLE = False
else:
    DB_AVAILABLE = True
# Application object that every route in this module is registered on.
app = FastAPI(title="InsightFlow", version="0.3.0")

# CORS is fully open: any origin, method and header is accepted.
# NOTE(review): wildcard origins together with allow_credentials=True is a
# permissive combination - confirm this is intended before exposing the
# service beyond a trusted network.
app.add_middleware(
    CORSMiddleware,
    allow_credentials=True,
    allow_origins=["*"],
    allow_methods=["*"],
    allow_headers=["*"],
)
# Models
class EntityModel(BaseModel):
    """Entity as serialized in API responses."""

    # Identifier of the entity.
    id: str
    # Display name of the entity.
    name: str
    # Category label (the upload flow falls back to "OTHER" when none is given).
    type: str
    # Optional free-text definition; defaults to an empty string.
    definition: Optional[str] = ""
    # Alternative names for the same entity; defaults to an empty list.
    aliases: List[str] = []
class TranscriptSegment(BaseModel):
    """One piece of a transcript with its time bounds and speaker label."""

    # Segment start position (presumably seconds - confirm against the ASR client).
    start: float
    # Segment end position, in the same unit as ``start``.
    end: float
    # Transcribed text of the segment.
    text: str
    # Speaker label; defaults to "Speaker A" when the source gives none.
    speaker: Optional[str] = "Speaker A"
class AnalysisResult(BaseModel):
    """Response body of the audio upload endpoint."""

    # Identifier generated for this transcript.
    transcript_id: str
    # Project the audio was uploaded to.
    project_id: str
    # Transcript broken into segments.
    segments: List[TranscriptSegment]
    # Entities found in the transcript, after alignment with the project.
    entities: List[EntityModel]
    # Complete transcript text.
    full_text: str
    # ISO-8601 timestamp string taken when the result was built.
    created_at: str
class ProjectCreate(BaseModel):
    """Request body accepted when creating a project."""

    # Project name (required).
    name: str
    # Optional description; defaults to an empty string.
    description: str = ""
# Kimi LLM settings. The key is read from the environment and is an empty
# string when the variable is unset; the base URL is fixed.
KIMI_API_KEY = os.environ.get("KIMI_API_KEY", "")
KIMI_BASE_URL = "https://api.kimi.com/coding"
def transcribe_audio(audio_data: bytes, filename: str) -> dict:
    """Transcribe audio by uploading it to OSS and running Tingwu on the URL.

    Every failure path falls back to ``mock_transcribe()`` instead of raising.

    Args:
        audio_data: Raw bytes of the audio file.
        filename: File name forwarded to the OSS uploader.

    Returns:
        The dict returned by ``TingwuClient.transcribe`` on success (it must
        contain a ``segments`` entry), otherwise the mock transcript.
    """
    # Check both integrations before any network work: uploading to OSS is
    # wasted effort (and leaves an unused object behind) when Tingwu cannot
    # run afterwards.
    if not OSS_AVAILABLE:
        print("OSS not available, using mock")
        return mock_transcribe()
    if not TINGWU_AVAILABLE:
        print("Tingwu not available, using mock")
        return mock_transcribe()

    # 1. Upload to OSS to obtain a URL for the audio.
    try:
        uploader = get_oss_uploader()
        audio_url, object_name = uploader.upload_audio(audio_data, filename)
        print(f"Uploaded to OSS: {object_name}")
    except Exception as e:
        # Deliberate best-effort: any upload problem degrades to the mock.
        print(f"OSS upload failed: {e}")
        return mock_transcribe()

    # 2. Tingwu transcription of the uploaded audio.
    try:
        client = TingwuClient()
        result = client.transcribe(audio_url)
        print(f"Transcription complete: {len(result['segments'])} segments")
        return result
    except Exception as e:
        # Also covers a result without "segments" (KeyError in the log line).
        print(f"Tingwu failed: {e}")
        return mock_transcribe()
def mock_transcribe() -> dict:
    """Return a canned one-segment transcript used when real ASR is unavailable."""
    sentence = "我们今天讨论 Project Alpha 的进度K8s 集群已经部署完成。"
    only_segment = {
        "start": 0.0,
        "end": 5.0,
        "text": sentence,
        "speaker": "Speaker A",
    }
    return {"full_text": sentence, "segments": [only_segment]}
def _parse_entity_array(content: str) -> List[dict]:
    """Return the first JSON array in *content* that holds usable entity dicts.

    Each ``[`` is tried as the start of a JSON value, so surrounding prose,
    code fences, or brackets inside string values do not cut the array short.
    Only dict items with a non-empty string ``name`` are kept.
    """
    decoder = json.JSONDecoder()
    start = content.find("[")
    while start != -1:
        try:
            candidate, _ = decoder.raw_decode(content, start)
        except json.JSONDecodeError:
            candidate = None
        if isinstance(candidate, list):
            entities = [
                item
                for item in candidate
                if isinstance(item, dict)
                and isinstance(item.get("name"), str)
                and item["name"].strip()
            ]
            if entities:
                return entities
        start = content.find("[", start + 1)
    return []


def extract_entities_with_llm(text: str) -> List[dict]:
    """Extract entities from meeting text using the Kimi chat-completions API.

    Args:
        text: Transcript text; only the first 3000 characters are sent.

    Returns:
        A list of entity dicts, each with at least a non-empty ``name``.
        Returns ``[]`` when *text* is empty, no API key is configured, the
        request fails, or the reply contains no usable JSON array.
    """
    if not text or not KIMI_API_KEY:
        return []
    prompt = f"""从以下会议文本中提取关键实体,以 JSON 格式返回:
文本:{text[:3000]}
要求:
1. 每个实体包含name(名称), type(类型: PROJECT/TECH/PERSON/ORG/OTHER), definition(一句话定义)
2. 只返回 JSON 数组
示例:[{{"name": "Project Alpha", "type": "PROJECT", "definition": "核心项目"}}]
"""
    try:
        response = httpx.post(
            f"{KIMI_BASE_URL}/v1/chat/completions",
            headers={"Authorization": f"Bearer {KIMI_API_KEY}", "Content-Type": "application/json"},
            json={"model": "k2p5", "messages": [{"role": "user", "content": prompt}], "temperature": 0.1},
            timeout=60.0
        )
        response.raise_for_status()
        content = response.json()["choices"][0]["message"]["content"]
        return _parse_entity_array(content)
    except Exception as e:
        # Best-effort: entity extraction must never fail the whole upload.
        print(f"LLM extraction failed: {e}")
    return []
def align_entity(project_id: str, name: str, db) -> Optional["Entity"]:
    """Find an existing project entity that *name* should be aligned with.

    The return annotation is a string forward reference: ``Entity`` comes from
    an optional import, and an evaluated annotation would raise ``NameError``
    at module import time whenever that import failed.

    Args:
        project_id: Project whose entities are searched.
        name: Entity name to look up.
        db: Object providing ``get_entity_by_name(project_id, name)`` and
            ``find_similar_entities(project_id, name)``.

    Returns:
        The entity found by name if there is one, otherwise the first result
        of the similarity search, otherwise ``None``.
    """
    existing = db.get_entity_by_name(project_id, name)
    if existing:
        return existing
    similar = db.find_similar_entities(project_id, name)
    if similar:
        return similar[0]
    return None
# API Endpoints
@app.post("/api/v1/projects", response_model=dict)
async def create_project(project: ProjectCreate):
    """Create a project under a newly generated 8-character id.

    Responds with the stored project's id, name and description, or with
    HTTP 500 when the database layer could not be imported.
    """
    if not DB_AVAILABLE:
        raise HTTPException(status_code=500, detail="Database not available")

    store = get_db_manager()
    # First eight hex digits of a random UUID.
    short_id = uuid.uuid4().hex[:8]
    created = store.create_project(short_id, project.name, project.description)
    return {
        "id": created.id,
        "name": created.name,
        "description": created.description,
    }
@app.get("/api/v1/projects")
async def list_projects():
    """List every project as an id/name/description dict.

    Returns an empty list when the database layer could not be imported.
    """
    if not DB_AVAILABLE:
        return []

    summaries = []
    for item in get_db_manager().list_projects():
        summaries.append(
            {"id": item.id, "name": item.name, "description": item.description}
        )
    return summaries
@app.post("/api/v1/projects/{project_id}/upload", response_model=AnalysisResult)
async def upload_audio(project_id: str, file: UploadFile = File(...)):
    """Upload audio to a project, transcribe it and align extracted entities.

    Args:
        project_id: Target project; it must already exist.
        file: Uploaded audio file.

    Returns:
        An ``AnalysisResult`` with the transcript segments, the full text and
        the entities after alignment against the project's existing ones.

    Raises:
        HTTPException: 500 when the database layer is unavailable, 404 when
            the project does not exist.
    """
    # Local import keeps this handler self-contained; fastapi is already a
    # dependency of this module.
    from fastapi.concurrency import run_in_threadpool

    if not DB_AVAILABLE:
        raise HTTPException(status_code=500, detail="Database not available")
    db = get_db_manager()
    project = db.get_project(project_id)
    if not project:
        raise HTTPException(status_code=404, detail="Project not found")

    content = await file.read()

    # Transcription: blocking network work, so run it in the threadpool rather
    # than stalling the event loop for every other request.
    print(f"Processing: {file.filename}")
    tw_result = await run_in_threadpool(transcribe_audio, content, file.filename)

    # Entity extraction: a blocking HTTP call with a 60 s timeout, offloaded
    # for the same reason.
    print("Extracting entities...")
    raw_entities = await run_in_threadpool(
        extract_entities_with_llm, tw_result["full_text"]
    )

    # Entity alignment. Database calls stay on this thread because the
    # manager's thread-safety is not known from here.
    aligned_entities = []
    for raw_ent in raw_entities:
        # LLM output is untrusted: skip items without a usable name instead of
        # failing the whole upload with a KeyError/TypeError.
        name = raw_ent.get("name") if isinstance(raw_ent, dict) else None
        if not name:
            continue

        existing = align_entity(project_id, name, db)
        if existing:
            ent_model = EntityModel(
                id=existing.id,
                name=existing.name,
                type=existing.type,
                definition=existing.definition,
                # A missing alias list would fail List[str] validation.
                aliases=existing.aliases or [],
            )
        else:
            new_ent = db.create_entity(Entity(
                id=str(uuid.uuid4())[:8],
                project_id=project_id,
                name=name,
                type=raw_ent.get("type", "OTHER"),
                definition=raw_ent.get("definition", ""),
            ))
            ent_model = EntityModel(
                id=new_ent.id,
                name=new_ent.name,
                type=new_ent.type,
                definition=new_ent.definition,
            )
        aligned_entities.append(ent_model)

    # Build the response segments.
    segments = [TranscriptSegment(**seg) for seg in tw_result["segments"]]
    transcript_id = str(uuid.uuid4())[:8]
    return AnalysisResult(
        transcript_id=transcript_id,
        project_id=project_id,
        segments=segments,
        entities=aligned_entities,
        full_text=tw_result["full_text"],
        created_at=datetime.now().isoformat(),
    )
@app.get("/api/v1/projects/{project_id}/entities")
async def get_project_entities(project_id: str):
    """Return the project-wide entity list as id/name/type/definition dicts.

    Returns an empty list when the database layer could not be imported.
    """
    if not DB_AVAILABLE:
        return []

    rows = []
    for ent in get_db_manager().list_project_entities(project_id):
        rows.append(
            {
                "id": ent.id,
                "name": ent.name,
                "type": ent.type,
                "definition": ent.definition,
            }
        )
    return rows
@app.post("/api/v1/entities/{entity_id}/merge")
async def merge_entities(entity_id: str, target_entity_id: str):
    """Merge two entities and report the entity returned by the database layer.

    ``entity_id`` comes from the path; ``target_entity_id`` is a plain string
    parameter. The call made is ``db.merge_entities(target_entity_id,
    entity_id)``.
    NOTE(review): presumably ``entity_id`` is folded into the target - confirm
    against the db manager.

    Responds with HTTP 500 when the database layer could not be imported.
    """
    if not DB_AVAILABLE:
        raise HTTPException(status_code=500, detail="Database not available")

    merged = get_db_manager().merge_entities(target_entity_id, entity_id)
    summary = {"id": merged.id, "name": merged.name}
    return {"success": True, "merged_entity": summary}
# Health check
@app.get("/health")
async def health_check():
    """Report service status, version and which optional integrations loaded."""
    integrations = {
        "oss_available": OSS_AVAILABLE,
        "tingwu_available": TINGWU_AVAILABLE,
        "db_available": DB_AVAILABLE,
    }
    return {"status": "ok", "version": "0.3.0", **integrations}
# Serve the frontend from the "frontend" directory at the site root. The mount
# is registered after the API routes defined earlier in this module.
frontend_files = StaticFiles(directory="frontend", html=True)
app.mount("/", frontend_files, name="frontend")
if __name__ == "__main__":
    # Direct execution: serve the app with uvicorn on all interfaces, port 8000.
    import uvicorn

    uvicorn.run(app, port=8000, host="0.0.0.0")