From 8c89a2ae0424496098cf0d6853f52ca88f474254 Mon Sep 17 00:00:00 2001 From: Toubat Date: Thu, 25 Jun 2026 15:47:28 -0700 Subject: [PATCH 1/2] fix(room_io): replay unplayed audio tail on false interruptions pause() cleared the entire native AudioSource queue, permanently dropping up to queueSizeMs of generated-but-unplayed audio. On a false interruption (pause then resume) those frames were never replayed, so up to ~1s of agent speech was lost mid-sentence from both the live call and the recording. Keep a rolling window of recently pushed frames, capture the unplayed tail on pause(), and replay it on resume(), while discarding it on a real interruption (clearBuffer()). Also cap the default room output queue to 200ms to match Python. Co-authored-by: Cursor --- .changeset/false-interruption-audio-replay.md | 7 + agents/src/voice/room_io/_output.test.ts | 139 ++++++++++++++++++ agents/src/voice/room_io/_output.ts | 64 ++++++++ agents/src/voice/room_io/room_io.ts | 4 + 4 files changed, 214 insertions(+) create mode 100644 .changeset/false-interruption-audio-replay.md diff --git a/.changeset/false-interruption-audio-replay.md b/.changeset/false-interruption-audio-replay.md new file mode 100644 index 000000000..ec4e739da --- /dev/null +++ b/.changeset/false-interruption-audio-replay.md @@ -0,0 +1,7 @@ +--- +'@livekit/agents': patch +--- + +fix(room_io): stop dropping audio on false interruptions + +`ParticipantAudioOutput.pause()` cleared the entire native `AudioSource` queue, permanently discarding up to `queueSizeMs` (rtc-node default: 1000ms) of generated-but-unplayed audio. On a false interruption (pause then resume) those frames were never replayed, so up to ~1s of agent speech was lost mid-sentence from both the live call and the observability recording. The output now keeps a rolling window of recently pushed frames, captures the unplayed tail on pause, and replays it on resume, while still discarding it on a real interruption (`clearBuffer()`). The default room output `queueSizeMs` is also lowered to 200ms to match the Python implementation, keeping the playout queue close to realtime so interruptions take effect promptly. 
diff --git a/agents/src/voice/room_io/_output.test.ts b/agents/src/voice/room_io/_output.test.ts index 24b03ba8d..ecea3a7bb 100644 --- a/agents/src/voice/room_io/_output.test.ts +++ b/agents/src/voice/room_io/_output.test.ts @@ -167,6 +167,10 @@ describe('ParticipantAudioOutput captureFrame segment accounting', () => { playbackFinishedCount: number; playbackFinishedFuture: Future; onPlaybackStarted: (createdAt: number) => void; + options: { queueSizeMs?: number }; + recentFrames: unknown[]; + recentFramesMs: number; + replayFrames: unknown[]; audioSource: { clearQueue: () => void; captureFrame: (frame: CaptureFrameArg) => Promise; @@ -187,6 +191,11 @@ describe('ParticipantAudioOutput captureFrame segment accounting', () => { output.playbackFinishedCount = 0; output.playbackFinishedFuture = new Future(); output.onPlaybackStarted = vi.fn(); + // Object.create bypasses the constructor's field initializers; mirror them. + output.options = { queueSizeMs: 1000 }; + output.recentFrames = []; + output.recentFramesMs = 0; + output.replayFrames = []; output.audioSource = { clearQueue: vi.fn(), captureFrame: vi.fn(async () => {}) }; return output; }; @@ -220,3 +229,133 @@ describe('ParticipantAudioOutput captureFrame segment accounting', () => { expect(output.pushedDuration).toBeGreaterThan(0); }); }); + +/** + * Regression tests for the false-interruption audio loss fix. + * + * Before the fix, pause() called clearQueue() which permanently dropped every + * frame already pushed to the native AudioSource queue (up to queueSizeMs). On a + * false interruption (pause then resume) the agent never replayed them, so up to + * ~1s of audio (rtc-node default queue) vanished from both the call and the + * recording. The output now keeps a rolling window of recently pushed frames, + * captures the unplayed tail on pause(), and replays it on the next captureFrame + * after resume() — while discarding it on a real interruption (clearBuffer()). 
+ */ +describe('ParticipantAudioOutput false-interruption replay', () => { + const FRAME_MS = 20; + const SR = 48000; + const SPF = (SR * FRAME_MS) / 1000; + + type ReplayOutput = ParticipantAudioOutput & { + startedFuture: Future; + playbackEnabledFuture: Future; + interruptedFuture: Future; + firstFrameEmitted: boolean; + pushedDuration: number; + _capturing: boolean; + playbackSegmentsCount: number; + playbackFinishedCount: number; + playbackFinishedFuture: Future; + onPlaybackStarted: (createdAt: number) => void; + options: { queueSizeMs?: number }; + recentFrames: unknown[]; + recentFramesMs: number; + replayFrames: unknown[]; + audioSource: { + clearQueue: () => void; + captureFrame: (frame: CaptureFrameArg) => Promise; + queuedDuration: number; + }; + }; + + // Tag each frame by id in data[0] so captured ids reveal exact ordering, lost + // frames (missing id), and replayed frames (duplicate id). + const frameOf = (id: number): CaptureFrameArg => { + const data = new Int16Array(SPF); + data[0] = id; + return { samplesPerChannel: SPF, sampleRate: SR, data } as unknown as CaptureFrameArg; + }; + + const makeOutput = (queueSizeMs: number, captured: number[]): ReplayOutput => { + const output = Object.create(ParticipantAudioOutput.prototype) as ReplayOutput; + output.startedFuture = new Future(); + output.startedFuture.resolve(); + output.playbackEnabledFuture = new Future(); + output.playbackEnabledFuture.resolve(); + output.interruptedFuture = new Future(); + output.firstFrameEmitted = false; + output.pushedDuration = 0; + output._capturing = false; + output.playbackSegmentsCount = 0; + output.playbackFinishedCount = 0; + output.playbackFinishedFuture = new Future(); + output.onPlaybackStarted = vi.fn(); + // Object.create bypasses the constructor's field initializers; mirror them. 
+ output.options = { queueSizeMs }; + output.recentFrames = []; + output.recentFramesMs = 0; + output.replayFrames = []; + output.audioSource = { + clearQueue: vi.fn(), + queuedDuration: 0, + captureFrame: vi.fn(async (frame: CaptureFrameArg) => { + captured.push((frame as unknown as { data: Int16Array }).data[0]!); + }), + }; + return output; + }; + + it('replays the unplayed tail on resume (false interruption) — zero loss', async () => { + const captured: number[] = []; + const output = makeOutput(100, captured); + + for (let i = 0; i < 10; i++) { + await output.captureFrame(frameOf(i)); + } + + // 100ms == 5 frames still queued (unplayed) when the false interruption hits. + output.audioSource.queuedDuration = 100; + output.pause(); + output.resume(); + + await output.captureFrame(frameOf(10)); + + // initial 0..9, then the unplayed tail 5..9 replayed, then 10 — nothing lost. + expect(captured).toEqual([0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 5, 6, 7, 8, 9, 10]); + }); + + it('discards the unplayed tail on clearBuffer (real interruption) — no replay', async () => { + const captured: number[] = []; + const output = makeOutput(100, captured); + + for (let i = 0; i < 10; i++) { + await output.captureFrame(frameOf(i)); + } + + output.audioSource.queuedDuration = 100; + output.pause(); + output.clearBuffer(); // real interruption: the user cut the agent off + output.resume(); + + await output.captureFrame(frameOf(10)); + + expect(captured).toEqual([0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10]); + }); + + it('does not replay when nothing was queued at pause', async () => { + const captured: number[] = []; + const output = makeOutput(100, captured); + + for (let i = 0; i < 10; i++) { + await output.captureFrame(frameOf(i)); + } + + output.audioSource.queuedDuration = 0; + output.pause(); + output.resume(); + + await output.captureFrame(frameOf(10)); + + expect(captured).toEqual([0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10]); + }); +}); diff --git a/agents/src/voice/room_io/_output.ts 
b/agents/src/voice/room_io/_output.ts index 11dd8eb71..a30d71be8 100644 --- a/agents/src/voice/room_io/_output.ts +++ b/agents/src/voice/room_io/_output.ts @@ -385,6 +385,18 @@ export class ParticipantAudioOutput extends AudioOutput { /** Gate held closed while the output is paused; frame forwarding awaits it. */ private playbackEnabledFuture: Future = new Future(); + // Rolling window of the most recently pushed frames (covering ~queueSizeMs), + // used to recover the unplayed tail that clearQueue() drops on pause(). On a + // false interruption (pause -> resume) these are replayed so no audio is lost; + // on a real interruption (clearBuffer) they are discarded. + private recentFrames: AudioFrame[] = []; + private recentFramesMs: number = 0; + private replayFrames: AudioFrame[] = []; + + private static frameMs(frame: AudioFrame): number { + return (frame.samplesPerChannel / frame.sampleRate) * 1000; + } + constructor(room: Room, options: AudioOutputOptions) { super(options.sampleRate, undefined, { pause: true }); this.room = room; @@ -401,6 +413,19 @@ export class ParticipantAudioOutput extends AudioOutput { if (this.playbackEnabledFuture.done) { this.playbackEnabledFuture = new Future(); } + // Capture the unplayed tail (what clearQueue is about to drop) so a false + // interruption can replay it on resume instead of losing it forever. + const queuedMs = this.audioSource.queuedDuration; + if (queuedMs > 0 && this.recentFrames.length > 0) { + const tail: AudioFrame[] = []; + let acc = 0; + for (let i = this.recentFrames.length - 1; i >= 0 && acc < queuedMs; i--) { + const f = this.recentFrames[i]!; + tail.unshift(f); + acc += ParticipantAudioOutput.frameMs(f); + } + this.replayFrames = tail; + } // Drop already-buffered audio so playback stops promptly instead of draining the prebuffer. 
this.audioSource.clearQueue(); super.pause(); @@ -434,6 +459,18 @@ export class ParticipantAudioOutput extends AudioOutput { } } + // Replay the unplayed tail that pause() dropped, when the pause ended in a + // resume (false interruption). These frames were already counted in + // pushedDuration on their first push, so don't recount them. + if (this.replayFrames.length > 0) { + const replay = this.replayFrames; + this.replayFrames = []; + for (const rf of replay) { + await this.audioSource.captureFrame(rf); + this.trackRecentFrame(rf); + } + } + // Count the playback segment only after the pause/interrupt gate above. super.captureFrame // bumps playbackSegmentsCount; if a frame interrupted-while-paused bailed at the gate after // that bump, the count would strand ahead of playbackFinishedCount and the next @@ -448,6 +485,25 @@ export class ParticipantAudioOutput extends AudioOutput { // TODO(AJS-102): use frame.durationMs once available in rtc-node this.pushedDuration += frame.samplesPerChannel / frame.sampleRate; await this.audioSource.captureFrame(frame); + this.trackRecentFrame(frame); + } + + // Maintain a rolling window of recently pushed frames covering ~queueSizeMs, + // enough to recover the unplayed tail (<= queuedDuration) that pause() drops. + private trackRecentFrame(frame: AudioFrame): void { + // Keep a little headroom over queueSizeMs: queuedDuration can momentarily + // exceed the nominal queue size by up to one frame, and we must retain enough + // history to recover the entire unplayed tail. + const cap = (this.options.queueSizeMs ?? 1000) + 200; + this.recentFrames.push(frame); + this.recentFramesMs += ParticipantAudioOutput.frameMs(frame); + while ( + this.recentFrames.length > 1 && + this.recentFramesMs - ParticipantAudioOutput.frameMs(this.recentFrames[0]!) 
>= cap + ) { + const dropped = this.recentFrames.shift()!; + this.recentFramesMs -= ParticipantAudioOutput.frameMs(dropped); + } } private async waitForPlayoutTask(abortController: AbortController): Promise { @@ -483,6 +539,13 @@ export class ParticipantAudioOutput extends AudioOutput { this.pushedDuration = 0; this.firstFrameEmitted = false; + // Segment finished: drop the rolling window. On an interruption also drop any + // pending replay tail (the user chose to cut the agent off). + this.recentFrames = []; + this.recentFramesMs = 0; + if (interrupted) { + this.replayFrames = []; + } this.onPlaybackFinished({ playbackPosition: pushedDuration, @@ -522,6 +585,7 @@ export class ParticipantAudioOutput extends AudioOutput { } clearBuffer(): void { + this.replayFrames = []; // Signal interruption even if no frame has been pushed yet, so a gated captureFrame can bail. if (!this.interruptedFuture.done) { this.interruptedFuture.resolve(); diff --git a/agents/src/voice/room_io/room_io.ts b/agents/src/voice/room_io/room_io.ts index ed5315bf7..d5f1e40c6 100644 --- a/agents/src/voice/room_io/room_io.ts +++ b/agents/src/voice/room_io/room_io.ts @@ -137,6 +137,10 @@ const DEFAULT_ROOM_OUTPUT_OPTIONS: RoomOutputOptions = { syncTranscription: true, audioPublishOptions: new TrackPublishOptions({ source: TrackSource.SOURCE_MICROPHONE }), jsonFormat: false, + // Match Python (_output.py: queue_size_ms=200). The rtc-node AudioSource + // default is 1000ms; a smaller prebuffer keeps the playout queue close to + // realtime so interruptions take effect promptly. 
+ queueSizeMs: 200, }; export class RoomIO { From 908767da1ec349b238716e795d91e1ab5a342aa5 Mon Sep 17 00:00:00 2001 From: Toubat Date: Fri, 26 Jun 2026 14:12:36 -0700 Subject: [PATCH 2/2] fix(warm-transfer): capture job context for post-merge caller-room cleanup The post-merge ParticipantDisconnected listener runs from a native rtc-node FFI callback whose AsyncLocalStorage context is pinned to FfiClient-singleton creation, not the job context, so getJobContext() read an empty/stale store and threw as an unhandled rejection, leaving the caller room undeleted on hangup. Capture the JobContext eagerly in onEnter() and use jobCtx.deleteRoom() in the late handler (which also forwards the job's API credentials instead of relying on environment variables). Closes #1895. Co-authored-by: Cursor --- .changeset/warm-transfer-job-context.md | 7 +++++++ agents/src/beta/workflows/warm_transfer.ts | 23 ++++++++++++++++++---- 2 files changed, 26 insertions(+), 4 deletions(-) create mode 100644 .changeset/warm-transfer-job-context.md diff --git a/.changeset/warm-transfer-job-context.md b/.changeset/warm-transfer-job-context.md new file mode 100644 index 000000000..241be3d16 --- /dev/null +++ b/.changeset/warm-transfer-job-context.md @@ -0,0 +1,7 @@ +--- +'@livekit/agents': patch +--- + +fix(warm-transfer): capture job context for post-merge caller-room cleanup + +`WarmTransferTask`'s post-merge `RoomEvent.ParticipantDisconnected` listener called `getJobContext()` to build a `RoomServiceClient` and delete the caller room. That handler runs from a native rtc-node FFI callback whose `AsyncLocalStorage` context is pinned to `FfiClient`-singleton creation, not to the job's context — so `getJobContext()` read an empty (or stale) store and threw, surfacing as an unhandled promise rejection and leaving the 2-party SIP room undeleted when a participant hung up after the bridge. 
The task now captures the `JobContext` eagerly in `onEnter()` (while the live context is available) and uses `jobCtx.deleteRoom()` in the late handler, which also passes the job's API credentials instead of relying on environment variables. diff --git a/agents/src/beta/workflows/warm_transfer.ts b/agents/src/beta/workflows/warm_transfer.ts index b3d99dda0..9837fed69 100644 --- a/agents/src/beta/workflows/warm_transfer.ts +++ b/agents/src/beta/workflows/warm_transfer.ts @@ -6,7 +6,7 @@ import { type DisconnectReason, type ParticipantKind, Room, RoomEvent } from '@l import { AccessToken, RoomServiceClient, SipClient, type VideoGrant } from 'livekit-server-sdk'; import { z } from 'zod'; import type { LLMModels, STTModelString, TTSModelString } from '../../inference/index.js'; -import { getJobContext } from '../../job.js'; +import { type JobContext, getJobContext } from '../../job.js'; import type { ChatContext, Instructions, @@ -83,6 +83,11 @@ export interface WarmTransferTaskOptions { export class WarmTransferTask extends AgentTask { private _callerRoom: Room | null = null; private _humanAgentRoom: Room | null = null; + // Captured while the task runs inside the live job context. The post-merge + // caller-room cleanup listener fires from a native rtc-node FFI callback whose + // AsyncLocalStorage context is pinned to FfiClient-singleton creation, so + // getJobContext() is unreliable there; we capture the context eagerly instead. + private _jobCtx: JobContext | null = null; private _humanAgentSession: AgentSession | null = null; // Assigned in the constructor; a field initializer here would run after the // resolver is captured and clobber it (ES2022 class-field semantics). 
@@ -229,6 +234,7 @@ export class WarmTransferTask extends AgentTask { async onEnter(): Promise { const jobCtx = getJobContext(); + this._jobCtx = jobCtx; this._callerRoom = jobCtx.room; if (this._holdAudio !== null) { @@ -338,8 +344,16 @@ export class WarmTransferTask extends AgentTask { this._callerRoom.off(RoomEvent.ParticipantDisconnected, this.onCallerParticipantDisconnected); - const rooms = new RoomServiceClient(getJobContext().info.url); - void rooms.deleteRoom(this._callerRoom.name).catch((error) => { + // Use the eagerly-captured job context: this callback runs from a native + // rtc-node FFI event, where getJobContext() reads an empty/stale + // AsyncLocalStorage store and would throw as an unhandled rejection. + const jobCtx = this._jobCtx; + if (!jobCtx) { + this._logger.warn('no job context captured, cannot delete caller room'); + return; + } + const callerRoomName = this._callerRoom.name; + void jobCtx.deleteRoom(callerRoomName).catch((error) => { this._logger.warn({ error }, 'failed to delete caller room'); }); }; @@ -518,7 +532,8 @@ export class WarmTransferTask extends AgentTask { 'moving human agent to caller room', ); - const rooms = new RoomServiceClient(getJobContext().info.url); + const info = (this._jobCtx ?? getJobContext()).info; + const rooms = new RoomServiceClient(info.url, info.apiKey, info.apiSecret); await rooms.moveParticipant( this._humanAgentRoom.name, this._humanAgentIdentity,