Merge remote changes

This commit is contained in:
OpenClaw Bot
2026-02-24 00:13:24 +08:00
49 changed files with 14745 additions and 3555 deletions

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

529
backend/api_key_manager.py Normal file
View File

@@ -0,0 +1,529 @@
#!/usr/bin/env python3
"""
InsightFlow API Key Manager - Phase 6
API Key 管理模块:生成、验证、撤销
"""
import hashlib
import json
import os
import secrets
import sqlite3
from contextlib import closing
from dataclasses import dataclass
from datetime import datetime, timedelta
from enum import Enum
from typing import Dict, List, Optional
DB_PATH = os.getenv("DB_PATH", "/app/data/insightflow.db")
class ApiKeyStatus(Enum):
    """Lifecycle states of an API key."""
    ACTIVE = "active"    # key is usable
    REVOKED = "revoked"  # explicitly disabled by an operator/owner
    EXPIRED = "expired"  # passed its expires_at timestamp
@dataclass
class ApiKey:
    """A stored API key record (the raw key itself is never persisted)."""
    id: str
    key_hash: str                 # SHA-256 hash of the raw key; raw key is not stored
    key_preview: str              # first characters for display, e.g. "ak_live_abc..."
    name: str                     # human-readable key name / description
    owner_id: Optional[str]       # owner id (reserved for multi-user support)
    permissions: List[str]        # permission list, e.g. ["read", "write"]
    rate_limit: int               # allowed requests per minute
    status: str                   # one of: active, revoked, expired
    created_at: str
    expires_at: Optional[str]
    last_used_at: Optional[str]
    revoked_at: Optional[str]
    revoked_reason: Optional[str]
    total_calls: int = 0
class ApiKeyManager:
    """API key manager: create, validate, revoke keys and track usage.

    Keys are persisted as SHA-256 hashes only; the raw key is returned a
    single time from :meth:`create_key`. Every method opens a short-lived
    SQLite connection, so an instance is stateless apart from ``db_path``.
    """

    # Key format: prefix (8 chars) + random part (40 chars) = 48 chars total.
    KEY_PREFIX = "ak_live_"
    KEY_LENGTH = 48

    def __init__(self, db_path: str = DB_PATH):
        self.db_path = db_path
        self._init_db()

    def _init_db(self):
        """Create tables and indexes on first use (idempotent)."""
        # NOTE: sqlite3's "with conn:" manages only the transaction, not the
        # file handle — closing() makes sure the connection is released too.
        with closing(sqlite3.connect(self.db_path)) as conn:
            conn.executescript("""
                -- API keys table
                CREATE TABLE IF NOT EXISTS api_keys (
                    id TEXT PRIMARY KEY,
                    key_hash TEXT UNIQUE NOT NULL,
                    key_preview TEXT NOT NULL,
                    name TEXT NOT NULL,
                    owner_id TEXT,
                    permissions TEXT NOT NULL DEFAULT '["read"]',
                    rate_limit INTEGER DEFAULT 60,
                    status TEXT DEFAULT 'active',
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    expires_at TIMESTAMP,
                    last_used_at TIMESTAMP,
                    revoked_at TIMESTAMP,
                    revoked_reason TEXT,
                    total_calls INTEGER DEFAULT 0
                );
                -- Per-request API call log
                CREATE TABLE IF NOT EXISTS api_call_logs (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    api_key_id TEXT NOT NULL,
                    endpoint TEXT NOT NULL,
                    method TEXT NOT NULL,
                    status_code INTEGER,
                    response_time_ms INTEGER,
                    ip_address TEXT,
                    user_agent TEXT,
                    error_message TEXT,
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    FOREIGN KEY (api_key_id) REFERENCES api_keys(id)
                );
                -- Daily aggregated call statistics
                CREATE TABLE IF NOT EXISTS api_call_stats (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    api_key_id TEXT NOT NULL,
                    date TEXT NOT NULL,
                    endpoint TEXT NOT NULL,
                    method TEXT NOT NULL,
                    total_calls INTEGER DEFAULT 0,
                    success_calls INTEGER DEFAULT 0,
                    error_calls INTEGER DEFAULT 0,
                    avg_response_time_ms INTEGER DEFAULT 0,
                    FOREIGN KEY (api_key_id) REFERENCES api_keys(id),
                    UNIQUE(api_key_id, date, endpoint, method)
                );
                -- Indexes
                CREATE INDEX IF NOT EXISTS idx_api_keys_hash ON api_keys(key_hash);
                CREATE INDEX IF NOT EXISTS idx_api_keys_status ON api_keys(status);
                CREATE INDEX IF NOT EXISTS idx_api_keys_owner ON api_keys(owner_id);
                CREATE INDEX IF NOT EXISTS idx_api_logs_key_id ON api_call_logs(api_key_id);
                CREATE INDEX IF NOT EXISTS idx_api_logs_created ON api_call_logs(created_at);
                CREATE INDEX IF NOT EXISTS idx_api_stats_key_date ON api_call_stats(api_key_id, date);
            """)
            conn.commit()

    def _generate_key(self) -> str:
        """Generate a new raw API key string."""
        # token_urlsafe(30) yields exactly 40 URL-safe characters.
        random_part = secrets.token_urlsafe(30)[:40]
        return f"{self.KEY_PREFIX}{random_part}"

    def _hash_key(self, key: str) -> str:
        """Return the SHA-256 hex digest used to store/look up a key."""
        return hashlib.sha256(key.encode()).hexdigest()

    def _get_preview(self, key: str) -> str:
        """Return a non-sensitive preview (first 16 chars) of a key."""
        return f"{key[:16]}..."

    def create_key(
        self,
        name: str,
        owner_id: Optional[str] = None,
        permissions: Optional[List[str]] = None,
        rate_limit: int = 60,
        expires_days: Optional[int] = None
    ) -> tuple[str, ApiKey]:
        """Create a new API key.

        Returns:
            tuple: (raw key — returned only once, ApiKey record)
        """
        if permissions is None:
            permissions = ["read"]
        key_id = secrets.token_hex(16)
        raw_key = self._generate_key()
        key_hash = self._hash_key(raw_key)
        key_preview = self._get_preview(raw_key)
        expires_at = None
        if expires_days:
            expires_at = (datetime.now() + timedelta(days=expires_days)).isoformat()
        api_key = ApiKey(
            id=key_id,
            key_hash=key_hash,
            key_preview=key_preview,
            name=name,
            owner_id=owner_id,
            permissions=permissions,
            rate_limit=rate_limit,
            status=ApiKeyStatus.ACTIVE.value,
            created_at=datetime.now().isoformat(),
            expires_at=expires_at,
            last_used_at=None,
            revoked_at=None,
            revoked_reason=None,
            total_calls=0
        )
        with closing(sqlite3.connect(self.db_path)) as conn:
            conn.execute("""
                INSERT INTO api_keys (
                    id, key_hash, key_preview, name, owner_id, permissions,
                    rate_limit, status, created_at, expires_at
                ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            """, (
                api_key.id, api_key.key_hash, api_key.key_preview,
                api_key.name, api_key.owner_id, json.dumps(api_key.permissions),
                api_key.rate_limit, api_key.status, api_key.created_at,
                api_key.expires_at
            ))
            conn.commit()
        return raw_key, api_key

    def validate_key(self, key: str) -> Optional[ApiKey]:
        """Validate a raw API key.

        Returns:
            ApiKey if the key exists, is active and not expired; None otherwise.
            Keys found to be past expires_at are marked EXPIRED as a side effect.
        """
        key_hash = self._hash_key(key)
        with closing(sqlite3.connect(self.db_path)) as conn:
            conn.row_factory = sqlite3.Row
            row = conn.execute(
                "SELECT * FROM api_keys WHERE key_hash = ?",
                (key_hash,)
            ).fetchone()
            if not row:
                return None
            api_key = self._row_to_api_key(row)
            if api_key.status != ApiKeyStatus.ACTIVE.value:
                return None
            if api_key.expires_at:
                expires = datetime.fromisoformat(api_key.expires_at)
                if datetime.now() > expires:
                    # Lazily flip the stored status to expired.
                    conn.execute(
                        "UPDATE api_keys SET status = ? WHERE id = ?",
                        (ApiKeyStatus.EXPIRED.value, api_key.id)
                    )
                    conn.commit()
                    return None
            return api_key

    def revoke_key(
        self,
        key_id: str,
        reason: str = "",
        owner_id: Optional[str] = None
    ) -> bool:
        """Revoke an active API key; returns True if a row was updated."""
        with closing(sqlite3.connect(self.db_path)) as conn:
            # Enforce ownership when an owner_id is supplied.
            if owner_id:
                row = conn.execute(
                    "SELECT owner_id FROM api_keys WHERE id = ?",
                    (key_id,)
                ).fetchone()
                if not row or row[0] != owner_id:
                    return False
            cursor = conn.execute("""
                UPDATE api_keys
                SET status = ?, revoked_at = ?, revoked_reason = ?
                WHERE id = ? AND status = ?
            """, (
                ApiKeyStatus.REVOKED.value,
                datetime.now().isoformat(),
                reason,
                key_id,
                ApiKeyStatus.ACTIVE.value
            ))
            conn.commit()
            return cursor.rowcount > 0

    def get_key_by_id(self, key_id: str, owner_id: Optional[str] = None) -> Optional[ApiKey]:
        """Fetch a key record by id (hash only, never the raw key)."""
        with closing(sqlite3.connect(self.db_path)) as conn:
            conn.row_factory = sqlite3.Row
            if owner_id:
                row = conn.execute(
                    "SELECT * FROM api_keys WHERE id = ? AND owner_id = ?",
                    (key_id, owner_id)
                ).fetchone()
            else:
                row = conn.execute(
                    "SELECT * FROM api_keys WHERE id = ?",
                    (key_id,)
                ).fetchone()
            return self._row_to_api_key(row) if row else None

    def list_keys(
        self,
        owner_id: Optional[str] = None,
        status: Optional[str] = None,
        limit: int = 100,
        offset: int = 0
    ) -> List[ApiKey]:
        """List keys, optionally filtered by owner and/or status."""
        with closing(sqlite3.connect(self.db_path)) as conn:
            conn.row_factory = sqlite3.Row
            query = "SELECT * FROM api_keys WHERE 1=1"
            params: list = []
            if owner_id:
                query += " AND owner_id = ?"
                params.append(owner_id)
            if status:
                query += " AND status = ?"
                params.append(status)
            query += " ORDER BY created_at DESC LIMIT ? OFFSET ?"
            params.extend([limit, offset])
            rows = conn.execute(query, params).fetchall()
            return [self._row_to_api_key(row) for row in rows]

    def update_key(
        self,
        key_id: str,
        name: Optional[str] = None,
        permissions: Optional[List[str]] = None,
        rate_limit: Optional[int] = None,
        owner_id: Optional[str] = None
    ) -> bool:
        """Update mutable key fields; returns True if a row changed."""
        # Column names come from a fixed whitelist below; only values are
        # user-supplied and those are always bound as parameters.
        updates = []
        params: list = []
        if name is not None:
            updates.append("name = ?")
            params.append(name)
        if permissions is not None:
            updates.append("permissions = ?")
            params.append(json.dumps(permissions))
        if rate_limit is not None:
            updates.append("rate_limit = ?")
            params.append(rate_limit)
        if not updates:
            return False
        params.append(key_id)
        with closing(sqlite3.connect(self.db_path)) as conn:
            # Enforce ownership when an owner_id is supplied.
            if owner_id:
                row = conn.execute(
                    "SELECT owner_id FROM api_keys WHERE id = ?",
                    (key_id,)
                ).fetchone()
                if not row or row[0] != owner_id:
                    return False
            query = f"UPDATE api_keys SET {', '.join(updates)} WHERE id = ?"
            cursor = conn.execute(query, params)
            conn.commit()
            return cursor.rowcount > 0

    def update_last_used(self, key_id: str):
        """Record a use: bump total_calls and refresh last_used_at."""
        with closing(sqlite3.connect(self.db_path)) as conn:
            conn.execute("""
                UPDATE api_keys
                SET last_used_at = ?, total_calls = total_calls + 1
                WHERE id = ?
            """, (datetime.now().isoformat(), key_id))
            conn.commit()

    def log_api_call(
        self,
        api_key_id: str,
        endpoint: str,
        method: str,
        status_code: int = 200,
        response_time_ms: int = 0,
        ip_address: str = "",
        user_agent: str = "",
        error_message: str = ""
    ):
        """Append one row to the per-request call log."""
        with closing(sqlite3.connect(self.db_path)) as conn:
            conn.execute("""
                INSERT INTO api_call_logs
                (api_key_id, endpoint, method, status_code, response_time_ms,
                 ip_address, user_agent, error_message)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?)
            """, (
                api_key_id, endpoint, method, status_code, response_time_ms,
                ip_address, user_agent, error_message
            ))
            conn.commit()

    def get_call_logs(
        self,
        api_key_id: Optional[str] = None,
        start_date: Optional[str] = None,
        end_date: Optional[str] = None,
        limit: int = 100,
        offset: int = 0
    ) -> List[Dict]:
        """Fetch call-log rows, newest first, with optional filters."""
        with closing(sqlite3.connect(self.db_path)) as conn:
            conn.row_factory = sqlite3.Row
            query = "SELECT * FROM api_call_logs WHERE 1=1"
            params: list = []
            if api_key_id:
                query += " AND api_key_id = ?"
                params.append(api_key_id)
            if start_date:
                query += " AND created_at >= ?"
                params.append(start_date)
            if end_date:
                query += " AND created_at <= ?"
                params.append(end_date)
            query += " ORDER BY created_at DESC LIMIT ? OFFSET ?"
            params.extend([limit, offset])
            rows = conn.execute(query, params).fetchall()
            return [dict(row) for row in rows]

    def get_call_stats(
        self,
        api_key_id: Optional[str] = None,
        days: int = 30
    ) -> Dict:
        """Aggregate call statistics over the trailing `days` days.

        Returns a dict with "summary", per-"endpoints" and per-"daily" stats.
        """
        # FIX: the window used to be interpolated via str.format(), which is
        # injectable if a non-int ever reaches `days`. Bind it as a parameter
        # to SQLite's date() modifier instead (int() keeps the contract).
        window = f"-{int(days)} days"
        where = "WHERE created_at >= date('now', ?)"
        params: list = [window]
        if api_key_id:
            where = "WHERE api_key_id = ? AND created_at >= date('now', ?)"
            params = [api_key_id, window]
        with closing(sqlite3.connect(self.db_path)) as conn:
            conn.row_factory = sqlite3.Row
            # Overall summary
            row = conn.execute(f"""
                SELECT
                    COUNT(*) as total_calls,
                    COUNT(CASE WHEN status_code < 400 THEN 1 END) as success_calls,
                    COUNT(CASE WHEN status_code >= 400 THEN 1 END) as error_calls,
                    AVG(response_time_ms) as avg_response_time,
                    MAX(response_time_ms) as max_response_time,
                    MIN(response_time_ms) as min_response_time
                FROM api_call_logs
                {where}
            """, params).fetchone()
            # Per-endpoint breakdown
            endpoint_rows = conn.execute(f"""
                SELECT
                    endpoint,
                    method,
                    COUNT(*) as calls,
                    AVG(response_time_ms) as avg_time
                FROM api_call_logs
                {where}
                GROUP BY endpoint, method ORDER BY calls DESC
            """, params).fetchall()
            # Per-day breakdown
            daily_rows = conn.execute(f"""
                SELECT
                    date(created_at) as date,
                    COUNT(*) as calls,
                    COUNT(CASE WHEN status_code < 400 THEN 1 END) as success
                FROM api_call_logs
                {where}
                GROUP BY date(created_at) ORDER BY date
            """, params).fetchall()
            return {
                "summary": {
                    "total_calls": row["total_calls"] or 0,
                    "success_calls": row["success_calls"] or 0,
                    "error_calls": row["error_calls"] or 0,
                    "avg_response_time_ms": round(row["avg_response_time"] or 0, 2),
                    "max_response_time_ms": row["max_response_time"] or 0,
                    "min_response_time_ms": row["min_response_time"] or 0,
                },
                "endpoints": [dict(r) for r in endpoint_rows],
                "daily": [dict(r) for r in daily_rows]
            }

    def _row_to_api_key(self, row: sqlite3.Row) -> ApiKey:
        """Convert a database row into an ApiKey record."""
        return ApiKey(
            id=row["id"],
            key_hash=row["key_hash"],
            key_preview=row["key_preview"],
            name=row["name"],
            owner_id=row["owner_id"],
            permissions=json.loads(row["permissions"]),
            rate_limit=row["rate_limit"],
            status=row["status"],
            created_at=row["created_at"],
            expires_at=row["expires_at"],
            last_used_at=row["last_used_at"],
            revoked_at=row["revoked_at"],
            revoked_reason=row["revoked_reason"],
            total_calls=row["total_calls"]
        )
