#!/usr/bin/env python3
"""
InsightFlow code review and auto-fix tool - enhanced edition.
"""

import ast
import os
import re
import subprocess
from datetime import datetime
from pathlib import Path


class CodeIssue:
    """Record of a single finding produced by one of the code checks.

    Attributes:
        file_path: Path of the file the finding belongs to.
        line_no: 1-based line number of the finding.
        issue_type: Machine-readable identifier such as ``"bare_exception"``.
        message: Human-readable description shown in the report.
        severity: Severity label; defaults to ``"warning"``.
        original_line: Text of the offending line, or ``""`` when not recorded.
        fixed: Starts as ``False``; meant to be flipped by whoever repairs it.
    """

    def __init__(
        self,
        file_path: str,
        line_no: int,
        issue_type: str,
        message: str,
        severity: str = "warning",
        original_line: str = "",
    ) -> None:
        self.file_path = file_path
        self.line_no = line_no
        self.issue_type = issue_type
        self.message = message
        self.severity = severity
        self.original_line = original_line
        self.fixed = False

    def __repr__(self) -> str:
        """Return ``path:line [severity] type: message``."""
        return (
            f"{self.file_path}:{self.line_no} [{self.severity}] "
            f"{self.issue_type}: {self.message}"
        )


class CodeFixer:
    """Scan a project's Python files for issues and auto-fix the trivial ones.

    Typical use is ``scan_all_files()``, then ``fix_auto_fixable()``, then
    ``generate_report()``.

    Attributes:
        project_path: Root directory that is searched recursively.
        issues: Findings of the general checks (style, imports, typing, ...).
        fixed_issues: Entries of ``issues`` repaired by ``fix_auto_fixable``.
        manual_issues: Security findings that always need human review.
        scanned_files: Path of every file that was handed to the checks.
    """

    # Issue types fix_auto_fixable() can repair without human judgement.
    AUTO_FIX_TYPES = frozenset(
        {"trailing_whitespace", "extra_blank_line", "bare_exception"}
    )
    MAX_LINE_LENGTH = 120
    # PEP 8 asks for two blank lines around top-level definitions, so only
    # blank lines beyond the second one in a run are considered superfluous.
    MAX_BLANK_LINES = 2
    # Per-type cap for the "other issues" section of the report.
    MAX_LISTED_PER_TYPE = 10

    _SKIPPED_PATH_PARTS = ("__pycache__", ".venv")
    # Files the fixer never rewrites (substring match on the path).
    _PROTECTED_FILES = ("auto_code_fixer.py", "code_reviewer.py")
    _MAGIC_NUMBER_WHITELIST = frozenset(
        {"0", "1", "-1", "0.0", "1.0", "100", "0.5", "3600", "86400", "1024"}
    )
    _ENV_LOOKUP_MARKERS = ("os.getenv", "os.environ")
    _PLACEHOLDER_MARKERS = ("your_", "example", "placeholder")

    _IMPORT_RE = re.compile(r"^(?:from\s+(\S+)\s+)?import\s+(.+)$")
    # Anchored at the start of the statement so that "except:" inside a
    # comment or string is ignored; group 1 keeps the indentation for the fix.
    _BARE_EXCEPT_RE = re.compile(r"^(\s*)except\s*:(?=\s*(?:#.*)?$)")
    _PERCENT_FORMAT_RES = (
        re.compile(r"['\"].*%[sdif].*['\"]\s*%"),
        re.compile(r"['\"].*%\(.*\).*['\"]\s*%"),
    )
    _STR_FORMAT_RE = re.compile(r"['\"].*\{.*\}.*['\"]\.format\(")
    _NUMBER_RE = re.compile(r"(?<![\w.])\d+(?:\.\d+)?(?![\w.])")
    # UPPER_CASE = ... or UPPER_CASE: type = ... (but not a comparison "==").
    _CONSTANT_ASSIGN_RE = re.compile(r"^_*[A-Z][A-Z0-9_]*\s*(?::[^=]+)?=(?!=)")
    # execute(f"...{value}...")
    _SQL_FSTRING_RE = re.compile(
        r"execute\s*\(\s*(?:[fF][rR]?|[rR][fF])['\"].*\{"
    )
    # execute("..." % x), execute("..." + x), execute("...".format(x))
    _SQL_BUILT_STRING_RE = re.compile(
        r"execute\s*\(\s*[rR]?(['\"]).*?\1\s*(?:%|\+|\.format\s*\()"
    )
    _CORS_WILDCARD_RE = re.compile(r"allow_origins.*['\"]\*['\"]")
    _SECRET_PATTERNS = (
        (re.compile(r"password\s*=\s*['\"][^'\"]+['\"]", re.IGNORECASE), "硬编码密码"),
        (re.compile(r"secret\s*=\s*['\"][^'\"]+['\"]", re.IGNORECASE), "硬编码密钥"),
        (re.compile(r"api_key\s*=\s*['\"][^'\"]+['\"]", re.IGNORECASE), "硬编码 API Key"),
        (re.compile(r"token\s*=\s*['\"][^'\"]+['\"]", re.IGNORECASE), "硬编码 Token"),
    )

    def __init__(self, project_path: str):
        self.project_path = Path(project_path)
        self.issues: list[CodeIssue] = []
        self.fixed_issues: list[CodeIssue] = []
        self.manual_issues: list[CodeIssue] = []
        self.scanned_files: list[str] = []

    # ------------------------------------------------------------------
    # Scanning
    # ------------------------------------------------------------------

    def scan_all_files(self) -> None:
        """Run every check on each ``*.py`` file below ``project_path``.

        Paths containing ``__pycache__`` or ``.venv`` are skipped. Files are
        visited in sorted order so repeated runs report in the same order.
        """
        for py_file in sorted(self.project_path.rglob("*.py")):
            path_text = str(py_file)
            if any(part in path_text for part in self._SKIPPED_PATH_PARTS):
                continue
            self.scanned_files.append(path_text)
            self._scan_file(py_file)

    def _scan_file(self, file_path: Path) -> None:
        """Read one file and run all checks on it; unreadable files are reported and skipped."""
        try:
            with open(file_path, "r", encoding="utf-8") as f:
                content = f.read()
        except (OSError, UnicodeError) as e:
            print(f"Error reading {file_path}: {e}")
            return

        lines = content.split("\n")

        self._check_duplicate_imports(file_path, content, lines)
        self._check_bare_exceptions(file_path, content, lines)
        self._check_pep8_issues(file_path, content, lines)
        self._check_unused_imports(file_path, content)
        self._check_type_annotations(file_path, content, lines)
        self._check_string_formatting(file_path, content, lines)
        self._check_magic_numbers(file_path, content, lines)
        self._check_sql_injection(file_path, content, lines)
        self._check_cors_config(file_path, content, lines)
        self._check_sensitive_info(file_path, content, lines)

    def _add_issue(
        self,
        file_path: Path,
        line_no: int,
        issue_type: str,
        message: str,
        severity: str,
        original_line: str = "",
        manual: bool = False,
    ) -> None:
        """Create a CodeIssue and file it under ``manual_issues`` or ``issues``."""
        issue = CodeIssue(
            str(file_path), line_no, issue_type, message, severity, original_line
        )
        (self.manual_issues if manual else self.issues).append(issue)

    @staticmethod
    def _parse(content: str):
        """Return the AST of ``content``, or ``None`` when it cannot be parsed."""
        try:
            return ast.parse(content)
        except (SyntaxError, ValueError):
            # ValueError: older interpreters raise it for source with null bytes.
            return None

    def _check_duplicate_imports(
        self, file_path: Path, content: str, lines: list[str]
    ) -> None:
        """Flag import statements whose text repeats an earlier import line."""
        seen = set()
        for line_no, line in enumerate(lines, 1):
            # Inline comments must not make two identical imports look different.
            statement = line.split("#", 1)[0].strip()
            match = self._IMPORT_RE.match(statement)
            if not match:
                continue
            names = " ".join(match.group(2).split())
            # A multi-line import only shows its opener on this line; the
            # imported names are unknown here, so it cannot be compared.
            if names.endswith(("(", "\\")):
                continue
            key = (match.group(1) or "", names)
            if key in seen:
                self._add_issue(
                    file_path,
                    line_no,
                    "duplicate_import",
                    f"重复导入: {line.strip()}",
                    "warning",
                    line,
                )
            else:
                seen.add(key)

    def _check_bare_exceptions(
        self, file_path: Path, content: str, lines: list[str]
    ) -> None:
        """Flag ``except:`` clauses that name no exception type."""
        for line_no, line in enumerate(lines, 1):
            if self._BARE_EXCEPT_RE.match(line):
                self._add_issue(
                    file_path,
                    line_no,
                    "bare_exception",
                    "裸异常捕获,应指定具体异常类型",
                    "error",
                    line,
                )

    def _excess_blank_lines(self, lines: list[str]) -> set:
        """Return 1-based numbers of blank lines beyond ``MAX_BLANK_LINES`` in a run.

        A run of blank lines at the very end of the file is not reported.
        """
        excess = set()
        run = []
        for line_no, line in enumerate(lines, 1):
            if line.strip():
                excess.update(run[self.MAX_BLANK_LINES:])
                run = []
            else:
                run.append(line_no)
        return excess

    def _check_pep8_issues(
        self, file_path: Path, content: str, lines: list[str]
    ) -> None:
        """Flag over-long lines, trailing whitespace and superfluous blank lines."""
        excess_blank = self._excess_blank_lines(lines)
        for line_no, line in enumerate(lines, 1):
            if len(line) > self.MAX_LINE_LENGTH:
                self._add_issue(
                    file_path,
                    line_no,
                    "line_too_long",
                    f"行长度 {len(line)} 超过 {self.MAX_LINE_LENGTH} 字符",
                    "warning",
                    line,
                )
            if line != line.rstrip():
                self._add_issue(
                    file_path, line_no, "trailing_whitespace", "行尾有空格", "info", line
                )
            if line_no in excess_blank:
                self._add_issue(
                    file_path, line_no, "extra_blank_line", "多余的空行", "info", line
                )

    def _check_unused_imports(self, file_path: Path, content: str) -> None:
        """Flag imported names that never appear as a name in the module.

        Names starting with an underscore, star imports and ``__future__``
        imports are not reported.
        """
        tree = self._parse(content)
        if tree is None:
            return

        imported = {}
        used_names = set()
        for node in ast.walk(tree):
            if isinstance(node, ast.Import):
                for alias in node.names:
                    # "import a.b" binds only the top-level name "a".
                    bound = alias.asname or alias.name.split(".", 1)[0]
                    imported[bound] = node.lineno
            elif isinstance(node, ast.ImportFrom):
                if node.module == "__future__":
                    continue  # compiler directives, never referenced by name
                for alias in node.names:
                    if alias.name != "*":
                        imported[alias.asname or alias.name] = node.lineno
            elif isinstance(node, ast.Name):
                used_names.add(node.id)

        for name, line_no in imported.items():
            if name not in used_names and not name.startswith("_"):
                self._add_issue(
                    file_path,
                    line_no,
                    "unused_import",
                    f"未使用的导入: {name}",
                    "warning",
                )

    def _check_type_annotations(
        self, file_path: Path, content: str, lines: list[str]
    ) -> None:
        """Flag function parameters (other than ``self``/``cls``) without annotation."""
        tree = self._parse(content)
        if tree is None:
            return

        for node in ast.walk(tree):
            if not isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)):
                continue
            params = [
                *node.args.posonlyargs,
                *node.args.args,
                *node.args.kwonlyargs,
            ]
            for arg in params:
                if arg.annotation is None and arg.arg not in ("self", "cls"):
                    self._add_issue(
                        file_path,
                        node.lineno,
                        "missing_type_annotation",
                        f"函数 '{node.name}' 的参数 '{arg.arg}' 缺少类型注解",
                        "info",
                    )

    def _check_string_formatting(
        self, file_path: Path, content: str, lines: list[str]
    ) -> None:
        """Flag ``%``-formatting and ``str.format`` calls on string literals."""
        for line_no, line in enumerate(lines, 1):
            if any(pattern.search(line) for pattern in self._PERCENT_FORMAT_RES):
                self._add_issue(
                    file_path,
                    line_no,
                    "old_string_format",
                    "使用 % 格式化,建议改为 f-string",
                    "info",
                    line,
                )
            if self._STR_FORMAT_RE.search(line):
                self._add_issue(
                    file_path,
                    line_no,
                    "format_method",
                    "使用 .format(),建议改为 f-string",
                    "info",
                    line,
                )

    def _check_magic_numbers(
        self, file_path: Path, content: str, lines: list[str]
    ) -> None:
        """Flag numeric literals that are neither whitelisted nor part of a constant.

        Comment lines and UPPER_CASE constant assignments are skipped as a
        whole: a named constant is exactly what the finding recommends.
        """
        for line_no, line in enumerate(lines, 1):
            stripped = line.strip()
            if stripped.startswith("#") or self._CONSTANT_ASSIGN_RE.match(stripped):
                continue
            for number in self._NUMBER_RE.findall(line):
                if number not in self._MAGIC_NUMBER_WHITELIST:
                    self._add_issue(
                        file_path,
                        line_no,
                        "magic_number",
                        f"魔法数字 {number},建议提取为常量",
                        "info",
                        line,
                    )

    def _check_sql_injection(
        self, file_path: Path, content: str, lines: list[str]
    ) -> None:
        """Flag ``execute(...)`` calls whose query string is built in Python.

        Reported: f-strings with interpolation and literals combined via
        ``%``, ``+`` or ``.format``. A literal with placeholders followed by
        a separate parameter argument is a parameterized query and is not
        reported.
        """
        for line_no, line in enumerate(lines, 1):
            if self._SQL_FSTRING_RE.search(line) or self._SQL_BUILT_STRING_RE.search(line):
                self._add_issue(
                    file_path,
                    line_no,
                    "sql_injection_risk",
                    "潜在的 SQL 注入风险,使用参数化查询",
                    "critical",
                    line,
                    manual=True,
                )

    def _check_cors_config(
        self, file_path: Path, content: str, lines: list[str]
    ) -> None:
        """Flag ``allow_origins`` settings that contain a quoted ``*`` wildcard."""
        for line_no, line in enumerate(lines, 1):
            if self._CORS_WILDCARD_RE.search(line):
                self._add_issue(
                    file_path,
                    line_no,
                    "cors_wildcard",
                    "CORS 配置允许所有来源 (*),生产环境应限制具体域名",
                    "warning",
                    line,
                    manual=True,
                )

    def _check_sensitive_info(
        self, file_path: Path, content: str, lines: list[str]
    ) -> None:
        """Flag string literals assigned to password/secret/api_key/token names.

        Lines that read from the environment or that look like placeholders
        (``your_``, ``example``, ``placeholder``) are ignored.
        """
        for line_no, line in enumerate(lines, 1):
            if any(marker in line for marker in self._ENV_LOOKUP_MARKERS):
                continue
            lowered = line.lower()
            if any(marker in lowered for marker in self._PLACEHOLDER_MARKERS):
                continue
            for pattern, description in self._SECRET_PATTERNS:
                if pattern.search(line):
                    self._add_issue(
                        file_path,
                        line_no,
                        "hardcoded_secret",
                        f"{description},应使用环境变量",
                        "critical",
                        line,
                        manual=True,
                    )

    # ------------------------------------------------------------------
    # Fixing
    # ------------------------------------------------------------------

    def fix_auto_fixable(self) -> None:
        """Rewrite files to repair every issue whose type is in ``AUTO_FIX_TYPES``.

        Per file, in-place edits (trailing whitespace, bare ``except``) are
        applied first while all line numbers are still valid; superfluous
        blank lines are deleted afterwards from the bottom up. Issues are
        marked as fixed only after the file was written successfully, and
        the line numbers of all issues of that file are shifted to match
        the rewritten file.
        """
        files_to_fix = {}
        for issue in self.issues:
            if issue.issue_type in self.AUTO_FIX_TYPES:
                files_to_fix.setdefault(issue.file_path, []).append(issue)

        for file_path, file_issues in files_to_fix.items():
            if any(name in file_path for name in self._PROTECTED_FILES):
                continue

            try:
                with open(file_path, "r", encoding="utf-8") as f:
                    lines = f.read().split("\n")
            except (OSError, UnicodeError) as e:
                print(f"Error reading {file_path}: {e}")
                continue

            original_lines = list(lines)
            applied = self._apply_inline_fixes(lines, file_issues)
            removed_line_nos = []

            blank_issues = sorted(
                (i for i in file_issues if i.issue_type == "extra_blank_line"),
                key=lambda i: i.line_no,
                reverse=True,
            )
            for issue in blank_issues:
                idx = issue.line_no - 1
                if issue.line_no in removed_line_nos:
                    continue  # the same line was reported twice
                # Re-check the file content: it may have changed since the scan.
                if (
                    0 < idx < len(lines)
                    and not lines[idx].strip()
                    and not lines[idx - 1].strip()
                ):
                    del lines[idx]
                    removed_line_nos.append(issue.line_no)
                    applied.append(issue)

            if lines != original_lines:
                try:
                    with open(file_path, "w", encoding="utf-8") as f:
                        f.write("\n".join(lines))
                except OSError as e:
                    print(f"Error writing {file_path}: {e}")
                    continue
                print(f"Fixed issues in {file_path}")

            self._shift_line_numbers(file_path, removed_line_nos)
            applied.sort(key=lambda i: i.line_no)
            for issue in applied:
                issue.fixed = True
                self.fixed_issues.append(issue)

    def _apply_inline_fixes(self, lines: list[str], file_issues: list) -> list:
        """Apply the fixes that do not change the line count; return the issues handled."""
        applied = []
        for issue in file_issues:
            idx = issue.line_no - 1
            if not 0 <= idx < len(lines):
                continue
            if issue.issue_type == "trailing_whitespace":
                lines[idx] = lines[idx].rstrip()
                applied.append(issue)
            elif issue.issue_type == "bare_exception":
                # Turn "except:" into "except Exception:", keeping indentation
                # and any trailing comment.
                fixed_line, count = self._BARE_EXCEPT_RE.subn(
                    r"\1except Exception:", lines[idx], count=1
                )
                if count:
                    lines[idx] = fixed_line
                    applied.append(issue)
        return applied

    def _shift_line_numbers(self, file_path: str, removed_line_nos: list) -> None:
        """Move issue line numbers up by the number of lines deleted above them."""
        if not removed_line_nos:
            return
        for issue in self.issues + self.manual_issues:
            if issue.file_path == file_path:
                issue.line_no -= sum(
                    1 for removed in removed_line_nos if removed < issue.line_no
                )

    # ------------------------------------------------------------------
    # Reporting
    # ------------------------------------------------------------------

    def categorize_issues(self) -> "dict[str, list[CodeIssue]]":
        """Group ``issues`` by severity; unknown severities are left out."""
        categories = {"critical": [], "error": [], "warning": [], "info": []}
        for issue in self.issues:
            bucket = categories.get(issue.severity)
            if bucket is not None:
                bucket.append(issue)
        return categories

    def generate_report(self) -> str:
        """Build the Markdown report covering scan, fixes and open findings."""
        timestamp = datetime.now().astimezone().strftime("%Y-%m-%d %H:%M:%S %Z")
        report = [
            "# InsightFlow 代码审查报告",
            "",
            f"扫描时间: {timestamp}",
            f"扫描文件数: {len(self.scanned_files)}",
            "",
            "## 扫描的文件列表",
            "",
        ]
        report.extend(f"- `{path}`" for path in sorted(self.scanned_files))
        report.append("")

        # Severity statistics: manual findings are counted with the others.
        categories = self.categorize_issues()
        manual_critical = sum(1 for i in self.manual_issues if i.severity == "critical")
        manual_warning = sum(1 for i in self.manual_issues if i.severity == "warning")
        report.extend(
            [
                "## 问题分类统计",
                "",
                f"- 🔴 Critical: {len(categories['critical']) + manual_critical}",
                f"- 🟠 Error: {len(categories['error'])}",
                f"- 🟡 Warning: {len(categories['warning']) + manual_warning}",
                f"- 🔵 Info: {len(categories['info'])}",
                f"- **总计: {len(self.issues) + len(self.manual_issues)}**",
                "",
            ]
        )

        # Issues that were repaired automatically.
        report.extend(["## ✅ 已自动修复的问题", ""])
        if self.fixed_issues:
            for issue in self.fixed_issues:
                report.append(
                    f"- `{issue.file_path}:{issue.line_no}` - {issue.issue_type}: {issue.message}"
                )
        else:
            report.append("无")
        report.append("")

        # Issues that need a human decision.
        report.extend(["## ⚠️ 需要人工确认的问题", ""])
        if self.manual_issues:
            for issue in self.manual_issues:
                report.append(
                    f"- `{issue.file_path}:{issue.line_no}` [{issue.severity}] {issue.message}"
                )
                if issue.original_line:
                    report.append(" ```python")
                    report.append(f" {issue.original_line.strip()}")
                    report.append(" ```")
        else:
            report.append("无")
        report.append("")

        # Everything else, grouped by issue type and capped per type.
        report.extend(["## 📋 其他发现的问题", ""])
        by_type = {}
        for issue in self.issues:
            if not issue.fixed:
                by_type.setdefault(issue.issue_type, []).append(issue)

        for issue_type in sorted(by_type):
            issues = by_type[issue_type]
            report.append(f"### {issue_type}")
            report.append("")
            for issue in issues[: self.MAX_LISTED_PER_TYPE]:
                report.append(
                    f"- `{issue.file_path}:{issue.line_no}` - {issue.message}"
                )
            if len(issues) > self.MAX_LISTED_PER_TYPE:
                report.append(
                    f"- ... 还有 {len(issues) - self.MAX_LISTED_PER_TYPE} 个类似问题"
                )
            report.append("")

        return "\n".join(report)


