Files
insightflow/backend/ops_manager.py
OpenClaw Bot 2aded2de48 Phase 8: 完成 AI 能力增强、运营与增长工具、开发者生态、运维与监控
- Task 4: AI 能力增强 (ai_manager.py)
  - 自定义模型训练(领域特定实体识别)
  - 多模态大模型集成(GPT-4V、Claude 3、Gemini、Kimi-VL)
  - 知识图谱 RAG 智能问答
  - 智能摘要(提取式/生成式/关键点/时间线)
  - 预测性分析(趋势/异常/增长/演变预测)

- Task 5: 运营与增长工具 (growth_manager.py)
  - 用户行为分析(Mixpanel/Amplitude 集成)
  - A/B 测试框架
  - 邮件营销自动化
  - 推荐系统(邀请返利、团队升级激励)

- Task 6: 开发者生态 (developer_ecosystem_manager.py)
  - SDK 发布管理(Python/JavaScript/Go)
  - 模板市场
  - 插件市场
  - 开发者文档与示例代码

- Task 8: 运维与监控 (ops_manager.py)
  - 实时告警系统(PagerDuty/Opsgenie 集成)
  - 容量规划与自动扩缩容
  - 灾备与故障转移
  - 成本优化

Phase 8 全部 8 个任务已完成!
2026-02-27 00:01:40 +08:00

2731 lines
100 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""
InsightFlow Operations & Monitoring Manager - Phase 8 Task 8
运维与监控管理模块
- 实时告警系统(规则配置、多渠道通知、告警分级、抑制聚合)
- 容量规划与自动扩缩容(资源监控、容量预测、自动扩缩容策略)
- 灾备与故障转移(多活架构、健康检查、自动故障转移、数据备份恢复)
- 成本优化(资源利用率监控、成本分析、闲置资源识别、优化建议)
作者: InsightFlow Team
"""
import os
import json
import sqlite3
import httpx
import asyncio
import hashlib
import uuid
import re
import time
import statistics
from typing import List, Dict, Optional, Any, Tuple, Callable
from dataclasses import dataclass, field, asdict
from datetime import datetime, timedelta
from enum import Enum
from collections import defaultdict
import threading
# Default SQLite database location: insightflow.db next to this module.
DB_PATH = os.path.join(os.path.dirname(__file__), "insightflow.db")
class AlertSeverity(str, Enum):
    """Alert severity levels, P0 (most urgent) through P3."""
    P0 = "p0"  # critical - system unavailable, act immediately
    P1 = "p1"  # severe - core features degraded, act within 1 hour
    P2 = "p2"  # moderate - partial impact, act within 4 hours
    P3 = "p3"  # minor - non-core issue, act within 24 hours
class AlertStatus(str, Enum):
    """Lifecycle states of an alert instance."""
    FIRING = "firing"              # actively alerting
    RESOLVED = "resolved"          # condition cleared
    ACKNOWLEDGED = "acknowledged"  # seen by an operator
    SUPPRESSED = "suppressed"      # silenced by a suppression rule
class AlertChannelType(str, Enum):
    """Supported alert-notification channel types."""
    PAGERDUTY = "pagerduty"
    OPSGENIE = "opsgenie"
    FEISHU = "feishu"
    DINGTALK = "dingtalk"
    SLACK = "slack"
    EMAIL = "email"
    SMS = "sms"
    WEBHOOK = "webhook"
class AlertRuleType(str, Enum):
    """How an alert rule is evaluated."""
    THRESHOLD = "threshold"    # fixed-threshold comparison
    ANOMALY = "anomaly"        # statistical anomaly detection
    PREDICTIVE = "predictive"  # trend-based forecast
    COMPOSITE = "composite"    # combination of rules (no default evaluator registered)
class ResourceType(str, Enum):
    """Monitored resource categories."""
    CPU = "cpu"
    MEMORY = "memory"
    DISK = "disk"
    NETWORK = "network"
    GPU = "gpu"
    DATABASE = "database"
    CACHE = "cache"
    QUEUE = "queue"
class ScalingAction(str, Enum):
    """Auto-scaling decisions."""
    SCALE_UP = "scale_up"      # add capacity
    SCALE_DOWN = "scale_down"  # remove capacity
    MAINTAIN = "maintain"      # keep current capacity
class HealthStatus(str, Enum):
    """Health-check outcome states."""
    HEALTHY = "healthy"
    DEGRADED = "degraded"
    UNHEALTHY = "unhealthy"
    UNKNOWN = "unknown"
class BackupStatus(str, Enum):
    """States of a backup run."""
    PENDING = "pending"
    IN_PROGRESS = "in_progress"
    COMPLETED = "completed"
    FAILED = "failed"
    VERIFIED = "verified"  # completed and integrity-checked
@dataclass
class AlertRule:
    """Definition of an alerting rule evaluated against resource metrics."""
    id: str
    tenant_id: str
    name: str
    description: str
    rule_type: AlertRuleType
    severity: AlertSeverity
    metric: str  # name of the monitored metric
    condition: str  # comparison operator: >, <, ==, >=, <=, !=
    threshold: float
    duration: int  # seconds the condition must hold
    evaluation_interval: int  # seconds between evaluations
    channels: List[str]  # ids of AlertChannel records to notify
    labels: Dict[str, str]  # labels matched by suppression rules
    annotations: Dict[str, str]  # may carry 'summary'/'description' overrides
    is_enabled: bool
    created_at: str
    updated_at: str
    created_by: str
@dataclass
class AlertChannel:
    """A configured notification channel (webhook, pager, email, ...)."""
    id: str
    tenant_id: str
    name: str
    channel_type: AlertChannelType
    config: Dict  # channel-specific settings (webhook_url, api_key, ...)
    severity_filter: List[str]  # severity values this channel accepts
    is_enabled: bool
    success_count: int  # lifetime successful deliveries
    fail_count: int  # lifetime failed deliveries
    last_used_at: Optional[str]
    created_at: str
    updated_at: str
@dataclass
class Alert:
    """A single alert instance raised from an AlertRule."""
    id: str
    rule_id: str
    tenant_id: str
    severity: AlertSeverity
    status: AlertStatus
    title: str
    description: str
    metric: str
    value: float  # metric value observed when the alert fired
    threshold: float
    labels: Dict[str, str]
    annotations: Dict[str, str]
    started_at: str
    resolved_at: Optional[str]
    acknowledged_by: Optional[str]
    acknowledged_at: Optional[str]
    notification_sent: Dict[str, bool]  # channel id -> delivery success
    suppression_count: int  # times re-firing was deduplicated
@dataclass
class AlertSuppressionRule:
    """Silences alerts whose labels match all entries in `matchers`."""
    id: str
    tenant_id: str
    name: str
    matchers: Dict[str, str]  # label key -> expected value (or regex)
    duration: int  # suppression window in seconds
    is_regex: bool  # interpret matcher values as regexes
    created_at: str
    expires_at: Optional[str]  # rule is ignored after this time
@dataclass
class AlertGroup:
    """Aggregation bucket grouping related alerts under one key."""
    id: str
    tenant_id: str
    group_key: str  # aggregation key
    alerts: List[str]  # member alert ids
    created_at: str
    updated_at: str
@dataclass
class ResourceMetric:
    """One time-stamped sample of a resource metric."""
    id: str
    tenant_id: str
    resource_type: ResourceType
    resource_id: str
    metric_name: str
    metric_value: float
    unit: str
    timestamp: str  # ISO-8601
    metadata: Dict
@dataclass
class CapacityPlan:
    """Capacity forecast for one resource type."""
    id: str
    tenant_id: str
    resource_type: ResourceType
    current_capacity: float
    predicted_capacity: float
    prediction_date: str  # date the prediction targets
    confidence: float  # 0-1
    recommended_action: str  # e.g. scale_up / scale_down / maintain
    estimated_cost: float
    created_at: str
@dataclass
class AutoScalingPolicy:
    """Auto-scaling bounds and thresholds for one resource type."""
    id: str
    tenant_id: str
    name: str
    resource_type: ResourceType
    min_instances: int
    max_instances: int
    target_utilization: float  # desired utilization rate
    scale_up_threshold: float
    scale_down_threshold: float
    scale_up_step: int  # instances added per scale-up
    scale_down_step: int  # instances removed per scale-down
    cooldown_period: int  # seconds to wait between scaling actions
    is_enabled: bool
    created_at: str
    updated_at: str
@dataclass
class ScalingEvent:
    """Record of one executed (or attempted) scaling action."""
    id: str
    policy_id: str
    tenant_id: str
    action: ScalingAction
    from_count: int
    to_count: int
    reason: str
    triggered_by: str  # trigger source: manual, auto, scheduled
    status: str  # pending, in_progress, completed, failed
    started_at: str
    completed_at: Optional[str]
    error_message: Optional[str]
@dataclass
class HealthCheck:
    """Configuration for a periodic health probe."""
    id: str
    tenant_id: str
    name: str
    target_type: str  # service, database, api, etc.
    target_id: str
    check_type: str  # http, tcp, ping, custom
    check_config: Dict  # probe-specific settings
    interval: int  # seconds between probes
    timeout: int  # probe timeout in seconds
    retry_count: int
    healthy_threshold: int  # consecutive successes to mark healthy
    unhealthy_threshold: int  # consecutive failures to mark unhealthy
    is_enabled: bool
    created_at: str
    updated_at: str
@dataclass
class HealthCheckResult:
    """Outcome of a single health-check probe."""
    id: str
    check_id: str
    tenant_id: str
    status: HealthStatus
    response_time: float  # milliseconds
    message: str
    details: Dict
    checked_at: str
@dataclass
class FailoverConfig:
    """Failover topology and trigger settings for a service."""
    id: str
    tenant_id: str
    name: str
    primary_region: str
    secondary_regions: List[str]  # candidate regions to fail over to
    failover_trigger: str  # condition that initiates failover
    auto_failover: bool
    failover_timeout: int  # failover timeout in seconds
    health_check_id: str
    is_enabled: bool
    created_at: str
    updated_at: str
@dataclass
class FailoverEvent:
    """Record of one region failover."""
    id: str
    config_id: str
    tenant_id: str
    from_region: str
    to_region: str
    reason: str
    status: str  # initiated, in_progress, completed, failed, rolled_back
    started_at: str
    completed_at: Optional[str]
    rolled_back_at: Optional[str]
@dataclass
class BackupJob:
    """Scheduled backup job definition."""
    id: str
    tenant_id: str
    name: str
    backup_type: str  # full, incremental, differential
    target_type: str  # database, files, configuration
    target_id: str
    schedule: str  # cron expression
    retention_days: int
    encryption_enabled: bool
    compression_enabled: bool
    storage_location: str
    is_enabled: bool
    created_at: str
    updated_at: str
@dataclass
class BackupRecord:
    """Result of one backup run."""
    id: str
    job_id: str
    tenant_id: str
    status: BackupStatus
    size_bytes: int
    checksum: str  # integrity hash of the produced artifact
    started_at: str
    completed_at: Optional[str]
    verified_at: Optional[str]
    error_message: Optional[str]
    storage_path: str
@dataclass
class CostReport:
    """Monthly cost report for a tenant."""
    id: str
    tenant_id: str
    report_period: str  # YYYY-MM
    total_cost: float
    currency: str
    breakdown: Dict[str, float]  # cost per resource type
    trends: Dict  # trend data
    anomalies: List[Dict]  # detected cost anomalies
    created_at: str
@dataclass
class ResourceUtilization:
    """Utilization summary for one resource over a reporting period."""
    id: str
    tenant_id: str
    resource_type: ResourceType
    resource_id: str
    utilization_rate: float  # 0-1
    peak_utilization: float
    avg_utilization: float
    idle_time_percent: float
    report_date: str
    recommendations: List[str]
@dataclass
class IdleResource:
    """A resource detected as idle — candidate for reclamation."""
    id: str
    tenant_id: str
    resource_type: ResourceType
    resource_id: str
    resource_name: str
    idle_since: str
    estimated_monthly_cost: float
    currency: str
    reason: str
    recommendation: str
    detected_at: str
@dataclass
class CostOptimizationSuggestion:
    """Actionable cost-saving recommendation."""
    id: str
    tenant_id: str
    category: str  # resource_rightsize, reserved_instances, spot_instances, etc.
    title: str
    description: str
    potential_savings: float
    currency: str
    confidence: float  # 0-1
    difficulty: str  # easy, medium, hard
    implementation_steps: List[str]
    risk_level: str  # low, medium, high
    is_applied: bool
    created_at: str
    applied_at: Optional[str]
class OpsManager:
"""运维与监控管理主类"""
def __init__(self, db_path: str = DB_PATH):
    """Create a manager backed by the SQLite database at `db_path`."""
    self.db_path = db_path
    # Maps AlertRuleType value -> evaluator callable(rule, metrics) -> bool.
    self._alert_evaluators: Dict[str, Callable] = {}
    # Background-evaluator state; no thread is started here.
    self._running = False
    self._evaluator_thread = None
    self._register_default_evaluators()
def _get_db(self):
    """Open a fresh SQLite connection with name-addressable rows."""
    connection = sqlite3.connect(self.db_path)
    connection.row_factory = sqlite3.Row
    return connection
def _register_default_evaluators(self):
    """Register built-in evaluators keyed by AlertRuleType value.

    Note: COMPOSITE rules get no evaluator here, so they are silently
    skipped during evaluation.
    """
    self._alert_evaluators[AlertRuleType.THRESHOLD.value] = self._evaluate_threshold_rule
    self._alert_evaluators[AlertRuleType.ANOMALY.value] = self._evaluate_anomaly_rule
    self._alert_evaluators[AlertRuleType.PREDICTIVE.value] = self._evaluate_predictive_rule
# ==================== 告警规则管理 ====================
def create_alert_rule(self, tenant_id: str, name: str, description: str,
                      rule_type: AlertRuleType, severity: AlertSeverity,
                      metric: str, condition: str, threshold: float,
                      duration: int, evaluation_interval: int,
                      channels: List[str], labels: Dict, annotations: Dict,
                      created_by: str) -> AlertRule:
    """Create and persist a new alert rule (enabled by default).

    `condition` is a comparison operator (>, <, ==, >=, <=, !=) applied to
    `metric` against `threshold`; it must hold for `duration` seconds and
    is re-checked every `evaluation_interval` seconds. `channels` lists
    AlertChannel ids to notify when the rule fires.
    """
    rule_id = f"ar_{uuid.uuid4().hex[:16]}"
    now = datetime.now().isoformat()
    rule = AlertRule(
        id=rule_id,
        tenant_id=tenant_id,
        name=name,
        description=description,
        rule_type=rule_type,
        severity=severity,
        metric=metric,
        condition=condition,
        threshold=threshold,
        duration=duration,
        evaluation_interval=evaluation_interval,
        channels=channels,
        labels=labels or {},
        annotations=annotations or {},
        is_enabled=True,
        created_at=now,
        updated_at=now,
        created_by=created_by
    )
    # List/dict fields are stored as JSON strings.
    with self._get_db() as conn:
        conn.execute("""
            INSERT INTO alert_rules
            (id, tenant_id, name, description, rule_type, severity, metric, condition,
             threshold, duration, evaluation_interval, channels, labels, annotations,
             is_enabled, created_at, updated_at, created_by)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
        """, (rule.id, rule.tenant_id, rule.name, rule.description,
              rule.rule_type.value, rule.severity.value, rule.metric, rule.condition,
              rule.threshold, rule.duration, rule.evaluation_interval,
              json.dumps(rule.channels), json.dumps(rule.labels), json.dumps(rule.annotations),
              rule.is_enabled, rule.created_at, rule.updated_at, rule.created_by))
        conn.commit()
    return rule
def get_alert_rule(self, rule_id: str) -> Optional[AlertRule]:
    """Fetch a single alert rule by id, or None if it does not exist."""
    with self._get_db() as conn:
        record = conn.execute(
            "SELECT * FROM alert_rules WHERE id = ?", (rule_id,)
        ).fetchone()
    return self._row_to_alert_rule(record) if record else None
def list_alert_rules(self, tenant_id: str, is_enabled: Optional[bool] = None) -> List[AlertRule]:
    """List a tenant's alert rules, newest first, optionally filtered by enabled state."""
    sql = "SELECT * FROM alert_rules WHERE tenant_id = ?"
    args = [tenant_id]
    if is_enabled is not None:
        sql += " AND is_enabled = ?"
        args.append(int(is_enabled))  # stored as 0/1
    sql += " ORDER BY created_at DESC"
    with self._get_db() as conn:
        return [self._row_to_alert_rule(r) for r in conn.execute(sql, args).fetchall()]
