- 修复隐式 Optional 类型注解 (RUF013) - 修复不必要的赋值后返回 (RET504) - 优化列表推导式 (PERF401) - 修复未使用的参数 (ARG002) - 清理重复导入 - 优化异常处理
188 lines
5.6 KiB
Python
188 lines
5.6 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Document Processor - Phase 3

Supports importing PDF and DOCX documents (plus plain-text TXT/Markdown files).
|
|
"""
|
|
|
|
import io
|
|
import os
|
|
|
|
|
|
class DocumentProcessor:
    """Document processor that extracts text from PDF/DOCX/TXT/Markdown bytes.

    The extractor is selected by the (case-insensitive) file extension and the
    extracted text is normalised by ``_clean_text`` before being returned.
    """

    def __init__(self) -> None:
        # Maps a lower-cased extension (including the dot) to its extractor.
        # NOTE(review): ".doc" is routed to the DOCX extractor; legacy binary
        # .doc files are presumably not readable by python-docx and would then
        # surface as "DOCX extraction failed" - confirm this is intended.
        self.supported_formats = {
            ".pdf": self._extract_pdf,
            ".docx": self._extract_docx,
            ".doc": self._extract_docx,
            ".txt": self._extract_txt,
            ".md": self._extract_txt,
        }

    @staticmethod
    def _get_extension(filename: str) -> str:
        """Return the lower-cased extension of *filename*, including the dot."""
        return os.path.splitext(filename.lower())[1]

    def process(self, content: bytes, filename: str) -> dict[str, str]:
        """
        Process a document and extract its text.

        Args:
            content: Raw binary content of the file.
            filename: File name; only its extension is used to pick the extractor.

        Returns:
            {"text": cleaned extracted text, "format": lower-cased extension,
            "filename": the file name exactly as passed in}

        Raises:
            ValueError: If the extension is not supported or extraction fails.
            ImportError: If the optional library needed for the format is missing.
        """
        ext = self._get_extension(filename)

        extractor = self.supported_formats.get(ext)
        if extractor is None:
            raise ValueError(
                f"Unsupported file format: {ext}. Supported: {list(self.supported_formats.keys())}",
            )

        text = self._clean_text(extractor(content))
        return {"text": text, "format": ext, "filename": filename}

    @staticmethod
    def _join_page_texts(pages) -> str:
        """Join the non-empty ``extract_text()`` results of *pages* with blank lines."""
        return "\n\n".join(text for page in pages if (text := page.extract_text()))

    def _extract_pdf(self, content: bytes) -> str:
        """Extract PDF text with PyPDF2, falling back to pdfplumber.

        Raises:
            ImportError: If neither PyPDF2 nor pdfplumber can be imported.
            ValueError: If either backend fails while reading the document.
        """
        try:
            import PyPDF2

            reader = PyPDF2.PdfReader(io.BytesIO(content))
            return self._join_page_texts(reader.pages)
        except ImportError:
            # Fallback: PyPDF2 is unavailable, try pdfplumber instead.
            return self._extract_pdf_with_pdfplumber(content)
        except Exception as e:
            raise ValueError(f"PDF extraction failed: {e!s}") from e

    def _extract_pdf_with_pdfplumber(self, content: bytes) -> str:
        """Extract PDF text with pdfplumber (fallback backend).

        Failures are wrapped in ``ValueError`` so both PDF backends report
        extraction errors the same way.
        """
        try:
            import pdfplumber

            with pdfplumber.open(io.BytesIO(content)) as pdf:
                # Pages must be read while the document is still open.
                return self._join_page_texts(pdf.pages)
        except ImportError as err:
            raise ImportError(
                "PDF processing requires PyPDF2 or pdfplumber. Install with: pip install PyPDF2",
            ) from err
        except Exception as e:
            raise ValueError(f"PDF extraction failed: {e!s}") from e

    def _extract_docx(self, content: bytes) -> str:
        """Extract DOCX text: body paragraphs first, then table rows.

        Each non-empty table row becomes one entry whose non-empty cells are
        joined with " | ".

        Raises:
            ImportError: If python-docx cannot be imported.
            ValueError: If the document cannot be read.
        """
        try:
            import docx

            doc = docx.Document(io.BytesIO(content))

            text_parts = [para.text for para in doc.paragraphs if para.text.strip()]

            # Extract the text held in tables.
            for table in doc.tables:
                for row in table.rows:
                    row_text = [
                        cell_text for cell in row.cells if (cell_text := cell.text.strip())
                    ]
                    if row_text:
                        text_parts.append(" | ".join(row_text))

            return "\n\n".join(text_parts)
        except ImportError as err:
            raise ImportError(
                "DOCX processing requires python-docx. Install with: pip install python-docx",
            ) from err
        except Exception as e:
            raise ValueError(f"DOCX extraction failed: {e!s}") from e

    def _extract_txt(self, content: bytes) -> str:
        """Decode plain text, trying UTF-8 (with or without BOM), then GBK.

        "utf-8-sig" is used so a leading byte-order mark does not leak into the
        text as U+FEFF. If both strict decodes fail the bytes are decoded as
        latin-1, which maps every byte and therefore cannot fail.
        """
        for encoding in ("utf-8-sig", "gbk"):
            try:
                return content.decode(encoding)
            except UnicodeDecodeError:
                continue

        return content.decode("latin-1")

    def _clean_text(self, text: str) -> str:
        """Normalise extracted text while keeping paragraph boundaries.

        Control characters are removed, runs of whitespace inside a paragraph
        (including single line breaks) collapse to one space, and paragraphs -
        blocks separated by at least one blank line - are joined with exactly
        one blank line.

        Returns:
            The cleaned text, or "" when *text* is empty or only whitespace.
        """
        if not text:
            return ""

        paragraphs = []
        current_words = []

        for raw_line in text.splitlines():
            # Drop control characters but keep whitespace so that it still
            # separates words when the line is split below.
            printable = "".join(ch for ch in raw_line if ord(ch) >= 32 or ch.isspace())
            words = printable.split()
            if words:
                current_words.extend(words)
            elif current_words:
                # A blank line closes the paragraph collected so far.
                paragraphs.append(" ".join(current_words))
                current_words = []

        if current_words:
            paragraphs.append(" ".join(current_words))

        return "\n\n".join(paragraphs)

    def is_supported(self, filename: str) -> bool:
        """Return True if the extension of *filename* has a registered extractor."""
        return self._get_extension(filename) in self.supported_formats
|
|
|
|
|
|
# Simple text extractor (no external dependencies required)
|
|
|
|
|
|
class SimpleTextExtractor:
    """Minimal text extractor intended for tests."""

    def extract(self, content: bytes, filename: str) -> str:
        """Decode *content* to text using the first encoding that fits.

        UTF-8 is tried first, then GBK. When both reject the bytes, latin-1 is
        used; it maps every byte value, so this step always yields a string.

        Args:
            content: Raw bytes to decode.
            filename: Accepted as part of the signature; not used here.

        Returns:
            The decoded text.
        """
        for codec in ("utf-8", "gbk"):
            try:
                decoded = content.decode(codec)
            except UnicodeDecodeError:
                pass
            else:
                return decoded

        return content.decode("latin-1", errors="ignore")
|
|
|
|
|
|
if __name__ == "__main__":
    # Smoke test: push a small multi-paragraph sample through the plain-text
    # path and print the size plus the first 100 characters of the result.
    sample = "Hello World\n\nThis is a test document.\n\nMultiple paragraphs."
    outcome = DocumentProcessor().process(sample.encode("utf-8"), "test.txt")
    extracted = outcome["text"]

    print(f"Text extraction test: {len(extracted)} chars")
    print(extracted[:100])
|