Skip to content
Draft
1 change: 1 addition & 0 deletions api/clients/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
"""External service client packages."""
74 changes: 74 additions & 0 deletions api/clients/agent_backend/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
"""API-side integration boundary for the Dify Agent backend.

Public wire DTOs come from ``dify_agent.protocol``. This package only contains
API adapters: request building from Dify product concepts, a thin client wrapper,
event adaptation for future workflow integration, and deterministic fakes.
"""

from clients.agent_backend.client import AgentBackendRunClient, DifyAgentBackendRunClient
from clients.agent_backend.errors import (
AgentBackendError,
AgentBackendHTTPError,
AgentBackendRequestBuildError,
AgentBackendRunFailedError,
AgentBackendStreamError,
AgentBackendTransportError,
AgentBackendValidationError,
)
from clients.agent_backend.event_adapter import (
AgentBackendInternalEvent,
AgentBackendInternalEventType,
AgentBackendRunCancelledInternalEvent,
AgentBackendRunEventAdapter,
AgentBackendRunFailedInternalEvent,
AgentBackendRunPausedInternalEvent,
AgentBackendRunStartedInternalEvent,
AgentBackendRunSucceededInternalEvent,
AgentBackendStreamInternalEvent,
)
from clients.agent_backend.factory import create_agent_backend_run_client
from clients.agent_backend.fake_client import FakeAgentBackendRunClient, FakeAgentBackendScenario
from clients.agent_backend.request_builder import (
AGENT_SOUL_PROMPT_LAYER_ID,
DIFY_PLUGIN_CONTEXT_LAYER_ID,
WORKFLOW_NODE_JOB_PROMPT_LAYER_ID,
WORKFLOW_USER_PROMPT_LAYER_ID,
AgentBackendModelConfig,
AgentBackendOutputConfig,
AgentBackendRunRequestBuilder,
AgentBackendWorkflowNodeRunInput,
redact_for_agent_backend_log,
)

__all__ = [
"AGENT_SOUL_PROMPT_LAYER_ID",
"DIFY_PLUGIN_CONTEXT_LAYER_ID",
"WORKFLOW_NODE_JOB_PROMPT_LAYER_ID",
"WORKFLOW_USER_PROMPT_LAYER_ID",
"AgentBackendError",
"AgentBackendHTTPError",
"AgentBackendInternalEvent",
"AgentBackendInternalEventType",
"AgentBackendModelConfig",
"AgentBackendOutputConfig",
"AgentBackendRequestBuildError",
"AgentBackendRunCancelledInternalEvent",
"AgentBackendRunClient",
"AgentBackendRunEventAdapter",
"AgentBackendRunFailedError",
"AgentBackendRunFailedInternalEvent",
"AgentBackendRunPausedInternalEvent",
"AgentBackendRunRequestBuilder",
"AgentBackendRunStartedInternalEvent",
"AgentBackendRunSucceededInternalEvent",
"AgentBackendStreamError",
"AgentBackendStreamInternalEvent",
"AgentBackendTransportError",
"AgentBackendValidationError",
"AgentBackendWorkflowNodeRunInput",
"DifyAgentBackendRunClient",
"FakeAgentBackendRunClient",
"FakeAgentBackendScenario",
"create_agent_backend_run_client",
"redact_for_agent_backend_log",
]
126 changes: 126 additions & 0 deletions api/clients/agent_backend/client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
"""Synchronous API-side wrapper around the public ``dify-agent`` client.

``dify-agent`` owns the cross-service DTOs and HTTP/SSE implementation. The API
backend keeps this thin wrapper so workflow code depends on a local protocol,
gets API-native errors, and can use a deterministic fake in tests without
creating another wire contract.
"""

from __future__ import annotations

from collections.abc import Iterator
from typing import Protocol

from dify_agent.client import (
DifyAgentClientError,
DifyAgentHTTPError,
DifyAgentStreamError,
DifyAgentTimeoutError,
DifyAgentValidationError,
)
from dify_agent.protocol import (
CancelRunRequest,
CancelRunResponse,
CreateRunRequest,
CreateRunResponse,
RunEvent,
RunStatusResponse,
)

from clients.agent_backend.errors import (
AgentBackendError,
AgentBackendHTTPError,
AgentBackendStreamError,
AgentBackendTransportError,
AgentBackendValidationError,
)


class AgentBackendRunClient(Protocol):
"""Local boundary used by API workflow integrations to run Agent backend jobs."""

def create_run(self, request: CreateRunRequest) -> CreateRunResponse:
"""Create one Agent backend run and return its accepted status."""

def cancel_run(self, run_id: str, request: CancelRunRequest | None = None) -> CancelRunResponse:
"""Request explicit cancellation for one Agent backend run."""

def stream_events(self, run_id: str, *, after: str | None = None) -> Iterator[RunEvent]:
"""Yield public ``dify-agent`` run events in stream order."""

