diff --git a/apps/service/src/__tests__/workflow.test.ts b/apps/service/src/__tests__/workflow.test.ts index cde8e94e2..e503ceda4 100644 --- a/apps/service/src/__tests__/workflow.test.ts +++ b/apps/service/src/__tests__/workflow.test.ts @@ -4,28 +4,26 @@ import { Worker } from '@temporalio/worker' import path from 'node:path' import type { SyncActivities } from '../temporal/activities/index.js' import type { RunResult } from '../temporal/activities/index.js' -import { CONTINUE_AS_NEW_THRESHOLD } from '../lib/utils.js' type SourceInput = unknown -// Point directly at the workflow index to avoid resolving the legacy dist/temporal/workflows.js file. const workflowsPath = path.resolve(process.cwd(), 'dist/temporal/workflows/index.js') const emptyState = { streams: {}, global: {} } const noErrors: RunResult = { errors: [], state: emptyState } +const noErrorsComplete: RunResult = { errors: [], state: emptyState, eof: { reason: 'complete' } } const permanentSyncError: RunResult = { errors: [{ message: 'permanent sync failure', failure_type: 'auth_error', stream: 'customers' }], state: emptyState, } -// Workflows now receive only the pipelineId string const testPipelineId = 'test_pipe' function stubActivities(overrides: Partial = {}): SyncActivities { const activities = { discoverCatalog: async () => ({ streams: [] }), pipelineSetup: async () => {}, - pipelineSync: async () => noErrors, + pipelineSync: async () => noErrorsComplete, pipelineTeardown: async () => {}, updatePipelineStatus: async () => {}, ...overrides, @@ -39,7 +37,6 @@ function stubActivities(overrides: Partial = {}): SyncActivities } as SyncActivities } -/** Signal the workflow to delete. */ async function signalDelete(handle: { signal: (name: string, arg: string) => Promise }) { await handle.signal('desired_status', 'deleted') } @@ -76,7 +73,7 @@ describe('pipelineWorkflow (unit — stubbed activities)', () => { }, pipelineSync: async () => { runCallCount++ - return noErrors + return noErrorsComplete }, }), }) @@ -88,7 +85,6 @@ describe('pipelineWorkflow (unit — stubbed activities)', () => { taskQueue: 'test-queue-1', }) - // Let it sync several reconciliation pages await new Promise((r) => setTimeout(r, 2000)) const status = await handle.query('status') @@ -112,7 +108,8 @@ describe('pipelineWorkflow (unit — stubbed activities)', () => { activities: stubActivities({ pipelineSync: async (pipelineId: string, opts?) => { syncCalls.push({ pipelineId, input: opts?.input ?? undefined }) - return noErrors + if (opts?.input) return noErrors + return noErrorsComplete }, }), }) @@ -124,10 +121,8 @@ describe('pipelineWorkflow (unit — stubbed activities)', () => { taskQueue: 'test-queue-2', }) - // Let reconciliation start await new Promise((r) => setTimeout(r, 1500)) - // Send events await signalSourceInput(handle, { id: 'evt_1', type: 'customer.created', @@ -141,7 +136,6 @@ describe('pipelineWorkflow (unit — stubbed activities)', () => { await signalDelete(handle) await handle.result() - // Find event-bearing sync calls (input is defined) const eventCalls = syncCalls.filter((c) => c.input) expect(eventCalls.length).toBeGreaterThanOrEqual(1) @@ -153,7 +147,6 @@ describe('pipelineWorkflow (unit — stubbed activities)', () => { ]) ) - // All calls should use the test pipeline ID for (const call of syncCalls) { expect(call.pipelineId).toBe(testPipelineId) } @@ -183,7 +176,7 @@ describe('pipelineWorkflow (unit — stubbed activities)', () => { if (inputInFlight > 0) overlapped = true await new Promise((r) => setTimeout(r, 250)) backfillInFlight-- - return noErrors + return noErrorsComplete }, }), }) @@ -213,6 +206,7 @@ describe('pipelineWorkflow (unit — stubbed activities)', () => { let liveStartsWhileBackfill = 0 let liveBatchCount = 0 let liveEventCount = 0 + let backfillCalls = 0 const worker = await Worker.create({ connection: testEnv.nativeConnection, @@ -228,10 +222,11 @@ describe('pipelineWorkflow (unit — stubbed activities)', () => { return noErrors } + backfillCalls++ backfillInFlight++ try { await new Promise((r) => setTimeout(r, 600)) - return noErrors + return noErrorsComplete } finally { backfillInFlight-- } @@ -254,7 +249,7 @@ describe('pipelineWorkflow (unit — stubbed activities)', () => { }) } - await new Promise((r) => setTimeout(r, 350)) + await new Promise((r) => setTimeout(r, 1500)) await signalDelete(handle) await handle.result() @@ -299,7 +294,6 @@ describe('pipelineWorkflow (unit — stubbed activities)', () => { it('reports phase-driven status transitions through teardown', async () => { const statusWrites: string[] = [] - let reconcileCalls = 0 const worker = await Worker.create({ connection: testEnv.nativeConnection, @@ -311,9 +305,7 @@ describe('pipelineWorkflow (unit — stubbed activities)', () => { }, pipelineSync: async (_pipelineId: string, opts?) => { if (opts?.input) return noErrors - - reconcileCalls++ - return reconcileCalls === 1 ? { ...noErrors, eof: { reason: 'complete' } } : noErrors + return noErrorsComplete }, }), }) @@ -391,7 +383,7 @@ describe('pipelineWorkflow (unit — stubbed activities)', () => { throw new Error('transient sync failure') } - return { ...noErrors, eof: { reason: 'complete' as const } } + return noErrorsComplete }, }), }) @@ -413,6 +405,54 @@ describe('pipelineWorkflow (unit — stubbed activities)', () => { }) }) + it('continues past returned transient errors without entering error state', async () => { + const statusWrites: string[] = [] + let reconcileCalls = 0 + + const transientSyncError: RunResult = { + errors: [ + { message: 'transient sync failure', failure_type: 'transient_error', stream: 'customers' }, + ], + state: emptyState, + } + + const worker = await Worker.create({ + connection: testEnv.nativeConnection, + taskQueue: 'test-queue-3b-transient', + workflowsPath, + activities: stubActivities({ + updatePipelineStatus: async (_id: string, status: string) => { + statusWrites.push(status) + }, + pipelineSync: async (_pipelineId: string, opts?) => { + if (opts?.input) return noErrors + + reconcileCalls++ + if (reconcileCalls === 1) { + return { ...transientSyncError, eof: { reason: 'complete' as const } } + } + return noErrorsComplete + }, + }), + }) + + await worker.runUntil(async () => { + const handle = await testEnv.client.workflow.start('pipelineWorkflow', { + args: [testPipelineId], + workflowId: 'test-sync-3b-transient', + taskQueue: 'test-queue-3b-transient', + }) + + await new Promise((r) => setTimeout(r, 2000)) + await signalDelete(handle) + await handle.result() + + expect(reconcileCalls).toBeGreaterThanOrEqual(1) + expect(statusWrites).toContain('ready') + expect(statusWrites).not.toContain('error') + }) + }) + it('queues live events while paused and drains them after resume', async () => { const syncCalls: { input?: SourceInput[] }[] = [] @@ -424,7 +464,8 @@ describe('pipelineWorkflow (unit — stubbed activities)', () => { pipelineSync: async (_pipelineId: string, opts?) => { syncCalls.push({ input: opts?.input ?? undefined }) await new Promise((r) => setTimeout(r, 50)) - return noErrors + if (opts?.input) return noErrors + return noErrorsComplete }, }), }) @@ -469,10 +510,10 @@ describe('pipelineWorkflow (unit — stubbed activities)', () => { taskQueue: 'test-queue-4', workflowsPath, activities: stubActivities({ - pipelineSync: async () => { - // Slow sync so delete arrives mid-reconciliation + pipelineSync: async (_pipelineId: string, opts?) => { + if (opts?.input) return noErrors await new Promise((r) => setTimeout(r, 500)) - return noErrors + return noErrorsComplete }, pipelineTeardown: async (): Promise => { teardownCalled = true @@ -507,6 +548,7 @@ describe('pipelineWorkflow (unit — stubbed activities)', () => { return { errors: [], state: { streams: { customers: { cursor: `cus_${syncCallCount}` } }, global: {} }, + eof: { reason: 'complete' as const }, } }, }), @@ -546,9 +588,9 @@ describe('pipelineWorkflow (unit — stubbed activities)', () => { }, pipelineSync: async () => { syncCallCount++ - if (syncCallCount > CONTINUE_AS_NEW_THRESHOLD) crossedThresholdResolve?.() + if (syncCallCount > PIPELINE_CONTINUE_AS_NEW_THRESHOLD) crossedThresholdResolve?.() await new Promise((r) => setTimeout(r, 1)) - return noErrors + return noErrorsComplete }, }), }) @@ -564,7 +606,7 @@ describe('pipelineWorkflow (unit — stubbed activities)', () => { await signalDelete(handle) await handle.result() - expect(syncCallCount).toBeGreaterThan(CONTINUE_AS_NEW_THRESHOLD) + expect(syncCallCount).toBeGreaterThan(PIPELINE_CONTINUE_AS_NEW_THRESHOLD) expect(setupCalls).toBe(1) }) }) diff --git a/apps/service/src/temporal/activities/pipeline-sync.ts b/apps/service/src/temporal/activities/pipeline-sync.ts index 51c6aec05..3cb0abf05 100644 --- a/apps/service/src/temporal/activities/pipeline-sync.ts +++ b/apps/service/src/temporal/activities/pipeline-sync.ts @@ -1,9 +1,7 @@ -import { ApplicationFailure } from '@temporalio/activity' import type { SourceInputMessage, SourceReadOptions } from '@stripe/sync-engine' import type { EofPayload } from '@stripe/sync-protocol' import type { ActivitiesContext } from './_shared.js' import { asIterable, drainMessages, type RunResult } from './_shared.js' -import { classifySyncErrors, summarizeSyncErrors } from '../sync-errors.js' export function createPipelineSyncActivity(context: ActivitiesContext) { return async function pipelineSync( @@ -18,7 +16,6 @@ export function createPipelineSyncActivity(context: ActivitiesContext) { context.engine.pipeline_sync(config, readOpts, input), readOpts.state ) - // Full replacement — connector emits the complete updated config if (sourceConfig) { const type = pipeline.source.type await context.pipelineStore.update(pipelineId, { @@ -31,18 +28,6 @@ export function createPipelineSyncActivity(context: ActivitiesContext) { destination: { type, [type]: destConfig }, }) } - const { transient, permanent } = classifySyncErrors(errors) - if (permanent.length > 0) { - if (transient.length > 0) { - console.warn( - `Transient errors suppressed by permanent failures: ${summarizeSyncErrors(transient)}` - ) - } - return { errors, state, eof } - } - if (transient.length > 0) { - throw ApplicationFailure.retryable(summarizeSyncErrors(transient), 'TransientSyncError') - } return { errors, state, eof } } } diff --git a/apps/service/src/temporal/sync-errors.ts b/apps/service/src/temporal/sync-errors.ts index 7f9a74fe3..c2a996c45 100644 --- a/apps/service/src/temporal/sync-errors.ts +++ b/apps/service/src/temporal/sync-errors.ts @@ -9,7 +9,7 @@ export type ClassifiedSyncErrors = { permanent: SyncRunError[] } -const PERMANENT_FAILURE_TYPES = new Set(['config_error', 'auth_error']) +const PERMANENT_FAILURE_TYPES = new Set(['config_error', 'auth_error', 'system_error']) export function classifySyncErrors(errors: SyncRunError[]): ClassifiedSyncErrors { const transient: SyncRunError[] = [] diff --git a/apps/service/src/temporal/workflows/_shared.ts b/apps/service/src/temporal/workflows/_shared.ts index 9f7930d44..341043a8c 100644 --- a/apps/service/src/temporal/workflows/_shared.ts +++ b/apps/service/src/temporal/workflows/_shared.ts @@ -9,6 +9,11 @@ export const sourceInputSignal = defineSignal<[SourceInputMessage]>('source_inpu /** Carries the new desired_status value — workflow updates its local state directly. */ export const desiredStatusSignal = defineSignal<[DesiredStatus]>('desired_status') +/** Signals that notify the workflow about external changes, enabling targeted error recovery. */ +export const credentialsUpdatedSignal = defineSignal('credentials_updated') +export const configUpdatedSignal = defineSignal('config_updated') +export const deploymentUpdatedSignal = defineSignal('deployment_updated') + export const { pipelineSetup, pipelineTeardown } = proxyActivities({ startToCloseTimeout: '2m', retry: retryPolicy, diff --git a/apps/service/src/temporal/workflows/index.ts b/apps/service/src/temporal/workflows/index.ts index d7ba31759..8c42822b1 100644 --- a/apps/service/src/temporal/workflows/index.ts +++ b/apps/service/src/temporal/workflows/index.ts @@ -1 +1,2 @@ export { pipelineWorkflow } from './pipeline-workflow.js' +export { pipelineBackfillWorkflow } from './pipeline-backfill-workflow.js' diff --git a/apps/service/src/temporal/workflows/pipeline-backfill-workflow.ts b/apps/service/src/temporal/workflows/pipeline-backfill-workflow.ts new file mode 100644 index 000000000..f12c53cf8 --- /dev/null +++ b/apps/service/src/temporal/workflows/pipeline-backfill-workflow.ts @@ -0,0 +1,52 @@ +import { ApplicationFailure, continueAsNew } from '@temporalio/workflow' + +import type { SourceState } from '@stripe/sync-protocol' +import { classifySyncErrors, summarizeSyncErrors, type SyncRunError } from '../sync-errors.js' +import { pipelineSync } from './_shared.js' + +const BACKFILL_CONTINUE_AS_NEW_THRESHOLD = 500 + +export async function pipelineBackfillWorkflow( + pipelineId: string, + opts: { state: SourceState; accumulatedErrors?: SyncRunError[] } +): Promise { + let sourceState = opts.state + let operationCount = 0 + const accumulatedErrors: SyncRunError[] = opts.accumulatedErrors + ? [...opts.accumulatedErrors] + : [] + + while (true) { + const result = await pipelineSync(pipelineId, { + state: sourceState, + state_limit: 100, + time_limit: 10, + }) + operationCount++ + sourceState = result.state + + for (const err of result.errors) { + accumulatedErrors.push(err) + } + + if (result.eof?.reason === 'complete') { + const { permanent } = classifySyncErrors(accumulatedErrors) + if (permanent.length > 0) { + throw ApplicationFailure.nonRetryable( + summarizeSyncErrors(permanent), + 'BackfillPermanentError', + sourceState + ) + } + return sourceState + } + + if (operationCount >= BACKFILL_CONTINUE_AS_NEW_THRESHOLD) { + const { permanent } = classifySyncErrors(accumulatedErrors) + await continueAsNew(pipelineId, { + state: sourceState, + accumulatedErrors: permanent, + }) + } + } +} diff --git a/apps/service/src/temporal/workflows/pipeline-workflow.ts b/apps/service/src/temporal/workflows/pipeline-workflow.ts index 7d7d9e435..343566325 100644 --- a/apps/service/src/temporal/workflows/pipeline-workflow.ts +++ b/apps/service/src/temporal/workflows/pipeline-workflow.ts @@ -1,20 +1,32 @@ -import { condition, continueAsNew, setHandler } from '@temporalio/workflow' +import { + condition, + continueAsNew, + startChild, + getExternalWorkflowHandle, + setHandler, + ChildWorkflowFailure, + ApplicationFailure, +} from '@temporalio/workflow' import type { SourceInputMessage, SourceState } from '@stripe/sync-protocol' import type { DesiredStatus, PipelineStatus } from '../../lib/createSchemas.js' -import { CONTINUE_AS_NEW_THRESHOLD } from '../../lib/utils.js' import { classifySyncErrors } from '../sync-errors.js' import { desiredStatusSignal, + credentialsUpdatedSignal, + configUpdatedSignal, + deploymentUpdatedSignal, pipelineSetup, sourceInputSignal, pipelineSync, pipelineTeardown, updatePipelineStatus, } from './_shared.js' +import { pipelineBackfillWorkflow } from './pipeline-backfill-workflow.js' const ONE_WEEK_MS = 7 * 24 * 60 * 60 * 1000 const LIVE_EVENT_BATCH_SIZE = 10 +const PIPELINE_CONTINUE_AS_NEW_THRESHOLD = 1000 export type ReconcileState = 'backfilling' | 'reconciling' | 'ready' export type SetupState = 'started' | 'completed' @@ -36,18 +48,47 @@ export interface PipelineWorkflowOpts { errorRecoveryRequested?: boolean } +/** + * Extract the child's latest sourceState from a BackfillPermanentError. + * The child encodes it as the first detail in ApplicationFailure.nonRetryable. + */ +function extractStateFromChildFailure(err: unknown): SourceState | undefined { + if (err instanceof ChildWorkflowFailure && err.cause instanceof ApplicationFailure) { + const detail = err.cause.details?.[0] + if (detail && typeof detail === 'object' && 'streams' in detail) { + return detail as SourceState + } + } + return undefined +} + +/** + * Reset any streams with status 'errored' back to 'pending' so they are + * retried on the next backfill run after recovery. + */ +function resetErroredStreams(state: SourceState): SourceState { + const streams: Record = {} + for (const [name, data] of Object.entries(state.streams)) { + const streamData = data as Record | undefined + if (streamData?.status === 'errored') { + streams[name] = { ...streamData, status: 'pending' } + } else { + streams[name] = data + } + } + return { ...state, streams } +} + export async function pipelineWorkflow( pipelineId: string, opts?: PipelineWorkflowOpts ): Promise { - // Persisted through continue-as-new. const inputQueue: SourceInputMessage[] = opts?.inputQueue ? [...opts.inputQueue] : [] let desiredStatus: DesiredStatus = opts?.desiredStatus ?? 'active' let sourceState: SourceState = opts?.sourceState ?? { streams: {}, global: {} } let state: PipelineWorkflowState = { ...opts?.state } let errorRecoveryRequested = opts?.errorRecoveryRequested ?? false - // Transient workflow-local state. let operationCount = 0 setHandler(sourceInputSignal, (event: SourceInputMessage) => { @@ -59,6 +100,15 @@ export async function pipelineWorkflow( errorRecoveryRequested = true } }) + setHandler(credentialsUpdatedSignal, () => { + if (state.errored) errorRecoveryRequested = true + }) + setHandler(configUpdatedSignal, () => { + if (state.errored) errorRecoveryRequested = true + }) + setHandler(deploymentUpdatedSignal, () => { + if (state.errored) errorRecoveryRequested = true + }) // MARK: - State @@ -80,13 +130,11 @@ export async function pipelineWorkflow( } } - /** - * Returns whether active work in this run should stop because the pipeline is - * no longer active or because the workflow should roll over into continue-as-new. - */ function runInterrupted() { return ( - desiredStatus !== 'active' || operationCount >= CONTINUE_AS_NEW_THRESHOLD || !!state.errored + desiredStatus !== 'active' || + operationCount >= PIPELINE_CONTINUE_AS_NEW_THRESHOLD || + !!state.errored ) } @@ -98,6 +146,7 @@ export async function pipelineWorkflow( await condition(() => desiredStatus === 'deleted' || errorRecoveryRequested) errorRecoveryRequested = false if (desiredStatus === 'active') { + sourceState = resetErroredStreams(sourceState) await setState({ errored: false }) } } @@ -121,47 +170,76 @@ export async function pipelineWorkflow( const result = await pipelineSync(pipelineId, { input: events }) operationCount++ - if (classifySyncErrors(result.errors).permanent.length > 0) { + const { permanent, transient } = classifySyncErrors(result.errors) + if (permanent.length > 0) { await markPermanentError() return } + if (transient.length > 0) { + inputQueue.unshift(...events) + } } } - // MARK: - Reconcile loop - - async function waitForReconcileTurn(): Promise { - await condition(() => runInterrupted() || state.phase !== 'ready', ONE_WEEK_MS) - - if (runInterrupted()) { + // MARK: - Backfill (child workflow) + + async function runBackfill( + phase: 'backfilling' | 'reconciling', + workflowId: string + ): Promise { + await setState({ phase }) + const handle = await startChild(pipelineBackfillWorkflow, { + workflowId, + args: [pipelineId, { state: sourceState }], + }) + + type Outcome = + | { kind: 'done'; state: SourceState } + | { kind: 'interrupted' } + | { kind: 'failed'; error: unknown } + + const outcome: Outcome = await Promise.race([ + handle + .result() + .then((s): Outcome => ({ kind: 'done', state: s })) + .catch((err): Outcome => ({ kind: 'failed', error: err })), + condition(() => runInterrupted()).then((): Outcome => ({ kind: 'interrupted' })), + ]) + + if (outcome.kind === 'done') { + sourceState = outcome.state + await setState({ phase: 'ready' }) + return true + } + if (outcome.kind === 'interrupted') { + getExternalWorkflowHandle(workflowId).cancel().catch(() => {}) return false } - - return true + // kind === 'failed' — extract the child's latest state if available + const childState = extractStateFromChildFailure(outcome.error) + if (childState) sourceState = childState + await markPermanentError() + return false } - async function reconcileLoop(): Promise { - while (await waitForReconcileTurn()) { - if (!state.phase) { - await setState({ phase: 'backfilling' }) - } else if (state.phase === 'ready') { - await setState({ phase: 'reconciling' }) - } + // MARK: - Backfill loop (spawns child workflows) + + async function backfillLoop(): Promise { + // Resume whichever backfill phase was interrupted. `reconciling` should not + // fall back to the initial backfill path after a pause/resume cycle. + if (!state.phase || state.phase === 'backfilling') { + const ok = await runBackfill('backfilling', `backfill-${pipelineId}`) + if (!ok) return + } else if (state.phase === 'reconciling') { + const ok = await runBackfill('reconciling', `reconcile-${pipelineId}-${Date.now()}`) + if (!ok) return + } - const result = await pipelineSync(pipelineId, { - state: sourceState, - state_limit: 100, - time_limit: 10, - }) - operationCount++ - sourceState = result.state - if (classifySyncErrors(result.errors).permanent.length > 0) { - await markPermanentError() - return - } - if (result.eof?.reason === 'complete' && !state.errored) { - await setState({ phase: 'ready' }) - } + while (!runInterrupted()) { + await condition(() => runInterrupted(), ONE_WEEK_MS) + if (runInterrupted()) return + + await runBackfill('reconciling', `reconcile-${pipelineId}-${Date.now()}`) } } @@ -183,14 +261,12 @@ export async function pipelineWorkflow( await setState({ paused: true }) await condition(() => desiredStatus !== 'paused') await setState({ paused: false }) - // Re-enter root control flow after pause in case the pipeline resumed - // normally or was deleted while we were waiting. continue } - await Promise.all([liveLoop(), reconcileLoop()]) + await Promise.all([liveLoop(), backfillLoop()]) - if (operationCount >= CONTINUE_AS_NEW_THRESHOLD) { + if (operationCount >= PIPELINE_CONTINUE_AS_NEW_THRESHOLD) { return await continueAsNew(pipelineId, { desiredStatus, sourceState, @@ -201,8 +277,6 @@ export async function pipelineWorkflow( } } - // Delete stays in normal workflow control flow instead of cancellation so teardown - // can run once in the terminal path after the active loops have stopped. await setState({ teardown: 'started' }) await pipelineTeardown(pipelineId) await setState({ teardown: 'completed' }) diff --git a/docs/plans/2026-04-14-backfill-child-workflow.md b/docs/plans/2026-04-14-backfill-child-workflow.md index 12ecdfc02..e9371ae73 100644 --- a/docs/plans/2026-04-14-backfill-child-workflow.md +++ b/docs/plans/2026-04-14-backfill-child-workflow.md @@ -1,55 +1,63 @@ -# Backfill Child Workflow: Extract `backfillLoop` from `pipelineWorkflow` +# Backfill Child Workflow + Never-Fail Pipeline **Status**: Plan (not yet implemented) -**Related**: [Never-Fail Workflow](2026-04-14-never-fail-workflow.md) (error model changes that complement this restructuring) +**Context**: [PR #284](https://github.com/stripe/sync-engine-fork/pull/284) cleaned up `SKIPPABLE_ERROR_PATTERNS`; discussion about workflow failure vs. pause, child workflows vs. activities ## Problem -`pipelineWorkflow` is a monolithic workflow that handles setup, backfill, live events, reconciliation, error recovery, pause/resume, teardown, and `continueAsNew` housekeeping in one event history. +The pipeline workflow has two structural problems: -- **Backfill has no completion semantics.** "Is the backfill done?" requires inspecting internal `phase` state. There's no workflow execution you can point to and say "that's the backfill, it completed at 3pm." -- **Event history bloat.** A full backfill can invoke `pipelineSync` hundreds of times. This dominates the history and drives the `CONTINUE_AS_NEW_THRESHOLD` of 500 operations. -- **No failure isolation.** A poison stream during backfill errors the entire pipeline, stopping live event processing that may be perfectly healthy. -- **Heavy `continueAsNew` payload.** The workflow serializes all `sourceState` (stream cursors, segment state, backfill progress) through every boundary. -- **Backfill and reconcile are the same operation** but wired differently with separate phase labels and control flow. Both call `pipelineSync` with `state` + limits, which runs `listApiBackfill`, which skips complete streams and paginates incomplete ones. The only difference is starting state. +**1. Monolithic workflow does too much.** `pipelineWorkflow` handles setup, backfill, live events, reconciliation, error recovery, pause/resume, teardown, and `continueAsNew` in one event history. Backfill has no completion semantics, dominates the event history, and a poison stream during backfill kills live event processing. -## Design +**2. Workflows can die from transient errors.** When `pipelineSync` encounters only transient/system errors, it throws `ApplicationFailure.retryable`. Temporal retries up to 10 times. If the error persists, the workflow execution dies — losing state and requiring a new execution. Most `system_error` cases (connector bugs, schema mismatches) are deterministic and won't self-heal; retrying them wastes 30 minutes before the workflow dies anyway. -Extract the `reconcileLoop` into a child workflow called `backfillLoop`. The pipeline workflow becomes a lightweight entity that manages lifecycle and spawns bounded tasks. +## Design principles -### Architecture +1. **The pipeline workflow never fails.** It's an entity that lives until deleted. Errors are states, not exits. +2. **The backfill child workflow can fail.** It's a task with a goal. If it can't succeed, it should fail — but only after giving every stream a chance to complete. +3. **One stream's failure shouldn't block others.** The child runs all streams to completion (or individual failure), accumulates errors, then decides: all succeeded → return, some had non-retryable errors → fail with the full error picture. +4. **Backfill and reconcile are the same operation.** Both run `listApiBackfill` which skips complete streams and paginates incomplete ones. The only difference is starting state. + +## Architecture + +The parent has a **backfill loop** — logic that spawns child workflows for initial backfill and periodic reconciliation. Each child workflow is a single **backfill run** (`pipelineBackfillWorkflow`) that processes all streams and either succeeds or fails. ``` -pipelineWorkflow (entity — lives forever, never fails) +pipelineWorkflow (entity — never fails) │ ├── setup (activity) │ -├── executeChild(backfillLoop, { state: {} }) ← initial backfill -│ └── calls pipelineSync in a loop until complete -│ └── returns final sourceState + any errors +├── backfill loop: +│ ├── executeChild(pipelineBackfillWorkflow) ← initial backfill +│ │ ├── runs all streams, accumulates errors +│ │ ├── success: returns final sourceState +│ │ └── failure: ChildWorkflowFailure → parent parks, waits for signal +│ │ +│ └── on schedule or signal: +│ └── executeChild(pipelineBackfillWorkflow) ← reconcile (same workflow, later state) │ -├── main loop: -│ ├── receive live events via signal → pipelineSync (activity) -│ ├── on schedule or signal: -│ │ └── executeChild(backfillLoop, { state }) ← reconcile -│ │ └── skips complete streams, completes -│ └── continueAsNew when needed (lightweight) +├── live loop: +│ └── receive events via signal → pipelineSync (activity) │ -├── on error: park, wait for recovery signal +├── on child failure or live error: +│ └── park in errored state, wait for recovery signal └── on delete: teardown (activity) ``` -### `backfillLoop` child workflow +## `pipelineBackfillWorkflow` child workflow -A finite workflow that takes a source state, advances all incomplete streams to completion, and returns the final state. Same code path for initial backfill and reconciliation — the only difference is the input state. +### Behavior + +A single backfill run. Calls `pipelineSync` in a loop, processing chunks of work. Errors from individual streams are accumulated but don't stop the run — other streams continue. Only after all streams have had their chance (eof) does the child evaluate the result. ```ts -export async function backfillLoop( +export async function pipelineBackfillWorkflow( pipelineId: string, opts: { state: SourceState } -): Promise { +): Promise { let sourceState = opts.state let operationCount = 0 + const accumulatedErrors: SyncRunError[] = [] while (true) { const result = await pipelineSync(pipelineId, { @@ -60,51 +68,116 @@ export async function backfillLoop( operationCount++ sourceState = result.state - if (result.errors.length > 0) { - return { state: sourceState, errors: result.errors, completed: false } + for (const err of result.errors) { + accumulatedErrors.push(err) } if (result.eof?.reason === 'complete') { - return { state: sourceState, errors: [], completed: true } + const { permanent } = classifySyncErrors(accumulatedErrors) + if (permanent.length > 0) { + throw ApplicationFailure.nonRetryable( + summarizeSyncErrors(permanent), + 'BackfillPermanentError', + { state: sourceState, errors: permanent } + ) + } + return sourceState } if (operationCount >= BACKFILL_CONTINUE_AS_NEW_THRESHOLD) { - await continueAsNew(pipelineId, { state: sourceState }) + await continueAsNew(pipelineId, { state: sourceState }) } } } +``` + +### What this means for `pipelineSync` activity + +Today the activity has two paths: + +- Permanent errors → return `{ errors, state }` +- Transient errors → throw `ApplicationFailure.retryable` -interface BackfillLoopResult { - state: SourceState - errors: SyncRunError[] - completed: boolean +For the child workflow model, the activity should **always return** — both permanent and transient errors come back as `{ errors, state }`. The child workflow decides what to do. Transient errors from one chunk don't stop the next chunk from running (different streams may be affected). The activity's Temporal retry policy still handles transport-level failures (activity crash, network error to the engine), but classified sync errors are always returned. + +```ts +// pipeline-sync.ts — proposed +export function createPipelineSyncActivity(context: ActivitiesContext) { + return async function pipelineSync( + pipelineId: string, + opts?: SourceReadOptions & { input?: SourceInputMessage[] } + ): Promise { + // ... same as today up to drainMessages ... + // Always return — let the workflow decide + return { errors, state, eof } + } } ``` -Properties: -- **Finite**: runs until all streams complete or an error stops it -- **Own event history**: backfill pagination doesn't bloat the pipeline workflow -- **Own `continueAsNew`**: manages its own history size independently -- **Returns a result**: parent gets final state + success/error status -- **Deterministic workflow ID**: `backfill-{pipelineId}` for initial, `reconcile-{pipelineId}-{timestamp}` for scheduled runs — so the parent can find them after its own `continueAsNew` +### What this means for the source connector + +The source needs to **continue past stream errors** and mark errored streams so they don't block eof. Today `listApiBackfill` already does this partially — the `catch` block emits an error trace and `continue`s to the next stream. But the errored stream's state isn't advanced, so the next `pipelineSync` call would retry it. -Note: error handling inside `backfillLoop` (transient retry, escalation) is covered in the [Never-Fail Workflow](2026-04-14-never-fail-workflow.md) plan. The version above returns immediately on any error; the never-fail plan adds bounded in-workflow retry for transient errors before returning. +Two options: -### Why child workflow, not just activities +- **Mark errored streams as complete** (with an error flag in state) so they're skipped on the next chunk. The child accumulates the error and reports it at the end. +- **Mark errored streams as `errored`** (new status alongside `complete` and `pending`). The source skips `errored` streams the same way it skips `complete` ones. Eof fires when all streams are either `complete` or `errored`. -- A backfill can run for hours with thousands of pages — it needs its own event history budget -- It needs its own `continueAsNew` cadence, independent of the pipeline -- It has clear completion semantics ("the backfill is done" = "the workflow completed") -- Future: it could receive signals (e.g., "skip this stream", "pause backfill") +The second option is cleaner — it preserves the distinction between "successfully synced" and "gave up on this stream." The child workflow treats `errored` streams as failures in its final evaluation. -### Why `pipelineSetup` and `pipelineTeardown` stay as activities +### Error classification -- Short, bounded operations (2 min timeout) -- No independent lifecycle needed -- No signals or complex state management -- Activity retry is appropriate for transient network errors +Reclassify the catch-all `system_error` into genuinely transient vs. deterministic: -### Simplified `pipelineWorkflow` +| Error | Current type | Proposed type | +| ---------------------------- | ----------------- | ----------------------------- | +| Rate limit (429) | `transient_error` | `transient_error` (no change) | +| Auth (401/403) | `auth_error` | `auth_error` (no change) | +| Network timeout / ECONNRESET | `system_error` | `transient_error` | +| Stripe 5xx | `system_error` | `transient_error` | +| JSON parse failure | `system_error` | `system_error` → permanent | +| Connector bug (bad params) | `system_error` | `system_error` → permanent | +| Unknown stream | `config_error` | `config_error` (no change) | + +```ts +const PERMANENT_FAILURE_TYPES = new Set(['config_error', 'auth_error', 'system_error']) +``` + +```ts +function classifyError(err: unknown): TraceError['failure_type'] { + if (err instanceof StripeApiRequestError) { + if (err.status === 401 || err.status === 403) return 'auth_error' + if (err.status === 429) return 'transient_error' + if (err.status >= 500) return 'transient_error' + } + if (isNetworkError(err)) return 'transient_error' + if (err instanceof Error && err.message.includes('Rate limit')) return 'transient_error' + return 'system_error' // deterministic by default +} +``` + +Only `transient_error` is worth retrying. Everything else is permanent — but the stream still gets marked `errored` (not retried within the same backfill run), and other streams continue. + +### Preserve `failure_type` through `collectMessages` + +Today `collectMessages` throws a plain `Error`, discarding `failure_type`. So `pipelineSetup` retries a `config_error` the same as a network blip. + +```ts +export class TraceErrorException extends Error { + constructor( + public readonly failure_type: TraceError['failure_type'], + message: string, + public readonly stream?: string + ) { + super(message) + this.name = 'TraceErrorException' + } +} +``` + +## `pipelineWorkflow` — the never-fail entity + +The pipeline workflow is the parent. It contains two loops: a **backfill loop** that spawns `pipelineBackfillWorkflow` child workflows, and a **live loop** that processes events via activities. The backfill loop handles both initial backfill and periodic reconciliation — same child workflow, different starting state. ```ts export async function pipelineWorkflow( @@ -123,34 +196,41 @@ export async function pipelineWorkflow( await setState({ setup: 'completed' }) } - // Initial backfill + // Initial backfill — spawn child, catch failure if (state.phase !== 'ready') { await setState({ phase: 'backfilling' }) - const result = await executeChild(backfillLoop, { - workflowId: `backfill-${pipelineId}`, - args: [pipelineId, { state: sourceState }], - }) - sourceState = result.state - if (!result.completed) { - await handleErrors(result.errors) - } else { + try { + sourceState = await executeChild(pipelineBackfillWorkflow, { + workflowId: `backfill-${pipelineId}`, + args: [pipelineId, { state: sourceState }], + }) await setState({ phase: 'ready' }) + } catch (err) { + await markPermanentError(extractErrorDetails(err)) } } // Main loop while (desiredStatus !== 'deleted') { - if (state.errored) { await waitForErrorRecovery(); continue } - if (desiredStatus === 'paused') { await waitForResume(); continue } + if (state.errored) { + await waitForErrorRecovery() + continue + } + if (desiredStatus === 'paused') { + await waitForResume() + continue + } await Promise.all([ - liveLoop(), // signals → pipelineSync activity - reconcileScheduler() // periodic backfillLoop child workflows + liveLoop(), + backfillLoop(), // spawns pipelineBackfillWorkflow children on a schedule ]) if (shouldContinueAsNew()) { await continueAsNew(pipelineId, { - desiredStatus, sourceState, state, + desiredStatus, + sourceState, + state, }) } } @@ -162,80 +242,141 @@ export async function pipelineWorkflow( } ``` -The `continueAsNew` payload shrinks significantly — `sourceState` is just the last completed checkpoint, not in-flight pagination cursors. `inputQueue` is no longer serialized (Temporal buffers signals). +### `liveLoop` — activities in the parent -### Reconcile as a scheduled backfill +Live events stay as activity calls in the parent workflow. Permanent errors park the workflow. -Reconciliation is "run `backfillLoop` again with the current state." The pipeline workflow schedules it: +```ts +async function liveLoop(): Promise { + while (true) { + const events = await waitForLiveEvents() + if (!events) return + + const result = await pipelineSync(pipelineId, { input: events }) + if (result.errors.length > 0) { + const { permanent } = classifySyncErrors(result.errors) + if (permanent.length > 0) { + await markPermanentError(permanent) + return + } + } + } +} +``` + +### `backfillLoop` — the loop in the parent that spawns child workflows + +This is _not_ a separate workflow — it's a function inside `pipelineWorkflow` that periodically spawns `pipelineBackfillWorkflow` child workflows. Each child is an independent run. ```ts -async function reconcileScheduler(): Promise { +async function backfillLoop(): Promise { while (!runInterrupted()) { await condition(() => reconcileRequested || runInterrupted(), ONE_WEEK_MS) if (runInterrupted()) return await setState({ phase: 'reconciling' }) - const result = await executeChild(backfillLoop, { - workflowId: `reconcile-${pipelineId}-${Date.now()}`, - args: [pipelineId, { state: sourceState }], - }) - sourceState = result.state - if (!result.completed) { - await handleErrors(result.errors) + try { + sourceState = await executeChild(pipelineBackfillWorkflow, { + workflowId: `reconcile-${pipelineId}-${Date.now()}`, + args: [pipelineId, { state: sourceState }], + }) + await setState({ phase: 'ready' }) + } catch (err) { + await markPermanentError(extractErrorDetails(err)) return } - await setState({ phase: 'ready' }) } } ``` +### Recovery signals + +| Signal | Trigger | Workflow action | +| ------------------------ | ---------------------- | -------------------------------------------------- | +| `desired_status: active` | User re-enables | Clear errored state, re-enter main loop (existing) | +| `credentials_updated` | User rotates API key | Clear if `auth_error` | +| `config_updated` | User modifies config | Clear, re-run setup if needed | +| `deployment_updated` | New connector deployed | Clear if `system_error` | + +After recovery, the parent spawns a new `pipelineBackfillWorkflow` that resumes from the last checkpointed `sourceState`. Previously-completed streams are skipped. Previously-errored streams are retried (their state resets from `errored` to `pending`). Streams that were in-flight when the child failed resume from their last cursor. + ## Observability -- **Is the backfill done?** → check if `backfill-{pipelineId}` child workflow completed +- **Is the backfill done?** → `backfill-{pipelineId}` workflow status: completed / failed / running +- **Why did it fail?** → child workflow failure has full error list: which streams failed, why - **How long did backfill take?** → child workflow start/end timestamps - **Which reconcile runs happened?** → list child workflows matching `reconcile-{pipelineId}-*` -- **Is it making progress?** → child workflow heartbeats / operation count +- **Did some streams succeed despite the failure?** → yes, `sourceState` shows which streams are complete + +## Implementation order + +### Phase 1: Activity always returns errors + +Highest impact, prerequisite for everything else. + +1. Modify `pipeline-sync.ts`: always return `{ errors, state, eof }`, never throw +2. Update `pipelineWorkflow` to handle transient errors from the activity (the current `reconcileLoop` and `liveLoop` need to classify errors instead of relying on the throw) +3. For now, permanent errors still stop the loop (existing `markPermanentError` behavior) + +### Phase 2: Add `errored` stream status to source + +Enable per-stream error isolation. + +1. Add `errored` status alongside `complete` and `pending` in source state +2. Update `listApiBackfill`: on non-retryable error, mark stream as `errored` and continue +3. Eof fires when all streams are `complete` or `errored` +4. `pipelineSync` returns errors for `errored` streams but keeps going + +### Phase 3: Extract `pipelineBackfillWorkflow` child workflow + +1. Create `apps/service/src/temporal/workflows/pipeline-backfill-workflow.ts` +2. Register in worker alongside `pipelineWorkflow` +3. Accumulate errors across chunks, evaluate at eof +4. Throw `ApplicationFailure.nonRetryable` if permanent errors exist -All visible in the Temporal UI without custom dashboards. +### Phase 4: Rewire `pipelineWorkflow` -## Implementation +1. Replace inline `reconcileLoop` with `backfillLoop` function that spawns `pipelineBackfillWorkflow` children +2. Add try/catch for `ChildWorkflowFailure` → `markPermanentError` +3. Keep `liveLoop` as activities in the parent +4. Simplify `continueAsNew` payload -### Phase 1: Create `backfillLoop` child workflow +### Phase 5: Reclassify `system_error` -1. Create `apps/service/src/temporal/workflows/backfill-loop.ts` -2. Define `BackfillLoopResult` type -3. Register in worker alongside `pipelineWorkflow` -4. Use same activity proxies (`pipelineSync`) with same timeout/retry config +1. Add `isNetworkError` helper +2. Update `classifyError` in source connector +3. Expand `PERMANENT_FAILURE_TYPES` to include `system_error` +4. Update tests -### Phase 2: Rewire `pipelineWorkflow` +### Phase 6: Preserve `failure_type` through `collectMessages` -1. Replace inline `reconcileLoop` with `executeChild(backfillLoop, ...)` -2. Handle child workflow result (errors, state, completed) -3. Move `sourceState` management to only update on child completion -4. Add `reconcileScheduler` for periodic reconcile runs -5. Keep `liveLoop` as-is (activities within the pipeline workflow) +1. Add `TraceErrorException` to `packages/protocol` +2. Update `collectMessages` to throw it +3. Update `pipelineSetup` activity to use `nonRetryableErrorTypes` -### Phase 3: Simplify `continueAsNew` +### Phase 7: Recovery signals (additive) -1. Remove `inputQueue` from `continueAsNew` payload -2. `sourceState` is now just the last completed checkpoint -3. Raise or remove `CONTINUE_AS_NEW_THRESHOLD` — the pipeline workflow generates far fewer events +1. Define new signals in `_shared.ts` +2. Add handlers in `pipelineWorkflow` +3. Wire to service API endpoints ### Migration -Existing running workflows need to transition. Deploy new workflow code, let existing workflows `continueAsNew` into the new shape. The new `pipelineWorkflow` accepts the old `PipelineWorkflowOpts` format — if `state.phase === 'backfilling'` and no child is running, spawn one. +Phase 1 ships without versioning concerns — same workflow shape, different activity behavior. Phase 3–4 (child workflow extraction) requires migration: deploy new code, existing workflows `continueAsNew` into the new shape. The new `pipelineWorkflow` accepts the old `PipelineWorkflowOpts` — if `state.phase === 'backfilling'` and no child is running, spawn a `pipelineBackfillWorkflow`. ## Constants ```ts -const BACKFILL_CONTINUE_AS_NEW_THRESHOLD = 500 // for backfillLoop -const PIPELINE_CONTINUE_AS_NEW_THRESHOLD = 1000 // pipeline is much lighter now -const ONE_WEEK_MS = 7 * 24 * 60 * 60 * 1000 // reconcile schedule +const BACKFILL_CONTINUE_AS_NEW_THRESHOLD = 500 // for pipelineBackfillWorkflow +const PIPELINE_CONTINUE_AS_NEW_THRESHOLD = 1000 // pipeline is lighter now +const MAX_TRANSIENT_RETRIES = 5 // for transient errors in liveLoop +const ONE_WEEK_MS = 7 * 24 * 60 * 60 * 1000 // reconcile schedule ``` ## Open questions -1. **Should the parent pause live events during initial backfill?** Currently live and reconcile run in parallel via `Promise.all`. With a child workflow, they're still concurrent. Should we avoid writing to the same streams from both paths? -2. **Per-stream child workflows (future)?** This plan extracts the backfill loop as a single child. A future iteration could spawn per-stream children for independent failure isolation (Airbyte model). -3. **Backfill progress reporting.** Today `updatePipelineStatus` fires on phase transitions. With a child workflow, we could also report progress (e.g., "47/50 streams complete") via queries or heartbeats. -4. **Child workflow survival across `continueAsNew`.** Child workflows don't carry over when the parent continues-as-new. Use deterministic workflow IDs so the parent can re-attach after its own `continueAsNew`. +1. **Should the parent pause live events during initial backfill?** Currently live and reconcile run in parallel. Should we avoid writing to the same streams from both paths? +2. **Per-stream child workflows (future)?** This plan has one child for all streams. A future iteration could spawn per-stream children for fully independent lifecycle management. +3. **Backfill progress reporting.** With a child workflow, we could report progress (e.g., "47/50 streams complete, 2 errored, 1 in progress") via Temporal queries. +4. **Child workflow survival across `continueAsNew`.** Child workflows don't carry over when the parent continues-as-new. Use deterministic workflow IDs so the parent can re-attach. +5. **Transient errors in `pipelineBackfillWorkflow`.** If a stream has a transient error in one chunk, should the child retry it in the next chunk (since the stream stays `pending`)? Or should transient errors that persist across N chunks escalate to `errored`? diff --git a/packages/protocol/src/helpers.ts b/packages/protocol/src/helpers.ts index b7c44fcde..fff4afc66 100644 --- a/packages/protocol/src/helpers.ts +++ b/packages/protocol/src/helpers.ts @@ -12,9 +12,24 @@ import type { SourceStateMessage, SpecMessage, StreamStatePayload, + TraceError, TraceMessage, } from './protocol.js' +// MARK: - Error types + +/** Typed error preserving failure_type from trace errors for activity-level handling. */ +export class TraceErrorException extends Error { + constructor( + public readonly failure_type: TraceError['failure_type'], + message: string, + public readonly stream?: string + ) { + super(message) + this.name = 'TraceErrorException' + } +} + // MARK: - Message constructors /** Wrap a raw object into an envelope RecordMessage. */ @@ -137,7 +152,11 @@ export async function collectMessages( if (msg.type === 'log') { logs.push(`[${msg.log.level}] ${msg.log.message}`) } else if (msg.type === 'trace' && msg.trace.trace_type === 'error') { - throw new Error(msg.trace.error.message) + throw new TraceErrorException( + msg.trace.error.failure_type, + msg.trace.error.message, + msg.trace.error.stream + ) } if (typeSet.has(msg.type)) { messages.push(msg as Extract) diff --git a/packages/protocol/src/index.ts b/packages/protocol/src/index.ts index 607329941..98be3d2ef 100644 --- a/packages/protocol/src/index.ts +++ b/packages/protocol/src/index.ts @@ -23,6 +23,8 @@ export { isDataMessage, isTraceError, isTraceStreamStatus, + // Error types + TraceErrorException, // Stream collectors collectMessages, collectFirst, diff --git a/packages/source-stripe/src/index.test.ts b/packages/source-stripe/src/index.test.ts index 8b02c88d3..ba2e0ccf3 100644 --- a/packages/source-stripe/src/index.test.ts +++ b/packages/source-stripe/src/index.test.ts @@ -540,7 +540,7 @@ describe('StripeSource', () => { source.read({ config, catalog: catalog({ name: 'customers', primary_key: [['id']] }) }) ) - // trace(stream_status started) + trace(error) + // trace(stream_status started) + trace(error) — transient errors don't mark stream errored expect(messages).toHaveLength(2) expect(messages[0]).toMatchObject({ type: 'trace', @@ -606,7 +606,7 @@ describe('StripeSource', () => { source.read({ config, catalog: catalog({ name: 'customers', primary_key: [['id']] }) }) ) - expect(messages).toHaveLength(2) + expect(messages).toHaveLength(3) const errorMsg = messages[1] as TraceMessage expect(errorMsg.type).toBe('trace') expect(errorMsg.trace.trace_type).toBe('error') @@ -657,7 +657,7 @@ describe('StripeSource', () => { }) ) - expect(messages).toHaveLength(2) + expect(messages).toHaveLength(3) expect(listFn).not.toHaveBeenCalled() expect(messages[0]).toMatchObject({ type: 'trace', @@ -709,7 +709,7 @@ describe('StripeSource', () => { source.read({ config, catalog: catalog({ name: 'tax_ids', primary_key: [['id']] }) }) ) - expect(messages).toHaveLength(2) + expect(messages).toHaveLength(3) const errorMsg = messages[1] as TraceMessage expect(errorMsg.trace.trace_type).toBe('error') const traceError = ( @@ -741,7 +741,7 @@ describe('StripeSource', () => { source.read({ config, catalog: catalog({ name: 'customers', primary_key: [['id']] }) }) ) - expect(messages).toHaveLength(2) + expect(messages).toHaveLength(3) expect(messages[1]).toMatchObject({ type: 'trace', trace: { @@ -752,6 +752,14 @@ describe('StripeSource', () => { }, }, }) + expect(messages[2]).toMatchObject({ + type: 'source_state', + source_state: { + state_type: 'stream', + stream: 'customers', + data: { status: 'errored' }, + }, + }) }) it('marks known skippable Stripe list errors as complete without emitting error traces', async () => { @@ -820,9 +828,9 @@ describe('StripeSource', () => { }) ) - // customers: started + error = 2 + // customers: started + error + errored-state = 3 // invoices: started + record + state + complete = 4 - expect(messages).toHaveLength(6) + expect(messages).toHaveLength(7) // Customers errored expect(messages[0]).toMatchObject({ @@ -836,16 +844,24 @@ describe('StripeSource', () => { type: 'trace', trace: { trace_type: 'error', error: { stream: 'customers' } }, }) + expect(messages[2]).toMatchObject({ + type: 'source_state', + source_state: { + state_type: 'stream', + stream: 'customers', + data: { status: 'errored' }, + }, + }) // Invoices succeeded - expect(messages[2]).toMatchObject({ + expect(messages[3]).toMatchObject({ type: 'trace', trace: { trace_type: 'stream_status', stream_status: { stream: 'invoices', status: 'started' }, }, }) - expect(messages[5]).toMatchObject({ + expect(messages[6]).toMatchObject({ type: 'trace', trace: { trace_type: 'stream_status', diff --git a/packages/source-stripe/src/src-list-api.ts b/packages/source-stripe/src/src-list-api.ts index 2d89a3c5e..12222bec9 100644 --- a/packages/source-stripe/src/src-list-api.ts +++ b/packages/source-stripe/src/src-list-api.ts @@ -6,15 +6,45 @@ import type { RateLimiter } from './rate-limiter.js' import { StripeApiRequestError } from '@stripe/sync-openapi' import type { StripeClient } from './client.js' +const NETWORK_ERROR_CODES = new Set([ + 'ECONNRESET', + 'ECONNREFUSED', + 'ENOTFOUND', + 'ETIMEDOUT', + 'EPIPE', + 'EAI_AGAIN', + 'UND_ERR_CONNECT_TIMEOUT', + 'UND_ERR_SOCKET', +]) + +function isNetworkError(err: unknown): boolean { + if (!(err instanceof Error)) return false + const code = (err as NodeJS.ErrnoException).code + if (code && NETWORK_ERROR_CODES.has(code)) return true + if (err.cause) return isNetworkError(err.cause) + return false +} + +function classifyError( + err: unknown +): 'transient_error' | 'auth_error' | 'system_error' { + if (err instanceof StripeApiRequestError) { + if (err.status === 401 || err.status === 403) return 'auth_error' + if (err.status === 429) return 'transient_error' + if (err.status >= 500) return 'transient_error' + } + if (isNetworkError(err)) return 'transient_error' + if (err instanceof Error && err.message.includes('Rate limit')) return 'transient_error' + return 'system_error' +} + export function errorToTrace(err: unknown, stream: string): TraceMessage { - const isRateLimit = err instanceof Error && err.message.includes('Rate limit') - const isAuth = err instanceof StripeApiRequestError && (err.status === 401 || err.status === 403) return { type: 'trace', trace: { trace_type: 'error', error: { - failure_type: isRateLimit ? 'transient_error' : isAuth ? 'auth_error' : 'system_error', + failure_type: classifyError(err), message: err instanceof Error ? err.message : String(err), stream, ...(err instanceof Error ? { stack_trace: err.stack } : {}), @@ -36,15 +66,16 @@ export function errorToTrace(err: unknown, stream: string): TraceMessage { // 400 "This endpoint is not in live mode" → not in live mode // 400 "Must provide customer" → Must provide customer // 400 "Must provide source or customer" → Must provide -// 400 "This API surface is not enabled for testmode usage." → not enabled for -// 400 "Accounts v2 is not enabled for your platform." → not enabled for +// 400 "Missing required param: customer" → Missing required param +// 400 "Unrecognized request URL (GET: /v1/exchange_rates)" → Unrecognized request URL // 400 "Your account is not set up to use Issuing." → not set up to use const SKIPPABLE_ERROR_PATTERNS = [ 'only available in testmode', 'not in live mode', - 'not enabled for', 'Must provide customer', 'Must provide ', + 'Missing required param', + 'Unrecognized request URL', 'not set up to use', ] @@ -457,7 +488,7 @@ export async function* listApiBackfill(opts: { if (!resourceConfig.listFn) continue const streamState = state?.[stream.name] - if (streamState?.status === 'complete') continue + if (streamState?.status === 'complete' || streamState?.status === 'errored') continue yield { type: 'trace', @@ -555,6 +586,12 @@ export async function* listApiBackfill(opts: { error: err instanceof Error ? err.message : String(err), }) yield errorToTrace(err, stream.name) + if (classifyError(err) !== 'transient_error') { + yield stateMsg({ + stream: stream.name, + data: { ...streamState, status: 'errored' }, + }) + } } } }