#!/usr/bin/env python3 """ InsightFlow Phase 7 Task 6 & 8 测试脚本 测试高级搜索与发现、性能优化与扩展功能 """ import os import sys import time from performance_manager import CacheManager, PerformanceMonitor, TaskQueue, get_performance_manager from search_manager import ( EntityPathDiscovery, FullTextSearch, KnowledgeGapDetection, SemanticSearch, get_search_manager, ) # 添加 backend 到路径 sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) def test_fulltext_search(): """测试全文搜索""" print("\n" + "=" * 60) print("测试全文搜索 (FullTextSearch)") print("=" * 60) search = FullTextSearch() # 测试索引创建 print("\n1. 测试索引创建...") success = search.index_content( content_id="test_entity_1", content_type="entity", project_id="test_project", text="这是一个测试实体,用于验证全文搜索功能。支持关键词高亮显示。" ) print(f" 索引创建: {'✓ 成功' if success else '✗ 失败'}") # 测试搜索 print("\n2. 测试关键词搜索...") results = search.search("测试", project_id="test_project") print(f" 搜索结果数量: {len(results)}") if results: print(f" 第一个结果: {results[0].content[:50]}...") print(f" 相关分数: {results[0].score}") # 测试布尔搜索 print("\n3. 测试布尔搜索...") results = search.search("测试 AND 全文", project_id="test_project") print(f" AND 搜索结果: {len(results)}") results = search.search("测试 OR 关键词", project_id="test_project") print(f" OR 搜索结果: {len(results)}") # 测试高亮 print("\n4. 测试文本高亮...") highlighted = search.highlight_text( "这是一个测试实体,用于验证全文搜索功能。", "测试 全文" ) print(f" 高亮结果: {highlighted}") print("\n✓ 全文搜索测试完成") return True def test_semantic_search(): """测试语义搜索""" print("\n" + "=" * 60) print("测试语义搜索 (SemanticSearch)") print("=" * 60) semantic = SemanticSearch() # 检查可用性 print(f"\n1. 语义搜索可用性: {'✓ 可用' if semantic.is_available() else '✗ 不可用'}") if not semantic.is_available(): print(" (需要安装 sentence-transformers 库)") return True # 测试 embedding 生成 print("\n2. 测试 embedding 生成...") embedding = semantic.generate_embedding("这是一个测试句子") if embedding: print(f" Embedding 维度: {len(embedding)}") print(f" 前5个值: {embedding[:5]}") # 测试索引 print("\n3. 测试语义索引...") success = semantic.index_embedding( content_id="test_content_1", content_type="transcript", project_id="test_project", text="这是用于语义搜索测试的文本内容。" ) print(f" 索引创建: {'✓ 成功' if success else '✗ 失败'}") print("\n✓ 语义搜索测试完成") return True def test_entity_path_discovery(): """测试实体路径发现""" print("\n" + "=" * 60) print("测试实体路径发现 (EntityPathDiscovery)") print("=" * 60) discovery = EntityPathDiscovery() print("\n1. 测试路径发现初始化...") print(f" 数据库路径: {discovery.db_path}") print("\n2. 测试多跳关系发现...") # 注意:这需要在数据库中有实际数据 print(" (需要实际实体数据才能测试)") print("\n✓ 实体路径发现测试完成") return True def test_knowledge_gap_detection(): """测试知识缺口识别""" print("\n" + "=" * 60) print("测试知识缺口识别 (KnowledgeGapDetection)") print("=" * 60) detection = KnowledgeGapDetection() print("\n1. 测试缺口检测初始化...") print(f" 数据库路径: {detection.db_path}") print("\n2. 测试完整性报告生成...") # 注意:这需要在数据库中有实际项目数据 print(" (需要实际项目数据才能测试)") print("\n✓ 知识缺口识别测试完成") return True def test_cache_manager(): """测试缓存管理器""" print("\n" + "=" * 60) print("测试缓存管理器 (CacheManager)") print("=" * 60) cache = CacheManager() print(f"\n1. 缓存后端: {'Redis' if cache.use_redis else '内存 LRU'}") print("\n2. 测试缓存操作...") # 设置缓存 cache.set("test_key_1", {"name": "测试数据", "value": 123}, ttl=60) print(" ✓ 设置缓存 test_key_1") # 获取缓存 _ = cache.get("test_key_1") print(" ✓ 获取缓存: {value}") # 批量操作 cache.set_many({ "batch_key_1": "value1", "batch_key_2": "value2", "batch_key_3": "value3" }, ttl=60) print(" ✓ 批量设置缓存") _ = cache.get_many(["batch_key_1", "batch_key_2", "batch_key_3"]) print(" ✓ 批量获取缓存: {len(values)} 个") # 删除缓存 cache.delete("test_key_1") print(" ✓ 删除缓存 test_key_1") # 获取统计 stats = cache.get_stats() print("\n3. 缓存统计:") print(f" 总请求数: {stats['total_requests']}") print(f" 命中数: {stats['hits']}") print(f" 未命中数: {stats['misses']}") print(f" 命中率: {stats['hit_rate']:.2%}") if not cache.use_redis: print(f" 内存使用: {stats.get('memory_size_bytes', 0)} bytes") print(f" 缓存条目数: {stats.get('cache_entries', 0)}") print("\n✓ 缓存管理器测试完成") return True def test_task_queue(): """测试任务队列""" print("\n" + "=" * 60) print("测试任务队列 (TaskQueue)") print("=" * 60) queue = TaskQueue() print(f"\n1. 任务队列可用性: {'✓ 可用' if queue.is_available() else '✗ 不可用'}") print(f" 后端: {'Celery' if queue.use_celery else '内存'}") print("\n2. 测试任务提交...") # 定义测试任务处理器 def test_task_handler(payload): print(f" 执行任务: {payload}") return {"status": "success", "processed": True} queue.register_handler("test_task", test_task_handler) # 提交任务 task_id = queue.submit( task_type="test_task", payload={"test": "data", "timestamp": time.time()} ) print(" ✓ 提交任务: {task_id}") # 获取任务状态 task_info = queue.get_status(task_id) if task_info: print(" ✓ 任务状态: {task_info.status}") # 获取统计 stats = queue.get_stats() print("\n3. 任务队列统计:") print(f" 后端: {stats['backend']}") print(f" 按状态统计: {stats.get('by_status', {})}") print("\n✓ 任务队列测试完成") return True def test_performance_monitor(): """测试性能监控""" print("\n" + "=" * 60) print("测试性能监控 (PerformanceMonitor)") print("=" * 60) monitor = PerformanceMonitor() print("\n1. 测试指标记录...") # 记录一些测试指标 for i in range(5): monitor.record_metric( metric_type="api_response", duration_ms=50 + i * 10, endpoint="/api/v1/test", metadata={"test": True} ) for i in range(3): monitor.record_metric( metric_type="db_query", duration_ms=20 + i * 5, endpoint="SELECT test", metadata={"test": True} ) print(" ✓ 记录了 8 个测试指标") # 获取统计 print("\n2. 获取性能统计...") stats = monitor.get_stats(hours=1) print(f" 总请求数: {stats['overall']['total_requests']}") print(f" 平均响应时间: {stats['overall']['avg_duration_ms']} ms") print(f" 最大响应时间: {stats['overall']['max_duration_ms']} ms") print("\n3. 按类型统计:") for type_stat in stats.get('by_type', []): print(f" {type_stat['type']}: {type_stat['count']} 次, " f"平均 {type_stat['avg_duration_ms']} ms") print("\n✓ 性能监控测试完成") return True def test_search_manager(): """测试搜索管理器""" print("\n" + "=" * 60) print("测试搜索管理器 (SearchManager)") print("=" * 60) manager = get_search_manager() print("\n1. 搜索管理器初始化...") print(" ✓ 搜索管理器已初始化") print("\n2. 获取搜索统计...") stats = manager.get_search_stats() print(f" 全文索引数: {stats['fulltext_indexed']}") print(f" 语义索引数: {stats['semantic_indexed']}") print(f" 语义搜索可用: {stats['semantic_search_available']}") print("\n✓ 搜索管理器测试完成") return True def test_performance_manager(): """测试性能管理器""" print("\n" + "=" * 60) print("测试性能管理器 (PerformanceManager)") print("=" * 60) manager = get_performance_manager() print("\n1. 性能管理器初始化...") print(" ✓ 性能管理器已初始化") print("\n2. 获取系统健康状态...") health = manager.get_health_status() print(f" 缓存后端: {health['cache']['backend']}") print(f" 任务队列后端: {health['task_queue']['backend']}") print("\n3. 获取完整统计...") stats = manager.get_full_stats() print(f" 缓存统计: {stats['cache']['total_requests']} 请求") print(f" 任务队列统计: {stats['task_queue']}") print("\n✓ 性能管理器测试完成") return True def run_all_tests(): """运行所有测试""" print("\n" + "=" * 60) print("InsightFlow Phase 7 Task 6 & 8 测试") print("高级搜索与发现 + 性能优化与扩展") print("=" * 60) results = [] # 搜索模块测试 try: results.append(("全文搜索", test_fulltext_search())) except Exception as e: print(f"\n✗ 全文搜索测试失败: {e}") results.append(("全文搜索", False)) try: results.append(("语义搜索", test_semantic_search())) except Exception as e: print(f"\n✗ 语义搜索测试失败: {e}") results.append(("语义搜索", False)) try: results.append(("实体路径发现", test_entity_path_discovery())) except Exception as e: print(f"\n✗ 实体路径发现测试失败: {e}") results.append(("实体路径发现", False)) try: results.append(("知识缺口识别", test_knowledge_gap_detection())) except Exception as e: print(f"\n✗ 知识缺口识别测试失败: {e}") results.append(("知识缺口识别", False)) try: results.append(("搜索管理器", test_search_manager())) except Exception as e: print(f"\n✗ 搜索管理器测试失败: {e}") results.append(("搜索管理器", False)) # 性能模块测试 try: results.append(("缓存管理器", test_cache_manager())) except Exception as e: print(f"\n✗ 缓存管理器测试失败: {e}") results.append(("缓存管理器", False)) try: results.append(("任务队列", test_task_queue())) except Exception as e: print(f"\n✗ 任务队列测试失败: {e}") results.append(("任务队列", False)) try: results.append(("性能监控", test_performance_monitor())) except Exception as e: print(f"\n✗ 性能监控测试失败: {e}") results.append(("性能监控", False)) try: results.append(("性能管理器", test_performance_manager())) except Exception as e: print(f"\n✗ 性能管理器测试失败: {e}") results.append(("性能管理器", False)) # 打印测试汇总 print("\n" + "=" * 60) print("测试汇总") print("=" * 60) passed = sum(1 for _, result in results if result) total = len(results) for name, result in results: status = "✓ 通过" if result else "✗ 失败" print(f" {status} - {name}") print(f"\n总计: {passed}/{total} 测试通过") if passed == total: print("\n🎉 所有测试通过!") else: print(f"\n⚠️ 有 {total - passed} 个测试失败") return passed == total if __name__ == "__main__": success = run_all_tests() sys.exit(0 if success else 1)