#!/usr/bin/env python3
"""
InsightFlow Database Manager

Persists projects, entities and entity mentions in a SQLite database.
"""

import os
import json
import sqlite3
from contextlib import closing
from datetime import datetime
from typing import List, Dict, Optional, Tuple
from dataclasses import dataclass, fields

DB_PATH = os.getenv("DB_PATH", "/app/data/insightflow.db")


@dataclass
class Project:
    """In-memory representation of one row of the ``projects`` table."""
    id: str
    name: str
    description: str = ""
    created_at: str = ""
    updated_at: str = ""


@dataclass
class Entity:
    """A project-wide entity; ``aliases`` is stored in the database as a JSON array."""
    id: str
    project_id: str
    name: str
    type: str
    definition: str = ""
    canonical_name: str = ""
    aliases: Optional[List[str]] = None

    def __post_init__(self):
        # Callers may pass aliases=None (or omit it); normalise to a fresh list
        # so instances never share a mutable default.
        if self.aliases is None:
            self.aliases = []


@dataclass
class EntityMention:
    """One occurrence of an entity inside a transcript."""
    id: str
    entity_id: str
    transcript_id: str
    start_pos: int
    end_pos: int
    text_snippet: str
    confidence: float = 1.0


# Columns of the ``entities`` table that map onto Entity fields. The table also
# carries created_at/updated_at, which Entity does not declare, so rows must be
# projected onto this list before being passed to the constructor.
_ENTITY_COLUMNS = tuple(f.name for f in fields(Entity))


def _escape_like(text: str) -> str:
    """Escape LIKE wildcards so *text* is matched literally (use with ESCAPE '\\')."""
    return text.replace("\\", "\\\\").replace("%", "\\%").replace("_", "\\_")


def _row_to_entity(row) -> Entity:
    """Build an Entity from an ``entities`` row, ignoring columns Entity lacks."""
    available = set(row.keys())
    data = {col: row[col] for col in _ENTITY_COLUMNS if col in available}
    data['aliases'] = json.loads(data['aliases']) if data.get('aliases') else []
    return Entity(**data)


