Files
insightflow/backend/main.py
2026-02-17 12:26:47 +08:00

201 lines
6.1 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""
InsightFlow Backend - Phase 1 MVP with 阿里听悟 + OSS
ASR: 阿里云听悟 (TingWu)
Speaker Diarization: 听悟内置
LLM: Kimi API for entity extraction
"""
import os
import json
import httpx
import time
import uuid
from fastapi import FastAPI, File, UploadFile, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from fastapi.staticfiles import StaticFiles
from pydantic import BaseModel
from typing import List, Optional
from datetime import datetime
# 导入 OSS 上传器
try:
from oss_uploader import get_oss_uploader
OSS_AVAILABLE = True
except ImportError:
OSS_AVAILABLE = False
app = FastAPI(title="InsightFlow", version="0.1.0")
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# Models
class Entity(BaseModel):
id: str
name: str
type: str
start: int
end: int
definition: Optional[str] = None
class TranscriptSegment(BaseModel):
start: float
end: float
text: str
speaker: Optional[str] = "Speaker A"
class AnalysisResult(BaseModel):
transcript_id: str
segments: List[TranscriptSegment]
entities: List[Entity]
full_text: str
created_at: str
storage = {}
# API Keys
ALI_ACCESS_KEY = os.getenv("ALI_ACCESS_KEY", "")
ALI_SECRET_KEY = os.getenv("ALI_SECRET_KEY", "")
KIMI_API_KEY = os.getenv("KIMI_API_KEY", "")
KIMI_BASE_URL = "https://api.kimi.com/coding"
def transcribe_with_tingwu(audio_data: bytes, filename: str) -> dict:
"""使用阿里听悟进行转录和说话人分离"""
# 1. 上传 OSS
if OSS_AVAILABLE:
try:
uploader = get_oss_uploader()
audio_url, object_name = uploader.upload_audio(audio_data, filename)
print(f"Uploaded to OSS: {object_name}")
except Exception as e:
print(f"OSS upload failed: {e}")
# Fallback: mock result
return mock_transcribe()
else:
print("OSS not available, using mock")
return mock_transcribe()
# 2. 调用听悟 API
# TODO: 实现听悟 API 调用
# 暂时返回 mock
return mock_transcribe()
def mock_transcribe() -> dict:
"""Mock 转录结果用于测试"""
return {
"full_text": "这是一个示例转录文本,包含 Project Alpha 和 K8s 等术语。",
"segments": [
{"start": 0.0, "end": 5.0, "text": "这是一个示例转录文本,包含 Project Alpha 和 K8s 等术语。", "speaker": "Speaker A"}
]
}
def extract_entities_with_llm(text: str) -> List[Entity]:
"""使用 Kimi API 提取实体"""
if not KIMI_API_KEY or not text:
return []
prompt = f"""请从以下会议文本中提取关键实体(专有名词、项目名、技术术语、人名等),并以 JSON 格式返回:
文本:{text[:3000]}
要求:
1. 每个实体包含name(名称), type(类型: PROJECT/TECH/PERSON/ORG/OTHER), start(起始字符位置), end(结束字符位置), definition(一句话定义)
2. 只返回 JSON 数组,不要其他内容
3. 确保 start/end 是字符在文本中的位置
示例输出:
[
{{"name": "Project Alpha", "type": "PROJECT", "start": 23, "end": 35, "definition": "Q3季度的核心项目"}},
{{"name": "K8s", "type": "TECH", "start": 37, "end": 40, "definition": "Kubernetes的缩写"}}
]
"""
try:
response = httpx.post(
f"{KIMI_BASE_URL}/v1/chat/completions",
headers={"Authorization": f"Bearer {KIMI_API_KEY}", "Content-Type": "application/json"},
json={
"model": "k2p5",
"messages": [{"role": "user", "content": prompt}],
"temperature": 0.1
},
timeout=60.0
)
response.raise_for_status()
result = response.json()
content = result["choices"][0]["message"]["content"]
import re
json_match = re.search(r'\[.*?\]', content, re.DOTALL)
if json_match:
entities_data = json.loads(json_match.group())
entities = []
for i, e in enumerate(entities_data):
entities.append(Entity(
id=f"ent_{i+1}",
name=e["name"],
type=e.get("type", "OTHER"),
start=e["start"],
end=e["end"],
definition=e.get("definition", "")
))
return entities
except Exception as e:
print(f"LLM extraction failed: {e}")
return []
@app.post("/api/v1/upload", response_model=AnalysisResult)
async def upload_audio(file: UploadFile = File(...)):
"""上传音频并分析"""
content = await file.read()
# 听悟转录
print(f"Transcribing with Tingwu: {file.filename}")
tw_result = transcribe_with_tingwu(content, file.filename)
# 构建片段
segments = [
TranscriptSegment(**seg) for seg in tw_result["segments"]
] or [TranscriptSegment(start=0, end=0, text=tw_result["full_text"], speaker="Speaker A")]
# LLM 实体提取
print("Extracting entities with LLM...")
entities = extract_entities_with_llm(tw_result["full_text"])
analysis = AnalysisResult(
transcript_id=os.urandom(8).hex(),
segments=segments,
entities=entities,
full_text=tw_result["full_text"],
created_at=datetime.now().isoformat()
)
storage[analysis.transcript_id] = analysis
print(f"Analysis complete: {analysis.transcript_id}, {len(entities)} entities found")
return analysis
@app.get("/api/v1/transcripts/{transcript_id}", response_model=AnalysisResult)
async def get_transcript(transcript_id: str):
if transcript_id not in storage:
raise HTTPException(status_code=404, detail="Transcript not found")
return storage[transcript_id]
@app.get("/api/v1/transcripts")
async def list_transcripts():
return list(storage.values())
# Serve frontend
app.mount("/", StaticFiles(directory="frontend", html=True), name="frontend")
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000)