def update_alert_rule(self, rule_id: str, **kwargs) -> Optional[AlertRule]:
    """Partially update an alert rule; fields outside the whitelist are ignored.

    Returns the (possibly unchanged) rule, or None if it does not exist.
    """
    # Whitelist also guards the f-string SET clause below against injection.
    allowed_fields = ['name', 'description', 'severity', 'metric', 'condition',
                      'threshold', 'duration', 'evaluation_interval', 'channels',
                      'labels', 'annotations', 'is_enabled']
    updates = {k: v for k, v in kwargs.items() if k in allowed_fields}
    if not updates:
        return self.get_alert_rule(rule_id)
    # Serialize list/dict fields to JSON for storage.
    if 'channels' in updates:
        updates['channels'] = json.dumps(updates['channels'])
    if 'labels' in updates:
        updates['labels'] = json.dumps(updates['labels'])
    if 'annotations' in updates:
        updates['annotations'] = json.dumps(updates['annotations'])
    # Accept either an AlertSeverity or its raw string value.
    if 'severity' in updates and isinstance(updates['severity'], AlertSeverity):
        updates['severity'] = updates['severity'].value
    updates['updated_at'] = datetime.now().isoformat()
    with self._get_db() as conn:
        # Safe: keys come only from the whitelist above.
        set_clause = ", ".join([f"{k} = ?" for k in updates.keys()])
        conn.execute(
            f"UPDATE alert_rules SET {set_clause} WHERE id = ?",
            list(updates.values()) + [rule_id]
        )
        conn.commit()
    return self.get_alert_rule(rule_id)
def delete_alert_rule(self, rule_id: str) -> bool:
    """Delete an alert rule.

    Returns True iff a row was actually removed. Uses the cursor's
    rowcount for this statement — the original checked
    `conn.total_changes`, which counts every change made over the
    connection's lifetime, not just this DELETE.
    """
    with self._get_db() as conn:
        cursor = conn.execute("DELETE FROM alert_rules WHERE id = ?", (rule_id,))
        conn.commit()
        return cursor.rowcount > 0
# ==================== 告警渠道管理 ====================
def create_alert_channel(self, tenant_id: str, name: str,
                         channel_type: AlertChannelType, config: Dict,
                         severity_filter: List[str] = None) -> AlertChannel:
    """Create and persist a notification channel (enabled by default).

    `config` holds channel-specific settings (e.g. webhook_url,
    integration_key). When `severity_filter` is omitted the channel
    accepts every severity level.
    """
    channel_id = f"ac_{uuid.uuid4().hex[:16]}"
    now = datetime.now().isoformat()
    channel = AlertChannel(
        id=channel_id,
        tenant_id=tenant_id,
        name=name,
        channel_type=channel_type,
        config=config,
        severity_filter=severity_filter or [s.value for s in AlertSeverity],
        is_enabled=True,
        success_count=0,
        fail_count=0,
        last_used_at=None,
        created_at=now,
        updated_at=now
    )
    with self._get_db() as conn:
        conn.execute("""
            INSERT INTO alert_channels
            (id, tenant_id, name, channel_type, config, severity_filter,
             is_enabled, success_count, fail_count, last_used_at, created_at, updated_at)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
        """, (channel.id, channel.tenant_id, channel.name, channel.channel_type.value,
              json.dumps(channel.config), json.dumps(channel.severity_filter),
              channel.is_enabled, channel.success_count, channel.fail_count,
              channel.last_used_at, channel.created_at, channel.updated_at))
        conn.commit()
    return channel
def get_alert_channel(self, channel_id: str) -> Optional[AlertChannel]:
    """Fetch a single alert channel by id, or None if absent."""
    with self._get_db() as conn:
        record = conn.execute(
            "SELECT * FROM alert_channels WHERE id = ?", (channel_id,)
        ).fetchone()
    return self._row_to_alert_channel(record) if record else None
def list_alert_channels(self, tenant_id: str) -> List[AlertChannel]:
    """List every alert channel of a tenant, newest first."""
    sql = "SELECT * FROM alert_channels WHERE tenant_id = ? ORDER BY created_at DESC"
    with self._get_db() as conn:
        return [self._row_to_alert_channel(r) for r in conn.execute(sql, (tenant_id,)).fetchall()]
def test_alert_channel(self, channel_id: str) -> bool:
    """Send a synthetic P3 alert through the channel to verify its config.

    Returns False for an unknown channel id; otherwise returns the
    delivery result.
    NOTE(review): asyncio.run() raises RuntimeError when called from an
    already-running event loop — confirm this is only used from sync code.
    """
    channel = self.get_alert_channel(channel_id)
    if not channel:
        return False
    test_alert = Alert(
        id="test",
        rule_id="test",
        tenant_id=channel.tenant_id,
        severity=AlertSeverity.P3,
        status=AlertStatus.FIRING,
        title="测试告警",
        description="这是一条测试告警消息,用于验证告警渠道配置。",
        metric="test_metric",
        value=0.0,
        threshold=0.0,
        labels={"test": "true"},
        annotations={},
        started_at=datetime.now().isoformat(),
        resolved_at=None,
        acknowledged_by=None,
        acknowledged_at=None,
        notification_sent={},
        suppression_count=0
    )
    return asyncio.run(self._send_alert_to_channel(test_alert, channel))
# ==================== 告警评估与触发 ====================
def _evaluate_threshold_rule(self, rule: AlertRule, metrics: List[ResourceMetric]) -> bool:
    """Threshold evaluator: mean of samples inside the rule's window vs threshold.

    Returns False when there are no samples in the window or the rule's
    condition operator is unrecognized.
    """
    if not metrics:
        return False
    # Only samples from the last `duration` seconds count.
    window_start = datetime.now() - timedelta(seconds=rule.duration)
    window = [
        m.metric_value for m in metrics
        if datetime.fromisoformat(m.timestamp) > window_start
    ]
    if not window:
        return False
    avg_value = statistics.mean(window)
    op = rule.condition
    if op == '>':
        return avg_value > rule.threshold
    if op == '<':
        return avg_value < rule.threshold
    if op == '>=':
        return avg_value >= rule.threshold
    if op == '<=':
        return avg_value <= rule.threshold
    if op == '==':
        return avg_value == rule.threshold
    if op == '!=':
        return avg_value != rule.threshold
    return False
def _evaluate_anomaly_rule(self, rule: AlertRule, metrics: List[ResourceMetric]) -> bool:
    """Anomaly evaluator: fire when the latest sample is > 3 standard
    deviations from the mean (z-score).

    Requires at least 10 samples; returns False on zero variance.
    Fix: get_recent_metrics returns samples newest-first, so the original
    `values[-1]` was actually the OLDEST sample — we sort chronologically
    first so the z-score is computed for the genuinely latest value.
    """
    if len(metrics) < 10:
        return False
    ordered = sorted(metrics, key=lambda m: m.timestamp)
    values = [m.metric_value for m in ordered]
    mean = statistics.mean(values)
    std = statistics.stdev(values) if len(values) > 1 else 0
    if std == 0:
        return False
    # Latest sample deviating more than 3 sigma counts as an anomaly.
    latest_value = values[-1]
    z_score = abs(latest_value - mean) / std
    return z_score > 3.0
def _evaluate_predictive_rule(self, rule: AlertRule, metrics: List[ResourceMetric]) -> bool:
    """Predictive evaluator: least-squares linear trend over the last 10
    samples, one-step-ahead forecast compared to the threshold.

    Only '>' and '<' conditions are supported; anything else never fires.
    Fix: get_recent_metrics returns samples newest-first, so the original
    `metrics[-10:]` picked the ten OLDEST samples in reverse time order,
    producing an inverted slope. Sort chronologically before slicing.
    """
    if len(metrics) < 5:
        return False
    ordered = sorted(metrics, key=lambda m: m.timestamp)
    values = [m.metric_value for m in ordered[-10:]]  # last 10 samples, oldest->newest
    n = len(values)
    if n < 2:
        return False
    xs = list(range(n))
    mean_x = sum(xs) / n
    mean_y = sum(values) / n
    # Least-squares slope.
    numerator = sum((xs[i] - mean_x) * (values[i] - mean_y) for i in range(n))
    denominator = sum((xs[i] - mean_x) ** 2 for i in range(n))
    slope = numerator / denominator if denominator != 0 else 0
    # One-step-ahead forecast from the latest sample.
    predicted = values[-1] + slope
    if rule.condition == '>':
        return predicted > rule.threshold
    if rule.condition == '<':
        return predicted < rule.threshold
    return False
