fix: auto-fix code issues (cron)

This commit is contained in:
AutoFix Bot
2026-03-03 06:05:06 +08:00
parent 9fd1da8fb7
commit ebfaf9c594
3 changed files with 925 additions and 124 deletions

672
code_analyzer.py Normal file
View File

@@ -0,0 +1,672 @@
#!/usr/bin/env python3
"""
代码审查和自动修复工具
用于扫描和修复 Python 代码中的常见问题
"""
import ast
import os
import re
import subprocess
from pathlib import Path
from typing import Dict, List, Set, Tuple, Any
from dataclasses import dataclass, field
@dataclass
class CodeIssue:
    """One finding reported for a single line of a source file."""

    # Path of the file the finding was raised for.
    file_path: str
    # 1-based line number of the finding inside that file.
    line_no: int
    # Machine-readable category, e.g. "bare_except" or "unused_import".
    issue_type: str
    # Human-readable explanation of the finding.
    description: str
    # Text of the offending line when the reporter captured it.
    original_code: str = field(default="")
    # Text of the line after an automatic fix was applied.
    fixed_code: str = field(default="")
    # One of: info, warning, error, critical.
    severity: str = field(default="warning")
@dataclass
class FixReport:
    """Aggregated result of scanning a project.

    Every container starts out empty and is independent per instance.
    """

    # Issues that were repaired automatically.
    fixed_issues: List[CodeIssue] = field(default_factory=list)
    # Issues that are left for a human to look at.
    manual_review_issues: List[CodeIssue] = field(default_factory=list)
    # Paths of the files that were rewritten.
    files_modified: Set[str] = field(default_factory=set)
    # Counters keyed by statistic name.
    stats: Dict[str, int] = field(default_factory=dict)
