Skip to content

[CELEBORN-2313] Extend E2E checked zone to batch assembly point#3670

Open
xumingming wants to merge 1 commit into
apache:mainfrom
xumingming:extend-e2e-checked-zone
Open

[CELEBORN-2313] Extend E2E checked zone to batch assembly point#3670
xumingming wants to merge 1 commit into
apache:mainfrom
xumingming:extend-e2e-checked-zone

Conversation

@xumingming
Copy link
Copy Markdown
Contributor

What changes were proposed in this pull request?

Celeborn's E2E integrity check computes CRC_M inside ShuffleClientImpl.pushOrMergeData(), which runs in the async DataPusher thread. This leaves the segment from batch assembly in the writer thread through the DataPusher queue entirely outside the checked zone — meaning any corruption that occurs in that window is invisible to the integrity check and reaches reducers silently.

This change closes that gap and enables detection of a class of correctness bugs where data corruption occurs between batch assembly and async push dispatch, including bugs involving shared buffer pool references.

Introduce ShuffleClient.computeBatchCRC() and call it immediately before each assembled batch enters the async push pipeline, at 7 call sites across 3 classes: HashBasedShuffleWriter (spark-2/3) at flushSendBuffer(), pushGiantRecord(), and the per-partition flush in close(); and SortBasedPusher at the partition-change flush, buffer-overflow flush, pushGiantRecord(), and final flush. The now-redundant CRC computation inside pushOrMergeData() is removed.

This approach is less elegant than the original design, which had a single CRC call site inside pushOrMergeData() — one place to reason about and maintain. The new design scatters computeBatchCRC() across 7 call sites, but the trade-off is justified: the checked zone now starts at batch assembly rather than at async push dispatch, covering more of the data pipeline and enabling detection of a broader class of correctness bugs.

Why are the changes needed?

Celeborn's E2E integrity check computes CRC_M inside ShuffleClientImpl.pushOrMergeData(), which runs in the async DataPusher thread. This leaves the segment from batch assembly in the writer thread through the DataPusher queue entirely outside the checked zone — meaning any corruption that occurs in that window is invisible to the integrity check and reaches reducers silently.

Does this PR resolve a correctness bug?

It could help us detect more correctness bug.

Does this PR introduce any user-facing change?

No

How was this patch tested?

UT

@xumingming xumingming force-pushed the extend-e2e-checked-zone branch 2 times, most recently from cf055e4 to a4ee01e Compare April 22, 2026 13:03
@codecov
Copy link
Copy Markdown

codecov Bot commented Apr 22, 2026

Codecov Report

✅ All modified and coverable lines are covered by tests.
✅ Project coverage is 67.05%. Comparing base (b4cb5a0) to head (a4ee01e).
⚠️ Report is 26 commits behind head on main.

Additional details and impacted files
@@            Coverage Diff             @@
##             main    #3670      +/-   ##
==========================================
+ Coverage   66.91%   67.05%   +0.15%     
==========================================
  Files         358      359       +1     
  Lines       21986    22197     +211     
  Branches     1946     1970      +24     
==========================================
+ Hits        14710    14883     +173     
- Misses       6262     6292      +30     
- Partials     1014     1022       +8     

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.
  • 📦 JS Bundle Analysis: Save yourself from yourself by tracking and limiting bundle sizes in JS merges.

Copy link
Copy Markdown

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Extends Celeborn’s E2E shuffle integrity “checked zone” upstream by computing per-batch CRC immediately after batch assembly (writer thread) rather than inside the async DataPusher pipeline, aiming to detect corruption occurring between batch assembly and async dispatch.

Changes:

  • Adds ShuffleClient.computeBatchCRC() and implements it in ShuffleClientImpl (no-op in DummyShuffleClient).
  • Moves CRC accumulation call sites into Spark shuffle writers and SortBasedPusher right before enqueue/push/merge.
  • Adds a unit test validating CRC accumulation behavior.

Reviewed changes

Copilot reviewed 9 out of 9 changed files in this pull request and generated 3 comments.

