Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
96 changes: 69 additions & 27 deletions apps/service/src/__tests__/workflow.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,28 +4,26 @@ import { Worker } from '@temporalio/worker'
import path from 'node:path'
import type { SyncActivities } from '../temporal/activities/index.js'
import type { RunResult } from '../temporal/activities/index.js'
import { CONTINUE_AS_NEW_THRESHOLD } from '../lib/utils.js'

type SourceInput = unknown

// Point directly at the workflow index to avoid resolving the legacy dist/temporal/workflows.js file.
const workflowsPath = path.resolve(process.cwd(), 'dist/temporal/workflows/index.js')

const emptyState = { streams: {}, global: {} }
const noErrors: RunResult = { errors: [], state: emptyState }
const noErrorsComplete: RunResult = { errors: [], state: emptyState, eof: { reason: 'complete' } }
const permanentSyncError: RunResult = {
errors: [{ message: 'permanent sync failure', failure_type: 'auth_error', stream: 'customers' }],
state: emptyState,
}

// Workflows now receive only the pipelineId string
const testPipelineId = 'test_pipe'

function stubActivities(overrides: Partial<SyncActivities> = {}): SyncActivities {
const activities = {
discoverCatalog: async () => ({ streams: [] }),
pipelineSetup: async () => {},
pipelineSync: async () => noErrors,
pipelineSync: async () => noErrorsComplete,
pipelineTeardown: async () => {},
updatePipelineStatus: async () => {},
...overrides,
Expand All @@ -39,7 +37,6 @@ function stubActivities(overrides: Partial<SyncActivities> = {}): SyncActivities
} as SyncActivities
}

/** Signal the workflow to delete. */
async function signalDelete(handle: { signal: (name: string, arg: string) => Promise<void> }) {
await handle.signal('desired_status', 'deleted')
}
Expand Down Expand Up @@ -76,7 +73,7 @@ describe('pipelineWorkflow (unit — stubbed activities)', () => {
},
pipelineSync: async () => {
runCallCount++
return noErrors
return noErrorsComplete
},
}),
})
Expand All @@ -88,7 +85,6 @@ describe('pipelineWorkflow (unit — stubbed activities)', () => {
taskQueue: 'test-queue-1',
})

// Let it sync several reconciliation pages
await new Promise((r) => setTimeout(r, 2000))

const status = await handle.query('status')
Expand All @@ -112,7 +108,8 @@ describe('pipelineWorkflow (unit — stubbed activities)', () => {
activities: stubActivities({
pipelineSync: async (pipelineId: string, opts?) => {
syncCalls.push({ pipelineId, input: opts?.input ?? undefined })
return noErrors
if (opts?.input) return noErrors
return noErrorsComplete
},
}),
})
Expand All @@ -124,10 +121,8 @@ describe('pipelineWorkflow (unit — stubbed activities)', () => {
taskQueue: 'test-queue-2',
})

// Let reconciliation start
await new Promise((r) => setTimeout(r, 1500))

