Skip to content

feat: adaptive binary subdivision with remaining-based state, per-stream error resilience, and pure progress reducers#307

Closed
tonyxiao wants to merge 297 commits intomainfrom
tx/nary-search-pagination
Closed

feat: adaptive binary subdivision with remaining-based state, per-stream error resilience, and pure progress reducers#307
tonyxiao wants to merge 297 commits intomainfrom
tx/nary-search-pagination

Conversation

@tonyxiao
Copy link
Copy Markdown
Collaborator

@tonyxiao tonyxiao commented Apr 17, 2026

Summary

Replaces the fixed-segment backfill algorithm with an adaptive binary subdivision strategy that recursively splits time ranges until each completes in a single API page. This is a ground-up rewrite of the pagination, state, progress, and error-handling layers across the stack (155 commits, 150 files, +11k/-6.4k lines).

Core changes

  • New pagination algorithm — Instead of probing density and pre-computing fixed segments, the source starts with one range spanning the full time window, fetches one page, then subdivides based on the last observed created timestamp. Ranges are processed with bounded concurrency until remaining: [].
  • Simplified state modelBackfillState/SegmentState replaced by { remaining: RemainingRange[] }. Done-ness is remaining: []. No error status stored in state. Range reconciliation handles catalog time_range changes between runs.
  • Protocol overhaulTraceMessage deleted; replaced by top-level StreamStatusMessage (discriminated union: start/running/complete/range_complete/error/skip) and ProgressMessage. EOF simplified to { has_more, ending_state, run_progress, request_progress }.
  • Pure progress reducer — Engine progress tracking rewritten as a pure (ProgressPayload, Message) → ProgressPayload reducer with per-stream record/state counts, completed time ranges, and derived rates.
  • Stale-write prevention — New newerThanColumn upsert option prevents out-of-order writes. upsertWithStats returns created/updated/deleted/skipped counts.
  • Per-stream error resilience — Failed streams emit stream_status: error and are skipped for the rest of the run instead of aborting the entire sync.
  • Service workflow split — Single pipeline-workflow replaced by pipeline-lifecycle (setup → backfill → live → teardown) + child pipeline-backfill (loops paginated runs with continueAsNew).
  • Concurrency tuningmax_concurrent_streams (default 5) with max_requests_per_second derived from API key mode (live=20, test=10).

Supporting changes

  • Breadth-first priming pass (all streams get one page before bounded scheduling)
  • Retry-After header respected on 429s
  • account_created resolved alongside account_id in setup
  • Ink-based terminal progress display with time-range progress bars
  • New hourly prod e2e workflow with Sigma reconciliation
  • Extensive new documentation (binary subdivision algorithm, sync lifecycle, debugging guide)

Motivation

Syncs were failing and we couldn't tell why. The old algorithm was a black box — when a sync stalled or errored, there was no way to see which stream failed, where in its time range it got stuck, or whether the issue was transient. The TraceMessage system was an afterthought bolted onto the side; it didn't carry enough structure to answer "what went wrong and where."

This PR rewrites both the core algorithm (for performance and correctness) and the protocol (for observability):

Why the algorithm had to change

  1. Silent failures — The fixed-segment approach would probe density, build segments, then paginate them in parallel. If a segment failed mid-pagination, the error was swallowed into an opaque error_status field in state. On the next run, the segment would retry from scratch with no indication of what happened.
  2. No progress visibility — There was no way to see how far through a stream's time range we'd gotten. The only signal was "done" or "not done."
  3. Wasted work on skewed data — Density probing spent one API call per stream to guess segment count, but couldn't handle real-world distributions where 90% of data is in 10% of the time range. Segments were computed once and never adapted.
  4. Cascading failures — One stream error killed the entire sync. One 403 from a single endpoint brought down a 50-stream backfill.
  5. Unresumable after errors — When a segment failed, its cursor and progress were lost. The next run would re-probe and rebuild segments from scratch, repeating work that had already succeeded.

What we replaced it with

  • Binary subdivision — Start with one range, fetch one page, split based on actual data. The algorithm converges on where data lives instead of guessing. Failed ranges stay in remaining with their cursor — retries resume exactly where they left off.
  • remaining: [] = done — State is trivially inspectable. You can look at any stream's state and immediately see how much work is left and where.
  • StreamStatusMessage — A structured, top-level message type with discriminated variants (start, running, complete, range_complete, error, skip). Every lifecycle transition is observable. range_complete carries the exact { gte, lt } that finished.
  • ProgressPayload — Per-stream record counts, state counts, completed time ranges, and derived rates. Emitted on every meaningful event. The Ink-based CLI renders this as a live dashboard.
  • Per-stream error isolation — A failed stream emits stream_status: error and is skipped. The other 49 streams continue. The error message appears in the progress display in real time.

