#!/usr/bin/env python3
"""InsightFlow code review and auto-fix tool (optimized version).

The script walks every ``*.py`` file under a project directory, records a
set of regex/AST based findings, rewrites the findings that can be fixed
mechanically (trailing whitespace, bare ``except:``), writes a Markdown
report into the project directory and finally commits and pushes the
working tree with git.
"""

import ast
import os
import re
import subprocess
from datetime import datetime
from pathlib import Path

# Project scanned when ``main()`` is called without an argument.
DEFAULT_PROJECT_PATH = "/root/.openclaw/workspace/projects/insightflow"

# A bare ``except:`` clause always starts its own logical line, so the
# pattern is anchored to the line start; this keeps comments or prose that
# merely end in "except:" from being flagged (and later rewritten).
_BARE_EXCEPT_RE = re.compile(r"^\s*except\s*:\s*(?:#|$)")
# Only clauses without a trailing comment are rewritten automatically.
_BARE_EXCEPT_FIX_RE = re.compile(r"^(\s*)except\s*:(\s*)$")
_OLD_FORMAT_RE = re.compile(r"['\"].*%[sdif].*['\"]\s*%\s")
_ENUM_ASSIGN_RE = re.compile(r"^\s*[A-Z_]+\s*=")
_SECRET_PATTERNS = tuple(
    (re.compile(pattern, re.IGNORECASE), description)
    for pattern, description in (
        (r'password\s*=\s*["\'][^"\']{8,}["\']', "硬编码密码"),
        (r'secret_key\s*=\s*["\'][^"\']{8,}["\']', "硬编码密钥"),
        (r'api_key\s*=\s*["\'][^"\']{8,}["\']', "硬编码 API Key"),
        (r'token\s*=\s*["\'][^"\']{8,}["\']', "硬编码 Token"),
    )
)
_PLACEHOLDER_MARKERS = ("your_", "example", "placeholder", "test", "demo")
# File names of the review tooling itself; their pattern literals would
# otherwise be reported as findings.
_SELF_FILE_NAMES = ("auto_code_fixer.py", "code_reviewer.py")


class CodeIssue:
    """A single finding produced by the scanner."""

    def __init__(
        self,
        file_path: str,
        line_no: int,
        issue_type: str,
        message: str,
        severity: str = "warning",
        original_line: str = "",
    ):
        """Store the finding.

        Args:
            file_path: Path of the file the finding belongs to.
            line_no: 1-based line number of the finding.
            issue_type: Machine-readable category, e.g. ``"bare_exception"``.
            message: Human-readable description shown in the report.
            severity: One of ``"critical"``, ``"error"``, ``"warning"``,
                ``"info"`` (the buckets used by ``categorize_issues``).
            original_line: Text of the offending line, if available.
        """
        self.file_path = file_path
        self.line_no = line_no
        self.issue_type = issue_type
        self.message = message
        self.severity = severity
        self.original_line = original_line
        # Set to True by CodeFixer once the fix has been written to disk.
        self.fixed = False

    def __repr__(self):
        return f"{self.file_path}:{self.line_no} [{self.severity}] {self.issue_type}: {self.message}"


