diff --git a/velox/experimental/cudf/exec/CudfGroupby.cpp b/velox/experimental/cudf/exec/CudfGroupby.cpp index 333419b98ad..b23369bb02d 100644 --- a/velox/experimental/cudf/exec/CudfGroupby.cpp +++ b/velox/experimental/cudf/exec/CudfGroupby.cpp @@ -62,7 +62,8 @@ using cudf_velox::validateIntermediateColumnType; void addGroupbyRequest( \ cudf::table_view const& tbl, \ std::vector& requests, \ - rmm::cuda_stream_view stream) override { \ + rmm::cuda_stream_view stream, \ + rmm::device_async_resource_ref mr) override { \ VELOX_CHECK( \ constant == nullptr, \ #Name "Aggregator does not yet support constant input"); \ @@ -75,11 +76,12 @@ using cudf_velox::validateIntermediateColumnType; \ std::unique_ptr makeOutputColumn( \ std::vector& results, \ - rmm::cuda_stream_view stream) override { \ + rmm::cuda_stream_view stream, \ + rmm::device_async_resource_ref mr) override { \ auto col = std::move(results[output_idx].results[0]); \ const auto cudfType = cudf_velox::veloxToCudfDataType(resultType); \ if (col->type() != cudfType) { \ - col = cudf::cast(*col, cudfType, stream, get_output_mr()); \ + col = cudf::cast(*col, cudfType, stream, mr); \ } \ return col; \ } \ @@ -106,12 +108,13 @@ void addDecimalSumCountRequestsAfterDecode( int32_t scale, std::vector& requests, rmm::cuda_stream_view stream, + rmm::device_async_resource_ref mr, uint32_t& sumIdx, uint32_t& countIdx, std::unique_ptr& decodedSum, std::unique_ptr& decodedCount) { - auto sumAndCount = cudf_velox::deserializeDecimalSumState( - encodedColumn, scale, stream, cudf_velox::get_output_mr()); + auto sumAndCount = + cudf_velox::deserializeDecimalSumState(encodedColumn, scale, stream, mr); decodedSum.swap(sumAndCount.sum); decodedCount.swap(sumAndCount.count); @@ -134,6 +137,7 @@ void addDecimalIntermediateSumCountRequests( const TypePtr& resultType, std::vector& requests, rmm::cuda_stream_view stream, + rmm::device_async_resource_ref mr, uint32_t& sumIdx, uint32_t& countIdx, std::unique_ptr& decodedSum, @@ -148,6 +152,7 @@ void addDecimalIntermediateSumCountRequests( scale, requests, stream, + mr, sumIdx, countIdx, decodedSum, @@ -160,6 +165,7 @@ void addDecimalFinalAvgSumCountRequests( const TypePtr& resultType, std::vector& requests, rmm::cuda_stream_view stream, + rmm::device_async_resource_ref mr, uint32_t& sumIdx, uint32_t& countIdx, std::unique_ptr& decodedSum, @@ -171,6 +177,7 @@ void addDecimalFinalAvgSumCountRequests( scale, requests, stream, + mr, sumIdx, countIdx, decodedSum, @@ -183,6 +190,7 @@ void addDecimalFinalSumOnlyRequest( const TypePtr& resultType, std::vector& requests, rmm::cuda_stream_view stream, + rmm::device_async_resource_ref mr, uint32_t& sumIdx, std::unique_ptr& decodedSum) { validateIntermediateColumnType(tbl.column(inputIndex)); @@ -190,7 +198,7 @@ void addDecimalFinalSumOnlyRequest( auto& request = requests.emplace_back(); sumIdx = requests.size() - 1; auto sumAndCount = cudf_velox::deserializeDecimalSumState( - tbl.column(inputIndex), scale, stream, cudf_velox::get_output_mr()); + tbl.column(inputIndex), scale, stream, mr); decodedSum.swap(sumAndCount.sum); request.values = decodedSum->view(); request.aggregations.push_back( @@ -226,7 +234,8 @@ struct GroupbyDecimalSumAggregator : GroupbyAggregator { void addGroupbyRequest( cudf::table_view const& tbl, std::vector& requests, - rmm::cuda_stream_view stream) override { + rmm::cuda_stream_view stream, + rmm::device_async_resource_ref mr) override { if (step == core::AggregationNode::Step::kIntermediate) { addDecimalIntermediateSumCountRequests( tbl, @@ -234,13 +243,21 @@ struct GroupbyDecimalSumAggregator : GroupbyAggregator { resultType, requests, stream, + mr, sumIdx_, countIdx_, decodedSum_, decodedCount_); } else if (step == core::AggregationNode::Step::kFinal) { addDecimalFinalSumOnlyRequest( - tbl, inputIndex, resultType, requests, stream, sumIdx_, decodedSum_); + tbl, + inputIndex, + resultType, + requests, + stream, + mr, + sumIdx_, + decodedSum_); } else { addDecimalRawPartialSingleSumRequest( tbl, @@ -253,21 +270,22 @@ struct GroupbyDecimalSumAggregator : GroupbyAggregator { std::unique_ptr makeOutputColumn( std::vector& results, - rmm::cuda_stream_view stream) override { + rmm::cuda_stream_view stream, + rmm::device_async_resource_ref mr) override { auto col = std::move(results[sumIdx_].results[0]); if (step == core::AggregationNode::Step::kPartial) { auto count = std::move(results[sumIdx_].results[1]); return serializeDecimalPartialOrIntermediateState( - std::move(col), std::move(count), stream); + std::move(col), std::move(count), stream, mr); } if (step == core::AggregationNode::Step::kIntermediate) { auto count = std::move(results[countIdx_].results[0]); return serializeDecimalPartialOrIntermediateState( - std::move(col), std::move(count), stream); + std::move(col), std::move(count), stream, mr); } auto const cudfResType = cudf_velox::veloxToCudfDataType(resultType); if (col->type() != cudfResType) { - col = cudf::cast(*col, cudfResType, stream, cudf_velox::get_output_mr()); + col = cudf::cast(*col, cudfResType, stream, mr); } return col; } @@ -290,7 +308,8 @@ struct GroupbyDecimalAvgAggregator : GroupbyAggregator { void addGroupbyRequest( cudf::table_view const& tbl, std::vector& requests, - rmm::cuda_stream_view stream) override { + rmm::cuda_stream_view stream, + rmm::device_async_resource_ref mr) override { if (step == core::AggregationNode::Step::kIntermediate) { addDecimalIntermediateSumCountRequests( tbl, @@ -298,6 +317,7 @@ struct GroupbyDecimalAvgAggregator : GroupbyAggregator { resultType, requests, stream, + mr, sumIdx_, countIdx_, decodedSum_, @@ -309,6 +329,7 @@ struct GroupbyDecimalAvgAggregator : GroupbyAggregator { resultType, requests, stream, + mr, sumIdx_, countIdx_, decodedSum_, @@ -326,31 +347,32 @@ struct GroupbyDecimalAvgAggregator : GroupbyAggregator { std::unique_ptr makeOutputColumn( std::vector& results, - rmm::cuda_stream_view stream) override { + rmm::cuda_stream_view stream, + rmm::device_async_resource_ref mr) override { auto col = std::move(results[sumIdx_].results[0]); if (step == core::AggregationNode::Step::kSingle) { auto count = std::move(results[sumIdx_].results[1]); return finalizeDecimalAverage( - std::move(col), std::move(count), resultType, stream); + std::move(col), std::move(count), resultType, stream, mr); } if (step == core::AggregationNode::Step::kPartial) { auto count = std::move(results[sumIdx_].results[1]); return serializeDecimalPartialOrIntermediateState( - std::move(col), std::move(count), stream); + std::move(col), std::move(count), stream, mr); } if (step == core::AggregationNode::Step::kIntermediate) { auto count = std::move(results[countIdx_].results[0]); return serializeDecimalPartialOrIntermediateState( - std::move(col), std::move(count), stream); + std::move(col), std::move(count), stream, mr); } if (step == core::AggregationNode::Step::kFinal) { auto count = std::move(results[countIdx_].results[0]); return finalizeDecimalAverage( - std::move(col), std::move(count), resultType, stream); + std::move(col), std::move(count), resultType, stream, mr); } auto const cudfResType = cudf_velox::veloxToCudfDataType(resultType); if (col->type() != cudfResType) { - col = cudf::cast(*col, cudfResType, stream, cudf_velox::get_output_mr()); + col = cudf::cast(*col, cudfResType, stream, mr); } return col; } @@ -374,7 +396,8 @@ struct GroupbyCountAggregator : GroupbyAggregator { void addGroupbyRequest( cudf::table_view const& tbl, std::vector& requests, - rmm::cuda_stream_view stream) override { + rmm::cuda_stream_view stream, + rmm::device_async_resource_ref mr) override { auto& request = requests.emplace_back(); outputIndex_ = requests.size() - 1; // kCountAll and kNullConstant both submit a count-all-rows request; @@ -397,17 +420,17 @@ struct GroupbyCountAggregator : GroupbyAggregator { std::unique_ptr makeOutputColumn( std::vector& results, - rmm::cuda_stream_view stream) override { + rmm::cuda_stream_view stream, + rmm::device_async_resource_ref mr) override { auto col = std::move(results[outputIndex_].results[0]); if (inputKind_ == CountInputKind::kNullConstant) { auto zero = cudf::numeric_scalar(0, true, stream, get_temp_mr()); - col = cudf::make_column_from_scalar( - zero, col->size(), stream, get_output_mr()); + col = cudf::make_column_from_scalar(zero, col->size(), stream, mr); } // cudf produces int32 for count but velox expects int64. const auto cudfOutputType = cudf_velox::veloxToCudfDataType(resultType); if (col->type() != cudfOutputType) { - col = cudf::cast(*col, cudfOutputType, stream, get_output_mr()); + col = cudf::cast(*col, cudfOutputType, stream, mr); } return col; } @@ -428,7 +451,8 @@ struct GroupbyMeanAggregator : GroupbyAggregator { void addGroupbyRequest( cudf::table_view const& tbl, std::vector& requests, - rmm::cuda_stream_view stream) override { + rmm::cuda_stream_view stream, + rmm::device_async_resource_ref mr) override { switch (step) { case core::AggregationNode::Step::kSingle: { auto& request = requests.emplace_back(); @@ -475,7 +499,8 @@ struct GroupbyMeanAggregator : GroupbyAggregator { std::unique_ptr makeOutputColumn( std::vector& results, - rmm::cuda_stream_view stream) override { + rmm::cuda_stream_view stream, + rmm::device_async_resource_ref mr) override { const auto& outputType = asRowType(resultType); switch (step) { case core::AggregationNode::Step::kSingle: @@ -490,12 +515,11 @@ struct GroupbyMeanAggregator : GroupbyAggregator { auto const cudfCountType = cudf_velox::veloxToCudfDataType(outputType->childAt(1)); if (sum->type() != cudf::data_type(cudfSumType)) { - sum = cudf::cast( - *sum, cudf::data_type(cudfSumType), stream, get_output_mr()); + sum = cudf::cast(*sum, cudf::data_type(cudfSumType), stream, mr); } if (count->type() != cudf::data_type(cudfCountType)) { - count = cudf::cast( - *count, cudf::data_type(cudfCountType), stream, get_output_mr()); + count = + cudf::cast(*count, cudf::data_type(cudfCountType), stream, mr); } auto children = std::vector>(); @@ -528,12 +552,11 @@ struct GroupbyMeanAggregator : GroupbyAggregator { auto const cudfCountType = cudf_velox::veloxToCudfDataType(outputType->childAt(1)); if (sum->type() != cudf::data_type(cudfSumType)) { - sum = cudf::cast( - *sum, cudf::data_type(cudfSumType), stream, get_output_mr()); + sum = cudf::cast(*sum, cudf::data_type(cudfSumType), stream, mr); } if (count->type() != cudf::data_type(cudfCountType)) { - count = cudf::cast( - *count, cudf::data_type(cudfCountType), stream, get_output_mr()); + count = + cudf::cast(*count, cudf::data_type(cudfCountType), stream, mr); } auto children = std::vector>(); @@ -557,7 +580,7 @@ struct GroupbyMeanAggregator : GroupbyAggregator { cudf::binary_operator::DIV, cudf_velox::veloxToCudfDataType(resultType), stream, - get_output_mr()); + mr); return avg; } default: @@ -584,7 +607,8 @@ struct GroupbyStddevSampAggregator : GroupbyAggregator { void addGroupbyRequest( cudf::table_view const& tbl, std::vector& requests, - rmm::cuda_stream_view stream) override { + rmm::cuda_stream_view stream, + rmm::device_async_resource_ref mr) override { auto& request = requests.emplace_back(); outputIdx_ = requests.size() - 1; request.values = tbl.column(inputIndex); @@ -618,7 +642,8 @@ struct GroupbyStddevSampAggregator : GroupbyAggregator { std::unique_ptr makeOutputColumn( std::vector& results, - rmm::cuda_stream_view stream) override { + rmm::cuda_stream_view stream, + rmm::device_async_resource_ref mr) override { switch (step) { case core::AggregationNode::Step::kSingle: return std::move(results[outputIdx_].results[0]); @@ -627,7 +652,7 @@ struct GroupbyStddevSampAggregator : GroupbyAggregator { auto mean = std::move(results[outputIdx_].results[1]); auto m2 = std::move(results[outputIdx_].results[2]); return makeM2StructColumn( - std::move(count), std::move(mean), std::move(m2), stream); + std::move(count), std::move(mean), std::move(m2), stream, mr); } case core::AggregationNode::Step::kIntermediate: { auto merged = std::move(results[outputIdx_].results[0]); @@ -653,14 +678,14 @@ struct GroupbyStddevSampAggregator : GroupbyAggregator { // Types don't match - need to copy and cast (use output_mr since // these become part of the output) - auto count = std::make_unique( - mergedView.child(0), stream, get_output_mr()); - auto mean = std::make_unique( - mergedView.child(1), stream, get_output_mr()); - auto m2 = std::make_unique( - mergedView.child(2), stream, get_output_mr()); + auto count = + std::make_unique(mergedView.child(0), stream, mr); + auto mean = + std::make_unique(mergedView.child(1), stream, mr); + auto m2 = + std::make_unique(mergedView.child(2), stream, mr); return makeM2StructColumn( - std::move(count), std::move(mean), std::move(m2), stream); + std::move(count), std::move(mean), std::move(m2), stream, mr); } case core::AggregationNode::Step::kFinal: { // MERGE_M2 returns struct(count, mean, m2) @@ -706,8 +731,7 @@ struct GroupbyStddevSampAggregator : GroupbyAggregator { // Apply mask: where count < 2, result is NULL cudf::numeric_scalar nullDouble( 0.0, false, stream, get_temp_mr()); - return cudf::copy_if_else( - *stddev, nullDouble, *validMask, stream, get_output_mr()); + return cudf::copy_if_else(*stddev, nullDouble, *validMask, stream, mr); } default: VELOX_NYI("Unsupported aggregation step for stddev_samp"); @@ -720,7 +744,8 @@ struct GroupbyStddevSampAggregator : GroupbyAggregator { std::unique_ptr count, std::unique_ptr mean, std::unique_ptr m2, - rmm::cuda_stream_view stream) { + rmm::cuda_stream_view stream, + rmm::device_async_resource_ref mr) { const auto& outputType = asRowType(resultType); auto const cudfCountType = cudf_velox::veloxToCudfDataType(outputType->childAt(0)); @@ -729,13 +754,13 @@ struct GroupbyStddevSampAggregator : GroupbyAggregator { auto const cudfM2Type = cudf_velox::veloxToCudfDataType(outputType->childAt(2)); if (count->type() != cudfCountType) { - count = cudf::cast(*count, cudfCountType, stream, get_output_mr()); + count = cudf::cast(*count, cudfCountType, stream, mr); } if (mean->type() != cudfMeanType) { - mean = cudf::cast(*mean, cudfMeanType, stream, get_output_mr()); + mean = cudf::cast(*mean, cudfMeanType, stream, mr); } if (m2->type() != cudfM2Type) { - m2 = cudf::cast(*m2, cudfM2Type, stream, get_output_mr()); + m2 = cudf::cast(*m2, cudfM2Type, stream, mr); } auto const size = count->size(); @@ -987,7 +1012,8 @@ void CudfGroupby::computePartialGroupbyStreaming(CudfVectorPtr tbl) { groupingKeyOutputChannels_, aggregators_, bufferedResultType_, - inputTableStream); + inputTableStream, + get_output_mr()); // If we already have partial output, concatenate the new results with it. if (bufferedResult_) { @@ -1008,7 +1034,8 @@ void CudfGroupby::computePartialGroupbyStreaming(CudfVectorPtr tbl) { groupingKeyOutputChannels_, intermediateAggregators_, bufferedResultType_, - partialOutputStream); + partialOutputStream, + get_output_mr()); bufferedResult_ = compactedOutput; } else { // First time processing, just store the result of the input batch's groupby @@ -1028,7 +1055,8 @@ void CudfGroupby::computeFinalGroupbyStreaming(CudfVectorPtr tbl) { groupingKeyOutputChannels_, intermediateAggregators_, bufferedResultType_, - inputTableStream); + inputTableStream, + get_output_mr()); if (!groupbyOnInput) { return; } @@ -1053,7 +1081,8 @@ void CudfGroupby::computeFinalGroupbyStreaming(CudfVectorPtr tbl) { groupingKeyOutputChannels_, intermediateAggregators_, bufferedResultType_, - finalStream); + finalStream, + get_output_mr()); bufferedResult_ = compactedOutput; } @@ -1066,7 +1095,8 @@ void CudfGroupby::computeSingleGroupbyStreaming(CudfVectorPtr tbl) { groupingKeyOutputChannels_, partialAggregators_, bufferedResultType_, - inputTableStream); + inputTableStream, + get_output_mr()); if (bufferedResult_) { auto partialOutputStream = bufferedResult_->stream(); @@ -1084,7 +1114,8 @@ void CudfGroupby::computeSingleGroupbyStreaming(CudfVectorPtr tbl) { groupingKeyOutputChannels_, intermediateAggregators_, bufferedResultType_, - partialOutputStream); + partialOutputStream, + get_output_mr()); bufferedResult_ = compactedOutput; } else { bufferedResult_ = groupbyOnInput; @@ -1122,7 +1153,8 @@ CudfVectorPtr CudfGroupby::doGroupByAggregation( std::vector const& groupByKeys, std::vector>& aggregators, TypePtr const& outputType, - rmm::cuda_stream_view stream) { + rmm::cuda_stream_view stream, + rmm::device_async_resource_ref mr) { auto groupbyKeyView = tableView.select(groupByKeys.begin(), groupByKeys.end()); @@ -1135,11 +1167,10 @@ CudfVectorPtr CudfGroupby::doGroupByAggregation( std::vector requests; for (auto& aggregator : aggregators) { - aggregator->addGroupbyRequest(tableView, requests, stream); + aggregator->addGroupbyRequest(tableView, requests, stream, mr); } - auto [groupKeys, results] = - groupByOwner.aggregate(requests, stream, get_output_mr()); + auto [groupKeys, results] = groupByOwner.aggregate(requests, stream, mr); // flatten the results std::vector> resultColumns; @@ -1152,7 +1183,7 @@ CudfVectorPtr CudfGroupby::doGroupByAggregation( // then fill the aggregation results for (auto& aggregator : aggregators) { - resultColumns.push_back(aggregator->makeOutputColumn(results, stream)); + resultColumns.push_back(aggregator->makeOutputColumn(results, stream, mr)); } // make a cudf table out of columns @@ -1236,7 +1267,8 @@ RowVectorPtr CudfGroupby::doGetOutput() { groupingKeyOutputChannels_, aggs, outputType_, - stream); + stream, + get_output_mr()); stream.synchronize(); bufferedResult_.reset(); return result; @@ -1247,9 +1279,10 @@ RowVectorPtr CudfGroupby::doGetOutput() { } auto stream = cudfGlobalStreamPool().get_stream(); + auto const outputMr = get_output_mr(); auto tbl = getConcatenatedTable( - std::exchange(inputs_, {}), inputType_, stream, get_output_mr()); + std::exchange(inputs_, {}), inputType_, stream, outputMr); // Release input data after synchronizing. stream.synchronize(); @@ -1268,7 +1301,8 @@ RowVectorPtr CudfGroupby::doGetOutput() { groupingKeyOutputChannels_, aggregators_, outputType_, - stream); + stream, + outputMr); } void CudfGroupby::doNoMoreInput() { diff --git a/velox/experimental/cudf/exec/CudfGroupby.h b/velox/experimental/cudf/exec/CudfGroupby.h index 4c3f0acbb0a..a958d17c321 100644 --- a/velox/experimental/cudf/exec/CudfGroupby.h +++ b/velox/experimental/cudf/exec/CudfGroupby.h @@ -20,6 +20,8 @@ #include +#include + namespace facebook::velox::cudf_velox { struct GroupbyAggregator { @@ -31,11 +33,13 @@ struct GroupbyAggregator { virtual void addGroupbyRequest( cudf::table_view const& tbl, std::vector& requests, - rmm::cuda_stream_view stream) = 0; + rmm::cuda_stream_view stream, + rmm::device_async_resource_ref mr) = 0; virtual std::unique_ptr makeOutputColumn( std::vector& results, - rmm::cuda_stream_view stream) = 0; + rmm::cuda_stream_view stream, + rmm::device_async_resource_ref mr) = 0; virtual ~GroupbyAggregator() = default; @@ -101,7 +105,8 @@ class CudfGroupby : public CudfOperatorBase { std::vector const& groupByKeys, std::vector>& aggregators, TypePtr const& outputType, - rmm::cuda_stream_view stream); + rmm::cuda_stream_view stream, + rmm::device_async_resource_ref mr); CudfVectorPtr releaseAndResetBufferedResult(); diff --git a/velox/experimental/cudf/exec/CudfReduce.cpp b/velox/experimental/cudf/exec/CudfReduce.cpp index c2a7a884115..79fd1cfdd66 100644 --- a/velox/experimental/cudf/exec/CudfReduce.cpp +++ b/velox/experimental/cudf/exec/CudfReduce.cpp @@ -64,6 +64,7 @@ using facebook::velox::cudf_velox::validateIntermediateColumnType; cudf::table_view const& input, \ TypePtr const& outputType, \ rmm::cuda_stream_view stream, \ + rmm::device_async_resource_ref mr, \ vector_size_t /*inputRowCount*/) override { \ auto const aggRequest = \ cudf::make_##name##_aggregation(); \ @@ -74,8 +75,7 @@ using facebook::velox::cudf_velox::validateIntermediateColumnType; cudfOutputType, \ stream, \ get_temp_mr()); \ - return cudf::make_column_from_scalar( \ - *resultScalar, 1, stream, get_output_mr()); \ + return cudf::make_column_from_scalar(*resultScalar, 1, stream, mr); \ } \ }; @@ -96,6 +96,7 @@ struct ReduceCountAggregator : ReduceAggregator { cudf::table_view const& input, TypePtr const& outputType, rmm::cuda_stream_view stream, + rmm::device_async_resource_ref mr, vector_size_t inputRowCount) override { if (exec::isRawInput(step)) { int64_t count; @@ -122,8 +123,7 @@ struct ReduceCountAggregator : ReduceAggregator { auto resultScalar = cudf::numeric_scalar(count, true, stream, get_temp_mr()); - return cudf::make_column_from_scalar( - resultScalar, 1, stream, get_output_mr()); + return cudf::make_column_from_scalar(resultScalar, 1, stream, mr); } else { // For non-raw input (intermediate/final), use sum aggregation auto const aggRequest = @@ -136,8 +136,7 @@ struct ReduceCountAggregator : ReduceAggregator { stream, get_temp_mr()); resultScalar->set_valid_async(true, stream); - return cudf::make_column_from_scalar( - *resultScalar, 1, stream, get_output_mr()); + return cudf::make_column_from_scalar(*resultScalar, 1, stream, mr); } } @@ -157,6 +156,7 @@ struct ReduceMeanAggregator : ReduceAggregator { cudf::table_view const& input, TypePtr const& outputType, rmm::cuda_stream_view stream, + rmm::device_async_resource_ref mr, vector_size_t /*inputRowCount*/) override { switch (step) { case core::AggregationNode::Step::kSingle: { @@ -169,8 +169,7 @@ struct ReduceMeanAggregator : ReduceAggregator { cudfOutputType, stream, get_temp_mr()); - return cudf::make_column_from_scalar( - *resultScalar, 1, stream, get_output_mr()); + return cudf::make_column_from_scalar(*resultScalar, 1, stream, mr); } case core::AggregationNode::Step::kPartial: { VELOX_CHECK(outputType->isRow()); @@ -189,8 +188,8 @@ struct ReduceMeanAggregator : ReduceAggregator { cudfSumType, stream, get_temp_mr()); - auto sumCol = cudf::make_column_from_scalar( - *sumResultScalar, 1, stream, get_output_mr()); + auto sumCol = + cudf::make_column_from_scalar(*sumResultScalar, 1, stream, mr); // libcudf doesn't have a count agg for reduce. What we want is to // count the number of valid rows. @@ -203,7 +202,7 @@ struct ReduceMeanAggregator : ReduceAggregator { get_temp_mr()), 1, stream, - get_output_mr()); + mr); // Assemble into struct as expected by velox. auto children = std::vector>(); @@ -227,8 +226,8 @@ struct ReduceMeanAggregator : ReduceAggregator { cudf::make_sum_aggregation(); auto const sumResultScalar = cudf::reduce( sumCol, *sumAggRequest, sumCol.type(), stream, get_temp_mr()); - auto sumResultCol = cudf::make_column_from_scalar( - *sumResultScalar, 1, stream, get_output_mr()); + auto sumResultCol = + cudf::make_column_from_scalar(*sumResultScalar, 1, stream, mr); // sum the counts auto const countAggRequest = @@ -244,7 +243,7 @@ struct ReduceMeanAggregator : ReduceAggregator { cudf::binary_operator::DIV, cudfOutputType, stream, - get_output_mr()); + mr); } default: VELOX_NYI("Unsupported aggregation step for mean"); @@ -254,7 +253,8 @@ struct ReduceMeanAggregator : ReduceAggregator { std::unique_ptr partialDecimalSumCountToSerializedString( cudf::column_view inputCol, - rmm::cuda_stream_view stream) { + rmm::cuda_stream_view stream, + rmm::device_async_resource_ref mr) { auto const sumAgg = cudf::make_sum_aggregation(); auto sumScalar = cudf::reduce(inputCol, *sumAgg, inputCol.type(), stream, get_temp_mr()); @@ -266,21 +266,20 @@ std::unique_ptr partialDecimalSumCountToSerializedString( cudf::data_type{cudf::type_id::INT64}, stream, get_temp_mr()); - auto sumCol = - cudf::make_column_from_scalar(*sumScalar, 1, stream, get_output_mr()); - auto countCol = - cudf::make_column_from_scalar(*countScalar, 1, stream, get_output_mr()); + auto sumCol = cudf::make_column_from_scalar(*sumScalar, 1, stream, mr); + auto countCol = cudf::make_column_from_scalar(*countScalar, 1, stream, mr); return serializeDecimalPartialOrIntermediateState( - std::move(sumCol), std::move(countCol), stream); + std::move(sumCol), std::move(countCol), stream, mr); } std::unique_ptr intermediateDecimalMergeSerializedString( cudf::column_view inputCol, int32_t scale, - rmm::cuda_stream_view stream) { + rmm::cuda_stream_view stream, + rmm::device_async_resource_ref mr) { auto const sumAgg = cudf::make_sum_aggregation(); - auto sumAndCount = cudf_velox::deserializeDecimalSumState( - inputCol, scale, stream, get_output_mr()); + auto sumAndCount = + cudf_velox::deserializeDecimalSumState(inputCol, scale, stream, mr); auto sumScalar = cudf::reduce( sumAndCount.sum->view(), *sumAgg, @@ -293,22 +292,21 @@ std::unique_ptr intermediateDecimalMergeSerializedString( cudf::data_type{cudf::type_id::INT64}, stream, get_temp_mr()); - auto sumCol = - cudf::make_column_from_scalar(*sumScalar, 1, stream, get_output_mr()); - auto countCol = - cudf::make_column_from_scalar(*countScalar, 1, stream, get_output_mr()); + auto sumCol = cudf::make_column_from_scalar(*sumScalar, 1, stream, mr); + auto countCol = cudf::make_column_from_scalar(*countScalar, 1, stream, mr); return serializeDecimalPartialOrIntermediateState( - std::move(sumCol), std::move(countCol), stream); + std::move(sumCol), std::move(countCol), stream, mr); } std::unique_ptr finalDecimalAvgFromSerializedString( cudf::column_view inputCol, int32_t scale, TypePtr const& resultType, - rmm::cuda_stream_view stream) { + rmm::cuda_stream_view stream, + rmm::device_async_resource_ref mr) { auto const sumAgg = cudf::make_sum_aggregation(); - auto sumAndCount = cudf_velox::deserializeDecimalSumState( - inputCol, scale, stream, get_output_mr()); + auto sumAndCount = + cudf_velox::deserializeDecimalSumState(inputCol, scale, stream, mr); auto sumScalar = cudf::reduce( sumAndCount.sum->view(), *sumAgg, @@ -321,18 +319,17 @@ std::unique_ptr finalDecimalAvgFromSerializedString( cudf::data_type{cudf::type_id::INT64}, stream, get_temp_mr()); - auto sumCol = - cudf::make_column_from_scalar(*sumScalar, 1, stream, get_output_mr()); - auto countCol = - cudf::make_column_from_scalar(*countScalar, 1, stream, get_output_mr()); + auto sumCol = cudf::make_column_from_scalar(*sumScalar, 1, stream, mr); + auto countCol = cudf::make_column_from_scalar(*countScalar, 1, stream, mr); return finalizeDecimalAverage( - std::move(sumCol), std::move(countCol), resultType, stream); + std::move(sumCol), std::move(countCol), resultType, stream, mr); } std::unique_ptr singleDecimalAvgFromRawColumn( cudf::column_view inputCol, TypePtr const& resultType, - rmm::cuda_stream_view stream) { + rmm::cuda_stream_view stream, + rmm::device_async_resource_ref mr) { auto const sumAgg = cudf::make_sum_aggregation(); auto sumScalar = cudf::reduce(inputCol, *sumAgg, inputCol.type(), stream, get_temp_mr()); @@ -344,63 +341,64 @@ std::unique_ptr singleDecimalAvgFromRawColumn( cudf::data_type{cudf::type_id::INT64}, stream, get_temp_mr()); - auto sumCol = - cudf::make_column_from_scalar(*sumScalar, 1, stream, get_output_mr()); - auto countCol = - cudf::make_column_from_scalar(*countScalar, 1, stream, get_output_mr()); + auto sumCol = cudf::make_column_from_scalar(*sumScalar, 1, stream, mr); + auto countCol = cudf::make_column_from_scalar(*countScalar, 1, stream, mr); return finalizeDecimalAverage( - std::move(sumCol), std::move(countCol), resultType, stream); + std::move(sumCol), std::move(countCol), resultType, stream, mr); } std::unique_ptr singleOrRawDecimalSumWithCast( cudf::column_view inputCol, TypePtr const& outputType, - rmm::cuda_stream_view stream) { + rmm::cuda_stream_view stream, + rmm::device_async_resource_ref mr) { auto const sumAgg = cudf::make_sum_aggregation(); auto const cudfOutType = cudf_velox::veloxToCudfDataType(outputType); std::unique_ptr castedInput; if (outputType->isDecimal() && inputCol.type() != cudfOutType) { - castedInput = cudf::cast(inputCol, cudfOutType, stream, get_output_mr()); + castedInput = cudf::cast(inputCol, cudfOutType, stream, mr); inputCol = castedInput->view(); } auto const resultScalar = cudf::reduce(inputCol, *sumAgg, cudfOutType, stream, get_temp_mr()); - return cudf::make_column_from_scalar( - *resultScalar, 1, stream, get_output_mr()); + return cudf::make_column_from_scalar(*resultScalar, 1, stream, mr); } std::unique_ptr reduceIntermediateDecimalFromSerializedColumn( cudf::column_view inputCol, TypePtr const& outputType, - rmm::cuda_stream_view stream) { + rmm::cuda_stream_view stream, + rmm::device_async_resource_ref mr) { validateIntermediateColumnType(inputCol); // outputType here could be DECIMAL or VARBINARY auto scale = outputType->isDecimal() ? getDecimalPrecisionScale(*outputType).second : 0; - return intermediateDecimalMergeSerializedString(inputCol, scale, stream); + return intermediateDecimalMergeSerializedString(inputCol, scale, stream, mr); } std::unique_ptr reduceFinalDecimalSumFromSerializedColumn( cudf::column_view inputCol, TypePtr const& outputType, - rmm::cuda_stream_view stream) { + rmm::cuda_stream_view stream, + rmm::device_async_resource_ref mr) { validateIntermediateColumnType(inputCol); auto scale = getDecimalPrecisionScale(*outputType).second; - auto sumAndCount = cudf_velox::deserializeDecimalSumState( - inputCol, scale, stream, get_output_mr()); + auto sumAndCount = + cudf_velox::deserializeDecimalSumState(inputCol, scale, stream, mr); return singleOrRawDecimalSumWithCast( - sumAndCount.sum->view(), outputType, stream); + sumAndCount.sum->view(), outputType, stream, mr); } std::unique_ptr reduceFinalDecimalAvgFromSerializedColumn( cudf::column_view inputCol, TypePtr const& outputType, - rmm::cuda_stream_view stream) { + rmm::cuda_stream_view stream, + rmm::device_async_resource_ref mr) { validateIntermediateColumnType(inputCol); auto scale = getDecimalPrecisionScale(*outputType).second; return finalDecimalAvgFromSerializedString( - inputCol, scale, outputType, stream); + inputCol, scale, outputType, stream, mr); } struct ReduceDecimalSumAggregator : ReduceAggregator { @@ -415,19 +413,20 @@ struct ReduceDecimalSumAggregator : ReduceAggregator { cudf::table_view const& input, TypePtr const& outputType, rmm::cuda_stream_view stream, + rmm::device_async_resource_ref mr, vector_size_t /*inputRowCount*/) override { cudf::column_view inputCol = input.column(inputIndex); switch (step) { case core::AggregationNode::Step::kSingle: - return singleOrRawDecimalSumWithCast(inputCol, outputType, stream); + return singleOrRawDecimalSumWithCast(inputCol, outputType, stream, mr); case core::AggregationNode::Step::kPartial: - return partialDecimalSumCountToSerializedString(inputCol, stream); + return partialDecimalSumCountToSerializedString(inputCol, stream, mr); case core::AggregationNode::Step::kIntermediate: return reduceIntermediateDecimalFromSerializedColumn( - inputCol, outputType, stream); + inputCol, outputType, stream, mr); case core::AggregationNode::Step::kFinal: return reduceFinalDecimalSumFromSerializedColumn( - inputCol, outputType, stream); + inputCol, outputType, stream, mr); default: VELOX_NYI("Unsupported aggregation step for decimal sum reduce"); } @@ -446,20 +445,21 @@ struct ReduceDecimalAvgAggregator : ReduceAggregator { cudf::table_view const& input, TypePtr const& outputType, rmm::cuda_stream_view stream, + rmm::device_async_resource_ref mr, vector_size_t /*inputRowCount*/) override { cudf::column_view inputCol = input.column(inputIndex); switch (step) { case core::AggregationNode::Step::kSingle: - return singleDecimalAvgFromRawColumn(inputCol, resultType, stream); + return singleDecimalAvgFromRawColumn(inputCol, resultType, stream, mr); case core::AggregationNode::Step::kPartial: - return partialDecimalSumCountToSerializedString(inputCol, stream); + return partialDecimalSumCountToSerializedString(inputCol, stream, mr); case core::AggregationNode::Step::kIntermediate: return reduceIntermediateDecimalFromSerializedColumn( - inputCol, outputType, stream); + inputCol, outputType, stream, mr); case core::AggregationNode::Step::kFinal: VELOX_CHECK(outputType == resultType, "outputType/resultType mismatch"); return reduceFinalDecimalAvgFromSerializedColumn( - inputCol, outputType, stream); + inputCol, outputType, stream, mr); default: VELOX_NYI("Unsupported aggregation step for decimal avg reduce"); } @@ -488,13 +488,14 @@ struct ApproxDistinctAggregator : ReduceAggregator { cudf::table_view const& input, TypePtr const& outputType, rmm::cuda_stream_view stream, + rmm::device_async_resource_ref mr, vector_size_t /*inputRowCount*/) override { if (exec::isRawInput(step)) { return doPartialReduce(input, stream); } else if (step == core::AggregationNode::Step::kIntermediate) { return doIntermediateReduce(input, stream); } else { - return doFinalReduce(input, stream); + return doFinalReduce(input, stream, mr); } } @@ -633,7 +634,8 @@ struct ApproxDistinctAggregator : ReduceAggregator { std::unique_ptr doFinalReduce( cudf::table_view const& input, - rmm::cuda_stream_view stream) { + rmm::cuda_stream_view stream, + rmm::device_async_resource_ref mr) { auto sketch_column = input.column(inputIndex); if (sketch_column.size() == 0) { @@ -641,19 +643,19 @@ struct ApproxDistinctAggregator : ReduceAggregator { cudf::numeric_scalar(0, true, stream, get_temp_mr()), 1, stream, - get_output_mr()); + mr); } return mergeSketchesAndApply( sketch_column, - [stream](cudf::approx_distinct_count& sketch) { + [stream, mr](cudf::approx_distinct_count& sketch) { std::size_t estimate = sketch.estimate(stream); return cudf::make_column_from_scalar( cudf::numeric_scalar( static_cast(estimate), true, stream, get_temp_mr()), 1, stream, - get_output_mr()); + mr); }, stream); } @@ -828,13 +830,14 @@ void CudfReduce::doAddInput(RowVectorPtr input) { CudfVectorPtr CudfReduce::doGlobalAggregation( cudf::table_view tableView, - rmm::cuda_stream_view stream) { + rmm::cuda_stream_view stream, + rmm::device_async_resource_ref mr) { std::vector> resultColumns; resultColumns.reserve(aggregators_.size()); for (auto i = 0; i < aggregators_.size(); i++) { resultColumns.push_back( aggregators_[i]->doReduce( - tableView, outputType_->childAt(i), stream, numInputRows_)); + tableView, outputType_->childAt(i), stream, mr, numInputRows_)); } return std::make_shared( @@ -861,9 +864,10 @@ RowVectorPtr CudfReduce::doGetOutput() { } auto stream = cudfGlobalStreamPool().get_stream(); + auto const outputMr = get_output_mr(); - auto tbl = getConcatenatedTable( - std::move(inputs_), inputType_, stream, get_output_mr()); + auto tbl = + getConcatenatedTable(std::move(inputs_), inputType_, stream, outputMr); // Release input data after synchronizing. stream.synchronize(); @@ -879,7 +883,7 @@ RowVectorPtr CudfReduce::doGetOutput() { ? tbl->view() : tbl->view().select( aggregationInputChannels_.begin(), aggregationInputChannels_.end()); - auto output = doGlobalAggregation(tableView, stream); + auto output = doGlobalAggregation(tableView, stream, outputMr); if (isPartialOutput_ && !noMoreInput_) { numInputRows_ = 0; } diff --git a/velox/experimental/cudf/exec/CudfReduce.h b/velox/experimental/cudf/exec/CudfReduce.h index 117adc42f71..03ee4d302a7 100644 --- a/velox/experimental/cudf/exec/CudfReduce.h +++ b/velox/experimental/cudf/exec/CudfReduce.h @@ -18,6 +18,8 @@ #include "velox/experimental/cudf/exec/CudfAggregation.h" #include "velox/experimental/cudf/exec/CudfOperator.h" +#include + namespace facebook::velox::cudf_velox { struct ReduceAggregator { @@ -30,6 +32,7 @@ struct ReduceAggregator { cudf::table_view const& input, TypePtr const& outputType, rmm::cuda_stream_view stream, + rmm::device_async_resource_ref mr, vector_size_t inputRowCount) = 0; virtual ~ReduceAggregator() = default; @@ -91,7 +94,8 @@ class CudfReduce : public CudfOperatorBase { private: CudfVectorPtr doGlobalAggregation( cudf::table_view tableView, - rmm::cuda_stream_view stream); + rmm::cuda_stream_view stream, + rmm::device_async_resource_ref mr); std::shared_ptr aggregationNode_; std::vector> aggregators_; diff --git a/velox/experimental/cudf/exec/DecimalAggregationCommon.cpp b/velox/experimental/cudf/exec/DecimalAggregationCommon.cpp index adc1e6b4aea..f36f2e33360 100644 --- a/velox/experimental/cudf/exec/DecimalAggregationCommon.cpp +++ b/velox/experimental/cudf/exec/DecimalAggregationCommon.cpp @@ -16,7 +16,6 @@ #include "velox/experimental/cudf/exec/DecimalAggregationCommon.h" #include "velox/experimental/cudf/exec/DecimalAggregationKernels.h" -#include "velox/experimental/cudf/exec/GpuResources.h" #include "velox/experimental/cudf/exec/VeloxCudfInterop.h" #include "velox/common/base/Exceptions.h" @@ -37,10 +36,11 @@ void validateIntermediateColumnType(cudf::column_view const& column) { std::unique_ptr castCountColumnToInt64( std::unique_ptr count, - rmm::cuda_stream_view stream) { + rmm::cuda_stream_view stream, + rmm::device_async_resource_ref mr) { if (count->type().id() != cudf::type_id::INT64) { - count = cudf::cast( - *count, cudf::data_type{cudf::type_id::INT64}, stream, get_output_mr()); + count = + cudf::cast(*count, cudf::data_type{cudf::type_id::INT64}, stream, mr); } return count; } @@ -48,23 +48,23 @@ std::unique_ptr castCountColumnToInt64( std::unique_ptr serializeDecimalPartialOrIntermediateState( std::unique_ptr sum, std::unique_ptr count, - rmm::cuda_stream_view stream) { - count = castCountColumnToInt64(std::move(count), stream); - return serializeDecimalSumState( - sum->view(), count->view(), stream, get_output_mr()); + rmm::cuda_stream_view stream, + rmm::device_async_resource_ref mr) { + count = castCountColumnToInt64(std::move(count), stream, mr); + return serializeDecimalSumState(sum->view(), count->view(), stream, mr); } std::unique_ptr finalizeDecimalAverage( std::unique_ptr sum, std::unique_ptr count, const TypePtr& resultType, - rmm::cuda_stream_view stream) { - count = castCountColumnToInt64(std::move(count), stream); - auto avgCol = computeDecimalAverage( - sum->view(), count->view(), stream, get_output_mr()); + rmm::cuda_stream_view stream, + rmm::device_async_resource_ref mr) { + count = castCountColumnToInt64(std::move(count), stream, mr); + auto avgCol = computeDecimalAverage(sum->view(), count->view(), stream, mr); auto const cudfOutType = veloxToCudfDataType(resultType); if (avgCol->type() != cudfOutType) { - avgCol = cudf::cast(avgCol->view(), cudfOutType, stream, get_output_mr()); + avgCol = cudf::cast(avgCol->view(), cudfOutType, stream, mr); } return avgCol; } diff --git a/velox/experimental/cudf/exec/DecimalAggregationCommon.h b/velox/experimental/cudf/exec/DecimalAggregationCommon.h index 20dec2e6449..ba6b57ad381 100644 --- a/velox/experimental/cudf/exec/DecimalAggregationCommon.h +++ b/velox/experimental/cudf/exec/DecimalAggregationCommon.h @@ -21,6 +21,7 @@ #include #include +#include #include @@ -31,11 +32,12 @@ namespace facebook::velox::cudf_velox { // sum/count payloads (see serializeDecimalSumState). void validateIntermediateColumnType(cudf::column_view const& column); -// Ensures the partial-row count column is INT64, casting with the default GPU -// output memory resource when the incoming type differs. +// Ensures the partial-row count column is INT64, casting when the incoming type +// differs. std::unique_ptr castCountColumnToInt64( std::unique_ptr count, - rmm::cuda_stream_view stream); + rmm::cuda_stream_view stream, + rmm::device_async_resource_ref mr); // Normalizes the count column to INT64, then encodes sum and count into a // single STRING column of fixed-width per-row payloads (delegates to @@ -44,7 +46,8 @@ std::unique_ptr castCountColumnToInt64( std::unique_ptr serializeDecimalPartialOrIntermediateState( std::unique_ptr sum, std::unique_ptr count, - rmm::cuda_stream_view stream); + rmm::cuda_stream_view stream, + rmm::device_async_resource_ref mr); // Normalizes the count column to INT64, computes a per-row decimal average // from intermediate sum/count (delegates to computeDecimalAverage), then casts @@ -54,6 +57,7 @@ std::unique_ptr finalizeDecimalAverage( std::unique_ptr sum, std::unique_ptr count, const TypePtr& resultType, - rmm::cuda_stream_view stream); + rmm::cuda_stream_view stream, + rmm::device_async_resource_ref mr); } // namespace facebook::velox::cudf_velox diff --git a/velox/experimental/cudf/exec/DecimalAggregationKernels.cpp b/velox/experimental/cudf/exec/DecimalAggregationKernels.cpp index f1f131125af..a0f85d2aeae 100644 --- a/velox/experimental/cudf/exec/DecimalAggregationKernels.cpp +++ b/velox/experimental/cudf/exec/DecimalAggregationKernels.cpp @@ -44,12 +44,14 @@ DecimalSumStateColumns deserializeDecimalSumState( cudf::data_type{cudf::type_id::DECIMAL128, -scale}, 0, cudf::mask_state::UNALLOCATED, - stream); + stream, + mr); empty.count = cudf::make_fixed_width_column( cudf::data_type{cudf::type_id::INT64}, 0, cudf::mask_state::UNALLOCATED, - stream); + stream, + mr); return empty; } @@ -61,12 +63,14 @@ DecimalSumStateColumns deserializeDecimalSumState( cudf::data_type{cudf::type_id::DECIMAL128, -scale}, numRows, cudf::mask_state::ALL_NULL, - stream); + stream, + mr); allNull.count = cudf::make_fixed_width_column( cudf::data_type{cudf::type_id::INT64}, numRows, cudf::mask_state::ALL_NULL, - stream); + stream, + mr); return allNull; } @@ -81,12 +85,14 @@ DecimalSumStateColumns deserializeDecimalSumState( cudf::data_type{cudf::type_id::DECIMAL128, -scale}, numRows, cudf::mask_state::UNALLOCATED, - stream); + stream, + mr); auto countCol = cudf::make_fixed_width_column( cudf::data_type{cudf::type_id::INT64}, numRows, cudf::mask_state::UNALLOCATED, - stream); + stream, + mr); auto sumView = sumCol->mutable_view(); auto countView = countCol->mutable_view(); @@ -165,11 +171,12 @@ std::unique_ptr serializeDecimalSumState( cudf::data_type{offsetsType}, numRows + 1, cudf::mask_state::UNALLOCATED, - stream); + stream, + mr); auto offsetsView = offsetsCol->mutable_view(); rmm::device_buffer charsBuf( - static_cast(numRows) * detail::kDecimalSumStateSize, stream); + static_cast(numRows) * detail::kDecimalSumStateSize, stream, mr); detail::fillOffsetsForDecimalSumState( useLargeOffsets, @@ -236,7 +243,7 @@ std::unique_ptr computeDecimalAverage( auto numRows = sumCol.size(); auto out = cudf::make_fixed_width_column( - sumCol.type(), numRows, cudf::mask_state::UNALLOCATED, stream); + sumCol.type(), numRows, cudf::mask_state::UNALLOCATED, stream, mr); if (numRows > 0) { auto const rowCount = static_cast(numRows);