Files
insightflow/backend/plugin_manager.py
AutoFix Bot 2a0ed6af4d fix: auto-fix code issues (cron)
- 修复重复导入/字段
- 修复异常处理
- 修复PEP8格式问题 (816+ 处)
- 添加缺失的导入 (json, re)
- 统一SQL查询格式
- 修复赋值语句空格问题

修复文件:
- db_manager.py (96处)
- search_manager.py (77处)
- ops_manager.py (66处)
- developer_ecosystem_manager.py (68处)
- growth_manager.py (60处)
- enterprise_manager.py (61处)
- tenant_manager.py (57处)
- plugin_manager.py (48处)
- subscription_manager.py (46处)
- security_manager.py (29处)
- workflow_manager.py (32处)
- localization_manager.py (31处)
- api_key_manager.py (20处)
- ai_manager.py (23处)
- performance_manager.py (24处)
- neo4j_manager.py (25处)
- collaboration_manager.py (33处)
- test_phase8_task8.py (16处)
- test_phase8_task6.py (4处)
- knowledge_reasoner.py (添加import json)
- llm_client.py (添加import json)
2026-03-03 00:11:51 +08:00

1430 lines
44 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""
InsightFlow Plugin Manager - Phase 7 Task 7

Plugin and integration system: Chrome extension, Feishu/DingTalk bots,
Zapier/Make integrations, and WebDAV sync.
"""
import base64
import hashlib
import hmac
import json
import os
import sqlite3
import time
import urllib.parse
import uuid
from contextlib import closing
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum
from typing import Any

import httpx

# Constants
UUID_LENGTH = 8  # number of leading characters kept from a generated UUID string

# WebDAV support is optional: it relies on the third-party ``webdav4`` package.
try:
    import webdav4.client as webdav_client

    WEBDAV_AVAILABLE = True
except ImportError:
    # Keep the name bound so that code reaching for ``webdav_client`` without
    # checking WEBDAV_AVAILABLE does not die with a NameError.
    webdav_client = None
    WEBDAV_AVAILABLE = False


class PluginType(Enum):
    """Kinds of plugin / integration known to the plugin manager.

    Each member's value is the lowercase string identifier of that kind.
    """

    # Browser extension
    CHROME_EXTENSION = "chrome_extension"
    # Chat bots
    FEISHU_BOT = "feishu_bot"
    DINGTALK_BOT = "dingtalk_bot"
    # Webhook automation platforms
    ZAPIER = "zapier"
    MAKE = "make"
    # File synchronisation
    WEBDAV = "webdav"
    # Anything else
    CUSTOM = "custom"


class PluginStatus(Enum):
    """Lifecycle states a plugin can be in; values are lowercase strings."""

    ACTIVE = "active"
    INACTIVE = "inactive"
    ERROR = "error"
    PENDING = "pending"


@dataclass
class Plugin:
"""插件配置"""
id: str
name: str
plugin_type: str
project_id: str
status: str = "active"
config: dict = field(default_factory=dict)
created_at: str = ""
updated_at: str = ""
last_used_at: str | None = None
use_count: int = 0
@dataclass
class PluginConfig:
    """One key/value configuration entry belonging to a plugin."""

    id: str
    plugin_id: str
    config_key: str
    config_value: str
    # Flag only; nothing in this dataclass encrypts or decrypts the value.
    is_encrypted: bool = False
    # Timestamps, kept as strings
    created_at: str = ""
    updated_at: str = ""


@dataclass
class BotSession:
"""机器人会话"""
id: str
bot_type: str # feishu, dingtalk
session_id: str # 群ID或会话ID
session_name: str
project_id: str | None = None
webhook_url: str = ""
secret: str = ""
is_active: bool = True
created_at: str = ""
updated_at: str = ""
last_message_at: str | None = None
message_count: int = 0
@dataclass
class WebhookEndpoint:
"""Webhook 端点配置Zapier/Make集成"""
id: str
name: str
endpoint_type: str # zapier, make, custom
endpoint_url: str
project_id: str | None = None
auth_type: str = "none" # none, api_key, oauth, custom
auth_config: dict = field(default_factory=dict)
trigger_events: list[str] = field(default_factory=list)
is_active: bool = True
created_at: str = ""
updated_at: str = ""
last_triggered_at: str | None = None
trigger_count: int = 0
@dataclass
class WebDAVSync:
"""WebDAV 同步配置"""
id: str
name: str
project_id: str
server_url: str
username: str
password: str = "" # 加密存储
remote_path: str = "/insightflow"
sync_mode: str = "bidirectional" # bidirectional, upload_only, download_only
sync_interval: int = 3600 # 秒
last_sync_at: str | None = None
last_sync_status: str = "pending" # pending, success, failed
last_sync_error: str = ""
is_active: bool = True
created_at: str = ""
updated_at: str = ""
sync_count: int = 0
@dataclass
class ChromeExtensionToken:
"""Chrome 扩展令牌"""
id: str
token: str
user_id: str | None = None
project_id: str | None = None
name: str = ""
permissions: list[str] = field(default_factory=lambda: ["read", "write"])
expires_at: str | None = None
created_at: str = ""
last_used_at: str | None = None
use_count: int = 0
is_revoked: bool = False
class PluginManager:
    """Registry of plugin handlers plus CRUD access to plugin records.

    ``db_manager`` must expose ``get_conn()``; the rows it yields are indexed
    by column name throughout this class.  Every method opens its own
    connection and closes it before returning, also when a statement raises.
    """

    def __init__(self, db_manager=None) -> None:
        """Store the database manager and register the built-in handlers."""
        self.db = db_manager
        self._handlers = {}
        self._register_default_handlers()

    def _register_default_handlers(self) -> None:
        """Register one handler per built-in plugin type.

        ``PluginType.CUSTOM`` has no handler, so ``get_handler`` returns
        ``None`` for it.
        """
        self._handlers[PluginType.CHROME_EXTENSION] = ChromeExtensionHandler(self)
        self._handlers[PluginType.FEISHU_BOT] = BotHandler(self, "feishu")
        self._handlers[PluginType.DINGTALK_BOT] = BotHandler(self, "dingtalk")
        self._handlers[PluginType.ZAPIER] = WebhookIntegration(self, "zapier")
        self._handlers[PluginType.MAKE] = WebhookIntegration(self, "make")
        self._handlers[PluginType.WEBDAV] = WebDAVSyncManager(self)

    def get_handler(self, plugin_type: PluginType) -> Any | None:
        """Return the handler registered for *plugin_type*, or ``None``."""
        return self._handlers.get(plugin_type)

    # ==================== Plugin CRUD ====================

    def create_plugin(self, plugin: Plugin) -> Plugin:
        """Insert *plugin* into the ``plugins`` table.

        The row is written with ``use_count`` 0 and the current time as both
        timestamps; the same timestamps are copied onto *plugin*, which is
        then returned.
        """
        now = datetime.now().isoformat()
        with closing(self.db.get_conn()) as conn:
            conn.execute(
                """INSERT INTO plugins
                (id, name, plugin_type, project_id, status, config, created_at, updated_at, use_count)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)""",
                (
                    plugin.id,
                    plugin.name,
                    plugin.plugin_type,
                    plugin.project_id,
                    plugin.status,
                    json.dumps(plugin.config),
                    now,
                    now,
                    0,
                ),
            )
            conn.commit()
        plugin.created_at = now
        plugin.updated_at = now
        return plugin

    def get_plugin(self, plugin_id: str) -> Plugin | None:
        """Return the plugin with the given id, or ``None`` if there is none."""
        with closing(self.db.get_conn()) as conn:
            row = conn.execute("SELECT * FROM plugins WHERE id = ?", (plugin_id,)).fetchone()
        return self._row_to_plugin(row) if row else None

    def list_plugins(
        self,
        project_id: str | None = None,
        plugin_type: str | None = None,
        status: str | None = None,
    ) -> list[Plugin]:
        """List plugins, newest first, optionally filtered.

        A filter is applied only when its argument is truthy, so ``None`` and
        ``""`` both mean "do not filter on this column".
        """
        requested = {"project_id": project_id, "plugin_type": plugin_type, "status": status}
        active = {column: value for column, value in requested.items() if value}
        # Column names come from the fixed dict above; values are bound as parameters.
        where_clause = " AND ".join(f"{column} = ?" for column in active) if active else "1 = 1"
        with closing(self.db.get_conn()) as conn:
            rows = conn.execute(
                f"SELECT * FROM plugins WHERE {where_clause} ORDER BY created_at DESC",
                list(active.values()),
            ).fetchall()
        return [self._row_to_plugin(row) for row in rows]

    def update_plugin(self, plugin_id: str, **kwargs) -> Plugin | None:
        """Update ``name``, ``status`` and/or ``config`` of a plugin.

        Other keyword arguments are ignored.  ``config`` is serialised to
        JSON.  When at least one field is updated, ``updated_at`` is refreshed
        too.  Returns the plugin as re-read from the database (``None`` if it
        does not exist).
        """
        assignments = []
        values = []
        for column in ("name", "status", "config"):
            if column in kwargs:
                assignments.append(f"{column} = ?")
                value = kwargs[column]
                values.append(json.dumps(value) if column == "config" else value)

        # Only touch the database when there is something to write.
        if assignments:
            assignments.append("updated_at = ?")
            values.extend([datetime.now().isoformat(), plugin_id])
            with closing(self.db.get_conn()) as conn:
                conn.execute(f"UPDATE plugins SET {', '.join(assignments)} WHERE id = ?", values)
                conn.commit()
        return self.get_plugin(plugin_id)

    def delete_plugin(self, plugin_id: str) -> bool:
        """Delete a plugin and its config entries; return whether a plugin row was removed."""
        with closing(self.db.get_conn()) as conn:
            # Remove the dependent config rows first, then the plugin itself.
            conn.execute("DELETE FROM plugin_configs WHERE plugin_id = ?", (plugin_id,))
            removed = conn.execute("DELETE FROM plugins WHERE id = ?", (plugin_id,)).rowcount
            conn.commit()
        return removed > 0

    def _row_to_plugin(self, row: sqlite3.Row) -> Plugin:
        """Convert a ``plugins`` row into a ``Plugin`` (empty config becomes ``{}``)."""
        return Plugin(
            id=row["id"],
            name=row["name"],
            plugin_type=row["plugin_type"],
            project_id=row["project_id"],
            status=row["status"],
            config=json.loads(row["config"]) if row["config"] else {},
            created_at=row["created_at"],
            updated_at=row["updated_at"],
            last_used_at=row["last_used_at"],
            use_count=row["use_count"],
        )

    # ==================== Plugin Config ====================

    def set_plugin_config(
        self, plugin_id: str, key: str, value: str, is_encrypted: bool = False
    ) -> PluginConfig:
        """Create or overwrite one config entry of a plugin.

        An existing ``(plugin_id, key)`` row is updated in place; otherwise a
        new row with a short random id is inserted.  ``is_encrypted`` is
        stored as a flag only - *value* is written exactly as given.

        The returned object carries the current time in both ``created_at``
        and ``updated_at``, also when an existing row was updated.
        """
        now = datetime.now().isoformat()
        with closing(self.db.get_conn()) as conn:
            existing = conn.execute(
                "SELECT id FROM plugin_configs WHERE plugin_id = ? AND config_key = ?",
                (plugin_id, key),
            ).fetchone()
            if existing:
                config_id = existing["id"]
                conn.execute(
                    """UPDATE plugin_configs
                    SET config_value = ?, is_encrypted = ?, updated_at = ?
                    WHERE id = ?""",
                    (value, is_encrypted, now, config_id),
                )
            else:
                config_id = str(uuid.uuid4())[:UUID_LENGTH]
                conn.execute(
                    """INSERT INTO plugin_configs
                    (id, plugin_id, config_key, config_value, is_encrypted, created_at, updated_at)
                    VALUES (?, ?, ?, ?, ?, ?, ?)""",
                    (config_id, plugin_id, key, value, is_encrypted, now, now),
                )
            conn.commit()
        return PluginConfig(
            id=config_id,
            plugin_id=plugin_id,
            config_key=key,
            config_value=value,
            is_encrypted=is_encrypted,
            created_at=now,
            updated_at=now,
        )

    def get_plugin_config(self, plugin_id: str, key: str) -> str | None:
        """Return the stored value for ``(plugin_id, key)``, or ``None`` if absent."""
        with closing(self.db.get_conn()) as conn:
            row = conn.execute(
                "SELECT config_value FROM plugin_configs WHERE plugin_id = ? AND config_key = ?",
                (plugin_id, key),
            ).fetchone()
        return row["config_value"] if row else None

    def get_all_plugin_configs(self, plugin_id: str) -> dict[str, str]:
        """Return all config entries of a plugin as a ``{key: value}`` dict."""
        with closing(self.db.get_conn()) as conn:
            rows = conn.execute(
                "SELECT config_key, config_value FROM plugin_configs WHERE plugin_id = ?",
                (plugin_id,),
            ).fetchall()
        return {row["config_key"]: row["config_value"] for row in rows}

    def delete_plugin_config(self, plugin_id: str, key: str) -> bool:
        """Delete one config entry; return whether a row was removed."""
        with closing(self.db.get_conn()) as conn:
            removed = conn.execute(
                "DELETE FROM plugin_configs WHERE plugin_id = ? AND config_key = ?",
                (plugin_id, key),
            ).rowcount
            conn.commit()
        return removed > 0

    def record_plugin_usage(self, plugin_id: str) -> None:
        """Increment a plugin's ``use_count`` and stamp ``last_used_at`` with now."""
        now = datetime.now().isoformat()
        with closing(self.db.get_conn()) as conn:
            conn.execute(
                """UPDATE plugins
                SET use_count = use_count + 1, last_used_at = ?
                WHERE id = ?""",
                (now, plugin_id),
            )
            conn.commit()


