Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: Background tryAdd functionality in TransactionQueue #4617

Draft
wants to merge 12 commits into
base: master
Choose a base branch
from
8 changes: 8 additions & 0 deletions src/herder/Herder.h
Original file line number Diff line number Diff line change
Expand Up @@ -225,5 +225,13 @@ class Herder
virtual TransactionFrameBaseConstPtr getTx(Hash const& hash) const = 0;

virtual void beginApply() = 0;

// TODO: Docs
virtual TransactionQueuesPtr getTransactionQueues() const = 0;

// TODO: Docs
static TransactionQueue::AddResult
recvTransaction(TransactionQueuesPtr txQueues, TransactionFrameBasePtr tx,
bool submittedFromSelf);
};
}
157 changes: 87 additions & 70 deletions src/herder/HerderImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -83,10 +83,7 @@ HerderImpl::SCPMetrics::SCPMetrics(Application& app)
}

HerderImpl::HerderImpl(Application& app)
: mTransactionQueue(app, TRANSACTION_QUEUE_TIMEOUT_LEDGERS,
TRANSACTION_QUEUE_BAN_LEDGERS,
TRANSACTION_QUEUE_SIZE_MULTIPLIER)
, mPendingEnvelopes(app, *this)
: mPendingEnvelopes(app, *this)
, mHerderSCPDriver(app, *this, mUpgrades, mPendingEnvelopes)
, mLastSlotSaved(0)
, mTrackingTimer(app)
Expand Down Expand Up @@ -275,11 +272,7 @@ HerderImpl::shutdown()
"Shutdown interrupting quorum transitive closure analysis.");
mLastQuorumMapIntersectionState.mInterruptFlag = true;
}
mTransactionQueue.shutdown();
if (mSorobanTransactionQueue)
{
mSorobanTransactionQueue->shutdown();
}
mTransactionQueues->shutdown();

mTxSetGarbageCollectTimer.cancel();
}
Expand Down Expand Up @@ -589,22 +582,35 @@ HerderImpl::emitEnvelope(SCPEnvelope const& envelope)
broadcast(envelope);
}

