Merge pull request #1072 from subspace/unify-piece-receiving-protocol
networking: Unite requests for storage types.
shamil-gadelshin authored Jan 18, 2023
2 parents a5e4e17 + 5558034 commit d3a8537
Showing 8 changed files with 78 additions and 160 deletions.
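The PR replaces the two-variant `PieceKey` enum (`Cache` / `ArchivalStorage`) with a single request keyed directly by `PieceIndexHash`: requesters no longer choose a storage type up front, and the serving farmer tries its piece cache first, then falls back to archival storage. A minimal sketch of the unified request flow on the requesting side, assuming a connected `Node` and a known provider (the `request_piece` helper is hypothetical; the types and calls are the ones visible in the diffs below):

    use subspace_core_primitives::{Piece, PieceIndexHash};
    use subspace_networking::libp2p::PeerId;
    use subspace_networking::{Node, PieceByHashRequest, PieceByHashResponse};

    // Hypothetical helper: one request type, no storage-type discriminant on the wire.
    async fn request_piece(
        node: &Node,
        provider_id: PeerId,
        piece_index_hash: PieceIndexHash,
    ) -> Option<Piece> {
        match node
            .send_generic_request(provider_id, PieceByHashRequest { piece_index_hash })
            .await
        {
            Ok(PieceByHashResponse { piece }) => piece, // None means the provider had no piece
            Err(_error) => None,                        // request failed
        }
    }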
84 changes: 44 additions & 40 deletions crates/subspace-farmer/src/bin/subspace-farmer/commands/farm/dsn.rs
@@ -12,11 +12,10 @@ use subspace_networking::libp2p::identity::Keypair;
 use subspace_networking::libp2p::kad::record::Key;
 use subspace_networking::libp2p::kad::ProviderRecord;
 use subspace_networking::libp2p::multihash::Multihash;
-use subspace_networking::utils::multihash::MultihashCode;
 use subspace_networking::{
     create, peer_id, BootstrappedNetworkingParameters, Config, Node, NodeRunner,
     ParityDbProviderStorage, PieceByHashRequest, PieceByHashRequestHandler, PieceByHashResponse,
-    PieceKey, ToMultihash,
+    ToMultihash,
 };
 use tokio::runtime::Handle;
 use tracing::{debug, info, trace, warn};
@@ -86,9 +85,16 @@ pub(super) async fn configure_dsn(
         networking_parameters_registry: BootstrappedNetworkingParameters::new(bootstrap_nodes)
             .boxed(),
         request_response_protocols: vec![PieceByHashRequestHandler::create(move |req| {
-            let result = match req.key {
-                PieceKey::ArchivalStorage(piece_index_hash) => {
-                    debug!(key=?req.key, "Archival storage piece request received.");
+            let result = {
+                debug!(piece_index_hash = ?req.piece_index_hash, "Piece request received. Trying cache...");
+                let multihash = req.piece_index_hash.to_multihash();
+
+                let piece_from_cache = piece_storage.get(&multihash.into());
+
+                if piece_from_cache.is_some() {
+                    piece_from_cache
+                } else {
+                    debug!(piece_index_hash = ?req.piece_index_hash, "No piece in the cache. Trying archival storage...");
 
                     let (mut reader, piece_details) = {
                         let readers_and_pieces = match weak_readers_and_pieces.upgrade() {
@@ -103,23 +109,26 @@
                             Some(readers_and_pieces) => readers_and_pieces,
                             None => {
                                 debug!(
-                                    ?piece_index_hash,
+                                    ?req.piece_index_hash,
                                     "Readers and pieces are not initialized yet"
                                 );
                                 return None;
                             }
                         };
-                        let piece_details =
-                            match readers_and_pieces.pieces.get(&piece_index_hash).copied() {
-                                Some(piece_details) => piece_details,
-                                None => {
-                                    trace!(
-                                        ?piece_index_hash,
-                                        "Piece is not stored in any of the local plots"
-                                    );
-                                    return None;
-                                }
-                            };
+                        let piece_details = match readers_and_pieces
+                            .pieces
+                            .get(&req.piece_index_hash)
+                            .copied()
+                        {
+                            Some(piece_details) => piece_details,
+                            None => {
+                                trace!(
+                                    ?req.piece_index_hash,
+                                    "Piece is not stored in any of the local plots"
+                                );
+                                return None;
+                            }
+                        };
                         let reader = readers_and_pieces
                             .readers
                             .get(piece_details.plot_offset)
@@ -136,11 +145,6 @@
                         )
                     })
                 }
-                PieceKey::Cache(piece_index_hash) => {
-                    debug!(key=?req.key, "Cache piece request received.");
-
-                    piece_storage.get(&piece_index_hash.to_multihash().into())
-                }
             };
 
             Some(PieceByHashResponse { piece: result })
@@ -171,7 +175,6 @@ impl<PS: PieceStorage> FarmerProviderRecordProcessor<PS> {
     //TODO: consider introducing get-piece helper
     async fn get_piece(&self, piece_index_hash: PieceIndexHash) -> Option<Piece> {
         let multihash = piece_index_hash.to_multihash();
-        let piece_key = PieceKey::Cache(piece_index_hash);
 
         let get_providers_result = self.node.get_providers(multihash).await;
 
@@ -180,29 +183,34 @@ impl<PS: PieceStorage> FarmerProviderRecordProcessor<PS> {
                 while let Some(provider_id) = get_providers_stream.next().await {
                     trace!(?multihash, %provider_id, "get_providers returned an item");
 
+                    if provider_id == self.node.id() {
+                        trace!(?multihash, %provider_id, "Attempted to get a piece from itself.");
+                        continue;
+                    }
+
                     let request_result = self
                         .node
-                        .send_generic_request(provider_id, PieceByHashRequest { key: piece_key })
+                        .send_generic_request(provider_id, PieceByHashRequest { piece_index_hash })
                         .await;
 
                     match request_result {
                         Ok(PieceByHashResponse { piece: Some(piece) }) => {
-                            trace!(%provider_id, ?multihash, ?piece_key, "Piece request succeeded.");
+                            trace!(%provider_id, ?multihash, ?piece_index_hash, "Piece request succeeded.");
                             return Some(piece);
                         }
                         Ok(PieceByHashResponse { piece: None }) => {
-                            debug!(%provider_id, ?multihash, ?piece_key, "Piece request returned empty piece.");
+                            debug!(%provider_id, ?multihash, ?piece_index_hash, "Piece request returned empty piece.");
                         }
                         Err(error) => {
-                            warn!(%provider_id, ?multihash, ?piece_key, ?error, "Piece request failed.");
+                            warn!(%provider_id, ?multihash, ?piece_index_hash, ?error, "Piece request failed.");
                         }
                     }
                 }
             }
             Err(err) => {
                 warn!(
                     ?multihash,
-                    ?piece_key,
+                    ?piece_index_hash,
                     ?err,
                     "get_providers returned an error"
                 );
Expand Down Expand Up @@ -244,21 +252,17 @@ impl<PS: PieceStorage> FarmerProviderRecordProcessor<PS> {
return;
}

if multihash.code() == u64::from(MultihashCode::PieceIndex) {
trace!(key=?rec.key, "Starting processing provider record...");
trace!(key=?rec.key, "Starting processing provider record...");

if self.piece_storage.should_include_in_storage(&rec.key) {
let piece_index_hash: Blake2b256Hash = multihash.digest()[..BLAKE2B_256_HASH_SIZE]
.try_into()
.expect("Multihash should be known 32 bytes size.");
if self.piece_storage.should_include_in_storage(&rec.key) {
let piece_index_hash: Blake2b256Hash = multihash.digest()[..BLAKE2B_256_HASH_SIZE]
.try_into()
.expect("Multihash should be known 32 bytes size.");

if let Some(piece) = self.get_piece(piece_index_hash.into()).await {
self.piece_storage.add_piece(rec.key.clone(), piece);
self.announce_piece(multihash).await;
}
if let Some(piece) = self.get_piece(piece_index_hash.into()).await {
self.piece_storage.add_piece(rec.key.clone(), piece);
self.announce_piece(multihash).await;
}
} else {
trace!(key=?rec.key, "Processing of the provider record cancelled.");
}
}
}
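The request handler rewritten in the first hunks of this file now reduces to a cache-first lookup with an archival-storage fallback. A sketch of just that control flow, with `get_from_cache` and `read_from_archival_storage` as hypothetical stand-ins for `piece_storage.get` and the plot-reader plumbing:

    use subspace_core_primitives::{Piece, PieceIndexHash};
    use subspace_networking::libp2p::multihash::Multihash;
    use subspace_networking::ToMultihash;

    // Hypothetical stand-ins for the storage backends used in the real handler.
    fn get_from_cache(_key: &Multihash) -> Option<Piece> {
        None
    }
    fn read_from_archival_storage(_hash: &PieceIndexHash) -> Option<Piece> {
        None
    }

    fn handle_piece_request(piece_index_hash: PieceIndexHash) -> Option<Piece> {
        let multihash = piece_index_hash.to_multihash();
        // 1. Cheap path: the local piece cache.
        if let Some(piece) = get_from_cache(&multihash) {
            return Some(piece);
        }
        // 2. Fallback: pieces plotted in archival storage.
        read_from_archival_storage(&piece_index_hash)
    }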
(next changed file: name not captured in this view)
@@ -5,7 +5,6 @@ use std::sync::Arc;
 use subspace_networking::libp2p::kad::record::Key;
 use subspace_networking::libp2p::kad::ProviderRecord;
 use subspace_networking::libp2p::PeerId;
-use subspace_networking::utils::multihash::MultihashCode;
 use subspace_networking::{deconstruct_record_key, ProviderStorage, ToMultihash};
 
 pub(crate) struct FarmerProviderStorage<PersistentProviderStorage> {
@@ -44,21 +43,18 @@
     fn providers(&self, key: &Key) -> Vec<ProviderRecord> {
         let mut provider_records = self.persistent_provider_storage.providers(key);
 
-        let (piece_index_hash, multihash_code) = deconstruct_record_key(key);
+        let (piece_index_hash, _) = deconstruct_record_key(key);
 
-        if multihash_code == MultihashCode::Sector
-            && self
-                .readers_and_pieces
-                .lock()
-                .as_ref()
-                .expect("Should be populated at this point.")
-                .pieces
-                .contains_key(&piece_index_hash)
+        if self
+            .readers_and_pieces
+            .lock()
+            .as_ref()
+            .expect("Should be populated at this point.")
+            .pieces
+            .contains_key(&piece_index_hash)
         {
             provider_records.push(ProviderRecord {
-                key: piece_index_hash
-                    .to_multihash_by_code(MultihashCode::Sector)
-                    .into(),
+                key: piece_index_hash.to_multihash().into(),
                 provider: self.local_peer_id,
                 expires: None,
                 addresses: Vec::new(), // TODO: add address hints
@@ -77,7 +73,7 @@
                 .keys()
                 .map(|hash| {
                     ProviderRecord {
-                        key: hash.to_multihash_by_code(MultihashCode::Sector).into(),
+                        key: hash.to_multihash().into(),
                         provider: self.local_peer_id,
                         expires: None,
                         addresses: Vec::new(), // TODO: add address hints
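With the `Sector` multihash code gone, locally plotted pieces are advertised under the same `to_multihash()` key as cached pieces. A sketch of how a provider record is now assembled (fields exactly as in the hunks above; `local_peer_id` is whatever peer id the farmer runs under):

    use subspace_core_primitives::PieceIndexHash;
    use subspace_networking::libp2p::kad::ProviderRecord;
    use subspace_networking::libp2p::PeerId;
    use subspace_networking::ToMultihash;

    // Sketch: one key scheme for every provider record after this change.
    fn provider_record(piece_index_hash: PieceIndexHash, local_peer_id: PeerId) -> ProviderRecord {
        ProviderRecord {
            key: piece_index_hash.to_multihash().into(),
            provider: local_peer_id,
            expires: None,
            addresses: Vec::new(), // the source leaves address hints as a TODO
        }
    }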
(next changed file: name not captured in this view)
@@ -7,7 +7,6 @@ use std::sync::atomic::{AtomicBool, Ordering};
 use std::sync::Arc;
 use std::time::Duration;
 use subspace_core_primitives::{PieceIndex, PieceIndexHash};
-use subspace_networking::utils::multihash::MultihashCode;
 use subspace_networking::{Node, ToMultihash};
 use tokio::time::error::Elapsed;
 use tokio::time::timeout;
@@ -104,8 +103,7 @@ impl PieceSectorPublisher {
     ) -> Result<(), Box<dyn Error + Send + Sync + 'static>> {
         self.check_cancellation()?;
 
-        let key =
-            PieceIndexHash::from_index(piece_index).to_multihash_by_code(MultihashCode::Sector);
+        let key = PieceIndexHash::from_index(piece_index).to_multihash();
 
         let result = self.dsn_node.start_announcing(key).await;
 
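Announcing follows the same simplification: the key is derived from the piece index alone, with no multihash code to pick. A sketch, assuming a `dsn_node: Node` as in the surrounding struct (the standalone function is hypothetical):

    use subspace_core_primitives::{PieceIndex, PieceIndexHash};
    use subspace_networking::{Node, ToMultihash};

    // Sketch: announce this node as a provider for a piece, by index.
    async fn announce_piece(dsn_node: &Node, piece_index: PieceIndex) {
        let key = PieceIndexHash::from_index(piece_index).to_multihash();
        // start_announcing publishes a provider record for the key (as in the diff above).
        let _result = dsn_node.start_announcing(key).await;
    }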
2 changes: 1 addition & 1 deletion crates/subspace-networking/src/lib.rs
@@ -53,7 +53,7 @@ pub use request_handlers::peer_info::{
     PeerInfo, PeerInfoRequest, PeerInfoRequestHandler, PeerInfoResponse, PeerSyncStatus,
 };
 pub use request_handlers::piece_by_key::{
-    PieceByHashRequest, PieceByHashRequestHandler, PieceByHashResponse, PieceKey,
+    PieceByHashRequest, PieceByHashRequestHandler, PieceByHashResponse,
 };
 pub use request_handlers::pieces_by_range::{
     PiecesByRangeRequest, PiecesByRangeRequestHandler, PiecesByRangeResponse, PiecesToPlot,
12 changes: 2 additions & 10 deletions crates/subspace-networking/src/request_handlers/piece_by_key.rs
@@ -7,19 +7,11 @@ use crate::request_handlers::generic_request_handler::{GenericRequest, GenericRe
 use parity_scale_codec::{Decode, Encode};
 use subspace_core_primitives::{Piece, PieceIndexHash};
 
-//TODO: rename all module names if we keep this enum
-#[derive(Debug, Clone, Eq, PartialEq, Copy, Encode, Decode)]
-pub enum PieceKey {
-    Cache(PieceIndexHash),
-    ArchivalStorage(PieceIndexHash),
-}
-
 /// Piece-by-hash protocol request.
 #[derive(Debug, Clone, Eq, PartialEq, Encode, Decode)]
 pub struct PieceByHashRequest {
-    //TODO: rename if we keep the enum
-    /// Piece index hash
-    pub key: PieceKey,
+    /// Request key - piece index hash
+    pub piece_index_hash: PieceIndexHash,
 }
 
 impl GenericRequest for PieceByHashRequest {
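Dropping the enum also changes the wire format: SCALE prefixes enum values with a one-byte variant index, so the old request encoded 33 bytes where the new one encodes the bare 32-byte hash. A sketch of the difference, assuming `PieceIndexHash` serializes as its 32 raw bytes (the shapes below are illustrative stand-ins, not the real types):

    use parity_scale_codec::Encode;

    #[derive(Encode)]
    enum OldPieceKey {
        Cache([u8; 32]),           // encodes as 0x00 followed by 32 bytes
        ArchivalStorage([u8; 32]), // encodes as 0x01 followed by 32 bytes
    }

    #[derive(Encode)]
    struct NewPieceByHashRequest {
        piece_index_hash: [u8; 32], // encodes as the 32 bytes alone
    }

    fn main() {
        let hash = [0u8; 32];
        assert_eq!(OldPieceKey::Cache(hash).encode().len(), 33);
        assert_eq!(NewPieceByHashRequest { piece_index_hash: hash }.encode().len(), 32);
    }

Peers on opposite sides of this change therefore cannot decode each other's piece requests unless the protocol is versioned elsewhere.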
21 changes: 4 additions & 17 deletions crates/subspace-networking/src/utils/multihash.rs
@@ -1,6 +1,6 @@
 use libp2p::multihash::Multihash;
 use std::error::Error;
-use subspace_core_primitives::{Blake2b256Hash, PieceIndexHash};
+use subspace_core_primitives::PieceIndexHash;
 
 /// Start of Subspace Network multicodec namespace (+1000 to distinguish from future stable values):
 /// https://github.com/multiformats/multicodec/blob/master/table.csv
@@ -9,9 +9,7 @@ const SUBSPACE_MULTICODEC_NAMESPACE_START: u64 = 0xb39910 + 1000;
 #[derive(Debug, Clone, PartialEq)]
 #[repr(u64)]
 pub enum MultihashCode {
-    Piece = SUBSPACE_MULTICODEC_NAMESPACE_START,
-    PieceIndex = SUBSPACE_MULTICODEC_NAMESPACE_START + 1,
-    Sector = SUBSPACE_MULTICODEC_NAMESPACE_START + 2,
+    PieceIndexHash = SUBSPACE_MULTICODEC_NAMESPACE_START,
 }
 
 impl From<MultihashCode> for u64 {
@@ -25,9 +23,7 @@ impl TryFrom<u64> for MultihashCode {
 
     fn try_from(value: u64) -> Result<Self, Self::Error> {
         match value {
-            x if x == MultihashCode::Piece as u64 => Ok(MultihashCode::Piece),
-            x if x == MultihashCode::PieceIndex as u64 => Ok(MultihashCode::PieceIndex),
-            x if x == MultihashCode::Sector as u64 => Ok(MultihashCode::Sector),
+            x if x == MultihashCode::PieceIndexHash as u64 => Ok(MultihashCode::PieceIndexHash),
             _ => Err("Unexpected multihash code".into()),
         }
     }
@@ -39,23 +35,14 @@ pub fn create_multihash_by_piece_index(piece_index: u64) -> Multihash {
     piece_index_hash.to_multihash()
 }
 
-pub fn create_multihash_by_piece(records_root: &Blake2b256Hash, piece_index: u64) -> Multihash {
-    let piece_index_bytes = piece_index.to_le_bytes();
-    let mut input = Vec::with_capacity(records_root.len() + piece_index_bytes.len());
-    input.extend_from_slice(records_root);
-    input.extend_from_slice(&piece_index_bytes);
-    Multihash::wrap(u64::from(MultihashCode::Piece), &input)
-        .expect("Input never exceeds allocated size; qed")
-}
-
 pub trait ToMultihash {
     fn to_multihash(&self) -> Multihash;
     fn to_multihash_by_code(&self, code: MultihashCode) -> Multihash;
 }
 
 impl ToMultihash for PieceIndexHash {
     fn to_multihash(&self) -> Multihash {
-        self.to_multihash_by_code(MultihashCode::PieceIndex)
+        self.to_multihash_by_code(MultihashCode::PieceIndexHash)
     }
 
     fn to_multihash_by_code(&self, code: MultihashCode) -> Multihash {
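The namespace now carries a single code, and `to_multihash` wraps the 32-byte piece index hash with it. A sketch of what that produces, consistent with the `Multihash::wrap` call in the removed helper above (the standalone function is illustrative):

    use libp2p::multihash::Multihash;

    // Same value as SUBSPACE_MULTICODEC_NAMESPACE_START in the diff.
    const PIECE_INDEX_HASH_CODE: u64 = 0xb39910 + 1000;

    // Sketch: wrap a 32-byte piece index hash under the unified multihash code.
    fn to_multihash(hash: [u8; 32]) -> Multihash {
        Multihash::wrap(PIECE_INDEX_HASH_CODE, &hash)
            .expect("Input never exceeds allocated size; qed")
    }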