#!/usr/bin/env python3
"""
InsightFlow Operations & Monitoring Manager - Phase 8 Task 8

Operations and monitoring module:
- Real-time alerting (rule configuration, multi-channel notification,
  severity levels, suppression and aggregation)
- Capacity planning and auto-scaling (resource monitoring, capacity
  prediction, auto-scaling policies)
- Disaster recovery and failover (multi-active architecture, health
  checks, automatic failover, backup and restore)
- Cost optimization (utilization monitoring, cost analysis, idle
  resource detection, optimization suggestions)

Author: InsightFlow Team
"""

import asyncio
import hashlib
import json
import os
import re
import sqlite3
import statistics
import time
import uuid
from collections.abc import Callable
from dataclasses import dataclass
from datetime import datetime, timedelta
from enum import StrEnum

import httpx

# Database path
DB_PATH = os.path.join(os.path.dirname(__file__), "insightflow.db")


class AlertSeverity(StrEnum):
    """Alert severity levels P0-P3."""

    P0 = "p0"  # Emergency - system unavailable, act immediately
    P1 = "p1"  # Severe - core functionality impaired, respond within 1 hour
    P2 = "p2"  # Moderate - partial impact, respond within 4 hours
    P3 = "p3"  # Minor - non-core issue, respond within 24 hours


class AlertStatus(StrEnum):
    """Lifecycle state of an alert instance."""

    FIRING = "firing"  # actively firing
    RESOLVED = "resolved"  # recovered
    ACKNOWLEDGED = "acknowledged"  # acknowledged by an operator
    SUPPRESSED = "suppressed"  # muted by a suppression rule


class AlertChannelType(StrEnum):
    """Supported notification channel types."""

    PAGERDUTY = "pagerduty"
    OPSGENIE = "opsgenie"
    FEISHU = "feishu"
    DINGTALK = "dingtalk"
    SLACK = "slack"
    EMAIL = "email"
    SMS = "sms"
    WEBHOOK = "webhook"


class AlertRuleType(StrEnum):
    """Kinds of alert rule evaluation."""

    THRESHOLD = "threshold"  # static threshold
    ANOMALY = "anomaly"  # statistical anomaly detection
    PREDICTIVE = "predictive"  # trend-based prediction
    COMPOSITE = "composite"  # composite of other rules


class ResourceType(StrEnum):
    """Monitored resource categories."""

    CPU = "cpu"
    MEMORY = "memory"
    DISK = "disk"
    NETWORK = "network"
    GPU = "gpu"
    DATABASE = "database"
    CACHE = "cache"
    QUEUE = "queue"


class ScalingAction(StrEnum):
    """Auto-scaling decision outcome."""

    SCALE_UP = "scale_up"  # add capacity
    SCALE_DOWN = "scale_down"  # remove capacity
    MAINTAIN = "maintain"  # no change


class HealthStatus(StrEnum):
    """Health check verdicts."""

    HEALTHY = "healthy"
    DEGRADED = "degraded"
    UNHEALTHY = "unhealthy"
    UNKNOWN = "unknown"


class BackupStatus(StrEnum):
    """Backup record states."""

    PENDING = "pending"
    IN_PROGRESS = "in_progress"
    COMPLETED = "completed"
    FAILED = "failed"
    VERIFIED = "verified"


@dataclass
class AlertRule:
    """An alert rule definition."""

    id: str
    tenant_id: str
    name: str
    description: str
    rule_type: AlertRuleType
    severity: AlertSeverity
    metric: str  # monitored metric name
    condition: str  # comparison operator: >, <, ==, >=, <=, !=
    threshold: float
    duration: int  # sustain duration in seconds
    evaluation_interval: int  # evaluation interval in seconds
    channels: list[str]  # alert channel IDs
    labels: dict[str, str]  # labels
    annotations: dict[str, str]  # annotations
    is_enabled: bool
    created_at: str
    updated_at: str
    created_by: str


@dataclass
class AlertChannel:
    """Notification channel configuration."""

    id: str
    tenant_id: str
    name: str
    channel_type: AlertChannelType
    config: dict  # channel-specific configuration
    severity_filter: list[str]  # severities this channel accepts
    is_enabled: bool
    success_count: int
    fail_count: int
    last_used_at: str | None
    created_at: str
    updated_at: str


@dataclass
class Alert:
    """A fired alert instance."""

    id: str
    rule_id: str
    tenant_id: str
    severity: AlertSeverity
    status: AlertStatus
    title: str
    description: str
    metric: str
    value: float
    threshold: float
    labels: dict[str, str]
    annotations: dict[str, str]
    started_at: str
    resolved_at: str | None
    acknowledged_by: str | None
    acknowledged_at: str | None
    notification_sent: dict[str, bool]  # per-channel delivery status
    suppression_count: int  # number of deduplicated re-fires


@dataclass
class AlertSuppressionRule:
    """Alert suppression (mute) rule."""

    id: str
    tenant_id: str
    name: str
    matchers: dict[str, str]  # label matchers
    duration: int  # suppression duration in seconds
    is_regex: bool  # whether matcher values are regexes
    created_at: str
    expires_at: str | None


@dataclass
class AlertGroup:
    """A group of aggregated alerts."""

    id: str
    tenant_id: str
    group_key: str  # aggregation key
    alerts: list[str]  # alert IDs
    created_at: str
    updated_at: str


@dataclass
class ResourceMetric:
    """A single resource metric sample."""

    id: str
    tenant_id: str
    resource_type: ResourceType
    resource_id: str
    metric_name: str
    metric_value: float
    unit: str
    timestamp: str
    metadata: dict


@dataclass
class CapacityPlan:
    """A capacity planning forecast."""

    id: str
    tenant_id: str
    resource_type: ResourceType
    current_capacity: float
    predicted_capacity: float
    prediction_date: str
    confidence: float
    recommended_action: str
    estimated_cost: float
    created_at: str


@dataclass
class AutoScalingPolicy:
    """Auto-scaling policy configuration."""

    id: str
    tenant_id: str
    name: str
    resource_type: ResourceType
    min_instances: int
    max_instances: int
    target_utilization: float  # desired utilization
    scale_up_threshold: float
    scale_down_threshold: float
    scale_up_step: int
    scale_down_step: int
    cooldown_period: int  # cooldown in seconds
    is_enabled: bool
    created_at: str
    updated_at: str


@dataclass
class ScalingEvent:
    """A recorded scaling action."""

    id: str
    policy_id: str
    tenant_id: str
    action: ScalingAction
    from_count: int
    to_count: int
    reason: str
    triggered_by: str  # trigger source: manual, auto, scheduled
    status: str  # pending, in_progress, completed, failed
    started_at: str
    completed_at: str | None
    error_message: str | None


@dataclass
class HealthCheck:
    """Health check configuration."""

    id: str
    tenant_id: str
    name: str
    target_type: str  # service, database, api, etc.
    target_id: str
    check_type: str  # http, tcp, ping, custom
    check_config: dict  # check-specific configuration
    interval: int  # check interval in seconds
    timeout: int  # timeout in seconds
    retry_count: int
    healthy_threshold: int
    unhealthy_threshold: int
    is_enabled: bool
    created_at: str
    updated_at: str


@dataclass
class HealthCheckResult:
    """Outcome of one health check run."""

    id: str
    check_id: str
    tenant_id: str
    status: HealthStatus
    response_time: float  # response time in milliseconds
    message: str
    details: dict
    checked_at: str


@dataclass
class FailoverConfig:
    """Failover configuration."""

    id: str
    tenant_id: str
    name: str
    primary_region: str
    secondary_regions: list[str]  # standby regions
    failover_trigger: str  # trigger condition
    auto_failover: bool
    failover_timeout: int  # failover timeout in seconds
    health_check_id: str
    is_enabled: bool
    created_at: str
    updated_at: str


@dataclass
class FailoverEvent:
    """A recorded failover event."""

    id: str
    config_id: str
    tenant_id: str
    from_region: str
    to_region: str
    reason: str
    status: str  # initiated, in_progress, completed, failed, rolled_back
    started_at: str
    completed_at: str | None
    rolled_back_at: str | None


@dataclass
class BackupJob:
    """Backup job definition."""

    id: str
    tenant_id: str
    name: str
    backup_type: str  # full, incremental, differential
    target_type: str  # database, files, configuration
    target_id: str
    schedule: str  # cron expression
    retention_days: int
    encryption_enabled: bool
    compression_enabled: bool
    storage_location: str
    is_enabled: bool
    created_at: str
    updated_at: str


@dataclass
class BackupRecord:
    """A single backup execution record."""

    id: str
    job_id: str
    tenant_id: str
    status: BackupStatus
    size_bytes: int
    checksum: str
    started_at: str
    completed_at: str | None
    verified_at: str | None
    error_message: str | None
    storage_path: str


@dataclass
class CostReport:
    """Monthly cost report."""

    id: str
    tenant_id: str
    report_period: str  # YYYY-MM
    total_cost: float
    currency: str
    breakdown: dict[str, float]  # cost per resource type
    trends: dict  # trend data
    anomalies: list[dict]  # detected anomalies
    created_at: str


@dataclass
class ResourceUtilization:
    """Resource utilization summary."""

    id: str
    tenant_id: str
    resource_type: ResourceType
    resource_id: str
    utilization_rate: float  # 0-1
    peak_utilization: float
    avg_utilization: float
    idle_time_percent: float
    report_date: str
    recommendations: list[str]


@dataclass
class IdleResource:
    """A detected idle resource."""

    id: str
    tenant_id: str
    resource_type: ResourceType
    resource_id: str
    resource_name: str
    idle_since: str
    estimated_monthly_cost: float
    currency: str
    reason: str
    recommendation: str
    detected_at: str


@dataclass
class CostOptimizationSuggestion:
    """A cost optimization suggestion."""

    id: str
    tenant_id: str
    category: str  # resource_rightsize, reserved_instances, spot_instances, etc.
    title: str
    description: str
    potential_savings: float
    currency: str
    confidence: float
    difficulty: str  # easy, medium, hard
    implementation_steps: list[str]
    risk_level: str  # low, medium, high
    is_applied: bool
    created_at: str
    applied_at: str | None
