diff --git a/backends-velox/src-celeborn/main/scala/org/apache/spark/shuffle/VeloxCelebornColumnarShuffleWriter.scala b/backends-velox/src-celeborn/main/scala/org/apache/spark/shuffle/VeloxCelebornColumnarShuffleWriter.scala
index 783d68a00cd9..c541df23bc39 100644
--- a/backends-velox/src-celeborn/main/scala/org/apache/spark/shuffle/VeloxCelebornColumnarShuffleWriter.scala
+++ b/backends-velox/src-celeborn/main/scala/org/apache/spark/shuffle/VeloxCelebornColumnarShuffleWriter.scala
@@ -150,7 +150,8 @@ class VeloxCelebornColumnarShuffleWriter[K, V](
           GlutenShuffleUtils.getStartPartitionId(dep.nativePartitioning, context.partitionId),
           nativeBufferSize,
           GlutenConfig.get.columnarShuffleReallocThreshold,
-          partitionWriterHandle
+          partitionWriterHandle,
+          false
         )
       case SortShuffleWriterType =>
         shuffleWriterJniWrapper.createSortShuffleWriter(
diff --git a/backends-velox/src-uniffle/main/java/org/apache/spark/shuffle/writer/VeloxUniffleColumnarShuffleWriter.java b/backends-velox/src-uniffle/main/java/org/apache/spark/shuffle/writer/VeloxUniffleColumnarShuffleWriter.java
index ef35818c7b59..72bd9f3c125a 100644
--- a/backends-velox/src-uniffle/main/java/org/apache/spark/shuffle/writer/VeloxUniffleColumnarShuffleWriter.java
+++ b/backends-velox/src-uniffle/main/java/org/apache/spark/shuffle/writer/VeloxUniffleColumnarShuffleWriter.java
@@ -185,7 +185,8 @@ protected void writeImpl(Iterator<Product2<K, V>> records) {
                   columnarDep.nativePartitioning(), partitionId),
               nativeBufferSize,
               reallocThreshold,
-              partitionWriterHandle);
+              partitionWriterHandle,
+              false);
     }
     runtime
diff --git a/backends-velox/src/main/scala/org/apache/spark/shuffle/ColumnarShuffleWriter.scala b/backends-velox/src/main/scala/org/apache/spark/shuffle/ColumnarShuffleWriter.scala
index 01f4bd06bab6..10de292fb921 100644
--- a/backends-velox/src/main/scala/org/apache/spark/shuffle/ColumnarShuffleWriter.scala
+++ b/backends-velox/src/main/scala/org/apache/spark/shuffle/ColumnarShuffleWriter.scala
@@ -60,6 +60,8 @@ class ColumnarShuffleWriter[K, V](
 
   private val blockManager = SparkEnv.get.blockManager
 
+  private val rowBasedChecksumEnabled: Boolean = GlutenMapStatusUtil.isRowBasedChecksumEnabled
+
   // Are we in the process of stopping? Because map tasks can call stop() with success = true
   // and then call stop() with success = false if they get an exception, we want to make sure
   // we don't try deleting files, etc twice.
@@ -192,7 +194,8 @@ class ColumnarShuffleWriter[K, V](
           taskContext.partitionId),
         nativeBufferSize,
         reallocThreshold,
-        partitionWriterHandle
+        partitionWriterHandle,
+        rowBasedChecksumEnabled
       )
   }
 
@@ -277,11 +280,16 @@ class ColumnarShuffleWriter[K, V](
       }
     }
 
-    // The partitionLength is much more than vanilla spark partitionLengths,
-    // almost 3 times than vanilla spark partitionLengths
-    // This value is sensitive in rules such as AQE rule OptimizeSkewedJoin DynamicJoinSelection
-    // May affect the final plan
-    mapStatus = MapStatus(blockManager.shuffleServerId, partitionLengths, mapId)
+    // Use native row-based checksums (order-independent, per-row XXH64) for MapStatus.
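+    // Fold the per-partition checksums into a single long with a 31-based
+    // polynomial (hashCode-style aggregation), since MapStatus carries one
+    // checksumValue per map task.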
+    val rowChecksums = splitResult.getRowBasedChecksums
+    val aggregatedChecksum = if (rowChecksums != null && rowChecksums.nonEmpty) {
+      rowChecksums.foldLeft(0L)((acc, c) => acc * 31L + c)
+    } else 0L
+    mapStatus = GlutenMapStatusUtil.createMapStatus(
+      blockManager.shuffleServerId,
+      partitionLengths,
+      mapId,
+      aggregatedChecksum)
   }
 
   private def handleEmptyInput(): Unit = {
@@ -292,7 +300,11 @@ class ColumnarShuffleWriter[K, V](
       partitionLengths,
       Array[Long](),
       null)
-    mapStatus = MapStatus(blockManager.shuffleServerId, partitionLengths, mapId)
+    mapStatus = GlutenMapStatusUtil.createMapStatus(
+      blockManager.shuffleServerId,
+      partitionLengths,
+      mapId,
+      0L)
   }
 
   @throws[IOException]
diff --git a/cpp/core/jni/JniWrapper.cc b/cpp/core/jni/JniWrapper.cc
index 9e194be6ea84..dc8ab49e70d9 100644
--- a/cpp/core/jni/JniWrapper.cc
+++ b/cpp/core/jni/JniWrapper.cc
@@ -269,7 +269,7 @@ jint JNI_OnLoad(JavaVM* vm, void* reserved) {
   jniByteInputStreamClose = getMethodIdOrError(env, jniByteInputStreamClass, "close", "()V");
 
   splitResultClass = createGlobalClassReferenceOrError(env, "Lorg/apache/gluten/vectorized/GlutenSplitResult;");
-  splitResultConstructor = getMethodIdOrError(env, splitResultClass, "<init>", "(JJJJJJJJJJDJ[J[J)V");
+  splitResultConstructor = getMethodIdOrError(env, splitResultClass, "<init>", "(JJJJJJJJJJDJ[J[J[J)V");
 
   metricsBuilderClass = createGlobalClassReferenceOrError(env, "Lorg/apache/gluten/metrics/Metrics;");
@@ -990,7 +990,8 @@ JNIEXPORT jlong JNICALL Java_org_apache_gluten_vectorized_ShuffleWriterJniWrappe
     jint startPartitionId,
     jint splitBufferSize,
     jdouble splitBufferReallocThreshold,
-    jlong partitionWriterHandle) {
+    jlong partitionWriterHandle,
+    jboolean rowBasedChecksumEnabled) {
   JNI_METHOD_START
   const auto ctx = getRuntime(env, wrapper);
@@ -1005,6 +1006,7 @@ JNIEXPORT jlong JNICALL Java_org_apache_gluten_vectorized_ShuffleWriterJniWrappe
       startPartitionId,
       splitBufferSize,
       splitBufferReallocThreshold);
+  shuffleWriterOptions->rowBasedChecksumEnabled = rowBasedChecksumEnabled;
 
   return ctx->saveObject(ctx->createShuffleWriter(numPartitions, partitionWriter, shuffleWriterOptions));
   JNI_METHOD_END(kInvalidObjectHandle)
@@ -1159,6 +1161,13 @@ JNIEXPORT jobject JNICALL Java_org_apache_gluten_vectorized_ShuffleWriterJniWrap
   auto rawSrc = reinterpret_cast<const jlong*>(rawPartitionLengths.data());
   env->SetLongArrayRegion(rawPartitionLengthArr, 0, rawPartitionLengths.size(), rawSrc);
 
+  const auto& rowBasedChecksums = shuffleWriter->rowBasedChecksums();
+  auto rowBasedChecksumArr = env->NewLongArray(rowBasedChecksums.size());
+  if (!rowBasedChecksums.empty()) {
+    auto checksumSrc = reinterpret_cast<const jlong*>(rowBasedChecksums.data());
+    env->SetLongArrayRegion(rowBasedChecksumArr, 0, rowBasedChecksums.size(), checksumSrc);
+  }
+
   jobject splitResult = env->NewObject(
       splitResultClass,
       splitResultConstructor,
@@ -1175,7 +1184,8 @@ JNIEXPORT jobject JNICALL Java_org_apache_gluten_vectorized_ShuffleWriterJniWrap
       shuffleWriter->avgDictionaryFields(),
       shuffleWriter->dictionarySize(),
       partitionLengthArr,
-      rawPartitionLengthArr);
+      rawPartitionLengthArr,
+      rowBasedChecksumArr);
 
   return splitResult;
   JNI_METHOD_END(nullptr)
diff --git a/cpp/core/shuffle/Options.h b/cpp/core/shuffle/Options.h
index 1d7f9ad9f9c7..b0df56e72e43 100644
--- a/cpp/core/shuffle/Options.h
+++ b/cpp/core/shuffle/Options.h
@@ -69,6 +69,7 @@ struct ShuffleWriterOptions {
   ShuffleWriterType shuffleWriterType;
   Partitioning partitioning = Partitioning::kRoundRobin;
   int32_t startPartitionId = 0;
+  bool rowBasedChecksumEnabled = false;
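+  // When enabled, the hash shuffle writer serializes each row with UnsafeRowFast,
+  // hashes it with XXH64, and folds the hashes into per-partition XOR/SUM
+  // accumulators, so the resulting checksum is independent of row order.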
 
   ShuffleWriterOptions(ShuffleWriterType shuffleWriterType) : shuffleWriterType(shuffleWriterType) {}
@@ -224,5 +225,6 @@ struct ShuffleWriterMetrics {
   int64_t dictionarySize{0};
   std::vector<int64_t> partitionLengths{};
   std::vector<int64_t> rawPartitionLengths{}; // Uncompressed size.
+  std::vector<int64_t> rowBasedChecksums{}; // Per-partition row-based checksums.
 };
 } // namespace gluten
diff --git a/cpp/core/shuffle/ShuffleWriter.cc b/cpp/core/shuffle/ShuffleWriter.cc
index 3f0feadfb0f6..7287181ffa95 100644
--- a/cpp/core/shuffle/ShuffleWriter.cc
+++ b/cpp/core/shuffle/ShuffleWriter.cc
@@ -109,6 +109,10 @@ const std::vector<int64_t>& ShuffleWriter::rawPartitionLengths() const {
   return metrics_.rawPartitionLengths;
 }
 
+const std::vector<int64_t>& ShuffleWriter::rowBasedChecksums() const {
+  return metrics_.rowBasedChecksums;
+}
+
 ShuffleWriter::ShuffleWriter(int32_t numPartitions, Partitioning partitioning)
     : numPartitions_(numPartitions), partitioning_(partitioning) {}
 } // namespace gluten
diff --git a/cpp/core/shuffle/ShuffleWriter.h b/cpp/core/shuffle/ShuffleWriter.h
index 934ad090763b..2a892e2b32f0 100644
--- a/cpp/core/shuffle/ShuffleWriter.h
+++ b/cpp/core/shuffle/ShuffleWriter.h
@@ -67,6 +67,8 @@ class ShuffleWriter : public Reclaimable {
 
   const std::vector<int64_t>& rawPartitionLengths() const;
 
+  const std::vector<int64_t>& rowBasedChecksums() const;
+
 protected:
   ShuffleWriter(int32_t numPartitions, Partitioning partitioning);
diff --git a/cpp/velox/shuffle/VeloxHashShuffleWriter.cc b/cpp/velox/shuffle/VeloxHashShuffleWriter.cc
index f25d4ac88764..969669fd9af2 100644
--- a/cpp/velox/shuffle/VeloxHashShuffleWriter.cc
+++ b/cpp/velox/shuffle/VeloxHashShuffleWriter.cc
@@ -25,6 +25,8 @@
 #include "utils/VeloxArrowUtils.h"
 #include "velox/buffer/Buffer.h"
 #include "velox/common/base/Nulls.h"
+#include "velox/external/xxhash/xxhash.h"
+#include "velox/row/UnsafeRowFast.h"
 #include "velox/type/HugeInt.h"
 #include "velox/type/Timestamp.h"
 #include "velox/type/Type.h"
@@ -182,6 +184,11 @@ arrow::Status VeloxHashShuffleWriter::init() {
 
   partitionBufferBase_.resize(numPartitions_);
 
+  if (rowBasedChecksumEnabled_) {
+    checksumXor_.resize(numPartitions_, 0);
+    checksumSum_.resize(numPartitions_, 0);
+  }
+
   return arrow::Status::OK();
 }
@@ -361,6 +368,17 @@ arrow::Status VeloxHashShuffleWriter::stop() {
 
   stat();
 
+  // Populate row-based checksums into metrics.
+  if (rowBasedChecksumEnabled_) {
+    metrics_.rowBasedChecksums.resize(numPartitions_);
+    for (auto pid = 0; pid < numPartitions_; ++pid) {
+      int64_t xorVal = checksumXor_[pid];
+      int64_t sumVal = checksumSum_[pid];
+      int64_t rotated = (static_cast<uint64_t>(sumVal) << 27) | (static_cast<uint64_t>(sumVal) >> 37);
+      metrics_.rowBasedChecksums[pid] = xorVal ^ rotated;
+    }
+  }
+
   return arrow::Status::OK();
 }
@@ -422,6 +440,7 @@ void VeloxHashShuffleWriter::setSplitState(SplitState state) {
 arrow::Status VeloxHashShuffleWriter::doSplit(const facebook::velox::RowVector& rv, int64_t memLimit) {
   auto rowNum = rv.size();
   RETURN_NOT_OK(buildPartition2Row(rowNum));
+  computeRowBasedChecksums(rv);
   RETURN_NOT_OK(updateInputHasNull(rv));
 
   {
@@ -1503,4 +1522,49 @@ bool VeloxHashShuffleWriter::isExtremelyLargeBatch(facebook::velox::RowVectorPtr
   return (rv->size() > maxBatchSize_ && maxBatchSize_ > 0);
 }
 
+void VeloxHashShuffleWriter::computeRowBasedChecksums(const facebook::velox::RowVector& rv) {
+  if (!rowBasedChecksumEnabled_) {
+    return;
+  }
+
+  auto numRows = rv.size();
+  VELOX_DCHECK(rv.nulls() == nullptr, "RowVector with top-level nulls not supported for checksum");
+  // Get the RowVector to serialize (strip pid column if present).
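+  // When the partitioner precomputes partition ids, the first child is the pid
+  // column; it is dropped so the checksum covers only the shuffled data.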
+  facebook::velox::RowVectorPtr dataVector;
+  if (partitioner_->hasPid()) {
+    // Strip the first column (partition id).
+    auto rowType = std::dynamic_pointer_cast<const facebook::velox::RowType>(rv.type());
+    std::vector<std::string> names(rowType->names().begin() + 1, rowType->names().end());
+    std::vector<facebook::velox::TypePtr> types(rowType->children().begin() + 1, rowType->children().end());
+    std::vector<facebook::velox::VectorPtr> children(rv.children().begin() + 1, rv.children().end());
+    auto dataType = facebook::velox::ROW(std::move(names), std::move(types));
+    dataVector =
+        std::make_shared<facebook::velox::RowVector>(rv.pool(), dataType, nullptr, numRows, std::move(children));
+  } else {
+    auto rowType = std::dynamic_pointer_cast<const facebook::velox::RowType>(rv.type());
+    dataVector = std::make_shared<facebook::velox::RowVector>(rv.pool(), rowType, nullptr, numRows, rv.children());
+  }
+
+  facebook::velox::row::UnsafeRowFast fast(dataVector);
+  auto dataType = std::dynamic_pointer_cast<const facebook::velox::RowType>(dataVector->type());
+  auto fixedSize = facebook::velox::row::UnsafeRowFast::fixedRowSize(dataType);
+  int32_t bufSize = fixedSize.value_or(1024);
+  if (checksumBuffer_.size() < static_cast<size_t>(bufSize)) {
+    checksumBuffer_.resize(bufSize);
+  }
+
+  for (uint32_t row = 0; row < numRows; ++row) {
+    auto pid = row2Partition_[row];
+    auto size = fast.rowSize(row);
+    if (size > static_cast<int32_t>(checksumBuffer_.size())) {
+      checksumBuffer_.resize(size);
+    }
+    fast.serialize(row, checksumBuffer_.data());
+
+    auto hash = static_cast<int64_t>(XXH64(checksumBuffer_.data(), size, 0));
+    checksumXor_[pid] ^= hash;
+    checksumSum_[pid] += hash;
+  }
+}
+
 } // namespace gluten
diff --git a/cpp/velox/shuffle/VeloxHashShuffleWriter.h b/cpp/velox/shuffle/VeloxHashShuffleWriter.h
index 9203c70594cd..f8591cc93f05 100644
--- a/cpp/velox/shuffle/VeloxHashShuffleWriter.h
+++ b/cpp/velox/shuffle/VeloxHashShuffleWriter.h
@@ -214,7 +214,8 @@ class VeloxHashShuffleWriter : public VeloxShuffleWriter {
       MemoryManager* memoryManager)
       : VeloxShuffleWriter(numPartitions, partitionWriter, options, memoryManager),
         splitBufferSize_(options->splitBufferSize),
-        splitBufferReallocThreshold_(options->splitBufferReallocThreshold) {
+        splitBufferReallocThreshold_(options->splitBufferReallocThreshold),
+        rowBasedChecksumEnabled_(options->rowBasedChecksumEnabled) {
     arenas_.resize(numPartitions);
   }
@@ -436,6 +437,14 @@ class VeloxHashShuffleWriter : public VeloxShuffleWriter {
   std::optional<uint32_t> partitionBufferInUse_{std::nullopt};
 
   std::vector<std::unique_ptr<facebook::velox::StreamArena>> arenas_;
+
+  // Row-based checksum state (per-partition XOR + SUM aggregation).
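+  // stop() publishes checksumXor_[pid] ^ rotl(checksumSum_[pid], 27) for each
+  // partition into ShuffleWriterMetrics::rowBasedChecksums.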
+  bool rowBasedChecksumEnabled_{false};
+  std::vector<int64_t> checksumXor_;
+  std::vector<int64_t> checksumSum_;
+  std::vector<char> checksumBuffer_;
+
+  void computeRowBasedChecksums(const facebook::velox::RowVector& rv);
 }; // class VeloxHashBasedShuffleWriter
 
 } // namespace gluten
diff --git a/cpp/velox/tests/CMakeLists.txt b/cpp/velox/tests/CMakeLists.txt
index 0c61850e12ce..3a4396d06a52 100644
--- a/cpp/velox/tests/CMakeLists.txt
+++ b/cpp/velox/tests/CMakeLists.txt
@@ -132,6 +132,7 @@ add_velox_test(spark_functions_test SOURCES SparkFunctionTest.cc
 add_velox_test(runtime_test SOURCES RuntimeTest.cc)
 add_velox_test(velox_memory_test SOURCES MemoryManagerTest.cc)
 add_velox_test(buffer_outputstream_test SOURCES BufferOutputStreamTest.cc)
+add_velox_test(row_based_checksum_test SOURCES RowBasedChecksumTest.cc)
 if(BUILD_EXAMPLES)
   add_velox_test(my_udf_test SOURCES MyUdfTest.cc)
 endif()
diff --git a/cpp/velox/tests/RowBasedChecksumTest.cc b/cpp/velox/tests/RowBasedChecksumTest.cc
new file mode 100644
index 000000000000..8f640dfdd97b
--- /dev/null
+++ b/cpp/velox/tests/RowBasedChecksumTest.cc
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <gtest/gtest.h>
+
+#include <cstring>
+
+#include "velox/common/memory/Memory.h"
+#include "velox/external/xxhash/xxhash.h"
+#include "velox/row/UnsafeRowFast.h"
+#include "velox/type/Type.h"
+#include "velox/vector/FlatVector.h"
+#include "velox/vector/tests/utils/VectorTestBase.h"
+
+using namespace facebook::velox;
+
+class RowBasedChecksumTest : public test::VectorTestBase, public testing::Test {
+ protected:
+  static void SetUpTestSuite() {
+    memory::MemoryManager::testingSetInstance({});
+  }
+
+  // Simulate the checksum computation from VeloxHashShuffleWriter.
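+  // Returns {xor ^ rotl(sum, 27), sum}: the combined order-independent value
+  // the writer would publish, plus the raw sum for debugging.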
+  std::pair<int64_t, int64_t> computeChecksums(const RowVectorPtr& rv, const std::vector<int32_t>& rowOrder) {
+    row::UnsafeRowFast fast(rv);
+    auto rowType = std::dynamic_pointer_cast<const RowType>(rv->type());
+    auto fixedSize = row::UnsafeRowFast::fixedRowSize(rowType);
+    int32_t bufSize = fixedSize.value_or(1024);
+    std::vector<char> buffer(bufSize, 0);
+
+    int64_t checksumXor = 0;
+    int64_t checksumSum = 0;
+
+    for (auto row : rowOrder) {
+      auto size = fast.rowSize(row);
+      if (size > static_cast<int32_t>(buffer.size())) {
+        buffer.resize(size);
+      }
+      std::memset(buffer.data(), 0, size);
+      fast.serialize(row, buffer.data());
+
+      auto hash = static_cast<int64_t>(XXH64(buffer.data(), size, 0));
+      checksumXor ^= hash;
+      checksumSum += hash;
+    }
+
+    int64_t rotated = (static_cast<uint64_t>(checksumSum) << 27) | (static_cast<uint64_t>(checksumSum) >> 37);
+    return {checksumXor ^ rotated, checksumSum};
+  }
+};
+
+TEST_F(RowBasedChecksumTest, orderIndependence) {
+  // Create a RowVector with 5 rows: (int, string)
+  auto rv = makeRowVector(
+      {"a", "b"},
+      {makeFlatVector<int32_t>({10, 20, 30, 40, 50}),
+       makeFlatVector<std::string>({"hello", "world", "foo", "bar", "baz"})});
+
+  // Compute checksum in original order
+  std::vector<int32_t> order1 = {0, 1, 2, 3, 4};
+  auto [checksum1, _1] = computeChecksums(rv, order1);
+
+  // Compute checksum in reversed order
+  std::vector<int32_t> order2 = {4, 3, 2, 1, 0};
+  auto [checksum2, _2] = computeChecksums(rv, order2);
+
+  // Compute checksum in shuffled order
+  std::vector<int32_t> order3 = {2, 4, 0, 3, 1};
+  auto [checksum3, _3] = computeChecksums(rv, order3);
+
+  // All should be equal (order-independent)
+  EXPECT_EQ(checksum1, checksum2);
+  EXPECT_EQ(checksum1, checksum3);
+  EXPECT_NE(checksum1, 0); // Should be non-zero
+}
+
+TEST_F(RowBasedChecksumTest, differentDataProducesDifferentChecksum) {
+  auto rv1 = makeRowVector({"a"}, {makeFlatVector<int32_t>({1, 2, 3})});
+  auto rv2 = makeRowVector({"a"}, {makeFlatVector<int32_t>({1, 2, 4})}); // last value different
+
+  std::vector<int32_t> order = {0, 1, 2};
+  auto [checksum1, _1] = computeChecksums(rv1, order);
+  auto [checksum2, _2] = computeChecksums(rv2, order);
+
+  EXPECT_NE(checksum1, checksum2);
+}
+
+TEST_F(RowBasedChecksumTest, nullHandling) {
+  auto rv1 = makeRowVector({"a"}, {makeNullableFlatVector<int32_t>({1, std::nullopt, 3})});
+  auto rv2 = makeRowVector({"a"}, {makeNullableFlatVector<int32_t>({1, 0, 3})}); // 0 vs null
+
+  std::vector<int32_t> order = {0, 1, 2};
+  auto [checksum1, _1] = computeChecksums(rv1, order);
+  auto [checksum2, _2] = computeChecksums(rv2, order);
+
+  // null and 0 should produce different checksums
+  EXPECT_NE(checksum1, checksum2);
+}
+
+TEST_F(RowBasedChecksumTest, deterministic) {
+  auto rv =
+      makeRowVector({"a", "b"}, {makeFlatVector<int32_t>({100, 200, 300}), makeFlatVector<double>({1.1, 2.2, 3.3})});
+
+  std::vector<int32_t> order = {0, 1, 2};
+  auto [checksum1, _1] = computeChecksums(rv, order);
+  auto [checksum2, _2] = computeChecksums(rv, order);
+
+  // Same input, same order -> same result (deterministic)
+  EXPECT_EQ(checksum1, checksum2);
+}
diff --git a/gluten-arrow/src/main/java/org/apache/gluten/vectorized/GlutenSplitResult.java b/gluten-arrow/src/main/java/org/apache/gluten/vectorized/GlutenSplitResult.java
index 96b2a3fc541e..a22d27da1193 100644
--- a/gluten-arrow/src/main/java/org/apache/gluten/vectorized/GlutenSplitResult.java
+++ b/gluten-arrow/src/main/java/org/apache/gluten/vectorized/GlutenSplitResult.java
@@ -25,6 +25,7 @@ public class GlutenSplitResult {
   private final long totalBytesEvicted;
   private final long[] partitionLengths;
   private final long[] rawPartitionLengths;
+  private final long[] rowBasedChecksums;
   private final long bytesToEvict;
   private final long peakBytes;
   private final long sortTime;
@@ -46,7 +47,8 @@ public GlutenSplitResult(
       double avgDictionaryFields,
       long dictionarySize,
       long[] partitionLengths,
-      long[] rawPartitionLengths) {
+      long[] rawPartitionLengths,
+      long[] rowBasedChecksums) {
     this.totalComputePidTime = totalComputePidTime;
     this.totalWriteTime = totalWriteTime;
     this.totalEvictTime = totalEvictTime;
@@ -55,6 +57,7 @@ public GlutenSplitResult(
     this.totalBytesEvicted = totalBytesEvicted;
     this.partitionLengths = partitionLengths;
     this.rawPartitionLengths = rawPartitionLengths;
+    this.rowBasedChecksums = rowBasedChecksums;
     this.bytesToEvict = totalBytesToEvict;
     this.peakBytes = peakBytes;
    this.sortTime = totalSortTime;
@@ -99,6 +102,10 @@ public long[] getRawPartitionLengths() {
     return rawPartitionLengths;
   }
 
+  public long[] getRowBasedChecksums() {
+    return rowBasedChecksums;
+  }
+
   public long getBytesToEvict() {
     return bytesToEvict;
   }
diff --git a/gluten-arrow/src/main/java/org/apache/gluten/vectorized/ShuffleWriterJniWrapper.java b/gluten-arrow/src/main/java/org/apache/gluten/vectorized/ShuffleWriterJniWrapper.java
index a389da68603e..1e32b3fce07d 100644
--- a/gluten-arrow/src/main/java/org/apache/gluten/vectorized/ShuffleWriterJniWrapper.java
+++ b/gluten-arrow/src/main/java/org/apache/gluten/vectorized/ShuffleWriterJniWrapper.java
@@ -43,7 +43,8 @@ public native long createHashShuffleWriter(
       int startPartitionId,
       int splitBufferSize,
       double splitBufferReallocThreshold,
-      long partitionWriterHandle);
+      long partitionWriterHandle,
+      boolean rowBasedChecksumEnabled);
 
   public native long createSortShuffleWriter(
       int numPartitions,
diff --git a/gluten-ut/spark41/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala b/gluten-ut/spark41/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
index 71b12e340e25..6751a67a9f91 100644
--- a/gluten-ut/spark41/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
+++ b/gluten-ut/spark41/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
@@ -1027,7 +1027,7 @@ class VeloxTestSettings extends BackendTestSettings {
     .exclude("SPARK-49386: test SortMergeJoin (with spill by size threshold)")
   enableSuite[GlutenMathFunctionsSuite]
   // TODO: fix on Spark-4.1 see https://github.com/apache/spark/pull/50230
-  // enableSuite[GlutenMapStatusEndToEndSuite]
+  enableSuite[GlutenMapStatusEndToEndSuite]
   enableSuite[GlutenMetadataCacheSuite]
     .exclude("SPARK-16336,SPARK-27961 Suggest fixing FileNotFoundException")
   enableSuite[GlutenMiscFunctionsSuite]
diff --git a/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/GlutenMapStatusEndToEndSuite.scala b/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/GlutenMapStatusEndToEndSuite.scala
index 309ec5d227fc..4eeeb0211699 100644
--- a/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/GlutenMapStatusEndToEndSuite.scala
+++ b/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/GlutenMapStatusEndToEndSuite.scala
@@ -16,6 +16,7 @@
  */
 package org.apache.spark.sql
 
+import org.apache.spark.sql.functions.col
 import org.apache.spark.sql.internal.SQLConf
 
 class GlutenMapStatusEndToEndSuite extends MapStatusEndToEndSuite with GlutenTestsTrait {
@@ -28,5 +29,58 @@ class GlutenMapStatusEndToEndSuite extends MapStatusEndToEndSuite with GlutenTes
     _spark.sparkContext.conf
       .set(SQLConf.CLASSIC_SHUFFLE_DEPENDENCY_FILE_CLEANUP_ENABLED.key, "false")
     _spark.conf.set(SQLConf.CLASSIC_SHUFFLE_DEPENDENCY_FILE_CLEANUP_ENABLED.key, "false")
+
+    // Disable ANSI fallback to force Gluten's ColumnarShuffleWriter path.
+    _spark.conf.set("spark.gluten.sql.ansiFallback.enabled", "false")
   }
+
+  import org.apache.spark.MapOutputTrackerMaster
+
+  private def getLatestShuffleChecksumValues(): Array[Long] = {
+    val tracker = _spark.sparkContext.env.mapOutputTracker
+      .asInstanceOf[MapOutputTrackerMaster]
+    val latestShuffleId = tracker.shuffleStatuses.keys.max
+    tracker.shuffleStatuses(latestShuffleId).mapStatuses.map(_.checksumValue)
+  }
+
+  test("Gluten row-based checksum is deterministic") {
+    withSQLConf(
+      SQLConf.SHUFFLE_ORDER_INDEPENDENT_CHECKSUM_ENABLED.key -> "true",
+      SQLConf.CLASSIC_SHUFFLE_DEPENDENCY_FILE_CLEANUP_ENABLED.key -> "false") {
+      withTable("t_det1", "t_det2") {
+        _spark.range(500).repartition(5, col("id")).write.mode("overwrite").saveAsTable("t_det1")
+        val checksums1 = getLatestShuffleChecksumValues()
+
+        _spark.range(500).repartition(5, col("id")).write.mode("overwrite").saveAsTable("t_det2")
+        val checksums2 = getLatestShuffleChecksumValues()
+
+        // Same input -> same checksumValue (deterministic)
+        assert(
+          checksums1.zip(checksums2).forall { case (a, b) => a == b },
+          s"Checksums not deterministic: ${checksums1.toSeq} vs ${checksums2.toSeq}")
+      }
+    }
+  }
+
+  test("Gluten row-based checksum detects data change") {
+    withSQLConf(
+      SQLConf.SHUFFLE_ORDER_INDEPENDENT_CHECKSUM_ENABLED.key -> "true",
+      SQLConf.CLASSIC_SHUFFLE_DEPENDENCY_FILE_CLEANUP_ENABLED.key -> "false") {
+      withTable("t_diff1", "t_diff2") {
+        _spark.range(500).repartition(5, col("id")).write.mode("overwrite").saveAsTable("t_diff1")
+        val checksums1 = getLatestShuffleChecksumValues()
+
+        // Different data
+        _spark
+          .range(500, 1000)
+          .repartition(5, col("id"))
+          .write
+          .mode("overwrite")
+          .saveAsTable("t_diff2")
+        val checksums2 = getLatestShuffleChecksumValues()
+
+        // Different input -> different checksumValue
+        assert(
+          checksums1.zip(checksums2).exists { case (a, b) => a != b },
+          s"Checksums should differ for different data: ${checksums1.toSeq} vs ${checksums2.toSeq}")
+      }
+    }
+  }
 }
diff --git a/shims/spark33/src/main/scala/org/apache/spark/shuffle/GlutenMapStatusUtil.scala b/shims/spark33/src/main/scala/org/apache/spark/shuffle/GlutenMapStatusUtil.scala
new file mode 100644
index 000000000000..9b5000946a22
--- /dev/null
+++ b/shims/spark33/src/main/scala/org/apache/spark/shuffle/GlutenMapStatusUtil.scala
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ +package org.apache.spark.shuffle + +import org.apache.spark.scheduler.MapStatus +import org.apache.spark.storage.BlockManagerId + +object GlutenMapStatusUtil { + def isRowBasedChecksumEnabled: Boolean = false + + def createMapStatus( + loc: BlockManagerId, + uncompressedSizes: Array[Long], + mapTaskId: Long, + checksumValue: Long): MapStatus = { + MapStatus(loc, uncompressedSizes, mapTaskId) + } +} diff --git a/shims/spark34/src/main/scala/org/apache/spark/shuffle/GlutenMapStatusUtil.scala b/shims/spark34/src/main/scala/org/apache/spark/shuffle/GlutenMapStatusUtil.scala new file mode 100644 index 000000000000..9b5000946a22 --- /dev/null +++ b/shims/spark34/src/main/scala/org/apache/spark/shuffle/GlutenMapStatusUtil.scala @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.shuffle + +import org.apache.spark.scheduler.MapStatus +import org.apache.spark.storage.BlockManagerId + +object GlutenMapStatusUtil { + def isRowBasedChecksumEnabled: Boolean = false + + def createMapStatus( + loc: BlockManagerId, + uncompressedSizes: Array[Long], + mapTaskId: Long, + checksumValue: Long): MapStatus = { + MapStatus(loc, uncompressedSizes, mapTaskId) + } +} diff --git a/shims/spark35/src/main/scala/org/apache/spark/shuffle/GlutenMapStatusUtil.scala b/shims/spark35/src/main/scala/org/apache/spark/shuffle/GlutenMapStatusUtil.scala new file mode 100644 index 000000000000..9b5000946a22 --- /dev/null +++ b/shims/spark35/src/main/scala/org/apache/spark/shuffle/GlutenMapStatusUtil.scala @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.shuffle + +import org.apache.spark.scheduler.MapStatus +import org.apache.spark.storage.BlockManagerId + +object GlutenMapStatusUtil { + def isRowBasedChecksumEnabled: Boolean = false + + def createMapStatus( + loc: BlockManagerId, + uncompressedSizes: Array[Long], + mapTaskId: Long, + checksumValue: Long): MapStatus = { + MapStatus(loc, uncompressedSizes, mapTaskId) + } +} diff --git a/shims/spark40/src/main/scala/org/apache/spark/shuffle/GlutenMapStatusUtil.scala b/shims/spark40/src/main/scala/org/apache/spark/shuffle/GlutenMapStatusUtil.scala new file mode 100644 index 000000000000..9b5000946a22 --- /dev/null +++ b/shims/spark40/src/main/scala/org/apache/spark/shuffle/GlutenMapStatusUtil.scala @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.shuffle + +import org.apache.spark.scheduler.MapStatus +import org.apache.spark.storage.BlockManagerId + +object GlutenMapStatusUtil { + def isRowBasedChecksumEnabled: Boolean = false + + def createMapStatus( + loc: BlockManagerId, + uncompressedSizes: Array[Long], + mapTaskId: Long, + checksumValue: Long): MapStatus = { + MapStatus(loc, uncompressedSizes, mapTaskId) + } +} diff --git a/shims/spark41/src/main/scala/org/apache/spark/shuffle/GlutenMapStatusUtil.scala b/shims/spark41/src/main/scala/org/apache/spark/shuffle/GlutenMapStatusUtil.scala new file mode 100644 index 000000000000..bf8108c4b063 --- /dev/null +++ b/shims/spark41/src/main/scala/org/apache/spark/shuffle/GlutenMapStatusUtil.scala @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.shuffle + +import org.apache.spark.scheduler.MapStatus +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.storage.BlockManagerId + +object GlutenMapStatusUtil { + def isRowBasedChecksumEnabled: Boolean = { + val sqlConf = SQLConf.get + sqlConf.shuffleOrderIndependentChecksumEnabled || + sqlConf.shuffleChecksumMismatchFullRetryEnabled + } + + def createMapStatus( + loc: BlockManagerId, + uncompressedSizes: Array[Long], + mapTaskId: Long, + checksumValue: Long): MapStatus = { + MapStatus(loc, uncompressedSizes, mapTaskId, checksumValue) + } +}
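Reviewer note: the order independence above follows from XOR and addition both being commutative, while the rotated sum is kept alongside the XOR because XOR alone lets duplicate rows cancel out. A minimal standalone sketch of the combination scheme, with std::hash standing in for XXH64 and all names illustrative rather than taken from the patch:

#include <cassert>
#include <cstdint>
#include <functional>
#include <string>
#include <vector>

// Combine per-row hashes into an order-independent checksum: XOR commutes,
// and the rotated SUM breaks the symmetry that would let two identical rows
// cancel to zero under XOR alone.
int64_t combineRowHashes(const std::vector<std::string>& rows) {
  uint64_t xorAcc = 0;
  uint64_t sumAcc = 0;
  for (const auto& row : rows) {
    uint64_t h = std::hash<std::string>{}(row); // stand-in for XXH64(data, size, 0)
    xorAcc ^= h;
    sumAcc += h;
  }
  uint64_t rotated = (sumAcc << 27) | (sumAcc >> 37); // rotl(sum, 27)
  return static_cast<int64_t>(xorAcc ^ rotated);
}

int main() {
  std::vector<std::string> a = {"row1", "row2", "row3"};
  std::vector<std::string> b = {"row3", "row1", "row2"}; // same rows, new order
  assert(combineRowHashes(a) == combineRowHashes(b));    // order-independent
  std::vector<std::string> c = {"row1", "row2", "rowX"};
  assert(combineRowHashes(a) != combineRowHashes(c));    // content-sensitive (w.h.p.)
  return 0;
}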