Skip to content

Commit 5a57f3f

Browse files
tonyxiaoclaude
andcommitted
feat: add CLI progress display and extract sync progress reducer
Extract sync display state as a pure reducer over SyncOutput messages that accumulates into an EofPayload shape. The renderer is a stateless function of (EofPayload, catalog) -> string[]. - Add --progress flag to sync command (auto-enabled on TTY) - Add --base-url flag for QA/non-prod Stripe API endpoints - Extract sync-progress-state.ts: createSyncDisplayState() reducer + renderSyncProgress() renderer with emoji status groups - Deduplicate: app.ts formatEof now delegates to renderSyncProgress - Suppress raw source stream_status traces in trackProgress (engine re-emits enriched versions) - Add scripts/test-all-accounts.sh for multi-account testing Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> Committed-By-Agent: claude
1 parent cf25f57 commit 5a57f3f

5 files changed

Lines changed: 328 additions & 123 deletions

File tree

apps/engine/src/api/app.ts

Lines changed: 2 additions & 121 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import { HTTPException } from 'hono/http-exception'
66
import pg from 'pg'
77
import type { Message, ConnectorResolver, TraceMessage } from '../lib/index.js'
88
import type { EofPayload } from '@stripe/sync-protocol'
9+
import { renderSyncProgress } from '../lib/sync-progress-state.js'
910
import {
1011
createEngine,
1112
createConnectorSchemas,
@@ -119,128 +120,8 @@ async function* logApiStream<T>(
119120

120121
const dangerouslyVerbose = process.env.DANGEROUSLY_VERBOSE_LOGGING === 'true'
121122

122-
const REASON_EMOJI: Record<string, string> = {
123-
complete: '✅',
124-
time_limit: '⏱️',
125-
state_limit: '📦',
126-
error: '❌',
127-
aborted: '🛑',
128-
}
129-
130-
const ERROR_EMOJI: Record<string, string> = {
131-
transient_error: '⚠️',
132-
system_error: '❌',
133-
config_error: '⚙️',
134-
auth_error: '🔒',
135-
}
136-
137-
function formatDuration(ms: number): string {
138-
if (ms < 1000) return `${ms}ms`
139-
if (ms < 60_000) return `${(ms / 1000).toFixed(1)}s`
140-
const mins = Math.floor(ms / 60_000)
141-
const secs = Math.round((ms % 60_000) / 1000)
142-
if (mins < 60) return secs > 0 ? `${mins}m ${secs}s` : `${mins}m`
143-
const hrs = Math.floor(mins / 60)
144-
const remainMins = mins % 60
145-
return remainMins > 0 ? `${hrs}h ${remainMins}m` : `${hrs}h`
146-
}
147-
148-
function formatNumber(n: number): string {
149-
return n.toLocaleString('en-US')
150-
}
151-
152123
function formatEof(eof: EofPayload): string {
153-
const emoji = REASON_EMOJI[eof.reason] ?? '❓'
154-
const gp = eof.global_progress
155-
const runRecords = gp?.run_record_count ?? 0
156-
const cumulativeRecords = gp?.cumulative_record_count ?? runRecords
157-
const rps = gp?.records_per_second?.toFixed(1) ?? '0'
158-
const runElapsed = gp?.elapsed_ms ?? 0
159-
const cumulativeElapsed = gp?.cumulative_elapsed_ms ?? runElapsed
160-
const cumulativeRequests = gp?.cumulative_request_count ?? 0
161-
const runRequests = gp?.request_count ?? 0
162-
163-
const lines: string[] = []
164-
lines.push(`${emoji} Sync ${eof.reason}`)
165-
lines.push(
166-
` Total: ${formatNumber(cumulativeRecords)} records | ${formatNumber(cumulativeRequests)} requests | ${formatDuration(cumulativeElapsed)}`
167-
)
168-
lines.push(
169-
` This run: +${formatNumber(runRecords)} records | ${formatNumber(runRequests)} requests | ${formatDuration(runElapsed)} | ${rps} records/s`
170-
)
171-
172-
const sp = eof.stream_progress
173-
if (sp) {
174-
type StreamEntry = { name: string; cumulative: number; run: number; errors: StreamError[] }
175-
type StreamError = { message: string; failure_type?: string }
176-
const completeStreams: StreamEntry[] = []
177-
const startedStreams: StreamEntry[] = []
178-
let errorsCount = 0
179-
180-
for (const [name, s] of Object.entries(sp)) {
181-
const entry: StreamEntry = {
182-
name,
183-
cumulative: s.cumulative_record_count,
184-
run: s.run_record_count,
185-
errors: s.errors ?? [],
186-
}
187-
if (entry.errors.length > 0) errorsCount++
188-
if (s.status === 'complete') {
189-
completeStreams.push(entry)
190-
} else {
191-
startedStreams.push(entry)
192-
}
193-
}
194-
195-
// Sort by cumulative record count descending
196-
completeStreams.sort((a, b) => b.cumulative - a.cumulative)
197-
startedStreams.sort((a, b) => b.cumulative - a.cumulative)
198-
199-
const maxNameLen = Math.max(...Object.keys(sp).map((n) => n.length), 10)
200-
201-
function formatStreamLine(entry: StreamEntry): string[] {
202-
const result: string[] = []
203-
const countStr =
204-
entry.cumulative > 0
205-
? `${formatNumber(entry.cumulative).padStart(10)}${entry.run > 0 ? ` (+${formatNumber(entry.run)})` : ''}`
206-
: ''
207-
result.push(` ${entry.name.padEnd(maxNameLen)} ${countStr}`)
208-
for (const err of entry.errors) {
209-
const errEmoji = ERROR_EMOJI[err.failure_type ?? 'system_error'] ?? '❌'
210-
result.push(
211-
` ${errEmoji} ${err.message}${err.failure_type ? ` (${err.failure_type})` : ''}`
212-
)
213-
}
214-
return result
215-
}
216-
217-
if (completeStreams.length > 0) {
218-
lines.push('')
219-
lines.push(` Complete (${completeStreams.length}):`)
220-
for (const s of completeStreams) {
221-
lines.push(...formatStreamLine(s))
222-
}
223-
}
224-
225-
if (startedStreams.length > 0) {
226-
lines.push('')
227-
lines.push(` Started (${startedStreams.length}):`)
228-
for (const s of startedStreams) {
229-
lines.push(...formatStreamLine(s))
230-
}
231-
}
232-
233-
// Summary line
234-
lines.push('')
235-
const parts: string[] = []
236-
if (completeStreams.length) parts.push(`${completeStreams.length} complete`)
237-
if (startedStreams.length) parts.push(`${startedStreams.length} started`)
238-
if (errorsCount) parts.push(`${errorsCount} streams with errors`)
239-
parts.push(`+${formatNumber(runRecords)} records this run`)
240-
lines.push(` 📊 ${parts.join(', ')}`)
241-
}
242-
243-
return lines.join('\n')
124+
return renderSyncProgress(eof, [], true).join('\n')
244125
}
245126

246127
/**

apps/engine/src/cli/sync.ts

Lines changed: 31 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ import type { Engine } from '../lib/engine.js'
33
import type { ConnectorResolver } from '../lib/index.js'
44
import { readonlyStateStore, type StateStore } from '../lib/state-store.js'
55
import { type PipelineConfig, type SyncState, emptySyncState } from '@stripe/sync-protocol'
6+
import { createSyncDisplayState, renderSyncProgress } from '../lib/sync-progress-state.js'
67

78
export function createSyncCmd(engine: Engine, _resolver: ConnectorResolver) {
89
return defineCommand({
@@ -46,6 +47,15 @@ export function createSyncCmd(engine: Engine, _resolver: ConnectorResolver) {
4647
type: 'string',
4748
description: 'Stop after N seconds',
4849
},
50+
baseUrl: {
51+
type: 'string',
52+
description: 'Stripe API base URL (or STRIPE_API_BASE env, default: https://api.stripe.com)',
53+
},
54+
progress: {
55+
type: 'boolean',
56+
default: false,
57+
description: 'Force progress display (auto-enabled when stderr is a TTY)',
58+
},
4959
live: {
5060
type: 'boolean',
5161
default: false,
@@ -84,6 +94,10 @@ export function createSyncCmd(engine: Engine, _resolver: ConnectorResolver) {
8494

8595
// Inject optional source config overrides
8696
const stripeConfig = pipeline.source.stripe as Record<string, unknown>
97+
const baseUrl = args.baseUrl || process.env.STRIPE_API_BASE
98+
if (baseUrl) {
99+
stripeConfig.base_url = baseUrl
100+
}
87101
if (backfillLimit) {
88102
stripeConfig.backfill_limit = backfillLimit
89103
}
@@ -101,7 +115,10 @@ export function createSyncCmd(engine: Engine, _resolver: ConnectorResolver) {
101115
: undefined
102116
const output = engine.pipeline_sync(pipeline, { state: syncState, time_limit: timeLimit })
103117

104-
// Persist state checkpoints and stream NDJSON to stdout
118+
const showProgress = args.progress || process.stderr.isTTY
119+
const display = showProgress ? createSyncDisplayState() : null
120+
let linesPrinted = 0
121+
105122
for await (const msg of output) {
106123
if (msg.type === 'source_state') {
107124
if (msg.source_state.state_type === 'global') {
@@ -110,7 +127,19 @@ export function createSyncCmd(engine: Engine, _resolver: ConnectorResolver) {
110127
await store.set(msg.source_state.stream, msg.source_state.data)
111128
}
112129
}
113-
process.stdout.write(JSON.stringify(msg) + '\n')
130+
131+
if (display) {
132+
const changed = display.update(msg)
133+
if (changed) {
134+
if (linesPrinted > 0) process.stderr.write(`\x1b[${linesPrinted}A\x1b[0J`)
135+
const final = msg.type === 'eof'
136+
const lines = renderSyncProgress(display.state.eof, display.state.catalog, final)
137+
for (const line of lines) process.stderr.write(line + '\n')
138+
linesPrinted = lines.length
139+
}
140+
} else {
141+
process.stdout.write(JSON.stringify(msg) + '\n')
142+
}
114143
}
115144

116145
if ('close' in store && typeof store.close === 'function') {

apps/engine/src/lib/progress.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -337,6 +337,11 @@ export function trackProgress(opts: {
337337
return
338338
}
339339

340+
// Suppress upstream stream_status traces — the engine re-emits enriched versions
341+
if (msg.type === 'trace' && msg.trace.trace_type === 'stream_status') {
342+
continue
343+
}
344+
340345
yield msg
341346
}
342347
}

0 commit comments

Comments
 (0)