- Task 4: AI 能力增强 (ai_manager.py)
  - 自定义模型训练(领域特定实体识别)
  - 多模态大模型集成(GPT-4V、Claude 3、Gemini、Kimi-VL)
  - 知识图谱 RAG 智能问答
  - 智能摘要(提取式/生成式/关键点/时间线)
  - 预测性分析(趋势/异常/增长/演变预测)
- Task 5: 运营与增长工具 (growth_manager.py)
  - 用户行为分析(Mixpanel/Amplitude 集成)
  - A/B 测试框架
  - 邮件营销自动化
  - 推荐系统(邀请返利、团队升级激励)
- Task 6: 开发者生态 (developer_ecosystem_manager.py)
  - SDK 发布管理(Python/JavaScript/Go)
  - 模板市场
  - 插件市场
  - 开发者文档与示例代码
- Task 8: 运维与监控 (ops_manager.py)
  - 实时告警系统(PagerDuty/Opsgenie 集成)
  - 容量规划与自动扩缩容
  - 灾备与故障转移
  - 成本优化

Phase 8 全部 8 个任务已完成!
1872 lines
72 KiB
Python
1872 lines
72 KiB
Python
#!/usr/bin/env python3
|
||
"""
|
||
InsightFlow Growth Manager - Phase 8 Task 5
|
||
运营与增长工具模块
|
||
- 用户行为分析(Mixpanel/Amplitude 集成)
|
||
- A/B 测试框架
|
||
- 邮件营销自动化
|
||
- 推荐系统(邀请返利、团队升级激励)
|
||
|
||
作者: InsightFlow Team
|
||
"""
|
||
|
||
import os
|
||
import json
|
||
import sqlite3
|
||
import httpx
|
||
import asyncio
|
||
import random
|
||
import statistics
|
||
from typing import List, Dict, Optional, Any, Tuple
|
||
from dataclasses import dataclass, field, asdict
|
||
from datetime import datetime, timedelta
|
||
from enum import Enum
|
||
from collections import defaultdict
|
||
import hashlib
|
||
import uuid
|
||
import re
|
||
|
||
# Default SQLite database file: "insightflow.db" in the directory of this module.
DB_PATH = os.path.join(os.path.dirname(__file__), "insightflow.db")
|
||
|
||
|
||
class EventType(str, Enum):
    """Kinds of analytics events that can be tracked."""

    # Page view
    PAGE_VIEW = "page_view"
    # Feature usage
    FEATURE_USE = "feature_use"
    # Conversion
    CONVERSION = "conversion"
    # Sign-up
    SIGNUP = "signup"
    # Login
    LOGIN = "login"
    # Plan upgrade
    UPGRADE = "upgrade"
    # Plan downgrade
    DOWNGRADE = "downgrade"
    # Subscription cancelled
    CANCEL = "cancel"
    # Invitation sent
    INVITE_SENT = "invite_sent"
    # Invitation accepted
    INVITE_ACCEPTED = "invite_accepted"
    # Referral reward
    REFERRAL_REWARD = "referral_reward"
|
||
|
||
|
||
class ExperimentStatus(str, Enum):
    """Lifecycle states of an A/B experiment."""

    # Draft
    DRAFT = "draft"
    # Running
    RUNNING = "running"
    # Paused
    PAUSED = "paused"
    # Completed
    COMPLETED = "completed"
    # Archived
    ARCHIVED = "archived"
|
||
|
||
|
||
class TrafficAllocationType(str, Enum):
    """Strategies for assigning experiment traffic to variants."""

    # Random allocation
    RANDOM = "random"
    # Stratified allocation
    STRATIFIED = "stratified"
    # Targeted allocation
    TARGETED = "targeted"
|
||
|
||
|
||
class EmailTemplateType(str, Enum):
    """Categories of email templates."""

    # Welcome email
    WELCOME = "welcome"
    # Onboarding email
    ONBOARDING = "onboarding"
    # Feature announcement
    FEATURE_ANNOUNCEMENT = "feature_announcement"
    # Churn recovery
    CHURN_RECOVERY = "churn_recovery"
    # Upgrade prompt
    UPGRADE_PROMPT = "upgrade_prompt"
    # Referral invitation
    REFERRAL = "referral"
    # Newsletter
    NEWSLETTER = "newsletter"
|
||
|
||
|
||
class EmailStatus(str, Enum):
    """Delivery states of an email."""

    # Draft
    DRAFT = "draft"
    # Scheduled
    SCHEDULED = "scheduled"
    # Sending in progress
    SENDING = "sending"
    # Sent
    SENT = "sent"
    # Delivered
    DELIVERED = "delivered"
    # Opened
    OPENED = "opened"
    # Clicked
    CLICKED = "clicked"
    # Bounced
    BOUNCED = "bounced"
    # Failed
    FAILED = "failed"
|
||
|
||
|
||
class WorkflowTriggerType(str, Enum):
    """Events that can trigger an automation workflow."""

    # User signed up
    USER_SIGNUP = "user_signup"
    # User logged in
    USER_LOGIN = "user_login"
    # Subscription created
    SUBSCRIPTION_CREATED = "subscription_created"
    # Subscription cancelled
    SUBSCRIPTION_CANCELLED = "subscription_cancelled"
    # Inactivity
    INACTIVITY = "inactivity"
    # Milestone
    MILESTONE = "milestone"
    # Custom event
    CUSTOM_EVENT = "custom_event"
|
||
|
||
|
||
class ReferralStatus(str, Enum):
    """States of a referral record."""

    # Pending
    PENDING = "pending"
    # Converted
    CONVERTED = "converted"
    # Rewarded
    REWARDED = "rewarded"
    # Expired
    EXPIRED = "expired"
|
||
|
||
|
||
@dataclass
class AnalyticsEvent:
    """A single tracked analytics event."""

    id: str
    tenant_id: str
    user_id: str
    event_type: EventType
    event_name: str
    # Event properties
    properties: Dict[str, Any]
    timestamp: datetime
    session_id: Optional[str]
    # Device information
    device_info: Dict[str, str]
    referrer: Optional[str]
    utm_source: Optional[str]
    utm_medium: Optional[str]
    utm_campaign: Optional[str]
|
||
|
||
|
||
@dataclass
class UserProfile:
    """Aggregated profile of a user's activity."""

    id: str
    tenant_id: str
    user_id: str
    first_seen: datetime
    last_seen: datetime
    total_sessions: int
    total_events: int
    # Number of uses per feature
    feature_usage: Dict[str, int]
    subscription_history: List[Dict]
    # Lifetime value
    ltv: float
    # Churn risk score
    churn_risk_score: float
    # Engagement score
    engagement_score: float
    created_at: datetime
    updated_at: datetime
|
||
|
||
|
||
@dataclass
class Funnel:
    """Definition of a conversion funnel."""

    id: str
    tenant_id: str
    name: str
    description: str
    # Funnel steps
    steps: List[Dict]
    created_at: datetime
    updated_at: datetime
|
||
|
||
|
||
@dataclass
class FunnelAnalysis:
    """Result of analysing a funnel over a time window."""

    funnel_id: str
    period_start: datetime
    period_end: datetime
    total_users: int
    # Conversion data for each step
    step_conversions: List[Dict]
    # Overall conversion rate
    overall_conversion: float
    # Drop-off points
    drop_off_points: List[Dict]
|
||
|
||
|
||
@dataclass
class Experiment:
    """An A/B test experiment."""

    id: str
    tenant_id: str
    name: str
    description: str
    hypothesis: str
    status: ExperimentStatus
    # Experiment variants
    variants: List[Dict]
    traffic_allocation: TrafficAllocationType
    # Share of traffic per variant
    traffic_split: Dict[str, float]
    # Target audience
    target_audience: Dict
    # Primary metric
    primary_metric: str
    # Secondary metrics
    secondary_metrics: List[str]
    start_date: Optional[datetime]
    end_date: Optional[datetime]
    # Minimum sample size
    min_sample_size: int
    # Confidence level
    confidence_level: float
    created_at: datetime
    updated_at: datetime
    created_by: str
|
||
|
||
|
||
@dataclass
class ExperimentResult:
    """Statistical result for one variant and metric of an experiment."""

    id: str
    experiment_id: str
    variant_id: str
    metric_name: str
    sample_size: int
    mean_value: float
    std_dev: float
    confidence_interval: Tuple[float, float]
    p_value: float
    is_significant: bool
    # Uplift
    uplift: float
    created_at: datetime
|
||
|
||
|
||
@dataclass
class EmailTemplate:
    """An email template."""

    id: str
    tenant_id: str
    name: str
    template_type: EmailTemplateType
    subject: str
    html_content: str
    text_content: str
    # Template variables
    variables: List[str]
    preview_text: Optional[str]
    from_name: str
    from_email: str
    reply_to: Optional[str]
    is_active: bool
    created_at: datetime
    updated_at: datetime
|
||
|
||
|
||
@dataclass
class EmailCampaign:
    """An email marketing campaign and its delivery counters."""

    id: str
    tenant_id: str
    name: str
    template_id: str
    status: str
    recipient_count: int
    sent_count: int
    delivered_count: int
    opened_count: int
    clicked_count: int
    bounced_count: int
    failed_count: int
    scheduled_at: Optional[datetime]
    started_at: Optional[datetime]
    completed_at: Optional[datetime]
    created_at: datetime
|
||
|
||
|
||
@dataclass
class EmailLog:
    """Record of one email sent to one recipient."""

    id: str
    campaign_id: str
    tenant_id: str
    user_id: str
    email: str
    template_id: str
    status: EmailStatus
    subject: str
    sent_at: Optional[datetime]
    delivered_at: Optional[datetime]
    opened_at: Optional[datetime]
    clicked_at: Optional[datetime]
    ip_address: Optional[str]
    user_agent: Optional[str]
    error_message: Optional[str]
    created_at: datetime
|
||
|
||
|
||
@dataclass
class AutomationWorkflow:
    """An automation workflow."""

    id: str
    tenant_id: str
    name: str
    description: str
    trigger_type: WorkflowTriggerType
    # Trigger conditions
    trigger_conditions: Dict
    # Actions to execute
    actions: List[Dict]
    is_active: bool
    execution_count: int
    created_at: datetime
    updated_at: datetime
|
||
|
||
|
||
@dataclass
class ReferralProgram:
    """A referral program and its reward rules."""

    id: str
    tenant_id: str
    name: str
    description: str
    # Reward type: credit/discount/feature
    referrer_reward_type: str
    referrer_reward_value: float
    referee_reward_type: str
    referee_reward_value: float
    # Maximum number of referrals per user
    max_referrals_per_user: int
    referral_code_length: int
    # Days until a referral code expires
    expiry_days: int
    is_active: bool
    created_at: datetime
    updated_at: datetime
|
||
|
||
|
||
@dataclass
class Referral:
    """A single referral record."""

    id: str
    program_id: str
    tenant_id: str
    # The referring user
    referrer_id: str
    # The referred user
    referee_id: Optional[str]
    referral_code: str
    status: ReferralStatus
    referrer_rewarded: bool
    referee_rewarded: bool
    referrer_reward_value: float
    referee_reward_value: float
    converted_at: Optional[datetime]
    rewarded_at: Optional[datetime]
    expires_at: datetime
    created_at: datetime