class CodeAnalyzer(ast.NodeVisitor):
    """Collect style, typing and safety findings for one Python source file.

    The analyzer walks the module's AST (imports, names, functions, classes,
    exception handlers, calls, binary operations and constants) and then runs
    line-based checks for unused imports, line length and whitespace. Every
    finding is appended to ``self.issues`` as a ``CodeIssue``.
    """

    # Imports whose full name is listed here are never reported as unused.
    _IMPORT_ALLOWLIST = ('typing', 'os', 'sys', 'json', 'logging')
    # Numeric literals that are not reported as magic numbers.
    _COMMON_NUMBERS = frozenset({0, 1, 2, -1, 100, 1000, 0.5, 1.0, 24, 60, 3600})
    # Matched against the upper-cased value of string constants.
    _SQL_PATTERNS = (
        r'\bSELECT\s+.*\s+FROM\b',
        r'\bINSERT\s+INTO\b',
        r'\bUPDATE\s+.*\s+SET\b',
        r'\bDELETE\s+FROM\b',
        r'\bDROP\s+TABLE\b',
    )
    _MAX_LINE_LENGTH = 88

    def __init__(self, file_path: str, source: str):
        """Store the source to analyze; no analysis happens until ``analyze``."""
        self.file_path = file_path
        self.source = source
        self.lines = source.split('\n')
        self.issues: List[CodeIssue] = []
        # (line number, fully qualified imported name, name bound in the module)
        self.imports: List[Tuple[int, str, str]] = []
        self.imported_names: Set[str] = set()
        self.used_names: Set[str] = set()
        self.function_names: Set[str] = set()
        self.class_names: Set[str] = set()
        self.current_function = None
        self.current_class = None
        self.in_exception_handler = False

    def analyze(self) -> "List[CodeIssue]":
        """Run every check and return the accumulated list of issues.

        Source that cannot be parsed yields a single ``syntax_error`` issue
        instead of raising.
        """
        try:
            tree = ast.parse(self.source)
        except (SyntaxError, ValueError) as e:
            # ValueError covers sources ast.parse rejects outright
            # (e.g. embedded null bytes on older interpreters).
            self._report(
                getattr(e, 'lineno', None) or 1,
                "syntax_error",
                f"语法错误: {e}",
                "error",
            )
            return self.issues
        self.visit(tree)
        self._check_unused_imports()
        self._check_line_length()
        self._check_formatting()
        return self.issues

    def _report(self, line_no, issue_type, description, severity, original_code=""):
        """Append one ``CodeIssue`` for this file to ``self.issues``."""
        self.issues.append(CodeIssue(
            file_path=self.file_path,
            line_no=line_no,
            issue_type=issue_type,
            description=description,
            original_code=original_code,
            severity=severity,
        ))

    def _line_at(self, lineno):
        """Return the text of 1-based line ``lineno``, or "" past the end."""
        return self.lines[lineno - 1] if lineno <= len(self.lines) else ""

    def visit_Import(self, node):
        """Record the names bound by ``import x`` / ``import x.y as z``."""
        for alias in node.names:
            # ``import a.b`` binds only the top-level package ``a``;
            # an explicit ``as`` name always wins.
            bound = alias.asname or alias.name.split('.')[0]
            self.imports.append((node.lineno, alias.name, bound))
            self.imported_names.add(bound)
        self.generic_visit(node)

    def visit_ImportFrom(self, node):
        """Record the names bound by ``from m import x [as y]``."""
        module = node.module or ""
        for alias in node.names:
            bound = alias.asname or alias.name
            full_name = f"{module}.{alias.name}" if module else alias.name
            self.imports.append((node.lineno, full_name, bound))
            self.imported_names.add(bound)
        self.generic_visit(node)

    def visit_Name(self, node):
        """Remember every identifier that appears as a plain name."""
        self.used_names.add(node.id)
        self.generic_visit(node)

    def visit_FunctionDef(self, node):
        """Report missing return and argument annotations on a function."""
        self.function_names.add(node.name)
        outer_function = self.current_function
        self.current_function = node.name
        # Names starting with an underscore (private and dunder) are exempt
        # from the return-annotation check.
        if node.returns is None and not node.name.startswith('_'):
            self._report(
                node.lineno,
                "missing_return_annotation",
                f"函数 '{node.name}' 缺少返回类型注解",
                "info",
            )
        for arg in node.args.args + node.args.posonlyargs + node.args.kwonlyargs:
            if arg.annotation is None and arg.arg not in ('self', 'cls'):
                self._report(
                    node.lineno,
                    "missing_arg_annotation",
                    f"函数 '{node.name}' 的参数 '{arg.arg}' 缺少类型注解",
                    "info",
                )
        self.generic_visit(node)
        self.current_function = outer_function

    def visit_AsyncFunctionDef(self, node):
        """Apply the same checks as for synchronous functions."""
        self.visit_FunctionDef(node)

    def visit_ClassDef(self, node):
        """Report class-level fields that are assigned more than once."""
        self.class_names.add(node.name)
        outer_class = self.current_class
        self.current_class = node.name
        # Collect (name, line) for every plain-name assignment in the class body.
        field_names = []
        for item in node.body:
            if isinstance(item, ast.AnnAssign) and isinstance(item.target, ast.Name):
                field_names.append((item.target.id, item.lineno))
            elif isinstance(item, ast.Assign):
                for target in item.targets:
                    if isinstance(target, ast.Name):
                        field_names.append((target.id, item.lineno))
        first_seen = {}
        for name, line in field_names:
            if name in first_seen:
                self._report(
                    line,
                    "duplicate_field",
                    f"'{node.name}' 中字段 '{name}' 重复定义 (首次定义在第 {first_seen[name]} 行)",
                    "warning",
                )
            else:
                first_seen[name] = line
        self.generic_visit(node)
        self.current_class = outer_class

    def visit_ExceptHandler(self, node):
        """Report bare ``except:`` and ``except Exception`` handlers."""
        if node.type is None:
            self._report(
                node.lineno,
                "bare_except",
                "使用裸 except: 捕获所有异常,建议指定具体异常类型",
                "warning",
                original_code=self._line_at(node.lineno),
            )
        elif isinstance(node.type, ast.Name) and node.type.id == 'Exception':
            self._report(
                node.lineno,
                "broad_except",
                "捕获过于宽泛的 Exception建议指定更具体的异常类型",
                "info",
            )
        was_in_handler = self.in_exception_handler
        self.in_exception_handler = True
        self.generic_visit(node)
        self.in_exception_handler = was_in_handler

    def visit_Call(self, node):
        """Report ``format`` calls and magic numbers passed positionally."""
        if isinstance(node.func, ast.Attribute):
            if node.func.attr in ('format', 'sprintf'):
                self._check_string_formatting(node)
        elif isinstance(node.func, ast.Name) and node.func.id == 'format':
            self._check_string_formatting(node)
        for arg in node.args:
            if isinstance(arg, ast.Constant) and isinstance(arg.value, (int, float)):
                if not self._is_common_number(arg.value):
                    self._report(
                        arg.lineno,
                        "magic_number",
                        f"发现魔法数字: {arg.value},建议提取为常量",
                        "info",
                    )
        self.generic_visit(node)

    def visit_BinOp(self, node):
        """Report ``"..." % x`` formatting and magic numbers on the right side."""
        if isinstance(node.op, ast.Mod):
            if isinstance(node.left, ast.Constant) and isinstance(node.left.value, str):
                self._report(
                    node.lineno,
                    "old_string_formatting",
                    "使用 % 字符串格式化,建议改用 f-string",
                    "info",
                    original_code=self._line_at(node.lineno),
                )
        if isinstance(node.right, ast.Constant) and isinstance(node.right.value, (int, float)):
            if not self._is_common_number(node.right.value):
                self._report(
                    node.right.lineno,
                    "magic_number",
                    f"发现魔法数字: {node.right.value},建议提取为常量",
                    "info",
                )
        self.generic_visit(node)

    def visit_Constant(self, node):
        """Report string constants that look like SQL built by interpolation."""
        if isinstance(node.value, str):
            upper_val = node.value.upper()
            # Only strings that also contain a formatting/concatenation
            # character are treated as suspicious.
            interpolated = any(ch in node.value for ch in ('%', '{', '+'))
            if interpolated and any(re.search(p, upper_val) for p in self._SQL_PATTERNS):
                self._report(
                    node.lineno,
                    "potential_sql_injection",
                    "可能存在 SQL 注入风险,请使用参数化查询",
                    "critical",
                )
        self.generic_visit(node)

    def _check_string_formatting(self, node):
        """Report a ``format(`` call found on the node's first source line."""
        line = self._line_at(node.lineno)
        if 'format(' in line:
            self._report(
                node.lineno,
                "old_string_formatting",
                "使用 .format() 字符串格式化,建议改用 f-string",
                "info",
                original_code=line,
            )

    def _is_common_number(self, value):
        """Return True for numbers that need not be extracted into a constant."""
        return value in self._COMMON_NUMBERS or (
            isinstance(value, int) and -10 <= value <= 10
        )

    def _check_unused_imports(self):
        """Report imported names that never appear as a plain name."""
        for line_no, full_name, bound in self.imports:
            if full_name in self._IMPORT_ALLOWLIST:
                continue
            # A star import binds names that cannot be determined statically.
            if bound == '*':
                continue
            if full_name.startswith('__future__'):
                continue
            if bound not in self.used_names:
                self._report(
                    line_no,
                    "unused_import",
                    f"未使用的导入: {bound}",
                    "warning",
                )

    def _check_line_length(self):
        """Report every line longer than 88 characters."""
        for i, line in enumerate(self.lines, 1):
            if len(line) > self._MAX_LINE_LENGTH:
                self._report(
                    i,
                    "line_too_long",
                    f"行长度 {len(line)} 超过 88 字符限制",
                    "warning",
                    original_code=line,
                )

    def _check_formatting(self):
        """Report trailing whitespace and indentation that is not a multiple of 4."""
        for i, line in enumerate(self.lines, 1):
            if line.rstrip() != line:
                self._report(
                    i,
                    "trailing_whitespace",
                    "行尾有空格",
                    "info",
                    original_code=line,
                )
            stripped = line.lstrip()
            # Whitespace-only lines have no indentation to validate.
            if stripped and line != stripped:
                indent = len(line) - len(stripped)
                if indent % 4 != 0:
                    self._report(
                        i,
                        "indentation",
                        f"缩进不是 4 的倍数 ({indent} 空格)",
                        "warning",
                    )
