Files
insightflow/backend/test_phase8_task1.py
OpenClaw Bot 646b64daf7 fix: auto-fix code issues (cron)
- 修复重复导入/字段
- 修复异常处理 (BaseException -> 具体异常类型)
- 修复PEP8格式问题
- 添加类型注解
- 修复tingwu_client.py缩进错误
2026-02-27 15:20:03 +08:00

330 lines
9.8 KiB
Python

#!/usr/bin/env python3
"""
InsightFlow Phase 8 Task 1 - 多租户 SaaS 架构测试脚本
测试内容:
1. 租户创建和管理
2. 自定义域名绑定和验证
3. 品牌白标配置
4. 成员邀请和权限管理
5. 资源使用统计
"""
from tenant_manager import (
get_tenant_manager
)
import sys
import os
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
def test_tenant_management():
"""测试租户管理功能"""
print("=" * 60)
print("测试 1: 租户管理")
print("=" * 60)
manager = get_tenant_manager()
# 1. 创建租户
print("\n1.1 创建租户...")
tenant = manager.create_tenant(
name="Test Company",
owner_id="user_001",
tier="pro",
description="A test company tenant"
)
print(f"✅ 租户创建成功: {tenant.id}")
print(f" - 名称: {tenant.name}")
print(f" - Slug: {tenant.slug}")
print(f" - 层级: {tenant.tier}")
print(f" - 状态: {tenant.status}")
print(f" - 资源限制: {tenant.resource_limits}")
# 2. 获取租户
print("\n1.2 获取租户信息...")
fetched = manager.get_tenant(tenant.id)
assert fetched is not None, "获取租户失败"
print(f"✅ 获取租户成功: {fetched.name}")
# 3. 通过 slug 获取
print("\n1.3 通过 slug 获取租户...")
by_slug = manager.get_tenant_by_slug(tenant.slug)
assert by_slug is not None, "通过 slug 获取失败"
print(f"✅ 通过 slug 获取成功: {by_slug.name}")
# 4. 更新租户
print("\n1.4 更新租户信息...")
updated = manager.update_tenant(
tenant_id=tenant.id,
name="Test Company Updated",
tier="enterprise"
)
assert updated is not None, "更新租户失败"
print(f"✅ 租户更新成功: {updated.name}, 层级: {updated.tier}")
# 5. 列出租户
print("\n1.5 列出租户...")
tenants = manager.list_tenants(limit=10)
print(f"✅ 找到 {len(tenants)} 个租户")
return tenant.id
def test_domain_management(tenant_id: str):
"""测试域名管理功能"""
print("\n" + "=" * 60)
print("测试 2: 域名管理")
print("=" * 60)
manager = get_tenant_manager()
# 1. 添加域名
print("\n2.1 添加自定义域名...")
domain = manager.add_domain(
tenant_id=tenant_id,
domain="test.example.com",
is_primary=True
)
print(f"✅ 域名添加成功: {domain.domain}")
print(f" - ID: {domain.id}")
print(f" - 状态: {domain.status}")
print(f" - 验证令牌: {domain.verification_token}")
# 2. 获取验证指导
print("\n2.2 获取域名验证指导...")
instructions = manager.get_domain_verification_instructions(domain.id)
print("✅ 验证指导:")
print(f" - DNS 记录: {instructions['dns_record']}")
print(f" - 文件验证: {instructions['file_verification']}")
# 3. 验证域名
print("\n2.3 验证域名...")
verified = manager.verify_domain(tenant_id, domain.id)
print(f"✅ 域名验证结果: {verified}")
# 4. 通过域名获取租户
print("\n2.4 通过域名获取租户...")
by_domain = manager.get_tenant_by_domain("test.example.com")
if by_domain:
print(f"✅ 通过域名获取租户成功: {by_domain.name}")
else:
print("⚠️ 通过域名获取租户失败(验证可能未通过)")
# 5. 列出域名
print("\n2.5 列出所有域名...")
domains = manager.list_domains(tenant_id)
print(f"✅ 找到 {len(domains)} 个域名")
for d in domains:
print(f" - {d.domain} ({d.status})")
return domain.id
def test_branding_management(tenant_id: str):
"""测试品牌白标功能"""
print("\n" + "=" * 60)
print("测试 3: 品牌白标")
print("=" * 60)
manager = get_tenant_manager()
# 1. 更新品牌配置
print("\n3.1 更新品牌配置...")
branding = manager.update_branding(
tenant_id=tenant_id,
logo_url="https://example.com/logo.png",
favicon_url="https://example.com/favicon.ico",
primary_color="#1890ff",
secondary_color="#52c41a",
custom_css=".header { background: #1890ff; }",
custom_js="console.log('Custom JS loaded');",
login_page_bg="https://example.com/bg.jpg"
)
print("✅ 品牌配置更新成功")
print(f" - Logo: {branding.logo_url}")
print(f" - 主色: {branding.primary_color}")
print(f" - 次色: {branding.secondary_color}")
# 2. 获取品牌配置
print("\n3.2 获取品牌配置...")
fetched = manager.get_branding(tenant_id)
assert fetched is not None, "获取品牌配置失败"
print("✅ 获取品牌配置成功")
# 3. 生成品牌 CSS
print("\n3.3 生成品牌 CSS...")
css = manager.get_branding_css(tenant_id)
print(f"✅ 生成 CSS 成功 ({len(css)} 字符)")
print(f" CSS 预览:\n{css[:200]}...")
return branding.id
def test_member_management(tenant_id: str):
"""测试成员管理功能"""
print("\n" + "=" * 60)
print("测试 4: 成员管理")
print("=" * 60)
manager = get_tenant_manager()
# 1. 邀请成员
print("\n4.1 邀请成员...")
member1 = manager.invite_member(
tenant_id=tenant_id,
email="admin@test.com",
role="admin",
invited_by="user_001"
)
print(f"✅ 成员邀请成功: {member1.email}")
print(f" - ID: {member1.id}")
print(f" - 角色: {member1.role}")
print(f" - 权限: {member1.permissions}")
member2 = manager.invite_member(
tenant_id=tenant_id,
email="member@test.com",
role="member",
invited_by="user_001"
)
print(f"✅ 成员邀请成功: {member2.email}")
# 2. 接受邀请
print("\n4.2 接受邀请...")
accepted = manager.accept_invitation(member1.id, "user_002")
print(f"✅ 邀请接受结果: {accepted}")
# 3. 列出成员
print("\n4.3 列出所有成员...")
members = manager.list_members(tenant_id)
print(f"✅ 找到 {len(members)} 个成员")
for m in members:
print(f" - {m.email} ({m.role}) - {m.status}")
# 4. 检查权限
print("\n4.4 检查权限...")
can_manage = manager.check_permission(tenant_id, "user_002", "project", "create")
print(f"✅ user_002 可以创建项目: {can_manage}")
# 5. 更新成员角色
print("\n4.5 更新成员角色...")
updated = manager.update_member_role(tenant_id, member2.id, "viewer")
print(f"✅ 角色更新结果: {updated}")
# 6. 获取用户所属租户
print("\n4.6 获取用户所属租户...")
user_tenants = manager.get_user_tenants("user_002")
print(f"✅ user_002 属于 {len(user_tenants)} 个租户")
for t in user_tenants:
print(f" - {t['name']} ({t['member_role']})")
return member1.id, member2.id
def test_usage_tracking(tenant_id: str):
"""测试资源使用统计功能"""
print("\n" + "=" * 60)
print("测试 5: 资源使用统计")
print("=" * 60)
manager = get_tenant_manager()
# 1. 记录使用
print("\n5.1 记录资源使用...")
manager.record_usage(
tenant_id=tenant_id,
storage_bytes=1024 * 1024 * 50, # 50MB
transcription_seconds=600, # 10分钟
api_calls=100,
projects_count=5,
entities_count=50,
members_count=3
)
print("✅ 资源使用记录成功")
# 2. 获取使用统计
print("\n5.2 获取使用统计...")
stats = manager.get_usage_stats(tenant_id)
print("✅ 使用统计:")
print(f" - 存储: {stats['storage_mb']:.2f} MB")
print(f" - 转录: {stats['transcription_minutes']:.2f} 分钟")
print(f" - API 调用: {stats['api_calls']}")
print(f" - 项目数: {stats['projects_count']}")
print(f" - 实体数: {stats['entities_count']}")
print(f" - 成员数: {stats['members_count']}")
print(f" - 使用百分比: {stats['usage_percentages']}")
# 3. 检查资源限制
print("\n5.3 检查资源限制...")
for resource in ["storage", "transcription", "api_calls", "projects", "entities", "members"]:
allowed, current, limit = manager.check_resource_limit(tenant_id, resource)
print(f" - {resource}: {current}/{limit} ({'' if allowed else ''})")
return stats
def cleanup(tenant_id: str, domain_id: str, member_ids: list):
"""清理测试数据"""
print("\n" + "=" * 60)
print("清理测试数据")
print("=" * 60)
manager = get_tenant_manager()
# 移除成员
for member_id in member_ids:
if member_id:
manager.remove_member(tenant_id, member_id)
print(f"✅ 成员已移除: {member_id}")
# 移除域名
if domain_id:
manager.remove_domain(tenant_id, domain_id)
print(f"✅ 域名已移除: {domain_id}")
# 删除租户
manager.delete_tenant(tenant_id)
print(f"✅ 租户已删除: {tenant_id}")
def main():
"""主测试函数"""
print("\n" + "=" * 60)
print("InsightFlow Phase 8 Task 1 - 多租户 SaaS 架构测试")
print("=" * 60)
tenant_id = None
domain_id = None
member_ids = []
try:
# 运行所有测试
tenant_id = test_tenant_management()
domain_id = test_domain_management(tenant_id)
test_branding_management(tenant_id)
m1, m2 = test_member_management(tenant_id)
member_ids = [m1, m2]
test_usage_tracking(tenant_id)
print("\n" + "=" * 60)
print("✅ 所有测试通过!")
print("=" * 60)
except Exception as e:
print(f"\n❌ 测试失败: {e}")
import traceback
traceback.print_exc()
finally:
# 清理
if tenant_id:
try:
cleanup(tenant_id, domain_id, member_ids)
except Exception as e:
print(f"⚠️ 清理失败: {e}")
if __name__ == "__main__":
main()