Skip to content

Commit 61a11ea

Browse files
committed
feat: sync use async send
1 parent a720082 commit 61a11ea

28 files changed

+296
-196
lines changed

network/src/lib.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,10 @@ pub use p2p::{
4141
builder::ServiceBuilder,
4242
bytes, multiaddr, runtime,
4343
secio::{self, PeerId, PublicKey},
44-
service::{ServiceControl, SessionType as RawSessionType, TargetProtocol, TargetSession},
44+
service::{
45+
ServiceAsyncControl, ServiceControl, SessionType as RawSessionType, TargetProtocol,
46+
TargetSession,
47+
},
4548
traits::ServiceProtocol,
4649
utils::{extract_peer_id, multiaddr_to_socketaddr},
4750
};

sync/src/filter/get_block_filter_check_points_process.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
1+
use crate::Status;
12
use crate::filter::BlockFilter;
2-
use crate::utils::send_message_to;
3-
use crate::{Status, attempt};
3+
use crate::utils::send_message_to_async;
44
use ckb_network::{CKBProtocolContext, PeerIndex};
55
use ckb_types::core::BlockNumber;
66
use ckb_types::{packed, prelude::*};
@@ -12,15 +12,15 @@ const CHECK_POINT_INTERVAL: BlockNumber = 2000;
1212
pub struct GetBlockFilterCheckPointsProcess<'a> {
1313
message: packed::GetBlockFilterCheckPointsReader<'a>,
1414
filter: &'a BlockFilter,
15-
nc: Arc<dyn CKBProtocolContext>,
15+
nc: Arc<dyn CKBProtocolContext + Sync>,
1616
peer: PeerIndex,
1717
}
1818