class CodeFixer:
    """Scan a project for code issues and repair the mechanical ones."""

    # Lines longer than this are reported as ``line_too_long``.
    MAX_LINE_LENGTH = 120
    # Issue types that can be repaired automatically, in the order the
    # repairs are applied to a line.
    AUTO_FIX_TYPES = ("trailing_whitespace", "bare_exception")
    # At most this many findings per type are listed in the report.
    MAX_LISTED_PER_TYPE = 10

    def __init__(self, project_path: str):
        """Create a fixer for the project rooted at *project_path*."""
        self.project_path = Path(project_path)
        self.issues: list[CodeIssue] = []
        self.fixed_issues: list[CodeIssue] = []
        self.manual_issues: list[CodeIssue] = []
        self.scanned_files: list[str] = []

    def scan_all_files(self) -> None:
        """Scan every Python file below the project path.

        Files whose path contains ``__pycache__`` or ``.venv`` are skipped.
        """
        for py_file in self.project_path.rglob("*.py"):
            path_text = os.fspath(py_file)
            if "__pycache__" in path_text or ".venv" in path_text:
                continue
            self.scanned_files.append(path_text)
            self._scan_file(py_file)

    def _scan_file(self, file_path: Path) -> None:
        """Read one file and run all checks on it."""
        try:
            content = file_path.read_text(encoding="utf-8")
        except Exception as e:
            print(f"Error reading {file_path}: {e}")
            return
        lines = content.split("\n")

        self._check_bare_exceptions(file_path, content, lines)
        self._check_pep8_issues(file_path, content, lines)
        self._check_unused_imports(file_path, content)
        self._check_string_formatting(file_path, content, lines)
        self._check_cors_config(file_path, content, lines)
        self._check_sensitive_info(file_path, content, lines)

    def _check_bare_exceptions(
        self, file_path: Path, content: str, lines: list[str]
    ) -> None:
        """Record bare ``except:`` clauses.

        Matches ``except:`` optionally followed by a comment, but not
        ``except Exception:`` or ``except SpecificError:``. Lines carrying
        ``# noqa`` or ``# intentional`` are treated as deliberate.
        """
        for line_no, line in enumerate(lines, 1):
            if not _BARE_EXCEPT_RE.search(line):
                continue
            if "# noqa" in line or "# intentional" in line.lower():
                continue
            self.issues.append(
                CodeIssue(
                    str(file_path),
                    line_no,
                    "bare_exception",
                    "裸异常捕获,应指定具体异常类型",
                    "error",
                    line,
                )
            )

    def _check_pep8_issues(
        self, file_path: Path, content: str, lines: list[str]
    ) -> None:
        """Record over-long lines and trailing whitespace."""
        for line_no, line in enumerate(lines, 1):
            if len(line) > self.MAX_LINE_LENGTH:
                self.issues.append(
                    CodeIssue(
                        str(file_path),
                        line_no,
                        "line_too_long",
                        f"行长度 {len(line)} 超过 {self.MAX_LINE_LENGTH} 字符",
                        "warning",
                        line,
                    )
                )

            # Whitespace-only lines are deliberately not reported.
            if line.rstrip() != line and line.strip():
                self.issues.append(
                    CodeIssue(
                        str(file_path), line_no, "trailing_whitespace", "行尾有空格", "info", line
                    )
                )

    def _check_unused_imports(self, file_path: Path, content: str) -> None:
        """Record imported names that never appear as a name in the module.

        Files that do not parse are skipped. Names starting with an
        underscore, ``annotations`` and ``TYPE_CHECKING`` are never reported.
        """
        try:
            tree = ast.parse(content)
        except SyntaxError:
            return

        # bound name -> (name shown in the report, line number)
        imports: dict[str, tuple[str, int]] = {}
        used_names: set[str] = set()
        for node in ast.walk(tree):
            if isinstance(node, ast.Import):
                for alias in node.names:
                    # ``import a.b.c`` binds only ``a``; tracking the dotted
                    # name would report every such import as unused.
                    bound = alias.asname or alias.name.split(".")[0]
                    imports[bound] = (alias.asname or alias.name, node.lineno)
            elif isinstance(node, ast.ImportFrom):
                for alias in node.names:
                    if alias.name == "*":
                        continue
                    bound = alias.asname or alias.name
                    imports[bound] = (bound, node.lineno)
            elif isinstance(node, ast.Name):
                used_names.add(node.id)

        for bound, (shown, line_no) in imports.items():
            if bound in used_names or bound.startswith("_"):
                continue
            if bound in ("annotations", "TYPE_CHECKING"):
                continue
            self.issues.append(
                CodeIssue(
                    str(file_path),
                    line_no,
                    "unused_import",
                    f"未使用的导入: {shown}",
                    "warning",
                    "",
                )
            )

    def _check_string_formatting(
        self, file_path: Path, content: str, lines: list[str]
    ) -> None:
        """Record ``%``-style string formatting outside comment lines."""
        for line_no, line in enumerate(lines, 1):
            if line.strip().startswith("#"):
                continue
            # Requires a quoted string containing %s/%d/%i/%f followed by
            # the ``%`` operator, which excludes URL encoding and the like.
            if _OLD_FORMAT_RE.search(line):
                self.issues.append(
                    CodeIssue(
                        str(file_path),
                        line_no,
                        "old_string_format",
                        "使用 % 格式化,建议改为 f-string",
                        "info",
                        line,
                    )
                )

    def _check_cors_config(
        self, file_path: Path, content: str, lines: list[str]
    ) -> None:
        """Record wildcard CORS origins as findings needing manual review."""
        path_text = str(file_path)
        # The scanning tools contain the pattern themselves.
        if "code_reviewer" in path_text or "auto_code_fixer" in path_text:
            return
        for line_no, line in enumerate(lines, 1):
            if "allow_origins" in line and '["*"]' in line:
                self.manual_issues.append(
                    CodeIssue(
                        path_text,
                        line_no,
                        "cors_wildcard",
                        "CORS 配置允许所有来源 (*),生产环境应限制具体域名",
                        "warning",
                        line,
                    )
                )

    @staticmethod
    def _is_secret_false_positive(line: str) -> bool:
        """Return True when a secret-looking assignment should be ignored."""
        # Values read from the environment are not hard-coded.
        if "os.getenv" in line or "os.environ" in line:
            return True
        # Placeholders in example/test code.
        lowered = line.lower()
        if any(marker in lowered for marker in _PLACEHOLDER_MARKERS):
            return True
        # ALL_CAPS assignments are treated as Enum/constant definitions.
        return bool(_ENUM_ASSIGN_RE.search(line.strip()))

    def _check_sensitive_info(
        self, file_path: Path, content: str, lines: list[str]
    ) -> None:
        """Record hard-coded secrets as findings needing manual review."""
        path_text = str(file_path)
        if any(name in path_text for name in _SELF_FILE_NAMES):
            return

        for line_no, line in enumerate(lines, 1):
            if line.strip().startswith("#"):
                continue
            hits = [desc for pattern, desc in _SECRET_PATTERNS if pattern.search(line)]
            if not hits or self._is_secret_false_positive(line):
                continue
            for desc in hits:
                self.manual_issues.append(
                    CodeIssue(
                        path_text,
                        line_no,
                        "hardcoded_secret",
                        f"{desc},应使用环境变量",
                        "critical",
                        line,
                    )
                )

    @classmethod
    def _apply_line_fixes(
        cls, lines: list[str], file_issues: list[CodeIssue]
    ) -> list[CodeIssue]:
        """Repair *lines* in place and return the issues that changed a line.

        Repairs run type by type in ``AUTO_FIX_TYPES`` order, so a line with
        both trailing whitespace and a bare ``except:`` receives both fixes.
        An issue is only returned when its line was actually modified.
        """
        repaired: list[CodeIssue] = []
        handled: set[tuple[int, str]] = set()
        for fix_type in cls.AUTO_FIX_TYPES:
            for issue in file_issues:
                if issue.issue_type != fix_type:
                    continue
                idx = issue.line_no - 1
                if not 0 <= idx < len(lines) or (idx, fix_type) in handled:
                    continue
                current = lines[idx]
                if fix_type == "trailing_whitespace":
                    updated = current.rstrip()
                else:
                    # Regex substitution also covers spellings such as
                    # ``except :`` that a literal replace would miss.
                    updated = _BARE_EXCEPT_FIX_RE.sub(r"\1except Exception:\2", current)
                if updated == current:
                    continue
                lines[idx] = updated
                handled.add((idx, fix_type))
                repaired.append(issue)
        return repaired

    def fix_auto_fixable(self) -> None:
        """Rewrite files to repair every auto-fixable issue.

        Issues are marked as fixed (and added to ``fixed_issues``) only after
        the modified file has been written successfully.
        """
        files_to_fix: dict[str, list[CodeIssue]] = {}
        for issue in self.issues:
            if issue.issue_type in self.AUTO_FIX_TYPES:
                files_to_fix.setdefault(issue.file_path, []).append(issue)

        for file_path, file_issues in files_to_fix.items():
            # Never rewrite the review tooling itself.
            if any(name in file_path for name in _SELF_FILE_NAMES):
                continue

            target = Path(file_path)
            try:
                lines = target.read_text(encoding="utf-8").split("\n")
            except (OSError, UnicodeError) as e:
                print(f"Error reading {file_path}: {e}")
                continue

            repaired = self._apply_line_fixes(lines, file_issues)
            if not repaired:
                continue

            try:
                target.write_text("\n".join(lines), encoding="utf-8")
            except (OSError, UnicodeError) as e:
                print(f"Error writing {file_path}: {e}")
                continue

            print(f"Fixed issues in {file_path}")
            for issue in repaired:
                issue.fixed = True
            self.fixed_issues.extend(repaired)

    def categorize_issues(self) -> dict[str, list[CodeIssue]]:
        """Group ``self.issues`` by severity.

        Issues with a severity outside the four known buckets are dropped.
        """
        categories: dict[str, list[CodeIssue]] = {
            "critical": [],
            "error": [],
            "warning": [],
            "info": [],
        }
        for issue in self.issues:
            bucket = categories.get(issue.severity)
            if bucket is not None:
                bucket.append(issue)
        return categories

    def generate_report(self) -> str:
        """Build the Markdown review report and return it as a string."""
        # Taken in-process rather than by shelling out to ``date``.
        scan_time = datetime.now().astimezone().strftime("%Y-%m-%d %H:%M:%S %Z").strip()

        report: list[str] = [
            "# InsightFlow 代码审查报告",
            "",
            f"扫描时间: {scan_time}",
            f"扫描文件数: {len(self.scanned_files)}",
            "",
            "## 扫描的文件列表",
            "",
        ]
        report.extend(f"- `{name}`" for name in sorted(self.scanned_files))
        report.append("")

        # Severity statistics; manual findings are kept in a separate list
        # and are added to the matching buckets here.
        categories = self.categorize_issues()
        manual_critical = sum(1 for i in self.manual_issues if i.severity == "critical")
        manual_warning = sum(1 for i in self.manual_issues if i.severity == "warning")

        report.append("## 问题分类统计")
        report.append("")
        report.append(f"- 🔴 Critical: {len(categories['critical']) + manual_critical}")
        report.append(f"- 🟠 Error: {len(categories['error'])}")
        report.append(f"- 🟡 Warning: {len(categories['warning']) + manual_warning}")
        report.append(f"- 🔵 Info: {len(categories['info'])}")
        report.append(f"- **总计: {len(self.issues) + len(self.manual_issues)}**")
        report.append("")

        # Issues that were repaired automatically.
        report.append("## ✅ 已自动修复的问题")
        report.append("")
        if self.fixed_issues:
            for issue in self.fixed_issues:
                report.append(
                    f"- `{issue.file_path}:{issue.line_no}` - {issue.issue_type}: {issue.message}"
                )
        else:
            report.append("无")
        report.append("")

        # Issues that need a human decision.
        report.append("## ⚠️ 需要人工确认的问题")
        report.append("")
        if self.manual_issues:
            for issue in self.manual_issues:
                report.append(
                    f"- `{issue.file_path}:{issue.line_no}` [{issue.severity}] {issue.message}"
                )
                if issue.original_line:
                    report.append(" ```python")
                    report.append(f" {issue.original_line.strip()}")
                    report.append(" ```")
        else:
            report.append("无")
        report.append("")

        # Everything else that was found but not repaired, grouped by type.
        report.append("## 📋 其他发现的问题")
        report.append("")
        by_type: dict[str, list[CodeIssue]] = {}
        for issue in self.issues:
            if not issue.fixed:
                by_type.setdefault(issue.issue_type, []).append(issue)

        limit = self.MAX_LISTED_PER_TYPE
        for issue_type, issues in sorted(by_type.items()):
            report.append(f"### {issue_type}")
            report.append("")
            for issue in issues[:limit]:
                report.append(
                    f"- `{issue.file_path}:{issue.line_no}` - {issue.message}"
                )
            if len(issues) > limit:
                report.append(f"- ... 还有 {len(issues) - limit} 个类似问题")
            report.append("")

        return "\n".join(report)