# Global singleton instance, created lazily by get_api_key_manager().
_api_key_manager: Optional[ApiKeyManager] = None
def get_api_key_manager() -> ApiKeyManager:
    """Return the process-wide ApiKeyManager, creating it on first use."""
    global _api_key_manager
    if _api_key_manager is None:
        _api_key_manager = ApiKeyManager()
    return _api_key_manager

View File

@@ -878,6 +878,310 @@ class DatabaseManager:
filtered.append(entity)
return filtered
# ==================== Phase 7: Multimodal Support ====================
def create_video(self, video_id: str, project_id: str, filename: str,
duration: float = 0, fps: float = 0, resolution: Dict = None,
audio_transcript_id: str = None, full_ocr_text: str = "",
extracted_entities: List[Dict] = None,
extracted_relations: List[Dict] = None) -> str:
"""创建视频记录"""
conn = self.get_conn()
now = datetime.now().isoformat()
conn.execute(
"""INSERT INTO videos
(id, project_id, filename, duration, fps, resolution,
audio_transcript_id, full_ocr_text, extracted_entities,
extracted_relations, status, created_at, updated_at)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""",
(video_id, project_id, filename, duration, fps,
json.dumps(resolution) if resolution else None,
audio_transcript_id, full_ocr_text,
json.dumps(extracted_entities or []),
json.dumps(extracted_relations or []),
'completed', now, now)
)
conn.commit()
conn.close()
return video_id
def get_video(self, video_id: str) -> Optional[Dict]:
"""获取视频信息"""
conn = self.get_conn()
row = conn.execute(
"SELECT * FROM videos WHERE id = ?", (video_id,)
).fetchone()
conn.close()
if row:
data = dict(row)
data['resolution'] = json.loads(data['resolution']) if data['resolution'] else None
data['extracted_entities'] = json.loads(data['extracted_entities']) if data['extracted_entities'] else []
data['extracted_relations'] = json.loads(data['extracted_relations']) if data['extracted_relations'] else []
return data
return None
def list_project_videos(self, project_id: str) -> List[Dict]:
"""获取项目的所有视频"""
conn = self.get_conn()
rows = conn.execute(
"SELECT * FROM videos WHERE project_id = ? ORDER BY created_at DESC",
(project_id,)
).fetchall()
conn.close()
videos = []
for row in rows:
data = dict(row)
data['resolution'] = json.loads(data['resolution']) if data['resolution'] else None
data['extracted_entities'] = json.loads(data['extracted_entities']) if data['extracted_entities'] else []
data['extracted_relations'] = json.loads(data['extracted_relations']) if data['extracted_relations'] else []
videos.append(data)
return videos
def create_video_frame(self, frame_id: str, video_id: str, frame_number: int,
timestamp: float, image_url: str = None,
ocr_text: str = None, extracted_entities: List[Dict] = None) -> str:
"""创建视频帧记录"""
conn = self.get_conn()
now = datetime.now().isoformat()
conn.execute(
"""INSERT INTO video_frames
(id, video_id, frame_number, timestamp, image_url, ocr_text, extracted_entities, created_at)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)""",
(frame_id, video_id, frame_number, timestamp, image_url, ocr_text,
json.dumps(extracted_entities or []), now)
)
conn.commit()
conn.close()
return frame_id
def get_video_frames(self, video_id: str) -> List[Dict]:
"""获取视频的所有帧"""
conn = self.get_conn()
rows = conn.execute(
"""SELECT * FROM video_frames WHERE video_id = ? ORDER BY timestamp""",
(video_id,)
).fetchall()
conn.close()
frames = []
for row in rows:
data = dict(row)
data['extracted_entities'] = json.loads(data['extracted_entities']) if data['extracted_entities'] else []
frames.append(data)
return frames
def create_image(self, image_id: str, project_id: str, filename: str,
ocr_text: str = "", description: str = "",
extracted_entities: List[Dict] = None,
extracted_relations: List[Dict] = None) -> str:
"""创建图片记录"""
conn = self.get_conn()
now = datetime.now().isoformat()
conn.execute(
"""INSERT INTO images
(id, project_id, filename, ocr_text, description,
extracted_entities, extracted_relations, status, created_at, updated_at)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""",
(image_id, project_id, filename, ocr_text, description,
json.dumps(extracted_entities or []),
json.dumps(extracted_relations or []),
'completed', now, now)
)
conn.commit()
conn.close()
return image_id
def get_image(self, image_id: str) -> Optional[Dict]:
"""获取图片信息"""
conn = self.get_conn()
row = conn.execute(
"SELECT * FROM images WHERE id = ?", (image_id,)
).fetchone()
conn.close()
if row:
data = dict(row)
data['extracted_entities'] = json.loads(data['extracted_entities']) if data['extracted_entities'] else []
data['extracted_relations'] = json.loads(data['extracted_relations']) if data['extracted_relations'] else []
return data
return None
def list_project_images(self, project_id: str) -> List[Dict]:
"""获取项目的所有图片"""
conn = self.get_conn()
rows = conn.execute(
"SELECT * FROM images WHERE project_id = ? ORDER BY created_at DESC",
(project_id,)
).fetchall()
conn.close()
images = []
for row in rows:
data = dict(row)
data['extracted_entities'] = json.loads(data['extracted_entities']) if data['extracted_entities'] else []
data['extracted_relations'] = json.loads(data['extracted_relations']) if data['extracted_relations'] else []
images.append(data)
return images
def create_multimodal_mention(self, mention_id: str, project_id: str,
entity_id: str, modality: str, source_id: str,
source_type: str, text_snippet: str = "",
confidence: float = 1.0) -> str:
"""创建多模态实体提及记录"""
conn = self.get_conn()
now = datetime.now().isoformat()
conn.execute(
"""INSERT OR REPLACE INTO multimodal_mentions
(id, project_id, entity_id, modality, source_id, source_type,
text_snippet, confidence, created_at)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)""",
(mention_id, project_id, entity_id, modality, source_id,
source_type, text_snippet, confidence, now)
)
conn.commit()
conn.close()
return mention_id
def get_entity_multimodal_mentions(self, entity_id: str) -> List[Dict]:
"""获取实体的多模态提及"""
conn = self.get_conn()
rows = conn.execute(
"""SELECT m.*, e.name as entity_name
FROM multimodal_mentions m
JOIN entities e ON m.entity_id = e.id
WHERE m.entity_id = ? ORDER BY m.created_at DESC""",
(entity_id,)
).fetchall()
conn.close()
return [dict(r) for r in rows]
def get_project_multimodal_mentions(self, project_id: str,
modality: str = None) -> List[Dict]:
"""获取项目的多模态提及"""
conn = self.get_conn()
if modality:
rows = conn.execute(
"""SELECT m.*, e.name as entity_name
FROM multimodal_mentions m
JOIN entities e ON m.entity_id = e.id
WHERE m.project_id = ? AND m.modality = ?
ORDER BY m.created_at DESC""",
(project_id, modality)
).fetchall()
else:
rows = conn.execute(
"""SELECT m.*, e.name as entity_name
FROM multimodal_mentions m
JOIN entities e ON m.entity_id = e.id
WHERE m.project_id = ? ORDER BY m.created_at DESC""",
(project_id,)
).fetchall()
conn.close()
return [dict(r) for r in rows]
def create_multimodal_entity_link(self, link_id: str, entity_id: str,
linked_entity_id: str, link_type: str,
confidence: float = 1.0,
evidence: str = "",
modalities: List[str] = None) -> str:
"""创建多模态实体关联"""
conn = self.get_conn()
now = datetime.now().isoformat()
conn.execute(
"""INSERT OR REPLACE INTO multimodal_entity_links
(id, entity_id, linked_entity_id, link_type, confidence,
evidence, modalities, created_at)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)""",
(link_id, entity_id, linked_entity_id, link_type, confidence,
evidence, json.dumps(modalities or []), now)
)
conn.commit()
conn.close()
return link_id
def get_entity_multimodal_links(self, entity_id: str) -> List[Dict]:
"""获取实体的多模态关联"""
conn = self.get_conn()
rows = conn.execute(
"""SELECT l.*, e1.name as entity_name, e2.name as linked_entity_name
FROM multimodal_entity_links l
JOIN entities e1 ON l.entity_id = e1.id
JOIN entities e2 ON l.linked_entity_id = e2.id
WHERE l.entity_id = ? OR l.linked_entity_id = ?""",
(entity_id, entity_id)
).fetchall()
conn.close()
links = []
for row in rows:
data = dict(row)
data['modalities'] = json.loads(data['modalities']) if data['modalities'] else []
links.append(data)
return links
def get_project_multimodal_stats(self, project_id: str) -> Dict:
"""获取项目多模态统计信息"""
conn = self.get_conn()
stats = {
'video_count': 0,
'image_count': 0,
'multimodal_entity_count': 0,
'cross_modal_links': 0,
'modality_distribution': {}
}
# 视频数量
row = conn.execute(
"SELECT COUNT(*) as count FROM videos WHERE project_id = ?",
(project_id,)
).fetchone()
stats['video_count'] = row['count']
# 图片数量
row = conn.execute(
"SELECT COUNT(*) as count FROM images WHERE project_id = ?",
(project_id,)
).fetchone()
stats['image_count'] = row['count']
# 多模态实体数量
row = conn.execute(
"""SELECT COUNT(DISTINCT entity_id) as count
FROM multimodal_mentions WHERE project_id = ?""",
(project_id,)
).fetchone()
stats['multimodal_entity_count'] = row['count']
# 跨模态关联数量
row = conn.execute(
"""SELECT COUNT(*) as count FROM multimodal_entity_links
WHERE entity_id IN (SELECT id FROM entities WHERE project_id = ?)""",
(project_id,)
).fetchone()
stats['cross_modal_links'] = row['count']
# 模态分布
for modality in ['audio', 'video', 'image', 'document']:
row = conn.execute(
"""SELECT COUNT(*) as count FROM multimodal_mentions
WHERE project_id = ? AND modality = ?""",
(project_id, modality)
).fetchone()
stats['modality_distribution'][modality] = row['count']
conn.close()
return stats
# Singleton instance, populated lazily by the module's accessor function.
_db_manager = None

View File

@@ -0,0 +1,308 @@
# InsightFlow Phase 7 - 多模态支持 API 文档
## 概述
Phase 7 多模态支持模块为 InsightFlow 添加了处理视频和图片的能力,支持:
1. **视频处理**提取音频、关键帧、OCR 识别
2. **图片处理**识别白板、PPT、手写笔记等内容
3. **多模态实体关联**:跨模态实体对齐和知识融合
## 新增 API 端点
### 视频处理
#### 上传视频
```
POST /api/v1/projects/{project_id}/upload-video
```
**参数:**
- `file` (required): 视频文件
- `extract_interval` (optional): 关键帧提取间隔(秒),默认 5 秒
**响应:**
```json
{
"video_id": "abc123",
"project_id": "proj456",
"filename": "meeting.mp4",
"status": "completed",
"audio_extracted": true,
"frame_count": 24,
"ocr_text_preview": "会议内容预览...",
"message": "Video processed successfully"
}
```
#### 获取项目视频列表
```
GET /api/v1/projects/{project_id}/videos
```
**响应:**
```json
[
{
"id": "abc123",
"filename": "meeting.mp4",
"duration": 120.5,
"fps": 30.0,
"resolution": {"width": 1920, "height": 1080},
"ocr_preview": "会议内容...",
"status": "completed",
"created_at": "2024-01-15T10:30:00"
}
]
```
#### 获取视频关键帧
```
GET /api/v1/videos/{video_id}/frames
```
**响应:**
```json
[
{
"id": "frame001",
"frame_number": 1,
"timestamp": 0.0,
"image_url": "/tmp/frames/video123/frame_000001_0.00.jpg",
"ocr_text": "第一页内容...",
"entities": [{"name": "Project Alpha", "type": "PROJECT"}]
}
]
```
### 图片处理
#### 上传图片
```
POST /api/v1/projects/{project_id}/upload-image
```
**参数:**
- `file` (required): 图片文件
- `detect_type` (optional): 是否自动检测图片类型,默认 true
**响应:**
```json
{
"image_id": "img789",
"project_id": "proj456",
"filename": "whiteboard.jpg",
"image_type": "whiteboard",
"ocr_text_preview": "白板内容...",
"description": "这是一张白板图片。内容摘要:...",
"entity_count": 5,
"status": "completed"
}
```
#### 批量上传图片
```
POST /api/v1/projects/{project_id}/upload-images-batch
```
**参数:**
- `files` (required): 多个图片文件
**响应:**
```json
{
"project_id": "proj456",
"total_count": 3,
"success_count": 3,
"failed_count": 0,
"results": [
{
"image_id": "img001",
"status": "success",
"image_type": "ppt",
"entity_count": 4
}
]
}
```
#### 获取项目图片列表
```
GET /api/v1/projects/{project_id}/images
```
### 多模态实体关联
#### 跨模态实体对齐
```
POST /api/v1/projects/{project_id}/multimodal/align
```
**参数:**
- `threshold` (optional): 相似度阈值,默认 0.85
**响应:**
```json
{
"project_id": "proj456",
"aligned_count": 5,
"links": [
{
"link_id": "link001",
"source_entity_id": "ent001",
"target_entity_id": "ent002",
"source_modality": "video",
"target_modality": "document",
"link_type": "same_as",
"confidence": 0.95,
"evidence": "Cross-modal alignment: exact"
}
],
"message": "Successfully aligned 5 cross-modal entity pairs"
}
```
#### 获取多模态统计信息
```
GET /api/v1/projects/{project_id}/multimodal/stats
```
**响应:**
```json
{
"project_id": "proj456",
"video_count": 3,
"image_count": 10,
"multimodal_entity_count": 25,
"cross_modal_links": 8,
"modality_distribution": {
"audio": 15,
"video": 8,
"image": 12,
"document": 20
}
}
```
#### 获取实体多模态提及
```
GET /api/v1/entities/{entity_id}/multimodal-mentions
```
**响应:**
```json
[
{
"id": "mention001",
"entity_id": "ent001",
"entity_name": "Project Alpha",
"modality": "video",
"source_id": "video123",
"source_type": "video_frame",
"text_snippet": "Project Alpha 进度",
"confidence": 1.0,
"created_at": "2024-01-15T10:30:00"
}
]
```
#### 建议多模态实体合并
```
GET /api/v1/projects/{project_id}/multimodal/suggest-merges
```
**响应:**
```json
{
"project_id": "proj456",
"suggestion_count": 3,
"suggestions": [
{
"entity1": {"id": "ent001", "name": "K8s", "type": "TECH"},
"entity2": {"id": "ent002", "name": "Kubernetes", "type": "TECH"},
"similarity": 0.95,
"match_type": "alias_match",
"suggested_action": "merge"
}
]
}
```
## 数据库表结构
### videos 表
存储视频文件信息
- `id`: 视频ID
- `project_id`: 所属项目ID
- `filename`: 文件名
- `duration`: 视频时长(秒)
- `fps`: 帧率
- `resolution`: 分辨率JSON
- `audio_transcript_id`: 关联的音频转录ID
- `full_ocr_text`: 所有帧OCR文本合并
- `extracted_entities`: 提取的实体JSON
- `extracted_relations`: 提取的关系JSON
- `status`: 处理状态
### video_frames 表
存储视频关键帧信息
- `id`: 帧ID
- `video_id`: 所属视频ID
- `frame_number`: 帧序号
- `timestamp`: 时间戳(秒)
- `image_url`: 图片URL或路径
- `ocr_text`: OCR识别文本
- `extracted_entities`: 该帧提取的实体
### images 表
存储图片文件信息
- `id`: 图片ID
- `project_id`: 所属项目ID
- `filename`: 文件名
- `ocr_text`: OCR识别文本
- `description`: 图片描述
- `extracted_entities`: 提取的实体
- `extracted_relations`: 提取的关系
- `status`: 处理状态
### multimodal_mentions 表
存储实体在多模态中的提及
- `id`: 提及ID
- `project_id`: 所属项目ID
- `entity_id`: 实体ID
- `modality`: 模态类型audio/video/image/document
- `source_id`: 来源ID
- `source_type`: 来源类型
- `text_snippet`: 文本片段
- `confidence`: 置信度
### multimodal_entity_links 表
存储跨模态实体关联
- `id`: 关联ID
- `entity_id`: 实体ID
- `linked_entity_id`: 关联实体ID
- `link_type`: 关联类型same_as/related_to/part_of
- `confidence`: 置信度
- `evidence`: 关联证据
- `modalities`: 涉及的模态列表
## 依赖安装
```bash
pip install ffmpeg-python pillow opencv-python pytesseract
```
注意:使用 OCR 功能需要安装 Tesseract OCR 引擎:
- Ubuntu/Debian: `sudo apt-get install tesseract-ocr tesseract-ocr-chi-sim`
- macOS: `brew install tesseract tesseract-lang`
- Windows: 从 https://github.com/UB-Mannheim/tesseract/wiki 下载安装包
## 环境变量
```bash
# 可选:自定义临时目录
export INSIGHTFLOW_TEMP_DIR=/path/to/temp
# 可选:Tesseract 路径(Windows)
export TESSERACT_CMD="C:\Program Files\Tesseract-OCR\tesseract.exe"
```

