Skip to content

Commit

Permalink
feat(pindexer): optional exit-on-catchup (#5101)
Browse files Browse the repository at this point in the history
## Describe your changes
For running batch jobs managing indexing pipelines, it'd be convenient
to instruct pindexer to exit after it's finished processing the events
that's available to it. This situation arises frequently in batch
processing, where pindexer will be pointed at a source db that was
restored from a db. Having pindexer exit 0 on success, else non-zero on
error, will aid in building out test infrastructure.

## Issue ticket number and link

N/A

## Checklist before requesting a review

- [ ] If this code contains consensus-breaking changes, I have added the
"consensus-breaking" label. Otherwise, I declare my belief that there
are not consensus-breaking changes, for the following reason:

> only affects runtime options for `pindexer`, no changes to protocol or
consensus logic

---------

Co-authored-by: Lúcás Meier <[email protected]>
  • Loading branch information
conorsch and cronokirby authored Feb 21, 2025
1 parent 9e1c21d commit 8e45795
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 5 deletions.
18 changes: 13 additions & 5 deletions crates/util/cometindex/src/indexer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,23 +6,26 @@ use indexing_state::{Height, IndexingState};
use std::sync::Arc;
use tokio::{sync::mpsc, task::JoinSet};

/// Attempt to catch up to the latest indexed block.
///
/// Returns whether or not we've caught up.
#[tracing::instrument(skip_all)]
async fn catchup(
state: &IndexingState,
indices: &[Arc<dyn AppView>],
genesis: Arc<serde_json::Value>,
) -> anyhow::Result<()> {
) -> anyhow::Result<bool> {
if indices.len() <= 0 {
tracing::info!(why = "no indices", "catchup completed");
return Ok(());
return Ok(true);
}

let (src_height, index_heights) = tokio::try_join!(state.src_height(), state.index_heights())?;
tracing::info!(?src_height, ?index_heights, "catchup status");
let lowest_index_height = index_heights.values().copied().min().unwrap_or_default();
if lowest_index_height >= src_height {
tracing::info!(why = "already caught up", "catchup completed");
return Ok(());
return Ok(true);
}

// Constants that influence performance.
Expand Down Expand Up @@ -103,7 +106,7 @@ async fn catchup(
while let Some(res) = tasks.join_next().await {
res??;
}
Ok(())
Ok(false)
}

pub struct Indexer {
Expand Down Expand Up @@ -139,6 +142,7 @@ impl Indexer {
chain_id: _,
poll_ms,
genesis_json,
exit_on_catchup,
},
indices: indexes,
} = self;
Expand All @@ -156,7 +160,11 @@ impl Indexer {
.clone(),
);
loop {
catchup(&state, indexes.as_slice(), app_state.clone()).await?;
let caught_up = catchup(&state, indexes.as_slice(), app_state.clone()).await?;
if exit_on_catchup && caught_up {
tracing::info!("catchup completed, exiting as requested");
return Ok(());
}
tokio::time::sleep(poll_ms).await;
}
}
Expand Down
6 changes: 6 additions & 0 deletions crates/util/cometindex/src/opt.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,12 @@ pub struct Options {
/// A file path for the genesis file to use when initializing the indexer.
#[clap(short, long)]
pub genesis_json: PathBuf,

/// By default, the program will run as a daemon, continuously polling the src database
/// for new events. If --exit-on-catchup is set, the program will instead exit after
/// it has indexed all events in the src database. Useful for batch jobs.
#[clap(long)]
pub exit_on_catchup: bool,
}

/// Parses a string containing a [`Duration`], represented as a number of milliseconds.
Expand Down

0 comments on commit 8e45795

Please sign in to comment.