diff --git a/agents/src/utils.ts b/agents/src/utils.ts index c23238f42..1c6afc1d5 100644 --- a/agents/src/utils.ts +++ b/agents/src/utils.ts @@ -336,11 +336,19 @@ export class AsyncIterableQueue implements AsyncIterableIterator { this.#queue.put(AsyncIterableQueue.CLOSE_SENTINEL); } - async next(): Promise> { + async next(options: { signal?: AbortSignal } = {}): Promise> { if (this.#closed && this.#queue.items.length === 0) { return { value: undefined, done: true }; } - const item = await this.#queue.get(); + let item: T | typeof AsyncIterableQueue.CLOSE_SENTINEL; + try { + item = await this.#queue.get(options); + } catch (error) { + if (error instanceof Error && error.name === 'AbortError') { + return { value: undefined, done: true }; + } + throw error; + } if (item === AsyncIterableQueue.CLOSE_SENTINEL && this.#closed) { return { value: undefined, done: true }; } @@ -1402,6 +1410,35 @@ export async function waitForAbort(signal: AbortSignal) { return await abortFuture.await; } +/** @internal */ +export function combineAbortSignals(signals: AbortSignal[]): AbortSignal { + const controller = new AbortController(); + const cleanupCallbacks: (() => void)[] = []; + + const abort = () => { + for (const cleanup of cleanupCallbacks) { + cleanup(); + } + cleanupCallbacks.length = 0; + + if (!controller.signal.aborted) { + controller.abort(); + } + }; + + for (const signal of signals) { + if (signal.aborted) { + abort(); + break; + } + + signal.addEventListener('abort', abort, { once: true }); + cleanupCallbacks.push(() => signal.removeEventListener('abort', abort)); + } + + return controller.signal; +} + export async function rejectOnAbort(signal: AbortSignal): Promise { if (signal.aborted) throw signal.reason; const abortFuture = new Future(); diff --git a/plugins/assemblyai/src/stt.ts b/plugins/assemblyai/src/stt.ts index b504ac59f..3bd057347 100644 --- a/plugins/assemblyai/src/stt.ts +++ b/plugins/assemblyai/src/stt.ts @@ -8,6 +8,7 @@ import { AudioByteStream, Future, Task, + combineAbortSignals, createTimedString, delay, log, @@ -384,14 +385,12 @@ export class SpeechStream extends stt.SpeechStream { const samplesPerBuffer = Math.floor((this.#opts.sampleRate * this.#opts.bufferSizeMs) / 1000); const audioStream = new AudioByteStream(this.#opts.sampleRate, 1, samplesPerBuffer); - const abortPromise = waitForAbort(this.abortSignal); - const sessionAbort = waitForAbort(sessionController.signal); + const inputSignal = combineAbortSignals([this.abortSignal, sessionController.signal]); try { while (!this.closed) { - const result = await Promise.race([this.input.next(), abortPromise, sessionAbort]); + const result = await this.input.next({ signal: inputSignal }); - if (result === undefined) return; // aborted if (result.done) break; const data = result.value; diff --git a/plugins/cartesia/src/stt.ts b/plugins/cartesia/src/stt.ts index 8bff98de8..3ef9baff6 100644 --- a/plugins/cartesia/src/stt.ts +++ b/plugins/cartesia/src/stt.ts @@ -344,12 +344,9 @@ export class SpeechStream extends stt.SpeechStream { let hasEnded = false; const iterator = this.input[Symbol.asyncIterator](); - const abortPromise = waitForAbort(abortSignal); while (true) { - const result = await Promise.race([iterator.next(), abortPromise]); - - if (result === undefined) return; // aborted + const result = await iterator.next({ signal: abortSignal }); if (result.done) { hasEnded = true; diff --git a/plugins/deepgram/src/stt.ts b/plugins/deepgram/src/stt.ts index 3f2650b56..d4a7a8210 100644 --- a/plugins/deepgram/src/stt.ts +++ b/plugins/deepgram/src/stt.ts @@ -357,15 +357,10 @@ export class SpeechStream extends stt.SpeechStream { samples100Ms, ); - // waitForAbort internally sets up an abort listener on the abort signal - // we need to put it outside loop to avoid constant re-registration of the listener - const abortPromise = waitForAbort(this.abortSignal); - try { while (!this.closed) { - const result = await Promise.race([this.input.next(), abortPromise]); + const result = await this.input.next({ signal: this.abortSignal }); - if (result === undefined) return; // aborted if (result.done) { break; } diff --git a/plugins/elevenlabs/src/stt.ts b/plugins/elevenlabs/src/stt.ts index 25f0ecb53..e84653359 100644 --- a/plugins/elevenlabs/src/stt.ts +++ b/plugins/elevenlabs/src/stt.ts @@ -13,6 +13,7 @@ import { Future, Task, calculateAudioDurationSeconds, + combineAbortSignals, createTimedString, delay, intervalForRetry, @@ -553,18 +554,12 @@ export class SpeechStream extends stt.SpeechStream { const sendTask = Task.from(async (controller) => { const samples50Ms = Math.floor(this.#opts.sampleRate / 20); const audioByteStream = new AudioByteStream(this.#opts.sampleRate, 1, samples50Ms); - const abortPromise = waitForAbort(controller.signal); - const streamAbortPromise = waitForAbort(this.abortSignal); + const inputSignal = combineAbortSignals([controller.signal, this.abortSignal]); let hasEnded = false; try { while (!this.closed) { - const result = await Promise.race([ - this.input.next(), - abortPromise, - streamAbortPromise, - ]); - if (result === undefined) return; + const result = await this.input.next({ signal: inputSignal }); if (result.done) break; const data = result.value; diff --git a/plugins/inworld/src/stt.ts b/plugins/inworld/src/stt.ts index 8e4166165..86c6e9df7 100644 --- a/plugins/inworld/src/stt.ts +++ b/plugins/inworld/src/stt.ts @@ -337,13 +337,10 @@ export class SpeechStream extends stt.SpeechStream { samples100Ms, ); - const abortPromise = waitForAbort(this.abortSignal); - try { while (!this.closed) { - const result = await Promise.race([this.input.next(), abortPromise]); + const result = await this.input.next({ signal: this.abortSignal }); - if (result === undefined) return; if (result.done) break; const data = result.value; diff --git a/plugins/sarvam/src/stt.ts b/plugins/sarvam/src/stt.ts index 48a6d7abc..af6c71bfd 100644 --- a/plugins/sarvam/src/stt.ts +++ b/plugins/sarvam/src/stt.ts @@ -8,6 +8,7 @@ import { AudioEnergyFilter, Future, Task, + combineAbortSignals, log, mergeFrames, normalizeLanguage, @@ -672,13 +673,11 @@ export class SpeechStream extends stt.SpeechStream { const sendTask = async () => { const samples50Ms = Math.floor(SAMPLE_RATE / 20); // 50ms chunks const stream = new AudioByteStream(SAMPLE_RATE, NUM_CHANNELS, samples50Ms); - const abortPromise = waitForAbort(this.abortSignal); - const sessionAbort = waitForAbort(sessionController.signal); + const inputSignal = combineAbortSignals([this.abortSignal, sessionController.signal]); try { while (!this.closed) { - const result = await Promise.race([this.input.next(), abortPromise, sessionAbort]); - if (result === undefined) return; // aborted + const result = await this.input.next({ signal: inputSignal }); if (result.done) break; const data = result.value; diff --git a/plugins/soniox/src/stt.ts b/plugins/soniox/src/stt.ts index b26c9cf76..e4e0907a9 100644 --- a/plugins/soniox/src/stt.ts +++ b/plugins/soniox/src/stt.ts @@ -297,10 +297,9 @@ export class SpeechStream extends stt.SpeechStream { } async #sendAudio(ws: WebSocket): Promise { - const abortPromise = waitForAbort(this.abortSignal); while (!this.closed) { - const result = await Promise.race([this.input.next(), abortPromise]); - if (result === undefined || result.done) { + const result = await this.input.next({ signal: this.abortSignal }); + if (result.done) { break; } diff --git a/plugins/xai/src/stt.ts b/plugins/xai/src/stt.ts index 8c47477c8..5e43d4500 100644 --- a/plugins/xai/src/stt.ts +++ b/plugins/xai/src/stt.ts @@ -255,13 +255,11 @@ export class SpeechStream extends stt.SpeechStream { const samples50ms = Math.floor(this.#opts.sampleRate / 20); const stream = new AudioByteStream(this.#opts.sampleRate, 1, samples50ms); - const abortPromise = waitForAbort(this.abortSignal); try { while (!this.closed) { - const result = await Promise.race([this.input.next(), abortPromise]); + const result = await this.input.next({ signal: this.abortSignal }); - if (result === undefined) return; if (result.done) break; const data = result.value;