async def evaluate_alert_rules(self, tenant_id: str):
    """Evaluate every enabled alert rule for a tenant and fire alerts.

    Rules whose type has no registered evaluator (e.g. COMPOSITE) are
    skipped silently.
    """
    rules = self.list_alert_rules(tenant_id, is_enabled=True)
    for rule in rules:
        # Window: the rule's duration plus one evaluation interval.
        metrics = self.get_recent_metrics(tenant_id, rule.metric,
                                          seconds=rule.duration + rule.evaluation_interval)
        evaluator = self._alert_evaluators.get(rule.rule_type.value)
        if evaluator and evaluator(rule, metrics):
            # Fix: get_recent_metrics orders newest-first, so metrics[0] is
            # the latest sample. The original passed metrics[-1] — the
            # OLDEST — recording a stale value on the fired alert.
            await self._trigger_alert(rule, metrics[0] if metrics else None)
async def _trigger_alert(self, rule: AlertRule, metric: Optional[ResourceMetric]):
    """Create, persist and notify a new alert for `rule`.

    Deduplicates: if the rule already has a FIRING alert, only that
    alert's suppression counter is bumped. Tenant suppression rules are
    honored before anything is created.
    """
    # Already firing? Just count the repeat instead of re-alerting.
    existing = self.get_active_alert_by_rule(rule.id)
    if existing:
        self._increment_suppression_count(existing.id)
        return
    # Honor tenant-level suppression rules.
    if self._is_alert_suppressed(rule):
        return
    alert_id = f"al_{uuid.uuid4().hex[:16]}"
    now = datetime.now().isoformat()
    alert = Alert(
        id=alert_id,
        rule_id=rule.id,
        tenant_id=rule.tenant_id,
        severity=rule.severity,
        status=AlertStatus.FIRING,
        # annotations may override the default title/description
        title=rule.annotations.get('summary', f"告警: {rule.name}"),
        description=rule.annotations.get('description', rule.description),
        metric=rule.metric,
        value=metric.metric_value if metric else 0.0,
        threshold=rule.threshold,
        labels=rule.labels,
        annotations=rule.annotations,
        started_at=now,
        resolved_at=None,
        acknowledged_by=None,
        acknowledged_at=None,
        notification_sent={},
        suppression_count=0
    )
    # Persist the alert (nullable columns like resolved_at are omitted
    # from the INSERT and default at the schema level).
    with self._get_db() as conn:
        conn.execute("""
            INSERT INTO alerts
            (id, rule_id, tenant_id, severity, status, title, description,
             metric, value, threshold, labels, annotations, started_at,
             notification_sent, suppression_count)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
        """, (alert.id, alert.rule_id, alert.tenant_id, alert.severity.value,
              alert.status.value, alert.title, alert.description,
              alert.metric, alert.value, alert.threshold,
              json.dumps(alert.labels), json.dumps(alert.annotations),
              alert.started_at, json.dumps(alert.notification_sent),
              alert.suppression_count))
        conn.commit()
    # Fan out notifications to the rule's channels.
    await self._send_alert_notifications(alert, rule)
async def _send_alert_notifications(self, alert: Alert, rule: AlertRule):
    """Deliver an alert to every enabled channel configured on its rule,
    skipping channels whose severity filter excludes this alert, and
    record per-channel delivery status."""
    for channel_id in rule.channels:
        channel = self.get_alert_channel(channel_id)
        if channel is None or not channel.is_enabled:
            continue
        # Respect the channel's severity filter.
        if alert.severity.value not in channel.severity_filter:
            continue
        delivered = await self._send_alert_to_channel(alert, channel)
        alert.notification_sent[channel.id] = delivered
        self._update_alert_notification_status(alert.id, channel.id, delivered)
async def _send_alert_to_channel(self, alert: Alert, channel: AlertChannel) -> bool:
    """Dispatch an alert to one channel based on its type.

    Returns False for unsupported channel types (e.g. SMS) and swallows
    delivery exceptions, logging them to stdout.
    """
    handlers = {
        AlertChannelType.FEISHU: self._send_feishu_alert,
        AlertChannelType.DINGTALK: self._send_dingtalk_alert,
        AlertChannelType.SLACK: self._send_slack_alert,
        AlertChannelType.EMAIL: self._send_email_alert,
        AlertChannelType.PAGERDUTY: self._send_pagerduty_alert,
        AlertChannelType.OPSGENIE: self._send_opsgenie_alert,
        AlertChannelType.WEBHOOK: self._send_webhook_alert,
    }
    handler = handlers.get(channel.channel_type)
    if handler is None:
        return False
    try:
        return await handler(alert, channel)
    except Exception as e:
        print(f"Failed to send alert to {channel.name}: {e}")
        return False
async def _send_feishu_alert(self, alert: Alert, channel: AlertChannel) -> bool:
    """Send an alert as a Feishu (Lark) interactive card via webhook.

    Returns True when Feishu answers HTTP 200. Channel delivery
    statistics are updated either way.
    Fix: the success/failure branches both called _update_channel_stats
    with only the flag differing — collapsed into one call. The unused
    `secret` read was dropped.
    NOTE(review): signed Feishu webhooks require a timestamp + HMAC-SHA256
    signature built from config['secret']; that is not implemented here —
    confirm whether the target bots have signing enabled.
    """
    config = channel.config
    webhook_url = config.get('webhook_url')
    if not webhook_url:
        return False
    # Card header color per severity.
    severity_colors = {
        AlertSeverity.P0.value: "red",
        AlertSeverity.P1.value: "orange",
        AlertSeverity.P2.value: "yellow",
        AlertSeverity.P3.value: "blue"
    }
    message = {
        "msg_type": "interactive",
        "card": {
            "config": {"wide_screen_mode": True},
            "header": {
                "title": {
                    "tag": "plain_text",
                    "content": f"🚨 [{alert.severity.value.upper()}] {alert.title}"
                },
                "template": severity_colors.get(alert.severity.value, "blue")
            },
            "elements": [
                {
                    "tag": "div",
                    "text": {
                        "tag": "lark_md",
                        "content": f"**描述:** {alert.description}\n\n**指标:** {alert.metric}\n**当前值:** {alert.value}\n**阈值:** {alert.threshold}"
                    }
                },
                {
                    "tag": "div",
                    "text": {
                        "tag": "lark_md",
                        "content": f"**时间:** {alert.started_at}"
                    }
                }
            ]
        }
    }
    async with httpx.AsyncClient() as client:
        response = await client.post(webhook_url, json=message, timeout=30.0)
    success = response.status_code == 200
    self._update_channel_stats(channel.id, success=success)
    return success
async def _send_dingtalk_alert(self, alert: Alert, channel: AlertChannel) -> bool:
    """Send an alert as a DingTalk markdown message via webhook.

    Returns True when DingTalk answers HTTP 200; channel stats are
    updated either way.
    NOTE(review): `secret` is read but never used — DingTalk signed
    webhooks require a timestamp + HMAC-SHA256 signature appended to the
    URL; confirm whether the target bots use signing.
    """
    config = channel.config
    webhook_url = config.get('webhook_url')
    secret = config.get('secret', '')
    if not webhook_url:
        return False
    # Build the DingTalk markdown payload.
    message = {
        "msgtype": "markdown",
        "markdown": {
            "title": f"[{alert.severity.value.upper()}] {alert.title}",
            "text": f"## 🚨 [{alert.severity.value.upper()}] {alert.title}\n\n" +
                    f"**描述:** {alert.description}\n\n" +
                    f"**指标:** {alert.metric}\n" +
                    f"**当前值:** {alert.value}\n" +
                    f"**阈值:** {alert.threshold}\n\n" +
                    f"**时间:** {alert.started_at}"
        }
    }
    async with httpx.AsyncClient() as client:
        response = await client.post(webhook_url, json=message, timeout=30.0)
        success = response.status_code == 200
        self._update_channel_stats(channel.id, success)
        return success
async def _send_slack_alert(self, alert: Alert, channel: AlertChannel) -> bool:
    """Send an alert to Slack via incoming webhook (Block Kit layout).

    Returns True when Slack answers HTTP 200; channel stats are updated
    either way.
    """
    config = channel.config
    webhook_url = config.get('webhook_url')
    if not webhook_url:
        return False
    # Severity -> colored-circle emoji for the header.
    severity_emojis = {
        AlertSeverity.P0.value: "🔴",
        AlertSeverity.P1.value: "🟠",
        AlertSeverity.P2.value: "🟡",
        AlertSeverity.P3.value: "🔵"
    }
    emoji = severity_emojis.get(alert.severity.value, "")
    message = {
        "text": f"{emoji} [{alert.severity.value.upper()}] {alert.title}",
        "blocks": [
            {
                "type": "header",
                "text": {
                    "type": "plain_text",
                    "text": f"{emoji} [{alert.severity.value.upper()}] {alert.title}"
                }
            },
            {
                "type": "section",
                "fields": [
                    {"type": "mrkdwn", "text": f"*描述:*\n{alert.description}"},
                    {"type": "mrkdwn", "text": f"*指标:*\n{alert.metric}"},
                    {"type": "mrkdwn", "text": f"*当前值:*\n{alert.value}"},
                    {"type": "mrkdwn", "text": f"*阈值:*\n{alert.threshold}"}
                ]
            },
            {
                "type": "context",
                "elements": [
                    {"type": "mrkdwn", "text": f"触发时间: {alert.started_at}"}
                ]
            }
        ]
    }
    async with httpx.AsyncClient() as client:
        response = await client.post(webhook_url, json=message, timeout=30.0)
        success = response.status_code == 200
        self._update_channel_stats(channel.id, success)
        return success
async def _send_email_alert(self, alert: Alert, channel: AlertChannel) -> bool:
    """Send an email alert (SIMULATED — no mail is actually sent).

    Validates that SMTP config is complete, then records a successful
    delivery. A real implementation would integrate an email service
    such as SendGrid or AWS SES.
    NOTE(review): the SMTP password is stored in plain channel config —
    consider a secrets store.
    """
    config = channel.config
    smtp_host = config.get('smtp_host')
    smtp_port = config.get('smtp_port', 587)
    username = config.get('username')
    password = config.get('password')
    to_addresses = config.get('to_addresses', [])
    if not all([smtp_host, username, password, to_addresses]):
        return False
    # Simulate a successful send.
    self._update_channel_stats(channel.id, True)
    return True
async def _send_pagerduty_alert(self, alert: Alert, channel: AlertChannel) -> bool:
    """Trigger a PagerDuty incident via the Events API v2 enqueue endpoint.

    The alert id doubles as the dedup_key so repeated sends collapse into
    one incident. PagerDuty acknowledges with HTTP 202.
    """
    config = channel.config
    integration_key = config.get('integration_key')
    if not integration_key:
        return False
    # Map internal P0-P3 to PagerDuty severities.
    severity_map = {
        AlertSeverity.P0.value: "critical",
        AlertSeverity.P1.value: "error",
        AlertSeverity.P2.value: "warning",
        AlertSeverity.P3.value: "info"
    }
    message = {
        "routing_key": integration_key,
        "event_action": "trigger",
        "dedup_key": alert.id,
        "payload": {
            "summary": alert.title,
            "severity": severity_map.get(alert.severity.value, "warning"),
            "source": alert.labels.get('instance', 'unknown'),
            "custom_details": {
                "description": alert.description,
                "metric": alert.metric,
                "value": alert.value,
                "threshold": alert.threshold
            }
        }
    }
    async with httpx.AsyncClient() as client:
        response = await client.post(
            "https://events.pagerduty.com/v2/enqueue",
            json=message,
            timeout=30.0
        )
        success = response.status_code == 202
        self._update_channel_stats(channel.id, success)
        return success
