Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .changeset/openai-reconnect-tool-call-anchors.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
'@livekit/agents': patch
'@livekit/agents-plugin-openai': patch
---

Discard stale OpenAI Realtime tool outputs after reconnect when their original function-call anchor is no longer present in the active realtime session.
34 changes: 32 additions & 2 deletions agents/src/voice/agent_activity.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,11 @@
*/
import { Heap } from 'heap-js';
import { describe, expect, it, vi } from 'vitest';
import { ChatContext } from '../llm/chat_context.js';
import { ChatContext, FunctionCall, FunctionCallOutput } from '../llm/chat_context.js';
import { LLM, type LLMStream } from '../llm/llm.js';
import { Future, Task } from '../utils.js';
import { _getActivityTaskInfo } from './agent.js';
import { AgentActivity } from './agent_activity.js';
import { AgentActivity, filterFunctionCallOutputsForRealtimeSession } from './agent_activity.js';
import type { PreemptiveGenerationInfo } from './audio_recognition.js';
import { SpeechHandle } from './speech_handle.js';

Expand Down Expand Up @@ -131,6 +131,36 @@ function buildMainTaskRunner() {
};
}

describe('filterFunctionCallOutputsForRealtimeSession', () => {
it('keeps only outputs whose function call is still in the realtime session', () => {
const liveOutput = FunctionCallOutput.create({
callId: 'call_live',
name: 'lookup',
output: '{"ok":true}',
isError: false,
});
const staleOutput = FunctionCallOutput.create({
callId: 'call_stale',
name: 'lookup',
output: '{"ok":false}',
isError: false,
});
const chatCtx = new ChatContext([
FunctionCall.create({
callId: 'call_live',
name: 'lookup',
args: '{}',
}),
]);

const { currentFunctionCallOutputs, staleFunctionCallOutputs } =
filterFunctionCallOutputsForRealtimeSession(chatCtx, [liveOutput, staleOutput]);

expect(currentFunctionCallOutputs).toEqual([liveOutput]);
expect(staleFunctionCallOutputs).toEqual([staleOutput]);
});
});

