#!/usr/bin/env python3
"""Alibaba Cloud Tingwu API wrapper.

Transcription tasks are created and polled through the Tingwu Python SDK
when it can be imported; otherwise mock data is returned so that callers can
still run without the SDK installed.
"""

import os
import time
from datetime import datetime, timezone
from typing import Any


class TingwuClient:
    """Thin client for creating Tingwu transcription tasks and reading results.

    Credentials are read from the ``ALI_ACCESS_KEY`` and ``ALI_SECRET_KEY``
    environment variables when the client is constructed.
    """

    def __init__(self) -> None:
        """Load credentials from the environment.

        Raises:
            ValueError: If either ``ALI_ACCESS_KEY`` or ``ALI_SECRET_KEY`` is
                missing or empty.
        """
        self.access_key = os.getenv("ALI_ACCESS_KEY", "")
        self.secret_key = os.getenv("ALI_SECRET_KEY", "")
        self.endpoint = "https://tingwu.cn-beijing.aliyuncs.com"

        if not self.access_key or not self.secret_key:
            raise ValueError("ALI_ACCESS_KEY and ALI_SECRET_KEY required")

    def _sign_request(
        self,
        method: str,
        uri: str,
        query: str = "",
        body: str = "",
        *,
        action: str = "CreateTask",
    ) -> dict[str, str]:
        """Build the basic header set for a raw HTTP call.

        This is a simplified placeholder, not a complete signature: no
        signature value is computed, and ``method``, ``uri``, ``query`` and
        ``body`` are accepted but not used. A production implementation must
        replace it with the full signing procedure.

        Args:
            method: HTTP method (currently unused).
            uri: Request path (currently unused).
            query: Query string (currently unused).
            body: Request body (currently unused).
            action: Value for the ``x-acs-action`` header.

        Returns:
            A dict of HTTP headers.
        """
        # Timezone-aware replacement for the deprecated datetime.utcnow();
        # the formatted value is identical.
        timestamp = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")

        return {
            "Content-Type": "application/json",
            "x-acs-action": action,
            "x-acs-version": "2023-09-30",
            "x-acs-date": timestamp,
            "Authorization": f"ACS3-HMAC-SHA256 Credential = {self.access_key}"
            f"/acs/tingwu/cn-beijing",
        }

    def _new_sdk_client(self):
        """Create a Tingwu SDK client configured with this instance's keys.

        Raises:
            ImportError: If the SDK packages are not installed.
        """
        # Imported lazily: the original author notes that moving these
        # imports to the top of the file causes a circular import. Keeping
        # them here also lets callers fall back to mock data without the SDK.
        from alibabacloud_tea_openapi import models as open_api_models
        from alibabacloud_tingwu20230930.client import Client as TingwuSDKClient

        config = open_api_models.Config(
            access_key_id=self.access_key,
            access_key_secret=self.secret_key,
        )
        config.endpoint = "tingwu.cn-beijing.aliyuncs.com"
        return TingwuSDKClient(config)

    @staticmethod
    def _mock_task_id() -> str:
        """Return a placeholder task id derived from the current time."""
        return f"mock_task_{int(time.time())}"

    def create_task(self, audio_url: str, language: str = "zh") -> str:
        """Create a Tingwu offline transcription task.

        Args:
            audio_url: URL of the audio file to transcribe.
            language: Language code. NOTE(review): currently not forwarded to
                the SDK request; kept for interface compatibility.

        Returns:
            The task id reported by the service, or a ``mock_task_<ts>`` id
            when the SDK is missing or the call fails with ``RuntimeError``,
            ``ValueError`` or ``TypeError``.
        """
        try:
            # Lazy import, see _new_sdk_client.
            from alibabacloud_tingwu20230930 import models as tingwu_models

            client = self._new_sdk_client()

            # NOTE(review): model class names are kept as in the original
            # code; confirm them against the installed SDK version.
            request = tingwu_models.CreateTaskRequest(
                type="offline",
                input=tingwu_models.Input(source="OSS", file_url=audio_url),
                parameters=tingwu_models.Parameters(
                    transcription=tingwu_models.Transcription(
                        diarization_enabled=True,
                        sentence_max_length=20,
                    ),
                ),
            )

            response = client.create_task(request)
            if response.body.code != "0":
                raise RuntimeError(f"Create task failed: {response.body.message}")
            return response.body.data.task_id

        except ImportError:
            # Fallback: the SDK is not installed, so hand back a mock id.
            print("Tingwu SDK not available, using mock")
            return self._mock_task_id()
        except (RuntimeError, ValueError, TypeError) as e:
            # Deliberate best-effort: report the error and continue with mock.
            print(f"Tingwu API error: {e}")
            return self._mock_task_id()

    def get_task_result(
        self,
        task_id: str,
        max_retries: int = 60,
        interval: int = 5,
    ) -> dict[str, Any]:
        """Poll a task until it finishes and return the parsed result.

        Args:
            task_id: Id returned by :meth:`create_task`.
            max_retries: Maximum number of status queries.
            interval: Seconds to wait between two queries.

        Returns:
            A dict with ``full_text`` and ``segments``. Mock data is returned
            when the SDK is missing or the query fails with ``RuntimeError``,
            ``ValueError`` or ``TypeError``.

        Raises:
            TimeoutError: If the task has not finished after ``max_retries``
                queries.
        """
        try:
            # Lazy import, see _new_sdk_client.
            from alibabacloud_tingwu20230930 import models as tingwu_models

            client = self._new_sdk_client()

            for attempt in range(1, max_retries + 1):
                request = tingwu_models.GetTaskInfoRequest()
                response = client.get_task_info(task_id, request)

                if response.body.code != "0":
                    raise RuntimeError(f"Query failed: {response.body.message}")

                # NOTE(review): status literals are kept as in the original
                # code; confirm them against the service documentation.
                status = response.body.data.task_status
                if status == "SUCCESS":
                    return self._parse_result(response.body.data)
                if status == "FAILED":
                    raise RuntimeError(
                        f"Task failed: {response.body.data.error_message}"
                    )

                print(f"Task {task_id} status: {status}, retry {attempt}/{max_retries}")
                # No point sleeping after the last attempt; we time out next.
                if attempt < max_retries:
                    time.sleep(interval)

        except ImportError:
            print("Tingwu SDK not available, using mock result")
            return self._mock_result()
        except (RuntimeError, ValueError, TypeError) as e:
            print(f"Get result error: {e}")
            return self._mock_result()

        raise TimeoutError(f"Task {task_id} timeout")

    def _parse_result(self, data: Any) -> dict[str, Any]:
        """Convert an SDK task-info payload into a plain dict.

        Args:
            data: Object exposing ``result.transcription`` with optional
                ``paragraphs`` (items with ``text``) and ``sentences`` (items
                with ``begin_time``, ``end_time``, ``text``, ``speaker_id``).

        Returns:
            ``{"full_text": str, "segments": list[dict]}``. Segment times are
            the sentence times divided by 1000. Both fields are empty when
            the payload carries no transcription.
        """
        result = data.result
        transcription = result.transcription if result is not None else None
        if transcription is None:
            return {"full_text": "", "segments": []}

        paragraphs = transcription.paragraphs or []
        sentences = transcription.sentences or []

        full_text = " ".join(para.text for para in paragraphs).strip()
        segments = [
            {
                "start": sent.begin_time / 1000,
                "end": sent.end_time / 1000,
                "text": sent.text,
                "speaker": f"Speaker {sent.speaker_id}",
            }
            for sent in sentences
        ]

        return {"full_text": full_text, "segments": segments}

    def _mock_result(self) -> dict[str, Any]:
        """Return a fixed sample transcription used as a fallback."""
        return {
            "full_text": "这是一个示例转录文本,包含 Project Alpha 和 K8s 等术语。",
            "segments": [
                {
                    "start": 0.0,
                    "end": 5.0,
                    "text": "这是一个示例转录文本,包含 Project Alpha 和 K8s 等术语。",
                    "speaker": "Speaker A",
                },
            ],
        }

    def transcribe(self, audio_url: str, language: str = "zh") -> dict[str, Any]:
        """Create a task for ``audio_url`` and wait for its result.

        Args:
            audio_url: URL of the audio file to transcribe.
            language: Language code passed through to :meth:`create_task`.

        Returns:
            The dict produced by :meth:`get_task_result`.
        """
        task_id = self.create_task(audio_url, language)
        print(f"Tingwu task: {task_id}")
        return self.get_task_result(task_id)