async def _send_opsgenie_alert(self, alert: Alert, channel: AlertChannel) -> bool:
    """Create an Opsgenie alert via its Alerts API.

    The alert id is used as the Opsgenie alias for deduplication.
    Accepts HTTP 200/201/202 as success.
    """
    config = channel.config
    api_key = config.get('api_key')
    if not api_key:
        return False
    # Map internal P0-P3 to Opsgenie priorities P1-P4.
    priority_map = {
        AlertSeverity.P0.value: "P1",
        AlertSeverity.P1.value: "P2",
        AlertSeverity.P2.value: "P3",
        AlertSeverity.P3.value: "P4"
    }
    message = {
        "message": alert.title,
        "description": alert.description,
        "priority": priority_map.get(alert.severity.value, "P3"),
        "alias": alert.id,
        "details": {
            "metric": alert.metric,
            "value": str(alert.value),
            "threshold": str(alert.threshold)
        }
    }
    async with httpx.AsyncClient() as client:
        response = await client.post(
            "https://api.opsgenie.com/v2/alerts",
            json=message,
            headers={"Authorization": f"GenieKey {api_key}"},
            timeout=30.0
        )
        success = response.status_code in [200, 201, 202]
        self._update_channel_stats(channel.id, success)
        return success
async def _send_webhook_alert(self, alert: Alert, channel: AlertChannel) -> bool:
    """POST the alert as a flat JSON payload to a custom webhook.

    Optional extra request headers come from config['headers'].
    Accepts HTTP 200/201/202 as success.
    """
    config = channel.config
    webhook_url = config.get('webhook_url')
    headers = config.get('headers', {})
    if not webhook_url:
        return False
    message = {
        "alert_id": alert.id,
        "severity": alert.severity.value,
        "status": alert.status.value,
        "title": alert.title,
        "description": alert.description,
        "metric": alert.metric,
        "value": alert.value,
        "threshold": alert.threshold,
        "labels": alert.labels,
        "started_at": alert.started_at
    }
    async with httpx.AsyncClient() as client:
        response = await client.post(
            webhook_url,
            json=message,
            headers=headers,
            timeout=30.0
        )
        success = response.status_code in [200, 201, 202]
        self._update_channel_stats(channel.id, success)
        return success
# ==================== 告警查询与管理 ====================
def get_active_alert_by_rule(self, rule_id: str) -> Optional[Alert]:
    """Return the most recent FIRING alert for a rule, or None."""
    sql = ("SELECT * FROM alerts WHERE rule_id = ? AND status = ? "
           "ORDER BY started_at DESC LIMIT 1")
    with self._get_db() as conn:
        record = conn.execute(sql, (rule_id, AlertStatus.FIRING.value)).fetchone()
    return self._row_to_alert(record) if record else None
def get_alert(self, alert_id: str) -> Optional[Alert]:
    """Fetch one alert by id, or None if it does not exist."""
    with self._get_db() as conn:
        record = conn.execute(
            "SELECT * FROM alerts WHERE id = ?", (alert_id,)
        ).fetchone()
    return self._row_to_alert(record) if record else None
def list_alerts(self, tenant_id: str, status: Optional[AlertStatus] = None,
                severity: Optional[AlertSeverity] = None,
                limit: int = 100) -> List[Alert]:
    """List a tenant's alerts, newest first, with optional status/severity filters."""
    clauses = ["tenant_id = ?"]
    args = [tenant_id]
    if status:
        clauses.append("status = ?")
        args.append(status.value)
    if severity:
        clauses.append("severity = ?")
        args.append(severity.value)
    args.append(limit)
    sql = ("SELECT * FROM alerts WHERE " + " AND ".join(clauses) +
           " ORDER BY started_at DESC LIMIT ?")
    with self._get_db() as conn:
        return [self._row_to_alert(r) for r in conn.execute(sql, args).fetchall()]
def acknowledge_alert(self, alert_id: str, user_id: str) -> Optional[Alert]:
    """Mark an alert ACKNOWLEDGED by `user_id`; return the updated alert."""
    stamp = datetime.now().isoformat()
    with self._get_db() as conn:
        conn.execute(
            "UPDATE alerts SET status = ?, acknowledged_by = ?, acknowledged_at = ? WHERE id = ?",
            (AlertStatus.ACKNOWLEDGED.value, user_id, stamp, alert_id))
        conn.commit()
    return self.get_alert(alert_id)
def resolve_alert(self, alert_id: str) -> Optional[Alert]:
    """Mark an alert RESOLVED and stamp resolved_at; return the updated alert."""
    stamp = datetime.now().isoformat()
    with self._get_db() as conn:
        conn.execute(
            "UPDATE alerts SET status = ?, resolved_at = ? WHERE id = ?",
            (AlertStatus.RESOLVED.value, stamp, alert_id))
        conn.commit()
    return self.get_alert(alert_id)
def _increment_suppression_count(self, alert_id: str):
    """Bump the dedup counter on an already-firing alert."""
    with self._get_db() as conn:
        conn.execute(
            "UPDATE alerts SET suppression_count = suppression_count + 1 WHERE id = ?",
            (alert_id,))
        conn.commit()
def _update_alert_notification_status(self, alert_id: str, channel_id: str, success: bool):
    """Record per-channel delivery success in the alert's JSON status map.

    NOTE(review): this read-modify-write of the JSON column is not atomic
    across connections; concurrent senders could lose each other's
    updates — confirm senders are serialized.
    """
    with self._get_db() as conn:
        row = conn.execute(
            "SELECT notification_sent FROM alerts WHERE id = ?",
            (alert_id,)
        ).fetchone()
        if row:
            notification_sent = json.loads(row['notification_sent'])
            notification_sent[channel_id] = success
            conn.execute(
                "UPDATE alerts SET notification_sent = ? WHERE id = ?",
                (json.dumps(notification_sent), alert_id)
            )
            conn.commit()
def _update_channel_stats(self, channel_id: str, success: bool):
    """Increment a channel's success or failure counter and stamp last_used_at.

    Fix: the original duplicated the whole UPDATE in an if/else that
    differed only in the incremented column; collapsed into one statement.
    The interpolated column name comes from two hard-coded literals, so
    the f-string is injection-safe.
    """
    column = "success_count" if success else "fail_count"
    now = datetime.now().isoformat()
    with self._get_db() as conn:
        conn.execute(f"""
            UPDATE alert_channels
            SET {column} = {column} + 1, last_used_at = ?
            WHERE id = ?
        """, (now, channel_id))
        conn.commit()
# ==================== 告警抑制与聚合 ====================
def create_suppression_rule(self, tenant_id: str, name: str,
                            matchers: Dict[str, str], duration: int,
                            is_regex: bool = False,
                            expires_at: Optional[str] = None) -> AlertSuppressionRule:
    """Create and persist an alert suppression rule.

    Alerts whose rule labels satisfy every matcher are silenced until
    `expires_at` (never, if None). With `is_regex`, matcher values are
    treated as regular expressions.
    """
    rule_id = f"sr_{uuid.uuid4().hex[:16]}"
    now = datetime.now().isoformat()
    rule = AlertSuppressionRule(
        id=rule_id,
        tenant_id=tenant_id,
        name=name,
        matchers=matchers,
        duration=duration,
        is_regex=is_regex,
        created_at=now,
        expires_at=expires_at
    )
    with self._get_db() as conn:
        conn.execute("""
            INSERT INTO alert_suppression_rules
            (id, tenant_id, name, matchers, duration, is_regex, created_at, expires_at)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?)
        """, (rule.id, rule.tenant_id, rule.name, json.dumps(rule.matchers),
              rule.duration, rule.is_regex, rule.created_at, rule.expires_at))
        conn.commit()
    return rule
def _is_alert_suppressed(self, rule: AlertRule) -> bool:
    """Return True if any unexpired suppression rule matches `rule`'s labels.

    All matchers within one suppression rule are ANDed — note an empty
    matcher dict therefore matches EVERY alert. Regex matchers use
    re.match, i.e. they anchor at the start only (prefix match).
    """
    with self._get_db() as conn:
        rows = conn.execute(
            "SELECT * FROM alert_suppression_rules WHERE tenant_id = ?",
            (rule.tenant_id,)
        ).fetchall()
        for row in rows:
            suppression_rule = self._row_to_suppression_rule(row)
            # Skip expired suppression rules.
            if suppression_rule.expires_at:
                if datetime.now() > datetime.fromisoformat(suppression_rule.expires_at):
                    continue
            # Every matcher must match the alert rule's labels.
            matchers = suppression_rule.matchers
            match = True
            for key, pattern in matchers.items():
                value = rule.labels.get(key, '')
                if suppression_rule.is_regex:
                    if not re.match(pattern, value):
                        match = False
                        break
                else:
                    if value != pattern:
                        match = False
                        break
            if match:
                return True
    return False
# ==================== 资源监控 ====================
def record_resource_metric(self, tenant_id: str, resource_type: ResourceType,
                           resource_id: str, metric_name: str,
                           metric_value: float, unit: str,
                           metadata: Dict = None) -> ResourceMetric:
    """Persist one metric sample, stamped with the current time."""
    metric_id = f"rm_{uuid.uuid4().hex[:16]}"
    now = datetime.now().isoformat()
    metric = ResourceMetric(
        id=metric_id,
        tenant_id=tenant_id,
        resource_type=resource_type,
        resource_id=resource_id,
        metric_name=metric_name,
        metric_value=metric_value,
        unit=unit,
        timestamp=now,
        metadata=metadata or {}
    )
    with self._get_db() as conn:
        conn.execute("""
            INSERT INTO resource_metrics
            (id, tenant_id, resource_type, resource_id, metric_name,
             metric_value, unit, timestamp, metadata)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
        """, (metric.id, metric.tenant_id, metric.resource_type.value,
              metric.resource_id, metric.metric_name, metric.metric_value,
              metric.unit, metric.timestamp, json.dumps(metric.metadata)))
        conn.commit()
    return metric
def get_recent_metrics(self, tenant_id: str, metric_name: str,
                       seconds: int = 3600) -> List[ResourceMetric]:
    """Return samples of ``metric_name`` newer than ``seconds`` ago, newest first."""
    since = (datetime.now() - timedelta(seconds=seconds)).isoformat()
    sql = """SELECT * FROM resource_metrics
             WHERE tenant_id = ? AND metric_name = ? AND timestamp > ?
             ORDER BY timestamp DESC"""
    with self._get_db() as conn:
        rows = conn.execute(sql, (tenant_id, metric_name, since)).fetchall()
    return [self._row_to_resource_metric(r) for r in rows]
def get_resource_metrics(self, tenant_id: str, resource_type: ResourceType,
                         resource_id: str, metric_name: str,
                         start_time: str, end_time: str) -> List[ResourceMetric]:
    """Return samples for one resource/metric within [start_time, end_time], oldest first."""
    sql = """SELECT * FROM resource_metrics
             WHERE tenant_id = ? AND resource_type = ? AND resource_id = ?
             AND metric_name = ? AND timestamp BETWEEN ? AND ?
             ORDER BY timestamp ASC"""
    args = (tenant_id, resource_type.value, resource_id, metric_name, start_time, end_time)
    with self._get_db() as conn:
        rows = conn.execute(sql, args).fetchall()
    return [self._row_to_resource_metric(r) for r in rows]
