diff --git a/agents/src/voice/room_io/_input.ts b/agents/src/voice/room_io/_input.ts index 6ede89e2f..0d7233796 100644 --- a/agents/src/voice/room_io/_input.ts +++ b/agents/src/voice/room_io/_input.ts @@ -4,7 +4,7 @@ import { type AudioFrame, AudioStream, - FrameProcessor, + type FrameProcessor, type NoiseCancellationOptions, RemoteParticipant, type RemoteTrack, @@ -12,6 +12,7 @@ import { type Room, RoomEvent, TrackSource, + isFrameProcessor, } from '@livekit/rtc-node'; import type { ReadableStream } from 'node:stream/web'; import { log } from '../../log.js'; @@ -44,7 +45,7 @@ export class ParticipantAudioInputStream extends AudioInput { this.room = room; this.sampleRate = sampleRate; this.numChannels = numChannels; - if (noiseCancellation instanceof FrameProcessor) { + if (isFrameProcessor>(noiseCancellation)) { this.frameProcessor = noiseCancellation; } else { this.noiseCancellation = noiseCancellation; @@ -52,7 +53,6 @@ export class ParticipantAudioInputStream extends AudioInput { this.room.on(RoomEvent.TrackSubscribed, this.onTrackSubscribed); this.room.on(RoomEvent.TrackUnpublished, this.onTrackUnpublished); - this.room.on(RoomEvent.TokenRefreshed, this.onTokenRefreshed); } setParticipant(participant: RemoteParticipant | string | null) { @@ -153,32 +153,18 @@ export class ParticipantAudioInputStream extends AudioInput { outputRate: this.sampleRate, }), ); - this.frameProcessor?.onStreamInfoUpdated({ - participantIdentity: participant.identity, - roomName: this.room.name!, - publicationSid: publication.sid!, - }); - this.frameProcessor?.onCredentialsUpdated({ - token: this.room.token!, - url: this.room.serverUrl!, - }); return true; }; - private onTokenRefreshed = () => { - if (this.room.token && this.room.serverUrl) { - this.frameProcessor?.onCredentialsUpdated({ - token: this.room.token, - url: this.room.serverUrl, - }); - } - }; - private createStream(track: RemoteTrack): ReadableStream { return new AudioStream(track, { sampleRate: this.sampleRate, numChannels: this.numChannels, noiseCancellation: this.frameProcessor || this.noiseCancellation, + // Don't let the AudioStream close the processor when the track switches — + // this input stream owns the processor across track changes and closes it + // itself in close(). + autoCloseNoiseCancellation: false, // TODO(AJS-269): resolve compatibility issue with node-sdk to remove the forced type casting }) as unknown as ReadableStream; } @@ -186,7 +172,6 @@ export class ParticipantAudioInputStream extends AudioInput { override async close() { this.room.off(RoomEvent.TrackSubscribed, this.onTrackSubscribed); this.room.off(RoomEvent.TrackUnpublished, this.onTrackUnpublished); - this.room.off(RoomEvent.TokenRefreshed, this.onTokenRefreshed); this.closeStream(); await super.close();