def git_commit_and_push(project_path: str) -> tuple[bool, str]:
    """Stage, commit and push all changes in the repository at ``project_path``.

    Args:
        project_path: Directory in which the git commands are executed.

    Returns:
        ``(success, message)``. ``success`` is ``True`` when there was
        nothing to commit or when commit and push both succeeded, and
        ``False`` when any git command failed or could not be started.
        The function itself never raises.
    """
    try:
        # check=True matters here: when "git status" itself fails (for
        # example outside a repository) stdout is empty, which would
        # otherwise be mistaken for a clean working tree.
        status = subprocess.run(
            ["git", "status", "--porcelain"],
            cwd=project_path,
            capture_output=True,
            text=True,
            check=True,
        )
        if not status.stdout.strip():
            return True, "没有需要提交的变更"

        subprocess.run(["git", "add", "-A"], cwd=project_path, check=True)

        commit_msg = (
            "fix: auto-fix code issues (cron)\n"
            "\n"
            "- 修复重复导入/字段\n"
            "- 修复异常处理\n"
            "- 修复PEP8格式问题\n"
            "- 添加类型注解"
        )
        subprocess.run(
            ["git", "commit", "-m", commit_msg], cwd=project_path, check=True
        )

        subprocess.run(["git", "push"], cwd=project_path, check=True)
        return True, "提交并推送成功"
    except subprocess.CalledProcessError as e:
        return False, f"Git 操作失败: {e}"
    except Exception as e:
        # Boundary handler: e.g. git not installed or project_path missing.
        return False, f"Git 操作异常: {e}"


