From d9e8dcc36c9737e2d70dcab241508961a763811a Mon Sep 17 00:00:00 2001
From: Marcel Koch
Date: Thu, 4 Apr 2024 10:05:30 +0000
Subject: [PATCH] [coll-comm] adds neighborhood implementation of collective communicator

Co-authored-by: Pratik Nayak
---
 core/CMakeLists.txt                           |   1 +
 .../distributed/neighborhood_communicator.cpp | 240 ++++++++++++++++++
 core/test/mpi/distributed/CMakeLists.txt      |   1 +
 .../distributed/neighborhood_communicator.cpp | 213 ++++++++++++++++
 .../distributed/neighborhood_communicator.hpp | 135 ++++++++++
 include/ginkgo/ginkgo.hpp                     |   1 +
 6 files changed, 591 insertions(+)
 create mode 100644 core/distributed/neighborhood_communicator.cpp
 create mode 100644 core/test/mpi/distributed/neighborhood_communicator.cpp
 create mode 100644 include/ginkgo/core/distributed/neighborhood_communicator.hpp

diff --git a/core/CMakeLists.txt b/core/CMakeLists.txt
index c4bea01dbd0..11707f3727d 100644
--- a/core/CMakeLists.txt
+++ b/core/CMakeLists.txt
@@ -134,6 +134,7 @@ if(GINKGO_BUILD_MPI)
         distributed/dense_communicator.cpp
         distributed/matrix.cpp
         distributed/partition_helpers.cpp
+        distributed/neighborhood_communicator.cpp
         distributed/vector.cpp
         distributed/preconditioner/schwarz.cpp)
 endif()
diff --git a/core/distributed/neighborhood_communicator.cpp b/core/distributed/neighborhood_communicator.cpp
new file mode 100644
index 00000000000..40e75b16ade
--- /dev/null
+++ b/core/distributed/neighborhood_communicator.cpp
@@ -0,0 +1,240 @@
+// SPDX-FileCopyrightText: 2017 - 2024 The Ginkgo authors
+//
+// SPDX-License-Identifier: BSD-3-Clause
+
+#include "ginkgo/core/distributed/neighborhood_communicator.hpp"
+
+#include <algorithm>
+#include <numeric>
+
+
+namespace gko {
+namespace experimental {
+namespace mpi {
+
+
+/**
+ * \brief Computes the inverse envelope (target-ids, sizes) for a given
+ *        one-sided communication pattern.
+ *
+ * \param exec the executor, this will always use the host executor
+ * \param comm communicator
+ * \param ids target ids of the one-sided operation
+ * \param sizes number of elements sent to each id
+ *
+ * \return the inverse envelope consisting of the target-ids and the sizes
+ */
+std::tuple<std::vector<comm_index_type>, std::vector<comm_index_type>>
+communicate_inverse_envelope(std::shared_ptr<const Executor> exec,
+                             mpi::communicator comm,
+                             const std::vector<comm_index_type>& ids,
+                             const std::vector<comm_index_type>& sizes)
+{
+    auto host_exec = exec->get_master();
+    std::vector<comm_index_type> inverse_sizes_full(comm.size());
+    mpi::window<comm_index_type> window(host_exec, inverse_sizes_full.data(),
+                                        inverse_sizes_full.size(), comm,
+                                        sizeof(comm_index_type), MPI_INFO_ENV);
+    window.fence();
+    for (int i = 0; i < ids.size(); ++i) {
+        window.put(host_exec, sizes.data() + i, 1, ids[i], comm.rank(), 1);
+    }
+    window.fence();
+
+    std::vector<comm_index_type> inverse_sizes;
+    std::vector<comm_index_type> inverse_ids;
+    for (int i = 0; i < inverse_sizes_full.size(); ++i) {
+        if (inverse_sizes_full[i] > 0) {
+            inverse_ids.push_back(i);
+            inverse_sizes.push_back(inverse_sizes_full[i]);
+        }
+    }
+
+    return std::make_tuple(std::move(inverse_ids), std::move(inverse_sizes));
+}
+
+
+/**
+ * Creates a distributed graph communicator based on the input sources and
+ * destinations.
+ *
+ * The graph is unweighted and has the same rank ordering as the input
+ * communicator.
+ */
+mpi::communicator create_neighborhood_comm(
+    mpi::communicator base, const std::vector<comm_index_type>& sources,
+    const std::vector<comm_index_type>& destinations)
+{
+    auto in_degree = static_cast<comm_index_type>(sources.size());
+    auto out_degree = static_cast<comm_index_type>(destinations.size());
+
+    // adjacent constructor guarantees that querying sources/destinations
+    // will result in the array having the same order as defined here
+    MPI_Comm graph_comm;
+    MPI_Info info;
+    GKO_ASSERT_NO_MPI_ERRORS(MPI_Info_dup(MPI_INFO_ENV, &info));
+    GKO_ASSERT_NO_MPI_ERRORS(MPI_Dist_graph_create_adjacent(
+        base.get(), in_degree, sources.data(),
+        in_degree ? MPI_UNWEIGHTED : MPI_WEIGHTS_EMPTY, out_degree,
+        destinations.data(), out_degree ? MPI_UNWEIGHTED : MPI_WEIGHTS_EMPTY,
+        info, false, &graph_comm));
+    GKO_ASSERT_NO_MPI_ERRORS(MPI_Info_free(&info));
+
+    return mpi::communicator::create_owning(graph_comm,
+                                            base.force_host_buffer());
+}
+
+
+std::unique_ptr<CollectiveCommunicator>
+NeighborhoodCommunicator::create_inverse() const
+{
+    auto base_comm = this->get_base_communicator();
+    distributed::comm_index_type num_sources;
+    distributed::comm_index_type num_destinations;
+    distributed::comm_index_type weighted;
+    GKO_ASSERT_NO_MPI_ERRORS(MPI_Dist_graph_neighbors_count(
+        comm_.get(), &num_sources, &num_destinations, &weighted));
+
+    std::vector<comm_index_type> sources(num_sources);
+    std::vector<comm_index_type> destinations(num_destinations);
+    GKO_ASSERT_NO_MPI_ERRORS(MPI_Dist_graph_neighbors(
+        comm_.get(), num_sources, sources.data(), MPI_UNWEIGHTED,
+        num_destinations, destinations.data(), MPI_UNWEIGHTED));
+
+    return std::make_unique<NeighborhoodCommunicator>(
+        base_comm, destinations, send_sizes_, send_offsets_, sources,
+        recv_sizes_, recv_offsets_);
+}
+
+
+comm_index_type NeighborhoodCommunicator::get_recv_size() const
+{
+    return recv_offsets_.back();
+}
+
+
+comm_index_type NeighborhoodCommunicator::get_send_size() const
+{
+    return send_offsets_.back();
+}
+
+
+NeighborhoodCommunicator::NeighborhoodCommunicator(
+    communicator base, const std::vector<comm_index_type>& sources,
+    const std::vector<comm_index_type>& recv_sizes,
+    const std::vector<comm_index_type>& recv_offsets,
+    const std::vector<comm_index_type>& destinations,
+    const std::vector<comm_index_type>& send_sizes,
+    const std::vector<comm_index_type>& send_offsets)
+    : CollectiveCommunicator(base), comm_(MPI_COMM_NULL)
+{
+    comm_ = create_neighborhood_comm(base, sources, destinations);
+    send_sizes_ = send_sizes;
+    send_offsets_ = send_offsets;
+    recv_sizes_ = recv_sizes;
+    recv_offsets_ = recv_offsets;
+}
+
+
+NeighborhoodCommunicator::NeighborhoodCommunicator(communicator base)
+    : CollectiveCommunicator(std::move(base)),
+      comm_(MPI_COMM_SELF),
+      send_sizes_(),
+      send_offsets_(1),
+      recv_sizes_(),
+      recv_offsets_(1)
+{
+    // ensure that comm_ always has the correct topology
+    std::vector<comm_index_type> non_nullptr(1);
+    non_nullptr.resize(0);
+    comm_ = create_neighborhood_comm(this->get_base_communicator(),
+                                     non_nullptr, non_nullptr);
+}
+
+
+request NeighborhoodCommunicator::i_all_to_all_v(
+    std::shared_ptr<const Executor> exec, const void* send_buffer,
+    MPI_Datatype send_type, void* recv_buffer, MPI_Datatype recv_type) const
+{
+    auto guard = exec->get_scoped_device_id_guard();
+    request req;
+    GKO_ASSERT_NO_MPI_ERRORS(MPI_Ineighbor_alltoallv(
+        send_buffer, send_sizes_.data(), send_offsets_.data(), send_type,
+        recv_buffer, recv_sizes_.data(), recv_offsets_.data(), recv_type,
+        comm_.get(), req.get()));
+    return req;
+}
+
+
+std::unique_ptr<CollectiveCommunicator>
+NeighborhoodCommunicator::create_with_same_type(
+    communicator base, const distributed::index_map_variant& imap) const
+{
+    return std::visit(
+        [base](const auto& imap) {
+            return std::unique_ptr<CollectiveCommunicator>(
+                new NeighborhoodCommunicator(base, imap));
+        },
+        imap);
+}
+
+
+template <typename LocalIndexType, typename GlobalIndexType>
+NeighborhoodCommunicator::NeighborhoodCommunicator(
+    communicator base,
+    const distributed::index_map<LocalIndexType, GlobalIndexType>& imap)
+    : CollectiveCommunicator(base),
+      comm_(MPI_COMM_SELF),
+      recv_sizes_(imap.get_remote_target_ids().get_size()),
+      recv_offsets_(recv_sizes_.size() + 1),
+      send_offsets_(1)
+{
+    auto exec = imap.get_executor();
+    if (!exec) {
+        return;
+    }
+    auto host_exec = exec->get_master();
+
+    auto recv_target_ids_arr =
+        make_temporary_clone(host_exec, &imap.get_remote_target_ids());
+    auto remote_idx_offsets_arr = make_temporary_clone(
+        host_exec, &imap.get_remote_global_idxs().get_offsets());
+    std::vector<comm_index_type> recv_target_ids(
+        recv_target_ids_arr->get_size());
+    std::copy_n(recv_target_ids_arr->get_const_data(),
+                recv_target_ids_arr->get_size(), recv_target_ids.begin());
+    for (size_type seg_id = 0;
+         seg_id < imap.get_remote_global_idxs().get_segment_count();
+         ++seg_id) {
+        recv_sizes_[seg_id] =
+            remote_idx_offsets_arr->get_const_data()[seg_id + 1] -
+            remote_idx_offsets_arr->get_const_data()[seg_id];
+    }
+    auto send_envelope =
+        communicate_inverse_envelope(exec, base, recv_target_ids, recv_sizes_);
+    const auto& send_target_ids = std::get<0>(send_envelope);
+    send_sizes_ = std::move(std::get<1>(send_envelope));
+
+    send_offsets_.resize(send_sizes_.size() + 1);
+    std::partial_sum(send_sizes_.begin(), send_sizes_.end(),
+                     send_offsets_.begin() + 1);
+    std::partial_sum(recv_sizes_.begin(), recv_sizes_.end(),
+                     recv_offsets_.begin() + 1);
+
+    comm_ = create_neighborhood_comm(base, recv_target_ids, send_target_ids);
+}
+
+
+#define GKO_DECLARE_NEIGHBORHOOD_CONSTRUCTOR(LocalIndexType, GlobalIndexType) \
+    NeighborhoodCommunicator::NeighborhoodCommunicator(                       \
+        communicator base,                                                    \
+        const distributed::index_map<LocalIndexType, GlobalIndexType>& imap)
+
+GKO_INSTANTIATE_FOR_EACH_LOCAL_GLOBAL_INDEX_TYPE(
+    GKO_DECLARE_NEIGHBORHOOD_CONSTRUCTOR);
+
+#undef GKO_DECLARE_NEIGHBORHOOD_CONSTRUCTOR
+
+
+}  // namespace mpi
+}  // namespace experimental
+}  // namespace gko
diff --git a/core/test/mpi/distributed/CMakeLists.txt b/core/test/mpi/distributed/CMakeLists.txt
index 433b3dd1fc6..3a87d3518ef 100644
--- a/core/test/mpi/distributed/CMakeLists.txt
+++ b/core/test/mpi/distributed/CMakeLists.txt
@@ -1,6 +1,7 @@
 ginkgo_create_test(helpers MPI_SIZE 1)
 ginkgo_create_test(matrix MPI_SIZE 1)
 ginkgo_create_test(dense_communicator MPI_SIZE 6)
+ginkgo_create_test(neighborhood_communicator MPI_SIZE 6)
 
 add_subdirectory(preconditioner)
 add_subdirectory(solver)
diff --git a/core/test/mpi/distributed/neighborhood_communicator.cpp b/core/test/mpi/distributed/neighborhood_communicator.cpp
new file mode 100644
index 00000000000..07363a755eb
--- /dev/null
+++ b/core/test/mpi/distributed/neighborhood_communicator.cpp
@@ -0,0 +1,213 @@
+// SPDX-FileCopyrightText: 2017 - 2024 The Ginkgo authors
+//
+// SPDX-License-Identifier: BSD-3-Clause
+
+#include <ginkgo/core/distributed/neighborhood_communicator.hpp>
+
+#include <array>
+#include <numeric>
+
+#include <gtest/gtest.h>
+
+#include "core/test/utils/assertions.hpp"
+
+
+using gko::experimental::mpi::comm_index_type;
+
+
+class NeighborhoodCommunicator : public ::testing::Test {
+protected:
+    using part_type = gko::experimental::distributed::Partition<int, long>;
+    using map_type = gko::experimental::distributed::index_map<int, long>;
+
+    void SetUp() override { ASSERT_EQ(comm.size(), 6); }
+
+    std::shared_ptr<gko::ReferenceExecutor> ref =
+        gko::ReferenceExecutor::create();
+    gko::experimental::mpi::communicator comm = MPI_COMM_WORLD;
+    int rank = comm.rank();
+};
+
+
+TEST_F(NeighborhoodCommunicator, CanDefaultConstruct)
+{
+    gko::experimental::mpi::NeighborhoodCommunicator nhcomm{comm};
+
+    ASSERT_EQ(nhcomm.get_base_communicator(), comm);
+    ASSERT_EQ(nhcomm.get_send_size(), 0);
+    ASSERT_EQ(nhcomm.get_recv_size(), 0);
+}
+
+
+TEST_F(NeighborhoodCommunicator, CanConstructFromIndexMap)
+{
+    auto part = gko::share(part_type::build_from_global_size_uniform(
+        ref, comm.size(), comm.size() * 3));
+    gko::array<long> recv_connections[] = {{ref, {3, 5, 10, 11}},
+                                           {ref, {0, 1, 7, 12, 13}},
+                                           {ref, {3, 4, 17}},
+                                           {ref, {1, 2, 12, 14}},
+                                           {ref, {4, 5, 9, 10, 16, 15}},
+                                           {ref, {8, 12, 13, 14}}};
+    auto imap = map_type{ref, part, comm.rank(), recv_connections[rank]};
+
+    gko::experimental::mpi::NeighborhoodCommunicator spcomm{comm, imap};
+
+    std::array<comm_index_type, 6> send_sizes = {4, 6, 2, 4, 7, 3};
+    ASSERT_EQ(spcomm.get_recv_size(), recv_connections[rank].get_size());
+    ASSERT_EQ(spcomm.get_send_size(), send_sizes[rank]);
+}
+
+
+TEST_F(NeighborhoodCommunicator, CanConstructFromEnvelopData)
+{
+    std::vector<comm_index_type> sources[] = {{1, 2}, {0, 2, 4}, {1, 5},
+                                              {0, 4}, {1, 3, 5}, {2, 4}};
+    std::vector<comm_index_type> recv_sizes[] = {{2, 2}, {2, 1, 2}, {2, 1},
+                                                 {2, 2}, {2, 2, 2}, {1, 3}};
+    std::vector<comm_index_type> destinations = sources[rank];
+    std::vector<comm_index_type> send_sizes[] = {{2, 2}, {2, 2, 2}, {1, 1},
+                                                 {2, 2}, {2, 2, 3}, {1, 2}};
+    std::vector<comm_index_type> recv_offsets(recv_sizes[rank].size() + 1);
+    std::vector<comm_index_type> send_offsets(send_sizes[rank].size() + 1);
+    std::partial_sum(recv_sizes[rank].begin(), recv_sizes[rank].end(),
+                     recv_offsets.begin() + 1);
+    std::partial_sum(send_sizes[rank].begin(), send_sizes[rank].end(),
+                     send_offsets.begin() + 1);
+
+    gko::experimental::mpi::NeighborhoodCommunicator spcomm{
+        comm,         sources[rank],    recv_sizes[rank], recv_offsets,
+        destinations, send_sizes[rank], send_offsets};
+
+    ASSERT_EQ(spcomm.get_recv_size(), recv_offsets.back());
+    ASSERT_EQ(spcomm.get_send_size(), send_offsets.back());
+}
+
+
+TEST_F(NeighborhoodCommunicator, CanConstructFromEmptyIndexMap)
+{
+    auto imap = map_type{ref};
+
+    gko::experimental::mpi::NeighborhoodCommunicator spcomm{comm, imap};
+
+    ASSERT_EQ(spcomm.get_recv_size(), 0);
+    ASSERT_EQ(spcomm.get_send_size(), 0);
+}
+
+
+TEST_F(NeighborhoodCommunicator, CanConstructFromIndexMapWithoutConnection)
+{
+    auto part = gko::share(part_type::build_from_global_size_uniform(
+        ref, comm.size(), comm.size() * 3));
+    auto imap = map_type{ref, part, comm.rank(), {ref, 0}};
+
+    gko::experimental::mpi::NeighborhoodCommunicator spcomm{comm, imap};
+
+    ASSERT_EQ(spcomm.get_recv_size(), 0);
+    ASSERT_EQ(spcomm.get_send_size(), 0);
+}
+
+
+TEST_F(NeighborhoodCommunicator, CanConstructFromEmptyEnvelopData)
+{
+    std::vector<comm_index_type> sources;
+    std::vector<comm_index_type> recv_sizes;
+    std::vector<comm_index_type> destinations;
+    std::vector<comm_index_type> send_sizes;
+    std::vector<comm_index_type> recv_offsets{0};
+    std::vector<comm_index_type> send_offsets{0};
+
+    gko::experimental::mpi::NeighborhoodCommunicator spcomm{
+        comm,         sources,    recv_sizes, recv_offsets,
+        destinations, send_sizes, send_offsets};
+
+    ASSERT_EQ(spcomm.get_recv_size(), 0);
+    ASSERT_EQ(spcomm.get_send_size(), 0);
+}
+
+
+TEST_F(NeighborhoodCommunicator, CanCommunicateIalltoall)
+{
+    auto part = gko::share(part_type::build_from_global_size_uniform(
+        ref, comm.size(), comm.size() * 3));
+    gko::array<long> recv_connections[] = {{ref, {3, 5, 10, 11}},
+                                           {ref, {0, 1, 7, 12, 13}},
+                                           {ref, {3, 4, 17}},
+                                           {ref, {1, 2, 12, 14}},
+                                           {ref, {4, 5, 9, 10, 16, 15}},
+                                           {ref, {8, 12, 13, 14}}};
+    auto imap = map_type{ref, part, comm.rank(), recv_connections[rank]};
+    gko::experimental::mpi::NeighborhoodCommunicator spcomm{comm, imap};
+    gko::array<long> recv_buffer{ref, recv_connections[rank].get_size()};
+    gko::array<long> send_buffers[] = {{ref, {0, 1, 1, 2}},
+                                       {ref, {3, 5, 3, 4, 4, 5}},
+                                       {ref, {7, 8}},
+                                       {ref, {10, 11, 9, 10}},
+                                       {ref, {12, 13, 12, 14, 12, 13, 14}},
+                                       {ref, {17, 16, 15}}};
+
+    auto req = spcomm.i_all_to_all_v(ref, send_buffers[rank].get_const_data(),
+                                     recv_buffer.get_data());
+    req.wait();
+
+    GKO_ASSERT_ARRAY_EQ(recv_buffer, recv_connections[rank]);
+}
+
+
+TEST_F(NeighborhoodCommunicator, CanCommunicateIalltoallWhenEmpty)
+{
+    gko::experimental::mpi::NeighborhoodCommunicator spcomm{comm};
+
+    auto req = spcomm.i_all_to_all_v(ref, static_cast<int*>(nullptr),
+                                     static_cast<int*>(nullptr));
+    req.wait();
+}
+
+
+TEST_F(NeighborhoodCommunicator, CanCreateInverse)
+{
+    auto part = gko::share(part_type::build_from_global_size_uniform(
+        ref, comm.size(), comm.size() * 3));
+    gko::array<long> recv_connections[] = {{ref, {3, 5, 10, 11}},
+                                           {ref, {0, 1, 7, 12, 13}},
+                                           {ref, {3, 4, 17}},
+                                           {ref, {1, 2, 12, 14}},
+                                           {ref, {4, 5, 9, 10, 16, 15}},
+                                           {ref, {8, 12, 13, 14}}};
+    auto imap = map_type{ref, part, comm.rank(), recv_connections[rank]};
+    gko::experimental::mpi::NeighborhoodCommunicator spcomm{comm, imap};
+
+    auto inverse = spcomm.create_inverse();
+
+    ASSERT_EQ(inverse->get_recv_size(), spcomm.get_send_size());
+    ASSERT_EQ(inverse->get_send_size(), spcomm.get_recv_size());
+}
+
+
+TEST_F(NeighborhoodCommunicator, CanCommunicateRoundTrip)
+{
+    auto part = gko::share(part_type::build_from_global_size_uniform(
+        ref, comm.size(), comm.size() * 3));
+    gko::array<long> recv_connections[] = {{ref, {3, 5, 10, 11}},
+                                           {ref, {0, 1, 7, 12, 13}},
+                                           {ref, {3, 4, 17}},
+                                           {ref, {1, 2, 12, 14}},
+                                           {ref, {4, 5, 9, 10, 16, 15}},
+                                           {ref, {8, 12, 13, 14}}};
+    auto imap = map_type{ref, part, comm.rank(), recv_connections[rank]};
+    gko::experimental::mpi::NeighborhoodCommunicator spcomm{comm, imap};
+    auto inverse = spcomm.create_inverse();
+    gko::array<long> send_buffers[] = {{ref, {1, 2, 3, 4}},
+                                       {ref, {5, 6, 7, 8, 9, 10}},
+                                       {ref, {11, 12}},
+                                       {ref, {13, 14, 15, 16}},
+                                       {ref, {17, 18, 19, 20, 21, 22, 23}},
+                                       {ref, {24, 25, 26}}};
+    gko::array<long> recv_buffer{ref, recv_connections[rank].get_size()};
+    gko::array<long> round_trip{ref, send_buffers[rank].get_size()};
+
+    spcomm
+        .i_all_to_all_v(ref, send_buffers[rank].get_const_data(),
+                        recv_buffer.get_data())
+        .wait();
+    inverse
+        ->i_all_to_all_v(ref, recv_buffer.get_const_data(),
+                         round_trip.get_data())
+        .wait();
+
+    GKO_ASSERT_ARRAY_EQ(send_buffers[rank], round_trip);
+}
diff --git a/include/ginkgo/core/distributed/neighborhood_communicator.hpp b/include/ginkgo/core/distributed/neighborhood_communicator.hpp
new file mode 100644
index 00000000000..897cc47922e
--- /dev/null
+++ b/include/ginkgo/core/distributed/neighborhood_communicator.hpp
@@ -0,0 +1,135 @@
+// SPDX-FileCopyrightText: 2017 - 2024 The Ginkgo authors
+//
+// SPDX-License-Identifier: BSD-3-Clause
+
+#ifndef GKO_PUBLIC_CORE_DISTRIBUTED_NEIGHBORHOOD_COMMUNICATOR_HPP_
+#define GKO_PUBLIC_CORE_DISTRIBUTED_NEIGHBORHOOD_COMMUNICATOR_HPP_
+
+
+#include <ginkgo/config.hpp>
+
+
+#if GINKGO_BUILD_MPI
+
+#include <ginkgo/core/base/mpi.hpp>
+#include <ginkgo/core/distributed/collective_communicator.hpp>
+#include <ginkgo/core/distributed/index_map.hpp>
+
+
+namespace gko {
+namespace experimental {
+namespace mpi {
+
+
+/**
+ * A collective_communicator that uses a neighborhood topology.
+ *
+ * The neighborhood communicator is defined by a list of neighbors this
+ * rank sends data to and a list of neighbors this rank receives data from.
+ * No communication with any rank that is not in one of those lists will
+ * take place.
+ */
+class NeighborhoodCommunicator final : public CollectiveCommunicator {
+public:
+    using CollectiveCommunicator::i_all_to_all_v;
+
+    /**
+     * Default constructor with empty communication pattern.
+     *
+     * @param base  the base communicator
+     */
+    explicit NeighborhoodCommunicator(communicator base);
+
+    /**
+     * Create a neighborhood_communicator from an index map.
+     *
+     * The receive neighbors are defined by the remote indices and their
+     * owning ranks of the index map. The send neighbors are deduced
+     * from that through collective communication.
+     *
+     * @tparam LocalIndexType  the local index type of the map
+     * @tparam GlobalIndexType  the global index type of the map
+     *
+     * @param base  the base communicator
+     * @param imap  the index map that defines the communication pattern
+     */
+    template <typename LocalIndexType, typename GlobalIndexType>
+    NeighborhoodCommunicator(
+        communicator base,
+        const distributed::index_map<LocalIndexType, GlobalIndexType>& imap);
+
+    /**
+     * Create a neighborhood_communicator by explicitly defining the
+     * neighborhood lists and sizes/offsets.
+     *
+     * @param base  the base communicator
+     * @param sources  the ranks to receive from
+     * @param recv_sizes  the number of elements to recv for each source
+     * @param recv_offsets  the offset for each source
+     * @param destinations  the ranks to send to
+     * @param send_sizes  the number of elements to send for each destination
+     * @param send_offsets  the offset for each destination
+     */
+    NeighborhoodCommunicator(
+        communicator base, const std::vector<comm_index_type>& sources,
+        const std::vector<comm_index_type>& recv_sizes,
+        const std::vector<comm_index_type>& recv_offsets,
+        const std::vector<comm_index_type>& destinations,
+        const std::vector<comm_index_type>& send_sizes,
+        const std::vector<comm_index_type>& send_offsets);
+
+    /**
+     * Communicate data from all ranks to all other ranks using the
+     * neighborhood communication MPI_Ineighbor_alltoallv. See the MPI
+     * documentation for more details.
+     *
+     * @param exec  the executor on which the message buffers are located
+     * @param send_buffer  the buffer to send
+     * @param send_type  the MPI_Datatype for the send buffer
+     * @param recv_buffer  the buffer to gather into
+     * @param recv_type  the MPI_Datatype for the recv buffer
+     *
+     * @return  the request handle for the call
+     */
+    request i_all_to_all_v(std::shared_ptr<const Executor> exec,
+                           const void* send_buffer, MPI_Datatype send_type,
+                           void* recv_buffer,
+                           MPI_Datatype recv_type) const override;
+
+    std::unique_ptr<CollectiveCommunicator> create_with_same_type(
+        communicator base,
+        const distributed::index_map_variant& imap) const override;
+
+    /**
+     * Creates the inverse neighborhood_communicator by switching sources
+     * and destinations.
+     *
+     * @return  a collective_communicator with the inverse communication
+     *          pattern
+     */
+    [[nodiscard]] std::unique_ptr<CollectiveCommunicator> create_inverse()
+        const override;
+
+    /**
+     * @copydoc collective_communicator::get_recv_size
+     */
+    comm_index_type get_recv_size() const override;
+
+    /**
+     * @copydoc collective_communicator::get_send_size
+     */
+    comm_index_type get_send_size() const override;
+
+private:
+    communicator comm_;
+
+    std::vector<comm_index_type> send_sizes_;
+    std::vector<comm_index_type> send_offsets_;
+    std::vector<comm_index_type> recv_sizes_;
+    std::vector<comm_index_type> recv_offsets_;
+};
+
+
+}  // namespace mpi
+}  // namespace experimental
+}  // namespace gko
+
+
+#endif
+#endif  // GKO_PUBLIC_CORE_DISTRIBUTED_NEIGHBORHOOD_COMMUNICATOR_HPP_
diff --git a/include/ginkgo/ginkgo.hpp b/include/ginkgo/ginkgo.hpp
index 7bf6a8c477d..9d897ce8762 100644
--- a/include/ginkgo/ginkgo.hpp
+++ b/include/ginkgo/ginkgo.hpp
@@ -65,6 +65,7 @@
 #include
 #include
 #include
+#include <ginkgo/core/distributed/neighborhood_communicator.hpp>
 #include
 #include
 #include
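
---

For reference, a minimal usage sketch of the new class (not part of the patch). It is modeled on the CanCommunicateIalltoall test above; the six-rank ring-style recv_connections, the <int, long> index types, and the reference executor are illustrative assumptions only, not prescribed by the patch.

#include <ginkgo/ginkgo.hpp>

int main(int argc, char* argv[])
{
    // initialize MPI through Ginkgo's RAII wrapper and wrap MPI_COMM_WORLD
    gko::experimental::mpi::environment env(argc, argv);
    gko::experimental::mpi::communicator comm(MPI_COMM_WORLD);
    auto exec = gko::ReferenceExecutor::create();

    // uniform partition with 3 global indices per rank
    auto part = gko::share(
        gko::experimental::distributed::Partition<int, long>::
            build_from_global_size_uniform(exec, comm.size(),
                                           comm.size() * 3));

    // each rank requests the indices owned by the next rank (wrap-around)
    auto next = (comm.rank() + 1) % comm.size();
    gko::array<long> recv_connections{
        exec, {3 * next, 3 * next + 1, 3 * next + 2}};
    auto imap = gko::experimental::distributed::index_map<int, long>{
        exec, part, comm.rank(), recv_connections};

    // receive neighbors come from the index map, send neighbors are
    // deduced collectively inside the constructor
    gko::experimental::mpi::NeighborhoodCommunicator nhcomm{comm, imap};

    // exchange one value per remote index with the sparse all-to-all
    gko::array<long> send_buffer(exec, nhcomm.get_send_size());
    send_buffer.fill(comm.rank());
    gko::array<long> recv_buffer(exec, nhcomm.get_recv_size());
    nhcomm
        .i_all_to_all_v(exec, send_buffer.get_const_data(),
                        recv_buffer.get_data())
        .wait();
    // recv_buffer now holds the rank id of the next rank, once per index
}

The sketch only touches the ranks named in the index map, which is the point of the neighborhood topology: the underlying MPI_Ineighbor_alltoallv never involves ranks outside the source/destination lists.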