The net result: when a sync fails now, you can see exactly which stream, which time range, what error, and how far it got — without touching logs.

Package-by-package changes

packages/protocol — The foundation everything else builds on

  • TraceMessage and all subtypes deleted entirely
  • Replaced by StreamStatusMessage (discriminated union: start | running | complete | range_complete | error | skip) and ProgressMessage
  • SyncState redesigned: old 3-section model (source/destination/engine) → new source: SourceState, destination: Record<string, unknown>, sync_run: SyncRunState (carries sync_run_id, time_ceiling, progress)
  • EofPayload: old reason enum → new { has_more, ending_state, run_progress, request_progress }
  • ConfiguredStream gains supports_time_range and time_range: { gte, lt }
  • Stream gains newer_than_field and soft_delete_field
  • New utils/binary-subdivision.ts: pure subdivideRanges() + async streamingSubdivide() with bounded concurrency
  • New utils/async-iterable.ts: mergeAsync() added; split()/channel() removed
  • coerceSyncState()parseSyncState() (validates against connector spec schema)
  • New createSourceMessageFactory() and createEngineMessageFactory() typed envelope constructors

packages/source-stripe — The biggest behavioral change

  • Deleted: compactState, expandState, probeAndBuildSegments, segmentCountFromDensity, buildSegments, splitRange, sequentialBackfillStream, paginateSegment, getFailureType, errorToTrace
  • Added: reconcileRanges() — handles catalog time_range changes between runs (trims/drops/adds ranges)
  • Added: paginateRange() — fetches one page from a RemainingRange, returns after one page for created-filter streams so subdivision can happen
  • Added: iterateStream() — per-stream driver: loops remaining ranges, dispatches via mergeAsync, calls nextStep() to subdivide after each round
  • Added: Breadth-first priming pass + next-page prefetch for sequential streams
  • State types: SegmentState/BackfillStateRemainingRange (ISO gte/lt + cursor). remaining: [] = done.
  • Events cursor moved from per-stream to global state
  • Error reporting: trace: errorstream_status: error; checks structured StripeApiRequestError body
  • New: account-metadata.ts, retry.ts (getRetryAfterMs())
  • Config: rate_limit auto-derived from key mode (live=20, test=10), max_concurrent_streams configurable

apps/engine — Sync loop + progress rewrite

  • pipeline_sync rewritten: No more split() forking. Destination is sole consumer (pull-based backpressure). Engine iterates destination output through two pure reducers: stateReducer and progressReducer.
  • New state-reducer.ts: Pure (SyncState, StateEvent) → SyncState
  • New progress/ module:
    • reducer.ts — pure (ProgressPayload, Message) → ProgressPayload
    • ranges.tsmergeRanges() for coalescing time ranges
    • format.tsx — Ink-based terminal display
  • withTimeRanges() — injects time_range.lt from time_ceiling into catalog
  • resolvePipeline() — consolidates connector resolution, spec, catalog, state normalization
  • Simplified takeLimits: EOF is { has_more: boolean }
  • Removed: withLoggedStream, engineLogContext, old lib/backfill.ts, old lib/progress.ts, cli/backfill.ts
  • New: cli/sync.tsx (Ink-based), --plain flag

packages/util-postgres — Upsert overhaul

  • Renamed: keyColumnsprimaryKeyColumns, noDiffColumnsvolatileColumns, mustMatchColumnsguardColumns
  • New newerThanColumn: WHERE EXCLUDED.col > tbl.col — prevents stale writes
  • New upsertWithStats(): Returns created/updated/deleted/skipped counts via RETURNING (xmax = 0)
  • Better error wrapping with table/column/pk context

packages/destination-postgres — Error resilience + stale-write integration

  • upsertMany accepts newerThanField, forwarded to new upsert option
  • Per-stream error handling: Failed streams → stream_status: error + skipped (not aborted)
  • Records yielded back (pass-through) for downstream counting
  • Top-level catch with trace error removed

packages/openapi — Minor

  • retry-after added to DEBUG_HEADERS
  • package.json files expanded for .ts source resolution

apps/service — Workflow architecture overhaul

  • Workflow split: pipeline-workflow.tspipeline-lifecycle.ts (setup → backfill → live → teardown) + pipeline-backfill.ts (child workflow, loops until has_more=false, continueAsNew after 200 iterations)
  • Activities simplified: RunResult, mergeStateMessage, classifySyncErrors deleted. pipelineSync returns { eof: EofPayload }.
  • sync-errors.ts deleted entirely — engine handles error classification
  • API: Soft-delete pipelines, pausedSignal for pause/resume
  • Schema: SyncState removed from OpenAPI, progressProgressPayload

