Skip to content

Commit

Permalink
add backfill to config
Browse files Browse the repository at this point in the history
  • Loading branch information
rtso committed Feb 11, 2025
1 parent 4e70d2f commit 206b6cd
Show file tree
Hide file tree
Showing 6 changed files with 252 additions and 62 deletions.
4 changes: 1 addition & 3 deletions aptos-indexer-processors-sdk/sdk/src/common_steps/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,5 @@ pub use arcify_step::ArcifyStep;
pub use order_by_version_step::OrderByVersionStep;
pub use timed_buffer_step::TimedBufferStep;
pub use transaction_stream_step::TransactionStreamStep;
pub use version_tracker_step::{
ProcessorStatusSaver, VersionTrackerStep, DEFAULT_UPDATE_PROCESSOR_STATUS_SECS,
};
pub use version_tracker_step::{VersionTrackerStep, DEFAULT_UPDATE_PROCESSOR_STATUS_SECS};
pub use write_rate_limit_step::{Sizeable, WriteRateLimitConfig, WriteRateLimitStep};
Original file line number Diff line number Diff line change
@@ -1,85 +1,129 @@
use crate::{
config::{
indexer_processor_config::{DbConfig, IndexerProcessorConfig, ProcessorMode},
processor_status_saver::{
BackfillProgress, BackfillStatus, ProcessorStatus, ProcessorStatusSaver,
},
},
traits::{
pollable_async_step::PollableAsyncRunType, NamedStep, PollableAsyncStep, Processable,
},
types::transaction_context::TransactionContext,
utils::errors::ProcessorError,
};
use anyhow::Result;
use aptos_indexer_transaction_stream::utils::parse_timestamp;
use async_trait::async_trait;
use std::marker::PhantomData;

/// Default processor-status update interval, in seconds.
/// NOTE(review): meaning inferred from the constant's name; its call sites are not visible here -- confirm.
pub const DEFAULT_UPDATE_PROCESSOR_STATUS_SECS: u64 = 1;
/// The `ProcessorStatusSaver` trait should be implemented in order to save the latest successfully
/// processed transaction version to storage, i.e., persisting the `processor_status` to storage.
#[async_trait]
pub trait ProcessorStatusSaver {
/// Persists the status described by `last_success_batch` for the processor named `processor_name`.
///
/// Returns a `ProcessorError` if the status could not be saved.
async fn save_processor_status(
&self,
last_success_batch: &TransactionContext<()>,
processor_name: &str,
) -> Result<(), ProcessorError>;

/// Looks up the latest processed version recorded for `processor_name`.
///
/// NOTE(review): presumably `None` means no status has been saved yet -- confirm against implementors.
async fn get_latest_processed_version(&self, processor_name: &str) -> Result<Option<u64>>;
}

/// Tracks the versioned processing of sequential transactions, ensuring no gaps
/// occur between them.
///
/// Important: this step assumes ordered transactions. Please use the `OrderByVersionStep` before this step
/// if the transactions are not ordered.
pub struct VersionTrackerStep<T, S>
pub struct VersionTrackerStep<T, S, D>
where
Self: Sized + Send + 'static,
T: Send + 'static,
S: ProcessorStatusSaver + Send + 'static,
S: ProcessorStatusSaver + Send + Sync + 'static,
D: DbConfig + Send + Sync + Clone + 'static,
{
// Last successful batch of sequentially processed transactions. Includes metadata to write to storage.
last_success_batch: Option<TransactionContext<()>>,
polling_interval_secs: u64,
processor_status_saver: S,
processor_name: String,
processor_id: String,
processor_config: IndexerProcessorConfig<D>,
_marker: PhantomData<T>,
}

impl<T, S> VersionTrackerStep<T, S>
impl<T, S, D> VersionTrackerStep<T, S, D>
where
Self: Sized + Send + 'static,
T: Send + 'static,
S: ProcessorStatusSaver + Send + 'static,
S: ProcessorStatusSaver + Send + Sync + 'static,
D: DbConfig + Send + Sync + Clone + 'static,
{
pub fn new(
processor_status_saver: S,
polling_interval_secs: u64,
processor_name: &str,
processor_id: &str,
processor_config: IndexerProcessorConfig<D>,
) -> Self {
Self {
last_success_batch: None,
processor_status_saver,
polling_interval_secs,
processor_name: processor_name.to_string(),
processor_id: processor_id.to_string(),
processor_config,
_marker: PhantomData,
}
}

async fn save_processor_status(&mut self) -> Result<(), ProcessorError> {
if let Some(last_success_batch) = self.last_success_batch.as_ref() {
self.processor_status_saver
.save_processor_status(last_success_batch, &self.processor_name)
.await
match self.processor_config.mode {
ProcessorMode::Default => {
self.processor_status_saver
.save_processor_status(ProcessorStatus {
processor_id: self.processor_id.clone(),
last_success_version: last_success_batch.metadata.end_version as i64,
last_transaction_timestamp: last_success_batch
.metadata
.end_transaction_timestamp
.as_ref()
.map(|t| {
parse_timestamp(
t,
last_success_batch.metadata.end_version as i64,
)
.naive_utc()
}),
})
.await
},
ProcessorMode::Backfill => {
let backfill_config = self.processor_config.backfill_config.as_ref().unwrap();
let lst_success_version = last_success_batch.metadata.end_version as i64;
let backfill_status =
if lst_success_version >= backfill_config.ending_version as i64 {
BackfillStatus::Complete
} else {
BackfillStatus::InProgress
};
let backfill_progress = BackfillProgress {
backfill_id: backfill_config.backfill_id.clone(),
backfill_status,
backfill_start_version: backfill_config.initial_starting_version,
backfill_end_version: backfill_config.ending_version,
last_success_version: lst_success_version,
last_transaction_timestamp: last_success_batch
.metadata
.end_transaction_timestamp
.as_ref()
.map(|t| parse_timestamp(t, lst_success_version).naive_utc()),
};
self.processor_status_saver
.save_backfill_processor_status(backfill_progress)
.await
},
ProcessorMode::Testing => Ok(()),
}
} else {
Ok(())
}
}
}

#[async_trait]
impl<T, S> Processable for VersionTrackerStep<T, S>
impl<T, S, D> Processable for VersionTrackerStep<T, S, D>
where
Self: Sized + Send + 'static,
T: Send + 'static,
S: ProcessorStatusSaver + Send + 'static,
S: ProcessorStatusSaver + Send + Sync + 'static,
D: DbConfig + Send + Sync + Clone + 'static,
{
type Input = T;
type Output = T;
Expand Down Expand Up @@ -121,11 +165,12 @@ where
}

#[async_trait]
impl<T: Send + 'static, S> PollableAsyncStep for VersionTrackerStep<T, S>
impl<T: Send + 'static, S, D> PollableAsyncStep for VersionTrackerStep<T, S, D>
where
Self: Sized + Send + Sync + 'static,
T: Send + Sync + 'static,
S: ProcessorStatusSaver + Send + Sync + 'static,
D: DbConfig + Send + Sync + Clone + 'static,
{
fn poll_interval(&self) -> std::time::Duration {
std::time::Duration::from_secs(self.polling_interval_secs)
Expand All @@ -139,11 +184,12 @@ where
}
}

impl<T, S> NamedStep for VersionTrackerStep<T, S>
impl<T, S, D> NamedStep for VersionTrackerStep<T, S, D>
where
Self: Sized + Send + 'static,
T: Send + 'static,
S: ProcessorStatusSaver + Send + 'static,
S: ProcessorStatusSaver + Send + Sync + 'static,
D: DbConfig + Send + Sync + Clone + 'static,
{
fn name(&self) -> String {
format!("VersionTrackerStep: {}", std::any::type_name::<T>())
Expand Down
132 changes: 103 additions & 29 deletions aptos-indexer-processors-sdk/sdk/src/config/indexer_processor_config.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
// Copyright © Aptos Foundation
// SPDX-License-Identifier: Apache-2.0

use super::processor_status_saver::BackfillStatus;
use crate::{
aptos_indexer_transaction_stream::TransactionStreamConfig, common_steps::ProcessorStatusSaver,
aptos_indexer_transaction_stream::TransactionStreamConfig,
config::processor_status_saver::ProcessorStatusSaver,
};
use anyhow::{Context, Result};
use async_trait::async_trait;
Expand All @@ -24,7 +26,7 @@ pub enum ProcessorMode {
Testing,
}

#[derive(Clone, Debug, Deserialize, Serialize)]
#[derive(Clone, Deserialize, Debug, Serialize)]
#[serde(deny_unknown_fields)]
pub struct IndexerProcessorConfig<D> {
pub processor_name: String,
Expand All @@ -40,43 +42,115 @@ pub struct IndexerProcessorConfig<D> {
impl<D> IndexerProcessorConfig<D>
where
Self: Send + Sync + 'static,
D: DbConfig + Send + Sync + Clone + 'static,
D: DbConfig + DeserializeOwned + Send + Sync + Clone + 'static,
{
fn validate(&self) -> Result<(), String> {
// pub fn validate(&self) -> Result<(), String> {
// match self.mode {
// ProcessorMode::Testing => {
// if self.testing_config.is_none() {
// return Err("testing_config must be present when mode is 'testing'".to_string());
// }
// },
// ProcessorMode::Backfill => {
// if self.backfill_config.is_none() {
// return Err(
// "backfill_config must be present when mode is 'backfill'".to_string()
// );
// }
// },
// ProcessorMode::Default => {},
// }
// Ok(())
// }

pub async fn get_starting_version(&self) -> Result<u64> {
match self.mode {
ProcessorMode::Testing => {
if self.testing_config.is_none() {
return Err("testing_config must be present when mode is 'testing'".to_string());
}
let testing_config = self
.testing_config
.clone()
.context("testing_config must be present when mode is 'testing'")?;
Ok(testing_config.override_starting_version)
},
ProcessorMode::Default => {
// Check if there's a checkpoint in the appropriate processor status table.
let processor_status = self
.db_config
.clone()
.into_runnable_config()
.await
.context("Failed to initialize DB config")?
.get_processor_status(&self.processor_name)
.await
.context("Failed to get latest processed version from DB")?;

let default_starting_version = self
.bootstrap_config
.clone()
.map_or(0, |config| config.initial_starting_version);

Ok(processor_status.map_or(default_starting_version, |status| {
std::cmp::max(status.last_success_version as u64, default_starting_version)
}))
},
ProcessorMode::Backfill => {
if self.backfill_config.is_none() {
return Err(
"backfill_config must be present when mode is 'backfill'".to_string()
);
let backfill_config = self
.backfill_config
.clone()
.context("backfill_config must be present when mode is 'backfill'")?;
let backfill_status_option = self
.db_config
.clone()
.into_runnable_config()
.await
.context("Failed to initialize DB config")?
.get_backfill_processor_status(&backfill_config.backfill_id)
.await
.context("Failed to query backfill_processor_status table.")?;

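// Resolve the starting version from the checkpoint, if any:
// - no checkpoint: `initial_starting_version`;
// - backfill is complete and `overwrite_checkpoint` is false: `ending_version`;
// - `overwrite_checkpoint` is true: `initial_starting_version`;
// - otherwise: the checkpointed version + 1.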
if let Some(status) = backfill_status_option {
// If the backfill is complete and overwrite_checkpoint is false, return the ending_version to end the backfill.
if status.backfill_status == BackfillStatus::Complete
&& !backfill_config.overwrite_checkpoint
{
return Ok(backfill_config.ending_version);
}
// If overwrite_checkpoint is true, ignore the checkpoint: this is the start of a new backfill job.
if backfill_config.overwrite_checkpoint {
return Ok(backfill_config.initial_starting_version);
}

// `backfill_config.initial_starting_version` is NOT respected.
// Return the last success version + 1.
let starting_version = status.last_success_version as u64 + 1;
log_ascii_warning(starting_version);
Ok(starting_version)
} else {
Ok(backfill_config.initial_starting_version)
}
},
ProcessorMode::Default => {},
}
Ok(())
}
}

pub async fn get_starting_version(&self) -> Result<u64> {
// Check if there's a checkpoint in the appropriate processor status table.
let db_config = self.db_config.clone();
let _latest_processed_version_from_db = db_config
.into_runnable_config()
.await
.context("Failed to initialize DB config")?
.get_latest_processed_version(&self.processor_name)
.await
.context("Failed to get latest processed version from DB")?;

// If nothing checkpointed, return the `starting_version` from the config, or 0 if not set.
// Ok(latest_processed_version_from_db
// .unwrap_or(self.transaction_stream_config.starting_version.unwrap_or(0)))
Ok(0)
}
/// Prints an ASCII-art warning banner to stdout, followed by a message stating the
/// version at which the backfill job is resuming.
///
/// `version` is interpolated into the single `{}` placeholder of the banner text.
fn log_ascii_warning(version: u64) {
println!(
r#"
██╗ ██╗ █████╗ ██████╗ ███╗ ██╗██╗███╗ ██╗ ██████╗ ██╗
██║ ██║██╔══██╗██╔══██╗████╗ ██║██║████╗ ██║██╔════╝ ██║
██║ █╗ ██║███████║██████╔╝██╔██╗ ██║██║██╔██╗ ██║██║ ███╗██║
██║███╗██║██╔══██║██╔══██╗██║╚██╗██║██║██║╚██╗██║██║ ██║╚═╝
╚███╔███╔╝██║ ██║██║ ██║██║ ╚████║██║██║ ╚████║╚██████╔╝██╗
╚══╝╚══╝ ╚═╝ ╚═╝╚═╝ ╚═╝╚═╝ ╚═══╝╚═╝╚═╝ ╚═══╝ ╚═════╝ ╚═╝
=================================================================
This backfill job is resuming progress at version {}
=================================================================
"#,
version
);
}

// Note: You'll need to create a concrete implementation of this trait
Expand Down
1 change: 1 addition & 0 deletions aptos-indexer-processors-sdk/sdk/src/config/mod.rs
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
pub mod indexer_processor_config;
pub mod processor_status_saver;
Loading

0 comments on commit 206b6cd

Please sign in to comment.