diff --git a/lightclient/src/lib.rs b/lightclient/src/lib.rs index 24fd8c960f..8d2a4c58a2 100644 --- a/lightclient/src/lib.rs +++ b/lightclient/src/lib.rs @@ -59,6 +59,13 @@ pub enum LightClientRpcError { #[error("RPC Error: {0}.")] pub struct JsonRpcError(Box); +impl JsonRpcError { + /// Attempt to deserialize this error into some type. + pub fn try_deserialize<'a, T: serde::de::Deserialize<'a>>(&'a self) -> Result { + serde_json::from_str(self.0.get()) + } +} + /// This represents a single light client connection to the network. Instantiate /// it with [`LightClient::relay_chain()`] to communicate with a relay chain, and /// then call [`LightClient::parachain()`] to establish connections to parachains. diff --git a/rpcs/Cargo.toml b/rpcs/Cargo.toml index 876b374106..047338a6e5 100644 --- a/rpcs/Cargo.toml +++ b/rpcs/Cargo.toml @@ -15,7 +15,7 @@ description = "Make RPC calls to Substrate based nodes" keywords = ["parity", "subxt", "rpcs"] [features] -default = ["jsonrpsee", "native"] +default = ["jsonrpsee", "native", "unstable-light-client"] subxt-core = ["dep:subxt-core"] jsonrpsee = ["dep:jsonrpsee", "dep:tokio-util"] diff --git a/rpcs/src/client/jsonrpsee_impl.rs b/rpcs/src/client/jsonrpsee_impl.rs index dd5e864d67..dc7d40dd33 100644 --- a/rpcs/src/client/jsonrpsee_impl.rs +++ b/rpcs/src/client/jsonrpsee_impl.rs @@ -7,7 +7,7 @@ use crate::Error; use futures::stream::{StreamExt, TryStreamExt}; use jsonrpsee::{ core::{ - client::{Client, ClientT, SubscriptionClientT, SubscriptionKind}, + client::{Error as JsonrpseeError, Client, ClientT, SubscriptionClientT, SubscriptionKind}, traits::ToRpcParams, }, types::SubscriptionId, @@ -31,7 +31,7 @@ impl RpcClientT for Client { Box::pin(async move { let res = ClientT::request(self, method, Params(params)) .await - .map_err(|e| Error::Client(Box::new(e)))?; + .map_err(error_to_rpc_error)?; Ok(res) }) } @@ -50,7 +50,7 @@ impl RpcClientT for Client { unsub, ) .await - .map_err(|e| Error::Client(Box::new(e)))?; + .map_err(error_to_rpc_error)?; let id = match stream.kind() { SubscriptionKind::Subscription(SubscriptionId::Str(id)) => { @@ -66,3 +66,21 @@ impl RpcClientT for Client { }) } } + +/// Convert a JsonrpseeError into the RPC error in this crate. +/// The main reason for this is to capture user errors so that +/// they can be represented/handled without casting. +fn error_to_rpc_error(error: JsonrpseeError) -> Error { + match error { + JsonrpseeError::Call(e) => { + Error::User(crate::UserError { + code: e.code(), + message: e.message().to_owned(), + data: e.data().map(|d| d.to_owned()) + }) + }, + e => { + Error::Client(Box::new(e)) + } + } +} \ No newline at end of file diff --git a/rpcs/src/client/lightclient_impl.rs b/rpcs/src/client/lightclient_impl.rs index b6a736450f..21ed7437af 100644 --- a/rpcs/src/client/lightclient_impl.rs +++ b/rpcs/src/client/lightclient_impl.rs @@ -3,7 +3,7 @@ // see LICENSE for license details. use super::{RawRpcFuture, RawRpcSubscription, RpcClientT}; -use crate::error::RpcError; +use crate::Error; use futures::stream::{StreamExt, TryStreamExt}; use serde_json::value::RawValue; use subxt_lightclient::{LightClientRpc, LightClientRpcError}; @@ -36,7 +36,7 @@ impl RpcClientT for LightClientRpc { let id = Some(sub.id().to_owned()); let stream = sub - .map_err(|e| RpcError::ClientError(Box::new(e))) + .map_err(|e| Error::Client(Box::new(e))) .boxed(); Ok(RawRpcSubscription { id, stream }) @@ -44,10 +44,19 @@ impl RpcClientT for LightClientRpc { } } -fn lc_err_to_rpc_err(err: LightClientRpcError) -> RpcError { +fn lc_err_to_rpc_err(err: LightClientRpcError) -> Error { match err { - LightClientRpcError::JsonRpcError(e) => RpcError::ClientError(Box::new(e)), - LightClientRpcError::SmoldotError(e) => RpcError::ClientError(Box::new(e)), - LightClientRpcError::BackgroundTaskDropped => RpcError::ClientError(Box::new("Smoldot background task was dropped")), + LightClientRpcError::JsonRpcError(e) => { + // If the error is a typical user error, report it as such, else + // just wrap the error into a ClientError. + let Ok(user_error) = e.try_deserialize() else { + return Error::Client(Box::::from(e)) + }; + Error::User(user_error) + }, + LightClientRpcError::SmoldotError(e) => Error::Client(Box::::from(e)), + LightClientRpcError::BackgroundTaskDropped => Error::Client(Box::::from("Smoldot background task was dropped")), } -} \ No newline at end of file +} + +type CoreError = dyn core::error::Error + Send + Sync + 'static; \ No newline at end of file diff --git a/rpcs/src/client/mock_rpc_client.rs b/rpcs/src/client/mock_rpc_client.rs new file mode 100644 index 0000000000..104cf37924 --- /dev/null +++ b/rpcs/src/client/mock_rpc_client.rs @@ -0,0 +1,148 @@ +// Copyright 2019-2025 Parity Technologies (UK) Ltd. +// This file is dual-licensed as Apache-2.0 or GPL-3.0. +// see LICENSE for license details. + +//! This module exposes a [`MockRpcClient`], which is useful for testing. + +use super::{RpcClientT, RawRpcFuture, RawRpcSubscription}; +use crate::Error; +use core::future::Future; +use serde_json::value::RawValue; + +type MethodHandlerFn = Box>) -> RawRpcFuture<'static, Box> + Send + Sync + 'static>; +type SubscriptionHandlerFn = Box>, &str) -> RawRpcFuture<'static, RawRpcSubscription> + Send + Sync + 'static>; + +/// A mock RPC client that responds programmatically to requests. +/// Useful for testing. +pub struct MockRpcClient { + method_handler: MethodHandlerFn, + subscription_handler: SubscriptionHandlerFn +} + +impl MockRpcClient { + /// Create a [`MockRpcClient`] by providing a function to handle method calls + /// and a function to handle subscription calls. + pub fn from_handlers(method_handler: M, subscription_handler: S) -> MockRpcClient + where + M: IntoMethodHandler, + S: IntoSubscriptionHandler, + { + MockRpcClient { + method_handler: method_handler.into_method_handler(), + subscription_handler: subscription_handler.into_subscription_handler() + } + } +} + +impl RpcClientT for MockRpcClient { + fn request_raw<'a>( + &'a self, + method: &'a str, + params: Option>, + ) -> RawRpcFuture<'a, Box> { + (self.method_handler)(method, params) + } + + fn subscribe_raw<'a>( + &'a self, + sub: &'a str, + params: Option>, + unsub: &'a str, + ) -> RawRpcFuture<'a, RawRpcSubscription> { + (self.subscription_handler)(sub, params, unsub) + } +} + +/// Return responses wrapped in this to have them serialized to JSON. +pub struct Json(T); + +/// Anything that can be converted into a valid handler response implements this. +pub trait IntoHandlerResponse { + /// Convert self into a handler response. + fn into_handler_response(self) -> Result, Error>; +} + +impl IntoHandlerResponse for Result { + fn into_handler_response(self) -> Result, Error> { + self.and_then(|val| serialize_to_raw_value(&val)) + } +} + +impl IntoHandlerResponse for Box { + fn into_handler_response(self) -> Result, Error> { + Ok(self) + } +} + +impl IntoHandlerResponse for serde_json::Value { + fn into_handler_response(self) -> Result, Error> { + serialize_to_raw_value(&self) + } +} + +impl IntoHandlerResponse for Json { + fn into_handler_response(self) -> Result, Error> { + serialize_to_raw_value(&self.0) + } +} + +fn serialize_to_raw_value(val: &T) -> Result, Error> { + let res = serde_json::to_string(val).map_err(Error::Deserialization)?; + let raw_value = RawValue::from_string(res).map_err(Error::Deserialization)?; + Ok(raw_value) +} + +/// Anything that is a valid method handler implements this trait. +pub trait IntoMethodHandler { + /// Convert self into a method handler function. + fn into_method_handler(self) -> MethodHandlerFn; +} + +enum SyncMethodHandler {} +impl IntoMethodHandler for F +where + F: Fn(&str, Option>) -> R + Send + Sync + 'static, + R: IntoHandlerResponse + Send + 'static, +{ + fn into_method_handler(self) -> MethodHandlerFn { + Box::new(move |method: &str, params: Option>| { + let res = self(method, params); + Box::pin(async move { res.into_handler_response() }) + }) + } +} + +enum AsyncMethodHandler {} +impl IntoMethodHandler for F +where + F: Fn(&str, Option>) -> Fut + Send + Sync + 'static, + Fut: Future + Send + 'static, + R: IntoHandlerResponse + Send + 'static, +{ + fn into_method_handler(self) -> MethodHandlerFn { + Box::new(move |method: &str, params: Option>| { + let fut = self(method, params); + Box::pin(async move { fut.await.into_handler_response() }) + }) + } +} + +/// Anything that is a valid subscription handler implements this trait. +pub trait IntoSubscriptionHandler { + /// Convert self into a subscription handler function. + fn into_subscription_handler(self) -> SubscriptionHandlerFn; +} + +enum SyncSubscriptionHandler {} +impl IntoMethodHandler for F +where + F: Fn(&str, Option>) -> R + Send + Sync + 'static, + R: IntoHandlerResponse + Send + 'static, +{ + fn into_method_handler(self) -> MethodHandlerFn { + Box::new(move |method: &str, params: Option>| { + let res = self(method, params); + Box::pin(async move { res.into_handler_response() }) + }) + } +} \ No newline at end of file diff --git a/rpcs/src/client/mod.rs b/rpcs/src/client/mod.rs index c380499913..4f9ffe6a34 100644 --- a/rpcs/src/client/mod.rs +++ b/rpcs/src/client/mod.rs @@ -63,7 +63,7 @@ crate::macros::cfg_jsonrpsee! { crate::macros::cfg_unstable_light_client! { mod lightclient_impl; - pub use lightclient_impl::LightClientRpc as LightClientRpcClient; + pub use subxt_lightclient::LightClientRpc as LightClientRpcClient; } crate::macros::cfg_reconnecting_rpc_client! { @@ -71,6 +71,11 @@ crate::macros::cfg_reconnecting_rpc_client! { pub use reconnecting_rpc_client::RpcClient as ReconnectingRpcClient; } +#[cfg(test)] +pub mod mock_rpc_client; +#[cfg(test)] +pub use mock_rpc_client::MockRpcClient; + mod rpc_client; mod rpc_client_t; diff --git a/rpcs/src/client/reconnecting_rpc_client/mod.rs b/rpcs/src/client/reconnecting_rpc_client/mod.rs index 8986561f5c..c2c34b1aae 100644 --- a/rpcs/src/client/reconnecting_rpc_client/mod.rs +++ b/rpcs/src/client/reconnecting_rpc_client/mod.rs @@ -427,13 +427,7 @@ impl RpcClientT for RpcClient { async { self.request(method.to_string(), params) .await - .map_err(|e| match e { - Error::DisconnectedWillReconnect(e) => { - SubxtRpcError::DisconnectedWillReconnect(e.to_string()) - } - Error::Dropped => SubxtRpcError::Client(Box::new(e)), - Error::RpcError(e) => SubxtRpcError::Client(Box::new(e)), - }) + .map_err(error_to_rpc_error) } .boxed() } @@ -448,7 +442,7 @@ impl RpcClientT for RpcClient { let sub = self .subscribe(sub.to_string(), params, unsub.to_string()) .await - .map_err(|e| SubxtRpcError::Client(Box::new(e)))?; + .map_err(error_to_rpc_error)?; let id = match sub.id() { SubscriptionId::Num(n) => n.to_string(), @@ -471,6 +465,27 @@ impl RpcClientT for RpcClient { } } +/// Convert a reconnecting client Error into the RPC error in this crate. +/// The main reason for this is to capture user errors so that +/// they can be represented/handled without casting. +fn error_to_rpc_error(error: Error) -> SubxtRpcError { + match error { + Error::DisconnectedWillReconnect(reason) => { + SubxtRpcError::DisconnectedWillReconnect(reason.to_string()) + }, + Error::RpcError(RpcError::Call(e)) => { + SubxtRpcError::User(crate::UserError { + code: e.code(), + message: e.message().to_owned(), + data: e.data().map(|d| d.to_owned()) + }) + }, + e => { + SubxtRpcError::Client(Box::new(e)) + } + } +} + async fn background_task

( mut client: Arc, mut rx: UnboundedReceiver, diff --git a/rpcs/src/client/rpc_client_t.rs b/rpcs/src/client/rpc_client_t.rs index 66075fd688..697331ca43 100644 --- a/rpcs/src/client/rpc_client_t.rs +++ b/rpcs/src/client/rpc_client_t.rs @@ -54,7 +54,7 @@ pub trait RpcClientT: Send + Sync + 'static { } /// A boxed future that is returned from the [`RpcClientT`] methods. -pub type RawRpcFuture<'a, T, E = Error> = Pin> + Send + 'a>>; +pub type RawRpcFuture<'a, T> = Pin> + Send + 'a>>; /// The RPC subscription returned from [`RpcClientT`]'s `subscription` method. pub struct RawRpcSubscription { diff --git a/rpcs/src/lib.rs b/rpcs/src/lib.rs index 3aa287a3cd..a71cc6b0ac 100644 --- a/rpcs/src/lib.rs +++ b/rpcs/src/lib.rs @@ -60,6 +60,9 @@ mod impl_config { #[derive(Debug, thiserror::Error)] #[non_exhaustive] pub enum Error { + /// An error which indicates a user fault. + #[error("User error: {0}")] + User(UserError), // Dev note: We need the error to be safely sent between threads // for `subscribe_to_block_headers_filling_in_gaps` and friends. /// An error coming from the underlying RPC Client. @@ -81,3 +84,40 @@ pub enum Error { #[error("RPC error: insecure URL: {0}")] InsecureUrl(String), } + +impl Error { + /// Is the error the `DisconnectedWillReconnect` variant? This should be true + /// only if the underlying `RpcClient` implementation was disconnected and is + /// automatically reconnecting behind the scenes. + pub fn is_disconnected_will_reconnect(&self) -> bool { + matches!(self, Error::DisconnectedWillReconnect(_)) + } +} + +/// This error tends to be returned when the user made an RPC call with +/// invalid parameters. Implementations of [`RpcClientT`] should turn any such +/// errors into this, so that they can be handled appropriately. By contrast, +/// [`Error::Client`] is emitted when the underlying RPC Client implementation +/// has some problem that isn't user specific (eg network issue or similar). +#[derive(Debug, Clone, serde::Deserialize)] +#[serde(deny_unknown_fields)] +pub struct UserError { + /// Code + pub code: i32, + /// Message + pub message: String, + /// Optional data + pub data: Option>, +} + +impl core::fmt::Display for UserError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{} ({})", &self.message, &self.code) + } +} + +#[cfg(test)] +mod test { + + +} \ No newline at end of file diff --git a/subxt/src/backend/chain_head/mod.rs b/subxt/src/backend/chain_head/mod.rs index 4f78d2882e..7349c0af42 100644 --- a/subxt/src/backend/chain_head/mod.rs +++ b/subxt/src/backend/chain_head/mod.rs @@ -362,7 +362,7 @@ impl Backend for ChainHeadBackend { let status = self.methods.chainhead_v1_body(&sub_id, at).await?; let operation_id = match status { MethodResponse::LimitReached => { - return Err(RpcError::request_rejected("limit reached").into()) + return Err(RpcError::LimitReached.into()) } MethodResponse::Started(s) => s.operation_id, }; @@ -722,7 +722,7 @@ impl Backend for ChainHeadBackend { .await?; let operation_id = match status { MethodResponse::LimitReached => { - return Err(RpcError::request_rejected("limit reached").into()) + return Err(RpcError::LimitReached.into()) } MethodResponse::Started(s) => s.operation_id, }; diff --git a/subxt/src/backend/chain_head/storage_items.rs b/subxt/src/backend/chain_head/storage_items.rs index 4194ceaf6a..f140cefffb 100644 --- a/subxt/src/backend/chain_head/storage_items.rs +++ b/subxt/src/backend/chain_head/storage_items.rs @@ -46,7 +46,7 @@ impl StorageItems { .await?; let operation_id: Arc = match status { MethodResponse::LimitReached => { - return Err(RpcError::request_rejected("limit reached").into()) + return Err(RpcError::LimitReached.into()) } MethodResponse::Started(s) => s.operation_id.into(), }; diff --git a/subxt/src/backend/mod.rs b/subxt/src/backend/mod.rs index ebe3a5c0ce..6af85962c6 100644 --- a/subxt/src/backend/mod.rs +++ b/subxt/src/backend/mod.rs @@ -992,6 +992,10 @@ mod test { }) } + fn limit_reached() -> MethodResponse { + MethodResponse::LimitReached + } + fn storage_done(id: &str) -> FollowEvent { FollowEvent::OperationStorageDone(OperationId { operation_id: id.to_owned(), @@ -1302,10 +1306,6 @@ mod test { let hash = random_hash(); let mock_data = vec![ - ( - "chainSpec_v1_genesisHash", - Message::Single(Err::(subxt_rpcs::Error::Client("Error".into()))), - ), ( "chainSpec_v1_genesisHash", Message::Single(Err(subxt_rpcs::Error::DisconnectedWillReconnect( @@ -1344,7 +1344,7 @@ mod test { ), ( "method_response", - Message::Single(Err(subxt_rpcs::Error::Client("stale id".into()))), + Message::Single(Ok(limit_reached())), ), ( "method_response", @@ -1384,7 +1384,7 @@ mod test { let rpc_params = jsonrpsee::types::Params::new(params.as_deref()); let key: String = rpc_params.sequence().next().unwrap(); if key == *"ID1" { - return Err(subxt_rpcs::Error::Client("stale id".into())); + return Message::Single(Ok(limit_reached())); } else { subscription_expired .swap(false, std::sync::atomic::Ordering::SeqCst); diff --git a/subxt/src/backend/utils.rs b/subxt/src/backend/utils.rs index b7b003c48e..47356f672e 100644 --- a/subxt/src/backend/utils.rs +++ b/subxt/src/backend/utils.rs @@ -118,14 +118,20 @@ where } // TODO: https://github.com/paritytech/subxt/issues/1567 - // This is a hack because if a reconnection occurs - // the order of pending calls is not guaranteed. + // This is a hack because, in the event of a disconnection, + // we may not get the correct subscription ID back on reconnecting. // - // Such that it's possible the a pending future completes - // before `chainHead_follow` is established with fresh - // subscription id. + // This is because we have a race between this future and the + // separate chainHead subscription, which runs in a different task. + // if this future is too quick, it'll be given back an old + // subscription ID from the chainHead subscription which has yet + // to reconnect and establish a new subscription ID. // - if e.is_rejected() && rejected_retries < REJECTED_MAX_RETRIES { + // In the event of a wrong subscription Id being used, we happen to + // hand back an `RpcError::LimitReached`, and so can retry when we + // specifically hit that error to see if we get a new subscription ID + // eventually. + if e.is_rpc_limit_reached() && rejected_retries < REJECTED_MAX_RETRIES { rejected_retries += 1; continue; } diff --git a/subxt/src/error/mod.rs b/subxt/src/error/mod.rs index f2e10d46a6..1b1b418807 100644 --- a/subxt/src/error/mod.rs +++ b/subxt/src/error/mod.rs @@ -138,8 +138,8 @@ impl Error { } /// Checks whether the error was caused by a RPC request being rejected. - pub fn is_rejected(&self) -> bool { - matches!(self, Error::Rpc(RpcError::RequestRejected(_))) + pub fn is_rpc_limit_reached(&self) -> bool { + matches!(self, Error::Rpc(RpcError::LimitReached)) } } @@ -153,22 +153,15 @@ pub enum RpcError { /// Error related to the RPC client. #[error("RPC error: {0}")] ClientError(#[from] subxt_rpcs::Error), - /// This error signals that the request was rejected for some reason. - /// The specific reason is provided. - #[error("RPC error: request rejected: {0}")] - RequestRejected(String), + /// This error signals that we got back a [`subxt_rpcs::methods::chain_head::MethodResponse::LimitReached`], + /// which is not technically an RPC error but is treated as an error in our own APIs. + #[error("RPC error: limit reached")] + LimitReached, /// The RPC subscription dropped. #[error("RPC error: subscription dropped.")] SubscriptionDropped, } -impl RpcError { - /// Create a `RequestRejected` error from anything that can be turned into a string. - pub fn request_rejected>(s: S) -> RpcError { - RpcError::RequestRejected(s.into()) - } -} - /// Block error #[derive(Clone, Debug, thiserror::Error)] #[non_exhaustive]