#!/usr/bin/env python3 """ InsightFlow Phase 8 Task 2 测试脚本 - 订阅与计费系统 """ from subscription_manager import ( SubscriptionManager, PaymentProvider ) import sys import os import tempfile sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) def test_subscription_manager(): """测试订阅管理器""" print("=" * 60) print("InsightFlow Phase 8 Task 2 - 订阅与计费系统测试") print("=" * 60) # 使用临时文件数据库进行测试 db_path = tempfile.mktemp(suffix='.db') try: manager = SubscriptionManager(db_path=db_path) print("\n1. 测试订阅计划管理") print("-" * 40) # 获取默认计划 plans = manager.list_plans() print(f"✓ 默认计划数量: {len(plans)}") for plan in plans: print(f" - {plan.name} ({plan.tier}): ¥{plan.price_monthly}/月") # 通过 tier 获取计划 free_plan = manager.get_plan_by_tier("free") pro_plan = manager.get_plan_by_tier("pro") enterprise_plan = manager.get_plan_by_tier("enterprise") assert free_plan is not None, "Free 计划应该存在" assert pro_plan is not None, "Pro 计划应该存在" assert enterprise_plan is not None, "Enterprise 计划应该存在" print(f"✓ Free 计划: {free_plan.name}") print(f"✓ Pro 计划: {pro_plan.name}") print(f"✓ Enterprise 计划: {enterprise_plan.name}") print("\n2. 测试订阅管理") print("-" * 40) tenant_id = "test-tenant-001" # 创建订阅 subscription = manager.create_subscription( tenant_id=tenant_id, plan_id=pro_plan.id, payment_provider=PaymentProvider.STRIPE.value, trial_days=14 ) print(f"✓ 创建订阅: {subscription.id}") print(f" - 状态: {subscription.status}") print(f" - 计划: {pro_plan.name}") print(f" - 试用开始: {subscription.trial_start}") print(f" - 试用结束: {subscription.trial_end}") # 获取租户订阅 tenant_sub = manager.get_tenant_subscription(tenant_id) assert tenant_sub is not None, "应该能获取到租户订阅" print(f"✓ 获取租户订阅: {tenant_sub.id}") print("\n3. 测试用量记录") print("-" * 40) # 记录转录用量 usage1 = manager.record_usage( tenant_id=tenant_id, resource_type="transcription", quantity=120, unit="minute", description="会议转录" ) print(f"✓ 记录转录用量: {usage1.quantity} {usage1.unit}, 费用: ¥{usage1.cost:.2f}") # 记录存储用量 usage2 = manager.record_usage( tenant_id=tenant_id, resource_type="storage", quantity=2.5, unit="gb", description="文件存储" ) print(f"✓ 记录存储用量: {usage2.quantity} {usage2.unit}, 费用: ¥{usage2.cost:.2f}") # 获取用量汇总 summary = manager.get_usage_summary(tenant_id) print(f"✓ 用量汇总:") print(f" - 总费用: ¥{summary['total_cost']:.2f}") for resource, data in summary['breakdown'].items(): print(f" - {resource}: {data['quantity']} (¥{data['cost']:.2f})") print("\n4. 测试支付管理") print("-" * 40) # 创建支付 payment = manager.create_payment( tenant_id=tenant_id, amount=99.0, currency="CNY", provider=PaymentProvider.ALIPAY.value, payment_method="qrcode" ) print(f"✓ 创建支付: {payment.id}") print(f" - 金额: ¥{payment.amount}") print(f" - 提供商: {payment.provider}") print(f" - 状态: {payment.status}") # 确认支付 confirmed = manager.confirm_payment(payment.id, "alipay_123456") print(f"✓ 确认支付完成: {confirmed.status}") # 列出支付记录 payments = manager.list_payments(tenant_id) print(f"✓ 支付记录数量: {len(payments)}") print("\n5. 测试发票管理") print("-" * 40) # 列出发票 invoices = manager.list_invoices(tenant_id) print(f"✓ 发票数量: {len(invoices)}") if invoices: invoice = invoices[0] print(f" - 发票号: {invoice.invoice_number}") print(f" - 金额: ¥{invoice.amount_due}") print(f" - 状态: {invoice.status}") print("\n6. 测试退款管理") print("-" * 40) # 申请退款 refund = manager.request_refund( tenant_id=tenant_id, payment_id=payment.id, amount=50.0, reason="服务不满意", requested_by="user_001" ) print(f"✓ 申请退款: {refund.id}") print(f" - 金额: ¥{refund.amount}") print(f" - 原因: {refund.reason}") print(f" - 状态: {refund.status}") # 批准退款 approved = manager.approve_refund(refund.id, "admin_001") print(f"✓ 批准退款: {approved.status}") # 完成退款 completed = manager.complete_refund(refund.id, "refund_123456") print(f"✓ 完成退款: {completed.status}") # 列出退款记录 refunds = manager.list_refunds(tenant_id) print(f"✓ 退款记录数量: {len(refunds)}") print("\n7. 测试账单历史") print("-" * 40) history = manager.get_billing_history(tenant_id) print(f"✓ 账单历史记录数量: {len(history)}") for h in history: print(f" - [{h.type}] {h.description}: ¥{h.amount}") print("\n8. 测试支付提供商集成") print("-" * 40) # Stripe Checkout stripe_session = manager.create_stripe_checkout_session( tenant_id=tenant_id, plan_id=enterprise_plan.id, success_url="https://example.com/success", cancel_url="https://example.com/cancel" ) print(f"✓ Stripe Checkout 会话: {stripe_session['session_id']}") # 支付宝订单 alipay_order = manager.create_alipay_order( tenant_id=tenant_id, plan_id=pro_plan.id ) print(f"✓ 支付宝订单: {alipay_order['order_id']}") # 微信支付订单 wechat_order = manager.create_wechat_order( tenant_id=tenant_id, plan_id=pro_plan.id ) print(f"✓ 微信支付订单: {wechat_order['order_id']}") # Webhook 处理 webhook_result = manager.handle_webhook("stripe", { "event_type": "checkout.session.completed", "data": {"object": {"id": "cs_test"}} }) print(f"✓ Webhook 处理: {webhook_result}") print("\n9. 测试订阅变更") print("-" * 40) # 更改计划 changed = manager.change_plan( subscription_id=subscription.id, new_plan_id=enterprise_plan.id ) print(f"✓ 更改计划: {changed.plan_id} (Enterprise)") # 取消订阅 cancelled = manager.cancel_subscription( subscription_id=subscription.id, at_period_end=True ) print(f"✓ 取消订阅: {cancelled.status}") print(f" - 周期结束时取消: {cancelled.cancel_at_period_end}") print("\n" + "=" * 60) print("所有测试通过! ✓") print("=" * 60) finally: # 清理临时数据库 if os.path.exists(db_path): os.remove(db_path) print(f"\n清理临时数据库: {db_path}") if __name__ == "__main__": try: test_subscription_manager() except Exception as e: print(f"\n❌ 测试失败: {e}") import traceback traceback.print_exc() sys.exit(1)