Skip to content

Commit 1ce75de

Browse files
authored
fix: prune and restore mempool transactions with equal nonces for the same sender (#2091)
1 parent c13fb8a commit 1ce75de

File tree

5 files changed

+203
-74
lines changed

5 files changed

+203
-74
lines changed

src/datastore/pg-write-store.ts

Lines changed: 110 additions & 66 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,5 @@
11
import * as assert from 'assert';
2-
import {
3-
getOrAdd,
4-
I32_MAX,
5-
getIbdBlockHeight,
6-
getUintEnvOrDefault,
7-
unwrapOptionalProp,
8-
} from '../helpers';
2+
import { getOrAdd, I32_MAX, getIbdBlockHeight, getUintEnvOrDefault } from '../helpers';
93
import {
104
DbBlock,
115
DbTx,
@@ -92,7 +86,6 @@ import { parseResolver, parseZoneFileTxt } from '../event-stream/bns/bns-helpers
9286
import { SyntheticPoxEventName } from '../pox-helpers';
9387
import { logger } from '../logger';
9488
import {
95-
PgJsonb,
9689
PgSqlClient,
9790
batchIterate,
9891
connectPostgres,
@@ -122,6 +115,8 @@ class MicroblockGapError extends Error {
122115
}
123116
}
124117

118+
type TransactionHeader = { txId: string; sender: string; nonce: number };
119+
125120
/**
126121
* Extends `PgStore` to provide data insertion functions. These added features are usually called by
127122
* the `EventServer` upon receiving blockchain events from a Stacks node. It also deals with chain data
@@ -208,8 +203,12 @@ export class PgWriteStore extends PgStore {
208203
if (!isCanonical) {
209204
markBlockUpdateDataAsNonCanonical(data);
210205
} else {
211-
const txIds = data.txs.map(d => d.tx.tx_id);
212-
await this.pruneMempoolTxs(sql, txIds);
206+
const prunableTxs: TransactionHeader[] = data.txs.map(d => ({
207+
txId: d.tx.tx_id,
208+
sender: d.tx.sender_address,
209+
nonce: d.tx.nonce,
210+
}));
211+
await this.pruneMempoolTxs(sql, prunableTxs);
213212
}
214213
setTotalBlockUpdateDataExecutionCost(data);
215214

@@ -245,7 +244,11 @@ export class PgWriteStore extends PgStore {
245244
);
246245
const restoredMempoolTxs = await this.restoreMempoolTxs(
247246
sql,
248-
orphanedAndMissingTxs.map(tx => tx.tx_id)
247+
orphanedAndMissingTxs.map(tx => ({
248+
txId: tx.tx_id,
249+
sender: tx.sender_address,
250+
nonce: tx.nonce,
251+
}))
249252
);
250253
restoredMempoolTxs.restoredTxs.forEach(txId => {
251254
logger.info(`Restored micro-orphaned tx to mempool ${txId}`);
@@ -688,15 +691,23 @@ export class PgWriteStore extends PgStore {
688691
// Restore any micro-orphaned txs into the mempool
689692
const restoredMempoolTxs = await this.restoreMempoolTxs(
690693
sql,
691-
microOrphanedTxs.map(tx => tx.tx_id)
694+
microOrphanedTxs.map(tx => ({
695+
txId: tx.tx_id,
696+
sender: tx.sender_address,
697+
nonce: tx.nonce,
698+
}))
692699
);
693700
restoredMempoolTxs.restoredTxs.forEach(txId => {
694701
logger.info(`Restored micro-orphaned tx to mempool ${txId}`);
695702
});
696703
}
697704

698-
const candidateTxIds = data.txs.map(d => d.tx.tx_id);
699-
const removedTxsResult = await this.pruneMempoolTxs(sql, candidateTxIds);
705+
const prunableTxs: TransactionHeader[] = data.txs.map(d => ({
706+
txId: d.tx.tx_id,
707+
sender: d.tx.sender_address,
708+
nonce: d.tx.nonce,
709+
}));
710+
const removedTxsResult = await this.pruneMempoolTxs(sql, prunableTxs);
700711
if (removedTxsResult.removedTxs.length > 0) {
701712
logger.debug(
702713
`Removed ${removedTxsResult.removedTxs.length} microblock-txs from mempool table during microblock ingestion`
@@ -2471,9 +2482,9 @@ export class PgWriteStore extends PgStore {
24712482
`;
24722483
// Any txs restored need to be pruned from the mempool
24732484
const updatedMbTxs = updatedMbTxsQuery.map(r => parseTxQueryResult(r));
2474-
const txsToPrune = updatedMbTxs
2485+
const txsToPrune: TransactionHeader[] = updatedMbTxs
24752486
.filter(tx => tx.canonical && tx.microblock_canonical)
2476-
.map(tx => tx.tx_id);
2487+
.map(tx => ({ txId: tx.tx_id, sender: tx.sender_address, nonce: tx.nonce }));
24772488
const removedTxsResult = await this.pruneMempoolTxs(sql, txsToPrune);
24782489
if (removedTxsResult.removedTxs.length > 0) {
24792490
logger.debug(
@@ -2648,17 +2659,31 @@ export class PgWriteStore extends PgStore {
26482659
* marked from canonical to non-canonical.
26492660
* @param transactions - Transactions (tx_id, sender, nonce) to restore in the mempool
26502661
*/
2651-
async restoreMempoolTxs(sql: PgSqlClient, txIds: string[]): Promise<{ restoredTxs: string[] }> {
2652-
if (txIds.length === 0) return { restoredTxs: [] };
2653-
for (const txId of txIds) {
2654-
logger.debug(`Restoring mempool tx: ${txId}`);
2655-
}
2656-
2662+
async restoreMempoolTxs(
2663+
sql: PgSqlClient,
2664+
transactions: TransactionHeader[]
2665+
): Promise<{ restoredTxs: string[] }> {
2666+
if (transactions.length === 0) return { restoredTxs: [] };
2667+
if (logger.isLevelEnabled('debug'))
2668+
for (const tx of transactions)
2669+
logger.debug(`Restoring mempool tx: ${tx.txId} sender: ${tx.sender} nonce: ${tx.nonce}`);
2670+
2671+
// Also restore transactions for the same `sender_address` with the same `nonce`.
2672+
const inputData = transactions.map(t => [t.txId.replace('0x', '\\x'), t.sender, t.nonce]);
26572673
const updatedRows = await sql<{ tx_id: string }[]>`
2658-
WITH restored AS (
2674+
WITH input_data (tx_id, sender_address, nonce) AS (VALUES ${sql(inputData)}),
2675+
affected_mempool_tx_ids AS (
2676+
SELECT m.tx_id
2677+
FROM mempool_txs AS m
2678+
INNER JOIN input_data AS i
2679+
ON m.sender_address = i.sender_address AND m.nonce = i.nonce::int
2680+
UNION
2681+
SELECT tx_id::bytea FROM input_data
2682+
),
2683+
restored AS (
26592684
UPDATE mempool_txs
2660-
SET pruned = FALSE, status = ${DbTxStatus.Pending}
2661-
WHERE tx_id IN ${sql(txIds)} AND pruned = TRUE
2685+
SET pruned = false, status = ${DbTxStatus.Pending}
2686+
WHERE pruned = true AND tx_id IN (SELECT DISTINCT tx_id FROM affected_mempool_tx_ids)
26622687
RETURNING tx_id
26632688
),
26642689
count_update AS (
@@ -2668,60 +2693,67 @@ export class PgWriteStore extends PgStore {
26682693
)
26692694
SELECT tx_id FROM restored
26702695
`;
2671-
2672-
const updatedTxs = updatedRows.map(r => r.tx_id);
2673-
for (const tx of updatedTxs) {
2674-
logger.debug(`Updated mempool tx: ${tx}`);
2675-
}
2676-
2677-
let restoredTxs = updatedRows.map(r => r.tx_id);
2678-
2679-
// txs that didnt exist in the mempool need to be inserted into the mempool
2680-
if (updatedRows.length < txIds.length) {
2681-
const txsRequiringInsertion = txIds.filter(txId => !updatedTxs.includes(txId));
2682-
2683-
logger.debug(`To restore mempool txs, ${txsRequiringInsertion.length} txs require insertion`);
2684-
2696+
const restoredTxIds = updatedRows.map(r => r.tx_id);
2697+
if (logger.isLevelEnabled('debug'))
2698+
for (const txId of restoredTxIds) logger.debug(`Restored mempool tx: ${txId}`);
2699+
2700+
// Transactions that didn't exist in the mempool need to be inserted into the mempool
2701+
const txIdsRequiringInsertion = transactions
2702+
.filter(tx => !restoredTxIds.includes(tx.txId))
2703+
.map(tx => tx.txId);
2704+
if (txIdsRequiringInsertion.length) {
2705+
logger.debug(
2706+
`To restore mempool txs, ${txIdsRequiringInsertion.length} txs require insertion`
2707+
);
26852708
const txs: TxQueryResult[] = await sql`
26862709
SELECT DISTINCT ON(tx_id) ${sql(TX_COLUMNS)}
26872710
FROM txs
2688-
WHERE tx_id IN ${sql(txsRequiringInsertion)}
2711+
WHERE tx_id IN ${sql(txIdsRequiringInsertion)}
26892712
ORDER BY tx_id, block_height DESC, microblock_sequence DESC, tx_index DESC
26902713
`;
2691-
2692-
if (txs.length !== txsRequiringInsertion.length) {
2714+
if (txs.length !== txIdsRequiringInsertion.length) {
26932715
logger.error(`Not all txs requiring insertion were found`);
26942716
}
26952717

26962718
const mempoolTxs = convertTxQueryResultToDbMempoolTx(txs);
2697-
26982719
await this.updateMempoolTxs({ mempoolTxs });
2699-
2700-
restoredTxs = [...restoredTxs, ...txsRequiringInsertion];
2701-
2702-
for (const tx of mempoolTxs) {
2703-
logger.debug(`Inserted mempool tx: ${tx.tx_id}`);
2704-
}
2720+
if (logger.isLevelEnabled('debug'))
2721+
for (const tx of mempoolTxs) logger.debug(`Inserted non-existing mempool tx: ${tx.tx_id}`);
27052722
}
27062723

2707-
return { restoredTxs: restoredTxs };
2724+
return { restoredTxs: [...restoredTxIds, ...txIdsRequiringInsertion] };
27082725
}
27092726

27102727
/**
27112728
* Remove transactions in the mempool table. This should be called when transactions are
27122729
* mined into a block.
27132730
* @param transactions - Transactions (tx_id, sender, nonce) to prune from the mempool
27142731
*/
2715-
async pruneMempoolTxs(sql: PgSqlClient, txIds: string[]): Promise<{ removedTxs: string[] }> {
2716-
if (txIds.length === 0) return { removedTxs: [] };
2717-
for (const txId of txIds) {
2718-
logger.debug(`Pruning mempool tx: ${txId}`);
2719-
}
2732+
async pruneMempoolTxs(
2733+
sql: PgSqlClient,
2734+
transactions: TransactionHeader[]
2735+
): Promise<{ removedTxs: string[] }> {
2736+
if (transactions.length === 0) return { removedTxs: [] };
2737+
if (logger.isLevelEnabled('debug'))
2738+
for (const tx of transactions)
2739+
logger.debug(`Pruning mempool tx: ${tx.txId} sender: ${tx.sender} nonce: ${tx.nonce}`);
2740+
2741+
// Also prune transactions for the same `sender_address` with the same `nonce`.
2742+
const inputData = transactions.map(t => [t.txId.replace('0x', '\\x'), t.sender, t.nonce]);
27202743
const updateResults = await sql<{ tx_id: string }[]>`
2721-
WITH pruned AS (
2744+
WITH input_data (tx_id, sender_address, nonce) AS (VALUES ${sql(inputData)}),
2745+
affected_mempool_tx_ids AS (
2746+
SELECT m.tx_id
2747+
FROM mempool_txs AS m
2748+
INNER JOIN input_data AS i
2749+
ON m.sender_address = i.sender_address AND m.nonce = i.nonce::int
2750+
UNION
2751+
SELECT tx_id::bytea FROM input_data
2752+
),
2753+
pruned AS (
27222754
UPDATE mempool_txs
27232755
SET pruned = true
2724-
WHERE tx_id IN ${sql(txIds)} AND pruned = FALSE
2756+
WHERE pruned = false AND tx_id IN (SELECT DISTINCT tx_id FROM affected_mempool_tx_ids)
27252757
RETURNING tx_id
27262758
),
27272759
count_update AS (
@@ -2769,20 +2801,28 @@ export class PgWriteStore extends PgStore {
27692801
indexBlockHash: string,
27702802
canonical: boolean,
27712803
updatedEntities: ReOrgUpdatedEntities
2772-
): Promise<{ txsMarkedCanonical: string[]; txsMarkedNonCanonical: string[] }> {
2773-
const result: { txsMarkedCanonical: string[]; txsMarkedNonCanonical: string[] } = {
2804+
): Promise<{
2805+
txsMarkedCanonical: TransactionHeader[];
2806+
txsMarkedNonCanonical: TransactionHeader[];
2807+
}> {
2808+
const result: {
2809+
txsMarkedCanonical: TransactionHeader[];
2810+
txsMarkedNonCanonical: TransactionHeader[];
2811+
} = {
27742812
txsMarkedCanonical: [],
27752813
txsMarkedNonCanonical: [],
27762814
};
27772815

27782816
const q = new PgWriteQueue();
27792817
q.enqueue(async () => {
2780-
const txResult = await sql<{ tx_id: string; update_balances_count: number }[]>`
2818+
const txResult = await sql<
2819+
{ tx_id: string; sender_address: string; nonce: number; update_balances_count: number }[]
2820+
>`
27812821
WITH updated_txs AS (
27822822
UPDATE txs
27832823
SET canonical = ${canonical}
27842824
WHERE index_block_hash = ${indexBlockHash} AND canonical != ${canonical}
2785-
RETURNING tx_id, sender_address, sponsor_address, fee_rate, sponsored, canonical
2825+
RETURNING tx_id, sender_address, nonce, sponsor_address, fee_rate, sponsored, canonical
27862826
),
27872827
affected_addresses AS (
27882828
SELECT
@@ -2817,22 +2857,26 @@ export class PgWriteStore extends PgStore {
28172857
SET balance = ft_balances.balance + EXCLUDED.balance
28182858
RETURNING ft_balances.address
28192859
)
2820-
SELECT tx_id, (SELECT COUNT(*)::int FROM update_ft_balances) AS update_balances_count
2860+
SELECT tx_id, sender_address, nonce, (SELECT COUNT(*)::int FROM update_ft_balances) AS update_balances_count
28212861
FROM updated_txs
28222862
`;
2823-
const txIds = txResult.map(row => row.tx_id);
2863+
const txs = txResult.map(row => ({
2864+
txId: row.tx_id,
2865+
sender: row.sender_address,
2866+
nonce: row.nonce,
2867+
}));
28242868
if (canonical) {
28252869
updatedEntities.markedCanonical.txs += txResult.count;
2826-
result.txsMarkedCanonical = txIds;
2870+
result.txsMarkedCanonical = txs;
28272871
} else {
28282872
updatedEntities.markedNonCanonical.txs += txResult.count;
2829-
result.txsMarkedNonCanonical = txIds;
2873+
result.txsMarkedNonCanonical = txs;
28302874
}
28312875
if (txResult.count) {
28322876
await sql`
28332877
UPDATE principal_stx_txs
28342878
SET canonical = ${canonical}
2835-
WHERE tx_id IN ${sql(txIds)}
2879+
WHERE tx_id IN ${sql(txs.map(t => t.txId))}
28362880
AND index_block_hash = ${indexBlockHash} AND canonical != ${canonical}
28372881
`;
28382882
}

tests/api/datastore.test.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4352,7 +4352,7 @@ describe('postgres datastore', () => {
43524352
microblock_sequence: undefined,
43534353
tx_count: 2, // Tx from block 2b now counts, but compensates with tx from block 2
43544354
tx_count_unanchored: 2,
4355-
mempool_tx_count: 1,
4355+
mempool_tx_count: 0,
43564356
});
43574357

43584358
const b1 = await db.getBlock({ hash: block1.block_hash });

0 commit comments

Comments
 (0)