Files
insightflow/backend/developer_ecosystem_manager.py
AutoFix Bot b000397dbe fix: auto-fix code issues (cron)
- 修复重复导入/字段
- 修复异常处理
- 修复PEP8格式问题
- 添加类型注解

自动修复统计:
- 修复了1177个格式问题
- 删除了多余的空行
- 清理了行尾空格
- 移除了重复导入和未使用的导入
2026-03-04 09:16:13 +08:00

2068 lines
65 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""
InsightFlow Developer Ecosystem Manager - Phase 8 Task 6
开发者生态系统模块
- SDK 发布与管理(Python/JavaScript/Go)
- 模板市场(行业模板、预训练模型)
- 插件市场(第三方插件审核与分发)
- 开发者文档与示例代码
作者: InsightFlow Team
"""
import json
import os
import sqlite3
import uuid
from dataclasses import dataclass
from datetime import datetime
from enum import StrEnum
# Default SQLite database file: "insightflow.db" located in the same
# directory as this module.
DB_PATH = os.path.join(os.path.dirname(__file__), "insightflow.db")
class SDKLanguage(StrEnum):
    """Programming languages an SDK release can target.

    ``create_sdk_release`` writes the member's ``.value`` to the
    ``language`` column of ``sdk_releases``.
    """

    PYTHON = "python"
    JAVASCRIPT = "javascript"
    TYPESCRIPT = "typescript"
    GO = "go"
    JAVA = "java"
    RUST = "rust"
class SDKStatus(StrEnum):
    """Lifecycle status of an SDK release.

    The member's ``.value`` is what is stored in ``sdk_releases.status``.
    """

    DRAFT = "draft"  # draft; the status every newly created release gets
    BETA = "beta"  # beta / test release
    STABLE = "stable"  # stable release; set by publish_sdk_release
    DEPRECATED = "deprecated"  # deprecated
    ARCHIVED = "archived"  # archived
class TemplateCategory(StrEnum):
    """Industry category of a template-market item.

    The member's ``.value`` is what is stored in ``template_market.category``.
    """

    MEDICAL = "medical"  # medical / healthcare
    LEGAL = "legal"  # legal
    FINANCE = "finance"  # finance
    EDUCATION = "education"  # education
    TECH = "tech"  # technology
    GENERAL = "general"  # general purpose
class TemplateStatus(StrEnum):
    """Review / publication status of a template-market item.

    The member's ``.value`` is what is stored in ``template_market.status``.
    """

    PENDING = "pending"  # awaiting review; the status of every new template
    APPROVED = "approved"  # review passed
    REJECTED = "rejected"  # review rejected
    PUBLISHED = "published"  # published
    UNLISTED = "unlisted"  # not listed
class PluginStatus(StrEnum):
    """Review / publication status of a plugin-market item.

    The member's ``.value`` is what is stored in ``plugin_market.status``.
    """

    PENDING = "pending"  # awaiting review; the status of every new plugin
    REVIEWING = "reviewing"  # under review
    APPROVED = "approved"  # review passed
    REJECTED = "rejected"  # review rejected
    PUBLISHED = "published"  # published
    SUSPENDED = "suspended"  # suspended
class PluginCategory(StrEnum):
    """Functional category of a plugin-market item.

    The member's ``.value`` is what is stored in ``plugin_market.category``.
    """

    INTEGRATION = "integration"  # integration
    ANALYSIS = "analysis"  # analysis
    VISUALIZATION = "visualization"  # visualization
    AUTOMATION = "automation"  # automation
    SECURITY = "security"  # security
    CUSTOM = "custom"  # custom
class DeveloperStatus(StrEnum):
    """Verification status of a developer profile.

    The member's ``.value`` is what is stored in ``developer_profiles.status``.
    """

    UNVERIFIED = "unverified"  # not verified; the status of every new profile
    PENDING = "pending"  # verification under review
    VERIFIED = "verified"  # verified
    CERTIFIED = "certified"  # verified (advanced tier)
    SUSPENDED = "suspended"  # suspended
@dataclass
class SDKRelease:
    """One SDK release, as stored in the ``sdk_releases`` table.

    Field order is the positional constructor order and mirrors the column
    list used by ``create_sdk_release``.
    """

    id: str
    name: str
    language: SDKLanguage
    version: str
    description: str
    changelog: str
    download_url: str
    documentation_url: str
    repository_url: str
    package_name: str  # pip/npm/go module name
    status: SDKStatus
    min_platform_version: str
    dependencies: list[dict]  # e.g. [{"name": "requests", "version": ">= 2.0"}]; stored as JSON text
    file_size: int
    checksum: str
    download_count: int
    created_at: str  # ISO-format timestamp string
    updated_at: str  # ISO-format timestamp string
    published_at: str | None  # None until the release is published
    created_by: str
@dataclass
class SDKVersion:
    """One entry in an SDK's version history (``sdk_versions`` table)."""

    id: str
    sdk_id: str  # id of the SDK release this version belongs to
    version: str
    is_latest: bool
    is_lts: bool  # long-term-support version
    release_notes: str
    download_url: str
    checksum: str
    file_size: int
    download_count: int
    created_at: str  # ISO-format timestamp string
@dataclass
class TemplateMarketItem:
    """One template listed in the template market (``template_market`` table).

    Field order is the positional constructor order and mirrors the column
    list used by ``create_template``.
    """

    id: str
    name: str
    description: str
    category: TemplateCategory
    subcategory: str | None
    tags: list[str]  # stored as JSON text
    author_id: str
    author_name: str
    status: TemplateStatus
    price: float  # 0 = free
    currency: str
    preview_image_url: str | None
    demo_url: str | None
    documentation_url: str | None
    download_url: str | None
    install_count: int
    rating: float  # average review rating, rounded to 2 decimals by _update_template_rating
    rating_count: int
    review_count: int
    version: str
    min_platform_version: str
    file_size: int
    checksum: str
    created_at: str  # ISO-format timestamp string
    updated_at: str  # ISO-format timestamp string
    published_at: str | None  # None until the template is published
@dataclass
class TemplateReview:
    """A user review of a template (``template_reviews`` table)."""

    id: str
    template_id: str
    user_id: str
    user_name: str
    rating: int  # 1-5 (not validated by add_template_review)
    comment: str
    is_verified_purchase: bool
    helpful_count: int
    created_at: str  # ISO-format timestamp string
    updated_at: str  # ISO-format timestamp string
@dataclass
class PluginMarketItem:
    """One plugin listed in the plugin market (``plugin_market`` table).

    Field order is the positional constructor order and mirrors the column
    list used by ``create_plugin``.
    """

    id: str
    name: str
    description: str
    category: PluginCategory
    tags: list[str]  # stored as JSON text
    author_id: str
    author_name: str
    status: PluginStatus
    price: float
    currency: str
    pricing_model: str  # free, paid, freemium, subscription
    preview_image_url: str | None
    demo_url: str | None
    documentation_url: str | None
    repository_url: str | None
    download_url: str | None
    webhook_url: str | None  # used for plugin callbacks
    permissions: list[str]  # permissions the plugin requires; stored as JSON text
    install_count: int
    active_install_count: int
    rating: float  # average review rating, rounded to 2 decimals by _update_plugin_rating
    rating_count: int
    review_count: int
    version: str
    min_platform_version: str
    file_size: int
    checksum: str
    created_at: str  # ISO-format timestamp string
    updated_at: str  # ISO-format timestamp string
    published_at: str | None  # None until the plugin is published
    reviewed_by: str | None  # set by review_plugin
    reviewed_at: str | None  # set by review_plugin
    review_notes: str | None  # set by review_plugin
@dataclass
class PluginReview:
    """A user review of a plugin (``plugin_reviews`` table)."""

    id: str
    plugin_id: str
    user_id: str
    user_name: str
    rating: int  # not validated by add_plugin_review
    comment: str
    is_verified_purchase: bool
    helpful_count: int
    created_at: str  # ISO-format timestamp string
    updated_at: str  # ISO-format timestamp string
@dataclass
class DeveloperProfile:
    """A developer's profile (``developer_profiles`` table)."""

    id: str
    user_id: str
    display_name: str
    email: str
    bio: str | None
    website: str | None
    github_url: str | None
    avatar_url: str | None
    status: DeveloperStatus
    verification_documents: dict  # verification documents; stored as JSON text
    total_sales: float  # incremented by record_revenue with each sale amount
    total_downloads: int  # recomputed by update_developer_stats
    plugin_count: int  # recomputed by update_developer_stats
    template_count: int  # recomputed by update_developer_stats
    rating_average: float
    created_at: str  # ISO-format timestamp string
    updated_at: str  # ISO-format timestamp string
    verified_at: str | None  # set by verify_developer for VERIFIED/CERTIFIED, else None
