Skip to content

Commit 28f2ced

Browse files
committed
feat: compact block async
1 parent ce62624 commit 28f2ced

File tree

11 files changed

+178
-107
lines changed

11 files changed

+178
-107
lines changed

sync/src/relayer/block_transactions_process.rs

Lines changed: 13 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ impl<'a> BlockTransactionsProcess<'a> {
4242
}
4343
}
4444

45-
pub fn execute(self) -> Status {
45+
pub async fn execute(self) -> Status {
4646
let shared = self.relayer.shared();
4747
let active_chain = shared.active_chain();
4848
let block_transactions = self.message.to_entity();
@@ -65,6 +65,7 @@ impl<'a> BlockTransactionsProcess<'a> {
6565
if let Entry::Occupied(mut pending) = shared
6666
.state()
6767
.pending_compact_blocks()
68+
.await
6869
.entry(block_hash.clone())
6970
{
7071
let (compact_block, peers_map, _) = pending.get_mut();
@@ -87,13 +88,16 @@ impl<'a> BlockTransactionsProcess<'a> {
8788
&received_uncles,
8889
));
8990

90-
let ret = self.relayer.reconstruct_block(
91-
&active_chain,
92-
compact_block,
93-
received_transactions,
94-
expected_uncle_indexes,
95-
&received_uncles,
96-
);
91+
let ret = self
92+
.relayer
93+
.reconstruct_block(
94+
&active_chain,
95+
compact_block,
96+
received_transactions,
97+
expected_uncle_indexes,
98+
&received_uncles,
99+
)
100+
.await;
97101

98102
// Request proposal
99103
{
@@ -167,13 +171,7 @@ impl<'a> BlockTransactionsProcess<'a> {
167171
.build();
168172
let message = packed::RelayMessage::new_builder().set(content).build();
169173

170-
self.relayer
171-
.shared()
172-
.shared()
173-
.async_handle()
174-
.spawn(
175-
async move { send_message_to_async(&self.nc, self.peer, &message).await },
176-
);
174+
let _ignore = send_message_to_async(&self.nc, self.peer, &message).await;
177175

178176
let _ignore_prev_value =
179177
mem::replace(expected_transaction_indexes, missing_transactions);

sync/src/relayer/compact_block_process.rs

Lines changed: 13 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ impl<'a> CompactBlockProcess<'a> {
5353
}
5454
}
5555

56-
pub fn execute(self) -> Status {
56+
pub async fn execute(self) -> Status {
5757
let instant = Instant::now();
5858
let shared = self.relayer.shared();
5959
let active_chain = shared.active_chain();
@@ -67,7 +67,7 @@ impl<'a> CompactBlockProcess<'a> {
6767
return status;
6868
}
6969

70-
let status = contextual_check(&header, shared, &active_chain, &self.nc, self.peer);
70+
let status = contextual_check(&header, shared, &active_chain, &self.nc, self.peer).await;
7171
if !status.is_ok() {
7272
return status;
7373
}
@@ -89,7 +89,8 @@ impl<'a> CompactBlockProcess<'a> {
8989
// Reconstruct block
9090
let ret = self
9191
.relayer
92-
.reconstruct_block(&active_chain, &compact_block, vec![], &[], &[]);
92+
.reconstruct_block(&active_chain, &compact_block, vec![], &[], &[])
93+
.await;
9394

9495
// Accept block
9596
// `relayer.accept_block` will make sure the validity of block before persisting
@@ -102,7 +103,7 @@ impl<'a> CompactBlockProcess<'a> {
102103
.inc_by(block.transactions().len() as u64);
103104
metrics.ckb_relay_cb_reconstruct_ok.inc();
104105
}
105-
let mut pending_compact_blocks = shared.state().pending_compact_blocks();
106+
let mut pending_compact_blocks = shared.state().pending_compact_blocks().await;
106107
pending_compact_blocks.remove(&block_hash);
107108
// remove all pending request below this block epoch
108109
//
@@ -144,7 +145,8 @@ impl<'a> CompactBlockProcess<'a> {
144145
missing_transactions,
145146
missing_uncles,
146147
self.peer,
147-
);
148+
)
149+
.await;
148150

149151
StatusCode::CompactBlockRequiresFreshTransactions.with_context(&block_hash)
150152
}
@@ -163,7 +165,8 @@ impl<'a> CompactBlockProcess<'a> {
163165
missing_transactions,
164166
missing_uncles,
165167
self.peer,
166-
);
168+
)
169+
.await;
167170
StatusCode::CompactBlockMeetsShortIdsCollision.with_context(&block_hash)
168171
}
169172
ReconstructionResult::Error(status) => status,
@@ -224,7 +227,7 @@ fn non_contextual_check(
224227
/// * check compact block's parent block is not stored in db
225228
/// * check compact block is in pending
226229
/// * check compact header verification
227-
fn contextual_check(
230+
async fn contextual_check(
228231
compact_block_header: &HeaderView,
229232
shared: &Arc<SyncShared>,
230233
active_chain: &ActiveChain,
@@ -278,7 +281,7 @@ fn contextual_check(
278281
}
279282

280283
// compact block is in pending
281-
let pending_compact_blocks = shared.state().pending_compact_blocks();
284+
let pending_compact_blocks = shared.state().pending_compact_blocks().await;
282285
if pending_compact_blocks
283286
.get(&block_hash)
284287
.map(|(_, peers_map, _)| peers_map.contains_key(&peer))
@@ -339,7 +342,7 @@ fn contextual_check(
339342
}
340343

341344
/// request missing txs and uncles from peer
342-
fn missing_or_collided_post_process(
345+
async fn missing_or_collided_post_process(
343346
compact_block: CompactBlock,
344347
block_hash: Byte32,
345348
shared: &SyncShared,
@@ -351,6 +354,7 @@ fn missing_or_collided_post_process(
351354
shared
352355
.state()
353356
.pending_compact_blocks()
357+
.await
354358
.entry(block_hash.clone())
355359
.or_insert_with(|| (compact_block, HashMap::default(), unix_time_as_millis()))
356360
.1

sync/src/relayer/get_block_proposal_process.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ impl<'a> GetBlockProposalProcess<'a> {
5353

5454
let fetched_transactions = {
5555
let tx_pool = self.relayer.shared.shared().tx_pool_controller();
56-
let fetch_txs = tx_pool.fetch_txs_async(proposals.clone()).await;
56+
let fetch_txs = tx_pool.fetch_txs(proposals.clone()).await;
5757
if let Err(e) = fetch_txs {
5858
debug_target!(
5959
crate::LOG_TARGET_RELAY,

sync/src/relayer/mod.rs

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -121,7 +121,9 @@ impl Relayer {
121121

122122
match message {
123123
packed::RelayMessageUnionReader::CompactBlock(reader) => {
124-
CompactBlockProcess::new(reader, self, nc, peer).execute()
124+
CompactBlockProcess::new(reader, self, nc, peer)
125+
.execute()
126+
.await
125127
}
126128
packed::RelayMessageUnionReader::RelayTransactions(reader) => {
127129
if reader.check_data() {
@@ -146,7 +148,9 @@ impl Relayer {
146148
}
147149
packed::RelayMessageUnionReader::BlockTransactions(reader) => {
148150
if reader.check_data() {
149-
BlockTransactionsProcess::new(reader, self, nc, peer).execute()
151+
BlockTransactionsProcess::new(reader, self, nc, peer)
152+
.execute()
153+
.await
150154
} else {
151155
StatusCode::ProtocolMessageIsMalformed
152156
.with_context("BlockTransactions is invalid")
@@ -337,7 +341,7 @@ impl Relayer {
337341
// then once the block has been reconstructed, it shall be processed as normal,
338342
// keeping in mind that short_ids are expected to occasionally collide,
339343
// and that nodes must not be penalized for such collisions, wherever they appear.
340-
pub fn reconstruct_block(
344+
pub async fn reconstruct_block(
341345
&self,
342346
active_chain: &ActiveChain,
343347
compact_block: &packed::CompactBlock,
@@ -371,7 +375,7 @@ impl Relayer {
371375

372376
if !short_ids_set.is_empty() {
373377
let tx_pool = self.shared.shared().tx_pool_controller();
374-
let fetch_txs = tx_pool.fetch_txs(short_ids_set);
378+
let fetch_txs = tx_pool.fetch_txs(short_ids_set).await;
375379
if let Err(e) = fetch_txs {
376380
return ReconstructionResult::Error(StatusCode::TxPool.with_context(e));
377381
}
@@ -537,7 +541,7 @@ impl Relayer {
537541
let tx_pool = self.shared.shared().tx_pool_controller();
538542

539543
let fetch_txs = tx_pool
540-
.fetch_txs_async(
544+
.fetch_txs(
541545
get_block_proposals
542546
.iter()
543547
.map(|kv_pair| kv_pair.key().clone())

sync/src/relayer/tests/block_transactions_process.rs

Lines changed: 38 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,10 @@ fn test_accept_block() {
2121
let (_chain, relayer, _) = build_chain(5);
2222
let peer_index: PeerIndex = 100.into();
2323
let other_peer_index: PeerIndex = 101.into();
24+
let rt = tokio::runtime::Builder::new_current_thread()
25+
.enable_all()
26+
.build()
27+
.unwrap();
2428

2529
let tx1 = TransactionBuilder::default().build();
2630
let tx2 = TransactionBuilder::default()
@@ -54,7 +58,8 @@ fn test_accept_block() {
5458
let hash = compact_block.header().calc_header_hash();
5559

5660
{
57-
let mut pending_compact_blocks = relayer.shared.state().pending_compact_blocks();
61+
let mut pending_compact_blocks =
62+
rt.block_on(relayer.shared.state().pending_compact_blocks());
5863
pending_compact_blocks.insert(
5964
hash.clone(),
6065
(
@@ -84,9 +89,9 @@ fn test_accept_block() {
8489
peer_index,
8590
);
8691

87-
assert_eq!(process.execute(), Status::ok());
92+
assert_eq!(rt.block_on(process.execute()), Status::ok());
8893

89-
let pending_compact_blocks = relayer.shared.state().pending_compact_blocks();
94+
let pending_compact_blocks = rt.block_on(relayer.shared.state().pending_compact_blocks());
9095
assert!(pending_compact_blocks.get(&hash).is_none());
9196

9297
std::thread::sleep(std::time::Duration::from_millis(100));
@@ -102,6 +107,10 @@ fn test_accept_block() {
102107
fn test_unknown_request() {
103108
let (_chain, relayer, _) = build_chain(5);
104109
let peer_index: PeerIndex = 100.into();
110+
let rt = tokio::runtime::Builder::new_current_thread()
111+
.enable_all()
112+
.build()
113+
.unwrap();
105114

106115
let tx1 = TransactionBuilder::default().build();
107116
let tx2 = TransactionBuilder::default()
@@ -123,7 +132,8 @@ fn test_unknown_request() {
123132

124133
let foo_peer_index: PeerIndex = 998.into();
125134
{
126-
let mut pending_compact_blocks = relayer.shared.state().pending_compact_blocks();
135+
let mut pending_compact_blocks =
136+
rt.block_on(relayer.shared.state().pending_compact_blocks());
127137
pending_compact_blocks.insert(
128138
compact_block.header().calc_header_hash(),
129139
(
@@ -148,13 +158,17 @@ fn test_unknown_request() {
148158
Arc::<MockProtocolContext>::clone(&nc),
149159
peer_index,
150160
);
151-
assert_eq!(process.execute(), Status::ignored());
161+
assert_eq!(rt.block_on(process.execute()), Status::ignored());
152162
}
153163

154164
#[test]
155165
fn test_invalid_transaction_root() {
156166
let (_chain, relayer, _) = build_chain(5);
157167
let peer_index: PeerIndex = 100.into();
168+
let rt = tokio::runtime::Builder::new_current_thread()
169+
.enable_all()
170+
.build()
171+
.unwrap();
158172

159173
let tx1 = TransactionBuilder::default().build();
160174
let tx2 = TransactionBuilder::default()
@@ -188,7 +202,8 @@ fn test_invalid_transaction_root() {
188202
let block_hash = compact_block.header().calc_header_hash();
189203

190204
{
191-
let mut pending_compact_blocks = relayer.shared.state().pending_compact_blocks();
205+
let mut pending_compact_blocks =
206+
rt.block_on(relayer.shared.state().pending_compact_blocks());
192207
pending_compact_blocks.insert(
193208
block_hash.clone(),
194209
(
@@ -214,14 +229,18 @@ fn test_invalid_transaction_root() {
214229
peer_index,
215230
);
216231
assert_eq!(
217-
process.execute(),
232+
rt.block_on(process.execute()),
218233
StatusCode::CompactBlockHasUnmatchedTransactionRootWithReconstructedBlock.into(),
219234
);
220235
}
221236

222237
#[test]
223238
fn test_collision_and_send_missing_indexes() {
224239
let (_chain, relayer, _) = build_chain(5);
240+
let rt = tokio::runtime::Builder::new_current_thread()
241+
.enable_all()
242+
.build()
243+
.unwrap();
225244

226245
let active_chain = relayer.shared.active_chain();
227246
let last_block = relayer
@@ -284,7 +303,8 @@ fn test_collision_and_send_missing_indexes() {
284303

285304
let hash = compact_block.header().calc_header_hash();
286305
{
287-
let mut pending_compact_blocks = relayer.shared.state().pending_compact_blocks();
306+
let mut pending_compact_blocks =
307+
rt.block_on(relayer.shared.state().pending_compact_blocks());
288308
pending_compact_blocks.insert(
289309
hash.clone(),
290310
(
@@ -310,7 +330,7 @@ fn test_collision_and_send_missing_indexes() {
310330
peer_index,
311331
);
312332
assert_eq!(
313-
process.execute(),
333+
rt.block_on(process.execute()),
314334
StatusCode::CompactBlockMeetsShortIdsCollision.into()
315335
);
316336

@@ -327,7 +347,7 @@ fn test_collision_and_send_missing_indexes() {
327347

328348
// update cached missing_index
329349
{
330-
let pending_compact_blocks = relayer.shared.state().pending_compact_blocks();
350+
let pending_compact_blocks = rt.block_on(relayer.shared.state().pending_compact_blocks());
331351
assert_eq!(
332352
pending_compact_blocks
333353
.get(&hash)
@@ -354,7 +374,7 @@ fn test_collision_and_send_missing_indexes() {
354374
peer_index,
355375
);
356376
assert_eq!(
357-
process.execute(),
377+
rt.block_on(process.execute()),
358378
StatusCode::CompactBlockHasUnmatchedTransactionRootWithReconstructedBlock.into(),
359379
);
360380
}
@@ -369,6 +389,10 @@ fn test_missing() {
369389

370390
let (_chain, relayer, _) = build_chain(5);
371391
let peer_index: PeerIndex = 100.into();
392+
let rt = tokio::runtime::Builder::new_current_thread()
393+
.enable_all()
394+
.build()
395+
.unwrap();
372396

373397
let tx1 = TransactionBuilder::default().build();
374398
let tx2 = TransactionBuilder::default()
@@ -399,7 +423,8 @@ fn test_missing() {
399423
// tx3 should be in tx_pool already, but it's not.
400424
// so the reconstruct block will fail
401425
{
402-
let mut pending_compact_blocks = relayer.shared.state().pending_compact_blocks();
426+
let mut pending_compact_blocks =
427+
rt.block_on(relayer.shared.state().pending_compact_blocks());
403428
pending_compact_blocks.insert(
404429
compact_block.header().calc_header_hash(),
405430
(
@@ -425,7 +450,7 @@ fn test_missing() {
425450
peer_index,
426451
);
427452
assert_eq!(
428-
process.execute(),
453+
rt.block_on(process.execute()),
429454
StatusCode::CompactBlockRequiresFreshTransactions.into()
430455
);
431456

0 commit comments

Comments
 (0)