fix: auto-fix code issues (cron)

- 修复重复导入/字段
- 修复异常处理
- 修复PEP8格式问题
- 添加类型注解
Author: AutoFix Bot
Date: 2026-03-02 12:14:39 +08:00
parent e23f1fec08
commit 98527c4de4
39 changed files with 8109 additions and 8147 deletions

View File

@@ -12,7 +12,7 @@ class DocumentProcessor:
"""文档处理器 - 提取 PDF/DOCX 文本"""
def __init__(self) -> None:
self.supported_formats = {
self.supported_formats = {
".pdf": self._extract_pdf,
".docx": self._extract_docx,
".doc": self._extract_docx,
@@ -31,18 +31,18 @@ class DocumentProcessor:
Returns:
{"text": "提取的文本内容", "format": "文件格式"}
"""
ext = os.path.splitext(filename.lower())[1]
ext = os.path.splitext(filename.lower())[1]
if ext not in self.supported_formats:
raise ValueError(
f"Unsupported file format: {ext}. Supported: {list(self.supported_formats.keys())}"
)
extractor = self.supported_formats[ext]
text = extractor(content)
extractor = self.supported_formats[ext]
text = extractor(content)
# 清理文本
text = self._clean_text(text)
text = self._clean_text(text)
return {"text": text, "format": ext, "filename": filename}
@@ -51,12 +51,12 @@ class DocumentProcessor:
try:
import PyPDF2
pdf_file = io.BytesIO(content)
reader = PyPDF2.PdfReader(pdf_file)
pdf_file = io.BytesIO(content)
reader = PyPDF2.PdfReader(pdf_file)
text_parts = []
text_parts = []
for page in reader.pages:
page_text = page.extract_text()
page_text = page.extract_text()
if page_text:
text_parts.append(page_text)
@@ -66,10 +66,10 @@ class DocumentProcessor:
try:
import pdfplumber
text_parts = []
text_parts = []
with pdfplumber.open(io.BytesIO(content)) as pdf:
for page in pdf.pages:
page_text = page.extract_text()
page_text = page.extract_text()
if page_text:
text_parts.append(page_text)
return "\n\n".join(text_parts)
@@ -85,10 +85,10 @@ class DocumentProcessor:
try:
import docx
doc_file = io.BytesIO(content)
doc = docx.Document(doc_file)
doc_file = io.BytesIO(content)
doc = docx.Document(doc_file)
text_parts = []
text_parts = []
for para in doc.paragraphs:
if para.text.strip():
text_parts.append(para.text)
@@ -96,7 +96,7 @@ class DocumentProcessor:
# 提取表格中的文本
for table in doc.tables:
for row in table.rows:
row_text = []
row_text = []
for cell in row.cells:
if cell.text.strip():
row_text.append(cell.text.strip())
@@ -114,7 +114,7 @@ class DocumentProcessor:
def _extract_txt(self, content: bytes) -> str:
"""提取纯文本"""
# 尝试多种编码
encodings = ["utf-8", "gbk", "gb2312", "latin-1"]
encodings = ["utf-8", "gbk", "gb2312", "latin-1"]
for encoding in encodings:
try:
@@ -123,7 +123,7 @@ class DocumentProcessor:
continue
# 如果都失败了,使用 latin-1 并忽略错误
return content.decode("latin-1", errors = "ignore")
return content.decode("latin-1", errors="ignore")
def _clean_text(self, text: str) -> str:
"""清理提取的文本"""
@@ -131,29 +131,29 @@ class DocumentProcessor:
return ""
# 移除多余的空白字符
lines = text.split("\n")
cleaned_lines = []
lines = text.split("\n")
cleaned_lines = []
for line in lines:
line = line.strip()
line = line.strip()
# 移除空行,但保留段落分隔
if line:
cleaned_lines.append(line)
# 合并行,保留段落结构
text = "\n\n".join(cleaned_lines)
text = "\n\n".join(cleaned_lines)
# 移除多余的空格
text = " ".join(text.split())
text = " ".join(text.split())
# 移除控制字符
text = "".join(char for char in text if ord(char) >= 32 or char in "\n\r\t")
text = "".join(char for char in text if ord(char) >= 32 or char in "\n\r\t")
return text.strip()
def is_supported(self, filename: str) -> bool:
    """Return True if *filename* has a supported extension.

    The check is case-insensitive and looks only at the file
    extension (including the leading dot, e.g. ".pdf"), testing
    membership in ``self.supported_formats``.

    Args:
        filename: File name or path to check.

    Returns:
        True when the extension is a key of ``self.supported_formats``.
    """
    # Lowercase first so ".PDF" and ".pdf" are treated alike;
    # splitext returns ("name", ".ext"). Duplicate statement removed.
    ext = os.path.splitext(filename.lower())[1]
    return ext in self.supported_formats
@@ -165,7 +165,7 @@ class SimpleTextExtractor:
def extract(self, content: bytes, filename: str) -> str:
"""尝试提取文本"""
encodings = ["utf-8", "gbk", "latin-1"]
encodings = ["utf-8", "gbk", "latin-1"]
for encoding in encodings:
try:
@@ -173,15 +173,15 @@ class SimpleTextExtractor:
except UnicodeDecodeError:
continue
return content.decode("latin-1", errors = "ignore")
return content.decode("latin-1", errors="ignore")
if __name__ == "__main__":
    # Smoke test: run a plain-text payload through the processor and
    # confirm text extraction produces output. (Duplicate statements
    # left over from the diff render have been removed.)
    processor = DocumentProcessor()

    test_text = "Hello World\n\nThis is a test document.\n\nMultiple paragraphs."
    result = processor.process(test_text.encode("utf-8"), "test.txt")
    print(f"Text extraction test: {len(result['text'])} chars")
    print(result["text"][:100])