Files
insightflow/backend/knowledge_reasoner.py
OpenClaw Bot 7853b2392b fix: auto-fix code issues - duplicate imports and unused imports
- 移除函数内部的重复 import re
- 移除函数内部的重复 import csv
- 移除函数内部的重复 import random
- 移除未使用的 urllib.request 导入
- 添加缺失的 time 导入到 ai_manager.py
2026-02-28 03:05:42 +08:00

489 lines
16 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""
InsightFlow Knowledge Reasoning - Phase 5
知识推理与问答增强模块
"""
import json
import os
import re
from dataclasses import dataclass
from enum import Enum
import httpx
KIMI_API_KEY = os.getenv("KIMI_API_KEY", "")
KIMI_BASE_URL = os.getenv("KIMI_BASE_URL", "https://api.kimi.com/coding")
class ReasoningType(Enum):
    """Reasoning strategies supported by the engine."""

    CAUSAL = "causal"  # cause-and-effect reasoning
    ASSOCIATIVE = "associative"  # association / hidden-link reasoning
    TEMPORAL = "temporal"  # timeline / evolution reasoning
    COMPARATIVE = "comparative"  # similarity / difference reasoning
    SUMMARY = "summary"  # summarization
@dataclass
class ReasoningResult:
    """Outcome of one reasoning pass over the knowledge graph."""

    answer: str  # natural-language answer text
    reasoning_type: ReasoningType  # strategy that produced the answer
    confidence: float  # model-reported confidence (not validated here)
    evidence: list[dict]  # supporting evidence items, e.g. {"text": ...}
    related_entities: list[str]  # entities involved in the answer
    gaps: list[str]  # identified knowledge gaps / missing information
@dataclass
class InferencePath:
    """A path between two entities found in the relation graph."""

    start_entity: str  # entity the search started from
    end_entity: str  # entity the search ended at
    path: list[dict]  # nodes and relations along the path
    strength: float  # path strength; shorter + higher-confidence paths score higher
class KnowledgeReasoner:
"""知识推理引擎"""
def __init__(self, api_key: str = None, base_url: str = None):
self.api_key = api_key or KIMI_API_KEY
self.base_url = base_url or KIMI_BASE_URL
self.headers = {"Authorization": f"Bearer {self.api_key}", "Content-Type": "application/json"}
async def _call_llm(self, prompt: str, temperature: float = 0.3) -> str:
"""调用 LLM"""
if not self.api_key:
raise ValueError("KIMI_API_KEY not set")
payload = {"model": "k2p5", "messages": [{"role": "user", "content": prompt}], "temperature": temperature}
async with httpx.AsyncClient() as client:
response = await client.post(
f"{self.base_url}/v1/chat/completions", headers=self.headers, json=payload, timeout=120.0
)
response.raise_for_status()
result = response.json()
return result["choices"][0]["message"]["content"]
async def enhanced_qa(
self, query: str, project_context: dict, graph_data: dict, reasoning_depth: str = "medium"
) -> ReasoningResult:
"""
增强问答 - 结合图谱推理的问答
Args:
query: 用户问题
project_context: 项目上下文
graph_data: 知识图谱数据
reasoning_depth: 推理深度 (shallow/medium/deep)
"""
# 1. 分析问题类型
analysis = await self._analyze_question(query)
# 2. 根据问题类型选择推理策略
if analysis["type"] == "causal":
return await self._causal_reasoning(query, project_context, graph_data)
elif analysis["type"] == "comparative":
return await self._comparative_reasoning(query, project_context, graph_data)
elif analysis["type"] == "temporal":
return await self._temporal_reasoning(query, project_context, graph_data)
else:
return await self._associative_reasoning(query, project_context, graph_data)
async def _analyze_question(self, query: str) -> dict:
"""分析问题类型和意图"""
prompt = f"""分析以下问题的类型和意图:
问题:{query}
请返回 JSON 格式:
{{
"type": "causal|comparative|temporal|factual|opinion",
"entities": ["提到的实体"],
"intent": "问题意图描述",
"complexity": "simple|medium|complex"
}}
类型说明:
- causal: 因果类问题(为什么、导致、影响)
- comparative: 对比类问题(区别、比较、优劣)
- temporal: 时序类问题(什么时候、进度、变化)
- factual: 事实类问题(是什么、有哪些)
- opinion: 观点类问题(怎么看、态度、评价)"""
content = await self._call_llm(prompt, temperature=0.1)
json_match = re.search(r"\{{.*?\}}", content, re.DOTALL)
if json_match:
try:
return json.loads(json_match.group())
except (json.JSONDecodeError, KeyError):
pass
return {"type": "factual", "entities": [], "intent": "general", "complexity": "simple"}
async def _causal_reasoning(self, query: str, project_context: dict, graph_data: dict) -> ReasoningResult:
"""因果推理 - 分析原因和影响"""
# 构建因果分析提示
entities_str = json.dumps(graph_data.get("entities", []), ensure_ascii=False, indent=2)
relations_str = json.dumps(graph_data.get("relations", []), ensure_ascii=False, indent=2)
prompt = f"""基于以下知识图谱进行因果推理分析:
## 问题
{query}
## 实体
{entities_str[:2000]}
## 关系
{relations_str[:2000]}
## 项目上下文
{json.dumps(project_context, ensure_ascii=False, indent=2)[:1500]}
请进行因果分析,返回 JSON 格式:
{{
"answer": "详细回答",
"reasoning_chain": ["推理步骤1", "推理步骤2"],
"root_causes": ["根本原因1", "根本原因2"],
"effects": ["影响1", "影响2"],
"confidence": 0.85,
"evidence": ["证据1", "证据2"],
"knowledge_gaps": ["缺失信息1"]
}}"""
content = await self._call_llm(prompt, temperature=0.3)
json_match = re.search(r"\{{.*?\}}", content, re.DOTALL)
if json_match:
try:
data = json.loads(json_match.group())
return ReasoningResult(
answer=data.get("answer", ""),
reasoning_type=ReasoningType.CAUSAL,
confidence=data.get("confidence", 0.7),
evidence=[{"text": e} for e in data.get("evidence", [])],
related_entities=[],
gaps=data.get("knowledge_gaps", []),
)
except (json.JSONDecodeError, KeyError):
pass
return ReasoningResult(
answer=content,
reasoning_type=ReasoningType.CAUSAL,
confidence=0.5,
evidence=[],
related_entities=[],
gaps=["无法完成因果推理"],
)
async def _comparative_reasoning(self, query: str, project_context: dict, graph_data: dict) -> ReasoningResult:
"""对比推理 - 比较实体间的异同"""
prompt = f"""基于以下知识图谱进行对比分析:
## 问题
{query}
## 实体
{json.dumps(graph_data.get("entities", []), ensure_ascii=False, indent=2)[:2000]}
## 关系
{json.dumps(graph_data.get("relations", []), ensure_ascii=False, indent=2)[:1500]}
请进行对比分析,返回 JSON 格式:
{{
"answer": "详细对比分析",
"similarities": ["相似点1", "相似点2"],
"differences": ["差异点1", "差异点2"],
"comparison_table": {{"维度": ["实体A值", "实体B值"]}},
"confidence": 0.85,
"evidence": ["证据1"],
"knowledge_gaps": []
}}"""
content = await self._call_llm(prompt, temperature=0.3)
json_match = re.search(r"\{{.*?\}}", content, re.DOTALL)
if json_match:
try:
data = json.loads(json_match.group())
return ReasoningResult(
answer=data.get("answer", ""),
reasoning_type=ReasoningType.COMPARATIVE,
confidence=data.get("confidence", 0.7),
evidence=[{"text": e} for e in data.get("evidence", [])],
related_entities=[],
gaps=data.get("knowledge_gaps", []),
)
except (json.JSONDecodeError, KeyError):
pass
return ReasoningResult(
answer=content,
reasoning_type=ReasoningType.COMPARATIVE,
confidence=0.5,
evidence=[],
related_entities=[],
gaps=[],
)
async def _temporal_reasoning(self, query: str, project_context: dict, graph_data: dict) -> ReasoningResult:
"""时序推理 - 分析时间线和演变"""
prompt = f"""基于以下知识图谱进行时序分析:
## 问题
{query}
## 项目时间线
{json.dumps(project_context.get("timeline", []), ensure_ascii=False, indent=2)[:2000]}
## 实体提及历史
{json.dumps(graph_data.get("entities", []), ensure_ascii=False, indent=2)[:1500]}
请进行时序分析,返回 JSON 格式:
{{
"answer": "时序分析结果",
"timeline": [{{"date": "时间", "event": "事件", "significance": "重要性"}}],
"trends": ["趋势1", "趋势2"],
"milestones": ["里程碑1"],
"confidence": 0.85,
"evidence": ["证据1"],
"knowledge_gaps": []
}}"""
content = await self._call_llm(prompt, temperature=0.3)
json_match = re.search(r"\{{.*?\}}", content, re.DOTALL)
if json_match:
try:
data = json.loads(json_match.group())
return ReasoningResult(
answer=data.get("answer", ""),
reasoning_type=ReasoningType.TEMPORAL,
confidence=data.get("confidence", 0.7),
evidence=[{"text": e} for e in data.get("evidence", [])],
related_entities=[],
gaps=data.get("knowledge_gaps", []),
)
except (json.JSONDecodeError, KeyError):
pass
return ReasoningResult(
answer=content,
reasoning_type=ReasoningType.TEMPORAL,
confidence=0.5,
evidence=[],
related_entities=[],
gaps=[],
)
async def _associative_reasoning(self, query: str, project_context: dict, graph_data: dict) -> ReasoningResult:
"""关联推理 - 发现实体间的隐含关联"""
prompt = f"""基于以下知识图谱进行关联分析:
## 问题
{query}
## 实体
{json.dumps(graph_data.get("entities", [])[:20], ensure_ascii=False, indent=2)}
## 关系
{json.dumps(graph_data.get("relations", [])[:30], ensure_ascii=False, indent=2)}
请进行关联推理,发现隐含联系,返回 JSON 格式:
{{
"answer": "关联分析结果",
"direct_connections": ["直接关联1"],
"indirect_connections": ["间接关联1"],
"inferred_relations": [{{"source": "A", "target": "B", "relation": "可能关系", "confidence": 0.7}}],
"confidence": 0.85,
"evidence": ["证据1"],
"knowledge_gaps": []
}}"""
content = await self._call_llm(prompt, temperature=0.4)
json_match = re.search(r"\{{.*?\}}", content, re.DOTALL)
if json_match:
try:
data = json.loads(json_match.group())
return ReasoningResult(
answer=data.get("answer", ""),
reasoning_type=ReasoningType.ASSOCIATIVE,
confidence=data.get("confidence", 0.7),
evidence=[{"text": e} for e in data.get("evidence", [])],
related_entities=[],
gaps=data.get("knowledge_gaps", []),
)
except (json.JSONDecodeError, KeyError):
pass
return ReasoningResult(
answer=content,
reasoning_type=ReasoningType.ASSOCIATIVE,
confidence=0.5,
evidence=[],
related_entities=[],
gaps=[],
)
def find_inference_paths(
self, start_entity: str, end_entity: str, graph_data: dict, max_depth: int = 3
) -> list[InferencePath]:
"""
发现两个实体之间的推理路径
使用 BFS 在关系图中搜索路径
"""
relations = graph_data.get("relations", [])
# 构建邻接表
adj = {}
for r in relations:
src = r.get("source_id") or r.get("source")
tgt = r.get("target_id") or r.get("target")
if src not in adj:
adj[src] = []
if tgt not in adj:
adj[tgt] = []
adj[src].append({"target": tgt, "relation": r.get("type", "related"), "data": r})
# 无向图也添加反向
adj[tgt].append({"target": src, "relation": r.get("type", "related"), "data": r, "reverse": True})
# BFS 搜索路径
from collections import deque
paths = []
queue = deque([(start_entity, [{"entity": start_entity, "relation": None}])])
{start_entity}
while queue and len(paths) < 5:
current, path = queue.popleft()
if current == end_entity and len(path) > 1:
# 找到一条路径
paths.append(
InferencePath(
start_entity=start_entity,
end_entity=end_entity,
path=path,
strength=self._calculate_path_strength(path),
)
)
continue
if len(path) >= max_depth:
continue
for neighbor in adj.get(current, []):
next_entity = neighbor["target"]
if next_entity not in [p["entity"] for p in path]: # 避免循环
new_path = path + [
{
"entity": next_entity,
"relation": neighbor["relation"],
"relation_data": neighbor.get("data", {}),
}
]
queue.append((next_entity, new_path))
# 按强度排序
paths.sort(key=lambda p: p.strength, reverse=True)
return paths
def _calculate_path_strength(self, path: list[dict]) -> float:
"""计算路径强度"""
if len(path) < 2:
return 0.0
# 路径越短越强
length_factor = 1.0 / len(path)
# 关系置信度
confidence_sum = 0
confidence_count = 0
for node in path[1:]: # 跳过第一个节点
rel_data = node.get("relation_data", {})
if "confidence" in rel_data:
confidence_sum += rel_data["confidence"]
confidence_count += 1
confidence_factor = (confidence_sum / confidence_count) if confidence_count > 0 else 0.5
return length_factor * confidence_factor
async def summarize_project(
self, project_context: dict, graph_data: dict, summary_type: str = "comprehensive"
) -> dict:
"""
项目智能总结
Args:
summary_type: comprehensive/executive/technical/risk
"""
type_prompts = {
"comprehensive": "全面总结项目的所有方面",
"executive": "高管摘要,关注关键决策和风险",
"technical": "技术总结,关注架构和技术栈",
"risk": "风险分析,关注潜在问题和依赖",
}
prompt = f"""请对以下项目进行{type_prompts.get(summary_type, "全面总结")}
## 项目信息
{json.dumps(project_context, ensure_ascii=False, indent=2)[:3000]}
## 知识图谱
实体数: {len(graph_data.get("entities", []))}
关系数: {len(graph_data.get("relations", []))}
请返回 JSON 格式:
{{
"overview": "项目概述",
"key_points": ["要点1", "要点2"],
"key_entities": ["关键实体1"],
"risks": ["风险1"],
"recommendations": ["建议1"],
"confidence": 0.85
}}"""
content = await self._call_llm(prompt, temperature=0.3)
json_match = re.search(r"\{{.*?\}}", content, re.DOTALL)
if json_match:
try:
return json.loads(json_match.group())
except (json.JSONDecodeError, KeyError):
pass
return {
"overview": content,
"key_points": [],
"key_entities": [],
"risks": [],
"recommendations": [],
"confidence": 0.5,
}
# Lazily-created, process-wide singleton instance.
_reasoner = None


def get_knowledge_reasoner() -> KnowledgeReasoner:
    """Return the shared KnowledgeReasoner, creating it on first use."""
    global _reasoner
    if _reasoner is None:
        _reasoner = KnowledgeReasoner()
    return _reasoner