feat: Phase 3 knowledge growth - multi-file fusion + entity alignment

This commit is contained in:
OpenClaw Bot
2026-02-17 13:47:06 +08:00
parent b28af9a611
commit 460bc5b052
3 changed files with 447 additions and 85 deletions

231
backend/db_manager.py Normal file
View File

@@ -0,0 +1,231 @@
#!/usr/bin/env python3
"""
InsightFlow Database Manager
处理项目、实体、关系的持久化
"""
import os
import json
import sqlite3
from datetime import datetime
from typing import List, Dict, Optional, Tuple
from dataclasses import dataclass
# Location of the SQLite database file; taken from the DB_PATH environment
# variable when it is set, otherwise the path below is used.
DB_PATH = os.getenv("DB_PATH", "/app/data/insightflow.db")
@dataclass
class Project:
    """Plain record describing one project.

    Only ``id`` and ``name`` are required; the remaining fields default to
    empty strings.
    """

    # Required identification.
    id: str
    name: str

    # Optional free-text description.
    description: str = ""

    # Creation / last-update timestamps, kept as strings.
    created_at: str = ""
    updated_at: str = ""
@dataclass
class Entity:
    """A named entity that belongs to one project.

    ``aliases`` always ends up as a list owned by this instance: ``None``
    (the default) becomes an empty list and any other iterable is copied
    into a new list.
    """

    id: str
    project_id: str
    name: str
    type: str
    definition: str = ""
    canonical_name: str = ""
    # None is only a "not provided" sentinel (a literal [] default would be
    # shared between instances); __post_init__ replaces it with a list.
    aliases: Optional[List[str]] = None

    def __post_init__(self):
        """Normalise ``aliases`` to a fresh list."""
        if self.aliases is None:
            self.aliases = []
        else:
            # Copy so tuples/sets become JSON-serialisable lists and so that
            # later mutation does not leak back into the caller's object.
            self.aliases = list(self.aliases)
@dataclass
class EntityMention:
    """Plain record for one occurrence of an entity inside a transcript."""

    # Identifiers: the mention itself, the entity it refers to and the
    # transcript it was found in.
    id: str
    entity_id: str
    transcript_id: str

    # Span of the mention and the text it covers.
    start_pos: int
    end_pos: int
    text_snippet: str

    # Confidence score; 1.0 unless the caller supplies another value.
    confidence: float = 1.0