547
backend/image_processor.py Normal file
View File

@@ -0,0 +1,547 @@
#!/usr/bin/env python3
"""
InsightFlow Image Processor - Phase 7
图片处理模块识别白板、PPT、手写笔记等内容
"""
import os
import io
import json
import uuid
import base64
from typing import List, Dict, Optional, Tuple
from dataclasses import dataclass
from pathlib import Path
# 尝试导入图像处理库
try:
from PIL import Image, ImageEnhance, ImageFilter
PIL_AVAILABLE = True
except ImportError:
PIL_AVAILABLE = False
try:
import cv2
import numpy as np
CV2_AVAILABLE = True
except ImportError:
CV2_AVAILABLE = False
try:
import pytesseract
PYTESSERACT_AVAILABLE = True
except ImportError:
PYTESSERACT_AVAILABLE = False
@dataclass
class ImageEntity:
    """An entity detected in an image."""
    name: str
    type: str
    confidence: float
    bbox: Optional[Tuple[int, int, int, int]] = None  # (x, y, width, height)
@dataclass
class ImageRelation:
    """A relation between two entities detected in an image."""
    source: str
    target: str
    relation_type: str
    confidence: float
@dataclass
class ImageProcessingResult:
    """Result of processing a single image."""
    image_id: str
    image_type: str  # whiteboard, ppt, handwritten, screenshot, other
    ocr_text: str
    description: str
    entities: List[ImageEntity]
    relations: List[ImageRelation]
    width: int
    height: int
    success: bool
    error_message: str = ""
@dataclass
class BatchProcessingResult:
    """Aggregate result of processing a batch of images."""
    results: List[ImageProcessingResult]
    total_count: int
    success_count: int
    failed_count: int
class ImageProcessor:
    """Image processor — handles various kinds of images (OCR, type detection,
    entity extraction)."""
    # Supported image categories. The values are user-facing Chinese labels
    # used verbatim in generated descriptions (runtime strings).
    IMAGE_TYPES = {
        'whiteboard': '白板',
        'ppt': 'PPT/演示文稿',
        'handwritten': '手写笔记',
        'screenshot': '屏幕截图',
        'document': '文档图片',
        'other': '其他'
    }
def __init__(self, temp_dir: str = None):
"""
初始化图片处理器
Args:
temp_dir: 临时文件目录
"""
self.temp_dir = temp_dir or os.path.join(os.getcwd(), 'temp', 'images')
os.makedirs(self.temp_dir, exist_ok=True)
    def preprocess_image(self, image, image_type: Optional[str] = None):
        """
        Preprocess an image to improve OCR quality.

        Args:
            image: PIL Image object.
            image_type: image category (enables type-specific enhancement).

        Returns:
            The processed image; the input image unchanged if PIL is missing
            or preprocessing fails.
        """
        if not PIL_AVAILABLE:
            return image
        try:
            # Convert RGBA to RGB (drops the alpha channel).
            if image.mode == 'RGBA':
                image = image.convert('RGB')
            # Type-specific enhancement.
            if image_type == 'whiteboard':
                # Whiteboard: boost contrast, suppress the background.
                image = self._enhance_whiteboard(image)
            elif image_type == 'handwritten':
                # Handwritten notes: denoise, then boost contrast.
                image = self._enhance_handwritten(image)
            elif image_type == 'screenshot':
                # Screenshot: mild sharpening only.
                image = image.filter(ImageFilter.SHARPEN)
            # Generic step: downscale overly large images (keeps aspect ratio).
            max_size = 4096
            if max(image.size) > max_size:
                ratio = max_size / max(image.size)
                new_size = (int(image.size[0] * ratio), int(image.size[1] * ratio))
                image = image.resize(new_size, Image.Resampling.LANCZOS)
            return image
        except Exception as e:
            # Best-effort: fall back to the unprocessed image on any failure.
            print(f"Image preprocessing error: {e}")
            return image
    def _enhance_whiteboard(self, image):
        """Enhance a whiteboard photo: grayscale, 2x contrast, then binarize."""
        # Convert to grayscale.
        gray = image.convert('L')
        # Boost contrast.
        enhancer = ImageEnhance.Contrast(gray)
        enhanced = enhancer.enhance(2.0)
        # Binarize at a fixed threshold, then return as 8-bit grayscale.
        threshold = 128
        binary = enhanced.point(lambda x: 0 if x < threshold else 255, '1')
        return binary.convert('L')
    def _enhance_handwritten(self, image):
        """Enhance a handwritten-notes photo: grayscale, light blur, 1.5x contrast."""
        # Convert to grayscale.
        gray = image.convert('L')
        # Light denoising via a small Gaussian blur.
        blurred = gray.filter(ImageFilter.GaussianBlur(radius=1))
        # Boost contrast moderately.
        enhancer = ImageEnhance.Contrast(blurred)
        enhanced = enhancer.enhance(1.5)
        return enhanced
def detect_image_type(self, image, ocr_text: str = "") -> str:
"""
自动检测图片类型
Args:
image: PIL Image 对象
ocr_text: OCR识别的文本
Returns:
图片类型字符串
"""
if not PIL_AVAILABLE:
return 'other'
try:
# 基于图片特征和OCR内容判断类型
width, height = image.size
aspect_ratio = width / height
# 检测是否为PPT通常是16:9或4:3
if 1.3 <= aspect_ratio <= 1.8:
# 检查是否有典型的PPT特征标题、项目符号等
if any(keyword in ocr_text.lower() for keyword in ['slide', 'page', '', '']):
return 'ppt'
# 检测是否为白板(大量手写文字,可能有箭头、框等)
if CV2_AVAILABLE:
img_array = np.array(image.convert('RGB'))
gray = cv2.cvtColor(img_array, cv2.COLOR_RGB2GRAY)
# 检测边缘(白板通常有很多线条)
edges = cv2.Canny(gray, 50, 150)
edge_ratio = np.sum(edges > 0) / edges.size
# 如果边缘比例高,可能是白板
if edge_ratio > 0.05 and len(ocr_text) > 50:
return 'whiteboard'
# 检测是否为手写笔记(文字密度高,可能有涂鸦)
if len(ocr_text) > 100 and aspect_ratio < 1.5:
# 检查手写特征(不规则的行高)
return 'handwritten'
# 检测是否为截图可能有UI元素
if any(keyword in ocr_text.lower() for keyword in ['button', 'menu', 'click', '登录', '确定', '取消']):
return 'screenshot'
# 默认文档类型
if len(ocr_text) > 200:
return 'document'
return 'other'
except Exception as e:
print(f"Image type detection error: {e}")
return 'other'
    def perform_ocr(self, image, lang: str = 'chi_sim+eng') -> Tuple[str, float]:
        """
        Run OCR on an image.

        Args:
            image: PIL Image object.
            lang: Tesseract language spec (default: simplified Chinese + English).

        Returns:
            (recognized text, average confidence in [0, 1]); ("", 0.0) when
            pytesseract is unavailable or OCR fails.
        """
        if not PYTESSERACT_AVAILABLE:
            return "", 0.0
        try:
            # Preprocess the image first to improve recognition quality.
            processed_image = self.preprocess_image(image)
            # Run OCR.
            text = pytesseract.image_to_string(processed_image, lang=lang)
            # Average the per-token confidences reported by Tesseract
            # (entries <= 0 mean "no confidence" and are skipped).
            data = pytesseract.image_to_data(processed_image, output_type=pytesseract.Output.DICT)
            confidences = [int(c) for c in data['conf'] if int(c) > 0]
            avg_confidence = sum(confidences) / len(confidences) if confidences else 0
            # Tesseract confidences are 0-100; normalize to 0-1.
            return text.strip(), avg_confidence / 100.0
        except Exception as e:
            print(f"OCR error: {e}")
            return "", 0.0
def extract_entities_from_text(self, text: str) -> List[ImageEntity]:
    """
    Extract candidate entities (projects, people, tech terms) from OCR text.

    Uses simple regex/keyword heuristics (could later be replaced by an LLM
    call) and de-duplicates on (lowercased name, type).

    Args:
        text: OCR-recognized text

    Returns:
        De-duplicated list of ImageEntity records.
    """
    import re

    found = []

    # Project names: quoted phrases or TitleCase multi-word sequences.
    project_pattern = r'["\']([^"\']+)["\']|([A-Z][a-zA-Z0-9]*(?:\s+[A-Z][a-zA-Z0-9]*)+)'
    for m in re.finditer(project_pattern, text):
        candidate = m.group(1) or m.group(2)
        if candidate and len(candidate) > 2:
            found.append(ImageEntity(
                name=candidate.strip(),
                type='PROJECT',
                confidence=0.7
            ))

    # Chinese personal names followed by a title/honorific suffix.
    for m in re.finditer(r'([\u4e00-\u9fa5]{2,4})(?:先生|女士|总|经理|工程师|老师)', text):
        found.append(ImageEntity(
            name=m.group(1),
            type='PERSON',
            confidence=0.8
        ))

    # Known technology keywords (plain substring match).
    tech_keywords = ['K8s', 'Kubernetes', 'Docker', 'API', 'SDK', 'AI', 'ML',
                     'Python', 'Java', 'React', 'Vue', 'Node.js', '数据库', '服务器']
    found.extend(
        ImageEntity(name=kw, type='TECH', confidence=0.9)
        for kw in tech_keywords if kw in text
    )

    # De-duplicate, keeping the first occurrence of each (name, type) pair.
    deduped = []
    seen_keys = set()
    for ent in found:
        marker = (ent.name.lower(), ent.type)
        if marker not in seen_keys:
            seen_keys.add(marker)
            deduped.append(ent)
    return deduped
def generate_description(self, image_type: str, ocr_text: str,
                         entities: List[ImageEntity]) -> str:
    """
    Compose a human-readable summary from the image type, OCR text and entities.

    Args:
        image_type: detected image type key
        ocr_text: OCR-recognized text
        entities: detected entities

    Returns:
        Description string (type label, up to 200 chars of text, up to 5 entities).
    """
    label = self.IMAGE_TYPES.get(image_type, '图片')
    parts = [f"这是一张{label}图片。"]
    if ocr_text:
        # First 200 characters become the content preview.
        snippet = ocr_text[:200].replace('\n', ' ')
        if len(ocr_text) > 200:
            snippet += "..."
        parts.append(f"内容摘要:{snippet}")
    if entities:
        # Show at most five entity names.
        shown = ', '.join(e.name for e in entities[:5])
        parts.append(f"识别到的关键实体:{shown}")
    return " ".join(parts)
def process_image(self, image_data: bytes, filename: str = None,
                  image_id: str = None, detect_type: bool = True) -> ImageProcessingResult:
    """
    Process a single image: OCR, type detection, entity/relation extraction.

    Args:
        image_data: raw image bytes
        filename: original filename; when given, the image is also saved to
            the temp directory as "<image_id>_<filename>"
        image_id: optional id; an 8-char uuid fragment is generated otherwise
        detect_type: whether to auto-detect the image type

    Returns:
        ImageProcessingResult; success=False with error_message on failure.
    """
    image_id = image_id or str(uuid.uuid4())[:8]
    if not PIL_AVAILABLE:
        return ImageProcessingResult(
            image_id=image_id,
            image_type='other',
            ocr_text='',
            description='PIL not available',
            entities=[],
            relations=[],
            width=0,
            height=0,
            success=False,
            error_message='PIL library not available'
        )
    try:
        # Decode the image
        image = Image.open(io.BytesIO(image_data))
        width, height = image.size
        # OCR first: its text drives type detection and entity extraction
        ocr_text, ocr_confidence = self.perform_ocr(image)
        # Detect the image type (optional)
        image_type = 'other'
        if detect_type:
            image_type = self.detect_image_type(image, ocr_text)
        # Extract entities from the recognized text
        entities = self.extract_entities_from_text(ocr_text)
        # Generate a human-readable description
        description = self.generate_description(image_type, ocr_text, entities)
        # Extract co-occurrence relations between entities
        relations = self._extract_relations(entities, ocr_text)
        # Optionally persist the image for later retrieval.
        if filename:
            # Fix: include the caller-supplied filename in the saved path;
            # the previous code wrote a literal "(unknown)" placeholder.
            save_path = os.path.join(self.temp_dir, f"{image_id}_{filename}")
            image.save(save_path)
        return ImageProcessingResult(
            image_id=image_id,
            image_type=image_type,
            ocr_text=ocr_text,
            description=description,
            entities=entities,
            relations=relations,
            width=width,
            height=height,
            success=True
        )
    except Exception as e:
        return ImageProcessingResult(
            image_id=image_id,
            image_type='other',
            ocr_text='',
            description='',
            entities=[],
            relations=[],
            width=0,
            height=0,
            success=False,
            error_message=str(e)
        )
def _extract_relations(self, entities: List[ImageEntity], text: str) -> List[ImageRelation]:
    """
    Derive weak pairwise relations from entities that co-occur in a sentence.

    Args:
        entities: previously extracted entities
        text: source text

    Returns:
        List of 'related' ImageRelation links, each with confidence 0.5.
    """
    relations = []
    if len(entities) < 2:
        return relations
    # NOTE(review): the replace() targets below are empty strings — almost
    # certainly mangled CJK sentence punctuation (。!?). As written,
    # str.replace('', '.') inserts a dot between every character, so each
    # "sentence" is a single character; restore the original punctuation.
    sentences = text.replace('', '.').replace('', '!').replace('', '?').split('.')
    for sentence in sentences:
        sentence_entities = []
        for entity in entities:
            if entity.name in sentence:
                sentence_entities.append(entity)
        # Any two entities sharing a sentence are linked pairwise.
        if len(sentence_entities) >= 2:
            for i in range(len(sentence_entities)):
                for j in range(i + 1, len(sentence_entities)):
                    relations.append(ImageRelation(
                        source=sentence_entities[i].name,
                        target=sentence_entities[j].name,
                        relation_type='related',
                        confidence=0.5
                    ))
    return relations