@dataclass
class DeveloperRevenue:
    """One sale and its revenue split (``developer_revenues`` table)."""

    id: str
    developer_id: str
    item_type: str  # plugin, template
    item_id: str
    item_name: str
    sale_amount: float
    platform_fee: float  # sale_amount * platform fee rate (see record_revenue)
    developer_earnings: float  # sale_amount - platform_fee (see record_revenue)
    currency: str
    buyer_id: str
    transaction_id: str
    created_at: str  # ISO-format timestamp string
@dataclass
class CodeExample:
    """A code example in the example library (``code_examples`` table)."""

    id: str
    title: str
    description: str
    language: str
    category: str
    code: str
    explanation: str
    tags: list[str]  # stored as JSON text
    author_id: str
    author_name: str
    sdk_id: str | None  # associated SDK, if any
    api_endpoints: list[str]  # API endpoints the example touches; stored as JSON text
    view_count: int
    copy_count: int
    rating: float
    created_at: str  # ISO-format timestamp string
    updated_at: str  # ISO-format timestamp string
@dataclass
class APIDocumentation:
    """Record of one generated set of API documentation."""

    id: str
    version: str
    openapi_spec: str  # OpenAPI JSON
    markdown_content: str
    html_content: str
    changelog: str
    generated_at: str
    generated_by: str
@dataclass
class DeveloperPortalConfig:
    """Configuration of the developer portal (branding, links, API base URL)."""

    id: str
    name: str
    description: str
    theme: str
    custom_css: str | None
    custom_js: str | None
    logo_url: str | None
    favicon_url: str | None
    primary_color: str
    secondary_color: str
    support_email: str
    support_url: str | None
    github_url: str | None
    discord_url: str | None
    api_base_url: str
    is_active: bool
    created_at: str
    updated_at: str
class DeveloperEcosystemManager:
"""开发者生态系统管理主类"""
def __init__(self, db_path: str = DB_PATH) -> None:
    """Configure the manager; no database connection is opened here.

    Args:
        db_path: Path of the SQLite database file. Defaults to ``DB_PATH``.
    """
    # Share of each sale kept by the platform: 30%.
    self.platform_fee_rate = 0.30
    self.db_path = db_path
def _get_db(self) -> None:
"""获取数据库连接"""
conn = sqlite3.connect(self.db_path)
conn.row_factory = sqlite3.Row
return conn
# ==================== SDK 发布与管理 ====================
def create_sdk_release(
    self,
    name: str,
    language: SDKLanguage,
    version: str,
    description: str,
    changelog: str,
    download_url: str,
    documentation_url: str,
    repository_url: str,
    package_name: str,
    min_platform_version: str,
    dependencies: list[dict],
    file_size: int,
    checksum: str,
    created_by: str,
) -> SDKRelease:
    """Create an SDK release and insert it into ``sdk_releases``.

    The release gets a generated ``sdk_<16 hex chars>`` id, starts as
    ``SDKStatus.DRAFT`` with ``download_count`` 0 and ``published_at`` None.
    ``created_at`` and ``updated_at`` share one ``datetime.now()`` timestamp.

    Args:
        language: Target language; its ``.value`` is stored.
        dependencies: List of dependency dicts; stored as a JSON string.
        (All other arguments are stored unchanged in the column of the
        same name.)

    Returns:
        The ``SDKRelease`` that was stored.
    """
    now = datetime.now().isoformat()
    sdk = SDKRelease(
        id=f"sdk_{uuid.uuid4().hex[:16]}",
        name=name,
        language=language,
        version=version,
        description=description,
        changelog=changelog,
        download_url=download_url,
        documentation_url=documentation_url,
        repository_url=repository_url,
        package_name=package_name,
        status=SDKStatus.DRAFT,
        min_platform_version=min_platform_version,
        dependencies=dependencies,
        file_size=file_size,
        checksum=checksum,
        download_count=0,
        created_at=now,
        updated_at=now,
        published_at=None,
        created_by=created_by,
    )
    values = (
        sdk.id,
        sdk.name,
        sdk.language.value,
        sdk.version,
        sdk.description,
        sdk.changelog,
        sdk.download_url,
        sdk.documentation_url,
        sdk.repository_url,
        sdk.package_name,
        sdk.status.value,
        sdk.min_platform_version,
        json.dumps(sdk.dependencies),
        sdk.file_size,
        sdk.checksum,
        sdk.download_count,
        sdk.created_at,
        sdk.updated_at,
        sdk.published_at,
        sdk.created_by,
    )
    conn = self._get_db()
    try:
        # "with conn" commits on success and rolls back on error, but it
        # never closes the connection -- hence the explicit finally.
        with conn:
            conn.execute(
                """
                INSERT INTO sdk_releases
                (id, name, language, version, description, changelog, download_url,
                 documentation_url, repository_url, package_name, status, min_platform_version,
                 dependencies, file_size, checksum, download_count, created_at, updated_at,
                 published_at, created_by)
                VALUES (?, ?, ?, ?, ?, ?, ?,
                        ?, ?, ?, ?, ?,
                        ?, ?, ?, ?, ?, ?,
                        ?, ?)
                """,
                values,
            )
    finally:
        conn.close()
    return sdk
def get_sdk_release(self, sdk_id: str) -> SDKRelease | None:
"""获取 SDK 发布详情"""
with self._get_db() as conn:
row = conn.execute("SELECT * FROM sdk_releases WHERE id = ?", (sdk_id,)).fetchone()
if row:
return self._row_to_sdk_release(row)
return None
def list_sdk_releases(
self,
language: SDKLanguage | None = None,
status: SDKStatus | None = None,
search: str | None = None,
) -> list[SDKRelease]:
"""列出 SDK 发布"""
query = "SELECT * FROM sdk_releases WHERE 1 = 1"
params = []
if language:
query += " AND language = ?"
params.append(language.value)
if status:
query += " AND status = ?"
params.append(status.value)
if search:
query += " AND (name LIKE ? OR description LIKE ? OR package_name LIKE ?)"
params.extend([f"%{search}%", f"%{search}%", f"%{search}%"])
query += " ORDER BY created_at DESC"
with self._get_db() as conn:
rows = conn.execute(query, params).fetchall()
return [self._row_to_sdk_release(row) for row in rows]
def update_sdk_release(self, sdk_id: str, **kwargs) -> SDKRelease | None:
"""更新 SDK 发布"""
allowed_fields = [
"name",
"description",
"changelog",
"download_url",
"documentation_url",
"repository_url",
"status",
]
updates = {k: v for k, v in kwargs.items() if k in allowed_fields}
if not updates:
return self.get_sdk_release(sdk_id)
updates["updated_at"] = datetime.now().isoformat()
with self._get_db() as conn:
set_clause = ", ".join([f"{k} = ?" for k in updates])
conn.execute(
f"UPDATE sdk_releases SET {set_clause} WHERE id = ?",
list(updates.values()) + [sdk_id],
)
conn.commit()
return self.get_sdk_release(sdk_id)
def publish_sdk_release(self, sdk_id: str) -> SDKRelease | None:
    """Publish an SDK release.

    Sets ``status`` to ``SDKStatus.STABLE`` and stamps both ``published_at``
    and ``updated_at`` with the current time.

    Args:
        sdk_id: Id of the release to publish.

    Returns:
        The release as returned by ``get_sdk_release`` afterwards
        (``None`` if no such release exists).
    """
    now = datetime.now().isoformat()
    conn = self._get_db()
    try:
        with conn:  # commit on success, roll back on error
            conn.execute(
                """
                UPDATE sdk_releases
                SET status = ?, published_at = ?, updated_at = ?
                WHERE id = ?
                """,
                (SDKStatus.STABLE.value, now, now, sdk_id),
            )
    finally:
        # "with conn" does not close the connection.
        conn.close()
    return self.get_sdk_release(sdk_id)
