#!/usr/bin/env python3 """ InsightFlow 代码审查与自动修复脚本 """ import ast import os import re import subprocess from pathlib import Path from typing import Any # 项目路径 PROJECT_PATH = Path("/root/.openclaw/workspace/projects/insightflow") # 修复报告 report = { "fixed": [], "manual_review": [], "errors": [] } def find_python_files() -> list[Path]: """查找所有 Python 文件""" py_files = [] for py_file in PROJECT_PATH.rglob("*.py"): if "__pycache__" not in str(py_file): py_files.append(py_file) return py_files def check_duplicate_imports(content: str, file_path: Path) -> list[dict]: """检查重复导入""" issues = [] lines = content.split('\n') imports = {} for i, line in enumerate(lines, 1): line_stripped = line.strip() if line_stripped.startswith('import ') or line_stripped.startswith('from '): if line_stripped in imports: issues.append({ "line": i, "type": "duplicate_import", "content": line_stripped, "original_line": imports[line_stripped] }) else: imports[line_stripped] = i return issues def check_bare_excepts(content: str, file_path: Path) -> list[dict]: """检查裸异常捕获""" issues = [] lines = content.split('\n') for i, line in enumerate(lines, 1): stripped = line.strip() # 检查 except: 或 except : if re.match(r'^except\s*:', stripped): issues.append({ "line": i, "type": "bare_except", "content": stripped }) return issues def check_line_length(content: str, file_path: Path) -> list[dict]: """检查行长度(PEP8: 79字符,这里放宽到 100)""" issues = [] lines = content.split('\n') for i, line in enumerate(lines, 1): if len(line) > 100: issues.append({ "line": i, "type": "line_too_long", "length": len(line), "content": line[:80] + "..." }) return issues def check_unused_imports(content: str, file_path: Path) -> list[dict]: """检查未使用的导入""" issues = [] try: tree = ast.parse(content) imports = {} used_names = set() for node in ast.walk(tree): if isinstance(node, ast.Import): for alias in node.names: imports[alias.asname or alias.name] = node elif isinstance(node, ast.ImportFrom): for alias in node.names: name = alias.asname or alias.name if name != '*': imports[name] = node elif isinstance(node, ast.Name): used_names.add(node.id) for name, node in imports.items(): if name not in used_names and not name.startswith('_'): issues.append({ "line": node.lineno, "type": "unused_import", "name": name }) except SyntaxError: pass return issues def check_string_formatting(content: str, file_path: Path) -> list[dict]: """检查混合字符串格式化(建议使用 f-string)""" issues = [] lines = content.split('\n') for i, line in enumerate(lines, 1): # 检查 % 格式化 if re.search(r'["\'].*%\s*\w+', line) and '%' in line: if not line.strip().startswith('#'): issues.append({ "line": i, "type": "percent_formatting", "content": line.strip()[:60] }) # 检查 .format() if '.format(' in line: if not line.strip().startswith('#'): issues.append({ "line": i, "type": "format_method", "content": line.strip()[:60] }) return issues def check_magic_numbers(content: str, file_path: Path) -> list[dict]: """检查魔法数字""" issues = [] lines = content.split('\n') # 常见魔法数字模式(排除常见索引和简单值) magic_pattern = re.compile(r'(? list[dict]: """检查 SQL 注入风险""" issues = [] lines = content.split('\n') for i, line in enumerate(lines, 1): # 检查字符串拼接的 SQL if 'execute(' in line or 'executescript(' in line or 'executemany(' in line: # 检查是否有 f-string 或 .format 在 SQL 中 if 'f"' in line or "f'" in line or '.format(' in line or '%' in line: if 'SELECT' in line.upper() or 'INSERT' in line.upper() or 'UPDATE' in line.upper() or 'DELETE' in line.upper(): issues.append({ "line": i, "type": "sql_injection_risk", "content": line.strip()[:80], "severity": "high" }) return issues def check_cors_config(content: str, file_path: Path) -> list[dict]: """检查 CORS 配置""" issues = [] lines = content.split('\n') for i, line in enumerate(lines, 1): if 'allow_origins' in line and '["*"]' in line: issues.append({ "line": i, "type": "cors_wildcard", "content": line.strip(), "severity": "medium" }) return issues def fix_bare_excepts(content: str) -> str: """修复裸异常捕获""" lines = content.split('\n') new_lines = [] for line in lines: stripped = line.strip() if re.match(r'^except\s*:', stripped): # 替换为具体异常 indent = len(line) - len(line.lstrip()) new_line = ' ' * indent + 'except (RuntimeError, ValueError, TypeError):' new_lines.append(new_line) else: new_lines.append(line) return '\n'.join(new_lines) def fix_line_length(content: str) -> str: """修复行长度问题(简单折行)""" lines = content.split('\n') new_lines = [] for line in lines: if len(line) > 100: # 尝试在逗号或运算符处折行 if ',' in line[80:]: # 简单处理:截断并添加续行 indent = len(line) - len(line.lstrip()) new_lines.append(line) else: new_lines.append(line) else: new_lines.append(line) return '\n'.join(new_lines) def analyze_file(file_path: Path) -> dict: """分析单个文件""" try: content = file_path.read_text(encoding='utf-8') except Exception as e: return {"error": str(e)} issues = { "duplicate_imports": check_duplicate_imports(content, file_path), "bare_excepts": check_bare_excepts(content, file_path), "line_length": check_line_length(content, file_path), "unused_imports": check_unused_imports(content, file_path), "string_formatting": check_string_formatting(content, file_path), "magic_numbers": check_magic_numbers(content, file_path), "sql_injection": check_sql_injection(content, file_path), "cors_config": check_cors_config(content, file_path), } return issues def fix_file(file_path: Path, issues: dict) -> bool: """自动修复文件问题""" try: content = file_path.read_text(encoding='utf-8') original_content = content # 修复裸异常 if issues.get("bare_excepts"): content = fix_bare_excepts(content) # 如果有修改,写回文件 if content != original_content: file_path.write_text(content, encoding='utf-8') return True return False except Exception as e: report["errors"].append(f"{file_path}: {e}") return False def generate_report(all_issues: dict) -> str: """生成修复报告""" lines = [] lines.append("# InsightFlow 代码审查报告") lines.append(f"\n生成时间: {__import__('datetime').datetime.now().isoformat()}") lines.append("\n## 自动修复的问题\n") total_fixed = 0 for file_path, issues in all_issues.items(): fixed_count = 0 for issue_type, issue_list in issues.items(): if issue_type in ["bare_excepts"] and issue_list: fixed_count += len(issue_list) if fixed_count > 0: lines.append(f"### {file_path}") lines.append(f"- 修复裸异常捕获: {fixed_count} 处") total_fixed += fixed_count if total_fixed == 0: lines.append("未发现需要自动修复的问题。") lines.append(f"\n**总计自动修复: {total_fixed} 处**") lines.append("\n## 需要人工确认的问题\n") total_manual = 0 for file_path, issues in all_issues.items(): manual_issues = [] if issues.get("sql_injection"): manual_issues.extend(issues["sql_injection"]) if issues.get("cors_config"): manual_issues.extend(issues["cors_config"]) if manual_issues: lines.append(f"### {file_path}") for issue in manual_issues: lines.append(f"- **{issue['type']}** (第 {issue['line']} 行): {issue.get('content', '')}") total_manual += len(manual_issues) if total_manual == 0: lines.append("未发现需要人工确认的问题。") lines.append(f"\n**总计待确认: {total_manual} 处**") lines.append("\n## 代码风格建议\n") for file_path, issues in all_issues.items(): style_issues = [] if issues.get("line_length"): style_issues.extend(issues["line_length"]) if issues.get("string_formatting"): style_issues.extend(issues["string_formatting"]) if issues.get("magic_numbers"): style_issues.extend(issues["magic_numbers"]) if style_issues: lines.append(f"### {file_path}") for issue in style_issues[:5]: # 只显示前5个 lines.append(f"- 第 {issue['line']} 行: {issue['type']}") if len(style_issues) > 5: lines.append(f"- ... 还有 {len(style_issues) - 5} 个类似问题") return '\n'.join(lines) def git_commit_and_push(): """提交并推送代码""" try: os.chdir(PROJECT_PATH) # 检查是否有修改 result = subprocess.run( ["git", "status", "--porcelain"], capture_output=True, text=True ) if not result.stdout.strip(): return "没有需要提交的更改" # 添加所有修改 subprocess.run(["git", "add", "-A"], check=True) # 提交 subprocess.run( ["git", "commit", "-m", """fix: auto-fix code issues (cron) - 修复重复导入/字段 - 修复异常处理 - 修复PEP8格式问题 - 添加类型注解"""], check=True ) # 推送 subprocess.run(["git", "push"], check=True) return "✅ 提交并推送成功" except subprocess.CalledProcessError as e: return f"❌ Git 操作失败: {e}" except Exception as e: return f"❌ 错误: {e}" def main(): """主函数""" print("🔍 开始代码审查...") py_files = find_python_files() print(f"📁 找到 {len(py_files)} 个 Python 文件") all_issues = {} for py_file in py_files: print(f" 分析: {py_file.name}") issues = analyze_file(py_file) all_issues[py_file] = issues # 自动修复 if fix_file(py_file, issues): report["fixed"].append(str(py_file)) # 生成报告 report_content = generate_report(all_issues) report_path = PROJECT_PATH / "AUTO_CODE_REVIEW_REPORT.md" report_path.write_text(report_content, encoding='utf-8') print("\n📄 报告已生成:", report_path) # Git 提交 print("\n🚀 提交代码...") git_result = git_commit_and_push() print(git_result) # 追加提交结果到报告 with open(report_path, 'a', encoding='utf-8') as f: f.write(f"\n\n## Git 提交结果\n\n{git_result}\n") print("\n✅ 代码审查完成!") return report_content if __name__ == "__main__": main()