Skip to content

Commit

Permalink
Add function to allow checking thread type
Browse files Browse the repository at this point in the history
Closes #4613

This change adds a function `threadIsType` function to `Application`
that allows a thread to check its type. This is intended to support more
detailed assertions than the usual `releaseAssert(threadIsMain())`
assertions we're currently using everywhere.

The implementation differs a bit from the proposed solution in #4613 as
it uses a mapping in `ApplicationImpl` to track thread types, rather
than using static variables. I chose this approach because as far as I
can tell, it's not possible to assign a thread an id.  Given that,
`ApplicationImpl` would need to set these variables in its constructor,
and the variables would hold meaningless values prior to that. I figure
it's safer to ensure that thread types can only be reasoned about after
the creation of the threads themselves in `ApplicationImpl`'s
constructor. However, if it's important that thread types be reasoned
about without an `Application` or `AppConnector`, then I'm open to
changing the design.
  • Loading branch information
bboston7 committed Feb 11, 2025
1 parent bf55e27 commit d1ae2d7
Show file tree
Hide file tree
Showing 6 changed files with 44 additions and 2 deletions.
6 changes: 6 additions & 0 deletions src/main/AppConnector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,12 @@ AppConnector::checkScheduledAndCache(
return mApp.getOverlayManager().checkScheduledAndCache(msgTracker);
}

bool
AppConnector::threadIsType(Application::ThreadType type) const
{
return mApp.threadIsType(type);
}

SearchableHotArchiveSnapshotConstPtr
AppConnector::copySearchableHotArchiveBucketListSnapshot()
{
Expand Down
3 changes: 2 additions & 1 deletion src/main/AppConnector.h
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
#pragma once

#include "bucket/BucketUtils.h"
#include "main/Application.h"
#include "main/Config.h"
#include "medida/metrics_registry.h"

namespace stellar
{
class Application;
class OverlayManager;
class LedgerManager;
class Herder;
Expand Down Expand Up @@ -57,6 +57,7 @@ class AppConnector
checkScheduledAndCache(std::shared_ptr<CapacityTrackedMessage> msgTracker);
SorobanNetworkConfig const& getSorobanNetworkConfigReadOnly() const;
SorobanNetworkConfig const& getSorobanNetworkConfigForApply() const;
bool threadIsType(Application::ThreadType type) const;

medida::MetricsRegistry& getMetrics() const;
SearchableHotArchiveSnapshotConstPtr
Expand Down
13 changes: 13 additions & 0 deletions src/main/Application.h
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,16 @@ class Application
APP_NUM_STATE
};

// Types of threads that may be running
enum class ThreadType
{
MAIN,
WORKER,
EVICTION,
OVERLAY,
APPLY
};

virtual ~Application(){};

virtual void initialize(bool createNewDB, bool forceRebuild) = 0;
Expand Down Expand Up @@ -330,6 +340,9 @@ class Application
return ret;
}

// Returns true iff the calling thread has the same type as `type`
virtual bool threadIsType(ThreadType type) const = 0;

virtual AppConnector& getAppConnector() = 0;

protected:
Expand Down
14 changes: 14 additions & 0 deletions src/main/ApplicationImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -160,11 +160,14 @@ ApplicationImpl::ApplicationImpl(VirtualClock& clock, Config const& cfg)
releaseAssert(mConfig.WORKER_THREADS > 0);
releaseAssert(mEvictionIOContext);

mThreadTypes[std::this_thread::get_id()] = ThreadType::MAIN;

// Allocate one thread for Eviction scan
mEvictionThread = std::thread{[this]() {
runCurrentThreadWithMediumPriority();
mEvictionIOContext->run();
}};
mThreadTypes[mEvictionThread->get_id()] = ThreadType::EVICTION;

--t;

Expand All @@ -174,19 +177,22 @@ ApplicationImpl::ApplicationImpl(VirtualClock& clock, Config const& cfg)
runCurrentThreadWithLowPriority();
mWorkerIOContext.run();
}};
mThreadTypes[thread.get_id()] = ThreadType::WORKER;
mWorkerThreads.emplace_back(std::move(thread));
}

if (mConfig.BACKGROUND_OVERLAY_PROCESSING)
{
// Keep priority unchanged as overlay processes time-sensitive tasks
mOverlayThread = std::thread{[this]() { mOverlayIOContext->run(); }};
mThreadTypes[mOverlayThread->get_id()] = ThreadType::OVERLAY;
}

if (mConfig.parallelLedgerClose())
{
mLedgerCloseThread =
std::thread{[this]() { mLedgerCloseIOContext->run(); }};
mThreadTypes[mLedgerCloseThread->get_id()] = ThreadType::APPLY;
}
}

Expand Down Expand Up @@ -1197,6 +1203,14 @@ ApplicationImpl::getMetrics()
return *mMetrics;
}

bool
ApplicationImpl::threadIsType(ThreadType type) const
{
auto it = mThreadTypes.find(std::this_thread::get_id());
releaseAssert(it != mThreadTypes.end());
return it->second == type;
}

void
ApplicationImpl::syncOwnMetrics()
{
Expand Down
7 changes: 7 additions & 0 deletions src/main/ApplicationImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,8 @@ class ApplicationImpl : public Application
manualClose(std::optional<uint32_t> const& manualLedgerSeq,
std::optional<TimePoint> const& manualCloseTime) override;

bool threadIsType(ThreadType type) const override;

#ifdef BUILD_TESTS
virtual void generateLoad(GeneratedLoadConfig cfg) override;

Expand Down Expand Up @@ -220,6 +222,11 @@ class ApplicationImpl : public Application
// thread for eviction scans.
std::optional<std::thread> mEvictionThread;

// NOTE: It is important that this map not be updated outside of the
// constructor. `unordered_map` is safe for multiple threads to read from,
// so long as there are no concurrent writers.
std::unordered_map<std::thread::id, Application::ThreadType> mThreadTypes;

asio::signal_set mStopSignals;

bool mStarted;
Expand Down
3 changes: 2 additions & 1 deletion src/overlay/Peer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -460,7 +460,8 @@ void
Peer::maybeExecuteInBackground(std::string const& jobName,
std::function<void(std::shared_ptr<Peer>)> f)
{
if (useBackgroundThread() && threadIsMain())
if (useBackgroundThread() &&
!mAppConnector.threadIsType(Application::ThreadType::OVERLAY))
{
mAppConnector.postOnOverlayThread(
[self = shared_from_this(), f]() { f(self); }, jobName);
Expand Down

0 comments on commit d1ae2d7

Please sign in to comment.