|
||
|
||
|
||
@dataclass
class TeamIncentive:
    """An incentive offered for team upgrades."""

    id: str
    tenant_id: str
    name: str
    description: str
    # Target tier
    target_tier: str
    min_team_size: int
    # Incentive type
    incentive_type: str
    incentive_value: float
    valid_from: datetime
    valid_until: datetime
    is_active: bool
    created_at: datetime
|
||
|
||
|
||
class GrowthManager:
|
||
"""运营与增长管理主类"""
|
||
|
||
def __init__(self, db_path: str = DB_PATH):
    """Remember the database location and load integration credentials.

    Args:
        db_path: Path of the SQLite database file; defaults to ``DB_PATH``.

    Each credential is read from an environment variable and is an empty
    string when that variable is not set.
    """
    self.db_path = db_path

    read_env = os.getenv
    self.mixpanel_token = read_env("MIXPANEL_TOKEN", "")
    self.amplitude_api_key = read_env("AMPLITUDE_API_KEY", "")
    self.segment_write_key = read_env("SEGMENT_WRITE_KEY", "")
    self.sendgrid_api_key = read_env("SENDGRID_API_KEY", "")
|
||
|
||
def _get_db(self):
    """Open a new SQLite connection to ``self.db_path``.

    Returns:
        A ``sqlite3.Connection`` whose rows are ``sqlite3.Row`` objects,
        so columns can be read by name. The caller owns the connection.
    """
    connection = sqlite3.connect(self.db_path)
    connection.row_factory = sqlite3.Row
    return connection
|
||
|
||
# ==================== 用户行为分析 ====================
|
||
|
||
async def track_event(self, tenant_id: str, user_id: str, event_type: EventType,
                      event_name: str, properties: Dict = None,
                      session_id: str = None, device_info: Dict = None,
                      referrer: str = None, utm_params: Dict = None) -> AnalyticsEvent:
    """Persist an analytics event and schedule its follow-up work.

    The event is written to ``analytics_events`` synchronously. Forwarding
    to third-party platforms and the user-profile update are scheduled as
    background tasks and are NOT awaited before this coroutine returns.

    Args:
        tenant_id: Tenant the event belongs to.
        user_id: User that produced the event.
        event_type: Category of the event.
        event_name: Name of the event.
        properties: Optional event properties (stored as JSON).
        session_id: Optional session identifier.
        device_info: Optional device information (stored as JSON).
        referrer: Optional referrer.
        utm_params: Optional dict; keys ``source``, ``medium`` and
            ``campaign`` are copied into the UTM fields.

    Returns:
        The ``AnalyticsEvent`` that was stored.
    """
    event_id = f"evt_{uuid.uuid4().hex[:16]}"
    now = datetime.now()
    utm = utm_params or {}

    event = AnalyticsEvent(
        id=event_id,
        tenant_id=tenant_id,
        user_id=user_id,
        event_type=event_type,
        event_name=event_name,
        properties=properties or {},
        timestamp=now,
        session_id=session_id,
        device_info=device_info or {},
        referrer=referrer,
        utm_source=utm.get("source"),
        utm_medium=utm.get("medium"),
        utm_campaign=utm.get("campaign"),
    )

    conn = self._get_db()
    try:
        # `with conn` commits on success and rolls back on error, but does
        # not close the connection -- hence the explicit close below.
        with conn:
            conn.execute("""
                INSERT INTO analytics_events
                (id, tenant_id, user_id, event_type, event_name, properties, timestamp,
                 session_id, device_info, referrer, utm_source, utm_medium, utm_campaign)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            """, (event.id, event.tenant_id, event.user_id, event.event_type.value,
                  event.event_name, json.dumps(event.properties), event.timestamp.isoformat(),
                  event.session_id, json.dumps(event.device_info), event.referrer,
                  event.utm_source, event.utm_medium, event.utm_campaign))
    finally:
        conn.close()

    # The event loop keeps only weak references to tasks, so an unreferenced
    # task can be garbage-collected before it finishes. Hold strong
    # references until each task completes.
    background = getattr(self, "_background_tasks", None)
    if background is None:
        background = self._background_tasks = set()

    follow_ups = (
        # Forward the event to third-party analytics platforms.
        self._send_to_analytics_platforms(event),
        # Update the user's profile.
        self._update_user_profile(tenant_id, user_id, event_type, event_name),
    )
    for coro in follow_ups:
        task = asyncio.create_task(coro)
        background.add(task)
        task.add_done_callback(background.discard)

    return event
|
||
|
||
async def _send_to_analytics_platforms(self, event: AnalyticsEvent):
    """Forward ``event`` to every third-party platform that is configured.

    A platform counts as configured when its credential attribute is
    non-empty. The sends run concurrently, and exceptions raised by a
    sender are collected by ``gather`` instead of being propagated.
    """
    platforms = (
        (self.mixpanel_token, self._send_to_mixpanel),
        (self.amplitude_api_key, self._send_to_amplitude),
    )
    pending = [send(event) for credential, send in platforms if credential]

    if not pending:
        return
    await asyncio.gather(*pending, return_exceptions=True)
|
||
|
||
async def _send_to_mixpanel(self, event: AnalyticsEvent):
    """Send ``event`` to Mixpanel's ``/track`` endpoint (best effort).

    Any failure -- network error, timeout, or an HTTP error status -- is
    printed and swallowed so that analytics delivery never breaks the
    caller.
    """
    try:
        # NOTE(review): HTTP Basic credentials are normally base64-encoded;
        # confirm that sending the raw token here is what Mixpanel expects.
        headers = {
            "Content-Type": "application/json",
            "Authorization": f"Basic {self.mixpanel_token}"
        }

        payload = {
            "event": event.event_name,
            "properties": {
                "distinct_id": event.user_id,
                "token": self.mixpanel_token,
                "time": int(event.timestamp.timestamp()),
                # Spread last: a same-named event property overrides the
                # three keys above.
                **event.properties
            }
        }

        async with httpx.AsyncClient() as client:
            response = await client.post(
                "https://api.mixpanel.com/track",
                headers=headers,
                json=[payload],
                timeout=10.0
            )
            # Without this, 4xx/5xx responses were silently treated as
            # success; now they reach the failure log below.
            response.raise_for_status()
    except Exception as e:
        print(f"Failed to send to Mixpanel: {e}")
|
||
|
||
async def _send_to_amplitude(self, event: AnalyticsEvent):
    """Send ``event`` to Amplitude's HTTP API (best effort).

    Any failure -- network error, timeout, or an HTTP error status -- is
    printed and swallowed so that analytics delivery never breaks the
    caller.
    """
    try:
        headers = {"Content-Type": "application/json"}

        payload = {
            "api_key": self.amplitude_api_key,
            "events": [{
                "user_id": event.user_id,
                "event_type": event.event_name,
                # Timestamp is sent in milliseconds.
                "time": int(event.timestamp.timestamp() * 1000),
                "event_properties": event.properties,
                "user_properties": {}
            }]
        }

        async with httpx.AsyncClient() as client:
            response = await client.post(
                "https://api.amplitude.com/2/httpapi",
                headers=headers,
                json=payload,
                timeout=10.0
            )
            # Without this, 4xx/5xx responses were silently treated as
            # success; now they reach the failure log below.
            response.raise_for_status()
    except Exception as e:
        print(f"Failed to send to Amplitude: {e}")
|
||
|
||
async def _update_user_profile(self, tenant_id: str, user_id: str,
                               event_type: EventType, event_name: str):
    """Create or update the ``user_profiles`` row for one tracked event.

    For an existing profile: bumps ``last_seen``, increments
    ``total_events`` and the per-event counter in ``feature_usage``.
    Otherwise inserts a new profile with one session, one event and an
    engagement score of 0.5.

    ``event_type`` is accepted for interface compatibility but is not used.
    """
    conn = self._get_db()
    try:
        # `with conn` commits/rolls back; closing is done in `finally`.
        with conn:
            # Does a profile already exist for this user?
            row = conn.execute(
                "SELECT * FROM user_profiles WHERE tenant_id = ? AND user_id = ?",
                (tenant_id, user_id)
            ).fetchone()

            now = datetime.now().isoformat()

            if row:
                # Update the existing profile. A NULL/empty column is
                # treated as "no usage recorded yet".
                feature_usage = json.loads(row['feature_usage'] or '{}')
                feature_usage[event_name] = feature_usage.get(event_name, 0) + 1

                conn.execute("""
                    UPDATE user_profiles
                    SET last_seen = ?, total_events = total_events + 1,
                        feature_usage = ?, updated_at = ?
                    WHERE id = ?
                """, (now, json.dumps(feature_usage), now, row['id']))
            else:
                # Create a new profile.
                profile_id = f"up_{uuid.uuid4().hex[:16]}"
                conn.execute("""
                    INSERT INTO user_profiles
                    (id, tenant_id, user_id, first_seen, last_seen, total_sessions,
                     total_events, feature_usage, subscription_history, ltv,
                     churn_risk_score, engagement_score, created_at, updated_at)
                    VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
                """, (profile_id, tenant_id, user_id, now, now, 1, 1,
                      json.dumps({event_name: 1}), '[]', 0.0, 0.0, 0.5, now, now))
    finally:
        conn.close()
|
||
|
||
def get_user_profile(self, tenant_id: str, user_id: str) -> Optional[UserProfile]:
    """Return the stored profile of a user, or ``None`` if there is none.

    Args:
        tenant_id: Tenant the user belongs to.
        user_id: User to look up.
    """
    conn = self._get_db()
    try:
        row = conn.execute(
            "SELECT * FROM user_profiles WHERE tenant_id = ? AND user_id = ?",
            (tenant_id, user_id)
        ).fetchone()

        return self._row_to_user_profile(row) if row else None
    finally:
        # The sqlite3 context manager never closes the connection.
        conn.close()
