Phase 7: Add workflow automation module

- Create workflow_manager.py with APScheduler integration
- Add WebhookNotifier supporting Feishu/DingTalk/Slack
- Update schema.sql with workflows, workflow_logs, webhook_configs tables
- Add workflow API endpoints (CRUD, trigger, logs)
- Add webhook API endpoints (CRUD, test)
- Update requirements.txt with APScheduler dependency
- Update STATUS.md with Phase 7 progress
This commit is contained in:
OpenClaw Bot
2026-02-23 00:05:41 +08:00
parent 0975de7f0a
commit 2e8f160f8b
5 changed files with 2327 additions and 12 deletions

View File

@@ -100,6 +100,17 @@ except ImportError as e:
print(f"Rate Limiter import error: {e}")
RATE_LIMITER_AVAILABLE = False
# Phase 7: Workflow Manager
try:
from workflow_manager import (
get_workflow_manager, WorkflowManager, Workflow, WorkflowTask,
WebhookConfig, WorkflowLog, WorkflowType, WebhookType, TaskStatus
)
WORKFLOW_AVAILABLE = True
except ImportError as e:
print(f"Workflow Manager import error: {e}")
WORKFLOW_AVAILABLE = False
# FastAPI app with enhanced metadata for Swagger
app = FastAPI(
title="InsightFlow API",
@@ -115,6 +126,7 @@ app = FastAPI(
* **知识推理** - 因果推理、对比分析、时序分析
* **图分析** - Neo4j 图数据库集成、路径查询
* **导出功能** - 多种格式导出PDF、Excel、CSV、JSON
* **工作流** - 自动化任务、Webhook 通知
## 认证
@@ -123,7 +135,7 @@ app = FastAPI(
X-API-Key: your_api_key_here
```
""",
version="0.6.0",
version="0.7.0",
contact={
"name": "InsightFlow Team",
"url": "https://github.com/insightflow/insightflow",
@@ -141,6 +153,8 @@ app = FastAPI(
{"name": "Graph", "description": "图分析和 Neo4j"},
{"name": "Export", "description": "数据导出"},
{"name": "API Keys", "description": "API 密钥管理"},
{"name": "Workflows", "description": "工作流自动化"},
{"name": "Webhooks", "description": "Webhook 配置"},
{"name": "System", "description": "系统信息"},
]
)
@@ -454,6 +468,173 @@ class GlossaryTermCreate(BaseModel):
term: str
pronunciation: Optional[str] = ""
# ==================== Phase 7: Workflow Pydantic Models ====================
class WorkflowCreate(BaseModel):
name: str = Field(..., description="工作流名称")
description: str = Field(default="", description="工作流描述")
workflow_type: str = Field(..., description="工作流类型: auto_analyze, auto_align, auto_relation, scheduled_report, custom")
project_id: str = Field(..., description="所属项目ID")
schedule: Optional[str] = Field(default=None, description="调度表达式(cron或分钟数)")
schedule_type: str = Field(default="manual", description="调度类型: manual, cron, interval")
config: Dict = Field(default_factory=dict, description="工作流配置")
webhook_ids: List[str] = Field(default_factory=list, description="关联的Webhook ID列表")
class WorkflowUpdate(BaseModel):
name: Optional[str] = None
description: Optional[str] = None
status: Optional[str] = None # active, paused, error, completed
schedule: Optional[str] = None
schedule_type: Optional[str] = None
is_active: Optional[bool] = None
config: Optional[Dict] = None
webhook_ids: Optional[List[str]] = None
class WorkflowResponse(BaseModel):
id: str
name: str
description: str
workflow_type: str
project_id: str
status: str
schedule: Optional[str]
schedule_type: str
config: Dict
webhook_ids: List[str]
is_active: bool
created_at: str
updated_at: str
last_run_at: Optional[str]
next_run_at: Optional[str]
run_count: int
success_count: int
fail_count: int
class WorkflowListResponse(BaseModel):
workflows: List[WorkflowResponse]
total: int
class WorkflowTaskCreate(BaseModel):
name: str = Field(..., description="任务名称")
task_type: str = Field(..., description="任务类型: analyze, align, discover_relations, notify, custom")
config: Dict = Field(default_factory=dict, description="任务配置")
order: int = Field(default=0, description="执行顺序")
depends_on: List[str] = Field(default_factory=list, description="依赖的任务ID列表")
timeout_seconds: int = Field(default=300, description="超时时间(秒)")
retry_count: int = Field(default=3, description="重试次数")
retry_delay: int = Field(default=5, description="重试延迟(秒)")
class WorkflowTaskUpdate(BaseModel):
name: Optional[str] = None
task_type: Optional[str] = None
config: Optional[Dict] = None
order: Optional[int] = None
depends_on: Optional[List[str]] = None
timeout_seconds: Optional[int] = None
retry_count: Optional[int] = None
retry_delay: Optional[int] = None
class WorkflowTaskResponse(BaseModel):
id: str
workflow_id: str
name: str
task_type: str
config: Dict
order: int
depends_on: List[str]
timeout_seconds: int
retry_count: int
retry_delay: int
created_at: str
updated_at: str
class WebhookCreate(BaseModel):
name: str = Field(..., description="Webhook名称")
webhook_type: str = Field(..., description="Webhook类型: feishu, dingtalk, slack, custom")
url: str = Field(..., description="Webhook URL")
secret: str = Field(default="", description="签名密钥")
headers: Dict = Field(default_factory=dict, description="自定义请求头")
template: str = Field(default="", description="消息模板")
class WebhookUpdate(BaseModel):
name: Optional[str] = None
webhook_type: Optional[str] = None
url: Optional[str] = None
secret: Optional[str] = None
headers: Optional[Dict] = None
template: Optional[str] = None
is_active: Optional[bool] = None
class WebhookResponse(BaseModel):
id: str
name: str
webhook_type: str
url: str
headers: Dict
template: str
is_active: bool
created_at: str
updated_at: str
last_used_at: Optional[str]
success_count: int
fail_count: int
class WebhookListResponse(BaseModel):
webhooks: List[WebhookResponse]
total: int
class WorkflowLogResponse(BaseModel):
id: str
workflow_id: str
task_id: Optional[str]
status: str
start_time: Optional[str]
end_time: Optional[str]
duration_ms: int
input_data: Dict
output_data: Dict
error_message: str
created_at: str
class WorkflowLogListResponse(BaseModel):
logs: List[WorkflowLogResponse]
total: int
class WorkflowTriggerRequest(BaseModel):
input_data: Dict = Field(default_factory=dict, description="工作流输入数据")
class WorkflowTriggerResponse(BaseModel):
success: bool
workflow_id: str
log_id: str
results: Dict
duration_ms: int
class WorkflowStatsResponse(BaseModel):
total: int
success: int
failed: int
success_rate: float
avg_duration_ms: float
daily: List[Dict]
# API Keys
KIMI_API_KEY = os.getenv("KIMI_API_KEY", "")
KIMI_BASE_URL = os.getenv("KIMI_BASE_URL", "https://api.kimi.com/coding")
@@ -3208,8 +3389,8 @@ async def health_check():
async def system_status():
"""系统状态信息"""
status = {
"version": "0.6.0",
"phase": "Phase 6 - API Platform",
"version": "0.7.0",
"phase": "Phase 7 - Workflow Automation",
"features": {
"database": DB_AVAILABLE,
"oss": OSS_AVAILABLE,
@@ -3219,6 +3400,7 @@ async def system_status():
"export": EXPORT_AVAILABLE,
"api_keys": API_KEY_AVAILABLE,
"rate_limiting": RATE_LIMITER_AVAILABLE,
"workflow": WORKFLOW_AVAILABLE,
},
"api": {
"documentation": "/docs",
@@ -3230,6 +3412,443 @@ async def system_status():
return status
# ==================== Phase 7: Workflow Automation Endpoints ====================
class WorkflowCreateRequest(BaseModel):
"""创建工作流请求"""
name: str = Field(..., description="工作流名称")
task_type: str = Field(..., description="工作流类型: auto_analyze, auto_align, auto_relation, custom")
config: Dict = Field(default={}, description="工作流配置")
trigger_type: str = Field(default="manual", description="触发方式: schedule, event, manual")
schedule: Optional[str] = Field(default=None, description="定时规则 (Cron 表达式或间隔秒数)")
project_id: Optional[str] = Field(default=None, description="关联项目ID")
enabled: bool = Field(default=True, description="是否启用")
class WorkflowResponse(BaseModel):
"""工作流响应"""
id: str
name: str
task_type: str
config: Dict
trigger_type: str
schedule: Optional[str]
project_id: Optional[str]
enabled: bool
created_at: str
updated_at: str
last_run_at: Optional[str]
run_count: int
fail_count: int
class WorkflowLogResponse(BaseModel):
"""工作流日志响应"""
id: str
workflow_id: str
status: str
started_at: str
completed_at: Optional[str]
result: Optional[Dict]
error_message: Optional[str]
created_at: str
class WebhookCreateRequest(BaseModel):
"""创建 Webhook 请求"""
name: str = Field(..., description="Webhook 名称")
webhook_type: str = Field(..., description="类型: feishu, dingtalk, slack, custom")
url: str = Field(..., description="Webhook URL")
secret: Optional[str] = Field(default=None, description="密钥")
headers: Dict = Field(default={}, description="自定义请求头")
project_id: Optional[str] = Field(default=None, description="关联项目ID")
events: List[str] = Field(default=[], description="订阅的事件列表")
enabled: bool = Field(default=True, description="是否启用")
class WebhookResponse(BaseModel):
"""Webhook 响应"""
id: str
name: str
webhook_type: str
url: str
headers: Dict
project_id: Optional[str]
events: List[str]
enabled: bool
created_at: str
updated_at: str
@app.post("/api/v1/workflows", response_model=WorkflowResponse, tags=["Workflows"])
async def create_workflow(request: WorkflowCreateRequest, _=Depends(verify_api_key)):
"""
创建工作流
工作流类型:
- **auto_analyze**: 自动分析新上传的文件
- **auto_align**: 自动实体对齐
- **auto_relation**: 自动关系发现
- **custom**: 自定义工作流
触发方式:
- **manual**: 手动触发
- **schedule**: 定时触发 (需要设置 schedule 字段)
- **event**: 事件触发
定时规则示例:
- `cron:0 9 * * *` - 每天上午9点
- `interval:3600` - 每小时执行一次
- `60` - 每60分钟执行一次
"""
if not WORKFLOW_AVAILABLE:
raise HTTPException(status_code=503, detail="Workflow automation not available")
manager = get_workflow_manager()
try:
workflow = await manager.create_workflow(
name=request.name,
task_type=WorkflowType(request.task_type),
config=request.config,
trigger_type=WorkflowTrigger(request.trigger_type),
schedule=request.schedule,
project_id=request.project_id,
enabled=request.enabled
)
return WorkflowResponse(
id=workflow.id,
name=workflow.name,
task_type=workflow.task_type.value,
config=workflow.config,
trigger_type=workflow.trigger_type.value,
schedule=workflow.schedule,
project_id=workflow.project_id,
enabled=workflow.enabled,
created_at=workflow.created_at.isoformat(),
updated_at=workflow.updated_at.isoformat(),
last_run_at=workflow.last_run_at.isoformat() if workflow.last_run_at else None,
run_count=workflow.run_count,
fail_count=workflow.fail_count
)
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))
@app.get("/api/v1/workflows", response_model=List[WorkflowResponse], tags=["Workflows"])
async def list_workflows(project_id: Optional[str] = None, _=Depends(verify_api_key)):
"""获取工作流列表"""
if not WORKFLOW_AVAILABLE:
raise HTTPException(status_code=503, detail="Workflow automation not available")
manager = get_workflow_manager()
workflows = await manager.get_workflows(project_id)
return [
WorkflowResponse(
id=w.id,
name=w.name,
task_type=w.task_type.value,
config=w.config,
trigger_type=w.trigger_type.value,
schedule=w.schedule,
project_id=w.project_id,
enabled=w.enabled,
created_at=w.created_at.isoformat(),
updated_at=w.updated_at.isoformat(),
last_run_at=w.last_run_at.isoformat() if w.last_run_at else None,
run_count=w.run_count,
fail_count=w.fail_count
)
for w in workflows
]
@app.get("/api/v1/workflows/{workflow_id}", response_model=WorkflowResponse, tags=["Workflows"])
async def get_workflow(workflow_id: str, _=Depends(verify_api_key)):
"""获取单个工作流详情"""
if not WORKFLOW_AVAILABLE:
raise HTTPException(status_code=503, detail="Workflow automation not available")
manager = get_workflow_manager()
workflow = await manager.get_workflow(workflow_id)
if not workflow:
raise HTTPException(status_code=404, detail="Workflow not found")
return WorkflowResponse(
id=workflow.id,
name=workflow.name,
task_type=workflow.task_type.value,
config=workflow.config,
trigger_type=workflow.trigger_type.value,
schedule=w.schedule,
project_id=workflow.project_id,
enabled=workflow.enabled,
created_at=workflow.created_at.isoformat(),
updated_at=workflow.updated_at.isoformat(),
last_run_at=workflow.last_run_at.isoformat() if workflow.last_run_at else None,
run_count=workflow.run_count,
fail_count=workflow.fail_count
)
@app.patch("/api/v1/workflows/{workflow_id}", response_model=WorkflowResponse, tags=["Workflows"])
async def update_workflow(workflow_id: str, request: WorkflowCreateRequest, _=Depends(verify_api_key)):
"""更新工作流"""
if not WORKFLOW_AVAILABLE:
raise HTTPException(status_code=503, detail="Workflow automation not available")
manager = get_workflow_manager()
update_data = request.dict(exclude_unset=True)
workflow = await manager.update_workflow(workflow_id, **update_data)
if not workflow:
raise HTTPException(status_code=404, detail="Workflow not found")
return WorkflowResponse(
id=workflow.id,
name=workflow.name,
task_type=workflow.task_type.value,
config=workflow.config,
trigger_type=workflow.trigger_type.value,
schedule=workflow.schedule,
project_id=workflow.project_id,
enabled=workflow.enabled,
created_at=workflow.created_at.isoformat(),
updated_at=workflow.updated_at.isoformat(),
last_run_at=workflow.last_run_at.isoformat() if workflow.last_run_at else None,
run_count=workflow.run_count,
fail_count=workflow.fail_count
)
@app.delete("/api/v1/workflows/{workflow_id}", tags=["Workflows"])
async def delete_workflow(workflow_id: str, _=Depends(verify_api_key)):
"""删除工作流"""
if not WORKFLOW_AVAILABLE:
raise HTTPException(status_code=503, detail="Workflow automation not available")
manager = get_workflow_manager()
success = await manager.delete_workflow(workflow_id)
if not success:
raise HTTPException(status_code=404, detail="Workflow not found")
return {"message": "Workflow deleted successfully"}
@app.post("/api/v1/workflows/{workflow_id}/trigger", response_model=WorkflowLogResponse, tags=["Workflows"])
async def trigger_workflow(workflow_id: str, _=Depends(verify_api_key)):
"""手动触发工作流"""
if not WORKFLOW_AVAILABLE:
raise HTTPException(status_code=503, detail="Workflow automation not available")
manager = get_workflow_manager()
log = await manager.trigger_workflow(workflow_id)
if not log:
raise HTTPException(status_code=404, detail="Workflow not found")
return WorkflowLogResponse(
id=log.id,
workflow_id=log.workflow_id,
status=log.status.value,
started_at=log.started_at.isoformat(),
completed_at=log.completed_at.isoformat() if log.completed_at else None,
result=log.result,
error_message=log.error_message,
created_at=log.created_at.isoformat()
)
@app.get("/api/v1/workflows/{workflow_id}/logs", response_model=List[WorkflowLogResponse], tags=["Workflows"])
async def get_workflow_logs(workflow_id: str, limit: int = 50, _=Depends(verify_api_key)):
"""获取工作流执行日志"""
if not WORKFLOW_AVAILABLE:
raise HTTPException(status_code=503, detail="Workflow automation not available")
manager = get_workflow_manager()
logs = await manager.get_workflow_logs(workflow_id, limit)
return [
WorkflowLogResponse(
id=log.id,
workflow_id=log.workflow_id,
status=log.status.value,
started_at=log.started_at.isoformat(),
completed_at=log.completed_at.isoformat() if log.completed_at else None,
result=log.result,
error_message=log.error_message,
created_at=log.created_at.isoformat()
)
for log in logs
]
# Webhook Endpoints
@app.post("/api/v1/webhooks", response_model=WebhookResponse, tags=["Workflows"])
async def create_webhook(request: WebhookCreateRequest, _=Depends(verify_api_key)):
"""
创建 Webhook 配置
Webhook 类型:
- **feishu**: 飞书机器人
- **dingtalk**: 钉钉机器人
- **slack**: Slack Incoming Webhook
- **custom**: 自定义 Webhook
事件类型:
- **workflow_completed**: 工作流完成
- **workflow_failed**: 工作流失败
- **entity_created**: 实体创建
- **relation_created**: 关系创建
"""
if not WORKFLOW_AVAILABLE:
raise HTTPException(status_code=503, detail="Workflow automation not available")
manager = get_workflow_manager()
try:
webhook = await manager.create_webhook(
name=request.name,
webhook_type=WebhookType(request.webhook_type),
url=request.url,
secret=request.secret,
headers=request.headers,
project_id=request.project_id,
events=request.events
)
return WebhookResponse(
id=webhook.id,
name=webhook.name,
webhook_type=webhook.webhook_type.value,
url=webhook.url,
headers=webhook.headers,
project_id=webhook.project_id,
events=webhook.events,
enabled=webhook.enabled,
created_at=webhook.created_at.isoformat(),
updated_at=webhook.updated_at.isoformat()
)
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))
@app.get("/api/v1/webhooks", response_model=List[WebhookResponse], tags=["Workflows"])
async def list_webhooks(project_id: Optional[str] = None, _=Depends(verify_api_key)):
"""获取 Webhook 列表"""
if not WORKFLOW_AVAILABLE:
raise HTTPException(status_code=503, detail="Workflow automation not available")
manager = get_workflow_manager()
webhooks = await manager.get_webhooks(project_id)
return [
WebhookResponse(
id=w.id,
name=w.name,
webhook_type=w.webhook_type.value,
url=w.url,
headers=w.headers,
project_id=w.project_id,
events=w.events,
enabled=w.enabled,
created_at=w.created_at.isoformat(),
updated_at=w.updated_at.isoformat()
)
for w in webhooks
]
@app.get("/api/v1/webhooks/{webhook_id}", response_model=WebhookResponse, tags=["Workflows"])
async def get_webhook(webhook_id: str, _=Depends(verify_api_key)):
"""获取单个 Webhook 详情"""
if not WORKFLOW_AVAILABLE:
raise HTTPException(status_code=503, detail="Workflow automation not available")
manager = get_workflow_manager()
webhook = await manager.get_webhook(webhook_id)
if not webhook:
raise HTTPException(status_code=404, detail="Webhook not found")
return WebhookResponse(
id=webhook.id,
name=webhook.name,
webhook_type=webhook.webhook_type.value,
url=webhook.url,
headers=webhook.headers,
project_id=webhook.project_id,
events=webhook.events,
enabled=webhook.enabled,
created_at=webhook.created_at.isoformat(),
updated_at=webhook.updated_at.isoformat()
)
@app.patch("/api/v1/webhooks/{webhook_id}", response_model=WebhookResponse, tags=["Workflows"])
async def update_webhook(webhook_id: str, request: WebhookCreateRequest, _=Depends(verify_api_key)):
"""更新 Webhook 配置"""
if not WORKFLOW_AVAILABLE:
raise HTTPException(status_code=503, detail="Workflow automation not available")
manager = get_workflow_manager()
update_data = request.dict(exclude_unset=True)
webhook = await manager.update_webhook(webhook_id, **update_data)
if not webhook:
raise HTTPException(status_code=404, detail="Webhook not found")
return WebhookResponse(
id=webhook.id,
name=webhook.name,
webhook_type=webhook.webhook_type.value,
url=webhook.url,
headers=webhook.headers,
project_id=webhook.project_id,
events=webhook.events,
enabled=webhook.enabled,
created_at=webhook.created_at.isoformat(),
updated_at=webhook.updated_at.isoformat()
)
@app.delete("/api/v1/webhooks/{webhook_id}", tags=["Workflows"])
async def delete_webhook(webhook_id: str, _=Depends(verify_api_key)):
"""删除 Webhook 配置"""
if not WORKFLOW_AVAILABLE:
raise HTTPException(status_code=503, detail="Workflow automation not available")
manager = get_workflow_manager()
success = await manager.delete_webhook(webhook_id)
if not success:
raise HTTPException(status_code=404, detail="Webhook not found")
return {"message": "Webhook deleted successfully"}
@app.post("/api/v1/webhooks/{webhook_id}/test", tags=["Workflows"])
async def test_webhook(webhook_id: str, _=Depends(verify_api_key)):
"""测试 Webhook 配置"""
if not WORKFLOW_AVAILABLE:
raise HTTPException(status_code=503, detail="Workflow automation not available")
manager = get_workflow_manager()
success = await manager.test_webhook(webhook_id)
if success:
return {"message": "Webhook test sent successfully"}
else:
raise HTTPException(status_code=400, detail="Webhook test failed")
@app.get("/api/v1/openapi.json", include_in_schema=False)
async def get_openapi():
"""获取 OpenAPI 规范"""

View File

@@ -33,3 +33,9 @@ neo4j==5.15.0
# API Documentation (Swagger/OpenAPI)
fastapi-offline-swagger==0.1.0
# Workflow Automation
apscheduler==3.10.4
# Phase 7: Workflow Automation
apscheduler==3.10.4

View File

@@ -178,8 +178,145 @@ CREATE INDEX IF NOT EXISTS idx_entity_attributes_entity ON entity_attributes(ent
CREATE INDEX IF NOT EXISTS idx_entity_attributes_template ON entity_attributes(template_id);
CREATE INDEX IF NOT EXISTS idx_attr_history_entity ON attribute_history(entity_id);
-- Phase 5: 属性相关索引
CREATE INDEX IF NOT EXISTS idx_attr_templates_project ON attribute_templates(project_id);
CREATE INDEX IF NOT EXISTS idx_entity_attributes_entity ON entity_attributes(entity_id);
CREATE INDEX IF NOT EXISTS idx_entity_attributes_template ON entity_attributes(template_id);
CREATE INDEX IF NOT EXISTS idx_attr_history_entity ON attribute_history(entity_id);
-- Phase 7: 工作流相关表
-- 工作流配置表
CREATE TABLE IF NOT EXISTS workflows (
id TEXT PRIMARY KEY,
name TEXT NOT NULL,
description TEXT,
workflow_type TEXT NOT NULL, -- auto_analyze, auto_align, auto_relation, scheduled_report, custom
project_id TEXT NOT NULL,
status TEXT DEFAULT 'active', -- active, paused, error, completed
schedule TEXT, -- cron expression or interval minutes
schedule_type TEXT DEFAULT 'manual', -- manual, cron, interval
config TEXT, -- JSON: workflow specific configuration
webhook_ids TEXT, -- JSON array of webhook config IDs
is_active BOOLEAN DEFAULT 1,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
last_run_at TIMESTAMP,
next_run_at TIMESTAMP,
run_count INTEGER DEFAULT 0,
success_count INTEGER DEFAULT 0,
fail_count INTEGER DEFAULT 0,
FOREIGN KEY (project_id) REFERENCES projects(id)
);
-- 工作流任务表
CREATE TABLE IF NOT EXISTS workflow_tasks (
id TEXT PRIMARY KEY,
workflow_id TEXT NOT NULL,
name TEXT NOT NULL,
task_type TEXT NOT NULL, -- analyze, align, discover_relations, notify, custom
config TEXT, -- JSON: task specific configuration
task_order INTEGER DEFAULT 0,
depends_on TEXT, -- JSON array of task IDs
timeout_seconds INTEGER DEFAULT 300,
retry_count INTEGER DEFAULT 3,
retry_delay INTEGER DEFAULT 5,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (workflow_id) REFERENCES workflows(id) ON DELETE CASCADE
);
-- Webhook 配置表
CREATE TABLE IF NOT EXISTS webhook_configs (
id TEXT PRIMARY KEY,
name TEXT NOT NULL,
webhook_type TEXT NOT NULL, -- feishu, dingtalk, slack, custom
url TEXT NOT NULL,
secret TEXT, -- for signature verification
headers TEXT, -- JSON: custom headers
template TEXT, -- message template
is_active BOOLEAN DEFAULT 1,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
last_used_at TIMESTAMP,
success_count INTEGER DEFAULT 0,
fail_count INTEGER DEFAULT 0
);
-- 工作流执行日志表
CREATE TABLE IF NOT EXISTS workflow_logs (
id TEXT PRIMARY KEY,
workflow_id TEXT NOT NULL,
task_id TEXT, -- NULL if workflow-level log
status TEXT DEFAULT 'pending', -- pending, running, success, failed, cancelled
start_time TIMESTAMP,
end_time TIMESTAMP,
duration_ms INTEGER,
input_data TEXT, -- JSON: input parameters
output_data TEXT, -- JSON: execution results
error_message TEXT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (workflow_id) REFERENCES workflows(id) ON DELETE CASCADE,
FOREIGN KEY (task_id) REFERENCES workflow_tasks(id) ON DELETE SET NULL
);
-- Phase 7: 工作流相关索引
CREATE INDEX IF NOT EXISTS idx_workflows_project ON workflows(project_id);
CREATE INDEX IF NOT EXISTS idx_workflows_status ON workflows(status);
CREATE INDEX IF NOT EXISTS idx_workflows_type ON workflows(workflow_type);
CREATE INDEX IF NOT EXISTS idx_workflow_tasks_workflow ON workflow_tasks(workflow_id);
CREATE INDEX IF NOT EXISTS idx_workflow_logs_workflow ON workflow_logs(workflow_id);
CREATE INDEX IF NOT EXISTS idx_workflow_logs_task ON workflow_logs(task_id);
CREATE INDEX IF NOT EXISTS idx_workflow_logs_status ON workflow_logs(status);
CREATE INDEX IF NOT EXISTS idx_workflow_logs_created ON workflow_logs(created_at);
-- Phase 7: 工作流自动化表
-- 工作流配置表
CREATE TABLE IF NOT EXISTS workflows (
id TEXT PRIMARY KEY,
name TEXT NOT NULL,
task_type TEXT NOT NULL, -- auto_analyze, auto_align, auto_relation, custom
config TEXT, -- JSON 配置
trigger_type TEXT DEFAULT 'manual', -- schedule, event, manual
schedule TEXT, -- Cron 表达式或间隔
project_id TEXT,
enabled INTEGER DEFAULT 1,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
last_run_at TIMESTAMP,
next_run_at TIMESTAMP,
run_count INTEGER DEFAULT 0,
fail_count INTEGER DEFAULT 0,
FOREIGN KEY (project_id) REFERENCES projects(id)
);
-- 工作流执行日志表
CREATE TABLE IF NOT EXISTS workflow_logs (
id TEXT PRIMARY KEY,
workflow_id TEXT NOT NULL,
task_id TEXT NOT NULL,
status TEXT NOT NULL, -- pending, running, success, failed, cancelled
started_at TIMESTAMP NOT NULL,
completed_at TIMESTAMP,
result TEXT, -- JSON 结果
error_message TEXT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (workflow_id) REFERENCES workflows(id)
);
-- Webhook 配置表
CREATE TABLE IF NOT EXISTS webhook_configs (
id TEXT PRIMARY KEY,
name TEXT NOT NULL,
webhook_type TEXT NOT NULL, -- feishu, dingtalk, slack, custom
url TEXT NOT NULL,
secret TEXT,
headers TEXT, -- JSON 格式
project_id TEXT,
events TEXT, -- JSON 数组,如 ["workflow_completed", "workflow_failed"]
enabled INTEGER DEFAULT 1,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (project_id) REFERENCES projects(id)
);
-- Phase 7: 工作流相关索引
CREATE INDEX IF NOT EXISTS idx_workflows_project ON workflows(project_id);
CREATE INDEX IF NOT EXISTS idx_workflows_enabled ON workflows(enabled);
CREATE INDEX IF NOT EXISTS idx_workflow_logs_workflow ON workflow_logs(workflow_id);
CREATE INDEX IF NOT EXISTS idx_workflow_logs_created ON workflow_logs(created_at);
CREATE INDEX IF NOT EXISTS idx_webhook_configs_project ON webhook_configs(project_id);

1488
backend/workflow_manager.py Normal file

File diff suppressed because it is too large Load Diff