Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
238 changes: 114 additions & 124 deletions src/service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -427,6 +427,118 @@ export function createOpikService(
scheduleFlush(`trace-finalized sessionKey=${sessionKey}`);
}

// ==========================================================================
// Register all hooks at createOpikService() time (i.e. during register()),
// NOT inside start(). The OpenClaw gateway initialises its global hook runner
// once, immediately after register() returns. Any api.on() calls made inside
// start() execute after the runner is sealed and are silently ignored —
// meaning the plugin captures zero traces with the original code.
//
// All hooks guard with `if (!client) return` so they are safe to call in the
// brief window between register() and the first start() invocation.
// ==========================================================================
registerLlmHooks({
api,
getClient: () => client,
activeTraces,
tags: pluginConfig.tags ?? ["openclaw"],
projectName: pluginConfig.projectName ?? "openclaw",
rememberSessionCorrelation,
closeActiveTrace,
forgetSessionCorrelation,
applyContextMeta,
safeSpanUpdate,
safeSpanEnd,
scheduleMediaAttachmentUploads: attachmentUploader.scheduleMediaAttachmentUploads,
warn: (message) => log.warn(message),
formatError,
});

registerToolHooks({
api,
getClient: () => client,
activeTraces,
sessionByAgentId,
getLastActiveSessionKey: () => lastActiveSessionKey,
rememberSessionCorrelation,
resolveSessionSpanContainer,
warnMissingAfterToolSessionKey,
nextSpanSeq: () => ++spanSeq,
safeSpanUpdate,
safeSpanEnd,
scheduleMediaAttachmentUploads: attachmentUploader.scheduleMediaAttachmentUploads,
projectName: pluginConfig.projectName ?? "openclaw",
warn: (message) => log.warn(message),
formatError,
});

registerSubagentHooks({
api,
getClient: () => client,
rememberSessionCorrelation,
resolveSubagentSpanContainer,
getSubagentSpanHost,
rememberSubagentSpanHost,
forgetSubagentSpanHost,
safeSpanUpdate,
safeSpanEnd,
warn: (message) => log.warn(message),
formatError,
});

api.on("tool_result_persist", (event) => {
if (!client) return;
if (pluginConfig.toolResultPersistSanitizeEnabled !== true) return;
try {
const eventObj = event as Record<string, unknown>;
const message = eventObj.message;
if (!message || typeof message !== "object") return;
const sanitizedMessage = sanitizeValueForOpik(message);
if (sanitizedMessage !== message) return { message: sanitizedMessage };
} catch (err) {
log.warn(`opik: tool_result_persist failed: ${formatError(err)}`);
}
});

api.on("agent_end", (event, agentCtx) => {
if (!client) return;
const sessionKey = agentCtx.sessionKey;
if (!sessionKey) return;
rememberSessionCorrelation(sessionKey, agentCtx.agentId);
const active = activeTraces.get(sessionKey);
if (!active) return;
applyContextMeta(active, agentCtx as Record<string, unknown>);
for (const [toolKey, toolSpan] of active.toolSpans)
safeSpanEnd(toolSpan, `agent_end orphan tool sessionKey=${sessionKey} toolKey=${toolKey}`);
active.toolSpans.clear();
for (const [subagentKey, subagentSpan] of active.subagentSpans)
safeSpanEnd(subagentSpan, `agent_end orphan subagent sessionKey=${sessionKey} subagentKey=${subagentKey}`);
active.subagentSpans.clear();
active.agentEnd = {
success: event.success,
error: typeof event.error === "string" ? sanitizeStringForOpik(event.error) : event.error,
durationMs: event.durationMs,
messages: (sanitizeValueForOpik(
((event as Record<string, unknown>).messages as unknown[]) ?? [],
) as unknown[]) ?? [],
};
attachmentUploader.scheduleMediaAttachmentUploads({
entityType: "trace",
entity: active.trace,
projectName: pluginConfig.projectName ?? "openclaw",
reason: `agent_end sessionKey=${sessionKey}`,
payloads: [
event.error,
((event as Record<string, unknown>).messages as unknown[] | undefined)?.at(-1),
],
});
const traceRef = active.trace;
queueMicrotask(() => {
const current = activeTraces.get(sessionKey);
if (current && current.trace === traceRef) finalizeTrace(sessionKey);
});
});