|
||
|
||
def get_user_analytics_summary(self, tenant_id: str,
                               start_date: datetime = None,
                               end_date: datetime = None) -> Dict:
    """Summarise a tenant's analytics events.

    Args:
        tenant_id: Tenant whose events are summarised.
        start_date: Optional inclusive lower bound on the event timestamp.
        end_date: Optional inclusive upper bound on the event timestamp.

    Returns:
        A dict with ``unique_users``, ``total_events``, ``total_sessions``,
        ``active_days``, ``events_per_user``, ``events_per_session`` and
        ``event_type_distribution`` (event type -> count). The two ratios
        divide by at least 1, so they are 0 when there are no events.
    """
    # Both queries share one filter; it is built from fixed SQL fragments
    # only, and every value is passed as a bound parameter.
    where = "WHERE tenant_id = ?"
    params = [tenant_id]
    if start_date:
        where += " AND timestamp >= ?"
        params.append(start_date.isoformat())
    if end_date:
        where += " AND timestamp <= ?"
        params.append(end_date.isoformat())

    summary_query = f"""
        SELECT
            COUNT(DISTINCT user_id) as unique_users,
            COUNT(*) as total_events,
            COUNT(DISTINCT session_id) as total_sessions,
            COUNT(DISTINCT date(timestamp)) as active_days
        FROM analytics_events
        {where}
    """
    # Distribution of events by type.
    type_query = f"""
        SELECT event_type, COUNT(*) as count
        FROM analytics_events
        {where}
        GROUP BY event_type
    """

    conn = self._get_db()
    try:
        row = conn.execute(summary_query, params).fetchone()
        type_rows = conn.execute(type_query, params).fetchall()

        return {
            "unique_users": row['unique_users'],
            "total_events": row['total_events'],
            "total_sessions": row['total_sessions'],
            "active_days": row['active_days'],
            "events_per_user": row['total_events'] / max(row['unique_users'], 1),
            "events_per_session": row['total_events'] / max(row['total_sessions'], 1),
            "event_type_distribution": {r['event_type']: r['count'] for r in type_rows}
        }
    finally:
        # The sqlite3 context manager never closes the connection.
        conn.close()
|
||
|
||
def create_funnel(self, tenant_id: str, name: str, description: str,
                  steps: List[Dict], created_by: str) -> Funnel:
    """Create and persist a conversion funnel.

    Args:
        tenant_id: Tenant that owns the funnel.
        name: Funnel name.
        description: Funnel description.
        steps: Ordered funnel steps (stored as JSON).
        created_by: Accepted for interface compatibility; not stored.

    Returns:
        The created ``Funnel``. Its ``created_at`` / ``updated_at`` hold
        ISO-8601 strings, exactly as written to the database.
    """
    funnel_id = f"fnl_{uuid.uuid4().hex[:16]}"
    now = datetime.now().isoformat()

    funnel = Funnel(
        id=funnel_id,
        tenant_id=tenant_id,
        name=name,
        description=description,
        steps=steps,
        created_at=now,
        updated_at=now
    )

    conn = self._get_db()
    try:
        # `with conn` commits/rolls back; closing is done in `finally`.
        with conn:
            conn.execute("""
                INSERT INTO funnels
                (id, tenant_id, name, description, steps, created_at, updated_at)
                VALUES (?, ?, ?, ?, ?, ?, ?)
            """, (funnel.id, funnel.tenant_id, funnel.name, funnel.description,
                  json.dumps(funnel.steps), funnel.created_at, funnel.updated_at))
    finally:
        conn.close()

    return funnel
|
||
|
||
def analyze_funnel(self, funnel_id: str,
                   period_start: datetime = None,
                   period_end: datetime = None) -> Optional[FunnelAnalysis]:
    """Compute step-by-step conversion for a funnel.

    Each step counts the distinct users of the funnel's tenant that emitted
    the step's ``event_name`` within the period. Steps are counted
    independently of each other; users are not followed through the
    sequence.

    Args:
        funnel_id: Funnel to analyse.
        period_start: Start of the window; defaults to 30 days ago.
        period_end: End of the window; defaults to now.

    Returns:
        A ``FunnelAnalysis``, or ``None`` when the funnel does not exist.
    """
    conn = self._get_db()
    try:
        funnel_row = conn.execute(
            "SELECT * FROM funnels WHERE id = ?",
            (funnel_id,)
        ).fetchone()

        if not funnel_row:
            return None

        steps = json.loads(funnel_row['steps'])
        tenant_id = funnel_row['tenant_id']

        if not period_start:
            period_start = datetime.now() - timedelta(days=30)
        if not period_end:
            period_end = datetime.now()

        start_iso = period_start.isoformat()
        end_iso = period_end.isoformat()

        # Restrict to the funnel's own tenant; without this filter, events
        # of every tenant sharing an event name would be counted.
        step_query = """
            SELECT COUNT(DISTINCT user_id) as user_count
            FROM analytics_events
            WHERE tenant_id = ? AND event_name = ?
              AND timestamp >= ? AND timestamp <= ?
        """

        # Conversion of each step relative to the previous one.
        step_conversions = []
        previous_count = None

        for step in steps:
            event_name = step.get('event_name')
            row = conn.execute(
                step_query, (tenant_id, event_name, start_iso, end_iso)
            ).fetchone()
            user_count = row['user_count'] if row else 0

            # The first step, and any step after an empty one, reports 0.
            conversion_rate = 0.0
            drop_off_rate = 0.0
            if previous_count:
                conversion_rate = user_count / previous_count
                drop_off_rate = 1 - conversion_rate

            step_conversions.append({
                "step_name": step.get('name', event_name),
                "event_name": event_name,
                "user_count": user_count,
                "conversion_rate": round(conversion_rate, 4),
                "drop_off_rate": round(drop_off_rate, 4)
            })
            previous_count = user_count

        # Overall conversion: last step relative to the first.
        if step_conversions:
            first_step_count = step_conversions[0]['user_count']
            last_step_count = step_conversions[-1]['user_count']
            overall_conversion = last_step_count / max(first_step_count, 1)
        else:
            overall_conversion = 0.0

        # Main drop-off points: any step after the first losing over 20%.
        drop_off_points = [
            s for s in step_conversions[1:]
            if s['drop_off_rate'] > 0.2
        ]

        return FunnelAnalysis(
            funnel_id=funnel_id,
            period_start=period_start,
            period_end=period_end,
            total_users=step_conversions[0]['user_count'] if step_conversions else 0,
            step_conversions=step_conversions,
            overall_conversion=round(overall_conversion, 4),
            drop_off_points=drop_off_points
        )
    finally:
        # The sqlite3 context manager never closes the connection.
        conn.close()
|
||
|
||
def calculate_retention(self, tenant_id: str,
                        cohort_date: datetime,
                        periods: List[int] = None) -> Dict:
    """Compute day-N retention for the cohort first seen on ``cohort_date``.

    The cohort is every user of the tenant with an event on
    ``cohort_date`` whose profile ``first_seen`` falls on the same day.
    For each period N, a user is retained when they have at least one
    event on ``cohort_date + N days``.

    Args:
        tenant_id: Tenant to analyse.
        cohort_date: Day that defines the cohort.
        periods: Day offsets to evaluate; defaults to ``[1, 3, 7, 14, 30]``.

    Returns:
        ``{"cohort_date", "cohort_size", "retention"}`` where ``retention``
        maps ``"day_N"`` to ``{"active_users", "retention_rate"}``. It is
        empty when the cohort is empty.
    """
    if periods is None:
        periods = [1, 3, 7, 14, 30]

    cohort_iso = cohort_date.isoformat()

    conn = self._get_db()
    try:
        # Cohort: users whose first activity is on cohort_date.
        cohort_query = """
            SELECT DISTINCT user_id
            FROM analytics_events
            WHERE tenant_id = ? AND date(timestamp) = date(?)
            AND user_id IN (
                SELECT user_id FROM user_profiles
                WHERE tenant_id = ? AND date(first_seen) = date(?)
            )
        """
        cohort_rows = conn.execute(
            cohort_query, (tenant_id, cohort_iso, tenant_id, cohort_iso)
        ).fetchall()

        cohort_users = {r['user_id'] for r in cohort_rows}
        cohort_size = len(cohort_users)

        if cohort_size == 0:
            return {"cohort_date": cohort_iso, "cohort_size": 0, "retention": {}}

        # Loop-invariant pieces are built once. Only "?" placeholders are
        # interpolated; every value is a bound parameter.
        # NOTE: a very large cohort can exceed SQLite's bound-parameter limit.
        cohort_list = list(cohort_users)
        placeholders = ','.join('?' * cohort_size)
        active_query = f"""
            SELECT COUNT(DISTINCT user_id) as active_count
            FROM analytics_events
            WHERE tenant_id = ? AND date(timestamp) = date(?)
            AND user_id IN ({placeholders})
        """

        retention_rates = {}
        for period in periods:
            period_date = cohort_date + timedelta(days=period)
            row = conn.execute(
                active_query, [tenant_id, period_date.isoformat(), *cohort_list]
            ).fetchone()

            active_count = row['active_count'] if row else 0
            retention_rates[f"day_{period}"] = {
                "active_users": active_count,
                "retention_rate": round(active_count / cohort_size, 4)
            }

        return {
            "cohort_date": cohort_iso,
            "cohort_size": cohort_size,
            "retention": retention_rates
        }
    finally:
        # The sqlite3 context manager never closes the connection.
        conn.close()
|
||
|
||
# ==================== A/B 测试框架 ====================
|
||
|
||
def create_experiment(self, tenant_id: str, name: str, description: str,
                      hypothesis: str, variants: List[Dict],
                      traffic_allocation: TrafficAllocationType,
                      traffic_split: Dict[str, float],
                      target_audience: Dict,
                      primary_metric: str,
                      secondary_metrics: List[str],
                      min_sample_size: int = 100,
                      confidence_level: float = 0.95,
                      created_by: str = None) -> Experiment:
    """Create an A/B experiment in ``DRAFT`` status and persist it.

    Args:
        tenant_id: Tenant that owns the experiment.
        name: Experiment name.
        description: Experiment description.
        hypothesis: Hypothesis under test.
        variants: Variant definitions (stored as JSON).
        traffic_allocation: Strategy used to assign users to variants.
        traffic_split: Share of traffic per variant id (stored as JSON).
        target_audience: Audience definition (stored as JSON).
        primary_metric: Name of the primary metric.
        secondary_metrics: Names of secondary metrics (stored as JSON).
        min_sample_size: Minimum sample size.
        confidence_level: Confidence level.
        created_by: Creator; ``"system"`` when omitted.

    Returns:
        The created ``Experiment``. Its ``created_at`` / ``updated_at`` hold
        ISO-8601 strings, exactly as written to the database.
    """
    experiment_id = f"exp_{uuid.uuid4().hex[:16]}"
    now = datetime.now().isoformat()

    experiment = Experiment(
        id=experiment_id,
        tenant_id=tenant_id,
        name=name,
        description=description,
        hypothesis=hypothesis,
        status=ExperimentStatus.DRAFT,
        variants=variants,
        traffic_allocation=traffic_allocation,
        traffic_split=traffic_split,
        target_audience=target_audience,
        primary_metric=primary_metric,
        secondary_metrics=secondary_metrics,
        start_date=None,
        end_date=None,
        min_sample_size=min_sample_size,
        confidence_level=confidence_level,
        created_at=now,
        updated_at=now,
        created_by=created_by or "system"
    )

    conn = self._get_db()
    try:
        # `with conn` commits/rolls back; closing is done in `finally`.
        with conn:
            conn.execute("""
                INSERT INTO experiments
                (id, tenant_id, name, description, hypothesis, status, variants,
                 traffic_allocation, traffic_split, target_audience, primary_metric,
                 secondary_metrics, start_date, end_date, min_sample_size,
                 confidence_level, created_at, updated_at, created_by)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            """, (experiment.id, experiment.tenant_id, experiment.name,
                  experiment.description, experiment.hypothesis, experiment.status.value,
                  json.dumps(experiment.variants), experiment.traffic_allocation.value,
                  json.dumps(experiment.traffic_split), json.dumps(experiment.target_audience),
                  experiment.primary_metric, json.dumps(experiment.secondary_metrics),
                  experiment.start_date, experiment.end_date, experiment.min_sample_size,
                  experiment.confidence_level, experiment.created_at, experiment.updated_at,
                  experiment.created_by))
    finally:
        conn.close()

    return experiment
