Phase 5: 知识推理与问答增强

- 新增 knowledge_reasoner.py 推理引擎
- 支持因果/对比/时序/关联四种推理类型
- 智能项目总结 API (全面/高管/技术/风险)
- 实体关联路径发现功能
- 前端推理面板 UI 和交互
- 更新 API 端点和健康检查

Refs: Phase 5 开发任务
This commit is contained in:
OpenClaw Bot
2026-02-19 18:07:00 +08:00
parent cfdf37fc31
commit 9dd54b3a38
13 changed files with 1286 additions and 11 deletions

Binary file not shown.

Binary file not shown.

Binary file not shown.

View File

@@ -0,0 +1,533 @@
#!/usr/bin/env python3
"""
InsightFlow Knowledge Reasoning - Phase 5
知识推理与问答增强模块
"""
import json
import os
import re
from collections import deque
from dataclasses import dataclass
from enum import Enum
from typing import List, Dict, Optional, Any

import httpx
# Kimi LLM credentials and endpoint, overridable via environment variables.
# An empty API key makes KnowledgeReasoner._call_llm raise ValueError.
KIMI_API_KEY = os.getenv("KIMI_API_KEY", "")
KIMI_BASE_URL = os.getenv("KIMI_BASE_URL", "https://api.kimi.com/coding")
class ReasoningType(Enum):
    """Reasoning strategy tag carried by a ReasoningResult."""
    CAUSAL = "causal"            # cause-and-effect analysis
    ASSOCIATIVE = "associative"  # implicit-connection discovery
    TEMPORAL = "temporal"        # timeline / evolution analysis
    COMPARATIVE = "comparative"  # similarity / difference comparison
    SUMMARY = "summary"          # summarization
@dataclass
class ReasoningResult:
    """Outcome of one reasoning pass over the knowledge graph."""
    answer: str                    # natural-language answer text
    reasoning_type: ReasoningType  # strategy that produced the answer
    confidence: float              # model-reported confidence (presumably in [0, 1] — not validated here)
    evidence: List[Dict]           # supporting evidence items, shaped like {"text": ...}
    related_entities: List[str]    # entities involved in the reasoning
    gaps: List[str]                # identified knowledge gaps
@dataclass
class InferencePath:
    """One path between two entities found in the relation graph."""
    start_entity: str  # origin entity id
    end_entity: str    # destination entity id
    path: List[Dict]   # ordered steps: {"entity", "relation", "relation_data"}
    strength: float    # higher = shorter path with more confident relations
