Skip to content

Commit 7434dfc

Browse files
committed
[coll-comm] add dense communication
Signed-off-by: Marcel Koch <[email protected]>
1 parent 99a98e8 commit 7434dfc

File tree

5 files changed

+490
-0
lines changed

5 files changed

+490
-0
lines changed

core/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -147,6 +147,7 @@ if(GINKGO_BUILD_MPI)
147147
mpi/exception.cpp
148148
distributed/assembly.cpp
149149
distributed/collective_communicator.cpp
150+
distributed/dense_communicator.cpp
150151
distributed/matrix.cpp
151152
distributed/partition_helpers.cpp
152153
distributed/vector.cpp
Lines changed: 130 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,130 @@
1+
// SPDX-FileCopyrightText: 2024 - 2025 The Ginkgo authors
2+
//
3+
// SPDX-License-Identifier: BSD-3-Clause
4+
5+
#include "ginkgo/core/distributed/dense_communicator.hpp"
6+
7+
namespace gko {
8+
namespace experimental {
9+
namespace mpi {
10+
11+
12+
DenseCommunicator::DenseCommunicator(communicator base)
13+
: CollectiveCommunicator(base),
14+
comm_(base),
15+
recv_sizes_(comm_.size()),
16+
recv_offsets_(comm_.size() + 1),
17+
send_sizes_(comm_.size()),
18+
send_offsets_(comm_.size() + 1)
19+
{}
20+
21+
22+
template <typename LocalIndexType, typename GlobalIndexType>
23+
DenseCommunicator::DenseCommunicator(
24+
communicator base,
25+
const distributed::index_map<LocalIndexType, GlobalIndexType>& imap)
26+
: DenseCommunicator(base)
27+
{
28+
auto exec = imap.get_executor();
29+
if (!exec) {
30+
return;
31+
}
32+
auto host_exec = exec->get_master();
33+
34+
auto recv_target_ids_arr =
35+
make_temporary_clone(host_exec, &imap.get_remote_target_ids());
36+
auto remote_idx_offsets_arr = make_temporary_clone(
37+
host_exec, &imap.get_remote_global_idxs().get_offsets());
38+
for (size_type seg_id = 0;
39+
seg_id < imap.get_remote_global_idxs().get_segment_count(); ++seg_id) {
40+
recv_sizes_[recv_target_ids_arr->get_const_data()[seg_id]] =
41+
remote_idx_offsets_arr->get_const_data()[seg_id + 1] -
42+
remote_idx_offsets_arr->get_const_data()[seg_id];
43+
}
44+
45+
comm_.all_to_all(host_exec, recv_sizes_.data(), 1, send_sizes_.data(), 1);
46+
47+
std::partial_sum(send_sizes_.begin(), send_sizes_.end(),
48+
send_offsets_.begin() + 1);
49+
std::partial_sum(recv_sizes_.begin(), recv_sizes_.end(),
50+
recv_offsets_.begin() + 1);
51+
}
52+
53+
#define GKO_DECLARE_DENSE_CONSTRUCTOR(LocalIndexType, GlobalIndexType) \
54+
DenseCommunicator::DenseCommunicator( \
55+
communicator base, \
56+
const distributed::index_map<LocalIndexType, GlobalIndexType>& imap)
57+
58+
GKO_INSTANTIATE_FOR_EACH_LOCAL_GLOBAL_INDEX_TYPE(GKO_DECLARE_DENSE_CONSTRUCTOR);
59+
60+
#undef GKO_DECLARE_DENSE_CONSTRUCTOR
61+
62+
63+
DenseCommunicator::DenseCommunicator(
64+
communicator base, const std::vector<comm_index_type>& recv_sizes,
65+
const std::vector<comm_index_type>& recv_offsets,
66+
const std::vector<comm_index_type>& send_sizes,
67+
const std::vector<comm_index_type>& send_offsets)
68+
: CollectiveCommunicator(base),
69+
comm_(base),
70+
recv_sizes_(recv_sizes),
71+
recv_offsets_(recv_offsets),
72+
send_sizes_(send_sizes),
73+
send_offsets_(send_offsets)
74+
{}
75+
76+
77+
request DenseCommunicator::i_all_to_all_v(std::shared_ptr<const Executor> exec,
78+
const void* send_buffer,
79+
MPI_Datatype send_type,
80+
void* recv_buffer,
81+
MPI_Datatype recv_type) const
82+
{
83+
#ifdef GINKGO_FORCE_SPMV_BLOCKING_COMM
84+
comm_.all_to_all_v(exec, send_buffer, send_sizes_.data(),
85+
send_offsets_.data(), send_type, recv_buffer,
86+
recv_sizes_.data(), recv_offsets_.data(), recv_type);
87+
return {};
88+
#else
89+
return comm_.i_all_to_all_v(
90+
exec, send_buffer, send_sizes_.data(), send_offsets_.data(), send_type,
91+
recv_buffer, recv_sizes_.data(), recv_offsets_.data(), recv_type);
92+
#endif
93+
}
94+
95+
96+
std::unique_ptr<CollectiveCommunicator>
97+
DenseCommunicator::create_with_same_type(
98+
communicator base, const distributed::index_map_variant& imap) const
99+
{
100+
return std::visit(
101+
[base](const auto& imap) {
102+
return std::make_unique<DenseCommunicator>(base, imap);
103+
},
104+
imap);
105+
}
106+
107+
108+
std::unique_ptr<CollectiveCommunicator> DenseCommunicator::create_inverse()
109+
const
110+
{
111+
return std::make_unique<DenseCommunicator>(
112+
comm_, send_sizes_, send_offsets_, recv_sizes_, recv_offsets_);
113+
}
114+
115+
116+
comm_index_type DenseCommunicator::get_recv_size() const
117+
{
118+
return recv_offsets_.back();
119+
}
120+
121+
122+
comm_index_type DenseCommunicator::get_send_size() const
123+
{
124+
return send_offsets_.back();
125+
}
126+
127+
128+
} // namespace mpi
129+
} // namespace experimental
130+
} // namespace gko

