Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
215 changes: 54 additions & 161 deletions validator_client/validator_services/src/sync.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use crate::duties_service::{DutiesService, Error, SelectionProofConfig};
use eth2::types::SyncCommitteeSelection;
use futures::future::join_all;
use futures::stream::{FuturesUnordered, StreamExt};
use logging::crit;
use parking_lot::{MappedRwLockReadGuard, RwLock, RwLockReadGuard, RwLockWriteGuard};
Expand Down Expand Up @@ -607,187 +606,81 @@ pub async fn fill_in_aggregation_proofs<S: ValidatorStore, T: SlotClock + 'stati

// Generate selection proofs for each validator at each slot, one slot at a time.
for slot in (start_slot..=pre_compute_slot.as_u64()).map(Slot::new) {
// For distributed mode
if duties_service
.sync_duties
.selection_proof_config
.parallel_sign
{
let mut futures_unordered = FuturesUnordered::new();

for (_, duty) in pre_compute_duties {
let subnet_ids = match duty.subnet_ids::<S::E>() {
Ok(subnet_ids) => subnet_ids,
Err(e) => {
crit!(
"error" = ?e,
"Arithmetic error computing subnet IDs"
);
continue;
}
};

// Construct proof for prior slot.
let proof_slot = slot - 1;

// Calling the make_sync_selection_proof will return a full selection proof
for &subnet_id in &subnet_ids {
let duties_service = duties_service.clone();
futures_unordered.push(async move {
let result =
make_sync_selection_proof(&duties_service, duty, proof_slot, subnet_id)
.await;

result.map(|proof| (duty.validator_index, proof_slot, subnet_id, proof))
});
}
}

while let Some(result) = futures_unordered.next().await {
let Some((validator_index, proof_slot, subnet_id, proof)) = result else {
continue;
};
let sync_map = duties_service.sync_duties.committees.read();
let Some(committee_duties) = sync_map.get(&sync_committee_period) else {
debug!("period" = sync_committee_period, "Missing sync duties");
continue;
};
let mut futures_unordered = FuturesUnordered::new();

let validators = committee_duties.validators.read();

// Check if the validator is an aggregator
match proof.is_aggregator::<S::E>() {
Ok(true) => {
if let Some(Some(duty)) = validators.get(&validator_index) {
debug!(
validator_index,
"slot" = %proof_slot,
"subcommittee_index" = *subnet_id,
// log full selection proof for debugging
"full selection proof" = ?proof,
"Validator is sync aggregator"
);

// Store the proof
duty.aggregation_duties
.proofs
.write()
.insert((proof_slot, subnet_id), proof);
}
}
Ok(false) => {} // Not an aggregator
Err(e) => {
warn!(
validator_index,
%slot,
"error" = ?e,
"Error determining is_aggregator"
);
}
}
for (validator_start_slot, duty) in pre_compute_duties {
if slot < *validator_start_slot {
Copy link
Author

@nadtech-hub nadtech-hub Oct 16, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

doubting to keep this case and was wondering why it was only seen in distributed mode
@michaelsproul

continue;
}
} else {
// For non-distributed mode
debug!(
period = sync_committee_period,
%current_slot,
%pre_compute_slot,
"Calculating sync selection proofs"
);

let mut validator_proofs = vec![];
for (validator_start_slot, duty) in pre_compute_duties {
// Proofs are already known at this slot for this validator.
if slot < *validator_start_slot {
let subnet_ids = match duty.subnet_ids::<S::E>() {
Ok(subnet_ids) => subnet_ids,
Err(e) => {
crit!(
"error" = ?e,
"Arithmetic error computing subnet IDs"
);
continue;
}
};

let subnet_ids = match duty.subnet_ids::<S::E>() {
Ok(subnet_ids) => subnet_ids,
Err(e) => {
crit!(
error = ?e,
"Arithmetic error computing subnet IDs"
);
continue;
}
};

// Create futures to produce proofs.
let duties_service_ref = &duties_service;
let futures = subnet_ids.iter().map(|subnet_id| async move {
// Construct proof for prior slot.
let proof_slot = slot - 1;
// Construct proof for prior slot.
let proof_slot = slot - 1;

let proof =
make_sync_selection_proof(duties_service_ref, duty, proof_slot, *subnet_id)
// Calling the make_sync_selection_proof will return a full selection proof
for &subnet_id in &subnet_ids {
let duties_service = duties_service.clone();
futures_unordered.push(async move {
let result =
make_sync_selection_proof(&duties_service, duty, proof_slot, subnet_id)
.await;

match proof {
Some(proof) => match proof.is_aggregator::<S::E>() {
Ok(true) => {
debug!(
validator_index = duty.validator_index,
slot = %proof_slot,
%subnet_id,
"Validator is sync aggregator"
);
Some(((proof_slot, *subnet_id), proof))
}
Ok(false) => None,
Err(e) => {
warn!(
pubkey = ?duty.pubkey,
slot = %proof_slot,
error = ?e,
"Error determining is_aggregator"
);
None
}
},

None => None,
}
result.map(|proof| (duty.validator_index, proof_slot, subnet_id, proof))
});

// Execute all the futures in parallel, collecting any successful results.
let proofs = join_all(futures)
.await
.into_iter()
.flatten()
.collect::<Vec<_>>();

validator_proofs.push((duty.validator_index, proofs));
}
}

// Add to global storage (we add regularly so the proofs can be used ASAP).
while let Some(result) = futures_unordered.next().await {
let Some((validator_index, proof_slot, subnet_id, proof)) = result else {
continue;
};
let sync_map = duties_service.sync_duties.committees.read();
let Some(committee_duties) = sync_map.get(&sync_committee_period) else {
debug!(period = sync_committee_period, "Missing sync duties");
debug!("period" = sync_committee_period, "Missing sync duties");
continue;
};

let validators = committee_duties.validators.read();
let num_validators_updated = validator_proofs.len();

for (validator_index, proofs) in validator_proofs {
if let Some(Some(duty)) = validators.get(&validator_index) {
duty.aggregation_duties.proofs.write().extend(proofs);
} else {
debug!(
// Check if the validator is an aggregator
match proof.is_aggregator::<S::E>() {
Ok(true) => {
if let Some(Some(duty)) = validators.get(&validator_index) {
debug!(
validator_index,
"slot" = %proof_slot,
"subcommittee_index" = *subnet_id,
// log full selection proof for debugging
"full selection proof" = ?proof,
"Validator is sync aggregator"
);

// Store the proof
duty.aggregation_duties
.proofs
.write()
.insert((proof_slot, subnet_id), proof);
}
}
Ok(false) => {} // Not an aggregator
Err(e) => {
warn!(
validator_index,
period = sync_committee_period,
"Missing sync duty to update"
%slot,
"error" = ?e,
"Error determining is_aggregator"
);
}
}

if num_validators_updated > 0 {
debug!(
%slot,
updated_validators = num_validators_updated,
"Finished computing sync selection proofs"
);
}
}
}
}