|
||
|
||
def get_experiment(self, experiment_id: str) -> Optional[Experiment]:
    """Return the experiment with the given id, or ``None`` if not found."""
    conn = self._get_db()
    try:
        row = conn.execute(
            "SELECT * FROM experiments WHERE id = ?",
            (experiment_id,)
        ).fetchone()

        return self._row_to_experiment(row) if row else None
    finally:
        # The sqlite3 context manager never closes the connection.
        conn.close()
|
||
|
||
def list_experiments(self, tenant_id: str,
                     status: ExperimentStatus = None) -> List[Experiment]:
    """List a tenant's experiments, newest first.

    Args:
        tenant_id: Tenant whose experiments are listed.
        status: When given, only experiments in this status are returned.
    """
    query = "SELECT * FROM experiments WHERE tenant_id = ?"
    params = [tenant_id]

    if status:
        query += " AND status = ?"
        params.append(status.value)

    query += " ORDER BY created_at DESC"

    conn = self._get_db()
    try:
        rows = conn.execute(query, params).fetchall()
        return [self._row_to_experiment(row) for row in rows]
    finally:
        # The sqlite3 context manager never closes the connection.
        conn.close()
|
||
|
||
def assign_variant(self, experiment_id: str, user_id: str,
                   user_attributes: Dict = None) -> Optional[str]:
    """Assign a user to a variant of a running experiment.

    An existing assignment is returned unchanged. Otherwise a variant is
    chosen by the experiment's allocation strategy and stored in
    ``experiment_assignments``.

    Args:
        experiment_id: Experiment to assign within.
        user_id: User to assign.
        user_attributes: Optional attributes used by the stratified and
            targeted strategies; stored as JSON with the assignment.

    Returns:
        The variant id, or ``None`` when the experiment does not exist, is
        not running, or no variant could be chosen.
    """
    experiment = self.get_experiment(experiment_id)
    if not experiment or experiment.status != ExperimentStatus.RUNNING:
        return None

    conn = self._get_db()
    try:
        # `with conn` commits/rolls back; closing is done in `finally`.
        with conn:
            # Has this user already been assigned?
            row = conn.execute(
                """SELECT variant_id FROM experiment_assignments
                   WHERE experiment_id = ? AND user_id = ?""",
                (experiment_id, user_id)
            ).fetchone()

            if row:
                return row['variant_id']

            # Choose a variant according to the allocation strategy.
            allocation = experiment.traffic_allocation
            if allocation == TrafficAllocationType.RANDOM:
                variant_id = self._random_allocation(experiment.variants,
                                                     experiment.traffic_split)
            elif allocation == TrafficAllocationType.STRATIFIED:
                # Stratified allocation hashes attributes['user_id']. Supply
                # the known user_id so the hash path is used even when the
                # caller did not repeat it; a caller-supplied value wins.
                strat_attributes = {'user_id': user_id, **(user_attributes or {})}
                variant_id = self._stratified_allocation(experiment.variants,
                                                         experiment.traffic_split,
                                                         strat_attributes)
            else:  # TARGETED
                variant_id = self._targeted_allocation(experiment.variants,
                                                       experiment.target_audience,
                                                       user_attributes)

            if variant_id:
                now = datetime.now().isoformat()
                conn.execute("""
                    INSERT INTO experiment_assignments
                    (id, experiment_id, user_id, variant_id, user_attributes, assigned_at)
                    VALUES (?, ?, ?, ?, ?, ?)
                """, (f"ea_{uuid.uuid4().hex[:16]}", experiment_id, user_id,
                      variant_id, json.dumps(user_attributes or {}), now))

            return variant_id
    finally:
        conn.close()
|
||
|
||
def _random_allocation(self, variants: List[Dict],
                       traffic_split: Dict[str, float]) -> str:
    """Pick a variant id at random, weighted by ``traffic_split``.

    Args:
        variants: Variant dicts; each must have an ``'id'`` key.
        traffic_split: Weight per variant id. A variant missing from the
            mapping gets an equal share (``1 / len(variants)``).

    Returns:
        The id of the chosen variant.
    """
    variant_ids = [v['id'] for v in variants]
    weights = [traffic_split.get(v_id, 1.0 / len(variants)) for v_id in variant_ids]

    # `random.choices` takes relative weights, so no normalisation is
    # needed. A non-positive total (e.g. every weight 0) used to raise
    # ZeroDivisionError; fall back to a uniform pick instead.
    if sum(weights) <= 0:
        weights = None

    return random.choices(variant_ids, weights=weights, k=1)[0]
|
||
|
||
def _stratified_allocation(self, variants: List[Dict],
                           traffic_split: Dict[str, float],
                           user_attributes: Dict) -> str:
    """Pick a variant from a hash of the user id (simplified stratification).

    When ``user_attributes`` has a ``'user_id'`` key, the variant is chosen
    by the MD5 hash of that id modulo the number of variants, so the same
    user always maps to the same variant. ``traffic_split`` is not used on
    this path. Otherwise the choice falls back to ``_random_allocation``.

    Args:
        variants: Variant dicts; each must have an ``'id'`` key.
        traffic_split: Weights used only by the random fallback.
        user_attributes: Optional user attributes.

    Returns:
        The id of the chosen variant.
    """
    if user_attributes and 'user_id' in user_attributes:
        # str() so non-string ids (e.g. integers) hash instead of raising
        # AttributeError on `.encode()`. MD5 is only used for bucketing.
        user_key = str(user_attributes['user_id'])
        hash_value = int(hashlib.md5(user_key.encode()).hexdigest(), 16)
        variant_ids = [v['id'] for v in variants]
        return variant_ids[hash_value % len(variant_ids)]

    return self._random_allocation(variants, traffic_split)
|
||
|
||
def _targeted_allocation(self, variants: List[Dict],
                         target_audience: Dict,
                         user_attributes: Dict) -> Optional[str]:
    """Pick a variant based on whether the user matches the target audience.

    Every condition in ``target_audience['conditions']`` must hold. The
    supported operators are ``equals``, ``not_equals`` and ``in``; any
    other operator is ignored. A matching user is allocated at random using
    ``target_audience['traffic_split']``. A non-matching user gets the
    control variant (the first one flagged ``is_control``, else the first
    variant).

    Args:
        variants: Variant dicts; each must have an ``'id'`` key.
        target_audience: Audience definition; may be ``None``.
        user_attributes: Attributes of the user; may be ``None``.

    Returns:
        The chosen variant id, or ``None`` when there are no variants.
    """
    if not variants:
        # Previously raised IndexError while looking up the control variant.
        return None

    audience = target_audience or {}
    attributes = user_attributes or {}

    def _satisfied(condition: Dict) -> bool:
        """Return whether the user's attributes meet one condition."""
        operator = condition.get('operator')
        expected = condition.get('value')
        actual = attributes.get(condition.get('attribute'))

        if operator == 'equals':
            return actual == expected
        if operator == 'not_equals':
            return actual != expected
        if operator == 'in':
            try:
                return actual in expected
            except TypeError:
                # `expected` is not a container (e.g. None): no match.
                return False
        return True

    if all(_satisfied(c) for c in audience.get('conditions', [])):
        return self._random_allocation(variants, audience.get('traffic_split', {}))

    # The user does not match: return the control group.
    control_variant = next((v for v in variants if v.get('is_control')), variants[0])
    return control_variant['id'] if control_variant else None
|
||
|
||
def record_experiment_metric(self, experiment_id: str, variant_id: str,
                             user_id: str, metric_name: str, metric_value: float):
    """Store one metric observation for a user in an experiment variant.

    Args:
        experiment_id: Experiment the observation belongs to.
        variant_id: Variant the user was assigned to.
        user_id: User the observation was recorded for.
        metric_name: Name of the metric.
        metric_value: Observed value.
    """
    conn = self._get_db()
    try:
        # `with conn` commits/rolls back; closing is done in `finally`.
        with conn:
            conn.execute("""
                INSERT INTO experiment_metrics
                (id, experiment_id, variant_id, user_id, metric_name, metric_value, recorded_at)
                VALUES (?, ?, ?, ?, ?, ?, ?)
            """, (f"em_{uuid.uuid4().hex[:16]}", experiment_id, variant_id,
                  user_id, metric_name, metric_value, datetime.now().isoformat()))
    finally:
        conn.close()
|
||
|
||
def analyze_experiment(self, experiment_id: str) -> Dict:
    """Summarise an experiment's primary metric per variant.

    For every variant the result holds ``variant_name``, ``is_control``,
    ``sample_size`` (distinct assigned users), ``mean_value`` and
    ``metric_count``. When a control variant exists, each other variant
    also gets:

    * ``uplift`` -- relative difference of the means against the control
      (0 when the control mean is not positive);
    * ``p_value`` -- two-sided p-value of a two-sample z-test with unequal
      variances (normal approximation); 1.0 when either side has fewer
      than two observations;
    * ``is_significant`` -- ``p_value`` is below
      ``1 - experiment.confidence_level`` and both the variant and the
      control have at least ``experiment.min_sample_size`` users.

    Args:
        experiment_id: Experiment to analyse.

    Returns:
        The summary dict, or ``{"error": "Experiment not found"}``.
    """
    import math  # local import: `math` is not among the module-level imports

    experiment = self.get_experiment(experiment_id)
    if not experiment:
        return {"error": "Experiment not found"}

    results = {}
    # variant_id -> (observation count, unrounded mean, sample variance or None)
    raw_stats = {}

    conn = self._get_db()
    try:
        for variant in experiment.variants:
            variant_id = variant['id']

            # Sample size: distinct users assigned to the variant.
            sample_row = conn.execute("""
                SELECT COUNT(DISTINCT user_id) as sample_size
                FROM experiment_assignments
                WHERE experiment_id = ? AND variant_id = ?
            """, (experiment_id, variant_id)).fetchone()
            sample_size = sample_row['sample_size'] if sample_row else 0

            # Primary-metric statistics; the sum of squares yields the
            # variance without loading every observation.
            metric_row = conn.execute("""
                SELECT
                    AVG(metric_value) as mean_value,
                    COUNT(*) as metric_count,
                    COUNT(metric_value) as value_count,
                    SUM(metric_value * metric_value) as sum_squares
                FROM experiment_metrics
                WHERE experiment_id = ? AND variant_id = ? AND metric_name = ?
            """, (experiment_id, variant_id, experiment.primary_metric)).fetchone()

            mean_value = metric_row['mean_value'] if metric_row and metric_row['mean_value'] else 0
            value_count = metric_row['value_count'] if metric_row else 0

            variance = None
            if value_count > 1:
                # Clamp at 0 to absorb floating-point cancellation.
                variance = max(
                    (metric_row['sum_squares'] - value_count * mean_value ** 2)
                    / (value_count - 1),
                    0.0,
                )
            raw_stats[variant_id] = (value_count, mean_value, variance)

            results[variant_id] = {
                "variant_name": variant.get('name', variant_id),
                "is_control": variant.get('is_control', False),
                "sample_size": sample_size,
                "mean_value": round(mean_value, 4),
                "metric_count": metric_row['metric_count'] if metric_row else 0
            }
    finally:
        # The sqlite3 context manager never closes the connection.
        conn.close()

    # Compare every non-control variant against the control.
    control_variant = next((v for v in experiment.variants if v.get('is_control')), None)
    if control_variant:
        control_id = control_variant['id']
        control_result = results.get(control_id, {})
        control_mean = control_result.get('mean_value', 0)
        control_n, control_raw_mean, control_var = raw_stats.get(control_id, (0, 0, None))
        alpha = 1 - experiment.confidence_level

        for variant_id, result in results.items():
            if variant_id == control_id:
                continue

            variant_mean = result.get('mean_value', 0)
            if control_mean > 0:
                uplift = (variant_mean - control_mean) / control_mean
            else:
                uplift = 0

            variant_n, variant_raw_mean, variant_var = raw_stats[variant_id]
            p_value = 1.0
            if control_var is not None and variant_var is not None:
                std_err = math.sqrt(control_var / control_n + variant_var / variant_n)
                difference = variant_raw_mean - control_raw_mean
                if std_err > 0:
                    # Two-sided tail probability of the standard normal.
                    p_value = math.erfc(abs(difference) / std_err / math.sqrt(2))
                elif difference != 0:
                    # No variance on either side but different means.
                    p_value = 0.0

            enough_samples = min(
                result['sample_size'], control_result.get('sample_size', 0)
            ) >= experiment.min_sample_size

            result['uplift'] = round(uplift, 4)
            result['is_significant'] = bool(enough_samples and p_value < alpha)
            result['p_value'] = round(p_value, 6)

    return {
        "experiment_id": experiment_id,
        "experiment_name": experiment.name,
        "primary_metric": experiment.primary_metric,
        "status": experiment.status.value,
        "variant_results": results
    }
