-
Notifications
You must be signed in to change notification settings - Fork 312
fix(warm-transfer): capture job context for post-merge caller-room cleanup #1896
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,7 @@ | ||
| --- | ||
| '@livekit/agents': patch | ||
| --- | ||
|
|
||
| fix(room_io): stop dropping audio on false interruptions | ||
|
|
||
| `ParticipantAudioOutput.pause()` cleared the entire native `AudioSource` queue, permanently discarding up to `queueSizeMs` (rtc-node default: 1000ms) of generated-but-unplayed audio. On a false interruption (pause then resume) those frames were never replayed, so up to ~1s of agent speech was lost mid-sentence from both the live call and the observability recording. The output now keeps a rolling window of recently pushed frames, captures the unplayed tail on pause, and replays it on resume, while still discarding it on a real interruption (`clearBuffer()`). |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,7 @@ | ||
| --- | ||
| '@livekit/agents': patch | ||
| --- | ||
|
|
||
| fix(warm-transfer): capture job context for post-merge caller-room cleanup | ||
|
|
||
| `WarmTransferTask`'s post-merge `RoomEvent.ParticipantDisconnected` listener called `getJobContext()` to build a `RoomServiceClient` and delete the caller room. That handler runs from a native rtc-node FFI callback whose `AsyncLocalStorage` context is pinned to `FfiClient`-singleton creation, not to the job's context — so `getJobContext()` read an empty (or stale) store and threw, surfacing as an unhandled promise rejection and leaving the 2-party SIP room undeleted when a participant hung up after the bridge. The task now captures the `JobContext` eagerly in `onEnter()` (while the live context is available) and uses `jobCtx.deleteRoom()` in the late handler, which also passes the job's API credentials instead of relying on environment variables. |
| Original file line number | Diff line number | Diff line change | ||||||||
|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -385,6 +385,18 @@ export class ParticipantAudioOutput extends AudioOutput { | |||||||||
| /** Gate held closed while the output is paused; frame forwarding awaits it. */ | ||||||||||
| private playbackEnabledFuture: Future<void> = new Future(); | ||||||||||
|
|
||||||||||
| // Rolling window of the most recently pushed frames (covering ~queueSizeMs), | ||||||||||
| // used to recover the unplayed tail that clearQueue() drops on pause(). On a | ||||||||||
| // false interruption (pause -> resume) these are replayed so no audio is lost; | ||||||||||
| // on a real interruption (clearBuffer) they are discarded. | ||||||||||
| private recentFrames: AudioFrame[] = []; | ||||||||||
| private recentFramesMs: number = 0; | ||||||||||
| private replayFrames: AudioFrame[] = []; | ||||||||||
|
|
||||||||||
| private static frameMs(frame: AudioFrame): number { | ||||||||||
| return (frame.samplesPerChannel / frame.sampleRate) * 1000; | ||||||||||
| } | ||||||||||
|
|
||||||||||
| constructor(room: Room, options: AudioOutputOptions) { | ||||||||||
| super(options.sampleRate, undefined, { pause: true }); | ||||||||||
| this.room = room; | ||||||||||
|
|
@@ -401,6 +413,19 @@ export class ParticipantAudioOutput extends AudioOutput { | |||||||||
| if (this.playbackEnabledFuture.done) { | ||||||||||
| this.playbackEnabledFuture = new Future(); | ||||||||||
| } | ||||||||||
| // Capture the unplayed tail (what clearQueue is about to drop) so a false | ||||||||||
| // interruption can replay it on resume instead of losing it forever. | ||||||||||
| const queuedMs = this.audioSource.queuedDuration; | ||||||||||
| if (queuedMs > 0 && this.recentFrames.length > 0) { | ||||||||||
| const tail: AudioFrame[] = []; | ||||||||||
| let acc = 0; | ||||||||||
| for (let i = this.recentFrames.length - 1; i >= 0 && acc < queuedMs; i--) { | ||||||||||
| const f = this.recentFrames[i]!; | ||||||||||
| tail.unshift(f); | ||||||||||
| acc += ParticipantAudioOutput.frameMs(f); | ||||||||||
| } | ||||||||||
| this.replayFrames = tail; | ||||||||||
| } | ||||||||||
| // Drop already-buffered audio so playback stops promptly instead of draining the prebuffer. | ||||||||||
| this.audioSource.clearQueue(); | ||||||||||
| super.pause(); | ||||||||||
|
|
@@ -434,6 +459,18 @@ export class ParticipantAudioOutput extends AudioOutput { | |||||||||
| } | ||||||||||
| } | ||||||||||
|
|
||||||||||
| // Replay the unplayed tail that pause() dropped, when the pause ended in a | ||||||||||
| // resume (false interruption). These frames were already counted in | ||||||||||
| // pushedDuration on their first push, so don't recount them. | ||||||||||
| if (this.replayFrames.length > 0) { | ||||||||||
| const replay = this.replayFrames; | ||||||||||
| this.replayFrames = []; | ||||||||||
| for (const rf of replay) { | ||||||||||
| await this.audioSource.captureFrame(rf); | ||||||||||
| this.trackRecentFrame(rf); | ||||||||||
| } | ||||||||||
| } | ||||||||||
|
|
||||||||||
| // Count the playback segment only after the pause/interrupt gate above. super.captureFrame | ||||||||||
| // bumps playbackSegmentsCount; if a frame interrupted-while-paused bailed at the gate after | ||||||||||
| // that bump, the count would strand ahead of playbackFinishedCount and the next | ||||||||||
|
|
@@ -448,6 +485,25 @@ export class ParticipantAudioOutput extends AudioOutput { | |||||||||
| // TODO(AJS-102): use frame.durationMs once available in rtc-node | ||||||||||
| this.pushedDuration += frame.samplesPerChannel / frame.sampleRate; | ||||||||||
| await this.audioSource.captureFrame(frame); | ||||||||||
| this.trackRecentFrame(frame); | ||||||||||
| } | ||||||||||
|
|
||||||||||
| // Maintain a rolling window of recently pushed frames covering ~queueSizeMs, | ||||||||||
| // enough to recover the unplayed tail (<= queuedDuration) that pause() drops. | ||||||||||
| private trackRecentFrame(frame: AudioFrame): void { | ||||||||||
| // Keep a little headroom over queueSizeMs: queuedDuration can momentarily | ||||||||||
| // exceed the nominal queue size by up to one frame, and we must retain enough | ||||||||||
| // history to recover the entire unplayed tail. | ||||||||||
| const cap = (this.options.queueSizeMs ?? 1000) + 200; | ||||||||||
| this.recentFrames.push(frame); | ||||||||||
| this.recentFramesMs += ParticipantAudioOutput.frameMs(frame); | ||||||||||
| while ( | ||||||||||
| this.recentFrames.length > 1 && | ||||||||||
| this.recentFramesMs - ParticipantAudioOutput.frameMs(this.recentFrames[0]!) >= cap | ||||||||||
| ) { | ||||||||||
| const dropped = this.recentFrames.shift()!; | ||||||||||
| this.recentFramesMs -= ParticipantAudioOutput.frameMs(dropped); | ||||||||||
| } | ||||||||||
| } | ||||||||||
|
|
||||||||||
| private async waitForPlayoutTask(abortController: AbortController): Promise<void> { | ||||||||||
|
|
@@ -483,6 +539,13 @@ export class ParticipantAudioOutput extends AudioOutput { | |||||||||
|
|
||||||||||
| this.pushedDuration = 0; | ||||||||||
| this.firstFrameEmitted = false; | ||||||||||
| // Segment finished: drop the rolling window. On an interruption also drop any | ||||||||||
| // pending replay tail (the user chose to cut the agent off). | ||||||||||
| this.recentFrames = []; | ||||||||||
| this.recentFramesMs = 0; | ||||||||||
| if (interrupted) { | ||||||||||
| this.replayFrames = []; | ||||||||||
| } | ||||||||||
|
Comment on lines
+546
to
+548
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🟡 Unplayed audio tail from a previous response can leak into the next response Saved replay frames are not discarded when a segment finishes without interruption ( Impact: A listener may briefly hear the tail of the previous agent response at the beginning of the next response, causing a noticeable audio glitch. Trigger: false interruption during playout after TTS flush
The conditional clear at
Suggested change
Was this helpful? React with 👍 or 👎 to provide feedback. |
||||||||||
|
|
||||||||||
| this.onPlaybackFinished({ | ||||||||||
| playbackPosition: pushedDuration, | ||||||||||
|
|
@@ -522,6 +585,7 @@ export class ParticipantAudioOutput extends AudioOutput { | |||||||||
| } | ||||||||||
|
|
||||||||||
| clearBuffer(): void { | ||||||||||
| this.replayFrames = []; | ||||||||||
| // Signal interruption even if no frame has been pushed yet, so a gated captureFrame can bail. | ||||||||||
| if (!this.interruptedFuture.done) { | ||||||||||
| this.interruptedFuture.resolve(); | ||||||||||
|
|
||||||||||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -137,6 +137,10 @@ const DEFAULT_ROOM_OUTPUT_OPTIONS: RoomOutputOptions = { | |
| syncTranscription: true, | ||
| audioPublishOptions: new TrackPublishOptions({ source: TrackSource.SOURCE_MICROPHONE }), | ||
| jsonFormat: false, | ||
| // Match Python (_output.py: queue_size_ms=200). The rtc-node AudioSource | ||
| // default is 1000ms; a smaller prebuffer keeps the playout queue close to | ||
| // realtime so interruptions take effect promptly. | ||
| queueSizeMs: 200, | ||
| }; | ||
|
Comment on lines
+140
to
144
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🚩 Behavioral change: default audio queue reduced from 1000ms to 200ms The PR changes Was this helpful? React with 👍 or 👎 to provide feedback. |
||
|
|
||
| export class RoomIO { | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
When a false interruption happens after the last TTS frame has already been captured,
pause()storesreplayFramesand clears the native queue, andforwardAudiostill callsaudioOutput.flush()when the TTS stream ends (agents/src/voice/generation.ts:907). This non-interrupted finish clears only the rolling window, so the saved tail survives and the next reply's firstcaptureFrame()will replay stale audio from the previous utterance before the new audio instead of resuming it in the original segment.Useful? React with 👍 / 👎.