# ==================== 容量规划 ====================
def create_capacity_plan(self, tenant_id: str, resource_type: ResourceType,
                         current_capacity: float, prediction_date: str,
                         confidence: float = 0.8) -> CapacityPlan:
    """Create, persist and return a capacity plan for ``resource_type``.

    The prediction extrapolates the trend of the last 30 days of
    ``{resource_type}_usage`` samples linearly out to ``prediction_date``
    (ISO date string).  When no history exists the plan records
    ``insufficient_data`` with the current capacity.

    Fix: ``days_ahead`` is now clamped to >= 0 — a ``prediction_date`` in
    the past previously extrapolated the trend backwards and could even
    recommend scaling down a growing resource.
    """
    plan_id = f"cp_{uuid.uuid4().hex[:16]}"
    now = datetime.now().isoformat()
    # Predict from up to 30 days of historical usage samples.
    metrics = self.get_recent_metrics(tenant_id, f"{resource_type.value}_usage", seconds=30*24*3600)
    if metrics:
        values = [m.metric_value for m in metrics]
        trend = self._calculate_trend(values)
        # Clamp to a non-negative horizon (see docstring).
        days_ahead = max((datetime.fromisoformat(prediction_date) - datetime.now()).days, 0)
        predicted_capacity = current_capacity * (1 + trend * days_ahead / 30)
        # Recommend an action based on the predicted/current ratio.
        if predicted_capacity > current_capacity * 1.2:
            recommended_action = "scale_up"
            estimated_cost = (predicted_capacity - current_capacity) * 10  # simplified flat unit cost
        elif predicted_capacity < current_capacity * 0.5:
            recommended_action = "scale_down"
            estimated_cost = 0
        else:
            recommended_action = "maintain"
            estimated_cost = 0
    else:
        predicted_capacity = current_capacity
        recommended_action = "insufficient_data"
        estimated_cost = 0
    plan = CapacityPlan(
        id=plan_id,
        tenant_id=tenant_id,
        resource_type=resource_type,
        current_capacity=current_capacity,
        predicted_capacity=predicted_capacity,
        prediction_date=prediction_date,
        confidence=confidence,
        recommended_action=recommended_action,
        estimated_cost=estimated_cost,
        created_at=now
    )
    with self._get_db() as conn:
        conn.execute("""
            INSERT INTO capacity_plans
            (id, tenant_id, resource_type, current_capacity, predicted_capacity,
             prediction_date, confidence, recommended_action, estimated_cost, created_at)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
        """, (plan.id, plan.tenant_id, plan.resource_type.value,
              plan.current_capacity, plan.predicted_capacity,
              plan.prediction_date, plan.confidence,
              plan.recommended_action, plan.estimated_cost, plan.created_at))
        conn.commit()
    return plan
def _calculate_trend(self, values: List[float]) -> float:
"""计算趋势(增长率)"""
if len(values) < 2:
return 0.0
# 使用最近的数据计算趋势
recent = values[-10:] if len(values) > 10 else values
n = len(recent)
if n < 2:
return 0.0
# 简单线性回归计算斜率
x = list(range(n))
mean_x = sum(x) / n
mean_y = sum(recent) / n
numerator = sum((x[i] - mean_x) * (recent[i] - mean_y) for i in range(n))
denominator = sum((x[i] - mean_x) ** 2 for i in range(n))
slope = numerator / denominator if denominator != 0 else 0
# 归一化为增长率
if mean_y != 0:
return slope / mean_y
return 0.0
def get_capacity_plans(self, tenant_id: str) -> List[CapacityPlan]:
    """Return all capacity plans for a tenant, newest first."""
    sql = "SELECT * FROM capacity_plans WHERE tenant_id = ? ORDER BY created_at DESC"
    with self._get_db() as conn:
        rows = conn.execute(sql, (tenant_id,)).fetchall()
    return [self._row_to_capacity_plan(r) for r in rows]
# ==================== 自动扩缩容 ====================
def create_auto_scaling_policy(self, tenant_id: str, name: str,
                               resource_type: ResourceType, min_instances: int,
                               max_instances: int, target_utilization: float,
                               scale_up_threshold: float, scale_down_threshold: float,
                               scale_up_step: int = 1, scale_down_step: int = 1,
                               cooldown_period: int = 300) -> AutoScalingPolicy:
    """Create, persist and return an auto-scaling policy (enabled by default)."""
    created = datetime.now().isoformat()
    policy = AutoScalingPolicy(
        id=f"asp_{uuid.uuid4().hex[:16]}",
        tenant_id=tenant_id,
        name=name,
        resource_type=resource_type,
        min_instances=min_instances,
        max_instances=max_instances,
        target_utilization=target_utilization,
        scale_up_threshold=scale_up_threshold,
        scale_down_threshold=scale_down_threshold,
        scale_up_step=scale_up_step,
        scale_down_step=scale_down_step,
        cooldown_period=cooldown_period,
        is_enabled=True,
        created_at=created,
        updated_at=created
    )
    params = (policy.id, policy.tenant_id, policy.name, policy.resource_type.value,
              policy.min_instances, policy.max_instances, policy.target_utilization,
              policy.scale_up_threshold, policy.scale_down_threshold,
              policy.scale_up_step, policy.scale_down_step, policy.cooldown_period,
              policy.is_enabled, policy.created_at, policy.updated_at)
    with self._get_db() as conn:
        conn.execute("""
            INSERT INTO auto_scaling_policies
            (id, tenant_id, name, resource_type, min_instances, max_instances,
             target_utilization, scale_up_threshold, scale_down_threshold,
             scale_up_step, scale_down_step, cooldown_period, is_enabled, created_at, updated_at)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
        """, params)
        conn.commit()
    return policy
def get_auto_scaling_policy(self, policy_id: str) -> Optional[AutoScalingPolicy]:
    """Fetch a single auto-scaling policy by id, or None when absent."""
    with self._get_db() as conn:
        row = conn.execute(
            "SELECT * FROM auto_scaling_policies WHERE id = ?",
            (policy_id,)
        ).fetchone()
    return self._row_to_auto_scaling_policy(row) if row else None
def list_auto_scaling_policies(self, tenant_id: str) -> List[AutoScalingPolicy]:
    """Return all auto-scaling policies for a tenant, newest first."""
    sql = "SELECT * FROM auto_scaling_policies WHERE tenant_id = ? ORDER BY created_at DESC"
    with self._get_db() as conn:
        rows = conn.execute(sql, (tenant_id,)).fetchall()
    return [self._row_to_auto_scaling_policy(r) for r in rows]
def evaluate_scaling_policy(self, policy_id: str, current_instances: int,
                            current_utilization: float) -> Optional[ScalingEvent]:
    """Evaluate one policy against current load; create a scaling event if needed.

    Returns None when the policy is missing/disabled, still in its cooldown
    window, or no threshold is crossed (or the instance count is already at
    its min/max bound).
    """
    policy = self.get_auto_scaling_policy(policy_id)
    if policy is None or not policy.is_enabled:
        return None
    # Respect the cooldown window since the last scaling event.
    previous = self.get_last_scaling_event(policy_id)
    if previous is not None:
        elapsed = (datetime.now() - datetime.fromisoformat(previous.started_at)).total_seconds()
        if elapsed < policy.cooldown_period:
            return None
    decided_action = None
    decided_reason = ""
    # Up-threshold takes precedence; down is only considered otherwise.
    if current_utilization > policy.scale_up_threshold:
        if current_instances < policy.max_instances:
            decided_action = ScalingAction.SCALE_UP
            decided_reason = f"利用率 {current_utilization:.1%} 超过扩容阈值 {policy.scale_up_threshold:.1%}"
    elif current_utilization < policy.scale_down_threshold:
        if current_instances > policy.min_instances:
            decided_action = ScalingAction.SCALE_DOWN
            decided_reason = f"利用率 {current_utilization:.1%} 低于缩容阈值 {policy.scale_down_threshold:.1%}"
    if decided_action is None:
        return None
    # Step toward the target, never crossing the configured bounds.
    if decided_action == ScalingAction.SCALE_UP:
        target = min(current_instances + policy.scale_up_step, policy.max_instances)
    else:
        target = max(current_instances - policy.scale_down_step, policy.min_instances)
    return self._create_scaling_event(policy, decided_action, current_instances,
                                      target, decided_reason)
def _create_scaling_event(self, policy: AutoScalingPolicy, action: ScalingAction,
                          from_count: int, to_count: int, reason: str) -> ScalingEvent:
    """Persist and return a pending, auto-triggered scaling event."""
    event = ScalingEvent(
        id=f"se_{uuid.uuid4().hex[:16]}",
        policy_id=policy.id,
        tenant_id=policy.tenant_id,
        action=action,
        from_count=from_count,
        to_count=to_count,
        reason=reason,
        triggered_by="auto",
        status="pending",
        started_at=datetime.now().isoformat(),
        completed_at=None,
        rolled_back_at=None if False else None,
        error_message=None
    )
    params = (event.id, event.policy_id, event.tenant_id, event.action.value,
              event.from_count, event.to_count, event.reason,
              event.triggered_by, event.status, event.started_at)
    with self._get_db() as conn:
        conn.execute("""
            INSERT INTO scaling_events
            (id, policy_id, tenant_id, action, from_count, to_count, reason,
             triggered_by, status, started_at)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
        """, params)
        conn.commit()
    return event
def get_last_scaling_event(self, policy_id: str) -> Optional[ScalingEvent]:
    """Return the most recent scaling event for a policy, or None."""
    sql = """SELECT * FROM scaling_events
             WHERE policy_id = ?
             ORDER BY started_at DESC LIMIT 1"""
    with self._get_db() as conn:
        row = conn.execute(sql, (policy_id,)).fetchone()
    return self._row_to_scaling_event(row) if row else None
def update_scaling_event_status(self, event_id: str, status: str,
                                error_message: Optional[str] = None) -> Optional[ScalingEvent]:
    """Update the status (and optional error message) of a scaling event.

    Terminal states ('completed' / 'failed') also stamp ``completed_at``
    with the current time; any other status leaves ``completed_at`` alone.
    Returns the refreshed event, or None if ``event_id`` does not exist.
    Note: ``error_message`` is written (possibly as NULL) in both branches.
    """
    now = datetime.now().isoformat()
    with self._get_db() as conn:
        if status in ['completed', 'failed']:
            # Terminal state: record the completion timestamp as well.
            conn.execute("""
                UPDATE scaling_events
                SET status = ?, completed_at = ?, error_message = ?
                WHERE id = ?
            """, (status, now, error_message, event_id))
        else:
            conn.execute("""
                UPDATE scaling_events
                SET status = ?, error_message = ?
                WHERE id = ?
            """, (status, error_message, event_id))
        conn.commit()
    return self.get_scaling_event(event_id)
def get_scaling_event(self, event_id: str) -> Optional[ScalingEvent]:
    """Fetch a single scaling event by id, or None when absent."""
    with self._get_db() as conn:
        row = conn.execute(
            "SELECT * FROM scaling_events WHERE id = ?",
            (event_id,)
        ).fetchone()
    return self._row_to_scaling_event(row) if row else None
def list_scaling_events(self, tenant_id: str, policy_id: str = None,
                        limit: int = 100) -> List[ScalingEvent]:
    """List a tenant's scaling events, newest first, optionally filtered by policy."""
    sql = "SELECT * FROM scaling_events WHERE tenant_id = ?"
    args: list = [tenant_id]
    if policy_id:
        sql += " AND policy_id = ?"
        args.append(policy_id)
    sql += " ORDER BY started_at DESC LIMIT ?"
    args.append(limit)
    with self._get_db() as conn:
        return [self._row_to_scaling_event(r) for r in conn.execute(sql, args).fetchall()]
# ==================== 健康检查与故障转移 ====================
def create_health_check(self, tenant_id: str, name: str, target_type: str,
                        target_id: str, check_type: str, check_config: Dict,
                        interval: int = 60, timeout: int = 10,
                        retry_count: int = 3) -> HealthCheck:
    """Create, persist and return a health check.

    Defaults: healthy after 2 consecutive passes, unhealthy after 3
    consecutive failures; enabled on creation.
    """
    created = datetime.now().isoformat()
    check = HealthCheck(
        id=f"hc_{uuid.uuid4().hex[:16]}",
        tenant_id=tenant_id,
        name=name,
        target_type=target_type,
        target_id=target_id,
        check_type=check_type,
        check_config=check_config,
        interval=interval,
        timeout=timeout,
        retry_count=retry_count,
        healthy_threshold=2,
        unhealthy_threshold=3,
        is_enabled=True,
        created_at=created,
        updated_at=created
    )
    params = (check.id, check.tenant_id, check.name, check.target_type,
              check.target_id, check.check_type, json.dumps(check.check_config),
              check.interval, check.timeout, check.retry_count,
              check.healthy_threshold, check.unhealthy_threshold,
              check.is_enabled, check.created_at, check.updated_at)
    with self._get_db() as conn:
        conn.execute("""
            INSERT INTO health_checks
            (id, tenant_id, name, target_type, target_id, check_type, check_config,
             interval, timeout, retry_count, healthy_threshold, unhealthy_threshold,
             is_enabled, created_at, updated_at)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
        """, params)
        conn.commit()
    return check
