diff --git a/Cargo.lock b/Cargo.lock index 8afb21de452..919d242c246 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3782,12 +3782,15 @@ name = "hash-temporal-client" version = "0.0.0" dependencies = [ "error-stack", + "opentelemetry", "serde", "serde_json", "simple-mermaid", "temporalio-client", "temporalio-common", "thiserror 2.0.18", + "tracing", + "tracing-opentelemetry", "type-system", "url", "uuid", diff --git a/apps/hash-ai-worker-ts/package.json b/apps/hash-ai-worker-ts/package.json index d4c3e4ca783..04069ff3804 100644 --- a/apps/hash-ai-worker-ts/package.json +++ b/apps/hash-ai-worker-ts/package.json @@ -56,9 +56,14 @@ "@local/hash-graph-sdk": "workspace:*", "@local/hash-isomorphic-utils": "workspace:*", "@local/status": "workspace:*", + "@opentelemetry/api": "1.9.0", + "@opentelemetry/api-logs": "0.207.0", + "@opentelemetry/instrumentation": "0.207.0", + "@opentelemetry/instrumentation-grpc": "0.207.0", "@sentry/node": "10.42.0", "@temporalio/activity": "1.12.1", "@temporalio/common": "1.12.1", + "@temporalio/interceptors-opentelemetry": "1.12.1", "@temporalio/proto": "1.12.1", "@temporalio/worker": "1.12.1", "@temporalio/workflow": "1.12.1", diff --git a/apps/hash-ai-worker-ts/scripts/bundle-workflow-code.ts b/apps/hash-ai-worker-ts/scripts/bundle-workflow-code.ts index 9c37039248c..9cf2a65542c 100644 --- a/apps/hash-ai-worker-ts/scripts/bundle-workflow-code.ts +++ b/apps/hash-ai-worker-ts/scripts/bundle-workflow-code.ts @@ -17,6 +17,14 @@ async function bundle() { require.resolve( "@local/hash-backend-utils/temporal/interceptors/workflows/sentry", ), + // OTEL workflow interceptor must be in the bundle: when the + // worker boots with `workflowBundle`, the `interceptors.workflowModules` + // option on `Worker.create` is ignored. The interceptor is a no-op + // when no global TracerProvider is registered, so it's safe to + // include unconditionally. + require.resolve( + "@local/hash-backend-utils/temporal/interceptors/workflows/opentelemetry", + ), ], }); const codePath = path.join(__dirname, "../dist/workflow-bundle.js"); diff --git a/apps/hash-ai-worker-ts/src/instrument.ts b/apps/hash-ai-worker-ts/src/instrument.ts new file mode 100644 index 00000000000..05650f8aae1 --- /dev/null +++ b/apps/hash-ai-worker-ts/src/instrument.ts @@ -0,0 +1,51 @@ +/** + * OpenTelemetry bootstrap for the AI worker. Imported as the very first + * statement of `main.ts` so the auto-instrumentations can patch http + * and gRPC modules before any other code requires them. + */ +import { + createHttpInstrumentation, + createUndiciInstrumentation, + registerOpenTelemetry, +} from "@local/hash-backend-utils/opentelemetry"; +import { GrpcInstrumentation } from "@opentelemetry/instrumentation-grpc"; + +/** + * Setup handles. `undefined` when no `HASH_OTLP_ENDPOINT` is configured + * (no collector) or when bootstrap throws. + */ +export const otelSetup: ReturnType = (() => { + const otlpEndpoint = process.env.HASH_OTLP_ENDPOINT; + if (!otlpEndpoint) { + return undefined; + } + try { + return registerOpenTelemetry({ + endpoint: otlpEndpoint, + serviceName: process.env.OTEL_SERVICE_NAME ?? "AI Worker", + instrumentations: [ + createHttpInstrumentation(otlpEndpoint), + new GrpcInstrumentation(), + // Native `fetch` (used by openai / @anthropic-ai/sdk / Vertex AI + // SDKs) goes through undici, which the http instrumentation does + // not patch. The shared helper sets `peer.service` so Tempo's + // service_graphs processor renders external dependencies as + // edges in the service map. + createUndiciInstrumentation(), + ], + }); + } catch (error) { + // Outside production, fail loud: realistic causes here are coding + // errors (bad URL, malformed instrumentation config) and hiding + // them in dev/CI loses regressions. + if (process.env.NODE_ENV !== "production") { + throw error; + } + // eslint-disable-next-line no-console + console.error( + "OpenTelemetry bootstrap failed; AI worker will start without telemetry.", + error, + ); + return undefined; + } +})(); diff --git a/apps/hash-ai-worker-ts/src/main.ts b/apps/hash-ai-worker-ts/src/main.ts index 2e6655ec36d..1f93a495494 100644 --- a/apps/hash-ai-worker-ts/src/main.ts +++ b/apps/hash-ai-worker-ts/src/main.ts @@ -1,4 +1,8 @@ -/* eslint-disable import/first */ +/* eslint-disable import/first, import/order, simple-import-sort/imports */ + +// Must be the first import so OTEL auto-instrumentations can patch +// http / grpc / Sentry's own monkey-patches before they apply. +import { otelSetup } from "./instrument.js"; import * as Sentry from "@sentry/node"; @@ -12,9 +16,16 @@ Sentry.init({ process.env.ENVIRONMENT || (process.env.NODE_ENV === "production" ? "production" : "development"), tracesSampleRate: process.env.NODE_ENV === "production" ? 1.0 : 0, + // Sentry registers its own global `NodeTracerProvider` by default. + // Letting it run after `registerOpenTelemetry` would replace the + // provider that the OTEL workflow client interceptor (set up in + // `createTemporalClient`) holds via `trace.getTracer(...)`, breaking + // caller → workflow → activity context propagation. With this flag + // Sentry shares our provider so its spans flow through the same + // OTLP pipeline. + skipOpenTelemetrySetup: !!otelSetup, }); -import * as http from "node:http"; import { createRequire } from "node:module"; import path from "node:path"; import { fileURLToPath } from "node:url"; @@ -22,11 +33,9 @@ import { fileURLToPath } from "node:url"; import { createGraphClient } from "@local/hash-backend-utils/create-graph-client"; import { getRequiredEnv } from "@local/hash-backend-utils/environment"; import { createCommonFlowActivities } from "@local/hash-backend-utils/flows"; -import { SentryActivityInboundInterceptor } from "@local/hash-backend-utils/temporal/interceptors/activities/sentry"; -import { sentrySinks } from "@local/hash-backend-utils/temporal/sinks/sentry"; +import type { WorkflowSource } from "@local/hash-backend-utils/temporal/worker-bootstrap"; +import { runWorker } from "@local/hash-backend-utils/temporal/worker-bootstrap"; import { createVaultClient } from "@local/hash-backend-utils/vault"; -import type { WorkerOptions } from "@temporalio/worker"; -import { defaultSinks, NativeConnection, Worker } from "@temporalio/worker"; import { config } from "dotenv-flow"; import { TsconfigPathsPlugin } from "tsconfig-paths-webpack-plugin"; @@ -43,138 +52,76 @@ export const monorepoRootDir = path.resolve(__dirname, "../../.."); config({ silent: true, path: monorepoRootDir }); -const TEMPORAL_HOST = new URL( - process.env.HASH_TEMPORAL_SERVER_HOST ?? "http://localhost", -).hostname; -const TEMPORAL_PORT = process.env.HASH_TEMPORAL_SERVER_PORT - ? parseInt(process.env.HASH_TEMPORAL_SERVER_PORT, 10) - : 7233; - -const createHealthCheckServer = () => { - const server = http.createServer((req, res) => { - if (req.method === "GET" && req.url === "/health") { - res.setHeader("Content-Type", "application/json"); - res.writeHead(200); - res.end( - JSON.stringify({ - msg: "worker healthy", - }), - ); - return; - } - res.writeHead(404); - res.end(""); - }); - - return server; -}; - -const workflowOptions: Partial = +const workflowSource: WorkflowSource = process.env.NODE_ENV === "production" ? { - workflowBundle: { - codePath: require.resolve("../dist/workflow-bundle.js"), - }, + kind: "bundle", + bundle: { codePath: require.resolve("../dist/workflow-bundle.js") }, } : { + kind: "path", + workflowsPath: require.resolve("./workflows"), bundlerOptions: { - webpackConfigHook: (webpackConfig) => { - return { - ...webpackConfig, - resolve: { - ...webpackConfig.resolve, - plugins: [ - ...((webpackConfig.plugins as [] | undefined) ?? []), - /** - * Because we run TypeScript directly in development, we need to use the 'paths' in the base tsconfig.json - * This tells TypeScript where to resolve the imports from, overwriting the 'exports' in local dependencies' package.jsons, - * which refer to the transpiled JavaScript code. This plugin converts the 'paths' to webpack 'alias'. - */ - new TsconfigPathsPlugin({ - configFile: - "../../libs/@local/tsconfig/legacy-base-tsconfig-to-refactor.json", - }), - ], - }, - }; - }, + webpackConfigHook: (webpackConfig) => ({ + ...webpackConfig, + resolve: { + ...webpackConfig.resolve, + plugins: [ + ...((webpackConfig.plugins as [] | undefined) ?? []), + /** + * We run TypeScript directly in development, so the 'paths' in + * the base tsconfig.json need to be honoured to override the + * 'exports' in local dependencies' package.jsons (which point + * at transpiled JavaScript). This plugin converts the 'paths' + * to webpack 'alias'. + */ + new TsconfigPathsPlugin({ + configFile: + "../../libs/@local/tsconfig/legacy-base-tsconfig-to-refactor.json", + }), + ], + }, + }), }, - workflowsPath: require.resolve("./workflows"), }; async function run() { - logger.info("Starting AI worker..."); - const graphApiClient = createGraphClient(logger, { host: getRequiredEnv("HASH_GRAPH_HTTP_HOST"), port: parseInt(getRequiredEnv("HASH_GRAPH_HTTP_PORT"), 10), }); - logger.info("Created Graph client"); const vaultClient = await createVaultClient({ logger }); - if (!vaultClient) { throw new Error("Failed to create Vault client, check preceding logs."); } - logger.info("Created Vault client"); - const connection = await NativeConnection.connect({ - address: `${TEMPORAL_HOST}:${TEMPORAL_PORT}`, - }); - logger.info("Created Temporal connection"); - - const worker = await Worker.create({ - ...workflowOptions, + await runWorker({ + serviceName: "AI worker", + taskQueue: "ai", + healthCheckPort: 4100, activities: { - ...createAiActivities({ - graphApiClient, - }), - ...createGraphActivities({ - graphApiClient, - }), + ...createAiActivities({ graphApiClient }), + ...createGraphActivities({ graphApiClient }), ...createFlowActivities({ vaultClient }), ...createCommonFlowActivities({ graphApiClient }), }, - connection, - /** - * The maximum time that may elapse between heartbeats being processed by the server. - * The default maxHeartbeatThrottleInterval is 60s. - * Throttling is also capped at 80% of the heartbeatTimeout set when proxying an activity. - */ - maxHeartbeatThrottleInterval: "10 seconds", - namespace: "HASH", - taskQueue: "ai", - sinks: { ...defaultSinks(), ...sentrySinks() }, - interceptors: { - workflowModules: [ - require.resolve( - "@local/hash-backend-utils/temporal/interceptors/workflows/sentry", - ), - ], - activityInbound: [(ctx) => new SentryActivityInboundInterceptor(ctx)], + workflowSource, + workerOptions: { + /** + * Maximum interval between heartbeats being processed by the server. + * Default `maxHeartbeatThrottleInterval` is 60s; throttling is also + * capped at 80% of `heartbeatTimeout` set when proxying an activity. + */ + maxHeartbeatThrottleInterval: "10 seconds", }, + otelSetup, + logger, }); - - const httpServer = createHealthCheckServer(); - const port = 4100; - httpServer.listen({ host: "0.0.0.0", port }); - - logger.info(`HTTP server listening on port ${port}`); - - await worker.run(); } -process.on("SIGINT", () => { - logger.info("Received SIGINT, exiting..."); - process.exit(1); -}); -process.on("SIGTERM", () => { - logger.info("Received SIGTERM, exiting..."); - process.exit(1); -}); - run().catch((error: unknown) => { logger.error("Error running worker", { error }); process.exit(1); diff --git a/apps/hash-api/package.json b/apps/hash-api/package.json index 796ce43a439..58972e49f89 100644 --- a/apps/hash-api/package.json +++ b/apps/hash-api/package.json @@ -54,7 +54,6 @@ "@opentelemetry/instrumentation": "0.207.0", "@opentelemetry/instrumentation-express": "0.56.0", "@opentelemetry/instrumentation-graphql": "0.55.0", - "@opentelemetry/instrumentation-http": "0.207.0", "@opentelemetry/resources": "2.2.0", "@opentelemetry/sdk-logs": "0.207.0", "@opentelemetry/sdk-trace-base": "2.2.0", diff --git a/apps/hash-api/src/ensure-system-graph-is-initialized.ts b/apps/hash-api/src/ensure-system-graph-is-initialized.ts index 47d8416d357..67ee8978b79 100644 --- a/apps/hash-api/src/ensure-system-graph-is-initialized.ts +++ b/apps/hash-api/src/ensure-system-graph-is-initialized.ts @@ -17,7 +17,7 @@ const context: ImpureGraphContext = { host: getRequiredEnv("HASH_GRAPH_HTTP_HOST"), port: Number.parseInt(getRequiredEnv("HASH_GRAPH_HTTP_PORT"), 10), }), - temporalClient: await createTemporalClient(logger), + temporalClient: await createTemporalClient(), }; await ensureSystemGraphIsInitialized({ diff --git a/apps/hash-api/src/graphql/opentelemetry.ts b/apps/hash-api/src/graphql/opentelemetry.ts deleted file mode 100644 index 3aded15c9dc..00000000000 --- a/apps/hash-api/src/graphql/opentelemetry.ts +++ /dev/null @@ -1,91 +0,0 @@ -import { logs } from "@opentelemetry/api-logs"; -import { OTLPLogExporter } from "@opentelemetry/exporter-logs-otlp-grpc"; -import { OTLPTraceExporter } from "@opentelemetry/exporter-trace-otlp-grpc"; -import { registerInstrumentations } from "@opentelemetry/instrumentation"; -import { - ExpressInstrumentation, - ExpressLayerType, -} from "@opentelemetry/instrumentation-express"; -import { GraphQLInstrumentation } from "@opentelemetry/instrumentation-graphql"; -import { HttpInstrumentation } from "@opentelemetry/instrumentation-http"; -import { - defaultResource, - resourceFromAttributes, -} from "@opentelemetry/resources"; -import { - LoggerProvider, - SimpleLogRecordProcessor, -} from "@opentelemetry/sdk-logs"; -import { SimpleSpanProcessor } from "@opentelemetry/sdk-trace-base"; -import { NodeTracerProvider } from "@opentelemetry/sdk-trace-node"; - -import { logger } from "../logger"; - -const traceTimeout = 5000; - -const unregisterInstrumentations = registerInstrumentations({ - instrumentations: [ - new HttpInstrumentation({ - ignoreOutgoingRequestHook: (options) => { - return options.port === 4317; - }, - }), - new ExpressInstrumentation({ - ignoreLayersType: [ExpressLayerType.MIDDLEWARE], - }), - new GraphQLInstrumentation({ - allowValues: true, - depth: 5, - mergeItems: true, - ignoreTrivialResolveSpans: true, - }), - ], -}); - -export const registerOpenTelemetry = ( - otlpGrpcEndpoint: string | null, - serviceName: string, -): (() => void) => { - if (!otlpGrpcEndpoint) { - logger.warn( - "No OpenTelemetry Protocol endpoint given. Not sending telemetry anywhere.", - ); - return () => {}; - } - - const collectorOptions = { - timeoutMillis: traceTimeout, - url: otlpGrpcEndpoint, - }; - - // Setup Tracing - const traceExporter = new OTLPTraceExporter(collectorOptions); - const traceProvider = new NodeTracerProvider({ - resource: defaultResource().merge( - resourceFromAttributes({ "service.name": serviceName }), - ), - spanProcessors: [new SimpleSpanProcessor(traceExporter)], - }); - traceProvider.register(); - - // Setup Logs - const logExporter = new OTLPLogExporter(collectorOptions); - const logProvider = new LoggerProvider({ - resource: defaultResource().merge( - resourceFromAttributes({ "service.name": serviceName }), - ), - processors: [new SimpleLogRecordProcessor(logExporter)], - }); - - logs.setGlobalLoggerProvider(logProvider); - - logger.info( - `Registered OpenTelemetry (traces + logs) at endpoint ${otlpGrpcEndpoint} for ${serviceName}`, - ); - - return () => { - traceProvider.shutdown().catch(logger.error); - logProvider.shutdown().catch(logger.error); - unregisterInstrumentations(); - }; -}; diff --git a/apps/hash-api/src/index.ts b/apps/hash-api/src/index.ts index 2df03434c5f..0f9f29e84b2 100644 --- a/apps/hash-api/src/index.ts +++ b/apps/hash-api/src/index.ts @@ -66,6 +66,7 @@ import { createEmailTransporter } from "./email/create-email-transporter"; import { ensureSystemGraphIsInitialized } from "./graph/ensure-system-graph-is-initialized"; import { ensureHashSystemAccountExists } from "./graph/system-account"; import { createApolloServer } from "./graphql/create-apollo-server"; +import { otelSetup } from "./instrument.mjs"; import { enabledIntegrations } from "./integrations/enabled-integrations"; import { checkGoogleAccessToken } from "./integrations/google/check-access-token"; import { getGoogleAccessToken } from "./integrations/google/get-access-token"; @@ -100,6 +101,16 @@ const httpServer = http.createServer(app); const shutdown = new GracefulShutdown(logger, "SIGINT", "SIGTERM"); +// Register OpenTelemetry first so it flushes last — `GracefulShutdown` +// runs cleanups in reverse registration order. Cleanup hooks added below +// can still emit shutdown spans / logs before the providers disconnect +// from the collector. `otelSetup` is `undefined` when no +// `HASH_OTLP_ENDPOINT` is configured (no collector) or when bootstrap +// throws. +if (otelSetup) { + shutdown.addCleanup("OpenTelemetry", otelSetup.shutdown); +} + const baseRateLimitOptions: Partial = { windowMs: process.env.NODE_ENV === "test" ? 10 : 1000 * 10, // 10 seconds limit: 12, // Limit each IP to 12 requests every 10 seconds @@ -244,6 +255,30 @@ const sanitizeProxyLogArgs = (args: unknown[]): unknown[] => typeof arg === "string" ? redactAuthQueryParams(arg) : arg, ); +/** + * Forward a `http-proxy-middleware` variadic log call to the + * structured logger as `(message, meta?)`. Without this, passing the + * raw `args` array as a single argument to `logger.info` results in + * the OTLP body being a JSON-encoded array (`["[HPM] …"]`) instead of + * the proxy log message itself. + */ +const forwardProxyLog = (level: "info" | "warn" | "error", args: unknown[]) => { + const sanitized = sanitizeProxyLogArgs(args); + if (sanitized.length === 0) { + return; + } + const [first, ...rest] = sanitized; + if (typeof first === "string") { + if (rest.length === 0) { + logger[level](first); + } else { + logger[level](first, { args: rest }); + } + return; + } + logger[level]("[HPM]", { args: sanitized }); +}; + const kratosProxyLogger = { /** * `http-proxy-middleware` logs include request URLs. @@ -251,15 +286,9 @@ const kratosProxyLogger = { * `/auth/*` requests can include Ory self-service query parameters, so we * sanitize all forwarded log levels consistently. */ - info: (...args: unknown[]) => { - logger.info(sanitizeProxyLogArgs(args)); - }, - warn: (...args: unknown[]) => { - logger.warn(sanitizeProxyLogArgs(args)); - }, - error: (...args: unknown[]) => { - logger.error(sanitizeProxyLogArgs(args)); - }, + info: (...args: unknown[]) => forwardProxyLog("info", args), + warn: (...args: unknown[]) => forwardProxyLog("warn", args), + error: (...args: unknown[]) => forwardProxyLog("error", args), }; const kratosProxy = createProxyMiddleware({ @@ -418,7 +447,7 @@ const main = async () => { // Setup upload storage provider and express routes for local file uploads const uploadProvider = setupStorageProviders(app, FILE_UPLOAD_PROVIDER); - const temporalClient = await createTemporalClient(logger); + const temporalClient = await createTemporalClient(); const vaultClient = await createVaultClient({ logger }); diff --git a/apps/hash-api/src/instrument.mjs b/apps/hash-api/src/instrument.mjs index 4e72042f03c..9aff48c593b 100644 --- a/apps/hash-api/src/instrument.mjs +++ b/apps/hash-api/src/instrument.mjs @@ -1,19 +1,71 @@ /** Required to load environment variables */ import "@local/hash-backend-utils/environment"; +import { + createHttpInstrumentation, + createUndiciInstrumentation, + registerOpenTelemetry, +} from "@local/hash-backend-utils/opentelemetry"; +import { + ExpressInstrumentation, + ExpressLayerType, +} from "@opentelemetry/instrumentation-express"; +import { GraphQLInstrumentation } from "@opentelemetry/instrumentation-graphql"; import * as Sentry from "@sentry/node"; import { isProdEnv } from "./lib/env-config"; -// Initialize OpenTelemetry BEFORE any app code -const otlpEndpoint = process.env.HASH_OTLP_ENDPOINT; -if (otlpEndpoint) { - const { registerOpenTelemetry } = await import("./graphql/opentelemetry.js"); - registerOpenTelemetry( - otlpEndpoint, - process.env.OTEL_SERVICE_NAME || "Node API", - ); -} +/** + * OpenTelemetry setup handle, exported so `index.ts` can wire + * `shutdown()` into the GracefulShutdown chain. `undefined` when + * `HASH_OTLP_ENDPOINT` is unset (no collector configured) or when + * bootstrap throws. + * + * @type {import("@local/hash-backend-utils/opentelemetry").OpenTelemetrySetup | undefined} + */ +export const otelSetup = (() => { + const otlpEndpoint = process.env.HASH_OTLP_ENDPOINT; + if (!otlpEndpoint) { + return undefined; + } + try { + return registerOpenTelemetry({ + endpoint: otlpEndpoint, + serviceName: process.env.OTEL_SERVICE_NAME || "Node API", + instrumentations: [ + createHttpInstrumentation(otlpEndpoint), + new ExpressInstrumentation({ + ignoreLayersType: [ExpressLayerType.MIDDLEWARE], + }), + new GraphQLInstrumentation({ + allowValues: true, + depth: 5, + mergeItems: true, + ignoreTrivialResolveSpans: true, + }), + // Native `fetch` (used by openai SDK and outbound API calls in + // resolvers) goes through undici, which the http instrumentation + // does not patch. The shared helper sets `peer.service` so + // Tempo's service_graphs processor renders external dependencies + // as edges in the service map. + createUndiciInstrumentation(), + ], + }); + } catch (error) { + // Outside production, fail loud: the only realistic causes here are + // coding errors (bad URL, malformed instrumentation config) and + // hiding them in dev/CI loses regressions. + if (!isProdEnv) { + throw error; + } + // eslint-disable-next-line no-console + console.error( + "OpenTelemetry bootstrap failed; service will start without telemetry.", + error, + ); + return undefined; + } +})(); const sentryDsn = process.env.NODE_API_SENTRY_DSN; @@ -26,4 +78,9 @@ Sentry.init({ (isProdEnv ? "production" : "development"), sendDefaultPii: true, tracesSampleRate: isProdEnv ? 1.0 : 0, + // Skip Sentry's tracer setup only when our v2 provider actually + // registered. Gating on `otlpEndpoint` instead would skip Sentry's + // setup whenever the env var is set even if our bootstrap threw, + // leaving the process with neither tracer. + skipOpenTelemetrySetup: !!otelSetup, }); diff --git a/apps/hash-api/src/integrations/linear/webhook.ts b/apps/hash-api/src/integrations/linear/webhook.ts index dbb709c77e1..c5b39974f73 100644 --- a/apps/hash-api/src/integrations/linear/webhook.ts +++ b/apps/hash-api/src/integrations/linear/webhook.ts @@ -59,7 +59,7 @@ export const linearWebhook: RequestHandler< const payload = JSON.parse(req.body) as LinearWebhookPayload; - const temporalClient = await createTemporalClient(logger); + const temporalClient = await createTemporalClient(); const organizationId = payload.organizationId; diff --git a/apps/hash-external-services/docker-compose.yml b/apps/hash-external-services/docker-compose.yml index 916282b1c98..9da1bf8ba10 100644 --- a/apps/hash-external-services/docker-compose.yml +++ b/apps/hash-external-services/docker-compose.yml @@ -237,6 +237,9 @@ services: POSTGRES_USER: "${HASH_TEMPORAL_PG_USER}" POSTGRES_PWD: "${HASH_TEMPORAL_PG_PASSWORD}" POSTGRES_SEEDS: "postgres" # the hostname of the postgres container + # Expose Prometheus-format metrics on port 8000 so the otel-collector + # can scrape them and forward to Mimir. + PROMETHEUS_ENDPOINT: "0.0.0.0:8000" security_opt: - no-new-privileges:true ports: diff --git a/apps/hash-external-services/opentelemetry-collector/otel-collector-config.yaml b/apps/hash-external-services/opentelemetry-collector/otel-collector-config.yaml index 8805bf18998..36fb21a0efb 100644 --- a/apps/hash-external-services/opentelemetry-collector/otel-collector-config.yaml +++ b/apps/hash-external-services/opentelemetry-collector/otel-collector-config.yaml @@ -5,6 +5,34 @@ receivers: endpoint: 0.0.0.0:4317 http: endpoint: 0.0.0.0:4318 + # Scrape Prometheus-format metrics that the Temporal server cluster + # exposes on its `PROMETHEUS_ENDPOINT` (port 8000) so we get + # server-side counters (workflow_started, persistence latency, + # mutable-state cache, queue depth, …) alongside the worker SDK + # metrics that arrive via OTLP. + # + # Temporal emits unprefixed metric names (`cache_miss`, + # `request_latency`, `activity_end_to_end_latency`, …) that collide + # visually with metrics from every other scrape job. Force a + # `temporal_` prefix on everything coming out of this scrape so + # they're disambiguated in Mimir / Grafana auto-complete. Go runtime + # metrics from the Temporal process are also prefixed — that's + # intentional, those are *Temporal's* runtime, not ours. + prometheus: + config: + scrape_configs: + - job_name: temporal-server + scrape_interval: 30s + static_configs: + - targets: [temporal:8000] + labels: + service.name: temporal-server + metric_relabel_configs: + - source_labels: [__name__] + regex: ^(.+)$ + target_label: __name__ + replacement: temporal_${1} + action: replace processors: batch: @@ -40,7 +68,7 @@ service: processors: [resource/deployment_environment, batch] exporters: [otlp/tempo] metrics: - receivers: [otlp] + receivers: [otlp, prometheus] processors: [batch] exporters: [otlphttp/mimir] logs: diff --git a/apps/hash-integration-worker/package.json b/apps/hash-integration-worker/package.json index b26526c56a3..9a12d26600a 100644 --- a/apps/hash-integration-worker/package.json +++ b/apps/hash-integration-worker/package.json @@ -28,8 +28,13 @@ "@local/hash-graph-sdk": "workspace:*", "@local/hash-isomorphic-utils": "workspace:*", "@local/status": "workspace:*", + "@opentelemetry/api": "1.9.0", + "@opentelemetry/api-logs": "0.207.0", + "@opentelemetry/instrumentation": "0.207.0", + "@opentelemetry/instrumentation-grpc": "0.207.0", "@sentry/node": "10.42.0", "@temporalio/activity": "1.12.1", + "@temporalio/interceptors-opentelemetry": "1.12.1", "@temporalio/worker": "1.12.1", "@temporalio/workflow": "1.12.1", "agentkeepalive": "4.6.0", diff --git a/apps/hash-integration-worker/scripts/bundle-workflow-code.ts b/apps/hash-integration-worker/scripts/bundle-workflow-code.ts index 9c37039248c..9cf2a65542c 100644 --- a/apps/hash-integration-worker/scripts/bundle-workflow-code.ts +++ b/apps/hash-integration-worker/scripts/bundle-workflow-code.ts @@ -17,6 +17,14 @@ async function bundle() { require.resolve( "@local/hash-backend-utils/temporal/interceptors/workflows/sentry", ), + // OTEL workflow interceptor must be in the bundle: when the + // worker boots with `workflowBundle`, the `interceptors.workflowModules` + // option on `Worker.create` is ignored. The interceptor is a no-op + // when no global TracerProvider is registered, so it's safe to + // include unconditionally. + require.resolve( + "@local/hash-backend-utils/temporal/interceptors/workflows/opentelemetry", + ), ], }); const codePath = path.join(__dirname, "../dist/workflow-bundle.js"); diff --git a/apps/hash-integration-worker/src/instrument.ts b/apps/hash-integration-worker/src/instrument.ts new file mode 100644 index 00000000000..004412282c2 --- /dev/null +++ b/apps/hash-integration-worker/src/instrument.ts @@ -0,0 +1,50 @@ +/** + * OpenTelemetry bootstrap for the integration worker. Imported as the + * very first statement of `main.ts` so the auto-instrumentations can + * patch http and gRPC modules before any other code requires them. + */ +import { + createHttpInstrumentation, + createUndiciInstrumentation, + registerOpenTelemetry, +} from "@local/hash-backend-utils/opentelemetry"; +import { GrpcInstrumentation } from "@opentelemetry/instrumentation-grpc"; + +/** + * Setup handles. `undefined` when no `HASH_OTLP_ENDPOINT` is configured + * (no collector) or when bootstrap throws. + */ +export const otelSetup: ReturnType = (() => { + const otlpEndpoint = process.env.HASH_OTLP_ENDPOINT; + if (!otlpEndpoint) { + return undefined; + } + try { + return registerOpenTelemetry({ + endpoint: otlpEndpoint, + serviceName: process.env.OTEL_SERVICE_NAME ?? "Integration Worker", + instrumentations: [ + createHttpInstrumentation(otlpEndpoint), + new GrpcInstrumentation(), + // Native `fetch` (used by Linear SDK / outbound API calls) goes + // through undici, which the http instrumentation does not patch. + // The shared helper sets `peer.service` so Tempo's service_graphs + // processor renders Linear etc. as external-service edges. + createUndiciInstrumentation(), + ], + }); + } catch (error) { + // Outside production, fail loud: realistic causes here are coding + // errors (bad URL, malformed instrumentation config) and hiding + // them in dev/CI loses regressions. + if (process.env.NODE_ENV !== "production") { + throw error; + } + // eslint-disable-next-line no-console + console.error( + "OpenTelemetry bootstrap failed; integration worker will start without telemetry.", + error, + ); + return undefined; + } +})(); diff --git a/apps/hash-integration-worker/src/main.ts b/apps/hash-integration-worker/src/main.ts index 5b3b16eccbf..2e85db0bb75 100644 --- a/apps/hash-integration-worker/src/main.ts +++ b/apps/hash-integration-worker/src/main.ts @@ -1,4 +1,8 @@ -/* eslint-disable import/first */ +/* eslint-disable import/first, import/order, simple-import-sort/imports */ + +// Must be the first import so OTEL auto-instrumentations can patch +// http / grpc / Sentry's own monkey-patches before they apply. +import { otelSetup } from "./instrument.js"; import * as Sentry from "@sentry/node"; @@ -12,9 +16,16 @@ Sentry.init({ process.env.ENVIRONMENT || (process.env.NODE_ENV === "production" ? "production" : "development"), tracesSampleRate: process.env.NODE_ENV === "production" ? 1.0 : 0, + // Sentry registers its own global `NodeTracerProvider` by default. + // Letting it run after `registerOpenTelemetry` would replace the + // provider that the OTEL workflow client interceptor (set up in + // `createTemporalClient`) holds via `trace.getTracer(...)`, breaking + // caller → workflow → activity context propagation. With this flag + // Sentry shares our provider so its spans flow through the same + // OTLP pipeline. + skipOpenTelemetrySetup: !!otelSetup, }); -import * as http from "node:http"; import { createRequire } from "node:module"; import path from "node:path"; import { fileURLToPath } from "node:url"; @@ -23,10 +34,9 @@ import { createGraphClient } from "@local/hash-backend-utils/create-graph-client import { getRequiredEnv } from "@local/hash-backend-utils/environment"; import { createCommonFlowActivities } from "@local/hash-backend-utils/flows"; import { Logger } from "@local/hash-backend-utils/logger"; -import { SentryActivityInboundInterceptor } from "@local/hash-backend-utils/temporal/interceptors/activities/sentry"; -import { sentrySinks } from "@local/hash-backend-utils/temporal/sinks/sentry"; +import type { WorkflowSource } from "@local/hash-backend-utils/temporal/worker-bootstrap"; +import { runWorker } from "@local/hash-backend-utils/temporal/worker-bootstrap"; import type { WorkflowTypeMap } from "@local/hash-backend-utils/temporal-integration-workflow-types"; -import { defaultSinks, NativeConnection, Worker } from "@temporalio/worker"; import { config } from "dotenv-flow"; import { createFlowActivities } from "./activities/flow-activities.js"; @@ -38,8 +48,9 @@ const __dirname = path.dirname(__filename); const require = createRequire(import.meta.url); -// This is a workaround to ensure that all functions defined in WorkflowTypeMap are exported from the workflows file -// They must be individually exported from the file, and it's impossible to check completeness of exports in the file itself +// Ensures that all functions defined in WorkflowTypeMap are exported from the +// workflows file. They must be individually exported, and it's impossible to +// check completeness of exports in the file itself. // eslint-disable-next-line @typescript-eslint/no-unused-vars const exportMap: WorkflowTypeMap = workflows; @@ -52,99 +63,36 @@ export const logger = new Logger({ serviceName: "integration-worker", }); -const TEMPORAL_HOST = new URL( - process.env.HASH_TEMPORAL_SERVER_HOST ?? "http://localhost", -).hostname; -const TEMPORAL_PORT = process.env.HASH_TEMPORAL_SERVER_PORT - ? parseInt(process.env.HASH_TEMPORAL_SERVER_PORT, 10) - : 7233; - -const createHealthCheckServer = () => { - const server = http.createServer((req, res) => { - if (req.method === "GET" && req.url === "/health") { - res.setHeader("Content-Type", "application/json"); - res.writeHead(200); - res.end( - JSON.stringify({ - msg: "worker healthy", - }), - ); - return; - } - res.writeHead(404); - res.end(""); - }); - - return server; -}; - -const workflowOption = () => +const workflowSource: WorkflowSource = process.env.NODE_ENV === "production" ? { - workflowBundle: { - codePath: require.resolve("../dist/workflow-bundle.js"), - }, + kind: "bundle", + bundle: { codePath: require.resolve("../dist/workflow-bundle.js") }, } - : { workflowsPath: require.resolve("./workflows") }; + : { kind: "path", workflowsPath: require.resolve("./workflows") }; async function run() { - // eslint-disable-next-line no-console - console.info("Starting integration worker..."); - const graphApiClient = createGraphClient(logger, { host: getRequiredEnv("HASH_GRAPH_HTTP_HOST"), port: parseInt(getRequiredEnv("HASH_GRAPH_HTTP_PORT"), 10), }); - const worker = await Worker.create({ - ...workflowOption(), + await runWorker({ + serviceName: "integration worker", + taskQueue: "integration", + healthCheckPort: 4300, activities: { - ...linearActivities.createLinearIntegrationActivities({ - graphApiClient, - }), - ...createFlowActivities({ - graphApiClient, - }), + ...linearActivities.createLinearIntegrationActivities({ graphApiClient }), + ...createFlowActivities({ graphApiClient }), ...createCommonFlowActivities({ graphApiClient }), }, - connection: await NativeConnection.connect({ - address: `${TEMPORAL_HOST}:${TEMPORAL_PORT}`, - }), - namespace: "HASH", - taskQueue: "integration", - sinks: { ...defaultSinks(), ...sentrySinks() }, - interceptors: { - workflowModules: [ - require.resolve( - "@local/hash-backend-utils/temporal/interceptors/workflows/sentry", - ), - ], - activityInbound: [(ctx) => new SentryActivityInboundInterceptor(ctx)], - }, + workflowSource, + otelSetup, + logger, }); - - const httpServer = createHealthCheckServer(); - const port = 4300; - httpServer.listen({ host: "0.0.0.0", port }); - // eslint-disable-next-line no-console - console.info(`HTTP server listening on port ${port}`); - - await worker.run(); } -process.on("SIGINT", () => { - // eslint-disable-next-line no-console - console.info("Received SIGINT, exiting..."); - process.exit(1); -}); -process.on("SIGTERM", () => { - // eslint-disable-next-line no-console - console.info("Received SIGTERM, exiting..."); - process.exit(1); -}); - -run().catch((err) => { - // eslint-disable-next-line no-console - console.error(err); +run().catch((error: unknown) => { + logger.error("Error running worker", { error }); process.exit(1); }); diff --git a/libs/@local/hash-backend-utils/package.json b/libs/@local/hash-backend-utils/package.json index 54e15527af8..53a7c7ade1f 100644 --- a/libs/@local/hash-backend-utils/package.json +++ b/libs/@local/hash-backend-utils/package.json @@ -39,12 +39,25 @@ "@local/status": "workspace:*", "@opentelemetry/api": "1.9.0", "@opentelemetry/api-logs": "0.207.0", + "@opentelemetry/core": "2.2.0", + "@opentelemetry/exporter-logs-otlp-grpc": "0.207.0", + "@opentelemetry/exporter-metrics-otlp-grpc": "0.207.0", + "@opentelemetry/exporter-trace-otlp-grpc": "0.207.0", + "@opentelemetry/instrumentation": "0.207.0", + "@opentelemetry/instrumentation-http": "0.207.0", + "@opentelemetry/instrumentation-undici": "0.25.0", + "@opentelemetry/resources": "2.2.0", + "@opentelemetry/sdk-logs": "0.207.0", + "@opentelemetry/sdk-metrics": "2.2.0", + "@opentelemetry/sdk-trace-base": "2.2.0", + "@opentelemetry/sdk-trace-node": "2.2.0", "@sentry/node": "10.42.0", "@smithy/protocol-http": "5.3.3", "@smithy/signature-v4": "5.3.3", "@temporalio/activity": "1.12.1", "@temporalio/client": "1.12.1", "@temporalio/common": "1.12.1", + "@temporalio/interceptors-opentelemetry": "1.12.1", "@temporalio/proto": "1.12.1", "@temporalio/worker": "1.12.1", "@temporalio/workflow": "1.12.1", diff --git a/libs/@local/hash-backend-utils/src/opentelemetry.test.ts b/libs/@local/hash-backend-utils/src/opentelemetry.test.ts new file mode 100644 index 00000000000..c610bc06797 --- /dev/null +++ b/libs/@local/hash-backend-utils/src/opentelemetry.test.ts @@ -0,0 +1,191 @@ +import type { ClientRequest, IncomingMessage } from "node:http"; + +import { type Span, trace } from "@opentelemetry/api"; +import { describe, expect, it } from "vitest"; + +import { + createHttpInstrumentation, + httpRequestSpanNameHook, + resolvePeerService, +} from "./opentelemetry.js"; + +describe("resolvePeerService", () => { + it("matches exact hosts to their service label", () => { + expect(resolvePeerService("api.openai.com")).toBe("OpenAI"); + expect(resolvePeerService("api.anthropic.com")).toBe("Anthropic"); + expect(resolvePeerService("api.linear.app")).toBe("Linear"); + }); + + it("matches suffix rules for subdomains", () => { + expect(resolvePeerService("bigquery.googleapis.com")).toBe("Google Cloud"); + expect(resolvePeerService("aiplatform.googleapis.com")).toBe( + "Google Cloud", + ); + }); + + it("does not match a suffix rule against the bare domain", () => { + // `.googleapis.com` (with leading dot) only matches if the host + // ends with that — `googleapis.com` itself does not. + expect(resolvePeerService("googleapis.com")).toBeUndefined(); + }); + + it("does not match unrelated hosts", () => { + expect(resolvePeerService("example.com")).toBeUndefined(); + expect(resolvePeerService("openai.com")).toBeUndefined(); + expect(resolvePeerService("anthropic.com")).toBeUndefined(); + }); + + it("does not match a substring inside a host segment", () => { + expect(resolvePeerService("not-api.openai.com.evil.test")).toBeUndefined(); + }); + + // Suffix rules use `.endsWith(rule.suffix)` with the leading dot, so a + // host that happens to share the suffix without the dot boundary + // (e.g. `evilgoogleapis.com`) must not match. This is the property + // that prevents lookalike-domain attribution. + it("requires the suffix dot boundary", () => { + expect(resolvePeerService("evilgoogleapis.com")).toBeUndefined(); + expect(resolvePeerService("googleapis.com.evil.test")).toBeUndefined(); + }); +}); + +describe("httpRequestSpanNameHook", () => { + /** + * Build a recording span with a mutable `updateName` capture so the + * hook's effect can be asserted without a full TracerProvider. + */ + const makeSpan = (): { span: Span; updates: string[] } => { + const updates: string[] = []; + const noopSpan = trace.getTracer("test").startSpan("noop"); + const span: Span = Object.assign(noopSpan, { + updateName: (name: string) => { + updates.push(name); + return span; + }, + }); + return { span, updates }; + }; + + it("renames incoming requests to METHOD /path", () => { + const { span, updates } = makeSpan(); + const incoming = { + method: "GET", + url: "/api/v1/widgets", + } as Partial; + + httpRequestSpanNameHook(span, incoming as IncomingMessage); + + expect(updates).toEqual(["GET /api/v1/widgets"]); + }); + + it("prefers Express's `originalUrl` over `url` when both are present", () => { + const { span, updates } = makeSpan(); + const incoming = { + method: "POST", + originalUrl: "/graphql", + url: "/", // Express rewrites url after route matching + } as Partial & { originalUrl: string }; + + httpRequestSpanNameHook( + span, + incoming as IncomingMessage & { originalUrl: string }, + ); + + expect(updates).toEqual(["POST /graphql"]); + }); + + it("renames outgoing requests using `path`", () => { + const { span, updates } = makeSpan(); + const outgoing = { + method: "POST", + path: "/v1/embeddings", + } as unknown as ClientRequest; + + httpRequestSpanNameHook(span, outgoing); + + expect(updates).toEqual(["POST /v1/embeddings"]); + }); + + it("strips query string to keep cardinality bounded", () => { + const { span, updates } = makeSpan(); + httpRequestSpanNameHook(span, { + method: "GET", + url: "/search?q=secret&page=2", + } as IncomingMessage); + + expect(updates).toEqual(["GET /search"]); + }); + + it("does nothing when method is missing", () => { + const { span, updates } = makeSpan(); + httpRequestSpanNameHook(span, { + url: "/api/widgets", + } as IncomingMessage); + + expect(updates).toEqual([]); + }); + + it("does nothing when no path source is available", () => { + const { span, updates } = makeSpan(); + httpRequestSpanNameHook(span, { method: "GET" } as IncomingMessage); + + expect(updates).toEqual([]); + }); +}); + +describe("createHttpInstrumentation OTLP-port filter", () => { + /** + * Read the configured `ignoreOutgoingRequestHook` back off the + * instrumentation. Without this, a regression that drops the filter + * would only show up at runtime as exporter traffic feeding back into + * itself, amplifying span volume per export batch. + */ + const ignoreOutgoingFor = (otlpEndpoint: string) => { + const config = createHttpInstrumentation(otlpEndpoint).getConfig(); + const hook = config.ignoreOutgoingRequestHook; + if (!hook) { + throw new Error("ignoreOutgoingRequestHook should be set"); + } + return (port: number | string | undefined) => + hook({ port } as Parameters>[0]); + }; + + it("ignores outgoing requests to the configured OTLP gRPC port", () => { + const ignored = ignoreOutgoingFor("http://collector:4317"); + expect(ignored(4317)).toBe(true); + expect(ignored(443)).toBe(false); + }); + + // RequestOptions.port is `string | number | null | undefined`; a strict + // `===` would let "4317" through and the exporter would self-trace. + it("matches the OTLP port whether callers pass a number or a string", () => { + const ignored = ignoreOutgoingFor("http://collector:4317"); + expect(ignored("4317")).toBe(true); + expect(ignored("443")).toBe(false); + expect(ignored(undefined)).toBe(false); + }); + + it("derives a non-default OTLP port from the endpoint URL", () => { + // `:4318` is the OTLP/HTTP convention; if the helper hardcoded 4317 + // a self-instrumented exporter on 4318 would feed back into itself. + const ignored = ignoreOutgoingFor("http://collector:4318"); + expect(ignored(4318)).toBe(true); + expect(ignored(4317)).toBe(false); + }); + + it("falls back to 4317 when the endpoint has no explicit port", () => { + const ignored = ignoreOutgoingFor("http://collector"); + expect(ignored(4317)).toBe(true); + expect(ignored(8080)).toBe(false); + }); + + it("falls back to 4317 when the endpoint is malformed", () => { + // Bad URL must not throw on every outgoing request — that would + // disable HTTP tracing entirely. Falling back to 4317 keeps the + // tracer running; `registerOpenTelemetry` surfaces the URL error + // separately when it builds the exporter. + const ignored = ignoreOutgoingFor("not a url"); + expect(ignored(4317)).toBe(true); + expect(ignored(443)).toBe(false); + }); +}); diff --git a/libs/@local/hash-backend-utils/src/opentelemetry.ts b/libs/@local/hash-backend-utils/src/opentelemetry.ts new file mode 100644 index 00000000000..638746a7c9d --- /dev/null +++ b/libs/@local/hash-backend-utils/src/opentelemetry.ts @@ -0,0 +1,353 @@ +/** + * OpenTelemetry registration for HASH Node services. + * + * Registers a global trace, log, and metric provider against an OTLP/gRPC + * collector when `endpoint` is set, plus any caller-supplied auto + * instrumentations (HTTP, Express, gRPC, …). + * + * Returns a teardown function that must run during graceful shutdown so + * pending spans / log records / metric points are flushed before exit. + */ +import { metrics } from "@opentelemetry/api"; +import { logs } from "@opentelemetry/api-logs"; +import { OTLPLogExporter } from "@opentelemetry/exporter-logs-otlp-grpc"; +import { OTLPMetricExporter } from "@opentelemetry/exporter-metrics-otlp-grpc"; +import { OTLPTraceExporter } from "@opentelemetry/exporter-trace-otlp-grpc"; +import type { Instrumentation } from "@opentelemetry/instrumentation"; +import { registerInstrumentations } from "@opentelemetry/instrumentation"; +import { + HttpInstrumentation, + type HttpInstrumentationConfig, +} from "@opentelemetry/instrumentation-http"; +import { UndiciInstrumentation } from "@opentelemetry/instrumentation-undici"; +import type { Resource } from "@opentelemetry/resources"; +import { + defaultResource, + resourceFromAttributes, +} from "@opentelemetry/resources"; +import { + BatchLogRecordProcessor, + LoggerProvider, +} from "@opentelemetry/sdk-logs"; +import { + MeterProvider, + PeriodicExportingMetricReader, +} from "@opentelemetry/sdk-metrics"; +import type { SpanExporter } from "@opentelemetry/sdk-trace-base"; +import { BatchSpanProcessor } from "@opentelemetry/sdk-trace-base"; +import { NodeTracerProvider } from "@opentelemetry/sdk-trace-node"; + +const traceTimeoutMs = 5000; +const metricExportIntervalMs = 30_000; +const shutdownTimeoutMs = 2000; + +export interface RegisterOpenTelemetryOptions { + /** + * OTLP gRPC endpoint, e.g. `http://localhost:4317`. Falsy values disable + * registration entirely (useful for local development without a + * collector running). + */ + endpoint: string | null | undefined; + /** `service.name` resource attribute. */ + serviceName: string; + /** Auto-instrumentations to register (HTTP, gRPC, Express, …). */ + instrumentations?: Instrumentation[]; +} + +export interface OpenTelemetrySetup { + /** OTLP gRPC endpoint this setup is attached to. */ + endpoint: string; + /** Run during graceful shutdown to flush pending spans / logs / metrics. */ + shutdown: () => Promise; + /** + * The trace exporter, exposed so callers can build worker-side sinks + * (e.g. Temporal's `makeWorkflowExporter`) that share this connection. + */ + traceExporter: SpanExporter; + /** The resource used for traces / logs / metrics, shared with sinks. */ + resource: Resource; +} + +/** + * Mapping of outbound `host` → `peer.service` label used by Tempo's + * `service_graphs` processor to render external dependencies as + * separate nodes in the service map. + * + * Order matters: the first match wins, so place narrower exact matches + * before broader suffix matches. `kind: "suffix"` matches against the + * tail of the host (e.g. `.googleapis.com` matches `bigquery.googleapis.com` + * but not the bare `googleapis.com`). + */ +type PeerServiceRule = + | { kind: "exact"; host: string; service: string } + | { kind: "suffix"; suffix: string; service: string }; + +const PEER_SERVICE_RULES: readonly PeerServiceRule[] = [ + { kind: "exact", host: "api.openai.com", service: "OpenAI" }, + { kind: "exact", host: "api.anthropic.com", service: "Anthropic" }, + { kind: "exact", host: "api.linear.app", service: "Linear" }, + { kind: "suffix", suffix: ".googleapis.com", service: "Google Cloud" }, +]; + +export const resolvePeerService = (host: string): string | undefined => { + for (const rule of PEER_SERVICE_RULES) { + if (rule.kind === "exact" && rule.host === host) { + return rule.service; + } + if (rule.kind === "suffix" && host.endsWith(rule.suffix)) { + return rule.service; + } + } + return undefined; +}; + +/** + * Undici instrumentation configured to: + * + * - Tag spans with `peer.service` derived from the outbound host. Tempo's + * `service_graphs` processor turns this into an external-service edge + * in the service map. + * - Name spans `METHOD path`. The host already lives in `peer.service`, + * so the span name only carries the path. + */ +export const createUndiciInstrumentation = (): UndiciInstrumentation => + new UndiciInstrumentation({ + startSpanHook: (request) => { + try { + // `hostname` strips the port unconditionally; `host` keeps it for + // non-default ports (e.g. `collector:4318`), which would miss + // exact-host matches in `resolvePeerService`. + const { hostname } = new URL(request.origin); + const peerService = resolvePeerService(hostname); + return peerService ? { "peer.service": peerService } : {}; + } catch { + return {}; + } + }, + requestHook: (span, request) => { + if (typeof request.path !== "string") { + return; + } + // Strip query string to keep cardinality bounded. + const path = request.path.split("?")[0]; + if (path) { + span.updateName(`${request.method} ${path}`); + } + }, + }); + +/** + * `requestHook` for `@opentelemetry/instrumentation-http` that names + * spans `METHOD /path`. Path source depends on the request shape: + * outgoing `ClientRequest` exposes `path`, incoming `IncomingMessage` + * exposes `url`. `originalUrl` is checked first as a no-cost fallback + * for the case where Express has already wrapped the request before + * the hook reads it. + */ +export const httpRequestSpanNameHook: NonNullable< + HttpInstrumentationConfig["requestHook"] +> = (span, request) => { + if (!("method" in request) || !request.method) { + return; + } + const candidates = [ + "originalUrl" in request ? request.originalUrl : undefined, + "url" in request ? request.url : undefined, + "path" in request ? request.path : undefined, + ]; + const rawPath = candidates.find( + (value): value is string => typeof value === "string", + ); + // Strip query string to keep cardinality bounded. + const path = rawPath?.split("?")[0]; + if (path) { + span.updateName(`${request.method} ${path}`); + } +}; + +/** + * Default OTLP/gRPC port. Used when the configured endpoint URL does not + * carry an explicit port (e.g. `http://collector` resolves via gRPC default). + */ +const DEFAULT_OTLP_PORT = 4317; + +const otlpPortFromEndpoint = (otlpEndpoint: string): number => { + try { + const { port } = new URL(otlpEndpoint); + return port ? Number.parseInt(port, 10) : DEFAULT_OTLP_PORT; + } catch { + // `registerOpenTelemetry` will surface the malformed-URL error when + // it builds the exporter; here we just fall back so the filter does + // not throw on every outgoing request. + return DEFAULT_OTLP_PORT; + } +}; + +/** + * `@opentelemetry/instrumentation-http` configured for HASH services: + * + * - Skips outgoing requests to the OTLP collector port. Without this filter + * each export would itself produce a span, which would be batched for + * export, which would produce another span — amplifying export volume on + * every batch. The port is derived from `otlpEndpoint` so a non-default + * collector port (e.g. `:4318` for OTLP/HTTP) still gets ignored. + * - Names spans `METHOD /path` via {@link httpRequestSpanNameHook}. + * + * Pass `extra` to merge per-service options (e.g. `ignoreIncomingPaths`). + * `ignoreOutgoingRequestHook` and `requestHook` are intentionally not + * mergeable here — callers needing different shapes should construct + * `HttpInstrumentation` directly. + */ +export const createHttpInstrumentation = ( + otlpEndpoint: string, + extra: Omit< + HttpInstrumentationConfig, + "ignoreOutgoingRequestHook" | "requestHook" + > = {}, +): HttpInstrumentation => { + const otlpPort = otlpPortFromEndpoint(otlpEndpoint); + return new HttpInstrumentation({ + ...extra, + ignoreOutgoingRequestHook: (options) => + // `RequestOptions.port` is `string | number | null | undefined`; + // some callers (raw `http.request`, axios) pass a string. Coerce + // to compare consistently — otherwise `"4317" === 4317` is false + // and the exporter's own outbound traffic slips through. + Number(options.port) === otlpPort, + requestHook: httpRequestSpanNameHook, + }); +}; + +const shutdownWithTimeout = async ( + label: string, + shutdown: () => Promise, +): Promise => { + let timer: NodeJS.Timeout | undefined; + const timeoutPromise = new Promise((_, reject) => { + timer = setTimeout(() => { + reject( + new Error( + `${label} shutdown exceeded ${shutdownTimeoutMs}ms — pending exports may be dropped.`, + ), + ); + }, shutdownTimeoutMs); + }); + try { + await Promise.race([shutdown(), timeoutPromise]); + } finally { + if (timer) { + clearTimeout(timer); + } + } +}; + +/** + * Initialise tracing, logging, and metrics. Returns `undefined` when + * `endpoint` is unset so callers can skip workflow-side sink wiring. + */ +export const registerOpenTelemetry = ({ + endpoint, + serviceName, + instrumentations = [], +}: RegisterOpenTelemetryOptions): OpenTelemetrySetup | undefined => { + if (!endpoint) { + // Runs before any logger is wired up, so direct stderr is the + // right channel. + // eslint-disable-next-line no-console + console.warn( + "No OpenTelemetry Protocol endpoint given. Not sending telemetry anywhere.", + ); + return undefined; + } + + const collectorOptions = { + timeoutMillis: traceTimeoutMs, + url: endpoint, + }; + + const resource = defaultResource().merge( + resourceFromAttributes({ "service.name": serviceName }), + ); + + // Batch processors keep span / log export off the request path. The + // Simple variants export each record synchronously, which under load + // saturates the gRPC connection and adds tail latency to every + // request. + const traceExporter = new OTLPTraceExporter(collectorOptions); + const traceProvider = new NodeTracerProvider({ + resource, + spanProcessors: [new BatchSpanProcessor(traceExporter)], + }); + traceProvider.register(); + + const logExporter = new OTLPLogExporter(collectorOptions); + const logProvider = new LoggerProvider({ + resource, + processors: [new BatchLogRecordProcessor(logExporter)], + }); + logs.setGlobalLoggerProvider(logProvider); + + const metricExporter = new OTLPMetricExporter(collectorOptions); + const meterProvider = new MeterProvider({ + resource, + readers: [ + new PeriodicExportingMetricReader({ + exporter: metricExporter, + exportIntervalMillis: metricExportIntervalMs, + }), + ], + }); + metrics.setGlobalMeterProvider(meterProvider); + + const unregisterInstrumentations = registerInstrumentations({ + instrumentations, + }); + + // eslint-disable-next-line no-console + console.info( + `Registered OpenTelemetry (traces + logs + metrics) at endpoint ${endpoint} for ${serviceName}`, + ); + + return { + endpoint, + traceExporter, + resource, + shutdown: async () => { + // Flush each provider with a per-provider timeout so a stuck + // exporter (collector unreachable, gRPC channel hung) cannot + // block the SIGTERM handler indefinitely. Failures are surfaced + // to stderr because the logger may already be shutting down. + const targets: Array Promise]> = [ + ["trace provider", () => traceProvider.shutdown()], + ["log provider", () => logProvider.shutdown()], + ["meter provider", () => meterProvider.shutdown()], + ]; + const results = await Promise.allSettled( + targets.map(async ([label, run]) => { + try { + await shutdownWithTimeout(label, run); + } catch (error) { + // eslint-disable-next-line no-console + console.error("OpenTelemetry %s shutdown failed:", label, error); + throw error; + } + }), + ); + unregisterInstrumentations(); + // `Promise.allSettled` itself never rejects, so without inspecting the + // results the caller's `catch` block would never fire and downstream + // exit-code / error-reporting logic would treat partial flush failures + // as success. Surface them as an `AggregateError` so the caller can + // react. + const failures = results.flatMap((result) => + result.status === "rejected" ? [result.reason as unknown] : [], + ); + if (failures.length > 0) { + throw new AggregateError( + failures, + "One or more OpenTelemetry providers failed to shut down", + ); + } + }, + }; +}; diff --git a/libs/@local/hash-backend-utils/src/temporal.ts b/libs/@local/hash-backend-utils/src/temporal.ts index c8afa23a291..da82626d559 100644 --- a/libs/@local/hash-backend-utils/src/temporal.ts +++ b/libs/@local/hash-backend-utils/src/temporal.ts @@ -1,13 +1,14 @@ +import { trace } from "@opentelemetry/api"; import { Client as TemporalClient, Connection } from "@temporalio/client"; +import { OpenTelemetryWorkflowClientInterceptor } from "@temporalio/interceptors-opentelemetry"; import { getRequiredEnv } from "./environment.js"; -import type { Logger } from "./logger.js"; export { Client as TemporalClient } from "@temporalio/client"; export const temporalNamespace = "HASH"; -export const createTemporalClient = async (_logger?: Logger) => { +export const createTemporalClient = async () => { const temporalServerHost = getRequiredEnv("HASH_TEMPORAL_SERVER_HOST"); const host = new URL(temporalServerHost).hostname; @@ -18,5 +19,23 @@ export const createTemporalClient = async (_logger?: Logger) => { address: `${host}:${port}`, }); - return new TemporalClient({ connection, namespace: temporalNamespace }); + // When OTEL is configured the active trace context (e.g. an Express + // HTTP span) is injected into workflow start headers. The worker-side + // interceptors extract it and parent the workflow + activity spans + // off the caller's trace. + const interceptors = process.env.HASH_OTLP_ENDPOINT + ? { + workflow: [ + new OpenTelemetryWorkflowClientInterceptor({ + tracer: trace.getTracer("@temporalio/interceptors-opentelemetry"), + }), + ], + } + : undefined; + + return new TemporalClient({ + connection, + namespace: temporalNamespace, + interceptors, + }); }; diff --git a/libs/@local/hash-backend-utils/src/temporal/interceptors/activities/opentelemetry.ts b/libs/@local/hash-backend-utils/src/temporal/interceptors/activities/opentelemetry.ts new file mode 100644 index 00000000000..f1b16d22b55 --- /dev/null +++ b/libs/@local/hash-backend-utils/src/temporal/interceptors/activities/opentelemetry.ts @@ -0,0 +1,16 @@ +/** + * Activity-side OpenTelemetry interceptors. + * + * Re-exports the upstream interceptors so workers reference the + * `@local/hash-backend-utils/...` path consistently with the existing + * Sentry interceptor wiring. + * + * Activity duration / outcome metrics (latency histograms, failed counts) + * are emitted by Temporal's SDK runtime telemetry — see + * `Runtime.install({ telemetryOptions })` in each worker's `main.ts`. + * No custom metrics interceptor is needed. + */ +export { + OpenTelemetryActivityInboundInterceptor, + OpenTelemetryActivityOutboundInterceptor, +} from "@temporalio/interceptors-opentelemetry"; diff --git a/libs/@local/hash-backend-utils/src/temporal/interceptors/workflows/opentelemetry.ts b/libs/@local/hash-backend-utils/src/temporal/interceptors/workflows/opentelemetry.ts new file mode 100644 index 00000000000..02b1c74bafa --- /dev/null +++ b/libs/@local/hash-backend-utils/src/temporal/interceptors/workflows/opentelemetry.ts @@ -0,0 +1,21 @@ +/** + * OpenTelemetry workflow interceptors. + * + * Used as a workflow module via Temporal's `workflowModules` option so the + * upstream `@temporalio/interceptors-opentelemetry` interceptors can run + * inside the workflow sandbox. Pair with the worker-side + * `makeWorkflowExporter` sink so the spans created here are exported to + * the worker's OTEL trace provider. + */ +import { + OpenTelemetryInboundInterceptor, + OpenTelemetryInternalsInterceptor, + OpenTelemetryOutboundInterceptor, +} from "@temporalio/interceptors-opentelemetry"; +import type { WorkflowInterceptors } from "@temporalio/workflow"; + +export const interceptors = (): WorkflowInterceptors => ({ + inbound: [new OpenTelemetryInboundInterceptor()], + outbound: [new OpenTelemetryOutboundInterceptor()], + internals: [new OpenTelemetryInternalsInterceptor()], +}); diff --git a/libs/@local/hash-backend-utils/src/temporal/worker-bootstrap.ts b/libs/@local/hash-backend-utils/src/temporal/worker-bootstrap.ts new file mode 100644 index 00000000000..5a2059b243f --- /dev/null +++ b/libs/@local/hash-backend-utils/src/temporal/worker-bootstrap.ts @@ -0,0 +1,354 @@ +/** + * Shared bootstrap for HASH Temporal workers. Both + * `hash-ai-worker-ts` and `hash-integration-worker` enter through + * `runWorker`, supplying per-service deltas (service name, task queue, + * port, activities, workflow bundle path) as options. + * + * `Sentry.init` stays in each worker's `main.ts` because ESM import + * ordering requires it before the rest of the imports — that cannot + * be reproduced from a helper module. Everything from `Runtime.install` + * onwards is centralised here. + */ +import * as http from "node:http"; +import { createRequire } from "node:module"; + +import type { + ActivityInterceptorsFactory, + WorkerOptions, + WorkflowBundleOption, +} from "@temporalio/worker"; +import { + DefaultLogger, + defaultSinks, + NativeConnection, + Runtime, + Worker, +} from "@temporalio/worker"; + +import type { Logger } from "../logger.js"; +import type { OpenTelemetrySetup } from "../opentelemetry.js"; +import { + OpenTelemetryActivityInboundInterceptor, + OpenTelemetryActivityOutboundInterceptor, +} from "./interceptors/activities/opentelemetry.js"; +import { SentryActivityInboundInterceptor } from "./interceptors/activities/sentry.js"; +import { sentrySinks } from "./sinks/sentry.js"; +import { makeV2WorkflowSink } from "./workflow-span-adapter.js"; + +const require = createRequire(import.meta.url); + +/** + * Adapter that pipes Temporal SDK logs (both Rust core and Node-side + * worker events) through the application logger, keeping them in the + * same JSON format and log-level scheme as the rest of the worker + * output. Lives here rather than in `temporal.ts` so the API server's + * import of `createTemporalClient` does not pull in `@temporalio/worker` + * (which bundles native Rust core bindings). + */ +const createTemporalSdkLogger = (logger: Logger): DefaultLogger => + // DefaultLogger filters at INFO, so TRACE / DEBUG paths only fire + // when the level is bumped at the call site. + new DefaultLogger("INFO", ({ level, message, meta }) => { + switch (level) { + case "TRACE": + case "DEBUG": + logger.debug(message, meta); + return; + case "INFO": + logger.info(message, meta); + return; + case "WARN": + logger.warn(message, meta); + return; + case "ERROR": + logger.error(message, meta); + return; + default: + logger.warn(`Unknown Temporal SDK log level: ${level as string}`, { + message, + meta, + }); + } + }); + +const TEMPORAL_DEFAULT_PORT = 7233; + +const getTemporalAddress = (): string => { + const host = new URL( + process.env.HASH_TEMPORAL_SERVER_HOST ?? "http://localhost", + ).hostname; + const port = process.env.HASH_TEMPORAL_SERVER_PORT + ? parseInt(process.env.HASH_TEMPORAL_SERVER_PORT, 10) + : TEMPORAL_DEFAULT_PORT; + return `${host}:${port}`; +}; + +const createHealthCheckServer = (): http.Server => + http.createServer((req, res) => { + if (req.method === "GET" && req.url === "/health") { + res.setHeader("Content-Type", "application/json"); + res.writeHead(200); + res.end(JSON.stringify({ msg: "worker healthy" })); + return; + } + res.writeHead(404); + res.end(""); + }); + +/** + * Source of workflow code passed to `Worker.create`. + * + * - `bundle` — a prebuilt webpack bundle (`workflowBundle.codePath`), + * produced by the per-worker `bundle-workflow-code.ts` script. Used + * in production builds. + * - `path` — a TypeScript entry-point that the worker bundles in-process + * (`workflowsPath`), with optional `bundlerOptions` for things like + * `tsconfig-paths-webpack-plugin`. Used in development. + */ +export type WorkflowSource = + | { kind: "bundle"; bundle: WorkflowBundleOption } + | { + kind: "path"; + workflowsPath: string; + bundlerOptions?: WorkerOptions["bundlerOptions"]; + }; + +/** + * Per-worker tuning passed straight through to `Worker.create`. Add + * keys here as needed; helper-owned wiring stays inaccessible. + */ +export type ExtraWorkerOptions = Pick< + WorkerOptions, + "maxHeartbeatThrottleInterval" +>; + +export interface RunWorkerOptions { + /** + * Logged once at startup and used as `service.name` for OTEL traces / + * logs / metrics. The Temporal worker identity stays at the SDK default + * (`pid@hostname`) so multiple replicas remain distinguishable in the + * Temporal UI. + */ + serviceName: string; + /** Temporal task queue this worker pulls work from. */ + taskQueue: string; + /** Port the health-check server listens on. */ + healthCheckPort: number; + /** + * Activities object passed to `Worker.create({ activities })`. The + * caller assembles this from per-worker activity factories. + */ + activities: WorkerOptions["activities"]; + /** Where the workflow code lives. See {@link WorkflowSource}. */ + workflowSource: WorkflowSource; + /** + * Additional `Worker.create` options for per-worker tuning (e.g. + * `maxHeartbeatThrottleInterval`). + */ + workerOptions?: ExtraWorkerOptions; + /** OTEL setup handle from `instrument.ts`; `undefined` disables OTEL wiring. */ + otelSetup: OpenTelemetrySetup | undefined; + /** Application logger shared with the rest of the worker. */ + logger: Logger; +} + +const expandWorkflowSource = ( + source: WorkflowSource, +): Pick< + WorkerOptions, + "workflowBundle" | "workflowsPath" | "bundlerOptions" +> => { + switch (source.kind) { + case "bundle": + return { workflowBundle: source.bundle }; + case "path": + return { + workflowsPath: source.workflowsPath, + bundlerOptions: source.bundlerOptions, + }; + } +}; + +/** + * Boot a HASH Temporal worker. Installs the Temporal SDK runtime + * telemetry (when OTEL is configured), connects to the Temporal server, + * builds the activity interceptor chain, registers the workflow + sink + * wiring, starts a health-check HTTP server, and finally enters + * `worker.run()`. Returns once the worker has drained on SIGTERM/SIGINT. + */ +export async function runWorker(opts: RunWorkerOptions): Promise { + const { logger, otelSetup } = opts; + + logger.info(`Starting ${opts.serviceName}...`); + + // Install signal handlers up-front so a SIGTERM during the + // potentially-slow startup phase (NativeConnection.connect, workflow + // bundle compile, Worker.create) doesn't terminate the process via + // the Node default — we'd lose the OTEL flush. The handler captures a + // reference to `worker` once it's set; until then the linear cleanup + // path below handles flush+exit when `workerRunPromise` (also unset + // pre-Worker.create) is replaced by a sentinel that resolves + // immediately on signal during startup. + let worker: Worker | undefined; + let shuttingDown = false; + const onSignal = (signal: NodeJS.Signals) => { + if (shuttingDown) { + return; + } + shuttingDown = true; + logger.info(`Received ${signal}, exiting...`); + if (worker) { + try { + worker.shutdown(); + } catch (error) { + logger.error("Worker shutdown trigger failed", { error }); + } + } + // If the worker hasn't been created yet, do nothing more here: + // `runWorker` is still in startup and the early-exit branch below + // will fall through to the linear cleanup path when control + // returns from whichever `await` is pending. + }; + process.on("SIGINT", onSignal); + process.on("SIGTERM", onSignal); + + // Temporal SDK runtime telemetry: emits SDK-internal metrics (worker + // slot utilisation, sticky cache hits, polling latency, activity / + // workflow execution latency) directly to OTLP, and forwards Rust + // core logs through the Node-side logger so they share the + // application's log pipeline. Must run before any Connection / + // Worker is created. Separate channel from the per-activity user-code + // spans the interceptors below produce. + if (otelSetup) { + Runtime.install({ + logger: createTemporalSdkLogger(logger), + telemetryOptions: { + metrics: { + otel: { + url: otelSetup.endpoint, + metricsExportInterval: "30s", + }, + }, + logging: { forward: { level: "INFO" } }, + }, + }); + } + + const connection = await NativeConnection.connect({ + address: getTemporalAddress(), + }); + logger.info("Created Temporal connection"); + + // OTEL interceptor must precede Sentry: `composeInterceptors` builds + // the chain right-to-left so index 0 is outermost. The OTEL inbound + // half extracts the trace context that the workflow's outbound OTEL + // interceptor injected via `scheduleActivity`, re-establishing the + // parent before any other interceptor opens a span. The outbound half + // stamps `trace_id` / `span_id` / `trace_flags` onto activity log + // lines so Loki ↔ Tempo correlation works. + const activityInterceptors: ActivityInterceptorsFactory[] = []; + if (otelSetup) { + activityInterceptors.push((ctx) => ({ + inbound: new OpenTelemetryActivityInboundInterceptor(ctx), + outbound: new OpenTelemetryActivityOutboundInterceptor(ctx), + })); + } + activityInterceptors.push((ctx) => ({ + inbound: new SentryActivityInboundInterceptor(ctx), + })); + + worker = await Worker.create({ + ...opts.workerOptions, + ...expandWorkflowSource(opts.workflowSource), + activities: opts.activities, + connection, + namespace: "HASH", + taskQueue: opts.taskQueue, + sinks: { + ...defaultSinks(), + ...sentrySinks(), + ...(otelSetup ? { exporter: makeV2WorkflowSink(otelSetup) } : {}), + }, + interceptors: { + workflowModules: [ + require.resolve( + "@local/hash-backend-utils/temporal/interceptors/workflows/sentry", + ), + require.resolve( + "@local/hash-backend-utils/temporal/interceptors/workflows/opentelemetry", + ), + ], + activity: activityInterceptors, + }, + }); + + const httpServer = createHealthCheckServer(); + httpServer.on("error", (error) => + logger.error("Health-check server error", { error }), + ); + await new Promise((resolve, reject) => { + const onError = (error: Error) => { + httpServer.removeListener("error", onError); + reject(error); + }; + httpServer.once("error", onError); + httpServer.listen({ host: "0.0.0.0", port: opts.healthCheckPort }, () => { + httpServer.removeListener("error", onError); + logger.info(`HTTP server listening on port ${opts.healthCheckPort}`); + resolve(); + }); + }); + + // `worker.run()` resolves once the SDK has fully drained in-flight + // activities after `worker.shutdown()` is called. If a signal already + // arrived during startup, skip running entirely and fall through to + // the cleanup path below. + // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition + const workerRunPromise = shuttingDown ? Promise.resolve() : worker.run(); + + let exitCode = 0; + let workerError: unknown; + try { + await workerRunPromise; + } catch (error) { + workerError = error; + exitCode = 1; + // `shuttingDown` is mutated in the signal-handler closure; TS narrows + // it to the initial `false` here so the conditional looks dead to + // eslint. + // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition + logger.error(shuttingDown ? "Worker drain failed" : "Worker run failed", { + error, + }); + } + + // `http.Server.close()` reports failures via the optional callback, + // not synchronously, so a `try`/`catch` around the bare call would be + // dead. Pass a callback to log close failures and let the OTEL flush + // below give the listening socket time to release before exit. + httpServer.close((error) => { + if (error) { + logger.error("Health-check server close failed", { error }); + } + }); + try { + await otelSetup?.shutdown(); + } catch (error) { + logger.error("Failed to flush OpenTelemetry", { error }); + exitCode = 1; + } + + // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition + if (shuttingDown) { + // Signal-driven exit: `main.ts` has nothing to do after `run()` + // returns, so terminate explicitly with the chosen exit code. + process.exit(exitCode); + } + if (workerError) { + // The catch block stored whatever the SDK threw; rethrow the same + // value so the caller sees the original. + // eslint-disable-next-line @typescript-eslint/only-throw-error + throw workerError; + } + // Clean worker exit: return; `main.ts` has no further work. +} diff --git a/libs/@local/hash-backend-utils/src/temporal/workflow-span-adapter.test.ts b/libs/@local/hash-backend-utils/src/temporal/workflow-span-adapter.test.ts new file mode 100644 index 00000000000..3ec2ac79431 --- /dev/null +++ b/libs/@local/hash-backend-utils/src/temporal/workflow-span-adapter.test.ts @@ -0,0 +1,265 @@ +import type { ReadableSpan, SpanExporter } from "@opentelemetry/sdk-trace-base"; +import { describe, expect, it } from "vitest"; + +import { wrapWorkflowSpanExporter } from "./workflow-span-adapter.js"; + +/** + * Recording exporter that captures the spans handed to it without + * actually exporting anywhere. Used to assert that the adapter + * produces v2-shaped spans before the OTLP transformer sees them. + */ +const recordingExporter = (): { + exporter: SpanExporter; + exported: ReadableSpan[]; +} => { + const exported: ReadableSpan[] = []; + return { + exported, + exporter: { + export: (spans, callback) => { + exported.push(...spans); + callback({ code: 0 }); + }, + shutdown: () => Promise.resolve(), + forceFlush: () => Promise.resolve(), + }, + }; +}; + +/** + * Build a v1-shaped Temporal span. Mirrors the shape that + * `@temporalio/interceptors-opentelemetry`'s `extractReadableSpan` + * produces: `instrumentationLibrary` (not `instrumentationScope`) and + * `parentSpanId` (not `parentSpanContext`). + */ +const v1Span = (overrides: Partial> = {}) => + ({ + name: "RunWorkflow:exampleWorkflow", + spanContext: () => ({ + traceId: "0af7651916cd43dd8448eb211c80319c", + spanId: "b7ad6b7169203331", + traceFlags: 1, + }), + instrumentationLibrary: { name: "@temporalio/interceptor-workflow" }, + parentSpanId: "00f067aa0ba902b7", + ...overrides, + }) as unknown as ReadableSpan; + +describe("wrapWorkflowSpanExporter / normaliseSpan", () => { + it("synthesises instrumentationScope from instrumentationLibrary", () => { + const { exporter, exported } = recordingExporter(); + const wrapped = wrapWorkflowSpanExporter(exporter); + + wrapped.export([v1Span()], () => {}); + + expect(exported).toHaveLength(1); + expect(exported[0]!.instrumentationScope).toEqual({ + name: "@temporalio/interceptor-workflow", + }); + }); + + it("synthesises parentSpanContext from parentSpanId, marking parent remote", () => { + const { exporter, exported } = recordingExporter(); + const wrapped = wrapWorkflowSpanExporter(exporter); + + wrapped.export([v1Span()], () => {}); + + const ctx = exported[0]!.parentSpanContext; + expect(ctx).toEqual({ + traceId: "0af7651916cd43dd8448eb211c80319c", + spanId: "00f067aa0ba902b7", + traceFlags: 1, + isRemote: true, + }); + }); + + it("falls back to an 'unknown' scope when both legacy and v2 fields are missing", () => { + const { exporter, exported } = recordingExporter(); + const wrapped = wrapWorkflowSpanExporter(exporter); + + wrapped.export( + [v1Span({ instrumentationLibrary: undefined, parentSpanId: undefined })], + () => {}, + ); + + expect(exported[0]!.instrumentationScope).toEqual({ name: "unknown" }); + expect(exported[0]!.parentSpanContext).toBeUndefined(); + }); + + it("leaves a span with no parent untouched on the parent field", () => { + const { exporter, exported } = recordingExporter(); + const wrapped = wrapWorkflowSpanExporter(exporter); + + wrapped.export([v1Span({ parentSpanId: undefined })], () => {}); + + expect(exported[0]!.parentSpanContext).toBeUndefined(); + }); + + it("passes through spans that are already v2-shaped", () => { + const { exporter, exported } = recordingExporter(); + const wrapped = wrapWorkflowSpanExporter(exporter); + + const scope = { name: "@opentelemetry/sdk-trace-node" }; + const parentSpanContext = { + traceId: "0af7651916cd43dd8448eb211c80319c", + spanId: "00f067aa0ba902b7", + traceFlags: 1, + isRemote: false, + }; + const v2 = { + ...v1Span(), + instrumentationLibrary: undefined, + parentSpanId: undefined, + instrumentationScope: scope, + parentSpanContext, + } as unknown as ReadableSpan; + + wrapped.export([v2], () => {}); + + expect(exported[0]).toBe(v2); + }); + + it("preserves attributes, events, kind, and `spanContext()` callability", () => { + const { exporter, exported } = recordingExporter(); + const wrapped = wrapWorkflowSpanExporter(exporter); + + const ctx = { + traceId: "0af7651916cd43dd8448eb211c80319c", + spanId: "b7ad6b7169203331", + traceFlags: 1, + }; + const events = [{ name: "evt", time: [0, 0], attributes: { x: 1 } }]; + const links = [{ context: ctx }]; + const span = v1Span({ + kind: 1, + attributes: { "service.name": "x" }, + events, + links, + startTime: [123, 0], + endTime: [124, 0], + duration: [1, 0], + ended: true, + droppedAttributesCount: 0, + droppedEventsCount: 0, + droppedLinksCount: 0, + spanContext: () => ctx, + }); + + wrapped.export([span], () => {}); + + const out = exported[0]!; + expect(out.kind).toBe(1); + expect(out.attributes).toEqual({ "service.name": "x" }); + expect(out.events).toBe(events); + expect(out.links).toBe(links); + expect(out.duration).toEqual([1, 0]); + expect(out.ended).toBe(true); + // The arrow-function `spanContext` is an own property in + // `extractReadableSpan`'s output. Spreading must keep it callable + // — the OTLP transformer reads `span.spanContext().spanId`. + expect(out.spanContext()).toEqual(ctx); + }); + + it("treats parentSpanId === '' as 'no parent' rather than synthesising an empty span ID", () => { + const { exporter, exported } = recordingExporter(); + const wrapped = wrapWorkflowSpanExporter(exporter); + + wrapped.export([v1Span({ parentSpanId: "" })], () => {}); + + // Empty string is falsy, so `needsParent` short-circuits; an + // explicit "" must not become `parentSpanContext.spanId === ""` on + // the wire (which Tempo would interpret as a malformed parent). + expect(exported[0]!.parentSpanContext).toBeUndefined(); + }); + + it("does not overwrite an existing parentSpanContext when legacy parentSpanId is also set", () => { + const { exporter, exported } = recordingExporter(); + const wrapped = wrapWorkflowSpanExporter(exporter); + + const existing = { + traceId: "0af7651916cd43dd8448eb211c80319c", + spanId: "ffffffffffffffff", + traceFlags: 1, + isRemote: false, + }; + wrapped.export( + [ + v1Span({ + parentSpanId: "00f067aa0ba902b7", + parentSpanContext: existing, + }), + ], + () => {}, + ); + + // v2 field wins — the adapter is meant to *fill in* gaps, not + // overwrite a context that's already present. + expect(exported[0]!.parentSpanContext).toBe(existing); + }); + + it("normalises a mixed v1 / v2 batch per-span", () => { + const { exporter, exported } = recordingExporter(); + const wrapped = wrapWorkflowSpanExporter(exporter); + + const v1 = v1Span(); + const v2 = { + ...v1Span(), + instrumentationLibrary: undefined, + parentSpanId: undefined, + instrumentationScope: { name: "v2-scope" }, + } as unknown as ReadableSpan; + + wrapped.export([v1, v2, v1Span({ parentSpanId: undefined })], () => {}); + + expect(exported).toHaveLength(3); + expect(exported[0]!.instrumentationScope).toEqual({ + name: "@temporalio/interceptor-workflow", + }); + expect(exported[1]).toBe(v2); + expect(exported[2]!.parentSpanContext).toBeUndefined(); + }); + + it("propagates the inner exporter's result code to the outer callback", async () => { + const failing = { + export: ( + _spans: ReadableSpan[], + cb: (result: { code: number; error?: Error }) => void, + ) => cb({ code: 1, error: new Error("downstream failed") }), + shutdown: () => Promise.resolve(), + forceFlush: () => Promise.resolve(), + }; + const wrapped = wrapWorkflowSpanExporter(failing); + + const result = await new Promise<{ code: number; error?: Error }>( + (resolve) => { + wrapped.export([v1Span()], resolve); + }, + ); + + expect(result.code).toBe(1); + expect(result.error?.message).toBe("downstream failed"); + }); + + it("delegates shutdown and forceFlush to the inner exporter", async () => { + let shutdownCalls = 0; + let flushCalls = 0; + const inner: SpanExporter = { + export: (_, cb) => cb({ code: 0 }), + shutdown: () => { + shutdownCalls += 1; + return Promise.resolve(); + }, + forceFlush: () => { + flushCalls += 1; + return Promise.resolve(); + }, + }; + const wrapped = wrapWorkflowSpanExporter(inner); + + await wrapped.shutdown(); + await wrapped.forceFlush!(); + + expect(shutdownCalls).toBe(1); + expect(flushCalls).toBe(1); + }); +}); diff --git a/libs/@local/hash-backend-utils/src/temporal/workflow-span-adapter.ts b/libs/@local/hash-backend-utils/src/temporal/workflow-span-adapter.ts new file mode 100644 index 00000000000..e76977216b5 --- /dev/null +++ b/libs/@local/hash-backend-utils/src/temporal/workflow-span-adapter.ts @@ -0,0 +1,127 @@ +/** + * Bridge between `@temporalio/interceptors-opentelemetry` (which pins + * `@opentelemetry/sdk-trace-base@^1`) and our `@opentelemetry/sdk-trace-base@2` + * stack. Two field renames matter on the v1→v2 boundary: + * + * - `instrumentationLibrary` → `instrumentationScope`: v2's + * `OTLPTraceExporter` crashes inside `createResourceMap` reading + * `span.instrumentationScope.name` on a v1-shaped object. + * - `parentSpanId: string` → `parentSpanContext: SpanContext`: v2's + * OTLP transformer encodes parent linkage from `parentSpanContext.spanId` + * only. v1-shaped spans carry `parentSpanId` but no `parentSpanContext`, + * so without translation the OTLP envelope ships with no parent and + * Tempo renders every workflow/activity span as a root in the trace. + * + * `wrapWorkflowSpanExporter` returns a `SpanExporter` that synthesises + * the v2-shaped fields on each span on its way in. `makeV2WorkflowSink` + * is the standard entry point for worker bootstraps — it produces a + * `WorkflowSinks` entry from an `OpenTelemetrySetup`, hiding the v1↔v2 + * type-cast in one place. + * + * TODO(BE-520): drop this adapter when + * `@temporalio/interceptors-opentelemetry-v2` (PR + * https://github.com/temporalio/sdk-typescript/pull/1951) is released. + */ +import type { SpanContext } from "@opentelemetry/api"; +import type { ExportResult } from "@opentelemetry/core"; +import type { ReadableSpan, SpanExporter } from "@opentelemetry/sdk-trace-base"; +import { makeWorkflowExporter } from "@temporalio/interceptors-opentelemetry"; + +import type { OpenTelemetrySetup } from "../opentelemetry.js"; + +/** + * v1-shaped fields that may appear on spans produced by Temporal's + * `extractReadableSpan`. The v2 `instrumentationScope` and + * `parentSpanContext` fields are also re-declared as optional because + * the runtime shape is narrower than v2's `ReadableSpan` types pretend + * — `extractReadableSpan` genuinely produces objects where they are + * `undefined`. + */ +interface LegacyReadableSpan { + instrumentationLibrary?: { + name: string; + version?: string; + schemaUrl?: string; + }; + instrumentationScope?: { name: string; version?: string; schemaUrl?: string }; + parentSpanId?: string; + parentSpanContext?: SpanContext; +} + +/** + * `Omit` the v2 fields the legacy spans don't reliably populate, then + * re-add them via `LegacyReadableSpan` as optional. Without this the + * intersection inherits v2's required `instrumentationScope` typing + * and TypeScript flags every defensive `?? fallback` as "always truthy". + */ +type FlexibleSpan = Omit< + ReadableSpan, + "instrumentationScope" | "parentSpanContext" +> & + LegacyReadableSpan; + +const normaliseSpan = (span: ReadableSpan): ReadableSpan => { + const legacy = span as unknown as FlexibleSpan; + + const needsScope = !legacy.instrumentationScope; + const needsParent = legacy.parentSpanId && !legacy.parentSpanContext; + if (!needsScope && !needsParent) { + return span; + } + + // Synthesise an "unknown" scope as the last fallback — the OTLP + // resource-map logic crashes on a missing identifier. + const instrumentationScope = legacy.instrumentationScope ?? + legacy.instrumentationLibrary ?? { name: "unknown" }; + + let parentSpanContext = legacy.parentSpanContext; + if (legacy.parentSpanId && !parentSpanContext) { + const ctx = span.spanContext(); + parentSpanContext = { + traceId: ctx.traceId, + spanId: legacy.parentSpanId, + traceFlags: ctx.traceFlags, + // The parent lives outside this span exporter's sandbox — either + // in the worker's host process (RunWorkflow / RunActivity) or in + // the workflow client (e.g. an Express HTTP span). + isRemote: true, + }; + } + + // Spread is safe: `extractReadableSpan` produces a plain object with + // `spanContext` as an own arrow-function property, not as a prototype + // method, so we don't lose any callable surface. + return { ...span, instrumentationScope, parentSpanContext } as ReadableSpan; +}; + +export const wrapWorkflowSpanExporter = ( + inner: SpanExporter, +): SpanExporter => ({ + export(spans, resultCallback): void { + const adapted = spans.map(normaliseSpan); + inner.export(adapted, (result: ExportResult) => resultCallback(result)); + }, + shutdown: () => inner.shutdown(), + forceFlush: () => inner.forceFlush?.() ?? Promise.resolve(), +}); + +/** + * Build the Temporal workflow sink that exports workflow-sandbox spans + * through the application's OTLP trace exporter, normalising the v1↔v2 + * `ReadableSpan` shape on the way in. + * + * The `as unknown as` casts on the exporter and resource arguments are + * required because `@temporalio/interceptors-opentelemetry@1.x` declares + * them against `@opentelemetry/sdk-trace-base@1` types, while we run + * `@2`. They are the only place in our codebase that pins the v1 shape; + * removing them is the BE-520 deliverable. + */ +export const makeV2WorkflowSink = ( + setup: OpenTelemetrySetup, +): ReturnType => + makeWorkflowExporter( + wrapWorkflowSpanExporter(setup.traceExporter) as unknown as Parameters< + typeof makeWorkflowExporter + >[0], + setup.resource as unknown as Parameters[1], + ); diff --git a/libs/@local/temporal-client/Cargo.toml b/libs/@local/temporal-client/Cargo.toml index 7002f2f5e55..370898d10e7 100644 --- a/libs/@local/temporal-client/Cargo.toml +++ b/libs/@local/temporal-client/Cargo.toml @@ -16,14 +16,17 @@ type-system = { workspace = true, public = true } error-stack = { workspace = true } # Private third-party dependencies -serde = { workspace = true } -serde_json = { workspace = true } -simple-mermaid = { workspace = true } -temporalio-client = { workspace = true } -temporalio-common = { workspace = true } -thiserror = { workspace = true } -url = { workspace = true } -uuid = { workspace = true, features = ["v4"] } +opentelemetry = { workspace = true } +serde = { workspace = true } +serde_json = { workspace = true } +simple-mermaid = { workspace = true } +temporalio-client = { workspace = true } +temporalio-common = { workspace = true } +thiserror = { workspace = true } +tracing = { workspace = true } +tracing-opentelemetry = { workspace = true } +url = { workspace = true } +uuid = { workspace = true, features = ["v4"] } [lints] workspace = true diff --git a/libs/@local/temporal-client/src/ai.rs b/libs/@local/temporal-client/src/ai.rs index 877e4daa296..8eae08258bd 100644 --- a/libs/@local/temporal-client/src/ai.rs +++ b/libs/@local/temporal-client/src/ai.rs @@ -1,11 +1,21 @@ use std::collections::HashMap; use error_stack::{Report, ResultExt as _}; +use opentelemetry::{global, propagation::Injector}; use serde::Serialize; -use temporalio_client::{WorkflowClientTrait as _, WorkflowOptions}; +use temporalio_client::{NamespacedClient, WorkflowService, tonic::IntoRequest as _}; use temporalio_common::protos::{ - ENCODING_PAYLOAD_KEY, JSON_ENCODING_VAL, temporal::api::common::v1::Payload, + ENCODING_PAYLOAD_KEY, JSON_ENCODING_VAL, + coresdk::IntoPayloadsExt as _, + temporal::api::{ + common::v1::{Header, Payload, WorkflowType}, + enums::v1::TaskQueueKind, + taskqueue::v1::TaskQueue, + workflowservice::v1::StartWorkflowExecutionRequest, + }, }; +use tracing::{Span, instrument}; +use tracing_opentelemetry::OpenTelemetrySpanExt as _; use type_system::{ knowledge::entity::EntityId, ontology::{ @@ -17,6 +27,72 @@ use uuid::Uuid; use crate::{TemporalClient, WorkflowError}; +/// Header key used by `@temporalio/interceptors-opentelemetry` to carry the +/// trace-context payload across workflow boundaries. Must stay in sync with +/// `TRACE_HEADER` in that package's `instrumentation.ts`; if it drifts, +/// workflows started from Rust will ship correct headers that the TypeScript +/// inbound interceptor silently ignores, and every resulting span renders +/// parent-less in Tempo. +const TRACE_HEADER: &str = "_tracer-data"; + +/// Adapter so `opentelemetry`'s text-map propagator can write into a plain +/// `HashMap` carrier. +struct CarrierWriter<'a>(&'a mut HashMap); + +impl Injector for CarrierWriter<'_> { + fn set(&mut self, key: &str, value: String) { + self.0.insert(key.to_owned(), value); + } +} + +/// Build a Temporal `Header` containing the active OTEL trace context as +/// a JSON-encoded text-map under the `_tracer-data` field. +/// +/// Returns `None` if no propagator wrote anything into the carrier (e.g. +/// no active span, or no propagator registered) — the caller should leave +/// the request `header` field empty in that case rather than send an +/// empty payload. +fn build_otel_header() -> Option
{ + let context = Span::current().context(); + let mut carrier = HashMap::::new(); + global::get_text_map_propagator(|propagator| { + propagator.inject_context(&context, &mut CarrierWriter(&mut carrier)); + }); + if carrier.is_empty() { + // Surface this once per process: an empty carrier means either no + // active tracing span (caller missing `#[instrument]`) or no + // global propagator registered (telemetry bootstrap missing + // `set_text_map_propagator`). Either way the workflow will start + // with no parent context and the worker-side span renders detached + // from the caller's trace. + static WARNED: std::sync::OnceLock<()> = std::sync::OnceLock::new(); + WARNED.get_or_init(|| { + tracing::warn!( + "OpenTelemetry text-map propagator wrote no headers when starting workflow; \ + workflow spans will be parent-less. Verify the global propagator is installed \ + and the calling fn carries an active tracing span." + ); + }); + return None; + } + + let payload = Payload { + metadata: HashMap::from([( + ENCODING_PAYLOAD_KEY.to_owned(), + JSON_ENCODING_VAL.as_bytes().to_vec(), + )]), + // `HashMap` cannot fail to serialise — fail loud + // rather than silently dropping the trace context (which would + // produce a parent-less workflow span on every start). + data: serde_json::to_vec(&carrier).expect("HashMap serialises"), + ..Default::default() + }; + + Some(Header { + fields: HashMap::from([(TRACE_HEADER.to_owned(), payload)]), + }) +} + #[derive(Serialize)] #[serde(rename_all = "camelCase")] struct AuthenticationContext { @@ -24,31 +100,66 @@ struct AuthenticationContext { } impl TemporalClient { + /// Start a workflow on the `ai` task queue, injecting the active + /// OTEL trace context into the workflow start headers so the + /// worker-side interceptors can parent the workflow + activity + /// spans off the caller's trace. + /// + /// Goes via the low-level `WorkflowService::start_workflow_execution` + /// because `WorkflowClientTrait::start_workflow` does not expose the + /// proto `header` field. The span is annotated with `otel.kind = + /// "producer"` for the asynchronous fire-and-forget shape (the value + /// is case-sensitive; `tracing-opentelemetry` falls back to + /// `Internal` on typos). + #[instrument( + skip(self, payload), + fields(workflow_type = workflow, otel.kind = "producer"), + )] async fn start_ai_workflow( &self, workflow: &'static str, payload: &(impl Serialize + Sync), ) -> Result> { - Ok(self - .client - .start_workflow( - vec![Payload { - metadata: HashMap::from([( - ENCODING_PAYLOAD_KEY.to_owned(), - JSON_ENCODING_VAL.as_bytes().to_vec(), - )]), - data: serde_json::to_vec(payload).change_context(WorkflowError(workflow))?, - external_payloads: Vec::new(), - }], - "ai".to_owned(), - Uuid::new_v4().to_string(), - workflow.to_owned(), - None, - WorkflowOptions::default(), - ) - .await - .change_context(WorkflowError(workflow))? - .run_id) + let mut client = self.client.clone(); + // `WorkflowClientTrait::start_workflow` auto-populates `identity` from + // `ClientOptions` (typically `pid@hostname`). The low-level + // `StartWorkflowExecutionRequest` defaults it to an empty string, + // which makes Temporal Server / UI unable to attribute starts to a + // client. Read it back from the configured client. + let identity = client.get_client().identity(); + let request = StartWorkflowExecutionRequest { + namespace: <_ as NamespacedClient>::namespace(&client), + input: vec![Payload { + metadata: HashMap::from([( + ENCODING_PAYLOAD_KEY.to_owned(), + JSON_ENCODING_VAL.as_bytes().to_vec(), + )]), + data: serde_json::to_vec(payload).change_context(WorkflowError(workflow))?, + ..Default::default() + }] + .into_payloads(), + workflow_id: Uuid::new_v4().to_string(), + workflow_type: Some(WorkflowType { + name: workflow.to_owned(), + }), + task_queue: Some(TaskQueue { + name: "ai".to_owned(), + kind: TaskQueueKind::Unspecified as i32, + normal_name: String::new(), + }), + identity, + request_id: Uuid::new_v4().to_string(), + header: build_otel_header(), + ..Default::default() + }; + + let response = + WorkflowService::start_workflow_execution(&mut client, request.into_request()) + .await + .change_context(WorkflowError(workflow))? + .into_inner(); + + Ok(response.run_id) } /// Starts a workflow to update the embeddings for the provided data type. @@ -138,10 +249,7 @@ impl TemporalClient { .await } - /// Starts a workflow to update the embeddings for the provided entities. - /// - /// The `embedding_exclusions` parameter specifies which properties should be excluded - /// from embedding generation for specific entity types (e.g., email for User entities). + /// Starts workflows to update the embeddings for the provided entities. /// /// Returns the run IDs of the workflows. /// diff --git a/tests/hash-backend-integration/src/tests/setup-opentelemetry.ts b/tests/hash-backend-integration/src/tests/setup-opentelemetry.ts index c689f5fc760..a6aed910015 100644 --- a/tests/hash-backend-integration/src/tests/setup-opentelemetry.ts +++ b/tests/hash-backend-integration/src/tests/setup-opentelemetry.ts @@ -1,7 +1,13 @@ -import { registerOpenTelemetry } from "@apps/hash-api/src/graphql/opentelemetry"; +import { + createUndiciInstrumentation, + registerOpenTelemetry, +} from "@local/hash-backend-utils/opentelemetry"; -// Initialize OpenTelemetry for backend integration tests const otlpEndpoint = process.env.HASH_OTLP_ENDPOINT; if (otlpEndpoint) { - registerOpenTelemetry(otlpEndpoint, "BE Integration Tests"); + registerOpenTelemetry({ + endpoint: otlpEndpoint, + serviceName: "BE Integration Tests", + instrumentations: [createUndiciInstrumentation()], + }); } diff --git a/yarn.lock b/yarn.lock index 65886557dba..25150abacb4 100644 --- a/yarn.lock +++ b/yarn.lock @@ -352,9 +352,14 @@ __metadata: "@local/hash-isomorphic-utils": "workspace:*" "@local/status": "workspace:*" "@local/tsconfig": "workspace:*" + "@opentelemetry/api": "npm:1.9.0" + "@opentelemetry/api-logs": "npm:0.207.0" + "@opentelemetry/instrumentation": "npm:0.207.0" + "@opentelemetry/instrumentation-grpc": "npm:0.207.0" "@sentry/node": "npm:10.42.0" "@temporalio/activity": "npm:1.12.1" "@temporalio/common": "npm:1.12.1" + "@temporalio/interceptors-opentelemetry": "npm:1.12.1" "@temporalio/proto": "npm:1.12.1" "@temporalio/worker": "npm:1.12.1" "@temporalio/workflow": "npm:1.12.1" @@ -442,7 +447,6 @@ __metadata: "@opentelemetry/instrumentation": "npm:0.207.0" "@opentelemetry/instrumentation-express": "npm:0.56.0" "@opentelemetry/instrumentation-graphql": "npm:0.55.0" - "@opentelemetry/instrumentation-http": "npm:0.207.0" "@opentelemetry/resources": "npm:2.2.0" "@opentelemetry/sdk-logs": "npm:0.207.0" "@opentelemetry/sdk-trace-base": "npm:2.2.0" @@ -700,8 +704,13 @@ __metadata: "@local/hash-isomorphic-utils": "workspace:*" "@local/status": "workspace:*" "@local/tsconfig": "workspace:*" + "@opentelemetry/api": "npm:1.9.0" + "@opentelemetry/api-logs": "npm:0.207.0" + "@opentelemetry/instrumentation": "npm:0.207.0" + "@opentelemetry/instrumentation-grpc": "npm:0.207.0" "@sentry/node": "npm:10.42.0" "@temporalio/activity": "npm:1.12.1" + "@temporalio/interceptors-opentelemetry": "npm:1.12.1" "@temporalio/worker": "npm:1.12.1" "@temporalio/workflow": "npm:1.12.1" "@types/dotenv-flow": "npm:3.3.3" @@ -9202,12 +9211,25 @@ __metadata: "@local/tsconfig": "workspace:*" "@opentelemetry/api": "npm:1.9.0" "@opentelemetry/api-logs": "npm:0.207.0" + "@opentelemetry/core": "npm:2.2.0" + "@opentelemetry/exporter-logs-otlp-grpc": "npm:0.207.0" + "@opentelemetry/exporter-metrics-otlp-grpc": "npm:0.207.0" + "@opentelemetry/exporter-trace-otlp-grpc": "npm:0.207.0" + "@opentelemetry/instrumentation": "npm:0.207.0" + "@opentelemetry/instrumentation-http": "npm:0.207.0" + "@opentelemetry/instrumentation-undici": "npm:0.25.0" + "@opentelemetry/resources": "npm:2.2.0" + "@opentelemetry/sdk-logs": "npm:0.207.0" + "@opentelemetry/sdk-metrics": "npm:2.2.0" + "@opentelemetry/sdk-trace-base": "npm:2.2.0" + "@opentelemetry/sdk-trace-node": "npm:2.2.0" "@sentry/node": "npm:10.42.0" "@smithy/protocol-http": "npm:5.3.3" "@smithy/signature-v4": "npm:5.3.3" "@temporalio/activity": "npm:1.12.1" "@temporalio/client": "npm:1.12.1" "@temporalio/common": "npm:1.12.1" + "@temporalio/interceptors-opentelemetry": "npm:1.12.1" "@temporalio/proto": "npm:1.12.1" "@temporalio/worker": "npm:1.12.1" "@temporalio/workflow": "npm:1.12.1" @@ -10437,6 +10459,15 @@ __metadata: languageName: node linkType: hard +"@opentelemetry/api-logs@npm:0.215.0": + version: 0.215.0 + resolution: "@opentelemetry/api-logs@npm:0.215.0" + dependencies: + "@opentelemetry/api": "npm:^1.3.0" + checksum: 10c0/9ec6a46c064f71d01c21b45d1e90183efb2476f41ef9ac450bc95077c618cc7fe95bdb65fcd6f134d634e1950a1fe97678e6efe1a8382a11ebae6d8e960330e0 + languageName: node + linkType: hard + "@opentelemetry/api-logs@npm:0.41.2": version: 0.41.2 resolution: "@opentelemetry/api-logs@npm:0.41.2" @@ -10455,13 +10486,20 @@ __metadata: languageName: node linkType: hard -"@opentelemetry/api@npm:1.9.0, @opentelemetry/api@npm:^1.0.0, @opentelemetry/api@npm:^1.3.0, @opentelemetry/api@npm:^1.4.1, @opentelemetry/api@npm:^1.9.0": +"@opentelemetry/api@npm:1.9.0": version: 1.9.0 resolution: "@opentelemetry/api@npm:1.9.0" checksum: 10c0/9aae2fe6e8a3a3eeb6c1fdef78e1939cf05a0f37f8a4fae4d6bf2e09eb1e06f966ece85805626e01ba5fab48072b94f19b835449e58b6d26720ee19a58298add languageName: node linkType: hard +"@opentelemetry/api@npm:^1.0.0, @opentelemetry/api@npm:^1.3.0, @opentelemetry/api@npm:^1.4.1, @opentelemetry/api@npm:^1.7.0, @opentelemetry/api@npm:^1.9.0": + version: 1.9.1 + resolution: "@opentelemetry/api@npm:1.9.1" + checksum: 10c0/c608485fc8b5a91e1f7e05e843b45b509307456b31cd2ad365933d90813e40ebfedf179f1451c762037e82d7c76aa8500e95d2da3609f640a1206cde5322cd14 + languageName: node + linkType: hard + "@opentelemetry/context-async-hooks@npm:2.2.0": version: 2.2.0 resolution: "@opentelemetry/context-async-hooks@npm:2.2.0" @@ -10511,7 +10549,7 @@ __metadata: languageName: node linkType: hard -"@opentelemetry/core@npm:1.30.1": +"@opentelemetry/core@npm:1.30.1, @opentelemetry/core@npm:^1.19.0": version: 1.30.1 resolution: "@opentelemetry/core@npm:1.30.1" dependencies: @@ -11053,6 +11091,18 @@ __metadata: languageName: node linkType: hard +"@opentelemetry/instrumentation-grpc@npm:0.207.0": + version: 0.207.0 + resolution: "@opentelemetry/instrumentation-grpc@npm:0.207.0" + dependencies: + "@opentelemetry/instrumentation": "npm:0.207.0" + "@opentelemetry/semantic-conventions": "npm:^1.29.0" + peerDependencies: + "@opentelemetry/api": ^1.3.0 + checksum: 10c0/d3dc63193ea941836fbd60dc6d637e06b9ba9a5a2806e1d28117cf6ee6a4d2bf942903ad572c32869f1d4cd06a669f76c7848ce3af32c697c940dbe324f0d3da + languageName: node + linkType: hard + "@opentelemetry/instrumentation-hapi@npm:0.55.0": version: 0.55.0 resolution: "@opentelemetry/instrumentation-hapi@npm:0.55.0" @@ -11450,6 +11500,19 @@ __metadata: languageName: node linkType: hard +"@opentelemetry/instrumentation-undici@npm:0.25.0": + version: 0.25.0 + resolution: "@opentelemetry/instrumentation-undici@npm:0.25.0" + dependencies: + "@opentelemetry/core": "npm:^2.0.0" + "@opentelemetry/instrumentation": "npm:^0.215.0" + "@opentelemetry/semantic-conventions": "npm:^1.24.0" + peerDependencies: + "@opentelemetry/api": ^1.7.0 + checksum: 10c0/03e6133149a33801655dcc9fc3b0459b50581f6379e811bbfd43ff98bcca1e956c5acfb1fdd5ade9b2184c108e06fd1ccc2509b8703d16c290d2e203dacc9de8 + languageName: node + linkType: hard + "@opentelemetry/instrumentation@npm:0.207.0, @opentelemetry/instrumentation@npm:^0.207.0": version: 0.207.0 resolution: "@opentelemetry/instrumentation@npm:0.207.0" @@ -11476,7 +11539,7 @@ __metadata: languageName: node linkType: hard -"@opentelemetry/instrumentation@npm:0.211.0, @opentelemetry/instrumentation@npm:>=0.52.0 <1, @opentelemetry/instrumentation@npm:^0.211.0": +"@opentelemetry/instrumentation@npm:0.211.0, @opentelemetry/instrumentation@npm:^0.211.0": version: 0.211.0 resolution: "@opentelemetry/instrumentation@npm:0.211.0" dependencies: @@ -11489,6 +11552,19 @@ __metadata: languageName: node linkType: hard +"@opentelemetry/instrumentation@npm:>=0.52.0 <1, @opentelemetry/instrumentation@npm:^0.215.0": + version: 0.215.0 + resolution: "@opentelemetry/instrumentation@npm:0.215.0" + dependencies: + "@opentelemetry/api-logs": "npm:0.215.0" + import-in-the-middle: "npm:^3.0.0" + require-in-the-middle: "npm:^8.0.0" + peerDependencies: + "@opentelemetry/api": ^1.3.0 + checksum: 10c0/d9b3f2f927192a866d4c44b80bdebfea54c958660242bba7c61889357c6e51ab8fda54a711d41b7e93bf5fe41e8c7a5384a90a5262467667de997387c3e54924 + languageName: node + linkType: hard + "@opentelemetry/otlp-exporter-base@npm:0.205.0": version: 0.205.0 resolution: "@opentelemetry/otlp-exporter-base@npm:0.205.0" @@ -11696,7 +11772,7 @@ __metadata: languageName: node linkType: hard -"@opentelemetry/resources@npm:1.30.1, @opentelemetry/resources@npm:^1.15.2": +"@opentelemetry/resources@npm:1.30.1, @opentelemetry/resources@npm:^1.15.2, @opentelemetry/resources@npm:^1.19.0": version: 1.30.1 resolution: "@opentelemetry/resources@npm:1.30.1" dependencies: @@ -11916,7 +11992,7 @@ __metadata: languageName: node linkType: hard -"@opentelemetry/sdk-trace-base@npm:1.30.1, @opentelemetry/sdk-trace-base@npm:^1.15.2": +"@opentelemetry/sdk-trace-base@npm:1.30.1, @opentelemetry/sdk-trace-base@npm:^1.15.2, @opentelemetry/sdk-trace-base@npm:^1.19.0": version: 1.30.1 resolution: "@opentelemetry/sdk-trace-base@npm:1.30.1" dependencies: @@ -17740,6 +17816,24 @@ __metadata: languageName: node linkType: hard +"@temporalio/interceptors-opentelemetry@npm:1.12.1": + version: 1.12.1 + resolution: "@temporalio/interceptors-opentelemetry@npm:1.12.1" + dependencies: + "@opentelemetry/api": "npm:^1.7.0" + "@opentelemetry/core": "npm:^1.19.0" + "@opentelemetry/resources": "npm:^1.19.0" + "@opentelemetry/sdk-trace-base": "npm:^1.19.0" + peerDependencies: + "@temporalio/activity": 1.12.1 + "@temporalio/client": 1.12.1 + "@temporalio/common": 1.12.1 + "@temporalio/worker": 1.12.1 + "@temporalio/workflow": 1.12.1 + checksum: 10c0/e933f0580f974b1f44203a4df23d9eed6bbf165b1a9a957b981bbe911ed092bfb183c447873468bf3b858e3f31ac84baaebf944afbd1332cf0c95b35a6c008c4 + languageName: node + linkType: hard + "@temporalio/proto@npm:1.12.1": version: 1.12.1 resolution: "@temporalio/proto@npm:1.12.1" @@ -31482,6 +31576,18 @@ __metadata: languageName: node linkType: hard +"import-in-the-middle@npm:^3.0.0": + version: 3.0.1 + resolution: "import-in-the-middle@npm:3.0.1" + dependencies: + acorn: "npm:^8.15.0" + acorn-import-attributes: "npm:^1.9.5" + cjs-module-lexer: "npm:^2.2.0" + module-details-from-path: "npm:^1.0.4" + checksum: 10c0/afd314edb76764ff53d624e2868f5489a9fb81d22745da3db88121a962f422390b59be86fc5cb1158ed672025ece3b3f8a4943e5dde116bae3dac9f0ff5aff86 + languageName: node + linkType: hard + "import-lazy@npm:~4.0.0": version: 4.0.0 resolution: "import-lazy@npm:4.0.0"