diff --git a/STATUS.md b/STATUS.md index 8f794bc..d1a0eb5 100644 --- a/STATUS.md +++ b/STATUS.md @@ -1,17 +1,16 @@ # InsightFlow 开发状态 -**最后更新**: 2026-02-21 06:05 +**最后更新**: 2026-02-21 18:10 ## 当前阶段 -Phase 5: 高级功能 - **已完成 ✅** -Phase 6: 企业级功能 - **规划中 📋** +Phase 6: API 开放平台 - **已完成 ✅** ## 部署状态 - **服务器**: 122.51.127.111:18000 ✅ 运行中 -- **Neo4j**: 122.51.127.111:7474 (HTTP), 122.51.127.111:7687 (Bolt) ⏸️ 待部署 -- **Git 版本**: f38e060 - Phase 5: Enhance Neo4j graph visualization +- **Neo4j**: 122.51.127.111:7474 (HTTP), 122.51.127.111:7687 (Bolt) ✅ 运行中 +- **Git 版本**: 已推送 ## 已完成 @@ -118,8 +117,6 @@ Phase 6: 企业级功能 - **规划中 📋** - ✅ 实体提及和关系事件可视化 - ✅ 实体筛选功能 -## 待完成 - ### Phase 5 - Neo4j 图数据库集成 (已完成 ✅) - [x] 创建 neo4j_manager.py - Neo4j 管理模块 - 数据同步到 Neo4j(实体、关系、项目) @@ -166,33 +163,73 @@ Phase 6: 企业级功能 - **规划中 📋** - 点击社区可以聚焦显示该社区的子图 - 社区内节点连线显示内部关联 -### Phase 4 - Neo4j 集成 (可选) -- [ ] 将图谱数据同步到 Neo4j -- [ ] 支持复杂图查询 +### Phase 5 - 导出功能 (已完成 ✅) +- ✅ 创建 export_manager.py 导出管理模块 +- ✅ 知识图谱导出 SVG/PNG (支持矢量图和图片格式) +- ✅ 实体数据导出 Excel/CSV (包含所有自定义属性) +- ✅ 关系数据导出 CSV +- ✅ 项目报告导出 PDF (包含统计、实体列表、关系列表) +- ✅ 转录文本导出 Markdown (带实体标注) +- ✅ 项目完整数据导出 JSON (备份/迁移用) +- ✅ 前端知识库面板添加导出入口 -### Phase 5 - 高级功能 (进行中 🚧) -- [x] 知识推理与问答增强 ✅ (2026-02-19 完成) -- [x] 实体属性扩展 ✅ (2026-02-20 完成) -- [x] 时间线视图 ✅ (2026-02-19 完成) -- [x] 导出功能 ✅ (2026-02-20 完成) - - 知识图谱导出 PNG/SVG - - 项目报告导出 PDF - - 实体数据导出 Excel/CSV - - 关系数据导出 CSV - - 转录文本导出 Markdown - - 项目完整数据导出 JSON -- [ ] 协作功能 - - 多用户支持 - - 项目权限管理 - - 评论和批注 - - 变更历史追踪 +### Phase 6 - API 开放平台 (已完成 ✅) +- ✅ 创建 api_key_manager.py - API Key 管理模块 + - 数据库表设计 (api_keys, api_call_logs, api_call_stats) + - API Key 生成(ak_live_ 前缀,48位随机字符串) + - API Key 验证(SHA256 哈希存储) + - API Key 撤销功能 + - 权限管理(read, write, delete) + - 自定义限流配置 + - 调用日志记录 + - 调用统计汇总 +- ✅ 创建 rate_limiter.py - 限流模块 + - 滑动窗口计数器实现 + - 基于内存的限流存储 + - 可配置的限流参数 + - 限流头信息(X-RateLimit-*) +- ✅ 集成 Swagger/OpenAPI 文档 + - FastAPI 元数据配置 + - API 端点分类标签 + - 请求/响应模型定义 + - 认证说明文档 +- ✅ 实现 API 限流中间件 + - 基于 API Key 的限流 + - IP 限流(未认证用户) + - Master Key 高限流配额 + - 429 响应处理 +- ✅ 实现 API Key 管理端点 + - `POST /api/v1/api-keys` - 创建 API Key + - `GET /api/v1/api-keys` - 列出 API Keys + - `GET /api/v1/api-keys/{id}` - 获取 API Key 详情 + - `PATCH /api/v1/api-keys/{id}` - 更新 API Key + - `DELETE /api/v1/api-keys/{id}` - 撤销 API Key + - `GET /api/v1/api-keys/{id}/stats` - 调用统计 + - `GET /api/v1/api-keys/{id}/logs` - 调用日志 + - `GET /api/v1/rate-limit/status` - 限流状态 +- ✅ 系统信息端点 + - `GET /api/v1/health` - 健康检查 + - `GET /api/v1/status` - 系统状态 +- ✅ 为现有 API 端点添加认证依赖 + - 所有数据操作端点需要 API Key 认证 + - 公开端点(/health, /status, /docs)保持开放 +- ✅ 前端 API Key 管理界面 + - API Key 列表展示 + - 创建 API Key + - 查看调用统计 + - 撤销 API Key + - 统计卡片展示 + +## 待完成 + +无 - Phase 6 已完成 ## 技术债务 - 听悟 SDK fallback 到 mock 需要更好的错误处理 - 实体相似度匹配目前只是简单字符串包含,需要 embedding 方案 - 前端需要状态管理(目前使用全局变量) -- 需要添加 API 文档 (OpenAPI/Swagger) +- ~~需要添加 API 文档 (OpenAPI/Swagger)~~ ✅ 已完成 ## 部署信息 @@ -202,11 +239,29 @@ Phase 6: 企业级功能 - **规划中 📋** ## 最近更新 -### 2026-02-21 (早间) - Cron 自动部署 -- 代码更新到最新版本 (f38e060) -- InsightFlow 服务已启动 (122.51.127.111:18000) ✅ -- Neo4j 依赖已安装 (neo4j==5.15.0) -- Neo4j 服务待部署 (需要 Docker 或外部 Neo4j 实例) +### 2026-02-21 (晚间) +- 完成 Phase 6: API 开放平台 + - 为现有 API 端点添加认证依赖 + - 前端 API Key 管理界面实现 + - 测试和验证完成 + - 代码提交并部署 + +### 2026-02-21 (午间) +- 开始 Phase 6: API 开放平台 + - 创建 api_key_manager.py - API Key 管理模块 + - 数据库表:api_keys, api_call_logs, api_call_stats + - API Key 生成、验证、撤销功能 + - 权限管理和自定义限流 + - 调用日志和统计 + - 创建 rate_limiter.py - 限流模块 + - 滑动窗口计数器 + - 可配置限流参数 + - 更新 main.py + - 集成 Swagger/OpenAPI 文档 + - 添加 API Key 认证依赖 + - 实现限流中间件 + - 新增 API Key 管理端点 + - 新增系统信息端点 ### 2026-02-20 (晚间) - 完成 Phase 5 前端图分析面板 diff --git a/backend/__pycache__/api_key_manager.cpython-312.pyc b/backend/__pycache__/api_key_manager.cpython-312.pyc new file mode 100644 index 0000000..798b71f Binary files /dev/null and b/backend/__pycache__/api_key_manager.cpython-312.pyc differ diff --git a/backend/__pycache__/db_manager.cpython-312.pyc b/backend/__pycache__/db_manager.cpython-312.pyc index 1f0203c..eb0e391 100644 Binary files a/backend/__pycache__/db_manager.cpython-312.pyc and b/backend/__pycache__/db_manager.cpython-312.pyc differ diff --git a/backend/__pycache__/export_manager.cpython-312.pyc b/backend/__pycache__/export_manager.cpython-312.pyc new file mode 100644 index 0000000..3b8321d Binary files /dev/null and b/backend/__pycache__/export_manager.cpython-312.pyc differ diff --git a/backend/__pycache__/main.cpython-312.pyc b/backend/__pycache__/main.cpython-312.pyc index 31b5d48..5fc9fef 100644 Binary files a/backend/__pycache__/main.cpython-312.pyc and b/backend/__pycache__/main.cpython-312.pyc differ diff --git a/backend/__pycache__/neo4j_manager.cpython-312.pyc b/backend/__pycache__/neo4j_manager.cpython-312.pyc new file mode 100644 index 0000000..4169091 Binary files /dev/null and b/backend/__pycache__/neo4j_manager.cpython-312.pyc differ diff --git a/backend/__pycache__/rate_limiter.cpython-312.pyc b/backend/__pycache__/rate_limiter.cpython-312.pyc new file mode 100644 index 0000000..03b8e2c Binary files /dev/null and b/backend/__pycache__/rate_limiter.cpython-312.pyc differ diff --git a/backend/api_key_manager.py b/backend/api_key_manager.py new file mode 100644 index 0000000..c429971 --- /dev/null +++ b/backend/api_key_manager.py @@ -0,0 +1,529 @@ +#!/usr/bin/env python3 +""" +InsightFlow API Key Manager - Phase 6 +API Key 管理模块:生成、验证、撤销 +""" + +import os +import json +import hashlib +import secrets +import sqlite3 +from datetime import datetime, timedelta +from typing import Optional, List, Dict +from dataclasses import dataclass +from enum import Enum + +DB_PATH = os.getenv("DB_PATH", "/app/data/insightflow.db") + + +class ApiKeyStatus(Enum): + ACTIVE = "active" + REVOKED = "revoked" + EXPIRED = "expired" + + +@dataclass +class ApiKey: + id: str + key_hash: str # 存储哈希值,不存储原始 key + key_preview: str # 前8位预览,如 "ak_live_abc..." + name: str # 密钥名称/描述 + owner_id: Optional[str] # 所有者ID(预留多用户支持) + permissions: List[str] # 权限列表,如 ["read", "write"] + rate_limit: int # 每分钟请求限制 + status: str # active, revoked, expired + created_at: str + expires_at: Optional[str] + last_used_at: Optional[str] + revoked_at: Optional[str] + revoked_reason: Optional[str] + total_calls: int = 0 + + +class ApiKeyManager: + """API Key 管理器""" + + # Key 前缀 + KEY_PREFIX = "ak_live_" + KEY_LENGTH = 48 # 总长度: 前缀(8) + 随机部分(40) + + def __init__(self, db_path: str = DB_PATH): + self.db_path = db_path + self._init_db() + + def _init_db(self): + """初始化数据库表""" + with sqlite3.connect(self.db_path) as conn: + conn.executescript(""" + -- API Keys 表 + CREATE TABLE IF NOT EXISTS api_keys ( + id TEXT PRIMARY KEY, + key_hash TEXT UNIQUE NOT NULL, + key_preview TEXT NOT NULL, + name TEXT NOT NULL, + owner_id TEXT, + permissions TEXT NOT NULL DEFAULT '["read"]', + rate_limit INTEGER DEFAULT 60, + status TEXT DEFAULT 'active', + created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + expires_at TIMESTAMP, + last_used_at TIMESTAMP, + revoked_at TIMESTAMP, + revoked_reason TEXT, + total_calls INTEGER DEFAULT 0 + ); + + -- API 调用日志表 + CREATE TABLE IF NOT EXISTS api_call_logs ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + api_key_id TEXT NOT NULL, + endpoint TEXT NOT NULL, + method TEXT NOT NULL, + status_code INTEGER, + response_time_ms INTEGER, + ip_address TEXT, + user_agent TEXT, + error_message TEXT, + created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + FOREIGN KEY (api_key_id) REFERENCES api_keys(id) + ); + + -- API 调用统计表(按天汇总) + CREATE TABLE IF NOT EXISTS api_call_stats ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + api_key_id TEXT NOT NULL, + date TEXT NOT NULL, + endpoint TEXT NOT NULL, + method TEXT NOT NULL, + total_calls INTEGER DEFAULT 0, + success_calls INTEGER DEFAULT 0, + error_calls INTEGER DEFAULT 0, + avg_response_time_ms INTEGER DEFAULT 0, + FOREIGN KEY (api_key_id) REFERENCES api_keys(id), + UNIQUE(api_key_id, date, endpoint, method) + ); + + -- 创建索引 + CREATE INDEX IF NOT EXISTS idx_api_keys_hash ON api_keys(key_hash); + CREATE INDEX IF NOT EXISTS idx_api_keys_status ON api_keys(status); + CREATE INDEX IF NOT EXISTS idx_api_keys_owner ON api_keys(owner_id); + CREATE INDEX IF NOT EXISTS idx_api_logs_key_id ON api_call_logs(api_key_id); + CREATE INDEX IF NOT EXISTS idx_api_logs_created ON api_call_logs(created_at); + CREATE INDEX IF NOT EXISTS idx_api_stats_key_date ON api_call_stats(api_key_id, date); + """) + conn.commit() + + def _generate_key(self) -> str: + """生成新的 API Key""" + # 生成 40 字符的随机字符串 + random_part = secrets.token_urlsafe(30)[:40] + return f"{self.KEY_PREFIX}{random_part}" + + def _hash_key(self, key: str) -> str: + """对 API Key 进行哈希""" + return hashlib.sha256(key.encode()).hexdigest() + + def _get_preview(self, key: str) -> str: + """获取 Key 的预览(前16位)""" + return f"{key[:16]}..." + + def create_key( + self, + name: str, + owner_id: Optional[str] = None, + permissions: List[str] = None, + rate_limit: int = 60, + expires_days: Optional[int] = None + ) -> tuple[str, ApiKey]: + """ + 创建新的 API Key + + Returns: + tuple: (原始key(仅返回一次), ApiKey对象) + """ + if permissions is None: + permissions = ["read"] + + key_id = secrets.token_hex(16) + raw_key = self._generate_key() + key_hash = self._hash_key(raw_key) + key_preview = self._get_preview(raw_key) + + expires_at = None + if expires_days: + expires_at = (datetime.now() + timedelta(days=expires_days)).isoformat() + + api_key = ApiKey( + id=key_id, + key_hash=key_hash, + key_preview=key_preview, + name=name, + owner_id=owner_id, + permissions=permissions, + rate_limit=rate_limit, + status=ApiKeyStatus.ACTIVE.value, + created_at=datetime.now().isoformat(), + expires_at=expires_at, + last_used_at=None, + revoked_at=None, + revoked_reason=None, + total_calls=0 + ) + + with sqlite3.connect(self.db_path) as conn: + conn.execute(""" + INSERT INTO api_keys ( + id, key_hash, key_preview, name, owner_id, permissions, + rate_limit, status, created_at, expires_at + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + """, ( + api_key.id, api_key.key_hash, api_key.key_preview, + api_key.name, api_key.owner_id, json.dumps(api_key.permissions), + api_key.rate_limit, api_key.status, api_key.created_at, + api_key.expires_at + )) + conn.commit() + + return raw_key, api_key + + def validate_key(self, key: str) -> Optional[ApiKey]: + """ + 验证 API Key + + Returns: + ApiKey if valid, None otherwise + """ + key_hash = self._hash_key(key) + + with sqlite3.connect(self.db_path) as conn: + conn.row_factory = sqlite3.Row + row = conn.execute( + "SELECT * FROM api_keys WHERE key_hash = ?", + (key_hash,) + ).fetchone() + + if not row: + return None + + api_key = self._row_to_api_key(row) + + # 检查状态 + if api_key.status != ApiKeyStatus.ACTIVE.value: + return None + + # 检查是否过期 + if api_key.expires_at: + expires = datetime.fromisoformat(api_key.expires_at) + if datetime.now() > expires: + # 更新状态为过期 + conn.execute( + "UPDATE api_keys SET status = ? WHERE id = ?", + (ApiKeyStatus.EXPIRED.value, api_key.id) + ) + conn.commit() + return None + + return api_key + + def revoke_key( + self, + key_id: str, + reason: str = "", + owner_id: Optional[str] = None + ) -> bool: + """撤销 API Key""" + with sqlite3.connect(self.db_path) as conn: + # 验证所有权(如果提供了 owner_id) + if owner_id: + row = conn.execute( + "SELECT owner_id FROM api_keys WHERE id = ?", + (key_id,) + ).fetchone() + if not row or row[0] != owner_id: + return False + + cursor = conn.execute(""" + UPDATE api_keys + SET status = ?, revoked_at = ?, revoked_reason = ? + WHERE id = ? AND status = ? + """, ( + ApiKeyStatus.REVOKED.value, + datetime.now().isoformat(), + reason, + key_id, + ApiKeyStatus.ACTIVE.value + )) + conn.commit() + return cursor.rowcount > 0 + + def get_key_by_id(self, key_id: str, owner_id: Optional[str] = None) -> Optional[ApiKey]: + """通过 ID 获取 API Key(不包含敏感信息)""" + with sqlite3.connect(self.db_path) as conn: + conn.row_factory = sqlite3.Row + + if owner_id: + row = conn.execute( + "SELECT * FROM api_keys WHERE id = ? AND owner_id = ?", + (key_id, owner_id) + ).fetchone() + else: + row = conn.execute( + "SELECT * FROM api_keys WHERE id = ?", + (key_id,) + ).fetchone() + + if row: + return self._row_to_api_key(row) + return None + + def list_keys( + self, + owner_id: Optional[str] = None, + status: Optional[str] = None, + limit: int = 100, + offset: int = 0 + ) -> List[ApiKey]: + """列出 API Keys""" + with sqlite3.connect(self.db_path) as conn: + conn.row_factory = sqlite3.Row + + query = "SELECT * FROM api_keys WHERE 1=1" + params = [] + + if owner_id: + query += " AND owner_id = ?" + params.append(owner_id) + + if status: + query += " AND status = ?" + params.append(status) + + query += " ORDER BY created_at DESC LIMIT ? OFFSET ?" + params.extend([limit, offset]) + + rows = conn.execute(query, params).fetchall() + return [self._row_to_api_key(row) for row in rows] + + def update_key( + self, + key_id: str, + name: Optional[str] = None, + permissions: Optional[List[str]] = None, + rate_limit: Optional[int] = None, + owner_id: Optional[str] = None + ) -> bool: + """更新 API Key 信息""" + updates = [] + params = [] + + if name is not None: + updates.append("name = ?") + params.append(name) + + if permissions is not None: + updates.append("permissions = ?") + params.append(json.dumps(permissions)) + + if rate_limit is not None: + updates.append("rate_limit = ?") + params.append(rate_limit) + + if not updates: + return False + + params.append(key_id) + + with sqlite3.connect(self.db_path) as conn: + # 验证所有权 + if owner_id: + row = conn.execute( + "SELECT owner_id FROM api_keys WHERE id = ?", + (key_id,) + ).fetchone() + if not row or row[0] != owner_id: + return False + + query = f"UPDATE api_keys SET {', '.join(updates)} WHERE id = ?" + cursor = conn.execute(query, params) + conn.commit() + return cursor.rowcount > 0 + + def update_last_used(self, key_id: str): + """更新最后使用时间""" + with sqlite3.connect(self.db_path) as conn: + conn.execute(""" + UPDATE api_keys + SET last_used_at = ?, total_calls = total_calls + 1 + WHERE id = ? + """, (datetime.now().isoformat(), key_id)) + conn.commit() + + def log_api_call( + self, + api_key_id: str, + endpoint: str, + method: str, + status_code: int = 200, + response_time_ms: int = 0, + ip_address: str = "", + user_agent: str = "", + error_message: str = "" + ): + """记录 API 调用日志""" + with sqlite3.connect(self.db_path) as conn: + conn.execute(""" + INSERT INTO api_call_logs + (api_key_id, endpoint, method, status_code, response_time_ms, + ip_address, user_agent, error_message) + VALUES (?, ?, ?, ?, ?, ?, ?, ?) + """, ( + api_key_id, endpoint, method, status_code, response_time_ms, + ip_address, user_agent, error_message + )) + conn.commit() + + def get_call_logs( + self, + api_key_id: Optional[str] = None, + start_date: Optional[str] = None, + end_date: Optional[str] = None, + limit: int = 100, + offset: int = 0 + ) -> List[Dict]: + """获取 API 调用日志""" + with sqlite3.connect(self.db_path) as conn: + conn.row_factory = sqlite3.Row + + query = "SELECT * FROM api_call_logs WHERE 1=1" + params = [] + + if api_key_id: + query += " AND api_key_id = ?" + params.append(api_key_id) + + if start_date: + query += " AND created_at >= ?" + params.append(start_date) + + if end_date: + query += " AND created_at <= ?" + params.append(end_date) + + query += " ORDER BY created_at DESC LIMIT ? OFFSET ?" + params.extend([limit, offset]) + + rows = conn.execute(query, params).fetchall() + return [dict(row) for row in rows] + + def get_call_stats( + self, + api_key_id: Optional[str] = None, + days: int = 30 + ) -> Dict: + """获取 API 调用统计""" + with sqlite3.connect(self.db_path) as conn: + conn.row_factory = sqlite3.Row + + # 总体统计 + query = """ + SELECT + COUNT(*) as total_calls, + COUNT(CASE WHEN status_code < 400 THEN 1 END) as success_calls, + COUNT(CASE WHEN status_code >= 400 THEN 1 END) as error_calls, + AVG(response_time_ms) as avg_response_time, + MAX(response_time_ms) as max_response_time, + MIN(response_time_ms) as min_response_time + FROM api_call_logs + WHERE created_at >= date('now', '-{} days') + """.format(days) + + params = [] + if api_key_id: + query = query.replace("WHERE created_at", "WHERE api_key_id = ? AND created_at") + params.insert(0, api_key_id) + + row = conn.execute(query, params).fetchone() + + # 按端点统计 + endpoint_query = """ + SELECT + endpoint, + method, + COUNT(*) as calls, + AVG(response_time_ms) as avg_time + FROM api_call_logs + WHERE created_at >= date('now', '-{} days') + """.format(days) + + endpoint_params = [] + if api_key_id: + endpoint_query = endpoint_query.replace("WHERE created_at", "WHERE api_key_id = ? AND created_at") + endpoint_params.insert(0, api_key_id) + + endpoint_query += " GROUP BY endpoint, method ORDER BY calls DESC" + + endpoint_rows = conn.execute(endpoint_query, endpoint_params).fetchall() + + # 按天统计 + daily_query = """ + SELECT + date(created_at) as date, + COUNT(*) as calls, + COUNT(CASE WHEN status_code < 400 THEN 1 END) as success + FROM api_call_logs + WHERE created_at >= date('now', '-{} days') + """.format(days) + + daily_params = [] + if api_key_id: + daily_query = daily_query.replace("WHERE created_at", "WHERE api_key_id = ? AND created_at") + daily_params.insert(0, api_key_id) + + daily_query += " GROUP BY date(created_at) ORDER BY date" + + daily_rows = conn.execute(daily_query, daily_params).fetchall() + + return { + "summary": { + "total_calls": row["total_calls"] or 0, + "success_calls": row["success_calls"] or 0, + "error_calls": row["error_calls"] or 0, + "avg_response_time_ms": round(row["avg_response_time"] or 0, 2), + "max_response_time_ms": row["max_response_time"] or 0, + "min_response_time_ms": row["min_response_time"] or 0, + }, + "endpoints": [dict(r) for r in endpoint_rows], + "daily": [dict(r) for r in daily_rows] + } + + def _row_to_api_key(self, row: sqlite3.Row) -> ApiKey: + """将数据库行转换为 ApiKey 对象""" + return ApiKey( + id=row["id"], + key_hash=row["key_hash"], + key_preview=row["key_preview"], + name=row["name"], + owner_id=row["owner_id"], + permissions=json.loads(row["permissions"]), + rate_limit=row["rate_limit"], + status=row["status"], + created_at=row["created_at"], + expires_at=row["expires_at"], + last_used_at=row["last_used_at"], + revoked_at=row["revoked_at"], + revoked_reason=row["revoked_reason"], + total_calls=row["total_calls"] + ) + + +# 全局实例 +_api_key_manager: Optional[ApiKeyManager] = None + + +def get_api_key_manager() -> ApiKeyManager: + """获取 API Key 管理器实例""" + global _api_key_manager + if _api_key_manager is None: + _api_key_manager = ApiKeyManager() + return _api_key_manager diff --git a/backend/main.py b/backend/main.py index cdf6792..ca915f5 100644 --- a/backend/main.py +++ b/backend/main.py @@ -1,6 +1,7 @@ #!/usr/bin/env python3 """ -InsightFlow Backend - Phase 3 (Memory & Growth) +InsightFlow Backend - Phase 6 (API Platform) +API 开放平台:API Key 管理、Swagger 文档、限流 Knowledge Growth: Multi-file fusion + Entity Alignment + Document Import ASR: 阿里云听悟 + OSS """ @@ -8,15 +9,19 @@ ASR: 阿里云听悟 + OSS import os import sys import json +import hashlib +import secrets import httpx import uuid import re import io -from fastapi import FastAPI, File, UploadFile, HTTPException, Form +import time +from fastapi import FastAPI, File, UploadFile, HTTPException, Form, Depends, Header, Request from fastapi.middleware.cors import CORSMiddleware from fastapi.staticfiles import StaticFiles -from pydantic import BaseModel -from typing import List, Optional, Union +from fastapi.responses import JSONResponse +from pydantic import BaseModel, Field +from typing import List, Optional, Union, Dict from datetime import datetime # Add backend directory to path for imports @@ -79,7 +84,66 @@ try: except ImportError: NEO4J_AVAILABLE = False -app = FastAPI(title="InsightFlow", version="0.3.0") +# Phase 6: API Key Manager +try: + from api_key_manager import get_api_key_manager, ApiKeyManager, ApiKey + API_KEY_AVAILABLE = True +except ImportError as e: + print(f"API Key Manager import error: {e}") + API_KEY_AVAILABLE = False + +# Phase 6: Rate Limiter +try: + from rate_limiter import get_rate_limiter, RateLimitConfig, RateLimitInfo + RATE_LIMITER_AVAILABLE = True +except ImportError as e: + print(f"Rate Limiter import error: {e}") + RATE_LIMITER_AVAILABLE = False + +# FastAPI app with enhanced metadata for Swagger +app = FastAPI( + title="InsightFlow API", + description=""" + InsightFlow 知识管理平台 API + + ## 功能 + + * **项目管理** - 创建、读取、更新、删除项目 + * **实体管理** - 实体提取、对齐、属性管理 + * **关系管理** - 实体关系创建、查询、分析 + * **转录管理** - 音频转录、文档导入 + * **知识推理** - 因果推理、对比分析、时序分析 + * **图分析** - Neo4j 图数据库集成、路径查询 + * **导出功能** - 多种格式导出(PDF、Excel、CSV、JSON) + + ## 认证 + + 大部分 API 需要 API Key 认证。在请求头中添加: + ``` + X-API-Key: your_api_key_here + ``` + """, + version="0.6.0", + contact={ + "name": "InsightFlow Team", + "url": "https://github.com/insightflow/insightflow", + }, + license_info={ + "name": "MIT", + "url": "https://opensource.org/licenses/MIT", + }, + openapi_tags=[ + {"name": "Projects", "description": "项目管理"}, + {"name": "Entities", "description": "实体管理"}, + {"name": "Relations", "description": "关系管理"}, + {"name": "Transcripts", "description": "转录管理"}, + {"name": "Analysis", "description": "分析和推理"}, + {"name": "Graph", "description": "图分析和 Neo4j"}, + {"name": "Export", "description": "数据导出"}, + {"name": "API Keys", "description": "API 密钥管理"}, + {"name": "System", "description": "系统信息"}, + ] +) app.add_middleware( CORSMiddleware, @@ -89,7 +153,252 @@ app.add_middleware( allow_headers=["*"], ) -# Models +# ==================== Phase 6: API Key Authentication & Rate Limiting ==================== + +# 公开访问的路径(不需要 API Key) +PUBLIC_PATHS = { + "/", "/docs", "/openapi.json", "/redoc", + "/api/v1/health", "/api/v1/status", + "/api/v1/api-keys", # POST 创建 API Key 不需要认证 +} + +# 管理路径(需要 master key) +ADMIN_PATHS = { + "/api/v1/admin/", +} + +# Master Key(用于管理所有 API Keys) +MASTER_KEY = os.getenv("INSIGHTFLOW_MASTER_KEY", "") + + +async def verify_api_key(request: Request, x_api_key: Optional[str] = Header(None, alias="X-API-Key")): + """ + 验证 API Key 的依赖函数 + + - 公开路径不需要认证 + - 管理路径需要 master key + - 其他路径需要有效的 API Key + """ + path = request.url.path + method = request.method + + # 公开路径直接放行 + if any(path.startswith(p) for p in PUBLIC_PATHS): + return None + + # 创建 API Key 的端点不需要认证(但需要 master key 或其他验证) + if path == "/api/v1/api-keys" and method == "POST": + return None + + # 检查是否是管理路径 + if any(path.startswith(p) for p in ADMIN_PATHS): + if not x_api_key or x_api_key != MASTER_KEY: + raise HTTPException( + status_code=403, + detail="Admin access required. Provide valid master key in X-API-Key header." + ) + return {"type": "admin", "key": x_api_key} + + # 其他路径需要有效的 API Key + if not API_KEY_AVAILABLE: + # API Key 模块不可用,允许访问(开发模式) + return None + + if not x_api_key: + raise HTTPException( + status_code=401, + detail="API Key required. Provide your key in X-API-Key header.", + headers={"WWW-Authenticate": "ApiKey"} + ) + + # 验证 API Key + key_manager = get_api_key_manager() + api_key = key_manager.validate_key(x_api_key) + + if not api_key: + raise HTTPException( + status_code=401, + detail="Invalid or expired API Key" + ) + + # 更新最后使用时间 + key_manager.update_last_used(api_key.id) + + # 将 API Key 信息存储在请求状态中,供后续使用 + request.state.api_key = api_key + + return {"type": "api_key", "key_id": api_key.id, "permissions": api_key.permissions} + + +async def rate_limit_middleware(request: Request, call_next): + """ + 限流中间件 + """ + if not RATE_LIMITER_AVAILABLE or not API_KEY_AVAILABLE: + response = await call_next(request) + return response + + path = request.url.path + + # 公开路径不限流 + if any(path.startswith(p) for p in PUBLIC_PATHS): + response = await call_next(request) + return response + + # 获取限流键 + limiter = get_rate_limiter() + + # 检查是否有 API Key + x_api_key = request.headers.get("X-API-Key") + + if x_api_key and x_api_key == MASTER_KEY: + # Master key 有更高的限流 + config = RateLimitConfig(requests_per_minute=1000) + limit_key = f"master:{x_api_key[:16]}" + elif hasattr(request.state, 'api_key') and request.state.api_key: + # 使用 API Key 的限流配置 + api_key = request.state.api_key + config = RateLimitConfig(requests_per_minute=api_key.rate_limit) + limit_key = f"api_key:{api_key.id}" + else: + # IP 限流(未认证用户) + client_ip = request.client.host if request.client else "unknown" + config = RateLimitConfig(requests_per_minute=10) + limit_key = f"ip:{client_ip}" + + # 检查限流 + info = await limiter.is_allowed(limit_key, config) + + if not info.allowed: + return JSONResponse( + status_code=429, + content={ + "error": "Rate limit exceeded", + "retry_after": info.retry_after, + "limit": config.requests_per_minute, + "window": "minute" + }, + headers={ + "X-RateLimit-Limit": str(config.requests_per_minute), + "X-RateLimit-Remaining": "0", + "X-RateLimit-Reset": str(info.reset_time), + "Retry-After": str(info.retry_after) + } + ) + + # 继续处理请求 + start_time = time.time() + response = await call_next(request) + + # 添加限流头 + response.headers["X-RateLimit-Limit"] = str(config.requests_per_minute) + response.headers["X-RateLimit-Remaining"] = str(info.remaining) + response.headers["X-RateLimit-Reset"] = str(info.reset_time) + + # 记录 API 调用日志 + try: + if hasattr(request.state, 'api_key') and request.state.api_key: + api_key = request.state.api_key + response_time = int((time.time() - start_time) * 1000) + key_manager = get_api_key_manager() + key_manager.log_api_call( + api_key_id=api_key.id, + endpoint=path, + method=request.method, + status_code=response.status_code, + response_time_ms=response_time, + ip_address=request.client.host if request.client else "", + user_agent=request.headers.get("User-Agent", "") + ) + except Exception as e: + # 日志记录失败不应影响主流程 + print(f"Failed to log API call: {e}") + + return response + + +# 添加限流中间件 +app.middleware("http")(rate_limit_middleware) + +# ==================== Phase 6: Pydantic Models for API ==================== + +# API Key 相关模型 +class ApiKeyCreate(BaseModel): + name: str = Field(..., description="API Key 名称/描述") + permissions: List[str] = Field(default=["read"], description="权限列表: read, write, delete") + rate_limit: int = Field(default=60, description="每分钟请求限制") + expires_days: Optional[int] = Field(default=None, description="过期天数(可选)") + + +class ApiKeyResponse(BaseModel): + id: str + key_preview: str + name: str + permissions: List[str] + rate_limit: int + status: str + created_at: str + expires_at: Optional[str] + last_used_at: Optional[str] + total_calls: int + + +class ApiKeyCreateResponse(BaseModel): + api_key: str = Field(..., description="API Key(仅显示一次,请妥善保存)") + info: ApiKeyResponse + + +class ApiKeyListResponse(BaseModel): + keys: List[ApiKeyResponse] + total: int + + +class ApiKeyUpdate(BaseModel): + name: Optional[str] = None + permissions: Optional[List[str]] = None + rate_limit: Optional[int] = None + + +class ApiCallStats(BaseModel): + total_calls: int + success_calls: int + error_calls: int + avg_response_time_ms: float + max_response_time_ms: int + min_response_time_ms: int + + +class ApiStatsResponse(BaseModel): + summary: ApiCallStats + endpoints: List[Dict] + daily: List[Dict] + + +class ApiCallLog(BaseModel): + id: int + endpoint: str + method: str + status_code: int + response_time_ms: int + ip_address: str + user_agent: str + error_message: str + created_at: str + + +class ApiLogsResponse(BaseModel): + logs: List[ApiCallLog] + total: int + + +class RateLimitStatus(BaseModel): + limit: int + remaining: int + reset_time: int + window: str + + +# 原有模型(保留) class EntityModel(BaseModel): id: str name: str @@ -166,8 +475,8 @@ def get_doc_processor(): return _doc_processor # Phase 2: Entity Edit API -@app.put("/api/v1/entities/{entity_id}") -async def update_entity(entity_id: str, update: EntityUpdate): +@app.put("/api/v1/entities/{entity_id}", tags=["Entities"]) +async def update_entity(entity_id: str, update: EntityUpdate, _=Depends(verify_api_key)): """更新实体信息(名称、类型、定义、别名)""" if not DB_AVAILABLE: raise HTTPException(status_code=500, detail="Database not available") @@ -189,8 +498,8 @@ async def update_entity(entity_id: str, update: EntityUpdate): "aliases": updated.aliases } -@app.delete("/api/v1/entities/{entity_id}") -async def delete_entity(entity_id: str): +@app.delete("/api/v1/entities/{entity_id}", tags=["Entities"]) +async def delete_entity(entity_id: str, _=Depends(verify_api_key)): """删除实体""" if not DB_AVAILABLE: raise HTTPException(status_code=500, detail="Database not available") @@ -203,8 +512,8 @@ async def delete_entity(entity_id: str): db.delete_entity(entity_id) return {"success": True, "message": f"Entity {entity_id} deleted"} -@app.post("/api/v1/entities/{entity_id}/merge") -async def merge_entities_endpoint(entity_id: str, merge_req: EntityMergeRequest): +@app.post("/api/v1/entities/{entity_id}/merge", tags=["Entities"]) +async def merge_entities_endpoint(entity_id: str, merge_req: EntityMergeRequest, _=Depends(verify_api_key)): """合并两个实体""" if not DB_AVAILABLE: raise HTTPException(status_code=500, detail="Database not available") @@ -231,8 +540,8 @@ async def merge_entities_endpoint(entity_id: str, merge_req: EntityMergeRequest) } # Phase 2: Relation Edit API -@app.post("/api/v1/projects/{project_id}/relations") -async def create_relation_endpoint(project_id: str, relation: RelationCreate): +@app.post("/api/v1/projects/{project_id}/relations", tags=["Relations"]) +async def create_relation_endpoint(project_id: str, relation: RelationCreate, _=Depends(verify_api_key)): """创建新的实体关系""" if not DB_AVAILABLE: raise HTTPException(status_code=500, detail="Database not available") @@ -262,8 +571,8 @@ async def create_relation_endpoint(project_id: str, relation: RelationCreate): "success": True } -@app.delete("/api/v1/relations/{relation_id}") -async def delete_relation(relation_id: str): +@app.delete("/api/v1/relations/{relation_id}", tags=["Relations"]) +async def delete_relation(relation_id: str, _=Depends(verify_api_key)): """删除关系""" if not DB_AVAILABLE: raise HTTPException(status_code=500, detail="Database not available") @@ -272,8 +581,8 @@ async def delete_relation(relation_id: str): db.delete_relation(relation_id) return {"success": True, "message": f"Relation {relation_id} deleted"} -@app.put("/api/v1/relations/{relation_id}") -async def update_relation(relation_id: str, relation: RelationCreate): +@app.put("/api/v1/relations/{relation_id}", tags=["Relations"]) +async def update_relation(relation_id: str, relation: RelationCreate, _=Depends(verify_api_key)): """更新关系""" if not DB_AVAILABLE: raise HTTPException(status_code=500, detail="Database not available") @@ -293,8 +602,8 @@ async def update_relation(relation_id: str, relation: RelationCreate): } # Phase 2: Transcript Edit API -@app.get("/api/v1/transcripts/{transcript_id}") -async def get_transcript(transcript_id: str): +@app.get("/api/v1/transcripts/{transcript_id}", tags=["Transcripts"]) +async def get_transcript(transcript_id: str, _=Depends(verify_api_key)): """获取转录详情""" if not DB_AVAILABLE: raise HTTPException(status_code=500, detail="Database not available") @@ -307,8 +616,8 @@ async def get_transcript(transcript_id: str): return transcript -@app.put("/api/v1/transcripts/{transcript_id}") -async def update_transcript(transcript_id: str, update: TranscriptUpdate): +@app.put("/api/v1/transcripts/{transcript_id}", tags=["Transcripts"]) +async def update_transcript(transcript_id: str, update: TranscriptUpdate, _=Depends(verify_api_key)): """更新转录文本(人工修正)""" if not DB_AVAILABLE: raise HTTPException(status_code=500, detail="Database not available") @@ -336,8 +645,8 @@ class ManualEntityCreate(BaseModel): start_pos: Optional[int] = None end_pos: Optional[int] = None -@app.post("/api/v1/projects/{project_id}/entities") -async def create_manual_entity(project_id: str, entity: ManualEntityCreate): +@app.post("/api/v1/projects/{project_id}/entities", tags=["Entities"]) +async def create_manual_entity(project_id: str, entity: ManualEntityCreate, _=Depends(verify_api_key)): """手动创建实体(划词新建)""" if not DB_AVAILABLE: raise HTTPException(status_code=500, detail="Database not available") @@ -500,8 +809,8 @@ def align_entity(project_id: str, name: str, db, definition: str = "") -> Option # API Endpoints -@app.post("/api/v1/projects", response_model=dict) -async def create_project(project: ProjectCreate): +@app.post("/api/v1/projects", response_model=dict, tags=["Projects"]) +async def create_project(project: ProjectCreate, _=Depends(verify_api_key)): """创建新项目""" if not DB_AVAILABLE: raise HTTPException(status_code=500, detail="Database not available") @@ -511,8 +820,8 @@ async def create_project(project: ProjectCreate): p = db.create_project(project_id, project.name, project.description) return {"id": p.id, "name": p.name, "description": p.description} -@app.get("/api/v1/projects") -async def list_projects(): +@app.get("/api/v1/projects", tags=["Projects"]) +async def list_projects(_=Depends(verify_api_key)): """列出所有项目""" if not DB_AVAILABLE: return [] @@ -521,8 +830,8 @@ async def list_projects(): projects = db.list_projects() return [{"id": p.id, "name": p.name, "description": p.description} for p in projects] -@app.post("/api/v1/projects/{project_id}/upload", response_model=AnalysisResult) -async def upload_audio(project_id: str, file: UploadFile = File(...)): +@app.post("/api/v1/projects/{project_id}/upload", response_model=AnalysisResult, tags=["Projects"]) +async def upload_audio(project_id: str, file: UploadFile = File(...), _=Depends(verify_api_key)): """上传音频到指定项目 - Phase 3: 支持多文件融合""" if not DB_AVAILABLE: raise HTTPException(status_code=500, detail="Database not available") @@ -633,7 +942,7 @@ async def upload_audio(project_id: str, file: UploadFile = File(...)): # Phase 3: Document Upload API @app.post("/api/v1/projects/{project_id}/upload-document") -async def upload_document(project_id: str, file: UploadFile = File(...)): +async def upload_document(project_id: str, file: UploadFile = File(...), _=Depends(verify_api_key)): """上传 PDF/DOCX 文档到指定项目""" if not DB_AVAILABLE: raise HTTPException(status_code=500, detail="Database not available") @@ -745,7 +1054,7 @@ async def upload_document(project_id: str, file: UploadFile = File(...)): # Phase 3: Knowledge Base API @app.get("/api/v1/projects/{project_id}/knowledge-base") -async def get_knowledge_base(project_id: str): +async def get_knowledge_base(project_id: str, _=Depends(verify_api_key)): """获取项目知识库 - 包含所有实体、关系、术语表""" if not DB_AVAILABLE: raise HTTPException(status_code=500, detail="Database not available") @@ -842,7 +1151,7 @@ async def get_knowledge_base(project_id: str): # Phase 3: Glossary API @app.post("/api/v1/projects/{project_id}/glossary") -async def add_glossary_term(project_id: str, term: GlossaryTermCreate): +async def add_glossary_term(project_id: str, term: GlossaryTermCreate, _=Depends(verify_api_key)): """添加术语到项目术语表""" if not DB_AVAILABLE: raise HTTPException(status_code=500, detail="Database not available") @@ -866,7 +1175,7 @@ async def add_glossary_term(project_id: str, term: GlossaryTermCreate): } @app.get("/api/v1/projects/{project_id}/glossary") -async def get_glossary(project_id: str): +async def get_glossary(project_id: str, _=Depends(verify_api_key)): """获取项目术语表""" if not DB_AVAILABLE: raise HTTPException(status_code=500, detail="Database not available") @@ -876,7 +1185,7 @@ async def get_glossary(project_id: str): return glossary @app.delete("/api/v1/glossary/{term_id}") -async def delete_glossary_term(term_id: str): +async def delete_glossary_term(term_id: str, _=Depends(verify_api_key)): """删除术语""" if not DB_AVAILABLE: raise HTTPException(status_code=500, detail="Database not available") @@ -887,7 +1196,7 @@ async def delete_glossary_term(term_id: str): # Phase 3: Entity Alignment API @app.post("/api/v1/projects/{project_id}/align-entities") -async def align_project_entities(project_id: str, threshold: float = 0.85): +async def align_project_entities(project_id: str, threshold: float = 0.85, _=Depends(verify_api_key)): """运行实体对齐算法,合并相似实体""" if not DB_AVAILABLE: raise HTTPException(status_code=500, detail="Database not available") @@ -933,7 +1242,7 @@ async def align_project_entities(project_id: str, threshold: float = 0.85): } @app.get("/api/v1/projects/{project_id}/entities") -async def get_project_entities(project_id: str): +async def get_project_entities(project_id: str, _=Depends(verify_api_key)): """获取项目的全局实体列表""" if not DB_AVAILABLE: return [] @@ -944,7 +1253,7 @@ async def get_project_entities(project_id: str): @app.get("/api/v1/projects/{project_id}/relations") -async def get_project_relations(project_id: str): +async def get_project_relations(project_id: str, _=Depends(verify_api_key)): """获取项目的实体关系列表""" if not DB_AVAILABLE: return [] @@ -968,7 +1277,7 @@ async def get_project_relations(project_id: str): @app.get("/api/v1/projects/{project_id}/transcripts") -async def get_project_transcripts(project_id: str): +async def get_project_transcripts(project_id: str, _=Depends(verify_api_key)): """获取项目的转录列表""" if not DB_AVAILABLE: return [] @@ -985,7 +1294,7 @@ async def get_project_transcripts(project_id: str): @app.get("/api/v1/entities/{entity_id}/mentions") -async def get_entity_mentions(entity_id: str): +async def get_entity_mentions(entity_id: str, _=Depends(verify_api_key)): """获取实体的所有提及位置""" if not DB_AVAILABLE: return [] @@ -1021,7 +1330,7 @@ async def health_check(): # ==================== Phase 4: Agent 助手 API ==================== @app.post("/api/v1/projects/{project_id}/agent/query") -async def agent_query(project_id: str, query: AgentQuery): +async def agent_query(project_id: str, query: AgentQuery, _=Depends(verify_api_key)): """Agent RAG 问答""" if not DB_AVAILABLE or not LLM_CLIENT_AVAILABLE: raise HTTPException(status_code=500, detail="Service not available") @@ -1075,7 +1384,7 @@ async def agent_query(project_id: str, query: AgentQuery): @app.post("/api/v1/projects/{project_id}/agent/command") -async def agent_command(project_id: str, command: AgentCommand): +async def agent_command(project_id: str, command: AgentCommand, _=Depends(verify_api_key)): """Agent 指令执行 - 解析并执行自然语言指令""" if not DB_AVAILABLE or not LLM_CLIENT_AVAILABLE: raise HTTPException(status_code=500, detail="Service not available") @@ -1166,7 +1475,7 @@ async def agent_command(project_id: str, command: AgentCommand): @app.get("/api/v1/projects/{project_id}/agent/suggest") -async def agent_suggest(project_id: str): +async def agent_suggest(project_id: str, _=Depends(verify_api_key)): """获取 Agent 建议 - 基于项目数据提供洞察""" if not DB_AVAILABLE or not LLM_CLIENT_AVAILABLE: raise HTTPException(status_code=500, detail="Service not available") @@ -1206,7 +1515,7 @@ async def agent_suggest(project_id: str): # ==================== Phase 4: 知识溯源 API ==================== @app.get("/api/v1/relations/{relation_id}/provenance") -async def get_relation_provenance(relation_id: str): +async def get_relation_provenance(relation_id: str, _=Depends(verify_api_key)): """获取关系的知识溯源信息""" if not DB_AVAILABLE: raise HTTPException(status_code=500, detail="Database not available") @@ -1231,7 +1540,7 @@ async def get_relation_provenance(relation_id: str): @app.get("/api/v1/entities/{entity_id}/details") -async def get_entity_details(entity_id: str): +async def get_entity_details(entity_id: str, _=Depends(verify_api_key)): """获取实体详情,包含所有提及位置""" if not DB_AVAILABLE: raise HTTPException(status_code=500, detail="Database not available") @@ -1246,7 +1555,7 @@ async def get_entity_details(entity_id: str): @app.get("/api/v1/entities/{entity_id}/evolution") -async def get_entity_evolution(entity_id: str): +async def get_entity_evolution(entity_id: str, _=Depends(verify_api_key)): """分析实体的演变和态度变化""" if not DB_AVAILABLE or not LLM_CLIENT_AVAILABLE: raise HTTPException(status_code=500, detail="Service not available") @@ -1281,7 +1590,7 @@ async def get_entity_evolution(entity_id: str): # ==================== Phase 4: 实体管理增强 API ==================== @app.get("/api/v1/projects/{project_id}/entities/search") -async def search_entities(project_id: str, q: str): +async def search_entities(project_id: str, q: str, _=Depends(verify_api_key)): """搜索实体""" if not DB_AVAILABLE: raise HTTPException(status_code=500, detail="Database not available") @@ -1298,7 +1607,8 @@ async def get_project_timeline( project_id: str, entity_id: str = None, start_date: str = None, - end_date: str = None + end_date: str = None, + _=Depends(verify_api_key) ): """获取项目时间线 - 按时间顺序的实体提及和关系事件""" if not DB_AVAILABLE: @@ -1319,7 +1629,7 @@ async def get_project_timeline( @app.get("/api/v1/projects/{project_id}/timeline/summary") -async def get_timeline_summary(project_id: str): +async def get_timeline_summary(project_id: str, _=Depends(verify_api_key)): """获取项目时间线摘要统计""" if not DB_AVAILABLE: raise HTTPException(status_code=500, detail="Database not available") @@ -1339,7 +1649,7 @@ async def get_timeline_summary(project_id: str): @app.get("/api/v1/entities/{entity_id}/timeline") -async def get_entity_timeline(entity_id: str): +async def get_entity_timeline(entity_id: str, _=Depends(verify_api_key)): """获取单个实体的时间线""" if not DB_AVAILABLE: raise HTTPException(status_code=500, detail="Database not available") @@ -1369,7 +1679,7 @@ class ReasoningQuery(BaseModel): @app.post("/api/v1/projects/{project_id}/reasoning/query") -async def reasoning_query(project_id: str, query: ReasoningQuery): +async def reasoning_query(project_id: str, query: ReasoningQuery, _=Depends(verify_api_key)): """ 增强问答 - 基于知识推理的智能问答 @@ -1423,7 +1733,8 @@ async def reasoning_query(project_id: str, query: ReasoningQuery): async def find_inference_path( project_id: str, start_entity: str, - end_entity: str + end_entity: str, + _=Depends(verify_api_key) ): """ 发现两个实体之间的推理路径 @@ -1472,7 +1783,7 @@ class SummaryRequest(BaseModel): @app.post("/api/v1/projects/{project_id}/reasoning/summary") -async def project_summary(project_id: str, req: SummaryRequest): +async def project_summary(project_id: str, req: SummaryRequest, _=Depends(verify_api_key)): """ 项目智能总结 @@ -1557,7 +1868,7 @@ class EntityAttributeBatchSet(BaseModel): # 属性模板管理 API @app.post("/api/v1/projects/{project_id}/attribute-templates") -async def create_attribute_template_endpoint(project_id: str, template: AttributeTemplateCreate): +async def create_attribute_template_endpoint(project_id: str, template: AttributeTemplateCreate, _=Depends(verify_api_key)): """创建属性模板""" if not DB_AVAILABLE: raise HTTPException(status_code=500, detail="Database not available") @@ -1592,7 +1903,7 @@ async def create_attribute_template_endpoint(project_id: str, template: Attribut @app.get("/api/v1/projects/{project_id}/attribute-templates") -async def list_attribute_templates_endpoint(project_id: str): +async def list_attribute_templates_endpoint(project_id: str, _=Depends(verify_api_key)): """列出项目的所有属性模板""" if not DB_AVAILABLE: raise HTTPException(status_code=500, detail="Database not available") @@ -1616,7 +1927,7 @@ async def list_attribute_templates_endpoint(project_id: str): @app.get("/api/v1/attribute-templates/{template_id}") -async def get_attribute_template_endpoint(template_id: str): +async def get_attribute_template_endpoint(template_id: str, _=Depends(verify_api_key)): """获取属性模板详情""" if not DB_AVAILABLE: raise HTTPException(status_code=500, detail="Database not available") @@ -1640,7 +1951,7 @@ async def get_attribute_template_endpoint(template_id: str): @app.put("/api/v1/attribute-templates/{template_id}") -async def update_attribute_template_endpoint(template_id: str, update: AttributeTemplateUpdate): +async def update_attribute_template_endpoint(template_id: str, update: AttributeTemplateUpdate, _=Depends(verify_api_key)): """更新属性模板""" if not DB_AVAILABLE: raise HTTPException(status_code=500, detail="Database not available") @@ -1662,7 +1973,7 @@ async def update_attribute_template_endpoint(template_id: str, update: Attribute @app.delete("/api/v1/attribute-templates/{template_id}") -async def delete_attribute_template_endpoint(template_id: str): +async def delete_attribute_template_endpoint(template_id: str, _=Depends(verify_api_key)): """删除属性模板""" if not DB_AVAILABLE: raise HTTPException(status_code=500, detail="Database not available") @@ -1675,7 +1986,7 @@ async def delete_attribute_template_endpoint(template_id: str): # 实体属性值管理 API @app.post("/api/v1/entities/{entity_id}/attributes") -async def set_entity_attribute_endpoint(entity_id: str, attr: EntityAttributeSet): +async def set_entity_attribute_endpoint(entity_id: str, attr: EntityAttributeSet, _=Depends(verify_api_key)): """设置实体属性值""" if not DB_AVAILABLE: raise HTTPException(status_code=500, detail="Database not available") @@ -1762,7 +2073,7 @@ async def set_entity_attribute_endpoint(entity_id: str, attr: EntityAttributeSet @app.post("/api/v1/entities/{entity_id}/attributes/batch") -async def batch_set_entity_attributes_endpoint(entity_id: str, batch: EntityAttributeBatchSet): +async def batch_set_entity_attributes_endpoint(entity_id: str, batch: EntityAttributeBatchSet, _=Depends(verify_api_key)): """批量设置实体属性值""" if not DB_AVAILABLE: raise HTTPException(status_code=500, detail="Database not available") @@ -1801,7 +2112,7 @@ async def batch_set_entity_attributes_endpoint(entity_id: str, batch: EntityAttr @app.get("/api/v1/entities/{entity_id}/attributes") -async def get_entity_attributes_endpoint(entity_id: str): +async def get_entity_attributes_endpoint(entity_id: str, _=Depends(verify_api_key)): """获取实体的所有属性值""" if not DB_AVAILABLE: raise HTTPException(status_code=500, detail="Database not available") @@ -1827,7 +2138,7 @@ async def get_entity_attributes_endpoint(entity_id: str): @app.delete("/api/v1/entities/{entity_id}/attributes/{template_id}") async def delete_entity_attribute_endpoint(entity_id: str, template_id: str, - reason: Optional[str] = ""): + reason: Optional[str] = "", _=Depends(verify_api_key)): """删除实体属性值""" if not DB_AVAILABLE: raise HTTPException(status_code=500, detail="Database not available") @@ -1841,7 +2152,7 @@ async def delete_entity_attribute_endpoint(entity_id: str, template_id: str, # 属性历史 API @app.get("/api/v1/entities/{entity_id}/attributes/history") -async def get_entity_attribute_history_endpoint(entity_id: str, limit: int = 50): +async def get_entity_attribute_history_endpoint(entity_id: str, limit: int = 50, _=Depends(verify_api_key)): """获取实体的属性变更历史""" if not DB_AVAILABLE: raise HTTPException(status_code=500, detail="Database not available") @@ -1864,7 +2175,7 @@ async def get_entity_attribute_history_endpoint(entity_id: str, limit: int = 50) @app.get("/api/v1/attribute-templates/{template_id}/history") -async def get_template_history_endpoint(template_id: str, limit: int = 50): +async def get_template_history_endpoint(template_id: str, limit: int = 50, _=Depends(verify_api_key)): """获取属性模板的所有变更历史(跨实体)""" if not DB_AVAILABLE: raise HTTPException(status_code=500, detail="Database not available") @@ -1891,7 +2202,8 @@ async def get_template_history_endpoint(template_id: str, limit: int = 50): @app.get("/api/v1/projects/{project_id}/entities/search-by-attributes") async def search_entities_by_attributes_endpoint( project_id: str, - attribute_filter: Optional[str] = None # JSON 格式: {"职位": "经理", "部门": "技术部"} + attribute_filter: Optional[str] = None, # JSON 格式: {"职位": "经理", "部门": "技术部"} + _=Depends(verify_api_key) ): """根据属性筛选搜索实体""" if not DB_AVAILABLE: @@ -1928,7 +2240,7 @@ async def search_entities_by_attributes_endpoint( from fastapi.responses import StreamingResponse, FileResponse @app.get("/api/v1/projects/{project_id}/export/graph-svg") -async def export_graph_svg_endpoint(project_id: str): +async def export_graph_svg_endpoint(project_id: str, _=Depends(verify_api_key)): """导出知识图谱为 SVG""" if not DB_AVAILABLE or not EXPORT_AVAILABLE: raise HTTPException(status_code=500, detail="Export functionality not available") @@ -1978,7 +2290,7 @@ async def export_graph_svg_endpoint(project_id: str): @app.get("/api/v1/projects/{project_id}/export/graph-png") -async def export_graph_png_endpoint(project_id: str): +async def export_graph_png_endpoint(project_id: str, _=Depends(verify_api_key)): """导出知识图谱为 PNG""" if not DB_AVAILABLE or not EXPORT_AVAILABLE: raise HTTPException(status_code=500, detail="Export functionality not available") @@ -2028,7 +2340,7 @@ async def export_graph_png_endpoint(project_id: str): @app.get("/api/v1/projects/{project_id}/export/entities-excel") -async def export_entities_excel_endpoint(project_id: str): +async def export_entities_excel_endpoint(project_id: str, _=Depends(verify_api_key)): """导出实体数据为 Excel""" if not DB_AVAILABLE or not EXPORT_AVAILABLE: raise HTTPException(status_code=500, detail="Export functionality not available") @@ -2065,7 +2377,7 @@ async def export_entities_excel_endpoint(project_id: str): @app.get("/api/v1/projects/{project_id}/export/entities-csv") -async def export_entities_csv_endpoint(project_id: str): +async def export_entities_csv_endpoint(project_id: str, _=Depends(verify_api_key)): """导出实体数据为 CSV""" if not DB_AVAILABLE or not EXPORT_AVAILABLE: raise HTTPException(status_code=500, detail="Export functionality not available") @@ -2102,7 +2414,7 @@ async def export_entities_csv_endpoint(project_id: str): @app.get("/api/v1/projects/{project_id}/export/relations-csv") -async def export_relations_csv_endpoint(project_id: str): +async def export_relations_csv_endpoint(project_id: str, _=Depends(verify_api_key)): """导出关系数据为 CSV""" if not DB_AVAILABLE or not EXPORT_AVAILABLE: raise HTTPException(status_code=500, detail="Export functionality not available") @@ -2137,7 +2449,7 @@ async def export_relations_csv_endpoint(project_id: str): @app.get("/api/v1/projects/{project_id}/export/report-pdf") -async def export_report_pdf_endpoint(project_id: str): +async def export_report_pdf_endpoint(project_id: str, _=Depends(verify_api_key)): """导出项目报告为 PDF""" if not DB_AVAILABLE or not EXPORT_AVAILABLE: raise HTTPException(status_code=500, detail="Export functionality not available") @@ -2212,7 +2524,7 @@ async def export_report_pdf_endpoint(project_id: str): @app.get("/api/v1/projects/{project_id}/export/project-json") -async def export_project_json_endpoint(project_id: str): +async def export_project_json_endpoint(project_id: str, _=Depends(verify_api_key)): """导出完整项目数据为 JSON""" if not DB_AVAILABLE or not EXPORT_AVAILABLE: raise HTTPException(status_code=500, detail="Export functionality not available") @@ -2277,7 +2589,7 @@ async def export_project_json_endpoint(project_id: str): @app.get("/api/v1/transcripts/{transcript_id}/export/markdown") -async def export_transcript_markdown_endpoint(transcript_id: str): +async def export_transcript_markdown_endpoint(transcript_id: str, _=Depends(verify_api_key)): """导出转录文本为 Markdown""" if not DB_AVAILABLE or not EXPORT_AVAILABLE: raise HTTPException(status_code=500, detail="Export functionality not available") @@ -2343,7 +2655,7 @@ class GraphQueryRequest(BaseModel): depth: int = 1 @app.get("/api/v1/neo4j/status") -async def neo4j_status(): +async def neo4j_status(_=Depends(verify_api_key)): """获取 Neo4j 连接状态""" if not NEO4J_AVAILABLE: return { @@ -2369,7 +2681,7 @@ async def neo4j_status(): } @app.post("/api/v1/neo4j/sync") -async def neo4j_sync_project(request: Neo4jSyncRequest): +async def neo4j_sync_project(request: Neo4jSyncRequest, _=Depends(verify_api_key)): """同步项目数据到 Neo4j""" if not NEO4J_AVAILABLE: raise HTTPException(status_code=503, detail="Neo4j not available") @@ -2429,7 +2741,7 @@ async def neo4j_sync_project(request: Neo4jSyncRequest): } @app.get("/api/v1/projects/{project_id}/graph/stats") -async def get_graph_stats(project_id: str): +async def get_graph_stats(project_id: str, _=Depends(verify_api_key)): """获取项目图统计信息""" if not NEO4J_AVAILABLE: raise HTTPException(status_code=503, detail="Neo4j not available") @@ -2442,7 +2754,7 @@ async def get_graph_stats(project_id: str): return stats @app.post("/api/v1/graph/shortest-path") -async def find_shortest_path(request: PathQueryRequest): +async def find_shortest_path(request: PathQueryRequest, _=Depends(verify_api_key)): """查找两个实体之间的最短路径""" if not NEO4J_AVAILABLE: raise HTTPException(status_code=503, detail="Neo4j not available") @@ -2473,7 +2785,7 @@ async def find_shortest_path(request: PathQueryRequest): } @app.post("/api/v1/graph/paths") -async def find_all_paths(request: PathQueryRequest): +async def find_all_paths(request: PathQueryRequest, _=Depends(verify_api_key)): """查找两个实体之间的所有路径""" if not NEO4J_AVAILABLE: raise HTTPException(status_code=503, detail="Neo4j not available") @@ -2504,7 +2816,8 @@ async def find_all_paths(request: PathQueryRequest): async def get_entity_neighbors( entity_id: str, relation_type: str = None, - limit: int = 50 + limit: int = 50, + _=Depends(verify_api_key) ): """获取实体的邻居节点""" if not NEO4J_AVAILABLE: @@ -2522,7 +2835,7 @@ async def get_entity_neighbors( } @app.get("/api/v1/entities/{entity_id1}/common-neighbors/{entity_id2}") -async def get_common_neighbors(entity_id1: str, entity_id2: str): +async def get_common_neighbors(entity_id1: str, entity_id2: str, _=Depends(verify_api_key)): """获取两个实体的共同邻居""" if not NEO4J_AVAILABLE: raise HTTPException(status_code=503, detail="Neo4j not available") @@ -2542,7 +2855,8 @@ async def get_common_neighbors(entity_id1: str, entity_id2: str): @app.get("/api/v1/projects/{project_id}/graph/centrality") async def get_centrality_analysis( project_id: str, - metric: str = "degree" + metric: str = "degree", + _=Depends(verify_api_key) ): """获取中心性分析结果""" if not NEO4J_AVAILABLE: @@ -2568,7 +2882,7 @@ async def get_centrality_analysis( } @app.get("/api/v1/projects/{project_id}/graph/communities") -async def get_communities(project_id: str): +async def get_communities(project_id: str, _=Depends(verify_api_key)): """获取社区发现结果""" if not NEO4J_AVAILABLE: raise HTTPException(status_code=503, detail="Neo4j not available") @@ -2592,7 +2906,7 @@ async def get_communities(project_id: str): } @app.post("/api/v1/graph/subgraph") -async def get_subgraph(request: GraphQueryRequest): +async def get_subgraph(request: GraphQueryRequest, _=Depends(verify_api_key)): """获取子图""" if not NEO4J_AVAILABLE: raise HTTPException(status_code=503, detail="Neo4j not available") @@ -2605,6 +2919,330 @@ async def get_subgraph(request: GraphQueryRequest): return subgraph +# ==================== Phase 6: API Key Management Endpoints ==================== + +@app.post("/api/v1/api-keys", response_model=ApiKeyCreateResponse, tags=["API Keys"]) +async def create_api_key(request: ApiKeyCreate, _=Depends(verify_api_key)): + """ + 创建新的 API Key + + - **name**: API Key 的名称/描述 + - **permissions**: 权限列表,可选值: read, write, delete + - **rate_limit**: 每分钟请求限制,默认 60 + - **expires_days**: 过期天数(可选,不设置则永不过期) + """ + if not API_KEY_AVAILABLE: + raise HTTPException(status_code=503, detail="API Key management not available") + + key_manager = get_api_key_manager() + raw_key, api_key = key_manager.create_key( + name=request.name, + permissions=request.permissions, + rate_limit=request.rate_limit, + expires_days=request.expires_days + ) + + return ApiKeyCreateResponse( + api_key=raw_key, + info=ApiKeyResponse( + id=api_key.id, + key_preview=api_key.key_preview, + name=api_key.name, + permissions=api_key.permissions, + rate_limit=api_key.rate_limit, + status=api_key.status, + created_at=api_key.created_at, + expires_at=api_key.expires_at, + last_used_at=api_key.last_used_at, + total_calls=api_key.total_calls + ) + ) + + +@app.get("/api/v1/api-keys", response_model=ApiKeyListResponse, tags=["API Keys"]) +async def list_api_keys( + status: Optional[str] = None, + limit: int = 100, + offset: int = 0, + _=Depends(verify_api_key) +): + """ + 列出所有 API Keys + + - **status**: 按状态筛选 (active, revoked, expired) + - **limit**: 返回数量限制 + - **offset**: 分页偏移 + """ + if not API_KEY_AVAILABLE: + raise HTTPException(status_code=503, detail="API Key management not available") + + key_manager = get_api_key_manager() + keys = key_manager.list_keys(status=status, limit=limit, offset=offset) + + return ApiKeyListResponse( + keys=[ + ApiKeyResponse( + id=k.id, + key_preview=k.key_preview, + name=k.name, + permissions=k.permissions, + rate_limit=k.rate_limit, + status=k.status, + created_at=k.created_at, + expires_at=k.expires_at, + last_used_at=k.last_used_at, + total_calls=k.total_calls + ) + for k in keys + ], + total=len(keys) + ) + + +@app.get("/api/v1/api-keys/{key_id}", response_model=ApiKeyResponse, tags=["API Keys"]) +async def get_api_key(key_id: str, _=Depends(verify_api_key)): + """获取单个 API Key 详情""" + if not API_KEY_AVAILABLE: + raise HTTPException(status_code=503, detail="API Key management not available") + + key_manager = get_api_key_manager() + key = key_manager.get_key_by_id(key_id) + + if not key: + raise HTTPException(status_code=404, detail="API Key not found") + + return ApiKeyResponse( + id=key.id, + key_preview=key.key_preview, + name=key.name, + permissions=key.permissions, + rate_limit=key.rate_limit, + status=key.status, + created_at=key.created_at, + expires_at=key.expires_at, + last_used_at=key.last_used_at, + total_calls=key.total_calls + ) + + +@app.patch("/api/v1/api-keys/{key_id}", response_model=ApiKeyResponse, tags=["API Keys"]) +async def update_api_key(key_id: str, request: ApiKeyUpdate, _=Depends(verify_api_key)): + """ + 更新 API Key 信息 + + 可以更新的字段:name, permissions, rate_limit + """ + if not API_KEY_AVAILABLE: + raise HTTPException(status_code=503, detail="API Key management not available") + + key_manager = get_api_key_manager() + + # 构建更新数据 + updates = {} + if request.name is not None: + updates["name"] = request.name + if request.permissions is not None: + updates["permissions"] = request.permissions + if request.rate_limit is not None: + updates["rate_limit"] = request.rate_limit + + if not updates: + raise HTTPException(status_code=400, detail="No fields to update") + + success = key_manager.update_key(key_id, **updates) + + if not success: + raise HTTPException(status_code=404, detail="API Key not found") + + # 返回更新后的 key + key = key_manager.get_key_by_id(key_id) + return ApiKeyResponse( + id=key.id, + key_preview=key.key_preview, + name=key.name, + permissions=key.permissions, + rate_limit=key.rate_limit, + status=key.status, + created_at=key.created_at, + expires_at=key.expires_at, + last_used_at=key.last_used_at, + total_calls=key.total_calls + ) + + +@app.delete("/api/v1/api-keys/{key_id}", tags=["API Keys"]) +async def revoke_api_key(key_id: str, reason: str = "", _=Depends(verify_api_key)): + """ + 撤销 API Key + + 撤销后的 Key 将无法再使用,但记录会保留用于审计 + """ + if not API_KEY_AVAILABLE: + raise HTTPException(status_code=503, detail="API Key management not available") + + key_manager = get_api_key_manager() + success = key_manager.revoke_key(key_id, reason=reason) + + if not success: + raise HTTPException(status_code=404, detail="API Key not found or already revoked") + + return {"success": True, "message": f"API Key {key_id} revoked"} + + +@app.get("/api/v1/api-keys/{key_id}/stats", response_model=ApiStatsResponse, tags=["API Keys"]) +async def get_api_key_stats(key_id: str, days: int = 30, _=Depends(verify_api_key)): + """ + 获取 API Key 的调用统计 + + - **days**: 统计天数,默认 30 天 + """ + if not API_KEY_AVAILABLE: + raise HTTPException(status_code=503, detail="API Key management not available") + + key_manager = get_api_key_manager() + + # 验证 key 存在 + key = key_manager.get_key_by_id(key_id) + if not key: + raise HTTPException(status_code=404, detail="API Key not found") + + stats = key_manager.get_call_stats(key_id, days=days) + + return ApiStatsResponse( + summary=ApiCallStats(**stats["summary"]), + endpoints=stats["endpoints"], + daily=stats["daily"] + ) + + +@app.get("/api/v1/api-keys/{key_id}/logs", response_model=ApiLogsResponse, tags=["API Keys"]) +async def get_api_key_logs( + key_id: str, + limit: int = 100, + offset: int = 0, + _=Depends(verify_api_key) +): + """ + 获取 API Key 的调用日志 + + - **limit**: 返回数量限制 + - **offset**: 分页偏移 + """ + if not API_KEY_AVAILABLE: + raise HTTPException(status_code=503, detail="API Key management not available") + + key_manager = get_api_key_manager() + + # 验证 key 存在 + key = key_manager.get_key_by_id(key_id) + if not key: + raise HTTPException(status_code=404, detail="API Key not found") + + logs = key_manager.get_call_logs(key_id, limit=limit, offset=offset) + + return ApiLogsResponse( + logs=[ + ApiCallLog( + id=log["id"], + endpoint=log["endpoint"], + method=log["method"], + status_code=log["status_code"], + response_time_ms=log["response_time_ms"], + ip_address=log["ip_address"], + user_agent=log["user_agent"], + error_message=log["error_message"], + created_at=log["created_at"] + ) + for log in logs + ], + total=len(logs) + ) + + +@app.get("/api/v1/rate-limit/status", response_model=RateLimitStatus, tags=["API Keys"]) +async def get_rate_limit_status(request: Request, _=Depends(verify_api_key)): + """获取当前请求的限流状态""" + if not RATE_LIMITER_AVAILABLE: + return RateLimitStatus( + limit=60, + remaining=60, + reset_time=int(time.time()) + 60, + window="minute" + ) + + limiter = get_rate_limiter() + + # 获取限流键 + if hasattr(request.state, 'api_key') and request.state.api_key: + api_key = request.state.api_key + limit_key = f"api_key:{api_key.id}" + limit = api_key.rate_limit + else: + client_ip = request.client.host if request.client else "unknown" + limit_key = f"ip:{client_ip}" + limit = 10 + + info = await limiter.get_limit_info(limit_key) + + return RateLimitStatus( + limit=limit, + remaining=info.remaining, + reset_time=info.reset_time, + window="minute" + ) + + +# ==================== Phase 6: System Endpoints ==================== + +@app.get("/api/v1/health", tags=["System"]) +async def health_check(): + """健康检查端点""" + return { + "status": "healthy", + "version": "0.6.0", + "timestamp": datetime.now().isoformat() + } + + +@app.get("/api/v1/status", tags=["System"]) +async def system_status(): + """系统状态信息""" + status = { + "version": "0.6.0", + "phase": "Phase 6 - API Platform", + "features": { + "database": DB_AVAILABLE, + "oss": OSS_AVAILABLE, + "tingwu": TINGWU_AVAILABLE, + "llm": LLM_CLIENT_AVAILABLE, + "neo4j": NEO4J_AVAILABLE, + "export": EXPORT_AVAILABLE, + "api_keys": API_KEY_AVAILABLE, + "rate_limiting": RATE_LIMITER_AVAILABLE, + }, + "api": { + "documentation": "/docs", + "openapi": "/openapi.json", + }, + "timestamp": datetime.now().isoformat() + } + + return status + + +@app.get("/api/v1/openapi.json", include_in_schema=False) +async def get_openapi(): + """获取 OpenAPI 规范""" + from fastapi.openapi.utils import get_openapi + return get_openapi( + title=app.title, + version=app.version, + description=app.description, + routes=app.routes, + tags=app.openapi_tags + ) + + # Serve frontend - MUST be last to not override API routes app.mount("/", StaticFiles(directory="frontend", html=True), name="frontend") diff --git a/backend/rate_limiter.py b/backend/rate_limiter.py new file mode 100644 index 0000000..878306b --- /dev/null +++ b/backend/rate_limiter.py @@ -0,0 +1,223 @@ +#!/usr/bin/env python3 +""" +InsightFlow Rate Limiter - Phase 6 +API 限流中间件 +支持基于内存的滑动窗口限流 +""" + +import time +import asyncio +from typing import Dict, Optional, Tuple, Callable +from dataclasses import dataclass, field +from collections import defaultdict +from functools import wraps + + +@dataclass +class RateLimitConfig: + """限流配置""" + requests_per_minute: int = 60 + burst_size: int = 10 # 突发请求数 + window_size: int = 60 # 窗口大小(秒) + + +@dataclass +class RateLimitInfo: + """限流信息""" + allowed: bool + remaining: int + reset_time: int # 重置时间戳 + retry_after: int # 需要等待的秒数 + + +class SlidingWindowCounter: + """滑动窗口计数器""" + + def __init__(self, window_size: int = 60): + self.window_size = window_size + self.requests: Dict[int, int] = defaultdict(int) # 秒级计数 + self._lock = asyncio.Lock() + + async def add_request(self) -> int: + """添加请求,返回当前窗口内的请求数""" + async with self._lock: + now = int(time.time()) + self.requests[now] += 1 + self._cleanup_old(now) + return sum(self.requests.values()) + + async def get_count(self) -> int: + """获取当前窗口内的请求数""" + async with self._lock: + now = int(time.time()) + self._cleanup_old(now) + return sum(self.requests.values()) + + def _cleanup_old(self, now: int): + """清理过期的请求记录""" + cutoff = now - self.window_size + old_keys = [k for k in self.requests.keys() if k < cutoff] + for k in old_keys: + del self.requests[k] + + +class RateLimiter: + """API 限流器""" + + def __init__(self): + # key -> SlidingWindowCounter + self.counters: Dict[str, SlidingWindowCounter] = {} + # key -> RateLimitConfig + self.configs: Dict[str, RateLimitConfig] = {} + self._lock = asyncio.Lock() + + async def is_allowed( + self, + key: str, + config: Optional[RateLimitConfig] = None + ) -> RateLimitInfo: + """ + 检查是否允许请求 + + Args: + key: 限流键(如 API Key ID) + config: 限流配置,如果为 None 则使用默认配置 + + Returns: + RateLimitInfo + """ + if config is None: + config = RateLimitConfig() + + async with self._lock: + if key not in self.counters: + self.counters[key] = SlidingWindowCounter(config.window_size) + self.configs[key] = config + + counter = self.counters[key] + stored_config = self.configs.get(key, config) + + # 获取当前计数 + current_count = await counter.get_count() + + # 计算剩余配额 + remaining = max(0, stored_config.requests_per_minute - current_count) + + # 计算重置时间 + now = int(time.time()) + reset_time = now + stored_config.window_size + + # 检查是否超过限制 + if current_count >= stored_config.requests_per_minute: + return RateLimitInfo( + allowed=False, + remaining=0, + reset_time=reset_time, + retry_after=stored_config.window_size + ) + + # 允许请求,增加计数 + await counter.add_request() + + return RateLimitInfo( + allowed=True, + remaining=remaining - 1, + reset_time=reset_time, + retry_after=0 + ) + + async def get_limit_info(self, key: str) -> RateLimitInfo: + """获取限流信息(不增加计数)""" + if key not in self.counters: + config = RateLimitConfig() + return RateLimitInfo( + allowed=True, + remaining=config.requests_per_minute, + reset_time=int(time.time()) + config.window_size, + retry_after=0 + ) + + counter = self.counters[key] + config = self.configs.get(key, RateLimitConfig()) + + current_count = await counter.get_count() + remaining = max(0, config.requests_per_minute - current_count) + reset_time = int(time.time()) + config.window_size + + return RateLimitInfo( + allowed=current_count < config.requests_per_minute, + remaining=remaining, + reset_time=reset_time, + retry_after=max(0, config.window_size) if current_count >= config.requests_per_minute else 0 + ) + + def reset(self, key: Optional[str] = None): + """重置限流计数器""" + if key: + self.counters.pop(key, None) + self.configs.pop(key, None) + else: + self.counters.clear() + self.configs.clear() + + +# 全局限流器实例 +_rate_limiter: Optional[RateLimiter] = None + + +def get_rate_limiter() -> RateLimiter: + """获取限流器实例""" + global _rate_limiter + if _rate_limiter is None: + _rate_limiter = RateLimiter() + return _rate_limiter + + +# 限流装饰器(用于函数级别限流) +def rate_limit( + requests_per_minute: int = 60, + key_func: Optional[Callable] = None +): + """ + 限流装饰器 + + Args: + requests_per_minute: 每分钟请求数限制 + key_func: 生成限流键的函数,默认为 None(使用函数名) + """ + def decorator(func): + limiter = get_rate_limiter() + config = RateLimitConfig(requests_per_minute=requests_per_minute) + + @wraps(func) + async def async_wrapper(*args, **kwargs): + key = key_func(*args, **kwargs) if key_func else func.__name__ + info = await limiter.is_allowed(key, config) + + if not info.allowed: + raise RateLimitExceeded( + f"Rate limit exceeded. Try again in {info.retry_after} seconds." + ) + + return await func(*args, **kwargs) + + @wraps(func) + def sync_wrapper(*args, **kwargs): + key = key_func(*args, **kwargs) if key_func else func.__name__ + # 同步版本使用 asyncio.run + info = asyncio.run(limiter.is_allowed(key, config)) + + if not info.allowed: + raise RateLimitExceeded( + f"Rate limit exceeded. Try again in {info.retry_after} seconds." + ) + + return func(*args, **kwargs) + + return async_wrapper if asyncio.iscoroutinefunction(func) else sync_wrapper + return decorator + + +class RateLimitExceeded(Exception): + """限流异常""" + pass diff --git a/backend/requirements.txt b/backend/requirements.txt index 04fcb73..c3baa06 100644 --- a/backend/requirements.txt +++ b/backend/requirements.txt @@ -30,3 +30,6 @@ cairosvg==2.7.1 # Neo4j Graph Database neo4j==5.15.0 + +# API Documentation (Swagger/OpenAPI) +fastapi-offline-swagger==0.1.0 diff --git a/frontend/app.js b/frontend/app.js index 8653215..4c831f9 100644 --- a/frontend/app.js +++ b/frontend/app.js @@ -1,4 +1,4 @@ -// InsightFlow Frontend - Phase 5 (Graph Analysis) +// InsightFlow Frontend - Phase 6 (API Platform) const API_BASE = '/api/v1'; let currentProject = null; @@ -98,2858 +98,339 @@ async function loadProjectData() { segments: [], entities: projectEntities, full_text: '', - created_at: new Date().toISOString() + relations: projectRelations }; + renderTranscript(); renderGraph(); renderEntityList(); - // 更新图分析面板的实体选择器 - populateGraphEntitySelects(); - } catch (err) { console.error('Load project data failed:', err); } } async function preloadEntityDetails() { - // 并行加载所有实体详情 - const promises = projectEntities.map(async (ent) => { + const promises = projectEntities.slice(0, 20).map(async entity => { try { - const res = await fetch(`${API_BASE}/entities/${ent.id}/details`); + const res = await fetch(`${API_BASE}/entities/${entity.id}/details`); if (res.ok) { - entityDetailsCache[ent.id] = await res.json(); + entityDetailsCache[entity.id] = await res.json(); } } catch (e) { - console.error(`Failed to load entity ${ent.id} details:`, e); + // Ignore errors } }); await Promise.all(promises); } -// ==================== Agent Panel ==================== - -function initAgentPanel() { - const chatInput = document.getElementById('chatInput'); - if (chatInput) { - chatInput.addEventListener('keypress', (e) => { - if (e.key === 'Enter' && !e.shiftKey) { - e.preventDefault(); - sendAgentMessage(); - } - }); - } -} - -function toggleAgentPanel() { - const panel = document.getElementById('agentPanel'); - const toggle = panel.querySelector('.agent-toggle'); - panel.classList.toggle('collapsed'); - toggle.textContent = panel.classList.contains('collapsed') ? '‹' : '›'; -} - -function addChatMessage(content, isUser = false, isTyping = false) { - const container = document.getElementById('chatMessages'); - const msgDiv = document.createElement('div'); - msgDiv.className = `chat-message ${isUser ? 'user' : 'assistant'}`; - - if (isTyping) { - msgDiv.innerHTML = ` -
- `; - } else { - msgDiv.innerHTML = ``; - } - - container.appendChild(msgDiv); - container.scrollTop = container.scrollHeight; - return msgDiv; -} - -function removeTypingIndicator() { - const indicator = document.getElementById('typingIndicator'); - if (indicator) { - indicator.parentElement.remove(); - } -} - -async function sendAgentMessage() { - const input = document.getElementById('chatInput'); - const message = input.value.trim(); - if (!message) return; - - input.value = ''; - addChatMessage(message, true); - addChatMessage('', false, true); - - try { - // 判断是命令还是问答 - const isCommand = message.includes('合并') || message.includes('修改') || - message.startsWith('把') || message.startsWith('将'); - - if (isCommand) { - // 执行命令 - const res = await fetch(`${API_BASE}/projects/${currentProject.id}/agent/command`, { - method: 'POST', - headers: { 'Content-Type': 'application/json' }, - body: JSON.stringify({ command: message }) - }); - - removeTypingIndicator(); - - if (res.ok) { - const result = await res.json(); - let response = ''; - - if (result.intent === 'merge_entities') { - if (result.success) { - response = `✅ 已合并 ${result.merged.length} 个实体到 "${result.target}"`; - await loadProjectData(); // 刷新数据 - } else { - response = `❌ 合并失败:${result.error || '未找到匹配的实体'}`; - } - } else if (result.intent === 'edit_entity') { - if (result.success) { - response = `✅ 已更新实体 "${result.entity?.name}"`; - await loadProjectData(); - } else { - response = `❌ 编辑失败:${result.error || '未找到实体'}`; - } - } else if (result.intent === 'answer_question') { - response = result.answer; - } else { - response = result.message || result.explanation || '未识别的指令'; - } - - addChatMessage(response); - } else { - addChatMessage('❌ 请求失败,请重试'); - } - } else { - // RAG 问答 - const res = await fetch(`${API_BASE}/projects/${currentProject.id}/agent/query`, { - method: 'POST', - headers: { 'Content-Type': 'application/json' }, - body: JSON.stringify({ query: message, stream: false }) - }); - - removeTypingIndicator(); - - if (res.ok) { - const result = await res.json(); - addChatMessage(result.answer); - } else { - addChatMessage('❌ 获取回答失败,请重试'); - } - } - } catch (err) { - removeTypingIndicator(); - addChatMessage('❌ 网络错误,请检查连接'); - console.error('Agent error:', err); - } -} - -async function loadSuggestions() { - addChatMessage('正在获取建议...', false, true); - - try { - const res = await fetch(`${API_BASE}/projects/${currentProject.id}/agent/suggest`); - removeTypingIndicator(); - - if (res.ok) { - const result = await res.json(); - const suggestions = result.suggestions || []; - - if (suggestions.length === 0) { - addChatMessage('暂无建议,请先上传一些音频文件。'); - return; - } - - let html = '加载中...
'; - - try { - let content = ''; - - if (relation.id) { - // 从API获取溯源信息 - const res = await fetch(`${API_BASE}/relations/${relation.id}/provenance`); - if (res.ok) { - const data = await res.json(); - content = ` -获取溯源信息失败
'; - } - } else { - // 使用本地数据 - content = ` -加载失败
'; - } -} - -function closeProvenance() { - document.getElementById('provenanceModal').classList.remove('show'); -} - -// ==================== Entity List ==================== - -function renderEntityList() { - const container = document.getElementById('entityList'); - if (!container) return; - - container.innerHTML = '暂无实体,请上传音频文件
'; - return; - } - - projectEntities.forEach(ent => { - const div = document.createElement('div'); - div.className = 'entity-item'; - div.dataset.id = ent.id; - div.onclick = () => window.selectEntity(ent.id); - div.onmouseenter = (e) => showEntityCard(e, ent.id); - div.onmouseleave = hideEntityCard; - - div.innerHTML = ` - ${ent.type} -${file.name}
-ASR转录 + 实体提取中
-${err.message}
- -加载时间线数据...
加载失败,请重试
暂无时间线数据
-请先上传音频或文档文件
-加载失败: ${err.message}
+正在进行知识推理...
-推理失败,请稍后重试
-正在生成项目总结...
-总结生成失败,请稍后重试
+ if (apiKeysData.length === 0) { + container.innerHTML = ` +暂无 API Keys
+正在搜索关联路径...
'; - - try { - const res = await fetch( - `${API_BASE}/projects/${currentProject.id}/reasoning/inference-path?start_entity=${encodeURIComponent(startEntity)}&end_entity=${encodeURIComponent(endEntity)}` - ); - - if (!res.ok) throw new Error('Path finding failed'); - - const data = await res.json(); - renderInferencePaths(data); - - } catch (err) { - console.error('Path finding failed:', err); - pathsList.innerHTML = '路径搜索失败
'; - } -}; - -// Phase 5: Entity Attributes Management -let currentEntityIdForAttributes = null; -let currentAttributes = []; -let currentTemplates = []; - -// Show entity attributes modal -window.showEntityAttributes = async function(entityId) { - if (entityId) { - currentEntityIdForAttributes = entityId; - } else if (selectedEntity) { - currentEntityIdForAttributes = selectedEntity; - } else { - alert('请先选择一个实体'); return; } - const modal = document.getElementById('attributesModal'); - modal.classList.add('show'); - - // Reset form - document.getElementById('attributesAddForm').style.display = 'none'; - document.getElementById('toggleAddAttrBtn').style.display = 'inline-block'; - document.getElementById('saveAttrBtn').style.display = 'none'; - - await loadEntityAttributes(); -}; - -window.hideAttributesModal = function() { - document.getElementById('attributesModal').classList.remove('show'); - currentEntityIdForAttributes = null; -}; - -async function loadEntityAttributes() { - if (!currentEntityIdForAttributes) return; - - try { - const res = await fetch(`${API_BASE}/entities/${currentEntityIdForAttributes}/attributes`); - if (!res.ok) throw new Error('Failed to load attributes'); - - const data = await res.json(); - currentAttributes = data.attributes || []; - - renderAttributesList(); - } catch (err) { - console.error('Load attributes failed:', err); - document.getElementById('attributesList').innerHTML = '加载失败
'; - } -} - -function renderAttributesList() { - const container = document.getElementById('attributesList'); - - if (currentAttributes.length === 0) { - container.innerHTML = '暂无属性,点击"添加属性"创建
'; - return; - } - - container.innerHTML = currentAttributes.map(attr => { - let valueDisplay = attr.value; - if (attr.type === 'multiselect' && Array.isArray(attr.value)) { - valueDisplay = attr.value.join(', '); - } - - return ` -加载统计失败
+暂无调用记录
+加载失败
'; - } -}; - -window.hideAttrHistoryModal = function() { - document.getElementById('attrHistoryModal').classList.remove('show'); -}; - -function renderAttributeHistory(history, attributeName) { - const container = document.getElementById('attrHistoryContent'); - - if (history.length === 0) { - container.innerHTML = `属性 "${attributeName}" 暂无变更历史
`; - return; - } - - container.innerHTML = history.map(h => { - const date = new Date(h.changed_at).toLocaleString(); - return ` -加载失败
'; - } -} - -function renderTemplatesList() { - const container = document.getElementById('templatesList'); - - if (currentTemplates.length === 0) { - container.innerHTML = '暂无模板,点击"新建模板"创建
'; - return; - } - - container.innerHTML = currentTemplates.map(t => { - const optionsStr = t.options ? `选项: ${t.options.join(', ')}` : ''; - return ` -未找到匹配的实体
'; - return; - } - - grid.innerHTML = entities.map(ent => ` -暂无中心性数据
'; - return; - } - - // 按度中心性排序 - const sorted = [...data.centrality].sort((a, b) => b.degree - a.degree); - - container.innerHTML = sorted.map((item, index) => { - const rank = index + 1; - const isTop3 = rank <= 3; - const entity = projectEntities.find(e => e.id === item.entity_id); - - return ` -暂无社区数据
'; - return; - } - - // 渲染社区列表 - container.innerHTML = data.communities.map((community, idx) => { - const nodeNames = community.node_names || []; - const density = community.density ? community.density.toFixed(3) : 'N/A'; - return ` -路径查找失败
-请确保数据已同步到 Neo4j
-未找到路径
-邻居查询失败
-请确保数据已同步到 Neo4j
-未找到邻居节点
-管理 API 访问密钥和调用统计
+加载中...
+