class DatabaseManager:
    """Thin data-access layer over the InsightFlow SQLite database.

    Every operation opens its own connection and closes it again, even when
    the statement raises. Write operations run inside a transaction that is
    committed on success and rolled back on error.
    """

    def __init__(self, db_path: str = DB_PATH):
        self.db_path = db_path
        # A bare filename has an empty dirname, and os.makedirs("") raises.
        db_dir = os.path.dirname(db_path)
        if db_dir:
            os.makedirs(db_dir, exist_ok=True)
        self.init_db()

    def get_conn(self):
        """Open a new connection whose rows can be accessed by column name."""
        conn = sqlite3.connect(self.db_path)
        conn.row_factory = sqlite3.Row
        return conn

    def init_db(self):
        """Create the database tables from the schema.sql next to this module."""
        schema_path = os.path.join(os.path.dirname(__file__), 'schema.sql')
        # The schema contains non-ASCII comments; do not rely on the locale encoding.
        with open(schema_path, 'r', encoding='utf-8') as f:
            schema = f.read()

        with closing(self.get_conn()) as conn:
            conn.executescript(schema)
            conn.commit()

    # Project operations
    def create_project(self, project_id: str, name: str, description: str = "") -> Project:
        """Insert a project and return it; both timestamps are set to the same instant."""
        now = datetime.now().isoformat()
        with closing(self.get_conn()) as conn, conn:
            conn.execute(
                "INSERT INTO projects (id, name, description, created_at, updated_at) VALUES (?, ?, ?, ?, ?)",
                (project_id, name, description, now, now)
            )
        return Project(id=project_id, name=name, description=description, created_at=now, updated_at=now)

    def get_project(self, project_id: str) -> Optional[Project]:
        """Return the project with the given id, or None when it does not exist."""
        with closing(self.get_conn()) as conn:
            row = conn.execute("SELECT * FROM projects WHERE id = ?", (project_id,)).fetchone()
        return Project(**dict(row)) if row else None

    def list_projects(self) -> List[Project]:
        """Return all projects, most recently updated first."""
        with closing(self.get_conn()) as conn:
            rows = conn.execute("SELECT * FROM projects ORDER BY updated_at DESC").fetchall()
        return [Project(**dict(r)) for r in rows]

    # Entity operations
    def create_entity(self, entity: Entity) -> Entity:
        """Insert *entity* (aliases serialised as JSON) and return it unchanged."""
        now = datetime.now().isoformat()  # one timestamp so created_at == updated_at
        with closing(self.get_conn()) as conn, conn:
            conn.execute(
                """INSERT INTO entities (id, project_id, name, canonical_name, type, definition, aliases, created_at, updated_at)
                   VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)""",
                (entity.id, entity.project_id, entity.name, entity.canonical_name, entity.type,
                 entity.definition, json.dumps(entity.aliases), now, now)
            )
        return entity

    def get_entity_by_name(self, project_id: str, name: str) -> Optional[Entity]:
        """Find an entity by name, canonical name or alias (used for alignment).

        Exact name / canonical-name matches take priority over alias matches.
        Returns None for an empty name: canonical_name defaults to "", so an
        empty lookup would otherwise match unrelated entities.
        """
        if not name:
            return None

        with closing(self.get_conn()) as conn:
            row = conn.execute(
                "SELECT * FROM entities WHERE project_id = ? AND (name = ? OR canonical_name = ?)",
                (project_id, name, name)
            ).fetchone()
            if row:
                return _row_to_entity(row)

            candidates = conn.execute(
                "SELECT * FROM entities WHERE project_id = ? AND aliases IS NOT NULL AND aliases != '[]'",
                (project_id,)
            ).fetchall()

        # Aliases are compared after JSON decoding rather than with LIKE on the
        # raw column: json.dumps escapes non-ASCII text as \uXXXX by default, so
        # a textual LIKE would never match e.g. Chinese aliases.
        for candidate in candidates:
            entity = _row_to_entity(candidate)
            if name in entity.aliases:
                return entity
        return None

    def find_similar_entities(self, project_id: str, name: str, threshold: float = 0.8) -> List[Entity]:
        """Return entities of the project whose name contains *name*.

        Simple substring implementation; ``threshold`` is accepted for API
        compatibility but not used yet.
        """
        # TODO: use embeddings or fuzzy matching and honour ``threshold``.
        if not name:
            # LIKE '%%' would match every entity in the project.
            return []

        with closing(self.get_conn()) as conn:
            rows = conn.execute(
                "SELECT * FROM entities WHERE project_id = ? AND name LIKE ? ESCAPE '\\'",
                (project_id, f"%{_escape_like(name)}%")
            ).fetchall()
        return [_row_to_entity(row) for row in rows]

    def merge_entities(self, target_id: str, source_id: str) -> Entity:
        """Merge the source entity into the target entity (entity alignment).

        The source's name and aliases become aliases of the target, its
        mentions are re-pointed at the target, and the source row is deleted.
        All of this happens in one transaction.

        Raises:
            ValueError: if either entity does not exist, or if both ids are
                the same (merging an entity into itself would delete it).
        """
        if target_id == source_id:
            raise ValueError("Cannot merge an entity into itself")

        with closing(self.get_conn()) as conn, conn:
            target = conn.execute("SELECT * FROM entities WHERE id = ?", (target_id,)).fetchone()
            source = conn.execute("SELECT * FROM entities WHERE id = ?", (source_id,)).fetchone()

            if not target or not source:
                raise ValueError("Entity not found")

            # Union of both alias lists plus the source's own name.
            merged_aliases = set(json.loads(target['aliases']) if target['aliases'] else [])
            merged_aliases.add(source['name'])
            merged_aliases.update(json.loads(source['aliases']) if source['aliases'] else [])
            # The target's own name is not an alias of itself.
            merged_aliases.discard(target['name'])

            conn.execute(
                "UPDATE entities SET aliases = ?, updated_at = ? WHERE id = ?",
                # Sorted so the stored JSON is deterministic.
                (json.dumps(sorted(merged_aliases, key=str)), datetime.now().isoformat(), target_id)
            )

            # Re-point mentions of the source at the target.
            conn.execute(
                "UPDATE entity_mentions SET entity_id = ? WHERE entity_id = ?",
                (target_id, source_id)
            )

            conn.execute("DELETE FROM entities WHERE id = ?", (source_id,))

        return self.get_entity(target_id)

    def get_entity(self, entity_id: str) -> Optional[Entity]:
        """Return the entity with the given id, or None when it does not exist."""
        with closing(self.get_conn()) as conn:
            row = conn.execute("SELECT * FROM entities WHERE id = ?", (entity_id,)).fetchone()
        return _row_to_entity(row) if row else None

    def list_project_entities(self, project_id: str) -> List[Entity]:
        """Return all entities of a project, most recently updated first."""
        with closing(self.get_conn()) as conn:
            rows = conn.execute(
                "SELECT * FROM entities WHERE project_id = ? ORDER BY updated_at DESC",
                (project_id,)
            ).fetchall()
        return [_row_to_entity(row) for row in rows]

    # Mention operations
    def add_mention(self, mention: EntityMention) -> EntityMention:
        """Insert an entity mention and return it unchanged."""
        with closing(self.get_conn()) as conn, conn:
            conn.execute(
                """INSERT INTO entity_mentions (id, entity_id, transcript_id, start_pos, end_pos, text_snippet, confidence)
                   VALUES (?, ?, ?, ?, ?, ?, ?)""",
                (mention.id, mention.entity_id, mention.transcript_id, mention.start_pos,
                 mention.end_pos, mention.text_snippet, mention.confidence)
            )
        return mention

    def get_entity_mentions(self, entity_id: str) -> List[EntityMention]:
        """Return the mentions of an entity ordered by transcript and position."""
        with closing(self.get_conn()) as conn:
            rows = conn.execute(
                "SELECT * FROM entity_mentions WHERE entity_id = ? ORDER BY transcript_id, start_pos",
                (entity_id,)
            ).fetchall()
        return [EntityMention(**dict(r)) for r in rows]


