Skip to content

Commit

Permalink
Removed Domain from interal flush, made Domain and Mask selectable
Browse files Browse the repository at this point in the history
  • Loading branch information
stefaniereuter committed Feb 11, 2025
1 parent d5885e3 commit 3c79d7a
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 11 deletions.
15 changes: 13 additions & 2 deletions src/multio/action/aggregate/Aggregate.cc
Original file line number Diff line number Diff line change
Expand Up @@ -61,11 +61,22 @@ auto Aggregate::flushCount(const Message& msg) {
bool Aggregate::handleFlush(const Message& msg) {
// Initialise if need be
util::ScopedTiming timing{statistics_.actionTiming_};
// to allow flushes coming without a domain to direclty pass through.
auto domain = msg.metadata().getOpt<std::string>("domain");
if(!domain) {
return true;
}
// get domain info if existant
const auto& domainMap = domain::Mappings::instance().get(*domain);

const auto& domainMap = domain::Mappings::instance().get(msg.domain());
auto flCount = flushCount(msg);

return domainMap.isComplete() && flCount == domainMap.size();
if((domainMap.isComplete() && flCount == domainMap.size()) == true){
//if complete, pass through and reset counter
flushes_.erase(msg.fieldId());
return true;
}
return false;
}

bool Aggregate::allPartsArrived(const Message& msg) const {
Expand Down
2 changes: 1 addition & 1 deletion src/multio/action/select/Select.cc
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ Select::Select(const ComponentConfiguration& compConf) :

void Select::executeImpl(Message msg) {
//pass through action for everything that is not a field, e.g. Flush
if (matches(msg) || (msg.tag() != message::Message::Tag::Field)) {
if (matches(msg) || ((msg.tag() != message::Message::Tag::Field) || (msg.tag() != message::Message::Tag::Mask) || (msg.tag() != message::Message::Tag::Domain)) ) {
executeNext(std::move(msg));
}
}
Expand Down
10 changes: 2 additions & 8 deletions src/multio/api/c/multio_capi.cc
Original file line number Diff line number Diff line change
Expand Up @@ -460,11 +460,6 @@ int multio_delete_handle(multio_handle_t* mio) {
return wrapApiFunction([mio]() {
ASSERT(mio);

multio::message::Metadata md;
md.set("flushKind", "end-of-simulation");

mio->dispatch(std::move(md), eckit::Buffer{0}, Message::Tag::Flush);

// TODO add sleep
delete mio;
});
Expand Down Expand Up @@ -508,8 +503,8 @@ int multio_close_connections(multio_handle_t* mio) {
ASSERT(mio);

multio::message::Metadata md;
md.set("flushKind", "end-of-simulation");

md.set("flushKind", "close-connection");
md.set("toAllServers",true);
mio->dispatch(std::move(md), eckit::Buffer{0}, Message::Tag::Flush);

mio->closeConnections();
Expand All @@ -526,7 +521,6 @@ int multio_flush(multio_handle_t* mio, multio_metadata_t* md) {
[mio, md]() {
ASSERT(mio);
ASSERT(md);

mio->dispatch(md->md, multio::message::PayloadReference{nullptr, 0}, Message::Tag::Flush);
},
mio);
Expand Down

0 comments on commit 3c79d7a

Please sign in to comment.