def get_health_check(self, check_id: str) -> Optional[HealthCheck]:
    """Fetch a health check configuration by id, or None when absent."""
    with self._get_db() as conn:
        row = conn.execute(
            "SELECT * FROM health_checks WHERE id = ?",
            (check_id,)
        ).fetchone()
    return self._row_to_health_check(row) if row else None
def list_health_checks(self, tenant_id: str) -> List[HealthCheck]:
    """Return all health checks for a tenant, newest first."""
    sql = "SELECT * FROM health_checks WHERE tenant_id = ? ORDER BY created_at DESC"
    with self._get_db() as conn:
        rows = conn.execute(sql, (tenant_id,)).fetchall()
    return [self._row_to_health_check(r) for r in rows]
async def execute_health_check(self, check_id: str) -> HealthCheckResult:
    """Run one health check, persist its result, and return it.

    Raises ValueError when ``check_id`` is unknown.  Unsupported check
    types produce an UNKNOWN status instead of failing.
    """
    check = self.get_health_check(check_id)
    if not check:
        raise ValueError(f"Health check {check_id} not found")
    result_id = f"hcr_{uuid.uuid4().hex[:16]}"
    checked_at = datetime.now().isoformat()
    # Dispatch on the configured check type (simulated probes).
    if check.check_type == 'http':
        status, elapsed_ms, message = await self._check_http_health(check)
    elif check.check_type == 'tcp':
        status, elapsed_ms, message = await self._check_tcp_health(check)
    elif check.check_type == 'ping':
        status, elapsed_ms, message = await self._check_ping_health(check)
    else:
        status, elapsed_ms, message = HealthStatus.UNKNOWN, 0, "Unknown check type"
    result = HealthCheckResult(
        id=result_id,
        check_id=check_id,
        tenant_id=check.tenant_id,
        status=status,
        response_time=elapsed_ms,
        message=message,
        details={},
        checked_at=checked_at
    )
    with self._get_db() as conn:
        conn.execute("""
            INSERT INTO health_check_results
            (id, check_id, tenant_id, status, response_time, message, details, checked_at)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?)
        """, (result.id, result.check_id, result.tenant_id, result.status.value,
              result.response_time, result.message, json.dumps(result.details),
              result.checked_at))
        conn.commit()
    return result
async def _check_http_health(self, check: HealthCheck) -> Tuple[HealthStatus, float, str]:
    """Probe an HTTP endpoint.

    Returns (status, latency_ms, message): HEALTHY on the expected status
    code, DEGRADED on any other response, UNHEALTHY when the request fails
    or no URL is configured.
    """
    cfg = check.check_config
    url = cfg.get('url')
    expected = cfg.get('expected_status', 200)
    if not url:
        return HealthStatus.UNHEALTHY, 0, "URL not configured"
    began = time.time()
    try:
        async with httpx.AsyncClient() as client:
            resp = await client.get(url, timeout=check.timeout)
            latency = (time.time() - began) * 1000
            if resp.status_code == expected:
                return HealthStatus.HEALTHY, latency, "OK"
            return HealthStatus.DEGRADED, latency, f"Unexpected status: {resp.status_code}"
    except Exception as exc:
        return HealthStatus.UNHEALTHY, (time.time() - began) * 1000, str(exc)
async def _check_tcp_health(self, check: HealthCheck) -> Tuple[HealthStatus, float, str]:
    """Probe a TCP endpoint by opening and immediately closing a connection.

    Returns (status, latency_ms, message); UNHEALTHY on timeout, any
    connection error, or missing host/port configuration.
    """
    cfg = check.check_config
    host = cfg.get('host')
    port = cfg.get('port')
    if not host or not port:
        return HealthStatus.UNHEALTHY, 0, "Host or port not configured"
    began = time.time()
    try:
        _reader, writer = await asyncio.wait_for(
            asyncio.open_connection(host, port),
            timeout=check.timeout
        )
        latency = (time.time() - began) * 1000
        # Close cleanly before reporting success.
        writer.close()
        await writer.wait_closed()
        return HealthStatus.HEALTHY, latency, "TCP connection successful"
    except asyncio.TimeoutError:
        return HealthStatus.UNHEALTHY, (time.time() - began) * 1000, "Connection timeout"
    except Exception as exc:
        return HealthStatus.UNHEALTHY, (time.time() - began) * 1000, str(exc)
async def _check_ping_health(self, check: HealthCheck) -> Tuple[HealthStatus, float, str]:
    """Simulated ICMP ping check.

    A real implementation would shell out to the system ping or use an
    ICMP library; this stub reports success whenever a host is configured.
    """
    host = check.check_config.get('host')
    if not host:
        return HealthStatus.UNHEALTHY, 0, "Host not configured"
    return HealthStatus.HEALTHY, 10.0, "Ping successful"
def get_health_check_results(self, check_id: str, limit: int = 100) -> List[HealthCheckResult]:
    """Return up to ``limit`` historical results for one check, newest first."""
    sql = """SELECT * FROM health_check_results
             WHERE check_id = ?
             ORDER BY checked_at DESC LIMIT ?"""
    with self._get_db() as conn:
        rows = conn.execute(sql, (check_id, limit)).fetchall()
    return [self._row_to_health_check_result(r) for r in rows]
# ==================== 故障转移 ====================
def create_failover_config(self, tenant_id: str, name: str, primary_region: str,
                           secondary_regions: List[str], failover_trigger: str,
                           auto_failover: bool = False, failover_timeout: int = 300,
                           health_check_id: str = None) -> FailoverConfig:
    """Create, persist and return a failover configuration (enabled by default)."""
    created = datetime.now().isoformat()
    config = FailoverConfig(
        id=f"fc_{uuid.uuid4().hex[:16]}",
        tenant_id=tenant_id,
        name=name,
        primary_region=primary_region,
        secondary_regions=secondary_regions,
        failover_trigger=failover_trigger,
        auto_failover=auto_failover,
        failover_timeout=failover_timeout,
        health_check_id=health_check_id,
        is_enabled=True,
        created_at=created,
        updated_at=created
    )
    params = (config.id, config.tenant_id, config.name, config.primary_region,
              json.dumps(config.secondary_regions), config.failover_trigger,
              config.auto_failover, config.failover_timeout, config.health_check_id,
              config.is_enabled, config.created_at, config.updated_at)
    with self._get_db() as conn:
        conn.execute("""
            INSERT INTO failover_configs
            (id, tenant_id, name, primary_region, secondary_regions, failover_trigger,
             auto_failover, failover_timeout, health_check_id, is_enabled, created_at, updated_at)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
        """, params)
        conn.commit()
    return config
def get_failover_config(self, config_id: str) -> Optional[FailoverConfig]:
    """Fetch a failover configuration by id, or None when absent."""
    with self._get_db() as conn:
        row = conn.execute(
            "SELECT * FROM failover_configs WHERE id = ?",
            (config_id,)
        ).fetchone()
    return self._row_to_failover_config(row) if row else None
def list_failover_configs(self, tenant_id: str) -> List[FailoverConfig]:
    """Return all failover configurations for a tenant, newest first."""
    sql = "SELECT * FROM failover_configs WHERE tenant_id = ? ORDER BY created_at DESC"
    with self._get_db() as conn:
        rows = conn.execute(sql, (tenant_id,)).fetchall()
    return [self._row_to_failover_config(r) for r in rows]
def initiate_failover(self, config_id: str, reason: str) -> Optional[FailoverEvent]:
    """Kick off a failover to the first configured secondary region.

    Returns None when the configuration is missing, disabled, or has no
    secondary region available.
    """
    config = self.get_failover_config(config_id)
    if config is None or not config.is_enabled:
        return None
    if not config.secondary_regions:
        return None
    # Simple selection policy: always fail over to the first secondary.
    target_region = config.secondary_regions[0]
    event = FailoverEvent(
        id=f"fe_{uuid.uuid4().hex[:16]}",
        config_id=config_id,
        tenant_id=config.tenant_id,
        from_region=config.primary_region,
        to_region=target_region,
        reason=reason,
        status="initiated",
        started_at=datetime.now().isoformat(),
        completed_at=None,
        rolled_back_at=None
    )
    with self._get_db() as conn:
        conn.execute("""
            INSERT INTO failover_events
            (id, config_id, tenant_id, from_region, to_region, reason, status, started_at)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?)
        """, (event.id, event.config_id, event.tenant_id, event.from_region,
              event.to_region, event.reason, event.status, event.started_at))
        conn.commit()
    return event
def update_failover_status(self, event_id: str, status: str) -> Optional[FailoverEvent]:
    """Update a failover event's status.

    'completed' additionally stamps ``completed_at`` and 'rolled_back'
    stamps ``rolled_back_at``; any other status only updates the status
    column.  Returns the refreshed event, or None if ``event_id`` does
    not exist.
    """
    now = datetime.now().isoformat()
    with self._get_db() as conn:
        if status == 'completed':
            # Terminal success: record completion time.
            conn.execute("""
                UPDATE failover_events
                SET status = ?, completed_at = ?
                WHERE id = ?
            """, (status, now, event_id))
        elif status == 'rolled_back':
            # Rollback: record when the failover was reverted.
            conn.execute("""
                UPDATE failover_events
                SET status = ?, rolled_back_at = ?
                WHERE id = ?
            """, (status, now, event_id))
        else:
            conn.execute("""
                UPDATE failover_events
                SET status = ?
                WHERE id = ?
            """, (status, event_id))
        conn.commit()
    return self.get_failover_event(event_id)
def get_failover_event(self, event_id: str) -> Optional[FailoverEvent]:
    """Fetch a failover event by id, or None when absent."""
    with self._get_db() as conn:
        row = conn.execute(
            "SELECT * FROM failover_events WHERE id = ?",
            (event_id,)
        ).fetchone()
    return self._row_to_failover_event(row) if row else None
def list_failover_events(self, tenant_id: str, limit: int = 100) -> List[FailoverEvent]:
    """Return up to ``limit`` failover events for a tenant, newest first."""
    sql = """SELECT * FROM failover_events
             WHERE tenant_id = ?
             ORDER BY started_at DESC LIMIT ?"""
    with self._get_db() as conn:
        rows = conn.execute(sql, (tenant_id, limit)).fetchall()
    return [self._row_to_failover_event(r) for r in rows]
# ==================== 数据备份与恢复 ====================
def create_backup_job(self, tenant_id: str, name: str, backup_type: str,
                      target_type: str, target_id: str, schedule: str,
                      retention_days: int = 30, encryption_enabled: bool = True,
                      compression_enabled: bool = True,
                      storage_location: str = None) -> BackupJob:
    """Create, persist and return a backup job.

    When ``storage_location`` is omitted it defaults to ``backups/<tenant_id>``.
    """
    created = datetime.now().isoformat()
    job = BackupJob(
        id=f"bj_{uuid.uuid4().hex[:16]}",
        tenant_id=tenant_id,
        name=name,
        backup_type=backup_type,
        target_type=target_type,
        target_id=target_id,
        schedule=schedule,
        retention_days=retention_days,
        encryption_enabled=encryption_enabled,
        compression_enabled=compression_enabled,
        storage_location=storage_location or f"backups/{tenant_id}",
        is_enabled=True,
        created_at=created,
        updated_at=created
    )
    params = (job.id, job.tenant_id, job.name, job.backup_type, job.target_type,
              job.target_id, job.schedule, job.retention_days, job.encryption_enabled,
              job.compression_enabled, job.storage_location, job.is_enabled,
              job.created_at, job.updated_at)
    with self._get_db() as conn:
        conn.execute("""
            INSERT INTO backup_jobs
            (id, tenant_id, name, backup_type, target_type, target_id, schedule,
             retention_days, encryption_enabled, compression_enabled, storage_location,
             is_enabled, created_at, updated_at)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
        """, params)
        conn.commit()
    return job
def get_backup_job(self, job_id: str) -> Optional[BackupJob]:
    """Fetch a backup job by id, or None when absent."""
    with self._get_db() as conn:
        row = conn.execute(
            "SELECT * FROM backup_jobs WHERE id = ?",
            (job_id,)
        ).fetchone()
    return self._row_to_backup_job(row) if row else None
