Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion packages/job-queue/src/job/Job.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

import { JobStatus } from "../queue-storage/IQueueStorage";
import { JobError } from "./JobError";
import type { JobProgressListener } from "./JobQueueEventListeners";
import type { JobProgressListener, StreamEventLike } from "./JobQueueEventListeners";

export { JobStatus };

Expand All @@ -20,6 +20,12 @@ export interface IJobExecuteContext {
message?: string,
details?: Record<string, any> | null
) => Promise<void>;
/**
* OPTIONAL. Present only when the worker's transport can deliver stream
* events. Jobs MUST NOT retain references to chunk buffers after calling
* this (buffers may be transferred across a worker boundary and detached).
*/
emitStreamEvent?: (event: StreamEventLike) => void;
}

/**
Expand Down
67 changes: 66 additions & 1 deletion packages/job-queue/src/job/JobQueueClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ import {
JobQueueEventListeners,
JobQueueEventParameters,
JobQueueEvents,
JobStreamListener,
type StreamEventLike,
} from "./JobQueueEventListeners";
import type { JobQueueServer } from "./JobQueueServer";
import { storageToClass } from "./JobStorageConverters";
Expand All @@ -38,6 +40,12 @@ export interface JobHandle<Output> {
waitFor(): Promise<Output>;
abort(): Promise<void>;
onProgress(callback: JobProgressListener): () => void;
/**
* OPTIONAL — present only when this handle's transport can deliver stream
* events (a same-process server-attached queue). Absent on storage-only
* backends; callers branch on `typeof handle.onStream === "function"`.
*/
onStream?(callback: JobStreamListener): () => void;
}

/**
Expand Down Expand Up @@ -78,6 +86,11 @@ export class JobQueueClient<Input, Output> {
*/
protected readonly jobProgressListeners: Map<unknown, Set<JobProgressListener>> = new Map();

/**
* Map of job IDs to their stream listeners
*/
protected readonly jobStreamListeners: Map<unknown, Set<JobStreamListener>> = new Map();

/**
* Last known progress state for each job
*/
Expand Down Expand Up @@ -391,6 +404,27 @@ export class JobQueueClient<Input, Output> {
};
}

/**
* Subscribe to stream events for a specific job
*/
public onJobStream(jobId: unknown, listener: JobStreamListener): () => void {
if (!this.jobStreamListeners.has(jobId)) {
this.jobStreamListeners.set(jobId, new Set());
}
const listeners = this.jobStreamListeners.get(jobId)!;
listeners.add(listener);

return () => {
const listeners = this.jobStreamListeners.get(jobId);
if (listeners) {
listeners.delete(listener);
if (listeners.size === 0) {
this.jobStreamListeners.delete(jobId);
}
}
};
}

// ========================================================================
// Event handling
// ========================================================================
Expand Down Expand Up @@ -524,23 +558,54 @@ export class JobQueueClient<Input, Output> {
}
}

/**
* Called by server when a job emits a stream event. Listener throws are
* isolated per-listener — one misbehaving subscriber does not interrupt
* delivery to the rest or abort the dispatch itself.
* @internal
*/
public handleJobStream(jobId: unknown, event: StreamEventLike): void {
this.events.emit("job_stream", this.queueName, jobId, event);

const listeners = this.jobStreamListeners.get(jobId);
if (!listeners) return;
for (const listener of listeners) {
try {
listener(event);
} catch (err) {
getLogger().error("JobHandle.onStream listener threw", {
jobId,
error: err instanceof Error ? err.message : String(err),
});
}
}
}

// ========================================================================
// Private helpers
// ========================================================================

private createJobHandle(id: unknown): JobHandle<Output> {
return {
const handle: JobHandle<Output> = {
id,
waitFor: () => this.waitFor(id),
abort: () => this.abort(id),
onProgress: (callback: JobProgressListener) => this.onJobProgress(id, callback),
};
// Stream delivery requires a same-process server-attached transport — the
// same signal `connect()` uses. Storage-only backends omit `onStream`, so
// callers branch on `typeof handle.onStream === "function"`.
if (this.server) {
handle.onStream = (callback: JobStreamListener) => this.onJobStream(id, callback);
}
return handle;
}

private cleanupJob(jobId: unknown): void {
this.activeJobPromises.delete(jobId);
this.lastKnownProgress.delete(jobId);
this.jobProgressListeners.delete(jobId);
this.jobStreamListeners.delete(jobId);
}

private handleStorageChange(change: QueueChangePayload<Input, Output>): void {
Expand Down
14 changes: 14 additions & 0 deletions packages/job-queue/src/job/JobQueueEventListeners.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ export type JobQueueEventListeners<Input, Output> = {
message: string,
details: Record<string, any> | null
) => void;
job_stream: (queueName: string, jobId: unknown, event: StreamEventLike) => void;
};

export type JobQueueEvents = keyof JobQueueEventListeners<any, any>;
Expand All @@ -46,3 +47,16 @@ export type JobProgressListener = (
message: string,
details: Record<string, any> | null
) => void;

/**
* Minimal structural shape of a stream event crossing the job-queue boundary.
*
* `@workglow/job-queue` sits below `@workglow/task-graph` in the dependency
* graph, so it cannot import task-graph's `StreamEvent`. This structural type
* captures just what the queue plumbing needs; task-graph's `StreamEvent` is
* assignable to it, so real stream producers interoperate transparently.
*/
export type StreamEventLike = { type: string; port?: string; [k: string]: unknown };

/** Listener for cross-process stream events emitted by an executing job. */
export type JobStreamListener = (event: StreamEventLike) => void;
12 changes: 12 additions & 0 deletions packages/job-queue/src/job/JobQueueServer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import type { JobStorageFormat, QueueChangePayload } from "../queue-storage/IQue
import { JobStatus } from "../queue-storage/IQueueStorage";
import type { DeadLetter } from "./DeadLetter";
import { Job, JobClass } from "./Job";
import type { StreamEventLike } from "./JobQueueEventListeners";
import { JobQueueClient } from "./JobQueueClient";
import { JobQueueWorker } from "./JobQueueWorker";
import { classToStorage, storageToClass } from "./JobStorageConverters";
Expand Down Expand Up @@ -49,6 +50,7 @@ export type JobQueueServerEventListeners<Input, Output> = {
message: string,
details: Record<string, unknown> | null
) => void;
job_stream: (queueName: string, jobId: unknown, event: StreamEventLike) => void;
};