def process_batch(self, images_data: List[Tuple[bytes, str]],
                  project_id: str = None) -> BatchProcessingResult:
    """
    Process many images sequentially and aggregate the per-image results.

    Args:
        images_data: list of (image bytes, filename) pairs
        project_id: project id (currently unused by the pipeline)

    Returns:
        BatchProcessingResult with per-image results and success/failure counts.
    """
    outcomes = [self.process_image(data, name) for data, name in images_data]
    succeeded = sum(1 for outcome in outcomes if outcome.success)
    return BatchProcessingResult(
        results=outcomes,
        total_count=len(outcomes),
        success_count=succeeded,
        failed_count=len(outcomes) - succeeded
    )
def image_to_base64(self, image_data: bytes) -> str:
    """
    Encode raw image bytes as a UTF-8 base64 string.

    Args:
        image_data: raw image bytes

    Returns:
        Base64-encoded string.
    """
    encoded = base64.b64encode(image_data)
    return encoded.decode('utf-8')
def get_image_thumbnail(self, image_data: bytes, size: Tuple[int, int] = (200, 200)) -> bytes:
    """
    Produce a JPEG thumbnail for an image.

    Args:
        image_data: raw image bytes
        size: maximum thumbnail dimensions

    Returns:
        Thumbnail JPEG bytes; falls back to the original bytes when PIL is
        unavailable or thumbnailing fails.
    """
    if not PIL_AVAILABLE:
        return image_data
    try:
        picture = Image.open(io.BytesIO(image_data))
        picture.thumbnail(size, Image.Resampling.LANCZOS)
        out = io.BytesIO()
        picture.save(out, format='JPEG')
        return out.getvalue()
    except Exception as e:
        print(f"Thumbnail generation error: {e}")
        return image_data
# Lazily created module-level singleton
_image_processor = None


def get_image_processor(temp_dir: str = None) -> ImageProcessor:
    """Return the process-wide ImageProcessor, creating it on first use."""
    global _image_processor
    if _image_processor is None:
        _image_processor = ImageProcessor(temp_dir)
    return _image_processor

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,514 @@
#!/usr/bin/env python3
"""
InsightFlow Multimodal Entity Linker - Phase 7
多模态实体关联模块:跨模态实体对齐和知识融合
"""
import os
import json
import uuid
from typing import List, Dict, Optional, Tuple, Set
from dataclasses import dataclass
from difflib import SequenceMatcher
# 尝试导入embedding库
try:
import numpy as np
NUMPY_AVAILABLE = True
except ImportError:
NUMPY_AVAILABLE = False
@dataclass
class MultimodalEntity:
    """An entity mention observed in one specific modality (audio/video/image/document)."""
    id: str
    entity_id: str        # canonical entity this mention refers to
    project_id: str
    name: str
    source_type: str  # audio, video, image, document
    source_id: str        # id of the source artifact within that modality
    mention_context: str  # surrounding text of the mention
    confidence: float
    modality_features: Dict = None  # modality-specific features
    def __post_init__(self):
        # Initialize per-instance to avoid a shared mutable default.
        if self.modality_features is None:
            self.modality_features = {}
@dataclass
class EntityLink:
    """A typed link between two entities, possibly spanning modalities."""
    id: str
    project_id: str
    source_entity_id: str
    target_entity_id: str
    link_type: str  # same_as, related_to, part_of
    source_modality: str
    target_modality: str
    confidence: float
    evidence: str  # human-readable justification for the link
@dataclass
class AlignmentResult:
    """Outcome of matching one entity against a candidate pool."""
    entity_id: str
    matched_entity_id: Optional[str]  # None when no candidate passed the threshold
    similarity: float
    match_type: str  # exact, fuzzy, embedding
    confidence: float
@dataclass
class FusionResult:
    """Result of fusing knowledge about one entity across modalities."""
    canonical_entity_id: str      # surviving entity id after the merge
    merged_entity_ids: List[str]  # ids folded into the canonical entity
    fused_properties: Dict        # merged name/definition/aliases/types/contexts
    source_modalities: List[str]
    confidence: float
class MultimodalEntityLinker:
    """Multimodal entity linker: cross-modal entity alignment and knowledge fusion."""

    # Link types (labels kept in Chinese: they are runtime data)
    LINK_TYPES = {
        'same_as': '同一实体',
        'related_to': '相关实体',
        'part_of': '组成部分',
        'mentions': '提及关系'
    }
    # Supported modalities
    MODALITIES = ['audio', 'video', 'image', 'document']

    def __init__(self, similarity_threshold: float = 0.85):
        """
        Initialize the linker.

        Args:
            similarity_threshold: minimum similarity for two entities to match
        """
        self.similarity_threshold = similarity_threshold

    def calculate_string_similarity(self, s1: str, s2: str) -> float:
        """
        Case/whitespace-insensitive string similarity in [0, 1].

        1.0 for an exact match, 0.9 for containment, otherwise the difflib
        SequenceMatcher ratio.
        """
        if not s1 or not s2:
            return 0.0
        s1, s2 = s1.lower().strip(), s2.lower().strip()
        # Exact match
        if s1 == s2:
            return 1.0
        # One string contains the other
        if s1 in s2 or s2 in s1:
            return 0.9
        # Edit-distance based similarity
        return SequenceMatcher(None, s1, s2).ratio()

    def calculate_entity_similarity(self, entity1: Dict, entity2: Dict) -> Tuple[float, str]:
        """
        Combined similarity of two entity dicts.

        Checks exact name match, shared/cross aliases, then a weighted blend
        of name (0.7) and definition (0.3) similarity.

        Returns:
            (similarity, match_type) with match_type one of
            'exact', 'alias_match', 'fuzzy', 'none'.
        """
        # Name similarity
        name_sim = self.calculate_string_similarity(
            entity1.get('name', ''),
            entity2.get('name', '')
        )
        # Identical names short-circuit
        if name_sim == 1.0:
            return 1.0, 'exact'
        # Alias checks
        aliases1 = set(a.lower() for a in entity1.get('aliases', []))
        aliases2 = set(a.lower() for a in entity2.get('aliases', []))
        if aliases1 & aliases2:  # shared alias
            return 0.95, 'alias_match'
        if entity2.get('name', '').lower() in aliases1:
            return 0.95, 'alias_match'
        if entity1.get('name', '').lower() in aliases2:
            return 0.95, 'alias_match'
        # Definition similarity
        def_sim = self.calculate_string_similarity(
            entity1.get('definition', ''),
            entity2.get('definition', '')
        )
        # Weighted combination
        combined_sim = name_sim * 0.7 + def_sim * 0.3
        if combined_sim >= self.similarity_threshold:
            return combined_sim, 'fuzzy'
        return combined_sim, 'none'

    def find_matching_entity(self, query_entity: Dict,
                             candidate_entities: List[Dict],
                             exclude_ids: Set[str] = None) -> Optional[AlignmentResult]:
        """
        Find the best-matching candidate for a query entity.

        Args:
            query_entity: entity to match
            candidate_entities: pool of candidate entities
            exclude_ids: candidate ids to skip

        Returns:
            AlignmentResult for the best candidate above the threshold,
            or None when nothing matches.
        """
        exclude_ids = exclude_ids or set()
        best_match = None
        best_similarity = 0.0
        for candidate in candidate_entities:
            if candidate.get('id') in exclude_ids:
                continue
            similarity, match_type = self.calculate_entity_similarity(
                query_entity, candidate
            )
            # best_match_type is assigned together with best_match below
            if similarity > best_similarity and similarity >= self.similarity_threshold:
                best_similarity = similarity
                best_match = candidate
                best_match_type = match_type
        if best_match:
            return AlignmentResult(
                entity_id=query_entity.get('id'),
                matched_entity_id=best_match.get('id'),
                similarity=best_similarity,
                match_type=best_match_type,
                confidence=best_similarity
            )
        return None

    def align_cross_modal_entities(self, project_id: str,
                                   audio_entities: List[Dict],
                                   video_entities: List[Dict],
                                   image_entities: List[Dict],
                                   document_entities: List[Dict]) -> List[EntityLink]:
        """
        Align entities across modalities.

        Each unordered modality pair is compared once; matches above the
        threshold become EntityLink records ('same_as' when similarity > 0.95,
        'related_to' otherwise).

        Returns:
            List of EntityLink records.
        """
        links = []
        # Bucket entities by modality
        all_entities = {
            'audio': audio_entities,
            'video': video_entities,
            'image': image_entities,
            'document': document_entities
        }
        # Pairwise cross-modal alignment
        for mod1 in self.MODALITIES:
            for mod2 in self.MODALITIES:
                if mod1 >= mod2:  # lexicographic guard: visit each pair once
                    continue
                entities1 = all_entities.get(mod1, [])
                entities2 = all_entities.get(mod2, [])
                for ent1 in entities1:
                    # Look for a match in the other modality
                    result = self.find_matching_entity(ent1, entities2)
                    if result and result.matched_entity_id:
                        link = EntityLink(
                            id=str(uuid.uuid4())[:8],
                            project_id=project_id,
                            source_entity_id=ent1.get('id'),
                            target_entity_id=result.matched_entity_id,
                            link_type='same_as' if result.similarity > 0.95 else 'related_to',
                            source_modality=mod1,
                            target_modality=mod2,
                            confidence=result.confidence,
                            evidence=f"Cross-modal alignment: {result.match_type}"
                        )
                        links.append(link)
        return links

    def fuse_entity_knowledge(self, entity_id: str,
                              linked_entities: List[Dict],
                              multimodal_mentions: List[Dict]) -> FusionResult:
        """
        Merge knowledge about one entity gathered across modalities.

        Picks the longest definition, the most frequent name, pools aliases /
        types / contexts, and scores confidence by the amount of evidence.

        Args:
            entity_id: canonical entity id
            linked_entities: entity dicts linked to the canonical entity
            multimodal_mentions: mention dicts across modalities

        Returns:
            FusionResult with the merged property bag.
        """
        # Accumulate properties across all linked entities
        fused_properties = {
            'names': set(),
            'definitions': [],
            'aliases': set(),
            'types': set(),
            'modalities': set(),
            'contexts': []
        }
        merged_ids = []
        for entity in linked_entities:
            merged_ids.append(entity.get('id'))
            # Names
            fused_properties['names'].add(entity.get('name', ''))
            # Definitions
            if entity.get('definition'):
                fused_properties['definitions'].append(entity.get('definition'))
            # Aliases
            fused_properties['aliases'].update(entity.get('aliases', []))
            # Types
            fused_properties['types'].add(entity.get('type', 'OTHER'))
        # Modalities and mention contexts
        for mention in multimodal_mentions:
            fused_properties['modalities'].add(mention.get('source_type', ''))
            if mention.get('mention_context'):
                fused_properties['contexts'].append(mention.get('mention_context'))
        # Best definition: the longest one
        best_definition = max(fused_properties['definitions'], key=len) \
            if fused_properties['definitions'] else ""
        # Best name: the most frequent one
        from collections import Counter
        name_counts = Counter(fused_properties['names'])
        best_name = name_counts.most_common(1)[0][0] if name_counts else ""
        # Assemble the fusion result
        return FusionResult(
            canonical_entity_id=entity_id,
            merged_entity_ids=merged_ids,
            fused_properties={
                'name': best_name,
                'definition': best_definition,
                'aliases': list(fused_properties['aliases']),
                'types': list(fused_properties['types']),
                'modalities': list(fused_properties['modalities']),
                'contexts': fused_properties['contexts'][:10]  # cap at 10 contexts
            },
            source_modalities=list(fused_properties['modalities']),
            confidence=min(1.0, len(linked_entities) * 0.2 + 0.5)
        )

    def detect_entity_conflicts(self, entities: List[Dict]) -> List[Dict]:
        """
        Detect homonym conflicts: same name but clearly different definitions.

        Args:
            entities: entity dicts to inspect

        Returns:
            List of conflict records with the colliding entity group.
        """
        conflicts = []
        # Group by lowercased name
        name_groups = {}
        for entity in entities:
            name = entity.get('name', '').lower()
            if name:
                if name not in name_groups:
                    name_groups[name] = []
                name_groups[name].append(entity)
        # Same name but dissimilar definitions => potential conflict
        for name, group in name_groups.items():
            if len(group) > 1:
                definitions = [e.get('definition', '') for e in group if e.get('definition')]
                if len(definitions) > 1:
                    # Pairwise definition similarities
                    sim_matrix = []
                    for i, d1 in enumerate(definitions):
                        for j, d2 in enumerate(definitions):
                            if i < j:
                                sim = self.calculate_string_similarity(d1, d2)
                                sim_matrix.append(sim)
                    # All pairs dissimilar => flag as a homonym conflict
                    if sim_matrix and all(s < 0.5 for s in sim_matrix):
                        conflicts.append({
                            'name': name,
                            'entities': group,
                            'type': 'homonym_conflict',
                            'suggestion': 'Consider disambiguating these entities'
                        })
        return conflicts

    def suggest_entity_merges(self, entities: List[Dict],
                              existing_links: List[EntityLink] = None) -> List[Dict]:
        """
        Suggest merge/link actions for similar entity pairs not already linked.

        Args:
            entities: entity dicts to compare pairwise
            existing_links: known links to exclude from suggestions

        Returns:
            Suggestions sorted by similarity, highest first; 'merge' above
            0.95 similarity, 'link' otherwise.
        """
        suggestions = []
        existing_pairs = set()
        # Index already-linked pairs (order-independent)
        if existing_links:
            for link in existing_links:
                pair = tuple(sorted([link.source_entity_id, link.target_entity_id]))
                existing_pairs.add(pair)
        # Examine every unordered entity pair
        for i, ent1 in enumerate(entities):
            for j, ent2 in enumerate(entities):
                if i >= j:
                    continue
                # Skip pairs that are already linked
                pair = tuple(sorted([ent1.get('id'), ent2.get('id')]))
                if pair in existing_pairs:
                    continue
                # Score the pair
                similarity, match_type = self.calculate_entity_similarity(ent1, ent2)
                if similarity >= self.similarity_threshold:
                    suggestions.append({
                        'entity1': ent1,
                        'entity2': ent2,
                        'similarity': similarity,
                        'match_type': match_type,
                        'suggested_action': 'merge' if similarity > 0.95 else 'link'
                    })
        # Highest similarity first
        suggestions.sort(key=lambda x: x['similarity'], reverse=True)
        return suggestions

    def create_multimodal_entity_record(self, project_id: str,
                                        entity_id: str,
                                        source_type: str,
                                        source_id: str,
                                        mention_context: str = "",
                                        confidence: float = 1.0) -> MultimodalEntity:
        """
        Build a MultimodalEntity record for a single mention.

        Args:
            project_id: owning project id
            entity_id: canonical entity id
            source_type: originating modality
            source_id: id of the source artifact
            mention_context: surrounding text of the mention
            confidence: mention confidence

        Returns:
            New MultimodalEntity (name left blank, filled in later).
        """
        return MultimodalEntity(
            id=str(uuid.uuid4())[:8],
            entity_id=entity_id,
            project_id=project_id,
            name="",  # filled in later
            source_type=source_type,
            source_id=source_id,
            mention_context=mention_context,
            confidence=confidence
        )

    def analyze_modality_distribution(self, multimodal_entities: List[MultimodalEntity]) -> Dict:
        """
        Compute per-modality record counts and the share of cross-modal entities.

        Args:
            multimodal_entities: mention records to analyze

        Returns:
            Stats dict with distribution, totals and cross-modal ratio.
        """
        distribution = {mod: 0 for mod in self.MODALITIES}
        cross_modal_entities = set()  # NOTE(review): unused in the original implementation
        # Count records per modality
        for me in multimodal_entities:
            if me.source_type in distribution:
                distribution[me.source_type] += 1
        # Collect the set of modalities each entity appears in
        entity_modalities = {}
        for me in multimodal_entities:
            if me.entity_id not in entity_modalities:
                entity_modalities[me.entity_id] = set()
            entity_modalities[me.entity_id].add(me.source_type)
        cross_modal_count = sum(1 for mods in entity_modalities.values() if len(mods) > 1)
        return {
            'modality_distribution': distribution,
            'total_multimodal_records': len(multimodal_entities),
            'unique_entities': len(entity_modalities),
            'cross_modal_entities': cross_modal_count,
            'cross_modal_ratio': cross_modal_count / len(entity_modalities) if entity_modalities else 0
        }