def main(project_path: str = "/root/.openclaw/workspace/projects/insightflow") -> str:
    """Scan the project, apply automatic fixes, write the report and push.

    Args:
        project_path: Root of the project to review. Defaults to the
            InsightFlow workspace used by the scheduled job.

    Returns:
        The full Markdown report, including the git result section.
    """
    print("🔍 开始扫描代码...")
    fixer = CodeFixer(project_path)
    fixer.scan_all_files()

    print(f"📊 发现 {len(fixer.issues)} 个可自动修复问题")
    print(f"📊 发现 {len(fixer.manual_issues)} 个需要人工确认的问题")

    print("🔧 自动修复可修复的问题...")
    fixer.fix_auto_fixable()
    print(f"✅ 已修复 {len(fixer.fixed_issues)} 个问题")

    report = fixer.generate_report()

    # The report is written before the git step, so "git add -A" picks it up
    # when project_path is the repository root.
    report_path = Path(project_path) / "AUTO_CODE_REVIEW_REPORT.md"
    report_path.write_text(report, encoding="utf-8")
    print(f"📝 报告已保存到: {report_path}")

    print("📤 提交变更到 Git...")
    success, msg = git_commit_and_push(project_path)
    status_icon = "✅" if success else "❌"
    print(f"{status_icon} {msg}")

    # Append the git outcome and save again.
    # NOTE(review): this second write happens after the commit, so the
    # working tree is left modified until the next run - confirm intended.
    report += f"\n\n## Git 提交结果\n\n{status_icon} {msg}\n"
    report_path.write_text(report, encoding="utf-8")

    print("\n" + "=" * 60)
    print(report)
    print("=" * 60)

    return report


# Run the review only when executed as a script, not when imported.
if __name__ == "__main__":
    main()