Skip to content

Commit a6439b4

Browse files
committed
squash
1 parent f52a71b commit a6439b4

7 files changed

Lines changed: 166 additions & 31 deletions

File tree

apps/engine/src/lib/ndjson.ts

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,17 +11,25 @@ export async function* parseNdjson<T = unknown>(text: string): AsyncIterable<T>
1111
/** Serialize an AsyncIterable as a streaming NDJSON ReadableStream. */
1212
export function toNdjsonStream(iter: AsyncIterable<unknown>): ReadableStream<Uint8Array> {
1313
const enc = new TextEncoder()
14+
const iterator = iter[Symbol.asyncIterator]()
1415
return new ReadableStream({
1516
async start(controller) {
1617
try {
17-
for await (const item of iter) {
18-
controller.enqueue(enc.encode(JSON.stringify(item) + '\n'))
18+
while (true) {
19+
const result = await iterator.next()
20+
if (result.done) break
21+
controller.enqueue(enc.encode(JSON.stringify(result.value) + '\n'))
1922
}
2023
controller.close()
2124
} catch (err) {
2225
controller.error(err)
26+
} finally {
27+
iterator.return?.()
2328
}
2429
},
30+
cancel() {
31+
iterator.return?.()
32+
},
2533
})
2634
}
2735

apps/engine/src/lib/remote-engine.ts

Lines changed: 35 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -139,22 +139,38 @@ export function createRemoteEngine(engineUrl: string): Engine {
139139
params: { header: { 'x-source': JSON.stringify(source) } },
140140
})
141141
if (!response.ok) throw new Error(`source_discover failed: ${response.status}`)
142-
yield* parseNdjsonStream<DiscoverOutput>(response.body!)
142+
try {
143+
yield* parseNdjsonStream<DiscoverOutput>(response.body!)
144+
} finally {
145+
await response.body?.cancel().catch(() => {})
146+
}
143147
},
144148

145149
async *pipeline_check(pipeline: PipelineConfig): AsyncIterable<CheckOutput> {
146150
const res = await post('/pipeline_check', pipeline)
147-
yield* parseNdjsonStream<CheckOutput>(res.body!)
151+
try {
152+
yield* parseNdjsonStream<CheckOutput>(res.body!)
153+
} finally {
154+
await res.body?.cancel().catch(() => {})
155+
}
148156
},
149157

150158
async *pipeline_setup(pipeline: PipelineConfig): AsyncIterable<SetupOutput> {
151159
const res = await post('/pipeline_setup', pipeline)
152-
yield* parseNdjsonStream<SetupOutput>(res.body!)
160+
try {
161+
yield* parseNdjsonStream<SetupOutput>(res.body!)
162+
} finally {
163+
await res.body?.cancel().catch(() => {})
164+
}
153165
},
154166

155167
async *pipeline_teardown(pipeline: PipelineConfig): AsyncIterable<TeardownOutput> {
156168
const res = await post('/pipeline_teardown', pipeline)
157-
yield* parseNdjsonStream<TeardownOutput>(res.body!)
169+
try {
170+
yield* parseNdjsonStream<TeardownOutput>(res.body!)
171+
} finally {
172+
await res.body?.cancel().catch(() => {})
173+
}
158174
},
159175

160176
async *pipeline_read(
@@ -164,15 +180,23 @@ export function createRemoteEngine(engineUrl: string): Engine {
164180
): AsyncIterable<Message> {
165181
const body = input ? toNdjsonStream(input) : undefined
166182
const res = await post('/pipeline_read', pipeline, opts, body)
167-
yield* parseNdjsonStream<Message>(res.body!)
183+
try {
184+
yield* parseNdjsonStream<Message>(res.body!)
185+
} finally {
186+
await res.body?.cancel().catch(() => {})
187+
}
168188
},
169189

170190
async *pipeline_write(
171191
pipeline: PipelineConfig,
172192
messages: AsyncIterable<Message>
173193
): AsyncIterable<DestinationOutput> {
174194
const res = await post('/pipeline_write', pipeline, undefined, toNdjsonStream(messages))
175-
yield* parseNdjsonStream<DestinationOutput>(res.body!)
195+
try {
196+
yield* parseNdjsonStream<DestinationOutput>(res.body!)
197+
} finally {
198+
await res.body?.cancel().catch(() => {})
199+
}
176200
},
177201

178202
async *pipeline_sync(
@@ -182,7 +206,11 @@ export function createRemoteEngine(engineUrl: string): Engine {
182206
): AsyncIterable<SyncOutput> {
183207
const body = input ? toNdjsonStream(input) : undefined
184208
const res = await post('/pipeline_sync', pipeline, opts, body)
185-
yield* parseNdjsonStream<SyncOutput>(res.body!)
209+
try {
210+
yield* parseNdjsonStream<SyncOutput>(res.body!)
211+
} finally {
212+
await res.body?.cancel().catch(() => {})
213+
}
186214
},
187215
}
188216
}

