diff --git a/.changeset/fix-hook-graph-extractor.md b/.changeset/fix-hook-graph-extractor.md new file mode 100644 index 0000000000..fa8674b0e7 --- /dev/null +++ b/.changeset/fix-hook-graph-extractor.md @@ -0,0 +1,8 @@ +--- +"@workflow/builders": patch +--- + +Fix workflow graph hook detection for transformed bundles + +- Recognize step declarations with `@__PURE__` annotations in `WORKFLOW_USE_STEP` access +- Detect `createHook`/`createWebhook` calls wrapped by transpiled `using` helper calls diff --git a/.changeset/fix-workflow-port-probe-post-health.md b/.changeset/fix-workflow-port-probe-post-health.md new file mode 100644 index 0000000000..12a1faf7f7 --- /dev/null +++ b/.changeset/fix-workflow-port-probe-post-health.md @@ -0,0 +1,8 @@ +--- +"@workflow/utils": patch +--- + +Fix local workflow port detection for POST-only health endpoints + +- Probe `/.well-known/workflow/v1/flow?__health` with `POST` when `HEAD` is not healthy +- Prevent lazy-discovery socket ports from being selected as workflow HTTP base URL diff --git a/packages/builders/src/apply-swc-transform.test.ts b/packages/builders/src/apply-swc-transform.test.ts new file mode 100644 index 0000000000..9091e312a8 --- /dev/null +++ b/packages/builders/src/apply-swc-transform.test.ts @@ -0,0 +1,21 @@ +import { describe, expect, it } from 'vitest'; +import { applySwcTransform } from './apply-swc-transform.js'; + +describe('applySwcTransform', () => { + it('ignores missing external sourceMappingURL sidecars', async () => { + const source = [ + 'export const value = 1;', + '//# sourceMappingURL=index.js.map', + '', + ].join('\n'); + + const result = await applySwcTransform( + 'fixtures/missing-source-map.js', + source, + 'client' + ); + + expect(result.code).toContain('const value = 1'); + expect(result.workflowManifest).toEqual({}); + }); +}); diff --git a/packages/builders/src/apply-swc-transform.ts b/packages/builders/src/apply-swc-transform.ts index 57c2d09bf3..cbb0c3a308 100644 --- a/packages/builders/src/apply-swc-transform.ts +++ b/packages/builders/src/apply-swc-transform.ts @@ -126,6 +126,10 @@ export async function applySwcTransform( // TODO: investigate proper source map support as they // won't even be used in Node.js by default unless we // intercept errors and apply them ourselves + // Explicitly disable reading input source maps from sourceMappingURL + // sidecars. Some published dependencies omit *.map files, which can + // otherwise fail deferred route compilation in lazy discovery mode. + inputSourceMap: false, sourceMaps: false, minify: false, }); diff --git a/packages/builders/src/workflows-extractor.test.ts b/packages/builders/src/workflows-extractor.test.ts new file mode 100644 index 0000000000..44372e68e6 --- /dev/null +++ b/packages/builders/src/workflows-extractor.test.ts @@ -0,0 +1,128 @@ +import { mkdtemp, rm, writeFile } from 'node:fs/promises'; +import { tmpdir } from 'node:os'; +import { join } from 'node:path'; +import { afterEach, describe, expect, it } from 'vitest'; +import { extractWorkflowGraphs } from './workflows-extractor.js'; + +async function createWorkflowBundleFile( + workflowCode: string +): Promise<{ filePath: string; tempDir: string }> { + const tempDir = await mkdtemp(join(tmpdir(), 'workflow-extractor-')); + const filePath = join(tempDir, 'route.js'); + const escapedWorkflowCode = workflowCode.replace(/[\\`$]/g, '\\$&'); + const bundleCode = `const workflowCode = \`${escapedWorkflowCode}\`; +export const POST = workflowCode;`; + await writeFile(filePath, bundleCode, 'utf8'); + return { filePath, tempDir }; +} + +describe('workflows-extractor', () => { + const tempDirs: string[] = []; + + afterEach(async () => { + await Promise.all( + tempDirs + .splice(0) + .map((tempDir) => rm(tempDir, { recursive: true, force: true })) + ); + }); + + it('detects step declarations that include @__PURE__ annotations', async () => { + const workflowCode = ` +var stepWithPure = globalThis[/* @__PURE__ */ Symbol.for("WORKFLOW_USE_STEP")]("step//./workflows/demo//stepWithPure"); +async function demo() { + const value = await stepWithPure(); + return value; +} +demo.workflowId = "workflow//./workflows/demo//demo"; +globalThis.__private_workflows.set("workflow//./workflows/demo//demo", demo); +`; + const { filePath, tempDir } = await createWorkflowBundleFile(workflowCode); + tempDirs.push(tempDir); + + const graphs = await extractWorkflowGraphs(filePath); + const demoGraph = graphs['./workflows/demo']?.demo?.graph; + const labels = (demoGraph?.nodes || []).map((node) => node.data.label); + + expect(labels).toContain('stepWithPure'); + }); + + it('detects createHook wrapped by transpiled using helpers inside try/finally', async () => { + const workflowCode = ` +var stepA = globalThis[/* @__PURE__ */ Symbol.for("WORKFLOW_USE_STEP")]("step//./workflows/hooks//stepA"); +function _ts_add_disposable_resource(_env, value, _isAsync) { + return value; +} +function _ts_dispose_resources(_env) {} +async function withHook() { + const env = { stack: [] }; + try { + const responseId = await stepA(); + const hook = _ts_add_disposable_resource(env, createHook({ token: 'hook:' + responseId }), false); + const payload = await hook; + return payload; + } finally { + _ts_dispose_resources(env); + } +} +withHook.workflowId = "workflow//./workflows/hooks//withHook"; +globalThis.__private_workflows.set("workflow//./workflows/hooks//withHook", withHook); +`; + const { filePath, tempDir } = await createWorkflowBundleFile(workflowCode); + tempDirs.push(tempDir); + + const graphs = await extractWorkflowGraphs(filePath); + const hookGraph = graphs['./workflows/hooks']?.withHook?.graph; + const labels = (hookGraph?.nodes || []).map((node) => node.data.label); + + expect(labels).toEqual( + expect.arrayContaining(['stepA', 'createHook', 'awaitWebhook']) + ); + }); + + it('detects hook.create calls used directly inside Promise.race', async () => { + const workflowCode = ` +async function waitForAuthWorkflow() { + const session = await Promise.race([ + authCompleteHook.create({ token: 'auth:demo' }), + sleep('1h').then(() => null), + ]); + return session; +} +waitForAuthWorkflow.workflowId = "workflow//./workflows/hooks//waitForAuthWorkflow"; +globalThis.__private_workflows.set("workflow//./workflows/hooks//waitForAuthWorkflow", waitForAuthWorkflow); +`; + const { filePath, tempDir } = await createWorkflowBundleFile(workflowCode); + tempDirs.push(tempDir); + + const graphs = await extractWorkflowGraphs(filePath); + const hookGraph = graphs['./workflows/hooks']?.waitForAuthWorkflow?.graph; + const labels = (hookGraph?.nodes || []).map((node) => node.data.label); + + expect(labels).toEqual(expect.arrayContaining(['createHook'])); + }); + + it('detects for-await hook consumption when hook is created via hook.create', async () => { + const workflowCode = ` +var processPayload = globalThis[/* @__PURE__ */ Symbol.for("WORKFLOW_USE_STEP")]("step//./workflows/hooks//processPayload"); +async function streamHookWorkflow() { + const hook = activeSubagentRunHook.create({ token: 'stream:demo' }); + for await (const payload of hook) { + await processPayload(payload); + } +} +streamHookWorkflow.workflowId = "workflow//./workflows/hooks//streamHookWorkflow"; +globalThis.__private_workflows.set("workflow//./workflows/hooks//streamHookWorkflow", streamHookWorkflow); +`; + const { filePath, tempDir } = await createWorkflowBundleFile(workflowCode); + tempDirs.push(tempDir); + + const graphs = await extractWorkflowGraphs(filePath); + const hookGraph = graphs['./workflows/hooks']?.streamHookWorkflow?.graph; + const labels = (hookGraph?.nodes || []).map((node) => node.data.label); + + expect(labels).toEqual( + expect.arrayContaining(['processPayload', 'createHook', 'awaitWebhook']) + ); + }); +}); diff --git a/packages/builders/src/workflows-extractor.ts b/packages/builders/src/workflows-extractor.ts index 4379ed1744..422508448e 100644 --- a/packages/builders/src/workflows-extractor.ts +++ b/packages/builders/src/workflows-extractor.ts @@ -24,6 +24,137 @@ import { parseSync } from '@swc/core'; * pauses or wait points in the workflow execution. */ const WORKFLOW_PRIMITIVES = new Set(['sleep', 'createHook', 'createWebhook']); +const HOOK_PRIMITIVES = new Set(['createHook', 'createWebhook']); + +function getIdentifierCallExpressionName( + callExpr: CallExpression +): string | null { + return callExpr.callee.type === 'Identifier' + ? (callExpr.callee as Identifier).value + : null; +} + +function getMemberExpressionPropertyName( + memberExpr: MemberExpression +): string | null { + if (memberExpr.property.type === 'Identifier') { + return (memberExpr.property as Identifier).value; + } + + if (memberExpr.property.type === 'Computed') { + const computedExpr = (memberExpr.property as any).expression; + if (computedExpr?.type === 'StringLiteral') { + return (computedExpr as any).value; + } + if (computedExpr?.type === 'Identifier') { + return (computedExpr as Identifier).value; + } + } + + return null; +} + +function getExpressionTerminalName(expr: Expression): string | null { + if (expr.type === 'Identifier') { + return (expr as Identifier).value; + } + + if (expr.type === 'ParenthesisExpression') { + return getExpressionTerminalName((expr as any).expression); + } + + if (expr.type === 'MemberExpression') { + const member = expr as MemberExpression; + const propertyName = getMemberExpressionPropertyName(member); + if (propertyName) { + return propertyName; + } + return getExpressionTerminalName(member.object as Expression); + } + + return null; +} + +function getHookMemberCreatePrimitiveName( + callExpr: CallExpression +): string | null { + if (callExpr.callee.type !== 'MemberExpression') { + return null; + } + + const member = callExpr.callee as MemberExpression; + const methodName = getMemberExpressionPropertyName(member); + if (methodName !== 'create') { + return null; + } + + const objectName = getExpressionTerminalName(member.object as Expression); + if (!objectName) { + return null; + } + + // `defineHook(...).create(...)` calls are emitted as member calls in many + // bundles. Treat hook-like object names as createHook primitives. + if (!/hook/i.test(objectName)) { + return null; + } + + return 'createHook'; +} + +function getWorkflowPrimitiveCallName(callExpr: CallExpression): string | null { + const directCallName = getIdentifierCallExpressionName(callExpr); + if (directCallName && WORKFLOW_PRIMITIVES.has(directCallName)) { + return directCallName; + } + + const hookMemberCallName = getHookMemberCreatePrimitiveName(callExpr); + if (hookMemberCallName) { + return hookMemberCallName; + } + + return null; +} + +function findHookPrimitiveCallName(expr: Expression): string | null { + if (expr.type === 'ParenthesisExpression') { + return findHookPrimitiveCallName((expr as any).expression); + } + + if (expr.type === 'SequenceExpression') { + const sequence = expr as any; + for (const expression of sequence.expressions || []) { + const match = findHookPrimitiveCallName(expression as Expression); + if (match) { + return match; + } + } + return null; + } + + if (expr.type !== 'CallExpression') { + return null; + } + + const hookPrimitiveCallName = getWorkflowPrimitiveCallName(expr); + if (hookPrimitiveCallName && HOOK_PRIMITIVES.has(hookPrimitiveCallName)) { + return hookPrimitiveCallName; + } + + for (const arg of expr.arguments || []) { + if (!arg.expression) { + continue; + } + const nestedCallName = findHookPrimitiveCallName( + arg.expression as Expression + ); + if (nestedCallName) { + return nestedCallName; + } + } + + return null; +} /** * Extract the original function name from a stepId. @@ -298,7 +429,7 @@ function extractStepDeclarations( const stepDeclarations = new Map(); const stepPattern = - /var (\w+) = globalThis\[Symbol\.for\("WORKFLOW_USE_STEP"\)\]\("([^"]+)"\)/g; + /(?:var|let|const)\s+(\w+)\s*=\s*globalThis\[\s*(?:\/\*\s*@__PURE__\s*\*\/\s*)?Symbol\.for\("WORKFLOW_USE_STEP"\)\s*\]\("([^"]+)"\)/g; const lines = bundleCode.split('\n'); for (const line of lines) { @@ -743,12 +874,10 @@ function analyzeStatement( // Track webhook/hook variable assignments: const webhook = createWebhook() if ( decl.id.type === 'Identifier' && - decl.init.type === 'CallExpression' && - (decl.init as CallExpression).callee.type === 'Identifier' + decl.init.type === 'CallExpression' ) { - const funcName = ((decl.init as CallExpression).callee as Identifier) - .value; - if (funcName === 'createWebhook' || funcName === 'createHook') { + const funcName = findHookPrimitiveCallName(decl.init as Expression); + if (funcName && HOOK_PRIMITIVES.has(funcName)) { context.webhookVariables.add((decl.id as Identifier).value); } } @@ -1039,66 +1168,97 @@ function analyzeStatement( const isAwait = (stmt as any).isAwait || (stmt as any).await; const body = (stmt as any).body; - if (body.type === 'BlockStatement') { - const loopResult = analyzeBlock( - body.stmts, - stepDeclarations, - context, - functionMap, - variableMap - ); - - for (const node of loopResult.nodes) { - if (!node.metadata) node.metadata = {}; - node.metadata.loopId = loopId; - node.metadata.loopIsAwait = isAwait; + // `for await (const payload of hook)` is a webhook/hook wait pattern. + // Represent the awaited iterator source as an awaitWebhook primitive. + const iterableExpr = (stmt as any).right as Expression | undefined; + const awaitHookEntryIds: string[] = []; + const awaitHookExitIds: string[] = []; + if (isAwait && iterableExpr?.type === 'Identifier') { + const iterableName = (iterableExpr as Identifier).value; + if (context.webhookVariables.has(iterableName)) { + const nodeId = `node_${context.nodeCounter++}`; + const metadata: NodeMetadata = { + loopId, + loopIsAwait: true, + }; + if (context.inConditional) { + metadata.conditionalId = context.inConditional; + } + const node: ManifestNode = { + id: nodeId, + type: 'primitive', + data: { + label: 'awaitWebhook', + nodeKind: 'primitive', + }, + metadata, + }; + nodes.push(node); + awaitHookEntryIds.push(nodeId); + awaitHookExitIds.push(nodeId); } + } - nodes.push(...loopResult.nodes); - edges.push(...loopResult.edges); - entryNodeIds = loopResult.entryNodeIds; - exitNodeIds = loopResult.exitNodeIds; + const loopResult = + body.type === 'BlockStatement' + ? analyzeBlock( + body.stmts, + stepDeclarations, + context, + functionMap, + variableMap + ) + : analyzeStatement( + body, + stepDeclarations, + context, + functionMap, + variableMap + ); - for (const exitId of loopResult.exitNodeIds) { - for (const entryId of loopResult.entryNodeIds) { - edges.push({ - id: `e_${exitId}_back_${entryId}`, - source: exitId, - target: entryId, - type: 'loop', - }); + for (const node of loopResult.nodes) { + if (!node.metadata) node.metadata = {}; + node.metadata.loopId = loopId; + node.metadata.loopIsAwait = isAwait; + } + + nodes.push(...loopResult.nodes); + edges.push(...loopResult.edges); + + if (awaitHookEntryIds.length > 0) { + entryNodeIds = awaitHookEntryIds; + if (loopResult.entryNodeIds.length > 0) { + for (const hookExitId of awaitHookExitIds) { + for (const loopEntryId of loopResult.entryNodeIds) { + edges.push({ + id: `e_${hookExitId}_${loopEntryId}`, + source: hookExitId, + target: loopEntryId, + type: 'default', + }); + } } } } else { - // Handle single-statement body (no braces) - const loopResult = analyzeStatement( - body, - stepDeclarations, - context, - functionMap, - variableMap - ); - - for (const node of loopResult.nodes) { - if (!node.metadata) node.metadata = {}; - node.metadata.loopId = loopId; - node.metadata.loopIsAwait = isAwait; - } - - nodes.push(...loopResult.nodes); - edges.push(...loopResult.edges); entryNodeIds = loopResult.entryNodeIds; + } + + if (loopResult.exitNodeIds.length > 0) { exitNodeIds = loopResult.exitNodeIds; + } else if (awaitHookExitIds.length > 0) { + exitNodeIds = awaitHookExitIds; + } else { + exitNodeIds = []; + } - for (const exitId of loopResult.exitNodeIds) { - for (const entryId of loopResult.entryNodeIds) { - edges.push({ - id: `e_${exitId}_back_${entryId}`, - source: exitId, - target: entryId, - type: 'loop', - }); - } + for (const exitId of exitNodeIds) { + for (const entryId of entryNodeIds) { + edges.push({ + id: `e_${exitId}_back_${entryId}`, + source: exitId, + target: entryId, + type: 'loop', + }); } } @@ -1120,6 +1280,67 @@ function analyzeStatement( exitNodeIds = blockResult.exitNodeIds; } + if (stmt.type === 'TryStatement') { + const tryStmt = stmt as any; + const tryResult = analyzeBlock( + tryStmt.block?.stmts || [], + stepDeclarations, + context, + functionMap, + variableMap + ); + nodes.push(...tryResult.nodes); + edges.push(...tryResult.edges); + entryNodeIds = tryResult.entryNodeIds; + exitNodeIds = tryResult.exitNodeIds; + + if (tryStmt.handler?.body?.stmts) { + const catchResult = analyzeBlock( + tryStmt.handler.body.stmts, + stepDeclarations, + context, + functionMap, + variableMap + ); + nodes.push(...catchResult.nodes); + edges.push(...catchResult.edges); + if (entryNodeIds.length === 0) { + entryNodeIds = catchResult.entryNodeIds; + } + exitNodeIds = Array.from( + new Set([...exitNodeIds, ...catchResult.exitNodeIds]) + ); + } + + if (tryStmt.finalizer?.stmts) { + const finalizerResult = analyzeBlock( + tryStmt.finalizer.stmts, + stepDeclarations, + context, + functionMap, + variableMap + ); + nodes.push(...finalizerResult.nodes); + edges.push(...finalizerResult.edges); + if (entryNodeIds.length === 0) { + entryNodeIds = finalizerResult.entryNodeIds; + } + if (finalizerResult.entryNodeIds.length > 0) { + for (const exitId of exitNodeIds) { + for (const entryId of finalizerResult.entryNodeIds) { + edges.push({ + id: `e_${exitId}_${entryId}`, + source: exitId, + target: entryId, + type: 'default', + }); + } + } + exitNodeIds = finalizerResult.exitNodeIds; + } + } + } + if (stmt.type === 'ReturnStatement' && (stmt as any).argument) { const result = analyzeExpression( (stmt as any).argument, @@ -1379,6 +1600,33 @@ function analyzeExpression( entryNodeIds.push(...transitiveResult.entryNodeIds); exitNodeIds.push(...transitiveResult.exitNodeIds); } + } else { + const primitiveName = getWorkflowPrimitiveCallName(callExpr); + if (primitiveName) { + const nodeId = `node_${context.nodeCounter++}`; + const metadata: NodeMetadata = {}; + + if (context.inLoop) { + metadata.loopId = context.inLoop; + } + if (context.inConditional) { + metadata.conditionalId = context.inConditional; + } + + const node: ManifestNode = { + id: nodeId, + type: 'primitive', + data: { + label: primitiveName, + nodeKind: 'primitive', + }, + metadata: Object.keys(metadata).length > 0 ? metadata : undefined, + }; + + nodes.push(node); + entryNodeIds.push(nodeId); + exitNodeIds.push(nodeId); + } } // Also analyze the arguments of awaited calls for step references in objects @@ -1500,6 +1748,33 @@ function analyzeExpression( entryNodeIds.push(...transitiveResult.entryNodeIds); exitNodeIds.push(...transitiveResult.exitNodeIds); } + } else { + const primitiveName = getWorkflowPrimitiveCallName(callExpr); + if (primitiveName) { + const nodeId = `node_${context.nodeCounter++}`; + const metadata: NodeMetadata = {}; + + if (context.inLoop) { + metadata.loopId = context.inLoop; + } + if (context.inConditional) { + metadata.conditionalId = context.inConditional; + } + + const node: ManifestNode = { + id: nodeId, + type: 'primitive', + data: { + label: primitiveName, + nodeKind: 'primitive', + }, + metadata: Object.keys(metadata).length > 0 ? metadata : undefined, + }; + + nodes.push(node); + entryNodeIds.push(nodeId); + exitNodeIds.push(nodeId); + } } } @@ -1614,6 +1889,47 @@ function analyzeExpression( nodes.push(node); entryNodeIds.push(nodeId); exitNodeIds.push(nodeId); + } else if (WORKFLOW_PRIMITIVES.has(funcName)) { + const nodeId = `node_${context.nodeCounter++}`; + const metadata: NodeMetadata = {}; + if (context.inConditional) { + metadata.conditionalId = context.inConditional; + } + const node: ManifestNode = { + id: nodeId, + type: 'primitive', + data: { + label: funcName, + nodeKind: 'primitive', + }, + metadata: + Object.keys(metadata).length > 0 ? metadata : undefined, + }; + nodes.push(node); + entryNodeIds.push(nodeId); + exitNodeIds.push(nodeId); + } + } else { + const primitiveName = getWorkflowPrimitiveCallName(argCallExpr); + if (primitiveName) { + const nodeId = `node_${context.nodeCounter++}`; + const metadata: NodeMetadata = {}; + if (context.inConditional) { + metadata.conditionalId = context.inConditional; + } + const node: ManifestNode = { + id: nodeId, + type: 'primitive', + data: { + label: primitiveName, + nodeKind: 'primitive', + }, + metadata: + Object.keys(metadata).length > 0 ? metadata : undefined, + }; + nodes.push(node); + entryNodeIds.push(nodeId); + exitNodeIds.push(nodeId); } } } diff --git a/packages/core/src/runtime/resume-hook.test.ts b/packages/core/src/runtime/resume-hook.test.ts new file mode 100644 index 0000000000..de774b6a55 --- /dev/null +++ b/packages/core/src/runtime/resume-hook.test.ts @@ -0,0 +1,82 @@ +import { HookNotFoundError } from '@workflow/errors'; +import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest'; +import { getHookByToken } from './resume-hook.js'; +import { getWorld } from './world.js'; + +vi.mock('node:timers/promises', () => ({ + setTimeout: vi.fn().mockResolvedValue(undefined), +})); + +vi.mock('./world.js', () => ({ + getWorld: vi.fn(), +})); + +describe('getHookByToken lazy discovery retries', () => { + const originalLazyDiscovery = process.env.WORKFLOW_NEXT_LAZY_DISCOVERY; + + const baseHook = { + runId: 'wrun_test', + hookId: 'hook_test', + token: 'token_test', + ownerId: 'owner_test', + projectId: 'project_test', + environment: 'development', + createdAt: new Date('2026-01-01T00:00:00.000Z'), + specVersion: 3, + }; + + beforeEach(() => { + vi.clearAllMocks(); + }); + + afterEach(() => { + if (originalLazyDiscovery === undefined) { + delete process.env.WORKFLOW_NEXT_LAZY_DISCOVERY; + } else { + process.env.WORKFLOW_NEXT_LAZY_DISCOVERY = originalLazyDiscovery; + } + }); + + it('retries HookNotFoundError while lazy discovery is enabled', async () => { + process.env.WORKFLOW_NEXT_LAZY_DISCOVERY = '1'; + + const getByToken = vi + .fn() + .mockRejectedValueOnce(new HookNotFoundError('token_test')) + .mockRejectedValueOnce(new HookNotFoundError('token_test')) + .mockResolvedValue(baseHook); + + vi.mocked(getWorld).mockResolvedValue({ + hooks: { getByToken }, + runs: { + get: vi.fn().mockResolvedValue({ runId: 'wrun_test' }), + }, + getEncryptionKeyForRun: vi.fn().mockResolvedValue(undefined), + } as any); + + const hook = await getHookByToken('token_test'); + expect(hook.token).toBe('token_test'); + expect(getByToken).toHaveBeenCalledTimes(3); + }); + + it('does not retry HookNotFoundError when lazy discovery is disabled', async () => { + delete process.env.WORKFLOW_NEXT_LAZY_DISCOVERY; + + const getByToken = vi + .fn() + .mockRejectedValue(new HookNotFoundError('token_test')); + + vi.mocked(getWorld).mockResolvedValue({ + hooks: { getByToken }, + runs: { + get: vi.fn().mockResolvedValue({ runId: 'wrun_test' }), + }, + getEncryptionKeyForRun: vi.fn().mockResolvedValue(undefined), + } as any); + + await expect(getHookByToken('token_test')).rejects.toThrow( + HookNotFoundError + ); + expect(getByToken).toHaveBeenCalledTimes(1); + }); +}); diff --git a/packages/core/src/runtime/resume-hook.ts b/packages/core/src/runtime/resume-hook.ts index eed7e62450..e4d870fc8e 100644 --- a/packages/core/src/runtime/resume-hook.ts +++ b/packages/core/src/runtime/resume-hook.ts @@ -1,3 +1,4 @@ +import { setTimeout as delay } from 'node:timers/promises'; import { waitUntil } from '@vercel/functions'; import { ERROR_SLUGS, @@ -26,6 +27,21 @@ import { waitedUntil } from '../util.js'; import { getWorkflowQueueName } from './helpers.js'; import { getWorld } from './world.js'; +const LAZY_DISCOVERY_HOOK_LOOKUP_RETRY_DELAYS_MS = [ + 50, 100, 200, 400, 800, 1200, 1800, 2500, +] as const; + +function shouldRetryMissingHookLookup(error: unknown): boolean { + if (!HookNotFoundError.is(error)) { + return false; + } + + // In lazy discovery mode a hook token can be resumed before the first + // deferred build has finished generating/activating the workflow route. + // Retrying for a short window avoids spurious HookNotFound races. + return process.env.WORKFLOW_NEXT_LAZY_DISCOVERY === '1'; +} + /** * Internal helper that returns the hook, the associated workflow run, * and the resolved encryption key. @@ -50,6 +66,36 @@ async function getHookByTokenWithKey(token: string): Promise<{ return { hook, run, encryptionKey }; } +async function getHookByTokenWithKeyWithRetry(token: string): Promise<{ + hook: Hook; + run: WorkflowRun; + encryptionKey: CryptoKey | undefined; +}> { + let lastError: unknown; + for ( + let attempt = 0; + attempt <= LAZY_DISCOVERY_HOOK_LOOKUP_RETRY_DELAYS_MS.length; + attempt++ + ) { + try { + return await getHookByTokenWithKey(token); + } catch (error) { + if (!shouldRetryMissingHookLookup(error)) { + throw error; + } + + lastError = error; + if (attempt === LAZY_DISCOVERY_HOOK_LOOKUP_RETRY_DELAYS_MS.length) { + break; + } + + await delay(LAZY_DISCOVERY_HOOK_LOOKUP_RETRY_DELAYS_MS[attempt]); + } + } + + throw lastError; +} + /** * Get the hook by token to find the associated workflow run, * and hydrate the `metadata` property if it was set from within @@ -58,7 +104,7 @@ async function getHookByTokenWithKey(token: string): Promise<{ * @param token - The unique token identifying the hook */ export async function getHookByToken(token: string): Promise { - const { hook } = await getHookByTokenWithKey(token); + const { hook } = await getHookByTokenWithKeyWithRetry(token); return hook; } @@ -105,7 +151,7 @@ export async function resumeHook( let workflowRun: WorkflowRun; let encryptionKey: CryptoKey | undefined; if (typeof tokenOrHook === 'string') { - const result = await getHookByTokenWithKey(tokenOrHook); + const result = await getHookByTokenWithKeyWithRetry(tokenOrHook); hook = result.hook; workflowRun = result.run; encryptionKey = encryptionKeyOverride ?? result.encryptionKey; @@ -252,7 +298,7 @@ export async function resumeWebhook( token: string, request: Request ): Promise { - const { hook, encryptionKey } = await getHookByTokenWithKey(token); + const { hook, encryptionKey } = await getHookByTokenWithKeyWithRetry(token); // Only webhooks can be resumed via the public endpoint. // If the hook was created via createHook() (isWebhook !== true), diff --git a/packages/next/src/builder-deferred.ts b/packages/next/src/builder-deferred.ts index cb918cf0c2..8bd12101d0 100644 --- a/packages/next/src/builder-deferred.ts +++ b/packages/next/src/builder-deferred.ts @@ -115,6 +115,7 @@ export async function getNextBuilderDeferred() { private cacheInitialized = false; private cacheWriteTimer: NodeJS.Timeout | null = null; private deferredRebuildTimer: NodeJS.Timeout | null = null; + private pendingTriggerBuildAck = false; private lastDeferredBuildSignature: string | null = null; // Lazily initialized resolvers for bare specifier rewriting. // Cached to avoid re-creating on every import rewrite. @@ -134,6 +135,19 @@ export async function getNextBuilderDeferred() { await this.writeStubFiles(outputDir); await this.createDiscoverySocketServer(); + + // After a dev server restart, stubs are rewritten on disk. If we already + // have discovered entries from the persisted cache, trigger a deferred + // rebuild immediately so routes do not remain stubbed until a fresh + // loader discovery event arrives. + if ( + this.config.watch && + (this.discoveredWorkflowFiles.size > 0 || + this.discoveredStepFiles.size > 0 || + this.discoveredSerdeFiles.size > 0) + ) { + this.scheduleDeferredRebuild(); + } } async onBeforeDeferredEntries(): Promise { @@ -767,7 +781,7 @@ export async function getNextBuilderDeferred() { } }, onTriggerBuild: () => { - this.scheduleDeferredRebuild(); + this.scheduleDeferredRebuild({ acknowledgeTriggerBuild: true }); }, }; @@ -1046,26 +1060,48 @@ export async function getNextBuilderDeferred() { }, 50); } - private scheduleDeferredRebuild(): void { + private scheduleDeferredRebuild(options?: { + acknowledgeTriggerBuild?: boolean; + }): void { if (!this.config.watch) { return; } + if (options?.acknowledgeTriggerBuild) { + this.pendingTriggerBuildAck = true; + } + if (this.deferredRebuildTimer) { clearTimeout(this.deferredRebuildTimer); } this.deferredRebuildTimer = setTimeout(() => { this.deferredRebuildTimer = null; - void this.onBeforeDeferredEntries().catch((error) => { - console.warn( - '[workflow] Deferred rebuild after source update failed.', - error - ); - }); + void this.runDeferredRebuild(); }, 75); } + private async runDeferredRebuild(): Promise { + try { + await this.onBeforeDeferredEntries(); + } catch (error) { + console.warn( + '[workflow] Deferred rebuild after source update failed.', + error + ); + } finally { + // If another rebuild was queued while this pass was running, wait for + // that later pass to settle before acknowledging trigger-build waiters. + if (this.deferredRebuildTimer) { + return; + } + if (this.pendingTriggerBuildAck) { + this.pendingTriggerBuildAck = false; + this.socketIO?.emit('build-complete'); + } + } + } + private async readWorkflowsCache(): Promise<{ workflowFiles: string[]; stepFiles: string[]; diff --git a/packages/next/src/loader.ts b/packages/next/src/loader.ts index e04a2b219e..00aa5dc4c1 100644 --- a/packages/next/src/loader.ts +++ b/packages/next/src/loader.ts @@ -374,6 +374,12 @@ function isWorkflowRouteStubSource(source: string): boolean { return source.includes(ROUTE_STUB_FILE_MARKER); } +function isGeneratedFlowRouteFilePath(filePath: string): boolean { + return filePath + .replace(/\\/g, '/') + .endsWith('/.well-known/workflow/v1/flow/route.js'); +} + async function createSocketConnection( socketCredentials: SocketCredentials, timeoutMs = 1_000 @@ -669,13 +675,26 @@ export default function workflowLoader( process.env.WORKFLOW_NEXT_LAZY_DISCOVERY === '1' && isWorkflowRouteStubSource(normalizedSource) ) { + const isFlowRouteFile = isGeneratedFlowRouteFilePath(filename); try { await ensureDeferredRouteStubBuildAndWait(); const refreshedSource = await readFile(filename, 'utf8'); if (!isWorkflowRouteStubSource(refreshedSource)) { return { code: refreshedSource, map: sourceMap }; } + // Flow route requests drive workflow run execution. If the deferred + // build did not materialize a non-stub flow route, fail this attempt + // so queue delivery can retry instead of writing run_failed with a + // transient WorkflowNotRegisteredError. + if (isFlowRouteFile) { + throw new Error( + `Deferred flow route build completed but output remained a stub: ${filename}` + ); + } } catch (error) { + if (isFlowRouteFile) { + throw error; + } console.warn( `[workflow] Failed waiting for deferred route build for ${filename}, using stub output`, error @@ -744,6 +763,9 @@ export default function workflowLoader( workingDir ); const mode = isDeferredStepCopyFile ? 'step' : 'client'; + const inputSourceMapForTransform = isDeferredStepCopyFile + ? deferredSourceMapResult.sourceMap || sourceMap || false + : sourceMap || false; // Transform with SWC const result = await transform(sourceForTransform, { @@ -775,9 +797,10 @@ export default function workflowLoader( }, }, minify: false, - inputSourceMap: isDeferredStepCopyFile - ? deferredSourceMapResult.sourceMap || sourceMap - : sourceMap, + // When no upstream source map is provided, pass `false` explicitly so + // SWC does not attempt to load missing sourceMappingURL sidecar files + // from dependencies (which can fail deferred route compilation). + inputSourceMap: inputSourceMapForTransform, sourceMaps: true, inlineSourcesContent: true, }); diff --git a/packages/utils/src/get-port.test.ts b/packages/utils/src/get-port.test.ts index fd44cd1656..7f681f033e 100644 --- a/packages/utils/src/get-port.test.ts +++ b/packages/utils/src/get-port.test.ts @@ -273,6 +273,47 @@ describe('getWorkflowPort', () => { expect(port).toBe(workflowAddr.port); }); + it('should detect workflow server when health endpoint is POST-only', async () => { + // Non-workflow server (returns 404 for all requests) + const nonWorkflowServer = http.createServer((req, res) => { + res.writeHead(404); + res.end(); + }); + + // Workflow server that returns 405 for HEAD and 200 for POST health checks + const workflowServer = http.createServer((req, res) => { + if (req.url?.includes('__health')) { + if (req.method === 'POST') { + res.writeHead(200, { 'Content-Type': 'application/json' }); + res.end(JSON.stringify({ healthy: true })); + return; + } + res.writeHead(405); + res.end(); + return; + } + + if (req.url?.startsWith('/.well-known/workflow/v1/')) { + res.writeHead(400, { 'Content-Type': 'application/json' }); + res.end(JSON.stringify({ error: 'Missing required headers' })); + return; + } + + res.writeHead(404); + res.end(); + }); + + servers.push(nonWorkflowServer, workflowServer); + + await new Promise((resolve) => nonWorkflowServer.listen(0, resolve)); + await new Promise((resolve) => workflowServer.listen(0, resolve)); + + const port = await getWorkflowPort(); + const workflowAddr = workflowServer.address() as AddressInfo; + + expect(port).toBe(workflowAddr.port); + }); + it('should fall back to first port when probing fails', async () => { // Two non-workflow servers (both return 404) const server1 = http.createServer((req, res) => { diff --git a/packages/utils/src/get-port.ts b/packages/utils/src/get-port.ts index b6806a35a4..be0a2e4738 100644 --- a/packages/utils/src/get-port.ts +++ b/packages/utils/src/get-port.ts @@ -241,28 +241,22 @@ export interface ProbeOptions { timeout?: number; } -/** - * Probes a port to check if it's serving the workflow HTTP server. - * Uses HEAD request to minimize overhead. - * - * @returns true if the port responds with a 200 status from the health check endpoint - */ -async function probePort( +async function probePortWithMethod( port: number, - options: ProbeOptions = {} + endpoint: string, + timeout: number, + method: 'HEAD' | 'POST' ): Promise { - const { endpoint = PROBE_ENDPOINT, timeout = PROBE_TIMEOUT_MS } = options; - const controller = new AbortController(); const timeoutId = setTimeout(() => controller.abort(), timeout); try { const response = await fetch(`http://localhost:${port}${endpoint}`, { - method: 'HEAD', + method, signal: controller.signal, }); - // The workflow health endpoint returns 200 for healthy + // The workflow health endpoint returns 200 for healthy. return response.status === 200; } catch { // Connection refused, timeout, or other error @@ -272,6 +266,26 @@ async function probePort( } } +/** + * Probes a port to check if it's serving the workflow HTTP server. + * + * It first tries HEAD (low overhead), then falls back to POST for runtimes + * that only expose health checks on POST (for example Next.js route handlers + * that only export POST in lazy-discovery mode). + * + * @returns true if the port responds with a 200 status from the health check endpoint + */ +async function probePort( + port: number, + options: ProbeOptions = {} +): Promise { + const { endpoint = PROBE_ENDPOINT, timeout = PROBE_TIMEOUT_MS } = options; + if (await probePortWithMethod(port, endpoint, timeout, 'HEAD')) { + return true; + } + return await probePortWithMethod(port, endpoint, timeout, 'POST'); +} + /** * Gets the workflow server port by probing all listening ports. * This is more reliable than getPort() when other services (like Node.js inspector) diff --git a/packages/world-local/src/config.test.ts b/packages/world-local/src/config.test.ts index 3bc9a5b779..5fccfe1f6d 100644 --- a/packages/world-local/src/config.test.ts +++ b/packages/world-local/src/config.test.ts @@ -1,3 +1,7 @@ +import { spawn } from 'node:child_process'; +import { mkdtemp, mkdir } from 'node:fs/promises'; +import os from 'node:os'; +import { join } from 'node:path'; import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest'; import { resolveBaseUrl } from './config'; @@ -8,13 +12,16 @@ vi.mock('@workflow/utils/get-port', () => ({ describe('resolveBaseUrl', () => { let originalEnv: NodeJS.ProcessEnv; + let originalArgv: string[]; beforeEach(() => { originalEnv = { ...process.env }; + originalArgv = [...process.argv]; }); afterEach(() => { process.env = originalEnv; + process.argv = originalArgv; vi.clearAllMocks(); }); @@ -123,15 +130,16 @@ describe('resolveBaseUrl', () => { expect(result).toBe('http://localhost:5173'); }); - it('should handle port 0 (OS-assigned port)', async () => { + it('should treat port 0 as invalid and fall back to auto-detection', async () => { const { getWorkflowPort } = await import('@workflow/utils/get-port'); + vi.mocked(getWorkflowPort).mockResolvedValue(5173); const result = await resolveBaseUrl({ port: 0, }); - expect(result).toBe('http://localhost:0'); - expect(getWorkflowPort).not.toHaveBeenCalled(); + expect(result).toBe('http://localhost:5173'); + expect(getWorkflowPort).toHaveBeenCalled(); }); it('should handle port 80', async () => { @@ -194,6 +202,31 @@ describe('resolveBaseUrl', () => { }); describe('environment variables', () => { + it('should use __NEXT_PRIVATE_ORIGIN when no explicit local base URL is set', async () => { + const { getWorkflowPort } = await import('@workflow/utils/get-port'); + vi.mocked(getWorkflowPort).mockResolvedValue(undefined); + delete process.env.PORT; + process.env.__NEXT_PRIVATE_ORIGIN = 'http://localhost:3002'; + + const result = await resolveBaseUrl({}); + + expect(result).toBe('http://localhost:3002'); + expect(getWorkflowPort).not.toHaveBeenCalled(); + }); + + it('should prefer WORKFLOW_LOCAL_BASE_URL over __NEXT_PRIVATE_ORIGIN', async () => { + const { getWorkflowPort } = await import('@workflow/utils/get-port'); + vi.mocked(getWorkflowPort).mockResolvedValue(undefined); + process.env.WORKFLOW_LOCAL_BASE_URL = 'http://127.0.0.1:4000'; + process.env.__NEXT_PRIVATE_ORIGIN = 'http://localhost:3002'; + delete process.env.PORT; + + const result = await resolveBaseUrl({}); + + expect(result).toBe('http://127.0.0.1:4000'); + expect(getWorkflowPort).not.toHaveBeenCalled(); + }); + it('should use PORT env var as fallback', async () => { const { getWorkflowPort } = await import('@workflow/utils/get-port'); vi.mocked(getWorkflowPort).mockResolvedValue(undefined); @@ -202,6 +235,257 @@ describe('resolveBaseUrl', () => { const result = await resolveBaseUrl({}); expect(result).toBe('http://localhost:4173'); + expect(getWorkflowPort).not.toHaveBeenCalled(); + }); + + it('should ignore invalid PORT env var values and continue fallback resolution', async () => { + const { getWorkflowPort } = await import('@workflow/utils/get-port'); + vi.mocked(getWorkflowPort).mockResolvedValue(5173); + process.env.PORT = '0'; + + const result = await resolveBaseUrl({}); + + expect(result).toBe('http://localhost:5173'); + expect(getWorkflowPort).toHaveBeenCalled(); + }); + + it('should use TURBO_PORT env var when PORT is not set', async () => { + const { getWorkflowPort } = await import('@workflow/utils/get-port'); + vi.mocked(getWorkflowPort).mockResolvedValue(undefined); + delete process.env.PORT; + process.env.TURBO_PORT = '3002'; + + const result = await resolveBaseUrl({}); + + expect(result).toBe('http://localhost:3002'); + expect(getWorkflowPort).not.toHaveBeenCalled(); + }); + + it('should parse --port from npm_lifecycle_script when no direct env port is set', async () => { + const { getWorkflowPort } = await import('@workflow/utils/get-port'); + vi.mocked(getWorkflowPort).mockResolvedValue(undefined); + delete process.env.PORT; + delete process.env.TURBO_PORT; + process.env.npm_lifecycle_script = + 'MFE_DISABLE_LOCAL_PROXY_REWRITE=1 next dev --port 3002 --turbopack'; + + const result = await resolveBaseUrl({}); + + expect(result).toBe('http://localhost:3002'); + expect(getWorkflowPort).not.toHaveBeenCalled(); + }); + + it('should parse --port= from npm_lifecycle_script', async () => { + const { getWorkflowPort } = await import('@workflow/utils/get-port'); + vi.mocked(getWorkflowPort).mockResolvedValue(undefined); + delete process.env.PORT; + delete process.env.TURBO_PORT; + process.env.npm_lifecycle_script = 'next dev --port=3010 --turbopack'; + + const result = await resolveBaseUrl({}); + + expect(result).toBe('http://localhost:3010'); + expect(getWorkflowPort).not.toHaveBeenCalled(); + }); + + it('should use the last --port flag from npm_lifecycle_script when multiple are present', async () => { + const { getWorkflowPort } = await import('@workflow/utils/get-port'); + vi.mocked(getWorkflowPort).mockResolvedValue(undefined); + delete process.env.PORT; + delete process.env.TURBO_PORT; + process.env.npm_lifecycle_script = + 'next dev --port 3002 --turbopack --port 3000'; + + const result = await resolveBaseUrl({}); + + expect(result).toBe('http://localhost:3000'); + expect(getWorkflowPort).not.toHaveBeenCalled(); + }); + + it('should parse -p from npm_lifecycle_script', async () => { + const { getWorkflowPort } = await import('@workflow/utils/get-port'); + vi.mocked(getWorkflowPort).mockResolvedValue(undefined); + delete process.env.PORT; + delete process.env.TURBO_PORT; + process.env.npm_lifecycle_script = 'next dev -p 4005'; + + const result = await resolveBaseUrl({}); + + expect(result).toBe('http://localhost:4005'); + expect(getWorkflowPort).not.toHaveBeenCalled(); + }); + + it('should parse -p= from npm_lifecycle_script', async () => { + const { getWorkflowPort } = await import('@workflow/utils/get-port'); + vi.mocked(getWorkflowPort).mockResolvedValue(undefined); + delete process.env.PORT; + delete process.env.TURBO_PORT; + process.env.npm_lifecycle_script = 'next dev -p=4006'; + + const result = await resolveBaseUrl({}); + + expect(result).toBe('http://localhost:4006'); + expect(getWorkflowPort).not.toHaveBeenCalled(); + }); + + it('should use process.argv when env and lifecycle ports are absent', async () => { + const { getWorkflowPort } = await import('@workflow/utils/get-port'); + vi.mocked(getWorkflowPort).mockResolvedValue(undefined); + delete process.env.PORT; + delete process.env.TURBO_PORT; + delete process.env.npm_lifecycle_script; + process.argv = ['/usr/local/bin/node', 'next-server', '--port', '4567']; + + const result = await resolveBaseUrl({}); + + expect(result).toBe('http://localhost:4567'); + expect(getWorkflowPort).not.toHaveBeenCalled(); + }); + + it('should use the last --port flag from process.argv when repeated', async () => { + const { getWorkflowPort } = await import('@workflow/utils/get-port'); + vi.mocked(getWorkflowPort).mockResolvedValue(undefined); + delete process.env.PORT; + delete process.env.TURBO_PORT; + delete process.env.npm_lifecycle_script; + process.argv = [ + '/usr/local/bin/node', + 'next-server', + '--port', + '3002', + '--turbopack', + '--port', + '3000', + ]; + + const result = await resolveBaseUrl({}); + + expect(result).toBe('http://localhost:3000'); + expect(getWorkflowPort).not.toHaveBeenCalled(); + }); + + it('should ignore process.argv --port 0 and fall back to auto-detection', async () => { + const { getWorkflowPort } = await import('@workflow/utils/get-port'); + vi.mocked(getWorkflowPort).mockResolvedValue(4568); + delete process.env.PORT; + delete process.env.TURBO_PORT; + delete process.env.npm_lifecycle_script; + process.argv = ['/usr/local/bin/node', 'next-server', '--port', '0']; + + const result = await resolveBaseUrl({}); + + expect(result).toBe('http://localhost:4568'); + expect(getWorkflowPort).toHaveBeenCalled(); + }); + + it('should resolve from process list when dataDir points to a project with a live next dev port', async () => { + const { getWorkflowPort } = await import('@workflow/utils/get-port'); + vi.mocked(getWorkflowPort).mockResolvedValue(undefined); + delete process.env.PORT; + delete process.env.TURBO_PORT; + delete process.env.npm_lifecycle_script; + delete process.env.__NEXT_PRIVATE_ORIGIN; + process.argv = ['/usr/local/bin/node', 'vitest']; + + const projectRoot = await mkdtemp( + join(os.tmpdir(), 'workflow-config-process-port-') + ); + await mkdir(join(projectRoot, '.next', 'workflow-data'), { + recursive: true, + }); + + const simulatedPort = 41234; + const helper = spawn( + process.execPath, + [ + '-e', + 'setInterval(() => {}, 1000)', + 'next', + 'dev', + '--port', + String(simulatedPort), + `${projectRoot}/app`, + ], + { stdio: 'ignore' } + ); + + try { + await new Promise((resolve) => setTimeout(resolve, 50)); + const result = await resolveBaseUrl({ + dataDir: join(projectRoot, '.next', 'workflow-data'), + }); + expect(result).toBe(`http://localhost:${simulatedPort}`); + expect(getWorkflowPort).not.toHaveBeenCalled(); + } finally { + helper.kill(); + await new Promise((resolve) => helper.once('exit', resolve)); + } + }); + + it('should prefer the newest matching next dev process when multiple ports exist', async () => { + const { getWorkflowPort } = await import('@workflow/utils/get-port'); + vi.mocked(getWorkflowPort).mockResolvedValue(undefined); + delete process.env.PORT; + delete process.env.TURBO_PORT; + delete process.env.npm_lifecycle_script; + delete process.env.__NEXT_PRIVATE_ORIGIN; + process.argv = ['/usr/local/bin/node', 'vitest']; + + const projectRoot = await mkdtemp( + join(os.tmpdir(), 'workflow-config-process-port-multi-') + ); + await mkdir(join(projectRoot, '.next', 'workflow-data'), { + recursive: true, + }); + + const olderPort = 41231; + const newerPort = 41232; + const olderHelper = spawn( + process.execPath, + [ + '-e', + 'setInterval(() => {}, 1000)', + 'next', + 'dev', + '--port', + String(olderPort), + `${projectRoot}/app`, + ], + { stdio: 'ignore' } + ); + + // Ensure PID ordering so the second helper is considered newer. + await new Promise((resolve) => setTimeout(resolve, 20)); + + const newerHelper = spawn( + process.execPath, + [ + '-e', + 'setInterval(() => {}, 1000)', + 'next', + 'dev', + '--port', + String(newerPort), + `${projectRoot}/app`, + ], + { stdio: 'ignore' } + ); + + try { + await new Promise((resolve) => setTimeout(resolve, 50)); + const result = await resolveBaseUrl({ + dataDir: join(projectRoot, '.next', 'workflow-data'), + }); + expect(result).toBe(`http://localhost:${newerPort}`); + expect(getWorkflowPort).not.toHaveBeenCalled(); + } finally { + olderHelper.kill(); + newerHelper.kill(); + await Promise.all([ + new Promise((resolve) => olderHelper.once('exit', resolve)), + new Promise((resolve) => newerHelper.once('exit', resolve)), + ]); + } }); it('should ignore PORT env var when config.port is provided', async () => { diff --git a/packages/world-local/src/config.ts b/packages/world-local/src/config.ts index 99431be3e1..a5772f4424 100644 --- a/packages/world-local/src/config.ts +++ b/packages/world-local/src/config.ts @@ -1,6 +1,15 @@ +import { execFile } from 'node:child_process'; +import { resolve } from 'node:path'; +import { promisify } from 'node:util'; import { getWorkflowPort } from '@workflow/utils/get-port'; import { once } from './util.js'; +const execFileAsync = promisify(execFile); +const COMMAND_PORT_PATTERN = + /(?:^|\s)(?:--port(?:=|\s+)|-p(?:=|\s+))(\d{1,5})(?=\s|$)/g; +const MAX_ANCESTOR_PORT_SCAN_DEPTH = 6; +let cachedAncestorCommandPort: number | undefined; + const getDataDirFromEnv = () => { return process.env.WORKFLOW_LOCAL_DATA_DIR || '.workflow-data'; }; @@ -11,6 +20,340 @@ const getBaseUrlFromEnv = () => { return process.env.WORKFLOW_LOCAL_BASE_URL; }; +const getNextPrivateOriginFromEnv = () => { + const origin = process.env.__NEXT_PRIVATE_ORIGIN; + if (!origin) { + return undefined; + } + + return origin; +}; + +function getPortFromEnvVariable(name: string): number | undefined { + return parsePort(process.env[name]); +} + +function parsePort(value: string | undefined, radix = 10): number | undefined { + if (!value) { + return undefined; + } + const port = Number.parseInt(value, radix); + if (!Number.isFinite(port) || port <= 0 || port > 65535) { + return undefined; + } + return port; +} + +function getPortFromCommand(command: string | undefined): number | undefined { + if (!command) { + return undefined; + } + + const matches = Array.from(command.matchAll(COMMAND_PORT_PATTERN)); + if (matches.length === 0) { + return undefined; + } + + // Node/Next scripts can contain repeated `--port` flags (e.g. script default + // plus CLI override). Use the last match, which matches CLI precedence. + const lastMatch = matches[matches.length - 1]; + if (!lastMatch?.[1]) { + return undefined; + } + + return parsePort(lastMatch[1]); +} + +function getPortFromLifecycleScript(): number | undefined { + return getPortFromCommand(process.env.npm_lifecycle_script); +} + +function getPortFromArgv(): number | undefined { + if (process.argv.length === 0) { + return undefined; + } + + return getPortFromCommand(process.argv.join(' ')); +} + +async function getParentPid(pid: number): Promise { + if (process.platform === 'win32') { + return undefined; + } + + try { + const { stdout } = await execFileAsync('ps', [ + '-o', + 'ppid=', + '-p', + String(pid), + ]); + const parentPid = parsePort(stdout.trim()); + if (typeof parentPid !== 'number' || parentPid <= 0) { + return undefined; + } + return parentPid; + } catch { + return undefined; + } +} + +async function getPortFromPidCommand(pid: number): Promise { + if (process.platform === 'win32') { + return undefined; + } + + try { + const { stdout } = await execFileAsync('ps', [ + '-o', + 'command=', + '-p', + String(pid), + ]); + return getPortFromCommand(stdout.trim()); + } catch { + return undefined; + } +} + +async function getPortFromAncestorCommands(): Promise { + if (process.platform === 'win32') { + return undefined; + } + + let currentPid = process.pid; + for ( + let depth = 0; + depth < MAX_ANCESTOR_PORT_SCAN_DEPTH && currentPid > 1; + depth++ + ) { + const commandPort = await getPortFromPidCommand(currentPid); + if (typeof commandPort === 'number') { + return commandPort; + } + + const parentPid = await getParentPid(currentPid); + if ( + typeof parentPid !== 'number' || + parentPid <= 0 || + parentPid === currentPid + ) { + break; + } + + currentPid = parentPid; + } + + return undefined; +} + +async function getCachedPortFromAncestorCommands(): Promise< + number | undefined +> { + if (typeof cachedAncestorCommandPort === 'number') { + return cachedAncestorCommandPort; + } + + const resolvedPort = await getPortFromAncestorCommands(); + if (typeof resolvedPort === 'number') { + cachedAncestorCommandPort = resolvedPort; + } + return resolvedPort; +} + +function normalizePath(filePath: string): string { + return filePath.replace(/\\/g, '/'); +} + +function getProjectRootFromDataDir( + dataDir: string | undefined +): string | undefined { + if (!dataDir) { + return undefined; + } + + const normalized = normalizePath(resolve(dataDir)); + + const nextSuffix = '/.next/workflow-data'; + if (normalized.endsWith(nextSuffix)) { + return normalized.slice(0, -nextSuffix.length); + } + + const fallbackSuffix = '/.workflow-data'; + if (normalized.endsWith(fallbackSuffix)) { + return normalized.slice(0, -fallbackSuffix.length); + } + + return undefined; +} + +async function getPortFromProjectProcessList( + projectRoot: string | undefined +): Promise { + if (!projectRoot || process.platform === 'win32') { + return undefined; + } + + const isCommandWithinProjectRoot = ( + command: string, + normalizedProjectRoot: string + ): boolean => { + return ( + command.includes(`${normalizedProjectRoot}/`) || + command.includes(`${normalizedProjectRoot} `) || + command.endsWith(normalizedProjectRoot) + ); + }; + + const getProcessCwd = async (pid: number): Promise => { + const { stdout: cwdOutput } = await execFileAsync('lsof', [ + '-a', + '-p', + String(pid), + '-d', + 'cwd', + '-Fn', + ]).catch(() => ({ stdout: '' as string })); + const cwdLine = cwdOutput + .split('\n') + .map((line) => line.trim()) + .find((line) => line.startsWith('n')); + if (!cwdLine || cwdLine.length < 2) { + return undefined; + } + return normalizePath(cwdLine.slice(1)); + }; + + const isHttpReachablePort = async (port: number): Promise => { + try { + const controller = new AbortController(); + const timeout = setTimeout(() => controller.abort(), 300); + const response = await fetch(`http://localhost:${port}/`, { + method: 'HEAD', + signal: controller.signal, + }).catch(async () => { + return await fetch(`http://localhost:${port}/`, { + method: 'GET', + signal: controller.signal, + }); + }); + clearTimeout(timeout); + return response.status >= 100 && response.status < 600; + } catch { + return false; + } + }; + + try { + const { stdout } = await execFileAsync('ps', ['-Ao', 'pid=,command=']); + const normalizedProjectRoot = normalizePath(projectRoot); + + const nextDevCommands: Array<{ pid: number; command: string }> = []; + const processEntries = stdout + .split('\n') + .map((entry) => entry.trim()) + .map((line) => { + const match = line.match(/^(\d+)\s+(.+)$/); + if (!match?.[1] || !match[2]) { + return null; + } + const pid = Number.parseInt(match[1], 10); + if (!Number.isFinite(pid)) { + return null; + } + return { + pid, + command: match[2], + }; + }) + .filter((entry): entry is { pid: number; command: string } => + Boolean(entry) + ) + .sort((a, b) => b.pid - a.pid); + + for (const { pid, command } of processEntries) { + if (command.includes('detached-flush')) { + continue; + } + + const looksLikeNextDevProcess = + command.includes('next') && + (command.includes(' dev') || command.includes('next-server')); + if (!looksLikeNextDevProcess) { + continue; + } + + let matchesProject = isCommandWithinProjectRoot( + command, + normalizedProjectRoot + ); + if (!matchesProject) { + const processCwd = await getProcessCwd(pid); + matchesProject = processCwd === normalizedProjectRoot; + } + if (!matchesProject) { + continue; + } + + nextDevCommands.push({ pid, command }); + } + + const candidatePorts: number[] = []; + const addCandidatePort = (port: number) => { + if (!candidatePorts.includes(port)) { + candidatePorts.push(port); + } + }; + + for (const entry of nextDevCommands) { + const parsedPort = getPortFromCommand(entry.command); + if (typeof parsedPort === 'number') { + addCandidatePort(parsedPort); + } + + const { stdout: lsofOutput } = await execFileAsync('lsof', [ + '-a', + '-i', + '-P', + '-n', + '-p', + String(entry.pid), + '-sTCP:LISTEN', + ]).catch(() => ({ stdout: '' as string })); + const lsofLines = lsofOutput.split('\n'); + for (const lsofLine of lsofLines) { + const parts = lsofLine.trim().split(/\s+/); + const address = parts[8]; + if (!address) { + continue; + } + const colonIndex = address.lastIndexOf(':'); + if (colonIndex === -1) { + continue; + } + const parsedLsofPort = parsePort(address.slice(colonIndex + 1)); + if (typeof parsedLsofPort === 'number') { + addCandidatePort(parsedLsofPort); + } + } + } + + for (const candidatePort of candidatePorts) { + if (await isHttpReachablePort(candidatePort)) { + return candidatePort; + } + } + + if (candidatePorts.length > 0) { + return candidatePorts[0]; + } + + return undefined; + } catch { + return undefined; + } +} + export type Config = { dataDir: string; port?: number; @@ -40,9 +383,15 @@ export const config = once(() => { * Resolves the base URL for queue requests following the priority order: * 1. config.baseUrl (highest priority - full override from args) * 2. WORKFLOW_LOCAL_BASE_URL env var (checked directly to handle late env var setting) - * 3. config.port (explicit port override from args) - * 4. PORT env var (explicit configuration) - * 5. Auto-detected port via getPort (detect actual listening port) + * 3. __NEXT_PRIVATE_ORIGIN env var (Next.js internal server origin) + * 4. config.port (explicit port override from args) + * 5. PORT env var (explicit configuration) + * 6. TURBO_PORT env var (set by turbo task runner in some monorepos) + * 7. npm_lifecycle_script --port/-p value (when dev script encodes the port) + * 8. process.argv --port/-p value + * 9. Process list lookup by WORKFLOW_LOCAL_DATA_DIR project root (multi-worker fallback) + * 10. Ancestor process command --port/-p value (for detached worker contexts) + * 11. Auto-detected port via getPort (detect actual listening port) */ export async function resolveBaseUrl(config: Partial): Promise { if (config.baseUrl) { @@ -55,12 +404,50 @@ export async function resolveBaseUrl(config: Partial): Promise { return process.env.WORKFLOW_LOCAL_BASE_URL; } + // Next.js sets this internal origin env var for server-side internal fetches. + // In dev, workflow queue calls can run in worker processes that are not the + // listening server process, so PORT/getWorkflowPort may be unavailable there. + const nextPrivateOrigin = getNextPrivateOriginFromEnv(); + if (nextPrivateOrigin) { + return nextPrivateOrigin; + } + if (typeof config.port === 'number') { - return `http://localhost:${config.port}`; + const parsedConfigPort = parsePort(String(config.port)); + if (typeof parsedConfigPort === 'number') { + return `http://localhost:${parsedConfigPort}`; + } + } + + const envPort = parsePort(process.env.PORT); + if (typeof envPort === 'number') { + return `http://localhost:${envPort}`; + } + + const turboPort = getPortFromEnvVariable('TURBO_PORT'); + if (typeof turboPort === 'number') { + return `http://localhost:${turboPort}`; + } + + const lifecyclePort = getPortFromLifecycleScript(); + if (typeof lifecyclePort === 'number') { + return `http://localhost:${lifecyclePort}`; + } + + const argvPort = getPortFromArgv(); + if (typeof argvPort === 'number') { + return `http://localhost:${argvPort}`; + } + + const projectRoot = getProjectRootFromDataDir(config.dataDir); + const projectProcessPort = await getPortFromProjectProcessList(projectRoot); + if (typeof projectProcessPort === 'number') { + return `http://localhost:${projectProcessPort}`; } - if (process.env.PORT) { - return `http://localhost:${process.env.PORT}`; + const ancestorCommandPort = await getCachedPortFromAncestorCommands(); + if (typeof ancestorCommandPort === 'number') { + return `http://localhost:${ancestorCommandPort}`; } const detectedPort = await getWorkflowPort(); diff --git a/packages/world-local/src/queue.test.ts b/packages/world-local/src/queue.test.ts index 8f0b4f4491..8b77ec6bda 100644 --- a/packages/world-local/src/queue.test.ts +++ b/packages/world-local/src/queue.test.ts @@ -47,6 +47,8 @@ describe('queue timeout re-enqueue', () => { }); afterEach(async () => { + delete process.env.WORKFLOW_LOCAL_QUEUE_WORKFLOW_ATTEMPT_TIMEOUT_MS; + vi.unstubAllGlobals(); await localQueue.close(); }); @@ -166,4 +168,135 @@ describe('queue timeout re-enqueue', () => { // setTimeout should NOT have been called for timeoutSeconds: 0 expect(mockSetTimeout).not.toHaveBeenCalled(); }); + + it('queue uses warmup retry delays for transient HTTP failures', async () => { + const { setTimeout: mockSetTimeout } = await import('node:timers/promises'); + vi.mocked(mockSetTimeout).mockClear(); + + let callCount = 0; + localQueue.registerHandler('__wkf_step_', async () => { + callCount++; + if (callCount < 4) { + return new Response('warming up', { status: 500 }); + } + return Response.json({ ok: true }); + }); + + await localQueue.queue('__wkf_step_test' as any, stepPayload); + + await vi.waitFor(() => { + expect(callCount).toBe(4); + }); + + expect(mockSetTimeout).toHaveBeenNthCalledWith(1, 250); + expect(mockSetTimeout).toHaveBeenNthCalledWith(2, 500); + expect(mockSetTimeout).toHaveBeenNthCalledWith(3, 1000); + }); + + it('queue falls back to 5s retry delays after warmup retries', async () => { + const { setTimeout: mockSetTimeout } = await import('node:timers/promises'); + vi.mocked(mockSetTimeout).mockClear(); + + let callCount = 0; + localQueue.registerHandler('__wkf_step_', async () => { + callCount++; + if (callCount < 6) { + return new Response('still failing', { status: 500 }); + } + return Response.json({ ok: true }); + }); + + await localQueue.queue('__wkf_step_test' as any, stepPayload); + + await vi.waitFor(() => { + expect(callCount).toBe(6); + }); + + expect(mockSetTimeout).toHaveBeenNthCalledWith(1, 250); + expect(mockSetTimeout).toHaveBeenNthCalledWith(2, 500); + expect(mockSetTimeout).toHaveBeenNthCalledWith(3, 1000); + expect(mockSetTimeout).toHaveBeenNthCalledWith(4, 5000); + expect(mockSetTimeout).toHaveBeenNthCalledWith(5, 5000); + }); + + it('queue retries when direct handler throws transport errors', async () => { + const { setTimeout: mockSetTimeout } = await import('node:timers/promises'); + vi.mocked(mockSetTimeout).mockClear(); + + let callCount = 0; + localQueue.registerHandler('__wkf_step_', async () => { + callCount++; + if (callCount < 3) { + throw new Error('handler crashed'); + } + return Response.json({ ok: true }); + }); + + await localQueue.queue('__wkf_step_test' as any, stepPayload); + + await vi.waitFor(() => { + expect(callCount).toBe(3); + }); + + expect(mockSetTimeout).toHaveBeenNthCalledWith(1, 250); + expect(mockSetTimeout).toHaveBeenNthCalledWith(2, 500); + }); + + it('queue retries workflow fetch transport failures', async () => { + const { setTimeout: mockSetTimeout } = await import('node:timers/promises'); + vi.mocked(mockSetTimeout).mockClear(); + + let callCount = 0; + const fetchMock = vi.fn(async () => { + callCount++; + if (callCount < 3) { + throw new Error('connect ECONNREFUSED 127.0.0.1:3000'); + } + return Response.json({ ok: true }); + }); + vi.stubGlobal('fetch', fetchMock); + + await localQueue.queue('__wkf_workflow_test' as any, { + runId: 'run_01ABC', + }); + + await vi.waitFor(() => { + expect(callCount).toBe(3); + }); + + expect(mockSetTimeout).toHaveBeenNthCalledWith(1, 250); + expect(mockSetTimeout).toHaveBeenNthCalledWith(2, 500); + }); + + it('queue waits for first workflow attempt before resolving', async () => { + let firstAttemptStarted = false; + let releaseFirstAttempt: (() => void) | null = null; + const firstAttemptGate = new Promise((resolve) => { + releaseFirstAttempt = resolve; + }); + + localQueue.registerHandler('__wkf_workflow_', async () => { + firstAttemptStarted = true; + await firstAttemptGate; + return Response.json({ ok: true }); + }); + + const queuePromise = localQueue.queue('__wkf_workflow_test' as any, { + runId: 'run_01ABC', + }); + + await vi.waitFor(() => { + expect(firstAttemptStarted).toBe(true); + }); + + let settled = false; + queuePromise.then(() => { + settled = true; + }); + await Promise.resolve(); + expect(settled).toBe(false); + + releaseFirstAttempt?.(); + await queuePromise; + }); }); diff --git a/packages/world-local/src/queue.ts b/packages/world-local/src/queue.ts index 5a7bf6d7f7..086a461868 100644 --- a/packages/world-local/src/queue.ts +++ b/packages/world-local/src/queue.ts @@ -47,6 +47,60 @@ const LOCAL_QUEUE_MAX_VISIBILITY = // persistent state and return another timeoutSeconds if needed. const MAX_SAFE_TIMEOUT_MS = 2147483647; +const WARMUP_RETRY_DELAYS_MS = [250, 500, 1000] as const; + +const DEFAULT_WORKFLOW_ATTEMPT_TIMEOUT_MS = 180_000; +function getWorkflowAttemptTimeoutMs(): number { + const raw = process.env.WORKFLOW_LOCAL_QUEUE_WORKFLOW_ATTEMPT_TIMEOUT_MS; + if (!raw) { + return DEFAULT_WORKFLOW_ATTEMPT_TIMEOUT_MS; + } + const parsed = Number.parseInt(raw, 10); + if (!Number.isFinite(parsed) || parsed <= 0) { + return DEFAULT_WORKFLOW_ATTEMPT_TIMEOUT_MS; + } + return parsed; +} + +const DEFAULT_WORKFLOW_FIRST_ATTEMPT_WAIT_MS = 1500; +function getWorkflowFirstAttemptWaitMs(): number { + const raw = process.env.WORKFLOW_LOCAL_QUEUE_WORKFLOW_FIRST_ATTEMPT_WAIT_MS; + if (!raw) { + return DEFAULT_WORKFLOW_FIRST_ATTEMPT_WAIT_MS; + } + const parsed = Number.parseInt(raw, 10); + if (!Number.isFinite(parsed) || parsed <= 0) { + return DEFAULT_WORKFLOW_FIRST_ATTEMPT_WAIT_MS; + } + return parsed; +} + +function waitForMs(ms: number): Promise { + return new Promise((resolve) => { + const timer = globalThis.setTimeout(resolve, ms); + if (typeof (timer as { unref?: () => void }).unref === 'function') { + (timer as { unref: () => void }).unref(); + } + }); +} + +function getLocalRetryDelayMs(params: { + attempt: number; + status: number; +}): number { + const { attempt, status } = params; + const isTransientHttpFailure = status === 404 || status >= 500; + + // In local/dev mode, Next route handlers may fail briefly while compiling + // (especially with lazy discovery). Prefer fast warmup retries first. + if (isTransientHttpFailure && attempt <= WARMUP_RETRY_DELAYS_MS.length) { + return WARMUP_RETRY_DELAYS_MS[attempt - 1]; + } + + // Steady-state backoff to approximate VQS retry cadence. + return 5000; +} + // The local workers share the same Node.js process and event loop, // so we need to limit concurrency to avoid overwhelming the system. const DEFAULT_CONCURRENCY_LIMIT = 1000; @@ -115,6 +169,20 @@ export function createQueue(config: Partial): LocalQueue { const body = transport.serialize(message); const { pathname, prefix } = getQueueRoute(queueName); const messageId = MessageId.parse(`msg_${generateId()}`); + let firstAttemptDelivered = false; + let resolveFirstAttemptDelivery: (() => void) | null = null; + const firstAttemptDeliveryPromise = new Promise((resolve) => { + resolveFirstAttemptDelivery = () => { + if (firstAttemptDelivered) { + return; + } + firstAttemptDelivered = true; + resolve(); + }; + }); + const signalFirstAttemptDelivery = () => { + resolveFirstAttemptDelivery?.(); + }; // Extract identifiers from the message for structured logging. // Workflow messages have `runId`, step messages have `workflowRunId` + `stepId`. @@ -155,38 +223,73 @@ export function createQueue(config: Partial): LocalQueue { }; const directHandler = directHandlers.get(prefix); let response: Response; + let text = ''; + + try { + if (directHandler) { + const req = new Request( + `http://localhost/.well-known/workflow/v1/${pathname}`, + { + method: 'POST', + headers, + body, + } + ); + response = await directHandler(req); + } else { + const baseUrl = await resolveBaseUrl(config); + // Keep workflow route requests bounded so a stuck lazy-discovery + // build cannot block queue delivery forever. + const signal = + pathname === 'flow' + ? AbortSignal.timeout(getWorkflowAttemptTimeoutMs()) + : undefined; + + // eslint-disable-next-line @typescript-eslint/no-explicit-any -- undici v7 dispatcher types don't match @types/node's RequestInit + response = await fetch( + `${baseUrl}/.well-known/workflow/v1/${pathname}`, + { + method: 'POST', + duplex: 'half', + dispatcher: httpAgent, + headers, + body, + signal, + } as any + ); + } - if (directHandler) { - const req = new Request( - `http://localhost/.well-known/workflow/v1/${pathname}`, + text = await response.text(); + } catch (error) { + if (attempt === 0) { + signalFirstAttemptDelivery(); + } + console.error( + `[world-local] Queue message transport failed (attempt ${attempt + 1})`, { - method: 'POST', - headers, - body, + queueName, + messageId, + ...(runId && { runId }), + ...(stepId && { stepId }), + error: error instanceof Error ? error.message : String(error), } ); - response = await directHandler(req); - } else { - const baseUrl = await resolveBaseUrl(config); - // eslint-disable-next-line @typescript-eslint/no-explicit-any -- undici v7 dispatcher types don't match @types/node's RequestInit - response = await fetch( - `${baseUrl}/.well-known/workflow/v1/${pathname}`, - { - method: 'POST', - duplex: 'half', - dispatcher: httpAgent, - headers, - body, - } as any - ); - } - const text = await response.text(); + const delayMs = getLocalRetryDelayMs({ + attempt: attempt + 1, + status: 503, + }); + await setTimeout(delayMs); + continue; + } if (response.ok) { try { const timeoutSeconds = Number(JSON.parse(text).timeoutSeconds); if (Number.isFinite(timeoutSeconds) && timeoutSeconds >= 0) { + if (attempt === 0) { + signalFirstAttemptDelivery(); + } // Clamp to MAX_SAFE_TIMEOUT_MS to avoid Node.js setTimeout overflow warning. // When this fires early, the handler recalculates remaining time from // persistent state and returns another timeoutSeconds if needed. @@ -200,9 +303,15 @@ export function createQueue(config: Partial): LocalQueue { continue; } } catch {} + if (attempt === 0) { + signalFirstAttemptDelivery(); + } return; } + if (attempt === 0) { + signalFirstAttemptDelivery(); + } console.error( `[world-local] Queue message failed (attempt ${attempt + 1}, HTTP ${response.status})`, { @@ -214,13 +323,14 @@ export function createQueue(config: Partial): LocalQueue { } ); - // 5s linear backoff to approximate VQS retry timing in local dev. - // VQS uses 5s linear for attempts 1–32, then exponential, but for - // local dev linear 5s is sufficient — the handler enforces the real - // cap at MAX_QUEUE_DELIVERIES (48) which keeps total time under ~4min. - await setTimeout(5000); + const delayMs = getLocalRetryDelayMs({ + attempt: attempt + 1, + status: response.status, + }); + await setTimeout(delayMs); } + signalFirstAttemptDelivery(); console.error( `[world-local] Queue message exhausted safety limit (${MAX_LOCAL_SAFETY_LIMIT} attempts)`, { @@ -231,10 +341,12 @@ export function createQueue(config: Partial): LocalQueue { } ); } finally { + signalFirstAttemptDelivery(); semaphore.release(); } })() .catch((err) => { + signalFirstAttemptDelivery(); // Silently ignore client disconnect errors (e.g., browser refresh during streaming) // These are expected and should not cause unhandled rejection warnings const isAbortError = @@ -249,6 +361,13 @@ export function createQueue(config: Partial): LocalQueue { } }); + if (pathname === 'flow') { + await Promise.race([ + firstAttemptDeliveryPromise, + waitForMs(getWorkflowFirstAttemptWaitMs()), + ]); + } + return { messageId }; };