- 任务1: 多租户SaaS架构 (tenant_manager.py) - 任务2: 订阅与计费系统 (subscription_manager.py) - 任务3: 企业级功能 (enterprise_manager.py) - 更新 schema.sql 添加所有相关表 - 更新 main.py 添加所有API端点 - 更新 README.md 进度表
247 lines
8.3 KiB
Python
247 lines
8.3 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
InsightFlow Phase 8 Task 2 测试脚本 - 订阅与计费系统
|
|
"""
|
|
|
|
import sys
|
|
import os
|
|
import tempfile
|
|
|
|
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
|
|
|
|
from subscription_manager import (
|
|
get_subscription_manager, SubscriptionManager,
|
|
SubscriptionStatus, PaymentProvider, PaymentStatus, InvoiceStatus, RefundStatus
|
|
)
|
|
|
|
def test_subscription_manager():
|
|
"""测试订阅管理器"""
|
|
print("=" * 60)
|
|
print("InsightFlow Phase 8 Task 2 - 订阅与计费系统测试")
|
|
print("=" * 60)
|
|
|
|
# 使用临时文件数据库进行测试
|
|
db_path = tempfile.mktemp(suffix='.db')
|
|
|
|
try:
|
|
manager = SubscriptionManager(db_path=db_path)
|
|
|
|
print("\n1. 测试订阅计划管理")
|
|
print("-" * 40)
|
|
|
|
# 获取默认计划
|
|
plans = manager.list_plans()
|
|
print(f"✓ 默认计划数量: {len(plans)}")
|
|
for plan in plans:
|
|
print(f" - {plan.name} ({plan.tier}): ¥{plan.price_monthly}/月")
|
|
|
|
# 通过 tier 获取计划
|
|
free_plan = manager.get_plan_by_tier("free")
|
|
pro_plan = manager.get_plan_by_tier("pro")
|
|
enterprise_plan = manager.get_plan_by_tier("enterprise")
|
|
|
|
assert free_plan is not None, "Free 计划应该存在"
|
|
assert pro_plan is not None, "Pro 计划应该存在"
|
|
assert enterprise_plan is not None, "Enterprise 计划应该存在"
|
|
|
|
print(f"✓ Free 计划: {free_plan.name}")
|
|
print(f"✓ Pro 计划: {pro_plan.name}")
|
|
print(f"✓ Enterprise 计划: {enterprise_plan.name}")
|
|
|
|
print("\n2. 测试订阅管理")
|
|
print("-" * 40)
|
|
|
|
tenant_id = "test-tenant-001"
|
|
|
|
# 创建订阅
|
|
subscription = manager.create_subscription(
|
|
tenant_id=tenant_id,
|
|
plan_id=pro_plan.id,
|
|
payment_provider=PaymentProvider.STRIPE.value,
|
|
trial_days=14
|
|
)
|
|
|
|
print(f"✓ 创建订阅: {subscription.id}")
|
|
print(f" - 状态: {subscription.status}")
|
|
print(f" - 计划: {pro_plan.name}")
|
|
print(f" - 试用开始: {subscription.trial_start}")
|
|
print(f" - 试用结束: {subscription.trial_end}")
|
|
|
|
# 获取租户订阅
|
|
tenant_sub = manager.get_tenant_subscription(tenant_id)
|
|
assert tenant_sub is not None, "应该能获取到租户订阅"
|
|
print(f"✓ 获取租户订阅: {tenant_sub.id}")
|
|
|
|
print("\n3. 测试用量记录")
|
|
print("-" * 40)
|
|
|
|
# 记录转录用量
|
|
usage1 = manager.record_usage(
|
|
tenant_id=tenant_id,
|
|
resource_type="transcription",
|
|
quantity=120,
|
|
unit="minute",
|
|
description="会议转录"
|
|
)
|
|
print(f"✓ 记录转录用量: {usage1.quantity} {usage1.unit}, 费用: ¥{usage1.cost:.2f}")
|
|
|
|
# 记录存储用量
|
|
usage2 = manager.record_usage(
|
|
tenant_id=tenant_id,
|
|
resource_type="storage",
|
|
quantity=2.5,
|
|
unit="gb",
|
|
description="文件存储"
|
|
)
|
|
print(f"✓ 记录存储用量: {usage2.quantity} {usage2.unit}, 费用: ¥{usage2.cost:.2f}")
|
|
|
|
# 获取用量汇总
|
|
summary = manager.get_usage_summary(tenant_id)
|
|
print(f"✓ 用量汇总:")
|
|
print(f" - 总费用: ¥{summary['total_cost']:.2f}")
|
|
for resource, data in summary['breakdown'].items():
|
|
print(f" - {resource}: {data['quantity']} (¥{data['cost']:.2f})")
|
|
|
|
print("\n4. 测试支付管理")
|
|
print("-" * 40)
|
|
|
|
# 创建支付
|
|
payment = manager.create_payment(
|
|
tenant_id=tenant_id,
|
|
amount=99.0,
|
|
currency="CNY",
|
|
provider=PaymentProvider.ALIPAY.value,
|
|
payment_method="qrcode"
|
|
)
|
|
print(f"✓ 创建支付: {payment.id}")
|
|
print(f" - 金额: ¥{payment.amount}")
|
|
print(f" - 提供商: {payment.provider}")
|
|
print(f" - 状态: {payment.status}")
|
|
|
|
# 确认支付
|
|
confirmed = manager.confirm_payment(payment.id, "alipay_123456")
|
|
print(f"✓ 确认支付完成: {confirmed.status}")
|
|
|
|
# 列出支付记录
|
|
payments = manager.list_payments(tenant_id)
|
|
print(f"✓ 支付记录数量: {len(payments)}")
|
|
|
|
print("\n5. 测试发票管理")
|
|
print("-" * 40)
|
|
|
|
# 列出发票
|
|
invoices = manager.list_invoices(tenant_id)
|
|
print(f"✓ 发票数量: {len(invoices)}")
|
|
|
|
if invoices:
|
|
invoice = invoices[0]
|
|
print(f" - 发票号: {invoice.invoice_number}")
|
|
print(f" - 金额: ¥{invoice.amount_due}")
|
|
print(f" - 状态: {invoice.status}")
|
|
|
|
print("\n6. 测试退款管理")
|
|
print("-" * 40)
|
|
|
|
# 申请退款
|
|
refund = manager.request_refund(
|
|
tenant_id=tenant_id,
|
|
payment_id=payment.id,
|
|
amount=50.0,
|
|
reason="服务不满意",
|
|
requested_by="user_001"
|
|
)
|
|
print(f"✓ 申请退款: {refund.id}")
|
|
print(f" - 金额: ¥{refund.amount}")
|
|
print(f" - 原因: {refund.reason}")
|
|
print(f" - 状态: {refund.status}")
|
|
|
|
# 批准退款
|
|
approved = manager.approve_refund(refund.id, "admin_001")
|
|
print(f"✓ 批准退款: {approved.status}")
|
|
|
|
# 完成退款
|
|
completed = manager.complete_refund(refund.id, "refund_123456")
|
|
print(f"✓ 完成退款: {completed.status}")
|
|
|
|
# 列出退款记录
|
|
refunds = manager.list_refunds(tenant_id)
|
|
print(f"✓ 退款记录数量: {len(refunds)}")
|
|
|
|
print("\n7. 测试账单历史")
|
|
print("-" * 40)
|
|
|
|
history = manager.get_billing_history(tenant_id)
|
|
print(f"✓ 账单历史记录数量: {len(history)}")
|
|
for h in history:
|
|
print(f" - [{h.type}] {h.description}: ¥{h.amount}")
|
|
|
|
print("\n8. 测试支付提供商集成")
|
|
print("-" * 40)
|
|
|
|
# Stripe Checkout
|
|
stripe_session = manager.create_stripe_checkout_session(
|
|
tenant_id=tenant_id,
|
|
plan_id=enterprise_plan.id,
|
|
success_url="https://example.com/success",
|
|
cancel_url="https://example.com/cancel"
|
|
)
|
|
print(f"✓ Stripe Checkout 会话: {stripe_session['session_id']}")
|
|
|
|
# 支付宝订单
|
|
alipay_order = manager.create_alipay_order(
|
|
tenant_id=tenant_id,
|
|
plan_id=pro_plan.id
|
|
)
|
|
print(f"✓ 支付宝订单: {alipay_order['order_id']}")
|
|
|
|
# 微信支付订单
|
|
wechat_order = manager.create_wechat_order(
|
|
tenant_id=tenant_id,
|
|
plan_id=pro_plan.id
|
|
)
|
|
print(f"✓ 微信支付订单: {wechat_order['order_id']}")
|
|
|
|
# Webhook 处理
|
|
webhook_result = manager.handle_webhook("stripe", {
|
|
"event_type": "checkout.session.completed",
|
|
"data": {"object": {"id": "cs_test"}}
|
|
})
|
|
print(f"✓ Webhook 处理: {webhook_result}")
|
|
|
|
print("\n9. 测试订阅变更")
|
|
print("-" * 40)
|
|
|
|
# 更改计划
|
|
changed = manager.change_plan(
|
|
subscription_id=subscription.id,
|
|
new_plan_id=enterprise_plan.id
|
|
)
|
|
print(f"✓ 更改计划: {changed.plan_id} (Enterprise)")
|
|
|
|
# 取消订阅
|
|
cancelled = manager.cancel_subscription(
|
|
subscription_id=subscription.id,
|
|
at_period_end=True
|
|
)
|
|
print(f"✓ 取消订阅: {cancelled.status}")
|
|
print(f" - 周期结束时取消: {cancelled.cancel_at_period_end}")
|
|
|
|
print("\n" + "=" * 60)
|
|
print("所有测试通过! ✓")
|
|
print("=" * 60)
|
|
|
|
finally:
|
|
# 清理临时数据库
|
|
if os.path.exists(db_path):
|
|
os.remove(db_path)
|
|
print(f"\n清理临时数据库: {db_path}")
|
|
|
|
if __name__ == "__main__":
|
|
try:
|
|
test_subscription_manager()
|
|
except Exception as e:
|
|
print(f"\n❌ 测试失败: {e}")
|
|
import traceback
|
|
traceback.print_exc()
|
|
sys.exit(1)
|