class CodeFixer:
    """Apply the mechanical fixes this tool supports to one file's source.

    Supported issue types are ``trailing_whitespace`` (strip the line's
    trailing whitespace) and ``bare_except`` (rewrite ``except:`` to
    ``except Exception:``). Every other issue type is left untouched.
    """

    # A bare ``except:`` clause preceded only by indentation. Anchoring to the
    # start of the line keeps text later on the line (comments, strings)
    # from being rewritten.
    _BARE_EXCEPT = re.compile(r'^(\s*)except\s*:')

    def __init__(self, file_path: str, source: str, issues: "List[CodeIssue]"):
        """Keep the source split into lines together with the issues to fix."""
        self.file_path = file_path
        self.source = source
        self.lines = source.split('\n')
        self.issues = issues
        self.modified = False
        self.fixes_applied: List[CodeIssue] = []

    def fix(self) -> "Tuple[str, List[CodeIssue]]":
        """Try to fix every issue and return ``(new_source, fixed_issues)``.

        Issues are processed from the last line to the first. ``self.modified``
        becomes True as soon as one fix is applied.
        """
        for issue in sorted(self.issues, key=lambda item: item.line_no, reverse=True):
            if self._fix_issue(issue):
                self.fixes_applied.append(issue)
                self.modified = True
        return '\n'.join(self.lines), self.fixes_applied

    def _fix_issue(self, issue: "CodeIssue") -> bool:
        """Fix one issue in place; return True when the issue was handled.

        On success the rewritten line is also stored in ``issue.fixed_code``.
        Issues pointing outside the file, or of an unsupported type, return False.
        """
        index = issue.line_no - 1
        if not 0 <= index < len(self.lines):
            return False
        line = self.lines[index]

        if issue.issue_type == "trailing_whitespace":
            self.lines[index] = line.rstrip()
            issue.fixed_code = self.lines[index]
            return True

        if issue.issue_type == "bare_except":
            new_line = self._BARE_EXCEPT.sub(r'\1except Exception:', line, count=1)
            if new_line != line:
                self.lines[index] = new_line
                issue.fixed_code = new_line
                return True

        # "old_string_formatting" and all remaining types have no automatic fix.
        return False