# Lazily created module-level singleton
_multimodal_entity_linker = None


def get_multimodal_entity_linker(similarity_threshold: float = 0.85) -> MultimodalEntityLinker:
    """Return the process-wide MultimodalEntityLinker, creating it on first use."""
    global _multimodal_entity_linker
    if _multimodal_entity_linker is None:
        _multimodal_entity_linker = MultimodalEntityLinker(similarity_threshold)
    return _multimodal_entity_linker

View File

@@ -0,0 +1,434 @@
#!/usr/bin/env python3
"""
InsightFlow Multimodal Processor - Phase 7
视频处理模块提取音频、关键帧、OCR识别
"""
import os
import json
import uuid
import tempfile
import subprocess
from typing import List, Dict, Optional, Tuple
from dataclasses import dataclass
from pathlib import Path
# 尝试导入OCR库
try:
import pytesseract
from PIL import Image
PYTESSERACT_AVAILABLE = True
except ImportError:
PYTESSERACT_AVAILABLE = False
try:
import cv2
CV2_AVAILABLE = True
except ImportError:
CV2_AVAILABLE = False
try:
import ffmpeg
FFMPEG_AVAILABLE = True
except ImportError:
FFMPEG_AVAILABLE = False
@dataclass
class VideoFrame:
    """A single extracted video keyframe and its OCR result."""
    id: str
    video_id: str          # owning video
    frame_number: int      # frame index within the video
    timestamp: float       # position in seconds
    frame_path: str        # path of the saved frame image
    ocr_text: str = ""
    ocr_confidence: float = 0.0
    entities_detected: List[Dict] = None  # entities found in this frame
    def __post_init__(self):
        # Initialize per-instance to avoid a shared mutable default.
        if self.entities_detected is None:
            self.entities_detected = []
@dataclass
class VideoInfo:
    """Metadata and processing state for an ingested video."""
    id: str
    project_id: str
    filename: str
    file_path: str
    duration: float = 0.0
    width: int = 0
    height: int = 0
    fps: float = 0.0
    audio_extracted: bool = False  # whether the audio track was extracted
    audio_path: str = ""
    transcript_id: str = ""        # id of the associated transcript, if any
    status: str = "pending"
    error_message: str = ""
    metadata: Dict = None          # extra probe metadata
    def __post_init__(self):
        # Initialize per-instance to avoid a shared mutable default.
        if self.metadata is None:
            self.metadata = {}
@dataclass
class VideoProcessingResult:
    """Aggregate result of processing one video."""
    video_id: str
    audio_path: str            # extracted audio file ("" when no audio)
    frames: List[VideoFrame]
    ocr_results: List[Dict]    # per-frame OCR hits
    full_text: str  # combined text (audio transcript + OCR text)
    success: bool
    error_message: str = ""
class MultimodalProcessor:
    """Multimodal processor: extracts audio, keyframes and OCR text from video files."""

    def __init__(self, temp_dir: str = None, frame_interval: int = 5):
        """
        Initialize the processor.

        Args:
            temp_dir: working directory (defaults to the system temp dir)
            frame_interval: keyframe sampling interval in seconds
        """
        self.temp_dir = temp_dir or tempfile.gettempdir()
        self.frame_interval = frame_interval
        self.video_dir = os.path.join(self.temp_dir, "videos")
        self.frames_dir = os.path.join(self.temp_dir, "frames")
        self.audio_dir = os.path.join(self.temp_dir, "audio")
        # Create the working directories
        os.makedirs(self.video_dir, exist_ok=True)
        os.makedirs(self.frames_dir, exist_ok=True)
        os.makedirs(self.audio_dir, exist_ok=True)

    @staticmethod
    def _parse_frame_rate(rate) -> float:
        """
        Safely parse an ffprobe rational frame rate such as '30000/1001'.

        Replaces the previous eval() call, which executed an arbitrary string
        taken from media metadata (a code-injection hazard).
        """
        try:
            num, _, den = str(rate).partition('/')
            denominator = float(den) if den else 1.0
            return float(num) / denominator if denominator else 0.0
        except (TypeError, ValueError):
            return 0.0

    def extract_video_info(self, video_path: str) -> Dict:
        """
        Probe basic video metadata (duration, size, fps, audio presence, bitrate).

        Args:
            video_path: path to the video file

        Returns:
            Metadata dict; all zeros/False when probing fails.
        """
        try:
            if FFMPEG_AVAILABLE:
                probe = ffmpeg.probe(video_path)
                video_stream = next((s for s in probe['streams'] if s['codec_type'] == 'video'), None)
                audio_stream = next((s for s in probe['streams'] if s['codec_type'] == 'audio'), None)
                if video_stream:
                    return {
                        'duration': float(probe['format'].get('duration', 0)),
                        'width': int(video_stream.get('width', 0)),
                        'height': int(video_stream.get('height', 0)),
                        # Fix: parse the rational safely instead of eval()
                        'fps': self._parse_frame_rate(video_stream.get('r_frame_rate', '0/1')),
                        'has_audio': audio_stream is not None,
                        'bitrate': int(probe['format'].get('bit_rate', 0))
                    }
            else:
                # Fall back to the ffprobe command line tool
                cmd = [
                    'ffprobe', '-v', 'error', '-show_entries',
                    'format=duration,bit_rate', '-show_entries',
                    'stream=width,height,r_frame_rate', '-of', 'json',
                    video_path
                ]
                result = subprocess.run(cmd, capture_output=True, text=True)
                if result.returncode == 0:
                    data = json.loads(result.stdout)
                    return {
                        'duration': float(data['format'].get('duration', 0)),
                        'width': int(data['streams'][0].get('width', 0)) if data['streams'] else 0,
                        'height': int(data['streams'][0].get('height', 0)) if data['streams'] else 0,
                        'fps': 30.0,  # default when ffprobe output lacks a usable rate
                        'has_audio': len(data['streams']) > 1,
                        'bitrate': int(data['format'].get('bit_rate', 0))
                    }
        except Exception as e:
            print(f"Error extracting video info: {e}")
        return {
            'duration': 0,
            'width': 0,
            'height': 0,
            'fps': 0,
            'has_audio': False,
            'bitrate': 0
        }

    def extract_audio(self, video_path: str, output_path: str = None) -> str:
        """
        Extract a 16 kHz mono WAV track from a video.

        Args:
            video_path: source video path
            output_path: target audio path (derived from the video name if omitted)

        Returns:
            Path of the extracted audio file.

        Raises:
            Exception: re-raised when ffmpeg fails.
        """
        if output_path is None:
            video_name = Path(video_path).stem
            output_path = os.path.join(self.audio_dir, f"{video_name}.wav")
        try:
            if FFMPEG_AVAILABLE:
                (
                    ffmpeg
                    .input(video_path)
                    .output(output_path, ac=1, ar=16000, vn=None)
                    .overwrite_output()
                    .run(quiet=True)
                )
            else:
                # Use the ffmpeg command line tool
                cmd = [
                    'ffmpeg', '-i', video_path,
                    '-vn', '-acodec', 'pcm_s16le',
                    '-ac', '1', '-ar', '16000',
                    '-y', output_path
                ]
                subprocess.run(cmd, check=True, capture_output=True)
            return output_path
        except Exception as e:
            print(f"Error extracting audio: {e}")
            raise

    def extract_keyframes(self, video_path: str, video_id: str,
                          interval: int = None) -> List[str]:
        """
        Sample keyframes from a video every `interval` seconds.

        Args:
            video_path: source video path
            video_id: id used to namespace the frame directory
            interval: sampling interval in seconds (defaults to frame_interval)

        Returns:
            List of frame image paths (empty on failure).
        """
        interval = interval or self.frame_interval
        frame_paths = []
        # Per-video frame directory
        video_frames_dir = os.path.join(self.frames_dir, video_id)
        os.makedirs(video_frames_dir, exist_ok=True)
        try:
            if CV2_AVAILABLE:
                # Decode with OpenCV and keep one frame per interval
                cap = cv2.VideoCapture(video_path)
                fps = cap.get(cv2.CAP_PROP_FPS)
                # Fix: guard against fps == 0, which previously caused a
                # modulo-by-zero below.
                frame_interval_frames = max(1, int(fps * interval))
                frame_number = 0
                while True:
                    ret, frame = cap.read()
                    if not ret:
                        break
                    if frame_number % frame_interval_frames == 0:
                        timestamp = frame_number / fps if fps else 0.0
                        frame_path = os.path.join(
                            video_frames_dir,
                            f"frame_{frame_number:06d}_{timestamp:.2f}.jpg"
                        )
                        cv2.imwrite(frame_path, frame)
                        frame_paths.append(frame_path)
                    frame_number += 1
                cap.release()
            else:
                # Fix: '%t' is not a valid image2 muxer pattern (ffmpeg would
                # fail); use a plain sequence number and let process_video
                # reconstruct timestamps from the sampling interval.
                output_pattern = os.path.join(video_frames_dir, "frame_%06d.jpg")
                cmd = [
                    'ffmpeg', '-i', video_path,
                    '-vf', f'fps=1/{interval}',
                    '-frame_pts', '1',
                    '-y', output_pattern
                ]
                subprocess.run(cmd, check=True, capture_output=True)
                # Collect the generated frame files
                frame_paths = sorted([
                    os.path.join(video_frames_dir, f)
                    for f in os.listdir(video_frames_dir)
                    if f.startswith('frame_')
                ])
        except Exception as e:
            print(f"Error extracting keyframes: {e}")
        return frame_paths

    def perform_ocr(self, image_path: str) -> Tuple[str, float]:
        """
        OCR a single frame image.

        Args:
            image_path: path to the image file

        Returns:
            (recognized text, mean confidence in 0-1); ("", 0.0) when OCR is
            unavailable or fails.
        """
        if not PYTESSERACT_AVAILABLE:
            return "", 0.0
        try:
            image = Image.open(image_path)
            # Preprocess: convert to grayscale
            if image.mode != 'L':
                image = image.convert('L')
            text = pytesseract.image_to_string(image, lang='chi_sim+eng')
            # Mean of positive per-word confidences
            data = pytesseract.image_to_data(image, output_type=pytesseract.Output.DICT)
            confidences = [int(c) for c in data['conf'] if int(c) > 0]
            avg_confidence = sum(confidences) / len(confidences) if confidences else 0
            return text.strip(), avg_confidence / 100.0
        except Exception as e:
            print(f"OCR error for {image_path}: {e}")
            return "", 0.0

    def process_video(self, video_data: bytes, filename: str,
                      project_id: str, video_id: str = None) -> 'VideoProcessingResult':
        """
        Full pipeline for one video: save, probe, extract audio/keyframes, OCR.

        Args:
            video_data: raw video bytes
            filename: original filename (kept in the saved file name)
            project_id: owning project id
            video_id: optional id; an 8-char uuid fragment otherwise

        Returns:
            VideoProcessingResult; success=False with error_message on failure.
        """
        video_id = video_id or str(uuid.uuid4())[:8]
        try:
            # Fix: persist as "<video_id>_<filename>"; the previous code wrote
            # a literal "(unknown)" placeholder instead of the filename.
            video_path = os.path.join(self.video_dir, f"{video_id}_{filename}")
            with open(video_path, 'wb') as f:
                f.write(video_data)
            # Probe metadata
            video_info = self.extract_video_info(video_path)
            # Extract the audio track when present
            audio_path = ""
            if video_info['has_audio']:
                audio_path = self.extract_audio(video_path)
            # Sample keyframes
            frame_paths = self.extract_keyframes(video_path, video_id)
            # OCR each keyframe
            frames = []
            ocr_results = []
            all_ocr_text = []
            for i, frame_path in enumerate(frame_paths):
                # Recover frame number/timestamp from the file name when
                # present; otherwise fall back to the sampling interval.
                frame_name = os.path.basename(frame_path)
                parts = frame_name.replace('.jpg', '').split('_')
                frame_number = int(parts[1]) if len(parts) > 1 else i
                timestamp = float(parts[2]) if len(parts) > 2 else i * self.frame_interval
                ocr_text, confidence = self.perform_ocr(frame_path)
                frame = VideoFrame(
                    id=str(uuid.uuid4())[:8],
                    video_id=video_id,
                    frame_number=frame_number,
                    timestamp=timestamp,
                    frame_path=frame_path,
                    ocr_text=ocr_text,
                    ocr_confidence=confidence
                )
                frames.append(frame)
                if ocr_text:
                    ocr_results.append({
                        'frame_number': frame_number,
                        'timestamp': timestamp,
                        'text': ocr_text,
                        'confidence': confidence
                    })
                    all_ocr_text.append(ocr_text)
            # Aggregate all OCR text
            full_ocr_text = "\n\n".join(all_ocr_text)
            return VideoProcessingResult(
                video_id=video_id,
                audio_path=audio_path,
                frames=frames,
                ocr_results=ocr_results,
                full_text=full_ocr_text,
                success=True
            )
        except Exception as e:
            return VideoProcessingResult(
                video_id=video_id,
                audio_path="",
                frames=[],
                ocr_results=[],
                full_text="",
                success=False,
                error_message=str(e)
            )

    def cleanup(self, video_id: str = None):
        """
        Remove temporary files, for one video or for everything.

        Args:
            video_id: when given, only files containing this id are removed;
                otherwise all working directories are wiped and recreated.
        """
        import shutil
        if video_id:
            # Remove only files belonging to this video
            for dir_path in [self.video_dir, self.frames_dir, self.audio_dir]:
                target_dir = os.path.join(dir_path, video_id) if dir_path == self.frames_dir else dir_path
                if os.path.exists(target_dir):
                    for f in os.listdir(target_dir):
                        if video_id in f:
                            os.remove(os.path.join(target_dir, f))
        else:
            # Wipe and recreate all working directories
            for dir_path in [self.video_dir, self.frames_dir, self.audio_dir]:
                if os.path.exists(dir_path):
                    shutil.rmtree(dir_path)
                os.makedirs(dir_path, exist_ok=True)
# Lazily created module-level singleton
_multimodal_processor = None


def get_multimodal_processor(temp_dir: str = None, frame_interval: int = 5) -> MultimodalProcessor:
    """Return the process-wide MultimodalProcessor, creating it on first use."""
    global _multimodal_processor
    if _multimodal_processor is None:
        _multimodal_processor = MultimodalProcessor(temp_dir, frame_interval)
    return _multimodal_processor

1366
backend/plugin_manager.py Normal file

File diff suppressed because it is too large Load Diff

223
backend/rate_limiter.py Normal file
View File

@@ -0,0 +1,223 @@
#!/usr/bin/env python3
"""
InsightFlow Rate Limiter - Phase 6
API 限流中间件
支持基于内存的滑动窗口限流
"""
import time
import asyncio
from typing import Dict, Optional, Tuple, Callable
from dataclasses import dataclass, field
from collections import defaultdict
from functools import wraps
@dataclass
class RateLimitConfig:
    """Rate-limiting parameters for one key."""
    requests_per_minute: int = 60
    burst_size: int = 10  # allowed burst of requests
    window_size: int = 60  # sliding window length in seconds
@dataclass
class RateLimitInfo:
    """Outcome of a rate-limit check."""
    allowed: bool
    remaining: int
    reset_time: int  # unix timestamp when the window resets
    retry_after: int  # seconds to wait before retrying (0 when allowed)
