feat(relayer): don't retry old messages #5455
base: main
Conversation
Codecov Report
All modified and coverable lines are covered by tests ✅
Additional details and impacted files
@@ Coverage Diff @@
## main #5455 +/- ##
=======================================
Coverage 77.53% 77.53%
=======================================
Files 103 103
Lines 2110 2110
Branches 190 190
=======================================
Hits 1636 1636
Misses 453 453
Partials 21 21
PendingMessage::calculate_msg_backoff(num_retries).map(|dur| Instant::now() + dur);
pm.num_retries = num_retries;
pm.next_attempt_after = next_attempt_after;
fn get_message_status(
this logic is unchanged, just refactored out of `from_persisted_retries` for easier unit testing
Nice!
// Skip this message if it has been retried too many times
if let Some(max_retries) = retries_before_skipping {
    if num_retries >= max_retries {
        return None;
    }
}
this is new logic, the rest is refactored out of from_persisted_retries
fn fmt<'a>(&self, f: &mut std::fmt::Formatter<'a>) -> std::fmt::Result;
}

impl HyperlaneDb for Db {
verbose but that's just mockall unfortunately
msg,
self.destination_ctxs[&destination].clone(),
app_context,
Some(DEFAULT_MAX_MESSAGE_RETRIES),
I should read the env var here as well
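A rough sketch of what that could look like, reusing the `MAX_MESSAGE_RETRIES` / `DEFAULT_MAX_MESSAGE_RETRIES` names from this PR (the helper name is made up, not part of the diff):

```rust
const DEFAULT_MAX_MESSAGE_RETRIES: u32 = 70; // default assumed from this PR's description

// Sketch only: read MAX_MESSAGE_RETRIES at this call site too, falling back to
// the compile-time default when the variable is unset or unparsable.
fn max_retries_from_env() -> u32 {
    std::env::var("MAX_MESSAGE_RETRIES")
        .ok()
        .and_then(|v| v.parse().ok())
        .unwrap_or(DEFAULT_MAX_MESSAGE_RETRIES)
}

fn main() {
    // The call above would then pass Some(max_retries_from_env())
    // instead of Some(DEFAULT_MAX_MESSAGE_RETRIES).
    println!("max retries: {}", max_retries_from_env());
}
```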
.origin_db
fn maybe_get_num_retries(
    origin_db: Arc<dyn HyperlaneDb>,
    message: HyperlaneMessage,
This method does not need to take ownership of `message`. I wonder if passing it as a reference will be sufficient.
pm.next_attempt_after = next_attempt_after;
fn get_message_status(
    origin_db: Arc<dyn HyperlaneDb>,
    message: HyperlaneMessage,
This method does not need to take ownership of `message` either. I wonder if passing it as a reference will be sufficient.
retries_before_skipping,
)?;

let message_status = Self::get_message_status(ctx.origin_db.clone(), message.clone());
We can avoid the clone here if `get_message_status` does not take ownership of its `message` parameter.
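A minimal standalone sketch of the by-reference variant being suggested, using placeholder types rather than the real `HyperlaneDb` / `HyperlaneMessage`:

```rust
use std::sync::Arc;

// Hypothetical stand-ins for the real types, just to illustrate the ownership point.
struct Message {
    id: u64,
}
struct Db;

impl Db {
    fn status_by_id(&self, _id: u64) -> &'static str {
        "pending"
    }
}

// Taking `&Message` means callers keep ownership and no clone is needed.
fn get_message_status(db: Arc<Db>, message: &Message) -> &'static str {
    db.status_by_id(message.id)
}

fn main() {
    let db = Arc::new(Db);
    let message = Message { id: 42 };
    // No `message.clone()` at the call site, unlike the owning version.
    let status = get_message_status(db, &message);
    println!("message {} is {status}", message.id);
}
```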
const SECS_PER_MINUTE: u64 = 60;
const MINS_PER_HOUR: u64 = 60;
const HOURS_PER_DAY: u64 = 24;
const DAYS_PER_WEEK: u64 = 7;
const SECS_PER_WEEK: u64 = SECS_PER_MINUTE * MINS_PER_HOUR * HOURS_PER_DAY * DAYS_PER_WEEK;
Suggested change:
-const SECS_PER_MINUTE: u64 = 60;
-const MINS_PER_HOUR: u64 = 60;
-const HOURS_PER_DAY: u64 = 24;
-const DAYS_PER_WEEK: u64 = 7;
-const SECS_PER_WEEK: u64 = SECS_PER_MINUTE * MINS_PER_HOUR * HOURS_PER_DAY * DAYS_PER_WEEK;
+let duration = Duration::from_seconds(chrono::Duration::weeks(1).num_seconds())
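For what it's worth, a compilable form of that one-liner would probably need std's `Duration::from_secs` (std has no `from_seconds`) plus a cast; a sketch assuming `chrono` is available as a dependency:

```rust
use std::time::Duration;

fn main() {
    // chrono's num_seconds() returns an i64, hence the cast to u64.
    let one_week = Duration::from_secs(chrono::Duration::weeks(1).num_seconds() as u64);
    assert_eq!(one_week.as_secs(), 7 * 24 * 60 * 60);
    println!("{one_week:?}");
}
```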
use crate::msg::pending_message::DEFAULT_MAX_MESSAGE_RETRIES;
use hyperlane_base::db::*;
use hyperlane_core::*;
use std::{fmt::Debug, sync::Arc};

use super::PendingMessage;
nit
Suggested change:
-use crate::msg::pending_message::DEFAULT_MAX_MESSAGE_RETRIES;
-use hyperlane_base::db::*;
-use hyperlane_core::*;
-use std::{fmt::Debug, sync::Arc};
-use super::PendingMessage;
+use std::{fmt::Debug, sync::Arc};
+use hyperlane_base::db::*;
+use hyperlane_core::*;
+use crate::msg::pending_message::DEFAULT_MAX_MESSAGE_RETRIES;
+use super::PendingMessage;
fn dummy_db_with_retries(retries: u32) -> MockDb {
    let mut db = MockDb::new();
    db.expect_retrieve_pending_message_retry_count_by_message_id()
        .returning(move |_| Ok(Some(retries)));
    db
}

fn assert_get_num_retries(
    mock_retries: u32,
    expected_retries: Option<u32>,
    retries_before_skipping: Option<u32>,
) {
    let db = dummy_db_with_retries(mock_retries);
    let num_retries = PendingMessage::maybe_get_num_retries(
        Arc::new(db),
        HyperlaneMessage::default(),
        retries_before_skipping,
    );

    // retry count is the same, because `retries_before_skipping` is `None`
    assert_eq!(num_retries, expected_retries);
}
nit: move these auxiliary methods to the bottom of the file
@@ -516,44 +517,37 @@ impl PendingOperation for PendingMessage {

impl PendingMessage {
    /// Constructor that tries reading the retry count from the HyperlaneDB in order to recompute the `next_attempt_after`.
    /// If the message has been retried more than `max_retries`, it will return `None`.
nit: max_retries seems stale
@@ -562,15 +556,45 @@ impl PendingMessage {
        0
    }
};
// Skip this message if it has been retried too many times
if let Some(max_retries) = retries_before_skipping {
feels a little awkward for this function to be the place where the decision of whether to skip is made? I would've thought that we'd have `get_num_retries` always return a u32 that is accurate (easier to reason about then, imo), and then in `maybe_from_persisted_retries` we make the judgement call whether to ignore the message based off that number. The benefit being that this retry skipping logic isn't so intertwined in multiple places
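A standalone sketch of that split, with placeholder signatures rather than the PR's actual ones:

```rust
// Hypothetical simplification of the suggestion: the lookup always returns an
// accurate count, and only the constructor decides whether to skip.
fn get_num_retries(persisted: Option<u32>) -> u32 {
    // No skipping logic here; just report what the db says.
    persisted.unwrap_or(0)
}

fn maybe_from_persisted_retries(
    persisted: Option<u32>,
    retries_before_skipping: Option<u32>,
) -> Option<u32> {
    let num_retries = get_num_retries(persisted);
    // The judgement call lives in exactly one place.
    if let Some(max) = retries_before_skipping {
        if num_retries >= max {
            return None;
        }
    }
    Some(num_retries)
}

fn main() {
    assert_eq!(maybe_from_persisted_retries(Some(80), Some(70)), None);
    assert_eq!(maybe_from_persisted_retries(Some(10), Some(70)), Some(10));
    println!("ok");
}
```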
@@ -651,6 +675,10 @@ impl PendingMessage {
    /// given the number of retries.
    /// `pub(crate)` for testing purposes
    pub(crate) fn calculate_msg_backoff(num_retries: u32) -> Option<Duration> {
        let max_retries = std::env::var("MAX_MESSAGE_RETRIES")
Is there any way to make this not need to read / parse the env var over and over again? We call this many times, especially upon startup -- i.e. add a `max_retries: Option<u32>` as a param or something. That would also probably be compatible with pulling out this logic to get access to the max retries for calling `maybe_from_persisted_retries` as well.
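One possible shape for reading the env var only once, sketched with `std::sync::OnceLock` (not what the PR currently does):

```rust
use std::sync::OnceLock;

const DEFAULT_MAX_MESSAGE_RETRIES: u32 = 70; // default assumed from this PR's description

// Read and parse MAX_MESSAGE_RETRIES once, then reuse the cached value on
// every subsequent call instead of hitting the environment again.
fn max_message_retries() -> u32 {
    static MAX_RETRIES: OnceLock<u32> = OnceLock::new();
    *MAX_RETRIES.get_or_init(|| {
        std::env::var("MAX_MESSAGE_RETRIES")
            .ok()
            .and_then(|v| v.parse().ok())
            .unwrap_or(DEFAULT_MAX_MESSAGE_RETRIES)
    })
}

fn main() {
    println!("max retries: {}", max_message_retries());
}
```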
// Skip this message if it has been retried too many times
if let Some(max_retries) = retries_before_skipping {
    if num_retries >= max_retries {
        return None;
curious if you think we should log or not (I don't have a strong opinion)
same goes for when a message already in the prep queue now becomes not deliverable bc it hits this max retries
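If logging were added, a sketch assuming the `tracing` and `tracing-subscriber` crates (the field names here are illustrative, not from the diff):

```rust
// Standalone sketch: note when a message would be skipped for exceeding the
// retry limit. `message_id` is a placeholder for the real message's id.
fn should_skip(num_retries: u32, max_retries: u32, message_id: u64) -> bool {
    if num_retries >= max_retries {
        tracing::debug!(
            message_id,
            num_retries,
            max_retries,
            "skipping message that exceeded the maximum retry count"
        );
        return true;
    }
    false
}

fn main() {
    tracing_subscriber::fmt::init();
    assert!(should_skip(80, 70, 42));
}
```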
    }
};
retries_before_skipping: Option<u32>,
) -> Option<Self> {
this makes me realize that we wouldn't be able to use the API to trigger one of these to be retried? is that intentional, or should we prefer to still put them in the prep queue but never retry them unless we get an API call asking for it?
@@ -651,6 +675,10 @@ impl PendingMessage {
    /// given the number of retries.
    /// `pub(crate)` for testing purposes
    pub(crate) fn calculate_msg_backoff(num_retries: u32) -> Option<Duration> {
        let max_retries = std::env::var("MAX_MESSAGE_RETRIES")
I probably would've done the same here fwiw but noting that our preference for ad hoc env vars is a result of our settings logic being so hard to work with that it's easier to add little "debt" pieces like this. Makes me a little concerned
Description
We've recently noticed the submitter attempts to process many old messages at the same time, because they have very similar `next_retry_attempt`s. These old messages are unlikely to be processable, and in fact starve new "healthy" messages, sometimes leading to lowered throughput. There was an attempt to make this less of an issue in #5416, but we decided old messages are not worth retrying at all, given we offer the ability to deliver messages on-demand in the hyperlane CLI.
Not retrying old messages means we don't even have to push them to the submitter when loaded from the db - we just skip them.
This PR:
- Adds a `MAX_MESSAGE_RETRIES` env var which defaults to 70 if not set. The default of 70 is picked under the assumption that the first 48 retries take about 1 day (formula). The remaining 22 retries will take at least `21 * 22 / 2` hours (see the formula), which is about 11 days. In total that is almost 2 weeks, which @nambrot confirmed is how long we should retry for. If the default is incorrect we can set a higher env var to override. I also manually checked the queues for the arbitrum and base destinations, and there was no message with a retry count of 100, but most old ones had more than 70 attempts.
- Uses `Duration::from_secs(u32::MAX)` as the next retry attempt for messages past the limit, which is far enough into the future as to not be attempted at all (see the sketch below).
Prepare queues are expected to get much smaller because of the skipping, which hopefully also lowers memory pressure.
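A minimal sketch of the skip/cap behaviour described above; the relayer's real backoff curve is replaced by a placeholder, only the cap is taken from this PR:

```rust
use std::time::{Duration, Instant};

const DEFAULT_MAX_MESSAGE_RETRIES: u32 = 70;

// Sketch: messages past the retry limit get a next attempt so far in the
// future that they are effectively never retried; everything else gets the
// normal backoff (a placeholder formula here, not the relayer's).
fn capped_backoff(num_retries: u32, max_retries: u32) -> Duration {
    if num_retries >= max_retries {
        Duration::from_secs(u32::MAX as u64)
    } else {
        // placeholder for the real backoff formula
        Duration::from_secs(60 * u64::from(num_retries + 1))
    }
}

fn main() {
    // ~136 years away, i.e. never in practice.
    let next_attempt = Instant::now() + capped_backoff(80, DEFAULT_MAX_MESSAGE_RETRIES);
    println!("next attempt at {next_attempt:?}");
}
```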
Drive-by changes
- Made `MessageContext` a trait obj for easier future mocking
Related issues
Backward compatibility
Yes - if `MAX_MESSAGE_RETRIES` is set to a very high value, it's as if this feature is disabled
Testing
Unit testing of the message backoff logic and of the inner logic in `maybe_from_persisted_retries`. The env var loading isn't tested but is very similar to how we load env vars elsewhere - longer term it should be turned into a proper agent config (the other ones too).