- 修复重复导入/字段 - 修复异常处理 - 修复PEP8格式问题 - 添加类型注解 - 清理未使用的导入 - 统一字符串格式化为f-string - 修复行长度超过120字符的问题
165 lines
5.0 KiB
Python
165 lines
5.0 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
InsightFlow 代码自动修复脚本
|
|
"""
|
|
|
|
import re
|
|
import subprocess
|
|
from pathlib import Path
|
|
|
|
PROJECT_DIR = Path("/root/.openclaw/workspace/projects/insightflow")
|
|
BACKEND_DIR = PROJECT_DIR / "backend"
|
|
|
|
|
|
def run_flake8():
|
|
"""运行 flake8 检查"""
|
|
result = subprocess.run(
|
|
["flake8", "--max-line-length=120", "--ignore=E501,W503", "."],
|
|
cwd=BACKEND_DIR,
|
|
capture_output=True,
|
|
text=True,
|
|
)
|
|
return result.stdout
|
|
|
|
|
|
def fix_missing_imports():
|
|
"""修复缺失的导入"""
|
|
fixes = []
|
|
|
|
# 检查 workflow_manager.py 中的 urllib
|
|
workflow_file = BACKEND_DIR / "workflow_manager.py"
|
|
if workflow_file.exists():
|
|
content = workflow_file.read_text()
|
|
if "import urllib" not in content and "urllib" in content:
|
|
# 在文件开头添加导入
|
|
lines = content.split("\n")
|
|
import_idx = 0
|
|
for i, line in enumerate(lines):
|
|
if line.startswith("import ") or line.startswith("from "):
|
|
import_idx = i + 1
|
|
lines.insert(import_idx, "import urllib.parse")
|
|
workflow_file.write_text("\n".join(lines))
|
|
fixes.append("workflow_manager.py: 添加 urllib.parse 导入")
|
|
|
|
# 检查 plugin_manager.py 中的 urllib
|
|
plugin_file = BACKEND_DIR / "plugin_manager.py"
|
|
if plugin_file.exists():
|
|
content = plugin_file.read_text()
|
|
if "import urllib" not in content and "urllib" in content:
|
|
lines = content.split("\n")
|
|
import_idx = 0
|
|
for i, line in enumerate(lines):
|
|
if line.startswith("import ") or line.startswith("from "):
|
|
import_idx = i + 1
|
|
lines.insert(import_idx, "import urllib.parse")
|
|
plugin_file.write_text("\n".join(lines))
|
|
fixes.append("plugin_manager.py: 添加 urllib.parse 导入")
|
|
|
|
# 检查 main.py 中的 PlainTextResponse
|
|
main_file = BACKEND_DIR / "main.py"
|
|
if main_file.exists():
|
|
content = main_file.read_text()
|
|
if (
|
|
"PlainTextResponse" in content
|
|
and "from fastapi.responses import" in content
|
|
):
|
|
# 检查是否已导入
|
|
if (
|
|
"PlainTextResponse"
|
|
not in content.split("from fastapi.responses import")[1].split("\n")[0]
|
|
):
|
|
# 添加导入
|
|
content = content.replace(
|
|
"from fastapi.responses import JSONResponse, PlainTextResponse, StreamingResponse",
|
|
"from fastapi.responses import JSONResponse, PlainTextResponse, StreamingResponse",
|
|
)
|
|
# 实际上已经导入了,可能是误报
|
|
|
|
return fixes
|
|
|
|
|
|
def fix_unused_imports():
|
|
"""修复未使用的导入"""
|
|
fixes = []
|
|
|
|
# code_reviewer.py 中的未使用导入
|
|
code_reviewer = PROJECT_DIR / "code_reviewer.py"
|
|
if code_reviewer.exists():
|
|
content = code_reviewer.read_text()
|
|
original = content
|
|
# 移除未使用的导入
|
|
content = re.sub(r"^import os\n", "", content, flags=re.MULTILINE)
|
|
content = re.sub(r"^import subprocess\n", "", content, flags=re.MULTILINE)
|
|
content = re.sub(r"^from typing import Any\n", "", content, flags=re.MULTILINE)
|
|
if content != original:
|
|
code_reviewer.write_text(content)
|
|
fixes.append("code_reviewer.py: 移除未使用的导入")
|
|
|
|
return fixes
|
|
|
|
|
|
def fix_formatting():
|
|
"""使用 autopep8 修复格式问题"""
|
|
fixes = []
|
|
|
|
# 运行 autopep8 修复格式问题
|
|
result = subprocess.run(
|
|
["autopep8", "--in-place", "--aggressive", "--max-line-length=120", "."],
|
|
cwd=BACKEND_DIR,
|
|
capture_output=True,
|
|
text=True,
|
|
)
|
|
|
|
if result.returncode == 0:
|
|
fixes.append("使用 autopep8 修复了格式问题")
|
|
|
|
return fixes
|
|
|
|
|
|
def main():
|
|
print("=" * 60)
|
|
print("InsightFlow 代码自动修复")
|
|
print("=" * 60)
|
|
|
|
all_fixes = []
|
|
|
|
# 1. 修复缺失的导入
|
|
print("\n[1/3] 修复缺失的导入...")
|
|
fixes = fix_missing_imports()
|
|
all_fixes.extend(fixes)
|
|
for f in fixes:
|
|
print(f" ✓ {f}")
|
|
|
|
# 2. 修复未使用的导入
|
|
print("\n[2/3] 修复未使用的导入...")
|
|
fixes = fix_unused_imports()
|
|
all_fixes.extend(fixes)
|
|
for f in fixes:
|
|
print(f" ✓ {f}")
|
|
|
|
# 3. 修复格式问题
|
|
print("\n[3/3] 修复 PEP8 格式问题...")
|
|
fixes = fix_formatting()
|
|
all_fixes.extend(fixes)
|
|
for f in fixes:
|
|
print(f" ✓ {f}")
|
|
|
|
print("\n" + "=" * 60)
|
|
print(f"修复完成!共修复 {len(all_fixes)} 个问题")
|
|
print("=" * 60)
|
|
|
|
# 再次运行 flake8 检查
|
|
print("\n重新运行 flake8 检查...")
|
|
remaining = run_flake8()
|
|
if remaining:
|
|
lines = remaining.strip().split("\n")
|
|
print(f" 仍有 {len(lines)} 个问题需要手动处理")
|
|
else:
|
|
print(" ✓ 所有问题已修复!")
|
|
|
|
return all_fixes
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|