Skip to content

Commit aaed3ba

Browse files
authored
Merge pull request #3310 from autonomys/xdm_relayer
XDM: Cache and submit XDM messages from the relayer instead of submitting them every block until confirmation
2 parents e35cd7b + 2a96cfb commit aaed3ba

File tree

4 files changed

+287
-151
lines changed

4 files changed

+287
-151
lines changed

domains/client/cross-domain-message-gossip/src/aux_schema.rs

Lines changed: 66 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,18 @@
11
//! Schema for channel update storage.
22
3-
use crate::message_listener::LOG_TARGET;
3+
use crate::RELAYER_PREFIX;
44
use parity_scale_codec::{Decode, Encode};
55
use sc_client_api::backend::AuxStore;
66
use sp_blockchain::{Error as ClientError, Info, Result as ClientResult};
7+
use sp_core::bytes::to_hex;
78
use sp_core::H256;
89
use sp_messenger::messages::{ChainId, ChannelId, ChannelState, Nonce};
910
use sp_messenger::{ChannelNonce, XdmId};
1011
use sp_runtime::traits::{Block as BlockT, NumberFor};
1112
use subspace_runtime_primitives::BlockNumber;
1213

1314
const CHANNEL_DETAIL: &[u8] = b"channel_detail";
15+
const LOG_TARGET: &str = "gossip_aux_schema";
1416