class ChromeExtensionHandler:
    """Issues, validates and revokes Chrome-extension tokens and imports web pages.

    All database access goes through ``plugin_manager.db.get_conn()``; each
    connection is closed before the method returns, also on error.
    """

    def __init__(self, plugin_manager: PluginManager) -> None:
        """Keep a reference to the owning plugin manager."""
        self.pm = plugin_manager

    @staticmethod
    def _generate_raw_token() -> str:
        """Return a new random token: ``if_ext_`` + URL-safe base64 of 32 random bytes.

        The base64 ``=`` padding is removed.
        """
        encoded = base64.urlsafe_b64encode(os.urandom(32)).decode("utf-8")
        return f"if_ext_{encoded.rstrip('=')}"

    def create_token(
        self,
        name: str,
        user_id: str | None = None,
        project_id: str | None = None,
        permissions: list[str] | None = None,
        expires_days: int | None = None,
    ) -> ChromeExtensionToken:
        """Create and store a new extension token.

        Only the SHA-256 hash of the token is written to the database; the
        raw token is available solely on the returned object.

        Args:
            name: Display name of the token.
            user_id: Optional owner.
            project_id: Optional project the token is bound to.
            permissions: Granted permissions; ``["read"]`` when empty or omitted.
            expires_days: Lifetime in days; a falsy value means no expiry.
        """
        token_id = str(uuid.uuid4())[:UUID_LENGTH]
        raw_token = self._generate_raw_token()
        token_hash = hashlib.sha256(raw_token.encode()).hexdigest()
        granted = permissions or ["read"]

        now = datetime.now().isoformat()
        expires_at = None
        if expires_days:
            from datetime import timedelta

            expires_at = (datetime.now() + timedelta(days=expires_days)).isoformat()

        with closing(self.pm.db.get_conn()) as conn:
            conn.execute(
                """INSERT INTO chrome_extension_tokens
                (id, token_hash, user_id, project_id, name, permissions, expires_at,
                created_at, is_revoked, use_count)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""",
                (
                    token_id,
                    token_hash,
                    user_id,
                    project_id,
                    name,
                    json.dumps(granted),
                    expires_at,
                    now,
                    False,
                    0,
                ),
            )
            conn.commit()

        return ChromeExtensionToken(
            id=token_id,
            token=raw_token,  # the only time the raw token is handed out
            user_id=user_id,
            project_id=project_id,
            name=name,
            permissions=granted,
            expires_at=expires_at,
            created_at=now,
        )

    def validate_token(self, token: str) -> ChromeExtensionToken | None:
        """Validate a raw token and record its use.

        Returns ``None`` when no non-revoked token with this hash exists or
        when it has expired.  Otherwise ``use_count`` and ``last_used_at`` are
        updated and the token record is returned with an empty ``token`` field.
        """
        token_hash = hashlib.sha256(token.encode()).hexdigest()
        now = datetime.now().isoformat()

        # Lookup and usage update share one connection.
        with closing(self.pm.db.get_conn()) as conn:
            row = conn.execute(
                """SELECT * FROM chrome_extension_tokens
                WHERE token_hash = ? AND is_revoked = 0""",
                (token_hash,),
            ).fetchone()
            if not row:
                return None
            # Both sides are ISO-8601 strings written by this class, so a
            # string comparison orders them chronologically.
            if row["expires_at"] and now > row["expires_at"]:
                return None
            conn.execute(
                """UPDATE chrome_extension_tokens
                SET use_count = use_count + 1, last_used_at = ?
                WHERE id = ?""",
                (now, row["id"]),
            )
            conn.commit()

        return ChromeExtensionToken(
            id=row["id"],
            token="",  # the raw token is never returned again
            user_id=row["user_id"],
            project_id=row["project_id"],
            name=row["name"],
            permissions=json.loads(row["permissions"]),
            expires_at=row["expires_at"],
            created_at=row["created_at"],
            last_used_at=now,
            use_count=row["use_count"] + 1,
        )

    def revoke_token(self, token_id: str) -> bool:
        """Mark a token as revoked; return whether a row with that id exists."""
        with closing(self.pm.db.get_conn()) as conn:
            changed = conn.execute(
                "UPDATE chrome_extension_tokens SET is_revoked = 1 WHERE id = ?", (token_id,)
            ).rowcount
            conn.commit()
        return changed > 0

    def list_tokens(
        self, user_id: str | None = None, project_id: str | None = None
    ) -> list[ChromeExtensionToken]:
        """List non-revoked tokens, newest first, optionally filtered by user/project.

        The returned records never contain the raw token.
        """
        conditions = ["is_revoked = 0"]
        params = []
        for column, value in (("user_id", user_id), ("project_id", project_id)):
            if value:
                conditions.append(f"{column} = ?")
                params.append(value)
        where_clause = " AND ".join(conditions)

        with closing(self.pm.db.get_conn()) as conn:
            rows = conn.execute(
                f"SELECT * FROM chrome_extension_tokens WHERE {where_clause} ORDER BY created_at DESC",
                params,
            ).fetchall()

        return [
            ChromeExtensionToken(
                id=row["id"],
                token="",  # the raw token is never returned
                user_id=row["user_id"],
                project_id=row["project_id"],
                name=row["name"],
                permissions=json.loads(row["permissions"]),
                expires_at=row["expires_at"],
                created_at=row["created_at"],
                last_used_at=row["last_used_at"],
                use_count=row["use_count"],
                is_revoked=bool(row["is_revoked"]),
            )
            for row in rows
        ]

    async def import_webpage(
        self,
        token: ChromeExtensionToken,
        url: str,
        title: str,
        content: str,
        html_content: str | None = None,
    ) -> dict:
        """Store a web page as a ``webpage`` transcript of the token's project.

        The token must be bound to a project and hold the ``write``
        permission; otherwise a ``{"success": False, "error": ...}`` dict is
        returned and nothing is written.  ``html_content`` is accepted but
        not used.
        """
        if not token.project_id:
            return {"success": False, "error": "Token not associated with any project"}
        if "write" not in token.permissions:
            return {"success": False, "error": "Insufficient permissions"}

        # The page is stored as a transcript record (treated as a document).
        transcript_id = str(uuid.uuid4())[:UUID_LENGTH]
        now = datetime.now().isoformat()
        full_text = f"# {title}\n\nURL: {url}\n\n{content}"

        with closing(self.pm.db.get_conn()) as conn:
            conn.execute(
                """INSERT INTO transcripts
                (id, project_id, filename, full_text, type, created_at)
                VALUES (?, ?, ?, ?, ?, ?)""",
                (transcript_id, token.project_id, f"web_{title[:50]}.md", full_text, "webpage", now),
            )
            conn.commit()

        return {
            "success": True,
            "transcript_id": transcript_id,
            "project_id": token.project_id,
            "url": url,
            "title": title,
            "content_length": len(content),
        }


