
Use cuco::static_set in the hash-based groupby #14813

Merged
merged 27 commits into branch-24.04 from cuco-set-groupby on Feb 29, 2024
27 commits
11b8126
Rewrite hash groupby with hash set
PointKernel Jan 19, 2024
166ed49
Formatting
PointKernel Jan 19, 2024
07313fe
Merge remote-tracking branch 'upstream/branch-24.04' into cuco-set-gr…
PointKernel Jan 22, 2024
b1db243
Minor cleanups
PointKernel Jan 23, 2024
ed8502d
Merge remote-tracking branch 'upstream/branch-24.04' into cuco-set-gr…
PointKernel Feb 16, 2024
ca6829d
Update cuco code
PointKernel Feb 16, 2024
0c10a0b
Add CUCO_CUDF_SIZE_TYPE_SENTINEL
PointKernel Feb 16, 2024
2470c68
Header cleanups
PointKernel Feb 16, 2024
7da8c55
Update docs
PointKernel Feb 16, 2024
7dd59a6
Minor doc updates
PointKernel Feb 16, 2024
3cbdb7c
Add peak memory usage metrics to groupby NV benchmarks
PointKernel Feb 16, 2024
82aa0ce
Revert some benchmark changes
PointKernel Feb 16, 2024
230928a
Merge remote-tracking branch 'upstream/branch-24.04' into cuco-set-gr…
PointKernel Feb 20, 2024
2259455
Merge remote-tracking branch 'upstream/branch-24.04' into cuco-set-gr…
PointKernel Feb 21, 2024
4193c75
Fix pytests
PointKernel Feb 22, 2024
9645865
Merge remote-tracking branch 'upstream/branch-24.04' into cuco-set-gr…
PointKernel Feb 22, 2024
574f628
Renaming
PointKernel Feb 22, 2024
75a8e64
Fix several docstring tests
PointKernel Feb 23, 2024
85a47db
Make value_counts docstring test deterministic
PointKernel Feb 23, 2024
241aca0
Merge remote-tracking branch 'upstream/branch-24.04' into cuco-set-gr…
PointKernel Feb 23, 2024
dbd9e6b
Merge branch 'branch-24.04' into cuco-set-groupby
PointKernel Feb 27, 2024
0af1d13
Merge branch 'branch-24.04' into cuco-set-groupby
PointKernel Feb 28, 2024
f79f1d6
Update docs
PointKernel Feb 29, 2024
8263b4f
Merge branch 'branch-24.04' into cuco-set-groupby
PointKernel Feb 29, 2024
56a2229
Add TODO reminder for future performance tuning
PointKernel Feb 29, 2024
8bade44
Merge remote-tracking branch 'origin/cuco-set-groupby' into cuco-set-…
PointKernel Feb 29, 2024
6e54cd9
Merge remote-tracking branch 'upstream/branch-24.04' into cuco-set-gr…
PointKernel Feb 29, 2024
7 changes: 6 additions & 1 deletion cpp/benchmarks/groupby/group_max.cpp
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2022-2023, NVIDIA CORPORATION.
* Copyright (c) 2022-2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -15,6 +15,7 @@
*/

#include <benchmarks/common/generate_input.hpp>
#include <benchmarks/fixture/benchmark_fixture.hpp>

#include <cudf/groupby.hpp>

@@ -50,9 +51,13 @@ void bench_groupby_max(nvbench::state& state, nvbench::type_list<Type>)
requests[0].values = vals->view();
requests[0].aggregations.push_back(cudf::make_max_aggregation<cudf::groupby_aggregation>());

auto const mem_stats_logger = cudf::memory_stats_logger();
state.set_cuda_stream(nvbench::make_cuda_stream_view(cudf::get_default_stream().value()));
state.exec(nvbench::exec_tag::sync,
[&](nvbench::launch& launch) { auto const result = gb_obj.aggregate(requests); });

state.add_buffer_size(
mem_stats_logger.peak_memory_usage(), "peak_memory_usage", "peak_memory_usage");
}

NVBENCH_BENCH_TYPES(bench_groupby_max,
9 changes: 7 additions & 2 deletions cpp/benchmarks/groupby/group_struct_keys.cpp
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2022-2023, NVIDIA CORPORATION.
* Copyright (c) 2022-2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -15,6 +15,7 @@
*/

#include <benchmarks/common/generate_input.hpp>
#include <benchmarks/fixture/benchmark_fixture.hpp>

#include <cudf_test/column_wrapper.hpp>

@@ -80,11 +81,15 @@ void bench_groupby_struct_keys(nvbench::state& state)
requests[0].aggregations.push_back(cudf::make_min_aggregation<cudf::groupby_aggregation>());

// Set up nvbench default stream
auto stream = cudf::get_default_stream();
auto const mem_stats_logger = cudf::memory_stats_logger();
auto stream = cudf::get_default_stream();
state.set_cuda_stream(nvbench::make_cuda_stream_view(stream.value()));

state.exec(nvbench::exec_tag::sync,
[&](nvbench::launch& launch) { auto const result = gb_obj.aggregate(requests); });

state.add_buffer_size(
mem_stats_logger.peak_memory_usage(), "peak_memory_usage", "peak_memory_usage");
}

NVBENCH_BENCH(bench_groupby_struct_keys)
5 changes: 5 additions & 0 deletions cpp/include/cudf/detail/cuco_helpers.hpp
@@ -16,11 +16,16 @@

#pragma once

#include <cudf/types.hpp>

#include <rmm/cuda_stream_view.hpp>
#include <rmm/mr/device/polymorphic_allocator.hpp>

namespace cudf::detail {

/// Sentinel value for `cudf::size_type`
static cudf::size_type constexpr CUDF_SIZE_TYPE_SENTINEL = -1;

/// Default load factor for cuco data structures
static double constexpr CUCO_DESIRED_LOAD_FACTOR = 0.5;

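
For orientation, here is a minimal sketch (not part of the diff) of how these two helpers are consumed when building a cuco container. It mirrors the cuco::static_set constructor call in this PR's groupby.cu changes below; num_keys, d_key_equal, d_row_hash, probing_scheme_type, and stream are stand-ins for definitions that live in cpp/src/groupby/hash/groupby.cu, and note the diff itself passes the literal 0.5 rather than the named constant:

#include <cudf/detail/cuco_helpers.hpp>

#include <cuco/static_set.cuh>

// Sketch only: names below stand in for definitions in groupby.cu.
auto const set = cuco::static_set{
  num_keys,                                                // expected number of keys
  cudf::detail::CUCO_DESIRED_LOAD_FACTOR,                  // target occupancy (0.5)
  cuco::empty_key{cudf::detail::CUDF_SIZE_TYPE_SENTINEL},  // -1 marks an empty slot;
                                                           // valid row indices are >= 0
  d_key_equal,                                             // device row comparator
  probing_scheme_type{d_row_hash},                         // linear probing with row hasher
  cuco::thread_scope_device,
  cuco::storage<1>{},                                      // one slot per storage window
  cudf::detail::cuco_allocator{stream},
  stream.value()};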
121 changes: 52 additions & 69 deletions cpp/src/groupby/hash/groupby.cu
@@ -22,23 +22,19 @@
#include <cudf/column/column.hpp>
#include <cudf/column/column_factories.hpp>
#include <cudf/column/column_view.hpp>
#include <cudf/copying.hpp>
#include <cudf/detail/aggregation/aggregation.cuh>
#include <cudf/detail/aggregation/aggregation.hpp>
#include <cudf/detail/aggregation/result_cache.hpp>
#include <cudf/detail/binaryop.hpp>
#include <cudf/detail/cuco_helpers.hpp>
#include <cudf/detail/gather.hpp>
#include <cudf/detail/groupby.hpp>
#include <cudf/detail/null_mask.hpp>
#include <cudf/detail/replace.hpp>
#include <cudf/detail/unary.hpp>
#include <cudf/detail/utilities/algorithm.cuh>
#include <cudf/detail/utilities/cuda.cuh>
#include <cudf/detail/utilities/vector_factories.hpp>
#include <cudf/dictionary/dictionary_column_view.hpp>
#include <cudf/groupby.hpp>
#include <cudf/hashing/detail/default_hash.cuh>
#include <cudf/scalar/scalar.hpp>
#include <cudf/table/experimental/row_operators.cuh>
#include <cudf/table/table.hpp>
#include <cudf/table/table_device_view.cuh>
@@ -49,12 +45,9 @@

#include <rmm/cuda_stream_view.hpp>

#include <cuda/functional>
#include <cuda/std/atomic>
#include <thrust/copy.h>
#include <cuco/static_set.cuh>
#include <thrust/for_each.h>
#include <thrust/iterator/counting_iterator.h>
#include <thrust/iterator/transform_iterator.h>

#include <memory>
#include <unordered_set>
@@ -66,15 +59,10 @@ namespace detail {
namespace hash {
namespace {

// TODO: replace it with `cuco::static_map`
// https://github.com/rapidsai/cudf/issues/10401
template <typename ComparatorType>
using map_type = concurrent_unordered_map<
cudf::size_type,
cudf::size_type,
using probing_scheme_type = cuco::linear_probing<
1, ///< Number of threads used to handle each input key
Comment on lines +64 to +65

bdice (Contributor):

@PointKernel You mentioned:

> Based on our past tuning experience (e.g., cudf/cpp/src/search/contains_table.cu), using a larger CG size (like 2 or 4) for nested types can bring significant speedups compared to the current CGSize == 1 for all types.

Do we need to adopt that here? Do we need an issue or TODO describing that optimization?

PointKernel (Member, Author):

> Do we need to adopt that here?

Probably not. That requires nontrivial changes to the current code, and the benefit is unclear: no users have actually complained about groupby performance with nested types. I'm inclined not to look into it until we have the bandwidth or someone raises an issue about it. Does that sound good to you?

bdice (Contributor), Feb 29, 2024:

I'd like some kind of note in the code or an issue to make sure that we are aware of this optimization potential for the future. Otherwise, no action needed in terms of implementation.

PointKernel (Member, Author):

Makes sense. Done in 56a2229.

cudf::experimental::row::hash::device_row_hasher<cudf::hashing::detail::default_hash,
cudf::nullate::DYNAMIC>,
ComparatorType>;
cudf::nullate::DYNAMIC>>;

/**
* @brief List of aggregation operations that can be computed with a hash-based
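
To make the tuning discussed in the review thread above concrete, here is a minimal sketch, assuming only that the CG size is the first template argument of cuco::linear_probing (as in the diff above), of what a nested-type-tuned probing scheme might look like. The alias name is hypothetical and this is not part of the PR:

// Hypothetical sketch (not in this PR): a probing scheme tuned for nested-type
// keys, where 4 threads cooperate on each probe instead of 1. Past tuning in
// cpp/src/search/contains_table.cu found CG sizes of 2 or 4 faster for nested types.
using nested_probing_scheme_type = cuco::linear_probing<
  4,  ///< Number of threads used to handle each input key
  cudf::experimental::row::hash::device_row_hasher<cudf::hashing::detail::default_hash,
                                                   cudf::nullate::DYNAMIC>>;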
@@ -190,14 +178,14 @@
}
};

template <typename ComparatorType>
template <typename SetType>
class hash_compound_agg_finalizer final : public cudf::detail::aggregation_finalizer {
column_view col;
data_type result_type;
cudf::detail::result_cache* sparse_results;
cudf::detail::result_cache* dense_results;
device_span<size_type const> gather_map;
map_type<ComparatorType> const& map;
SetType set;
bitmask_type const* __restrict__ row_bitmask;
rmm::cuda_stream_view stream;
rmm::mr::device_memory_resource* mr;
@@ -209,15 +197,15 @@ class hash_compound_agg_finalizer final : public cudf::detail::aggregation_final
cudf::detail::result_cache* sparse_results,
cudf::detail::result_cache* dense_results,
device_span<size_type const> gather_map,
map_type<ComparatorType> const& map,
SetType set,
bitmask_type const* row_bitmask,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr)
: col(col),
sparse_results(sparse_results),
dense_results(dense_results),
gather_map(gather_map),
map(map),
set(set),
row_bitmask(row_bitmask),
stream(stream),
mr(mr)
@@ -340,8 +328,8 @@ class hash_compound_agg_finalizer final : public cudf::detail::aggregation_final
rmm::exec_policy(stream),
thrust::make_counting_iterator(0),
col.size(),
::cudf::detail::var_hash_functor<map_type<ComparatorType>>{
map, row_bitmask, *var_result_view, *values_view, *sum_view, *count_view, agg._ddof});
::cudf::detail::var_hash_functor{
set, row_bitmask, *var_result_view, *values_view, *sum_view, *count_view, agg._ddof});
sparse_results->add_result(col, agg, std::move(var_result));
dense_results->add_result(col, agg, to_dense_agg_result(agg));
}
@@ -398,13 +386,13 @@
*
* @see groupby_null_templated()
*/
template <typename ComparatorType>
template <typename SetType>
void sparse_to_dense_results(table_view const& keys,
host_span<aggregation_request const> requests,
cudf::detail::result_cache* sparse_results,
cudf::detail::result_cache* dense_results,
device_span<size_type const> gather_map,
map_type<ComparatorType> const& map,
SetType set,
bool keys_have_nulls,
null_policy include_null_keys,
rmm::cuda_stream_view stream,
@@ -423,7 +411,7 @@ void sparse_to_dense_results(table_view const& keys,
// Given an aggregation, this will get the result from sparse_results and
// convert and return dense, compacted result
auto finalizer = hash_compound_agg_finalizer(
col, sparse_results, dense_results, gather_map, map, row_bitmask_ptr, stream, mr);
col, sparse_results, dense_results, gather_map, set, row_bitmask_ptr, stream, mr);
for (auto&& agg : agg_v) {
agg->finalize(finalizer);
}
@@ -467,11 +455,11 @@
* @brief Computes all aggregations from `requests` that require a single pass
* over the data and stores the results in `sparse_results`
*/
template <typename ComparatorType>
template <typename SetType>
void compute_single_pass_aggs(table_view const& keys,
host_span<aggregation_request const> requests,
cudf::detail::result_cache* sparse_results,
map_type<ComparatorType>& map,
SetType set,
bool keys_have_nulls,
null_policy include_null_keys,
rmm::cuda_stream_view stream)
@@ -494,16 +482,16 @@
? cudf::detail::bitmask_and(keys, stream, rmm::mr::get_current_device_resource()).first
: rmm::device_buffer{};

thrust::for_each_n(rmm::exec_policy(stream),
thrust::make_counting_iterator(0),
keys.num_rows(),
hash::compute_single_pass_aggs_fn<map_type<ComparatorType>>{
map,
*d_values,
*d_sparse_table,
d_aggs.data(),
static_cast<bitmask_type*>(row_bitmask.data()),
skip_key_rows_with_nulls});
thrust::for_each_n(
rmm::exec_policy(stream),
thrust::make_counting_iterator(0),
keys.num_rows(),
hash::compute_single_pass_aggs_fn{set,
*d_values,
*d_sparse_table,
d_aggs.data(),
static_cast<bitmask_type*>(row_bitmask.data()),
skip_key_rows_with_nulls});
// Add results back to sparse_results cache
auto sparse_result_cols = sparse_table.release();
for (size_t i = 0; i < aggs.size(); i++) {
@@ -517,23 +505,15 @@
* @brief Computes and returns a device vector containing all populated keys in
* `map`.
*/
template <typename ComparatorType>
rmm::device_uvector<size_type> extract_populated_keys(map_type<ComparatorType> const& map,
template <typename SetType>
rmm::device_uvector<size_type> extract_populated_keys(SetType const& key_set,
size_type num_keys,
rmm::cuda_stream_view stream)
{
rmm::device_uvector<size_type> populated_keys(num_keys, stream);
auto const keys_end = key_set.retrieve_all(populated_keys.begin(), stream.value());

auto const get_key = cuda::proclaim_return_type<typename map_type<ComparatorType>::key_type>(
[] __device__(auto const& element) { return element.first; }); // first = key
auto const key_used = [unused = map.get_unused_key()] __device__(auto key) {
return key != unused;
};
auto const key_itr = thrust::make_transform_iterator(map.data(), get_key);
auto const end_it = cudf::detail::copy_if_safe(
key_itr, key_itr + map.capacity(), populated_keys.begin(), key_used, stream);

populated_keys.resize(std::distance(populated_keys.begin(), end_it), stream);
populated_keys.resize(std::distance(populated_keys.begin(), keys_end), stream);
return populated_keys;
}

@@ -580,38 +560,41 @@ std::unique_ptr<table> groupby(table_view const& keys,
auto const row_hash = cudf::experimental::row::hash::row_hasher{std::move(preprocessed_keys)};
auto const d_row_hash = row_hash.device_hasher(has_null);

size_type constexpr unused_key{std::numeric_limits<size_type>::max()};
size_type constexpr unused_value{std::numeric_limits<size_type>::max()};

// Cache of sparse results where the location of aggregate value in each
// column is indexed by the hash map
// column is indexed by the hash set
cudf::detail::result_cache sparse_results(requests.size());

auto const comparator_helper = [&](auto const d_key_equal) {
using allocator_type = typename map_type<decltype(d_key_equal)>::allocator_type;

auto const map = map_type<decltype(d_key_equal)>::create(compute_hash_table_size(num_keys),
stream,
unused_key,
unused_value,
d_row_hash,
d_key_equal,
allocator_type());
// Compute all single pass aggs first
compute_single_pass_aggs(
keys, requests, &sparse_results, *map, keys_have_nulls, include_null_keys, stream);
auto const set = cuco::static_set{num_keys,
0.5, // desired load factor
cuco::empty_key{cudf::detail::CUDF_SIZE_TYPE_SENTINEL},
d_key_equal,
probing_scheme_type{d_row_hash},
cuco::thread_scope_device,
cuco::storage<1>{},
cudf::detail::cuco_allocator{stream},
stream.value()};

// Extract the populated indices from the hash map and create a gather map.
// Compute all single pass aggs first
compute_single_pass_aggs(keys,
requests,
&sparse_results,
set.ref(cuco::insert_and_find),
keys_have_nulls,
include_null_keys,
stream);

// Extract the populated indices from the hash set and create a gather map.
// Gathering using this map from sparse results will give dense results.
auto gather_map = extract_populated_keys(*map, keys.num_rows(), stream);
auto gather_map = extract_populated_keys(set, keys.num_rows(), stream);

// Compact all results from sparse_results and insert into cache
sparse_to_dense_results(keys,
requests,
&sparse_results,
cache,
gather_map,
*map,
set.ref(cuco::find),
keys_have_nulls,
include_null_keys,
stream,
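
A closing note on the two set.ref(...) calls above: the set hands out lightweight, non-owning device references specialized for the operation a kernel needs (cuco::insert_and_find while building the set, cuco::find for read-only lookup in the finalizer path). Below is a hedged sketch of how such a ref is typically consumed inside a device functor; the function and parameter names are hypothetical (the PR's actual consumers are compute_single_pass_aggs_fn and var_hash_functor), and it assumes insert_and_find returns a (slot iterator, inserted) pair as in cuco's ref API:

// Hypothetical sketch of device-side use of a ref obtained from
// set.ref(cuco::insert_and_find). Keys are row indices; two rows compare
// equal when the row comparator says their key columns match.
template <typename SetRef>
__device__ void accumulate_row(SetRef set, cudf::size_type row_index)
{
  // Insert this row's index; if an equal key was already present, `slot`
  // points at the representative row index for that group instead.
  auto const [slot, inserted] = set.insert_and_find(row_index);
  cudf::size_type const group_index = *slot;
  // ... atomically fold this row's values into the sparse results at
  // position `group_index` ...
}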