diff --git a/src/service.ts b/src/service.ts index 1299270..c89d86c 100644 --- a/src/service.ts +++ b/src/service.ts @@ -427,6 +427,118 @@ export function createOpikService( scheduleFlush(`trace-finalized sessionKey=${sessionKey}`); } + // ========================================================================== + // Register all hooks at createOpikService() time (i.e. during register()), + // NOT inside start(). The OpenClaw gateway initialises its global hook runner + // once, immediately after register() returns. Any api.on() calls made inside + // start() execute after the runner is sealed and are silently ignored — + // meaning the plugin captures zero traces with the original code. + // + // All hooks guard with `if (!client) return` so they are safe to call in the + // brief window between register() and the first start() invocation. + // ========================================================================== + registerLlmHooks({ + api, + getClient: () => client, + activeTraces, + tags: pluginConfig.tags ?? ["openclaw"], + projectName: pluginConfig.projectName ?? "openclaw", + rememberSessionCorrelation, + closeActiveTrace, + forgetSessionCorrelation, + applyContextMeta, + safeSpanUpdate, + safeSpanEnd, + scheduleMediaAttachmentUploads: attachmentUploader.scheduleMediaAttachmentUploads, + warn: (message) => log.warn(message), + formatError, + }); + + registerToolHooks({ + api, + getClient: () => client, + activeTraces, + sessionByAgentId, + getLastActiveSessionKey: () => lastActiveSessionKey, + rememberSessionCorrelation, + resolveSessionSpanContainer, + warnMissingAfterToolSessionKey, + nextSpanSeq: () => ++spanSeq, + safeSpanUpdate, + safeSpanEnd, + scheduleMediaAttachmentUploads: attachmentUploader.scheduleMediaAttachmentUploads, + projectName: pluginConfig.projectName ?? "openclaw", + warn: (message) => log.warn(message), + formatError, + }); + + registerSubagentHooks({ + api, + getClient: () => client, + rememberSessionCorrelation, + resolveSubagentSpanContainer, + getSubagentSpanHost, + rememberSubagentSpanHost, + forgetSubagentSpanHost, + safeSpanUpdate, + safeSpanEnd, + warn: (message) => log.warn(message), + formatError, + }); + + api.on("tool_result_persist", (event) => { + if (!client) return; + if (pluginConfig.toolResultPersistSanitizeEnabled !== true) return; + try { + const eventObj = event as Record; + const message = eventObj.message; + if (!message || typeof message !== "object") return; + const sanitizedMessage = sanitizeValueForOpik(message); + if (sanitizedMessage !== message) return { message: sanitizedMessage }; + } catch (err) { + log.warn(`opik: tool_result_persist failed: ${formatError(err)}`); + } + }); + + api.on("agent_end", (event, agentCtx) => { + if (!client) return; + const sessionKey = agentCtx.sessionKey; + if (!sessionKey) return; + rememberSessionCorrelation(sessionKey, agentCtx.agentId); + const active = activeTraces.get(sessionKey); + if (!active) return; + applyContextMeta(active, agentCtx as Record); + for (const [toolKey, toolSpan] of active.toolSpans) + safeSpanEnd(toolSpan, `agent_end orphan tool sessionKey=${sessionKey} toolKey=${toolKey}`); + active.toolSpans.clear(); + for (const [subagentKey, subagentSpan] of active.subagentSpans) + safeSpanEnd(subagentSpan, `agent_end orphan subagent sessionKey=${sessionKey} subagentKey=${subagentKey}`); + active.subagentSpans.clear(); + active.agentEnd = { + success: event.success, + error: typeof event.error === "string" ? sanitizeStringForOpik(event.error) : event.error, + durationMs: event.durationMs, + messages: (sanitizeValueForOpik( + ((event as Record).messages as unknown[]) ?? [], + ) as unknown[]) ?? [], + }; + attachmentUploader.scheduleMediaAttachmentUploads({ + entityType: "trace", + entity: active.trace, + projectName: pluginConfig.projectName ?? "openclaw", + reason: `agent_end sessionKey=${sessionKey}`, + payloads: [ + event.error, + ((event as Record).messages as unknown[] | undefined)?.at(-1), + ], + }); + const traceRef = active.trace; + queueMicrotask(() => { + const current = activeTraces.get(sessionKey); + if (current && current.trace === traceRef) finalizeTrace(sessionKey); + }); + }); + return { id: OPIK_PLUGIN_ID, async start(ctx) { @@ -479,130 +591,8 @@ export function createOpikService( workspaceName, }); - registerLlmHooks({ - api, - getClient: () => client, - activeTraces, - tags, - projectName, - rememberSessionCorrelation, - closeActiveTrace, - forgetSessionCorrelation, - applyContextMeta, - safeSpanUpdate, - safeSpanEnd, - scheduleMediaAttachmentUploads: attachmentUploader.scheduleMediaAttachmentUploads, - warn: (message) => log.warn(message), - formatError, - }); - - registerToolHooks({ - api, - getClient: () => client, - activeTraces, - sessionByAgentId, - getLastActiveSessionKey: () => lastActiveSessionKey, - rememberSessionCorrelation, - resolveSessionSpanContainer, - warnMissingAfterToolSessionKey, - nextSpanSeq: () => ++spanSeq, - safeSpanUpdate, - safeSpanEnd, - scheduleMediaAttachmentUploads: attachmentUploader.scheduleMediaAttachmentUploads, - projectName, - warn: (message) => log.warn(message), - formatError, - }); - - registerSubagentHooks({ - api, - getClient: () => client, - rememberSessionCorrelation, - resolveSubagentSpanContainer, - getSubagentSpanHost, - rememberSubagentSpanHost, - forgetSubagentSpanHost, - safeSpanUpdate, - safeSpanEnd, - warn: (message) => log.warn(message), - formatError, - }); - - // ===================================================================== - // Hook: tool_result_persist — sanitize persisted tool messages (opt-in) - // ===================================================================== - if (opikCfg.toolResultPersistSanitizeEnabled === true) { - api.on("tool_result_persist", (event) => { - try { - const eventObj = event as Record; - const message = eventObj.message; - if (!message || typeof message !== "object") return; - - const sanitizedMessage = sanitizeValueForOpik(message); - if (sanitizedMessage !== message) { - return { message: sanitizedMessage }; - } - } catch (err) { - log.warn(`opik: tool_result_persist failed: ${formatError(err)}`); - } - }); - } - - // ===================================================================== - // Hook: agent_end — Finalize Trace - // ===================================================================== - api.on("agent_end", (event, agentCtx) => { - const sessionKey = agentCtx.sessionKey; - if (!sessionKey) return; - rememberSessionCorrelation(sessionKey, agentCtx.agentId); - - const active = activeTraces.get(sessionKey); - if (!active) return; - - applyContextMeta(active, agentCtx as Record); - // Close any orphaned tool/subagent spans synchronously. - for (const [toolKey, toolSpan] of active.toolSpans) { - safeSpanEnd(toolSpan, `agent_end orphan tool sessionKey=${sessionKey} toolKey=${toolKey}`); - } - active.toolSpans.clear(); - - for (const [subagentKey, subagentSpan] of active.subagentSpans) { - safeSpanEnd( - subagentSpan, - `agent_end orphan subagent sessionKey=${sessionKey} subagentKey=${subagentKey}`, - ); - } - active.subagentSpans.clear(); - - // Store agent-end data for deferred finalization. - active.agentEnd = { - success: event.success, - error: typeof event.error === "string" ? sanitizeStringForOpik(event.error) : event.error, - durationMs: event.durationMs, - messages: (sanitizeValueForOpik( - ((event as Record).messages as unknown[]) ?? [], - ) as unknown[]) ?? [], - }; - - attachmentUploader.scheduleMediaAttachmentUploads({ - entityType: "trace", - entity: active.trace, - projectName, - reason: `agent_end sessionKey=${sessionKey}`, - payloads: [ - event.error, - ((event as Record).messages as unknown[] | undefined)?.at(-1), - ], - }); - - // Defer finalization to a microtask so llm_output (which fires on the - // same synchronous call stack) can store output/usage first. - const traceRef = active.trace; - queueMicrotask(() => { - const current = activeTraces.get(sessionKey); - if (current && current.trace === traceRef) finalizeTrace(sessionKey); - }); - }); + // Hooks are registered at createOpikService() time (see below) so they + // are captured by the gateway hook runner before start() is called. // ===================================================================== // Diagnostic event: model.usage — Accumulate cost/context info diff --git a/src/service/hooks/llm.ts b/src/service/hooks/llm.ts index e1c1da7..edd4729 100644 --- a/src/service/hooks/llm.ts +++ b/src/service/hooks/llm.ts @@ -61,6 +61,7 @@ export function registerLlmHooks(deps: LlmHooksDeps): void { }) as Record; trace = client.trace({ name: `${event.model} · ${channelId ?? "unknown"}`, + projectName: deps.projectName, threadId: sessionKey, input: sanitizedTraceInput, metadata: {