def git_commit_and_push(project_path: str) -> tuple[bool, str]:
    """Stage, commit and push all changes in *project_path*.

    Returns:
        ``(success, message)``. ``success`` is True when there was nothing
        to commit or when commit and push both succeeded; any failing git
        command (including ``git status``) yields ``False`` with the error.
    """
    try:
        # check=True: a failing status (e.g. not a git repository) must not
        # be mistaken for "working tree clean".
        status = subprocess.run(
            ["git", "status", "--porcelain"],
            cwd=project_path,
            capture_output=True,
            text=True,
            check=True,
        )
        if not status.stdout.strip():
            return True, "没有需要提交的变更"

        subprocess.run(["git", "add", "-A"], cwd=project_path, check=True)

        commit_msg = """fix: auto-fix code issues (cron)

- 修复重复导入/字段
- 修复异常处理
- 修复PEP8格式问题
- 添加类型注解"""
        subprocess.run(
            ["git", "commit", "-m", commit_msg], cwd=project_path, check=True
        )

        subprocess.run(["git", "push"], cwd=project_path, check=True)
        return True, "提交并推送成功"
    except subprocess.CalledProcessError as e:
        return False, f"Git 操作失败: {e}"
    except Exception as e:
        return False, f"Git 操作异常: {e}"


