#!/usr/bin/env python3
"""Thin client for the Alibaba Cloud Tingwu (听悟) transcription API.

Docs: https://help.aliyun.com/document_detail/2712534.html
"""

import os
import time
import json
from datetime import datetime
from typing import Optional, Dict, Any

import httpx


class TingwuError(Exception):
    """Raised when the Tingwu service reports a failed request or task."""


class TingwuClient:
    """Create Tingwu transcription tasks and poll them for results.

    Credentials are read from the ``ALI_ACCESS_KEY`` and ``ALI_SECRET_KEY``
    environment variables when the client is constructed.
    """

    def __init__(self):
        """Load credentials from the environment.

        Raises:
            ValueError: If either ``ALI_ACCESS_KEY`` or ``ALI_SECRET_KEY`` is
                unset or empty.
        """
        self.access_key = os.getenv("ALI_ACCESS_KEY")
        self.secret_key = os.getenv("ALI_SECRET_KEY")
        self.endpoint = "https://tingwu.cn-beijing.aliyuncs.com"

        if not self.access_key or not self.secret_key:
            raise ValueError("ALI_ACCESS_KEY and ALI_SECRET_KEY required")

    def _sign_request(self, method: str, uri: str, body: str = "") -> Dict[str, str]:
        """Build the request headers (simplified; no signature is computed).

        Args:
            method: HTTP method of the request. Currently unused.
            uri: Request path, optionally with a query string. The last path
                segment (query string stripped) becomes ``x-acs-action``.
            body: Serialized request body. Currently unused.

        Returns:
            The ``Content-Type``, ``x-acs-action`` and ``x-acs-version``
            headers.
        """
        # NOTE(review): this does NOT sign the request and never reads
        # self.access_key / self.secret_key, so the service will presumably
        # reject these calls. Replace with the official Alibaba Cloud SDK or
        # a real signature implementation before production use.
        # NOTE(review): x-acs-action ends up as "tasks" or the task id here;
        # presumably it should be the API action name -- confirm.
        return {
            "Content-Type": "application/json",
            "x-acs-action": uri.split("?")[0].split("/")[-1],
            "x-acs-version": "2023-09-30",
        }

    def create_task(self, audio_url: str, language: str = "zh") -> str:
        """Create a Tingwu task for ``audio_url`` and return its task id.

        Args:
            audio_url: Location of the audio file, sent as ``Input.FileUrl``.
            language: Accepted for interface compatibility.
                NOTE(review): this value is not sent to the service; confirm
                which request field should carry it.

        Returns:
            The ``TaskId`` reported by the service.

        Raises:
            TingwuError: If the response ``Code`` is not ``"0"``.
            httpx.HTTPError: On transport errors or non-2xx responses.
        """
        path = "/openapi/tingwu/v2/tasks"
        # NOTE(review): verify the HTTP verb and payload shape against the
        # CreateTask API reference.
        payload = {
            "Input": {
                "Source": "OSS",
                "FileUrl": audio_url,
            },
            "Parameters": {
                "Transcription": {
                    "DiarizationEnabled": True,
                    "SentenceMaxLength": 20,
                },
                "Summarization": {
                    "Enabled": False,
                },
            },
        }

        try:
            response = httpx.post(
                f"{self.endpoint}{path}",
                json=payload,
                headers=self._sign_request("POST", path),
                timeout=30.0,
            )
            response.raise_for_status()

            result = response.json()
            if result.get("Code") != "0":
                raise TingwuError(f"Create task failed: {result.get('Message')}")
            return result["Data"]["TaskId"]
        except Exception as e:
            # Log for the operator, then let the caller decide what to do.
            print(f"Create task error: {e}")
            raise

    def _fetch_task_data(self, task_id: str) -> Dict[str, Any]:
        """Query a task once and return the ``Data`` object of the response.

        Raises:
            TingwuError: If the response ``Code`` is not ``"0"``.
            httpx.HTTPError: On transport errors or non-2xx responses.
            ValueError: If the response body is not valid JSON.
        """
        path = f"/openapi/tingwu/v2/tasks/{task_id}"
        response = httpx.get(
            f"{self.endpoint}{path}",
            headers=self._sign_request("GET", path),
            timeout=30.0,
        )
        response.raise_for_status()

        result = response.json()
        if result.get("Code") != "0":
            raise TingwuError(f"Query failed: {result.get('Message')}")
        return result.get("Data") or {}

    def get_task_result(
        self, task_id: str, max_retries: int = 60, interval: int = 5
    ) -> Dict[str, Any]:
        """Poll a task until it finishes and return the parsed result.

        Query errors (HTTP/transport failures, invalid JSON, non-"0" response
        codes) are logged and retried. A task whose status is ``FAILED`` is
        terminal and raises immediately instead of being retried.

        Args:
            task_id: Id returned by :meth:`create_task`.
            max_retries: Maximum number of polling attempts.
            interval: Seconds to sleep between attempts.

        Returns:
            The dict produced by :meth:`_parse_result`.

        Raises:
            TingwuError: If the task status is ``FAILED``.
            TimeoutError: If the task has not succeeded after ``max_retries``
                attempts.
        """
        # NOTE(review): verify the "SUCCESS"/"FAILED" status strings against
        # the task-query API reference.
        for attempt in range(1, max_retries + 1):
            try:
                data = self._fetch_task_data(task_id)
            except (httpx.HTTPError, ValueError, TingwuError) as exc:
                # Only the query itself is retried; task failure is handled
                # below, outside this try, so it cannot be swallowed.
                print(f"Query error: {exc}")
            else:
                status = data.get("TaskStatus")
                if status == "SUCCESS":
                    return self._parse_result(data)
                if status == "FAILED":
                    raise TingwuError(f"Task failed: {data.get('ErrorMessage')}")
                # Still running: keep waiting.
                print(f"Task {task_id} status: {status}, retry {attempt}/{max_retries}")
            time.sleep(interval)

        raise TimeoutError(f"Task {task_id} timeout")

    def _parse_result(self, data: Dict) -> Dict[str, Any]:
        """Convert a task ``Data`` object into full text plus timed segments.

        Missing or null ``Result``, ``Transcription``, ``Paragraphs`` and
        ``Sentences`` entries are treated as empty.

        Args:
            data: The ``Data`` object of a task-query response.

        Returns:
            A dict with ``full_text`` (paragraph texts joined by spaces, then
            stripped) and ``segments`` (one dict per sentence with ``start``,
            ``end``, ``text`` and ``speaker``).
        """
        result = data.get("Result") or {}
        transcription = result.get("Transcription") or {}

        # Full text: every paragraph's text, space-separated.
        paragraphs = transcription.get("Paragraphs") or []
        full_text = " ".join(para.get("Text") or "" for para in paragraphs).strip()

        # Per-sentence segments with speaker labels.
        sentences = transcription.get("Sentences") or []
        segments = [
            {
                "start": (sent.get("BeginTime") or 0) / 1000,  # ms to s
                "end": (sent.get("EndTime") or 0) / 1000,
                "text": sent.get("Text", ""),
                "speaker": f"Speaker {sent.get('SpeakerId', 'A')}",
            }
            for sent in sentences
        ]

        return {
            "full_text": full_text,
            "segments": segments,
        }

    def transcribe(self, audio_url: str, language: str = "zh") -> Dict[str, Any]:
        """Create a task for ``audio_url`` and block until its result is ready.

        Args:
            audio_url: Location of the audio file to transcribe.
            language: Forwarded to :meth:`create_task`.

        Returns:
            The parsed result from :meth:`get_task_result`.
        """
        task_id = self.create_task(audio_url, language)
        print(f"Tingwu task created: {task_id}")

        return self.get_task_result(task_id)