class DatabaseManager:
    """SQLite persistence layer for projects, entities and entity mentions.

    Every public method opens its own connection through ``get_conn`` and
    closes it before returning, including when a statement raises.
    """

    def __init__(self, db_path: str = DB_PATH):
        """Remember ``db_path``, make sure its directory exists, load the schema."""
        self.db_path = db_path
        # os.path.dirname() is "" for a bare file name and os.makedirs("")
        # raises FileNotFoundError, so only create a real parent directory.
        parent_dir = os.path.dirname(db_path)
        if parent_dir:
            os.makedirs(parent_dir, exist_ok=True)
        self.init_db()

    def get_conn(self) -> sqlite3.Connection:
        """Open a new connection whose rows can be read by column name."""
        conn = sqlite3.connect(self.db_path)
        conn.row_factory = sqlite3.Row
        return conn

    def init_db(self):
        """Create the tables by executing ``schema.sql`` located next to this module."""
        schema_path = os.path.join(os.path.dirname(__file__), 'schema.sql')
        # The schema contains non-ASCII comments; do not depend on the locale's
        # default encoding.
        with open(schema_path, 'r', encoding='utf-8') as f:
            schema = f.read()
        conn = self.get_conn()
        try:
            conn.executescript(schema)
            conn.commit()
        finally:
            conn.close()

    # Internal helpers

    def _query(self, sql: str, params: tuple = ()) -> list:
        """Run a read statement and return every row; the connection is always closed."""
        conn = self.get_conn()
        try:
            return conn.execute(sql, params).fetchall()
        finally:
            conn.close()

    def _execute(self, sql: str, params: tuple = ()) -> None:
        """Run one write statement in its own transaction; the connection is always closed."""
        conn = self.get_conn()
        try:
            # The connection context manager commits on success and rolls
            # back if the statement raises.
            with conn:
                conn.execute(sql, params)
        finally:
            conn.close()

    @staticmethod
    def _row_to_entity(row) -> Entity:
        """Build an ``Entity`` from an ``entities`` row.

        The table has columns (``created_at``, ``updated_at``) that ``Entity``
        does not accept, so the fields are picked explicitly instead of
        passing the whole row as keyword arguments. NULL text columns are
        mapped to ``""`` and the JSON ``aliases`` column is decoded to a list.
        """
        data = dict(row)
        raw_aliases = data.get('aliases')
        return Entity(
            id=data['id'],
            project_id=data['project_id'],
            name=data['name'],
            type=data.get('type') or "",
            definition=data.get('definition') or "",
            canonical_name=data.get('canonical_name') or "",
            aliases=json.loads(raw_aliases) if raw_aliases else [],
        )

    # Project operations

    def create_project(self, project_id: str, name: str, description: str = "") -> Project:
        """Insert a project row and return it as a ``Project``.

        Raises ``sqlite3.IntegrityError`` if ``project_id`` already exists.
        """
        now = datetime.now().isoformat()
        self._execute(
            "INSERT INTO projects (id, name, description, created_at, updated_at) VALUES (?, ?, ?, ?, ?)",
            (project_id, name, description, now, now),
        )
        return Project(id=project_id, name=name, description=description, created_at=now, updated_at=now)

    def get_project(self, project_id: str) -> Optional[Project]:
        """Return the project with this id, or ``None`` when there is none."""
        rows = self._query("SELECT * FROM projects WHERE id = ?", (project_id,))
        return Project(**dict(rows[0])) if rows else None

    def list_projects(self) -> List[Project]:
        """Return all projects, most recently updated first."""
        rows = self._query("SELECT * FROM projects ORDER BY updated_at DESC")
        return [Project(**dict(r)) for r in rows]

    # Entity operations

    def create_entity(self, entity: Entity) -> Entity:
        """Insert ``entity`` (aliases stored as a JSON array) and return it unchanged."""
        now = datetime.now().isoformat()  # one timestamp for both columns
        self._execute(
            """INSERT INTO entities (id, project_id, name, canonical_name, type, definition, aliases, created_at, updated_at)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)""",
            (entity.id, entity.project_id, entity.name, entity.canonical_name, entity.type,
             entity.definition, json.dumps(entity.aliases), now, now),
        )
        return entity

    def get_entity_by_name(self, project_id: str, name: str) -> Optional[Entity]:
        """Find an entity in the project by exact name, canonical name or alias (used for alignment).

        A match on ``name`` / ``canonical_name`` wins over an alias match.
        Returns ``None`` when nothing matches.
        """
        rows = self._query(
            "SELECT * FROM entities WHERE project_id = ? AND (name = ? OR canonical_name = ?) LIMIT 1",
            (project_id, name, name),
        )
        if rows:
            return self._row_to_entity(rows[0])

        # Aliases are compared after JSON decoding rather than with LIKE on the
        # raw column: json.dumps stores non-ASCII aliases as \uXXXX escapes, so
        # a textual pattern never matched them, and '%' / '_' inside ``name``
        # acted as wildcards.
        candidates = self._query(
            "SELECT * FROM entities WHERE project_id = ? AND aliases IS NOT NULL AND aliases != ''",
            (project_id,),
        )
        for row in candidates:
            entity = self._row_to_entity(row)
            if name in entity.aliases:
                return entity
        return None

    def find_similar_entities(self, project_id: str, name: str, threshold: float = 0.8) -> List[Entity]:
        """Return the project's entities whose name contains ``name``.

        Simple substring implementation; ``threshold`` is accepted for
        interface compatibility but not used yet. Results are ordered
        shortest name first, so the closest containing name comes first.
        An empty ``name`` yields an empty list.
        """
        # TODO: use embeddings or fuzzy matching
        if not name:
            # '%%' would match every entity in the project.
            return []
        # Escape LIKE metacharacters so the name is matched literally.
        escaped = name.replace("\\", "\\\\").replace("%", "\\%").replace("_", "\\_")
        rows = self._query(
            "SELECT * FROM entities WHERE project_id = ? AND name LIKE ? ESCAPE '\\' "
            "ORDER BY LENGTH(name), name",
            (project_id, f"%{escaped}%"),
        )
        return [self._row_to_entity(r) for r in rows]

    def merge_entities(self, target_id: str, source_id: str) -> Entity:
        """Merge entity ``source_id`` into ``target_id`` (entity alignment).

        The source's name and aliases become aliases of the target, mentions
        and relations that referenced the source are re-pointed to the target,
        and the source row is deleted, all in one transaction.

        Raises ``ValueError`` if either entity does not exist or if both ids
        are the same (merging an entity into itself would delete it).
        """
        if target_id == source_id:
            raise ValueError("Cannot merge an entity into itself")

        conn = self.get_conn()
        try:
            # Load both entities
            target = conn.execute("SELECT * FROM entities WHERE id = ?", (target_id,)).fetchone()
            source = conn.execute("SELECT * FROM entities WHERE id = ?", (source_id,)).fetchone()
            if not target or not source:
                raise ValueError("Entity not found")

            # Merge aliases
            merged_aliases = set(json.loads(target['aliases']) if target['aliases'] else [])
            merged_aliases.add(source['name'])
            merged_aliases.update(json.loads(source['aliases']) if source['aliases'] else [])
            # The target's own name is not an alias of itself.
            merged_aliases.discard(target['name'])

            with conn:  # commit on success, roll back on any failure
                # Update the target entity (sorted for a deterministic column value)
                conn.execute(
                    "UPDATE entities SET aliases = ?, updated_at = ? WHERE id = ?",
                    (json.dumps(sorted(merged_aliases)), datetime.now().isoformat(), target_id),
                )
                # Re-point mention records
                conn.execute(
                    "UPDATE entity_mentions SET entity_id = ? WHERE entity_id = ?",
                    (target_id, source_id),
                )
                # Re-point relations so none is left referencing the deleted
                # row. NOTE: a relation between the two merged entities ends
                # up referencing the target on both sides.
                conn.execute(
                    "UPDATE entity_relations SET source_entity_id = ? WHERE source_entity_id = ?",
                    (target_id, source_id),
                )
                conn.execute(
                    "UPDATE entity_relations SET target_entity_id = ? WHERE target_entity_id = ?",
                    (target_id, source_id),
                )
                # Delete the source entity
                conn.execute("DELETE FROM entities WHERE id = ?", (source_id,))
        finally:
            conn.close()
        return self.get_entity(target_id)

    def get_entity(self, entity_id: str) -> Optional[Entity]:
        """Return the entity with this id, or ``None`` when there is none."""
        rows = self._query("SELECT * FROM entities WHERE id = ?", (entity_id,))
        return self._row_to_entity(rows[0]) if rows else None

    def list_project_entities(self, project_id: str) -> List[Entity]:
        """Return every entity of the project, most recently updated first."""
        rows = self._query(
            "SELECT * FROM entities WHERE project_id = ? ORDER BY updated_at DESC",
            (project_id,),
        )
        return [self._row_to_entity(r) for r in rows]

    # Mention operations

    def add_mention(self, mention: EntityMention) -> EntityMention:
        """Insert ``mention`` and return it unchanged."""
        self._execute(
            """INSERT INTO entity_mentions (id, entity_id, transcript_id, start_pos, end_pos, text_snippet, confidence)
            VALUES (?, ?, ?, ?, ?, ?, ?)""",
            (mention.id, mention.entity_id, mention.transcript_id, mention.start_pos,
             mention.end_pos, mention.text_snippet, mention.confidence),
        )
        return mention

    def get_entity_mentions(self, entity_id: str) -> List[EntityMention]:
        """Return the entity's mentions ordered by transcript id, then start position."""
        rows = self._query(
            "SELECT * FROM entity_mentions WHERE entity_id = ? ORDER BY transcript_id, start_pos",
            (entity_id,),
        )
        return [EntityMention(**dict(r)) for r in rows]
