Files
insightflow/backend/oss_uploader.py
AutoFix Bot 71b0d137d2 fix: auto-fix code issues (cron)
- 修复重复导入/字段
- 修复异常处理
- 修复PEP8格式问题
- 添加类型注解
- 修复main.py中的语法错误(缺失try语句的from导入)
- 添加缺失的timedelta导入到plugin_manager.py
- 添加缺失的urllib.parse导入到plugin_manager.py和workflow_manager.py
- 添加缺失的os导入到document_processor.py
- 修复import排序问题
- 修复行长度超过100字符的问题
- 添加缺失的Alert导入到test_phase8_task8.py
- 添加缺失的get_export_manager导入到main.py
2026-03-04 09:27:30 +08:00

52 lines
1.6 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""
OSS 上传工具 - 用于阿里听悟音频上传
"""
import os
import uuid
from datetime import datetime
import oss2
class OSSUploader:
def __init__(self) -> None:
self.access_key = os.getenv("ALI_ACCESS_KEY")
self.secret_key = os.getenv("ALI_SECRET_KEY")
self.bucket_name = os.getenv("OSS_BUCKET", "insightflow-audio")
self.region = os.getenv("OSS_REGION", "oss-cn-hangzhou.aliyuncs.com")
self.endpoint = f"https://{self.region}"
if not self.access_key or not self.secret_key:
raise ValueError("ALI_ACCESS_KEY and ALI_SECRET_KEY must be set")
self.auth = oss2.Auth(self.access_key, self.secret_key)
self.bucket = oss2.Bucket(self.auth, self.endpoint, self.bucket_name)
def upload_audio(self, audio_data: bytes, filename: str) -> tuple:
"""上传音频到 OSS返回 (URL, object_name)"""
# 生成唯一文件名
ext = os.path.splitext(filename)[1] or ".wav"
object_name = f"audio/{datetime.now().strftime('%Y%m%d')}/{uuid.uuid4().hex}{ext}"
# 上传文件
self.bucket.put_object(object_name, audio_data)
# 生成临时访问 URL (1小时有效)
url = self.bucket.sign_url("GET", object_name, 3600)
return url, object_name
def delete_object(self, object_name: str) -> None:
"""删除 OSS 对象"""
self.bucket.delete_object(object_name)
# 单例
_oss_uploader = None
def get_oss_uploader() -> OSSUploader:
global _oss_uploader
if _oss_uploader is None:
_oss_uploader = OSSUploader()
return _oss_uploader