diff --git a/AGENTS.md b/AGENTS.md index 2add69f48..488149f9e 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -43,6 +43,7 @@ for the full dependency graph. | `packages/destination-postgres` | Postgres destination connector | `protocol`, `util-postgres` | | `packages/destination-google-sheets` | Google Sheets destination connector | `protocol` | | `packages/state-postgres` | Postgres state store + migrations | `util-postgres` | +| `packages/logger` | Shared structured logger with PII redaction | `pino` | | `packages/util-postgres` | Shared Postgres utilities (upsert, rate limiter) | standalone | | `packages/ts-cli` | Generic TypeScript module CLI runner | standalone | | `apps/engine` | Sync engine library + stateless CLI + HTTP API | `protocol`, connectors, `state-postgres` | @@ -75,7 +76,8 @@ See [docs/architecture/principles.md](docs/architecture/principles.md) for the c ## Conventions - All serializable inputs/outputs (Zod schemas, JSON wire format) must use **snake_case** field names. -- Source connectors must use `console.error` for logging (stdout is the NDJSON stream). +- **All logging must use `@stripe/sync-logger`** — raw `console.*` is banned in source code (enforced by ESLint `no-console: error`). Apps use `createLogger()`, subprocess connectors use `createConnectorLogger()` (writes to stderr). +- **Never log secrets or synced data** — the shared logger redacts `api_key`, `secret`, `token`, `password`, `connection_string`, `data`, and related fields. Do not log record payloads, API keys, or connection strings even in test output. - Generated OpenAPI specs live in each package's `src/__generated__/openapi.json`. Run `./scripts/generate-openapi.sh` and commit the output before pushing when schemas change. Never edit generated files by hand. - Non-trivial PRs should be accompanied by a plan artifact in `docs/plans/YYYY-MM-DD-.md`. Save it before or alongside the first implementation commit. diff --git a/apps/engine/package.json b/apps/engine/package.json index eefddacec..508253284 100644 --- a/apps/engine/package.json +++ b/apps/engine/package.json @@ -32,7 +32,8 @@ "scripts": { "build": "tsc", "x:watch": "sh -c 'if command -v bun > /dev/null 2>&1; then bun --watch \"$@\"; else tsx --watch --conditions bun \"$@\"; fi' --", - "dev": "LOG_LEVEL=debug LOG_PRETTY=true DANGEROUSLY_VERBOSE_LOGGING=true pnpm x:watch src/api/index.ts", + "dev": "PORT=4010 LOG_PRETTY=true pnpm x:watch src/api/index.ts", + "lint": "eslint src/", "test": "vitest run", "generate:types": "openapi-typescript src/__generated__/openapi.json -o src/__generated__/openapi.d.ts" }, @@ -47,6 +48,7 @@ "@stripe/sync-destination-postgres": "workspace:*", "@stripe/sync-hono-zod-openapi": "workspace:*", "@stripe/sync-integration-supabase": "workspace:*", + "@stripe/sync-logger": "workspace:*", "@stripe/sync-protocol": "workspace:*", "@stripe/sync-source-stripe": "workspace:*", "@stripe/sync-state-postgres": "workspace:*", diff --git a/apps/engine/src/__tests__/stripe-to-postgres.test.ts b/apps/engine/src/__tests__/stripe-to-postgres.test.ts index 381ddec7c..43b9fc339 100644 --- a/apps/engine/src/__tests__/stripe-to-postgres.test.ts +++ b/apps/engine/src/__tests__/stripe-to-postgres.test.ts @@ -187,8 +187,9 @@ describe('engine read → write', () => { for (const r of records) { expect(r.stream).toBe(targetStream) - expect((r as any).data).toBeDefined() - expect((r as any).data.id).toBeDefined() + const data = (r as Record).data as Record + expect(data).toBeDefined() + expect(data.id).toBeDefined() } }) diff --git a/apps/engine/src/api/app.test.ts b/apps/engine/src/api/app.test.ts index bff75631e..e8c57978a 100644 --- a/apps/engine/src/api/app.test.ts +++ b/apps/engine/src/api/app.test.ts @@ -2,6 +2,10 @@ import { beforeEach, describe, expect, it, vi } from 'vitest' import type { ConnectorResolver, Message, SourceStateMessage } from '../lib/index.js' import { sourceTest, destinationTest, collectFirst } from '../lib/index.js' import { createApp } from './app.js' +import { z } from 'zod' + +// eslint-disable-next-line @typescript-eslint/no-explicit-any +type Json = Record // --------------------------------------------------------------------------- // Helpers @@ -33,7 +37,7 @@ beforeAll(async () => { 'test', { connector: sourceTest, - configSchema: {} as any, + configSchema: z.object({}), rawConfigJsonSchema: srcConfigSchema, }, ], @@ -44,7 +48,7 @@ beforeAll(async () => { 'test', { connector: destinationTest, - configSchema: {} as any, + configSchema: z.object({}), rawConfigJsonSchema: destConfigSchema, }, ], @@ -131,7 +135,7 @@ describe('GET /openapi.json', () => { it('has typed connector schemas in components (auto-generated from Zod)', async () => { const app = await createApp(resolver) const res = await app.request('/openapi.json') - const spec = (await res.json()) as any + const spec = (await res.json()) as Json const schemaNames = Object.keys(spec.components?.schemas ?? {}) expect(schemaNames).toContain('SourceTestConfig') @@ -150,7 +154,7 @@ describe('GET /openapi.json', () => { it('defines NDJSON message schemas with discriminated unions', async () => { const app = await createApp(resolver) const res = await app.request('/openapi.json') - const spec = (await res.json()) as any + const spec = (await res.json()) as Json const schemas = spec.components.schemas // Individual message types — zod-openapi uses const for z.literal() in OpenAPI 3.1 @@ -187,18 +191,18 @@ describe('GET /openapi.json', () => { it('ControlMessage source_config/destination_config reference typed connector schemas', async () => { const app = await createApp(resolver) const res = await app.request('/openapi.json') - const spec = (await res.json()) as any + const spec = (await res.json()) as Json const control = spec.components.schemas.ControlMessage.properties.control const sourceVariant = control.oneOf.find( - (v: any) => v.properties?.control_type?.const === 'source_config' + (v: Json) => v.properties?.control_type?.const === 'source_config' ) expect(sourceVariant.properties.source_config.$ref).toBe( '#/components/schemas/SourceTestConfig' ) const destVariant = control.oneOf.find( - (v: any) => v.properties?.control_type?.const === 'destination_config' + (v: Json) => v.properties?.control_type?.const === 'destination_config' ) expect(destVariant.properties.destination_config.$ref).toBe( '#/components/schemas/DestinationTestConfig' @@ -208,7 +212,7 @@ describe('GET /openapi.json', () => { it('/setup spec documents 200 response (not 204)', async () => { const app = await createApp(resolver) const res = await app.request('/openapi.json') - const spec = (await res.json()) as any + const spec = (await res.json()) as Json const setupOp = spec.paths['/pipeline_setup']?.post expect(setupOp).toBeDefined() expect(setupOp.responses['200']).toBeDefined() @@ -218,7 +222,7 @@ describe('GET /openapi.json', () => { it('/write spec documents a required NDJSON request body', async () => { const app = await createApp(resolver) const res = await app.request('/openapi.json') - const spec = (await res.json()) as any + const spec = (await res.json()) as Json const writeOp = spec.paths['/pipeline_write']?.post expect(writeOp).toBeDefined() const body = writeOp.requestBody @@ -232,7 +236,7 @@ describe('GET /openapi.json', () => { it('/read and /sync spec documents an optional NDJSON request body', async () => { const app = await createApp(resolver) const res = await app.request('/openapi.json') - const spec = (await res.json()) as any + const spec = (await res.json()) as Json for (const path of ['/pipeline_read', '/pipeline_sync'] as const) { const op = spec.paths[path]?.post @@ -247,13 +251,13 @@ describe('GET /openapi.json', () => { it('documents the X-Pipeline header on sync routes', async () => { const app = await createApp(resolver) const res = await app.request('/openapi.json') - const spec = (await res.json()) as any + const spec = (await res.json()) as Json // /check is a POST with X-Pipeline header const checkOp = spec.paths['/pipeline_check']?.post expect(checkOp).toBeDefined() const headerParam = checkOp.parameters?.find( - (p: any) => p.in === 'header' && p.name === 'x-pipeline' + (p: Json) => p.in === 'header' && p.name === 'x-pipeline' ) expect(headerParam).toBeDefined() }) @@ -264,9 +268,9 @@ describe('GET /meta/sources', () => { const app = await createApp(resolver) const res = await app.request('/meta/sources') expect(res.status).toBe(200) - const body = (await res.json()) as any + const body = (await res.json()) as Json expect(Array.isArray(body.items)).toBe(true) - expect(body.items.find((c: any) => c.type === 'test')?.config_schema).toBeDefined() + expect(body.items.find((c: Json) => c.type === 'test')?.config_schema).toBeDefined() }) }) @@ -275,7 +279,7 @@ describe('GET /meta/sources/:type', () => { const app = await createApp(resolver) const res = await app.request('/meta/sources/test') expect(res.status).toBe(200) - const body = (await res.json()) as any + const body = (await res.json()) as Json expect(body.config_schema).toBeDefined() }) @@ -291,9 +295,9 @@ describe('GET /meta/destinations', () => { const app = await createApp(resolver) const res = await app.request('/meta/destinations') expect(res.status).toBe(200) - const body = (await res.json()) as any + const body = (await res.json()) as Json expect(Array.isArray(body.items)).toBe(true) - expect(body.items.find((c: any) => c.type === 'test')?.config_schema).toBeDefined() + expect(body.items.find((c: Json) => c.type === 'test')?.config_schema).toBeDefined() }) }) @@ -302,7 +306,7 @@ describe('GET /meta/destinations/:type', () => { const app = await createApp(resolver) const res = await app.request('/meta/destinations/test') expect(res.status).toBe(200) - const body = (await res.json()) as any + const body = (await res.json()) as Json expect(body.config_schema).toBeDefined() }) @@ -372,7 +376,7 @@ describe('POST /check', () => { const events = await readNdjson>(res) const statuses = events.filter((e) => e.type === 'connection_status') expect(statuses).toHaveLength(2) - expect(statuses.every((s: any) => s.connection_status.status === 'succeeded')).toBe(true) + expect(statuses.every((s: Json) => s.connection_status.status === 'succeeded')).toBe(true) }) }) @@ -436,7 +440,7 @@ describe('POST /read', () => { 'test', { connector: sourceTest, - configSchema: {} as any, + configSchema: z.object({}), rawConfigJsonSchema: srcConfigSchema, rawInputJsonSchema: inputSchema, }, @@ -448,7 +452,7 @@ describe('POST /read', () => { 'test', { connector: destinationTest, - configSchema: {} as any, + configSchema: z.object({}), rawConfigJsonSchema: destConfigSchema, }, ], @@ -459,7 +463,7 @@ describe('POST /read', () => { it('spec uses SourceInputMessage schema for /read and /sync request body when source has input schema', async () => { const res = await inputApp.request('/openapi.json') - const spec = (await res.json()) as any + const spec = (await res.json()) as Json for (const path of ['/pipeline_read', '/pipeline_sync'] as const) { const body = spec.paths[path]?.post?.requestBody @@ -794,8 +798,8 @@ describe('POST /source_discover', () => { const events = await readNdjson>(res) const catalogs = events.filter((e) => e.type === 'catalog') expect(catalogs).toHaveLength(1) - const catalog = (catalogs[0] as any).catalog - const streamNames = catalog.streams.map((s: any) => s.name) + const catalog = (catalogs[0] as Json).catalog + const streamNames = catalog.streams.map((s: Json) => s.name) expect(streamNames).toContain('customers') expect(streamNames).toContain('products') }) @@ -824,7 +828,7 @@ describe('POST /source_discover', () => { const events = await readNdjson>(res) const traces = events.filter((e) => e.type === 'trace') expect(traces).toHaveLength(1) - const trace = (traces[0] as any).trace + const trace = (traces[0] as Json).trace expect(trace.trace_type).toBe('error') expect(trace.error.failure_type).toBe('system_error') expect(trace.error.message).toContain('network unreachable') diff --git a/apps/engine/src/api/app.ts b/apps/engine/src/api/app.ts index ce5de9fdb..bdcba2573 100644 --- a/apps/engine/src/api/app.ts +++ b/apps/engine/src/api/app.ts @@ -37,6 +37,7 @@ const ndjsonRef = { SourceInputMessage: { $ref: '#/components/schemas/SourceInputMessage' }, } import { ndjsonResponse } from '@stripe/sync-ts-cli/ndjson' +import { REQUEST_HEADER_REDACT } from '@stripe/sync-logger' import { logger } from '../logger.js' import { sslConfigFromConnectionString, @@ -142,7 +143,7 @@ export async function createApp(resolver: ConnectorResolver) { } }) logger.debug( - { requestId, method: c.req.method, path: c.req.path, headers }, + { requestId, method: c.req.method, path: c.req.path, request_headers: headers }, 'request headers' ) } @@ -150,20 +151,14 @@ export async function createApp(resolver: ConnectorResolver) { if (dangerouslyVerbose) { const curlParts = [`curl -X ${c.req.method} '${c.req.url}'`] c.req.raw.headers.forEach((value, key) => { - curlParts.push(` -H '${key}: ${value}'`) + curlParts.push( + REQUEST_HEADER_REDACT.has(key.toLowerCase()) + ? ` -H '${key}: [REDACTED]'` + : ` -H '${key}: ${value}'` + ) }) if (hasBody(c)) { - const cl = c.req.header('Content-Length') - if (cl && Number(cl) < 100_000) { - try { - const body = await c.req.raw.clone().text() - curlParts.push(` -d '${body.replace(/'/g, "'\\''")}'`) - } catch { - /* skip */ - } - } else { - curlParts.push(' --data-binary @-') - } + curlParts.push(' --data-binary @-') } logger.debug(curlParts.join(' \\\n')) } diff --git a/apps/engine/src/cli/supabase.ts b/apps/engine/src/cli/supabase.ts index 789e24921..6c45fcc0b 100644 --- a/apps/engine/src/cli/supabase.ts +++ b/apps/engine/src/cli/supabase.ts @@ -1,5 +1,6 @@ import { defineCommand } from 'citty' import { install, uninstall, getCurrentVersion } from '@stripe/sync-integration-supabase' +import { logger } from '../logger.js' const installCmd = defineCommand({ meta: { @@ -67,8 +68,7 @@ const installCmd = defineCommand({ const version = args.packageVersion || getCurrentVersion() - console.log(`Installing Stripe sync to Supabase project ${project}...`) - console.log(` Edge function version: ${version}`) + logger.info({ project, version }, 'Installing Stripe sync to Supabase project') await install({ supabaseAccessToken: token, @@ -82,7 +82,7 @@ const installCmd = defineCommand({ supabaseManagementUrl: managementUrl, }) - console.log('Installation complete.') + logger.info('Installation complete') }, }) @@ -118,7 +118,7 @@ const uninstallCmd = defineCommand({ throw new Error('Missing --project or SUPABASE_PROJECT_REF env') } - console.log(`Uninstalling Stripe sync from Supabase project ${project}...`) + logger.info({ project }, 'Uninstalling Stripe sync from Supabase project') await uninstall({ supabaseAccessToken: token, @@ -126,7 +126,7 @@ const uninstallCmd = defineCommand({ supabaseManagementUrl: managementUrl, }) - console.log('Uninstall complete.') + logger.info('Uninstall complete') }, }) diff --git a/apps/engine/src/lib/createSchemas.ts b/apps/engine/src/lib/createSchemas.ts index 45d4d163b..0a080a949 100644 --- a/apps/engine/src/lib/createSchemas.ts +++ b/apps/engine/src/lib/createSchemas.ts @@ -73,19 +73,25 @@ export function createConnectorSchemas(resolver: ConnectorResolver) { return { name, config, variant: z.object({ type: z.literal(name), [name]: config }) } }) - // eslint-disable-next-line @typescript-eslint/no-explicit-any const SourceConfig = sources.length > 0 ? z - .discriminatedUnion('type', sources.map((s) => s.variant) as [any, any, ...any[]]) + .discriminatedUnion( + 'type', + // eslint-disable-next-line @typescript-eslint/no-explicit-any + sources.map((s) => s.variant) as [any, any, ...any[]] + ) .meta({ id: connectorUnionId('Source') }) : z.object({ type: z.string() }).catchall(z.unknown()) - // eslint-disable-next-line @typescript-eslint/no-explicit-any const DestinationConfig = destinations.length > 0 ? z - .discriminatedUnion('type', destinations.map((d) => d.variant) as [any, any, ...any[]]) + .discriminatedUnion( + 'type', + // eslint-disable-next-line @typescript-eslint/no-explicit-any + destinations.map((d) => d.variant) as [any, any, ...any[]] + ) .meta({ id: connectorUnionId('Destination') }) : z.object({ type: z.string() }).catchall(z.unknown()) diff --git a/apps/engine/src/lib/exec.test.ts b/apps/engine/src/lib/exec.test.ts index 42d51463b..31f799461 100644 --- a/apps/engine/src/lib/exec.test.ts +++ b/apps/engine/src/lib/exec.test.ts @@ -3,7 +3,6 @@ import { resolveBin } from './resolver.js' import { createSourceFromExec } from './source-exec.js' import { createDestinationFromExec } from './destination-exec.js' import { collectFirst } from '@stripe/sync-protocol' -import type { Message } from '@stripe/sync-protocol' // These tests use real connector binaries (built by `pnpm build`). diff --git a/apps/engine/src/lib/pipeline.test.ts b/apps/engine/src/lib/pipeline.test.ts index 7dcf24483..e001722bf 100644 --- a/apps/engine/src/lib/pipeline.test.ts +++ b/apps/engine/src/lib/pipeline.test.ts @@ -64,7 +64,10 @@ describe('enforceCatalog()', () => { ] const result = await drain(enforceCatalog(catalog([{ name: 'customers' }]))(toAsync(msgs))) expect(result).toHaveLength(1) - expect((result[0] as any).record.data).toEqual({ id: 'cus_1', name: 'Alice' }) + expect((result[0] as { record: { data: unknown } }).record.data).toEqual({ + id: 'cus_1', + name: 'Alice', + }) }) it('filters record fields to json_schema.properties when present', async () => { @@ -92,7 +95,10 @@ describe('enforceCatalog()', () => { )(toAsync(msgs)) ) expect(result).toHaveLength(1) - expect((result[0] as any).record.data).toEqual({ id: 'sub_1', status: 'active' }) + expect((result[0] as { record: { data: unknown } }).record.data).toEqual({ + id: 'sub_1', + status: 'active', + }) }) it('drops unknown internal fields that are not present in the catalog schema', async () => { @@ -126,7 +132,7 @@ describe('enforceCatalog()', () => { )(toAsync(msgs)) ) expect(result).toHaveLength(1) - expect((result[0] as any).record.data).toEqual({ + expect((result[0] as { record: { data: unknown } }).record.data).toEqual({ id: 'sub_1', status: 'active', }) @@ -145,7 +151,7 @@ describe('enforceCatalog()', () => { ] const result = await drain(enforceCatalog(catalog([{ name: 'subscriptions' }]))(toAsync(msgs))) expect(result).toHaveLength(1) - expect((result[0] as any).record.data).toEqual({ + expect((result[0] as { record: { data: unknown } }).record.data).toEqual({ id: 'sub_1', status: 'active', customer: 'cus_1', diff --git a/apps/engine/src/lib/remote-engine.test.ts b/apps/engine/src/lib/remote-engine.test.ts index 2c50a274c..79ebeb8cb 100644 --- a/apps/engine/src/lib/remote-engine.test.ts +++ b/apps/engine/src/lib/remote-engine.test.ts @@ -6,6 +6,7 @@ import { sourceTest, destinationTest, collectFirst } from './index.js' import { createApp } from '../api/app.js' import { createRemoteEngine } from './remote-engine.js' import type { PipelineConfig, SourceStateMessage } from '@stripe/sync-protocol' +import { z } from 'zod' // --------------------------------------------------------------------------- // Server setup @@ -54,7 +55,7 @@ beforeAll(async () => { 'test', { connector: sourceTest, - configSchema: {} as any, + configSchema: z.object({}), rawConfigJsonSchema: srcConfigSchema, }, ], @@ -65,7 +66,7 @@ beforeAll(async () => { 'test', { connector: destinationTest, - configSchema: {} as any, + configSchema: z.object({}), rawConfigJsonSchema: destConfigSchema, }, ], diff --git a/apps/engine/src/lib/remote-engine.ts b/apps/engine/src/lib/remote-engine.ts index 5b18a5810..b6a977446 100644 --- a/apps/engine/src/lib/remote-engine.ts +++ b/apps/engine/src/lib/remote-engine.ts @@ -99,7 +99,8 @@ export function createRemoteEngine(engineUrl: string): Engine { }) if (!response.ok) { const text = await response.text().catch(() => '') - throw new Error(`Engine ${path} failed (${response.status}): ${text}`) + const safeText = text.length > 200 ? text.slice(0, 200) + '...' : text + throw new Error(`Engine ${path} failed (${response.status}): ${safeText}`) } return response } diff --git a/apps/engine/src/lib/resolver.ts b/apps/engine/src/lib/resolver.ts index 1bd83250d..cc4460207 100644 --- a/apps/engine/src/lib/resolver.ts +++ b/apps/engine/src/lib/resolver.ts @@ -1,7 +1,7 @@ import { existsSync, readFileSync } from 'node:fs' import { dirname, join } from 'node:path' import { z } from 'zod' -import type { Source, Destination, ConnectorSpecification } from '@stripe/sync-protocol' +import type { Source, Destination } from '@stripe/sync-protocol' import { collectFirst } from '@stripe/sync-protocol' import { createSourceFromExec } from './source-exec.js' import { createDestinationFromExec } from './destination-exec.js' diff --git a/apps/engine/src/logger.ts b/apps/engine/src/logger.ts index 4ba380c1c..3c789ea77 100644 --- a/apps/engine/src/logger.ts +++ b/apps/engine/src/logger.ts @@ -1,10 +1,3 @@ -import pino from 'pino' +import { createLogger } from '@stripe/sync-logger' -export const logger = pino({ - level: process.env.LOG_LEVEL ?? 'info', - transport: process.env.LOG_PRETTY ? { target: import.meta.resolve('pino-pretty') } : undefined, - redact: { - paths: ['*.api_key', '*.connection_string', '*.password', '*.url'], - censor: '[redacted]', - }, -}) +export const logger = createLogger({ name: 'engine' }) diff --git a/apps/service/package.json b/apps/service/package.json index 1cd3f6d38..cd71a9b0d 100644 --- a/apps/service/package.json +++ b/apps/service/package.json @@ -19,6 +19,7 @@ "x:watch": "sh -c 'if command -v bun > /dev/null 2>&1; then bun --watch \"$@\"; else tsx --watch --conditions bun \"$@\"; fi' --", "dev:serve": "pnpm x:watch src/bin/sync-service.ts serve --temporal-address localhost:7233 --port 4020", "dev:worker": "pnpm x:watch src/bin/sync-service.ts worker --temporal-address localhost:7233", + "lint": "eslint src/", "test": "vitest run", "test:integration": "vitest run --config vitest.integration.config.ts", "generate:types": "openapi-typescript src/__generated__/openapi.json -o src/__generated__/openapi.d.ts" @@ -34,6 +35,7 @@ "@stripe/sync-destination-postgres": "workspace:*", "@stripe/sync-engine": "workspace:*", "@stripe/sync-hono-zod-openapi": "workspace:*", + "@stripe/sync-logger": "workspace:*", "@stripe/sync-protocol": "workspace:*", "@stripe/sync-source-stripe": "workspace:*", "@stripe/sync-ts-cli": "workspace:*", diff --git a/apps/service/src/api/app.integration.test.ts b/apps/service/src/api/app.integration.test.ts index e14cfcc51..fc1812087 100644 --- a/apps/service/src/api/app.integration.test.ts +++ b/apps/service/src/api/app.integration.test.ts @@ -282,7 +282,8 @@ describe('pipeline integration', () => { // 11. Pipeline should be gone from list and get const { data: listAfter } = await c.GET('/pipelines') - expect(listAfter!.data.find((p: any) => p.id === id)).toBeUndefined() + const match = listAfter!.data.find((p: Record) => p.id === id) + expect(match).toBeUndefined() const { error: getAfter } = await c.GET('/pipelines/{id}', { params: { path: { id } }, diff --git a/apps/service/src/cli.ts b/apps/service/src/cli.ts index 5f6328061..7fa71aae3 100644 --- a/apps/service/src/cli.ts +++ b/apps/service/src/cli.ts @@ -204,14 +204,14 @@ export async function createProgram() { terminate: async () => {}, }), list: async function* () {}, - } as any + } as unknown as Parameters[0]['temporal']['client'] const mockStore = { get: async () => ({}), set: async () => {}, update: async () => ({}), delete: async () => {}, list: async () => [], - } as any + } as unknown as Parameters[0]['pipelineStore'] const resolver = await resolverPromise const mockApp = createApp({ diff --git a/apps/service/src/lib/createSchemas.ts b/apps/service/src/lib/createSchemas.ts index eb87493c7..f1581eb4d 100644 --- a/apps/service/src/lib/createSchemas.ts +++ b/apps/service/src/lib/createSchemas.ts @@ -75,11 +75,14 @@ export function createSchemas(resolver: ConnectorResolver) { return z.object({ type: z.literal(name), [name]: obj }) }) - // eslint-disable-next-line @typescript-eslint/no-explicit-any const SourceConfig = sourceVariants.length > 0 ? z - .discriminatedUnion('type', sourceVariants as [any, any, ...any[]]) + .discriminatedUnion( + 'type', + // eslint-disable-next-line @typescript-eslint/no-explicit-any + sourceVariants as [any, any, ...any[]] + ) .meta({ id: connectorUnionId('Source') }) : z.object({ type: z.string() }).catchall(z.unknown()) @@ -92,11 +95,14 @@ export function createSchemas(resolver: ConnectorResolver) { return z.object({ type: z.literal(name), [name]: obj }) }) - // eslint-disable-next-line @typescript-eslint/no-explicit-any const DestinationConfig = destVariants.length > 0 ? z - .discriminatedUnion('type', destVariants as [any, any, ...any[]]) + .discriminatedUnion( + 'type', + // eslint-disable-next-line @typescript-eslint/no-explicit-any + destVariants as [any, any, ...any[]] + ) .meta({ id: connectorUnionId('Destination') }) : z.object({ type: z.string() }).catchall(z.unknown()) diff --git a/apps/service/src/lib/stores.ts b/apps/service/src/lib/stores.ts index 537c31bf3..7eeaf5350 100644 --- a/apps/service/src/lib/stores.ts +++ b/apps/service/src/lib/stores.ts @@ -1,4 +1,3 @@ -import type { PipelineConfig } from '@stripe/sync-protocol' import type { Pipeline } from './createSchemas.js' export type { Pipeline } diff --git a/apps/service/src/logger.ts b/apps/service/src/logger.ts index 8f1b9244d..914f672b8 100644 --- a/apps/service/src/logger.ts +++ b/apps/service/src/logger.ts @@ -1,9 +1,3 @@ -import pino from 'pino' +import { createLogger } from '@stripe/sync-logger' -export const logger = pino({ - level: process.env.LOG_LEVEL ?? 'info', - redact: { - paths: ['*.api_key', '*.connection_string', '*.password', '*.url'], - censor: '[redacted]', - }, -}) +export const logger = createLogger({ name: 'service' }) diff --git a/apps/service/src/temporal/activities/pipeline-sync.ts b/apps/service/src/temporal/activities/pipeline-sync.ts index 15d207ba1..00c90a9d1 100644 --- a/apps/service/src/temporal/activities/pipeline-sync.ts +++ b/apps/service/src/temporal/activities/pipeline-sync.ts @@ -1,5 +1,6 @@ import { ApplicationFailure } from '@temporalio/activity' import type { SourceInputMessage, SourceReadOptions } from '@stripe/sync-engine' +import { logger } from '../../logger.js' import type { ActivitiesContext } from './_shared.js' import { asIterable, drainMessages, type RunResult } from './_shared.js' import { classifySyncErrors, summarizeSyncErrors } from '../sync-errors.js' @@ -33,7 +34,8 @@ export function createPipelineSyncActivity(context: ActivitiesContext) { const { transient, permanent } = classifySyncErrors(errors) if (permanent.length > 0) { if (transient.length > 0) { - console.warn( + logger.warn( + { transientCount: transient.length, permanentCount: permanent.length }, `Transient errors suppressed by permanent failures: ${summarizeSyncErrors(transient)}` ) } diff --git a/apps/service/src/temporal/activities/pipeline-teardown.ts b/apps/service/src/temporal/activities/pipeline-teardown.ts index 3f936f004..4680db7b6 100644 --- a/apps/service/src/temporal/activities/pipeline-teardown.ts +++ b/apps/service/src/temporal/activities/pipeline-teardown.ts @@ -1,5 +1,4 @@ import { drain } from '@stripe/sync-protocol' -import type { Message } from '@stripe/sync-protocol' import type { ActivitiesContext } from './_shared.js' diff --git a/apps/service/src/temporal/workflows/google-sheet-pipeline-workflow.ts b/apps/service/src/temporal/workflows/google-sheet-pipeline-workflow.ts index 645ea29fd..55391319c 100644 --- a/apps/service/src/temporal/workflows/google-sheet-pipeline-workflow.ts +++ b/apps/service/src/temporal/workflows/google-sheet-pipeline-workflow.ts @@ -48,7 +48,7 @@ export async function googleSheetPipelineWorkflow( let desiredStatus: DesiredStatus = opts?.desiredStatus ?? 'active' let syncState: SourceState = opts?.syncState ?? { streams: {}, global: {} } let readState: SourceState = opts?.readState ?? syncState - let rowIndexMap: RowIndexMap = opts?.rowIndexMap ?? {} + const rowIndexMap: RowIndexMap = opts?.rowIndexMap ?? {} let catalog: ConfiguredCatalog | undefined = opts?.catalog let state: GoogleSheetWorkflowState = { ...opts?.state } const writeRps = opts?.writeRps diff --git a/apps/supabase/build.mjs b/apps/supabase/build.mjs index 22d9753dd..2226f5072 100644 --- a/apps/supabase/build.mjs +++ b/apps/supabase/build.mjs @@ -82,7 +82,7 @@ await esbuild.build({ format: 'esm', platform: 'node', target: 'node22', - external: ['npm:*', 'esbuild'], + external: ['npm:*', 'esbuild', 'pino', 'pino-pretty'], plugins: [rawTsBundledPlugin], }) diff --git a/apps/supabase/package.json b/apps/supabase/package.json index 02d9c8ba8..32eef6085 100644 --- a/apps/supabase/package.json +++ b/apps/supabase/package.json @@ -23,6 +23,7 @@ "src" ], "dependencies": { + "@stripe/sync-logger": "workspace:*", "@stripe/sync-protocol": "workspace:*", "@stripe/sync-engine": "workspace:*", "@stripe/sync-source-stripe": "workspace:*", diff --git a/apps/supabase/src/edge-functions/stripe-setup.ts b/apps/supabase/src/edge-functions/stripe-setup.ts index 15e3c1966..ebfb4aeff 100644 --- a/apps/supabase/src/edge-functions/stripe-setup.ts +++ b/apps/supabase/src/edge-functions/stripe-setup.ts @@ -7,9 +7,12 @@ */ import { runMigrationsFromContent, migrations } from '@stripe/sync-state-postgres' +import { createLogger } from '@stripe/sync-logger' import Stripe from 'npm:stripe' import pg from 'npm:pg@8' +const logger = createLogger({ name: 'stripe-setup' }) + // --------------------------------------------------------------------------- // Helpers (inlined — edge functions must be self-contained) // --------------------------------------------------------------------------- @@ -130,8 +133,8 @@ async function deleteSecret( }) if (!response.ok && response.status !== 404) { - const text = await response.text() - console.warn(`Failed to delete secret ${secretName}: ${response.status} ${text}`) + await response.text() + logger.warn({ secretName, status: response.status }, `Failed to delete secret`) } } @@ -190,8 +193,8 @@ async function handleSetupPost(req: Request): Promise { // Clear skip_until so cron resumes immediately after reinstall try { await pool.query(`DELETE FROM vault.secrets WHERE name = 'stripe_sync_skip_until'`) - } catch (err) { - console.warn('Could not delete skip_until vault secret:', err) + } catch { + logger.warn('Could not delete skip_until vault secret') } // Clear stale sync state so the new install starts fresh @@ -215,9 +218,9 @@ async function handleSetupPost(req: Request): Promise { for (const old of managedWebhooks.filter((wh) => wh.url !== webhookUrl)) { try { await stripe.webhookEndpoints.del(old.id) - console.log(`Deleted legacy webhook ${old.id} (${old.url})`) + logger.info({ webhookId: old.id }, 'Deleted legacy webhook') } catch (err) { - console.warn(`Could not delete legacy webhook ${old.id}:`, err) + logger.warn({ err, webhookId: old.id }, 'Could not delete legacy webhook') } } @@ -257,13 +260,13 @@ async function handleSetupPost(req: Request): Promise { }) } catch (error: unknown) { const err = error as Error - console.error('Setup error:', error) + logger.error({ err }, 'Setup error') if (pool) { try { await pool.query('SELECT pg_advisory_unlock_all()') await pool.end() - } catch (cleanupErr) { - console.warn('Cleanup failed:', cleanupErr) + } catch { + logger.warn('Cleanup failed') } } return jsonResponse({ success: false, error: err.message }, 500) @@ -307,8 +310,8 @@ async function handleSetupGet(_req: Request): Promise { ORDER BY account_id, started_at DESC `) syncStatus = syncResult.rows - } catch (err) { - console.warn('sync_runs query failed (may not exist yet):', err) + } catch { + logger.warn('sync_runs query failed (may not exist yet)') } } @@ -330,7 +333,7 @@ async function handleSetupGet(_req: Request): Promise { ) } catch (error: unknown) { const err = error as Error - console.error('Status query error:', error) + logger.error({ err }, 'Status query error') return jsonResponse( { error: err.message, @@ -378,14 +381,14 @@ async function handleSetupDelete(req: Request): Promise { if (wh.metadata?.managed_by === 'stripe-sync') { try { await stripe.webhookEndpoints.del(wh.id) - console.log(`Deleted webhook: ${wh.id}`) + logger.info({ webhookId: wh.id }, 'Deleted webhook') } catch (err) { - console.warn(`Could not delete webhook ${wh.id}:`, err) + logger.warn({ err, webhookId: wh.id }, 'Could not delete webhook') } } } } catch (err) { - console.warn(`Could not get webhooks:`, err) + logger.warn({ err }, 'Could not get webhooks') } // Unschedule pg_cron jobs @@ -401,8 +404,8 @@ async function handleSetupDelete(req: Request): Promise { END IF; END $$; `) - } catch (err) { - console.warn('Could not unschedule pg_cron job:', err) + } catch { + logger.warn('Could not unschedule pg_cron job') } // Delete vault secrets @@ -411,16 +414,16 @@ async function handleSetupDelete(req: Request): Promise { DELETE FROM vault.secrets WHERE name IN ('stripe_sync_worker_secret', 'stripe_sigma_worker_secret') `) - } catch (err) { - console.warn('Could not delete vault secret:', err) + } catch { + logger.warn('Could not delete vault secret') } // Drop Sigma self-trigger function if present try { const dropSchema = syncTablesSchemaName.replace(/"/g, '""') await pool.query(`DROP FUNCTION IF EXISTS "${dropSchema}".trigger_sigma_worker()`) - } catch (err) { - console.warn('Could not drop sigma trigger function:', err) + } catch { + logger.warn('Could not drop sigma trigger function') } // Terminate connections holding locks on schema @@ -433,8 +436,8 @@ async function handleSetupDelete(req: Request): Promise { WHERE n.nspname = $1 AND l.pid != pg_backend_pid()`, [syncTablesSchemaName] ) - } catch (err) { - console.warn('Could not terminate connections:', err) + } catch { + logger.warn('Could not terminate connections') } // Drop schema(s) with retry @@ -475,8 +478,8 @@ async function handleSetupDelete(req: Request): Promise { ]) { try { await deleteSecret(projectRef, secretName, accessToken) - } catch (err) { - console.warn(`Could not delete ${secretName} secret:`, err) + } catch { + logger.warn({ secretName }, 'Could not delete secret') } } @@ -491,20 +494,20 @@ async function handleSetupDelete(req: Request): Promise { ]) { try { await deleteEdgeFunction(projectRef, slug, accessToken) - } catch (err) { - console.warn(`Could not delete ${slug} function:`, err) + } catch { + logger.warn({ slug }, 'Could not delete edge function') } } return jsonResponse({ success: true, message: 'Uninstall complete' }) } catch (error: unknown) { const err = error as Error - console.error('Uninstall error:', error) + logger.error({ err }, 'Uninstall error') if (pool) { try { await pool.end() - } catch (cleanupErr) { - console.warn('Cleanup failed:', cleanupErr) + } catch { + logger.warn('Cleanup failed') } } return jsonResponse({ success: false, error: err.message }, 500) diff --git a/apps/supabase/src/edge-functions/stripe-sync.ts b/apps/supabase/src/edge-functions/stripe-sync.ts index 82e741ec4..928531bf8 100644 --- a/apps/supabase/src/edge-functions/stripe-sync.ts +++ b/apps/supabase/src/edge-functions/stripe-sync.ts @@ -15,8 +15,11 @@ import sourceStripe, { DEFAULT_SYNC_OBJECTS, } from '@stripe/sync-source-stripe' import destinationPostgres, { type Config as DestConfig } from '@stripe/sync-destination-postgres' +import { createLogger } from '@stripe/sync-logger' import pg from 'npm:pg@8' +const logger = createLogger({ name: 'stripe-sync' }) + // --------------------------------------------------------------------------- // Helpers (inlined — edge functions must be self-contained) // --------------------------------------------------------------------------- @@ -97,7 +100,7 @@ Deno.serve(async (req) => { const skipUntil = Number(skipRows[0].decrypted_secret) const remaining = Math.round((skipUntil - Date.now()) / 1000) if (skipUntil > Date.now()) { - console.log(`Skipping — skip_until is ${remaining}s in the future`) + logger.info({ remainingSeconds: remaining }, 'Skipping — skip_until in the future') await pool.end() return jsonResponse({ skipped: true, @@ -107,8 +110,8 @@ Deno.serve(async (req) => { // skip_until has passed — delete it and continue with sync await pool.query(`DELETE FROM vault.secrets WHERE name = 'stripe_sync_skip_until'`) } - } catch (err) { - console.warn('Could not read skip_until from vault:', err) + } catch { + logger.warn('Could not read skip_until from vault') } const stateStore = createScopedPgStateStore(pool, schemaName, 'default') @@ -130,12 +133,13 @@ Deno.serve(async (req) => { await pool.query(`SELECT vault.create_secret($1, 'stripe_sync_skip_until')`, [ skipUntilMs, ]) - } catch (err) { - console.warn('Could not write skip_until to vault:', err) + } catch { + logger.warn('Could not write skip_until to vault') } const remainingSec = Math.round(SYNC_INTERVAL - elapsed) - console.log( - `Skipping — all streams complete ${Math.round(elapsed)}s ago, next sync in ${remainingSec}s` + logger.info( + { elapsedSeconds: Math.round(elapsed), nextSyncSeconds: remainingSec }, + 'Skipping — all streams complete, next sync scheduled' ) await pool.end() return jsonResponse({ @@ -175,7 +179,6 @@ Deno.serve(async (req) => { } // Consume setup generator to run migrations/table creation - // eslint-disable-next-line @typescript-eslint/no-unused-vars for await (const _msg of destinationPostgres.setup({ config: destConfig, catalog })) { // setup yields control messages; we don't need them here } @@ -207,8 +210,9 @@ Deno.serve(async (req) => { const elapsed = ((Date.now() - startedAt) / 1000).toFixed(1) await pool.end() - console.log( - `Sync pass done — ${records} rows, ${checkpoints} checkpoints, ${elapsed}s elapsed (${stopReason})` + logger.info( + { records, checkpoints, elapsedSeconds: Number(elapsed), stopReason }, + 'Sync pass done' ) return jsonResponse({ @@ -220,7 +224,7 @@ Deno.serve(async (req) => { }) } catch (error: unknown) { const err = error as Error - console.error('Sync error:', error) + logger.error({ err }, 'Sync error') try { await pool.end() } catch {} diff --git a/apps/supabase/src/edge-functions/stripe-webhook.ts b/apps/supabase/src/edge-functions/stripe-webhook.ts index f31f2bd97..dcc78fbaf 100644 --- a/apps/supabase/src/edge-functions/stripe-webhook.ts +++ b/apps/supabase/src/edge-functions/stripe-webhook.ts @@ -9,6 +9,9 @@ import sourceStripe, { DEFAULT_SYNC_OBJECTS, } from '@stripe/sync-source-stripe' import destinationPostgres, { type Config as DestConfig } from '@stripe/sync-destination-postgres' +import { createLogger } from '@stripe/sync-logger' + +const logger = createLogger({ name: 'stripe-webhook' }) // --------------------------------------------------------------------------- // Helpers (inlined — edge functions must be self-contained) @@ -70,7 +73,6 @@ Deno.serve(async (req) => { yield { body: rawBody, signature: sig } })() ) - // eslint-disable-next-line @typescript-eslint/no-unused-vars for await (const _stateMsg of destinationPostgres.write( { config: destConfig, catalog }, messages @@ -80,7 +82,7 @@ Deno.serve(async (req) => { return jsonResponse({ received: true }) } catch (error: unknown) { const err = error as Error & { type?: string } - console.error('Webhook processing error:', error) + logger.error({ err }, 'Webhook processing error') const isSignatureError = err.message?.includes('signature') || err.type === 'StripeSignatureVerificationError' const status = isSignatureError ? 400 : 500 diff --git a/apps/supabase/src/supabase.ts b/apps/supabase/src/supabase.ts index cadb0c2c8..45c02a86a 100644 --- a/apps/supabase/src/supabase.ts +++ b/apps/supabase/src/supabase.ts @@ -1,8 +1,11 @@ import { SupabaseManagementAPI } from 'supabase-management-js' +import { createLogger } from '@stripe/sync-logger' import { setupFunctionCode, webhookFunctionCode, syncFunctionCode } from './edge-function-code.js' import pkg from '../package.json' with { type: 'json' } import { parseSchemaComment, StripeSchemaComment } from './schemaComment.js' +const logger = createLogger({ name: 'supabase' }) + export interface DeployClientOptions { accessToken: string projectRef: string @@ -483,8 +486,7 @@ export class SupabaseSetupClient { try { await this.invokeFunction('stripe-worker', 'POST', this.workerSecret) } catch (error) { - const errorMessage = error instanceof Error ? error.message : String(error) - console.warn(`Failed to invoke stripe-worker: ${errorMessage}`) + logger.warn({ err: error }, 'Failed to invoke stripe-worker') } } } catch (error) { diff --git a/e2e/connector-loading.test.sh b/e2e/connector-loading.test.sh index 5b42f6a24..c7a333ad8 100755 --- a/e2e/connector-loading.test.sh +++ b/e2e/connector-loading.test.sh @@ -35,6 +35,7 @@ cleanup() { rm -f "$REPO_ROOT"/stripe-sync-ts-cli-*.tgz rm -f "$REPO_ROOT"/stripe-sync-hono-zod-openapi-*.tgz rm -f "$REPO_ROOT"/stripe-sync-integration-supabase-*.tgz + rm -f "$REPO_ROOT"/stripe-sync-logger-*.tgz } trap cleanup EXIT @@ -58,9 +59,10 @@ UTIL_PG_TGZ=$(cd "$REPO_ROOT" && pnpm --filter @stripe/sync-util-postgres pack 2 TSCLI_TGZ=$(cd "$REPO_ROOT" && pnpm --filter @stripe/sync-ts-cli pack 2>/dev/null | tail -1) HONO_ZOD_TGZ=$(cd "$REPO_ROOT" && pnpm --filter @stripe/sync-hono-zod-openapi pack 2>/dev/null | tail -1) SUPABASE_TGZ=$(cd "$REPO_ROOT" && pnpm --filter @stripe/sync-integration-supabase pack 2>/dev/null | tail -1) +LOGGER_TGZ=$(cd "$REPO_ROOT" && pnpm --filter @stripe/sync-logger pack 2>/dev/null | tail -1) for tgz in "$PROTOCOL_TGZ" "$OPENAPI_TGZ" "$ENGINE_TGZ" "$SOURCE_TGZ" "$DEST_TGZ" "$DEST_SHEETS_TGZ" \ - "$STATE_PG_TGZ" "$UTIL_PG_TGZ" "$TSCLI_TGZ" "$HONO_ZOD_TGZ" "$SUPABASE_TGZ"; do + "$STATE_PG_TGZ" "$UTIL_PG_TGZ" "$TSCLI_TGZ" "$HONO_ZOD_TGZ" "$SUPABASE_TGZ" "$LOGGER_TGZ"; do if [ ! -f "$tgz" ]; then echo "FAIL: tarball not found: $tgz" exit 1 @@ -100,14 +102,15 @@ cat > package.json <&1 | tail -5 echo "" diff --git a/eslint.config.mjs b/eslint.config.mjs index 0f493ba21..6c06146be 100644 --- a/eslint.config.mjs +++ b/eslint.config.mjs @@ -13,6 +13,9 @@ const compat = new FlatCompat({ }) export default [ + { + ignores: ['**/__generated__/**'], + }, ...compat.extends('plugin:@typescript-eslint/recommended', 'plugin:prettier/recommended'), { languageOptions: { @@ -22,7 +25,26 @@ export default [ }, rules: { '@typescript-eslint/ban-ts-comment': 'off', - '@typescript-eslint/no-unused-vars': ['error', { argsIgnorePattern: '^_' }], + '@typescript-eslint/no-unused-vars': [ + 'error', + { argsIgnorePattern: '^_', varsIgnorePattern: '^_' }, + ], + 'no-console': 'error', + }, + }, + { + files: [ + '**/*.test.ts', + '**/__tests__/**', + 'e2e/**', + 'scripts/**', + 'docs/**', + '**/scripts/**', + 'apps/visualizer/**', + 'apps/dashboard/**', + ], + rules: { + 'no-console': 'off', }, }, ] diff --git a/packages/destination-google-sheets/package.json b/packages/destination-google-sheets/package.json index d81adbda4..8c87adbce 100644 --- a/packages/destination-google-sheets/package.json +++ b/packages/destination-google-sheets/package.json @@ -15,6 +15,7 @@ }, "scripts": { "build": "tsc", + "lint": "eslint src/", "test": "vitest" }, "files": [ diff --git a/packages/destination-google-sheets/src/index.ts b/packages/destination-google-sheets/src/index.ts index deac75161..5effa32a6 100644 --- a/packages/destination-google-sheets/src/index.ts +++ b/packages/destination-google-sheets/src/index.ts @@ -2,17 +2,14 @@ import type { Destination, DestinationInput } from '@stripe/sync-protocol' import { destinationControlMsg } from '@stripe/sync-protocol' import type { sheets_v4 } from 'googleapis' import { google } from 'googleapis' -import { z } from 'zod' import { - GOOGLE_SHEETS_META_LOG_PREFIX, formatGoogleSheetsMetaLog, - parseGoogleSheetsMetaLog, ROW_KEY_FIELD, ROW_NUMBER_FIELD, serializeRowKey, stripSystemFields, } from './metadata.js' -import defaultSpec, { configSchema } from './spec.js' +import defaultSpec from './spec.js' import type { Config } from './spec.js' import { appendRows, diff --git a/packages/destination-postgres/package.json b/packages/destination-postgres/package.json index d9a605f6e..89c3a07dd 100644 --- a/packages/destination-postgres/package.json +++ b/packages/destination-postgres/package.json @@ -15,6 +15,7 @@ }, "scripts": { "build": "tsc", + "lint": "eslint src/", "test": "vitest" }, "files": [ @@ -22,6 +23,7 @@ "src" ], "dependencies": { + "@stripe/sync-logger": "workspace:*", "@stripe/sync-protocol": "workspace:*", "@stripe/sync-util-postgres": "workspace:*", "pg": "^8.16.3", diff --git a/packages/destination-postgres/src/aws.test.ts b/packages/destination-postgres/src/aws.test.ts index 1bbbaabcf..55f09e5ad 100644 --- a/packages/destination-postgres/src/aws.test.ts +++ b/packages/destination-postgres/src/aws.test.ts @@ -5,7 +5,7 @@ const mockGetAuthToken = vi.fn() vi.mock('@aws-sdk/client-sts', () => ({ STSClient: vi.fn(() => ({ send: mockSend })), - AssumeRoleCommand: vi.fn((params: any) => ({ input: params })), + AssumeRoleCommand: vi.fn((params: Record) => ({ input: params })), })) vi.mock('@aws-sdk/rds-signer', () => ({ diff --git a/packages/destination-postgres/src/aws.ts b/packages/destination-postgres/src/aws.ts index e4db78c5d..5055c59e4 100644 --- a/packages/destination-postgres/src/aws.ts +++ b/packages/destination-postgres/src/aws.ts @@ -30,17 +30,17 @@ async function assumeRole( externalId: string | undefined, region: string ): Promise { - let STSClient: any, AssumeRoleCommand: any + let stsMod: typeof import('@aws-sdk/client-sts') try { - ;({ STSClient, AssumeRoleCommand } = await import('@aws-sdk/client-sts')) + stsMod = await import('@aws-sdk/client-sts') } catch { throw new Error( '@aws-sdk/client-sts is required for AWS IAM auth. Install it: pnpm add @aws-sdk/client-sts' ) } - const sts = new STSClient({ region }) - const command = new AssumeRoleCommand({ + const sts = new stsMod.STSClient({ region }) + const command = new stsMod.AssumeRoleCommand({ RoleArn: roleArn, RoleSessionName: 'sync-engine', ...(externalId ? { ExternalId: externalId } : {}), @@ -71,9 +71,9 @@ async function assumeRole( * Performs an eager validation call on build to surface config errors early. */ export async function buildRdsIamPasswordFn(config: RdsIamConfig): Promise<() => Promise> { - let Signer: any + let signerMod: typeof import('@aws-sdk/rds-signer') try { - ;({ Signer } = await import('@aws-sdk/rds-signer')) + signerMod = await import('@aws-sdk/rds-signer') } catch { throw new Error( '@aws-sdk/rds-signer is required for AWS IAM auth. Install it: pnpm add @aws-sdk/rds-signer' @@ -91,22 +91,19 @@ export async function buildRdsIamPasswordFn(config: RdsIamConfig): Promise<() => cachedCredentials = await assumeRole(config.roleArn, config.externalId, config.region) } - const signerOptions: Record = { + const signer = new signerMod.Signer({ hostname: config.host, port: config.port, username: config.user, region: config.region, - } - - if (cachedCredentials) { - signerOptions.credentials = { - accessKeyId: cachedCredentials.accessKeyId, - secretAccessKey: cachedCredentials.secretAccessKey, - sessionToken: cachedCredentials.sessionToken, - } - } - - const signer = new Signer(signerOptions) + ...(cachedCredentials && { + credentials: { + accessKeyId: cachedCredentials.accessKeyId, + secretAccessKey: cachedCredentials.secretAccessKey, + sessionToken: cachedCredentials.sessionToken, + }, + }), + }) return signer.getAuthToken() } } diff --git a/packages/destination-postgres/src/index.test.ts b/packages/destination-postgres/src/index.test.ts index 9c3289a07..6c1f0e547 100644 --- a/packages/destination-postgres/src/index.test.ts +++ b/packages/destination-postgres/src/index.test.ts @@ -10,7 +10,6 @@ import type { SourceStateMessage, } from '@stripe/sync-protocol' import { collectFirst, drain } from '@stripe/sync-protocol' -import type { Message } from '@stripe/sync-protocol' // --------------------------------------------------------------------------- // Docker Postgres lifecycle diff --git a/packages/destination-postgres/src/index.ts b/packages/destination-postgres/src/index.ts index 8552e8c78..236f89c15 100644 --- a/packages/destination-postgres/src/index.ts +++ b/packages/destination-postgres/src/index.ts @@ -1,6 +1,7 @@ import pg from 'pg' import type { PoolConfig } from 'pg' import type { Destination, DestinationInput, LogMessage } from '@stripe/sync-protocol' +import { createConnectorLogger } from '@stripe/sync-logger' import { sql, sslConfigFromConnectionString, @@ -13,6 +14,8 @@ import { buildCreateTableDDL } from './schemaProjection.js' import defaultSpec from './spec.js' import type { Config } from './spec.js' +const logger = createConnectorLogger('destination-postgres') + function logMsg(message: string, level: LogMessage['log']['level'] = 'info'): LogMessage { return { type: 'log', log: { level, message } } } @@ -109,7 +112,7 @@ function createPool(config: PoolConfig): pg.Pool { const pool = new pg.Pool(config) // Destination connectors should surface pool failures without crashing the host process. pool.on('error', (err) => { - console.error('Postgres destination pool error:', err) + logger.error({ err }, 'Postgres destination pool error') }) return pool } diff --git a/packages/hono-zod-openapi/package.json b/packages/hono-zod-openapi/package.json index 1350690ed..261b73b94 100644 --- a/packages/hono-zod-openapi/package.json +++ b/packages/hono-zod-openapi/package.json @@ -14,6 +14,7 @@ "scripts": { "build": "tsc", "dev": "tsc --watch", + "lint": "eslint src/", "test": "vitest --passWithNoTests" }, "files": [ diff --git a/packages/hono-zod-openapi/src/__tests__/json-content-header.test.ts b/packages/hono-zod-openapi/src/__tests__/json-content-header.test.ts index c57729ecc..f0a1015a7 100644 --- a/packages/hono-zod-openapi/src/__tests__/json-content-header.test.ts +++ b/packages/hono-zod-openapi/src/__tests__/json-content-header.test.ts @@ -189,10 +189,11 @@ describe('JSON content header — OAS spec', () => { const app = createTestApp() const spec = app.getOpenAPI31Document({ info: { title: 'test', version: '1' }, + // eslint-disable-next-line @typescript-eslint/no-explicit-any }) as any const params = spec.paths['/test'].post.parameters - const xData = params.find((p: any) => p.name === 'x-data') + const xData = params.find((p: { name: string }) => p.name === 'x-data') expect(xData).toBeDefined() expect(xData.in).toBe('header') @@ -222,10 +223,11 @@ describe('JSON content header — OAS spec', () => { const spec = app.getOpenAPI31Document({ info: { title: 'test', version: '1' }, + // eslint-disable-next-line @typescript-eslint/no-explicit-any }) as any const params = spec.paths['/plain'].get.parameters - const xToken = params.find((p: any) => p.name === 'x-token') + const xToken = params.find((p: { name: string }) => p.name === 'x-token') expect(xToken.schema).toBeDefined() expect(xToken.content).toBeUndefined() }) @@ -234,6 +236,7 @@ describe('JSON content header — OAS spec', () => { const app = createTestApp() const spec = app.getOpenAPI31Document({ info: { title: 'test', version: '1' }, + // eslint-disable-next-line @typescript-eslint/no-explicit-any }) as any // Item schema should appear as a named component @@ -243,7 +246,7 @@ describe('JSON content header — OAS spec', () => { // The header content schema should use $ref const params = spec.paths['/test'].post.parameters - const xData = params.find((p: any) => p.name === 'x-data') + const xData = params.find((p: { name: string }) => p.name === 'x-data') expect(xData.content['application/json'].schema.$ref).toBe('#/components/schemas/Item') }) }) diff --git a/packages/hono-zod-openapi/src/index.ts b/packages/hono-zod-openapi/src/index.ts index 4790ae700..352103fb8 100644 --- a/packages/hono-zod-openapi/src/index.ts +++ b/packages/hono-zod-openapi/src/index.ts @@ -17,15 +17,7 @@ import { Hono } from 'hono' import { zValidator } from '@hono/zod-validator' import { createDocument, createSchema } from 'zod-openapi' import type { Hook } from '@hono/zod-validator' -import type { - Context, - Env, - Handler, - Input, - MiddlewareHandler, - Schema, - ValidationTargets, -} from 'hono' +import type { Env, Handler, Input, MiddlewareHandler, Schema, ValidationTargets } from 'hono' import type { ZodOpenApiOperationObject, ZodOpenApiObject, @@ -246,6 +238,7 @@ function processJsonContentHeaders(op: ZodOpenApiOperationObject): { export class OpenAPIHono< E extends Env = Env, + // eslint-disable-next-line @typescript-eslint/no-empty-object-type S extends Schema = {}, BasePath extends string = '/', > extends Hono { diff --git a/packages/logger/package.json b/packages/logger/package.json new file mode 100644 index 000000000..020d73cf4 --- /dev/null +++ b/packages/logger/package.json @@ -0,0 +1,30 @@ +{ + "name": "@stripe/sync-logger", + "version": "0.1.0", + "private": true, + "type": "module", + "exports": { + ".": { + "bun": "./src/index.ts", + "types": "./dist/index.d.ts", + "import": "./dist/index.js" + } + }, + "scripts": { + "build": "tsc", + "lint": "eslint src/", + "test": "vitest --passWithNoTests" + }, + "dependencies": { + "pino": "^10" + }, + "devDependencies": { + "@types/node": "^24.5.0", + "pino-pretty": "^13", + "vitest": "^3.2.1" + }, + "files": [ + "dist", + "src" + ] +} diff --git a/packages/logger/src/index.ts b/packages/logger/src/index.ts new file mode 100644 index 000000000..c600e069f --- /dev/null +++ b/packages/logger/src/index.ts @@ -0,0 +1,8 @@ +export { + createLogger, + createConnectorLogger, + REQUEST_HEADER_REDACT, + type CreateLoggerOptions, + type Logger, +} from './logger.js' +export { REDACT_PATHS, REDACT_CENSOR, SECRET_PATTERNS, scrubSecrets } from './redaction.js' diff --git a/packages/logger/src/logger.test.ts b/packages/logger/src/logger.test.ts new file mode 100644 index 000000000..b8f1f3c27 --- /dev/null +++ b/packages/logger/src/logger.test.ts @@ -0,0 +1,453 @@ +import { Writable } from 'node:stream' +import { describe, it, expect, beforeEach } from 'vitest' +import { createLogger, createConnectorLogger, type Logger } from './logger.js' +import { REDACT_CENSOR, scrubSecrets, REDACT_PATHS } from './redaction.js' + +// --------------------------------------------------------------------------- +// Helpers +// --------------------------------------------------------------------------- + +function createCapture(): { stream: Writable; lines: () => Record[] } { + const chunks: string[] = [] + const stream = new Writable({ + write(chunk, _encoding, cb) { + chunks.push(chunk.toString()) + cb() + }, + }) + return { + stream, + lines: () => + chunks + .join('') + .split('\n') + .filter((l) => l.trim()) + .map((l) => JSON.parse(l) as Record), + } +} + +function logAndCapture(logger: Logger, fn: (l: Logger) => void) { + fn(logger) +} + +// --------------------------------------------------------------------------- +// scrubSecrets +// --------------------------------------------------------------------------- + +describe('scrubSecrets', () => { + it('preserves URL structure when scrubbing credentials', () => { + expect(scrubSecrets('postgres://admin:s3cret@db.host.com:5432/mydb')).toBe( + `postgres://${REDACT_CENSOR}@db.host.com:5432/mydb` + ) + }) + + it('leaves clean text unchanged', () => { + expect(scrubSecrets('Sync completed: 42 records in 3.2s')).toBe( + 'Sync completed: 42 records in 3.2s' + ) + }) + + it.each([ + ['Stripe live key', 'sk_live_abc123XYZ789012'], + ['Stripe test key', 'sk_test_51HnGDhKJ3xyz'], + ['Stripe restricted key', 'rk_live_longkeyvalue1234'], + ['webhook signing secret', 'whsec_abc123XYZ789012'], + ['Bearer token', 'Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.payload.sig'], + ['URL credentials', 'postgres://admin:s3cret@db.host.com:5432/mydb'], + [ + 'Supabase JWT key', + 'eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpc3MiOiJzdXBhYmFzZSIsInJvbGUiOiJhbm9uIn0.abcdefghijklmnopqrstuvwx', + ], + ['AWS access key', 'AKIAIOSFODNN7EXAMPLE'], + ['GitHub PAT', 'ghp_ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghij'], + ['GitHub OAuth token', 'gho_ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghij'], + ['GitHub fine-grained PAT', 'github_pat_11AABBC_xyzXYZxyzXYZxy'], + ])('scrubs %s', (_label, secret) => { + const text = `error: ${secret} leaked` + const scrubbed = scrubSecrets(text) + expect(scrubbed).not.toContain(secret) + expect(scrubbed).toContain(REDACT_CENSOR) + }) +}) + +// --------------------------------------------------------------------------- +// REDACT_PATHS coverage +// --------------------------------------------------------------------------- + +describe('REDACT_PATHS', () => { + it('includes top-level and nested paths for every sensitive key', () => { + expect(REDACT_PATHS).toContain('api_key') + expect(REDACT_PATHS).toContain('*.api_key') + expect(REDACT_PATHS).toContain('connection_string') + expect(REDACT_PATHS).toContain('*.connection_string') + }) + + it('includes newly added keys', () => { + for (const key of ['credentials', 'cookie', 'private_key', 'stripe_key', 'dsn']) { + expect(REDACT_PATHS).toContain(key) + expect(REDACT_PATHS).toContain(`*.${key}`) + } + }) + + it('does not include bare *.data (overly broad)', () => { + expect(REDACT_PATHS).not.toContain('*.data') + expect(REDACT_PATHS).not.toContain('data') + }) + + it('includes record.data for synced payloads', () => { + expect(REDACT_PATHS).toContain('record.data') + expect(REDACT_PATHS).toContain('*.record.data') + }) +}) + +// --------------------------------------------------------------------------- +// createLogger — redaction +// --------------------------------------------------------------------------- + +describe('createLogger redaction', () => { + let capture: ReturnType + let logger: Logger + + beforeEach(() => { + capture = createCapture() + logger = createLogger({ destination: capture.stream, pretty: false, level: 'debug' }) + }) + + it('redacts top-level sensitive fields', () => { + logAndCapture(logger, (l) => l.info({ api_key: 'sk_test_123', msg_text: 'hello' }, 'test')) + logger.flush() + const out = capture.lines() + expect(out).toHaveLength(1) + expect(out[0].api_key).toBe(REDACT_CENSOR) + }) + + it('redacts nested sensitive fields', () => { + logAndCapture(logger, (l) => l.info({ config: { secret_key: 'whsec_abc123' } }, 'nested test')) + logger.flush() + const out = capture.lines() + expect(out).toHaveLength(1) + const config = out[0].config as Record + expect(config.secret_key).toBe(REDACT_CENSOR) + }) + + it('redacts record.data payloads', () => { + logAndCapture(logger, (l) => + l.info({ record: { data: { name: 'Alice', email: 'a@b.com' } } }, 'record') + ) + logger.flush() + const out = capture.lines() + const record = out[0].record as Record + expect(record.data).toBe(REDACT_CENSOR) + }) + + it('does not redact non-sensitive fields named "data" at top level', () => { + logAndCapture(logger, (l) => + l.info({ context: { data: { count: 5 } } }, 'should not be redacted') + ) + logger.flush() + const out = capture.lines() + const context = out[0].context as Record + expect(context.data).toEqual({ count: 5 }) + }) + + it('merges custom redactPaths with defaults', () => { + const custom = createCapture() + const customLogger = createLogger({ + destination: custom.stream, + pretty: false, + redactPaths: ['*.custom_secret'], + }) + customLogger.info({ config: { custom_secret: 'hidden' } }, 'custom') + customLogger.flush() + const out = custom.lines() + const config = out[0].config as Record + expect(config.custom_secret).toBe(REDACT_CENSOR) + }) +}) + +// --------------------------------------------------------------------------- +// createLogger — string value scrubbing (final safety hook) +// --------------------------------------------------------------------------- + +describe('createLogger string value scrubbing', () => { + let capture: ReturnType + let logger: Logger + + beforeEach(() => { + capture = createCapture() + logger = createLogger({ destination: capture.stream, pretty: false }) + }) + + it('scrubs secrets from arbitrary string fields', () => { + logAndCapture(logger, (l) => + l.error({ error: 'connection to postgres://admin:pass123@host:5432/db failed' }, 'db error') + ) + logger.flush() + const out = capture.lines() + expect(out[0].error as string).not.toContain('pass123') + expect(out[0].error as string).not.toContain('admin') + }) + + it('scrubs Stripe keys from any string field', () => { + logAndCapture(logger, (l) => + l.warn({ detail: 'Invalid key sk_test_51HnGDhKJ3xyzABC' }, 'auth failure') + ) + logger.flush() + const out = capture.lines() + expect(out[0].detail as string).not.toContain('sk_test_') + expect(out[0].detail as string).toContain(REDACT_CENSOR) + }) + + it('scrubs secrets from message strings', () => { + logAndCapture(logger, (l) => + l.error('connection to postgres://admin:pass123@host:5432/db failed') + ) + logger.flush() + const out = capture.lines() + expect(out[0].msg as string).not.toContain('pass123') + expect(out[0].msg as string).not.toContain('admin') + expect(out[0].msg as string).toContain(REDACT_CENSOR) + }) + + it('scrubs secrets from nested string fields', () => { + logAndCapture(logger, (l) => + l.error( + { + request: { + url: 'postgres://admin:pass123@host:5432/db', + headers: ['Bearer abcdefghijklmnopqrstuvwxyz123456'], + }, + }, + 'nested secret' + ) + ) + logger.flush() + const out = capture.lines() + const request = out[0].request as Record + expect(request.url).toBe(`postgres://${REDACT_CENSOR}@host:5432/db`) + expect(request.headers).toEqual([REDACT_CENSOR]) + }) + + it('does not modify non-string fields', () => { + logAndCapture(logger, (l) => l.info({ count: 42, active: true }, 'stats')) + logger.flush() + const out = capture.lines() + expect(out[0].count).toBe(42) + expect(out[0].active).toBe(true) + }) + + it('leaves clean strings unchanged', () => { + logAndCapture(logger, (l) => l.info({ status: 'ok', region: 'us-east-1' }, 'healthy')) + logger.flush() + const out = capture.lines() + expect(out[0].status).toBe('ok') + expect(out[0].region).toBe('us-east-1') + }) +}) + +// --------------------------------------------------------------------------- +// createLogger — structured serializers +// --------------------------------------------------------------------------- + +describe('createLogger structured serializers', () => { + let capture: ReturnType + let logger: Logger + + beforeEach(() => { + capture = createCapture() + logger = createLogger({ destination: capture.stream, pretty: false, level: 'debug' }) + }) + + it('allowlists and redacts request headers', () => { + logAndCapture(logger, (l) => + l.debug( + { + request_headers: { + authorization: 'Bearer abcdefghijklmnopqrstuvwxyz123456', + cookie: 'session=top-secret', + 'content-type': 'application/json', + referer: 'https://example.com/?token=hidden', + 'x-custom': 'left-out', + }, + }, + 'request headers' + ) + ) + logger.flush() + const out = capture.lines() + expect(out[0].request_headers).toEqual({ + authorization: REDACT_CENSOR, + cookie: REDACT_CENSOR, + 'content-type': 'application/json', + omitted_header_count: 2, + }) + }) +}) + +// --------------------------------------------------------------------------- +// createLogger — error serializer +// --------------------------------------------------------------------------- + +describe('createLogger error serializer', () => { + let capture: ReturnType + let logger: Logger + + beforeEach(() => { + capture = createCapture() + logger = createLogger({ destination: capture.stream, pretty: false }) + }) + + it('scrubs Stripe keys from error messages', () => { + const err = new Error('Invalid API key sk_test_51HnGDhKJ3xyzABC provided') + logAndCapture(logger, (l) => l.error({ err }, 'request failed')) + logger.flush() + const out = capture.lines() + const serialized = out[0].err as Record + expect(serialized.message).not.toContain('sk_test_') + expect(serialized.message).toContain(REDACT_CENSOR) + }) + + it('scrubs connection strings from error messages', () => { + const err = new Error('connection to postgres://admin:pass123@host:5432/db failed') + logAndCapture(logger, (l) => l.error({ err }, 'db error')) + logger.flush() + const out = capture.lines() + const serialized = out[0].err as Record + expect(serialized.message as string).not.toContain('pass123') + expect(serialized.message as string).not.toContain('admin') + }) + + it('scrubs enumerable string properties on the error', () => { + const err = Object.assign(new Error('fail'), { + detail: 'Bearer abcdefghijklmnopqrstuvwxyz123456', + meta: { url: 'postgres://admin:pass123@host:5432/db' }, + }) + logAndCapture(logger, (l) => l.error({ err }, 'request failed')) + logger.flush() + const out = capture.lines() + const serialized = out[0].err as Record + expect(serialized.detail).toBe(REDACT_CENSOR) + expect(serialized.meta).toEqual({ url: `postgres://${REDACT_CENSOR}@host:5432/db` }) + }) + + it('bounds deep recursive scrubbing on error properties', () => { + let nested: Record = { secret: 'sk_test_51HnGDhKJ3xyzABC' } + for (const k of ['f', 'e', 'd', 'c', 'b', 'a']) nested = { [k]: nested } + const err = Object.assign(new Error('fail'), { nested }) + logAndCapture(logger, (l) => l.error({ err }, 'depth bound')) + logger.flush() + const serialized = capture.lines()[0].err as Record + expect(serialized.nested).toEqual({ + a: { b: { c: { d: { e: { f: '[Truncated: depth limit]' } } } } }, + }) + }) + + it('summarizes binary error properties instead of logging their contents', () => { + const err = Object.assign(new Error('fail'), { + payload: Buffer.from('secret'), + }) + logAndCapture(logger, (l) => l.error({ err }, 'binary payload')) + logger.flush() + const out = capture.lines() + const serialized = out[0].err as Record + expect(serialized.payload).toEqual({ + type: 'Buffer', + byte_length: 6, + data_redacted: true, + }) + }) + + it('preserves extra enumerable properties on the error', () => { + const err = Object.assign(new Error('fail'), { code: 'ECONNREFUSED', port: 5432 }) + logAndCapture(logger, (l) => l.error({ err }, 'connection error')) + logger.flush() + const out = capture.lines() + const serialized = out[0].err as Record + expect(serialized.code).toBe('ECONNREFUSED') + expect(serialized.port).toBe(5432) + expect(serialized.type).toBe('Error') + }) +}) + +describe('createLogger pretty env parsing', () => { + it.each(['0', 'false'])('treats LOG_PRETTY=%s as disabled', (value) => { + const origEnv = process.env.LOG_PRETTY + const capture = createCapture() + try { + process.env.LOG_PRETTY = value + const logger = createLogger({ destination: capture.stream }) + logger.info({ ok: true }, 'hello') + logger.flush() + expect(capture.lines()).toHaveLength(1) + } finally { + if (origEnv === undefined) { + delete process.env.LOG_PRETTY + } else { + process.env.LOG_PRETTY = origEnv + } + } + }) +}) + +// --------------------------------------------------------------------------- +// createConnectorLogger +// --------------------------------------------------------------------------- + +describe('createConnectorLogger', () => { + it('creates a named logger', () => { + const logger = createConnectorLogger('test-connector') + expect(logger).toBeDefined() + }) + + it('is not affected by LOG_PRETTY env var', () => { + const origEnv = process.env.LOG_PRETTY + try { + process.env.LOG_PRETTY = '1' + const logger = createConnectorLogger('test-connector') + expect(logger).toBeDefined() + } finally { + if (origEnv === undefined) { + delete process.env.LOG_PRETTY + } else { + process.env.LOG_PRETTY = origEnv + } + } + }) +}) + +// --------------------------------------------------------------------------- +// createLogger — named logger output +// --------------------------------------------------------------------------- + +describe('createLogger named logger output', () => { + it('outputs valid NDJSON with the configured name', () => { + const capture = createCapture() + const logger = createLogger({ + name: 'test-connector', + destination: capture.stream, + pretty: false, + }) + logger.info({ stream: 'customers' }, 'sync started') + logger.flush() + const out = capture.lines() + expect(out).toHaveLength(1) + expect(out[0].name).toBe('test-connector') + expect(out[0].stream).toBe('customers') + expect(out[0].msg).toBe('sync started') + expect(out[0].level).toBe(30) + }) + + it('redacts secrets in named logger output', () => { + const capture = createCapture() + const logger = createLogger({ + name: 'test-connector', + destination: capture.stream, + pretty: false, + }) + logger.error({ api_key: 'sk_test_secret123' }, 'auth failed') + logger.flush() + const out = capture.lines() + expect(out[0].api_key).toBe(REDACT_CENSOR) + expect(JSON.stringify(out[0])).not.toContain('sk_test_secret123') + }) +}) diff --git a/packages/logger/src/logger.ts b/packages/logger/src/logger.ts new file mode 100644 index 000000000..61db9dd53 --- /dev/null +++ b/packages/logger/src/logger.ts @@ -0,0 +1,163 @@ +import pino, { type Logger, type LoggerOptions } from 'pino' +import { + REDACT_CENSOR, + REDACT_PATHS, + scrubSecrets, + scrubValue, + isPlainObject, +} from './redaction.js' + +export type { Logger } + +export type CreateLoggerOptions = { + name?: string + level?: string + /** + * Additional redaction paths beyond the defaults. Uses fast-redact syntax. + * Note: default paths only cover depth 0 and 1. For deeper nesting, supply + * explicit paths here (e.g. `['outer.inner.api_key']`). + */ + redactPaths?: string[] + /** Enable pretty-printing (reads LOG_PRETTY env var by default) */ + pretty?: boolean + /** Pino destination — defaults to stdout (fd 1) */ + destination?: pino.DestinationStream | number +} + +const REQUEST_HEADER_ALLOWLIST = new Set([ + 'accept', + 'accept-encoding', + 'accept-language', + 'content-length', + 'content-type', + 'host', + 'traceparent', + 'tracestate', + 'user-agent', + 'x-request-id', +]) + +export const REQUEST_HEADER_REDACT = new Set([ + 'authorization', + 'cookie', + 'proxy-authorization', + 'set-cookie', + 'x-api-key', +]) + +function parseBooleanEnv(value: string | undefined): boolean { + if (value == null) return false + switch (value.trim().toLowerCase()) { + case '1': + case 'true': + case 'yes': + case 'on': + return true + default: + return false + } +} + +function errSerializer(err: Error): Record { + const obj: Record = { + type: err.constructor?.name ?? 'Error', + message: scrubSecrets(err.message), + ...(err.stack ? { stack: scrubSecrets(err.stack) } : {}), + } + for (const key of Object.keys(err)) { + if (key === 'message' || key === 'stack' || key === 'type') continue + obj[key] = scrubValue((err as unknown as Record)[key]) + } + return obj +} + +function requestHeadersSerializer(headers: unknown): Record { + if (!isPlainObject(headers)) { + return { value: scrubValue(headers) } + } + + const out: Record = {} + let omittedHeaderCount = 0 + + for (const [rawKey, rawValue] of Object.entries(headers)) { + const key = rawKey.toLowerCase() + if (REQUEST_HEADER_REDACT.has(key)) { + out[key] = REDACT_CENSOR + continue + } + if (!REQUEST_HEADER_ALLOWLIST.has(key)) { + omittedHeaderCount += 1 + continue + } + out[key] = scrubValue(rawValue) + } + + if (omittedHeaderCount > 0) { + out.omitted_header_count = omittedHeaderCount + } + + return out +} + +/** + * Create a structured logger with PII redaction built in. + * All sync-engine packages should use this instead of raw console calls. + * If logs are exported to a collector, keep downstream redaction enabled as a final safety net. + */ +export function createLogger(opts: CreateLoggerOptions = {}): Logger { + const level = opts.level ?? process.env.LOG_LEVEL ?? 'info' + const pretty = opts.pretty ?? parseBooleanEnv(process.env.LOG_PRETTY) + + const redactPaths = [...REDACT_PATHS, ...(opts.redactPaths ?? [])] + + const options: LoggerOptions = { + level, + redact: { + paths: redactPaths, + censor: REDACT_CENSOR, + }, + serializers: { + err: errSerializer, + request_headers: requestHeadersSerializer, + }, + hooks: { + // Final pass safety net for message strings and any secrets that escaped structured redaction. + streamWrite: scrubSecrets, + }, + ...(opts.name ? { name: opts.name } : {}), + } + + if (pretty) { + if (typeof opts.destination === 'number') { + options.transport = { + target: 'pino-pretty', + options: { destination: opts.destination }, + } + return pino(options) + } + // pino ignores transport when a destination stream is provided, + // so pretty-printing only works with fd-based destinations. + options.transport = { target: 'pino-pretty' } + return pino(options) + } + + if (opts.destination != null) { + return pino( + options, + typeof opts.destination === 'number' ? pino.destination(opts.destination) : opts.destination + ) + } + + return pino(options) +} + +const STDERR_FD = 2 + +/** + * Create a logger that writes structured JSON to stderr. + * Designed for subprocess connectors where stdout is the NDJSON data stream. + * Pretty-printing is always disabled to avoid corrupting the NDJSON stream on stdout. + */ +export function createConnectorLogger(name: string): Logger { + return createLogger({ name, destination: STDERR_FD, pretty: false }) +} diff --git a/packages/logger/src/redaction.ts b/packages/logger/src/redaction.ts new file mode 100644 index 000000000..9bb50839c --- /dev/null +++ b/packages/logger/src/redaction.ts @@ -0,0 +1,148 @@ +const SENSITIVE_KEYS = [ + 'api_key', + 'apiKey', + 'secret', + 'secret_key', + 'secretKey', + 'token', + 'access_token', + 'accessToken', + 'refresh_token', + 'refreshToken', + 'password', + 'authorization', + 'webhook_secret', + 'webhookSecret', + 'connection_string', + 'connectionString', + 'database_url', + 'databaseUrl', + 'credentials', + 'cookie', + 'cookies', + 'private_key', + 'privateKey', + 'stripe_key', + 'stripeKey', + 'supabase_key', + 'supabaseKey', + 'service_role_key', + 'serviceRoleKey', + 'dsn', +] + +/** + * `fast-redact` paths for depth 0 and 1 only (`key`, `*.key`). + * Depth 2+ secrets are caught by `SECRET_PATTERNS` via `streamWrite` + * or registered serializers. For deeper paths, pass custom `redactPaths`. + */ +export const REDACT_PATHS: string[] = [ + ...SENSITIVE_KEYS, + ...SENSITIVE_KEYS.map((k) => `*.${k}`), + + // Synced record payloads + 'record.data', + '*.record.data', + 'request_body', + '*.request_body', + 'requestBody', + '*.requestBody', + 'response_body', + '*.response_body', + 'responseBody', + '*.responseBody', +] + +export const REDACT_CENSOR = '[REDACTED]' + +/** Regexes for secrets in free text. Anchor with `\b`; must use the `g` flag. */ +export const SECRET_PATTERNS: RegExp[] = [ + /\b[sr]k_(live|test)_[A-Za-z0-9]{10,}\b/g, // Stripe API / restricted keys + /\bwhsec_[A-Za-z0-9]{10,}\b/g, // Stripe webhook secrets + /Bearer\s+[A-Za-z0-9._\-]{20,}/gi, // Bearer tokens + /\beyJ[A-Za-z0-9_-]{20,}\.[A-Za-z0-9_-]{20,}\.[A-Za-z0-9_-]{20,}\b/g, // JWTs + /\bAKIA[0-9A-Z]{16}\b/g, // AWS access key IDs + /\bgh[pso]_[A-Za-z0-9]{36,}\b/g, // GitHub tokens + /\bgithub_pat_[A-Za-z0-9_]{22,}\b/g, +] + +// URL credentials — preserves URL structure +const URL_CREDS_PATTERN = /:\/\/([^/\s]+):([^/\s]+)@/g + +export function scrubSecrets(text: string): string { + let result = text + for (const pattern of SECRET_PATTERNS) { + pattern.lastIndex = 0 + result = result.replace(pattern, REDACT_CENSOR) + } + URL_CREDS_PATTERN.lastIndex = 0 + result = result.replace(URL_CREDS_PATTERN, `://${REDACT_CENSOR}@`) + return result +} + +// Deep value scrubbing (safety net for logger serializers) + +function isNonNullObject(value: unknown): value is object { + return value != null && typeof value === 'object' +} + +export function isPlainObject(value: unknown): value is Record { + if (!isNonNullObject(value) || Array.isArray(value)) return false + const proto = Object.getPrototypeOf(value) + return proto === Object.prototype || proto === null +} + +const MAX_SCRUB_DEPTH = 6 +const MAX_SCRUB_NODES = 500 + +function summarizeBinary(type: string, byteLength: number): Record { + return { type, byte_length: byteLength, data_redacted: true } +} + +/** Recursively scrub secrets from an arbitrary value. */ +export function scrubValue( + value: unknown, + state: { nodes: number; seen: WeakSet } = { nodes: 0, seen: new WeakSet() }, + depth = 0 +): unknown { + if (typeof value === 'string') return scrubSecrets(value) + if (value == null || typeof value !== 'object') return value + + if (Buffer.isBuffer(value)) return summarizeBinary('Buffer', value.byteLength) + if (value instanceof ArrayBuffer) return summarizeBinary('ArrayBuffer', value.byteLength) + if (ArrayBuffer.isView(value)) return summarizeBinary(value.constructor.name, value.byteLength) + if (value instanceof Date) return value.toISOString() + if (value instanceof URL || value instanceof URLSearchParams) { + return scrubSecrets(value.toString()) + } + if (value instanceof Map) return { type: 'Map', size: value.size, data_redacted: true } + if (value instanceof Set) return { type: 'Set', size: value.size, data_redacted: true } + + if (state.seen.has(value)) return '[Circular]' + if (depth >= MAX_SCRUB_DEPTH) return '[Truncated: depth limit]' + state.nodes += 1 + if (state.nodes > MAX_SCRUB_NODES) return '[Truncated: node limit]' + state.seen.add(value) + + if (Array.isArray(value)) { + return value.map((item) => scrubValue(item, state, depth + 1)) + } + + if (!isPlainObject(value)) { + const json = (value as { toJSON?: () => unknown }).toJSON + if (typeof json === 'function') { + try { + return scrubValue(json.call(value), state, depth + 1) + } catch { + return `[${value.constructor?.name ?? 'Object'}]` + } + } + return `[${value.constructor?.name ?? 'Object'}]` + } + + const out: Record = {} + for (const [key, entry] of Object.entries(value)) { + out[key] = scrubValue(entry, state, depth + 1) + } + return out +} diff --git a/packages/logger/tsconfig.json b/packages/logger/tsconfig.json new file mode 100644 index 000000000..2481fe545 --- /dev/null +++ b/packages/logger/tsconfig.json @@ -0,0 +1,9 @@ +{ + "extends": "../../tsconfig.base.json", + "compilerOptions": { + "outDir": "dist", + "rootDir": "src" + }, + "include": ["src/**/*"], + "exclude": ["src/**/*.test.ts", "src/**/__tests__/**"] +} diff --git a/packages/openapi/package.json b/packages/openapi/package.json index 4d48f3986..f4cb2092c 100644 --- a/packages/openapi/package.json +++ b/packages/openapi/package.json @@ -12,6 +12,7 @@ }, "scripts": { "build": "tsc && cp -r oas dist/", + "lint": "eslint src/", "test": "vitest --passWithNoTests" }, "files": [ diff --git a/packages/protocol/package.json b/packages/protocol/package.json index b24974f9a..fca33bce9 100644 --- a/packages/protocol/package.json +++ b/packages/protocol/package.json @@ -17,6 +17,7 @@ }, "scripts": { "build": "tsc", + "lint": "eslint src/", "test": "vitest --passWithNoTests" }, "dependencies": { diff --git a/packages/protocol/src/async-iterable-utils.ts b/packages/protocol/src/async-iterable-utils.ts index d36037753..809e700e6 100644 --- a/packages/protocol/src/async-iterable-utils.ts +++ b/packages/protocol/src/async-iterable-utils.ts @@ -25,7 +25,7 @@ export function channel(): AsyncIterable & { if (pending.length > 0) { return Promise.resolve({ value: pending.shift()!, done: false }) } - if (done) return Promise.resolve({ value: undefined as any, done: true }) + if (done) return Promise.resolve({ value: undefined, done: true } as IteratorResult) return new Promise>((r) => { resolve = r }) @@ -48,7 +48,7 @@ export function channel(): AsyncIterable & { if (resolve) { const r = resolve resolve = null - r({ value: undefined as any, done: true }) + r({ value: undefined, done: true } as IteratorResult) } }, }) diff --git a/packages/protocol/src/protocol.ts b/packages/protocol/src/protocol.ts index 465dd6f55..ae1f5416a 100644 --- a/packages/protocol/src/protocol.ts +++ b/packages/protocol/src/protocol.ts @@ -496,6 +496,7 @@ export type TeardownOutput = z.infer */ export interface Source< TConfig extends Record = Record, + // eslint-disable-next-line @typescript-eslint/no-unused-vars TStreamState = unknown, TInput = unknown, > { diff --git a/packages/source-stripe/package.json b/packages/source-stripe/package.json index e45c93dd6..c5de77b27 100644 --- a/packages/source-stripe/package.json +++ b/packages/source-stripe/package.json @@ -20,6 +20,7 @@ }, "scripts": { "build": "tsc", + "lint": "eslint src/", "test": "vitest" }, "files": [ @@ -27,6 +28,7 @@ "src" ], "dependencies": { + "@stripe/sync-logger": "workspace:*", "@stripe/sync-openapi": "workspace:*", "@stripe/sync-protocol": "workspace:*", "https-proxy-agent": "^7.0.6", diff --git a/packages/source-stripe/src/client.ts b/packages/source-stripe/src/client.ts index 655969243..3df31c0b7 100644 --- a/packages/source-stripe/src/client.ts +++ b/packages/source-stripe/src/client.ts @@ -8,9 +8,12 @@ import { type StripeWebhookEndpoint, } from '@stripe/sync-openapi' import { withHttpRetry } from './retry.js' +import { createConnectorLogger } from '@stripe/sync-logger' import { stripeEventSchema, type StripeEvent } from './spec.js' import { fetchWithProxy, parsePositiveInteger, type TransportEnv } from './transport.js' +const logger = createConnectorLogger('source-stripe') + export type StripeClientConfig = { api_key: string api_version: string @@ -32,6 +35,8 @@ export function makeClient(config: StripeClientConfig, env: TransportEnv = proce env.STRIPE_REQUEST_TIMEOUT_MS, 10_000 ) + const logRequests = env.STRIPE_LOG_REQUESTS === '1' + const headers: Record = { Authorization: `Bearer ${config.api_key}`, 'Content-Type': 'application/x-www-form-urlencoded', @@ -52,6 +57,11 @@ export function makeClient(config: StripeClientConfig, env: TransportEnv = proce body = encodeFormData(params) } + if (logRequests) { + logger.info({ method, path, apiVersion: config.api_version }, 'Stripe API request started') + } + + const start = Date.now() const response = await fetchWithProxy( url.toString(), { @@ -65,6 +75,20 @@ export function makeClient(config: StripeClientConfig, env: TransportEnv = proce const json = (await response.json()) as unknown + if (logRequests) { + logger.info( + { + method, + path, + status: response.status, + elapsed: Date.now() - start, + requestId: response.headers.get('request-id'), + apiVersion: config.api_version, + }, + 'Stripe API request completed' + ) + } + if (!response.ok) { throw new StripeApiRequestError(response.status, json, method, path) } diff --git a/packages/source-stripe/src/index.test.ts b/packages/source-stripe/src/index.test.ts index c71acd837..4c37d8b80 100644 --- a/packages/source-stripe/src/index.test.ts +++ b/packages/source-stripe/src/index.test.ts @@ -130,7 +130,7 @@ describe('StripeSource', () => { invoices: makeConfig({ order: 2, tableName: 'invoices' }), } - vi.mocked(buildResourceRegistry).mockReturnValue(registry as any) + vi.mocked(buildResourceRegistry).mockReturnValue(registry) const cat = (await collectFirst(source.discover({ config }), 'catalog')).catalog expect(cat.streams).toHaveLength(2) @@ -143,7 +143,7 @@ describe('StripeSource', () => { internal: makeConfig({ order: 2, tableName: 'internal', sync: false }), } - vi.mocked(buildResourceRegistry).mockReturnValue(registry as any) + vi.mocked(buildResourceRegistry).mockReturnValue(registry) const cat = (await collectFirst(source.discover({ config }), 'catalog')).catalog expect(cat.streams).toHaveLength(1) @@ -151,7 +151,7 @@ describe('StripeSource', () => { }) it('returns empty streams for empty registry', async () => { - vi.mocked(buildResourceRegistry).mockReturnValue({} as any) + vi.mocked(buildResourceRegistry).mockReturnValue({}) const cat = (await collectFirst(source.discover({ config }), 'catalog')).catalog expect(cat.streams).toEqual([]) @@ -184,7 +184,7 @@ describe('StripeSource', () => { }), } - vi.mocked(buildResourceRegistry).mockReturnValue(registry as any) + vi.mocked(buildResourceRegistry).mockReturnValue(registry) const messages = await collect( source.read({ config, catalog: catalog({ name: 'customers', primary_key: [['id']] }) }) ) @@ -257,7 +257,7 @@ describe('StripeSource', () => { }), } - vi.mocked(buildResourceRegistry).mockReturnValue(registry as any) + vi.mocked(buildResourceRegistry).mockReturnValue(registry) const messages = await collect( source.read({ config, @@ -333,7 +333,7 @@ describe('StripeSource', () => { global: {}, } - vi.mocked(buildResourceRegistry).mockReturnValue(registry as any) + vi.mocked(buildResourceRegistry).mockReturnValue(registry) const messages = await collect( source.read({ config, @@ -365,7 +365,7 @@ describe('StripeSource', () => { }), } - vi.mocked(buildResourceRegistry).mockReturnValue(registry as any) + vi.mocked(buildResourceRegistry).mockReturnValue(registry) const messages = await collect( source.read({ config, catalog: catalog({ name: 'customers', primary_key: [['id']] }) }) ) @@ -528,7 +528,7 @@ describe('StripeSource', () => { }), } - vi.mocked(buildResourceRegistry).mockReturnValue(registry as any) + vi.mocked(buildResourceRegistry).mockReturnValue(registry) const messages = await collect( source.read({ config, catalog: catalog({ name: 'customers', primary_key: [['id']] }) }) ) @@ -559,7 +559,7 @@ describe('StripeSource', () => { }) it('emits TraceMessage error with failure_type config_error for unknown stream', async () => { - vi.mocked(buildResourceRegistry).mockReturnValue({} as any) + vi.mocked(buildResourceRegistry).mockReturnValue({}) const messages = await collect( source.read({ config, @@ -594,7 +594,7 @@ describe('StripeSource', () => { }), } - vi.mocked(buildResourceRegistry).mockReturnValue(registry as any) + vi.mocked(buildResourceRegistry).mockReturnValue(registry) const messages = await collect( source.read({ config, catalog: catalog({ name: 'customers', primary_key: [['id']] }) }) ) @@ -623,21 +623,19 @@ describe('StripeSource', () => { } const mockClient = { - getAccount: vi - .fn() - .mockRejectedValueOnce( - new StripeRequestError( - 401, - { - error: { - type: 'invalid_request_error', - message: 'Invalid API Key provided: sk_test_bad', - }, + getAccount: vi.fn().mockRejectedValueOnce( + new StripeRequestError( + 401, + { + error: { + type: 'invalid_request_error', + message: 'Invalid API Key provided: sk_test_bad', }, - 'GET', - '/v1/account' - ) - ), + }, + 'GET', + '/v1/account' + ) + ), } as unknown as StripeClient const messages = await collect( @@ -675,21 +673,19 @@ describe('StripeSource', () => { }) it('emits TraceMessage error for Invalid API Key on sequential streams', async () => { - const listFn = vi - .fn() - .mockRejectedValueOnce( - new StripeRequestError( - 401, - { - error: { - type: 'invalid_request_error', - message: 'Invalid API Key provided: sk_test_bad', - }, + const listFn = vi.fn().mockRejectedValueOnce( + new StripeRequestError( + 401, + { + error: { + type: 'invalid_request_error', + message: 'Invalid API Key provided: sk_test_bad', }, - 'GET', - '/v1/tax_ids' - ) + }, + 'GET', + '/v1/tax_ids' ) + ) const registry: Record = { tax_ids: makeConfig({ @@ -700,7 +696,7 @@ describe('StripeSource', () => { }), } - vi.mocked(buildResourceRegistry).mockReturnValue(registry as any) + vi.mocked(buildResourceRegistry).mockReturnValue(registry) const messages = await collect( source.read({ config, catalog: catalog({ name: 'tax_ids', primary_key: [['id']] }) }) ) @@ -732,7 +728,7 @@ describe('StripeSource', () => { }), } - vi.mocked(buildResourceRegistry).mockReturnValue(registry as any) + vi.mocked(buildResourceRegistry).mockReturnValue(registry) const messages = await collect( source.read({ config, catalog: catalog({ name: 'customers', primary_key: [['id']] }) }) ) @@ -763,7 +759,7 @@ describe('StripeSource', () => { }), } - vi.mocked(buildResourceRegistry).mockReturnValue(registry as any) + vi.mocked(buildResourceRegistry).mockReturnValue(registry) const messages = await collect( source.read({ config, catalog: catalog({ name: 'invoices', primary_key: [['id']] }) }) ) @@ -805,7 +801,7 @@ describe('StripeSource', () => { }), } - vi.mocked(buildResourceRegistry).mockReturnValue(registry as any) + vi.mocked(buildResourceRegistry).mockReturnValue(registry) const messages = await collect( source.read({ config, @@ -864,7 +860,7 @@ describe('StripeSource', () => { beforeEach(() => { listFn.mockReset() - vi.mocked(buildResourceRegistry).mockReturnValue(registry as any) + vi.mocked(buildResourceRegistry).mockReturnValue(registry) }) it('backfill only: no input, no state → paginates from beginning', async () => { @@ -902,7 +898,7 @@ describe('StripeSource', () => { }) it('stream via webhook (input): single event → record + state, no pagination', async () => { - vi.mocked(buildResourceRegistry).mockReturnValue(registry as any) + vi.mocked(buildResourceRegistry).mockReturnValue(registry) const event = makeEvent({ id: 'evt_wh_1', type: 'customer.updated', @@ -938,7 +934,7 @@ describe('StripeSource', () => { it('stream via websocket (input): same code path as webhook', async () => { // WebSocket is a transport concern — the StripeEvent is identical. // read() with input= behaves the same regardless of transport. - vi.mocked(buildResourceRegistry).mockReturnValue(registry as any) + vi.mocked(buildResourceRegistry).mockReturnValue(registry) const event = makeEvent({ id: 'evt_ws_1', type: 'customer.created', @@ -967,7 +963,7 @@ describe('StripeSource', () => { }) it('stream via input: filters out events for streams not in catalog', async () => { - vi.mocked(buildResourceRegistry).mockReturnValue(registry as any) + vi.mocked(buildResourceRegistry).mockReturnValue(registry) const event = makeEvent({ id: 'evt_other', type: 'invoice.paid', @@ -995,7 +991,7 @@ describe('StripeSource', () => { has_more: false, }) - vi.mocked(buildResourceRegistry).mockReturnValue(registry as any) + vi.mocked(buildResourceRegistry).mockReturnValue(registry) const messages = await collect( source.read({ config, @@ -1026,7 +1022,7 @@ describe('StripeSource', () => { has_more: false, }) - vi.mocked(buildResourceRegistry).mockReturnValue(registry as any) + vi.mocked(buildResourceRegistry).mockReturnValue(registry) const messages = await collect( source.read({ config, @@ -1059,7 +1055,7 @@ describe('StripeSource', () => { customers: makeConfig({ order: 1, tableName: 'customers' }), } - vi.mocked(buildResourceRegistry).mockReturnValue(registry as any) + vi.mocked(buildResourceRegistry).mockReturnValue(registry) const event = makeEvent({ id: 'evt_del_1', type: 'customer.deleted', @@ -1090,7 +1086,7 @@ describe('StripeSource', () => { products: makeConfig({ order: 1, tableName: 'products' }), } - vi.mocked(buildResourceRegistry).mockReturnValue(registry as any) + vi.mocked(buildResourceRegistry).mockReturnValue(registry) // product.deleted event — the object may not have deleted: true in its body const event = makeEvent({ id: 'evt_del_2', @@ -1115,7 +1111,7 @@ describe('StripeSource', () => { subscriptions: makeConfig({ order: 1, tableName: 'subscriptions' }), } - vi.mocked(buildResourceRegistry).mockReturnValue(registry as any) + vi.mocked(buildResourceRegistry).mockReturnValue(registry) const event = makeEvent({ id: 'evt_sub_1', type: 'customer.subscription.updated', @@ -1162,7 +1158,7 @@ describe('StripeSource', () => { active_entitlements: makeConfig({ order: 1, tableName: 'active_entitlements' }), } - vi.mocked(buildResourceRegistry).mockReturnValue(registry as any) + vi.mocked(buildResourceRegistry).mockReturnValue(registry) const event = makeEvent({ id: 'evt_ent_1', type: 'entitlements.active_entitlement_summary.updated', @@ -1245,7 +1241,7 @@ describe('StripeSource', () => { }), } - vi.mocked(buildResourceRegistry).mockReturnValue(registry as any) + vi.mocked(buildResourceRegistry).mockReturnValue(registry) const event = makeEvent({ id: 'evt_reval_1', type: 'customer.subscription.updated', @@ -1280,7 +1276,7 @@ describe('StripeSource', () => { }), } - vi.mocked(buildResourceRegistry).mockReturnValue(registry as any) + vi.mocked(buildResourceRegistry).mockReturnValue(registry) const event = makeEvent({ id: 'evt_reval_2', type: 'customer.subscription.deleted', @@ -1309,7 +1305,7 @@ describe('StripeSource', () => { invoices: makeConfig({ order: 1, tableName: 'invoices' }), } - vi.mocked(buildResourceRegistry).mockReturnValue(registry as any) + vi.mocked(buildResourceRegistry).mockReturnValue(registry) const event = makeEvent({ id: 'evt_preview_1', type: 'invoice.upcoming', @@ -1328,7 +1324,7 @@ describe('StripeSource', () => { checkout_sessions: makeConfig({ order: 1, tableName: 'checkout_sessions' }), } - vi.mocked(buildResourceRegistry).mockReturnValue(registry as any) + vi.mocked(buildResourceRegistry).mockReturnValue(registry) const event = makeEvent({ id: 'evt_cs_1', type: 'checkout.session.completed', @@ -1352,7 +1348,7 @@ describe('StripeSource', () => { customers: makeConfig({ order: 1, tableName: 'customers' }), } - vi.mocked(buildResourceRegistry).mockReturnValue(registry as any) + vi.mocked(buildResourceRegistry).mockReturnValue(registry) const rawInput = { body: '{"id":"evt_1"}', signature: 'sig_test' } await expect( @@ -1393,7 +1389,7 @@ describe('StripeSource', () => { } beforeEach(() => { - vi.mocked(buildResourceRegistry).mockReturnValue(registry as any) + vi.mocked(buildResourceRegistry).mockReturnValue(registry) }) afterEach(() => { @@ -1428,7 +1424,7 @@ describe('StripeSource', () => { }) it("read()'s finally block closes WebSocket client", async () => { - vi.mocked(buildResourceRegistry).mockReturnValue(registry as any) + vi.mocked(buildResourceRegistry).mockReturnValue(registry) const iter = source .read({ @@ -1449,7 +1445,7 @@ describe('StripeSource', () => { }) it('streams WebSocket events after empty backfill', async () => { - vi.mocked(buildResourceRegistry).mockReturnValue(registry as any) + vi.mocked(buildResourceRegistry).mockReturnValue(registry) // No setup() needed — WebSocket client is created inside read() const iter = source @@ -1526,7 +1522,7 @@ describe('StripeSource', () => { }), } - vi.mocked(buildResourceRegistry).mockReturnValue(wsRegistry as any) + vi.mocked(buildResourceRegistry).mockReturnValue(wsRegistry) // No setup() needed — WebSocket client is created inside read() const iter = source @@ -1619,7 +1615,7 @@ describe('StripeSource', () => { }) it('filters out WebSocket events for streams not in catalog', async () => { - vi.mocked(buildResourceRegistry).mockReturnValue(registry as any) + vi.mocked(buildResourceRegistry).mockReturnValue(registry) // No setup() needed — WebSocket client is created inside read() const iter = source @@ -1668,7 +1664,7 @@ describe('StripeSource', () => { it('read() with websocket: true creates WebSocket client (combined config)', async () => { const { createStripeWebSocketClient } = await import('./src-websocket.js') - vi.mocked(buildResourceRegistry).mockReturnValue(registry as any) + vi.mocked(buildResourceRegistry).mockReturnValue(registry) vi.mocked(createStripeWebSocketClient).mockClear() @@ -1690,7 +1686,7 @@ describe('StripeSource', () => { }) it('teardown() is safe when no websocket was configured', async () => { - vi.mocked(buildResourceRegistry).mockReturnValue(registry as any) + vi.mocked(buildResourceRegistry).mockReturnValue(registry) // No setup() call — teardown should not throw await drain( source.teardown!({ @@ -1707,7 +1703,7 @@ describe('StripeSource', () => { const registry: Record = { customers: makeConfig({ order: 1, tableName: 'customers', listFn }), } - vi.mocked(buildResourceRegistry).mockReturnValue(registry as any) + vi.mocked(buildResourceRegistry).mockReturnValue(registry) const cat = catalog({ name: 'customers' }) // Use port 0 so the OS picks a free port @@ -1753,7 +1749,7 @@ describe('StripeSource', () => { }), } - vi.mocked(buildResourceRegistry).mockReturnValue(registry as any) + vi.mocked(buildResourceRegistry).mockReturnValue(registry) const messages = await collect( source.read({ config: { ...config, poll_events: true }, @@ -1785,7 +1781,7 @@ describe('StripeSource', () => { }), } - vi.mocked(buildResourceRegistry).mockReturnValue(registry as any) + vi.mocked(buildResourceRegistry).mockReturnValue(registry) const now = Math.floor(Date.now() / 1000) const messages = await collect( source.read({ @@ -1815,7 +1811,7 @@ describe('StripeSource', () => { }), } - vi.mocked(buildResourceRegistry).mockReturnValue(registry as any) + vi.mocked(buildResourceRegistry).mockReturnValue(registry) const messages = await collect( source.read({ config, // no poll_events @@ -1855,7 +1851,7 @@ describe('StripeSource', () => { }), } - vi.mocked(buildResourceRegistry).mockReturnValue(registry as any) + vi.mocked(buildResourceRegistry).mockReturnValue(registry) const messages = await collect( source.read({ config: { ...config, poll_events: true }, @@ -2219,7 +2215,7 @@ describe('StripeSource', () => { }), } - vi.mocked(buildResourceRegistry).mockReturnValue(registry as any) + vi.mocked(buildResourceRegistry).mockReturnValue(registry) await collect( customSource.read({ diff --git a/packages/source-stripe/src/index.ts b/packages/source-stripe/src/index.ts index 0efdcf53b..7aa998863 100644 --- a/packages/source-stripe/src/index.ts +++ b/packages/source-stripe/src/index.ts @@ -1,7 +1,5 @@ import type { CatalogPayload, - ConfiguredCatalog, - Message, Source, SpecOutput, CheckOutput, @@ -10,8 +8,7 @@ import type { TeardownOutput, } from '@stripe/sync-protocol' import { sourceControlMsg } from '@stripe/sync-protocol' -import { z } from 'zod' -import defaultSpec, { configSchema } from './spec.js' +import defaultSpec from './spec.js' import type { Config } from './spec.js' import type { StripeEvent } from './spec.js' import { buildResourceRegistry } from './resourceRegistry.js' @@ -28,7 +25,6 @@ import { listApiBackfill } from './src-list-api.js' import { pollEvents } from './src-events-api.js' import type { StripeWebSocketClient, StripeWebhookEvent } from './src-websocket.js' import { createStripeWebSocketClient } from './src-websocket.js' -import type { ResourceConfig } from './types.js' import { makeClient } from './client.js' import type { RateLimiter } from './rate-limiter.js' import { createInMemoryRateLimiter, DEFAULT_MAX_RPS } from './rate-limiter.js' @@ -105,10 +101,13 @@ export function createStripeSource( type: 'connection_status' as const, connection_status: { status: 'succeeded' as const }, } - } catch (err: any) { + } catch (err: unknown) { yield { type: 'connection_status' as const, - connection_status: { status: 'failed' as const, message: err.message }, + connection_status: { + status: 'failed' as const, + message: err instanceof Error ? err.message : String(err), + }, } } }, @@ -150,7 +149,7 @@ export function createStripeSource( yield { type: 'catalog' as const, catalog } }, - async *setup({ config, catalog }): AsyncGenerator { + async *setup({ config, catalog: _catalog }): AsyncGenerator { const updates: Partial = {} const client = makeClient({ ...config, diff --git a/packages/source-stripe/src/retry.ts b/packages/source-stripe/src/retry.ts index 3cd65341c..ed7fa7264 100644 --- a/packages/source-stripe/src/retry.ts +++ b/packages/source-stripe/src/retry.ts @@ -1,3 +1,7 @@ +import { createConnectorLogger } from '@stripe/sync-logger' + +const logger = createConnectorLogger('source-stripe') + const BACKOFF_BASE_MS = 1000 const BACKOFF_MAX_MS = 32000 const MAX_RETRIES = 5 @@ -102,8 +106,9 @@ export async function withHttpRetry( const status = getHttpErrorStatus(err) const errName = err instanceof Error ? err.name : 'UnknownError' - console.error( - `[source-stripe] retry attempt=${attempt + 1}/${maxRetries} delay=${delayMs}ms status=${status ?? 'n/a'} error=${errName}` + logger.warn( + { attempt: attempt + 1, maxRetries, delayMs, status: status ?? 'n/a', error: errName }, + 'Retrying after transient error' ) await sleep(delayMs) diff --git a/packages/source-stripe/src/src-list-api.ts b/packages/source-stripe/src/src-list-api.ts index dace3b91e..1da3be9fb 100644 --- a/packages/source-stripe/src/src-list-api.ts +++ b/packages/source-stripe/src/src-list-api.ts @@ -1,11 +1,14 @@ import type { Message, TraceMessage } from '@stripe/sync-protocol' import { toRecordMessage, stateMsg } from '@stripe/sync-protocol' +import { createConnectorLogger } from '@stripe/sync-logger' import type { ResourceConfig } from './types.js' import type { SegmentState, BackfillState } from './index.js' import type { RateLimiter } from './rate-limiter.js' import { StripeApiRequestError } from '@stripe/sync-openapi' import type { StripeClient } from './client.js' +const logger = createConnectorLogger('source-stripe') + // Errors matching these patterns are silently skipped during backfill. // The stream is marked complete without yielding records. // NOTE: these are band-aids — the underlying issue is that the OpenAPI spec @@ -520,11 +523,7 @@ export async function* listApiBackfill(opts: { } satisfies TraceMessage continue } - console.error({ - msg: 'Stripe list page failed', - stream: stream.name, - error: err instanceof Error ? err.message : String(err), - }) + logger.error({ err, stream: stream.name }, 'Stripe list page failed') const isRateLimit = err instanceof Error && err.message.includes('Rate limit') const isAuthError = err instanceof StripeApiRequestError && (err.status === 401 || err.status === 403) diff --git a/packages/source-stripe/src/transport.test.ts b/packages/source-stripe/src/transport.test.ts index 06f87a1c5..c53d4ead5 100644 --- a/packages/source-stripe/src/transport.test.ts +++ b/packages/source-stripe/src/transport.test.ts @@ -113,7 +113,7 @@ describe('fetchWithProxy', () => { expect(mockFetch).toHaveBeenCalledOnce() const [, init] = mockFetch.mock.calls[0] - expect((init as any)?.dispatcher).toBeUndefined() + expect((init as Record)?.dispatcher).toBeUndefined() }) it('calls fetch with a proxy dispatcher when HTTPS_PROXY is set', async () => { @@ -130,7 +130,7 @@ describe('fetchWithProxy', () => { expect(mockFetch).toHaveBeenCalledOnce() const [, init] = mockFetch.mock.calls[0] - expect((init as any).dispatcher).toBeDefined() + expect((init as Record).dispatcher).toBeDefined() }) it('bypasses proxy for localhost even when HTTPS_PROXY is set', async () => { @@ -147,7 +147,7 @@ describe('fetchWithProxy', () => { expect(mockFetch).toHaveBeenCalledOnce() const [, init] = mockFetch.mock.calls[0] - expect((init as any)?.dispatcher).toBeUndefined() + expect((init as Record)?.dispatcher).toBeUndefined() }) it('bypasses proxy for NO_PROXY domains', async () => { @@ -165,7 +165,7 @@ describe('fetchWithProxy', () => { expect(mockFetch).toHaveBeenCalledOnce() const [, init] = mockFetch.mock.calls[0] - expect((init as any)?.dispatcher).toBeUndefined() + expect((init as Record)?.dispatcher).toBeUndefined() }) }) diff --git a/packages/ts-cli/package.json b/packages/ts-cli/package.json index 3e16c3935..6ba4e2e0f 100644 --- a/packages/ts-cli/package.json +++ b/packages/ts-cli/package.json @@ -27,6 +27,7 @@ }, "scripts": { "build": "tsc", + "lint": "eslint src/", "test": "vitest run" }, "dependencies": { diff --git a/packages/util-postgres/package.json b/packages/util-postgres/package.json index cf96eaa1e..5109e1b68 100644 --- a/packages/util-postgres/package.json +++ b/packages/util-postgres/package.json @@ -12,6 +12,7 @@ }, "scripts": { "build": "tsc", + "lint": "eslint src/", "test": "vitest run" }, "dependencies": { diff --git a/packages/util-postgres/src/queryLogging.ts b/packages/util-postgres/src/queryLogging.ts index 23bef0724..b1c5166be 100644 --- a/packages/util-postgres/src/queryLogging.ts +++ b/packages/util-postgres/src/queryLogging.ts @@ -1,11 +1,16 @@ import type pg from 'pg' -const verbose = !!process.env.DANGEROUSLY_VERBOSE_LOGGING +const verbose = process.env.DANGEROUSLY_VERBOSE_LOGGING === 'true' + +function writeStderr(level: string, obj: Record, msg: string) { + const entry = JSON.stringify({ level, name: 'util-postgres', msg, ...obj, time: Date.now() }) + process.stderr.write(entry + '\n') +} /** * Wrap a pg.Pool so every query is logged to stderr when * DANGEROUSLY_VERBOSE_LOGGING is enabled. - * Format: [pg] ms | rows= | + * Format: structured log with duration, row count, and truncated SQL preview. */ export function withQueryLogging(pool: T): T { if (!verbose) return pool @@ -22,16 +27,31 @@ export function withQueryLogging(pool: T): T { // eslint-disable-next-line @typescript-eslint/no-explicit-any ;(pool as any).query = async function (...args: unknown[]) { const sql = extractSql(args) - const label = sql?.replace(/\s+/g, ' ').slice(0, 300) ?? '(unknown)' + const sql_preview = sql?.replace(/\s+/g, ' ').slice(0, 300) ?? '(unknown)' const start = Date.now() try { // eslint-disable-next-line @typescript-eslint/no-explicit-any const result = await (origQuery as any)(...args) - console.error(`[pg] ${Date.now() - start}ms | rows=${result?.rowCount ?? 0} | ${label}`) + writeStderr( + 'info', + { + duration_ms: Date.now() - start, + row_count: result?.rowCount ?? 0, + sql_preview, + }, + 'Postgres query' + ) return result } catch (err) { - const msg = err instanceof Error ? err.message : String(err) - console.error(`[pg] ${Date.now() - start}ms | ERROR ${msg} | ${label}`) + writeStderr( + 'error', + { + duration_ms: Date.now() - start, + sql_preview, + err: err instanceof Error ? { type: err.constructor.name, message: err.message } : err, + }, + 'Postgres query failed' + ) throw err } } diff --git a/packages/util-postgres/src/upsert.test.ts b/packages/util-postgres/src/upsert.test.ts index b62c9b566..7bfffed4d 100644 --- a/packages/util-postgres/src/upsert.test.ts +++ b/packages/util-postgres/src/upsert.test.ts @@ -448,8 +448,10 @@ describe('batch multi-row', () => { const r = await rows(table) expect(r).toHaveLength(3) - expect(r.find((x: any) => x.id === '2').name).toBe('Bob Updated') - expect(r.find((x: any) => x.id === '3').name).toBe('Charlie') + const bob = r.find((x: Record) => x.id === '2')! + expect(bob.name).toBe('Bob Updated') + const charlie = r.find((x: Record) => x.id === '3')! + expect(charlie.name).toBe('Charlie') }) }) diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 959ea3f10..e8eb60cea 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -164,6 +164,9 @@ importers: '@stripe/sync-integration-supabase': specifier: workspace:* version: link:../supabase + '@stripe/sync-logger': + specifier: workspace:* + version: link:../../packages/logger '@stripe/sync-protocol': specifier: workspace:* version: link:../../packages/protocol @@ -246,6 +249,9 @@ importers: '@stripe/sync-hono-zod-openapi': specifier: workspace:* version: link:../../packages/hono-zod-openapi + '@stripe/sync-logger': + specifier: workspace:* + version: link:../../packages/logger '@stripe/sync-protocol': specifier: workspace:* version: link:../../packages/protocol @@ -322,6 +328,9 @@ importers: '@stripe/sync-engine': specifier: workspace:* version: link:../engine + '@stripe/sync-logger': + specifier: workspace:* + version: link:../../packages/logger '@stripe/sync-protocol': specifier: workspace:* version: link:../../packages/protocol @@ -482,6 +491,9 @@ importers: packages/destination-postgres: dependencies: + '@stripe/sync-logger': + specifier: workspace:* + version: link:../logger '@stripe/sync-protocol': specifier: workspace:* version: link:../protocol @@ -530,6 +542,22 @@ importers: specifier: ^3.2 version: 3.2.4(@types/node@24.10.1)(jiti@2.6.1)(lightningcss@1.32.0)(terser@5.46.1)(tsx@4.21.0)(yaml@2.8.1) + packages/logger: + dependencies: + pino: + specifier: ^10 + version: 10.1.0 + devDependencies: + '@types/node': + specifier: ^24.5.0 + version: 24.10.1 + pino-pretty: + specifier: ^13 + version: 13.1.3 + vitest: + specifier: ^3.2.1 + version: 3.2.4(@types/node@24.10.1)(jiti@2.6.1)(lightningcss@1.32.0)(terser@5.46.1)(tsx@4.21.0)(yaml@2.8.1) + packages/openapi: dependencies: zod: @@ -561,6 +589,9 @@ importers: packages/source-stripe: dependencies: + '@stripe/sync-logger': + specifier: workspace:* + version: link:../logger '@stripe/sync-openapi': specifier: workspace:* version: link:../openapi @@ -2618,6 +2649,9 @@ packages: '@stripe/sync-integration-supabase@file:apps/supabase': resolution: {directory: apps/supabase, type: directory} + '@stripe/sync-logger@file:packages/logger': + resolution: {directory: packages/logger, type: directory} + '@stripe/sync-openapi@file:packages/openapi': resolution: {directory: packages/openapi, type: directory} @@ -7201,6 +7235,7 @@ snapshots: '@stripe/sync-destination-postgres@file:packages/destination-postgres(@aws-sdk/client-sts@3.1013.0)(@aws-sdk/rds-signer@3.1013.0)': dependencies: + '@stripe/sync-logger': file:packages/logger '@stripe/sync-protocol': file:packages/protocol '@stripe/sync-util-postgres': file:packages/util-postgres pg: 8.16.3 @@ -7219,6 +7254,7 @@ snapshots: '@stripe/sync-destination-postgres': file:packages/destination-postgres(@aws-sdk/client-sts@3.1013.0)(@aws-sdk/rds-signer@3.1013.0) '@stripe/sync-hono-zod-openapi': file:packages/hono-zod-openapi '@stripe/sync-integration-supabase': file:apps/supabase(@aws-sdk/client-sts@3.1013.0)(@aws-sdk/rds-signer@3.1013.0) + '@stripe/sync-logger': file:packages/logger '@stripe/sync-protocol': file:packages/protocol '@stripe/sync-source-stripe': file:packages/source-stripe '@stripe/sync-state-postgres': file:packages/state-postgres @@ -7255,6 +7291,7 @@ snapshots: dependencies: '@stripe/sync-destination-postgres': file:packages/destination-postgres(@aws-sdk/client-sts@3.1013.0)(@aws-sdk/rds-signer@3.1013.0) '@stripe/sync-engine': file:apps/engine(@aws-sdk/client-sts@3.1013.0)(@aws-sdk/rds-signer@3.1013.0) + '@stripe/sync-logger': file:packages/logger '@stripe/sync-protocol': file:packages/protocol '@stripe/sync-source-stripe': file:packages/source-stripe '@stripe/sync-state-postgres': file:packages/state-postgres @@ -7269,6 +7306,10 @@ snapshots: - supports-color - utf-8-validate + '@stripe/sync-logger@file:packages/logger': + dependencies: + pino: 10.1.0 + '@stripe/sync-openapi@file:packages/openapi': dependencies: zod: 4.3.6 @@ -7286,6 +7327,7 @@ snapshots: '@stripe/sync-destination-postgres': file:packages/destination-postgres(@aws-sdk/client-sts@3.1013.0)(@aws-sdk/rds-signer@3.1013.0) '@stripe/sync-engine': file:apps/engine(@aws-sdk/client-sts@3.1013.0)(@aws-sdk/rds-signer@3.1013.0) '@stripe/sync-hono-zod-openapi': file:packages/hono-zod-openapi + '@stripe/sync-logger': file:packages/logger '@stripe/sync-protocol': file:packages/protocol '@stripe/sync-source-stripe': file:packages/source-stripe '@stripe/sync-ts-cli': file:packages/ts-cli @@ -7317,6 +7359,7 @@ snapshots: '@stripe/sync-source-stripe@file:packages/source-stripe': dependencies: + '@stripe/sync-logger': file:packages/logger '@stripe/sync-openapi': file:packages/openapi '@stripe/sync-protocol': file:packages/protocol https-proxy-agent: 7.0.6(supports-color@10.2.2)