1517
fn channel_detail_key(
1618
src_chain_id: ChainId,
@@ -58,35 +60,48 @@ pub struct ChannelDetail {
5860
/// Load the channel state of self_chain_id on chain_id.
5961
pub fn get_channel_state<Backend>(
6062
backend: &Backend,
61-
self_chain_id: ChainId,
62-
chain_id: ChainId,
63+
dst_chain_id: ChainId,
64+
src_chain_id: ChainId,
6365
channel_id: ChannelId,
6466
) -> ClientResult<Option<ChannelDetail>>
6567
where
6668
Backend: AuxStore,
6769
{
6870
load_decode(
6971
backend,
70-
channel_detail_key(chain_id, self_chain_id, channel_id).as_slice(),
72+
channel_detail_key(src_chain_id, dst_chain_id, channel_id).as_slice(),
7173
)
7274
}
7375

7476
/// Set the channel state of self_chain_id on chain_id.
7577
pub fn set_channel_state<Backend>(
7678
backend: &Backend,
77-
self_chain_id: ChainId,
78-
chain_id: ChainId,
79+
dst_chain_id: ChainId,
80+
src_chain_id: ChainId,
7981
channel_detail: ChannelDetail,
8082
) -> ClientResult<()>
8183
where
8284
Backend: AuxStore,
8385
{
8486
backend.insert_aux(
8587
&[(
86-
channel_detail_key(chain_id, self_chain_id, channel_detail.channel_id).as_slice(),
88+
channel_detail_key(src_chain_id, dst_chain_id, channel_detail.channel_id).as_slice(),
8789
channel_detail.encode().as_slice(),
8890
)],
8991
vec![],
92+
)?;
93+
94+
let channel_nonce = ChannelNonce {
95+
relay_msg_nonce: Some(channel_detail.next_inbox_nonce),
96+
relay_response_msg_nonce: channel_detail.latest_response_received_message_nonce,
97+
};
98+
let prefix = (RELAYER_PREFIX, src_chain_id).encode();
99+
cleanup_chain_channel_storages(
100+
backend,
101+
&prefix,
102+
src_chain_id,
103+
channel_detail.channel_id,
104+
channel_nonce,
90105
)
91106
}
92107

@@ -101,25 +116,36 @@ mod xdm_keys {
101116
const XDM_RELAY_RESPONSE: &[u8] = b"relay_msg_response";
102117
const XDM_LAST_CLEANUP_NONCE: &[u8] = b"xdm_last_cleanup_nonce";
103118

104-
pub(super) fn get_key_for_xdm_id(xdm_id: XdmId) -> Vec<u8> {
119+
pub(super) fn get_key_for_xdm_id(prefix: &[u8], xdm_id: XdmId) -> Vec<u8> {
105120
match xdm_id {
106-
XdmId::RelayMessage(id) => get_key_for_xdm_relay(id),
107-
XdmId::RelayResponseMessage(id) => get_key_for_xdm_relay_response(id),
121+
XdmId::RelayMessage(id) => get_key_for_xdm_relay(prefix, id),
122+
XdmId::RelayResponseMessage(id) => get_key_for_xdm_relay_response(prefix, id),
108123
}
109124
}
110125

111126
pub(super) fn get_key_for_last_cleanup_relay_nonce(
127+
prefix: &[u8],
112128
chain_id: ChainId,
113129
channel_id: ChannelId,
114130
) -> Vec<u8> {
115-
(XDM, XDM_RELAY, XDM_LAST_CLEANUP_NONCE, chain_id, channel_id).encode()
131+
(
132+
prefix,
133+
XDM,
134+
XDM_RELAY,
135+
XDM_LAST_CLEANUP_NONCE,
136+
chain_id,
137+
channel_id,
138+
)
139+
.encode()
116140
}
117141

118142
pub(super) fn get_key_for_last_cleanup_relay_response_nonce(
143+
prefix: &[u8],
119144
chain_id: ChainId,
120145
channel_id: ChannelId,
121146
) -> Vec<u8> {
122147
(
148+
prefix,
123149
XDM,
124150
XDM_RELAY_RESPONSE,
125151
XDM_LAST_CLEANUP_NONCE,
@@ -129,19 +155,19 @@ mod xdm_keys {
129155
.encode()
130156
}
131157

132-
pub(super) fn get_key_for_xdm_relay(id: MessageKey) -> Vec<u8> {
133-
(XDM, XDM_RELAY, id).encode()
158+
pub(super) fn get_key_for_xdm_relay(prefix: &[u8], id: MessageKey) -> Vec<u8> {
159+
(prefix, XDM, XDM_RELAY, id).encode()
134160
}
135161

136-
pub(super) fn get_key_for_xdm_relay_response(id: MessageKey) -> Vec<u8> {
137-
(XDM, XDM_RELAY_RESPONSE, id).encode()
162+
pub(super) fn get_key_for_xdm_relay_response(prefix: &[u8], id: MessageKey) -> Vec<u8> {
163+
(prefix, XDM, XDM_RELAY_RESPONSE, id).encode()
138164
}
139165
}
140166

141167
#[derive(Debug, Encode, Decode, Clone)]
142-
pub(super) struct BlockId<Block: BlockT> {
143-
pub(super) number: NumberFor<Block>,
144-
pub(super) hash: Block::Hash,
168+
pub struct BlockId<Block: BlockT> {
169+
pub number: NumberFor<Block>,
170+
pub hash: Block::Hash,
145171
}
146172

147173
impl<Block: BlockT> From<Info<Block>> for BlockId<Block> {
@@ -156,32 +182,38 @@ impl<Block: BlockT> From<Info<Block>> for BlockId<Block> {
156182
/// Store the given XDM ID as processed at given block.
157183
pub fn set_xdm_message_processed_at<Backend, Block>(
158184
backend: &Backend,
185+
prefix: &[u8],
159186
xdm_id: XdmId,
160187
block_id: BlockId<Block>,
161188
) -> ClientResult<()>
162189
where
163190
Backend: AuxStore,
164191
Block: BlockT,
165192
{
166-
let key = xdm_keys::get_key_for_xdm_id(xdm_id);
193+
let key = xdm_keys::get_key_for_xdm_id(prefix, xdm_id);
167194
backend.insert_aux(&[(key.as_slice(), block_id.encode().as_slice())], vec![])
168195
}
169196

170197
/// Returns the maybe last processed block number for given xdm.
171198
pub fn get_xdm_processed_block_number<Backend, Block>(
172199
backend: &Backend,
200+
prefix: &[u8],
173201
xdm_id: XdmId,
174202
) -> ClientResult<Option<BlockId<Block>>>
175203
where
176204
Backend: AuxStore,
177205
Block: BlockT,
178206
{
179-
load_decode(backend, xdm_keys::get_key_for_xdm_id(xdm_id).as_slice())
207+
load_decode(
208+
backend,
209+
xdm_keys::get_key_for_xdm_id(prefix, xdm_id).as_slice(),
210+
)
180211
}
181212

182213
/// Cleans up all the xdm storages until the latest nonces.
183214
pub fn cleanup_chain_channel_storages<Backend>(
184215
backend: &Backend,
216+
prefix: &[u8],
185217
chain_id: ChainId,
186218
channel_id: ChannelId,
187219
channel_nonce: ChannelNonce,
@@ -193,7 +225,7 @@ where
193225
let mut to_delete = vec![];
194226
if let Some(latest_relay_nonce) = channel_nonce.relay_msg_nonce {
195227
let last_cleanup_relay_nonce_key =
196-
xdm_keys::get_key_for_last_cleanup_relay_nonce(chain_id, channel_id);
228+
xdm_keys::get_key_for_last_cleanup_relay_nonce(prefix, chain_id, channel_id);
197229
let last_cleaned_up_nonce =
198230
load_decode::<_, Nonce>(backend, last_cleanup_relay_nonce_key.as_slice())?;
199231

@@ -204,17 +236,19 @@ where
204236

205237
tracing::debug!(
206238
target: LOG_TARGET,
207-
"Cleaning Relay xdm keys for {:?} channel: {:?} from: {:?} to: {:?}",
239+
"[{:?}]Cleaning Relay xdm keys for {:?} channel: {:?} from: {:?} to: {:?}",
240+
to_hex(prefix, false),
208241
chain_id,
209242
channel_id,
210243
from_nonce,
211244
latest_relay_nonce
212245
);
213246

214247
while from_nonce <= latest_relay_nonce {
215-
to_delete.push(xdm_keys::get_key_for_xdm_relay((
216-
chain_id, channel_id, from_nonce,
217-
)));
248+
to_delete.push(xdm_keys::get_key_for_xdm_relay(
249+
prefix,
250+
(chain_id, channel_id, from_nonce),
251+
));
218252
from_nonce = from_nonce.saturating_add(Nonce::one());
219253
}
220254

@@ -223,7 +257,7 @@ where
223257

224258
if let Some(latest_relay_response_nonce) = channel_nonce.relay_response_msg_nonce {
225259
let last_cleanup_relay_response_nonce_key =
226-
xdm_keys::get_key_for_last_cleanup_relay_response_nonce(chain_id, channel_id);
260+
xdm_keys::get_key_for_last_cleanup_relay_response_nonce(prefix, chain_id, channel_id);
227261
let last_cleaned_up_nonce =
228262
load_decode::<_, Nonce>(backend, last_cleanup_relay_response_nonce_key.as_slice())?;
229263

@@ -234,17 +268,19 @@ where
234268

235269
tracing::debug!(
236270
target: LOG_TARGET,
237-
"Cleaning Relay response xdm keys for {:?} channel: {:?} from: {:?} to: {:?}",
271+
"[{:?}]Cleaning Relay response xdm keys for {:?} channel: {:?} from: {:?} to: {:?}",
272+
to_hex(prefix, false),
238273
chain_id,
239274
channel_id,
240275
from_nonce,
241276
latest_relay_response_nonce
242277
);
243278

244279
while from_nonce <= latest_relay_response_nonce {
245-
to_delete.push(xdm_keys::get_key_for_xdm_relay_response((
246-
chain_id, channel_id, from_nonce,
247-
)));
280+
to_delete.push(xdm_keys::get_key_for_xdm_relay_response(
281+
prefix,
282+
(chain_id, channel_id, from_nonce),
283+
));
248284
from_nonce = from_nonce.saturating_add(Nonce::one());
249285
}
250286

domains/client/cross-domain-message-gossip/src/lib.rs

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,14 @@ mod aux_schema;
55
mod gossip_worker;
66
mod message_listener;
77

8-
pub use aux_schema::{get_channel_state, set_channel_state, ChannelDetail};
8+
pub use aux_schema::{
9+
get_channel_state, get_xdm_processed_block_number, set_channel_state,
10+
set_xdm_message_processed_at, BlockId, ChannelDetail,
11+
};
912
pub use gossip_worker::{
1013
xdm_gossip_peers_set_config, ChainMsg, ChainSink, ChannelUpdate, GossipWorker,
1114
GossipWorkerBuilder, Message, MessageData,
1215
};
13-
pub use message_listener::start_cross_chain_message_listener;
16+
pub use message_listener::{
17+
can_allow_xdm_submission, start_cross_chain_message_listener, RELAYER_PREFIX,
18+
};

domains/client/cross-domain-message-gossip/src/message_listener.rs

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,9 @@ use subspace_runtime_primitives::{Balance, BlockNumber};
3131
use thiserror::Error;
3232

3333
pub(crate) const LOG_TARGET: &str = "domain_message_listener";
34+
const TX_POOL_PREFIX: &[u8] = b"xdm_tx_pool_listener";
35+
pub const RELAYER_PREFIX: &[u8] = b"xdm_relayer";
36+
3437
/// Number of blocks an already submitted XDM is not accepted since last submission.
3538
const XDM_ACCEPT_BLOCK_LIMIT: u32 = 5;
3639

@@ -476,7 +479,7 @@ where
476479
Ok(())
477480
}
478481

479-
fn can_allow_xdm_submission<Client, Block>(
482+
pub fn can_allow_xdm_submission<Client, Block>(
480483
client: &Arc<Client>,
481484
xdm_id: XdmId,
482485
submitted_block_id: BlockId<Block>,
@@ -564,7 +567,7 @@ where
564567
runtime_api.channel_nonce(block_id.hash, src_chain_id, channel_id)?;
565568

566569
if let Some(submitted_block_id) =
567-
get_xdm_processed_block_number::<_, BlockOf<TxPool>>(&**client, xdm_id)?
570+
get_xdm_processed_block_number::<_, BlockOf<TxPool>>(&**client, TX_POOL_PREFIX, xdm_id)?
568571
&& !can_allow_xdm_submission(
569572
client,
570573
xdm_id,
@@ -614,11 +617,17 @@ where
614617
block_id
615618
);
616619

617-
set_xdm_message_processed_at(&**client, xdm_id, block_id)?;
620+
set_xdm_message_processed_at(&**client, TX_POOL_PREFIX, xdm_id, block_id)?;
618621
}
619622

620623
if let Some(channel_nonce) = maybe_channel_nonce {
621-
cleanup_chain_channel_storages(&**client, src_chain_id, channel_id, channel_nonce)?;
624+
cleanup_chain_channel_storages(
625+
&**client,
626+
TX_POOL_PREFIX,
627+
src_chain_id,
628+
channel_id,
629+
channel_nonce,
630+
)?;
622631
}
623632

624633
Ok(true)

0 commit comments

Comments
 (0)