@@ -1034,8 +1034,8 @@ export class PgDataStore
1034
1034
const candidateTxIds = data . txs . map ( d => d . tx . tx_id ) ;
1035
1035
const removedTxsResult = await this . pruneMempoolTxs ( client , candidateTxIds ) ;
1036
1036
if ( removedTxsResult . removedTxs . length > 0 ) {
1037
- logger . debug (
1038
- `Removed ${ removedTxsResult . removedTxs . length } microblock-txs from mempool table`
1037
+ logger . verbose (
1038
+ `Removed ${ removedTxsResult . removedTxs . length } microblock-txs from mempool table during microblock ingestion `
1039
1039
) ;
1040
1040
}
1041
1041
} ) ;
@@ -1068,7 +1068,9 @@ export class PgDataStore
1068
1068
const candidateTxIds = data . txs . map ( d => d . tx . tx_id ) ;
1069
1069
const removedTxsResult = await this . pruneMempoolTxs ( client , candidateTxIds ) ;
1070
1070
if ( removedTxsResult . removedTxs . length > 0 ) {
1071
- logger . debug ( `Removed ${ removedTxsResult . removedTxs . length } txs from mempool table` ) ;
1071
+ logger . verbose (
1072
+ `Removed ${ removedTxsResult . removedTxs . length } txs from mempool table during new block ingestion`
1073
+ ) ;
1072
1074
}
1073
1075
}
1074
1076
@@ -1110,6 +1112,8 @@ export class PgDataStore
1110
1112
data . block . execution_cost_write_count = totalCost . execution_cost_write_count ;
1111
1113
data . block . execution_cost_write_length = totalCost . execution_cost_write_length ;
1112
1114
1115
+ let batchedTxData : DataStoreTxEventData [ ] = data . txs ;
1116
+
1113
1117
// Find microblocks that weren't already inserted via the unconfirmed microblock event.
1114
1118
// This happens when a stacks-node is syncing and receives confirmed microblocks with their anchor block at the same time.
1115
1119
if ( data . microblocks . length > 0 ) {
@@ -1136,44 +1140,50 @@ export class PgDataStore
1136
1140
const missingTxs = data . txs . filter ( entry =>
1137
1141
missingMicroblockHashes . has ( entry . tx . microblock_hash )
1138
1142
) ;
1139
- // TODO(mb): the microblock code after this line should take into account this already inserted confirmed microblock data,
1140
- // right now it performs redundant updates, blindly treating all microblock txs as unconfirmed.
1141
1143
await this . insertMicroblockData ( client , missingMicroblocks , missingTxs ) ;
1144
+
1145
+ // Clear already inserted microblock txs from the anchor-block update data to avoid duplicate inserts.
1146
+ batchedTxData = batchedTxData . filter ( entry => {
1147
+ return ! missingMicroblockHashes . has ( entry . tx . microblock_hash ) ;
1148
+ } ) ;
1142
1149
}
1143
1150
}
1144
1151
1145
- let batchedTxData : DataStoreTxEventData [ ] = data . txs ;
1146
- const { acceptedMicroblockTxs, orphanedMicroblockTxs } = await this . updateMicroCanonical (
1147
- client ,
1148
- {
1149
- isCanonical : isCanonical ,
1150
- blockHeight : data . block . block_height ,
1151
- blockHash : data . block . block_hash ,
1152
- indexBlockHash : data . block . index_block_hash ,
1153
- parentIndexBlockHash : data . block . parent_index_block_hash ,
1154
- parentMicroblockHash : data . block . parent_microblock_hash ,
1155
- parentMicroblockSequence : data . block . parent_microblock_sequence ,
1156
- burnBlockTime : data . block . burn_block_time ,
1157
- }
1158
- ) ;
1152
+ // When processing an immediately-non-canonical block, do not orphan any possibly-existing microblocks
1153
+ // which may still be considered canonical by the canonical block at this height.
1154
+ if ( isCanonical ) {
1155
+ const { acceptedMicroblockTxs, orphanedMicroblockTxs } = await this . updateMicroCanonical (
1156
+ client ,
1157
+ {
1158
+ isCanonical : isCanonical ,
1159
+ blockHeight : data . block . block_height ,
1160
+ blockHash : data . block . block_hash ,
1161
+ indexBlockHash : data . block . index_block_hash ,
1162
+ parentIndexBlockHash : data . block . parent_index_block_hash ,
1163
+ parentMicroblockHash : data . block . parent_microblock_hash ,
1164
+ parentMicroblockSequence : data . block . parent_microblock_sequence ,
1165
+ burnBlockTime : data . block . burn_block_time ,
1166
+ }
1167
+ ) ;
1159
1168
1160
- // Identify any micro-orphaned txs that also didn't make it into this anchor block, and restore them into the mempool
1161
- const orphanedAndMissingTxs = orphanedMicroblockTxs . filter (
1162
- tx => ! data . txs . find ( r => tx . tx_id === r . tx . tx_id )
1163
- ) ;
1164
- const restoredMempoolTxs = await this . restoreMempoolTxs (
1165
- client ,
1166
- orphanedAndMissingTxs . map ( tx => tx . tx_id )
1167
- ) ;
1168
- restoredMempoolTxs . restoredTxs . forEach ( txId => {
1169
- logger . info ( `Restored micro-orphaned tx to mempool ${ txId } ` ) ;
1170
- } ) ;
1169
+ // Identify any micro-orphaned txs that also didn't make it into this anchor block, and restore them into the mempool
1170
+ const orphanedAndMissingTxs = orphanedMicroblockTxs . filter (
1171
+ tx => ! data . txs . find ( r => tx . tx_id === r . tx . tx_id )
1172
+ ) ;
1173
+ const restoredMempoolTxs = await this . restoreMempoolTxs (
1174
+ client ,
1175
+ orphanedAndMissingTxs . map ( tx => tx . tx_id )
1176
+ ) ;
1177
+ restoredMempoolTxs . restoredTxs . forEach ( txId => {
1178
+ logger . info ( `Restored micro-orphaned tx to mempool ${ txId } ` ) ;
1179
+ } ) ;
1171
1180
1172
- // Clear accepted microblock txs from the anchor-block update data to avoid duplicate inserts.
1173
- batchedTxData = data . txs . filter ( entry => {
1174
- const matchingTx = acceptedMicroblockTxs . find ( tx => tx . tx_id === entry . tx . tx_id ) ;
1175
- return ! matchingTx ;
1176
- } ) ;
1181
+ // Clear accepted microblock txs from the anchor-block update data to avoid duplicate inserts.
1182
+ batchedTxData = batchedTxData . filter ( entry => {
1183
+ const matchingTx = acceptedMicroblockTxs . find ( tx => tx . tx_id === entry . tx . tx_id ) ;
1184
+ return ! matchingTx ;
1185
+ } ) ;
1186
+ }
1177
1187
1178
1188
// TODO(mb): sanity tests on tx_index on batchedTxData, re-normalize if necessary
1179
1189
@@ -1267,7 +1277,12 @@ export class PgDataStore
1267
1277
parentMicroblockSequence : number ;
1268
1278
burnBlockTime : number ;
1269
1279
}
1270
- ) : Promise < { acceptedMicroblockTxs : DbTx [ ] ; orphanedMicroblockTxs : DbTx [ ] } > {
1280
+ ) : Promise < {
1281
+ acceptedMicroblockTxs : DbTx [ ] ;
1282
+ orphanedMicroblockTxs : DbTx [ ] ;
1283
+ acceptedMicroblocks : string [ ] ;
1284
+ orphanedMicroblocks : string [ ] ;
1285
+ } > {
1271
1286
// Find the parent microblock if this anchor block points to one. If not, perform a sanity check for expected block headers in this case:
1272
1287
// > Anchored blocks that do not have parent microblock streams will have their parent microblock header hashes set to all 0's, and the parent microblock sequence number set to 0.
1273
1288
let acceptedMicroblockTip : DbMicroblock | undefined ;
@@ -1294,7 +1309,7 @@ export class PgDataStore
1294
1309
acceptedMicroblockTip = this . parseMicroblockQueryResult ( microblockTipQuery . rows [ 0 ] ) ;
1295
1310
}
1296
1311
1297
- // Identify microblocks that were either excepted or orphaned by this anchor block.
1312
+ // Identify microblocks that were either accepted or orphaned by this anchor block.
1298
1313
const unanchoredMicroblocksAtTip = await this . findUnanchoredMicroblocksAtChainTip (
1299
1314
client ,
1300
1315
blockData . parentIndexBlockHash ,
@@ -1337,6 +1352,8 @@ export class PgDataStore
1337
1352
return {
1338
1353
acceptedMicroblockTxs,
1339
1354
orphanedMicroblockTxs,
1355
+ acceptedMicroblocks,
1356
+ orphanedMicroblocks,
1340
1357
} ;
1341
1358
}
1342
1359
@@ -1454,8 +1471,17 @@ export class PgDataStore
1454
1471
args . microblocks . map ( mb => hexToBuffer ( mb ) ) ,
1455
1472
]
1456
1473
) ;
1457
-
1474
+ // Any txs restored need to be pruned from the mempool
1458
1475
const updatedMbTxs = updatedMbTxsQuery . rows . map ( r => this . parseTxQueryResult ( r ) ) ;
1476
+ const txsToPrune = updatedMbTxs
1477
+ . filter ( tx => tx . canonical && tx . microblock_canonical )
1478
+ . map ( tx => tx . tx_id ) ;
1479
+ const removedTxsResult = await this . pruneMempoolTxs ( client , txsToPrune ) ;
1480
+ if ( removedTxsResult . removedTxs . length > 0 ) {
1481
+ logger . verbose (
1482
+ `Removed ${ removedTxsResult . removedTxs . length } txs from mempool table during micro-reorg handling`
1483
+ ) ;
1484
+ }
1459
1485
1460
1486
// Update the `index_block_hash` and `microblock_canonical` properties on all the tables containing other
1461
1487
// microblock-tx metadata that have been accepted or orphaned in this anchor block.
@@ -1902,29 +1928,6 @@ export class PgDataStore
1902
1928
canonical : boolean ,
1903
1929
updatedEntities : UpdatedEntities
1904
1930
) : Promise < { txsMarkedCanonical : string [ ] ; txsMarkedNonCanonical : string [ ] } > {
1905
- const microblockResult = await client . query < { microblock_hash : Buffer } > (
1906
- `
1907
- UPDATE microblocks
1908
- SET canonical = $2
1909
- WHERE index_block_hash = $1 AND canonical != $2
1910
- RETURNING microblock_hash
1911
- ` ,
1912
- [ indexBlockHash , canonical ]
1913
- ) ;
1914
- const microblockHashes = microblockResult . rows . map ( row =>
1915
- bufferToHexPrefixString ( row . microblock_hash )
1916
- ) ;
1917
- if ( canonical ) {
1918
- updatedEntities . markedCanonical . microblocks += microblockResult . rowCount ;
1919
- } else {
1920
- updatedEntities . markedNonCanonical . microblocks += microblockResult . rowCount ;
1921
- }
1922
- for ( const microblockHash of microblockHashes ) {
1923
- logger . verbose (
1924
- `Marked microblock as ${ canonical ? 'canonical' : 'non-canonical' } : ${ microblockHash } `
1925
- ) ;
1926
- }
1927
-
1928
1931
const txResult = await client . query < TxQueryResult > (
1929
1932
`
1930
1933
UPDATE txs
@@ -2118,18 +2121,6 @@ export class PgDataStore
2118
2121
}
2119
2122
updatedEntities . markedCanonical . blocks ++ ;
2120
2123
2121
- const restoredBlock = this . parseBlockQueryResult ( restoredBlockResult . rows [ 0 ] ) ;
2122
- await this . updateMicroCanonical ( client , {
2123
- isCanonical : true ,
2124
- blockHeight : restoredBlock . block_height ,
2125
- blockHash : restoredBlock . block_hash ,
2126
- indexBlockHash : restoredBlock . index_block_hash ,
2127
- parentIndexBlockHash : restoredBlock . parent_index_block_hash ,
2128
- parentMicroblockHash : restoredBlock . parent_microblock_hash ,
2129
- parentMicroblockSequence : restoredBlock . parent_microblock_sequence ,
2130
- burnBlockTime : restoredBlock . burn_block_time ,
2131
- } ) ;
2132
-
2133
2124
const orphanedBlockResult = await client . query < BlockQueryResult > (
2134
2125
`
2135
2126
-- orphan the now conflicting block at the same height
@@ -2140,10 +2131,14 @@ export class PgDataStore
2140
2131
` ,
2141
2132
[ restoredBlockResult . rows [ 0 ] . block_height , indexBlockHash ]
2142
2133
) ;
2134
+
2135
+ const microblocksOrphaned = new Set < string > ( ) ;
2136
+ const microblocksAccepted = new Set < string > ( ) ;
2137
+
2143
2138
if ( orphanedBlockResult . rowCount > 0 ) {
2144
2139
const orphanedBlocks = orphanedBlockResult . rows . map ( b => this . parseBlockQueryResult ( b ) ) ;
2145
2140
for ( const orphanedBlock of orphanedBlocks ) {
2146
- await this . updateMicroCanonical ( client , {
2141
+ const microCanonicalUpdateResult = await this . updateMicroCanonical ( client , {
2147
2142
isCanonical : false ,
2148
2143
blockHeight : orphanedBlock . block_height ,
2149
2144
blockHash : orphanedBlock . block_hash ,
@@ -2153,6 +2148,14 @@ export class PgDataStore
2153
2148
parentMicroblockSequence : orphanedBlock . parent_microblock_sequence ,
2154
2149
burnBlockTime : orphanedBlock . burn_block_time ,
2155
2150
} ) ;
2151
+ microCanonicalUpdateResult . orphanedMicroblocks . forEach ( mb => {
2152
+ microblocksOrphaned . add ( mb ) ;
2153
+ microblocksAccepted . delete ( mb ) ;
2154
+ } ) ;
2155
+ microCanonicalUpdateResult . acceptedMicroblocks . forEach ( mb => {
2156
+ microblocksOrphaned . delete ( mb ) ;
2157
+ microblocksAccepted . add ( mb ) ;
2158
+ } ) ;
2156
2159
}
2157
2160
2158
2161
updatedEntities . markedNonCanonical . blocks ++ ;
@@ -2165,14 +2168,49 @@ export class PgDataStore
2165
2168
await this . restoreMempoolTxs ( client , markNonCanonicalResult . txsMarkedNonCanonical ) ;
2166
2169
}
2167
2170
2171
+ // The canonical microblock tables _must_ be restored _after_ orphaning all other blocks at a given height,
2172
+ // because there is only 1 row per microblock hash, and both the orphaned blocks at this height and the
2173
+ // canonical block can point to the same microblocks.
2174
+ const restoredBlock = this . parseBlockQueryResult ( restoredBlockResult . rows [ 0 ] ) ;
2175
+ const microCanonicalUpdateResult = await this . updateMicroCanonical ( client , {
2176
+ isCanonical : true ,
2177
+ blockHeight : restoredBlock . block_height ,
2178
+ blockHash : restoredBlock . block_hash ,
2179
+ indexBlockHash : restoredBlock . index_block_hash ,
2180
+ parentIndexBlockHash : restoredBlock . parent_index_block_hash ,
2181
+ parentMicroblockHash : restoredBlock . parent_microblock_hash ,
2182
+ parentMicroblockSequence : restoredBlock . parent_microblock_sequence ,
2183
+ burnBlockTime : restoredBlock . burn_block_time ,
2184
+ } ) ;
2185
+ microCanonicalUpdateResult . orphanedMicroblocks . forEach ( mb => {
2186
+ microblocksOrphaned . add ( mb ) ;
2187
+ microblocksAccepted . delete ( mb ) ;
2188
+ } ) ;
2189
+ microCanonicalUpdateResult . acceptedMicroblocks . forEach ( mb => {
2190
+ microblocksOrphaned . delete ( mb ) ;
2191
+ microblocksAccepted . add ( mb ) ;
2192
+ } ) ;
2193
+ updatedEntities . markedCanonical . microblocks += microblocksAccepted . size ;
2194
+ updatedEntities . markedNonCanonical . microblocks += microblocksOrphaned . size ;
2195
+
2196
+ microblocksOrphaned . forEach ( mb => logger . verbose ( `Marked microblock as non-canonical: ${ mb } ` ) ) ;
2197
+ microblocksAccepted . forEach ( mb => logger . verbose ( `Marked microblock as canonical: ${ mb } ` ) ) ;
2198
+
2168
2199
const markCanonicalResult = await this . markEntitiesCanonical (
2169
2200
client ,
2170
2201
indexBlockHash ,
2171
2202
true ,
2172
2203
updatedEntities
2173
2204
) ;
2174
- await this . pruneMempoolTxs ( client , markCanonicalResult . txsMarkedCanonical ) ;
2175
-
2205
+ const removedTxsResult = await this . pruneMempoolTxs (
2206
+ client ,
2207
+ markCanonicalResult . txsMarkedCanonical
2208
+ ) ;
2209
+ if ( removedTxsResult . removedTxs . length > 0 ) {
2210
+ logger . verbose (
2211
+ `Removed ${ removedTxsResult . removedTxs . length } txs from mempool table during reorg handling`
2212
+ ) ;
2213
+ }
2176
2214
const parentResult = await client . query < { index_block_hash : Buffer } > (
2177
2215
`
2178
2216
-- check if the parent block is also orphaned
0 commit comments