|
||
|
||
def start_experiment(self, experiment_id: str) -> Optional[Experiment]:
    """Move an experiment from ``DRAFT`` to ``RUNNING`` and set its start date.

    The update only applies to an experiment currently in ``DRAFT``; in any
    other status nothing changes.

    Returns:
        The experiment as stored after the call (whether or not it was
        updated), or ``None`` when it does not exist.
    """
    conn = self._get_db()
    try:
        # `with conn` commits/rolls back; closing is done in `finally`.
        with conn:
            now = datetime.now().isoformat()
            conn.execute("""
                UPDATE experiments
                SET status = ?, start_date = ?, updated_at = ?
                WHERE id = ? AND status = ?
            """, (ExperimentStatus.RUNNING.value, now, now, experiment_id,
                  ExperimentStatus.DRAFT.value))
    finally:
        conn.close()

    return self.get_experiment(experiment_id)
|
||
|
||
def stop_experiment(self, experiment_id: str) -> Optional[Experiment]:
    """Move an experiment from ``RUNNING`` to ``COMPLETED`` and set its end date.

    The update only applies to an experiment currently in ``RUNNING``; in
    any other status nothing changes.

    Returns:
        The experiment as stored after the call (whether or not it was
        updated), or ``None`` when it does not exist.
    """
    conn = self._get_db()
    try:
        # `with conn` commits/rolls back; closing is done in `finally`.
        with conn:
            now = datetime.now().isoformat()
            conn.execute("""
                UPDATE experiments
                SET status = ?, end_date = ?, updated_at = ?
                WHERE id = ? AND status = ?
            """, (ExperimentStatus.COMPLETED.value, now, now, experiment_id,
                  ExperimentStatus.RUNNING.value))
    finally:
        conn.close()

    return self.get_experiment(experiment_id)
|
||
|
||
# ==================== 邮件营销自动化 ====================
|
||
|
||
def create_email_template(self, tenant_id: str, name: str,
                          template_type: EmailTemplateType,
                          subject: str, html_content: str,
                          text_content: str = None,
                          variables: List[str] = None,
                          from_name: str = None,
                          from_email: str = None,
                          reply_to: str = None) -> EmailTemplate:
    """Create and persist an active email template.

    Args:
        tenant_id: Tenant that owns the template.
        name: Template name.
        template_type: Category of the template.
        subject: Email subject.
        html_content: HTML body.
        text_content: Plain-text body; when omitted it is derived from
            ``html_content`` by stripping tags.
        variables: Template variable names; when ``None`` they are
            extracted from ``{{name}}`` placeholders in ``html_content``.
        from_name: Sender name; defaults to ``"InsightFlow"``.
        from_email: Sender address; defaults to ``"noreply@insightflow.io"``.
        reply_to: Optional reply-to address.

    Returns:
        The created ``EmailTemplate``. Its ``created_at`` / ``updated_at``
        hold ISO-8601 strings, exactly as written to the database.
    """
    template_id = f"et_{uuid.uuid4().hex[:16]}"
    now = datetime.now().isoformat()

    # Extract variables automatically. A placeholder used several times is
    # listed once, in order of first appearance.
    if variables is None:
        variables = list(dict.fromkeys(re.findall(r'\{\{(\w+)\}\}', html_content)))

    template = EmailTemplate(
        id=template_id,
        tenant_id=tenant_id,
        name=name,
        template_type=template_type,
        subject=subject,
        html_content=html_content,
        text_content=text_content or re.sub(r'<[^>]+>', '', html_content),
        variables=variables,
        preview_text=None,
        from_name=from_name or "InsightFlow",
        from_email=from_email or "noreply@insightflow.io",
        reply_to=reply_to,
        is_active=True,
        created_at=now,
        updated_at=now
    )

    conn = self._get_db()
    try:
        # `with conn` commits/rolls back; closing is done in `finally`.
        with conn:
            conn.execute("""
                INSERT INTO email_templates
                (id, tenant_id, name, template_type, subject, html_content, text_content,
                 variables, from_name, from_email, reply_to, is_active, created_at, updated_at)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            """, (template.id, template.tenant_id, template.name, template.template_type.value,
                  template.subject, template.html_content, template.text_content,
                  json.dumps(template.variables), template.from_name, template.from_email,
                  template.reply_to, template.is_active, template.created_at, template.updated_at))
    finally:
        conn.close()

    return template
|
||
|
||
def get_email_template(self, template_id: str) -> Optional[EmailTemplate]:
    """Look up a single email template by id.

    Args:
        template_id: Identifier of the template.

    Returns:
        The matching EmailTemplate, or None when no row has that id.
    """
    with self._get_db() as conn:
        cursor = conn.execute(
            "SELECT * FROM email_templates WHERE id = ?",
            (template_id,),
        )
        record = cursor.fetchone()

    if not record:
        return None
    return self._row_to_email_template(record)
|
||
|
||
def list_email_templates(self, tenant_id: str,
                         template_type: EmailTemplateType = None) -> List[EmailTemplate]:
    """List a tenant's active email templates, newest first.

    Args:
        tenant_id: Tenant whose templates are listed.
        template_type: Optional filter; when given, only templates of this
            type are returned.

    Returns:
        Active templates ordered by ``created_at`` descending.
    """
    filters = ["tenant_id = ?", "is_active = 1"]
    params = [tenant_id]

    if template_type:
        filters.append("template_type = ?")
        params.append(template_type.value)

    query = (
        "SELECT * FROM email_templates WHERE "
        + " AND ".join(filters)
        + " ORDER BY created_at DESC"
    )

    with self._get_db() as conn:
        records = conn.execute(query, params).fetchall()
        return list(map(self._row_to_email_template, records))
|
||
|
||
def render_template(self, template_id: str, variables: Dict) -> Optional[Dict[str, str]]:
    """Render an email template by filling in ``{{key}}`` placeholders.

    Every key of ``variables`` is substituted (as ``str(value)``) in the
    subject, the HTML body and the text body. Placeholders with no matching
    key are left as they are.

    Substitution is done in one pass per field, so text that comes from a
    substituted value is never expanded again. A value such as
    ``"{{tenant_id}}"`` therefore stays literal instead of being replaced by
    another variable.

    Args:
        template_id: Identifier of the template to render.
        variables: Mapping of placeholder name to value. None is treated as
            an empty mapping.

    Returns:
        A dict with keys ``subject``, ``html``, ``text``, ``from_name``,
        ``from_email`` and ``reply_to``, or None when the template does not
        exist. Fields that are None on the template are passed through as
        None.
    """
    template = self.get_email_template(template_id)
    if not template:
        return None

    replacements = {
        f"{{{{{key}}}}}": str(value)
        for key, value in (variables or {}).items()
    }

    if replacements:
        # Longest placeholder first so an alternative can never shadow a
        # longer one that starts with the same characters.
        ordered = sorted(replacements, key=len, reverse=True)
        pattern = re.compile("|".join(re.escape(token) for token in ordered))
    else:
        pattern = None

    def fill(text):
        # None/empty fields (e.g. a NULL text body) are returned untouched.
        if not text or pattern is None:
            return text
        return pattern.sub(lambda match: replacements[match.group(0)], text)

    return {
        "subject": fill(template.subject),
        "html": fill(template.html_content),
        "text": fill(template.text_content),
        "from_name": template.from_name,
        "from_email": template.from_email,
        "reply_to": template.reply_to
    }
|
||
|
||
def create_email_campaign(self, tenant_id: str, name: str,
                          template_id: str,
                          recipient_list: List[Dict],
                          scheduled_at: datetime = None) -> EmailCampaign:
    """Create an email campaign and one pending log row per recipient.

    The campaign is stored with status "draft" and all counters at zero.
    Each recipient dict contributes one ``email_logs`` row built from its
    ``user_id`` and ``email`` keys (missing keys are stored as NULL). The log
    status is SCHEDULED when ``scheduled_at`` is given, otherwise DRAFT.

    Args:
        tenant_id: Owning tenant.
        name: Campaign name.
        template_id: Template used for every mail of the campaign.
        recipient_list: Recipients as dicts with ``user_id`` / ``email``.
        scheduled_at: Optional planned send time.

    Returns:
        The created EmailCampaign (campaign and logs committed together).
    """
    created_at = datetime.now().isoformat()
    campaign_id = f"ec_{uuid.uuid4().hex[:16]}"

    scheduled_iso = None
    if scheduled_at:
        scheduled_iso = scheduled_at.isoformat()

    campaign = EmailCampaign(
        id=campaign_id,
        tenant_id=tenant_id,
        name=name,
        template_id=template_id,
        status="draft",
        recipient_count=len(recipient_list),
        sent_count=0,
        delivered_count=0,
        opened_count=0,
        clicked_count=0,
        bounced_count=0,
        failed_count=0,
        scheduled_at=scheduled_iso,
        started_at=None,
        completed_at=None,
        created_at=created_at
    )

    campaign_sql = """
        INSERT INTO email_campaigns
        (id, tenant_id, name, template_id, status, recipient_count,
         sent_count, delivered_count, opened_count, clicked_count,
         bounced_count, failed_count, scheduled_at, started_at, completed_at, created_at)
        VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
    """
    campaign_values = (
        campaign.id, campaign.tenant_id, campaign.name, campaign.template_id,
        campaign.status, campaign.recipient_count, campaign.sent_count,
        campaign.delivered_count, campaign.opened_count, campaign.clicked_count,
        campaign.bounced_count, campaign.failed_count, campaign.scheduled_at,
        campaign.started_at, campaign.completed_at, campaign.created_at,
    )
    log_sql = """
        INSERT INTO email_logs
        (id, campaign_id, tenant_id, user_id, email, template_id, status, subject, created_at)
        VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
    """

    with self._get_db() as conn:
        conn.execute(campaign_sql, campaign_values)

        # One send-log row per recipient; the subject is filled in at send time.
        for entry in recipient_list:
            if scheduled_at:
                log_status = EmailStatus.SCHEDULED.value
            else:
                log_status = EmailStatus.DRAFT.value
            conn.execute(log_sql, (
                f"el_{uuid.uuid4().hex[:16]}", campaign_id, tenant_id,
                entry.get('user_id'), entry.get('email'), template_id,
                log_status, "", created_at,
            ))

        conn.commit()

    return campaign
