#!/usr/bin/env python3
"""
InsightFlow Multimodal Entity Linker - Phase 7

Multimodal entity linking module: cross-modal entity alignment and
knowledge fusion.
"""

import os
import json
import uuid
from collections import Counter
from dataclasses import dataclass
from difflib import SequenceMatcher
from itertools import combinations
from typing import List, Dict, Optional, Tuple, Set

# Try to import the embedding library; the module works without it.
try:
    import numpy as np
    NUMPY_AVAILABLE = True
except ImportError:
    NUMPY_AVAILABLE = False


@dataclass
class MultimodalEntity:
    """A single record tying an entity to one modality-specific source."""

    id: str
    entity_id: str
    project_id: str
    name: str
    source_type: str  # audio, video, image, document
    source_id: str
    mention_context: str
    confidence: float
    modality_features: Optional[Dict] = None  # modality-specific features

    def __post_init__(self):
        # Give every instance its own dict instead of sharing a default.
        if self.modality_features is None:
            self.modality_features = {}


@dataclass
class EntityLink:
    """A directed link between two entities found in different modalities."""

    id: str
    project_id: str
    source_entity_id: str
    target_entity_id: str
    link_type: str  # same_as, related_to, part_of
    source_modality: str
    target_modality: str
    confidence: float
    evidence: str


@dataclass
class AlignmentResult:
    """Outcome of matching one query entity against a candidate list."""

    entity_id: str
    matched_entity_id: Optional[str]
    similarity: float
    match_type: str  # exact, alias_match, fuzzy
    confidence: float


@dataclass
class FusionResult:
    """Result of fusing the knowledge of several linked entities."""

    canonical_entity_id: str
    merged_entity_ids: List[str]
    fused_properties: Dict
    source_modalities: List[str]
    confidence: float


