From f2a7e78c5efadaf8d32bed09a1a1ed0e119ae29e Mon Sep 17 00:00:00 2001 From: JJ Kasper Date: Thu, 9 Apr 2026 15:03:31 -0700 Subject: [PATCH 01/17] fix(builders): improve hook graph detection in transformed bundles --- .changeset/fix-hook-graph-extractor.md | 8 ++ .../builders/src/workflows-extractor.test.ts | 77 ++++++++++++++++++ packages/builders/src/workflows-extractor.ts | 78 +++++++++++++++++-- 3 files changed, 157 insertions(+), 6 deletions(-) create mode 100644 .changeset/fix-hook-graph-extractor.md create mode 100644 packages/builders/src/workflows-extractor.test.ts diff --git a/.changeset/fix-hook-graph-extractor.md b/.changeset/fix-hook-graph-extractor.md new file mode 100644 index 0000000000..fa8674b0e7 --- /dev/null +++ b/.changeset/fix-hook-graph-extractor.md @@ -0,0 +1,8 @@ +--- +"@workflow/builders": patch +--- + +Fix workflow graph hook detection for transformed bundles + +- Recognize step declarations with `@__PURE__` annotations in `WORKFLOW_USE_STEP` access +- Detect `createHook`/`createWebhook` calls wrapped by transpiled `using` helper calls diff --git a/packages/builders/src/workflows-extractor.test.ts b/packages/builders/src/workflows-extractor.test.ts new file mode 100644 index 0000000000..7a1e45911a --- /dev/null +++ b/packages/builders/src/workflows-extractor.test.ts @@ -0,0 +1,77 @@ +import { mkdtemp, rm, writeFile } from 'node:fs/promises'; +import { tmpdir } from 'node:os'; +import { join } from 'node:path'; +import { afterEach, describe, expect, it } from 'vitest'; +import { extractWorkflowGraphs } from './workflows-extractor.js'; + +async function createWorkflowBundleFile( + workflowCode: string +): Promise<{ filePath: string; tempDir: string }> { + const tempDir = await mkdtemp(join(tmpdir(), 'workflow-extractor-')); + const filePath = join(tempDir, 'route.js'); + const escapedWorkflowCode = workflowCode.replace(/[\\`$]/g, '\\$&'); + const bundleCode = `const workflowCode = \`${escapedWorkflowCode}\`; +export const POST = workflowCode;`; + await writeFile(filePath, bundleCode, 'utf8'); + return { filePath, tempDir }; +} + +describe('workflows-extractor', () => { + const tempDirs: string[] = []; + + afterEach(async () => { + await Promise.all( + tempDirs + .splice(0) + .map((tempDir) => rm(tempDir, { recursive: true, force: true })) + ); + }); + + it('detects step declarations that include @__PURE__ annotations', async () => { + const workflowCode = ` +var stepWithPure = globalThis[/* @__PURE__ */ Symbol.for("WORKFLOW_USE_STEP")]("step//./workflows/demo//stepWithPure"); +async function demo() { + const value = await stepWithPure(); + return value; +} +demo.workflowId = "workflow//./workflows/demo//demo"; +globalThis.__private_workflows.set("workflow//./workflows/demo//demo", demo); +`; + const { filePath, tempDir } = await createWorkflowBundleFile(workflowCode); + tempDirs.push(tempDir); + + const graphs = await extractWorkflowGraphs(filePath); + const demoGraph = graphs['./workflows/demo']?.demo?.graph; + const labels = (demoGraph?.nodes || []).map((node) => node.data.label); + + expect(labels).toContain('stepWithPure'); + }); + + it('detects createHook wrapped by transpiled using helpers', async () => { + const workflowCode = ` +var stepA = globalThis[/* @__PURE__ */ Symbol.for("WORKFLOW_USE_STEP")]("step//./workflows/hooks//stepA"); +function _ts_add_disposable_resource(_env, value, _isAsync) { + return value; +} +async function withHook() { + const env = { stack: [] }; + const responseId = await stepA(); + const hook = _ts_add_disposable_resource(env, createHook({ token: 'hook:' + responseId }), false); + const payload = await hook; + return payload; +} +withHook.workflowId = "workflow//./workflows/hooks//withHook"; +globalThis.__private_workflows.set("workflow//./workflows/hooks//withHook", withHook); +`; + const { filePath, tempDir } = await createWorkflowBundleFile(workflowCode); + tempDirs.push(tempDir); + + const graphs = await extractWorkflowGraphs(filePath); + const hookGraph = graphs['./workflows/hooks']?.withHook?.graph; + const labels = (hookGraph?.nodes || []).map((node) => node.data.label); + + expect(labels).toEqual( + expect.arrayContaining(['stepA', 'createHook', 'awaitWebhook']) + ); + }); +}); diff --git a/packages/builders/src/workflows-extractor.ts b/packages/builders/src/workflows-extractor.ts index 4379ed1744..e058b7a439 100644 --- a/packages/builders/src/workflows-extractor.ts +++ b/packages/builders/src/workflows-extractor.ts @@ -24,6 +24,55 @@ import { parseSync } from '@swc/core'; * pauses or wait points in the workflow execution. */ const WORKFLOW_PRIMITIVES = new Set(['sleep', 'createHook', 'createWebhook']); +const HOOK_PRIMITIVES = new Set(['createHook', 'createWebhook']); + +function getIdentifierCallExpressionName( + callExpr: CallExpression +): string | null { + return callExpr.callee.type === 'Identifier' + ? (callExpr.callee as Identifier).value + : null; +} + +function findHookPrimitiveCallName(expr: Expression): string | null { + if (expr.type === 'ParenthesisExpression') { + return findHookPrimitiveCallName((expr as any).expression); + } + + if (expr.type === 'SequenceExpression') { + const sequence = expr as any; + for (const expression of sequence.expressions || []) { + const match = findHookPrimitiveCallName(expression as Expression); + if (match) { + return match; + } + } + return null; + } + + if (expr.type !== 'CallExpression') { + return null; + } + + const directCallName = getIdentifierCallExpressionName(expr); + if (directCallName && HOOK_PRIMITIVES.has(directCallName)) { + return directCallName; + } + + for (const arg of expr.arguments || []) { + if (!arg.expression) { + continue; + } + const nestedCallName = findHookPrimitiveCallName( + arg.expression as Expression + ); + if (nestedCallName) { + return nestedCallName; + } + } + + return null; +} /** * Extract the original function name from a stepId. @@ -298,7 +347,7 @@ function extractStepDeclarations( const stepDeclarations = new Map(); const stepPattern = - /var (\w+) = globalThis\[Symbol\.for\("WORKFLOW_USE_STEP"\)\]\("([^"]+)"\)/g; + /(?:var|let|const)\s+(\w+)\s*=\s*globalThis\[\s*(?:\/\*\s*@__PURE__\s*\*\/\s*)?Symbol\.for\("WORKFLOW_USE_STEP"\)\s*\]\("([^"]+)"\)/g; const lines = bundleCode.split('\n'); for (const line of lines) { @@ -743,12 +792,10 @@ function analyzeStatement( // Track webhook/hook variable assignments: const webhook = createWebhook() if ( decl.id.type === 'Identifier' && - decl.init.type === 'CallExpression' && - (decl.init as CallExpression).callee.type === 'Identifier' + decl.init.type === 'CallExpression' ) { - const funcName = ((decl.init as CallExpression).callee as Identifier) - .value; - if (funcName === 'createWebhook' || funcName === 'createHook') { + const funcName = findHookPrimitiveCallName(decl.init as Expression); + if (funcName && HOOK_PRIMITIVES.has(funcName)) { context.webhookVariables.add((decl.id as Identifier).value); } } @@ -1614,6 +1661,25 @@ function analyzeExpression( nodes.push(node); entryNodeIds.push(nodeId); exitNodeIds.push(nodeId); + } else if (WORKFLOW_PRIMITIVES.has(funcName)) { + const nodeId = `node_${context.nodeCounter++}`; + const metadata: NodeMetadata = {}; + if (context.inConditional) { + metadata.conditionalId = context.inConditional; + } + const node: ManifestNode = { + id: nodeId, + type: 'primitive', + data: { + label: funcName, + nodeKind: 'primitive', + }, + metadata: + Object.keys(metadata).length > 0 ? metadata : undefined, + }; + nodes.push(node); + entryNodeIds.push(nodeId); + exitNodeIds.push(nodeId); } } } From c4631a7d4d05f8d91a43b5068ce3300e52d3843f Mon Sep 17 00:00:00 2001 From: JJ Kasper Date: Thu, 9 Apr 2026 15:14:17 -0700 Subject: [PATCH 02/17] fix(builders): traverse try/finally in workflow graph extraction --- .../builders/src/workflows-extractor.test.ts | 15 +++-- packages/builders/src/workflows-extractor.ts | 61 +++++++++++++++++++ 2 files changed, 71 insertions(+), 5 deletions(-) diff --git a/packages/builders/src/workflows-extractor.test.ts b/packages/builders/src/workflows-extractor.test.ts index 7a1e45911a..7e38438fca 100644 --- a/packages/builders/src/workflows-extractor.test.ts +++ b/packages/builders/src/workflows-extractor.test.ts @@ -47,18 +47,23 @@ globalThis.__private_workflows.set("workflow//./workflows/demo//demo", demo); expect(labels).toContain('stepWithPure'); }); - it('detects createHook wrapped by transpiled using helpers', async () => { + it('detects createHook wrapped by transpiled using helpers inside try/finally', async () => { const workflowCode = ` var stepA = globalThis[/* @__PURE__ */ Symbol.for("WORKFLOW_USE_STEP")]("step//./workflows/hooks//stepA"); function _ts_add_disposable_resource(_env, value, _isAsync) { return value; } +function _ts_dispose_resources(_env) {} async function withHook() { const env = { stack: [] }; - const responseId = await stepA(); - const hook = _ts_add_disposable_resource(env, createHook({ token: 'hook:' + responseId }), false); - const payload = await hook; - return payload; + try { + const responseId = await stepA(); + const hook = _ts_add_disposable_resource(env, createHook({ token: 'hook:' + responseId }), false); + const payload = await hook; + return payload; + } finally { + _ts_dispose_resources(env); + } } withHook.workflowId = "workflow//./workflows/hooks//withHook"; globalThis.__private_workflows.set("workflow//./workflows/hooks//withHook", withHook); diff --git a/packages/builders/src/workflows-extractor.ts b/packages/builders/src/workflows-extractor.ts index e058b7a439..ac96034425 100644 --- a/packages/builders/src/workflows-extractor.ts +++ b/packages/builders/src/workflows-extractor.ts @@ -1167,6 +1167,67 @@ function analyzeStatement( exitNodeIds = blockResult.exitNodeIds; } + if (stmt.type === 'TryStatement') { + const tryStmt = stmt as any; + const tryResult = analyzeBlock( + tryStmt.block?.stmts || [], + stepDeclarations, + context, + functionMap, + variableMap + ); + nodes.push(...tryResult.nodes); + edges.push(...tryResult.edges); + entryNodeIds = tryResult.entryNodeIds; + exitNodeIds = tryResult.exitNodeIds; + + if (tryStmt.handler?.body?.stmts) { + const catchResult = analyzeBlock( + tryStmt.handler.body.stmts, + stepDeclarations, + context, + functionMap, + variableMap + ); + nodes.push(...catchResult.nodes); + edges.push(...catchResult.edges); + if (entryNodeIds.length === 0) { + entryNodeIds = catchResult.entryNodeIds; + } + exitNodeIds = Array.from( + new Set([...exitNodeIds, ...catchResult.exitNodeIds]) + ); + } + + if (tryStmt.finalizer?.stmts) { + const finalizerResult = analyzeBlock( + tryStmt.finalizer.stmts, + stepDeclarations, + context, + functionMap, + variableMap + ); + nodes.push(...finalizerResult.nodes); + edges.push(...finalizerResult.edges); + if (entryNodeIds.length === 0) { + entryNodeIds = finalizerResult.entryNodeIds; + } + if (finalizerResult.entryNodeIds.length > 0) { + for (const exitId of exitNodeIds) { + for (const entryId of finalizerResult.entryNodeIds) { + edges.push({ + id: `e_${exitId}_${entryId}`, + source: exitId, + target: entryId, + type: 'default', + }); + } + } + exitNodeIds = finalizerResult.exitNodeIds; + } + } + } + if (stmt.type === 'ReturnStatement' && (stmt as any).argument) { const result = analyzeExpression( (stmt as any).argument, From 342a31a6b28dc924b862953991cdfc0ae46e2712 Mon Sep 17 00:00:00 2001 From: JJ Kasper Date: Fri, 10 Apr 2026 11:02:43 -0700 Subject: [PATCH 03/17] fix(builders): detect hook.create patterns in workflow graphs --- .../builders/src/workflows-extractor.test.ts | 46 +++ packages/builders/src/workflows-extractor.ts | 297 ++++++++++++++---- 2 files changed, 289 insertions(+), 54 deletions(-) diff --git a/packages/builders/src/workflows-extractor.test.ts b/packages/builders/src/workflows-extractor.test.ts index 7e38438fca..44372e68e6 100644 --- a/packages/builders/src/workflows-extractor.test.ts +++ b/packages/builders/src/workflows-extractor.test.ts @@ -79,4 +79,50 @@ globalThis.__private_workflows.set("workflow//./workflows/hooks//withHook", with expect.arrayContaining(['stepA', 'createHook', 'awaitWebhook']) ); }); + + it('detects hook.create calls used directly inside Promise.race', async () => { + const workflowCode = ` +async function waitForAuthWorkflow() { + const session = await Promise.race([ + authCompleteHook.create({ token: 'auth:demo' }), + sleep('1h').then(() => null), + ]); + return session; +} +waitForAuthWorkflow.workflowId = "workflow//./workflows/hooks//waitForAuthWorkflow"; +globalThis.__private_workflows.set("workflow//./workflows/hooks//waitForAuthWorkflow", waitForAuthWorkflow); +`; + const { filePath, tempDir } = await createWorkflowBundleFile(workflowCode); + tempDirs.push(tempDir); + + const graphs = await extractWorkflowGraphs(filePath); + const hookGraph = graphs['./workflows/hooks']?.waitForAuthWorkflow?.graph; + const labels = (hookGraph?.nodes || []).map((node) => node.data.label); + + expect(labels).toEqual(expect.arrayContaining(['createHook'])); + }); + + it('detects for-await hook consumption when hook is created via hook.create', async () => { + const workflowCode = ` +var processPayload = globalThis[/* @__PURE__ */ Symbol.for("WORKFLOW_USE_STEP")]("step//./workflows/hooks//processPayload"); +async function streamHookWorkflow() { + const hook = activeSubagentRunHook.create({ token: 'stream:demo' }); + for await (const payload of hook) { + await processPayload(payload); + } +} +streamHookWorkflow.workflowId = "workflow//./workflows/hooks//streamHookWorkflow"; +globalThis.__private_workflows.set("workflow//./workflows/hooks//streamHookWorkflow", streamHookWorkflow); +`; + const { filePath, tempDir } = await createWorkflowBundleFile(workflowCode); + tempDirs.push(tempDir); + + const graphs = await extractWorkflowGraphs(filePath); + const hookGraph = graphs['./workflows/hooks']?.streamHookWorkflow?.graph; + const labels = (hookGraph?.nodes || []).map((node) => node.data.label); + + expect(labels).toEqual( + expect.arrayContaining(['processPayload', 'createHook', 'awaitWebhook']) + ); + }); }); diff --git a/packages/builders/src/workflows-extractor.ts b/packages/builders/src/workflows-extractor.ts index ac96034425..422508448e 100644 --- a/packages/builders/src/workflows-extractor.ts +++ b/packages/builders/src/workflows-extractor.ts @@ -34,6 +34,88 @@ function getIdentifierCallExpressionName( : null; } +function getMemberExpressionPropertyName( + memberExpr: MemberExpression +): string | null { + if (memberExpr.property.type === 'Identifier') { + return (memberExpr.property as Identifier).value; + } + + if (memberExpr.property.type === 'Computed') { + const computedExpr = (memberExpr.property as any).expression; + if (computedExpr?.type === 'StringLiteral') { + return (computedExpr as any).value; + } + if (computedExpr?.type === 'Identifier') { + return (computedExpr as Identifier).value; + } + } + + return null; +} + +function getExpressionTerminalName(expr: Expression): string | null { + if (expr.type === 'Identifier') { + return (expr as Identifier).value; + } + + if (expr.type === 'ParenthesisExpression') { + return getExpressionTerminalName((expr as any).expression); + } + + if (expr.type === 'MemberExpression') { + const member = expr as MemberExpression; + const propertyName = getMemberExpressionPropertyName(member); + if (propertyName) { + return propertyName; + } + return getExpressionTerminalName(member.object as Expression); + } + + return null; +} + +function getHookMemberCreatePrimitiveName( + callExpr: CallExpression +): string | null { + if (callExpr.callee.type !== 'MemberExpression') { + return null; + } + + const member = callExpr.callee as MemberExpression; + const methodName = getMemberExpressionPropertyName(member); + if (methodName !== 'create') { + return null; + } + + const objectName = getExpressionTerminalName(member.object as Expression); + if (!objectName) { + return null; + } + + // `defineHook(...).create(...)` calls are emitted as member calls in many + // bundles. Treat hook-like object names as createHook primitives. + if (!/hook/i.test(objectName)) { + return null; + } + + return 'createHook'; +} + +function getWorkflowPrimitiveCallName(callExpr: CallExpression): string | null { + const directCallName = getIdentifierCallExpressionName(callExpr); + if (directCallName && WORKFLOW_PRIMITIVES.has(directCallName)) { + return directCallName; + } + + const hookMemberCallName = getHookMemberCreatePrimitiveName(callExpr); + if (hookMemberCallName) { + return hookMemberCallName; + } + + return null; +} + function findHookPrimitiveCallName(expr: Expression): string | null { if (expr.type === 'ParenthesisExpression') { return findHookPrimitiveCallName((expr as any).expression); @@ -54,9 +136,9 @@ function findHookPrimitiveCallName(expr: Expression): string | null { return null; } - const directCallName = getIdentifierCallExpressionName(expr); - if (directCallName && HOOK_PRIMITIVES.has(directCallName)) { - return directCallName; + const hookPrimitiveCallName = getWorkflowPrimitiveCallName(expr); + if (hookPrimitiveCallName && HOOK_PRIMITIVES.has(hookPrimitiveCallName)) { + return hookPrimitiveCallName; } for (const arg of expr.arguments || []) { @@ -1086,66 +1168,97 @@ function analyzeStatement( const isAwait = (stmt as any).isAwait || (stmt as any).await; const body = (stmt as any).body; - if (body.type === 'BlockStatement') { - const loopResult = analyzeBlock( - body.stmts, - stepDeclarations, - context, - functionMap, - variableMap - ); - - for (const node of loopResult.nodes) { - if (!node.metadata) node.metadata = {}; - node.metadata.loopId = loopId; - node.metadata.loopIsAwait = isAwait; + // `for await (const payload of hook)` is a webhook/hook wait pattern. + // Represent the awaited iterator source as an awaitWebhook primitive. + const iterableExpr = (stmt as any).right as Expression | undefined; + const awaitHookEntryIds: string[] = []; + const awaitHookExitIds: string[] = []; + if (isAwait && iterableExpr?.type === 'Identifier') { + const iterableName = (iterableExpr as Identifier).value; + if (context.webhookVariables.has(iterableName)) { + const nodeId = `node_${context.nodeCounter++}`; + const metadata: NodeMetadata = { + loopId, + loopIsAwait: true, + }; + if (context.inConditional) { + metadata.conditionalId = context.inConditional; + } + const node: ManifestNode = { + id: nodeId, + type: 'primitive', + data: { + label: 'awaitWebhook', + nodeKind: 'primitive', + }, + metadata, + }; + nodes.push(node); + awaitHookEntryIds.push(nodeId); + awaitHookExitIds.push(nodeId); } + } - nodes.push(...loopResult.nodes); - edges.push(...loopResult.edges); - entryNodeIds = loopResult.entryNodeIds; - exitNodeIds = loopResult.exitNodeIds; + const loopResult = + body.type === 'BlockStatement' + ? analyzeBlock( + body.stmts, + stepDeclarations, + context, + functionMap, + variableMap + ) + : analyzeStatement( + body, + stepDeclarations, + context, + functionMap, + variableMap + ); - for (const exitId of loopResult.exitNodeIds) { - for (const entryId of loopResult.entryNodeIds) { - edges.push({ - id: `e_${exitId}_back_${entryId}`, - source: exitId, - target: entryId, - type: 'loop', - }); + for (const node of loopResult.nodes) { + if (!node.metadata) node.metadata = {}; + node.metadata.loopId = loopId; + node.metadata.loopIsAwait = isAwait; + } + + nodes.push(...loopResult.nodes); + edges.push(...loopResult.edges); + + if (awaitHookEntryIds.length > 0) { + entryNodeIds = awaitHookEntryIds; + if (loopResult.entryNodeIds.length > 0) { + for (const hookExitId of awaitHookExitIds) { + for (const loopEntryId of loopResult.entryNodeIds) { + edges.push({ + id: `e_${hookExitId}_${loopEntryId}`, + source: hookExitId, + target: loopEntryId, + type: 'default', + }); + } } } } else { - // Handle single-statement body (no braces) - const loopResult = analyzeStatement( - body, - stepDeclarations, - context, - functionMap, - variableMap - ); - - for (const node of loopResult.nodes) { - if (!node.metadata) node.metadata = {}; - node.metadata.loopId = loopId; - node.metadata.loopIsAwait = isAwait; - } - - nodes.push(...loopResult.nodes); - edges.push(...loopResult.edges); entryNodeIds = loopResult.entryNodeIds; + } + + if (loopResult.exitNodeIds.length > 0) { exitNodeIds = loopResult.exitNodeIds; + } else if (awaitHookExitIds.length > 0) { + exitNodeIds = awaitHookExitIds; + } else { + exitNodeIds = []; + } - for (const exitId of loopResult.exitNodeIds) { - for (const entryId of loopResult.entryNodeIds) { - edges.push({ - id: `e_${exitId}_back_${entryId}`, - source: exitId, - target: entryId, - type: 'loop', - }); - } + for (const exitId of exitNodeIds) { + for (const entryId of entryNodeIds) { + edges.push({ + id: `e_${exitId}_back_${entryId}`, + source: exitId, + target: entryId, + type: 'loop', + }); } } @@ -1487,6 +1600,33 @@ function analyzeExpression( entryNodeIds.push(...transitiveResult.entryNodeIds); exitNodeIds.push(...transitiveResult.exitNodeIds); } + } else { + const primitiveName = getWorkflowPrimitiveCallName(callExpr); + if (primitiveName) { + const nodeId = `node_${context.nodeCounter++}`; + const metadata: NodeMetadata = {}; + + if (context.inLoop) { + metadata.loopId = context.inLoop; + } + if (context.inConditional) { + metadata.conditionalId = context.inConditional; + } + + const node: ManifestNode = { + id: nodeId, + type: 'primitive', + data: { + label: primitiveName, + nodeKind: 'primitive', + }, + metadata: Object.keys(metadata).length > 0 ? metadata : undefined, + }; + + nodes.push(node); + entryNodeIds.push(nodeId); + exitNodeIds.push(nodeId); + } } // Also analyze the arguments of awaited calls for step references in objects @@ -1608,6 +1748,33 @@ function analyzeExpression( entryNodeIds.push(...transitiveResult.entryNodeIds); exitNodeIds.push(...transitiveResult.exitNodeIds); } + } else { + const primitiveName = getWorkflowPrimitiveCallName(callExpr); + if (primitiveName) { + const nodeId = `node_${context.nodeCounter++}`; + const metadata: NodeMetadata = {}; + + if (context.inLoop) { + metadata.loopId = context.inLoop; + } + if (context.inConditional) { + metadata.conditionalId = context.inConditional; + } + + const node: ManifestNode = { + id: nodeId, + type: 'primitive', + data: { + label: primitiveName, + nodeKind: 'primitive', + }, + metadata: Object.keys(metadata).length > 0 ? metadata : undefined, + }; + + nodes.push(node); + entryNodeIds.push(nodeId); + exitNodeIds.push(nodeId); + } } } @@ -1742,6 +1909,28 @@ function analyzeExpression( entryNodeIds.push(nodeId); exitNodeIds.push(nodeId); } + } else { + const primitiveName = getWorkflowPrimitiveCallName(argCallExpr); + if (primitiveName) { + const nodeId = `node_${context.nodeCounter++}`; + const metadata: NodeMetadata = {}; + if (context.inConditional) { + metadata.conditionalId = context.inConditional; + } + const node: ManifestNode = { + id: nodeId, + type: 'primitive', + data: { + label: primitiveName, + nodeKind: 'primitive', + }, + metadata: + Object.keys(metadata).length > 0 ? metadata : undefined, + }; + nodes.push(node); + entryNodeIds.push(nodeId); + exitNodeIds.push(nodeId); + } } } if (arg.expression.type === 'ObjectExpression') { From 6c4ce7b0802c123d4bae4902506c494df01fcbaa Mon Sep 17 00:00:00 2001 From: JJ Kasper Date: Fri, 10 Apr 2026 11:19:05 -0700 Subject: [PATCH 04/17] fix(next): wait for deferred discovery rebuild before first use --- packages/next/src/loader.ts | 74 +++++++++++++++++++++++++++++- packages/next/src/socket-server.ts | 4 ++ 2 files changed, 76 insertions(+), 2 deletions(-) diff --git a/packages/next/src/loader.ts b/packages/next/src/loader.ts index e04a2b219e..9c8e4b3223 100644 --- a/packages/next/src/loader.ts +++ b/packages/next/src/loader.ts @@ -50,11 +50,13 @@ let socketClientKey: string | null = null; type SocketCredentials = { port: number; authToken: string; + isDevServer: boolean; }; const ROUTE_STUB_FILE_MARKER = 'WORKFLOW_ROUTE_STUB_FILE'; const ROUTE_STUB_BUILD_WAIT_TIMEOUT_MS = 120_000; let pendingDeferredRouteStubBuildPromise: Promise | null = null; +let pendingDeferredDiscoveryBuildPromise: Promise | null = null; function registerFileDependency( loaderContext: WorkflowLoaderContext, @@ -213,7 +215,8 @@ function getSocketCredentialsFromEnv(): SocketCredentials | null { if (Number.isNaN(port)) { return null; } - return { port, authToken }; + const isDevServer = process.env.WORKFLOW_SOCKET_IS_DEV_SERVER === '1'; + return { port, authToken, isDevServer }; } async function getSocketCredentialsFromFile(): Promise { @@ -230,6 +233,7 @@ async function getSocketCredentialsFromFile(): Promise const parsed = JSON.parse(raw) as { port?: unknown; authToken?: unknown; + isDevServer?: unknown; }; const authToken = typeof parsed.authToken === 'string' ? parsed.authToken : null; @@ -241,9 +245,14 @@ async function getSocketCredentialsFromFile(): Promise if (!authToken || Number.isNaN(numericPort)) { return null; } + const isDevServer = + parsed.isDevServer === true || + parsed.isDevServer === '1' || + parsed.isDevServer === 1; return { port: numericPort, authToken, + isDevServer, }; } catch { return null; @@ -491,6 +500,40 @@ async function ensureDeferredRouteStubBuildAndWait(): Promise { return pendingDeferredRouteStubBuildPromise; } +async function triggerDeferredDiscoveryBuildAndWait(): Promise { + const socketCredentials = await getSocketCredentials(); + if (!socketCredentials) { + return; + } + // Trigger-build notifications only drive deferred rebuilds in dev/watch mode. + if (!socketCredentials.isDevServer) { + return; + } + const socket = await createSocketConnection(socketCredentials); + try { + await writeSocketMessage( + socket, + serializeMessage({ type: 'trigger-build' }, socketCredentials.authToken) + ); + await waitForDeferredBuildComplete(socket, socketCredentials.authToken); + } finally { + socket.destroy(); + } +} + +async function ensureDeferredDiscoveryBuildAndWait(): Promise { + if (pendingDeferredDiscoveryBuildPromise) { + return pendingDeferredDiscoveryBuildPromise; + } + const pendingPromise = triggerDeferredDiscoveryBuildAndWait(); + pendingDeferredDiscoveryBuildPromise = pendingPromise.finally(() => { + if (pendingDeferredDiscoveryBuildPromise === pendingPromise) { + pendingDeferredDiscoveryBuildPromise = null; + } + }); + return pendingDeferredDiscoveryBuildPromise; +} + async function getBuildersModule(): Promise< typeof import('@workflow/builders') > { @@ -698,7 +741,7 @@ export default function workflowLoader( hasStep: patterns.hasUseStep, hasSerde, }; - const { shouldNotify } = updateDiscoveredPatternState( + const { shouldNotify, previousState } = updateDiscoveredPatternState( discoveryFilePath, nextPatternState ); @@ -709,6 +752,33 @@ export default function workflowLoader( nextPatternState.hasStep, nextPatternState.hasSerde ); + + // In lazy discovery mode, a newly discovered workflow/step/serde file + // can be used immediately after this transform completes. Trigger a + // deferred rebuild and wait so start()/resume() calls do not race + // route generation on first use. + const introducedWorkflowPattern = + nextPatternState.hasWorkflow && !previousState?.hasWorkflow; + const introducedStepPattern = + nextPatternState.hasStep && !previousState?.hasStep; + const introducedSerdePattern = + nextPatternState.hasSerde && !previousState?.hasSerde; + if ( + process.env.WORKFLOW_NEXT_LAZY_DISCOVERY === '1' && + !isDeferredStepCopyFile && + (introducedWorkflowPattern || + introducedStepPattern || + introducedSerdePattern) + ) { + try { + await ensureDeferredDiscoveryBuildAndWait(); + } catch (error) { + console.warn( + `[workflow] Failed waiting for deferred discovery build for ${discoveryFilePath}`, + error + ); + } + } } } diff --git a/packages/next/src/socket-server.ts b/packages/next/src/socket-server.ts index c79acf12dd..d97409ea3a 100644 --- a/packages/next/src/socket-server.ts +++ b/packages/next/src/socket-server.ts @@ -193,6 +193,7 @@ export async function createSocketServer( { port: address.port, authToken, + isDevServer: config.isDevServer, }, null, 2 @@ -201,6 +202,9 @@ export async function createSocketServer( process.env.WORKFLOW_SOCKET_INFO_PATH = socketInfoFilePath; process.env.WORKFLOW_SOCKET_PORT = String(address.port); process.env.WORKFLOW_SOCKET_AUTH = authToken; + process.env.WORKFLOW_SOCKET_IS_DEV_SERVER = config.isDevServer + ? '1' + : '0'; resolve(); } catch (error) { reject(error); From b16ee1dc04b22d7e466cdb3f5d0c0e7788dd9313 Mon Sep 17 00:00:00 2001 From: JJ Kasper Date: Fri, 10 Apr 2026 11:43:17 -0700 Subject: [PATCH 05/17] fix(world-local): use fast warmup retries for transient queue failures --- packages/world-local/src/queue.test.ts | 50 ++++++++++++++++++++++++++ packages/world-local/src/queue.ts | 29 ++++++++++++--- 2 files changed, 74 insertions(+), 5 deletions(-) diff --git a/packages/world-local/src/queue.test.ts b/packages/world-local/src/queue.test.ts index 8f0b4f4491..e0c59c2015 100644 --- a/packages/world-local/src/queue.test.ts +++ b/packages/world-local/src/queue.test.ts @@ -166,4 +166,54 @@ describe('queue timeout re-enqueue', () => { // setTimeout should NOT have been called for timeoutSeconds: 0 expect(mockSetTimeout).not.toHaveBeenCalled(); }); + + it('queue uses warmup retry delays for transient HTTP failures', async () => { + const { setTimeout: mockSetTimeout } = await import('node:timers/promises'); + vi.mocked(mockSetTimeout).mockClear(); + + let callCount = 0; + localQueue.registerHandler('__wkf_step_', async () => { + callCount++; + if (callCount < 4) { + return new Response('warming up', { status: 500 }); + } + return Response.json({ ok: true }); + }); + + await localQueue.queue('__wkf_step_test' as any, stepPayload); + + await vi.waitFor(() => { + expect(callCount).toBe(4); + }); + + expect(mockSetTimeout).toHaveBeenNthCalledWith(1, 250); + expect(mockSetTimeout).toHaveBeenNthCalledWith(2, 500); + expect(mockSetTimeout).toHaveBeenNthCalledWith(3, 1000); + }); + + it('queue falls back to 5s retry delays after warmup retries', async () => { + const { setTimeout: mockSetTimeout } = await import('node:timers/promises'); + vi.mocked(mockSetTimeout).mockClear(); + + let callCount = 0; + localQueue.registerHandler('__wkf_step_', async () => { + callCount++; + if (callCount < 6) { + return new Response('still failing', { status: 500 }); + } + return Response.json({ ok: true }); + }); + + await localQueue.queue('__wkf_step_test' as any, stepPayload); + + await vi.waitFor(() => { + expect(callCount).toBe(6); + }); + + expect(mockSetTimeout).toHaveBeenNthCalledWith(1, 250); + expect(mockSetTimeout).toHaveBeenNthCalledWith(2, 500); + expect(mockSetTimeout).toHaveBeenNthCalledWith(3, 1000); + expect(mockSetTimeout).toHaveBeenNthCalledWith(4, 5000); + expect(mockSetTimeout).toHaveBeenNthCalledWith(5, 5000); + }); }); diff --git a/packages/world-local/src/queue.ts b/packages/world-local/src/queue.ts index 5a7bf6d7f7..1eec21ee42 100644 --- a/packages/world-local/src/queue.ts +++ b/packages/world-local/src/queue.ts @@ -47,6 +47,25 @@ const LOCAL_QUEUE_MAX_VISIBILITY = // persistent state and return another timeoutSeconds if needed. const MAX_SAFE_TIMEOUT_MS = 2147483647; +const WARMUP_RETRY_DELAYS_MS = [250, 500, 1000] as const; + +function getLocalRetryDelayMs(params: { + attempt: number; + status: number; +}): number { + const { attempt, status } = params; + const isTransientHttpFailure = status === 404 || status >= 500; + + // In local/dev mode, Next route handlers may fail briefly while compiling + // (especially with lazy discovery). Prefer fast warmup retries first. + if (isTransientHttpFailure && attempt <= WARMUP_RETRY_DELAYS_MS.length) { + return WARMUP_RETRY_DELAYS_MS[attempt - 1]; + } + + // Steady-state backoff to approximate VQS retry cadence. + return 5000; +} + // The local workers share the same Node.js process and event loop, // so we need to limit concurrency to avoid overwhelming the system. const DEFAULT_CONCURRENCY_LIMIT = 1000; @@ -214,11 +233,11 @@ export function createQueue(config: Partial): LocalQueue { } ); - // 5s linear backoff to approximate VQS retry timing in local dev. - // VQS uses 5s linear for attempts 1–32, then exponential, but for - // local dev linear 5s is sufficient — the handler enforces the real - // cap at MAX_QUEUE_DELIVERIES (48) which keeps total time under ~4min. - await setTimeout(5000); + const delayMs = getLocalRetryDelayMs({ + attempt: attempt + 1, + status: response.status, + }); + await setTimeout(delayMs); } console.error( From 9107bf65586cf08f0a68900e61781ec3f70704a4 Mon Sep 17 00:00:00 2001 From: JJ Kasper Date: Fri, 10 Apr 2026 12:01:26 -0700 Subject: [PATCH 06/17] fix(core): retry lazy hook lookup without blocking deferred builds --- packages/core/src/runtime/resume-hook.test.ts | 82 +++++++++++++++++++ packages/core/src/runtime/resume-hook.ts | 52 +++++++++++- packages/next/src/loader.ts | 74 +---------------- packages/next/src/socket-server.ts | 4 - 4 files changed, 133 insertions(+), 79 deletions(-) create mode 100644 packages/core/src/runtime/resume-hook.test.ts diff --git a/packages/core/src/runtime/resume-hook.test.ts b/packages/core/src/runtime/resume-hook.test.ts new file mode 100644 index 0000000000..de774b6a55 --- /dev/null +++ b/packages/core/src/runtime/resume-hook.test.ts @@ -0,0 +1,82 @@ +import { HookNotFoundError } from '@workflow/errors'; +import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest'; +import { getHookByToken } from './resume-hook.js'; +import { getWorld } from './world.js'; + +vi.mock('node:timers/promises', () => ({ + setTimeout: vi.fn().mockResolvedValue(undefined), +})); + +vi.mock('./world.js', () => ({ + getWorld: vi.fn(), +})); + +describe('getHookByToken lazy discovery retries', () => { + const originalLazyDiscovery = process.env.WORKFLOW_NEXT_LAZY_DISCOVERY; + + const baseHook = { + runId: 'wrun_test', + hookId: 'hook_test', + token: 'token_test', + ownerId: 'owner_test', + projectId: 'project_test', + environment: 'development', + createdAt: new Date('2026-01-01T00:00:00.000Z'), + specVersion: 3, + }; + + beforeEach(() => { + vi.clearAllMocks(); + }); + + afterEach(() => { + if (originalLazyDiscovery === undefined) { + delete process.env.WORKFLOW_NEXT_LAZY_DISCOVERY; + } else { + process.env.WORKFLOW_NEXT_LAZY_DISCOVERY = originalLazyDiscovery; + } + }); + + it('retries HookNotFoundError while lazy discovery is enabled', async () => { + process.env.WORKFLOW_NEXT_LAZY_DISCOVERY = '1'; + + const getByToken = vi + .fn() + .mockRejectedValueOnce(new HookNotFoundError('token_test')) + .mockRejectedValueOnce(new HookNotFoundError('token_test')) + .mockResolvedValue(baseHook); + + vi.mocked(getWorld).mockResolvedValue({ + hooks: { getByToken }, + runs: { + get: vi.fn().mockResolvedValue({ runId: 'wrun_test' }), + }, + getEncryptionKeyForRun: vi.fn().mockResolvedValue(undefined), + } as any); + + const hook = await getHookByToken('token_test'); + expect(hook.token).toBe('token_test'); + expect(getByToken).toHaveBeenCalledTimes(3); + }); + + it('does not retry HookNotFoundError when lazy discovery is disabled', async () => { + delete process.env.WORKFLOW_NEXT_LAZY_DISCOVERY; + + const getByToken = vi + .fn() + .mockRejectedValue(new HookNotFoundError('token_test')); + + vi.mocked(getWorld).mockResolvedValue({ + hooks: { getByToken }, + runs: { + get: vi.fn().mockResolvedValue({ runId: 'wrun_test' }), + }, + getEncryptionKeyForRun: vi.fn().mockResolvedValue(undefined), + } as any); + + await expect(getHookByToken('token_test')).rejects.toThrow( + HookNotFoundError + ); + expect(getByToken).toHaveBeenCalledTimes(1); + }); +}); diff --git a/packages/core/src/runtime/resume-hook.ts b/packages/core/src/runtime/resume-hook.ts index eed7e62450..e4d870fc8e 100644 --- a/packages/core/src/runtime/resume-hook.ts +++ b/packages/core/src/runtime/resume-hook.ts @@ -1,3 +1,4 @@ +import { setTimeout as delay } from 'node:timers/promises'; import { waitUntil } from '@vercel/functions'; import { ERROR_SLUGS, @@ -26,6 +27,21 @@ import { waitedUntil } from '../util.js'; import { getWorkflowQueueName } from './helpers.js'; import { getWorld } from './world.js'; +const LAZY_DISCOVERY_HOOK_LOOKUP_RETRY_DELAYS_MS = [ + 50, 100, 200, 400, 800, 1200, 1800, 2500, +] as const; + +function shouldRetryMissingHookLookup(error: unknown): boolean { + if (!HookNotFoundError.is(error)) { + return false; + } + + // In lazy discovery mode a hook token can be resumed before the first + // deferred build has finished generating/activating the workflow route. + // Retrying for a short window avoids spurious HookNotFound races. + return process.env.WORKFLOW_NEXT_LAZY_DISCOVERY === '1'; +} + /** * Internal helper that returns the hook, the associated workflow run, * and the resolved encryption key. @@ -50,6 +66,36 @@ async function getHookByTokenWithKey(token: string): Promise<{ return { hook, run, encryptionKey }; } +async function getHookByTokenWithKeyWithRetry(token: string): Promise<{ + hook: Hook; + run: WorkflowRun; + encryptionKey: CryptoKey | undefined; +}> { + let lastError: unknown; + for ( + let attempt = 0; + attempt <= LAZY_DISCOVERY_HOOK_LOOKUP_RETRY_DELAYS_MS.length; + attempt++ + ) { + try { + return await getHookByTokenWithKey(token); + } catch (error) { + if (!shouldRetryMissingHookLookup(error)) { + throw error; + } + + lastError = error; + if (attempt === LAZY_DISCOVERY_HOOK_LOOKUP_RETRY_DELAYS_MS.length) { + break; + } + + await delay(LAZY_DISCOVERY_HOOK_LOOKUP_RETRY_DELAYS_MS[attempt]); + } + } + + throw lastError; +} + /** * Get the hook by token to find the associated workflow run, * and hydrate the `metadata` property if it was set from within @@ -58,7 +104,7 @@ async function getHookByTokenWithKey(token: string): Promise<{ * @param token - The unique token identifying the hook */ export async function getHookByToken(token: string): Promise { - const { hook } = await getHookByTokenWithKey(token); + const { hook } = await getHookByTokenWithKeyWithRetry(token); return hook; } @@ -105,7 +151,7 @@ export async function resumeHook( let workflowRun: WorkflowRun; let encryptionKey: CryptoKey | undefined; if (typeof tokenOrHook === 'string') { - const result = await getHookByTokenWithKey(tokenOrHook); + const result = await getHookByTokenWithKeyWithRetry(tokenOrHook); hook = result.hook; workflowRun = result.run; encryptionKey = encryptionKeyOverride ?? result.encryptionKey; @@ -252,7 +298,7 @@ export async function resumeWebhook( token: string, request: Request ): Promise { - const { hook, encryptionKey } = await getHookByTokenWithKey(token); + const { hook, encryptionKey } = await getHookByTokenWithKeyWithRetry(token); // Only webhooks can be resumed via the public endpoint. // If the hook was created via createHook() (isWebhook !== true), diff --git a/packages/next/src/loader.ts b/packages/next/src/loader.ts index 9c8e4b3223..e04a2b219e 100644 --- a/packages/next/src/loader.ts +++ b/packages/next/src/loader.ts @@ -50,13 +50,11 @@ let socketClientKey: string | null = null; type SocketCredentials = { port: number; authToken: string; - isDevServer: boolean; }; const ROUTE_STUB_FILE_MARKER = 'WORKFLOW_ROUTE_STUB_FILE'; const ROUTE_STUB_BUILD_WAIT_TIMEOUT_MS = 120_000; let pendingDeferredRouteStubBuildPromise: Promise | null = null; -let pendingDeferredDiscoveryBuildPromise: Promise | null = null; function registerFileDependency( loaderContext: WorkflowLoaderContext, @@ -215,8 +213,7 @@ function getSocketCredentialsFromEnv(): SocketCredentials | null { if (Number.isNaN(port)) { return null; } - const isDevServer = process.env.WORKFLOW_SOCKET_IS_DEV_SERVER === '1'; - return { port, authToken, isDevServer }; + return { port, authToken }; } async function getSocketCredentialsFromFile(): Promise { @@ -233,7 +230,6 @@ async function getSocketCredentialsFromFile(): Promise const parsed = JSON.parse(raw) as { port?: unknown; authToken?: unknown; - isDevServer?: unknown; }; const authToken = typeof parsed.authToken === 'string' ? parsed.authToken : null; @@ -245,14 +241,9 @@ async function getSocketCredentialsFromFile(): Promise if (!authToken || Number.isNaN(numericPort)) { return null; } - const isDevServer = - parsed.isDevServer === true || - parsed.isDevServer === '1' || - parsed.isDevServer === 1; return { port: numericPort, authToken, - isDevServer, }; } catch { return null; @@ -500,40 +491,6 @@ async function ensureDeferredRouteStubBuildAndWait(): Promise { return pendingDeferredRouteStubBuildPromise; } -async function triggerDeferredDiscoveryBuildAndWait(): Promise { - const socketCredentials = await getSocketCredentials(); - if (!socketCredentials) { - return; - } - // Trigger-build notifications only drive deferred rebuilds in dev/watch mode. - if (!socketCredentials.isDevServer) { - return; - } - const socket = await createSocketConnection(socketCredentials); - try { - await writeSocketMessage( - socket, - serializeMessage({ type: 'trigger-build' }, socketCredentials.authToken) - ); - await waitForDeferredBuildComplete(socket, socketCredentials.authToken); - } finally { - socket.destroy(); - } -} - -async function ensureDeferredDiscoveryBuildAndWait(): Promise { - if (pendingDeferredDiscoveryBuildPromise) { - return pendingDeferredDiscoveryBuildPromise; - } - const pendingPromise = triggerDeferredDiscoveryBuildAndWait(); - pendingDeferredDiscoveryBuildPromise = pendingPromise.finally(() => { - if (pendingDeferredDiscoveryBuildPromise === pendingPromise) { - pendingDeferredDiscoveryBuildPromise = null; - } - }); - return pendingDeferredDiscoveryBuildPromise; -} - async function getBuildersModule(): Promise< typeof import('@workflow/builders') > { @@ -741,7 +698,7 @@ export default function workflowLoader( hasStep: patterns.hasUseStep, hasSerde, }; - const { shouldNotify, previousState } = updateDiscoveredPatternState( + const { shouldNotify } = updateDiscoveredPatternState( discoveryFilePath, nextPatternState ); @@ -752,33 +709,6 @@ export default function workflowLoader( nextPatternState.hasStep, nextPatternState.hasSerde ); - - // In lazy discovery mode, a newly discovered workflow/step/serde file - // can be used immediately after this transform completes. Trigger a - // deferred rebuild and wait so start()/resume() calls do not race - // route generation on first use. - const introducedWorkflowPattern = - nextPatternState.hasWorkflow && !previousState?.hasWorkflow; - const introducedStepPattern = - nextPatternState.hasStep && !previousState?.hasStep; - const introducedSerdePattern = - nextPatternState.hasSerde && !previousState?.hasSerde; - if ( - process.env.WORKFLOW_NEXT_LAZY_DISCOVERY === '1' && - !isDeferredStepCopyFile && - (introducedWorkflowPattern || - introducedStepPattern || - introducedSerdePattern) - ) { - try { - await ensureDeferredDiscoveryBuildAndWait(); - } catch (error) { - console.warn( - `[workflow] Failed waiting for deferred discovery build for ${discoveryFilePath}`, - error - ); - } - } } } diff --git a/packages/next/src/socket-server.ts b/packages/next/src/socket-server.ts index d97409ea3a..c79acf12dd 100644 --- a/packages/next/src/socket-server.ts +++ b/packages/next/src/socket-server.ts @@ -193,7 +193,6 @@ export async function createSocketServer( { port: address.port, authToken, - isDevServer: config.isDevServer, }, null, 2 @@ -202,9 +201,6 @@ export async function createSocketServer( process.env.WORKFLOW_SOCKET_INFO_PATH = socketInfoFilePath; process.env.WORKFLOW_SOCKET_PORT = String(address.port); process.env.WORKFLOW_SOCKET_AUTH = authToken; - process.env.WORKFLOW_SOCKET_IS_DEV_SERVER = config.isDevServer - ? '1' - : '0'; resolve(); } catch (error) { reject(error); From 6f4bcb7b9fbf94cd6b4f9df415683d9ccddc2d9e Mon Sep 17 00:00:00 2001 From: JJ Kasper Date: Fri, 10 Apr 2026 12:31:15 -0700 Subject: [PATCH 07/17] fix(next): always acknowledge trigger-build in deferred mode --- packages/next/src/builder-deferred.ts | 34 ++++++++++++++++++++------- 1 file changed, 26 insertions(+), 8 deletions(-) diff --git a/packages/next/src/builder-deferred.ts b/packages/next/src/builder-deferred.ts index cb918cf0c2..1d63e330fb 100644 --- a/packages/next/src/builder-deferred.ts +++ b/packages/next/src/builder-deferred.ts @@ -115,6 +115,7 @@ export async function getNextBuilderDeferred() { private cacheInitialized = false; private cacheWriteTimer: NodeJS.Timeout | null = null; private deferredRebuildTimer: NodeJS.Timeout | null = null; + private pendingTriggerBuildAck = false; private lastDeferredBuildSignature: string | null = null; // Lazily initialized resolvers for bare specifier rewriting. // Cached to avoid re-creating on every import rewrite. @@ -767,7 +768,7 @@ export async function getNextBuilderDeferred() { } }, onTriggerBuild: () => { - this.scheduleDeferredRebuild(); + this.scheduleDeferredRebuild({ acknowledgeTriggerBuild: true }); }, }; @@ -1046,23 +1047,40 @@ export async function getNextBuilderDeferred() { }, 50); } - private scheduleDeferredRebuild(): void { + private scheduleDeferredRebuild(options?: { + acknowledgeTriggerBuild?: boolean; + }): void { if (!this.config.watch) { return; } + if (options?.acknowledgeTriggerBuild) { + this.pendingTriggerBuildAck = true; + } + if (this.deferredRebuildTimer) { clearTimeout(this.deferredRebuildTimer); } this.deferredRebuildTimer = setTimeout(() => { this.deferredRebuildTimer = null; - void this.onBeforeDeferredEntries().catch((error) => { - console.warn( - '[workflow] Deferred rebuild after source update failed.', - error - ); - }); + const shouldAckTriggerBuild = this.pendingTriggerBuildAck; + this.pendingTriggerBuildAck = false; + + void this.onBeforeDeferredEntries() + .catch((error) => { + console.warn( + '[workflow] Deferred rebuild after source update failed.', + error + ); + }) + .finally(() => { + // A trigger-build waiter must always get a completion signal, + // even when the rebuild is a no-op (signature unchanged). + if (shouldAckTriggerBuild) { + this.socketIO?.emit('build-complete'); + } + }); }, 75); } From 110c5389bc591de2b0962476bc4a7153c66c7e8f Mon Sep 17 00:00:00 2001 From: JJ Kasper Date: Fri, 10 Apr 2026 12:46:23 -0700 Subject: [PATCH 08/17] fix(world-local): retry stalled lazy-discovery queue deliveries --- packages/world-local/src/queue.test.ts | 51 +++++++++++++++ packages/world-local/src/queue.ts | 86 +++++++++++++++++++------- 2 files changed, 115 insertions(+), 22 deletions(-) diff --git a/packages/world-local/src/queue.test.ts b/packages/world-local/src/queue.test.ts index e0c59c2015..fc2a09a417 100644 --- a/packages/world-local/src/queue.test.ts +++ b/packages/world-local/src/queue.test.ts @@ -47,6 +47,8 @@ describe('queue timeout re-enqueue', () => { }); afterEach(async () => { + delete process.env.WORKFLOW_LOCAL_QUEUE_WORKFLOW_ATTEMPT_TIMEOUT_MS; + vi.unstubAllGlobals(); await localQueue.close(); }); @@ -216,4 +218,53 @@ describe('queue timeout re-enqueue', () => { expect(mockSetTimeout).toHaveBeenNthCalledWith(4, 5000); expect(mockSetTimeout).toHaveBeenNthCalledWith(5, 5000); }); + + it('queue retries when direct handler throws transport errors', async () => { + const { setTimeout: mockSetTimeout } = await import('node:timers/promises'); + vi.mocked(mockSetTimeout).mockClear(); + + let callCount = 0; + localQueue.registerHandler('__wkf_step_', async () => { + callCount++; + if (callCount < 3) { + throw new Error('handler crashed'); + } + return Response.json({ ok: true }); + }); + + await localQueue.queue('__wkf_step_test' as any, stepPayload); + + await vi.waitFor(() => { + expect(callCount).toBe(3); + }); + + expect(mockSetTimeout).toHaveBeenNthCalledWith(1, 250); + expect(mockSetTimeout).toHaveBeenNthCalledWith(2, 500); + }); + + it('queue retries workflow fetch transport failures', async () => { + const { setTimeout: mockSetTimeout } = await import('node:timers/promises'); + vi.mocked(mockSetTimeout).mockClear(); + + let callCount = 0; + const fetchMock = vi.fn(async () => { + callCount++; + if (callCount < 3) { + throw new Error('connect ECONNREFUSED 127.0.0.1:3000'); + } + return Response.json({ ok: true }); + }); + vi.stubGlobal('fetch', fetchMock); + + await localQueue.queue('__wkf_workflow_test' as any, { + runId: 'run_01ABC', + }); + + await vi.waitFor(() => { + expect(callCount).toBe(3); + }); + + expect(mockSetTimeout).toHaveBeenNthCalledWith(1, 250); + expect(mockSetTimeout).toHaveBeenNthCalledWith(2, 500); + }); }); diff --git a/packages/world-local/src/queue.ts b/packages/world-local/src/queue.ts index 1eec21ee42..118a2d2900 100644 --- a/packages/world-local/src/queue.ts +++ b/packages/world-local/src/queue.ts @@ -49,6 +49,19 @@ const MAX_SAFE_TIMEOUT_MS = 2147483647; const WARMUP_RETRY_DELAYS_MS = [250, 500, 1000] as const; +const DEFAULT_WORKFLOW_ATTEMPT_TIMEOUT_MS = 180_000; +function getWorkflowAttemptTimeoutMs(): number { + const raw = process.env.WORKFLOW_LOCAL_QUEUE_WORKFLOW_ATTEMPT_TIMEOUT_MS; + if (!raw) { + return DEFAULT_WORKFLOW_ATTEMPT_TIMEOUT_MS; + } + const parsed = Number.parseInt(raw, 10); + if (!Number.isFinite(parsed) || parsed <= 0) { + return DEFAULT_WORKFLOW_ATTEMPT_TIMEOUT_MS; + } + return parsed; +} + function getLocalRetryDelayMs(params: { attempt: number; status: number; @@ -174,33 +187,62 @@ export function createQueue(config: Partial): LocalQueue { }; const directHandler = directHandlers.get(prefix); let response: Response; + let text = ''; + + try { + if (directHandler) { + const req = new Request( + `http://localhost/.well-known/workflow/v1/${pathname}`, + { + method: 'POST', + headers, + body, + } + ); + response = await directHandler(req); + } else { + const baseUrl = await resolveBaseUrl(config); + // Keep workflow route requests bounded so a stuck lazy-discovery + // build cannot block queue delivery forever. + const signal = + pathname === 'flow' + ? AbortSignal.timeout(getWorkflowAttemptTimeoutMs()) + : undefined; + + // eslint-disable-next-line @typescript-eslint/no-explicit-any -- undici v7 dispatcher types don't match @types/node's RequestInit + response = await fetch( + `${baseUrl}/.well-known/workflow/v1/${pathname}`, + { + method: 'POST', + duplex: 'half', + dispatcher: httpAgent, + headers, + body, + signal, + } as any + ); + } - if (directHandler) { - const req = new Request( - `http://localhost/.well-known/workflow/v1/${pathname}`, + text = await response.text(); + } catch (error) { + console.error( + `[world-local] Queue message transport failed (attempt ${attempt + 1})`, { - method: 'POST', - headers, - body, + queueName, + messageId, + ...(runId && { runId }), + ...(stepId && { stepId }), + error: error instanceof Error ? error.message : String(error), } ); - response = await directHandler(req); - } else { - const baseUrl = await resolveBaseUrl(config); - // eslint-disable-next-line @typescript-eslint/no-explicit-any -- undici v7 dispatcher types don't match @types/node's RequestInit - response = await fetch( - `${baseUrl}/.well-known/workflow/v1/${pathname}`, - { - method: 'POST', - duplex: 'half', - dispatcher: httpAgent, - headers, - body, - } as any - ); - } - const text = await response.text(); + const delayMs = getLocalRetryDelayMs({ + attempt: attempt + 1, + status: 503, + }); + await setTimeout(delayMs); + continue; + } if (response.ok) { try { From 91783a172b1aae4ce74bfb6cd08a3ada83abd562 Mon Sep 17 00:00:00 2001 From: JJ Kasper Date: Fri, 10 Apr 2026 13:06:33 -0700 Subject: [PATCH 09/17] fix(utils): detect workflow port with post-only health endpoints --- .../fix-workflow-port-probe-post-health.md | 8 ++++ packages/utils/src/get-port.test.ts | 41 +++++++++++++++++++ packages/utils/src/get-port.ts | 38 +++++++++++------ 3 files changed, 75 insertions(+), 12 deletions(-) create mode 100644 .changeset/fix-workflow-port-probe-post-health.md diff --git a/.changeset/fix-workflow-port-probe-post-health.md b/.changeset/fix-workflow-port-probe-post-health.md new file mode 100644 index 0000000000..12a1faf7f7 --- /dev/null +++ b/.changeset/fix-workflow-port-probe-post-health.md @@ -0,0 +1,8 @@ +--- +"@workflow/utils": patch +--- + +Fix local workflow port detection for POST-only health endpoints + +- Probe `/.well-known/workflow/v1/flow?__health` with `POST` when `HEAD` is not healthy +- Prevent lazy-discovery socket ports from being selected as workflow HTTP base URL diff --git a/packages/utils/src/get-port.test.ts b/packages/utils/src/get-port.test.ts index fd44cd1656..7f681f033e 100644 --- a/packages/utils/src/get-port.test.ts +++ b/packages/utils/src/get-port.test.ts @@ -273,6 +273,47 @@ describe('getWorkflowPort', () => { expect(port).toBe(workflowAddr.port); }); + it('should detect workflow server when health endpoint is POST-only', async () => { + // Non-workflow server (returns 404 for all requests) + const nonWorkflowServer = http.createServer((req, res) => { + res.writeHead(404); + res.end(); + }); + + // Workflow server that returns 405 for HEAD and 200 for POST health checks + const workflowServer = http.createServer((req, res) => { + if (req.url?.includes('__health')) { + if (req.method === 'POST') { + res.writeHead(200, { 'Content-Type': 'application/json' }); + res.end(JSON.stringify({ healthy: true })); + return; + } + res.writeHead(405); + res.end(); + return; + } + + if (req.url?.startsWith('/.well-known/workflow/v1/')) { + res.writeHead(400, { 'Content-Type': 'application/json' }); + res.end(JSON.stringify({ error: 'Missing required headers' })); + return; + } + + res.writeHead(404); + res.end(); + }); + + servers.push(nonWorkflowServer, workflowServer); + + await new Promise((resolve) => nonWorkflowServer.listen(0, resolve)); + await new Promise((resolve) => workflowServer.listen(0, resolve)); + + const port = await getWorkflowPort(); + const workflowAddr = workflowServer.address() as AddressInfo; + + expect(port).toBe(workflowAddr.port); + }); + it('should fall back to first port when probing fails', async () => { // Two non-workflow servers (both return 404) const server1 = http.createServer((req, res) => { diff --git a/packages/utils/src/get-port.ts b/packages/utils/src/get-port.ts index b6806a35a4..be0a2e4738 100644 --- a/packages/utils/src/get-port.ts +++ b/packages/utils/src/get-port.ts @@ -241,28 +241,22 @@ export interface ProbeOptions { timeout?: number; } -/** - * Probes a port to check if it's serving the workflow HTTP server. - * Uses HEAD request to minimize overhead. - * - * @returns true if the port responds with a 200 status from the health check endpoint - */ -async function probePort( +async function probePortWithMethod( port: number, - options: ProbeOptions = {} + endpoint: string, + timeout: number, + method: 'HEAD' | 'POST' ): Promise { - const { endpoint = PROBE_ENDPOINT, timeout = PROBE_TIMEOUT_MS } = options; - const controller = new AbortController(); const timeoutId = setTimeout(() => controller.abort(), timeout); try { const response = await fetch(`http://localhost:${port}${endpoint}`, { - method: 'HEAD', + method, signal: controller.signal, }); - // The workflow health endpoint returns 200 for healthy + // The workflow health endpoint returns 200 for healthy. return response.status === 200; } catch { // Connection refused, timeout, or other error @@ -272,6 +266,26 @@ async function probePort( } } +/** + * Probes a port to check if it's serving the workflow HTTP server. + * + * It first tries HEAD (low overhead), then falls back to POST for runtimes + * that only expose health checks on POST (for example Next.js route handlers + * that only export POST in lazy-discovery mode). + * + * @returns true if the port responds with a 200 status from the health check endpoint + */ +async function probePort( + port: number, + options: ProbeOptions = {} +): Promise { + const { endpoint = PROBE_ENDPOINT, timeout = PROBE_TIMEOUT_MS } = options; + if (await probePortWithMethod(port, endpoint, timeout, 'HEAD')) { + return true; + } + return await probePortWithMethod(port, endpoint, timeout, 'POST'); +} + /** * Gets the workflow server port by probing all listening ports. * This is more reliable than getPort() when other services (like Node.js inspector) From dadfd44cdab588437ba11a58f1e77514872b35e9 Mon Sep 17 00:00:00 2001 From: JJ Kasper Date: Fri, 10 Apr 2026 13:25:12 -0700 Subject: [PATCH 10/17] fix(core): retry unregistered workflows during lazy discovery --- ...discovery-workflow-not-registered-retry.md | 8 + packages/core/src/runtime.ts | 31 ++ .../lazy-discovery-workflow-retry.test.ts | 268 ++++++++++++++++++ 3 files changed, 307 insertions(+) create mode 100644 .changeset/fix-lazy-discovery-workflow-not-registered-retry.md create mode 100644 packages/core/src/runtime/lazy-discovery-workflow-retry.test.ts diff --git a/.changeset/fix-lazy-discovery-workflow-not-registered-retry.md b/.changeset/fix-lazy-discovery-workflow-not-registered-retry.md new file mode 100644 index 0000000000..88d5ad35b2 --- /dev/null +++ b/.changeset/fix-lazy-discovery-workflow-not-registered-retry.md @@ -0,0 +1,8 @@ +--- +"@workflow/core": patch +--- + +Retry lazy-discovery workflow registration misses before failing runs + +- Treat `WorkflowNotRegisteredError` as transient for a bounded retry window when `WORKFLOW_NEXT_LAZY_DISCOVERY=1` +- Re-enqueue workflow queue deliveries with backoff instead of immediately writing `run_failed` diff --git a/packages/core/src/runtime.ts b/packages/core/src/runtime.ts index 1b5a6e1b3a..d8ef1240e4 100644 --- a/packages/core/src/runtime.ts +++ b/packages/core/src/runtime.ts @@ -2,6 +2,7 @@ import { EntityConflictError, RUN_ERROR_CODES, RunExpiredError, + WorkflowNotRegisteredError, WorkflowRuntimeError, } from '@workflow/errors'; import { parseWorkflowName } from '@workflow/utils/parse-name'; @@ -44,6 +45,10 @@ import { getErrorName, getErrorStack, normalizeUnknownError } from './types.js'; import { buildWorkflowSuspensionMessage } from './util.js'; import { runWorkflow } from './workflow.js'; +const LAZY_DISCOVERY_WORKFLOW_NOT_REGISTERED_RETRY_TIMEOUTS_SECONDS = [ + 1, 1, 2, 3, 5, 8, 13, 21, +] as const; + export type { Event, WorkflowRun }; export { WorkflowSuspension } from './global.js'; export { @@ -497,6 +502,32 @@ export function workflowEntrypoint( return; } + // In lazy-discovery mode, the first few flow deliveries can + // race with deferred route/materialization and temporarily + // report WorkflowNotRegisteredError. Treat this as transient + // for a bounded retry window instead of failing the run. + if ( + WorkflowNotRegisteredError.is(err) && + process.env.WORKFLOW_NEXT_LAZY_DISCOVERY === '1' + ) { + const retryTimeoutSeconds = + LAZY_DISCOVERY_WORKFLOW_NOT_REGISTERED_RETRY_TIMEOUTS_SECONDS[ + metadata.attempt - 1 + ]; + if (retryTimeoutSeconds !== undefined) { + runtimeLogger.warn( + 'Workflow not registered yet in lazy-discovery mode; re-enqueueing run.', + { + workflowRunId: runId, + workflowName, + attempt: metadata.attempt, + retryTimeoutSeconds, + } + ); + return { timeoutSeconds: retryTimeoutSeconds }; + } + } + // This is a user code error or a WorkflowRuntimeError // (e.g., corrupted event log). Fail the workflow run. diff --git a/packages/core/src/runtime/lazy-discovery-workflow-retry.test.ts b/packages/core/src/runtime/lazy-discovery-workflow-retry.test.ts new file mode 100644 index 0000000000..9103514de7 --- /dev/null +++ b/packages/core/src/runtime/lazy-discovery-workflow-retry.test.ts @@ -0,0 +1,268 @@ +import { + type RUN_ERROR_CODES, + WorkflowNotRegisteredError, +} from '@workflow/errors'; +import { + afterEach, + beforeAll, + beforeEach, + describe, + expect, + it, + vi, +} from 'vitest'; + +const { + capturedHandlerRef, + mockEventsCreate, + mockRunWorkflow, + mockRuntimeLogger, +} = vi.hoisted(() => { + return { + capturedHandlerRef: { + current: null as null | ((...args: unknown[]) => Promise), + }, + mockEventsCreate: vi.fn(), + mockRunWorkflow: vi.fn(), + mockRuntimeLogger: { + warn: vi.fn(), + debug: vi.fn(), + info: vi.fn(), + error: vi.fn(), + }, + }; +}); + +vi.mock('../version.js', () => ({ version: '0.0.0-test' })); +vi.mock('@vercel/functions', () => ({ + waitUntil: vi.fn(), +})); + +vi.mock('./world.js', () => ({ + getWorld: vi.fn(async () => ({ + events: { create: mockEventsCreate }, + })), + getWorldHandlers: vi.fn(async () => ({ + createQueueHandler: vi.fn( + ( + _prefix: string, + handler: (...args: unknown[]) => Promise + ): ((req: Request) => Promise) => { + capturedHandlerRef.current = handler; + return vi.fn() as unknown as (req: Request) => Promise; + } + ), + })), +})); + +vi.mock('../telemetry.js', () => ({ + serializeTraceCarrier: vi.fn().mockResolvedValue({}), + trace: vi.fn((_name: string, _opts: unknown, fn?: unknown) => { + const callback = typeof _opts === 'function' ? _opts : fn; + return (callback as (span?: undefined) => unknown)(undefined); + }), + withTraceContext: vi.fn((_ctx: unknown, fn: () => unknown) => fn()), + linkToCurrentContext: vi.fn().mockResolvedValue([]), + withWorkflowBaggage: vi.fn((_attrs: unknown, fn: () => unknown) => fn()), +})); + +vi.mock('../logger.js', () => ({ + runtimeLogger: mockRuntimeLogger, +})); + +vi.mock('./helpers.js', async () => { + const actual = + await vi.importActual('./helpers.js'); + return { + ...actual, + withHealthCheck: (handler: unknown) => handler, + parseHealthCheckPayload: vi.fn().mockReturnValue(null), + handleHealthCheckMessage: vi.fn(), + getAllWorkflowRunEvents: vi.fn().mockResolvedValue([]), + getQueueOverhead: vi.fn().mockReturnValue({}), + }; +}); + +vi.mock('../types.js', () => ({ + normalizeUnknownError: vi.fn().mockImplementation(async (err: unknown) => ({ + message: err instanceof Error ? err.message : String(err), + name: err instanceof Error ? err.name : 'Error', + stack: err instanceof Error ? err.stack : undefined, + })), + getErrorName: vi.fn().mockReturnValue('Error'), + getErrorStack: vi.fn().mockReturnValue(''), +})); + +vi.mock('../workflow.js', () => ({ + runWorkflow: (...args: unknown[]) => mockRunWorkflow(...args), +})); + +import { workflowEntrypoint } from '../runtime.js'; + +function capturedHandler( + message: unknown, + metadata: { + queueName: string; + messageId: string; + attempt: number; + requestId?: string; + } +) { + if (!capturedHandlerRef.current) { + throw new Error('capturedHandler not set'); + } + return capturedHandlerRef.current(message, metadata); +} + +function createRun(runId: string, workflowName: string) { + const startedAt = new Date(); + return { + runId, + workflowName, + status: 'running', + createdAt: startedAt, + startedAt, + completedAt: null, + input: [], + deploymentId: 'dpl_local@test', + specVersion: 3, + executionContext: {}, + }; +} + +function createRunStartedResult(runId: string, workflowName: string) { + return { + run: createRun(runId, workflowName), + events: [ + { + eventId: 'evnt_created', + eventType: 'run_created', + specVersion: 3, + runId, + createdAt: new Date(), + }, + { + eventId: 'evnt_started', + eventType: 'run_started', + specVersion: 3, + runId, + createdAt: new Date(), + }, + ], + }; +} + +describe('workflowEntrypoint lazy-discovery retry for unregistered workflow', () => { + beforeAll(async () => { + // Initialize handler capture by creating the route handler once. + await workflowEntrypoint('globalThis.__private_workflows = new Map();')( + new Request('http://localhost', { method: 'POST' }) + ); + }); + + beforeEach(() => { + vi.clearAllMocks(); + }); + + afterEach(() => { + delete process.env.WORKFLOW_NEXT_LAZY_DISCOVERY; + }); + + it('re-enqueues on WorkflowNotRegisteredError in lazy discovery mode', async () => { + process.env.WORKFLOW_NEXT_LAZY_DISCOVERY = '1'; + + const runId = 'wrun_lazy_retry_1'; + const workflowName = 'workflow//./app/lib/agent-stop//agentStoppedWorkflow'; + + mockEventsCreate.mockImplementation( + (_runId: string, event: { eventType: string }) => { + if (event.eventType === 'run_started') { + return Promise.resolve(createRunStartedResult(runId, workflowName)); + } + return Promise.resolve({ event: {} }); + } + ); + mockRunWorkflow.mockRejectedValue( + new WorkflowNotRegisteredError(workflowName) + ); + + const result = await capturedHandler( + { + runId, + runInput: { + input: [], + deploymentId: 'dpl_local@test', + workflowName, + specVersion: 3, + executionContext: {}, + }, + }, + { + queueName: `__wkf_workflow_${workflowName}`, + messageId: 'msg_retry_1', + attempt: 1, + requestId: 'req_retry_1', + } + ); + + expect(result).toEqual({ timeoutSeconds: 1 }); + expect(mockEventsCreate).not.toHaveBeenCalledWith( + runId, + expect.objectContaining({ eventType: 'run_failed' }), + expect.anything() + ); + }); + + it('fails run after lazy-discovery retry window is exhausted', async () => { + process.env.WORKFLOW_NEXT_LAZY_DISCOVERY = '1'; + + const runId = 'wrun_lazy_retry_2'; + const workflowName = 'workflow//./app/lib/agent-stop//agentStoppedWorkflow'; + + mockEventsCreate.mockImplementation( + (_runId: string, event: { eventType: string; eventData?: unknown }) => { + if (event.eventType === 'run_started') { + return Promise.resolve(createRunStartedResult(runId, workflowName)); + } + if (event.eventType === 'run_failed') { + return Promise.resolve({ event }); + } + return Promise.resolve({ event: {} }); + } + ); + mockRunWorkflow.mockRejectedValue( + new WorkflowNotRegisteredError(workflowName) + ); + + const result = await capturedHandler( + { + runId, + runInput: { + input: [], + deploymentId: 'dpl_local@test', + workflowName, + specVersion: 3, + executionContext: {}, + }, + }, + { + queueName: `__wkf_workflow_${workflowName}`, + messageId: 'msg_retry_2', + attempt: 9, + requestId: 'req_retry_2', + } + ); + + expect(result).toBeUndefined(); + expect(mockEventsCreate).toHaveBeenCalledWith( + runId, + expect.objectContaining({ + eventType: 'run_failed', + eventData: expect.objectContaining({ + errorCode: 'RUNTIME_ERROR' satisfies RUN_ERROR_CODES, + }), + }), + expect.objectContaining({ requestId: 'req_retry_2' }) + ); + }); +}); From ad99aa1552f60e86e5216cae5a0de66b4e174c73 Mon Sep 17 00:00:00 2001 From: JJ Kasper Date: Fri, 10 Apr 2026 14:45:18 -0700 Subject: [PATCH 11/17] fix(next): settle deferred trigger-build before flow ack --- ...discovery-workflow-not-registered-retry.md | 8 - packages/core/src/runtime.ts | 31 -- .../lazy-discovery-workflow-retry.test.ts | 268 ------------------ packages/next/src/builder-deferred.ts | 39 +-- packages/next/src/loader.ts | 19 ++ 5 files changed, 41 insertions(+), 324 deletions(-) delete mode 100644 .changeset/fix-lazy-discovery-workflow-not-registered-retry.md delete mode 100644 packages/core/src/runtime/lazy-discovery-workflow-retry.test.ts diff --git a/.changeset/fix-lazy-discovery-workflow-not-registered-retry.md b/.changeset/fix-lazy-discovery-workflow-not-registered-retry.md deleted file mode 100644 index 88d5ad35b2..0000000000 --- a/.changeset/fix-lazy-discovery-workflow-not-registered-retry.md +++ /dev/null @@ -1,8 +0,0 @@ ---- -"@workflow/core": patch ---- - -Retry lazy-discovery workflow registration misses before failing runs - -- Treat `WorkflowNotRegisteredError` as transient for a bounded retry window when `WORKFLOW_NEXT_LAZY_DISCOVERY=1` -- Re-enqueue workflow queue deliveries with backoff instead of immediately writing `run_failed` diff --git a/packages/core/src/runtime.ts b/packages/core/src/runtime.ts index d8ef1240e4..1b5a6e1b3a 100644 --- a/packages/core/src/runtime.ts +++ b/packages/core/src/runtime.ts @@ -2,7 +2,6 @@ import { EntityConflictError, RUN_ERROR_CODES, RunExpiredError, - WorkflowNotRegisteredError, WorkflowRuntimeError, } from '@workflow/errors'; import { parseWorkflowName } from '@workflow/utils/parse-name'; @@ -45,10 +44,6 @@ import { getErrorName, getErrorStack, normalizeUnknownError } from './types.js'; import { buildWorkflowSuspensionMessage } from './util.js'; import { runWorkflow } from './workflow.js'; -const LAZY_DISCOVERY_WORKFLOW_NOT_REGISTERED_RETRY_TIMEOUTS_SECONDS = [ - 1, 1, 2, 3, 5, 8, 13, 21, -] as const; - export type { Event, WorkflowRun }; export { WorkflowSuspension } from './global.js'; export { @@ -502,32 +497,6 @@ export function workflowEntrypoint( return; } - // In lazy-discovery mode, the first few flow deliveries can - // race with deferred route/materialization and temporarily - // report WorkflowNotRegisteredError. Treat this as transient - // for a bounded retry window instead of failing the run. - if ( - WorkflowNotRegisteredError.is(err) && - process.env.WORKFLOW_NEXT_LAZY_DISCOVERY === '1' - ) { - const retryTimeoutSeconds = - LAZY_DISCOVERY_WORKFLOW_NOT_REGISTERED_RETRY_TIMEOUTS_SECONDS[ - metadata.attempt - 1 - ]; - if (retryTimeoutSeconds !== undefined) { - runtimeLogger.warn( - 'Workflow not registered yet in lazy-discovery mode; re-enqueueing run.', - { - workflowRunId: runId, - workflowName, - attempt: metadata.attempt, - retryTimeoutSeconds, - } - ); - return { timeoutSeconds: retryTimeoutSeconds }; - } - } - // This is a user code error or a WorkflowRuntimeError // (e.g., corrupted event log). Fail the workflow run. diff --git a/packages/core/src/runtime/lazy-discovery-workflow-retry.test.ts b/packages/core/src/runtime/lazy-discovery-workflow-retry.test.ts deleted file mode 100644 index 9103514de7..0000000000 --- a/packages/core/src/runtime/lazy-discovery-workflow-retry.test.ts +++ /dev/null @@ -1,268 +0,0 @@ -import { - type RUN_ERROR_CODES, - WorkflowNotRegisteredError, -} from '@workflow/errors'; -import { - afterEach, - beforeAll, - beforeEach, - describe, - expect, - it, - vi, -} from 'vitest'; - -const { - capturedHandlerRef, - mockEventsCreate, - mockRunWorkflow, - mockRuntimeLogger, -} = vi.hoisted(() => { - return { - capturedHandlerRef: { - current: null as null | ((...args: unknown[]) => Promise), - }, - mockEventsCreate: vi.fn(), - mockRunWorkflow: vi.fn(), - mockRuntimeLogger: { - warn: vi.fn(), - debug: vi.fn(), - info: vi.fn(), - error: vi.fn(), - }, - }; -}); - -vi.mock('../version.js', () => ({ version: '0.0.0-test' })); -vi.mock('@vercel/functions', () => ({ - waitUntil: vi.fn(), -})); - -vi.mock('./world.js', () => ({ - getWorld: vi.fn(async () => ({ - events: { create: mockEventsCreate }, - })), - getWorldHandlers: vi.fn(async () => ({ - createQueueHandler: vi.fn( - ( - _prefix: string, - handler: (...args: unknown[]) => Promise - ): ((req: Request) => Promise) => { - capturedHandlerRef.current = handler; - return vi.fn() as unknown as (req: Request) => Promise; - } - ), - })), -})); - -vi.mock('../telemetry.js', () => ({ - serializeTraceCarrier: vi.fn().mockResolvedValue({}), - trace: vi.fn((_name: string, _opts: unknown, fn?: unknown) => { - const callback = typeof _opts === 'function' ? _opts : fn; - return (callback as (span?: undefined) => unknown)(undefined); - }), - withTraceContext: vi.fn((_ctx: unknown, fn: () => unknown) => fn()), - linkToCurrentContext: vi.fn().mockResolvedValue([]), - withWorkflowBaggage: vi.fn((_attrs: unknown, fn: () => unknown) => fn()), -})); - -vi.mock('../logger.js', () => ({ - runtimeLogger: mockRuntimeLogger, -})); - -vi.mock('./helpers.js', async () => { - const actual = - await vi.importActual('./helpers.js'); - return { - ...actual, - withHealthCheck: (handler: unknown) => handler, - parseHealthCheckPayload: vi.fn().mockReturnValue(null), - handleHealthCheckMessage: vi.fn(), - getAllWorkflowRunEvents: vi.fn().mockResolvedValue([]), - getQueueOverhead: vi.fn().mockReturnValue({}), - }; -}); - -vi.mock('../types.js', () => ({ - normalizeUnknownError: vi.fn().mockImplementation(async (err: unknown) => ({ - message: err instanceof Error ? err.message : String(err), - name: err instanceof Error ? err.name : 'Error', - stack: err instanceof Error ? err.stack : undefined, - })), - getErrorName: vi.fn().mockReturnValue('Error'), - getErrorStack: vi.fn().mockReturnValue(''), -})); - -vi.mock('../workflow.js', () => ({ - runWorkflow: (...args: unknown[]) => mockRunWorkflow(...args), -})); - -import { workflowEntrypoint } from '../runtime.js'; - -function capturedHandler( - message: unknown, - metadata: { - queueName: string; - messageId: string; - attempt: number; - requestId?: string; - } -) { - if (!capturedHandlerRef.current) { - throw new Error('capturedHandler not set'); - } - return capturedHandlerRef.current(message, metadata); -} - -function createRun(runId: string, workflowName: string) { - const startedAt = new Date(); - return { - runId, - workflowName, - status: 'running', - createdAt: startedAt, - startedAt, - completedAt: null, - input: [], - deploymentId: 'dpl_local@test', - specVersion: 3, - executionContext: {}, - }; -} - -function createRunStartedResult(runId: string, workflowName: string) { - return { - run: createRun(runId, workflowName), - events: [ - { - eventId: 'evnt_created', - eventType: 'run_created', - specVersion: 3, - runId, - createdAt: new Date(), - }, - { - eventId: 'evnt_started', - eventType: 'run_started', - specVersion: 3, - runId, - createdAt: new Date(), - }, - ], - }; -} - -describe('workflowEntrypoint lazy-discovery retry for unregistered workflow', () => { - beforeAll(async () => { - // Initialize handler capture by creating the route handler once. - await workflowEntrypoint('globalThis.__private_workflows = new Map();')( - new Request('http://localhost', { method: 'POST' }) - ); - }); - - beforeEach(() => { - vi.clearAllMocks(); - }); - - afterEach(() => { - delete process.env.WORKFLOW_NEXT_LAZY_DISCOVERY; - }); - - it('re-enqueues on WorkflowNotRegisteredError in lazy discovery mode', async () => { - process.env.WORKFLOW_NEXT_LAZY_DISCOVERY = '1'; - - const runId = 'wrun_lazy_retry_1'; - const workflowName = 'workflow//./app/lib/agent-stop//agentStoppedWorkflow'; - - mockEventsCreate.mockImplementation( - (_runId: string, event: { eventType: string }) => { - if (event.eventType === 'run_started') { - return Promise.resolve(createRunStartedResult(runId, workflowName)); - } - return Promise.resolve({ event: {} }); - } - ); - mockRunWorkflow.mockRejectedValue( - new WorkflowNotRegisteredError(workflowName) - ); - - const result = await capturedHandler( - { - runId, - runInput: { - input: [], - deploymentId: 'dpl_local@test', - workflowName, - specVersion: 3, - executionContext: {}, - }, - }, - { - queueName: `__wkf_workflow_${workflowName}`, - messageId: 'msg_retry_1', - attempt: 1, - requestId: 'req_retry_1', - } - ); - - expect(result).toEqual({ timeoutSeconds: 1 }); - expect(mockEventsCreate).not.toHaveBeenCalledWith( - runId, - expect.objectContaining({ eventType: 'run_failed' }), - expect.anything() - ); - }); - - it('fails run after lazy-discovery retry window is exhausted', async () => { - process.env.WORKFLOW_NEXT_LAZY_DISCOVERY = '1'; - - const runId = 'wrun_lazy_retry_2'; - const workflowName = 'workflow//./app/lib/agent-stop//agentStoppedWorkflow'; - - mockEventsCreate.mockImplementation( - (_runId: string, event: { eventType: string; eventData?: unknown }) => { - if (event.eventType === 'run_started') { - return Promise.resolve(createRunStartedResult(runId, workflowName)); - } - if (event.eventType === 'run_failed') { - return Promise.resolve({ event }); - } - return Promise.resolve({ event: {} }); - } - ); - mockRunWorkflow.mockRejectedValue( - new WorkflowNotRegisteredError(workflowName) - ); - - const result = await capturedHandler( - { - runId, - runInput: { - input: [], - deploymentId: 'dpl_local@test', - workflowName, - specVersion: 3, - executionContext: {}, - }, - }, - { - queueName: `__wkf_workflow_${workflowName}`, - messageId: 'msg_retry_2', - attempt: 9, - requestId: 'req_retry_2', - } - ); - - expect(result).toBeUndefined(); - expect(mockEventsCreate).toHaveBeenCalledWith( - runId, - expect.objectContaining({ - eventType: 'run_failed', - eventData: expect.objectContaining({ - errorCode: 'RUNTIME_ERROR' satisfies RUN_ERROR_CODES, - }), - }), - expect.objectContaining({ requestId: 'req_retry_2' }) - ); - }); -}); diff --git a/packages/next/src/builder-deferred.ts b/packages/next/src/builder-deferred.ts index 1d63e330fb..c8064569de 100644 --- a/packages/next/src/builder-deferred.ts +++ b/packages/next/src/builder-deferred.ts @@ -1064,26 +1064,31 @@ export async function getNextBuilderDeferred() { this.deferredRebuildTimer = setTimeout(() => { this.deferredRebuildTimer = null; - const shouldAckTriggerBuild = this.pendingTriggerBuildAck; - this.pendingTriggerBuildAck = false; - - void this.onBeforeDeferredEntries() - .catch((error) => { - console.warn( - '[workflow] Deferred rebuild after source update failed.', - error - ); - }) - .finally(() => { - // A trigger-build waiter must always get a completion signal, - // even when the rebuild is a no-op (signature unchanged). - if (shouldAckTriggerBuild) { - this.socketIO?.emit('build-complete'); - } - }); + void this.runDeferredRebuild(); }, 75); } + private async runDeferredRebuild(): Promise { + try { + await this.onBeforeDeferredEntries(); + } catch (error) { + console.warn( + '[workflow] Deferred rebuild after source update failed.', + error + ); + } finally { + // If another rebuild was queued while this pass was running, wait for + // that later pass to settle before acknowledging trigger-build waiters. + if (this.deferredRebuildTimer) { + return; + } + if (this.pendingTriggerBuildAck) { + this.pendingTriggerBuildAck = false; + this.socketIO?.emit('build-complete'); + } + } + } + private async readWorkflowsCache(): Promise<{ workflowFiles: string[]; stepFiles: string[]; diff --git a/packages/next/src/loader.ts b/packages/next/src/loader.ts index e04a2b219e..758f21e7e2 100644 --- a/packages/next/src/loader.ts +++ b/packages/next/src/loader.ts @@ -374,6 +374,12 @@ function isWorkflowRouteStubSource(source: string): boolean { return source.includes(ROUTE_STUB_FILE_MARKER); } +function isGeneratedFlowRouteFilePath(filePath: string): boolean { + return filePath + .replace(/\\/g, '/') + .endsWith('/.well-known/workflow/v1/flow/route.js'); +} + async function createSocketConnection( socketCredentials: SocketCredentials, timeoutMs = 1_000 @@ -669,13 +675,26 @@ export default function workflowLoader( process.env.WORKFLOW_NEXT_LAZY_DISCOVERY === '1' && isWorkflowRouteStubSource(normalizedSource) ) { + const isFlowRouteFile = isGeneratedFlowRouteFilePath(filename); try { await ensureDeferredRouteStubBuildAndWait(); const refreshedSource = await readFile(filename, 'utf8'); if (!isWorkflowRouteStubSource(refreshedSource)) { return { code: refreshedSource, map: sourceMap }; } + // Flow route requests drive workflow run execution. If the deferred + // build did not materialize a non-stub flow route, fail this attempt + // so queue delivery can retry instead of writing run_failed with a + // transient WorkflowNotRegisteredError. + if (isFlowRouteFile) { + throw new Error( + `Deferred flow route build completed but output remained a stub: ${filename}` + ); + } } catch (error) { + if (isFlowRouteFile) { + throw error; + } console.warn( `[workflow] Failed waiting for deferred route build for ${filename}, using stub output`, error From 9344636f26b4002363f61fcce1414e42049e5f95 Mon Sep 17 00:00:00 2001 From: JJ Kasper Date: Fri, 10 Apr 2026 15:14:03 -0700 Subject: [PATCH 12/17] fix(next): rebuild deferred routes from cached discovery on boot --- packages/next/src/builder-deferred.ts | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/packages/next/src/builder-deferred.ts b/packages/next/src/builder-deferred.ts index c8064569de..8bd12101d0 100644 --- a/packages/next/src/builder-deferred.ts +++ b/packages/next/src/builder-deferred.ts @@ -135,6 +135,19 @@ export async function getNextBuilderDeferred() { await this.writeStubFiles(outputDir); await this.createDiscoverySocketServer(); + + // After a dev server restart, stubs are rewritten on disk. If we already + // have discovered entries from the persisted cache, trigger a deferred + // rebuild immediately so routes do not remain stubbed until a fresh + // loader discovery event arrives. + if ( + this.config.watch && + (this.discoveredWorkflowFiles.size > 0 || + this.discoveredStepFiles.size > 0 || + this.discoveredSerdeFiles.size > 0) + ) { + this.scheduleDeferredRebuild(); + } } async onBeforeDeferredEntries(): Promise { From 93b29f51fd87159cfb26b7cccdc53fe354c99b57 Mon Sep 17 00:00:00 2001 From: JJ Kasper Date: Fri, 10 Apr 2026 15:40:52 -0700 Subject: [PATCH 13/17] fix(world-local): resolve Next private origin for queue delivery --- packages/world-local/src/config.test.ts | 25 +++++++++++++++++++++++++ packages/world-local/src/config.ts | 24 +++++++++++++++++++++--- 2 files changed, 46 insertions(+), 3 deletions(-) diff --git a/packages/world-local/src/config.test.ts b/packages/world-local/src/config.test.ts index 3bc9a5b779..87c71ae3c6 100644 --- a/packages/world-local/src/config.test.ts +++ b/packages/world-local/src/config.test.ts @@ -194,6 +194,31 @@ describe('resolveBaseUrl', () => { }); describe('environment variables', () => { + it('should use __NEXT_PRIVATE_ORIGIN when no explicit local base URL is set', async () => { + const { getWorkflowPort } = await import('@workflow/utils/get-port'); + vi.mocked(getWorkflowPort).mockResolvedValue(undefined); + delete process.env.PORT; + process.env.__NEXT_PRIVATE_ORIGIN = 'http://localhost:3002'; + + const result = await resolveBaseUrl({}); + + expect(result).toBe('http://localhost:3002'); + expect(getWorkflowPort).not.toHaveBeenCalled(); + }); + + it('should prefer WORKFLOW_LOCAL_BASE_URL over __NEXT_PRIVATE_ORIGIN', async () => { + const { getWorkflowPort } = await import('@workflow/utils/get-port'); + vi.mocked(getWorkflowPort).mockResolvedValue(undefined); + process.env.WORKFLOW_LOCAL_BASE_URL = 'http://127.0.0.1:4000'; + process.env.__NEXT_PRIVATE_ORIGIN = 'http://localhost:3002'; + delete process.env.PORT; + + const result = await resolveBaseUrl({}); + + expect(result).toBe('http://127.0.0.1:4000'); + expect(getWorkflowPort).not.toHaveBeenCalled(); + }); + it('should use PORT env var as fallback', async () => { const { getWorkflowPort } = await import('@workflow/utils/get-port'); vi.mocked(getWorkflowPort).mockResolvedValue(undefined); diff --git a/packages/world-local/src/config.ts b/packages/world-local/src/config.ts index 99431be3e1..d8c94998da 100644 --- a/packages/world-local/src/config.ts +++ b/packages/world-local/src/config.ts @@ -11,6 +11,15 @@ const getBaseUrlFromEnv = () => { return process.env.WORKFLOW_LOCAL_BASE_URL; }; +const getNextPrivateOriginFromEnv = () => { + const origin = process.env.__NEXT_PRIVATE_ORIGIN; + if (!origin) { + return undefined; + } + + return origin; +}; + export type Config = { dataDir: string; port?: number; @@ -40,9 +49,10 @@ export const config = once(() => { * Resolves the base URL for queue requests following the priority order: * 1. config.baseUrl (highest priority - full override from args) * 2. WORKFLOW_LOCAL_BASE_URL env var (checked directly to handle late env var setting) - * 3. config.port (explicit port override from args) - * 4. PORT env var (explicit configuration) - * 5. Auto-detected port via getPort (detect actual listening port) + * 3. __NEXT_PRIVATE_ORIGIN env var (Next.js internal server origin) + * 4. config.port (explicit port override from args) + * 5. PORT env var (explicit configuration) + * 6. Auto-detected port via getPort (detect actual listening port) */ export async function resolveBaseUrl(config: Partial): Promise { if (config.baseUrl) { @@ -55,6 +65,14 @@ export async function resolveBaseUrl(config: Partial): Promise { return process.env.WORKFLOW_LOCAL_BASE_URL; } + // Next.js sets this internal origin env var for server-side internal fetches. + // In dev, workflow queue calls can run in worker processes that are not the + // listening server process, so PORT/getWorkflowPort may be unavailable there. + const nextPrivateOrigin = getNextPrivateOriginFromEnv(); + if (nextPrivateOrigin) { + return nextPrivateOrigin; + } + if (typeof config.port === 'number') { return `http://localhost:${config.port}`; } From f2896acdadfca9c1d419952a1222f2fcb4d955f2 Mon Sep 17 00:00:00 2001 From: JJ Kasper Date: Fri, 10 Apr 2026 16:08:01 -0700 Subject: [PATCH 14/17] fix(world-local): resolve lazy queue base URL in detached worker contexts Add argv/ancestor command --port fallbacks for local queue delivery and strengthen port parsing tests. Also disable SWC input source map sidecar loading in builder transforms to avoid deferred flow compile failures when dependencies omit .map files. --- .../builders/src/apply-swc-transform.test.ts | 21 +++ packages/builders/src/apply-swc-transform.ts | 4 + packages/world-local/src/config.test.ts | 83 +++++++++ packages/world-local/src/config.ts | 157 +++++++++++++++++- 4 files changed, 264 insertions(+), 1 deletion(-) create mode 100644 packages/builders/src/apply-swc-transform.test.ts diff --git a/packages/builders/src/apply-swc-transform.test.ts b/packages/builders/src/apply-swc-transform.test.ts new file mode 100644 index 0000000000..9091e312a8 --- /dev/null +++ b/packages/builders/src/apply-swc-transform.test.ts @@ -0,0 +1,21 @@ +import { describe, expect, it } from 'vitest'; +import { applySwcTransform } from './apply-swc-transform.js'; + +describe('applySwcTransform', () => { + it('ignores missing external sourceMappingURL sidecars', async () => { + const source = [ + 'export const value = 1;', + '//# sourceMappingURL=index.js.map', + '', + ].join('\n'); + + const result = await applySwcTransform( + 'fixtures/missing-source-map.js', + source, + 'client' + ); + + expect(result.code).toContain('const value = 1'); + expect(result.workflowManifest).toEqual({}); + }); +}); diff --git a/packages/builders/src/apply-swc-transform.ts b/packages/builders/src/apply-swc-transform.ts index 57c2d09bf3..cbb0c3a308 100644 --- a/packages/builders/src/apply-swc-transform.ts +++ b/packages/builders/src/apply-swc-transform.ts @@ -126,6 +126,10 @@ export async function applySwcTransform( // TODO: investigate proper source map support as they // won't even be used in Node.js by default unless we // intercept errors and apply them ourselves + // Explicitly disable reading input source maps from sourceMappingURL + // sidecars. Some published dependencies omit *.map files, which can + // otherwise fail deferred route compilation in lazy discovery mode. + inputSourceMap: false, sourceMaps: false, minify: false, }); diff --git a/packages/world-local/src/config.test.ts b/packages/world-local/src/config.test.ts index 87c71ae3c6..ede8258bb7 100644 --- a/packages/world-local/src/config.test.ts +++ b/packages/world-local/src/config.test.ts @@ -8,13 +8,16 @@ vi.mock('@workflow/utils/get-port', () => ({ describe('resolveBaseUrl', () => { let originalEnv: NodeJS.ProcessEnv; + let originalArgv: string[]; beforeEach(() => { originalEnv = { ...process.env }; + originalArgv = [...process.argv]; }); afterEach(() => { process.env = originalEnv; + process.argv = originalArgv; vi.clearAllMocks(); }); @@ -227,6 +230,86 @@ describe('resolveBaseUrl', () => { const result = await resolveBaseUrl({}); expect(result).toBe('http://localhost:4173'); + expect(getWorkflowPort).not.toHaveBeenCalled(); + }); + + it('should use TURBO_PORT env var when PORT is not set', async () => { + const { getWorkflowPort } = await import('@workflow/utils/get-port'); + vi.mocked(getWorkflowPort).mockResolvedValue(undefined); + delete process.env.PORT; + process.env.TURBO_PORT = '3002'; + + const result = await resolveBaseUrl({}); + + expect(result).toBe('http://localhost:3002'); + expect(getWorkflowPort).not.toHaveBeenCalled(); + }); + + it('should parse --port from npm_lifecycle_script when no direct env port is set', async () => { + const { getWorkflowPort } = await import('@workflow/utils/get-port'); + vi.mocked(getWorkflowPort).mockResolvedValue(undefined); + delete process.env.PORT; + delete process.env.TURBO_PORT; + process.env.npm_lifecycle_script = + 'MFE_DISABLE_LOCAL_PROXY_REWRITE=1 next dev --port 3002 --turbopack'; + + const result = await resolveBaseUrl({}); + + expect(result).toBe('http://localhost:3002'); + expect(getWorkflowPort).not.toHaveBeenCalled(); + }); + + it('should parse --port= from npm_lifecycle_script', async () => { + const { getWorkflowPort } = await import('@workflow/utils/get-port'); + vi.mocked(getWorkflowPort).mockResolvedValue(undefined); + delete process.env.PORT; + delete process.env.TURBO_PORT; + process.env.npm_lifecycle_script = 'next dev --port=3010 --turbopack'; + + const result = await resolveBaseUrl({}); + + expect(result).toBe('http://localhost:3010'); + expect(getWorkflowPort).not.toHaveBeenCalled(); + }); + + it('should parse -p from npm_lifecycle_script', async () => { + const { getWorkflowPort } = await import('@workflow/utils/get-port'); + vi.mocked(getWorkflowPort).mockResolvedValue(undefined); + delete process.env.PORT; + delete process.env.TURBO_PORT; + process.env.npm_lifecycle_script = 'next dev -p 4005'; + + const result = await resolveBaseUrl({}); + + expect(result).toBe('http://localhost:4005'); + expect(getWorkflowPort).not.toHaveBeenCalled(); + }); + + it('should parse -p= from npm_lifecycle_script', async () => { + const { getWorkflowPort } = await import('@workflow/utils/get-port'); + vi.mocked(getWorkflowPort).mockResolvedValue(undefined); + delete process.env.PORT; + delete process.env.TURBO_PORT; + process.env.npm_lifecycle_script = 'next dev -p=4006'; + + const result = await resolveBaseUrl({}); + + expect(result).toBe('http://localhost:4006'); + expect(getWorkflowPort).not.toHaveBeenCalled(); + }); + + it('should use process.argv when env and lifecycle ports are absent', async () => { + const { getWorkflowPort } = await import('@workflow/utils/get-port'); + vi.mocked(getWorkflowPort).mockResolvedValue(undefined); + delete process.env.PORT; + delete process.env.TURBO_PORT; + delete process.env.npm_lifecycle_script; + process.argv = ['/usr/local/bin/node', 'next-server', '--port', '4567']; + + const result = await resolveBaseUrl({}); + + expect(result).toBe('http://localhost:4567'); + expect(getWorkflowPort).not.toHaveBeenCalled(); }); it('should ignore PORT env var when config.port is provided', async () => { diff --git a/packages/world-local/src/config.ts b/packages/world-local/src/config.ts index d8c94998da..cc24775717 100644 --- a/packages/world-local/src/config.ts +++ b/packages/world-local/src/config.ts @@ -1,6 +1,14 @@ +import { execFile } from 'node:child_process'; +import { promisify } from 'node:util'; import { getWorkflowPort } from '@workflow/utils/get-port'; import { once } from './util.js'; +const execFileAsync = promisify(execFile); +const COMMAND_PORT_PATTERN = + /(?:^|\s)(?:--port(?:=|\s+)|-p(?:=|\s+))(\d{1,5})(?=\s|$)/; +const MAX_ANCESTOR_PORT_SCAN_DEPTH = 6; +let cachedAncestorCommandPortPromise: Promise | null = null; + const getDataDirFromEnv = () => { return process.env.WORKFLOW_LOCAL_DATA_DIR || '.workflow-data'; }; @@ -20,6 +28,129 @@ const getNextPrivateOriginFromEnv = () => { return origin; }; +function getPortFromEnvVariable(name: string): number | undefined { + return parsePort(process.env[name]); +} + +function parsePort(value: string | undefined, radix = 10): number | undefined { + if (!value) { + return undefined; + } + const port = Number.parseInt(value, radix); + if (!Number.isFinite(port) || port < 0 || port > 65535) { + return undefined; + } + return port; +} + +function getPortFromCommand(command: string | undefined): number | undefined { + if (!command) { + return undefined; + } + + const match = command.match(COMMAND_PORT_PATTERN); + if (!match?.[1]) { + return undefined; + } + + return parsePort(match[1]); +} + +function getPortFromLifecycleScript(): number | undefined { + return getPortFromCommand(process.env.npm_lifecycle_script); +} + +function getPortFromArgv(): number | undefined { + if (process.argv.length === 0) { + return undefined; + } + + return getPortFromCommand(process.argv.join(' ')); +} + +async function getParentPid(pid: number): Promise { + if (process.platform === 'win32') { + return undefined; + } + + try { + const { stdout } = await execFileAsync('ps', [ + '-o', + 'ppid=', + '-p', + String(pid), + ]); + const parentPid = parsePort(stdout.trim()); + if (typeof parentPid !== 'number' || parentPid <= 0) { + return undefined; + } + return parentPid; + } catch { + return undefined; + } +} + +async function getPortFromPidCommand(pid: number): Promise { + if (process.platform === 'win32') { + return undefined; + } + + try { + const { stdout } = await execFileAsync('ps', [ + '-o', + 'command=', + '-p', + String(pid), + ]); + return getPortFromCommand(stdout.trim()); + } catch { + return undefined; + } +} + +async function getPortFromAncestorCommands(): Promise { + if (process.platform === 'win32') { + return undefined; + } + + let currentPid = process.pid; + for ( + let depth = 0; + depth < MAX_ANCESTOR_PORT_SCAN_DEPTH && currentPid > 1; + depth++ + ) { + const commandPort = await getPortFromPidCommand(currentPid); + if (typeof commandPort === 'number') { + return commandPort; + } + + const parentPid = await getParentPid(currentPid); + if ( + typeof parentPid !== 'number' || + parentPid <= 0 || + parentPid === currentPid + ) { + break; + } + + currentPid = parentPid; + } + + return undefined; +} + +async function getCachedPortFromAncestorCommands(): Promise< + number | undefined +> { + if (!cachedAncestorCommandPortPromise) { + cachedAncestorCommandPortPromise = getPortFromAncestorCommands().catch( + () => undefined + ); + } + + return cachedAncestorCommandPortPromise; +} + export type Config = { dataDir: string; port?: number; @@ -52,7 +183,11 @@ export const config = once(() => { * 3. __NEXT_PRIVATE_ORIGIN env var (Next.js internal server origin) * 4. config.port (explicit port override from args) * 5. PORT env var (explicit configuration) - * 6. Auto-detected port via getPort (detect actual listening port) + * 6. TURBO_PORT env var (set by turbo task runner in some monorepos) + * 7. npm_lifecycle_script --port/-p value (when dev script encodes the port) + * 8. process.argv --port/-p value + * 9. Ancestor process command --port/-p value (for detached worker contexts) + * 10. Auto-detected port via getPort (detect actual listening port) */ export async function resolveBaseUrl(config: Partial): Promise { if (config.baseUrl) { @@ -81,6 +216,26 @@ export async function resolveBaseUrl(config: Partial): Promise { return `http://localhost:${process.env.PORT}`; } + const turboPort = getPortFromEnvVariable('TURBO_PORT'); + if (typeof turboPort === 'number') { + return `http://localhost:${turboPort}`; + } + + const lifecyclePort = getPortFromLifecycleScript(); + if (typeof lifecyclePort === 'number') { + return `http://localhost:${lifecyclePort}`; + } + + const argvPort = getPortFromArgv(); + if (typeof argvPort === 'number') { + return `http://localhost:${argvPort}`; + } + + const ancestorCommandPort = await getCachedPortFromAncestorCommands(); + if (typeof ancestorCommandPort === 'number') { + return `http://localhost:${ancestorCommandPort}`; + } + const detectedPort = await getWorkflowPort(); if (detectedPort) { return `http://localhost:${detectedPort}`; From 93ee6c18fcc98d7925208a391a0f231ddd9f43ea Mon Sep 17 00:00:00 2001 From: JJ Kasper Date: Fri, 10 Apr 2026 16:20:39 -0700 Subject: [PATCH 15/17] fix(world-local): detect next dev port from data-dir project process When queue workers lack PORT/TURBO_PORT and no parent argv includes --port, resolveBaseUrl now derives project root from WORKFLOW_LOCAL_DATA_DIR and scans live process commands for matching next dev --port values. Adds regression coverage for process-list fallback. --- packages/world-local/src/config.test.ts | 48 +++++++++++++ packages/world-local/src/config.ts | 92 ++++++++++++++++++++++--- 2 files changed, 132 insertions(+), 8 deletions(-) diff --git a/packages/world-local/src/config.test.ts b/packages/world-local/src/config.test.ts index ede8258bb7..c935ede69c 100644 --- a/packages/world-local/src/config.test.ts +++ b/packages/world-local/src/config.test.ts @@ -1,3 +1,7 @@ +import { spawn } from 'node:child_process'; +import { mkdtemp, mkdir } from 'node:fs/promises'; +import os from 'node:os'; +import { join } from 'node:path'; import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest'; import { resolveBaseUrl } from './config'; @@ -312,6 +316,50 @@ describe('resolveBaseUrl', () => { expect(getWorkflowPort).not.toHaveBeenCalled(); }); + it('should resolve from process list when dataDir points to a project with a live next dev port', async () => { + const { getWorkflowPort } = await import('@workflow/utils/get-port'); + vi.mocked(getWorkflowPort).mockResolvedValue(undefined); + delete process.env.PORT; + delete process.env.TURBO_PORT; + delete process.env.npm_lifecycle_script; + delete process.env.__NEXT_PRIVATE_ORIGIN; + process.argv = ['/usr/local/bin/node', 'vitest']; + + const projectRoot = await mkdtemp( + join(os.tmpdir(), 'workflow-config-process-port-') + ); + await mkdir(join(projectRoot, '.next', 'workflow-data'), { + recursive: true, + }); + + const simulatedPort = 41234; + const helper = spawn( + process.execPath, + [ + '-e', + 'setInterval(() => {}, 1000)', + 'next', + 'dev', + '--port', + String(simulatedPort), + `${projectRoot}/app`, + ], + { stdio: 'ignore' } + ); + + try { + await new Promise((resolve) => setTimeout(resolve, 50)); + const result = await resolveBaseUrl({ + dataDir: join(projectRoot, '.next', 'workflow-data'), + }); + expect(result).toBe(`http://localhost:${simulatedPort}`); + expect(getWorkflowPort).not.toHaveBeenCalled(); + } finally { + helper.kill(); + await new Promise((resolve) => helper.once('exit', resolve)); + } + }); + it('should ignore PORT env var when config.port is provided', async () => { const { getWorkflowPort } = await import('@workflow/utils/get-port'); process.env.PORT = '4173'; diff --git a/packages/world-local/src/config.ts b/packages/world-local/src/config.ts index cc24775717..a1f17daf85 100644 --- a/packages/world-local/src/config.ts +++ b/packages/world-local/src/config.ts @@ -1,4 +1,5 @@ import { execFile } from 'node:child_process'; +import { resolve } from 'node:path'; import { promisify } from 'node:util'; import { getWorkflowPort } from '@workflow/utils/get-port'; import { once } from './util.js'; @@ -7,7 +8,8 @@ const execFileAsync = promisify(execFile); const COMMAND_PORT_PATTERN = /(?:^|\s)(?:--port(?:=|\s+)|-p(?:=|\s+))(\d{1,5})(?=\s|$)/; const MAX_ANCESTOR_PORT_SCAN_DEPTH = 6; -let cachedAncestorCommandPortPromise: Promise | null = null; +const cachedProjectPortByRoot = new Map(); +let cachedAncestorCommandPort: number | undefined; const getDataDirFromEnv = () => { return process.env.WORKFLOW_LOCAL_DATA_DIR || '.workflow-data'; @@ -142,13 +144,80 @@ async function getPortFromAncestorCommands(): Promise { async function getCachedPortFromAncestorCommands(): Promise< number | undefined > { - if (!cachedAncestorCommandPortPromise) { - cachedAncestorCommandPortPromise = getPortFromAncestorCommands().catch( - () => undefined - ); + if (typeof cachedAncestorCommandPort === 'number') { + return cachedAncestorCommandPort; } - return cachedAncestorCommandPortPromise; + const resolvedPort = await getPortFromAncestorCommands(); + if (typeof resolvedPort === 'number') { + cachedAncestorCommandPort = resolvedPort; + } + return resolvedPort; +} + +function normalizePath(filePath: string): string { + return filePath.replace(/\\/g, '/'); +} + +function getProjectRootFromDataDir( + dataDir: string | undefined +): string | undefined { + if (!dataDir) { + return undefined; + } + + const normalized = normalizePath(resolve(dataDir)); + + const nextSuffix = '/.next/workflow-data'; + if (normalized.endsWith(nextSuffix)) { + return normalized.slice(0, -nextSuffix.length); + } + + const fallbackSuffix = '/.workflow-data'; + if (normalized.endsWith(fallbackSuffix)) { + return normalized.slice(0, -fallbackSuffix.length); + } + + return undefined; +} + +async function getPortFromProjectProcessList( + projectRoot: string | undefined +): Promise { + if (!projectRoot || process.platform === 'win32') { + return undefined; + } + + const cachedPort = cachedProjectPortByRoot.get(projectRoot); + if (typeof cachedPort === 'number') { + return cachedPort; + } + + try { + const { stdout } = await execFileAsync('ps', ['-Ao', 'command=']); + const normalizedProjectRoot = normalizePath(projectRoot); + const projectPrefix = `${normalizedProjectRoot}/`; + const commandLines = stdout.split('\n').map((line) => line.trim()); + + const nextDevCommands = commandLines.filter( + (line) => + line.includes(projectPrefix) && + line.includes('next') && + line.includes(' dev') + ); + + for (const commandLine of nextDevCommands) { + const parsedPort = getPortFromCommand(commandLine); + if (typeof parsedPort === 'number') { + cachedProjectPortByRoot.set(projectRoot, parsedPort); + return parsedPort; + } + } + + return undefined; + } catch { + return undefined; + } } export type Config = { @@ -186,8 +255,9 @@ export const config = once(() => { * 6. TURBO_PORT env var (set by turbo task runner in some monorepos) * 7. npm_lifecycle_script --port/-p value (when dev script encodes the port) * 8. process.argv --port/-p value - * 9. Ancestor process command --port/-p value (for detached worker contexts) - * 10. Auto-detected port via getPort (detect actual listening port) + * 9. Process list lookup by WORKFLOW_LOCAL_DATA_DIR project root (multi-worker fallback) + * 10. Ancestor process command --port/-p value (for detached worker contexts) + * 11. Auto-detected port via getPort (detect actual listening port) */ export async function resolveBaseUrl(config: Partial): Promise { if (config.baseUrl) { @@ -231,6 +301,12 @@ export async function resolveBaseUrl(config: Partial): Promise { return `http://localhost:${argvPort}`; } + const projectRoot = getProjectRootFromDataDir(config.dataDir); + const projectProcessPort = await getPortFromProjectProcessList(projectRoot); + if (typeof projectProcessPort === 'number') { + return `http://localhost:${projectProcessPort}`; + } + const ancestorCommandPort = await getCachedPortFromAncestorCommands(); if (typeof ancestorCommandPort === 'number') { return `http://localhost:${ancestorCommandPort}`; From cff454907482898e781cca95cc9ec86fa312a9eb Mon Sep 17 00:00:00 2001 From: JJ Kasper Date: Fri, 10 Apr 2026 16:35:56 -0700 Subject: [PATCH 16/17] fix(next+world-local): stabilize lazy delivery and loader source-map handling Disable implicit SWC sidecar source-map loading in next loader when no upstream map is provided, preventing missing .map dependency failures. Also ensure local workflow queue waits for first flow delivery attempt before returning to reduce dropped async deliveries in short-lived request contexts. --- packages/next/src/loader.ts | 10 +++-- packages/world-local/src/queue.test.ts | 32 ++++++++++++++ packages/world-local/src/queue.ts | 58 ++++++++++++++++++++++++++ 3 files changed, 97 insertions(+), 3 deletions(-) diff --git a/packages/next/src/loader.ts b/packages/next/src/loader.ts index 758f21e7e2..00aa5dc4c1 100644 --- a/packages/next/src/loader.ts +++ b/packages/next/src/loader.ts @@ -763,6 +763,9 @@ export default function workflowLoader( workingDir ); const mode = isDeferredStepCopyFile ? 'step' : 'client'; + const inputSourceMapForTransform = isDeferredStepCopyFile + ? deferredSourceMapResult.sourceMap || sourceMap || false + : sourceMap || false; // Transform with SWC const result = await transform(sourceForTransform, { @@ -794,9 +797,10 @@ export default function workflowLoader( }, }, minify: false, - inputSourceMap: isDeferredStepCopyFile - ? deferredSourceMapResult.sourceMap || sourceMap - : sourceMap, + // When no upstream source map is provided, pass `false` explicitly so + // SWC does not attempt to load missing sourceMappingURL sidecar files + // from dependencies (which can fail deferred route compilation). + inputSourceMap: inputSourceMapForTransform, sourceMaps: true, inlineSourcesContent: true, }); diff --git a/packages/world-local/src/queue.test.ts b/packages/world-local/src/queue.test.ts index fc2a09a417..8b77ec6bda 100644 --- a/packages/world-local/src/queue.test.ts +++ b/packages/world-local/src/queue.test.ts @@ -267,4 +267,36 @@ describe('queue timeout re-enqueue', () => { expect(mockSetTimeout).toHaveBeenNthCalledWith(1, 250); expect(mockSetTimeout).toHaveBeenNthCalledWith(2, 500); }); + + it('queue waits for first workflow attempt before resolving', async () => { + let firstAttemptStarted = false; + let releaseFirstAttempt: (() => void) | null = null; + const firstAttemptGate = new Promise((resolve) => { + releaseFirstAttempt = resolve; + }); + + localQueue.registerHandler('__wkf_workflow_', async () => { + firstAttemptStarted = true; + await firstAttemptGate; + return Response.json({ ok: true }); + }); + + const queuePromise = localQueue.queue('__wkf_workflow_test' as any, { + runId: 'run_01ABC', + }); + + await vi.waitFor(() => { + expect(firstAttemptStarted).toBe(true); + }); + + let settled = false; + queuePromise.then(() => { + settled = true; + }); + await Promise.resolve(); + expect(settled).toBe(false); + + releaseFirstAttempt?.(); + await queuePromise; + }); }); diff --git a/packages/world-local/src/queue.ts b/packages/world-local/src/queue.ts index 118a2d2900..086a461868 100644 --- a/packages/world-local/src/queue.ts +++ b/packages/world-local/src/queue.ts @@ -62,6 +62,28 @@ function getWorkflowAttemptTimeoutMs(): number { return parsed; } +const DEFAULT_WORKFLOW_FIRST_ATTEMPT_WAIT_MS = 1500; +function getWorkflowFirstAttemptWaitMs(): number { + const raw = process.env.WORKFLOW_LOCAL_QUEUE_WORKFLOW_FIRST_ATTEMPT_WAIT_MS; + if (!raw) { + return DEFAULT_WORKFLOW_FIRST_ATTEMPT_WAIT_MS; + } + const parsed = Number.parseInt(raw, 10); + if (!Number.isFinite(parsed) || parsed <= 0) { + return DEFAULT_WORKFLOW_FIRST_ATTEMPT_WAIT_MS; + } + return parsed; +} + +function waitForMs(ms: number): Promise { + return new Promise((resolve) => { + const timer = globalThis.setTimeout(resolve, ms); + if (typeof (timer as { unref?: () => void }).unref === 'function') { + (timer as { unref: () => void }).unref(); + } + }); +} + function getLocalRetryDelayMs(params: { attempt: number; status: number; @@ -147,6 +169,20 @@ export function createQueue(config: Partial): LocalQueue { const body = transport.serialize(message); const { pathname, prefix } = getQueueRoute(queueName); const messageId = MessageId.parse(`msg_${generateId()}`); + let firstAttemptDelivered = false; + let resolveFirstAttemptDelivery: (() => void) | null = null; + const firstAttemptDeliveryPromise = new Promise((resolve) => { + resolveFirstAttemptDelivery = () => { + if (firstAttemptDelivered) { + return; + } + firstAttemptDelivered = true; + resolve(); + }; + }); + const signalFirstAttemptDelivery = () => { + resolveFirstAttemptDelivery?.(); + }; // Extract identifiers from the message for structured logging. // Workflow messages have `runId`, step messages have `workflowRunId` + `stepId`. @@ -225,6 +261,9 @@ export function createQueue(config: Partial): LocalQueue { text = await response.text(); } catch (error) { + if (attempt === 0) { + signalFirstAttemptDelivery(); + } console.error( `[world-local] Queue message transport failed (attempt ${attempt + 1})`, { @@ -248,6 +287,9 @@ export function createQueue(config: Partial): LocalQueue { try { const timeoutSeconds = Number(JSON.parse(text).timeoutSeconds); if (Number.isFinite(timeoutSeconds) && timeoutSeconds >= 0) { + if (attempt === 0) { + signalFirstAttemptDelivery(); + } // Clamp to MAX_SAFE_TIMEOUT_MS to avoid Node.js setTimeout overflow warning. // When this fires early, the handler recalculates remaining time from // persistent state and returns another timeoutSeconds if needed. @@ -261,9 +303,15 @@ export function createQueue(config: Partial): LocalQueue { continue; } } catch {} + if (attempt === 0) { + signalFirstAttemptDelivery(); + } return; } + if (attempt === 0) { + signalFirstAttemptDelivery(); + } console.error( `[world-local] Queue message failed (attempt ${attempt + 1}, HTTP ${response.status})`, { @@ -282,6 +330,7 @@ export function createQueue(config: Partial): LocalQueue { await setTimeout(delayMs); } + signalFirstAttemptDelivery(); console.error( `[world-local] Queue message exhausted safety limit (${MAX_LOCAL_SAFETY_LIMIT} attempts)`, { @@ -292,10 +341,12 @@ export function createQueue(config: Partial): LocalQueue { } ); } finally { + signalFirstAttemptDelivery(); semaphore.release(); } })() .catch((err) => { + signalFirstAttemptDelivery(); // Silently ignore client disconnect errors (e.g., browser refresh during streaming) // These are expected and should not cause unhandled rejection warnings const isAbortError = @@ -310,6 +361,13 @@ export function createQueue(config: Partial): LocalQueue { } }); + if (pathname === 'flow') { + await Promise.race([ + firstAttemptDeliveryPromise, + waitForMs(getWorkflowFirstAttemptWaitMs()), + ]); + } + return { messageId }; }; From 52d1e314af9e81146970af8518acbe5b35b7b022 Mon Sep 17 00:00:00 2001 From: JJ Kasper Date: Fri, 10 Apr 2026 17:49:18 -0700 Subject: [PATCH 17/17] fix(world-local): prefer effective port and ignore invalid PORT values --- packages/world-local/src/config.test.ts | 134 ++++++++++++++++- packages/world-local/src/config.ts | 192 ++++++++++++++++++++---- 2 files changed, 296 insertions(+), 30 deletions(-) diff --git a/packages/world-local/src/config.test.ts b/packages/world-local/src/config.test.ts index c935ede69c..5fccfe1f6d 100644 --- a/packages/world-local/src/config.test.ts +++ b/packages/world-local/src/config.test.ts @@ -130,15 +130,16 @@ describe('resolveBaseUrl', () => { expect(result).toBe('http://localhost:5173'); }); - it('should handle port 0 (OS-assigned port)', async () => { + it('should treat port 0 as invalid and fall back to auto-detection', async () => { const { getWorkflowPort } = await import('@workflow/utils/get-port'); + vi.mocked(getWorkflowPort).mockResolvedValue(5173); const result = await resolveBaseUrl({ port: 0, }); - expect(result).toBe('http://localhost:0'); - expect(getWorkflowPort).not.toHaveBeenCalled(); + expect(result).toBe('http://localhost:5173'); + expect(getWorkflowPort).toHaveBeenCalled(); }); it('should handle port 80', async () => { @@ -237,6 +238,17 @@ describe('resolveBaseUrl', () => { expect(getWorkflowPort).not.toHaveBeenCalled(); }); + it('should ignore invalid PORT env var values and continue fallback resolution', async () => { + const { getWorkflowPort } = await import('@workflow/utils/get-port'); + vi.mocked(getWorkflowPort).mockResolvedValue(5173); + process.env.PORT = '0'; + + const result = await resolveBaseUrl({}); + + expect(result).toBe('http://localhost:5173'); + expect(getWorkflowPort).toHaveBeenCalled(); + }); + it('should use TURBO_PORT env var when PORT is not set', async () => { const { getWorkflowPort } = await import('@workflow/utils/get-port'); vi.mocked(getWorkflowPort).mockResolvedValue(undefined); @@ -276,6 +288,20 @@ describe('resolveBaseUrl', () => { expect(getWorkflowPort).not.toHaveBeenCalled(); }); + it('should use the last --port flag from npm_lifecycle_script when multiple are present', async () => { + const { getWorkflowPort } = await import('@workflow/utils/get-port'); + vi.mocked(getWorkflowPort).mockResolvedValue(undefined); + delete process.env.PORT; + delete process.env.TURBO_PORT; + process.env.npm_lifecycle_script = + 'next dev --port 3002 --turbopack --port 3000'; + + const result = await resolveBaseUrl({}); + + expect(result).toBe('http://localhost:3000'); + expect(getWorkflowPort).not.toHaveBeenCalled(); + }); + it('should parse -p from npm_lifecycle_script', async () => { const { getWorkflowPort } = await import('@workflow/utils/get-port'); vi.mocked(getWorkflowPort).mockResolvedValue(undefined); @@ -316,6 +342,42 @@ describe('resolveBaseUrl', () => { expect(getWorkflowPort).not.toHaveBeenCalled(); }); + it('should use the last --port flag from process.argv when repeated', async () => { + const { getWorkflowPort } = await import('@workflow/utils/get-port'); + vi.mocked(getWorkflowPort).mockResolvedValue(undefined); + delete process.env.PORT; + delete process.env.TURBO_PORT; + delete process.env.npm_lifecycle_script; + process.argv = [ + '/usr/local/bin/node', + 'next-server', + '--port', + '3002', + '--turbopack', + '--port', + '3000', + ]; + + const result = await resolveBaseUrl({}); + + expect(result).toBe('http://localhost:3000'); + expect(getWorkflowPort).not.toHaveBeenCalled(); + }); + + it('should ignore process.argv --port 0 and fall back to auto-detection', async () => { + const { getWorkflowPort } = await import('@workflow/utils/get-port'); + vi.mocked(getWorkflowPort).mockResolvedValue(4568); + delete process.env.PORT; + delete process.env.TURBO_PORT; + delete process.env.npm_lifecycle_script; + process.argv = ['/usr/local/bin/node', 'next-server', '--port', '0']; + + const result = await resolveBaseUrl({}); + + expect(result).toBe('http://localhost:4568'); + expect(getWorkflowPort).toHaveBeenCalled(); + }); + it('should resolve from process list when dataDir points to a project with a live next dev port', async () => { const { getWorkflowPort } = await import('@workflow/utils/get-port'); vi.mocked(getWorkflowPort).mockResolvedValue(undefined); @@ -360,6 +422,72 @@ describe('resolveBaseUrl', () => { } }); + it('should prefer the newest matching next dev process when multiple ports exist', async () => { + const { getWorkflowPort } = await import('@workflow/utils/get-port'); + vi.mocked(getWorkflowPort).mockResolvedValue(undefined); + delete process.env.PORT; + delete process.env.TURBO_PORT; + delete process.env.npm_lifecycle_script; + delete process.env.__NEXT_PRIVATE_ORIGIN; + process.argv = ['/usr/local/bin/node', 'vitest']; + + const projectRoot = await mkdtemp( + join(os.tmpdir(), 'workflow-config-process-port-multi-') + ); + await mkdir(join(projectRoot, '.next', 'workflow-data'), { + recursive: true, + }); + + const olderPort = 41231; + const newerPort = 41232; + const olderHelper = spawn( + process.execPath, + [ + '-e', + 'setInterval(() => {}, 1000)', + 'next', + 'dev', + '--port', + String(olderPort), + `${projectRoot}/app`, + ], + { stdio: 'ignore' } + ); + + // Ensure PID ordering so the second helper is considered newer. + await new Promise((resolve) => setTimeout(resolve, 20)); + + const newerHelper = spawn( + process.execPath, + [ + '-e', + 'setInterval(() => {}, 1000)', + 'next', + 'dev', + '--port', + String(newerPort), + `${projectRoot}/app`, + ], + { stdio: 'ignore' } + ); + + try { + await new Promise((resolve) => setTimeout(resolve, 50)); + const result = await resolveBaseUrl({ + dataDir: join(projectRoot, '.next', 'workflow-data'), + }); + expect(result).toBe(`http://localhost:${newerPort}`); + expect(getWorkflowPort).not.toHaveBeenCalled(); + } finally { + olderHelper.kill(); + newerHelper.kill(); + await Promise.all([ + new Promise((resolve) => olderHelper.once('exit', resolve)), + new Promise((resolve) => newerHelper.once('exit', resolve)), + ]); + } + }); + it('should ignore PORT env var when config.port is provided', async () => { const { getWorkflowPort } = await import('@workflow/utils/get-port'); process.env.PORT = '4173'; diff --git a/packages/world-local/src/config.ts b/packages/world-local/src/config.ts index a1f17daf85..a5772f4424 100644 --- a/packages/world-local/src/config.ts +++ b/packages/world-local/src/config.ts @@ -6,9 +6,8 @@ import { once } from './util.js'; const execFileAsync = promisify(execFile); const COMMAND_PORT_PATTERN = - /(?:^|\s)(?:--port(?:=|\s+)|-p(?:=|\s+))(\d{1,5})(?=\s|$)/; + /(?:^|\s)(?:--port(?:=|\s+)|-p(?:=|\s+))(\d{1,5})(?=\s|$)/g; const MAX_ANCESTOR_PORT_SCAN_DEPTH = 6; -const cachedProjectPortByRoot = new Map(); let cachedAncestorCommandPort: number | undefined; const getDataDirFromEnv = () => { @@ -39,7 +38,7 @@ function parsePort(value: string | undefined, radix = 10): number | undefined { return undefined; } const port = Number.parseInt(value, radix); - if (!Number.isFinite(port) || port < 0 || port > 65535) { + if (!Number.isFinite(port) || port <= 0 || port > 65535) { return undefined; } return port; @@ -50,12 +49,19 @@ function getPortFromCommand(command: string | undefined): number | undefined { return undefined; } - const match = command.match(COMMAND_PORT_PATTERN); - if (!match?.[1]) { + const matches = Array.from(command.matchAll(COMMAND_PORT_PATTERN)); + if (matches.length === 0) { return undefined; } - return parsePort(match[1]); + // Node/Next scripts can contain repeated `--port` flags (e.g. script default + // plus CLI override). Use the last match, which matches CLI precedence. + const lastMatch = matches[matches.length - 1]; + if (!lastMatch?.[1]) { + return undefined; + } + + return parsePort(lastMatch[1]); } function getPortFromLifecycleScript(): number | undefined { @@ -188,32 +194,160 @@ async function getPortFromProjectProcessList( return undefined; } - const cachedPort = cachedProjectPortByRoot.get(projectRoot); - if (typeof cachedPort === 'number') { - return cachedPort; - } + const isCommandWithinProjectRoot = ( + command: string, + normalizedProjectRoot: string + ): boolean => { + return ( + command.includes(`${normalizedProjectRoot}/`) || + command.includes(`${normalizedProjectRoot} `) || + command.endsWith(normalizedProjectRoot) + ); + }; + + const getProcessCwd = async (pid: number): Promise => { + const { stdout: cwdOutput } = await execFileAsync('lsof', [ + '-a', + '-p', + String(pid), + '-d', + 'cwd', + '-Fn', + ]).catch(() => ({ stdout: '' as string })); + const cwdLine = cwdOutput + .split('\n') + .map((line) => line.trim()) + .find((line) => line.startsWith('n')); + if (!cwdLine || cwdLine.length < 2) { + return undefined; + } + return normalizePath(cwdLine.slice(1)); + }; + + const isHttpReachablePort = async (port: number): Promise => { + try { + const controller = new AbortController(); + const timeout = setTimeout(() => controller.abort(), 300); + const response = await fetch(`http://localhost:${port}/`, { + method: 'HEAD', + signal: controller.signal, + }).catch(async () => { + return await fetch(`http://localhost:${port}/`, { + method: 'GET', + signal: controller.signal, + }); + }); + clearTimeout(timeout); + return response.status >= 100 && response.status < 600; + } catch { + return false; + } + }; try { - const { stdout } = await execFileAsync('ps', ['-Ao', 'command=']); + const { stdout } = await execFileAsync('ps', ['-Ao', 'pid=,command=']); const normalizedProjectRoot = normalizePath(projectRoot); - const projectPrefix = `${normalizedProjectRoot}/`; - const commandLines = stdout.split('\n').map((line) => line.trim()); - - const nextDevCommands = commandLines.filter( - (line) => - line.includes(projectPrefix) && - line.includes('next') && - line.includes(' dev') - ); - for (const commandLine of nextDevCommands) { - const parsedPort = getPortFromCommand(commandLine); + const nextDevCommands: Array<{ pid: number; command: string }> = []; + const processEntries = stdout + .split('\n') + .map((entry) => entry.trim()) + .map((line) => { + const match = line.match(/^(\d+)\s+(.+)$/); + if (!match?.[1] || !match[2]) { + return null; + } + const pid = Number.parseInt(match[1], 10); + if (!Number.isFinite(pid)) { + return null; + } + return { + pid, + command: match[2], + }; + }) + .filter((entry): entry is { pid: number; command: string } => + Boolean(entry) + ) + .sort((a, b) => b.pid - a.pid); + + for (const { pid, command } of processEntries) { + if (command.includes('detached-flush')) { + continue; + } + + const looksLikeNextDevProcess = + command.includes('next') && + (command.includes(' dev') || command.includes('next-server')); + if (!looksLikeNextDevProcess) { + continue; + } + + let matchesProject = isCommandWithinProjectRoot( + command, + normalizedProjectRoot + ); + if (!matchesProject) { + const processCwd = await getProcessCwd(pid); + matchesProject = processCwd === normalizedProjectRoot; + } + if (!matchesProject) { + continue; + } + + nextDevCommands.push({ pid, command }); + } + + const candidatePorts: number[] = []; + const addCandidatePort = (port: number) => { + if (!candidatePorts.includes(port)) { + candidatePorts.push(port); + } + }; + + for (const entry of nextDevCommands) { + const parsedPort = getPortFromCommand(entry.command); if (typeof parsedPort === 'number') { - cachedProjectPortByRoot.set(projectRoot, parsedPort); - return parsedPort; + addCandidatePort(parsedPort); + } + + const { stdout: lsofOutput } = await execFileAsync('lsof', [ + '-a', + '-i', + '-P', + '-n', + '-p', + String(entry.pid), + '-sTCP:LISTEN', + ]).catch(() => ({ stdout: '' as string })); + const lsofLines = lsofOutput.split('\n'); + for (const lsofLine of lsofLines) { + const parts = lsofLine.trim().split(/\s+/); + const address = parts[8]; + if (!address) { + continue; + } + const colonIndex = address.lastIndexOf(':'); + if (colonIndex === -1) { + continue; + } + const parsedLsofPort = parsePort(address.slice(colonIndex + 1)); + if (typeof parsedLsofPort === 'number') { + addCandidatePort(parsedLsofPort); + } + } + } + + for (const candidatePort of candidatePorts) { + if (await isHttpReachablePort(candidatePort)) { + return candidatePort; } } + if (candidatePorts.length > 0) { + return candidatePorts[0]; + } + return undefined; } catch { return undefined; @@ -279,11 +413,15 @@ export async function resolveBaseUrl(config: Partial): Promise { } if (typeof config.port === 'number') { - return `http://localhost:${config.port}`; + const parsedConfigPort = parsePort(String(config.port)); + if (typeof parsedConfigPort === 'number') { + return `http://localhost:${parsedConfigPort}`; + } } - if (process.env.PORT) { - return `http://localhost:${process.env.PORT}`; + const envPort = parsePort(process.env.PORT); + if (typeof envPort === 'number') { + return `http://localhost:${envPort}`; } const turboPort = getPortFromEnvVariable('TURBO_PORT');