Skip to content
This repository was archived by the owner on Nov 15, 2023. It is now read-only.

Commit 92633bb

Browse files
authored
chainHead: Add support for storage pagination and cancellation (#14755)
* chainHead/api: Add `chain_head_unstable_continue` method Signed-off-by: Alexandru Vasile <[email protected]> * chainHead/subscriptions: Register operations for pagination Signed-off-by: Alexandru Vasile <[email protected]> * chainHead/subscriptions: Merge limits with registered operation Signed-off-by: Alexandru Vasile <[email protected]> * chainHead/subscriptions: Expose the operation state Signed-off-by: Alexandru Vasile <[email protected]> * chain_head/storage: Generate WaitingForContinue event Signed-off-by: Alexandru Vasile <[email protected]> * chainHead: Use the continue operation Signed-off-by: Alexandru Vasile <[email protected]> * chainHead/tests: Adjust testing to the new storage interface Signed-off-by: Alexandru Vasile <[email protected]> * chainHead/config: Make pagination limit configurable Signed-off-by: Alexandru Vasile <[email protected]> * chainHead/tests: Adjust chainHeadConfig Signed-off-by: Alexandru Vasile <[email protected]> * chainHead/tests: Check pagination and continue method Signed-off-by: Alexandru Vasile <[email protected]> * chainHead/api: Add `chainHead_unstable_stopOperation` method Signed-off-by: Alexandru Vasile <[email protected]> * chainHead/subscription: Add shared atomic state for efficient alloc Signed-off-by: Alexandru Vasile <[email protected]> * chainHead: Implement operation stop Signed-off-by: Alexandru Vasile <[email protected]> * chainHead/tests: Check that storage ops can be cancelled Signed-off-by: Alexandru Vasile <[email protected]> * chainHead/storage: Change docs for query_storage_iter_pagination Signed-off-by: Alexandru Vasile <[email protected]> * chainHead/subscriptions: Fix merge conflicts Signed-off-by: Alexandru Vasile <[email protected]> * chainHead: Replace `async-channel` with `tokio::sync` Signed-off-by: Alexandru Vasile <[email protected]> * chainHead/subscription: Add comment about the sender/recv continue Signed-off-by: Alexandru Vasile <[email protected]> --------- Signed-off-by: Alexandru Vasile <[email protected]>
1 parent 6b07b97 commit 92633bb

File tree

6 files changed

+749
-152
lines changed

6 files changed

+749
-152
lines changed

client/rpc-spec-v2/src/chain_head/api.rs

+27
Original file line numberDiff line numberDiff line change
@@ -119,4 +119,31 @@ pub trait ChainHeadApi<Hash> {
119119
/// This method is unstable and subject to change in the future.
120120
#[method(name = "chainHead_unstable_unpin", blocking)]
121121
fn chain_head_unstable_unpin(&self, follow_subscription: String, hash: Hash) -> RpcResult<()>;
122+
123+
/// Resumes a storage fetch started with `chainHead_storage` after it has generated an
124+
/// `operationWaitingForContinue` event.
125+
///
126+
/// # Unstable
127+
///
128+
/// This method is unstable and subject to change in the future.
129+
#[method(name = "chainHead_unstable_continue", blocking)]
130+
fn chain_head_unstable_continue(
131+
&self,
132+
follow_subscription: String,
133+
operation_id: String,
134+
) -> RpcResult<()>;
135+
136+
/// Stops an operation started with chainHead_unstable_body, chainHead_unstable_call, or
137+
/// chainHead_unstable_storage. If the operation was still in progress, this interrupts it. If
138+
/// the operation was already finished, this call has no effect.
139+
///
140+
/// # Unstable
141+
///
142+
/// This method is unstable and subject to change in the future.
143+
#[method(name = "chainHead_unstable_stopOperation", blocking)]
144+
fn chain_head_unstable_stop_operation(
145+
&self,
146+
follow_subscription: String,
147+
operation_id: String,
148+
) -> RpcResult<()>;
122149
}

client/rpc-spec-v2/src/chain_head/chain_head.rs

+63-19
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,9 @@ pub struct ChainHeadConfig {
6161
pub subscription_max_pinned_duration: Duration,
6262
/// The maximum number of ongoing operations per subscription.
6363
pub subscription_max_ongoing_operations: usize,
64+
/// The maximum number of items reported by the `chainHead_storage` before
65+
/// pagination is required.
66+
pub operation_max_storage_items: usize,
6467
}
6568

6669
/// Maximum pinned blocks across all connections.
@@ -78,12 +81,17 @@ const MAX_PINNED_DURATION: Duration = Duration::from_secs(60);
7881
/// Note: The lower limit imposed by the spec is 16.
7982
const MAX_ONGOING_OPERATIONS: usize = 16;
8083

84+
/// The maximum number of items the `chainHead_storage` can return
85+
/// before paginations is required.
86+
const MAX_STORAGE_ITER_ITEMS: usize = 5;
87+
8188
impl Default for ChainHeadConfig {
8289
fn default() -> Self {
8390
ChainHeadConfig {
8491
global_max_pinned_blocks: MAX_PINNED_BLOCKS,
8592
subscription_max_pinned_duration: MAX_PINNED_DURATION,
8693
subscription_max_ongoing_operations: MAX_ONGOING_OPERATIONS,
94+
operation_max_storage_items: MAX_STORAGE_ITER_ITEMS,
8795
}
8896
}
8997
}
@@ -100,6 +108,9 @@ pub struct ChainHead<BE: Backend<Block>, Block: BlockT, Client> {
100108
subscriptions: Arc<SubscriptionManagement<Block, BE>>,
101109
/// The hexadecimal encoded hash of the genesis block.
102110
genesis_hash: String,
111+
/// The maximum number of items reported by the `chainHead_storage` before
112+
/// pagination is required.
113+
operation_max_storage_items: usize,
103114
/// Phantom member to pin the block type.
104115
_phantom: PhantomData<Block>,
105116
}
@@ -124,6 +135,7 @@ impl<BE: Backend<Block>, Block: BlockT, Client> ChainHead<BE, Block, Client> {
124135
config.subscription_max_ongoing_operations,
125136
backend,
126137
)),
138+
operation_max_storage_items: config.operation_max_storage_items,
127139
genesis_hash,
128140
_phantom: PhantomData,
129141
}
@@ -232,7 +244,7 @@ where
232244
follow_subscription: String,
233245
hash: Block::Hash,
234246
) -> RpcResult<MethodResponse> {
235-
let block_guard = match self.subscriptions.lock_block(&follow_subscription, hash, 1) {
247+
let mut block_guard = match self.subscriptions.lock_block(&follow_subscription, hash, 1) {
236248
Ok(block) => block,
237249
Err(SubscriptionManagementError::SubscriptionAbsent) |
238250
Err(SubscriptionManagementError::ExceededLimits) => return Ok(MethodResponse::LimitReached),
@@ -243,6 +255,8 @@ where
243255
Err(_) => return Err(ChainHeadRpcError::InvalidBlock.into()),
244256
};
245257

258+
let operation_id = block_guard.operation().operation_id();
259+
246260
let event = match self.client.block(hash) {
247261
Ok(Some(signed_block)) => {
248262
let extrinsics = signed_block
@@ -252,7 +266,7 @@ where
252266
.map(|extrinsic| hex_string(&extrinsic.encode()))
253267
.collect();
254268
FollowEvent::<Block::Hash>::OperationBodyDone(OperationBodyDone {
255-
operation_id: block_guard.operation_id(),
269+
operation_id: operation_id.clone(),
256270
value: extrinsics,
257271
})
258272
},
@@ -268,16 +282,13 @@ where
268282
return Err(ChainHeadRpcError::InvalidBlock.into())
269283
},
270284
Err(error) => FollowEvent::<Block::Hash>::OperationError(OperationError {
271-
operation_id: block_guard.operation_id(),
285+
operation_id: operation_id.clone(),
272286
error: error.to_string(),
273287
}),
274288
};
275289

276290
let _ = block_guard.response_sender().unbounded_send(event);
277-
Ok(MethodResponse::Started(MethodResponseStarted {
278-
operation_id: block_guard.operation_id(),
279-
discarded_items: None,
280-
}))
291+
Ok(MethodResponse::Started(MethodResponseStarted { operation_id, discarded_items: None }))
281292
}
282293