def increment_sdk_download(self, sdk_id: str) -> None:
"""增加 SDK 下载计数"""
with self._get_db() as conn:
conn.execute(
"""
UPDATE sdk_releases
SET download_count = download_count + 1
WHERE id = ?
""",
(sdk_id,),
)
conn.commit()
def get_sdk_versions(self, sdk_id: str) -> list[SDKVersion]:
"""获取 SDK 版本历史"""
with self._get_db() as conn:
rows = conn.execute(
"SELECT * FROM sdk_versions WHERE sdk_id = ? ORDER BY created_at DESC",
(sdk_id,),
).fetchall()
return [self._row_to_sdk_version(row) for row in rows]
def add_sdk_version(
    self,
    sdk_id: str,
    version: str,
    is_lts: bool,
    release_notes: str,
    download_url: str,
    checksum: str,
    file_size: int,
) -> SDKVersion:
    """Add a version to an SDK's history and mark it as the latest.

    Every existing version of the same SDK has ``is_latest`` cleared first;
    the new row is always inserted with ``is_latest`` true and
    ``download_count`` 0. Both statements run in one transaction.

    Args:
        sdk_id: Id of the SDK the version belongs to.
        version: Version string.
        is_lts: Whether this is a long-term-support version.
        release_notes: Release notes text.
        download_url: Download location.
        checksum: Checksum of the artifact.
        file_size: Size of the artifact.

    Returns:
        The ``SDKVersion`` that was stored.
    """
    new_version = SDKVersion(
        id=f"sv_{uuid.uuid4().hex[:16]}",
        sdk_id=sdk_id,
        version=version,
        is_latest=True,
        is_lts=is_lts,
        release_notes=release_notes,
        download_url=download_url,
        checksum=checksum,
        file_size=file_size,
        download_count=0,
        created_at=datetime.now().isoformat(),
    )
    conn = self._get_db()
    try:
        with conn:  # commit both statements together, roll back on error
            # A new version is always the latest, so demote all others.
            conn.execute("UPDATE sdk_versions SET is_latest = 0 WHERE sdk_id = ?", (sdk_id,))
            conn.execute(
                """
                INSERT INTO sdk_versions
                (id, sdk_id, version, is_latest, is_lts, release_notes, download_url,
                 checksum, file_size, download_count, created_at)
                VALUES (?, ?, ?, ?, ?, ?, ?,
                        ?, ?, ?, ?)
                """,
                (
                    new_version.id,
                    new_version.sdk_id,
                    new_version.version,
                    new_version.is_latest,
                    new_version.is_lts,
                    new_version.release_notes,
                    new_version.download_url,
                    new_version.checksum,
                    new_version.file_size,
                    new_version.download_count,
                    new_version.created_at,
                ),
            )
    finally:
        # "with conn" does not close the connection.
        conn.close()
    return new_version
# ==================== 模板市场 ====================
def create_template(
    self,
    name: str,
    description: str,
    category: TemplateCategory,
    subcategory: str | None,
    tags: list[str],
    author_id: str,
    author_name: str,
    price: float = 0.0,
    currency: str = "CNY",
    preview_image_url: str | None = None,
    demo_url: str | None = None,
    documentation_url: str | None = None,
    download_url: str | None = None,
    version: str = "1.0.0",
    min_platform_version: str = "1.0.0",
    file_size: int = 0,
    checksum: str = "",
) -> TemplateMarketItem:
    """Create a template and insert it into ``template_market``.

    The template gets a generated ``tpl_<16 hex chars>`` id, starts as
    ``TemplateStatus.PENDING`` with all counters and the rating at zero and
    ``published_at`` None.

    Args:
        category: Template category; its ``.value`` is stored.
        tags: Tag list; stored as a JSON string.
        price: Price, 0 meaning free.
        (All other arguments are stored unchanged in the column of the
        same name.)

    Returns:
        The ``TemplateMarketItem`` that was stored.
    """
    now = datetime.now().isoformat()
    template = TemplateMarketItem(
        id=f"tpl_{uuid.uuid4().hex[:16]}",
        name=name,
        description=description,
        category=category,
        subcategory=subcategory,
        tags=tags,
        author_id=author_id,
        author_name=author_name,
        status=TemplateStatus.PENDING,
        price=price,
        currency=currency,
        preview_image_url=preview_image_url,
        demo_url=demo_url,
        documentation_url=documentation_url,
        download_url=download_url,
        install_count=0,
        rating=0.0,
        rating_count=0,
        review_count=0,
        version=version,
        min_platform_version=min_platform_version,
        file_size=file_size,
        checksum=checksum,
        created_at=now,
        updated_at=now,
        published_at=None,
    )
    values = (
        template.id,
        template.name,
        template.description,
        template.category.value,
        template.subcategory,
        json.dumps(template.tags),
        template.author_id,
        template.author_name,
        template.status.value,
        template.price,
        template.currency,
        template.preview_image_url,
        template.demo_url,
        template.documentation_url,
        template.download_url,
        template.install_count,
        template.rating,
        template.rating_count,
        template.review_count,
        template.version,
        template.min_platform_version,
        template.file_size,
        template.checksum,
        template.created_at,
        template.updated_at,
        template.published_at,
    )
    conn = self._get_db()
    try:
        with conn:  # commit on success, roll back on error
            conn.execute(
                """
                INSERT INTO template_market
                (id, name, description, category, subcategory, tags, author_id, author_name,
                 status, price, currency, preview_image_url, demo_url, documentation_url,
                 download_url, install_count, rating, rating_count, review_count, version,
                 min_platform_version, file_size, checksum, created_at, updated_at,
                 published_at)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?,
                        ?, ?, ?, ?, ?, ?,
                        ?, ?, ?, ?, ?, ?,
                        ?, ?, ?, ?, ?,
                        ?)
                """,
                values,
            )
    finally:
        # "with conn" does not close the connection.
        conn.close()
    return template
def get_template(self, template_id: str) -> TemplateMarketItem | None:
"""获取模板详情"""
with self._get_db() as conn:
row = conn.execute(
"SELECT * FROM template_market WHERE id = ?",
(template_id,),
).fetchone()
if row:
return self._row_to_template(row)
return None
def list_templates(
self,
category: TemplateCategory | None = None,
status: TemplateStatus | None = None,
search: str | None = None,
author_id: str | None = None,
min_price: float | None = None,
max_price: float | None = None,
sort_by: str = "created_at",
) -> list[TemplateMarketItem]:
"""列出模板"""
query = "SELECT * FROM template_market WHERE 1 = 1"
params = []
if category:
query += " AND category = ?"
params.append(category.value)
if status:
query += " AND status = ?"
params.append(status.value)
if author_id:
query += " AND author_id = ?"
params.append(author_id)
if search:
query += " AND (name LIKE ? OR description LIKE ? OR tags LIKE ?)"
params.extend([f"%{search}%", f"%{search}%", f"%{search}%"])
if min_price is not None:
query += " AND price >= ?"
params.append(min_price)
if max_price is not None:
query += " AND price <= ?"
params.append(max_price)
# 排序
sort_mapping = {
"created_at": "created_at DESC",
"rating": "rating DESC",
"install_count": "install_count DESC",
"price": "price ASC",
"name": "name ASC",
}
query += f" ORDER BY {sort_mapping.get(sort_by, 'created_at DESC')}"
with self._get_db() as conn:
rows = conn.execute(query, params).fetchall()
return [self._row_to_template(row) for row in rows]
def approve_template(self, template_id: str, reviewed_by: str) -> TemplateMarketItem | None:
    """Mark a template as approved.

    Sets ``status`` to ``TemplateStatus.APPROVED`` and refreshes
    ``updated_at``.

    Args:
        template_id: Id of the template to approve.
        reviewed_by: Accepted for the caller's convenience but not
            persisted; this method writes only ``status`` and
            ``updated_at``. NOTE(review): confirm whether the reviewer
            should be recorded.

    Returns:
        The template as returned by ``get_template`` afterwards
        (``None`` if no such template exists).
    """
    now = datetime.now().isoformat()
    conn = self._get_db()
    try:
        with conn:  # commit on success, roll back on error
            conn.execute(
                """
                UPDATE template_market
                SET status = ?, updated_at = ?
                WHERE id = ?
                """,
                (TemplateStatus.APPROVED.value, now, template_id),
            )
    finally:
        # "with conn" does not close the connection.
        conn.close()
    return self.get_template(template_id)
def publish_template(self, template_id: str) -> TemplateMarketItem | None:
    """Publish a template.

    Sets ``status`` to ``TemplateStatus.PUBLISHED`` and stamps both
    ``published_at`` and ``updated_at`` with the current time.

    Args:
        template_id: Id of the template to publish.

    Returns:
        The template as returned by ``get_template`` afterwards
        (``None`` if no such template exists).
    """
    now = datetime.now().isoformat()
    conn = self._get_db()
    try:
        with conn:  # commit on success, roll back on error
            conn.execute(
                """
                UPDATE template_market
                SET status = ?, published_at = ?, updated_at = ?
                WHERE id = ?
                """,
                (TemplateStatus.PUBLISHED.value, now, now, template_id),
            )
    finally:
        # "with conn" does not close the connection.
        conn.close()
    return self.get_template(template_id)
def reject_template(self, template_id: str, reason: str) -> TemplateMarketItem | None:
    """Mark a template as rejected.

    Sets ``status`` to ``TemplateStatus.REJECTED`` and refreshes
    ``updated_at``.

    Args:
        template_id: Id of the template to reject.
        reason: Accepted but not persisted; this method writes only
            ``status`` and ``updated_at``. NOTE(review): confirm whether
            the rejection reason should be stored.

    Returns:
        The template as returned by ``get_template`` afterwards
        (``None`` if no such template exists).
    """
    now = datetime.now().isoformat()
    conn = self._get_db()
    try:
        with conn:  # commit on success, roll back on error
            conn.execute(
                """
                UPDATE template_market
                SET status = ?, updated_at = ?
                WHERE id = ?
                """,
                (TemplateStatus.REJECTED.value, now, template_id),
            )
    finally:
        # "with conn" does not close the connection.
        conn.close()
    return self.get_template(template_id)
