#!/usr/bin/env python3
"""
InsightFlow Multimodal Processor - Phase 7

Video processing module: audio extraction, key-frame extraction and OCR.
"""

import json
import logging
import os
import shutil
import subprocess
import tempfile
import uuid
from dataclasses import dataclass, field
from pathlib import Path
from typing import Dict, List, Optional, Tuple

# Optional back-ends: each one is probed at import time and the processor
# falls back to the command-line tools (or to a no-op for OCR) when missing.
try:
    import pytesseract
    from PIL import Image
    PYTESSERACT_AVAILABLE = True
except ImportError:
    PYTESSERACT_AVAILABLE = False

try:
    import cv2
    CV2_AVAILABLE = True
except ImportError:
    CV2_AVAILABLE = False

try:
    import ffmpeg
    FFMPEG_AVAILABLE = True
except ImportError:
    FFMPEG_AVAILABLE = False

logger = logging.getLogger(__name__)


@dataclass
class VideoFrame:
    """A single key frame extracted from a video, with its OCR result."""

    id: str
    video_id: str
    frame_number: int
    timestamp: float
    frame_path: str
    ocr_text: str = ""
    ocr_confidence: float = 0.0
    entities_detected: List[Dict] = field(default_factory=list)

    def __post_init__(self):
        # Callers may still pass None explicitly; normalise it to a fresh list.
        if self.entities_detected is None:
            self.entities_detected = []


@dataclass
class VideoInfo:
    """Descriptive record for a video file and its processing state."""

    id: str
    project_id: str
    filename: str
    file_path: str
    duration: float = 0.0
    width: int = 0
    height: int = 0
    fps: float = 0.0
    audio_extracted: bool = False
    audio_path: str = ""
    transcript_id: str = ""
    status: str = "pending"
    error_message: str = ""
    metadata: Dict = field(default_factory=dict)

    def __post_init__(self):
        # Callers may still pass None explicitly; normalise it to a fresh dict.
        if self.metadata is None:
            self.metadata = {}


@dataclass
class VideoProcessingResult:
    """Outcome of :meth:`MultimodalProcessor.process_video`."""

    video_id: str
    audio_path: str
    frames: List[VideoFrame]
    ocr_results: List[Dict]
    # Combined text. As produced by process_video this currently holds the
    # OCR text of all frames only; no audio transcript is merged in here.
    full_text: str
    success: bool
    error_message: str = ""


