Skip to content

Commit 6dd74f6

Browse files
Merge pull request #1068 from subspace/dsn-node-provider-storage
node: Add provider record persistence.
2 parents 07ab4b2 + e61140e commit 6dd74f6

File tree

11 files changed

+165
-40
lines changed

11 files changed

+165
-40
lines changed

Diff for: Cargo.lock

+2
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Diff for: crates/subspace-networking/src/behavior/provider_storage/providers.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -236,9 +236,9 @@ impl ParityDbProviderStorage {
236236
}
237237

238238
if heap.size() > 0 {
239-
info!(size = heap.size(), "Record cache loaded.");
239+
info!(size = heap.size(), ?path, "Record cache loaded.");
240240
} else {
241-
info!("New record cache initialized.");
241+
info!(?path, "New record cache initialized.");
242242
}
243243

244244
Ok(Self {

Diff for: crates/subspace-networking/src/create.rs

+3
Original file line numberDiff line numberDiff line change
@@ -284,6 +284,9 @@ pub enum CreationError {
284284
/// Transport error when attempting to listen on multiaddr.
285285
#[error("Transport error when attempting to listen on multiaddr: {0}")]
286286
TransportError(#[from] TransportError<io::Error>),
287+
/// ParityDb storage error
288+
#[error("ParityDb storage error: {0}")]
289+
ParityDbStorageError(#[from] parity_db::Error),
287290
}
288291

289292
/// Converts public key from keypair to PeerId.

Diff for: crates/subspace-node/src/bin/subspace-node.rs

+5
Original file line numberDiff line numberDiff line change
@@ -421,6 +421,11 @@ fn main() -> Result<(), Error> {
421421

422422
DsnConfig {
423423
keypair,
424+
base_path: cli.run.base_path()?.map(|base_path| {
425+
base_path
426+
.config_dir(primary_chain_config.chain_spec.id())
427+
.join("dsn")
428+
}),
424429
listen_on: cli.dsn_listen_on,
425430
bootstrap_nodes: dsn_bootstrap_nodes,
426431
reserved_peers: cli.dsn_reserved_peers,

Diff for: crates/subspace-service/Cargo.toml

+2
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,13 @@ targets = ["x86_64-unknown-linux-gnu"]
1919
backoff = { version = "0.4.0", features = ["futures", "tokio"] }
2020
derive_more = "0.99.17"
2121
domain-runtime-primitives = { version = "0.1.0", path = "../../domains/primitives/runtime" }
22+
either = "1.8.0"
2223
frame-support = { version = "4.0.0-dev", git = "https://github.com/subspace/substrate", rev = "100d6c90d4122578006a47c1dcaf155b9c685f34" }
2324
futures = "0.3.25"
2425
jsonrpsee = { version = "0.16.2", features = ["server"] }
2526
pallet-transaction-payment-rpc = { version = "4.0.0-dev", git = "https://github.com/subspace/substrate", rev = "100d6c90d4122578006a47c1dcaf155b9c685f34" }
2627
parity-scale-codec = "3.2.1"
28+
parking_lot = "0.12.1"
2729
sc-basic-authorship = { version = "0.10.0-dev", git = "https://github.com/subspace/substrate", rev = "100d6c90d4122578006a47c1dcaf155b9c685f34" }
2830
sc-chain-spec = { version = "4.0.0-dev", git = "https://github.com/subspace/substrate", rev = "100d6c90d4122578006a47c1dcaf155b9c685f34" }
2931
sc-client-api = { version = "4.0.0-dev", git = "https://github.com/subspace/substrate", rev = "100d6c90d4122578006a47c1dcaf155b9c685f34" }

Diff for: crates/subspace-service/src/dsn.rs

+30-4
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,10 @@
1+
mod node_provider_storage;
2+
3+
use crate::dsn::node_provider_storage::NodeProviderStorage;
14
use crate::piece_cache::PieceCache;
25
use backoff::future::retry;
36
use backoff::ExponentialBackoff;
7+
use either::Either;
48
use futures::stream::FuturesUnordered;
59
use futures::{Stream, StreamExt};
610
use sc_client_api::AuxStore;
@@ -9,14 +13,16 @@ use sp_core::traits::SpawnNamed;
913
use sp_runtime::traits::Block as BlockT;
1014
use std::error::Error;
1115
use std::num::NonZeroUsize;
16+
use std::path::PathBuf;
1217
use std::sync::Arc;
1318
use std::time::Duration;
1419
use subspace_archiving::archiver::ArchivedSegment;
1520
use subspace_core_primitives::{PieceIndex, PieceIndexHash, PIECES_IN_SEGMENT};
1621
use subspace_networking::libp2p::{identity, Multiaddr};
1722
use subspace_networking::{
1823
peer_id, BootstrappedNetworkingParameters, CreationError, MemoryProviderStorage, Node,
19-
NodeRunner, PieceByHashRequestHandler, PieceByHashResponse, PieceKey, ToMultihash,
24+
NodeRunner, ParityDbProviderStorage, PieceByHashRequestHandler, PieceByHashResponse, PieceKey,
25+
ToMultihash,
2026
};
2127
use tokio::sync::Semaphore;
2228
use tokio::time::error::Elapsed;
@@ -29,6 +35,8 @@ const PUBLISH_PIECE_TIMEOUT: Duration = Duration::from_secs(120);
2935
const PUBLISH_PIECE_INITIAL_INTERVAL: Duration = Duration::from_secs(1);
3036
/// Defines max duration between put_piece calls.
3137
const PUBLISH_PIECE_MAX_INTERVAL: Duration = Duration::from_secs(30);
38+
/// Provider records cache size
39+
const MAX_PROVIDER_RECORDS_LIMIT: usize = 100000; // ~ 10 MB
3240

3341
/// DSN configuration parameters.
3442
#[derive(Clone, Debug)]
@@ -47,20 +55,38 @@ pub struct DsnConfig {
4755

4856
/// Determines whether we allow keeping non-global (private, shared, loopback..) addresses in Kademlia DHT.
4957
pub allow_non_global_addresses_in_dht: bool,
58+
59+
/// System base path.
60+
pub base_path: Option<PathBuf>,
5061
}
5162

63+
type DsnProviderStorage<AS> =
64+
NodeProviderStorage<PieceCache<AS>, Either<ParityDbProviderStorage, MemoryProviderStorage>>;
65+
5266
pub(crate) async fn create_dsn_instance<Block, AS>(
5367
dsn_config: DsnConfig,
5468
piece_cache: PieceCache<AS>,
55-
) -> Result<(Node, NodeRunner<MemoryProviderStorage>), CreationError>
69+
) -> Result<(Node, NodeRunner<DsnProviderStorage<AS>>), CreationError>
5670
where
5771
Block: BlockT,
5872
AS: AuxStore + Sync + Send + 'static,
5973
{
6074
trace!("Subspace networking starting.");
6175

62-
// TODO: This should be a wrapper that handles locally cached pieces
63-
let provider_storage = MemoryProviderStorage::new(peer_id(&dsn_config.keypair));
76+
let peer_id = peer_id(&dsn_config.keypair);
77+
78+
let external_provider_storage = if let Some(path) = dsn_config.base_path {
79+
let db_path = path.join("storage_providers_db").into_boxed_path();
80+
81+
let cache_size: NonZeroUsize = NonZeroUsize::new(MAX_PROVIDER_RECORDS_LIMIT)
82+
.expect("Manual value should be greater than zero.");
83+
84+
Either::Left(ParityDbProviderStorage::new(&db_path, cache_size, peer_id)?)
85+
} else {
86+
Either::Right(MemoryProviderStorage::new(peer_id))
87+
};
88+
89+
let provider_storage = NodeProviderStorage::new(piece_cache.clone(), external_provider_storage);
6490

6591
let networking_config = subspace_networking::Config {
6692
keypair: dsn_config.keypair.clone(),
+62
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
use subspace_networking::libp2p::kad::record::Key;
2+
use subspace_networking::libp2p::kad::ProviderRecord;
3+
use subspace_networking::libp2p::PeerId;
4+
use subspace_networking::ProviderStorage;
5+
6+
pub(crate) struct NodeProviderStorage<ImplicitProviderStorage, PersistentProviderStorage> {
7+
/// Provider records from local cache
8+
implicit_provider_storage: ImplicitProviderStorage,
9+
/// External provider records
10+
persistent_provider_storage: PersistentProviderStorage,
11+
}
12+
13+
impl<ImplicitProviderStorage, ExternalProviderStorage>
14+
NodeProviderStorage<ImplicitProviderStorage, ExternalProviderStorage>
15+
{
16+
pub(crate) fn new(
17+
implicit_provider_storage: ImplicitProviderStorage,
18+
persistent_provider_storage: ExternalProviderStorage,
19+
) -> Self {
20+
Self {
21+
implicit_provider_storage,
22+
persistent_provider_storage,
23+
}
24+
}
25+
}
26+
27+
impl<ImplicitProviderStorage, PersistentProviderStorage> ProviderStorage
28+
for NodeProviderStorage<ImplicitProviderStorage, PersistentProviderStorage>
29+
where
30+
ImplicitProviderStorage: ProviderStorage,
31+
PersistentProviderStorage: ProviderStorage,
32+
{
33+
type ProvidedIter<'a> = ImplicitProviderStorage::ProvidedIter<'a> where Self:'a;
34+
35+
fn add_provider(
36+
&mut self,
37+
record: ProviderRecord,
38+
) -> subspace_networking::libp2p::kad::store::Result<()> {
39+
// only external provider records
40+
self.persistent_provider_storage.add_provider(record)
41+
}
42+
43+
fn providers(&self, key: &Key) -> Vec<ProviderRecord> {
44+
let mut local_provider_records = self.implicit_provider_storage.providers(key);
45+
let mut external_provider_records = self.persistent_provider_storage.providers(key);
46+
47+
local_provider_records.append(&mut external_provider_records);
48+
49+
local_provider_records
50+
}
51+
52+
fn provided(&self) -> Self::ProvidedIter<'_> {
53+
// only local cached provider records
54+
self.implicit_provider_storage.provided()
55+
}
56+
57+
fn remove_provider(&mut self, key: &Key, peer_id: &PeerId) {
58+
// only external provider records
59+
self.persistent_provider_storage
60+
.remove_provider(key, peer_id)
61+
}
62+
}

Diff for: crates/subspace-service/src/lib.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
// along with this program. If not, see <https://www.gnu.org/licenses/>.
1616

1717
//! Service and ServiceFactory implementation. Specialized wrapper over substrate service.
18-
#![feature(type_changing_struct_update)]
18+
#![feature(type_alias_impl_trait, type_changing_struct_update)]
1919

2020
mod dsn;
2121
pub mod piece_cache;

Diff for: crates/subspace-service/src/piece_cache.rs

+56-33
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
mod tests;
33

44
use parity_scale_codec::{Decode, Encode};
5+
use parking_lot::Mutex;
56
use sc_client_api::backend::AuxStore;
67
use std::borrow::Cow;
78
use std::collections::BTreeSet;
@@ -25,7 +26,7 @@ pub struct PieceCache<AS> {
2526
/// Peer ID of the current node.
2627
local_peer_id: PeerId,
2728
/// Local provided keys
28-
local_provided_keys: BTreeSet<PieceIndex>,
29+
local_provided_keys: Arc<Mutex<BTreeSet<PieceIndex>>>,
2930
}
3031

3132
impl<AS> Clone for PieceCache<AS> {
@@ -65,7 +66,7 @@ where
6566
aux_store,
6667
max_pieces_in_cache,
6768
local_peer_id,
68-
local_provided_keys,
69+
local_provided_keys: Arc::new(Mutex::new(local_provided_keys)),
6970
}
7071
}
7172

@@ -80,14 +81,17 @@ where
8081
}))
8182
}
8283

83-
fn write_local_provided_keys(&self) -> Result<(), Box<dyn Error>> {
84+
fn write_local_provided_keys(
85+
&self,
86+
local_provided_keys: BTreeSet<PieceIndex>,
87+
) -> Result<(), Box<dyn Error>> {
8488
// TODO: Could be a slow process. We need to optimize it ASAP!
8589
self.aux_store
8690
.insert_aux(
8791
&vec![(
8892
LOCAL_PROVIDED_KEYS,
8993
ParityDbKeyCollection {
90-
set: self.local_provided_keys.clone(),
94+
set: local_provided_keys,
9195
}
9296
.encode()
9397
.as_slice(),
@@ -150,15 +154,21 @@ where
150154
.collect::<Vec<_>>(),
151155
)?;
152156

153-
for piece_index in delete_indexes {
154-
self.local_provided_keys.remove(&piece_index);
155-
}
157+
let local_provided_keys = {
158+
let mut local_provided_keys = self.local_provided_keys.lock();
156159

157-
for piece_index in insert_indexes {
158-
self.local_provided_keys.insert(piece_index);
159-
}
160+
for piece_index in delete_indexes {
161+
local_provided_keys.remove(&piece_index);
162+
}
163+
164+
for piece_index in insert_indexes {
165+
local_provided_keys.insert(piece_index);
166+
}
167+
168+
local_provided_keys.clone()
169+
};
160170

161-
self.write_local_provided_keys()?;
171+
self.write_local_provided_keys(local_provided_keys)?;
162172

163173
Ok(())
164174
}
@@ -249,7 +259,13 @@ where
249259
}
250260

251261
fn provided(&self) -> Self::ProvidedIter<'_> {
252-
let pieces_indexes = self.local_provided_keys.iter();
262+
let pieces_indexes = {
263+
self.local_provided_keys
264+
.lock()
265+
.iter()
266+
.cloned()
267+
.collect::<Vec<_>>()
268+
};
253269

254270
AuxStoreProviderRecordIterator::new(pieces_indexes, self.clone())
255271
}
@@ -264,18 +280,17 @@ where
264280
}
265281

266282
pub struct AuxStoreProviderRecordIterator<'a, AS> {
267-
piece_index_iter: std::collections::btree_set::Iter<'a, PieceIndex>,
283+
piece_indexes: Vec<PieceIndex>,
284+
piece_indexes_cursor: usize,
268285
piece_cache: PieceCache<AS>,
269286
marker: PhantomData<&'a ()>,
270287
}
271288