|
||
|
||
async def send_email(self, campaign_id: str, user_id: str, email: str,
                     template_id: str, variables: Dict) -> bool:
    """Send one campaign email and track its state in ``email_logs``.

    The log row is first marked SENDING (with the rendered subject), then
    DELIVERED on success or FAILED with an error message otherwise. Actual
    delivery is currently simulated; a real provider integration belongs in
    the ``try`` block.

    The log row is located by ``(campaign_id, user_id)``. When ``user_id`` is
    None, the row is located by ``(campaign_id, user_id IS NULL, email)``
    instead, because ``user_id = NULL`` never matches in SQL and such rows
    would otherwise never be updated.

    Args:
        campaign_id: Campaign the mail belongs to.
        user_id: Recipient user id; may be None for address-only recipients.
        email: Recipient address.
        template_id: Template to render.
        variables: Placeholder values for the template.

    Returns:
        True when the mail was (simulated as) delivered, False when the
        template does not exist or sending failed.
    """
    # render_template returns None for an unknown template, so one lookup
    # covers both the existence check and the rendering.
    rendered = self.render_template(template_id, variables)
    if rendered is None:
        return False

    if user_id is None:
        log_filter = "campaign_id = ? AND user_id IS NULL AND email = ?"
        log_key = (campaign_id, email)
    else:
        log_filter = "campaign_id = ? AND user_id = ?"
        log_key = (campaign_id, user_id)

    def update_log(assignments: str, values: tuple) -> None:
        # Both SQL fragments are constants defined in this function; all
        # data goes through bound parameters.
        with self._get_db() as conn:
            conn.execute(
                f"UPDATE email_logs SET {assignments} WHERE {log_filter}",
                values + log_key,
            )
            conn.commit()

    # Mark the log row as being sent.
    update_log(
        "status = ?, sent_at = ?, subject = ?",
        (EmailStatus.SENDING.value, datetime.now().isoformat(), rendered['subject']),
    )

    try:
        # Integrate the real mail provider (SendGrid, AWS SES, ...) here.
        # For now the send is simulated.
        await asyncio.sleep(0.1)

        success = True  # simulated success

        if success:
            update_log(
                "status = ?, delivered_at = ?",
                (EmailStatus.DELIVERED.value, datetime.now().isoformat()),
            )
        else:
            update_log(
                "status = ?, error_message = ?",
                (EmailStatus.FAILED.value, "Send failed"),
            )

        return success

    except Exception as e:
        # Best effort: record the failure on the log row instead of raising.
        update_log(
            "status = ?, error_message = ?",
            (EmailStatus.FAILED.value, str(e)),
        )
        return False
|
||
|
||
async def send_campaign(self, campaign_id: str) -> Dict:
    """Send every pending email of a campaign.

    Log rows in status DRAFT or SCHEDULED are sent one after another via
    ``send_email``. The campaign is marked "sending" while this runs and
    "completed" afterwards.

    Counters are accumulated rather than overwritten, so running the method
    again on the same campaign (when no pending rows are left) does not reset
    ``sent_count`` to zero. ``failed_count`` is persisted as well.

    Args:
        campaign_id: Identifier of the campaign to send.

    Returns:
        ``{"error": "Campaign not found"}`` for an unknown campaign,
        otherwise a dict with ``campaign_id``, ``total``, ``success`` and
        ``failed`` describing this run.
    """
    with self._get_db() as conn:
        campaign_row = conn.execute(
            "SELECT * FROM email_campaigns WHERE id = ?",
            (campaign_id,)
        ).fetchone()

        if not campaign_row:
            return {"error": "Campaign not found"}

        # Emails that have not been sent yet.
        logs = conn.execute(
            """SELECT * FROM email_logs
               WHERE campaign_id = ? AND status IN (?, ?)""",
            (campaign_id, EmailStatus.DRAFT.value, EmailStatus.SCHEDULED.value)
        ).fetchall()

        # Mark the campaign as in progress.
        now = datetime.now().isoformat()
        conn.execute(
            "UPDATE email_campaigns SET status = ?, started_at = ? WHERE id = ?",
            ("sending", now, campaign_id)
        )
        conn.commit()

    success_count = 0
    failed_count = 0

    for log in logs:
        # Per-recipient placeholder values.
        variables = self._get_user_variables(log['tenant_id'], log['user_id'])

        success = await self.send_email(
            campaign_id, log['user_id'], log['email'],
            log['template_id'], variables
        )

        if success:
            success_count += 1
        else:
            failed_count += 1

    with self._get_db() as conn:
        now = datetime.now().isoformat()
        # Add this run's results to the stored counters instead of replacing
        # them; COALESCE guards against NULL counters on legacy rows.
        conn.execute("""
            UPDATE email_campaigns
            SET status = ?, completed_at = ?,
                sent_count = COALESCE(sent_count, 0) + ?,
                failed_count = COALESCE(failed_count, 0) + ?
            WHERE id = ?
        """, ("completed", now, success_count, failed_count, campaign_id))
        conn.commit()

    return {
        "campaign_id": campaign_id,
        "total": len(logs),
        "success": success_count,
        "failed": failed_count
    }
|
||
|
||
def _get_user_variables(self, tenant_id: str, user_id: str) -> Dict:
|
||
"""获取用户变量用于邮件模板"""
|
||
# 这里应该从用户服务获取用户信息
|
||
# 简化实现
|
||
return {
|
||
"user_id": user_id,
|
||
"user_name": "User",
|
||
"tenant_id": tenant_id
|
||
}
|
||
|
||
def create_automation_workflow(self, tenant_id: str, name: str,
                               description: str,
                               trigger_type: WorkflowTriggerType,
                               trigger_conditions: Dict,
                               actions: List[Dict]) -> AutomationWorkflow:
    """Create and persist an automation workflow.

    The workflow starts active with an execution count of zero.
    ``trigger_conditions`` and ``actions`` are stored as JSON text.

    Args:
        tenant_id: Owning tenant.
        name: Workflow name.
        description: Free-text description.
        trigger_type: Kind of event that triggers the workflow.
        trigger_conditions: Key/value pairs an event must match.
        actions: Ordered action definitions to run when triggered.

    Returns:
        The created AutomationWorkflow (already committed).
    """
    timestamp = datetime.now().isoformat()

    workflow = AutomationWorkflow(
        id=f"aw_{uuid.uuid4().hex[:16]}",
        tenant_id=tenant_id,
        name=name,
        description=description,
        trigger_type=trigger_type,
        trigger_conditions=trigger_conditions,
        actions=actions,
        is_active=True,
        execution_count=0,
        created_at=timestamp,
        updated_at=timestamp
    )

    insert_sql = """
        INSERT INTO automation_workflows
        (id, tenant_id, name, description, trigger_type, trigger_conditions,
         actions, is_active, execution_count, created_at, updated_at)
        VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
    """
    values = (
        workflow.id,
        workflow.tenant_id,
        workflow.name,
        workflow.description,
        workflow.trigger_type.value,
        json.dumps(workflow.trigger_conditions),
        json.dumps(workflow.actions),
        workflow.is_active,
        workflow.execution_count,
        workflow.created_at,
        workflow.updated_at,
    )

    with self._get_db() as conn:
        conn.execute(insert_sql, values)
        conn.commit()

    return workflow
|
||
|
||
async def trigger_workflow(self, workflow_id: str, event_data: Dict) -> bool:
    """Run an active automation workflow for an incoming event.

    The workflow is loaded, its trigger conditions are checked against
    ``event_data``, its actions are awaited in order, and finally its
    ``execution_count`` is incremented.

    The database context is released before the actions run and re-acquired
    for the counter update, so no connection or transaction is held open
    across awaits while other coroutines use the database.

    Args:
        workflow_id: Identifier of the workflow to trigger.
        event_data: Event payload checked against the trigger conditions and
            passed to every action.

    Returns:
        True when the workflow ran; False when it does not exist, is
        inactive, or its trigger conditions do not match.
    """
    with self._get_db() as conn:
        row = conn.execute(
            "SELECT * FROM automation_workflows WHERE id = ? AND is_active = 1",
            (workflow_id,)
        ).fetchone()

    if not row:
        return False

    workflow = self._row_to_automation_workflow(row)

    if not self._check_trigger_conditions(workflow.trigger_conditions, event_data):
        return False

    for action in workflow.actions:
        await self._execute_action(action, event_data)

    # Count the execution only after every action has completed.
    with self._get_db() as conn:
        conn.execute(
            "UPDATE automation_workflows SET execution_count = execution_count + 1 WHERE id = ?",
            (workflow_id,)
        )
        conn.commit()

    return True
