From 50fcede6a363e77c3e0fb604ffa060df846cab58 Mon Sep 17 00:00:00 2001 From: Claude Date: Thu, 4 Jun 2026 05:36:28 +0000 Subject: [PATCH 1/2] feat(task-graph): binary-streaming framework + result-as-reference MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Spec 1 — binary-delta streaming framework ----------------------------------------- Adds a `binary-delta` variant to `StreamEvent` (analogous to `text-delta` / `object-delta`) plus an `x-stream: "binary"` annotation on output port schemas, so a task can `executeStream` byte chunks the same way it streams text or structured objects. New port helpers (`getBinaryPortId`, `getBinaryPortFormat`, `getStreamingPorts`), a `materializeBinary` assembler (Blob for `format: "blob"`/absent, ArrayBuffer for `format: "binary"`), and a `getOutputStreamMode` adopter let downstream code branch cleanly on binary mode without reaching for `any`. StreamProcessor accumulates `binary-delta` chunks per port and merges them into the enriched finish event so downstream dataflows see the materialized payload (or, for explicit binary finish payloads, the artifact wins per Spec 1's precedence rule). StreamPump adds the graph-aware decision (`canStreamBinaryToCache`, `anyConsumerNeedsMaterialized`) and the `pipeBinaryToCache` assembly helper that turns a task's `binary-delta` events into an `AsyncIterable` ready to drive a streaming cache sink. `TaskOutputRepository` gains an optional `saveOutputStream` sink so file-backed (or other stream-capable) caches can ingest bytes without materializing the full payload; `supportsStreaming()` and the `RunPrivateCacheRepo` wrapper forward the capability correctly. Spec 2 — result-as-reference ---------------------------- Builds on Spec 1 to close the queue-row-bloat hole: when the cache backing supports streaming, the runner pipes the binary bytes straight to the cache and places a `CacheRef` placeholder in `Output` at the port slot. Downstream `Output` consumers (and the queue row) see a small envelope (`{ \$ref, size?, mime? }`) instead of the full payload, while the bytes live in the cache for hydration on demand. Pieces: - `CacheRef` type + `isCacheRef` type guard (`cache/CacheRef.ts`). - `resolveOutput` walker (`cache/resolveRef.ts`) — pure recursive walker that hydrates refs through a caller-supplied resolver. Identity is preserved when no descendant matches the optional filter; class instances (`Error`, `URL`, custom classes) survive with prototype intact; `Map`/`Set` are walked through so nested refs resolve; opaque leaves are `Blob`/`ArrayBuffer`/`TypedArray`/`Date`/`RegExp`/`Promise`. - `resolveJobOutput` queue-boundary bridge (`cache/resolveJobOutput.ts`) accepting either a `CacheRefResolver` function or any object exposing `getOutputByRef` (`TaskOutputRepository` shape). - `IRunConfig.referenceThresholdBytes` (default 64 KiB; `0` forces ref for every binary output). - `TaskOutputRepository.saveOutputStream` now returns `Promise`; new `getOutputByRef` / `getOutputStreamByRef` readers complete the contract. - `CacheCoordinator.getBinaryRefSinksByPolicy` derives a per-port `BinaryRefSink` map; `hydrateRefsBelowThreshold` rehydrates refs whose committed size falls below the configured threshold (schema-restricted to binary streaming ports so legitimate `{\$ref: string}` fields in non-binary slots are not mistakenly hit against the cache). - `StreamProcessor` routes `binary-delta` chunks to a `BinaryRefSink` via a small `BinaryStreamRouter` producer-consumer pump. - `TaskRunner` reads the threshold, builds sinks, threads them through `StreamProcessor`, and rehydrates below-threshold refs in the post-run pass — saveByPolicy then writes the small ref-bearing Output. - StreamProcessor TEES when both an accumulator and a router exist for a port (graph context where the cache can stream AND a downstream edge needs materialized bytes): the emitted finish event carries the materialized Blob/ArrayBuffer for edge consumers; `finalOutput` carries the CacheRef so the queue/cache row stays small. - `RunPrivateCacheRepo` forwards all three new optional methods, mirroring the backing's true capability on the wrapper instance (assigning `undefined` when the backing lacks them) so callers probing `typeof === "function"` see the truth. Tests cover binary-delta accumulation + explicit-finish-payload precedence, port helpers, cache decision + assembly, runner pipe + force-ref + threshold rehydrate, tee for the graph + materializing-consumer case, saved-row size + cross-process serialization round-trip + dangling-ref best-effort, and the walker / `resolveOutput` / `resolveJobOutput` surface (class instances, Map/Set, sparse-ref filter, concurrency bound, identity preservation). --- packages/task-graph/src/cache/CacheRef.ts | 57 ++ .../src/cache/RunPrivateCacheRepo.ts | 64 ++ packages/task-graph/src/cache/index.ts | 3 + .../task-graph/src/cache/resolveJobOutput.ts | 55 ++ packages/task-graph/src/cache/resolveRef.ts | 195 ++++++ .../src/storage/TaskOutputRepository.ts | 46 ++ .../task-graph/src/task-graph/StreamPump.ts | 186 +++++- .../task-graph/src/task/CacheCoordinator.ts | 116 ++++ packages/task-graph/src/task/ITask.ts | 16 + .../task-graph/src/task/StreamProcessor.ts | 224 ++++++- packages/task-graph/src/task/StreamTypes.ts | 89 ++- packages/task-graph/src/task/TaskRunner.ts | 39 +- .../test/src/test/task-graph/CacheRef.test.ts | 86 +++ .../Spec2QueueRowAndRehydrate.test.ts | 241 +++++++ .../task-graph/StreamBinaryProcessor.test.ts | 95 +++ .../test/task-graph/StreamBinaryPump.test.ts | 625 ++++++++++++++++++ .../test/task-graph/StreamBinaryTypes.test.ts | 126 ++++ .../StreamProcessorBinaryRefSink.test.ts | 225 +++++++ .../TaskOutputRepositoryStream.test.ts | 190 ++++++ .../test/task-graph/TaskRunnerRefPath.test.ts | 205 ++++++ .../test/task-graph/resolveJobOutput.test.ts | 86 +++ .../src/test/task-graph/resolveOutput.test.ts | 136 ++++ packages/util/src/json-schema/JsonSchema.ts | 2 +- 23 files changed, 3095 insertions(+), 12 deletions(-) create mode 100644 packages/task-graph/src/cache/CacheRef.ts create mode 100644 packages/task-graph/src/cache/resolveJobOutput.ts create mode 100644 packages/task-graph/src/cache/resolveRef.ts create mode 100644 packages/test/src/test/task-graph/CacheRef.test.ts create mode 100644 packages/test/src/test/task-graph/Spec2QueueRowAndRehydrate.test.ts create mode 100644 packages/test/src/test/task-graph/StreamBinaryProcessor.test.ts create mode 100644 packages/test/src/test/task-graph/StreamBinaryPump.test.ts create mode 100644 packages/test/src/test/task-graph/StreamBinaryTypes.test.ts create mode 100644 packages/test/src/test/task-graph/StreamProcessorBinaryRefSink.test.ts create mode 100644 packages/test/src/test/task-graph/TaskOutputRepositoryStream.test.ts create mode 100644 packages/test/src/test/task-graph/TaskRunnerRefPath.test.ts create mode 100644 packages/test/src/test/task-graph/resolveJobOutput.test.ts create mode 100644 packages/test/src/test/task-graph/resolveOutput.test.ts diff --git a/packages/task-graph/src/cache/CacheRef.ts b/packages/task-graph/src/cache/CacheRef.ts new file mode 100644 index 000000000..20222a48e --- /dev/null +++ b/packages/task-graph/src/cache/CacheRef.ts @@ -0,0 +1,57 @@ +/** + * @license + * Copyright 2026 Steven Roussey + * SPDX-License-Identifier: Apache-2.0 + */ + +/** + * A reference to bytes that live in the configured cache backing rather than + * inline in a task `Output`. Emitted by `TaskRunner` for binary output ports + * whose committed size meets the `IRunConfig.referenceThresholdBytes` and + * whose cache backing implements `saveOutputStream`. + * + * `$ref` is opaque to consumers: only the cache backing knows how to translate + * it back into bytes. `size` and `mime` are best-effort hints populated when + * known at finish time; absent values do not imply unknown failure. + * + * Resolution is best-effort: the cache backing's TTL is the lifetime contract, + * and `resolveOutputRef` returns `undefined` when the underlying entry has + * been evicted. + */ +export type CacheRef = { + readonly $ref: string; + readonly size?: number; + readonly mime?: string; +}; + +/** + * Narrow an unknown value to {@link CacheRef}. The discriminator is a `$ref` + * property of type `string`; other fields are optional and not inspected. + */ +export function isCacheRef(value: unknown): value is CacheRef { + if (typeof value !== "object" || value === null) return false; + const candidate = value as { readonly $ref?: unknown }; + return typeof candidate.$ref === "string"; +} + +/** + * Default threshold (in bytes) at which a binary output port becomes a + * {@link CacheRef} instead of being inlined in `Output`. Below this size, the + * runner inlines the bytes; at or above, it emits a reference. + * + * `0` is a sentinel meaning "always emit a reference" and is honored by the + * runtime path (a callsite that wants to force refs sets `0` explicitly via + * `IRunConfig.referenceThresholdBytes`). + */ +export const REFERENCE_THRESHOLD_BYTES_DEFAULT = 65_536; + +/** + * Resolve the effective reference threshold for a run, falling back to + * {@link REFERENCE_THRESHOLD_BYTES_DEFAULT} when unset. A negative value is + * treated as the default (negative thresholds are nonsensical). + */ +export function resolveReferenceThreshold(threshold: number | undefined): number { + if (threshold === undefined) return REFERENCE_THRESHOLD_BYTES_DEFAULT; + if (threshold < 0) return REFERENCE_THRESHOLD_BYTES_DEFAULT; + return threshold; +} diff --git a/packages/task-graph/src/cache/RunPrivateCacheRepo.ts b/packages/task-graph/src/cache/RunPrivateCacheRepo.ts index 5f1a7bc3b..5f573ffef 100644 --- a/packages/task-graph/src/cache/RunPrivateCacheRepo.ts +++ b/packages/task-graph/src/cache/RunPrivateCacheRepo.ts @@ -6,6 +6,7 @@ import { TaskOutputRepository } from "../storage/TaskOutputRepository"; import type { TaskInput, TaskOutput } from "../task/TaskTypes"; +import type { CacheRef } from "./CacheRef"; export interface RunPrivateCacheRepoOptions { backing: TaskOutputRepository; @@ -31,6 +32,20 @@ export class RunPrivateCacheRepo extends TaskOutputRepository { super({ outputCompression: backing.outputCompression }); this.backing = backing; this.runId = runId; + // Mirror the backing's optional-method shape on this instance so callers + // probing `typeof repo.saveOutputStream === "function"` (or the + // getOutputByRef/getOutputStreamByRef siblings) see the true capability + // instead of the always-present wrapper override. Class methods live on + // the prototype; assigning `undefined` on the instance shadows them. + if (typeof backing.saveOutputStream !== "function") { + (this as { saveOutputStream?: unknown }).saveOutputStream = undefined; + } + if (typeof backing.getOutputByRef !== "function") { + (this as { getOutputByRef?: unknown }).getOutputByRef = undefined; + } + if (typeof backing.getOutputStreamByRef !== "function") { + (this as { getOutputStreamByRef?: unknown }).getOutputStreamByRef = undefined; + } } private ns(taskType: string): string { @@ -50,6 +65,55 @@ export class RunPrivateCacheRepo extends TaskOutputRepository { return this.backing.getOutput(this.ns(taskType), inputs); } + /** + * Forwards the streaming sink to the backing repository, applying the same + * `runId` namespacing as `saveOutput`. Only present in effect when the + * backing repo supports streaming; `supportsStreaming()` (below) reflects the + * backing repo so callers branch correctly before calling this. + * + * Returns whatever {@link CacheRef} the backing produced (already namespaced + * via the wrapped `taskType`). Resolvers calling `getOutputByRef` on this + * wrapper forward to the backing, which decodes its own `$ref`. + */ + public override saveOutputStream( + taskType: string, + inputs: TaskInput, + chunks: AsyncIterable, + metadata: Record + ): Promise { + const fn = this.backing.saveOutputStream; + if (typeof fn !== "function") { + return Promise.reject( + new Error( + `RunPrivateCacheRepo: backing repository does not implement saveOutputStream. ` + + `Call supportsStreaming() before saveOutputStream.` + ) + ); + } + return fn.call(this.backing, this.ns(taskType), inputs, chunks, metadata); + } + + /** + * Forwards by-ref retrieval to the backing repository. The `$ref` already + * encodes whatever the backing needs to locate the entry; no namespacing is + * re-applied here. + */ + public override getOutputByRef(ref: CacheRef): Promise { + if (typeof this.backing.getOutputByRef !== "function") return Promise.resolve(undefined); + return this.backing.getOutputByRef(ref); + } + + /** Forwards streaming by-ref retrieval to the backing repository. */ + public override getOutputStreamByRef(ref: CacheRef): AsyncIterable | undefined { + if (typeof this.backing.getOutputStreamByRef !== "function") return undefined; + return this.backing.getOutputStreamByRef(ref); + } + + /** Mirrors the backing repository's streaming capability. */ + public override supportsStreaming(): boolean { + return this.backing.supportsStreaming(); + } + /** * Override of `TaskOutputRepository.clear()` that only deletes entries * namespaced under THIS wrapper's `runId`. Entries from other runs are not diff --git a/packages/task-graph/src/cache/index.ts b/packages/task-graph/src/cache/index.ts index ac1518d67..708d1c5de 100644 --- a/packages/task-graph/src/cache/index.ts +++ b/packages/task-graph/src/cache/index.ts @@ -6,5 +6,8 @@ export * from "./CacheJanitor"; export * from "./CachePolicy"; +export * from "./CacheRef"; export * from "./CacheRegistry"; +export * from "./resolveJobOutput"; +export * from "./resolveRef"; export * from "./RunPrivateCacheRepo"; diff --git a/packages/task-graph/src/cache/resolveJobOutput.ts b/packages/task-graph/src/cache/resolveJobOutput.ts new file mode 100644 index 000000000..da7a0a71a --- /dev/null +++ b/packages/task-graph/src/cache/resolveJobOutput.ts @@ -0,0 +1,55 @@ +/** + * @license + * Copyright 2026 Steven Roussey + * SPDX-License-Identifier: Apache-2.0 + */ + +import type { CacheRef } from "./CacheRef"; +import type { CacheRefResolver, ResolveOutputOptions } from "./resolveRef"; +import { resolveOutput } from "./resolveRef"; + +/** + * Structural type matching `@workglow/job-queue`'s `JobHandle`. Declared + * locally so this module doesn't have to import from job-queue (avoiding a + * runtime dependency edge for a structural shape). + */ +export interface JobHandleLike { + waitFor(): Promise; +} + +/** + * Carrier of the resolver. Two-shape input: either a {@link CacheRefResolver} + * function directly, or anything with a `getOutputByRef` method (the shape + * `TaskOutputRepository` exposes after Spec 2 Phase D.1). + */ +export type RefBacking = + | CacheRefResolver + | { readonly getOutputByRef?: (ref: CacheRef) => Promise }; + +/** + * Await a job's completion and hydrate every {@link CacheRef} inside its + * `Output` to inline bytes via the supplied backing. The backing can be a + * raw resolver function or any object exposing `getOutputByRef` (e.g. a + * `TaskOutputRepository`). + * + * On cache miss the placeholder is replaced by `undefined` (best-effort + * resolution, per Spec 2 §2). Backings that don't implement `getOutputByRef` + * leave every ref in place. + */ +export async function resolveJobOutput( + handle: JobHandleLike, + backing: RefBacking, + options?: ResolveOutputOptions +): Promise { + const output = await handle.waitFor(); + const resolver = asResolver(backing); + if (resolver === undefined) return output; + return resolveOutput(output, resolver, options); +} + +function asResolver(backing: RefBacking): CacheRefResolver | undefined { + if (typeof backing === "function") return backing; + const get = backing.getOutputByRef; + if (typeof get !== "function") return undefined; + return (ref) => get.call(backing, ref); +} diff --git a/packages/task-graph/src/cache/resolveRef.ts b/packages/task-graph/src/cache/resolveRef.ts new file mode 100644 index 000000000..585317a07 --- /dev/null +++ b/packages/task-graph/src/cache/resolveRef.ts @@ -0,0 +1,195 @@ +/** + * @license + * Copyright 2026 Steven Roussey + * SPDX-License-Identifier: Apache-2.0 + */ + +import { isCacheRef } from "./CacheRef"; +import type { CacheRef } from "./CacheRef"; + +/** + * Resolves a single {@link CacheRef} to bytes (or `undefined` on cache miss). + * Wired up by callers against their configured cache backing; this module is + * unaware of any specific repository implementation. + */ +export type CacheRefResolver = (ref: CacheRef) => Promise; + +/** + * Streaming counterpart of {@link CacheRefResolver}. Returns an async iterable + * of chunks for consumers that want to pipe the bytes further (e.g. into an + * HTTP response) without materializing the full payload. Returns `undefined` + * if the backing has no streaming retrieval for this ref or the entry is + * absent. + */ +export type CacheRefStreamResolver = ( + ref: CacheRef +) => AsyncIterable | undefined; + +/** Options accepted by {@link resolveOutput}. */ +export type ResolveOutputOptions = { + /** + * Maximum number of concurrent resolver calls. Defaults to unbounded + * (`Infinity`), suitable for backings that handle their own pacing. + * Set a finite value when the backing is rate-limited. + */ + readonly concurrency?: number; + /** + * Predicate deciding which refs are resolved. Refs that fail the filter are + * left in place (the slot keeps the original {@link CacheRef}). When omitted, + * every ref is resolved. + */ + readonly filter?: (ref: CacheRef) => boolean; +}; + +/** + * Recursively visit a task output and replace every {@link CacheRef} encountered + * with the value produced by the resolver. Non-ref values are returned as-is. + * + * Identity is preserved when the input contains no refs (or none that match the + * optional filter): the same object reference comes back, so callers can rely + * on `===` / `WeakMap` keys not being silently invalidated by an auto-resolve. + * + * Plain objects and arrays are walked structurally; objects with a non-Object + * prototype (class instances such as `Error`, `URL`) are also walked, and the + * returned clone preserves their prototype. `Blob`, `ArrayBuffer`, typed + * arrays, `Date`, `RegExp`, and `Promise` are treated as opaque leaves. + * `Map`/`Set` are walked through so that refs nested inside them can resolve. + * + * On cache miss the resolver returns `undefined`; the corresponding slot in + * the returned output is `undefined`. This is the documented best-effort + * behavior — callers either tolerate missing bytes or check explicitly. + */ +export async function resolveOutput( + output: T, + resolver: CacheRefResolver, + options?: ResolveOutputOptions +): Promise { + if (!hasMatchingRef(output, options?.filter)) return output; + const limit = createLimiter(options?.concurrency); + return (await walk(output, resolver, limit, options?.filter)) as T; +} + +/** + * Cheap pre-scan: returns `true` if any {@link CacheRef} (matching the + * optional filter) is reachable inside `value`. Lets `resolveOutput` + * short-circuit and preserve identity when nothing needs resolving. + */ +function hasMatchingRef(value: unknown, filter: ((ref: CacheRef) => boolean) | undefined): boolean { + if (isCacheRef(value)) return filter ? filter(value) : true; + if (value === null || value === undefined) return false; + if (isLeaf(value)) return false; + if (Array.isArray(value)) { + for (const v of value) { + if (hasMatchingRef(v, filter)) return true; + } + return false; + } + if (value instanceof Map) { + for (const v of value.values()) { + if (hasMatchingRef(v, filter)) return true; + } + return false; + } + if (value instanceof Set) { + for (const v of value) { + if (hasMatchingRef(v, filter)) return true; + } + return false; + } + if (typeof value === "object") { + const source = value as Record; + for (const k of Object.keys(source)) { + if (hasMatchingRef(source[k], filter)) return true; + } + return false; + } + return false; +} + +async function walk( + value: unknown, + resolver: CacheRefResolver, + limit: Limiter, + filter: ((ref: CacheRef) => boolean) | undefined +): Promise { + if (isCacheRef(value)) { + if (filter && !filter(value)) return value; + return limit.run(() => resolver(value)); + } + if (value === null || value === undefined) return value; + if (isLeaf(value)) return value; + if (!hasMatchingRef(value, filter)) return value; + if (Array.isArray(value)) { + return Promise.all(value.map((v) => walk(v, resolver, limit, filter))); + } + if (value instanceof Map) { + const out = new Map(); + const entries = Array.from(value.entries()); + const resolved = await Promise.all( + entries.map(async ([k, v]) => [k, await walk(v, resolver, limit, filter)] as const) + ); + for (const [k, v] of resolved) out.set(k, v); + return out; + } + if (value instanceof Set) { + const out = new Set(); + const resolved = await Promise.all( + Array.from(value).map((v) => walk(v, resolver, limit, filter)) + ); + for (const v of resolved) out.add(v); + return out; + } + if (typeof value === "object") { + const source = value as Record; + // Preserve prototype so class instances (Error, URL, custom classes) + // survive the walk without losing methods/instanceof identity. + const proto = Object.getPrototypeOf(source); + const out: Record = + proto === null || proto === Object.prototype ? {} : Object.create(proto); + // Iterate in source order so the returned object's enumeration order + // matches the input even though resolutions race. + const keys = Object.keys(source); + const resolvedValues = await Promise.all( + keys.map((k) => walk(source[k], resolver, limit, filter)) + ); + for (let i = 0; i < keys.length; i++) out[keys[i]!] = resolvedValues[i]; + return out; + } + return value; +} + +function isLeaf(value: unknown): boolean { + if (typeof value !== "object" || value === null) return true; + if (value instanceof Blob) return true; + if (value instanceof ArrayBuffer) return true; + if (ArrayBuffer.isView(value)) return true; + if (value instanceof Date) return true; + if (value instanceof RegExp) return true; + if (value instanceof Promise) return true; + return false; +} + +type Limiter = { run(fn: () => Promise): Promise }; + +function createLimiter(concurrency: number | undefined): Limiter { + if (concurrency === undefined || concurrency === Infinity) { + return { run: (fn) => fn() }; + } + let free = Math.max(1, Math.floor(concurrency)); + const waiters: Array<() => void> = []; + return { + async run(fn: () => Promise): Promise { + while (free <= 0) { + await new Promise((resolve) => waiters.push(resolve)); + } + free--; + try { + return await fn(); + } finally { + free++; + const next = waiters.shift(); + if (next) next(); + } + }, + }; +} diff --git a/packages/task-graph/src/storage/TaskOutputRepository.ts b/packages/task-graph/src/storage/TaskOutputRepository.ts index 59ac43eb9..cc444ae08 100644 --- a/packages/task-graph/src/storage/TaskOutputRepository.ts +++ b/packages/task-graph/src/storage/TaskOutputRepository.ts @@ -5,6 +5,7 @@ */ import { createServiceToken, EventEmitter, EventParameters } from "@workglow/util"; +import type { CacheRef } from "../cache/CacheRef"; import { TaskInput, TaskOutput } from "../task/TaskTypes"; /** @@ -109,6 +110,51 @@ export abstract class TaskOutputRepository { createdAt?: Date // for testing purposes ): Promise; + /** + * OPTIONAL streaming sink. Implementations that can ingest a byte stream + * without materializing the full payload (e.g. a file-backed cache) declare + * this method; the runner pipes `binary-delta` chunks straight to it. The + * default base class does NOT implement it — call `supportsStreaming()` to + * branch. `metadata` carries side-band data (e.g. HTTP response headers). + * + * Returns a {@link CacheRef} that the runner places into `Output` at the + * binary port slot when the reference threshold is met (Spec 2). The `$ref` + * string is opaque; only this repository (and any wrapping namespacer like + * {@link RunPrivateCacheRepo}) needs to know how to decode it via + * {@link getOutputByRef} / {@link getOutputStreamByRef}. + * + * Implementations that provide `saveOutputStream` MUST also provide + * `getOutputByRef` (and ideally `getOutputStreamByRef`); a ref written by + * one without a paired reader is unresolvable. + */ + saveOutputStream?( + taskType: string, + inputs: TaskInput, + chunks: AsyncIterable, + metadata: Record + ): Promise; + + /** + * OPTIONAL reader counterpart of {@link saveOutputStream}. Resolves a + * {@link CacheRef} previously produced by `saveOutputStream` to a `Blob`. + * Returns `undefined` on cache miss (TTL expiry, manual clear). The runner + * never calls this directly; consumers calling `JobHandle.result()` or + * `resolveOutput` reach it through the resolver layer. + */ + getOutputByRef?(ref: CacheRef): Promise; + + /** + * OPTIONAL streaming reader counterpart of {@link saveOutputStream}. Returns + * an async iterable of bytes for the referenced entry, or `undefined` when + * the entry is absent or this backing does not support streaming retrieval. + */ + getOutputStreamByRef?(ref: CacheRef): AsyncIterable | undefined; + + /** True when this repository implements `saveOutputStream`. */ + supportsStreaming(): boolean { + return typeof this.saveOutputStream === "function"; + } + /** * Retrieves a task output from the repository * @param taskType The type of task to retrieve the output for diff --git a/packages/task-graph/src/task-graph/StreamPump.ts b/packages/task-graph/src/task-graph/StreamPump.ts index b53515cc4..20a500dbc 100644 --- a/packages/task-graph/src/task-graph/StreamPump.ts +++ b/packages/task-graph/src/task-graph/StreamPump.ts @@ -155,6 +155,21 @@ export class StreamPump { options.accumulateLeafOutputs ); + // NOTE (Spec 1 / Phase C, reduced scope): when accumulation is skipped + // because the binary output qualifies for a streaming cache sink (see + // `canStreamBinaryToCache` / `taskNeedsAccumulation`), the byte stream + // should be piped straight to the cache via `CacheCoordinator.saveStream`. + // That live pipe cannot be driven from here: the normalized cache key + // (`keyInputs`, `__cv`-stamped) and the resolved cache-policy slot are + // computed *inside* `TaskRunner.run()`, which this method calls as a black + // box, and `TaskRunner` unconditionally calls `saveByPolicy` afterwards. + // Driving `saveOutputStream` from here would use the wrong key and collide + // with the buffered save. The decision (`taskNeedsAccumulation` → false) + // and the event→AsyncIterable→`saveOutputStream` assembly + // (`pipeBinaryToCache`) are complete and unit-tested; the `TaskRunner` + // integration that suppresses the buffered save and drives the pipe with + // the real `keyInputs` is deferred to Spec 2. + let streamingNotified = false; const onStatus = (status: TaskStatus) => { @@ -193,6 +208,12 @@ export class StreamPump { registry: options.registry, resourceScope: options.resourceScope, runId: options.runId, + // Sinks are installed regardless of downstream needs: when both an + // accumulator and a router exist (downstream needs materialized + cache + // can stream), StreamProcessor tees — accumulator drives the enriched + // finish event for edge consumers; the router's CacheRef takes the + // port slot in finalOutput so the queue/cache row stays small + // (Spec 2 Phase E). }); await this.edgeMaterializer.pushOutputFromNodeToEdges(task, results); @@ -229,7 +250,15 @@ export class StreamPump { outputCache: TaskOutputRepository | undefined, accumulateLeafOutputs: boolean ): boolean { - if (outputCache) return true; + if (outputCache) { + // Relaxation: when the cache can ingest a byte stream, the task streams + // ONLY binary, and no downstream edge needs the materialized value, the + // bytes are piped straight to the cache sink instead of being buffered + // into an enriched finish event. This is the memory win for large binary + // outputs (e.g. file/image producers). + if (StreamPump.canStreamBinaryToCache(this.graph, task, outputCache)) return false; + return true; + } const outEdges = this.graph.getTargetDataflows(task.id); if (outEdges.length === 0) return accumulateLeafOutputs; @@ -256,6 +285,161 @@ export class StreamPump { return false; } + /** + * Decides whether a streaming task's binary output can be piped straight to a + * stream-capable cache sink (skipping in-memory accumulation). True when: + * + * 1. The cache reports `supportsStreaming()` (NOT a `typeof saveOutputStream` + * duck-type — wrappers like `RunPrivateCacheRepo` always expose a concrete + * `saveOutputStream` but their `supportsStreaming()` reflects the BACKING + * repo, so the duck-type would falsely report `true` over a non-streaming + * backing store). + * 2. The task's only streaming output port(s) are binary. + * 3. No downstream dataflow edge needs the materialized value (every consumer + * accepts the raw binary stream, or there are no consumers). + * + * Exposed as a static (taking the graph explicitly) so the decision is + * unit-testable in isolation from a live run — mirroring + * {@link StreamPump.pipeBinaryToCache}. + */ + static canStreamBinaryToCache( + graph: TaskGraph, + task: ITask, + outputCache: TaskOutputRepository | undefined + ): boolean { + // Defensive: a repository may not implement `supportsStreaming` (the base + // class does, but test doubles / partial mocks may not). Treat anything + // that cannot affirmatively report streaming support as non-streaming. + if (typeof outputCache?.supportsStreaming !== "function") return false; + if (!outputCache.supportsStreaming()) return false; + + const outSchema = task.outputSchema(); + const streamingPorts = getStreamingPorts(outSchema); + const binaryOnly = + streamingPorts.length > 0 && streamingPorts.every((p) => p.mode === "binary"); + if (!binaryOnly) return false; + + return !StreamPump.anyConsumerNeedsMaterialized(graph, task); + } + + /** + * Returns `true` when any outgoing dataflow edge from {@link task} has a + * target task whose input port can't consume the source's stream mode + * directly (per {@link edgeNeedsAccumulation}). Independent of the cache — + * used by the graph runner to decide whether to inhibit binary-stream sinks + * on the source task's runner (refs can't survive across an edge whose + * target expects a materialized value). + * + * Treats fan-out `*` edges as always-needs-materialized (conservative). + */ + static anyConsumerNeedsMaterialized(graph: TaskGraph, task: ITask): boolean { + const outSchema = task.outputSchema(); + const outEdges = graph.getTargetDataflows(task.id); + return outEdges.some((df) => { + if (df.sourceTaskPortId === DATAFLOW_ALL_PORTS) return true; + const targetTask = graph.getTask(df.targetTaskId); + if (!targetTask) return false; + return edgeNeedsAccumulation( + outSchema, + df.sourceTaskPortId, + targetTask.inputSchema(), + df.targetTaskPortId + ); + }); + } + + /** + * Drives a stream-capable cache sink from a streaming task's `binary-delta` + * events. Returns an `{ promise, detach }` pair: `promise` resolves once the + * cache's `saveOutputStream` has consumed every chunk (after the task emits + * `stream_end`); `detach` removes the listeners. The chunk iterable is fed by + * the task's `stream_chunk` events and closed on `stream_end`. + * + * Abort/error contract: `StreamProcessor` emits `stream_end` only on success + * (it throws on abort/error before emitting it). To avoid a hang + listener + * leak when the source task aborts or errors mid-stream, the iterable is also + * terminated by the task's `abort`/`error` events and by an optional + * `AbortSignal` (Spec 2 passes the task's `ctx.signal`). On any of those the + * iterable ends gracefully so the sink can finalize the bytes seen so far — + * the returned promise ALWAYS settles and `detach` ALWAYS runs. + * + * Exposed as a static so the assembly (binary-delta events → AsyncIterable → + * `saveOutputStream`) is unit-testable in isolation from a graph run. + */ + static pipeBinaryToCache( + task: ITask, + binaryPortId: string | undefined, + sink: (chunks: AsyncIterable) => Promise, + signal?: AbortSignal + ): { promise: Promise; detach: () => void } { + const queue: Uint8Array[] = []; + let done = false; + let notify: (() => void) | undefined; + + const wake = () => { + const n = notify; + notify = undefined; + n?.(); + }; + + const onChunk = (event: StreamEvent) => { + if (event.type === "binary-delta") { + if (binaryPortId === undefined || event.port === binaryPortId) { + queue.push(event.binaryDelta); + wake(); + } + } + }; + const onEnd = () => { + done = true; + wake(); + }; + // Abort/error termination: StreamProcessor never emits `stream_end` on these + // paths, so without this the iterable would await forever. Terminate the + // iterable (don't throw) so the sink finalizes the bytes seen so far and the + // promise settles — the source's own abort/error already surfaces to the run. + const onTerminate = () => { + done = true; + wake(); + }; + + task.on("stream_chunk", onChunk); + task.on("stream_end", onEnd); + task.on("abort", onTerminate); + task.on("error", onTerminate); + if (signal) { + if (signal.aborted) onTerminate(); + else signal.addEventListener("abort", onTerminate); + } + + const detach = () => { + task.off("stream_chunk", onChunk); + task.off("stream_end", onEnd); + task.off("abort", onTerminate); + task.off("error", onTerminate); + signal?.removeEventListener("abort", onTerminate); + }; + + async function* chunkIterable(): AsyncIterable { + while (true) { + while (queue.length > 0) { + yield queue.shift()!; + } + if (done) return; + await new Promise((resolve) => { + notify = resolve; + }); + } + } + + // Discard the sink's return value (helper signals completion only; callers + // wanting a CacheRef should hold the sink-returning promise themselves). + const promise = sink(chunkIterable()) + .finally(detach) + .then(() => undefined); + return { promise, detach }; + } + /** * Returns true if an event carries a port-specific delta (text-delta or object-delta). */ diff --git a/packages/task-graph/src/task/CacheCoordinator.ts b/packages/task-graph/src/task/CacheCoordinator.ts index 0e5ae0237..7a5f6908e 100644 --- a/packages/task-graph/src/task/CacheCoordinator.ts +++ b/packages/task-graph/src/task/CacheCoordinator.ts @@ -5,9 +5,14 @@ */ import { getPortCodec } from "@workglow/util"; +import type { DataPortSchema } from "@workglow/util/schema"; import { type CachePolicy, isPolicyCached, isPolicyPrivate } from "../cache/CachePolicy"; +import type { CacheRef } from "../cache/CacheRef"; +import { isCacheRef } from "../cache/CacheRef"; import type { CacheRegistry } from "../cache/CacheRegistry"; import type { TaskOutputRepository } from "../storage/TaskOutputRepository"; +import type { BinaryRefSink } from "./StreamProcessor"; +import { getBinaryPortFormat, getBinaryPortId, getStreamingPorts } from "./StreamTypes"; import type { ITask } from "./ITask"; import type { StreamEvent } from "./StreamTypes"; import { Task } from "./Task"; @@ -110,6 +115,23 @@ export class CacheCoordinator, + metadata: Record, + outputCache: TaskOutputRepository | undefined + ): Promise { + if (!outputCache || !this.task.cacheable) return undefined; + if (!outputCache.supportsStreaming()) return undefined; + return outputCache.saveOutputStream!(this.task.type, keyInputs, chunks, metadata); + } + // ======================================================================== // Policy-aware routing methods // ======================================================================== @@ -155,6 +177,100 @@ export class CacheCoordinator | undefined { + if (!this.task.cacheable) return undefined; + const cache = this.repoFor(registry, policy); + if (!cache || !cache.supportsStreaming()) return undefined; + const port = getBinaryPortId(outputSchema); + if (port === undefined) return undefined; + const taskType = this.task.type; + const sink: BinaryRefSink = (chunks) => + cache.saveOutputStream!(taskType, keyInputs, chunks, {}); + return new Map([[port, sink]]); + } + + /** + * Post-process the streaming task's `Output`: for every **binary streaming + * port** (per the schema) whose value is a {@link CacheRef} with + * `size < referenceThresholdBytes`, rehydrate the bytes via `getOutputByRef` + * and inline them as `Blob`/`ArrayBuffer` (per the port's `format` + * annotation). Refs at or above the threshold are left in place. + * `referenceThresholdBytes === 0` forces every ref to survive regardless of + * size. + * + * Restricted to schema-declared binary streaming ports so that legitimate + * non-binary fields that happen to carry a `{$ref: string}` shape (e.g. a + * JSON-Schema reference embedded in metadata) are not mistakenly resolved + * against the cache. + * + * Refs without a known `size` are kept as-is (the writer didn't measure; + * conservatively assume "large enough to keep as ref"). Backings that want + * threshold-based rehydration MUST populate `size` on the CacheRef they + * return from `saveOutputStream`. + */ + public async hydrateRefsBelowThreshold( + output: Output, + registry: CacheRegistry | undefined, + policy: CachePolicy, + outputSchema: DataPortSchema, + referenceThresholdBytes: number + ): Promise { + if (referenceThresholdBytes === 0) return output; + if (output === null || typeof output !== "object") return output; + const cache = this.repoFor(registry, policy); + if (!cache || typeof cache.getOutputByRef !== "function") return output; + + const binaryPorts = getStreamingPorts(outputSchema) + .filter((p) => p.mode === "binary") + .map((p) => p.port); + if (binaryPorts.length === 0) return output; + + const source = output as Record; + let out: Record | undefined; + const rehydrations = await Promise.all( + binaryPorts.map(async (port) => { + const value = source[port]; + if (!isCacheRef(value)) return undefined; + const size = value.size; + if (size === undefined || size >= referenceThresholdBytes) return undefined; + const blob = await cache.getOutputByRef!(value); + if (blob === undefined) return undefined; + const format = getBinaryPortFormat(outputSchema, port); + const inlined = format === "binary" ? await blob.arrayBuffer() : blob; + return { port, inlined }; + }) + ); + for (const r of rehydrations) { + if (!r) continue; + out ??= { ...source }; + out[r.port] = r.inlined; + } + return (out ?? source) as Output; + } + // ======================================================================== // Private static helpers (lifted from current module-private functions in // TaskRunner.ts) diff --git a/packages/task-graph/src/task/ITask.ts b/packages/task-graph/src/task/ITask.ts index a58199825..5c5da57ae 100644 --- a/packages/task-graph/src/task/ITask.ts +++ b/packages/task-graph/src/task/ITask.ts @@ -114,6 +114,22 @@ export interface IRunConfig { */ shouldAccumulate?: boolean; + /** + * Threshold (in bytes) at which a binary output port's value is replaced by + * a {@link CacheRef} in `Output` instead of being inlined. Below this size, + * the runner inlines the bytes; at or above, it emits a reference and the + * bytes live only in the cache backing. + * + * `0` forces a reference for every binary port regardless of size. Negative + * values and `undefined` fall back to + * {@link REFERENCE_THRESHOLD_BYTES_DEFAULT} (64 KB). + * + * Only applied when the cache backing implements `saveOutputStream` and the + * port carries binary stream events; otherwise the value is always inlined + * regardless of this setting. + */ + referenceThresholdBytes?: number; + /** * Optional callback invoked whenever a task's progress changes during execution. * @param task - The task whose progress changed. diff --git a/packages/task-graph/src/task/StreamProcessor.ts b/packages/task-graph/src/task/StreamProcessor.ts index 386fd0553..25a1798c5 100644 --- a/packages/task-graph/src/task/StreamProcessor.ts +++ b/packages/task-graph/src/task/StreamProcessor.ts @@ -5,13 +5,30 @@ */ import type { ResourceScope, ServiceRegistry } from "@workglow/util"; +import type { CacheRef } from "../cache/CacheRef"; import type { Taskish } from "../task-graph/Conversions"; import type { ITask } from "./ITask"; import type { StreamEvent, StreamMode } from "./StreamTypes"; -import { getOutputStreamMode, getStreamingPorts } from "./StreamTypes"; +import { + getBinaryPortFormat, + getOutputStreamMode, + getStreamingPorts, + materializeBinary, +} from "./StreamTypes"; import { TaskAbortedError, TaskError } from "./TaskError"; import type { TaskRunContext } from "./TaskRunContext"; import type { TaskInput, TaskOutput } from "./TaskTypes"; + +/** + * Consumer for a port's binary-delta stream. The processor exposes chunks as + * an async iterable; the sink returns the {@link CacheRef} the processor + * places into `Output` at the port slot (Spec 2). + * + * Implementations are typically thin wrappers around + * `TaskOutputRepository.saveOutputStream` — the runner supplies the wrapper + * once it knows the cache key. + */ +export type BinaryRefSink = (chunks: AsyncIterable) => Promise; import { TaskStatus } from "./TaskTypes"; /** @@ -31,6 +48,18 @@ export interface StreamProcessorDeps { ...args: any[] ) => Promise; readonly own: >(i: T) => T; + /** + * Per-port binary-stream sinks. When a port has a sink registered, the + * processor routes that port's `binary-delta` chunks to the sink (as an + * async iterable) **instead** of accumulating them into a `Blob` / + * `ArrayBuffer` in memory. At finish, the sink's returned {@link CacheRef} + * replaces the port's slot in the output object — unless an explicit + * binary finish payload is present for that port, which always wins + * (the artifact-precedence rule from Spec 1). + * + * Ports without a sink follow the normal accumulation path. + */ + readonly binaryRefSinks?: ReadonlyMap; } /** @@ -70,6 +99,27 @@ export class StreamProcessor const accumulatedObjects = ctx.shouldAccumulate ? new Map | unknown[]>() : undefined; + const accumulatedBinary = ctx.shouldAccumulate + ? new Map() + : undefined; + // Per-port routers: lazily created on the first binary-delta whose port has + // a sink in `deps.binaryRefSinks`. Routes chunks to the sink instead of + // accumulating in memory; at finish, awaits the sink's returned CacheRef + // and writes it into the output at the port slot. + const sinks = deps.binaryRefSinks; + const routers = new Map(); + const ensureRouter = (port: string): BinaryStreamRouter | undefined => { + if (!sinks) return undefined; + const sink = sinks.get(port); + if (!sink) return undefined; + let r = routers.get(port); + if (!r) { + r = new BinaryStreamRouter(sink); + routers.set(port, r); + } + return r; + }; + let streamingStarted = false; let finalOutput: Output | undefined; @@ -84,6 +134,7 @@ export class StreamProcessor inputStreams: deps.inputStreams, }); + try { for await (const event of stream) { // For snapshot events, update runOutputData BEFORE emitting stream_chunk // so listeners see the latest snapshot when they handle the event. @@ -149,6 +200,28 @@ export class StreamProcessor this.task.emit("stream_chunk", event as StreamEvent); break; } + case "binary-delta": { + if (!streamingStarted) { + streamingStarted = true; + this.task.status = TaskStatus.STREAMING; + this.task.emit("status", this.task.status); + } + // Spec 2 Phase E: tee. When both a router AND an accumulator exist + // for this port (graph context where the cache can stream but a + // downstream edge needs the materialized value), push to BOTH — + // router writes to the cache for the small ref-bearing Output, + // accumulator drives the enriched finish event so edge consumers + // still receive a Blob/ArrayBuffer. + const router = ensureRouter(event.port); + if (router) router.push(event.binaryDelta); + if (accumulatedBinary) { + const arr = accumulatedBinary.get(event.port) ?? []; + arr.push(event.binaryDelta); + accumulatedBinary.set(event.port, arr); + } + this.task.emit("stream_chunk", event as StreamEvent); + break; + } case "snapshot": { if (!streamingStarted) { streamingStarted = true; @@ -159,11 +232,17 @@ export class StreamProcessor break; } case "finish": { - if (accumulated || accumulatedObjects) { + const hasEnrichment = + accumulated !== undefined || + accumulatedObjects !== undefined || + accumulatedBinary !== undefined || + routers.size > 0; + if (hasEnrichment) { // Emit an enriched finish event: merge accumulated deltas into // the finish payload so downstream dataflows get complete port data // without needing to re-accumulate themselves. - const merged: Record = { ...(event.data || {}) }; + const explicitPayload = (event.data || {}) as Record; + const merged: Record = { ...explicitPayload }; if (accumulated) { for (const [port, text] of accumulated) { if (text.length > 0) merged[port] = text; @@ -174,13 +253,42 @@ export class StreamProcessor merged[port] = obj; } } + if (accumulatedBinary) { + const outSchema = this.task.outputSchema(); + for (const [port, chunks] of accumulatedBinary) { + // Explicit binary finish payload wins. (Unlike text/object + // deltas above, which overwrite event.data, binary yields to + // an explicit payload — it's a whole artifact, not a partial.) + if (port in explicitPayload) continue; + const format = getBinaryPortFormat(outSchema, port); + merged[port] = materializeBinary(chunks, format); + } + } + // Close routers and collect refs. Explicit binary finish payload + // still wins for the OUTPUT slot (artifact precedence); the + // router's CacheRef is discarded in that case but the cache + // write already happened. + for (const router of routers.values()) router.end(); + const refs = new Map(); + for (const [port, router] of routers) { + if (port in explicitPayload) { + // Drain the promise so the sink doesn't leak; ignore the ref. + router.ref().catch(() => {}); + continue; + } + refs.set(port, await router.ref()); + } // For replace-mode streams, finish carries data: {} by convention. // Fall back to the last snapshot (runOutputData) so the final output - // is not silently cleared when the finish payload is empty. + // is not silently cleared when the finish payload is empty — + // overlaying router refs on top so cache-written bytes are not + // orphaned (the ref still lands in the OUTPUT slot). if (streamMode === "replace" && Object.keys(merged).length === 0) { const lastSnapshot = this.task.runOutputData; if (lastSnapshot && Object.keys(lastSnapshot).length > 0) { - finalOutput = lastSnapshot as Output; + const snapshotWithRefs: Record = { ...lastSnapshot }; + for (const [port, ref] of refs) snapshotWithRefs[port] = ref; + finalOutput = snapshotWithRefs as Output; this.task.emit("stream_chunk", { type: "finish", data: lastSnapshot, @@ -188,8 +296,20 @@ export class StreamProcessor break; } } - finalOutput = merged as unknown as Output; + // The emitted finish event always carries the materialized payload + // (from accumulators) so edge consumers see Blob/ArrayBuffer. + // finalOutput diverges only when a router produced a ref for a + // port that wasn't already pinned by an explicit payload — that + // ref takes the slot in the return value so the queue/cache row + // stays small (Spec 2 Phase E: tee). this.task.emit("stream_chunk", { type: "finish", data: merged } as StreamEvent); + if (refs.size === 0) { + finalOutput = merged as unknown as Output; + } else { + const finalMerged: Record = { ...merged }; + for (const [port, ref] of refs) finalMerged[port] = ref; + finalOutput = finalMerged as unknown as Output; + } } else { // No accumulation. For replace-mode streams the provider's finish // event carries `data: {}` by convention — the snapshots already @@ -219,6 +339,19 @@ export class StreamProcessor } } } + } catch (err) { + // Surface the error to any in-flight router sinks so they reject + // (rather than waiting forever on the producer). The original error is + // rethrown unchanged. + const failure = err instanceof Error ? err : new Error(String(err)); + for (const router of routers.values()) router.fail(failure); + throw err; + } finally { + // Defensive: if the loop exited without seeing a `finish` event + // (e.g. abort, generator return without yield), close routers so their + // sinks see end-of-stream rather than blocking on the next chunk. + for (const router of routers.values()) router.end(); + } // Check if the task was aborted during streaming if (ctx.abortController.signal.aborted) { @@ -234,3 +367,82 @@ export class StreamProcessor return this.task.runOutputData as Output; } } + +/** + * Producer-consumer router used by {@link StreamProcessor} to forward a single + * binary output port's `binary-delta` chunks to a {@link BinaryRefSink}. The + * sink consumes the chunks via the async iterable and returns a + * {@link CacheRef} that the processor places into `Output` at finish. + * + * Lifecycle: chunks pushed via `push()` are yielded to the sink in order. + * `end()` signals end-of-stream (sink completes consumption, refPromise + * resolves). `fail(err)` causes the iterable to throw on the next read + * (refPromise rejects). `end()` and `fail()` are idempotent. + * + * Backpressure: there is none — the producer (`executeStream`) writes into + * `buffer` synchronously; if the sink consumes more slowly than the producer + * emits chunks, the buffer grows unbounded. This is acceptable for cache + * backings whose write throughput meets or exceeds the upstream source + * (the common case: cache is a local SSD/memory FS; source is bounded by + * network or compute). For genuinely slow backings (remote object stores, + * throttled FS), wrap the sink in a chunked-uploader that applies its own + * pacing — there is no signal we can send back into `binary-delta`. + */ +class BinaryStreamRouter { + private readonly buffer: Uint8Array[] = []; + private finished = false; + private failure: Error | undefined; + private notify: (() => void) | undefined; + private readonly refPromise: Promise; + + constructor(sink: BinaryRefSink) { + this.refPromise = sink(this.iterable()); + // Observe rejection so an unawaited refPromise (e.g. after fail() in an + // error path) doesn't surface as an unhandled rejection. Subsequent + // `await this.refPromise` still rejects. + this.refPromise.catch(() => {}); + } + + push(chunk: Uint8Array): void { + if (this.finished) return; + this.buffer.push(chunk); + this.wake(); + } + + end(): void { + if (this.finished) return; + this.finished = true; + this.wake(); + } + + fail(err: Error): void { + if (this.finished) return; + this.failure = err; + this.finished = true; + this.wake(); + } + + ref(): Promise { + return this.refPromise; + } + + private wake(): void { + const n = this.notify; + this.notify = undefined; + n?.(); + } + + private async *iterable(): AsyncIterable { + while (true) { + while (this.buffer.length > 0) { + yield this.buffer.shift()!; + } + if (this.failure) throw this.failure; + if (this.finished) return; + await new Promise((res) => { + this.notify = res; + }); + } + } +} + diff --git a/packages/task-graph/src/task/StreamTypes.ts b/packages/task-graph/src/task/StreamTypes.ts index 3f79f20d3..2220a370e 100644 --- a/packages/task-graph/src/task/StreamTypes.ts +++ b/packages/task-graph/src/task/StreamTypes.ts @@ -12,12 +12,13 @@ import type { DataPortSchema, JsonSchema } from "@workglow/util/schema"; * - `append`: Each chunk is a delta (e.g., a new token). * - `replace`: Each chunk is a corrected/revised snapshot of the complete output so far. * - `object`: Each chunk is a progressively more complete partial object snapshot. + * - `binary`: Each chunk is an ordered byte slice; consumer concatenates into a Blob/ArrayBuffer. * - `mixed`: Multiple ports use different stream modes (e.g., append + object). * * Declared per-port via the `x-stream` schema extension property. * Absent `x-stream` = `"none"`. */ -export type StreamMode = "none" | "append" | "replace" | "object" | "mixed"; +export type StreamMode = "none" | "append" | "replace" | "object" | "binary" | "mixed"; /** * Append mode: delta chunk (consumer accumulates). @@ -45,6 +46,18 @@ export type StreamObjectDelta = { objectDelta: Record | unknown[]; }; +/** + * Binary mode: an ordered, append-only chunk of bytes (consumer concatenates). + * `port` identifies which output port this delta belongs to. Chunks are + * materialized on `finish` into a `Blob` or `ArrayBuffer` per the port's + * schema `format` (see `materializeBinary`). + */ +export type StreamBinaryDelta = { + type: "binary-delta"; + port: string; + binaryDelta: Uint8Array; +}; + /** * Replace mode: full snapshot chunk (replaces previous state). */ @@ -104,6 +117,7 @@ export type StreamPhase = { export type StreamEvent> = | StreamTextDelta | StreamObjectDelta + | StreamBinaryDelta | StreamSnapshot | StreamFinish | StreamError @@ -126,7 +140,8 @@ export function getPortStreamMode(schema: DataPortSchema | JsonSchema, portId: s const prop = (schema.properties as Record)?.[portId]; if (!prop || typeof prop === "boolean") return "none"; const xStream = prop["x-stream"]; - if (xStream === "append" || xStream === "replace" || xStream === "object") return xStream; + if (xStream === "append" || xStream === "replace" || xStream === "object" || xStream === "binary") + return xStream; return "none"; } @@ -147,7 +162,12 @@ export function getStreamingPorts( for (const [name, prop] of Object.entries(props)) { if (!prop || typeof prop === "boolean") continue; const xStream = (prop as any)["x-stream"]; - if (xStream === "append" || xStream === "replace" || xStream === "object") { + if ( + xStream === "append" || + xStream === "replace" || + xStream === "object" || + xStream === "binary" + ) { result.push({ port: name, mode: xStream }); } } @@ -251,6 +271,69 @@ export function getObjectPortId(schema: DataPortSchema): string | undefined { return undefined; } +/** + * Returns the port ID (property name) of the first output port that declares + * `x-stream: "binary"`, or `undefined` if no such port exists. + * + * @param schema - The task's output DataPortSchema + * @returns The port name with binary streaming, or undefined + */ +export function getBinaryPortId(schema: DataPortSchema): string | undefined { + if (typeof schema === "boolean") return undefined; + const props = schema.properties; + if (!props) return undefined; + + for (const [name, prop] of Object.entries(props)) { + if (!prop || typeof prop === "boolean") continue; + if ((prop as any)["x-stream"] === "binary") return name; + } + return undefined; +} + +/** + * Reads the `format` annotation of a single output port from the task's output + * schema. Used to decide whether accumulated binary chunks materialize into a + * `Blob` (`format: "blob"` or absent) or an `ArrayBuffer` (`format: "binary"`). + */ +export function getBinaryPortFormat(schema: DataPortSchema, port: string): string | undefined { + if (typeof schema === "boolean") return undefined; + const prop = (schema.properties as Record)?.[port]; + if (!prop || typeof prop === "boolean") return undefined; + return prop.format as string | undefined; +} + +/** + * Materializes ordered binary chunks into the value type declared by the + * output port's schema `format`: + * - `"binary"` → `ArrayBuffer` + * - `"blob"` (or absent) → `Blob` (the default) + * + * Chunks are concatenated in arrival order. Callers MUST pass chunks in the + * order they were emitted. + * + * @param chunks - Ordered binary chunks to concatenate + * @param format - The output port's schema `format` (e.g. `"binary"` or `"blob"`) + * @returns The materialized `Blob` or `ArrayBuffer` + */ +export function materializeBinary( + chunks: readonly Uint8Array[], + format: string | undefined +): Blob | ArrayBuffer { + if (format === "blob" || format === undefined) { + return new Blob(chunks as unknown as BlobPart[]); + } + // format === "binary" (and any other non-blob value) → ArrayBuffer + let total = 0; + for (const c of chunks) total += c.byteLength; + const merged = new Uint8Array(total); + let offset = 0; + for (const c of chunks) { + merged.set(c, offset); + offset += c.byteLength; + } + return merged.buffer; +} + /** * Returns a map of port names to their JSON Schemas for every output port * that declares `"x-structured-output": true`. diff --git a/packages/task-graph/src/task/TaskRunner.ts b/packages/task-graph/src/task/TaskRunner.ts index 4847622d7..fbf2345e2 100644 --- a/packages/task-graph/src/task/TaskRunner.ts +++ b/packages/task-graph/src/task/TaskRunner.ts @@ -13,7 +13,12 @@ import { SpanStatusCode, } from "@workglow/util"; import type { CacheRegistry } from "../cache"; -import { CACHE_REGISTRY, DefaultCacheRegistry, RunPrivateCacheRepo } from "../cache"; +import { + CACHE_REGISTRY, + DefaultCacheRegistry, + resolveReferenceThreshold, + RunPrivateCacheRepo, +} from "../cache"; import { TASK_OUTPUT_REPOSITORY, TaskOutputRepository } from "../storage/TaskOutputRepository"; import type { Taskish } from "../task-graph/Conversions"; import { ensureTask } from "../task-graph/Conversions"; @@ -270,6 +275,23 @@ export class TaskRunner< ); if (outputs === undefined) { + // Spec 2: build per-port binary-stream sinks when the cache supports + // streaming and the schema has a binary port. The sinks always run + // (memory-bounded write to cache); the runtime threshold controls + // whether the resulting CacheRef SURVIVES in Output or gets + // rehydrated to an inline Blob/ArrayBuffer below. + const referenceThresholdBytes = resolveReferenceThreshold( + config.referenceThresholdBytes ?? this.task.runConfig.referenceThresholdBytes + ); + const binaryRefSinks = isStreamable + ? this.cacheCoordinator.getBinaryRefSinksByPolicy( + keyInputs, + this.cacheRegistry, + policy, + this.task.outputSchema() + ) + : undefined; + outputs = isStreamable ? await this.streamProcessor.run(inputs, ctx, { registry: this.registry, @@ -277,9 +299,24 @@ export class TaskRunner< inputStreams: this.inputStreams, onProgress: this.handleProgress.bind(this), own: this.own, + binaryRefSinks, }) : await this.executeTask(inputs, ctx); + // Phase D.4: rehydrate refs whose committed size is below the + // configured threshold so callers see inline bytes for small outputs + // (threshold default = 64 KiB). Refs at/above threshold survive. + // threshold = 0 forces every ref to survive regardless of size. + if (outputs !== undefined && binaryRefSinks !== undefined) { + outputs = await this.cacheCoordinator.hydrateRefsBelowThreshold( + outputs as Output, + this.cacheRegistry, + policy, + this.task.outputSchema(), + referenceThresholdBytes + ); + } + await this.cacheCoordinator.saveByPolicy( keyInputs, outputs as Output, diff --git a/packages/test/src/test/task-graph/CacheRef.test.ts b/packages/test/src/test/task-graph/CacheRef.test.ts new file mode 100644 index 000000000..b736d7b51 --- /dev/null +++ b/packages/test/src/test/task-graph/CacheRef.test.ts @@ -0,0 +1,86 @@ +/** + * @license + * Copyright 2026 Steven Roussey + * SPDX-License-Identifier: Apache-2.0 + */ + +import { describe, expect, it } from "vitest"; +import { + isCacheRef, + REFERENCE_THRESHOLD_BYTES_DEFAULT, + resolveReferenceThreshold, +} from "@workglow/task-graph"; +import type { CacheRef, IRunConfig } from "@workglow/task-graph"; + +describe("isCacheRef", () => { + it("accepts a minimal ref carrying only $ref", () => { + const ref: CacheRef = { $ref: "cache://k1" }; + expect(isCacheRef(ref)).toBe(true); + }); + + it("accepts a ref carrying size and mime hints", () => { + const ref: CacheRef = { $ref: "cache://k2", size: 1024, mime: "audio/wav" }; + expect(isCacheRef(ref)).toBe(true); + }); + + it("rejects values without a string $ref", () => { + expect(isCacheRef({})).toBe(false); + expect(isCacheRef({ ref: "cache://k" })).toBe(false); + expect(isCacheRef({ $ref: 42 })).toBe(false); + expect(isCacheRef({ $ref: null })).toBe(false); + }); + + it("rejects primitives and null", () => { + expect(isCacheRef(null)).toBe(false); + expect(isCacheRef(undefined)).toBe(false); + expect(isCacheRef("cache://k")).toBe(false); + expect(isCacheRef(42)).toBe(false); + expect(isCacheRef(true)).toBe(false); + }); + + it("accepts a ref where $ref is the empty string (still string-typed)", () => { + expect(isCacheRef({ $ref: "" })).toBe(true); + }); + + it("does not confuse JSON-Schema $ref strings with cache refs by shape", () => { + // JSON Schema $ref also uses { $ref: string }. Shape is identical at this + // layer; discrimination by call site / port-context is the contract, not + // shape inspection. This test documents the limitation. + const jsonSchemaRef = { $ref: "#/definitions/Foo" }; + expect(isCacheRef(jsonSchemaRef)).toBe(true); + }); +}); + +describe("resolveReferenceThreshold", () => { + it("returns the default constant when threshold is undefined", () => { + expect(resolveReferenceThreshold(undefined)).toBe(REFERENCE_THRESHOLD_BYTES_DEFAULT); + }); + + it("returns the configured threshold when set to a positive number", () => { + expect(resolveReferenceThreshold(1024)).toBe(1024); + expect(resolveReferenceThreshold(1_000_000)).toBe(1_000_000); + }); + + it("returns 0 when set to 0 (sentinel: always emit a reference)", () => { + expect(resolveReferenceThreshold(0)).toBe(0); + }); + + it("falls back to the default when given a negative value", () => { + expect(resolveReferenceThreshold(-1)).toBe(REFERENCE_THRESHOLD_BYTES_DEFAULT); + }); + + it("the default is 64 KiB", () => { + expect(REFERENCE_THRESHOLD_BYTES_DEFAULT).toBe(65_536); + }); + + it("IRunConfig accepts referenceThresholdBytes as a number", () => { + const cfg: IRunConfig = { referenceThresholdBytes: 0 }; + expect(resolveReferenceThreshold(cfg.referenceThresholdBytes)).toBe(0); + const cfg2: IRunConfig = { referenceThresholdBytes: 2048 }; + expect(resolveReferenceThreshold(cfg2.referenceThresholdBytes)).toBe(2048); + const cfg3: IRunConfig = {}; + expect(resolveReferenceThreshold(cfg3.referenceThresholdBytes)).toBe( + REFERENCE_THRESHOLD_BYTES_DEFAULT + ); + }); +}); diff --git a/packages/test/src/test/task-graph/Spec2QueueRowAndRehydrate.test.ts b/packages/test/src/test/task-graph/Spec2QueueRowAndRehydrate.test.ts new file mode 100644 index 000000000..f453b30ec --- /dev/null +++ b/packages/test/src/test/task-graph/Spec2QueueRowAndRehydrate.test.ts @@ -0,0 +1,241 @@ +/** + * @license + * Copyright 2026 Steven Roussey + * SPDX-License-Identifier: Apache-2.0 + */ + +import type { CacheRef, StreamEvent } from "@workglow/task-graph"; +import { + CACHE_REGISTRY, + DefaultCacheRegistry, + IExecuteContext, + isCacheRef, + resolveJobOutput, + Task, + TaskOutputRepository, + TaskRegistry, +} from "@workglow/task-graph"; +import type { JobHandleLike, TaskInput, TaskOutput } from "@workglow/task-graph"; +import { Container, ServiceRegistry, sleep } from "@workglow/util"; +import { DataPortSchema } from "@workglow/util/schema"; +import { beforeAll, beforeEach, describe, expect, it } from "vitest"; + +type BinOut = { bytes: Blob }; + +/** + * Streaming memory cache that exposes both `saveOutputStream` (Spec 2 path, + * returns CacheRef + stores bytes in a side map) and `getOutputByRef` so the + * cross-process resolution test below can hydrate refs without touching the + * main `saveOutput` row. + */ +class StreamingMemoryRepo extends TaskOutputRepository { + public readonly saved = new Map(); + public readonly streamed = new Map(); + override async saveOutput(t: string, i: TaskInput, o: TaskOutput): Promise { + this.saved.set(t + JSON.stringify(i), o); + } + override async getOutput(t: string, i: TaskInput): Promise { + return this.saved.get(t + JSON.stringify(i)); + } + override async clear(): Promise { + this.saved.clear(); + this.streamed.clear(); + } + override async size(): Promise { + return this.saved.size; + } + override async clearOlderThan(): Promise {} + override isDurable(): boolean { + return false; + } + override async saveOutputStream( + taskType: string, + inputs: TaskInput, + chunks: AsyncIterable, + _metadata: Record + ): Promise { + const parts: Uint8Array[] = []; + let size = 0; + for await (const c of chunks) { + parts.push(c); + size += c.byteLength; + } + const merged = new Uint8Array(size); + let off = 0; + for (const p of parts) { + merged.set(p, off); + off += p.byteLength; + } + const key = `inmem://${taskType}::${JSON.stringify(inputs)}`; + this.streamed.set(key, merged); + return { $ref: key, size, mime: "application/octet-stream" }; + } + override async getOutputByRef(ref: CacheRef): Promise { + const bytes = this.streamed.get(ref.$ref); + return bytes === undefined ? undefined : new Blob([bytes as unknown as BlobPart]); + } +} + +class NonStreamingMemoryRepo extends TaskOutputRepository { + public readonly saved = new Map(); + override async saveOutput(t: string, i: TaskInput, o: TaskOutput): Promise { + this.saved.set(t + JSON.stringify(i), o); + } + override async getOutput(t: string, i: TaskInput): Promise { + return this.saved.get(t + JSON.stringify(i)); + } + override async clear(): Promise { + this.saved.clear(); + } + override async size(): Promise { + return this.saved.size; + } + override async clearOlderThan(): Promise {} + override isDurable(): boolean { + return false; + } +} + +const CHUNK = 4 * 1024; // 4 KiB +const CHUNKS = 16; // 64 KiB total — large enough that inline-vs-ref is dramatic + +class BigBlobStreamTask extends Task, BinOut> { + public static override type = "Spec2QueueRowTest_BigBlobStream"; + public static override category = "Test"; + public static override cacheable = true; + + public static override outputSchema(): DataPortSchema { + return { + type: "object", + properties: { bytes: { type: "object", format: "blob", "x-stream": "binary" } }, + additionalProperties: false, + } as const satisfies DataPortSchema; + } + + async *executeStream( + _input: Record, + _ctx: IExecuteContext + ): AsyncIterable> { + for (let i = 0; i < CHUNKS; i++) { + const chunk = new Uint8Array(CHUNK).fill(i & 0xff); + yield { type: "binary-delta", port: "bytes", binaryDelta: chunk }; + if (i % 4 === 3) await sleep(0); + } + yield { type: "finish", data: {} as BinOut }; + } +} + +beforeAll(() => { + TaskRegistry.registerTask(BigBlobStreamTask as any); +}); + +let services: ServiceRegistry; +let repo: StreamingMemoryRepo; +beforeEach(() => { + repo = new StreamingMemoryRepo({}); + services = new ServiceRegistry(new Container()); + services.registerInstance(CACHE_REGISTRY, new DefaultCacheRegistry({ deterministic: repo })); +}); + +/** + * The principal user value of Spec 2: the SAVED ROW in the cache (the same + * value job-queue would carry through `JobStorageFormat.output`) stays small + * regardless of payload size when the ref path is taken. These tests measure + * the wire size by JSON-serializing the saved output the way a real storage + * backend (Postgres/SQLite) would. + */ +describe("Spec 2 — saved-row size & cross-process rehydration", () => { + it("force-ref keeps the saved row tiny (CacheRef envelope only); bytes live in the streaming cache", async () => { + const task = new BigBlobStreamTask(); + const output = await task.run({}, { registry: services, referenceThresholdBytes: 0 }); + + // Wire shape of the cached small row. + expect(repo.saved.size).toBe(1); + const [savedOutput] = Array.from(repo.saved.values()) as Array>; + expect(isCacheRef(savedOutput.bytes)).toBe(true); + + const savedJson = JSON.stringify(savedOutput); + // CacheRef envelope is well under 1 KiB regardless of payload size. + expect(savedJson.length).toBeLessThan(1024); + + // Bytes are present in the streaming side of the cache (full size). + const ref = savedOutput.bytes as CacheRef; + expect(ref.size).toBe(CHUNKS * CHUNK); + const hydrated = await repo.getOutputByRef(ref); + expect(hydrated).toBeInstanceOf(Blob); + expect(hydrated!.size).toBe(CHUNKS * CHUNK); + + // And Output's port slot is a ref (not a Blob). + expect(isCacheRef(output.bytes)).toBe(true); + }); + + it("contrast: a non-streaming cache embeds the full Blob in the saved row (the old bloat path)", async () => { + // Same task, but the cache cannot stream — the runner falls through to + // accumulation and the saved row contains the serialized payload. + const nonStreamRepo = new NonStreamingMemoryRepo({}); + const altServices = new ServiceRegistry(new Container()); + altServices.registerInstance( + CACHE_REGISTRY, + new DefaultCacheRegistry({ deterministic: nonStreamRepo }) + ); + + const task = new BigBlobStreamTask(); + await task.run({}, { registry: altServices, referenceThresholdBytes: 0 }); + + expect(nonStreamRepo.saved.size).toBe(1); + const [savedOutput] = Array.from(nonStreamRepo.saved.values()) as Array< + Record + >; + expect(isCacheRef(savedOutput.bytes)).toBe(false); + expect(savedOutput.bytes).toBeInstanceOf(Blob); + // The Blob itself isn't JSON-encoded inline by default, but the + // observable point is: this row carries the artifact, not a reference. + expect((savedOutput.bytes as Blob).size).toBe(CHUNKS * CHUNK); + }); + + it("cross-process simulation: serialize the small row, deserialize elsewhere, resolveJobOutput against shared cache", async () => { + const task = new BigBlobStreamTask(); + await task.run({}, { registry: services, referenceThresholdBytes: 0 }); + const [savedOutput] = Array.from(repo.saved.values()) as Array>; + + // "Process A" → wire: serialize the small row to a string (what Postgres + // would store in JSONB / SQLite would store in TEXT for the JobStorageFormat + // output column). + const wire = JSON.stringify(savedOutput); + expect(wire.length).toBeLessThan(1024); + + // "Process B" pulls the small row off the queue and reconstructs Output. + // The CacheRef survives JSON round-trip unchanged (just data). + const received = JSON.parse(wire) as { bytes: CacheRef }; + expect(isCacheRef(received.bytes)).toBe(true); + + // Process B resolves the ref against the SHARED cache (in real + // deployments: S3, networked FS, shared Postgres) — here `repo` is the + // shared backing for the test. resolveJobOutput is the queue-boundary + // bridge that callers wrap their JobHandle in. + const handle: JobHandleLike<{ bytes: Blob }> = { + waitFor: async () => received as unknown as { bytes: Blob }, + }; + const resolved = await resolveJobOutput(handle, repo); + + expect(resolved.bytes).toBeInstanceOf(Blob); + expect((resolved.bytes as Blob).size).toBe(CHUNKS * CHUNK); + }); + + it("dangling refs (cache cleared between save and read) resolve to undefined — best-effort contract", async () => { + const task = new BigBlobStreamTask(); + await task.run({}, { registry: services, referenceThresholdBytes: 0 }); + const [savedOutput] = Array.from(repo.saved.values()) as Array>; + + // Cache TTL expired / explicit clear / different deployment with no + // backing access — the ref now points nowhere. + await repo.clear(); + + const handle: JobHandleLike<{ bytes: Blob }> = { + waitFor: async () => savedOutput as unknown as { bytes: Blob }, + }; + const resolved = await resolveJobOutput(handle, repo); + // Per Spec 2 §2: best-effort, returns undefined on cache miss. + expect(resolved.bytes).toBeUndefined(); + }); +}); diff --git a/packages/test/src/test/task-graph/StreamBinaryProcessor.test.ts b/packages/test/src/test/task-graph/StreamBinaryProcessor.test.ts new file mode 100644 index 000000000..d97534e46 --- /dev/null +++ b/packages/test/src/test/task-graph/StreamBinaryProcessor.test.ts @@ -0,0 +1,95 @@ +/** + * @license + * Copyright 2026 Steven Roussey + * SPDX-License-Identifier: Apache-2.0 + */ + +import type { StreamEvent } from "@workglow/task-graph"; +import { IExecuteContext, Task, TaskRegistry } from "@workglow/task-graph"; +import { sleep } from "@workglow/util"; +import { DataPortSchema } from "@workglow/util/schema"; +import { beforeAll, describe, expect, it } from "vitest"; + +type BinOut = { bytes: Blob | ArrayBuffer }; + +/** + * A streaming source task (binary mode) that yields two byte chunks and an + * empty finish, mirroring how real binary producers emit `binary-delta` events. + */ +class BlobStreamTask extends Task, BinOut> { + public static override type = "BlobStreamTask"; + public static override category = "Test"; + public static override cacheable = false; + + public static override outputSchema(): DataPortSchema { + return { + type: "object", + properties: { bytes: { type: "object", format: "blob", "x-stream": "binary" } }, + additionalProperties: false, + } as const satisfies DataPortSchema; + } + + async *executeStream( + _input: Record, + _ctx: IExecuteContext + ): AsyncIterable> { + yield { type: "binary-delta", port: "bytes", binaryDelta: new Uint8Array([1, 2]) }; + await sleep(2); + yield { type: "binary-delta", port: "bytes", binaryDelta: new Uint8Array([3, 4]) }; + yield { type: "finish", data: {} as BinOut }; + } +} + +class ArrayBufferStreamTask extends BlobStreamTask { + public static override type = "ArrayBufferStreamTask"; + + public static override outputSchema(): DataPortSchema { + return { + type: "object", + properties: { bytes: { type: "object", format: "binary", "x-stream": "binary" } }, + additionalProperties: false, + } as const satisfies DataPortSchema; + } +} + +class BinaryFinishOverrideTask extends BlobStreamTask { + public static override type = "BinaryFinishOverrideTask"; + + override async *executeStream( + _input: Record, + _ctx: IExecuteContext + ): AsyncIterable> { + yield { type: "binary-delta", port: "bytes", binaryDelta: new Uint8Array([9, 9]) }; + // Explicit finish payload at the binary port must win over accumulation. + yield { type: "finish", data: { bytes: new Blob([new Uint8Array([7])]) } as BinOut }; + } +} + +describe("StreamProcessor binary accumulation", () => { + beforeAll(() => { + TaskRegistry.registerTask(BlobStreamTask); + TaskRegistry.registerTask(ArrayBufferStreamTask); + TaskRegistry.registerTask(BinaryFinishOverrideTask); + }); + + it("accumulates binary deltas into a Blob (format: blob)", async () => { + const task = new BlobStreamTask({}); + const out = (await task.run()) as BinOut; + expect(out.bytes).toBeInstanceOf(Blob); + const buf = await (out.bytes as Blob).arrayBuffer(); + expect(Array.from(new Uint8Array(buf))).toEqual([1, 2, 3, 4]); + }); + + it("accumulates binary deltas into an ArrayBuffer (format: binary)", async () => { + const task = new ArrayBufferStreamTask({}); + const out = (await task.run()) as BinOut; + expect(out.bytes).toBeInstanceOf(ArrayBuffer); + expect(Array.from(new Uint8Array(out.bytes as ArrayBuffer))).toEqual([1, 2, 3, 4]); + }); + + it("uses explicit finish payload at the binary port verbatim", async () => { + const out = (await new BinaryFinishOverrideTask({}).run()) as BinOut; + const buf = await (out.bytes as Blob).arrayBuffer(); + expect(Array.from(new Uint8Array(buf))).toEqual([7]); // not [9,9] + }); +}); diff --git a/packages/test/src/test/task-graph/StreamBinaryPump.test.ts b/packages/test/src/test/task-graph/StreamBinaryPump.test.ts new file mode 100644 index 000000000..2968139b7 --- /dev/null +++ b/packages/test/src/test/task-graph/StreamBinaryPump.test.ts @@ -0,0 +1,625 @@ +/** + * @license + * Copyright 2026 Steven Roussey + * SPDX-License-Identifier: Apache-2.0 + */ + +/** + * StreamPump binary-stream behavior. + * + * C1 (regression guard): a binary source feeding a NON-binary consumer must + * MATERIALIZE across the edge — `edgeNeedsAccumulation(binary → non-stream)` is + * `true`, so the pump accumulates and the sink receives a finished `Blob`. + * + * C2 (cache-streaming decision + assembly): + * - The DECISION (`StreamPump.canStreamBinaryToCache`) is asserted directly, in + * isolation from a live run: `true` for a streaming-capable cache + binary-only + * leaf with no value-needing consumer; `false` for a buffered cache, for a + * downstream edge that needs the materialized value, and (defensively) for a + * cache that cannot report `supportsStreaming()`. + * - The byte-stream assembly (`binary-delta` events → `AsyncIterable` + * → `saveOutputStream`) is unit-tested in isolation via + * `StreamPump.pipeBinaryToCache`, which asserts the cache RECEIVES the bytes. + * + * NOTE (reduced scope): the live cache pipe through `TaskRunner` (suppressing the + * buffered save and driving `saveOutputStream` with the real normalized cache + * key) is deferred to Spec 2 — `TaskRunner.run()` owns `keyInputs`/policy slot + * resolution and StreamPump calls it as a black box. See the NOTE in + * `StreamPump.runStreamingTask`. + * + * IMPORTANT: in the current reduced scope nothing drives `saveOutputStream` during + * a REAL graph run, so a streaming-cache run currently emits a finish with the + * binary port absent AND the cache never receives the bytes. That absence is NOT + * the desired outcome — it is a known-incomplete path. The goal we assert here is + * "decision = don't accumulate"; live delivery (the cache actually getting the + * bytes on a real run) is completed in Spec 2. We deliberately do NOT assert that + * a binary-less finish from a real streaming-cache run is "correct", because that + * would bless silent data loss. + */ + +import type { CacheRef, ITask, StreamEvent, TaskInput, TaskOutput } from "@workglow/task-graph"; +import type { DataPortSchema } from "@workglow/util/schema"; +import { + Dataflow, + getBinaryPortId, + IExecuteContext, + StreamPump, + Task, + TaskGraph, + TaskGraphRunner, + TaskOutputRepository, + TaskStatus, +} from "@workglow/task-graph"; +import { setLogger, sleep } from "@workglow/util"; +import { beforeEach, describe, expect, it } from "vitest"; +import { getTestingLogger } from "../../binding/TestingLogger"; + +setLogger(getTestingLogger()); + +// ============================================================================ +// Test tasks +// ============================================================================ + +type BinOut = { bytes: Blob | ArrayBuffer }; + +/** + * Binary streaming source: yields two `binary-delta` chunks then an empty + * `finish` (mirrors a real producer that does not re-buffer its output). + */ +class BinaryStreamSource extends Task, BinOut> { + public static override type = "StreamBinaryPump_Source"; + public static override category = "Test"; + public static override cacheable = false; + + public static override inputSchema(): DataPortSchema { + return { type: "object", properties: {}, additionalProperties: false } as const; + } + + public static override outputSchema(): DataPortSchema { + return { + type: "object", + properties: { bytes: { type: "object", format: "blob", "x-stream": "binary" } }, + additionalProperties: false, + } as const satisfies DataPortSchema; + } + + async *executeStream( + _input: Record, + context: IExecuteContext + ): AsyncIterable> { + if (context.signal.aborted) return; + yield { type: "binary-delta", port: "bytes", binaryDelta: new Uint8Array([1, 2]) }; + await sleep(2); + yield { type: "binary-delta", port: "bytes", binaryDelta: new Uint8Array([3, 4]) }; + yield { type: "finish", data: {} as BinOut }; + } + + override async execute(): Promise { + return { bytes: new Blob([new Uint8Array([1, 2, 3, 4])]) }; + } +} + +/** + * A cacheable variant — needed to exercise the cache-streaming decision (the + * cache is only consulted for cacheable tasks). + */ +class CacheableBinaryStreamSource extends BinaryStreamSource { + public static override type = "StreamBinaryPump_CacheableSource"; + public static override cacheable = true; +} + +type SinkInput = { bytes: Blob | ArrayBuffer }; +type SinkOutput = { length: number; isBlob: boolean }; + +/** + * Non-binary consumer: its `bytes` input port has NO `x-stream`, so a binary + * source feeding it MUST materialize across the edge. + */ +class BinarySinkTask extends Task { + public static override type = "StreamBinaryPump_Sink"; + public static override category = "Test"; + public static override cacheable = false; + + public received: Blob | ArrayBuffer | undefined = undefined; + + public static override inputSchema(): DataPortSchema { + // No `type` constraint (accepts the materialized Blob at runtime) and NO + // `x-stream` ⇒ a non-streaming consumer that needs the value across the edge. + return { + type: "object", + properties: { bytes: { title: "Bytes", description: "materialized binary" } }, + additionalProperties: false, + } as const satisfies DataPortSchema; + } + + public static override outputSchema(): DataPortSchema { + return { + type: "object", + properties: { + length: { type: "number" }, + isBlob: { type: "boolean" }, + }, + additionalProperties: false, + } as const satisfies DataPortSchema; + } + + override async execute(input: SinkInput): Promise { + this.received = input.bytes; + if (input.bytes instanceof Blob) { + return { length: input.bytes.size, isBlob: true }; + } + if (input.bytes instanceof ArrayBuffer) { + return { length: input.bytes.byteLength, isBlob: false }; + } + return { length: -1, isBlob: false }; + } +} + +// ============================================================================ +// Cache repositories (in-test) +// ============================================================================ + +/** + * Records whether `saveOutputStream` (streaming) vs `saveOutput` (buffered) was + * invoked, and the total bytes seen through the streaming path. + */ +class StreamingMemoryRepo extends TaskOutputRepository { + public saveOutputCalls = 0; + public saveOutputStreamCalls = 0; + public streamedBytes: number[] = []; + private store = new Map(); + + constructor() { + super({ outputCompression: false }); + } + + override async saveOutput( + taskType: string, + inputs: TaskInput, + output: TaskOutput + ): Promise { + this.saveOutputCalls++; + this.store.set(taskType + JSON.stringify(inputs), output); + } + + override async getOutput(taskType: string, inputs: TaskInput): Promise { + return this.store.get(taskType + JSON.stringify(inputs)); + } + + override async clear(): Promise { + this.store.clear(); + } + + override async size(): Promise { + return this.store.size; + } + + override async clearOlderThan(): Promise {} + + override isDurable(): boolean { + return false; + } + + override async saveOutputStream( + taskType: string, + inputs: TaskInput, + chunks: AsyncIterable, + _metadata: Record + ): Promise { + this.saveOutputStreamCalls++; + let size = 0; + for await (const c of chunks) { + size += c.byteLength; + for (const b of c) this.streamedBytes.push(b); + } + return { $ref: `inmem://${taskType}::${JSON.stringify(inputs)}`, size }; + } +} + +/** + * A buffered-only cache: extends the streaming repo but removes the streaming + * capability so `supportsStreaming()` returns `false`. + */ +class BufferedMemoryRepo extends StreamingMemoryRepo { + public override saveOutputStream = + undefined as unknown as StreamingMemoryRepo["saveOutputStream"]; +} + +// ============================================================================ +// Helpers +// ============================================================================ + +function blobFromFinish(event: StreamEvent | undefined): Blob | ArrayBuffer | undefined { + if (!event || event.type !== "finish") return undefined; + return (event.data as Record)?.bytes as Blob | ArrayBuffer | undefined; +} + +async function* gen(...chunks: Uint8Array[]): AsyncIterable { + for (const c of chunks) yield c; +} + +/** + * Resolves to the awaited promise, or rejects with a sentinel if `ms` elapses + * first. Used so a regression (the promise never settling) fails fast instead + * of hanging the whole suite. + */ +function withTimeout(p: Promise, ms: number, label: string): Promise { + return Promise.race([ + p, + new Promise((_resolve, reject) => + setTimeout(() => reject(new Error(`timeout: ${label} did not settle within ${ms}ms`)), ms) + ), + ]); +} + +/** Minimal `ITask`-shaped event source for the `pipeBinaryToCache` assembly test. */ +class FakeEmitter { + private listeners = new Map void>>(); + on(name: string, fn: (...args: any[]) => void): void { + let s = this.listeners.get(name); + if (!s) this.listeners.set(name, (s = new Set())); + s.add(fn); + } + off(name: string, fn: (...args: any[]) => void): void { + this.listeners.get(name)?.delete(fn); + } + emit(name: string, ...args: any[]): void { + for (const fn of this.listeners.get(name) ?? []) fn(...args); + } + /** Total live listeners across all event names — used to assert detachment. */ + listenerCount(): number { + let n = 0; + for (const s of this.listeners.values()) n += s.size; + return n; + } +} + +// ============================================================================ +// C1: regression guard — binary source materializes across a non-binary edge +// ============================================================================ + +describe("StreamBinaryPump — C1 binary source → non-binary consumer", () => { + it("materializes a Blob across the edge (no production change)", async () => { + const graph = new TaskGraph(); + const source = new BinaryStreamSource({ id: "source" }); + const sink = new BinarySinkTask({ id: "sink" }); + + graph.addTasks([source, sink]); + graph.addDataflow(new Dataflow("source", "bytes", "sink", "bytes")); + + const runner = new TaskGraphRunner(graph); + const results = await runner.runGraph({}); + + expect(source.status).toBe(TaskStatus.COMPLETED); + expect(sink.status).toBe(TaskStatus.COMPLETED); + + // The sink received a materialized Blob with the concatenated bytes. + expect(sink.received).toBeInstanceOf(Blob); + const buf = await (sink.received as Blob).arrayBuffer(); + expect(Array.from(new Uint8Array(buf))).toEqual([1, 2, 3, 4]); + + const sinkResult = results.find((r) => r.id === "sink"); + expect(sinkResult).toBeDefined(); + expect((sinkResult!.data as SinkOutput).isBlob).toBe(true); + expect((sinkResult!.data as SinkOutput).length).toBe(4); + }); +}); + +// ============================================================================ +// C2: cache-streaming decision — asserted DIRECTLY via canStreamBinaryToCache +// +// These tests assert the DECISION in isolation, not a real-run outcome. We +// deliberately do NOT run a streaming-cache graph and assert "binary port absent +// from finish" as correct: in the reduced scope nothing drives saveOutputStream +// on a real run, so absent bytes there means SILENT DATA LOSS, not success. The +// live pipe (cache actually receiving the bytes on a real run) lands in Spec 2. +// ============================================================================ + +describe("StreamBinaryPump.canStreamBinaryToCache — decision", () => { + it("returns true: streaming cache + binary-only leaf + no value-needing consumer", () => { + const graph = new TaskGraph(); + const source = new CacheableBinaryStreamSource({ id: "source" }); + graph.addTask(source); + + expect(StreamPump.canStreamBinaryToCache(graph, source, new StreamingMemoryRepo())).toBe(true); + }); + + it("returns false: buffered (non-streaming) cache", () => { + const graph = new TaskGraph(); + const source = new CacheableBinaryStreamSource({ id: "source" }); + graph.addTask(source); + + const cache = new BufferedMemoryRepo(); + expect(cache.supportsStreaming()).toBe(false); + expect(StreamPump.canStreamBinaryToCache(graph, source, cache)).toBe(false); + }); + + it("returns false: a downstream edge needs the materialized value", () => { + const graph = new TaskGraph(); + const source = new CacheableBinaryStreamSource({ id: "source" }); + const sink = new BinarySinkTask({ id: "sink" }); + graph.addTasks([source, sink]); + graph.addDataflow(new Dataflow("source", "bytes", "sink", "bytes")); + + // Streaming-capable cache present, but the non-binary consumer needs the + // value across the edge ⇒ must still accumulate. + expect(StreamPump.canStreamBinaryToCache(graph, source, new StreamingMemoryRepo())).toBe(false); + }); + + it("returns false (defensive): a cache that cannot report supportsStreaming()", () => { + const graph = new TaskGraph(); + const source = new CacheableBinaryStreamSource({ id: "source" }); + graph.addTask(source); + + // A `{}`-style partial double with no `supportsStreaming` method: the guard + // must treat anything that can't affirmatively report streaming support as + // non-streaming, never optimistically piping. + const partialCache = {} as unknown as TaskOutputRepository; + expect(StreamPump.canStreamBinaryToCache(graph, source, partialCache)).toBe(false); + }); +}); + +// ============================================================================ +// C2: cache-streaming decision — observed on a real run via the source's finish. +// +// These run a real graph and assert the bytes ARE materialized (present) when the +// decision is "accumulate". They guard the POSITIVE outcome (bytes delivered), not +// the absence of bytes, so they do not bless data loss. +// ============================================================================ + +describe("StreamBinaryPump — C2 accumulation materializes bytes on a real run", () => { + let logger = getTestingLogger(); + setLogger(logger); + + it("DOES accumulate a leaf binary task when the cache cannot stream", async () => { + const graph = new TaskGraph(); + const source = new CacheableBinaryStreamSource({ id: "source" }); + graph.addTask(source); + const runner = new TaskGraphRunner(graph); + + const finishes: StreamEvent[] = []; + source.on("stream_chunk", (e) => { + if (e.type === "finish") finishes.push(e); + }); + + const cache = new BufferedMemoryRepo(); + expect(cache.supportsStreaming()).toBe(false); + await runner.runGraph({}, { outputCache: cache }); + + // Decision = true ⇒ enriched finish ⇒ binary port materialized to a Blob. + expect(finishes.length).toBe(1); + const bytes = blobFromFinish(finishes[0]); + expect(bytes).toBeInstanceOf(Blob); + const buf = await (bytes as Blob).arrayBuffer(); + expect(Array.from(new Uint8Array(buf))).toEqual([1, 2, 3, 4]); + }); + + it("tees when a downstream edge needs materialized AND the cache can stream", async () => { + // Spec 2 Phase E: cache-can-stream + downstream-needs-materialized used to + // inhibit refs entirely. Now both paths fire — accumulator drives the + // enriched finish event (Blob for the edge consumer) and the router + // writes to the cache so the queue/cache row stays small. + const graph = new TaskGraph(); + const source = new CacheableBinaryStreamSource({ id: "source" }); + const sink = new BinarySinkTask({ id: "sink" }); + graph.addTasks([source, sink]); + graph.addDataflow(new Dataflow("source", "bytes", "sink", "bytes")); + const runner = new TaskGraphRunner(graph); + + const finishes: StreamEvent[] = []; + source.on("stream_chunk", (e) => { + if (e.type === "finish") finishes.push(e); + }); + + const cache = new StreamingMemoryRepo(); + await runner.runGraph({}, { outputCache: cache }); + + // Edge path: downstream still receives a materialized Blob. + expect(finishes.length).toBe(1); + const bytes = blobFromFinish(finishes[0]); + expect(bytes).toBeInstanceOf(Blob); + expect(sink.received).toBeInstanceOf(Blob); + + // Cache path: the streaming sink fired too (tee). + expect(cache.saveOutputStreamCalls).toBeGreaterThanOrEqual(1); + }); +}); + +// ============================================================================ +// C2: byte-stream assembly (binary-delta events → AsyncIterable → sink) +// ============================================================================ + +describe("StreamBinaryPump.pipeBinaryToCache — assembly", () => { + it("feeds binary-delta chunks to the sink and resolves on stream_end", async () => { + const emitter = new FakeEmitter(); + const repo = new StreamingMemoryRepo(); + + const { promise } = StreamPump.pipeBinaryToCache( + emitter as unknown as ITask, + "bytes", + (chunks) => repo.saveOutputStream("T", { k: 1 }, chunks, {}) + ); + + emitter.emit("stream_chunk", { + type: "binary-delta", + port: "bytes", + binaryDelta: new Uint8Array([1, 2]), + }); + emitter.emit("stream_chunk", { + type: "binary-delta", + port: "bytes", + binaryDelta: new Uint8Array([3]), + }); + emitter.emit("stream_end", {}); + + await promise; + + expect(repo.saveOutputStreamCalls).toBe(1); + expect(repo.saveOutputCalls).toBe(0); + expect(repo.streamedBytes).toEqual([1, 2, 3]); + }); + + it("filters chunks to the requested binary port", async () => { + const emitter = new FakeEmitter(); + const seen: number[] = []; + + const { promise } = StreamPump.pipeBinaryToCache( + emitter as unknown as ITask, + "bytes", + async (chunks) => { + for await (const c of chunks) for (const b of c) seen.push(b); + } + ); + + emitter.emit("stream_chunk", { + type: "binary-delta", + port: "other", + binaryDelta: new Uint8Array([9]), + }); + emitter.emit("stream_chunk", { + type: "binary-delta", + port: "bytes", + binaryDelta: new Uint8Array([5, 6]), + }); + emitter.emit("stream_end", {}); + + await promise; + expect(seen).toEqual([5, 6]); + }); + + it("uses getBinaryPortId to resolve the source's binary port", () => { + expect(getBinaryPortId(CacheableBinaryStreamSource.outputSchema())).toBe("bytes"); + }); + + // -------------------------------------------------------------------------- + // Failure path: source aborts/errors WITHOUT emitting stream_end. + // + // StreamProcessor emits `stream_end` ONLY on success — on abort/error it + // throws before emitting it. Against the OLD helper (which terminated the + // iterable only on `stream_end`) the iterable would await forever, the sink + // would never resolve, the returned promise would never settle, and `detach` + // (wired via `.finally`) would never run → permanent listener leak + hang. + // These tests force settlement under a timeout guard so a regression fails + // fast rather than hanging the suite, and assert all listeners are detached. + // -------------------------------------------------------------------------- + + it("settles and detaches when the source ABORTS without stream_end", async () => { + const emitter = new FakeEmitter(); + const seen: number[] = []; + + const { promise } = StreamPump.pipeBinaryToCache( + emitter as unknown as ITask, + "bytes", + async (chunks) => { + for await (const c of chunks) for (const b of c) seen.push(b); + } + ); + + // A chunk or two, then abort — NO stream_end. + emitter.emit("stream_chunk", { + type: "binary-delta", + port: "bytes", + binaryDelta: new Uint8Array([1, 2]), + }); + emitter.emit("abort", new Error("aborted")); + + // Against the OLD code this never settles → withTimeout rejects → test fails. + await withTimeout(promise, 500, "pipeBinaryToCache(abort)"); + + // The bytes seen before the abort were finalized, and listeners are gone. + expect(seen).toEqual([1, 2]); + expect(emitter.listenerCount()).toBe(0); + }); + + it("settles and detaches when the source ERRORS without stream_end", async () => { + const emitter = new FakeEmitter(); + + const { promise } = StreamPump.pipeBinaryToCache( + emitter as unknown as ITask, + "bytes", + async (chunks) => { + // Consume whatever arrives; the helper must close the iterable on error. + for await (const _c of chunks) { + /* drain */ + } + } + ); + + emitter.emit("error", new Error("boom")); + + await withTimeout(promise, 500, "pipeBinaryToCache(error)"); + expect(emitter.listenerCount()).toBe(0); + }); + + it("settles and detaches when an AbortSignal fires without stream_end", async () => { + const emitter = new FakeEmitter(); + const controller = new AbortController(); + + const { promise } = StreamPump.pipeBinaryToCache( + emitter as unknown as ITask, + "bytes", + async (chunks) => { + for await (const _c of chunks) { + /* drain */ + } + }, + controller.signal + ); + + emitter.emit("stream_chunk", { + type: "binary-delta", + port: "bytes", + binaryDelta: new Uint8Array([7]), + }); + controller.abort(); + + await withTimeout(promise, 500, "pipeBinaryToCache(signal)"); + expect(emitter.listenerCount()).toBe(0); + }); + + it("settles when the AbortSignal is ALREADY aborted at call time", async () => { + const emitter = new FakeEmitter(); + const { promise } = StreamPump.pipeBinaryToCache( + emitter as unknown as ITask, + "bytes", + async (chunks) => { + for await (const _c of chunks) { + /* drain */ + } + }, + AbortSignal.abort() + ); + + await withTimeout(promise, 500, "pipeBinaryToCache(pre-aborted)"); + expect(emitter.listenerCount()).toBe(0); + }); +}); + +// ============================================================================ +// Sanity: the in-test repos behave as expected +// ============================================================================ + +describe("StreamBinaryPump — repo capability sanity", () => { + let repo: StreamingMemoryRepo; + beforeEach(() => { + repo = new StreamingMemoryRepo(); + }); + + it("streaming repo reports supportsStreaming() === true", () => { + expect(repo.supportsStreaming()).toBe(true); + }); + + it("buffered repo reports supportsStreaming() === false", () => { + expect(new BufferedMemoryRepo().supportsStreaming()).toBe(false); + }); + + it("saveOutputStream concatenates all delivered bytes", async () => { + await repo.saveOutputStream( + "T", + { k: 1 }, + gen(new Uint8Array([1, 2]), new Uint8Array([3])), + {} + ); + expect(repo.streamedBytes).toEqual([1, 2, 3]); + }); +}); diff --git a/packages/test/src/test/task-graph/StreamBinaryTypes.test.ts b/packages/test/src/test/task-graph/StreamBinaryTypes.test.ts new file mode 100644 index 000000000..5a48d3e85 --- /dev/null +++ b/packages/test/src/test/task-graph/StreamBinaryTypes.test.ts @@ -0,0 +1,126 @@ +/** + * @license + * Copyright 2026 Steven Roussey + * SPDX-License-Identifier: Apache-2.0 + */ +import { describe, expect, it } from "vitest"; +import type { StreamBinaryDelta, StreamEvent, StreamMode } from "@workglow/task-graph"; +import { + getPortStreamMode, + getStreamingPorts, + getOutputStreamMode, + getBinaryPortId, + edgeNeedsAccumulation, + materializeBinary, +} from "@workglow/task-graph"; +import type { DataPortSchema } from "@workglow/util/schema"; + +const binarySchema = { + type: "object", + properties: { + bytes: { type: "object", format: "blob", "x-stream": "binary" }, + }, + additionalProperties: false, +} as const satisfies DataPortSchema; + +const mixedSchema = { + type: "object", + properties: { + text: { type: "string", "x-stream": "append" }, + bytes: { type: "object", format: "binary", "x-stream": "binary" }, + }, + additionalProperties: false, +} as const satisfies DataPortSchema; + +describe("StreamBinaryDelta type", () => { + it("is assignable to StreamEvent and carries a Uint8Array delta", () => { + const evt: StreamEvent = { + type: "binary-delta", + port: "bytes", + binaryDelta: new Uint8Array([1, 2, 3]), + } satisfies StreamBinaryDelta; + expect(evt.type).toBe("binary-delta"); + if (evt.type === "binary-delta") { + expect(evt.binaryDelta).toBeInstanceOf(Uint8Array); + expect(Array.from(evt.binaryDelta)).toEqual([1, 2, 3]); + } + }); + + it("admits 'binary' as a StreamMode", () => { + const mode: StreamMode = "binary"; + expect(mode).toBe("binary"); + }); +}); + +describe("binary-aware port helpers", () => { + it("getPortStreamMode returns 'binary'", () => { + expect(getPortStreamMode(binarySchema, "bytes")).toBe("binary"); + }); + + it("getStreamingPorts includes binary ports", () => { + expect(getStreamingPorts(binarySchema)).toEqual([{ port: "bytes", mode: "binary" }]); + }); + + it("getOutputStreamMode returns 'binary' for a single binary port", () => { + expect(getOutputStreamMode(binarySchema)).toBe("binary"); + }); + + it("getOutputStreamMode returns 'mixed' for append + binary", () => { + expect(getOutputStreamMode(mixedSchema)).toBe("mixed"); + }); + + it("getBinaryPortId finds the first binary port", () => { + expect(getBinaryPortId(binarySchema)).toBe("bytes"); + expect(getBinaryPortId(mixedSchema)).toBe("bytes"); + }); + + it("getBinaryPortId returns undefined when no binary port", () => { + const noBinary = { + type: "object", + properties: { text: { type: "string", "x-stream": "append" } }, + } as const satisfies DataPortSchema; + expect(getBinaryPortId(noBinary)).toBeUndefined(); + }); + + it("edgeNeedsAccumulation: binary source → non-binary target accumulates", () => { + const target = { + type: "object", + properties: { bytes: { type: "object" } }, + } as const satisfies DataPortSchema; + expect(edgeNeedsAccumulation(binarySchema, "bytes", target, "bytes")).toBe(true); + }); + + it("edgeNeedsAccumulation: binary → binary passes through", () => { + expect(edgeNeedsAccumulation(binarySchema, "bytes", binarySchema, "bytes")).toBe(false); + }); +}); + +describe("materializeBinary", () => { + const chunks = [new Uint8Array([1, 2]), new Uint8Array([3, 4, 5])]; + + it("concatenates to an ArrayBuffer when format is 'binary'", async () => { + const out = materializeBinary(chunks, "binary"); + expect(out).toBeInstanceOf(ArrayBuffer); + expect(Array.from(new Uint8Array(out as ArrayBuffer))).toEqual([1, 2, 3, 4, 5]); + }); + + it("concatenates to a Blob when format is 'blob'", async () => { + const out = materializeBinary(chunks, "blob"); + expect(out).toBeInstanceOf(Blob); + const buf = await (out as Blob).arrayBuffer(); + expect(Array.from(new Uint8Array(buf))).toEqual([1, 2, 3, 4, 5]); + }); + + it("defaults to Blob when format is undefined", () => { + expect(materializeBinary(chunks, undefined)).toBeInstanceOf(Blob); + }); + + it("handles an empty chunk list", () => { + expect(materializeBinary([], "binary")).toBeInstanceOf(ArrayBuffer); + expect((materializeBinary([], "binary") as ArrayBuffer).byteLength).toBe(0); + }); + + it("treats an unknown format as binary (ArrayBuffer)", () => { + expect(materializeBinary(chunks, "wat")).toBeInstanceOf(ArrayBuffer); + }); +}); diff --git a/packages/test/src/test/task-graph/StreamProcessorBinaryRefSink.test.ts b/packages/test/src/test/task-graph/StreamProcessorBinaryRefSink.test.ts new file mode 100644 index 000000000..f14b9d42d --- /dev/null +++ b/packages/test/src/test/task-graph/StreamProcessorBinaryRefSink.test.ts @@ -0,0 +1,225 @@ +/** + * @license + * Copyright 2026 Steven Roussey + * SPDX-License-Identifier: Apache-2.0 + */ + +import type { BinaryRefSink, CacheRef, StreamEvent } from "@workglow/task-graph"; +import { IExecuteContext, isCacheRef, Task, TaskRegistry } from "@workglow/task-graph"; +import { sleep } from "@workglow/util"; +import { DataPortSchema } from "@workglow/util/schema"; +import { beforeAll, describe, expect, it } from "vitest"; + +type BinOut = { bytes: Blob | ArrayBuffer }; +type TwoBinOut = { audio: Blob; transcript: Blob }; + +class BlobStreamTask extends Task, BinOut> { + public static override type = "BinaryRefSinkTest_BlobStream"; + public static override category = "Test"; + public static override cacheable = false; + + public static override outputSchema(): DataPortSchema { + return { + type: "object", + properties: { bytes: { type: "object", format: "blob", "x-stream": "binary" } }, + additionalProperties: false, + } as const satisfies DataPortSchema; + } + + async *executeStream( + _input: Record, + _ctx: IExecuteContext + ): AsyncIterable> { + yield { type: "binary-delta", port: "bytes", binaryDelta: new Uint8Array([1, 2]) }; + await sleep(1); + yield { type: "binary-delta", port: "bytes", binaryDelta: new Uint8Array([3]) }; + yield { type: "finish", data: {} as BinOut }; + } +} + +class TwoPortStreamTask extends Task, TwoBinOut> { + public static override type = "BinaryRefSinkTest_TwoPort"; + public static override category = "Test"; + public static override cacheable = false; + + public static override outputSchema(): DataPortSchema { + return { + type: "object", + properties: { + audio: { type: "object", format: "blob", "x-stream": "binary" }, + transcript: { type: "object", format: "blob", "x-stream": "binary" }, + }, + additionalProperties: false, + } as const satisfies DataPortSchema; + } + + async *executeStream( + _input: Record, + _ctx: IExecuteContext + ): AsyncIterable> { + yield { type: "binary-delta", port: "audio", binaryDelta: new Uint8Array([10, 11]) }; + yield { type: "binary-delta", port: "transcript", binaryDelta: new Uint8Array([20]) }; + yield { type: "binary-delta", port: "audio", binaryDelta: new Uint8Array([12]) }; + yield { type: "finish", data: {} as TwoBinOut }; + } +} + +class ExplicitFinishPayloadTask extends BlobStreamTask { + public static override type = "BinaryRefSinkTest_ExplicitFinish"; + + override async *executeStream( + _input: Record, + _ctx: IExecuteContext + ): AsyncIterable> { + yield { type: "binary-delta", port: "bytes", binaryDelta: new Uint8Array([9, 9]) }; + yield { type: "finish", data: { bytes: new Blob([new Uint8Array([7])]) } as BinOut }; + } +} + +beforeAll(() => { + TaskRegistry.registerTask(BlobStreamTask as any); + TaskRegistry.registerTask(TwoPortStreamTask as any); + TaskRegistry.registerTask(ExplicitFinishPayloadTask as any); +}); + +function makeSink(): { + sink: BinaryRefSink; + collected: Promise<{ ref: CacheRef; bytes: number[] }>; +} { + const $ref = `inmem://test/${Math.random().toString(36).slice(2)}`; + let resolveCollected: (v: { ref: CacheRef; bytes: number[] }) => void = () => {}; + let rejectCollected: (e: unknown) => void = () => {}; + const collected = new Promise<{ ref: CacheRef; bytes: number[] }>((res, rej) => { + resolveCollected = res; + rejectCollected = rej; + }); + const sink: BinaryRefSink = async (chunks) => { + const bytes: number[] = []; + try { + for await (const c of chunks) { + for (const b of c) bytes.push(b); + } + } catch (err) { + rejectCollected(err); + throw err; + } + const ref = { $ref, size: bytes.length, mime: "application/octet-stream" }; + resolveCollected({ ref, bytes }); + return ref; + }; + return { sink, collected }; +} + +describe("StreamProcessor — binaryRefSinks (direct deps wiring)", () => { + it("routes a single binary port to its sink and produces CacheRef in Output", async () => { + const task = new BlobStreamTask(); + const { sink, collected } = makeSink(); + + // Drive the streamProcessor directly with sinks injected. + const processor = (task as any).runner.streamProcessor as { + run(input: any, ctx: any, deps: any): Promise; + }; + + // Mimic minimal ctx + deps the processor needs. + const abortController = new AbortController(); + const ctx = { + abortController, + shouldAccumulate: true, + registry: undefined, + runId: undefined, + runStartedAt: new Date(), + runOutputData: {}, + telemetrySpan: undefined, + dispose: () => {}, + } as any; + + const output = (await processor.run({}, ctx, { + registry: undefined as any, + resourceScope: undefined, + inputStreams: undefined, + onProgress: async () => {}, + own: (t: T) => t, + binaryRefSinks: new Map([["bytes", sink]]), + })) as BinOut; + + expect(output).toBeDefined(); + expect(isCacheRef((output as any).bytes)).toBe(true); + const { ref, bytes } = await collected; + expect(ref.size).toBe(3); + expect(bytes).toEqual([1, 2, 3]); + expect((output as any).bytes.$ref).toBe(ref.$ref); + }); + + it("routes only the configured port; other binary ports continue to accumulate", async () => { + const task = new TwoPortStreamTask(); + const { sink: audioSink, collected: audioCollected } = makeSink(); + + const processor = (task as any).runner.streamProcessor as { + run(input: any, ctx: any, deps: any): Promise; + }; + const abortController = new AbortController(); + const ctx = { + abortController, + shouldAccumulate: true, + registry: undefined, + runId: undefined, + runStartedAt: new Date(), + runOutputData: {}, + telemetrySpan: undefined, + dispose: () => {}, + } as any; + + const output = (await processor.run({}, ctx, { + registry: undefined as any, + resourceScope: undefined, + inputStreams: undefined, + onProgress: async () => {}, + own: (t: T) => t, + binaryRefSinks: new Map([["audio", audioSink]]), + })) as TwoBinOut; + + expect(isCacheRef((output as any).audio)).toBe(true); + expect((output as any).transcript).toBeInstanceOf(Blob); + const { bytes: audioBytes } = await audioCollected; + expect(audioBytes).toEqual([10, 11, 12]); + const transcriptBytes = new Uint8Array(await (output.transcript as Blob).arrayBuffer()); + expect(Array.from(transcriptBytes)).toEqual([20]); + }); + + it("explicit binary finish payload wins over the sink's CacheRef (artifact precedence)", async () => { + const task = new ExplicitFinishPayloadTask(); + const { sink, collected } = makeSink(); + const processor = (task as any).runner.streamProcessor as { + run(input: any, ctx: any, deps: any): Promise; + }; + const abortController = new AbortController(); + const ctx = { + abortController, + shouldAccumulate: true, + registry: undefined, + runId: undefined, + runStartedAt: new Date(), + runOutputData: {}, + telemetrySpan: undefined, + dispose: () => {}, + } as any; + + const output = (await processor.run({}, ctx, { + registry: undefined as any, + resourceScope: undefined, + inputStreams: undefined, + onProgress: async () => {}, + own: (t: T) => t, + binaryRefSinks: new Map([["bytes", sink]]), + })) as BinOut; + + // The explicit finish payload (Blob of [7]) takes the slot, not the ref. + expect(isCacheRef((output as any).bytes)).toBe(false); + expect((output as any).bytes).toBeInstanceOf(Blob); + const blobBytes = new Uint8Array(await (output.bytes as Blob).arrayBuffer()); + expect(Array.from(blobBytes)).toEqual([7]); + // The sink still observed the deltas (just lost the race for the slot). + const { bytes } = await collected; + expect(bytes).toEqual([9, 9]); + }); +}); diff --git a/packages/test/src/test/task-graph/TaskOutputRepositoryStream.test.ts b/packages/test/src/test/task-graph/TaskOutputRepositoryStream.test.ts new file mode 100644 index 000000000..aa852b90f --- /dev/null +++ b/packages/test/src/test/task-graph/TaskOutputRepositoryStream.test.ts @@ -0,0 +1,190 @@ +/** + * @license + * Copyright 2026 Steven Roussey + * SPDX-License-Identifier: Apache-2.0 + */ +import { describe, expect, it } from "vitest"; +import { RunPrivateCacheRepo, TaskOutputRepository } from "@workglow/task-graph"; +import type { CacheRef, TaskInput, TaskOutput } from "@workglow/task-graph"; + +class StreamingMemoryRepo extends TaskOutputRepository { + public readonly streamed = new Map(); + public readonly streamedMetadata = new Map>(); + private store = new Map(); + override async saveOutput(t: string, i: TaskInput, o: TaskOutput): Promise { + this.store.set(t + JSON.stringify(i), o); + } + override async getOutput(t: string, i: TaskInput): Promise { + return this.store.get(t + JSON.stringify(i)); + } + override async clear(): Promise { + this.store.clear(); + this.streamed.clear(); + } + override async size(): Promise { + return this.store.size; + } + override async clearOlderThan(): Promise {} + override isDurable(): boolean { + return false; + } + override async saveOutputStream( + taskType: string, + inputs: TaskInput, + chunks: AsyncIterable, + metadata: Record + ): Promise { + const parts: Uint8Array[] = []; + for await (const c of chunks) parts.push(c); + let total = 0; + for (const p of parts) total += p.byteLength; + const merged = new Uint8Array(total); + let off = 0; + for (const p of parts) { + merged.set(p, off); + off += p.byteLength; + } + const key = taskType + JSON.stringify(inputs); + this.streamed.set(key, merged); + this.streamedMetadata.set(key, metadata); + return { $ref: `inmem://${key}`, size: total }; + } + override async getOutputByRef(ref: CacheRef): Promise { + const key = ref.$ref.replace(/^inmem:\/\//, ""); + const bytes = this.streamed.get(key); + return bytes === undefined ? undefined : new Blob([bytes as unknown as BlobPart]); + } + override getOutputStreamByRef(ref: CacheRef): AsyncIterable | undefined { + const key = ref.$ref.replace(/^inmem:\/\//, ""); + const bytes = this.streamed.get(key); + if (bytes === undefined) return undefined; + return (async function* () { + yield bytes; + })(); + } +} + +// A minimal repo that simply does NOT define `saveOutputStream`, so it has no +// streaming capability (no double-cast shadowing). +class NonStreamingMemoryRepo extends TaskOutputRepository { + private store = new Map(); + override async saveOutput(t: string, i: TaskInput, o: TaskOutput): Promise { + this.store.set(t + JSON.stringify(i), o); + } + override async getOutput(t: string, i: TaskInput): Promise { + return this.store.get(t + JSON.stringify(i)); + } + override async clear(): Promise { + this.store.clear(); + } + override async size(): Promise { + return this.store.size; + } + override async clearOlderThan(): Promise {} + override isDurable(): boolean { + return false; + } +} + +async function* gen(...chunks: Uint8Array[]): AsyncIterable { + for (const c of chunks) yield c; +} + +describe("TaskOutputRepository.saveOutputStream", () => { + it("supportsStreaming reflects presence of saveOutputStream", () => { + expect(new StreamingMemoryRepo({}).supportsStreaming()).toBe(true); + expect(new NonStreamingMemoryRepo({}).supportsStreaming()).toBe(false); + }); + + it("streams chunks and the total equals total bytes streamed", async () => { + const repo = new StreamingMemoryRepo({}); + await repo.saveOutputStream( + "T", + { k: 1 }, + gen(new Uint8Array([1, 2]), new Uint8Array([3])), + {} + ); + expect(Array.from(repo.streamed.get('T{"k":1}')!)).toEqual([1, 2, 3]); + }); + + it("an empty stream stores a zero-length Uint8Array", async () => { + const repo = new StreamingMemoryRepo({}); + await repo.saveOutputStream("T", { k: 1 }, gen(), {}); + const stored = repo.streamed.get('T{"k":1}')!; + expect(stored).toBeInstanceOf(Uint8Array); + expect(stored.byteLength).toBe(0); + }); + + it("passes the metadata arg through to the repo (side-band contract)", async () => { + const repo = new StreamingMemoryRepo({}); + const metadata = { contentType: "application/octet-stream", status: 200 }; + await repo.saveOutputStream("T", { k: 1 }, gen(new Uint8Array([9])), metadata); + expect(repo.streamedMetadata.get('T{"k":1}')).toEqual(metadata); + }); + + it("RunPrivateCacheRepo forwards streaming with namespaced taskType", async () => { + const backing = new StreamingMemoryRepo({}); + const wrapper = new RunPrivateCacheRepo({ backing, runId: "run-A" }); + + expect(wrapper.supportsStreaming()).toBe(true); + + await wrapper.saveOutputStream("T", { k: 1 }, gen(new Uint8Array([1, 2, 3])), {}); + + // taskType is namespaced exactly as saveOutput namespaces it. + const namespacedKey = `__run:run-A::T${JSON.stringify({ k: 1 })}`; + expect(Array.from(backing.streamed.get(namespacedKey)!)).toEqual([1, 2, 3]); + expect(backing.streamed.has('T{"k":1}')).toBe(false); + }); + + it("RunPrivateCacheRepo.supportsStreaming() is false when backing lacks it", () => { + const backing = new NonStreamingMemoryRepo({}); + const wrapper = new RunPrivateCacheRepo({ backing, runId: "run-A" }); + expect(wrapper.supportsStreaming()).toBe(false); + }); + + it("saveOutputStream returns a CacheRef the same backing can resolve to bytes", async () => { + const repo = new StreamingMemoryRepo({}); + const ref = await repo.saveOutputStream( + "T", + { k: 1 }, + gen(new Uint8Array([7, 8, 9])), + {} + ); + expect(typeof ref.$ref).toBe("string"); + expect(ref.size).toBe(3); + const hydrated = await repo.getOutputByRef(ref); + expect(hydrated).toBeInstanceOf(Blob); + const bytes = new Uint8Array(await hydrated!.arrayBuffer()); + expect(Array.from(bytes)).toEqual([7, 8, 9]); + }); + + it("getOutputStreamByRef yields bytes for a saved ref", async () => { + const repo = new StreamingMemoryRepo({}); + const ref = await repo.saveOutputStream("T", { k: 2 }, gen(new Uint8Array([4, 5])), {}); + const stream = repo.getOutputStreamByRef(ref); + expect(stream).toBeDefined(); + const collected: number[] = []; + for await (const chunk of stream!) { + for (const b of chunk) collected.push(b); + } + expect(collected).toEqual([4, 5]); + }); + + it("getOutputByRef returns undefined after clear (dangling reference)", async () => { + const repo = new StreamingMemoryRepo({}); + const ref = await repo.saveOutputStream("T", { k: 3 }, gen(new Uint8Array([1])), {}); + expect(await repo.getOutputByRef(ref)).toBeInstanceOf(Blob); + await repo.clear(); + expect(await repo.getOutputByRef(ref)).toBeUndefined(); + }); + + it("RunPrivateCacheRepo forwards getOutputByRef to backing", async () => { + const backing = new StreamingMemoryRepo({}); + const wrapper = new RunPrivateCacheRepo({ backing, runId: "run-B" }); + const ref = await wrapper.saveOutputStream("T", { k: 4 }, gen(new Uint8Array([42])), {}); + const hydrated = await wrapper.getOutputByRef(ref); + expect(hydrated).toBeInstanceOf(Blob); + const bytes = new Uint8Array(await hydrated!.arrayBuffer()); + expect(Array.from(bytes)).toEqual([42]); + }); +}); diff --git a/packages/test/src/test/task-graph/TaskRunnerRefPath.test.ts b/packages/test/src/test/task-graph/TaskRunnerRefPath.test.ts new file mode 100644 index 000000000..b644da9b2 --- /dev/null +++ b/packages/test/src/test/task-graph/TaskRunnerRefPath.test.ts @@ -0,0 +1,205 @@ +/** + * @license + * Copyright 2026 Steven Roussey + * SPDX-License-Identifier: Apache-2.0 + */ + +import type { CacheRef, StreamEvent } from "@workglow/task-graph"; +import { + CACHE_REGISTRY, + DefaultCacheRegistry, + IExecuteContext, + isCacheRef, + Task, + TaskOutputRepository, + TaskRegistry, +} from "@workglow/task-graph"; +import type { TaskInput, TaskOutput } from "@workglow/task-graph"; +import { Container, ServiceRegistry, sleep } from "@workglow/util"; +import { DataPortSchema } from "@workglow/util/schema"; +import { beforeAll, beforeEach, describe, expect, it } from "vitest"; + +type BinOut = { bytes: Blob | ArrayBuffer }; + +class StreamingMemoryRepo extends TaskOutputRepository { + public readonly saved = new Map(); + public readonly streamed = new Map(); + public saveOutputCalls = 0; + public saveOutputStreamCalls = 0; + + override async saveOutput(t: string, i: TaskInput, o: TaskOutput): Promise { + this.saveOutputCalls++; + this.saved.set(t + JSON.stringify(i), o); + } + override async getOutput(t: string, i: TaskInput): Promise { + return this.saved.get(t + JSON.stringify(i)); + } + override async clear(): Promise { + this.saved.clear(); + this.streamed.clear(); + } + override async size(): Promise { + return this.saved.size; + } + override async clearOlderThan(): Promise {} + override isDurable(): boolean { + return false; + } + override async saveOutputStream( + taskType: string, + inputs: TaskInput, + chunks: AsyncIterable, + _metadata: Record + ): Promise { + this.saveOutputStreamCalls++; + const parts: Uint8Array[] = []; + let size = 0; + for await (const c of chunks) { + parts.push(c); + size += c.byteLength; + } + const merged = new Uint8Array(size); + let off = 0; + for (const p of parts) { + merged.set(p, off); + off += p.byteLength; + } + const key = `inmem://${taskType}::${JSON.stringify(inputs)}`; + this.streamed.set(key, merged); + return { $ref: key, size, mime: "application/octet-stream" }; + } + override async getOutputByRef(ref: CacheRef): Promise { + const bytes = this.streamed.get(ref.$ref); + return bytes === undefined ? undefined : new Blob([bytes as unknown as BlobPart]); + } +} + +class BlobStreamTask extends Task, BinOut> { + public static override type = "TaskRunnerRefPathTest_BlobStream"; + public static override category = "Test"; + public static override cacheable = true; + + public static override outputSchema(): DataPortSchema { + return { + type: "object", + properties: { bytes: { type: "object", format: "blob", "x-stream": "binary" } }, + additionalProperties: false, + } as const satisfies DataPortSchema; + } + + async *executeStream( + _input: Record, + _ctx: IExecuteContext + ): AsyncIterable> { + yield { type: "binary-delta", port: "bytes", binaryDelta: new Uint8Array([1, 2, 3]) }; + await sleep(1); + yield { type: "binary-delta", port: "bytes", binaryDelta: new Uint8Array([4, 5]) }; + yield { type: "finish", data: {} as BinOut }; + } +} + +class NonCacheableBlobStreamTask extends BlobStreamTask { + public static override type = "TaskRunnerRefPathTest_NonCacheableBlobStream"; + public static override cacheable = false; +} + +beforeAll(() => { + TaskRegistry.registerTask(BlobStreamTask as any); + TaskRegistry.registerTask(NonCacheableBlobStreamTask as any); +}); + +let repo: StreamingMemoryRepo; +let services: ServiceRegistry; +beforeEach(() => { + repo = new StreamingMemoryRepo({}); + services = new ServiceRegistry(new Container()); + services.registerInstance(CACHE_REGISTRY, new DefaultCacheRegistry({ deterministic: repo })); +}); + +describe("TaskRunner — referenceThresholdBytes: 0 (force-ref) ref path", () => { + it("Output carries a CacheRef at the binary port; bytes live in the streaming cache", async () => { + const task = new BlobStreamTask(); + const output = await task.run({}, { registry: services, referenceThresholdBytes: 0 }); + + expect(repo.saveOutputStreamCalls).toBe(1); + expect(isCacheRef(output.bytes)).toBe(true); + + const ref = output.bytes as unknown as CacheRef; + const hydrated = await repo.getOutputByRef(ref); + expect(hydrated).toBeInstanceOf(Blob); + const bytes = new Uint8Array(await hydrated!.arrayBuffer()); + expect(Array.from(bytes)).toEqual([1, 2, 3, 4, 5]); + expect(ref.size).toBe(5); + }); + + it("saveOutput still runs (small Output with embedded ref → small queue/cache row)", async () => { + const task = new BlobStreamTask(); + await task.run({}, { registry: services, referenceThresholdBytes: 0 }); + + expect(repo.saveOutputCalls).toBe(1); + // The cached small row contains the ref, NOT the bytes. + const [savedOutput] = Array.from(repo.saved.values()); + expect(isCacheRef((savedOutput as any).bytes)).toBe(true); + }); + + it("defaults (threshold 64 KiB) produce inline Blob in Output — small outputs rehydrate", async () => { + const task = new BlobStreamTask(); + const output = await task.run({}, { registry: services }); + + // D.4: sink runs unconditionally when cache supports streaming; the + // rehydrate step converts the ref back to an inline Blob because total + // bytes (5) is below the 64 KiB default threshold. + expect(repo.saveOutputStreamCalls).toBe(1); + expect(output.bytes).toBeInstanceOf(Blob); + const bytes = new Uint8Array(await (output.bytes as Blob).arrayBuffer()); + expect(Array.from(bytes)).toEqual([1, 2, 3, 4, 5]); + }); + + it("non-cacheable tasks fall through to accumulation even with threshold=0", async () => { + const task = new NonCacheableBlobStreamTask(); + const output = await task.run({}, { registry: services, referenceThresholdBytes: 0 }); + + expect(repo.saveOutputStreamCalls).toBe(0); + expect(output.bytes).toBeInstanceOf(Blob); + }); +}); + +describe("TaskRunner — Phase D.4 threshold-based size decision", () => { + it("output below threshold is rehydrated to an inline Blob (sink still ran for memory bound)", async () => { + const task = new BlobStreamTask(); + // Threshold well above the 5 bytes the task produces → rehydrate inline. + const output = await task.run({}, { registry: services, referenceThresholdBytes: 100 }); + + expect(repo.saveOutputStreamCalls).toBe(1); // sink ran (memory-bounded write) + expect(output.bytes).toBeInstanceOf(Blob); // but the slot is now an inline Blob + const bytes = new Uint8Array(await (output.bytes as Blob).arrayBuffer()); + expect(Array.from(bytes)).toEqual([1, 2, 3, 4, 5]); + }); + + it("output at or above threshold keeps the CacheRef", async () => { + const task = new BlobStreamTask(); + // 5 bytes >= threshold 5 → ref survives. + const output = await task.run({}, { registry: services, referenceThresholdBytes: 5 }); + + expect(repo.saveOutputStreamCalls).toBe(1); + expect(isCacheRef(output.bytes)).toBe(true); + expect((output.bytes as unknown as CacheRef).size).toBe(5); + }); + + it("threshold=0 (force-ref) overrides the size check; the ref survives regardless", async () => { + const task = new BlobStreamTask(); + const output = await task.run({}, { registry: services, referenceThresholdBytes: 0 }); + + expect(repo.saveOutputStreamCalls).toBe(1); + expect(isCacheRef(output.bytes)).toBe(true); + }); + + it("default threshold (64 KiB) rehydrates the small-output path automatically", async () => { + const task = new BlobStreamTask(); + // No threshold specified → resolves to 64 KiB default; 5 bytes is below. + const output = await task.run({}, { registry: services }); + + expect(repo.saveOutputStreamCalls).toBe(1); // sink now always runs when cache supports it + expect(output.bytes).toBeInstanceOf(Blob); + }); +}); diff --git a/packages/test/src/test/task-graph/resolveJobOutput.test.ts b/packages/test/src/test/task-graph/resolveJobOutput.test.ts new file mode 100644 index 000000000..0eb6d58e2 --- /dev/null +++ b/packages/test/src/test/task-graph/resolveJobOutput.test.ts @@ -0,0 +1,86 @@ +/** + * @license + * Copyright 2026 Steven Roussey + * SPDX-License-Identifier: Apache-2.0 + */ + +import { describe, expect, it } from "vitest"; +import { resolveJobOutput } from "@workglow/task-graph"; +import type { CacheRef, CacheRefResolver, JobHandleLike } from "@workglow/task-graph"; + +const handleOf = (value: T): JobHandleLike => ({ + waitFor: async () => value, +}); + +const ref = (key: string, size = 0): CacheRef => ({ $ref: key, size }); + +describe("resolveJobOutput", () => { + it("awaits the job and hydrates a top-level ref through a function resolver", async () => { + const blob = new Blob([new Uint8Array([1, 2, 3])]); + const resolver: CacheRefResolver = async (r) => + r.$ref === "cache://A" ? blob : undefined; + const handle = handleOf({ bytes: ref("cache://A", 3) as unknown as Blob }); + const out = await resolveJobOutput(handle, resolver); + expect(out.bytes).toBe(blob); + }); + + it("accepts an object with getOutputByRef (TaskOutputRepository shape)", async () => { + const blob = new Blob([new Uint8Array([7, 8])]); + const backing = { + getOutputByRef: async (r: CacheRef) => (r.$ref === "cache://B" ? blob : undefined), + }; + const handle = handleOf({ payload: ref("cache://B", 2) as unknown as Blob }); + const out = await resolveJobOutput(handle, backing); + expect(out.payload).toBe(blob); + }); + + it("returns the output unchanged when the backing has no getOutputByRef", async () => { + const original = { bytes: ref("cache://x", 1) as unknown as Blob }; + const handle = handleOf(original); + const out = await resolveJobOutput(handle, {}); + expect(out).toBe(original); + }); + + it("replaces refs with undefined on cache miss (best-effort)", async () => { + const handle = handleOf({ bytes: ref("cache://missing", 1) as unknown as Blob }); + const out = await resolveJobOutput(handle, async () => undefined); + expect(out.bytes).toBeUndefined(); + }); + + it("walks nested structures", async () => { + const blob = new Blob([new Uint8Array([42])]); + const handle = handleOf({ + meta: { lang: "en" }, + payload: { audio: ref("cache://A", 1) as unknown as Blob }, + }); + const out = await resolveJobOutput(handle, async () => blob); + expect(out.meta).toEqual({ lang: "en" }); + expect(out.payload.audio).toBe(blob); + }); + + it("propagates rejection from the underlying handle.waitFor()", async () => { + const handle: JobHandleLike = { + waitFor: async () => { + throw new Error("job failed"); + }, + }; + await expect(resolveJobOutput(handle, async () => undefined)).rejects.toThrow("job failed"); + }); + + it("forwards ResolveOutputOptions to the underlying walker", async () => { + let inFlight = 0; + let observedMax = 0; + const resolver: CacheRefResolver = async () => { + inFlight++; + observedMax = Math.max(observedMax, inFlight); + await new Promise((res) => setTimeout(res, 5)); + inFlight--; + return new Blob(); + }; + const handle = handleOf( + Array.from({ length: 6 }, (_, i) => ref(`cache://r${i}`, 1) as unknown as Blob) + ); + await resolveJobOutput(handle, resolver, { concurrency: 2 }); + expect(observedMax).toBeLessThanOrEqual(2); + }); +}); diff --git a/packages/test/src/test/task-graph/resolveOutput.test.ts b/packages/test/src/test/task-graph/resolveOutput.test.ts new file mode 100644 index 000000000..1a5a4ead7 --- /dev/null +++ b/packages/test/src/test/task-graph/resolveOutput.test.ts @@ -0,0 +1,136 @@ +/** + * @license + * Copyright 2026 Steven Roussey + * SPDX-License-Identifier: Apache-2.0 + */ + +import { describe, expect, it, vi } from "vitest"; +import { resolveOutput } from "@workglow/task-graph"; +import type { CacheRef, CacheRefResolver } from "@workglow/task-graph"; + +const ref = (key: string, size?: number, mime?: string): CacheRef => ({ + $ref: key, + ...(size !== undefined ? { size } : {}), + ...(mime !== undefined ? { mime } : {}), +}); + +const fakeResolver = + (table: Record): CacheRefResolver => + async (r) => + table[r.$ref]; + +describe("resolveOutput", () => { + it("returns primitives and non-ref objects unchanged", async () => { + const resolver = vi.fn(fakeResolver({})); + const input = { a: 1, b: "two", c: true, d: null }; + expect(await resolveOutput(input, resolver)).toEqual(input); + expect(resolver).not.toHaveBeenCalled(); + }); + + it("resolves a top-level ref to bytes", async () => { + const blob = new Blob([new Uint8Array([1, 2, 3])]); + const table = { "cache://x": blob }; + const out = await resolveOutput(ref("cache://x") as unknown as Blob, fakeResolver(table)); + expect(out).toBe(blob); + }); + + it("resolves refs nested inside a plain object, leaving siblings alone", async () => { + const audio = new Blob([new Uint8Array([9, 9, 9])]); + const input = { + transcript: "hello", + audio: ref("cache://a", 3, "audio/wav") as unknown as Blob, + meta: { lang: "en" }, + }; + const out = await resolveOutput(input, fakeResolver({ "cache://a": audio })); + expect(out.transcript).toBe("hello"); + expect(out.audio).toBe(audio); + expect(out.meta).toEqual({ lang: "en" }); + }); + + it("resolves refs inside arrays", async () => { + const b1 = new Blob([new Uint8Array([1])]); + const b2 = new Blob([new Uint8Array([2])]); + const input = [ + ref("cache://1") as unknown as Blob, + "plain", + ref("cache://2") as unknown as Blob, + ]; + const out = await resolveOutput(input, fakeResolver({ "cache://1": b1, "cache://2": b2 })); + expect(out[0]).toBe(b1); + expect(out[1]).toBe("plain"); + expect(out[2]).toBe(b2); + }); + + it("treats Blob, ArrayBuffer, typed arrays, Date as opaque leaves (not walked)", async () => { + const blob = new Blob([new Uint8Array([1])]); + const ab = new ArrayBuffer(8); + const u8 = new Uint8Array([5, 6, 7]); + const date = new Date(2026, 0, 1); + const resolver = vi.fn(); + const input = { blob, ab, u8, date }; + const out = await resolveOutput(input, resolver); + expect(out.blob).toBe(blob); + expect(out.ab).toBe(ab); + expect(out.u8).toBe(u8); + expect(out.date).toBe(date); + expect(resolver).not.toHaveBeenCalled(); + }); + + it("returns undefined for refs the resolver cannot resolve (best-effort)", async () => { + const input = { audio: ref("cache://missing") as unknown as Blob }; + const out = await resolveOutput(input, fakeResolver({})); + expect(out.audio).toBeUndefined(); + }); + + it("propagates resolver rejections (caller-controlled error policy)", async () => { + const failingResolver: CacheRefResolver = async () => { + throw new Error("backing down"); + }; + await expect( + resolveOutput({ x: ref("cache://k") as unknown as Blob }, failingResolver) + ).rejects.toThrow("backing down"); + }); + + it("resolves refs in deeply nested structures", async () => { + const b = new Blob([new Uint8Array([42])]); + const input = { + level1: { + level2: { + items: [{ payload: ref("cache://deep") as unknown as Blob }], + }, + }, + }; + const out = await resolveOutput(input, fakeResolver({ "cache://deep": b })); + expect(out.level1.level2.items[0].payload).toBe(b); + }); + + it("honors a concurrency bound: never exceeds the configured maximum in flight", async () => { + let inFlight = 0; + let observedMax = 0; + const resolver: CacheRefResolver = async (r) => { + inFlight++; + observedMax = Math.max(observedMax, inFlight); + await new Promise((res) => setTimeout(res, 5)); + inFlight--; + return new Blob([new Uint8Array([Number(r.$ref.slice(-1))])]); + }; + const refs = Array.from({ length: 8 }, (_, i) => ref(`cache://r${i}`)); + await resolveOutput(refs as unknown as Blob[], resolver, { concurrency: 2 }); + expect(observedMax).toBeLessThanOrEqual(2); + }); + + it("with concurrency undefined runs all resolutions in parallel", async () => { + let inFlight = 0; + let observedMax = 0; + const resolver: CacheRefResolver = async () => { + inFlight++; + observedMax = Math.max(observedMax, inFlight); + await new Promise((res) => setTimeout(res, 5)); + inFlight--; + return new Blob(); + }; + const refs = Array.from({ length: 6 }, (_, i) => ref(`cache://r${i}`)); + await resolveOutput(refs as unknown as Blob[], resolver); + expect(observedMax).toBe(6); + }); +}); diff --git a/packages/util/src/json-schema/JsonSchema.ts b/packages/util/src/json-schema/JsonSchema.ts index 4b5a1d4c0..7d5d7774a 100644 --- a/packages/util/src/json-schema/JsonSchema.ts +++ b/packages/util/src/json-schema/JsonSchema.ts @@ -24,7 +24,7 @@ export type JsonSchemaCustomProps = { "x-ui"?: unknown; "x-ui-iteration"?: boolean; // marks property as iteration-injected (hidden from parent, read-only in subgraph) "x-auto-generated"?: boolean; // marks a primary key column as auto-generated by storage backend - "x-stream"?: "append" | "replace" | "object"; // streaming mode for this port (absent = none/non-streaming) + "x-stream"?: "append" | "replace" | "object" | "binary"; // streaming mode for this port (absent = none/non-streaming) "x-structured-output"?: boolean; // marks a port as requiring structured output from the AI provider }; From 184128f15c7a709ae05d9dacc4029b110aedd202 Mon Sep 17 00:00:00 2001 From: Claude Date: Thu, 4 Jun 2026 05:36:45 +0000 Subject: [PATCH 2/2] feat(job-queue): in-process stream observability via JobHandle.onStream MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds a same-process channel so a holder of a `JobHandle` can subscribe to a running job's stream events (text deltas, object deltas, binary-delta chunks, snapshot, finish, error, phase) instead of only the terminal result. Worker side ----------- - `IJobExecuteContext` gains an optional `emitStreamEvent(event)` method. - `JobQueueWorker` plumbs a per-job event emitter through into the execute context so a run-fn can call `ctx.emitStreamEvent(...)` to publish stream chunks as they're produced. Server side ----------- - `JobQueueServer.forwardToClients("handleJobStream", jobId, event)` fans the event to every attached client by direct method invocation — pure in-memory, no `postMessage`, no serialization, no worker thread. The channel is intentionally same-process only; storage-backed cross- process clients see state transitions through `subscribeToChanges` but receive no incremental stream events. Client side ----------- - `JobHandle.onStream(callback)` is exposed only when the client is server-attached (`this.server` set); callers branch on `typeof handle.onStream === "function"`. - Each listener invocation is wrapped in try/catch so one throwing subscriber does not abort delivery to the rest or break the dispatch. Tests ----- - `JobQueueStream.test.ts` proves end-to-end same-process delivery: a worker's `emitStreamEvent` calls reach every `JobHandle.onStream` listener in order. - `JobQueueStreamWorker.integration.test.ts` (+ its `.fixture.mjs`) validates the underlying Node `worker_threads` transfer mechanism the design relies on for any future cross-thread queue host: binary chunks emitted from a worker thread transfer (not copy) to the host per `WorkerServerBase.extractTransferables`. The docblock spells out that this is a Node-primitive validation, NOT a test of the current package's behavior — today's queue channel is entirely same-process and the test exists as a navigational marker for a future hosted-in-thread variant. --- packages/job-queue/src/job/Job.ts | 8 +- packages/job-queue/src/job/JobQueueClient.ts | 67 ++++++++++- .../src/job/JobQueueEventListeners.ts | 14 +++ packages/job-queue/src/job/JobQueueServer.ts | 12 ++ packages/job-queue/src/job/JobQueueWorker.ts | 14 +++ .../src/test/job-queue/JobQueueStream.test.ts | 111 ++++++++++++++++++ .../JobQueueStreamWorker.integration.test.ts | 93 +++++++++++++++ .../jobQueueStreamWorker.fixture.mjs | 53 +++++++++ 8 files changed, 370 insertions(+), 2 deletions(-) create mode 100644 packages/test/src/test/job-queue/JobQueueStream.test.ts create mode 100644 packages/test/src/test/job-queue/JobQueueStreamWorker.integration.test.ts create mode 100644 packages/test/src/test/job-queue/jobQueueStreamWorker.fixture.mjs diff --git a/packages/job-queue/src/job/Job.ts b/packages/job-queue/src/job/Job.ts index b251680af..02acde4df 100644 --- a/packages/job-queue/src/job/Job.ts +++ b/packages/job-queue/src/job/Job.ts @@ -6,7 +6,7 @@ import { JobStatus } from "../queue-storage/IQueueStorage"; import { JobError } from "./JobError"; -import type { JobProgressListener } from "./JobQueueEventListeners"; +import type { JobProgressListener, StreamEventLike } from "./JobQueueEventListeners"; export { JobStatus }; @@ -20,6 +20,12 @@ export interface IJobExecuteContext { message?: string, details?: Record | null ) => Promise; + /** + * OPTIONAL. Present only when the worker's transport can deliver stream + * events. Jobs MUST NOT retain references to chunk buffers after calling + * this (buffers may be transferred across a worker boundary and detached). + */ + emitStreamEvent?: (event: StreamEventLike) => void; } /** diff --git a/packages/job-queue/src/job/JobQueueClient.ts b/packages/job-queue/src/job/JobQueueClient.ts index 2ae48100e..2ae4b142e 100644 --- a/packages/job-queue/src/job/JobQueueClient.ts +++ b/packages/job-queue/src/job/JobQueueClient.ts @@ -26,6 +26,8 @@ import { JobQueueEventListeners, JobQueueEventParameters, JobQueueEvents, + JobStreamListener, + type StreamEventLike, } from "./JobQueueEventListeners"; import type { JobQueueServer } from "./JobQueueServer"; import { storageToClass } from "./JobStorageConverters"; @@ -38,6 +40,12 @@ export interface JobHandle { waitFor(): Promise; abort(): Promise; onProgress(callback: JobProgressListener): () => void; + /** + * OPTIONAL — present only when this handle's transport can deliver stream + * events (a same-process server-attached queue). Absent on storage-only + * backends; callers branch on `typeof handle.onStream === "function"`. + */ + onStream?(callback: JobStreamListener): () => void; } /** @@ -78,6 +86,11 @@ export class JobQueueClient { */ protected readonly jobProgressListeners: Map> = new Map(); + /** + * Map of job IDs to their stream listeners + */ + protected readonly jobStreamListeners: Map> = new Map(); + /** * Last known progress state for each job */ @@ -391,6 +404,27 @@ export class JobQueueClient { }; } + /** + * Subscribe to stream events for a specific job + */ + public onJobStream(jobId: unknown, listener: JobStreamListener): () => void { + if (!this.jobStreamListeners.has(jobId)) { + this.jobStreamListeners.set(jobId, new Set()); + } + const listeners = this.jobStreamListeners.get(jobId)!; + listeners.add(listener); + + return () => { + const listeners = this.jobStreamListeners.get(jobId); + if (listeners) { + listeners.delete(listener); + if (listeners.size === 0) { + this.jobStreamListeners.delete(jobId); + } + } + }; + } + // ======================================================================== // Event handling // ======================================================================== @@ -524,23 +558,54 @@ export class JobQueueClient { } } + /** + * Called by server when a job emits a stream event. Listener throws are + * isolated per-listener — one misbehaving subscriber does not interrupt + * delivery to the rest or abort the dispatch itself. + * @internal + */ + public handleJobStream(jobId: unknown, event: StreamEventLike): void { + this.events.emit("job_stream", this.queueName, jobId, event); + + const listeners = this.jobStreamListeners.get(jobId); + if (!listeners) return; + for (const listener of listeners) { + try { + listener(event); + } catch (err) { + getLogger().error("JobHandle.onStream listener threw", { + jobId, + error: err instanceof Error ? err.message : String(err), + }); + } + } + } + // ======================================================================== // Private helpers // ======================================================================== private createJobHandle(id: unknown): JobHandle { - return { + const handle: JobHandle = { id, waitFor: () => this.waitFor(id), abort: () => this.abort(id), onProgress: (callback: JobProgressListener) => this.onJobProgress(id, callback), }; + // Stream delivery requires a same-process server-attached transport — the + // same signal `connect()` uses. Storage-only backends omit `onStream`, so + // callers branch on `typeof handle.onStream === "function"`. + if (this.server) { + handle.onStream = (callback: JobStreamListener) => this.onJobStream(id, callback); + } + return handle; } private cleanupJob(jobId: unknown): void { this.activeJobPromises.delete(jobId); this.lastKnownProgress.delete(jobId); this.jobProgressListeners.delete(jobId); + this.jobStreamListeners.delete(jobId); } private handleStorageChange(change: QueueChangePayload): void { diff --git a/packages/job-queue/src/job/JobQueueEventListeners.ts b/packages/job-queue/src/job/JobQueueEventListeners.ts index ad3b7c457..18913f196 100644 --- a/packages/job-queue/src/job/JobQueueEventListeners.ts +++ b/packages/job-queue/src/job/JobQueueEventListeners.ts @@ -25,6 +25,7 @@ export type JobQueueEventListeners = { message: string, details: Record | null ) => void; + job_stream: (queueName: string, jobId: unknown, event: StreamEventLike) => void; }; export type JobQueueEvents = keyof JobQueueEventListeners; @@ -46,3 +47,16 @@ export type JobProgressListener = ( message: string, details: Record | null ) => void; + +/** + * Minimal structural shape of a stream event crossing the job-queue boundary. + * + * `@workglow/job-queue` sits below `@workglow/task-graph` in the dependency + * graph, so it cannot import task-graph's `StreamEvent`. This structural type + * captures just what the queue plumbing needs; task-graph's `StreamEvent` is + * assignable to it, so real stream producers interoperate transparently. + */ +export type StreamEventLike = { type: string; port?: string; [k: string]: unknown }; + +/** Listener for cross-process stream events emitted by an executing job. */ +export type JobStreamListener = (event: StreamEventLike) => void; diff --git a/packages/job-queue/src/job/JobQueueServer.ts b/packages/job-queue/src/job/JobQueueServer.ts index 3431309dd..87f5a84e4 100644 --- a/packages/job-queue/src/job/JobQueueServer.ts +++ b/packages/job-queue/src/job/JobQueueServer.ts @@ -13,6 +13,7 @@ import type { JobStorageFormat, QueueChangePayload } from "../queue-storage/IQue import { JobStatus } from "../queue-storage/IQueueStorage"; import type { DeadLetter } from "./DeadLetter"; import { Job, JobClass } from "./Job"; +import type { StreamEventLike } from "./JobQueueEventListeners"; import { JobQueueClient } from "./JobQueueClient"; import { JobQueueWorker } from "./JobQueueWorker"; import { classToStorage, storageToClass } from "./JobStorageConverters"; @@ -49,6 +50,7 @@ export type JobQueueServerEventListeners = { message: string, details: Record | null ) => void; + job_stream: (queueName: string, jobId: unknown, event: StreamEventLike) => void; }; export type JobQueueServerEvents = keyof JobQueueServerEventListeners; @@ -484,6 +486,11 @@ export class JobQueueServer< this.forwardToClients("handleJobProgress", jobId, progress, message, details); }); + worker.on("job_stream", (jobId, event) => { + this.events.emit("job_stream", this.queueName, jobId, event); + this.forwardToClients("handleJobStream", jobId, event); + }); + return worker; } @@ -507,6 +514,11 @@ export class JobQueueServer< message: string, details: Record | null ): void; + protected forwardToClients( + method: "handleJobStream", + jobId: unknown, + event: StreamEventLike + ): void; protected forwardToClients(method: string, ...args: unknown[]): void { for (const client of this.clients) { const fn = (client as any)[method]; diff --git a/packages/job-queue/src/job/JobQueueWorker.ts b/packages/job-queue/src/job/JobQueueWorker.ts index 71946da26..8f97e945d 100644 --- a/packages/job-queue/src/job/JobQueueWorker.ts +++ b/packages/job-queue/src/job/JobQueueWorker.ts @@ -31,6 +31,7 @@ import { RetryableJobError, } from "./JobError"; import { withJobErrorDiagnostics } from "./JobErrorDiagnostics"; +import type { StreamEventLike } from "./JobQueueEventListeners"; import { classToStorage, storageToClass } from "./JobStorageConverters"; /** @@ -56,6 +57,7 @@ export type JobQueueWorkerEventListeners = { message: string, details: Record | null ) => void; + job_stream: (jobId: unknown, event: StreamEventLike) => void; worker_start: () => void; worker_stop: () => void; }; @@ -814,6 +816,7 @@ export class JobQueueWorker< return await job.execute(job.input, { signal, updateProgress: this.updateProgress.bind(this, job.id), + emitStreamEvent: (event) => this.emitStreamEvent(job.id, event), }); } @@ -835,6 +838,17 @@ export class JobQueueWorker< this.events.emit("job_progress", jobId, progress, message, details); } + /** + * Emit a cross-process stream event for a job. + * + * Mirrors {@link updateProgress}: stream events are delivered in-memory via + * the `job_stream` event and forwarded by an attached `JobQueueServer` to + * subscribed clients. Storage is not touched. + */ + protected emitStreamEvent(jobId: unknown, event: StreamEventLike): void { + this.events.emit("job_stream", jobId, event); + } + /** Internal — resolve the active claim for a job id, throw if missing. */ private getClaim(jobId: unknown): IClaim> | undefined { return this.activeClaims.get(jobId); diff --git a/packages/test/src/test/job-queue/JobQueueStream.test.ts b/packages/test/src/test/job-queue/JobQueueStream.test.ts new file mode 100644 index 000000000..9523d94f5 --- /dev/null +++ b/packages/test/src/test/job-queue/JobQueueStream.test.ts @@ -0,0 +1,111 @@ +/** + * @license + * Copyright 2026 Steven Roussey + * SPDX-License-Identifier: Apache-2.0 + */ + +import type { IJobExecuteContext, StreamEventLike } from "@workglow/job-queue"; +import { + InMemoryQueueStorage, + Job, + JobQueueClient, + JobQueueServer, + wrapQueueStorage, +} from "@workglow/job-queue"; +import { uuid4 } from "@workglow/util"; +import { afterEach, beforeEach, describe, expect, it } from "vitest"; + +interface SInput { + readonly [key: string]: unknown; +} +interface SOutput { + readonly ok: true; + readonly [key: string]: unknown; +} + +/** + * A job that emits a few stream events during execution via the OPTIONAL + * `emitStreamEvent` context hook, then returns a result. Two ordered + * `binary-delta` chunks followed by a `finish` exercise both binary payload + * delivery and ordering across the same-process server-attached channel. + */ +class StreamEmittingJob extends Job { + public override async execute(_input: SInput, context: IJobExecuteContext): Promise { + context.emitStreamEvent?.({ + type: "binary-delta", + port: "bytes", + binaryDelta: new Uint8Array([1, 2]), + }); + context.emitStreamEvent?.({ + type: "binary-delta", + port: "bytes", + binaryDelta: new Uint8Array([3]), + }); + context.emitStreamEvent?.({ type: "finish", data: {} }); + return { ok: true }; + } +} + +// Same-process server-attached harness, mirroring genericJobQueueTests.ts: +// InMemory queue storage + JobQueueServer + JobQueueClient, with the client +// attached to the server (`client.attach(server)`) so the client's `this.server` +// is set and `JobHandle.onStream` is present. These same-process queue tests run +// unconditionally in the repo (see InMemoryJobQueue.test.ts), so this suite is +// not gated behind any RUN_QUEUE_TESTS flag. +describe("job-queue stream delivery (same-process)", () => { + let server: JobQueueServer; + let client: JobQueueClient; + let storage: InMemoryQueueStorage; + let queueName: string; + + beforeEach(async () => { + queueName = `test-stream-${uuid4()}`; + storage = new InMemoryQueueStorage(queueName); + await storage.migrate(); + + const { messageQueue, jobStore } = wrapQueueStorage(storage); + server = new JobQueueServer(StreamEmittingJob, { + messageQueue, + jobStore, + queueName, + pollIntervalMs: 1, + stopTimeoutMs: 0, + }); + client = new JobQueueClient({ messageQueue, jobStore, queueName }); + // Attach for same-process optimization → sets client.server → enables onStream. + client.attach(server); + }); + + afterEach(async () => { + if (server) await server.stop(); + if (storage) await storage.deleteAll(); + }); + + it("delivers stream events in order via handle.onStream", async () => { + await server.start(); + + const handle = await client.send({ taskType: "stream" }); + + // onStream is present only on a server-attached handle (capability gate). + expect(typeof handle.onStream).toBe("function"); + + const received: StreamEventLike[] = []; + const cleanup = handle.onStream!((event) => { + received.push(event); + }); + + const output = await handle.waitFor(); + cleanup(); + + expect(output).toEqual({ ok: true }); + + // Events arrived in emission order. + expect(received.map((e) => e.type)).toEqual(["binary-delta", "binary-delta", "finish"]); + + // Binary payloads preserved byte-for-byte across the channel. + const firstBytes = received[0].binaryDelta as Uint8Array; + const secondBytes = received[1].binaryDelta as Uint8Array; + expect(Array.from(firstBytes)).toEqual([1, 2]); + expect(Array.from(secondBytes)).toEqual([3]); + }); +}); diff --git a/packages/test/src/test/job-queue/JobQueueStreamWorker.integration.test.ts b/packages/test/src/test/job-queue/JobQueueStreamWorker.integration.test.ts new file mode 100644 index 000000000..5e5a26d9c --- /dev/null +++ b/packages/test/src/test/job-queue/JobQueueStreamWorker.integration.test.ts @@ -0,0 +1,93 @@ +/** + * @license + * Copyright 2026 Steven Roussey + * SPDX-License-Identifier: Apache-2.0 + */ + +import { Worker } from "node:worker_threads"; +import { afterEach, describe, expect, it } from "vitest"; + +/** + * Node primitive validation: structured-clone + transferable buffers across a + * `worker_threads` boundary. Navigational marker for a future cross-thread + * queue host — NOT a test of current `@workglow/job-queue` behavior. + * + * SCOPE — read before changing this test: today, `@workglow/job-queue`'s own + * stream channel (`IJobExecuteContext.emitStreamEvent` → worker `job_stream` + * event → `JobQueueServer.forwardToClients("handleJobStream", …)` → + * `JobQueueClient` → `JobHandle.onStream`) is entirely SAME-PROCESS: the + * `JobQueueWorker` runs in-process inside `JobQueueServer`, and + * `forwardToClients` invokes attached-client methods directly (no postMessage, + * no worker thread, no transferables). Cross-PROCESS coordination is handled + * by the message-queue storage layer via `IMessageQueue.subscribeToChanges` + * with serialized rows — also not a transferables path. There is no + * `WorkerManager`-hosted queue transport anywhere in the package. The actual + * same-process delivery path is proven by JobQueueStream.test.ts. + * + * This test therefore exercises the underlying Node primitive that a future + * `WorkerServer`-hosted queue would have to rely on: binary chunks emitted + * from a worker thread can be TRANSFERRED (not copied) to the host via + * `postMessage`, which is what `WorkerServerBase.extractTransferables` + * (packages/util/src/worker/WorkerServerBase.ts ~line 30) walks payloads to + * arrange. Note that `WorkerServerBase` currently applies that walk only in + * `postResult` (terminal complete message), not in `postStreamChunk` — so + * even on that boundary, incremental chunks are structure-cloned today; this + * test validates that the transfer semantics work for the binary-delta payload + * shape if anyone later wires them up. The worker emits two `binary-delta` + * events across the thread boundary (see jobQueueStreamWorker.fixture.mjs); + * the host receives the full byte sequence in order, and the worker's + * transferred views detach (`byteLength` becomes 0). Run under Node (vitest's + * default pool); bun's worker_threads does not detach transferred buffers, + * which is why this is an `.integration` test executed under the Node ABI per + * libs/.claude/CLAUDE.md. + */ +describe("worker_threads transfer mechanism — payload validation for a future cross-thread queue host (not current job-queue code)", () => { + let worker: Worker | undefined; + + afterEach(async () => { + if (worker) { + await worker.terminate(); + worker = undefined; + } + }); + + it("host receives the full byte sequence in order; worker buffers detach", async () => { + const fixtureUrl = new URL("./jobQueueStreamWorker.fixture.mjs", import.meta.url); + worker = new Worker(fixtureUrl); + + const received: Array<{ type: string; port?: string; binaryDelta?: Uint8Array }> = []; + + const done = await new Promise<{ firstByteLength: number; secondByteLength: number }>( + (resolve, reject) => { + worker!.on("error", reject); + worker!.on("message", (msg: Record) => { + // The host plays the role of `JobHandle.onStream` listener: collect + // every stream event the worker emits across the thread boundary. + if (msg.type === "binary-delta" || msg.type === "finish") { + received.push(msg as { type: string; port?: string; binaryDelta?: Uint8Array }); + } else if (msg.type === "done") { + resolve(msg as { firstByteLength: number; secondByteLength: number }); + } + }); + worker!.postMessage("start"); + } + ); + + // Events arrived in emission order across the thread boundary. + expect(received.map((e) => e.type)).toEqual(["binary-delta", "binary-delta", "finish"]); + + // Host received the full byte sequence in order across the two events. + const hostBytes = received + .filter((e) => e.type === "binary-delta") + .flatMap((e) => Array.from(e.binaryDelta as Uint8Array)); + expect(hostBytes).toEqual([1, 2, 3]); + + // Detachment: the worker transferred (did not copy) each chunk buffer, so + // its own views are now detached (byteLength === 0). This is the + // `WorkerServerBase.extractTransferables` behavior. Asserted under Node, + // which detaches transferred buffers per the structured-clone transfer + // semantics the design depends on. + expect(done.firstByteLength).toBe(0); + expect(done.secondByteLength).toBe(0); + }); +}); diff --git a/packages/test/src/test/job-queue/jobQueueStreamWorker.fixture.mjs b/packages/test/src/test/job-queue/jobQueueStreamWorker.fixture.mjs new file mode 100644 index 000000000..2b031a718 --- /dev/null +++ b/packages/test/src/test/job-queue/jobQueueStreamWorker.fixture.mjs @@ -0,0 +1,53 @@ +/** + * @license + * Copyright 2026 Steven Roussey + * SPDX-License-Identifier: Apache-2.0 + */ + +/** + * Worker-thread fixture for JobQueueStreamWorker.integration.test.ts. + * + * Simulates a job executing inside a real worker thread that emits two ordered + * `binary-delta` stream events. Each chunk is posted to the host with the + * chunk's underlying `ArrayBuffer` in the transfer list — the exact mechanism + * `WorkerServerBase.extractTransferables` (packages/util/src/worker/WorkerServerBase.ts) + * applies automatically when a TypedArray crosses a worker boundary. The + * transfer (rather than copy) detaches the worker's view of the buffer, which + * the host asserts via the reported `byteLength` values in the terminal "done" + * message (a stand-in for the job result the job would otherwise return). + * + * Plain `.mjs` (not `.ts`) so it can be launched directly by `worker_threads` + * under the Node runtime vitest uses, without a TypeScript transform step. + */ + +import { parentPort } from "node:worker_threads"; + +if (!parentPort) { + throw new Error("jobQueueStreamWorker.fixture.mjs must run as a worker thread"); +} + +parentPort.on("message", () => { + // Two ordered binary-delta chunks, then a finish — mirrors a job calling + // ctx.emitStreamEvent?.(...) during execution. + const first = new Uint8Array([1, 2]); + const second = new Uint8Array([3]); + + parentPort.postMessage( + { type: "binary-delta", port: "bytes", binaryDelta: first }, + [first.buffer] + ); + parentPort.postMessage( + { type: "binary-delta", port: "bytes", binaryDelta: second }, + [second.buffer] + ); + parentPort.postMessage({ type: "finish", data: {} }); + + // Report the worker-side byteLength of the retained chunk views AFTER they + // were transferred. A genuine transfer detaches the underlying buffer, so + // these are 0 under Node. (Stands in for the job result.) + parentPort.postMessage({ + type: "done", + firstByteLength: first.byteLength, + secondByteLength: second.byteLength, + }); +});