Add binary stream support to task-graph and job-queue #545
Open
sroussey wants to merge 2 commits into
dac4185 to afe3943
Spec 1 — binary-delta streaming framework
-----------------------------------------
Adds a `binary-delta` variant to `StreamEvent` (analogous to `text-delta` /
`object-delta`) plus an `x-stream: "binary"` annotation on output port
schemas, so a task can `executeStream` byte chunks the same way it streams
text or structured objects. New port helpers (`getBinaryPortId`,
`getBinaryPortFormat`, `getStreamingPorts`), a `materializeBinary`
assembler (Blob for `format: "blob"`/absent, ArrayBuffer for
`format: "binary"`), and a `getOutputStreamMode` adopter let downstream
code branch cleanly on binary mode without reaching for `any`.
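
As a rough illustration of the assembler described above, the sketch below concatenates ordered `Uint8Array` chunks and picks the container from the port's `format`. The names `materializeBinarySketch`, `concatChunksSketch`, and `BinaryFormatSketch` are hypothetical stand-ins; the PR's actual `materializeBinary` signature may differ.

```typescript
// Hypothetical stand-in for the PR's `materializeBinary`; not the real signature.
type BinaryFormatSketch = "blob" | "binary" | undefined;

// Copy every chunk, in order, into one freshly allocated buffer.
function concatChunksSketch(chunks: readonly Uint8Array[]): ArrayBuffer {
  const total = chunks.reduce((sum, chunk) => sum + chunk.byteLength, 0);
  const out = new Uint8Array(total);
  let offset = 0;
  for (const chunk of chunks) {
    out.set(chunk, offset);
    offset += chunk.byteLength;
  }
  // `out` owns a buffer of exactly `total` bytes, so handing it back is safe.
  return out.buffer as ArrayBuffer;
}

function materializeBinarySketch(
  chunks: readonly Uint8Array[],
  format: BinaryFormatSketch
): Blob | ArrayBuffer {
  const buffer = concatChunksSketch(chunks);
  // format "binary" yields an ArrayBuffer; "blob" or an absent format yields a Blob.
  return format === "binary" ? buffer : new Blob([buffer]);
}
```

Callers can then branch on `instanceof Blob` versus `instanceof ArrayBuffer` rather than falling back to `any`.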
`StreamProcessor` accumulates `binary-delta` chunks per port and merges
them into the enriched finish event so downstream dataflows see the
materialized payload. If the finish event already carries an explicit
binary payload for a port, that artifact wins, per Spec 1's precedence rule.
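
The accumulation and precedence rule can be sketched roughly as follows. The event shapes here (`port`, `chunk`, `data`) are simplified assumptions for illustration, not the PR's actual `StreamEvent` fields, and the merged bytes are returned as a plain `ArrayBuffer` for brevity.

```typescript
// Hypothetical, simplified event shapes; the real `StreamEvent` union is richer.
type SketchEvent =
  | { type: "binary-delta"; port: string; chunk: Uint8Array }
  | { type: "finish"; data: Record<string, unknown> };

function concatSketch(chunks: readonly Uint8Array[]): ArrayBuffer {
  const out = new Uint8Array(chunks.reduce((n, c) => n + c.byteLength, 0));
  let offset = 0;
  for (const chunk of chunks) {
    out.set(chunk, offset);
    offset += chunk.byteLength;
  }
  return out.buffer as ArrayBuffer;
}

function enrichFinishSketch(events: readonly SketchEvent[]): Record<string, unknown> {
  const chunksByPort = new Map<string, Uint8Array[]>();
  let finishData: Record<string, unknown> = {};
  for (const event of events) {
    if (event.type === "binary-delta") {
      // Accumulate chunks per port, preserving arrival order.
      const list = chunksByPort.get(event.port) ?? [];
      list.push(event.chunk);
      chunksByPort.set(event.port, list);
    } else {
      finishData = event.data;
    }
  }
  const enriched: Record<string, unknown> = { ...finishData };
  chunksByPort.forEach((chunks, port) => {
    // Precedence: an explicit payload on the finish event wins over accumulated chunks.
    if (enriched[port] !== undefined) return;
    enriched[port] = concatSketch(chunks);
  });
  return enriched;
}
```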
StreamPump adds the graph-aware decision (`canStreamBinaryToCache`,
`anyConsumerNeedsMaterialized`) and the `pipeBinaryToCache` assembly
helper that turns a task's `binary-delta` events into an `AsyncIterable`
ready to drive a streaming cache sink.
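
One way to picture the assembly step is a small push-to-pull bridge: chunks are pushed in as `binary-delta` events arrive, and a streaming sink pulls them out through an `AsyncIterable`. The sketch below is an assumption about the general technique, not the PR's `pipeBinaryToCache` code; `createChunkChannelSketch` and `countBytesSketch` are hypothetical names.

```typescript
// Hypothetical helper: bridges push-style chunk delivery to a pull-style AsyncIterable.
function createChunkChannelSketch() {
  const buffered: Uint8Array[] = [];
  let wake: (() => void) | undefined;
  let done = false;

  // Wake the consumer if it is currently parked waiting for data.
  const notify = () => {
    wake?.();
    wake = undefined;
  };

  async function* iterate(): AsyncGenerator<Uint8Array, void, void> {
    while (true) {
      // Drain everything that arrived while the consumer was busy.
      while (buffered.length > 0) yield buffered.shift()!;
      if (done) return;
      // Park until the producer pushes a chunk or ends the stream.
      await new Promise<void>((resolve) => {
        wake = resolve;
      });
    }
  }

  return {
    push(chunk: Uint8Array): void {
      buffered.push(chunk);
      notify();
    },
    end(): void {
      done = true;
      notify();
    },
    chunks: iterate(),
  };
}

// Stand-in for a streaming cache sink: consumes chunks without holding them all.
async function countBytesSketch(chunks: AsyncIterable<Uint8Array>): Promise<number> {
  let size = 0;
  for await (const chunk of chunks) size += chunk.byteLength;
  return size;
}
```

A producer would call `push` once per `binary-delta` event and `end` on finish, while the sink's promise resolves once the iterable is exhausted.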
`TaskOutputRepository` gains an optional `saveOutputStream` sink so
file-backed (or other stream-capable) caches can ingest bytes without
materializing the full payload; `supportsStreaming()` and the
`RunPrivateCacheRepo` wrapper forward the capability correctly.
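
A minimal sketch of how a wrapper might forward an optional streaming sink while reporting the backing store's real capability. The interface, the class name, the `Promise<void>` return type, and the `runId:key` namespacing scheme are all assumptions made for illustration.

```typescript
// Hypothetical, trimmed-down repository shape; only the streaming sink is modeled.
interface OutputRepoSketch {
  saveOutputStream?: (key: string, chunks: AsyncIterable<Uint8Array>) => Promise<void>;
}

class RunPrivateWrapperSketch implements OutputRepoSketch {
  saveOutputStream?: OutputRepoSketch["saveOutputStream"];

  constructor(backing: OutputRepoSketch, runId: string) {
    const sink = backing.saveOutputStream;
    // Mirror the backing's capability: the property stays undefined when the
    // backing has no sink, so `typeof === "function"` probes see the truth.
    this.saveOutputStream =
      typeof sink === "function"
        ? (key, chunks) => sink.call(backing, `${runId}:${key}`, chunks)
        : undefined;
  }
}

// Capability probe in the spirit of `supportsStreaming()`.
function supportsStreamingSketch(repo: OutputRepoSketch): boolean {
  return typeof repo.saveOutputStream === "function";
}
```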
Spec 2 — result-as-reference
----------------------------
Builds on Spec 1 to close the queue-row-bloat hole: when the cache backing
supports streaming, the runner pipes the binary bytes straight to the
cache and places a `CacheRef` placeholder in `Output` at the port slot.
Downstream `Output` consumers (and the queue row) see a small envelope
(`{ $ref, size?, mime? }`) instead of the full payload, while the bytes
live in the cache for hydration on demand.
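
For illustration, the envelope and a matching type guard could look roughly like this. The field types are assumed from the `{ $ref, size?, mime? }` shape above; the real `CacheRef` and `isCacheRef` in `cache/CacheRef.ts` may be stricter or carry more fields.

```typescript
// Assumed envelope shape, inferred from the `{ $ref, size?, mime? }` description.
interface CacheRefSketch {
  $ref: string;
  size?: number;
  mime?: string;
}

function isCacheRefSketch(value: unknown): value is CacheRefSketch {
  if (typeof value !== "object" || value === null) return false;
  const candidate = value as Record<string, unknown>;
  return (
    typeof candidate["$ref"] === "string" &&
    (candidate["size"] === undefined || typeof candidate["size"] === "number") &&
    (candidate["mime"] === undefined || typeof candidate["mime"] === "string")
  );
}
```

A purely structural check like this cannot tell a cache reference from an unrelated field that happens to hold `{ $ref: string }`, which is why restricting lookups by port schema matters.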
Pieces:
- `CacheRef` type + `isCacheRef` type guard (`cache/CacheRef.ts`).
- `resolveOutput` walker (`cache/resolveRef.ts`) — pure recursive walker
that hydrates refs through a caller-supplied resolver. Identity is
preserved when no descendant matches the optional filter; class
instances (`Error`, `URL`, custom classes) survive with prototype
intact; `Map`/`Set` are walked through so nested refs resolve; opaque
leaves are `Blob`/`ArrayBuffer`/`TypedArray`/`Date`/`RegExp`/`Promise`.
- `resolveJobOutput` queue-boundary bridge (`cache/resolveJobOutput.ts`)
accepting either a `CacheRefResolver` function or any object exposing
`getOutputByRef` (`TaskOutputRepository` shape).
- `IRunConfig.referenceThresholdBytes` (default 64 KiB; `0` forces ref
for every binary output).
- `TaskOutputRepository.saveOutputStream` now returns `Promise<CacheRef>`;
new `getOutputByRef` / `getOutputStreamByRef` readers complete the
contract.
- `CacheCoordinator.getBinaryRefSinksByPolicy` derives a per-port
`BinaryRefSink` map; `hydrateRefsBelowThreshold` rehydrates refs whose
committed size falls below the configured threshold (schema-restricted
to binary streaming ports so legitimate `{$ref: string}` fields in
non-binary slots are not mistakenly looked up in the cache).
- `StreamProcessor` routes `binary-delta` chunks to a `BinaryRefSink`
via a small `BinaryStreamRouter` producer-consumer pump.
- `TaskRunner` reads the threshold, builds sinks, threads them through
`StreamProcessor`, and rehydrates below-threshold refs in the post-run
pass; `saveByPolicy` then writes the small ref-bearing `Output`.
- StreamProcessor TEES when both an accumulator and a router exist for
a port (graph context where the cache can stream AND a downstream
edge needs materialized bytes): the emitted finish event carries the
materialized Blob/ArrayBuffer for edge consumers; `finalOutput`
carries the CacheRef so the queue/cache row stays small.
- `RunPrivateCacheRepo` forwards all three new optional methods,
mirroring the backing's true capability on the wrapper instance
(assigning `undefined` when the backing lacks them) so callers
probing `typeof === "function"` see the truth.
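
The identity-preserving walk described for `resolveOutput` can be sketched as below. This is a deliberately reduced illustration: it only descends into arrays and plain objects, treats everything else (including `Map`, `Set`, and class instances, which the PR's walker handles) as an opaque leaf, and has no filter or concurrency bound. All names are hypothetical.

```typescript
type RefSketch = { $ref: string };
type ResolverSketch = (ref: RefSketch) => Promise<unknown>;

const isRefSketch = (v: unknown): v is RefSketch =>
  typeof v === "object" &&
  v !== null &&
  typeof (v as Record<string, unknown>)["$ref"] === "string";

const isPlainObjectSketch = (v: unknown): v is Record<string, unknown> => {
  if (typeof v !== "object" || v === null) return false;
  const proto = Object.getPrototypeOf(v);
  return proto === Object.prototype || proto === null;
};

async function resolveRefsSketch(value: unknown, resolve: ResolverSketch): Promise<unknown> {
  if (isRefSketch(value)) return resolve(value);
  if (Array.isArray(value)) {
    const next = await Promise.all(value.map((item) => resolveRefsSketch(item, resolve)));
    // Identity preservation: hand back the original array when nothing changed.
    return next.some((item, i) => item !== value[i]) ? next : value;
  }
  if (isPlainObjectSketch(value)) {
    let changed = false;
    const next: Record<string, unknown> = {};
    for (const [key, child] of Object.entries(value)) {
      const resolved = await resolveRefsSketch(child, resolve);
      if (resolved !== child) changed = true;
      next[key] = resolved;
    }
    // Only allocate a new object when at least one descendant was hydrated.
    return changed ? next : value;
  }
  // Opaque leaf in this sketch: returned untouched.
  return value;
}
```

Returning the original reference when no descendant matched lets callers use a cheap `===` check to detect whether any hydration happened.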
Tests cover binary-delta accumulation + explicit-finish-payload
precedence, port helpers, cache decision + assembly, runner pipe + force-ref +
threshold rehydrate, tee for the graph + materializing-consumer case,
saved-row size + cross-process serialization round-trip + dangling-ref
best-effort, and the walker / `resolveOutput` / `resolveJobOutput`
surface (class instances, Map/Set, sparse-ref filter, concurrency bound,
identity preservation).
Adds a same-process channel so a holder of a `JobHandle` can subscribe
to a running job's stream events (text deltas, object deltas,
binary-delta chunks, snapshot, finish, error, phase) instead of only
the terminal result.
Worker side
-----------
- `IJobExecuteContext` gains an optional `emitStreamEvent(event)` method.
- `JobQueueWorker` plumbs a per-job event emitter through into the
execute context so a run-fn can call `ctx.emitStreamEvent(...)` to
publish stream chunks as they're produced.
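
A run-fn using the optional hook might look roughly like this. The event and context shapes below are simplified assumptions; only the idea of an optional `emitStreamEvent` on the execute context comes from the description above.

```typescript
// Hypothetical, minimal shapes for illustration only.
type StreamEventSketch =
  | { type: "text-delta"; textDelta: string }
  | { type: "finish" };

interface ExecuteContextSketch {
  emitStreamEvent?: (event: StreamEventSketch) => void;
}

function runFnSketch(ctx: ExecuteContextSketch): string {
  let text = "";
  for (const piece of ["Hel", "lo"]) {
    text += piece;
    // Optional call: contexts without the hook simply skip emission.
    ctx.emitStreamEvent?.({ type: "text-delta", textDelta: piece });
  }
  ctx.emitStreamEvent?.({ type: "finish" });
  // The terminal result is still returned as before.
  return text;
}
```

Because the method is optional, the same run-fn works unchanged under a worker that does not plumb an emitter through.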
Server side
-----------
- `JobQueueServer.forwardToClients("handleJobStream", jobId, event)`
fans the event to every attached client by direct method invocation —
pure in-memory, no `postMessage`, no serialization, no worker thread.
The channel is intentionally same-process only; storage-backed cross-
process clients see state transitions through `subscribeToChanges`
but receive no incremental stream events.
Client side
-----------
- `JobHandle.onStream(callback)` is exposed only when the client is
server-attached (`this.server` set); callers branch on
`typeof handle.onStream === "function"`.
- Each listener invocation is wrapped in try/catch so one throwing
subscriber does not abort delivery to the rest or break the dispatch.
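
The listener isolation and the conditional `onStream` can be sketched as follows. The class and function names are hypothetical, and the unsubscribe function returned by `subscribe` is an assumption rather than something the description confirms.

```typescript
type ListenerSketch<E> = (event: E) => void;

class StreamListenersSketch<E> {
  private readonly listeners = new Set<ListenerSketch<E>>();
  // Errors thrown by listeners are collected here instead of propagating.
  readonly errors: unknown[] = [];

  subscribe(listener: ListenerSketch<E>): () => void {
    this.listeners.add(listener);
    return () => {
      this.listeners.delete(listener);
    };
  }

  dispatch(event: E): void {
    this.listeners.forEach((listener) => {
      try {
        listener(event);
      } catch (err) {
        // One throwing subscriber must not stop delivery to the others.
        this.errors.push(err);
      }
    });
  }
}

interface HandleSketch<E> {
  onStream?: (callback: ListenerSketch<E>) => () => void;
}

// `onStream` exists only when a same-process listener registry is available.
function makeHandleSketch<E>(listeners: StreamListenersSketch<E> | undefined): HandleSketch<E> {
  return listeners ? { onStream: (callback) => listeners.subscribe(callback) } : {};
}
```

Callers would then guard with `typeof handle.onStream === "function"` before subscribing.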
Tests
-----
- `JobQueueStream.test.ts` proves end-to-end same-process delivery: a
worker's `emitStreamEvent` calls reach every `JobHandle.onStream`
listener in order.
- `JobQueueStreamWorker.integration.test.ts` (+ its `.fixture.mjs`)
validates the underlying Node `worker_threads` transfer mechanism
the design relies on for any future cross-thread queue host: binary
chunks emitted from a worker thread transfer (not copy) to the host
per `WorkerServerBase.extractTransferables`. The docblock spells
out that this is a Node-primitive validation, NOT a test of the
current package's behavior — today's queue channel is entirely
same-process and the test exists as a navigational marker for a
future hosted-in-thread variant.
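
The transfer-versus-copy distinction can be observed without spawning a thread. The sketch below uses the global `structuredClone` with a `transfer` list (assumed available, for example Node 17+), which follows the same structured-clone transfer semantics as `postMessage`; it is a same-thread stand-in, not the integration test itself. `collectTransferablesSketch` is a hypothetical helper and does not reproduce `WorkerServerBase.extractTransferables`.

```typescript
// Hypothetical helper: gathers the distinct ArrayBuffers backing a set of chunks.
function collectTransferablesSketch(chunks: readonly Uint8Array[]): ArrayBuffer[] {
  const seen = new Set<ArrayBuffer>();
  for (const chunk of chunks) {
    // SharedArrayBuffer is not transferable, so only plain ArrayBuffers are kept.
    if (chunk.buffer instanceof ArrayBuffer) seen.add(chunk.buffer);
  }
  return Array.from(seen);
}

function transferDemoSketch(): { senderBytes: number; receiverBytes: number } {
  const chunk = new Uint8Array([1, 2, 3]);
  const moved = structuredClone(chunk, {
    transfer: collectTransferablesSketch([chunk]),
  });
  // A transferred buffer is detached on the sending side, so the original
  // view reports zero bytes while the receiver sees the full payload.
  return { senderBytes: chunk.byteLength, receiverBytes: moved.byteLength };
}
```

A copy would leave the sender's view intact; the detached sender-side buffer is the observable sign that ownership moved.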
afe3943 to 184128f
Summary
Implements binary streaming support across the task-graph and job-queue packages, enabling efficient handling of large binary outputs (files, images, etc.) without materializing them into memory. Introduces a new `"binary"` stream mode alongside the existing `"append"`, `"replace"`, and `"object"` modes, with intelligent accumulation decisions and cache-streaming optimization.

Key Changes

Core Types & Helpers (`packages/task-graph/src/task/StreamTypes.ts`)
- `"binary"` to `StreamMode` type union
- `StreamBinaryDelta` type for ordered byte chunks (`Uint8Array`)
- `materializeBinary()` helper to concatenate chunks into `Blob` or `ArrayBuffer` based on schema `format`
- `getPortStreamMode()` and `getStreamingPorts()` to recognize binary ports
- `getBinaryPortId()` to locate the first binary port in a schema
- `edgeNeedsAccumulation()` to determine if a binary→non-binary edge requires materialization

StreamPump Enhancements (`packages/task-graph/src/task-graph/StreamPump.ts`)
- `canStreamBinaryToCache()`: static decision method (unit-testable in isolation) that returns `true` when [...] (`supportsStreaming() === true`)
- `pipeBinaryToCache()`: assembles `binary-delta` events into an `AsyncIterable<Uint8Array>` and pipes to the cache's `saveOutputStream()`, returning `{ promise, detach }` for lifecycle management
- `taskNeedsAccumulation()` to skip accumulation when `canStreamBinaryToCache()` returns `true`, enabling direct cache ingestion without buffering

StreamProcessor Binary Accumulation (`packages/task-graph/src/task/StreamProcessor.ts`)
- `accumulatedBinary` map to collect `binary-delta` chunks during streaming
- On the `finish` event, materializes accumulated binary chunks into `Blob` or `ArrayBuffer` per schema `format`

Cache & Repository Updates
- `TaskOutputRepository`: added optional `saveOutputStream()` method for streaming sinks; `supportsStreaming()` reflects presence of this method
- `RunPrivateCacheRepo`: forwards `saveOutputStream()` calls to the backing repository with run-ID namespacing
- `CacheCoordinator`: added `saveByStream()` method (stub for Spec 2 integration with `TaskRunner`)

Job Queue Stream Support (`packages/job-queue/src/job/`)
- `JobQueueEventListeners`: added `job_stream` event type and `JobStreamListener` callback
- `JobQueueClient`: added `onJobStream()` subscription method and `jobStreamListeners` map; `JobHandle` now has an optional `onStream()` method (present only on server-attached handles)
- `JobQueueWorker`: forwards `emitStreamEvent()` calls from the job context to the server's `job_stream` event
- `JobQueueServer`: broadcasts `job_stream` events to attached clients via `forwardToClients()`
- `Job`: added `emitStreamEvent()` context hook for jobs to emit stream events during execution

Test Coverage

Binary Stream Tests (`packages/test/src/test/task-graph/`)
- `StreamBinaryPump.test.ts`: comprehensive suite covering:
  - `canStreamBinaryToCache()` returns `true`/`false` in isolation
  - `pipeBinaryToCache()` feeds chunks to cache and resolves on stream end

https://claude.ai/code/session_01EQqii18C8KWqix8fayyL7X