class BotHandler:
    """Handler for Feishu / DingTalk chat bots.

    One instance serves one ``bot_type`` (``"feishu"`` or ``"dingtalk"``);
    every session query is scoped to that type.  Database connections are
    closed before each method returns, also on error.
    """

    def __init__(self, plugin_manager: PluginManager, bot_type: str) -> None:
        """Keep the owning plugin manager and the bot type this handler serves."""
        self.pm = plugin_manager
        self.bot_type = bot_type

    def create_session(
        self,
        session_id: str,
        session_name: str,
        project_id: str | None = None,
        webhook_url: str = "",
        secret: str = "",
    ) -> BotSession:
        """Insert a new active bot session and return it."""
        bot_id = str(uuid.uuid4())[:UUID_LENGTH]
        now = datetime.now().isoformat()
        with closing(self.pm.db.get_conn()) as conn:
            conn.execute(
                """INSERT INTO bot_sessions
                (id, bot_type, session_id, session_name, project_id, webhook_url, secret,
                is_active, created_at, updated_at, message_count)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""",
                (
                    bot_id,
                    self.bot_type,
                    session_id,
                    session_name,
                    project_id,
                    webhook_url,
                    secret,
                    True,
                    now,
                    now,
                    0,
                ),
            )
            conn.commit()
        return BotSession(
            id=bot_id,
            bot_type=self.bot_type,
            session_id=session_id,
            session_name=session_name,
            project_id=project_id,
            webhook_url=webhook_url,
            secret=secret,
            is_active=True,
            created_at=now,
            updated_at=now,
        )

    def get_session(self, session_id: str) -> BotSession | None:
        """Return this bot type's session with the given session id, or ``None``."""
        with closing(self.pm.db.get_conn()) as conn:
            row = conn.execute(
                """SELECT * FROM bot_sessions
                WHERE session_id = ? AND bot_type = ?""",
                (session_id, self.bot_type),
            ).fetchone()
        return self._row_to_session(row) if row else None

    def list_sessions(self, project_id: str | None = None) -> list[BotSession]:
        """List this bot type's sessions, newest first, optionally for one project."""
        sql = "SELECT * FROM bot_sessions WHERE bot_type = ?"
        params = [self.bot_type]
        if project_id:
            sql += " AND project_id = ?"
            params.append(project_id)
        sql += " ORDER BY created_at DESC"
        with closing(self.pm.db.get_conn()) as conn:
            rows = conn.execute(sql, params).fetchall()
        return [self._row_to_session(row) for row in rows]

    def update_session(self, session_id: str, **kwargs) -> BotSession | None:
        """Update selected fields of a session and return the refreshed record.

        Only ``session_name``, ``project_id``, ``webhook_url``, ``secret`` and
        ``is_active`` are honoured; other keyword arguments are ignored.
        """
        allowed_fields = ("session_name", "project_id", "webhook_url", "secret", "is_active")
        assignments = [f"{column} = ?" for column in allowed_fields if column in kwargs]
        values = [kwargs[column] for column in allowed_fields if column in kwargs]

        # Only touch the database when there is something to write.
        if assignments:
            assignments.append("updated_at = ?")
            values.extend([datetime.now().isoformat(), session_id, self.bot_type])
            query = (
                f"UPDATE bot_sessions SET {', '.join(assignments)} "
                "WHERE session_id = ? AND bot_type = ?"
            )
            with closing(self.pm.db.get_conn()) as conn:
                conn.execute(query, values)
                conn.commit()
        return self.get_session(session_id)

    def delete_session(self, session_id: str) -> bool:
        """Delete a session of this bot type; return whether a row was removed."""
        with closing(self.pm.db.get_conn()) as conn:
            removed = conn.execute(
                "DELETE FROM bot_sessions WHERE session_id = ? AND bot_type = ?",
                (session_id, self.bot_type),
            ).rowcount
            conn.commit()
        return removed > 0

    def _row_to_session(self, row: sqlite3.Row) -> BotSession:
        """Convert a ``bot_sessions`` row into a ``BotSession``."""
        return BotSession(
            id=row["id"],
            bot_type=row["bot_type"],
            session_id=row["session_id"],
            session_name=row["session_name"],
            project_id=row["project_id"],
            webhook_url=row["webhook_url"],
            secret=row["secret"],
            is_active=bool(row["is_active"]),
            created_at=row["created_at"],
            updated_at=row["updated_at"],
            last_message_at=row["last_message_at"],
            message_count=row["message_count"],
        )

    async def handle_message(self, session: BotSession, message: dict) -> dict:
        """Record an incoming message and dispatch it by ``msg_type``.

        ``text``, ``audio`` and ``file`` messages are handled; any other type
        yields ``{"success": False, "error": "Unsupported message type"}``.
        ``msg_type`` defaults to ``"text"`` when the key is missing.
        """
        now = datetime.now().isoformat()
        # Update the per-session message statistics first.
        with closing(self.pm.db.get_conn()) as conn:
            conn.execute(
                """UPDATE bot_sessions
                SET message_count = message_count + 1, last_message_at = ?
                WHERE id = ?""",
                (now, session.id),
            )
            conn.commit()

        msg_type = message.get("msg_type", "text")
        content = message.get("content", {})
        if msg_type == "text":
            return await self._handle_text_message(session, content.get("text", ""), message)
        if msg_type == "audio":
            return await self._handle_audio_message(session, message)
        if msg_type == "file":
            return await self._handle_file_message(session, message)
        return {"success": False, "error": "Unsupported message type"}

    async def _handle_text_message(self, session: BotSession, text: str, raw_message: dict) -> dict:
        """Answer a text message: ``/help``, ``/status`` or a default echo reply."""
        if text.startswith("/help"):
            return {
                "success": True,
                "response": (
                    "🤖 InsightFlow 机器人命令:\n"
                    "/help - 显示帮助\n"
                    "/status - 查看项目状态\n"
                    "/analyze <URL> - 分析网页内容\n"
                    "/search <关键词> - 搜索知识库"
                ),
            }

        if text.startswith("/status"):
            if not session.project_id:
                return {"success": True, "response": "⚠️ 当前会话未绑定项目"}
            # Project statistics come from the database manager's summary.
            summary = self.pm.db.get_project_summary(session.project_id)
            stats = summary.get("statistics", {})
            entity_count = stats.get("entity_count", 0)
            relation_count = stats.get("relation_count", 0)
            transcript_count = stats.get("transcript_count", 0)
            return {
                "success": True,
                "response": (
                    "📊 项目状态:\n"
                    f"实体数量: {entity_count}\n"
                    f"关系数量: {relation_count}\n"
                    f"转录数量: {transcript_count}"
                ),
            }

        # Default reply: echo at most the first 100 characters.
        return {
            "success": True,
            "response": f"收到消息:{text[:100]}...\n\n使用 /help 查看可用命令",
        }

    async def _handle_audio_message(self, session: BotSession, message: dict) -> dict:
        """Download the audio referenced by the message and acknowledge it.

        The session must be bound to a project and the message must carry
        ``content.download_url``.  The audio is only downloaded; it is not
        stored or transcribed here.
        """
        if not session.project_id:
            return {"success": False, "error": "Session not bound to any project"}

        audio_url = message.get("content", {}).get("download_url")
        if not audio_url:
            return {"success": False, "error": "No audio URL provided"}

        try:
            async with httpx.AsyncClient() as client:
                response = await client.get(audio_url)
                audio_data = response.content
            filename = f"bot_audio_{datetime.now().strftime('%Y%m%d_%H%M%S')}.mp3"
            # An ASR service should be called here; for now only acknowledge.
            return {
                "success": True,
                "response": "🎵 收到音频文件,正在处理中...\n分析完成后会通知您。",
                "audio_size": len(audio_data),
                "filename": filename,
            }
        except Exception as e:
            return {"success": False, "error": f"Failed to process audio: {str(e)}"}

    async def _handle_file_message(self, session: BotSession, message: dict) -> dict:
        """Acknowledge a file message (no processing is done here)."""
        return {"success": True, "response": "📎 收到文件,正在处理中..."}

    async def send_message(self, session: BotSession, message: str, msg_type: str = "text") -> bool:
        """Send *message* to the session's webhook; return whether it was accepted.

        Returns ``False`` when the session has no webhook URL, when the bot
        type is unknown, or when sending raises (the error is printed).
        """
        if not session.webhook_url:
            return False
        try:
            if self.bot_type == "feishu":
                return await self._send_feishu_message(session, message, msg_type)
            if self.bot_type == "dingtalk":
                return await self._send_dingtalk_message(session, message, msg_type)
            return False
        except Exception as e:
            print(f"Failed to send {self.bot_type} message: {e}")
            return False

    @staticmethod
    def _feishu_sign(timestamp: str, secret: str) -> str:
        """Return the Feishu custom-bot signature for *timestamp* and *secret*.

        Feishu keys the HMAC-SHA256 with ``"{timestamp}\\n{secret}"`` and signs
        an empty message; the digest is base64-encoded.
        """
        key = f"{timestamp}\n{secret}".encode("utf-8")
        digest = hmac.new(key, digestmod=hashlib.sha256).digest()
        return base64.b64encode(digest).decode("utf-8")

    @staticmethod
    def _dingtalk_signed_url(webhook_url: str, timestamp: str, secret: str) -> str:
        """Return *webhook_url* with DingTalk ``timestamp`` and ``sign`` parameters.

        The signature is HMAC-SHA256 keyed with *secret* over
        ``"{timestamp}\\n{secret}"``, base64-encoded and URL-quoted.  Without
        a secret the URL is returned unchanged.
        """
        if not secret:
            return webhook_url
        string_to_sign = f"{timestamp}\n{secret}"
        digest = hmac.new(
            secret.encode("utf-8"),
            string_to_sign.encode("utf-8"),
            digestmod=hashlib.sha256,
        ).digest()
        sign = urllib.parse.quote(base64.b64encode(digest).decode("utf-8"))
        # No spaces around "=": they would become part of the parameter names/values.
        separator = "&" if "?" in webhook_url else "?"
        return f"{webhook_url}{separator}timestamp={timestamp}&sign={sign}"

    async def _send_feishu_message(self, session: BotSession, message: str, msg_type: str) -> bool:
        """Post a text message to a Feishu webhook; ``True`` on HTTP 200.

        The payload is always sent as ``msg_type`` ``"text"``; the *msg_type*
        argument is not used.
        """
        timestamp = str(int(time.time()))
        sign = self._feishu_sign(timestamp, session.secret) if session.secret else ""
        payload = {
            "timestamp": timestamp,
            "sign": sign,
            "msg_type": "text",
            "content": {"text": message},
        }
        async with httpx.AsyncClient() as client:
            response = await client.post(
                session.webhook_url, json=payload, headers={"Content-Type": "application/json"}
            )
            return response.status_code == 200

    async def _send_dingtalk_message(
        self, session: BotSession, message: str, msg_type: str
    ) -> bool:
        """Post a text message to a DingTalk webhook; ``True`` on HTTP 200.

        The payload is always sent as ``msgtype`` ``"text"``; the *msg_type*
        argument is not used.
        """
        timestamp = str(round(time.time() * 1000))
        url = self._dingtalk_signed_url(session.webhook_url, timestamp, session.secret)
        payload = {"msgtype": "text", "text": {"content": message}}
        async with httpx.AsyncClient() as client:
            response = await client.post(
                url, json=payload, headers={"Content-Type": "application/json"}
            )
            return response.status_code == 200


