diff --git a/Cargo.lock b/Cargo.lock
index 6514f703267..3a21a1e3bd2 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -9632,6 +9632,7 @@ dependencies = [
  "actix-web",
  "anyhow",
  "async-trait",
+ "backoff",
  "bytes",
  "bytesize",
  "chrono",
diff --git a/crates/subspace-farmer-components/src/plotting.rs b/crates/subspace-farmer-components/src/plotting.rs
index 56540d5da5c..139ebd1d166 100644
--- a/crates/subspace-farmer-components/src/plotting.rs
+++ b/crates/subspace-farmer-components/src/plotting.rs
@@ -15,6 +15,8 @@ use subspace_core_primitives::{
 use thiserror::Error;
 use tracing::{debug, info};
 
+/// Duplicate of the `subspace_networking::PieceReceiver` trait. The goal of this trait is
+/// to simplify the dependency graph.
 #[async_trait]
 pub trait PieceReceiver {
     async fn get_piece(
diff --git a/crates/subspace-farmer/src/single_disk_plot.rs b/crates/subspace-farmer/src/single_disk_plot.rs
index bb2d5d28ba5..a2de258d048 100644
--- a/crates/subspace-farmer/src/single_disk_plot.rs
+++ b/crates/subspace-farmer/src/single_disk_plot.rs
@@ -9,8 +9,10 @@ use crate::reward_signing::reward_signing;
 use crate::single_disk_plot::farming::audit_sector;
 use crate::single_disk_plot::piece_publisher::PieceSectorPublisher;
 use crate::single_disk_plot::piece_reader::{read_piece, PieceReader, ReadPieceRequest};
+use crate::single_disk_plot::piece_receiver::RecordsRootPieceValidator;
 use crate::single_disk_plot::plotting::{plot_sector, PlottedSector};
 use crate::utils::JoinOnDrop;
+use async_trait::async_trait;
 use bytesize::ByteSize;
 use derive_more::{Display, From};
 use event_listener_primitives::{Bag, HandlerId};
@@ -21,9 +23,9 @@ use lru::LruCache;
 use memmap2::{Mmap, MmapMut, MmapOptions};
 use parity_scale_codec::{Decode, Encode};
 use parking_lot::Mutex;
-use piece_receiver::MultiChannelPieceReceiver;
 use serde::{Deserialize, Serialize};
 use static_assertions::const_assert;
+use std::error::Error;
 use std::fs::OpenOptions;
 use std::future::Future;
 use std::io::{Seek, SeekFrom};
@@ -38,11 +40,12 @@ use std_semaphore::{Semaphore, SemaphoreGuard};
 use subspace_core_primitives::crypto::kzg::{test_public_parameters, Kzg};
 use subspace_core_primitives::sector_codec::SectorCodec;
 use subspace_core_primitives::{
-    PieceIndex, PublicKey, SectorId, SectorIndex, Solution, PIECES_IN_SECTOR, PLOT_SECTOR_SIZE,
+    Piece, PieceIndex, PublicKey, SectorId, SectorIndex, Solution, PIECES_IN_SECTOR,
+    PLOT_SECTOR_SIZE,
 };
 use subspace_farmer_components::file_ext::FileExt;
 use subspace_farmer_components::{farming, plotting, SectorMetadata};
-use subspace_networking::Node;
+use subspace_networking::{Node, PieceProvider};
 use subspace_rpc_primitives::{SlotInfo, SolutionResponse};
 use thiserror::Error;
 use tokio::runtime::Handle;
@@ -420,6 +423,26 @@ struct Handlers {
     solution: Handler<SolutionResponse>,
 }
 
+/// Adapter between the `PieceReceiver` traits of the subspace-networking
+/// and subspace-farmer-components crates.
+struct PieceReceiverWrapper<PR>(PR);
+
+impl<PR> PieceReceiverWrapper<PR> {
+    fn new(piece_getter: PR) -> Self {
+        Self(piece_getter)
+    }
+}
+
+#[async_trait]
+impl<PR: subspace_networking::PieceReceiver> plotting::PieceReceiver for PieceReceiverWrapper<PR> {
+    async fn get_piece(
+        &self,
+        piece_index: PieceIndex,
+    ) -> Result<Option<Piece>, Box<dyn Error + Send + Sync + 'static>> {
+        self.0.get_piece(piece_index).await
+    }
+}
+
 /// Single disk plot abstraction is a container for everything necessary to plot/farm with a single
 /// disk plot.
 ///
@@ -683,14 +706,19 @@ impl SingleDiskPlot {
 
             let records_roots_cache = Mutex::new(LruCache::new(RECORDS_ROOTS_CACHE_SIZE));
 
-            let piece_receiver = MultiChannelPieceReceiver::new(
+            let piece_receiver = PieceProvider::new(
                 &dsn_node,
-                &node_client,
-                &kzg,
-                &records_roots_cache,
+                Some(RecordsRootPieceValidator::new(
+                    &dsn_node,
+                    &node_client,
+                    &kzg,
+                    &records_roots_cache,
+                )),
                 &shutting_down,
             );
 
+            let piece_receiver_wrapper = PieceReceiverWrapper::new(piece_receiver);
+
             // TODO: Concurrency
             for (sector_index, sector, sector_metadata) in plot_initial_sector {
                 trace!(%sector_index, "Preparing to plot sector");
@@ -726,7 +754,7 @@
                 let plot_sector_fut = plot_sector(
                     &public_key,
                     sector_index,
-                    &piece_receiver,
+                    &piece_receiver_wrapper,
                     &shutting_down,
                     &farmer_app_info.protocol_info,
                     &kzg,
diff --git a/crates/subspace-farmer/src/single_disk_plot/piece_receiver.rs b/crates/subspace-farmer/src/single_disk_plot/piece_receiver.rs
index 0664a268a00..5edc0e3dcfd 100644
--- a/crates/subspace-farmer/src/single_disk_plot/piece_receiver.rs
+++ b/crates/subspace-farmer/src/single_disk_plot/piece_receiver.rs
@@ -1,149 +1,48 @@
 use crate::NodeClient;
 use async_trait::async_trait;
-use backoff::future::retry;
-use backoff::ExponentialBackoff;
-use futures::stream::FuturesUnordered;
-use futures::StreamExt;
 use lru::LruCache;
 use parking_lot::Mutex;
-use std::error::Error;
-use std::future::Future;
-use std::pin::Pin;
-use std::sync::atomic::{AtomicBool, Ordering};
-use std::time::Duration;
 use subspace_archiving::archiver::is_piece_valid;
 use subspace_core_primitives::crypto::kzg::Kzg;
 use subspace_core_primitives::{
-    Piece, PieceIndex, PieceIndexHash, RecordsRoot, SegmentIndex, PIECES_IN_SEGMENT, RECORD_SIZE,
+    Piece, PieceIndex, RecordsRoot, SegmentIndex, PIECES_IN_SEGMENT, RECORD_SIZE,
 };
-use subspace_farmer_components::plotting::PieceReceiver;
-use subspace_networking::libp2p::multihash::Multihash;
 use subspace_networking::libp2p::PeerId;
-use subspace_networking::utils::multihash::MultihashCode;
-use subspace_networking::{Node, PieceByHashRequest, PieceByHashResponse, PieceKey, ToMultihash};
-use tokio::time::{sleep, timeout};
-use tracing::{debug, error, trace, warn};
+use subspace_networking::{Node, PieceValidator};
+use tracing::error;
 
-/// Defines initial duration between get_piece calls.
-const GET_PIECE_INITIAL_INTERVAL: Duration = Duration::from_secs(1);
-/// Defines max duration between get_piece calls.
-const GET_PIECE_MAX_INTERVAL: Duration = Duration::from_secs(5);
-/// Delay for getting piece from cache before resorting to archival storage
-const GET_PIECE_ARCHIVAL_STORAGE_DELAY: Duration = Duration::from_secs(2);
-/// Max time allocated for getting piece from DSN before attempt is considered to fail
-const GET_PIECE_TIMEOUT: Duration = Duration::from_secs(5);
-
-// Defines target storage type for requets.
-#[derive(Debug, Copy, Clone)]
-enum StorageType {
-    // L2 piece cache
-    Cache,
-    // L1 archival storage for pieces
-    ArchivalStorage,
-}
-
-impl From<StorageType> for MultihashCode {
-    fn from(storage_type: StorageType) -> Self {
-        match storage_type {
-            StorageType::Cache => MultihashCode::PieceIndex,
-            StorageType::ArchivalStorage => MultihashCode::Sector,
-        }
-    }
-}
-
-// Temporary struct serving pieces from different providers using configuration arguments.
-pub(crate) struct MultiChannelPieceReceiver<'a, NC> {
+pub(crate) struct RecordsRootPieceValidator<'a, NC> {
     dsn_node: &'a Node,
     node_client: &'a NC,
     kzg: &'a Kzg,
     records_root_cache: &'a Mutex<LruCache<SegmentIndex, RecordsRoot>>,
-    cancelled: &'a AtomicBool,
 }
 
-impl<'a, NC> MultiChannelPieceReceiver<'a, NC>
-where
-    NC: NodeClient,
-{
+impl<'a, NC> RecordsRootPieceValidator<'a, NC> {
     pub(crate) fn new(
         dsn_node: &'a Node,
         node_client: &'a NC,
         kzg: &'a Kzg,
         records_root_cache: &'a Mutex<LruCache<SegmentIndex, RecordsRoot>>,
-        cancelled: &'a AtomicBool,
     ) -> Self {
         Self {
             dsn_node,
             node_client,
             kzg,
             records_root_cache,
-            cancelled,
-        }
-    }
-
-    fn check_cancellation(&self) -> Result<(), Box<dyn Error + Send + Sync + 'static>> {
-        if self.cancelled.load(Ordering::Acquire) {
-            debug!("Getting a piece was cancelled.");
-
-            return Err("Getting a piece was cancelled.".into());
-        }
-
-        Ok(())
-    }
-
-    // Get from piece cache (L2) or archival storage (L1)
-    async fn get_piece_from_storage(
-        &self,
-        piece_index: PieceIndex,
-        storage_type: StorageType,
-    ) -> Option<Piece> {
-        let piece_index_hash = PieceIndexHash::from_index(piece_index);
-        let key = piece_index_hash.to_multihash_by_code(storage_type.into());
-        let piece_key = match storage_type {
-            StorageType::Cache => PieceKey::Cache(piece_index_hash),
-            StorageType::ArchivalStorage => PieceKey::ArchivalStorage(piece_index_hash),
-        };
-
-        let get_providers_result = self.dsn_node.get_providers(key).await;
-
-        match get_providers_result {
-            Ok(mut get_providers_stream) => {
-                while let Some(provider_id) = get_providers_stream.next().await {
-                    trace!(%piece_index, %provider_id, "get_providers returned an item");
-
-                    let request_result = self
-                        .dsn_node
-                        .send_generic_request(provider_id, PieceByHashRequest { key: piece_key })
-                        .await;
-
-                    match request_result {
-                        Ok(PieceByHashResponse { piece: Some(piece) }) => {
-                            trace!(%provider_id, %piece_index, ?key, "Piece request succeeded.");
-                            return self
-                                .validate_piece(provider_id, piece_index, key, piece)
-                                .await;
-                        }
-                        Ok(PieceByHashResponse { piece: None }) => {
-                            debug!(%provider_id, %piece_index, ?key, "Piece request returned empty piece.");
-                        }
-                        Err(error) => {
-                            warn!(%provider_id, %piece_index, ?key, ?error, "Piece request failed.");
-                        }
-                    }
-                }
-            }
-            Err(err) => {
-                warn!(%piece_index,?key, ?err, "get_providers returned an error");
-            }
         }
-
-        None
     }
+}
 
+#[async_trait]
+impl<'a, NC> PieceValidator for RecordsRootPieceValidator<'a, NC>
+where
+    NC: NodeClient,
+{
     async fn validate_piece(
         &self,
         source_peer_id: PeerId,
         piece_index: PieceIndex,
-        key: Multihash,
         piece: Piece,
     ) -> Option<Piece> {
         if source_peer_id != self.dsn_node.id() {
@@ -159,7 +58,6 @@ where
                 Err(error) => {
                     error!(
                         %piece_index,
-                        ?key,
                         ?error,
                         "Failed to retrieve records root from node"
                     );
@@ -172,7 +70,6 @@ where
                 None => {
                     error!(
                         %piece_index,
-                        ?key,
                         %segment_index,
                         "Records root for segment index wasn't found on node"
                     );
@@ -199,7 +96,6 @@
             ) {
                 error!(
                     %piece_index,
-                    ?key,
                     %source_peer_id,
                     "Received invalid piece from peer"
                 );
@@ -213,64 +109,3 @@ where
         Some(piece)
     }
 }
-
-#[async_trait]
-impl<'a, NC> PieceReceiver for MultiChannelPieceReceiver<'a, NC>
-where
-    NC: NodeClient,
-{
-    async fn get_piece(
-        &self,
-        piece_index: PieceIndex,
-    ) -> Result<Option<Piece>, Box<dyn Error + Send + Sync + 'static>> {
-        trace!(%piece_index, "Piece request.");
-
-        let backoff = ExponentialBackoff {
-            initial_interval: GET_PIECE_INITIAL_INTERVAL,
-            max_interval: GET_PIECE_MAX_INTERVAL,
-            // Try until we get a valid piece
-            max_elapsed_time: None,
-            ..ExponentialBackoff::default()
-        };
-
-        retry(backoff, || async {
-            self.check_cancellation()
-                .map_err(backoff::Error::Permanent)?;
-
-            // Try to pull pieces in two ways, whichever is faster
-            let mut piece_attempts = [
-                timeout(
-                    GET_PIECE_TIMEOUT,
-                    Box::pin(self.get_piece_from_storage(piece_index, StorageType::Cache))
-                        as Pin<Box<dyn Future<Output = Option<Piece>> + Send>>,
-                ),
-                //TODO: verify "broken pipe" error cause
-                timeout(
-                    GET_PIECE_TIMEOUT,
-                    Box::pin(async {
-                        // Prefer cache if it can return quickly, otherwise fall back to archival storage
-                        sleep(GET_PIECE_ARCHIVAL_STORAGE_DELAY).await;
-                        self.get_piece_from_storage(piece_index, StorageType::ArchivalStorage)
-                            .await
-                    }) as Pin<Box<dyn Future<Output = Option<Piece>> + Send>>,
-                ),
-            ]
-            .into_iter()
-            .collect::<FuturesUnordered<_>>();
-
-            while let Some(maybe_piece) = piece_attempts.next().await {
-                if let Ok(Some(piece)) = maybe_piece {
-                    trace!(%piece_index, "Got piece");
-                    return Ok(Some(piece));
-                }
-            }
-
-            warn!(%piece_index, "Couldn't get a piece from DSN. Retrying...");
-
-            Err(backoff::Error::transient(
-                "Couldn't get piece from DSN".into(),
-            ))
-        })
-        .await
-    }
-}
diff --git a/crates/subspace-networking/Cargo.toml b/crates/subspace-networking/Cargo.toml
index 543c33ae4bd..1a9fdb29017 100644
--- a/crates/subspace-networking/Cargo.toml
+++ b/crates/subspace-networking/Cargo.toml
@@ -19,6 +19,7 @@ include = [
 actix-web = "4.2.1"
 anyhow = "1.0.66"
 async-trait = "0.1.58"
+backoff = { version = "0.4.0", features = ["futures", "tokio"] }
 bytes = "1.2.1"
 bytesize = "1.1.0"
 chrono = {version = "0.4.21", features = ["clock", "serde", "std",]}
diff --git a/crates/subspace-networking/src/lib.rs b/crates/subspace-networking/src/lib.rs
index 841bfb5a475..61ce0f2af95 100644
--- a/crates/subspace-networking/src/lib.rs
+++ b/crates/subspace-networking/src/lib.rs
@@ -60,5 +60,6 @@ pub use request_handlers::pieces_by_range::{
 };
 pub use utils::deconstruct_record_key;
 pub use utils::multihash::ToMultihash;
+pub use utils::piece_receiver::{PieceProvider, PieceReceiver, PieceValidator};
 pub use utils::prometheus::start_prometheus_metrics_server;
 pub use utils::record_binary_heap::RecordBinaryHeap;
diff --git a/crates/subspace-networking/src/utils.rs b/crates/subspace-networking/src/utils.rs
index 2857a391afa..28ebc721009 100644
--- a/crates/subspace-networking/src/utils.rs
+++ b/crates/subspace-networking/src/utils.rs
@@ -1,4 +1,5 @@
 pub mod multihash;
+pub mod piece_receiver;
 pub(crate) mod prometheus;
 pub(crate) mod record_binary_heap;
 #[cfg(test)]
diff --git a/crates/subspace-networking/src/utils/piece_receiver.rs b/crates/subspace-networking/src/utils/piece_receiver.rs
new file mode 100644
index 00000000000..5a1c246d8c4
--- /dev/null
+++ b/crates/subspace-networking/src/utils/piece_receiver.rs
@@ -0,0 +1,202 @@
+use crate::utils::multihash::MultihashCode;
+use crate::{Node, PieceByHashRequest, PieceByHashResponse, PieceKey, ToMultihash};
+use async_trait::async_trait;
+use backoff::future::retry;
+use backoff::ExponentialBackoff;
+use futures::stream::FuturesUnordered;
+use futures::StreamExt;
+use libp2p::PeerId;
+use std::error::Error;
+use std::future::Future;
+use std::pin::Pin;
+use std::sync::atomic::{AtomicBool, Ordering};
+use std::time::Duration;
+use subspace_core_primitives::{Piece, PieceIndex, PieceIndexHash};
+use tokio::time::{sleep, timeout};
+use tracing::{debug, trace, warn};
+
+/// Defines initial duration between get_piece calls.
+const GET_PIECE_INITIAL_INTERVAL: Duration = Duration::from_secs(1);
+/// Defines max duration between get_piece calls.
+const GET_PIECE_MAX_INTERVAL: Duration = Duration::from_secs(5);
+/// Delay for getting piece from cache before resorting to archival storage
+const GET_PIECE_ARCHIVAL_STORAGE_DELAY: Duration = Duration::from_secs(2);
+/// Max time allocated for getting piece from DSN before attempt is considered to fail
+const GET_PIECE_TIMEOUT: Duration = Duration::from_secs(5);
+
+/// An abstraction for piece receiving.
+#[async_trait]
+pub trait PieceReceiver: Send + Sync {
+    /// Returns an optional piece from the DSN. `None` means no piece was found.
+    async fn get_piece(
+        &self,
+        piece_index: PieceIndex,
+    ) -> Result<Option<Piece>, Box<dyn Error + Send + Sync + 'static>>;
+}
+
+// Defines target storage type for requests.
+#[derive(Debug, Copy, Clone)]
+enum StorageType {
+    // L2 piece cache
+    Cache,
+    // L1 archival storage for pieces
+    ArchivalStorage,
+}
+
+impl From<StorageType> for MultihashCode {
+    fn from(storage_type: StorageType) -> Self {
+        match storage_type {
+            StorageType::Cache => MultihashCode::PieceIndex,
+            StorageType::ArchivalStorage => MultihashCode::Sector,
+        }
+    }
+}
+
+#[async_trait]
+pub trait PieceValidator: Sync + Send {
+    async fn validate_piece(
+        &self,
+        source_peer_id: PeerId,
+        piece_index: PieceIndex,
+        piece: Piece,
+    ) -> Option<Piece>;
+}
+
+/// Piece provider with cancellation and optional piece validator.
+pub struct PieceProvider<'a, PV> {
+    dsn_node: &'a Node,
+    piece_validator: Option<PV>,
+    cancelled: &'a AtomicBool,
+}
+
+impl<'a, PV: PieceValidator> PieceProvider<'a, PV> {
+    pub fn new(dsn_node: &'a Node, piece_validator: Option<PV>, cancelled: &'a AtomicBool) -> Self {
+        Self {
+            dsn_node,
+            piece_validator,
+            cancelled,
+        }
+    }
+
+    fn check_cancellation(&self) -> Result<(), Box<dyn Error + Send + Sync + 'static>> {
+        if self.cancelled.load(Ordering::Acquire) {
+            debug!("Getting a piece was cancelled.");
+
+            return Err("Getting a piece was cancelled.".into());
+        }
+
+        Ok(())
+    }
+
+    // Get from piece cache (L2) or archival storage (L1)
+    async fn get_piece_from_storage(
+        &self,
+        piece_index: PieceIndex,
+        storage_type: StorageType,
+    ) -> Option<Piece> {
+        let piece_index_hash = PieceIndexHash::from_index(piece_index);
+        let key = piece_index_hash.to_multihash_by_code(storage_type.into());
+        let piece_key = match storage_type {
+            StorageType::Cache => PieceKey::Cache(piece_index_hash),
+            StorageType::ArchivalStorage => PieceKey::ArchivalStorage(piece_index_hash),
+        };
+
+        let get_providers_result = self.dsn_node.get_providers(key).await;
+
+        match get_providers_result {
+            Ok(mut get_providers_stream) => {
+                while let Some(provider_id) = get_providers_stream.next().await {
+                    trace!(%piece_index, %provider_id, "get_providers returned an item");
+
+                    let request_result = self
+                        .dsn_node
+                        .send_generic_request(provider_id, PieceByHashRequest { key: piece_key })
+                        .await;
+
+                    match request_result {
+                        Ok(PieceByHashResponse { piece: Some(piece) }) => {
+                            trace!(%provider_id, %piece_index, ?key, "Piece request succeeded.");
+
+                            if let Some(validator) = &self.piece_validator {
+                                return validator
+                                    .validate_piece(provider_id, piece_index, piece)
+                                    .await;
+                            } else {
+                                return Some(piece);
+                            }
+                        }
+                        Ok(PieceByHashResponse { piece: None }) => {
+                            debug!(%provider_id, %piece_index, ?key, "Piece request returned empty piece.");
+                        }
+                        Err(error) => {
+                            warn!(%provider_id, %piece_index, ?key, ?error, "Piece request failed.");
+                        }
+                    }
+                }
+            }
+            Err(err) => {
+                warn!(%piece_index,?key, ?err, "get_providers returned an error");
+            }
+        }
+
+        None
+    }
+}
+
+#[async_trait]
+impl<'a, PV: PieceValidator> PieceReceiver for PieceProvider<'a, PV> {
+    async fn get_piece(
+        &self,
+        piece_index: PieceIndex,
+    ) -> Result<Option<Piece>, Box<dyn Error + Send + Sync + 'static>> {
+        trace!(%piece_index, "Piece request.");
+
+        let backoff = ExponentialBackoff {
+            initial_interval: GET_PIECE_INITIAL_INTERVAL,
+            max_interval: GET_PIECE_MAX_INTERVAL,
+            // Try until we get a valid piece
+            max_elapsed_time: None,
+            ..ExponentialBackoff::default()
+        };
+
+        retry(backoff, || async {
+            self.check_cancellation()
+                .map_err(backoff::Error::Permanent)?;
+
+            // Try to pull pieces in two ways, whichever is faster
+            let mut piece_attempts = [
+                timeout(
+                    GET_PIECE_TIMEOUT,
+                    Box::pin(self.get_piece_from_storage(piece_index, StorageType::Cache))
+                        as Pin<Box<dyn Future<Output = Option<Piece>> + Send>>,
+                ),
+                //TODO: verify "broken pipe" error cause
+                timeout(
+                    GET_PIECE_TIMEOUT,
+                    Box::pin(async {
+                        // Prefer cache if it can return quickly, otherwise fall back to archival storage
+                        sleep(GET_PIECE_ARCHIVAL_STORAGE_DELAY).await;
+                        self.get_piece_from_storage(piece_index, StorageType::ArchivalStorage)
+                            .await
+                    }) as Pin<Box<dyn Future<Output = Option<Piece>> + Send>>,
+                ),
+            ]
+            .into_iter()
+            .collect::<FuturesUnordered<_>>();
+
+            while let Some(maybe_piece) = piece_attempts.next().await {
+                if let Ok(Some(piece)) = maybe_piece {
+                    trace!(%piece_index, "Got piece");
+                    return Ok(Some(piece));
+                }
+            }
+
+            warn!(%piece_index, "Couldn't get a piece from DSN. Retrying...");
+
+            Err(backoff::Error::transient(
+                "Couldn't get piece from DSN".into(),
+            ))
+        })
+        .await
+    }
+}
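
Usage sketch (illustrative, not part of this change): because `PieceProvider::new` takes `piece_validator: Option<PV>`, a caller that opts out of validation still has to name a concrete `PV` type when passing `None`. A hypothetical no-op validator makes that explicit; `NoPieceValidator` and `fetch_unvalidated` below are assumed names, not items from this diff:

use async_trait::async_trait;
use std::sync::atomic::AtomicBool;
use subspace_core_primitives::{Piece, PieceIndex};
use subspace_networking::libp2p::PeerId;
use subspace_networking::{Node, PieceProvider, PieceReceiver, PieceValidator};

/// Hypothetical validator that accepts every piece without verification.
struct NoPieceValidator;

#[async_trait]
impl PieceValidator for NoPieceValidator {
    async fn validate_piece(
        &self,
        _source_peer_id: PeerId,
        _piece_index: PieceIndex,
        piece: Piece,
    ) -> Option<Piece> {
        // Trust the remote peer unconditionally.
        Some(piece)
    }
}

async fn fetch_unvalidated(node: &Node, piece_index: PieceIndex) -> Option<Piece> {
    let cancelled = AtomicBool::new(false);
    // The turbofish pins down `PV`, which `None` alone cannot infer.
    let provider = PieceProvider::<NoPieceValidator>::new(node, None, &cancelled);
    provider.get_piece(piece_index).await.ok().flatten()
}

The farmer side never hits this inference problem because it always passes `Some(RecordsRootPieceValidator::new(...))`, letting `PV` be inferred from the argument.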
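The point of duplicating the trait in subspace-farmer-components is that plotting code can be exercised with no DSN or subspace-networking dependency at all. A sketch under that assumption (`StaticPieceReceiver` is hypothetical, and `PieceIndex` is assumed to be a plain integer index):

use async_trait::async_trait;
use std::error::Error;
use subspace_core_primitives::{Piece, PieceIndex};
use subspace_farmer_components::plotting::PieceReceiver;

/// Hypothetical in-memory piece source, e.g. for plotting tests.
struct StaticPieceReceiver {
    pieces: Vec<Piece>,
}

#[async_trait]
impl PieceReceiver for StaticPieceReceiver {
    async fn get_piece(
        &self,
        piece_index: PieceIndex,
    ) -> Result<Option<Piece>, Box<dyn Error + Send + Sync + 'static>> {
        // Serve pieces straight from memory; no networking, no retries.
        Ok(self.pieces.get(piece_index as usize).cloned())
    }
}

In production, the `PieceReceiverWrapper` in single_disk_plot.rs adapts `PieceProvider` to this same trait, so `plot_sector` never sees the networking types.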