def main(project_path: str = DEFAULT_PROJECT_PATH) -> str:
    """Scan, fix, report and commit for *project_path*; return the report.

    Args:
        project_path: Root directory of the project to review. Defaults to
            the InsightFlow workspace path.
    """
    print("🔍 开始扫描代码...")
    fixer = CodeFixer(project_path)
    fixer.scan_all_files()

    print(f"📊 发现 {len(fixer.issues)} 个可自动修复问题")
    print(f"📊 发现 {len(fixer.manual_issues)} 个需要人工确认的问题")

    print("🔧 自动修复可修复的问题...")
    fixer.fix_auto_fixable()
    print(f"✅ 已修复 {len(fixer.fixed_issues)} 个问题")

    report = fixer.generate_report()

    # Written before the commit so that ``git add -A`` picks it up.
    report_path = Path(project_path) / "AUTO_CODE_REVIEW_REPORT.md"
    report_path.write_text(report, encoding="utf-8")
    print(f"📝 报告已保存到: {report_path}")

    print("📤 提交变更到 Git...")
    success, msg = git_commit_and_push(project_path)
    print(f"{'✅' if success else '❌'} {msg}")

    # The git result is appended after the commit, so the copy on disk ends
    # up one section longer than the committed copy.
    report += f"\n\n## Git 提交结果\n\n{'✅' if success else '❌'} {msg}\n"
    report_path.write_text(report, encoding="utf-8")

    print("\n" + "=" * 60)
    print(report)
    print("=" * 60)

    return report


if __name__ == "__main__":
    main()