class KnowledgeReasoner:
    """Knowledge reasoning engine.

    Combines project knowledge-graph data with an LLM chat-completions API to
    answer questions via causal / comparative / temporal / associative
    strategies, discover inference paths between entities, and produce
    project-level summaries.
    """

    def __init__(self, api_key: str = None, base_url: str = None):
        """Both arguments fall back to the KIMI_* environment settings."""
        self.api_key = api_key or KIMI_API_KEY
        self.base_url = base_url or KIMI_BASE_URL
        self.headers = {
            "Authorization": f"Bearer {self.api_key}",
            "Content-Type": "application/json"
        }

    async def _call_llm(self, prompt: str, temperature: float = 0.3) -> str:
        """Send a single-turn prompt to the chat-completions endpoint.

        Returns the assistant message content.

        Raises:
            ValueError: when no API key is configured.
            httpx.HTTPStatusError: on a non-2xx HTTP response.
        """
        if not self.api_key:
            raise ValueError("KIMI_API_KEY not set")
        payload = {
            "model": "k2p5",
            "messages": [{"role": "user", "content": prompt}],
            "temperature": temperature
        }
        async with httpx.AsyncClient() as client:
            response = await client.post(
                f"{self.base_url}/v1/chat/completions",
                headers=self.headers,
                json=payload,
                timeout=120.0
            )
            response.raise_for_status()
            result = response.json()
            return result["choices"][0]["message"]["content"]

    @staticmethod
    def _extract_json(content: str) -> Optional[Dict]:
        r"""Parse the first JSON object embedded in an LLM reply, else None.

        BUG FIX: the original pattern r'\{{.*?\}}' matched only a literal
        double-brace pair (an f-string-escaping artifact copied into a plain
        regex), so ordinary single-brace JSON replies never parsed and every
        caller silently fell to its low-confidence fallback. The greedy match
        also keeps nested objects intact, which the non-greedy version
        truncated at the first closing brace.
        """
        match = re.search(r'\{.*\}', content, re.DOTALL)
        if match:
            try:
                data = json.loads(match.group())
                if isinstance(data, dict):
                    return data
            except json.JSONDecodeError:
                # Malformed JSON from the model; caller uses its fallback.
                pass
        return None

    def _build_result(
        self,
        content: str,
        reasoning_type: ReasoningType,
        fallback_gaps: Optional[List[str]] = None
    ) -> ReasoningResult:
        """Convert a raw LLM reply into a ReasoningResult.

        Uses the structured JSON payload when one can be extracted; otherwise
        returns the raw reply text with lowered confidence and *fallback_gaps*.
        """
        data = self._extract_json(content)
        if data is not None:
            return ReasoningResult(
                answer=data.get("answer", ""),
                reasoning_type=reasoning_type,
                confidence=data.get("confidence", 0.7),
                evidence=[{"text": e} for e in data.get("evidence", [])],
                related_entities=[],
                gaps=data.get("knowledge_gaps", [])
            )
        return ReasoningResult(
            answer=content,
            reasoning_type=reasoning_type,
            confidence=0.5,
            evidence=[],
            related_entities=[],
            gaps=list(fallback_gaps or [])
        )

    async def enhanced_qa(
        self,
        query: str,
        project_context: Dict,
        graph_data: Dict,
        reasoning_depth: str = "medium"
    ) -> ReasoningResult:
        """Graph-aware enhanced Q&A.

        Args:
            query: user question.
            project_context: project context passed through to the strategy.
            graph_data: knowledge-graph payload ({"entities", "relations"}).
            reasoning_depth: shallow/medium/deep — accepted for API
                compatibility; NOTE(review): not consulted by any strategy yet.
        """
        # 1. Classify the question, then 2. dispatch to the matching strategy.
        analysis = await self._analyze_question(query)
        handlers = {
            "causal": self._causal_reasoning,
            "comparative": self._comparative_reasoning,
            "temporal": self._temporal_reasoning,
        }
        # "factual"/"opinion"/anything else falls back to associative reasoning.
        handler = handlers.get(analysis["type"], self._associative_reasoning)
        return await handler(query, project_context, graph_data)

    async def _analyze_question(self, query: str) -> Dict:
        """Classify the question's type and intent.

        Returns the model's JSON classification, or a safe "factual/simple"
        default when the reply cannot be parsed.
        """
        prompt = f"""分析以下问题的类型和意图:
问题:{query}
请返回 JSON 格式:
{{
"type": "causal|comparative|temporal|factual|opinion",
"entities": ["提到的实体"],
"intent": "问题意图描述",
"complexity": "simple|medium|complex"
}}
类型说明:
- causal: 因果类问题(为什么、导致、影响)
- comparative: 对比类问题(区别、比较、优劣)
- temporal: 时序类问题(什么时候、进度、变化)
- factual: 事实类问题(是什么、有哪些)
- opinion: 观点类问题(怎么看、态度、评价)"""
        content = await self._call_llm(prompt, temperature=0.1)
        data = self._extract_json(content)
        if data is not None:
            return data
        return {"type": "factual", "entities": [], "intent": "general", "complexity": "simple"}

    async def _causal_reasoning(
        self,
        query: str,
        project_context: Dict,
        graph_data: Dict
    ) -> ReasoningResult:
        """Causal reasoning — analyze root causes and downstream effects."""
        # Graph payloads are truncated below to keep the prompt bounded.
        entities_str = json.dumps(graph_data.get("entities", []), ensure_ascii=False, indent=2)
        relations_str = json.dumps(graph_data.get("relations", []), ensure_ascii=False, indent=2)
        prompt = f"""基于以下知识图谱进行因果推理分析:
## 问题
{query}
## 实体
{entities_str[:2000]}
## 关系
{relations_str[:2000]}
## 项目上下文
{json.dumps(project_context, ensure_ascii=False, indent=2)[:1500]}
请进行因果分析,返回 JSON 格式:
{{
"answer": "详细回答",
"reasoning_chain": ["推理步骤1", "推理步骤2"],
"root_causes": ["根本原因1", "根本原因2"],
"effects": ["影响1", "影响2"],
"confidence": 0.85,
"evidence": ["证据1", "证据2"],
"knowledge_gaps": ["缺失信息1"]
}}"""
        content = await self._call_llm(prompt, temperature=0.3)
        return self._build_result(content, ReasoningType.CAUSAL,
                                  fallback_gaps=["无法完成因果推理"])

    async def _comparative_reasoning(
        self,
        query: str,
        project_context: Dict,
        graph_data: Dict
    ) -> ReasoningResult:
        """Comparative reasoning — similarities and differences between entities."""
        prompt = f"""基于以下知识图谱进行对比分析:
## 问题
{query}
## 实体
{json.dumps(graph_data.get("entities", []), ensure_ascii=False, indent=2)[:2000]}
## 关系
{json.dumps(graph_data.get("relations", []), ensure_ascii=False, indent=2)[:1500]}
请进行对比分析,返回 JSON 格式:
{{
"answer": "详细对比分析",
"similarities": ["相似点1", "相似点2"],
"differences": ["差异点1", "差异点2"],
"comparison_table": {{"维度": ["实体A值", "实体B值"]}},
"confidence": 0.85,
"evidence": ["证据1"],
"knowledge_gaps": []
}}"""
        content = await self._call_llm(prompt, temperature=0.3)
        return self._build_result(content, ReasoningType.COMPARATIVE)

    async def _temporal_reasoning(
        self,
        query: str,
        project_context: Dict,
        graph_data: Dict
    ) -> ReasoningResult:
        """Temporal reasoning — timelines, trends and milestones."""
        prompt = f"""基于以下知识图谱进行时序分析:
## 问题
{query}
## 项目时间线
{json.dumps(project_context.get("timeline", []), ensure_ascii=False, indent=2)[:2000]}
## 实体提及历史
{json.dumps(graph_data.get("entities", []), ensure_ascii=False, indent=2)[:1500]}
请进行时序分析,返回 JSON 格式:
{{
"answer": "时序分析结果",
"timeline": [{{"date": "时间", "event": "事件", "significance": "重要性"}}],
"trends": ["趋势1", "趋势2"],
"milestones": ["里程碑1"],
"confidence": 0.85,
"evidence": ["证据1"],
"knowledge_gaps": []
}}"""
        content = await self._call_llm(prompt, temperature=0.3)
        return self._build_result(content, ReasoningType.TEMPORAL)

    async def _associative_reasoning(
        self,
        query: str,
        project_context: Dict,
        graph_data: Dict
    ) -> ReasoningResult:
        """Associative reasoning — surface implicit links between entities."""
        prompt = f"""基于以下知识图谱进行关联分析:
## 问题
{query}
## 实体
{json.dumps(graph_data.get("entities", [])[:20], ensure_ascii=False, indent=2)}
## 关系
{json.dumps(graph_data.get("relations", [])[:30], ensure_ascii=False, indent=2)}
请进行关联推理,发现隐含联系,返回 JSON 格式:
{{
"answer": "关联分析结果",
"direct_connections": ["直接关联1"],
"indirect_connections": ["间接关联1"],
"inferred_relations": [{{"source": "A", "target": "B", "relation": "可能关系", "confidence": 0.7}}],
"confidence": 0.85,
"evidence": ["证据1"],
"knowledge_gaps": []
}}"""
        # Slightly higher temperature: this strategy is exploratory by design.
        content = await self._call_llm(prompt, temperature=0.4)
        return self._build_result(content, ReasoningType.ASSOCIATIVE)

    def find_inference_paths(
        self,
        start_entity: str,
        end_entity: str,
        graph_data: Dict,
        max_depth: int = 3
    ) -> List[InferencePath]:
        """Find inference paths between two entities via BFS.

        Relations are treated as undirected. Collects at most 5 paths and
        returns them strongest-first. `max_depth` counts nodes on the path,
        so the default of 3 allows at most two hops.
        """
        relations = graph_data.get("relations", [])
        # Build an undirected adjacency list keyed by entity id.
        adj: Dict[Any, List[Dict]] = {}
        for rel in relations:
            src = rel.get("source_id") or rel.get("source")
            tgt = rel.get("target_id") or rel.get("target")
            rel_type = rel.get("type", "related")
            adj.setdefault(src, []).append({"target": tgt, "relation": rel_type, "data": rel})
            adj.setdefault(tgt, []).append({"target": src, "relation": rel_type, "data": rel, "reverse": True})
        paths: List[InferencePath] = []
        queue = deque([(start_entity, [{"entity": start_entity, "relation": None}])])
        while queue and len(paths) < 5:
            current, path = queue.popleft()
            if current == end_entity and len(path) > 1:
                paths.append(InferencePath(
                    start_entity=start_entity,
                    end_entity=end_entity,
                    path=path,
                    strength=self._calculate_path_strength(path)
                ))
                continue
            if len(path) >= max_depth:
                continue
            # Per-path visited set: allows distinct paths to share nodes while
            # preventing cycles within a single path (O(1) membership tests).
            on_path = {step["entity"] for step in path}
            for neighbor in adj.get(current, []):
                nxt = neighbor["target"]
                if nxt not in on_path:
                    queue.append((nxt, path + [{
                        "entity": nxt,
                        "relation": neighbor["relation"],
                        "relation_data": neighbor.get("data", {})
                    }]))
        # Strongest path first.
        paths.sort(key=lambda p: p.strength, reverse=True)
        return paths

    def _calculate_path_strength(self, path: List[Dict]) -> float:
        """Score a path: shorter paths and higher relation confidence win.

        strength = (1 / path length) * mean relation confidence, where steps
        without a "confidence" field are excluded from the mean and a path
        with no confidences at all defaults to 0.5.
        """
        if len(path) < 2:
            return 0.0
        length_factor = 1.0 / len(path)
        confidence_sum = 0
        confidence_count = 0
        for node in path[1:]:  # skip the start node: it carries no relation
            rel_data = node.get("relation_data", {})
            if "confidence" in rel_data:
                confidence_sum += rel_data["confidence"]
                confidence_count += 1
        confidence_factor = (confidence_sum / confidence_count) if confidence_count > 0 else 0.5
        return length_factor * confidence_factor

    async def summarize_project(
        self,
        project_context: Dict,
        graph_data: Dict,
        summary_type: str = "comprehensive"
    ) -> Dict:
        """Generate an LLM project summary.

        Args:
            summary_type: comprehensive/executive/technical/risk; unknown
                values fall back to the comprehensive style.

        Returns the model's JSON summary, or a fallback dict carrying the raw
        reply as "overview" with confidence 0.5.
        """
        type_prompts = {
            "comprehensive": "全面总结项目的所有方面",
            "executive": "高管摘要,关注关键决策和风险",
            "technical": "技术总结,关注架构和技术栈",
            "risk": "风险分析,关注潜在问题和依赖"
        }
        prompt = f"""请对以下项目进行{type_prompts.get(summary_type, "全面总结")}
## 项目信息
{json.dumps(project_context, ensure_ascii=False, indent=2)[:3000]}
## 知识图谱
实体数: {len(graph_data.get("entities", []))}
关系数: {len(graph_data.get("relations", []))}
请返回 JSON 格式:
{{
"overview": "项目概述",
"key_points": ["要点1", "要点2"],
"key_entities": ["关键实体1"],
"risks": ["风险1"],
"recommendations": ["建议1"],
"confidence": 0.85
}}"""
        content = await self._call_llm(prompt, temperature=0.3)
        data = self._extract_json(content)
        if data is not None:
            return data
        return {
            "overview": content,
            "key_points": [],
            "key_entities": [],
            "risks": [],
            "recommendations": [],
            "confidence": 0.5
        }