apps/service/src/temporal/activities/_shared.ts

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -62,14 +62,12 @@ export async function drainMessages(
6262
): Promise<{
6363
errors: RunResult['errors']
6464
state: SourceState
65-
records: Message[]
6665
sourceConfig?: Record<string, unknown>
6766
destConfig?: Record<string, unknown>
6867
eof?: EofPayload
6968
}> {
7069
const errors: RunResult['errors'] = []
7170
let state: SourceState = initialState ?? { streams: {}, global: {} }
72-
const records: Message[] = []
7371
let sourceConfig: Record<string, unknown> | undefined
7472
let destConfig: Record<string, unknown> | undefined
7573
let eof: EofPayload | undefined
@@ -91,13 +89,11 @@ export async function drainMessages(
9189
errors.push(error)
9290
} else if (message.type === 'source_state') {
9391
state = mergeStateMessage(state, message)
94-
} else if (message.type === 'record') {
95-
records.push(message)
9692
}
9793
}
9894
if (count % 50 === 0) heartbeat({ messages: count })
9995
}
10096
if (count % 50 !== 0) heartbeat({ messages: count })
10197

102-
return { errors, state, records, sourceConfig, destConfig, eof }
98+
return { errors, state, sourceConfig, destConfig, eof }
10399
}

apps/service/src/temporal/workflows/pipeline-workflow.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -150,8 +150,8 @@ export async function pipelineWorkflow(
150150

151151
const result = await pipelineSync(pipelineId, {
152152
state: sourceState,
153-
state_limit: 100,
154-
time_limit: 10,
153+
state_limit: 1000,
154+
time_limit: 30,
155155
})
156156
operationCount++
157157
sourceState = result.state

packages/protocol/src/async-iterable-utils.test.ts

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,60 @@ describe('split', () => {
127127
expect(evenResult).toEqual([])
128128
expect(oddResult).toEqual([1, 3, 5])
129129
})
130+
131+
it('propagates return() to source when consumer breaks early', async () => {
132+
let sourceReturned = false
133+
async function* source() {
134+
try {
135+
yield 1
136+
yield 2
137+
yield 3
138+
} finally {
139+
sourceReturned = true
140+
}
141+
}
142+
const isEven = (n: number): n is number => n % 2 === 0
143+
const [, odds] = split(source(), isEven)
144+
const it = odds[Symbol.asyncIterator]()
145+
await it.next()
146+
await it.return!()
147+
expect(sourceReturned).toBe(true)
148+
})
149+
150+
it('propagates return() to source from the matches branch', async () => {
151+
let sourceReturned = false
152+
async function* source() {
153+
try {
154+
yield 2
155+
yield 4
156+
yield 6
157+
} finally {
158+
sourceReturned = true
159+
}
160+
}
161+
const isEven = (n: number): n is number => n % 2 === 0
162+
const [evens] = split(source(), isEven)
163+
const it = evens[Symbol.asyncIterator]()
164+
await it.next()
165+
await it.return!()
166+
expect(sourceReturned).toBe(true)
167+
})
168+
169+
it('closes sibling branch when one branch returns early', async () => {
170+
async function* source() {
171+
yield 1
172+
yield 2
173+
yield 3
174+
yield 4
175+
}
176+
const isEven = (n: number): n is number => n % 2 === 0
177+
const [evens, odds] = split(source(), isEven)
178+
const oddIt = odds[Symbol.asyncIterator]()
179+
await oddIt.next()
180+
await oddIt.return!()
181+
const remaining = await collect(evens)
182+
expect(remaining).toEqual([])
183+
})
130184
})
131185

