diff --git a/.changeset/await-realtime-chatctx-sync.md b/.changeset/await-realtime-chatctx-sync.md new file mode 100644 index 000000000..02763d3cf --- /dev/null +++ b/.changeset/await-realtime-chatctx-sync.md @@ -0,0 +1,5 @@ +--- +'@livekit/agents': patch +--- + +Await active realtime chat context updates through `Agent.updateChatCtx()` so callers can reliably sequence follow-up model turns after conversation item sync completes. diff --git a/agents/src/voice/agent.test.ts b/agents/src/voice/agent.test.ts index 8dd83fee3..8de3bb379 100644 --- a/agents/src/voice/agent.test.ts +++ b/agents/src/voice/agent.test.ts @@ -3,6 +3,7 @@ // SPDX-License-Identifier: Apache-2.0 import { describe, expect, it, vi } from 'vitest'; import { z } from 'zod'; +import { ChatContext } from '../llm/chat_context.js'; import { tool } from '../llm/index.js'; import { initializeLogger } from '../log.js'; import { Task } from '../utils.js'; @@ -24,6 +25,49 @@ describe('Agent', () => { expect(agent.instructions).toBe(instructions); }); + it('should wait for active activity chat context updates', async () => { + const agent = new Agent({ instructions: 'test' }); + const chatCtx = new ChatContext(); + let resolveUpdate: () => void = () => { + throw new Error('update promise was not initialized'); + }; + const updatePromise = new Promise((resolve) => { + resolveUpdate = resolve; + }); + const updateChatCtx = vi.fn(() => updatePromise); + ( + agent as unknown as { _agentActivity: { updateChatCtx: typeof updateChatCtx } } + )._agentActivity = { updateChatCtx }; + + let settled = false; + const update = agent.updateChatCtx(chatCtx).then(() => { + settled = true; + }); + + await Promise.resolve(); + + expect(updateChatCtx).toHaveBeenCalledWith(chatCtx); + expect(settled).toBe(false); + + resolveUpdate(); + await update; + + expect(settled).toBe(true); + }); + + it('should propagate active activity chat context update failures', async () => { + const agent = new Agent({ instructions: 'test' }); + const chatCtx = new ChatContext(); + const error = new Error('update failed'); + const updateChatCtx = vi.fn(() => Promise.reject(error)); + ( + agent as unknown as { _agentActivity: { updateChatCtx: typeof updateChatCtx } } + )._agentActivity = { updateChatCtx }; + + await expect(agent.updateChatCtx(chatCtx)).rejects.toBe(error); + expect(updateChatCtx).toHaveBeenCalledWith(chatCtx); + }); + it('should create agent with instructions and tools', () => { const instructions = 'You are a helpful assistant with tools'; @@ -64,6 +108,57 @@ describe('Agent', () => { expect(agentTools.getTool2?.description).toBe('Second test tool'); }); + it('should wait for realtime session chat context updates', async () => { + const agent = new Agent({ instructions: 'test' }); + const activity = Object.create(AgentActivity.prototype) as AgentActivity & { + agent: Agent; + realtimeSession: { updateChatCtx: ReturnType }; + }; + const chatCtx = new ChatContext(); + let resolveUpdate: () => void = () => { + throw new Error('update promise was not initialized'); + }; + const updatePromise = new Promise((resolve) => { + resolveUpdate = resolve; + }); + activity.agent = agent; + activity.realtimeSession = { + updateChatCtx: vi.fn(() => updatePromise), + }; + + let settled = false; + const update = activity.updateChatCtx(chatCtx).then(() => { + settled = true; + }); + + await Promise.resolve(); + + expect(activity.realtimeSession.updateChatCtx).toHaveBeenCalledOnce(); + expect(settled).toBe(false); + + resolveUpdate(); + await update; + + expect(settled).toBe(true); + }); + + it('should propagate realtime session chat context update failures', async () => { + const agent = new Agent({ instructions: 'test' }); + const activity = Object.create(AgentActivity.prototype) as AgentActivity & { + agent: Agent; + realtimeSession: { updateChatCtx: ReturnType }; + }; + const chatCtx = new ChatContext(); + const error = new Error('realtime update failed'); + activity.agent = agent; + activity.realtimeSession = { + updateChatCtx: vi.fn(() => Promise.reject(error)), + }; + + await expect(activity.updateChatCtx(chatCtx)).rejects.toBe(error); + expect(activity.realtimeSession.updateChatCtx).toHaveBeenCalledOnce(); + }); + it('should return a copy of tools, not the original reference', () => { const instructions = 'You are a helpful assistant'; const mockTool = tool({ diff --git a/agents/src/voice/agent.ts b/agents/src/voice/agent.ts index c6f7b82df..3a30e3cb3 100644 --- a/agents/src/voice/agent.ts +++ b/agents/src/voice/agent.ts @@ -343,7 +343,7 @@ export class Agent { return; } - this._agentActivity.updateChatCtx(chatCtx); + await this._agentActivity.updateChatCtx(chatCtx); } // TODO: Add when AgentConfigUpdate is ported to ChatContext. diff --git a/agents/src/voice/agent_activity.ts b/agents/src/voice/agent_activity.ts index 21a6a9fac..cbba3ec08 100644 --- a/agents/src/voice/agent_activity.ts +++ b/agents/src/voice/agent_activity.ts @@ -853,7 +853,7 @@ export class AgentActivity implements RecognitionHooks { if (this.realtimeSession) { removeInstructions(chatCtx); - this.realtimeSession.updateChatCtx(chatCtx); + await this.realtimeSession.updateChatCtx(chatCtx); } else { updateInstructions({ chatCtx,