e2e/ — Test adaptations

  • All assertions updated for new eof shape (has_more + run_progress/request_progress)
  • State assertions migrated from BackfillState to remaining[]
  • Network/disconnect tests updated for new progress format

Root / Config / CI

  • New CI: .github/workflows/prod-e2e-test.yml — hourly Sigma reconciliation
  • Removed injectWorkspacePackages: true from pnpm-workspace.yaml
  • New scripts: bench-subdivision.sh, check-sync-efficiency.ts, reconcile-sigma-vs-postgres.ts, test-all-accounts.sh
  • New docs: binary subdivision algorithm + diagrams, sync lifecycle guides, debugging guide

Test plan

  • pnpm build — all packages compile cleanly
  • pnpm lint + pnpm format — no violations
  • source-stripe unit tests: 101 passed (reconcileRanges, subdivision, error handling, concurrency)
  • protocol unit tests: 50 passed (binary-subdivision, message factories, state parsing)
  • engine unit tests: 211 passed (state reducer, progress reducer, pipeline, format)
  • destination-postgres: stale-write prevention + per-stream error tests
  • util-postgres: upsertWithStats counting + newerThanColumn tests
  • scripts/test-all-accounts.sh — full sync against 9 Stripe accounts (QA + prod, small → large)
  • scripts/test-all-accounts.sh --verify — Sigma reconciliation confirms every synced row matches Stripe's source of truth
  • Hourly prod e2e via .github/workflows/prod-e2e-test.yml

🤖 Generated with Claude Code

File-by-file breakdown

apps/engine (+3,915 / -2,571 · net +1,344)

Rewritten sync loop with pure state and progress reducers, new Ink-based terminal UI, and split CLI binaries. The old split()-based forking, backfill scheduling, and text progress are deleted.

File +/- Purpose
src/__generated__/openapi.json +1063/-560 Regenerated spec (new progress/eof schemas)
src/lib/progress/reducer.test.ts +516 Comprehensive progress reducer tests
src/lib/engine.test.ts +383/-147 Updated for new state/progress/eof shapes
src/cli/sync.tsx +224 Ink-based sync command with live terminal progress
src/lib/progress/format.tsx +224 Ink-based terminal progress display
src/lib/engine.ts +159/-223 Rewritten sync loop: pure reducers, no split()
src/lib/progress/reducer.ts +159 Pure (ProgressPayload, Message) → ProgressPayload reducer
src/lib/state-reducer.test.ts +164 State reducer tests (initialize, accumulate)
src/api/helpers.ts +139 Extracted API route helpers (pipeline read/write/sync)
src/lib/progress/format.test.tsx +137 Ink format component tests (inline snapshots)
src/lib/state-reducer.ts +80 Pure (SyncState, Event) → SyncState reducer
src/cli/command.ts +75/-44 Rewritten CLI command registration
src/cli/subprocess.ts +72 Subprocess management for sync worker
src/cli/source-config-cache.test.ts +60 Tests for source config caching
src/__tests__/bin-serve.test.ts +55 Tests for the new serve binary entry point
src/api/server.ts +53 Extracted HTTP server setup from app.ts
src/cli/source-config-cache.ts +49 Cache resolved source config across sync runs
src/api/index.test.ts +49 Tests for extracted API module
src/lib/pipeline.test.ts +38/-101 Simplified for has_more EOF
src/lib/progress/ranges.test.ts +35 mergeRanges tests
src/lib/pipeline.ts +30/-36 Generic message types, simplified takeLimits
src/lib/progress/ranges.ts +20 Coalesce overlapping time ranges
src/bin/serve.ts +17 Serve binary entry point
src/api/app.test.ts +15/-30 Update API tests for simplified routes
src/__tests__/docker.test.ts +13/-3 Update for new CLI structure
src/__generated__/openapi.d.ts +12/-3 Regenerated types
src/api/app.ts +12/-233 Extract helpers and server setup, slim down app
package.json +12/-8 Add @types/react, ink dependencies for TUI progress
src/bin/sync-engine.ts +9 Sync CLI binary entry point
src/lib/source-test.ts +9/-11 Update source test helper
src/lib/remote-engine.test.ts +7/-7 Update for new message types
src/bin/bootstrap.ts +6 Shared bootstrap (env, proxy) for CLI binaries
src/lib/source-exec.ts +4/-4 Use new stream_status messages
src/lib/progress/index.ts +4 Progress module barrel export
src/index.ts +3/-2 Export updates
src/api/index.ts +3/-55 Slim re-export after helper extraction
tsconfig.json +2/-1 Enable jsx: react-jsx for Ink components
src/lib/createSchemas.ts +1/-1 Minor type fix
src/lib/destination-test.ts +1/-3 Simplify destination test helper
src/__tests__/openapi.test.ts +1/-1 Trivial assertion update
src/lib/progress.test.ts -351 Old progress tests (replaced by progress/)
src/lib/progress.ts -294 Old progress tracking (replaced by progress/)
src/cli/sync.ts -130 Old text-based sync command
src/cli/backfill.ts -114 Old backfill CLI command (replaced by sync.tsx)
src/lib/backfill.test.ts -94 Old backfill scheduling tests
src/lib/backfill.ts -72 Old backfill scheduling logic
src/serve-command.ts -34 Old serve command (moved to bin/serve.ts)
src/cli/index.ts -9 Old CLI barrel export

