Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 32 additions & 15 deletions packages/ai/src/agent/durable-agent.ts
Original file line number Diff line number Diff line change
Expand Up @@ -861,21 +861,38 @@ export class DurableAgent<TBaseTools extends ToolSet = ToolSet> {
options.timeout !== undefined &&
typeof AbortController !== 'undefined'
) {
const timeoutController = new AbortController();
timeoutId = setTimeout(() => timeoutController.abort(), options.timeout);
const timeoutSignal = timeoutController.signal;
if (effectiveAbortSignal) {
// Combine: whichever fires first wins
const combined = new AbortController();
effectiveAbortSignal.addEventListener('abort', () => combined.abort(), {
once: true,
});
timeoutSignal.addEventListener('abort', () => combined.abort(), {
once: true,
});
effectiveAbortSignal = combined.signal;
} else {
effectiveAbortSignal = timeoutSignal;
// In the workflow VM, setTimeout is replaced with a throwing stub.
// Probe it with a no-op to detect whether real timers are available.
let hasTimers = false;
try {
const probe = setTimeout(() => {}, 0);
clearTimeout(probe);
hasTimers = true;
} catch {
// setTimeout not available (e.g. workflow VM) — skip timeout setup
}

if (hasTimers) {
const timeoutController = new AbortController();
timeoutId = setTimeout(
() => timeoutController.abort(),
options.timeout
);
const timeoutSignal = timeoutController.signal;
if (effectiveAbortSignal) {
const combined = new AbortController();
effectiveAbortSignal.addEventListener(
'abort',
() => combined.abort(),
{ once: true }
);
timeoutSignal.addEventListener('abort', () => combined.abort(), {
once: true,
});
effectiveAbortSignal = combined.signal;
} else {
effectiveAbortSignal = timeoutSignal;
}
}
}

Expand Down
1 change: 1 addition & 0 deletions packages/core/src/runtime/step-handler.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ vi.mock('../serialization.js', () => ({
dehydrateStepReturnValue: vi
.fn()
.mockResolvedValue(new Uint8Array([1, 2, 3])),
cancelAbortReaders: vi.fn(),
}));

// Mock context storage
Expand Down
3 changes: 3 additions & 0 deletions packages/core/src/runtime/step-handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import { importKey } from '../encryption.js';
import { runtimeLogger, stepLogger } from '../logger.js';
import { getStepFunction } from '../private.js';
import {
cancelAbortReaders,
dehydrateStepReturnValue,
hydrateStepArguments,
} from '../serialization.js';
Expand Down Expand Up @@ -524,6 +525,8 @@ const stepHandler = getWorldHandlers().createQueueHandler(
}
const executionTimeMs = Date.now() - executionStartTime;

cancelAbortReaders(...args, thisVal, hydratedInput.closureVars);

span?.setAttributes({
...Attribute.QueueExecutionTimeMs(executionTimeMs),
});
Expand Down
29 changes: 12 additions & 17 deletions packages/core/src/runtime/suspension-handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import type {
} from '../global.js';
import { runtimeLogger } from '../logger.js';
import { dehydrateStepArguments } from '../serialization.js';
import { getAbortStreamIdFromToken } from '../util.js';
import * as Attribute from '../telemetry/semantic-conventions.js';
import { serializeTraceCarrier } from '../telemetry.js';
import { queueMessage } from './helpers.js';
Expand Down Expand Up @@ -116,6 +117,7 @@ export async function handleSuspension({
token: queueItem.token,
metadata: hookMetadata,
isWebhook: queueItem.isWebhook ?? false,
...(queueItem.isSystem && { isSystem: true }),
},
};
})
Expand Down Expand Up @@ -232,10 +234,7 @@ export async function handleSuspension({

// Write stream cancellation packet for real-time step propagation
try {
// The stream name is derived from the hook token
// (abort hooks use token format `abrt_{id}`, stream is `strm_{id}_system_abort`)
const abortId = queueItem.token.replace('abrt_', '');
const streamName = `strm_${abortId}_system_abort`;
const streamName = getAbortStreamIdFromToken(queueItem.token);
await world.writeToStream(
streamName,
runId,
Expand All @@ -255,19 +254,15 @@ export async function handleSuspension({
);
}
} catch (err) {
if (WorkflowAPIError.is(err)) {
if (err.status === 410) {
runtimeLogger.info(
'Workflow run already completed, skipping abort',
{
workflowRunId: runId,
correlationId: queueItem.correlationId,
message: err.message,
}
);
} else {
throw err;
}
if (EntityConflictError.is(err) || RunExpiredError.is(err)) {
runtimeLogger.info(
'Workflow run already completed, skipping abort',
{
workflowRunId: runId,
correlationId: queueItem.correlationId,
message: err.message,
}
);
} else {
throw err;
}
Expand Down
126 changes: 118 additions & 8 deletions packages/core/src/serialization.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4714,6 +4714,83 @@ describe('AbortController serialization', () => {
throw e;
}
});

it('stream reader triggers abort when abort payload arrives', async () => {
// Override the global getWorld mock to return a readFromStream that
// delivers an actual abort payload, verifying the stream reader in
// reviveAbortController processes it correctly (not masked by the
// default immediately-closed stream mock).
const { getWorld } = await import('./runtime/world.js');
const abortPayload = new TextEncoder().encode(
JSON.stringify({ reason: 'stream-abort-reason' })
);
const readFromStreamMock = vi.fn().mockResolvedValue(
new ReadableStream({
start(c) {
c.enqueue(abortPayload);
c.close();
},
})
);
vi.mocked(getWorld).mockReturnValueOnce({
writeToStream: vi.fn().mockResolvedValue(undefined),
writeToStreamMulti: vi.fn().mockResolvedValue(undefined),
closeStream: vi.fn().mockResolvedValue(undefined),
readFromStream: readFromStreamMock,
listStreamsByRunId: vi.fn().mockResolvedValue([]),
} as any);

try {
const controller: any = {};
controller[ABORT_STREAM_NAME] =
'strm_01ABORT0000000000STRM_system_abort';
controller[ABORT_HOOK_TOKEN] = 'abrt_01ABORT0000000000STRM';
const signal: any = {};
signal[ABORT_STREAM_NAME] = 'strm_01ABORT0000000000STRM_system_abort';
signal[ABORT_HOOK_TOKEN] = 'abrt_01ABORT0000000000STRM';
signal.aborted = false;
signal.reason = undefined;
controller.signal = signal;

const origAC = vmGlobalThis.AbortController;
const origAS = vmGlobalThis.AbortSignal;
function FakeAC() {}
function FakeAS() {}
Object.setPrototypeOf(controller, FakeAC.prototype);
Object.setPrototypeOf(signal, FakeAS.prototype);
vmGlobalThis.AbortController = FakeAC;
vmGlobalThis.AbortSignal = FakeAS;

const serialized = await dehydrateStepArguments(
controller,
mockRunId,
noEncryptionKey,
vmGlobalThis
);

const ops: Promise<void>[] = [];
const hydrated = await hydrateStepArguments(
serialized,
mockRunId,
noEncryptionKey,
ops
);

expect(hydrated).toBeInstanceOf(AbortController);
expect(hydrated.signal.aborted).toBe(false);

// Wait for the stream reader op to process the abort payload
await Promise.all(ops);

expect(hydrated.signal.aborted).toBe(true);
expect(hydrated.signal.reason).toBe('stream-abort-reason');

vmGlobalThis.AbortController = origAC;
vmGlobalThis.AbortSignal = origAS;
} catch (e) {
throw e;
}
});
});

describe('step return value (step → workflow)', () => {
Expand Down Expand Up @@ -4884,18 +4961,23 @@ describe('AbortController serialization', () => {
});

describe('integration with Request', () => {
it('Request with signal: new Request(url, { signal }) preserves signal through round-trip', async () => {
it('Request with workflow-managed signal preserves signal through step hydration', async () => {
const originalStableUlid = (globalThis as any)[STABLE_ULID];
(globalThis as any)[STABLE_ULID] = () => '01ABORT000000000000E';
try {
// Use an aborted signal because the Request reducer only includes
// signals that are aborted or have ABORT_STREAM_NAME set
// The Request constructor copies the signal internally, so symbols
// set on the original controller.signal won't appear on request.signal.
// To test the Request+signal serialization path, set the symbol
// directly on the Request's own signal after construction.
const controller = new AbortController();
controller.abort('request cancelled');
const request = new Request('https://example.com/api', {
method: 'POST',
signal: controller.signal,
});
(request.signal as any)[ABORT_STREAM_NAME] =
'strm_01ABORT000000000000E_system_abort';
(request.signal as any)[ABORT_HOOK_TOKEN] = 'abrt_01ABORT000000000000E';
const ops: Promise<void>[] = [];

const serialized = await dehydrateWorkflowArguments(
Expand All @@ -4905,25 +4987,53 @@ describe('AbortController serialization', () => {
ops
);

const hydrated = (await hydrateWorkflowArguments(
const hydrated = (await hydrateStepArguments(
serialized,
mockRunId,
noEncryptionKey,
vmGlobalThis
ops
)) as Request;

vmGlobalThis.val = hydrated;
expect(runInContext('val instanceof Request', context)).toBe(true);
expect(hydrated).toBeInstanceOf(Request);
expect(hydrated.url).toBe('https://example.com/api');
expect(hydrated.method).toBe('POST');
// The signal should exist and be aborted with the reason preserved
expect(hydrated.signal).toBeDefined();
expect(hydrated.signal.aborted).toBe(true);
expect(hydrated.signal.reason).toBe('request cancelled');
} finally {
(globalThis as any)[STABLE_ULID] = originalStableUlid;
}
});

it('Request with plain (non-workflow) signal does not serialize the signal', async () => {
const controller = new AbortController();
controller.abort('user timeout');
const request = new Request('https://example.com/api', {
method: 'GET',
signal: controller.signal,
});
const ops: Promise<void>[] = [];

const serialized = await dehydrateWorkflowArguments(
request,
mockRunId,
noEncryptionKey,
ops
);

const hydrated = (await hydrateStepArguments(
serialized,
mockRunId,
noEncryptionKey,
ops
)) as Request;

expect(hydrated).toBeInstanceOf(Request);
expect(hydrated.url).toBe('https://example.com/api');
expect(hydrated.method).toBe('GET');
// Plain signals are not serialized — the hydrated Request gets a fresh default signal
expect(hydrated.signal.aborted).toBe(false);
});
});

describe('encryption', () => {
Expand Down
Loading
Loading