describe('AgentActivity - mainTask', () => {
it('should recover when speech handle is interrupted after authorization', async () => {
const { fakeActivity, mainTask, speechQueue, q_updated } = buildMainTaskRunner();
Expand Down
76 changes: 70 additions & 6 deletions agents/src/voice/agent_activity.ts
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,34 @@ interface PausedSpeechInfo {
timeout: number;
}

/** @internal */
export function filterFunctionCallOutputsForRealtimeSession(
chatCtx: ChatContext,
functionCallOutputs: FunctionCallOutput[],
): {
currentFunctionCallOutputs: FunctionCallOutput[];
staleFunctionCallOutputs: FunctionCallOutput[];
} {
const currentFunctionCallIds = new Set<string>();
for (const item of chatCtx.items) {
if (item.type === 'function_call') {
currentFunctionCallIds.add(item.callId);
}
}

const currentFunctionCallOutputs: FunctionCallOutput[] = [];
const staleFunctionCallOutputs: FunctionCallOutput[] = [];
for (const output of functionCallOutputs) {
if (currentFunctionCallIds.has(output.callId)) {
currentFunctionCallOutputs.push(output);
} else {
staleFunctionCallOutputs.push(output);
}
}

return { currentFunctionCallOutputs, staleFunctionCallOutputs };
}

export class AgentActivity implements RecognitionHooks {
agent: Agent;
agentSession: AgentSession;
Expand Down Expand Up @@ -3601,8 +3629,12 @@ export class AgentActivity implements RecognitionHooks {
return;
}

const { functionToolsExecutedEvent, shouldGenerateToolReply, newAgentTask, ignoreTaskSwitch } =
this.summarizeToolExecutionOutput(toolOutput, speechHandle);
const {
functionToolsExecutedEvent,
replyRequiredFunctionCallIds,
newAgentTask,
ignoreTaskSwitch,
} = this.summarizeToolExecutionOutput(toolOutput, speechHandle);

this.agentSession.emit(
AgentSessionEventTypes.FunctionToolsExecuted,
Expand All @@ -3615,6 +3647,8 @@ export class AgentActivity implements RecognitionHooks {
schedulingPaused = true;
}

let shouldGenerateToolReply = false;

if (functionToolsExecutedEvent.functionCallOutputs.length > 0) {
// wait all speeches played before updating the tool output and generating the response
// most realtime models dont support generating multiple responses at the same time
Expand All @@ -3630,13 +3664,40 @@ export class AgentActivity implements RecognitionHooks {
await new ThrowsPromise<void, never>((resolve) => setImmediate(resolve));
}
}
const chatCtx = realtimeSession.chatCtx.copy();
chatCtx.items.push(...functionToolsExecutedEvent.functionCallOutputs);
const { currentFunctionCallOutputs, staleFunctionCallOutputs } =
filterFunctionCallOutputsForRealtimeSession(
realtimeSession.chatCtx,
functionToolsExecutedEvent.functionCallOutputs as FunctionCallOutput[],
);

this.agentSession._toolItemsAdded(
functionToolsExecutedEvent.functionCallOutputs as FunctionCallOutput[],
if (staleFunctionCallOutputs.length > 0) {
this.logger.warn(
{
callIds: staleFunctionCallOutputs.map((output) => output.callId),
},
'discarding tool outputs for function calls no longer present in the realtime session',
);
}

if (currentFunctionCallOutputs.length === 0) {
if (this.agentSession.agentState === 'thinking') {
this.agentSession._updateAgentState('listening');
if (this.audioRecognition) {
this.audioRecognition.onEndOfAgentSpeech(Date.now());
}
}
return;
}

shouldGenerateToolReply = currentFunctionCallOutputs.some((output) =>
replyRequiredFunctionCallIds.has(output.callId),
);

const chatCtx = realtimeSession.chatCtx.copy();
chatCtx.items.push(...currentFunctionCallOutputs);

this.agentSession._toolItemsAdded(currentFunctionCallOutputs);
Comment on lines +3682 to +3699

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🚩 Orphaned function calls in agent/session chat context after stale output discard

When tools execute in the realtime path, onToolExecutionStarted at agents/src/voice/agent_activity.ts:3537-3540 eagerly adds FunctionCall items to both agent._chatCtx and agentSession._chatCtx (via _toolItemsAdded). If a reconnect subsequently makes those tool outputs stale, the new filter at line 3667-3694 correctly prevents the stale FunctionCallOutput items from being sent to the realtime session and from being added to agentSession._chatCtx. However, the corresponding FunctionCall entries remain in both agent._chatCtx and agentSession._chatCtx without matching outputs — creating orphaned function call items in the local chat histories. This is unlikely to cause immediate failures since these contexts are rebuilt on the next reconnect (function calls are excluded via chatCtxForReconnect), but it could affect downstream consumers of agentSession.history that expect function calls to always have corresponding outputs.

Open in Devin Review

Was this helpful? React with 👍 or 👎 to provide feedback.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch, but I think we should leave local history cleanup out of this PR.

The function call did occur and may already have been emitted to observers, while the completed output is intentionally discarded because the reconnected realtime session no longer has the matching call anchor. Removing the local FunctionCall would be a broader history-mutation behavior change, and adding the stale output locally would risk reintroducing confusing context for future syncs.

I also checked LiveKit's Python SDK for parity: its OpenAI realtime reconnect path excludes function calls/outputs from the rebuilt provider context, while the voice layer records tool items locally rather than removing the original function call on reconnect. This PR keeps the fix focused on preventing stale outputs from being sent to a provider session that cannot know about the original call.


// If the realtime model auto-generates the tool reply, install a
// placeholder so the active RunResult waits for that reply.
let fut: Future<void, never> | undefined;
Expand Down Expand Up @@ -3735,6 +3796,7 @@ export class AgentActivity implements RecognitionHooks {
});

let shouldGenerateToolReply = false;
const replyRequiredFunctionCallIds = new Set<string>();
let newAgentTask: Agent | null = null;
let ignoreTaskSwitch = false;

Expand All @@ -3745,6 +3807,7 @@ export class AgentActivity implements RecognitionHooks {
functionToolsExecutedEvent.functionCallOutputs.push(sanitizedOut.toolCallOutput);
if (sanitizedOut.replyRequired) {
shouldGenerateToolReply = true;
replyRequiredFunctionCallIds.add(sanitizedOut.toolCall.callId);
}
}

Expand All @@ -3770,6 +3833,7 @@ export class AgentActivity implements RecognitionHooks {
return {
functionToolsExecutedEvent,
shouldGenerateToolReply,
replyRequiredFunctionCallIds,
newAgentTask,
ignoreTaskSwitch,
};
Expand Down
77 changes: 77 additions & 0 deletions plugins/openai/src/realtime/realtime_model.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,16 @@ type ResponseDoneSessionInternals = {
};
};

type ReconnectSessionInternals = {
remoteChatCtx: llm.RemoteChatContext;
chatCtxForReconnect: () => llm.ChatContext;
};

type GenerationCleanupSessionInternals = {
currentGeneration?: ResponseDoneSessionInternals['currentGeneration'];
closeCurrentGeneration: (reason: string) => void;
};

function createSessionForTest(): RealtimeSessionInternals {
const session = Object.create(RealtimeSession.prototype) as RealtimeSessionInternals;
session.responseCreatedFutures = {};
Expand Down Expand Up @@ -618,6 +628,73 @@ describe('livekitItemToOpenAIItem', () => {
});
});

describe('RealtimeSession reconnect chat context', () => {
afterEach(() => {
vi.restoreAllMocks();
});

it('excludes function call items when rebuilding context after reconnect', () => {
stubTaskRuntime();

const model = new RealtimeModel({ apiKey: 'test-key' });
const session = model.session() as unknown as ReconnectSessionInternals;
session.remoteChatCtx = new llm.RemoteChatContext();

session.remoteChatCtx.insert(
undefined,
llm.ChatMessage.create({
id: 'msg_user',
role: 'user',
content: ['what is the weather?'],
}),
);
session.remoteChatCtx.insert(
'msg_user',
new llm.FunctionCall({
id: 'func_call',
callId: 'call_weather',
name: 'get_weather',
args: '{"location":"San Francisco"}',
}),
);
session.remoteChatCtx.insert(
'func_call',
new llm.FunctionCallOutput({
id: 'func_output',
callId: 'call_weather',
name: 'get_weather',
output: '{"temperature":72}',
isError: false,
}),
);

const chatCtx = session.chatCtxForReconnect();

expect(chatCtx.items.map((item) => item.type)).toEqual(['message']);
});

it('closes the current generation when cleaning up a reconnected session', () => {
stubTaskRuntime();

const model = new RealtimeModel({ apiKey: 'test-key' });
const session = model.session() as unknown as GenerationCleanupSessionInternals;
const doneFut = new Future<void>();

session.currentGeneration = {
messageChannel: stream.createStreamChannel<llm.MessageGeneration>(),
functionChannel: stream.createStreamChannel<llm.FunctionCall>(),
messages: new Map<string, never>(),
_doneFut: doneFut,
_createdTimestamp: Date.now(),
};

session.closeCurrentGeneration('session reconnection');

expect(doneFut.done).toBe(true);
expect(session.currentGeneration).toBeUndefined();
});
});

describe('RealtimeSession.updateOptions', () => {
afterEach(() => {
vi.restoreAllMocks();
Expand Down
63 changes: 40 additions & 23 deletions plugins/openai/src/realtime/realtime_model.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1045,6 +1045,7 @@ export class RealtimeSession extends llm.RealtimeSession {
}
}
this.responseCreatedFutures = {};
this.closeCurrentGeneration('session reconnection');

// Clear audio-capable item tracking - restored items are text-only on the server
this.audioCapableItemIds.clear();
Expand All @@ -1061,11 +1062,7 @@ export class RealtimeSession extends llm.RealtimeSession {
}

// chat context
const chatCtx = this.chatCtx.copy({
excludeFunctionCall: true,
excludeInstructions: true,
excludeEmptyMessage: true,
});
const chatCtx = this.chatCtxForReconnect();

const oldChatCtx = this.remoteChatCtx;
this.remoteChatCtx = new llm.RemoteChatContext();
Expand Down Expand Up @@ -1150,6 +1147,14 @@ export class RealtimeSession extends llm.RealtimeSession {
}
}

private chatCtxForReconnect(): llm.ChatContext {
return this.chatCtx.copy({
excludeFunctionCall: true,
excludeInstructions: true,
excludeEmptyMessage: true,
});
}

private async runWs(wsConn: WebSocket): Promise<void> {
const forwardEvents = async (signal: AbortSignal): Promise<void> => {
const abortFuture = new Future<void>();
Expand Down Expand Up @@ -1336,29 +1341,41 @@ export class RealtimeSession extends llm.RealtimeSession {
this.itemDeleteFutures = {};

this.inputTranscriptAccumulators.clear();

// Clean up current generation if exists
if (this.currentGeneration) {
for (const gen of this.currentGeneration.messages.values()) {
gen.textChannel.close();
gen.audioChannel.close();
if (!gen.modalities.done) {
gen.modalities.resolve(this._options.modalities);
}
}
this.currentGeneration.messages.clear();
this.currentGeneration.messageChannel.close();
this.currentGeneration.functionChannel.close();
if (!this.currentGeneration._doneFut.done) {
this.currentGeneration._doneFut.resolve();
}
this.currentGeneration = undefined;
}
this.closeCurrentGeneration('session close');

// Clear the message queue
this.messageChannel.items.length = 0;
}

private closeCurrentGeneration(reason: string): void {
if (!this.currentGeneration) {
return;
}

this.#logger.debug(
{
reason,
messageCount: this.currentGeneration.messages.size,
},
'Closing current OpenAI Realtime generation',
);

for (const gen of this.currentGeneration.messages.values()) {
gen.textChannel.close();
gen.audioChannel.close();
if (!gen.modalities.done) {
gen.modalities.resolve(this._options.modalities);
}
}
this.currentGeneration.messages.clear();
this.currentGeneration.messageChannel.close();
this.currentGeneration.functionChannel.close();
if (!this.currentGeneration._doneFut.done) {
this.currentGeneration._doneFut.resolve();
}
this.currentGeneration = undefined;
}

private handleInputAudioBufferSpeechStarted(
_event: api_proto.InputAudioBufferSpeechStartedEvent,
): void {
Expand Down