docs (+1,971 / -89 · net +1,882)

New algorithm design docs, sync lifecycle guides, state-flow diagrams, plans, and a debugging guide. Old state-flow diagrams relocated.

File +/- Purpose
engine-refactor/sync-lifecycle.md +443 Full sync lifecycle documentation
engine-refactor/sync-lifecycle-source-stripe.md +401 Stripe source lifecycle deep-dive
architecture/binary-subdivision.md +270 Algorithm design doc for n-ary subdivision
engine/pipeline-handle-events.md +177 Pipeline event handling doc
engine-refactor/sync-lifecycle-start-end-message.md +160 Start/end message handling doc
plans/2026-04-18-engine-binary-split.md +150 Engine binary split plan
engine-refactor/state-flow.puml +117 PlantUML source (moved from engine/)
plans/2026-04-19-structured-request-logging.md +100 Structured logging plan
architecture/binary-subdivision.puml +78 PlantUML diagram source
guides/debugging-sync-cli.md +44 Debugging guide for sync subprocess
guides/cli-spec.md +19/-7 Updated CLI spec
architecture/packages.md +5/-2 Minor update
slides/demo.md +2/-2 Minor update
engine-refactor/state-flow.png new State flow diagram (binary)
engine-refactor/state-flow.svg +1 Rendered SVG
architecture/binary-subdivision.svg +1 Rendered SVG
service/entities.svg +1 Service entity diagram
engine/sync-engine-types.ts +1/-8 Simplified type examples
slides/step5-engine.sh +1/-1 Minor update
engine/state-flow.puml -68 Moved to engine-refactor/
engine/state-flow.png deleted Moved to engine-refactor/
engine/state-flow.svg -1 Moved to engine-refactor/

packages/source-stripe (+1,550 / -1,454 · net +96)

Full rewrite of pagination from fixed segments to n-ary subdivision. Nearly net-zero — it's a true replacement, not additive. New reconcileRanges, paginateRange, iterateStream. Error handling via stream_status instead of trace.

File +/- Purpose
src/index.test.ts +663/-403 Rewritten for remaining-based state, subdivision, concurrency
src/src-list-api.ts +515/-524 Full rewrite: reconcileRanges, paginateRange, iterateStream
src/src-list-api.test.ts +107/-298 reconcileRanges tests replace old segment tests
src/index.ts +75/-76 New state types, msg factory, resolve account_created at setup
src/process-event.ts +44/-23 Use typed msg factory
src/account-metadata.ts +34 Resolve account_id + account_created together
src/src-events-api.ts +29/-56 Events cursor to global state, completion via remaining.length
src/resourceRegistry.ts +28/-8 Add EXCLUDED_TABLES, formatting
src/spec.ts +22/-19 Add account_created, max_concurrent_streams, remainingRangeSpec
src/retry.ts +19/-2 Add getRetryAfterMs() for Retry-After header
src/catalog.ts +5/-14 Remove catalogFromRegistry, add newer_than_field
src/client.ts +4/-1 Formatting
src/__tests__/eventsPolling.integration.test.ts +3/-3 Message type update
package.json +2/-2 Version/dep update
src/rate-limiter.ts -24 Remove MAX_SEGMENTS, MAX_CONCURRENCY, DEFAULT_MAX_RPS
src/transport.test.ts -1 Remove unused import

scripts (+1,160 / -5 · net +1,155)

New operational tooling: multi-account test harness, Sigma reconciliation, efficiency analysis, and subdivision benchmarking.

