Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion packages/typescript/ai-client/src/chat-client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -430,6 +430,10 @@ export class ChatClient {
this.setStatus('submitted')
this.setError(undefined)
this.abortController = new AbortController()
// Capture the signal immediately so that a concurrent stop() or
// sendMessage() that reassigns this.abortController cannot cause
// connect() to receive a stale or null signal.
const signal = this.abortController.signal
// Reset pending tool executions for the new stream
this.pendingToolExecutions.clear()
let streamCompletedSuccessfully = false
Expand All @@ -456,7 +460,7 @@ export class ChatClient {
const stream = this.connection.connect(
messages,
mergedBody,
this.abortController.signal,
signal,
)

await this.processStream(stream)
Expand Down
96 changes: 96 additions & 0 deletions packages/typescript/ai-client/tests/chat-client-abort.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -306,4 +306,100 @@ describe('ChatClient - Abort Signal Handling', () => {
// Each should be a different signal instance
expect(abortSignals[0]).not.toBe(abortSignals[1])
})

it('should pass the original signal to connect() even if stop() is called during onResponse', async () => {
let signalPassedToConnect: AbortSignal | undefined

const adapter: ConnectionAdapter = {
// eslint-disable-next-line @typescript-eslint/require-await
async *connect(_messages, _data, abortSignal) {
signalPassedToConnect = abortSignal
yield {
type: 'RUN_FINISHED',
runId: 'run-1',
model: 'test',
timestamp: Date.now(),
finishReason: 'stop',
}
},
}

const client = new ChatClient({
connection: adapter,
onResponse: () => {
// Simulate a concurrent stop() during the onResponse callback,
// which sets this.abortController to null. Without the fix,
// the code would dereference this.abortController.signal after
// this point and crash with a null reference.
client.stop()
},
})

await client.append({
id: 'user-1',
role: 'user',
parts: [{ type: 'text', content: 'Hello' }],
createdAt: new Date(),
})

// The signal should still be a valid AbortSignal instance
// (captured before the await), not undefined/null
expect(signalPassedToConnect).toBeInstanceOf(AbortSignal)
})

it('should pass the original signal to connect() even if sendMessage() reassigns abortController during onResponse', async () => {
const signalsPassedToConnect: Array<AbortSignal> = []

const adapter: ConnectionAdapter = {
// eslint-disable-next-line @typescript-eslint/require-await
async *connect(_messages, _data, abortSignal) {
if (abortSignal) {
signalsPassedToConnect.push(abortSignal)
}
yield {
type: 'RUN_FINISHED',
runId: 'run-1',
model: 'test',
timestamp: Date.now(),
finishReason: 'stop',
}
},
}

let firstCall = true
const client = new ChatClient({
connection: adapter,
onResponse: () => {
if (firstCall) {
firstCall = false
// Trigger a second message during onResponse callback.
// This queues a new streamResponse that would create a new
// AbortController, potentially overwriting this.abortController
// before the first connect() call reads the signal.
client.append({
id: 'user-2',
role: 'user',
parts: [{ type: 'text', content: 'Second message' }],
createdAt: new Date(),
})
}
},
})

await client.append({
id: 'user-1',
role: 'user',
parts: [{ type: 'text', content: 'Hello' }],
createdAt: new Date(),
})

// Wait for the queued second stream to complete
await new Promise((resolve) => setTimeout(resolve, 50))

// Both calls should have received valid, distinct AbortSignal instances
expect(signalsPassedToConnect.length).toBe(2)
expect(signalsPassedToConnect[0]).toBeInstanceOf(AbortSignal)
expect(signalsPassedToConnect[1]).toBeInstanceOf(AbortSignal)
expect(signalsPassedToConnect[0]).not.toBe(signalsPassedToConnect[1])
})
})