#!/usr/bin/env python3
"""
InsightFlow Knowledge Reasoning - Phase 5

知识推理与问答增强模块
(Knowledge reasoning and Q&A enhancement module)
"""
import json
import os
import re
from dataclasses import dataclass
from enum import Enum

import httpx

# Kimi API configuration, read from the environment with safe defaults.
KIMI_API_KEY = os.getenv("KIMI_API_KEY", "")
KIMI_BASE_URL = os.getenv("KIMI_BASE_URL", "https://api.kimi.com/coding")
class ReasoningType(Enum):
    """Reasoning strategies supported by the knowledge reasoner."""

    CAUSAL = "causal"  # causal reasoning (why / leads-to / impact)
    ASSOCIATIVE = "associative"  # associative reasoning (hidden links)
    TEMPORAL = "temporal"  # temporal reasoning (timeline / evolution)
    COMPARATIVE = "comparative"  # comparative reasoning (similarities / differences)
    SUMMARY = "summary"  # summary reasoning


@dataclass
class ReasoningResult:
    """Outcome of a single reasoning pass over the knowledge graph."""

    answer: str  # natural-language answer text
    reasoning_type: ReasoningType  # strategy that produced the answer
    confidence: float  # reported confidence in [0, 1]
    evidence: list[dict]  # supporting evidence items, e.g. {"text": ...}
    related_entities: list[str]  # entities relevant to the answer
    gaps: list[str]  # identified knowledge gaps
||
@dataclass
class InferencePath:
    """A chain of relations connecting two entities in the knowledge graph."""

    start_entity: str  # path origin
    end_entity: str  # path destination
    path: list[dict]  # nodes and relations along the path
    strength: float  # path strength (shorter + higher confidence = stronger)