def increment_template_install(self, template_id: str) -> None:
"""增加模板安装计数"""
with self._get_db() as conn:
conn.execute(
"""
UPDATE template_market
SET install_count = install_count + 1
WHERE id = ?
""",
(template_id,),
)
conn.commit()
def add_template_review(
    self,
    template_id: str,
    user_id: str,
    user_name: str,
    rating: int,
    comment: str,
    is_verified_purchase: bool = False,
) -> TemplateReview:
    """Store a review of a template and refresh the template's rating.

    The review gets a generated ``tr_<16 hex chars>`` id and a
    ``helpful_count`` of 0. The insert and the rating refresh run on the
    same connection and are committed together.

    Args:
        template_id: Id of the reviewed template.
        user_id: Id of the reviewing user.
        user_name: Display name of the reviewing user.
        rating: Review score; stored as given, no range check is done here.
        comment: Review text.
        is_verified_purchase: Whether the reviewer bought the template.

    Returns:
        The ``TemplateReview`` that was stored.
    """
    now = datetime.now().isoformat()
    review = TemplateReview(
        id=f"tr_{uuid.uuid4().hex[:16]}",
        template_id=template_id,
        user_id=user_id,
        user_name=user_name,
        rating=rating,
        comment=comment,
        is_verified_purchase=is_verified_purchase,
        helpful_count=0,
        created_at=now,
        updated_at=now,
    )
    conn = self._get_db()
    try:
        with conn:  # one transaction: review row + aggregated rating
            conn.execute(
                """
                INSERT INTO template_reviews
                (id, template_id, user_id, user_name, rating, comment,
                 is_verified_purchase, helpful_count, created_at, updated_at)
                VALUES (?, ?, ?, ?, ?, ?,
                        ?, ?, ?, ?)
                """,
                (
                    review.id,
                    review.template_id,
                    review.user_id,
                    review.user_name,
                    review.rating,
                    review.comment,
                    review.is_verified_purchase,
                    review.helpful_count,
                    review.created_at,
                    review.updated_at,
                ),
            )
            # Recompute the template's average rating and counts.
            self._update_template_rating(conn, template_id)
    finally:
        # "with conn" does not close the connection.
        conn.close()
    return review
def _update_template_rating(self, conn, template_id: str) -> None:
"""更新模板评分"""
row = conn.execute(
"""
SELECT AVG(rating) as avg_rating, COUNT(*) as count
FROM template_reviews
WHERE template_id = ?
""",
(template_id,),
).fetchone()
if row:
conn.execute(
"""
UPDATE template_market
SET rating = ?, rating_count = ?, review_count = ?
WHERE id = ?
""",
(
round(row["avg_rating"], 2) if row["avg_rating"] else 0,
row["count"],
row["count"],
template_id,
),
)
def get_template_reviews(self, template_id: str, limit: int = 50) -> list[TemplateReview]:
"""获取模板评价"""
with self._get_db() as conn:
rows = conn.execute(
"""SELECT * FROM template_reviews
WHERE template_id = ?
ORDER BY created_at DESC
LIMIT ?""",
(template_id, limit),
).fetchall()
return [self._row_to_template_review(row) for row in rows]
# ==================== 插件市场 ====================
def create_plugin(
    self,
    name: str,
    description: str,
    category: PluginCategory,
    tags: list[str],
    author_id: str,
    author_name: str,
    price: float = 0.0,
    currency: str = "CNY",
    pricing_model: str = "free",
    preview_image_url: str | None = None,
    demo_url: str | None = None,
    documentation_url: str | None = None,
    repository_url: str | None = None,
    download_url: str | None = None,
    webhook_url: str | None = None,
    permissions: list[str] | None = None,
    version: str = "1.0.0",
    min_platform_version: str = "1.0.0",
    file_size: int = 0,
    checksum: str = "",
) -> PluginMarketItem:
    """Create a plugin and insert it into ``plugin_market``.

    The plugin gets a generated ``plg_<16 hex chars>`` id, starts as
    ``PluginStatus.PENDING`` with all counters and the rating at zero, and
    with ``published_at`` and the three review fields set to None.

    Args:
        category: Plugin category; its ``.value`` is stored.
        tags: Tag list; stored as a JSON string.
        permissions: Permissions the plugin requires; ``None`` (or any
            empty value) is stored as an empty list. Stored as JSON.
        (All other arguments are stored unchanged in the column of the
        same name.)

    Returns:
        The ``PluginMarketItem`` that was stored.
    """
    now = datetime.now().isoformat()
    plugin = PluginMarketItem(
        id=f"plg_{uuid.uuid4().hex[:16]}",
        name=name,
        description=description,
        category=category,
        tags=tags,
        author_id=author_id,
        author_name=author_name,
        status=PluginStatus.PENDING,
        price=price,
        currency=currency,
        pricing_model=pricing_model,
        preview_image_url=preview_image_url,
        demo_url=demo_url,
        documentation_url=documentation_url,
        repository_url=repository_url,
        download_url=download_url,
        webhook_url=webhook_url,
        permissions=permissions or [],
        install_count=0,
        active_install_count=0,
        rating=0.0,
        rating_count=0,
        review_count=0,
        version=version,
        min_platform_version=min_platform_version,
        file_size=file_size,
        checksum=checksum,
        created_at=now,
        updated_at=now,
        published_at=None,
        reviewed_by=None,
        reviewed_at=None,
        review_notes=None,
    )
    values = (
        plugin.id,
        plugin.name,
        plugin.description,
        plugin.category.value,
        json.dumps(plugin.tags),
        plugin.author_id,
        plugin.author_name,
        plugin.status.value,
        plugin.price,
        plugin.currency,
        plugin.pricing_model,
        plugin.preview_image_url,
        plugin.demo_url,
        plugin.documentation_url,
        plugin.repository_url,
        plugin.download_url,
        plugin.webhook_url,
        json.dumps(plugin.permissions),
        plugin.install_count,
        plugin.active_install_count,
        plugin.rating,
        plugin.rating_count,
        plugin.review_count,
        plugin.version,
        plugin.min_platform_version,
        plugin.file_size,
        plugin.checksum,
        plugin.created_at,
        plugin.updated_at,
        plugin.published_at,
        plugin.reviewed_by,
        plugin.reviewed_at,
        plugin.review_notes,
    )
    conn = self._get_db()
    try:
        with conn:  # commit on success, roll back on error
            conn.execute(
                """
                INSERT INTO plugin_market
                (id, name, description, category, tags, author_id, author_name, status,
                 price, currency, pricing_model, preview_image_url, demo_url,
                 documentation_url, repository_url, download_url, webhook_url,
                 permissions, install_count, active_install_count, rating,
                 rating_count, review_count, version, min_platform_version,
                 file_size, checksum, created_at, updated_at, published_at,
                 reviewed_by, reviewed_at, review_notes)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?,
                        ?, ?, ?, ?, ?,
                        ?, ?, ?, ?,
                        ?, ?, ?, ?,
                        ?, ?, ?, ?,
                        ?, ?, ?, ?, ?,
                        ?, ?, ?)
                """,
                values,
            )
    finally:
        # "with conn" does not close the connection.
        conn.close()
    return plugin
def get_plugin(self, plugin_id: str) -> PluginMarketItem | None:
"""获取插件详情"""
with self._get_db() as conn:
row = conn.execute("SELECT * FROM plugin_market WHERE id = ?", (plugin_id,)).fetchone()
if row:
return self._row_to_plugin(row)
return None
def list_plugins(
self,
category: PluginCategory | None = None,
status: PluginStatus | None = None,
search: str | None = None,
author_id: str | None = None,
sort_by: str = "created_at",
) -> list[PluginMarketItem]:
"""列出插件"""
query = "SELECT * FROM plugin_market WHERE 1 = 1"
params = []
if category:
query += " AND category = ?"
params.append(category.value)
if status:
query += " AND status = ?"
params.append(status.value)
if author_id:
query += " AND author_id = ?"
params.append(author_id)
if search:
query += " AND (name LIKE ? OR description LIKE ? OR tags LIKE ?)"
params.extend([f"%{search}%", f"%{search}%", f"%{search}%"])
sort_mapping = {
"created_at": "created_at DESC",
"rating": "rating DESC",
"install_count": "install_count DESC",
"name": "name ASC",
}
query += f" ORDER BY {sort_mapping.get(sort_by, 'created_at DESC')}"
with self._get_db() as conn:
rows = conn.execute(query, params).fetchall()
return [self._row_to_plugin(row) for row in rows]
def review_plugin(
self,
plugin_id: str,
reviewed_by: str,
status: PluginStatus,
notes: str = "",
) -> PluginMarketItem | None:
"""审核插件"""
now = datetime.now().isoformat()
with self._get_db() as conn:
conn.execute(
"""
UPDATE plugin_market
SET status = ?, reviewed_by = ?, reviewed_at = ?, review_notes = ?, updated_at = ?
WHERE id = ?
""",
(status.value, reviewed_by, now, notes, now, plugin_id),
)
conn.commit()
return self.get_plugin(plugin_id)
def publish_plugin(self, plugin_id: str) -> PluginMarketItem | None:
    """Publish a plugin.

    Sets ``status`` to ``PluginStatus.PUBLISHED`` and stamps both
    ``published_at`` and ``updated_at`` with the current time.

    Args:
        plugin_id: Id of the plugin to publish.

    Returns:
        The plugin as returned by ``get_plugin`` afterwards
        (``None`` if no such plugin exists).
    """
    now = datetime.now().isoformat()
    conn = self._get_db()
    try:
        with conn:  # commit on success, roll back on error
            conn.execute(
                """
                UPDATE plugin_market
                SET status = ?, published_at = ?, updated_at = ?
                WHERE id = ?
                """,
                (PluginStatus.PUBLISHED.value, now, now, plugin_id),
            )
    finally:
        # "with conn" does not close the connection.
        conn.close()
    return self.get_plugin(plugin_id)