core/test/mpi/distributed/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
ginkgo_create_test(helpers MPI_SIZE 1)
22
ginkgo_create_test(matrix MPI_SIZE 1)
3+
ginkgo_create_test(dense_communicator MPI_SIZE 6)
34
ginkgo_create_test(vector_cache MPI_SIZE 3)
45

56
add_subdirectory(preconditioner)
Lines changed: 232 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,232 @@
1+
// SPDX-FileCopyrightText: 2017 - 2025 The Ginkgo authors
2+
//
3+
// SPDX-License-Identifier: BSD-3-Clause
4+
5+
#include <gtest/gtest.h>
6+
7+
#include <ginkgo/core/distributed/dense_communicator.hpp>
8+
9+
#include "core/test/utils/assertions.hpp"
10+
11+
using gko::experimental::mpi::comm_index_type;
12+
13+
class DenseCommunicator : public ::testing::Test {
14+
protected:
15+
using part_type = gko::experimental::distributed::Partition<int, long>;
16+
using map_type = gko::experimental::distributed::index_map<int, long>;
17+
18+
void SetUp() override { ASSERT_EQ(comm.size(), 6); }
19+
20+
std::shared_ptr<gko::Executor> ref = gko::ReferenceExecutor::create();
21+
gko::experimental::mpi::communicator comm = MPI_COMM_WORLD;
22+
int rank = comm.rank();
23+
};
24+
25+
26+
TEST_F(DenseCommunicator, CanDefaultConstruct)
27+
{
28+
gko::experimental::mpi::DenseCommunicator nhcomm{comm};
29+
30+
ASSERT_EQ(nhcomm.get_base_communicator(), comm);
31+
ASSERT_EQ(nhcomm.get_send_size(), 0);
32+
ASSERT_EQ(nhcomm.get_recv_size(), 0);
33+
}
34+
35+
36+
TEST_F(DenseCommunicator, CanConstructFromIndexMap)
37+
{
38+
auto part = gko::share(part_type::build_from_global_size_uniform(
39+
ref, comm.size(), comm.size() * 3));
40+
gko::array<long> recv_connections[] = {{ref, {3, 5, 10, 11}},
41+
{ref, {0, 1, 7, 12, 13}},
42+
{ref, {3, 4, 17}},
43+
{ref, {1, 2, 12, 14}},
44+
{ref, {4, 5, 9, 10, 16, 15}},
45+
{ref, {8, 12, 13, 14}}};
46+
auto imap = map_type{ref, part, comm.rank(), recv_connections[rank]};
47+
48+
gko::experimental::mpi::DenseCommunicator spcomm{comm, imap};
49+
50+
std::array<gko::size_type, 6> send_sizes = {4, 6, 2, 4, 7, 3};
51+
ASSERT_EQ(spcomm.get_recv_size(), recv_connections[rank].get_size());
52+
ASSERT_EQ(spcomm.get_send_size(), send_sizes[rank]);
53+
}
54+
55+
56+
TEST_F(DenseCommunicator, CanConstructFromEnvelopData)
57+
{
58+
// clang-format off
59+
std::vector<comm_index_type> recv_sizes[] = {
60+
{0, 2, 2,
61+
0, 0, 0},
62+
{2, 0, 1,
63+
2, 0, 0},
64+
{0, 2, 0,
65+
0, 0, 1},
66+
{2, 0, 0,
67+
0, 2, 0},
68+
{0, 2, 0,
69+
2, 0, 2},
70+
{0, 0, 1,
71+
0, 3, 0}};
72+
std::vector<comm_index_type> send_sizes[] = {
73+
{0, 2, 0,
74+
2, 0, 0},
75+
{2, 0, 2,
76+
0, 2, 0},
77+
{0, 1, 0,
78+
0, 0, 1},
79+
{2, 0, 0,
80+
0, 2, 0},
81+
{0, 2, 0,
82+
2, 0, 3},
83+
{0, 0, 1,
84+
0, 2, 0}};
85+
// clang-format on
86+
std::vector<comm_index_type> recv_offsets(recv_sizes[rank].size() + 1);
87+
std::vector<comm_index_type> send_offsets(send_sizes[rank].size() + 1);
88+
std::partial_sum(recv_sizes[rank].begin(), recv_sizes[rank].end(),
89+
recv_offsets.begin() + 1);
90+
std::partial_sum(send_sizes[rank].begin(), send_sizes[rank].end(),
91+
send_offsets.begin() + 1);
92+
93+
gko::experimental::mpi::DenseCommunicator spcomm{
94+
comm, recv_sizes[rank], recv_offsets, send_sizes[rank], send_offsets,
95+
};
96+
97+
ASSERT_EQ(spcomm.get_recv_size(), recv_offsets.back());
98+
ASSERT_EQ(spcomm.get_send_size(), send_offsets.back());
99+
}
100+
101+
102+
TEST_F(DenseCommunicator, CanConstructFromEmptyIndexMap)
103+
{
104+
auto imap = map_type{ref};
105+
106+
gko::experimental::mpi::DenseCommunicator spcomm{comm, imap};
107+
108+
ASSERT_EQ(spcomm.get_recv_size(), 0);
109+
ASSERT_EQ(spcomm.get_send_size(), 0);
110+
}
111+
112+
113+
TEST_F(DenseCommunicator, CanConstructFromIndexMapWithoutConnection)
114+
{
115+
auto part = gko::share(part_type::build_from_global_size_uniform(
116+
ref, comm.size(), comm.size() * 3));
117+
auto imap = map_type{ref, part, comm.rank(), {ref, 0}};
118+
119+
gko::experimental::mpi::DenseCommunicator spcomm{comm, imap};
120+
121+
ASSERT_EQ(spcomm.get_recv_size(), 0);
122+
ASSERT_EQ(spcomm.get_send_size(), 0);
123+
}
124+
125+
126+
TEST_F(DenseCommunicator, CanConstructFromEmptyEnvelopData)
127+
{
128+
std::vector<comm_index_type> recv_sizes;
129+
std::vector<comm_index_type> send_sizes;
130+
std::vector<comm_index_type> recv_offsets{0};
131+
std::vector<comm_index_type> send_offsets{0};
132+
133+
gko::experimental::mpi::DenseCommunicator spcomm{
134+
comm, recv_sizes, recv_offsets, send_sizes, send_offsets,
135+
};
136+
137+
ASSERT_EQ(spcomm.get_recv_size(), 0);
138+
ASSERT_EQ(spcomm.get_send_size(), 0);
139+
}
140+
141+
142+
TEST_F(DenseCommunicator, CanCommunicateIalltoall)
143+
{
144+
auto part = gko::share(part_type::build_from_global_size_uniform(
145+
ref, comm.size(), comm.size() * 3));
146+
gko::array<long> recv_connections[] = {{ref, {3, 5, 10, 11}},
147+
{ref, {0, 1, 7, 12, 13}},
148+
{ref, {3, 4, 17}},
149+
{ref, {1, 2, 12, 14}},
150+
{ref, {4, 5, 9, 10, 16, 15}},
151+
{ref, {8, 12, 13, 14}}};
152+
auto imap = map_type{ref, part, comm.rank(), recv_connections[rank]};
153+
gko::experimental::mpi::DenseCommunicator spcomm{comm, imap};
154+
gko::array<long> recv_buffer{ref, recv_connections[rank].get_size()};
155+
gko::array<long> send_buffers[] = {{ref, {0, 1, 1, 2}},
156+
{ref, {3, 5, 3, 4, 4, 5}},
157+
{ref, {7, 8}},
158+
{ref, {10, 11, 9, 10}},
159+
{ref, {12, 13, 12, 14, 12, 13, 14}},
160+
{ref, {17, 16, 15}}};
161+
162+
auto req = spcomm.i_all_to_all_v(ref, send_buffers[rank].get_const_data(),
163+
recv_buffer.get_data());
164+
req.wait();
165+
166+
GKO_ASSERT_ARRAY_EQ(recv_buffer, recv_connections[rank]);
167+
}
168+
169+
170+
TEST_F(DenseCommunicator, CanCommunicateIalltoallWhenEmpty)
171+
{
172+
gko::experimental::mpi::DenseCommunicator spcomm{comm};
173+
174+
auto req = spcomm.i_all_to_all_v(ref, static_cast<int*>(nullptr),
175+
static_cast<int*>(nullptr));
176+
req.wait();
177+
}
178+
179+
180+
TEST_F(DenseCommunicator, CanCreateInverse)
181+
{
182+
auto part = gko::share(part_type::build_from_global_size_uniform(
183+
ref, comm.size(), comm.size() * 3));
184+
gko::array<long> recv_connections[] = {{ref, {3, 5, 10, 11}},
185+
{ref, {0, 1, 7, 12, 13}},
186+
{ref, {3, 4, 17}},
187+
{ref, {1, 2, 12, 14}},
188+
{ref, {4, 5, 9, 10, 16, 15}},
189+
{ref, {8, 12, 13, 14}}};
190+
auto imap = map_type{ref, part, comm.rank(), recv_connections[rank]};
191+
gko::experimental::mpi::DenseCommunicator spcomm{comm, imap};
192+
193+
auto inverse = spcomm.create_inverse();
194+
195+
ASSERT_EQ(inverse->get_recv_size(), spcomm.get_send_size());
196+
ASSERT_EQ(inverse->get_send_size(), spcomm.get_recv_size());
197+
}
198+
199+
200+
TEST_F(DenseCommunicator, CanCommunicateRoundTrip)
201+
{
202+
auto part = gko::share(part_type::build_from_global_size_uniform(
203+
ref, comm.size(), comm.size() * 3));
204+
gko::array<long> recv_connections[] = {{ref, {3, 5, 10, 11}},
205+
{ref, {0, 1, 7, 12, 13}},
206+
{ref, {3, 4, 17}},
207+
{ref, {1, 2, 12, 14}},
208+
{ref, {4, 5, 9, 10, 16, 15}},
209+
{ref, {8, 12, 13, 14}}};
210+
auto imap = map_type{ref, part, comm.rank(), recv_connections[rank]};
211+
gko::experimental::mpi::DenseCommunicator spcomm{comm, imap};
212+
auto inverse = spcomm.create_inverse();
213+
gko::array<long> send_buffers[] = {{ref, {1, 2, 3, 4}},
214+
{ref, {5, 6, 7, 8, 9, 10}},
215+
{ref, {11, 12}},
216+
{ref, {13, 14, 15, 16}},
217+
{ref, {17, 18, 19, 20, 21, 22, 23}},
218+
{ref, {24, 25, 26}}};
219+
gko::array<long> recv_buffer{ref, recv_connections[rank].get_size()};
220+
gko::array<long> round_trip{ref, send_buffers[rank].get_size()};
221+
222+
spcomm
223+
.i_all_to_all_v(ref, send_buffers[rank].get_const_data(),
224+
recv_buffer.get_data())
225+
.wait();
226+
inverse
227+
->i_all_to_all_v(ref, recv_buffer.get_const_data(),
228+
round_trip.get_data())
229+
.wait();
230+
231+
GKO_ASSERT_ARRAY_EQ(send_buffers[rank], round_trip);
232+
}

0 commit comments

Comments
 (0)