diff --git a/.changeset/false-interruption-audio-replay.md b/.changeset/false-interruption-audio-replay.md new file mode 100644 index 000000000..ec4e739da --- /dev/null +++ b/.changeset/false-interruption-audio-replay.md @@ -0,0 +1,7 @@ +--- +'@livekit/agents': patch +--- + +fix(room_io): stop dropping audio on false interruptions + +`ParticipantAudioOutput.pause()` cleared the entire native `AudioSource` queue, permanently discarding up to `queueSizeMs` (rtc-node default: 1000ms) of generated-but-unplayed audio. On a false interruption (pause then resume) those frames were never replayed, so up to ~1s of agent speech was lost mid-sentence from both the live call and the observability recording. The output now keeps a rolling window of recently pushed frames, captures the unplayed tail on pause, and replays it on resume, while still discarding it on a real interruption (`clearBuffer()`). The default room output options now also set `queueSizeMs: 200` (the rtc-node `AudioSource` default is 1000ms), keeping the playout queue close to realtime so interruptions take effect promptly. diff --git a/.changeset/warm-transfer-job-context.md b/.changeset/warm-transfer-job-context.md new file mode 100644 index 000000000..241be3d16 --- /dev/null +++ b/.changeset/warm-transfer-job-context.md @@ -0,0 +1,7 @@ +--- +'@livekit/agents': patch +--- + +fix(warm-transfer): capture job context for post-merge caller-room cleanup + +`WarmTransferTask`'s post-merge `RoomEvent.ParticipantDisconnected` listener called `getJobContext()` to build a `RoomServiceClient` and delete the caller room. That handler runs from a native rtc-node FFI callback whose `AsyncLocalStorage` context is pinned to `FfiClient`-singleton creation, not to the job's context — so `getJobContext()` read an empty (or stale) store and threw, surfacing as an unhandled promise rejection and leaving the 2-party SIP room undeleted when a participant hung up after the bridge. The task now captures the `JobContext` eagerly in `onEnter()` (while the live context is available) and uses `jobCtx.deleteRoom()` in the late handler, which also passes the job's API credentials instead of relying on environment variables. The `moveParticipant` call likewise prefers the captured context and now constructs its `RoomServiceClient` with the job's `apiKey` and `apiSecret` in addition to the URL. 
diff --git a/agents/src/beta/workflows/warm_transfer.ts b/agents/src/beta/workflows/warm_transfer.ts index b3d99dda0..9837fed69 100644 --- a/agents/src/beta/workflows/warm_transfer.ts +++ b/agents/src/beta/workflows/warm_transfer.ts @@ -6,7 +6,7 @@ import { type DisconnectReason, type ParticipantKind, Room, RoomEvent } from '@l import { AccessToken, RoomServiceClient, SipClient, type VideoGrant } from 'livekit-server-sdk'; import { z } from 'zod'; import type { LLMModels, STTModelString, TTSModelString } from '../../inference/index.js'; -import { getJobContext } from '../../job.js'; +import { type JobContext, getJobContext } from '../../job.js'; import type { ChatContext, Instructions, @@ -83,6 +83,11 @@ export interface WarmTransferTaskOptions { export class WarmTransferTask extends AgentTask { private _callerRoom: Room | null = null; private _humanAgentRoom: Room | null = null; + // Captured while the task runs inside the live job context. The post-merge + // caller-room cleanup listener fires from a native rtc-node FFI callback whose + // AsyncLocalStorage context is pinned to FfiClient-singleton creation, so + // getJobContext() is unreliable there; we capture the context eagerly instead. + private _jobCtx: JobContext | null = null; private _humanAgentSession: AgentSession | null = null; // Assigned in the constructor; a field initializer here would run after the // resolver is captured and clobber it (ES2022 class-field semantics). 
@@ -229,6 +234,7 @@ export class WarmTransferTask extends AgentTask { async onEnter(): Promise { const jobCtx = getJobContext(); + this._jobCtx = jobCtx; this._callerRoom = jobCtx.room; if (this._holdAudio !== null) { @@ -338,8 +344,16 @@ export class WarmTransferTask extends AgentTask { this._callerRoom.off(RoomEvent.ParticipantDisconnected, this.onCallerParticipantDisconnected); - const rooms = new RoomServiceClient(getJobContext().info.url); - void rooms.deleteRoom(this._callerRoom.name).catch((error) => { + // Use the eagerly-captured job context: this callback runs from a native + // rtc-node FFI event, where getJobContext() reads an empty/stale + // AsyncLocalStorage store and would throw as an unhandled rejection. + const jobCtx = this._jobCtx; + if (!jobCtx) { + this._logger.warn('no job context captured, cannot delete caller room'); + return; + } + const callerRoomName = this._callerRoom.name; + void jobCtx.deleteRoom(callerRoomName).catch((error) => { this._logger.warn({ error }, 'failed to delete caller room'); }); }; @@ -518,7 +532,8 @@ export class WarmTransferTask extends AgentTask { 'moving human agent to caller room', ); - const rooms = new RoomServiceClient(getJobContext().info.url); + const info = (this._jobCtx ?? 
getJobContext()).info; + const rooms = new RoomServiceClient(info.url, info.apiKey, info.apiSecret); await rooms.moveParticipant( this._humanAgentRoom.name, this._humanAgentIdentity, diff --git a/agents/src/voice/room_io/_output.test.ts b/agents/src/voice/room_io/_output.test.ts index 24b03ba8d..ecea3a7bb 100644 --- a/agents/src/voice/room_io/_output.test.ts +++ b/agents/src/voice/room_io/_output.test.ts @@ -167,6 +167,10 @@ describe('ParticipantAudioOutput captureFrame segment accounting', () => { playbackFinishedCount: number; playbackFinishedFuture: Future; onPlaybackStarted: (createdAt: number) => void; + options: { queueSizeMs?: number }; + recentFrames: unknown[]; + recentFramesMs: number; + replayFrames: unknown[]; audioSource: { clearQueue: () => void; captureFrame: (frame: CaptureFrameArg) => Promise; @@ -187,6 +191,11 @@ describe('ParticipantAudioOutput captureFrame segment accounting', () => { output.playbackFinishedCount = 0; output.playbackFinishedFuture = new Future(); output.onPlaybackStarted = vi.fn(); + // Object.create bypasses the constructor's field initializers; mirror them. + output.options = { queueSizeMs: 1000 }; + output.recentFrames = []; + output.recentFramesMs = 0; + output.replayFrames = []; output.audioSource = { clearQueue: vi.fn(), captureFrame: vi.fn(async () => {}) }; return output; }; @@ -220,3 +229,133 @@ describe('ParticipantAudioOutput captureFrame segment accounting', () => { expect(output.pushedDuration).toBeGreaterThan(0); }); }); + +/** + * Regression tests for the false-interruption audio loss fix. + * + * Before the fix, pause() called clearQueue() which permanently dropped every + * frame already pushed to the native AudioSource queue (up to queueSizeMs). On a + * false interruption (pause then resume) the agent never replayed them, so up to + * ~1s of audio (rtc-node default queue) vanished from both the call and the + * recording. 
The output now keeps a rolling window of recently pushed frames, + * captures the unplayed tail on pause(), and replays it on the next captureFrame + * after resume() — while discarding it on a real interruption (clearBuffer()). + */ +describe('ParticipantAudioOutput false-interruption replay', () => { + const FRAME_MS = 20; + const SR = 48000; + const SPF = (SR * FRAME_MS) / 1000; + + type ReplayOutput = ParticipantAudioOutput & { + startedFuture: Future; + playbackEnabledFuture: Future; + interruptedFuture: Future; + firstFrameEmitted: boolean; + pushedDuration: number; + _capturing: boolean; + playbackSegmentsCount: number; + playbackFinishedCount: number; + playbackFinishedFuture: Future; + onPlaybackStarted: (createdAt: number) => void; + options: { queueSizeMs?: number }; + recentFrames: unknown[]; + recentFramesMs: number; + replayFrames: unknown[]; + audioSource: { + clearQueue: () => void; + captureFrame: (frame: CaptureFrameArg) => Promise; + queuedDuration: number; + }; + }; + + // Tag each frame by id in data[0] so captured ids reveal exact ordering, lost + // frames (missing id), and replayed frames (duplicate id). 
+ const frameOf = (id: number): CaptureFrameArg => { + const data = new Int16Array(SPF); + data[0] = id; + return { samplesPerChannel: SPF, sampleRate: SR, data } as unknown as CaptureFrameArg; + }; + + const makeOutput = (queueSizeMs: number, captured: number[]): ReplayOutput => { + const output = Object.create(ParticipantAudioOutput.prototype) as ReplayOutput; + output.startedFuture = new Future(); + output.startedFuture.resolve(); + output.playbackEnabledFuture = new Future(); + output.playbackEnabledFuture.resolve(); + output.interruptedFuture = new Future(); + output.firstFrameEmitted = false; + output.pushedDuration = 0; + output._capturing = false; + output.playbackSegmentsCount = 0; + output.playbackFinishedCount = 0; + output.playbackFinishedFuture = new Future(); + output.onPlaybackStarted = vi.fn(); + // Object.create bypasses the constructor's field initializers; mirror them. + output.options = { queueSizeMs }; + output.recentFrames = []; + output.recentFramesMs = 0; + output.replayFrames = []; + output.audioSource = { + clearQueue: vi.fn(), + queuedDuration: 0, + captureFrame: vi.fn(async (frame: CaptureFrameArg) => { + captured.push((frame as unknown as { data: Int16Array }).data[0]!); + }), + }; + return output; + }; + + it('replays the unplayed tail on resume (false interruption) — zero loss', async () => { + const captured: number[] = []; + const output = makeOutput(100, captured); + + for (let i = 0; i < 10; i++) { + await output.captureFrame(frameOf(i)); + } + + // 100ms == 5 frames still queued (unplayed) when the false interruption hits. + output.audioSource.queuedDuration = 100; + output.pause(); + output.resume(); + + await output.captureFrame(frameOf(10)); + + // initial 0..9, then the unplayed tail 5..9 replayed, then 10 — nothing lost. 
+ expect(captured).toEqual([0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 5, 6, 7, 8, 9, 10]); + }); + + it('discards the unplayed tail on clearBuffer (real interruption) — no replay', async () => { + const captured: number[] = []; + const output = makeOutput(100, captured); + + for (let i = 0; i < 10; i++) { + await output.captureFrame(frameOf(i)); + } + + output.audioSource.queuedDuration = 100; + output.pause(); + output.clearBuffer(); // real interruption: the user cut the agent off + output.resume(); + + await output.captureFrame(frameOf(10)); + + expect(captured).toEqual([0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10]); + }); + + it('does not replay when nothing was queued at pause', async () => { + const captured: number[] = []; + const output = makeOutput(100, captured); + + for (let i = 0; i < 10; i++) { + await output.captureFrame(frameOf(i)); + } + + output.audioSource.queuedDuration = 0; + output.pause(); + output.resume(); + + await output.captureFrame(frameOf(10)); + + expect(captured).toEqual([0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10]); + }); +}); diff --git a/agents/src/voice/room_io/_output.ts b/agents/src/voice/room_io/_output.ts index 11dd8eb71..a30d71be8 100644 --- a/agents/src/voice/room_io/_output.ts +++ b/agents/src/voice/room_io/_output.ts @@ -385,6 +385,18 @@ export class ParticipantAudioOutput extends AudioOutput { /** Gate held closed while the output is paused; frame forwarding awaits it. */ private playbackEnabledFuture: Future = new Future(); + // Rolling window of the most recently pushed frames (covering ~queueSizeMs), + // used to recover the unplayed tail that clearQueue() drops on pause(). On a + // false interruption (pause -> resume) these are replayed so no audio is lost; + // on a real interruption (clearBuffer) they are discarded. 
+ private recentFrames: AudioFrame[] = []; + private recentFramesMs: number = 0; + private replayFrames: AudioFrame[] = []; + + private static frameMs(frame: AudioFrame): number { + return (frame.samplesPerChannel / frame.sampleRate) * 1000; + } + constructor(room: Room, options: AudioOutputOptions) { super(options.sampleRate, undefined, { pause: true }); this.room = room; @@ -401,6 +413,19 @@ export class ParticipantAudioOutput extends AudioOutput { if (this.playbackEnabledFuture.done) { this.playbackEnabledFuture = new Future(); } + // Capture the unplayed tail (what clearQueue is about to drop) so a false + // interruption can replay it on resume instead of losing it forever. + const queuedMs = this.audioSource.queuedDuration; + if (queuedMs > 0 && this.recentFrames.length > 0) { + const tail: AudioFrame[] = []; + let acc = 0; + for (let i = this.recentFrames.length - 1; i >= 0 && acc < queuedMs; i--) { + const f = this.recentFrames[i]!; + tail.unshift(f); + acc += ParticipantAudioOutput.frameMs(f); + } + this.replayFrames = tail; + } // Drop already-buffered audio so playback stops promptly instead of draining the prebuffer. this.audioSource.clearQueue(); super.pause(); @@ -434,6 +459,18 @@ export class ParticipantAudioOutput extends AudioOutput { } } + // Replay the unplayed tail that pause() dropped, when the pause ended in a + // resume (false interruption). These frames were already counted in + // pushedDuration on their first push, so don't recount them. + if (this.replayFrames.length > 0) { + const replay = this.replayFrames; + this.replayFrames = []; + for (const rf of replay) { + await this.audioSource.captureFrame(rf); + this.trackRecentFrame(rf); + } + } + // Count the playback segment only after the pause/interrupt gate above. 
super.captureFrame // bumps playbackSegmentsCount; if a frame interrupted-while-paused bailed at the gate after // that bump, the count would strand ahead of playbackFinishedCount and the next @@ -448,6 +485,25 @@ export class ParticipantAudioOutput extends AudioOutput { // TODO(AJS-102): use frame.durationMs once available in rtc-node this.pushedDuration += frame.samplesPerChannel / frame.sampleRate; await this.audioSource.captureFrame(frame); + this.trackRecentFrame(frame); + } + + // Maintain a rolling window of recently pushed frames covering ~queueSizeMs, + // enough to recover the unplayed tail (<= queuedDuration) that pause() drops. + private trackRecentFrame(frame: AudioFrame): void { + // Keep a little headroom over queueSizeMs: queuedDuration can momentarily + // exceed the nominal queue size by up to one frame, and we must retain enough + // history to recover the entire unplayed tail. + const cap = (this.options.queueSizeMs ?? 1000) + 200; + this.recentFrames.push(frame); + this.recentFramesMs += ParticipantAudioOutput.frameMs(frame); + while ( + this.recentFrames.length > 1 && + this.recentFramesMs - ParticipantAudioOutput.frameMs(this.recentFrames[0]!) >= cap + ) { + const dropped = this.recentFrames.shift()!; + this.recentFramesMs -= ParticipantAudioOutput.frameMs(dropped); + } } private async waitForPlayoutTask(abortController: AbortController): Promise { @@ -483,6 +539,13 @@ export class ParticipantAudioOutput extends AudioOutput { this.pushedDuration = 0; this.firstFrameEmitted = false; + // Segment finished: drop the rolling window. On an interruption also drop any + // pending replay tail (the user chose to cut the agent off). 
+ this.recentFrames = []; + this.recentFramesMs = 0; + if (interrupted) { + this.replayFrames = []; + } this.onPlaybackFinished({ playbackPosition: pushedDuration, @@ -522,6 +585,7 @@ export class ParticipantAudioOutput extends AudioOutput { } clearBuffer(): void { + this.replayFrames = []; // Signal interruption even if no frame has been pushed yet, so a gated captureFrame can bail. if (!this.interruptedFuture.done) { this.interruptedFuture.resolve(); diff --git a/agents/src/voice/room_io/room_io.ts b/agents/src/voice/room_io/room_io.ts index ed5315bf7..d5f1e40c6 100644 --- a/agents/src/voice/room_io/room_io.ts +++ b/agents/src/voice/room_io/room_io.ts @@ -137,6 +137,10 @@ const DEFAULT_ROOM_OUTPUT_OPTIONS: RoomOutputOptions = { syncTranscription: true, audioPublishOptions: new TrackPublishOptions({ source: TrackSource.SOURCE_MICROPHONE }), jsonFormat: false, + // Match Python (_output.py: queue_size_ms=200). The rtc-node AudioSource + // default is 1000ms; a smaller prebuffer keeps the playout queue close to + // realtime so interruptions take effect promptly. + queueSizeMs: 200, }; export class RoomIO {