diff --git a/docs/content/docs/references/meta.json b/docs/content/docs/references/meta.json index febde356..94594db9 100644 --- a/docs/content/docs/references/meta.json +++ b/docs/content/docs/references/meta.json @@ -17,6 +17,7 @@ "[@xsai/utils-chat](https://doc.deno.land/https://esm.sh/@xsai/utils-chat)", "[@xsai/utils-reasoning](https://doc.deno.land/https://esm.sh/@xsai/utils-reasoning)", "[@xsai/utils-stream](https://doc.deno.land/https://esm.sh/@xsai/utils-stream)", + "[@xsai-ext/messages](https://doc.deno.land/https://esm.sh/@xsai-ext/messages)", "[@xsai-ext/providers](https://doc.deno.land/https://esm.sh/@xsai-ext/providers)", "[@xsai-ext/responses](https://doc.deno.land/https://esm.sh/@xsai-ext/responses)", "[@xsai-ext/telemetry](https://doc.deno.land/https://esm.sh/@xsai-ext/telemetry)", diff --git a/packages-ext/messages/LIVE_INVESTIGATION.md b/packages-ext/messages/LIVE_INVESTIGATION.md new file mode 100644 index 00000000..8c553793 --- /dev/null +++ b/packages-ext/messages/LIVE_INVESTIGATION.md @@ -0,0 +1,80 @@ +# Anthropic Messages Live Investigation + +Date: 2026-03-15 + +## xsAI changes made during investigation + +- Ignore trailing `data: [DONE]` sentinels in the streaming event parser. +- Stop defaulting Anthropic tool definitions to `strict: true`. +- Add a live verification script at `scripts/live.ts`. + +## Live results against current gateway + +Environment: + +- `ANTHROPIC_BASE_URL=https://copilot.api.menci.xyz` +- `ANTHROPIC_MODEL=claude-opus-4.6-1m` + +Actual API base used by the live script: + +- `https://copilot.api.menci.xyz/v1/` + +Results: + +- `countTokens`: failed. + - Gateway response: `{"error":{"type":"invalid_request_error","message":"Failed to count tokens: malloc is not a function"}}` +- `messages` basic stream: passed. +- `messages` thinking stream: passed. +- `messages` tool loop: passed when `tool_choice` was omitted and the prompt strongly required tool use. 
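The first xsAI change listed above (ignoring trailing `data: [DONE]` sentinels) can be sketched as a plain line filter. This is an illustrative helper under assumed names (`stripDoneSentinels` is hypothetical), not the actual xsAI streaming parser:

```typescript
// Hypothetical sketch: drop OpenAI-style `data: [DONE]` sentinels from a
// line-split SSE stream before Anthropic-native event parsing. Some proxies
// append the sentinel after `event: message_stop`, even though Anthropic's
// own streams never emit it.
const stripDoneSentinels = (lines: string[]): string[] =>
  lines.filter(line => line.trim() !== 'data: [DONE]')

const raw = [
  'event: message_stop',
  'data: {"type":"message_stop"}',
  'data: [DONE]',
]

// the `data: [DONE]` line is removed; the real event lines pass through
console.log(stripDoneSentinels(raw))
```

The real parser tolerates the sentinel mid-stream rather than pre-filtering lines, but the compatibility behavior is the same: the sentinel must never be treated as a JSON event payload.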
+ +Additional manual verification: + +- Raw streaming output from the gateway ends with `data: [DONE]` after `event: message_stop`. +- `tool_choice: { type: 'tool', name: 'add' }` and `tool_choice: { type: 'any' }` both failed through the gateway with: + - `Thinking may not be enabled when tool_choice forces tool use.` + +## What this means for xsAI + +- Ignoring `[DONE]` is required for real proxy compatibility. +- Defaulting Anthropic tools to `strict: true` is too aggressive for compatibility layers. +- The remaining `countTokens` failure is not explained by xsAI request shape. + +## copilot-deno findings + +### Endpoint shape + +- The app only exposes `POST /v1/messages` and `POST /v1/messages/count_tokens`. +- It does not expose `/messages` or `/messages/count_tokens`. + +### Native messages proxy behavior + +- Native `/v1/messages` requests are forwarded through `proxySSE(...)` without filtering `[DONE]`. +- This is why xsAI must tolerate a terminal `[DONE]` line even though Anthropic-native streams do not require it. + +### Request rewriting + +- `web_search` tools are stripped before forwarding. +- The reserved keyword `x-anthropic-billing-header` is stripped from prompts. +- Invalid thinking blocks are removed before native forwarding. +- If the selected model advertises `adaptive_thinking`, the route overwrites the request with `payload.thinking = { type: 'adaptive' }` and also injects `output_config.effort = 'high'` when missing. + +### Tool schema shape + +- `copilot-deno`'s local Anthropic type for tools is: + - `name` + - `description?` + - `input_schema` +- It does not include `strict` in its local Anthropic route types. + +### Why `countTokens` fails on the deployed gateway + +- The route tries to use `@anthropic-ai/tokenizer` for Claude models. +- It only falls back to estimation if loading the tokenizer fails. +- It does not fall back if calling the tokenizer function throws. 
+- The deployed error `malloc is not a function` strongly suggests the tokenizer function is being loaded but then crashing at runtime in the deployed environment. + +## Suggested follow-up in copilot-deno + +- Add a second fallback around `countFn(extractPayloadText(payload))` so tokenizer invocation failures downgrade to estimation instead of returning 400. +- Decide whether native `/v1/messages` should strip trailing `[DONE]` when proxying Anthropic-compatible streams. +- Decide whether forced `tool_choice` should disable adaptive thinking rather than returning an upstream validation error. diff --git a/packages-ext/messages/README.md b/packages-ext/messages/README.md new file mode 100644 index 00000000..72bc1262 --- /dev/null +++ b/packages-ext/messages/README.md @@ -0,0 +1 @@ +https://xsai.js.org/docs/packages-ext/messages diff --git a/packages-ext/messages/package.json b/packages-ext/messages/package.json new file mode 100644 index 00000000..74fbc5da --- /dev/null +++ b/packages-ext/messages/package.json @@ -0,0 +1,49 @@ +{ + "name": "@xsai-ext/messages", + "type": "module", + "version": "0.4.4", + "description": "extra-small AI SDK.", + "author": "Moeru AI", + "license": "MIT", + "homepage": "https://xsai.js.org", + "repository": { + "type": "git", + "url": "git+https://github.com/moeru-ai/xsai.git", + "directory": "packages-ext/messages" + }, + "bugs": "https://github.com/moeru-ai/xsai/issues", + "keywords": [ + "xsai", + "anthropic", + "ai" + ], + "sideEffects": false, + "exports": "./src/index.ts", + "files": [ + "dist" + ], + "publishConfig": { + "exports": { + ".": { + "types": "./dist/index.d.ts", + "default": "./dist/index.js" + }, + "./package.json": "./package.json" + }, + "main": "./dist/index.js", + "types": "./dist/index.d.ts" + }, + "scripts": { + "build": "pkgroll", + "test": "vitest run", + "test:live": "tsx scripts/live.ts" + }, + "dependencies": { + "@xsai/shared": "workspace:~", + "eventsource-parser": "catalog:" + }, + "devDependencies": { 
+    "@standard-schema/spec": "catalog:",
+    "zod": "catalog:schema-dev"
+  }
+}
diff --git a/packages-ext/messages/scripts/live.ts b/packages-ext/messages/scripts/live.ts
new file mode 100644
index 00000000..b503b083
--- /dev/null
+++ b/packages-ext/messages/scripts/live.ts
@@ -0,0 +1,311 @@
+import type { StreamingEvent } from '../src/types/streaming-event'
+
+import process from 'node:process'
+
+import { z } from 'zod'
+
+import { countTokens, messages, tool } from '../src'
+
+interface CheckResult {
+  details: Record<string, unknown>
+  name: string
+  ok: boolean
+}
+
+interface LiveConfig {
+  apiKey: string
+  baseURL: string
+  model: string
+}
+
+const abortSignal = AbortSignal.timeout(60_000)
+
+process.on('unhandledRejection', (error) => {
+  console.error('\n[UNHANDLED_REJECTION]')
+  console.error(error)
+})
+
+const normalizeAnthropicBaseURL = (baseURL: string): string => {
+  const url = new URL(baseURL)
+
+  if (!url.pathname.endsWith('/v1/') && !url.pathname.endsWith('/v1')) {
+    url.pathname = `${url.pathname.replace(/\/$/, '')}/v1/`
+  }
+
+  return url.toString()
+}
+
+const getConfig = (): LiveConfig => {
+  const baseURL = process.env.ANTHROPIC_BASE_URL
+  const apiKey = process.env.ANTHROPIC_AUTH_TOKEN
+  const model = process.env.ANTHROPIC_MODEL
+
+  if (baseURL == null || apiKey == null || model == null) {
+    throw new Error('Missing required environment variables: ANTHROPIC_BASE_URL, ANTHROPIC_AUTH_TOKEN, ANTHROPIC_MODEL.')
+  }
+
+  return {
+    apiKey,
+    baseURL: normalizeAnthropicBaseURL(baseURL),
+    model,
+  }
+}
+
+const collectText = async (stream: ReadableStream<string>): Promise<string> => {
+  let text = ''
+
+  for await (const chunk of stream) {
+    text += chunk
+  }
+
+  return text
+}
+
+const collectEvents = async (stream: ReadableStream<StreamingEvent>): Promise<StreamingEvent[]> => {
+  const events: StreamingEvent[] = []
+
+  for await (const event of stream) {
+    events.push(event)
+  }
+
+  return events
+}
+
+const printResult = ({ details, name, ok }: CheckResult): void => {
+  const status = ok ?
'PASS' : 'FAIL' + console.log(`\n[${status}] ${name}`) + console.log(JSON.stringify(details, null, 2)) +} + +const runCountTokensCheck = async (config: LiveConfig): Promise => { + const result = await countTokens({ + abortSignal, + apiKey: config.apiKey, + baseURL: config.baseURL, + messages: [{ content: 'Count tokens for this sentence.', role: 'user' }], + model: config.model, + }) + + return { + details: result, + name: 'countTokens', + ok: Number.isFinite(result.input_tokens) && result.input_tokens > 0, + } +} + +const runBasicStreamCheck = async (config: LiveConfig): Promise => { + const result = messages({ + abortSignal, + apiKey: config.apiKey, + baseURL: config.baseURL, + max_tokens: 128, + messages: [{ content: 'Reply with exactly BASIC_OK and nothing else.', role: 'user' }], + model: config.model, + temperature: 1, + }) + + const [text, reasoningText, events, steps, usage, totalUsage] = await Promise.all([ + collectText(result.textStream), + collectText(result.reasoningTextStream), + collectEvents(result.eventStream), + result.steps, + result.usage, + result.totalUsage, + ]) + + return { + details: { + eventTypes: events.map(event => event.type), + reasoningText, + stepCount: steps.length, + stopReason: steps.at(-1)?.stopReason, + text, + totalUsage, + usage, + }, + name: 'messages basic stream', + ok: text.trim() === 'BASIC_OK' && steps.length === 1, + } +} + +const runThinkingCheck = async (config: LiveConfig): Promise => { + const result = messages({ + abortSignal, + apiKey: config.apiKey, + baseURL: config.baseURL, + max_tokens: 2_048, + messages: [{ content: 'Think briefly, then answer with exactly THINKING_OK.', role: 'user' }], + model: config.model, + temperature: 1, + thinking: { + budget_tokens: 1024, + type: 'enabled', + }, + }) + + const [text, reasoningText, events, steps, usage] = await Promise.all([ + collectText(result.textStream), + collectText(result.reasoningTextStream), + collectEvents(result.eventStream), + result.steps, + 
result.usage, + ]) + + return { + details: { + eventTypes: events.map(event => event.type), + reasoningLength: reasoningText.length, + reasoningPreview: reasoningText.slice(0, 200), + stepCount: steps.length, + text, + usage, + }, + name: 'messages thinking stream', + ok: text.includes('THINKING_OK') && reasoningText.length > 0, + } +} + +const runToolLoopCheck = async (config: LiveConfig): Promise => { + const add = tool({ + description: 'Add two integers and return their sum as plain text.', + execute: ({ a, b }) => (a + b).toString(), + inputSchema: z.object({ + a: z.number(), + b: z.number(), + }), + name: 'add', + }) + + const result = messages({ + abortSignal, + apiKey: config.apiKey, + baseURL: config.baseURL, + max_tokens: 256, + messages: [{ content: 'You must use the add tool with a=19 and b=23. Do not compute mentally. After using the tool, answer with exactly TOOL_OK 42.', role: 'user' }], + model: config.model, + temperature: 1, + tools: [add], + }) + + const [text, events, steps, usage, totalUsage] = await Promise.all([ + collectText(result.textStream), + collectEvents(result.eventStream), + result.steps, + result.usage, + result.totalUsage, + ]) + + const eventTypes = events.map(event => event.type) + const firstMessageStopIndex = eventTypes.indexOf('message_stop') + const secondMessageStartIndex = eventTypes.indexOf('message_start', firstMessageStopIndex + 1) + + return { + details: { + eventTypes, + firstStepFinishReason: steps[0]?.finishReason, + stepCount: steps.length, + terminalOrderingOk: firstMessageStopIndex !== -1 && secondMessageStartIndex > firstMessageStopIndex, + text, + toolResults: steps[0]?.toolResults, + toolUses: steps[0]?.toolUses, + totalUsage, + usage, + }, + name: 'messages tool loop', + ok: text.includes('TOOL_OK 42') && steps.length >= 2 && steps[0]?.toolUses.length === 1, + } +} + +const runStrictToolCheck = async (config: LiveConfig): Promise => { + const add = tool({ + description: 'Add two integers and return their sum as 
plain text.', + execute: ({ a, b }) => (a + b).toString(), + inputSchema: z.object({ + a: z.number(), + b: z.number(), + }), + name: 'add', + strict: true, + }) + + const result = messages({ + abortSignal, + apiKey: config.apiKey, + baseURL: config.baseURL, + max_tokens: 256, + messages: [{ content: 'You must use the add tool with a=20 and b=22. Do not compute mentally. After using the tool, answer with exactly STRICT_OK 42.', role: 'user' }], + model: config.model, + temperature: 1, + tools: [add], + }) + + const [text, events, steps, usage, totalUsage] = await Promise.all([ + collectText(result.textStream), + collectEvents(result.eventStream), + result.steps, + result.usage, + result.totalUsage, + ]) + + return { + details: { + eventTypes: events.map(event => event.type), + stepCount: steps.length, + text, + toolResults: steps[0]?.toolResults, + toolUses: steps[0]?.toolUses, + totalUsage, + usage, + }, + name: 'messages strict tool loop', + ok: text.includes('STRICT_OK 42') && steps.length >= 2 && steps[0]?.toolUses.length === 1, + } +} + +const main = async (): Promise => { + const config = getConfig() + const checks = [ + runCountTokensCheck, + runBasicStreamCheck, + runThinkingCheck, + runToolLoopCheck, + runStrictToolCheck, + ] + const results: CheckResult[] = [] + + console.log(`Running live checks against ${config.baseURL} with model ${config.model}`) + + for (const check of checks) { + try { + const result = await check(config) + results.push(result) + printResult(result) + } + catch (error) { + const result: CheckResult = { + details: { + error: error instanceof Error + ? 
{ + message: error.message, + stack: error.stack, + } + : { value: error }, + }, + name: check.name, + ok: false, + } + results.push(result) + printResult(result) + } + } + + const failedChecks = results.filter(result => !result.ok) + + console.log(`\nSummary: ${results.length - failedChecks.length}/${results.length} passed`) + + if (failedChecks.length > 0) { + process.exitCode = 1 + } +} + +await main() diff --git a/packages-ext/messages/src/index.ts b/packages-ext/messages/src/index.ts new file mode 100644 index 00000000..e78cb51c --- /dev/null +++ b/packages-ext/messages/src/index.ts @@ -0,0 +1,10 @@ +export type * from './types/anthropic-message' +export type * from './types/anthropic-tool' +export type * from './types/finish-reason' +export type * from './types/messages-options' +export type * from './types/step' +export type * from './types/streaming-event' +export type * from './types/usage' +export * from './utils/count-tokens' +export * from './utils/messages' +export * from './utils/tool' diff --git a/packages-ext/messages/src/types/anthropic-message.ts b/packages-ext/messages/src/types/anthropic-message.ts new file mode 100644 index 00000000..e30e7269 --- /dev/null +++ b/packages-ext/messages/src/types/anthropic-message.ts @@ -0,0 +1,169 @@ +import type { AnthropicCacheControl, AnthropicTool, AnthropicToolChoice } from './anthropic-tool' +import type { AnthropicStopReason } from './finish-reason' +import type { Usage } from './usage' + +export interface AnthropicAssistantMessageParam { + content: (AnthropicRedactedThinkingBlock | AnthropicTextBlockParam | AnthropicThinkingBlockParam | AnthropicToolUseBlock)[] | AnthropicRedactedThinkingBlock[] | AnthropicTextBlockParam[] | AnthropicThinkingBlockParam[] | AnthropicToolUseBlock[] | string + role: 'assistant' +} + +export interface AnthropicBase64ImageSource { + data: string + media_type: 'image/gif' | 'image/jpeg' | 'image/png' | 'image/webp' + type: 'base64' +} + +export interface 
AnthropicBase64PDFSource { + data: string + media_type: 'application/pdf' + type: 'base64' +} + +export type AnthropicContentBlock = AnthropicRedactedThinkingBlock | AnthropicTextBlock | AnthropicThinkingBlock | AnthropicToolUseBlock | (Record & { type: string }) + +export interface AnthropicDocumentBlockParam { + cache_control?: AnthropicCacheControl | null + citations?: { + enabled?: boolean + } + context?: string + source: AnthropicBase64PDFSource | AnthropicPlainTextSource | AnthropicURLDocumentSource + title?: string + type: 'document' +} + +export interface AnthropicImageBlockParam { + cache_control?: AnthropicCacheControl | null + source: AnthropicBase64ImageSource | AnthropicURLImageSource + type: 'image' +} + +export interface AnthropicMessage { + content: AnthropicContentBlock[] + id: string + model: string + role: 'assistant' + stop_reason: AnthropicStopReason | null + stop_sequence: null | string + type: 'message' + usage: Usage +} + +export interface AnthropicMessageCountTokensBody { + cache_control?: AnthropicCacheControl | null + messages: AnthropicMessageParam[] + model: string + system?: AnthropicSystemPrompt + thinking?: AnthropicThinkingConfig + tool_choice?: AnthropicToolChoice + tools?: AnthropicTool[] +} + +export interface AnthropicMessageCreateBody { + cache_control?: AnthropicCacheControl | null + max_tokens: number + messages: AnthropicMessageParam[] + metadata?: AnthropicMetadata + model: string + service_tier?: 'auto' | 'standard_only' + stop_sequences?: string[] + stream?: boolean + system?: AnthropicSystemPrompt + temperature?: number + thinking?: AnthropicThinkingConfig + tool_choice?: AnthropicToolChoice + tools?: AnthropicTool[] + top_k?: number + top_p?: number +} + +export type AnthropicMessageParam = AnthropicAssistantMessageParam | AnthropicUserMessageParam + +export interface AnthropicMetadata { + user_id?: null | string +} + +export interface AnthropicPlainTextSource { + data: string + media_type: 'text/plain' + type: 'text' 
+}
+
+export interface AnthropicRedactedThinkingBlock {
+  data: string
+  type: 'redacted_thinking'
+}
+
+export type AnthropicSystemPrompt = AnthropicTextBlockParam[] | string
+
+export interface AnthropicTextBlock {
+  citations?: unknown[]
+  text: string
+  type: 'text'
+}
+
+export interface AnthropicTextBlockParam {
+  cache_control?: AnthropicCacheControl | null
+  citations?: {
+    enabled?: boolean
+  }
+  text: string
+  type: 'text'
+}
+
+export interface AnthropicThinkingBlock {
+  signature: string
+  thinking: string
+  type: 'thinking'
+}
+
+export interface AnthropicThinkingBlockParam {
+  signature: string
+  thinking: string
+  type: 'thinking'
+}
+
+export type AnthropicThinkingConfig = AnthropicThinkingConfigAdaptive | AnthropicThinkingConfigDisabled | AnthropicThinkingConfigEnabled
+
+export interface AnthropicThinkingConfigAdaptive {
+  type: 'adaptive'
+}
+
+export interface AnthropicThinkingConfigDisabled {
+  type: 'disabled'
+}
+
+export interface AnthropicThinkingConfigEnabled {
+  budget_tokens: number
+  type: 'enabled'
+}
+
+export interface AnthropicToolResultBlockParam {
+  cache_control?: AnthropicCacheControl | null
+  content?: AnthropicTextBlockParam[] | string
+  is_error?: boolean
+  tool_use_id: string
+  type: 'tool_result'
+}
+
+export interface AnthropicToolUseBlock {
+  id: string
+  input: Record<string, unknown>
+  name: string
+  type: 'tool_use'
+}
+
+export interface AnthropicURLDocumentSource {
+  type: 'url'
+  url: string
+}
+
+export interface AnthropicURLImageSource {
+  type: 'url'
+  url: string
+}
+
+export interface AnthropicUserMessageParam {
+  content: (AnthropicDocumentBlockParam | AnthropicImageBlockParam | AnthropicTextBlockParam | AnthropicToolResultBlockParam)[] | AnthropicDocumentBlockParam[] | AnthropicImageBlockParam[] | AnthropicTextBlockParam[] | AnthropicToolResultBlockParam[] | string
+  role: 'user'
+}
diff --git a/packages-ext/messages/src/types/anthropic-tool.ts b/packages-ext/messages/src/types/anthropic-tool.ts
new file mode 100644
index
00000000..497f4547
--- /dev/null
+++ b/packages-ext/messages/src/types/anthropic-tool.ts
@@ -0,0 +1,50 @@
+import type { StandardJSONSchemaV1 } from '@standard-schema/spec'
+
+import type { AnthropicTextBlockParam } from './anthropic-message'
+
+export interface AnthropicCacheControl {
+  type: 'ephemeral'
+}
+
+export interface AnthropicTool {
+  description?: string
+  input_schema: Record<string, unknown>
+  name: string
+  strict?: boolean
+}
+
+export type AnthropicToolChoice = AnthropicToolChoiceAny | AnthropicToolChoiceAuto | AnthropicToolChoiceNone | AnthropicToolChoiceTool
+
+export interface AnthropicToolChoiceAny {
+  disable_parallel_tool_use?: boolean
+  type: 'any'
+}
+
+export interface AnthropicToolChoiceAuto {
+  disable_parallel_tool_use?: boolean
+  type: 'auto'
+}
+
+export interface AnthropicToolChoiceNone {
+  type: 'none'
+}
+
+export interface AnthropicToolChoiceTool {
+  disable_parallel_tool_use?: boolean
+  name: string
+  type: 'tool'
+}
+
+export interface ExecutableTool extends AnthropicTool {
+  execute: (input: unknown) => Promise<ToolExecuteResult> | ToolExecuteResult
+}
+
+export type ToolExecuteResult = AnthropicTextBlockParam[] | object | string | unknown[]
+
+export interface ToolOptions<T extends StandardJSONSchemaV1> {
+  description?: string
+  execute: (input: StandardJSONSchemaV1.InferInput<T>) => Promise<ToolExecuteResult> | ToolExecuteResult
+  inputSchema: T
+  name: string
+  strict?: boolean
+}
diff --git a/packages-ext/messages/src/types/finish-reason.ts b/packages-ext/messages/src/types/finish-reason.ts
new file mode 100644
index 00000000..e3401769
--- /dev/null
+++ b/packages-ext/messages/src/types/finish-reason.ts
@@ -0,0 +1,3 @@
+export type AnthropicStopReason = 'end_turn' | 'max_tokens' | 'model_context_window_exceeded' | 'pause_turn' | 'refusal' | 'stop_sequence' | 'tool_use' | (string & {})
+
+export type FinishReason = 'content_filter' | 'length' | 'other' | 'stop' | 'tool-calls' | (string & {})
diff --git a/packages-ext/messages/src/types/messages-options.ts b/packages-ext/messages/src/types/messages-options.ts
new
file mode 100644
index 00000000..8d2d501e
--- /dev/null
+++ b/packages-ext/messages/src/types/messages-options.ts
@@ -0,0 +1,21 @@
+import type { AnthropicMessageCountTokensBody, AnthropicMessageCreateBody } from './anthropic-message'
+import type { ExecutableTool } from './anthropic-tool'
+
+export interface CommonAnthropicTransportOptions {
+  abortSignal?: AbortSignal
+  anthropicBeta?: string | string[]
+  anthropicVersion?: string
+  apiKey?: string
+  baseURL?: string | URL
+  fetch?: typeof globalThis.fetch
+  headers?: Record<string, string>
+}
+
+export interface CountTokensOptions extends CommonAnthropicTransportOptions, Omit<AnthropicMessageCountTokensBody, 'tools'> {
+  tools?: ExecutableTool[]
+}
+
+export interface MessagesOptions extends CommonAnthropicTransportOptions, Omit<AnthropicMessageCreateBody, 'stream' | 'tools'> {
+  stream?: never
+  tools?: ExecutableTool[]
+}
diff --git a/packages-ext/messages/src/types/step.ts b/packages-ext/messages/src/types/step.ts
new file mode 100644
index 00000000..4a8f401d
--- /dev/null
+++ b/packages-ext/messages/src/types/step.ts
@@ -0,0 +1,14 @@
+import type { AnthropicMessage, AnthropicToolResultBlockParam, AnthropicToolUseBlock } from './anthropic-message'
+import type { AnthropicStopReason, FinishReason } from './finish-reason'
+import type { Usage } from './usage'
+
+export interface Step {
+  finishReason: FinishReason
+  message: AnthropicMessage
+  reasoningText?: string
+  stopReason: AnthropicStopReason | null
+  text?: string
+  toolResults: AnthropicToolResultBlockParam[]
+  toolUses: AnthropicToolUseBlock[]
+  usage?: Usage
+}
diff --git a/packages-ext/messages/src/types/streaming-event.ts b/packages-ext/messages/src/types/streaming-event.ts
new file mode 100644
index 00000000..d65e9f36
--- /dev/null
+++ b/packages-ext/messages/src/types/streaming-event.ts
@@ -0,0 +1,79 @@
+import type { AnthropicContentBlock, AnthropicMessage } from './anthropic-message'
+import type { AnthropicStopReason } from './finish-reason'
+import type { DeltaUsage } from './usage'
+
+export interface CitationsDelta {
+  citation: unknown
+  type:
'citations_delta' +} + +export type ContentBlockDelta = CitationsDelta | InputJsonDelta | SignatureDelta | TextDelta | ThinkingDelta + +export interface ContentBlockDeltaEvent { + delta: ContentBlockDelta + index: number + type: 'content_block_delta' +} + +export interface ContentBlockStartEvent { + content_block: AnthropicContentBlock + index: number + type: 'content_block_start' +} + +export interface ContentBlockStopEvent { + index: number + type: 'content_block_stop' +} + +export interface ErrorEvent { + error: { + message: string + type: string + } + type: 'error' +} + +export interface InputJsonDelta { + partial_json: string + type: 'input_json_delta' +} + +export interface MessageDeltaEvent { + delta: { + stop_reason: AnthropicStopReason | null + stop_sequence: null | string + } + type: 'message_delta' + usage: DeltaUsage +} + +export interface MessageStartEvent { + message: AnthropicMessage + type: 'message_start' +} + +export interface MessageStopEvent { + type: 'message_stop' +} + +export interface PingEvent { + type: 'ping' +} + +export interface SignatureDelta { + signature: string + type: 'signature_delta' +} + +export type StreamingEvent = ContentBlockDeltaEvent | ContentBlockStartEvent | ContentBlockStopEvent | ErrorEvent | MessageDeltaEvent | MessageStartEvent | MessageStopEvent | PingEvent + +export interface TextDelta { + text: string + type: 'text_delta' +} + +export interface ThinkingDelta { + thinking: string + type: 'thinking_delta' +} diff --git a/packages-ext/messages/src/types/usage.ts b/packages-ext/messages/src/types/usage.ts new file mode 100644 index 00000000..1ed07ff7 --- /dev/null +++ b/packages-ext/messages/src/types/usage.ts @@ -0,0 +1,13 @@ +export interface DeltaUsage { + cache_creation_input_tokens?: null | number + cache_read_input_tokens?: null | number + input_tokens?: null | number + output_tokens: number +} + +export interface Usage { + cache_creation_input_tokens?: number + cache_read_input_tokens?: number + input_tokens: 
number
+  output_tokens: number
+}
diff --git a/packages-ext/messages/src/utils/count-tokens.ts b/packages-ext/messages/src/utils/count-tokens.ts
new file mode 100644
index 00000000..4e4246a8
--- /dev/null
+++ b/packages-ext/messages/src/utils/count-tokens.ts
@@ -0,0 +1,25 @@
+import type { CountTokensOptions } from '../types/messages-options'
+
+import { clean, requestURL, responseCatch, responseJSON } from '@xsai/shared'
+
+import { requestHeaders } from './request-headers'
+
+export interface CountTokensResult {
+  input_tokens: number
+}
+
+export const countTokens = async (options: CountTokensOptions): Promise<CountTokensResult> => {
+  const { abortSignal, anthropicBeta, anthropicVersion, apiKey, baseURL = 'https://api.anthropic.com/v1/', fetch, headers, tools, ...body } = options
+
+  return (fetch ?? globalThis.fetch)(requestURL('messages/count_tokens', baseURL), {
+    body: JSON.stringify(clean({
+      ...body,
+      tools: tools?.map(({ execute, ...tool }) => tool),
+    })),
+    headers: requestHeaders(headers, apiKey, anthropicVersion, anthropicBeta),
+    method: 'POST',
+    signal: abortSignal,
+  })
+    .then(responseCatch)
+    .then(responseJSON<CountTokensResult>)
+}
diff --git a/packages-ext/messages/src/utils/execute-tool.ts b/packages-ext/messages/src/utils/execute-tool.ts
new file mode 100644
index 00000000..71dabdc4
--- /dev/null
+++ b/packages-ext/messages/src/utils/execute-tool.ts
@@ -0,0 +1,35 @@
+import type { AnthropicToolResultBlockParam, AnthropicToolUseBlock } from '../types/anthropic-message'
+import type { ExecutableTool } from '../types/anthropic-tool'
+
+import { wrapToolResult } from './wrap-tool-result'
+
+export interface ExecuteToolOptions {
+  tools?: ExecutableTool[]
+  toolUse: AnthropicToolUseBlock
+}
+
+export interface ExecuteToolResult {
+  toolResult: AnthropicToolResultBlockParam
+}
+
+export const executeTool = async ({ tools, toolUse }: ExecuteToolOptions): Promise<ExecuteToolResult> => {
+  const tool = tools?.find(tool => tool.name === toolUse.name)
+
+  if (!tool) {
+    const availableTools =
tools?.map(tool => tool.name)
+    const availableToolsErrorMsg = (availableTools == null || availableTools.length === 0)
+      ? 'No tools are available'
+      : `Available tools: ${availableTools.join(', ')}`
+    throw new Error(`Model tried to call unavailable tool "${toolUse.name}". ${availableToolsErrorMsg}.`)
+  }
+
+  const toolResult = wrapToolResult(await tool.execute(toolUse.input))
+
+  return {
+    toolResult: {
+      content: toolResult,
+      tool_use_id: toolUse.id,
+      type: 'tool_result',
+    },
+  }
+}
diff --git a/packages-ext/messages/src/utils/extract-message-parts.ts b/packages-ext/messages/src/utils/extract-message-parts.ts
new file mode 100644
index 00000000..2547b4b6
--- /dev/null
+++ b/packages-ext/messages/src/utils/extract-message-parts.ts
@@ -0,0 +1,22 @@
+import type { AnthropicContentBlock, AnthropicMessage, AnthropicThinkingBlock, AnthropicToolUseBlock } from '../types/anthropic-message'
+
+export const getToolUses = (message: AnthropicMessage): AnthropicToolUseBlock[] =>
+  message.content.filter((content): content is AnthropicToolUseBlock => content.type === 'tool_use')
+
+export const getText = (message: AnthropicMessage): string | undefined => {
+  const text = message.content
+    .filter((content): content is Extract<AnthropicContentBlock, { type: 'text' }> => content.type === 'text')
+    .map(content => content.text)
+    .join('')
+
+  return text.length > 0 ? text : undefined
+}
+
+export const getReasoningText = (message: AnthropicMessage): string | undefined => {
+  const reasoningText = message.content
+    .filter((content): content is AnthropicThinkingBlock => content.type === 'thinking')
+    .map(content => content.thinking)
+    .join('')
+
+  return reasoningText.length > 0 ?
reasoningText : undefined +} diff --git a/packages-ext/messages/src/utils/map-stop-reason.ts b/packages-ext/messages/src/utils/map-stop-reason.ts new file mode 100644 index 00000000..6d31f8e3 --- /dev/null +++ b/packages-ext/messages/src/utils/map-stop-reason.ts @@ -0,0 +1,19 @@ +import type { AnthropicStopReason, FinishReason } from '../types/finish-reason' + +const stopReasonMap = new Map([ + ['end_turn', 'stop'], + ['max_tokens', 'length'], + ['model_context_window_exceeded', 'length'], + ['pause_turn', 'stop'], + ['refusal', 'content_filter'], + ['stop_sequence', 'stop'], + ['tool_use', 'tool-calls'], +]) + +// eslint-disable-next-line sonarjs/function-return-type +export const mapStopReason = (stopReason?: AnthropicStopReason | null): FinishReason => { + if (stopReason == null) + return 'other' + + return stopReasonMap.get(stopReason) ?? 'other' +} diff --git a/packages-ext/messages/src/utils/messages.ts b/packages-ext/messages/src/utils/messages.ts new file mode 100644 index 00000000..bdba5d3b --- /dev/null +++ b/packages-ext/messages/src/utils/messages.ts @@ -0,0 +1,433 @@ +import type { AnthropicContentBlock, AnthropicMessage, AnthropicMessageParam, AnthropicTextBlock, AnthropicThinkingBlock, AnthropicToolResultBlockParam, AnthropicToolUseBlock } from '../types/anthropic-message' +import type { MessagesOptions } from '../types/messages-options' +import type { Step } from '../types/step' +import type { ContentBlockDelta, ContentBlockDeltaEvent, ContentBlockStartEvent, MessageDeltaEvent, MessageStartEvent, StreamingEvent } from '../types/streaming-event' +import type { DeltaUsage, Usage } from '../types/usage' + +import { clean, DelayedPromise, requestURL, responseCatch } from '@xsai/shared' +import { EventSourceParserStream } from 'eventsource-parser/stream' + +import { executeTool } from './execute-tool' +import { getReasoningText, getText, getToolUses } from './extract-message-parts' +import { mapStopReason } from './map-stop-reason' +import { 
requestHeaders } from './request-headers'
+import { StreamingEventParserStream } from './streaming-event-parser-stream'
+
+export interface MessagesResult {
+  eventStream: ReadableStream<StreamingEvent>
+  reasoningTextStream: ReadableStream<string>
+  steps: Promise<Step[]>
+  textStream: ReadableStream<string>
+  totalUsage: Promise<undefined | Usage>
+  usage: Promise<undefined | Usage>
+}
+
+type ContentBlockState = AnthropicContentBlock | ToolUseBlockState
+
+interface StepState {
+  contentBlocks: Array<ContentBlockState | undefined>
+  message: AnthropicMessage
+  stopReason: AnthropicMessage['stop_reason']
+  stopSequence: AnthropicMessage['stop_sequence']
+  usage?: Usage
+}
+
+interface ToolUseBlockState extends AnthropicToolUseBlock {
+  inputText: string
+}
+
+const mergeUsage = (prev: undefined | Usage, next: DeltaUsage | undefined | Usage): undefined | Usage => {
+  if (next == null)
+    return prev
+
+  return {
+    ...(prev?.cache_creation_input_tokens != null || next.cache_creation_input_tokens != null
+      ? { cache_creation_input_tokens: next.cache_creation_input_tokens ?? prev?.cache_creation_input_tokens ?? 0 }
+      : {}),
+    ...(prev?.cache_read_input_tokens != null || next.cache_read_input_tokens != null
+      ? { cache_read_input_tokens: next.cache_read_input_tokens ?? prev?.cache_read_input_tokens ?? 0 }
+      : {}),
+    input_tokens: next.input_tokens ?? prev?.input_tokens ?? 0,
+    output_tokens: next.output_tokens ?? prev?.output_tokens ?? 0,
+  }
+}
+
+const addUsage = (total: undefined | Usage, next: undefined | Usage): undefined | Usage => {
+  if (next == null)
+    return total
+
+  return total == null
+    ? next
+    : {
+        ...(total.cache_creation_input_tokens != null || next.cache_creation_input_tokens != null
+          ? { cache_creation_input_tokens: (total.cache_creation_input_tokens ?? 0) + (next.cache_creation_input_tokens ?? 0) }
+          : {}),
+        ...(total.cache_read_input_tokens != null || next.cache_read_input_tokens != null
+          ? { cache_read_input_tokens: (total.cache_read_input_tokens ?? 0) + (next.cache_read_input_tokens ??
0) } + : {}), + input_tokens: total.input_tokens + next.input_tokens, + output_tokens: total.output_tokens + next.output_tokens, + } +} + +const isTextBlock = (content: AnthropicContentBlock): content is AnthropicTextBlock => content.type === 'text' +const isThinkingBlock = (content: AnthropicContentBlock): content is AnthropicThinkingBlock => content.type === 'thinking' +const isToolUseBlockState = (content: ContentBlockState | undefined): content is ToolUseBlockState => content?.type === 'tool_use' && 'inputText' in content + +const requireCurrentStep = (currentStep: StepState | undefined, eventType: StreamingEvent['type']): StepState => { + if (currentStep == null) + throw new Error(`Received ${eventType} before message_start.`) + + return currentStep +} + +const createContentBlockState = (contentBlock: AnthropicContentBlock): ContentBlockState => contentBlock.type === 'tool_use' + ? { + ...contentBlock, + inputText: Object.keys(contentBlock.input).length > 0 ? JSON.stringify(contentBlock.input) : '', + } + : contentBlock + +const applyContentBlockDelta = (contentBlock: ContentBlockState, delta: ContentBlockDelta): void => { + if (delta.type === 'text_delta') { + if (isTextBlock(contentBlock)) + contentBlock.text += delta.text + + return + } + + if (delta.type === 'thinking_delta') { + if (isThinkingBlock(contentBlock)) + contentBlock.thinking += delta.thinking + + return + } + + if (delta.type === 'signature_delta') { + if (isThinkingBlock(contentBlock)) + contentBlock.signature = delta.signature + + return + } + + if (delta.type === 'input_json_delta' && isToolUseBlockState(contentBlock)) + contentBlock.inputText += delta.partial_json +} + +const handleContentBlockStart = (currentStep: StepState | undefined, event: ContentBlockStartEvent): void => { + const step = requireCurrentStep(currentStep, event.type) + step.contentBlocks[event.index] = createContentBlockState(structuredClone(event.content_block)) +} + +const handleContentBlockDelta = (currentStep: 
StepState | undefined, event: ContentBlockDeltaEvent): void => { + const step = requireCurrentStep(currentStep, event.type) + const contentBlock = step.contentBlocks[event.index] + + if (contentBlock == null) + throw new Error(`Missing content block for index ${event.index}.`) + + applyContentBlockDelta(contentBlock, event.delta) +} + +const handleMessageStart = (event: MessageStartEvent): StepState => ({ + contentBlocks: [], + message: structuredClone(event.message), + stopReason: event.message.stop_reason, + stopSequence: event.message.stop_sequence, + usage: mergeUsage(undefined, event.message.usage), +}) + +const handleMessageDelta = (currentStep: StepState | undefined, event: MessageDeltaEvent): void => { + const step = requireCurrentStep(currentStep, event.type) + step.stopReason = event.delta.stop_reason + step.stopSequence = event.delta.stop_sequence + step.usage = mergeUsage(step.usage, event.usage) +} + +const parseToolUseInput = (toolUse: ToolUseBlockState): AnthropicToolUseBlock => { + const inputText = toolUse.inputText.trim() + + try { + return { + id: toolUse.id, + input: JSON.parse(inputText.length > 0 ? inputText : '{}') as Record<string, unknown>, + name: toolUse.name, + type: 'tool_use', + } + } + catch (error) { + throw new Error(`Failed to parse tool input as JSON for tool "${toolUse.name}". Input: ${toolUse.inputText}`, { + cause: error, + }) + } +} + +const createAssistantMessageParam = (message: AnthropicMessage): AnthropicMessageParam => ({ + content: message.content, + role: 'assistant', +}) + +const finalizeCurrentMessage = (currentStep: StepState | undefined): AnthropicMessage => { + const step = requireCurrentStep(currentStep, 'message_stop') + const content = step.contentBlocks + .filter((block): block is ContentBlockState => block != null) + .map((block): AnthropicContentBlock => isToolUseBlockState(block) ? 
parseToolUseInput(block) : block) + + return { + ...step.message, + content, + stop_reason: step.stopReason, + stop_sequence: step.stopSequence, + usage: step.usage ?? step.message.usage, + } +} + +const createStep = (message: AnthropicMessage, toolResults: AnthropicToolResultBlockParam[]): Step => ({ + finishReason: mapStopReason(message.stop_reason), + message, + reasoningText: getReasoningText(message), + stopReason: message.stop_reason, + text: getText(message), + toolResults, + toolUses: getToolUses(message), + usage: message.usage, +}) + +interface HandleMessageStopOptions { + conversation: AnthropicMessageParam[] + currentStep: StepState | undefined + steps: Step[] + tools: MessagesOptions['tools'] + totalUsage: undefined | Usage +} + +interface ProcessEventResult { + currentStep: StepState | undefined + shouldContinue: boolean + totalUsage: undefined | Usage + usage: undefined | Usage +} + +const handleMessageStop = async ({ conversation, currentStep, steps, tools, totalUsage }: HandleMessageStopOptions): Promise<ProcessEventResult> => { + const message = finalizeCurrentMessage(currentStep) + const stepUsage = message.usage + const nextUsage = stepUsage + const nextTotalUsage = addUsage(totalUsage, stepUsage) + const step = createStep(message, []) + + conversation.push(createAssistantMessageParam(message)) + steps.push(step) + + const shouldContinue = message.stop_reason === 'tool_use' && step.toolUses.length > 0 + + if (shouldContinue) { + const results = await Promise.all( + step.toolUses.map(async toolUse => executeTool({ tools, toolUse })), + ) + + for (const { toolResult } of results) { + step.toolResults.push(toolResult) + } + + conversation.push({ + content: step.toolResults, + role: 'user', + }) + } + + return { + currentStep: undefined, + shouldContinue, + totalUsage: nextTotalUsage, + usage: nextUsage, + } +} + +interface ProcessEventOptions { + conversation: AnthropicMessageParam[] + currentStep: StepState | undefined + event: StreamingEvent + steps: Step[] + tools: 
MessagesOptions['tools'] + totalUsage: undefined | Usage + usage: undefined | Usage +} + +const processEvent = async ({ conversation, currentStep, event, steps, tools, totalUsage, usage }: ProcessEventOptions): Promise<ProcessEventResult> => { + switch (event.type) { + case 'content_block_delta': { + handleContentBlockDelta(currentStep, event) + return { currentStep, shouldContinue: false, totalUsage, usage } + } + case 'content_block_start': { + handleContentBlockStart(currentStep, event) + return { currentStep, shouldContinue: false, totalUsage, usage } + } + case 'content_block_stop': + case 'ping': { + return { currentStep, shouldContinue: false, totalUsage, usage } + } + case 'error': { + throw new Error(`Anthropic stream error: ${event.error.message}`) + } + case 'message_start': { + return { + currentStep: handleMessageStart(event), + shouldContinue: false, + totalUsage, + usage, + } + } + case 'message_delta': { + handleMessageDelta(currentStep, event) + return { currentStep, shouldContinue: false, totalUsage, usage } + } + case 'message_stop': { + return handleMessageStop({ + conversation, + currentStep, + steps, + tools, + totalUsage, + }) + } + } +} + +export const messages = (options: MessagesOptions): MessagesResult => { + const { abortSignal, anthropicBeta, anthropicVersion, apiKey, baseURL = 'https://api.anthropic.com/v1/', fetch, headers, tools, ...body } = options + + const conversation = structuredClone(options.messages) + const steps: Step[] = [] + let usage: undefined | Usage + let totalUsage: undefined | Usage + + const resultSteps = new DelayedPromise<Step[]>() + const resultUsage = new DelayedPromise<undefined | Usage>() + const resultTotalUsage = new DelayedPromise<undefined | Usage>() + + const stepsPromise = resultSteps.promise + const usagePromise = resultUsage.promise + const totalUsagePromise = resultTotalUsage.promise + + // Avoid unhandled rejections when callers only consume the streams. 
+ stepsPromise.catch(() => {}) + usagePromise.catch(() => {}) + totalUsagePromise.catch(() => {}) + + let currentStep: StepState | undefined + + const createReader = async () => { + const res = await (fetch ?? globalThis.fetch)(requestURL('messages', baseURL), { + body: JSON.stringify(clean({ + ...body, + messages: conversation, + stream: true, + tools: tools?.map(({ execute, ...tool }) => tool), + })), + headers: requestHeaders(headers, apiKey, anthropicVersion, anthropicBeta), + method: 'POST', + signal: abortSignal, + }).then(responseCatch) + + return res.body! + .pipeThrough(new TextDecoderStream()) + .pipeThrough(new EventSourceParserStream()) + .pipeThrough(new StreamingEventParserStream()) + .getReader() + } + + let reader: ReadableStreamDefaultReader<StreamingEvent> | undefined + + const mainStream = new ReadableStream<StreamingEvent>({ + cancel: async () => { + await reader?.cancel() + resultSteps.resolve(steps) + resultUsage.resolve(usage) + resultTotalUsage.resolve(totalUsage) + }, + pull: async (controller) => { + if (reader == null) + return controller.close() + + try { + const { done, value: event } = await reader.read() + + if (done) { + resultSteps.resolve(steps) + resultUsage.resolve(usage) + resultTotalUsage.resolve(totalUsage) + return controller.close() + } + + const result = await processEvent({ + conversation, + currentStep, + event, + steps, + tools, + totalUsage, + usage, + }) + + currentStep = result.currentStep + usage = result.usage + totalUsage = result.totalUsage + + controller.enqueue(event) + + if (result.shouldContinue) { + reader.releaseLock() + reader = await createReader() + } + } + catch (err) { + resultSteps.reject(err) + resultUsage.reject(err) + resultTotalUsage.reject(err) + controller.error(err) + } + }, + start: async (controller) => { + try { + reader = await createReader() + } + catch (err) { + resultSteps.reject(err) + resultUsage.reject(err) + resultTotalUsage.reject(err) + controller.error(err) + } + }, + }) + + const [eventStream, streamA] = 
mainStream.tee() + const [streamB, streamC] = streamA.tee() + + const textStream = streamB.pipeThrough(new TransformStream<StreamingEvent, string>({ + transform: (event, controller) => { + if (event.type !== 'content_block_delta' || event.delta.type !== 'text_delta') + return + + controller.enqueue(event.delta.text) + }, + })) + + const reasoningTextStream = streamC.pipeThrough(new TransformStream<StreamingEvent, string>({ + transform: (event, controller) => { + if (event.type !== 'content_block_delta' || event.delta.type !== 'thinking_delta') + return + + controller.enqueue(event.delta.thinking) + }, + })) + + return { + eventStream, + reasoningTextStream, + steps: stepsPromise, + textStream, + totalUsage: totalUsagePromise, + usage: usagePromise, + } +} diff --git a/packages-ext/messages/src/utils/request-headers.ts b/packages-ext/messages/src/utils/request-headers.ts new file mode 100644 index 00000000..b1773f67 --- /dev/null +++ b/packages-ext/messages/src/utils/request-headers.ts @@ -0,0 +1,16 @@ +import { clean } from '@xsai/shared' + +export const requestHeaders = ( + headers?: Record<string, string>, + apiKey?: string, + anthropicVersion = '2023-06-01', + anthropicBeta?: string | string[], +) => clean({ + 'anthropic-beta': Array.isArray(anthropicBeta) + ? 
anthropicBeta.join(',') + : anthropicBeta, + 'anthropic-version': anthropicVersion, + 'content-type': 'application/json', + 'x-api-key': apiKey, + ...headers, +}) diff --git a/packages-ext/messages/src/utils/streaming-event-parser-stream.ts b/packages-ext/messages/src/utils/streaming-event-parser-stream.ts new file mode 100644 index 00000000..91d33cb5 --- /dev/null +++ b/packages-ext/messages/src/utils/streaming-event-parser-stream.ts @@ -0,0 +1,17 @@ +import type { EventSourceMessage } from 'eventsource-parser/stream' + +import type { StreamingEvent } from '../types/streaming-event' + +export class StreamingEventParserStream extends TransformStream<EventSourceMessage, StreamingEvent> { + constructor() { + super({ + transform: (chunk, controller) => { + if (chunk.data === '[DONE]' || chunk.data.length === 0) + return + + const event = JSON.parse(chunk.data) as StreamingEvent + controller.enqueue(event) + }, + }) + } +} diff --git a/packages-ext/messages/src/utils/tool.ts b/packages-ext/messages/src/utils/tool.ts new file mode 100644 index 00000000..302f8bad --- /dev/null +++ b/packages-ext/messages/src/utils/tool.ts @@ -0,0 +1,12 @@ +import type { StandardJSONSchemaV1 } from '@standard-schema/spec' + +import type { ExecutableTool, ToolOptions } from '../types/anthropic-tool' + +/** @experimental */ +export const tool = ({ description, execute, inputSchema, name, strict }: ToolOptions): ExecutableTool => ({ + description, + execute, + input_schema: inputSchema['~standard'].jsonSchema.input({ target: 'draft-07' }), + name, + strict, +}) diff --git a/packages-ext/messages/src/utils/wrap-tool-result.ts b/packages-ext/messages/src/utils/wrap-tool-result.ts new file mode 100644 index 00000000..5291e045 --- /dev/null +++ b/packages-ext/messages/src/utils/wrap-tool-result.ts @@ -0,0 +1,20 @@ +import type { AnthropicTextBlockParam } from '../types/anthropic-message' +import type { ToolExecuteResult } from '../types/anthropic-tool' + +const isTextBlockParam = (value: unknown): value is 
AnthropicTextBlockParam => + typeof value === 'object' + && value != null + && 'type' in value + && 'text' in value + && (value as { type: unknown }).type === 'text' + +// eslint-disable-next-line sonarjs/function-return-type +export const wrapToolResult = (result: ToolExecuteResult): AnthropicTextBlockParam[] | string => { + if (typeof result === 'string') + return result + + if (Array.isArray(result) && result.every(isTextBlockParam)) + return result + + return JSON.stringify(result) +} diff --git a/packages-ext/messages/test/count-tokens.test.ts b/packages-ext/messages/test/count-tokens.test.ts new file mode 100644 index 00000000..7062d9be --- /dev/null +++ b/packages-ext/messages/test/count-tokens.test.ts @@ -0,0 +1,55 @@ +import { describe, expect, it, vi } from 'vitest' +import { z } from 'zod' + +import { countTokens, tool } from '../src' +import { createJSONResponse } from './utils' + +describe('@xsai-ext/messages countTokens', async () => { + it('serializes requests and strips executable tool fields', async () => { + const lookup = tool({ + description: 'Look up a value', + execute: ({ key }) => `value for ${key}`, + inputSchema: z.object({ + key: z.string(), + }), + name: 'lookup', + }) + + const fetch = vi.fn().mockResolvedValue(createJSONResponse({ input_tokens: 42 })) + + await expect(countTokens({ + anthropicBeta: ['tools-2024-04-04'], + apiKey: 'test-key', + fetch, + messages: [{ content: 'Hello', role: 'user' }], + model: 'claude-sonnet-4-5', + tools: [lookup], + })).resolves.toEqual({ input_tokens: 42 }) + + expect(fetch).toHaveBeenCalledTimes(1) + + const [url, init] = fetch.mock.calls[0] as [URL, RequestInit] + expect(url.toString()).toBe('https://api.anthropic.com/v1/messages/count_tokens') + expect(init.headers).toMatchObject({ + 'anthropic-beta': 'tools-2024-04-04', + 'anthropic-version': '2023-06-01', + 'x-api-key': 'test-key', + }) + + const body = JSON.parse(init.body as string) as { tools: Array<Record<string, unknown>> } + expect(body.tools).toEqual([ + { 
description: 'Look up a value', + input_schema: { + $schema: 'http://json-schema.org/draft-07/schema#', + properties: { + key: { type: 'string' }, + }, + required: ['key'], + type: 'object', + }, + name: 'lookup', + }, + ]) + }) +}) diff --git a/packages-ext/messages/test/index.test.ts b/packages-ext/messages/test/index.test.ts new file mode 100644 index 00000000..b4b74071 --- /dev/null +++ b/packages-ext/messages/test/index.test.ts @@ -0,0 +1,442 @@ +import type { StreamingEvent } from '../src/types/streaming-event' + +import { describe, expect, it, vi } from 'vitest' +import { z } from 'zod' + +import { messages, tool } from '../src' +import { createEventStreamResponse } from './utils' + +describe('@xsai-ext/messages basic', async () => { + it('streams text, thinking, steps, and usage', async () => { + const fetch = vi.fn().mockResolvedValue(createEventStreamResponse([ + { + message: { + content: [], + id: 'msg_1', + model: 'claude-sonnet-4-5', + role: 'assistant', + stop_reason: null, + stop_sequence: null, + type: 'message', + usage: { + input_tokens: 7, + output_tokens: 0, + }, + }, + type: 'message_start', + }, + { + content_block: { + signature: '', + thinking: '', + type: 'thinking', + }, + index: 0, + type: 'content_block_start', + }, + { + delta: { + thinking: 'I should answer briefly. 
', + type: 'thinking_delta', + }, + index: 0, + type: 'content_block_delta', + }, + { + delta: { + signature: 'sig_1', + type: 'signature_delta', + }, + index: 0, + type: 'content_block_delta', + }, + { + index: 0, + type: 'content_block_stop', + }, + { + content_block: { + text: '', + type: 'text', + }, + index: 1, + type: 'content_block_start', + }, + { + delta: { + text: 'Hello', + type: 'text_delta', + }, + index: 1, + type: 'content_block_delta', + }, + { + delta: { + text: '!', + type: 'text_delta', + }, + index: 1, + type: 'content_block_delta', + }, + { + index: 1, + type: 'content_block_stop', + }, + { + delta: { + stop_reason: 'end_turn', + stop_sequence: null, + }, + type: 'message_delta', + usage: { + cache_creation_input_tokens: null, + cache_read_input_tokens: null, + input_tokens: null, + output_tokens: 2, + }, + }, + { + type: 'message_stop', + }, + ])) + + const { eventStream, reasoningTextStream, steps, textStream, totalUsage, usage } = messages({ + fetch, + max_tokens: 128, + messages: [{ content: 'Hello?', role: 'user' }], + model: 'claude-sonnet-4-5', + }) + + let text = '' + for await (const chunk of textStream) { + text += chunk + } + + let reasoningText = '' + for await (const chunk of reasoningTextStream) { + reasoningText += chunk + } + + const events: StreamingEvent[] = [] + for await (const event of eventStream) { + events.push(event) + } + + expect(text).toBe('Hello!') + expect(reasoningText).toBe('I should answer briefly. ') + expect(events.map(event => event.type)).toEqual([ + 'message_start', + 'content_block_start', + 'content_block_delta', + 'content_block_delta', + 'content_block_stop', + 'content_block_start', + 'content_block_delta', + 'content_block_delta', + 'content_block_stop', + 'message_delta', + 'message_stop', + ]) + + await expect(steps).resolves.toEqual([ + { + finishReason: 'stop', + message: { + content: [ + { + signature: 'sig_1', + thinking: 'I should answer briefly. 
', + type: 'thinking', + }, + { + text: 'Hello!', + type: 'text', + }, + ], + id: 'msg_1', + model: 'claude-sonnet-4-5', + role: 'assistant', + stop_reason: 'end_turn', + stop_sequence: null, + type: 'message', + usage: { + input_tokens: 7, + output_tokens: 2, + }, + }, + reasoningText: 'I should answer briefly. ', + stopReason: 'end_turn', + text: 'Hello!', + toolResults: [], + toolUses: [], + usage: { + input_tokens: 7, + output_tokens: 2, + }, + }, + ]) + await expect(usage).resolves.toEqual({ input_tokens: 7, output_tokens: 2 }) + await expect(totalUsage).resolves.toEqual({ input_tokens: 7, output_tokens: 2 }) + }) + + it('rejects result promises when the initial stream setup fails', async () => { + const error = new Error('boom') + const fetch = vi.fn().mockRejectedValue(error) + + const { eventStream, steps, totalUsage, usage } = messages({ + fetch, + max_tokens: 128, + messages: [{ content: 'Hello?', role: 'user' }], + model: 'claude-sonnet-4-5', + }) + + await expect(eventStream.getReader().read()).rejects.toThrow(error) + await expect(steps).rejects.toThrow(error) + await expect(usage).rejects.toThrow(error) + await expect(totalUsage).rejects.toThrow(error) + }) + + it('does not emit unhandled rejections when auxiliary promises are ignored', async () => { + const error = new Error('boom') + const fetch = vi.fn().mockRejectedValue(error) + const onUnhandledRejection = vi.fn() + + process.on('unhandledRejection', onUnhandledRejection) + + try { + const { textStream } = messages({ + fetch, + max_tokens: 128, + messages: [{ content: 'Hello?', role: 'user' }], + model: 'claude-sonnet-4-5', + }) + + await expect((async () => { + for await (const chunk of textStream) { + void chunk + } + })()).rejects.toThrow(error) + + await Promise.resolve() + await new Promise(resolve => setTimeout(resolve, 0)) + + expect(onUnhandledRejection).not.toHaveBeenCalled() + } + finally { + process.off('unhandledRejection', onUnhandledRejection) + } + }) +}) + 
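Before the tool-loop test, a note on the usage arithmetic it asserts: per-step usage comes from folding `message_delta` partials into the usage captured at `message_start`, and `totalUsage` sums the finalized usage of each step. The sketch below is illustrative only: a simplified reconstruction reduced to the two required token fields, whose `mergeUsage`/`addUsage` mirror but are not the helpers exported by the package.

```typescript
// Simplified sketch (not the package's API): why the tool-loop test expects
// usage { 13, 1 } for the final step but totalUsage { 23, 6 } overall.
interface Usage { input_tokens: number, output_tokens: number }

// message_delta fields may be null; null means "keep the value captured at
// message_start", not "reset to zero" — hence the ?? chains.
const mergeUsage = (prev: undefined | Usage, next: { input_tokens?: null | number, output_tokens?: null | number }): Usage => ({
  input_tokens: next.input_tokens ?? prev?.input_tokens ?? 0,
  output_tokens: next.output_tokens ?? prev?.output_tokens ?? 0,
})

// totalUsage accumulates the finalized usage of every step in the loop.
const addUsage = (total: undefined | Usage, next: Usage): Usage =>
  total == null
    ? next
    : {
        input_tokens: total.input_tokens + next.input_tokens,
        output_tokens: total.output_tokens + next.output_tokens,
      }

// Step 1 (tool_use): message_start reports 10 in / 0 out, message_delta adds 5 out.
const step1 = mergeUsage({ input_tokens: 10, output_tokens: 0 }, { input_tokens: null, output_tokens: 5 })
// Step 2 (after tool results are sent back): 13 in, then 1 out.
const step2 = mergeUsage({ input_tokens: 13, output_tokens: 0 }, { input_tokens: null, output_tokens: 1 })
const total = addUsage(addUsage(undefined, step1), step2)
// step2 → { input_tokens: 13, output_tokens: 1 }; total → { input_tokens: 23, output_tokens: 6 }
```

The `usage` promise resolves with the last step's usage while `totalUsage` resolves with the sum, which is why the two expectations in the test differ.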
+describe('@xsai-ext/messages tool', async () => { + it('emits message_stop before starting the next tool step', async () => { + const add = tool({ + description: 'Adds two numbers', + execute: ({ a, b }) => (Number.parseInt(a, 10) + Number.parseInt(b, 10)).toString(), + inputSchema: z.object({ + a: z.string(), + b: z.string(), + }), + name: 'add', + }) + + const fetch = vi.fn() + .mockResolvedValueOnce(createEventStreamResponse([ + { + message: { + content: [], + id: 'msg_1', + model: 'claude-sonnet-4-5', + role: 'assistant', + stop_reason: null, + stop_sequence: null, + type: 'message', + usage: { + input_tokens: 10, + output_tokens: 0, + }, + }, + type: 'message_start', + }, + { + content_block: { + id: 'toolu_1', + input: {}, + name: 'add', + type: 'tool_use', + }, + index: 0, + type: 'content_block_start', + }, + { + delta: { + partial_json: '{"a":"1"', + type: 'input_json_delta', + }, + index: 0, + type: 'content_block_delta', + }, + { + delta: { + partial_json: ',"b":"2"}', + type: 'input_json_delta', + }, + index: 0, + type: 'content_block_delta', + }, + { + index: 0, + type: 'content_block_stop', + }, + { + delta: { + stop_reason: 'tool_use', + stop_sequence: null, + }, + type: 'message_delta', + usage: { + cache_creation_input_tokens: null, + cache_read_input_tokens: null, + input_tokens: null, + output_tokens: 5, + }, + }, + { + type: 'message_stop', + }, + ])) + .mockResolvedValueOnce(createEventStreamResponse([ + { + message: { + content: [], + id: 'msg_2', + model: 'claude-sonnet-4-5', + role: 'assistant', + stop_reason: null, + stop_sequence: null, + type: 'message', + usage: { + input_tokens: 13, + output_tokens: 0, + }, + }, + type: 'message_start', + }, + { + content_block: { + text: '', + type: 'text', + }, + index: 0, + type: 'content_block_start', + }, + { + delta: { + text: '3', + type: 'text_delta', + }, + index: 0, + type: 'content_block_delta', + }, + { + index: 0, + type: 'content_block_stop', + }, + { + delta: { + stop_reason: 'end_turn', 
+ stop_sequence: null, + }, + type: 'message_delta', + usage: { + cache_creation_input_tokens: null, + cache_read_input_tokens: null, + input_tokens: null, + output_tokens: 1, + }, + }, + { + type: 'message_stop', + }, + ])) + + const { eventStream, steps, totalUsage, usage } = messages({ + fetch, + max_tokens: 128, + messages: [{ content: 'Please use the tool.', role: 'user' }], + model: 'claude-sonnet-4-5', + tool_choice: { type: 'any' }, + tools: [add], + }) + + const events: StreamingEvent[] = [] + for await (const event of eventStream) { + events.push(event) + } + + expect(events.map(event => event.type)).toEqual([ + 'message_start', + 'content_block_start', + 'content_block_delta', + 'content_block_delta', + 'content_block_stop', + 'message_delta', + 'message_stop', + 'message_start', + 'content_block_start', + 'content_block_delta', + 'content_block_stop', + 'message_delta', + 'message_stop', + ]) + + await expect(steps).resolves.toMatchObject([ + { + finishReason: 'tool-calls', + stopReason: 'tool_use', + toolResults: [ + { + content: '3', + tool_use_id: 'toolu_1', + type: 'tool_result', + }, + ], + toolUses: [ + { + id: 'toolu_1', + input: { + a: '1', + b: '2', + }, + name: 'add', + type: 'tool_use', + }, + ], + }, + { + finishReason: 'stop', + stopReason: 'end_turn', + text: '3', + }, + ]) + await expect(usage).resolves.toEqual({ input_tokens: 13, output_tokens: 1 }) + await expect(totalUsage).resolves.toEqual({ input_tokens: 23, output_tokens: 6 }) + expect(fetch).toHaveBeenCalledTimes(2) + + const secondRequestInit = fetch.mock.calls[1]?.[1] as RequestInit + const secondRequestBody = JSON.parse(secondRequestInit.body as string) as { + messages: Array<{ content: unknown, role: string }> + } + + expect(secondRequestBody.messages.at(-1)).toEqual({ + content: [ + { + content: '3', + tool_use_id: 'toolu_1', + type: 'tool_result', + }, + ], + role: 'user', + }) + }) +}) diff --git a/packages-ext/messages/test/streaming-event-parser-stream.test.ts 
b/packages-ext/messages/test/streaming-event-parser-stream.test.ts new file mode 100644 index 00000000..e77a7f80 --- /dev/null +++ b/packages-ext/messages/test/streaming-event-parser-stream.test.ts @@ -0,0 +1,36 @@ +import type { EventSourceMessage } from 'eventsource-parser/stream' + +import { describe, expect, it } from 'vitest' + +import { StreamingEventParserStream } from '../src/utils/streaming-event-parser-stream' + +const collect = async (stream: ReadableStream<EventSourceMessage>) => { + const events: unknown[] = [] + + for await (const event of stream.pipeThrough(new StreamingEventParserStream())) { + events.push(event) + } + + return events +} + +const createEventSourceMessageStream = (messages: EventSourceMessage[]): ReadableStream<EventSourceMessage> => new ReadableStream({ + start: (controller) => { + for (const message of messages) { + controller.enqueue(message) + } + + controller.close() + }, +}) + +describe('@xsai-ext/messages StreamingEventParserStream', async () => { + it('ignores [DONE] sentinels', async () => { + const stream = createEventSourceMessageStream([ + { data: '{"type":"ping"}', event: '', id: undefined }, + { data: '[DONE]', event: '', id: undefined }, + ]) + + await expect(collect(stream)).resolves.toEqual([{ type: 'ping' }]) + }) +}) diff --git a/packages-ext/messages/test/tool.test.ts b/packages-ext/messages/test/tool.test.ts new file mode 100644 index 00000000..44977e24 --- /dev/null +++ b/packages-ext/messages/test/tool.test.ts @@ -0,0 +1,34 @@ +import { describe, expect, it } from 'vitest' +import { z } from 'zod' + +import { tool } from '../src' + +describe('@xsai-ext/messages tool helper', async () => { + it('builds anthropic tool definitions', async () => { + const weather = tool({ + description: 'Get the weather in a location', 
+ input_schema: { + properties: { + location: { + description: 'City name', + type: 'string', + }, + }, + required: ['location'], + type: 'object', + }, + name: 'weather', + }) + + expect(await weather.execute({ location: 'Shanghai' })).toBe('Sunny in Shanghai') + }) +}) diff --git a/packages-ext/messages/test/utils.ts b/packages-ext/messages/test/utils.ts new file mode 100644 index 00000000..14c3a187 --- /dev/null +++ b/packages-ext/messages/test/utils.ts @@ -0,0 +1,25 @@ +export const createEventStreamResponse = (events: unknown[]): Response => { + const encoder = new TextEncoder() + + return new Response(new ReadableStream({ + start: (controller) => { + for (const event of events) { + controller.enqueue(encoder.encode(`data: ${JSON.stringify(event)}\n\n`)) + } + + controller.close() + }, + }), { + headers: { + 'Content-Type': 'text/event-stream', + }, + status: 200, + }) +} + +export const createJSONResponse = (body: unknown): Response => new Response(JSON.stringify(body), { + headers: { + 'Content-Type': 'application/json', + }, + status: 200, +}) diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index f3b6e3c0..9f35ddc0 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -411,6 +411,22 @@ importers: specifier: catalog:schema version: 4.2.1 + packages-ext/messages: + dependencies: + '@xsai/shared': + specifier: workspace:~ + version: link:../../packages/shared + eventsource-parser: + specifier: 'catalog:' + version: 3.0.6 + devDependencies: + '@standard-schema/spec': + specifier: 'catalog:' + version: 1.1.0 + zod: + specifier: catalog:schema-dev + version: 4.2.1 + packages-ext/providers: dependencies: '@xsai/shared': diff --git a/skills/xsai/agents/openai.yaml b/skills/xsai/agents/openai.yaml index 9ed67d64..66394202 100644 --- a/skills/xsai/agents/openai.yaml +++ b/skills/xsai/agents/openai.yaml @@ -1,7 +1,7 @@ interface: - display_name: "xsAI" - short_description: "Use xsAI for minimal OpenAI-compatible code" - default_prompt: "Use $xsai to choose the smallest 
xsAI package and build a runnable OpenAI-compatible example." + display_name: xsAI + short_description: Use xsAI for minimal OpenAI-compatible code + default_prompt: Use $xsai to choose the smallest xsAI package and build a runnable OpenAI-compatible example. policy: allow_implicit_invocation: true diff --git a/skills/xsai/references/recipes.md b/skills/xsai/references/recipes.md index 1c7e382b..81500cf9 100644 --- a/skills/xsai/references/recipes.md +++ b/skills/xsai/references/recipes.md @@ -13,9 +13,10 @@ Use this reference when the user wants code, when you are editing xsAI code, or ## Minimal text generation ```ts -import { generateText } from '@xsai/generate-text' import { env } from 'node:process' +import { generateText } from '@xsai/generate-text' + const { text } = await generateText({ apiKey: env.OPENAI_API_KEY!, baseURL: 'https://api.openai.com/v1/', @@ -38,9 +39,11 @@ Use this as the default starting point for simple scripts, tests, and one-shot h ## Streaming text with tools ```ts +import { env } from 'node:process' + import { streamText } from '@xsai/stream-text' import { tool } from '@xsai/tool' -import { env } from 'node:process' + import * as v from 'valibot' const add = await tool({ @@ -94,8 +97,10 @@ Use `textStream` for plain live text. Use `fullStream` when the caller needs too Valibot examples require `@valibot/to-json-schema` in the project. If the repo already uses a different supported schema library, keep that choice. ```ts -import { generateObject } from '@xsai/generate-object' import { env } from 'node:process' + +import { generateObject } from '@xsai/generate-object' + import * as v from 'valibot' const { object } = await generateObject({ @@ -125,8 +130,10 @@ Prefer this over asking the model for free-form JSON. 
## Streaming structured output ```ts -import { streamObject } from '@xsai/stream-object' import { env } from 'node:process' + +import { streamObject } from '@xsai/stream-object' + import * as v from 'valibot' const { partialObjectStream } = await streamObject({