export type JobQueueServerEvents = keyof JobQueueServerEventListeners<unknown, unknown>;
Expand Down Expand Up @@ -484,6 +486,11 @@ export class JobQueueServer<
this.forwardToClients("handleJobProgress", jobId, progress, message, details);
});

worker.on("job_stream", (jobId, event) => {
this.events.emit("job_stream", this.queueName, jobId, event);
this.forwardToClients("handleJobStream", jobId, event);
});

return worker;
}

Expand All @@ -507,6 +514,11 @@ export class JobQueueServer<
message: string,
details: Record<string, unknown> | null
): void;
protected forwardToClients(
method: "handleJobStream",
jobId: unknown,
event: StreamEventLike
): void;
protected forwardToClients(method: string, ...args: unknown[]): void {
for (const client of this.clients) {
const fn = (client as any)[method];
Expand Down
14 changes: 14 additions & 0 deletions packages/job-queue/src/job/JobQueueWorker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import {
RetryableJobError,
} from "./JobError";
import { withJobErrorDiagnostics } from "./JobErrorDiagnostics";
import type { StreamEventLike } from "./JobQueueEventListeners";
import { classToStorage, storageToClass } from "./JobStorageConverters";

/**
Expand All @@ -56,6 +57,7 @@ export type JobQueueWorkerEventListeners<Input, Output> = {
message: string,
details: Record<string, unknown> | null
) => void;
job_stream: (jobId: unknown, event: StreamEventLike) => void;
worker_start: () => void;
worker_stop: () => void;
};
Expand Down Expand Up @@ -814,6 +816,7 @@ export class JobQueueWorker<
return await job.execute(job.input, {
signal,
updateProgress: this.updateProgress.bind(this, job.id),
emitStreamEvent: (event) => this.emitStreamEvent(job.id, event),
});
}

Expand All @@ -835,6 +838,17 @@ export class JobQueueWorker<
this.events.emit("job_progress", jobId, progress, message, details);
}

/**
* Emit a cross-process stream event for a job.
*
* Mirrors {@link updateProgress}: stream events are delivered in-memory via
* the `job_stream` event and forwarded by an attached `JobQueueServer` to
* subscribed clients. Storage is not touched.
*/
protected emitStreamEvent(jobId: unknown, event: StreamEventLike): void {
this.events.emit("job_stream", jobId, event);
}

/** Internal — resolve the active claim for a job id, throw if missing. */
private getClaim(jobId: unknown): IClaim<JobStorageFormat<Input, Output>> | undefined {
return this.activeClaims.get(jobId);
Expand Down
57 changes: 57 additions & 0 deletions packages/task-graph/src/cache/CacheRef.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/**
* @license
* Copyright 2026 Steven Roussey <sroussey@gmail.com>
* SPDX-License-Identifier: Apache-2.0
*/

/**
* A reference to bytes that live in the configured cache backing rather than
* inline in a task `Output`. Emitted by `TaskRunner` for binary output ports
* whose committed size meets the `IRunConfig.referenceThresholdBytes` and
* whose cache backing implements `saveOutputStream`.
*
* `$ref` is opaque to consumers: only the cache backing knows how to translate
* it back into bytes. `size` and `mime` are best-effort hints populated when
* known at finish time; absent values do not imply unknown failure.
*
* Resolution is best-effort: the cache backing's TTL is the lifetime contract,
* and `resolveOutputRef` returns `undefined` when the underlying entry has
* been evicted.
*/
export type CacheRef = {
readonly $ref: string;
readonly size?: number;
readonly mime?: string;
};

/**
* Narrow an unknown value to {@link CacheRef}. The discriminator is a `$ref`
* property of type `string`; other fields are optional and not inspected.
*/
export function isCacheRef(value: unknown): value is CacheRef {
if (typeof value !== "object" || value === null) return false;
const candidate = value as { readonly $ref?: unknown };
return typeof candidate.$ref === "string";
}

/**
* Default threshold (in bytes) at which a binary output port becomes a
* {@link CacheRef} instead of being inlined in `Output`. Below this size, the
* runner inlines the bytes; at or above, it emits a reference.
*
* `0` is a sentinel meaning "always emit a reference" and is honored by the
* runtime path (a callsite that wants to force refs sets `0` explicitly via
* `IRunConfig.referenceThresholdBytes`).
*/
export const REFERENCE_THRESHOLD_BYTES_DEFAULT = 65_536;

/**
* Resolve the effective reference threshold for a run, falling back to
* {@link REFERENCE_THRESHOLD_BYTES_DEFAULT} when unset. A negative value is
* treated as the default (negative thresholds are nonsensical).
*/
export function resolveReferenceThreshold(threshold: number | undefined): number {
if (threshold === undefined) return REFERENCE_THRESHOLD_BYTES_DEFAULT;
if (threshold < 0) return REFERENCE_THRESHOLD_BYTES_DEFAULT;
return threshold;
}
64 changes: 64 additions & 0 deletions packages/task-graph/src/cache/RunPrivateCacheRepo.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

import { TaskOutputRepository } from "../storage/TaskOutputRepository";
import type { TaskInput, TaskOutput } from "../task/TaskTypes";
import type { CacheRef } from "./CacheRef";

export interface RunPrivateCacheRepoOptions {
backing: TaskOutputRepository;
Expand All @@ -31,6 +32,20 @@ export class RunPrivateCacheRepo extends TaskOutputRepository {
super({ outputCompression: backing.outputCompression });
this.backing = backing;
this.runId = runId;
// Mirror the backing's optional-method shape on this instance so callers
// probing `typeof repo.saveOutputStream === "function"` (or the
// getOutputByRef/getOutputStreamByRef siblings) see the true capability
// instead of the always-present wrapper override. Class methods live on
// the prototype; assigning `undefined` on the instance shadows them.
if (typeof backing.saveOutputStream !== "function") {
(this as { saveOutputStream?: unknown }).saveOutputStream = undefined;
}
if (typeof backing.getOutputByRef !== "function") {
(this as { getOutputByRef?: unknown }).getOutputByRef = undefined;
}
if (typeof backing.getOutputStreamByRef !== "function") {
(this as { getOutputStreamByRef?: unknown }).getOutputStreamByRef = undefined;
}
}

private ns(taskType: string): string {
Expand All @@ -50,6 +65,55 @@ export class RunPrivateCacheRepo extends TaskOutputRepository {
return this.backing.getOutput(this.ns(taskType), inputs);
}

/**
* Forwards the streaming sink to the backing repository, applying the same
* `runId` namespacing as `saveOutput`. Only present in effect when the
* backing repo supports streaming; `supportsStreaming()` (below) reflects the
* backing repo so callers branch correctly before calling this.
*
* Returns whatever {@link CacheRef} the backing produced (already namespaced
* via the wrapped `taskType`). Resolvers calling `getOutputByRef` on this
* wrapper forward to the backing, which decodes its own `$ref`.
*/
public override saveOutputStream(
taskType: string,
inputs: TaskInput,
chunks: AsyncIterable<Uint8Array>,
metadata: Record<string, unknown>
): Promise<CacheRef> {
const fn = this.backing.saveOutputStream;
if (typeof fn !== "function") {
return Promise.reject(
new Error(
`RunPrivateCacheRepo: backing repository does not implement saveOutputStream. ` +
`Call supportsStreaming() before saveOutputStream.`
)
);
}
return fn.call(this.backing, this.ns(taskType), inputs, chunks, metadata);
}

/**
* Forwards by-ref retrieval to the backing repository. The `$ref` already
* encodes whatever the backing needs to locate the entry; no namespacing is
* re-applied here.
*/
public override getOutputByRef(ref: CacheRef): Promise<Blob | undefined> {
if (typeof this.backing.getOutputByRef !== "function") return Promise.resolve(undefined);
return this.backing.getOutputByRef(ref);
}

/** Forwards streaming by-ref retrieval to the backing repository. */
public override getOutputStreamByRef(ref: CacheRef): AsyncIterable<Uint8Array> | undefined {
if (typeof this.backing.getOutputStreamByRef !== "function") return undefined;
return this.backing.getOutputStreamByRef(ref);
}

/** Mirrors the backing repository's streaming capability. */
public override supportsStreaming(): boolean {
return this.backing.supportsStreaming();
}

/**
* Override of `TaskOutputRepository.clear()` that only deletes entries
* namespaced under THIS wrapper's `runId`. Entries from other runs are not
Expand Down
3 changes: 3 additions & 0 deletions packages/task-graph/src/cache/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,5 +6,8 @@

export * from "./CacheJanitor";
export * from "./CachePolicy";
export * from "./CacheRef";
export * from "./CacheRegistry";
export * from "./resolveJobOutput";
export * from "./resolveRef";
export * from "./RunPrivateCacheRepo";
Loading