Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(validator): session signatures storage #428

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 1 addition & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,6 @@ rlimit = "0.10.1"
rustc_version = "0.4"
rustls = "0.23.16"
rustls-webpki = "0.102"
scc = "2.1"
scopeguard = "1.2"
serde = "1.0"
serde_json = "1.0.114"
Expand Down Expand Up @@ -238,4 +237,4 @@ opt-level = 3
[profile.dev.package.hashbrown]
opt-level = 3
[profile.dev.package."*"]
opt-level = 1
opt-level = 1
4 changes: 3 additions & 1 deletion collator/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ metrics = { workspace = true }
parking_lot = { workspace = true }
rand = { workspace = true }
rayon = { workspace = true }
scc = { workspace = true }
scopeguard = { workspace = true }
serde = { workspace = true }
sha2 = { workspace = true }
Expand Down Expand Up @@ -65,3 +64,6 @@ block-creator-stats = []

[lints]
workspace = true

[profile.test]
incremental = true
100 changes: 49 additions & 51 deletions collator/src/validator/impls/std_impl/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,13 @@ use everscale_crypto::ed25519::KeyPair;
use everscale_types::models::*;
use futures_util::stream::FuturesUnordered;
use futures_util::{Future, StreamExt};
use scc::TreeIndex;
use tokio::sync::{Notify, Semaphore};
use tokio_util::sync::CancellationToken;
use tracing::Instrument;
use tycho_network::{OverlayId, PeerId, PrivateOverlay, Request};
use tycho_util::futures::JoinTask;
use tycho_util::metrics::HistogramGuard;
use tycho_util::FastHashMap;
use tycho_util::{FastDashMap, FastHashMap};

use super::ValidatorStdImplConfig;
use crate::tracing_targets;
Expand Down Expand Up @@ -87,8 +86,8 @@ impl ValidatorSession {
shard_ident: info.shard_ident,
weight_threshold,
validators: Arc::new(validators),
block_signatures: TreeIndex::new(),
cached_signatures: TreeIndex::new(),
block_signatures: FastDashMap::default(),
cached_signatures: FastDashMap::default(),
cancelled: AtomicBool::new(false),
cancelled_signal: Notify::new(),
});
Expand Down Expand Up @@ -153,17 +152,17 @@ impl ValidatorSession {
.fetch_max(block_seqno, Ordering::Release);

let state = self.inner.state.as_ref();
state.cached_signatures.remove_range(..=block_seqno);

let guard = scc::ebr::Guard::new();
for (_, validation) in state.block_signatures.range(..=block_seqno, &guard) {
validation.cancelled.cancel();
}
drop(guard);
state.cached_signatures.retain(|&key, _| key > block_seqno);

// NOTE: Remove only blocks that are old enough.
let until_seqno = block_seqno.saturating_sub(self.inner.config.old_blocks_to_keep);
state.block_signatures.remove_range(..=until_seqno);

state.block_signatures.retain(|&key, validation| {
if key <= block_seqno {
validation.cancelled.cancel();
}
key > until_seqno
});
}

