backfill: dedup against unflushed spool rows (#107) #114
Merged
`hyp backfill` deduped only against committed Iceberg `part_id`s (`scanExistingPartIds`) and was blind to the spool. Rows captured live but still sitting unflushed in the spool (debounce window, captured not yet committed) were invisible, so backfill materialized its own copy and the spool later flushed its copy, leaving two rows with the same `part_id`.

Fix: fold spooled `part_id`s into the backfill materializer's pre-write seen-set.

- spool.js: read-only `readSpooledRows(tablePath)` that parses the pending `active.jsonl` + `flush-*.jsonl` envelopes and yields rows; it never rotates files or advances flush progress, so it is safe beside live capture. `discoverSpoolTables` is now exported.
- storage.js: `readSpooledRows(dataset, columns)` on the extended storage service. It discovers a dataset's spool tables, strips internal fields, and projects to the requested columns (parity with `readRows`).
- dataset.js: `scanSpooledPartIds` folds spool `part_id`s into the seen-set, invoked ONLY from `createBackfillDedupe` (once per run, alongside the committed scan). Best-effort: a missing spool surface or read error leaves the seen-set untouched rather than dropping rows.

Settle-path hazard avoided: `dedupeByPartId` (flush-time settlement) is deliberately left scanning committed partitions only. The rows it settles at flush ARE the spool rows; seeding its seen-set with spool `part_id`s would make every row match itself and be dropped, so the flush would delete the data it is committing. Both functions carry comments stating this.

LLP 0027: marks the "backfill-vs-spool same-id duplicates" open question resolved (scan spooled rows in the materializer). `@ref LLP 0027` added above the spool-scan constructs.

Tests: materializer tests for spool-overlap dedupe (full skip, partial skip, legacy message_id+part_index recompose, committed∪spool union, unreadable-spool degrade-to-committed, missing-surface skip) and storage tests for `readSpooledRows` (pending-then-empty-after-flush, column projection + dataset filtering, unknown dataset). Existing no-spool tests stay green. Full `npm test`, lint, and typecheck pass.

Fixes #107
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Dual-agent review —
| Source | Finding (severity, evidence) | Intersects |
|---|---|---|
| Codex | major/high: `readSpooledRows` envelope validation looser than `streamFlushFile` (spool.js:235, streaming-reader.js:106) | Yes: Targets (`readSpooledRows`), Risks (envelope-validation asymmetry) |
| Claude | No issues found | (n/a) |
Codex review
Fix Validations
Backfill dedupes against unflushed spool rows
- Status: correct
- Evidence: hypaware-core/plugins-workspace/ai-gateway/src/dataset.js:449, hypaware-core/plugins-workspace/ai-gateway/src/dataset.js:554, src/core/cache/storage.js:300, test/plugins/ai-gateway-backfill-materializer.test.js:317
- Assessment: Valid spooled rows now seed the backfill seen-set before materialized rows are returned, and tests cover full overlap, partial overlap, legacy `message_id`+`part_index`, read failure, and missing spool surface.
Settle-path self-drop hazard avoided
- Status: correct
- Evidence: hypaware-core/plugins-workspace/ai-gateway/src/dataset.js:322, hypaware-core/plugins-workspace/ai-gateway/src/dataset.js:325, hypaware-core/plugins-workspace/ai-gateway/src/dataset.js:454
- Assessment: Flush-time `dedupeByPartId` still scans only committed partitions; `scanSpooledPartIds` is only invoked from backfill dedupe.
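The self-drop hazard validated above can be illustrated with a small model. This is a minimal sketch, not the code in `dataset.js`: `dedupeAgainst` and the sample `part_id` values are hypothetical, and the model only assumes that dedupe drops any row whose `part_id` is already in the seen-set.

```javascript
// Hypothetical, simplified model of part_id dedupe (not the real dedupeByPartId).
function dedupeAgainst(seen, rows) {
  const kept = [];
  for (const row of rows) {
    if (seen.has(row.part_id)) continue; // already known: drop the row
    seen.add(row.part_id);
    kept.push(row);
  }
  return kept;
}

const committedIds = ['p1'];
const spoolRows = [{ part_id: 'p2' }, { part_id: 'p3' }];
const spoolIds = spoolRows.map((row) => row.part_id);

// Settle path: the batch being flushed IS the spool rows.
// Seeding with committed ids only lets the spool rows through.
const settledOk = dedupeAgainst(new Set(committedIds), spoolRows);

// Hazard: seeding with spool ids too makes every row match itself.
const settledBad = dedupeAgainst(new Set([...committedIds, ...spoolIds]), spoolRows);

// Backfill path: here the spool seeding is exactly what prevents duplicates.
const backfillRows = [{ part_id: 'p2' }, { part_id: 'p4' }];
const backfillKept = dedupeAgainst(new Set([...committedIds, ...spoolIds]), backfillRows);

console.log(settledOk.length, settledBad.length, backfillKept.map((row) => row.part_id));
```

In this model the same seeding that empties the flush batch is what lets backfill skip `p2`, which is why the spool scan has to stay confined to the backfill path.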
Findings
1) Behavioral Correctness
- Severity: major
- Confidence: high
- Evidence: src/core/cache/spool.js:235, src/core/cache/spool.js:236, src/core/cache/streaming-reader.js:106, src/core/cache/streaming-reader.js:112
- Why it matters: `readSpooledRows` can dedupe backfill against a parseable-but-unflushable spool line because it accepts `{version:1, rows:[...]}` without validating `columns`, while the actual flush reader rejects that same envelope as malformed and drops it.
- Suggested fix: Make `readSpooledRows` use the same envelope validity contract as `streamFlushFile`: require `Array.isArray(envelope.columns)` before yielding rows, and add a storage/materializer test with a malformed parseable spool line proving backfill does not skip rows that flush cannot commit.
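The suggested validity contract might look roughly like the following. This is a sketch under assumptions: `isFlushableEnvelope` and `rowsFromSpoolLines` are hypothetical names, the strict `version === 1` check is a guess based on the example envelope in the finding, and the real readers in `spool.js` and `streaming-reader.js` may be structured differently.

```javascript
// Hypothetical helper: an envelope counts as flushable only if it has the
// expected version, an Array `columns`, and an Array `rows`.
function isFlushableEnvelope(envelope) {
  return (
    envelope !== null &&
    typeof envelope === 'object' &&
    envelope.version === 1 && // assumption: version 1 is the only valid value
    Array.isArray(envelope.columns) &&
    Array.isArray(envelope.rows)
  );
}

// Hypothetical read-only pass over JSONL spool lines: yields rows only from
// envelopes that the flush reader would also accept.
function* rowsFromSpoolLines(lines) {
  for (const line of lines) {
    let envelope;
    try {
      envelope = JSON.parse(line);
    } catch {
      continue; // unparseable line: skip it
    }
    if (!isFlushableEnvelope(envelope)) continue; // parseable but malformed: skip
    yield* envelope.rows;
  }
}

const lines = [
  JSON.stringify({ version: 1, columns: ['part_id'], rows: [{ part_id: 'a' }] }),
  JSON.stringify({ version: 1, rows: [{ part_id: 'b' }] }), // no `columns`
  '{not json',
];
const spooledIds = [...rowsFromSpoolLines(lines)].map((row) => row.part_id);
console.log(spooledIds);
```

With a shared predicate of this kind, a row that flush would discard never enters the backfill seen-set, so backfill still materializes it.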
No Finding
- Contract & Interface Fidelity
- Change Impact / Blast Radius
- Concurrency, Ordering & State Safety
- Error Handling & Resilience
- Security Surface
- Resource Lifecycle & Cleanup
- Release Safety
- Test Evidence Quality
- Architectural Consistency
- Debuggability & Operability
Evidence Bundle
- Changed hot paths: `createBackfillDedupe` -> `scanExistingPartIds` + `scanSpooledPartIds`; `createQueryStorageService.readSpooledRows`; `createCacheSpool.readSpooledRows`; `discoverSpoolTables`.
- Impacted callers: hypaware-core/plugins-workspace/ai-gateway/src/dataset.js:454, src/core/cache/storage.js:311, src/core/cache/spool.js:146.
- Impacted tests: test/plugins/ai-gateway-backfill-materializer.test.js:317, test/plugins/ai-gateway-backfill-materializer.test.js:328, test/plugins/ai-gateway-backfill-materializer.test.js:339, test/plugins/ai-gateway-backfill-materializer.test.js:349, test/core/cache-storage.test.js:283, test/core/cache-storage.test.js:310.
- Unresolved uncertainty: I did not run the test suite; I reviewed the supplied diff plus targeted caller/contract context.
Claude review
No issues found.
Reports: /Users/phil/workspace/hypaware/.git/worktrees/dual-review-pr-114/dual-review/pr-114
…g flush

`streamFlushFile` treats a parseable envelope without an Array `columns` field as malformed and drops it; its rows never reach a committed partition. The backfill spool reader validated only `version` and `rows`, so it would surface those rows and let backfill dedupe against them, refusing to materialize rows flush will never commit.

Align `readSpooledRows`' envelope-validity contract with `streamFlushFile` (require `Array.isArray(columns)`) and add a regression test feeding a malformed parseable spool line.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
# Conflicts: # llp/0027-cache-settlement.decision.md
Root cause
`hyp backfill` deduped only against committed Iceberg `part_id`s via `scanExistingPartIds` (the committed-partition scan in `dataset.js`), and was blind to the spool. When rows are captured live but still sitting unflushed in the spool (the window is the ~2-minute flush debounce, so this is deterministic, not a tight race), backfill cannot see them. It materializes its own copy of those messages, then the spool flushes its copy later, leaving two rows with the same `part_id`.

Fix
Fold the `part_id`s of rows pending in the spool into the backfill materializer's pre-write seen-set.
- `src/core/cache/spool.js`: add a read-only `readSpooledRows(tablePath)` that parses the pending `active.jsonl` + rotated `flush-*.jsonl` envelopes (`{version, columns, rows}`) and yields rows. It never rotates files or advances flush progress, so it is safe to call alongside live capture; any error degrades to skipping that file/line. `discoverSpoolTables` is now exported.
- `src/core/cache/storage.js`: add `readSpooledRows(dataset, columns)` to the extended storage service. It discovers the dataset's spool tables (filtering by `datasetForTablePath`), strips internal fields, and projects to the requested columns for parity with `readRows`.
- `hypaware-core/plugins-workspace/ai-gateway/src/dataset.js`: add `scanSpooledPartIds(storage, seen)` and invoke it only from `createBackfillDedupe`, once per run alongside the committed scan. Best-effort: a storage stub without the spool surface, or any read error, leaves the seen-set untouched rather than dropping rows (a dedupe miss is recoverable via compaction; dropping rows is not).

Settle-path hazard avoided
`scanExistingPartIds` is also used by `dedupeByPartId` inside `createSettleBatch`, which runs during flush. The rows being settled at flush are the spool rows. If the spool scan were wired into the settle path, every row would match itself in the seen-set and be dropped, so the flush would delete the very data it is committing. The spool scan is therefore strictly opt-in (backfill only), and both `scanSpooledPartIds` and `dedupeByPartId` carry comments stating this constraint.

LLP
`llp/0027-cache-settlement.decision.md`: the "backfill-vs-spool same-id duplicates" open question is marked resolved (scan spooled rows in the materializer). `@ref LLP 0027#open-questions` annotations added above the spool-scan constructs in `dataset.js` and `storage.js`.

Tests
- Materializer (`test/plugins/ai-gateway-backfill-materializer.test.js`): spooled rows are not re-materialized; only spool-overlapping parts are skipped; legacy spooled rows match via `message_id`+`part_index`; committed ∪ spool unioned into one seen-set; unreadable spool degrades to committed-only dedup; a stub without `readSpooledRows` still dedupes against committed. Existing no-spool tests stay green.
- Storage (`test/core/cache-storage.test.js`): `readSpooledRows` yields unflushed rows then is empty after flush; projects to requested columns and filters by dataset; empty stream for an unknown dataset.
- `npm test` (1155 pass / 0 fail / 1 skipped), `npm run lint`, and `npm run typecheck` all pass.

Fixes #107
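The best-effort fold described under Fix could be sketched as follows. This is an illustration only: `scanSpooledPartIdsSketch`, the `'messages'` dataset name, and the synchronous iteration are assumptions made to keep the example self-contained; the real `scanSpooledPartIds` likely consumes an async stream and also handles the legacy `message_id`+`part_index` recompose, which is not modeled here.

```javascript
// Hypothetical sketch of a best-effort spool scan. Ids are collected first and
// only added to `seen` once the whole read has succeeded, so a read error
// leaves the seen-set untouched.
function scanSpooledPartIdsSketch(storage, seen) {
  // Missing spool surface (e.g. a test stub): nothing to fold in.
  if (!storage || typeof storage.readSpooledRows !== 'function') return seen;
  const found = [];
  try {
    // 'messages' is a placeholder dataset name for this example.
    for (const row of storage.readSpooledRows('messages', ['part_id'])) {
      if (typeof row.part_id === 'string') found.push(row.part_id);
    }
  } catch {
    return seen; // unreadable spool: degrade to committed-only dedupe
  }
  for (const id of found) seen.add(id);
  return seen;
}

const committed = () => new Set(['p1']); // stands in for the committed scan result
const working = { readSpooledRows: () => [{ part_id: 'p2' }] };
const broken = {
  *readSpooledRows() {
    yield { part_id: 'p9' };
    throw new Error('unreadable spool'); // fails partway through the read
  },
};

const unioned = scanSpooledPartIdsSketch(working, committed());
const degraded = scanSpooledPartIdsSketch(broken, committed());
const skipped = scanSpooledPartIdsSketch({}, committed());
console.log([...unioned], [...degraded], [...skipped]);
```

Failing toward a smaller seen-set matches the trade-off stated above: a missed dedupe can be repaired later by compaction, while a wrongly skipped row cannot.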
🤖 Generated with Claude Code