def increment_plugin_install(self, plugin_id: str, active: bool = True) -> None:
"""增加插件安装计数"""
with self._get_db() as conn:
conn.execute(
"""
UPDATE plugin_market
SET install_count = install_count + 1
WHERE id = ?
""",
(plugin_id,),
)
if active:
conn.execute(
"""
UPDATE plugin_market
SET active_install_count = active_install_count + 1
WHERE id = ?
""",
(plugin_id,),
)
conn.commit()
def add_plugin_review(
    self,
    plugin_id: str,
    user_id: str,
    user_name: str,
    rating: int,
    comment: str,
    is_verified_purchase: bool = False,
) -> PluginReview:
    """Store a review of a plugin and refresh the plugin's rating.

    The review gets a generated ``pr_<16 hex chars>`` id and a
    ``helpful_count`` of 0. The insert and the rating refresh run on the
    same connection and are committed together.

    Args:
        plugin_id: Id of the reviewed plugin.
        user_id: Id of the reviewing user.
        user_name: Display name of the reviewing user.
        rating: Review score; stored as given, no range check is done here.
        comment: Review text.
        is_verified_purchase: Whether the reviewer bought the plugin.

    Returns:
        The ``PluginReview`` that was stored.
    """
    now = datetime.now().isoformat()
    review = PluginReview(
        id=f"pr_{uuid.uuid4().hex[:16]}",
        plugin_id=plugin_id,
        user_id=user_id,
        user_name=user_name,
        rating=rating,
        comment=comment,
        is_verified_purchase=is_verified_purchase,
        helpful_count=0,
        created_at=now,
        updated_at=now,
    )
    conn = self._get_db()
    try:
        with conn:  # one transaction: review row + aggregated rating
            conn.execute(
                """
                INSERT INTO plugin_reviews
                (id, plugin_id, user_id, user_name, rating, comment,
                 is_verified_purchase, helpful_count, created_at, updated_at)
                VALUES (?, ?, ?, ?, ?, ?,
                        ?, ?, ?, ?)
                """,
                (
                    review.id,
                    review.plugin_id,
                    review.user_id,
                    review.user_name,
                    review.rating,
                    review.comment,
                    review.is_verified_purchase,
                    review.helpful_count,
                    review.created_at,
                    review.updated_at,
                ),
            )
            # Recompute the plugin's average rating and counts.
            self._update_plugin_rating(conn, plugin_id)
    finally:
        # "with conn" does not close the connection.
        conn.close()
    return review
def _update_plugin_rating(self, conn, plugin_id: str) -> None:
"""更新插件评分"""
row = conn.execute(
"""
SELECT AVG(rating) as avg_rating, COUNT(*) as count
FROM plugin_reviews
WHERE plugin_id = ?
""",
(plugin_id,),
).fetchone()
if row:
conn.execute(
"""
UPDATE plugin_market
SET rating = ?, rating_count = ?, review_count = ?
WHERE id = ?
""",
(
round(row["avg_rating"], 2) if row["avg_rating"] else 0,
row["count"],
row["count"],
plugin_id,
),
)
def get_plugin_reviews(self, plugin_id: str, limit: int = 50) -> list[PluginReview]:
"""获取插件评价"""
with self._get_db() as conn:
rows = conn.execute(
"""SELECT * FROM plugin_reviews
WHERE plugin_id = ?
ORDER BY created_at DESC
LIMIT ?""",
(plugin_id, limit),
).fetchall()
return [self._row_to_plugin_review(row) for row in rows]
# ==================== 开发者收益分成 ====================
def record_revenue(
    self,
    developer_id: str,
    item_type: str,
    item_id: str,
    item_name: str,
    sale_amount: float,
    currency: str,
    buyer_id: str,
    transaction_id: str,
) -> DeveloperRevenue:
    """Record a sale and its revenue split for a developer.

    The platform fee is ``sale_amount * self.platform_fee_rate`` and the
    developer earns the remainder. The revenue row is inserted into
    ``developer_revenues`` and ``sale_amount`` is added to the developer's
    ``total_sales`` in the same transaction.

    Args:
        developer_id: Id of the developer profile credited with the sale.
        item_type: Kind of item sold (plugin or template).
        item_id: Id of the item sold.
        item_name: Name of the item sold.
        sale_amount: Gross sale amount.
        currency: Currency code of the sale.
        buyer_id: Id of the buyer.
        transaction_id: Identifier of the payment transaction.

    Returns:
        The ``DeveloperRevenue`` that was stored (id ``rev_<16 hex chars>``).
    """
    platform_fee = sale_amount * self.platform_fee_rate
    revenue = DeveloperRevenue(
        id=f"rev_{uuid.uuid4().hex[:16]}",
        developer_id=developer_id,
        item_type=item_type,
        item_id=item_id,
        item_name=item_name,
        sale_amount=sale_amount,
        platform_fee=platform_fee,
        developer_earnings=sale_amount - platform_fee,
        currency=currency,
        buyer_id=buyer_id,
        transaction_id=transaction_id,
        created_at=datetime.now().isoformat(),
    )
    conn = self._get_db()
    try:
        with conn:  # one transaction: revenue row + profile total
            conn.execute(
                """
                INSERT INTO developer_revenues
                (id, developer_id, item_type, item_id, item_name, sale_amount,
                 platform_fee, developer_earnings, currency, buyer_id, transaction_id, created_at)
                VALUES (?, ?, ?, ?, ?, ?,
                        ?, ?, ?, ?, ?, ?)
                """,
                (
                    revenue.id,
                    revenue.developer_id,
                    revenue.item_type,
                    revenue.item_id,
                    revenue.item_name,
                    revenue.sale_amount,
                    revenue.platform_fee,
                    revenue.developer_earnings,
                    revenue.currency,
                    revenue.buyer_id,
                    revenue.transaction_id,
                    revenue.created_at,
                ),
            )
            # Keep the developer's running gross-sales total in step.
            conn.execute(
                """
                UPDATE developer_profiles
                SET total_sales = total_sales + ?
                WHERE id = ?
                """,
                (sale_amount, developer_id),
            )
    finally:
        # "with conn" does not close the connection.
        conn.close()
    return revenue
