From 563c84be9059e82f846405ccbb8a45b6ec5630da Mon Sep 17 00:00:00 2001 From: Pranay Prakash Date: Fri, 3 Apr 2026 11:57:47 -0700 Subject: [PATCH 1/3] Update Vercel queue delay cap for longer sleeps --- .changeset/strong-guests-study.md | 5 +++++ packages/world-vercel/src/queue.test.ts | 8 +++++--- packages/world-vercel/src/queue.ts | 18 +++++++++++------- pnpm-lock.yaml | 16 ++++++++-------- pnpm-workspace.yaml | 2 +- 5 files changed, 30 insertions(+), 19 deletions(-) create mode 100644 .changeset/strong-guests-study.md diff --git a/.changeset/strong-guests-study.md b/.changeset/strong-guests-study.md new file mode 100644 index 0000000000..bf87b0a121 --- /dev/null +++ b/.changeset/strong-guests-study.md @@ -0,0 +1,5 @@ +--- +'@workflow/world-vercel': patch +--- + +Update the Vercel queue delay cap for longer sleeps. diff --git a/packages/world-vercel/src/queue.test.ts b/packages/world-vercel/src/queue.test.ts index b0a587fc69..0cc9d4132b 100644 --- a/packages/world-vercel/src/queue.test.ts +++ b/packages/world-vercel/src/queue.test.ts @@ -1,5 +1,7 @@ import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest'; +const MAX_DELAY_SECONDS = 7 * 24 * 60 * 60 - 60 * 60; + const { mockSend, MockDuplicateMessageError, @@ -404,7 +406,7 @@ describe('createQueue', () => { } }); - it('should clamp delaySeconds to max 23 hours for long sleeps', async () => { + it('should clamp delaySeconds to 1 hour less than 7 days for long sleeps', async () => { mockSend.mockResolvedValue({ messageId: 'new-msg-123' }); let capturedHandler: ( @@ -422,7 +424,7 @@ describe('createQueue', () => { try { const queue = createQueue(); queue.createQueueHandler('__wkf_workflow_', async () => ({ - timeoutSeconds: 100000, + timeoutSeconds: 700000, })); await capturedHandler!( @@ -443,7 +445,7 @@ describe('createQueue', () => { expect(mockSend).toHaveBeenCalledTimes(1); // send(topicName, payload, options) const sendOpts = mockSend.mock.calls[0][2]; - expect(sendOpts.delaySeconds).toBe(82800); // MAX_DELAY_SECONDS + expect(sendOpts.delaySeconds).toBe(MAX_DELAY_SECONDS); } finally { if (originalEnv !== undefined) { process.env.VERCEL_DEPLOYMENT_ID = originalEnv; diff --git a/packages/world-vercel/src/queue.ts b/packages/world-vercel/src/queue.ts index ee18932687..d2cd778b69 100644 --- a/packages/world-vercel/src/queue.ts +++ b/packages/world-vercel/src/queue.ts @@ -1,5 +1,5 @@ import { AsyncLocalStorage } from 'node:async_hooks'; -import { QueueClient, DuplicateMessageError } from '@vercel/queue'; +import { DuplicateMessageError, QueueClient } from '@vercel/queue'; import { MessageId, type Queue, @@ -32,12 +32,12 @@ const MessageWrapper = z.object({ * rather than using visibility timeouts on the same message. * * Benefits of this approach: - * - Fresh 24-hour lifetime with each message (no message age tracking needed) + * - Fresh delay window with each message (no message age tracking needed) * - Messages fire at the scheduled time (no short-circuit + recheck pattern) * - Simpler conceptual model: messages are triggers with delivery schedules * - * For sleeps > 24 hours (max delay), we use chaining: - * 1. Schedule message with max delay (~23h, leaving buffer) + * For sleeps > 7 days (max delay), we use chaining: + * 1. Schedule message with max delay (~6d 23h, leaving 1h buffer) * 2. When it fires, workflow checks if sleep is complete * 3. If not, another delayed message is queued for remaining time * 4. Process repeats until the full sleep duration has elapsed @@ -48,8 +48,11 @@ const MessageWrapper = z.object({ * * These constants can be overridden via environment variables for testing. */ +const SECONDS_PER_HOUR = 60 * 60; +const MAX_QUEUE_DELAY_WINDOW_SECONDS = 7 * 24 * SECONDS_PER_HOUR; const MAX_DELAY_SECONDS = Number( - process.env.VERCEL_QUEUE_MAX_DELAY_SECONDS || 82800 // 23 hours - leave 1h buffer before 24h retention limit + process.env.VERCEL_QUEUE_MAX_DELAY_SECONDS || + MAX_QUEUE_DELAY_WINDOW_SECONDS - SECONDS_PER_HOUR ); /** @@ -191,8 +194,9 @@ export function createQueue(config?: APIConfig): Queue { if (typeof result?.timeoutSeconds === 'number') { // When timeoutSeconds is 0, skip delaySeconds entirely for immediate re-enqueue. - // Otherwise, clamp to max delay (23h) - for longer sleeps, the workflow will chain - // multiple delayed messages until the full sleep duration has elapsed. + // Otherwise, clamp to the queue delay window minus a 1h buffer (6d 23h). + // For longer sleeps, the workflow will chain multiple delayed messages until + // the full sleep duration has elapsed. const delaySeconds = result.timeoutSeconds > 0 ? Math.min(result.timeoutSeconds, MAX_DELAY_SECONDS) diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 70311f2e81..8a629af5d0 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -22,8 +22,8 @@ catalogs: specifier: 3.2.0 version: 3.2.0 '@vercel/queue': - specifier: 0.1.4 - version: 0.1.4 + specifier: 0.1.6 + version: 0.1.6 '@vitest/coverage-v8': specifier: ^4.0.18 version: 4.0.18 @@ -1297,7 +1297,7 @@ importers: dependencies: '@vercel/queue': specifier: 'catalog:' - version: 0.1.4 + version: 0.1.6 '@workflow/errors': specifier: workspace:* version: link:../errors @@ -1343,7 +1343,7 @@ importers: dependencies: '@vercel/queue': specifier: 'catalog:' - version: 0.1.4 + version: 0.1.6 '@workflow/errors': specifier: workspace:* version: link:../errors @@ -1450,7 +1450,7 @@ importers: version: 3.2.0 '@vercel/queue': specifier: 'catalog:' - version: 0.1.4 + version: 0.1.6 '@workflow/errors': specifier: workspace:* version: link:../errors @@ -8100,8 +8100,8 @@ packages: '@opentelemetry/sdk-metrics': '>=1.19.0 <2.0.0' '@opentelemetry/sdk-trace-base': '>=1.19.0 <2.0.0' - '@vercel/queue@0.1.4': - resolution: {integrity: sha512-wo+jCycmCX078vQSbkX+RcLvySONDCK0f9aQp5UMKQD1+B+xKt3YVbIYbZukvoHQpbm5nnk6If+ADSeK/PmCgQ==} + '@vercel/queue@0.1.6': + resolution: {integrity: sha512-FUQ0ySYNm31ZO709lg6a2NPamzH5LpfU9QZwZVduxnOH0N/aNp+8rjKmYLDWQpdA/S+ihNexJV+NhbV3GFaumQ==} engines: {node: '>=20.0.0'} '@vercel/routing-utils@5.3.0': @@ -22610,7 +22610,7 @@ snapshots: '@opentelemetry/sdk-metrics': 1.30.1(@opentelemetry/api@1.9.0) '@opentelemetry/sdk-trace-base': 1.30.1(@opentelemetry/api@1.9.0) - '@vercel/queue@0.1.4': + '@vercel/queue@0.1.6': dependencies: '@vercel/oidc': 3.2.0 minimatch: 10.2.4 diff --git a/pnpm-workspace.yaml b/pnpm-workspace.yaml index 80605c1166..761923543a 100644 --- a/pnpm-workspace.yaml +++ b/pnpm-workspace.yaml @@ -11,7 +11,7 @@ catalog: "@types/node": 22.19.0 "@vercel/functions": ^3.4.3 "@vercel/oidc": 3.2.0 - "@vercel/queue": 0.1.4 + "@vercel/queue": 0.1.6 "@vitest/coverage-v8": ^4.0.18 ai: 6.0.116 esbuild: ^0.27.3 From 6b2568fee48a3a02e91aec84a66ca32eece98a90 Mon Sep 17 00:00:00 2001 From: Pranay Prakash Date: Fri, 3 Apr 2026 12:04:14 -0700 Subject: [PATCH 2/3] Validate Vercel queue max delay override --- packages/world-vercel/src/queue.test.ts | 110 ++++++++++++++++++++++++ packages/world-vercel/src/queue.ts | 25 ++++-- 2 files changed, 130 insertions(+), 5 deletions(-) diff --git a/packages/world-vercel/src/queue.test.ts b/packages/world-vercel/src/queue.test.ts index 0cc9d4132b..247a0c5ee9 100644 --- a/packages/world-vercel/src/queue.test.ts +++ b/packages/world-vercel/src/queue.test.ts @@ -455,6 +455,116 @@ describe('createQueue', () => { } }); + it('should fall back to the default max delay when the env override is invalid', async () => { + mockSend.mockResolvedValue({ messageId: 'new-msg-123' }); + + let capturedHandler: ( + message: unknown, + metadata: unknown + ) => Promise; + mockHandleCallback.mockImplementation((handler) => { + capturedHandler = handler; + return async () => new Response('ok'); + }); + + const originalDeploymentId = process.env.VERCEL_DEPLOYMENT_ID; + const originalMaxDelay = process.env.VERCEL_QUEUE_MAX_DELAY_SECONDS; + process.env.VERCEL_DEPLOYMENT_ID = 'dpl_test'; + process.env.VERCEL_QUEUE_MAX_DELAY_SECONDS = 'not-a-number'; + + try { + const queue = createQueue(); + queue.createQueueHandler('__wkf_workflow_', async () => ({ + timeoutSeconds: 700000, + })); + + await capturedHandler!( + { + payload: { runId: 'run-123' }, + queueName: '__wkf_workflow_test', + deploymentId: 'dpl_original', + }, + { + messageId: 'msg-123', + deliveryCount: 1, + createdAt: new Date(), + topicName: '__wkf_workflow_test', + consumerGroup: 'test', + } + ); + + const sendOpts = mockSend.mock.calls[0][2]; + expect(sendOpts.delaySeconds).toBe(MAX_DELAY_SECONDS); + } finally { + if (originalDeploymentId !== undefined) { + process.env.VERCEL_DEPLOYMENT_ID = originalDeploymentId; + } else { + delete process.env.VERCEL_DEPLOYMENT_ID; + } + + if (originalMaxDelay !== undefined) { + process.env.VERCEL_QUEUE_MAX_DELAY_SECONDS = originalMaxDelay; + } else { + delete process.env.VERCEL_QUEUE_MAX_DELAY_SECONDS; + } + } + }); + + it('should clamp oversized env overrides to the default max delay', async () => { + mockSend.mockResolvedValue({ messageId: 'new-msg-123' }); + + let capturedHandler: ( + message: unknown, + metadata: unknown + ) => Promise; + mockHandleCallback.mockImplementation((handler) => { + capturedHandler = handler; + return async () => new Response('ok'); + }); + + const originalDeploymentId = process.env.VERCEL_DEPLOYMENT_ID; + const originalMaxDelay = process.env.VERCEL_QUEUE_MAX_DELAY_SECONDS; + process.env.VERCEL_DEPLOYMENT_ID = 'dpl_test'; + process.env.VERCEL_QUEUE_MAX_DELAY_SECONDS = `${MAX_DELAY_SECONDS + 1}`; + + try { + const queue = createQueue(); + queue.createQueueHandler('__wkf_workflow_', async () => ({ + timeoutSeconds: 700000, + })); + + await capturedHandler!( + { + payload: { runId: 'run-123' }, + queueName: '__wkf_workflow_test', + deploymentId: 'dpl_original', + }, + { + messageId: 'msg-123', + deliveryCount: 1, + createdAt: new Date(), + topicName: '__wkf_workflow_test', + consumerGroup: 'test', + } + ); + + const sendOpts = mockSend.mock.calls[0][2]; + expect(sendOpts.delaySeconds).toBe(MAX_DELAY_SECONDS); + } finally { + if (originalDeploymentId !== undefined) { + process.env.VERCEL_DEPLOYMENT_ID = originalDeploymentId; + } else { + delete process.env.VERCEL_DEPLOYMENT_ID; + } + + if (originalMaxDelay !== undefined) { + process.env.VERCEL_QUEUE_MAX_DELAY_SECONDS = originalMaxDelay; + } else { + delete process.env.VERCEL_QUEUE_MAX_DELAY_SECONDS; + } + } + }); + it('should send new message without delaySeconds when handler returns timeoutSeconds: 0', async () => { mockSend.mockResolvedValue({ messageId: 'new-msg-123' }); diff --git a/packages/world-vercel/src/queue.ts b/packages/world-vercel/src/queue.ts index d2cd778b69..997794437a 100644 --- a/packages/world-vercel/src/queue.ts +++ b/packages/world-vercel/src/queue.ts @@ -50,10 +50,25 @@ const MessageWrapper = z.object({ */ const SECONDS_PER_HOUR = 60 * 60; const MAX_QUEUE_DELAY_WINDOW_SECONDS = 7 * 24 * SECONDS_PER_HOUR; -const MAX_DELAY_SECONDS = Number( - process.env.VERCEL_QUEUE_MAX_DELAY_SECONDS || - MAX_QUEUE_DELAY_WINDOW_SECONDS - SECONDS_PER_HOUR -); +const DEFAULT_MAX_DELAY_SECONDS = + MAX_QUEUE_DELAY_WINDOW_SECONDS - SECONDS_PER_HOUR; + +function getMaxDelaySeconds(): number { + const rawMaxDelaySeconds = process.env.VERCEL_QUEUE_MAX_DELAY_SECONDS; + if ( + rawMaxDelaySeconds === undefined || + rawMaxDelaySeconds.trim().length === 0 + ) { + return DEFAULT_MAX_DELAY_SECONDS; + } + + const parsedMaxDelaySeconds = Number(rawMaxDelaySeconds); + if (!Number.isFinite(parsedMaxDelaySeconds) || parsedMaxDelaySeconds < 0) { + return DEFAULT_MAX_DELAY_SECONDS; + } + + return Math.min(Math.floor(parsedMaxDelaySeconds), DEFAULT_MAX_DELAY_SECONDS); +} /** * Extract known identifiers from a queue payload and return them as VQS headers. @@ -199,7 +214,7 @@ export function createQueue(config?: APIConfig): Queue { // the full sleep duration has elapsed. const delaySeconds = result.timeoutSeconds > 0 - ? Math.min(result.timeoutSeconds, MAX_DELAY_SECONDS) + ? Math.min(result.timeoutSeconds, getMaxDelaySeconds()) : undefined; // Send new message BEFORE acknowledging current message. From 407954ea97d3e91091d6bc21abaa1d5b6a826684 Mon Sep 17 00:00:00 2001 From: Pranay Prakash Date: Fri, 3 Apr 2026 13:03:49 -0700 Subject: [PATCH 3/3] Adjust Vercel queue re-enqueue margin --- .changeset/strong-guests-study.md | 2 +- packages/world-vercel/src/queue.test.ts | 4 +--- packages/world-vercel/src/queue.ts | 17 +++++++++-------- 3 files changed, 11 insertions(+), 12 deletions(-) diff --git a/.changeset/strong-guests-study.md b/.changeset/strong-guests-study.md index bf87b0a121..67e7dfa815 100644 --- a/.changeset/strong-guests-study.md +++ b/.changeset/strong-guests-study.md @@ -2,4 +2,4 @@ '@workflow/world-vercel': patch --- -Update the Vercel queue delay cap for longer sleeps. +Update Vercel queue max delay for longer sleeps without having to re-enqueue as frequently diff --git a/packages/world-vercel/src/queue.test.ts b/packages/world-vercel/src/queue.test.ts index 247a0c5ee9..6f624ce6f1 100644 --- a/packages/world-vercel/src/queue.test.ts +++ b/packages/world-vercel/src/queue.test.ts @@ -1,7 +1,5 @@ import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest'; -const MAX_DELAY_SECONDS = 7 * 24 * 60 * 60 - 60 * 60; - const { mockSend, MockDuplicateMessageError, @@ -46,7 +44,7 @@ vi.mock('./utils.js', () => ({ getHeaders: vi.fn().mockReturnValue(new Map()), })); -import { createQueue } from './queue.js'; +import { createQueue, MAX_DELAY_SECONDS } from './queue.js'; describe('createQueue', () => { beforeEach(() => { diff --git a/packages/world-vercel/src/queue.ts b/packages/world-vercel/src/queue.ts index 997794437a..8a14c475bb 100644 --- a/packages/world-vercel/src/queue.ts +++ b/packages/world-vercel/src/queue.ts @@ -37,7 +37,7 @@ const MessageWrapper = z.object({ * - Simpler conceptual model: messages are triggers with delivery schedules * * For sleeps > 7 days (max delay), we use chaining: - * 1. Schedule message with max delay (~6d 23h, leaving 1h buffer) + * 1. Schedule message with max delay (~6d, leaving a 24h re-enqueue margin) * 2. When it fires, workflow checks if sleep is complete * 3. If not, another delayed message is queued for remaining time * 4. Process repeats until the full sleep duration has elapsed @@ -49,9 +49,10 @@ const MessageWrapper = z.object({ * These constants can be overridden via environment variables for testing. */ const SECONDS_PER_HOUR = 60 * 60; -const MAX_QUEUE_DELAY_WINDOW_SECONDS = 7 * 24 * SECONDS_PER_HOUR; -const DEFAULT_MAX_DELAY_SECONDS = - MAX_QUEUE_DELAY_WINDOW_SECONDS - SECONDS_PER_HOUR; +export const RE_ENQUEUE_MARGIN_SECONDS = 24 * SECONDS_PER_HOUR; // 24 hours +export const MAX_QUEUE_DELAY_WINDOW_SECONDS = 7 * 24 * SECONDS_PER_HOUR; // 7 days +export const MAX_DELAY_SECONDS = + MAX_QUEUE_DELAY_WINDOW_SECONDS - RE_ENQUEUE_MARGIN_SECONDS; function getMaxDelaySeconds(): number { const rawMaxDelaySeconds = process.env.VERCEL_QUEUE_MAX_DELAY_SECONDS; @@ -59,15 +60,15 @@ function getMaxDelaySeconds(): number { rawMaxDelaySeconds === undefined || rawMaxDelaySeconds.trim().length === 0 ) { - return DEFAULT_MAX_DELAY_SECONDS; + return MAX_DELAY_SECONDS; } const parsedMaxDelaySeconds = Number(rawMaxDelaySeconds); if (!Number.isFinite(parsedMaxDelaySeconds) || parsedMaxDelaySeconds < 0) { - return DEFAULT_MAX_DELAY_SECONDS; + return MAX_DELAY_SECONDS; } - return Math.min(Math.floor(parsedMaxDelaySeconds), DEFAULT_MAX_DELAY_SECONDS); + return Math.min(Math.floor(parsedMaxDelaySeconds), MAX_DELAY_SECONDS); } /** @@ -209,7 +210,7 @@ export function createQueue(config?: APIConfig): Queue { if (typeof result?.timeoutSeconds === 'number') { // When timeoutSeconds is 0, skip delaySeconds entirely for immediate re-enqueue. - // Otherwise, clamp to the queue delay window minus a 1h buffer (6d 23h). + // Otherwise, clamp to the queue delay window minus a 24h buffer (6d). // For longer sleeps, the workflow will chain multiple delayed messages until // the full sleep duration has elapsed. const delaySeconds =