272289
impl<'a, AS: AuxStore> AuxStoreProviderRecordIterator<'a, AS> {
273-
pub fn new(
274-
piece_index_iter: std::collections::btree_set::Iter<'a, PieceIndex>,
275-
piece_cache: PieceCache<AS>,
276-
) -> Self {
290+
pub fn new(piece_indexes: Vec<PieceIndex>, piece_cache: PieceCache<AS>) -> Self {
277291
Self {
278-
piece_index_iter,
292+
piece_indexes,
293+
piece_indexes_cursor: 0,
279294
piece_cache,
280295
marker: PhantomData,
281296
}
@@ -286,23 +301,31 @@ impl<'a, AS: AuxStore> Iterator for AuxStoreProviderRecordIterator<'a, AS> {
286301
type Item = Cow<'a, ProviderRecord>;
287302

288303
fn next(&mut self) -> Option<Self::Item> {
304+
if self.piece_indexes.len() == self.piece_indexes_cursor {
305+
return None; // iterator finished
306+
}
307+
289308
let peer_id = self.piece_cache.local_peer_id;
309+
let piece_index = self.piece_indexes[self.piece_indexes_cursor];
310+
let piece_index_hash = PieceIndexHash::from_index(piece_index);
311+
let key = Key::from(piece_index_hash.to_multihash());
312+
313+
let result = self
314+
.piece_cache
315+
.get_piece(piece_index_hash)
316+
.ok()
317+
.flatten()
318+
.map(move |_| ProviderRecord {
319+
key: key.clone(),
320+
provider: peer_id,
321+
expires: None,
322+
addresses: vec![], // TODO: add address hints
323+
})
324+
.map(Cow::Owned);
290325

291-
self.piece_index_iter.next().and_then(|piece_index| {
292-
let piece_index_hash = PieceIndexHash::from_index(*piece_index);
293-
let key = Key::from(piece_index_hash.to_multihash());
326+
// Move iterator cursor forward
327+
self.piece_indexes_cursor += 1;
294328

295-
self.piece_cache
296-
.get_piece(piece_index_hash)
297-
.ok()
298-
.flatten()
299-
.map(move |_| ProviderRecord {
300-
key: key.clone(),
301-
provider: peer_id,
302-
expires: None,
303-
addresses: vec![], // TODO: add address hints
304-
})
305-
.map(Cow::Owned)
306-
})
329+
result
307330
}
308331
}

Diff for: domains/test/service/src/lib.rs

+1
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,7 @@ async fn run_executor(
128128
force_new_slot_notifications: true,
129129
subspace_networking: SubspaceNetworking::Create {
130130
config: DsnConfig {
131+
base_path: None,
131132
listen_on: vec!["/ip4/127.0.0.1/tcp/0"
132133
.parse()
133134
.expect("Correct multiaddr; qed")],

Diff for: test/subspace-test-service/src/lib.rs

+1
Original file line numberDiff line numberDiff line change
@@ -193,6 +193,7 @@ pub async fn run_validator_node(
193193
force_new_slot_notifications: true,
194194
subspace_networking: SubspaceNetworking::Create {
195195
config: DsnConfig {
196+
base_path: None,
196197
listen_on: vec!["/ip4/127.0.0.1/tcp/0"
197198
.parse()
198199
.expect("Correct multiaddr; qed")],

0 commit comments

Comments
 (0)