Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- **CLI and server no longer silently lose repo-local env vars.** Previously, env vars in `<repo>/.env` were parsed, deleted from `process.env` by `stripCwdEnv()`, and the only output operators saw was `[dotenv@17.3.1] injecting env (0) from .env` — which read as "file was empty." Workflows that needed `SLACK_WEBHOOK` or similar had no way to recover without knowing to use `~/.archon/.env`. The new `<cwd>/.archon/.env` path + archon-owned log lines make the load state observable and recoverable. (#1302)

- **Server startup no longer marks actively-running workflows as failed.** The `failOrphanedRuns()` call has been removed from `packages/server/src/index.ts` to match the CLI precedent (`packages/cli/src/cli.ts:256-258`). Per the new CLAUDE.md principle "No Autonomous Lifecycle Mutation Across Process Boundaries", a stuck `running` row is now transitioned explicitly by the user: via the per-row Cancel/Abandon buttons on the dashboard workflow card, or `archon workflow abandon <run-id>` from the CLI. (`archon workflow cleanup` is a separate command that deletes OLD terminal runs for disk hygiene — it does not handle stuck `running` rows.) Closes #1216.
- **`MCP server connection failed: <plugin>` noise no longer surfaces in workflow runs.** The dag-executor now loads the workflow node's `mcp:` config file once and filters the SDK's failure message to only the servers the workflow actually configured. User-level Claude plugin MCPs (e.g. `telegram` inherited from `~/.claude/`) that fail to connect in the headless subprocess are debug-logged as `dag.mcp_plugin_connection_suppressed` instead of being forwarded to the conversation. Other provider warnings (⚠️) surface unchanged. Credits @MrFadiAi for reporting the issue in #1134 (that PR was 9 days stale and conflicting; this is a fresh re-do on current `dev`).

### Changed

Expand Down
13 changes: 11 additions & 2 deletions packages/docs-web/src/content/docs/guides/mcp-servers.md
Original file line number Diff line number Diff line change
Expand Up @@ -194,8 +194,9 @@ and cannot touch the filesystem or run shell commands.

## Connection Failure Handling

MCP server connections are established when the node starts executing. If a server
fails to connect, you'll see a message like:
MCP server connections are established when the node starts executing. If a
server the **workflow** configured via `mcp:` fails to connect, you'll see a
message like:

```
MCP server connection failed: github (failed)
Expand All @@ -204,6 +205,13 @@ MCP server connection failed: github (failed)
The node continues executing but without the tools from the failed server.
Check your config file path, server command, and environment variables if this happens.

User-level Claude plugin MCPs inherited from `~/.claude/` (e.g. `telegram`,
`notion`) routinely fail to connect inside the headless workflow subprocess
and are **not** surfaced here — they're not actionable for the workflow author.
They appear only in debug logs as `dag.mcp_plugin_connection_suppressed`. Run
the CLI with `--verbose` (or set `LOG_LEVEL=debug` on the server) if you need
to see them.

## Workflow Examples

### GitHub Issue Triage
Expand Down Expand Up @@ -378,6 +386,7 @@ bun run cli workflow run archon-smart-pr-review "Review PR #123"
| `MCP config must be a JSON object` | Top-level value is array or string | Wrap in `{ "server-name": { ... } }` |
| `undefined env vars: VAR_NAME` | Environment variable not set | Export the variable or add it to your `.env` |
| `MCP server connection failed` | Server process crashed or URL unreachable | Check command/URL, test the server standalone |
| Plugin MCP missing from workflow output | User-level plugin MCPs (from `~/.claude/`) are filtered out of workflow warnings | Run with `--verbose` and look for `dag.mcp_plugin_connection_suppressed` |
| `mcp config but uses Codex` | Node resolved to Codex provider | Set `provider: claude` on the node or switch default |
| `Haiku model with MCP servers` | Haiku doesn't support tool search | Use `model: sonnet` or `model: opus` instead |

Expand Down
211 changes: 211 additions & 0 deletions packages/workflows/src/dag-executor.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5753,3 +5753,214 @@ describe('executeDagWorkflow -- script nodes', () => {
execSpy.mockRestore();
});
});

// ---------------------------------------------------------------------------
// MCP plugin-noise filtering helpers
// ---------------------------------------------------------------------------

describe('parseMcpFailureServerNames', () => {
it('extracts entries (name + segment) from a well-formed message', async () => {
const { parseMcpFailureServerNames } = await import('./dag-executor');
const entries = parseMcpFailureServerNames(
'MCP server connection failed: telegram (disconnected), github (timeout)'
);
expect(entries).toEqual([
{ name: 'telegram', segment: 'telegram (disconnected)' },
{ name: 'github', segment: 'github (timeout)' },
]);
});

it('returns empty array for unrelated messages', async () => {
const { parseMcpFailureServerNames } = await import('./dag-executor');
expect(parseMcpFailureServerNames('⚠️ Something else')).toEqual([]);
expect(parseMcpFailureServerNames('')).toEqual([]);
});

it('deduplicates repeated entries (first segment wins)', async () => {
const { parseMcpFailureServerNames } = await import('./dag-executor');
const entries = parseMcpFailureServerNames(
'MCP server connection failed: foo (a), foo (b), bar (c)'
);
expect(entries).toEqual([
{ name: 'foo', segment: 'foo (a)' },
{ name: 'bar', segment: 'bar (c)' },
]);
});

it('handles a single entry without status parens gracefully', async () => {
const { parseMcpFailureServerNames } = await import('./dag-executor');
expect(parseMcpFailureServerNames('MCP server connection failed: solo')).toEqual([
{ name: 'solo', segment: 'solo' },
]);
});

it('drops empty segments from trailing/leading commas', async () => {
const { parseMcpFailureServerNames } = await import('./dag-executor');
expect(parseMcpFailureServerNames('MCP server connection failed: a (x), , b (y)')).toEqual([
{ name: 'a', segment: 'a (x)' },
{ name: 'b', segment: 'b (y)' },
]);
});
});

describe('loadConfiguredMcpServerNames', () => {
let testDir: string;

beforeEach(async () => {
testDir = join(tmpdir(), `mcp-names-${Date.now()}-${Math.random().toString(36).slice(2)}`);
await mkdir(testDir, { recursive: true });
});

afterEach(async () => {
await rm(testDir, { recursive: true, force: true });
});

it('returns empty set when nodeMcpPath is undefined', async () => {
const { loadConfiguredMcpServerNames } = await import('./dag-executor');
const names = await loadConfiguredMcpServerNames(undefined, testDir);
expect(names.size).toBe(0);
});

it('returns server names for a valid JSON config (relative path)', async () => {
const { loadConfiguredMcpServerNames } = await import('./dag-executor');
await writeFile(
join(testDir, 'mcp.json'),
JSON.stringify({ foo: { command: 'x' }, bar: { command: 'y' } })
);
const names = await loadConfiguredMcpServerNames('mcp.json', testDir);
expect([...names].sort()).toEqual(['bar', 'foo']);
});

it('returns server names for an absolute path', async () => {
const { loadConfiguredMcpServerNames } = await import('./dag-executor');
const absolutePath = join(testDir, 'abs.json');
await writeFile(absolutePath, JSON.stringify({ baz: {} }));
const names = await loadConfiguredMcpServerNames(absolutePath, '/nonexistent/cwd');
expect([...names]).toEqual(['baz']);
});

it('returns empty set when file is missing (no crash)', async () => {
const { loadConfiguredMcpServerNames } = await import('./dag-executor');
const names = await loadConfiguredMcpServerNames('missing.json', testDir);
expect(names.size).toBe(0);
});

it('returns empty set for invalid JSON (provider surfaces its own error)', async () => {
const { loadConfiguredMcpServerNames } = await import('./dag-executor');
await writeFile(join(testDir, 'broken.json'), '{ not-json');
const names = await loadConfiguredMcpServerNames('broken.json', testDir);
expect(names.size).toBe(0);
});

it('returns empty set when JSON is an array (not an object of servers)', async () => {
const { loadConfiguredMcpServerNames } = await import('./dag-executor');
await writeFile(join(testDir, 'arr.json'), '["foo","bar"]');
const names = await loadConfiguredMcpServerNames('arr.json', testDir);
expect(names.size).toBe(0);
});
});

// ---------------------------------------------------------------------------
// MCP plugin-noise filtering — end-to-end through executeDagWorkflow
// ---------------------------------------------------------------------------

describe('executeDagWorkflow -- MCP failure filtering', () => {
let testDir: string;

beforeEach(async () => {
testDir = join(tmpdir(), `dag-mcp-filter-${Date.now()}-${Math.random().toString(36).slice(2)}`);
const commandsDir = join(testDir, '.archon', 'commands');
await mkdir(commandsDir, { recursive: true });
await writeFile(join(commandsDir, 'my-cmd.md'), 'cmd prompt');

mockSendQueryDag.mockClear();
mockGetAgentProviderDag.mockClear();
});

afterEach(async () => {
mockGetAgentProviderDag.mockImplementation(() => ({
sendQuery: mockSendQueryDag,
getType: () => 'claude',
getCapabilities: mockClaudeCapabilities,
}));
try {
await rm(testDir, { recursive: true, force: true });
} catch {
// ignore cleanup errors
}
});

async function runWithSystemChunk(
systemContent: string,
nodeMcpPath?: string
): Promise<IWorkflowPlatform> {
mockSendQueryDag.mockImplementation(function* () {
yield { type: 'system', content: systemContent };
yield { type: 'assistant', content: 'ok' };
yield { type: 'result', sessionId: 'sess' };
});

const platform = createMockPlatform();
await executeDagWorkflow(
createMockDeps(),
platform,
'conv-mcp-filter',
testDir,
{
name: 'mcp-filter-test',
nodes: [{ id: 'review', command: 'my-cmd', ...(nodeMcpPath ? { mcp: nodeMcpPath } : {}) }],
},
makeWorkflowRun(),
'claude',
undefined,
join(testDir, 'artifacts'),
join(testDir, 'logs'),
'main',
'docs/',
minimalConfig
);
return platform;
}

function mcpMessages(platform: IWorkflowPlatform): string[] {
const calls = (platform.sendMessage as Mock<typeof platform.sendMessage>).mock.calls;
return calls
.map(c => c[1] as string)
.filter(m => m.startsWith('MCP server connection failed:') || m.startsWith('⚠️'));
}

it('forwards only workflow-configured failures and preserves status detail', async () => {
await writeFile(join(testDir, 'mcp.json'), JSON.stringify({ 'workflow-server': {} }));
const platform = await runWithSystemChunk(
'MCP server connection failed: workflow-server (timeout), telegram (disconnected)',
'mcp.json'
);

const sent = mcpMessages(platform);
expect(sent).toEqual(['MCP server connection failed: workflow-server (timeout)']);
});

it('suppresses MCP message entirely when all failures are user plugins', async () => {
await writeFile(join(testDir, 'mcp.json'), JSON.stringify({ 'workflow-server': {} }));
const platform = await runWithSystemChunk(
'MCP server connection failed: telegram (disconnected), notion (timeout)',
'mcp.json'
);

expect(mcpMessages(platform)).toEqual([]);
});

it('suppresses everything when node has no mcp: config (all failures are plugin noise)', async () => {
const platform = await runWithSystemChunk(
'MCP server connection failed: telegram (disconnected)'
);

expect(mcpMessages(platform)).toEqual([]);
});

it('forwards ⚠️ provider warnings verbatim', async () => {
const platform = await runWithSystemChunk('⚠️ Haiku does not support MCP');

expect(mcpMessages(platform)).toEqual(['⚠️ Haiku does not support MCP']);
});
});
111 changes: 104 additions & 7 deletions packages/workflows/src/dag-executor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
* Independent nodes within the same layer run concurrently via Promise.allSettled.
* Captures all assistant output regardless of streaming mode for $node_id.output substitution.
*/
import { readFile } from 'fs/promises';
import { isAbsolute, resolve as resolvePath } from 'path';
import { execFileAsync } from '@archon/git';
import { discoverScriptsForCwd } from './script-discovery';
import type {
Expand Down Expand Up @@ -77,6 +79,69 @@ function getLog(): ReturnType<typeof createLogger> {
return cachedLog;
}

const MCP_FAILURE_PREFIX = 'MCP server connection failed: ';

/** A failed MCP server entry parsed from the SDK message. `segment` is the
* original substring (e.g. `"telegram (disconnected)"`) so callers can
* reconstruct a filtered message without losing the status detail. */
export interface McpFailureEntry {
name: string;
segment: string;
}

/**
* Parse the SDK's "MCP server connection failed: a (status), b (status)"
* message. Best-effort — malformed or prefix-free messages return `[]`.
* Entries are ordered and deduped by name; the segment of the first
* occurrence wins.
*/
export function parseMcpFailureServerNames(message: string): McpFailureEntry[] {
if (!message.startsWith(MCP_FAILURE_PREFIX)) return [];
const seen = new Set<string>();
const entries: McpFailureEntry[] = [];
for (const raw of message.slice(MCP_FAILURE_PREFIX.length).split(', ')) {
const segment = raw.trim();
const name = segment.split(' (')[0]?.trim();
if (name && !seen.has(name)) {
seen.add(name);
entries.push({ name, segment });
}
}
return entries;
}

/**
* Load the set of MCP server names that a node's `mcp:` config file declares.
*
* Returns an empty set when no `mcp:` is configured or when the file can't be
* read/parsed. Used to distinguish workflow-configured failures (surface to
* user) from user-plugin failures (silent debug log). We intentionally do not
* validate or env-expand here — the provider owns full loading and will
* surface its own parse errors via the warning channel if the file is broken.
*
* Read failures are debug-logged so a transient I/O error (EMFILE/EBUSY) that
* leaves us with an empty set — and silently reclassifies a real workflow-MCP
* failure as plugin noise — is at least observable.
*/
export async function loadConfiguredMcpServerNames(
nodeMcpPath: string | undefined,
cwd: string
): Promise<Set<string>> {
if (!nodeMcpPath) return new Set();
const fullPath = isAbsolute(nodeMcpPath) ? nodeMcpPath : resolvePath(cwd, nodeMcpPath);
try {
const raw = await readFile(fullPath, 'utf-8');
const parsed: unknown = JSON.parse(raw);
if (typeof parsed !== 'object' || parsed === null || Array.isArray(parsed)) {
return new Set();
}
return new Set(Object.keys(parsed as Record<string, unknown>));
} catch (err) {
getLog().debug({ err, nodeMcpPath, fullPath }, 'dag.mcp_filter_config_read_failed');
return new Set();
}
}

/** Workflow-level Claude SDK options — per-node overrides take precedence via ?? */
interface WorkflowLevelOptions {
effort?: EffortLevel;
Expand Down Expand Up @@ -488,6 +553,8 @@ async function executeNodeInternal(
const nodeStartTime = Date.now();
const nodeContext: SendMessageContext = { workflowId: workflowRun.id, nodeName: node.id };

const configuredMcpNames = await loadConfiguredMcpServerNames(node.mcp, cwd);

getLog().info({ nodeId: node.id, provider }, 'dag_node_started');
await logNodeStart(logDir, workflowRun.id, node.id, node.command ?? '<inline>');

Expand Down Expand Up @@ -815,13 +882,43 @@ async function executeNodeInternal(
}
break; // Result is the "I'm done" signal — don't wait for subprocess to exit
} else if (msg.type === 'system' && msg.content) {
// Forward provider warnings (⚠️) and MCP connection failures to the user.
// Providers yield system chunks for user-actionable issues (missing env vars,
// Haiku+MCP, structured output failures, etc.)
if (
msg.content.startsWith('MCP server connection failed:') ||
msg.content.startsWith('⚠️')
) {
// Providers yield system chunks for user-actionable issues (missing env
// vars, Haiku+MCP, structured output failures, etc.). MCP-failure
// chunks need filtering: user-level plugin MCPs inherited from
// `~/.claude/` (e.g. `telegram`) routinely fail to connect inside the
// headless subprocess and aren't actionable for the workflow author.
// Other warnings (⚠️) are always actionable and surface verbatim.
if (msg.content.startsWith(MCP_FAILURE_PREFIX)) {
const failedEntries = parseMcpFailureServerNames(msg.content);
const workflowFailures = failedEntries.filter(e => configuredMcpNames.has(e.name));
const pluginFailures = failedEntries.filter(e => !configuredMcpNames.has(e.name));

if (workflowFailures.length > 0) {
const filteredMsg = `${MCP_FAILURE_PREFIX}${workflowFailures.map(e => e.segment).join(', ')}`;
getLog().warn(
{ nodeId: node.id, systemContent: filteredMsg },
'dag.provider_warning_forwarded'
);
const delivered = await safeSendMessage(
platform,
conversationId,
filteredMsg,
nodeContext
);
if (!delivered) {
getLog().error(
{ nodeId: node.id, workflowRunId: workflowRun.id },
'dag.provider_warning_delivery_failed'
);
}
}
if (pluginFailures.length > 0) {
getLog().debug(
{ nodeId: node.id, pluginFailures: pluginFailures.map(e => e.name) },
'dag.mcp_plugin_connection_suppressed'
);
}
} else if (msg.content.startsWith('⚠️')) {
getLog().warn(
{ nodeId: node.id, systemContent: msg.content },
'dag.provider_warning_forwarded'
Expand Down
Loading