class SecurityChecker:
"""安全检查器 - 识别需要人工确认的问题"""
CRITICAL_PATTERNS = [
# SQL 注入
(r'execute\s*\(\s*["\'].*%s', 'sql_injection', '可能存在 SQL 注入风险'),
(r'execute\s*\(\s*f["\']', 'sql_injection_fstring', '在 SQL 中使用 f-string 可能导致注入'),
(r'\.raw\s*\(\s*["\']', 'sql_raw', '使用原始 SQL 查询'),
# CORS 配置
(r'CORS\s*\(\s*.*origins\s*=\s*["\']\*', 'cors_wildcard', 'CORS 配置允许所有来源 (*)'),
(r'allow_origins\s*=\s*\[?\s*["\']\*', 'cors_wildcard', 'CORS 配置允许所有来源 (*)'),
# 敏感信息
(r'password\s*=\s*["\'][^"\']+["\']', 'hardcoded_password', '硬编码密码'),
(r'secret\s*=\s*["\'][^"\']+["\']', 'hardcoded_secret', '硬编码密钥'),
(r'api_key\s*=\s*["\'][^"\']+["\']', 'hardcoded_api_key', '硬编码 API 密钥'),
(r'token\s*=\s*["\'][^"\']+["\']', 'hardcoded_token', '硬编码 Token'),
(r'AK\w{16,}', 'aliyun_key', '可能的阿里云 AccessKey'),
(r'SK\w{16,}', 'aliyun_secret', '可能的阿里云 Secret'),
# 不安全的操作
(r'eval\s*\(', 'dangerous_eval', '使用 eval() 存在安全风险'),
(r'exec\s*\(', 'dangerous_exec', '使用 exec() 存在安全风险'),
(r'__import__\s*\(', 'dangerous_import', '使用 __import__() 存在安全风险'),
(r'subprocess\.call.*shell\s*=\s*True', 'shell_injection', '使用 shell=True 可能导致命令注入'),
(r'os\.system\s*\(', 'os_system', '使用 os.system() 存在安全风险'),
# 调试代码
(r'pdb\.set_trace\s*\(', 'debugger', '包含调试代码 pdb.set_trace()'),
(r'breakpoint\s*\(\s*\)', 'debugger', '包含调试代码 breakpoint()'),
(r'print\s*\([^)]*password', 'debug_print', '可能打印敏感信息'),
(r'print\s*\([^)]*secret', 'debug_print', '可能打印敏感信息'),
# 不安全的反序列化
(r'pickle\.loads?\s*\(', 'unsafe_pickle', '使用 pickle 反序列化不可信数据存在风险'),
(r'yaml\.load\s*\([^)]*\)(?!.*Loader)', 'unsafe_yaml', '使用 yaml.load() 未指定 Loader'),
]
def __init__(self, file_path: str, source: str):
self.file_path = file_path
self.source = source
self.lines = source.split('\n')
self.issues: List[CodeIssue] = []
def check(self) -> List[CodeIssue]:
"""执行安全检查"""
for i, line in enumerate(self.lines, 1):
for pattern, issue_type, description in self.CRITICAL_PATTERNS:
if re.search(pattern, line, re.IGNORECASE):
self.issues.append(CodeIssue(
file_path=self.file_path,
line_no=i,
issue_type=issue_type,
description=description,
original_code=line.strip(),
severity="critical"
))
return self.issues
def _is_excluded_path(parts) -> bool:
    """Return True when a path component marks a location that is not scanned.

    Hidden components (leading dot, except ``.openclaw``) and the usual
    virtual-environment / cache directory names are excluded.
    """
    for part in parts:
        if part.startswith('.') and part not in ('.', './', '.openclaw'):
            return True
        if part in ('venv', 'env', '__pycache__', 'node_modules'):
            return True
    return False


