Files
insightflow/backend/multimodal_entity_linker.py
AutoFix Bot 9fd1da8fb7 fix: auto-fix code issues (cron)
- 修复重复导入/字段
- 修复异常处理
- 修复PEP8格式问题
- 添加类型注解
2026-03-03 06:03:38 +08:00

529 lines
16 KiB
Python

#!/usr/bin/env python3
"""
InsightFlow Multimodal Entity Linker - Phase 7
多模态实体关联模块:跨模态实体对齐和知识融合
"""
import uuid
from collections import Counter
from dataclasses import dataclass
from difflib import SequenceMatcher
from itertools import combinations
# Constants
UUID_LENGTH = 8  # number of leading characters kept from a generated UUID string

# Probe for the optional NumPy dependency. The import has to live inside the
# ``try`` body: without it the ``except ImportError`` branch is unreachable and
# the flag would report True even when NumPy is not installed.
try:
    import numpy  # noqa: F401  (imported only to test availability)

    NUMPY_AVAILABLE = True
except ImportError:
    NUMPY_AVAILABLE = False
@dataclass
class MultimodalEntity:
    """Multimodal entity: an entity record tied to one source of a given modality."""

    id: str
    entity_id: str
    project_id: str
    name: str
    source_type: str  # audio, video, image, document
    source_id: str
    mention_context: str
    confidence: float
    modality_features: dict = None  # modality-specific features; None becomes {}

    def __post_init__(self) -> None:
        """Give each instance its own empty feature dict when none was supplied."""
        if self.modality_features is not None:
            return
        self.modality_features = {}
@dataclass
class EntityLink:
    """Entity link: a typed, scored relation between two entities."""

    id: str
    project_id: str
    source_entity_id: str
    target_entity_id: str
    link_type: str  # same_as, related_to, part_of
    source_modality: str  # modality of the source entity
    target_modality: str  # modality of the target entity
    confidence: float
    evidence: str  # free-text note describing why the link exists
@dataclass
class AlignmentResult:
"""对齐结果"""
entity_id: str
matched_entity_id: str | None
similarity: float
match_type: str # exact, fuzzy, embedding
confidence: float
@dataclass
class FusionResult:
    """Knowledge fusion result: merged view of several linked entities."""

    canonical_entity_id: str
    merged_entity_ids: list[str]  # ids of the entities folded into the result
    fused_properties: dict  # merged attribute values
    source_modalities: list[str]  # modalities that contributed mentions
    confidence: float
