72 changes: 47 additions & 25 deletions crates/tap-agent/src/agent/sender_account.rs
@@ -292,9 +292,6 @@ pub struct SenderAccountArgs {
/// Prefix used to bypass limitations of global actor registry (used for tests)
pub prefix: Option<String>,

/// Configuration for retry scheduler in case sender is denied
pub retry_interval: Duration,

/// Sender type, used to decide which set of tables to use
pub sender_type: SenderType,
}
@@ -343,8 +340,6 @@ pub struct State {
/// Sender Balance used to verify if it has money in
/// the escrow to pay for all non-redeemed fees (ravs and receipts)
sender_balance: U256,
/// Configuration for retry scheduler in case sender is denied
retry_interval: Duration,

/// Adaptative limiter for concurrent Rav Request
///
@@ -547,7 +542,6 @@ impl State {
If this doesn't work, open an issue on our Github."
)
})?;
self.backoff_info.ok();
self.rav_request_for_allocation(allocation_id).await
}

@@ -586,12 +580,14 @@ impl State {
match rav_result {
Ok(signed_rav) => {
self.sender_fee_tracker.ok_rav_request(allocation_id);
self.backoff_info.ok();
self.adaptive_limiter.on_success();
let rav_value = signed_rav.map_or(0, |rav| rav.value_aggregate);
self.update_rav(allocation_id, rav_value);
}
Err(err) => {
self.sender_fee_tracker.failed_rav_backoff(allocation_id);
self.backoff_info.fail();
self.adaptive_limiter.on_failure();
tracing::error!(
"Error while requesting RAV for sender {} and allocation {}: {}",
@@ -643,6 +639,46 @@ impl State {
}
}

fn cancel_scheduled_retry(&mut self) {
if let Some(scheduled_rav_request) = self.scheduled_rav_request.take() {
tracing::debug!(sender = %self.sender, "Aborting scheduled RAV request");
scheduled_rav_request.abort();
}
}

fn next_retry_delay(&self) -> Duration {
let global_remaining = self.backoff_info.remaining();
let allocation_remaining = self.sender_fee_tracker.min_remaining_backoff();

match (global_remaining, allocation_remaining) {
(Some(global), Some(allocation)) => std::cmp::max(global, allocation),
(Some(global), None) => global,
(None, Some(allocation)) => allocation,
(None, None) => Duration::ZERO,
}
}

fn schedule_retry(
&mut self,
myself: &ActorRef<SenderAccountMessage>,
allocation_id: AllocationId,
) {
self.cancel_scheduled_retry();

let delay = self.next_retry_delay();
tracing::debug!(
sender = %self.sender,
%allocation_id,
delay = ?delay,
"Scheduling RAV retry"
);

let retry_allocation_id = allocation_id;
self.scheduled_rav_request = Some(myself.send_after(delay, move || {
SenderAccountMessage::UpdateReceiptFees(retry_allocation_id, ReceiptFees::Retry)
}));
}

/// Determines whether the sender should be denied/blocked based on current fees and balance.
///
/// The deny condition is reached when either:
@@ -824,7 +860,6 @@ impl Actor for SenderAccount {
sender_aggregator_endpoint,
allocation_ids,
prefix,
retry_interval,
sender_type,
}: Self::Arguments,
) -> Result<Self::State, ActorProcessingErr> {
@@ -1161,9 +1196,7 @@ impl Actor for SenderAccount {
sender: sender_id,
denied,
sender_balance,
retry_interval,
adaptive_limiter: AdaptiveLimiter::new(INITIAL_RAV_REQUEST_CONCURRENT, 1..50),
escrow_accounts,
escrow_subgraph,
network_subgraph,
domain_separator,
@@ -1283,10 +1316,7 @@ impl Actor for SenderAccount {
);
}
// If we're here because of a new receipt, abort any scheduled UpdateReceiptFees
if let Some(scheduled_rav_request) = state.scheduled_rav_request.take() {
tracing::debug!(sender = %state.sender, "Aborting scheduled RAV request");
scheduled_rav_request.abort();
}
state.cancel_scheduled_retry();

match receipt_fees {
ReceiptFees::NewReceipt(value, timestamp_ns) => {
@@ -1425,13 +1455,7 @@ impl Actor for SenderAccount {
// Action: Schedule another retry to attempt RAV creation again.
(true, true) => {
// retry in a moment
state.scheduled_rav_request =
Some(myself.send_after(state.retry_interval, move || {
SenderAccountMessage::UpdateReceiptFees(
allocation_id,
ReceiptFees::Retry,
)
}));
state.schedule_retry(&myself, allocation_id);
}
_ => {}
}
@@ -1766,8 +1790,6 @@ pub mod tests {

/// Prefix shared between tests so we don't have conflicts in the global registry
const BUFFER_DURATION: Duration = Duration::from_millis(100);
const RETRY_DURATION: Duration = Duration::from_millis(1000);

async fn setup_mock_escrow_subgraph() -> MockServer {
let mock_escrow_subgraph_server: MockServer = MockServer::start().await;
mock_escrow_subgraph_server
@@ -2248,15 +2270,15 @@ pub mod tests {
.unwrap();
flush_messages(&mut msg_receiver).await;

// wait to try again so it's outside the buffer
tokio::time::sleep(RETRY_DURATION).await;
// wait briefly to allow the retry loop to process
tokio::time::sleep(Duration::from_millis(50)).await;
assert_triggered!(triggered_rav_request);

// Verify that no additional retry happens: the first RAV request cleared the
// unaggregated fees and resolved the deny condition, so the retry loop should
// stop once the underlying issue is resolved.
tokio::time::sleep(RETRY_DURATION).await;
tokio::time::sleep(Duration::from_millis(50)).await;
assert_not_triggered!(triggered_rav_request);
}

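For reference, here is a minimal standalone sketch (not part of this diff) restating the delay rule that `next_retry_delay` introduces above: wait for the longer of the global backoff and the smallest per-allocation backoff, and retry immediately when neither is active. The `combined_retry_delay` helper and the `main` harness are illustrative names only.

```rust
use std::time::Duration;

/// Illustrative restatement of the rule in `State::next_retry_delay`:
/// take the longer of the two remaining backoff windows, or zero if neither is set.
fn combined_retry_delay(global: Option<Duration>, allocation: Option<Duration>) -> Duration {
    match (global, allocation) {
        (Some(g), Some(a)) => g.max(a),
        (Some(g), None) => g,
        (None, Some(a)) => a,
        (None, None) => Duration::ZERO,
    }
}

fn main() {
    // No active backoff anywhere: retry immediately.
    assert_eq!(combined_retry_delay(None, None), Duration::ZERO);
    // Both active: the longer window wins, so the retry cannot fire early.
    assert_eq!(
        combined_retry_delay(Some(Duration::from_millis(200)), Some(Duration::from_secs(1))),
        Duration::from_secs(1)
    );
}
```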
4 changes: 0 additions & 4 deletions crates/tap-agent/src/agent/sender_accounts_manager.rs
@@ -6,7 +6,6 @@ use std::{
fmt::Display,
str::FromStr,
sync::LazyLock,
time::Duration,
};

use anyhow::{anyhow, bail};
@@ -39,8 +38,6 @@ static RECEIPTS_CREATED: LazyLock<CounterVec> = LazyLock::new(|| {
.unwrap()
});

const RETRY_INTERVAL: Duration = Duration::from_secs(30);

/// Notification received by pgnotify for V1 (legacy) receipts
///
/// This contains a list of properties that are sent by postgres when a V1 receipt is inserted
@@ -1069,7 +1066,6 @@ impl State {
.clone(),
allocation_ids,
prefix: self.prefix.clone(),
retry_interval: RETRY_INTERVAL,
sender_type,
})
}
41 changes: 19 additions & 22 deletions crates/tap-agent/src/backoff.rs
@@ -3,47 +3,44 @@

//! # backoff
//!
//! This module is used to provide a helper that keep tracks of exponential backoff information in a
//! non-blocking way. This is important since Actors process one message at a time, and a sleep in
//! the middle would affect performance.
//!
//! This way we just mark something as "in backoff" and just check that information before sending
//! the request.
//!
//! This module is also used by [crate::tracker].
//! Helper for tracking exponential backoff windows without blocking the async actor loop. Actors
//! process one message at a time, so we just mark the next instant when work is allowed again and
//! query that metadata before firing a request.

use std::time::{Duration, Instant};

/// Backoff information based on [Instant]
/// Backoff information based on [`Instant`]
#[derive(Debug, Clone)]
pub struct BackoffInfo {
failed_count: u32,
failed_backoff_time: Instant,
}

impl BackoffInfo {
/// Callback representing a successful request
///
/// This resets the failed_count
/// Marks a successful attempt, resetting counters and clearing any pending backoff delay.
pub fn ok(&mut self) {
self.failed_count = 0;
self.failed_backoff_time = Instant::now();
}

/// Callback representing a failed request
///
/// This sets the backoff time to max(100ms * 2 ^ retries, 60s)
/// Marks a failed attempt, growing the backoff delay exponentially up to 60 seconds.
pub fn fail(&mut self) {
// backoff = max(100ms * 2 ^ retries, 60s)
self.failed_backoff_time = Instant::now()
+ (Duration::from_millis(100) * 2u32.pow(self.failed_count))
.min(Duration::from_secs(60));
let delay =
(Duration::from_millis(100) * 2u32.pow(self.failed_count)).min(Duration::from_secs(60));
self.failed_backoff_time = Instant::now() + delay;
self.failed_count += 1;
}

/// Returns if backoff is in process
/// Returns the remaining backoff duration, if the current attempt should keep waiting.
pub fn remaining(&self) -> Option<Duration> {
self.failed_backoff_time
.checked_duration_since(Instant::now())
.filter(|remaining| !remaining.is_zero())
}

/// Returns whether the caller is still inside the backoff window.
pub fn in_backoff(&self) -> bool {
let now = Instant::now();
now < self.failed_backoff_time
self.remaining().is_some()
}
}

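The following usage sketch (not part of this diff) illustrates the documented behaviour of `BackoffInfo`: each failure roughly doubles the wait (100 ms, 200 ms, 400 ms, ...), capped at 60 s, and a success clears the window. It assumes the type is reachable as `tap_agent::backoff::BackoffInfo` (the library crate name is a guess; the new test below uses `crate::backoff::BackoffInfo`) and that `BackoffInfo` implements `Default`, as that test also relies on.

```rust
use std::time::Duration;
// Hypothetical import path; adjust to the crate's actual library name.
use tap_agent::backoff::BackoffInfo;

fn main() {
    let mut backoff = BackoffInfo::default();
    assert!(backoff.remaining().is_none(), "no backoff before any failure");

    for attempt in 0..4u32 {
        backoff.fail();
        // `remaining()` is measured from `Instant::now()`, so it sits just
        // under the nominal 100ms * 2^attempt bound (capped at 60s).
        let remaining = backoff.remaining().expect("failure opens a backoff window");
        assert!(remaining <= Duration::from_millis(100) * 2u32.pow(attempt));
    }

    backoff.ok();
    assert!(backoff.remaining().is_none(), "success clears the window");
    assert!(!backoff.in_backoff());
}
```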
50 changes: 48 additions & 2 deletions crates/tap-agent/src/test.rs
@@ -68,7 +68,6 @@ pub const RECEIPT_LIMIT: u64 = 10000;
pub const DUMMY_URL: &str = "http://localhost:1234";
pub const ESCROW_VALUE: u128 = 1000;
const BUFFER_DURATION: Duration = Duration::from_millis(100);
const RETRY_DURATION: Duration = Duration::from_millis(1000);
const RAV_REQUEST_TIMEOUT: Duration = Duration::from_secs(60);
const TAP_SENDER_TIMEOUT: Duration = Duration::from_secs(30);

@@ -182,7 +181,6 @@ pub async fn create_sender_account(
sender_aggregator_endpoint: aggregator_url,
allocation_ids: HashSet::new(),
prefix: Some(prefix.clone()),
retry_interval: RETRY_DURATION,
sender_type: SenderType::Legacy,
};

@@ -1101,3 +1099,51 @@ pub mod actors {
(rx, sender_account)
}
}

#[cfg(test)]
mod retry_helpers_tests {
use super::*;
use crate::{backoff::BackoffInfo, tracker::SenderFeeTracker};
use std::time::Duration;
use thegraph_core::alloy::primitives::Address;

#[test]
fn backoff_info_remaining_resets_after_ok() {
let mut info = BackoffInfo::default();
assert!(info.remaining().is_none());
assert!(!info.in_backoff());

info.fail();
let remaining = info
.remaining()
.expect("backoff duration should be tracked after a failure");
assert!(remaining > Duration::ZERO);
assert!(info.in_backoff());

info.ok();
assert!(info.remaining().is_none());
assert!(!info.in_backoff());
}

#[test]
fn sender_fee_tracker_min_remaining_backoff_roundtrip() {
let mut tracker = SenderFeeTracker::new(Duration::ZERO);
assert!(tracker.min_remaining_backoff().is_none());

// Any non-zero address works here; alloy's `Address` has no `from_low_u64_be`.
let allocation = Address::with_last_byte(1);
tracker.failed_rav_backoff(allocation);
let first_delay = tracker
.min_remaining_backoff()
.expect("tracker should expose backoff after failure");
assert!(first_delay > Duration::ZERO);

tracker.failed_rav_backoff(allocation);
let second_delay = tracker
.min_remaining_backoff()
.expect("tracker should keep tracking backoff durations");
assert!(second_delay >= first_delay);

tracker.ok_rav_request(allocation);
assert!(tracker.min_remaining_backoff().is_none());
}
}
12 changes: 12 additions & 0 deletions crates/tap-agent/src/tracker.rs
@@ -27,6 +27,8 @@ mod tracker_tests;

pub use generic_tracker::GlobalFeeTracker;

use std::time::Duration;

use crate::agent::unaggregated_receipts::UnaggregatedReceipts;

/// Simple Tracker used for just `u128` fees and no extra blocking or unblocking feature
@@ -41,6 +43,16 @@
pub type SenderFeeTracker =
GenericTracker<GlobalFeeTracker, SenderFeeStats, DurationInfo, UnaggregatedReceipts>;

impl SenderFeeTracker {
/// Returns the smallest remaining backoff across all tracked allocations, if any
pub fn min_remaining_backoff(&self) -> Option<Duration> {
self.id_to_fee
.values()
.filter_map(|stats| stats.backoff_info.remaining())
.min()
}
}

/// Stats trait used by the Counter of a given allocation.
///
/// This is the data that goes in the Value side of the Map inside our Tracker