283294
fn chain_head_unstable_header(
@@ -337,7 +348,7 @@ where
337348
.transpose()?
338349
.map(ChildInfo::new_default_from_vec);
339350

340-
let block_guard =
351+
let mut block_guard =
341352
match self.subscriptions.lock_block(&follow_subscription, hash, items.len()) {
342353
Ok(block) => block,
343354
Err(SubscriptionManagementError::SubscriptionAbsent) |
@@ -349,17 +360,21 @@ where
349360
Err(_) => return Err(ChainHeadRpcError::InvalidBlock.into()),
350361
};
351362

352-
let storage_client = ChainHeadStorage::<Client, Block, BE>::new(self.client.clone());
353-
let operation_id = block_guard.operation_id();
363+
let mut storage_client = ChainHeadStorage::<Client, Block, BE>::new(
364+
self.client.clone(),
365+
self.operation_max_storage_items,
366+
);
367+
let operation = block_guard.operation();
368+
let operation_id = operation.operation_id();
354369

355370
// The number of operations we are allowed to execute.
356-
let num_operations = block_guard.num_reserved();
371+
let num_operations = operation.num_reserved();
357372
let discarded = items.len().saturating_sub(num_operations);
358373
let mut items = items;
359374
items.truncate(num_operations);
360375

361376
let fut = async move {
362-
storage_client.generate_events(block_guard, hash, items, child_trie);
377+
storage_client.generate_events(block_guard, hash, items, child_trie).await;
363378
};
364379

365380
self.executor
@@ -379,7 +394,7 @@ where
379394
) -> RpcResult<MethodResponse> {
380395
let call_parameters = Bytes::from(parse_hex_param(call_parameters)?);
381396

382-
let block_guard = match self.subscriptions.lock_block(&follow_subscription, hash, 1) {
397+
let mut block_guard = match self.subscriptions.lock_block(&follow_subscription, hash, 1) {
383398
Ok(block) => block,
384399
Err(SubscriptionManagementError::SubscriptionAbsent) |
385400
Err(SubscriptionManagementError::ExceededLimits) => {
@@ -401,28 +416,26 @@ where
401416
.into())
402417
}
403418

419+
let operation_id = block_guard.operation().operation_id();
404420
let event = self
405421
.client
406422
.executor()
407423
.call(hash, &function, &call_parameters, CallContext::Offchain)
408424
.map(|result| {
409425
FollowEvent::<Block::Hash>::OperationCallDone(OperationCallDone {
410-
operation_id: block_guard.operation_id(),
426+
operation_id: operation_id.clone(),
411427
output: hex_string(&result),
412428
})
413429
})
414430
.unwrap_or_else(|error| {
415431
FollowEvent::<Block::Hash>::OperationError(OperationError {
416-
operation_id: block_guard.operation_id(),
432+
operation_id: operation_id.clone(),
417433
error: error.to_string(),
418434
})
419435
});
420436

421437
let _ = block_guard.response_sender().unbounded_send(event);
422-
Ok(MethodResponse::Started(MethodResponseStarted {
423-
operation_id: block_guard.operation_id(),
424-
discarded_items: None,
425-
}))
438+
Ok(MethodResponse::Started(MethodResponseStarted { operation_id, discarded_items: None }))
426439
}
427440

428441
fn chain_head_unstable_unpin(
@@ -443,4 +456,35 @@ where
443456
Err(_) => Err(ChainHeadRpcError::InvalidBlock.into()),
444457
}
445458
}
459+
460+
fn chain_head_unstable_continue(
461+
&self,
462+
follow_subscription: String,
463+
operation_id: String,
464+
) -> RpcResult<()> {
465+
let Some(operation) = self.subscriptions.get_operation(&follow_subscription, &operation_id) else {
466+
return Ok(())
467+
};
468+
469+
if !operation.submit_continue() {
470+
// Continue called without generating a `WaitingForContinue` event.
471+
Err(ChainHeadRpcError::InvalidContinue.into())
472+
} else {
473+
Ok(())
474+
}
475+
}
476+
477+
fn chain_head_unstable_stop_operation(
478+
&self,
479+
follow_subscription: String,
480+
operation_id: String,
481+
) -> RpcResult<()> {
482+
let Some(operation) = self.subscriptions.get_operation(&follow_subscription, &operation_id) else {
483+
return Ok(())
484+
};
485+
486+
operation.stop_operation();
487+
488+
Ok(())
489+
}
446490
}

0 commit comments

Comments
 (0)