From 3c79d7a7c7b7a42cdd7c64e74393bd0966847b22 Mon Sep 17 00:00:00 2001 From: Stefanie Reuter Date: Tue, 11 Feb 2025 08:54:37 +0000 Subject: [PATCH] Removed Domain from interal flush, made Domain and Mask selectable --- src/multio/action/aggregate/Aggregate.cc | 15 +++++++++++++-- src/multio/action/select/Select.cc | 2 +- src/multio/api/c/multio_capi.cc | 10 ++-------- 3 files changed, 16 insertions(+), 11 deletions(-) diff --git a/src/multio/action/aggregate/Aggregate.cc b/src/multio/action/aggregate/Aggregate.cc index 6eacb82dd..1b0fd3bac 100644 --- a/src/multio/action/aggregate/Aggregate.cc +++ b/src/multio/action/aggregate/Aggregate.cc @@ -61,11 +61,22 @@ auto Aggregate::flushCount(const Message& msg) { bool Aggregate::handleFlush(const Message& msg) { // Initialise if need be util::ScopedTiming timing{statistics_.actionTiming_}; + // to allow flushes coming without a domain to direclty pass through. + auto domain = msg.metadata().getOpt("domain"); + if(!domain) { + return true; + } + // get domain info if existant + const auto& domainMap = domain::Mappings::instance().get(*domain); - const auto& domainMap = domain::Mappings::instance().get(msg.domain()); auto flCount = flushCount(msg); - return domainMap.isComplete() && flCount == domainMap.size(); + if((domainMap.isComplete() && flCount == domainMap.size()) == true){ + //if complete, pass through and reset counter + flushes_.erase(msg.fieldId()); + return true; + } + return false; } bool Aggregate::allPartsArrived(const Message& msg) const { diff --git a/src/multio/action/select/Select.cc b/src/multio/action/select/Select.cc index 56a1ef84f..eca487433 100644 --- a/src/multio/action/select/Select.cc +++ b/src/multio/action/select/Select.cc @@ -28,7 +28,7 @@ Select::Select(const ComponentConfiguration& compConf) : void Select::executeImpl(Message msg) { //pass through action for everything that is not a field, e.g. Flush - if (matches(msg) || (msg.tag() != message::Message::Tag::Field)) { + if (matches(msg) || ((msg.tag() != message::Message::Tag::Field) || (msg.tag() != message::Message::Tag::Mask) || (msg.tag() != message::Message::Tag::Domain)) ) { executeNext(std::move(msg)); } } diff --git a/src/multio/api/c/multio_capi.cc b/src/multio/api/c/multio_capi.cc index 1674395de..39e0d644b 100644 --- a/src/multio/api/c/multio_capi.cc +++ b/src/multio/api/c/multio_capi.cc @@ -460,11 +460,6 @@ int multio_delete_handle(multio_handle_t* mio) { return wrapApiFunction([mio]() { ASSERT(mio); - multio::message::Metadata md; - md.set("flushKind", "end-of-simulation"); - - mio->dispatch(std::move(md), eckit::Buffer{0}, Message::Tag::Flush); - // TODO add sleep delete mio; }); @@ -508,8 +503,8 @@ int multio_close_connections(multio_handle_t* mio) { ASSERT(mio); multio::message::Metadata md; - md.set("flushKind", "end-of-simulation"); - + md.set("flushKind", "close-connection"); + md.set("toAllServers",true); mio->dispatch(std::move(md), eckit::Buffer{0}, Message::Tag::Flush); mio->closeConnections(); @@ -526,7 +521,6 @@ int multio_flush(multio_handle_t* mio, multio_metadata_t* md) { [mio, md]() { ASSERT(mio); ASSERT(md); - mio->dispatch(md->md, multio::message::PayloadReference{nullptr, 0}, Message::Tag::Flush); }, mio);