diff --git a/velox/dwio/dwrf/reader/SelectiveDecimalColumnReader.cpp b/velox/dwio/dwrf/reader/SelectiveDecimalColumnReader.cpp index 44cd02fb380..09cdd6cd87e 100644 --- a/velox/dwio/dwrf/reader/SelectiveDecimalColumnReader.cpp +++ b/velox/dwio/dwrf/reader/SelectiveDecimalColumnReader.cpp @@ -20,10 +20,19 @@ namespace facebook::velox::dwrf { template SelectiveDecimalColumnReader::SelectiveDecimalColumnReader( + const TypePtr& requestedType, const std::shared_ptr& fileType, DwrfParams& params, common::ScanSpec& scanSpec) - : SelectiveColumnReader(fileType->type(), fileType, params, scanSpec) { + // Read using requestedType so that values are materialized at the + // table-schema scale rather than the file-footer scale. See the header + // comment for the Hive ORC DECIMAL(38, 18) footer behavior this works + // around. + : SelectiveColumnReader(requestedType, fileType, params, scanSpec) { + VELOX_CHECK( + requestedType_->isDecimal(), + "SelectiveDecimalColumnReader requires a decimal requestedType, got {}", + requestedType_->toString()); EncodingKey encodingKey{fileType_->id(), params.flatMapContext().sequence}; auto& stripe = params.stripeStreams(); if constexpr (std::is_same_v) { diff --git a/velox/dwio/dwrf/reader/SelectiveDecimalColumnReader.h b/velox/dwio/dwrf/reader/SelectiveDecimalColumnReader.h index 338d8ac4756..a770fd2d9cd 100644 --- a/velox/dwio/dwrf/reader/SelectiveDecimalColumnReader.h +++ b/velox/dwio/dwrf/reader/SelectiveDecimalColumnReader.h @@ -28,7 +28,16 @@ using namespace dwio::common; template class SelectiveDecimalColumnReader : public SelectiveColumnReader { public: + // requestedType is the DECIMAL type to materialize values as. It must be a + // decimal type. Hive's ORC writer always records DECIMAL(38, 18) in the + // file footer regardless of the metastore-declared precision/scale; the + // per-row scale at which each value was actually written lives in the + // SECONDARY (a.k.a. NANO_DATA) stream. The reader uses + // requestedType.scale() (the table-schema scale) as the target scale and + // rescales each value from its per-row scale, so the output matches what + // table consumers expect even when the file footer scale differs. SelectiveDecimalColumnReader( + const TypePtr& requestedType, const std::shared_ptr& fileType, DwrfParams& params, common::ScanSpec& scanSpec); diff --git a/velox/dwio/dwrf/reader/SelectiveDwrfReader.cpp b/velox/dwio/dwrf/reader/SelectiveDwrfReader.cpp index da57c358ebd..dd12ff7ff04 100644 --- a/velox/dwio/dwrf/reader/SelectiveDwrfReader.cpp +++ b/velox/dwio/dwrf/reader/SelectiveDwrfReader.cpp @@ -81,7 +81,7 @@ std::unique_ptr SelectiveDwrfReader::build( case TypeKind::BIGINT: if (fileType->type()->isDecimal()) { return std::make_unique>( - fileType, params, scanSpec); + requestedType, fileType, params, scanSpec); } else { return buildIntegerReader( requestedType, fileType, params, LONG_BYTE_SIZE, scanSpec); @@ -150,7 +150,7 @@ std::unique_ptr SelectiveDwrfReader::build( case TypeKind::HUGEINT: if (fileType->type()->isDecimal()) { return std::make_unique>( - fileType, params, scanSpec); + requestedType, fileType, params, scanSpec); } [[fallthrough]]; default: diff --git a/velox/dwio/dwrf/test/TestColumnReader.cpp b/velox/dwio/dwrf/test/TestColumnReader.cpp index 39e6d2d33f8..1ead2ca74b1 100644 --- a/velox/dwio/dwrf/test/TestColumnReader.cpp +++ b/velox/dwio/dwrf/test/TestColumnReader.cpp @@ -27,6 +27,7 @@ #include "velox/vector/ComplexVector.h" #include "velox/vector/DictionaryVector.h" #include "velox/vector/FlatVector.h" +#include "velox/vector/tests/utils/VectorTestBase.h" #include #include @@ -360,6 +361,55 @@ class TestColumnReader : public testing::TestWithParam, bool parallelDecoding() const override { return parallelDecoding_; } + + // Helper for SelectiveDecimalColumnReader tests that exercise schema + // mismatch between the file footer (e.g. Hive ORC's DECIMAL(38, 18)) + // and the requested table-schema type. + template + void verifyDecimalRequestedType( + const unsigned char (&dataBuffer)[kDataSize], + const unsigned char (&scaleBuffer)[kScaleSize], + const TypePtr& fileType, + const TypePtr& requestedType, + const std::vector& expectedValues) { + auto fileRowType = ROW("col_0", fileType); + auto requestedRowType = ROW("col_0", requestedType); + proto::ColumnEncoding directEncoding; + directEncoding.set_kind(proto::ColumnEncoding_Kind_DIRECT); + EXPECT_CALL(streams_, getEncodingProxy(_)) + .WillRepeatedly(Return(&directEncoding)); + + EXPECT_CALL( + streams_, getStreamProxy(_, proto::Stream_Kind_ROW_INDEX, false)) + .WillRepeatedly(Return(nullptr)); + EXPECT_CALL(streams_, getStreamProxy(_, proto::Stream_Kind_PRESENT, false)) + .WillRepeatedly(Return(nullptr)); + + EXPECT_CALL(streams_, getStreamProxy(1, proto::Stream_Kind_DATA, true)) + .WillRepeatedly( + Return(new SeekableArrayInputStream(dataBuffer, kDataSize))); + EXPECT_CALL(streams_, getStreamProxy(1, proto::Stream_Kind_NANO_DATA, true)) + .WillRepeatedly( + Return(new SeekableArrayInputStream(scaleBuffer, kScaleSize))); + + auto scanSpec = std::make_unique("root"); + buildReader(requestedRowType, fileRowType, {}, scanSpec.get()); + VectorPtr batch = newBatch(requestedRowType); + skipAndRead(batch, /*readSize=*/expectedValues.size()); + + auto actual = getOnlyChild>(batch); + ASSERT_EQ(expectedValues.size(), batch->size()); + ASSERT_EQ(0, getNullCount(batch)); + ASSERT_EQ(0, getNullCount(actual)); + + auto* pool = &streams_.getMemoryPool(); + auto expected = BaseVector::create>( + requestedType, expectedValues.size(), pool); + for (vector_size_t i = 0; i < expectedValues.size(); ++i) { + expected->set(i, expectedValues[i]); + } + facebook::velox::test::assertEqualVectors(expected, actual); + } }; struct NonSelectiveReaderTestParams { @@ -4077,6 +4127,71 @@ TEST_P(TestColumnReader, testDecimal128WithSkip) { DecimalUtil::toString(intBatch->valueAt(4), decimalType)); } +// Verify that when the file type doesn't match the metastore type, +// the metastore type wins and data is rescaled accordingly. + +// Integer column (per-row scale=0) stored in an ORC file whose footer +// declares DECIMAL(38, 18), but the metastore says DECIMAL(20, 0). The +// reader must produce the original integer values without rescaling. +TEST_P(TestColumnReader, longDecimalRequestedTypeScaleZero) { + const unsigned char dataBuffer[] = {0x02, 0x04, 0x06, 0x08}; + const unsigned char scaleBuffer[] = {0x01, 0x00, 0x00}; + verifyDecimalRequestedType( + dataBuffer, scaleBuffer, DECIMAL(38, 18), DECIMAL(20, 0), {1, 2, 3, 4}); +} + +// Per-row scale (5) already matches the requestedType scale (5); no +// rescaling expected. +TEST_P(TestColumnReader, longDecimalRequestedTypeScaleMatchesData) { + const unsigned char dataBuffer[] = {0x02, 0x04, 0x06, 0x08}; + const unsigned char scaleBuffer[] = {0x01, 0x00, 0x0A}; + verifyDecimalRequestedType( + dataBuffer, scaleBuffer, DECIMAL(38, 18), DECIMAL(25, 5), {1, 2, 3, 4}); +} + +// Per-row scale (3) is lower than the requestedType scale (5). The reader +// must upscale by multiplying by 10^(5-3) = 100. +TEST_P(TestColumnReader, longDecimalRequestedTypeUpscale) { + const unsigned char dataBuffer[] = {0x02, 0x04, 0x06, 0x08}; + const unsigned char scaleBuffer[] = {0x01, 0x00, 0x06}; + verifyDecimalRequestedType( + dataBuffer, + scaleBuffer, + DECIMAL(38, 18), + DECIMAL(25, 5), + {100, 200, 300, 400}); +} + +// Short decimal (BIGINT, precision<=18). File declares DECIMAL(12, 5), +// metastore says DECIMAL(10, 2). Reader must downscale by 10^(5-2)=1000. +TEST_P(TestColumnReader, shortDecimalRequestedTypeDownscale) { + const unsigned char dataBuffer[] = { + 0xD0, 0x0F, 0xA0, 0x1F, 0xF0, 0x2E, 0xC0, 0x3E}; + const unsigned char scaleBuffer[] = {0x01, 0x00, 0x0A}; + verifyDecimalRequestedType( + dataBuffer, scaleBuffer, DECIMAL(12, 5), DECIMAL(10, 2), {1, 2, 3, 4}); +} + +TEST_P(TestColumnReader, decimalRequestedTypeNonDecimalRejected) { + proto::ColumnEncoding directEncoding; + directEncoding.set_kind(proto::ColumnEncoding_Kind_DIRECT); + EXPECT_CALL(streams_, getEncodingProxy(_)) + .WillRepeatedly(Return(&directEncoding)); + EXPECT_CALL(streams_, getStreamProxy(_, proto::Stream_Kind_ROW_INDEX, false)) + .WillRepeatedly(Return(nullptr)); + EXPECT_CALL(streams_, getStreamProxy(_, proto::Stream_Kind_PRESENT, false)) + .WillRepeatedly(Return(nullptr)); + + auto fileType = ROW("col_0", DECIMAL(38, 18)); + auto scanSpec = std::make_unique("root"); + VELOX_ASSERT_THROW( + buildReader(ROW("col_0", BIGINT()), fileType, {}, scanSpec.get()), + "Schema mismatch, From Kind: HUGEINT, To Kind: BIGINT"); + VELOX_ASSERT_THROW( + buildReader(ROW("col_0", DOUBLE()), fileType, {}, scanSpec.get()), + "Schema mismatch, From Kind: HUGEINT, To Kind: DOUBLE"); +} + TEST_P(TestColumnReader, testLargeSkip) { // set getEncoding proto::ColumnEncoding directEncoding;