450879a
[GLUTEN-XXXXX][VL] PA-1 RED: ColumnarCachedBatchKryoSuite for stats f…
yaooqinn May 12, 2026
b5754c0
[GLUTEN-XXXXX][VL] PA-1 GREEN: CachedColumnarBatch stats field + Kryo…
yaooqinn May 12, 2026
7929b09
[GLUTEN-XXXXX][VL] PA-2.1 RED: VeloxColumnarBatchSerializer::computeS…
yaooqinn May 12, 2026
6924c67
[GLUTEN-XXXXX][VL] PA-2.1 GREEN: ColumnStats + computeStats(BIGINT Fl…
yaooqinn May 12, 2026
8cc05bf
[GLUTEN-XXXXX][VL] PA-2.2 RED: NaN Float partition must not participa…
yaooqinn May 12, 2026
e5d0993
[GLUTEN-XXXXX][VL] PA-2.2 GREEN: REAL FlatVector scan with NaN poison…
yaooqinn May 12, 2026
5466854
[GLUTEN-XXXXX][VL] PA-2.4 RED: HUGEINT (long Decimal P>=19) FlatVecto…
yaooqinn May 12, 2026
aeddf5e
[GLUTEN-XXXXX][VL] PA-2.4 GREEN: HUGEINT (long Decimal P>=19) FlatVec…
yaooqinn May 12, 2026
5649593
[GLUTEN-XXXXX][VL] PA-2.5a RED: framedSerializeWithStats top-level la…
yaooqinn May 12, 2026
d520cd4
[GLUTEN-XXXXX][VL] PA-2.5a GREEN: framedSerializeWithStats top-level …
yaooqinn May 12, 2026
05e84d7
[GLUTEN-XXXXX][VL] PA-2.5b RED: statsBlob layout (BIGINT 1-col)
yaooqinn May 12, 2026
172785a
[GLUTEN-XXXXX][VL] PA-2.5b GREEN: statsBlob BIGINT marshal + PA-2.5a …
yaooqinn May 12, 2026
593eff8
[GLUTEN-XXXXX][VL] PA-2.5c: JNI bridge serializeWithStats (non-TDD pl…
yaooqinn May 12, 2026
d820720
[GLUTEN-XXXXX][VL] PA-2.6: capability check supportsStatsExt (non-TDD…
yaooqinn May 12, 2026
41d5b0e
[GLUTEN-XXXXX][VL] PA-3.0 doc-only: fix scaladoc 'graceful pass-throu…
yaooqinn May 12, 2026
4ad8d17
[GLUTEN-XXXXX][VL] PA-3.1 RED: buildFilter stats=null NPE under Equal…
yaooqinn May 12, 2026
407ef5b
[GLUTEN-XXXXX][VL] PA-3.1 GREEN: lazy-split iterator wrapper for buil…
yaooqinn May 12, 2026
50a8358
[GLUTEN-XXXXX][VL] PA-3.2 RED: statsBlob binary framing (BIGINT 1-col)
yaooqinn May 12, 2026
689743b
[GLUTEN-XXXXX][VL] PA-3.2 GREEN: replace Java Ser placeholder with cp…
yaooqinn May 12, 2026
e67cfdd
[GLUTEN-XXXXX][VL] PA-3.3 RED: framed bytes parser test (BIGINT 1-col)
yaooqinn May 12, 2026
4dbc711
[GLUTEN-XXXXX][VL] PA-3.3 GREEN: parseFramedBytes + write path JNI wi…
yaooqinn May 12, 2026
5b8d63e
[GLUTEN-XXXXX][VL] PA-3.4: lock down lazy-split wrapper prune semanti…
yaooqinn May 12, 2026
824dde9
PA-3.5: add ColumnarCachedBatchE2ESuite (e2e smoke)
yaooqinn May 12, 2026
c17fa3d
PA-4: SQLConf gate spark.gluten.sql.columnar.tableCache.partitionStat…
yaooqinn May 12, 2026
cbd0755
PA-3.5: fix numOutputRows lower bound (0 is legal full-prune)
yaooqinn May 12, 2026
37267ff
test(cache-stats): PA-5 4 ignore as PA-6..PA-10 acceptance criteria
yaooqinn May 13, 2026
acc582e
feat(cache-stats): PA-6.0 plumbing schema field + Kryo wire + dispatc…
yaooqinn May 13, 2026
d515013
feat(cache-stats): PA-6.A INT round-trip 4B LE dispatch by schema
yaooqinn May 13, 2026
1fd6f10
feat(cache-stats): PA-6.B SMALLINT round-trip 2B LE dispatch
yaooqinn May 13, 2026
9544d5e
feat(cache-stats): PA-6.C TINYINT round-trip 1B dispatch
yaooqinn May 13, 2026
88675fc
feat(cache-stats): PA-6.F YearMonth/DayTime interval shares INT/LONG …
yaooqinn May 13, 2026
85fd29d
feat(cache-stats): PA-6.G Date / Timestamp / TimestampNTZ dispatch (s…
yaooqinn May 13, 2026
7eb0eb6
[gluten-cache-stats][PA-6.2] cpp computeStats + framedSerializeWithSt…
yaooqinn May 13, 2026
e692ac7
[gluten-cache-stats][PA-6.2.D] Date e2e prune correctness via INTEGER…
yaooqinn May 13, 2026
4018647
[gluten-cache-stats][PA-6.5] hotfix Layer-2 mid-review BLOCKERS
yaooqinn May 13, 2026
155373a
[gluten-cache-stats][PA-7] short-Decimal (precision <= 18) marshal vi…
yaooqinn May 13, 2026
9781b6f
[gluten-cache-stats][PA-8] long-Decimal (precision > 18) marshal via …
yaooqinn May 13, 2026
cd98d9a
[gluten-cache-stats][PA-8.5] hotfix Layer-2 #2 SB1: count semantics i…
yaooqinn May 13, 2026
b810dfa
[gluten-cache-stats][PA-10] Float (4B) + Double (8B) + Boolean (1B) m…
yaooqinn May 13, 2026
9022427
PA-9: JVM-side String marshal (256B truncate + carry-overflow demote)
yaooqinn May 13, 2026
9c9a61e
PA-6.2.E: cpp TIMESTAMP scan + emit (toMicros -> 8B LE int64)
yaooqinn May 13, 2026
ccac88f
PA-6.2.E e2e Timestamp prune sentinel
yaooqinn May 13, 2026
1aaf839
PA-9 cpp: VARCHAR scan + emit + e2e prune sentinel
yaooqinn May 13, 2026
660cf15
CB-1: TIMESTAMP nanos conservative widen (floor lo, ceil hi)
yaooqinn May 13, 2026
0d90339
CB-2: cpp VARCHAR truncate to 256B at source (single source of truth)
yaooqinn May 13, 2026
da97b0b
S-1a: import + drop body FQDNs in ColumnarCachedBatchSerializer.scala
yaooqinn May 13, 2026
1dcbdb7
S-1b-1: tighten top + bottom scaladoc
yaooqinn May 13, 2026
67d1bb2
S-1b-2: tighten Kryo write/read inline comments + extract readOptiona…
yaooqinn May 13, 2026
1f53686
S-1b-3: tighten object body comments + collapse multi-line case arms
yaooqinn May 13, 2026
417e3cf
S-1b-4: tighten buildFilter wrapper + extract statsOf helper
yaooqinn May 13, 2026
66fad51
S-2: tighten cpp VeloxColumnarBatchSerializer comments
yaooqinn May 13, 2026
cb50268
S-3: tighten cpp test PA-x.y RED narratives -> 1-2 line contracts
yaooqinn May 13, 2026
80cc7dc
S-4-1: tighten test suite scaladocs + per-test PA-x.y RED prologues
yaooqinn May 13, 2026
e67a264
S-4-2: drop body FQDNs in test suites, import short names
yaooqinn May 13, 2026
a9948f8
S-5: tighten cpp/Java edge file PA-x.y comments
yaooqinn May 13, 2026
2c1b18c
Strip internal codenames from code body (OSS hygiene)
yaooqinn May 13, 2026
3d45e50
Rename PA9_STRING_TRUNCATE_LEN -> STRING_BOUND_TRUNCATE_LEN
yaooqinn May 13, 2026
cb745d3
Drop redundant pre-test() comments in 8 test suites
yaooqinn May 13, 2026
efa489c
Drop ColumnarCachePartitionStatsConfSuite
yaooqinn May 13, 2026
f3d0b09
Drop Kryo wire versioning (V1/V2/V3 magic prefix)
yaooqinn May 13, 2026
a2d56ee
Float/Double boundary contract: +/-Inf, +/-0, subnormal not poisoned
yaooqinn May 13, 2026
71cd336
Fail-fast on unknown-arm upperSkipLen wire bound
yaooqinn May 13, 2026
7379769
Final pre-PR fixes: lowerLen bound, StringType subtype-match, numCols…
yaooqinn May 13, 2026
6ad01d0
Add ColumnarTableCachePartitionStatsBenchmark + results
yaooqinn May 14, 2026
99b37d0
Apply clang-format-15 to cpp sources
yaooqinn May 14, 2026
2fccbd5
Regenerate docs/Configuration.md via dev/gen-all-config-docs.sh
yaooqinn May 14, 2026
9145670
[VL] Replace reflection-based capability check with callsite Unsatisf…
yaooqinn May 15, 2026
94d7207
[VL] Reject oversized serialized batch payload above u32 framing limit
yaooqinn May 15, 2026
491070b
[VL] Bound CachedColumnarBatch Kryo read length fields and tolerate V…
yaooqinn May 15, 2026
1bf1524
[VL] Cpp stats correctness: bool/NaN/non-flat null counting
yaooqinn May 15, 2026
3f9c923
[VL] Hoist per-iterator StructType + add disabled-config E2E test
yaooqinn May 15, 2026
61d1734
[VL] Trim review-narrative residue from new comments (no behavior cha…
yaooqinn May 15, 2026
@@ -0,0 +1,131 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.execution

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
import org.apache.spark.sql.types.{Decimal, DecimalType, StringType, StructField, StructType}
import org.apache.spark.unsafe.types.UTF8String

import org.scalatest.funsuite.AnyFunSuite

import java.util.Arrays

/**
 * Ship-blocker acceptance tests for the non-BIGINT marshal paths (Decimal short/long, String,
 * Float/Double NaN guard). Guards against silent corruption (wrong Decimal scale, wrong UTF-8
 * byte order, lost BigInteger sign) on round-trip.
 */
class ColumnarCacheShipBlockerMarshalSuite extends AnyFunSuite {

  test("Decimal(10, 2) round-trip preserves value") {
    val lo = Decimal(BigDecimal("1.50"), 10, 2)
    val hi = Decimal(BigDecimal("99.99"), 10, 2)
    val stats: InternalRow = new GenericInternalRow(Array[Any](lo, hi, 0, 100, 800L))
    val schema = StructType(Seq(StructField("d", DecimalType(10, 2))))
    val blob = CachedColumnarBatchKryoSerializer.serializeStats(stats, schema)
    val read = CachedColumnarBatchKryoSerializer.deserializeStats(blob, schema)
    val dt = DecimalType(10, 2)
    val readLo = read.get(0, dt)
    val readHi = read.get(1, dt)
    assert(readLo == lo, s"lower bound corrupted: expected $lo got $readLo")
    assert(readHi == hi, s"upper bound corrupted: expected $hi got $readHi")
  }

  test("Decimal(30, 5) round-trip preserves big value") {
    val big = BigDecimal("12345678901234567890.12345")
    val lo = Decimal(big.bigDecimal.negate(), 30, 5)
    val hi = Decimal(big, 30, 5)
    val stats: InternalRow = new GenericInternalRow(Array[Any](lo, hi, 0, 100, 1600L))
    val schema = StructType(Seq(StructField("d", DecimalType(30, 5))))
    val blob = CachedColumnarBatchKryoSerializer.serializeStats(stats, schema)
    val read = CachedColumnarBatchKryoSerializer.deserializeStats(blob, schema)
    val dt = DecimalType(30, 5)
    val readLo = read.get(0, dt)
    val readHi = read.get(1, dt)
    assert(readLo == lo, s"lower bound corrupted: expected $lo got $readLo")
    assert(readHi == hi, s"upper bound corrupted: expected $hi got $readHi")
  }

  test("String byte-wise lex round-trip preserves UTF-8 bytes") {
    val lo = UTF8String.fromString("apple")
    // UTF-8 bytes for two CJK code points (U+4E2D U+6587) constructed from hex
    // to keep this file ASCII-only (scalastyle nonascii filter).
    val hi = UTF8String.fromBytes(
      Array[Byte](0xe4.toByte, 0xb8.toByte, 0xad.toByte, 0xe6.toByte, 0x96.toByte, 0x87.toByte))
    val stats: InternalRow = new GenericInternalRow(Array[Any](lo, hi, 0, 100, 1024L))
    val schema = StructType(Seq(StructField("s", StringType)))
    val blob = CachedColumnarBatchKryoSerializer.serializeStats(stats, schema)
    val read = CachedColumnarBatchKryoSerializer.deserializeStats(blob, schema)
    val readLo = read.getUTF8String(0)
    val readHi = read.getUTF8String(1)
    assert(readLo == lo, s"lower bound corrupted: expected $lo got $readLo")
    assert(readHi == hi, s"upper bound corrupted: expected $hi got $readHi")
  }

  test("String truncation to 256B widens upper bound monotonically") {
    val loBytes = new Array[Byte](100)
    Arrays.fill(loBytes, 'a'.toByte)
    // Upper: 300 bytes of 'm'. The truncated prefix is 256 'm's (all 0x6d);
    // the +1 carry on the last byte makes it 0x6e ('n').
    val hiBytes = new Array[Byte](300)
    Arrays.fill(hiBytes, 'm'.toByte)
    val lo = UTF8String.fromBytes(loBytes)
    val hi = UTF8String.fromBytes(hiBytes)
    val stats: InternalRow = new GenericInternalRow(Array[Any](lo, hi, 0, 100, 50000L))
    val schema = StructType(Seq(StructField("s", StringType)))
    val blob = CachedColumnarBatchKryoSerializer.serializeStats(stats, schema)
    val read = CachedColumnarBatchKryoSerializer.deserializeStats(blob, schema)
    val readLo = read.getUTF8String(0)
    val readHi = read.getUTF8String(1)
    assert(readLo.numBytes() == 100, s"lower untruncated, got numBytes=${readLo.numBytes()}")
    assert(readHi.numBytes() == 256, s"upper should truncate to 256B, got ${readHi.numBytes()}")
    val readHiArr = readHi.getBytes
    // First 255 bytes still 'm'; last byte 'm' + 1 = 'n' (0x6e) due to carry.
    var i = 0
    while (i < 255) {
      assert(readHiArr(i) == 'm'.toByte, s"upper byte $i = ${readHiArr(i)}, expected 'm'")
      i += 1
    }
    assert(
      readHiArr(255) == 'n'.toByte,
      s"upper byte 255 should be 'n' (carry), got ${readHiArr(255)}")
  }

  test("String carry overflow demotes column to unsupported") {
    val loBytes = new Array[Byte](10)
    Arrays.fill(loBytes, 0xff.toByte)
    val hiBytes = new Array[Byte](300)
    Arrays.fill(hiBytes, 0xff.toByte)
    val lo = UTF8String.fromBytes(loBytes)
    val hi = UTF8String.fromBytes(hiBytes)
    val stats: InternalRow = new GenericInternalRow(Array[Any](lo, hi, 0, 100, 50000L))
    val schema = StructType(Seq(StructField("s", StringType)))
    val blob = CachedColumnarBatchKryoSerializer.serializeStats(stats, schema)
    val read = CachedColumnarBatchKryoSerializer.deserializeStats(blob, schema)
    assert(read.isNullAt(0), "lower bound must be null when carry overflows")
    assert(read.isNullAt(1), "upper bound must be null when carry overflows")
  }
}
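The two truncation tests above pin down a carry-on-truncate contract: a 300-byte upper bound of `'m'` must come back as 255 `'m'`s plus `'n'`, and an all-`0xFF` bound must demote the column. A minimal standalone sketch of that contract (the object and method names here are illustrative, not the PR's actual API; the real code may also handle the dropped-suffix differently):

```scala
// Hypothetical sketch of 256-byte upper-bound truncation with carry. Truncating an
// upper bound is only sound if the truncated value still sorts >= every original
// value, hence the +1 carry on the kept prefix (byte-wise, as unsigned big-endian).
object UpperBoundTruncate {
  val MaxLen = 256

  /** Returns a byte-wise upper bound of `bytes` at most `maxLen` long, or None when
   *  every prefix byte is 0xFF and the carry overflows (column must be demoted). */
  def truncateUpper(bytes: Array[Byte], maxLen: Int = MaxLen): Option[Array[Byte]] = {
    if (bytes.length <= maxLen) return Some(bytes)
    val prefix = java.util.Arrays.copyOf(bytes, maxLen)
    var i = maxLen - 1
    while (i >= 0) {
      if (prefix(i) != 0xff.toByte) {
        // Increment this byte; any trailing 0xFF bytes can be dropped, since the
        // shorter incremented prefix already exceeds the original prefix.
        prefix(i) = (prefix(i) + 1).toByte
        return Some(java.util.Arrays.copyOf(prefix, i + 1))
      }
      i -= 1
    }
    None // all 0xFF: no finite byte-wise upper bound at this length
  }
}
```

Note the comparison is purely byte-wise, matching the suite's "byte-wise lex" framing: the truncated bound may not be valid UTF-8, which is acceptable for a pruning bound.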
@@ -0,0 +1,95 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.execution

import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualTo, GenericInternalRow, Literal}
import org.apache.spark.sql.columnar.CachedBatch
import org.apache.spark.sql.types.LongType

import org.scalatest.funsuite.AnyFunSuite

/**
 * Tests for the lazy-split wrapper's stats != null branch: batches whose [lowerBound, upperBound]
 * does not cover the literal must be pruned; batches whose range covers it must be returned.
 *
 * Pure JVM. Uses GenericInternalRow with the vanilla 5-slot-per-col schema (lower, upper,
 * nullCount, count, sizeInBytes).
 */
class ColumnarCachedBatchBuildFilterPruneSuite extends AnyFunSuite {

  // Build a CachedColumnarBatch with BIGINT 1-col stats [lower, upper].
  private def batchWithStats(numRows: Int, lower: Long, upper: Long): CachedColumnarBatch = {
    val stats = new GenericInternalRow(Array[Any](lower, upper, 0, numRows, numRows * 8L))
    CachedColumnarBatch(
      numRows = numRows,
      sizeInBytes = numRows * 8L,
      bytes = Array.fill[Byte](numRows * 4)(0),
      stats = stats)
  }

  test("EqualTo literal in [lower, upper] keeps the batch") {
    val serializer = new ColumnarCachedBatchSerializer
    val attr = AttributeReference("id", LongType, nullable = false)()
    val predicate = EqualTo(attr, Literal(50L))
    val filter = serializer.buildFilter(Seq(predicate), Seq(attr))

    val batches: Iterator[CachedBatch] = Iterator(batchWithStats(10, 0L, 100L))
    val result = filter(0, batches).toList

    assert(result.length === 1, "batch with [0, 100] covers literal 50, must be kept")
    assert(result.head.numRows === 10)
  }

  test("EqualTo literal outside [lower, upper] prunes the batch") {
    val serializer = new ColumnarCachedBatchSerializer
    val attr = AttributeReference("id", LongType, nullable = false)()
    val predicate = EqualTo(attr, Literal(999L))
    val filter = serializer.buildFilter(Seq(predicate), Seq(attr))

    val batches: Iterator[CachedBatch] = Iterator(batchWithStats(10, 0L, 100L))
    val result = filter(0, batches).toList

    assert(result.length === 0, "batch with [0, 100] cannot contain 999, must be pruned")
  }

  test("mixed null/non-null stats: null through, non-null pruned by predicate") {
    val serializer = new ColumnarCachedBatchSerializer
    val attr = AttributeReference("id", LongType, nullable = false)()
    val predicate = EqualTo(attr, Literal(50L))
    val filter = serializer.buildFilter(Seq(predicate), Seq(attr))

    val nullStats = CachedColumnarBatch(
      numRows = 7,
      sizeInBytes = 28L,
      bytes = Array[Byte](9, 9),
      stats = null)
    val keptBatch = batchWithStats(10, 0L, 100L) // covers 50, kept
    val prunedBatch = batchWithStats(5, 200L, 300L) // does not cover 50, pruned

    val batches: Iterator[CachedBatch] = Iterator(nullStats, prunedBatch, keptBatch)
    val result = filter(0, batches).toList

    // Expected order: nullStats first (direct), then keptBatch.
    // prunedBatch dropped by parent.
    assert(result.length === 2, s"expected 2 (nullStats + keptBatch), got ${result.length}")
    assert(
      result.map(_.numRows) === Seq(7, 10),
      "order: stats=null pass-through first, then stats-covers-literal kept")
  }
}
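The pruning decision this suite exercises reduces to a range-containment check. A minimal sketch under illustrative names (not the PR's API), including the rule the mixed-stats test relies on, that absent stats must never prune:

```scala
// Sketch of the min/max prune decision for EqualTo(col, lit): a batch may be
// skipped exactly when lit falls outside the batch's [lower, upper] range.
object RangePrune {
  final case class ColumnRange(lower: Long, upper: Long)

  // EqualTo can only match rows whose value lies inside the recorded range.
  def mayContainEqualTo(r: ColumnRange, lit: Long): Boolean =
    r.lower <= lit && lit <= r.upper

  // No stats (None) means no evidence either way: the batch must be kept.
  def keepBatch(range: Option[ColumnRange], lit: Long): Boolean =
    range.forall(mayContainEqualTo(_, lit))
}
```

This mirrors the three test cases: `[0, 100]` keeps literal 50, prunes literal 999, and a missing range always keeps.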
@@ -0,0 +1,77 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.execution

import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualTo, Literal}
import org.apache.spark.sql.columnar.CachedBatch
import org.apache.spark.sql.types.IntegerType

import org.scalatest.funsuite.AnyFunSuite

/**
 * Tests for the buildFilter stats=null guard (lazy-split iterator wrapper). Without the wrapper,
 * vanilla `SimpleMetricsCachedBatchSerializer.buildFilter` throws an NPE on
 * `partitionFilter.eval(null)` for non-trivial predicates, on both the codegen and interpreted
 * paths. A naive placeholder row of null literals would fail differently: it silently drops
 * every batch.
 *
 * Pure JVM, no native lib. Drives buildFilter on a synthetic Iterator[CachedBatch] of
 * CachedColumnarBatch with stats=null under a non-trivial predicate.
 */
class ColumnarCachedBatchBuildFilterSuite extends AnyFunSuite {

  test("buildFilter must pass stats=null batches through under EqualTo predicate") {
    val serializer = new ColumnarCachedBatchSerializer
    val attr = AttributeReference("id", IntegerType, nullable = false)()
    val predicate = EqualTo(attr, Literal(5))
    val cachedAttributes = Seq(attr)

    val filter = serializer.buildFilter(Seq(predicate), cachedAttributes)

    // Three v1 binary batches with stats=null. A correct wrapper passes them
    // all through: there are no stats to prune on.
    val batches: Iterator[CachedBatch] = Iterator(
      CachedColumnarBatch(
        numRows = 3,
        sizeInBytes = 12L,
        bytes = Array[Byte](1, 2, 3),
        stats = null),
      CachedColumnarBatch(
        numRows = 5,
        sizeInBytes = 20L,
        bytes = Array[Byte](4, 5),
        stats = null),
      CachedColumnarBatch(
        numRows = 7,
        sizeInBytes = 28L,
        bytes = Array[Byte](6, 7, 8),
        stats = null)
    )

    // GREEN expectation: all 3 batches returned (stats=null -> pass through).
    // Without the wrapper: NPE at partitionFilter.eval(null), or a silent
    // drop if a naive placeholder row were used.
    val result = filter(0, batches).toList

    assert(
      result.length === 3,
      "v1 binary (stats=null) batches must all be returned (lazy-split wrapper); " +
        s"got ${result.length}")
    assert(
      result.map(_.numRows) === Seq(3, 5, 7),
      "batch order must be preserved through wrapper")
  }
}
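Semantically, the wrapper behavior the two buildFilter suites pin down is: stats=null batches are routed around the parent min/max filter, stats-carrying batches go through it, order is preserved, and nothing is materialized eagerly. A hedged sketch of just those semantics, using illustrative names rather than the PR's code (the real wrapper delegates to the vanilla serializer's filter rather than taking a predicate function):

```scala
// Sketch of the lazy-split pass-through semantics. Iterator.flatMap keeps the
// pipeline lazy: no batch is inspected before the consumer pulls it.
object LazySplit {
  final case class Batch(numRows: Int, stats: AnyRef)

  def lazySplitFilter(parentKeeps: Batch => Boolean)(
      batches: Iterator[Batch]): Iterator[Batch] =
    batches.flatMap { b =>
      if (b.stats == null) Iterator.single(b) // no stats: must pass through, never eval(null)
      else if (parentKeeps(b)) Iterator.single(b) // stats cover the literal: keep
      else Iterator.empty // stats exclude the literal: prune
    }
}
```

Routing null-stats batches before the parent sees them is what avoids both failure modes the scaladoc names: the `eval(null)` NPE and the silent full drop of a placeholder-row approach.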