From d17a58ceae3f4783c93123d6de706de52d58632a Mon Sep 17 00:00:00 2001 From: AutoFix Bot Date: Tue, 3 Mar 2026 06:05:24 +0800 Subject: [PATCH] chore: remove temporary code analyzer script --- code_analyzer.py | 672 ----------------------------------------------- 1 file changed, 672 deletions(-) delete mode 100644 code_analyzer.py diff --git a/code_analyzer.py b/code_analyzer.py deleted file mode 100644 index e39d147..0000000 --- a/code_analyzer.py +++ /dev/null @@ -1,672 +0,0 @@ -#!/usr/bin/env python3 -""" -代码审查和自动修复工具 -用于扫描和修复 Python 代码中的常见问题 -""" - -import ast -import os -import re -import subprocess -from pathlib import Path -from typing import Dict, List, Set, Tuple, Any -from dataclasses import dataclass, field - - -@dataclass -class CodeIssue: - """代码问题记录""" - file_path: str - line_no: int - issue_type: str - description: str - original_code: str = "" - fixed_code: str = "" - severity: str = "warning" # info, warning, error, critical - - -@dataclass -class FixReport: - """修复报告""" - fixed_issues: List[CodeIssue] = field(default_factory=list) - manual_review_issues: List[CodeIssue] = field(default_factory=list) - files_modified: Set[str] = field(default_factory=set) - stats: Dict[str, int] = field(default_factory=dict) - - -class CodeAnalyzer(ast.NodeVisitor): - """AST 代码分析器""" - - def __init__(self, file_path: str, source: str): - self.file_path = file_path - self.source = source - self.lines = source.split('\n') - self.issues: List[CodeIssue] = [] - self.imports: List[Tuple[int, str, str]] = [] # (line, name, alias) - self.imported_names: Set[str] = set() - self.used_names: Set[str] = set() - self.function_names: Set[str] = set() - self.class_names: Set[str] = set() - self.current_function = None - self.current_class = None - self.in_exception_handler = False - - def analyze(self) -> List[CodeIssue]: - """执行完整分析""" - try: - tree = ast.parse(self.source) - self.visit(tree) - self._check_unused_imports() - self._check_line_length() - self._check_formatting() - return self.issues - except SyntaxError as e: - self.issues.append(CodeIssue( - file_path=self.file_path, - line_no=e.lineno or 1, - issue_type="syntax_error", - description=f"语法错误: {e}", - severity="error" - )) - return self.issues - - def visit_Import(self, node): - for alias in node.names: - name = alias.asname if alias.asname else alias.name - self.imports.append((node.lineno, alias.name, name)) - self.imported_names.add(name) - self.generic_visit(node) - - def visit_ImportFrom(self, node): - module = node.module or "" - for alias in node.names: - name = alias.asname if alias.asname else alias.name - full_name = f"{module}.{alias.name}" if module else alias.name - self.imports.append((node.lineno, full_name, name)) - self.imported_names.add(name) - self.generic_visit(node) - - def visit_Name(self, node): - self.used_names.add(node.id) - self.generic_visit(node) - - def visit_FunctionDef(self, node): - self.function_names.add(node.name) - old_function = self.current_function - self.current_function = node.name - - # 检查函数是否有类型注解 - if node.returns is None and not node.name.startswith('_'): - # 检查是否是特殊方法 - if not node.name.startswith('__') or not node.name.endswith('__'): - self.issues.append(CodeIssue( - file_path=self.file_path, - line_no=node.lineno, - issue_type="missing_return_annotation", - description=f"函数 '{node.name}' 缺少返回类型注解", - severity="info" - )) - - for arg in node.args.args + node.args.posonlyargs + node.args.kwonlyargs: - if arg.annotation is None and arg.arg != 'self' and arg.arg != 'cls': - self.issues.append(CodeIssue( - file_path=self.file_path, - line_no=node.lineno, - issue_type="missing_arg_annotation", - description=f"函数 '{node.name}' 的参数 '{arg.arg}' 缺少类型注解", - severity="info" - )) - - self.generic_visit(node) - self.current_function = old_function - - def visit_AsyncFunctionDef(self, node): - self.visit_FunctionDef(node) # 复用同步函数的检查 - - def visit_ClassDef(self, node): - self.class_names.add(node.name) - old_class = self.current_class - self.current_class = node.name - - # 检查重复的字段定义 - field_names = [] - for item in node.body: - if isinstance(item, ast.AnnAssign) and isinstance(item.target, ast.Name): - field_names.append((item.target.id, item.lineno)) - elif isinstance(item, ast.Assign): - for target in item.targets: - if isinstance(target, ast.Name): - field_names.append((target.id, item.lineno)) - - # 检查重复 - seen = {} - for name, line in field_names: - if name in seen: - self.issues.append(CodeIssue( - file_path=self.file_path, - line_no=line, - issue_type="duplicate_field", - description=f"类 '{node.name}' 中字段 '{name}' 重复定义 (首次定义在第 {seen[name]} 行)", - severity="warning" - )) - else: - seen[name] = line - - self.generic_visit(node) - self.current_class = old_class - - def visit_ExceptHandler(self, node): - # 检查裸异常捕获 - if node.type is None: - self.issues.append(CodeIssue( - file_path=self.file_path, - line_no=node.lineno, - issue_type="bare_except", - description="使用裸 except: 捕获所有异常,建议指定具体异常类型", - original_code=self.lines[node.lineno - 1] if node.lineno <= len(self.lines) else "", - severity="warning" - )) - elif isinstance(node.type, ast.Name) and node.type.id == 'Exception': - # 检查是否过于宽泛 - self.issues.append(CodeIssue( - file_path=self.file_path, - line_no=node.lineno, - issue_type="broad_except", - description="捕获过于宽泛的 Exception,建议指定更具体的异常类型", - severity="info" - )) - - old_in_handler = self.in_exception_handler - self.in_exception_handler = True - self.generic_visit(node) - self.in_exception_handler = old_in_handler - - def visit_Call(self, node): - # 检查字符串格式化 - if isinstance(node.func, ast.Attribute): - if node.func.attr in ('format', 'sprintf'): - self._check_string_formatting(node) - elif isinstance(node.func, ast.Name) and node.func.id == 'format': - self._check_string_formatting(node) - - # 检查魔法数字 - for arg in node.args: - if isinstance(arg, ast.Constant) and isinstance(arg.value, (int, float)): - if not self._is_common_number(arg.value): - self.issues.append(CodeIssue( - file_path=self.file_path, - line_no=arg.lineno, - issue_type="magic_number", - description=f"发现魔法数字: {arg.value},建议提取为常量", - severity="info" - )) - - self.generic_visit(node) - - def visit_BinOp(self, node): - # 检查 % 格式化 - if isinstance(node.op, ast.Mod): - if isinstance(node.left, ast.Constant) and isinstance(node.left.value, str): - self.issues.append(CodeIssue( - file_path=self.file_path, - line_no=node.lineno, - issue_type="old_string_formatting", - description="使用 % 字符串格式化,建议改用 f-string", - original_code=self.lines[node.lineno - 1] if node.lineno <= len(self.lines) else "", - severity="info" - )) - - # 检查魔法数字 - if isinstance(node.right, ast.Constant) and isinstance(node.right.value, (int, float)): - if not self._is_common_number(node.right.value): - self.issues.append(CodeIssue( - file_path=self.file_path, - line_no=node.right.lineno, - issue_type="magic_number", - description=f"发现魔法数字: {node.right.value},建议提取为常量", - severity="info" - )) - - self.generic_visit(node) - - def visit_Constant(self, node): - # 检查 SQL 注入风险 - if isinstance(node.value, str): - sql_patterns = [ - r'\bSELECT\s+.*\s+FROM\b', - r'\bINSERT\s+INTO\b', - r'\bUPDATE\s+.*\s+SET\b', - r'\bDELETE\s+FROM\b', - r'\bDROP\s+TABLE\b', - ] - upper_val = node.value.upper() - for pattern in sql_patterns: - if re.search(pattern, upper_val) and ('%' in node.value or '{' in node.value or '+' in node.value): - self.issues.append(CodeIssue( - file_path=self.file_path, - line_no=node.lineno, - issue_type="potential_sql_injection", - description="可能存在 SQL 注入风险,请使用参数化查询", - severity="critical" - )) - break - - self.generic_visit(node) - - def _check_string_formatting(self, node): - """检查字符串格式化方式""" - line = self.lines[node.lineno - 1] if node.lineno <= len(self.lines) else "" - if '.format(' in line or 'format(' in line: - self.issues.append(CodeIssue( - file_path=self.file_path, - line_no=node.lineno, - issue_type="old_string_formatting", - description="使用 .format() 字符串格式化,建议改用 f-string", - original_code=line, - severity="info" - )) - - def _is_common_number(self, value): - """判断是否为常见数字(不需要提取为常量)""" - common = {0, 1, 2, -1, 100, 1000, 0.5, 1.0, 24, 60, 3600} - return value in common or (isinstance(value, int) and -10 <= value <= 10) - - def _check_unused_imports(self): - """检查未使用的导入""" - for line_no, full_name, alias in self.imports: - # 排除一些常见的副作用导入 - if full_name in ('typing', 'os', 'sys', 'json', 'logging'): - continue - - # 检查是否被使用 - if alias not in self.used_names: - # 排除 __future__ 导入 - if not full_name.startswith('__future__'): - self.issues.append(CodeIssue( - file_path=self.file_path, - line_no=line_no, - issue_type="unused_import", - description=f"未使用的导入: {alias}", - severity="warning" - )) - - def _check_line_length(self): - """检查行长度""" - for i, line in enumerate(self.lines, 1): - if len(line) > 88: - self.issues.append(CodeIssue( - file_path=self.file_path, - line_no=i, - issue_type="line_too_long", - description=f"行长度 {len(line)} 超过 88 字符限制", - original_code=line, - severity="warning" - )) - - def _check_formatting(self): - """检查 PEP8 格式问题""" - prev_line = "" - for i, line in enumerate(self.lines, 1): - # 检查行尾空格 - if line.rstrip() != line: - self.issues.append(CodeIssue( - file_path=self.file_path, - line_no=i, - issue_type="trailing_whitespace", - description="行尾有空格", - original_code=line, - severity="info" - )) - - # 检查缩进(应该使用 4 个空格) - stripped = line.lstrip() - if stripped and line != stripped: - indent = len(line) - len(stripped) - if indent % 4 != 0: - self.issues.append(CodeIssue( - file_path=self.file_path, - line_no=i, - issue_type="indentation", - description=f"缩进不是 4 的倍数 ({indent} 空格)", - severity="warning" - )) - - # 检查空行 - if prev_line.strip() == "" and line.strip() == "": - # 检查是否是类或函数定义之间(允许最多 2 个空行) - pass # 简化处理 - - prev_line = line - - -class CodeFixer: - """代码修复器""" - - def __init__(self, file_path: str, source: str, issues: List[CodeIssue]): - self.file_path = file_path - self.source = source - self.lines = source.split('\n') - self.issues = issues - self.modified = False - self.fixes_applied: List[CodeIssue] = [] - - def fix(self) -> Tuple[str, List[CodeIssue]]: - """执行自动修复""" - # 按行号倒序处理,避免行号变化影响 - sorted_issues = sorted(self.issues, key=lambda x: x.line_no, reverse=True) - - for issue in sorted_issues: - fix_result = self._fix_issue(issue) - if fix_result: - self.fixes_applied.append(issue) - self.modified = True - - return '\n'.join(self.lines), self.fixes_applied - - def _fix_issue(self, issue: CodeIssue) -> bool: - """修复单个问题,返回是否成功""" - line_idx = issue.line_no - 1 - if line_idx < 0 or line_idx >= len(self.lines): - return False - - line = self.lines[line_idx] - - if issue.issue_type == "trailing_whitespace": - self.lines[line_idx] = line.rstrip() - issue.fixed_code = self.lines[line_idx] - return True - - elif issue.issue_type == "bare_except": - # 将裸 except 改为 except Exception - new_line = re.sub(r'\bexcept\s*:', 'except Exception:', line) - if new_line != line: - self.lines[line_idx] = new_line - issue.fixed_code = new_line - return True - - elif issue.issue_type == "old_string_formatting": - # 尝试转换为 f-string(简化处理) - # 注意:复杂情况需要更智能的处理 - pass - - return False - - -class SecurityChecker: - """安全检查器 - 识别需要人工确认的问题""" - - CRITICAL_PATTERNS = [ - # SQL 注入 - (r'execute\s*\(\s*["\'].*%s', 'sql_injection', '可能存在 SQL 注入风险'), - (r'execute\s*\(\s*f["\']', 'sql_injection_fstring', '在 SQL 中使用 f-string 可能导致注入'), - (r'\.raw\s*\(\s*["\']', 'sql_raw', '使用原始 SQL 查询'), - - # CORS 配置 - (r'CORS\s*\(\s*.*origins\s*=\s*["\']\*', 'cors_wildcard', 'CORS 配置允许所有来源 (*)'), - (r'allow_origins\s*=\s*\[?\s*["\']\*', 'cors_wildcard', 'CORS 配置允许所有来源 (*)'), - - # 敏感信息 - (r'password\s*=\s*["\'][^"\']+["\']', 'hardcoded_password', '硬编码密码'), - (r'secret\s*=\s*["\'][^"\']+["\']', 'hardcoded_secret', '硬编码密钥'), - (r'api_key\s*=\s*["\'][^"\']+["\']', 'hardcoded_api_key', '硬编码 API 密钥'), - (r'token\s*=\s*["\'][^"\']+["\']', 'hardcoded_token', '硬编码 Token'), - (r'AK\w{16,}', 'aliyun_key', '可能的阿里云 AccessKey'), - (r'SK\w{16,}', 'aliyun_secret', '可能的阿里云 Secret'), - - # 不安全的操作 - (r'eval\s*\(', 'dangerous_eval', '使用 eval() 存在安全风险'), - (r'exec\s*\(', 'dangerous_exec', '使用 exec() 存在安全风险'), - (r'__import__\s*\(', 'dangerous_import', '使用 __import__() 存在安全风险'), - (r'subprocess\.call.*shell\s*=\s*True', 'shell_injection', '使用 shell=True 可能导致命令注入'), - (r'os\.system\s*\(', 'os_system', '使用 os.system() 存在安全风险'), - - # 调试代码 - (r'pdb\.set_trace\s*\(', 'debugger', '包含调试代码 pdb.set_trace()'), - (r'breakpoint\s*\(\s*\)', 'debugger', '包含调试代码 breakpoint()'), - (r'print\s*\([^)]*password', 'debug_print', '可能打印敏感信息'), - (r'print\s*\([^)]*secret', 'debug_print', '可能打印敏感信息'), - - # 不安全的反序列化 - (r'pickle\.loads?\s*\(', 'unsafe_pickle', '使用 pickle 反序列化不可信数据存在风险'), - (r'yaml\.load\s*\([^)]*\)(?!.*Loader)', 'unsafe_yaml', '使用 yaml.load() 未指定 Loader'), - ] - - def __init__(self, file_path: str, source: str): - self.file_path = file_path - self.source = source - self.lines = source.split('\n') - self.issues: List[CodeIssue] = [] - - def check(self) -> List[CodeIssue]: - """执行安全检查""" - for i, line in enumerate(self.lines, 1): - for pattern, issue_type, description in self.CRITICAL_PATTERNS: - if re.search(pattern, line, re.IGNORECASE): - self.issues.append(CodeIssue( - file_path=self.file_path, - line_no=i, - issue_type=issue_type, - description=description, - original_code=line.strip(), - severity="critical" - )) - - return self.issues - - -def scan_and_fix_project(project_path: str) -> FixReport: - """扫描并修复整个项目""" - report = FixReport() - project_path = Path(project_path) - - # 统计 - stats = { - "files_scanned": 0, - "files_modified": 0, - "issues_found": 0, - "issues_fixed": 0, - "critical_issues": 0, - } - - # 查找所有 Python 文件 - python_files = list(project_path.rglob("*.py")) - - for py_file in python_files: - # 跳过虚拟环境等目录 - skip = False - for part in py_file.parts: - if part.startswith('.') and part not in ('.', './'): - if part not in ('.openclaw',): - skip = True - break - if part in ('venv', 'env', '__pycache__', 'node_modules'): - skip = True - break - if skip: - continue - - stats["files_scanned"] += 1 - - try: - source = py_file.read_text(encoding='utf-8') - except Exception as e: - print(f"无法读取文件 {py_file}: {e}") - continue - - # 分析代码 - analyzer = CodeAnalyzer(str(py_file), source) - issues = analyzer.analyze() - - # 安全检查 - security_checker = SecurityChecker(str(py_file), source) - security_issues = security_checker.check() - - # 分类问题 - auto_fixable = [] - for issue in issues: - if issue.issue_type in ('trailing_whitespace', 'bare_except'): - auto_fixable.append(issue) - elif issue.severity == 'critical': - report.manual_review_issues.append(issue) - else: - # 其他问题也尝试修复 - auto_fixable.append(issue) - - stats["issues_found"] += len(issues) + len(security_issues) - stats["critical_issues"] += len([i for i in security_issues if i.severity == 'critical']) - - # 执行自动修复 - if auto_fixable: - fixer = CodeFixer(str(py_file), source, auto_fixable) - new_source, fixes = fixer.fix() - - if fixer.modified: - py_file.write_text(new_source, encoding='utf-8') - report.files_modified.add(str(py_file)) - report.fixed_issues.extend(fixes) - stats["issues_fixed"] += len(fixes) - - # 添加需要人工审核的问题 - report.manual_review_issues.extend(security_issues) - - report.stats = stats - return report - - -def generate_report(report: FixReport) -> str: - """生成修复报告""" - lines = [] - lines.append("# 代码审查修复报告") - lines.append("") - lines.append("## 统计信息") - lines.append("") - for key, value in report.stats.items(): - lines.append(f"- {key}: {value}") - lines.append("") - - lines.append("## 已修复的问题") - lines.append("") - if report.fixed_issues: - # 按类型分组 - by_type: Dict[str, List[CodeIssue]] = {} - for issue in report.fixed_issues: - by_type.setdefault(issue.issue_type, []).append(issue) - - for issue_type, issues in sorted(by_type.items()): - lines.append(f"### {issue_type} ({len(issues)} 个)") - for issue in issues[:10]: # 限制显示数量 - lines.append(f"- `{issue.file_path}:{issue.line_no}` - {issue.description}") - if len(issues) > 10: - lines.append(f"- ... 还有 {len(issues) - 10} 个") - lines.append("") - else: - lines.append("未发现可自动修复的问题。") - lines.append("") - - lines.append("## 修改的文件") - lines.append("") - if report.files_modified: - for f in sorted(report.files_modified): - lines.append(f"- `{f}`") - else: - lines.append("无文件修改。") - lines.append("") - - lines.append("## 需要人工确认的问题") - lines.append("") - if report.manual_review_issues: - # 按严重程度分组 - critical = [i for i in report.manual_review_issues if i.severity == 'critical'] - warnings = [i for i in report.manual_review_issues if i.severity != 'critical'] - - if critical: - lines.append("### 🔴 严重问题") - lines.append("") - for issue in critical: - lines.append(f"- `{issue.file_path}:{issue.line_no}` **{issue.issue_type}**: {issue.description}") - if issue.original_code: - lines.append(f" ```python") - lines.append(f" {issue.original_code}") - lines.append(f" ```") - lines.append("") - - if warnings: - lines.append("### 🟡 警告") - lines.append("") - for issue in warnings[:20]: - lines.append(f"- `{issue.file_path}:{issue.line_no}` **{issue.issue_type}**: {issue.description}") - if len(warnings) > 20: - lines.append(f"- ... 还有 {len(warnings) - 20} 个") - lines.append("") - else: - lines.append("未发现需要人工确认的问题。") - lines.append("") - - lines.append("## 建议") - lines.append("") - lines.append("1. 请仔细审查所有标记为 '严重' 的问题") - lines.append("2. 考虑为关键函数添加类型注解") - lines.append("3. 检查是否有硬编码的敏感信息需要移除") - lines.append("4. 验证 CORS 配置是否符合安全要求") - lines.append("") - - return '\n'.join(lines) - - -def git_commit_push(project_path: str, commit_message: str) -> Tuple[bool, str]: - """执行 git add, commit, push""" - try: - os.chdir(project_path) - - # git add - result = subprocess.run(['git', 'add', '-A'], capture_output=True, text=True) - if result.returncode != 0: - return False, f"git add 失败: {result.stderr}" - - # git commit - result = subprocess.run(['git', 'commit', '-m', commit_message], capture_output=True, text=True) - if result.returncode != 0: - if "nothing to commit" in result.stdout or "nothing to commit" in result.stderr: - return True, "没有需要提交的更改" - return False, f"git commit 失败: {result.stderr}" - - # git push - result = subprocess.run(['git', 'push'], capture_output=True, text=True) - if result.returncode != 0: - return False, f"git push 失败: {result.stderr}" - - return True, "成功提交并推送" - except Exception as e: - return False, f"Git 操作失败: {e}" - - -def main(): - project_path = "/root/.openclaw/workspace/projects/insightflow" - - print("开始扫描项目...") - report = scan_and_fix_project(project_path) - - print(f"扫描完成: {report.stats['files_scanned']} 个文件") - print(f"发现问题: {report.stats['issues_found']} 个") - print(f"自动修复: {len(report.fixed_issues)} 个") - print(f"需要人工确认: {len(report.manual_review_issues)} 个") - - # 生成报告 - report_content = generate_report(report) - report_path = Path(project_path) / "code_fix_report.md" - report_path.write_text(report_content, encoding='utf-8') - print(f"报告已保存: {report_path}") - - # Git 操作 - if report.files_modified: - print("执行 git 提交...") - success, message = git_commit_push(project_path, "fix: auto-fix code issues (cron)") - print(f"Git 操作: {message}") - else: - print("没有文件修改,跳过 git 提交") - - return report, report_content - - -if __name__ == "__main__": - main()