Skip to content

Commit

Permalink
WIP: Fix up errors and most tests. Start extracintg some tests/code t…
Browse files Browse the repository at this point in the history
…o rpc crate
  • Loading branch information
jsdw committed Jan 29, 2025
1 parent 4576c47 commit b0f9c9d
Show file tree
Hide file tree
Showing 14 changed files with 290 additions and 49 deletions.
7 changes: 7 additions & 0 deletions lightclient/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,13 @@ pub enum LightClientRpcError {
#[error("RPC Error: {0}.")]
pub struct JsonRpcError(Box<RawValue>);

impl JsonRpcError {
/// Attempt to deserialize this error into some type.
pub fn try_deserialize<'a, T: serde::de::Deserialize<'a>>(&'a self) -> Result<T, serde_json::Error> {
serde_json::from_str(self.0.get())
}
}

/// This represents a single light client connection to the network. Instantiate
/// it with [`LightClient::relay_chain()`] to communicate with a relay chain, and
/// then call [`LightClient::parachain()`] to establish connections to parachains.
Expand Down
2 changes: 1 addition & 1 deletion rpcs/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ description = "Make RPC calls to Substrate based nodes"
keywords = ["parity", "subxt", "rpcs"]

[features]
default = ["jsonrpsee", "native"]
default = ["jsonrpsee", "native", "unstable-light-client"]

subxt-core = ["dep:subxt-core"]
jsonrpsee = ["dep:jsonrpsee", "dep:tokio-util"]
Expand Down
24 changes: 21 additions & 3 deletions rpcs/src/client/jsonrpsee_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use crate::Error;
use futures::stream::{StreamExt, TryStreamExt};
use jsonrpsee::{
core::{
client::{Client, ClientT, SubscriptionClientT, SubscriptionKind},
client::{Error as JsonrpseeError, Client, ClientT, SubscriptionClientT, SubscriptionKind},
traits::ToRpcParams,
},
types::SubscriptionId,
Expand All @@ -31,7 +31,7 @@ impl RpcClientT for Client {
Box::pin(async move {
let res = ClientT::request(self, method, Params(params))
.await
.map_err(|e| Error::Client(Box::new(e)))?;
.map_err(error_to_rpc_error)?;
Ok(res)
})
}
Expand All @@ -50,7 +50,7 @@ impl RpcClientT for Client {
unsub,
)
.await
.map_err(|e| Error::Client(Box::new(e)))?;
.map_err(error_to_rpc_error)?;

let id = match stream.kind() {
SubscriptionKind::Subscription(SubscriptionId::Str(id)) => {
Expand All @@ -66,3 +66,21 @@ impl RpcClientT for Client {
})
}
}

/// Convert a JsonrpseeError into the RPC error in this crate.
/// The main reason for this is to capture user errors so that
/// they can be represented/handled without casting.
fn error_to_rpc_error(error: JsonrpseeError) -> Error {
match error {
JsonrpseeError::Call(e) => {
Error::User(crate::UserError {
code: e.code(),
message: e.message().to_owned(),
data: e.data().map(|d| d.to_owned())
})
},
e => {
Error::Client(Box::new(e))
}
}
}
23 changes: 16 additions & 7 deletions rpcs/src/client/lightclient_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
// see LICENSE for license details.

use super::{RawRpcFuture, RawRpcSubscription, RpcClientT};
use crate::error::RpcError;
use crate::Error;
use futures::stream::{StreamExt, TryStreamExt};
use serde_json::value::RawValue;
use subxt_lightclient::{LightClientRpc, LightClientRpcError};
Expand Down Expand Up @@ -36,18 +36,27 @@ impl RpcClientT for LightClientRpc {

let id = Some(sub.id().to_owned());
let stream = sub
.map_err(|e| RpcError::ClientError(Box::new(e)))
.map_err(|e| Error::Client(Box::new(e)))
.boxed();

Ok(RawRpcSubscription { id, stream })
})
}
}

fn lc_err_to_rpc_err(err: LightClientRpcError) -> RpcError {
fn lc_err_to_rpc_err(err: LightClientRpcError) -> Error {
match err {
LightClientRpcError::JsonRpcError(e) => RpcError::ClientError(Box::new(e)),
LightClientRpcError::SmoldotError(e) => RpcError::ClientError(Box::new(e)),
LightClientRpcError::BackgroundTaskDropped => RpcError::ClientError(Box::new("Smoldot background task was dropped")),
LightClientRpcError::JsonRpcError(e) => {
// If the error is a typical user error, report it as such, else
// just wrap the error into a ClientError.
let Ok(user_error) = e.try_deserialize() else {
return Error::Client(Box::<CoreError>::from(e))
};
Error::User(user_error)
},
LightClientRpcError::SmoldotError(e) => Error::Client(Box::<CoreError>::from(e)),
LightClientRpcError::BackgroundTaskDropped => Error::Client(Box::<CoreError>::from("Smoldot background task was dropped")),
}
}
}

