Phase 5: 完成导出功能

- 新增 export_manager.py 导出管理模块
- 知识图谱导出 SVG/PNG
- 实体数据导出 Excel/CSV
- 关系数据导出 CSV
- 项目报告导出 PDF
- 转录文本导出 Markdown
- 项目完整数据导出 JSON
- 前端添加导出面板和功能
- 更新依赖: pandas, openpyxl, reportlab, cairosvg
This commit is contained in:
OpenClaw Bot
2026-02-20 06:06:23 +08:00
parent 2470064f65
commit 6318cd0af9
6 changed files with 1365 additions and 1 deletions

View File

@@ -11,6 +11,7 @@ import json
import httpx
import uuid
import re
import io
from fastapi import FastAPI, File, UploadFile, HTTPException, Form
from fastapi.middleware.cors import CORSMiddleware
from fastapi.staticfiles import StaticFiles
@@ -67,6 +68,12 @@ try:
except ImportError:
REASONER_AVAILABLE = False
try:
from export_manager import get_export_manager, ExportEntity, ExportRelation, ExportTranscript
EXPORT_AVAILABLE = True
except ImportError:
EXPORT_AVAILABLE = False
app = FastAPI(title="InsightFlow", version="0.3.0")
app.add_middleware(
@@ -1911,6 +1918,411 @@ async def search_entities_by_attributes_endpoint(
]
# ==================== 导出功能 API ====================
from fastapi.responses import StreamingResponse, FileResponse
@app.get("/api/v1/projects/{project_id}/export/graph-svg")
async def export_graph_svg_endpoint(project_id: str):
    """Export a project's knowledge graph as a downloadable SVG file.

    Loads the project's entities (with their attributes) and relations,
    converts them to the export manager's data classes and streams the
    rendered SVG back as an attachment.

    Raises:
        HTTPException: 500 when ``DB_AVAILABLE`` or ``EXPORT_AVAILABLE`` is
            false, 404 when the project cannot be found.
    """
    if not (DB_AVAILABLE and EXPORT_AVAILABLE):
        raise HTTPException(status_code=500, detail="Export functionality not available")

    db = get_db_manager()
    if not db.get_project(project_id):
        raise HTTPException(status_code=404, detail="Project not found")

    # Load the raw project data.
    entity_rows = db.get_project_entities(project_id)
    relation_rows = db.get_project_relations(project_id)

    def _as_export_entity(row):
        # Attributes are fetched per entity and keyed by template name.
        attr_rows = db.get_entity_attributes(row.id)
        return ExportEntity(
            id=row.id,
            name=row.name,
            type=row.type,
            definition=row.definition or "",
            aliases=json.loads(row.aliases) if row.aliases else [],
            mention_count=row.mention_count,
            attributes={attr.template_name: attr.value for attr in attr_rows},
        )

    # Convert the rows into the export data classes.
    entities = [_as_export_entity(row) for row in entity_rows]
    relations = [
        ExportRelation(
            id=row.id,
            source=row.source_name,
            target=row.target_name,
            relation_type=row.relation_type,
            confidence=row.confidence,
            evidence=row.evidence or "",
        )
        for row in relation_rows
    ]

    svg_text = get_export_manager().export_knowledge_graph_svg(project_id, entities, relations)

    payload = io.BytesIO(svg_text.encode("utf-8"))
    download_headers = {
        "Content-Disposition": f"attachment; filename=insightflow-graph-{project_id}.svg"
    }
    return StreamingResponse(payload, media_type="image/svg+xml", headers=download_headers)
@app.get("/api/v1/projects/{project_id}/export/graph-png")
async def export_graph_png_endpoint(project_id: str):
    """Export a project's knowledge graph as a downloadable PNG image.

    Loads the project's entities (with their attributes) and relations,
    converts them to the export manager's data classes and streams the
    rendered PNG bytes back as an attachment.

    Raises:
        HTTPException: 500 when ``DB_AVAILABLE`` or ``EXPORT_AVAILABLE`` is
            false, 404 when the project cannot be found.
    """
    if not (DB_AVAILABLE and EXPORT_AVAILABLE):
        raise HTTPException(status_code=500, detail="Export functionality not available")

    db = get_db_manager()
    if not db.get_project(project_id):
        raise HTTPException(status_code=404, detail="Project not found")

    # Load the raw project data.
    entity_rows = db.get_project_entities(project_id)
    relation_rows = db.get_project_relations(project_id)

    def _as_export_entity(row):
        # Attributes are fetched per entity and keyed by template name.
        attr_rows = db.get_entity_attributes(row.id)
        return ExportEntity(
            id=row.id,
            name=row.name,
            type=row.type,
            definition=row.definition or "",
            aliases=json.loads(row.aliases) if row.aliases else [],
            mention_count=row.mention_count,
            attributes={attr.template_name: attr.value for attr in attr_rows},
        )

    # Convert the rows into the export data classes.
    entities = [_as_export_entity(row) for row in entity_rows]
    relations = [
        ExportRelation(
            id=row.id,
            source=row.source_name,
            target=row.target_name,
            relation_type=row.relation_type,
            confidence=row.confidence,
            evidence=row.evidence or "",
        )
        for row in relation_rows
    ]

    image_bytes = get_export_manager().export_knowledge_graph_png(project_id, entities, relations)

    download_headers = {
        "Content-Disposition": f"attachment; filename=insightflow-graph-{project_id}.png"
    }
    return StreamingResponse(
        io.BytesIO(image_bytes),
        media_type="image/png",
        headers=download_headers,
    )
@app.get("/api/v1/projects/{project_id}/export/entities-excel")
async def export_entities_excel_endpoint(project_id: str):
    """Export a project's entities as a downloadable Excel (.xlsx) workbook.

    Each entity is converted to an ``ExportEntity`` together with its
    attributes (keyed by template name) before being handed to the export
    manager.

    Raises:
        HTTPException: 500 when ``DB_AVAILABLE`` or ``EXPORT_AVAILABLE`` is
            false, 404 when the project cannot be found.
    """
    if not (DB_AVAILABLE and EXPORT_AVAILABLE):
        raise HTTPException(status_code=500, detail="Export functionality not available")

    db = get_db_manager()
    if not db.get_project(project_id):
        raise HTTPException(status_code=404, detail="Project not found")

    def _as_export_entity(row):
        attr_rows = db.get_entity_attributes(row.id)
        return ExportEntity(
            id=row.id,
            name=row.name,
            type=row.type,
            definition=row.definition or "",
            aliases=json.loads(row.aliases) if row.aliases else [],
            mention_count=row.mention_count,
            attributes={attr.template_name: attr.value for attr in attr_rows},
        )

    # Load the entity rows and convert them for export.
    entities = [_as_export_entity(row) for row in db.get_project_entities(project_id)]

    workbook_bytes = get_export_manager().export_entities_excel(entities)

    download_headers = {
        "Content-Disposition": f"attachment; filename=insightflow-entities-{project_id}.xlsx"
    }
    return StreamingResponse(
        io.BytesIO(workbook_bytes),
        media_type="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
        headers=download_headers,
    )
@app.get("/api/v1/projects/{project_id}/export/entities-csv")
async def export_entities_csv_endpoint(project_id: str):
    """Export a project's entities as a downloadable UTF-8 CSV file.

    Each entity is converted to an ``ExportEntity`` together with its
    attributes (keyed by template name) before being handed to the export
    manager.

    Raises:
        HTTPException: 500 when ``DB_AVAILABLE`` or ``EXPORT_AVAILABLE`` is
            false, 404 when the project cannot be found.
    """
    if not (DB_AVAILABLE and EXPORT_AVAILABLE):
        raise HTTPException(status_code=500, detail="Export functionality not available")

    db = get_db_manager()
    if not db.get_project(project_id):
        raise HTTPException(status_code=404, detail="Project not found")

    def _as_export_entity(row):
        attr_rows = db.get_entity_attributes(row.id)
        return ExportEntity(
            id=row.id,
            name=row.name,
            type=row.type,
            definition=row.definition or "",
            aliases=json.loads(row.aliases) if row.aliases else [],
            mention_count=row.mention_count,
            attributes={attr.template_name: attr.value for attr in attr_rows},
        )

    # Load the entity rows and convert them for export.
    entities = [_as_export_entity(row) for row in db.get_project_entities(project_id)]

    csv_text = get_export_manager().export_entities_csv(entities)

    payload = io.BytesIO(csv_text.encode("utf-8"))
    download_headers = {
        "Content-Disposition": f"attachment; filename=insightflow-entities-{project_id}.csv"
    }
    return StreamingResponse(payload, media_type="text/csv", headers=download_headers)
@app.get("/api/v1/projects/{project_id}/export/relations-csv")
async def export_relations_csv_endpoint(project_id: str):
    """Export a project's relations as a downloadable UTF-8 CSV file.

    Raises:
        HTTPException: 500 when ``DB_AVAILABLE`` or ``EXPORT_AVAILABLE`` is
            false, 404 when the project cannot be found.
    """
    if not (DB_AVAILABLE and EXPORT_AVAILABLE):
        raise HTTPException(status_code=500, detail="Export functionality not available")

    db = get_db_manager()
    if not db.get_project(project_id):
        raise HTTPException(status_code=404, detail="Project not found")

    # Load the relation rows and convert them for export.
    relations = [
        ExportRelation(
            id=row.id,
            source=row.source_name,
            target=row.target_name,
            relation_type=row.relation_type,
            confidence=row.confidence,
            evidence=row.evidence or "",
        )
        for row in db.get_project_relations(project_id)
    ]

    csv_text = get_export_manager().export_relations_csv(relations)

    payload = io.BytesIO(csv_text.encode("utf-8"))
    download_headers = {
        "Content-Disposition": f"attachment; filename=insightflow-relations-{project_id}.csv"
    }
    return StreamingResponse(payload, media_type="text/csv", headers=download_headers)
@app.get("/api/v1/projects/{project_id}/export/report-pdf")
async def export_report_pdf_endpoint(project_id: str):
    """Export a project report as a downloadable PDF.

    Collects the project's entities (with attributes), relations and
    transcripts, optionally asks the knowledge reasoner for a project
    summary, and streams the PDF produced by the export manager.

    The summary is best effort: if the reasoner is unavailable or fails,
    the report is still generated with an empty summary.

    Raises:
        HTTPException: 500 when ``DB_AVAILABLE`` or ``EXPORT_AVAILABLE`` is
            false, 404 when the project cannot be found.
    """
    import logging

    if not DB_AVAILABLE or not EXPORT_AVAILABLE:
        raise HTTPException(status_code=500, detail="Export functionality not available")

    db = get_db_manager()
    project = db.get_project(project_id)
    if not project:
        raise HTTPException(status_code=404, detail="Project not found")

    # Load the raw project data.
    entities_data = db.get_project_entities(project_id)
    relations_data = db.get_project_relations(project_id)
    transcripts_data = db.get_project_transcripts(project_id)

    # Convert the rows into the export data classes.
    entities = []
    for e in entities_data:
        attrs = db.get_entity_attributes(e.id)
        entities.append(ExportEntity(
            id=e.id,
            name=e.name,
            type=e.type,
            definition=e.definition or "",
            aliases=json.loads(e.aliases) if e.aliases else [],
            mention_count=e.mention_count,
            attributes={a.template_name: a.value for a in attrs},
        ))

    relations = [
        ExportRelation(
            id=r.id,
            source=r.source_name,
            target=r.target_name,
            relation_type=r.relation_type,
            confidence=r.confidence,
            evidence=r.evidence or "",
        )
        for r in relations_data
    ]

    transcripts = []
    for t in transcripts_data:
        segments = json.loads(t.segments) if t.segments else []
        transcripts.append(ExportTranscript(
            id=t.id,
            name=t.name,
            type=t.type,
            content=t.full_text or "",
            segments=segments,
            entity_mentions=[],
        ))

    # Fetch the project summary (optional).
    summary = ""
    if REASONER_AVAILABLE:
        try:
            reasoner = get_knowledge_reasoner()
            summary_result = reasoner.generate_project_summary(project_id, db)
            summary = summary_result.get("summary", "")
        except Exception:
            # Catch Exception rather than using a bare ``except`` so that
            # task cancellation, KeyboardInterrupt and SystemExit still
            # propagate. The failure is logged instead of silently dropped,
            # and the report falls back to an empty summary.
            logging.getLogger(__name__).warning(
                "Project summary generation failed for project %s; "
                "exporting report without a summary",
                project_id,
                exc_info=True,
            )

    export_mgr = get_export_manager()
    pdf_bytes = export_mgr.export_project_report_pdf(
        project_id, project.name, entities, relations, transcripts, summary
    )

    return StreamingResponse(
        io.BytesIO(pdf_bytes),
        media_type="application/pdf",
        headers={"Content-Disposition": f"attachment; filename=insightflow-report-{project_id}.pdf"},
    )
@app.get("/api/v1/projects/{project_id}/export/project-json")
async def export_project_json_endpoint(project_id: str):
    """Export the complete project data as a downloadable JSON file.

    Collects the project's entities (with attributes), relations and
    transcripts, converts them to the export manager's data classes and
    streams the resulting JSON document as an attachment.

    Raises:
        HTTPException: 500 when ``DB_AVAILABLE`` or ``EXPORT_AVAILABLE`` is
            false, 404 when the project cannot be found.
    """
    if not (DB_AVAILABLE and EXPORT_AVAILABLE):
        raise HTTPException(status_code=500, detail="Export functionality not available")

    db = get_db_manager()
    project = db.get_project(project_id)
    if not project:
        raise HTTPException(status_code=404, detail="Project not found")

    # Load the raw project data.
    entity_rows = db.get_project_entities(project_id)
    relation_rows = db.get_project_relations(project_id)
    transcript_rows = db.get_project_transcripts(project_id)

    def _as_export_entity(row):
        # Attributes are fetched per entity and keyed by template name.
        attr_rows = db.get_entity_attributes(row.id)
        return ExportEntity(
            id=row.id,
            name=row.name,
            type=row.type,
            definition=row.definition or "",
            aliases=json.loads(row.aliases) if row.aliases else [],
            mention_count=row.mention_count,
            attributes={attr.template_name: attr.value for attr in attr_rows},
        )

    def _as_export_transcript(row):
        # Segments are stored as a JSON string; an empty value means none.
        parsed_segments = json.loads(row.segments) if row.segments else []
        return ExportTranscript(
            id=row.id,
            name=row.name,
            type=row.type,
            content=row.full_text or "",
            segments=parsed_segments,
            entity_mentions=[],
        )

    # Convert the rows into the export data classes.
    entities = [_as_export_entity(row) for row in entity_rows]
    relations = [
        ExportRelation(
            id=row.id,
            source=row.source_name,
            target=row.target_name,
            relation_type=row.relation_type,
            confidence=row.confidence,
            evidence=row.evidence or "",
        )
        for row in relation_rows
    ]
    transcripts = [_as_export_transcript(row) for row in transcript_rows]

    json_text = get_export_manager().export_project_json(
        project_id, project.name, entities, relations, transcripts
    )

    payload = io.BytesIO(json_text.encode("utf-8"))
    download_headers = {
        "Content-Disposition": f"attachment; filename=insightflow-project-{project_id}.json"
    }
    return StreamingResponse(payload, media_type="application/json", headers=download_headers)
@app.get("/api/v1/transcripts/{transcript_id}/export/markdown")
async def export_transcript_markdown_endpoint(transcript_id: str):
    """Export a single transcript as a downloadable Markdown file.

    The transcript is exported together with its entity mentions and a
    lookup of the owning project's entities keyed by entity id.

    Raises:
        HTTPException: 500 when ``DB_AVAILABLE`` or ``EXPORT_AVAILABLE`` is
            false, 404 when the transcript cannot be found.
    """
    if not (DB_AVAILABLE and EXPORT_AVAILABLE):
        raise HTTPException(status_code=500, detail="Export functionality not available")

    db = get_db_manager()
    transcript = db.get_transcript(transcript_id)
    if not transcript:
        raise HTTPException(status_code=404, detail="Transcript not found")

    # Entity mentions recorded for this transcript.
    mention_rows = db.get_transcript_entity_mentions(transcript_id)

    # Project entities, indexed by id, used to resolve the mentions.
    entities_map = {}
    for row in db.get_project_entities(transcript.project_id):
        entities_map[row.id] = ExportEntity(
            id=row.id,
            name=row.name,
            type=row.type,
            definition=row.definition or "",
            aliases=json.loads(row.aliases) if row.aliases else [],
            mention_count=row.mention_count,
            attributes={},
        )

    # Segments are stored as a JSON string; an empty value means none.
    parsed_segments = json.loads(transcript.segments) if transcript.segments else []

    export_transcript = ExportTranscript(
        id=transcript.id,
        name=transcript.name,
        type=transcript.type,
        content=transcript.full_text or "",
        segments=parsed_segments,
        entity_mentions=[
            {
                "entity_id": mention.entity_id,
                "entity_name": mention.entity_name,
                "position": mention.position,
                "context": mention.context,
            }
            for mention in mention_rows
        ],
    )

    markdown_text = get_export_manager().export_transcript_markdown(export_transcript, entities_map)

    payload = io.BytesIO(markdown_text.encode("utf-8"))
    download_headers = {
        "Content-Disposition": f"attachment; filename=insightflow-transcript-{transcript_id}.md"
    }
    return StreamingResponse(payload, media_type="text/markdown", headers=download_headers)
# Serve the static frontend from the "frontend" directory at the root path.
# This mount MUST stay the last route registration in the module so that it
# does not override the API routes declared above it.
app.mount("/", StaticFiles(directory="frontend", html=True), name="frontend")