#!/usr/bin/env python3
"""
Document Processor - Phase 3

Supports importing PDF and DOCX documents (plus plain-text / Markdown files)
and extracting their text content.
"""

import os
import io
from typing import Dict, Optional, Sequence


# Byte-order marks that unambiguously identify a Unicode encoding.
# The 4-byte UTF-32 marks must be tested before the 2-byte UTF-16 marks,
# because the UTF-32-LE mark starts with the UTF-16-LE mark.
_BOM_ENCODINGS = (
    (b"\xff\xfe\x00\x00", "utf-32"),
    (b"\x00\x00\xfe\xff", "utf-32"),
    (b"\xff\xfe", "utf-16"),
    (b"\xfe\xff", "utf-16"),
)

# Signature of an OLE2 compound file, the container used by legacy binary
# Word ".doc" documents. A real .docx is a ZIP archive and starts with "PK".
_OLE2_MAGIC = b"\xd0\xcf\x11\xe0\xa1\xb1\x1a\xe1"


def _decode_bytes(content: bytes, encodings: Sequence[str]) -> str:
    """Decode *content* to text using the first encoding that succeeds.

    A leading UTF-16/UTF-32 byte-order mark takes precedence over the
    candidate list. If nothing succeeds, the bytes are decoded as latin-1,
    which accepts every byte value, so this function never raises.

    Args:
        content: Raw bytes to decode.
        encodings: Candidate encoding names, tried in order.

    Returns:
        The decoded text.
    """
    for bom, bom_encoding in _BOM_ENCODINGS:
        if content.startswith(bom):
            try:
                # The "utf-16"/"utf-32" codecs consume the BOM themselves.
                return content.decode(bom_encoding)
            except UnicodeDecodeError:
                # Looked like a BOM but the payload is not valid; fall back
                # to the regular candidate list below.
                break

    for encoding in encodings:
        try:
            return content.decode(encoding)
        except UnicodeDecodeError:
            continue

    # Last resort: latin-1 maps every byte to a code point and cannot fail.
    return content.decode('latin-1', errors='ignore')