|
||
|
||
def _check_trigger_conditions(self, conditions: Dict, event_data: Dict) -> bool:
|
||
"""检查触发条件"""
|
||
# 简化的条件检查
|
||
for key, value in conditions.items():
|
||
if event_data.get(key) != value:
|
||
return False
|
||
return True
|
||
|
||
async def _execute_action(self, action: Dict, event_data: Dict):
|
||
"""执行工作流动作"""
|
||
action_type = action.get('type')
|
||
|
||
if action_type == 'send_email':
|
||
template_id = action.get('template_id')
|
||
# 发送邮件逻辑
|
||
pass
|
||
elif action_type == 'update_user':
|
||
# 更新用户属性
|
||
pass
|
||
elif action_type == 'webhook':
|
||
# 调用 webhook
|
||
pass
|
||
|
||
# ==================== 推荐系统 ====================
|
||
|
||
def create_referral_program(self, tenant_id: str, name: str,
                            description: str,
                            referrer_reward_type: str,
                            referrer_reward_value: float,
                            referee_reward_type: str,
                            referee_reward_value: float,
                            max_referrals_per_user: int = 10,
                            referral_code_length: int = 8,
                            expiry_days: int = 30) -> ReferralProgram:
    """Create and persist a referral program.

    Args:
        tenant_id: Owning tenant.
        name: Program name.
        description: Free-text description.
        referrer_reward_type: Kind of reward granted to the referrer.
        referrer_reward_value: Amount of the referrer's reward.
        referee_reward_type: Kind of reward granted to the referred user.
        referee_reward_value: Amount of the referee's reward.
        max_referrals_per_user: Upper bound on referrals per referrer.
        referral_code_length: Length of generated referral codes.
        expiry_days: Days until a generated code expires.

    Returns:
        The created ReferralProgram (active, already committed).
    """
    timestamp = datetime.now().isoformat()

    program = ReferralProgram(
        id=f"rp_{uuid.uuid4().hex[:16]}",
        tenant_id=tenant_id,
        name=name,
        description=description,
        referrer_reward_type=referrer_reward_type,
        referrer_reward_value=referrer_reward_value,
        referee_reward_type=referee_reward_type,
        referee_reward_value=referee_reward_value,
        max_referrals_per_user=max_referrals_per_user,
        referral_code_length=referral_code_length,
        expiry_days=expiry_days,
        is_active=True,
        created_at=timestamp,
        updated_at=timestamp
    )

    insert_sql = """
        INSERT INTO referral_programs
        (id, tenant_id, name, description, referrer_reward_type, referrer_reward_value,
         referee_reward_type, referee_reward_value, max_referrals_per_user,
         referral_code_length, expiry_days, is_active, created_at, updated_at)
        VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
    """
    values = (
        program.id,
        program.tenant_id,
        program.name,
        program.description,
        program.referrer_reward_type,
        program.referrer_reward_value,
        program.referee_reward_type,
        program.referee_reward_value,
        program.max_referrals_per_user,
        program.referral_code_length,
        program.expiry_days,
        program.is_active,
        program.created_at,
        program.updated_at,
    )

    with self._get_db() as conn:
        conn.execute(insert_sql, values)
        conn.commit()

    return program
|
||
|
||
def generate_referral_code(self, program_id: str, referrer_id: str) -> Referral:
    """Issue a new referral code for a referrer within a program.

    A pending referral is created that expires ``program.expiry_days`` days
    from now and carries the program's reward values.

    Args:
        program_id: Referral program the code belongs to.
        referrer_id: User who will share the code.

    Returns:
        The created Referral, or None when the program does not exist or the
        referrer already has ``max_referrals_per_user`` non-expired referrals
        in this program.
    """
    program = self._get_referral_program(program_id)
    if not program:
        return None

    with self._get_db() as conn:
        # Enforce the per-user limit; expired referrals do not count.
        usage = conn.execute(
            """SELECT COUNT(*) as count FROM referrals
               WHERE program_id = ? AND referrer_id = ? AND status != ?""",
            (program_id, referrer_id, ReferralStatus.EXPIRED.value)
        ).fetchone()

        if usage['count'] >= program.max_referrals_per_user:
            return None

        code = self._generate_unique_code(program.referral_code_length)
        new_id = f"ref_{uuid.uuid4().hex[:16]}"
        issued_at = datetime.now()

        referral = Referral(
            id=new_id,
            program_id=program_id,
            tenant_id=program.tenant_id,
            referrer_id=referrer_id,
            referee_id=None,
            referral_code=code,
            status=ReferralStatus.PENDING,
            referrer_rewarded=False,
            referee_rewarded=False,
            referrer_reward_value=program.referrer_reward_value,
            referee_reward_value=program.referee_reward_value,
            converted_at=None,
            rewarded_at=None,
            expires_at=issued_at + timedelta(days=program.expiry_days),
            created_at=issued_at
        )

        values = (
            referral.id,
            referral.program_id,
            referral.tenant_id,
            referral.referrer_id,
            referral.referee_id,
            referral.referral_code,
            referral.status.value,
            referral.referrer_rewarded,
            referral.referee_rewarded,
            referral.referrer_reward_value,
            referral.referee_reward_value,
            referral.converted_at,
            referral.rewarded_at,
            # Timestamps are stored as ISO-8601 text.
            referral.expires_at.isoformat(),
            referral.created_at.isoformat(),
        )
        conn.execute("""
            INSERT INTO referrals
            (id, program_id, tenant_id, referrer_id, referee_id, referral_code,
             status, referrer_rewarded, referee_rewarded, referrer_reward_value,
             referee_reward_value, converted_at, rewarded_at, expires_at, created_at)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
        """, values)
        conn.commit()

        return referral
|
||
|
||
def _generate_unique_code(self, length: int) -> str:
|
||
"""生成唯一推荐码"""
|
||
chars = "ABCDEFGHJKLMNPQRSTUVWXYZ23456789" # 排除易混淆字符
|
||
while True:
|
||
code = ''.join(random.choices(chars, k=length))
|
||
|
||
with self._get_db() as conn:
|
||
row = conn.execute(
|
||
"SELECT 1 FROM referrals WHERE referral_code = ?",
|
||
(code,)
|
||
).fetchone()
|
||
|
||
if not row:
|
||
return code
|
||
|
||
def _get_referral_program(self, program_id: str) -> Optional[ReferralProgram]:
    """Look up a referral program by id.

    Args:
        program_id: Identifier of the program.

    Returns:
        The matching ReferralProgram, or None when no row has that id.
    """
    with self._get_db() as conn:
        cursor = conn.execute(
            "SELECT * FROM referral_programs WHERE id = ?",
            (program_id,),
        )
        record = cursor.fetchone()

    if not record:
        return None
    return self._row_to_referral_program(record)
|
||
|
||
def apply_referral_code(self, referral_code: str, referee_id: str) -> bool:
    """Redeem a referral code for a newly referred user.

    The code must belong to a PENDING referral that has not expired. On
    success the referral records the referee, moves to CONVERTED and gets a
    ``converted_at`` timestamp.

    Two safeguards apply:
    * a referrer cannot redeem their own code (self-referral);
    * the UPDATE is conditional on the row still being PENDING, so two
      concurrent redemptions of the same code cannot both succeed.

    Args:
        referral_code: Code entered by the referred user.
        referee_id: Identifier of the referred user.

    Returns:
        True when the code was redeemed by this call, False otherwise.
    """
    now = datetime.now().isoformat()

    with self._get_db() as conn:
        row = conn.execute(
            """SELECT * FROM referrals
               WHERE referral_code = ? AND status = ? AND expires_at > ?""",
            (referral_code, ReferralStatus.PENDING.value, now)
        ).fetchone()

        if not row:
            return False

        # Users must not collect rewards by referring themselves.
        if row['referrer_id'] == referee_id:
            return False

        cursor = conn.execute("""
            UPDATE referrals
            SET referee_id = ?, status = ?, converted_at = ?
            WHERE id = ? AND status = ?
        """, (referee_id, ReferralStatus.CONVERTED.value, now, row['id'],
              ReferralStatus.PENDING.value))
        conn.commit()

        # rowcount is 0 when another request converted the row in between.
        return cursor.rowcount == 1
|
||
|
||
def reward_referral(self, referral_id: str) -> bool:
    """Mark the rewards of a converted referral as granted.

    Only referrals in status CONVERTED qualify. The referral moves to
    REWARDED, both reward flags are set and ``rewarded_at`` is stamped.
    The status check is part of the UPDATE itself, so concurrent calls for
    the same referral cannot both report success (no double reward).

    Args:
        referral_id: Identifier of the referral to reward.

    Returns:
        True when this call moved the referral to REWARDED; False when the
        referral does not exist or is not in status CONVERTED.
    """
    now = datetime.now().isoformat()

    with self._get_db() as conn:
        cursor = conn.execute("""
            UPDATE referrals
            SET status = ?, referrer_rewarded = 1, referee_rewarded = 1, rewarded_at = ?
            WHERE id = ? AND status = ?
        """, (ReferralStatus.REWARDED.value, now, referral_id,
              ReferralStatus.CONVERTED.value))
        conn.commit()

        # No row changed means the id is unknown or the status is not CONVERTED.
        return cursor.rowcount > 0
|
||
|
||
def get_referral_stats(self, program_id: str) -> Dict:
    """Aggregate referral counts for a program.

    Args:
        program_id: Identifier of the referral program.

    Returns:
        Dict with ``program_id``, ``total_referrals``, the per-status counts
        ``pending`` / ``converted`` / ``rewarded`` / ``expired``,
        ``unique_referrers`` and ``conversion_rate``.

        ``conversion_rate`` is the share of referrals that converted,
        rounded to 4 decimals. Rewarded referrals count as conversions too,
        because a referral only reaches REWARDED after being CONVERTED;
        otherwise the rate would fall every time a reward is paid out. It is
        0.0 for a program without referrals.
    """
    with self._get_db() as conn:
        stats = conn.execute("""
            SELECT
                COUNT(*) as total_referrals,
                SUM(CASE WHEN status = ? THEN 1 ELSE 0 END) as pending,
                SUM(CASE WHEN status = ? THEN 1 ELSE 0 END) as converted,
                SUM(CASE WHEN status = ? THEN 1 ELSE 0 END) as rewarded,
                SUM(CASE WHEN status = ? THEN 1 ELSE 0 END) as expired,
                COUNT(DISTINCT referrer_id) as unique_referrers
            FROM referrals
            WHERE program_id = ?
        """, (ReferralStatus.PENDING.value, ReferralStatus.CONVERTED.value,
              ReferralStatus.REWARDED.value, ReferralStatus.EXPIRED.value,
              program_id)).fetchone()

        # SUM() yields NULL when there are no rows, hence the "or 0".
        total = stats['total_referrals'] or 0
        converted = stats['converted'] or 0
        rewarded = stats['rewarded'] or 0

        if total:
            conversion_rate = round((converted + rewarded) / total, 4)
        else:
            conversion_rate = 0.0

        return {
            "program_id": program_id,
            "total_referrals": total,
            "pending": stats['pending'] or 0,
            "converted": converted,
            "rewarded": rewarded,
            "expired": stats['expired'] or 0,
            "unique_referrers": stats['unique_referrers'] or 0,
            "conversion_rate": conversion_rate
        }
