Files
insightflow/code_reviewer.py
2026-03-02 18:13:08 +08:00

455 lines
16 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""
InsightFlow 代码审查与自动修复脚本
"""
import ast
import os
import re
import subprocess
from pathlib import Path
class CodeIssue:
    """A single finding produced by the code review scan."""

    def __init__(
        self,
        file_path: str,
        line_no: int,
        issue_type: str,
        message: str,
        severity: str = "info",
    ) -> None:
        # Location of the finding, relative to the scanned base path.
        self.file_path = file_path
        self.line_no = line_no
        # Machine-readable category plus a human-readable description.
        self.issue_type = issue_type
        self.message = message
        # One of: info, warning, error.
        self.severity = severity
        # Flipped to True once auto_fix() has repaired this issue.
        self.fixed = False

    def __repr__(self) -> str:
        location = f"{self.file_path}:{self.line_no}"
        return f"{self.severity.upper()}: {location} - {self.issue_type}: {self.message}"
class CodeReviewer:
    """Scan Python files for common problems, auto-fix the safe ones,
    and collect the rest for manual review and reporting.

    Findings are split into three buckets:
      - ``issues``:               candidates for automatic or manual code fixes
      - ``fixed_issues``:         issues that ``auto_fix()`` actually repaired
      - ``manual_review_issues``: security-sensitive findings needing a human
    """

    # HTTP status codes and common sizes/ports that are not "magic" numbers.
    _COMMON_NUMBERS = frozenset(
        {200, 404, 500, 401, 403, 429, 1000, 1024, 2048, 4096, 8080, 3000, 8000}
    )

    def __init__(self, base_path: str) -> None:
        self.base_path = Path(base_path)
        self.issues: list[CodeIssue] = []
        self.fixed_issues: list[CodeIssue] = []
        self.manual_review_issues: list[CodeIssue] = []

    def scan_all(self) -> None:
        """Scan every ``*.py`` file under ``base_path`` (skipping __pycache__)."""
        for py_file in self.base_path.rglob("*.py"):
            if "__pycache__" in str(py_file):
                continue
            self.scan_file(py_file)

    def scan_file(self, file_path: Path) -> None:
        """Run every checker against a single file.

        Read errors are reported and the file skipped rather than aborting
        the whole scan.
        """
        try:
            with open(file_path, "r", encoding="utf-8") as f:
                content = f.read()
            lines = content.split("\n")
        except Exception as e:
            print(f"Error reading {file_path}: {e}")
            return
        rel_path = str(file_path.relative_to(self.base_path))
        # Each checker appends to self.issues or self.manual_review_issues.
        self._check_bare_exceptions(content, lines, rel_path)
        self._check_duplicate_imports(content, lines, rel_path)
        self._check_pep8_issues(content, lines, rel_path)
        self._check_unused_imports(content, lines, rel_path)
        self._check_string_formatting(content, lines, rel_path)
        self._check_magic_numbers(content, lines, rel_path)
        self._check_sql_injection(content, lines, rel_path)
        self._check_cors_config(content, lines, rel_path)
        self._check_sensitive_info(content, lines, rel_path)

    def _check_bare_exceptions(
        self, content: str, lines: list[str], file_path: str
    ) -> None:
        """Flag bare ``except:`` and overly broad ``except Exception:`` handlers."""
        for i, line in enumerate(lines, 1):
            stripped = line.strip()
            if re.search(r"except\s*:\s*$", stripped) or re.search(
                r"except\s+Exception\s*:\s*$", stripped
            ):
                # Handlers explicitly marked as deliberate are allowed.
                if "# noqa" in line or "# intentional" in line.lower():
                    continue
                self.issues.append(
                    CodeIssue(
                        file_path,
                        i,
                        "bare_exception",
                        "裸异常捕获,应该使用具体异常类型",
                        "warning",
                    )
                )

    def _check_duplicate_imports(
        self, content: str, lines: list[str], file_path: str
    ) -> None:
        """Flag names that are imported more than once."""
        imports: dict[str, int] = {}
        for i, line in enumerate(lines, 1):
            match = re.match(r"^(?:from\s+(\S+)\s+)?import\s+(.+)$", line.strip())
            if not match:
                continue
            module = match.group(1) or ""
            # Split on commas with arbitrary spacing ("a,b" as well as "a, b").
            for name in re.split(r"\s*,\s*", match.group(2)):
                parts = name.strip().split()  # drop any 'as' alias
                if not parts:
                    continue
                key = f"{module}.{parts[0]}" if module else parts[0]
                if key in imports:
                    self.issues.append(
                        CodeIssue(
                            file_path,
                            i,
                            "duplicate_import",
                            f"重复导入: {key}",
                            "warning",
                        )
                    )
                imports[key] = i

    def _check_pep8_issues(
        self, content: str, lines: list[str], file_path: str
    ) -> None:
        """Flag over-long lines, trailing whitespace and triple blank lines."""
        for i, line in enumerate(lines, 1):
            if len(line) > 120:
                self.issues.append(
                    CodeIssue(
                        file_path,
                        i,
                        "line_too_long",
                        f"行长度 {len(line)} 超过 120 字符",
                        "info",
                    )
                )
            if line.rstrip() != line:
                self.issues.append(
                    CodeIssue(file_path, i, "trailing_whitespace", "行尾有空格", "info")
                )
            # lines is 0-based while i is 1-based: lines[i-2] is the previous
            # line and lines[i] the next one, so this spots 3+ blank lines.
            if i > 1 and line.strip() == "" and lines[i - 2].strip() == "":
                if i < len(lines) and lines[i].strip() == "":
                    self.issues.append(
                        CodeIssue(file_path, i, "extra_blank_line", "多余的空行", "info")
                    )

    def _check_unused_imports(
        self, content: str, lines: list[str], file_path: str
    ) -> None:
        """Flag imported names that never appear as an ``ast.Name`` node."""
        try:
            tree = ast.parse(content)
        except SyntaxError:
            # Unparseable file: skip this check only; line checks already ran.
            return
        imported_names: dict[str, int] = {}
        used_names: set[str] = set()
        for node in ast.walk(tree):
            if isinstance(node, ast.Import):
                for alias in node.names:
                    imported_names[alias.asname or alias.name] = node.lineno
            elif isinstance(node, ast.ImportFrom):
                for alias in node.names:
                    name = alias.asname or alias.name
                    if name != "*":
                        imported_names[name] = node.lineno
            elif isinstance(node, ast.Name):
                used_names.add(node.id)
        for name, lineno in imported_names.items():
            if name not in used_names and not name.startswith("_"):
                # Names that are conventionally imported without direct use.
                if name in ["annotations", "TYPE_CHECKING"]:
                    continue
                self.issues.append(
                    CodeIssue(
                        file_path, lineno, "unused_import", f"未使用的导入: {name}", "info"
                    )
                )

    def _check_string_formatting(
        self, content: str, lines: list[str], file_path: str
    ) -> None:
        """Flag files that mix f-strings with %-formatting or ``str.format``."""
        has_fstring = has_percent = has_format = False
        for line in lines:
            if re.search(r'f["\']', line):
                has_fstring = True
            # "%s"-style markers, but not percentages like "50%".
            if re.search(r"%[sdfr]", line) and not re.search(r"\d+%", line):
                has_percent = True
            if ".format(" in line:
                has_format = True
        if has_fstring and (has_percent or has_format):
            self.issues.append(
                CodeIssue(
                    file_path,
                    0,  # file-level finding: no single line to point at
                    "mixed_formatting",
                    "文件混合使用多种字符串格式化方式,建议统一为 f-string",
                    "info",
                )
            )

    def _check_magic_numbers(
        self, content: str, lines: list[str], file_path: str
    ) -> None:
        r"""Flag suspicious numeric literals in assignments.

        BUG FIX: the original patterns used ``\d{3, }`` and `` = \s*``.
        A space inside ``{3, }`` makes Python's ``re`` treat it as a
        *literal* ``{3, }`` rather than a quantifier, and the literal
        spaces around ``=`` matched only one exact spacing, so the check
        effectively never fired.
        """
        magic_patterns = [
            (r"=\s*(\d{3,})\s*[^:]", "可能的魔法数字"),
            (r"timeout\s*=\s*(\d+)", "timeout 魔法数字"),
            (r"limit\s*=\s*(\d+)", "limit 魔法数字"),
            (r"port\s*=\s*(\d+)", "port 魔法数字"),
        ]
        for i, line in enumerate(lines, 1):
            # Strip trailing comments; skip lines with no code at all.
            code_part = line.split("#")[0]
            if not code_part.strip():
                continue
            for pattern, msg in magic_patterns:
                match = re.search(pattern, code_part, re.IGNORECASE)
                if not match:
                    continue
                num = int(match.group(1))
                # Well-known HTTP codes, sizes and ports are acceptable.
                if num in self._COMMON_NUMBERS:
                    continue
                self.issues.append(
                    CodeIssue(file_path, i, "magic_number", f"{msg}: {num}", "info")
                )

    def _check_sql_injection(
        self, content: str, lines: list[str], file_path: str
    ) -> None:
        """Flag ``execute()`` calls whose SQL is built by interpolation.

        BUG FIX: the original detected f-string SQL in its outer condition
        but then required ``"%s" in line``, so ``execute(f"...")`` lines
        were matched yet never reported.
        """
        for i, line in enumerate(lines, 1):
            percent_sql = re.search(r'execute\s*\(\s*["\'].*%s', line)
            fstring_sql = re.search(r'execute\s*\(\s*f["\']', line)
            # "?"-parameterized statements are fine; f-strings never are.
            if fstring_sql or (percent_sql and "?" not in line):
                self.manual_review_issues.append(
                    CodeIssue(
                        file_path,
                        i,
                        "sql_injection_risk",
                        "可能的 SQL 注入风险 - 需要人工确认",
                        "error",
                    )
                )

    def _check_cors_config(
        self, content: str, lines: list[str], file_path: str
    ) -> None:
        """Flag CORS configuration that allows every origin."""
        for i, line in enumerate(lines, 1):
            if "allow_origins" in line and '["*"]' in line:
                self.manual_review_issues.append(
                    CodeIssue(
                        file_path,
                        i,
                        "cors_wildcard",
                        "CORS 允许所有来源 - 需要人工确认",
                        "warning",
                    )
                )

    def _check_sensitive_info(
        self, content: str, lines: list[str], file_path: str
    ) -> None:
        """Flag likely hard-coded credentials.

        BUG FIX: the original pattern used `` = `` with literal spaces
        around the equals sign, so assignments like ``password="x"`` were
        never matched.
        """
        for i, line in enumerate(lines, 1):
            if not re.search(
                r'(password|secret|key|token)\s*=\s*["\'][^"\']+["\']',
                line,
                re.IGNORECASE,
            ):
                continue
            # Values pulled from the environment are fine.
            if "getenv" in line or "environ" in line:
                continue
            # Masked values ("***") and placeholders ("<your-key>") are fine.
            if re.search(r'["\']\*+["\']', line) or re.search(
                r'["\']<[^"\']*>["\']', line
            ):
                continue
            self.manual_review_issues.append(
                CodeIssue(
                    file_path,
                    i,
                    "hardcoded_secret",
                    "可能的硬编码敏感信息 - 需要人工确认",
                    "error",
                )
            )

    def auto_fix(self) -> None:
        """Apply the safe automatic fixes and write changed files back.

        Strips trailing whitespace and rewrites bare ``except:`` to
        ``except Exception:``.  Fixed issues move from ``issues`` to
        ``fixed_issues``.

        BUG FIX: the original replaced ``"except Exception:"`` with itself
        (a no-op) yet still set ``issue.fixed = True``, so bare ``except:``
        lines were reported as fixed but never actually changed on disk.
        """
        # Group pending issues by file so each file is rewritten once.
        issues_by_file: dict[str, list[CodeIssue]] = {}
        for issue in self.issues:
            issues_by_file.setdefault(issue.file_path, []).append(issue)
        for file_path, issues in issues_by_file.items():
            full_path = self.base_path / file_path
            if not full_path.exists():
                continue
            try:
                with open(full_path, "r", encoding="utf-8") as f:
                    content = f.read()
                lines = content.split("\n")
            except Exception as e:
                print(f"Error reading {full_path}: {e}")
                continue
            original_lines = lines.copy()
            for issue in issues:
                idx = issue.line_no - 1
                if not (0 <= idx < len(lines)):
                    continue
                if issue.issue_type == "trailing_whitespace":
                    lines[idx] = lines[idx].rstrip()
                    issue.fixed = True
                elif issue.issue_type == "bare_exception":
                    # Only a bare "except:" is rewritten; an existing broad
                    # "except Exception:" needs a human to pick a narrower type.
                    if re.search(r"except\s*:\s*$", lines[idx].strip()):
                        lines[idx] = re.sub(
                            r"except\s*:", "except Exception:", lines[idx]
                        )
                        issue.fixed = True
            if lines != original_lines:
                with open(full_path, "w", encoding="utf-8") as f:
                    f.write("\n".join(lines))
                print(f"Fixed issues in {file_path}")
        # Move the repaired issues to their own bucket.
        self.fixed_issues = [i for i in self.issues if i.fixed]
        self.issues = [i for i in self.issues if not i.fixed]

    def generate_report(self) -> str:
        """Render all findings as a Markdown report and return it as a string."""
        # Local import: datetime is not imported at the top of this file;
        # the original used the awkward __import__('datetime') form.
        from datetime import datetime

        report = []
        report.append("# InsightFlow 代码审查报告")
        report.append(f"\n扫描路径: {self.base_path}")
        report.append(f"扫描时间: {datetime.now().isoformat()}")
        report.append("\n## 已自动修复的问题\n")
        if self.fixed_issues:
            report.append(f"共修复 {len(self.fixed_issues)} 个问题:\n")
            for issue in self.fixed_issues:
                report.append(
                    f"- ✅ {issue.file_path}:{issue.line_no} - {issue.issue_type}: {issue.message}"
                )
        else:
            report.append("")
        report.append("\n## 需要人工确认的问题\n")
        if self.manual_review_issues:
            report.append(f"共发现 {len(self.manual_review_issues)} 个问题:\n")
            for issue in self.manual_review_issues:
                report.append(
                    f"- ⚠️ {issue.file_path}:{issue.line_no} - {issue.issue_type}: {issue.message}"
                )
        else:
            report.append("")
        report.append("\n## 建议手动修复的问题\n")
        if self.issues:
            report.append(f"共发现 {len(self.issues)} 个问题:\n")
            for issue in self.issues:
                report.append(
                    f"- 📝 {issue.file_path}:{issue.line_no} - {issue.issue_type}: {issue.message}"
                )
        else:
            report.append("")
        return "\n".join(report)
def main() -> None:
    """Entry point: scan the backend tree, auto-fix, and write the report.

    BUG FIX: the original declared ``-> None`` yet ended with
    ``return reviewer``; no caller used the value, so the stray return is
    dropped to match the annotation.
    """
    base_path = "/root/.openclaw/workspace/projects/insightflow/backend"
    reviewer = CodeReviewer(base_path)
    print("开始扫描代码...")
    reviewer.scan_all()
    print(f"发现 {len(reviewer.issues)} 个可自动修复问题")
    print(f"发现 {len(reviewer.manual_review_issues)} 个需要人工确认的问题")
    print("\n开始自动修复...")
    reviewer.auto_fix()
    print(f"\n已修复 {len(reviewer.fixed_issues)} 个问题")
    # Write the Markdown report next to the scanned tree.
    report = reviewer.generate_report()
    report_path = Path(base_path).parent / "CODE_REVIEW_REPORT.md"
    with open(report_path, "w", encoding="utf-8") as f:
        f.write(report)
    print(f"\n报告已保存到: {report_path}")
# Run the full review only when executed as a script, not on import.
if __name__ == "__main__":
    main()