Skip to content

Commit

Permalink
Make the QueryHub class simpler, and try to get rid of test stalls …
Browse files Browse the repository at this point in the history
…on MacOS. (#1205)

The old logic that was used to remove a Websocket distributor from the global registry after either the corresponding query was finished, or all listening connections were closed, was rather complicated. This was because of the combination of synchronously blocking code (destructors) and asynchronous code (the server module in general).
As of this commit, all these deletion operations are performed asynchronously, s.t. destructors never have to block. The correct behavior of this deferred deletions is ensured by Boost::ASIO's scheduling guarantees and is checked by several assertions.
This hopefully gets rid of some spurious test failures on MacOS where a test would be caught in a deadlock.
  • Loading branch information
joka921 authored Dec 22, 2023
1 parent 5479687 commit cdbe121
Show file tree
Hide file tree
Showing 11 changed files with 166 additions and 162 deletions.
12 changes: 3 additions & 9 deletions src/util/UniqueCleanup.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ class UniqueCleanup {
/// Note: Make sure the function doesn't capture the this
/// pointer as this may lead to segfaults because the pointer
/// will point to the old object after moving.
UniqueCleanup(T&& value, Func function)
UniqueCleanup(T value, Func function)
: value_{std::move(value)}, function_{std::move(function)} {}

T& operator*() noexcept { return value_; }
Expand All @@ -47,14 +47,8 @@ class UniqueCleanup {

UniqueCleanup& operator=(UniqueCleanup&& cleanupDeleter) noexcept = default;

/// Runs the cleanup call preemptively by moving out of this instance and
/// immediately destructing the new instance
void invokeManuallyAndCancel() && {
if (active_) {
std::invoke(std::move(function_), std::move(value_));
active_ = false;
}
}
/// Disable the cleanup call without executing it.
void cancel() && { active_ = false; }

~UniqueCleanup() {
if (active_) {
Expand Down
50 changes: 31 additions & 19 deletions src/util/http/websocket/MessageSender.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,23 +6,33 @@

namespace ad_utility::websocket {

MessageSender::MessageSender(DistributorAndOwningQueryId distributor,
net::any_io_executor executor)
: distributor_{std::move(distributor),
[executor](auto&& distributor) {
auto coroutine =
[](auto distributor) -> net::awaitable<void> {
// signalEnd() removes the distributor from the QueryHub.
// When the coroutine is destroyed afterwards the query
// id is unregistered from the registry by the destructor
// of `OwningQueryId`! This is the reason why the
// `OwningQueryId` is part of the struct but never
// actually accessed.
co_await distributor.distributor_->signalEnd();
};
net::co_spawn(executor, coroutine(AD_FWD(distributor)),
net::detached);
}},
MessageSender::MessageSender(
DistributorAndOwningQueryId distributorAndOwningQueryId,
net::any_io_executor executor)
: distributorAndOwningQueryId_{std::move(distributorAndOwningQueryId),
[executor](
auto&& distributorAndOwningQueryId) {
auto coroutine =
[](auto distributorAndOwningQueryId)
-> net::awaitable<void> {
// signalEnd() removes the
// distributorAndOwningQueryId from the
// QueryHub. When the coroutine is
// destroyed afterwards the query id is
// unregistered from the registry by the
// destructor of `OwningQueryId`! This is
// the reason why the `OwningQueryId` is
// part of the struct but never actually
// accessed.
co_await distributorAndOwningQueryId
.distributor_->signalEnd();
};
net::co_spawn(
executor,
coroutine(AD_FWD(
distributorAndOwningQueryId)),
net::detached);
}},
executor_{std::move(executor)} {}

// _____________________________________________________________________________
Expand All @@ -46,7 +56,9 @@ void MessageSender::operator()(std::string json) const {
// it is passed by value
co_await distributor->addQueryStatusUpdate(std::move(json));
};
net::co_spawn(executor_, lambda(distributor_->distributor_, std::move(json)),
net::detached);
net::co_spawn(
executor_,
lambda(distributorAndOwningQueryId_->distributor_, std::move(json)),
net::detached);
}
} // namespace ad_utility::websocket
9 changes: 5 additions & 4 deletions src/util/http/websocket/MessageSender.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,9 @@ using unique_cleanup::UniqueCleanup;
/// from the non-asio world.
class MessageSender {
private:
/// Keep the OwningQueryId alive until distributor_->signalEnd() is called
/// (see the constructor of `MessageSender` for details).
/// Keep the OwningQueryId alive until
/// distributorAndOwningQueryId_->signalEnd() is called (see the constructor
/// of `MessageSender` for details).
struct DistributorAndOwningQueryId {
std::shared_ptr<QueryToSocketDistributor> distributor_;
OwningQueryId owningQueryId_;
Expand All @@ -35,7 +36,7 @@ class MessageSender {
: distributor_{std::move(distributor)},
owningQueryId_{std::move(owningQueryId)} {}
};
UniqueCleanup<DistributorAndOwningQueryId> distributor_;
UniqueCleanup<DistributorAndOwningQueryId> distributorAndOwningQueryId_;
net::any_io_executor executor_;

// This constructor is private because this instance should only ever be
Expand All @@ -57,7 +58,7 @@ class MessageSender {

/// Get read only view of underlying `QueryId`.
const QueryId& getQueryId() const noexcept {
return distributor_->owningQueryId_.toQueryId();
return distributorAndOwningQueryId_->owningQueryId_.toQueryId();
}
};

Expand Down
96 changes: 49 additions & 47 deletions src/util/http/websocket/QueryHub.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,70 +8,80 @@

namespace ad_utility::websocket {

template <bool isSender>
net::awaitable<std::shared_ptr<
QueryHub::ConditionalConst<isSender, QueryToSocketDistributor>>>
QueryHub::createOrAcquireDistributorInternalUnsafe(QueryId queryId) {
while (socketDistributors_->contains(queryId)) {
// Return a lambda that deletes the `queryId` from the `socketDistributors`, but
// only if it is either expired or `alwaysDelete` was specified.
inline auto makeDeleteFromDistributors =
[](auto socketDistributors, QueryId queryId, bool alwaysDelete) {
return [socketDistributors = std::move(socketDistributors),
queryId = std::move(queryId), alwaysDelete]() {
auto it = socketDistributors->find(queryId);
if (it == socketDistributors->end()) {
return;
}
bool expired = it->second.pointer_.expired();
// The branch `both of them are true` is currently not covered by tests
// and also not coverable, because the manual `signalEnd` call always
// comes before the destructor.
if (alwaysDelete || expired) {
socketDistributors->erase(it);
}
};
};

// _____________________________________________________________________________
net::awaitable<std::shared_ptr<QueryToSocketDistributor>>
QueryHub::createOrAcquireDistributorInternalUnsafe(QueryId queryId,
bool isSender) {
if (socketDistributors_->contains(queryId)) {
auto& reference = socketDistributors_->at(queryId);
if (auto ptr = reference.pointer_.lock()) {
if constexpr (isSender) {
if (isSender) {
// Ensure only single sender reference is acquired for a single session
AD_CONTRACT_CHECK(!reference.started_);
reference.started_ = true;
}
co_return ptr;
} else {
socketDistributors_->erase(queryId);
}
// There's the unlikely case where the reference counter reached zero and
// the weak pointer can no longer create a shared pointer, but the
// destructor is waiting for execution on `globalStrand_`. In this case
// re-schedule this coroutine to be executed after destruction. So it is
// crucial to use post over dispatch here.
co_await net::post(net::bind_executor(globalStrand_, net::use_awaitable));
}

// The cleanup call for the distributor.
// We pass a copy of the `shared_pointer socketDistributors_` here,
// because in unit tests the callback might be invoked after this
// `QueryHub` was destroyed.
auto cleanupCall = [globalStrand = globalStrand_,
socketDistributors = socketDistributors_, queryId,
alreadyCalled = false](bool alwaysDelete) mutable {
AD_CORRECTNESS_CHECK(!alreadyCalled);
alreadyCalled = true;
net::dispatch(globalStrand,
makeDeleteFromDistributors(std::move(socketDistributors),
std::move(queryId), alwaysDelete));
// We don't wait for the deletion to complete here, but only for its
// scheduling. We still get the expected behavior because all accesses
// to the `socketDistributor` are synchronized via a strand and
// BOOST::asio schedules in a FIFO manner.
};

auto distributor = std::make_shared<QueryToSocketDistributor>(
// We pass a copy of the `shared_pointer socketDistributors_` here,
// because in unit tests the callback might be invoked after this
// `QueryHub` was destroyed.
ioContext_, [&ioContext = ioContext_, globalStrand = globalStrand_,
socketDistributors = socketDistributors_, queryId]() {
auto future = net::dispatch(net::bind_executor(
globalStrand,
std::packaged_task<void()>([&socketDistributors, &queryId]() {
bool wasErased = socketDistributors->erase(queryId);
AD_CORRECTNESS_CHECK(wasErased);
})));
// As long as the destructor would have to block anyway, perform work
// on the `ioContext_`. This avoids blocking in case the destructor
// already runs inside the `ioContext_`.
// Note: When called on a strand this may block the current strand.
// If the ioContext has been stopped for some reason don't wait
// for the result, or this will never terminate.
while (future.wait_for(std::chrono::seconds(0)) !=
std::future_status::ready &&
!ioContext.stopped()) {
ioContext.poll_one();
}
});
ioContext_, std::move(cleanupCall));
socketDistributors_->emplace(queryId,
WeakReferenceHolder{distributor, isSender});
co_return distributor;
}

// _____________________________________________________________________________

template <bool isSender>
net::awaitable<std::shared_ptr<
QueryHub::ConditionalConst<isSender, QueryToSocketDistributor>>>
QueryHub::createOrAcquireDistributorInternal(QueryId queryId) {
co_await net::post(net::bind_executor(globalStrand_, net::use_awaitable));
co_return co_await createOrAcquireDistributorInternalUnsafe<isSender>(
std::move(queryId));
co_return co_await createOrAcquireDistributorInternalUnsafe(
std::move(queryId), isSender);
}

// _____________________________________________________________________________

net::awaitable<std::shared_ptr<QueryToSocketDistributor>>
QueryHub::createOrAcquireDistributorForSending(QueryId queryId) {
return resumeOnOriginalExecutor(
Expand All @@ -86,12 +96,4 @@ QueryHub::createOrAcquireDistributorForReceiving(QueryId queryId) {
createOrAcquireDistributorInternal<false>(std::move(queryId)));
}

// _____________________________________________________________________________

// Clang does not seem to keep this instantiation around when called directly,
// so we need to explicitly instantiate it here for the
// QueryHub_testCorrectReschedulingForEmptyPointerOnDestruct test to compile
// properly
template net::awaitable<std::shared_ptr<const QueryToSocketDistributor>>
QueryHub::createOrAcquireDistributorInternalUnsafe<false>(QueryId);
} // namespace ad_utility::websocket
6 changes: 2 additions & 4 deletions src/util/http/websocket/QueryHub.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,8 @@ class QueryHub {
/// Implementation of createOrAcquireDistributorForSending and
/// createOrAcquireDistributorForReceiving, without thread safety,
/// exposed for testing
template <bool isSender>
net::awaitable<
std::shared_ptr<ConditionalConst<isSender, QueryToSocketDistributor>>>
createOrAcquireDistributorInternalUnsafe(QueryId);
net::awaitable<std::shared_ptr<QueryToSocketDistributor>>
createOrAcquireDistributorInternalUnsafe(QueryId, bool isSender);

/// createOrAcquireDistributorInternalUnsafe, but dispatched on global strand
template <bool isSender>
Expand Down
15 changes: 13 additions & 2 deletions src/util/http/websocket/QueryToSocketDistributor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,22 @@

namespace ad_utility::websocket {

// _____________________________________________________________________________
QueryToSocketDistributor::QueryToSocketDistributor(
net::io_context& ioContext, const std::function<void(bool)>& cleanupCall)
: strand_{net::make_strand(ioContext)},
infiniteTimer_{strand_, static_cast<net::deadline_timer::time_type>(
boost::posix_time::pos_infin)},
cleanupCall_{
cleanupCall,
[](const auto& cleanupCall) { std::invoke(cleanupCall, false); }},
signalEndCall_{[cleanupCall] { std::invoke(cleanupCall, true); }} {}

net::awaitable<void> QueryToSocketDistributor::postToStrand() const {
return net::post(net::bind_executor(strand_, net::use_awaitable));
}

// _____________________________________________________________________________

net::awaitable<void> QueryToSocketDistributor::waitForUpdate() const {
auto [error] = co_await infiniteTimer_.async_wait(
net::bind_executor(strand_, net::as_tuple(net::use_awaitable)));
Expand Down Expand Up @@ -56,7 +66,8 @@ net::awaitable<void> QueryToSocketDistributor::signalEnd() {
finished_ = true;
wakeUpWaitingListeners();
// Invoke cleanup pre-emptively
std::move(cleanupCall_).invokeManuallyAndCancel();
signalEndCall_();
std::move(cleanupCall_).cancel();
}

// _____________________________________________________________________________
Expand Down
18 changes: 8 additions & 10 deletions src/util/http/websocket/QueryToSocketDistributor.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@ class QueryToSocketDistributor {

/// Function to remove this distributor from the `QueryHub` when it is
/// destructed.
unique_cleanup::UniqueCleanup<std::function<void()>> cleanupCall_;
unique_cleanup::UniqueCleanup<std::function<void(bool)>> cleanupCall_;
std::function<void()> signalEndCall_;

/// Wakes up all websockets that are currently "blocked" and waiting for an
/// update of the given query. After being woken up they will check for
Expand All @@ -52,15 +53,12 @@ class QueryToSocketDistributor {
net::awaitable<void> postToStrand() const;

public:
/// Constructor that builds a new strand from the provided io context.
explicit QueryToSocketDistributor(net::io_context& ioContext,
std::function<void()> cleanupCall)
: strand_{net::make_strand(ioContext)},
infiniteTimer_{strand_, static_cast<net::deadline_timer::time_type>(
boost::posix_time::pos_infin)},
cleanupCall_{std::move(cleanupCall), [](const auto& cleanupCall) {
std::invoke(cleanupCall);
}} {}
/// Constructor that builds a new strand from the provided io context and
/// `cleanupCall`. The cleanup call is invoked with `true` as an argument when
/// `signalEnd()` is called and with `false` In the destructor if there where
/// no explicit calls to `signalEnd()` before.
explicit QueryToSocketDistributor(
net::io_context& ioContext, const std::function<void(bool)>& cleanupCall);

/// Appends specified data to the vector and signals all waiting websockets
/// that new data is available
Expand Down
Loading

0 comments on commit cdbe121

Please sign in to comment.