diff --git a/crates/subspace-farmer/src/archiving.rs b/crates/subspace-farmer/src/archiving.rs index 4fc0d93694..18abc5c12c 100644 --- a/crates/subspace-farmer/src/archiving.rs +++ b/crates/subspace-farmer/src/archiving.rs @@ -1,5 +1,6 @@ use crate::object_mappings::ObjectMappings; use crate::rpc_client::RpcClient; +use futures::StreamExt; use subspace_archiving::archiver::ArchivedSegment; use subspace_core_primitives::objects::{GlobalObject, PieceObject, PieceObjectMapping}; use subspace_core_primitives::{FlatPieces, Sha256Hash}; @@ -129,7 +130,7 @@ impl Archiving { info!("Plotting stopped!"); break; } - result = archived_segments.recv() => { + result = archived_segments.next() => { match result { Some(archived_segment) => { let segment_index = archived_segment.root_block.segment_index(); diff --git a/crates/subspace-farmer/src/bin/subspace-farmer/bench_rpc_client.rs b/crates/subspace-farmer/src/bin/subspace-farmer/bench_rpc_client.rs index e333a11f1d..cb5b53666d 100644 --- a/crates/subspace-farmer/src/bin/subspace-farmer/bench_rpc_client.rs +++ b/crates/subspace-farmer/src/bin/subspace-farmer/bench_rpc_client.rs @@ -1,11 +1,14 @@ use async_trait::async_trait; +use futures::channel::mpsc; +use futures::{SinkExt, Stream, StreamExt}; +use std::pin::Pin; use std::sync::Arc; use subspace_archiving::archiver::ArchivedSegment; use subspace_farmer::{RpcClient, RpcClientError as MockError}; use subspace_rpc_primitives::{ BlockSignature, BlockSigningInfo, FarmerMetadata, SlotInfo, SolutionResponse, }; -use tokio::sync::{mpsc, Mutex}; +use tokio::sync::Mutex; use tokio::task::JoinHandle; /// Client mock for benching purpose @@ -28,17 +31,18 @@ impl BenchRpcClient { metadata: FarmerMetadata, mut archived_segments_receiver: mpsc::Receiver, ) -> Self { - let (inner_archived_segments_sender, inner_archived_segments_receiver) = mpsc::channel(10); + let (mut inner_archived_segments_sender, inner_archived_segments_receiver) = + mpsc::channel(10); let (acknowledge_archived_segment_sender, mut acknowledge_archived_segment_receiver) = mpsc::channel(1); let segment_producer_handle = tokio::spawn({ async move { - while let Some(segment) = archived_segments_receiver.recv().await { + while let Some(segment) = archived_segments_receiver.next().await { if inner_archived_segments_sender.send(segment).await.is_err() { break; } - if acknowledge_archived_segment_receiver.recv().await.is_none() { + if acknowledge_archived_segment_receiver.next().await.is_none() { break; } } @@ -66,7 +70,9 @@ impl RpcClient for BenchRpcClient { Ok(self.inner.metadata.clone()) } - async fn subscribe_slot_info(&self) -> Result, MockError> { + async fn subscribe_slot_info( + &self, + ) -> Result + Send + 'static>>, MockError> { unreachable!("Unreachable, as we don't start farming for benchmarking") } @@ -77,7 +83,9 @@ impl RpcClient for BenchRpcClient { unreachable!("Unreachable, as we don't start farming for benchmarking") } - async fn subscribe_block_signing(&self) -> Result, MockError> { + async fn subscribe_block_signing( + &self, + ) -> Result + Send + 'static>>, MockError> { unreachable!("Unreachable, as we don't start farming for benchmarking") } @@ -90,11 +98,11 @@ impl RpcClient for BenchRpcClient { async fn subscribe_archived_segments( &self, - ) -> Result, MockError> { - let (sender, receiver) = mpsc::channel(10); + ) -> Result + Send + 'static>>, MockError> { + let (mut sender, receiver) = mpsc::channel(10); let archived_segments_receiver = self.inner.archived_segments_receiver.clone(); tokio::spawn(async move { - while let Some(archived_segment) = archived_segments_receiver.lock().await.recv().await + while let Some(archived_segment) = archived_segments_receiver.lock().await.next().await { if sender.send(archived_segment).await.is_err() { break; @@ -102,12 +110,13 @@ impl RpcClient for BenchRpcClient { } }); - Ok(receiver) + Ok(Box::pin(receiver)) } async fn acknowledge_archived_segment(&self, segment_index: u64) -> Result<(), MockError> { self.inner .acknowledge_archived_segment_sender + .clone() .send(segment_index) .await?; Ok(()) diff --git a/crates/subspace-farmer/src/bin/subspace-farmer/commands/bench.rs b/crates/subspace-farmer/src/bin/subspace-farmer/commands/bench.rs index ba59080e9f..3e8fae41b3 100644 --- a/crates/subspace-farmer/src/bin/subspace-farmer/commands/bench.rs +++ b/crates/subspace-farmer/src/bin/subspace-farmer/commands/bench.rs @@ -1,11 +1,11 @@ -use std::path::{Path, PathBuf}; -use std::{fmt, io}; - +use crate::bench_rpc_client::BenchRpcClient; +use crate::{utils, WriteToDisk}; use anyhow::anyhow; +use futures::channel::mpsc; +use futures::SinkExt; use rand::prelude::*; -use tempfile::TempDir; -use tracing::info; - +use std::path::{Path, PathBuf}; +use std::{fmt, io}; use subspace_archiving::archiver::ArchivedSegment; use subspace_core_primitives::objects::{PieceObject, PieceObjectMapping}; use subspace_core_primitives::{ @@ -15,10 +15,9 @@ use subspace_core_primitives::{ use subspace_farmer::multi_farming::{MultiFarming, Options as MultiFarmingOptions}; use subspace_farmer::{ObjectMappings, PieceOffset, Plot, PlotFile, RpcClient}; use subspace_rpc_primitives::FarmerMetadata; +use tempfile::TempDir; use tokio::time::Instant; - -use crate::bench_rpc_client::BenchRpcClient; -use crate::{utils, WriteToDisk}; +use tracing::info; pub struct BenchPlotMock { piece_count: u64, @@ -90,7 +89,7 @@ pub(crate) async fn bench( ) -> anyhow::Result<()> { utils::raise_fd_limit(); - let (archived_segments_sender, archived_segments_receiver) = tokio::sync::mpsc::channel(10); + let (mut archived_segments_sender, archived_segments_receiver) = mpsc::channel(10); let client = BenchRpcClient::new(BENCH_FARMER_METADATA, archived_segments_receiver); let base_directory = crate::utils::get_path(custom_path); diff --git a/crates/subspace-farmer/src/farming.rs b/crates/subspace-farmer/src/farming.rs index 400b6e8053..8a95ec851b 100644 --- a/crates/subspace-farmer/src/farming.rs +++ b/crates/subspace-farmer/src/farming.rs @@ -7,7 +7,7 @@ use crate::commitments::Commitments; use crate::identity::Identity; use crate::plot::Plot; use crate::rpc_client::RpcClient; -use futures::{future, future::Either}; +use futures::{future, future::Either, StreamExt}; use std::sync::mpsc; use std::time::Instant; use subspace_core_primitives::{LocalChallenge, PublicKey, Salt, Solution}; @@ -123,7 +123,7 @@ async fn subscribe_to_slot_info( let mut salts = Salts::default(); - while let Some(slot_info) = slot_info_notifications.recv().await { + while let Some(slot_info) = slot_info_notifications.next().await { debug!(?slot_info, "New slot"); update_commitments(plot, commitments, &mut salts, &slot_info); @@ -184,7 +184,7 @@ async fn subscribe_to_slot_info( if let Some(BlockSigningInfo { header_hash, public_key, - }) = block_signing_info_notifications.recv().await + }) = block_signing_info_notifications.next().await { // Multiple plots might have solved, only sign with correct one if identity.public_key().to_bytes() != public_key { diff --git a/crates/subspace-farmer/src/mock_rpc_client.rs b/crates/subspace-farmer/src/mock_rpc_client.rs index 7c6934fab9..2e02edc286 100644 --- a/crates/subspace-farmer/src/mock_rpc_client.rs +++ b/crates/subspace-farmer/src/mock_rpc_client.rs @@ -1,11 +1,14 @@ use crate::rpc_client::{Error as MockError, RpcClient}; use async_trait::async_trait; +use futures::channel::mpsc; +use futures::{SinkExt, Stream, StreamExt}; +use std::pin::Pin; use std::sync::Arc; use subspace_archiving::archiver::ArchivedSegment; use subspace_rpc_primitives::{ BlockSignature, BlockSigningInfo, FarmerMetadata, SlotInfo, SolutionResponse, }; -use tokio::sync::{mpsc, Mutex}; +use tokio::sync::Mutex; /// `MockRpc` wrapper. #[derive(Clone, Debug)] @@ -71,7 +74,12 @@ impl MockRpcClient { } pub(crate) async fn send_metadata(&self, metadata: FarmerMetadata) { - self.inner.metadata_sender.send(metadata).await.unwrap(); + self.inner + .metadata_sender + .clone() + .send(metadata) + .await + .unwrap(); } pub(crate) async fn send_slot_info(&self, slot_info: SlotInfo) { @@ -79,7 +87,7 @@ impl MockRpcClient { .slot_into_sender .lock() .await - .as_ref() + .as_mut() .unwrap() .send(slot_info) .await @@ -87,7 +95,7 @@ impl MockRpcClient { } pub(crate) async fn receive_solution(&self) -> Option { - self.inner.solution_receiver.lock().await.recv().await + self.inner.solution_receiver.lock().await.next().await } pub(crate) async fn drop_slot_sender(&self) { @@ -99,7 +107,7 @@ impl MockRpcClient { .archived_segments_sender .lock() .await - .as_ref() + .as_mut() .unwrap() .send(archived_segment) .await @@ -112,7 +120,7 @@ impl MockRpcClient { acknowledge_archived_segment_receiver .lock() .await - .recv() + .next() .await; }); } @@ -130,19 +138,29 @@ impl MockRpcClient { #[async_trait] impl RpcClient for MockRpcClient { async fn farmer_metadata(&self) -> Result { - Ok(self.inner.metadata_receiver.lock().await.try_recv()?) + Ok(self + .inner + .metadata_receiver + .lock() + .await + .try_next()? + .unwrap()) } - async fn subscribe_slot_info(&self) -> Result, MockError> { - let (sender, receiver) = mpsc::channel(10); + async fn subscribe_slot_info( + &self, + ) -> Result + Send + 'static>>, MockError> { + let (mut sender, receiver) = mpsc::channel(10); let slot_receiver = self.inner.slot_info_receiver.clone(); tokio::spawn(async move { - while let Some(slot_info) = slot_receiver.lock().await.recv().await { - sender.send(slot_info).await.unwrap(); + while let Some(slot_info) = slot_receiver.lock().await.next().await { + if sender.send(slot_info).await.is_err() { + break; + } } }); - Ok(receiver) + Ok(Box::pin(receiver)) } async fn submit_solution_response( @@ -151,22 +169,27 @@ impl RpcClient for MockRpcClient { ) -> Result<(), MockError> { self.inner .solution_sender + .clone() .send(solution_response) .await .unwrap(); Ok(()) } - async fn subscribe_block_signing(&self) -> Result, MockError> { - let (sender, receiver) = mpsc::channel(10); + async fn subscribe_block_signing( + &self, + ) -> Result + Send + 'static>>, MockError> { + let (mut sender, receiver) = mpsc::channel(10); let block_signing_receiver = self.inner.block_signing_info_receiver.clone(); tokio::spawn(async move { - while let Some(block_signing_info) = block_signing_receiver.lock().await.recv().await { - sender.send(block_signing_info).await.unwrap(); + while let Some(block_signing_info) = block_signing_receiver.lock().await.next().await { + if sender.send(block_signing_info).await.is_err() { + break; + } } }); - Ok(receiver) + Ok(Box::pin(receiver)) } async fn submit_block_signature( @@ -175,6 +198,7 @@ impl RpcClient for MockRpcClient { ) -> Result<(), MockError> { self.inner .block_signature_sender + .clone() .send(block_signature) .await .unwrap(); @@ -183,22 +207,25 @@ impl RpcClient for MockRpcClient { async fn subscribe_archived_segments( &self, - ) -> Result, MockError> { - let (sender, receiver) = mpsc::channel(10); + ) -> Result + Send + 'static>>, MockError> { + let (mut sender, receiver) = mpsc::channel(10); let archived_segments_receiver = self.inner.archived_segments_receiver.clone(); tokio::spawn(async move { - while let Some(archived_segment) = archived_segments_receiver.lock().await.recv().await + while let Some(archived_segment) = archived_segments_receiver.lock().await.next().await { - sender.send(archived_segment).await.unwrap(); + if sender.send(archived_segment).await.is_err() { + break; + } } }); - Ok(receiver) + Ok(Box::pin(receiver)) } async fn acknowledge_archived_segment(&self, segment_index: u64) -> Result<(), MockError> { self.inner .acknowledge_archived_segment_sender + .clone() .send(segment_index) .await .unwrap(); diff --git a/crates/subspace-farmer/src/node_rpc_client.rs b/crates/subspace-farmer/src/node_rpc_client.rs index 912696e138..906d738fd6 100644 --- a/crates/subspace-farmer/src/node_rpc_client.rs +++ b/crates/subspace-farmer/src/node_rpc_client.rs @@ -1,15 +1,16 @@ use crate::rpc_client::{Error as RpcError, RpcClient}; use async_trait::async_trait; +use futures::{Stream, StreamExt}; use jsonrpsee::core::client::{ClientT, SubscriptionClientT}; use jsonrpsee::core::Error as JsonError; use jsonrpsee::rpc_params; use jsonrpsee::ws_client::{WsClient, WsClientBuilder}; +use std::pin::Pin; use std::sync::Arc; use subspace_archiving::archiver::ArchivedSegment; use subspace_rpc_primitives::{ BlockSignature, BlockSigningInfo, FarmerMetadata, SlotInfo, SolutionResponse, }; -use tokio::sync::mpsc; /// `WsClient` wrapper. #[derive(Clone, Debug)] @@ -34,8 +35,10 @@ impl RpcClient for NodeRpcClient { .await?) } - async fn subscribe_slot_info(&self) -> Result, RpcError> { - let mut subscription = self + async fn subscribe_slot_info( + &self, + ) -> Result + Send + 'static>>, RpcError> { + let subscription = self .client .subscribe( "subspace_subscribeSlotInfo", @@ -44,15 +47,9 @@ impl RpcClient for NodeRpcClient { ) .await?; - let (sender, receiver) = mpsc::channel(1); - - tokio::spawn(async move { - while let Some(Ok(notification)) = subscription.next().await { - let _ = sender.send(notification).await; - } - }); - - Ok(receiver) + Ok(Box::pin(subscription.filter_map( + |slot_info_result| async move { slot_info_result.ok() }, + ))) } async fn submit_solution_response( @@ -68,8 +65,10 @@ impl RpcClient for NodeRpcClient { .await?) } - async fn subscribe_block_signing(&self) -> Result, RpcError> { - let mut subscription = self + async fn subscribe_block_signing( + &self, + ) -> Result + Send + 'static>>, RpcError> { + let subscription = self .client .subscribe( "subspace_subscribeBlockSigning", @@ -78,15 +77,9 @@ impl RpcClient for NodeRpcClient { ) .await?; - let (sender, receiver) = mpsc::channel(1); - - tokio::spawn(async move { - while let Some(Ok(notification)) = subscription.next().await { - let _ = sender.send(notification).await; - } - }); - - Ok(receiver) + Ok(Box::pin(subscription.filter_map( + |block_signing_info_result| async move { block_signing_info_result.ok() }, + ))) } /// Submit a block signature @@ -105,8 +98,8 @@ impl RpcClient for NodeRpcClient { async fn subscribe_archived_segments( &self, - ) -> Result, RpcError> { - let mut subscription = self + ) -> Result + Send + 'static>>, RpcError> { + let subscription = self .client .subscribe( "subspace_subscribeArchivedSegment", @@ -115,15 +108,9 @@ impl RpcClient for NodeRpcClient { ) .await?; - let (sender, receiver) = mpsc::channel(1); - - tokio::spawn(async move { - while let Some(Ok(notification)) = subscription.next().await { - let _ = sender.send(notification).await; - } - }); - - Ok(receiver) + Ok(Box::pin(subscription.filter_map( + |archived_segment_result| async move { archived_segment_result.ok() }, + ))) } async fn acknowledge_archived_segment(&self, segment_index: u64) -> Result<(), RpcError> { diff --git a/crates/subspace-farmer/src/rpc_client.rs b/crates/subspace-farmer/src/rpc_client.rs index 36be9a6a4a..42c9435eff 100644 --- a/crates/subspace-farmer/src/rpc_client.rs +++ b/crates/subspace-farmer/src/rpc_client.rs @@ -1,9 +1,10 @@ use async_trait::async_trait; +use futures::Stream; +use std::pin::Pin; use subspace_archiving::archiver::ArchivedSegment; use subspace_rpc_primitives::{ BlockSignature, BlockSigningInfo, FarmerMetadata, SlotInfo, SolutionResponse, }; -use tokio::sync::mpsc; /// To become error type agnostic pub type Error = Box; @@ -15,7 +16,9 @@ pub trait RpcClient: Clone + Send + Sync + 'static { async fn farmer_metadata(&self) -> Result; /// Subscribe to slot - async fn subscribe_slot_info(&self) -> Result, Error>; + async fn subscribe_slot_info( + &self, + ) -> Result + Send + 'static>>, Error>; /// Submit a slot solution async fn submit_solution_response( @@ -24,13 +27,17 @@ pub trait RpcClient: Clone + Send + Sync + 'static { ) -> Result<(), Error>; /// Subscribe to block signing request - async fn subscribe_block_signing(&self) -> Result, Error>; + async fn subscribe_block_signing( + &self, + ) -> Result + Send + 'static>>, Error>; /// Submit a block signature async fn submit_block_signature(&self, block_signature: BlockSignature) -> Result<(), Error>; /// Subscribe to archived segments - async fn subscribe_archived_segments(&self) -> Result, Error>; + async fn subscribe_archived_segments( + &self, + ) -> Result + Send + 'static>>, Error>; /// Acknowledge receiving of archived segments async fn acknowledge_archived_segment(&self, segment_index: u64) -> Result<(), Error>;