Skip to content

Commit 521fe47

Browse files
authored
adding message type to asynchronous send/recv (#28)
1 parent c83395c commit 521fe47

File tree

3 files changed

+56
-16
lines changed

3 files changed

+56
-16
lines changed

Diff for: include/faabric/scheduler/MpiWorld.h

+9-3
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,9 @@ class MpiWorld
6060
int recvRank,
6161
const uint8_t* buffer,
6262
faabric_datatype_t* dataType,
63-
int count);
63+
int count,
64+
faabric::MPIMessage::MPIMessageType messageType =
65+
faabric::MPIMessage::NORMAL);
6466

6567
void broadcast(int sendRank,
6668
const uint8_t* buffer,
@@ -82,7 +84,9 @@ class MpiWorld
8284
int recvRank,
8385
uint8_t* buffer,
8486
faabric_datatype_t* dataType,
85-
int count);
87+
int count,
88+
faabric::MPIMessage::MPIMessageType messageType =
89+
faabric::MPIMessage::NORMAL);
8690

8791
void awaitAsyncRequest(int requestId);
8892

@@ -220,7 +224,9 @@ class MpiWorld
220224
const uint8_t* sendBuffer,
221225
uint8_t* recvBuffer,
222226
faabric_datatype_t* dataType,
223-
int count);
227+
int count,
228+
faabric::MPIMessage::MPIMessageType messageType =
229+
faabric::MPIMessage::NORMAL);
224230

225231
void pushToState();
226232
};

Diff for: src/scheduler/MpiWorld.cpp

+34-13
Original file line numberDiff line numberDiff line change
@@ -227,31 +227,47 @@ int MpiWorld::isend(int sendRank,
227227
int recvRank,
228228
const uint8_t* buffer,
229229
faabric_datatype_t* dataType,
230-
int count)
230+
int count,
231+
faabric::MPIMessage::MPIMessageType messageType)
231232
{
232-
return doISendRecv(sendRank, recvRank, buffer, nullptr, dataType, count);
233+
return doISendRecv(
234+
sendRank, recvRank, buffer, nullptr, dataType, count, messageType);
233235
}
234236

235237
int MpiWorld::doISendRecv(int sendRank,
236238
int recvRank,
237239
const uint8_t* sendBuffer,
238240
uint8_t* recvBuffer,
239241
faabric_datatype_t* dataType,
240-
int count)
242+
int count,
243+
faabric::MPIMessage::MPIMessageType messageType)
241244
{
242245

243246
int requestId = (int)faabric::util::generateGid();
244247

245248
// Spawn a thread to do the work
246249
asyncThreadMap.insert(std::pair<int, std::thread>(
247250
requestId,
248-
[this, sendRank, recvRank, sendBuffer, recvBuffer, dataType, count] {
251+
[this,
252+
sendRank,
253+
recvRank,
254+
sendBuffer,
255+
recvBuffer,
256+
dataType,
257+
count,
258+
messageType] {
249259
// Do the operation (i.e. the underlying synchronous send/ receive)
250260
if (recvBuffer == nullptr) {
251-
this->send(sendRank, recvRank, sendBuffer, dataType, count);
261+
this->send(
262+
sendRank, recvRank, sendBuffer, dataType, count, messageType);
252263
} else {
253-
this->recv(
254-
sendRank, recvRank, recvBuffer, dataType, count, nullptr);
264+
this->recv(sendRank,
265+
recvRank,
266+
recvBuffer,
267+
dataType,
268+
count,
269+
nullptr,
270+
messageType);
255271
}
256272
}));
257273

@@ -335,16 +351,19 @@ void MpiWorld::sendRecv(uint8_t* sendBuffer,
335351
}
336352

337353
// Post async recv
338-
int recvId = irecv(recvRank, sendRank, recvBuffer, recvDataType, recvCount);
354+
int recvId = irecv(recvRank,
355+
sendRank,
356+
recvBuffer,
357+
recvDataType,
358+
recvCount,
359+
faabric::MPIMessage::SENDRECV);
339360
// Then send the message
340-
// TODO change MPIMessage to MPIMessage::SENDRECV. This requires a change
341-
// in the signature of doISendRecv.
342361
send(sendRank,
343362
recvRank,
344363
sendBuffer,
345364
sendDataType,
346365
sendCount,
347-
faabric::MPIMessage::NORMAL);
366+
faabric::MPIMessage::SENDRECV);
348367
// And wait
349368
awaitAsyncRequest(recvId);
350369
}
@@ -553,9 +572,11 @@ int MpiWorld::irecv(int sendRank,
553572
int recvRank,
554573
uint8_t* buffer,
555574
faabric_datatype_t* dataType,
556-
int count)
575+
int count,
576+
faabric::MPIMessage::MPIMessageType messageType)
557577
{
558-
return doISendRecv(sendRank, recvRank, nullptr, buffer, dataType, count);
578+
return doISendRecv(
579+
sendRank, recvRank, nullptr, buffer, dataType, count, messageType);
559580
}
560581

561582
void MpiWorld::recv(int sendRank,

Diff for: tests/test/scheduler/test_mpi_world.cpp

+13
Original file line numberDiff line numberDiff line change
@@ -227,6 +227,19 @@ TEST_CASE("Test send and recv on same host", "[mpi]")
227227
REQUIRE(status.MPI_SOURCE == rankA1);
228228
REQUIRE(status.bytesSize == messageData.size() * sizeof(int));
229229
}
230+
231+
SECTION("Test recv with type missmatch")
232+
{
233+
// Receive a message from a different type
234+
auto buffer = new int[messageData.size()];
235+
REQUIRE_THROWS(world.recv(rankA1,
236+
rankA2,
237+
BYTES(buffer),
238+
MPI_INT,
239+
messageData.size(),
240+
nullptr,
241+
faabric::MPIMessage::SENDRECV));
242+
}
230243
}
231244

232245
TEST_CASE("Test sendrecv", "[mpi]")

0 commit comments

Comments
 (0)