class SlidingWindowCounter:
    """Per-second request counter over a sliding time window."""

    def __init__(self, window_size: int = 60):
        self.window_size = window_size
        self.requests: Dict[int, int] = defaultdict(int)  # second -> request count
        self._lock = asyncio.Lock()

    async def add_request(self) -> int:
        """Record one request now; return the total count inside the window."""
        async with self._lock:
            current_second = int(time.time())
            self.requests[current_second] += 1
            self._cleanup_old(current_second)
            return sum(self.requests.values())

    async def get_count(self) -> int:
        """Return the number of requests inside the current window."""
        async with self._lock:
            current_second = int(time.time())
            self._cleanup_old(current_second)
            return sum(self.requests.values())

    def _cleanup_old(self, now: int):
        """Drop per-second buckets older than the window (caller holds the lock)."""
        threshold = now - self.window_size
        for stale in [second for second in self.requests if second < threshold]:
            del self.requests[stale]
class RateLimiter:
    """API rate limiter keyed by an arbitrary string (e.g. an API key id)."""

    def __init__(self):
        # key -> SlidingWindowCounter
        self.counters: Dict[str, SlidingWindowCounter] = {}
        # key -> RateLimitConfig
        self.configs: Dict[str, RateLimitConfig] = {}
        self._lock = asyncio.Lock()

    async def is_allowed(
        self,
        key: str,
        config: Optional[RateLimitConfig] = None
    ) -> RateLimitInfo:
        """
        Check whether a request under `key` is allowed, counting it if so.

        Args:
            key: rate-limit key (e.g. an API key id)
            config: limit configuration; defaults to RateLimitConfig() and is
                remembered on first use of the key

        Returns:
            RateLimitInfo
        """
        if config is None:
            config = RateLimitConfig()
        async with self._lock:
            counter = self.counters.get(key)
            if counter is None:
                counter = SlidingWindowCounter(config.window_size)
                self.counters[key] = counter
                self.configs[key] = config
            effective = self.configs.get(key, config)
            # Current usage inside the window
            used = await counter.get_count()
            # Remaining quota before this request
            remaining = max(0, effective.requests_per_minute - used)
            # When the window resets
            reset_at = int(time.time()) + effective.window_size
            # Over the limit: reject without counting
            if used >= effective.requests_per_minute:
                return RateLimitInfo(
                    allowed=False,
                    remaining=0,
                    reset_time=reset_at,
                    retry_after=effective.window_size
                )
            # Allowed: record the request
            await counter.add_request()
            return RateLimitInfo(
                allowed=True,
                remaining=remaining - 1,
                reset_time=reset_at,
                retry_after=0
            )

    async def get_limit_info(self, key: str) -> RateLimitInfo:
        """Return the current limit state for `key` without consuming quota."""
        if key not in self.counters:
            defaults = RateLimitConfig()
            return RateLimitInfo(
                allowed=True,
                remaining=defaults.requests_per_minute,
                reset_time=int(time.time()) + defaults.window_size,
                retry_after=0
            )
        counter = self.counters[key]
        cfg = self.configs.get(key, RateLimitConfig())
        used = await counter.get_count()
        over_limit = used >= cfg.requests_per_minute
        return RateLimitInfo(
            allowed=not over_limit,
            remaining=max(0, cfg.requests_per_minute - used),
            reset_time=int(time.time()) + cfg.window_size,
            retry_after=max(0, cfg.window_size) if over_limit else 0
        )

    def reset(self, key: Optional[str] = None):
        """Clear counters for one key, or for all keys when key is None."""
        if key:
            self.counters.pop(key, None)
            self.configs.pop(key, None)
        else:
            self.counters.clear()
            self.configs.clear()
# Global rate limiter instance, created lazily
_rate_limiter: Optional[RateLimiter] = None


def get_rate_limiter() -> RateLimiter:
    """Return the process-wide RateLimiter, creating it on first use."""
    global _rate_limiter
    if _rate_limiter is None:
        _rate_limiter = RateLimiter()
    return _rate_limiter
# Rate-limiting decorator (function-level limiting)
def rate_limit(
    requests_per_minute: int = 60,
    key_func: Optional[Callable] = None
):
    """
    Decorator that applies per-key rate limiting to a sync or async function.

    Args:
        requests_per_minute: allowed requests per minute
        key_func: optional callable mapping the wrapped function's arguments
            to a limit key; defaults to the function name

    Raises:
        RateLimitExceeded: (from the wrapper) when the limit is exceeded.
    """
    def decorator(func):
        limiter = get_rate_limiter()
        config = RateLimitConfig(requests_per_minute=requests_per_minute)

        def _resolve_key(args, kwargs):
            # Derive the limit key from the call, or fall back to the name.
            return key_func(*args, **kwargs) if key_func else func.__name__

        @wraps(func)
        async def async_wrapper(*args, **kwargs):
            info = await limiter.is_allowed(_resolve_key(args, kwargs), config)
            if not info.allowed:
                raise RateLimitExceeded(
                    f"Rate limit exceeded. Try again in {info.retry_after} seconds."
                )
            return await func(*args, **kwargs)

        @wraps(func)
        def sync_wrapper(*args, **kwargs):
            # Sync path runs the async check on a private event loop.
            info = asyncio.run(limiter.is_allowed(_resolve_key(args, kwargs), config))
            if not info.allowed:
                raise RateLimitExceeded(
                    f"Rate limit exceeded. Try again in {info.retry_after} seconds."
                )
            return func(*args, **kwargs)

        return async_wrapper if asyncio.iscoroutinefunction(func) else sync_wrapper
    return decorator
class RateLimitExceeded(Exception):
    """Raised when a caller exceeds its configured request rate."""

View File

@@ -30,3 +30,26 @@ cairosvg==2.7.1
# Neo4j Graph Database
neo4j==5.15.0
# API Documentation (Swagger/OpenAPI)
fastapi-offline-swagger==0.1.0
# Phase 7: Workflow Automation
apscheduler==3.10.4
# Phase 7: Multimodal Support
ffmpeg-python==0.2.0
pillow==10.2.0
opencv-python==4.9.0.80
pytesseract==0.3.10
# Phase 7 Task 7: Plugin & Integration
webdav4==0.9.8
urllib3==2.2.0
# Phase 7: Plugin & Integration
beautifulsoup4==4.12.3
webdavclient3==3.14.6
# Phase 7 Task 3: Security & Compliance
cryptography==42.0.0

View File