class MultimodalProcessor:
    """Multimodal processor that handles video files."""

    # Tesseract language packs used for both the text and the confidence pass.
    OCR_LANGUAGES = 'chi_sim+eng'
    # Frame rate reported by the ffprobe fallback when the video stream does
    # not expose a usable r_frame_rate.
    DEFAULT_FPS = 30.0

    def __init__(self, temp_dir: Optional[str] = None, frame_interval: int = 5):
        """
        Initialise the processor and create its working directories.

        Args:
            temp_dir: Root directory for temporary files; defaults to the
                system temporary directory.
            frame_interval: Key-frame extraction interval in seconds.
        """
        self.temp_dir = temp_dir or tempfile.gettempdir()
        self.frame_interval = frame_interval
        self.video_dir = os.path.join(self.temp_dir, "videos")
        self.frames_dir = os.path.join(self.temp_dir, "frames")
        self.audio_dir = os.path.join(self.temp_dir, "audio")

        # Create the working directories up front.
        os.makedirs(self.video_dir, exist_ok=True)
        os.makedirs(self.frames_dir, exist_ok=True)
        os.makedirs(self.audio_dir, exist_ok=True)

    @staticmethod
    def _parse_frame_rate(rate) -> float:
        """Convert an ffprobe rate such as ``"30000/1001"`` to a float.

        Returns 0.0 for anything that is not ``N`` or ``N/D`` with a non-zero
        denominator. The value comes from the probed media file, so it is
        parsed explicitly instead of being evaluated as an expression.
        """
        numerator_text, _, denominator_text = str(rate).partition('/')
        try:
            numerator = float(numerator_text)
            denominator = float(denominator_text) if denominator_text else 1.0
        except ValueError:
            return 0.0
        if denominator == 0:
            return 0.0
        return numerator / denominator

    @staticmethod
    def _first_stream(streams, codec_type: str) -> Optional[Dict]:
        """Return the first stream dict of the given codec type, or None."""
        return next(
            (s for s in streams if s.get('codec_type') == codec_type),
            None,
        )

    @staticmethod
    def _parse_frame_name(frame_path: str, index: int,
                          default_interval: float) -> Tuple[int, float]:
        """Recover (frame_number, timestamp) from ``frame_<n>_<ts>.<ext>``.

        Falls back to ``index`` and ``index * default_interval`` for the parts
        that are missing or not numeric.
        """
        stem = os.path.splitext(os.path.basename(frame_path))[0]
        parts = stem.split('_')
        try:
            frame_number = int(parts[1])
        except (IndexError, ValueError):
            frame_number = index
        try:
            timestamp = float(parts[2])
        except (IndexError, ValueError):
            timestamp = float(index * default_interval)
        return frame_number, timestamp

    def extract_video_info(self, video_path: str) -> Dict:
        """
        Extract basic information about a video file.

        Uses the ``ffmpeg`` Python binding when it is importable and the
        ``ffprobe`` command-line tool otherwise. Any failure is logged and
        an all-zero result is returned instead of raising.

        Args:
            video_path: Path to the video file.

        Returns:
            Dict with the keys ``duration``, ``width``, ``height``, ``fps``,
            ``has_audio`` and ``bitrate``.
        """
        try:
            if FFMPEG_AVAILABLE:
                probe = ffmpeg.probe(video_path)
                streams = probe['streams']
                video_stream = self._first_stream(streams, 'video')
                audio_stream = self._first_stream(streams, 'audio')

                if video_stream:
                    return {
                        'duration': float(probe['format'].get('duration', 0)),
                        'width': int(video_stream.get('width', 0)),
                        'height': int(video_stream.get('height', 0)),
                        'fps': self._parse_frame_rate(
                            video_stream.get('r_frame_rate', '0/1')
                        ),
                        'has_audio': audio_stream is not None,
                        'bitrate': int(probe['format'].get('bit_rate', 0)),
                    }
            else:
                # Fall back to the ffprobe command-line tool. Both sections
                # are requested in a single -show_entries specifier, and
                # codec_type is included so streams can be told apart.
                cmd = [
                    'ffprobe', '-v', 'error',
                    '-show_entries',
                    'format=duration,bit_rate'
                    ':stream=codec_type,width,height,r_frame_rate',
                    '-of', 'json', video_path
                ]
                result = subprocess.run(cmd, capture_output=True, text=True)
                if result.returncode == 0:
                    data = json.loads(result.stdout)
                    streams = data.get('streams') or []
                    container = data.get('format') or {}
                    video_stream = self._first_stream(streams, 'video')
                    audio_stream = self._first_stream(streams, 'audio')

                    fps = 0.0
                    if video_stream:
                        fps = self._parse_frame_rate(
                            video_stream.get('r_frame_rate', '')
                        ) or self.DEFAULT_FPS
                    picture = video_stream or {}
                    return {
                        'duration': float(container.get('duration', 0)),
                        'width': int(picture.get('width', 0)),
                        'height': int(picture.get('height', 0)),
                        'fps': fps,
                        'has_audio': audio_stream is not None,
                        'bitrate': int(container.get('bit_rate', 0)),
                    }
        except Exception as e:
            # Best effort: probing problems must not abort the caller.
            logger.error("Error extracting video info: %s", e)

        return {
            'duration': 0,
            'width': 0,
            'height': 0,
            'fps': 0,
            'has_audio': False,
            'bitrate': 0
        }

    def extract_audio(self, video_path: str, output_path: str = None) -> str:
        """
        Extract the audio track of a video as mono 16 kHz audio.

        Args:
            video_path: Path to the video file.
            output_path: Destination audio path (optional). Defaults to
                ``<audio_dir>/<video file stem>.wav``.

        Returns:
            Path of the extracted audio file.

        Raises:
            Exception: Whatever the ffmpeg binding or the ``ffmpeg``
                subprocess raised; the error is logged and re-raised.
        """
        if output_path is None:
            video_name = Path(video_path).stem
            output_path = os.path.join(self.audio_dir, f"{video_name}.wav")

        try:
            if FFMPEG_AVAILABLE:
                (
                    ffmpeg
                    .input(video_path)
                    .output(output_path, ac=1, ar=16000, vn=None)
                    .overwrite_output()
                    .run(quiet=True)
                )
            else:
                # Fall back to the ffmpeg command-line tool.
                cmd = [
                    'ffmpeg', '-i', video_path,
                    '-vn', '-acodec', 'pcm_s16le',
                    '-ac', '1', '-ar', '16000',
                    '-y', output_path
                ]
                subprocess.run(cmd, check=True, capture_output=True)

            return output_path
        except Exception as e:
            logger.error("Error extracting audio: %s", e)
            raise

    def _extract_keyframes_cv2(self, video_path: str, video_frames_dir: str,
                               interval: int, frame_paths: List[str]) -> None:
        """Decode the video with OpenCV and append saved frame paths.

        ``frame_paths`` is filled in place so that frames written before a
        mid-stream failure are still reported to the caller.
        """
        cap = cv2.VideoCapture(video_path)
        try:
            if not cap.isOpened():
                raise ValueError(f"Cannot open video: {video_path}")

            fps = cap.get(cv2.CAP_PROP_FPS)
            if not fps > 0:
                # Without a frame rate neither the sampling step nor the
                # timestamps can be computed.
                raise ValueError(f"Unknown frame rate for video: {video_path}")

            # At least every frame; int(fps * interval) may truncate to 0.
            step = max(1, int(fps * interval))

            frame_number = 0
            while True:
                ret, frame = cap.read()
                if not ret:
                    break

                if frame_number % step == 0:
                    timestamp = frame_number / fps
                    frame_path = os.path.join(
                        video_frames_dir,
                        f"frame_{frame_number:06d}_{timestamp:.2f}.jpg"
                    )
                    if cv2.imwrite(frame_path, frame):
                        frame_paths.append(frame_path)

                frame_number += 1
        finally:
            # Release the capture even when decoding or writing failed.
            cap.release()

    def _extract_keyframes_ffmpeg(self, video_path: str, video_frames_dir: str,
                                  interval: int) -> List[str]:
        """Sample frames with the ffmpeg command-line tool.

        Files are first written as ``frame_<index>.jpg`` and then renamed to
        ``frame_<index>_<timestamp>.jpg`` (timestamp = index * interval) so
        both back-ends share one naming scheme. Here ``<index>`` is the
        sequential sample index, not the source frame number.
        """
        output_pattern = os.path.join(video_frames_dir, "frame_%06d.jpg")

        cmd = [
            'ffmpeg', '-i', video_path,
            '-vf', f'fps=1/{interval}',
            '-start_number', '0',
            '-y', output_pattern
        ]
        subprocess.run(cmd, check=True, capture_output=True)

        for name in os.listdir(video_frames_dir):
            stem, ext = os.path.splitext(name)
            prefix, _, index_text = stem.partition('_')
            if prefix == 'frame' and index_text.isdigit():
                index = int(index_text)
                renamed = f"frame_{index:06d}_{index * interval:.2f}{ext}"
                os.replace(
                    os.path.join(video_frames_dir, name),
                    os.path.join(video_frames_dir, renamed),
                )

        # Collect every frame file present in the directory.
        return sorted(
            os.path.join(video_frames_dir, f)
            for f in os.listdir(video_frames_dir)
            if f.startswith('frame_')
        )

    def extract_keyframes(self, video_path: str, video_id: str,
                          interval: Optional[int] = None) -> List[str]:
        """
        Extract key frames from a video at a fixed time interval.

        Uses OpenCV when it is importable and the ``ffmpeg`` command-line
        tool otherwise. Failures are logged; the frames collected so far
        (possibly none) are returned instead of raising.

        Args:
            video_path: Path to the video file.
            video_id: Video ID; frames are stored in ``<frames_dir>/<video_id>``.
            interval: Extraction interval in seconds; defaults to the
                interval given at construction time.

        Returns:
            List of extracted frame file paths.
        """
        interval = interval or self.frame_interval
        frame_paths: List[str] = []

        # Per-video frame directory.
        video_frames_dir = os.path.join(self.frames_dir, video_id)
        os.makedirs(video_frames_dir, exist_ok=True)

        try:
            if CV2_AVAILABLE:
                self._extract_keyframes_cv2(
                    video_path, video_frames_dir, interval, frame_paths
                )
            else:
                frame_paths = self._extract_keyframes_ffmpeg(
                    video_path, video_frames_dir, interval
                )
        except Exception as e:
            logger.error("Error extracting keyframes: %s", e)

        return frame_paths

    def perform_ocr(self, image_path: str) -> Tuple[str, float]:
        """
        Run OCR on an image.

        Args:
            image_path: Path to the image file.

        Returns:
            ``(text, confidence)`` where confidence is the mean of the
            positive per-item confidences scaled to 0..1. Returns
            ``("", 0.0)`` when pytesseract is unavailable or OCR fails.
        """
        if not PYTESSERACT_AVAILABLE:
            return "", 0.0

        try:
            # The context manager closes the underlying file handle.
            with Image.open(image_path) as source:
                # Pre-processing: convert to greyscale.
                image = source if source.mode == 'L' else source.convert('L')

                text = pytesseract.image_to_string(
                    image, lang=self.OCR_LANGUAGES
                )

                # Same languages as the text pass so that the confidences
                # describe the text that is actually returned.
                data = pytesseract.image_to_data(
                    image,
                    lang=self.OCR_LANGUAGES,
                    output_type=pytesseract.Output.DICT,
                )

            confidences = []
            for raw in data['conf']:
                # Confidences may arrive as int, float or numeric strings
                # such as '95.5'; -1 marks items without a confidence.
                try:
                    value = float(raw)
                except (TypeError, ValueError):
                    continue
                if value > 0:
                    confidences.append(value)

            avg_confidence = (
                sum(confidences) / len(confidences) if confidences else 0.0
            )
            return text.strip(), avg_confidence / 100.0
        except Exception as e:
            logger.error("OCR error for %s: %s", image_path, e)
            return "", 0.0

    def process_video(self, video_data: bytes, filename: str,
                      project_id: str,
                      video_id: str = None) -> VideoProcessingResult:
        """
        Process a video: save it, extract audio and key frames, run OCR.

        Args:
            video_data: Raw bytes of the video file.
            filename: Original file name; only its base name is used.
            project_id: Project ID (not used by this method itself).
            video_id: Video ID (optional, generated when omitted).

        Returns:
            The processing result. On any exception ``success`` is False and
            ``error_message`` carries the exception text.
        """
        video_id = video_id or str(uuid.uuid4())[:8]

        try:
            # The name comes from the caller: drop directory components so
            # the file cannot be written outside the video directory.
            safe_name = os.path.basename(filename.replace('\\', '/')) or 'video'
            video_path = os.path.join(self.video_dir, f"{video_id}_{safe_name}")
            with open(video_path, 'wb') as f:
                f.write(video_data)

            video_info = self.extract_video_info(video_path)

            # Only attempt audio extraction when an audio stream was found.
            audio_path = ""
            if video_info['has_audio']:
                audio_path = self.extract_audio(video_path)

            frame_paths = self.extract_keyframes(video_path, video_id)

            # OCR every key frame.
            frames = []
            ocr_results = []
            all_ocr_text = []

            for i, frame_path in enumerate(frame_paths):
                frame_number, timestamp = self._parse_frame_name(
                    frame_path, i, self.frame_interval
                )

                ocr_text, confidence = self.perform_ocr(frame_path)

                frames.append(VideoFrame(
                    id=str(uuid.uuid4())[:8],
                    video_id=video_id,
                    frame_number=frame_number,
                    timestamp=timestamp,
                    frame_path=frame_path,
                    ocr_text=ocr_text,
                    ocr_confidence=confidence
                ))

                if ocr_text:
                    ocr_results.append({
                        'frame_number': frame_number,
                        'timestamp': timestamp,
                        'text': ocr_text,
                        'confidence': confidence
                    })
                    all_ocr_text.append(ocr_text)

            # Join the OCR text of all frames.
            full_ocr_text = "\n\n".join(all_ocr_text)

            return VideoProcessingResult(
                video_id=video_id,
                audio_path=audio_path,
                frames=frames,
                ocr_results=ocr_results,
                full_text=full_ocr_text,
                success=True
            )

        except Exception as e:
            return VideoProcessingResult(
                video_id=video_id,
                audio_path="",
                frames=[],
                ocr_results=[],
                full_text="",
                success=False,
                error_message=str(e)
            )

    def cleanup(self, video_id: str = None):
        """
        Remove temporary files.

        Args:
            video_id: Video ID (optional). When given, only that video's
                frame directory and the video/audio files named
                ``<video_id>_*`` are removed; otherwise all three working
                directories are emptied and recreated.
        """
        if video_id:
            # Frame files do not carry the video ID in their names, so the
            # whole per-video frame directory is removed. Only a direct child
            # of frames_dir is accepted, which rejects IDs such as "..".
            frames_root = os.path.abspath(self.frames_dir)
            target_dir = os.path.abspath(os.path.join(frames_root, video_id))
            if (os.path.dirname(target_dir) == frames_root
                    and os.path.isdir(target_dir)):
                shutil.rmtree(target_dir)

            # Saved videos and derived audio are named "<video_id>_<name>".
            prefix = f"{video_id}_"
            for dir_path in (self.video_dir, self.audio_dir):
                if not os.path.isdir(dir_path):
                    continue
                for name in os.listdir(dir_path):
                    file_path = os.path.join(dir_path, name)
                    if name.startswith(prefix) and os.path.isfile(file_path):
                        os.remove(file_path)
        else:
            # Remove everything and recreate the empty directories.
            for dir_path in (self.video_dir, self.frames_dir, self.audio_dir):
                if os.path.exists(dir_path):
                    shutil.rmtree(dir_path)
                os.makedirs(dir_path, exist_ok=True)


# Singleton instance
_multimodal_processor = None


def get_multimodal_processor(temp_dir: str = None,
                             frame_interval: int = 5) -> MultimodalProcessor:
    """Return the shared MultimodalProcessor, creating it on first use.

    The arguments are only honoured by the call that creates the instance;
    later calls return the existing processor unchanged.
    """
    global _multimodal_processor
    if _multimodal_processor is None:
        _multimodal_processor = MultimodalProcessor(temp_dir, frame_interval)
    return _multimodal_processor