Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
150 changes: 141 additions & 9 deletions rs/ethereum/cketh/minter/src/deposit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,11 @@ async fn mint() {
}
}

/// Maximum fraction of the scraping interval (`SCRAPING_ETH_LOGS_INTERVAL`)
/// that `scrape_logs` is allowed to run before yielding control via a timer.
/// Yielding between scraping batches leaves no pending HTTP outcalls, which
/// allows the canister to stop between batches if a stop is requested.
const SCRAPE_LOGS_TIME_BUDGET_FRACTION: f64 = 0.75;

pub async fn scrape_logs() {
let _guard = match TimerGuard::new(TaskType::ScrapEthLogs) {
Ok(guard) => guard,
Expand All @@ -153,9 +158,96 @@ pub async fn scrape_logs() {
}
};
let max_block_spread = read_state(|s| s.max_block_spread_for_logs_scraping());
scrape_until_block::<ReceivedEthLogScraping>(last_block_number, max_block_spread).await;
scrape_until_block::<ReceivedErc20LogScraping>(last_block_number, max_block_spread).await;
scrape_until_block::<ReceivedEthOrErc20LogScraping>(last_block_number, max_block_spread).await;

// Calculate time budget for this scraping session.
// We allow scraping to run for up to 75% of the scraping interval,
// leaving time for other operations and allowing the canister to stop if needed.
let time_budget = Duration::from_secs_f64(
crate::SCRAPING_ETH_LOGS_INTERVAL.as_secs_f64() * SCRAPE_LOGS_TIME_BUDGET_FRACTION,
);
let deadline = ic_cdk::api::time() + time_budget.as_nanos() as u64;

scrape_logs_inner(last_block_number, max_block_spread, deadline).await;
}

/// Deadline-aware scraping driver.
///
/// Runs the three log scrapers in sequence. As soon as one of them reports
/// that the time deadline was exceeded, the remaining work is handed off to
/// `schedule_scrape_continuation`, which re-enters this function from a
/// `set_timer(0)` callback. Because the hand-off happens with no pending HTTP
/// outcalls, a stop request issued in the meantime can complete.
async fn scrape_logs_inner(last_block_number: BlockNumber, max_block_spread: u16, deadline: u64) {
    let eth_result = scrape_until_block_or_deadline::<ReceivedEthLogScraping>(
        last_block_number,
        max_block_spread,
        deadline,
    )
    .await;
    if eth_result == ScrapeResult::DeadlineExceeded {
        schedule_scrape_continuation(last_block_number, max_block_spread, deadline);
        return;
    }

    let erc20_result = scrape_until_block_or_deadline::<ReceivedErc20LogScraping>(
        last_block_number,
        max_block_spread,
        deadline,
    )
    .await;
    if erc20_result == ScrapeResult::DeadlineExceeded {
        schedule_scrape_continuation(last_block_number, max_block_spread, deadline);
        return;
    }

    let combined_result = scrape_until_block_or_deadline::<ReceivedEthOrErc20LogScraping>(
        last_block_number,
        max_block_spread,
        deadline,
    )
    .await;
    if combined_result == ScrapeResult::DeadlineExceeded {
        schedule_scrape_continuation(last_block_number, max_block_spread, deadline);
    }
}

/// Schedule a continuation of scraping via a timer.
/// This function is called when the time deadline is exceeded, at which point
/// there are no pending HTTP outcalls. If the canister is in "Stopping" state,
/// this timer will NOT fire, allowing the canister to stop gracefully.
fn schedule_scrape_continuation(
last_block_number: BlockNumber,
max_block_spread: u16,
deadline: u64,
) {
log!(
DEBUG,
"[scrape_logs]: time budget exceeded, scheduling continuation via timer"
);
ic_cdk_timers::set_timer(Duration::from_secs(0), async move {
let _guard = match TimerGuard::new(TaskType::ScrapEthLogs) {
Ok(guard) => guard,
Err(_) => return,
};
scrape_logs_inner(last_block_number, max_block_spread, deadline).await;
});
}

/// Outcome of one deadline-aware scraping pass over a single log type.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ScrapeResult {
    /// All blocks up to the target were scraped successfully.
    Completed,
    /// Scraping stopped due to an error; the error is logged at the point of
    /// failure before this result is returned.
    Error,
    /// Scraping stopped because the time deadline was exceeded.
    /// At this point there are no pending HTTP outcalls, so the canister
    /// can be stopped if requested.
    DeadlineExceeded,
}