class DocumentProcessor:
    """Document processor that extracts plain text from PDF/DOCX/text files."""

    def __init__(self):
        # Maps a lower-cased file extension to the method that extracts text
        # from that kind of file. '.doc' is routed to the DOCX extractor,
        # which rejects legacy binary content with a ValueError.
        self.supported_formats = {
            '.pdf': self._extract_pdf,
            '.docx': self._extract_docx,
            '.doc': self._extract_docx,
            '.txt': self._extract_txt,
            '.md': self._extract_txt,
        }

    def process(self, content: bytes, filename: str) -> Dict[str, str]:
        """
        Process a document and extract its text.

        Args:
            content: Binary content of the file.
            filename: File name; only its extension (case-insensitive) is
                used to pick the extractor.

        Returns:
            {"text": cleaned extracted text,
             "format": lower-cased extension including the dot,
             "filename": the file name exactly as passed in}

        Raises:
            ValueError: If the extension is not supported or extraction fails.
            ImportError: If the optional library needed for the format is
                not installed.
        """
        ext = os.path.splitext(filename.lower())[1]

        if ext not in self.supported_formats:
            raise ValueError(f"Unsupported file format: {ext}. Supported: {list(self.supported_formats.keys())}")

        extractor = self.supported_formats[ext]
        text = self._clean_text(extractor(content))

        return {
            "text": text,
            "format": ext,
            "filename": filename
        }

    @staticmethod
    def _join_page_texts(pages) -> str:
        """Join the non-empty ``extract_text()`` results of *pages* with blank lines."""
        page_texts = (page.extract_text() for page in pages)
        return "\n\n".join(page_text for page_text in page_texts if page_text)

    def _extract_pdf(self, content: bytes) -> str:
        """Extract text from a PDF.

        PyPDF2 is used when it is installed; otherwise pdfplumber is tried.

        Args:
            content: Binary content of the PDF file.

        Returns:
            The text of all pages that yielded text, separated by blank lines.

        Raises:
            ImportError: If neither PyPDF2 nor pdfplumber is installed.
            ValueError: If the selected library fails to read the document.
        """
        # Keep the import separate from the extraction so that only a missing
        # library (and not a failure while parsing) triggers the fallback.
        try:
            import PyPDF2
        except ImportError:
            PyPDF2 = None

        if PyPDF2 is not None:
            try:
                reader = PyPDF2.PdfReader(io.BytesIO(content))
                return self._join_page_texts(reader.pages)
            except Exception as e:
                raise ValueError(f"PDF extraction failed: {str(e)}") from e

        # Fallback: try pdfplumber.
        try:
            import pdfplumber
        except ImportError:
            raise ImportError("PDF processing requires PyPDF2 or pdfplumber. Install with: pip install PyPDF2")

        try:
            with pdfplumber.open(io.BytesIO(content)) as pdf:
                # Must be consumed while the document is still open.
                return self._join_page_texts(pdf.pages)
        except Exception as e:
            # Same error contract as the PyPDF2 path.
            raise ValueError(f"PDF extraction failed: {str(e)}") from e

    def _extract_docx(self, content: bytes) -> str:
        """Extract text from a DOCX document.

        Body paragraphs come first, followed by one line per table row with
        the non-empty cells joined by " | ". Tables are therefore appended
        after all paragraphs rather than kept at their original position.

        Args:
            content: Binary content of the document.

        Returns:
            The extracted text blocks separated by blank lines.

        Raises:
            ImportError: If python-docx is not installed.
            ValueError: If the content is a legacy binary .doc file or the
                document cannot be parsed.
        """
        # The '.doc' extension is routed here, but python-docx only reads the
        # ZIP-based .docx format; fail early with an actionable message.
        if content.startswith(_OLE2_MAGIC):
            raise ValueError(
                "DOCX extraction failed: legacy binary .doc (OLE2) files are not supported; "
                "convert the file to .docx first"
            )

        try:
            import docx
        except ImportError:
            raise ImportError("DOCX processing requires python-docx. Install with: pip install python-docx")

        try:
            doc = docx.Document(io.BytesIO(content))

            text_parts = [para.text for para in doc.paragraphs if para.text.strip()]

            # Extract the text held in tables.
            for table in doc.tables:
                for row in table.rows:
                    row_text = [cell.text.strip() for cell in row.cells if cell.text.strip()]
                    if row_text:
                        text_parts.append(" | ".join(row_text))

            return "\n\n".join(text_parts)
        except Exception as e:
            raise ValueError(f"DOCX extraction failed: {str(e)}") from e

    def _extract_txt(self, content: bytes) -> str:
        """Decode a plain-text file.

        Tries UTF-8 (dropping a byte-order mark if present), then GBK and
        GB2312, and finally latin-1, which always succeeds. UTF-16/UTF-32
        content that starts with a byte-order mark is detected first.

        Args:
            content: Binary content of the text file.

        Returns:
            The decoded text.
        """
        # 'utf-8-sig' behaves like 'utf-8' but strips a leading BOM.
        return _decode_bytes(content, ('utf-8-sig', 'gbk', 'gb2312', 'latin-1'))

    def _clean_text(self, text: str) -> str:
        """Normalise extracted text while keeping its paragraph structure.

        Blank (whitespace-only) lines separate paragraphs. Inside a paragraph
        every run of whitespace, including single line breaks, becomes one
        space. Non-whitespace control characters are removed. Paragraphs are
        joined with exactly one blank line.

        Args:
            text: Raw extracted text.

        Returns:
            The cleaned text, or "" when nothing printable remains.
        """
        if not text:
            return ""

        paragraphs = []
        current_words = []

        for raw_line in text.split('\n'):
            # Drop control characters first; whitespace ones are kept so that
            # split() below can still treat them as word separators.
            printable = ''.join(
                char for char in raw_line if ord(char) >= 32 or char.isspace()
            )
            words = printable.split()

            if words:
                current_words.extend(words)
            elif current_words:
                # A blank line closes the paragraph collected so far.
                paragraphs.append(' '.join(current_words))
                current_words = []

        if current_words:
            paragraphs.append(' '.join(current_words))

        return '\n\n'.join(paragraphs)

    def is_supported(self, filename: str) -> bool:
        """Return True if the file name's extension has a registered extractor."""
        ext = os.path.splitext(filename.lower())[1]
        return ext in self.supported_formats


# Simple text extractor (needs no external dependencies)
class SimpleTextExtractor:
    """Simple text extractor, intended for testing."""

    def extract(self, content: bytes, filename: str) -> str:
        """Try to decode *content* as text.

        Args:
            content: Binary content of the file.
            filename: File name; accepted for interface compatibility and
                not used by the decoding.

        Returns:
            The decoded text (a leading byte-order mark is removed).
        """
        return _decode_bytes(content, ('utf-8-sig', 'gbk', 'latin-1'))


if __name__ == "__main__":
    # Smoke test
    processor = DocumentProcessor()

    # Exercise plain-text extraction
    test_text = "Hello World\n\nThis is a test document.\n\nMultiple paragraphs."
    result = processor.process(test_text.encode('utf-8'), "test.txt")
    print(f"Text extraction test: {len(result['text'])} chars")
    print(result['text'][:100])