// TODO: Move to Herder.cpp?
TransactionQueue::AddResult
HerderImpl::recvTransaction(TransactionFrameBasePtr tx, bool submittedFromSelf)
Herder::recvTransaction(TransactionQueuesPtr txQueues,
TransactionFrameBasePtr tx, bool submittedFromSelf)
{
ZoneScoped;
ClassicTransactionQueue& classicTxQueue =
txQueues->getClassicTransactionQueue();
TransactionQueue::AddResult result(
TransactionQueue::AddResultCode::ADD_STATUS_COUNT);

// Allow txs of the same kind to reach the tx queue in case it can be
// replaced by fee
// TODO: Is there a potential TOCTOU issue here as sourceAccountPending
// could change before adding? I think no because the other competing thread
// would be whatever is handling ledger close. However, that will only
// decrease the sourceAccountPending value, which means this erroneously
// rejects (which is safe). I guess it's possible for a user-submitted
// transaction to come in and conflict with the tx queue thread, but that
// would require them to be simultaneously running two clients and
// submitting from both of them. Still, it might be safest to use some kind
// of atomic function that handles both this check AND the add.
bool hasSoroban =
mSorobanTransactionQueue &&
mSorobanTransactionQueue->sourceAccountPending(tx->getSourceID()) &&
txQueues->hasSorobanTransactionQueue() &&
txQueues->getSorobanTransactionQueue().sourceAccountPending(
tx->getSourceID()) &&
!tx->isSoroban();
bool hasClassic =
mTransactionQueue.sourceAccountPending(tx->getSourceID()) &&
tx->isSoroban();
bool hasClassic = classicTxQueue.sourceAccountPending(tx->getSourceID()) &&
tx->isSoroban();
if (hasSoroban || hasClassic)
{
CLOG_DEBUG(Herder,
Expand All @@ -617,11 +623,12 @@ HerderImpl::recvTransaction(TransactionFrameBasePtr tx, bool submittedFromSelf)
}
else if (!tx->isSoroban())
{
result = mTransactionQueue.tryAdd(tx, submittedFromSelf);
result = classicTxQueue.tryAdd(tx, submittedFromSelf);
}
else if (mSorobanTransactionQueue)
else if (txQueues->hasSorobanTransactionQueue())
{
result = mSorobanTransactionQueue->tryAdd(tx, submittedFromSelf);
result = txQueues->getSorobanTransactionQueue().tryAdd(
tx, submittedFromSelf);
}
else
{
Expand All @@ -641,6 +648,13 @@ HerderImpl::recvTransaction(TransactionFrameBasePtr tx, bool submittedFromSelf)
return result;
}

TransactionQueue::AddResult
HerderImpl::recvTransaction(TransactionFrameBasePtr tx, bool submittedFromSelf)
{
ZoneScoped;
return Herder::recvTransaction(mTransactionQueues, tx, submittedFromSelf);
}

bool
HerderImpl::checkCloseTime(SCPEnvelope const& envelope, bool enforceRecent)
{
Expand Down Expand Up @@ -923,13 +937,7 @@ HerderImpl::externalizeValue(TxSetXDRFrameConstPtr txSet, uint32_t ledgerSeq,
bool
HerderImpl::sourceAccountPending(AccountID const& accountID) const
{
bool accPending = mTransactionQueue.sourceAccountPending(accountID);
if (mSorobanTransactionQueue)
{
accPending = accPending ||
mSorobanTransactionQueue->sourceAccountPending(accountID);
}
return accPending;
return mTransactionQueues->sourceAccountPending(accountID);
}

#endif
Expand Down Expand Up @@ -1092,13 +1100,12 @@ HerderImpl::getPendingEnvelopes()
ClassicTransactionQueue&
HerderImpl::getTransactionQueue()
{
return mTransactionQueue;
return mTransactionQueues->getClassicTransactionQueue();
}
SorobanTransactionQueue&
HerderImpl::getSorobanTransactionQueue()
{
releaseAssert(mSorobanTransactionQueue);
return *mSorobanTransactionQueue;
return mTransactionQueues->getSorobanTransactionQueue();
}
#endif

Expand Down Expand Up @@ -1391,14 +1398,16 @@ HerderImpl::triggerNextLedger(uint32_t ledgerSeqToTrigger,
// it's guaranteed to be up-to-date
auto const& lcl = mLedgerManager.getLastClosedLedgerHeader();
PerPhaseTransactionList txPhases;
txPhases.emplace_back(mTransactionQueue.getTransactions(lcl.header));
txPhases.emplace_back(
mTransactionQueues->getClassicTransactionQueue().getTransactions(
lcl.header));

if (protocolVersionStartsFrom(lcl.header.ledgerVersion,
SOROBAN_PROTOCOL_VERSION))
{
releaseAssert(mSorobanTransactionQueue);
txPhases.emplace_back(
mSorobanTransactionQueue->getTransactions(lcl.header));
mTransactionQueues->getSorobanTransactionQueue().getTransactions(
lcl.header));
}

// We pick as next close time the current time unless it's before the last
Expand Down Expand Up @@ -1465,12 +1474,11 @@ HerderImpl::triggerNextLedger(uint32_t ledgerSeqToTrigger,
if (protocolVersionStartsFrom(lcl.header.ledgerVersion,
SOROBAN_PROTOCOL_VERSION))
{
releaseAssert(mSorobanTransactionQueue);
mSorobanTransactionQueue->ban(
mTransactionQueues->getSorobanTransactionQueue().ban(
invalidTxPhases[static_cast<size_t>(TxSetPhase::SOROBAN)]);
}

mTransactionQueue.ban(
mTransactionQueues->getClassicTransactionQueue().ban(
invalidTxPhases[static_cast<size_t>(TxSetPhase::CLASSIC)]);

auto txSetHash = proposedSet->getContentsHash();
Expand Down Expand Up @@ -2170,16 +2178,18 @@ HerderImpl::maybeSetupSorobanQueue(uint32_t protocolVersion)
{
if (protocolVersionStartsFrom(protocolVersion, SOROBAN_PROTOCOL_VERSION))
{
if (!mSorobanTransactionQueue)
if (!mTransactionQueues->hasSorobanTransactionQueue())
{
mSorobanTransactionQueue =
releaseAssert(mTxQueueBucketSnapshot);
mTransactionQueues->setSorobanTransactionQueue(
std::make_unique<SorobanTransactionQueue>(
mApp, TRANSACTION_QUEUE_TIMEOUT_LEDGERS,
mApp, mTxQueueBucketSnapshot,
TRANSACTION_QUEUE_TIMEOUT_LEDGERS,
TRANSACTION_QUEUE_BAN_LEDGERS,
SOROBAN_TRANSACTION_QUEUE_SIZE_MULTIPLIER);
SOROBAN_TRANSACTION_QUEUE_SIZE_MULTIPLIER));
}
}
else if (mSorobanTransactionQueue)
else if (mTransactionQueues->hasSorobanTransactionQueue())
{
throw std::runtime_error(
"Invalid state: Soroban queue initialized before v20");
Expand All @@ -2189,6 +2199,15 @@ HerderImpl::maybeSetupSorobanQueue(uint32_t protocolVersion)
void
HerderImpl::start()
{
releaseAssert(!mTxQueueBucketSnapshot);
mTxQueueBucketSnapshot = mApp.getBucketManager()
.getBucketSnapshotManager()
.copySearchableLiveBucketListSnapshot();
mTransactionQueues->setClassicTransactionQueue(
std::make_unique<ClassicTransactionQueue>(
mApp, mTxQueueBucketSnapshot, TRANSACTION_QUEUE_TIMEOUT_LEDGERS,
TRANSACTION_QUEUE_BAN_LEDGERS, TRANSACTION_QUEUE_SIZE_MULTIPLIER));

mMaxTxSize = mApp.getHerder().getMaxClassicTxSize();
{
uint32_t version = mApp.getLedgerManager()
Expand Down Expand Up @@ -2333,33 +2352,34 @@ HerderImpl::updateTransactionQueue(TxSetXDRFrameConstPtr externalizedTxSet)

auto lhhe = mLedgerManager.getLastClosedLedgerHeader();

auto updateQueue = [&](auto& queue, auto const& applied) {
queue.removeApplied(applied);
queue.shift();

auto txs = queue.getTransactions(lhhe.header);

auto invalidTxs = TxSetUtils::getInvalidTxList(
auto filterInvalidTxs = [&](TxFrameList const& txs) {
return TxSetUtils::getInvalidTxList(
txs, mApp, 0,
getUpperBoundCloseTimeOffset(mApp, lhhe.header.scpValue.closeTime));
queue.ban(invalidTxs);

queue.rebroadcast();
getUpperBoundCloseTimeOffset(mApp.getAppConnector(),
lhhe.header.scpValue.closeTime));
};
// Update bucket list snapshot, if needed. Note that this modifies the
// pointer itself on update, so we need to pass the potentially new pointer
// to the tx queues.
mApp.getBucketManager()
.getBucketSnapshotManager()
.maybeCopySearchableBucketListSnapshot(mTxQueueBucketSnapshot);
if (txsPerPhase.size() > static_cast<size_t>(TxSetPhase::CLASSIC))
{
updateQueue(mTransactionQueue,
txsPerPhase[static_cast<size_t>(TxSetPhase::CLASSIC)]);
mTransactionQueues->getClassicTransactionQueue().update(
txsPerPhase[static_cast<size_t>(TxSetPhase::CLASSIC)], lhhe.header,
mTxQueueBucketSnapshot, filterInvalidTxs);
}

// Even if we're in protocol 20, still check for number of phases, in case
// we're dealing with the upgrade ledger that contains old-style transaction
// set
if (mSorobanTransactionQueue != nullptr &&
if (mTransactionQueues->hasSorobanTransactionQueue() &&
txsPerPhase.size() > static_cast<size_t>(TxSetPhase::SOROBAN))
{
updateQueue(*mSorobanTransactionQueue,
txsPerPhase[static_cast<size_t>(TxSetPhase::SOROBAN)]);
mTransactionQueues->getSorobanTransactionQueue().update(
txsPerPhase[static_cast<size_t>(TxSetPhase::SOROBAN)], lhhe.header,
mTxQueueBucketSnapshot, filterInvalidTxs);
}
}

Expand Down Expand Up @@ -2476,37 +2496,34 @@ HerderImpl::isNewerNominationOrBallotSt(SCPStatement const& oldSt,
size_t
HerderImpl::getMaxQueueSizeOps() const
{
return mTransactionQueue.getMaxQueueSizeOps();
return mTransactionQueues->getClassicTransactionQueue()
.getMaxQueueSizeOps();
}

size_t
HerderImpl::getMaxQueueSizeSorobanOps() const
{
return mSorobanTransactionQueue
? mSorobanTransactionQueue->getMaxQueueSizeOps()
return mTransactionQueues->hasSorobanTransactionQueue()
? mTransactionQueues->getSorobanTransactionQueue()
.getMaxQueueSizeOps()
: 0;
}

bool
HerderImpl::isBannedTx(Hash const& hash) const
{
auto banned = mTransactionQueue.isBanned(hash);
if (mSorobanTransactionQueue)
{
banned = banned || mSorobanTransactionQueue->isBanned(hash);
}
return banned;
return mTransactionQueues->isBanned(hash);
}

TransactionFrameBaseConstPtr
HerderImpl::getTx(Hash const& hash) const
{
auto classic = mTransactionQueue.getTx(hash);
if (!classic && mSorobanTransactionQueue)
{
return mSorobanTransactionQueue->getTx(hash);
}
return classic;
return mTransactionQueues->getTx(hash);
}

TransactionQueuesPtr HerderImpl::getTransactionQueues() const {
releaseAssert(mTransactionQueues);
return mTransactionQueues;
}

}
9 changes: 7 additions & 2 deletions src/herder/HerderImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,8 @@ class HerderImpl : public Herder

virtual void beginApply() override;

TransactionQueuesPtr getTransactionQueues() const override;

void startTxSetGCTimer();

#ifdef BUILD_TESTS
Expand Down Expand Up @@ -248,8 +250,8 @@ class HerderImpl : public Herder
void purgeOldPersistedTxSets();
void writeDebugTxSet(LedgerCloseData const& lcd);

ClassicTransactionQueue mTransactionQueue;
std::unique_ptr<SorobanTransactionQueue> mSorobanTransactionQueue;
TransactionQueuesPtr const mTransactionQueues =
std::make_shared<TransactionQueues>();

void updateTransactionQueue(TxSetXDRFrameConstPtr txSet);
void maybeSetupSorobanQueue(uint32_t protocolVersion);
Expand Down Expand Up @@ -301,6 +303,9 @@ class HerderImpl : public Herder
Application& mApp;
LedgerManager& mLedgerManager;

// Bucket list snapshot to use for transaction queues
SearchableSnapshotConstPtr mTxQueueBucketSnapshot;

struct SCPMetrics
{
medida::Meter& mLostSync;
Expand Down
Loading