type CoreError = dyn core::error::Error + Send + Sync + 'static;
148 changes: 148 additions & 0 deletions rpcs/src/client/mock_rpc_client.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
// Copyright 2019-2025 Parity Technologies (UK) Ltd.
// This file is dual-licensed as Apache-2.0 or GPL-3.0.
// see LICENSE for license details.

//! This module exposes a [`MockRpcClient`], which is useful for testing.
use super::{RpcClientT, RawRpcFuture, RawRpcSubscription};
use crate::Error;
use core::future::Future;
use serde_json::value::RawValue;

type MethodHandlerFn = Box<dyn Fn(&str, Option<Box<serde_json::value::RawValue>>) -> RawRpcFuture<'static, Box<RawValue>> + Send + Sync + 'static>;
type SubscriptionHandlerFn = Box<dyn Fn(&str, Option<Box<serde_json::value::RawValue>>, &str) -> RawRpcFuture<'static, RawRpcSubscription> + Send + Sync + 'static>;

/// A mock RPC client that responds programmatically to requests.
/// Useful for testing.
pub struct MockRpcClient {
method_handler: MethodHandlerFn,
subscription_handler: SubscriptionHandlerFn
}

impl MockRpcClient {
/// Create a [`MockRpcClient`] by providing a function to handle method calls
/// and a function to handle subscription calls.
pub fn from_handlers<M, S, MA, SA>(method_handler: M, subscription_handler: S) -> MockRpcClient
where
M: IntoMethodHandler<MA>,
S: IntoSubscriptionHandler<SA>,
{
MockRpcClient {
method_handler: method_handler.into_method_handler(),
subscription_handler: subscription_handler.into_subscription_handler()
}
}
}

impl RpcClientT for MockRpcClient {
fn request_raw<'a>(
&'a self,
method: &'a str,
params: Option<Box<serde_json::value::RawValue>>,
) -> RawRpcFuture<'a, Box<serde_json::value::RawValue>> {
(self.method_handler)(method, params)
}

fn subscribe_raw<'a>(
&'a self,
sub: &'a str,
params: Option<Box<serde_json::value::RawValue>>,
unsub: &'a str,
) -> RawRpcFuture<'a, RawRpcSubscription> {
(self.subscription_handler)(sub, params, unsub)
}
}

/// Return responses wrapped in this to have them serialized to JSON.
pub struct Json<T>(T);

/// Anything that can be converted into a valid handler response implements this.
pub trait IntoHandlerResponse {
/// Convert self into a handler response.
fn into_handler_response(self) -> Result<Box<RawValue>, Error>;
}

impl <T: serde::Serialize> IntoHandlerResponse for Result<T, Error> {
fn into_handler_response(self) -> Result<Box<RawValue>, Error> {
self.and_then(|val| serialize_to_raw_value(&val))
}
}

impl IntoHandlerResponse for Box<RawValue> {
fn into_handler_response(self) -> Result<Box<RawValue>, Error> {
Ok(self)
}
}

impl IntoHandlerResponse for serde_json::Value {
fn into_handler_response(self) -> Result<Box<RawValue>, Error> {
serialize_to_raw_value(&self)
}
}

impl <T: serde::Serialize> IntoHandlerResponse for Json<T> {
fn into_handler_response(self) -> Result<Box<RawValue>, Error> {
serialize_to_raw_value(&self.0)
}
}

fn serialize_to_raw_value<T: serde::Serialize>(val: &T) -> Result<Box<RawValue>, Error> {
let res = serde_json::to_string(val).map_err(Error::Deserialization)?;
let raw_value = RawValue::from_string(res).map_err(Error::Deserialization)?;
Ok(raw_value)
}

/// Anything that is a valid method handler implements this trait.
pub trait IntoMethodHandler<A> {
/// Convert self into a method handler function.
fn into_method_handler(self) -> MethodHandlerFn;
}

enum SyncMethodHandler {}
impl <F, R> IntoMethodHandler<SyncMethodHandler> for F
where
F: Fn(&str, Option<Box<serde_json::value::RawValue>>) -> R + Send + Sync + 'static,
R: IntoHandlerResponse + Send + 'static,
{
fn into_method_handler(self) -> MethodHandlerFn {
Box::new(move |method: &str, params: Option<Box<serde_json::value::RawValue>>| {
let res = self(method, params);
Box::pin(async move { res.into_handler_response() })
})
}
}