def get_developer_revenues(
self,
developer_id: str,
start_date: datetime | None = None,
end_date: datetime | None = None,
) -> list[DeveloperRevenue]:
"""获取开发者收益记录"""
query = "SELECT * FROM developer_revenues WHERE developer_id = ?"
params = [developer_id]
if start_date:
query += " AND created_at >= ?"
params.append(start_date.isoformat())
if end_date:
query += " AND created_at <= ?"
params.append(end_date.isoformat())
query += " ORDER BY created_at DESC"
with self._get_db() as conn:
rows = conn.execute(query, params).fetchall()
return [self._row_to_developer_revenue(row) for row in rows]
def get_developer_revenue_summary(self, developer_id: str) -> dict:
"""获取开发者收益汇总"""
with self._get_db() as conn:
row = conn.execute(
"""
SELECT
SUM(sale_amount) as total_sales,
SUM(platform_fee) as total_fees,
SUM(developer_earnings) as total_earnings,
COUNT(*) as transaction_count
FROM developer_revenues
WHERE developer_id = ?
""",
(developer_id,),
).fetchone()
return {
"total_sales": row["total_sales"] or 0,
"total_fees": row["total_fees"] or 0,
"total_earnings": row["total_earnings"] or 0,
"transaction_count": row["transaction_count"] or 0,
"platform_fee_rate": self.platform_fee_rate,
}
# ==================== 开发者认证与管理 ====================
def create_developer_profile(
    self,
    user_id: str,
    display_name: str,
    email: str,
    bio: str | None = None,
    website: str | None = None,
    github_url: str | None = None,
    avatar_url: str | None = None,
) -> DeveloperProfile:
    """Create a developer profile and insert it into ``developer_profiles``.

    The profile gets a generated ``dev_<16 hex chars>`` id, starts as
    ``DeveloperStatus.UNVERIFIED`` with empty verification documents, all
    statistics at zero and ``verified_at`` None.

    Args:
        user_id: Id of the platform user the profile belongs to.
        display_name: Public display name.
        email: Contact e-mail address.
        bio: Optional biography text.
        website: Optional website URL.
        github_url: Optional GitHub URL.
        avatar_url: Optional avatar image URL.

    Returns:
        The ``DeveloperProfile`` that was stored.
    """
    now = datetime.now().isoformat()
    profile = DeveloperProfile(
        id=f"dev_{uuid.uuid4().hex[:16]}",
        user_id=user_id,
        display_name=display_name,
        email=email,
        bio=bio,
        website=website,
        github_url=github_url,
        avatar_url=avatar_url,
        status=DeveloperStatus.UNVERIFIED,
        verification_documents={},
        total_sales=0.0,
        total_downloads=0,
        plugin_count=0,
        template_count=0,
        rating_average=0.0,
        created_at=now,
        updated_at=now,
        verified_at=None,
    )
    conn = self._get_db()
    try:
        with conn:  # commit on success, roll back on error
            conn.execute(
                """
                INSERT INTO developer_profiles
                (id, user_id, display_name, email, bio, website, github_url, avatar_url,
                 status, verification_documents, total_sales, total_downloads,
                 plugin_count, template_count, rating_average, created_at, updated_at, verified_at)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?,
                        ?, ?, ?, ?,
                        ?, ?, ?, ?, ?, ?)
                """,
                (
                    profile.id,
                    profile.user_id,
                    profile.display_name,
                    profile.email,
                    profile.bio,
                    profile.website,
                    profile.github_url,
                    profile.avatar_url,
                    profile.status.value,
                    json.dumps(profile.verification_documents),
                    profile.total_sales,
                    profile.total_downloads,
                    profile.plugin_count,
                    profile.template_count,
                    profile.rating_average,
                    profile.created_at,
                    profile.updated_at,
                    profile.verified_at,
                ),
            )
    finally:
        # "with conn" does not close the connection.
        conn.close()
    return profile
def get_developer_profile(self, developer_id: str) -> DeveloperProfile | None:
"""获取开发者档案"""
with self._get_db() as conn:
row = conn.execute(
"SELECT * FROM developer_profiles WHERE id = ?",
(developer_id,),
).fetchone()
if row:
return self._row_to_developer_profile(row)
return None
def get_developer_profile_by_user(self, user_id: str) -> DeveloperProfile | None:
"""通过用户 ID 获取开发者档案"""
with self._get_db() as conn:
row = conn.execute(
"SELECT * FROM developer_profiles WHERE user_id = ?",
(user_id,),
).fetchone()
if row:
return self._row_to_developer_profile(row)
return None
def verify_developer(
    self,
    developer_id: str,
    status: DeveloperStatus,
) -> DeveloperProfile | None:
    """Set a developer's verification status.

    ``verified_at`` is set to the current time when the new status is
    ``VERIFIED`` or ``CERTIFIED``; for any other status it is written as
    NULL, overwriting any earlier value. ``updated_at`` is always
    refreshed.

    Args:
        developer_id: Id of the developer profile to update.
        status: New status; its ``.value`` is stored.

    Returns:
        The profile as returned by ``get_developer_profile`` afterwards
        (``None`` if no such profile exists).
    """
    now = datetime.now().isoformat()
    is_verified = status in (DeveloperStatus.VERIFIED, DeveloperStatus.CERTIFIED)
    verified_at = now if is_verified else None
    conn = self._get_db()
    try:
        with conn:  # commit on success, roll back on error
            conn.execute(
                """
                UPDATE developer_profiles
                SET status = ?, verified_at = ?, updated_at = ?
                WHERE id = ?
                """,
                (status.value, verified_at, now, developer_id),
            )
    finally:
        # "with conn" does not close the connection.
        conn.close()
    return self.get_developer_profile(developer_id)
def update_developer_stats(self, developer_id: str) -> None:
"""更新开发者统计信息"""
with self._get_db() as conn:
# 统计插件数量
plugin_row = conn.execute(
"SELECT COUNT(*) as count FROM plugin_market WHERE author_id = ?",
(developer_id,),
).fetchone()
# 统计模板数量
template_row = conn.execute(
"SELECT COUNT(*) as count FROM template_market WHERE author_id = ?",
(developer_id,),
).fetchone()
# 统计总下载量
download_row = conn.execute(
"""
SELECT SUM(install_count) as total FROM (
SELECT install_count FROM plugin_market WHERE author_id = ?
UNION ALL
SELECT install_count FROM template_market WHERE author_id = ?
)
""",
(developer_id, developer_id),
).fetchone()
conn.execute(
"""
UPDATE developer_profiles
SET plugin_count = ?, template_count = ?, total_downloads = ?, updated_at = ?
WHERE id = ?
""",
(
plugin_row["count"],
template_row["count"],
download_row["total"] or 0,
datetime.now().isoformat(),
developer_id,
),
)
conn.commit()
# ==================== 代码示例库 ====================
def create_code_example(
    self,
    title: str,
    description: str,
    language: str,
    category: str,
    code: str,
    explanation: str,
    tags: list[str],
    author_id: str,
    author_name: str,
    sdk_id: str | None = None,
    api_endpoints: list[str] | None = None,
) -> CodeExample:
    """Create a code example and persist it in ``code_examples``.

    A new id of the form ``ex_<16 hex chars>`` is generated, the view/copy
    counters and rating start at zero, and ``created_at`` / ``updated_at``
    are both set to the current local time in ISO format.

    Args:
        title: Example title.
        description: Short description.
        language: Programming language label, stored as given.
        category: Category label, stored as given.
        code: The example source code.
        explanation: Accompanying explanation text.
        tags: Tags; stored as a JSON array string.
        author_id: Id of the author.
        author_name: Display name of the author.
        sdk_id: Optional id of the SDK the example belongs to.
        api_endpoints: Optional list of related API endpoints; ``None`` is
            stored as an empty list.

    Returns:
        The ``CodeExample`` that was inserted.
    """
    example_id = f"ex_{uuid.uuid4().hex[:16]}"
    now = datetime.now().isoformat()
    example = CodeExample(
        id=example_id,
        title=title,
        description=description,
        language=language,
        category=category,
        code=code,
        explanation=explanation,
        tags=tags,
        author_id=author_id,
        author_name=author_name,
        sdk_id=sdk_id,
        api_endpoints=api_endpoints or [],
        view_count=0,
        copy_count=0,
        rating=0.0,
        created_at=now,
        updated_at=now,
    )
    with self._get_db() as conn:
        conn.execute(
            """
            INSERT INTO code_examples
            (id, title, description, language, category, code, explanation, tags,
            author_id, author_name, sdk_id, api_endpoints, view_count, copy_count,
            rating, created_at, updated_at)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            """,
            (
                example.id,
                example.title,
                example.description,
                example.language,
                example.category,
                example.code,
                example.explanation,
                # Keep non-ASCII tags as real characters: the search in
                # list_code_examples runs `tags LIKE ?` on this raw JSON
                # text, which \uXXXX escapes would never match.
                json.dumps(example.tags, ensure_ascii=False),
                example.author_id,
                example.author_name,
                example.sdk_id,
                json.dumps(example.api_endpoints),
                example.view_count,
                example.copy_count,
                example.rating,
                example.created_at,
                example.updated_at,
            ),
        )
        conn.commit()
    return example
def get_code_example(self, example_id: str) -> CodeExample | None:
    """Fetch a single code example by id.

    Args:
        example_id: Value matched against ``code_examples.id``.

    Returns:
        The row converted by ``_row_to_code_example``, or ``None`` when no
        example has that id.
    """
    sql = "SELECT * FROM code_examples WHERE id = ?"
    with self._get_db() as db:
        record = db.execute(sql, (example_id,)).fetchone()
        return self._row_to_code_example(record) if record else None
def list_code_examples(
    self,
    language: str | None = None,
    category: str | None = None,
    sdk_id: str | None = None,
    search: str | None = None,
) -> list[CodeExample]:
    """List code examples, newest first, with optional filters.

    Every filter that is ``None`` or empty is skipped.

    Args:
        language: Exact match on the ``language`` column.
        category: Exact match on the ``category`` column.
        sdk_id: Exact match on the ``sdk_id`` column.
        search: Substring looked up in ``title``, ``description`` and the
            raw JSON text of ``tags``. The term is matched literally:
            ``%`` and ``_`` in it are not treated as wildcards.

    Returns:
        Matching rows converted by ``_row_to_code_example``, ordered by
        ``created_at`` descending.
    """
    query = "SELECT * FROM code_examples WHERE 1 = 1"
    params: list[str] = []
    if language:
        query += " AND language = ?"
        params.append(language)
    if category:
        query += " AND category = ?"
        params.append(category)
    if sdk_id:
        query += " AND sdk_id = ?"
        params.append(sdk_id)
    if search:
        # Escape LIKE metacharacters so user input such as "100%" or
        # "snake_case" is matched literally. The escape character itself
        # must be escaped first.
        escaped = (
            search.replace("\\", "\\\\").replace("%", "\\%").replace("_", "\\_")
        )
        pattern = f"%{escaped}%"
        query += (
            " AND (title LIKE ? ESCAPE '\\'"
            " OR description LIKE ? ESCAPE '\\'"
            " OR tags LIKE ? ESCAPE '\\')"
        )
        params.extend([pattern, pattern, pattern])
    query += " ORDER BY created_at DESC"
    with self._get_db() as conn:
        rows = conn.execute(query, params).fetchall()
        return [self._row_to_code_example(row) for row in rows]