#[tracing::instrument(
Expand All @@ -184,41 +183,49 @@ impl ValidatorSession {

let state = &self.inner.state;

// Remove cached slot
// First check: fail fast before doing the slower work below.
if state.block_signatures.contains_key(&block_id.seqno) {
panic!(
"block validation is already in progress. \
session_id={}, block_id={:?}",
self.inner.session_id, block_id
);
}

let cached = state
.cached_signatures
.peek(&block_id.seqno, &scc::ebr::Guard::new())
.map(Arc::clone);
.get(&block_id.seqno)
.map(|entry| entry.clone());

// Prepare block signatures
let block_signatures = match &cached {
Some(cached) => self.reuse_signatures(block_id, cached.clone()).await,
None => self.prepare_new_signatures(block_id),
}
.build(block_id, state.weight_threshold);
let block_signatures = {
match &cached {
Some(cached) => self.reuse_signatures(block_id, cached.clone()).await,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The map `entry` guard stays alive across the `.await` here, so it can deadlock.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

refactored

None => self.prepare_new_signatures(block_id),
}
.build(block_id, state.weight_threshold)
};

// Allow only one validation at a time
if state
.block_signatures
.insert(block_id.seqno, block_signatures.clone())
.is_err()
{
// TODO: Panic here?
anyhow::bail!(
"block validation is already in progress. \
session_id={}, block_id={block_id}",
self.inner.session_id
);
// Second check: guards against a concurrent insert that happened after the first check.
match state.block_signatures.entry(block_id.seqno) {
tycho_util::DashMapEntry::Occupied(_) => {
panic!(
"block validation is already in progress. \
session_id={}, block_id={:?}",
self.inner.session_id, block_id
);
}
tycho_util::DashMapEntry::Vacant(signatures) => {
signatures.insert(block_signatures.clone());
// NOTE: To eliminate the gap inside exchange routine, we can remove cached signatures
// only after we have inserted the block.
state.cached_signatures.remove(&block_id.seqno);
}
}

// NOTE: To eliminate the gap inside exchange routine, we can remove cached signatures
// only after we have inserted the block.
//
// At this point the following is true:
// - All new signatures will be stored (and validated) in the block;
// - There might be some new signatures that were stored in the cache, but we
// have not yet processed them. We will use them later.
state.cached_signatures.remove(&block_id.seqno);

// Start collecting signatures from other validators
let mut result = FastHashMap::default();
Expand Down Expand Up @@ -300,7 +307,6 @@ impl ValidatorSession {
debug_assert!(prev.is_none(), "duplicate signature in result");
total_weight += validator_info.weight;
}

tracing::info!(target: tracing_targets::VALIDATOR, "finished");
Ok(ValidationStatus::Complete(ValidationComplete {
signatures: result,
Expand Down Expand Up @@ -604,8 +610,8 @@ struct SessionState {
shard_ident: ShardIdent,
weight_threshold: u64,
validators: Arc<FastHashMap<PeerId, BriefValidatorDescr>>,
block_signatures: TreeIndex<u32, Arc<BlockSignatures>>,
cached_signatures: TreeIndex<u32, Arc<CachedSignatures>>,
block_signatures: FastDashMap<u32, Arc<BlockSignatures>>,
cached_signatures: FastDashMap<u32, Arc<CachedSignatures>>,
cancelled: AtomicBool,
cancelled_signal: Notify,
}
Expand Down Expand Up @@ -758,14 +764,9 @@ impl ExchangeSignatures for SessionState {
if self.cancelled.load(Ordering::Acquire) {
return Err(ValidationError::Cancelled);
}

let guard = scc::ebr::Guard::new();

// Full signature exchange if we know the block.
// Otherwise, cache the signature for the block to use it later.
//
// NOTE: scc's `peek` does not lock the tree
let result = if let Some(signatures) = self.block_signatures.peek(&block_seqno, &guard) {
let result = if let Some(signatures) = self.block_signatures.get(&block_seqno) {
metrics::counter!(METRIC_BLOCK_EXCHANGES_IN_TOTAL).increment(1);

let Some(slot) = signatures.other_signatures.get(peer_id) else {
Expand All @@ -774,13 +775,13 @@ impl ExchangeSignatures for SessionState {

// If more signatures are still needed, validate and store new to the block
if !signatures.validated.load(Ordering::Acquire) {
self.add_signature(signatures, slot, peer_id, &signature)?;
self.add_signature(&signatures, slot, peer_id, &signature)?;
}

proto::Exchange::Complete(signatures.own_signature.clone())
} else {
// Find the slot for the specified block seqno.
let Some(slot) = self.cached_signatures.peek(&block_seqno, &guard) else {
let Some(slot) = self.cached_signatures.get(&block_seqno) else {
metrics::counter!(METRIC_MISS_EXCHANGES_IN_TOTAL).increment(1);
return Err(ValidationError::NoSlot);
};
Expand All @@ -796,8 +797,6 @@ impl ExchangeSignatures for SessionState {
proto::Exchange::Cached
};

drop(guard);

let action = match &result {
proto::Exchange::Complete(_) => "complete",
proto::Exchange::Cached => "cached",
Expand All @@ -810,7 +809,6 @@ impl ExchangeSignatures for SessionState {
action,
"exchanged signatures"
);

Ok(result)
}
}
Expand Down
Loading