Skip to content

Commit

Permalink
[dist] provide range-ids segmented by part-id
Browse files Browse the repository at this point in the history
  • Loading branch information
MarcelKoch committed Aug 9, 2024
1 parent 5bca2bb commit 347f58d
Show file tree
Hide file tree
Showing 10 changed files with 192 additions and 1 deletion.
33 changes: 33 additions & 0 deletions common/cuda_hip/distributed/partition_kernels.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,17 @@

#include "core/distributed/partition_kernels.hpp"

#include <thrust/copy.h>
#include <thrust/count.h>
#include <thrust/device_ptr.h>
#include <thrust/execution_policy.h>
#include <thrust/iterator/zip_iterator.h>
#include <thrust/scan.h>
#include <thrust/sequence.h>
#include <thrust/sort.h>

#include "common/cuda_hip/base/thrust.hpp"
#include "common/cuda_hip/components/atomic.hpp"
#include "common/unified/base/kernel_launch.hpp"
#include "core/components/fill_array_kernels.hpp"

Expand Down Expand Up @@ -132,6 +135,36 @@ GKO_INSTANTIATE_FOR_EACH_LOCAL_GLOBAL_INDEX_TYPE(
GKO_DECLARE_PARTITION_BUILD_STARTING_INDICES);


void build_ranges_by_part(std::shared_ptr<const DefaultExecutor> exec,
const int* range_parts, size_type num_ranges,
int num_parts, array<size_type>& range_ids,
array<int64>& sizes)
{
auto policy = thrust_policy(exec);

range_ids.resize_and_reset(num_ranges);
auto range_ids_ptr = range_ids.get_data();
thrust::sequence(policy, range_ids_ptr, range_ids_ptr + num_ranges);

// mutable copy of range_parts such that it can be used as keys for sorting
array<int> range_parts_copy{exec, num_ranges};
thrust::copy_n(policy, range_parts, num_ranges,
range_parts_copy.get_data());
auto range_parts_ptr = range_parts_copy.get_data();

thrust::stable_sort_by_key(policy, range_parts_ptr,
range_parts_ptr + num_ranges, range_ids_ptr);

sizes.resize_and_reset(num_parts);
auto sizes_ptr = sizes.get_data();
thrust::fill_n(policy, sizes_ptr, num_parts, 0);
thrust::for_each_n(policy, range_parts_ptr, num_ranges,
[sizes_ptr] __device__(const size_type pid) {
atomic_add(sizes_ptr + pid, int64(1));
});
}


} // namespace partition
} // namespace GKO_DEVICE_NAMESPACE
} // namespace kernels
Expand Down
1 change: 1 addition & 0 deletions core/device_hooks/common_kernels.inc.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,7 @@ GKO_STUB_INDEX_TYPE(GKO_PARTITION_BUILD_FROM_MAPPING);
GKO_STUB_INDEX_TYPE(GKO_PARTITION_BUILD_FROM_GLOBAL_SIZE);
GKO_STUB_LOCAL_GLOBAL_TYPE(GKO_DECLARE_PARTITION_BUILD_STARTING_INDICES);
GKO_STUB_LOCAL_GLOBAL_TYPE(GKO_DECLARE_PARTITION_IS_ORDERED);
GKO_STUB(GKO_DECLARE_PARTITION_BUILD_RANGES_BY_PART);


} // namespace partition
Expand Down
11 changes: 10 additions & 1 deletion core/distributed/partition.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ GKO_REGISTER_OPERATION(build_ranges_from_global_size,
partition::build_ranges_from_global_size);
GKO_REGISTER_OPERATION(build_starting_indices,
partition::build_starting_indices);
GKO_REGISTER_OPERATION(build_ranges_by_part, partition::build_ranges_by_part);
GKO_REGISTER_OPERATION(has_ordered_parts, partition::has_ordered_parts);


Expand All @@ -38,7 +39,8 @@ Partition<LocalIndexType, GlobalIndexType>::Partition(
offsets_{exec, num_ranges + 1},
starting_indices_{exec, num_ranges},
part_sizes_{exec, static_cast<size_type>(num_parts)},
part_ids_{exec, num_ranges}
part_ids_{exec, num_ranges},
ranges_by_part_{exec}
{
offsets_.fill(0);
starting_indices_.fill(0);
Expand Down Expand Up @@ -126,6 +128,13 @@ void Partition<LocalIndexType, GlobalIndexType>::finalize_construction()
get_num_parts(), num_empty_parts_, starting_indices_.get_data(),
part_sizes_.get_data()));
size_ = get_element(offsets_, get_num_ranges());
array<size_type> range_ids(exec);
array<int64> num_ranges_per_part(exec);
exec->run(partition::make_build_ranges_by_part(
part_ids_.get_const_data(), get_num_ranges(), get_num_parts(),
range_ids, num_ranges_per_part));
ranges_by_part_ = segmented_array<size_type>::create_from_sizes(
std::move(range_ids), num_ranges_per_part);
}

