Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,8 @@ public long pushData(boolean growThreshold) throws IOException {
if (currentPartition == -1) {
currentPartition = partition;
} else {
shuffleClient.computeBatchCRC(
shuffleId, mapId, attemptNumber, currentPartition, dataBuf, 0, offSet);
int bytesWritten =
shuffleClient.mergeData(
shuffleId,
Expand All @@ -246,6 +248,8 @@ public long pushData(boolean growThreshold) throws IOException {

if (offSet + recordSize > dataBuf.length) {
try {
shuffleClient.computeBatchCRC(
shuffleId, mapId, attemptNumber, partition, dataBuf, 0, offSet);
dataPusher.addTask(partition, dataBuf, offSet);
memoryThresholdManager.updateStats(offSet, true);
} catch (InterruptedException e) {
Expand All @@ -261,6 +265,8 @@ public long pushData(boolean growThreshold) throws IOException {
}
if (offSet > 0) {
try {
shuffleClient.computeBatchCRC(
shuffleId, mapId, attemptNumber, currentPartition, dataBuf, 0, offSet);
dataPusher.addTask(currentPartition, dataBuf, offSet);
memoryThresholdManager.updateStats(offSet, offSet == pushBufferMaxSize);
} catch (InterruptedException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,8 @@ private byte[] getOrCreateBuffer(int partitionId) {

private void pushGiantRecord(int partitionId, byte[] buffer, int numBytes) throws IOException {
logger.debug("Push giant record for partition {}, size {}.", partitionId, numBytes);
shuffleClient.computeBatchCRC(
shuffleId, mapId, encodedAttemptId, partitionId, buffer, 0, numBytes);
int bytesWritten =
shuffleClient.pushData(
shuffleId,
Expand Down Expand Up @@ -318,6 +320,7 @@ private void flushSendBuffer(int partitionId, byte[] buffer, int size)
throws IOException, InterruptedException {
// Timing starts here, so the write time reported below spans both the CRC call and the enqueue.
long start = System.nanoTime();
logger.debug("Flush buffer for partition {}, size {}.", partitionId, size);
// Let the shuffle client account for bytes [0, size) of this buffer before the buffer is
// queued on dataPusher.
shuffleClient.computeBatchCRC(shuffleId, mapId, encodedAttemptId, partitionId, buffer, 0, size);
dataPusher.addTask(partitionId, buffer, size);
writeMetrics.incWriteTime(System.nanoTime() - start);
}
Expand All @@ -338,6 +341,8 @@ private void close() throws IOException, InterruptedException {
for (int i = 0; i < sendBuffers.length; i++) {
final int size = sendOffsets[i];
if (size > 0) {
shuffleClient.computeBatchCRC(
shuffleId, mapId, encodedAttemptId, i, sendBuffers[i], 0, size);
int bytesWritten =
shuffleClient.mergeData(
shuffleId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,8 @@ private void write0(scala.collection.Iterator iterator) throws IOException {

private void pushGiantRecord(int partitionId, byte[] buffer, int numBytes) throws IOException {
logger.debug("Push giant record, size {}.", Utils.bytesToString(numBytes));
shuffleClient.computeBatchCRC(
shuffleId, mapId, encodedAttemptId, partitionId, buffer, 0, numBytes);
int bytesWritten =
shuffleClient.pushData(
shuffleId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,8 @@ private byte[] getOrCreateBuffer(int partitionId) {
protected void pushGiantRecord(int partitionId, byte[] buffer, int numBytes) throws IOException {
logger.debug("Push giant record, size {}.", numBytes);
long start = System.nanoTime();
shuffleClient.computeBatchCRC(
shuffleId, mapId, encodedAttemptId, partitionId, buffer, 0, numBytes);
int bytesWritten =
shuffleClient.pushData(
shuffleId,
Expand Down Expand Up @@ -321,6 +323,7 @@ private void flushSendBuffer(int partitionId, byte[] buffer, int size)
throws IOException, InterruptedException {
// Timing starts here, so the write time reported below spans both the CRC call and the enqueue.
long start = System.nanoTime();
// Guarded so that Utils.bytesToString is only evaluated when debug logging is on.
if (logger.isDebugEnabled()) logger.debug("Flush buffer, size {}.", Utils.bytesToString(size));
// Let the shuffle client account for bytes [0, size) of this buffer before the buffer is
// queued on dataPusher.
shuffleClient.computeBatchCRC(shuffleId, mapId, encodedAttemptId, partitionId, buffer, 0, size);
dataPusher.addTask(partitionId, buffer, size);
writeMetrics.incWriteTime(System.nanoTime() - start);
}
Expand All @@ -332,6 +335,8 @@ protected void closeWrite() throws IOException {
for (int i = 0; i < numPartitions; i++) {
final int size = sendOffsets[i];
if (size > 0) {
shuffleClient.computeBatchCRC(
shuffleId, mapId, encodedAttemptId, i, sendBuffers[i], 0, size);
mergeData(i, sendBuffers[i], 0, size);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -348,6 +348,8 @@ private void pushGiantRecord(int partitionId, byte[] buffer, int numBytes) throw
if (logger.isDebugEnabled())
logger.debug("Push giant record, size {}.", Utils.bytesToString(numBytes));
long start = System.nanoTime();
shuffleClient.computeBatchCRC(
shuffleId, mapId, encodedAttemptId, partitionId, buffer, 0, numBytes);
int bytesWritten =
shuffleClient.pushData(
shuffleId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,16 @@ public int pushData(
return length;
}

@Override
public void computeBatchCRC(
int shuffleId,
int mapId,
int attemptId,
int partitionId,
byte[] data,
int offset,
int length) {
// No-op: this implementation records no CRC or byte-count metadata for the batch.
}

@Override
public int mergeData(
int shuffleId,
Expand Down
14 changes: 14 additions & 0 deletions client/src/main/java/org/apache/celeborn/client/ShuffleClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,20 @@ public abstract int pushData(
int numPartitions)
throws IOException;

/**
* Pre-compute CRC for a batch immediately after assembly in the writer, before the data enters
* the async push pipeline. This is the sole CRC accumulation path when shuffle integrity check is
* enabled; {@link #pushOrMergeData} does not perform CRC computation.
*
* <p>Writers therefore need to call this for every batch they hand to {@code pushData}, {@code
* mergeData}, or the data pusher; a batch pushed without this call is not reflected in the
* integrity-check metadata.
*
* @param shuffleId shuffle the batch belongs to
* @param mapId map task that produced the batch
* @param attemptId attempt id of that map task
* @param partitionId partition the batch is destined for
* @param data buffer holding the batch
* @param offset index in {@code data} of the first byte of the batch
* @param length number of bytes in the batch, starting at {@code offset}
*/
public abstract void computeBatchCRC(
int shuffleId,
int mapId,
int attemptId,
int partitionId,
byte[] data,
int offset,
int length);

Comment on lines +191 to +204
Copy link

Copilot AI Apr 22, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The computeBatchCRC() Javadoc says calling it “prevents double-computation in pushOrMergeData”, but this PR removes CRC accumulation from pushOrMergeData entirely. Please update the comment to match the new semantics (i.e., computeBatchCRC is now required for integrity-check metadata unless a fallback remains in pushOrMergeData), otherwise API users will be misled.

Copilot uses AI. Check for mistakes.
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch, fixed.

public abstract int mergeData(
int shuffleId,
int mapId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1044,12 +1044,6 @@ public int pushOrMergeData(
// increment batchId
final int nextBatchId = pushState.nextBatchId();

Copy link

Copilot AI Apr 22, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removing commit-metadata/CRC tracking from pushOrMergeData means integrity-check metadata is now only updated when callers explicitly invoke computeBatchCRC(). There are existing call sites that still call pushData()/mergeData() directly without computeBatchCRC (e.g. client-tez/tez/CelebornTezWriter.mergeData, client-mr/mr/CelebornSortBasedPusher.pushData, client-spark/columnar/ColumnarHashBasedShuffleWriter.closeColumnarWrite mergeData), so enabling celeborn.client.shuffle.integrityCheck.enabled will silently stop producing correct per-partition CRC/bytes for those paths. Please either keep a fallback CRC accumulation in pushOrMergeData (only when not already precomputed) or update all remaining writers/clients that call pushData/mergeData to invoke computeBatchCRC before enqueue/push.

Suggested change
// Preserve integrity-check metadata for callers that still invoke pushData()/mergeData()
// directly without explicitly calling computeBatchCRC() first. This must run before
// compression so CRC / bytes are recorded for the original batch payload.
computeBatchCRC(shuffleId, mapId, attemptId, partitionId, nextBatchId, data, offset, length);

Copilot uses AI. Check for mistakes.
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

End-to-end integrity checking did not support Tez/MR even before this change. If we want to add that support, the required changes will be similar.

// Track commit metadata if shuffle compression and integrity check are enabled and this request
// is not for pushing metadata itself.
if (shuffleIntegrityCheckEnabled) {
pushState.addDataWithOffsetAndLength(partitionId, data, offset, length);
}

if (shuffleCompressionEnabled && !skipCompress) {
// compress data
final Compressor compressor = compressorThreadLocal.get();
Expand Down Expand Up @@ -1404,6 +1398,23 @@ public int mergeData(
false);
}

/**
* Adds one batch to the integrity-check metadata kept in the {@link PushState} of the given map
* attempt. Returns immediately, without touching any state, when {@code
* shuffleIntegrityCheckEnabled} is false.
*/
@Override
public void computeBatchCRC(
int shuffleId,
int mapId,
int attemptId,
int partitionId,
byte[] data,
int offset,
int length) {
if (!shuffleIntegrityCheckEnabled) {
return;
}
// The PushState is looked up by the map key built from (shuffleId, mapId, attemptId).
final String mapKey = Utils.makeMapKey(shuffleId, mapId, attemptId);
PushState pushState = getPushState(mapKey);
Comment on lines +1410 to +1414
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What we are doing here is computing CRC per batch, the CRC computation on the data payload (CRC32.update over potentially megabytes) dominates the per-batch cost, making the string allocation noise.

// Only bytes [offset, offset + length) of data are passed on for this partition.
pushState.addDataWithOffsetAndLength(partitionId, data, offset, length);
}
Comment on lines +1401 to +1416
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The comment sounds right at a high level: PushState is mutated across threads. But it is not necessarily true at the micro level. PushState.addDataWithOffsetAndLength mutates PushState.commitMetadataMap; before this PR, that map was mutated on the DataPusher thread, and after this PR it is mutated on the writer thread. PushState.commitMetadataMap is never mutated by multiple threads, so to keep things simple I will not make the change the comment suggested.


@Override
public void pushMergedData(int shuffleId, int mapId, int attemptId) throws IOException {
final String mapKey = Utils.makeMapKey(shuffleId, mapId, attemptId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@

import org.apache.celeborn.client.compress.Compressor;
import org.apache.celeborn.common.CelebornConf;
import org.apache.celeborn.common.CommitMetadata;
import org.apache.celeborn.common.exception.CelebornIOException;
import org.apache.celeborn.common.identity.UserIdentifier;
import org.apache.celeborn.common.network.client.TransportClient;
Expand All @@ -61,6 +62,8 @@
import org.apache.celeborn.common.protocol.message.StatusCode;
import org.apache.celeborn.common.rpc.RpcEndpointRef;
import org.apache.celeborn.common.rpc.RpcTimeoutException;
import org.apache.celeborn.common.util.Utils;
import org.apache.celeborn.common.write.PushState;

public class ShuffleClientSuiteJ {

Expand Down Expand Up @@ -709,4 +712,43 @@ public void testCorrectParametersPassedInRequest() throws IOException {
assertEquals(crc32, capturedRequest.getCrc32());
assertEquals(bytesWritten, capturedRequest.getBytesWritten());
}

@Test
public void testComputeBatchCRCAccumulatesCorrectly() {
  // Turn the integrity check on; with it off, computeBatchCRC returns before recording anything.
  CelebornConf integrityConf = new CelebornConf();
  integrityConf.set("celeborn.client.shuffle.integrityCheck.enabled", "true");
  UserIdentifier mockUser = new UserIdentifier("mock", "mock");
  shuffleClient = new ShuffleClientImpl(TEST_APPLICATION_ID, integrityConf, mockUser);
  shuffleClient.setupLifecycleManagerRef(endpointRef);

  // Partition 0 receives a single batch; partition 1 receives two consecutive batches.
  byte[] soleBatch = "hello world".getBytes(StandardCharsets.UTF_8);
  byte[] firstPiece = "foo".getBytes(StandardCharsets.UTF_8);
  byte[] secondPiece = "bar".getBytes(StandardCharsets.UTF_8);

  shuffleClient.computeBatchCRC(
      TEST_SHUFFLE_ID, TEST_MAP_ID, TEST_ATTEMPT_ID, 0, soleBatch, 0, soleBatch.length);
  shuffleClient.computeBatchCRC(
      TEST_SHUFFLE_ID, TEST_MAP_ID, TEST_ATTEMPT_ID, 1, firstPiece, 0, firstPiece.length);
  shuffleClient.computeBatchCRC(
      TEST_SHUFFLE_ID, TEST_MAP_ID, TEST_ATTEMPT_ID, 1, secondPiece, 0, secondPiece.length);

  // Read back what the client accumulated for this map attempt.
  String mapKey = Utils.makeMapKey(TEST_SHUFFLE_ID, TEST_MAP_ID, TEST_ATTEMPT_ID);
  PushState recorded = shuffleClient.getPushState(mapKey);

  int partitionCount = 2;
  int[] actualCrcs = recorded.getCRC32PerPartition(true, partitionCount);
  long[] actualBytes = recorded.getBytesWrittenPerPartition(true, partitionCount);

  // Expected values are built by feeding the same batches, in the same order, to CommitMetadata.
  CommitMetadata wantPartition0 = new CommitMetadata();
  wantPartition0.addDataWithOffsetAndLength(soleBatch, 0, soleBatch.length);
  assertEquals(wantPartition0.getChecksum(), actualCrcs[0]);
  assertEquals(wantPartition0.getBytes(), actualBytes[0]);

  CommitMetadata wantPartition1 = new CommitMetadata();
  wantPartition1.addDataWithOffsetAndLength(firstPiece, 0, firstPiece.length);
  wantPartition1.addDataWithOffsetAndLength(secondPiece, 0, secondPiece.length);
  assertEquals(wantPartition1.getChecksum(), actualCrcs[1]);
  assertEquals(wantPartition1.getBytes(), actualBytes[1]);
}
}
Loading