Fix/804 #817

Merged · 3 commits · Oct 22, 2021
190 changes: 114 additions & 76 deletions src/datastore/postgres-store.ts
@@ -1034,8 +1034,8 @@ export class PgDataStore
const candidateTxIds = data.txs.map(d => d.tx.tx_id);
const removedTxsResult = await this.pruneMempoolTxs(client, candidateTxIds);
if (removedTxsResult.removedTxs.length > 0) {
logger.debug(
`Removed ${removedTxsResult.removedTxs.length} microblock-txs from mempool table`
logger.verbose(
`Removed ${removedTxsResult.removedTxs.length} microblock-txs from mempool table during microblock ingestion`
);
}
});
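
For context, here is a minimal sketch of what the `pruneMempoolTxs` helper called above can look like. This is an assumption, not the repo's actual implementation (which lives elsewhere in this file); the `mempool_txs` table and `pruned` column names are illustrative, and the hex helpers are stand-ins for this file's own utilities:

```typescript
import { ClientBase } from 'pg';

// Stand-ins for this file's hex helpers, to keep the sketch self-contained:
const hexToBuffer = (hex: string): Buffer => Buffer.from(hex.replace(/^0x/, ''), 'hex');
const bufferToHexPrefixString = (buf: Buffer): string => '0x' + buf.toString('hex');

// Mark the given txs as pruned from the mempool and report which rows
// actually changed, matching the `{ removedTxs }` shape used above.
async function pruneMempoolTxs(
  client: ClientBase,
  txIds: string[]
): Promise<{ removedTxs: string[] }> {
  if (txIds.length === 0) {
    return { removedTxs: [] };
  }
  const result = await client.query<{ tx_id: Buffer }>(
    `
    UPDATE mempool_txs
    SET pruned = true
    WHERE pruned = false AND tx_id = ANY($1)
    RETURNING tx_id
    `,
    [txIds.map(txId => hexToBuffer(txId))]
  );
  return { removedTxs: result.rows.map(r => bufferToHexPrefixString(r.tx_id)) };
}
```
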
@@ -1068,7 +1068,9 @@ export class PgDataStore
const candidateTxIds = data.txs.map(d => d.tx.tx_id);
const removedTxsResult = await this.pruneMempoolTxs(client, candidateTxIds);
if (removedTxsResult.removedTxs.length > 0) {
logger.debug(`Removed ${removedTxsResult.removedTxs.length} txs from mempool table`);
logger.verbose(
`Removed ${removedTxsResult.removedTxs.length} txs from mempool table during new block ingestion`
);
}
}

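Side note on the `logger.debug` → `logger.verbose` switch in this and the previous hunk: assuming the project logger follows npm-style levels (winston's default; an assumption here), `verbose` has higher priority than `debug`, so these messages surface at the 'verbose' threshold without turning on full debug logging. A sketch:

```typescript
import * as winston from 'winston';

// npm-style levels (winston's default): error=0, warn=1, info=2, http=3,
// verbose=4, debug=5, silly=6. A logger set to 'verbose' emits verbose
// messages while still suppressing debug ones.
const logger = winston.createLogger({
  level: 'verbose', // hypothetical setting; the real logger is configured elsewhere
  transports: [new winston.transports.Console()],
});

logger.verbose('Removed 2 txs from mempool table during new block ingestion'); // emitted
logger.debug('noisy low-level detail'); // suppressed at level "verbose"
```
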
@@ -1110,6 +1112,8 @@ export class PgDataStore
data.block.execution_cost_write_count = totalCost.execution_cost_write_count;
data.block.execution_cost_write_length = totalCost.execution_cost_write_length;

let batchedTxData: DataStoreTxEventData[] = data.txs;

// Find microblocks that weren't already inserted via the unconfirmed microblock event.
// This happens when a stacks-node is syncing and receives confirmed microblocks with their anchor block at the same time.
if (data.microblocks.length > 0) {
@@ -1136,44 +1140,50 @@
const missingTxs = data.txs.filter(entry =>
missingMicroblockHashes.has(entry.tx.microblock_hash)
);
// TODO(mb): the microblock code after this line should take into account this already-inserted confirmed microblock data;
// right now it performs redundant updates, blindly treating all microblock txs as unconfirmed.
await this.insertMicroblockData(client, missingMicroblocks, missingTxs);

// Clear already inserted microblock txs from the anchor-block update data to avoid duplicate inserts.
batchedTxData = batchedTxData.filter(entry => {
return !missingMicroblockHashes.has(entry.tx.microblock_hash);
});
}
}

let batchedTxData: DataStoreTxEventData[] = data.txs;
const { acceptedMicroblockTxs, orphanedMicroblockTxs } = await this.updateMicroCanonical(
client,
{
isCanonical: isCanonical,
blockHeight: data.block.block_height,
blockHash: data.block.block_hash,
indexBlockHash: data.block.index_block_hash,
parentIndexBlockHash: data.block.parent_index_block_hash,
parentMicroblockHash: data.block.parent_microblock_hash,
parentMicroblockSequence: data.block.parent_microblock_sequence,
burnBlockTime: data.block.burn_block_time,
}
);
// When processing an immediately-non-canonical block, do not orphan any possibly-existing microblocks
// which may still be considered canonical by the canonical block at this height.
if (isCanonical) {
const { acceptedMicroblockTxs, orphanedMicroblockTxs } = await this.updateMicroCanonical(
client,
{
isCanonical: isCanonical,
blockHeight: data.block.block_height,
blockHash: data.block.block_hash,
indexBlockHash: data.block.index_block_hash,
parentIndexBlockHash: data.block.parent_index_block_hash,
parentMicroblockHash: data.block.parent_microblock_hash,
parentMicroblockSequence: data.block.parent_microblock_sequence,
burnBlockTime: data.block.burn_block_time,
}
);

// Identify any micro-orphaned txs that also didn't make it into this anchor block, and restore them into the mempool
const orphanedAndMissingTxs = orphanedMicroblockTxs.filter(
tx => !data.txs.find(r => tx.tx_id === r.tx.tx_id)
);
const restoredMempoolTxs = await this.restoreMempoolTxs(
client,
orphanedAndMissingTxs.map(tx => tx.tx_id)
);
restoredMempoolTxs.restoredTxs.forEach(txId => {
logger.info(`Restored micro-orphaned tx to mempool ${txId}`);
});
// Identify any micro-orphaned txs that also didn't make it into this anchor block, and restore them into the mempool
const orphanedAndMissingTxs = orphanedMicroblockTxs.filter(
tx => !data.txs.find(r => tx.tx_id === r.tx.tx_id)
);
const restoredMempoolTxs = await this.restoreMempoolTxs(
client,
orphanedAndMissingTxs.map(tx => tx.tx_id)
);
restoredMempoolTxs.restoredTxs.forEach(txId => {
logger.info(`Restored micro-orphaned tx to mempool ${txId}`);
});

// Clear accepted microblock txs from the anchor-block update data to avoid duplicate inserts.
batchedTxData = data.txs.filter(entry => {
const matchingTx = acceptedMicroblockTxs.find(tx => tx.tx_id === entry.tx.tx_id);
return !matchingTx;
});
// Clear accepted microblock txs from the anchor-block update data to avoid duplicate inserts.
batchedTxData = batchedTxData.filter(entry => {
const matchingTx = acceptedMicroblockTxs.find(tx => tx.tx_id === entry.tx.tx_id);
return !matchingTx;
});
}

// TODO(mb): sanity tests on tx_index on batchedTxData, re-normalize if necessary

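The micro-orphan recovery above relies on `restoreMempoolTxs`, the inverse of `pruneMempoolTxs`. Here is a sketch under the same illustrative assumptions as the prune sketch earlier (a `mempool_txs` table with a boolean `pruned` column, stand-in hex helpers); the real implementation lives elsewhere in this file:

```typescript
import { ClientBase } from 'pg';

// Stand-ins for this file's hex helpers, to keep the sketch self-contained:
const hexToBuffer = (hex: string): Buffer => Buffer.from(hex.replace(/^0x/, ''), 'hex');
const bufferToHexPrefixString = (buf: Buffer): string => '0x' + buf.toString('hex');

// Flip `pruned` back off for the given txs and report which rows actually
// changed, matching the `{ restoredTxs }` shape used above.
async function restoreMempoolTxs(
  client: ClientBase,
  txIds: string[]
): Promise<{ restoredTxs: string[] }> {
  if (txIds.length === 0) {
    return { restoredTxs: [] };
  }
  const result = await client.query<{ tx_id: Buffer }>(
    `
    UPDATE mempool_txs
    SET pruned = false
    WHERE pruned = true AND tx_id = ANY($1)
    RETURNING tx_id
    `,
    [txIds.map(txId => hexToBuffer(txId))]
  );
  return { restoredTxs: result.rows.map(r => bufferToHexPrefixString(r.tx_id)) };
}
```
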
@@ -1267,7 +1277,12 @@ export class PgDataStore
parentMicroblockSequence: number;
burnBlockTime: number;
}
): Promise<{ acceptedMicroblockTxs: DbTx[]; orphanedMicroblockTxs: DbTx[] }> {
): Promise<{
acceptedMicroblockTxs: DbTx[];
orphanedMicroblockTxs: DbTx[];
acceptedMicroblocks: string[];
orphanedMicroblocks: string[];
}> {
// Find the parent microblock if this anchor block points to one. If not, perform a sanity check for expected block headers in this case:
// > Anchored blocks that do not have parent microblock streams will have their parent microblock header hashes set to all 0's, and the parent microblock sequence number set to 0.
let acceptedMicroblockTip: DbMicroblock | undefined;
@@ -1294,7 +1309,7 @@
acceptedMicroblockTip = this.parseMicroblockQueryResult(microblockTipQuery.rows[0]);
}

// Identify microblocks that were either excepted or orphaned by this anchor block.
// Identify microblocks that were either accepted or orphaned by this anchor block.
const unanchoredMicroblocksAtTip = await this.findUnanchoredMicroblocksAtChainTip(
client,
blockData.parentIndexBlockHash,
@@ -1337,6 +1352,8 @@
return {
acceptedMicroblockTxs,
orphanedMicroblockTxs,
acceptedMicroblocks,
orphanedMicroblocks,
};
}

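The two new return fields (`acceptedMicroblocks`, `orphanedMicroblocks`) exist so that the reorg handling further down can aggregate results across several `updateMicroCanonical` calls. Distilled from that later code, a self-contained sketch of the bookkeeping (the result type here is a simplified stand-in):

```typescript
// Simplified stand-in for the slice of the updateMicroCanonical result used here:
interface MicroCanonicalUpdateResult {
  acceptedMicroblocks: string[];
  orphanedMicroblocks: string[];
}

// Fold one result into running accepted/orphaned sets. Later calls win:
// a microblock orphaned relative to one block at a given height can still
// be accepted by the canonical block at that height.
function applyMicroCanonicalResult(
  result: MicroCanonicalUpdateResult,
  accepted: Set<string>,
  orphaned: Set<string>
): void {
  result.orphanedMicroblocks.forEach(mb => {
    orphaned.add(mb);
    accepted.delete(mb);
  });
  result.acceptedMicroblocks.forEach(mb => {
    orphaned.delete(mb);
    accepted.add(mb);
  });
}
```

Keying the sets by microblock hash also makes the final canonical/non-canonical counts immune to double counting when the same microblock appears in several blocks' results.
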
@@ -1454,8 +1471,17 @@
args.microblocks.map(mb => hexToBuffer(mb)),
]
);

// Any txs restored need to be pruned from the mempool
const updatedMbTxs = updatedMbTxsQuery.rows.map(r => this.parseTxQueryResult(r));
const txsToPrune = updatedMbTxs
.filter(tx => tx.canonical && tx.microblock_canonical)
.map(tx => tx.tx_id);
const removedTxsResult = await this.pruneMempoolTxs(client, txsToPrune);
if (removedTxsResult.removedTxs.length > 0) {
logger.verbose(
`Removed ${removedTxsResult.removedTxs.length} txs from mempool table during micro-reorg handling`
);
}

// Update the `index_block_hash` and `microblock_canonical` properties on all the tables containing other
// microblock-tx metadata that have been accepted or orphaned in this anchor block.
@@ -1902,29 +1928,6 @@ export class PgDataStore
canonical: boolean,
updatedEntities: UpdatedEntities
): Promise<{ txsMarkedCanonical: string[]; txsMarkedNonCanonical: string[] }> {
const microblockResult = await client.query<{ microblock_hash: Buffer }>(
`
UPDATE microblocks
SET canonical = $2
WHERE index_block_hash = $1 AND canonical != $2
RETURNING microblock_hash
`,
[indexBlockHash, canonical]
);
const microblockHashes = microblockResult.rows.map(row =>
bufferToHexPrefixString(row.microblock_hash)
);
if (canonical) {
updatedEntities.markedCanonical.microblocks += microblockResult.rowCount;
} else {
updatedEntities.markedNonCanonical.microblocks += microblockResult.rowCount;
}
for (const microblockHash of microblockHashes) {
logger.verbose(
`Marked microblock as ${canonical ? 'canonical' : 'non-canonical'}: ${microblockHash}`
);
}

const txResult = await client.query<TxQueryResult>(
`
UPDATE txs
@@ -2118,18 +2121,6 @@
}
updatedEntities.markedCanonical.blocks++;

const restoredBlock = this.parseBlockQueryResult(restoredBlockResult.rows[0]);
await this.updateMicroCanonical(client, {
isCanonical: true,
blockHeight: restoredBlock.block_height,
blockHash: restoredBlock.block_hash,
indexBlockHash: restoredBlock.index_block_hash,
parentIndexBlockHash: restoredBlock.parent_index_block_hash,
parentMicroblockHash: restoredBlock.parent_microblock_hash,
parentMicroblockSequence: restoredBlock.parent_microblock_sequence,
burnBlockTime: restoredBlock.burn_block_time,
});

const orphanedBlockResult = await client.query<BlockQueryResult>(
`
-- orphan the now conflicting block at the same height
@@ -2140,10 +2131,14 @@
`,
[restoredBlockResult.rows[0].block_height, indexBlockHash]
);

const microblocksOrphaned = new Set<string>();
const microblocksAccepted = new Set<string>();

if (orphanedBlockResult.rowCount > 0) {
const orphanedBlocks = orphanedBlockResult.rows.map(b => this.parseBlockQueryResult(b));
for (const orphanedBlock of orphanedBlocks) {
await this.updateMicroCanonical(client, {
const microCanonicalUpdateResult = await this.updateMicroCanonical(client, {
isCanonical: false,
blockHeight: orphanedBlock.block_height,
blockHash: orphanedBlock.block_hash,
@@ -2153,6 +2148,14 @@
parentMicroblockSequence: orphanedBlock.parent_microblock_sequence,
burnBlockTime: orphanedBlock.burn_block_time,
});
microCanonicalUpdateResult.orphanedMicroblocks.forEach(mb => {
microblocksOrphaned.add(mb);
microblocksAccepted.delete(mb);
});
microCanonicalUpdateResult.acceptedMicroblocks.forEach(mb => {
microblocksOrphaned.delete(mb);
microblocksAccepted.add(mb);
});
}

updatedEntities.markedNonCanonical.blocks++;
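
A worked illustration (with a hypothetical microblock hash) of why the sets use this add-then-delete discipline, and why the final counts are taken from the set sizes rather than incremented per call: a microblock can flip from orphaned to accepted when the restored canonical block is processed last, and must be counted exactly once.

```typescript
const accepted = new Set<string>();
const orphaned = new Set<string>();

const apply = (acceptedMbs: string[], orphanedMbs: string[]): void => {
  orphanedMbs.forEach(mb => { orphaned.add(mb); accepted.delete(mb); });
  acceptedMbs.forEach(mb => { orphaned.delete(mb); accepted.add(mb); });
};

apply([], ['0xaa']); // an orphaned block at this height orphans the microblock...
apply(['0xaa'], []); // ...then the restored canonical block accepts it

console.log(accepted.size, orphaned.size); // "1 0": counted once, as canonical
```
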
@@ -2165,14 +2168,49 @@
await this.restoreMempoolTxs(client, markNonCanonicalResult.txsMarkedNonCanonical);
}

// The canonical microblock tables _must_ be restored _after_ orphaning all other blocks at a given height,
// because there is only 1 row per microblock hash, and both the orphaned blocks at this height and the
// canonical block can point to the same microblocks.
const restoredBlock = this.parseBlockQueryResult(restoredBlockResult.rows[0]);
const microCanonicalUpdateResult = await this.updateMicroCanonical(client, {
isCanonical: true,
blockHeight: restoredBlock.block_height,
blockHash: restoredBlock.block_hash,
indexBlockHash: restoredBlock.index_block_hash,
parentIndexBlockHash: restoredBlock.parent_index_block_hash,
parentMicroblockHash: restoredBlock.parent_microblock_hash,
parentMicroblockSequence: restoredBlock.parent_microblock_sequence,
burnBlockTime: restoredBlock.burn_block_time,
});
microCanonicalUpdateResult.orphanedMicroblocks.forEach(mb => {
microblocksOrphaned.add(mb);
microblocksAccepted.delete(mb);
});
microCanonicalUpdateResult.acceptedMicroblocks.forEach(mb => {
microblocksOrphaned.delete(mb);
microblocksAccepted.add(mb);
});
updatedEntities.markedCanonical.microblocks += microblocksAccepted.size;
updatedEntities.markedNonCanonical.microblocks += microblocksOrphaned.size;

microblocksOrphaned.forEach(mb => logger.verbose(`Marked microblock as non-canonical: ${mb}`));
microblocksAccepted.forEach(mb => logger.verbose(`Marked microblock as canonical: ${mb}`));

const markCanonicalResult = await this.markEntitiesCanonical(
client,
indexBlockHash,
true,
updatedEntities
);
await this.pruneMempoolTxs(client, markCanonicalResult.txsMarkedCanonical);

const removedTxsResult = await this.pruneMempoolTxs(
client,
markCanonicalResult.txsMarkedCanonical
);
if (removedTxsResult.removedTxs.length > 0) {
logger.verbose(
`Removed ${removedTxsResult.removedTxs.length} txs from mempool table during reorg handling`
);
}
const parentResult = await client.query<{ index_block_hash: Buffer }>(
`
-- check if the parent block is also orphaned