def wait_run(self, run_id: str, *, timeout_seconds: float | None = None) -> RunStatusResponse:
"""Wait for a run to reach a terminal status and return that status."""


class _DifyAgentSyncClient(Protocol):
"""Subset of ``dify_agent.client.Client`` used by the API wrapper."""

def create_run_sync(self, request: CreateRunRequest) -> CreateRunResponse:
"""Create one run synchronously."""

def cancel_run_sync(self, run_id: str, request: CancelRunRequest | None = None) -> CancelRunResponse:
"""Cancel one run synchronously."""

def stream_events_sync(self, run_id: str, *, after: str | None = None) -> Iterator[RunEvent]:
"""Stream run events synchronously."""

def wait_run_sync(self, run_id: str, *, timeout_seconds: float | None = None) -> RunStatusResponse:
"""Wait for terminal run status synchronously."""


class DifyAgentBackendRunClient:
"""Adapter from API sync call sites to ``dify_agent.client.Client`` sync methods."""

client: _DifyAgentSyncClient

def __init__(self, client: _DifyAgentSyncClient) -> None:
self.client = client

def create_run(self, request: CreateRunRequest) -> CreateRunResponse:
"""Create one run through ``POST /runs`` and normalize client exceptions."""
try:
return self.client.create_run_sync(request)
except Exception as exc:
raise _normalize_dify_agent_error(exc) from exc

def cancel_run(self, run_id: str, request: CancelRunRequest | None = None) -> CancelRunResponse:
"""Cancel one run through ``POST /runs/{run_id}/cancel`` and normalize exceptions."""
try:
return self.client.cancel_run_sync(run_id, request=request)
except Exception as exc:
raise _normalize_dify_agent_error(exc) from exc

def stream_events(self, run_id: str, *, after: str | None = None) -> Iterator[RunEvent]:
"""Stream run events from ``/events/sse`` with the wrapped client's reconnect policy."""
try:
yield from self.client.stream_events_sync(run_id, after=after)
except Exception as exc:
raise _normalize_dify_agent_error(exc) from exc

def wait_run(self, run_id: str, *, timeout_seconds: float | None = None) -> RunStatusResponse:
"""Poll run status until terminal state and normalize client exceptions."""
try:
return self.client.wait_run_sync(run_id, timeout_seconds=timeout_seconds)
except Exception as exc:
raise _normalize_dify_agent_error(exc) from exc


def _normalize_dify_agent_error(exc: Exception) -> AgentBackendError:
"""Map public ``dify-agent`` client errors to API-side integration errors."""
if isinstance(exc, DifyAgentValidationError):
return AgentBackendValidationError("Agent backend request or response validation failed", detail=exc.detail)
if isinstance(exc, DifyAgentHTTPError):
return AgentBackendHTTPError(
f"Agent backend HTTP {exc.status_code}",
status_code=exc.status_code,
detail=exc.detail,
)
if isinstance(exc, DifyAgentTimeoutError):
return AgentBackendTransportError(str(exc))
if isinstance(exc, DifyAgentStreamError):
return AgentBackendStreamError(str(exc))
if isinstance(exc, DifyAgentClientError):
return AgentBackendTransportError(str(exc))
if isinstance(exc, AgentBackendError):
return exc
return AgentBackendTransportError(str(exc) or type(exc).__name__)
61 changes: 61 additions & 0 deletions api/clients/agent_backend/errors.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
"""API-side errors for the Dify Agent backend integration.

The wire protocol and low-level HTTP behaviour are owned by ``dify-agent``.
This module only normalizes those client errors into the API backend's boundary
so workflow/node code does not depend directly on transport-specific exception
classes.
"""

from __future__ import annotations

from typing import Any


class AgentBackendError(Exception):
"""Base error for API-side Agent backend integration failures."""


class AgentBackendRequestBuildError(AgentBackendError):
"""Raised when Dify product/workflow state cannot be mapped to a run request."""


class AgentBackendTransportError(AgentBackendError):
"""Raised for timeout or request-level failures talking to Agent backend."""


class AgentBackendHTTPError(AgentBackendTransportError):
"""Raised for Agent backend HTTP errors after status/detail normalization."""

status_code: int
detail: object

def __init__(self, message: str, *, status_code: int, detail: object) -> None:
self.status_code = status_code
self.detail = detail
super().__init__(message)


class AgentBackendValidationError(AgentBackendError):
"""Raised for local request validation or Agent backend 422 responses."""

detail: object

def __init__(self, message: str, *, detail: object) -> None:
self.detail = detail
super().__init__(message)


class AgentBackendStreamError(AgentBackendError):
"""Raised when an Agent backend event stream is malformed or exhausted."""


class AgentBackendRunFailedError(AgentBackendError):
"""Raised by callers that choose to translate a terminal failed run into an exception."""

run_id: str
detail: Any

def __init__(self, run_id: str, detail: Any) -> None:
self.run_id = run_id
self.detail = detail
super().__init__(f"Agent backend run failed: {run_id}")
Loading
Loading