template <typename LocalIndexType, typename GlobalIndexType>
Expand Down
7 changes: 7 additions & 0 deletions core/distributed/partition_kernels.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,12 @@ namespace kernels {
comm_index_type& num_empty_parts, \
LocalIndexType* ranks, LocalIndexType* sizes)

#define GKO_DECLARE_PARTITION_BUILD_RANGES_BY_PART \
void build_ranges_by_part(std::shared_ptr<const DefaultExecutor> exec, \
const int* range_parts, size_type num_ranges, \
int num_parts, array<size_type>& range_ids, \
array<int64>& sizes)

#define GKO_DECLARE_PARTITION_IS_ORDERED(LocalIndexType, GlobalIndexType) \
void has_ordered_parts(std::shared_ptr<const DefaultExecutor> exec, \
const experimental::distributed::Partition< \
Expand All @@ -67,6 +73,7 @@ namespace kernels {
template <typename LocalIndexType, typename GlobalIndexType> \
GKO_DECLARE_PARTITION_BUILD_STARTING_INDICES(LocalIndexType, \
GlobalIndexType); \
GKO_DECLARE_PARTITION_BUILD_RANGES_BY_PART; \
template <typename LocalIndexType, typename GlobalIndexType> \
GKO_DECLARE_PARTITION_IS_ORDERED(LocalIndexType, GlobalIndexType)

Expand Down
30 changes: 30 additions & 0 deletions dpcpp/distributed/partition_kernels.dp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
#include "common/unified/base/kernel_launch.hpp"
#include "core/components/fill_array_kernels.hpp"
#include "dpcpp/base/onedpl.hpp"
#include "dpcpp/components/atomic.dp.hpp"


namespace gko {
Expand Down Expand Up @@ -130,6 +131,35 @@ GKO_INSTANTIATE_FOR_EACH_LOCAL_GLOBAL_INDEX_TYPE(
GKO_DECLARE_PARTITION_BUILD_STARTING_INDICES);


void build_ranges_by_part(std::shared_ptr<const DefaultExecutor> exec,
const int* range_parts, size_type num_ranges,
int num_parts, array<size_type>& range_ids,
array<int64>& sizes)
{
auto policy = onedpl_policy(exec);

range_ids.resize_and_reset(num_ranges);
auto range_ids_ptr = range_ids.get_data();
// fill range_ids with 0,...,num_ranges - 1
run_kernel(
exec, [] GKO_KERNEL(auto i, auto rid) { rid[i] = i; }, num_ranges,
range_ids_ptr);

oneapi::dpl::stable_sort(policy, range_ids_ptr, range_ids_ptr + num_ranges,
[range_parts](const auto rid_a, const auto rid_b) {
return range_parts[rid_a] < range_parts[rid_b];
});

sizes.resize_and_reset(num_parts);
auto sizes_ptr = sizes.get_data();
oneapi::dpl::fill_n(policy, sizes_ptr, num_parts, 0);
oneapi::dpl::for_each_n(policy, range_parts, num_ranges,
[sizes_ptr](const size_type pid) {
atomic_add(sizes_ptr + pid, int64(1));
});
}


} // namespace partition
} // namespace dpcpp
} // namespace kernels
Expand Down
12 changes: 12 additions & 0 deletions include/ginkgo/core/distributed/partition.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

#include <ginkgo/core/base/array.hpp>
#include <ginkgo/core/base/polymorphic_object.hpp>
#include <ginkgo/core/base/segmented_array.hpp>
#include <ginkgo/core/base/types.hpp>


Expand Down Expand Up @@ -190,6 +191,16 @@ class Partition : public EnablePolymorphicObject<
*/
local_index_type get_part_size(comm_index_type part) const;

/**
* Returns the range IDs segmented by their part ID.
*
* @return range IDs segmented by part IDs
*/
const segmented_array<size_type>& get_ranges_by_part() const
{
return ranges_by_part_;
}