# Lazily-created process-wide singleton.
_reasoner = None


def get_knowledge_reasoner() -> KnowledgeReasoner:
    """Return the shared KnowledgeReasoner, constructing it on first use."""
    global _reasoner
    if _reasoner is None:
        _reasoner = KnowledgeReasoner()
    return _reasoner

View File

@@ -61,6 +61,12 @@ try:
except ImportError:
LLM_CLIENT_AVAILABLE = False
try:
from knowledge_reasoner import get_knowledge_reasoner, KnowledgeReasoner, ReasoningType
REASONER_AVAILABLE = True
except ImportError:
REASONER_AVAILABLE = False
app = FastAPI(title="InsightFlow", version="0.3.0")
app.add_middleware(
@@ -983,14 +989,15 @@ async def get_entity_mentions(entity_id: str):
async def health_check():
return {
"status": "ok",
"version": "0.5.0",
"phase": "Phase 5 - Timeline View",
"version": "0.6.0",
"phase": "Phase 5 - Knowledge Reasoning",
"oss_available": OSS_AVAILABLE,
"tingwu_available": TINGWU_AVAILABLE,
"db_available": DB_AVAILABLE,
"doc_processor_available": DOC_PROCESSOR_AVAILABLE,
"aligner_available": ALIGNER_AVAILABLE,
"llm_client_available": LLM_CLIENT_AVAILABLE
"llm_client_available": LLM_CLIENT_AVAILABLE,
"reasoner_available": REASONER_AVAILABLE
}
@@ -1336,6 +1343,164 @@ async def get_entity_timeline(entity_id: str):
}
# ==================== Phase 5: 知识推理与问答增强 API ====================
class ReasoningQuery(BaseModel):
    """Request body for POST .../reasoning/query."""
    query: str  # the user's question
    reasoning_depth: str = "medium"  # shallow/medium/deep
    # NOTE(review): `stream` is never read by the handler below — confirm
    # whether streaming responses are still planned.
    stream: bool = False