class MultimodalEntityLinker:
    """Multimodal entity linker: cross-modal alignment and knowledge fusion."""

    # Supported link types (values are display labels).
    LINK_TYPES = {
        'same_as': '同一实体',
        'related_to': '相关实体',
        'part_of': '组成部分',
        'mentions': '提及关系'
    }

    # Supported modalities.
    MODALITIES = ['audio', 'video', 'image', 'document']

    def __init__(self, similarity_threshold: float = 0.85):
        """
        Initialise the linker.

        Args:
            similarity_threshold: Minimum similarity for two entities to be
                considered a match.
        """
        self.similarity_threshold = similarity_threshold

    @staticmethod
    def _normalize(text) -> str:
        """Return *text* lower-cased and stripped; falsy input becomes ''."""
        return text.lower().strip() if text else ''

    def calculate_string_similarity(self, s1: Optional[str],
                                    s2: Optional[str]) -> float:
        """
        Compute a case-insensitive similarity between two strings.

        Args:
            s1: First string.
            s2: Second string.

        Returns:
            Similarity score in [0, 1]: 1.0 for equal strings, 0.9 when one
            contains the other, otherwise the SequenceMatcher ratio. Empty,
            missing or whitespace-only input yields 0.0.
        """
        a, b = self._normalize(s1), self._normalize(s2)

        # Check emptiness AFTER stripping: a blank string would otherwise
        # satisfy the containment test below ('' is in every string).
        if not a or not b:
            return 0.0

        # Exact match
        if a == b:
            return 1.0

        # Containment
        if a in b or b in a:
            return 0.9

        # Edit-distance style similarity
        return SequenceMatcher(None, a, b).ratio()

    def calculate_entity_similarity(self, entity1: Dict,
                                    entity2: Dict) -> Tuple[float, str]:
        """
        Compute the combined similarity of two entities.

        Args:
            entity1: First entity; reads 'name', 'aliases', 'definition'.
            entity2: Second entity; same keys.

        Returns:
            (similarity, match type) where match type is one of 'exact',
            'alias_match', 'fuzzy' or 'none'.
        """
        # Name similarity
        name_sim = self.calculate_string_similarity(
            entity1.get('name'), entity2.get('name')
        )

        # Names match exactly
        if name_sim == 1.0:
            return 1.0, 'exact'

        # Alias check. Empty aliases/names are dropped so that two entities
        # with missing names can never "match" through an empty string.
        name1 = self._normalize(entity1.get('name'))
        name2 = self._normalize(entity2.get('name'))
        aliases1 = {self._normalize(a) for a in entity1.get('aliases') or []}
        aliases2 = {self._normalize(a) for a in entity2.get('aliases') or []}
        aliases1.discard('')
        aliases2.discard('')

        if aliases1 & aliases2 or name2 in aliases1 or name1 in aliases2:
            return 0.95, 'alias_match'

        # Definition similarity
        def_sim = self.calculate_string_similarity(
            entity1.get('definition'), entity2.get('definition')
        )

        # Weighted combination: the name dominates, the definition refines.
        combined_sim = name_sim * 0.7 + def_sim * 0.3

        if combined_sim >= self.similarity_threshold:
            return combined_sim, 'fuzzy'

        return combined_sim, 'none'

    def find_matching_entity(self, query_entity: Dict,
                             candidate_entities: List[Dict],
                             exclude_ids: Optional[Set[str]] = None
                             ) -> Optional[AlignmentResult]:
        """
        Find the best match for an entity among a list of candidates.

        Args:
            query_entity: Entity to look up.
            candidate_entities: Candidate entities.
            exclude_ids: Candidate ids to skip.

        Returns:
            The alignment with the highest similarity at or above the
            threshold (first one wins on ties), or None if nothing qualifies.
        """
        exclude_ids = exclude_ids or set()
        best_match = None
        best_similarity = 0.0
        best_match_type = 'none'

        for candidate in candidate_entities:
            if candidate.get('id') in exclude_ids:
                continue

            similarity, match_type = self.calculate_entity_similarity(
                query_entity, candidate
            )

            if (similarity > best_similarity
                    and similarity >= self.similarity_threshold):
                best_similarity = similarity
                best_match = candidate
                best_match_type = match_type

        if best_match is None:
            return None

        return AlignmentResult(
            entity_id=query_entity.get('id'),
            matched_entity_id=best_match.get('id'),
            similarity=best_similarity,
            match_type=best_match_type,
            confidence=best_similarity
        )

    def align_cross_modal_entities(self, project_id: str,
                                   audio_entities: List[Dict],
                                   video_entities: List[Dict],
                                   image_entities: List[Dict],
                                   document_entities: List[Dict]
                                   ) -> List[EntityLink]:
        """
        Align entities across modalities.

        Args:
            project_id: Project id stamped on every link.
            audio_entities: Entities from the audio modality.
            video_entities: Entities from the video modality.
            image_entities: Entities from the image modality.
            document_entities: Entities from the document modality.

        Returns:
            List of entity links, at most one per source entity and
            modality pair.
        """
        links = []

        # Entities grouped by modality
        all_entities = {
            'audio': audio_entities,
            'video': video_entities,
            'image': image_entities,
            'document': document_entities
        }

        for mod1 in self.MODALITIES:
            for mod2 in self.MODALITIES:
                # Visit each unordered modality pair once. The comparison is
                # on the modality NAME, so the alphabetically smaller one is
                # always the link source.
                if mod1 >= mod2:
                    continue

                entities1 = all_entities.get(mod1) or []
                entities2 = all_entities.get(mod2) or []

                for ent1 in entities1:
                    # Look for a match in the other modality
                    result = self.find_matching_entity(ent1, entities2)

                    if result and result.matched_entity_id:
                        links.append(EntityLink(
                            id=str(uuid.uuid4())[:8],
                            project_id=project_id,
                            source_entity_id=ent1.get('id'),
                            target_entity_id=result.matched_entity_id,
                            link_type=('same_as' if result.similarity > 0.95
                                       else 'related_to'),
                            source_modality=mod1,
                            target_modality=mod2,
                            confidence=result.confidence,
                            evidence=f"Cross-modal alignment: {result.match_type}"
                        ))

        return links

    def fuse_entity_knowledge(self, entity_id: str,
                              linked_entities: List[Dict],
                              multimodal_mentions: List[Dict]) -> FusionResult:
        """
        Fuse the knowledge of an entity across modalities.

        Args:
            entity_id: Id of the canonical (primary) entity.
            linked_entities: Entities linked to the canonical one.
            multimodal_mentions: Mentions of the entity; reads 'source_type'
                and 'mention_context'.

        Returns:
            Fusion result. The fused name is the most frequent non-empty
            name (first seen wins ties); the fused definition is the longest
            one; aliases, types and modalities keep first-seen order.
        """
        # Names go in a list so duplicates are counted; a set would make
        # every count 1 and the "most common" pick arbitrary.
        names = []
        definitions = []
        # Dicts act as insertion-ordered sets for deterministic output.
        aliases = {}
        types = {}
        modalities = {}
        contexts = []
        merged_ids = []

        for entity in linked_entities:
            merged_ids.append(entity.get('id'))

            name = entity.get('name')
            if name:
                names.append(name)

            definition = entity.get('definition')
            if definition:
                definitions.append(definition)

            for alias in entity.get('aliases') or []:
                aliases[alias] = None

            types[entity.get('type', 'OTHER')] = None

        # Collect modalities and contexts
        for mention in multimodal_mentions:
            source_type = mention.get('source_type')
            if source_type:
                modalities[source_type] = None
            context = mention.get('mention_context')
            if context:
                contexts.append(context)

        # Best definition: the longest one
        best_definition = max(definitions, key=len) if definitions else ""

        # Best name: the most frequent one
        best_name = Counter(names).most_common(1)[0][0] if names else ""

        modality_list = list(modalities)

        return FusionResult(
            canonical_entity_id=entity_id,
            merged_entity_ids=merged_ids,
            fused_properties={
                'name': best_name,
                'definition': best_definition,
                'aliases': list(aliases),
                'types': list(types),
                'modalities': modality_list,
                'contexts': contexts[:10]  # at most 10 contexts
            },
            source_modalities=list(modality_list),
            confidence=min(1.0, len(linked_entities) * 0.2 + 0.5)
        )

    def detect_entity_conflicts(self, entities: List[Dict]) -> List[Dict]:
        """
        Detect entity conflicts (same name, different meaning).

        Args:
            entities: Entities to inspect.

        Returns:
            One conflict dict per lower-cased name whose entities carry at
            least two definitions that are all pairwise below 0.5 similarity.
        """
        conflicts = []

        # Group by lower-cased name; unnamed entities are ignored.
        name_groups = {}
        for entity in entities:
            name = (entity.get('name') or '').lower()
            if name:
                name_groups.setdefault(name, []).append(entity)

        # Same name but dissimilar definitions
        for name, group in name_groups.items():
            if len(group) < 2:
                continue

            definitions = [e.get('definition') for e in group
                           if e.get('definition')]
            if len(definitions) < 2:
                continue

            # Pairwise similarity between definitions
            similarities = [
                self.calculate_string_similarity(d1, d2)
                for d1, d2 in combinations(definitions, 2)
            ]

            # Every pair being dissimilar suggests a homonym
            if all(s < 0.5 for s in similarities):
                conflicts.append({
                    'name': name,
                    'entities': group,
                    'type': 'homonym_conflict',
                    'suggestion': 'Consider disambiguating these entities'
                })

        return conflicts

    def suggest_entity_merges(self, entities: List[Dict],
                              existing_links: Optional[List[EntityLink]] = None
                              ) -> List[Dict]:
        """
        Suggest entity merges.

        Args:
            entities: Entities to compare pairwise.
            existing_links: Links that already exist; linked pairs are
                skipped.

        Returns:
            Suggestions sorted by descending similarity. Each has
            'suggested_action' of 'merge' (similarity > 0.95) or 'link'.
        """
        # frozenset keys are order-independent and tolerate None ids, which
        # sorting a mixed [None, str] pair would not.
        existing_pairs = {
            frozenset((link.source_entity_id, link.target_entity_id))
            for link in existing_links or []
        }

        suggestions = []
        for ent1, ent2 in combinations(entities, 2):
            # Skip pairs that are already linked
            if frozenset((ent1.get('id'), ent2.get('id'))) in existing_pairs:
                continue

            similarity, match_type = self.calculate_entity_similarity(
                ent1, ent2
            )

            if similarity >= self.similarity_threshold:
                suggestions.append({
                    'entity1': ent1,
                    'entity2': ent2,
                    'similarity': similarity,
                    'match_type': match_type,
                    'suggested_action': ('merge' if similarity > 0.95
                                         else 'link')
                })

        # Sort by similarity, best first
        suggestions.sort(key=lambda x: x['similarity'], reverse=True)

        return suggestions

    def create_multimodal_entity_record(self, project_id: str,
                                        entity_id: str,
                                        source_type: str,
                                        source_id: str,
                                        mention_context: str = "",
                                        confidence: float = 1.0
                                        ) -> MultimodalEntity:
        """
        Create a multimodal entity record.

        Args:
            project_id: Project id.
            entity_id: Entity id.
            source_type: Source modality.
            source_id: Source id.
            mention_context: Context of the mention.
            confidence: Confidence score.

        Returns:
            A new record with a fresh 8-character id and an empty name.
        """
        return MultimodalEntity(
            id=str(uuid.uuid4())[:8],
            entity_id=entity_id,
            project_id=project_id,
            name="",  # to be filled in later
            source_type=source_type,
            source_id=source_id,
            mention_context=mention_context,
            confidence=confidence
        )

    def analyze_modality_distribution(
            self, multimodal_entities: List[MultimodalEntity]) -> Dict:
        """
        Analyse how entity records are distributed over modalities.

        Args:
            multimodal_entities: Multimodal entity records.

        Returns:
            Dict with per-modality record counts, total record count, number
            of unique entities, number of entities seen in more than one
            modality, and the ratio of the latter to the unique entities.
        """
        distribution = {mod: 0 for mod in self.MODALITIES}
        entity_modalities = {}

        for me in multimodal_entities:
            # Records with an unknown source_type are not counted per
            # modality but still contribute to the entity's modality set.
            if me.source_type in distribution:
                distribution[me.source_type] += 1
            entity_modalities.setdefault(me.entity_id, set()).add(
                me.source_type
            )

        cross_modal_count = sum(
            1 for mods in entity_modalities.values() if len(mods) > 1
        )

        return {
            'modality_distribution': distribution,
            'total_multimodal_records': len(multimodal_entities),
            'unique_entities': len(entity_modalities),
            'cross_modal_entities': cross_modal_count,
            'cross_modal_ratio': (cross_modal_count / len(entity_modalities)
                                  if entity_modalities else 0)
        }


# Singleton instance
_multimodal_entity_linker = None


def get_multimodal_entity_linker(
        similarity_threshold: float = 0.85) -> MultimodalEntityLinker:
    """
    Return the shared MultimodalEntityLinker instance.

    The instance is created on the first call; *similarity_threshold* is
    only used then and is ignored on every later call.
    """
    global _multimodal_entity_linker
    if _multimodal_entity_linker is None:
        _multimodal_entity_linker = MultimodalEntityLinker(similarity_threshold)
    return _multimodal_entity_linker