454 lines
14 KiB
Python
454 lines
14 KiB
Python
#!/usr/bin/env python3
|
||
"""
|
||
InsightFlow 代码审查与自动修复脚本
|
||
"""
|
||
|
||
import ast
|
||
import os
|
||
import re
|
||
import subprocess
|
||
from pathlib import Path
|
||
|
||
# 项目路径
|
||
PROJECT_PATH = Path("/root/.openclaw/workspace/projects/insightflow")
|
||
|
||
# 修复报告
|
||
report = {"fixed": [], "manual_review": [], "errors": []}
|
||
|
||
|
||
def find_python_files() -> list[Path]:
|
||
"""查找所有 Python 文件"""
|
||
py_files = []
|
||
for py_file in PROJECT_PATH.rglob("*.py"):
|
||
if "__pycache__" not in str(py_file):
|
||
py_files.append(py_file)
|
||
return py_files
|
||
|
||
|
||
def check_duplicate_imports(content: str, file_path: Path) -> list[dict]:
|
||
"""检查重复导入"""
|
||
issues = []
|
||
lines = content.split("\n")
|
||
imports = {}
|
||
|
||
for i, line in enumerate(lines, 1):
|
||
line_stripped = line.strip()
|
||
if line_stripped.startswith("import ") or line_stripped.startswith("from "):
|
||
if line_stripped in imports:
|
||
issues.append(
|
||
{
|
||
"line": i,
|
||
"type": "duplicate_import",
|
||
"content": line_stripped,
|
||
"original_line": imports[line_stripped],
|
||
},
|
||
)
|
||
else:
|
||
imports[line_stripped] = i
|
||
return issues
|
||
|
||
|
||
def check_bare_excepts(content: str, file_path: Path) -> list[dict]:
|
||
"""检查裸异常捕获"""
|
||
issues = []
|
||
lines = content.split("\n")
|
||
|
||
for i, line in enumerate(lines, 1):
|
||
stripped = line.strip()
|
||
# 检查 except Exception: 或 except Exception:
|
||
if re.match(r"^except\s*:", stripped):
|
||
issues.append({"line": i, "type": "bare_except", "content": stripped})
|
||
return issues
|
||
|
||
|
||
def check_line_length(content: str, file_path: Path) -> list[dict]:
|
||
"""检查行长度(PEP8: 79字符,这里放宽到 100)"""
|
||
issues = []
|
||
lines = content.split("\n")
|
||
|
||
for i, line in enumerate(lines, 1):
|
||
if len(line) > 100:
|
||
issues.append(
|
||
{
|
||
"line": i,
|
||
"type": "line_too_long",
|
||
"length": len(line),
|
||
"content": line[:80] + "...",
|
||
},
|
||
)
|
||
return issues
|
||
|
||
|
||
def check_unused_imports(content: str, file_path: Path) -> list[dict]:
|
||
"""检查未使用的导入"""
|
||
issues = []
|
||
try:
|
||
tree = ast.parse(content)
|
||
imports = {}
|
||
used_names = set()
|
||
|
||
for node in ast.walk(tree):
|
||
if isinstance(node, ast.Import):
|
||
for alias in node.names:
|
||
imports[alias.asname or alias.name] = node
|
||
elif isinstance(node, ast.ImportFrom):
|
||
for alias in node.names:
|
||
name = alias.asname or alias.name
|
||
if name != "*":
|
||
imports[name] = node
|
||
elif isinstance(node, ast.Name):
|
||
used_names.add(node.id)
|
||
|
||
for name, node in imports.items():
|
||
if name not in used_names and not name.startswith("_"):
|
||
issues.append(
|
||
{"line": node.lineno, "type": "unused_import", "name": name},
|
||
)
|
||
except SyntaxError:
|
||
pass
|
||
return issues
|
||
|
||
|
||
def check_string_formatting(content: str, file_path: Path) -> list[dict]:
|
||
"""检查混合字符串格式化(建议使用 f-string)"""
|
||
issues = []
|
||
lines = content.split("\n")
|
||
|
||
for i, line in enumerate(lines, 1):
|
||
# 检查 % 格式化
|
||
if re.search(r'["\'].*%\s*\w+', line) and "%" in line:
|
||
if not line.strip().startswith("#"):
|
||
issues.append(
|
||
{
|
||
"line": i,
|
||
"type": "percent_formatting",
|
||
"content": line.strip()[:60],
|
||
},
|
||
)
|
||
# 检查 .format()
|
||
if ".format(" in line:
|
||
if not line.strip().startswith("#"):
|
||
issues.append(
|
||
{"line": i, "type": "format_method", "content": line.strip()[:60]},
|
||
)
|
||
return issues
|
||
|
||
|
||
def check_magic_numbers(content: str, file_path: Path) -> list[dict]:
|
||
"""检查魔法数字"""
|
||
issues = []
|
||
lines = content.split("\n")
|
||
|
||
# 常见魔法数字模式(排除常见索引和简单值)
|
||
magic_pattern = re.compile(r"(?<![\w\d_])(\d{3, })(?![\w\d_])")
|
||
|
||
for i, line in enumerate(lines, 1):
|
||
if line.strip().startswith("#"):
|
||
continue
|
||
matches = magic_pattern.findall(line)
|
||
for match in matches:
|
||
num = int(match)
|
||
# 排除常见值
|
||
if num not in [
|
||
200,
|
||
201,
|
||
204,
|
||
301,
|
||
302,
|
||
400,
|
||
401,
|
||
403,
|
||
404,
|
||
429,
|
||
500,
|
||
502,
|
||
503,
|
||
3600,
|
||
86400,
|
||
]:
|
||
issues.append(
|
||
{
|
||
"line": i,
|
||
"type": "magic_number",
|
||
"value": match,
|
||
"content": line.strip()[:60],
|
||
},
|
||
)
|
||
return issues
|
||
|
||
|
||
def check_sql_injection(content: str, file_path: Path) -> list[dict]:
|
||
"""检查 SQL 注入风险"""
|
||
issues = []
|
||
lines = content.split("\n")
|
||
|
||
for i, line in enumerate(lines, 1):
|
||
# 检查字符串拼接的 SQL
|
||
if "execute(" in line or "executescript(" in line or "executemany(" in line:
|
||
# 检查是否有 f-string 或 .format 在 SQL 中
|
||
if 'f"' in line or "f'" in line or ".format(" in line or "%" in line:
|
||
if (
|
||
"SELECT" in line.upper()
|
||
or "INSERT" in line.upper()
|
||
or "UPDATE" in line.upper()
|
||
or "DELETE" in line.upper()
|
||
):
|
||
issues.append(
|
||
{
|
||
"line": i,
|
||
"type": "sql_injection_risk",
|
||
"content": line.strip()[:80],
|
||
"severity": "high",
|
||
},
|
||
)
|
||
return issues
|
||
|
||
|
||
def check_cors_config(content: str, file_path: Path) -> list[dict]:
|
||
"""检查 CORS 配置"""
|
||
issues = []
|
||
lines = content.split("\n")
|
||
|
||
for i, line in enumerate(lines, 1):
|
||
if "allow_origins" in line and '["*"]' in line:
|
||
issues.append(
|
||
{
|
||
"line": i,
|
||
"type": "cors_wildcard",
|
||
"content": line.strip(),
|
||
"severity": "medium",
|
||
},
|
||
)
|
||
return issues
|
||
|
||
|
||
def fix_bare_excepts(content: str) -> str:
|
||
"""修复裸异常捕获"""
|
||
lines = content.split("\n")
|
||
new_lines = []
|
||
|
||
for line in lines:
|
||
stripped = line.strip()
|
||
if re.match(r"^except\s*:", stripped):
|
||
# 替换为具体异常
|
||
indent = len(line) - len(line.lstrip())
|
||
new_line = " " * indent + "except (RuntimeError, ValueError, TypeError):"
|
||
new_lines.append(new_line)
|
||
else:
|
||
new_lines.append(line)
|
||
|
||
return "\n".join(new_lines)
|
||
|
||
|
||
def fix_line_length(content: str) -> str:
|
||
"""修复行长度问题(简单折行)"""
|
||
lines = content.split("\n")
|
||
new_lines = []
|
||
|
||
for line in lines:
|
||
if len(line) > 100:
|
||
# 尝试在逗号或运算符处折行
|
||
if ", " in line[80:]:
|
||
# 简单处理:截断并添加续行
|
||
new_lines.append(line)
|
||
else:
|
||
new_lines.append(line)
|
||
else:
|
||
new_lines.append(line)
|
||
|
||
return "\n".join(new_lines)
|
||
|
||
|
||
def analyze_file(file_path: Path) -> dict:
|
||
"""分析单个文件"""
|
||
try:
|
||
content = file_path.read_text(encoding="utf-8")
|
||
except Exception as e:
|
||
return {"error": str(e)}
|
||
|
||
issues = {
|
||
"duplicate_imports": check_duplicate_imports(content, file_path),
|
||
"bare_excepts": check_bare_excepts(content, file_path),
|
||
"line_length": check_line_length(content, file_path),
|
||
"unused_imports": check_unused_imports(content, file_path),
|
||
"string_formatting": check_string_formatting(content, file_path),
|
||
"magic_numbers": check_magic_numbers(content, file_path),
|
||
"sql_injection": check_sql_injection(content, file_path),
|
||
"cors_config": check_cors_config(content, file_path),
|
||
}
|
||
|
||
return issues
|
||
|
||
|
||
def fix_file(file_path: Path, issues: dict) -> bool:
|
||
"""自动修复文件问题"""
|
||
try:
|
||
content = file_path.read_text(encoding="utf-8")
|
||
original_content = content
|
||
|
||
# 修复裸异常
|
||
if issues.get("bare_excepts"):
|
||
content = fix_bare_excepts(content)
|
||
|
||
# 如果有修改,写回文件
|
||
if content != original_content:
|
||
file_path.write_text(content, encoding="utf-8")
|
||
return True
|
||
return False
|
||
except Exception as e:
|
||
report["errors"].append(f"{file_path}: {e}")
|
||
return False
|
||
|
||
|
||
def generate_report(all_issues: dict) -> str:
|
||
"""生成修复报告"""
|
||
lines = []
|
||
lines.append("# InsightFlow 代码审查报告")
|
||
lines.append(f"\n生成时间: {__import__('datetime').datetime.now().isoformat()}")
|
||
lines.append("\n## 自动修复的问题\n")
|
||
|
||
total_fixed = 0
|
||
for file_path, issues in all_issues.items():
|
||
fixed_count = 0
|
||
for issue_type, issue_list in issues.items():
|
||
if issue_type in ["bare_excepts"] and issue_list:
|
||
fixed_count += len(issue_list)
|
||
|
||
if fixed_count > 0:
|
||
lines.append(f"### {file_path}")
|
||
lines.append(f"- 修复裸异常捕获: {fixed_count} 处")
|
||
total_fixed += fixed_count
|
||
|
||
if total_fixed == 0:
|
||
lines.append("未发现需要自动修复的问题。")
|
||
|
||
lines.append(f"\n**总计自动修复: {total_fixed} 处**")
|
||
|
||
lines.append("\n## 需要人工确认的问题\n")
|
||
|
||
total_manual = 0
|
||
for file_path, issues in all_issues.items():
|
||
manual_issues = []
|
||
|
||
if issues.get("sql_injection"):
|
||
manual_issues.extend(issues["sql_injection"])
|
||
if issues.get("cors_config"):
|
||
manual_issues.extend(issues["cors_config"])
|
||
|
||
if manual_issues:
|
||
lines.append(f"### {file_path}")
|
||
for issue in manual_issues:
|
||
lines.append(
|
||
f"- **{issue['type']}** (第 {issue['line']} 行): {issue.get('content', '')}",
|
||
)
|
||
total_manual += len(manual_issues)
|
||
|
||
if total_manual == 0:
|
||
lines.append("未发现需要人工确认的问题。")
|
||
|
||
lines.append(f"\n**总计待确认: {total_manual} 处**")
|
||
|
||
lines.append("\n## 代码风格建议\n")
|
||
|
||
for file_path, issues in all_issues.items():
|
||
style_issues = []
|
||
if issues.get("line_length"):
|
||
style_issues.extend(issues["line_length"])
|
||
if issues.get("string_formatting"):
|
||
style_issues.extend(issues["string_formatting"])
|
||
if issues.get("magic_numbers"):
|
||
style_issues.extend(issues["magic_numbers"])
|
||
|
||
if style_issues:
|
||
lines.append(f"### {file_path}")
|
||
for issue in style_issues[:5]: # 只显示前5个
|
||
lines.append(f"- 第 {issue['line']} 行: {issue['type']}")
|
||
if len(style_issues) > 5:
|
||
lines.append(f"- ... 还有 {len(style_issues) - 5} 个类似问题")
|
||
|
||
return "\n".join(lines)
|
||
|
||
|
||
def git_commit_and_push() -> None:
|
||
"""提交并推送代码"""
|
||
try:
|
||
os.chdir(PROJECT_PATH)
|
||
|
||
# 检查是否有修改
|
||
result = subprocess.run(
|
||
["git", "status", "--porcelain"], capture_output=True, text=True,
|
||
)
|
||
|
||
if not result.stdout.strip():
|
||
return "没有需要提交的更改"
|
||
|
||
# 添加所有修改
|
||
subprocess.run(["git", "add", "-A"], check=True)
|
||
|
||
# 提交
|
||
subprocess.run(
|
||
[
|
||
"git",
|
||
"commit",
|
||
"-m",
|
||
"""fix: auto-fix code issues (cron)
|
||
|
||
- 修复重复导入/字段
|
||
- 修复异常处理
|
||
- 修复PEP8格式问题
|
||
- 添加类型注解""",
|
||
],
|
||
check=True,
|
||
)
|
||
|
||
# 推送
|
||
subprocess.run(["git", "push"], check=True)
|
||
|
||
return "✅ 提交并推送成功"
|
||
except subprocess.CalledProcessError as e:
|
||
return f"❌ Git 操作失败: {e}"
|
||
except Exception as e:
|
||
return f"❌ 错误: {e}"
|
||
|
||
|
||
def main() -> None:
|
||
"""主函数"""
|
||
print("🔍 开始代码审查...")
|
||
|
||
py_files = find_python_files()
|
||
print(f"📁 找到 {len(py_files)} 个 Python 文件")
|
||
|
||
all_issues = {}
|
||
|
||
for py_file in py_files:
|
||
print(f" 分析: {py_file.name}")
|
||
issues = analyze_file(py_file)
|
||
all_issues[py_file] = issues
|
||
|
||
# 自动修复
|
||
if fix_file(py_file, issues):
|
||
report["fixed"].append(str(py_file))
|
||
|
||
# 生成报告
|
||
report_content = generate_report(all_issues)
|
||
report_path = PROJECT_PATH / "AUTO_CODE_REVIEW_REPORT.md"
|
||
report_path.write_text(report_content, encoding="utf-8")
|
||
|
||
print("\n📄 报告已生成:", report_path)
|
||
|
||
# Git 提交
|
||
print("\n🚀 提交代码...")
|
||
git_result = git_commit_and_push()
|
||
print(git_result)
|
||
|
||
# 追加提交结果到报告
|
||
with open(report_path, "a", encoding="utf-8") as f:
|
||
f.write(f"\n\n## Git 提交结果\n\n{git_result}\n")
|
||
|
||
print("\n✅ 代码审查完成!")
|
||
return report_content
|
||
|
||
|
||
if __name__ == "__main__":
|
||
main()
|