#!/usr/bin/env python3
"""
Document Processor - Phase 3

Extracts plain text from uploaded documents (PDF, DOCX, TXT, Markdown).
"""

import io
import os

# Encodings tried, in order, before falling back to latin-1. "utf-8-sig"
# decodes plain UTF-8 as well and additionally strips a leading BOM.
_TEXT_ENCODINGS = ("utf-8-sig", "gbk")

# Signature of OLE2 compound files, the container used by legacy binary
# Word ".doc" documents. python-docx only reads the zip-based .docx format.
_OLE2_MAGIC = b"\xd0\xcf\x11\xe0\xa1\xb1\x1a\xe1"

# Translation table deleting control characters that are not whitespace.
# Whitespace controls (tab, newline, form feed, ...) are left in place so
# that they still act as word or line separators during cleaning.
_CONTROL_CHAR_TABLE = {
    code: None for code in range(32) if not chr(code).isspace()
}


def _decode_text(content: bytes) -> str:
    """Decode raw bytes to text, trying each encoding in ``_TEXT_ENCODINGS``.

    latin-1 maps every byte to a character, so it is used as the final
    fallback and this function never raises ``UnicodeDecodeError``.
    """
    for encoding in _TEXT_ENCODINGS:
        try:
            return content.decode(encoding)
        except UnicodeDecodeError:
            continue
    return content.decode("latin-1")


class DocumentProcessor:
    """Document processor that extracts text from PDF/DOCX/plain-text files."""

    def __init__(self) -> None:
        # Maps a lower-case file extension to the method that extracts it.
        self.supported_formats = {
            ".pdf": self._extract_pdf,
            ".docx": self._extract_docx,
            ".doc": self._extract_docx,
            ".txt": self._extract_txt,
            ".md": self._extract_txt,
        }

    def process(self, content: bytes, filename: str) -> dict[str, str]:
        """Extract and clean the text of a document.

        Args:
            content: Raw bytes of the file.
            filename: File name; only its extension (case-insensitive) is
                used to select the extractor.

        Returns:
            ``{"text": <cleaned text>, "format": <lower-case extension>,
            "filename": <filename as given>}``.

        Raises:
            ValueError: If the extension is not supported or extraction fails.
            ImportError: If the optional library for the format is missing.
        """
        ext = os.path.splitext(filename.lower())[1]

        if ext not in self.supported_formats:
            raise ValueError(
                f"Unsupported file format: {ext}. Supported: {list(self.supported_formats.keys())}",
            )

        extractor = self.supported_formats[ext]
        text = self._clean_text(extractor(content))

        return {"text": text, "format": ext, "filename": filename}

    def _extract_pdf(self, content: bytes) -> str:
        """Extract text from a PDF using PyPDF2, or pdfplumber as fallback.

        Pages that yield no text are skipped; the remaining page texts are
        joined with a blank line.

        Raises:
            ImportError: If neither PyPDF2 nor pdfplumber is installed.
            ValueError: If the selected library fails to parse the PDF.
        """
        # Import separately from parsing so that only a genuinely missing
        # package triggers the fallback, not an error raised while parsing.
        try:
            import PyPDF2
        except ImportError:
            PyPDF2 = None

        if PyPDF2 is not None:
            try:
                reader = PyPDF2.PdfReader(io.BytesIO(content))
                page_texts = [page.extract_text() for page in reader.pages]
            except Exception as exc:
                raise ValueError(f"PDF extraction failed: {exc!s}") from exc
            return "\n\n".join(part for part in page_texts if part)

        try:
            import pdfplumber
        except ImportError as exc:
            raise ImportError(
                "PDF processing requires PyPDF2 or pdfplumber. "
                "Install with: pip install PyPDF2",
            ) from exc

        # Wrap fallback failures the same way as PyPDF2 failures so callers
        # see a ValueError regardless of which backend was used.
        try:
            with pdfplumber.open(io.BytesIO(content)) as pdf:
                page_texts = [page.extract_text() for page in pdf.pages]
        except Exception as exc:
            raise ValueError(f"PDF extraction failed: {exc!s}") from exc
        return "\n\n".join(part for part in page_texts if part)

    def _extract_docx(self, content: bytes) -> str:
        """Extract paragraph and table text from a DOCX document.

        Non-empty paragraphs come first, followed by one line per table row
        with the non-empty cell texts joined by `` | ``.

        Raises:
            ImportError: If python-docx is not installed.
            ValueError: If the content is a legacy binary ``.doc`` file or
                python-docx fails to parse it.
        """
        # The ".doc" extension is routed here too, but python-docx cannot
        # read the old binary format; fail with an actionable message.
        if content.startswith(_OLE2_MAGIC):
            raise ValueError(
                "DOCX extraction failed: legacy binary .doc files are not "
                "supported; convert the file to .docx",
            )

        try:
            import docx
        except ImportError as exc:
            raise ImportError(
                "DOCX processing requires python-docx. Install with: pip install python-docx",
            ) from exc

        try:
            doc = docx.Document(io.BytesIO(content))

            text_parts = [
                para.text for para in doc.paragraphs if para.text.strip()
            ]

            # Append the text found in tables, one output line per row.
            for table in doc.tables:
                for row in table.rows:
                    cells = [
                        cell.text.strip()
                        for cell in row.cells
                        if cell.text.strip()
                    ]
                    if cells:
                        text_parts.append(" | ".join(cells))
        except Exception as exc:
            raise ValueError(f"DOCX extraction failed: {exc!s}") from exc

        return "\n\n".join(text_parts)

    def _extract_txt(self, content: bytes) -> str:
        """Decode plain text, trying UTF-8 (BOM-aware), GBK, then latin-1."""
        return _decode_text(content)

    def _clean_text(self, text: str) -> str:
        """Normalize whitespace while keeping paragraph boundaries.

        Blank lines separate paragraphs. Within a paragraph, lines are merged
        and runs of whitespace collapse to a single space. Paragraphs are
        joined with exactly one blank line. Non-whitespace control characters
        are removed.
        """
        if not text:
            return ""

        # Treat every newline convention as "\n" before splitting into lines.
        text = text.replace("\r\n", "\n").replace("\r", "\n")
        text = text.translate(_CONTROL_CHAR_TABLE)

        paragraphs = []
        current_words = []
        for line in text.split("\n"):
            words = line.split()
            if words:
                current_words.extend(words)
            elif current_words:
                # A blank line closes the paragraph collected so far.
                paragraphs.append(" ".join(current_words))
                current_words = []
        if current_words:
            paragraphs.append(" ".join(current_words))

        return "\n\n".join(paragraphs)

    def is_supported(self, filename: str) -> bool:
        """Return True if the file's extension has a registered extractor."""
        ext = os.path.splitext(filename.lower())[1]
        return ext in self.supported_formats


# Simple text extractor (no external dependencies required).
class SimpleTextExtractor:
    """Minimal text extractor intended for tests."""

    def extract(self, content: bytes, filename: str) -> str:
        """Decode ``content`` as text; ``filename`` is accepted but unused."""
        return _decode_text(content)


def main() -> None:
    """Run a small smoke test of plain-text extraction."""
    processor = DocumentProcessor()

    test_text = "Hello World\n\nThis is a test document.\n\nMultiple paragraphs."
    result = processor.process(test_text.encode("utf-8"), "test.txt")
    print(f"Text extraction test: {len(result['text'])} chars")
    print(result["text"][:100])


if __name__ == "__main__":
    main()