File +/- Purpose
reconcile-sigma-vs-postgres.ts +729 Reconcile Sigma query results vs Postgres data
check-sync-efficiency.ts +214 Analyze sync efficiency metrics
test-all-accounts.sh +116 Test sync against all configured accounts
bench-subdivision.sh +68 Benchmark the subdivision algorithm
generate-diagrams.sh +21 Generate PlantUML diagrams
mitmweb-env.sh +8/-1 Verify mitmweb startup
open-docs.sh +4/-4 Path updates

packages/protocol (+1,139 / -743 · net +396)

TraceMessage deleted. Replaced by StreamStatusMessage + ProgressMessage. New SyncRunState, simplified EOF. Binary subdivision algorithm and mergeAsync added. split/channel removed.

File +/- Purpose
src/utils/binary-subdivision.test.ts +374 N-ary subdivision algorithm tests
src/protocol.ts +278/-250 StreamStatus/Progress messages, SyncRunState, new EOF shape
src/utils/binary-subdivision.ts +240 subdivideRanges + streamingSubdivide implementation
src/helpers.ts +150/-158 Message factories replace type guards
src/utils/async-iterable.ts +58/-132 Add mergeAsync, remove split/channel
src/index.ts +20/-22 Updated exports
src/__tests__/cli.test.ts +8/-3 Update CLI schema tests
src/cli.ts +6/-11 Simplified CLI types
package.json +2/-2 Version/dep update
src/utils/async-iterable.test.ts +1/-117 Tests for mergeAsync (split/channel tests removed)
src/__tests__/control.test.ts +1/-35 Remove trace message tests
src/__tests__/state.test.ts +1/-13 Simplify state schema tests

apps/service (+986 / -1,109 · net -123)

Actually shrank. Monolithic workflow split into lifecycle + child backfill. Error classification deleted (engine handles it). Activities simplified to return { eof }.

File +/- Purpose
src/__generated__/openapi.json +409/-442 Regenerated spec
src/__generated__/openapi.d.ts +205/-225 Regenerated types
src/temporal/workflows/pipeline-lifecycle.ts +173 Top-level lifecycle workflow (replaces monolith)
src/__tests__/workflow.test.ts +87/-77 Updated for lifecycle/backfill workflow split
src/temporal/workflows/pipeline-backfill.ts +52 Child workflow: loop sync until !has_more
src/api/app.test.ts +24/-4 Test soft-delete and pause/resume
src/api/app.ts +12/-9 Soft-delete pipelines, add pausedSignal
src/temporal/activities/pipeline-sync.ts +11/-25 Return { eof } instead of RunResult
src/temporal/activities/_shared.ts +3/-59 Remove RunResult, mergeStateMessage, classifySyncErrors
src/temporal/workflows/_shared.ts +2/-3 Signal type update
src/temporal/workflows/index.ts +2/-1 Export new workflows
src/lib/createSchemas.ts +2/-2 Type update
tsconfig.json +2/-1 Add jsx setting
src/temporal/activities/pipeline-teardown.ts +1/-1 Add pipeline store delete
src/index.ts +1/-1 Import path update
src/temporal/workflows/pipeline-workflow.ts -220 Old monolithic workflow
src/temporal/activities/index.ts -2 Remove sync-errors export
src/temporal/sync-errors.ts -37 Error classification (engine handles it now)

packages/util-postgres (+623 / -99 · net +524)

Upsert overhaul: renamed options for clarity, new newerThanColumn for stale-write prevention, new upsertWithStats with row-level classification.

File +/- Purpose
src/upsert.test.ts +404/-75 Tests for renamed options, newerThanColumn, stats
src/upsert.ts +214/-19 Rename options, add newerThanColumn, add upsertWithStats
src/index.ts +2/-2 Export upsertWithStats
package.json +2/-2 Version/dep update
src/httpConnectStream.test.ts +1/-1 Minor fix

e2e (+179 / -170 · net +9)

All assertions updated for new eof/progress shape and remaining-based state. Nearly net-zero — same coverage, new format.

File +/- Purpose
test-server-sync.test.ts +106/-106 Full rewrite for new eof/progress shape
test-sync-engine.test.ts +24/-22 Updated for refactored error handling
test-server-all-api.test.ts +23/-10 Add STREAMS env filter, tuning knobs
connector-loading.test.sh +13/-8 Proxy env cleanup, relaxed assertions
test-e2e-network.test.ts +6/-6 Updated for new progress format
test-disconnect.test.ts +5/-10 Updated for new progress format
header-size-docker.test.ts +1/-7 Simplified
stripe-to-postgres.test.ts +1/-1 Minor update

.github (+162 / -0 · net +162)

File +/- Purpose
workflows/prod-e2e-test.yml +162 Hourly production e2e workflow with Sigma reconciliation

packages/destination-postgres (+138 / -48 · net +90)

