Files
insightflow/backend/workflow_manager.py
OpenClaw Bot ea58b6fe43 fix: auto-fix code issues (cron)
- 修复重复导入/字段
- 修复异常处理
- 修复PEP8格式问题
- 添加类型注解
2026-03-01 00:08:06 +08:00

1500 lines
51 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""
InsightFlow Workflow Manager - Phase 7
智能工作流自动化模块
- 定时任务调度APScheduler
- 自动分析新上传文件
- 自动实体对齐和关系发现
- Webhook 通知系统飞书、钉钉、Slack
- 工作流配置管理
"""
import asyncio
import base64
import hashlib
import hmac
import json
import logging
import urllib.parse
import uuid
from collections.abc import Callable
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from enum import Enum
from typing import Any
import httpx
from apscheduler.events import EVENT_JOB_ERROR, EVENT_JOB_EXECUTED
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.triggers.cron import CronTrigger
from apscheduler.triggers.interval import IntervalTrigger
# Configure logging
# NOTE(review): basicConfig at import time configures the root logger; it may
# override a host application's logging setup — confirm this module is only
# imported by the service entry point.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class WorkflowStatus(Enum):
    """Workflow lifecycle status (stored as the string value in the DB)."""

    ACTIVE = "active"
    PAUSED = "paused"
    ERROR = "error"
    COMPLETED = "completed"
class WorkflowType(Enum):
    """Workflow type: selects the default action when a workflow has no tasks."""

    AUTO_ANALYZE = "auto_analyze"  # automatically analyze newly uploaded files
    AUTO_ALIGN = "auto_align"  # automatic entity alignment
    AUTO_RELATION = "auto_relation"  # automatic relation discovery
    SCHEDULED_REPORT = "scheduled_report"  # scheduled report generation
    CUSTOM = "custom"  # custom workflow (no built-in default action)
class WebhookType(Enum):
    """Supported webhook delivery channels."""

    FEISHU = "feishu"
    DINGTALK = "dingtalk"
    SLACK = "slack"
    CUSTOM = "custom"
class TaskStatus(Enum):
    """Execution status recorded in WorkflowLog.status."""

    PENDING = "pending"
    RUNNING = "running"
    SUCCESS = "success"
    FAILED = "failed"
    CANCELLED = "cancelled"
@dataclass
class WorkflowTask:
    """One step of a workflow.

    Steps run in ``order`` and may declare prerequisites via ``depends_on``
    (ids of tasks that must complete first). ``task_type`` picks the handler:
    analyze, align, discover_relations, notify, or custom.
    """

    id: str
    workflow_id: str
    name: str
    task_type: str  # analyze, align, discover_relations, notify, custom
    config: dict = field(default_factory=dict)
    order: int = 0
    depends_on: list[str] = field(default_factory=list)
    timeout_seconds: int = 300
    retry_count: int = 3
    retry_delay: int = 5
    created_at: str = ""
    updated_at: str = ""

    def __post_init__(self):
        # Stamp creation time on first construction; a brand-new task gets
        # identical created_at/updated_at timestamps.
        self.created_at = self.created_at or datetime.now().isoformat()
        self.updated_at = self.updated_at or self.created_at
@dataclass
class WebhookConfig:
"""Webhook 配置"""
id: str
name: str
webhook_type: str # feishu, dingtalk, slack, custom
url: str
secret: str = "" # 用于签名验证
headers: dict = field(default_factory=dict)
template: str = "" # 消息模板
is_active: bool = True
created_at: str = ""
updated_at: str = ""
last_used_at: str | None = None
success_count: int = 0
fail_count: int = 0
def __post_init__(self):
if not self.created_at:
self.created_at = datetime.now().isoformat()
if not self.updated_at:
self.updated_at = self.created_at
@dataclass
class Workflow:
"""工作流定义"""
id: str
name: str
description: str
workflow_type: str
project_id: str
status: str = "active"
schedule: str | None = None # cron expression or interval
schedule_type: str = "manual" # manual, cron, interval
config: dict = field(default_factory=dict)
webhook_ids: list[str] = field(default_factory=list)
is_active: bool = True
created_at: str = ""
updated_at: str = ""
last_run_at: str | None = None
next_run_at: str | None = None
run_count: int = 0
success_count: int = 0
fail_count: int = 0
def __post_init__(self):
if not self.created_at:
self.created_at = datetime.now().isoformat()
if not self.updated_at:
self.updated_at = self.created_at
@dataclass
class WorkflowLog:
"""工作流执行日志"""
id: str
workflow_id: str
task_id: str | None = None
status: str = "pending" # pending, running, success, failed, cancelled
start_time: str | None = None
end_time: str | None = None
duration_ms: int = 0
input_data: dict = field(default_factory=dict)
output_data: dict = field(default_factory=dict)
error_message: str = ""
created_at: str = ""
def __post_init__(self):
if not self.created_at:
self.created_at = datetime.now().isoformat()
class WebhookNotifier:
    """Webhook notifier supporting Feishu, DingTalk, Slack and custom endpoints."""

    def __init__(self):
        # One shared async client for all deliveries; call close() to release.
        self.http_client = httpx.AsyncClient(timeout=30.0)

    async def send(self, config: WebhookConfig, message: dict) -> bool:
        """Deliver *message* through the channel described by *config*.

        Returns True on confirmed success, False on any failure. Never
        raises, so a broken notification cannot take down a workflow run.
        """
        try:
            try:
                webhook_type = WebhookType(config.webhook_type)
            except ValueError:
                # BUG FIX: an unrecognized type string used to raise out of
                # WebhookType(...) before the custom fallback branch below
                # could run, making that branch unreachable. Treat unknown
                # types as custom webhooks instead.
                webhook_type = WebhookType.CUSTOM
            if webhook_type == WebhookType.FEISHU:
                return await self._send_feishu(config, message)
            elif webhook_type == WebhookType.DINGTALK:
                return await self._send_dingtalk(config, message)
            elif webhook_type == WebhookType.SLACK:
                return await self._send_slack(config, message)
            else:
                return await self._send_custom(config, message)
        except Exception as e:
            # Deliberately broad: besides TimeoutError/httpx.HTTPError the
            # senders can raise json.JSONDecodeError (non-JSON reply body) or
            # KeyError (missing message fields); all must map to False here.
            logger.error(f"Webhook send failed: {e}")
            return False

    async def _send_feishu(self, config: WebhookConfig, message: dict) -> bool:
        """Send a Feishu (Lark) custom-bot message (text / post / interactive)."""
        timestamp = str(int(datetime.now().timestamp()))
        if config.secret:
            # Feishu signing: "<timestamp>\n<secret>" is used as the HMAC
            # *key* with an empty message body, per the custom-bot docs.
            string_to_sign = f"{timestamp}\n{config.secret}"
            hmac_code = hmac.new(string_to_sign.encode("utf-8"), digestmod=hashlib.sha256).digest()
            sign = base64.b64encode(hmac_code).decode("utf-8")
        else:
            sign = ""
        # Choose the message shape from the keys the caller supplied.
        if "content" in message:
            # Plain text message
            payload = {
                "timestamp": timestamp,
                "sign": sign,
                "msg_type": "text",
                "content": {"text": message["content"]},
            }
        elif "title" in message:
            # Rich-text ("post") message
            payload = {
                "timestamp": timestamp,
                "sign": sign,
                "msg_type": "post",
                "content": {
                    "post": {
                        "zh_cn": {
                            "title": message.get("title", ""),
                            "content": message.get("body", []),
                        }
                    }
                },
            }
        else:
            # Interactive card message
            payload = {
                "timestamp": timestamp,
                "sign": sign,
                "msg_type": "interactive",
                "card": message.get("card", {}),
            }
        headers = {"Content-Type": "application/json", **config.headers}
        response = await self.http_client.post(config.url, json=payload, headers=headers)
        response.raise_for_status()
        result = response.json()
        # Feishu replies {"code": 0, ...} on success.
        return result.get("code") == 0

    async def _send_dingtalk(self, config: WebhookConfig, message: dict) -> bool:
        """Send a DingTalk robot message (text / markdown / link / actionCard)."""
        timestamp = str(round(datetime.now().timestamp() * 1000))
        if config.secret:
            # DingTalk signing: HMAC-SHA256 over "<timestamp>\n<secret>"
            # keyed by the secret, base64- then URL-encoded into the URL.
            secret_enc = config.secret.encode("utf-8")
            string_to_sign = f"{timestamp}\n{config.secret}"
            hmac_code = hmac.new(
                secret_enc, string_to_sign.encode("utf-8"), digestmod=hashlib.sha256
            ).digest()
            sign = urllib.parse.quote_plus(base64.b64encode(hmac_code))
            url = f"{config.url}&timestamp={timestamp}&sign={sign}"
        else:
            url = config.url
        # Choose the message shape from the keys the caller supplied.
        if "content" in message:
            payload = {"msgtype": "text", "text": {"content": message["content"]}}
        elif "title" in message:
            payload = {
                "msgtype": "markdown",
                "markdown": {"title": message["title"], "text": message.get("markdown", "")},
            }
        elif "link" in message:
            payload = {
                "msgtype": "link",
                "link": {
                    "text": message.get("text", ""),
                    "title": message["title"],
                    "picUrl": message.get("pic_url", ""),
                    "messageUrl": message["link"],
                },
            }
        else:
            payload = {"msgtype": "action_card", "action_card": message.get("action_card", {})}
        headers = {"Content-Type": "application/json", **config.headers}
        response = await self.http_client.post(url, json=payload, headers=headers)
        response.raise_for_status()
        result = response.json()
        # DingTalk replies {"errcode": 0, ...} on success.
        return result.get("errcode") == 0

    async def _send_slack(self, config: WebhookConfig, message: dict) -> bool:
        """Send a Slack incoming-webhook message."""
        # Slack accepts the standard webhook payload directly.
        payload = {
            "text": message.get("content", message.get("text", "")),
        }
        if "blocks" in message:
            payload["blocks"] = message["blocks"]
        if "attachments" in message:
            payload["attachments"] = message["attachments"]
        headers = {"Content-Type": "application/json", **config.headers}
        response = await self.http_client.post(config.url, json=payload, headers=headers)
        response.raise_for_status()
        # Slack incoming webhooks answer with the literal body "ok".
        return response.text == "ok"

    async def _send_custom(self, config: WebhookConfig, message: dict) -> bool:
        """POST *message* unchanged to a custom endpoint; any 2xx is success."""
        headers = {"Content-Type": "application/json", **config.headers}
        response = await self.http_client.post(config.url, json=message, headers=headers)
        response.raise_for_status()
        return True

    async def close(self):
        """Release the underlying HTTP client."""
        await self.http_client.aclose()
class WorkflowManager:
    """Workflow manager — central coordination class.

    Owns the APScheduler instance, the webhook notifier, the task-handler
    registry, and all workflow/task/webhook/log persistence via the injected
    ``db_manager``.
    """

    # Default task execution settings.
    # NOTE(review): WorkflowTask declares the same literals (300/3/5) as field
    # defaults instead of referencing these constants — keep them in sync.
    DEFAULT_TIMEOUT: int = 300
    DEFAULT_RETRY_COUNT: int = 3
    DEFAULT_RETRY_DELAY: int = 5
    def __init__(self, db_manager=None):
        # db_manager must expose get_conn() returning a DB-API connection
        # whose rows support mapping access (presumably sqlite3.Row — the
        # _row_to_* helpers index rows by column name).
        self.db = db_manager
        self.scheduler = AsyncIOScheduler()
        self.notifier = WebhookNotifier()
        # task_type -> async handler; extendable via register_task_handler().
        self._task_handlers: dict[str, Callable] = {}
        self._running_tasks: dict[str, asyncio.Task] = {}
        self._setup_default_handlers()
        # Log every job completion/failure fired by the scheduler.
        self.scheduler.add_listener(self._on_job_executed, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR)
def _setup_default_handlers(self) -> None:
"""设置默认的任务处理器"""
self._task_handlers = {
"analyze": self._handle_analyze_task,
"align": self._handle_align_task,
"discover_relations": self._handle_discover_relations_task,
"notify": self._handle_notify_task,
"custom": self._handle_custom_task,
}
def register_task_handler(self, task_type: str, handler: Callable) -> None:
"""注册自定义任务处理器"""
self._task_handlers[task_type] = handler
    def start(self) -> None:
        """Start the scheduler and (re)schedule persisted active workflows."""
        if not self.scheduler.running:
            self.scheduler.start()
            logger.info("Workflow scheduler started")
        # Load and schedule all active workflows in the background.
        # NOTE(review): asyncio.create_task requires a running event loop;
        # calling start() outside a loop raises RuntimeError here — confirm
        # callers always invoke this from async context.
        if self.db:
            asyncio.create_task(self._load_and_schedule_workflows())
def stop(self) -> None:
"""停止工作流管理器"""
if self.scheduler.running:
self.scheduler.shutdown(wait=True)
logger.info("Workflow scheduler stopped")
async def _load_and_schedule_workflows(self):
"""从数据库加载并调度所有活跃工作流"""
try:
workflows = self.list_workflows(status="active")
for workflow in workflows:
if workflow.schedule and workflow.is_active:
self._schedule_workflow(workflow)
except (TimeoutError, httpx.HTTPError) as e:
logger.error(f"Failed to load workflows: {e}")
    def _schedule_workflow(self, workflow: Workflow) -> None:
        """Register (or replace) the APScheduler job for *workflow*.

        schedule_type "cron" treats workflow.schedule as a crontab expression;
        "interval" treats it as a number of minutes. Any other schedule_type
        (e.g. "manual") leaves the workflow unscheduled.
        """
        job_id = f"workflow_{workflow.id}"
        # Drop a stale job with the same id before re-adding.
        if self.scheduler.get_job(job_id):
            self.scheduler.remove_job(job_id)
        if workflow.schedule_type == "cron":
            # Cron-expression scheduling.
            # NOTE(review): raises ValueError on a malformed crontab string.
            trigger = CronTrigger.from_crontab(workflow.schedule)
        elif workflow.schedule_type == "interval":
            # Interval scheduling: schedule stores minutes as a stringified int.
            interval_minutes = int(workflow.schedule)
            trigger = IntervalTrigger(minutes=interval_minutes)
        else:
            return
        self.scheduler.add_job(
            func=self._execute_workflow_job,
            trigger=trigger,
            id=job_id,
            args=[workflow.id],
            replace_existing=True,
            # Never run the same workflow concurrently; collapse missed fires.
            max_instances=1,
            coalesce=True,
        )
        logger.info(
            f"Scheduled workflow {workflow.id} ({workflow.name}) with {workflow.schedule_type}"
        )
async def _execute_workflow_job(self, workflow_id: str):
"""调度器调用的工作流执行函数"""
try:
await self.execute_workflow(workflow_id)
except (TimeoutError, httpx.HTTPError) as e:
logger.error(f"Scheduled workflow execution failed: {e}")
def _on_job_executed(self, event) -> None:
"""调度器事件处理"""
if event.exception:
logger.error(f"Job {event.job_id} failed: {event.exception}")
else:
logger.info(f"Job {event.job_id} executed successfully")
# ==================== Workflow CRUD ====================
    def create_workflow(self, workflow: Workflow) -> Workflow:
        """Persist a new workflow row; schedule it immediately if applicable.

        config and webhook_ids are JSON-encoded for storage. Returns the
        workflow unchanged.
        """
        conn = self.db.get_conn()
        try:
            conn.execute(
                """INSERT INTO workflows
                (id, name, description, workflow_type, project_id, status,
                schedule, schedule_type, config, webhook_ids, is_active,
                created_at, updated_at, last_run_at, next_run_at,
                run_count, success_count, fail_count)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""",
                (
                    workflow.id,
                    workflow.name,
                    workflow.description,
                    workflow.workflow_type,
                    workflow.project_id,
                    workflow.status,
                    workflow.schedule,
                    workflow.schedule_type,
                    json.dumps(workflow.config),
                    json.dumps(workflow.webhook_ids),
                    workflow.is_active,
                    workflow.created_at,
                    workflow.updated_at,
                    workflow.last_run_at,
                    workflow.next_run_at,
                    workflow.run_count,
                    workflow.success_count,
                    workflow.fail_count,
                ),
            )
            conn.commit()
            # Activate the schedule right away so callers need not restart
            # the manager for it to take effect.
            if workflow.schedule and workflow.is_active:
                self._schedule_workflow(workflow)
            return workflow
        finally:
            conn.close()
def get_workflow(self, workflow_id: str) -> Workflow | None:
"""获取工作流"""
conn = self.db.get_conn()
try:
row = conn.execute("SELECT * FROM workflows WHERE id = ?", (workflow_id,)).fetchone()
if not row:
return None
return self._row_to_workflow(row)
finally:
conn.close()
def list_workflows(
self, project_id: str = None, status: str = None, workflow_type: str = None
) -> list[Workflow]:
"""列出工作流"""
conn = self.db.get_conn()
try:
conditions = []
params = []
if project_id:
conditions.append("project_id = ?")
params.append(project_id)
if status:
conditions.append("status = ?")
params.append(status)
if workflow_type:
conditions.append("workflow_type = ?")
params.append(workflow_type)
where_clause = " AND ".join(conditions) if conditions else "1=1"
rows = conn.execute(
f"SELECT * FROM workflows WHERE {where_clause} ORDER BY created_at DESC", params
).fetchall()
return [self._row_to_workflow(row) for row in rows]
finally:
conn.close()
def update_workflow(self, workflow_id: str, **kwargs) -> Workflow | None:
"""更新工作流"""
conn = self.db.get_conn()
try:
allowed_fields = [
"name",
"description",
"status",
"schedule",
"schedule_type",
"is_active",
"config",
"webhook_ids",
]
updates = []
values = []
for f in allowed_fields:
if f in kwargs:
updates.append(f"{f} = ?")
if f in ["config", "webhook_ids"]:
values.append(json.dumps(kwargs[f]))
else:
values.append(kwargs[f])
if not updates:
return self.get_workflow(workflow_id)
updates.append("updated_at = ?")
values.append(datetime.now().isoformat())
values.append(workflow_id)
query = f"UPDATE workflows SET {', '.join(updates)} WHERE id = ?"
conn.execute(query, values)
conn.commit()
# 重新调度
workflow = self.get_workflow(workflow_id)
if workflow and workflow.schedule and workflow.is_active:
self._schedule_workflow(workflow)
elif workflow and not workflow.is_active:
job_id = f"workflow_{workflow_id}"
if self.scheduler.get_job(job_id):
self.scheduler.remove_job(job_id)
return workflow
finally:
conn.close()
def delete_workflow(self, workflow_id: str) -> bool:
"""删除工作流"""
conn = self.db.get_conn()
try:
# 移除调度
job_id = f"workflow_{workflow_id}"
if self.scheduler.get_job(job_id):
self.scheduler.remove_job(job_id)
# 删除相关任务
conn.execute("DELETE FROM workflow_tasks WHERE workflow_id = ?", (workflow_id,))
# 删除工作流
conn.execute("DELETE FROM workflows WHERE id = ?", (workflow_id,))
conn.commit()
return True
finally:
conn.close()
    def _row_to_workflow(self, row) -> Workflow:
        """Convert a database row into a Workflow object.

        JSON columns are decoded; NULL-able columns fall back to sensible
        defaults via ``or``.
        """
        return Workflow(
            id=row["id"],
            name=row["name"],
            description=row["description"] or "",
            workflow_type=row["workflow_type"],
            project_id=row["project_id"],
            status=row["status"],
            schedule=row["schedule"],
            schedule_type=row["schedule_type"],
            config=json.loads(row["config"]) if row["config"] else {},
            webhook_ids=json.loads(row["webhook_ids"]) if row["webhook_ids"] else [],
            is_active=bool(row["is_active"]),
            created_at=row["created_at"],
            updated_at=row["updated_at"],
            last_run_at=row["last_run_at"],
            next_run_at=row["next_run_at"],
            run_count=row["run_count"] or 0,
            success_count=row["success_count"] or 0,
            fail_count=row["fail_count"] or 0,
        )
# ==================== Workflow Task CRUD ====================
    def create_task(self, task: WorkflowTask) -> WorkflowTask:
        """Persist a new workflow task row and return it unchanged.

        config and depends_on are JSON-encoded; the dataclass field ``order``
        maps to the DB column ``task_order``.
        """
        conn = self.db.get_conn()
        try:
            conn.execute(
                """INSERT INTO workflow_tasks
                (id, workflow_id, name, task_type, config, task_order,
                depends_on, timeout_seconds, retry_count, retry_delay,
                created_at, updated_at)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""",
                (
                    task.id,
                    task.workflow_id,
                    task.name,
                    task.task_type,
                    json.dumps(task.config),
                    task.order,
                    json.dumps(task.depends_on),
                    task.timeout_seconds,
                    task.retry_count,
                    task.retry_delay,
                    task.created_at,
                    task.updated_at,
                ),
            )
            conn.commit()
            return task
        finally:
            conn.close()
def get_task(self, task_id: str) -> WorkflowTask | None:
"""获取任务"""
conn = self.db.get_conn()
try:
row = conn.execute("SELECT * FROM workflow_tasks WHERE id = ?", (task_id,)).fetchone()
if not row:
return None
return self._row_to_task(row)
finally:
conn.close()
def list_tasks(self, workflow_id: str) -> list[WorkflowTask]:
"""列出工作流的所有任务"""
conn = self.db.get_conn()
try:
rows = conn.execute(
"SELECT * FROM workflow_tasks WHERE workflow_id = ? ORDER BY task_order",
(workflow_id,),
).fetchall()
return [self._row_to_task(row) for row in rows]
finally:
conn.close()
    def update_task(self, task_id: str, **kwargs) -> WorkflowTask | None:
        """Update whitelisted task columns and return the refreshed task.

        config / depends_on values are JSON-encoded; unknown kwargs are
        ignored. Note the DB column is ``task_order``, not ``order``.
        """
        conn = self.db.get_conn()
        try:
            allowed_fields = [
                "name",
                "task_type",
                "config",
                "task_order",
                "depends_on",
                "timeout_seconds",
                "retry_count",
                "retry_delay",
            ]
            updates = []
            values = []
            for f in allowed_fields:
                if f in kwargs:
                    updates.append(f"{f} = ?")
                    if f in ["config", "depends_on"]:
                        values.append(json.dumps(kwargs[f]))
                    else:
                        values.append(kwargs[f])
            if not updates:
                return self.get_task(task_id)
            updates.append("updated_at = ?")
            values.append(datetime.now().isoformat())
            values.append(task_id)
            # Column names come from the whitelist above, never user input.
            query = f"UPDATE workflow_tasks SET {', '.join(updates)} WHERE id = ?"
            conn.execute(query, values)
            conn.commit()
            return self.get_task(task_id)
        finally:
            conn.close()
def delete_task(self, task_id: str) -> bool:
"""删除任务"""
conn = self.db.get_conn()
try:
conn.execute("DELETE FROM workflow_tasks WHERE id = ?", (task_id,))
conn.commit()
return True
finally:
conn.close()
    def _row_to_task(self, row) -> WorkflowTask:
        """Convert a database row into a WorkflowTask object.

        The DB column ``task_order`` maps back to the ``order`` field; JSON
        columns are decoded and NULLs fall back to the dataclass defaults.
        """
        return WorkflowTask(
            id=row["id"],
            workflow_id=row["workflow_id"],
            name=row["name"],
            task_type=row["task_type"],
            config=json.loads(row["config"]) if row["config"] else {},
            order=row["task_order"] or 0,
            depends_on=json.loads(row["depends_on"]) if row["depends_on"] else [],
            timeout_seconds=row["timeout_seconds"] or 300,
            retry_count=row["retry_count"] or 3,
            retry_delay=row["retry_delay"] or 5,
            created_at=row["created_at"],
            updated_at=row["updated_at"],
        )
# ==================== Webhook Config CRUD ====================
    def create_webhook(self, webhook: WebhookConfig) -> WebhookConfig:
        """Persist a new webhook configuration row and return it unchanged.

        headers are JSON-encoded for storage.
        """
        conn = self.db.get_conn()
        try:
            conn.execute(
                """INSERT INTO webhook_configs
                (id, name, webhook_type, url, secret, headers, template,
                is_active, created_at, updated_at, last_used_at,
                success_count, fail_count)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""",
                (
                    webhook.id,
                    webhook.name,
                    webhook.webhook_type,
                    webhook.url,
                    webhook.secret,
                    json.dumps(webhook.headers),
                    webhook.template,
                    webhook.is_active,
                    webhook.created_at,
                    webhook.updated_at,
                    webhook.last_used_at,
                    webhook.success_count,
                    webhook.fail_count,
                ),
            )
            conn.commit()
            return webhook
        finally:
            conn.close()
def get_webhook(self, webhook_id: str) -> WebhookConfig | None:
"""获取 Webhook 配置"""
conn = self.db.get_conn()
try:
row = conn.execute(
"SELECT * FROM webhook_configs WHERE id = ?", (webhook_id,)
).fetchone()
if not row:
return None
return self._row_to_webhook(row)
finally:
conn.close()
def list_webhooks(self) -> list[WebhookConfig]:
"""列出所有 Webhook 配置"""
conn = self.db.get_conn()
try:
rows = conn.execute("SELECT * FROM webhook_configs ORDER BY created_at DESC").fetchall()
return [self._row_to_webhook(row) for row in rows]
finally:
conn.close()
    def update_webhook(self, webhook_id: str, **kwargs) -> WebhookConfig | None:
        """Update whitelisted webhook columns and return the refreshed config.

        headers values are JSON-encoded; unknown kwargs are ignored.
        """
        conn = self.db.get_conn()
        try:
            allowed_fields = [
                "name",
                "webhook_type",
                "url",
                "secret",
                "headers",
                "template",
                "is_active",
            ]
            updates = []
            values = []
            for f in allowed_fields:
                if f in kwargs:
                    updates.append(f"{f} = ?")
                    if f == "headers":
                        values.append(json.dumps(kwargs[f]))
                    else:
                        values.append(kwargs[f])
            if not updates:
                return self.get_webhook(webhook_id)
            updates.append("updated_at = ?")
            values.append(datetime.now().isoformat())
            values.append(webhook_id)
            # Column names come from the whitelist above, never user input.
            query = f"UPDATE webhook_configs SET {', '.join(updates)} WHERE id = ?"
            conn.execute(query, values)
            conn.commit()
            return self.get_webhook(webhook_id)
        finally:
            conn.close()
def delete_webhook(self, webhook_id: str) -> bool:
"""删除 Webhook 配置"""
conn = self.db.get_conn()
try:
conn.execute("DELETE FROM webhook_configs WHERE id = ?", (webhook_id,))
conn.commit()
return True
finally:
conn.close()
def update_webhook_stats(self, webhook_id: str, success: bool) -> None:
"""更新 Webhook 统计"""
conn = self.db.get_conn()
try:
if success:
conn.execute(
"""UPDATE webhook_configs
SET success_count = success_count + 1, last_used_at = ?
WHERE id = ?""",
(datetime.now().isoformat(), webhook_id),
)
else:
conn.execute(
"""UPDATE webhook_configs
SET fail_count = fail_count + 1, last_used_at = ?
WHERE id = ?""",
(datetime.now().isoformat(), webhook_id),
)
conn.commit()
finally:
conn.close()
    def _row_to_webhook(self, row) -> WebhookConfig:
        """Convert a database row into a WebhookConfig object.

        headers is JSON-decoded; NULL-able columns fall back to defaults.
        """
        return WebhookConfig(
            id=row["id"],
            name=row["name"],
            webhook_type=row["webhook_type"],
            url=row["url"],
            secret=row["secret"] or "",
            headers=json.loads(row["headers"]) if row["headers"] else {},
            template=row["template"] or "",
            is_active=bool(row["is_active"]),
            created_at=row["created_at"],
            updated_at=row["updated_at"],
            last_used_at=row["last_used_at"],
            success_count=row["success_count"] or 0,
            fail_count=row["fail_count"] or 0,
        )
# ==================== Workflow Log ====================
    def create_log(self, log: WorkflowLog) -> WorkflowLog:
        """Persist a new workflow-log row and return it unchanged.

        input_data and output_data are JSON-encoded for storage.
        """
        conn = self.db.get_conn()
        try:
            conn.execute(
                """INSERT INTO workflow_logs
                (id, workflow_id, task_id, status, start_time, end_time,
                duration_ms, input_data, output_data, error_message, created_at)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""",
                (
                    log.id,
                    log.workflow_id,
                    log.task_id,
                    log.status,
                    log.start_time,
                    log.end_time,
                    log.duration_ms,
                    json.dumps(log.input_data),
                    json.dumps(log.output_data),
                    log.error_message,
                    log.created_at,
                ),
            )
            conn.commit()
            return log
        finally:
            conn.close()
    def update_log(self, log_id: str, **kwargs) -> WorkflowLog | None:
        """Update whitelisted columns of a log entry.

        output_data is JSON-encoded. Returns the refreshed log, or None when
        no whitelisted field was supplied (unlike the other update_* methods,
        which re-fetch the row).
        """
        conn = self.db.get_conn()
        try:
            allowed_fields = ["status", "end_time", "duration_ms", "output_data", "error_message"]
            updates = []
            values = []
            for f in allowed_fields:
                if f in kwargs:
                    updates.append(f"{f} = ?")
                    if f == "output_data":
                        values.append(json.dumps(kwargs[f]))
                    else:
                        values.append(kwargs[f])
            if not updates:
                return None
            values.append(log_id)
            # Column names come from the whitelist above, never user input.
            query = f"UPDATE workflow_logs SET {', '.join(updates)} WHERE id = ?"
            conn.execute(query, values)
            conn.commit()
            return self.get_log(log_id)
        finally:
            conn.close()
def get_log(self, log_id: str) -> WorkflowLog | None:
"""获取日志"""
conn = self.db.get_conn()
try:
row = conn.execute("SELECT * FROM workflow_logs WHERE id = ?", (log_id,)).fetchone()
if not row:
return None
return self._row_to_log(row)
finally:
conn.close()
def list_logs(
self,
workflow_id: str = None,
task_id: str = None,
status: str = None,
limit: int = 100,
offset: int = 0,
) -> list[WorkflowLog]:
"""列出工作流日志"""
conn = self.db.get_conn()
try:
conditions = []
params = []
if workflow_id:
conditions.append("workflow_id = ?")
params.append(workflow_id)
if task_id:
conditions.append("task_id = ?")
params.append(task_id)
if status:
conditions.append("status = ?")
params.append(status)
where_clause = " AND ".join(conditions) if conditions else "1=1"
rows = conn.execute(
f"""SELECT * FROM workflow_logs
WHERE {where_clause}
ORDER BY created_at DESC
LIMIT ? OFFSET ?""",
params + [limit, offset],
).fetchall()
return [self._row_to_log(row) for row in rows]
finally:
conn.close()
    def get_workflow_stats(self, workflow_id: str, days: int = 30) -> dict:
        """Aggregate execution statistics for the last *days* days.

        Returns total/success/failed run counts, success_rate (percent),
        avg_duration_ms, and a per-day breakdown for charting.
        """
        conn = self.db.get_conn()
        try:
            since = (datetime.now() - timedelta(days=days)).isoformat()
            # Total runs within the window
            total = conn.execute(
                "SELECT COUNT(*) FROM workflow_logs WHERE workflow_id = ? AND created_at > ?",
                (workflow_id, since),
            ).fetchone()[0]
            # Successful runs
            success = conn.execute(
                "SELECT COUNT(*) FROM workflow_logs WHERE workflow_id = ? AND status = 'success' AND created_at > ?",
                (workflow_id, since),
            ).fetchone()[0]
            # Failed runs
            failed = conn.execute(
                "SELECT COUNT(*) FROM workflow_logs WHERE workflow_id = ? AND status = 'failed' AND created_at > ?",
                (workflow_id, since),
            ).fetchone()[0]
            # Mean duration; AVG() yields NULL with no rows, hence the `or 0`.
            avg_duration = (
                conn.execute(
                    "SELECT AVG(duration_ms) FROM workflow_logs WHERE workflow_id = ? AND created_at > ?",
                    (workflow_id, since),
                ).fetchone()[0]
                or 0
            )
            # Per-day counts
            daily = conn.execute(
                """SELECT DATE(created_at) as date,
                COUNT(*) as count,
                SUM(CASE WHEN status = 'success' THEN 1 ELSE 0 END) as success
                FROM workflow_logs
                WHERE workflow_id = ? AND created_at > ?
                GROUP BY DATE(created_at)
                ORDER BY date""",
                (workflow_id, since),
            ).fetchall()
            return {
                "total": total,
                "success": success,
                "failed": failed,
                "success_rate": round(success / total * 100, 2) if total > 0 else 0,
                "avg_duration_ms": round(avg_duration, 2),
                "daily": [
                    {"date": r["date"], "count": r["count"], "success": r["success"]} for r in daily
                ],
            }
        finally:
            conn.close()
    def _row_to_log(self, row) -> WorkflowLog:
        """Convert a database row into a WorkflowLog object.

        input_data / output_data are JSON-decoded; NULLs fall back to defaults.
        """
        return WorkflowLog(
            id=row["id"],
            workflow_id=row["workflow_id"],
            task_id=row["task_id"],
            status=row["status"],
            start_time=row["start_time"],
            end_time=row["end_time"],
            duration_ms=row["duration_ms"] or 0,
            input_data=json.loads(row["input_data"]) if row["input_data"] else {},
            output_data=json.loads(row["output_data"]) if row["output_data"] else {},
            error_message=row["error_message"] or "",
            created_at=row["created_at"],
        )
# ==================== Workflow Execution ====================
async def execute_workflow(self, workflow_id: str, input_data: dict = None) -> dict:
"""执行工作流"""
workflow = self.get_workflow(workflow_id)
if not workflow:
raise ValueError(f"Workflow {workflow_id} not found")
if not workflow.is_active:
raise ValueError(f"Workflow {workflow_id} is not active")
# 更新最后运行时间
now = datetime.now().isoformat()
self.update_workflow(workflow_id, last_run_at=now, run_count=workflow.run_count + 1)
# 创建工作流执行日志
log = WorkflowLog(
id=str(uuid.uuid4())[:8],
workflow_id=workflow_id,
status=TaskStatus.RUNNING.value,
start_time=now,
input_data=input_data or {},
)
self.create_log(log)
start_time = datetime.now()
results = {}
try:
# 获取所有任务
tasks = self.list_tasks(workflow_id)
if not tasks:
# 没有任务时执行默认行为
results = await self._execute_default_workflow(workflow, input_data)
else:
# 按依赖顺序执行任务
results = await self._execute_tasks_with_deps(tasks, input_data, log.id)
# 发送通知
await self._send_workflow_notification(workflow, results, success=True)
# 更新日志为成功
end_time = datetime.now()
duration = int((end_time - start_time).total_seconds() * 1000)
self.update_log(
log.id,
status=TaskStatus.SUCCESS.value,
end_time=end_time.isoformat(),
duration_ms=duration,
output_data=results,
)
# 更新成功计数
self.update_workflow(workflow_id, success_count=workflow.success_count + 1)
return {
"success": True,
"workflow_id": workflow_id,
"log_id": log.id,
"results": results,
"duration_ms": duration,
}
except (TimeoutError, httpx.HTTPError) as e:
logger.error(f"Workflow {workflow_id} execution failed: {e}")
# 更新日志为失败
end_time = datetime.now()
duration = int((end_time - start_time).total_seconds() * 1000)
self.update_log(
log.id,
status=TaskStatus.FAILED.value,
end_time=end_time.isoformat(),
duration_ms=duration,
error_message=str(e),
)
# 更新失败计数
self.update_workflow(workflow_id, fail_count=workflow.fail_count + 1)
# 发送失败通知
await self._send_workflow_notification(workflow, {"error": str(e)}, success=False)
raise
async def _execute_tasks_with_deps(
self, tasks: list[WorkflowTask], input_data: dict, log_id: str
) -> dict:
"""按依赖顺序执行任务"""
results = {}
completed_tasks = set()
while len(completed_tasks) < len(tasks):
# 找到可以执行的任务(依赖已完成)
ready_tasks = [
t
for t in tasks
if t.id not in completed_tasks
and all(dep in completed_tasks for dep in t.depends_on)
]
if not ready_tasks:
# 有循环依赖或无法完成的任务
raise ValueError("Circular dependency detected or tasks cannot be resolved")
# 并行执行就绪的任务
task_coros = []
for task in ready_tasks:
task_input = {**input_data, **results}
task_coros.append(self._execute_single_task(task, task_input, log_id))
task_results = await asyncio.gather(*task_coros, return_exceptions=True)
for task, result in zip(ready_tasks, task_results):
if isinstance(result, Exception):
logger.error(f"Task {task.id} failed: {result}")
if task.retry_count > 0:
# 重试逻辑
for attempt in range(task.retry_count):
await asyncio.sleep(task.retry_delay)
try:
result = await self._execute_single_task(task, task_input, log_id)
break
except (TimeoutError, httpx.HTTPError) as e:
logger.error(f"Task {task.id} retry {attempt + 1} failed: {e}")
if attempt == task.retry_count - 1:
raise
else:
raise result
results[task.name] = result
completed_tasks.add(task.id)
return results
    async def _execute_single_task(self, task: WorkflowTask, input_data: dict, log_id: str) -> Any:
        """Run one task through its registered handler, with a timeout.

        Writes a per-task WorkflowLog entry (running -> success/failed) and
        returns whatever the handler returns.

        Raises:
            ValueError: no handler is registered for task.task_type.
            TimeoutError: the handler exceeded task.timeout_seconds.
        """
        handler = self._task_handlers.get(task.task_type)
        if not handler:
            raise ValueError(f"No handler for task type: {task.task_type}")
        # Per-task log entry, separate from the workflow-level one.
        task_log = WorkflowLog(
            id=str(uuid.uuid4())[:8],
            workflow_id=task.workflow_id,
            task_id=task.id,
            status=TaskStatus.RUNNING.value,
            start_time=datetime.now().isoformat(),
            input_data=input_data,
        )
        self.create_log(task_log)
        try:
            # asyncio.wait_for raises asyncio.TimeoutError, which is an alias
            # of builtin TimeoutError on Python 3.11+ — the except clause
            # below relies on that. NOTE(review): on <=3.10 they differ;
            # confirm the deployment target.
            result = await asyncio.wait_for(handler(task, input_data), timeout=task.timeout_seconds)
            self.update_log(
                task_log.id,
                status=TaskStatus.SUCCESS.value,
                end_time=datetime.now().isoformat(),
                # Wrap non-dict results so output_data stays JSON-serializable.
                output_data={"result": result} if not isinstance(result, dict) else result,
            )
            return result
        except TimeoutError:
            self.update_log(
                task_log.id,
                status=TaskStatus.FAILED.value,
                end_time=datetime.now().isoformat(),
                error_message="Task timeout",
            )
            raise TimeoutError(f"Task {task.id} timed out after {task.timeout_seconds}s")
        except Exception as e:
            # Record the failure before propagating to the workflow runner.
            self.update_log(
                task_log.id,
                status=TaskStatus.FAILED.value,
                end_time=datetime.now().isoformat(),
                error_message=str(e),
            )
            raise
async def _execute_default_workflow(self, workflow: Workflow, input_data: dict) -> dict:
"""执行默认工作流(根据类型)"""
workflow_type = WorkflowType(workflow.workflow_type)
if workflow_type == WorkflowType.AUTO_ANALYZE:
return await self._auto_analyze_files(workflow, input_data)
elif workflow_type == WorkflowType.AUTO_ALIGN:
return await self._auto_align_entities(workflow, input_data)
elif workflow_type == WorkflowType.AUTO_RELATION:
return await self._auto_discover_relations(workflow, input_data)
elif workflow_type == WorkflowType.SCHEDULED_REPORT:
return await self._generate_scheduled_report(workflow, input_data)
else:
return {"message": "No default action for custom workflow"}
# ==================== Default Task Handlers ====================
async def _handle_analyze_task(self, task: WorkflowTask, input_data: dict) -> dict:
"""处理分析任务"""
project_id = input_data.get("project_id")
file_ids = input_data.get("file_ids", [])
if not project_id:
raise ValueError("project_id required for analyze task")
# 这里调用现有的文件分析逻辑
# 实际实现需要与 main.py 中的 upload_audio 逻辑集成
return {
"task": "analyze",
"project_id": project_id,
"files_processed": len(file_ids),
"status": "completed",
}
async def _handle_align_task(self, task: WorkflowTask, input_data: dict) -> dict:
"""处理实体对齐任务"""
project_id = input_data.get("project_id")
threshold = task.config.get("threshold", 0.85)
if not project_id:
raise ValueError("project_id required for align task")
# 这里调用实体对齐逻辑
return {
"task": "align",
"project_id": project_id,
"threshold": threshold,
"entities_merged": 0, # 实际实现需要调用对齐逻辑
"status": "completed",
}
async def _handle_discover_relations_task(self, task: WorkflowTask, input_data: dict) -> dict:
"""处理关系发现任务"""
project_id = input_data.get("project_id")
if not project_id:
raise ValueError("project_id required for discover_relations task")
# 这里调用关系发现逻辑
return {
"task": "discover_relations",
"project_id": project_id,
"relations_found": 0, # 实际实现需要调用关系发现逻辑
"status": "completed",
}
    async def _handle_notify_task(self, task: WorkflowTask, input_data: dict) -> dict:
        """Handle a 'notify' task: send a webhook message.

        task.config must contain "webhook_id"; config["message"] is the
        default payload. If the webhook defines a template, it is
        str.format()-ed with input_data and parsed as JSON.
        """
        webhook_id = task.config.get("webhook_id")
        message = task.config.get("message", {})
        if not webhook_id:
            raise ValueError("webhook_id required for notify task")
        webhook = self.get_webhook(webhook_id)
        if not webhook:
            raise ValueError(f"Webhook {webhook_id} not found")
        # Render the template against the task input; on any template error
        # fall back to the configured message (deliberate best effort).
        if webhook.template:
            try:
                message = json.loads(webhook.template.format(**input_data))
            except (json.JSONDecodeError, KeyError, ValueError):
                pass
        success = await self.notifier.send(webhook, message)
        self.update_webhook_stats(webhook_id, success)
        return {"task": "notify", "webhook_id": webhook_id, "success": success}
async def _handle_custom_task(self, task: WorkflowTask, input_data: dict) -> dict:
"""处理自定义任务"""
# 自定义任务的具体逻辑由外部处理器实现
return {
"task": "custom",
"task_name": task.name,
"config": task.config,
"status": "completed",
}
# ==================== Default Workflow Implementations ====================
async def _auto_analyze_files(self, workflow: Workflow, input_data: dict) -> dict:
"""自动分析新上传的文件"""
project_id = workflow.project_id
# 获取未分析的文件(实际实现需要查询数据库)
# 这里是一个示例实现
return {
"workflow_type": "auto_analyze",
"project_id": project_id,
"files_analyzed": 0,
"entities_extracted": 0,
"relations_extracted": 0,
"status": "completed",
}
async def _auto_align_entities(self, workflow: Workflow, input_data: dict) -> dict:
"""自动实体对齐"""
project_id = workflow.project_id
threshold = workflow.config.get("threshold", 0.85)
return {
"workflow_type": "auto_align",
"project_id": project_id,
"threshold": threshold,
"entities_merged": 0,
"status": "completed",
}
async def _auto_discover_relations(self, workflow: Workflow, input_data: dict) -> dict:
"""自动关系发现"""
project_id = workflow.project_id
return {
"workflow_type": "auto_relation",
"project_id": project_id,
"relations_discovered": 0,
"status": "completed",
}
async def _generate_scheduled_report(self, workflow: Workflow, input_data: dict) -> dict:
"""生成定时报告"""
project_id = workflow.project_id
report_type = workflow.config.get("report_type", "summary")
return {
"workflow_type": "scheduled_report",
"project_id": project_id,
"report_type": report_type,
"status": "completed",
}
# ==================== Notification ====================
async def _send_workflow_notification(
self, workflow: Workflow, results: dict, success: bool = True
):
"""发送工作流执行通知"""
if not workflow.webhook_ids:
return
for webhook_id in workflow.webhook_ids:
webhook = self.get_webhook(webhook_id)
if not webhook or not webhook.is_active:
continue
# 构建通知消息
if webhook.webhook_type == WebhookType.FEISHU.value:
message = self._build_feishu_message(workflow, results, success)
elif webhook.webhook_type == WebhookType.DINGTALK.value:
message = self._build_dingtalk_message(workflow, results, success)
elif webhook.webhook_type == WebhookType.SLACK.value:
message = self._build_slack_message(workflow, results, success)
else:
message = {
"workflow_id": workflow.id,
"workflow_name": workflow.name,
"status": "success" if success else "failed",
"results": results,
"timestamp": datetime.now().isoformat(),
}
try:
result = await self.notifier.send(webhook, message)
self.update_webhook_stats(webhook_id, result)
except (TimeoutError, httpx.HTTPError) as e:
logger.error(f"Failed to send notification to {webhook_id}: {e}")
def _build_feishu_message(self, workflow: Workflow, results: dict, success: bool) -> dict:
"""构建飞书消息"""
status_text = "✅ 成功" if success else "❌ 失败"
return {
"title": f"工作流执行通知: {workflow.name}",
"body": [
[{"tag": "text", "text": f"工作流: {workflow.name}"}],
[{"tag": "text", "text": f"状态: {status_text}"}],
[{"tag": "text", "text": f"时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}"}],
],
}
def _build_dingtalk_message(self, workflow: Workflow, results: dict, success: bool) -> dict:
"""构建钉钉消息"""
status_text = "✅ 成功" if success else "❌ 失败"
return {
"title": f"工作流执行通知: {workflow.name}",
"markdown": f"""### 工作流执行通知
**工作流:** {workflow.name}
**状态:** {status_text}
**时间:** {datetime.now().strftime("%Y-%m-%d %H:%M:%S")}
**结果:**
```json
{json.dumps(results, ensure_ascii=False, indent=2)}
```
""",
}
def _build_slack_message(self, workflow: Workflow, results: dict, success: bool) -> dict:
"""构建 Slack 消息"""
color = "#36a64f" if success else "#ff0000"
status_text = "Success" if success else "Failed"
return {
"attachments": [
{
"color": color,
"title": f"Workflow Execution: {workflow.name}",
"fields": [
{"title": "Status", "value": status_text, "short": True},
{
"title": "Time",
"value": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
"short": True,
},
],
"footer": "InsightFlow",
"ts": int(datetime.now().timestamp()),
}
]
}
# Singleton instance
_workflow_manager = None


def get_workflow_manager(db_manager=None) -> WorkflowManager:
    """Return the process-wide WorkflowManager, creating it on first call.

    Note: *db_manager* is only used when the singleton is first constructed;
    it is ignored on subsequent calls.
    """
    global _workflow_manager
    if _workflow_manager is None:
        _workflow_manager = WorkflowManager(db_manager)
    return _workflow_manager