diff --git a/CHANGELOG.md b/CHANGELOG.md
index 34e4d9ffab..f6af485a6c 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -33,6 +33,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 - **CLI and server no longer silently lose repo-local env vars.** Previously, env vars in `/.env` were parsed, deleted from `process.env` by `stripCwdEnv()`, and the only output operators saw was `[dotenv@17.3.1] injecting env (0) from .env` — which read as "file was empty." Workflows that needed `SLACK_WEBHOOK` or similar had no way to recover without knowing to use `~/.archon/.env`. The new `/.archon/.env` path + archon-owned log lines make the load state observable and recoverable. (#1302)
 - **Server startup no longer marks actively-running workflows as failed.** The `failOrphanedRuns()` call has been removed from `packages/server/src/index.ts` to match the CLI precedent (`packages/cli/src/cli.ts:256-258`). Per the new CLAUDE.md principle "No Autonomous Lifecycle Mutation Across Process Boundaries", a stuck `running` row is now transitioned explicitly by the user: via the per-row Cancel/Abandon buttons on the dashboard workflow card, or `archon workflow abandon <run-id>` from the CLI. (`archon workflow cleanup` is a separate command that deletes OLD terminal runs for disk hygiene — it does not handle stuck `running` rows.) Closes #1216.
+- **`MCP server connection failed: ` noise no longer surfaces in workflow runs.** The dag-executor now loads the workflow node's `mcp:` config file once and filters the SDK's failure message to only the servers the workflow actually configured. User-level Claude plugin MCPs (e.g. `telegram` inherited from `~/.claude/`) that fail to connect in the headless subprocess are debug-logged as `dag.mcp_plugin_connection_suppressed` instead of being forwarded to the conversation. Other provider warnings (⚠️) surface unchanged. Credits @MrFadiAi for reporting the issue in #1134 (that PR was 9 days stale and conflicting; this is a fresh re-do on current `dev`).
 
 ### Changed
diff --git a/packages/docs-web/src/content/docs/guides/mcp-servers.md b/packages/docs-web/src/content/docs/guides/mcp-servers.md
index 41f7f331cf..46474477e2 100644
--- a/packages/docs-web/src/content/docs/guides/mcp-servers.md
+++ b/packages/docs-web/src/content/docs/guides/mcp-servers.md
@@ -194,8 +194,9 @@ and cannot touch the filesystem or run shell commands.
 ## Connection Failure Handling
 
-MCP server connections are established when the node starts executing. If a server
-fails to connect, you'll see a message like:
+MCP server connections are established when the node starts executing. If a
+server the **workflow** configured via `mcp:` fails to connect, you'll see a
+message like:
 
 ```
 MCP server connection failed: github (failed)
@@ -204,6 +205,13 @@ MCP server connection failed: github (failed)
 
 The node continues executing but without the tools from the failed server. Check
 your config file path, server command, and environment variables if this happens.
 
+User-level Claude plugin MCPs inherited from `~/.claude/` (e.g. `telegram`,
+`notion`) routinely fail to connect inside the headless workflow subprocess
+and are **not** surfaced here — they're not actionable for the workflow author.
+They appear only in debug logs as `dag.mcp_plugin_connection_suppressed`. Run
+the CLI with `--verbose` (or set `LOG_LEVEL=debug` on the server) if you need
+to see them.
+
 ## Workflow Examples
 
 ### GitHub Issue Triage
@@ -378,6 +386,7 @@ bun run cli workflow run archon-smart-pr-review "Review PR #123"
 | `MCP config must be a JSON object` | Top-level value is array or string | Wrap in `{ "server-name": { ... } }` |
 | `undefined env vars: VAR_NAME` | Environment variable not set | Export the variable or add it to your `.env` |
 | `MCP server connection failed` | Server process crashed or URL unreachable | Check command/URL, test the server standalone |
+| Plugin MCP missing from workflow output | User-level plugin MCPs (from `~/.claude/`) are filtered out of workflow warnings | Run with `--verbose` and look for `dag.mcp_plugin_connection_suppressed` |
 | `mcp config but uses Codex` | Node resolved to Codex provider | Set `provider: claude` on the node or switch default |
 | `Haiku model with MCP servers` | Haiku doesn't support tool search | Use `model: sonnet` or `model: opus` instead |
diff --git a/packages/workflows/src/dag-executor.test.ts b/packages/workflows/src/dag-executor.test.ts
index 46f33970bd..e3fe68e3f4 100644
--- a/packages/workflows/src/dag-executor.test.ts
+++ b/packages/workflows/src/dag-executor.test.ts
@@ -5753,3 +5753,214 @@ describe('executeDagWorkflow -- script nodes', () => {
     execSpy.mockRestore();
   });
 });
+
+// ---------------------------------------------------------------------------
+// MCP plugin-noise filtering helpers
+// ---------------------------------------------------------------------------
+
+describe('parseMcpFailureServerNames', () => {
+  it('extracts entries (name + segment) from a well-formed message', async () => {
+    const { parseMcpFailureServerNames } = await import('./dag-executor');
+    const entries = parseMcpFailureServerNames(
+      'MCP server connection failed: telegram (disconnected), github (timeout)'
+    );
+    expect(entries).toEqual([
+      { name: 'telegram', segment: 'telegram (disconnected)' },
+      { name: 'github', segment: 'github (timeout)' },
+    ]);
+  });
+
+  it('returns empty array for unrelated messages', async () => {
+    const { parseMcpFailureServerNames } = await import('./dag-executor');
+    expect(parseMcpFailureServerNames('⚠️ Something else')).toEqual([]);
+    expect(parseMcpFailureServerNames('')).toEqual([]);
+  });
+
+  it('deduplicates repeated entries (first segment wins)', async () => {
+    const { parseMcpFailureServerNames } = await import('./dag-executor');
+    const entries = parseMcpFailureServerNames(
+      'MCP server connection failed: foo (a), foo (b), bar (c)'
+    );
+    expect(entries).toEqual([
+      { name: 'foo', segment: 'foo (a)' },
+      { name: 'bar', segment: 'bar (c)' },
+    ]);
+  });
+
+  it('handles a single entry without status parens gracefully', async () => {
+    const { parseMcpFailureServerNames } = await import('./dag-executor');
+    expect(parseMcpFailureServerNames('MCP server connection failed: solo')).toEqual([
+      { name: 'solo', segment: 'solo' },
+    ]);
+  });
+
+  it('drops empty segments from trailing/leading commas', async () => {
+    const { parseMcpFailureServerNames } = await import('./dag-executor');
+    expect(parseMcpFailureServerNames('MCP server connection failed: a (x), , b (y)')).toEqual([
+      { name: 'a', segment: 'a (x)' },
+      { name: 'b', segment: 'b (y)' },
+    ]);
+  });
+});
+
+describe('loadConfiguredMcpServerNames', () => {
+  let testDir: string;
+
+  beforeEach(async () => {
+    testDir = join(tmpdir(), `mcp-names-${Date.now()}-${Math.random().toString(36).slice(2)}`);
+    await mkdir(testDir, { recursive: true });
+  });
+
+  afterEach(async () => {
+    await rm(testDir, { recursive: true, force: true });
+  });
+
+  it('returns empty set when nodeMcpPath is undefined', async () => {
+    const { loadConfiguredMcpServerNames } = await import('./dag-executor');
+    const names = await loadConfiguredMcpServerNames(undefined, testDir);
+    expect(names.size).toBe(0);
+  });
+
+  it('returns server names for a valid JSON config (relative path)', async () => {
+    const { loadConfiguredMcpServerNames } = await import('./dag-executor');
+    await writeFile(
+      join(testDir, 'mcp.json'),
+      JSON.stringify({ foo: { command: 'x' }, bar: { command: 'y' } })
+    );
+    const names = await loadConfiguredMcpServerNames('mcp.json', testDir);
+    expect([...names].sort()).toEqual(['bar', 'foo']);
+  });
+
+  it('returns server names for an absolute path', async () => {
+    const { loadConfiguredMcpServerNames } = await import('./dag-executor');
+    const absolutePath = join(testDir, 'abs.json');
+    await writeFile(absolutePath, JSON.stringify({ baz: {} }));
+    const names = await loadConfiguredMcpServerNames(absolutePath, '/nonexistent/cwd');
+    expect([...names]).toEqual(['baz']);
+  });
+
+  it('returns empty set when file is missing (no crash)', async () => {
+    const { loadConfiguredMcpServerNames } = await import('./dag-executor');
+    const names = await loadConfiguredMcpServerNames('missing.json', testDir);
+    expect(names.size).toBe(0);
+  });
+
+  it('returns empty set for invalid JSON (provider surfaces its own error)', async () => {
+    const { loadConfiguredMcpServerNames } = await import('./dag-executor');
+    await writeFile(join(testDir, 'broken.json'), '{ not-json');
+    const names = await loadConfiguredMcpServerNames('broken.json', testDir);
+    expect(names.size).toBe(0);
+  });
+
+  it('returns empty set when JSON is an array (not an object of servers)', async () => {
+    const { loadConfiguredMcpServerNames } = await import('./dag-executor');
+    await writeFile(join(testDir, 'arr.json'), '["foo","bar"]');
+    const names = await loadConfiguredMcpServerNames('arr.json', testDir);
+    expect(names.size).toBe(0);
+  });
+});
+
+// ---------------------------------------------------------------------------
+// MCP plugin-noise filtering — end-to-end through executeDagWorkflow
+// ---------------------------------------------------------------------------
+
+describe('executeDagWorkflow -- MCP failure filtering', () => {
+  let testDir: string;
+
+  beforeEach(async () => {
+    testDir = join(tmpdir(), `dag-mcp-filter-${Date.now()}-${Math.random().toString(36).slice(2)}`);
+    const commandsDir = join(testDir, '.archon', 'commands');
+    await mkdir(commandsDir, { recursive: true });
+    await writeFile(join(commandsDir, 'my-cmd.md'), 'cmd prompt');
+
+    mockSendQueryDag.mockClear();
+    mockGetAgentProviderDag.mockClear();
+  });
+
+  afterEach(async () => {
+    mockGetAgentProviderDag.mockImplementation(() => ({
+      sendQuery: mockSendQueryDag,
+      getType: () => 'claude',
+      getCapabilities: mockClaudeCapabilities,
+    }));
+    try {
+      await rm(testDir, { recursive: true, force: true });
+    } catch {
+      // ignore cleanup errors
+    }
+  });
+
+  async function runWithSystemChunk(
+    systemContent: string,
+    nodeMcpPath?: string
+  ): Promise<IWorkflowPlatform> {
+    mockSendQueryDag.mockImplementation(function* () {
+      yield { type: 'system', content: systemContent };
+      yield { type: 'assistant', content: 'ok' };
+      yield { type: 'result', sessionId: 'sess' };
+    });
+
+    const platform = createMockPlatform();
+    await executeDagWorkflow(
+      createMockDeps(),
+      platform,
+      'conv-mcp-filter',
+      testDir,
+      {
+        name: 'mcp-filter-test',
+        nodes: [{ id: 'review', command: 'my-cmd', ...(nodeMcpPath ? { mcp: nodeMcpPath } : {}) }],
+      },
+      makeWorkflowRun(),
+      'claude',
+      undefined,
+      join(testDir, 'artifacts'),
+      join(testDir, 'logs'),
+      'main',
+      'docs/',
+      minimalConfig
+    );
+    return platform;
+  }
+
+  function mcpMessages(platform: IWorkflowPlatform): string[] {
+    const calls = (platform.sendMessage as Mock).mock.calls;
+    return calls
+      .map(c => c[1] as string)
+      .filter(m => m.startsWith('MCP server connection failed:') || m.startsWith('⚠️'));
+  }
+
+  it('forwards only workflow-configured failures and preserves status detail', async () => {
+    await writeFile(join(testDir, 'mcp.json'), JSON.stringify({ 'workflow-server': {} }));
+    const platform = await runWithSystemChunk(
+      'MCP server connection failed: workflow-server (timeout), telegram (disconnected)',
+      'mcp.json'
+    );
+
+    const sent = mcpMessages(platform);
+    expect(sent).toEqual(['MCP server connection failed: workflow-server (timeout)']);
+  });
+
+  it('suppresses MCP message entirely when all failures are user plugins', async () => {
+    await writeFile(join(testDir, 'mcp.json'), JSON.stringify({ 'workflow-server': {} }));
+    const platform = await runWithSystemChunk(
+      'MCP server connection failed: telegram (disconnected), notion (timeout)',
+      'mcp.json'
+    );
+
+    expect(mcpMessages(platform)).toEqual([]);
+  });
+
+  it('suppresses everything when node has no mcp: config (all failures are plugin noise)', async () => {
+    const platform = await runWithSystemChunk(
+      'MCP server connection failed: telegram (disconnected)'
+    );
+
+    expect(mcpMessages(platform)).toEqual([]);
+  });
+
+  it('forwards ⚠️ provider warnings verbatim', async () => {
+    const platform = await runWithSystemChunk('⚠️ Haiku does not support MCP');
+
+    expect(mcpMessages(platform)).toEqual(['⚠️ Haiku does not support MCP']);
+  });
+});
diff --git a/packages/workflows/src/dag-executor.ts b/packages/workflows/src/dag-executor.ts
index 63e4d6cafd..5c0b82ce31 100644
--- a/packages/workflows/src/dag-executor.ts
+++ b/packages/workflows/src/dag-executor.ts
@@ -5,6 +5,8 @@
  * Independent nodes within the same layer run concurrently via Promise.allSettled.
  * Captures all assistant output regardless of streaming mode for $node_id.output substitution.
  */
+import { readFile } from 'fs/promises';
+import { isAbsolute, resolve as resolvePath } from 'path';
 import { execFileAsync } from '@archon/git';
 import { discoverScriptsForCwd } from './script-discovery';
 import type {
@@ -77,6 +79,69 @@ function getLog(): ReturnType {
   return cachedLog;
 }
 
+const MCP_FAILURE_PREFIX = 'MCP server connection failed: ';
+
+/** A failed MCP server entry parsed from the SDK message. `segment` is the
+ * original substring (e.g. `"telegram (disconnected)"`) so callers can
+ * reconstruct a filtered message without losing the status detail. */
+export interface McpFailureEntry {
+  name: string;
+  segment: string;
+}
+
+/**
+ * Parse the SDK's "MCP server connection failed: a (status), b (status)"
+ * message. Best-effort — malformed or prefix-free messages return `[]`.
+ * Entries are ordered and deduped by name; the segment of the first
+ * occurrence wins.
+ */
+export function parseMcpFailureServerNames(message: string): McpFailureEntry[] {
+  if (!message.startsWith(MCP_FAILURE_PREFIX)) return [];
+  const seen = new Set<string>();
+  const entries: McpFailureEntry[] = [];
+  for (const raw of message.slice(MCP_FAILURE_PREFIX.length).split(', ')) {
+    const segment = raw.trim();
+    const name = segment.split(' (')[0]?.trim();
+    if (name && !seen.has(name)) {
+      seen.add(name);
+      entries.push({ name, segment });
+    }
+  }
+  return entries;
+}
+
+/**
+ * Load the set of MCP server names that a node's `mcp:` config file declares.
+ *
+ * Returns an empty set when no `mcp:` is configured or when the file can't be
+ * read/parsed. Used to distinguish workflow-configured failures (surface to
+ * user) from user-plugin failures (silent debug log). We intentionally do not
+ * validate or env-expand here — the provider owns full loading and will
+ * surface its own parse errors via the warning channel if the file is broken.
+ *
+ * Read failures are debug-logged so a transient I/O error (EMFILE/EBUSY) that
+ * leaves us with an empty set — and silently reclassifies a real workflow-MCP
+ * failure as plugin noise — is at least observable.
+ */
+export async function loadConfiguredMcpServerNames(
+  nodeMcpPath: string | undefined,
+  cwd: string
+): Promise<Set<string>> {
+  if (!nodeMcpPath) return new Set();
+  const fullPath = isAbsolute(nodeMcpPath) ? nodeMcpPath : resolvePath(cwd, nodeMcpPath);
+  try {
+    const raw = await readFile(fullPath, 'utf-8');
+    const parsed: unknown = JSON.parse(raw);
+    if (typeof parsed !== 'object' || parsed === null || Array.isArray(parsed)) {
+      return new Set();
+    }
+    return new Set(Object.keys(parsed as Record<string, unknown>));
+  } catch (err) {
+    getLog().debug({ err, nodeMcpPath, fullPath }, 'dag.mcp_filter_config_read_failed');
+    return new Set();
+  }
+}
+
 /** Workflow-level Claude SDK options — per-node overrides take precedence via ?? */
 interface WorkflowLevelOptions {
   effort?: EffortLevel;
@@ -488,6 +553,8 @@ async function executeNodeInternal(
   const nodeStartTime = Date.now();
   const nodeContext: SendMessageContext = { workflowId: workflowRun.id, nodeName: node.id };
 
+  const configuredMcpNames = await loadConfiguredMcpServerNames(node.mcp, cwd);
+
   getLog().info({ nodeId: node.id, provider }, 'dag_node_started');
   await logNodeStart(logDir, workflowRun.id, node.id, node.command ?? '');
@@ -815,13 +882,43 @@ async function executeNodeInternal(
       }
       break; // Result is the "I'm done" signal — don't wait for subprocess to exit
     } else if (msg.type === 'system' && msg.content) {
-      // Forward provider warnings (⚠️) and MCP connection failures to the user.
-      // Providers yield system chunks for user-actionable issues (missing env vars,
-      // Haiku+MCP, structured output failures, etc.)
-      if (
-        msg.content.startsWith('MCP server connection failed:') ||
-        msg.content.startsWith('⚠️')
-      ) {
+      // Providers yield system chunks for user-actionable issues (missing env
+      // vars, Haiku+MCP, structured output failures, etc.). MCP-failure
+      // chunks need filtering: user-level plugin MCPs inherited from
+      // `~/.claude/` (e.g. `telegram`) routinely fail to connect inside the
+      // headless subprocess and aren't actionable for the workflow author.
+      // Other warnings (⚠️) are always actionable and surface verbatim.
+      if (msg.content.startsWith(MCP_FAILURE_PREFIX)) {
+        const failedEntries = parseMcpFailureServerNames(msg.content);
+        const workflowFailures = failedEntries.filter(e => configuredMcpNames.has(e.name));
+        const pluginFailures = failedEntries.filter(e => !configuredMcpNames.has(e.name));
+
+        if (workflowFailures.length > 0) {
+          const filteredMsg = `${MCP_FAILURE_PREFIX}${workflowFailures.map(e => e.segment).join(', ')}`;
+          getLog().warn(
+            { nodeId: node.id, systemContent: filteredMsg },
+            'dag.provider_warning_forwarded'
+          );
+          const delivered = await safeSendMessage(
+            platform,
+            conversationId,
+            filteredMsg,
+            nodeContext
+          );
+          if (!delivered) {
+            getLog().error(
+              { nodeId: node.id, workflowRunId: workflowRun.id },
+              'dag.provider_warning_delivery_failed'
+            );
+          }
+        }
+        if (pluginFailures.length > 0) {
+          getLog().debug(
+            { nodeId: node.id, pluginFailures: pluginFailures.map(e => e.name) },
+            'dag.mcp_plugin_connection_suppressed'
+          );
+        }
+      } else if (msg.content.startsWith('⚠️')) {
         getLog().warn(
           { nodeId: node.id, systemContent: msg.content },
           'dag.provider_warning_forwarded'
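For reviewers, the parse-and-filter behavior the patch adds can be sketched as a self-contained snippet. `parseMcpFailureServerNames` mirrors the helper in the diff; `filterFailures` is an illustrative wrapper (not present in the patch) that combines the two filter steps the dag-executor performs inline:

```typescript
// Sketch of the plugin-noise filtering introduced above. The SDK message
// prefix and parsing rules match the patch; filterFailures is hypothetical.
const MCP_FAILURE_PREFIX = 'MCP server connection failed: ';

interface McpFailureEntry {
  name: string;    // server name, e.g. "telegram"
  segment: string; // original substring, e.g. "telegram (disconnected)"
}

function parseMcpFailureServerNames(message: string): McpFailureEntry[] {
  if (!message.startsWith(MCP_FAILURE_PREFIX)) return [];
  const seen = new Set<string>();
  const entries: McpFailureEntry[] = [];
  for (const raw of message.slice(MCP_FAILURE_PREFIX.length).split(', ')) {
    const segment = raw.trim();
    const name = segment.split(' (')[0]?.trim();
    // Dedupe by name; the first segment for a repeated name wins.
    if (name && !seen.has(name)) {
      seen.add(name);
      entries.push({ name, segment });
    }
  }
  return entries;
}

// Keep only failures for servers the workflow's mcp: config declares.
// Returns null when every failure is user-plugin noise (nothing to forward).
function filterFailures(message: string, configured: Set<string>): string | null {
  const workflow = parseMcpFailureServerNames(message).filter(e => configured.has(e.name));
  return workflow.length > 0
    ? MCP_FAILURE_PREFIX + workflow.map(e => e.segment).join(', ')
    : null;
}
```

Given `'MCP server connection failed: workflow-server (timeout), telegram (disconnected)'` and a configured set containing only `workflow-server`, this forwards `'MCP server connection failed: workflow-server (timeout)'`; with only plugin failures it returns `null`, matching the suppression cases in the new tests.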