Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions .changeset/false-interruption-audio-replay.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
---
'@livekit/agents': patch
---

fix(room_io): stop dropping audio on false interruptions

`ParticipantAudioOutput.pause()` cleared the entire native `AudioSource` queue, permanently discarding up to `queueSizeMs` (rtc-node default: 1000ms) of generated-but-unplayed audio. On a false interruption (pause then resume) those frames were never replayed, so up to ~1s of agent speech was lost mid-sentence from both the live call and the observability recording. The output now keeps a rolling window of recently pushed frames, captures the unplayed tail on pause, and replays it on resume, while still discarding it on a real interruption (`clearBuffer()`).
7 changes: 7 additions & 0 deletions .changeset/warm-transfer-job-context.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
---
'@livekit/agents': patch
---

fix(warm-transfer): capture job context for post-merge caller-room cleanup

`WarmTransferTask`'s post-merge `RoomEvent.ParticipantDisconnected` listener called `getJobContext()` to build a `RoomServiceClient` and delete the caller room. That handler runs from a native rtc-node FFI callback whose `AsyncLocalStorage` context is pinned to `FfiClient`-singleton creation, not to the job's context — so `getJobContext()` read an empty (or stale) store and threw, surfacing as an unhandled promise rejection and leaving the 2-party SIP room undeleted when a participant hung up after the bridge. The task now captures the `JobContext` eagerly in `onEnter()` (while the live context is available) and uses `jobCtx.deleteRoom()` in the late handler, which also passes the job's API credentials instead of relying on environment variables.
23 changes: 19 additions & 4 deletions agents/src/beta/workflows/warm_transfer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import { type DisconnectReason, type ParticipantKind, Room, RoomEvent } from '@l
import { AccessToken, RoomServiceClient, SipClient, type VideoGrant } from 'livekit-server-sdk';
import { z } from 'zod';
import type { LLMModels, STTModelString, TTSModelString } from '../../inference/index.js';
import { getJobContext } from '../../job.js';
import { type JobContext, getJobContext } from '../../job.js';
import type {
ChatContext,
Instructions,
Expand Down Expand Up @@ -83,6 +83,11 @@ export interface WarmTransferTaskOptions {
export class WarmTransferTask extends AgentTask<WarmTransferResult> {
private _callerRoom: Room | null = null;
private _humanAgentRoom: Room | null = null;
// Captured while the task runs inside the live job context. The post-merge
// caller-room cleanup listener fires from a native rtc-node FFI callback whose
// AsyncLocalStorage context is pinned to FfiClient-singleton creation, so
// getJobContext() is unreliable there; we capture the context eagerly instead.
private _jobCtx: JobContext | null = null;
private _humanAgentSession: AgentSession | null = null;
// Assigned in the constructor; a field initializer here would run after the
// resolver is captured and clobber it (ES2022 class-field semantics).
Expand Down Expand Up @@ -229,6 +234,7 @@ export class WarmTransferTask extends AgentTask<WarmTransferResult> {

async onEnter(): Promise<void> {
const jobCtx = getJobContext();
this._jobCtx = jobCtx;
this._callerRoom = jobCtx.room;

if (this._holdAudio !== null) {
Expand Down Expand Up @@ -338,8 +344,16 @@ export class WarmTransferTask extends AgentTask<WarmTransferResult> {

this._callerRoom.off(RoomEvent.ParticipantDisconnected, this.onCallerParticipantDisconnected);

const rooms = new RoomServiceClient(getJobContext().info.url);
void rooms.deleteRoom(this._callerRoom.name).catch((error) => {
// Use the eagerly-captured job context: this callback runs from a native
// rtc-node FFI event, where getJobContext() reads an empty/stale
// AsyncLocalStorage store and would throw as an unhandled rejection.
const jobCtx = this._jobCtx;
if (!jobCtx) {
this._logger.warn('no job context captured, cannot delete caller room');
return;
}
const callerRoomName = this._callerRoom.name;
void jobCtx.deleteRoom(callerRoomName).catch((error) => {
this._logger.warn({ error }, 'failed to delete caller room');
});
};
Expand Down Expand Up @@ -518,7 +532,8 @@ export class WarmTransferTask extends AgentTask<WarmTransferResult> {
'moving human agent to caller room',
);

const rooms = new RoomServiceClient(getJobContext().info.url);
const info = (this._jobCtx ?? getJobContext()).info;
const rooms = new RoomServiceClient(info.url, info.apiKey, info.apiSecret);
await rooms.moveParticipant(
this._humanAgentRoom.name,
this._humanAgentIdentity,
Expand Down
139 changes: 139 additions & 0 deletions agents/src/voice/room_io/_output.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,10 @@ describe('ParticipantAudioOutput captureFrame segment accounting', () => {
playbackFinishedCount: number;
playbackFinishedFuture: Future<void>;
onPlaybackStarted: (createdAt: number) => void;
options: { queueSizeMs?: number };
recentFrames: unknown[];
recentFramesMs: number;
replayFrames: unknown[];
audioSource: {
clearQueue: () => void;
captureFrame: (frame: CaptureFrameArg) => Promise<void>;
Expand All @@ -187,6 +191,11 @@ describe('ParticipantAudioOutput captureFrame segment accounting', () => {
output.playbackFinishedCount = 0;
output.playbackFinishedFuture = new Future<void>();
output.onPlaybackStarted = vi.fn();
// Object.create bypasses the constructor's field initializers; mirror them.
output.options = { queueSizeMs: 1000 };
output.recentFrames = [];
output.recentFramesMs = 0;
output.replayFrames = [];
output.audioSource = { clearQueue: vi.fn(), captureFrame: vi.fn(async () => {}) };
return output;
};
Expand Down Expand Up @@ -220,3 +229,133 @@ describe('ParticipantAudioOutput captureFrame segment accounting', () => {
expect(output.pushedDuration).toBeGreaterThan(0);
});
});

/**
* Regression tests for the false-interruption audio loss fix.
*
* Before the fix, pause() called clearQueue() which permanently dropped every
* frame already pushed to the native AudioSource queue (up to queueSizeMs). On a
* false interruption (pause then resume) the agent never replayed them, so up to
* ~1s of audio (rtc-node default queue) vanished from both the call and the
* recording. The output now keeps a rolling window of recently pushed frames,
* captures the unplayed tail on pause(), and replays it on the next captureFrame
* after resume() — while discarding it on a real interruption (clearBuffer()).
*/
describe('ParticipantAudioOutput false-interruption replay', () => {
const FRAME_MS = 20;
const SR = 48000;
const SPF = (SR * FRAME_MS) / 1000;

type ReplayOutput = ParticipantAudioOutput & {
startedFuture: Future<void>;
playbackEnabledFuture: Future<void>;
interruptedFuture: Future<void>;
firstFrameEmitted: boolean;
pushedDuration: number;
_capturing: boolean;
playbackSegmentsCount: number;
playbackFinishedCount: number;
playbackFinishedFuture: Future<void>;
onPlaybackStarted: (createdAt: number) => void;
options: { queueSizeMs?: number };
recentFrames: unknown[];
recentFramesMs: number;
replayFrames: unknown[];
audioSource: {
clearQueue: () => void;
captureFrame: (frame: CaptureFrameArg) => Promise<void>;
queuedDuration: number;
};
};

// Tag each frame by id in data[0] so captured ids reveal exact ordering, lost
// frames (missing id), and replayed frames (duplicate id).
const frameOf = (id: number): CaptureFrameArg => {
const data = new Int16Array(SPF);
data[0] = id;
return { samplesPerChannel: SPF, sampleRate: SR, data } as unknown as CaptureFrameArg;
};

const makeOutput = (queueSizeMs: number, captured: number[]): ReplayOutput => {
const output = Object.create(ParticipantAudioOutput.prototype) as ReplayOutput;
output.startedFuture = new Future<void>();
output.startedFuture.resolve();
output.playbackEnabledFuture = new Future<void>();
output.playbackEnabledFuture.resolve();
output.interruptedFuture = new Future<void>();
output.firstFrameEmitted = false;
output.pushedDuration = 0;
output._capturing = false;
output.playbackSegmentsCount = 0;
output.playbackFinishedCount = 0;
output.playbackFinishedFuture = new Future<void>();
output.onPlaybackStarted = vi.fn();
// Object.create bypasses the constructor's field initializers; mirror them.
output.options = { queueSizeMs };
output.recentFrames = [];
output.recentFramesMs = 0;
output.replayFrames = [];
output.audioSource = {
clearQueue: vi.fn(),
queuedDuration: 0,
captureFrame: vi.fn(async (frame: CaptureFrameArg) => {
captured.push((frame as unknown as { data: Int16Array }).data[0]!);
}),
};
return output;
};

it('replays the unplayed tail on resume (false interruption) — zero loss', async () => {
const captured: number[] = [];
const output = makeOutput(100, captured);

for (let i = 0; i < 10; i++) {
await output.captureFrame(frameOf(i));
}

// 100ms == 5 frames still queued (unplayed) when the false interruption hits.
output.audioSource.queuedDuration = 100;
output.pause();
output.resume();

await output.captureFrame(frameOf(10));

// initial 0..9, then the unplayed tail 5..9 replayed, then 10 — nothing lost.
expect(captured).toEqual([0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 5, 6, 7, 8, 9, 10]);
});

it('discards the unplayed tail on clearBuffer (real interruption) — no replay', async () => {
const captured: number[] = [];
const output = makeOutput(100, captured);

for (let i = 0; i < 10; i++) {
await output.captureFrame(frameOf(i));
}

output.audioSource.queuedDuration = 100;
output.pause();
output.clearBuffer(); // real interruption: the user cut the agent off
output.resume();

await output.captureFrame(frameOf(10));

expect(captured).toEqual([0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
});

it('does not replay when nothing was queued at pause', async () => {
const captured: number[] = [];
const output = makeOutput(100, captured);

for (let i = 0; i < 10; i++) {
await output.captureFrame(frameOf(i));
}

output.audioSource.queuedDuration = 0;
output.pause();
output.resume();

await output.captureFrame(frameOf(10));

expect(captured).toEqual([0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
});
});
64 changes: 64 additions & 0 deletions agents/src/voice/room_io/_output.ts
Original file line number Diff line number Diff line change
Expand Up @@ -385,6 +385,18 @@ export class ParticipantAudioOutput extends AudioOutput {
/** Gate held closed while the output is paused; frame forwarding awaits it. */
private playbackEnabledFuture: Future<void> = new Future();

// Rolling window of the most recently pushed frames (covering ~queueSizeMs),
// used to recover the unplayed tail that clearQueue() drops on pause(). On a
// false interruption (pause -> resume) these are replayed so no audio is lost;
// on a real interruption (clearBuffer) they are discarded.
private recentFrames: AudioFrame[] = [];
private recentFramesMs: number = 0;
private replayFrames: AudioFrame[] = [];

private static frameMs(frame: AudioFrame): number {
return (frame.samplesPerChannel / frame.sampleRate) * 1000;
}

constructor(room: Room, options: AudioOutputOptions) {
super(options.sampleRate, undefined, { pause: true });
this.room = room;
Expand All @@ -401,6 +413,19 @@ export class ParticipantAudioOutput extends AudioOutput {
if (this.playbackEnabledFuture.done) {
this.playbackEnabledFuture = new Future();
}
// Capture the unplayed tail (what clearQueue is about to drop) so a false
// interruption can replay it on resume instead of losing it forever.
const queuedMs = this.audioSource.queuedDuration;
if (queuedMs > 0 && this.recentFrames.length > 0) {
const tail: AudioFrame[] = [];
let acc = 0;
for (let i = this.recentFrames.length - 1; i >= 0 && acc < queuedMs; i--) {
const f = this.recentFrames[i]!;
tail.unshift(f);
acc += ParticipantAudioOutput.frameMs(f);
}
this.replayFrames = tail;
}
// Drop already-buffered audio so playback stops promptly instead of draining the prebuffer.
this.audioSource.clearQueue();
super.pause();
Expand Down Expand Up @@ -434,6 +459,18 @@ export class ParticipantAudioOutput extends AudioOutput {
}
}

// Replay the unplayed tail that pause() dropped, when the pause ended in a
// resume (false interruption). These frames were already counted in
// pushedDuration on their first push, so don't recount them.
if (this.replayFrames.length > 0) {
const replay = this.replayFrames;
this.replayFrames = [];
for (const rf of replay) {
await this.audioSource.captureFrame(rf);
this.trackRecentFrame(rf);
}
}

// Count the playback segment only after the pause/interrupt gate above. super.captureFrame
// bumps playbackSegmentsCount; if a frame interrupted-while-paused bailed at the gate after
// that bump, the count would strand ahead of playbackFinishedCount and the next
Expand All @@ -448,6 +485,25 @@ export class ParticipantAudioOutput extends AudioOutput {
// TODO(AJS-102): use frame.durationMs once available in rtc-node
this.pushedDuration += frame.samplesPerChannel / frame.sampleRate;
await this.audioSource.captureFrame(frame);
this.trackRecentFrame(frame);
}

// Maintain a rolling window of recently pushed frames covering ~queueSizeMs,
// enough to recover the unplayed tail (<= queuedDuration) that pause() drops.
private trackRecentFrame(frame: AudioFrame): void {
// Keep a little headroom over queueSizeMs: queuedDuration can momentarily
// exceed the nominal queue size by up to one frame, and we must retain enough
// history to recover the entire unplayed tail.
const cap = (this.options.queueSizeMs ?? 1000) + 200;
this.recentFrames.push(frame);
this.recentFramesMs += ParticipantAudioOutput.frameMs(frame);
while (
this.recentFrames.length > 1 &&
this.recentFramesMs - ParticipantAudioOutput.frameMs(this.recentFrames[0]!) >= cap
) {
const dropped = this.recentFrames.shift()!;
this.recentFramesMs -= ParticipantAudioOutput.frameMs(dropped);
}
}

private async waitForPlayoutTask(abortController: AbortController): Promise<void> {
Expand Down Expand Up @@ -483,6 +539,13 @@ export class ParticipantAudioOutput extends AudioOutput {

this.pushedDuration = 0;
this.firstFrameEmitted = false;
// Segment finished: drop the rolling window. On an interruption also drop any
// pending replay tail (the user chose to cut the agent off).
this.recentFrames = [];
this.recentFramesMs = 0;
if (interrupted) {
this.replayFrames = [];
Comment on lines +546 to +547

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Do not carry replay frames into the next utterance

When a false interruption happens after the last TTS frame has already been captured, pause() stores replayFrames and clears the native queue, and forwardAudio still calls audioOutput.flush() when the TTS stream ends (agents/src/voice/generation.ts:907). This non-interrupted finish clears only the rolling window, so the saved tail survives and the next reply's first captureFrame() will replay stale audio from the previous utterance before the new audio instead of resuming it in the original segment.

Useful? React with 👍 / 👎.

}
Comment on lines +546 to +548

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🟡 Unplayed audio tail from a previous response can leak into the next response

Saved replay frames are not discarded when a segment finishes without interruption (waitForPlayoutTask at agents/src/voice/room_io/_output.ts:546), so stale audio from one response can replay at the start of the next.

Impact: A listener may briefly hear the tail of the previous agent response at the beginning of the next response, causing a noticeable audio glitch.

Trigger: false interruption during playout after TTS flush
  1. TTS finishes generating, all frames are pushed via captureFrame, and flush() starts waitForPlayoutTask which awaits playout.
  2. While audio drains, a false interruption fires: pause() captures the unplayed tail into replayFrames (_output.ts:419-427) and calls clearQueue() (_output.ts:430).
  3. The queue is now empty, so waitForPlayout() resolves. waitForPlayoutTask sees interrupted=false (no clearBuffer was called) and skips the replayFrames = [] branch (_output.ts:546-548).
  4. recentFrames is cleared but replayFrames retains stale frames from the finished segment.
  5. When the next segment's first captureFrame runs, it replays the stale frames (_output.ts:465-472) before pushing the new audio, injecting old content into the new response.

The conditional clear at _output.ts:546-548 should be unconditional — once a segment finishes (regardless of how), leftover replay frames are no longer valid.

Suggested change
if (interrupted) {
this.replayFrames = [];
}
this.replayFrames = [];
Open in Devin Review

Was this helpful? React with 👍 or 👎 to provide feedback.


this.onPlaybackFinished({
playbackPosition: pushedDuration,
Expand Down Expand Up @@ -522,6 +585,7 @@ export class ParticipantAudioOutput extends AudioOutput {
}

clearBuffer(): void {
this.replayFrames = [];
// Signal interruption even if no frame has been pushed yet, so a gated captureFrame can bail.
if (!this.interruptedFuture.done) {
this.interruptedFuture.resolve();
Expand Down
4 changes: 4 additions & 0 deletions agents/src/voice/room_io/room_io.ts
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,10 @@ const DEFAULT_ROOM_OUTPUT_OPTIONS: RoomOutputOptions = {
syncTranscription: true,
audioPublishOptions: new TrackPublishOptions({ source: TrackSource.SOURCE_MICROPHONE }),
jsonFormat: false,
// Match Python (_output.py: queue_size_ms=200). The rtc-node AudioSource
// default is 1000ms; a smaller prebuffer keeps the playout queue close to
// realtime so interruptions take effect promptly.
queueSizeMs: 200,
};
Comment on lines +140 to 144

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🚩 Behavioral change: default audio queue reduced from 1000ms to 200ms

The PR changes DEFAULT_ROOM_OUTPUT_OPTIONS.queueSizeMs from undefined (which fell through to the rtc-node AudioSource default of 1000ms) to 200 at agents/src/voice/room_io/room_io.ts:143. This is a 5x reduction in the audio prebuffer. The comment says it matches the Python SDK (_output.py: queue_size_ms=200). While this helps interruptions take effect more promptly, it also reduces the tolerance for TTS timing jitter — if TTS frames arrive in bursts with gaps >200ms, the smaller queue is more likely to underrun, potentially causing audible gaps. Existing users who relied on the 1000ms default may notice different behavior.

Open in Devin Review

Was this helpful? React with 👍 or 👎 to provide feedback.


export class RoomIO {
Expand Down
Loading