Phase 3: Memory & Growth - Multi-file fusion, Entity alignment with embedding, Document import, Knowledge base panel
This commit is contained in:
180
backend/document_processor.py
Normal file
180
backend/document_processor.py
Normal file
@@ -0,0 +1,180 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Document Processor - Phase 3
|
||||
支持 PDF 和 DOCX 文档导入
|
||||
"""
|
||||
|
||||
import os
|
||||
import io
|
||||
from typing import Dict, Optional
|
||||
|
||||
class DocumentProcessor:
    """Document processor — extracts plain text from PDF/DOCX/TXT/MD files."""

    def __init__(self):
        # Map of file extension -> extraction callable.
        # NOTE(review): legacy binary .doc is routed to the DOCX extractor, but
        # python-docx only reads the zip-based .docx format — a true .doc file
        # will surface as a ValueError from _extract_docx. Confirm whether .doc
        # support is actually needed.
        self.supported_formats = {
            '.pdf': self._extract_pdf,
            '.docx': self._extract_docx,
            '.doc': self._extract_docx,
            '.txt': self._extract_txt,
            '.md': self._extract_txt,
        }

    def process(self, content: bytes, filename: str) -> Dict[str, str]:
        """Extract and normalize text from a document.

        Args:
            content: raw file bytes.
            filename: original file name (only its extension is used to pick
                the extractor).

        Returns:
            {"text": extracted text, "format": extension, "filename": filename}

        Raises:
            ValueError: unsupported extension, or extraction failed.
            ImportError: required extraction library is not installed.
        """
        ext = os.path.splitext(filename.lower())[1]

        if ext not in self.supported_formats:
            raise ValueError(f"Unsupported file format: {ext}. Supported: {list(self.supported_formats.keys())}")

        extractor = self.supported_formats[ext]
        text = extractor(content)

        # Normalize whitespace and strip control characters.
        text = self._clean_text(text)

        return {
            "text": text,
            "format": ext,
            "filename": filename
        }

    def _extract_pdf(self, content: bytes) -> str:
        """Extract text from a PDF, preferring PyPDF2, falling back to pdfplumber.

        Raises:
            ImportError: neither PyPDF2 nor pdfplumber is installed.
            ValueError: the PDF could not be parsed.
        """
        try:
            import PyPDF2
            pdf_file = io.BytesIO(content)
            reader = PyPDF2.PdfReader(pdf_file)

            text_parts = []
            for page in reader.pages:
                page_text = page.extract_text()
                # extract_text() may return None/"" for image-only pages.
                if page_text:
                    text_parts.append(page_text)

            return "\n\n".join(text_parts)
        except ImportError:
            # Fallback: try pdfplumber.
            try:
                import pdfplumber
                text_parts = []
                with pdfplumber.open(io.BytesIO(content)) as pdf:
                    for page in pdf.pages:
                        page_text = page.extract_text()
                        if page_text:
                            text_parts.append(page_text)
                return "\n\n".join(text_parts)
            except ImportError:
                raise ImportError("PDF processing requires PyPDF2 or pdfplumber. Install with: pip install PyPDF2")
        except Exception as e:
            raise ValueError(f"PDF extraction failed: {str(e)}")

    def _extract_docx(self, content: bytes) -> str:
        """Extract text from a DOCX file: paragraphs first, then table cells.

        Table rows are flattened to "cell | cell | cell" lines.

        Raises:
            ImportError: python-docx is not installed.
            ValueError: the document could not be parsed.
        """
        try:
            import docx
            doc_file = io.BytesIO(content)
            doc = docx.Document(doc_file)

            text_parts = []
            for para in doc.paragraphs:
                if para.text.strip():
                    text_parts.append(para.text)

            # Extract text from tables as well.
            for table in doc.tables:
                for row in table.rows:
                    row_text = []
                    for cell in row.cells:
                        if cell.text.strip():
                            row_text.append(cell.text.strip())
                    if row_text:
                        text_parts.append(" | ".join(row_text))

            return "\n\n".join(text_parts)
        except ImportError:
            raise ImportError("DOCX processing requires python-docx. Install with: pip install python-docx")
        except Exception as e:
            raise ValueError(f"DOCX extraction failed: {str(e)}")

    def _extract_txt(self, content: bytes) -> str:
        """Decode plain text, trying common encodings in priority order."""
        encodings = ['utf-8', 'gbk', 'gb2312', 'latin-1']

        for encoding in encodings:
            try:
                return content.decode(encoding)
            except UnicodeDecodeError:
                continue

        # If everything failed, decode as latin-1 and drop undecodable bytes.
        # (latin-1 in the list above never raises, so this is a safety net.)
        return content.decode('latin-1', errors='ignore')

    def _clean_text(self, text: str) -> str:
        """Normalize extracted text while preserving paragraph structure.

        Each non-empty input line becomes one paragraph; runs of whitespace
        inside a line are collapsed to a single space.

        BUGFIX: the previous version ran `' '.join(text.split())` on the
        whole string AFTER joining paragraphs with '\\n\\n', which split on
        newlines too and flattened the entire document onto one line —
        destroying the paragraph breaks it had just inserted.  Whitespace is
        now collapsed per line instead.
        """
        if not text:
            return ""

        cleaned_lines = []
        for line in text.split('\n'):
            # Collapse internal whitespace; skip blank lines.
            line = ' '.join(line.split())
            if line:
                cleaned_lines.append(line)

        # Join lines, treating each as its own paragraph.
        text = '\n\n'.join(cleaned_lines)

        # Remove control characters (keep newline, carriage return, tab).
        text = ''.join(char for char in text if ord(char) >= 32 or char in '\n\r\t')

        return text.strip()

    def is_supported(self, filename: str) -> bool:
        """Return True if the file's extension has a registered extractor."""
        ext = os.path.splitext(filename.lower())[1]
        return ext in self.supported_formats
|
||||
|
||||
|
||||
# Dependency-free text extractor (for testing).
class SimpleTextExtractor:
    """Minimal extractor that only decodes raw bytes; used in tests."""

    def extract(self, content: bytes, filename: str) -> str:
        """Decode *content* with the first encoding that succeeds.

        Tries utf-8, then gbk, then latin-1; falls back to lossy latin-1
        decoding if none succeed.  *filename* is accepted for interface
        parity but not consulted.
        """
        for codec in ('utf-8', 'gbk', 'latin-1'):
            try:
                return content.decode(codec)
            except UnicodeDecodeError:
                continue

        return content.decode('latin-1', errors='ignore')
|
||||
|
||||
|
||||
if __name__ == "__main__":
    # Smoke test: push a plain-text document through the processor.
    processor = DocumentProcessor()

    sample = "Hello World\n\nThis is a test document.\n\nMultiple paragraphs."
    result = processor.process(sample.encode('utf-8'), "test.txt")
    print(f"Text extraction test: {len(result['text'])} chars")
    print(result['text'][:100])