Per-stream error resilience (failed streams skipped, not fatal) and stale-write prevention via newerThanField passthrough.

File +/- Purpose
src/index.test.ts +81/-7 Add newer_than_field tests
src/index.ts +55/-39 Per-stream errors, newerThanField passthrough, record yield
package.json +2/-2 Version/dep update

packages/openapi (+12 / -2 · net +10)

File +/- Purpose
listFnResolver.ts +8/-1 Add retry-after to DEBUG_HEADERS
package.json +4/-1 Expand files for .ts source resolution

packages/destination-google-sheets (+12 / -15 · net -3)

File +/- Purpose
src/index.ts +10/-13 Adapt to new message types
package.json +2/-2 Version/dep update

packages/ts-cli (+35 / -15 · net +20)

File +/- Purpose
src/openapi/command.ts +14/-3 Command improvements
src/ndjson.ts +7/-3 NDJSON handling improvement
src/__tests__/json-content-header.test.ts +5/-3 Test update
src/openapi/parse.ts +4 Type addition
src/openapi/command.test.ts +3/-1 Test update
src/env-proxy.test.ts +3/-3 Test update
src/openapi/types.ts +2 Type addition
package.json +2/-2 Version/dep update

packages/state-postgres (+2 / -2 · net 0)

File +/- Purpose
package.json +2/-2 Files reorder (src before dist)

apps/supabase (+6 / -3 · net +3)

File +/- Purpose
src/__tests__/bundle.test.ts +4/-1 Skip test (Deno-only code)
src/index.ts +2/-1 Import update
package.json -1 Remove dependency

Root config

File +/- Purpose
pnpm-lock.yaml +304/-209 Lockfile churn from dependency updates
.gitignore +9 Add tmp/, test-all-accounts.sh, and worktree patterns
AGENTS.md +6 Add debugging section and key gotchas
Dockerfile +3/-4 Minor build adjustments
pnpm-workspace.yaml -2 Remove injectWorkspacePackages: true (breaks hardlinks)
package.json +1 Root dependency addition
eslint.config.mjs +1 ESLint config tweak

demo

File +/- Purpose
stripe-to-postgres.sh +2/-2 CLI flag updates
stripe-to-google-sheets.sh +1/-1 CLI flag update
stripe-to-postgres-live.sh +1/-1 CLI flag update

@tonyxiao tonyxiao force-pushed the tx/nary-search-pagination branch from 742660a to 46cf03f Compare April 17, 2026 09:08
tonyxiao added a commit that referenced this pull request Apr 17, 2026
#296