def list_backup_jobs(self, tenant_id: str) -> List[BackupJob]:
    """Return all backup jobs for a tenant, newest first."""
    sql = "SELECT * FROM backup_jobs WHERE tenant_id = ? ORDER BY created_at DESC"
    with self._get_db() as conn:
        rows = conn.execute(sql, (tenant_id,)).fetchall()
    return [self._row_to_backup_job(r) for r in rows]
def execute_backup(self, job_id: str) -> Optional[BackupRecord]:
    """Run a backup for the given job and return the resulting record.

    Returns None when the job does not exist or is disabled.  The backup
    itself is simulated synchronously; a real implementation would enqueue
    a background task.

    Fix: the original returned the in-memory record still marked
    IN_PROGRESS with size 0 even though ``_complete_backup`` had already
    finished the row in the database; we now re-read the record so callers
    observe the final state.
    """
    job = self.get_backup_job(job_id)
    if not job or not job.is_enabled:
        return None
    record_id = f"br_{uuid.uuid4().hex[:16]}"
    now = datetime.now().isoformat()
    record = BackupRecord(
        id=record_id,
        job_id=job_id,
        tenant_id=job.tenant_id,
        status=BackupStatus.IN_PROGRESS,
        size_bytes=0,
        checksum="",
        started_at=now,
        completed_at=None,
        verified_at=None,
        error_message=None,
        storage_path=f"{job.storage_location}/{record_id}"
    )
    with self._get_db() as conn:
        conn.execute("""
            INSERT INTO backup_records
            (id, job_id, tenant_id, status, size_bytes, checksum, started_at, storage_path)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?)
        """, (record.id, record.job_id, record.tenant_id, record.status.value,
              record.size_bytes, record.checksum, record.started_at, record.storage_path))
        conn.commit()
    # Simulated synchronous backup completion (~100 MB payload).
    self._complete_backup(record_id, size_bytes=1024*1024*100)
    # Re-read so the returned record reflects the completed status.
    return self.get_backup_record(record_id) or record
def _complete_backup(self, record_id: str, size_bytes: int, checksum: str = None):
    """Mark a backup record COMPLETED, stamping size, checksum and finish time.

    When no checksum is supplied a placeholder is derived from the current
    time (simulation only — not a real content digest).
    """
    finished_at = datetime.now().isoformat()
    if checksum is None:
        checksum = hashlib.sha256(str(time.time()).encode()).hexdigest()[:16]
    with self._get_db() as conn:
        conn.execute("""
            UPDATE backup_records
            SET status = ?, size_bytes = ?, checksum = ?, completed_at = ?
            WHERE id = ?
        """, (BackupStatus.COMPLETED.value, size_bytes, checksum, finished_at, record_id))
        conn.commit()
def get_backup_record(self, record_id: str) -> Optional[BackupRecord]:
    """Fetch a backup record by id, or None when absent."""
    with self._get_db() as conn:
        row = conn.execute(
            "SELECT * FROM backup_records WHERE id = ?",
            (record_id,)
        ).fetchone()
    return self._row_to_backup_record(row) if row else None
def list_backup_records(self, tenant_id: str, job_id: str = None,
                        limit: int = 100) -> List[BackupRecord]:
    """List a tenant's backup records, newest first, optionally filtered by job."""
    sql = "SELECT * FROM backup_records WHERE tenant_id = ?"
    args: list = [tenant_id]
    if job_id:
        sql += " AND job_id = ?"
        args.append(job_id)
    sql += " ORDER BY started_at DESC LIMIT ?"
    args.append(limit)
    with self._get_db() as conn:
        return [self._row_to_backup_record(r) for r in conn.execute(sql, args).fetchall()]
def restore_from_backup(self, record_id: str) -> bool:
    """Restore from a completed backup record.

    Returns False when the record is missing or not COMPLETED.  The
    restore itself is simulated and always succeeds; a real implementation
    would perform the actual data restore here.
    """
    record = self.get_backup_record(record_id)
    if record is None:
        return False
    if record.status != BackupStatus.COMPLETED:
        return False
    return True
# ==================== 成本优化 ====================
def generate_cost_report(self, tenant_id: str, year: int, month: int) -> CostReport:
    """Build and persist a monthly cost report from recorded utilizations.

    Cost is a simplified model: a flat unit cost of 10 per utilization
    point, broken down by resource type; anomalies and trends are derived
    via the corresponding helpers.
    """
    period = f"{year:04d}-{month:02d}"
    utilizations = self.get_resource_utilizations(tenant_id, period)
    unit_cost = 10.0  # simplified per-unit monthly cost
    breakdown: Dict[str, float] = {}
    total_cost = 0.0
    for util in utilizations:
        cost = unit_cost * util.utilization_rate
        key = util.resource_type.value
        breakdown[key] = breakdown.get(key, 0) + cost
        total_cost += cost
    report = CostReport(
        id=f"cr_{uuid.uuid4().hex[:16]}",
        tenant_id=tenant_id,
        report_period=period,
        total_cost=total_cost,
        currency="CNY",
        breakdown=breakdown,
        trends=self._calculate_cost_trends(tenant_id, year, month),
        anomalies=self._detect_cost_anomalies(utilizations),
        created_at=datetime.now().isoformat()
    )
    with self._get_db() as conn:
        conn.execute("""
            INSERT INTO cost_reports
            (id, tenant_id, report_period, total_cost, currency, breakdown, trends, anomalies, created_at)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
        """, (report.id, report.tenant_id, report.report_period, report.total_cost,
              report.currency, json.dumps(report.breakdown), json.dumps(report.trends),
              json.dumps(report.anomalies), report.created_at))
        conn.commit()
    return report
def _detect_cost_anomalies(self, utilizations: List[ResourceUtilization]) -> List[Dict]:
"""检测成本异常"""
anomalies = []
for util in utilizations:
# 检测低利用率
if util.utilization_rate < 0.1:
anomalies.append({
"type": "low_utilization",
"resource_type": util.resource_type.value,
"resource_id": util.resource_id,
"utilization_rate": util.utilization_rate,
"severity": "high" if util.utilization_rate < 0.05 else "medium"
})
# 检测高峰利用率
if util.peak_utilization > 0.9:
anomalies.append({
"type": "high_peak",
"resource_type": util.resource_type.value,
"resource_id": util.resource_id,
"peak_utilization": util.peak_utilization,
"severity": "medium"
})
return anomalies
def _calculate_cost_trends(self, tenant_id: str, year: int, month: int) -> Dict:
"""计算成本趋势"""
# 简化实现:返回模拟趋势
return {
"month_over_month": 0.05, # 5% 增长
"year_over_year": 0.15, # 15% 增长
"forecast_next_month": 1.05
}
def record_resource_utilization(self, tenant_id: str, resource_type: ResourceType,
                                resource_id: str, utilization_rate: float,
                                peak_utilization: float, avg_utilization: float,
                                idle_time_percent: float, report_date: str,
                                recommendations: List[str] = None) -> ResourceUtilization:
    """Persist a resource utilization report and return the stored record."""
    util = ResourceUtilization(
        id=f"ru_{uuid.uuid4().hex[:16]}",
        tenant_id=tenant_id,
        resource_type=resource_type,
        resource_id=resource_id,
        utilization_rate=utilization_rate,
        peak_utilization=peak_utilization,
        avg_utilization=avg_utilization,
        idle_time_percent=idle_time_percent,
        report_date=report_date,
        recommendations=recommendations or []
    )
    params = (util.id, util.tenant_id, util.resource_type.value, util.resource_id,
              util.utilization_rate, util.peak_utilization, util.avg_utilization,
              util.idle_time_percent, util.report_date, json.dumps(util.recommendations))
    with self._get_db() as conn:
        conn.execute("""
            INSERT INTO resource_utilizations
            (id, tenant_id, resource_type, resource_id, utilization_rate,
             peak_utilization, avg_utilization, idle_time_percent, report_date, recommendations)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
        """, params)
        conn.commit()
    return util
def get_resource_utilizations(self, tenant_id: str, report_period: str) -> List[ResourceUtilization]:
    """Return utilization reports whose report_date starts with ``report_period``.

    ``report_period`` is a prefix (e.g. "2024-05") matched via SQL LIKE.
    """
    sql = """SELECT * FROM resource_utilizations
             WHERE tenant_id = ? AND report_date LIKE ?
             ORDER BY report_date DESC"""
    with self._get_db() as conn:
        rows = conn.execute(sql, (tenant_id, f"{report_period}%")).fetchall()
    return [self._row_to_resource_utilization(r) for r in rows]
def detect_idle_resources(self, tenant_id: str) -> List[IdleResource]:
    """Detect and persist idle resources from the last 30 days of reports.

    A resource is considered idle when its average utilization_rate is
    below 0.1 AND its maximum idle_time_percent exceeds 0.8 across the
    window.  Each detection is inserted into idle_resources and the fresh
    records are returned.

    NOTE(review): report_date is compared as a string against an ISO
    timestamp — assumes report_date is stored in ISO format; confirm
    against callers of record_resource_utilization.
    NOTE(review): idle_time_percent is compared against 0.8, i.e. treated
    as a 0-1 fraction despite the "percent" name — confirm the scale.
    NOTE(review): INSERT OR REPLACE is keyed on a freshly generated uuid,
    so repeated calls accumulate duplicate rows for the same resource
    rather than replacing earlier detections — confirm whether a
    (tenant, resource) unique key was intended.
    """
    idle_resources = []
    # 获取最近30天的利用率数据
    with self._get_db() as conn:
        thirty_days_ago = (datetime.now() - timedelta(days=30)).isoformat()
        # SQLite permits referencing SELECT aliases (avg_utilization,
        # max_idle_time) inside the HAVING clause.
        rows = conn.execute(
            """SELECT resource_type, resource_id, AVG(utilization_rate) as avg_utilization,
            MAX(idle_time_percent) as max_idle_time
            FROM resource_utilizations
            WHERE tenant_id = ? AND report_date > ?
            GROUP BY resource_type, resource_id
            HAVING avg_utilization < 0.1 AND max_idle_time > 0.8""",
            (tenant_id, thirty_days_ago)
        ).fetchall()
        for row in rows:
            idle_id = f"ir_{uuid.uuid4().hex[:16]}"
            now = datetime.now().isoformat()
            idle_resource = IdleResource(
                id=idle_id,
                tenant_id=tenant_id,
                resource_type=ResourceType(row['resource_type']),
                resource_id=row['resource_id'],
                resource_name=f"{row['resource_type']}-{row['resource_id']}",
                idle_since=thirty_days_ago,  # approximation: start of the 30-day window
                estimated_monthly_cost=50.0,  # simplified flat estimate
                currency="CNY",
                reason="Low utilization rate over 30 days",
                recommendation="Consider downsizing or terminating this resource",
                detected_at=now
            )
            conn.execute("""
                INSERT OR REPLACE INTO idle_resources
                (id, tenant_id, resource_type, resource_id, resource_name, idle_since,
                estimated_monthly_cost, currency, reason, recommendation, detected_at)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            """, (idle_resource.id, idle_resource.tenant_id, idle_resource.resource_type.value,
                  idle_resource.resource_id, idle_resource.resource_name, idle_resource.idle_since,
                  idle_resource.estimated_monthly_cost, idle_resource.currency,
                  idle_resource.reason, idle_resource.recommendation, idle_resource.detected_at))
            idle_resources.append(idle_resource)
        # Single commit covers every insert performed in the loop above.
        conn.commit()
    return idle_resources
def get_idle_resources(self, tenant_id: str) -> List[IdleResource]:
    """Return previously detected idle resources for a tenant, newest first."""
    sql = "SELECT * FROM idle_resources WHERE tenant_id = ? ORDER BY detected_at DESC"
    with self._get_db() as conn:
        rows = conn.execute(sql, (tenant_id,)).fetchall()
    return [self._row_to_idle_resource(r) for r in rows]