class WebhookIntegration:
    """Zapier / Make webhook integration.

    One instance serves one ``endpoint_type``.  Creating, getting and listing
    endpoints is scoped to that type.

    NOTE(review): ``update_endpoint`` and ``delete_endpoint`` match on id only
    (no ``endpoint_type`` filter), unlike ``get_endpoint`` - confirm whether
    callers rely on that before tightening it.
    """

    def __init__(self, plugin_manager: PluginManager, endpoint_type: str) -> None:
        """Keep the owning plugin manager and the endpoint type this handler serves."""
        self.pm = plugin_manager
        self.endpoint_type = endpoint_type

    def create_endpoint(
        self,
        name: str,
        endpoint_url: str,
        project_id: str | None = None,
        auth_type: str = "none",
        auth_config: dict | None = None,
        trigger_events: list[str] | None = None,
    ) -> WebhookEndpoint:
        """Insert a new active webhook endpoint and return it.

        ``auth_config`` and ``trigger_events`` are stored as JSON; omitted
        values become ``{}`` and ``[]``.
        """
        endpoint_id = str(uuid.uuid4())[:UUID_LENGTH]
        now = datetime.now().isoformat()
        auth = auth_config or {}
        events = trigger_events or []

        with closing(self.pm.db.get_conn()) as conn:
            conn.execute(
                """INSERT INTO webhook_endpoints
                (id, name, endpoint_type, endpoint_url, project_id, auth_type, auth_config,
                trigger_events, is_active, created_at, updated_at, trigger_count)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""",
                (
                    endpoint_id,
                    name,
                    self.endpoint_type,
                    endpoint_url,
                    project_id,
                    auth_type,
                    json.dumps(auth),
                    json.dumps(events),
                    True,
                    now,
                    now,
                    0,
                ),
            )
            conn.commit()

        return WebhookEndpoint(
            id=endpoint_id,
            name=name,
            endpoint_type=self.endpoint_type,
            endpoint_url=endpoint_url,
            project_id=project_id,
            auth_type=auth_type,
            auth_config=auth,
            trigger_events=events,
            is_active=True,
            created_at=now,
            updated_at=now,
        )

    def get_endpoint(self, endpoint_id: str) -> WebhookEndpoint | None:
        """Return the endpoint of this type with the given id, or ``None``."""
        with closing(self.pm.db.get_conn()) as conn:
            row = conn.execute(
                "SELECT * FROM webhook_endpoints WHERE id = ? AND endpoint_type = ?",
                (endpoint_id, self.endpoint_type),
            ).fetchone()
        return self._row_to_endpoint(row) if row else None

    def list_endpoints(self, project_id: str | None = None) -> list[WebhookEndpoint]:
        """List endpoints of this type, newest first, optionally for one project."""
        sql = "SELECT * FROM webhook_endpoints WHERE endpoint_type = ?"
        params = [self.endpoint_type]
        if project_id:
            sql += " AND project_id = ?"
            params.append(project_id)
        sql += " ORDER BY created_at DESC"
        with closing(self.pm.db.get_conn()) as conn:
            rows = conn.execute(sql, params).fetchall()
        return [self._row_to_endpoint(row) for row in rows]

    def update_endpoint(self, endpoint_id: str, **kwargs) -> WebhookEndpoint | None:
        """Update selected fields of an endpoint and return the refreshed record.

        Honoured keys: ``name``, ``endpoint_url``, ``project_id``,
        ``auth_type``, ``auth_config``, ``trigger_events``, ``is_active``.
        ``auth_config`` and ``trigger_events`` are serialised to JSON.
        """
        allowed_fields = (
            "name",
            "endpoint_url",
            "project_id",
            "auth_type",
            "auth_config",
            "trigger_events",
            "is_active",
        )
        json_fields = ("auth_config", "trigger_events")
        assignments = []
        values = []
        for column in allowed_fields:
            if column in kwargs:
                assignments.append(f"{column} = ?")
                value = kwargs[column]
                values.append(json.dumps(value) if column in json_fields else value)

        # Only touch the database when there is something to write.
        if assignments:
            assignments.append("updated_at = ?")
            values.extend([datetime.now().isoformat(), endpoint_id])
            query = f"UPDATE webhook_endpoints SET {', '.join(assignments)} WHERE id = ?"
            with closing(self.pm.db.get_conn()) as conn:
                conn.execute(query, values)
                conn.commit()
        return self.get_endpoint(endpoint_id)

    def delete_endpoint(self, endpoint_id: str) -> bool:
        """Delete an endpoint by id; return whether a row was removed."""
        with closing(self.pm.db.get_conn()) as conn:
            removed = conn.execute(
                "DELETE FROM webhook_endpoints WHERE id = ?", (endpoint_id,)
            ).rowcount
            conn.commit()
        return removed > 0

    def _row_to_endpoint(self, row: sqlite3.Row) -> WebhookEndpoint:
        """Convert a ``webhook_endpoints`` row into a ``WebhookEndpoint``."""
        return WebhookEndpoint(
            id=row["id"],
            name=row["name"],
            endpoint_type=row["endpoint_type"],
            endpoint_url=row["endpoint_url"],
            project_id=row["project_id"],
            auth_type=row["auth_type"],
            auth_config=json.loads(row["auth_config"]) if row["auth_config"] else {},
            trigger_events=json.loads(row["trigger_events"]) if row["trigger_events"] else [],
            is_active=bool(row["is_active"]),
            created_at=row["created_at"],
            updated_at=row["updated_at"],
            last_triggered_at=row["last_triggered_at"],
            trigger_count=row["trigger_count"],
        )

    @staticmethod
    def _build_headers(endpoint: WebhookEndpoint) -> dict:
        """Return the request headers for *endpoint*, including authentication.

        ``api_key`` auth sends the key in ``auth_config["header_name"]``
        (default ``X-API-Key``); ``bearer`` auth sends an ``Authorization``
        header.  Any other auth type adds nothing.
        """
        headers = {"Content-Type": "application/json"}
        if endpoint.auth_type == "api_key":
            header_name = endpoint.auth_config.get("header_name", "X-API-Key")
            headers[header_name] = endpoint.auth_config.get("api_key", "")
        elif endpoint.auth_type == "bearer":
            token = endpoint.auth_config.get("token", "")
            headers["Authorization"] = f"Bearer {token}"
        return headers

    async def _deliver(self, endpoint: WebhookEndpoint, event_type: str, data: dict) -> bool:
        """POST one event to the endpoint and record the attempt.

        Returns ``True`` for HTTP 200/201/202.  The trigger statistics are
        updated after every completed request, whatever its status code.
        Any exception is printed and reported as ``False``.
        """
        try:
            headers = self._build_headers(endpoint)
            payload = {"event": event_type, "timestamp": datetime.now().isoformat(), "data": data}
            async with httpx.AsyncClient() as client:
                response = await client.post(
                    endpoint.endpoint_url, json=payload, headers=headers, timeout=30.0
                )
            success = response.status_code in [200, 201, 202]

            now = datetime.now().isoformat()
            with closing(self.pm.db.get_conn()) as conn:
                conn.execute(
                    """UPDATE webhook_endpoints
                    SET trigger_count = trigger_count + 1, last_triggered_at = ?
                    WHERE id = ?""",
                    (now, endpoint.id),
                )
                conn.commit()
            return success
        except Exception as e:
            print(f"Failed to trigger webhook: {e}")
            return False

    async def trigger(self, endpoint: WebhookEndpoint, event_type: str, data: dict) -> bool:
        """Send *event_type* to *endpoint* if it is active and subscribed to it.

        Returns ``False`` without sending when the endpoint is inactive or
        *event_type* is not in its ``trigger_events``.
        """
        if not endpoint.is_active:
            return False
        if event_type not in endpoint.trigger_events:
            return False
        return await self._deliver(endpoint, event_type, data)

    async def test_endpoint(self, endpoint: WebhookEndpoint) -> dict:
        """Send a ``"test"`` event to *endpoint* and report the outcome.

        The subscription filter is bypassed on purpose: a test must reach the
        endpoint even when ``"test"`` is not one of its ``trigger_events``.
        Inactive endpoints are still not contacted.
        """
        test_data = {
            "message": "This is a test event from InsightFlow",
            "test": True,
            "timestamp": datetime.now().isoformat(),
        }
        success = False
        if endpoint.is_active:
            success = await self._deliver(endpoint, "test", test_data)
        return {
            "success": success,
            "endpoint_id": endpoint.id,
            "endpoint_url": endpoint.endpoint_url,
            "message": "Test event sent successfully" if success else "Failed to send test event",
        }


