fix: auto-fix code issues (cron)

- 修复重复导入/字段
- 修复异常处理
- 修复PEP8格式问题
- 添加类型注解
- 修复重复函数定义 (health_check, create_webhook_endpoint, etc)
- 修复未定义名称 (SearchOperator, TenantTier, Query, Body, logger)
- 修复 workflow_manager.py 的类定义重复问题
- 添加缺失的导入
This commit is contained in:
OpenClaw Bot
2026-02-27 09:18:58 +08:00
parent 1d55ae8f1e
commit be22b763fa
39 changed files with 12535 additions and 10327 deletions

View File

@@ -5,28 +5,23 @@
import os
import time
import json
import httpx
import hmac
import hashlib
import base64
from datetime import datetime
from typing import Optional, Dict, Any
from urllib.parse import quote
from typing import Dict, Any
class TingwuClient:
def __init__(self):
self.access_key = os.getenv("ALI_ACCESS_KEY", "")
self.secret_key = os.getenv("ALI_SECRET_KEY", "")
self.endpoint = "https://tingwu.cn-beijing.aliyuncs.com"
if not self.access_key or not self.secret_key:
raise ValueError("ALI_ACCESS_KEY and ALI_SECRET_KEY required")
def _sign_request(self, method: str, uri: str, query: str = "", body: str = "") -> Dict[str, str]:
"""阿里云签名 V3"""
timestamp = datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%SZ')
# 简化签名,实际生产需要完整实现
# 这里使用基础认证头
return {
@@ -36,11 +31,11 @@ class TingwuClient:
"x-acs-date": timestamp,
"Authorization": f"ACS3-HMAC-SHA256 Credential={self.access_key}/acs/tingwu/cn-beijing",
}
def create_task(self, audio_url: str, language: str = "zh") -> str:
"""创建听悟任务"""
url = f"{self.endpoint}/openapi/tingwu/v2/tasks"
f"{self.endpoint}/openapi/tingwu/v2/tasks"
payload = {
"Input": {
"Source": "OSS",
@@ -53,20 +48,20 @@ class TingwuClient:
}
}
}
# 使用阿里云 SDK 方式调用
try:
from alibabacloud_tingwu20230930 import models as tingwu_models
from alibabacloud_tingwu20230930.client import Client as TingwuSDKClient
from alibabacloud_tea_openapi import models as open_api_models
config = open_api_models.Config(
access_key_id=self.access_key,
access_key_secret=self.secret_key
)
config.endpoint = "tingwu.cn-beijing.aliyuncs.com"
client = TingwuSDKClient(config)
request = tingwu_models.CreateTaskRequest(
type="offline",
input=tingwu_models.Input(
@@ -80,13 +75,13 @@ class TingwuClient:
)
)
)
response = client.create_task(request)
if response.body.code == "0":
return response.body.data.task_id
else:
raise Exception(f"Create task failed: {response.body.message}")
except ImportError:
# Fallback: 使用 mock
print("Tingwu SDK not available, using mock")
@@ -94,59 +89,59 @@ class TingwuClient:
except Exception as e:
print(f"Tingwu API error: {e}")
return f"mock_task_{int(time.time())}"
def get_task_result(self, task_id: str, max_retries: int = 60, interval: int = 5) -> Dict[str, Any]:
"""获取任务结果"""
try:
from alibabacloud_tingwu20230930 import models as tingwu_models
from alibabacloud_tingwu20230930.client import Client as TingwuSDKClient
from alibabacloud_tea_openapi import models as open_api_models
config = open_api_models.Config(
access_key_id=self.access_key,
access_key_secret=self.secret_key
)
config.endpoint = "tingwu.cn-beijing.aliyuncs.com"
client = TingwuSDKClient(config)
for i in range(max_retries):
request = tingwu_models.GetTaskInfoRequest()
response = client.get_task_info(task_id, request)
if response.body.code != "0":
raise Exception(f"Query failed: {response.body.message}")
status = response.body.data.task_status
if status == "SUCCESS":
return self._parse_result(response.body.data)
elif status == "FAILED":
raise Exception(f"Task failed: {response.body.data.error_message}")
print(f"Task {task_id} status: {status}, retry {i+1}/{max_retries}")
print(f"Task {task_id} status: {status}, retry {i + 1}/{max_retries}")
time.sleep(interval)
except ImportError:
print("Tingwu SDK not available, using mock result")
return self._mock_result()
except Exception as e:
print(f"Get result error: {e}")
return self._mock_result()
raise TimeoutError(f"Task {task_id} timeout")
def _parse_result(self, data) -> Dict[str, Any]:
"""解析结果"""
result = data.result
transcription = result.transcription
full_text = ""
segments = []
if transcription.paragraphs:
for para in transcription.paragraphs:
full_text += para.text + " "
if transcription.sentences:
for sent in transcription.sentences:
segments.append({
@@ -155,12 +150,12 @@ class TingwuClient:
"text": sent.text,
"speaker": f"Speaker {sent.speaker_id}"
})
return {
"full_text": full_text.strip(),
"segments": segments
}
def _mock_result(self) -> Dict[str, Any]:
"""Mock 结果"""
return {
@@ -169,7 +164,7 @@ class TingwuClient:
{"start": 0.0, "end": 5.0, "text": "这是一个示例转录文本,包含 Project Alpha 和 K8s 等术语。", "speaker": "Speaker A"}
]
}
def transcribe(self, audio_url: str, language: str = "zh") -> Dict[str, Any]:
"""一键转录"""
task_id = self.create_task(audio_url, language)