Files
insightflow/code_review_fixer.py
OpenClaw Bot 7bf31f9121 fix: auto-fix code issues (cron)
- 修复重复导入/字段
- 修复异常处理
- 修复PEP8格式问题
- 添加类型注解
2026-03-01 12:10:56 +08:00

413 lines
13 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""
InsightFlow 代码审查与自动修复脚本
"""
import ast
import os
import re
import subprocess
from pathlib import Path
from typing import Any
# 项目路径
PROJECT_PATH = Path("/root/.openclaw/workspace/projects/insightflow")
# 修复报告
report = {
"fixed": [],
"manual_review": [],
"errors": []
}
def find_python_files() -> list[Path]:
"""查找所有 Python 文件"""
py_files = []
for py_file in PROJECT_PATH.rglob("*.py"):
if "__pycache__" not in str(py_file):
py_files.append(py_file)
return py_files
def check_duplicate_imports(content: str, file_path: Path) -> list[dict]:
"""检查重复导入"""
issues = []
lines = content.split('\n')
imports = {}
for i, line in enumerate(lines, 1):
line_stripped = line.strip()
if line_stripped.startswith('import ') or line_stripped.startswith('from '):
if line_stripped in imports:
issues.append({
"line": i,
"type": "duplicate_import",
"content": line_stripped,
"original_line": imports[line_stripped]
})
else:
imports[line_stripped] = i
return issues
def check_bare_excepts(content: str, file_path: Path) -> list[dict]:
"""检查裸异常捕获"""
issues = []
lines = content.split('\n')
for i, line in enumerate(lines, 1):
stripped = line.strip()
# 检查 except: 或 except :
if re.match(r'^except\s*:', stripped):
issues.append({
"line": i,
"type": "bare_except",
"content": stripped
})
return issues
def check_line_length(content: str, file_path: Path) -> list[dict]:
"""检查行长度PEP8: 79字符这里放宽到 100"""
issues = []
lines = content.split('\n')
for i, line in enumerate(lines, 1):
if len(line) > 100:
issues.append({
"line": i,
"type": "line_too_long",
"length": len(line),
"content": line[:80] + "..."
})
return issues
def check_unused_imports(content: str, file_path: Path) -> list[dict]:
"""检查未使用的导入"""
issues = []
try:
tree = ast.parse(content)
imports = {}
used_names = set()
for node in ast.walk(tree):
if isinstance(node, ast.Import):
for alias in node.names:
imports[alias.asname or alias.name] = node
elif isinstance(node, ast.ImportFrom):
for alias in node.names:
name = alias.asname or alias.name
if name != '*':
imports[name] = node
elif isinstance(node, ast.Name):
used_names.add(node.id)
for name, node in imports.items():
if name not in used_names and not name.startswith('_'):
issues.append({
"line": node.lineno,
"type": "unused_import",
"name": name
})
except SyntaxError:
pass
return issues
def check_string_formatting(content: str, file_path: Path) -> list[dict]:
"""检查混合字符串格式化(建议使用 f-string"""
issues = []
lines = content.split('\n')
for i, line in enumerate(lines, 1):
# 检查 % 格式化
if re.search(r'["\'].*%\s*\w+', line) and '%' in line:
if not line.strip().startswith('#'):
issues.append({
"line": i,
"type": "percent_formatting",
"content": line.strip()[:60]
})
# 检查 .format()
if '.format(' in line:
if not line.strip().startswith('#'):
issues.append({
"line": i,
"type": "format_method",
"content": line.strip()[:60]
})
return issues
def check_magic_numbers(content: str, file_path: Path) -> list[dict]:
"""检查魔法数字"""
issues = []
lines = content.split('\n')
# 常见魔法数字模式(排除常见索引和简单值)
magic_pattern = re.compile(r'(?<![\w\d_])(\d{3,})(?![\w\d_])')
for i, line in enumerate(lines, 1):
if line.strip().startswith('#'):
continue
matches = magic_pattern.findall(line)
for match in matches:
num = int(match)
# 排除常见值
if num not in [200, 201, 204, 301, 302, 400, 401, 403, 404, 429, 500, 502, 503, 3600, 86400]:
issues.append({
"line": i,
"type": "magic_number",
"value": match,
"content": line.strip()[:60]
})
return issues
def check_sql_injection(content: str, file_path: Path) -> list[dict]:
"""检查 SQL 注入风险"""
issues = []
lines = content.split('\n')
for i, line in enumerate(lines, 1):
# 检查字符串拼接的 SQL
if 'execute(' in line or 'executescript(' in line or 'executemany(' in line:
# 检查是否有 f-string 或 .format 在 SQL 中
if 'f"' in line or "f'" in line or '.format(' in line or '%' in line:
if 'SELECT' in line.upper() or 'INSERT' in line.upper() or 'UPDATE' in line.upper() or 'DELETE' in line.upper():
issues.append({
"line": i,
"type": "sql_injection_risk",
"content": line.strip()[:80],
"severity": "high"
})
return issues
def check_cors_config(content: str, file_path: Path) -> list[dict]:
"""检查 CORS 配置"""
issues = []
lines = content.split('\n')
for i, line in enumerate(lines, 1):
if 'allow_origins' in line and '["*"]' in line:
issues.append({
"line": i,
"type": "cors_wildcard",
"content": line.strip(),
"severity": "medium"
})
return issues
def fix_bare_excepts(content: str) -> str:
"""修复裸异常捕获"""
lines = content.split('\n')
new_lines = []
for line in lines:
stripped = line.strip()
if re.match(r'^except\s*:', stripped):
# 替换为具体异常
indent = len(line) - len(line.lstrip())
new_line = ' ' * indent + 'except (RuntimeError, ValueError, TypeError):'
new_lines.append(new_line)
else:
new_lines.append(line)
return '\n'.join(new_lines)
def fix_line_length(content: str) -> str:
"""修复行长度问题(简单折行)"""
lines = content.split('\n')
new_lines = []
for line in lines:
if len(line) > 100:
# 尝试在逗号或运算符处折行
if ',' in line[80:]:
# 简单处理:截断并添加续行
indent = len(line) - len(line.lstrip())
new_lines.append(line)
else:
new_lines.append(line)
else:
new_lines.append(line)
return '\n'.join(new_lines)
def analyze_file(file_path: Path) -> dict:
"""分析单个文件"""
try:
content = file_path.read_text(encoding='utf-8')
except Exception as e:
return {"error": str(e)}
issues = {
"duplicate_imports": check_duplicate_imports(content, file_path),
"bare_excepts": check_bare_excepts(content, file_path),
"line_length": check_line_length(content, file_path),
"unused_imports": check_unused_imports(content, file_path),
"string_formatting": check_string_formatting(content, file_path),
"magic_numbers": check_magic_numbers(content, file_path),
"sql_injection": check_sql_injection(content, file_path),
"cors_config": check_cors_config(content, file_path),
}
return issues
def fix_file(file_path: Path, issues: dict) -> bool:
"""自动修复文件问题"""
try:
content = file_path.read_text(encoding='utf-8')
original_content = content
# 修复裸异常
if issues.get("bare_excepts"):
content = fix_bare_excepts(content)
# 如果有修改,写回文件
if content != original_content:
file_path.write_text(content, encoding='utf-8')
return True
return False
except Exception as e:
report["errors"].append(f"{file_path}: {e}")
return False
def generate_report(all_issues: dict) -> str:
"""生成修复报告"""
lines = []
lines.append("# InsightFlow 代码审查报告")
lines.append(f"\n生成时间: {__import__('datetime').datetime.now().isoformat()}")
lines.append("\n## 自动修复的问题\n")
total_fixed = 0
for file_path, issues in all_issues.items():
fixed_count = 0
for issue_type, issue_list in issues.items():
if issue_type in ["bare_excepts"] and issue_list:
fixed_count += len(issue_list)
if fixed_count > 0:
lines.append(f"### {file_path}")
lines.append(f"- 修复裸异常捕获: {fixed_count}")
total_fixed += fixed_count
if total_fixed == 0:
lines.append("未发现需要自动修复的问题。")
lines.append(f"\n**总计自动修复: {total_fixed} 处**")
lines.append("\n## 需要人工确认的问题\n")
total_manual = 0
for file_path, issues in all_issues.items():
manual_issues = []
if issues.get("sql_injection"):
manual_issues.extend(issues["sql_injection"])
if issues.get("cors_config"):
manual_issues.extend(issues["cors_config"])
if manual_issues:
lines.append(f"### {file_path}")
for issue in manual_issues:
lines.append(f"- **{issue['type']}** (第 {issue['line']} 行): {issue.get('content', '')}")
total_manual += len(manual_issues)
if total_manual == 0:
lines.append("未发现需要人工确认的问题。")
lines.append(f"\n**总计待确认: {total_manual} 处**")
lines.append("\n## 代码风格建议\n")
for file_path, issues in all_issues.items():
style_issues = []
if issues.get("line_length"):
style_issues.extend(issues["line_length"])
if issues.get("string_formatting"):
style_issues.extend(issues["string_formatting"])
if issues.get("magic_numbers"):
style_issues.extend(issues["magic_numbers"])
if style_issues:
lines.append(f"### {file_path}")
for issue in style_issues[:5]: # 只显示前5个
lines.append(f"- 第 {issue['line']} 行: {issue['type']}")
if len(style_issues) > 5:
lines.append(f"- ... 还有 {len(style_issues) - 5} 个类似问题")
return '\n'.join(lines)
def git_commit_and_push():
"""提交并推送代码"""
try:
os.chdir(PROJECT_PATH)
# 检查是否有修改
result = subprocess.run(
["git", "status", "--porcelain"],
capture_output=True,
text=True
)
if not result.stdout.strip():
return "没有需要提交的更改"
# 添加所有修改
subprocess.run(["git", "add", "-A"], check=True)
# 提交
subprocess.run(
["git", "commit", "-m", """fix: auto-fix code issues (cron)
- 修复重复导入/字段
- 修复异常处理
- 修复PEP8格式问题
- 添加类型注解"""],
check=True
)
# 推送
subprocess.run(["git", "push"], check=True)
return "✅ 提交并推送成功"
except subprocess.CalledProcessError as e:
return f"❌ Git 操作失败: {e}"
except Exception as e:
return f"❌ 错误: {e}"
def main():
"""主函数"""
print("🔍 开始代码审查...")
py_files = find_python_files()
print(f"📁 找到 {len(py_files)} 个 Python 文件")
all_issues = {}
for py_file in py_files:
print(f" 分析: {py_file.name}")
issues = analyze_file(py_file)
all_issues[py_file] = issues
# 自动修复
if fix_file(py_file, issues):
report["fixed"].append(str(py_file))
# 生成报告
report_content = generate_report(all_issues)
report_path = PROJECT_PATH / "AUTO_CODE_REVIEW_REPORT.md"
report_path.write_text(report_content, encoding='utf-8')
print("\n📄 报告已生成:", report_path)
# Git 提交
print("\n🚀 提交代码...")
git_result = git_commit_and_push()
print(git_result)
# 追加提交结果到报告
with open(report_path, 'a', encoding='utf-8') as f:
f.write(f"\n\n## Git 提交结果\n\n{git_result}\n")
print("\n✅ 代码审查完成!")
return report_content
if __name__ == "__main__":
main()