Merge the "massive progress fix" (PR #296) onto the n-ary search pagination
branch (PR #307), reconciling the two feature sets:

Protocol:
- Simplify TraceStreamStatus.status to ['started', 'complete']
- Errors are orthogonal to lifecycle — a stream can be complete with errors
- range_complete is now a separate field on TraceStreamStatus, not a status value
- Add CatalogMessage to SyncOutput, TraceGlobalProgress with cumulative stats

Engine:
- Progress tracker emits catalog upfront, stream_status + global_progress
  pairs on transitions, and tracks completed_ranges via mergeRanges
- New sync-progress-state.ts reducer and sync-ui.tsx Ink/React CLI component
- CLI gains --progress flag for interactive terminal progress display

Service:
- Error classification distinguishes globalPermanent vs streamPermanent
- Only global permanent errors (bad API key, invalid config) park the workflow
- Stream-scoped permanent errors skip the stream on resume

Source:
- Remove label from HttpRetryOptions
- Use isRetryableHttpError for failure type classification
- Emit complete status after non-global errors (errors orthogonal to lifecycle)
- Add 'Unrecognized request URL' to skippable error patterns

Made-with: Cursor
Committed-By-Agent: cursor
Made-with: Cursor
Committed-By-Agent: cursor
Comment thread apps/engine/src/lib/progress.ts Fixed
Comment thread packages/source-stripe/src/index.test.ts Fixed
Comment thread packages/source-stripe/src/index.test.ts Fixed
Comment thread packages/source-stripe/src/index.test.ts Fixed
tonyxiao and others added 24 commits April 19, 2026 00:31
…on tests

Verifies that when a stream has newer_than_field set, the destination
skips upserts where the incoming record is older than the existing row,
and allows upserts when the incoming record is newer.

Also fixes docker port parsing (IPv4/IPv6 multi-line output).

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Committed-By-Agent: claude
…progress tracking

- Extract app.ts helpers (formatEof, logApiStream, etc.) into api/helpers.ts
- Remove backfill CLI command and lib/backfill.ts (sync-only CLI)
- Remove sync-errors.ts error classification — eof.run_progress.derived.status
  already contains failure info
- Simplify pipeline-sync activity to just drain and return { eof }
- Fix progress tracking: separate run_progress (cumulative) from
  request_progress (per-call), seed with catalog stream names
- Extract stateReducer into own file with tests, fold progress into state
  naturally instead of bolting on at eof
- Simplify pipeline-lifecycle workflow: remove permanent error state machine,
  replace ReconcileState enum with backfilling + backfillCount
- Update test-all-accounts.sh for new CLI entry point and --base-url arg
- Pipeline schema progress field now uses ProgressPayload (not EofPayload)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Committed-By-Agent: claude
Local test script with account-specific env vars, not for CI.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Committed-By-Agent: claude
Formats ProgressPayload into a human-readable string with emojis,
elapsed time, throughput, and per-stream status. Will be used by
the CLI sync command to replace raw NDJSON output.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Committed-By-Agent: claude
Explicitly annotate the generator return type so TypeScript doesn't
widen the yield type from takeLimits' T | EofMessage union.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Committed-By-Agent: claude
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Committed-By-Agent: claude
- Replace emoji with monospace-friendly icons (○ ◐ ● ⊘ ✗)
- formatProgress accepts optional prev param to show per-stream
  deltas (e.g. "+50") since last emission

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Committed-By-Agent: claude
⚪ not_started, 🟡 started, 🟢 completed, ⏭️ skipped, 🔴 errored

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Committed-By-Agent: claude
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Committed-By-Agent: claude
- Error message now appended to header line instead of separate line
- Header shows total row delta (+N) when prev progress is provided
- Added 10-stream test case for realistic output visibility

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Committed-By-Agent: claude
- Connection error message appears on the errored stream's line
  (falls back to header when multiple streams errored)
- Checkpoint count shows delta (+N) when prev is provided

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Committed-By-Agent: claude
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Committed-By-Agent: claude
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Committed-By-Agent: claude
Replaces raw NDJSON stdout with human-readable progress on stderr.
Shows run_progress on each progress emission and at eof, with deltas
from the previous emission.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Committed-By-Agent: claude
- Use progress.derived.records_per_second and states_per_second
  directly instead of recomputing from elapsed_ms
- Collapse not_started streams into "⚪ N streams pending"
- Only show streams with activity (started/completed/errored/skipped)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Committed-By-Agent: claude
…progress/

- Show all stream names (no collapsing not_started)
- Completed streams always show record count (even 0)
- Only show "Sync failed" when no streams are still active
- Move format.ts into progress/ folder

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Committed-By-Agent: claude
…rted

- deriveStatus only reports 'failed' when no streams are still active
- Rates are 0 when elapsed_ms is 0 (no more divide-by-near-zero)
- formatProgress collapses not_started streams into "⚪ N remaining"
- Added 'succeeded' status label in format

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Committed-By-Agent: claude
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Committed-By-Agent: claude
Header shows "Syncing N streams", collapsed line shows "N not started: names"

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Committed-By-Agent: claude
…ames

Header shows count per status: [3 🟢 2 🟡 5 ⚪]
Collapsed not-started line just lists names.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Committed-By-Agent: claude
[3 done, 2 active, 5 queued] instead of [3 🟢 2 🟡 5 ⚪]

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Committed-By-Agent: claude
…ader line

Shows "Syncing N streams" and appends breakdown (3 completed, 2 started, etc.)
to the header line.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Committed-By-Agent: claude
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Committed-By-Agent: claude
progressReducer now throws if msg._ts is missing — the reducer stays
pure. The engine stamps _ts on every message before it reaches the
reducers, fixing the 0.0s elapsed bug.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Committed-By-Agent: claude
tonyxiao and others added 29 commits April 21, 2026 02:37
Committed-By-Agent: codex
Co-authored-by: codex <noreply@openai.com>
Use the connector's JSON Schema (via z.fromJSONSchema, already cached on
the resolver) in strict mode so that typos like --postgres.connection_strin
are caught with a clear error listing valid keys. Also adds --reset-state
to pipelines get.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Committed-By-Agent: claude
ConfiguredStream.time_range.gte and .lt are now optional. The engine
only injects lt (from time_ceiling) and never fabricates gte. The
source fills missing bounds from account metadata before processing.

Also adds time_range to PipelineConfig.streams so users can specify
bounded syncs. User-provided bounds are never overridden by the engine.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Committed-By-Agent: claude
Connector shorthand flags (e.g. --postgres.url) now merge on top of the
stored pipeline config instead of replacing it entirely. This fixes sync
losing fields like api_key when only overriding one property.

Extracted fetchAndMergeOverrides helper shared by both get and sync so
the fetch → merge → validate-via-OAS-schema flow isn't duplicated.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Committed-By-Agent: claude
Print the error message and exit instead of letting it bubble up as an
unhandled exception (which caused citty and Node to both print it).

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Committed-By-Agent: claude
Committed-By-Agent: codex
Co-authored-by: codex <noreply@openai.com>
Committed-By-Agent: codex
Co-authored-by: codex <noreply@openai.com>
Committed-By-Agent: codex
Co-authored-by: codex <noreply@openai.com>
Committed-By-Agent: codex
Co-authored-by: codex <noreply@openai.com>
v3 runs on Node.js 20 which is deprecated and will be removed from
GitHub Actions runners on 2026-09-16. v4 supports Node.js 24.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Committed-By-Agent: claude
Committed-By-Agent: codex
Co-authored-by: codex <noreply@openai.com>
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Committed-By-Agent: claude
- Create `packages/logger/src/bin/pretty.ts` — stdin NDJSON pretty
  printer with consistent `type:` labels for all protocol message types
- Move progress formatting (ProgressView, formatProgress) from
  `apps/engine/src/lib/progress/format.tsx` into
  `packages/logger/src/format/progress.tsx`, exported as
  `@stripe/sync-logger/progress`
- Add `_ts` field to all protocol log payloads from the logger
- Add ink, react as logger package dependencies
- Update all consumers to import from `@stripe/sync-logger/progress`

Usage: cat sync_run.log | tsx packages/logger/src/bin/pretty.ts

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Committed-By-Agent: claude
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Committed-By-Agent: claude
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Committed-By-Agent: claude
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Committed-By-Agent: claude
Type labels now match protocol exactly (stream_status:, source_state:,
connection_status:, etc.) with no fixed-width padding. Time ranges
trimmed to second precision.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Committed-By-Agent: claude
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Committed-By-Agent: claude
Progress uses ProgressHeader component (compact, 2 lines).
EOF renders ProgressView inside an Ink Box with rounded border,
colored by status (red/green/yellow).

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Committed-By-Agent: claude
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Committed-By-Agent: claude
withHttpRetry uses exponential backoff (up to 31s) but had no abort
signal, so retries blocked past the chunk time limit causing hard
deadline hits. Race listFn against the abort signal in withRateLimit
so retries are abandoned immediately on pipeline teardown.

Also handle AbortError in the stream catch block: log a warning about
potential cross-chunk retry loops instead of emitting stream_status:error,
letting the stream retry on the next chunk.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Committed-By-Agent: claude
Remove max_concurrent_streams config — concurrent streams is now
min(rate_limit, catalog.streams.length). With rate_limit=50 and 74
streams, all 50 run in parallel instead of the old default of 5.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Committed-By-Agent: claude
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Committed-By-Agent: claude
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Committed-By-Agent: claude
withAbortOnReturn was using plain Error('iterator returned') as the
abort reason, which callers couldn't reliably detect. Changed to
DOMException with name 'AbortError' so the entire abort chain uses
a consistent error type. Also swallow the losing Promise.race
rejection in withRateLimit to prevent unhandled promise crashes.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Committed-By-Agent: claude
…config

node-postgres parses connectionString last via Object.assign, so
sslmode in the URL always overwrites any ssl key set on the config
object. Strip SSL params from the connection string and translate
sslmode to Node.js TLS options only when the caller hasn't already
set an explicit ssl key.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Committed-By-Agent: claude
The SSL param stripping logic (stripSslParams + sslConfigFromConnectionString)
addresses a real node-postgres issue where sslmode in the URL overwrites the
ssl config key. However, this proxy + SSL + node-postgres interaction has been
repeatedly tricky and needs thorough testing across RDS, local Docker, and
tunneled connections before enabling. Keeping the code commented out as a
reference for when we're ready to re-enable with proper test coverage.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Committed-By-Agent: claude
…unction

Move SSL connection string handling into a named, exported function
instead of inline commented-out code. Not called yet — needs testing
across RDS, Docker, and tunneled connections. When enabled, should be
applied in the pool factory (all connections), not just proxied ones.

Adds tests for sslmode=require, sslmode=verify-full, and explicit ssl
override scenarios.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Committed-By-Agent: claude
… var

normalizePgSslConfig is now called inside withPgConnectProxy so all
callers (destination-postgres, state-postgres) get it consistently.
Gated by PG_NORMALIZE_SSL=1 — set it to test, unset to disable.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Committed-By-Agent: claude
@tonyxiao tonyxiao closed this Apr 23, 2026
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants