fix: auto-fix code issues (cron)

- 修复重复导入/字段
- 修复异常处理
- 修复PEP8格式问题
- 添加类型注解
- 修复缺失的urllib.parse导入
This commit is contained in:
OpenClaw Bot
2026-02-28 06:03:09 +08:00
parent ff83cab6c7
commit fe3d64a1d2
41 changed files with 4501 additions and 1176 deletions

View File

@@ -28,6 +28,7 @@ import httpx
# Database path
DB_PATH = os.path.join(os.path.dirname(__file__), "insightflow.db")
class EventType(StrEnum):
"""事件类型"""
@@ -43,6 +44,7 @@ class EventType(StrEnum):
INVITE_ACCEPTED = "invite_accepted" # 接受邀请
REFERRAL_REWARD = "referral_reward" # 推荐奖励
class ExperimentStatus(StrEnum):
"""实验状态"""
@@ -52,6 +54,7 @@ class ExperimentStatus(StrEnum):
COMPLETED = "completed" # 已完成
ARCHIVED = "archived" # 已归档
class TrafficAllocationType(StrEnum):
"""流量分配类型"""
@@ -59,6 +62,7 @@ class TrafficAllocationType(StrEnum):
STRATIFIED = "stratified" # 分层分配
TARGETED = "targeted" # 定向分配
class EmailTemplateType(StrEnum):
"""邮件模板类型"""
@@ -70,6 +74,7 @@ class EmailTemplateType(StrEnum):
REFERRAL = "referral" # 推荐邀请
NEWSLETTER = "newsletter" # 新闻通讯
class EmailStatus(StrEnum):
"""邮件状态"""
@@ -83,6 +88,7 @@ class EmailStatus(StrEnum):
BOUNCED = "bounced" # 退信
FAILED = "failed" # 失败
class WorkflowTriggerType(StrEnum):
"""工作流触发类型"""
@@ -94,6 +100,7 @@ class WorkflowTriggerType(StrEnum):
MILESTONE = "milestone" # 里程碑
CUSTOM_EVENT = "custom_event" # 自定义事件
class ReferralStatus(StrEnum):
"""推荐状态"""
@@ -102,6 +109,7 @@ class ReferralStatus(StrEnum):
REWARDED = "rewarded" # 已奖励
EXPIRED = "expired" # 已过期
@dataclass
class AnalyticsEvent:
"""分析事件"""
@@ -120,6 +128,7 @@ class AnalyticsEvent:
utm_medium: str | None
utm_campaign: str | None
@dataclass
class UserProfile:
"""用户画像"""
@@ -139,6 +148,7 @@ class UserProfile:
created_at: datetime
updated_at: datetime
@dataclass
class Funnel:
"""转化漏斗"""
@@ -151,6 +161,7 @@ class Funnel:
created_at: datetime
updated_at: datetime
@dataclass
class FunnelAnalysis:
"""漏斗分析结果"""
@@ -163,6 +174,7 @@ class FunnelAnalysis:
overall_conversion: float # 总体转化率
drop_off_points: list[dict] # 流失点
@dataclass
class Experiment:
"""A/B 测试实验"""
@@ -187,6 +199,7 @@ class Experiment:
updated_at: datetime
created_by: str
@dataclass
class ExperimentResult:
"""实验结果"""
@@ -204,6 +217,7 @@ class ExperimentResult:
uplift: float # 提升幅度
created_at: datetime
@dataclass
class EmailTemplate:
"""邮件模板"""
@@ -224,6 +238,7 @@ class EmailTemplate:
created_at: datetime
updated_at: datetime
@dataclass
class EmailCampaign:
"""邮件营销活动"""
@@ -245,6 +260,7 @@ class EmailCampaign:
completed_at: datetime | None
created_at: datetime
@dataclass
class EmailLog:
"""邮件发送记录"""
@@ -266,6 +282,7 @@ class EmailLog:
error_message: str | None
created_at: datetime
@dataclass
class AutomationWorkflow:
"""自动化工作流"""
@@ -282,6 +299,7 @@ class AutomationWorkflow:
created_at: datetime
updated_at: datetime
@dataclass
class ReferralProgram:
"""推荐计划"""
@@ -301,6 +319,7 @@ class ReferralProgram:
created_at: datetime
updated_at: datetime
@dataclass
class Referral:
"""推荐记录"""
@@ -321,6 +340,7 @@ class Referral:
expires_at: datetime
created_at: datetime
@dataclass
class TeamIncentive:
"""团队升级激励"""
@@ -338,6 +358,7 @@ class TeamIncentive:
is_active: bool
created_at: datetime
class GrowthManager:
"""运营与增长管理主类"""
@@ -437,7 +458,10 @@ class GrowthManager:
async def _send_to_mixpanel(self, event: AnalyticsEvent):
"""发送事件到 Mixpanel"""
try:
headers = {"Content-Type": "application/json", "Authorization": f"Basic {self.mixpanel_token}"}
headers = {
"Content-Type": "application/json",
"Authorization": f"Basic {self.mixpanel_token}",
}
payload = {
"event": event.event_name,
@@ -450,7 +474,9 @@ class GrowthManager:
}
async with httpx.AsyncClient() as client:
await client.post("https://api.mixpanel.com/track", headers=headers, json=[payload], timeout=10.0)
await client.post(
"https://api.mixpanel.com/track", headers=headers, json=[payload], timeout=10.0
)
except Exception as e:
print(f"Failed to send to Mixpanel: {e}")
@@ -473,16 +499,24 @@ class GrowthManager:
}
async with httpx.AsyncClient() as client:
await client.post("https://api.amplitude.com/2/httpapi", headers=headers, json=payload, timeout=10.0)
await client.post(
"https://api.amplitude.com/2/httpapi",
headers=headers,
json=payload,
timeout=10.0,
)
except Exception as e:
print(f"Failed to send to Amplitude: {e}")
async def _update_user_profile(self, tenant_id: str, user_id: str, event_type: EventType, event_name: str):
async def _update_user_profile(
self, tenant_id: str, user_id: str, event_type: EventType, event_name: str
):
"""更新用户画像"""
with self._get_db() as conn:
# 检查用户画像是否存在
row = conn.execute(
"SELECT * FROM user_profiles WHERE tenant_id = ? AND user_id = ?", (tenant_id, user_id)
"SELECT * FROM user_profiles WHERE tenant_id = ? AND user_id = ?",
(tenant_id, user_id),
).fetchone()
now = datetime.now().isoformat()
@@ -538,7 +572,8 @@ class GrowthManager:
"""获取用户画像"""
with self._get_db() as conn:
row = conn.execute(
"SELECT * FROM user_profiles WHERE tenant_id = ? AND user_id = ?", (tenant_id, user_id)
"SELECT * FROM user_profiles WHERE tenant_id = ? AND user_id = ?",
(tenant_id, user_id),
).fetchone()
if row:
@@ -599,7 +634,9 @@ class GrowthManager:
"event_type_distribution": {r["event_type"]: r["count"] for r in type_rows},
}
def create_funnel(self, tenant_id: str, name: str, description: str, steps: list[dict], created_by: str) -> Funnel:
def create_funnel(
self, tenant_id: str, name: str, description: str, steps: list[dict], created_by: str
) -> Funnel:
"""创建转化漏斗"""
funnel_id = f"fnl_{uuid.uuid4().hex[:16]}"
now = datetime.now().isoformat()
@@ -664,7 +701,9 @@ class GrowthManager:
FROM analytics_events
WHERE event_name = ? AND timestamp >= ? AND timestamp <= ?
"""
row = conn.execute(query, (event_name, period_start.isoformat(), period_end.isoformat())).fetchone()
row = conn.execute(
query, (event_name, period_start.isoformat(), period_end.isoformat())
).fetchone()
user_count = row["user_count"] if row else 0
@@ -696,7 +735,9 @@ class GrowthManager:
overall_conversion = 0.0
# 找出主要流失点
drop_off_points = [s for s in step_conversions if s["drop_off_rate"] > 0.2 and s != step_conversions[0]]
drop_off_points = [
s for s in step_conversions if s["drop_off_rate"] > 0.2 and s != step_conversions[0]
]
return FunnelAnalysis(
funnel_id=funnel_id,
@@ -708,7 +749,9 @@ class GrowthManager:
drop_off_points=drop_off_points,
)
def calculate_retention(self, tenant_id: str, cohort_date: datetime, periods: list[int] = None) -> dict:
def calculate_retention(
self, tenant_id: str, cohort_date: datetime, periods: list[int] = None
) -> dict:
"""计算留存率"""
if periods is None:
periods = [1, 3, 7, 14, 30]
@@ -725,7 +768,8 @@ class GrowthManager:
)
"""
cohort_rows = conn.execute(
cohort_query, (tenant_id, cohort_date.isoformat(), tenant_id, cohort_date.isoformat())
cohort_query,
(tenant_id, cohort_date.isoformat(), tenant_id, cohort_date.isoformat()),
).fetchall()
cohort_users = {r["user_id"] for r in cohort_rows}
@@ -757,7 +801,11 @@ class GrowthManager:
"retention_rate": round(retention_rate, 4),
}
return {"cohort_date": cohort_date.isoformat(), "cohort_size": cohort_size, "retention": retention_rates}
return {
"cohort_date": cohort_date.isoformat(),
"cohort_size": cohort_size,
"retention": retention_rates,
}
# ==================== A/B 测试框架 ====================
@@ -842,7 +890,9 @@ class GrowthManager:
def get_experiment(self, experiment_id: str) -> Experiment | None:
"""获取实验详情"""
with self._get_db() as conn:
row = conn.execute("SELECT * FROM experiments WHERE id = ?", (experiment_id,)).fetchone()
row = conn.execute(
"SELECT * FROM experiments WHERE id = ?", (experiment_id,)
).fetchone()
if row:
return self._row_to_experiment(row)
@@ -863,7 +913,9 @@ class GrowthManager:
rows = conn.execute(query, params).fetchall()
return [self._row_to_experiment(row) for row in rows]
def assign_variant(self, experiment_id: str, user_id: str, user_attributes: dict = None) -> str | None:
def assign_variant(
self, experiment_id: str, user_id: str, user_attributes: dict = None
) -> str | None:
"""为用户分配实验变体"""
experiment = self.get_experiment(experiment_id)
if not experiment or experiment.status != ExperimentStatus.RUNNING:
@@ -884,9 +936,13 @@ class GrowthManager:
if experiment.traffic_allocation == TrafficAllocationType.RANDOM:
variant_id = self._random_allocation(experiment.variants, experiment.traffic_split)
elif experiment.traffic_allocation == TrafficAllocationType.STRATIFIED:
variant_id = self._stratified_allocation(experiment.variants, experiment.traffic_split, user_attributes)
variant_id = self._stratified_allocation(
experiment.variants, experiment.traffic_split, user_attributes
)
else: # TARGETED
variant_id = self._targeted_allocation(experiment.variants, experiment.target_audience, user_attributes)
variant_id = self._targeted_allocation(
experiment.variants, experiment.target_audience, user_attributes
)
if variant_id:
now = datetime.now().isoformat()
@@ -932,7 +988,9 @@ class GrowthManager:
return self._random_allocation(variants, traffic_split)
def _targeted_allocation(self, variants: list[dict], target_audience: dict, user_attributes: dict) -> str | None:
def _targeted_allocation(
self, variants: list[dict], target_audience: dict, user_attributes: dict
) -> str | None:
"""定向分配(基于目标受众条件)"""
# 检查用户是否符合目标受众条件
conditions = target_audience.get("conditions", [])
@@ -963,7 +1021,12 @@ class GrowthManager:
return self._random_allocation(variants, target_audience.get("traffic_split", {}))
def record_experiment_metric(
self, experiment_id: str, variant_id: str, user_id: str, metric_name: str, metric_value: float
self,
experiment_id: str,
variant_id: str,
user_id: str,
metric_name: str,
metric_value: float,
):
"""记录实验指标"""
with self._get_db() as conn:
@@ -1022,7 +1085,9 @@ class GrowthManager:
(experiment_id, variant_id, experiment.primary_metric),
).fetchone()
mean_value = metric_row["mean_value"] if metric_row and metric_row["mean_value"] else 0
mean_value = (
metric_row["mean_value"] if metric_row and metric_row["mean_value"] else 0
)
results[variant_id] = {
"variant_name": variant.get("name", variant_id),
@@ -1073,7 +1138,13 @@ class GrowthManager:
SET status = ?, start_date = ?, updated_at = ?
WHERE id = ? AND status = ?
""",
(ExperimentStatus.RUNNING.value, now, now, experiment_id, ExperimentStatus.DRAFT.value),
(
ExperimentStatus.RUNNING.value,
now,
now,
experiment_id,
ExperimentStatus.DRAFT.value,
),
)
conn.commit()
@@ -1089,7 +1160,13 @@ class GrowthManager:
SET status = ?, end_date = ?, updated_at = ?
WHERE id = ? AND status = ?
""",
(ExperimentStatus.COMPLETED.value, now, now, experiment_id, ExperimentStatus.RUNNING.value),
(
ExperimentStatus.COMPLETED.value,
now,
now,
experiment_id,
ExperimentStatus.RUNNING.value,
),
)
conn.commit()
@@ -1168,13 +1245,17 @@ class GrowthManager:
def get_email_template(self, template_id: str) -> EmailTemplate | None:
"""获取邮件模板"""
with self._get_db() as conn:
row = conn.execute("SELECT * FROM email_templates WHERE id = ?", (template_id,)).fetchone()
row = conn.execute(
"SELECT * FROM email_templates WHERE id = ?", (template_id,)
).fetchone()
if row:
return self._row_to_email_template(row)
return None
def list_email_templates(self, tenant_id: str, template_type: EmailTemplateType = None) -> list[EmailTemplate]:
def list_email_templates(
self, tenant_id: str, template_type: EmailTemplateType = None
) -> list[EmailTemplate]:
"""列出邮件模板"""
query = "SELECT * FROM email_templates WHERE tenant_id = ? AND is_active = 1"
params = [tenant_id]
@@ -1215,7 +1296,12 @@ class GrowthManager:
}
def create_email_campaign(
self, tenant_id: str, name: str, template_id: str, recipient_list: list[dict], scheduled_at: datetime = None
self,
tenant_id: str,
name: str,
template_id: str,
recipient_list: list[dict],
scheduled_at: datetime = None,
) -> EmailCampaign:
"""创建邮件营销活动"""
campaign_id = f"ec_{uuid.uuid4().hex[:16]}"
@@ -1294,7 +1380,9 @@ class GrowthManager:
return campaign
async def send_email(self, campaign_id: str, user_id: str, email: str, template_id: str, variables: dict) -> bool:
async def send_email(
self, campaign_id: str, user_id: str, email: str, template_id: str, variables: dict
) -> bool:
"""发送单封邮件"""
template = self.get_email_template(template_id)
if not template:
@@ -1363,7 +1451,9 @@ class GrowthManager:
async def send_campaign(self, campaign_id: str) -> dict:
"""发送整个营销活动"""
with self._get_db() as conn:
campaign_row = conn.execute("SELECT * FROM email_campaigns WHERE id = ?", (campaign_id,)).fetchone()
campaign_row = conn.execute(
"SELECT * FROM email_campaigns WHERE id = ?", (campaign_id,)
).fetchone()
if not campaign_row:
return {"error": "Campaign not found"}
@@ -1378,7 +1468,8 @@ class GrowthManager:
# 更新活动状态
now = datetime.now().isoformat()
conn.execute(
"UPDATE email_campaigns SET status = ?, started_at = ? WHERE id = ?", ("sending", now, campaign_id)
"UPDATE email_campaigns SET status = ?, started_at = ? WHERE id = ?",
("sending", now, campaign_id),
)
conn.commit()
@@ -1390,7 +1481,9 @@ class GrowthManager:
# 获取用户变量
variables = self._get_user_variables(log["tenant_id"], log["user_id"])
success = await self.send_email(campaign_id, log["user_id"], log["email"], log["template_id"], variables)
success = await self.send_email(
campaign_id, log["user_id"], log["email"], log["template_id"], variables
)
if success:
success_count += 1
@@ -1410,7 +1503,12 @@ class GrowthManager:
)
conn.commit()
return {"campaign_id": campaign_id, "total": len(logs), "success": success_count, "failed": failed_count}
return {
"campaign_id": campaign_id,
"total": len(logs),
"success": success_count,
"failed": failed_count,
}
def _get_user_variables(self, tenant_id: str, user_id: str) -> dict:
"""获取用户变量用于邮件模板"""
@@ -1493,7 +1591,8 @@ class GrowthManager:
# 更新执行计数
conn.execute(
"UPDATE automation_workflows SET execution_count = execution_count + 1 WHERE id = ?", (workflow_id,)
"UPDATE automation_workflows SET execution_count = execution_count + 1 WHERE id = ?",
(workflow_id,),
)
conn.commit()
@@ -1666,7 +1765,9 @@ class GrowthManager:
code = "".join(random.choices(chars, k=length))
with self._get_db() as conn:
row = conn.execute("SELECT 1 FROM referrals WHERE referral_code = ?", (code,)).fetchone()
row = conn.execute(
"SELECT 1 FROM referrals WHERE referral_code = ?", (code,)
).fetchone()
if not row:
return code
@@ -1674,7 +1775,9 @@ class GrowthManager:
def _get_referral_program(self, program_id: str) -> ReferralProgram | None:
"""获取推荐计划"""
with self._get_db() as conn:
row = conn.execute("SELECT * FROM referral_programs WHERE id = ?", (program_id,)).fetchone()
row = conn.execute(
"SELECT * FROM referral_programs WHERE id = ?", (program_id,)
).fetchone()
if row:
return self._row_to_referral_program(row)
@@ -1758,7 +1861,9 @@ class GrowthManager:
"rewarded": stats["rewarded"] or 0,
"expired": stats["expired"] or 0,
"unique_referrers": stats["unique_referrers"] or 0,
"conversion_rate": round((stats["converted"] or 0) / max(stats["total_referrals"] or 1, 1), 4),
"conversion_rate": round(
(stats["converted"] or 0) / max(stats["total_referrals"] or 1, 1), 4
),
}
def create_team_incentive(
@@ -1898,7 +2003,9 @@ class GrowthManager:
(tenant_id, hour_start.isoformat(), hour_end.isoformat()),
).fetchone()
hourly_trend.append({"hour": hour_end.strftime("%H:00"), "active_users": row["count"] or 0})
hourly_trend.append(
{"hour": hour_end.strftime("%H:00"), "active_users": row["count"] or 0}
)
return {
"tenant_id": tenant_id,
@@ -1917,7 +2024,9 @@ class GrowthManager:
}
for r in recent_events
],
"top_features": [{"feature": r["event_name"], "usage_count": r["count"]} for r in top_features],
"top_features": [
{"feature": r["event_name"], "usage_count": r["count"]} for r in top_features
],
"hourly_trend": list(reversed(hourly_trend)),
}
@@ -2038,9 +2147,11 @@ class GrowthManager:
created_at=row["created_at"],
)
# Singleton instance
_growth_manager = None
def get_growth_manager() -> GrowthManager:
global _growth_manager
if _growth_manager is None: