Files
insightflow/backend/oss_uploader.py
OpenClaw Bot 1a9b5391f7 fix: auto-fix code issues (cron)
- 修复重复导入/字段
- 修复异常处理
- 修复PEP8格式问题
- 添加类型注解
2026-02-28 09:15:51 +08:00

54 lines
1.6 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""
OSS 上传工具 - 用于阿里听悟音频上传
"""
import os
import uuid
from datetime import datetime
import oss2
class OSSUploader:
def __init__(self):
self.access_key = os.getenv("ALI_ACCESS_KEY")
self.secret_key = os.getenv("ALI_SECRET_KEY")
self.bucket_name = os.getenv("OSS_BUCKET", "insightflow-audio")
self.region = os.getenv("OSS_REGION", "oss-cn-hangzhou.aliyuncs.com")
self.endpoint = f"https://{self.region}"
if not self.access_key or not self.secret_key:
raise ValueError("ALI_ACCESS_KEY and ALI_SECRET_KEY must be set")
self.auth = oss2.Auth(self.access_key, self.secret_key)
self.bucket = oss2.Bucket(self.auth, self.endpoint, self.bucket_name)
def upload_audio(self, audio_data: bytes, filename: str) -> tuple:
"""上传音频到 OSS返回 (URL, object_name)"""
# 生成唯一文件名
ext = os.path.splitext(filename)[1] or ".wav"
object_name = f"audio/{datetime.now().strftime('%Y%m%d')}/{uuid.uuid4().hex}{ext}"
# 上传文件
self.bucket.put_object(object_name, audio_data)
# 生成临时访问 URL (1小时有效)
url = self.bucket.sign_url("GET", object_name, 3600)
return url, object_name
def delete_object(self, object_name: str) -> None:
"""删除 OSS 对象"""
self.bucket.delete_object(object_name)
# 单例
_oss_uploader = None
def get_oss_uploader() -> OSSUploader:
global _oss_uploader
if _oss_uploader is None:
_oss_uploader = OSSUploader()
return _oss_uploader