def scan_and_fix_project(project_path: str) -> FixReport:
    """Analyze every Python file below ``project_path`` and auto-fix what it can.

    Files are rewritten in place when at least one fix was applied. Critical
    analyzer findings and all security findings are collected for manual
    review. The returned report carries the fixed issues, the manual-review
    issues, the set of modified files and the scan statistics.
    """
    report = FixReport()
    root = Path(project_path)
    stats = {
        "files_scanned": 0,
        "files_modified": 0,
        "issues_found": 0,
        "issues_fixed": 0,
        "critical_issues": 0,
    }

    # Sorted so that the order of findings is reproducible between runs.
    for py_file in sorted(root.rglob("*.py")):
        # Only components below the project root decide whether a file is
        # skipped; the location of the project itself must not matter.
        if _is_excluded_path(py_file.relative_to(root).parts):
            continue
        stats["files_scanned"] += 1
        try:
            source = py_file.read_text(encoding='utf-8')
        except (OSError, UnicodeDecodeError) as e:
            print(f"无法读取文件 {py_file}: {e}")
            continue

        issues = CodeAnalyzer(str(py_file), source).analyze()
        security_issues = SecurityChecker(str(py_file), source).check()

        # Critical analyzer findings go to manual review; everything else is
        # offered to the fixer, which ignores types it cannot handle.
        auto_fixable = []
        for issue in issues:
            if issue.issue_type in ('trailing_whitespace', 'bare_except'):
                auto_fixable.append(issue)
            elif issue.severity == 'critical':
                report.manual_review_issues.append(issue)
            else:
                auto_fixable.append(issue)

        stats["issues_found"] += len(issues) + len(security_issues)
        stats["critical_issues"] += sum(
            1 for found in issues + security_issues if found.severity == 'critical'
        )

        if auto_fixable:
            fixer = CodeFixer(str(py_file), source, auto_fixable)
            new_source, fixes = fixer.fix()
            if fixer.modified:
                py_file.write_text(new_source, encoding='utf-8')
                report.files_modified.add(str(py_file))
                report.fixed_issues.extend(fixes)
                stats["files_modified"] += 1
                stats["issues_fixed"] += len(fixes)

        report.manual_review_issues.extend(security_issues)

    report.stats = stats
    return report