// Send events
await signalSourceInput(handle, {
id: 'evt_1',
type: 'customer.created',
Expand All @@ -141,7 +136,6 @@ describe('pipelineWorkflow (unit — stubbed activities)', () => {
await signalDelete(handle)
await handle.result()

// Find event-bearing sync calls (input is defined)
const eventCalls = syncCalls.filter((c) => c.input)
expect(eventCalls.length).toBeGreaterThanOrEqual(1)

Expand All @@ -153,7 +147,6 @@ describe('pipelineWorkflow (unit — stubbed activities)', () => {
])
)

// All calls should use the test pipeline ID
for (const call of syncCalls) {
expect(call.pipelineId).toBe(testPipelineId)
}
Expand Down Expand Up @@ -183,7 +176,7 @@ describe('pipelineWorkflow (unit — stubbed activities)', () => {
if (inputInFlight > 0) overlapped = true
await new Promise((r) => setTimeout(r, 250))
backfillInFlight--
return noErrors
return noErrorsComplete
},
}),
})
Expand Down Expand Up @@ -213,6 +206,7 @@ describe('pipelineWorkflow (unit — stubbed activities)', () => {
let liveStartsWhileBackfill = 0
let liveBatchCount = 0
let liveEventCount = 0
let backfillCalls = 0

const worker = await Worker.create({
connection: testEnv.nativeConnection,
Expand All @@ -228,10 +222,11 @@ describe('pipelineWorkflow (unit — stubbed activities)', () => {
return noErrors
}

backfillCalls++
backfillInFlight++
try {
await new Promise((r) => setTimeout(r, 600))
return noErrors
return noErrorsComplete
} finally {
backfillInFlight--
}
Expand All @@ -254,7 +249,7 @@ describe('pipelineWorkflow (unit — stubbed activities)', () => {
})
}

await new Promise((r) => setTimeout(r, 350))
await new Promise((r) => setTimeout(r, 1500))
await signalDelete(handle)
await handle.result()

Expand Down Expand Up @@ -299,7 +294,6 @@ describe('pipelineWorkflow (unit — stubbed activities)', () => {

it('reports phase-driven status transitions through teardown', async () => {
const statusWrites: string[] = []
let reconcileCalls = 0

const worker = await Worker.create({
connection: testEnv.nativeConnection,
Expand All @@ -311,9 +305,7 @@ describe('pipelineWorkflow (unit — stubbed activities)', () => {
},
pipelineSync: async (_pipelineId: string, opts?) => {
if (opts?.input) return noErrors

reconcileCalls++
return reconcileCalls === 1 ? { ...noErrors, eof: { reason: 'complete' } } : noErrors
return noErrorsComplete
},
}),
})
Expand Down Expand Up @@ -391,7 +383,7 @@ describe('pipelineWorkflow (unit — stubbed activities)', () => {
throw new Error('transient sync failure')
}

return { ...noErrors, eof: { reason: 'complete' as const } }
return noErrorsComplete
},
}),
})
Expand All @@ -413,6 +405,54 @@ describe('pipelineWorkflow (unit — stubbed activities)', () => {
})
})

it('continues past returned transient errors without entering error state', async () => {
const statusWrites: string[] = []
let reconcileCalls = 0

const transientSyncError: RunResult = {
errors: [
{ message: 'transient sync failure', failure_type: 'transient_error', stream: 'customers' },
],
state: emptyState,
}

const worker = await Worker.create({
connection: testEnv.nativeConnection,
taskQueue: 'test-queue-3b-transient',
workflowsPath,
activities: stubActivities({
updatePipelineStatus: async (_id: string, status: string) => {
statusWrites.push(status)
},
pipelineSync: async (_pipelineId: string, opts?) => {
if (opts?.input) return noErrors

reconcileCalls++
if (reconcileCalls === 1) {
return { ...transientSyncError, eof: { reason: 'complete' as const } }
}
return noErrorsComplete
},
}),
})

await worker.runUntil(async () => {
const handle = await testEnv.client.workflow.start('pipelineWorkflow', {
args: [testPipelineId],
workflowId: 'test-sync-3b-transient',
taskQueue: 'test-queue-3b-transient',
})

await new Promise((r) => setTimeout(r, 2000))
await signalDelete(handle)
await handle.result()

expect(reconcileCalls).toBeGreaterThanOrEqual(1)
expect(statusWrites).toContain('ready')
expect(statusWrites).not.toContain('error')
})
})

it('queues live events while paused and drains them after resume', async () => {
const syncCalls: { input?: SourceInput[] }[] = []

Expand All @@ -424,7 +464,8 @@ describe('pipelineWorkflow (unit — stubbed activities)', () => {
pipelineSync: async (_pipelineId: string, opts?) => {
syncCalls.push({ input: opts?.input ?? undefined })
await new Promise((r) => setTimeout(r, 50))
return noErrors
if (opts?.input) return noErrors
return noErrorsComplete
},
}),
})
Expand Down Expand Up @@ -469,10 +510,10 @@ describe('pipelineWorkflow (unit — stubbed activities)', () => {
taskQueue: 'test-queue-4',
workflowsPath,
activities: stubActivities({
pipelineSync: async () => {
// Slow sync so delete arrives mid-reconciliation
pipelineSync: async (_pipelineId: string, opts?) => {
if (opts?.input) return noErrors
await new Promise((r) => setTimeout(r, 500))
return noErrors
return noErrorsComplete
},
pipelineTeardown: async (): Promise<void> => {
teardownCalled = true
Expand Down Expand Up @@ -507,6 +548,7 @@ describe('pipelineWorkflow (unit — stubbed activities)', () => {
return {
errors: [],
state: { streams: { customers: { cursor: `cus_${syncCallCount}` } }, global: {} },
eof: { reason: 'complete' as const },
}
},
}),
Expand Down Expand Up @@ -546,9 +588,9 @@ describe('pipelineWorkflow (unit — stubbed activities)', () => {
},
pipelineSync: async () => {
syncCallCount++
if (syncCallCount > CONTINUE_AS_NEW_THRESHOLD) crossedThresholdResolve?.()
if (syncCallCount > PIPELINE_CONTINUE_AS_NEW_THRESHOLD) crossedThresholdResolve?.()
await new Promise((r) => setTimeout(r, 1))
return noErrors
return noErrorsComplete
},
}),
})
Expand All @@ -564,7 +606,7 @@ describe('pipelineWorkflow (unit — stubbed activities)', () => {
await signalDelete(handle)
await handle.result()

expect(syncCallCount).toBeGreaterThan(CONTINUE_AS_NEW_THRESHOLD)
expect(syncCallCount).toBeGreaterThan(PIPELINE_CONTINUE_AS_NEW_THRESHOLD)
expect(setupCalls).toBe(1)
})
})
Expand Down
15 changes: 0 additions & 15 deletions apps/service/src/temporal/activities/pipeline-sync.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
import { ApplicationFailure } from '@temporalio/activity'
import type { SourceInputMessage, SourceReadOptions } from '@stripe/sync-engine'
import type { EofPayload } from '@stripe/sync-protocol'
import type { ActivitiesContext } from './_shared.js'
import { asIterable, drainMessages, type RunResult } from './_shared.js'
import { classifySyncErrors, summarizeSyncErrors } from '../sync-errors.js'

export function createPipelineSyncActivity(context: ActivitiesContext) {
return async function pipelineSync(
Expand All @@ -18,7 +16,6 @@ export function createPipelineSyncActivity(context: ActivitiesContext) {
context.engine.pipeline_sync(config, readOpts, input),
readOpts.state
)
// Full replacement — connector emits the complete updated config
if (sourceConfig) {
const type = pipeline.source.type
await context.pipelineStore.update(pipelineId, {
Expand All @@ -31,18 +28,6 @@ export function createPipelineSyncActivity(context: ActivitiesContext) {
destination: { type, [type]: destConfig },
})
}
const { transient, permanent } = classifySyncErrors(errors)
if (permanent.length > 0) {
if (transient.length > 0) {
console.warn(
`Transient errors suppressed by permanent failures: ${summarizeSyncErrors(transient)}`
)
}
return { errors, state, eof }
}
if (transient.length > 0) {
throw ApplicationFailure.retryable(summarizeSyncErrors(transient), 'TransientSyncError')
}
return { errors, state, eof }
}
}
2 changes: 1 addition & 1 deletion apps/service/src/temporal/sync-errors.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ export type ClassifiedSyncErrors = {
permanent: SyncRunError[]
}

const PERMANENT_FAILURE_TYPES = new Set(['config_error', 'auth_error'])
const PERMANENT_FAILURE_TYPES = new Set(['config_error', 'auth_error', 'system_error'])

export function classifySyncErrors(errors: SyncRunError[]): ClassifiedSyncErrors {
const transient: SyncRunError[] = []
Expand Down
5 changes: 5 additions & 0 deletions apps/service/src/temporal/workflows/_shared.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,11 @@ export const sourceInputSignal = defineSignal<[SourceInputMessage]>('source_inpu
/** Carries the new desired_status value — workflow updates its local state directly. */
export const desiredStatusSignal = defineSignal<[DesiredStatus]>('desired_status')

/** Signals that notify the workflow about external changes, enabling targeted error recovery. */
export const credentialsUpdatedSignal = defineSignal('credentials_updated')
export const configUpdatedSignal = defineSignal('config_updated')
export const deploymentUpdatedSignal = defineSignal('deployment_updated')

export const { pipelineSetup, pipelineTeardown } = proxyActivities<SyncActivities>({
startToCloseTimeout: '2m',
retry: retryPolicy,
Expand Down
1 change: 1 addition & 0 deletions apps/service/src/temporal/workflows/index.ts
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
export { pipelineWorkflow } from './pipeline-workflow.js'
export { pipelineBackfillWorkflow } from './pipeline-backfill-workflow.js'
52 changes: 52 additions & 0 deletions apps/service/src/temporal/workflows/pipeline-backfill-workflow.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
import { ApplicationFailure, continueAsNew } from '@temporalio/workflow'

import type { SourceState } from '@stripe/sync-protocol'
import { classifySyncErrors, summarizeSyncErrors, type SyncRunError } from '../sync-errors.js'
import { pipelineSync } from './_shared.js'

const BACKFILL_CONTINUE_AS_NEW_THRESHOLD = 500

export async function pipelineBackfillWorkflow(
pipelineId: string,
opts: { state: SourceState; accumulatedErrors?: SyncRunError[] }
): Promise<SourceState> {
let sourceState = opts.state
let operationCount = 0
const accumulatedErrors: SyncRunError[] = opts.accumulatedErrors
? [...opts.accumulatedErrors]
: []

while (true) {
const result = await pipelineSync(pipelineId, {
state: sourceState,
state_limit: 100,
time_limit: 10,
})
operationCount++
sourceState = result.state

for (const err of result.errors) {
accumulatedErrors.push(err)
}

if (result.eof?.reason === 'complete') {
const { permanent } = classifySyncErrors(accumulatedErrors)
if (permanent.length > 0) {
throw ApplicationFailure.nonRetryable(
summarizeSyncErrors(permanent),
'BackfillPermanentError',
sourceState
)
}
return sourceState
}

if (operationCount >= BACKFILL_CONTINUE_AS_NEW_THRESHOLD) {
const { permanent } = classifySyncErrors(accumulatedErrors)
await continueAsNew<typeof pipelineBackfillWorkflow>(pipelineId, {
state: sourceState,
accumulatedErrors: permanent,
})
}
}
}
Loading
Loading