Show a summary per file
File Description
client/src/test/java/org/apache/celeborn/client/ShuffleClientSuiteJ.java Adds UT for computeBatchCRC() accumulation behavior.
client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java Removes CRC accumulation from pushOrMergeData() and adds computeBatchCRC() implementation.
client/src/main/java/org/apache/celeborn/client/ShuffleClient.java Introduces the new computeBatchCRC() API with Javadoc.
client/src/main/java/org/apache/celeborn/client/DummyShuffleClient.java Implements new abstract method as a no-op.
client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SortBasedShuffleWriter.java Calls computeBatchCRC() for giant-record pushes.
client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/HashBasedShuffleWriter.java Calls computeBatchCRC() before enqueue and per-partition final flush.
client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/SortBasedShuffleWriter.java Calls computeBatchCRC() for giant-record pushes.
client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/HashBasedShuffleWriter.java Calls computeBatchCRC() before enqueue and per-partition final flush.
client-spark/common/src/main/java/org/apache/spark/shuffle/celeborn/SortBasedPusher.java Calls computeBatchCRC() before partition-change flush, overflow flush, and final flush.

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment thread client/src/main/java/org/apache/celeborn/client/ShuffleClient.java
Comment thread client/src/test/java/org/apache/celeborn/client/ShuffleClientSuiteJ.java Outdated
@gauravkm
Copy link
Copy Markdown
Contributor

Thanks for the fix!

I have been thinking about this change as well.
Copilot raises a valid point around potentially missing call sites where we should explicitly invoke the computation. That is where I sort of got stuck as well. I am going to spend more time thinking about what could be a more robust approach.

I am also interested in learning what other folks in the community think as well about the current explicit call approach from all writer call sites.

@gauravkm
Copy link
Copy Markdown
Contributor

One option might to have 2 checksums (from both writer and shuffle client) on the client as well, and then compare them to be the same before sending the metadata. If there is a mismatch, then fail the task -> would catch both cases - that either a call site has been missed or that DataPusher did not push all data

@RexXiong
Copy link
Copy Markdown
Contributor

RexXiong commented Apr 23, 2026

Thanks for this PR — the analysis of the coverage gap is spot-on. I have a suggestion on the API design that could make it more maintainable.

Concern: Implicit CRC contract

The current approach requires every call site to remember computeBatchCRC() before pushData/mergeData/addTask. This is an implicit contract — a new code path can easily call pushData directly and silently bypass the integrity check. The original single-site design was self-enforcing; this PR trades that away.

Suggestion: Encapsulate CRC + push as atomic operations

Introduce WithCRC variants that combine CRC computation and data push into a single call:

// ShuffleClient.java
public int pushDataWithCRC(int shuffleId, int mapId, int attemptId,
    int partitionId, byte[] data, int offset, int length,
    int numMappers, int numPartitions) throws IOException {
  computeBatchCRC(shuffleId, mapId, attemptId, partitionId, data, offset, length);
  return pushData(shuffleId, mapId, attemptId, partitionId,
      data, offset, length, numMappers, numPartitions);
}

public int mergeDataWithCRC(int shuffleId, int mapId, int attemptId,
    int partitionId, byte[] data, int offset, int length,
    int numMappers, int numPartitions) throws IOException {
  computeBatchCRC(shuffleId, mapId, attemptId, partitionId, data, offset, length);
  return mergeData(shuffleId, mapId, attemptId, partitionId,
      data, offset, length, numMappers, numPartitions);
}

// For the DataPusher path
public void addTaskWithCRC(int shuffleId, int mapId, int attemptId,
    int partitionId, byte[] buffer, int size) throws InterruptedException {
  computeBatchCRC(shuffleId, mapId, attemptId, partitionId, buffer, 0, size);
  dataPusher.addTask(partitionId, buffer, size);
}

Then the 7 writer call sites each become a single call — no way to forget CRC:

// Before (two operations, easy to miss CRC)
shuffleClient.computeBatchCRC(shuffleId, mapId, attemptId, partitionId, buffer, 0, size);
dataPusher.addTask(partitionId, buffer, size);

// After (one atomic operation)
shuffleClient.addTaskWithCRC(shuffleId, mapId, attemptId, partitionId, buffer, size);

The bare pushData/mergeData can be annotated as internal-only (or made package-private if feasible), so new code naturally gravitates toward the WithCRC variants.

This doesn't eliminate the 7 call sites, but it makes them self-contained and impossible to get wrong. The invariant shifts from "callers must remember to compute CRC first" (implicit, fragile) to "use the CRC-inclusive API" (explicit, hard to misuse).


Other than that, a couple of minor notes on the test:

  • celebornCrcCombine in the test duplicates CelebornCRC32.combine. If combine's implementation changes, the test won't catch it. Consider calling CelebornCRC32.combine directly instead.
  • It would be valuable to add an integration test that exercises the full writer → computeBatchCRCDataPusherpushOrMergeData path to guard against future regressions (e.g., someone re-adding CRC to pushOrMergeData causing double-counting).

