1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default.

45 changes: 25 additions & 20 deletions crates/portalnet/src/overlay/protocol.rs
@@ -227,11 +227,7 @@ impl<
content_key: TContentKey,
content_value: RawContentValue,
) -> PutContentInfo {
let should_we_store = match self
.store
.lock()
.is_key_within_radius_and_unavailable(&content_key)
{
let should_we_store = match self.store.lock().should_we_store(&content_key) {
Ok(should_we_store) => matches!(should_we_store, ShouldWeStoreContent::Store),
Err(err) => {
warn!(
@@ -492,12 +488,14 @@ impl<
Ok(Response::Content(found_content)) => {
match found_content {
Content::Content(content) => {
match self.validate_content(&content_key, &content).await {
Ok(_) => Ok((Content::Content(content), false)),
Err(msg) => Err(OverlayRequestError::FailedValidation(format!(
"Network: {:?}, Reason: {msg:?}",
let validation_result = self.validate_content(&content_key, &content).await;
if validation_result.is_valid() {
Ok((Content::Content(content), false))
} else {
Err(OverlayRequestError::FailedValidation(format!(
"Network: {:?}, Reason: {validation_result:?}",
self.protocol
))),
)))
}
}
Content::Enrs(_) => Ok((found_content, false)),
@@ -514,12 +512,14 @@
};
let content = RawContentValue::from(bytes);

match self.validate_content(&content_key, &content).await {
Ok(_) => Ok((Content::Content(content), true)),
Err(msg) => Err(OverlayRequestError::FailedValidation(format!(
"Network: {:?}, Reason: {msg:?}",
let validation_result = self.validate_content(&content_key, &content).await;
if validation_result.is_valid() {
Ok((Content::Content(content), true))
} else {
Err(OverlayRequestError::FailedValidation(format!(
"Network: {:?}, Reason: {validation_result:?}",
self.protocol
))),
)))
}
}
}
@@ -533,13 +533,18 @@
&self,
content_key: &TContentKey,
content: &[u8],
) -> anyhow::Result<ValidationResult<TContentKey>> {
) -> ValidationResult {
let validation_result = self.validator.validate_content(content_key, content).await;
self.metrics.report_validation(validation_result.is_ok());

validation_result.map_err(|err| {
anyhow!("Content validation failed for content key {content_key:?} with error: {err:?}")
})
self.metrics.report_validation(validation_result.is_valid());
if !validation_result.is_valid() {
warn!(
"Content validation failed for content key {}: {validation_result:?}",
content_key.to_bytes(),
)
}

validation_result
}

/// Initialize FindContent uTP stream with remote node
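For readers following the call sites above: this file now works with a plain `ValidationResult` (the `anyhow::Result` wrapper and the `TContentKey` type parameter are gone) and branches on `is_valid()`, while `find_content.rs` and `offer.rs` below also call `is_canonically_valid()`. The type itself is defined elsewhere in the codebase and is not part of this diff; a minimal sketch of the assumed shape, with illustrative names only:

```rust
/// Illustrative sketch only; the real ValidationResult lives in the
/// validation crate and its variants and fields may differ.
#[derive(Debug)]
pub enum ValidationResult {
    /// Content was proven canonical (e.g. against a header or accumulator)
    /// and is safe to store and propagate.
    CanonicallyValid,
    /// Content passed local checks but could not be proven canonical.
    Valid,
    /// Content failed validation; the payload carries a reason for logging.
    Invalid(String),
}

impl ValidationResult {
    /// Any non-failing outcome: good enough to return content to a caller.
    pub fn is_valid(&self) -> bool {
        !matches!(self, ValidationResult::Invalid(_))
    }

    /// Stricter check used before storing or propagating content.
    pub fn is_canonically_valid(&self) -> bool {
        matches!(self, ValidationResult::CanonicallyValid)
    }
}
```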
58 changes: 26 additions & 32 deletions crates/portalnet/src/overlay/service/find_content.rs
@@ -525,37 +525,36 @@ impl<
.await;
utp_processing
.metrics
.report_validation(validation_result.is_ok());

let validation_result = match validation_result {
Ok(validation_result) => validation_result,
Err(err) => {
warn!(
error = ?err,
content.id = %hex_encode_compact(content_id),
content.key = %content_key,
"Error validating content"
);
// Indicate to the query that the content is invalid
let _ = valid_content_callback.send(None);
if let Some(query_trace_events_tx) = query_trace_events_tx {
let _ = query_trace_events_tx.send(QueryTraceEvent::Failure(
query_id,
sending_peer,
QueryFailureKind::InvalidContent,
));
}
return;
.report_validation(validation_result.is_valid());

if !validation_result.is_valid() {
warn!(
content.id = %hex_encode_compact(content_id),
content.key = %content_key,
?validation_result,
"Error validating content"
);
// Indicate to the query that the content is invalid
let _ = valid_content_callback.send(None);
if let Some(query_trace_events_tx) = query_trace_events_tx {
let _ = query_trace_events_tx.send(QueryTraceEvent::Failure(
query_id,
sending_peer,
QueryFailureKind::InvalidContent,
));
}
};
return;
}

// skip storing if content is not valid for storing, the content
// is already stored or if there's an error reading the store
let should_store = validation_result.valid_for_storing
// store content that:
// - is canonically valid
// - is not already stored
// - is within radius (if applicable)
let should_store = validation_result.is_canonically_valid()
&& utp_processing
.store
.lock()
.is_key_within_radius_and_unavailable(&content_key)
.should_we_store(&content_key)
.map_or_else(
|err| {
error!("Unable to read store: {err}");
Expand All @@ -571,17 +570,12 @@ impl<
{
Ok(dropped_content) => {
let mut content_to_propagate = vec![(content_key.clone(), content.clone())];
if let Some(additional_content_to_propagate) =
validation_result.additional_content_to_propagate
{
content_to_propagate.push(additional_content_to_propagate);
}
if !dropped_content.is_empty() && utp_processing.gossip_dropped {
debug!(
"Dropped {:?} pieces of content after inserting new content, propagating them back into the network.",
dropped_content.len(),
);
content_to_propagate.extend(dropped_content.clone());
content_to_propagate.extend(dropped_content);
}
propagate_put_content_cross_thread::<_, TMetric>(
content_to_propagate,
46 changes: 10 additions & 36 deletions crates/portalnet/src/overlay/service/offer.rs
@@ -120,15 +120,11 @@ impl<

for (i, key) in content_keys.iter().enumerate() {
// Accept content if within radius and not already present in the data store.
let accept = self
.store
.lock()
.is_key_within_radius_and_unavailable(key)
.map_err(|err| {
OverlayRequestError::AcceptError(format!(
"Unable to check content availability {err}"
))
})?;
let accept = self.store.lock().should_we_store(key).map_err(|err| {
OverlayRequestError::AcceptError(format!(
"Unable to check content availability {err}"
))
})?;
let accept_code = match accept {
ShouldWeStoreContent::Store => {
// accept all keys that are successfully added to the queue
@@ -558,45 +554,23 @@
.await;
utp_processing
.metrics
.report_validation(validation_result.is_ok());
.report_validation(validation_result.is_valid());

let validation_result = match validation_result {
Ok(validation_result) => validation_result,
Err(err) => {
// Skip storing & propagating content if it's not valid
warn!(
error = %err,
content.key = %key.to_hex(),
"Error validating accepted content"
);
return None;
}
};

if !validation_result.valid_for_storing {
// Content received via Offer/Accept should be valid for storing.
// If it isn't, don't store it and don't propagate it.
if !validation_result.is_canonically_valid() {
warn!(
content.key = %key.to_hex(),
"Error validating accepted content - not valid for storing"
?validation_result,
"Error validating accepted content",
);
return None;
}

// Collect all content to propagate
let mut content_to_propagate = vec![(key.clone(), content_value.clone())];
if let Some(additional_content_to_propagate) =
validation_result.additional_content_to_propagate
{
content_to_propagate.push(additional_content_to_propagate);
}

// Check if data should be stored, and store if it is within our radius and not
// already stored.
let key_desired = utp_processing
.store
.lock()
.is_key_within_radius_and_unavailable(&key);
let key_desired = utp_processing.store.lock().should_we_store(&key);
match key_desired {
Ok(ShouldWeStoreContent::Store) => {
match utp_processing.store.lock().put(key.clone(), &content_value) {
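The accept logic and storage checks above branch on `ShouldWeStoreContent`. That enum is defined in the storage crate and is untouched by this diff; based on the variants that appear in these changes, its shape is roughly the following (a sketch, not the actual definition):

```rust
/// Sketch based on the variants used in this diff; the real enum lives in
/// crates/storage and may carry additional variants or data.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ShouldWeStoreContent {
    /// Key is within radius (where applicable) and not yet stored.
    Store,
    /// Key falls outside the node's storage radius.
    NotWithinRadius,
    /// Key is already present in the data store.
    AlreadyStored,
}
```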
27 changes: 14 additions & 13 deletions crates/storage/src/lib.rs
@@ -39,6 +39,7 @@ pub trait ContentStore {
fn get(&self, key: &Self::Key) -> Result<Option<RawContentValue>, ContentStoreError>;

/// Puts a piece of content into the store.
///
/// Returns a list of keys that were evicted from the store, which should be gossiped into the
/// network. In the future this might be updated to a separate table that stores a queue
/// of content keys to be gossiped and gossips them in a background task.
@@ -51,10 +52,17 @@

/// Returns whether the content denoted by `key` is within the radius of the data store and not
/// already stored within the data store.
fn is_key_within_radius_and_unavailable(
fn should_we_store(&self, key: &Self::Key) -> Result<ShouldWeStoreContent, ContentStoreError>;

/// Performs [ContentStore::should_we_store] for multiple content keys.
///
/// The default implementation calls `self.should_we_store` for each key.
fn should_we_store_batch(
&self,
key: &Self::Key,
) -> Result<ShouldWeStoreContent, ContentStoreError>;
keys: &[Self::Key],
) -> Result<Vec<ShouldWeStoreContent>, ContentStoreError> {
keys.iter().map(|key| self.should_we_store(key)).collect()
}

/// Returns the radius of the data store.
fn radius(&self) -> Distance;
@@ -122,10 +130,7 @@ impl<TMetric: Metric> ContentStore for MemoryContentStore<TMetric> {
Ok(vec![])
}

fn is_key_within_radius_and_unavailable(
&self,
key: &Self::Key,
) -> Result<ShouldWeStoreContent, ContentStoreError> {
fn should_we_store(&self, key: &Self::Key) -> Result<ShouldWeStoreContent, ContentStoreError> {
if key.affected_by_radius() && self.distance_to_key(key) > self.radius {
return Ok(ShouldWeStoreContent::NotWithinRadius);
}
@@ -251,18 +256,14 @@ pub mod test {
// Arbitrary key within radius and unavailable.
let arb_key = IdentityContentKey::new(node_id.raw());
assert_eq!(
store
.is_key_within_radius_and_unavailable(&arb_key)
.unwrap(),
store.should_we_store(&arb_key).unwrap(),
ShouldWeStoreContent::Store
);

// Arbitrary key available.
let _ = store.put(arb_key.clone(), val);
assert_eq!(
store
.is_key_within_radius_and_unavailable(&arb_key)
.unwrap(),
store.should_we_store(&arb_key).unwrap(),
ShouldWeStoreContent::AlreadyStored
);
}
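The new `should_we_store_batch` ships with a per-key default implementation and has no caller in this diff. A hedged sketch of how a handler that receives several offered keys might use it; the function and variable names here are placeholders, not part of the PR:

```rust
// Assumes the ContentStore trait, ContentStoreError and ShouldWeStoreContent
// from crates/storage are in scope.
fn keys_to_accept<S: ContentStore>(
    store: &S,
    offered_keys: &[S::Key],
) -> Result<Vec<bool>, ContentStoreError> {
    // One pass over the store; each decision lines up with the offered key
    // at the same index.
    Ok(store
        .should_we_store_batch(offered_keys)?
        .into_iter()
        .map(|decision| matches!(decision, ShouldWeStoreContent::Store))
        .collect())
}
```

A store backed by a database could override the default to answer all keys in a single query instead of one lookup per key.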
20 changes: 10 additions & 10 deletions crates/subnetworks/beacon/src/storage.rs
@@ -196,7 +196,7 @@ impl ContentStore for BeaconStorage {
}

/// The "radius" concept is not applicable for Beacon network
fn is_key_within_radius_and_unavailable(
fn should_we_store(
&self,
key: &BeaconContentKey,
) -> Result<ShouldWeStoreContent, ContentStoreError> {
@@ -836,21 +836,21 @@ mod test {
assert_eq!(result, value.as_ssz_bytes());

// Test is_key_within_radius_and_unavailable for the same finalized slot
let should_store_content = storage.is_key_within_radius_and_unavailable(&key).unwrap();
let should_store_content = storage.should_we_store(&key).unwrap();
assert_eq!(should_store_content, ShouldWeStoreContent::AlreadyStored);

// Test is_key_within_radius_and_unavailable for older finalized slot
let key = BeaconContentKey::LightClientFinalityUpdate(LightClientFinalityUpdateKey {
finalized_slot: finalized_slot - 1,
});
let should_store_content = storage.is_key_within_radius_and_unavailable(&key).unwrap();
let should_store_content = storage.should_we_store(&key).unwrap();
assert_eq!(should_store_content, ShouldWeStoreContent::AlreadyStored);

// Test is_key_within_radius_and_unavailable for newer finalized slot
let key = BeaconContentKey::LightClientFinalityUpdate(LightClientFinalityUpdateKey {
finalized_slot: finalized_slot + 1,
});
let should_store_content = storage.is_key_within_radius_and_unavailable(&key).unwrap();
let should_store_content = storage.should_we_store(&key).unwrap();
assert_eq!(should_store_content, ShouldWeStoreContent::Store);

// Test getting the latest finality update
@@ -875,21 +875,21 @@
assert_eq!(result, value.as_ssz_bytes());

// Test is_key_within_radius_and_unavailable for the same signature slot
let should_store_content = storage.is_key_within_radius_and_unavailable(&key).unwrap();
let should_store_content = storage.should_we_store(&key).unwrap();
assert_eq!(should_store_content, ShouldWeStoreContent::AlreadyStored);

// Test is_key_within_radius_and_unavailable for older signature slot
let key = BeaconContentKey::LightClientOptimisticUpdate(LightClientOptimisticUpdateKey {
signature_slot: signature_slot - 1,
});
let should_store_content = storage.is_key_within_radius_and_unavailable(&key).unwrap();
let should_store_content = storage.should_we_store(&key).unwrap();
assert_eq!(should_store_content, ShouldWeStoreContent::AlreadyStored);

// Test is_key_within_radius_and_unavailable for newer signature slot
let key = BeaconContentKey::LightClientOptimisticUpdate(LightClientOptimisticUpdateKey {
signature_slot: signature_slot + 1,
});
let should_store_content = storage.is_key_within_radius_and_unavailable(&key).unwrap();
let should_store_content = storage.should_we_store(&key).unwrap();
assert_eq!(should_store_content, ShouldWeStoreContent::Store);

// Test getting unavailable optimistic update
@@ -927,21 +927,21 @@
assert_eq!(result, value.encode());

// Test is_key_within_radius_and_unavailable for the same epoch
let should_store_content = storage.is_key_within_radius_and_unavailable(&key).unwrap();
let should_store_content = storage.should_we_store(&key).unwrap();
assert_eq!(should_store_content, ShouldWeStoreContent::AlreadyStored);

// Test is_key_within_radius_and_unavailable for older epoch
let key = BeaconContentKey::HistoricalSummariesWithProof(HistoricalSummariesWithProofKey {
epoch: epoch - 1,
});
let should_store_content = storage.is_key_within_radius_and_unavailable(&key).unwrap();
let should_store_content = storage.should_we_store(&key).unwrap();
assert_eq!(should_store_content, ShouldWeStoreContent::AlreadyStored);

// Test is_key_within_radius_and_unavailable for newer epoch
let key = BeaconContentKey::HistoricalSummariesWithProof(HistoricalSummariesWithProofKey {
epoch: epoch + 1,
});
let should_store_content = storage.is_key_within_radius_and_unavailable(&key).unwrap();
let should_store_content = storage.should_we_store(&key).unwrap();
assert_eq!(should_store_content, ShouldWeStoreContent::Store);

// Test getting unavailable historical summaries with proof
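The beacon tests above pin down the intended semantics for this radius-free network: an update keyed by finalized slot, signature slot, or epoch is only worth storing when it is strictly newer than what the store already holds. A sketch of that comparison, inferred from the tests; the helper and its arguments are illustrative, not the actual BeaconStorage internals:

```rust
// Assumes ShouldWeStoreContent from crates/storage is in scope.
fn should_store_update(
    stored_slot_or_epoch: Option<u64>,
    offered_slot_or_epoch: u64,
) -> ShouldWeStoreContent {
    match stored_slot_or_epoch {
        // Equal to or older than what we already hold: redundant, matching
        // the AlreadyStored assertions in the tests above.
        Some(stored) if offered_slot_or_epoch <= stored => {
            ShouldWeStoreContent::AlreadyStored
        }
        // Strictly newer (or nothing stored yet): keep it.
        _ => ShouldWeStoreContent::Store,
    }
}
```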