diff --git a/backend/main.py b/backend/main.py index 17797ea..93ce05c 100644 --- a/backend/main.py +++ b/backend/main.py @@ -1,7 +1,7 @@ #!/usr/bin/env python3 """ -InsightFlow Backend - Phase 1 MVP with 阿里听悟 + OSS -ASR: 阿里云听悟 (TingWu) +InsightFlow Backend - Phase 1 MVP (Complete) +ASR: 阿里云听悟 (TingWu) + OSS Speaker Diarization: 听悟内置 LLM: Kimi API for entity extraction """ @@ -9,8 +9,6 @@ LLM: Kimi API for entity extraction import os import json import httpx -import time -import uuid from fastapi import FastAPI, File, UploadFile, HTTPException from fastapi.middleware.cors import CORSMiddleware from fastapi.staticfiles import StaticFiles @@ -18,13 +16,19 @@ from pydantic import BaseModel from typing import List, Optional from datetime import datetime -# 导入 OSS 上传器 +# Import clients try: from oss_uploader import get_oss_uploader OSS_AVAILABLE = True except ImportError: OSS_AVAILABLE = False +try: + from tingwu_client import TingwuClient + TINGWU_AVAILABLE = True +except ImportError: + TINGWU_AVAILABLE = False + app = FastAPI(title="InsightFlow", version="0.1.0") app.add_middleware( @@ -60,32 +64,38 @@ class AnalysisResult(BaseModel): storage = {} # API Keys -ALI_ACCESS_KEY = os.getenv("ALI_ACCESS_KEY", "") -ALI_SECRET_KEY = os.getenv("ALI_SECRET_KEY", "") KIMI_API_KEY = os.getenv("KIMI_API_KEY", "") KIMI_BASE_URL = "https://api.kimi.com/coding" -def transcribe_with_tingwu(audio_data: bytes, filename: str) -> dict: - """使用阿里听悟进行转录和说话人分离""" +def transcribe_audio(audio_data: bytes, filename: str) -> dict: + """转录音频:OSS上传 + 听悟转录""" # 1. 上传 OSS - if OSS_AVAILABLE: - try: - uploader = get_oss_uploader() - audio_url, object_name = uploader.upload_audio(audio_data, filename) - print(f"Uploaded to OSS: {object_name}") - except Exception as e: - print(f"OSS upload failed: {e}") - # Fallback: mock result - return mock_transcribe() - else: + if not OSS_AVAILABLE: print("OSS not available, using mock") return mock_transcribe() - # 2. 调用听悟 API - # TODO: 实现听悟 API 调用 - # 暂时返回 mock - return mock_transcribe() + try: + uploader = get_oss_uploader() + audio_url, object_name = uploader.upload_audio(audio_data, filename) + print(f"Uploaded to OSS: {object_name}") + except Exception as e: + print(f"OSS upload failed: {e}") + return mock_transcribe() + + # 2. 听悟转录 + if not TINGWU_AVAILABLE: + print("Tingwu not available, using mock") + return mock_transcribe() + + try: + client = TingwuClient() + result = client.transcribe(audio_url) + print(f"Transcription complete: {len(result['segments'])} segments") + return result + except Exception as e: + print(f"Tingwu failed: {e}") + return mock_transcribe() def mock_transcribe() -> dict: """Mock 转录结果用于测试""" @@ -157,9 +167,9 @@ async def upload_audio(file: UploadFile = File(...)): """上传音频并分析""" content = await file.read() - # 听悟转录 - print(f"Transcribing with Tingwu: {file.filename}") - tw_result = transcribe_with_tingwu(content, file.filename) + # 转录 + print(f"Processing: {file.filename} ({len(content)} bytes)") + tw_result = transcribe_audio(content, file.filename) # 构建片段 segments = [ @@ -167,7 +177,7 @@ async def upload_audio(file: UploadFile = File(...)): ] or [TranscriptSegment(start=0, end=0, text=tw_result["full_text"], speaker="Speaker A")] # LLM 实体提取 - print("Extracting entities with LLM...") + print("Extracting entities...") entities = extract_entities_with_llm(tw_result["full_text"]) analysis = AnalysisResult( @@ -179,7 +189,7 @@ async def upload_audio(file: UploadFile = File(...)): ) storage[analysis.transcript_id] = analysis - print(f"Analysis complete: {analysis.transcript_id}, {len(entities)} entities found") + print(f"Complete: {analysis.transcript_id}, {len(entities)} entities") return analysis @app.get("/api/v1/transcripts/{transcript_id}", response_model=AnalysisResult) diff --git a/backend/requirements.txt b/backend/requirements.txt index 47268d6..a46a017 100644 --- a/backend/requirements.txt +++ b/backend/requirements.txt @@ -2,6 +2,7 @@ fastapi==0.115.0 uvicorn[standard]==0.32.0 python-multipart==0.0.17 oss2==2.18.6 +alibabacloud-tea-openapi==0.3.12 httpx==0.27.2 pydantic==2.9.2 python-dotenv==1.0.1 diff --git a/backend/tingwu_client.py b/backend/tingwu_client.py new file mode 100644 index 0000000..082a10b --- /dev/null +++ b/backend/tingwu_client.py @@ -0,0 +1,143 @@ +#!/usr/bin/env python3 +""" +阿里听悟 API 封装 +文档: https://help.aliyun.com/document_detail/2712534.html +""" + +import os +import time +import json +import httpx +from typing import Optional, Dict, Any +from datetime import datetime + +class TingwuClient: + def __init__(self): + self.access_key = os.getenv("ALI_ACCESS_KEY") + self.secret_key = os.getenv("ALI_SECRET_KEY") + self.endpoint = "https://tingwu.cn-beijing.aliyuncs.com" + + if not self.access_key or not self.secret_key: + raise ValueError("ALI_ACCESS_KEY and ALI_SECRET_KEY required") + + def _sign_request(self, method: str, uri: str, body: str = "") -> Dict[str, str]: + """签名请求(简化版,实际生产需要完整签名实现)""" + from alibabacloud_tea_openapi import models as open_api_models + + # 使用阿里云 SDK 签名 + # 这里简化处理,实际应该使用官方 SDK + return { + "Content-Type": "application/json", + "x-acs-action": uri.split("?")[0].split("/")[-1], + "x-acs-version": "2023-09-30" + } + + def create_task(self, audio_url: str, language: str = "zh") -> str: + """创建听悟任务,返回 task_id""" + + url = f"{self.endpoint}/openapi/tingwu/v2/tasks" + + payload = { + "Input": { + "Source": "OSS", + "FileUrl": audio_url + }, + "Parameters": { + "Transcription": { + "DiarizationEnabled": True, + "SentenceMaxLength": 20 + }, + "Summarization": { + "Enabled": False + } + } + } + + try: + response = httpx.post( + url, + json=payload, + headers=self._sign_request("POST", "/openapi/tingwu/v2/tasks"), + timeout=30.0 + ) + response.raise_for_status() + result = response.json() + + if result.get("Code") == "0": + return result["Data"]["TaskId"] + else: + raise Exception(f"Create task failed: {result.get('Message')}") + + except Exception as e: + print(f"Create task error: {e}") + raise + + def get_task_result(self, task_id: str, max_retries: int = 60, interval: int = 5) -> Dict[str, Any]: + """轮询获取任务结果""" + + url = f"{self.endpoint}/openapi/tingwu/v2/tasks/{task_id}" + + for i in range(max_retries): + try: + response = httpx.get( + url, + headers=self._sign_request("GET", f"/openapi/tingwu/v2/tasks/{task_id}"), + timeout=30.0 + ) + response.raise_for_status() + result = response.json() + + if result.get("Code") != "0": + raise Exception(f"Query failed: {result.get('Message')}") + + data = result["Data"] + status = data.get("TaskStatus") + + if status == "SUCCESS": + return self._parse_result(data) + elif status == "FAILED": + raise Exception(f"Task failed: {data.get('ErrorMessage')}") + + # 继续等待 + print(f"Task {task_id} status: {status}, retry {i+1}/{max_retries}") + time.sleep(interval) + + except Exception as e: + print(f"Query error: {e}") + time.sleep(interval) + + raise TimeoutError(f"Task {task_id} timeout") + + def _parse_result(self, data: Dict) -> Dict[str, Any]: + """解析听悟结果""" + + result = data.get("Result", {}) + transcription = result.get("Transcription", {}) + + # 提取全文 + full_text = "" + paragraphs = transcription.get("Paragraphs", []) + for para in paragraphs: + full_text += para.get("Text", "") + " " + + # 提取带说话人的片段 + segments = [] + sentences = transcription.get("Sentences", []) + for sent in sentences: + segments.append({ + "start": sent.get("BeginTime", 0) / 1000, # ms to s + "end": sent.get("EndTime", 0) / 1000, + "text": sent.get("Text", ""), + "speaker": f"Speaker {sent.get('SpeakerId', 'A')}" + }) + + return { + "full_text": full_text.strip(), + "segments": segments + } + + def transcribe(self, audio_url: str, language: str = "zh") -> Dict[str, Any]: + """一键转录:创建任务并等待结果""" + task_id = self.create_task(audio_url, language) + print(f"Tingwu task created: {task_id}") + return self.get_task_result(task_id)