Files
insightflow/backend/multimodal_processor.py
OpenClaw Bot 33555642db fix: auto-fix code issues (cron)
- 修复重复导入/字段
- 修复异常处理
- 修复PEP8格式问题
- 添加类型注解
2026-02-28 03:03:50 +08:00

433 lines
14 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""
InsightFlow Multimodal Processor - Phase 7
视频处理模块提取音频、关键帧、OCR识别
"""
import json
import os
import subprocess
import tempfile
import uuid
from dataclasses import dataclass
from pathlib import Path
# Optional third-party backends. Each one is probed once at import time and a
# module-level flag records whether it can be used.

# OCR stack: pytesseract plus Pillow for image loading.
try:
    import pytesseract
    from PIL import Image
except ImportError:
    PYTESSERACT_AVAILABLE = False
else:
    PYTESSERACT_AVAILABLE = True

# OpenCV, used for frame-by-frame keyframe extraction.
try:
    import cv2
except ImportError:
    CV2_AVAILABLE = False
else:
    CV2_AVAILABLE = True

# ffmpeg-python bindings; the ffmpeg/ffprobe executables are used when absent.
try:
    import ffmpeg
except ImportError:
    FFMPEG_AVAILABLE = False
else:
    FFMPEG_AVAILABLE = True
@dataclass
class VideoFrame:
    """One keyframe taken from a video, together with its OCR output."""

    id: str  # identifier of this frame record
    video_id: str  # identifier of the video the frame belongs to
    frame_number: int
    timestamp: float
    frame_path: str  # location of the saved frame image
    ocr_text: str = ""
    ocr_confidence: float = 0.0
    entities_detected: list[dict] = None  # None is replaced by [] after init

    def __post_init__(self):
        # None is used as the default so that every instance ends up with its
        # own list instead of sharing one mutable default.
        if self.entities_detected is not None:
            return
        self.entities_detected = []
@dataclass
class VideoInfo:
    """Descriptive record for a video file and its processing state."""

    id: str
    project_id: str
    filename: str
    file_path: str
    duration: float = 0.0
    width: int = 0
    height: int = 0
    fps: float = 0.0
    audio_extracted: bool = False
    audio_path: str = ""
    transcript_id: str = ""
    status: str = "pending"
    error_message: str = ""
    metadata: dict = None  # None is replaced by {} after init

    def __post_init__(self):
        # None is used as the default so that every instance ends up with its
        # own dict instead of sharing one mutable default.
        if self.metadata is not None:
            return
        self.metadata = {}
@dataclass
class VideoProcessingResult:
    """Outcome of running the processing pipeline on one video."""

    video_id: str
    audio_path: str
    frames: list[VideoFrame]
    ocr_results: list[dict]
    # Combined text (audio transcript + OCR text).
    full_text: str
    success: bool
    error_message: str = ""
class MultimodalProcessor:
    """Multimodal processor for video files.

    Stores uploaded videos in a working directory, probes their metadata,
    extracts the audio track and periodic keyframes, and runs OCR on each
    keyframe. Optional backends (ffmpeg-python, OpenCV, pytesseract) are used
    when the module-level availability flags say they were importable;
    otherwise the ffmpeg/ffprobe executables are invoked, or the step is
    skipped.
    """

    # Tesseract language models used for both the text and confidence passes.
    _OCR_LANG = "chi_sim+eng"

    def __init__(self, temp_dir: str = None, frame_interval: int = 5) -> None:
        """
        Initialise the processor and create its working directories.

        Args:
            temp_dir: Base directory for temporary files; the system temporary
                directory is used when omitted.
            frame_interval: Keyframe sampling interval in seconds.
        """
        self.temp_dir = temp_dir or tempfile.gettempdir()
        self.frame_interval = frame_interval
        self.video_dir = os.path.join(self.temp_dir, "videos")
        self.frames_dir = os.path.join(self.temp_dir, "frames")
        self.audio_dir = os.path.join(self.temp_dir, "audio")
        for directory in (self.video_dir, self.frames_dir, self.audio_dir):
            os.makedirs(directory, exist_ok=True)

    @staticmethod
    def _empty_video_info() -> dict:
        """Return the zeroed info dict used when probing fails."""
        return {"duration": 0, "width": 0, "height": 0, "fps": 0, "has_audio": False, "bitrate": 0}

    @staticmethod
    def _parse_frame_rate(rate) -> float:
        """Convert an ffprobe rate such as "30000/1001" or "25" to a float.

        The value comes from the probed file, so it is parsed arithmetically
        rather than evaluated. Malformed input or a zero denominator gives 0.0.
        """
        try:
            numerator, _, denominator = str(rate).partition("/")
            value = float(numerator)
            if denominator:
                value /= float(denominator)
            return value
        except (ValueError, ZeroDivisionError):
            return 0.0

    @classmethod
    def _summarize_probe(cls, probe: dict):
        """Build the info dict from ffprobe-style JSON; None if no video stream."""
        streams = probe.get("streams") or []
        container = probe.get("format") or {}
        video_stream = next((s for s in streams if s.get("codec_type") == "video"), None)
        if video_stream is None:
            return None
        return {
            "duration": float(container.get("duration", 0)),
            "width": int(video_stream.get("width", 0)),
            "height": int(video_stream.get("height", 0)),
            "fps": cls._parse_frame_rate(video_stream.get("r_frame_rate", "0/1")),
            "has_audio": any(s.get("codec_type") == "audio" for s in streams),
            "bitrate": int(container.get("bit_rate", 0)),
        }

    @staticmethod
    def _parse_frame_name(frame_path: str, index: int, interval) -> tuple:
        """Recover (frame_number, timestamp) from a "frame_<n>[_<t>]" file name.

        Falls back to the positional index, and to index * interval for the
        timestamp, when a component is missing or not numeric.
        """
        stem = os.path.splitext(os.path.basename(frame_path))[0]
        parts = stem.split("_")
        try:
            frame_number = int(parts[1]) if len(parts) > 1 else index
        except ValueError:
            frame_number = index
        try:
            timestamp = float(parts[2]) if len(parts) > 2 else index * interval
        except ValueError:
            timestamp = index * interval
        return frame_number, timestamp

    def extract_video_info(self, video_path: str) -> dict:
        """
        Probe basic information about a video file.

        Args:
            video_path: Path of the video file.

        Returns:
            Dict with keys "duration", "width", "height", "fps", "has_audio"
            and "bitrate". All values are zero/False when probing fails or the
            file has no video stream.
        """
        try:
            if FFMPEG_AVAILABLE:
                probe = ffmpeg.probe(video_path)
            else:
                # Fall back to the ffprobe executable. codec_type is requested
                # so the video stream can be identified reliably.
                cmd = [
                    "ffprobe",
                    "-v",
                    "error",
                    "-show_entries",
                    "format=duration,bit_rate:stream=codec_type,width,height,r_frame_rate",
                    "-of",
                    "json",
                    video_path,
                ]
                result = subprocess.run(cmd, capture_output=True, text=True)
                probe = json.loads(result.stdout) if result.returncode == 0 else None
            info = self._summarize_probe(probe) if probe else None
            if info is not None:
                return info
        except Exception as e:
            # Best effort: any probing problem degrades to the zeroed dict.
            print(f"Error extracting video info: {e}")
        return self._empty_video_info()

    def extract_audio(self, video_path: str, output_path: str = None) -> str:
        """
        Extract the audio track of a video as mono 16 kHz audio.

        Args:
            video_path: Path of the video file.
            output_path: Destination audio path; defaults to
                "<audio_dir>/<video stem>.wav".

        Returns:
            Path of the extracted audio file.

        Raises:
            Exception: Whatever the ffmpeg backend raised; it is printed and
                re-raised unchanged.
        """
        if output_path is None:
            output_path = os.path.join(self.audio_dir, f"{Path(video_path).stem}.wav")
        try:
            if FFMPEG_AVAILABLE:
                (
                    ffmpeg.input(video_path)
                    .output(output_path, ac=1, ar=16000, vn=None)
                    .overwrite_output()
                    .run(quiet=True)
                )
            else:
                # Fall back to the ffmpeg executable.
                cmd = [
                    "ffmpeg",
                    "-i",
                    video_path,
                    "-vn",
                    "-acodec",
                    "pcm_s16le",
                    "-ac",
                    "1",
                    "-ar",
                    "16000",
                    "-y",
                    output_path,
                ]
                subprocess.run(cmd, check=True, capture_output=True)
            return output_path
        except Exception as e:
            print(f"Error extracting audio: {e}")
            raise

    def extract_keyframes(self, video_path: str, video_id: str, interval: int = None) -> list[str]:
        """
        Extract one frame every ``interval`` seconds from a video.

        Args:
            video_path: Path of the video file.
            video_id: Video ID; frames are written to "<frames_dir>/<video_id>".
            interval: Sampling interval in seconds; defaults to the interval
                given at construction time.

        Returns:
            Paths of the frame images written. Errors are printed and whatever
            was collected before the error is returned.
        """
        interval = interval or self.frame_interval
        frame_paths = []
        video_frames_dir = os.path.join(self.frames_dir, video_id)
        os.makedirs(video_frames_dir, exist_ok=True)
        try:
            if CV2_AVAILABLE:
                cap = cv2.VideoCapture(video_path)
                try:
                    fps = cap.get(cv2.CAP_PROP_FPS)
                    if not fps or fps <= 0:
                        raise ValueError(f"cannot determine frame rate of {video_path}")
                    # At least 1 so the modulo below never divides by zero
                    # when fps * interval is below one frame.
                    step = max(1, int(fps * interval))
                    frame_number = 0
                    while True:
                        ret, frame = cap.read()
                        if not ret:
                            break
                        if frame_number % step == 0:
                            timestamp = frame_number / fps
                            frame_path = os.path.join(
                                video_frames_dir, f"frame_{frame_number:06d}_{timestamp:.2f}.jpg"
                            )
                            cv2.imwrite(frame_path, frame)
                            frame_paths.append(frame_path)
                        frame_number += 1
                finally:
                    # Release the capture handle even if reading/writing fails.
                    cap.release()
            else:
                # Fall back to the ffmpeg executable. Only %d-style sequence
                # patterns are used in the output name; process_video derives
                # the timestamp from the frame index when it is not in the name.
                output_pattern = os.path.join(video_frames_dir, "frame_%06d.jpg")
                cmd = ["ffmpeg", "-i", video_path, "-vf", f"fps=1/{interval}", "-frame_pts", "1", "-y", output_pattern]
                subprocess.run(cmd, check=True, capture_output=True)
                frame_paths = sorted(
                    os.path.join(video_frames_dir, name)
                    for name in os.listdir(video_frames_dir)
                    if name.startswith("frame_")
                )
        except Exception as e:
            print(f"Error extracting keyframes: {e}")
        return frame_paths

    def perform_ocr(self, image_path: str) -> tuple[str, float]:
        """
        Run OCR on an image.

        Args:
            image_path: Path of the image file.

        Returns:
            (recognised text, mean confidence scaled to 0..1). Returns
            ("", 0.0) when pytesseract is unavailable or OCR fails.
        """
        if not PYTESSERACT_AVAILABLE:
            return "", 0.0
        try:
            # The context manager closes the underlying file handle.
            with Image.open(image_path) as source:
                # Pre-processing: OCR runs on a greyscale image.
                image = source if source.mode == "L" else source.convert("L")
                text = pytesseract.image_to_string(image, lang=self._OCR_LANG)
                data = pytesseract.image_to_data(
                    image, lang=self._OCR_LANG, output_type=pytesseract.Output.DICT
                )
            confidences = []
            for raw in data["conf"]:
                # Confidences may arrive as ints, floats or numeric strings;
                # non-positive values mark boxes without recognised text.
                try:
                    value = float(raw)
                except (TypeError, ValueError):
                    continue
                if value > 0:
                    confidences.append(value)
            avg_confidence = sum(confidences) / len(confidences) if confidences else 0.0
            return text.strip(), avg_confidence / 100.0
        except Exception as e:
            print(f"OCR error for {image_path}: {e}")
            return "", 0.0

    def process_video(
        self, video_data: bytes, filename: str, project_id: str, video_id: str = None
    ) -> VideoProcessingResult:
        """
        Process a video: save it, extract audio and keyframes, OCR the frames.

        Args:
            video_data: Raw bytes of the video file.
            filename: Original file name; only its final path component is
                used when storing the file.
            project_id: Project ID (not used by the processing itself).
            video_id: Video ID; a short random ID is generated when omitted.

        Returns:
            A VideoProcessingResult. On any error ``success`` is False and
            ``error_message`` carries the exception text.
        """
        video_id = video_id or str(uuid.uuid4())[:8]
        try:
            # The name comes from the uploader: strip directory components so
            # it cannot escape video_dir.
            safe_name = os.path.basename(filename.replace("\\", "/")) or "video"
            video_path = os.path.join(self.video_dir, f"{video_id}_{safe_name}")
            with open(video_path, "wb") as f:
                f.write(video_data)

            video_info = self.extract_video_info(video_path)

            audio_path = ""
            if video_info["has_audio"]:
                audio_path = self.extract_audio(video_path)

            frame_paths = self.extract_keyframes(video_path, video_id)

            frames = []
            ocr_results = []
            all_ocr_text = []
            for i, frame_path in enumerate(frame_paths):
                frame_number, timestamp = self._parse_frame_name(frame_path, i, self.frame_interval)
                ocr_text, confidence = self.perform_ocr(frame_path)
                frames.append(
                    VideoFrame(
                        id=str(uuid.uuid4())[:8],
                        video_id=video_id,
                        frame_number=frame_number,
                        timestamp=timestamp,
                        frame_path=frame_path,
                        ocr_text=ocr_text,
                        ocr_confidence=confidence,
                    )
                )
                if ocr_text:
                    ocr_results.append(
                        {
                            "frame_number": frame_number,
                            "timestamp": timestamp,
                            "text": ocr_text,
                            "confidence": confidence,
                        }
                    )
                    all_ocr_text.append(ocr_text)

            # Merge the OCR text of all frames.
            return VideoProcessingResult(
                video_id=video_id,
                audio_path=audio_path,
                frames=frames,
                ocr_results=ocr_results,
                full_text="\n\n".join(all_ocr_text),
                success=True,
            )
        except Exception as e:
            return VideoProcessingResult(
                video_id=video_id,
                audio_path="",
                frames=[],
                ocr_results=[],
                full_text="",
                success=False,
                error_message=str(e),
            )

    def cleanup(self, video_id: str = None) -> None:
        """
        Remove temporary files.

        Args:
            video_id: When given, only files belonging to this video are
                removed; otherwise all three working directories are emptied
                and recreated.
        """
        import shutil

        if video_id:
            # Frame files are not named after the video, so the whole
            # per-video frame directory is removed. Only a plain directory
            # name is accepted, so the removal cannot leave frames_dir.
            if video_id == os.path.basename(video_id) and video_id not in (os.curdir, os.pardir):
                shutil.rmtree(os.path.join(self.frames_dir, video_id), ignore_errors=True)
            # Stored videos and extracted audio carry the ID in their name.
            for dir_path in (self.video_dir, self.audio_dir):
                if not os.path.isdir(dir_path):
                    continue
                for name in os.listdir(dir_path):
                    candidate = os.path.join(dir_path, name)
                    if video_id in name and os.path.isfile(candidate):
                        os.remove(candidate)
        else:
            for dir_path in (self.video_dir, self.frames_dir, self.audio_dir):
                if os.path.exists(dir_path):
                    shutil.rmtree(dir_path)
                os.makedirs(dir_path, exist_ok=True)
# Shared processor instance; created lazily on first request.
_multimodal_processor = None


def get_multimodal_processor(temp_dir: str = None, frame_interval: int = 5) -> MultimodalProcessor:
    """Return the shared MultimodalProcessor, creating it on first use.

    The arguments are only consulted when the instance is first built; later
    calls return the existing instance regardless of what is passed.
    """
    global _multimodal_processor
    if _multimodal_processor is not None:
        return _multimodal_processor
    _multimodal_processor = MultimodalProcessor(temp_dir, frame_interval)
    return _multimodal_processor