# Singleton instance
_db_manager = None


def get_db_manager() -> DatabaseManager:
    """Return the process-wide DatabaseManager, creating it on first use."""
    global _db_manager
    if _db_manager is None:
        _db_manager = DatabaseManager()
    return _db_manager
def transcribe_audio(audio_data: bytes, filename: str) -> dict:
    """Transcribe audio: upload it to OSS, then run Tingwu on the uploaded URL.

    Falls back to ``mock_transcribe()`` when either backend is unavailable or
    when any step of the upload/transcription pipeline raises.
    """
    backends_ready = OSS_AVAILABLE and TINGWU_AVAILABLE
    if not backends_ready:
        return mock_transcribe()

    try:
        # upload_audio returns (url, object name); only the URL is needed here.
        audio_url, _object_name = get_oss_uploader().upload_audio(audio_data, filename)
        return TingwuClient().transcribe(audio_url)
    except Exception as e:
        # Best-effort: report the failure and serve the mock result instead.
        print(f"Transcription failed: {e}")
        return mock_transcribe()
def align_entity(project_id: str, name: str, db) -> Optional["Entity"]:
    """Entity alignment: look up an existing entity that *name* refers to.

    This function only searches; it never creates an entity. The caller is
    expected to create one when None is returned.

    Args:
        project_id: Project whose entities are searched.
        name: Entity name extracted from the transcript.
        db: Object providing ``get_entity_by_name(project_id, name)`` and
            ``find_similar_entities(project_id, name)``.

    Returns:
        The exact match if there is one, otherwise the candidate whose name is
        closest to *name*, otherwise None (also for an empty name).
    """
    import difflib  # local import: only needed for ranking fuzzy candidates

    # A blank name carries no information and could match unrelated entities.
    if not name:
        return None

    # 1. Exact match (name, canonical name or alias).
    existing = db.get_entity_by_name(project_id, name)
    if existing:
        return existing

    # 2. Fuzzy match (simple version). The candidate list comes back in no
    # particular order, so rank it explicitly instead of taking element 0.
    similar = db.find_similar_entities(project_id, name)
    if not similar:
        return None

    wanted = name.lower()
    return max(
        similar,
        key=lambda candidate: difflib.SequenceMatcher(None, wanted, candidate.name.lower()).ratio(),
    )
@app.post("/api/v1/entities/{entity_id}/merge")
async def merge_entities(entity_id: str, target_entity_id: str):
    """Merge the entity ``entity_id`` into ``target_entity_id``.

    Responses:
        200: ``{"success": True, "merged_entity": {...}}`` describing the target.
        400: both ids are the same.
        404: either entity does not exist.
        500: the database layer is not available.
    """
    if not DB_AVAILABLE:
        raise HTTPException(status_code=500, detail="Database not available")

    # Merging an entity into itself would delete it in the database layer.
    if entity_id == target_entity_id:
        raise HTTPException(status_code=400, detail="Cannot merge an entity into itself")

    db = get_db_manager()
    try:
        result = db.merge_entities(target_entity_id, entity_id)
    except ValueError as e:
        # The database layer raises ValueError when an entity is missing;
        # report that as a client error instead of an unhandled 500.
        raise HTTPException(status_code=404, detail=str(e)) from e

    if result is None:
        raise HTTPException(status_code=404, detail="Entity not found")

    return {"success": True, "merged_entity": {"id": result.id, "name": result.name}}
-- Glossary: project-level hot words, used to tune ASR recognition
CREATE TABLE IF NOT EXISTS glossary (
    id            TEXT PRIMARY KEY,
    project_id    TEXT NOT NULL REFERENCES projects(id),
    term          TEXT NOT NULL,
    pronunciation TEXT,               -- pronunciation hint, e.g. "K8s" -> "Kubernetes"
    frequency     INTEGER DEFAULT 1
);