# Module-wide DatabaseManager; stays None until get_db_manager() is first called.
_db_manager = None


def get_db_manager() -> DatabaseManager:
    """Return the shared DatabaseManager, constructing it on the first call."""
    global _db_manager
    if _db_manager is not None:
        return _db_manager
    _db_manager = DatabaseManager()
    return _db_manager

View File

@@ -1,15 +1,14 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
""" """
InsightFlow Backend - Phase 1 MVP (Complete) InsightFlow Backend - Phase 3 (Complete)
ASR: 阿里云听悟 (TingWu) + OSS Knowledge Growth: Multi-file fusion + Entity Alignment
Speaker Diarization: 听悟内置
LLM: Kimi API for entity extraction
""" """
import os import os
import json import json
import httpx import httpx
from fastapi import FastAPI, File, UploadFile, HTTPException import uuid
from fastapi import FastAPI, File, UploadFile, HTTPException, Form
from fastapi.middleware.cors import CORSMiddleware from fastapi.middleware.cors import CORSMiddleware
from fastapi.staticfiles import StaticFiles from fastapi.staticfiles import StaticFiles
from pydantic import BaseModel from pydantic import BaseModel
@@ -29,7 +28,13 @@ try:
except ImportError: except ImportError:
TINGWU_AVAILABLE = False TINGWU_AVAILABLE = False
app = FastAPI(title="InsightFlow", version="0.1.0") try:
from db_manager import get_db_manager, Project, Entity, EntityMention
DB_AVAILABLE = True
except ImportError:
DB_AVAILABLE = False
app = FastAPI(title="InsightFlow", version="0.3.0")
app.add_middleware( app.add_middleware(
CORSMiddleware, CORSMiddleware,
@@ -40,13 +45,12 @@ app.add_middleware(
) )
# Models # Models
class Entity(BaseModel): class EntityModel(BaseModel):
id: str id: str
name: str name: str
type: str type: str
start: int definition: Optional[str] = ""
end: int aliases: List[str] = []
definition: Optional[str] = None
class TranscriptSegment(BaseModel): class TranscriptSegment(BaseModel):
start: float start: float
@@ -56,12 +60,15 @@ class TranscriptSegment(BaseModel):
class AnalysisResult(BaseModel): class AnalysisResult(BaseModel):
transcript_id: str transcript_id: str
project_id: str
segments: List[TranscriptSegment] segments: List[TranscriptSegment]
entities: List[Entity] entities: List[EntityModel]
full_text: str full_text: str
created_at: str created_at: str
storage = {} class ProjectCreate(BaseModel):
name: str
description: str = ""
# API Keys # API Keys
KIMI_API_KEY = os.getenv("KIMI_API_KEY", "") KIMI_API_KEY = os.getenv("KIMI_API_KEY", "")
@@ -69,73 +76,50 @@ KIMI_BASE_URL = "https://api.kimi.com/coding"
def transcribe_audio(audio_data: bytes, filename: str) -> dict: def transcribe_audio(audio_data: bytes, filename: str) -> dict:
"""转录音频OSS上传 + 听悟转录""" """转录音频OSS上传 + 听悟转录"""
if not OSS_AVAILABLE or not TINGWU_AVAILABLE:
# 1. 上传 OSS
if not OSS_AVAILABLE:
print("OSS not available, using mock")
return mock_transcribe() return mock_transcribe()
try: try:
uploader = get_oss_uploader() uploader = get_oss_uploader()
audio_url, object_name = uploader.upload_audio(audio_data, filename) audio_url, object_name = uploader.upload_audio(audio_data, filename)
print(f"Uploaded to OSS: {object_name}")
except Exception as e:
print(f"OSS upload failed: {e}")
return mock_transcribe()
# 2. 听悟转录
if not TINGWU_AVAILABLE:
print("Tingwu not available, using mock")
return mock_transcribe()
try:
client = TingwuClient() client = TingwuClient()
result = client.transcribe(audio_url) result = client.transcribe(audio_url)
print(f"Transcription complete: {len(result['segments'])} segments")
return result return result
except Exception as e: except Exception as e:
print(f"Tingwu failed: {e}") print(f"Transcription failed: {e}")
return mock_transcribe() return mock_transcribe()
def mock_transcribe() -> dict: def mock_transcribe() -> dict:
"""Mock 转录结果用于测试""" """Mock 转录结果"""
return { return {
"full_text": "这是一个示例转录文本,包含 Project Alpha 和 K8s 等术语", "full_text": "我们今天讨论 Project Alpha 的进度K8s 集群已经部署完成",
"segments": [ "segments": [
{"start": 0.0, "end": 5.0, "text": "这是一个示例转录文本,包含 Project Alpha 和 K8s 等术语", "speaker": "Speaker A"} {"start": 0.0, "end": 5.0, "text": "我们今天讨论 Project Alpha 的进度K8s 集群已经部署完成", "speaker": "Speaker A"}
] ]
} }
def extract_entities_with_llm(text: str) -> List[Entity]: def extract_entities_with_llm(text: str) -> List[dict]:
"""使用 Kimi API 提取实体""" """使用 Kimi API 提取实体"""
if not KIMI_API_KEY or not text: if not KIMI_API_KEY or not text:
return [] return []
prompt = f"""从以下会议文本中提取关键实体(专有名词、项目名、技术术语、人名等),并以 JSON 格式返回: prompt = f"""从以下会议文本中提取关键实体以 JSON 格式返回:
文本:{text[:3000]} 文本:{text[:3000]}
要求: 要求:
1. 每个实体包含name(名称), type(类型: PROJECT/TECH/PERSON/ORG/OTHER), start(起始字符位置), end(结束字符位置), definition(一句话定义) 1. 每个实体包含name(名称), type(类型: PROJECT/TECH/PERSON/ORG/OTHER), definition(一句话定义)
2. 只返回 JSON 数组,不要其他内容 2. 只返回 JSON 数组
3. 确保 start/end 是字符在文本中的位置
示例输出 示例:[{{"name": "Project Alpha", "type": "PROJECT", "definition": "核心项目"}}]
[
{{"name": "Project Alpha", "type": "PROJECT", "start": 23, "end": 35, "definition": "Q3季度的核心项目"}},
{{"name": "K8s", "type": "TECH", "start": 37, "end": 40, "definition": "Kubernetes的缩写"}}
]
""" """
try: try:
response = httpx.post( response = httpx.post(
f"{KIMI_BASE_URL}/v1/chat/completions", f"{KIMI_BASE_URL}/v1/chat/completions",
headers={"Authorization": f"Bearer {KIMI_API_KEY}", "Content-Type": "application/json"}, headers={"Authorization": f"Bearer {KIMI_API_KEY}", "Content-Type": "application/json"},
json={ json={"model": "k2p5", "messages": [{"role": "user", "content": prompt}], "temperature": 0.1},
"model": "k2p5",
"messages": [{"role": "user", "content": prompt}],
"temperature": 0.1
},
timeout=60.0 timeout=60.0
) )
response.raise_for_status() response.raise_for_status()
@@ -145,62 +129,136 @@ def extract_entities_with_llm(text: str) -> List[Entity]:
import re import re
json_match = re.search(r'\[.*?\]', content, re.DOTALL) json_match = re.search(r'\[.*?\]', content, re.DOTALL)
if json_match: if json_match:
entities_data = json.loads(json_match.group()) return json.loads(json_match.group())
entities = []
for i, e in enumerate(entities_data):
entities.append(Entity(
id=f"ent_{i+1}",
name=e["name"],
type=e.get("type", "OTHER"),
start=e["start"],
end=e["end"],
definition=e.get("definition", "")
))
return entities
except Exception as e: except Exception as e:
print(f"LLM extraction failed: {e}") print(f"LLM extraction failed: {e}")
return [] return []
@app.post("/api/v1/upload", response_model=AnalysisResult) def align_entity(project_id: str, name: str, db) -> Optional[Entity]:
async def upload_audio(file: UploadFile = File(...)): """实体对齐:查找或创建实体"""
"""上传音频并分析""" # 1. 尝试精确匹配
existing = db.get_entity_by_name(project_id, name)
if existing:
return existing
# 2. 尝试相似匹配(简单版)
similar = db.find_similar_entities(project_id, name)
if similar:
# 返回最相似的(第一个)
return similar[0]
return None
# API Endpoints
@app.post("/api/v1/projects", response_model=dict)
async def create_project(project: ProjectCreate):
"""创建新项目"""
if not DB_AVAILABLE:
raise HTTPException(status_code=500, detail="Database not available")
db = get_db_manager()
project_id = str(uuid.uuid4())[:8]
p = db.create_project(project_id, project.name, project.description)
return {"id": p.id, "name": p.name, "description": p.description}
@app.get("/api/v1/projects")
async def list_projects():
"""列出所有项目"""
if not DB_AVAILABLE:
return []
db = get_db_manager()
projects = db.list_projects()
return [{"id": p.id, "name": p.name, "description": p.description} for p in projects]
@app.post("/api/v1/projects/{project_id}/upload", response_model=AnalysisResult)
async def upload_audio(project_id: str, file: UploadFile = File(...)):
"""上传音频到指定项目"""
if not DB_AVAILABLE:
raise HTTPException(status_code=500, detail="Database not available")
db = get_db_manager()
project = db.get_project(project_id)
if not project:
raise HTTPException(status_code=404, detail="Project not found")
content = await file.read() content = await file.read()
# 转录 # 转录
print(f"Processing: {file.filename} ({len(content)} bytes)") print(f"Processing: {file.filename}")
tw_result = transcribe_audio(content, file.filename) tw_result = transcribe_audio(content, file.filename)
# 构建片段 # 提取实体
segments = [
TranscriptSegment(**seg) for seg in tw_result["segments"]
] or [TranscriptSegment(start=0, end=0, text=tw_result["full_text"], speaker="Speaker A")]
# LLM 实体提取
print("Extracting entities...") print("Extracting entities...")
entities = extract_entities_with_llm(tw_result["full_text"]) raw_entities = extract_entities_with_llm(tw_result["full_text"])
analysis = AnalysisResult( # 实体对齐
transcript_id=os.urandom(8).hex(), aligned_entities = []
for raw_ent in raw_entities:
existing = align_entity(project_id, raw_ent["name"], db)
if existing:
# 复用已有实体
ent_model = EntityModel(
id=existing.id,
name=existing.name,
type=existing.type,
definition=existing.definition,
aliases=existing.aliases
)
else:
# 创建新实体
new_ent = db.create_entity(Entity(
id=str(uuid.uuid4())[:8],
project_id=project_id,
name=raw_ent["name"],
type=raw_ent.get("type", "OTHER"),
definition=raw_ent.get("definition", "")
))
ent_model = EntityModel(
id=new_ent.id,
name=new_ent.name,
type=new_ent.type,
definition=new_ent.definition
)
aligned_entities.append(ent_model)
# 构建片段
segments = [TranscriptSegment(**seg) for seg in tw_result["segments"]]
transcript_id = str(uuid.uuid4())[:8]
return AnalysisResult(
transcript_id=transcript_id,
project_id=project_id,
segments=segments, segments=segments,
entities=entities, entities=aligned_entities,
full_text=tw_result["full_text"], full_text=tw_result["full_text"],
created_at=datetime.now().isoformat() created_at=datetime.now().isoformat()
) )
storage[analysis.transcript_id] = analysis @app.get("/api/v1/projects/{project_id}/entities")
print(f"Complete: {analysis.transcript_id}, {len(entities)} entities") async def get_project_entities(project_id: str):
return analysis """获取项目的全局实体列表"""
if not DB_AVAILABLE:
return []
@app.get("/api/v1/transcripts/{transcript_id}", response_model=AnalysisResult) db = get_db_manager()
async def get_transcript(transcript_id: str): entities = db.list_project_entities(project_id)
if transcript_id not in storage: return [{"id": e.id, "name": e.name, "type": e.type, "definition": e.definition} for e in entities]
raise HTTPException(status_code=404, detail="Transcript not found")
return storage[transcript_id]
@app.get("/api/v1/transcripts") @app.post("/api/v1/entities/{entity_id}/merge")
async def list_transcripts(): async def merge_entities(entity_id: str, target_entity_id: str):
return list(storage.values()) """合并两个实体"""
if not DB_AVAILABLE:
raise HTTPException(status_code=500, detail="Database not available")
db = get_db_manager()
result = db.merge_entities(target_entity_id, entity_id)
return {"success": True, "merged_entity": {"id": result.id, "name": result.name}}
# Serve frontend # Serve frontend
app.mount("/", StaticFiles(directory="frontend", html=True), name="frontend") app.mount("/", StaticFiles(directory="frontend", html=True), name="frontend")

73
backend/schema.sql Normal file
View File

@@ -0,0 +1,73 @@
-- InsightFlow Phase 3 - Database Schema
-- Supports knowledge growth and multi-file fusion
-- Projects table
CREATE TABLE IF NOT EXISTS projects (
id TEXT PRIMARY KEY,
name TEXT NOT NULL,
description TEXT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
-- Files / transcripts table: one row per transcript, owned by a project
CREATE TABLE IF NOT EXISTS transcripts (
id TEXT PRIMARY KEY,
project_id TEXT NOT NULL,
filename TEXT,
full_text TEXT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (project_id) REFERENCES projects(id)
);
-- Global entities table (shared across files)
CREATE TABLE IF NOT EXISTS entities (
id TEXT PRIMARY KEY,
project_id TEXT NOT NULL,
name TEXT NOT NULL,
canonical_name TEXT, -- canonical name (used for alignment)
type TEXT,
definition TEXT,
aliases TEXT, -- JSON array: ["alias1", "alias2"]
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (project_id) REFERENCES projects(id)
);
-- Entity mentions table (the specific positions within a file)
CREATE TABLE IF NOT EXISTS entity_mentions (
id TEXT PRIMARY KEY,
entity_id TEXT NOT NULL,
transcript_id TEXT NOT NULL,
start_pos INTEGER,
end_pos INTEGER,
text_snippet TEXT,
confidence REAL DEFAULT 1.0,
FOREIGN KEY (entity_id) REFERENCES entities(id),
FOREIGN KEY (transcript_id) REFERENCES transcripts(id)
);
-- Entity relations table
-- NOTE(review): transcript_id has no FOREIGN KEY here, unlike entity_mentions — confirm this is intended
CREATE TABLE IF NOT EXISTS entity_relations (
id TEXT PRIMARY KEY,
project_id TEXT NOT NULL,
source_entity_id TEXT NOT NULL,
target_entity_id TEXT NOT NULL,
relation_type TEXT, -- e.g. "belongs_to", "works_with", "depends_on"
evidence TEXT, -- source text the relation was taken from
transcript_id TEXT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (project_id) REFERENCES projects(id),
FOREIGN KEY (source_entity_id) REFERENCES entities(id),
FOREIGN KEY (target_entity_id) REFERENCES entities(id)
);
-- Glossary table (project-level hot words, used for ASR optimization)
CREATE TABLE IF NOT EXISTS glossary (
id TEXT PRIMARY KEY,
project_id TEXT NOT NULL,
term TEXT NOT NULL,
pronunciation TEXT, -- pronunciation hint, e.g. "K8s" -> "Kubernetes"
frequency INTEGER DEFAULT 1,
FOREIGN KEY (project_id) REFERENCES projects(id)
);