132186
describe('map', () => {

packages/protocol/src/async-iterable-utils.ts

Lines changed: 55 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
// Pure primitives — no external deps, no engine-specific imports.
33

44
/**
5-
* Async push/pull channel. No array buffering — uses linked promise pairs.
5+
* Async push/pull channel with unbounded buffer when push outpaces pull.
66
*
77
* **Error handling:** The channel itself never throws — it is a passive data
88
* structure. Producers call `push()` and `close()`; neither can fail.
@@ -12,10 +12,12 @@
1212
export function channel<T>(): AsyncIterable<T> & {
1313
push(value: T): void
1414
close(): void
15+
onReturn?: () => void | Promise<unknown>
1516
} {
1617
let resolve: ((result: IteratorResult<T>) => void) | null = null
1718
let done = false
1819
const pending: T[] = [] // only used when push() is called before next()
20+
let onReturn: (() => void | Promise<unknown>) | undefined
1921

2022
const iter: AsyncIterableIterator<T> = {
2123
[Symbol.asyncIterator]() {
@@ -30,9 +32,20 @@ export function channel<T>(): AsyncIterable<T> & {
3032
resolve = r
3133
})
3234
},
35+
async return() {
36+
done = true
37+
pending.length = 0
38+
if (resolve) {
39+
const r = resolve
40+
resolve = null
41+
r({ value: undefined as any, done: true })
42+
}
43+
await onReturn?.()
44+
return { value: undefined as any, done: true }
45+
},
3346
}
3447

35-
return Object.assign(iter, {
48+
const result = Object.assign(iter, {
3649
push(value: T) {
3750
if (done) return
3851
if (resolve) {
@@ -52,6 +65,16 @@ export function channel<T>(): AsyncIterable<T> & {
5265
}
5366
},
5467
})
68+
Object.defineProperty(result, 'onReturn', {
69+
set(fn: (() => void | Promise<unknown>) | undefined) {
70+
onReturn = fn
71+
},
72+
get() {
73+
return onReturn
74+
},
75+
configurable: true,
76+
})
77+
return result as typeof result & { onReturn?: () => void | Promise<unknown> }
5578
}
5679

5780
/**
@@ -87,13 +110,19 @@ export async function* merge<T>(
87110
enqueue(i)
88111
}
89112

90-
while (pending.size > 0) {
91-
const { index, result } = await Promise.race(pending.values())
92-
if (result.done) {
93-
pending.delete(index)
94-
} else {
95-
yield result.value
96-
enqueue(index)
113+
try {
114+
while (pending.size > 0) {
115+
const { index, result } = await Promise.race(pending.values())
116+
if (result.done) {
117+
pending.delete(index)
118+
} else {
119+
yield result.value
120+
enqueue(index)
121+
}
122+
}
123+
} finally {
124+
for (const it of iterators) {
125+
it.return?.()
97126
}
98127
}
99128
}
@@ -115,16 +144,29 @@ export function split<T, U extends T>(
115144
iterable: AsyncIterable<T>,
116145
predicate: (item: T) => item is U
117146
): [AsyncIterable<U>, AsyncIterable<Exclude<T, U>>] {
147+
const sourceIterator = iterable[Symbol.asyncIterator]()
118148
const matches = channel<U>()
119149
const rest = channel<Exclude<T, U>>()
120150

151+
let aborted = false
152+
const abort = () => {
153+
if (aborted) return
154+
aborted = true
155+
matches.close()
156+
rest.close()
157+
return sourceIterator.return?.()
158+
}
159+
matches.onReturn = abort
160+
rest.onReturn = abort
121161
;(async () => {
122162
try {
123-
for await (const item of iterable) {
124-
if (predicate(item)) {
125-
matches.push(item)
163+
while (true) {
164+
const result = await sourceIterator.next()
165+
if (result.done) break
166+
if (predicate(result.value)) {
167+
matches.push(result.value)
126168
} else {
127-
rest.push(item as Exclude<T, U>)
169+
rest.push(result.value as Exclude<T, U>)
128170
}
129171
}
130172
} finally {

packages/ts-cli/src/ndjson.ts

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,21 +23,28 @@ export function ndjsonResponse<T>(
2323
onError?: (err: unknown) => T
2424
): Response {
2525
const encoder = new TextEncoder()
26+
const iterator = iterable[Symbol.asyncIterator]()
2627

2728
const stream = new ReadableStream({
2829
async start(controller) {
2930
try {
30-
for await (const item of iterable) {
31-
controller.enqueue(encoder.encode(JSON.stringify(item) + '\n'))
31+
while (true) {
32+
const result = await iterator.next()
33+
if (result.done) break
34+
controller.enqueue(encoder.encode(JSON.stringify(result.value) + '\n'))
3235
}
3336
} catch (err) {
3437
if (onError) {
3538
controller.enqueue(encoder.encode(JSON.stringify(onError(err)) + '\n'))
3639
}
3740
} finally {
41+
iterator.return?.()
3842
controller.close()
3943
}
4044
},
45+
cancel() {
46+
iterator.return?.()
47+
},
4148
})
4249

4350
return new Response(stream, {

0 commit comments

Comments
 (0)