Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- **CLI and server no longer silently lose repo-local env vars.** Previously, env vars in `<repo>/.env` were parsed, deleted from `process.env` by `stripCwdEnv()`, and the only output operators saw was `[dotenv@17.3.1] injecting env (0) from .env` — which read as "file was empty." Workflows that needed `SLACK_WEBHOOK` or similar had no way to recover without knowing to use `~/.archon/.env`. The new `<cwd>/.archon/.env` path + archon-owned log lines make the load state observable and recoverable. (#1302)

- **Server startup no longer marks actively-running workflows as failed.** The `failOrphanedRuns()` call has been removed from `packages/server/src/index.ts` to match the CLI precedent (`packages/cli/src/cli.ts:256-258`). Per the new CLAUDE.md principle "No Autonomous Lifecycle Mutation Across Process Boundaries", a stuck `running` row is now transitioned explicitly by the user: via the per-row Cancel/Abandon buttons on the dashboard workflow card, or `archon workflow abandon <run-id>` from the CLI. (`archon workflow cleanup` is a separate command that deletes OLD terminal runs for disk hygiene — it does not handle stuck `running` rows.) Closes #1216.
- **`MCP server connection failed: <plugin>` noise no longer surfaces in workflow runs.** The dag-executor now loads the workflow node's `mcp:` config file once and filters the SDK's failure message to only the servers the workflow actually configured. User-level Claude plugin MCPs (e.g. `telegram` inherited from `~/.claude/`) that fail to connect in the headless subprocess are debug-logged as `dag.mcp_plugin_connection_suppressed` instead of being forwarded to the conversation. Other provider warnings (⚠️) surface unchanged. Credits @MrFadiAi for reporting the issue in #1134 (that PR was 9 days stale and conflicting; this is a fresh re-do on current `dev`).

### Changed

Expand Down
98 changes: 98 additions & 0 deletions packages/workflows/src/dag-executor.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5753,3 +5753,101 @@ describe('executeDagWorkflow -- script nodes', () => {
execSpy.mockRestore();
});
});

// ---------------------------------------------------------------------------
// MCP plugin-noise filtering helpers
// ---------------------------------------------------------------------------

describe('parseMcpFailureServerNames', () => {
it('extracts names from a well-formed message', async () => {
const { parseMcpFailureServerNames } = await import('./dag-executor');
const names = parseMcpFailureServerNames(
'MCP server connection failed: telegram (disconnected), github (timeout)'
);
expect(names).toEqual(['telegram', 'github']);
});

it('returns empty array for unrelated messages', async () => {
const { parseMcpFailureServerNames } = await import('./dag-executor');
expect(parseMcpFailureServerNames('⚠️ Something else')).toEqual([]);
expect(parseMcpFailureServerNames('')).toEqual([]);
});

it('deduplicates repeated entries', async () => {
const { parseMcpFailureServerNames } = await import('./dag-executor');
const names = parseMcpFailureServerNames(
'MCP server connection failed: foo (a), foo (b), bar (c)'
);
expect(names).toEqual(['foo', 'bar']);
});

it('handles a single entry without status parens gracefully', async () => {
const { parseMcpFailureServerNames } = await import('./dag-executor');
expect(parseMcpFailureServerNames('MCP server connection failed: solo')).toEqual(['solo']);
});

it('drops empty segments from trailing/leading commas', async () => {
const { parseMcpFailureServerNames } = await import('./dag-executor');
expect(parseMcpFailureServerNames('MCP server connection failed: a (x), , b (y)')).toEqual([
'a',
'b',
]);
});
});