def increment_example_view(self, example_id: str) -> None:
    """Add one to the ``view_count`` of a code example.

    The increment happens inside the UPDATE statement itself; an unknown
    id simply updates no rows.

    Args:
        example_id: Value matched against ``code_examples.id``.
    """
    bump_views = """
        UPDATE code_examples
        SET view_count = view_count + 1
        WHERE id = ?
    """
    with self._get_db() as db:
        db.execute(bump_views, (example_id,))
        db.commit()
def increment_example_copy(self, example_id: str) -> None:
    """Add one to the ``copy_count`` of a code example.

    The increment happens inside the UPDATE statement itself; an unknown
    id simply updates no rows.

    Args:
        example_id: Value matched against ``code_examples.id``.
    """
    bump_copies = """
        UPDATE code_examples
        SET copy_count = copy_count + 1
        WHERE id = ?
    """
    with self._get_db() as db:
        db.execute(bump_copies, (example_id,))
        db.commit()
# ==================== API 文档生成 ====================
def create_api_documentation(
    self,
    version: str,
    openapi_spec: str,
    markdown_content: str,
    html_content: str,
    changelog: str,
    generated_by: str,
) -> APIDocumentation:
    """Store a generated API documentation snapshot.

    A new id of the form ``api_<16 hex chars>`` is generated and
    ``generated_at`` is set to the current local time in ISO format.

    Args:
        version: Version label of the documented API.
        openapi_spec: OpenAPI specification text.
        markdown_content: Markdown rendering of the documentation.
        html_content: HTML rendering of the documentation.
        changelog: Changelog text.
        generated_by: Identifier of whoever generated the document.

    Returns:
        The ``APIDocumentation`` that was inserted into
        ``api_documentation``.
    """
    doc = APIDocumentation(
        id=f"api_{uuid.uuid4().hex[:16]}",
        version=version,
        openapi_spec=openapi_spec,
        markdown_content=markdown_content,
        html_content=html_content,
        changelog=changelog,
        generated_at=datetime.now().isoformat(),
        generated_by=generated_by,
    )
    # Attribute names, in the same order as the INSERT column list.
    columns = (
        "id",
        "version",
        "openapi_spec",
        "markdown_content",
        "html_content",
        "changelog",
        "generated_at",
        "generated_by",
    )
    with self._get_db() as db:
        db.execute(
            """
            INSERT INTO api_documentation
            (id, version, openapi_spec, markdown_content, html_content, changelog,
            generated_at, generated_by)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?)
            """,
            tuple(getattr(doc, column) for column in columns),
        )
        db.commit()
    return doc
def get_api_documentation(self, doc_id: str) -> APIDocumentation | None:
    """Fetch one API documentation snapshot by id.

    Args:
        doc_id: Value matched against ``api_documentation.id``.

    Returns:
        The row converted by ``_row_to_api_documentation``, or ``None``
        when no document has that id.
    """
    sql = "SELECT * FROM api_documentation WHERE id = ?"
    with self._get_db() as db:
        record = db.execute(sql, (doc_id,)).fetchone()
        return self._row_to_api_documentation(record) if record else None
def get_latest_api_documentation(self) -> APIDocumentation | None:
    """Fetch the most recently generated API documentation snapshot.

    Returns:
        The row with the greatest ``generated_at`` converted by
        ``_row_to_api_documentation``, or ``None`` when the table is empty.
    """
    sql = "SELECT * FROM api_documentation ORDER BY generated_at DESC LIMIT 1"
    with self._get_db() as db:
        newest = db.execute(sql).fetchone()
        return self._row_to_api_documentation(newest) if newest else None
# ==================== 开发者门户 ====================
def create_portal_config(
    self,
    name: str,
    description: str,
    theme: str = "default",
    custom_css: str | None = None,
    custom_js: str | None = None,
    logo_url: str | None = None,
    favicon_url: str | None = None,
    primary_color: str = "#1890ff",
    secondary_color: str = "#52c41a",
    support_email: str = "support@insightflow.io",
    support_url: str | None = None,
    github_url: str | None = None,
    discord_url: str | None = None,
    api_base_url: str = "https://api.insightflow.io",
) -> DeveloperPortalConfig:
    """Create a developer portal configuration and persist it.

    A new id of the form ``portal_<16 hex chars>`` is generated, the
    config is created with ``is_active=True``, and ``created_at`` /
    ``updated_at`` are both set to the current local time in ISO format.
    Existing configurations are not modified.

    Args:
        name: Portal name.
        description: Portal description.
        theme: Theme identifier.
        custom_css: Optional custom CSS.
        custom_js: Optional custom JavaScript.
        logo_url: Optional logo URL.
        favicon_url: Optional favicon URL.
        primary_color: Primary brand colour.
        secondary_color: Secondary brand colour.
        support_email: Support contact e-mail address.
        support_url: Optional support page URL.
        github_url: Optional GitHub URL.
        discord_url: Optional Discord URL.
        api_base_url: Base URL of the public API.

    Returns:
        The ``DeveloperPortalConfig`` that was inserted into
        ``developer_portal_configs``.
    """
    portal_id = f"portal_{uuid.uuid4().hex[:16]}"
    timestamp = datetime.now().isoformat()
    config = DeveloperPortalConfig(
        id=portal_id,
        name=name,
        description=description,
        theme=theme,
        custom_css=custom_css,
        custom_js=custom_js,
        logo_url=logo_url,
        favicon_url=favicon_url,
        primary_color=primary_color,
        secondary_color=secondary_color,
        support_email=support_email,
        support_url=support_url,
        github_url=github_url,
        discord_url=discord_url,
        api_base_url=api_base_url,
        is_active=True,
        created_at=timestamp,
        updated_at=timestamp,
    )
    # Attribute names, in the same order as the INSERT column list.
    columns = (
        "id",
        "name",
        "description",
        "theme",
        "custom_css",
        "custom_js",
        "logo_url",
        "favicon_url",
        "primary_color",
        "secondary_color",
        "support_email",
        "support_url",
        "github_url",
        "discord_url",
        "api_base_url",
        "is_active",
        "created_at",
        "updated_at",
    )
    with self._get_db() as db:
        db.execute(
            """
            INSERT INTO developer_portal_configs
            (id, name, description, theme, custom_css, custom_js, logo_url, favicon_url,
            primary_color, secondary_color, support_email, support_url, github_url,
            discord_url, api_base_url, is_active, created_at, updated_at)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            """,
            tuple(getattr(config, column) for column in columns),
        )
        db.commit()
    return config
def get_portal_config(self, config_id: str) -> DeveloperPortalConfig | None:
    """Fetch a developer portal configuration by id.

    Args:
        config_id: Value matched against ``developer_portal_configs.id``.

    Returns:
        The row converted by ``_row_to_portal_config``, or ``None`` when
        no configuration has that id.
    """
    sql = "SELECT * FROM developer_portal_configs WHERE id = ?"
    with self._get_db() as db:
        record = db.execute(sql, (config_id,)).fetchone()
        return self._row_to_portal_config(record) if record else None
def get_active_portal_config(self) -> DeveloperPortalConfig | None:
    """Fetch the active developer portal configuration.

    ``create_portal_config`` inserts every configuration with
    ``is_active = 1`` and does not deactivate earlier ones, so several
    active rows can coexist. A bare ``LIMIT 1`` would then return an
    arbitrary one; the query therefore orders by recency so the result is
    deterministic.

    Returns:
        The most recently updated active configuration (ties broken by
        ``created_at``), converted by ``_row_to_portal_config``, or
        ``None`` when no configuration is active.
    """
    with self._get_db() as conn:
        row = conn.execute(
            "SELECT * FROM developer_portal_configs WHERE is_active = 1 "
            "ORDER BY updated_at DESC, created_at DESC LIMIT 1",
        ).fetchone()
        if row:
            return self._row_to_portal_config(row)
        return None
