Skip to content

Commit

Permalink
Formatting
Browse files Browse the repository at this point in the history
  • Loading branch information
PointKernel committed Jan 19, 2024
1 parent 11b8126 commit 166ed49
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 40 deletions.
79 changes: 42 additions & 37 deletions cpp/src/groupby/hash/groupby.cu
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
#include <cudf/dictionary/dictionary_column_view.hpp>
#include <cudf/groupby.hpp>
#include <cudf/hashing/detail/default_hash.cuh>
#include <cudf/hashing/detail/hash_allocator.cuh>
#include <cudf/scalar/scalar.hpp>
#include <cudf/table/experimental/row_operators.cuh>
#include <cudf/table/table.hpp>
Expand All @@ -45,12 +46,10 @@
#include <cudf/types.hpp>
#include <cudf/utilities/traits.cuh>
#include <cudf/utilities/traits.hpp>
#include <cudf/hashing/detail/hash_allocator.cuh>

#include <rmm/cuda_stream_view.hpp>
#include <rmm/mr/device/polymorphic_allocator.hpp>


#include <cuco/static_set.cuh>

#include <thrust/copy.h>
Expand All @@ -71,24 +70,24 @@ namespace detail {
namespace hash {
namespace {

int constexpr cg_size = 1; ///< Number of threads used to handle each input key
int constexpr window_size = 1; ///< Number of slots checked per thread
cudf::size_type constexpr key_sentinel = -1; ///< Sentinel value indicating an empty slot
int constexpr cg_size = 1; ///< Number of threads used to handle each input key
int constexpr window_size = 1; ///< Number of slots checked per thread
cudf::size_type constexpr key_sentinel = -1; ///< Sentinel value indicating an empty slot

using probing_scheme_type = cuco::experimental::linear_probing<cg_size,
cudf::experimental::row::hash::device_row_hasher<cudf::hashing::detail::default_hash,
cudf::nullate::DYNAMIC>>;
using probing_scheme_type = cuco::experimental::linear_probing<
cg_size,
cudf::experimental::row::hash::device_row_hasher<cudf::hashing::detail::default_hash,
cudf::nullate::DYNAMIC>>;
using allocator_type = rmm::mr::stream_allocator_adaptor<default_allocator<cudf::size_type>>;

template <typename ComparatorType>
using set_type = cuco::experimental::static_set<
cudf::size_type,
cuco::experimental::extent<cudf::size_type>,
cuda::thread_scope_device,
ComparatorType,
probing_scheme_type,
allocator_type,
cuco::experimental::storage<window_size>>;
using set_type = cuco::experimental::static_set<cudf::size_type,
cuco::experimental::extent<cudf::size_type>,
cuda::thread_scope_device,
ComparatorType,
probing_scheme_type,
allocator_type,
cuco::experimental::storage<window_size>>;

/**
* @brief List of aggregation operations that can be computed with a hash-based
Expand Down Expand Up @@ -508,16 +507,16 @@ void compute_single_pass_aggs(table_view const& keys,
? cudf::detail::bitmask_and(keys, stream, rmm::mr::get_current_device_resource()).first
: rmm::device_buffer{};

thrust::for_each_n(rmm::exec_policy(stream),
thrust::make_counting_iterator(0),
keys.num_rows(),
hash::compute_single_pass_aggs_fn{
set,
*d_values,
*d_sparse_table,
d_aggs.data(),
static_cast<bitmask_type*>(row_bitmask.data()),
skip_key_rows_with_nulls});
thrust::for_each_n(
rmm::exec_policy(stream),
thrust::make_counting_iterator(0),
keys.num_rows(),
hash::compute_single_pass_aggs_fn{set,
*d_values,
*d_sparse_table,
d_aggs.data(),
static_cast<bitmask_type*>(row_bitmask.data()),
skip_key_rows_with_nulls});
// Add results back to sparse_results cache
auto sparse_result_cols = sparse_table.release();
for (size_t i = 0; i < aggs.size(); i++) {
Expand Down Expand Up @@ -591,17 +590,23 @@ std::unique_ptr<table> groupby(table_view const& keys,
cudf::detail::result_cache sparse_results(requests.size());

auto const comparator_helper = [&](auto const d_key_equal) {
auto const set = set_type<decltype(d_key_equal)>{num_keys,
0.5, // desired load factor
cuco::empty_key{key_sentinel},
d_key_equal,
probing_scheme_type{d_row_hash},
allocator_type{default_allocator<cudf::size_type>{}, stream},
stream.value()};

// Compute all single pass aggs first
compute_single_pass_aggs(
keys, requests, &sparse_results, set.ref(cuco::experimental::insert_and_find), keys_have_nulls, include_null_keys, stream);
auto const set =
set_type<decltype(d_key_equal)>{num_keys,
0.5, // desired load factor
cuco::empty_key{key_sentinel},
d_key_equal,
probing_scheme_type{d_row_hash},
allocator_type{default_allocator<cudf::size_type>{}, stream},
stream.value()};

// Compute all single pass aggs first
compute_single_pass_aggs(keys,
requests,
&sparse_results,
set.ref(cuco::experimental::insert_and_find),
keys_have_nulls,
include_null_keys,
stream);

// Extract the populated indices from the hash map and create a gather map.
// Gathering using this map from sparse results will give dense results.
Expand Down
3 changes: 1 addition & 2 deletions cpp/src/groupby/hash/groupby_kernels.cuh
Original file line number Diff line number Diff line change
Expand Up @@ -107,8 +107,7 @@ struct compute_single_pass_aggs_fn {
if (not skip_rows_with_nulls or cudf::bit_is_set(row_bitmask, i)) {
auto const result = set.insert_and_find(i);

cudf::detail::aggregate_row<true, true>(
output_values, *result.first, input_values, i, aggs);
cudf::detail::aggregate_row<true, true>(output_values, *result.first, input_values, i, aggs);
}
}
};
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/groupby/hash/multi_pass_kernels.cuh
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ struct var_hash_functor {
__device__ inline void operator()(size_type source_index)
{
if (row_bitmask == nullptr or cudf::bit_is_set(row_bitmask, source_index)) {
auto const target_index = *set.find(source_index);
auto const target_index = *set.find(source_index);

auto col = source;
auto source_type = source.type();
Expand Down

0 comments on commit 166ed49

Please sign in to comment.