class OpsManager:
    """Operations & monitoring manager.

    Covers real-time alerting, capacity planning, auto-scaling,
    disaster recovery and cost optimization, persisted in SQLite.
    """

    def __init__(self, db_path: str = DB_PATH):
        self.db_path = db_path
        # Maps AlertRuleType value -> evaluator(rule, metrics) -> bool.
        self._alert_evaluators: dict[str, Callable] = {}
        self._running = False
        self._evaluator_thread = None
        self._register_default_evaluators()

    def _get_db(self) -> sqlite3.Connection:
        """Open a SQLite connection with name-addressable rows.

        Fixed: the return annotation previously claimed ``None`` even
        though a live connection is returned.
        """
        conn = sqlite3.connect(self.db_path)
        conn.row_factory = sqlite3.Row
        return conn

    def _register_default_evaluators(self) -> None:
        """Register the built-in alert rule evaluators."""
        self._alert_evaluators[AlertRuleType.THRESHOLD.value] = self._evaluate_threshold_rule
        self._alert_evaluators[AlertRuleType.ANOMALY.value] = self._evaluate_anomaly_rule
        self._alert_evaluators[AlertRuleType.PREDICTIVE.value] = self._evaluate_predictive_rule

    # ==================== Alert rule management ====================

    def create_alert_rule(
        self,
        tenant_id: str,
        name: str,
        description: str,
        rule_type: AlertRuleType,
        severity: AlertSeverity,
        metric: str,
        condition: str,
        threshold: float,
        duration: int,
        evaluation_interval: int,
        channels: list[str],
        labels: dict,
        annotations: dict,
        created_by: str,
    ) -> AlertRule:
        """Create, persist and return a new alert rule (enabled by default)."""
        rule_id = f"ar_{uuid.uuid4().hex[:16]}"
        now = datetime.now().isoformat()
        rule = AlertRule(
            id=rule_id,
            tenant_id=tenant_id,
            name=name,
            description=description,
            rule_type=rule_type,
            severity=severity,
            metric=metric,
            condition=condition,
            threshold=threshold,
            duration=duration,
            evaluation_interval=evaluation_interval,
            channels=channels,
            labels=labels or {},
            annotations=annotations or {},
            is_enabled=True,
            created_at=now,
            updated_at=now,
            created_by=created_by,
        )
        with self._get_db() as conn:
            conn.execute(
                """
                INSERT INTO alert_rules
                (id, tenant_id, name, description, rule_type, severity, metric,
                 condition, threshold, duration, evaluation_interval, channels,
                 labels, annotations, is_enabled, created_at, updated_at, created_by)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
                """,
                (
                    rule.id,
                    rule.tenant_id,
                    rule.name,
                    rule.description,
                    rule.rule_type.value,
                    rule.severity.value,
                    rule.metric,
                    rule.condition,
                    rule.threshold,
                    rule.duration,
                    rule.evaluation_interval,
                    json.dumps(rule.channels),
                    json.dumps(rule.labels),
                    json.dumps(rule.annotations),
                    rule.is_enabled,
                    rule.created_at,
                    rule.updated_at,
                    rule.created_by,
                ),
            )
            conn.commit()
        return rule

    def get_alert_rule(self, rule_id: str) -> AlertRule | None:
        """Fetch one alert rule by id, or None if absent."""
        with self._get_db() as conn:
            row = conn.execute("SELECT * FROM alert_rules WHERE id = ?", (rule_id,)).fetchone()
            if row:
                return self._row_to_alert_rule(row)
        return None

    def list_alert_rules(self, tenant_id: str, is_enabled: bool | None = None) -> list[AlertRule]:
        """List the tenant's alert rules, optionally filtered by enabled flag."""
        query = "SELECT * FROM alert_rules WHERE tenant_id = ?"
        params = [tenant_id]
        if is_enabled is not None:
            query += " AND is_enabled = ?"
            params.append(1 if is_enabled else 0)
        query += " ORDER BY created_at DESC"
        with self._get_db() as conn:
            rows = conn.execute(query, params).fetchall()
            return [self._row_to_alert_rule(row) for row in rows]

    def update_alert_rule(self, rule_id: str, **kwargs) -> AlertRule | None:
        """Update whitelisted fields of an alert rule and return the result.

        Unknown keyword arguments are silently ignored; `rule_type` is
        deliberately immutable after creation.
        """
        allowed_fields = [
            "name",
            "description",
            "severity",
            "metric",
            "condition",
            "threshold",
            "duration",
            "evaluation_interval",
            "channels",
            "labels",
            "annotations",
            "is_enabled",
        ]
        updates = {k: v for k, v in kwargs.items() if k in allowed_fields}
        if not updates:
            return self.get_alert_rule(rule_id)
        # Serialize list/dict fields for storage.
        if "channels" in updates:
            updates["channels"] = json.dumps(updates["channels"])
        if "labels" in updates:
            updates["labels"] = json.dumps(updates["labels"])
        if "annotations" in updates:
            updates["annotations"] = json.dumps(updates["annotations"])
        if "severity" in updates and isinstance(updates["severity"], AlertSeverity):
            updates["severity"] = updates["severity"].value
        updates["updated_at"] = datetime.now().isoformat()
        with self._get_db() as conn:
            # Safe: column names come from the whitelist above, values are bound.
            set_clause = ", ".join([f"{k} = ?" for k in updates.keys()])
            conn.execute(
                f"UPDATE alert_rules SET {set_clause} WHERE id = ?",
                list(updates.values()) + [rule_id],
            )
            conn.commit()
        return self.get_alert_rule(rule_id)

    def delete_alert_rule(self, rule_id: str) -> bool:
        """Delete an alert rule; True if a row was actually removed.

        Fixed: uses the statement's Cursor.rowcount instead of
        Connection.total_changes, which counts every change ever made on
        the connection.
        """
        with self._get_db() as conn:
            cur = conn.execute("DELETE FROM alert_rules WHERE id = ?", (rule_id,))
            conn.commit()
            return cur.rowcount > 0

    # ==================== Alert channel management ====================

    def create_alert_channel(
        self,
        tenant_id: str,
        name: str,
        channel_type: AlertChannelType,
        config: dict,
        severity_filter: list[str] | None = None,
    ) -> AlertChannel:
        """Create and persist a notification channel.

        When `severity_filter` is omitted the channel accepts every
        severity level.
        """
        channel_id = f"ac_{uuid.uuid4().hex[:16]}"
        now = datetime.now().isoformat()
        channel = AlertChannel(
            id=channel_id,
            tenant_id=tenant_id,
            name=name,
            channel_type=channel_type,
            config=config,
            severity_filter=severity_filter or [s.value for s in AlertSeverity],
            is_enabled=True,
            success_count=0,
            fail_count=0,
            last_used_at=None,
            created_at=now,
            updated_at=now,
        )
        with self._get_db() as conn:
            conn.execute(
                """
                INSERT INTO alert_channels
                (id, tenant_id, name, channel_type, config, severity_filter,
                 is_enabled, success_count, fail_count, last_used_at,
                 created_at, updated_at)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
                """,
                (
                    channel.id,
                    channel.tenant_id,
                    channel.name,
                    channel.channel_type.value,
                    json.dumps(channel.config),
                    json.dumps(channel.severity_filter),
                    channel.is_enabled,
                    channel.success_count,
                    channel.fail_count,
                    channel.last_used_at,
                    channel.created_at,
                    channel.updated_at,
                ),
            )
            conn.commit()
        return channel