@app.post("/api/v1/projects/{project_id}/reasoning/query")
async def reasoning_query(project_id: str, query: ReasoningQuery):
    """
    Enhanced Q&A — knowledge-reasoning-backed question answering.

    Supported reasoning types:
    - causal: analyze causes and effects
    - comparative: compare entities
    - temporal: analyze timelines and evolution
    - associative: discover implicit connections

    Raises 500 when the DB layer or reasoner module failed to import,
    404 when the project does not exist.
    """
    if not DB_AVAILABLE or not REASONER_AVAILABLE:
        raise HTTPException(status_code=500, detail="Knowledge reasoner not available")
    db = get_db_manager()
    reasoner = get_knowledge_reasoner()
    project = db.get_project(project_id)
    if not project:
        raise HTTPException(status_code=404, detail="Project not found")
    # Project context (summary) feeds the LLM prompts alongside the graph.
    project_context = db.get_project_summary(project_id)
    # Knowledge-graph payload: flattened entity dicts plus raw relations.
    entities = db.list_project_entities(project_id)
    relations = db.list_project_relations(project_id)
    graph_data = {
        "entities": [{"id": e.id, "name": e.name, "type": e.type, "definition": e.definition} for e in entities],
        "relations": relations
    }
    # Run the reasoning pipeline (question analysis -> strategy dispatch).
    result = await reasoner.enhanced_qa(
        query=query.query,
        project_context=project_context,
        graph_data=graph_data,
        reasoning_depth=query.reasoning_depth
    )
    return {
        "answer": result.answer,
        "reasoning_type": result.reasoning_type.value,
        "confidence": result.confidence,
        "evidence": result.evidence,
        "knowledge_gaps": result.gaps,
        "project_id": project_id
    }