1919
impl<'a> GetBlockFilterCheckPointsProcess<'a> {
2020
pub fn new(
2121
message: packed::GetBlockFilterCheckPointsReader<'a>,
2222
filter: &'a BlockFilter,
23-
nc: Arc<dyn CKBProtocolContext>,
23+
nc: Arc<dyn CKBProtocolContext + Sync>,
2424
peer: PeerIndex,
2525
) -> Self {
2626
Self {
@@ -31,7 +31,7 @@ impl<'a> GetBlockFilterCheckPointsProcess<'a> {
3131
}
3232
}
3333

34-
pub fn execute(self) -> Status {
34+
pub async fn execute(self) -> Status {
3535
let active_chain = self.filter.shared.active_chain();
3636
let start_number: BlockNumber = self.message.to_entity().start_number().into();
3737
let latest: BlockNumber = active_chain.get_latest_built_filter_block_number();
@@ -59,7 +59,7 @@ impl<'a> GetBlockFilterCheckPointsProcess<'a> {
5959
let message = packed::BlockFilterMessage::new_builder()
6060
.set(content)
6161
.build();
62-
attempt!(send_message_to(self.nc.as_ref(), self.peer, &message))
62+
send_message_to_async(&self.nc, self.peer, &message).await
6363
} else {
6464
Status::ignored()
6565
}

sync/src/filter/get_block_filter_hashes_process.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
1+
use crate::Status;
12
use crate::filter::BlockFilter;
2-
use crate::utils::send_message_to;
3-
use crate::{Status, attempt};
3+
use crate::utils::send_message_to_async;
44
use ckb_network::{CKBProtocolContext, PeerIndex};
55
use ckb_types::{core::BlockNumber, packed, prelude::*};
66
use std::sync::Arc;
@@ -10,15 +10,15 @@ const BATCH_SIZE: BlockNumber = 2000;
1010
pub struct GetBlockFilterHashesProcess<'a> {
1111
message: packed::GetBlockFilterHashesReader<'a>,
1212
filter: &'a BlockFilter,
13-
nc: Arc<dyn CKBProtocolContext>,
13+
nc: Arc<dyn CKBProtocolContext + Sync>,
1414
peer: PeerIndex,
1515
}
1616

1717
impl<'a> GetBlockFilterHashesProcess<'a> {
1818
pub fn new(
1919
message: packed::GetBlockFilterHashesReader<'a>,
2020
filter: &'a BlockFilter,
21-
nc: Arc<dyn CKBProtocolContext>,
21+
nc: Arc<dyn CKBProtocolContext + Sync>,
2222
peer: PeerIndex,
2323
) -> Self {
2424
Self {
@@ -29,7 +29,7 @@ impl<'a> GetBlockFilterHashesProcess<'a> {
2929
}
3030
}
3131

32-
pub fn execute(self) -> Status {
32+
pub async fn execute(self) -> Status {
3333
let active_chain = self.filter.shared.active_chain();
3434
let start_number: BlockNumber = self.message.to_entity().start_number().into();
3535
let latest: BlockNumber = active_chain.get_latest_built_filter_block_number();
@@ -68,7 +68,7 @@ impl<'a> GetBlockFilterHashesProcess<'a> {
6868
let message = packed::BlockFilterMessage::new_builder()
6969
.set(content)
7070
.build();
71-
attempt!(send_message_to(self.nc.as_ref(), self.peer, &message))
71+
send_message_to_async(&self.nc, self.peer, &message).await
7272
} else {
7373
Status::ignored()
7474
}

sync/src/filter/get_block_filters_process.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
1+
use crate::Status;
12
use crate::filter::BlockFilter;
2-
use crate::utils::send_message_to;
3-
use crate::{Status, attempt};
3+
use crate::utils::send_message_to_async;
44
use ckb_network::{CKBProtocolContext, PeerIndex};
55
use ckb_types::core::BlockNumber;
66
use ckb_types::{packed, prelude::*};
@@ -11,15 +11,15 @@ const BATCH_SIZE: BlockNumber = 1000;
1111
pub struct GetBlockFiltersProcess<'a> {
1212
message: packed::GetBlockFiltersReader<'a>,
1313
filter: &'a BlockFilter,
14-
nc: Arc<dyn CKBProtocolContext>,
14+
nc: Arc<dyn CKBProtocolContext + Sync>,
1515
peer: PeerIndex,
1616
}
1717

1818
impl<'a> GetBlockFiltersProcess<'a> {
1919
pub fn new(
2020
message: packed::GetBlockFiltersReader<'a>,
2121
filter: &'a BlockFilter,
22-
nc: Arc<dyn CKBProtocolContext>,
22+
nc: Arc<dyn CKBProtocolContext + Sync>,
2323
peer: PeerIndex,
2424
) -> Self {
2525
Self {
@@ -30,7 +30,7 @@ impl<'a> GetBlockFiltersProcess<'a> {
3030
}
3131
}
3232

33-
pub fn execute(self) -> Status {
33+
pub async fn execute(self) -> Status {
3434
let active_chain = self.filter.shared.active_chain();
3535
let start_number: BlockNumber = self.message.to_entity().start_number().into();
3636
let latest: BlockNumber = active_chain.get_latest_built_filter_block_number();
@@ -73,7 +73,7 @@ impl<'a> GetBlockFiltersProcess<'a> {
7373
let message = packed::BlockFilterMessage::new_builder()
7474
.set(content)
7575
.build();
76-
attempt!(send_message_to(self.nc.as_ref(), self.peer, &message))
76+
send_message_to_async(&self.nc, self.peer, &message).await
7777
} else {
7878
Status::ignored()
7979
}

sync/src/filter/mod.rs

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -30,21 +30,27 @@ impl BlockFilter {
3030
Self { shared }
3131
}
3232

33-
fn try_process(
33+
async fn try_process(
3434
&mut self,
3535
nc: Arc<dyn CKBProtocolContext + Sync>,
3636
peer: PeerIndex,
3737
message: packed::BlockFilterMessageUnionReader<'_>,
3838
) -> Status {
3939
match message {
4040
packed::BlockFilterMessageUnionReader::GetBlockFilters(msg) => {
41-
GetBlockFiltersProcess::new(msg, self, nc, peer).execute()
41+
GetBlockFiltersProcess::new(msg, self, nc, peer)
42+
.execute()
43+
.await
4244
}
4345
packed::BlockFilterMessageUnionReader::GetBlockFilterHashes(msg) => {
44-
GetBlockFilterHashesProcess::new(msg, self, nc, peer).execute()
46+
GetBlockFilterHashesProcess::new(msg, self, nc, peer)
47+
.execute()
48+
.await
4549
}
4650
packed::BlockFilterMessageUnionReader::GetBlockFilterCheckPoints(msg) => {
47-
GetBlockFilterCheckPointsProcess::new(msg, self, nc, peer).execute()
51+
GetBlockFilterCheckPointsProcess::new(msg, self, nc, peer)
52+
.execute()
53+
.await
4854
}
4955
packed::BlockFilterMessageUnionReader::BlockFilters(_)
5056
| packed::BlockFilterMessageUnionReader::BlockFilterHashes(_)
@@ -61,15 +67,15 @@ impl BlockFilter {
6167
}
6268
}
6369

64-
fn process(
70+
async fn process(
6571
&mut self,
6672
nc: Arc<dyn CKBProtocolContext + Sync>,
6773
peer: PeerIndex,
6874
message: packed::BlockFilterMessageUnionReader<'_>,
6975
) {
7076
let item_name = message.item_name();
7177
let item_bytes = message.as_slice().len() as u64;
72-
let status = self.try_process(Arc::clone(&nc), peer, message);
78+
let status = self.try_process(Arc::clone(&nc), peer, message).await;
7379

7480
metric_ckb_message_bytes(
7581
MetricDirection::In,
@@ -143,7 +149,7 @@ impl CKBProtocolHandler for BlockFilter {
143149
peer_index
144150
);
145151
let start_time = Instant::now();
146-
self.process(nc, peer_index, msg);
152+
self.process(nc, peer_index, msg).await;
147153
debug_target!(
148154
crate::LOG_TARGET_FILTER,
149155
"process message={}, peer={}, cost={:?}",

sync/src/net_time_checker.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use crate::utils::send_message_to;
1+
use crate::utils::send_message_to_async;
22
use ckb_constant::sync::BAD_MESSAGE_BAN_TIME;
33
use ckb_logger::{debug, info, warn};
44
use ckb_network::async_trait;
@@ -119,7 +119,7 @@ impl CKBProtocolHandler for NetTimeProtocol {
119119
if let Some(true) = nc.get_peer(peer_index).map(|peer| peer.is_inbound()) {
120120
let now = ckb_systemtime::unix_time_as_millis();
121121
let time = packed::Time::new_builder().timestamp(now).build();
122-
let _status = send_message_to(nc.as_ref(), peer_index, &time);
122+
let _status = send_message_to_async(&nc, peer_index, &time).await;
123123
}
124124
}
125125

sync/src/relayer/compact_block_process.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -269,7 +269,7 @@ fn contextual_check(
269269
block_hash,
270270
peer
271271
);
272-
active_chain.send_getheaders_to_peer(nc.as_ref(), peer, (&tip).into());
272+
active_chain.send_getheaders_to_peer(nc, peer, (&tip).into());
273273
return StatusCode::CompactBlockRequiresParent.with_context(format!(
274274
"{} parent: {}",
275275
block_hash,

sync/src/relayer/get_block_proposal_process.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
use crate::relayer::{MAX_RELAY_TXS_BYTES_PER_BATCH, Relayer};
2-
use crate::utils::{send_message_to, send_message_to_async};
2+
use crate::utils::send_message_to_async;
33
use crate::{Status, StatusCode, attempt};
44
use ckb_logger::debug_target;
55
use ckb_network::{CKBProtocolContext, PeerIndex};

sync/src/relayer/get_block_transactions_process.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
use crate::relayer::{MAX_RELAY_TXS_NUM_PER_BATCH, Relayer};
2-
use crate::utils::send_message_to;
3-
use crate::{Status, StatusCode, attempt};
2+
use crate::utils::send_message_to_async;
3+
use crate::{Status, StatusCode};
44
use ckb_logger::debug_target;
55
use ckb_network::{CKBProtocolContext, PeerIndex};
66
use ckb_store::ChainStore;
@@ -10,15 +10,15 @@ use std::sync::Arc;
1010
pub struct GetBlockTransactionsProcess<'a> {
1111
message: packed::GetBlockTransactionsReader<'a>,
1212
relayer: &'a Relayer,
13-
nc: Arc<dyn CKBProtocolContext>,
13+
nc: Arc<dyn CKBProtocolContext + Sync>,
1414
peer: PeerIndex,
1515
}
1616

1717
impl<'a> GetBlockTransactionsProcess<'a> {
1818
pub fn new(
1919
message: packed::GetBlockTransactionsReader<'a>,
2020
relayer: &'a Relayer,
21-
nc: Arc<dyn CKBProtocolContext>,
21+
nc: Arc<dyn CKBProtocolContext + Sync>,
2222
peer: PeerIndex,
2323
) -> Self {
2424
GetBlockTransactionsProcess {
@@ -30,7 +30,7 @@ impl<'a> GetBlockTransactionsProcess<'a> {
3030
}
3131

3232
#[allow(clippy::needless_collect)]
33-
pub fn execute(self) -> Status {
33+
pub async fn execute(self) -> Status {
3434
let shared = self.relayer.shared();
3535
{
3636
let get_block_transactions = self.message;
@@ -94,7 +94,7 @@ impl<'a> GetBlockTransactionsProcess<'a> {
9494
.build();
9595
let message = packed::RelayMessage::new_builder().set(content).build();
9696

97-
attempt!(send_message_to(self.nc.as_ref(), self.peer, &message));
97+
return send_message_to_async(&self.nc, self.peer, &message).await;
9898
}
9999

100100
Status::ok()

sync/src/relayer/get_transactions_process.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
use crate::relayer::{MAX_RELAY_TXS_BYTES_PER_BATCH, MAX_RELAY_TXS_NUM_PER_BATCH, Relayer};
2-
use crate::utils::{send_message_to, send_message_to_async};
2+
use crate::utils::send_message_to_async;
33
use crate::{Status, StatusCode, attempt};
44
use ckb_logger::{debug_target, trace_target};
55
use ckb_network::{CKBProtocolContext, PeerIndex};

0 commit comments

Comments
 (0)