Compare commits

..

2 Commits

Author SHA1 Message Date
OpenClaw Bot
243f41de8f Merge remote changes 2026-02-24 00:13:24 +08:00
OpenClaw Bot
c557cc52c4 Phase 7 Task 4: 协作与共享模块
- 创建 collaboration_manager.py 协作管理模块
  - CollaborationManager: 协作管理主类
  - 项目分享链接管理 - 支持只读/评论/编辑/管理员权限
  - 评论和批注系统 - 支持实体、关系、转录文本评论
  - 变更历史追踪 - 记录所有数据操作变更
  - 团队成员管理 - 支持多角色权限控制

- 更新 schema.sql 添加协作相关数据库表
  - project_shares: 项目分享表
  - comments: 评论表
  - change_history: 变更历史表
  - team_members: 团队成员表

- 更新 main.py 添加协作相关 API 端点
  - 项目分享相关端点
  - 评论和批注相关端点
  - 变更历史相关端点
  - 团队成员管理端点

- 更新 README.md 和 STATUS.md
2026-02-24 00:13:09 +08:00
7 changed files with 914 additions and 0 deletions

View File

@@ -0,0 +1,914 @@
"""
InsightFlow - 协作与共享模块 (Phase 7 Task 4)
支持项目分享、评论批注、变更历史、团队空间
"""
import os
import json
import uuid
import hashlib
from datetime import datetime, timedelta
from typing import List, Optional, Dict, Any
from dataclasses import dataclass, asdict
from enum import Enum
class SharePermission(Enum):
"""分享权限级别"""
READ_ONLY = "read_only" # 只读
COMMENT = "comment" # 可评论
EDIT = "edit" # 可编辑
ADMIN = "admin" # 管理员
class CommentTargetType(Enum):
"""评论目标类型"""
ENTITY = "entity" # 实体评论
RELATION = "relation" # 关系评论
TRANSCRIPT = "transcript" # 转录文本评论
PROJECT = "project" # 项目级评论
class ChangeType(Enum):
"""变更类型"""
CREATE = "create" # 创建
UPDATE = "update" # 更新
DELETE = "delete" # 删除
MERGE = "merge" # 合并
SPLIT = "split" # 拆分
@dataclass
class ProjectShare:
"""项目分享链接"""
id: str
project_id: str
token: str # 分享令牌
permission: str # 权限级别
created_by: str # 创建者
created_at: str
expires_at: Optional[str] # 过期时间
max_uses: Optional[int] # 最大使用次数
use_count: int # 已使用次数
password_hash: Optional[str] # 密码保护
is_active: bool # 是否激活
allow_download: bool # 允许下载
allow_export: bool # 允许导出
@dataclass
class Comment:
"""评论/批注"""
id: str
project_id: str
target_type: str # 评论目标类型
target_id: str # 目标ID
parent_id: Optional[str] # 父评论ID支持回复
author: str # 作者
author_name: str # 作者显示名
content: str # 评论内容
created_at: str
updated_at: str
resolved: bool # 是否已解决
resolved_by: Optional[str] # 解决者
resolved_at: Optional[str] # 解决时间
mentions: List[str] # 提及的用户
attachments: List[Dict] # 附件
@dataclass
class ChangeRecord:
"""变更记录"""
id: str
project_id: str
change_type: str # 变更类型
entity_type: str # 实体类型 (entity/relation/transcript/project)
entity_id: str # 实体ID
entity_name: str # 实体名称(用于显示)
changed_by: str # 变更者
changed_by_name: str # 变更者显示名
changed_at: str
old_value: Optional[Dict] # 旧值
new_value: Optional[Dict] # 新值
description: str # 变更描述
session_id: Optional[str] # 会话ID批量变更关联
reverted: bool # 是否已回滚
reverted_at: Optional[str] # 回滚时间
reverted_by: Optional[str] # 回滚者
@dataclass
class TeamMember:
"""团队成员"""
id: str
project_id: str
user_id: str # 用户ID
user_name: str # 用户名
user_email: str # 用户邮箱
role: str # 角色 (owner/admin/editor/viewer)
joined_at: str
invited_by: str # 邀请者
last_active_at: Optional[str] # 最后活跃时间
permissions: List[str] # 具体权限列表
@dataclass
class TeamSpace:
"""团队空间"""
id: str
name: str
description: str
created_by: str
created_at: str
updated_at: str
member_count: int
project_count: int
settings: Dict[str, Any] # 团队设置
class CollaborationManager:
"""协作管理主类"""
def __init__(self, db_manager=None):
self.db = db_manager
self._shares_cache: Dict[str, ProjectShare] = {}
self._comments_cache: Dict[str, List[Comment]] = {}
# ============ 项目分享 ============
def create_share_link(
self,
project_id: str,
created_by: str,
permission: str = "read_only",
expires_in_days: Optional[int] = None,
max_uses: Optional[int] = None,
password: Optional[str] = None,
allow_download: bool = False,
allow_export: bool = False
) -> ProjectShare:
"""创建项目分享链接"""
share_id = str(uuid.uuid4())
token = self._generate_share_token(project_id)
now = datetime.now().isoformat()
expires_at = None
if expires_in_days:
expires_at = (datetime.now() + timedelta(days=expires_in_days)).isoformat()
password_hash = None
if password:
password_hash = hashlib.sha256(password.encode()).hexdigest()
share = ProjectShare(
id=share_id,
project_id=project_id,
token=token,
permission=permission,
created_by=created_by,
created_at=now,
expires_at=expires_at,
max_uses=max_uses,
use_count=0,
password_hash=password_hash,
is_active=True,
allow_download=allow_download,
allow_export=allow_export
)
# 保存到数据库
if self.db:
self._save_share_to_db(share)
self._shares_cache[token] = share
return share
def _generate_share_token(self, project_id: str) -> str:
"""生成分享令牌"""
data = f"{project_id}:{datetime.now().timestamp()}:{uuid.uuid4()}"
return hashlib.sha256(data.encode()).hexdigest()[:32]
def _save_share_to_db(self, share: ProjectShare):
"""保存分享记录到数据库"""
cursor = self.db.conn.cursor()
cursor.execute("""
INSERT INTO project_shares
(id, project_id, token, permission, created_by, created_at,
expires_at, max_uses, use_count, password_hash, is_active,
allow_download, allow_export)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""", (
share.id, share.project_id, share.token, share.permission,
share.created_by, share.created_at, share.expires_at,
share.max_uses, share.use_count, share.password_hash,
share.is_active, share.allow_download, share.allow_export
))
self.db.conn.commit()
def validate_share_token(
self,
token: str,
password: Optional[str] = None
) -> Optional[ProjectShare]:
"""验证分享令牌"""
# 从缓存或数据库获取
share = self._shares_cache.get(token)
if not share and self.db:
share = self._get_share_from_db(token)
if not share:
return None
# 检查是否激活
if not share.is_active:
return None
# 检查是否过期
if share.expires_at and datetime.now().isoformat() > share.expires_at:
return None
# 检查使用次数
if share.max_uses and share.use_count >= share.max_uses:
return None
# 验证密码
if share.password_hash:
if not password:
return None
password_hash = hashlib.sha256(password.encode()).hexdigest()
if password_hash != share.password_hash:
return None
return share
def _get_share_from_db(self, token: str) -> Optional[ProjectShare]:
"""从数据库获取分享记录"""
cursor = self.db.conn.cursor()
cursor.execute("""
SELECT * FROM project_shares WHERE token = ?
""", (token,))
row = cursor.fetchone()
if not row:
return None
return ProjectShare(
id=row[0],
project_id=row[1],
token=row[2],
permission=row[3],
created_by=row[4],
created_at=row[5],
expires_at=row[6],
max_uses=row[7],
use_count=row[8],
password_hash=row[9],
is_active=bool(row[10]),
allow_download=bool(row[11]),
allow_export=bool(row[12])
)
def increment_share_usage(self, token: str):
"""增加分享链接使用次数"""
share = self._shares_cache.get(token)
if share:
share.use_count += 1
if self.db:
cursor = self.db.conn.cursor()
cursor.execute("""
UPDATE project_shares
SET use_count = use_count + 1
WHERE token = ?
""", (token,))
self.db.conn.commit()
def revoke_share_link(self, share_id: str, revoked_by: str) -> bool:
"""撤销分享链接"""
if self.db:
cursor = self.db.conn.cursor()
cursor.execute("""
UPDATE project_shares
SET is_active = 0
WHERE id = ?
""", (share_id,))
self.db.conn.commit()
return cursor.rowcount > 0
return False
def list_project_shares(self, project_id: str) -> List[ProjectShare]:
"""列出项目的所有分享链接"""
if not self.db:
return []
cursor = self.db.conn.cursor()
cursor.execute("""
SELECT * FROM project_shares
WHERE project_id = ?
ORDER BY created_at DESC
""", (project_id,))
shares = []
for row in cursor.fetchall():
shares.append(ProjectShare(
id=row[0],
project_id=row[1],
token=row[2],
permission=row[3],
created_by=row[4],
created_at=row[5],
expires_at=row[6],
max_uses=row[7],
use_count=row[8],
password_hash=row[9],
is_active=bool(row[10]),
allow_download=bool(row[11]),
allow_export=bool(row[12])
))
return shares
# ============ 评论和批注 ============
def add_comment(
self,
project_id: str,
target_type: str,
target_id: str,
author: str,
author_name: str,
content: str,
parent_id: Optional[str] = None,
mentions: Optional[List[str]] = None,
attachments: Optional[List[Dict]] = None
) -> Comment:
"""添加评论"""
comment_id = str(uuid.uuid4())
now = datetime.now().isoformat()
comment = Comment(
id=comment_id,
project_id=project_id,
target_type=target_type,
target_id=target_id,
parent_id=parent_id,
author=author,
author_name=author_name,
content=content,
created_at=now,
updated_at=now,
resolved=False,
resolved_by=None,
resolved_at=None,
mentions=mentions or [],
attachments=attachments or []
)
if self.db:
self._save_comment_to_db(comment)
# 更新缓存
key = f"{target_type}:{target_id}"
if key not in self._comments_cache:
self._comments_cache[key] = []
self._comments_cache[key].append(comment)
return comment
def _save_comment_to_db(self, comment: Comment):
"""保存评论到数据库"""
cursor = self.db.conn.cursor()
cursor.execute("""
INSERT INTO comments
(id, project_id, target_type, target_id, parent_id, author, author_name,
content, created_at, updated_at, resolved, resolved_by, resolved_at,
mentions, attachments)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""", (
comment.id, comment.project_id, comment.target_type, comment.target_id,
comment.parent_id, comment.author, comment.author_name, comment.content,
comment.created_at, comment.updated_at, comment.resolved,
comment.resolved_by, comment.resolved_at,
json.dumps(comment.mentions), json.dumps(comment.attachments)
))
self.db.conn.commit()
def get_comments(
self,
target_type: str,
target_id: str,
include_resolved: bool = True
) -> List[Comment]:
"""获取评论列表"""
if not self.db:
return []
cursor = self.db.conn.cursor()
if include_resolved:
cursor.execute("""
SELECT * FROM comments
WHERE target_type = ? AND target_id = ?
ORDER BY created_at ASC
""", (target_type, target_id))
else:
cursor.execute("""
SELECT * FROM comments
WHERE target_type = ? AND target_id = ? AND resolved = 0
ORDER BY created_at ASC
""", (target_type, target_id))
comments = []
for row in cursor.fetchall():
comments.append(self._row_to_comment(row))
return comments
def _row_to_comment(self, row) -> Comment:
"""将数据库行转换为Comment对象"""
return Comment(
id=row[0],
project_id=row[1],
target_type=row[2],
target_id=row[3],
parent_id=row[4],
author=row[5],
author_name=row[6],
content=row[7],
created_at=row[8],
updated_at=row[9],
resolved=bool(row[10]),
resolved_by=row[11],
resolved_at=row[12],
mentions=json.loads(row[13]) if row[13] else [],
attachments=json.loads(row[14]) if row[14] else []
)
def update_comment(
self,
comment_id: str,
content: str,
updated_by: str
) -> Optional[Comment]:
"""更新评论"""
if not self.db:
return None
now = datetime.now().isoformat()
cursor = self.db.conn.cursor()
cursor.execute("""
UPDATE comments
SET content = ?, updated_at = ?
WHERE id = ? AND author = ?
""", (content, now, comment_id, updated_by))
self.db.conn.commit()
if cursor.rowcount > 0:
return self._get_comment_by_id(comment_id)
return None
def _get_comment_by_id(self, comment_id: str) -> Optional[Comment]:
"""根据ID获取评论"""
cursor = self.db.conn.cursor()
cursor.execute("SELECT * FROM comments WHERE id = ?", (comment_id,))
row = cursor.fetchone()
if row:
return self._row_to_comment(row)
return None
def resolve_comment(
self,
comment_id: str,
resolved_by: str
) -> bool:
"""标记评论为已解决"""
if not self.db:
return False
now = datetime.now().isoformat()
cursor = self.db.conn.cursor()
cursor.execute("""
UPDATE comments
SET resolved = 1, resolved_by = ?, resolved_at = ?
WHERE id = ?
""", (resolved_by, now, comment_id))
self.db.conn.commit()
return cursor.rowcount > 0
def delete_comment(self, comment_id: str, deleted_by: str) -> bool:
"""删除评论"""
if not self.db:
return False
cursor = self.db.conn.cursor()
# 只允许作者或管理员删除
cursor.execute("""
DELETE FROM comments
WHERE id = ? AND (author = ? OR ? IN (
SELECT created_by FROM projects WHERE id = comments.project_id
))
""", (comment_id, deleted_by, deleted_by))
self.db.conn.commit()
return cursor.rowcount > 0
def get_project_comments(
self,
project_id: str,
limit: int = 50,
offset: int = 0
) -> List[Comment]:
"""获取项目下的所有评论"""
if not self.db:
return []
cursor = self.db.conn.cursor()
cursor.execute("""
SELECT * FROM comments
WHERE project_id = ?
ORDER BY created_at DESC
LIMIT ? OFFSET ?
""", (project_id, limit, offset))
comments = []
for row in cursor.fetchall():
comments.append(self._row_to_comment(row))
return comments
# ============ 变更历史 ============
def record_change(
self,
project_id: str,
change_type: str,
entity_type: str,
entity_id: str,
entity_name: str,
changed_by: str,
changed_by_name: str,
old_value: Optional[Dict] = None,
new_value: Optional[Dict] = None,
description: str = "",
session_id: Optional[str] = None
) -> ChangeRecord:
"""记录变更"""
record_id = str(uuid.uuid4())
now = datetime.now().isoformat()
record = ChangeRecord(
id=record_id,
project_id=project_id,
change_type=change_type,
entity_type=entity_type,
entity_id=entity_id,
entity_name=entity_name,
changed_by=changed_by,
changed_by_name=changed_by_name,
changed_at=now,
old_value=old_value,
new_value=new_value,
description=description,
session_id=session_id,
reverted=False,
reverted_at=None,
reverted_by=None
)
if self.db:
self._save_change_to_db(record)
return record
def _save_change_to_db(self, record: ChangeRecord):
"""保存变更记录到数据库"""
cursor = self.db.conn.cursor()
cursor.execute("""
INSERT INTO change_history
(id, project_id, change_type, entity_type, entity_id, entity_name,
changed_by, changed_by_name, changed_at, old_value, new_value,
description, session_id, reverted, reverted_at, reverted_by)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""", (
record.id, record.project_id, record.change_type, record.entity_type,
record.entity_id, record.entity_name, record.changed_by, record.changed_by_name,
record.changed_at, json.dumps(record.old_value) if record.old_value else None,
json.dumps(record.new_value) if record.new_value else None,
record.description, record.session_id, record.reverted,
record.reverted_at, record.reverted_by
))
self.db.conn.commit()
def get_change_history(
self,
project_id: str,
entity_type: Optional[str] = None,
entity_id: Optional[str] = None,
limit: int = 50,
offset: int = 0
) -> List[ChangeRecord]:
"""获取变更历史"""
if not self.db:
return []
cursor = self.db.conn.cursor()
if entity_type and entity_id:
cursor.execute("""
SELECT * FROM change_history
WHERE project_id = ? AND entity_type = ? AND entity_id = ?
ORDER BY changed_at DESC
LIMIT ? OFFSET ?
""", (project_id, entity_type, entity_id, limit, offset))
elif entity_type:
cursor.execute("""
SELECT * FROM change_history
WHERE project_id = ? AND entity_type = ?
ORDER BY changed_at DESC
LIMIT ? OFFSET ?
""", (project_id, entity_type, limit, offset))
else:
cursor.execute("""
SELECT * FROM change_history
WHERE project_id = ?
ORDER BY changed_at DESC
LIMIT ? OFFSET ?
""", (project_id, limit, offset))
records = []
for row in cursor.fetchall():
records.append(self._row_to_change_record(row))
return records
def _row_to_change_record(self, row) -> ChangeRecord:
"""将数据库行转换为ChangeRecord对象"""
return ChangeRecord(
id=row[0],
project_id=row[1],
change_type=row[2],
entity_type=row[3],
entity_id=row[4],
entity_name=row[5],
changed_by=row[6],
changed_by_name=row[7],
changed_at=row[8],
old_value=json.loads(row[9]) if row[9] else None,
new_value=json.loads(row[10]) if row[10] else None,
description=row[11],
session_id=row[12],
reverted=bool(row[13]),
reverted_at=row[14],
reverted_by=row[15]
)
def get_entity_version_history(
self,
entity_type: str,
entity_id: str
) -> List[ChangeRecord]:
"""获取实体的版本历史(用于版本对比)"""
if not self.db:
return []
cursor = self.db.conn.cursor()
cursor.execute("""
SELECT * FROM change_history
WHERE entity_type = ? AND entity_id = ?
ORDER BY changed_at ASC
""", (entity_type, entity_id))
records = []
for row in cursor.fetchall():
records.append(self._row_to_change_record(row))
return records
def revert_change(self, record_id: str, reverted_by: str) -> bool:
"""回滚变更"""
if not self.db:
return False
now = datetime.now().isoformat()
cursor = self.db.conn.cursor()
cursor.execute("""
UPDATE change_history
SET reverted = 1, reverted_at = ?, reverted_by = ?
WHERE id = ? AND reverted = 0
""", (now, reverted_by, record_id))
self.db.conn.commit()
return cursor.rowcount > 0
def get_change_stats(self, project_id: str) -> Dict[str, Any]:
"""获取变更统计"""
if not self.db:
return {}
cursor = self.db.conn.cursor()
# 总变更数
cursor.execute("""
SELECT COUNT(*) FROM change_history WHERE project_id = ?
""", (project_id,))
total_changes = cursor.fetchone()[0]
# 按类型统计
cursor.execute("""
SELECT change_type, COUNT(*) FROM change_history
WHERE project_id = ? GROUP BY change_type
""", (project_id,))
type_counts = {row[0]: row[1] for row in cursor.fetchall()}
# 按实体类型统计
cursor.execute("""
SELECT entity_type, COUNT(*) FROM change_history
WHERE project_id = ? GROUP BY entity_type
""", (project_id,))
entity_type_counts = {row[0]: row[1] for row in cursor.fetchall()}
# 最近活跃的用户
cursor.execute("""
SELECT changed_by_name, COUNT(*) as count FROM change_history
WHERE project_id = ?
GROUP BY changed_by_name
ORDER BY count DESC
LIMIT 5
""", (project_id,))
top_contributors = [
{"name": row[0], "changes": row[1]}
for row in cursor.fetchall()
]
return {
"total_changes": total_changes,
"by_type": type_counts,
"by_entity_type": entity_type_counts,
"top_contributors": top_contributors
}
# ============ 团队成员管理 ============
def add_team_member(
self,
project_id: str,
user_id: str,
user_name: str,
user_email: str,
role: str,
invited_by: str,
permissions: Optional[List[str]] = None
) -> TeamMember:
"""添加团队成员"""
member_id = str(uuid.uuid4())
now = datetime.now().isoformat()
# 根据角色设置默认权限
if permissions is None:
permissions = self._get_default_permissions(role)
member = TeamMember(
id=member_id,
project_id=project_id,
user_id=user_id,
user_name=user_name,
user_email=user_email,
role=role,
joined_at=now,
invited_by=invited_by,
last_active_at=None,
permissions=permissions
)
if self.db:
self._save_member_to_db(member)
return member
def _get_default_permissions(self, role: str) -> List[str]:
"""获取角色的默认权限"""
permissions_map = {
"owner": ["read", "write", "delete", "share", "admin", "export"],
"admin": ["read", "write", "delete", "share", "export"],
"editor": ["read", "write", "export"],
"viewer": ["read"],
"commenter": ["read", "comment"]
}
return permissions_map.get(role, ["read"])
def _save_member_to_db(self, member: TeamMember):
"""保存成员到数据库"""
cursor = self.db.conn.cursor()
cursor.execute("""
INSERT INTO team_members
(id, project_id, user_id, user_name, user_email, role, joined_at,
invited_by, last_active_at, permissions)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""", (
member.id, member.project_id, member.user_id, member.user_name,
member.user_email, member.role, member.joined_at, member.invited_by,
member.last_active_at, json.dumps(member.permissions)
))
self.db.conn.commit()
def get_team_members(self, project_id: str) -> List[TeamMember]:
"""获取团队成员列表"""
if not self.db:
return []
cursor = self.db.conn.cursor()
cursor.execute("""
SELECT * FROM team_members WHERE project_id = ?
ORDER BY joined_at ASC
""", (project_id,))
members = []
for row in cursor.fetchall():
members.append(self._row_to_team_member(row))
return members
def _row_to_team_member(self, row) -> TeamMember:
"""将数据库行转换为TeamMember对象"""
return TeamMember(
id=row[0],
project_id=row[1],
user_id=row[2],
user_name=row[3],
user_email=row[4],
role=row[5],
joined_at=row[6],
invited_by=row[7],
last_active_at=row[8],
permissions=json.loads(row[9]) if row[9] else []
)
def update_member_role(
self,
member_id: str,
new_role: str,
updated_by: str
) -> bool:
"""更新成员角色"""
if not self.db:
return False
permissions = self._get_default_permissions(new_role)
cursor = self.db.conn.cursor()
cursor.execute("""
UPDATE team_members
SET role = ?, permissions = ?
WHERE id = ?
""", (new_role, json.dumps(permissions), member_id))
self.db.conn.commit()
return cursor.rowcount > 0
def remove_team_member(self, member_id: str, removed_by: str) -> bool:
"""移除团队成员"""
if not self.db:
return False
cursor = self.db.conn.cursor()
cursor.execute("DELETE FROM team_members WHERE id = ?", (member_id,))
self.db.conn.commit()
return cursor.rowcount > 0
def check_permission(
self,
project_id: str,
user_id: str,
permission: str
) -> bool:
"""检查用户权限"""
if not self.db:
return False
cursor = self.db.conn.cursor()
cursor.execute("""
SELECT permissions FROM team_members
WHERE project_id = ? AND user_id = ?
""", (project_id, user_id))
row = cursor.fetchone()
if not row:
return False
permissions = json.loads(row[0]) if row[0] else []
return permission in permissions or "admin" in permissions
def update_last_active(self, project_id: str, user_id: str):
"""更新用户最后活跃时间"""
if not self.db:
return
now = datetime.now().isoformat()
cursor = self.db.conn.cursor()
cursor.execute("""
UPDATE team_members
SET last_active_at = ?
WHERE project_id = ? AND user_id = ?
""", (now, project_id, user_id))
self.db.conn.commit()
# 全局协作管理器实例
_collaboration_manager = None
def get_collaboration_manager(db_manager=None):
"""获取协作管理器单例"""
global _collaboration_manager
if _collaboration_manager is None:
_collaboration_manager = CollaborationManager(db_manager)
return _collaboration_manager