class WebDAVSyncManager:
    """Manages WebDAV sync configurations and exports projects to a WebDAV server.

    NOTE(review): passwords are written to and read from the ``password``
    column exactly as given; no encryption is visible in this class.
    NOTE(review): the ``webdav_client`` calls in the async methods are plain
    (non-awaited) calls - confirm whether blocking the event loop is acceptable.
    """

    def __init__(self, plugin_manager: PluginManager) -> None:
        """Keep a reference to the owning plugin manager."""
        self.pm = plugin_manager

    def create_sync(
        self,
        name: str,
        project_id: str,
        server_url: str,
        username: str,
        password: str,
        remote_path: str = "/insightflow",
        sync_mode: str = "bidirectional",
        sync_interval: int = 3600,
    ) -> WebDAVSync:
        """Insert a new active sync configuration (status ``pending``) and return it."""
        sync_id = str(uuid.uuid4())[:UUID_LENGTH]
        now = datetime.now().isoformat()
        with closing(self.pm.db.get_conn()) as conn:
            conn.execute(
                """INSERT INTO webdav_syncs
                (id, name, project_id, server_url, username, password, remote_path,
                sync_mode, sync_interval, last_sync_status, is_active, created_at, updated_at, sync_count)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""",
                (
                    sync_id,
                    name,
                    project_id,
                    server_url,
                    username,
                    password,
                    remote_path,
                    sync_mode,
                    sync_interval,
                    "pending",
                    True,
                    now,
                    now,
                    0,
                ),
            )
            conn.commit()
        return WebDAVSync(
            id=sync_id,
            name=name,
            project_id=project_id,
            server_url=server_url,
            username=username,
            password=password,
            remote_path=remote_path,
            sync_mode=sync_mode,
            sync_interval=sync_interval,
            last_sync_status="pending",
            is_active=True,
            created_at=now,
            updated_at=now,
        )

    def get_sync(self, sync_id: str) -> WebDAVSync | None:
        """Return the sync configuration with the given id, or ``None``."""
        with closing(self.pm.db.get_conn()) as conn:
            row = conn.execute("SELECT * FROM webdav_syncs WHERE id = ?", (sync_id,)).fetchone()
        return self._row_to_sync(row) if row else None

    def list_syncs(self, project_id: str | None = None) -> list[WebDAVSync]:
        """List sync configurations, newest first, optionally for one project."""
        with closing(self.pm.db.get_conn()) as conn:
            if project_id:
                rows = conn.execute(
                    "SELECT * FROM webdav_syncs WHERE project_id = ? ORDER BY created_at DESC",
                    (project_id,),
                ).fetchall()
            else:
                rows = conn.execute(
                    "SELECT * FROM webdav_syncs ORDER BY created_at DESC"
                ).fetchall()
        return [self._row_to_sync(row) for row in rows]

    def update_sync(self, sync_id: str, **kwargs) -> WebDAVSync | None:
        """Update selected fields of a sync configuration and return the refreshed record.

        Honoured keys: ``name``, ``server_url``, ``username``, ``password``,
        ``remote_path``, ``sync_mode``, ``sync_interval``, ``is_active``.
        """
        allowed_fields = (
            "name",
            "server_url",
            "username",
            "password",
            "remote_path",
            "sync_mode",
            "sync_interval",
            "is_active",
        )
        assignments = [f"{column} = ?" for column in allowed_fields if column in kwargs]
        values = [kwargs[column] for column in allowed_fields if column in kwargs]

        # Only touch the database when there is something to write.
        if assignments:
            assignments.append("updated_at = ?")
            values.extend([datetime.now().isoformat(), sync_id])
            query = f"UPDATE webdav_syncs SET {', '.join(assignments)} WHERE id = ?"
            with closing(self.pm.db.get_conn()) as conn:
                conn.execute(query, values)
                conn.commit()
        return self.get_sync(sync_id)

    def delete_sync(self, sync_id: str) -> bool:
        """Delete a sync configuration; return whether a row was removed."""
        with closing(self.pm.db.get_conn()) as conn:
            removed = conn.execute("DELETE FROM webdav_syncs WHERE id = ?", (sync_id,)).rowcount
            conn.commit()
        return removed > 0

    def _row_to_sync(self, row: sqlite3.Row) -> WebDAVSync:
        """Convert a ``webdav_syncs`` row into a ``WebDAVSync`` (NULL error becomes ``""``)."""
        return WebDAVSync(
            id=row["id"],
            name=row["name"],
            project_id=row["project_id"],
            server_url=row["server_url"],
            username=row["username"],
            password=row["password"],
            remote_path=row["remote_path"],
            sync_mode=row["sync_mode"],
            sync_interval=row["sync_interval"],
            last_sync_at=row["last_sync_at"],
            last_sync_status=row["last_sync_status"],
            last_sync_error=row["last_sync_error"] or "",
            is_active=bool(row["is_active"]),
            created_at=row["created_at"],
            updated_at=row["updated_at"],
            sync_count=row["sync_count"],
        )

    @staticmethod
    def _remote_dir_chain(path: str) -> list[str]:
        """Return *path* and all its ancestors, shallowest first.

        Empty segments are dropped; a leading ``/`` is preserved.  Example:
        ``"/insightflow/p1"`` gives ``["/insightflow", "/insightflow/p1"]``.
        """
        prefix = "/" if path.startswith("/") else ""
        segments = [segment for segment in path.split("/") if segment]
        return [prefix + "/".join(segments[:depth]) for depth in range(1, len(segments) + 1)]

    async def test_connection(self, sync: WebDAVSync) -> dict:
        """Try to list the server root and report success or the error text."""
        if not WEBDAV_AVAILABLE:
            return {"success": False, "error": "WebDAV library not available"}

        try:
            client = webdav_client.Client(sync.server_url, auth=(sync.username, sync.password))
            # webdav4's listing method is ``ls``; the client has no ``list``.
            client.ls("/", detail=False)
            return {"success": True, "message": "Connection successful"}
        except Exception as e:
            return {"success": False, "error": str(e)}

    async def sync_project(self, sync: WebDAVSync) -> dict:
        """Export the project as JSON and upload it to the WebDAV server.

        The export is written to
        ``{remote_path}/{project_id}/project_export.json``, replacing any
        previous export.  On success the sync row gets a new ``last_sync_at``,
        status ``success`` and an incremented ``sync_count``; on any exception
        the status becomes ``failed`` and the error text is stored.

        Returns a dict with ``success`` and either result details or ``error``.
        """
        if not WEBDAV_AVAILABLE:
            return {"success": False, "error": "WebDAV library not available"}
        if not sync.is_active:
            return {"success": False, "error": "Sync is not active"}

        try:
            client = webdav_client.Client(sync.server_url, auth=(sync.username, sync.password))

            # Create the remote directory level by level; checking first means
            # an already existing directory is not an error.
            remote_project_path = f"{sync.remote_path}/{sync.project_id}"
            for directory in self._remote_dir_chain(remote_project_path):
                if not client.exists(directory):
                    client.mkdir(directory)

            project = self.pm.db.get_project(sync.project_id)
            if not project:
                return {"success": False, "error": "Project not found"}

            # Collect the project data to export.
            entities = self.pm.db.list_project_entities(sync.project_id)
            relations = self.pm.db.list_project_relations(sync.project_id)
            transcripts = self.pm.db.list_project_transcripts(sync.project_id)
            export_data = {
                "project": {
                    "id": project.id,
                    "name": project.name,
                    "description": project.description,
                },
                "entities": [{"id": e.id, "name": e.name, "type": e.type} for e in entities],
                "relations": relations,
                "transcripts": [{"id": t["id"], "filename": t["filename"]} for t in transcripts],
                "exported_at": datetime.now().isoformat(),
            }

            json_content = json.dumps(export_data, ensure_ascii=False, indent=2)
            json_path = f"{remote_project_path}/project_export.json"

            # Upload through a temporary file.  UTF-8 is explicit because the
            # JSON may contain non-ASCII text (ensure_ascii=False).
            import tempfile

            with tempfile.NamedTemporaryFile(
                mode="w", suffix=".json", delete=False, encoding="utf-8"
            ) as f:
                f.write(json_content)
                temp_path = f.name
            try:
                # overwrite=True: a previous export is expected on every run
                # after the first.
                client.upload_file(temp_path, json_path, overwrite=True)
            finally:
                os.unlink(temp_path)

            now = datetime.now().isoformat()
            with closing(self.pm.db.get_conn()) as conn:
                conn.execute(
                    """UPDATE webdav_syncs
                    SET last_sync_at = ?, last_sync_status = ?, sync_count = sync_count + 1
                    WHERE id = ?""",
                    (now, "success", sync.id),
                )
                conn.commit()

            return {
                "success": True,
                "message": "Project synced successfully",
                "entities_count": len(entities),
                "relations_count": len(relations),
                "remote_path": json_path,
            }
        except Exception as e:
            # Record the failure on the sync row.
            with closing(self.pm.db.get_conn()) as conn:
                conn.execute(
                    """UPDATE webdav_syncs
                    SET last_sync_status = ?, last_sync_error = ?
                    WHERE id = ?""",
                    ("failed", str(e), sync.id),
                )
                conn.commit()
            return {"success": False, "error": str(e)}


# Singleton instance
_plugin_manager = None


def get_plugin_manager(db_manager=None) -> "PluginManager":
    """Return the process-wide ``PluginManager``, creating it on first use.

    Args:
        db_manager: Database manager used when the singleton is created.  If
            the singleton already exists but has no database manager yet, a
            non-``None`` value is attached to it; otherwise it is ignored.

    Returns:
        The shared ``PluginManager`` instance.
    """
    global _plugin_manager
    if _plugin_manager is None:
        _plugin_manager = PluginManager(db_manager)
    elif db_manager is not None and _plugin_manager.db is None:
        # An earlier call created the manager without a database; without
        # this, every later DB operation would fail on ``None.get_conn()``.
        _plugin_manager.db = db_manager
    return _plugin_manager