feat: overhaul progress tracking, error handling, and CLI display (#296)
Pull request overview
This PR overhauls sync EOF/progress semantics and adds an interactive CLI progress display, while updating protocol/schema and service workflow behavior to treat errors as orthogonal to stream lifecycle.
Changes:
- Simplifies stream lifecycle status to `started | complete`, introduces `trace/global_progress`, and adds catalog emission at sync start.
- Refactors engine progress tracking to be transition-driven, persists cumulative totals in engine state, and centralizes the progress state/reducer + renderers.
- Updates service error classification/workflow parking logic and extends Stripe source behavior to always emit a terminal `stream_status`.
Reviewed changes
Copilot reviewed 28 out of 29 changed files in this pull request and generated 6 comments.
Summary per file:
| File | Description |
|---|---|
| pnpm-lock.yaml | Updates workspace links and adds Ink/React deps for CLI progress UI. |
| packages/ts-cli/src/env-proxy.test.ts | Reformats assertion for env proxy error behavior. |
| packages/source-stripe/src/transport.test.ts | Removes stray whitespace line in tests. |
| packages/source-stripe/src/src-list-api.ts | Improves failure typing + treats “Unrecognized request URL” as skippable; always emits terminal stream_status. |
| packages/source-stripe/src/resourceRegistry.ts | Formatting refactor around retry-wrapped list/retrieve fns. |
| packages/source-stripe/src/index.test.ts | Updates expectations for terminal stream_status emission and adds skippable URL test. |
| packages/protocol/src/protocol.ts | Protocol changes: lifecycle-only stream status, new TraceGlobalProgress, catalog in SyncOutput, EOF schema updates. |
| packages/protocol/src/index.ts | Exports new isTraceGlobalProgress helper. |
| packages/protocol/src/helpers.ts | Adds isTraceGlobalProgress and deprecates isTraceProgress. |
| apps/service/src/temporal/workflows/pipeline-workflow.ts | Persists syncState across live loop and parks only on global permanent errors. |
| apps/service/src/temporal/sync-errors.ts | Splits permanent errors into global vs stream-scoped; treats system_error as permanent. |
| apps/service/src/temporal/sync-errors.test.ts | Adds unit tests for new error classification buckets. |
| apps/service/src/temporal/activities/_shared.ts | Makes eof.state authoritative when draining messages. |
| apps/service/src/tests/workflow.test.ts | Updates workflow tests for global vs stream-scoped permanent error handling. |
| apps/engine/tsconfig.json | Enables JSX (react-jsx) for Ink UI components. |
| apps/engine/src/lib/sync-progress-state.ts | Adds pure progress reducer + string renderer for EOF/progress display. |
| apps/engine/src/lib/progress.ts | Reworks progress tracking: transition-driven stream_status + global_progress, catalog emission, cumulative persistence. |
| apps/engine/src/lib/progress.test.ts | Updates and extends tests for new progress semantics and catalog/global_progress behavior. |
| apps/engine/src/lib/pipeline.test.ts | Updates expected stream_status from running → started. |
| apps/engine/src/lib/engine.ts | Passes catalog into trackProgress() so it can be emitted first. |
| apps/engine/src/lib/engine.test.ts | Updates protocol schema tests for started status. |
| apps/engine/src/lib/backfill.ts | Formatting updates and clearer error throw formatting. |
| apps/engine/src/lib/backfill.test.ts | Adjusts tests for formatting and behavior changes. |
| apps/engine/src/cli/sync.ts | Adds --progress/TTY UI, --base-url, and integrates Ink-based progress display. |
| apps/engine/src/cli/sync-ui.tsx | New Ink-based progress UI component. |
| apps/engine/src/cli/backfill.ts | Adds Ink progress UI for multi-attempt backfill flow. |
| apps/engine/src/api/app.ts | Replaces inline EOF formatting with shared renderer. |
| apps/engine/package.json | Adds Ink/React deps needed for CLI progress UI. |
| apps/dashboard/src/pages/PipelineDetail.tsx | Renames throughput fields to records_per_second / window_records_per_second. |
Files not reviewed (1)
- pnpm-lock.yaml: Language not supported
```ts
const engine = createRemoteEngine(syncEngineUrl)
const showProgress = process.stderr.isTTY
const display = showProgress ? createSyncDisplayState() : null
let currentAttempt = 0

const inkInstance = display
  ? render(
      React.createElement(SyncProgressUI, {
        eof: display.state.eof,
        catalog: display.state.catalog,
        final: false,
        attempt: 0,
      }),
      { stdout: process.stderr }
    )
  : null

const result = await pipelineSyncUntilComplete(engine, pipeline, {
  state,
  state_limit: stateLimit,
  time_limit: timeLimit,
  onAttempt: (attempt, currentState) => {
    console.error(
      JSON.stringify({
        event: 'pipeline_sync_attempt_started',
        attempt,
        state_provided: currentState != null,
      })
    )
  },
  onMessage: (message, attempt) => {
    process.stdout.write(`${JSON.stringify(message)}\n`)
    if (message.type === 'eof') {
      console.error(
        JSON.stringify({
          event: 'pipeline_sync_attempt_finished',
          attempt,
          eof_reason: message.eof.reason,

  onAttempt: (attempt) => {
    currentAttempt = attempt
    if (inkInstance && display) {
      inkInstance.rerender(
        React.createElement(SyncProgressUI, {
          eof: display.state.eof,
          catalog: display.state.catalog,
          final: false,
          attempt: currentAttempt,
        })
      )
    }
  },
  onMessage: (message) => {
    if (display && inkInstance) {
      const changed = display.update(message)
      if (changed) {
        const isFinalEof = message.type === 'eof' && message.eof.reason === 'complete'
        inkInstance.rerender(
          React.createElement(SyncProgressUI, {
            eof: display.state.eof,
            catalog: display.state.catalog,
            final: isFinalEof,
            attempt: currentAttempt,
          })
        )
      }
    } else if (!display) {
      process.stdout.write(`${JSON.stringify(message)}\n`)
    }
  },
```
Like sync, when stderr is a TTY this command mounts the Ink UI and suppresses writing SyncOutput NDJSON to stdout. That makes the command’s stdout unusable for scripting/piping in interactive terminals. Consider keeping NDJSON on stdout even when showing progress (UI stays on stderr), or provide a flag to explicitly opt into “progress-only” mode.
```ts
state.eof.global_progress = (
  t as { trace_type: 'global_progress'; global_progress: TraceGlobalProgress }
).global_progress
return false // rendered with preceding stream_status
```
update() returns false for global_progress messages (“rendered with preceding stream_status”), but the engine currently emits global_progress after stream_status. As a result, consumers that rerender only when update() returns true (like the CLI) will not display the latest global stats until a later rerender. Either return true when applying a global_progress update, or ensure global_progress is emitted before the stream_status that should render it.
Suggested change:

```diff
- return false // rendered with preceding stream_status
+ return true // global progress changes the rendered display state
```
```ts
// Already has a status from engine state, keep it unless it was complete
// and source says otherwise
```
The “source state overrides engine state” logic is currently a no-op when the stream already has an engine status (the if (streamStatus.has(stream)) { ... } block is empty). This means a stream marked complete in engine state will stay complete even if the source state indicates it’s not complete (e.g. transient/system/auth/config error), which can suppress the initial started transition and produce incorrect progress/state on resume. Implement the intended override (e.g., if engine status is complete but source status is not, demote to started or remove the status) and add a focused unit test for this resume scenario.
Suggested change:

```diff
- // Already has a status from engine state, keep it unless it was complete
- // and source says otherwise
+ // Already has a status from engine state; if it was marked complete,
+ // source state must override that and demote it back to started.
+ if (streamStatus.get(stream) === 'complete') {
+   streamStatus.set(stream, 'started')
+ }
```
```ts
/** Emit stream_status + global_progress pair if status changed. */
function* emitIfStatusChanged(stream: string): Iterable<SyncOutput> {
  const current = streamStatus.get(stream)
  if (!current) return
  if (lastEmittedStatus.get(stream) === current) return

  lastEmittedStatus.set(stream, current)
  const ss = buildStreamStatus(stream)
  if (ss) yield ss
  yield buildGlobalProgress()
```
emitIfStatusChanged() yields stream_status before global_progress, but the display reducer/UI rerenders on stream_status and treats global_progress as “rendered with preceding stream_status”. With the current ordering, the UI will lag one event behind (or miss global stats until the next status change). Either emit global_progress before stream_status, or make the reducer/UI trigger a rerender when global_progress arrives.
Suggested change:

```diff
- /** Emit stream_status + global_progress pair if status changed. */
+ /** Emit global_progress + stream_status pair if status changed. */
  function* emitIfStatusChanged(stream: string): Iterable<SyncOutput> {
    const current = streamStatus.get(stream)
    if (!current) return
    if (lastEmittedStatus.get(stream) === current) return
    lastEmittedStatus.set(stream, current)
+   yield buildGlobalProgress()
    const ss = buildStreamStatus(stream)
    if (ss) yield ss
-   yield buildGlobalProgress()
```
```ts
records_per_second: runRecords / elapsedSec(),
window_records_per_second: totalWindowRecords() / windowDuration,
state_checkpoint_count: stateCheckpointCount,
cumulative_request_count: cumulativeRequestCount,
```
buildGlobalProgress() populates cumulative_request_count (and the UI renders request counts), but cumulativeRequestCount is never updated during the run and request_count is never set. As-is, progress displays will report 0 requests even when the source is making API calls. Either wire in request counting (e.g., from source-reported counts or a shared counter similar to recordCounter) or omit these fields until they’re implemented.
Suggested change:

```diff
- cumulative_request_count: cumulativeRequestCount,
```
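If the field is kept instead of removed, one way the missing wiring could look is a shared counter mirroring the `recordCounter` idea mentioned in the review; all names here are hypothetical:

```typescript
// Hypothetical shared counter: the HTTP layer increments it once per outgoing
// request and buildGlobalProgress() reads it, instead of leaving the field
// stuck at 0 for the whole run.
function createRequestCounter(initialCumulative = 0) {
  let runCount = 0
  return {
    increment: () => { runCount += 1 },             // call per outgoing request
    runCount: () => runCount,                        // requests made this run
    cumulative: () => initialCumulative + runCount,  // persisted across runs
  }
}
```

Seeding `initialCumulative` from engine state keeps the cumulative figure consistent with the PR's other cumulative totals.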
```ts
} else if (!display) {
  process.stdout.write(JSON.stringify(msg) + '\n')
}
```
When progress display is enabled (default on TTY), the command stops writing SyncOutput NDJSON to stdout entirely. This is a behavioral breaking change for anyone piping/consuming the CLI output (and it also makes --progress effectively mean “no JSON output”). Consider always writing NDJSON to stdout and rendering progress to stderr, or add an explicit flag to disable JSON output while leaving the default behavior unchanged.
Suggested change:

```diff
- } else if (!display) {
-   process.stdout.write(JSON.stringify(msg) + '\n')
- }
+ }
+ process.stdout.write(JSON.stringify(msg) + '\n')
```
```ts
// Rate limit message check (belt + suspenders alongside HTTP status check)
if (err instanceof Error && err.message.includes('Rate limit')) {
  return 'transient_error'
}
```
We can use `err.status === 429` for consistency.
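Sketched out, this suggestion might look like the following; the `status` property on the error object is an assumption about the source's HTTP error shape:

```typescript
// Hypothetical classification: check the HTTP status first, keeping the
// message match only as a fallback (belt and suspenders).
function classifyRateLimit(err: unknown): 'transient_error' | null {
  const status = (err as { status?: number } | null)?.status
  if (status === 429) return 'transient_error'
  if (err instanceof Error && err.message.includes('Rate limit')) {
    return 'transient_error'
  }
  return null
}
```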
…fied stream status

Simplify stream lifecycle to just started/complete — errors are now orthogonal (a stream can be complete with errors). Replace interval-based progress emission with event-driven emission on status transitions. Add cumulative tracking across runs (record count, request count, elapsed time) persisted via engine state. Rename TraceProgress → TraceGlobalProgress and rows_per_second → records_per_second for consistency. Emit catalog as first message so the UI knows all streams upfront.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Committed-By-Agent: claude
Extract sync display state as a pure reducer over SyncOutput messages that accumulates into an EofPayload shape. The renderer is a stateless function of (EofPayload, catalog) -> string[].

- Add --progress flag to sync command (auto-enabled on TTY)
- Add --base-url flag for QA/non-prod Stripe API endpoints
- Extract sync-progress-state.ts: createSyncDisplayState() reducer + renderSyncProgress() renderer with emoji status groups
- Deduplicate: app.ts formatEof now delegates to renderSyncProgress
- Suppress raw source stream_status traces in trackProgress (engine re-emits enriched versions)
- Add scripts/test-all-accounts.sh for multi-account testing
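The reducer/renderer split described in this commit can be sketched roughly as follows; the message and state shapes are simplified stand-ins for the real SyncOutput/EofPayload types:

```typescript
// Simplified stand-ins for SyncOutput and the accumulated display state.
type Msg =
  | { type: 'record'; stream: string }
  | { type: 'stream_status'; stream: string; status: 'started' | 'complete' }

interface DisplayState {
  counts: Record<string, number>
  status: Record<string, 'started' | 'complete'>
}

// Pure reducer: (state, message) -> state. No I/O, trivially unit-testable.
function reduce(state: DisplayState, msg: Msg): DisplayState {
  if (msg.type === 'record') {
    const n = (state.counts[msg.stream] ?? 0) + 1
    return { ...state, counts: { ...state.counts, [msg.stream]: n } }
  }
  return { ...state, status: { ...state.status, [msg.stream]: msg.status } }
}

// Stateless renderer: state -> string[]. The caller decides when to print.
function render(state: DisplayState): string[] {
  return Object.entries(state.counts).map(
    ([stream, n]) => `${stream}: ${n} records (${state.status[stream] ?? 'pending'})`
  )
}
```

Because both halves are pure, the same pair can back the Ink UI, the plain-text EOF formatter in app.ts, and tests without duplication.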
…al errors, system_error reclassification

Source always emits stream_status:complete on error (errors are orthogonal to lifecycle). Adds 'Unrecognized request URL' to skippable patterns for treasury. Stream-scoped permanent errors no longer park the entire workflow — only global permanent errors (bad API key, invalid config) do. Reclassifies system_error using isRetryableHttpError: 429/5xx/network → transient, everything else → permanent.
Introduces the protocol design for sync runs, covering:

- start/end messages replacing EOF-based lifecycle
- Engine-managed time ranges with binary search subdivision
- Segment-level progress tracking with merged synced_ranges
- Error levels as discriminated union (global/stream/segment/transient)
- ProgressPayload as the unified shape for run and request stats
- Source state simplified to pure cursors, engine owns range tracking
- Frozen upper bounds via sync_run_id continuations

Update remaining failure_type references to error_level discriminated union. Fix field names and remove errors from segment example (errors live on ProgressPayload, not inline on segments).

…r levels

- Segments are source-internal; engine only tracks completed_ranges
- completed_ranges derived from source_state messages with time_range
- Error levels reduced to global/stream/transient (no segment level)
- SyncError is discriminated union on error_level
- completed_ranges optional on StreamProgress
- Rates (records_per_second, states_per_second) nested under rates field
- source_state messages carry time_range for engine range tracking

Describes how the Stripe source manages pagination within engine-assigned time ranges: initialization, density probing, sub-range splitting, cursor tracking, resumption, subdivision, and completion signaling.

- Remove density probing — source starts with full range and subdivides
- Rename ranges → remaining (just the work left to do)
- Describe subdivision: split unpaginated portion of a range into N parts
- Full walkthrough example showing subdivision across requests
- Cursor tracking: null = not started, string = resume point, removed = done

…flat rates

- EngineState contains run_progress: ProgressPayload (not flattened)
- completed_ranges optional on StreamProgress
- Rates flat on ProgressPayload (records_per_second, states_per_second)
- Remove old sync-lifecycle-source-stripe.md from docs/ root (moved to docs/engine/)

- max_concurrent_streams (configurable, default 5, capped at catalog size)
- max_requests_per_second (inferred: live=20, test=10)
- max_segments_per_stream (derived: rps / effective_streams)
- Examples showing budget distribution across different scenarios
- Single-stream syncs get the full rate limit budget

Events endpoint uses the same time_range + remaining model as all other streams. Live event polling is experimental and opt-in, stored in source.global separately from backfill cursor logic.

remaining describes the work left to do in source state. max_segments_per_stream is the config limit on fan-out. Different concepts, different names.

- Subdivision happens between requests, not mid-request
- Source subdivides if a range didn't complete in previous request
- range_complete is a stream_status subtype (Stripe polymorphism pattern)
- Full example shows actual messages emitted across 3 requests
- stream_status uses 'start' not 'started'

Each Stripe API page returns max 100 records. Example now shows page-by-page pagination with state checkpoints after each page.

Engine emits log messages (info/warn/error) alongside progress. Tolerant processing — anomalies are logged as warnings, not rejected. Defines the full set of engine log messages by level.

Engine: no redundant info logs (progress stream covers that). Only warn (anomalies) and error (failures). Source: emits info logs for real-time rps, warn logs for rate limits and retries.

fields is redundant with json_schema (use schema to express field projection). system_columns is a destination concern, not protocol. Also made time_range.gte optional (omit for "from the beginning").

ConfiguredStream.stream.name → ConfiguredStream.name. No more nesting of Stream inside ConfiguredStream. TODO: move metadata (api_version, account_id, live_mode) out of per-stream into source_config or destination injection.

- StartPayload.state → starting_state
- EndPayload.state → ending_state
- Round-trip is self-documenting: end.ending_state → start.starting_state
- Progress: client generally doesn't need a reducer, but can diff for deltas

Future: destination can report inserted/updated/deleted counts per stream (e.g. Postgres upsert). Extension point noted in StreamProgress.

All three are first-class fields. For now inserted = record_count, updated = 0, deleted = 0. Destination can report real values when it supports upsert/delete tracking.

change_count = total records processed (always known by engine). insert_count + update_count + delete_count = change_count when destination reports the breakdown. All zero until then.

Either change_count (when breakdown unavailable) or insert_count + update_count + delete_count (when destination reports). All optional.

It's a count of record messages, not an interpretation of what they mean.

record_count is always known (engine counts record messages). insert/update/delete are optional enrichment from the destination.

Reserved for when destinations report per-operation counts. Not implemented yet.
Merge the "massive progress fix" (PR #296) onto the n-ary search pagination branch (PR #307), reconciling the two feature sets:

Protocol:
- Simplify TraceStreamStatus.status to ['started', 'complete']
- Errors are orthogonal to lifecycle — a stream can be complete with errors
- range_complete is now a separate field on TraceStreamStatus, not a status value
- Add CatalogMessage to SyncOutput, TraceGlobalProgress with cumulative stats

Engine:
- Progress tracker emits catalog upfront, stream_status + global_progress pairs on transitions, and tracks completed_ranges via mergeRanges
- New sync-progress-state.ts reducer and sync-ui.tsx Ink/React CLI component
- CLI gains --progress flag for interactive terminal progress display

Service:
- Error classification distinguishes globalPermanent vs streamPermanent
- Only global permanent errors (bad API key, invalid config) park the workflow
- Stream-scoped permanent errors skip the stream on resume

Source:
- Remove label from HttpRetryOptions
- Use isRetryableHttpError for failure type classification
- Emit complete status after non-global errors (errors orthogonal to lifecycle)
- Add 'Unrecognized request URL' to skippable error patterns

Made-with: Cursor
Committed-By-Agent: cursor
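The global-vs-stream split described in the Service section could be sketched like this; the heuristics and names are illustrative, not the PR's actual sync-errors.ts:

```typescript
type ErrorScope = 'globalPermanent' | 'streamPermanent' | 'transient'

// Illustrative classification: auth/config failures park the workflow,
// retryable HTTP errors (429/5xx/network) are transient, and remaining
// permanent failures are scoped to a stream when one is known.
function classifySyncError(err: { status?: number; stream?: string }): ErrorScope {
  if (err.status === 401 || err.status === 403) return 'globalPermanent' // bad API key
  if (err.status === 429 || err.status === undefined || err.status >= 500) {
    return 'transient' // mirrors the isRetryableHttpError buckets
  }
  return err.stream ? 'streamPermanent' : 'globalPermanent'
}
```

The key behavioral consequence is in the workflow: only `globalPermanent` parks the pipeline, while `streamPermanent` just excludes that stream on resume.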
Summary
Overhauls the sync engine's progress tracking, error handling, and CLI experience.
Progress tracking
- Stream lifecycle simplified to `started | complete` — errors are orthogonal (a stream can be complete with errors)
- `cumulative_record_count`, `cumulative_request_count`, `cumulative_elapsed_ms` — persisted across runs
- New `global_progress` trace
- Catalog emitted upfront (`CatalogMessage` added to `SyncOutput`)
- Renames: `TraceProgress` → `TraceGlobalProgress`, `rows_per_second` → `records_per_second`

Error handling
CLI & service
- `sync-progress-state.ts`: pure reducer (SyncOutput → EofPayload) + stateless renderer (EofPayload → string[])
- `--progress` flag (auto-enabled on TTY) shows an updating table grouped by status
- `drainMessages` uses authoritative `eof.state`; `liveLoop` passes/persists `syncState`
- Removed `label` option from `withHttpRetry`

Test plan
- `pnpm build` — all packages compile
- `pnpm test` — 196 engine tests, 49 protocol tests, 115 source-stripe tests, 19 service tests pass
- Manual testing of the `--progress` flag

🤖 Generated with Claude Code