Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 7 additions & 22 deletions agents/src/voice/room_io/_input.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,15 @@
import {
type AudioFrame,
AudioStream,
FrameProcessor,
type FrameProcessor,
type NoiseCancellationOptions,
RemoteParticipant,
type RemoteTrack,
type RemoteTrackPublication,
type Room,
RoomEvent,
TrackSource,
isFrameProcessor,
} from '@livekit/rtc-node';
import type { ReadableStream } from 'node:stream/web';
import { log } from '../../log.js';
Expand Down Expand Up @@ -44,15 +45,14 @@ export class ParticipantAudioInputStream extends AudioInput {
this.room = room;
this.sampleRate = sampleRate;
this.numChannels = numChannels;
if (noiseCancellation instanceof FrameProcessor) {
if (isFrameProcessor<FrameProcessor<AudioFrame>>(noiseCancellation)) {
this.frameProcessor = noiseCancellation;
} else {
this.noiseCancellation = noiseCancellation;
}

this.room.on(RoomEvent.TrackSubscribed, this.onTrackSubscribed);
this.room.on(RoomEvent.TrackUnpublished, this.onTrackUnpublished);
this.room.on(RoomEvent.TokenRefreshed, this.onTokenRefreshed);
}

setParticipant(participant: RemoteParticipant | string | null) {
Expand Down Expand Up @@ -153,40 +153,25 @@ export class ParticipantAudioInputStream extends AudioInput {
outputRate: this.sampleRate,
}),
);
this.frameProcessor?.onStreamInfoUpdated({
participantIdentity: participant.identity,
roomName: this.room.name!,
publicationSid: publication.sid!,
});
this.frameProcessor?.onCredentialsUpdated({
token: this.room.token!,
url: this.room.serverUrl!,
});
return true;
};

private onTokenRefreshed = () => {
if (this.room.token && this.room.serverUrl) {
this.frameProcessor?.onCredentialsUpdated({
token: this.room.token,
url: this.room.serverUrl,
});
}
};

private createStream(track: RemoteTrack): ReadableStream<AudioFrame> {
return new AudioStream(track, {
sampleRate: this.sampleRate,
numChannels: this.numChannels,
noiseCancellation: this.frameProcessor || this.noiseCancellation,
// Don't let the AudioStream close the processor when the track switches —
// this input stream owns the processor across track changes and closes it
// itself in close().
autoCloseNoiseCancellation: false,

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🚩 autoCloseNoiseCancellation is a new option that must be supported by the rtc-node SDK

The autoCloseNoiseCancellation: false option at _input.ts:167 is passed to the AudioStream constructor. This option doesn't appear anywhere else in this repository — it's a new SDK feature. The catalog pins @livekit/rtc-node to ^0.13.27. If this option was only added in a newer minor/patch version not yet published, the option would be silently ignored (since JS doesn't validate extra constructor options), and the AudioStream would still auto-close the processor on track switches, leading to a use-after-close when the next track tries to use the same processor. Worth confirming the SDK version supports this flag.

Open in Devin Review

Was this helpful? React with 👍 or 👎 to provide feedback.

// TODO(AJS-269): resolve compatibility issue with node-sdk to remove the forced type casting
}) as unknown as ReadableStream<AudioFrame>;
Comment on lines 159 to 169

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🚩 Removal of onStreamInfoUpdated/onCredentialsUpdated depends on rtc-node handling them internally

The old code explicitly called this.frameProcessor?.onStreamInfoUpdated(...) and this.frameProcessor?.onCredentialsUpdated(...) when a track was subscribed (_input.ts:156-164 in old code), and refreshed credentials on TokenRefreshed events. The new code removes all of these, relying on the AudioStream constructor in @livekit/rtc-node to handle stream info and credential propagation internally when it receives the frameProcessor via the noiseCancellation option. This is a correctness assumption that can't be verified from this repo alone — it depends on the AudioStream implementation in @livekit/rtc-node@^0.13.27 actually performing these calls. If the SDK version doesn't handle this internally, the frame processor (e.g., server-side noise cancellation) would not receive the participant/room/credential context it needs to function.

Open in Devin Review

Was this helpful? React with 👍 or 👎 to provide feedback.

}

override async close() {
this.room.off(RoomEvent.TrackSubscribed, this.onTrackSubscribed);
this.room.off(RoomEvent.TrackUnpublished, this.onTrackUnpublished);
this.room.off(RoomEvent.TokenRefreshed, this.onTokenRefreshed);
this.closeStream();
await super.close();

Expand Down
Loading