def generate_cost_optimization_suggestions(self, tenant_id: str) -> List[CostOptimizationSuggestion]:
    """Generate and persist cost-optimization suggestions for a tenant.

    Currently produces at most one suggestion: cleaning up idle resources,
    with potential savings summed from the detected idle set.  Returns an
    empty list when there is nothing to save.
    """
    results: List[CostOptimizationSuggestion] = []
    idle_resources = self.detect_idle_resources(tenant_id)
    potential = sum(r.estimated_monthly_cost for r in idle_resources)
    if potential <= 0:
        # TODO: add further suggestion categories (rightsizing, reserved capacity, ...)
        return results
    suggestion = CostOptimizationSuggestion(
        id=f"cos_{uuid.uuid4().hex[:16]}",
        tenant_id=tenant_id,
        category="resource_rightsize",
        title="清理闲置资源",
        description=f"检测到 {len(idle_resources)} 个闲置资源,建议清理以节省成本。",
        potential_savings=potential,
        currency="CNY",
        confidence=0.85,
        difficulty="easy",
        implementation_steps=[
            "Review the list of idle resources",
            "Confirm resources are no longer needed",
            "Terminate or downsize unused resources"
        ],
        risk_level="low",
        is_applied=False,
        created_at=datetime.now().isoformat(),
        applied_at=None
    )
    params = (suggestion.id, suggestion.tenant_id, suggestion.category, suggestion.title,
              suggestion.description, suggestion.potential_savings, suggestion.currency,
              suggestion.confidence, suggestion.difficulty,
              json.dumps(suggestion.implementation_steps), suggestion.risk_level,
              suggestion.is_applied, suggestion.created_at)
    with self._get_db() as conn:
        conn.execute("""
            INSERT INTO cost_optimization_suggestions
            (id, tenant_id, category, title, description, potential_savings, currency,
             confidence, difficulty, implementation_steps, risk_level, is_applied, created_at)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
        """, params)
        conn.commit()
    results.append(suggestion)
    return results
def get_cost_optimization_suggestions(self, tenant_id: str,
                                      is_applied: bool = None) -> List[CostOptimizationSuggestion]:
    """List suggestions for a tenant, optionally filtered by applied state.

    Results are ordered by potential savings, largest first.
    """
    sql = "SELECT * FROM cost_optimization_suggestions WHERE tenant_id = ?"
    args: list = [tenant_id]
    if is_applied is not None:
        sql += " AND is_applied = ?"
        args.append(1 if is_applied else 0)
    sql += " ORDER BY potential_savings DESC"
    with self._get_db() as conn:
        return [self._row_to_cost_optimization_suggestion(r)
                for r in conn.execute(sql, args).fetchall()]
def apply_cost_optimization_suggestion(self, suggestion_id: str) -> Optional[CostOptimizationSuggestion]:
    """Mark a suggestion as applied (stamping applied_at) and return it refreshed."""
    applied_at = datetime.now().isoformat()
    with self._get_db() as conn:
        conn.execute("""
            UPDATE cost_optimization_suggestions
            SET is_applied = ?, applied_at = ?
            WHERE id = ?
        """, (True, applied_at, suggestion_id))
        conn.commit()
    return self.get_cost_optimization_suggestion(suggestion_id)
def get_cost_optimization_suggestion(self, suggestion_id: str) -> Optional[CostOptimizationSuggestion]:
    """Fetch a single cost-optimization suggestion by id, or None when absent."""
    with self._get_db() as conn:
        row = conn.execute(
            "SELECT * FROM cost_optimization_suggestions WHERE id = ?",
            (suggestion_id,)
        ).fetchone()
    return self._row_to_cost_optimization_suggestion(row) if row else None
# ==================== 辅助方法:数据库行转换 ====================
def _row_to_alert_rule(self, row) -> AlertRule:
return AlertRule(
id=row["id"],
tenant_id=row["tenant_id"],
name=row["name"],
description=row["description"],
rule_type=AlertRuleType(row["rule_type"]),
severity=AlertSeverity(row["severity"]),
metric=row["metric"],
condition=row["condition"],
threshold=row["threshold"],
duration=row["duration"],
evaluation_interval=row["evaluation_interval"],
channels=json.loads(row["channels"]),
labels=json.loads(row["labels"]),
annotations=json.loads(row["annotations"]),
is_enabled=bool(row["is_enabled"]),
created_at=row["created_at"],
updated_at=row["updated_at"],
created_by=row["created_by"]
)
def _row_to_alert_channel(self, row) -> AlertChannel:
return AlertChannel(
id=row["id"],
tenant_id=row["tenant_id"],
name=row["name"],
channel_type=AlertChannelType(row["channel_type"]),
config=json.loads(row["config"]),
severity_filter=json.loads(row["severity_filter"]),
is_enabled=bool(row["is_enabled"]),
success_count=row["success_count"],
fail_count=row["fail_count"],
last_used_at=row["last_used_at"],
created_at=row["created_at"],
updated_at=row["updated_at"]
)
def _row_to_alert(self, row) -> Alert:
return Alert(
id=row["id"],
rule_id=row["rule_id"],
tenant_id=row["tenant_id"],
severity=AlertSeverity(row["severity"]),
status=AlertStatus(row["status"]),
title=row["title"],
description=row["description"],
metric=row["metric"],
value=row["value"],
threshold=row["threshold"],
labels=json.loads(row["labels"]),
annotations=json.loads(row["annotations"]),
started_at=row["started_at"],
resolved_at=row["resolved_at"],
acknowledged_by=row["acknowledged_by"],
acknowledged_at=row["acknowledged_at"],
notification_sent=json.loads(row["notification_sent"]),
suppression_count=row["suppression_count"]
)
def _row_to_suppression_rule(self, row) -> AlertSuppressionRule:
return AlertSuppressionRule(
id=row["id"],
tenant_id=row["tenant_id"],
name=row["name"],
matchers=json.loads(row["matchers"]),
duration=row["duration"],
is_regex=bool(row["is_regex"]),
created_at=row["created_at"],
expires_at=row["expires_at"]
)
def _row_to_resource_metric(self, row) -> ResourceMetric:
return ResourceMetric(
id=row["id"],
tenant_id=row["tenant_id"],
resource_type=ResourceType(row["resource_type"]),
resource_id=row["resource_id"],
metric_name=row["metric_name"],
metric_value=row["metric_value"],
unit=row["unit"],
timestamp=row["timestamp"],
metadata=json.loads(row["metadata"])
)
def _row_to_capacity_plan(self, row) -> CapacityPlan:
return CapacityPlan(
id=row["id"],
tenant_id=row["tenant_id"],
resource_type=ResourceType(row["resource_type"]),
current_capacity=row["current_capacity"],
predicted_capacity=row["predicted_capacity"],
prediction_date=row["prediction_date"],
confidence=row["confidence"],
recommended_action=row["recommended_action"],
estimated_cost=row["estimated_cost"],
created_at=row["created_at"]
)
def _row_to_auto_scaling_policy(self, row) -> AutoScalingPolicy:
return AutoScalingPolicy(
id=row["id"],
tenant_id=row["tenant_id"],
name=row["name"],
resource_type=ResourceType(row["resource_type"]),
min_instances=row["min_instances"],
max_instances=row["max_instances"],
target_utilization=row["target_utilization"],
scale_up_threshold=row["scale_up_threshold"],
scale_down_threshold=row["scale_down_threshold"],
scale_up_step=row["scale_up_step"],
scale_down_step=row["scale_down_step"],
cooldown_period=row["cooldown_period"],
is_enabled=bool(row["is_enabled"]),
created_at=row["created_at"],
updated_at=row["updated_at"]
)
def _row_to_scaling_event(self, row) -> ScalingEvent:
return ScalingEvent(
id=row["id"],
policy_id=row["policy_id"],
tenant_id=row["tenant_id"],
action=ScalingAction(row["action"]),
from_count=row["from_count"],
to_count=row["to_count"],
reason=row["reason"],
triggered_by=row["triggered_by"],
status=row["status"],
started_at=row["started_at"],
completed_at=row["completed_at"],
error_message=row["error_message"]
)
def _row_to_health_check(self, row) -> HealthCheck:
return HealthCheck(
id=row["id"],
tenant_id=row["tenant_id"],
name=row["name"],
target_type=row["target_type"],
target_id=row["target_id"],
check_type=row["check_type"],
check_config=json.loads(row["check_config"]),
interval=row["interval"],
timeout=row["timeout"],
retry_count=row["retry_count"],
healthy_threshold=row["healthy_threshold"],
unhealthy_threshold=row["unhealthy_threshold"],
is_enabled=bool(row["is_enabled"]),
created_at=row["created_at"],
updated_at=row["updated_at"]
)
def _row_to_health_check_result(self, row) -> HealthCheckResult:
return HealthCheckResult(
id=row["id"],
check_id=row["check_id"],
tenant_id=row["tenant_id"],
status=HealthStatus(row["status"]),
response_time=row["response_time"],
message=row["message"],
details=json.loads(row["details"]),
checked_at=row["checked_at"]
)
def _row_to_failover_config(self, row) -> FailoverConfig:
return FailoverConfig(
id=row["id"],
tenant_id=row["tenant_id"],
name=row["name"],
primary_region=row["primary_region"],
secondary_regions=json.loads(row["secondary_regions"]),
failover_trigger=row["failover_trigger"],
auto_failover=bool(row["auto_failover"]),
failover_timeout=row["failover_timeout"],
health_check_id=row["health_check_id"],
is_enabled=bool(row["is_enabled"]),
created_at=row["created_at"],
updated_at=row["updated_at"]
)
def _row_to_failover_event(self, row) -> FailoverEvent:
return FailoverEvent(
id=row["id"],
config_id=row["config_id"],
tenant_id=row["tenant_id"],
from_region=row["from_region"],
to_region=row["to_region"],
reason=row["reason"],
status=row["status"],
started_at=row["started_at"],
completed_at=row["completed_at"],
rolled_back_at=row["rolled_back_at"]
)
def _row_to_backup_job(self, row) -> BackupJob:
return BackupJob(
id=row["id"],
tenant_id=row["tenant_id"],
name=row["name"],
backup_type=row["backup_type"],
target_type=row["target_type"],
target_id=row["target_id"],
schedule=row["schedule"],
retention_days=row["retention_days"],
encryption_enabled=bool(row["encryption_enabled"]),
compression_enabled=bool(row["compression_enabled"]),
storage_location=row["storage_location"],
is_enabled=bool(row["is_enabled"]),
created_at=row["created_at"],
updated_at=row["updated_at"]
)
def _row_to_backup_record(self, row) -> BackupRecord:
return BackupRecord(
id=row["id"],
job_id=row["job_id"],
tenant_id=row["tenant_id"],
status=BackupStatus(row["status"]),
size_bytes=row["size_bytes"],
checksum=row["checksum"],
started_at=row["started_at"],
completed_at=row["completed_at"],
verified_at=row["verified_at"],
error_message=row["error_message"],
storage_path=row["storage_path"]
)
def _row_to_resource_utilization(self, row) -> ResourceUtilization:
return ResourceUtilization(
id=row["id"],
tenant_id=row["tenant_id"],
resource_type=ResourceType(row["resource_type"]),
resource_id=row["resource_id"],
utilization_rate=row["utilization_rate"],
peak_utilization=row["peak_utilization"],
avg_utilization=row["avg_utilization"],
idle_time_percent=row["idle_time_percent"],
report_date=row["report_date"],
recommendations=json.loads(row["recommendations"])
)
def _row_to_idle_resource(self, row) -> IdleResource:
return IdleResource(
id=row["id"],
tenant_id=row["tenant_id"],
resource_type=ResourceType(row["resource_type"]),
resource_id=row["resource_id"],
resource_name=row["resource_name"],
idle_since=row["idle_since"],
estimated_monthly_cost=row["estimated_monthly_cost"],
currency=row["currency"],
reason=row["reason"],
recommendation=row["recommendation"],
detected_at=row["detected_at"]
)
def _row_to_cost_optimization_suggestion(self, row) -> CostOptimizationSuggestion:
return CostOptimizationSuggestion(
id=row["id"],
tenant_id=row["tenant_id"],
category=row["category"],
title=row["title"],
description=row["description"],
potential_savings=row["potential_savings"],
currency=row["currency"],
confidence=row["confidence"],
difficulty=row["difficulty"],
implementation_steps=json.loads(row["implementation_steps"]),
risk_level=row["risk_level"],
is_applied=bool(row["is_applied"]),
created_at=row["created_at"],
applied_at=row["applied_at"]
)
# Singleton instance
_ops_manager = None
def get_ops_manager() -> OpsManager:
    """Return the lazily-created, module-wide OpsManager instance."""
    global _ops_manager
    if _ops_manager is not None:
        return _ops_manager
    _ops_manager = OpsManager()
    return _ops_manager