@app.post("/api/v1/projects/{project_id}/reasoning/inference-path")
async def find_inference_path(
    project_id: str,
    start_entity: str,
    end_entity: str
):
    """
    Discover inference paths between two entities.

    Searches the project's knowledge graph for paths from `start_entity`
    to `end_entity` and returns at most 5, each with a strength score and
    a human-readable "A -> B -> C" description.

    Raises 500 when the DB layer or reasoner module failed to import,
    404 when the project does not exist.
    """
    if not DB_AVAILABLE or not REASONER_AVAILABLE:
        raise HTTPException(status_code=500, detail="Knowledge reasoner not available")
    db = get_db_manager()
    reasoner = get_knowledge_reasoner()
    project = db.get_project(project_id)
    if not project:
        raise HTTPException(status_code=404, detail="Project not found")
    # Knowledge-graph payload for the path search (definitions not needed here).
    entities = db.list_project_entities(project_id)
    relations = db.list_project_relations(project_id)
    graph_data = {
        "entities": [{"id": e.id, "name": e.name, "type": e.type} for e in entities],
        "relations": relations
    }
    # BFS over the relation graph (see KnowledgeReasoner.find_inference_paths).
    paths = reasoner.find_inference_paths(start_entity, end_entity, graph_data)
    return {
        "start_entity": start_entity,
        "end_entity": end_entity,
        "paths": [
            {
                "path": path.path,
                "strength": path.strength,
                "path_description": " -> ".join([p["entity"] for p in path.path])
            }
            for path in paths[:5]  # cap the response at 5 paths
        ],
        "total_paths": len(paths)
    }