@@ -80,7 +80,7 @@ CREATE TABLE IF NOT EXISTS attribute_templates (
id TEXT PRIMARY KEY,
project_id TEXT NOT NULL,
name TEXT NOT NULL,
type TEXT NOT NULL, -- text/number/date/select/multiselect
type TEXT NOT NULL, -- text/number/date/select/multiselect/boolean
description TEXT,
options TEXT, -- JSON 数组,用于 select/multiselect 类型
is_required INTEGER DEFAULT 0,
@@ -111,54 +111,13 @@ CREATE TABLE IF NOT EXISTS entity_attributes (
CREATE TABLE IF NOT EXISTS attribute_history (
id TEXT PRIMARY KEY,
entity_id TEXT NOT NULL,
template_id TEXT,
attribute_name TEXT NOT NULL,
old_value TEXT,
new_value TEXT,
changed_by TEXT, -- 用户ID或系统
changed_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
change_reason TEXT,
FOREIGN KEY (entity_id) REFERENCES entities(id) ON DELETE CASCADE
);
-- Phase 5: 属性模板表(项目级自定义属性定义)
CREATE TABLE IF NOT EXISTS attribute_templates (
id TEXT PRIMARY KEY,
project_id TEXT NOT NULL,
name TEXT NOT NULL, -- 属性名称,如"年龄"、"职位"
type TEXT NOT NULL, -- 属性类型: text, number, date, select, multiselect, boolean
options TEXT, -- JSON 数组,用于 select/multiselect 类型
default_value TEXT, -- 默认值
description TEXT, -- 属性描述
is_required BOOLEAN DEFAULT 0, -- 是否必填
display_order INTEGER DEFAULT 0, -- 显示顺序
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (project_id) REFERENCES projects(id)
);
-- Phase 5: 实体属性值表
CREATE TABLE IF NOT EXISTS entity_attributes (
id TEXT PRIMARY KEY,
entity_id TEXT NOT NULL,
template_id TEXT NOT NULL,
value TEXT, -- 属性值以JSON或字符串形式存储
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (entity_id) REFERENCES entities(id) ON DELETE CASCADE,
FOREIGN KEY (template_id) REFERENCES attribute_templates(id) ON DELETE CASCADE,
UNIQUE(entity_id, template_id) -- 每个实体每个属性只能有一个值
);
-- Phase 5: 属性变更历史表
CREATE TABLE IF NOT EXISTS attribute_history (
id TEXT PRIMARY KEY,
entity_id TEXT NOT NULL,
template_id TEXT NOT NULL,
old_value TEXT,
new_value TEXT,
changed_by TEXT, -- 用户ID或"system"
changed_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
change_reason TEXT, -- 变更原因
FOREIGN KEY (entity_id) REFERENCES entities(id) ON DELETE CASCADE,
FOREIGN KEY (template_id) REFERENCES attribute_templates(id) ON DELETE CASCADE
);
@@ -178,90 +137,499 @@ CREATE INDEX IF NOT EXISTS idx_entity_attributes_entity ON entity_attributes(ent
CREATE INDEX IF NOT EXISTS idx_entity_attributes_template ON entity_attributes(template_id);
CREATE INDEX IF NOT EXISTS idx_attr_history_entity ON attribute_history(entity_id);
-- Phase 7: 协作与共享 - 项目分享
CREATE TABLE IF NOT EXISTS project_shares (
id TEXT PRIMARY KEY,
project_id TEXT NOT NULL,
token TEXT NOT NULL UNIQUE, -- 分享令牌
permission TEXT DEFAULT 'read_only', -- 权限级别: read_only, comment, edit, admin
created_by TEXT NOT NULL, -- 创建者
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
expires_at TIMESTAMP, -- 过期时间
max_uses INTEGER, -- 最大使用次数
use_count INTEGER DEFAULT 0, -- 已使用次数
password_hash TEXT, -- 密码保护(哈希)
is_active BOOLEAN DEFAULT 1, -- 是否激活
allow_download BOOLEAN DEFAULT 0, -- 允许下载
allow_export BOOLEAN DEFAULT 0, -- 允许导出
FOREIGN KEY (project_id) REFERENCES projects(id) ON DELETE CASCADE
);
-- Phase 7: 工作流相关
-- Phase 7: 协作与共享 - 评论
CREATE TABLE IF NOT EXISTS comments (
-- 工作流配置
CREATE TABLE IF NOT EXISTS workflows (
id TEXT PRIMARY KEY,
name TEXT NOT NULL,
description TEXT,
workflow_type TEXT NOT NULL, -- auto_analyze, auto_align, auto_relation, scheduled_report, custom
project_id TEXT NOT NULL,
target_type TEXT NOT NULL, -- 目标类型: entity, relation, transcript, project
target_id TEXT NOT NULL, -- 目标ID
parent_id TEXT, -- 父评论ID(支持回复)
author TEXT NOT NULL, -- 作者ID
author_name TEXT, -- 作者显示名
content TEXT NOT NULL, -- 评论内容
status TEXT DEFAULT 'active', -- active, paused, error, completed
schedule TEXT, -- cron expression or interval minutes
schedule_type TEXT DEFAULT 'manual', -- manual, cron, interval
config TEXT, -- JSON: workflow specific configuration
webhook_ids TEXT, -- JSON array of webhook config IDs
is_active BOOLEAN DEFAULT 1,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
resolved BOOLEAN DEFAULT 0, -- 是否已解决
resolved_by TEXT, -- 解决者
resolved_at TIMESTAMP, -- 解决时间
mentions TEXT, -- JSON数组: 提及的用户
attachments TEXT, -- JSON数组: 附件
FOREIGN KEY (project_id) REFERENCES projects(id) ON DELETE CASCADE,
FOREIGN KEY (parent_id) REFERENCES comments(id) ON DELETE CASCADE
last_run_at TIMESTAMP,
next_run_at TIMESTAMP,
run_count INTEGER DEFAULT 0,
success_count INTEGER DEFAULT 0,
fail_count INTEGER DEFAULT 0,
FOREIGN KEY (project_id) REFERENCES projects(id)
);
-- Phase 7: 协作与共享 - 变更历史
CREATE TABLE IF NOT EXISTS change_history (
-- 工作流任务
CREATE TABLE IF NOT EXISTS workflow_tasks (
id TEXT PRIMARY KEY,
workflow_id TEXT NOT NULL,
name TEXT NOT NULL,
task_type TEXT NOT NULL, -- analyze, align, discover_relations, notify, custom
config TEXT, -- JSON: task specific configuration
task_order INTEGER DEFAULT 0,
depends_on TEXT, -- JSON array of task IDs
timeout_seconds INTEGER DEFAULT 300,
retry_count INTEGER DEFAULT 3,
retry_delay INTEGER DEFAULT 5,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (workflow_id) REFERENCES workflows(id) ON DELETE CASCADE
);
-- Webhook 配置表
CREATE TABLE IF NOT EXISTS webhook_configs (
id TEXT PRIMARY KEY,
name TEXT NOT NULL,
webhook_type TEXT NOT NULL, -- feishu, dingtalk, slack, custom
url TEXT NOT NULL,
secret TEXT, -- for signature verification
headers TEXT, -- JSON: custom headers
template TEXT, -- message template
is_active BOOLEAN DEFAULT 1,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
last_used_at TIMESTAMP,
success_count INTEGER DEFAULT 0,
fail_count INTEGER DEFAULT 0
);
-- 工作流执行日志表
CREATE TABLE IF NOT EXISTS workflow_logs (
id TEXT PRIMARY KEY,
workflow_id TEXT NOT NULL,
task_id TEXT, -- NULL if workflow-level log
status TEXT DEFAULT 'pending', -- pending, running, success, failed, cancelled
start_time TIMESTAMP,
end_time TIMESTAMP,
duration_ms INTEGER,
input_data TEXT, -- JSON: input parameters
output_data TEXT, -- JSON: execution results
error_message TEXT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (workflow_id) REFERENCES workflows(id) ON DELETE CASCADE,
FOREIGN KEY (task_id) REFERENCES workflow_tasks(id) ON DELETE SET NULL
);
-- Phase 7: 工作流相关索引
CREATE INDEX IF NOT EXISTS idx_workflows_project ON workflows(project_id);
CREATE INDEX IF NOT EXISTS idx_workflows_status ON workflows(status);
CREATE INDEX IF NOT EXISTS idx_workflows_type ON workflows(workflow_type);
CREATE INDEX IF NOT EXISTS idx_workflow_tasks_workflow ON workflow_tasks(workflow_id);
CREATE INDEX IF NOT EXISTS idx_workflow_logs_workflow ON workflow_logs(workflow_id);
CREATE INDEX IF NOT EXISTS idx_workflow_logs_task ON workflow_logs(task_id);
CREATE INDEX IF NOT EXISTS idx_workflow_logs_status ON workflow_logs(status);
CREATE INDEX IF NOT EXISTS idx_workflow_logs_created ON workflow_logs(created_at);
-- Phase 7: 多模态支持相关表
-- 视频表
CREATE TABLE IF NOT EXISTS videos (
id TEXT PRIMARY KEY,
project_id TEXT NOT NULL,
change_type TEXT NOT NULL, -- 变更类型: create, update, delete, merge, split
entity_type TEXT NOT NULL, -- 实体类型: entity, relation, transcript, project
entity_id TEXT NOT NULL, -- 实体ID
entity_name TEXT, -- 实体名称(用于显示)
changed_by TEXT NOT NULL, -- 变更者ID
changed_by_name TEXT, -- 变更者显示名
changed_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
old_value TEXT, -- JSON: 旧值
new_value TEXT, -- JSON: 新值
description TEXT, -- 变更描述
session_id TEXT, -- 会话ID(批量变更关联)
reverted BOOLEAN DEFAULT 0, -- 是否已回滚
reverted_at TIMESTAMP, -- 回滚时间
reverted_by TEXT, -- 回滚者
FOREIGN KEY (project_id) REFERENCES projects(id) ON DELETE CASCADE
filename TEXT NOT NULL,
duration REAL, -- 视频时长(秒)
fps REAL, -- 帧率
resolution TEXT, -- JSON: {"width": int, "height": int}
audio_transcript_id TEXT, -- 关联的音频转录ID
full_ocr_text TEXT, -- 所有帧OCR文本合并
extracted_entities TEXT, -- JSON: 提取的实体列表
extracted_relations TEXT, -- JSON: 提取的关系列表
status TEXT DEFAULT 'processing', -- processing, completed, failed
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (project_id) REFERENCES projects(id),
FOREIGN KEY (audio_transcript_id) REFERENCES transcripts(id)
);
-- Phase 7: 协作与共享 - 团队成员
CREATE TABLE IF NOT EXISTS team_members (
-- 视频关键帧
CREATE TABLE IF NOT EXISTS video_frames (
id TEXT PRIMARY KEY,
video_id TEXT NOT NULL,
frame_number INTEGER,
timestamp REAL, -- 时间戳(秒)
image_data BLOB, -- 帧图片数据可选可存储在OSS
image_url TEXT, -- 图片URL如果存储在OSS
ocr_text TEXT, -- OCR识别文本
extracted_entities TEXT, -- JSON: 该帧提取的实体
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (video_id) REFERENCES videos(id) ON DELETE CASCADE
);
-- 图片表
CREATE TABLE IF NOT EXISTS images (
id TEXT PRIMARY KEY,
project_id TEXT NOT NULL,
user_id TEXT NOT NULL, -- 用户ID
user_name TEXT, -- 用户名
user_email TEXT, -- 用户邮箱
role TEXT DEFAULT 'viewer', -- 角色: owner, admin, editor, viewer, commenter
joined_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
invited_by TEXT, -- 邀请者
last_active_at TIMESTAMP, -- 最后活跃时间
permissions TEXT, -- JSON数组: 具体权限列表
FOREIGN KEY (project_id) REFERENCES projects(id) ON DELETE CASCADE,
UNIQUE(project_id, user_id) -- 每个项目每个用户只能有一条记录
filename TEXT NOT NULL,
image_data BLOB, -- 图片数据(可选)
image_url TEXT, -- 图片URL
ocr_text TEXT, -- OCR识别文本
description TEXT, -- 图片描述LLM生成
extracted_entities TEXT, -- JSON: 提取的实体列表
extracted_relations TEXT, -- JSON: 提取的关系列表
status TEXT DEFAULT 'processing',
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (project_id) REFERENCES projects(id)
);
-- Phase 7: 协作与共享索引
CREATE INDEX IF NOT EXISTS idx_shares_project ON project_shares(project_id);
CREATE INDEX IF NOT EXISTS idx_shares_token ON project_shares(token);
CREATE INDEX IF NOT EXISTS idx_comments_project ON comments(project_id);
CREATE INDEX IF NOT EXISTS idx_comments_target ON comments(target_type, target_id);
CREATE INDEX IF NOT EXISTS idx_comments_parent ON comments(parent_id);
CREATE INDEX IF NOT EXISTS idx_change_history_project ON change_history(project_id);
CREATE INDEX IF NOT EXISTS idx_change_history_entity ON change_history(entity_type, entity_id);
CREATE INDEX IF NOT EXISTS idx_change_history_session ON change_history(session_id);
CREATE INDEX IF NOT EXISTS idx_team_members_project ON team_members(project_id);
CREATE INDEX IF NOT EXISTS idx_team_members_user ON team_members(user_id);
-- 多模态实体提及表
CREATE TABLE IF NOT EXISTS multimodal_mentions (
id TEXT PRIMARY KEY,
project_id TEXT NOT NULL,
entity_id TEXT NOT NULL,
modality TEXT NOT NULL, -- audio, video, image, document
source_id TEXT NOT NULL, -- transcript_id, video_id, image_id
source_type TEXT NOT NULL, -- 来源类型
position TEXT, -- JSON: 位置信息
text_snippet TEXT, -- 提及的文本片段
confidence REAL DEFAULT 1.0,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (project_id) REFERENCES projects(id),
FOREIGN KEY (entity_id) REFERENCES entities(id) ON DELETE CASCADE
);
-- 多模态实体关联表
CREATE TABLE IF NOT EXISTS multimodal_entity_links (
id TEXT PRIMARY KEY,
entity_id TEXT NOT NULL,
linked_entity_id TEXT NOT NULL, -- 关联的实体ID
link_type TEXT NOT NULL, -- same_as, related_to, part_of
confidence REAL DEFAULT 1.0,
evidence TEXT, -- 关联证据
modalities TEXT, -- JSON: 涉及的模态列表
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (entity_id) REFERENCES entities(id) ON DELETE CASCADE,
FOREIGN KEY (linked_entity_id) REFERENCES entities(id) ON DELETE CASCADE
);
-- 多模态相关索引
CREATE INDEX IF NOT EXISTS idx_videos_project ON videos(project_id);
CREATE INDEX IF NOT EXISTS idx_videos_status ON videos(status);
CREATE INDEX IF NOT EXISTS idx_video_frames_video ON video_frames(video_id);
CREATE INDEX IF NOT EXISTS idx_images_project ON images(project_id);
CREATE INDEX IF NOT EXISTS idx_images_status ON images(status);
CREATE INDEX IF NOT EXISTS idx_multimodal_mentions_project ON multimodal_mentions(project_id);
CREATE INDEX IF NOT EXISTS idx_multimodal_mentions_entity ON multimodal_mentions(entity_id);
CREATE INDEX IF NOT EXISTS idx_multimodal_mentions_modality ON multimodal_mentions(modality);
CREATE INDEX IF NOT EXISTS idx_multimodal_mentions_source ON multimodal_mentions(source_id);
CREATE INDEX IF NOT EXISTS idx_multimodal_links_entity ON multimodal_entity_links(entity_id);
CREATE INDEX IF NOT EXISTS idx_multimodal_links_linked ON multimodal_entity_links(linked_entity_id);
-- Phase 7 Task 7: 插件与集成相关表
-- 插件配置表
CREATE TABLE IF NOT EXISTS plugins (
id TEXT PRIMARY KEY,
name TEXT NOT NULL,
plugin_type TEXT NOT NULL, -- chrome_extension, feishu_bot, dingtalk_bot, zapier, make, webdav, custom
project_id TEXT,
status TEXT DEFAULT 'active', -- active, inactive, error, pending
config TEXT, -- JSON: plugin specific configuration
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
last_used_at TIMESTAMP,
use_count INTEGER DEFAULT 0,
FOREIGN KEY (project_id) REFERENCES projects(id)
);
-- 插件详细配置表
CREATE TABLE IF NOT EXISTS plugin_configs (
id TEXT PRIMARY KEY,
plugin_id TEXT NOT NULL,
config_key TEXT NOT NULL,
config_value TEXT,
is_encrypted BOOLEAN DEFAULT 0,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (plugin_id) REFERENCES plugins(id) ON DELETE CASCADE,
UNIQUE(plugin_id, config_key)
);
-- 机器人会话表
CREATE TABLE IF NOT EXISTS bot_sessions (
id TEXT PRIMARY KEY,
bot_type TEXT NOT NULL, -- feishu, dingtalk
session_id TEXT NOT NULL, -- 群ID或会话ID
session_name TEXT NOT NULL,
project_id TEXT,
webhook_url TEXT,
secret TEXT, -- 签名密钥
is_active BOOLEAN DEFAULT 1,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
last_message_at TIMESTAMP,
message_count INTEGER DEFAULT 0,
FOREIGN KEY (project_id) REFERENCES projects(id)
);
-- Webhook 端点表Zapier/Make集成
CREATE TABLE IF NOT EXISTS webhook_endpoints (
id TEXT PRIMARY KEY,
name TEXT NOT NULL,
endpoint_type TEXT NOT NULL, -- zapier, make, custom
endpoint_url TEXT NOT NULL,
project_id TEXT,
auth_type TEXT DEFAULT 'none', -- none, api_key, oauth, custom
auth_config TEXT, -- JSON: authentication configuration
trigger_events TEXT, -- JSON array: events that trigger this webhook
is_active BOOLEAN DEFAULT 1,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
last_triggered_at TIMESTAMP,
trigger_count INTEGER DEFAULT 0,
FOREIGN KEY (project_id) REFERENCES projects(id)
);
-- WebDAV 同步配置表
CREATE TABLE IF NOT EXISTS webdav_syncs (
id TEXT PRIMARY KEY,
name TEXT NOT NULL,
project_id TEXT NOT NULL,
server_url TEXT NOT NULL,
username TEXT NOT NULL,
password TEXT NOT NULL, -- 建议加密存储
remote_path TEXT DEFAULT '/insightflow',
sync_mode TEXT DEFAULT 'bidirectional', -- bidirectional, upload_only, download_only
sync_interval INTEGER DEFAULT 3600, -- 秒
last_sync_at TIMESTAMP,
last_sync_status TEXT DEFAULT 'pending', -- pending, success, failed
last_sync_error TEXT,
is_active BOOLEAN DEFAULT 1,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
sync_count INTEGER DEFAULT 0,
FOREIGN KEY (project_id) REFERENCES projects(id)
);
-- Chrome 扩展令牌表
CREATE TABLE IF NOT EXISTS chrome_extension_tokens (
id TEXT PRIMARY KEY,
token_hash TEXT NOT NULL UNIQUE, -- SHA256 hash of the token
user_id TEXT,
project_id TEXT,
name TEXT,
permissions TEXT, -- JSON array: read, write, delete
expires_at TIMESTAMP,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
last_used_at TIMESTAMP,
use_count INTEGER DEFAULT 0,
is_revoked BOOLEAN DEFAULT 0,
FOREIGN KEY (project_id) REFERENCES projects(id)
);
-- 插件相关索引
CREATE INDEX IF NOT EXISTS idx_plugins_project ON plugins(project_id);
CREATE INDEX IF NOT EXISTS idx_plugins_type ON plugins(plugin_type);
CREATE INDEX IF NOT EXISTS idx_plugins_status ON plugins(status);
CREATE INDEX IF NOT EXISTS idx_plugin_configs_plugin ON plugin_configs(plugin_id);
CREATE INDEX IF NOT EXISTS idx_bot_sessions_project ON bot_sessions(project_id);
CREATE INDEX IF NOT EXISTS idx_bot_sessions_type ON bot_sessions(bot_type);
CREATE INDEX IF NOT EXISTS idx_webhook_endpoints_project ON webhook_endpoints(project_id);
CREATE INDEX IF NOT EXISTS idx_webhook_endpoints_type ON webhook_endpoints(endpoint_type);
CREATE INDEX IF NOT EXISTS idx_webdav_syncs_project ON webdav_syncs(project_id);
CREATE INDEX IF NOT EXISTS idx_chrome_tokens_project ON chrome_extension_tokens(project_id);
CREATE INDEX IF NOT EXISTS idx_chrome_tokens_hash ON chrome_extension_tokens(token_hash);
-- Phase 7: 插件与集成相关表
-- 插件表
CREATE TABLE IF NOT EXISTS plugins (
id TEXT PRIMARY KEY,
name TEXT NOT NULL,
plugin_type TEXT NOT NULL, -- chrome_extension, feishu_bot, dingtalk_bot, slack_bot, webhook, webdav, custom
project_id TEXT,
status TEXT DEFAULT 'active', -- active, inactive, error, pending
config TEXT, -- JSON: 插件配置
api_key TEXT UNIQUE, -- 用于认证的 API Key
api_secret TEXT, -- 用于签名验证的 Secret
webhook_url TEXT, -- 机器人 Webhook URL
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
last_used_at TIMESTAMP,
use_count INTEGER DEFAULT 0,
success_count INTEGER DEFAULT 0,
fail_count INTEGER DEFAULT 0,
FOREIGN KEY (project_id) REFERENCES projects(id)
);
-- 机器人会话表
CREATE TABLE IF NOT EXISTS bot_sessions (
id TEXT PRIMARY KEY,
plugin_id TEXT NOT NULL,
platform TEXT NOT NULL, -- feishu, dingtalk, slack, wechat
session_id TEXT NOT NULL, -- 平台特定的会话ID
user_id TEXT,
user_name TEXT,
project_id TEXT, -- 关联的项目ID
context TEXT, -- JSON: 会话上下文
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
last_message_at TIMESTAMP,
message_count INTEGER DEFAULT 0,
FOREIGN KEY (plugin_id) REFERENCES plugins(id) ON DELETE CASCADE,
FOREIGN KEY (project_id) REFERENCES projects(id),
UNIQUE(plugin_id, session_id)
);
-- Webhook 端点表(用于 Zapier/Make 集成)
CREATE TABLE IF NOT EXISTS webhook_endpoints (
id TEXT PRIMARY KEY,
plugin_id TEXT NOT NULL,
name TEXT NOT NULL,
endpoint_path TEXT NOT NULL UNIQUE, -- 如 /webhook/zapier/abc123
endpoint_type TEXT NOT NULL, -- zapier, make, custom
secret TEXT, -- 用于签名验证
allowed_events TEXT, -- JSON: 允许的事件列表
target_project_id TEXT, -- 数据导入的目标项目
is_active BOOLEAN DEFAULT 1,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
last_triggered_at TIMESTAMP,
trigger_count INTEGER DEFAULT 0,
FOREIGN KEY (plugin_id) REFERENCES plugins(id) ON DELETE CASCADE,
FOREIGN KEY (target_project_id) REFERENCES projects(id)
);
-- WebDAV 同步配置表
CREATE TABLE IF NOT EXISTS webdav_syncs (
id TEXT PRIMARY KEY,
plugin_id TEXT NOT NULL,
name TEXT NOT NULL,
server_url TEXT NOT NULL,
username TEXT NOT NULL,
password TEXT NOT NULL, -- 建议加密存储
remote_path TEXT DEFAULT '/',
local_path TEXT DEFAULT './sync',
sync_direction TEXT DEFAULT 'bidirectional', -- upload, download, bidirectional
sync_mode TEXT DEFAULT 'manual', -- manual, realtime, scheduled
sync_schedule TEXT, -- cron expression
file_patterns TEXT, -- JSON: 文件匹配模式列表
auto_analyze BOOLEAN DEFAULT 1, -- 同步后自动分析
last_sync_at TIMESTAMP,
last_sync_status TEXT,
is_active BOOLEAN DEFAULT 1,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
sync_count INTEGER DEFAULT 0,
FOREIGN KEY (plugin_id) REFERENCES plugins(id) ON DELETE CASCADE
);
-- 插件活动日志表
CREATE TABLE IF NOT EXISTS plugin_activity_logs (
id TEXT PRIMARY KEY,
plugin_id TEXT NOT NULL,
activity_type TEXT NOT NULL, -- message, webhook, sync, error
source TEXT NOT NULL, -- 来源标识
details TEXT, -- JSON: 详细信息
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (plugin_id) REFERENCES plugins(id) ON DELETE CASCADE
);
-- 插件相关索引
CREATE INDEX IF NOT EXISTS idx_plugins_project ON plugins(project_id);
CREATE INDEX IF NOT EXISTS idx_plugins_type ON plugins(plugin_type);
CREATE INDEX IF NOT EXISTS idx_plugins_api_key ON plugins(api_key);
CREATE INDEX IF NOT EXISTS idx_bot_sessions_plugin ON bot_sessions(plugin_id);
CREATE INDEX IF NOT EXISTS idx_bot_sessions_project ON bot_sessions(project_id);
CREATE INDEX IF NOT EXISTS idx_webhook_endpoints_plugin ON webhook_endpoints(plugin_id);
CREATE INDEX IF NOT EXISTS idx_webdav_syncs_plugin ON webdav_syncs(plugin_id);
CREATE INDEX IF NOT EXISTS idx_plugin_logs_plugin ON plugin_activity_logs(plugin_id);
CREATE INDEX IF NOT EXISTS idx_plugin_logs_type ON plugin_activity_logs(activity_type);
CREATE INDEX IF NOT EXISTS idx_plugin_logs_created ON plugin_activity_logs(created_at);
-- ============================================
-- Phase 7 Task 3: data security & compliance
-- ============================================
-- Audit log table: one row per recorded user/system action.
CREATE TABLE IF NOT EXISTS audit_logs (
    id TEXT PRIMARY KEY,
    action_type TEXT NOT NULL,          -- create, read, update, delete, login, export, etc.
    user_id TEXT,
    user_ip TEXT,
    user_agent TEXT,
    resource_type TEXT,                 -- project, entity, transcript, api_key, etc.
    resource_id TEXT,
    action_details TEXT,                -- JSON: detailed action information
    before_value TEXT,                  -- value before the change
    after_value TEXT,                   -- value after the change
    success INTEGER DEFAULT 1,          -- 0 = failure, 1 = success
    error_message TEXT,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
-- Encryption configuration table (one configuration per project).
CREATE TABLE IF NOT EXISTS encryption_configs (
    id TEXT PRIMARY KEY,
    project_id TEXT NOT NULL,
    is_enabled INTEGER DEFAULT 0,
    encryption_type TEXT DEFAULT 'aes-256-gcm',  -- aes-256-gcm, chacha20-poly1305
    key_derivation TEXT DEFAULT 'pbkdf2',        -- pbkdf2, argon2
    master_key_hash TEXT,               -- hash of the master key (used for verification only)
    salt TEXT,                          -- key-derivation salt
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    FOREIGN KEY (project_id) REFERENCES projects(id)
);
-- Data-masking rule table: regex-based redaction rules applied per project.
CREATE TABLE IF NOT EXISTS masking_rules (
    id TEXT PRIMARY KEY,
    project_id TEXT NOT NULL,
    name TEXT NOT NULL,
    rule_type TEXT NOT NULL,            -- phone, email, id_card, bank_card, name, address, custom
    pattern TEXT NOT NULL,              -- regular expression to match
    replacement TEXT NOT NULL,          -- replacement template
    is_active INTEGER DEFAULT 1,
    priority INTEGER DEFAULT 0,
    description TEXT,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    FOREIGN KEY (project_id) REFERENCES projects(id)
);
-- Data access policy table: who may access project data, from where, and when.
CREATE TABLE IF NOT EXISTS data_access_policies (
    id TEXT PRIMARY KEY,
    project_id TEXT NOT NULL,
    name TEXT NOT NULL,
    description TEXT,
    allowed_users TEXT,                 -- JSON array: user IDs allowed to access
    allowed_roles TEXT,                 -- JSON array: allowed roles
    allowed_ips TEXT,                   -- JSON array: allowed IP patterns
    time_restrictions TEXT,             -- JSON: {"start_time": "09:00", "end_time": "18:00", "days_of_week": [0,1,2,3,4]}
    max_access_count INTEGER,           -- maximum number of accesses allowed
    require_approval INTEGER DEFAULT 0, -- whether access requires approval
    is_active INTEGER DEFAULT 1,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    FOREIGN KEY (project_id) REFERENCES projects(id)
);
-- Access request table (for accesses that require approval).
CREATE TABLE IF NOT EXISTS access_requests (
    id TEXT PRIMARY KEY,
    policy_id TEXT NOT NULL,
    user_id TEXT NOT NULL,
    request_reason TEXT,
    status TEXT DEFAULT 'pending',      -- pending, approved, rejected, expired
    approved_by TEXT,
    approved_at TIMESTAMP,
    expires_at TIMESTAMP,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    FOREIGN KEY (policy_id) REFERENCES data_access_policies(id)
);
-- Data-security indexes.
CREATE INDEX IF NOT EXISTS idx_audit_logs_user ON audit_logs(user_id);
CREATE INDEX IF NOT EXISTS idx_audit_logs_resource ON audit_logs(resource_type, resource_id);
CREATE INDEX IF NOT EXISTS idx_audit_logs_action ON audit_logs(action_type);
CREATE INDEX IF NOT EXISTS idx_audit_logs_created ON audit_logs(created_at);
CREATE INDEX IF NOT EXISTS idx_encryption_project ON encryption_configs(project_id);
CREATE INDEX IF NOT EXISTS idx_masking_project ON masking_rules(project_id);
CREATE INDEX IF NOT EXISTS idx_access_policy_project ON data_access_policies(project_id);
CREATE INDEX IF NOT EXISTS idx_access_requests_policy ON access_requests(policy_id);
CREATE INDEX IF NOT EXISTS idx_access_requests_user ON access_requests(user_id);

View File

@@ -0,0 +1,104 @@
-- Phase 7: multimodal support tables
-- Videos table: one row per ingested video file.
CREATE TABLE IF NOT EXISTS videos (
    id TEXT PRIMARY KEY,
    project_id TEXT NOT NULL,
    filename TEXT NOT NULL,
    file_path TEXT,
    duration REAL,                      -- video duration in seconds
    width INTEGER,                      -- video width in pixels
    height INTEGER,                     -- video height in pixels
    fps REAL,                           -- frame rate
    audio_extracted INTEGER DEFAULT 0,  -- 1 once the audio track has been extracted
    audio_path TEXT,                    -- path of the extracted audio file
    transcript_id TEXT,                 -- associated transcript record
    status TEXT DEFAULT 'pending',      -- pending, processing, completed, failed
    error_message TEXT,
    metadata TEXT,                      -- JSON: additional metadata
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    FOREIGN KEY (project_id) REFERENCES projects(id),
    FOREIGN KEY (transcript_id) REFERENCES transcripts(id)
);
-- Video key-frame table: frames sampled from a video for OCR/entity detection.
CREATE TABLE IF NOT EXISTS video_frames (
    id TEXT PRIMARY KEY,
    video_id TEXT NOT NULL,
    frame_number INTEGER NOT NULL,
    timestamp REAL NOT NULL,            -- frame timestamp in seconds
    frame_path TEXT NOT NULL,           -- path of the extracted frame image
    ocr_text TEXT,                      -- text recognized by OCR
    ocr_confidence REAL,                -- OCR confidence score
    entities_detected TEXT,             -- JSON: entities detected in this frame
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    FOREIGN KEY (video_id) REFERENCES videos(id) ON DELETE CASCADE
);
-- Images table: one row per ingested image.
CREATE TABLE IF NOT EXISTS images (
    id TEXT PRIMARY KEY,
    project_id TEXT NOT NULL,
    filename TEXT NOT NULL,
    file_path TEXT,
    image_type TEXT,                    -- whiteboard, ppt, handwritten, screenshot, other
    width INTEGER,
    height INTEGER,
    ocr_text TEXT,                      -- text recognized by OCR
    description TEXT,                   -- image description (LLM generated)
    entities_detected TEXT,             -- JSON: detected entities
    relations_detected TEXT,            -- JSON: detected relations
    transcript_id TEXT,                 -- associated transcript record (optional)
    status TEXT DEFAULT 'pending',      -- pending, processing, completed, failed
    error_message TEXT,
    metadata TEXT,                      -- JSON: additional metadata
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    FOREIGN KEY (project_id) REFERENCES projects(id),
    FOREIGN KEY (transcript_id) REFERENCES transcripts(id)
);
-- Multimodal entity-mention table: where each entity appears in each modality.
CREATE TABLE IF NOT EXISTS multimodal_entities (
    id TEXT PRIMARY KEY,
    project_id TEXT NOT NULL,
    entity_id TEXT NOT NULL,            -- referenced entity
    source_type TEXT NOT NULL,          -- audio, video, image, document
    source_id TEXT NOT NULL,            -- source ID (transcript_id, video_id, image_id)
    mention_context TEXT,               -- context in which the entity is mentioned
    confidence REAL DEFAULT 1.0,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    FOREIGN KEY (project_id) REFERENCES projects(id),
    FOREIGN KEY (entity_id) REFERENCES entities(id),
    UNIQUE(entity_id, source_type, source_id)  -- one mention record per entity/source pair
);
-- Multimodal entity-alignment table (cross-modality entity links).
CREATE TABLE IF NOT EXISTS multimodal_entity_links (
    id TEXT PRIMARY KEY,
    project_id TEXT NOT NULL,
    source_entity_id TEXT NOT NULL,     -- source entity ID
    target_entity_id TEXT NOT NULL,     -- target entity ID
    link_type TEXT NOT NULL,            -- same_as, related_to, part_of
    source_modality TEXT NOT NULL,      -- audio, video, image, document
    target_modality TEXT NOT NULL,      -- audio, video, image, document
    confidence REAL DEFAULT 1.0,
    evidence TEXT,                      -- evidence supporting the link
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    FOREIGN KEY (project_id) REFERENCES projects(id),
    FOREIGN KEY (source_entity_id) REFERENCES entities(id),
    FOREIGN KEY (target_entity_id) REFERENCES entities(id)
);
-- Indexes for multimodal tables.
CREATE INDEX IF NOT EXISTS idx_videos_project ON videos(project_id);
CREATE INDEX IF NOT EXISTS idx_videos_status ON videos(status);
CREATE INDEX IF NOT EXISTS idx_video_frames_video ON video_frames(video_id);
CREATE INDEX IF NOT EXISTS idx_video_frames_timestamp ON video_frames(timestamp);
CREATE INDEX IF NOT EXISTS idx_images_project ON images(project_id);
CREATE INDEX IF NOT EXISTS idx_images_type ON images(image_type);
CREATE INDEX IF NOT EXISTS idx_images_status ON images(status);
CREATE INDEX IF NOT EXISTS idx_multimodal_entities_project ON multimodal_entities(project_id);
CREATE INDEX IF NOT EXISTS idx_multimodal_entities_entity ON multimodal_entities(entity_id);
CREATE INDEX IF NOT EXISTS idx_multimodal_entity_links_project ON multimodal_entity_links(project_id);

1232
backend/security_manager.py Normal file

File diff suppressed because it is too large Load Diff

157
backend/test_multimodal.py Normal file
View File

@@ -0,0 +1,157 @@
#!/usr/bin/env python3
"""
InsightFlow Multimodal Module Test Script

Smoke-tests the multimodal support modules: imports, initialization,
entity-linking helpers, image/video processor configuration, and the
presence of the multimodal database tables. Every section is wrapped in
its own try/except so one failing module does not abort the whole run.
"""
import sys
import os

# Make the backend directory importable regardless of the current working directory.
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))

print("=" * 60)
print("InsightFlow 多模态模块测试")
print("=" * 60)

# 1. Module imports
print("\n1. 测试模块导入...")
try:
    from multimodal_processor import (
        get_multimodal_processor, MultimodalProcessor,
        VideoProcessingResult, VideoFrame
    )
    print(" ✓ multimodal_processor 导入成功")
except ImportError as e:
    print(f" ✗ multimodal_processor 导入失败: {e}")
try:
    from image_processor import (
        get_image_processor, ImageProcessor,
        ImageProcessingResult, ImageEntity, ImageRelation
    )
    print(" ✓ image_processor 导入成功")
except ImportError as e:
    print(f" ✗ image_processor 导入失败: {e}")
try:
    from multimodal_entity_linker import (
        get_multimodal_entity_linker, MultimodalEntityLinker,
        MultimodalEntity, EntityLink, AlignmentResult, FusionResult
    )
    print(" ✓ multimodal_entity_linker 导入成功")
except ImportError as e:
    print(f" ✗ multimodal_entity_linker 导入失败: {e}")

# 2. Module initialization (singleton getters)
print("\n2. 测试模块初始化...")
try:
    processor = get_multimodal_processor()
    print(" ✓ MultimodalProcessor 初始化成功")
    print(f" - 临时目录: {processor.temp_dir}")
    print(f" - 帧提取间隔: {processor.frame_interval}")
except Exception as e:
    print(f" ✗ MultimodalProcessor 初始化失败: {e}")
try:
    img_processor = get_image_processor()
    print(" ✓ ImageProcessor 初始化成功")
    print(f" - 临时目录: {img_processor.temp_dir}")
except Exception as e:
    print(f" ✗ ImageProcessor 初始化失败: {e}")
try:
    linker = get_multimodal_entity_linker()
    print(" ✓ MultimodalEntityLinker 初始化成功")
    print(f" - 相似度阈值: {linker.similarity_threshold}")
except Exception as e:
    print(f" ✗ MultimodalEntityLinker 初始化失败: {e}")

# 3. Entity-linking helpers
print("\n3. 测试实体关联功能...")
try:
    linker = get_multimodal_entity_linker()
    # String similarity: identical strings must score 1.0
    sim = linker.calculate_string_similarity("Project Alpha", "Project Alpha")
    assert sim == 1.0, "完全匹配应该返回1.0"
    print(f" ✓ 字符串相似度计算正常 (完全匹配: {sim})")
    sim = linker.calculate_string_similarity("K8s", "Kubernetes")
    print(f" ✓ 字符串相似度计算正常 (不同字符串: {sim:.2f})")
    # Entity similarity on two near-identical entity dicts
    entity1 = {"name": "Project Alpha", "type": "PROJECT", "definition": "核心项目"}
    entity2 = {"name": "Project Alpha", "type": "PROJECT", "definition": "主要项目"}
    sim, match_type = linker.calculate_entity_similarity(entity1, entity2)
    print(f" ✓ 实体相似度计算正常 (相似度: {sim:.2f}, 类型: {match_type})")
except Exception as e:
    print(f" ✗ 实体关联功能测试失败: {e}")

# 4. Image processor features (no actual image files needed)
print("\n4. 测试图片处理器功能...")
try:
    processor = get_image_processor()
    # Inspect the supported image-type registry
    print(f" ✓ 支持的图片类型: {list(processor.IMAGE_TYPES.keys())}")
    print(f" ✓ 图片类型描述: {processor.IMAGE_TYPES}")
except Exception as e:
    print(f" ✗ 图片处理器功能测试失败: {e}")

# 5. Video processor configuration
print("\n5. 测试视频处理器配置...")
try:
    processor = get_multimodal_processor()
    print(f" ✓ 视频目录: {processor.video_dir}")
    print(f" ✓ 帧目录: {processor.frames_dir}")
    print(f" ✓ 音频目录: {processor.audio_dir}")
    # Check that each working directory exists.
    # Fix: the frames entry previously carried an empty label (""), which
    # printed "目录存在" with no directory name; restore the "帧" label.
    for dir_name, dir_path in [
        ("视频", processor.video_dir),
        ("帧", processor.frames_dir),
        ("音频", processor.audio_dir)
    ]:
        if os.path.exists(dir_path):
            print(f" ✓ {dir_name}目录存在: {dir_path}")
        else:
            print(f" ✗ {dir_name}目录不存在: {dir_path}")
except Exception as e:
    print(f" ✗ 视频处理器配置测试失败: {e}")

# 6. Database multimodal tables (only if the database is available)
print("\n6. 测试数据库多模态方法...")
try:
    from db_manager import get_db_manager
    db = get_db_manager()
    # Probe each multimodal table with a trivial SELECT.
    conn = db.get_conn()
    tables = ['videos', 'video_frames', 'images', 'multimodal_mentions', 'multimodal_entity_links']
    for table in tables:
        try:
            conn.execute(f"SELECT 1 FROM {table} LIMIT 1")
            print(f" ✓ 表 '{table}' 存在")
        except Exception as e:
            print(f" ✗ 表 '{table}' 不存在或无法访问: {e}")
    conn.close()
except Exception as e:
    print(f" ✗ 数据库多模态方法测试失败: {e}")

print("\n" + "=" * 60)
print("测试完成")
print("=" * 60)

1488
backend/workflow_manager.py Normal file

File diff suppressed because it is too large Load Diff