From 1aed41bd1d6bc39b9b7b11c0371b5c41bfc50d7f Mon Sep 17 00:00:00 2001 From: Maksim Greshnyakov Date: Thu, 7 Nov 2024 13:57:53 +0100 Subject: [PATCH 1/2] fix(validator): session signatures storage #274 --- Cargo.lock | 1 - Cargo.toml | 3 +- collator/Cargo.toml | 4 +- .../src/validator/impls/std_impl/session.rs | 85 ++++++++----------- 4 files changed, 40 insertions(+), 53 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 71e04585d..1af5bba1a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3520,7 +3520,6 @@ dependencies = [ "parking_lot", "rand", "rayon", - "scc", "scopeguard", "serde", "sha2", diff --git a/Cargo.toml b/Cargo.toml index f4150e21c..1e5dddd8c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -79,7 +79,6 @@ rlimit = "0.10.1" rustc_version = "0.4" rustls = "0.23.16" rustls-webpki = "0.102" -scc = "2.1" scopeguard = "1.2" serde = "1.0" serde_json = "1.0.114" @@ -238,4 +237,4 @@ opt-level = 3 [profile.dev.package.hashbrown] opt-level = 3 [profile.dev.package."*"] -opt-level = 1 +opt-level = 1 \ No newline at end of file diff --git a/collator/Cargo.toml b/collator/Cargo.toml index e076d051e..fc8f98471 100644 --- a/collator/Cargo.toml +++ b/collator/Cargo.toml @@ -27,7 +27,6 @@ metrics = { workspace = true } parking_lot = { workspace = true } rand = { workspace = true } rayon = { workspace = true } -scc = { workspace = true } scopeguard = { workspace = true } serde = { workspace = true } sha2 = { workspace = true } @@ -65,3 +64,6 @@ block-creator-stats = [] [lints] workspace = true + +[profile.test] +incremental = true diff --git a/collator/src/validator/impls/std_impl/session.rs b/collator/src/validator/impls/std_impl/session.rs index be8562449..c92b3d0d8 100644 --- a/collator/src/validator/impls/std_impl/session.rs +++ b/collator/src/validator/impls/std_impl/session.rs @@ -12,14 +12,13 @@ use everscale_crypto::ed25519::KeyPair; use everscale_types::models::*; use futures_util::stream::FuturesUnordered; use futures_util::{Future, StreamExt}; -use scc::TreeIndex; use tokio::sync::{Notify, Semaphore}; use tokio_util::sync::CancellationToken; use tracing::Instrument; use tycho_network::{OverlayId, PeerId, PrivateOverlay, Request}; use tycho_util::futures::JoinTask; use tycho_util::metrics::HistogramGuard; -use tycho_util::FastHashMap; +use tycho_util::{FastDashMap, FastHashMap}; use super::ValidatorStdImplConfig; use crate::tracing_targets; @@ -87,8 +86,8 @@ impl ValidatorSession { shard_ident: info.shard_ident, weight_threshold, validators: Arc::new(validators), - block_signatures: TreeIndex::new(), - cached_signatures: TreeIndex::new(), + block_signatures: FastDashMap::default(), + cached_signatures: FastDashMap::default(), cancelled: AtomicBool::new(false), cancelled_signal: Notify::new(), }); @@ -153,17 +152,17 @@ impl ValidatorSession { .fetch_max(block_seqno, Ordering::Release); let state = self.inner.state.as_ref(); - state.cached_signatures.remove_range(..=block_seqno); - - let guard = scc::ebr::Guard::new(); - for (_, validation) in state.block_signatures.range(..=block_seqno, &guard) { - validation.cancelled.cancel(); - } - drop(guard); + state.cached_signatures.retain(|&key, _| key > block_seqno); // NOTE: Remove only blocks that are old enough. let until_seqno = block_seqno.saturating_sub(self.inner.config.old_blocks_to_keep); - state.block_signatures.remove_range(..=until_seqno); + + state.block_signatures.retain(|&key, validation| { + if key <= block_seqno { + validation.cancelled.cancel(); + } + key > until_seqno + }); } #[tracing::instrument( @@ -184,41 +183,36 @@ impl ValidatorSession { let state = &self.inner.state; - // Remove cached slot + let entry = state.block_signatures.entry(block_id.seqno); + if let tycho_util::DashMapEntry::Occupied(_) = entry { + anyhow::bail!( + "block validation is already in progress. \ + session_id={}, block_id={:?}", + self.inner.session_id, + block_id + ); + } + let cached = state .cached_signatures - .peek(&block_id.seqno, &scc::ebr::Guard::new()) - .map(Arc::clone); + .remove(&block_id.seqno) + .map(|(_, value)| value); // Prepare block signatures - let block_signatures = match &cached { - Some(cached) => self.reuse_signatures(block_id, cached.clone()).await, - None => self.prepare_new_signatures(block_id), - } - .build(block_id, state.weight_threshold); + let block_signatures = { + match &cached { + Some(cached) => self.reuse_signatures(block_id, cached.clone()).await, + None => self.prepare_new_signatures(block_id), + } + .build(block_id, state.weight_threshold) + }; - // Allow only one validation at a time - if state - .block_signatures - .insert(block_id.seqno, block_signatures.clone()) - .is_err() - { - // TODO: Panic here? - anyhow::bail!( - "block validation is already in progress. \ - session_id={}, block_id={block_id}", - self.inner.session_id - ); - } + entry.or_insert(block_signatures.clone()); - // NOTE: To eliminate the gap inside exchange routine, we can remove cached signatures - // only after we have inserted the block. - // // At this point the following is true: // - All new signatures will be stored (and validated) in the block; // - There might be some new signatures that were stored in the cache, but we // have not yet processed them. We will use them later. - state.cached_signatures.remove(&block_id.seqno); // Start collecting signatures from other validators let mut result = FastHashMap::default(); @@ -604,8 +598,8 @@ struct SessionState { shard_ident: ShardIdent, weight_threshold: u64, validators: Arc>, - block_signatures: TreeIndex>, - cached_signatures: TreeIndex>, + block_signatures: FastDashMap>, + cached_signatures: FastDashMap>, cancelled: AtomicBool, cancelled_signal: Notify, } @@ -758,14 +752,9 @@ impl ExchangeSignatures for SessionState { if self.cancelled.load(Ordering::Acquire) { return Err(ValidationError::Cancelled); } - - let guard = scc::ebr::Guard::new(); - // Full signature exchange if we know the block. // Otherwise, cache the signature for the block to use it later. - // - // NOTE: scc's `peek` does not lock the tree - let result = if let Some(signatures) = self.block_signatures.peek(&block_seqno, &guard) { + let result = if let Some(signatures) = self.block_signatures.get(&block_seqno) { metrics::counter!(METRIC_BLOCK_EXCHANGES_IN_TOTAL).increment(1); let Some(slot) = signatures.other_signatures.get(peer_id) else { @@ -774,13 +763,13 @@ impl ExchangeSignatures for SessionState { // If more signatures are still needed, validate and store new to the block if !signatures.validated.load(Ordering::Acquire) { - self.add_signature(signatures, slot, peer_id, &signature)?; + self.add_signature(&signatures, slot, peer_id, &signature)?; } proto::Exchange::Complete(signatures.own_signature.clone()) } else { // Find the slot for the specified block seqno. - let Some(slot) = self.cached_signatures.peek(&block_seqno, &guard) else { + let Some(slot) = self.cached_signatures.get(&block_seqno) else { metrics::counter!(METRIC_MISS_EXCHANGES_IN_TOTAL).increment(1); return Err(ValidationError::NoSlot); }; @@ -796,8 +785,6 @@ impl ExchangeSignatures for SessionState { proto::Exchange::Cached }; - drop(guard); - let action = match &result { proto::Exchange::Complete(_) => "complete", proto::Exchange::Cached => "cached", From 34cc696c1b0f0ce715772bfaacb920b8e87f475e Mon Sep 17 00:00:00 2001 From: Maksim Greshnyakov Date: Wed, 13 Nov 2024 10:20:09 +0100 Subject: [PATCH 2/2] fix(validator): remove potentially deadlock --- .../src/validator/impls/std_impl/session.rs | 33 ++++++++++++------- 1 file changed, 22 insertions(+), 11 deletions(-) diff --git a/collator/src/validator/impls/std_impl/session.rs b/collator/src/validator/impls/std_impl/session.rs index c92b3d0d8..4fe250b3c 100644 --- a/collator/src/validator/impls/std_impl/session.rs +++ b/collator/src/validator/impls/std_impl/session.rs @@ -183,22 +183,20 @@ impl ValidatorSession { let state = &self.inner.state; - let entry = state.block_signatures.entry(block_id.seqno); - if let tycho_util::DashMapEntry::Occupied(_) = entry { - anyhow::bail!( + // first check for prevent slow operations + if state.block_signatures.contains_key(&block_id.seqno) { + panic!( "block validation is already in progress. \ session_id={}, block_id={:?}", - self.inner.session_id, - block_id + self.inner.session_id, block_id ); } let cached = state .cached_signatures - .remove(&block_id.seqno) - .map(|(_, value)| value); + .get(&block_id.seqno) + .map(|entry| entry.clone()); - // Prepare block signatures let block_signatures = { match &cached { Some(cached) => self.reuse_signatures(block_id, cached.clone()).await, @@ -207,7 +205,22 @@ impl ValidatorSession { .build(block_id, state.weight_threshold) }; - entry.or_insert(block_signatures.clone()); + // second check for prevent data races + match state.block_signatures.entry(block_id.seqno) { + tycho_util::DashMapEntry::Occupied(_) => { + panic!( + "block validation is already in progress. \ + session_id={}, block_id={:?}", + self.inner.session_id, block_id + ); + } + tycho_util::DashMapEntry::Vacant(signatures) => { + signatures.insert(block_signatures.clone()); + // NOTE: To eliminate the gap inside exchange routine, we can remove cached signatures + // only after we have inserted the block. + state.cached_signatures.remove(&block_id.seqno); + } + } // At this point the following is true: // - All new signatures will be stored (and validated) in the block; @@ -294,7 +307,6 @@ impl ValidatorSession { debug_assert!(prev.is_none(), "duplicate signature in result"); total_weight += validator_info.weight; } - tracing::info!(target: tracing_targets::VALIDATOR, "finished"); Ok(ValidationStatus::Complete(ValidationComplete { signatures: result, @@ -797,7 +809,6 @@ impl ExchangeSignatures for SessionState { action, "exchanged signatures" ); - Ok(result) } }