Review generated with the assistance of Claude AI

@RexXiong
Copy link
Copy Markdown
Contributor

One option might to have 2 checksums (from both writer and shuffle client) on the client as well, and then compare them to be the same before sending the metadata. If there is a mismatch, then fail the task -> would catch both cases - that either a call site has been missed or that DataPusher did not push all data

@gauravkm interesting idea! Failing the task at the mapper side is far cheaper than discovering corruption at reducers and rerunning the entire job. We can explore this as a follow-up improvement.

@xumingming xumingming closed this Apr 23, 2026
@xumingming xumingming reopened this Apr 23, 2026
@xumingming
Copy link
Copy Markdown
Contributor Author

@RexXiong Thanks for the review. After some thinking, I find the suggestion you provided to be not feasible. The reason I want to separate CRC calculation from pushOrMergeData is because current CRC calculation is too late, so I separate it from pushOrMergeData to call it at a earlier call site. The call site could be:

  • shuffleClient.pushData
  • shuffleClient.mergeData
  • dataPusher.addTask
  • etc

For adding pushDataWithCRC to replace pushData, it is doable technically, but it is actually similar to original pushOrMergeData, which is where we were from.
For adding addTaskWithCRC, it is trickier, because addTask is a method from DataPusher, DataPusher is a higher level concept than ShuffleClient, mix DataPusher into ShuffleClient seems not a very good idea.

Even if we have added all the xxxWithCRC methods. The methods in ShuffleClient will be:

  • pushData: can not be removed, because when data flowing through dataPusher.addTask, CRC is calculated there, but ultimately we need to call pushData to do actual push, we should not call pushDataWithCRC here.
  • mergeData: can not be removed, similar reason as above.
  • pushDataWithCRC: newly added
  • mergeDataWithCRC: newly added
  • addTaskWithCRC: newly added

It seems more complicated than my current solution. How do you think?

@xumingming
Copy link
Copy Markdown
Contributor Author

One option might to have 2 checksums (from both writer and shuffle client) on the client as well, and then compare them to be the same before sending the metadata. If there is a mismatch, then fail the task -> would catch both cases - that either a call site has been missed or that DataPusher did not push all data

@gauravkm Interesting idea! Looking forward to detailed proposal.

@xumingming xumingming force-pushed the extend-e2e-checked-zone branch from a4ee01e to f3dd0f0 Compare April 23, 2026 09:02
@xumingming
Copy link
Copy Markdown
Contributor Author

@RexXiong @SteNicholas Have made corresponding changes, please take a look again.

@gauravkm
Copy link
Copy Markdown
Contributor

gauravkm commented Apr 23, 2026

@gauravkm Interesting idea! Looking forward to detailed proposal

There isn't a lot more to it. Essentially we keep both the existing checksum computation, and store the CRC computation being added from the writers (in this PR) separately. And then before we send metadata, we ensure that both the computations match. Otherwise we fail the task

Essentially there are two layers (writer and shuffle client) computing their own checksums, and then shuffle client compares them before propagation at mapper end

@xumingming
Copy link
Copy Markdown
Contributor Author

@RexXiong @SteNicholas Gentle ping :)

@SteNicholas SteNicholas requested a review from Copilot May 13, 2026 07:37
Copy link
Copy Markdown

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 9 out of 9 changed files in this pull request and generated 2 comments.

@xumingming
Copy link
Copy Markdown
Contributor Author

@SteNicholas Can you take another look?

@SteNicholas
Copy link
Copy Markdown
Member

@xumingming, please take a look at the comments of claude code:

