Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 39 additions & 2 deletions agents/src/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -336,11 +336,19 @@ export class AsyncIterableQueue<T> implements AsyncIterableIterator<T> {
this.#queue.put(AsyncIterableQueue.CLOSE_SENTINEL);
}

async next(): Promise<IteratorResult<T>> {
async next(options: { signal?: AbortSignal } = {}): Promise<IteratorResult<T>> {
if (this.#closed && this.#queue.items.length === 0) {
return { value: undefined, done: true };
}
const item = await this.#queue.get();
let item: T | typeof AsyncIterableQueue.CLOSE_SENTINEL;
try {
item = await this.#queue.get(options);
} catch (error) {
if (error instanceof Error && error.name === 'AbortError') {
return { value: undefined, done: true };
}
throw error;
}
if (item === AsyncIterableQueue.CLOSE_SENTINEL && this.#closed) {
return { value: undefined, done: true };
}
Expand Down Expand Up @@ -1402,6 +1410,35 @@ export async function waitForAbort(signal: AbortSignal) {
return await abortFuture.await;
}

/** @internal */
export function combineAbortSignals(signals: AbortSignal[]): AbortSignal {
const controller = new AbortController();
const cleanupCallbacks: (() => void)[] = [];

const abort = () => {
for (const cleanup of cleanupCallbacks) {
cleanup();
}
cleanupCallbacks.length = 0;

if (!controller.signal.aborted) {
controller.abort();
}
};

for (const signal of signals) {
if (signal.aborted) {
abort();
break;
}

signal.addEventListener('abort', abort, { once: true });
cleanupCallbacks.push(() => signal.removeEventListener('abort', abort));
}

return controller.signal;
}
Comment on lines +1414 to +1440

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🚩 combineAbortSignals duplicates existing combineSignals with different API

The new combineAbortSignals(signals: AbortSignal[]) at agents/src/utils.ts:1414 overlaps significantly with the existing combineSignals(a, b) at agents/src/utils.ts:1455. Key differences: (1) the new function takes an array vs two params, (2) the new function cleans up listeners on other signals when one fires, (3) the new function does NOT propagate the abort reason (controller.abort() with no argument), while the old one does (c.abort((s as any).reason)). Both leak listeners if neither signal ever fires, which is acceptable in practice since signals are eventually aborted. Consider consolidating these in a future PR.

Open in Devin Review

Was this helpful? React with 👍 or 👎 to provide feedback.


export async function rejectOnAbort(signal: AbortSignal): Promise<never> {
if (signal.aborted) throw signal.reason;
const abortFuture = new Future<never>();
Expand Down
7 changes: 3 additions & 4 deletions plugins/assemblyai/src/stt.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import {
AudioByteStream,
Future,
Task,
combineAbortSignals,
createTimedString,
delay,
log,
Expand Down Expand Up @@ -384,14 +385,12 @@ export class SpeechStream extends stt.SpeechStream {
const samplesPerBuffer = Math.floor((this.#opts.sampleRate * this.#opts.bufferSizeMs) / 1000);
const audioStream = new AudioByteStream(this.#opts.sampleRate, 1, samplesPerBuffer);

const abortPromise = waitForAbort(this.abortSignal);
const sessionAbort = waitForAbort(sessionController.signal);
const inputSignal = combineAbortSignals([this.abortSignal, sessionController.signal]);

try {
while (!this.closed) {
const result = await Promise.race([this.input.next(), abortPromise, sessionAbort]);
const result = await this.input.next({ signal: inputSignal });

if (result === undefined) return; // aborted
if (result.done) break;

const data = result.value;
Expand Down
5 changes: 1 addition & 4 deletions plugins/cartesia/src/stt.ts
Original file line number Diff line number Diff line change
Expand Up @@ -344,12 +344,9 @@ export class SpeechStream extends stt.SpeechStream {

let hasEnded = false;
const iterator = this.input[Symbol.asyncIterator]();
const abortPromise = waitForAbort(abortSignal);

while (true) {
const result = await Promise.race([iterator.next(), abortPromise]);

if (result === undefined) return; // aborted
const result = await iterator.next({ signal: abortSignal });

if (result.done) {
hasEnded = true;
Expand Down
7 changes: 1 addition & 6 deletions plugins/deepgram/src/stt.ts
Original file line number Diff line number Diff line change
Expand Up @@ -357,15 +357,10 @@ export class SpeechStream extends stt.SpeechStream {
samples100Ms,
);

// waitForAbort internally sets up an abort listener on the abort signal
// we need to put it outside loop to avoid constant re-registration of the listener
const abortPromise = waitForAbort(this.abortSignal);

try {
while (!this.closed) {
const result = await Promise.race([this.input.next(), abortPromise]);
const result = await this.input.next({ signal: this.abortSignal });

if (result === undefined) return; // aborted
if (result.done) {
break;
}
Expand Down
11 changes: 3 additions & 8 deletions plugins/elevenlabs/src/stt.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import {
Future,
Task,
calculateAudioDurationSeconds,
combineAbortSignals,
createTimedString,
delay,
intervalForRetry,
Expand Down Expand Up @@ -553,18 +554,12 @@ export class SpeechStream extends stt.SpeechStream {
const sendTask = Task.from(async (controller) => {
const samples50Ms = Math.floor(this.#opts.sampleRate / 20);
const audioByteStream = new AudioByteStream(this.#opts.sampleRate, 1, samples50Ms);
const abortPromise = waitForAbort(controller.signal);
const streamAbortPromise = waitForAbort(this.abortSignal);
const inputSignal = combineAbortSignals([controller.signal, this.abortSignal]);
let hasEnded = false;

try {
while (!this.closed) {
const result = await Promise.race([
this.input.next(),
abortPromise,
streamAbortPromise,
]);
if (result === undefined) return;
const result = await this.input.next({ signal: inputSignal });
if (result.done) break;

const data = result.value;
Expand Down
5 changes: 1 addition & 4 deletions plugins/inworld/src/stt.ts
Original file line number Diff line number Diff line change
Expand Up @@ -337,13 +337,10 @@ export class SpeechStream extends stt.SpeechStream {
samples100Ms,
);

const abortPromise = waitForAbort(this.abortSignal);

try {
while (!this.closed) {
const result = await Promise.race([this.input.next(), abortPromise]);
const result = await this.input.next({ signal: this.abortSignal });

if (result === undefined) return;
if (result.done) break;

const data = result.value;
Expand Down
7 changes: 3 additions & 4 deletions plugins/sarvam/src/stt.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import {
AudioEnergyFilter,
Future,
Task,
combineAbortSignals,
log,
mergeFrames,
normalizeLanguage,
Expand Down Expand Up @@ -672,13 +673,11 @@ export class SpeechStream extends stt.SpeechStream {
const sendTask = async () => {
const samples50Ms = Math.floor(SAMPLE_RATE / 20); // 50ms chunks
const stream = new AudioByteStream(SAMPLE_RATE, NUM_CHANNELS, samples50Ms);
const abortPromise = waitForAbort(this.abortSignal);
const sessionAbort = waitForAbort(sessionController.signal);
const inputSignal = combineAbortSignals([this.abortSignal, sessionController.signal]);

try {
while (!this.closed) {
const result = await Promise.race([this.input.next(), abortPromise, sessionAbort]);
if (result === undefined) return; // aborted
const result = await this.input.next({ signal: inputSignal });
if (result.done) break;

const data = result.value;
Expand Down
5 changes: 2 additions & 3 deletions plugins/soniox/src/stt.ts
Original file line number Diff line number Diff line change
Expand Up @@ -297,10 +297,9 @@ export class SpeechStream extends stt.SpeechStream {
}

async #sendAudio(ws: WebSocket): Promise<void> {
const abortPromise = waitForAbort(this.abortSignal);
while (!this.closed) {
const result = await Promise.race([this.input.next(), abortPromise]);
if (result === undefined || result.done) {
const result = await this.input.next({ signal: this.abortSignal });
if (result.done) {
break;
}

Expand Down
4 changes: 1 addition & 3 deletions plugins/xai/src/stt.ts
Original file line number Diff line number Diff line change
Expand Up @@ -255,13 +255,11 @@ export class SpeechStream extends stt.SpeechStream {

const samples50ms = Math.floor(this.#opts.sampleRate / 20);
const stream = new AudioByteStream(this.#opts.sampleRate, 1, samples50ms);
const abortPromise = waitForAbort(this.abortSignal);

try {
while (!this.closed) {
const result = await Promise.race([this.input.next(), abortPromise]);
const result = await this.input.next({ signal: this.abortSignal });

if (result === undefined) return;
if (result.done) break;

const data = result.value;
Expand Down