def generate_report(report: FixReport) -> str:
    """Render ``report`` as a Markdown document and return it as one string.

    Sections, in order: statistics, fixed issues grouped by type (at most 10
    listed per type), modified files, issues needing manual review (critical
    ones with their code excerpt, the rest capped at 20) and a fixed list of
    recommendations.
    """
    # NOTE(review): block nesting was reconstructed from a listing without
    # indentation; confirm the placement of the blank separator lines.
    out = ["# 代码审查修复报告", "", "## 统计信息", ""]
    out.extend(f"- {key}: {value}" for key, value in report.stats.items())
    out.extend(["", "## 已修复的问题", ""])

    if not report.fixed_issues:
        out.extend(["未发现可自动修复的问题。", ""])
    else:
        # Group the fixed issues by their type, then list the groups by name.
        grouped: Dict[str, List[CodeIssue]] = {}
        for fixed in report.fixed_issues:
            grouped.setdefault(fixed.issue_type, []).append(fixed)
        for kind in sorted(grouped):
            members = grouped[kind]
            out.append(f"### {kind} ({len(members)} 个)")
            out.extend(
                f"- `{entry.file_path}:{entry.line_no}` - {entry.description}"
                for entry in members[:10]
            )
            if len(members) > 10:
                out.append(f"- ... 还有 {len(members) - 10}")
            out.append("")

    out.extend(["## 修改的文件", ""])
    if report.files_modified:
        out.extend(f"- `{path}`" for path in sorted(report.files_modified))
    else:
        out.append("无文件修改。")
    out.extend(["", "## 需要人工确认的问题", ""])

    pending = report.manual_review_issues
    if not pending:
        out.extend(["未发现需要人工确认的问题。", ""])
    else:
        critical = [item for item in pending if item.severity == 'critical']
        others = [item for item in pending if item.severity != 'critical']
        if critical:
            out.extend(["### 🔴 严重问题", ""])
            for item in critical:
                out.append(
                    f"- `{item.file_path}:{item.line_no}` **{item.issue_type}**: {item.description}"
                )
                if item.original_code:
                    out.extend([" ```python", f" {item.original_code}", " ```"])
            out.append("")
        if others:
            out.extend(["### 🟡 警告", ""])
            out.extend(
                f"- `{item.file_path}:{item.line_no}` **{item.issue_type}**: {item.description}"
                for item in others[:20]
            )
            if len(others) > 20:
                out.append(f"- ... 还有 {len(others) - 20}")
            out.append("")

    out.extend([
        "## 建议",
        "",
        "1. 请仔细审查所有标记为 '严重' 的问题",
        "2. 考虑为关键函数添加类型注解",
        "3. 检查是否有硬编码的敏感信息需要移除",
        "4. 验证 CORS 配置是否符合安全要求",
        "",
    ])
    return '\n'.join(out)
def git_commit_push(project_path: str, commit_message: str) -> Tuple[bool, str]:
    """Run ``git add -A``, ``git commit`` and ``git push`` inside ``project_path``.

    Returns ``(ok, message)``. ``ok`` is True when everything succeeded or
    when git reported that there was nothing to commit. Any failure, including
    a missing git executable or project directory, is reported through the
    return value instead of raising.
    """
    def run_git(*args):
        # ``cwd`` scopes the command to the project without changing the
        # working directory of the calling process.
        return subprocess.run(
            ['git', *args], cwd=project_path, capture_output=True, text=True
        )

    try:
        result = run_git('add', '-A')
        if result.returncode != 0:
            return False, f"git add 失败: {result.stderr}"

        result = run_git('commit', '-m', commit_message)
        if result.returncode != 0:
            # git exits non-zero when the index holds no changes.
            if "nothing to commit" in result.stdout or "nothing to commit" in result.stderr:
                return True, "没有需要提交的更改"
            return False, f"git commit 失败: {result.stderr}"

        result = run_git('push')
        if result.returncode != 0:
            return False, f"git push 失败: {result.stderr}"
        return True, "成功提交并推送"
    except Exception as e:
        return False, f"Git 操作失败: {e}"
def main(project_path: str = "/root/.openclaw/workspace/projects/insightflow") -> Tuple[FixReport, str]:
    """Scan and fix a project, write the Markdown report, then commit and push.

    Args:
        project_path: Root directory of the project to process. Defaults to
            the path this job was originally written for.

    Returns:
        The ``FixReport`` of the run and the rendered report text. The report
        is also written to ``code_fix_report.md`` inside the project. Git is
        only invoked when at least one file was modified.
    """
    print("开始扫描项目...")
    report = scan_and_fix_project(project_path)
    print(f"扫描完成: {report.stats['files_scanned']} 个文件")
    print(f"发现问题: {report.stats['issues_found']}")
    print(f"自动修复: {len(report.fixed_issues)}")
    print(f"需要人工确认: {len(report.manual_review_issues)}")

    report_content = generate_report(report)
    report_path = Path(project_path) / "code_fix_report.md"
    report_path.write_text(report_content, encoding='utf-8')
    print(f"报告已保存: {report_path}")

    if report.files_modified:
        print("执行 git 提交...")
        # The message already describes success or failure.
        _, message = git_commit_push(project_path, "fix: auto-fix code issues (cron)")
        print(f"Git 操作: {message}")
    else:
        print("没有文件修改,跳过 git 提交")
    return report, report_content


if __name__ == "__main__":
    main()