||
class KnowledgeReasoner:
    """Knowledge reasoning engine.

    Combines an LLM (Kimi chat-completions API) with knowledge-graph data to
    answer questions via causal, comparative, temporal and associative
    reasoning, and to discover inference paths between graph entities.
    """

    def __init__(self, api_key: str | None = None, base_url: str | None = None):
        """Create a reasoner; falls back to the KIMI_API_KEY / KIMI_BASE_URL env config."""
        self.api_key = api_key or KIMI_API_KEY
        self.base_url = base_url or KIMI_BASE_URL
        self.headers = {"Authorization": f"Bearer {self.api_key}", "Content-Type": "application/json"}

    async def _call_llm(self, prompt: str, temperature: float = 0.3) -> str:
        """Send a single-user-message chat completion and return the reply text.

        Raises:
            ValueError: if no API key is configured.
            httpx.HTTPStatusError: if the API responds with an error status.
        """
        if not self.api_key:
            raise ValueError("KIMI_API_KEY not set")

        payload = {"model": "k2p5", "messages": [{"role": "user", "content": prompt}], "temperature": temperature}

        async with httpx.AsyncClient() as client:
            response = await client.post(
                f"{self.base_url}/v1/chat/completions", headers=self.headers, json=payload, timeout=120.0
            )
            response.raise_for_status()
            result = response.json()
            return result["choices"][0]["message"]["content"]

    @staticmethod
    def _extract_json(content: str) -> dict | None:
        r"""Extract the first JSON object embedded in LLM output, or None.

        BUGFIX: the previous pattern ``r"\{{.*?\}}"`` was a raw string (not an
        f-string), so it demanded literal double braces and could never match
        normal single-brace JSON; the non-greedy ``.*?`` would also truncate
        nested objects at the first ``}``.  A greedy match is tried first so
        nested objects are captured whole; the non-greedy form is a fallback
        for output with trailing junk after the object.
        """
        for pattern in (r"\{.*\}", r"\{.*?\}"):
            match = re.search(pattern, content, re.DOTALL)
            if match:
                try:
                    return json.loads(match.group())
                except json.JSONDecodeError:
                    continue
        return None

    def _result_from_content(
        self, content: str, rtype: ReasoningType, fallback_gaps: list[str]
    ) -> ReasoningResult:
        """Build a ReasoningResult from raw LLM output.

        Structured fields are taken from embedded JSON when present; otherwise
        the raw text becomes the answer with a low (0.5) confidence.
        """
        data = self._extract_json(content)
        if data is not None:
            try:
                return ReasoningResult(
                    answer=data.get("answer", ""),
                    reasoning_type=rtype,
                    confidence=data.get("confidence", 0.7),
                    evidence=[{"text": e} for e in data.get("evidence", [])],
                    related_entities=[],
                    gaps=data.get("knowledge_gaps", []),
                )
            except TypeError:  # e.g. "evidence" was not iterable
                pass
        return ReasoningResult(
            answer=content,
            reasoning_type=rtype,
            confidence=0.5,
            evidence=[],
            related_entities=[],
            gaps=fallback_gaps,
        )

    async def enhanced_qa(
        self, query: str, project_context: dict, graph_data: dict, reasoning_depth: str = "medium"
    ) -> ReasoningResult:
        """
        Enhanced Q&A - graph-assisted reasoning over a user question.

        Args:
            query: the user question
            project_context: project context dict
            graph_data: knowledge-graph data ({"entities": [...], "relations": [...]})
            reasoning_depth: reasoning depth (shallow/medium/deep) — currently unused
        """
        # 1. Classify the question to pick a reasoning strategy.
        analysis = await self._analyze_question(query)

        # 2. Dispatch by question type (robust to a missing "type" key —
        #    the old analysis["type"] could raise KeyError on partial JSON).
        qtype = analysis.get("type", "factual")
        if qtype == "causal":
            return await self._causal_reasoning(query, project_context, graph_data)
        if qtype == "comparative":
            return await self._comparative_reasoning(query, project_context, graph_data)
        if qtype == "temporal":
            return await self._temporal_reasoning(query, project_context, graph_data)
        return await self._associative_reasoning(query, project_context, graph_data)

    async def _analyze_question(self, query: str) -> dict:
        """Classify the question's type/intent; returns a safe default on parse failure."""
        prompt = f"""分析以下问题的类型和意图:

问题:{query}

请返回 JSON 格式:
{{
    "type": "causal|comparative|temporal|factual|opinion",
    "entities": ["提到的实体"],
    "intent": "问题意图描述",
    "complexity": "simple|medium|complex"
}}

类型说明:
- causal: 因果类问题(为什么、导致、影响)
- comparative: 对比类问题(区别、比较、优劣)
- temporal: 时序类问题(什么时候、进度、变化)
- factual: 事实类问题(是什么、有哪些)
- opinion: 观点类问题(怎么看、态度、评价)"""

        content = await self._call_llm(prompt, temperature=0.1)

        data = self._extract_json(content)
        if data is not None:
            return data
        return {"type": "factual", "entities": [], "intent": "general", "complexity": "simple"}

    async def _causal_reasoning(self, query: str, project_context: dict, graph_data: dict) -> ReasoningResult:
        """Causal reasoning - analyze root causes and downstream effects."""
        entities_str = json.dumps(graph_data.get("entities", []), ensure_ascii=False, indent=2)
        relations_str = json.dumps(graph_data.get("relations", []), ensure_ascii=False, indent=2)

        prompt = f"""基于以下知识图谱进行因果推理分析:

## 问题
{query}

## 实体
{entities_str[:2000]}

## 关系
{relations_str[:2000]}

## 项目上下文
{json.dumps(project_context, ensure_ascii=False, indent=2)[:1500]}

请进行因果分析,返回 JSON 格式:
{{
    "answer": "详细回答",
    "reasoning_chain": ["推理步骤1", "推理步骤2"],
    "root_causes": ["根本原因1", "根本原因2"],
    "effects": ["影响1", "影响2"],
    "confidence": 0.85,
    "evidence": ["证据1", "证据2"],
    "knowledge_gaps": ["缺失信息1"]
}}"""

        content = await self._call_llm(prompt, temperature=0.3)
        return self._result_from_content(content, ReasoningType.CAUSAL, ["无法完成因果推理"])

    async def _comparative_reasoning(self, query: str, project_context: dict, graph_data: dict) -> ReasoningResult:
        """Comparative reasoning - similarities and differences between entities."""
        prompt = f"""基于以下知识图谱进行对比分析:

## 问题
{query}

## 实体
{json.dumps(graph_data.get("entities", []), ensure_ascii=False, indent=2)[:2000]}

## 关系
{json.dumps(graph_data.get("relations", []), ensure_ascii=False, indent=2)[:1500]}

请进行对比分析,返回 JSON 格式:
{{
    "answer": "详细对比分析",
    "similarities": ["相似点1", "相似点2"],
    "differences": ["差异点1", "差异点2"],
    "comparison_table": {{"维度": ["实体A值", "实体B值"]}},
    "confidence": 0.85,
    "evidence": ["证据1"],
    "knowledge_gaps": []
}}"""

        content = await self._call_llm(prompt, temperature=0.3)
        return self._result_from_content(content, ReasoningType.COMPARATIVE, [])

    async def _temporal_reasoning(self, query: str, project_context: dict, graph_data: dict) -> ReasoningResult:
        """Temporal reasoning - timelines, trends and evolution."""
        prompt = f"""基于以下知识图谱进行时序分析:

## 问题
{query}

## 项目时间线
{json.dumps(project_context.get("timeline", []), ensure_ascii=False, indent=2)[:2000]}

## 实体提及历史
{json.dumps(graph_data.get("entities", []), ensure_ascii=False, indent=2)[:1500]}

请进行时序分析,返回 JSON 格式:
{{
    "answer": "时序分析结果",
    "timeline": [{{"date": "时间", "event": "事件", "significance": "重要性"}}],
    "trends": ["趋势1", "趋势2"],
    "milestones": ["里程碑1"],
    "confidence": 0.85,
    "evidence": ["证据1"],
    "knowledge_gaps": []
}}"""

        content = await self._call_llm(prompt, temperature=0.3)
        return self._result_from_content(content, ReasoningType.TEMPORAL, [])

    async def _associative_reasoning(self, query: str, project_context: dict, graph_data: dict) -> ReasoningResult:
        """Associative reasoning - surface implicit links between entities."""
        prompt = f"""基于以下知识图谱进行关联分析:

## 问题
{query}

## 实体
{json.dumps(graph_data.get("entities", [])[:20], ensure_ascii=False, indent=2)}

## 关系
{json.dumps(graph_data.get("relations", [])[:30], ensure_ascii=False, indent=2)}

请进行关联推理,发现隐含联系,返回 JSON 格式:
{{
    "answer": "关联分析结果",
    "direct_connections": ["直接关联1"],
    "indirect_connections": ["间接关联1"],
    "inferred_relations": [{{"source": "A", "target": "B", "relation": "可能关系", "confidence": 0.7}}],
    "confidence": 0.85,
    "evidence": ["证据1"],
    "knowledge_gaps": []
}}"""

        content = await self._call_llm(prompt, temperature=0.4)
        return self._result_from_content(content, ReasoningType.ASSOCIATIVE, [])

    def find_inference_paths(
        self, start_entity: str, end_entity: str, graph_data: dict, max_depth: int = 3
    ) -> list[InferencePath]:
        """
        Find inference paths between two entities.

        BFS over the relation graph (treated as undirected); returns at most 5
        paths, sorted by descending strength.
        """
        relations = graph_data.get("relations", [])

        # Build an adjacency list; each edge is traversable in both directions.
        adj: dict = {}
        for r in relations:
            src = r.get("source_id") or r.get("source")
            tgt = r.get("target_id") or r.get("target")
            adj.setdefault(src, []).append({"target": tgt, "relation": r.get("type", "related"), "data": r})
            adj.setdefault(tgt, []).append(
                {"target": src, "relation": r.get("type", "related"), "data": r, "reverse": True}
            )

        from collections import deque

        # BUGFIX: the original had a stray discarded set literal `{start_entity}`
        # here; cycle avoidance is done per-path below, so it is simply removed.
        paths: list[InferencePath] = []
        queue = deque([(start_entity, [{"entity": start_entity, "relation": None}])])

        while queue and len(paths) < 5:
            current, path = queue.popleft()

            if current == end_entity and len(path) > 1:
                # Reached the target: record the path.
                paths.append(
                    InferencePath(
                        start_entity=start_entity,
                        end_entity=end_entity,
                        path=path,
                        strength=self._calculate_path_strength(path),
                    )
                )
                continue

            if len(path) >= max_depth:
                continue

            # Set built once per expansion (was an O(n) list scan per neighbor).
            seen = {step["entity"] for step in path}
            for neighbor in adj.get(current, []):
                next_entity = neighbor["target"]
                if next_entity not in seen:  # avoid cycles within a path
                    new_path = path + [
                        {
                            "entity": next_entity,
                            "relation": neighbor["relation"],
                            "relation_data": neighbor.get("data", {}),
                        }
                    ]
                    queue.append((next_entity, new_path))

        # Strongest paths first.
        paths.sort(key=lambda p: p.strength, reverse=True)
        return paths

    def _calculate_path_strength(self, path: list[dict]) -> float:
        """Score a path: shorter paths and higher-confidence relations are stronger."""
        if len(path) < 2:
            return 0.0

        # Shorter paths are stronger.
        length_factor = 1.0 / len(path)

        # Average the confidences of relations that report one (default 0.5).
        confidence_sum = 0
        confidence_count = 0
        for node in path[1:]:  # skip the start node (it carries no relation)
            rel_data = node.get("relation_data", {})
            if "confidence" in rel_data:
                confidence_sum += rel_data["confidence"]
                confidence_count += 1

        confidence_factor = (confidence_sum / confidence_count) if confidence_count > 0 else 0.5

        return length_factor * confidence_factor

    async def summarize_project(
        self, project_context: dict, graph_data: dict, summary_type: str = "comprehensive"
    ) -> dict:
        """
        Smart project summarization.

        Args:
            project_context: project context dict
            graph_data: knowledge-graph data
            summary_type: comprehensive/executive/technical/risk
        """
        type_prompts = {
            "comprehensive": "全面总结项目的所有方面",
            "executive": "高管摘要,关注关键决策和风险",
            "technical": "技术总结,关注架构和技术栈",
            "risk": "风险分析,关注潜在问题和依赖",
        }

        prompt = f"""请对以下项目进行{type_prompts.get(summary_type, "全面总结")}:

## 项目信息
{json.dumps(project_context, ensure_ascii=False, indent=2)[:3000]}

## 知识图谱
实体数: {len(graph_data.get("entities", []))}
关系数: {len(graph_data.get("relations", []))}

请返回 JSON 格式:
{{
    "overview": "项目概述",
    "key_points": ["要点1", "要点2"],
    "key_entities": ["关键实体1"],
    "risks": ["风险1"],
    "recommendations": ["建议1"],
    "confidence": 0.85
}}"""

        content = await self._call_llm(prompt, temperature=0.3)

        data = self._extract_json(content)
        if data is not None:
            return data

        # Fallback: return the raw text as the overview with low confidence.
        return {
            "overview": content,
            "key_points": [],
            "key_entities": [],
            "risks": [],
            "recommendations": [],
            "confidence": 0.5,
        }
|
||
|
||
# Lazily-created module-level singleton.
_reasoner = None


def get_knowledge_reasoner() -> KnowledgeReasoner:
    """Return the shared KnowledgeReasoner, creating it on first use."""
    global _reasoner
    if _reasoner is None:
        _reasoner = KnowledgeReasoner()
    return _reasoner