enum AsyncMethodHandler {}
impl <F, Fut, R> IntoMethodHandler<AsyncMethodHandler> for F
where
F: Fn(&str, Option<Box<serde_json::value::RawValue>>) -> Fut + Send + Sync + 'static,
Fut: Future<Output = R> + Send + 'static,
R: IntoHandlerResponse + Send + 'static,
{
fn into_method_handler(self) -> MethodHandlerFn {
Box::new(move |method: &str, params: Option<Box<serde_json::value::RawValue>>| {
let fut = self(method, params);
Box::pin(async move { fut.await.into_handler_response() })
})
}
}

/// Anything that is a valid subscription handler implements this trait.
pub trait IntoSubscriptionHandler<A> {
/// Convert self into a subscription handler function.
fn into_subscription_handler(self) -> SubscriptionHandlerFn;
}

enum SyncSubscriptionHandler {}
impl <F, R> IntoMethodHandler<SyncMethodHandler> for F
where
F: Fn(&str, Option<Box<serde_json::value::RawValue>>) -> R + Send + Sync + 'static,
R: IntoHandlerResponse + Send + 'static,
{
fn into_method_handler(self) -> MethodHandlerFn {
Box::new(move |method: &str, params: Option<Box<serde_json::value::RawValue>>| {
let res = self(method, params);
Box::pin(async move { res.into_handler_response() })
})
}
}
7 changes: 6 additions & 1 deletion rpcs/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,14 +63,19 @@ crate::macros::cfg_jsonrpsee! {

crate::macros::cfg_unstable_light_client! {
mod lightclient_impl;
pub use lightclient_impl::LightClientRpc as LightClientRpcClient;
pub use subxt_lightclient::LightClientRpc as LightClientRpcClient;
}

crate::macros::cfg_reconnecting_rpc_client! {
pub mod reconnecting_rpc_client;
pub use reconnecting_rpc_client::RpcClient as ReconnectingRpcClient;
}

#[cfg(test)]
pub mod mock_rpc_client;
#[cfg(test)]
pub use mock_rpc_client::MockRpcClient;

mod rpc_client;
mod rpc_client_t;

Expand Down
31 changes: 23 additions & 8 deletions rpcs/src/client/reconnecting_rpc_client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -427,13 +427,7 @@ impl RpcClientT for RpcClient {
async {
self.request(method.to_string(), params)
.await
.map_err(|e| match e {
Error::DisconnectedWillReconnect(e) => {
SubxtRpcError::DisconnectedWillReconnect(e.to_string())
}
Error::Dropped => SubxtRpcError::Client(Box::new(e)),
Error::RpcError(e) => SubxtRpcError::Client(Box::new(e)),
})
.map_err(error_to_rpc_error)
}
.boxed()
}
Expand All @@ -448,7 +442,7 @@ impl RpcClientT for RpcClient {
let sub = self
.subscribe(sub.to_string(), params, unsub.to_string())
.await
.map_err(|e| SubxtRpcError::Client(Box::new(e)))?;
.map_err(error_to_rpc_error)?;

let id = match sub.id() {
SubscriptionId::Num(n) => n.to_string(),
Expand All @@ -471,6 +465,27 @@ impl RpcClientT for RpcClient {
}
}

/// Convert a reconnecting client Error into the RPC error in this crate.
/// The main reason for this is to capture user errors so that
/// they can be represented/handled without casting.
fn error_to_rpc_error(error: Error) -> SubxtRpcError {
match error {
Error::DisconnectedWillReconnect(reason) => {
SubxtRpcError::DisconnectedWillReconnect(reason.to_string())
},
Error::RpcError(RpcError::Call(e)) => {
SubxtRpcError::User(crate::UserError {
code: e.code(),
message: e.message().to_owned(),
data: e.data().map(|d| d.to_owned())
})
},
e => {
SubxtRpcError::Client(Box::new(e))
}
}
}

async fn background_task<P>(
mut client: Arc<WsClient>,
mut rx: UnboundedReceiver<Op>,
Expand Down
2 changes: 1 addition & 1 deletion rpcs/src/client/rpc_client_t.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ pub trait RpcClientT: Send + Sync + 'static {
}

/// A boxed future that is returned from the [`RpcClientT`] methods.
pub type RawRpcFuture<'a, T, E = Error> = Pin<Box<dyn Future<Output = Result<T, E>> + Send + 'a>>;
pub type RawRpcFuture<'a, T> = Pin<Box<dyn Future<Output = Result<T, Error>> + Send + 'a>>;

/// The RPC subscription returned from [`RpcClientT`]'s `subscription` method.
pub struct RawRpcSubscription {
Expand Down
Loading

0 comments on commit b0f9c9d

Please sign in to comment.