Skip to content

Commit 74dfc5d

Browse files
committed
feat: sync use async send
1 parent a720082 commit 74dfc5d

File tree

13 files changed

+169
-108
lines changed

13 files changed

+169
-108
lines changed

network/src/lib.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,10 @@ pub use p2p::{
4141
builder::ServiceBuilder,
4242
bytes, multiaddr, runtime,
4343
secio::{self, PeerId, PublicKey},
44-
service::{ServiceControl, SessionType as RawSessionType, TargetProtocol, TargetSession},
44+
service::{
45+
ServiceAsyncControl, ServiceControl, SessionType as RawSessionType, TargetProtocol,
46+
TargetSession,
47+
},
4548
traits::ServiceProtocol,
4649
utils::{extract_peer_id, multiaddr_to_socketaddr},
4750
};

sync/src/relayer/compact_block_process.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -269,7 +269,7 @@ fn contextual_check(
269269
block_hash,
270270
peer
271271
);
272-
active_chain.send_getheaders_to_peer(nc.as_ref(), peer, (&tip).into());
272+
active_chain.send_getheaders_to_peer(nc, peer, (&tip).into());
273273
return StatusCode::CompactBlockRequiresParent.with_context(format!(
274274
"{} parent: {}",
275275
block_hash,

sync/src/relayer/get_block_proposal_process.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
use crate::relayer::{MAX_RELAY_TXS_BYTES_PER_BATCH, Relayer};
2-
use crate::utils::{send_message_to, send_message_to_async};
2+
use crate::utils::send_message_to_async;
33
use crate::{Status, StatusCode, attempt};
44
use ckb_logger::debug_target;
55
use ckb_network::{CKBProtocolContext, PeerIndex};

sync/src/relayer/get_block_transactions_process.rs

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
use crate::relayer::{MAX_RELAY_TXS_NUM_PER_BATCH, Relayer};
2-
use crate::utils::send_message_to;
3-
use crate::{Status, StatusCode, attempt};
2+
use crate::utils::send_message_to_async;
3+
use crate::{Status, StatusCode};
44
use ckb_logger::debug_target;
55
use ckb_network::{CKBProtocolContext, PeerIndex};
66
use ckb_store::ChainStore;
@@ -10,15 +10,15 @@ use std::sync::Arc;
1010
pub struct GetBlockTransactionsProcess<'a> {
1111
message: packed::GetBlockTransactionsReader<'a>,
1212
relayer: &'a Relayer,
13-
nc: Arc<dyn CKBProtocolContext>,
13+
nc: Arc<dyn CKBProtocolContext + Sync>,
1414
peer: PeerIndex,
1515
}
1616

1717
impl<'a> GetBlockTransactionsProcess<'a> {
1818
pub fn new(
1919
message: packed::GetBlockTransactionsReader<'a>,
2020
relayer: &'a Relayer,
21-
nc: Arc<dyn CKBProtocolContext>,
21+
nc: Arc<dyn CKBProtocolContext + Sync>,
2222
peer: PeerIndex,
2323
) -> Self {
2424
GetBlockTransactionsProcess {
@@ -94,7 +94,13 @@ impl<'a> GetBlockTransactionsProcess<'a> {
9494
.build();
9595
let message = packed::RelayMessage::new_builder().set(content).build();
9696

97-
attempt!(send_message_to(self.nc.as_ref(), self.peer, &message));
97+
self.relayer
98+
.shared()
99+
.shared()
100+
.async_handle()
101+
.spawn(async move {
102+
let _ignore = send_message_to_async(&self.nc, self.peer, &message).await;
103+
});
98104
}
99105

100106
Status::ok()

sync/src/relayer/get_transactions_process.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
use crate::relayer::{MAX_RELAY_TXS_BYTES_PER_BATCH, MAX_RELAY_TXS_NUM_PER_BATCH, Relayer};
2-
use crate::utils::{send_message_to, send_message_to_async};
2+
use crate::utils::send_message_to_async;
33
use crate::{Status, StatusCode, attempt};
44
use ckb_logger::{debug_target, trace_target};
55
use ckb_network::{CKBProtocolContext, PeerIndex};

sync/src/synchronizer/get_blocks_process.rs

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,17 @@
11
use crate::synchronizer::Synchronizer;
2-
use crate::utils::send_message_to;
3-
use crate::{Status, StatusCode, attempt};
2+
use crate::utils::send_message_to_async;
3+
use crate::{Status, StatusCode};
44
use ckb_constant::sync::{INIT_BLOCKS_IN_TRANSIT_PER_PEER, MAX_HEADERS_LEN};
55
use ckb_logger::debug;
66
use ckb_network::{CKBProtocolContext, PeerIndex};
77
use ckb_shared::block_status::BlockStatus;
88
use ckb_types::{packed, prelude::*};
9-
use std::collections::HashSet;
9+
use std::{collections::HashSet, sync::Arc};
1010

1111
pub struct GetBlocksProcess<'a> {
1212
message: packed::GetBlocksReader<'a>,
1313
synchronizer: &'a Synchronizer,
14-
nc: &'a dyn CKBProtocolContext,
14+
nc: &'a Arc<dyn CKBProtocolContext + Sync>,
1515
peer: PeerIndex,
1616
}
1717

@@ -20,7 +20,7 @@ impl<'a> GetBlocksProcess<'a> {
2020
message: packed::GetBlocksReader<'a>,
2121
synchronizer: &'a Synchronizer,
2222
peer: PeerIndex,
23-
nc: &'a dyn CKBProtocolContext,
23+
nc: &'a Arc<dyn CKBProtocolContext + Sync>,
2424
) -> Self {
2525
GetBlocksProcess {
2626
peer,
@@ -75,7 +75,12 @@ impl<'a> GetBlocksProcess<'a> {
7575
let content = packed::SendBlock::new_builder().block(block.data()).build();
7676
let message = packed::SyncMessage::new_builder().set(content).build();
7777

78-
attempt!(send_message_to(self.nc, self.peer, &message));
78+
let nc = Arc::clone(self.nc);
79+
self.synchronizer
80+
.shared()
81+
.shared()
82+
.async_handle()
83+
.spawn(async move { send_message_to_async(&nc, self.peer, &message).await });
7984
} else {
8085
// TODO response not found
8186
// TODO add timeout check in synchronizer

sync/src/synchronizer/get_headers_process.rs

Lines changed: 22 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
use crate::synchronizer::Synchronizer;
2-
use crate::utils::{send_message, send_message_to};
3-
use crate::{Status, StatusCode, attempt};
2+
use crate::utils::{send_message_async, send_message_to_async};
3+
use crate::{Status, StatusCode};
44
use ckb_constant::sync::MAX_LOCATOR_SIZE;
55
use ckb_logger::{debug, info};
66
use ckb_network::{CKBProtocolContext, PeerIndex, SupportProtocols};
@@ -9,20 +9,21 @@ use ckb_types::{
99
packed::{self, Byte32},
1010
prelude::*,
1111
};
12+
use std::sync::Arc;
1213

1314
pub struct GetHeadersProcess<'a> {
1415
message: packed::GetHeadersReader<'a>,
1516
synchronizer: &'a Synchronizer,
1617
peer: PeerIndex,
17-
nc: &'a dyn CKBProtocolContext,
18+
nc: &'a Arc<dyn CKBProtocolContext + Sync>,
1819
}
1920

2021
impl<'a> GetHeadersProcess<'a> {
2122
pub fn new(
2223
message: packed::GetHeadersReader<'a>,
2324
synchronizer: &'a Synchronizer,
2425
peer: PeerIndex,
25-
nc: &'a dyn CKBProtocolContext,
26+
nc: &'a Arc<dyn CKBProtocolContext + Sync>,
2627
) -> Self {
2728
GetHeadersProcess {
2829
message,
@@ -84,8 +85,12 @@ impl<'a> GetHeadersProcess<'a> {
8485
.headers(headers.into_iter().map(|x| x.data()).collect::<Vec<_>>())
8586
.build();
8687
let message = packed::SyncMessage::new_builder().set(content).build();
87-
88-
attempt!(send_message_to(self.nc, self.peer, &message));
88+
let nc = Arc::clone(self.nc);
89+
self.synchronizer
90+
.shared()
91+
.shared()
92+
.async_handle()
93+
.spawn(async move { send_message_to_async(&nc, self.peer, &message).await });
8994
} else {
9095
return StatusCode::GetHeadersMissCommonAncestors
9196
.with_context(format!("{block_locator_hashes:#x?}"));
@@ -96,11 +101,16 @@ impl<'a> GetHeadersProcess<'a> {
96101
fn send_in_ibd(&self) {
97102
let content = packed::InIBD::new_builder().build();
98103
let message = packed::SyncMessage::new_builder().set(content).build();
99-
let _ignore = send_message(
100-
SupportProtocols::Sync.protocol_id(),
101-
self.nc,
102-
self.peer,
103-
&message,
104-
);
104+
let nc = Arc::clone(self.nc);
105+
let peer = self.peer;
106+
self.synchronizer
107+
.shared()
108+
.shared()
109+
.async_handle()
110+
.spawn(async move {
111+
let _ignore =
112+
send_message_async(SupportProtocols::Sync.protocol_id(), &nc, peer, &message)
113+
.await;
114+
});
105115
}
106116
}

sync/src/synchronizer/headers_process.rs

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -10,12 +10,13 @@ use ckb_traits::HeaderFieldsProvider;
1010
use ckb_types::{core, packed, prelude::*};
1111
use ckb_verification::{HeaderError, HeaderVerifier};
1212
use ckb_verification_traits::Verifier;
13+
use std::sync::Arc;
1314

1415
pub struct HeadersProcess<'a> {
1516
message: packed::SendHeadersReader<'a>,
1617
synchronizer: &'a Synchronizer,
1718
peer: PeerIndex,
18-
nc: &'a dyn CKBProtocolContext,
19+
nc: &'a Arc<dyn CKBProtocolContext + Sync>,
1920
active_chain: ActiveChain,
2021
}
2122

@@ -24,7 +25,7 @@ impl<'a> HeadersProcess<'a> {
2425
message: packed::SendHeadersReader<'a>,
2526
synchronizer: &'a Synchronizer,
2627
peer: PeerIndex,
27-
nc: &'a dyn CKBProtocolContext,
28+
nc: &'a Arc<dyn CKBProtocolContext + Sync>,
2829
) -> Self {
2930
let active_chain = synchronizer.shared.active_chain();
3031
HeadersProcess {
@@ -202,12 +203,16 @@ impl<'a> HeadersProcess<'a> {
202203
&& (!peer_flags.is_protect && !peer_flags.is_whitelist && peer_flags.is_outbound)
203204
{
204205
debug!("Disconnect an unprotected outbound peer ({})", self.peer);
205-
if let Err(err) = self
206-
.nc
207-
.disconnect(self.peer, "useless outbound peer in IBD")
208-
{
209-
return StatusCode::Network.with_context(format!("Disconnect error: {err:?}"));
210-
}
206+
let nc = Arc::clone(self.nc);
207+
self.synchronizer
208+
.shared()
209+
.shared()
210+
.async_handle()
211+
.spawn(async move {
212+
let _ignore = nc
213+
.async_disconnect(self.peer, "useless outbound peer in IBD")
214+
.await;
215+
});
211216
}
212217

213218
Status::ok()

sync/src/synchronizer/in_ibd_process.rs

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,18 +2,19 @@ use crate::synchronizer::Synchronizer;
22
use crate::{Status, StatusCode};
33
use ckb_logger::info;
44
use ckb_network::{CKBProtocolContext, PeerIndex};
5+
use std::sync::Arc;
56

67
pub struct InIBDProcess<'a> {
78
synchronizer: &'a Synchronizer,
89
peer: PeerIndex,
9-
nc: &'a dyn CKBProtocolContext,
10+
nc: &'a Arc<dyn CKBProtocolContext + Sync>,
1011
}
1112

1213
impl<'a> InIBDProcess<'a> {
1314
pub fn new(
1415
synchronizer: &'a Synchronizer,
1516
peer: PeerIndex,
16-
nc: &'a dyn CKBProtocolContext,
17+
nc: &'a Arc<dyn CKBProtocolContext + Sync>,
1718
) -> Self {
1819
InIBDProcess {
1920
nc,
@@ -22,7 +23,7 @@ impl<'a> InIBDProcess<'a> {
2223
}
2324
}
2425

25-
pub fn execute(self) -> Status {
26+
pub async fn execute(self) -> Status {
2627
info!("getheader with ibd peer {:?}", self.peer);
2728
if let Some(mut kv_pair) = self.synchronizer.peers().state.get_mut(&self.peer) {
2829
let state = kv_pair.value_mut();
@@ -34,7 +35,9 @@ impl<'a> InIBDProcess<'a> {
3435
if state.peer_flags.is_outbound {
3536
if state.peer_flags.is_whitelist {
3637
self.synchronizer.shared().state().suspend_sync(state);
37-
} else if let Err(err) = self.nc.disconnect(self.peer, "outbound in ibd") {
38+
} else if let Err(err) =
39+
self.nc.async_disconnect(self.peer, "outbound in ibd").await
40+
{
3841
return StatusCode::Network.with_context(format!("Disconnect error: {err:?}"));
3942
}
4043
} else {

0 commit comments

Comments
 (0)