Skip to content

Commit d8f8980

Browse files
committed
feat: pipeline backfill child workflow + never-fail error model
Implements the backfill child workflow plan. Phase 1: the pipelineSync activity always returns errors instead of throwing ApplicationFailure.retryable, which eliminates the workflow death path. Phase 2: per-stream error isolation — errored streams get status 'errored' in source state so other streams can continue; eof fires when all streams are complete or errored. Phase 3: a new pipelineBackfillWorkflow child workflow accumulates errors across chunks and fails with nonRetryable at eof if permanent errors exist. Phase 4: pipelineWorkflow spawns pipelineBackfillWorkflow via startChild and races it against runInterrupted for clean cancellation; backfill runs concurrently with liveLoop in Promise.all. Phase 5: reclassify system_error — network errors and 5xx become transient_error; system_error is now permanent in the classifier. Phase 6: TraceErrorException preserves failure_type through collectMessages instead of discarding it with a plain Error. Phase 7: recovery signals (credentials_updated, config_updated, deployment_updated) trigger error recovery in the pipeline workflow. Made-with: Cursor Committed-By-Agent: cursor Made-with: Cursor Committed-By-Agent: cursor Made-with: Cursor Committed-By-Agent: cursor Made-with: Cursor Committed-By-Agent: cursor
1 parent c60ab22 commit d8f8980

12 files changed

Lines changed: 370 additions & 132 deletions

File tree

apps/service/src/__tests__/workflow.test.ts

Lines changed: 69 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -4,28 +4,26 @@ import { Worker } from '@temporalio/worker'
44
import path from 'node:path'
55
import type { SyncActivities } from '../temporal/activities/index.js'
66
import type { RunResult } from '../temporal/activities/index.js'
7-
import { CONTINUE_AS_NEW_THRESHOLD } from '../lib/utils.js'
87

98
type SourceInput = unknown
109

11-
// Point directly at the workflow index to avoid resolving the legacy dist/temporal/workflows.js file.
1210
const workflowsPath = path.resolve(process.cwd(), 'dist/temporal/workflows/index.js')
1311

1412
const emptyState = { streams: {}, global: {} }
1513
const noErrors: RunResult = { errors: [], state: emptyState }
14+
const noErrorsComplete: RunResult = { errors: [], state: emptyState, eof: { reason: 'complete' } }
1615
const permanentSyncError: RunResult = {
1716
errors: [{ message: 'permanent sync failure', failure_type: 'auth_error', stream: 'customers' }],
1817
state: emptyState,
1918
}
2019

21-
// Workflows now receive only the pipelineId string
2220
const testPipelineId = 'test_pipe'
2321

2422
function stubActivities(overrides: Partial<SyncActivities> = {}): SyncActivities {
2523
const activities = {
2624
discoverCatalog: async () => ({ streams: [] }),
2725
pipelineSetup: async () => {},
28-
pipelineSync: async () => noErrors,
26+
pipelineSync: async () => noErrorsComplete,
2927
pipelineTeardown: async () => {},
3028
updatePipelineStatus: async () => {},
3129
...overrides,
@@ -39,7 +37,6 @@ function stubActivities(overrides: Partial<SyncActivities> = {}): SyncActivities
3937
} as SyncActivities
4038
}
4139

42-
/** Signal the workflow to delete. */
4340
/**
 * Sends the `desired_status` signal with the value `deleted` to the given handle
 * and resolves once the handle's `signal` call has settled.
 */
async function signalDelete(handle: { signal: (name: string, arg: string) => Promise<void> }) {
  const signalName = 'desired_status'
  const desiredStatus = 'deleted'
  await handle.signal(signalName, desiredStatus)
}
@@ -76,7 +73,7 @@ describe('pipelineWorkflow (unit — stubbed activities)', () => {
7673
},
7774
pipelineSync: async () => {
7875
runCallCount++
79-
return noErrors
76+
return noErrorsComplete
8077
},
8178
}),
8279
})
@@ -88,7 +85,6 @@ describe('pipelineWorkflow (unit — stubbed activities)', () => {
8885
taskQueue: 'test-queue-1',
8986
})
9087

91-
// Let it sync several reconciliation pages
9288
await new Promise((r) => setTimeout(r, 2000))
9389

9490
const status = await handle.query('status')
@@ -112,7 +108,8 @@ describe('pipelineWorkflow (unit — stubbed activities)', () => {
112108
activities: stubActivities({
113109
pipelineSync: async (pipelineId: string, opts?) => {
114110
syncCalls.push({ pipelineId, input: opts?.input ?? undefined })
115-
return noErrors
111+
if (opts?.input) return noErrors
112+
return noErrorsComplete
116113
},
117114
}),
118115
})
@@ -124,10 +121,8 @@ describe('pipelineWorkflow (unit — stubbed activities)', () => {
124121
taskQueue: 'test-queue-2',
125122
})
126123

