From 8331607ee8f6677056999e42bd6c851475fb23b7 Mon Sep 17 00:00:00 2001 From: Owen Ko Date: Thu, 2 Apr 2026 09:48:52 +0800 Subject: [PATCH] Fix Opik traces missing after plugin registry reloads --- src/plugin.smoke.test.ts | 1 + src/service.test.ts | 22 ++- src/service.ts | 280 +++++++++++++++++++------------------- src/service/hooks/llm.ts | 9 +- src/service/hooks/tool.ts | 6 +- 5 files changed, 168 insertions(+), 150 deletions(-) diff --git a/src/plugin.smoke.test.ts b/src/plugin.smoke.test.ts index 6c7a287..bca482e 100644 --- a/src/plugin.smoke.test.ts +++ b/src/plugin.smoke.test.ts @@ -24,6 +24,7 @@ describe("plugin smoke", () => { pluginConfig: { enabled: true }, registerService, registerCli, + on: vi.fn(), runtime: { config: { loadConfig: () => ({}), diff --git a/src/service.test.ts b/src/service.test.ts index 994eecd..4254a0d 100644 --- a/src/service.test.ts +++ b/src/service.test.ts @@ -166,8 +166,9 @@ describe("opik service", () => { const service = createOpikService(api as any); await service.start(createServiceContext(false) as any); - expect(api.on).not.toHaveBeenCalled(); - expect(Object.keys(hooks)).toHaveLength(0); + // Hooks are registered at construction time now + expect(api.on).toHaveBeenCalledTimes(10); + expect(Object.keys(hooks)).toHaveLength(10); expect(mockOpikConstructor).not.toHaveBeenCalled(); }); @@ -306,7 +307,7 @@ describe("opik service", () => { const service = createOpikService(api as any); await service.start(createServiceContext() as any); - expect(api.on).toHaveBeenCalledTimes(9); + expect(api.on).toHaveBeenCalledTimes(10); expect(api.on).toHaveBeenCalledWith("llm_input", expect.any(Function)); expect(api.on).toHaveBeenCalledWith("llm_output", expect.any(Function)); expect(api.on).toHaveBeenCalledWith("before_tool_call", expect.any(Function)); @@ -315,7 +316,7 @@ describe("opik service", () => { expect(api.on).toHaveBeenCalledWith("subagent_delivery_target", expect.any(Function)); expect(api.on).toHaveBeenCalledWith("subagent_spawned", expect.any(Function)); expect(api.on).toHaveBeenCalledWith("subagent_ended", expect.any(Function)); - expect(api.on).not.toHaveBeenCalledWith("tool_result_persist", expect.any(Function)); + expect(api.on).toHaveBeenCalledWith("tool_result_persist", expect.any(Function)); expect(api.on).toHaveBeenCalledWith("agent_end", expect.any(Function)); expect(diagnosticListeners).toHaveLength(1); }); @@ -1737,7 +1738,7 @@ describe("opik service", () => { expect(result).toBeUndefined(); }); - test("is not registered when tool_result_persist sanitization is disabled", async () => { + test("is registered but no-ops when tool_result_persist sanitization is disabled", async () => { const { api, hooks } = createApi(); const service = createOpikService(api as any); await service.start( @@ -1748,7 +1749,16 @@ describe("opik service", () => { }) as any, ); - expect(hooks.tool_result_persist).toBeUndefined(); + expect(hooks.tool_result_persist).toBeDefined(); + const result = invokeHook( + hooks, + "tool_result_persist", + { + message: { role: "tool", content: "media:/tmp/image.png" }, + }, + { sessionKey: "s1" }, + ); + expect(result).toBeUndefined(); }); }); diff --git a/src/service.ts b/src/service.ts index 1299270..b9c79fe 100644 --- a/src/service.ts +++ b/src/service.ts @@ -43,11 +43,15 @@ type ServiceLogger = { warn: (message: string) => void; }; +let _sharedClient: Opik | null = null; +let _sharedTags: string[] = ["openclaw"]; +let _sharedProjectName = "openclaw"; +let _sharedToolResultPersistSanitizeEnabled = false; + export function createOpikService( api: OpenClawPluginApi, pluginConfig: OpikPluginConfig = {}, ): OpenClawPluginService { - let client: Opik | null = null; const activeTraces = new Map(); const subagentSpanHosts = new Map< string, @@ -72,7 +76,7 @@ export function createOpikService( let flushQueue: Promise = Promise.resolve(); const attachmentUploader = createAttachmentUploader({ - getClient: () => client, + getClient: () => _sharedClient, getAttachmentBaseUrl: () => attachmentBaseUrl, onWarn: (message) => log.warn(message), formatError, @@ -262,7 +266,7 @@ export function createOpikService( } async function flushWithRetry(reason: string): Promise { - const currentClient = client; + const currentClient = _sharedClient; if (!currentClient) return; const attempts = flushRetryCount + 1; @@ -380,6 +384,7 @@ export function createOpikService( created_from: OPIK_CREATED_FROM, ...active.costMeta, success: agentEnd?.success, + error: agentEnd?.error, durationMs: agentEnd?.durationMs, model: active.model ?? active.costMeta.model, provider: active.provider ?? active.costMeta.provider, @@ -427,6 +432,131 @@ export function createOpikService( scheduleFlush(`trace-finalized sessionKey=${sessionKey}`); } + registerLlmHooks({ + api, + getClient: () => _sharedClient, + activeTraces, + getTags: () => _sharedTags, + getProjectName: () => _sharedProjectName, + rememberSessionCorrelation, + closeActiveTrace, + forgetSessionCorrelation, + applyContextMeta, + safeSpanUpdate, + safeSpanEnd, + scheduleMediaAttachmentUploads: attachmentUploader.scheduleMediaAttachmentUploads, + warn: (message) => log.warn(message), + formatError, + }); + + registerToolHooks({ + api, + getClient: () => _sharedClient, + activeTraces, + sessionByAgentId, + getLastActiveSessionKey: () => lastActiveSessionKey, + rememberSessionCorrelation, + resolveSessionSpanContainer, + warnMissingAfterToolSessionKey, + nextSpanSeq: () => ++spanSeq, + safeSpanUpdate, + safeSpanEnd, + scheduleMediaAttachmentUploads: attachmentUploader.scheduleMediaAttachmentUploads, + getProjectName: () => _sharedProjectName, + warn: (message) => log.warn(message), + formatError, + }); + + registerSubagentHooks({ + api, + getClient: () => _sharedClient, + rememberSessionCorrelation, + resolveSubagentSpanContainer, + getSubagentSpanHost, + rememberSubagentSpanHost, + forgetSubagentSpanHost, + safeSpanUpdate, + safeSpanEnd, + warn: (message) => log.warn(message), + formatError, + }); + + // ===================================================================== + // Hook: tool_result_persist — sanitize persisted tool messages (opt-in) + // ===================================================================== + api.on("tool_result_persist", (event) => { + if (!_sharedToolResultPersistSanitizeEnabled) return; + try { + const eventObj = event as Record; + const message = eventObj.message; + if (!message || typeof message !== "object") return; + + const sanitizedMessage = sanitizeValueForOpik(message); + if (sanitizedMessage !== message) { + return { message: sanitizedMessage }; + } + } catch (err) { + log.warn(`opik: tool_result_persist failed: ${formatError(err)}`); + } + }); + + // ===================================================================== + // Hook: agent_end — Finalize Trace + // ===================================================================== + api.on("agent_end", (event, agentCtx) => { + if (!_sharedClient) return; + const sessionKey = agentCtx.sessionKey; + if (!sessionKey) return; + rememberSessionCorrelation(sessionKey, agentCtx.agentId); + + const active = activeTraces.get(sessionKey); + if (!active) return; + + applyContextMeta(active, agentCtx as Record); + // Close any orphaned tool/subagent spans synchronously. + for (const [toolKey, toolSpan] of active.toolSpans) { + safeSpanEnd(toolSpan, `agent_end orphan tool sessionKey=${sessionKey} toolKey=${toolKey}`); + } + active.toolSpans.clear(); + + for (const [subagentKey, subagentSpan] of active.subagentSpans) { + safeSpanEnd( + subagentSpan, + `agent_end orphan subagent sessionKey=${sessionKey} subagentKey=${subagentKey}`, + ); + } + active.subagentSpans.clear(); + + // Store agent-end data for deferred finalization. + active.agentEnd = { + success: event.success, + error: typeof event.error === "string" ? sanitizeStringForOpik(event.error) : event.error, + durationMs: event.durationMs, + messages: (sanitizeValueForOpik( + ((event as Record).messages as unknown[]) ?? [], + ) as unknown[]) ?? [], + }; + + attachmentUploader.scheduleMediaAttachmentUploads({ + entityType: "trace", + entity: active.trace, + projectName: _sharedProjectName, + reason: `agent_end sessionKey=${sessionKey}`, + payloads: [ + event.error, + ((event as Record).messages as unknown[] | undefined)?.at(-1), + ], + }); + + // Defer finalization to a microtask so llm_output (which fires on the + // same synchronous call stack) can store output/usage first. + const traceRef = active.trace; + queueMicrotask(() => { + const current = activeTraces.get(sessionKey); + if (current && current.trace === traceRef) finalizeTrace(sessionKey); + }); + }); + return { id: OPIK_PLUGIN_ID, async start(ctx) { @@ -445,10 +575,11 @@ export function createOpikService( const apiKey = opikCfg.apiKey ?? process.env.OPIK_API_KEY; const apiUrl = opikCfg.apiUrl ?? process.env.OPIK_URL_OVERRIDE; - const projectName = opikCfg.projectName ?? trimOrUndefined(process.env.OPIK_PROJECT_NAME) ?? "openclaw"; + _sharedProjectName = opikCfg.projectName ?? trimOrUndefined(process.env.OPIK_PROJECT_NAME) ?? "openclaw"; const workspaceName = opikCfg.workspaceName ?? trimOrUndefined(process.env.OPIK_WORKSPACE) ?? "default"; - const tags = opikCfg.tags ?? ["openclaw"]; + _sharedTags = opikCfg.tags ?? ["openclaw"]; + _sharedToolResultPersistSanitizeEnabled = opikCfg.toolResultPersistSanitizeEnabled === true; attachmentBaseUrl = (apiUrl ?? DEFAULT_ATTACHMENT_BASE_URL).replace(/\/+$/, ""); staleTraceCleanupEnabled = opikCfg.staleTraceCleanupEnabled !== false; @@ -466,144 +597,19 @@ export function createOpikService( flushRetryBaseDelayMs = asNonNegativeNumber(opikCfg.flushRetryBaseDelayMs) ?? DEFAULT_FLUSH_RETRY_BASE_DELAY_MS; - client = new Opik({ + _sharedClient = new Opik({ apiKey, ...(apiUrl ? { apiUrl } : {}), - projectName, + projectName: _sharedProjectName, workspaceName, }); await validateProjectTarget({ - client, - projectName, + client: _sharedClient, + projectName: _sharedProjectName, workspaceName, }); - registerLlmHooks({ - api, - getClient: () => client, - activeTraces, - tags, - projectName, - rememberSessionCorrelation, - closeActiveTrace, - forgetSessionCorrelation, - applyContextMeta, - safeSpanUpdate, - safeSpanEnd, - scheduleMediaAttachmentUploads: attachmentUploader.scheduleMediaAttachmentUploads, - warn: (message) => log.warn(message), - formatError, - }); - - registerToolHooks({ - api, - getClient: () => client, - activeTraces, - sessionByAgentId, - getLastActiveSessionKey: () => lastActiveSessionKey, - rememberSessionCorrelation, - resolveSessionSpanContainer, - warnMissingAfterToolSessionKey, - nextSpanSeq: () => ++spanSeq, - safeSpanUpdate, - safeSpanEnd, - scheduleMediaAttachmentUploads: attachmentUploader.scheduleMediaAttachmentUploads, - projectName, - warn: (message) => log.warn(message), - formatError, - }); - - registerSubagentHooks({ - api, - getClient: () => client, - rememberSessionCorrelation, - resolveSubagentSpanContainer, - getSubagentSpanHost, - rememberSubagentSpanHost, - forgetSubagentSpanHost, - safeSpanUpdate, - safeSpanEnd, - warn: (message) => log.warn(message), - formatError, - }); - - // ===================================================================== - // Hook: tool_result_persist — sanitize persisted tool messages (opt-in) - // ===================================================================== - if (opikCfg.toolResultPersistSanitizeEnabled === true) { - api.on("tool_result_persist", (event) => { - try { - const eventObj = event as Record; - const message = eventObj.message; - if (!message || typeof message !== "object") return; - - const sanitizedMessage = sanitizeValueForOpik(message); - if (sanitizedMessage !== message) { - return { message: sanitizedMessage }; - } - } catch (err) { - log.warn(`opik: tool_result_persist failed: ${formatError(err)}`); - } - }); - } - - // ===================================================================== - // Hook: agent_end — Finalize Trace - // ===================================================================== - api.on("agent_end", (event, agentCtx) => { - const sessionKey = agentCtx.sessionKey; - if (!sessionKey) return; - rememberSessionCorrelation(sessionKey, agentCtx.agentId); - - const active = activeTraces.get(sessionKey); - if (!active) return; - - applyContextMeta(active, agentCtx as Record); - // Close any orphaned tool/subagent spans synchronously. - for (const [toolKey, toolSpan] of active.toolSpans) { - safeSpanEnd(toolSpan, `agent_end orphan tool sessionKey=${sessionKey} toolKey=${toolKey}`); - } - active.toolSpans.clear(); - - for (const [subagentKey, subagentSpan] of active.subagentSpans) { - safeSpanEnd( - subagentSpan, - `agent_end orphan subagent sessionKey=${sessionKey} subagentKey=${subagentKey}`, - ); - } - active.subagentSpans.clear(); - - // Store agent-end data for deferred finalization. - active.agentEnd = { - success: event.success, - error: typeof event.error === "string" ? sanitizeStringForOpik(event.error) : event.error, - durationMs: event.durationMs, - messages: (sanitizeValueForOpik( - ((event as Record).messages as unknown[]) ?? [], - ) as unknown[]) ?? [], - }; - - attachmentUploader.scheduleMediaAttachmentUploads({ - entityType: "trace", - entity: active.trace, - projectName, - reason: `agent_end sessionKey=${sessionKey}`, - payloads: [ - event.error, - ((event as Record).messages as unknown[] | undefined)?.at(-1), - ], - }); - - // Defer finalization to a microtask so llm_output (which fires on the - // same synchronous call stack) can store output/usage first. - const traceRef = active.trace; - queueMicrotask(() => { - const current = activeTraces.get(sessionKey); - if (current && current.trace === traceRef) finalizeTrace(sessionKey); - }); - }); - // ===================================================================== // Diagnostic event: model.usage — Accumulate cost/context info // ===================================================================== @@ -687,7 +693,7 @@ export function createOpikService( }; log.info( - `opik: exporting traces to project "${projectName}" (staleCleanup=${staleTraceCleanupEnabled ? "on" : "off"}, staleTimeoutMs=${staleTraceTimeoutMs}, staleSweepMs=${staleSweepIntervalMs}, flushRetryCount=${flushRetryCount}, flushRetryBaseDelayMs=${flushRetryBaseDelayMs})`, + `opik: exporting traces to project "${_sharedProjectName}" (staleCleanup=${staleTraceCleanupEnabled ? "on" : "off"}, staleTimeoutMs=${staleTraceTimeoutMs}, staleSweepMs=${staleSweepIntervalMs}, flushRetryCount=${flushRetryCount}, flushRetryBaseDelayMs=${flushRetryBaseDelayMs})`, ); }, @@ -707,9 +713,9 @@ export function createOpikService( await flushQueue.catch(() => undefined); await attachmentUploader.waitForUploads(); - if (client) { + if (_sharedClient) { await flushWithRetry("service stop"); - client = null; + _sharedClient = null; } log.info( diff --git a/src/service/hooks/llm.ts b/src/service/hooks/llm.ts index e1c1da7..289bd41 100644 --- a/src/service/hooks/llm.ts +++ b/src/service/hooks/llm.ts @@ -14,8 +14,8 @@ type LlmHooksDeps = { api: OpenClawPluginApi; getClient: () => Opik | null; activeTraces: Map; - tags: string[]; - projectName: string; + getTags: () => string[]; + getProjectName: () => string; rememberSessionCorrelation: (sessionKey: string, agentId?: unknown) => void; closeActiveTrace: (active: ActiveTrace, reason: string) => void; forgetSessionCorrelation: (sessionKey: string) => void; @@ -59,6 +59,7 @@ export function registerLlmHooks(deps: LlmHooksDeps): void { systemPrompt: event.systemPrompt, imagesCount: event.imagesCount, }) as Record; + const tags = deps.getTags(); trace = client.trace({ name: `${event.model} · ${channelId ?? "unknown"}`, threadId: sessionKey, @@ -73,7 +74,7 @@ export function registerLlmHooks(deps: LlmHooksDeps): void { ...(channelId ? { channel: channelId, channelId } : {}), ...(trigger ? { trigger } : {}), }, - tags: deps.tags.length > 0 ? deps.tags : undefined, + tags: tags.length > 0 ? tags : undefined, }); } catch (err) { deps.warn(`opik: trace creation failed (sessionKey=${sessionKey}): ${deps.formatError(err)}`); @@ -118,7 +119,7 @@ export function registerLlmHooks(deps: LlmHooksDeps): void { deps.scheduleMediaAttachmentUploads({ entityType: "trace", entity: trace, - projectName: deps.projectName, + projectName: deps.getProjectName(), reason: `llm_input sessionKey=${sessionKey}`, payloads: [event.prompt, Array.isArray(event.historyMessages) ? event.historyMessages.at(-1) : undefined], }); diff --git a/src/service/hooks/tool.ts b/src/service/hooks/tool.ts index 8af9737..cec69c6 100644 --- a/src/service/hooks/tool.ts +++ b/src/service/hooks/tool.ts @@ -25,7 +25,7 @@ type ToolHooksDeps = { reason: string; payloads: unknown[]; }) => void; - projectName: string; + getProjectName: () => string; warn: (message: string) => void; formatError: (err: unknown) => string; }; @@ -89,7 +89,7 @@ export function registerToolHooks(deps: ToolHooksDeps): void { deps.scheduleMediaAttachmentUploads({ entityType: "span", entity: toolSpan, - projectName: deps.projectName, + projectName: deps.getProjectName(), reason: `before_tool_call sessionKey=${sessionKey} tool=${event.toolName}`, payloads: [event.params], }); @@ -199,7 +199,7 @@ export function registerToolHooks(deps: ToolHooksDeps): void { deps.scheduleMediaAttachmentUploads({ entityType: "span", entity: matchedSpan, - projectName: deps.projectName, + projectName: deps.getProjectName(), reason: `after_tool_call sessionKey=${sessionKey} tool=${event.toolName}`, payloads: [event.params, event.result, event.error], });