From 555803473bffa8a3d4f62302770572fcb4090948 Mon Sep 17 00:00:00 2001 From: Shamil Gadelshin Date: Wed, 18 Jan 2023 12:47:04 +0700 Subject: [PATCH] networking: Unite requests for storage types. --- .../bin/subspace-farmer/commands/farm/dsn.rs | 84 ++++++++++--------- .../commands/farm/farmer_provider_storage.rs | 24 +++--- .../src/single_disk_plot/piece_publisher.rs | 4 +- crates/subspace-networking/src/lib.rs | 2 +- .../src/request_handlers/piece_by_key.rs | 12 +-- .../src/utils/multihash.rs | 21 +---- .../src/utils/piece_receiver.rs | 73 +++------------- crates/subspace-service/src/dsn.rs | 18 ++-- 8 files changed, 78 insertions(+), 160 deletions(-) diff --git a/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm/dsn.rs b/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm/dsn.rs index 33089a6ab3..85f87c8c24 100644 --- a/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm/dsn.rs +++ b/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm/dsn.rs @@ -12,11 +12,10 @@ use subspace_networking::libp2p::identity::Keypair; use subspace_networking::libp2p::kad::record::Key; use subspace_networking::libp2p::kad::ProviderRecord; use subspace_networking::libp2p::multihash::Multihash; -use subspace_networking::utils::multihash::MultihashCode; use subspace_networking::{ create, peer_id, BootstrappedNetworkingParameters, Config, Node, NodeRunner, ParityDbProviderStorage, PieceByHashRequest, PieceByHashRequestHandler, PieceByHashResponse, - PieceKey, ToMultihash, + ToMultihash, }; use tokio::runtime::Handle; use tracing::{debug, info, trace, warn}; @@ -86,9 +85,16 @@ pub(super) async fn configure_dsn( networking_parameters_registry: BootstrappedNetworkingParameters::new(bootstrap_nodes) .boxed(), request_response_protocols: vec![PieceByHashRequestHandler::create(move |req| { - let result = match req.key { - PieceKey::ArchivalStorage(piece_index_hash) => { - debug!(key=?req.key, "Archival storage piece request received."); + let result = { + debug!(piece_index_hash = ?req.piece_index_hash, "Piece request received. Trying cache..."); + let multihash = req.piece_index_hash.to_multihash(); + + let piece_from_cache = piece_storage.get(&multihash.into()); + + if piece_from_cache.is_some() { + piece_from_cache + } else { + debug!(piece_index_hash = ?req.piece_index_hash, "No piece in the cache. Trying archival storage..."); let (mut reader, piece_details) = { let readers_and_pieces = match weak_readers_and_pieces.upgrade() { @@ -103,23 +109,26 @@ pub(super) async fn configure_dsn( Some(readers_and_pieces) => readers_and_pieces, None => { debug!( - ?piece_index_hash, + ?req.piece_index_hash, "Readers and pieces are not initialized yet" ); return None; } }; - let piece_details = - match readers_and_pieces.pieces.get(&piece_index_hash).copied() { - Some(piece_details) => piece_details, - None => { - trace!( - ?piece_index_hash, - "Piece is not stored in any of the local plots" - ); - return None; - } - }; + let piece_details = match readers_and_pieces + .pieces + .get(&req.piece_index_hash) + .copied() + { + Some(piece_details) => piece_details, + None => { + trace!( + ?req.piece_index_hash, + "Piece is not stored in any of the local plots" + ); + return None; + } + }; let reader = readers_and_pieces .readers .get(piece_details.plot_offset) @@ -136,11 +145,6 @@ pub(super) async fn configure_dsn( ) }) } - PieceKey::Cache(piece_index_hash) => { - debug!(key=?req.key, "Cache piece request received."); - - piece_storage.get(&piece_index_hash.to_multihash().into()) - } }; Some(PieceByHashResponse { piece: result }) @@ -171,7 +175,6 @@ impl FarmerProviderRecordProcessor { //TODO: consider introducing get-piece helper async fn get_piece(&self, piece_index_hash: PieceIndexHash) -> Option { let multihash = piece_index_hash.to_multihash(); - let piece_key = PieceKey::Cache(piece_index_hash); let get_providers_result = self.node.get_providers(multihash).await; @@ -180,21 +183,26 @@ impl FarmerProviderRecordProcessor { while let Some(provider_id) = get_providers_stream.next().await { trace!(?multihash, %provider_id, "get_providers returned an item"); + if provider_id == self.node.id() { + trace!(?multihash, %provider_id, "Attempted to get a piece from itself."); + continue; + } + let request_result = self .node - .send_generic_request(provider_id, PieceByHashRequest { key: piece_key }) + .send_generic_request(provider_id, PieceByHashRequest { piece_index_hash }) .await; match request_result { Ok(PieceByHashResponse { piece: Some(piece) }) => { - trace!(%provider_id, ?multihash, ?piece_key, "Piece request succeeded."); + trace!(%provider_id, ?multihash, ?piece_index_hash, "Piece request succeeded."); return Some(piece); } Ok(PieceByHashResponse { piece: None }) => { - debug!(%provider_id, ?multihash, ?piece_key, "Piece request returned empty piece."); + debug!(%provider_id, ?multihash, ?piece_index_hash, "Piece request returned empty piece."); } Err(error) => { - warn!(%provider_id, ?multihash, ?piece_key, ?error, "Piece request failed."); + warn!(%provider_id, ?multihash, ?piece_index_hash, ?error, "Piece request failed."); } } } @@ -202,7 +210,7 @@ impl FarmerProviderRecordProcessor { Err(err) => { warn!( ?multihash, - ?piece_key, + ?piece_index_hash, ?err, "get_providers returned an error" ); @@ -244,21 +252,17 @@ impl FarmerProviderRecordProcessor { return; } - if multihash.code() == u64::from(MultihashCode::PieceIndex) { - trace!(key=?rec.key, "Starting processing provider record..."); + trace!(key=?rec.key, "Starting processing provider record..."); - if self.piece_storage.should_include_in_storage(&rec.key) { - let piece_index_hash: Blake2b256Hash = multihash.digest()[..BLAKE2B_256_HASH_SIZE] - .try_into() - .expect("Multihash should be known 32 bytes size."); + if self.piece_storage.should_include_in_storage(&rec.key) { + let piece_index_hash: Blake2b256Hash = multihash.digest()[..BLAKE2B_256_HASH_SIZE] + .try_into() + .expect("Multihash should be known 32 bytes size."); - if let Some(piece) = self.get_piece(piece_index_hash.into()).await { - self.piece_storage.add_piece(rec.key.clone(), piece); - self.announce_piece(multihash).await; - } + if let Some(piece) = self.get_piece(piece_index_hash.into()).await { + self.piece_storage.add_piece(rec.key.clone(), piece); + self.announce_piece(multihash).await; } - } else { - trace!(key=?rec.key, "Processing of the provider record cancelled."); } } } diff --git a/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm/farmer_provider_storage.rs b/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm/farmer_provider_storage.rs index 30d5fd9805..2c59b63fef 100644 --- a/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm/farmer_provider_storage.rs +++ b/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm/farmer_provider_storage.rs @@ -5,7 +5,6 @@ use std::sync::Arc; use subspace_networking::libp2p::kad::record::Key; use subspace_networking::libp2p::kad::ProviderRecord; use subspace_networking::libp2p::PeerId; -use subspace_networking::utils::multihash::MultihashCode; use subspace_networking::{deconstruct_record_key, ProviderStorage, ToMultihash}; pub(crate) struct FarmerProviderStorage { @@ -44,21 +43,18 @@ where fn providers(&self, key: &Key) -> Vec { let mut provider_records = self.persistent_provider_storage.providers(key); - let (piece_index_hash, multihash_code) = deconstruct_record_key(key); + let (piece_index_hash, _) = deconstruct_record_key(key); - if multihash_code == MultihashCode::Sector - && self - .readers_and_pieces - .lock() - .as_ref() - .expect("Should be populated at this point.") - .pieces - .contains_key(&piece_index_hash) + if self + .readers_and_pieces + .lock() + .as_ref() + .expect("Should be populated at this point.") + .pieces + .contains_key(&piece_index_hash) { provider_records.push(ProviderRecord { - key: piece_index_hash - .to_multihash_by_code(MultihashCode::Sector) - .into(), + key: piece_index_hash.to_multihash().into(), provider: self.local_peer_id, expires: None, addresses: Vec::new(), // TODO: add address hints @@ -77,7 +73,7 @@ where .keys() .map(|hash| { ProviderRecord { - key: hash.to_multihash_by_code(MultihashCode::Sector).into(), + key: hash.to_multihash().into(), provider: self.local_peer_id, expires: None, addresses: Vec::new(), // TODO: add address hints diff --git a/crates/subspace-farmer/src/single_disk_plot/piece_publisher.rs b/crates/subspace-farmer/src/single_disk_plot/piece_publisher.rs index 77276dab8e..9fde6a62e8 100644 --- a/crates/subspace-farmer/src/single_disk_plot/piece_publisher.rs +++ b/crates/subspace-farmer/src/single_disk_plot/piece_publisher.rs @@ -7,7 +7,6 @@ use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; use std::time::Duration; use subspace_core_primitives::{PieceIndex, PieceIndexHash}; -use subspace_networking::utils::multihash::MultihashCode; use subspace_networking::{Node, ToMultihash}; use tokio::time::error::Elapsed; use tokio::time::timeout; @@ -104,8 +103,7 @@ impl PieceSectorPublisher { ) -> Result<(), Box> { self.check_cancellation()?; - let key = - PieceIndexHash::from_index(piece_index).to_multihash_by_code(MultihashCode::Sector); + let key = PieceIndexHash::from_index(piece_index).to_multihash(); let result = self.dsn_node.start_announcing(key).await; diff --git a/crates/subspace-networking/src/lib.rs b/crates/subspace-networking/src/lib.rs index 61ce0f2af9..829b20f123 100644 --- a/crates/subspace-networking/src/lib.rs +++ b/crates/subspace-networking/src/lib.rs @@ -53,7 +53,7 @@ pub use request_handlers::peer_info::{ PeerInfo, PeerInfoRequest, PeerInfoRequestHandler, PeerInfoResponse, PeerSyncStatus, }; pub use request_handlers::piece_by_key::{ - PieceByHashRequest, PieceByHashRequestHandler, PieceByHashResponse, PieceKey, + PieceByHashRequest, PieceByHashRequestHandler, PieceByHashResponse, }; pub use request_handlers::pieces_by_range::{ PiecesByRangeRequest, PiecesByRangeRequestHandler, PiecesByRangeResponse, PiecesToPlot, diff --git a/crates/subspace-networking/src/request_handlers/piece_by_key.rs b/crates/subspace-networking/src/request_handlers/piece_by_key.rs index a4fbc8a2ad..c47b3e8708 100644 --- a/crates/subspace-networking/src/request_handlers/piece_by_key.rs +++ b/crates/subspace-networking/src/request_handlers/piece_by_key.rs @@ -7,19 +7,11 @@ use crate::request_handlers::generic_request_handler::{GenericRequest, GenericRe use parity_scale_codec::{Decode, Encode}; use subspace_core_primitives::{Piece, PieceIndexHash}; -//TODO: rename all module names if we keep this enum -#[derive(Debug, Clone, Eq, PartialEq, Copy, Encode, Decode)] -pub enum PieceKey { - Cache(PieceIndexHash), - ArchivalStorage(PieceIndexHash), -} - /// Piece-by-hash protocol request. #[derive(Debug, Clone, Eq, PartialEq, Encode, Decode)] pub struct PieceByHashRequest { - //TODO: rename if we keep the enum - /// Piece index hash - pub key: PieceKey, + /// Request key - piece index hash + pub piece_index_hash: PieceIndexHash, } impl GenericRequest for PieceByHashRequest { diff --git a/crates/subspace-networking/src/utils/multihash.rs b/crates/subspace-networking/src/utils/multihash.rs index 67e73aa9d8..ea19d9b4da 100644 --- a/crates/subspace-networking/src/utils/multihash.rs +++ b/crates/subspace-networking/src/utils/multihash.rs @@ -1,6 +1,6 @@ use libp2p::multihash::Multihash; use std::error::Error; -use subspace_core_primitives::{Blake2b256Hash, PieceIndexHash}; +use subspace_core_primitives::PieceIndexHash; /// Start of Subspace Network multicodec namespace (+1000 to distinguish from future stable values): /// https://github.com/multiformats/multicodec/blob/master/table.csv @@ -9,9 +9,7 @@ const SUBSPACE_MULTICODEC_NAMESPACE_START: u64 = 0xb39910 + 1000; #[derive(Debug, Clone, PartialEq)] #[repr(u64)] pub enum MultihashCode { - Piece = SUBSPACE_MULTICODEC_NAMESPACE_START, - PieceIndex = SUBSPACE_MULTICODEC_NAMESPACE_START + 1, - Sector = SUBSPACE_MULTICODEC_NAMESPACE_START + 2, + PieceIndexHash = SUBSPACE_MULTICODEC_NAMESPACE_START, } impl From for u64 { @@ -25,9 +23,7 @@ impl TryFrom for MultihashCode { fn try_from(value: u64) -> Result { match value { - x if x == MultihashCode::Piece as u64 => Ok(MultihashCode::Piece), - x if x == MultihashCode::PieceIndex as u64 => Ok(MultihashCode::PieceIndex), - x if x == MultihashCode::Sector as u64 => Ok(MultihashCode::Sector), + x if x == MultihashCode::PieceIndexHash as u64 => Ok(MultihashCode::PieceIndexHash), _ => Err("Unexpected multihash code".into()), } } @@ -39,15 +35,6 @@ pub fn create_multihash_by_piece_index(piece_index: u64) -> Multihash { piece_index_hash.to_multihash() } -pub fn create_multihash_by_piece(records_root: &Blake2b256Hash, piece_index: u64) -> Multihash { - let piece_index_bytes = piece_index.to_le_bytes(); - let mut input = Vec::with_capacity(records_root.len() + piece_index_bytes.len()); - input.extend_from_slice(records_root); - input.extend_from_slice(&piece_index_bytes); - Multihash::wrap(u64::from(MultihashCode::Piece), &input) - .expect("Input never exceeds allocated size; qed") -} - pub trait ToMultihash { fn to_multihash(&self) -> Multihash; fn to_multihash_by_code(&self, code: MultihashCode) -> Multihash; @@ -55,7 +42,7 @@ pub trait ToMultihash { impl ToMultihash for PieceIndexHash { fn to_multihash(&self) -> Multihash { - self.to_multihash_by_code(MultihashCode::PieceIndex) + self.to_multihash_by_code(MultihashCode::PieceIndexHash) } fn to_multihash_by_code(&self, code: MultihashCode) -> Multihash { diff --git a/crates/subspace-networking/src/utils/piece_receiver.rs b/crates/subspace-networking/src/utils/piece_receiver.rs index 5a1c246d8c..1ce68ac5f6 100644 --- a/crates/subspace-networking/src/utils/piece_receiver.rs +++ b/crates/subspace-networking/src/utils/piece_receiver.rs @@ -1,26 +1,20 @@ -use crate::utils::multihash::MultihashCode; -use crate::{Node, PieceByHashRequest, PieceByHashResponse, PieceKey, ToMultihash}; +use crate::{Node, PieceByHashRequest, PieceByHashResponse, ToMultihash}; use async_trait::async_trait; use backoff::future::retry; use backoff::ExponentialBackoff; -use futures::stream::FuturesUnordered; use futures::StreamExt; use libp2p::PeerId; use std::error::Error; -use std::future::Future; -use std::pin::Pin; use std::sync::atomic::{AtomicBool, Ordering}; use std::time::Duration; use subspace_core_primitives::{Piece, PieceIndex, PieceIndexHash}; -use tokio::time::{sleep, timeout}; +use tokio::time::timeout; use tracing::{debug, trace, warn}; /// Defines initial duration between get_piece calls. const GET_PIECE_INITIAL_INTERVAL: Duration = Duration::from_secs(1); /// Defines max duration between get_piece calls. const GET_PIECE_MAX_INTERVAL: Duration = Duration::from_secs(5); -/// Delay for getting piece from cache before resorting to archival storage -const GET_PIECE_ARCHIVAL_STORAGE_DELAY: Duration = Duration::from_secs(2); /// Max time allocated for getting piece from DSN before attempt is considered to fail const GET_PIECE_TIMEOUT: Duration = Duration::from_secs(5); @@ -34,24 +28,6 @@ pub trait PieceReceiver: Send + Sync { ) -> Result, Box>; } -// Defines target storage type for requets. -#[derive(Debug, Copy, Clone)] -enum StorageType { - // L2 piece cache - Cache, - // L1 archival storage for pieces - ArchivalStorage, -} - -impl From for MultihashCode { - fn from(storage_type: StorageType) -> Self { - match storage_type { - StorageType::Cache => MultihashCode::PieceIndex, - StorageType::ArchivalStorage => MultihashCode::Sector, - } - } -} - #[async_trait] pub trait PieceValidator: Sync + Send { async fn validate_piece( @@ -89,17 +65,9 @@ impl<'a, PV: PieceValidator> PieceProvider<'a, PV> { } // Get from piece cache (L2) or archival storage (L1) - async fn get_piece_from_storage( - &self, - piece_index: PieceIndex, - storage_type: StorageType, - ) -> Option { + async fn get_piece_from_storage(&self, piece_index: PieceIndex) -> Option { let piece_index_hash = PieceIndexHash::from_index(piece_index); - let key = piece_index_hash.to_multihash_by_code(storage_type.into()); - let piece_key = match storage_type { - StorageType::Cache => PieceKey::Cache(piece_index_hash), - StorageType::ArchivalStorage => PieceKey::ArchivalStorage(piece_index_hash), - }; + let key = piece_index_hash.to_multihash(); let get_providers_result = self.dsn_node.get_providers(key).await; @@ -110,7 +78,7 @@ impl<'a, PV: PieceValidator> PieceProvider<'a, PV> { let request_result = self .dsn_node - .send_generic_request(provider_id, PieceByHashRequest { key: piece_key }) + .send_generic_request(provider_id, PieceByHashRequest { piece_index_hash }) .await; match request_result { @@ -163,32 +131,11 @@ impl<'a, PV: PieceValidator> PieceReceiver for PieceProvider<'a, PV> { self.check_cancellation() .map_err(backoff::Error::Permanent)?; - // Try to pull pieces in two ways, whichever is faster - let mut piece_attempts = [ - timeout( - GET_PIECE_TIMEOUT, - Box::pin(self.get_piece_from_storage(piece_index, StorageType::Cache)) - as Pin + Send>>, - ), - //TODO: verify "broken pipe" error cause - timeout( - GET_PIECE_TIMEOUT, - Box::pin(async { - // Prefer cache if it can return quickly, otherwise fall back to archival storage - sleep(GET_PIECE_ARCHIVAL_STORAGE_DELAY).await; - self.get_piece_from_storage(piece_index, StorageType::ArchivalStorage) - .await - }) as Pin + Send>>, - ), - ] - .into_iter() - .collect::>(); - - while let Some(maybe_piece) = piece_attempts.next().await { - if let Ok(Some(piece)) = maybe_piece { - trace!(%piece_index, "Got piece"); - return Ok(Some(piece)); - } + let maybe_piece = timeout(GET_PIECE_TIMEOUT, self.get_piece_from_storage(piece_index)); + + if let Ok(Some(piece)) = maybe_piece.await { + trace!(%piece_index, "Got piece"); + return Ok(Some(piece)); } warn!(%piece_index, "Couldn't get a piece from DSN. Retrying..."); diff --git a/crates/subspace-service/src/dsn.rs b/crates/subspace-service/src/dsn.rs index d0b884590f..b2fcbd0355 100644 --- a/crates/subspace-service/src/dsn.rs +++ b/crates/subspace-service/src/dsn.rs @@ -21,7 +21,7 @@ use subspace_core_primitives::{PieceIndex, PieceIndexHash, PIECES_IN_SEGMENT}; use subspace_networking::libp2p::{identity, Multiaddr}; use subspace_networking::{ peer_id, BootstrappedNetworkingParameters, CreationError, MemoryProviderStorage, Node, - NodeRunner, ParityDbProviderStorage, PieceByHashRequestHandler, PieceByHashResponse, PieceKey, + NodeRunner, ParityDbProviderStorage, PieceByHashRequestHandler, PieceByHashResponse, ToMultihash, }; use tokio::sync::Semaphore; @@ -97,18 +97,12 @@ where ) .boxed(), request_response_protocols: vec![PieceByHashRequestHandler::create(move |req| { - let result = if let PieceKey::Cache(piece_index_hash) = req.key { - match piece_cache.get_piece(piece_index_hash) { - Ok(maybe_piece) => maybe_piece, - Err(error) => { - error!(key=?req.key, %error, "Failed to get piece from cache"); - None - } + let result = match piece_cache.get_piece(req.piece_index_hash) { + Ok(maybe_piece) => maybe_piece, + Err(error) => { + error!(piece_index_hash = ?req.piece_index_hash, %error, "Failed to get piece from cache"); + None } - } else { - debug!(key=?req.key, "Incorrect piece request - unsupported key type."); - - None }; Some(PieceByHashResponse { piece: result })