Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions api/commands/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

from .account import create_tenant, reset_email, reset_password
from .plugin import (
backfill_plugin_auto_upgrade,
extract_plugins,
extract_unique_plugins,
install_plugins,
Expand Down Expand Up @@ -37,6 +38,7 @@
__all__ = [
"add_qdrant_index",
"archive_workflow_runs",
"backfill_plugin_auto_upgrade",
"clean_expired_messages",
"clean_workflow_runs",
"cleanup_orphaned_draft_variables",
Expand Down
109 changes: 108 additions & 1 deletion api/commands/plugin.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
import json
import logging
import time
from typing import Any, cast

import click
from pydantic import TypeAdapter
from sqlalchemy import delete, select
from sqlalchemy import delete, func, select
from sqlalchemy.engine import CursorResult

from configs import dify_config
Expand All @@ -14,11 +15,13 @@
from core.tools.utils.system_encryption import encrypt_system_params
from extensions.ext_database import db
from models import Tenant
from models.account import TenantPluginAutoUpgradeStrategy
from models.oauth import DatasourceOauthParamConfig, DatasourceProvider
from models.provider_ids import DatasourceProviderID, ToolProviderID
from models.source import DataSourceApiKeyAuthBinding, DataSourceOauthBinding
from models.tools import ToolOAuthSystemClient
from services.plugin.data_migration import PluginDataMigration
from services.plugin.plugin_auto_upgrade_service import PluginAutoUpgradeService
from services.plugin.plugin_migration import PluginMigration
from services.plugin.plugin_service import PluginService

Expand Down Expand Up @@ -402,6 +405,110 @@ def migrate_data_for_plugin():
click.echo(click.style("Migrate data for plugin completed.", fg="green"))


def _candidate_auto_upgrade_strategy_tenant_ids_stmt(limit: int | None = None):
category_count = len(TenantPluginAutoUpgradeStrategy.PluginCategory)
stmt = (
select(TenantPluginAutoUpgradeStrategy.tenant_id)
.group_by(TenantPluginAutoUpgradeStrategy.tenant_id)
.having(func.count(func.distinct(TenantPluginAutoUpgradeStrategy.category)) < category_count)
.order_by(TenantPluginAutoUpgradeStrategy.tenant_id)
)

if limit is not None:
stmt = stmt.limit(limit)

return stmt


def _count_auto_upgrade_strategy_tenant_ids(limit: int | None) -> int:
candidate_stmt = _candidate_auto_upgrade_strategy_tenant_ids_stmt(limit).subquery()
return db.session.scalar(select(func.count()).select_from(candidate_stmt)) or 0


def _iter_auto_upgrade_strategy_tenant_ids(limit: int | None):
stmt = _candidate_auto_upgrade_strategy_tenant_ids_stmt(limit).execution_options(yield_per=1000)
yield from db.session.scalars(stmt)


@click.command(
"backfill-plugin-auto-upgrade",
help="Backfill category-scoped plugin auto-upgrade strategies and normalize plugin lists.",
)
@click.option("--tenant-id", multiple=True, help="Tenant ID to backfill. Can be passed multiple times.")
@click.option("--limit", type=int, default=None, help="Maximum number of candidate tenants to process.")
@click.option("--batch-size", type=int, default=500, show_default=True, help="Progress reporting batch size.")
@click.option("--dry-run", is_flag=True, help="Only print candidate tenant count.")
def backfill_plugin_auto_upgrade(
tenant_id: tuple[str, ...],
limit: int | None,
batch_size: int,
dry_run: bool,
):
"""
Backfill historical auto-upgrade strategies after the category column exists.

Missing category rows are created from the tenant's tool/default row. Pure default
strategies become latest for model plugins and fix-only for all other categories.
Tenants with include/exclude plugin IDs are split
by installed plugin category using plugin daemon metadata.
"""
start_at = time.perf_counter()
candidate_count = len(tenant_id) if tenant_id else _count_auto_upgrade_strategy_tenant_ids(limit)
click.echo(click.style(f"Found {candidate_count} candidate tenants.", fg="yellow"))

if dry_run:
elapsed = time.perf_counter() - start_at
click.echo(click.style(f"Dry run completed. elapsed={elapsed:.2f}s", fg="green"))
return

tenant_ids = list(tenant_id) if tenant_id else _iter_auto_upgrade_strategy_tenant_ids(limit)

backfilled_count = 0
created_count = 0
normalized_count = 0
skipped_count = 0
failed_count = 0
for index, current_tenant_id in enumerate(tenant_ids, start=1):
try:
result = PluginAutoUpgradeService.backfill_strategy_categories(
current_tenant_id,
)
except Exception as e:
failed_count += 1
click.echo(click.style(f"Failed tenant {current_tenant_id}: {str(e)}", fg="red"))
continue

if result.created_count > 0:
backfilled_count += 1
created_count += result.created_count
elif not result.normalized:
skipped_count += 1
if result.normalized:
normalized_count += 1

if batch_size > 0 and index % batch_size == 0:
click.echo(
click.style(
f"Processed {index}/{candidate_count} tenants. "
f"backfilled={backfilled_count}, created_rows={created_count}, "
f"normalized={normalized_count}, skipped={skipped_count}, failed={failed_count}, "
f"elapsed={time.perf_counter() - start_at:.2f}s",
fg="yellow",
)
)

elapsed = time.perf_counter() - start_at
click.echo(
click.style(
f"Backfill plugin auto-upgrade strategy categories completed. "
f"backfilled={backfilled_count}, created_rows={created_count}, "
f"normalized={normalized_count}, skipped={skipped_count}, failed={failed_count}, "
f"elapsed={elapsed:.2f}s",
fg="green",
)
)


@click.command("extract-plugins", help="Extract plugins.")
@click.option("--output_file", prompt=True, help="The file to store the extracted plugins.", default="plugins.jsonl")
@click.option("--workers", prompt=True, help="The number of workers to extract plugins.", default=10)
Expand Down
Loading
Loading