Skip to content

Commit

Permalink
Merge pull request #1070 from subspace/refactor-piece-receiving
Browse files Browse the repository at this point in the history
DSN. Refactor piece receiving.
  • Loading branch information
shamil-gadelshin authored Jan 17, 2023
2 parents 6dd74f6 + 559ca3b commit a5e4e17
Show file tree
Hide file tree
Showing 8 changed files with 255 additions and 184 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions crates/subspace-farmer-components/src/plotting.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ use subspace_core_primitives::{
use thiserror::Error;
use tracing::{debug, info};

/// Duplicate trait for the subspace_networking::PieceReceiver. The goal of this trait is
/// simplifying dependency graph.
#[async_trait]
pub trait PieceReceiver {
async fn get_piece(
Expand Down
44 changes: 36 additions & 8 deletions crates/subspace-farmer/src/single_disk_plot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,10 @@ use crate::reward_signing::reward_signing;
use crate::single_disk_plot::farming::audit_sector;
use crate::single_disk_plot::piece_publisher::PieceSectorPublisher;
use crate::single_disk_plot::piece_reader::{read_piece, PieceReader, ReadPieceRequest};
use crate::single_disk_plot::piece_receiver::RecordsRootPieceValidator;
use crate::single_disk_plot::plotting::{plot_sector, PlottedSector};
use crate::utils::JoinOnDrop;
use async_trait::async_trait;
use bytesize::ByteSize;
use derive_more::{Display, From};
use event_listener_primitives::{Bag, HandlerId};
Expand All @@ -21,9 +23,9 @@ use lru::LruCache;
use memmap2::{Mmap, MmapMut, MmapOptions};
use parity_scale_codec::{Decode, Encode};
use parking_lot::Mutex;
use piece_receiver::MultiChannelPieceReceiver;
use serde::{Deserialize, Serialize};
use static_assertions::const_assert;
use std::error::Error;
use std::fs::OpenOptions;
use std::future::Future;
use std::io::{Seek, SeekFrom};
Expand All @@ -38,11 +40,12 @@ use std_semaphore::{Semaphore, SemaphoreGuard};
use subspace_core_primitives::crypto::kzg::{test_public_parameters, Kzg};
use subspace_core_primitives::sector_codec::SectorCodec;
use subspace_core_primitives::{
PieceIndex, PublicKey, SectorId, SectorIndex, Solution, PIECES_IN_SECTOR, PLOT_SECTOR_SIZE,
Piece, PieceIndex, PublicKey, SectorId, SectorIndex, Solution, PIECES_IN_SECTOR,
PLOT_SECTOR_SIZE,
};
use subspace_farmer_components::file_ext::FileExt;
use subspace_farmer_components::{farming, plotting, SectorMetadata};
use subspace_networking::Node;
use subspace_networking::{Node, PieceProvider};
use subspace_rpc_primitives::{SlotInfo, SolutionResponse};
use thiserror::Error;
use tokio::runtime::Handle;
Expand Down Expand Up @@ -420,6 +423,26 @@ struct Handlers {
solution: Handler<SolutionResponse>,
}

/// Adapter struct for the PieceReceiver trait for subspace-networking
/// and subspace-farmer-components crates.
struct PieceReceiverWrapper<PR>(PR);

impl<PR: subspace_networking::PieceReceiver> PieceReceiverWrapper<PR> {
fn new(piece_getter: PR) -> Self {
Self(piece_getter)
}
}

#[async_trait]
impl<PR: subspace_networking::PieceReceiver> plotting::PieceReceiver for PieceReceiverWrapper<PR> {
async fn get_piece(
&self,
piece_index: PieceIndex,
) -> Result<Option<Piece>, Box<dyn Error + Send + Sync + 'static>> {
self.0.get_piece(piece_index).await
}
}

/// Single disk plot abstraction is a container for everything necessary to plot/farm with a single
/// disk plot.
///
Expand Down Expand Up @@ -683,14 +706,19 @@ impl SingleDiskPlot {
let records_roots_cache =
Mutex::new(LruCache::new(RECORDS_ROOTS_CACHE_SIZE));

let piece_receiver = MultiChannelPieceReceiver::new(
let piece_receiver = PieceProvider::new(
&dsn_node,
&node_client,
&kzg,
&records_roots_cache,
Some(RecordsRootPieceValidator::new(
&dsn_node,
&node_client,
&kzg,
&records_roots_cache,
)),
&shutting_down,
);

let piece_receiver_wrapper = PieceReceiverWrapper::new(piece_receiver);

// TODO: Concurrency
for (sector_index, sector, sector_metadata) in plot_initial_sector {
trace!(%sector_index, "Preparing to plot sector");
Expand Down Expand Up @@ -726,7 +754,7 @@ impl SingleDiskPlot {
let plot_sector_fut = plot_sector(
&public_key,
sector_index,
&piece_receiver,
&piece_receiver_wrapper,
&shutting_down,
&farmer_app_info.protocol_info,
&kzg,
Expand Down
187 changes: 11 additions & 176 deletions crates/subspace-farmer/src/single_disk_plot/piece_receiver.rs
Original file line number Diff line number Diff line change
@@ -1,149 +1,48 @@
use crate::NodeClient;
use async_trait::async_trait;
use backoff::future::retry;
use backoff::ExponentialBackoff;
use futures::stream::FuturesUnordered;
use futures::StreamExt;
use lru::LruCache;
use parking_lot::Mutex;
use std::error::Error;
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::{AtomicBool, Ordering};
use std::time::Duration;
use subspace_archiving::archiver::is_piece_valid;
use subspace_core_primitives::crypto::kzg::Kzg;
use subspace_core_primitives::{
Piece, PieceIndex, PieceIndexHash, RecordsRoot, SegmentIndex, PIECES_IN_SEGMENT, RECORD_SIZE,
Piece, PieceIndex, RecordsRoot, SegmentIndex, PIECES_IN_SEGMENT, RECORD_SIZE,
};
use subspace_farmer_components::plotting::PieceReceiver;
use subspace_networking::libp2p::multihash::Multihash;
use subspace_networking::libp2p::PeerId;
use subspace_networking::utils::multihash::MultihashCode;
use subspace_networking::{Node, PieceByHashRequest, PieceByHashResponse, PieceKey, ToMultihash};
use tokio::time::{sleep, timeout};
use tracing::{debug, error, trace, warn};
use subspace_networking::{Node, PieceValidator};
use tracing::error;

/// Defines initial duration between get_piece calls.
const GET_PIECE_INITIAL_INTERVAL: Duration = Duration::from_secs(1);
/// Defines max duration between get_piece calls.
const GET_PIECE_MAX_INTERVAL: Duration = Duration::from_secs(5);
/// Delay for getting piece from cache before resorting to archival storage
const GET_PIECE_ARCHIVAL_STORAGE_DELAY: Duration = Duration::from_secs(2);
/// Max time allocated for getting piece from DSN before attempt is considered to fail
const GET_PIECE_TIMEOUT: Duration = Duration::from_secs(5);

// Defines target storage type for requets.
#[derive(Debug, Copy, Clone)]
enum StorageType {
// L2 piece cache
Cache,
// L1 archival storage for pieces
ArchivalStorage,
}

impl From<StorageType> for MultihashCode {
fn from(storage_type: StorageType) -> Self {
match storage_type {
StorageType::Cache => MultihashCode::PieceIndex,
StorageType::ArchivalStorage => MultihashCode::Sector,
}
}
}

// Temporary struct serving pieces from different providers using configuration arguments.
pub(crate) struct MultiChannelPieceReceiver<'a, NC> {
pub(crate) struct RecordsRootPieceValidator<'a, NC> {
dsn_node: &'a Node,
node_client: &'a NC,
kzg: &'a Kzg,
records_root_cache: &'a Mutex<LruCache<SegmentIndex, RecordsRoot>>,
cancelled: &'a AtomicBool,
}

impl<'a, NC> MultiChannelPieceReceiver<'a, NC>
where
NC: NodeClient,
{
impl<'a, NC> RecordsRootPieceValidator<'a, NC> {
pub(crate) fn new(
dsn_node: &'a Node,
node_client: &'a NC,
kzg: &'a Kzg,
records_root_cache: &'a Mutex<LruCache<SegmentIndex, RecordsRoot>>,
cancelled: &'a AtomicBool,
) -> Self {
Self {
dsn_node,
node_client,
kzg,
records_root_cache,
cancelled,
}
}

fn check_cancellation(&self) -> Result<(), Box<dyn Error + Send + Sync + 'static>> {
if self.cancelled.load(Ordering::Acquire) {
debug!("Getting a piece was cancelled.");

return Err("Getting a piece was cancelled.".into());
}

Ok(())
}

// Get from piece cache (L2) or archival storage (L1)
async fn get_piece_from_storage(
&self,
piece_index: PieceIndex,
storage_type: StorageType,
) -> Option<Piece> {
let piece_index_hash = PieceIndexHash::from_index(piece_index);
let key = piece_index_hash.to_multihash_by_code(storage_type.into());
let piece_key = match storage_type {
StorageType::Cache => PieceKey::Cache(piece_index_hash),
StorageType::ArchivalStorage => PieceKey::ArchivalStorage(piece_index_hash),
};

let get_providers_result = self.dsn_node.get_providers(key).await;

match get_providers_result {
Ok(mut get_providers_stream) => {
while let Some(provider_id) = get_providers_stream.next().await {
trace!(%piece_index, %provider_id, "get_providers returned an item");

let request_result = self
.dsn_node
.send_generic_request(provider_id, PieceByHashRequest { key: piece_key })
.await;

match request_result {
Ok(PieceByHashResponse { piece: Some(piece) }) => {
trace!(%provider_id, %piece_index, ?key, "Piece request succeeded.");
return self
.validate_piece(provider_id, piece_index, key, piece)
.await;
}
Ok(PieceByHashResponse { piece: None }) => {
debug!(%provider_id, %piece_index, ?key, "Piece request returned empty piece.");
}
Err(error) => {
warn!(%provider_id, %piece_index, ?key, ?error, "Piece request failed.");
}
}
}
}
Err(err) => {
warn!(%piece_index,?key, ?err, "get_providers returned an error");
}
}

None
}
}

#[async_trait]
impl<'a, NC> PieceValidator for RecordsRootPieceValidator<'a, NC>
where
NC: NodeClient,
{
async fn validate_piece(
&self,
source_peer_id: PeerId,
piece_index: PieceIndex,
key: Multihash,
piece: Piece,
) -> Option<Piece> {
if source_peer_id != self.dsn_node.id() {
Expand All @@ -159,7 +58,6 @@ where
Err(error) => {
error!(
%piece_index,
?key,
?error,
"Failed tor retrieve records root from node"
);
Expand All @@ -172,7 +70,6 @@ where
None => {
error!(
%piece_index,
?key,
%segment_index,
"Records root for segment index wasn't found on node"
);
Expand All @@ -199,7 +96,6 @@ where
) {
error!(
%piece_index,
?key,
%source_peer_id,
"Received invalid piece from peer"
);
Expand All @@ -213,64 +109,3 @@ where
Some(piece)
}
}

#[async_trait]
impl<'a, NC> PieceReceiver for MultiChannelPieceReceiver<'a, NC>
where
NC: NodeClient,
{
async fn get_piece(
&self,
piece_index: PieceIndex,
) -> Result<Option<Piece>, Box<dyn Error + Send + Sync + 'static>> {
trace!(%piece_index, "Piece request.");

let backoff = ExponentialBackoff {
initial_interval: GET_PIECE_INITIAL_INTERVAL,
max_interval: GET_PIECE_MAX_INTERVAL,
// Try until we get a valid piece
max_elapsed_time: None,
..ExponentialBackoff::default()
};

retry(backoff, || async {
self.check_cancellation()
.map_err(backoff::Error::Permanent)?;

// Try to pull pieces in two ways, whichever is faster
let mut piece_attempts = [
timeout(
GET_PIECE_TIMEOUT,
Box::pin(self.get_piece_from_storage(piece_index, StorageType::Cache))
as Pin<Box<dyn Future<Output = _> + Send>>,
),
//TODO: verify "broken pipe" error cause
timeout(
GET_PIECE_TIMEOUT,
Box::pin(async {
// Prefer cache if it can return quickly, otherwise fall back to archival storage
sleep(GET_PIECE_ARCHIVAL_STORAGE_DELAY).await;
self.get_piece_from_storage(piece_index, StorageType::ArchivalStorage)
.await
}) as Pin<Box<dyn Future<Output = _> + Send>>,
),
]
.into_iter()
.collect::<FuturesUnordered<_>>();

while let Some(maybe_piece) = piece_attempts.next().await {
if let Ok(Some(piece)) = maybe_piece {
trace!(%piece_index, "Got piece");
return Ok(Some(piece));
}
}

warn!(%piece_index, "Couldn't get a piece from DSN. Retrying...");

Err(backoff::Error::transient(
"Couldn't get piece from DSN".into(),
))
})
.await
}
}
1 change: 1 addition & 0 deletions crates/subspace-networking/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ include = [
actix-web = "4.2.1"
anyhow = "1.0.66"
async-trait = "0.1.58"
backoff = { version = "0.4.0", features = ["futures", "tokio"] }
bytes = "1.2.1"
bytesize = "1.1.0"
chrono = {version = "0.4.21", features = ["clock", "serde", "std",]}
Expand Down
1 change: 1 addition & 0 deletions crates/subspace-networking/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,5 +60,6 @@ pub use request_handlers::pieces_by_range::{
};
pub use utils::deconstruct_record_key;
pub use utils::multihash::ToMultihash;
pub use utils::piece_receiver::{PieceProvider, PieceReceiver, PieceValidator};
pub use utils::prometheus::start_prometheus_metrics_server;
pub use utils::record_binary_heap::RecordBinaryHeap;
1 change: 1 addition & 0 deletions crates/subspace-networking/src/utils.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
pub mod multihash;
pub mod piece_receiver;
pub(crate) mod prometheus;
pub(crate) mod record_binary_heap;
#[cfg(test)]
Expand Down
Loading

0 comments on commit a5e4e17

Please sign in to comment.