""", ( channel.id, channel.tenant_id, channel.name, channel.channel_type.value, json.dumps(channel.config), json.dumps(channel.severity_filter), channel.is_enabled, channel.success_count, channel.fail_count, channel.last_used_at, channel.created_at, channel.updated_at, ), ) conn.commit() return channel def get_alert_channel(self, channel_id: str) -> AlertChannel | None: """获取告警渠道""" with self._get_db() as conn: row = conn.execute( "SELECT * FROM alert_channels WHERE id = ?", (channel_id,) ).fetchone() if row: return self._row_to_alert_channel(row) return None def list_alert_channels(self, tenant_id: str) -> list[AlertChannel]: """列出租户的所有告警渠道""" with self._get_db() as conn: rows = conn.execute( "SELECT * FROM alert_channels WHERE tenant_id = ? ORDER BY created_at DESC", (tenant_id,), ).fetchall() return [self._row_to_alert_channel(row) for row in rows] def test_alert_channel(self, channel_id: str) -> bool: """测试告警渠道""" channel = self.get_alert_channel(channel_id) if not channel: return False test_alert = Alert( id="test", rule_id="test", tenant_id=channel.tenant_id, severity=AlertSeverity.P3, status=AlertStatus.FIRING, title="测试告警", description="这是一条测试告警消息,用于验证告警渠道配置。", metric="test_metric", value=0.0, threshold=0.0, labels={"test": "true"}, annotations={}, started_at=datetime.now().isoformat(), resolved_at=None, acknowledged_by=None, acknowledged_at=None, notification_sent={}, suppression_count=0, ) return asyncio.run(self._send_alert_to_channel(test_alert, channel)) # ==================== 告警评估与触发 ==================== def _evaluate_threshold_rule(self, rule: AlertRule, metrics: list[ResourceMetric]) -> bool: """评估阈值告警规则""" if not metrics: return False # 获取最近 duration 秒内的指标 cutoff_time = datetime.now() - timedelta(seconds=rule.duration) recent_metrics = [m for m in metrics if datetime.fromisoformat(m.timestamp) > cutoff_time] if not recent_metrics: return False # 计算平均值 avg_value = statistics.mean([m.metric_value for m in recent_metrics]) # 评估条件 condition_map = { 
">": lambda x, y: x > y, "<": lambda x, y: x < y, ">=": lambda x, y: x >= y, "<=": lambda x, y: x <= y, "==": lambda x, y: x == y, "!=": lambda x, y: x != y, } evaluator = condition_map.get(rule.condition) if evaluator: return evaluator(avg_value, rule.threshold) return False def _evaluate_anomaly_rule(self, rule: AlertRule, metrics: list[ResourceMetric]) -> bool: """评估异常检测规则(基于标准差)""" if len(metrics) < 10: return False values = [m.metric_value for m in metrics] mean = statistics.mean(values) std = statistics.stdev(values) if len(values) > 1 else 0 if std == 0: return False # 最近值偏离均值超过3个标准差视为异常 latest_value = values[-1] z_score = abs(latest_value - mean) / std return z_score > 3.0 def _evaluate_predictive_rule(self, rule: AlertRule, metrics: list[ResourceMetric]) -> bool: """评估预测性告警规则(基于线性趋势)""" if len(metrics) < 5: return False # 简单的线性趋势预测 values = [m.metric_value for m in metrics[-10:]] # 最近10个点 n = len(values) if n < 2: return False x = list(range(n)) mean_x = sum(x) / n mean_y = sum(values) / n # 计算斜率 numerator = sum((x[i] - mean_x) * (values[i] - mean_y) for i in range(n)) denominator = sum((x[i] - mean_x) ** 2 for i in range(n)) slope = numerator / denominator if denominator != 0 else 0 # 预测下一个值 predicted = values[-1] + slope # 如果预测值超过阈值,触发告警 condition_map = { ">": lambda x, y: x > y, "<": lambda x, y: x < y, } evaluator = condition_map.get(rule.condition) if evaluator: return evaluator(predicted, rule.threshold) return False async def evaluate_alert_rules(self, tenant_id: str): """评估所有告警规则""" rules = self.list_alert_rules(tenant_id, is_enabled=True) for rule in rules: # 获取相关指标 metrics = self.get_recent_metrics( tenant_id, rule.metric, seconds=rule.duration + rule.evaluation_interval ) # 评估规则 evaluator = self._alert_evaluators.get(rule.rule_type.value) if evaluator and evaluator(rule, metrics): # 触发告警 await self._trigger_alert(rule, metrics[-1] if metrics else None) async def _trigger_alert(self, rule: AlertRule, metric: ResourceMetric | None): """触发告警""" # 
检查是否已有相同告警在触发中 existing = self.get_active_alert_by_rule(rule.id) if existing: # 更新抑制计数 self._increment_suppression_count(existing.id) return # 检查抑制规则 if self._is_alert_suppressed(rule): return alert_id = f"al_{uuid.uuid4().hex[:16]}" now = datetime.now().isoformat() alert = Alert( id=alert_id, rule_id=rule.id, tenant_id=rule.tenant_id, severity=rule.severity, status=AlertStatus.FIRING, title=rule.annotations.get("summary", f"告警: {rule.name}"), description=rule.annotations.get("description", rule.description), metric=rule.metric, value=metric.metric_value if metric else 0.0, threshold=rule.threshold, labels=rule.labels, annotations=rule.annotations, started_at=now, resolved_at=None, acknowledged_by=None, acknowledged_at=None, notification_sent={}, suppression_count=0, ) # 保存告警 with self._get_db() as conn: conn.execute( """ INSERT INTO alerts (id, rule_id, tenant_id, severity, status, title, description, metric, value, threshold, labels, annotations, started_at, notification_sent, suppression_count) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) 
""", ( alert.id, alert.rule_id, alert.tenant_id, alert.severity.value, alert.status.value, alert.title, alert.description, alert.metric, alert.value, alert.threshold, json.dumps(alert.labels), json.dumps(alert.annotations), alert.started_at, json.dumps(alert.notification_sent), alert.suppression_count, ), ) conn.commit() # 发送告警通知 await self._send_alert_notifications(alert, rule) async def _send_alert_notifications(self, alert: Alert, rule: AlertRule): """发送告警通知到所有配置的渠道""" channels = [] for channel_id in rule.channels: channel = self.get_alert_channel(channel_id) if channel and channel.is_enabled: channels.append(channel) for channel in channels: # 检查严重级别过滤 if alert.severity.value not in channel.severity_filter: continue success = await self._send_alert_to_channel(alert, channel) # 更新发送状态 alert.notification_sent[channel.id] = success self._update_alert_notification_status(alert.id, channel.id, success) async def _send_alert_to_channel(self, alert: Alert, channel: AlertChannel) -> bool: """发送告警到指定渠道""" try: if channel.channel_type == AlertChannelType.FEISHU: return await self._send_feishu_alert(alert, channel) elif channel.channel_type == AlertChannelType.DINGTALK: return await self._send_dingtalk_alert(alert, channel) elif channel.channel_type == AlertChannelType.SLACK: return await self._send_slack_alert(alert, channel) elif channel.channel_type == AlertChannelType.EMAIL: return await self._send_email_alert(alert, channel) elif channel.channel_type == AlertChannelType.PAGERDUTY: return await self._send_pagerduty_alert(alert, channel) elif channel.channel_type == AlertChannelType.OPSGENIE: return await self._send_opsgenie_alert(alert, channel) elif channel.channel_type == AlertChannelType.WEBHOOK: return await self._send_webhook_alert(alert, channel) else: return False except Exception as e: print(f"Failed to send alert to {channel.name}: {e}") return False async def _send_feishu_alert(self, alert: Alert, channel: AlertChannel) -> bool: """发送飞书告警""" config = 
channel.config webhook_url = config.get("webhook_url") config.get("secret", "") if not webhook_url: return False # 构建飞书消息 severity_colors = { AlertSeverity.P0.value: "red", AlertSeverity.P1.value: "orange", AlertSeverity.P2.value: "yellow", AlertSeverity.P3.value: "blue", } message = { "msg_type": "interactive", "card": { "config": {"wide_screen_mode": True}, "header": { "title": { "tag": "plain_text", "content": f"🚨 [{alert.severity.value.upper()}] {alert.title}", }, "template": severity_colors.get(alert.severity.value, "blue"), }, "elements": [ { "tag": "div", "text": { "tag": "lark_md", "content": f"**描述:** {alert.description}\n\n**指标:** {alert.metric}\n**当前值:** {alert.value}\n**阈值:** {alert.threshold}", }, }, { "tag": "div", "text": {"tag": "lark_md", "content": f"**时间:** {alert.started_at}"}, }, ], }, } async with httpx.AsyncClient() as client: response = await client.post(webhook_url, json=message, timeout=30.0) success = response.status_code == 200 if success: self._update_channel_stats(channel.id, success=True) else: self._update_channel_stats(channel.id, success=False) return success async def _send_dingtalk_alert(self, alert: Alert, channel: AlertChannel) -> bool: """发送钉钉告警""" config = channel.config webhook_url = config.get("webhook_url") config.get("secret", "") if not webhook_url: return False # 构建钉钉消息 message = { "msgtype": "markdown", "markdown": { "title": f"[{alert.severity.value.upper()}] {alert.title}", "text": f"## 🚨 [{alert.severity.value.upper()}] {alert.title}\n\n" + f"**描述:** {alert.description}\n\n" + f"**指标:** {alert.metric}\n" + f"**当前值:** {alert.value}\n" + f"**阈值:** {alert.threshold}\n\n" + f"**时间:** {alert.started_at}", }, } async with httpx.AsyncClient() as client: response = await client.post(webhook_url, json=message, timeout=30.0) success = response.status_code == 200 self._update_channel_stats(channel.id, success) return success async def _send_slack_alert(self, alert: Alert, channel: AlertChannel) -> bool: """发送 Slack 告警""" 
config = channel.config webhook_url = config.get("webhook_url") if not webhook_url: return False severity_emojis = { AlertSeverity.P0.value: "🔴", AlertSeverity.P1.value: "🟠", AlertSeverity.P2.value: "🟡", AlertSeverity.P3.value: "🔵", } emoji = severity_emojis.get(alert.severity.value, "⚪") message = { "text": f"{emoji} [{alert.severity.value.upper()}] {alert.title}", "blocks": [ { "type": "header", "text": { "type": "plain_text", "text": f"{emoji} [{alert.severity.value.upper()}] {alert.title}", }, }, { "type": "section", "fields": [ {"type": "mrkdwn", "text": f"*描述:*\n{alert.description}"}, {"type": "mrkdwn", "text": f"*指标:*\n{alert.metric}"}, {"type": "mrkdwn", "text": f"*当前值:*\n{alert.value}"}, {"type": "mrkdwn", "text": f"*阈值:*\n{alert.threshold}"}, ], }, { "type": "context", "elements": [{"type": "mrkdwn", "text": f"触发时间: {alert.started_at}"}], }, ], } async with httpx.AsyncClient() as client: response = await client.post(webhook_url, json=message, timeout=30.0) success = response.status_code == 200 self._update_channel_stats(channel.id, success) return success async def _send_email_alert(self, alert: Alert, channel: AlertChannel) -> bool: """发送邮件告警(模拟实现)""" # 实际实现需要集成邮件服务如 SendGrid、AWS SES 等 config = channel.config smtp_host = config.get("smtp_host") config.get("smtp_port", 587) username = config.get("username") password = config.get("password") to_addresses = config.get("to_addresses", []) if not all([smtp_host, username, password, to_addresses]): return False # 这里模拟发送成功 self._update_channel_stats(channel.id, True) return True async def _send_pagerduty_alert(self, alert: Alert, channel: AlertChannel) -> bool: """发送 PagerDuty 告警""" config = channel.config integration_key = config.get("integration_key") if not integration_key: return False severity_map = { AlertSeverity.P0.value: "critical", AlertSeverity.P1.value: "error", AlertSeverity.P2.value: "warning", AlertSeverity.P3.value: "info", } message = { "routing_key": integration_key, "event_action": "trigger", 
"dedup_key": alert.id, "payload": { "summary": alert.title, "severity": severity_map.get(alert.severity.value, "warning"), "source": alert.labels.get("instance", "unknown"), "custom_details": { "description": alert.description, "metric": alert.metric, "value": alert.value, "threshold": alert.threshold, }, }, } async with httpx.AsyncClient() as client: response = await client.post( "https://events.pagerduty.com/v2/enqueue", json=message, timeout=30.0 ) success = response.status_code == 202 self._update_channel_stats(channel.id, success) return success async def _send_opsgenie_alert(self, alert: Alert, channel: AlertChannel) -> bool: """发送 Opsgenie 告警""" config = channel.config api_key = config.get("api_key") if not api_key: return False priority_map = { AlertSeverity.P0.value: "P1", AlertSeverity.P1.value: "P2", AlertSeverity.P2.value: "P3", AlertSeverity.P3.value: "P4", } message = { "message": alert.title, "description": alert.description, "priority": priority_map.get(alert.severity.value, "P3"), "alias": alert.id, "details": { "metric": alert.metric, "value": str(alert.value), "threshold": str(alert.threshold), }, } async with httpx.AsyncClient() as client: response = await client.post( "https://api.opsgenie.com/v2/alerts", json=message, headers={"Authorization": f"GenieKey {api_key}"}, timeout=30.0, ) success = response.status_code in [200, 201, 202] self._update_channel_stats(channel.id, success) return success async def _send_webhook_alert(self, alert: Alert, channel: AlertChannel) -> bool: """发送 Webhook 告警""" config = channel.config webhook_url = config.get("webhook_url") headers = config.get("headers", {}) if not webhook_url: return False message = { "alert_id": alert.id, "severity": alert.severity.value, "status": alert.status.value, "title": alert.title, "description": alert.description, "metric": alert.metric, "value": alert.value, "threshold": alert.threshold, "labels": alert.labels, "started_at": alert.started_at, } async with httpx.AsyncClient() as 
client: response = await client.post(webhook_url, json=message, headers=headers, timeout=30.0) success = response.status_code in [200, 201, 202] self._update_channel_stats(channel.id, success) return success # ==================== 告警查询与管理 ==================== def get_active_alert_by_rule(self, rule_id: str) -> Alert | None: """获取规则对应的活跃告警""" with self._get_db() as conn: row = conn.execute( """SELECT * FROM alerts WHERE rule_id = ? AND status = ? ORDER BY started_at DESC LIMIT 1""", (rule_id, AlertStatus.FIRING.value), ).fetchone() if row: return self._row_to_alert(row) return None def get_alert(self, alert_id: str) -> Alert | None: """获取告警详情""" with self._get_db() as conn: row = conn.execute("SELECT * FROM alerts WHERE id = ?", (alert_id,)).fetchone() if row: return self._row_to_alert(row) return None def list_alerts( self, tenant_id: str, status: AlertStatus | None = None, severity: AlertSeverity | None = None, limit: int = 100, ) -> list[Alert]: """列出租户的告警""" query = "SELECT * FROM alerts WHERE tenant_id = ?" params = [tenant_id] if status: query += " AND status = ?" params.append(status.value) if severity: query += " AND severity = ?" params.append(severity.value) query += " ORDER BY started_at DESC LIMIT ?" params.append(limit) with self._get_db() as conn: rows = conn.execute(query, params).fetchall() return [self._row_to_alert(row) for row in rows] def acknowledge_alert(self, alert_id: str, user_id: str) -> Alert | None: """确认告警""" now = datetime.now().isoformat() with self._get_db() as conn: conn.execute( """ UPDATE alerts SET status = ?, acknowledged_by = ?, acknowledged_at = ? WHERE id = ? """, (AlertStatus.ACKNOWLEDGED.value, user_id, now, alert_id), ) conn.commit() return self.get_alert(alert_id) def resolve_alert(self, alert_id: str) -> Alert | None: """解决告警""" now = datetime.now().isoformat() with self._get_db() as conn: conn.execute( """ UPDATE alerts SET status = ?, resolved_at = ? WHERE id = ? 
""", (AlertStatus.RESOLVED.value, now, alert_id), ) conn.commit() return self.get_alert(alert_id) def _increment_suppression_count(self, alert_id: str) -> None: """增加告警抑制计数""" with self._get_db() as conn: conn.execute( """ UPDATE alerts SET suppression_count = suppression_count + 1 WHERE id = ? """, (alert_id,), ) conn.commit() def _update_alert_notification_status( self, alert_id: str, channel_id: str, success: bool ) -> None: """更新告警通知状态""" with self._get_db() as conn: row = conn.execute( "SELECT notification_sent FROM alerts WHERE id = ?", (alert_id,) ).fetchone() if row: notification_sent = json.loads(row["notification_sent"]) notification_sent[channel_id] = success conn.execute( "UPDATE alerts SET notification_sent = ? WHERE id = ?", (json.dumps(notification_sent), alert_id), ) conn.commit() def _update_channel_stats(self, channel_id: str, success: bool) -> None: """更新渠道统计""" now = datetime.now().isoformat() with self._get_db() as conn: if success: conn.execute( """ UPDATE alert_channels SET success_count = success_count + 1, last_used_at = ? WHERE id = ? """, (now, channel_id), ) else: conn.execute( """ UPDATE alert_channels SET fail_count = fail_count + 1, last_used_at = ? WHERE id = ? """, (now, channel_id), ) conn.commit() # ==================== 告警抑制与聚合 ==================== def create_suppression_rule( self, tenant_id: str, name: str, matchers: dict[str, str], duration: int, is_regex: bool = False, expires_at: str | None = None, ) -> AlertSuppressionRule: """创建告警抑制规则""" rule_id = f"sr_{uuid.uuid4().hex[:16]}" now = datetime.now().isoformat() rule = AlertSuppressionRule( id=rule_id, tenant_id=tenant_id, name=name, matchers=matchers, duration=duration, is_regex=is_regex, created_at=now, expires_at=expires_at, ) with self._get_db() as conn: conn.execute( """ INSERT INTO alert_suppression_rules (id, tenant_id, name, matchers, duration, is_regex, created_at, expires_at) VALUES (?, ?, ?, ?, ?, ?, ?, ?) 
""", ( rule.id, rule.tenant_id, rule.name, json.dumps(rule.matchers), rule.duration, rule.is_regex, rule.created_at, rule.expires_at, ), ) conn.commit() return rule def _is_alert_suppressed(self, rule: AlertRule) -> bool: """检查告警是否被抑制""" with self._get_db() as conn: rows = conn.execute( "SELECT * FROM alert_suppression_rules WHERE tenant_id = ?", (rule.tenant_id,) ).fetchall() for row in rows: suppression_rule = self._row_to_suppression_rule(row) # 检查是否过期 if suppression_rule.expires_at: if datetime.now() > datetime.fromisoformat(suppression_rule.expires_at): continue # 检查匹配 matchers = suppression_rule.matchers match = True for key, pattern in matchers.items(): value = rule.labels.get(key, "") if suppression_rule.is_regex: if not re.match(pattern, value): match = False break else: if value != pattern: match = False break if match: return True return False # ==================== 资源监控 ==================== def record_resource_metric( self, tenant_id: str, resource_type: ResourceType, resource_id: str, metric_name: str, metric_value: float, unit: str, metadata: dict = None, ) -> ResourceMetric: """记录资源指标""" metric_id = f"rm_{uuid.uuid4().hex[:16]}" now = datetime.now().isoformat() metric = ResourceMetric( id=metric_id, tenant_id=tenant_id, resource_type=resource_type, resource_id=resource_id, metric_name=metric_name, metric_value=metric_value, unit=unit, timestamp=now, metadata=metadata or {}, ) with self._get_db() as conn: conn.execute( """ INSERT INTO resource_metrics (id, tenant_id, resource_type, resource_id, metric_name, metric_value, unit, timestamp, metadata) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?) 
""", ( metric.id, metric.tenant_id, metric.resource_type.value, metric.resource_id, metric.metric_name, metric.metric_value, metric.unit, metric.timestamp, json.dumps(metric.metadata), ), ) conn.commit() return metric def get_recent_metrics( self, tenant_id: str, metric_name: str, seconds: int = 3600 ) -> list[ResourceMetric]: """获取最近的指标数据""" cutoff_time = (datetime.now() - timedelta(seconds=seconds)).isoformat() with self._get_db() as conn: rows = conn.execute( """SELECT * FROM resource_metrics WHERE tenant_id = ? AND metric_name = ? AND timestamp > ? ORDER BY timestamp DESC""", (tenant_id, metric_name, cutoff_time), ).fetchall() return [self._row_to_resource_metric(row) for row in rows] def get_resource_metrics( self, tenant_id: str, resource_type: ResourceType, resource_id: str, metric_name: str, start_time: str, end_time: str, ) -> list[ResourceMetric]: """获取指定资源的指标数据""" with self._get_db() as conn: rows = conn.execute( """SELECT * FROM resource_metrics WHERE tenant_id = ? AND resource_type = ? AND resource_id = ? AND metric_name = ? AND timestamp BETWEEN ? AND ? 
ORDER BY timestamp ASC""", (tenant_id, resource_type.value, resource_id, metric_name, start_time, end_time), ).fetchall() return [self._row_to_resource_metric(row) for row in rows] # ==================== 容量规划 ==================== def create_capacity_plan( self, tenant_id: str, resource_type: ResourceType, current_capacity: float, prediction_date: str, confidence: float = 0.8, ) -> CapacityPlan: """创建容量规划""" plan_id = f"cp_{uuid.uuid4().hex[:16]}" now = datetime.now().isoformat() # 基于历史数据预测 metrics = self.get_recent_metrics( tenant_id, f"{resource_type.value}_usage", seconds=30 * 24 * 3600 ) if metrics: values = [m.metric_value for m in metrics] trend = self._calculate_trend(values) # 预测未来容量需求 days_ahead = (datetime.fromisoformat(prediction_date) - datetime.now()).days predicted_capacity = current_capacity * (1 + trend * days_ahead / 30) # 推荐操作 if predicted_capacity > current_capacity * 1.2: recommended_action = "scale_up" estimated_cost = (predicted_capacity - current_capacity) * 10 # 简化计算 elif predicted_capacity < current_capacity * 0.5: recommended_action = "scale_down" estimated_cost = 0 else: recommended_action = "maintain" estimated_cost = 0 else: predicted_capacity = current_capacity recommended_action = "insufficient_data" estimated_cost = 0 plan = CapacityPlan( id=plan_id, tenant_id=tenant_id, resource_type=resource_type, current_capacity=current_capacity, predicted_capacity=predicted_capacity, prediction_date=prediction_date, confidence=confidence, recommended_action=recommended_action, estimated_cost=estimated_cost, created_at=now, ) with self._get_db() as conn: conn.execute( """ INSERT INTO capacity_plans (id, tenant_id, resource_type, current_capacity, predicted_capacity, prediction_date, confidence, recommended_action, estimated_cost, created_at) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?) 
""", ( plan.id, plan.tenant_id, plan.resource_type.value, plan.current_capacity, plan.predicted_capacity, plan.prediction_date, plan.confidence, plan.recommended_action, plan.estimated_cost, plan.created_at, ), ) conn.commit() return plan def _calculate_trend(self, values: list[float]) -> float: """计算趋势(增长率)""" if len(values) < 2: return 0.0 # 使用最近的数据计算趋势 recent = values[-10:] if len(values) > 10 else values n = len(recent) if n < 2: return 0.0 # 简单线性回归计算斜率 x = list(range(n)) mean_x = sum(x) / n mean_y = sum(recent) / n numerator = sum((x[i] - mean_x) * (recent[i] - mean_y) for i in range(n)) denominator = sum((x[i] - mean_x) ** 2 for i in range(n)) slope = numerator / denominator if denominator != 0 else 0 # 归一化为增长率 if mean_y != 0: return slope / mean_y return 0.0 def get_capacity_plans(self, tenant_id: str) -> list[CapacityPlan]: """获取容量规划列表""" with self._get_db() as conn: rows = conn.execute( "SELECT * FROM capacity_plans WHERE tenant_id = ? ORDER BY created_at DESC", (tenant_id,), ).fetchall() return [self._row_to_capacity_plan(row) for row in rows] # ==================== 自动扩缩容 ==================== def create_auto_scaling_policy( self, tenant_id: str, name: str, resource_type: ResourceType, min_instances: int, max_instances: int, target_utilization: float, scale_up_threshold: float, scale_down_threshold: float, scale_up_step: int = 1, scale_down_step: int = 1, cooldown_period: int = 300, ) -> AutoScalingPolicy: """创建自动扩缩容策略""" policy_id = f"asp_{uuid.uuid4().hex[:16]}" now = datetime.now().isoformat() policy = AutoScalingPolicy( id=policy_id, tenant_id=tenant_id, name=name, resource_type=resource_type, min_instances=min_instances, max_instances=max_instances, target_utilization=target_utilization, scale_up_threshold=scale_up_threshold, scale_down_threshold=scale_down_threshold, scale_up_step=scale_up_step, scale_down_step=scale_down_step, cooldown_period=cooldown_period, is_enabled=True, created_at=now, updated_at=now, ) with self._get_db() as conn: 
conn.execute( """ INSERT INTO auto_scaling_policies (id, tenant_id, name, resource_type, min_instances, max_instances, target_utilization, scale_up_threshold, scale_down_threshold, scale_up_step, scale_down_step, cooldown_period, is_enabled, created_at, updated_at) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) """, ( policy.id, policy.tenant_id, policy.name, policy.resource_type.value, policy.min_instances, policy.max_instances, policy.target_utilization, policy.scale_up_threshold, policy.scale_down_threshold, policy.scale_up_step, policy.scale_down_step, policy.cooldown_period, policy.is_enabled, policy.created_at, policy.updated_at, ), ) conn.commit() return policy def get_auto_scaling_policy(self, policy_id: str) -> AutoScalingPolicy | None: """获取自动扩缩容策略""" with self._get_db() as conn: row = conn.execute( "SELECT * FROM auto_scaling_policies WHERE id = ?", (policy_id,) ).fetchone() if row: return self._row_to_auto_scaling_policy(row) return None def list_auto_scaling_policies(self, tenant_id: str) -> list[AutoScalingPolicy]: """列出租户的自动扩缩容策略""" with self._get_db() as conn: rows = conn.execute( "SELECT * FROM auto_scaling_policies WHERE tenant_id = ? 
ORDER BY created_at DESC", (tenant_id,), ).fetchall() return [self._row_to_auto_scaling_policy(row) for row in rows] def evaluate_scaling_policy( self, policy_id: str, current_instances: int, current_utilization: float ) -> ScalingEvent | None: """评估扩缩容策略""" policy = self.get_auto_scaling_policy(policy_id) if not policy or not policy.is_enabled: return None # 检查是否在冷却期 last_event = self.get_last_scaling_event(policy_id) if last_event: last_time = datetime.fromisoformat(last_event.started_at) if (datetime.now() - last_time).total_seconds() < policy.cooldown_period: return None action = None reason = "" if current_utilization > policy.scale_up_threshold: if current_instances < policy.max_instances: action = ScalingAction.SCALE_UP reason = ( f"利用率 {current_utilization:.1%} 超过扩容阈值 {policy.scale_up_threshold:.1%}" ) elif current_utilization < policy.scale_down_threshold: if current_instances > policy.min_instances: action = ScalingAction.SCALE_DOWN reason = f"利用率 {current_utilization:.1%} 低于缩容阈值 {policy.scale_down_threshold:.1%}" if action: if action == ScalingAction.SCALE_UP: new_count = min(current_instances + policy.scale_up_step, policy.max_instances) else: new_count = max(current_instances - policy.scale_down_step, policy.min_instances) return self._create_scaling_event(policy, action, current_instances, new_count, reason) return None def _create_scaling_event( self, policy: AutoScalingPolicy, action: ScalingAction, from_count: int, to_count: int, reason: str, ) -> ScalingEvent: """创建扩缩容事件""" event_id = f"se_{uuid.uuid4().hex[:16]}" now = datetime.now().isoformat() event = ScalingEvent( id=event_id, policy_id=policy.id, tenant_id=policy.tenant_id, action=action, from_count=from_count, to_count=to_count, reason=reason, triggered_by="auto", status="pending", started_at=now, completed_at=None, error_message=None, ) with self._get_db() as conn: conn.execute( """ INSERT INTO scaling_events (id, policy_id, tenant_id, action, from_count, to_count, reason, triggered_by, 
status, started_at) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?) """, ( event.id, event.policy_id, event.tenant_id, event.action.value, event.from_count, event.to_count, event.reason, event.triggered_by, event.status, event.started_at, ), ) conn.commit() return event def get_last_scaling_event(self, policy_id: str) -> ScalingEvent | None: """获取最近的扩缩容事件""" with self._get_db() as conn: row = conn.execute( """SELECT * FROM scaling_events WHERE policy_id = ? ORDER BY started_at DESC LIMIT 1""", (policy_id,), ).fetchone() if row: return self._row_to_scaling_event(row) return None def update_scaling_event_status( self, event_id: str, status: str, error_message: str = None ) -> ScalingEvent | None: """更新扩缩容事件状态""" now = datetime.now().isoformat() with self._get_db() as conn: if status in ["completed", "failed"]: conn.execute( """ UPDATE scaling_events SET status = ?, completed_at = ?, error_message = ? WHERE id = ? """, (status, now, error_message, event_id), ) else: conn.execute( """ UPDATE scaling_events SET status = ?, error_message = ? WHERE id = ? """, (status, error_message, event_id), ) conn.commit() return self.get_scaling_event(event_id) def get_scaling_event(self, event_id: str) -> ScalingEvent | None: """获取扩缩容事件""" with self._get_db() as conn: row = conn.execute("SELECT * FROM scaling_events WHERE id = ?", (event_id,)).fetchone() if row: return self._row_to_scaling_event(row) return None def list_scaling_events( self, tenant_id: str, policy_id: str = None, limit: int = 100 ) -> list[ScalingEvent]: """列出租户的扩缩容事件""" query = "SELECT * FROM scaling_events WHERE tenant_id = ?" params = [tenant_id] if policy_id: query += " AND policy_id = ?" params.append(policy_id) query += " ORDER BY started_at DESC LIMIT ?" 
params.append(limit) with self._get_db() as conn: rows = conn.execute(query, params).fetchall() return [self._row_to_scaling_event(row) for row in rows] # ==================== 健康检查与故障转移 ==================== def create_health_check( self, tenant_id: str, name: str, target_type: str, target_id: str, check_type: str, check_config: dict, interval: int = 60, timeout: int = 10, retry_count: int = 3, ) -> HealthCheck: """创建健康检查""" check_id = f"hc_{uuid.uuid4().hex[:16]}" now = datetime.now().isoformat() check = HealthCheck( id=check_id, tenant_id=tenant_id, name=name, target_type=target_type, target_id=target_id, check_type=check_type, check_config=check_config, interval=interval, timeout=timeout, retry_count=retry_count, healthy_threshold=2, unhealthy_threshold=3, is_enabled=True, created_at=now, updated_at=now, ) with self._get_db() as conn: conn.execute( """ INSERT INTO health_checks (id, tenant_id, name, target_type, target_id, check_type, check_config, interval, timeout, retry_count, healthy_threshold, unhealthy_threshold, is_enabled, created_at, updated_at) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) """, ( check.id, check.tenant_id, check.name, check.target_type, check.target_id, check.check_type, json.dumps(check.check_config), check.interval, check.timeout, check.retry_count, check.healthy_threshold, check.unhealthy_threshold, check.is_enabled, check.created_at, check.updated_at, ), ) conn.commit() return check def get_health_check(self, check_id: str) -> HealthCheck | None: """获取健康检查配置""" with self._get_db() as conn: row = conn.execute("SELECT * FROM health_checks WHERE id = ?", (check_id,)).fetchone() if row: return self._row_to_health_check(row) return None def list_health_checks(self, tenant_id: str) -> list[HealthCheck]: """列出租户的健康检查""" with self._get_db() as conn: rows = conn.execute( "SELECT * FROM health_checks WHERE tenant_id = ? 
ORDER BY created_at DESC", (tenant_id,), ).fetchall() return [self._row_to_health_check(row) for row in rows] async def execute_health_check(self, check_id: str) -> HealthCheckResult: """执行健康检查""" check = self.get_health_check(check_id) if not check: raise ValueError(f"Health check {check_id} not found") result_id = f"hcr_{uuid.uuid4().hex[:16]}" now = datetime.now().isoformat() # 模拟健康检查(实际实现需要根据 check_type 执行具体检查) if check.check_type == "http": status, response_time, message = await self._check_http_health(check) elif check.check_type == "tcp": status, response_time, message = await self._check_tcp_health(check) elif check.check_type == "ping": status, response_time, message = await self._check_ping_health(check) else: status, response_time, message = HealthStatus.UNKNOWN, 0, "Unknown check type" result = HealthCheckResult( id=result_id, check_id=check_id, tenant_id=check.tenant_id, status=status, response_time=response_time, message=message, details={}, checked_at=now, ) with self._get_db() as conn: conn.execute( """ INSERT INTO health_check_results (id, check_id, tenant_id, status, response_time, message, details, checked_at) VALUES (?, ?, ?, ?, ?, ?, ?, ?) 
""", ( result.id, result.check_id, result.tenant_id, result.status.value, result.response_time, result.message, json.dumps(result.details), result.checked_at, ), ) conn.commit() return result async def _check_http_health(self, check: HealthCheck) -> tuple[HealthStatus, float, str]: """HTTP 健康检查""" config = check.check_config url = config.get("url") expected_status = config.get("expected_status", 200) if not url: return HealthStatus.UNHEALTHY, 0, "URL not configured" start_time = time.time() try: async with httpx.AsyncClient() as client: response = await client.get(url, timeout=check.timeout) response_time = (time.time() - start_time) * 1000 if response.status_code == expected_status: return HealthStatus.HEALTHY, response_time, "OK" else: return ( HealthStatus.DEGRADED, response_time, f"Unexpected status: {response.status_code}", ) except Exception as e: return HealthStatus.UNHEALTHY, (time.time() - start_time) * 1000, str(e) async def _check_tcp_health(self, check: HealthCheck) -> tuple[HealthStatus, float, str]: """TCP 健康检查""" config = check.check_config host = config.get("host") port = config.get("port") if not host or not port: return HealthStatus.UNHEALTHY, 0, "Host or port not configured" start_time = time.time() try: reader, writer = await asyncio.wait_for( asyncio.open_connection(host, port), timeout=check.timeout ) response_time = (time.time() - start_time) * 1000 writer.close() await writer.wait_closed() return HealthStatus.HEALTHY, response_time, "TCP connection successful" except TimeoutError: return HealthStatus.UNHEALTHY, (time.time() - start_time) * 1000, "Connection timeout" except Exception as e: return HealthStatus.UNHEALTHY, (time.time() - start_time) * 1000, str(e) async def _check_ping_health(self, check: HealthCheck) -> tuple[HealthStatus, float, str]: """Ping 健康检查(模拟)""" config = check.check_config host = config.get("host") if not host: return HealthStatus.UNHEALTHY, 0, "Host not configured" # 实际实现需要使用系统 ping 命令或 ICMP 库 # 这里模拟成功 return 
HealthStatus.HEALTHY, 10.0, "Ping successful" def get_health_check_results(self, check_id: str, limit: int = 100) -> list[HealthCheckResult]: """获取健康检查历史结果""" with self._get_db() as conn: rows = conn.execute( """SELECT * FROM health_check_results WHERE check_id = ? ORDER BY checked_at DESC LIMIT ?""", (check_id, limit), ).fetchall() return [self._row_to_health_check_result(row) for row in rows] # ==================== 故障转移 ==================== def create_failover_config( self, tenant_id: str, name: str, primary_region: str, secondary_regions: list[str], failover_trigger: str, auto_failover: bool = False, failover_timeout: int = 300, health_check_id: str = None, ) -> FailoverConfig: """创建故障转移配置""" config_id = f"fc_{uuid.uuid4().hex[:16]}" now = datetime.now().isoformat() config = FailoverConfig( id=config_id, tenant_id=tenant_id, name=name, primary_region=primary_region, secondary_regions=secondary_regions, failover_trigger=failover_trigger, auto_failover=auto_failover, failover_timeout=failover_timeout, health_check_id=health_check_id, is_enabled=True, created_at=now, updated_at=now, ) with self._get_db() as conn: conn.execute( """ INSERT INTO failover_configs (id, tenant_id, name, primary_region, secondary_regions, failover_trigger, auto_failover, failover_timeout, health_check_id, is_enabled, created_at, updated_at) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) 
""", ( config.id, config.tenant_id, config.name, config.primary_region, json.dumps(config.secondary_regions), config.failover_trigger, config.auto_failover, config.failover_timeout, config.health_check_id, config.is_enabled, config.created_at, config.updated_at, ), ) conn.commit() return config def get_failover_config(self, config_id: str) -> FailoverConfig | None: """获取故障转移配置""" with self._get_db() as conn: row = conn.execute( "SELECT * FROM failover_configs WHERE id = ?", (config_id,) ).fetchone() if row: return self._row_to_failover_config(row) return None def list_failover_configs(self, tenant_id: str) -> list[FailoverConfig]: """列出租户的故障转移配置""" with self._get_db() as conn: rows = conn.execute( "SELECT * FROM failover_configs WHERE tenant_id = ? ORDER BY created_at DESC", (tenant_id,), ).fetchall() return [self._row_to_failover_config(row) for row in rows] def initiate_failover(self, config_id: str, reason: str) -> FailoverEvent | None: """发起故障转移""" config = self.get_failover_config(config_id) if not config or not config.is_enabled: return None event_id = f"fe_{uuid.uuid4().hex[:16]}" now = datetime.now().isoformat() # 选择备用区域 to_region = config.secondary_regions[0] if config.secondary_regions else None if not to_region: return None event = FailoverEvent( id=event_id, config_id=config_id, tenant_id=config.tenant_id, from_region=config.primary_region, to_region=to_region, reason=reason, status="initiated", started_at=now, completed_at=None, rolled_back_at=None, ) with self._get_db() as conn: conn.execute( """ INSERT INTO failover_events (id, config_id, tenant_id, from_region, to_region, reason, status, started_at) VALUES (?, ?, ?, ?, ?, ?, ?, ?) 
""", ( event.id, event.config_id, event.tenant_id, event.from_region, event.to_region, event.reason, event.status, event.started_at, ), ) conn.commit() return event def update_failover_status(self, event_id: str, status: str) -> FailoverEvent | None: """更新故障转移状态""" now = datetime.now().isoformat() with self._get_db() as conn: if status == "completed": conn.execute( """ UPDATE failover_events SET status = ?, completed_at = ? WHERE id = ? """, (status, now, event_id), ) elif status == "rolled_back": conn.execute( """ UPDATE failover_events SET status = ?, rolled_back_at = ? WHERE id = ? """, (status, now, event_id), ) else: conn.execute( """ UPDATE failover_events SET status = ? WHERE id = ? """, (status, event_id), ) conn.commit() return self.get_failover_event(event_id) def get_failover_event(self, event_id: str) -> FailoverEvent | None: """获取故障转移事件""" with self._get_db() as conn: row = conn.execute("SELECT * FROM failover_events WHERE id = ?", (event_id,)).fetchone() if row: return self._row_to_failover_event(row) return None def list_failover_events(self, tenant_id: str, limit: int = 100) -> list[FailoverEvent]: """列出租户的故障转移事件""" with self._get_db() as conn: rows = conn.execute( """SELECT * FROM failover_events WHERE tenant_id = ? 
ORDER BY started_at DESC LIMIT ?""", (tenant_id, limit), ).fetchall() return [self._row_to_failover_event(row) for row in rows] # ==================== 数据备份与恢复 ==================== def create_backup_job( self, tenant_id: str, name: str, backup_type: str, target_type: str, target_id: str, schedule: str, retention_days: int = 30, encryption_enabled: bool = True, compression_enabled: bool = True, storage_location: str = None, ) -> BackupJob: """创建备份任务""" job_id = f"bj_{uuid.uuid4().hex[:16]}" now = datetime.now().isoformat() job = BackupJob( id=job_id, tenant_id=tenant_id, name=name, backup_type=backup_type, target_type=target_type, target_id=target_id, schedule=schedule, retention_days=retention_days, encryption_enabled=encryption_enabled, compression_enabled=compression_enabled, storage_location=storage_location or f"backups/{tenant_id}", is_enabled=True, created_at=now, updated_at=now, ) with self._get_db() as conn: conn.execute( """ INSERT INTO backup_jobs (id, tenant_id, name, backup_type, target_type, target_id, schedule, retention_days, encryption_enabled, compression_enabled, storage_location, is_enabled, created_at, updated_at) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) """, ( job.id, job.tenant_id, job.name, job.backup_type, job.target_type, job.target_id, job.schedule, job.retention_days, job.encryption_enabled, job.compression_enabled, job.storage_location, job.is_enabled, job.created_at, job.updated_at, ), ) conn.commit() return job def get_backup_job(self, job_id: str) -> BackupJob | None: """获取备份任务""" with self._get_db() as conn: row = conn.execute("SELECT * FROM backup_jobs WHERE id = ?", (job_id,)).fetchone() if row: return self._row_to_backup_job(row) return None def list_backup_jobs(self, tenant_id: str) -> list[BackupJob]: """列出租户的备份任务""" with self._get_db() as conn: rows = conn.execute( "SELECT * FROM backup_jobs WHERE tenant_id = ? 
ORDER BY created_at DESC", (tenant_id,), ).fetchall() return [self._row_to_backup_job(row) for row in rows] def execute_backup(self, job_id: str) -> BackupRecord | None: """执行备份""" job = self.get_backup_job(job_id) if not job or not job.is_enabled: return None record_id = f"br_{uuid.uuid4().hex[:16]}" now = datetime.now().isoformat() record = BackupRecord( id=record_id, job_id=job_id, tenant_id=job.tenant_id, status=BackupStatus.IN_PROGRESS, size_bytes=0, checksum="", started_at=now, completed_at=None, verified_at=None, error_message=None, storage_path=f"{job.storage_location}/{record_id}", ) with self._get_db() as conn: conn.execute( """ INSERT INTO backup_records (id, job_id, tenant_id, status, size_bytes, checksum, started_at, storage_path) VALUES (?, ?, ?, ?, ?, ?, ?, ?) """, ( record.id, record.job_id, record.tenant_id, record.status.value, record.size_bytes, record.checksum, record.started_at, record.storage_path, ), ) conn.commit() # 异步执行备份(实际实现中应该启动后台任务) # 这里模拟备份完成 self._complete_backup(record_id, size_bytes=1024 * 1024 * 100) # 模拟100MB return record def _complete_backup(self, record_id: str, size_bytes: int, checksum: str = None) -> None: """完成备份""" now = datetime.now().isoformat() checksum = checksum or hashlib.sha256(str(time.time()).encode()).hexdigest()[:16] with self._get_db() as conn: conn.execute( """ UPDATE backup_records SET status = ?, size_bytes = ?, checksum = ?, completed_at = ? WHERE id = ? """, (BackupStatus.COMPLETED.value, size_bytes, checksum, now, record_id), ) conn.commit() def get_backup_record(self, record_id: str) -> BackupRecord | None: """获取备份记录""" with self._get_db() as conn: row = conn.execute("SELECT * FROM backup_records WHERE id = ?", (record_id,)).fetchone() if row: return self._row_to_backup_record(row) return None def list_backup_records( self, tenant_id: str, job_id: str = None, limit: int = 100 ) -> list[BackupRecord]: """列出租户的备份记录""" query = "SELECT * FROM backup_records WHERE tenant_id = ?" 
params = [tenant_id] if job_id: query += " AND job_id = ?" params.append(job_id) query += " ORDER BY started_at DESC LIMIT ?" params.append(limit) with self._get_db() as conn: rows = conn.execute(query, params).fetchall() return [self._row_to_backup_record(row) for row in rows] def restore_from_backup(self, record_id: str) -> bool: """从备份恢复""" record = self.get_backup_record(record_id) if not record or record.status != BackupStatus.COMPLETED: return False # 实际实现中执行恢复操作 # 这里模拟成功 return True # ==================== 成本优化 ==================== def generate_cost_report(self, tenant_id: str, year: int, month: int) -> CostReport: """生成成本报告""" report_id = f"cr_{uuid.uuid4().hex[:16]}" report_period = f"{year:04d}-{month:02d}" now = datetime.now().isoformat() # 获取资源利用率数据 utilizations = self.get_resource_utilizations(tenant_id, report_period) # 计算成本分解 breakdown = {} total_cost = 0.0 for util in utilizations: # 简化计算:假设每单位资源每月成本 unit_cost = 10.0 resource_cost = unit_cost * util.utilization_rate breakdown[util.resource_type.value] = ( breakdown.get(util.resource_type.value, 0) + resource_cost ) total_cost += resource_cost # 检测异常 anomalies = self._detect_cost_anomalies(utilizations) # 计算趋势 trends = self._calculate_cost_trends(tenant_id, year, month) report = CostReport( id=report_id, tenant_id=tenant_id, report_period=report_period, total_cost=total_cost, currency="CNY", breakdown=breakdown, trends=trends, anomalies=anomalies, created_at=now, ) with self._get_db() as conn: conn.execute( """ INSERT INTO cost_reports (id, tenant_id, report_period, total_cost, currency, breakdown, trends, anomalies, created_at) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?) 
""", ( report.id, report.tenant_id, report.report_period, report.total_cost, report.currency, json.dumps(report.breakdown), json.dumps(report.trends), json.dumps(report.anomalies), report.created_at, ), ) conn.commit() return report def _detect_cost_anomalies(self, utilizations: list[ResourceUtilization]) -> list[dict]: """检测成本异常""" anomalies = [] for util in utilizations: # 检测低利用率 if util.utilization_rate < 0.1: anomalies.append( { "type": "low_utilization", "resource_type": util.resource_type.value, "resource_id": util.resource_id, "utilization_rate": util.utilization_rate, "severity": "high" if util.utilization_rate < 0.05 else "medium", } ) # 检测高峰利用率 if util.peak_utilization > 0.9: anomalies.append( { "type": "high_peak", "resource_type": util.resource_type.value, "resource_id": util.resource_id, "peak_utilization": util.peak_utilization, "severity": "medium", } ) return anomalies def _calculate_cost_trends(self, tenant_id: str, year: int, month: int) -> dict: """计算成本趋势""" # 简化实现:返回模拟趋势 return { "month_over_month": 0.05, "year_over_year": 0.15, "forecast_next_month": 1.05, } # 5% 增长 # 15% 增长 def record_resource_utilization( self, tenant_id: str, resource_type: ResourceType, resource_id: str, utilization_rate: float, peak_utilization: float, avg_utilization: float, idle_time_percent: float, report_date: str, recommendations: list[str] = None, ) -> ResourceUtilization: """记录资源利用率""" util_id = f"ru_{uuid.uuid4().hex[:16]}" util = ResourceUtilization( id=util_id, tenant_id=tenant_id, resource_type=resource_type, resource_id=resource_id, utilization_rate=utilization_rate, peak_utilization=peak_utilization, avg_utilization=avg_utilization, idle_time_percent=idle_time_percent, report_date=report_date, recommendations=recommendations or [], ) with self._get_db() as conn: conn.execute( """ INSERT INTO resource_utilizations (id, tenant_id, resource_type, resource_id, utilization_rate, peak_utilization, avg_utilization, idle_time_percent, report_date, recommendations) 
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?) """, ( util.id, util.tenant_id, util.resource_type.value, util.resource_id, util.utilization_rate, util.peak_utilization, util.avg_utilization, util.idle_time_percent, util.report_date, json.dumps(util.recommendations), ), ) conn.commit() return util def get_resource_utilizations( self, tenant_id: str, report_period: str ) -> list[ResourceUtilization]: """获取资源利用率列表""" with self._get_db() as conn: rows = conn.execute( """SELECT * FROM resource_utilizations WHERE tenant_id = ? AND report_date LIKE ? ORDER BY report_date DESC""", (tenant_id, f"{report_period}%"), ).fetchall() return [self._row_to_resource_utilization(row) for row in rows] def detect_idle_resources(self, tenant_id: str) -> list[IdleResource]: """检测闲置资源""" idle_resources = [] # 获取最近30天的利用率数据 with self._get_db() as conn: thirty_days_ago = (datetime.now() - timedelta(days=30)).isoformat() rows = conn.execute( """SELECT resource_type, resource_id, AVG(utilization_rate) as avg_utilization, MAX(idle_time_percent) as max_idle_time FROM resource_utilizations WHERE tenant_id = ? AND report_date > ? GROUP BY resource_type, resource_id HAVING avg_utilization < 0.1 AND max_idle_time > 0.8""", (tenant_id, thirty_days_ago), ).fetchall() for row in rows: idle_id = f"ir_{uuid.uuid4().hex[:16]}" now = datetime.now().isoformat() idle_resource = IdleResource( id=idle_id, tenant_id=tenant_id, resource_type=ResourceType(row["resource_type"]), resource_id=row["resource_id"], resource_name=f"{row['resource_type']}-{row['resource_id']}", idle_since=thirty_days_ago, estimated_monthly_cost=50.0, # 简化计算 currency="CNY", reason="Low utilization rate over 30 days", recommendation="Consider downsizing or terminating this resource", detected_at=now, ) conn.execute( """ INSERT OR REPLACE INTO idle_resources (id, tenant_id, resource_type, resource_id, resource_name, idle_since, estimated_monthly_cost, currency, reason, recommendation, detected_at) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) 
""", ( idle_resource.id, idle_resource.tenant_id, idle_resource.resource_type.value, idle_resource.resource_id, idle_resource.resource_name, idle_resource.idle_since, idle_resource.estimated_monthly_cost, idle_resource.currency, idle_resource.reason, idle_resource.recommendation, idle_resource.detected_at, ), ) idle_resources.append(idle_resource) conn.commit() return idle_resources def get_idle_resources(self, tenant_id: str) -> list[IdleResource]: """获取闲置资源列表""" with self._get_db() as conn: rows = conn.execute( "SELECT * FROM idle_resources WHERE tenant_id = ? ORDER BY detected_at DESC", (tenant_id,), ).fetchall() return [self._row_to_idle_resource(row) for row in rows] def generate_cost_optimization_suggestions( self, tenant_id: str ) -> list[CostOptimizationSuggestion]: """生成成本优化建议""" suggestions = [] # 基于闲置资源生成建议 idle_resources = self.detect_idle_resources(tenant_id) total_potential_savings = sum(r.estimated_monthly_cost for r in idle_resources) if total_potential_savings > 0: suggestion_id = f"cos_{uuid.uuid4().hex[:16]}" now = datetime.now().isoformat() suggestion = CostOptimizationSuggestion( id=suggestion_id, tenant_id=tenant_id, category="resource_rightsize", title="清理闲置资源", description=f"检测到 {len(idle_resources)} 个闲置资源,建议清理以节省成本。", potential_savings=total_potential_savings, currency="CNY", confidence=0.85, difficulty="easy", implementation_steps=[ "Review the list of idle resources", "Confirm resources are no longer needed", "Terminate or downsize unused resources", ], risk_level="low", is_applied=False, created_at=now, applied_at=None, ) with self._get_db() as conn: conn.execute( """ INSERT INTO cost_optimization_suggestions (id, tenant_id, category, title, description, potential_savings, currency, confidence, difficulty, implementation_steps, risk_level, is_applied, created_at) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) 
""", ( suggestion.id, suggestion.tenant_id, suggestion.category, suggestion.title, suggestion.description, suggestion.potential_savings, suggestion.currency, suggestion.confidence, suggestion.difficulty, json.dumps(suggestion.implementation_steps), suggestion.risk_level, suggestion.is_applied, suggestion.created_at, ), ) conn.commit() suggestions.append(suggestion) # 添加更多优化建议... return suggestions def get_cost_optimization_suggestions( self, tenant_id: str, is_applied: bool = None ) -> list[CostOptimizationSuggestion]: """获取成本优化建议""" query = "SELECT * FROM cost_optimization_suggestions WHERE tenant_id = ?" params = [tenant_id] if is_applied is not None: query += " AND is_applied = ?" params.append(1 if is_applied else 0) query += " ORDER BY potential_savings DESC" with self._get_db() as conn: rows = conn.execute(query, params).fetchall() return [self._row_to_cost_optimization_suggestion(row) for row in rows] def apply_cost_optimization_suggestion( self, suggestion_id: str ) -> CostOptimizationSuggestion | None: """应用成本优化建议""" now = datetime.now().isoformat() with self._get_db() as conn: conn.execute( """ UPDATE cost_optimization_suggestions SET is_applied = ?, applied_at = ? WHERE id = ? 
""", (True, now, suggestion_id), ) conn.commit() return self.get_cost_optimization_suggestion(suggestion_id) def get_cost_optimization_suggestion( self, suggestion_id: str ) -> CostOptimizationSuggestion | None: """获取成本优化建议详情""" with self._get_db() as conn: row = conn.execute( "SELECT * FROM cost_optimization_suggestions WHERE id = ?", (suggestion_id,) ).fetchone() if row: return self._row_to_cost_optimization_suggestion(row) return None # ==================== 辅助方法:数据库行转换 ==================== def _row_to_alert_rule(self, row) -> AlertRule: return AlertRule( id=row["id"], tenant_id=row["tenant_id"], name=row["name"], description=row["description"], rule_type=AlertRuleType(row["rule_type"]), severity=AlertSeverity(row["severity"]), metric=row["metric"], condition=row["condition"], threshold=row["threshold"], duration=row["duration"], evaluation_interval=row["evaluation_interval"], channels=json.loads(row["channels"]), labels=json.loads(row["labels"]), annotations=json.loads(row["annotations"]), is_enabled=bool(row["is_enabled"]), created_at=row["created_at"], updated_at=row["updated_at"], created_by=row["created_by"], ) def _row_to_alert_channel(self, row) -> AlertChannel: return AlertChannel( id=row["id"], tenant_id=row["tenant_id"], name=row["name"], channel_type=AlertChannelType(row["channel_type"]), config=json.loads(row["config"]), severity_filter=json.loads(row["severity_filter"]), is_enabled=bool(row["is_enabled"]), success_count=row["success_count"], fail_count=row["fail_count"], last_used_at=row["last_used_at"], created_at=row["created_at"], updated_at=row["updated_at"], ) def _row_to_alert(self, row) -> Alert: return Alert( id=row["id"], rule_id=row["rule_id"], tenant_id=row["tenant_id"], severity=AlertSeverity(row["severity"]), status=AlertStatus(row["status"]), title=row["title"], description=row["description"], metric=row["metric"], value=row["value"], threshold=row["threshold"], labels=json.loads(row["labels"]), 
annotations=json.loads(row["annotations"]), started_at=row["started_at"], resolved_at=row["resolved_at"], acknowledged_by=row["acknowledged_by"], acknowledged_at=row["acknowledged_at"], notification_sent=json.loads(row["notification_sent"]), suppression_count=row["suppression_count"], ) def _row_to_suppression_rule(self, row) -> AlertSuppressionRule: return AlertSuppressionRule( id=row["id"], tenant_id=row["tenant_id"], name=row["name"], matchers=json.loads(row["matchers"]), duration=row["duration"], is_regex=bool(row["is_regex"]), created_at=row["created_at"], expires_at=row["expires_at"], ) def _row_to_resource_metric(self, row) -> ResourceMetric: return ResourceMetric( id=row["id"], tenant_id=row["tenant_id"], resource_type=ResourceType(row["resource_type"]), resource_id=row["resource_id"], metric_name=row["metric_name"], metric_value=row["metric_value"], unit=row["unit"], timestamp=row["timestamp"], metadata=json.loads(row["metadata"]), ) def _row_to_capacity_plan(self, row) -> CapacityPlan: return CapacityPlan( id=row["id"], tenant_id=row["tenant_id"], resource_type=ResourceType(row["resource_type"]), current_capacity=row["current_capacity"], predicted_capacity=row["predicted_capacity"], prediction_date=row["prediction_date"], confidence=row["confidence"], recommended_action=row["recommended_action"], estimated_cost=row["estimated_cost"], created_at=row["created_at"], ) def _row_to_auto_scaling_policy(self, row) -> AutoScalingPolicy: return AutoScalingPolicy( id=row["id"], tenant_id=row["tenant_id"], name=row["name"], resource_type=ResourceType(row["resource_type"]), min_instances=row["min_instances"], max_instances=row["max_instances"], target_utilization=row["target_utilization"], scale_up_threshold=row["scale_up_threshold"], scale_down_threshold=row["scale_down_threshold"], scale_up_step=row["scale_up_step"], scale_down_step=row["scale_down_step"], cooldown_period=row["cooldown_period"], is_enabled=bool(row["is_enabled"]), created_at=row["created_at"], 
updated_at=row["updated_at"], ) def _row_to_scaling_event(self, row) -> ScalingEvent: return ScalingEvent( id=row["id"], policy_id=row["policy_id"], tenant_id=row["tenant_id"], action=ScalingAction(row["action"]), from_count=row["from_count"], to_count=row["to_count"], reason=row["reason"], triggered_by=row["triggered_by"], status=row["status"], started_at=row["started_at"], completed_at=row["completed_at"], error_message=row["error_message"], ) def _row_to_health_check(self, row) -> HealthCheck: return HealthCheck( id=row["id"], tenant_id=row["tenant_id"], name=row["name"], target_type=row["target_type"], target_id=row["target_id"], check_type=row["check_type"], check_config=json.loads(row["check_config"]), interval=row["interval"], timeout=row["timeout"], retry_count=row["retry_count"], healthy_threshold=row["healthy_threshold"], unhealthy_threshold=row["unhealthy_threshold"], is_enabled=bool(row["is_enabled"]), created_at=row["created_at"], updated_at=row["updated_at"], ) def _row_to_health_check_result(self, row) -> HealthCheckResult: return HealthCheckResult( id=row["id"], check_id=row["check_id"], tenant_id=row["tenant_id"], status=HealthStatus(row["status"]), response_time=row["response_time"], message=row["message"], details=json.loads(row["details"]), checked_at=row["checked_at"], ) def _row_to_failover_config(self, row) -> FailoverConfig: return FailoverConfig( id=row["id"], tenant_id=row["tenant_id"], name=row["name"], primary_region=row["primary_region"], secondary_regions=json.loads(row["secondary_regions"]), failover_trigger=row["failover_trigger"], auto_failover=bool(row["auto_failover"]), failover_timeout=row["failover_timeout"], health_check_id=row["health_check_id"], is_enabled=bool(row["is_enabled"]), created_at=row["created_at"], updated_at=row["updated_at"], ) def _row_to_failover_event(self, row) -> FailoverEvent: return FailoverEvent( id=row["id"], config_id=row["config_id"], tenant_id=row["tenant_id"], from_region=row["from_region"], 
to_region=row["to_region"], reason=row["reason"], status=row["status"], started_at=row["started_at"], completed_at=row["completed_at"], rolled_back_at=row["rolled_back_at"], ) def _row_to_backup_job(self, row) -> BackupJob: return BackupJob( id=row["id"], tenant_id=row["tenant_id"], name=row["name"], backup_type=row["backup_type"], target_type=row["target_type"], target_id=row["target_id"], schedule=row["schedule"], retention_days=row["retention_days"], encryption_enabled=bool(row["encryption_enabled"]), compression_enabled=bool(row["compression_enabled"]), storage_location=row["storage_location"], is_enabled=bool(row["is_enabled"]), created_at=row["created_at"], updated_at=row["updated_at"], ) def _row_to_backup_record(self, row) -> BackupRecord: return BackupRecord( id=row["id"], job_id=row["job_id"], tenant_id=row["tenant_id"], status=BackupStatus(row["status"]), size_bytes=row["size_bytes"], checksum=row["checksum"], started_at=row["started_at"], completed_at=row["completed_at"], verified_at=row["verified_at"], error_message=row["error_message"], storage_path=row["storage_path"], ) def _row_to_resource_utilization(self, row) -> ResourceUtilization: return ResourceUtilization( id=row["id"], tenant_id=row["tenant_id"], resource_type=ResourceType(row["resource_type"]), resource_id=row["resource_id"], utilization_rate=row["utilization_rate"], peak_utilization=row["peak_utilization"], avg_utilization=row["avg_utilization"], idle_time_percent=row["idle_time_percent"], report_date=row["report_date"], recommendations=json.loads(row["recommendations"]), ) def _row_to_idle_resource(self, row) -> IdleResource: return IdleResource( id=row["id"], tenant_id=row["tenant_id"], resource_type=ResourceType(row["resource_type"]), resource_id=row["resource_id"], resource_name=row["resource_name"], idle_since=row["idle_since"], estimated_monthly_cost=row["estimated_monthly_cost"], currency=row["currency"], reason=row["reason"], recommendation=row["recommendation"], 
detected_at=row["detected_at"], ) def _row_to_cost_optimization_suggestion(self, row) -> CostOptimizationSuggestion: return CostOptimizationSuggestion( id=row["id"], tenant_id=row["tenant_id"], category=row["category"], title=row["title"], description=row["description"], potential_savings=row["potential_savings"], currency=row["currency"], confidence=row["confidence"], difficulty=row["difficulty"], implementation_steps=json.loads(row["implementation_steps"]), risk_level=row["risk_level"], is_applied=bool(row["is_applied"]), created_at=row["created_at"], applied_at=row["applied_at"], ) # Singleton instance _ops_manager = None def get_ops_manager() -> OpsManager: global _ops_manager if _ops_manager is None: _ops_manager = OpsManager() return _ops_manager