class MultimodalEntityLinker:
    """Multimodal entity linker: cross-modal entity alignment and knowledge fusion.

    Entities are handled as plain dicts. The methods read the keys ``id``,
    ``name``, ``aliases``, ``definition`` and ``type``; a missing key and a
    key holding ``None`` are both treated as "no value".
    """

    # Link types mapped to their display labels.
    LINK_TYPES = {
        "same_as": "同一实体",
        "related_to": "相关实体",
        "part_of": "组成部分",
        "mentions": "提及关系",
    }

    # Modality types
    MODALITIES = ["audio", "video", "image", "document"]

    def __init__(self, similarity_threshold: float = 0.85) -> None:
        """Initialize the linker.

        Args:
            similarity_threshold: Minimum similarity for two entities to be
                reported as a match.
        """
        self.similarity_threshold = similarity_threshold

    def calculate_string_similarity(self, s1: str, s2: str) -> float:
        """Compute a case-insensitive similarity score for two strings.

        Args:
            s1: First string.
            s2: Second string.

        Returns:
            0.0 when either string is empty or blank, 1.0 when the normalized
            strings are equal, 0.9 when one contains the other, otherwise the
            ``difflib.SequenceMatcher`` ratio.
        """
        if not s1 or not s2:
            return 0.0
        s1, s2 = s1.lower().strip(), s2.lower().strip()
        # A whitespace-only input normalizes to "", which is a substring of
        # every string; reject it before the containment test below.
        if not s1 or not s2:
            return 0.0
        if s1 == s2:
            return 1.0
        if s1 in s2 or s2 in s1:
            return 0.9
        return SequenceMatcher(None, s1, s2).ratio()

    @staticmethod
    def _normalized_aliases(entity: dict) -> set[str]:
        """Return the entity's non-empty aliases, lower-cased."""
        return {alias.lower() for alias in entity.get("aliases") or [] if alias}

    def calculate_entity_similarity(self, entity1: dict, entity2: dict) -> tuple[float, str]:
        """Compute the combined similarity of two entities.

        Args:
            entity1: First entity.
            entity2: Second entity.

        Returns:
            ``(similarity, match_type)`` where ``match_type`` is ``"exact"``,
            ``"alias_match"``, ``"fuzzy"`` or ``"none"``.
        """
        name1 = entity1.get("name") or ""
        name2 = entity2.get("name") or ""
        name_sim = self.calculate_string_similarity(name1, name2)
        if name_sim == 1.0:
            return 1.0, "exact"

        # Alias evidence: a shared alias, or one entity's name listed among
        # the other's aliases. Empty aliases are dropped by the helper, so an
        # empty name can never produce an alias match.
        aliases1 = self._normalized_aliases(entity1)
        aliases2 = self._normalized_aliases(entity2)
        if aliases1 & aliases2 or name2.lower() in aliases1 or name1.lower() in aliases2:
            return 0.95, "alias_match"

        def_sim = self.calculate_string_similarity(
            entity1.get("definition", ""),
            entity2.get("definition", ""),
        )
        # Weighted blend: the name dominates, the definition refines.
        combined_sim = name_sim * 0.7 + def_sim * 0.3
        if combined_sim >= self.similarity_threshold:
            return combined_sim, "fuzzy"
        return combined_sim, "none"

    def find_matching_entity(
        self,
        query_entity: dict,
        candidate_entities: list[dict],
        exclude_ids: set[str] | None = None,
    ) -> AlignmentResult | None:
        """Find the best match for an entity among candidates.

        Args:
            query_entity: Entity to match.
            candidate_entities: Candidate entities.
            exclude_ids: Candidate ids to skip.

        Returns:
            The alignment with the highest similarity at or above the
            threshold (the first one wins on ties), or ``None``.
        """
        excluded = exclude_ids or set()
        best_match = None
        best_match_type = "none"
        best_similarity = 0.0
        for candidate in candidate_entities or []:
            if candidate.get("id") in excluded:
                continue
            similarity, match_type = self.calculate_entity_similarity(query_entity, candidate)
            if similarity >= self.similarity_threshold and similarity > best_similarity:
                best_similarity = similarity
                best_match = candidate
                best_match_type = match_type

        if best_match is None:
            return None
        return AlignmentResult(
            entity_id=query_entity.get("id"),
            matched_entity_id=best_match.get("id"),
            similarity=best_similarity,
            match_type=best_match_type,
            confidence=best_similarity,
        )

    def align_cross_modal_entities(
        self,
        project_id: str,
        audio_entities: list[dict],
        video_entities: list[dict],
        image_entities: list[dict],
        document_entities: list[dict],
    ) -> list[EntityLink]:
        """Align entities across modalities.

        Args:
            project_id: Project id stored on every link.
            audio_entities: Entities from the audio modality.
            video_entities: Entities from the video modality.
            image_entities: Entities from the image modality.
            document_entities: Entities from the document modality.

        Returns:
            One link per source entity that has a match in another modality.
        """
        entities_by_modality = {
            "audio": audio_entities,
            "video": video_entities,
            "image": image_entities,
            "document": document_entities,
        }
        links = []
        for mod1 in self.MODALITIES:
            for mod2 in self.MODALITIES:
                # Comparing the modality names lexicographically visits each
                # unordered pair of modalities exactly once.
                if mod1 >= mod2:
                    continue
                sources = entities_by_modality.get(mod1) or []
                targets = entities_by_modality.get(mod2) or []
                for source in sources:
                    result = self.find_matching_entity(source, targets)
                    if result is None or not result.matched_entity_id:
                        continue
                    # Alias matches score exactly 0.95, so with the strict
                    # comparison only exact-name matches become "same_as".
                    link_type = "same_as" if result.similarity > 0.95 else "related_to"
                    links.append(
                        EntityLink(
                            id=str(uuid.uuid4())[:UUID_LENGTH],
                            project_id=project_id,
                            source_entity_id=source.get("id"),
                            target_entity_id=result.matched_entity_id,
                            link_type=link_type,
                            source_modality=mod1,
                            target_modality=mod2,
                            confidence=result.confidence,
                            evidence=f"Cross-modal alignment: {result.match_type}",
                        ),
                    )
        return links

    @staticmethod
    def _most_common_name(names: list[str]) -> str:
        """Return the most frequent name ("" if empty); ties go to the first seen."""
        if not names:
            return ""
        return Counter(names).most_common(1)[0][0]

    def fuse_entity_knowledge(
        self,
        entity_id: str,
        linked_entities: list[dict],
        multimodal_mentions: list[dict],
    ) -> FusionResult:
        """Fuse the knowledge of linked entities into one result.

        Args:
            entity_id: Id of the canonical entity.
            linked_entities: Linked entity dicts to merge.
            multimodal_mentions: Mention dicts; ``source_type`` and
                ``mention_context`` are read.

        Returns:
            Fusion result holding the most frequent name, the longest
            definition, de-duplicated aliases/types/modalities in first-seen
            order and at most 10 contexts.
        """
        # Names stay in a list so that frequency is actually countable; the
        # dicts below act as insertion-ordered sets, which keeps the output
        # order deterministic.
        names: list[str] = []
        definitions: list[str] = []
        aliases: dict = {}
        types: dict = {}
        merged_ids = []
        for entity in linked_entities:
            merged_ids.append(entity.get("id"))
            if entity.get("name"):
                names.append(entity["name"])
            if entity.get("definition"):
                definitions.append(entity["definition"])
            aliases.update(dict.fromkeys(entity.get("aliases") or []))
            types[entity.get("type", "OTHER")] = None

        modalities: dict = {}
        contexts = []
        for mention in multimodal_mentions:
            if mention.get("source_type"):
                modalities[mention["source_type"]] = None
            if mention.get("mention_context"):
                contexts.append(mention["mention_context"])

        # Best definition: the longest one (first wins on ties).
        best_definition = max(definitions, key=len, default="")
        best_name = self._most_common_name(names)

        return FusionResult(
            canonical_entity_id=entity_id,
            merged_entity_ids=merged_ids,
            fused_properties={
                "name": best_name,
                "definition": best_definition,
                "aliases": list(aliases),
                "types": list(types),
                "modalities": list(modalities),
                "contexts": contexts[:10],  # keep at most 10 contexts
            },
            source_modalities=list(modalities),
            confidence=min(1.0, len(linked_entities) * 0.2 + 0.5),
        )

    def detect_entity_conflicts(self, entities: list[dict]) -> list[dict]:
        """Detect entities that share a name but have dissimilar definitions.

        Args:
            entities: Entity dicts.

        Returns:
            One ``homonym_conflict`` record per case-insensitive name whose
            definitions are all pairwise less than 0.5 similar.
        """
        # Group by lower-cased name; unnamed entities are ignored.
        groups: dict[str, list[dict]] = {}
        for entity in entities:
            key = (entity.get("name") or "").lower()
            if key:
                groups.setdefault(key, []).append(entity)

        conflicts = []
        for name, group in groups.items():
            if len(group) < 2:
                continue
            definitions = [e["definition"] for e in group if e.get("definition")]
            if len(definitions) < 2:
                continue
            # Only flag the group when every pair of definitions disagrees.
            if all(
                self.calculate_string_similarity(d1, d2) < 0.5
                for d1, d2 in combinations(definitions, 2)
            ):
                conflicts.append(
                    {
                        "name": name,
                        "entities": group,
                        "type": "homonym_conflict",
                        "suggestion": "Consider disambiguating these entities",
                    },
                )
        return conflicts

    def suggest_entity_merges(
        self,
        entities: list[dict],
        existing_links: list[EntityLink] | None = None,
    ) -> list[dict]:
        """Suggest entity pairs to merge or link.

        Args:
            entities: Entity dicts.
            existing_links: Links whose entity pairs are skipped.

        Returns:
            Suggestions sorted by descending similarity. The action is
            ``"merge"`` above 0.95 similarity and ``"link"`` otherwise.
        """
        # frozenset gives an order-independent pair key without comparing the
        # ids, so a missing (None) id next to a string id cannot raise.
        linked_pairs = {
            frozenset((link.source_entity_id, link.target_entity_id))
            for link in existing_links or []
        }

        suggestions = []
        for ent1, ent2 in combinations(entities, 2):
            if frozenset((ent1.get("id"), ent2.get("id"))) in linked_pairs:
                continue
            similarity, match_type = self.calculate_entity_similarity(ent1, ent2)
            if similarity < self.similarity_threshold:
                continue
            suggestions.append(
                {
                    "entity1": ent1,
                    "entity2": ent2,
                    "similarity": similarity,
                    "match_type": match_type,
                    "suggested_action": "merge" if similarity > 0.95 else "link",
                },
            )

        suggestions.sort(key=lambda item: item["similarity"], reverse=True)
        return suggestions

    def create_multimodal_entity_record(
        self,
        project_id: str,
        entity_id: str,
        source_type: str,
        source_id: str,
        mention_context: str = "",
        confidence: float = 1.0,
    ) -> MultimodalEntity:
        """Create a multimodal entity record with a freshly generated id.

        Args:
            project_id: Project id.
            entity_id: Entity id.
            source_type: Source type.
            source_id: Source id.
            mention_context: Mention context.
            confidence: Confidence score.

        Returns:
            The new record; its ``name`` is left empty.
        """
        return MultimodalEntity(
            id=str(uuid.uuid4())[:UUID_LENGTH],
            entity_id=entity_id,
            project_id=project_id,
            name="",  # to be filled in later
            source_type=source_type,
            source_id=source_id,
            mention_context=mention_context,
            confidence=confidence,
        )

    def analyze_modality_distribution(self, multimodal_entities: list[MultimodalEntity]) -> dict:
        """Summarize how records and entities are spread across modalities.

        Args:
            multimodal_entities: Multimodal entity records.

        Returns:
            Dict with per-modality record counts, the total record count, the
            number of distinct entities, the number of entities seen in more
            than one source type, and that number as a ratio of all entities.
        """
        distribution = dict.fromkeys(self.MODALITIES, 0)
        modalities_by_entity: dict[str, set[str]] = {}
        for record in multimodal_entities:
            # Only known modalities are counted per modality; every source
            # type still contributes to the per-entity sets.
            if record.source_type in distribution:
                distribution[record.source_type] += 1
            modalities_by_entity.setdefault(record.entity_id, set()).add(record.source_type)

        unique_count = len(modalities_by_entity)
        cross_modal_count = sum(1 for mods in modalities_by_entity.values() if len(mods) > 1)
        return {
            "modality_distribution": distribution,
            "total_multimodal_records": len(multimodal_entities),
            "unique_entities": unique_count,
            "cross_modal_entities": cross_modal_count,
            "cross_modal_ratio": cross_modal_count / unique_count if unique_count else 0,
        }
# Module-wide linker instance, created on first request.
_multimodal_entity_linker = None


def get_multimodal_entity_linker(similarity_threshold: float = 0.85) -> MultimodalEntityLinker:
    """Return the shared multimodal entity linker, creating it on first use.

    Args:
        similarity_threshold: Threshold passed to the linker when it is first
            created. Later calls return the existing instance unchanged, so
            the argument has no effect on them.

    Returns:
        The module-level ``MultimodalEntityLinker`` instance.
    """
    global _multimodal_entity_linker
    if _multimodal_entity_linker is not None:
        return _multimodal_entity_linker
    _multimodal_entity_linker = MultimodalEntityLinker(similarity_threshold)
    return _multimodal_entity_linker