From 85ce6577aa2dee7a9557af9b5ea56346870d60c0 Mon Sep 17 00:00:00 2001 From: Shamil Gadelshin Date: Mon, 16 Jan 2023 20:05:04 +0700 Subject: [PATCH 1/2] Refactor piece receiving method. - move piece receiving funtionality from farmer to networking crate to enable its reuse --- Cargo.lock | 4 + crates/sp-lightclient/Cargo.toml | 1 + crates/sp-lightclient/src/tests.rs | 3 +- crates/subspace-farmer-components/Cargo.toml | 1 + .../benches/utils/mod.rs | 2 +- .../src/plotting.rs | 13 +- .../subspace-farmer/src/single_disk_plot.rs | 15 +- .../src/single_disk_plot/piece_receiver.rs | 187 +--------------- crates/subspace-networking/Cargo.toml | 1 + crates/subspace-networking/src/lib.rs | 1 + crates/subspace-networking/src/utils.rs | 1 + .../src/utils/piece_receiver.rs | 202 ++++++++++++++++++ test/subspace-test-client/Cargo.toml | 1 + test/subspace-test-client/src/lib.rs | 3 +- 14 files changed, 239 insertions(+), 196 deletions(-) create mode 100644 crates/subspace-networking/src/utils/piece_receiver.rs diff --git a/Cargo.lock b/Cargo.lock index 8075c15798f..c97d917fca5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -9023,6 +9023,7 @@ dependencies = [ "subspace-archiving", "subspace-core-primitives", "subspace-farmer-components", + "subspace-networking", "subspace-solving", "subspace-verification", ] @@ -9589,6 +9590,7 @@ dependencies = [ "static_assertions", "subspace-archiving", "subspace-core-primitives", + "subspace-networking", "subspace-solving", "subspace-verification", "thiserror", @@ -9632,6 +9634,7 @@ dependencies = [ "actix-web", "anyhow", "async-trait", + "backoff", "bytes", "bytesize", "chrono", @@ -9876,6 +9879,7 @@ dependencies = [ "subspace-archiving", "subspace-core-primitives", "subspace-farmer-components", + "subspace-networking", "subspace-runtime-primitives", "subspace-service", "subspace-solving", diff --git a/crates/sp-lightclient/Cargo.toml b/crates/sp-lightclient/Cargo.toml index 2ae88722e95..79cdfa81334 100644 --- a/crates/sp-lightclient/Cargo.toml +++ b/crates/sp-lightclient/Cargo.toml @@ -25,6 +25,7 @@ sp-consensus-subspace = { version = "0.1.0", path = "../sp-consensus-subspace", sp-runtime = { version = "7.0.0", default-features = false, git = "https://github.com/subspace/substrate", rev = "100d6c90d4122578006a47c1dcaf155b9c685f34" } sp-std = { version = "5.0.0", default-features = false, git = "https://github.com/subspace/substrate", rev = "100d6c90d4122578006a47c1dcaf155b9c685f34" } subspace-core-primitives = { version = "0.1.0", path = "../subspace-core-primitives", default-features = false } +subspace-networking = { version = "0.1.0", path = "../subspace-networking" } subspace-solving = { version = "0.1.0", path = "../subspace-solving", default-features = false } subspace-verification = { version = "0.1.0", path = "../subspace-verification", default-features = false } diff --git a/crates/sp-lightclient/src/tests.rs b/crates/sp-lightclient/src/tests.rs index bc2cba5e37e..d7c3a17771c 100644 --- a/crates/sp-lightclient/src/tests.rs +++ b/crates/sp-lightclient/src/tests.rs @@ -34,8 +34,9 @@ use subspace_core_primitives::{ Solution, SolutionRange, PLOT_SECTOR_SIZE, RECORDED_HISTORY_SEGMENT_SIZE, RECORD_SIZE, }; use subspace_farmer_components::farming::audit_sector; -use subspace_farmer_components::plotting::{plot_sector, PieceReceiver}; +use subspace_farmer_components::plotting::plot_sector; use subspace_farmer_components::{FarmerProtocolInfo, SectorMetadata}; +use subspace_networking::PieceReceiver; use subspace_solving::{derive_global_challenge, REWARD_SIGNING_CONTEXT}; use subspace_verification::{derive_audit_chunk, derive_randomness}; diff --git a/crates/subspace-farmer-components/Cargo.toml b/crates/subspace-farmer-components/Cargo.toml index 7cd5b2d678a..e5ac448b50a 100644 --- a/crates/subspace-farmer-components/Cargo.toml +++ b/crates/subspace-farmer-components/Cargo.toml @@ -27,6 +27,7 @@ serde = { version = "1.0.147", features = ["derive"] } static_assertions = "1.1.0" subspace-solving = { version = "0.1.0", path = "../subspace-solving" } subspace-core-primitives = { version = "0.1.0", path = "../subspace-core-primitives" } +subspace-networking = { version = "0.1.0", path = "../subspace-networking" } subspace-verification = { version = "0.1.0", path = "../subspace-verification" } thiserror = "1.0.32" tracing = "0.1.37" diff --git a/crates/subspace-farmer-components/benches/utils/mod.rs b/crates/subspace-farmer-components/benches/utils/mod.rs index abbfd386c51..8ee9bb5c118 100644 --- a/crates/subspace-farmer-components/benches/utils/mod.rs +++ b/crates/subspace-farmer-components/benches/utils/mod.rs @@ -1,7 +1,7 @@ use async_trait::async_trait; use std::error::Error; use subspace_core_primitives::{Piece, PieceIndex}; -use subspace_farmer_components::plotting::PieceReceiver; +use subspace_networking::PieceReceiver; pub struct BenchPieceReceiver { piece: Piece, diff --git a/crates/subspace-farmer-components/src/plotting.rs b/crates/subspace-farmer-components/src/plotting.rs index 56540d5da5c..30e2480906c 100644 --- a/crates/subspace-farmer-components/src/plotting.rs +++ b/crates/subspace-farmer-components/src/plotting.rs @@ -1,28 +1,19 @@ use crate::{FarmerProtocolInfo, SectorMetadata}; -use async_trait::async_trait; use futures::stream::FuturesOrdered; use futures::StreamExt; use parity_scale_codec::Encode; -use std::error::Error; use std::io; use std::sync::atomic::{AtomicBool, Ordering}; use subspace_core_primitives::crypto::kzg; use subspace_core_primitives::crypto::kzg::{Commitment, Kzg}; use subspace_core_primitives::sector_codec::{SectorCodec, SectorCodecError}; use subspace_core_primitives::{ - Piece, PieceIndex, PublicKey, Scalar, SectorId, SectorIndex, PIECE_SIZE, PLOT_SECTOR_SIZE, + PieceIndex, PublicKey, Scalar, SectorId, SectorIndex, PIECE_SIZE, PLOT_SECTOR_SIZE, }; +use subspace_networking::PieceReceiver; use thiserror::Error; use tracing::{debug, info}; -#[async_trait] -pub trait PieceReceiver { - async fn get_piece( - &self, - piece_index: PieceIndex, - ) -> Result, Box>; -} - /// Information about sector that was plotted #[derive(Debug, Clone)] pub struct PlottedSector { diff --git a/crates/subspace-farmer/src/single_disk_plot.rs b/crates/subspace-farmer/src/single_disk_plot.rs index bb2d5d28ba5..84a09081aac 100644 --- a/crates/subspace-farmer/src/single_disk_plot.rs +++ b/crates/subspace-farmer/src/single_disk_plot.rs @@ -9,6 +9,7 @@ use crate::reward_signing::reward_signing; use crate::single_disk_plot::farming::audit_sector; use crate::single_disk_plot::piece_publisher::PieceSectorPublisher; use crate::single_disk_plot::piece_reader::{read_piece, PieceReader, ReadPieceRequest}; +use crate::single_disk_plot::piece_receiver::RecordsRootPieceValidator; use crate::single_disk_plot::plotting::{plot_sector, PlottedSector}; use crate::utils::JoinOnDrop; use bytesize::ByteSize; @@ -21,7 +22,6 @@ use lru::LruCache; use memmap2::{Mmap, MmapMut, MmapOptions}; use parity_scale_codec::{Decode, Encode}; use parking_lot::Mutex; -use piece_receiver::MultiChannelPieceReceiver; use serde::{Deserialize, Serialize}; use static_assertions::const_assert; use std::fs::OpenOptions; @@ -42,7 +42,7 @@ use subspace_core_primitives::{ }; use subspace_farmer_components::file_ext::FileExt; use subspace_farmer_components::{farming, plotting, SectorMetadata}; -use subspace_networking::Node; +use subspace_networking::{Node, PieceProvider}; use subspace_rpc_primitives::{SlotInfo, SolutionResponse}; use thiserror::Error; use tokio::runtime::Handle; @@ -683,11 +683,14 @@ impl SingleDiskPlot { let records_roots_cache = Mutex::new(LruCache::new(RECORDS_ROOTS_CACHE_SIZE)); - let piece_receiver = MultiChannelPieceReceiver::new( + let piece_receiver = PieceProvider::new( &dsn_node, - &node_client, - &kzg, - &records_roots_cache, + Some(RecordsRootPieceValidator::new( + &dsn_node, + &node_client, + &kzg, + &records_roots_cache, + )), &shutting_down, ); diff --git a/crates/subspace-farmer/src/single_disk_plot/piece_receiver.rs b/crates/subspace-farmer/src/single_disk_plot/piece_receiver.rs index 0664a268a00..5edc0e3dcfd 100644 --- a/crates/subspace-farmer/src/single_disk_plot/piece_receiver.rs +++ b/crates/subspace-farmer/src/single_disk_plot/piece_receiver.rs @@ -1,149 +1,48 @@ use crate::NodeClient; use async_trait::async_trait; -use backoff::future::retry; -use backoff::ExponentialBackoff; -use futures::stream::FuturesUnordered; -use futures::StreamExt; use lru::LruCache; use parking_lot::Mutex; -use std::error::Error; -use std::future::Future; -use std::pin::Pin; -use std::sync::atomic::{AtomicBool, Ordering}; -use std::time::Duration; use subspace_archiving::archiver::is_piece_valid; use subspace_core_primitives::crypto::kzg::Kzg; use subspace_core_primitives::{ - Piece, PieceIndex, PieceIndexHash, RecordsRoot, SegmentIndex, PIECES_IN_SEGMENT, RECORD_SIZE, + Piece, PieceIndex, RecordsRoot, SegmentIndex, PIECES_IN_SEGMENT, RECORD_SIZE, }; -use subspace_farmer_components::plotting::PieceReceiver; -use subspace_networking::libp2p::multihash::Multihash; use subspace_networking::libp2p::PeerId; -use subspace_networking::utils::multihash::MultihashCode; -use subspace_networking::{Node, PieceByHashRequest, PieceByHashResponse, PieceKey, ToMultihash}; -use tokio::time::{sleep, timeout}; -use tracing::{debug, error, trace, warn}; +use subspace_networking::{Node, PieceValidator}; +use tracing::error; -/// Defines initial duration between get_piece calls. -const GET_PIECE_INITIAL_INTERVAL: Duration = Duration::from_secs(1); -/// Defines max duration between get_piece calls. -const GET_PIECE_MAX_INTERVAL: Duration = Duration::from_secs(5); -/// Delay for getting piece from cache before resorting to archival storage -const GET_PIECE_ARCHIVAL_STORAGE_DELAY: Duration = Duration::from_secs(2); -/// Max time allocated for getting piece from DSN before attempt is considered to fail -const GET_PIECE_TIMEOUT: Duration = Duration::from_secs(5); - -// Defines target storage type for requets. -#[derive(Debug, Copy, Clone)] -enum StorageType { - // L2 piece cache - Cache, - // L1 archival storage for pieces - ArchivalStorage, -} - -impl From for MultihashCode { - fn from(storage_type: StorageType) -> Self { - match storage_type { - StorageType::Cache => MultihashCode::PieceIndex, - StorageType::ArchivalStorage => MultihashCode::Sector, - } - } -} - -// Temporary struct serving pieces from different providers using configuration arguments. -pub(crate) struct MultiChannelPieceReceiver<'a, NC> { +pub(crate) struct RecordsRootPieceValidator<'a, NC> { dsn_node: &'a Node, node_client: &'a NC, kzg: &'a Kzg, records_root_cache: &'a Mutex>, - cancelled: &'a AtomicBool, } -impl<'a, NC> MultiChannelPieceReceiver<'a, NC> -where - NC: NodeClient, -{ +impl<'a, NC> RecordsRootPieceValidator<'a, NC> { pub(crate) fn new( dsn_node: &'a Node, node_client: &'a NC, kzg: &'a Kzg, records_root_cache: &'a Mutex>, - cancelled: &'a AtomicBool, ) -> Self { Self { dsn_node, node_client, kzg, records_root_cache, - cancelled, - } - } - - fn check_cancellation(&self) -> Result<(), Box> { - if self.cancelled.load(Ordering::Acquire) { - debug!("Getting a piece was cancelled."); - - return Err("Getting a piece was cancelled.".into()); - } - - Ok(()) - } - - // Get from piece cache (L2) or archival storage (L1) - async fn get_piece_from_storage( - &self, - piece_index: PieceIndex, - storage_type: StorageType, - ) -> Option { - let piece_index_hash = PieceIndexHash::from_index(piece_index); - let key = piece_index_hash.to_multihash_by_code(storage_type.into()); - let piece_key = match storage_type { - StorageType::Cache => PieceKey::Cache(piece_index_hash), - StorageType::ArchivalStorage => PieceKey::ArchivalStorage(piece_index_hash), - }; - - let get_providers_result = self.dsn_node.get_providers(key).await; - - match get_providers_result { - Ok(mut get_providers_stream) => { - while let Some(provider_id) = get_providers_stream.next().await { - trace!(%piece_index, %provider_id, "get_providers returned an item"); - - let request_result = self - .dsn_node - .send_generic_request(provider_id, PieceByHashRequest { key: piece_key }) - .await; - - match request_result { - Ok(PieceByHashResponse { piece: Some(piece) }) => { - trace!(%provider_id, %piece_index, ?key, "Piece request succeeded."); - return self - .validate_piece(provider_id, piece_index, key, piece) - .await; - } - Ok(PieceByHashResponse { piece: None }) => { - debug!(%provider_id, %piece_index, ?key, "Piece request returned empty piece."); - } - Err(error) => { - warn!(%provider_id, %piece_index, ?key, ?error, "Piece request failed."); - } - } - } - } - Err(err) => { - warn!(%piece_index,?key, ?err, "get_providers returned an error"); - } } - - None } +} +#[async_trait] +impl<'a, NC> PieceValidator for RecordsRootPieceValidator<'a, NC> +where + NC: NodeClient, +{ async fn validate_piece( &self, source_peer_id: PeerId, piece_index: PieceIndex, - key: Multihash, piece: Piece, ) -> Option { if source_peer_id != self.dsn_node.id() { @@ -159,7 +58,6 @@ where Err(error) => { error!( %piece_index, - ?key, ?error, "Failed tor retrieve records root from node" ); @@ -172,7 +70,6 @@ where None => { error!( %piece_index, - ?key, %segment_index, "Records root for segment index wasn't found on node" ); @@ -199,7 +96,6 @@ where ) { error!( %piece_index, - ?key, %source_peer_id, "Received invalid piece from peer" ); @@ -213,64 +109,3 @@ where Some(piece) } } - -#[async_trait] -impl<'a, NC> PieceReceiver for MultiChannelPieceReceiver<'a, NC> -where - NC: NodeClient, -{ - async fn get_piece( - &self, - piece_index: PieceIndex, - ) -> Result, Box> { - trace!(%piece_index, "Piece request."); - - let backoff = ExponentialBackoff { - initial_interval: GET_PIECE_INITIAL_INTERVAL, - max_interval: GET_PIECE_MAX_INTERVAL, - // Try until we get a valid piece - max_elapsed_time: None, - ..ExponentialBackoff::default() - }; - - retry(backoff, || async { - self.check_cancellation() - .map_err(backoff::Error::Permanent)?; - - // Try to pull pieces in two ways, whichever is faster - let mut piece_attempts = [ - timeout( - GET_PIECE_TIMEOUT, - Box::pin(self.get_piece_from_storage(piece_index, StorageType::Cache)) - as Pin + Send>>, - ), - //TODO: verify "broken pipe" error cause - timeout( - GET_PIECE_TIMEOUT, - Box::pin(async { - // Prefer cache if it can return quickly, otherwise fall back to archival storage - sleep(GET_PIECE_ARCHIVAL_STORAGE_DELAY).await; - self.get_piece_from_storage(piece_index, StorageType::ArchivalStorage) - .await - }) as Pin + Send>>, - ), - ] - .into_iter() - .collect::>(); - - while let Some(maybe_piece) = piece_attempts.next().await { - if let Ok(Some(piece)) = maybe_piece { - trace!(%piece_index, "Got piece"); - return Ok(Some(piece)); - } - } - - warn!(%piece_index, "Couldn't get a piece from DSN. Retrying..."); - - Err(backoff::Error::transient( - "Couldn't get piece from DSN".into(), - )) - }) - .await - } -} diff --git a/crates/subspace-networking/Cargo.toml b/crates/subspace-networking/Cargo.toml index 543c33ae4bd..1a9fdb29017 100644 --- a/crates/subspace-networking/Cargo.toml +++ b/crates/subspace-networking/Cargo.toml @@ -19,6 +19,7 @@ include = [ actix-web = "4.2.1" anyhow = "1.0.66" async-trait = "0.1.58" +backoff = { version = "0.4.0", features = ["futures", "tokio"] } bytes = "1.2.1" bytesize = "1.1.0" chrono = {version = "0.4.21", features = ["clock", "serde", "std",]} diff --git a/crates/subspace-networking/src/lib.rs b/crates/subspace-networking/src/lib.rs index 841bfb5a475..61ce0f2af95 100644 --- a/crates/subspace-networking/src/lib.rs +++ b/crates/subspace-networking/src/lib.rs @@ -60,5 +60,6 @@ pub use request_handlers::pieces_by_range::{ }; pub use utils::deconstruct_record_key; pub use utils::multihash::ToMultihash; +pub use utils::piece_receiver::{PieceProvider, PieceReceiver, PieceValidator}; pub use utils::prometheus::start_prometheus_metrics_server; pub use utils::record_binary_heap::RecordBinaryHeap; diff --git a/crates/subspace-networking/src/utils.rs b/crates/subspace-networking/src/utils.rs index 2857a391afa..28ebc721009 100644 --- a/crates/subspace-networking/src/utils.rs +++ b/crates/subspace-networking/src/utils.rs @@ -1,4 +1,5 @@ pub mod multihash; +pub mod piece_receiver; pub(crate) mod prometheus; pub(crate) mod record_binary_heap; #[cfg(test)] diff --git a/crates/subspace-networking/src/utils/piece_receiver.rs b/crates/subspace-networking/src/utils/piece_receiver.rs new file mode 100644 index 00000000000..f6c728c5790 --- /dev/null +++ b/crates/subspace-networking/src/utils/piece_receiver.rs @@ -0,0 +1,202 @@ +use crate::utils::multihash::MultihashCode; +use crate::{Node, PieceByHashRequest, PieceByHashResponse, PieceKey, ToMultihash}; +use async_trait::async_trait; +use backoff::future::retry; +use backoff::ExponentialBackoff; +use futures::stream::FuturesUnordered; +use futures::StreamExt; +use libp2p::PeerId; +use std::error::Error; +use std::future::Future; +use std::pin::Pin; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::time::Duration; +use subspace_core_primitives::{Piece, PieceIndex, PieceIndexHash}; +use tokio::time::{sleep, timeout}; +use tracing::{debug, trace, warn}; + +/// Defines initial duration between get_piece calls. +const GET_PIECE_INITIAL_INTERVAL: Duration = Duration::from_secs(1); +/// Defines max duration between get_piece calls. +const GET_PIECE_MAX_INTERVAL: Duration = Duration::from_secs(5); +/// Delay for getting piece from cache before resorting to archival storage +const GET_PIECE_ARCHIVAL_STORAGE_DELAY: Duration = Duration::from_secs(2); +/// Max time allocated for getting piece from DSN before attempt is considered to fail +const GET_PIECE_TIMEOUT: Duration = Duration::from_secs(5); + +/// An abstraction for piece receiving. +#[async_trait] +pub trait PieceReceiver { + /// Returns optional piece from the DSN. None means - no piece was found. + async fn get_piece( + &self, + piece_index: PieceIndex, + ) -> Result, Box>; +} + +// Defines target storage type for requets. +#[derive(Debug, Copy, Clone)] +enum StorageType { + // L2 piece cache + Cache, + // L1 archival storage for pieces + ArchivalStorage, +} + +impl From for MultihashCode { + fn from(storage_type: StorageType) -> Self { + match storage_type { + StorageType::Cache => MultihashCode::PieceIndex, + StorageType::ArchivalStorage => MultihashCode::Sector, + } + } +} + +#[async_trait] +pub trait PieceValidator: Sync + Send { + async fn validate_piece( + &self, + source_peer_id: PeerId, + piece_index: PieceIndex, + piece: Piece, + ) -> Option; +} + +/// Piece provider with cancellation and optional piece validator. +pub struct PieceProvider<'a, PV> { + dsn_node: &'a Node, + piece_validator: Option, + cancelled: &'a AtomicBool, +} + +impl<'a, PV: PieceValidator> PieceProvider<'a, PV> { + pub fn new(dsn_node: &'a Node, piece_validator: Option, cancelled: &'a AtomicBool) -> Self { + Self { + dsn_node, + piece_validator, + cancelled, + } + } + + fn check_cancellation(&self) -> Result<(), Box> { + if self.cancelled.load(Ordering::Acquire) { + debug!("Getting a piece was cancelled."); + + return Err("Getting a piece was cancelled.".into()); + } + + Ok(()) + } + + // Get from piece cache (L2) or archival storage (L1) + async fn get_piece_from_storage( + &self, + piece_index: PieceIndex, + storage_type: StorageType, + ) -> Option { + let piece_index_hash = PieceIndexHash::from_index(piece_index); + let key = piece_index_hash.to_multihash_by_code(storage_type.into()); + let piece_key = match storage_type { + StorageType::Cache => PieceKey::Cache(piece_index_hash), + StorageType::ArchivalStorage => PieceKey::ArchivalStorage(piece_index_hash), + }; + + let get_providers_result = self.dsn_node.get_providers(key).await; + + match get_providers_result { + Ok(mut get_providers_stream) => { + while let Some(provider_id) = get_providers_stream.next().await { + trace!(%piece_index, %provider_id, "get_providers returned an item"); + + let request_result = self + .dsn_node + .send_generic_request(provider_id, PieceByHashRequest { key: piece_key }) + .await; + + match request_result { + Ok(PieceByHashResponse { piece: Some(piece) }) => { + trace!(%provider_id, %piece_index, ?key, "Piece request succeeded."); + + if let Some(validator) = &self.piece_validator { + return validator + .validate_piece(provider_id, piece_index, piece) + .await; + } else { + return Some(piece); + } + } + Ok(PieceByHashResponse { piece: None }) => { + debug!(%provider_id, %piece_index, ?key, "Piece request returned empty piece."); + } + Err(error) => { + warn!(%provider_id, %piece_index, ?key, ?error, "Piece request failed."); + } + } + } + } + Err(err) => { + warn!(%piece_index,?key, ?err, "get_providers returned an error"); + } + } + + None + } +} + +#[async_trait] +impl<'a, PV: PieceValidator> PieceReceiver for PieceProvider<'a, PV> { + async fn get_piece( + &self, + piece_index: PieceIndex, + ) -> Result, Box> { + trace!(%piece_index, "Piece request."); + + let backoff = ExponentialBackoff { + initial_interval: GET_PIECE_INITIAL_INTERVAL, + max_interval: GET_PIECE_MAX_INTERVAL, + // Try until we get a valid piece + max_elapsed_time: None, + ..ExponentialBackoff::default() + }; + + retry(backoff, || async { + self.check_cancellation() + .map_err(backoff::Error::Permanent)?; + + // Try to pull pieces in two ways, whichever is faster + let mut piece_attempts = [ + timeout( + GET_PIECE_TIMEOUT, + Box::pin(self.get_piece_from_storage(piece_index, StorageType::Cache)) + as Pin + Send>>, + ), + //TODO: verify "broken pipe" error cause + timeout( + GET_PIECE_TIMEOUT, + Box::pin(async { + // Prefer cache if it can return quickly, otherwise fall back to archival storage + sleep(GET_PIECE_ARCHIVAL_STORAGE_DELAY).await; + self.get_piece_from_storage(piece_index, StorageType::ArchivalStorage) + .await + }) as Pin + Send>>, + ), + ] + .into_iter() + .collect::>(); + + while let Some(maybe_piece) = piece_attempts.next().await { + if let Ok(Some(piece)) = maybe_piece { + trace!(%piece_index, "Got piece"); + return Ok(Some(piece)); + } + } + + warn!(%piece_index, "Couldn't get a piece from DSN. Retrying..."); + + Err(backoff::Error::transient( + "Couldn't get piece from DSN".into(), + )) + }) + .await + } +} diff --git a/test/subspace-test-client/Cargo.toml b/test/subspace-test-client/Cargo.toml index 67e57a7603c..92551ceb1c2 100644 --- a/test/subspace-test-client/Cargo.toml +++ b/test/subspace-test-client/Cargo.toml @@ -31,6 +31,7 @@ subspace-archiving = { path = "../../crates/subspace-archiving" } subspace-core-primitives = { path = "../../crates/subspace-core-primitives" } subspace-runtime-primitives = { path = "../../crates/subspace-runtime-primitives" } subspace-farmer-components = { path = "../../crates/subspace-farmer-components" } +subspace-networking = { version = "0.1.0", path = "../../crates/subspace-networking" } subspace-service = { path = "../../crates/subspace-service" } subspace-solving = { path = "../../crates/subspace-solving" } subspace-test-runtime = { version = "0.1.0", features = ["do-not-enforce-cost-of-storage"], path = "../subspace-test-runtime" } diff --git a/test/subspace-test-client/src/lib.rs b/test/subspace-test-client/src/lib.rs index 2b4b8f46e85..b595b00e008 100644 --- a/test/subspace-test-client/src/lib.rs +++ b/test/subspace-test-client/src/lib.rs @@ -44,8 +44,9 @@ use subspace_core_primitives::{ RECORD_SIZE, }; use subspace_farmer_components::farming::audit_sector; -use subspace_farmer_components::plotting::{plot_sector, PieceReceiver}; +use subspace_farmer_components::plotting::plot_sector; use subspace_farmer_components::{FarmerProtocolInfo, SectorMetadata}; +use subspace_networking::PieceReceiver; use subspace_runtime_primitives::opaque::Block; use subspace_service::{FullClient, NewFull}; use subspace_solving::REWARD_SIGNING_CONTEXT; From 437e7cdf10dee0fde3f1fdae0d9d0d6f2fc9e6ed Mon Sep 17 00:00:00 2001 From: Shamil Gadelshin Date: Tue, 17 Jan 2023 18:47:15 +0700 Subject: [PATCH 2/2] Fix review comments. --- Cargo.lock | 3 -- crates/sp-lightclient/Cargo.toml | 1 - crates/sp-lightclient/src/tests.rs | 3 +- crates/subspace-farmer-components/Cargo.toml | 1 - .../benches/utils/mod.rs | 2 +- .../src/plotting.rs | 15 ++++++++-- .../subspace-farmer/src/single_disk_plot.rs | 29 +++++++++++++++++-- .../src/utils/piece_receiver.rs | 2 +- test/subspace-test-client/Cargo.toml | 1 - test/subspace-test-client/src/lib.rs | 3 +- 10 files changed, 44 insertions(+), 16 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index c97d917fca5..94a95b32b89 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -9023,7 +9023,6 @@ dependencies = [ "subspace-archiving", "subspace-core-primitives", "subspace-farmer-components", - "subspace-networking", "subspace-solving", "subspace-verification", ] @@ -9590,7 +9589,6 @@ dependencies = [ "static_assertions", "subspace-archiving", "subspace-core-primitives", - "subspace-networking", "subspace-solving", "subspace-verification", "thiserror", @@ -9879,7 +9877,6 @@ dependencies = [ "subspace-archiving", "subspace-core-primitives", "subspace-farmer-components", - "subspace-networking", "subspace-runtime-primitives", "subspace-service", "subspace-solving", diff --git a/crates/sp-lightclient/Cargo.toml b/crates/sp-lightclient/Cargo.toml index 79cdfa81334..2ae88722e95 100644 --- a/crates/sp-lightclient/Cargo.toml +++ b/crates/sp-lightclient/Cargo.toml @@ -25,7 +25,6 @@ sp-consensus-subspace = { version = "0.1.0", path = "../sp-consensus-subspace", sp-runtime = { version = "7.0.0", default-features = false, git = "https://github.com/subspace/substrate", rev = "100d6c90d4122578006a47c1dcaf155b9c685f34" } sp-std = { version = "5.0.0", default-features = false, git = "https://github.com/subspace/substrate", rev = "100d6c90d4122578006a47c1dcaf155b9c685f34" } subspace-core-primitives = { version = "0.1.0", path = "../subspace-core-primitives", default-features = false } -subspace-networking = { version = "0.1.0", path = "../subspace-networking" } subspace-solving = { version = "0.1.0", path = "../subspace-solving", default-features = false } subspace-verification = { version = "0.1.0", path = "../subspace-verification", default-features = false } diff --git a/crates/sp-lightclient/src/tests.rs b/crates/sp-lightclient/src/tests.rs index d7c3a17771c..bc2cba5e37e 100644 --- a/crates/sp-lightclient/src/tests.rs +++ b/crates/sp-lightclient/src/tests.rs @@ -34,9 +34,8 @@ use subspace_core_primitives::{ Solution, SolutionRange, PLOT_SECTOR_SIZE, RECORDED_HISTORY_SEGMENT_SIZE, RECORD_SIZE, }; use subspace_farmer_components::farming::audit_sector; -use subspace_farmer_components::plotting::plot_sector; +use subspace_farmer_components::plotting::{plot_sector, PieceReceiver}; use subspace_farmer_components::{FarmerProtocolInfo, SectorMetadata}; -use subspace_networking::PieceReceiver; use subspace_solving::{derive_global_challenge, REWARD_SIGNING_CONTEXT}; use subspace_verification::{derive_audit_chunk, derive_randomness}; diff --git a/crates/subspace-farmer-components/Cargo.toml b/crates/subspace-farmer-components/Cargo.toml index e5ac448b50a..7cd5b2d678a 100644 --- a/crates/subspace-farmer-components/Cargo.toml +++ b/crates/subspace-farmer-components/Cargo.toml @@ -27,7 +27,6 @@ serde = { version = "1.0.147", features = ["derive"] } static_assertions = "1.1.0" subspace-solving = { version = "0.1.0", path = "../subspace-solving" } subspace-core-primitives = { version = "0.1.0", path = "../subspace-core-primitives" } -subspace-networking = { version = "0.1.0", path = "../subspace-networking" } subspace-verification = { version = "0.1.0", path = "../subspace-verification" } thiserror = "1.0.32" tracing = "0.1.37" diff --git a/crates/subspace-farmer-components/benches/utils/mod.rs b/crates/subspace-farmer-components/benches/utils/mod.rs index 8ee9bb5c118..abbfd386c51 100644 --- a/crates/subspace-farmer-components/benches/utils/mod.rs +++ b/crates/subspace-farmer-components/benches/utils/mod.rs @@ -1,7 +1,7 @@ use async_trait::async_trait; use std::error::Error; use subspace_core_primitives::{Piece, PieceIndex}; -use subspace_networking::PieceReceiver; +use subspace_farmer_components::plotting::PieceReceiver; pub struct BenchPieceReceiver { piece: Piece, diff --git a/crates/subspace-farmer-components/src/plotting.rs b/crates/subspace-farmer-components/src/plotting.rs index 30e2480906c..139ebd1d166 100644 --- a/crates/subspace-farmer-components/src/plotting.rs +++ b/crates/subspace-farmer-components/src/plotting.rs @@ -1,19 +1,30 @@ use crate::{FarmerProtocolInfo, SectorMetadata}; +use async_trait::async_trait; use futures::stream::FuturesOrdered; use futures::StreamExt; use parity_scale_codec::Encode; +use std::error::Error; use std::io; use std::sync::atomic::{AtomicBool, Ordering}; use subspace_core_primitives::crypto::kzg; use subspace_core_primitives::crypto::kzg::{Commitment, Kzg}; use subspace_core_primitives::sector_codec::{SectorCodec, SectorCodecError}; use subspace_core_primitives::{ - PieceIndex, PublicKey, Scalar, SectorId, SectorIndex, PIECE_SIZE, PLOT_SECTOR_SIZE, + Piece, PieceIndex, PublicKey, Scalar, SectorId, SectorIndex, PIECE_SIZE, PLOT_SECTOR_SIZE, }; -use subspace_networking::PieceReceiver; use thiserror::Error; use tracing::{debug, info}; +/// Duplicate trait for the subspace_networking::PieceReceiver. The goal of this trait is +/// simplifying dependency graph. +#[async_trait] +pub trait PieceReceiver { + async fn get_piece( + &self, + piece_index: PieceIndex, + ) -> Result, Box>; +} + /// Information about sector that was plotted #[derive(Debug, Clone)] pub struct PlottedSector { diff --git a/crates/subspace-farmer/src/single_disk_plot.rs b/crates/subspace-farmer/src/single_disk_plot.rs index 84a09081aac..a2de258d048 100644 --- a/crates/subspace-farmer/src/single_disk_plot.rs +++ b/crates/subspace-farmer/src/single_disk_plot.rs @@ -12,6 +12,7 @@ use crate::single_disk_plot::piece_reader::{read_piece, PieceReader, ReadPieceRe use crate::single_disk_plot::piece_receiver::RecordsRootPieceValidator; use crate::single_disk_plot::plotting::{plot_sector, PlottedSector}; use crate::utils::JoinOnDrop; +use async_trait::async_trait; use bytesize::ByteSize; use derive_more::{Display, From}; use event_listener_primitives::{Bag, HandlerId}; @@ -24,6 +25,7 @@ use parity_scale_codec::{Decode, Encode}; use parking_lot::Mutex; use serde::{Deserialize, Serialize}; use static_assertions::const_assert; +use std::error::Error; use std::fs::OpenOptions; use std::future::Future; use std::io::{Seek, SeekFrom}; @@ -38,7 +40,8 @@ use std_semaphore::{Semaphore, SemaphoreGuard}; use subspace_core_primitives::crypto::kzg::{test_public_parameters, Kzg}; use subspace_core_primitives::sector_codec::SectorCodec; use subspace_core_primitives::{ - PieceIndex, PublicKey, SectorId, SectorIndex, Solution, PIECES_IN_SECTOR, PLOT_SECTOR_SIZE, + Piece, PieceIndex, PublicKey, SectorId, SectorIndex, Solution, PIECES_IN_SECTOR, + PLOT_SECTOR_SIZE, }; use subspace_farmer_components::file_ext::FileExt; use subspace_farmer_components::{farming, plotting, SectorMetadata}; @@ -420,6 +423,26 @@ struct Handlers { solution: Handler, } +/// Adapter struct for the PieceReceiver trait for subspace-networking +/// and subspace-farmer-components crates. +struct PieceReceiverWrapper(PR); + +impl PieceReceiverWrapper { + fn new(piece_getter: PR) -> Self { + Self(piece_getter) + } +} + +#[async_trait] +impl plotting::PieceReceiver for PieceReceiverWrapper { + async fn get_piece( + &self, + piece_index: PieceIndex, + ) -> Result, Box> { + self.0.get_piece(piece_index).await + } +} + /// Single disk plot abstraction is a container for everything necessary to plot/farm with a single /// disk plot. /// @@ -694,6 +717,8 @@ impl SingleDiskPlot { &shutting_down, ); + let piece_receiver_wrapper = PieceReceiverWrapper::new(piece_receiver); + // TODO: Concurrency for (sector_index, sector, sector_metadata) in plot_initial_sector { trace!(%sector_index, "Preparing to plot sector"); @@ -729,7 +754,7 @@ impl SingleDiskPlot { let plot_sector_fut = plot_sector( &public_key, sector_index, - &piece_receiver, + &piece_receiver_wrapper, &shutting_down, &farmer_app_info.protocol_info, &kzg, diff --git a/crates/subspace-networking/src/utils/piece_receiver.rs b/crates/subspace-networking/src/utils/piece_receiver.rs index f6c728c5790..5a1c246d8c4 100644 --- a/crates/subspace-networking/src/utils/piece_receiver.rs +++ b/crates/subspace-networking/src/utils/piece_receiver.rs @@ -26,7 +26,7 @@ const GET_PIECE_TIMEOUT: Duration = Duration::from_secs(5); /// An abstraction for piece receiving. #[async_trait] -pub trait PieceReceiver { +pub trait PieceReceiver: Send + Sync { /// Returns optional piece from the DSN. None means - no piece was found. async fn get_piece( &self, diff --git a/test/subspace-test-client/Cargo.toml b/test/subspace-test-client/Cargo.toml index 92551ceb1c2..67e57a7603c 100644 --- a/test/subspace-test-client/Cargo.toml +++ b/test/subspace-test-client/Cargo.toml @@ -31,7 +31,6 @@ subspace-archiving = { path = "../../crates/subspace-archiving" } subspace-core-primitives = { path = "../../crates/subspace-core-primitives" } subspace-runtime-primitives = { path = "../../crates/subspace-runtime-primitives" } subspace-farmer-components = { path = "../../crates/subspace-farmer-components" } -subspace-networking = { version = "0.1.0", path = "../../crates/subspace-networking" } subspace-service = { path = "../../crates/subspace-service" } subspace-solving = { path = "../../crates/subspace-solving" } subspace-test-runtime = { version = "0.1.0", features = ["do-not-enforce-cost-of-storage"], path = "../subspace-test-runtime" } diff --git a/test/subspace-test-client/src/lib.rs b/test/subspace-test-client/src/lib.rs index b595b00e008..2b4b8f46e85 100644 --- a/test/subspace-test-client/src/lib.rs +++ b/test/subspace-test-client/src/lib.rs @@ -44,9 +44,8 @@ use subspace_core_primitives::{ RECORD_SIZE, }; use subspace_farmer_components::farming::audit_sector; -use subspace_farmer_components::plotting::plot_sector; +use subspace_farmer_components::plotting::{plot_sector, PieceReceiver}; use subspace_farmer_components::{FarmerProtocolInfo, SectorMetadata}; -use subspace_networking::PieceReceiver; use subspace_runtime_primitives::opaque::Block; use subspace_service::{FullClient, NewFull}; use subspace_solving::REWARD_SIGNING_CONTEXT;