/// Reason why `scrape_block_range` stopped before finishing its block range.
#[derive(Debug)]
enum ScrapeBlockRangeError {
    /// The time deadline was exceeded. No pending HTTP outcalls at this point.
    DeadlineExceeded,
    /// An RPC error occurred while fetching logs.
    RpcError(MultiCallError<Vec<LogEntry>>),
}

pub async fn update_last_observed_block_number() -> Option<BlockNumber> {
Expand All @@ -182,7 +274,11 @@ pub async fn update_last_observed_block_number() -> Option<BlockNumber> {
}
}

async fn scrape_until_block<S>(last_block_number: BlockNumber, max_block_spread: u16)
async fn scrape_until_block_or_deadline<S>(
last_block_number: BlockNumber,
max_block_spread: u16,
deadline: u64,
) -> ScrapeResult
where
S: LogScraping,
{
Expand All @@ -194,7 +290,7 @@ where
"[scrape_contract_logs]: skipping scraping {} logs: not active",
S::ID
);
return;
return ScrapeResult::Completed;
}
};
let block_range = BlockRangeInclusive::new(
Expand All @@ -211,40 +307,76 @@ where
);
let rpc_client = read_state(rpc_client);
for block_range in block_range.into_chunks(max_block_spread) {
// Check if we've exceeded the time deadline before each chunk.
// This allows the canister to stop between chunks if a stop is requested.
// At this point there are no pending HTTP outcalls since we check BEFORE
// making the next call.
if ic_cdk::api::time() > deadline {
log!(
DEBUG,
"[scrape_contract_logs]: deadline exceeded for {} logs, will continue in next batch",
S::ID
);
return ScrapeResult::DeadlineExceeded;
}

match scrape_block_range::<S>(
&rpc_client,
scrape.contract_address,
scrape.topics.clone(),
block_range.clone(),
deadline,
)
.await
{
Ok(()) => {}
Err(e) => {
Err(ScrapeBlockRangeError::DeadlineExceeded) => {
log!(
DEBUG,
"[scrape_contract_logs]: deadline exceeded during {} logs scraping, will continue in next batch",
S::ID
);
return ScrapeResult::DeadlineExceeded;
}
Err(ScrapeBlockRangeError::RpcError(e)) => {
log!(
INFO,
"[scrape_contract_logs]: Failed to scrape {} logs in range {block_range}: {e:?}",
S::ID
);
return;
return ScrapeResult::Error;
}
}
}
ScrapeResult::Completed
}

async fn scrape_block_range<S>(
rpc_client: &EvmRpcClient<IcRuntime, CandidResponseConverter, DoubleCycles>,
contract_address: Address,
topics: Vec<Topic>,
block_range: BlockRangeInclusive,
) -> Result<(), MultiCallError<Vec<LogEntry>>>
deadline: u64,
) -> Result<(), ScrapeBlockRangeError>
where
S: LogScraping,
{
let mut subranges = VecDeque::new();
subranges.push_back(block_range);

while !subranges.is_empty() {
// Check deadline before each HTTP call. At this point there are no
// pending HTTP outcalls, so the canister can be stopped if requested.
if ic_cdk::api::time() > deadline {
log!(
DEBUG,
"[scrape_block_range]: deadline exceeded for {} logs with {} subranges remaining",
S::ID,
subranges.len()
);
return Err(ScrapeBlockRangeError::DeadlineExceeded);
}

let range = subranges.pop_front().unwrap();
let (from_block, to_block) = range.clone().into_inner();

Expand Down Expand Up @@ -300,7 +432,7 @@ where
}
} else {
log!(INFO, "Failed to get {} logs in range {range}: {e:?}", S::ID);
return Err(e);
return Err(ScrapeBlockRangeError::RpcError(e));
}
}
}
Expand Down
97 changes: 97 additions & 0 deletions rs/ethereum/cketh/minter/tests/cketh.rs
Original file line number Diff line number Diff line change
Expand Up @@ -973,6 +973,103 @@ fn should_be_unstoppable_while_scraping_blocks_has_open_call_context() {
);
}

