Skip to content

Commit 58dfc8a

Browse files
tonyxiaoclaudecodex
committed
fix: apply linter changes and format
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> Committed-By-Agent: claude Committed-By-Agent: codex Co-authored-by: codex <noreply@openai.com> Committed-By-Agent: codex Co-authored-by: codex <noreply@openai.com>
1 parent 0f8a3e0 commit 58dfc8a

9 files changed

Lines changed: 73 additions & 30 deletions

File tree

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ private/
2727

2828
# Local cred storage
2929
.credentials/
30+
.stripe-sync/
3031

3132
# Local test scripts
3233
scripts/test-all-accounts.sh

apps/engine/src/api/app.test.ts

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -730,6 +730,38 @@ describe('state_limit and time_limit', () => {
730730
expect(eofEvents[0]).toMatchObject({ type: 'eof', eof: { has_more: true } })
731731
})
732732

733+
it('POST /pipeline_sync forwards sync_run_id into the emitted sync state', async () => {
734+
const app = await createApp(resolver)
735+
736+
const body = toNdjson([
737+
{
738+
type: 'record',
739+
record: {
740+
stream: 'customers',
741+
data: { id: 'cus_1' },
742+
emitted_at: '2024-01-01T00:00:00.000Z',
743+
},
744+
},
745+
{ type: 'source_state', source_state: { stream: 'customers', data: { cursor: '1' } } },
746+
])
747+
const res = await app.request('/pipeline_sync?sync_run_id=run_demo', {
748+
method: 'POST',
749+
headers: {
750+
'X-Pipeline': syncParams,
751+
...bodyHeaders(body),
752+
},
753+
body,
754+
})
755+
756+
expect(res.status).toBe(200)
757+
const events = await readNdjson<Message>(res)
758+
const eofEvent = events.find((e) => e.type === 'eof')
759+
expect(eofEvent).toMatchObject({
760+
type: 'eof',
761+
eof: { ending_state: { sync_run: { sync_run_id: 'run_demo' } } },
762+
})
763+
})
764+
733765
it('POST /read without limits returns all messages plus eof:complete', async () => {
734766
const app = await createApp(resolver)
735767

apps/engine/src/api/app.ts

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -241,6 +241,10 @@ export async function createApp(resolver: ConnectorResolver) {
241241
description: 'Stop streaming after N seconds.',
242242
example: '10',
243243
}),
244+
sync_run_id: z.string().optional().meta({
245+
description: 'Optional sync run identifier used to track bounded sync progress.',
246+
example: 'run_demo',
247+
}),
244248
})
245249

