diff --git a/CMakeLists.txt b/CMakeLists.txt index 32fc3fed9a8..cc90a615a0f 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -489,6 +489,9 @@ endif() if(VELOX_ENABLE_WAVE OR VELOX_ENABLE_CUDF) enable_language(CUDA) + # Use same C++ standard throughout + set(CMAKE_CUDA_STANDARD ${CMAKE_CXX_STANDARD}) + set(CMAKE_CUDA_STANDARD_REQUIRED ${CMAKE_CXX_STANDARD_REQUIRED}) # Determine CUDA_ARCHITECTURES automatically. cmake_policy(SET CMP0104 NEW) if(NOT DEFINED CMAKE_CUDA_ARCHITECTURES) diff --git a/velox/experimental/cudf/expression/CMakeLists.txt b/velox/experimental/cudf/expression/CMakeLists.txt index 5a705b8086b..d98ecc46872 100644 --- a/velox/experimental/cudf/expression/CMakeLists.txt +++ b/velox/experimental/cudf/expression/CMakeLists.txt @@ -18,7 +18,7 @@ add_library( AstExpression.cpp CommonFunctions.cpp DecimalExpressionKernels.cpp - DecimalExpressionKernels.cu + DecimalExpressionKernelsGpu.cu ExpressionEvaluator.cpp JitExpression.cpp PrestoFunctions.cpp @@ -36,5 +36,3 @@ target_link_libraries( ) target_compile_options(velox_cudf_expression PRIVATE -Wno-missing-field-initializers) - -set_target_properties(velox_cudf_expression PROPERTIES CUDA_STANDARD 20 CUDA_STANDARD_REQUIRED ON) diff --git a/velox/experimental/cudf/expression/DecimalExpressionKernels.cpp b/velox/experimental/cudf/expression/DecimalExpressionKernels.cpp index 84e4e72eb66..92612c4ac17 100644 --- a/velox/experimental/cudf/expression/DecimalExpressionKernels.cpp +++ b/velox/experimental/cudf/expression/DecimalExpressionKernels.cpp @@ -15,14 +15,51 @@ */ #include "velox/experimental/cudf/expression/AstPrinter.h" #include "velox/experimental/cudf/expression/DecimalExpressionKernels.h" +#include "velox/experimental/cudf/expression/DecimalExpressionKernelsGpu.h" #include "velox/common/base/Exceptions.h" +#include "velox/type/DecimalUtil.h" +#include "velox/type/Type.h" #include +#include #include +#include +#include #include +#include namespace facebook::velox::cudf_velox { +namespace { + +__int128_t getDecimalScalarValue( + const cudf::scalar& s, + rmm::cuda_stream_view stream) { + if (s.type().id() == cudf::type_id::DECIMAL64) { + auto const& dec = + static_cast const&>(s); + return static_cast<__int128_t>(static_cast(dec.value(stream))); + } + auto const& dec = + static_cast const&>(s); + return static_cast<__int128_t>(dec.value(stream)); +} + +/// Column of \p outputType with \p size rows, all null (e.g. NULL scalar +/// operand). +std::unique_ptr makeAllNullDecimalColumn( + cudf::data_type outputType, + cudf::size_type size, + rmm::cuda_stream_view stream, + rmm::device_async_resource_ref mr) { + if (size == 0) { + return cudf::make_empty_column(outputType); + } + return cudf::make_fixed_width_column( + outputType, size, cudf::mask_state::ALL_NULL, stream, mr); +} + +} // namespace // Scatters null values to positions where the divisor is zero. // Returns a new column with nulls at zero-divisor positions. @@ -68,4 +105,179 @@ std::unique_ptr scatterNullsAtZeroDivisor( *nullScalar, *result, divisorIsZero->view(), stream, mr); } +std::unique_ptr decimalDivide( + const cudf::column_view& lhs, + const cudf::column_view& rhs, + cudf::data_type outputType, + int32_t aRescale, + rmm::cuda_stream_view stream, + rmm::device_async_resource_ref mr) { + VELOX_CHECK(lhs.size() == rhs.size(), "Decimal divide requires equal sizes"); + // Use VELOX_CHECK (not _EQ) so failed checks do not pass cudf::type_id into + // fmt, which has no formatter for that enum. + VELOX_CHECK( + lhs.type().id() == rhs.type().id(), + "Decimal divide requires matching input types"); + VELOX_CHECK_GE( + aRescale, 0, "Decimal divide requires non-negative rescale factor"); + // Rescale indexes DecimalUtil::kPowersOfTen; same bound as Presto divide + // init. + VELOX_USER_CHECK_LE( + aRescale, LongDecimalType::kMaxPrecision, "Decimal overflow"); + + const auto inType = lhs.type().id(); + const auto outType = outputType.id(); + VELOX_CHECK( + inType == cudf::type_id::DECIMAL64 || inType == cudf::type_id::DECIMAL128, + "Unsupported input type for decimal divide"); + if (inType == cudf::type_id::DECIMAL64) { + VELOX_CHECK( + outType == cudf::type_id::DECIMAL64 || + outType == cudf::type_id::DECIMAL128, + "Unexpected output type for decimal divide"); + } else { + VELOX_CHECK( + outType == cudf::type_id::DECIMAL128, + "Unexpected output type for decimal divide"); + } + + // Combine input null masks (lhs and rhs nulls). + auto [nullMask, nullCount] = + cudf::bitmask_and(cudf::table_view({lhs, rhs}), stream, mr); + + // Create output column with input null mask and perform division. + auto out = cudf::make_fixed_width_column( + outputType, lhs.size(), std::move(nullMask), nullCount, stream, mr); + + const __int128_t rescaleFactor = DecimalUtil::kPowersOfTen[aRescale]; + VELOX_USER_CHECK( + detail::decimalDivideColumnColumn( + inType, + outType, + lhs, + rhs, + out->mutable_view(), + rescaleFactor, + stream), + "Decimal overflow"); + + // Scatter nulls where divisor is zero. + return scatterNullsAtZeroDivisor(std::move(out), rhs, stream, mr); +} + +std::unique_ptr decimalDivide( + const cudf::column_view& lhs, + const cudf::scalar& rhs, + cudf::data_type outputType, + int32_t aRescale, + rmm::cuda_stream_view stream, + rmm::device_async_resource_ref mr) { + VELOX_CHECK_GE( + aRescale, 0, "Decimal divide requires non-negative rescale factor"); + // Rescale indexes DecimalUtil::kPowersOfTen; same bound as Presto divide + // init. + VELOX_USER_CHECK_LE( + aRescale, LongDecimalType::kMaxPrecision, "Decimal overflow"); + + if (!rhs.is_valid(stream)) { + return makeAllNullDecimalColumn(outputType, lhs.size(), stream, mr); + } + + auto nullMask = cudf::copy_bitmask(lhs, stream, mr); + auto nullCount = lhs.null_count(); + auto out = cudf::make_fixed_width_column( + outputType, lhs.size(), std::move(nullMask), nullCount, stream, mr); + + auto rhsValue = getDecimalScalarValue(rhs, stream); + + const auto inType = lhs.type().id(); + const auto outType = outputType.id(); + VELOX_CHECK( + inType == cudf::type_id::DECIMAL64 || inType == cudf::type_id::DECIMAL128, + "Unsupported input type for decimal divide"); + if (inType == cudf::type_id::DECIMAL64) { + VELOX_CHECK( + outType == cudf::type_id::DECIMAL64 || + outType == cudf::type_id::DECIMAL128, + "Unexpected output type for decimal divide"); + } else { + VELOX_CHECK( + outType == cudf::type_id::DECIMAL128, + "Unexpected output type for decimal divide"); + } + + VELOX_USER_CHECK( + detail::decimalDivideColumnScalar( + inType, + outType, + lhs, + rhsValue, + out->mutable_view(), + DecimalUtil::kPowersOfTen[aRescale], + stream), + "Decimal overflow"); + + return out; +} + +std::unique_ptr decimalDivide( + const cudf::scalar& lhs, + const cudf::column_view& rhs, + cudf::data_type outputType, + int32_t aRescale, + rmm::cuda_stream_view stream, + rmm::device_async_resource_ref mr) { + VELOX_CHECK_GE( + aRescale, 0, "Decimal divide requires non-negative rescale factor"); + // Rescale indexes DecimalUtil::kPowersOfTen; same bound as Presto divide + // init. + VELOX_USER_CHECK_LE( + aRescale, LongDecimalType::kMaxPrecision, "Decimal overflow"); + + if (!lhs.is_valid(stream)) { + return makeAllNullDecimalColumn(outputType, rhs.size(), stream, mr); + } + + // Copy rhs null mask. + auto nullMask = cudf::copy_bitmask(rhs, stream, mr); + auto nullCount = rhs.null_count(); + + // Create output column and perform division. + auto out = cudf::make_fixed_width_column( + outputType, rhs.size(), std::move(nullMask), nullCount, stream, mr); + + auto lhsValue = getDecimalScalarValue(lhs, stream); + + const auto inType = rhs.type().id(); + const auto outType = outputType.id(); + VELOX_CHECK( + inType == cudf::type_id::DECIMAL64 || inType == cudf::type_id::DECIMAL128, + "Unsupported input type for decimal divide"); + if (inType == cudf::type_id::DECIMAL64) { + VELOX_CHECK( + outType == cudf::type_id::DECIMAL64 || + outType == cudf::type_id::DECIMAL128, + "Unexpected output type for decimal divide"); + } else { + VELOX_CHECK( + outType == cudf::type_id::DECIMAL128, + "Unexpected output type for decimal divide"); + } + + const __int128_t rescaleFactor = DecimalUtil::kPowersOfTen[aRescale]; + VELOX_USER_CHECK( + detail::decimalDivideScalarColumn( + inType, + outType, + lhsValue, + rhs, + out->mutable_view(), + rescaleFactor, + stream), + "Decimal overflow"); + + // Scatter nulls where divisor is zero. + return scatterNullsAtZeroDivisor(std::move(out), rhs, stream, mr); +} + } // namespace facebook::velox::cudf_velox diff --git a/velox/experimental/cudf/expression/DecimalExpressionKernels.cu b/velox/experimental/cudf/expression/DecimalExpressionKernels.cu deleted file mode 100644 index cd9291ff493..00000000000 --- a/velox/experimental/cudf/expression/DecimalExpressionKernels.cu +++ /dev/null @@ -1,333 +0,0 @@ -/* - * Copyright (c) Facebook, Inc. and its affiliates. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -#include "velox/experimental/cudf/expression/DecimalExpressionKernels.h" - -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include - -#include -#include - -#include - -namespace facebook::velox::cudf_velox { -namespace { - -template -__device__ OutT -decimalDivideImpl(__int128_t numerator, __int128_t denom, __int128_t scale) { - if (denom == 0) { - return OutT{0}; - } - int sign = 1; - if (numerator < 0) { - numerator = -numerator; - sign = -sign; - } - if (denom < 0) { - denom = -denom; - sign = -sign; - } - __int128_t scaled = numerator * scale; - __int128_t quotient = scaled / denom; - __int128_t remainder = scaled % denom; - if (remainder * 2 >= denom) { - ++quotient; - } - if (sign < 0) { - quotient = -quotient; - } - return static_cast(quotient); -} - -inline __int128_t pow10Int128(int32_t exp) { - __int128_t value = 1; - for (int32_t i = 0; i < exp; ++i) { - value *= 10; - } - return value; -} - -template -struct DivideFunctor { - const InT* lhs; - const InT* rhs; - OutT* out; - __int128_t scale; - - __device__ void operator()(int32_t idx) const { - out[idx] = decimalDivideImpl(lhs[idx], rhs[idx], scale); - } -}; - -template -struct DivideLhsScalarFunctor { - __int128_t lhsValue; - const InColT* rhs; - OutT* out; - __int128_t scale; - - __device__ void operator()(int32_t idx) const { - out[idx] = decimalDivideImpl(lhsValue, rhs[idx], scale); - } -}; - -template -struct DivideRhsScalarFunctor { - const InColT* lhs; - __int128_t rhsValue; - OutT* out; - __int128_t scale; - - __device__ void operator()(int32_t idx) const { - out[idx] = decimalDivideImpl(lhs[idx], rhsValue, scale); - } -}; - -template -void launchDivideKernel( - const cudf::column_view& lhs, - const cudf::column_view& rhs, - cudf::mutable_column_view out, - int32_t aRescale, - rmm::cuda_stream_view stream) { - if (lhs.size() == 0) { - return; - } - DivideFunctor op{ - lhs.data(), - rhs.data(), - out.data(), - pow10Int128(aRescale)}; - cub::DeviceFor::ForEachN( - thrust::counting_iterator(0), lhs.size(), op, stream.value()); -} - -template -void launchDivideKernelLhsScalar( - __int128_t lhsValue, - const cudf::column_view& rhs, - cudf::mutable_column_view out, - int32_t aRescale, - rmm::cuda_stream_view stream) { - if (rhs.size() == 0) { - return; - } - DivideLhsScalarFunctor op{ - lhsValue, rhs.data(), out.data(), pow10Int128(aRescale)}; - cub::DeviceFor::ForEachN( - thrust::counting_iterator(0), rhs.size(), op, stream.value()); -} - -template -void launchDivideKernelRhsScalar( - const cudf::column_view& lhs, - __int128_t rhsValue, - cudf::mutable_column_view out, - int32_t aRescale, - rmm::cuda_stream_view stream) { - if (lhs.size() == 0) { - return; - } - DivideRhsScalarFunctor op{ - lhs.data(), rhsValue, out.data(), pow10Int128(aRescale)}; - cub::DeviceFor::ForEachN( - thrust::counting_iterator(0), lhs.size(), op, stream.value()); -} - -__int128_t getDecimalScalarValue( - const cudf::scalar& s, - rmm::cuda_stream_view stream) { - if (s.type().id() == cudf::type_id::DECIMAL64) { - auto const& dec = - static_cast const&>(s); - return static_cast<__int128_t>(static_cast(dec.value(stream))); - } - auto const& dec = - static_cast const&>(s); - return static_cast<__int128_t>(dec.value(stream)); -} - -/// Column of \p outputType with \p size rows, all null (e.g. NULL scalar -/// operand). -std::unique_ptr makeAllNullDecimalColumn( - cudf::data_type outputType, - cudf::size_type size, - rmm::cuda_stream_view stream, - rmm::device_async_resource_ref mr) { - if (size == 0) { - return cudf::make_empty_column(outputType); - } - return cudf::make_fixed_width_column( - outputType, size, cudf::mask_state::ALL_NULL, stream, mr); -} - -} // namespace - -std::unique_ptr decimalDivide( - const cudf::column_view& lhs, - const cudf::column_view& rhs, - cudf::data_type outputType, - int32_t aRescale, - rmm::cuda_stream_view stream, - rmm::device_async_resource_ref mr) { - CUDF_EXPECTS(lhs.size() == rhs.size(), "Decimal divide requires equal sizes"); - CUDF_EXPECTS( - lhs.type().id() == rhs.type().id(), - "Decimal divide requires matching input types"); - CUDF_EXPECTS( - aRescale >= 0, "Decimal divide requires non-negative rescale factor"); - - // Combine input null masks (lhs and rhs nulls). - auto [nullMask, nullCount] = - cudf::bitmask_and(cudf::table_view({lhs, rhs}), stream, mr); - - // Create output column with input null mask and perform division. - auto out = cudf::make_fixed_width_column( - outputType, lhs.size(), std::move(nullMask), nullCount, stream, mr); - - if (lhs.type().id() == cudf::type_id::DECIMAL64) { - if (outputType.id() == cudf::type_id::DECIMAL64) { - launchDivideKernel( - lhs, rhs, out->mutable_view(), aRescale, stream); - } else { - CUDF_EXPECTS( - outputType.id() == cudf::type_id::DECIMAL128, - "Unexpected output type for decimal divide"); - launchDivideKernel( - lhs, rhs, out->mutable_view(), aRescale, stream); - } - } else { - CUDF_EXPECTS( - lhs.type().id() == cudf::type_id::DECIMAL128, - "Unsupported input type for decimal divide"); - CUDF_EXPECTS( - outputType.id() == cudf::type_id::DECIMAL128, - "Unexpected output type for decimal divide"); - launchDivideKernel<__int128_t, __int128_t>( - lhs, rhs, out->mutable_view(), aRescale, stream); - } - - // Scatter nulls where divisor is zero. - return scatterNullsAtZeroDivisor(std::move(out), rhs, stream, mr); -} - -std::unique_ptr decimalDivide( - const cudf::column_view& lhs, - const cudf::scalar& rhs, - cudf::data_type outputType, - int32_t aRescale, - rmm::cuda_stream_view stream, - rmm::device_async_resource_ref mr) { - CUDF_EXPECTS( - aRescale >= 0, "Decimal divide requires non-negative rescale factor"); - - if (!rhs.is_valid(stream)) { - return makeAllNullDecimalColumn(outputType, lhs.size(), stream, mr); - } - - auto nullMask = cudf::copy_bitmask(lhs, stream, mr); - auto nullCount = lhs.null_count(); - auto out = cudf::make_fixed_width_column( - outputType, lhs.size(), std::move(nullMask), nullCount, stream, mr); - - auto rhsValue = getDecimalScalarValue(rhs, stream); - - if (lhs.type().id() == cudf::type_id::DECIMAL64) { - if (outputType.id() == cudf::type_id::DECIMAL64) { - launchDivideKernelRhsScalar( - lhs, rhsValue, out->mutable_view(), aRescale, stream); - } else { - CUDF_EXPECTS( - outputType.id() == cudf::type_id::DECIMAL128, - "Unexpected output type for decimal divide"); - launchDivideKernelRhsScalar( - lhs, rhsValue, out->mutable_view(), aRescale, stream); - } - } else { - CUDF_EXPECTS( - lhs.type().id() == cudf::type_id::DECIMAL128, - "Unsupported input type for decimal divide"); - CUDF_EXPECTS( - outputType.id() == cudf::type_id::DECIMAL128, - "Unexpected output type for decimal divide"); - launchDivideKernelRhsScalar<__int128_t, __int128_t>( - lhs, rhsValue, out->mutable_view(), aRescale, stream); - } - - return out; -} - -std::unique_ptr decimalDivide( - const cudf::scalar& lhs, - const cudf::column_view& rhs, - cudf::data_type outputType, - int32_t aRescale, - rmm::cuda_stream_view stream, - rmm::device_async_resource_ref mr) { - CUDF_EXPECTS( - aRescale >= 0, "Decimal divide requires non-negative rescale factor"); - - if (!lhs.is_valid(stream)) { - return makeAllNullDecimalColumn(outputType, rhs.size(), stream, mr); - } - - // Copy rhs null mask. - auto nullMask = cudf::copy_bitmask(rhs, stream, mr); - auto nullCount = rhs.null_count(); - - // Create output column and perform division. - auto out = cudf::make_fixed_width_column( - outputType, rhs.size(), std::move(nullMask), nullCount, stream, mr); - - auto lhsValue = getDecimalScalarValue(lhs, stream); - - if (rhs.type().id() == cudf::type_id::DECIMAL64) { - if (outputType.id() == cudf::type_id::DECIMAL64) { - launchDivideKernelLhsScalar( - lhsValue, rhs, out->mutable_view(), aRescale, stream); - } else { - CUDF_EXPECTS( - outputType.id() == cudf::type_id::DECIMAL128, - "Unexpected output type for decimal divide"); - launchDivideKernelLhsScalar( - lhsValue, rhs, out->mutable_view(), aRescale, stream); - } - } else { - CUDF_EXPECTS( - rhs.type().id() == cudf::type_id::DECIMAL128, - "Unsupported input type for decimal divide"); - CUDF_EXPECTS( - outputType.id() == cudf::type_id::DECIMAL128, - "Unexpected output type for decimal divide"); - launchDivideKernelLhsScalar<__int128_t, __int128_t>( - lhsValue, rhs, out->mutable_view(), aRescale, stream); - } - - // Scatter nulls where divisor is zero. - return scatterNullsAtZeroDivisor(std::move(out), rhs, stream, mr); -} - -} // namespace facebook::velox::cudf_velox diff --git a/velox/experimental/cudf/expression/DecimalExpressionKernels.h b/velox/experimental/cudf/expression/DecimalExpressionKernels.h index d7576ccdcf0..46b95d05cc5 100644 --- a/velox/experimental/cudf/expression/DecimalExpressionKernels.h +++ b/velox/experimental/cudf/expression/DecimalExpressionKernels.h @@ -26,6 +26,12 @@ namespace facebook::velox::cudf_velox { +// Element-wise decimal division of two columns (same DECIMAL64 or DECIMAL128 +// input type). Builds the output null mask as the bitwise AND of lhs and rhs +// validity, runs the GPU divide into outputType, and applies +// scatterNullsAtZeroDivisor so rows with a zero divisor are null. aRescale is +// the fixed-point scale adjustment (Velox passes outScale - lhsScale + +// rhsScale) used inside the kernel as a power-of-ten factor. std::unique_ptr decimalDivide( const cudf::column_view& lhs, const cudf::column_view& rhs, @@ -34,6 +40,9 @@ std::unique_ptr decimalDivide( rmm::cuda_stream_view stream, rmm::device_async_resource_ref mr); +// Like column/column decimalDivide, but rhs is a single decimal scalar. If the +// scalar is invalid, returns an all-null column of outputType; otherwise copies +// lhs nulls and divides without zero-divisor scattering (rhs is not per-row). std::unique_ptr decimalDivide( const cudf::column_view& lhs, const cudf::scalar& rhs, @@ -42,6 +51,10 @@ std::unique_ptr decimalDivide( rmm::cuda_stream_view stream, rmm::device_async_resource_ref mr); +// Like column/column decimalDivide, but lhs is a scalar and rhs is a column. +// Invalid lhs yields all-null output; otherwise rhs nulls are propagated, then +// divide and scatterNullsAtZeroDivisor on rhs so division-by-zero rows are +// null. std::unique_ptr decimalDivide( const cudf::scalar& lhs, const cudf::column_view& rhs, @@ -50,8 +63,10 @@ std::unique_ptr decimalDivide( rmm::cuda_stream_view stream, rmm::device_async_resource_ref mr); -// Helper function to scatter nulls at zero-divisor positions. -// Moved to .cpp file to allow use of VELOX_FAIL (incompatible with nvcc). +// After a decimal divide, forces output rows to null where the divisor column +// compares equal to zero (DECIMAL64 or DECIMAL128), using copy_if_else. Kept in +// the .cpp translation unit so it can use Velox checks alongside cuDF APIs +// without pulling those into CUDA compilation units. std::unique_ptr scatterNullsAtZeroDivisor( std::unique_ptr result, const cudf::column_view& divisor, diff --git a/velox/experimental/cudf/expression/DecimalExpressionKernelsGpu.cu b/velox/experimental/cudf/expression/DecimalExpressionKernelsGpu.cu new file mode 100644 index 00000000000..943864a72da --- /dev/null +++ b/velox/experimental/cudf/expression/DecimalExpressionKernelsGpu.cu @@ -0,0 +1,357 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "velox/experimental/cudf/expression/DecimalExpressionKernelsGpu.h" + +#include +#include + +#include + +#include +#include +#include + +#include +#include + +namespace facebook::velox::cudf_velox { +namespace { + +// Device-safe int128 bounds (std::numeric_limits is host-only in CUDA). +constexpr unsigned __int128 kUnsigned128Max = + static_cast(-1); +constexpr unsigned __int128 kInt128MinMagnitude = + static_cast(1) << 127; +constexpr unsigned __int128 kInt128MaxMagnitude = kInt128MinMagnitude - 1; +// Bit pattern 2^127 maps to INT128_MIN without negating INT128_MIN (UB). +constexpr __int128_t kInt128Min = static_cast<__int128_t>(kInt128MinMagnitude); + +// Match DecimalUtil::kLongDecimal{Min,Max} (10^38 bounds); duplicated here +// because Velox headers cannot be included in this translation unit (nvcc). +constexpr __int128_t kLongDecimalPowerOfTen38 = + 1'000'000'000'000'000'000LL * (__int128_t)1'000'000'000'000'000'000LL * 100; +constexpr __int128_t kLongDecimalMax = kLongDecimalPowerOfTen38 - 1; +constexpr __int128_t kLongDecimalMin = -kLongDecimalPowerOfTen38 + 1; + +// Device threads cannot throw; record overflow for launchDecimalDivide to +// report to the host caller, matching Velox CPU decimal divide errors. +__device__ inline void markDecimalOverflow(int32_t* overflowFlag) { + atomicOr(overflowFlag, 1); +} + +// Extract absolute value in unsigned space. Signed negation of INT128_MIN is +// undefined; negating the unsigned bit pattern is always defined. +__device__ inline unsigned __int128 absToUnsigned( + __int128_t value, + bool& negative) { + if (value < 0) { + negative = !negative; + return -static_cast(value); + } + return static_cast(value); +} + +// Reapply sign after unsigned divide/round. Caller must ensure magnitude fits. +__device__ inline __int128_t signedFromUnsigned( + unsigned __int128 magnitude, + bool negative) { + if (!negative) { + return static_cast<__int128_t>(magnitude); + } + if (magnitude >= kInt128MinMagnitude) { + return kInt128Min; + } + return -static_cast<__int128_t>(magnitude); +} + +// Quotient magnitude must fit in int128 before signedFromUnsigned; rounding can +// push a representable unsigned quotient past INT128_MAX / INT128_MIN. +__device__ inline bool fitsRepresentableInt128( + unsigned __int128 magnitude, + bool negative) { + if (!negative) { + return magnitude <= kInt128MaxMagnitude; + } + return magnitude <= kInt128MinMagnitude; +} + +// Decimal divide with rescale (numerator * rescaleFactor / denom). Rounding +// matches Velox CPU DecimalUtil::divideWithRoundUp (increment unsigned +// quotient, then apply sign), not Java/Hive HALF_UP toward +infinity on ties. +// Overflow on rescale multiply, round-up, or out-of-range results sets +// overflowFlag (see launchDecimalDivide); intermediate math uses unsigned +// magnitudes so multiply, divide, mod, and abs never hit signed overflow UB. +template +__device__ OutT decimalDivideImpl( + __int128_t numerator, + __int128_t denom, + __int128_t rescaleFactor, + int32_t* overflowFlag) { + if (denom == 0) { + return OutT{0}; + } + + bool negative = false; + unsigned __int128 const uNum = absToUnsigned(numerator, negative); + unsigned __int128 const uDenom = absToUnsigned(denom, negative); + // rescaleFactor is DecimalUtil::kPowersOfTen[aRescale] from the host caller. + unsigned __int128 const uRescaleFactor = + static_cast(rescaleFactor); + + unsigned __int128 scaled = uNum * uRescaleFactor; + // Match Velox CPU checkedMultiply on rescale. + if (uRescaleFactor != 0 && scaled / uRescaleFactor != uNum) { + markDecimalOverflow(overflowFlag); + return OutT{0}; + } + + unsigned __int128 quotient = scaled / uDenom; + unsigned __int128 const remainder = scaled % uDenom; + + // Round ties away from zero (e.g. -1.5 -> -2), same as Velox CPU divide. + // Equivalent to 2 * remainder >= denom but avoids overflow when remainder is + // large. + if (remainder > (uDenom - 1) / 2) { + // Round-up would wrap unsigned quotient; CPU path would overflow too. + if (quotient >= kUnsigned128Max) { + markDecimalOverflow(overflowFlag); + return OutT{0}; + } + ++quotient; + } + + if (!fitsRepresentableInt128(quotient, negative)) { + markDecimalOverflow(overflowFlag); + return OutT{0}; + } + + __int128_t const result = signedFromUnsigned(quotient, negative); + // Match Velox CPU DecimalUtil::valueInRange after divide. + if (result < kLongDecimalMin || result > kLongDecimalMax) { + markDecimalOverflow(overflowFlag); + return OutT{0}; + } + + return static_cast(result); +} + +template +struct DivideFunctor { + const InT* lhs; + const InT* rhs; + OutT* out; + __int128_t rescaleFactor; + int32_t* overflowFlag; + + __device__ void operator()(cudf::size_type idx) const { + out[idx] = decimalDivideImpl( + lhs[idx], rhs[idx], rescaleFactor, overflowFlag); + } +}; + +template +struct DivideLhsScalarFunctor { + __int128_t lhsValue; + const InColT* rhs; + OutT* out; + __int128_t rescaleFactor; + int32_t* overflowFlag; + + __device__ void operator()(cudf::size_type idx) const { + out[idx] = decimalDivideImpl( + lhsValue, rhs[idx], rescaleFactor, overflowFlag); + } +}; + +template +struct DivideRhsScalarFunctor { + const InColT* lhs; + __int128_t rhsValue; + OutT* out; + __int128_t rescaleFactor; + int32_t* overflowFlag; + + __device__ void operator()(cudf::size_type idx) const { + out[idx] = decimalDivideImpl( + lhs[idx], rhsValue, rescaleFactor, overflowFlag); + } +}; + +// Returns false if any row set overflowFlag during the kernel. +template +bool launchDecimalDivide( + cudf::size_type size, + BuildOp buildOp, + rmm::cuda_stream_view stream) { + if (size == 0) { + return true; + } + rmm::device_scalar overflowFlag{0, stream}; + auto op = buildOp(overflowFlag.data()); + cub::DeviceFor::ForEachN( + cuda::counting_iterator{0}, size, op, stream.value()); + CUDF_CUDA_TRY(cudaGetLastError()); + return overflowFlag.value(stream) == 0; +} + +} // namespace + +namespace detail { + +template +concept ValidDecimalDivideStorageTypes = + (std::same_as && + (std::same_as || std::same_as)) || + (std::same_as && std::same_as); + +struct divideColumnColumnKernel { + const cudf::column_view& lhs; + const cudf::column_view& rhs; + cudf::mutable_column_view out; + __int128_t rescaleFactor; + rmm::cuda_stream_view stream; + + template + requires ValidDecimalDivideStorageTypes + bool operator()() const { + return launchDecimalDivide( + lhs.size(), + [&](int32_t* overflowFlag) { + return DivideFunctor{ + lhs.data(), + rhs.data(), + out.data(), + rescaleFactor, + overflowFlag}; + }, + stream); + } + + template + requires(!ValidDecimalDivideStorageTypes) + bool operator()() const { + CUDF_FAIL("Invalid types for decimal divide"); + return false; + } +}; + +struct divideColumnScalarKernel { + const cudf::column_view& lhs; + __int128_t rhsValue; + cudf::mutable_column_view out; + __int128_t rescaleFactor; + rmm::cuda_stream_view stream; + + template + requires ValidDecimalDivideStorageTypes + bool operator()() const { + return launchDecimalDivide( + lhs.size(), + [&](int32_t* overflowFlag) { + return DivideRhsScalarFunctor{ + lhs.data(), + rhsValue, + out.data(), + rescaleFactor, + overflowFlag}; + }, + stream); + } + + template + requires(!ValidDecimalDivideStorageTypes) + bool operator()() const { + CUDF_FAIL("Invalid types for decimal divide"); + return false; + } +}; + +struct divideScalarColumnKernel { + __int128_t lhsValue; + const cudf::column_view& rhs; + cudf::mutable_column_view out; + __int128_t rescaleFactor; + rmm::cuda_stream_view stream; + + template + requires ValidDecimalDivideStorageTypes + bool operator()() const { + return launchDecimalDivide( + rhs.size(), + [&](int32_t* overflowFlag) { + return DivideLhsScalarFunctor{ + lhsValue, + rhs.data(), + out.data(), + rescaleFactor, + overflowFlag}; + }, + stream); + } + + template + requires(!ValidDecimalDivideStorageTypes) + bool operator()() const { + CUDF_FAIL("Invalid types for decimal divide"); + return false; + } +}; + +bool decimalDivideColumnColumn( + cudf::type_id inType, + cudf::type_id outType, + const cudf::column_view& lhs, + const cudf::column_view& rhs, + cudf::mutable_column_view out, + __int128_t rescaleFactor, + rmm::cuda_stream_view stream) { + return cudf::double_type_dispatcher( + cudf::data_type{inType}, + cudf::data_type{outType}, + divideColumnColumnKernel{lhs, rhs, out, rescaleFactor, stream}); +} + +bool decimalDivideColumnScalar( + cudf::type_id inType, + cudf::type_id outType, + const cudf::column_view& lhs, + __int128_t rhsValue, + cudf::mutable_column_view out, + __int128_t rescaleFactor, + rmm::cuda_stream_view stream) { + return cudf::double_type_dispatcher( + cudf::data_type{inType}, + cudf::data_type{outType}, + divideColumnScalarKernel{lhs, rhsValue, out, rescaleFactor, stream}); +} + +bool decimalDivideScalarColumn( + cudf::type_id inType, + cudf::type_id outType, + __int128_t lhsValue, + const cudf::column_view& rhs, + cudf::mutable_column_view out, + __int128_t rescaleFactor, + rmm::cuda_stream_view stream) { + return cudf::double_type_dispatcher( + cudf::data_type{inType}, + cudf::data_type{outType}, + divideScalarColumnKernel{lhsValue, rhs, out, rescaleFactor, stream}); +} + +} // namespace detail +} // namespace facebook::velox::cudf_velox diff --git a/velox/experimental/cudf/expression/DecimalExpressionKernelsGpu.h b/velox/experimental/cudf/expression/DecimalExpressionKernelsGpu.h new file mode 100644 index 00000000000..0093af13916 --- /dev/null +++ b/velox/experimental/cudf/expression/DecimalExpressionKernelsGpu.h @@ -0,0 +1,65 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once + +#include +#include + +#include + +#include + +namespace facebook::velox::cudf_velox::detail { + +// Dispatches a per-row device loop: fixed-point divide (lhs * rescaleFactor) / +// rhs with half-away-from-zero rounding on the remainder, writing into out. +// rescaleFactor is DecimalUtil::kPowersOfTen[aRescale] from the caller. +// Zero divisors produce a numeric zero in out (callers patch nulls). Returns +// false if any row overflowed (caller should VELOX_USER_FAIL). inType / outType +// select DECIMAL64 vs DECIMAL128 storage widths via +// cudf::double_type_dispatcher. +bool decimalDivideColumnColumn( + cudf::type_id inType, + cudf::type_id outType, + const cudf::column_view& lhs, + const cudf::column_view& rhs, + cudf::mutable_column_view out, + __int128_t rescaleFactor, + rmm::cuda_stream_view stream); + +// Same kernel math as decimalDivideColumnColumn, but rhs is a single +// __int128_t decimal payload (already decoded from a cuDF scalar). +bool decimalDivideColumnScalar( + cudf::type_id inType, + cudf::type_id outType, + const cudf::column_view& lhs, + __int128_t rhsValue, + cudf::mutable_column_view out, + __int128_t rescaleFactor, + rmm::cuda_stream_view stream); + +// Same kernel math as decimalDivideColumnColumn, but lhs is a single +// __int128_t decimal payload and rhs is per-row. +bool decimalDivideScalarColumn( + cudf::type_id inType, + cudf::type_id outType, + __int128_t lhsValue, + const cudf::column_view& rhs, + cudf::mutable_column_view out, + __int128_t rescaleFactor, + rmm::cuda_stream_view stream); + +} // namespace facebook::velox::cudf_velox::detail