return {
id: OPIK_PLUGIN_ID,
async start(ctx) {
Expand Down Expand Up @@ -479,130 +591,8 @@ export function createOpikService(
workspaceName,
});

registerLlmHooks({
api,
getClient: () => client,
activeTraces,
tags,
projectName,
rememberSessionCorrelation,
closeActiveTrace,
forgetSessionCorrelation,
applyContextMeta,
safeSpanUpdate,
safeSpanEnd,
scheduleMediaAttachmentUploads: attachmentUploader.scheduleMediaAttachmentUploads,
warn: (message) => log.warn(message),
formatError,
});

registerToolHooks({
api,
getClient: () => client,
activeTraces,
sessionByAgentId,
getLastActiveSessionKey: () => lastActiveSessionKey,
rememberSessionCorrelation,
resolveSessionSpanContainer,
warnMissingAfterToolSessionKey,
nextSpanSeq: () => ++spanSeq,
safeSpanUpdate,
safeSpanEnd,
scheduleMediaAttachmentUploads: attachmentUploader.scheduleMediaAttachmentUploads,
projectName,
warn: (message) => log.warn(message),
formatError,
});

registerSubagentHooks({
api,
getClient: () => client,
rememberSessionCorrelation,
resolveSubagentSpanContainer,
getSubagentSpanHost,
rememberSubagentSpanHost,
forgetSubagentSpanHost,
safeSpanUpdate,
safeSpanEnd,
warn: (message) => log.warn(message),
formatError,
});

// =====================================================================
// Hook: tool_result_persist — sanitize persisted tool messages (opt-in)
// =====================================================================
if (opikCfg.toolResultPersistSanitizeEnabled === true) {
api.on("tool_result_persist", (event) => {
try {
const eventObj = event as Record<string, unknown>;
const message = eventObj.message;
if (!message || typeof message !== "object") return;

const sanitizedMessage = sanitizeValueForOpik(message);
if (sanitizedMessage !== message) {
return { message: sanitizedMessage };
}
} catch (err) {
log.warn(`opik: tool_result_persist failed: ${formatError(err)}`);
}
});
}

// =====================================================================
// Hook: agent_end — Finalize Trace
// =====================================================================
api.on("agent_end", (event, agentCtx) => {
const sessionKey = agentCtx.sessionKey;
if (!sessionKey) return;
rememberSessionCorrelation(sessionKey, agentCtx.agentId);

const active = activeTraces.get(sessionKey);
if (!active) return;

applyContextMeta(active, agentCtx as Record<string, unknown>);
// Close any orphaned tool/subagent spans synchronously.
for (const [toolKey, toolSpan] of active.toolSpans) {
safeSpanEnd(toolSpan, `agent_end orphan tool sessionKey=${sessionKey} toolKey=${toolKey}`);
}
active.toolSpans.clear();

for (const [subagentKey, subagentSpan] of active.subagentSpans) {
safeSpanEnd(
subagentSpan,
`agent_end orphan subagent sessionKey=${sessionKey} subagentKey=${subagentKey}`,
);
}
active.subagentSpans.clear();

// Store agent-end data for deferred finalization.
active.agentEnd = {
success: event.success,
error: typeof event.error === "string" ? sanitizeStringForOpik(event.error) : event.error,
durationMs: event.durationMs,
messages: (sanitizeValueForOpik(
((event as Record<string, unknown>).messages as unknown[]) ?? [],
) as unknown[]) ?? [],
};

attachmentUploader.scheduleMediaAttachmentUploads({
entityType: "trace",
entity: active.trace,
projectName,
reason: `agent_end sessionKey=${sessionKey}`,
payloads: [
event.error,
((event as Record<string, unknown>).messages as unknown[] | undefined)?.at(-1),
],
});

// Defer finalization to a microtask so llm_output (which fires on the
// same synchronous call stack) can store output/usage first.
const traceRef = active.trace;
queueMicrotask(() => {
const current = activeTraces.get(sessionKey);
if (current && current.trace === traceRef) finalizeTrace(sessionKey);
});
});
// Hooks are registered at createOpikService() time (see below) so they
// are captured by the gateway hook runner before start() is called.

// =====================================================================
// Diagnostic event: model.usage — Accumulate cost/context info
Expand Down
1 change: 1 addition & 0 deletions src/service/hooks/llm.ts
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ export function registerLlmHooks(deps: LlmHooksDeps): void {
}) as Record<string, unknown>;
trace = client.trace({
name: `${event.model} · ${channelId ?? "unknown"}`,
projectName: deps.projectName,
threadId: sessionKey,
input: sanitizedTraceInput,
metadata: {
Expand Down