Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion knip.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"$schema": "https://unpkg.com/knip@5/schema.json",
"tags": ["-@public"],
"entry": ["src/agent/worker.ts"],
"entry": ["src/agent/editorWorker.ts", "src/agent/runViewWorker.ts"],
"ignore": [
"src/api/**",
"src/components/ui/**",
Expand Down
66 changes: 66 additions & 0 deletions src/agent/agents/dispatcherRuntime.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/**
* Shared dispatcher runtime for the in-browser AI assistant.
*
* Owns the per-thread `MemorySession` map and the `invoke` loop. The
* concrete dispatcher Agent is built per turn by a page-specific
* `buildAgent` (Editor vs Run View) because the underlying sub-agents
* close over the per-turn `AgentSession` (bridge, recent runs, status
* emitter).
*/
import { type Agent, MemorySession, run } from "@openai/agents";

import type { AiProviderConfig } from "@/types/aiProvider";

import type { AgentSession } from "../session";

interface DispatcherInvokeParams {
message: string;
threadId: string;
aiConfig: AiProviderConfig;
session: AgentSession;
}

interface DispatcherInvokeResult {
answer: string;
threadId: string;
}

export interface TangleDispatcher {
invoke(params: DispatcherInvokeParams): Promise<DispatcherInvokeResult>;
dispose(): void;
}

export type BuildDispatcherAgent = (session: AgentSession) => Promise<Agent>;

export function createDispatcherRuntime(
buildAgent: BuildDispatcherAgent,
): TangleDispatcher {
const sessions = new Map<string, MemorySession>();

function getOrCreateSessionMemory(threadId: string): MemorySession {
const existing = sessions.get(threadId);
if (existing) return existing;
const created = new MemorySession({ sessionId: threadId });
sessions.set(threadId, created);
return created;
}

return {
async invoke(params) {
params.session.proxyClient.ensureConfigured(params.aiConfig);
const sessionMemory = getOrCreateSessionMemory(params.threadId);
const agent = await buildAgent(params.session);
const result = await run(agent, params.message, {
session: sessionMemory,
});
const answer =
typeof result.finalOutput === "string"
? result.finalOutput
: JSON.stringify(result.finalOutput ?? "");
return { answer, threadId: params.threadId };
},
dispose() {
sessions.clear();
},
};
}
Original file line number Diff line number Diff line change
@@ -1,49 +1,34 @@
/**
* Top-level dispatcher agent for the in-browser AI assistant.
* Editor dispatcher agent for the in-browser AI assistant.
*
* The dispatcher is the only top-level agent in the system. It owns
* orchestration: it never edits the spec or fetches runs directly,
* instead it calls specialist sub-agents that are exposed as *tools*
* via the `Agent.asTool(...)` adapter. The dispatcher's own LLM loop
* is what chains those tool calls together for multi-step requests
* (e.g. "investigate AND fix" needs `ask_debug_assistant` followed by
* The dispatcher is the only top-level agent in the Editor worker. It
* owns orchestration: it never edits the spec or fetches runs directly,
* instead it calls specialist sub-agents that are exposed as *tools* via
* the `Agent.asTool(...)` adapter. The dispatcher's own LLM loop is what
* chains those tool calls together for multi-step requests (e.g.
* "investigate AND fix" needs `ask_debug_assistant` followed by
* `ask_pipeline_repair`).
*
* The dispatcher Agent and its specialist tool wrappers are rebuilt
* on every turn because the underlying sub-agents close over the
* per-turn `AgentSession` (bridge, recent runs, status emitter).
* The dispatcher Agent and its specialist tool wrappers are rebuilt on
* every turn because the underlying sub-agents close over the per-turn
* `AgentSession` (bridge, recent runs, status emitter).
*/
import { Agent, MemorySession, run } from "@openai/agents";

import type { AiProviderConfig } from "@/types/aiProvider";
import { Agent } from "@openai/agents";

import { getAgentModelConfig } from "../config";
import { attachObservabilityHooks } from "../middleware/observability";
import dispatcherPrompt from "../prompts/dispatcher.md?raw";
import type { AgentSession } from "../session";
import {
createDispatcherRuntime,
type TangleDispatcher,
} from "./dispatcherRuntime";
import { createDebugAssistantAgent } from "./subagents/debugAssistant";
import { createGeneralHelpAgent } from "./subagents/generalHelp";
import { createPipelineArchitectAgent } from "./subagents/pipelineArchitect";
import { createPipelineRepairAgent } from "./subagents/pipelineRepair";

interface DispatcherInvokeParams {
message: string;
threadId: string;
aiConfig: AiProviderConfig;
session: AgentSession;
}

interface DispatcherInvokeResult {
answer: string;
threadId: string;
}

export interface TangleDispatcher {
invoke(params: DispatcherInvokeParams): Promise<DispatcherInvokeResult>;
dispose(): void;
}

async function createDispatcherAgent(session: AgentSession): Promise<Agent> {
async function buildEditorAgent(session: AgentSession): Promise<Agent> {
const generalHelp = createGeneralHelpAgent(session);
const pipelineRepair = createPipelineRepairAgent(session);
const pipelineArchitect = await createPipelineArchitectAgent(session);
Expand Down Expand Up @@ -80,33 +65,6 @@ async function createDispatcherAgent(session: AgentSession): Promise<Agent> {
return agent;
}

export function createDispatcher(): TangleDispatcher {
const sessions = new Map<string, MemorySession>();

function getOrCreateSessionMemory(threadId: string): MemorySession {
const existing = sessions.get(threadId);
if (existing) return existing;
const created = new MemorySession({ sessionId: threadId });
sessions.set(threadId, created);
return created;
}

return {
async invoke(params) {
params.session.proxyClient.ensureConfigured(params.aiConfig);
const sessionMemory = getOrCreateSessionMemory(params.threadId);
const agent = await createDispatcherAgent(params.session);
const result = await run(agent, params.message, {
session: sessionMemory,
});
const answer =
typeof result.finalOutput === "string"
? result.finalOutput
: JSON.stringify(result.finalOutput ?? "");
return { answer, threadId: params.threadId };
},
dispose() {
sessions.clear();
},
};
export function createEditorDispatcher(): TangleDispatcher {
return createDispatcherRuntime(buildEditorAgent);
}
68 changes: 68 additions & 0 deletions src/agent/agents/runViewDispatcher.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/**
* Run View dispatcher agent for the in-browser AI assistant.
*
* Read-only by design: the Run View worker inspects a completed (or
* running) pipeline run, so the dispatcher only exposes the non-mutating
* specialists — `ask_general_help` and `ask_debug_assistant`. The active
* run is baked into the instructions from `session.context` so the agent
* is immediately aware of which run it works with, without an extra tool
* call or the user restating the run id.
*
* The dispatcher Agent and its specialist tool wrappers are rebuilt on
* every turn because the underlying sub-agents close over the per-turn
* `AgentSession` (bridge, recent runs, status emitter).
*/
import { Agent } from "@openai/agents";

import { getAgentModelConfig } from "../config";
import { attachObservabilityHooks } from "../middleware/observability";
import runViewDispatcherPrompt from "../prompts/runViewDispatcher.md?raw";
import type { AgentSession } from "../session";
import type { AgentContext } from "../types";
import {
createDispatcherRuntime,
type TangleDispatcher,
} from "./dispatcherRuntime";
import { createDebugAssistantAgent } from "./subagents/debugAssistant";
import { createGeneralHelpAgent } from "./subagents/generalHelp";

function formatCurrentRunSection(context: AgentContext): string {
if (context.mode !== "runView") {
return "## Current run\n\nNo run context available.";
}
const subgraph = context.subgraphExecutionId
? `\n- subgraph execution: ${context.subgraphExecutionId}`
: "";
return `## Current run\n\nThe user is viewing run ${context.runId}. Treat this as "the current run" / "this run" unless they name a different run id.${subgraph}`;
}

async function buildRunViewAgent(session: AgentSession): Promise<Agent> {
const generalHelp = createGeneralHelpAgent(session);
const debugAssistant = createDebugAssistantAgent(session);

const instructions = `${runViewDispatcherPrompt}\n\n${formatCurrentRunSection(session.context)}`;

const agent = new Agent({
name: "tangle-run-view-dispatcher",
...getAgentModelConfig(session.aiConfig),
instructions,
tools: [
generalHelp.asTool({
toolName: "ask_general_help",
toolDescription:
"Ask the general-help specialist a question about Tangle concepts, features, how things work, best practices, getting started, or documentation lookups. Input: the user's question phrased as a clear, standalone question.",
}),
debugAssistant.asTool({
toolName: "ask_debug_assistant",
toolDescription:
"Ask the debug-assistant specialist to inspect or explain a pipeline run from execution details, container state, and logs. Read-only — cannot edit the spec or submit runs. Input: a clear question that names the run id, e.g. 'Explain what run 12345 did and its outcome.' or 'Why did run 12345 fail?'.",
}),
],
});
attachObservabilityHooks(agent, session.emitStatus);
return agent;
}

export function createRunViewDispatcher(): TangleDispatcher {
return createDispatcherRuntime(buildRunViewAgent);
}
46 changes: 23 additions & 23 deletions src/agent/worker.ts → src/agent/createWorkerApi.ts
Original file line number Diff line number Diff line change
@@ -1,37 +1,34 @@
/**
* Web Worker entry point for the in-browser agent.
* Shared Web Worker API factory for the in-browser agent.
*
* A placeholder `ask()` that echoes the user's message — it
* proves the bundling, lazy-spawn, and Comlink round-trip are working
* end-to-end before we wire the LLM and the tool bridge.
* Both the Editor and Run View workers reuse this factory; they differ
* only in the {@link TangleDispatcher} they are built with. The page that
* spawns the worker bakes its {@link AgentContext} into `init()` so the
* dispatcher (and its sub-agents) are immediately aware of where they run
* and which run they inspect.
*/
// Must come first: installs the `globalThis.process` stub that
// `@openai/agents-core` needs before any SDK module is evaluated.
import "./processPolyfill";

import * as Comlink from "comlink";

import type { AiProviderConfig } from "@/types/aiProvider";

import {
createDispatcher,
type TangleDispatcher,
} from "./agents/tangleDispatcher";
import type { TangleDispatcher } from "./agents/dispatcherRuntime";
import { ProxyClient } from "./config";
import { createSession, type RecentPipelineRun } from "./session";
import { SkillsLoader } from "./skills/loader";
import type { ToolBridgeApi } from "./toolBridgeApi";
import type { AgentResponse, StatusCallback } from "./types";
import type { AgentContext, AgentResponse, StatusCallback } from "./types";

export interface AskParams {
interface AskParams {
message: string;
threadId?: string;
recentRuns?: RecentPipelineRun[];
aiConfig: AiProviderConfig;
}

export interface AgentWorkerApi {
init(bridge: ToolBridgeApi, onStatus: StatusCallback): void;
init(
bridge: ToolBridgeApi,
onStatus: StatusCallback,
context: AgentContext,
): void;
ping(): Promise<"pong">;
ask(params: AskParams, signal?: AbortSignal): Promise<AgentResponse>;
}
Expand All @@ -40,9 +37,12 @@ function generateThreadId(): string {
return `thread-${Date.now()}-${Math.random().toString(36).slice(2, 8)}`;
}

function createWorkerApi(): AgentWorkerApi {
export function createWorkerApi(
createDispatcher: () => TangleDispatcher,
): AgentWorkerApi {
let dispatcher: TangleDispatcher | null = null;
let bridge: ToolBridgeApi | null = null;
let context: AgentContext | null = null;
let emitStatus: StatusCallback = () => {};
const proxyClient = new ProxyClient();
const skillsLoader = new SkillsLoader();
Expand All @@ -52,12 +52,13 @@ function createWorkerApi(): AgentWorkerApi {
* Initialization entry point. Called once by the main thread
* immediately after spawning the worker.
*/
init(toolBridge, onStatus) {
init(toolBridge, onStatus, agentContext) {
// Dispose any prior dispatcher (detaches its observability listeners)
// so a fresh onStatus fully replaces the old one on re-init.
dispatcher?.dispose();
bridge = toolBridge;
emitStatus = onStatus;
context = agentContext;
dispatcher = createDispatcher();
},

Expand All @@ -68,7 +69,7 @@ function createWorkerApi(): AgentWorkerApi {
async ask({ message, threadId, recentRuns, aiConfig }, _signal) {
// todo: add logic to handle the signal

if (!dispatcher || !bridge) {
if (!dispatcher || !bridge || !context) {
throw new Error(
"Agent worker not initialized. Call init() before ask().",
);
Expand All @@ -82,6 +83,7 @@ function createWorkerApi(): AgentWorkerApi {
skillsLoader,
aiConfig,
recentRuns,
context,
});

const result = await dispatcher.invoke({
Expand All @@ -92,7 +94,7 @@ function createWorkerApi(): AgentWorkerApi {
});

// TODO: populate from session.componentReferences once the
// search_components tool is wired (PR 4+).
// search_components tool is wired.
const componentReferences: AgentResponse["componentReferences"] = {};

return {
Expand All @@ -103,5 +105,3 @@ function createWorkerApi(): AgentWorkerApi {
},
};
}

Comlink.expose(createWorkerApi());
17 changes: 17 additions & 0 deletions src/agent/editorWorker.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
/**
* Web Worker entry point for the Editor AI assistant.
*
* Spawns the full Editor dispatcher (general help, pipeline repair,
* pipeline architect, debug assistant). The page owns the factory that
* creates this worker; see `createEditorAgentWorker`.
*/
// Must come first: installs the `globalThis.process` stub that
// `@openai/agents-core` needs before any SDK module is evaluated.
import "./processPolyfill";

import * as Comlink from "comlink";

import { createEditorDispatcher } from "./agents/editorDispatcher";
import { createWorkerApi } from "./createWorkerApi";

Comlink.expose(createWorkerApi(createEditorDispatcher));
Loading
Loading