|
||
|
||
def create_team_incentive(self, tenant_id: str, name: str,
                          description: str, target_tier: str,
                          min_team_size: int, incentive_type: str,
                          incentive_value: float,
                          valid_from: datetime,
                          valid_until: datetime) -> TeamIncentive:
    """Create and persist a team-upgrade incentive.

    The validity bounds are stored as ISO-8601 text (also on the returned
    object), and the incentive starts active.

    Args:
        tenant_id: Owning tenant.
        name: Incentive name.
        description: Free-text description.
        target_tier: Subscription tier the incentive applies to.
        min_team_size: Smallest team size that qualifies.
        incentive_type: Kind of incentive.
        incentive_value: Amount of the incentive.
        valid_from: Start of the validity window.
        valid_until: End of the validity window.

    Returns:
        The created TeamIncentive (already committed).
    """
    created_at = datetime.now().isoformat()

    incentive = TeamIncentive(
        id=f"ti_{uuid.uuid4().hex[:16]}",
        tenant_id=tenant_id,
        name=name,
        description=description,
        target_tier=target_tier,
        min_team_size=min_team_size,
        incentive_type=incentive_type,
        incentive_value=incentive_value,
        valid_from=valid_from.isoformat(),
        valid_until=valid_until.isoformat(),
        is_active=True,
        created_at=created_at
    )

    insert_sql = """
        INSERT INTO team_incentives
        (id, tenant_id, name, description, target_tier, min_team_size,
         incentive_type, incentive_value, valid_from, valid_until, is_active, created_at)
        VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
    """
    values = (
        incentive.id,
        incentive.tenant_id,
        incentive.name,
        incentive.description,
        incentive.target_tier,
        incentive.min_team_size,
        incentive.incentive_type,
        incentive.incentive_value,
        incentive.valid_from,
        incentive.valid_until,
        incentive.is_active,
        incentive.created_at,
    )

    with self._get_db() as conn:
        conn.execute(insert_sql, values)
        conn.commit()

    return incentive
|
||
|
||
def check_team_incentive_eligibility(self, tenant_id: str,
                                     current_tier: str,
                                     team_size: int) -> List[TeamIncentive]:
    """Find the team incentives a tenant currently qualifies for.

    An incentive qualifies when it is active, targets ``current_tier``,
    requires a team no larger than ``team_size``, and the current local time
    lies within its validity window (bounds inclusive).

    Args:
        tenant_id: Tenant to check.
        current_tier: Tier matched against ``target_tier``.
        team_size: Current number of team members.

    Returns:
        All matching TeamIncentive objects (possibly empty).
    """
    current_time = datetime.now().isoformat()
    query = """
        SELECT * FROM team_incentives
        WHERE tenant_id = ? AND is_active = 1
        AND target_tier = ? AND min_team_size <= ?
        AND valid_from <= ? AND valid_until >= ?
    """
    params = (tenant_id, current_tier, team_size, current_time, current_time)

    with self._get_db() as conn:
        matches = conn.execute(query, params).fetchall()
        return list(map(self._row_to_team_incentive, matches))
|
||
|
||
# ==================== 实时分析仪表板 ====================
|
||
|
||
def get_realtime_dashboard(self, tenant_id: str) -> Dict:
    """Collect the data shown on a tenant's real-time analytics dashboard.

    Args:
        tenant_id: Tenant whose events are summarised.

    Returns:
        Dict with:
        * ``tenant_id`` and ``timestamp`` (time of the snapshot);
        * ``today``: distinct active users, event count and distinct
          sessions since local midnight;
        * ``recent_events``: the 20 latest events with a masked user id
          (first 8 characters plus "...", or None for events without a
          user id);
        * ``top_features``: the 10 most used FEATURE_USE events of today;
        * ``hourly_trend``: distinct active users for each of the last 24
          one-hour windows, oldest first, labelled by the hour in which the
          window ends.
    """
    now = datetime.now()
    today_start = now.replace(hour=0, minute=0, second=0, microsecond=0)

    def mask_user_id(user_id):
        # Events may carry no user id (NOTE(review): presumably anonymous
        # events; confirm against the analytics_events schema). Slicing None
        # would raise, so such ids are passed through as None.
        if user_id is None:
            return None
        return str(user_id)[:8] + "..."

    with self._get_db() as conn:
        # Today's totals.
        today_stats = conn.execute("""
            SELECT
                COUNT(DISTINCT user_id) as active_users,
                COUNT(*) as total_events,
                COUNT(DISTINCT session_id) as sessions
            FROM analytics_events
            WHERE tenant_id = ? AND timestamp >= ?
        """, (tenant_id, today_start.isoformat())).fetchone()

        # Latest events.
        recent_events = conn.execute("""
            SELECT event_name, event_type, timestamp, user_id
            FROM analytics_events
            WHERE tenant_id = ?
            ORDER BY timestamp DESC
            LIMIT 20
        """, (tenant_id,)).fetchall()

        # Most used features today.
        top_features = conn.execute("""
            SELECT event_name, COUNT(*) as count
            FROM analytics_events
            WHERE tenant_id = ? AND timestamp >= ? AND event_type = ?
            GROUP BY event_name
            ORDER BY count DESC
            LIMIT 10
        """, (tenant_id, today_start.isoformat(), EventType.FEATURE_USE.value)).fetchall()

        # Active users per hour for the last 24 hours, oldest window first.
        hourly_trend = []
        for hours_back in range(23, -1, -1):
            hour_start = now - timedelta(hours=hours_back + 1)
            hour_end = now - timedelta(hours=hours_back)

            row = conn.execute("""
                SELECT COUNT(DISTINCT user_id) as count
                FROM analytics_events
                WHERE tenant_id = ? AND timestamp >= ? AND timestamp < ?
            """, (tenant_id, hour_start.isoformat(), hour_end.isoformat())).fetchone()

            hourly_trend.append({
                "hour": hour_end.strftime("%H:00"),
                "active_users": row['count'] or 0
            })

        return {
            "tenant_id": tenant_id,
            "timestamp": now.isoformat(),
            "today": {
                "active_users": today_stats['active_users'] or 0,
                "total_events": today_stats['total_events'] or 0,
                "sessions": today_stats['sessions'] or 0
            },
            "recent_events": [
                {
                    "event_name": r['event_name'],
                    "event_type": r['event_type'],
                    "timestamp": r['timestamp'],
                    "user_id": mask_user_id(r['user_id'])  # masked for privacy
                }
                for r in recent_events
            ],
            "top_features": [
                {"feature": r['event_name'], "usage_count": r['count']}
                for r in top_features
            ],
            "hourly_trend": hourly_trend
        }
|
||
|
||
# ==================== 辅助方法 ====================
|
||
|
||
def _row_to_user_profile(self, row) -> UserProfile:
    """Convert a database row into a UserProfile.

    Timestamp columns are parsed from ISO-8601 text into datetime objects
    and the JSON columns ``feature_usage`` / ``subscription_history`` are
    decoded; all other columns are copied as stored.
    """
    copied = ('id', 'tenant_id', 'user_id', 'total_sessions', 'total_events',
              'ltv', 'churn_risk_score', 'engagement_score')
    timestamps = ('first_seen', 'last_seen', 'created_at', 'updated_at')
    json_columns = ('feature_usage', 'subscription_history')

    fields = {column: row[column] for column in copied}
    fields.update(
        (column, datetime.fromisoformat(row[column])) for column in timestamps
    )
    fields.update((column, json.loads(row[column])) for column in json_columns)

    return UserProfile(**fields)
|
||
|
||
def _row_to_experiment(self, row) -> Experiment:
    """Convert a database row into an Experiment.

    ``status`` and ``traffic_allocation`` become their enum members, the
    JSON columns are decoded, and ``start_date`` / ``end_date`` are parsed
    into datetime objects when set (None otherwise). ``created_at`` and
    ``updated_at`` are copied as stored.
    """
    def optional_datetime(text):
        if not text:
            return None
        return datetime.fromisoformat(text)

    copied = ('id', 'tenant_id', 'name', 'description', 'hypothesis',
              'primary_metric', 'min_sample_size', 'confidence_level',
              'created_at', 'updated_at', 'created_by')
    json_columns = ('variants', 'traffic_split', 'target_audience',
                    'secondary_metrics')

    fields = {column: row[column] for column in copied}
    fields.update((column, json.loads(row[column])) for column in json_columns)
    fields['status'] = ExperimentStatus(row['status'])
    fields['traffic_allocation'] = TrafficAllocationType(row['traffic_allocation'])
    fields['start_date'] = optional_datetime(row['start_date'])
    fields['end_date'] = optional_datetime(row['end_date'])

    return Experiment(**fields)
|
||
|
||
def _row_to_email_template(self, row) -> EmailTemplate:
    """Convert a database row into an EmailTemplate.

    ``template_type`` becomes its enum member, ``variables`` is decoded from
    JSON and ``is_active`` is coerced to bool; all other columns are copied
    as stored.
    """
    copied = ('id', 'tenant_id', 'name', 'subject', 'html_content',
              'text_content', 'preview_text', 'from_name', 'from_email',
              'reply_to', 'created_at', 'updated_at')

    fields = {column: row[column] for column in copied}
    fields['template_type'] = EmailTemplateType(row['template_type'])
    fields['variables'] = json.loads(row['variables'])
    fields['is_active'] = bool(row['is_active'])

    return EmailTemplate(**fields)
|
||
|
||
def _row_to_automation_workflow(self, row) -> AutomationWorkflow:
    """Convert a database row into an AutomationWorkflow.

    ``trigger_type`` becomes its enum member, ``trigger_conditions`` and
    ``actions`` are decoded from JSON and ``is_active`` is coerced to bool;
    all other columns are copied as stored.
    """
    copied = ('id', 'tenant_id', 'name', 'description', 'execution_count',
              'created_at', 'updated_at')

    fields = {column: row[column] for column in copied}
    fields['trigger_type'] = WorkflowTriggerType(row['trigger_type'])
    fields['trigger_conditions'] = json.loads(row['trigger_conditions'])
    fields['actions'] = json.loads(row['actions'])
    fields['is_active'] = bool(row['is_active'])

    return AutomationWorkflow(**fields)
|
||
|
||
def _row_to_referral_program(self, row) -> ReferralProgram:
    """Convert a database row into a ReferralProgram.

    ``is_active`` is coerced to bool; every other column is copied as
    stored.
    """
    copied = ('id', 'tenant_id', 'name', 'description',
              'referrer_reward_type', 'referrer_reward_value',
              'referee_reward_type', 'referee_reward_value',
              'max_referrals_per_user', 'referral_code_length',
              'expiry_days', 'created_at', 'updated_at')

    fields = {column: row[column] for column in copied}
    fields['is_active'] = bool(row['is_active'])

    return ReferralProgram(**fields)
|
||
|
||
def _row_to_team_incentive(self, row) -> TeamIncentive:
    """Convert a database row into a TeamIncentive.

    ``valid_from`` / ``valid_until`` are parsed from ISO-8601 text into
    datetime objects and ``is_active`` is coerced to bool; all other columns
    are copied as stored.
    """
    copied = ('id', 'tenant_id', 'name', 'description', 'target_tier',
              'min_team_size', 'incentive_type', 'incentive_value',
              'created_at')

    fields = {column: row[column] for column in copied}
    fields['valid_from'] = datetime.fromisoformat(row['valid_from'])
    fields['valid_until'] = datetime.fromisoformat(row['valid_until'])
    fields['is_active'] = bool(row['is_active'])

    return TeamIncentive(**fields)
|
||
|
||
|
||
# Module-wide GrowthManager instance, created on first use
_growth_manager = None


def get_growth_manager() -> GrowthManager:
    """Return the shared GrowthManager, creating it on the first call."""
    global _growth_manager
    if _growth_manager is not None:
        return _growth_manager
    _growth_manager = GrowthManager()
    return _growth_manager
|