127-
// Let reconciliation start
128124
await new Promise((r) => setTimeout(r, 1500))
129125

130-
// Send events
131126
await signalSourceInput(handle, {
132127
id: 'evt_1',
133128
type: 'customer.created',
@@ -141,7 +136,6 @@ describe('pipelineWorkflow (unit — stubbed activities)', () => {
141136
await signalDelete(handle)
142137
await handle.result()
143138

144-
// Find event-bearing sync calls (input is defined)
145139
const eventCalls = syncCalls.filter((c) => c.input)
146140
expect(eventCalls.length).toBeGreaterThanOrEqual(1)
147141

@@ -153,7 +147,6 @@ describe('pipelineWorkflow (unit — stubbed activities)', () => {
153147
])
154148
)
155149

156-
// All calls should use the test pipeline ID
157150
for (const call of syncCalls) {
158151
expect(call.pipelineId).toBe(testPipelineId)
159152
}
@@ -183,7 +176,7 @@ describe('pipelineWorkflow (unit — stubbed activities)', () => {
183176
if (inputInFlight > 0) overlapped = true
184177
await new Promise((r) => setTimeout(r, 250))
185178
backfillInFlight--
186-
return noErrors
179+
return noErrorsComplete
187180
},
188181
}),
189182
})
@@ -213,6 +206,7 @@ describe('pipelineWorkflow (unit — stubbed activities)', () => {
213206
let liveStartsWhileBackfill = 0
214207
let liveBatchCount = 0
215208
let liveEventCount = 0
209+
let backfillCalls = 0
216210

217211
const worker = await Worker.create({
218212
connection: testEnv.nativeConnection,
@@ -228,10 +222,11 @@ describe('pipelineWorkflow (unit — stubbed activities)', () => {
228222
return noErrors
229223
}
230224

225+
backfillCalls++
231226
backfillInFlight++
232227
try {
233228
await new Promise((r) => setTimeout(r, 600))
234-
return noErrors
229+
return noErrorsComplete
235230
} finally {
236231
backfillInFlight--
237232
}
@@ -254,7 +249,7 @@ describe('pipelineWorkflow (unit — stubbed activities)', () => {
254249
})
255250
}
256251

257-
await new Promise((r) => setTimeout(r, 350))
252+
await new Promise((r) => setTimeout(r, 1500))
258253
await signalDelete(handle)
259254
await handle.result()
260255

@@ -299,7 +294,6 @@ describe('pipelineWorkflow (unit — stubbed activities)', () => {
299294

300295
it('reports phase-driven status transitions through teardown', async () => {
301296
const statusWrites: string[] = []
302-
let reconcileCalls = 0
303297

304298
const worker = await Worker.create({
305299
connection: testEnv.nativeConnection,
@@ -311,9 +305,7 @@ describe('pipelineWorkflow (unit — stubbed activities)', () => {
311305
},
312306
pipelineSync: async (_pipelineId: string, opts?) => {
313307
if (opts?.input) return noErrors
314-
315-
reconcileCalls++
316-
return reconcileCalls === 1 ? { ...noErrors, eof: { reason: 'complete' } } : noErrors
308+
return noErrorsComplete
317309
},
318310
}),
319311
})
@@ -391,7 +383,7 @@ describe('pipelineWorkflow (unit — stubbed activities)', () => {
391383
throw new Error('transient sync failure')
392384
}
393385

394-
return { ...noErrors, eof: { reason: 'complete' as const } }
386+
return noErrorsComplete
395387
},
396388
}),
397389
})
@@ -413,6 +405,54 @@ describe('pipelineWorkflow (unit — stubbed activities)', () => {
413405
})
414406
})
415407

408+
it('continues past returned transient errors without entering error state', async () => {
409+
const statusWrites: string[] = []
410+
let reconcileCalls = 0
411+
412+
const transientSyncError: RunResult = {
413+
errors: [
414+
{ message: 'transient sync failure', failure_type: 'transient_error', stream: 'customers' },
415+
],
416+
state: emptyState,
417+
}
418+
419+
const worker = await Worker.create({
420+
connection: testEnv.nativeConnection,
421+
taskQueue: 'test-queue-3b-transient',
422+
workflowsPath,
423+
activities: stubActivities({
424+
updatePipelineStatus: async (_id: string, status: string) => {
425+
statusWrites.push(status)
426+
},
427+
pipelineSync: async (_pipelineId: string, opts?) => {
428+
if (opts?.input) return noErrors
429+
430+
reconcileCalls++
431+
if (reconcileCalls === 1) {
432+
return { ...transientSyncError, eof: { reason: 'complete' as const } }
433+
}
434+
return noErrorsComplete
435+
},
436+
}),
437+
})
438+
439+
await worker.runUntil(async () => {
440+
const handle = await testEnv.client.workflow.start('pipelineWorkflow', {
441+
args: [testPipelineId],
442+
workflowId: 'test-sync-3b-transient',
443+
taskQueue: 'test-queue-3b-transient',
444+
})
445+
446+
await new Promise((r) => setTimeout(r, 2000))
447+
await signalDelete(handle)
448+
await handle.result()
449+
450+
expect(reconcileCalls).toBeGreaterThanOrEqual(1)
451+
expect(statusWrites).toContain('ready')
452+
expect(statusWrites).not.toContain('error')
453+
})
454+
})
455+
416456
it('queues live events while paused and drains them after resume', async () => {
417457
const syncCalls: { input?: SourceInput[] }[] = []
418458

@@ -424,7 +464,8 @@ describe('pipelineWorkflow (unit — stubbed activities)', () => {
424464
pipelineSync: async (_pipelineId: string, opts?) => {
425465
syncCalls.push({ input: opts?.input ?? undefined })
426466
await new Promise((r) => setTimeout(r, 50))
427-
return noErrors
467+
if (opts?.input) return noErrors
468+
return noErrorsComplete
428469
},
429470
}),
430471
})
@@ -469,10 +510,10 @@ describe('pipelineWorkflow (unit — stubbed activities)', () => {
469510
taskQueue: 'test-queue-4',
470511
workflowsPath,
471512
activities: stubActivities({
472-
pipelineSync: async () => {
473-
// Slow sync so delete arrives mid-reconciliation
513+
pipelineSync: async (_pipelineId: string, opts?) => {
514+
if (opts?.input) return noErrors
474515
await new Promise((r) => setTimeout(r, 500))
475-
return noErrors
516+
return noErrorsComplete
476517
},
477518
pipelineTeardown: async (): Promise<void> => {
478519
teardownCalled = true
@@ -507,6 +548,7 @@ describe('pipelineWorkflow (unit — stubbed activities)', () => {
507548
return {
508549
errors: [],
509550
state: { streams: { customers: { cursor: `cus_${syncCallCount}` } }, global: {} },
551+
eof: { reason: 'complete' as const },
510552
}
511553
},
512554
}),
@@ -546,9 +588,9 @@ describe('pipelineWorkflow (unit — stubbed activities)', () => {
546588
},
547589
pipelineSync: async () => {
548590
syncCallCount++
549-
if (syncCallCount > CONTINUE_AS_NEW_THRESHOLD) crossedThresholdResolve?.()
591+
if (syncCallCount > PIPELINE_CONTINUE_AS_NEW_THRESHOLD) crossedThresholdResolve?.()
550592
await new Promise((r) => setTimeout(r, 1))
551-
return noErrors
593+
return noErrorsComplete
552594
},
553595
}),
554596
})
@@ -564,7 +606,7 @@ describe('pipelineWorkflow (unit — stubbed activities)', () => {
564606
await signalDelete(handle)
565607
await handle.result()
566608

567-
expect(syncCallCount).toBeGreaterThan(CONTINUE_AS_NEW_THRESHOLD)
609+
expect(syncCallCount).toBeGreaterThan(PIPELINE_CONTINUE_AS_NEW_THRESHOLD)
568610
expect(setupCalls).toBe(1)
569611
})
570612
})
Lines changed: 0 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,7 @@
1-
import { ApplicationFailure } from '@temporalio/activity'
21
import type { SourceInputMessage, SourceReadOptions } from '@stripe/sync-engine'
32
import type { EofPayload } from '@stripe/sync-protocol'
43
import type { ActivitiesContext } from './_shared.js'
54
import { asIterable, drainMessages, type RunResult } from './_shared.js'
6-
import { classifySyncErrors, summarizeSyncErrors } from '../sync-errors.js'
75

86
export function createPipelineSyncActivity(context: ActivitiesContext) {
97
return async function pipelineSync(
@@ -18,7 +16,6 @@ export function createPipelineSyncActivity(context: ActivitiesContext) {
1816
context.engine.pipeline_sync(config, readOpts, input),
1917
readOpts.state
2018
)
21-
// Full replacement — connector emits the complete updated config
2219
if (sourceConfig) {
2320
const type = pipeline.source.type
2421
await context.pipelineStore.update(pipelineId, {
@@ -31,18 +28,6 @@ export function createPipelineSyncActivity(context: ActivitiesContext) {
3128
destination: { type, [type]: destConfig },
3229
})
3330
}
34-
const { transient, permanent } = classifySyncErrors(errors)
35-
if (permanent.length > 0) {
36-
if (transient.length > 0) {
37-
console.warn(
38-
`Transient errors suppressed by permanent failures: ${summarizeSyncErrors(transient)}`
39-
)
40-
}
41-
return { errors, state, eof }
42-
}
43-
if (transient.length > 0) {
44-
throw ApplicationFailure.retryable(summarizeSyncErrors(transient), 'TransientSyncError')
45-
}
4631
return { errors, state, eof }
4732
}
4833
}

apps/service/src/temporal/sync-errors.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ export type ClassifiedSyncErrors = {
99
permanent: SyncRunError[]
1010
}
1111

12-
const PERMANENT_FAILURE_TYPES = new Set(['config_error', 'auth_error'])
12+
const PERMANENT_FAILURE_TYPES = new Set(['config_error', 'auth_error', 'system_error'])
1313

1414
export function classifySyncErrors(errors: SyncRunError[]): ClassifiedSyncErrors {
1515
const transient: SyncRunError[] = []

apps/service/src/temporal/workflows/_shared.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,11 @@ export const sourceInputSignal = defineSignal<[SourceInputMessage]>('source_inpu
99
/** Carries the new desired_status value — workflow updates its local state directly. */
1010
export const desiredStatusSignal = defineSignal<[DesiredStatus]>('desired_status')
1111

12+
/** Signals that notify the workflow about external changes, enabling targeted error recovery. */
13+
export const credentialsUpdatedSignal = defineSignal('credentials_updated')
14+
export const configUpdatedSignal = defineSignal('config_updated')
15+
export const deploymentUpdatedSignal = defineSignal('deployment_updated')
16+
1217
export const { pipelineSetup, pipelineTeardown } = proxyActivities<SyncActivities>({
1318
startToCloseTimeout: '2m',
1419
retry: retryPolicy,
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1 +1,2 @@
11
export { pipelineWorkflow } from './pipeline-workflow.js'
2+
export { pipelineBackfillWorkflow } from './pipeline-backfill-workflow.js'
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
import { ApplicationFailure, continueAsNew } from '@temporalio/workflow'
2+
3+
import type { SourceState } from '@stripe/sync-protocol'
4+
import { classifySyncErrors, summarizeSyncErrors, type SyncRunError } from '../sync-errors.js'
5+
import { pipelineSync } from './_shared.js'
6+
7+
const BACKFILL_CONTINUE_AS_NEW_THRESHOLD = 500
8+
9+
/**
 * Child workflow that drives a pipeline backfill until the sync reports completion.
 *
 * Repeatedly invokes the `pipelineSync` activity, feeding the state returned by one
 * call into the next, until a result carries `eof.reason === 'complete'`. Errors
 * returned by the activity never abort the loop; the ones `classifySyncErrors`
 * reports as permanent are remembered and surfaced once, at eof.
 *
 * @param pipelineId - Identifier forwarded to every `pipelineSync` call.
 * @param opts - `state` is the source state to resume from; `accumulatedErrors`
 *   holds errors carried over from a previous run of this workflow (populated when
 *   the workflow continues-as-new).
 * @returns The final source state when eof is reached with no permanent errors.
 * @throws ApplicationFailure - non-retryable, type `BackfillPermanentError`, with the
 *   final source state attached as a detail, when eof is reached and at least one
 *   permanent error was recorded.
 */
export async function pipelineBackfillWorkflow(
  pipelineId: string,
  opts: { state: SourceState; accumulatedErrors?: SyncRunError[] }
): Promise<SourceState> {
  let sourceState = opts.state
  let operationCount = 0

  // Only permanent errors ever influence the outcome (failure at eof, payload for
  // continue-as-new), so everything else is dropped as soon as it is seen instead
  // of being retained and re-classified on every read.
  const permanentErrors: SyncRunError[] = [
    ...classifySyncErrors(opts.accumulatedErrors ?? []).permanent,
  ]

  while (true) {
    const result = await pipelineSync(pipelineId, {
      state: sourceState,
      state_limit: 100,
      time_limit: 10,
    })
    operationCount++
    sourceState = result.state

    if (result.errors.length > 0) {
      permanentErrors.push(...classifySyncErrors(result.errors).permanent)
    }

    if (result.eof?.reason === 'complete') {
      if (permanentErrors.length > 0) {
        // Attach the final state as a failure detail so it is not lost with the throw.
        throw ApplicationFailure.nonRetryable(
          summarizeSyncErrors(permanentErrors),
          'BackfillPermanentError',
          sourceState
        )
      }
      return sourceState
    }

    if (operationCount >= BACKFILL_CONTINUE_AS_NEW_THRESHOLD) {
      // Start a fresh run, carrying the current state and the permanent errors
      // recorded so far. continueAsNew never resolves normally.
      await continueAsNew<typeof pipelineBackfillWorkflow>(pipelineId, {
        state: sourceState,
        accumulatedErrors: permanentErrors,
      })
    }
  }
}

0 commit comments

Comments
 (0)