Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

45 changes: 25 additions & 20 deletions crates/portalnet/src/overlay/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -227,11 +227,7 @@ impl<
content_key: TContentKey,
content_value: RawContentValue,
) -> PutContentInfo {
let should_we_store = match self
.store
.lock()
.is_key_within_radius_and_unavailable(&content_key)
{
let should_we_store = match self.store.lock().should_we_store(&content_key) {
Ok(should_we_store) => matches!(should_we_store, ShouldWeStoreContent::Store),
Err(err) => {
warn!(
Expand Down Expand Up @@ -492,12 +488,14 @@ impl<
Ok(Response::Content(found_content)) => {
match found_content {
Content::Content(content) => {
match self.validate_content(&content_key, &content).await {
Ok(_) => Ok((Content::Content(content), false)),
Err(msg) => Err(OverlayRequestError::FailedValidation(format!(
"Network: {:?}, Reason: {msg:?}",
let validation_result = self.validate_content(&content_key, &content).await;
if validation_result.is_valid() {
Ok((Content::Content(content), false))
} else {
Err(OverlayRequestError::FailedValidation(format!(
"Network: {:?}, Reason: {validation_result:?}",
self.protocol
))),
)))
}
}
Content::Enrs(_) => Ok((found_content, false)),
Expand All @@ -514,12 +512,14 @@ impl<
};
let content = RawContentValue::from(bytes);

match self.validate_content(&content_key, &content).await {
Ok(_) => Ok((Content::Content(content), true)),
Err(msg) => Err(OverlayRequestError::FailedValidation(format!(
"Network: {:?}, Reason: {msg:?}",
let validation_result = self.validate_content(&content_key, &content).await;
if validation_result.is_valid() {
Ok((Content::Content(content), true))
} else {
Err(OverlayRequestError::FailedValidation(format!(
"Network: {:?}, Reason: {validation_result:?}",
self.protocol
))),
)))
}
}
}
Expand All @@ -533,13 +533,18 @@ impl<
&self,
content_key: &TContentKey,
content: &[u8],
) -> anyhow::Result<ValidationResult<TContentKey>> {
) -> ValidationResult {
let validation_result = self.validator.validate_content(content_key, content).await;
self.metrics.report_validation(validation_result.is_ok());

validation_result.map_err(|err| {
anyhow!("Content validation failed for content key {content_key:?} with error: {err:?}")
})
self.metrics.report_validation(validation_result.is_valid());
if !validation_result.is_valid() {
warn!(
"Content validation failed for content key {}: {validation_result:?}",
content_key.to_bytes(),
)
}

validation_result
}

/// Initialize FindContent uTP stream with remote node
Expand Down
58 changes: 26 additions & 32 deletions crates/portalnet/src/overlay/service/find_content.rs
Original file line number Diff line number Diff line change
Expand Up @@ -525,37 +525,36 @@ impl<
.await;
utp_processing
.metrics
.report_validation(validation_result.is_ok());

let validation_result = match validation_result {
Ok(validation_result) => validation_result,
Err(err) => {
warn!(
error = ?err,
content.id = %hex_encode_compact(content_id),
content.key = %content_key,
"Error validating content"
);
// Indicate to the query that the content is invalid
let _ = valid_content_callback.send(None);
if let Some(query_trace_events_tx) = query_trace_events_tx {
let _ = query_trace_events_tx.send(QueryTraceEvent::Failure(
query_id,
sending_peer,
QueryFailureKind::InvalidContent,
));
}
return;
.report_validation(validation_result.is_valid());

if !validation_result.is_valid() {
warn!(
content.id = %hex_encode_compact(content_id),
content.key = %content_key,
?validation_result,
"Error validating content"
);
// Indicate to the query that the content is invalid
let _ = valid_content_callback.send(None);
if let Some(query_trace_events_tx) = query_trace_events_tx {
let _ = query_trace_events_tx.send(QueryTraceEvent::Failure(
query_id,
sending_peer,
QueryFailureKind::InvalidContent,
));
}
};
return;
}

// skip storing if content is not valid for storing, the content
// is already stored or if there's an error reading the store
let should_store = validation_result.valid_for_storing
// store content that:
// - is canonically valid
// - is not already stored
// - is within radius (if applicable)
let should_store = validation_result.is_canonically_valid()
&& utp_processing
.store
.lock()
.is_key_within_radius_and_unavailable(&content_key)
.should_we_store(&content_key)
.map_or_else(
|err| {
error!("Unable to read store: {err}");
Expand All @@ -571,17 +570,12 @@ impl<
{
Ok(dropped_content) => {
let mut content_to_propagate = vec![(content_key.clone(), content.clone())];
if let Some(additional_content_to_propagate) =
validation_result.additional_content_to_propagate
{
content_to_propagate.push(additional_content_to_propagate);
}
if !dropped_content.is_empty() && utp_processing.gossip_dropped {
debug!(
"Dropped {:?} pieces of content after inserting new content, propagating them back into the network.",
dropped_content.len(),
);
content_to_propagate.extend(dropped_content.clone());
content_to_propagate.extend(dropped_content);
}
propagate_put_content_cross_thread::<_, TMetric>(
content_to_propagate,
Expand Down
46 changes: 10 additions & 36 deletions crates/portalnet/src/overlay/service/offer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,15 +120,11 @@ impl<

for (i, key) in content_keys.iter().enumerate() {
// Accept content if within radius and not already present in the data store.
let accept = self
.store
.lock()
.is_key_within_radius_and_unavailable(key)
.map_err(|err| {
OverlayRequestError::AcceptError(format!(
"Unable to check content availability {err}"
))
})?;
let accept = self.store.lock().should_we_store(key).map_err(|err| {
OverlayRequestError::AcceptError(format!(
"Unable to check content availability {err}"
))
})?;
let accept_code = match accept {
ShouldWeStoreContent::Store => {
// accept all keys that are successfully added to the queue
Expand Down Expand Up @@ -558,45 +554,23 @@ impl<
.await;
utp_processing
.metrics
.report_validation(validation_result.is_ok());
.report_validation(validation_result.is_valid());

let validation_result = match validation_result {
Ok(validation_result) => validation_result,
Err(err) => {
// Skip storing & propagating content if it's not valid
warn!(
error = %err,
content.key = %key.to_hex(),
"Error validating accepted content"
);
return None;
}
};

if !validation_result.valid_for_storing {
// Content received via Offer/Accept should be valid for storing.
// If it isn't, don't store it and don't propagate it.
if !validation_result.is_canonically_valid() {
warn!(
content.key = %key.to_hex(),
"Error validating accepted content - not valid for storing"
?validation_result,
"Error validating accepted content",
);
return None;
}

// Collect all content to propagate
let mut content_to_propagate = vec![(key.clone(), content_value.clone())];
if let Some(additional_content_to_propagate) =
validation_result.additional_content_to_propagate
{
content_to_propagate.push(additional_content_to_propagate);
}

// Check if data should be stored, and store if it is within our radius and not
// already stored.
let key_desired = utp_processing
.store
.lock()
.is_key_within_radius_and_unavailable(&key);
let key_desired = utp_processing.store.lock().should_we_store(&key);
match key_desired {
Ok(ShouldWeStoreContent::Store) => {
match utp_processing.store.lock().put(key.clone(), &content_value) {
Expand Down
27 changes: 14 additions & 13 deletions crates/storage/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ pub trait ContentStore {
fn get(&self, key: &Self::Key) -> Result<Option<RawContentValue>, ContentStoreError>;

/// Puts a piece of content into the store.
///
/// Returns a list of keys that were evicted from the store, which should be gossiped into the
/// network. In the future this might be updated to a separate table that stores a queue
/// of content keys to be gossiped and gossips them in a background task.
Expand All @@ -51,10 +52,17 @@ pub trait ContentStore {

/// Returns whether the content denoted by `key` is within the radius of the data store and not
/// already stored within the data store.
fn is_key_within_radius_and_unavailable(
fn should_we_store(&self, key: &Self::Key) -> Result<ShouldWeStoreContent, ContentStoreError>;

/// Performs [ContentStore::should_we_store] for multiple content keys.
///
/// The default implementation calls `self.should_we_store` for each key.
fn should_we_store_batch(
&self,
key: &Self::Key,
) -> Result<ShouldWeStoreContent, ContentStoreError>;
keys: &[Self::Key],
) -> Result<Vec<ShouldWeStoreContent>, ContentStoreError> {
keys.iter().map(|key| self.should_we_store(key)).collect()
}

/// Returns the radius of the data store.
fn radius(&self) -> Distance;
Expand Down Expand Up @@ -122,10 +130,7 @@ impl<TMetric: Metric> ContentStore for MemoryContentStore<TMetric> {
Ok(vec![])
}

fn is_key_within_radius_and_unavailable(
&self,
key: &Self::Key,
) -> Result<ShouldWeStoreContent, ContentStoreError> {
fn should_we_store(&self, key: &Self::Key) -> Result<ShouldWeStoreContent, ContentStoreError> {
if key.affected_by_radius() && self.distance_to_key(key) > self.radius {
return Ok(ShouldWeStoreContent::NotWithinRadius);
}
Expand Down Expand Up @@ -251,18 +256,14 @@ pub mod test {
// Arbitrary key within radius and unavailable.
let arb_key = IdentityContentKey::new(node_id.raw());
assert_eq!(
store
.is_key_within_radius_and_unavailable(&arb_key)
.unwrap(),
store.should_we_store(&arb_key).unwrap(),
ShouldWeStoreContent::Store
);

// Arbitrary key available.
let _ = store.put(arb_key.clone(), val);
assert_eq!(
store
.is_key_within_radius_and_unavailable(&arb_key)
.unwrap(),
store.should_we_store(&arb_key).unwrap(),
ShouldWeStoreContent::AlreadyStored
);
}
Expand Down
20 changes: 10 additions & 10 deletions crates/subnetworks/beacon/src/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ impl ContentStore for BeaconStorage {
}

/// The "radius" concept is not applicable for Beacon network
fn is_key_within_radius_and_unavailable(
fn should_we_store(
&self,
key: &BeaconContentKey,
) -> Result<ShouldWeStoreContent, ContentStoreError> {
Expand Down Expand Up @@ -836,21 +836,21 @@ mod test {
assert_eq!(result, value.as_ssz_bytes());

// Test is_key_within_radius_and_unavailable for the same finalized slot
let should_store_content = storage.is_key_within_radius_and_unavailable(&key).unwrap();
let should_store_content = storage.should_we_store(&key).unwrap();
assert_eq!(should_store_content, ShouldWeStoreContent::AlreadyStored);

// Test is_key_within_radius_and_unavailable for older finalized slot
let key = BeaconContentKey::LightClientFinalityUpdate(LightClientFinalityUpdateKey {
finalized_slot: finalized_slot - 1,
});
let should_store_content = storage.is_key_within_radius_and_unavailable(&key).unwrap();
let should_store_content = storage.should_we_store(&key).unwrap();
assert_eq!(should_store_content, ShouldWeStoreContent::AlreadyStored);

// Test is_key_within_radius_and_unavailable for newer finalized slot
let key = BeaconContentKey::LightClientFinalityUpdate(LightClientFinalityUpdateKey {
finalized_slot: finalized_slot + 1,
});
let should_store_content = storage.is_key_within_radius_and_unavailable(&key).unwrap();
let should_store_content = storage.should_we_store(&key).unwrap();
assert_eq!(should_store_content, ShouldWeStoreContent::Store);

// Test getting the latest finality update
Expand All @@ -875,21 +875,21 @@ mod test {
assert_eq!(result, value.as_ssz_bytes());

// Test is_key_within_radius_and_unavailable for the same signature slot
let should_store_content = storage.is_key_within_radius_and_unavailable(&key).unwrap();
let should_store_content = storage.should_we_store(&key).unwrap();
assert_eq!(should_store_content, ShouldWeStoreContent::AlreadyStored);

// Test is_key_within_radius_and_unavailable for older signature slot
let key = BeaconContentKey::LightClientOptimisticUpdate(LightClientOptimisticUpdateKey {
signature_slot: signature_slot - 1,
});
let should_store_content = storage.is_key_within_radius_and_unavailable(&key).unwrap();
let should_store_content = storage.should_we_store(&key).unwrap();
assert_eq!(should_store_content, ShouldWeStoreContent::AlreadyStored);

// Test is_key_within_radius_and_unavailable for newer signature slot
let key = BeaconContentKey::LightClientOptimisticUpdate(LightClientOptimisticUpdateKey {
signature_slot: signature_slot + 1,
});
let should_store_content = storage.is_key_within_radius_and_unavailable(&key).unwrap();
let should_store_content = storage.should_we_store(&key).unwrap();
assert_eq!(should_store_content, ShouldWeStoreContent::Store);

// Test getting unavailable optimistic update
Expand Down Expand Up @@ -927,21 +927,21 @@ mod test {
assert_eq!(result, value.encode());

// Test is_key_within_radius_and_unavailable for the same epoch
let should_store_content = storage.is_key_within_radius_and_unavailable(&key).unwrap();
let should_store_content = storage.should_we_store(&key).unwrap();
assert_eq!(should_store_content, ShouldWeStoreContent::AlreadyStored);

// Test is_key_within_radius_and_unavailable for older epoch
let key = BeaconContentKey::HistoricalSummariesWithProof(HistoricalSummariesWithProofKey {
epoch: epoch - 1,
});
let should_store_content = storage.is_key_within_radius_and_unavailable(&key).unwrap();
let should_store_content = storage.should_we_store(&key).unwrap();
assert_eq!(should_store_content, ShouldWeStoreContent::AlreadyStored);

// Test is_key_within_radius_and_unavailable for newer epoch
let key = BeaconContentKey::HistoricalSummariesWithProof(HistoricalSummariesWithProofKey {
epoch: epoch + 1,
});
let should_store_content = storage.is_key_within_radius_and_unavailable(&key).unwrap();
let should_store_content = storage.should_we_store(&key).unwrap();
assert_eq!(should_store_content, ShouldWeStoreContent::Store);

// Test getting unavailable historical summaries with proof
Expand Down
Loading
Loading