describe('loadConfiguredMcpServerNames', () => {
let testDir: string;

beforeEach(async () => {
testDir = join(tmpdir(), `mcp-names-${Date.now()}-${Math.random().toString(36).slice(2)}`);
await mkdir(testDir, { recursive: true });
});

afterEach(async () => {
await rm(testDir, { recursive: true, force: true });
});

it('returns empty set when nodeMcpPath is undefined', async () => {
const { loadConfiguredMcpServerNames } = await import('./dag-executor');
const names = await loadConfiguredMcpServerNames(undefined, testDir);
expect(names.size).toBe(0);
});

it('returns server names for a valid JSON config (relative path)', async () => {
const { loadConfiguredMcpServerNames } = await import('./dag-executor');
await writeFile(
join(testDir, 'mcp.json'),
JSON.stringify({ foo: { command: 'x' }, bar: { command: 'y' } })
);
const names = await loadConfiguredMcpServerNames('mcp.json', testDir);
expect([...names].sort()).toEqual(['bar', 'foo']);
});

it('returns server names for an absolute path', async () => {
const { loadConfiguredMcpServerNames } = await import('./dag-executor');
const absolutePath = join(testDir, 'abs.json');
await writeFile(absolutePath, JSON.stringify({ baz: {} }));
const names = await loadConfiguredMcpServerNames(absolutePath, '/nonexistent/cwd');
expect([...names]).toEqual(['baz']);
});

it('returns empty set when file is missing (no crash)', async () => {
const { loadConfiguredMcpServerNames } = await import('./dag-executor');
const names = await loadConfiguredMcpServerNames('missing.json', testDir);
expect(names.size).toBe(0);
});

it('returns empty set for invalid JSON (provider surfaces its own error)', async () => {
const { loadConfiguredMcpServerNames } = await import('./dag-executor');
await writeFile(join(testDir, 'broken.json'), '{ not-json');
const names = await loadConfiguredMcpServerNames('broken.json', testDir);
expect(names.size).toBe(0);
});

it('returns empty set when JSON is an array (not an object of servers)', async () => {
const { loadConfiguredMcpServerNames } = await import('./dag-executor');
await writeFile(join(testDir, 'arr.json'), '["foo","bar"]');
const names = await loadConfiguredMcpServerNames('arr.json', testDir);
expect(names.size).toBe(0);
});
});
104 changes: 97 additions & 7 deletions packages/workflows/src/dag-executor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
* Independent nodes within the same layer run concurrently via Promise.allSettled.
* Captures all assistant output regardless of streaming mode for $node_id.output substitution.
*/
import { readFile } from 'fs/promises';
import { isAbsolute, resolve as resolvePath } from 'path';
import { execFileAsync } from '@archon/git';
import { discoverScriptsForCwd } from './script-discovery';
import type {
Expand Down Expand Up @@ -77,6 +79,54 @@ function getLog(): ReturnType<typeof createLogger> {
return cachedLog;
}

const MCP_FAILURE_PREFIX = 'MCP server connection failed: ';

/**
* Parse the SDK's "MCP server connection failed: a (status), b (status)"
* message into the list of failed server names (ordered, deduped). Best-effort
* — malformed or prefix-free messages return an empty array.
*/
export function parseMcpFailureServerNames(message: string): string[] {
if (!message.startsWith(MCP_FAILURE_PREFIX)) return [];
const seen = new Set<string>();
const names: string[] = [];
for (const entry of message.slice(MCP_FAILURE_PREFIX.length).split(', ')) {
const name = entry.split(' (')[0]?.trim();
if (name && !seen.has(name)) {
seen.add(name);
names.push(name);
}
}
return names;
}

/**
* Load the set of MCP server names that a node's `mcp:` config file declares.
*
* Returns an empty set when no `mcp:` is configured or when the file can't be
* read/parsed. Used to distinguish workflow-configured failures (surface to
* user) from user-plugin failures (silent debug log). We intentionally do not
* validate or env-expand here — the provider owns full loading and will
* surface its own parse errors via the warning channel if the file is broken.
*/
export async function loadConfiguredMcpServerNames(
nodeMcpPath: string | undefined,
cwd: string
): Promise<Set<string>> {
if (!nodeMcpPath) return new Set();
const fullPath = isAbsolute(nodeMcpPath) ? nodeMcpPath : resolvePath(cwd, nodeMcpPath);
try {
const raw = await readFile(fullPath, 'utf-8');
const parsed: unknown = JSON.parse(raw);
if (typeof parsed !== 'object' || parsed === null || Array.isArray(parsed)) {
return new Set();
}
return new Set(Object.keys(parsed as Record<string, unknown>));
} catch {
return new Set();
}
}

/** Workflow-level Claude SDK options — per-node overrides take precedence via ?? */
interface WorkflowLevelOptions {
effort?: EffortLevel;
Expand Down Expand Up @@ -488,6 +538,11 @@ async function executeNodeInternal(
const nodeStartTime = Date.now();
const nodeContext: SendMessageContext = { workflowId: workflowRun.id, nodeName: node.id };

// Pre-compute the workflow-configured MCP server names. Used by the system
// message handler below to distinguish workflow-owned MCP failures (surface
// to the user) from user-plugin noise (debug log, suppress).
const configuredMcpNames = await loadConfiguredMcpServerNames(node.mcp, cwd);

getLog().info({ nodeId: node.id, provider }, 'dag_node_started');
await logNodeStart(logDir, workflowRun.id, node.id, node.command ?? '<inline>');

Expand Down Expand Up @@ -815,13 +870,48 @@ async function executeNodeInternal(
}
break; // Result is the "I'm done" signal — don't wait for subprocess to exit
} else if (msg.type === 'system' && msg.content) {
// Forward provider warnings (⚠️) and MCP connection failures to the user.
// Providers yield system chunks for user-actionable issues (missing env vars,
// Haiku+MCP, structured output failures, etc.)
if (
msg.content.startsWith('MCP server connection failed:') ||
msg.content.startsWith('⚠️')
) {
// Providers yield system chunks for user-actionable issues (missing env
// vars, Haiku+MCP, structured output failures, etc.). Two sub-cases:
//
// 1. MCP connection failures — only surface servers the *workflow*
// configured via `mcp:`. User-level plugin MCPs (e.g. `telegram`
// inherited from `~/.claude/`) often fail to connect in the
// headless subprocess and produced spurious warnings before this
// filter landed. Plugin failures are debug-logged, not shown.
//
// 2. Other provider warnings (⚠️) — always surface; these are things
// the workflow author can act on.
if (msg.content.startsWith(MCP_FAILURE_PREFIX)) {
const failedNames = parseMcpFailureServerNames(msg.content);
const workflowFailures = failedNames.filter(n => configuredMcpNames.has(n));
const pluginFailures = failedNames.filter(n => !configuredMcpNames.has(n));

if (workflowFailures.length > 0) {
const filteredMsg = `${MCP_FAILURE_PREFIX}${workflowFailures.join(', ')}`;
getLog().warn(
{ nodeId: node.id, systemContent: filteredMsg },
'dag.provider_warning_forwarded'
);
const delivered = await safeSendMessage(
platform,
conversationId,
filteredMsg,
nodeContext
);
if (!delivered) {
getLog().error(
{ nodeId: node.id, workflowRunId: workflowRun.id },
'dag.provider_warning_delivery_failed'
);
}
}
if (pluginFailures.length > 0) {
getLog().debug(
{ nodeId: node.id, pluginFailures },
'dag.mcp_plugin_connection_suppressed'
);
}
} else if (msg.content.startsWith('⚠️')) {
getLog().warn(
{ nodeId: node.id, systemContent: msg.content },
'dag.provider_warning_forwarded'
Expand Down
Loading