class SummaryRequest(BaseModel):
    """Request body for POST .../reasoning/summary."""
    summary_type: str = "comprehensive"  # comprehensive/executive/technical/risk
@app.post("/api/v1/projects/{project_id}/reasoning/summary")
async def project_summary(project_id: str, req: SummaryRequest):
    """
    Intelligent project summary.

    Generates a summary with a different emphasis per type:
    - comprehensive: full summary
    - executive: executive briefing
    - technical: technical summary
    - risk: risk analysis

    Raises 500 when the DB layer or reasoner module failed to import,
    404 when the project does not exist.
    """
    if not DB_AVAILABLE or not REASONER_AVAILABLE:
        raise HTTPException(status_code=500, detail="Knowledge reasoner not available")
    db = get_db_manager()
    reasoner = get_knowledge_reasoner()
    project = db.get_project(project_id)
    if not project:
        raise HTTPException(status_code=404, detail="Project not found")
    # Project context (summary) feeds the LLM prompt.
    project_context = db.get_project_summary(project_id)
    # Only entity/relation counts reach the prompt; the lists size the graph.
    entities = db.list_project_entities(project_id)
    relations = db.list_project_relations(project_id)
    graph_data = {
        "entities": [{"id": e.id, "name": e.name, "type": e.type} for e in entities],
        "relations": relations
    }
    # Delegate to the reasoner; returns a dict (overview/key_points/risks/...).
    summary = await reasoner.summarize_project(
        project_context=project_context,
        graph_data=graph_data,
        summary_type=req.summary_type
    )
    # Merge the summary fields into the response envelope.
    return {
        "project_id": project_id,
        "summary_type": req.summary_type,
        **summary
    }
# Serve frontend - MUST be last to not override API routes
app.mount("/", StaticFiles(directory="frontend", html=True), name="frontend")