/**
* Checks if each part has no more than one contiguous range.
*
Expand Down Expand Up @@ -274,6 +285,7 @@ class Partition : public EnablePolymorphicObject<
array<local_index_type> starting_indices_;
array<local_index_type> part_sizes_;
array<comm_index_type> part_ids_;
segmented_array<size_type> ranges_by_part_;
};


Expand Down
28 changes: 28 additions & 0 deletions omp/distributed/partition_kernels.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

#include "core/distributed/partition_kernels.hpp"

#include <algorithm>

#include <omp.h>

#include <ginkgo/core/base/math.hpp>
Expand Down Expand Up @@ -72,6 +74,32 @@ GKO_INSTANTIATE_FOR_EACH_LOCAL_GLOBAL_INDEX_TYPE(
GKO_DECLARE_PARTITION_BUILD_STARTING_INDICES);


void build_ranges_by_part(std::shared_ptr<const DefaultExecutor> exec,
const int* range_parts, size_type num_ranges,
int num_parts, array<size_type>& range_ids,
array<int64>& sizes)
{
range_ids.resize_and_reset(num_ranges);
std::iota(range_ids.get_data(), range_ids.get_data() + num_ranges,
size_type(0));
// sort by (part_id, range_id)
std::sort(range_ids.get_data(), range_ids.get_data() + num_ranges,
[range_parts](auto rid_a, auto rid_b) {
return std::tie(range_parts[rid_a], rid_a) <
std::tie(range_parts[rid_b], rid_b);
});

sizes.resize_and_reset(num_parts);
std::fill_n(sizes.get_data(), num_parts, int64(0));
#pragma omp parallel for
for (size_type i = 0; i < num_ranges; ++i) {
auto& size = sizes.get_data()[range_parts[range_ids.get_data()[i]]];
#pragma omp atomic
size++;
}
}


} // namespace partition
} // namespace omp
} // namespace kernels
Expand Down
26 changes: 26 additions & 0 deletions reference/distributed/partition_kernels.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

#include "core/distributed/partition_kernels.hpp"

#include "core/base/segmented_array.hpp"


namespace gko {
namespace kernels {
Expand Down Expand Up @@ -109,6 +111,30 @@ void build_starting_indices(std::shared_ptr<const DefaultExecutor> exec,
GKO_INSTANTIATE_FOR_EACH_LOCAL_GLOBAL_INDEX_TYPE(
GKO_DECLARE_PARTITION_BUILD_STARTING_INDICES);


void build_ranges_by_part(std::shared_ptr<const DefaultExecutor> exec,
const int* range_parts, size_type num_ranges,
int num_parts, array<size_type>& range_ids,
array<int64>& sizes)
{
range_ids.resize_and_reset(num_ranges);
std::iota(range_ids.get_data(), range_ids.get_data() + num_ranges,
size_type(0));
// sort by (part_id, range_id)
std::sort(range_ids.get_data(), range_ids.get_data() + num_ranges,
[range_parts](auto rid_a, auto rid_b) {
return std::tie(range_parts[rid_a], rid_a) <
std::tie(range_parts[rid_b], rid_b);
});

sizes.resize_and_reset(num_parts);
std::fill_n(sizes.get_data(), num_parts, int64(0));
for (size_type i = 0; i < num_ranges; ++i) {
sizes.get_data()[range_parts[i]]++;
}
}


template <typename LocalIndexType, typename GlobalIndexType>
void has_ordered_parts(
std::shared_ptr<const DefaultExecutor> exec,
Expand Down
35 changes: 35 additions & 0 deletions reference/test/distributed/partition_kernels.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,22 @@ void assert_equal_data(const T* data, std::initializer_list<U> reference_data)
}


template <typename T, typename U>
void assert_equal_segmented_array(const gko::segmented_array<T>& data,
std::initializer_list<U> buffer,
std::initializer_list<gko::int64> offsets)
{
gko::array<T> buffer_arr(data.get_executor(), buffer);
gko::array<gko::int64> offsets_arr(data.get_executor(), offsets);
auto view = gko::make_const_array_view(data.get_executor(), data.get_size(),
data.get_const_flat_data())
.copy_to_array();

GKO_ASSERT_ARRAY_EQ(view, buffer_arr);
GKO_ASSERT_ARRAY_EQ(data.get_offsets(), offsets_arr);
}


template <typename LocalGlobalIndexType>
class Partition : public ::testing::Test {
protected:
Expand Down Expand Up @@ -75,6 +91,8 @@ TYPED_TEST(Partition, BuildsFromMapping)
assert_equal_data(partition->get_range_starting_indices(),
{0, 0, 0, 2, 1, 2, 3, 3, 3, 4});
assert_equal_data(partition->get_part_sizes(), {5, 6, 5});
assert_equal_segmented_array(partition->get_ranges_by_part(),
{1, 4, 6, 9, 2, 5, 7, 0, 3, 8}, {0, 4, 7, 10});
}


Expand All @@ -100,6 +118,9 @@ TYPED_TEST(Partition, BuildsFromMappingWithEmptyParts)
assert_equal_data(partition->get_range_starting_indices(),
{0, 0, 0, 2, 1, 2, 3, 3, 3, 4});
assert_equal_data(partition->get_part_sizes(), {5, 6, 0, 5, 0});
assert_equal_segmented_array(partition->get_ranges_by_part(),
{1, 4, 6, 9, 2, 5, 7, 0, 3, 8},
{0, 4, 7, 7, 10, 10});
}


Expand All @@ -119,6 +140,8 @@ TYPED_TEST(Partition, BuildsFromRanges)
assert_equal_data(partition->get_part_ids(), {0, 1, 2, 3, 4});
assert_equal_data(partition->get_range_starting_indices(), {0, 0, 0, 0, 0});
assert_equal_data(partition->get_part_sizes(), {5, 0, 2, 2, 1});
assert_equal_segmented_array(partition->get_ranges_by_part(),
{0, 1, 2, 3, 4}, {0, 1, 2, 3, 4, 5});
}


Expand All @@ -135,6 +158,8 @@ TYPED_TEST(Partition, BuildsFromRangeWithSingleElement)
EXPECT_EQ(partition->get_num_parts(), 0);
EXPECT_EQ(partition->get_num_empty_parts(), 0);
assert_equal_data(partition->get_range_bounds(), {0});
assert_equal_segmented_array(partition->get_ranges_by_part(), I<int>{},
{0});
}


Expand All @@ -156,6 +181,8 @@ TYPED_TEST(Partition, BuildsFromRangesWithPartIds)
assert_equal_data(partition->get_part_ids(), {0, 4, 3, 1, 2});
assert_equal_data(partition->get_range_starting_indices(), {0, 0, 0, 0, 0});
assert_equal_data(partition->get_part_sizes(), {5, 2, 1, 2, 0});
assert_equal_segmented_array(partition->get_ranges_by_part(),
{0, 3, 4, 2, 1}, {0, 1, 2, 3, 4, 5});
}


Expand All @@ -174,6 +201,8 @@ TYPED_TEST(Partition, BuildsFromGlobalSize)
assert_equal_data(partition->get_part_ids(), {0, 1, 2, 3, 4});
assert_equal_data(partition->get_range_starting_indices(), {0, 0, 0, 0, 0});
assert_equal_data(partition->get_part_sizes(), {3, 3, 3, 2, 2});
assert_equal_segmented_array(partition->get_ranges_by_part(),
{0, 1, 2, 3, 4}, {0, 1, 2, 3, 4, 5});
}


Expand All @@ -191,6 +220,8 @@ TYPED_TEST(Partition, BuildsFromGlobalSizeEmptySize)
assert_equal_data(partition->get_part_ids(), {0, 1, 2, 3, 4});
assert_equal_data(partition->get_range_starting_indices(), {0, 0, 0, 0, 0});
assert_equal_data(partition->get_part_sizes(), {0, 0, 0, 0, 0});
assert_equal_segmented_array(partition->get_ranges_by_part(),
{0, 1, 2, 3, 4}, {0, 1, 2, 3, 4, 5});
}


Expand All @@ -208,6 +239,8 @@ TYPED_TEST(Partition, BuildsFromGlobalSizeWithEmptyParts)
assert_equal_data(partition->get_part_ids(), {0, 1, 2, 3, 4});
assert_equal_data(partition->get_range_starting_indices(), {0, 0, 0, 0, 0});
assert_equal_data(partition->get_part_sizes(), {1, 1, 1, 0, 0});
assert_equal_segmented_array(partition->get_ranges_by_part(),
{0, 1, 2, 3, 4}, {0, 1, 2, 3, 4, 5});
}


Expand All @@ -225,6 +258,8 @@ TYPED_TEST(Partition, BuildsFromGlobalSizeWithZeroParts)
ASSERT_EQ(partition->get_part_ids(), nullptr);
ASSERT_EQ(partition->get_range_starting_indices(), nullptr);
ASSERT_EQ(partition->get_part_sizes(), nullptr);
assert_equal_segmented_array(partition->get_ranges_by_part(), I<int>{},
{0});
}


Expand Down
10 changes: 10 additions & 0 deletions test/distributed/partition_kernels.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,16 @@ class Partition : public CommonTestFixture {
gko::make_array_view(
this->exec, dpart->get_num_parts(),
const_cast<local_index_type*>(dpart->get_part_sizes())));

GKO_ASSERT_ARRAY_EQ(
gko::make_const_array_view(
this->ref, part->get_num_ranges(),
part->get_ranges_by_part().get_const_flat_data()),
gko::make_const_array_view(
this->exec, dpart->get_num_ranges(),
dpart->get_ranges_by_part().get_const_flat_data()));
GKO_ASSERT_ARRAY_EQ(part->get_ranges_by_part().get_offsets(),
dpart->get_ranges_by_part().get_offsets())
}

std::default_random_engine rand_engine;
Expand Down

0 comments on commit 347f58d

Please sign in to comment.