/// This test verifies that the canister CAN be stopped when the deadline is exceeded
/// during scraping, because:
/// 1. The stop request is processed first (canister goes to "Stopping" state)
/// 2. When the HTTP response callback runs, it sees deadline exceeded and schedules a timer
/// 3. The scheduled timer does NOT fire because the IC doesn't execute timers for stopping canisters
/// 4. Since no new HTTP calls are made, the call context closes and the canister stops
#[test]
fn should_be_stoppable_when_time_budget_exceeded_during_scraping() {
    // Enough block ranges that scraping cannot finish within a single chunk,
    // guaranteeing more HTTP calls would be needed after the first response.
    const NUM_BLOCK_RANGES: usize = 10;

    let cketh = CkEthSetup::default();
    let max_eth_logs_block_range = cketh.as_ref().max_logs_block_range();
    let max_block: u64 =
        LAST_SCRAPED_BLOCK_NUMBER_AT_INSTALL + (NUM_BLOCK_RANGES as u64) * max_eth_logs_block_range;

    // Start scraping - this triggers the scrape_logs timer which makes HTTP calls
    cketh.env.advance_time(SCRAPING_ETH_LOGS_INTERVAL);

    // Get the latest block number. This triggers scraping to start.
    // After this, HTTP calls for block range 1 are pending.
    MockJsonRpcProviders::when(JsonRpcMethod::EthGetBlockByNumber)
        .respond_for_all_with(block_response(max_block))
        .build()
        .expect_rpc_calls(&cketh);

    let stop_msg_id = cketh.env.stop_canister_non_blocking(cketh.minter_id);

    // Tick to process the stop request. The canister goes to "Stopping" state.
    // Note: The canister cannot become "Stopped" yet because there are pending HTTP outcalls.
    cketh.env.tick();

    // Verify the canister is in "Stopping" state
    let status = cketh.minter_status();
    assert_eq!(
        status,
        CanisterStatusType::Stopping,
        "Expected minter to be in Stopping state while HTTP outcall is pending"
    );

    // Now advance time past the deadline (75% of SCRAPING_ETH_LOGS_INTERVAL = 135s).
    // 140s > 135s, so the next deadline check in the minter will fail.
    cketh.env.advance_time(Duration::from_secs(140));

    let from_block = BlockNumber::from(LAST_SCRAPED_BLOCK_NUMBER_AT_INSTALL + 1);
    let to_block = from_block
        .checked_add(BlockNumber::from(max_eth_logs_block_range))
        .unwrap();

    // Provide the response to block range 1.
    // The callback will run with the NEW time (which exceeds the deadline).
    // It will:
    // 1. See that deadline is exceeded
    // 2. Schedule a continuation timer
    // 3. Return WITHOUT making HTTP calls for block range 2
    MockJsonRpcProviders::when(JsonRpcMethod::EthGetLogs)
        .with_request_params(json!([{
            "fromBlock": from_block,
            "toBlock": to_block,
            "address": [ETH_HELPER_CONTRACT_ADDRESS],
            "topics": [cketh.received_eth_event_topic()]
        }]))
        .respond_for_all_with(empty_logs())
        .build()
        .expect_rpc_calls(&cketh);

    // Give a few ticks for the stop to complete.
    // The timer scheduled by schedule_scrape_continuation should NOT fire
    // because the canister is in "Stopping" state.
    for _ in 0..10 {
        cketh.env.tick();
    }

    // Verify the stop completed
    let stop_result = cketh.env.ingress_status(&stop_msg_id);
    match &stop_result {
        IngressStatus::Known { state, .. } => {
            assert!(
                matches!(state, IngressState::Completed(_)),
                "Expected stop to be Completed, but got: {:?}",
                state
            );
        }
        other => panic!(
            "Expected IngressStatus::Known with Completed state, got: {:?}",
            other
        ),
    }

    // Verify the canister is stopped
    let final_status = cketh.minter_status();
    assert_eq!(
        final_status,
        CanisterStatusType::Stopped,
        "Expected minter to be Stopped - the timer-based solution should allow stopping \
         between batches when the time budget is exceeded"
    );
}

#[test]
fn should_panic_when_last_finalized_block_in_the_past() {
let cketh = CkEthSetup::default();
Expand Down
Loading