Code Review — [CELEBORN-2313] Extend E2E checked zone to batch assembly point 
  (apache/celeborn#3670)

  Overview

  Moves CRC_M (per-partition checksum + byte-count) accumulation out of
  ShuffleClientImpl.pushOrMergeData() (which runs on the async DataPusher thread) into a new
  explicit ShuffleClient.computeBatchCRC() invoked synchronously in the writer thread, right after 
  batch assembly, at 7 dispatch sites (Hash spark-2/3: flushSendBuffer, pushGiantRecord,
  close()/closeWrite() residual; Sort pushGiantRecord; SortBasedPusher: partition-change,
  buffer-overflow, final flush). The old accumulation line in pushOrMergeData() is removed;
  ShuffleClient/DummyShuffleClient get the new method. +103/-6, 9 files. Goal: widen the
  integrity-checked zone to cover the assembly→async-dispatch window.

  Correctness — the central concern is call-site completeness

  - The single chokepoint becomes an invariant maintained by convention. Previously every push
  passed through pushOrMergeData(), so CRC_M provably covered exactly what was sent. Now CRC_M
  correctness depends on enumerating every assembled-data dispatch site across 4 writers + the
  pusher. A missed site → under-count → false integrity-mismatch task failures; an extra/duplicated 
  site or a batch CRC'd then not sent (discard/abort) → over-count → same. This is the dominant risk
   and is asserted by inspection only. The enumerated sites look plausibly complete (normal flush,
  giant record, close residual, sort partition-change/overflow/final; Sort normal path is covered
  via the shared SortBasedPusher), but please double-check there is no other path that reaches the
  server with assembled user data (e.g., revive/split re-push reuses an already-CRC'd buffer = OK;
  confirm nothing else calls pushData/mergeData with fresh data).
  - ⚠️  attemptId encoding consistency. Hash writers pass encodedAttemptId; SortBasedPusher passes
  attemptNumber. computeBatchCRC does getPushState(makeMapKey(shuffleId, mapId, attemptId)). If that
   key doesn't match the key used by the actual push/commit path for the same batch, CRC_M is
  accumulated into a different PushState than the one read at commit → integrity check silently
  broken or always-mismatching. Verify both encodings resolve to the same PushState (high priority).
  - ⚠️  Thread-safety / visibility. Accumulation moves from the DataPusher thread to the writer
  thread, while PushState.addDataWithOffsetAndLength was previously single-threaded on the pusher.
  Confirm: (a) SortBasedPusher.pushData can't run concurrently with another path mutating the same
  PushState (e.g., spill-triggered flush), and (b) there is a happens-before between writer-thread
  accumulation and the commit-time getCRC32PerPartition read (likely OK since the writer also drives
   mapperEnd, but it should be explicit/confirmed).
  - Buffer ordering is correct: computeBatchCRC is placed immediately before
  addTask/mergeData/pushData at every site, so the snapshot reflects the bytes about to be
  dispatched (and any later mutation is exactly what this PR intends to catch). Partition/offset
  args line up with the following dispatch call at each site.
  - CRC is still over uncompressed assembled bytes (same content the old code saw before the
  compression block), so semantics vs. the reducer-side check are preserved.

  Code quality / style
  
  - 7 duplicated computeBatchCRC(shuffleId, mapId, <attempt>, partition, buf, 0, size) calls are
  fragile — a future new dispatch path will silently omit it. Consider funnelling writer-side
  dispatch through a small helper (crcThenAddTask(...) / crcThenPushOrMerge(...)) so the CRC and the
   dispatch can't drift apart, restoring much of the lost single-point property without giving up
  the wider zone.
  - Good: the ShuffleClient.computeBatchCRC Javadoc states the invariant ("sole CRC accumulation
  path; pushOrMergeData does not compute CRC"). Keep that — it's the contract the whole change rests
   on.
  - The PR description is honest about the elegance trade-off; appreciated. Worth adding an inline
  comment at each site (or near pushOrMergeData) pointing to the enumerated sites so the invariant
  is discoverable.
  - Disabled path is correctly early-returned (if (!shuffleIntegrityCheckEnabled) return;), so
  default-off users pay nothing.

  Performance

  - CRC32 now runs on the synchronous writer critical path instead of overlapping on the DataPusher
  thread. For large shuffles with integrity check enabled this removes pipelining and may measurably
   reduce write throughput. It's gated by shuffleIntegrityCheckEnabled (default off → no impact),
  but for users who enable it this is a real regression that should be benchmarked and called out as
   a conscious trade-off.

  Test coverage — thin for the risk profile

  - testComputeBatchCRCAccumulatesCorrectly is a solid unit test for the primitive: multi-batch,
  multi-partition accumulation verified against an independent CommitMetadata recomputation,
  including per-partition separation. Good.
  - Missing the high-risk coverage: (1) no test that each writer path (giant / normal flush / close
  residual / sort overflow/partition-change/final) actually calls computeBatchCRC exactly once for
  the bytes it sends; (2) no end-to-end assertion that writer-side CRC_M equals a recomputation over
   the actually pushed bytes (the regression that a moved/missed site would introduce); (3) no test
  for the attemptId-keying consistency; (4) no disabled-path test. Recommend at least a writer-level
   test (Hash and Sort, integrity enabled) asserting accumulated per-partition CRC matches
  independent recomputation across the giant/normal/close paths.

  Security

  - N/A — internal integrity mechanism, no new external input handling.

  Verdict

  Reasonable, well-motivated correctness enhancement with a clean primitive and honest
  documentation. No definitive bug found, but it trades a structural guarantee for a convention
  spread over 7 sites. Before merge I'd want: (1) confirmation of attemptId/PushState key
  consistency (esp. SortBasedPusher), (2) confirmation of call-site completeness and PushState
  thread-safety/visibility, (3) writer-level/E2E tests that would fail if a site is missed or
  mis-keyed, and (4) a benchmark note on the writer-thread CRC cost. Consider the dispatch-helper
  refactor to harden the invariant.

Copy link
Copy Markdown

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copilot encountered an error and was unable to review this pull request. You can try again by re-requesting a review.

Copy link
Copy Markdown

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 9 out of 9 changed files in this pull request and generated 1 comment.

@xumingming xumingming force-pushed the extend-e2e-checked-zone branch from edd0b92 to 7d49b38 Compare May 20, 2026 07:04
@xumingming
Copy link
Copy Markdown
Contributor Author

@SteNicholas Made corresponding changes, and the following are the detailed responses to Claude's comments:

  • ⚠️ attemptId encoding consistency. Hash writers pass encodedAttemptId; SortBasedPusher passes
    attemptNumber. computeBatchCRC does getPushState(makeMapKey(shuffleId, mapId, attemptId)). If that
    key doesn't match the key used by the actual push/commit path for the same batch, CRC_M is
    accumulated into a different PushState than the one read at commit → integrity check silently
    broken or always-mismatching. Verify both encodings resolve to the same PushState (high priority).

This seems a mis-read. All the call site are using encodedAttemptId. The current code does use attemptNumber and encodedAttemptId in difference places, but it is just a naming issue, and not introduced by this PR.

  • ⚠️ Thread-safety / visibility. Accumulation moves from the DataPusher thread to the writer
    thread, while PushState.addDataWithOffsetAndLength was previously single-threaded on the pusher.
    Confirm: (a) SortBasedPusher.pushData can't run concurrently with another path mutating the same
    PushState (e.g., spill-triggered flush), and (b) there is a happens-before between writer-thread
    accumulation and the commit-time getCRC32PerPartition read (likely OK since the writer also drives
    mapperEnd, but it should be explicit/confirmed).

(a). Only the Spark task thread writes; the DataPusher thread only reads. Spill is a no-op in SortBasedPusher, there is no cross-thread visibility issue.
(b). In the new implementation, both the writer-thread and the read-thread are the same spark task thread, so there is a nature happens-before.

Test coverage — thin for the risk profile

Added e2e tests in CelebornShuffleWriterSuiteBase which covers the following code path:

  • Giant path: records > buffer size → pushGiantRecord → computeBatchCRC + pushData
  • Normal flush path: buffer fills up → computeBatchCRC + dataPusher.addTask/mergeData
  • Close residual path: remaining bytes at writer.stop() → computeBatchCRC + mergeData

@xumingming xumingming force-pushed the extend-e2e-checked-zone branch from 7d49b38 to 744c69f Compare May 20, 2026 08:12
Celeborn's E2E integrity check computes CRC_M inside
`ShuffleClientImpl.pushOrMergeData()`, which runs in the async `DataPusher`
thread. This leaves the segment from batch assembly in the writer thread
through the `DataPusher` queue entirely outside the checked zone — meaning
any corruption that occurs in that window is invisible to the integrity check
and reaches reducers silently.

This change closes that gap and enables detection of a class of correctness
bugs where data corruption occurs between batch assembly and async push
dispatch, including bugs involving shared buffer pool references.

Introduce `ShuffleClient.computeBatchCRC()` and call it immediately before
each assembled batch enters the async push pipeline, at 7 call sites across
3 classes: `HashBasedShuffleWriter` (spark-2/3) at `flushSendBuffer()`,
`pushGiantRecord()`, and the per-partition flush in `close()`; and
`SortBasedPusher` at the partition-change flush, buffer-overflow flush,
`pushGiantRecord()`, and final flush. The now-redundant CRC computation
inside `pushOrMergeData()` is removed.

This approach is less elegant than the original design, which had a single
CRC call site inside `pushOrMergeData()` — one place to reason about and
maintain. The new design scatters `computeBatchCRC()` across 7 call sites,
but the trade-off is justified: the checked zone now starts at batch assembly
rather than at async push dispatch, covering more of the data pipeline and
enabling detection of a broader class of correctness bugs.
@xumingming xumingming force-pushed the extend-e2e-checked-zone branch from 744c69f to 7e0a376 Compare May 20, 2026 08:21
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

5 participants