246250
const errorResponse = {
@@ -610,7 +614,7 @@ export async function createApp(resolver: ConnectorResolver) {
610614
},
611615
})
612616
app.openapi(pipelineSyncRoute, async (c) => {
613-
const { state_limit, time_limit } = c.req.valid('query')
617+
const { state_limit, time_limit, sync_run_id } = c.req.valid('query')
614618

615619
const { pipeline, state } = getPipelineAndState(c)
616620
let input: AsyncIterable<unknown> | undefined
@@ -639,7 +643,11 @@ export async function createApp(resolver: ConnectorResolver) {
639643
)
640644
const ac = createConnectionAbort(c, onDisconnect)
641645

642-
const output = engine.pipeline_sync(pipeline, { state, state_limit, time_limit }, input)
646+
const output = engine.pipeline_sync(
647+
pipeline,
648+
{ state, state_limit, time_limit, sync_run_id },
649+
input
650+
)
643651
return ndjsonResponse(logApiStream('Engine API /pipeline_sync', output, context, startedAt), {
644652
signal: ac.signal,
645653
})

apps/engine/src/lib/remote-engine.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@ export function createRemoteEngine(engineUrl: string): Engine {
6868
const q: Record<string, string> = {}
6969
if (opts?.state_limit != null) q.state_limit = String(opts.state_limit)
7070
if (opts?.time_limit != null) q.time_limit = String(opts.time_limit)
71+
if (opts?.sync_run_id != null) q.sync_run_id = opts.sync_run_id
7172
return q
7273
}
7374

apps/service/src/api/app.test.ts

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -480,9 +480,7 @@ describe('pipeline CRUD', () => {
480480
}
481481

482482
seenPipeline = JSON.parse(String(req.headers['x-pipeline']))
483-
seenState = req.headers['x-state']
484-
? JSON.parse(String(req.headers['x-state']))
485-
: undefined
483+
seenState = req.headers['x-state'] ? JSON.parse(String(req.headers['x-state'])) : undefined
486484
seenQuery = url.searchParams
487485

488486
const runProgress = {

apps/service/src/api/app.ts

Lines changed: 17 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -429,9 +429,8 @@ export function createApp(options: AppOptions) {
429429
async (c) => {
430430
const { id } = c.req.valid('param')
431431
const { state_limit, time_limit, sync_run_id, no_state } = c.req.valid('query')
432-
const body = ((c.req.valid('json') as z.infer<typeof SyncBodySchema> | undefined) ?? {}) as z.infer<
433-
typeof SyncBodySchema
434-
>
432+
const body = ((c.req.valid('json') as z.infer<typeof SyncBodySchema> | undefined) ??
433+
{}) as z.infer<typeof SyncBodySchema>
435434

436435
let pipeline: Pipeline
437436
try {
@@ -483,8 +482,7 @@ export function createApp(options: AppOptions) {
483482
type: 'log' as const,
484483
log: {
485484
level: 'error' as const,
486-
message:
487-
err instanceof Error ? err.message : `Sync failed: ${String(err)}`,
485+
message: err instanceof Error ? err.message : `Sync failed: ${String(err)}`,
488486
},
489487
}),
490488
})
@@ -506,8 +504,14 @@ export function createApp(options: AppOptions) {
506504
requestParams: {
507505
path: PipelineIdParam,
508506
query: z.object({
509-
state_limit: z.coerce.number().optional().meta({ description: 'Max state messages per iteration' }),
510-
time_limit: z.coerce.number().optional().meta({ description: 'Time limit per iteration (seconds)' }),
507+
state_limit: z.coerce
508+
.number()
509+
.optional()
510+
.meta({ description: 'Max state messages per iteration' }),
511+
time_limit: z.coerce
512+
.number()
513+
.optional()
514+
.meta({ description: 'Time limit per iteration (seconds)' }),
511515
}),
512516
},
513517
responses: {
@@ -548,16 +552,12 @@ export function createApp(options: AppOptions) {
548552
})
549553

550554
const syncRunId = crypto.randomUUID()
551-
const result = await runBackfillToCompletion(
552-
{ pipelineSync: activities.pipelineSync },
553-
id,
554-
{
555-
syncState: pipeline.sync_state ?? emptySyncState(),
556-
syncRunId,
557-
stateLimit: state_limit ?? 100,
558-
timeLimit: time_limit ?? 30,
559-
}
560-
)
555+
const result = await runBackfillToCompletion({ pipelineSync: activities.pipelineSync }, id, {
556+
syncState: pipeline.sync_state ?? emptySyncState(),
557+
syncRunId,
558+
stateLimit: state_limit ?? 100,
559+
timeLimit: time_limit ?? 30,
560+
})
561561

562562
return c.json({ eof: result.eof, sync_state: result.syncState }, 200)
563563
}

apps/service/src/cli.test.ts

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -172,8 +172,9 @@ function buildMockApp() {
172172
})
173173
}
174174

175-
const body =
176-
req.headers.get('content-type')?.includes('application/json') ? await req.json() : {}
175+
const body = req.headers.get('content-type')?.includes('application/json')
176+
? await req.json()
177+
: {}
177178
syncRequests.push({
178179
id,
179180
query: Object.fromEntries(url.searchParams.entries()),

apps/service/src/cli.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -107,7 +107,8 @@ const serveCmd = defineCommand({
107107
},
108108
'engine-url': {
109109
type: 'string',
110-
description: 'Optional sync engine URL for ad-hoc sync execution. If omitted, runs in-process.',
110+
description:
111+
'Optional sync engine URL for ad-hoc sync execution. If omitted, runs in-process.',
111112
},
112113
},
113114
async run({ args }) {

apps/service/src/temporal/workflows/pipeline-backfill.ts

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -31,11 +31,12 @@ export async function pipelineBackfill(
3131
let operationCount = 0
3232

3333
while (true) {
34-
const result = await backfillStep(
35-
{ pipelineSync },
36-
pipelineId,
37-
{ syncState, syncRunId, stateLimit: 100, timeLimit: 30 }
38-
)
34+
const result = await backfillStep({ pipelineSync }, pipelineId, {
35+
syncState,
36+
syncRunId,
37+
stateLimit: 100,
38+
timeLimit: 30,
39+
})
3940
syncState = result.syncState
4041
operationCount++
4142

0 commit comments

Comments
 (0)