# ==================== 辅助方法 ====================
def _row_to_sdk_release(self, row) -> SDKRelease:
    """Convert a database row into an ``SDKRelease``.

    Each listed column is read from ``row`` by name and passed to the
    constructor under the same keyword. ``language`` and ``status`` are
    wrapped in their enums and ``dependencies`` is decoded from JSON.
    """
    converters = {
        "language": SDKLanguage,
        "status": SDKStatus,
        "dependencies": json.loads,
    }
    columns = (
        "id",
        "name",
        "language",
        "version",
        "description",
        "changelog",
        "download_url",
        "documentation_url",
        "repository_url",
        "package_name",
        "status",
        "min_platform_version",
        "dependencies",
        "file_size",
        "checksum",
        "download_count",
        "created_at",
        "updated_at",
        "published_at",
        "created_by",
    )
    fields = {}
    for column in columns:
        raw = row[column]
        convert = converters.get(column)
        fields[column] = raw if convert is None else convert(raw)
    return SDKRelease(**fields)
def _row_to_sdk_version(self, row) -> SDKVersion:
    """Convert a database row into an ``SDKVersion``.

    Each listed column is read from ``row`` by name and passed to the
    constructor under the same keyword; the ``is_latest`` and ``is_lts``
    flags are coerced to ``bool``.
    """
    flag_columns = ("is_latest", "is_lts")
    columns = (
        "id",
        "sdk_id",
        "version",
        "is_latest",
        "is_lts",
        "release_notes",
        "download_url",
        "checksum",
        "file_size",
        "download_count",
        "created_at",
    )
    fields = {}
    for column in columns:
        raw = row[column]
        fields[column] = bool(raw) if column in flag_columns else raw
    return SDKVersion(**fields)
def _row_to_template(self, row) -> TemplateMarketItem:
    """Convert a database row into a ``TemplateMarketItem``.

    Each listed column is read from ``row`` by name and passed to the
    constructor under the same keyword. ``category`` and ``status`` are
    wrapped in their enums and ``tags`` is decoded from JSON.
    """
    converters = {
        "category": TemplateCategory,
        "tags": json.loads,
        "status": TemplateStatus,
    }
    columns = (
        "id",
        "name",
        "description",
        "category",
        "subcategory",
        "tags",
        "author_id",
        "author_name",
        "status",
        "price",
        "currency",
        "preview_image_url",
        "demo_url",
        "documentation_url",
        "download_url",
        "install_count",
        "rating",
        "rating_count",
        "review_count",
        "version",
        "min_platform_version",
        "file_size",
        "checksum",
        "created_at",
        "updated_at",
        "published_at",
    )
    fields = {}
    for column in columns:
        raw = row[column]
        convert = converters.get(column)
        fields[column] = raw if convert is None else convert(raw)
    return TemplateMarketItem(**fields)
def _row_to_template_review(self, row) -> TemplateReview:
    """Convert a database row into a ``TemplateReview``.

    Each listed column is read from ``row`` by name and passed to the
    constructor under the same keyword; ``is_verified_purchase`` is
    coerced to ``bool``.
    """
    columns = (
        "id",
        "template_id",
        "user_id",
        "user_name",
        "rating",
        "comment",
        "is_verified_purchase",
        "helpful_count",
        "created_at",
        "updated_at",
    )
    fields = {}
    for column in columns:
        raw = row[column]
        fields[column] = bool(raw) if column == "is_verified_purchase" else raw
    return TemplateReview(**fields)
def _row_to_plugin(self, row) -> PluginMarketItem:
    """Convert a database row into a ``PluginMarketItem``.

    Each listed column is read from ``row`` by name and passed to the
    constructor under the same keyword. ``category`` and ``status`` are
    wrapped in their enums; ``tags`` and ``permissions`` are decoded from
    JSON.
    """
    converters = {
        "category": PluginCategory,
        "tags": json.loads,
        "status": PluginStatus,
        "permissions": json.loads,
    }
    columns = (
        "id",
        "name",
        "description",
        "category",
        "tags",
        "author_id",
        "author_name",
        "status",
        "price",
        "currency",
        "pricing_model",
        "preview_image_url",
        "demo_url",
        "documentation_url",
        "repository_url",
        "download_url",
        "webhook_url",
        "permissions",
        "install_count",
        "active_install_count",
        "rating",
        "rating_count",
        "review_count",
        "version",
        "min_platform_version",
        "file_size",
        "checksum",
        "created_at",
        "updated_at",
        "published_at",
        "reviewed_by",
        "reviewed_at",
        "review_notes",
    )
    fields = {}
    for column in columns:
        raw = row[column]
        convert = converters.get(column)
        fields[column] = raw if convert is None else convert(raw)
    return PluginMarketItem(**fields)
def _row_to_plugin_review(self, row) -> PluginReview:
    """Convert a database row into a ``PluginReview``.

    Each listed column is read from ``row`` by name and passed to the
    constructor under the same keyword; ``is_verified_purchase`` is
    coerced to ``bool``.
    """
    columns = (
        "id",
        "plugin_id",
        "user_id",
        "user_name",
        "rating",
        "comment",
        "is_verified_purchase",
        "helpful_count",
        "created_at",
        "updated_at",
    )
    fields = {}
    for column in columns:
        raw = row[column]
        fields[column] = bool(raw) if column == "is_verified_purchase" else raw
    return PluginReview(**fields)
def _row_to_developer_profile(self, row) -> DeveloperProfile:
    """Convert a database row into a ``DeveloperProfile``.

    Each listed column is read from ``row`` by name and passed to the
    constructor under the same keyword. ``status`` is wrapped in
    ``DeveloperStatus`` and ``verification_documents`` is decoded from
    JSON.
    """
    converters = {
        "status": DeveloperStatus,
        "verification_documents": json.loads,
    }
    columns = (
        "id",
        "user_id",
        "display_name",
        "email",
        "bio",
        "website",
        "github_url",
        "avatar_url",
        "status",
        "verification_documents",
        "total_sales",
        "total_downloads",
        "plugin_count",
        "template_count",
        "rating_average",
        "created_at",
        "updated_at",
        "verified_at",
    )
    fields = {}
    for column in columns:
        raw = row[column]
        convert = converters.get(column)
        fields[column] = raw if convert is None else convert(raw)
    return DeveloperProfile(**fields)
def _row_to_developer_revenue(self, row) -> DeveloperRevenue:
    """Convert a database row into a ``DeveloperRevenue``.

    Each listed column is read from ``row`` by name and passed to the
    constructor unchanged under the same keyword.
    """
    columns = (
        "id",
        "developer_id",
        "item_type",
        "item_id",
        "item_name",
        "sale_amount",
        "platform_fee",
        "developer_earnings",
        "currency",
        "buyer_id",
        "transaction_id",
        "created_at",
    )
    return DeveloperRevenue(**{column: row[column] for column in columns})
def _row_to_code_example(self, row) -> CodeExample:
    """Convert a database row into a ``CodeExample``.

    Each listed column is read from ``row`` by name and passed to the
    constructor under the same keyword; ``tags`` and ``api_endpoints`` are
    decoded from JSON.
    """
    json_columns = ("tags", "api_endpoints")
    columns = (
        "id",
        "title",
        "description",
        "language",
        "category",
        "code",
        "explanation",
        "tags",
        "author_id",
        "author_name",
        "sdk_id",
        "api_endpoints",
        "view_count",
        "copy_count",
        "rating",
        "created_at",
        "updated_at",
    )
    fields = {}
    for column in columns:
        raw = row[column]
        fields[column] = json.loads(raw) if column in json_columns else raw
    return CodeExample(**fields)
def _row_to_api_documentation(self, row) -> APIDocumentation:
    """Convert a database row into an ``APIDocumentation``.

    Each listed column is read from ``row`` by name and passed to the
    constructor unchanged under the same keyword.
    """
    columns = (
        "id",
        "version",
        "openapi_spec",
        "markdown_content",
        "html_content",
        "changelog",
        "generated_at",
        "generated_by",
    )
    return APIDocumentation(**{column: row[column] for column in columns})
def _row_to_portal_config(self, row) -> DeveloperPortalConfig:
    """Convert a database row into a ``DeveloperPortalConfig``.

    Each listed column is read from ``row`` by name and passed to the
    constructor under the same keyword; ``is_active`` is coerced to
    ``bool``.
    """
    columns = (
        "id",
        "name",
        "description",
        "theme",
        "custom_css",
        "custom_js",
        "logo_url",
        "favicon_url",
        "primary_color",
        "secondary_color",
        "support_email",
        "support_url",
        "github_url",
        "discord_url",
        "api_base_url",
        "is_active",
        "created_at",
        "updated_at",
    )
    fields = {}
    for column in columns:
        raw = row[column]
        fields[column] = bool(raw) if column == "is_active" else raw
    return DeveloperPortalConfig(**fields)
# Module-level cache for the shared manager; filled on first access.
_developer_ecosystem_manager = None


def get_developer_ecosystem_manager() -> DeveloperEcosystemManager:
    """Return the shared ``DeveloperEcosystemManager``.

    The manager is constructed on the first call and the same object is
    returned by every later call.
    """
    global _developer_ecosystem_manager
    manager = _developer_ecosystem_manager
    if manager is None:
        manager = DeveloperEcosystemManager()
        _developer_ecosystem_manager = manager
    return manager