From bd642ddefc1ff71d4a4186337305531b30a7adb6 Mon Sep 17 00:00:00 2001 From: Sean Derman Yang <38265412+dermanyang@users.noreply.github.com> Date: Fri, 7 Feb 2025 22:54:39 -0800 Subject: [PATCH] Make backfill behavior more explicit (#700) --- .../account_restoration_processor_tests.rs | 10 +- .../account_transaction_processor_tests.rs | 11 +- .../src/sdk_tests/ans_processor_tests.rs | 11 +- .../src/sdk_tests/default_processor_tests.rs | 11 +- .../src/sdk_tests/events_processor_tests.rs | 10 +- .../fungible_asset_processor_tests.rs | 10 +- .../src/sdk_tests/objects_processor_tests.rs | 10 +- .../src/sdk_tests/stake_processor_tests.rs | 10 +- .../src/sdk_tests/token_v2_processor_tests.rs | 10 +- .../user_transaction_processor_tests.rs | 10 +- rust/processor/parser.yaml | 3 +- .../up.sql | 2 +- rust/processor/src/db/postgres/schema.rs | 2 +- rust/sdk-processor/README.md | 23 +- .../src/config/indexer_processor_config.rs | 95 +++- .../models/backfill_processor_status.rs | 8 +- .../parquet_account_transactions_processor.rs | 16 +- .../parquet_ans_processor.rs | 15 +- .../parquet_default_processor.rs | 16 +- .../parquet_events_processor.rs | 16 +- .../parquet_fungible_asset_processor.rs | 16 +- .../parquet_objects_processor.rs | 16 +- .../parquet_stake_processor.rs | 16 +- .../parquet_token_v2_processor.rs | 16 +- .../parquet_transaction_metadata_processor.rs | 16 +- .../parquet_user_transaction_processor.rs | 16 +- .../account_restoration_processor.rs | 16 +- .../account_transactions_processor.rs | 16 +- .../src/processors/ans_processor.rs | 15 +- .../src/processors/default_processor.rs | 16 +- .../src/processors/events_processor.rs | 16 +- .../processors/fungible_asset_processor.rs | 16 +- .../src/processors/monitoring_processor.rs | 16 +- .../src/processors/objects_processor.rs | 16 +- .../src/processors/stake_processor.rs | 16 +- .../src/processors/token_v2_processor.rs | 16 +- .../processors/user_transaction_processor.rs | 16 +- .../steps/common/processor_status_saver.rs | 67 +-- .../src/utils/starting_version.rs | 404 ++++++++++++------ 39 files changed, 832 insertions(+), 209 deletions(-) diff --git a/rust/integration-tests/src/sdk_tests/account_restoration_processor_tests.rs b/rust/integration-tests/src/sdk_tests/account_restoration_processor_tests.rs index 8033b08c3..223c2e6b0 100644 --- a/rust/integration-tests/src/sdk_tests/account_restoration_processor_tests.rs +++ b/rust/integration-tests/src/sdk_tests/account_restoration_processor_tests.rs @@ -2,7 +2,7 @@ use ahash::AHashMap; use aptos_indexer_testing_framework::sdk_test_context::SdkTestContext; use sdk_processor::config::{ db_config::{DbConfig, PostgresConfig}, - indexer_processor_config::IndexerProcessorConfig, + indexer_processor_config::{IndexerProcessorConfig, ProcessorMode, TestingConfig}, processor_config::{DefaultProcessorConfig, ProcessorConfig}, }; use std::collections::HashSet; @@ -26,6 +26,11 @@ pub fn setup_account_restoration_processor_config( let processor_config = ProcessorConfig::AccountRestorationProcessor(default_processor_config); + let testing_config: TestingConfig = TestingConfig { + override_starting_version: transaction_stream_config.starting_version.unwrap(), + ending_version: transaction_stream_config.request_ending_version.unwrap(), + }; + let processor_name = processor_config.name(); ( IndexerProcessorConfig { @@ -33,6 +38,9 @@ pub fn setup_account_restoration_processor_config( transaction_stream_config, db_config, backfill_config: None, + bootstrap_config: None, + testing_config: Some(testing_config), + mode: ProcessorMode::Testing, }, processor_name, ) diff --git a/rust/integration-tests/src/sdk_tests/account_transaction_processor_tests.rs b/rust/integration-tests/src/sdk_tests/account_transaction_processor_tests.rs index f2cb601b5..b6cb682ed 100644 --- a/rust/integration-tests/src/sdk_tests/account_transaction_processor_tests.rs +++ b/rust/integration-tests/src/sdk_tests/account_transaction_processor_tests.rs @@ -2,7 +2,7 @@ use ahash::AHashMap; use aptos_indexer_testing_framework::sdk_test_context::SdkTestContext; use sdk_processor::config::{ db_config::{DbConfig, PostgresConfig}, - indexer_processor_config::IndexerProcessorConfig, + indexer_processor_config::{IndexerProcessorConfig, ProcessorMode, TestingConfig}, processor_config::{DefaultProcessorConfig, ProcessorConfig}, }; use std::collections::HashSet; @@ -26,12 +26,21 @@ pub fn setup_acc_txn_processor_config( let processor_config = ProcessorConfig::AccountTransactionsProcessor(default_processor_config); let processor_name = processor_config.name(); + + let testing_config: TestingConfig = TestingConfig { + override_starting_version: transaction_stream_config.starting_version.unwrap(), + ending_version: transaction_stream_config.request_ending_version.unwrap(), + }; + ( IndexerProcessorConfig { processor_config, transaction_stream_config, db_config, backfill_config: None, + bootstrap_config: None, + testing_config: Some(testing_config), + mode: ProcessorMode::Testing, }, processor_name, ) diff --git a/rust/integration-tests/src/sdk_tests/ans_processor_tests.rs b/rust/integration-tests/src/sdk_tests/ans_processor_tests.rs index d31b49522..31a6b004e 100644 --- a/rust/integration-tests/src/sdk_tests/ans_processor_tests.rs +++ b/rust/integration-tests/src/sdk_tests/ans_processor_tests.rs @@ -3,7 +3,7 @@ use aptos_indexer_testing_framework::sdk_test_context::SdkTestContext; use sdk_processor::{ config::{ db_config::{DbConfig, PostgresConfig}, - indexer_processor_config::IndexerProcessorConfig, + indexer_processor_config::{IndexerProcessorConfig, ProcessorMode, TestingConfig}, processor_config::{DefaultProcessorConfig, ProcessorConfig}, }, processors::ans_processor::AnsProcessorConfig, @@ -38,12 +38,21 @@ pub fn setup_ans_processor_config( let processor_config = ProcessorConfig::AnsProcessor(ans_processor_config); let processor_name = processor_config.name(); + + let testing_config: TestingConfig = TestingConfig { + override_starting_version: transaction_stream_config.starting_version.unwrap(), + ending_version: transaction_stream_config.request_ending_version.unwrap(), + }; + ( IndexerProcessorConfig { processor_config, transaction_stream_config, db_config, backfill_config: None, + bootstrap_config: None, + testing_config: Some(testing_config), + mode: ProcessorMode::Testing, }, processor_name, ) diff --git a/rust/integration-tests/src/sdk_tests/default_processor_tests.rs b/rust/integration-tests/src/sdk_tests/default_processor_tests.rs index 2355eac60..cb8cd3f8f 100644 --- a/rust/integration-tests/src/sdk_tests/default_processor_tests.rs +++ b/rust/integration-tests/src/sdk_tests/default_processor_tests.rs @@ -2,7 +2,7 @@ use ahash::AHashMap; use aptos_indexer_testing_framework::sdk_test_context::SdkTestContext; use sdk_processor::config::{ db_config::{DbConfig, PostgresConfig}, - indexer_processor_config::IndexerProcessorConfig, + indexer_processor_config::{IndexerProcessorConfig, ProcessorMode, TestingConfig}, processor_config::{DefaultProcessorConfig, ProcessorConfig}, }; use std::collections::HashSet; @@ -26,12 +26,21 @@ pub fn setup_default_processor_config( let processor_config = ProcessorConfig::DefaultProcessor(default_processor_config); let processor_name = processor_config.name(); + + let testing_config: TestingConfig = TestingConfig { + override_starting_version: transaction_stream_config.starting_version.unwrap(), + ending_version: transaction_stream_config.request_ending_version.unwrap(), + }; + ( IndexerProcessorConfig { processor_config, transaction_stream_config, db_config, backfill_config: None, + bootstrap_config: None, + testing_config: Some(testing_config), + mode: ProcessorMode::Testing, }, processor_name, ) diff --git a/rust/integration-tests/src/sdk_tests/events_processor_tests.rs b/rust/integration-tests/src/sdk_tests/events_processor_tests.rs index 10f4e9bc1..5d1012868 100644 --- a/rust/integration-tests/src/sdk_tests/events_processor_tests.rs +++ b/rust/integration-tests/src/sdk_tests/events_processor_tests.rs @@ -2,7 +2,7 @@ use ahash::AHashMap; use aptos_indexer_testing_framework::sdk_test_context::SdkTestContext; use sdk_processor::config::{ db_config::{DbConfig, PostgresConfig}, - indexer_processor_config::IndexerProcessorConfig, + indexer_processor_config::{IndexerProcessorConfig, ProcessorMode, TestingConfig}, processor_config::{DefaultProcessorConfig, ProcessorConfig}, }; use std::collections::HashSet; @@ -26,12 +26,20 @@ pub fn setup_events_processor_config( let processor_config = ProcessorConfig::EventsProcessor(default_processor_config); let processor_name = processor_config.name(); + let testing_config: TestingConfig = TestingConfig { + override_starting_version: transaction_stream_config.starting_version.unwrap(), + ending_version: transaction_stream_config.request_ending_version.unwrap(), + }; + ( IndexerProcessorConfig { processor_config, transaction_stream_config, db_config, backfill_config: None, + bootstrap_config: None, + testing_config: Some(testing_config), + mode: ProcessorMode::Testing, }, processor_name, ) diff --git a/rust/integration-tests/src/sdk_tests/fungible_asset_processor_tests.rs b/rust/integration-tests/src/sdk_tests/fungible_asset_processor_tests.rs index f2e69f1b4..f73d1efe7 100644 --- a/rust/integration-tests/src/sdk_tests/fungible_asset_processor_tests.rs +++ b/rust/integration-tests/src/sdk_tests/fungible_asset_processor_tests.rs @@ -2,7 +2,7 @@ use ahash::AHashMap; use aptos_indexer_testing_framework::sdk_test_context::SdkTestContext; use sdk_processor::config::{ db_config::{DbConfig, PostgresConfig}, - indexer_processor_config::IndexerProcessorConfig, + indexer_processor_config::{IndexerProcessorConfig, ProcessorMode, TestingConfig}, processor_config::{DefaultProcessorConfig, ProcessorConfig}, }; use std::collections::HashSet; @@ -27,12 +27,20 @@ pub fn setup_fa_processor_config( let processor_config = ProcessorConfig::FungibleAssetProcessor(default_processor_config); let processor_name = processor_config.name(); + let testing_config: TestingConfig = TestingConfig { + override_starting_version: transaction_stream_config.starting_version.unwrap(), + ending_version: transaction_stream_config.request_ending_version.unwrap(), + }; + ( IndexerProcessorConfig { processor_config, transaction_stream_config, db_config, backfill_config: None, + bootstrap_config: None, + testing_config: Some(testing_config), + mode: ProcessorMode::Testing, }, processor_name, ) diff --git a/rust/integration-tests/src/sdk_tests/objects_processor_tests.rs b/rust/integration-tests/src/sdk_tests/objects_processor_tests.rs index b2f4005ef..e4833769d 100644 --- a/rust/integration-tests/src/sdk_tests/objects_processor_tests.rs +++ b/rust/integration-tests/src/sdk_tests/objects_processor_tests.rs @@ -3,7 +3,7 @@ use aptos_indexer_testing_framework::sdk_test_context::SdkTestContext; use sdk_processor::{ config::{ db_config::{DbConfig, PostgresConfig}, - indexer_processor_config::IndexerProcessorConfig, + indexer_processor_config::{IndexerProcessorConfig, ProcessorMode, TestingConfig}, processor_config::{DefaultProcessorConfig, ProcessorConfig}, }, processors::objects_processor::ObjectsProcessorConfig, @@ -37,12 +37,20 @@ pub fn setup_objects_processor_config( let processor_config = ProcessorConfig::ObjectsProcessor(objects_processor_config); let processor_name = processor_config.name(); + let testing_config: TestingConfig = TestingConfig { + override_starting_version: transaction_stream_config.starting_version.unwrap(), + ending_version: transaction_stream_config.request_ending_version.unwrap(), + }; + ( IndexerProcessorConfig { processor_config, transaction_stream_config, db_config, backfill_config: None, + bootstrap_config: None, + testing_config: Some(testing_config), + mode: ProcessorMode::Testing, }, processor_name, ) diff --git a/rust/integration-tests/src/sdk_tests/stake_processor_tests.rs b/rust/integration-tests/src/sdk_tests/stake_processor_tests.rs index 7d9884f42..6ddcc5101 100644 --- a/rust/integration-tests/src/sdk_tests/stake_processor_tests.rs +++ b/rust/integration-tests/src/sdk_tests/stake_processor_tests.rs @@ -3,7 +3,7 @@ use aptos_indexer_testing_framework::sdk_test_context::SdkTestContext; use sdk_processor::{ config::{ db_config::{DbConfig, PostgresConfig}, - indexer_processor_config::IndexerProcessorConfig, + indexer_processor_config::{IndexerProcessorConfig, ProcessorMode, TestingConfig}, processor_config::{DefaultProcessorConfig, ProcessorConfig}, }, processors::stake_processor::StakeProcessorConfig, @@ -34,12 +34,20 @@ pub fn setup_stake_processor_config( let processor_config = ProcessorConfig::StakeProcessor(default_processor_config); let processor_name = processor_config.name(); + let testing_config: TestingConfig = TestingConfig { + override_starting_version: transaction_stream_config.starting_version.unwrap(), + ending_version: transaction_stream_config.request_ending_version.unwrap(), + }; + ( IndexerProcessorConfig { processor_config, transaction_stream_config, db_config, backfill_config: None, + bootstrap_config: None, + testing_config: Some(testing_config), + mode: ProcessorMode::Testing, }, processor_name, ) diff --git a/rust/integration-tests/src/sdk_tests/token_v2_processor_tests.rs b/rust/integration-tests/src/sdk_tests/token_v2_processor_tests.rs index e2677823f..5b09479ed 100644 --- a/rust/integration-tests/src/sdk_tests/token_v2_processor_tests.rs +++ b/rust/integration-tests/src/sdk_tests/token_v2_processor_tests.rs @@ -3,7 +3,7 @@ use aptos_indexer_testing_framework::sdk_test_context::SdkTestContext; use sdk_processor::{ config::{ db_config::{DbConfig, PostgresConfig}, - indexer_processor_config::IndexerProcessorConfig, + indexer_processor_config::{IndexerProcessorConfig, ProcessorMode, TestingConfig}, processor_config::{DefaultProcessorConfig, ProcessorConfig}, }, processors::token_v2_processor::TokenV2ProcessorConfig, @@ -36,12 +36,20 @@ pub fn setup_token_v2_processor_config( let processor_config = ProcessorConfig::TokenV2Processor(token_v2_processor_config); let processor_name = processor_config.name(); + let testing_config: TestingConfig = TestingConfig { + override_starting_version: transaction_stream_config.starting_version.unwrap(), + ending_version: transaction_stream_config.request_ending_version.unwrap(), + }; + ( IndexerProcessorConfig { processor_config, transaction_stream_config, db_config, backfill_config: None, + bootstrap_config: None, + testing_config: Some(testing_config), + mode: ProcessorMode::Testing, }, processor_name, ) diff --git a/rust/integration-tests/src/sdk_tests/user_transaction_processor_tests.rs b/rust/integration-tests/src/sdk_tests/user_transaction_processor_tests.rs index 650d4b46f..62c0a761c 100644 --- a/rust/integration-tests/src/sdk_tests/user_transaction_processor_tests.rs +++ b/rust/integration-tests/src/sdk_tests/user_transaction_processor_tests.rs @@ -2,7 +2,7 @@ use ahash::AHashMap; use aptos_indexer_testing_framework::sdk_test_context::SdkTestContext; use sdk_processor::config::{ db_config::{DbConfig, PostgresConfig}, - indexer_processor_config::IndexerProcessorConfig, + indexer_processor_config::{IndexerProcessorConfig, ProcessorMode, TestingConfig}, processor_config::{DefaultProcessorConfig, ProcessorConfig}, }; use std::collections::HashSet; @@ -27,12 +27,20 @@ pub fn setup_user_txn_processor_config( let processor_config = ProcessorConfig::UserTransactionProcessor(default_processor_config); let processor_name = processor_config.name(); + let testing_config: TestingConfig = TestingConfig { + override_starting_version: transaction_stream_config.starting_version.unwrap(), + ending_version: transaction_stream_config.request_ending_version.unwrap(), + }; + ( IndexerProcessorConfig { processor_config, transaction_stream_config, db_config, backfill_config: None, + bootstrap_config: None, + testing_config: Some(testing_config), + mode: ProcessorMode::Testing, }, processor_name, ) diff --git a/rust/processor/parser.yaml b/rust/processor/parser.yaml index 25ed06b54..4d374c032 100644 --- a/rust/processor/parser.yaml +++ b/rust/processor/parser.yaml @@ -7,4 +7,5 @@ server_config: type: default_processor postgres_connection_string: postgresql://postgres:@localhost:5432/default_processor indexer_grpc_data_service_address: http://127.0.0.1:50051 - auth_token: AUTH_TOKEN \ No newline at end of file + auth_token: AUTH_TOKEN + diff --git a/rust/processor/src/db/postgres/migrations/2024-10-22-214753_backfill_processor_status/up.sql b/rust/processor/src/db/postgres/migrations/2024-10-22-214753_backfill_processor_status/up.sql index 9db3e4b1e..7bcb084e8 100644 --- a/rust/processor/src/db/postgres/migrations/2024-10-22-214753_backfill_processor_status/up.sql +++ b/rust/processor/src/db/postgres/migrations/2024-10-22-214753_backfill_processor_status/up.sql @@ -6,6 +6,6 @@ CREATE TABLE backfill_processor_status ( last_updated TIMESTAMP NOT NULL DEFAULT NOW(), last_transaction_timestamp TIMESTAMP NULL, backfill_start_version BIGINT NOT NULL, - backfill_end_version BIGINT NULL, + backfill_end_version BIGINT NOT NULL, PRIMARY KEY (backfill_alias) ); \ No newline at end of file diff --git a/rust/processor/src/db/postgres/schema.rs b/rust/processor/src/db/postgres/schema.rs index dfad031b8..4ae5a73d6 100644 --- a/rust/processor/src/db/postgres/schema.rs +++ b/rust/processor/src/db/postgres/schema.rs @@ -117,7 +117,7 @@ diesel::table! { last_updated -> Timestamp, last_transaction_timestamp -> Nullable, backfill_start_version -> Int8, - backfill_end_version -> Nullable, + backfill_end_version -> Int8, } } diff --git a/rust/sdk-processor/README.md b/rust/sdk-processor/README.md index c7d9519ff..c889dca1b 100644 --- a/rust/sdk-processor/README.md +++ b/rust/sdk-processor/README.md @@ -23,10 +23,10 @@ If you want to index a custom contract, we recommend using the [Quickstart Guide processor_config: type: "fungible_asset_processor" channel_size: 100 + bootstrap_config: + initial_starting_version: 0 transaction_stream_config: indexer_grpc_data_service_address: "https://grpc.mainnet.aptoslabs.com:443" - starting_version: 0 - # request_ending_version: 1409805 auth_token: "{AUTH_TOKEN}" request_name_header: "fungible_asset_processor" db_config: @@ -40,11 +40,26 @@ If you want to index a custom contract, we recommend using the [Quickstart Guide - `type`: which processor to run - `channel_size`: size of channel in between steps - Individual processors may have different configuration required. See the full list of configs [here](https://github.com/aptos-labs/aptos-indexer-processors/blob/main/rust/sdk-processor/src/config/processor_config.rs#L89). + +- `backfill_config` (optional) + - `backfill_id`: appended to `processor_type` for a unique backfill identifier + - `initial_starting_version`: processor starts here unless there is a greater checkpointed version + - `ending_version`: ending version of the backfill + - `overwrite_checkpoint`: overwrite checkpoints if it exists, restarting the backfill from `initial_starting_version`. + +- `testing_config` (optional) + - `override_starting_version`: starting version of the testing. always starts from this version + - `ending_version`: ending version of the testing + +- `bootstrap_config` (optional) used for regular, non-backfill, processors + - `initial_starting_version`: processor starts here unless there is a greater checkpointed version. + Note: no ending version for bootstrap config since its meant to keep running at HEAD. + +- `mode`: (optional) `default`, `testing` or `backfill`. Set to `default` if no mode specified. If backfill/testing/bootstrap configs are not specified, processor will start from 0 or the last successfully processed version. + - `transaction_stream_config` - `indexer_grpc_data_service_address`: Data service non-TLS endpoint address. - `auth_token`: Auth token used for connection. - - `starting_version`: start processor at starting_version. - - `request_ending_version`: stop processor after ending_version. - `request_name_header`: request name header to append to the grpc request; name of the processor - `additional_headers`: addtional headers to append to the grpc request - `indexer_grpc_http2_ping_interval_in_secs`: client-side grpc HTTP2 ping interval. diff --git a/rust/sdk-processor/src/config/indexer_processor_config.rs b/rust/sdk-processor/src/config/indexer_processor_config.rs index 7097dabf3..5343bcda5 100644 --- a/rust/sdk-processor/src/config/indexer_processor_config.rs +++ b/rust/sdk-processor/src/config/indexer_processor_config.rs @@ -36,13 +36,84 @@ use serde::{Deserialize, Serialize}; pub const QUERY_DEFAULT_RETRIES: u32 = 5; pub const QUERY_DEFAULT_RETRY_DELAY_MS: u64 = 500; -#[derive(Clone, Debug, Deserialize, Serialize)] +#[derive(Clone, Debug, Deserialize, Serialize, Default)] +#[serde(deny_unknown_fields)] +pub enum ProcessorMode { + #[serde(rename = "default")] + #[default] + Default, + #[serde(rename = "backfill")] + Backfill, + #[serde(rename = "testing")] + Testing, +} + +#[derive(Clone, Debug, Serialize)] #[serde(deny_unknown_fields)] pub struct IndexerProcessorConfig { pub processor_config: ProcessorConfig, pub transaction_stream_config: TransactionStreamConfig, pub db_config: DbConfig, pub backfill_config: Option, + pub bootstrap_config: Option, + pub testing_config: Option, + #[serde(default)] + pub mode: ProcessorMode, +} + +impl IndexerProcessorConfig { + fn validate(&self) -> Result<(), String> { + match self.mode { + ProcessorMode::Testing => { + if self.testing_config.is_none() { + return Err("testing_config must be present when mode is 'testing'".to_string()); + } + }, + ProcessorMode::Backfill => { + if self.backfill_config.is_none() { + return Err( + "backfill_config must be present when mode is 'backfill'".to_string() + ); + } + }, + ProcessorMode::Default => {}, + } + Ok(()) + } +} + +impl<'de> Deserialize<'de> for IndexerProcessorConfig { + fn deserialize(deserializer: D) -> Result + where + D: serde::Deserializer<'de>, + { + #[derive(Deserialize)] + #[serde(deny_unknown_fields)] + struct Inner { + processor_config: ProcessorConfig, + transaction_stream_config: TransactionStreamConfig, + db_config: DbConfig, + backfill_config: Option, + bootstrap_config: Option, + testing_config: Option, + #[serde(default)] + mode: ProcessorMode, + } + + let inner = Inner::deserialize(deserializer)?; + let config = IndexerProcessorConfig { + processor_config: inner.processor_config, + transaction_stream_config: inner.transaction_stream_config, + db_config: inner.db_config, + backfill_config: inner.backfill_config, + bootstrap_config: inner.bootstrap_config, + testing_config: inner.testing_config, + mode: inner.mode, + }; + + config.validate().map_err(serde::de::Error::custom)?; + Ok(config) + } } #[async_trait::async_trait] @@ -155,5 +226,25 @@ impl RunnableConfig for IndexerProcessorConfig { #[derive(Clone, Debug, Deserialize, Serialize)] #[serde(deny_unknown_fields)] pub struct BackfillConfig { - pub backfill_alias: String, + pub backfill_id: String, + pub initial_starting_version: u64, + pub ending_version: u64, + pub overwrite_checkpoint: bool, +} + +#[derive(Clone, Debug, Deserialize, Serialize)] +#[serde(deny_unknown_fields)] +/// Initial starting version for non-backfill processors. Processors will pick up where it left off +/// if restarted. Read more in `starting_version.rs` +pub struct BootStrapConfig { + pub initial_starting_version: u64, +} + +#[derive(Clone, Debug, Deserialize, Serialize)] +#[serde(deny_unknown_fields)] +/// Use this config for testing. Processors will not use checkpoint and will +/// always start from `override_starting_version`. +pub struct TestingConfig { + pub override_starting_version: u64, + pub ending_version: u64, } diff --git a/rust/sdk-processor/src/db/common/models/backfill_processor_status.rs b/rust/sdk-processor/src/db/common/models/backfill_processor_status.rs index 2af058d91..b3aebfe2a 100644 --- a/rust/sdk-processor/src/db/common/models/backfill_processor_status.rs +++ b/rust/sdk-processor/src/db/common/models/backfill_processor_status.rs @@ -59,7 +59,7 @@ pub struct BackfillProcessorStatus { pub last_success_version: i64, pub last_transaction_timestamp: Option, pub backfill_start_version: i64, - pub backfill_end_version: Option, + pub backfill_end_version: i64, } #[derive(AsChangeset, Debug, Queryable)] @@ -72,14 +72,16 @@ pub struct BackfillProcessorStatusQuery { pub last_updated: chrono::NaiveDateTime, pub last_transaction_timestamp: Option, pub backfill_start_version: i64, - pub backfill_end_version: Option, + pub backfill_end_version: i64, } impl BackfillProcessorStatusQuery { pub async fn get_by_processor( - backfill_alias: &str, + processor_type: &str, + backfill_id: &str, conn: &mut DbPoolConnection<'_>, ) -> diesel::QueryResult> { + let backfill_alias = format!("{}_{}", processor_type, backfill_id); backfill_processor_status::table .filter(backfill_processor_status::backfill_alias.eq(backfill_alias)) .first::(conn) diff --git a/rust/sdk-processor/src/parquet_processors/parquet_account_transactions_processor.rs b/rust/sdk-processor/src/parquet_processors/parquet_account_transactions_processor.rs index 4ae35ea16..87206fac5 100644 --- a/rust/sdk-processor/src/parquet_processors/parquet_account_transactions_processor.rs +++ b/rust/sdk-processor/src/parquet_processors/parquet_account_transactions_processor.rs @@ -1,6 +1,7 @@ use crate::{ config::{ - db_config::DbConfig, indexer_processor_config::IndexerProcessorConfig, + db_config::DbConfig, + indexer_processor_config::{IndexerProcessorConfig, ProcessorMode}, processor_config::ProcessorConfig, }, parquet_processors::{ @@ -108,6 +109,19 @@ impl ProcessorTrait for ParquetAccountTransactionsProcessor { // Define processor transaction stream config let transaction_stream = TransactionStreamStep::new(TransactionStreamConfig { starting_version: Some(starting_version), + request_ending_version: match self.config.mode { + ProcessorMode::Default => None, + ProcessorMode::Backfill => self + .config + .backfill_config + .as_ref() + .map(|c| c.ending_version), + ProcessorMode::Testing => self + .config + .testing_config + .as_ref() + .map(|c| c.ending_version), + }, ..self.config.transaction_stream_config.clone() }) .await?; diff --git a/rust/sdk-processor/src/parquet_processors/parquet_ans_processor.rs b/rust/sdk-processor/src/parquet_processors/parquet_ans_processor.rs index 9725c611b..ef93388ec 100644 --- a/rust/sdk-processor/src/parquet_processors/parquet_ans_processor.rs +++ b/rust/sdk-processor/src/parquet_processors/parquet_ans_processor.rs @@ -1,7 +1,7 @@ use crate::{ config::{ db_config::DbConfig, - indexer_processor_config::IndexerProcessorConfig, + indexer_processor_config::{IndexerProcessorConfig, ProcessorMode}, processor_config::{ParquetDefaultProcessorConfig, ProcessorConfig}, }, parquet_processors::{ @@ -121,6 +121,19 @@ impl ProcessorTrait for ParquetAnsProcessor { // Define processor transaction stream config let transaction_stream = TransactionStreamStep::new(TransactionStreamConfig { starting_version: Some(starting_version), + request_ending_version: match self.config.mode { + ProcessorMode::Default => None, + ProcessorMode::Backfill => self + .config + .backfill_config + .as_ref() + .map(|c| c.ending_version), + ProcessorMode::Testing => self + .config + .testing_config + .as_ref() + .map(|c| c.ending_version), + }, ..self.config.transaction_stream_config.clone() }) .await?; diff --git a/rust/sdk-processor/src/parquet_processors/parquet_default_processor.rs b/rust/sdk-processor/src/parquet_processors/parquet_default_processor.rs index ea64125b6..5ebaa1fa0 100644 --- a/rust/sdk-processor/src/parquet_processors/parquet_default_processor.rs +++ b/rust/sdk-processor/src/parquet_processors/parquet_default_processor.rs @@ -1,6 +1,7 @@ use crate::{ config::{ - db_config::DbConfig, indexer_processor_config::IndexerProcessorConfig, + db_config::DbConfig, + indexer_processor_config::{IndexerProcessorConfig, ProcessorMode}, processor_config::ProcessorConfig, }, parquet_processors::{ @@ -115,6 +116,19 @@ impl ProcessorTrait for ParquetDefaultProcessor { // Define processor transaction stream config let transaction_stream = TransactionStreamStep::new(TransactionStreamConfig { starting_version: Some(starting_version), + request_ending_version: match self.config.mode { + ProcessorMode::Default => None, + ProcessorMode::Backfill => self + .config + .backfill_config + .as_ref() + .map(|c| c.ending_version), + ProcessorMode::Testing => self + .config + .testing_config + .as_ref() + .map(|c| c.ending_version), + }, ..self.config.transaction_stream_config.clone() }) .await?; diff --git a/rust/sdk-processor/src/parquet_processors/parquet_events_processor.rs b/rust/sdk-processor/src/parquet_processors/parquet_events_processor.rs index 503dbd7b3..055193555 100644 --- a/rust/sdk-processor/src/parquet_processors/parquet_events_processor.rs +++ b/rust/sdk-processor/src/parquet_processors/parquet_events_processor.rs @@ -1,6 +1,7 @@ use crate::{ config::{ - db_config::DbConfig, indexer_processor_config::IndexerProcessorConfig, + db_config::DbConfig, + indexer_processor_config::{IndexerProcessorConfig, ProcessorMode}, processor_config::ProcessorConfig, }, parquet_processors::{ @@ -106,6 +107,19 @@ impl ProcessorTrait for ParquetEventsProcessor { // Define processor transaction stream config let transaction_stream = TransactionStreamStep::new(TransactionStreamConfig { starting_version: Some(starting_version), + request_ending_version: match self.config.mode { + ProcessorMode::Default => None, + ProcessorMode::Backfill => self + .config + .backfill_config + .as_ref() + .map(|c| c.ending_version), + ProcessorMode::Testing => self + .config + .testing_config + .as_ref() + .map(|c| c.ending_version), + }, ..self.config.transaction_stream_config.clone() }) .await?; diff --git a/rust/sdk-processor/src/parquet_processors/parquet_fungible_asset_processor.rs b/rust/sdk-processor/src/parquet_processors/parquet_fungible_asset_processor.rs index 211e76bb0..2d524b8e7 100644 --- a/rust/sdk-processor/src/parquet_processors/parquet_fungible_asset_processor.rs +++ b/rust/sdk-processor/src/parquet_processors/parquet_fungible_asset_processor.rs @@ -1,6 +1,7 @@ use crate::{ config::{ - db_config::DbConfig, indexer_processor_config::IndexerProcessorConfig, + db_config::DbConfig, + indexer_processor_config::{IndexerProcessorConfig, ProcessorMode}, processor_config::ProcessorConfig, }, parquet_processors::{ @@ -113,6 +114,19 @@ impl ProcessorTrait for ParquetFungibleAssetProcessor { // Define processor transaction stream config let transaction_stream = TransactionStreamStep::new(TransactionStreamConfig { starting_version: Some(starting_version), + request_ending_version: match self.config.mode { + ProcessorMode::Default => None, + ProcessorMode::Backfill => self + .config + .backfill_config + .as_ref() + .map(|c| c.ending_version), + ProcessorMode::Testing => self + .config + .testing_config + .as_ref() + .map(|c| c.ending_version), + }, ..self.config.transaction_stream_config.clone() }) .await?; diff --git a/rust/sdk-processor/src/parquet_processors/parquet_objects_processor.rs b/rust/sdk-processor/src/parquet_processors/parquet_objects_processor.rs index f91c23920..cf2b24680 100644 --- a/rust/sdk-processor/src/parquet_processors/parquet_objects_processor.rs +++ b/rust/sdk-processor/src/parquet_processors/parquet_objects_processor.rs @@ -1,6 +1,7 @@ use crate::{ config::{ - db_config::DbConfig, indexer_processor_config::IndexerProcessorConfig, + db_config::DbConfig, + indexer_processor_config::{IndexerProcessorConfig, ProcessorMode}, processor_config::ProcessorConfig, }, parquet_processors::{ @@ -106,6 +107,19 @@ impl ProcessorTrait for ParquetObjectsProcessor { // Define processor transaction stream config let transaction_stream = TransactionStreamStep::new(TransactionStreamConfig { starting_version: Some(starting_version), + request_ending_version: match self.config.mode { + ProcessorMode::Default => None, + ProcessorMode::Backfill => self + .config + .backfill_config + .as_ref() + .map(|c| c.ending_version), + ProcessorMode::Testing => self + .config + .testing_config + .as_ref() + .map(|c| c.ending_version), + }, ..self.config.transaction_stream_config.clone() }) .await?; diff --git a/rust/sdk-processor/src/parquet_processors/parquet_stake_processor.rs b/rust/sdk-processor/src/parquet_processors/parquet_stake_processor.rs index 10b3dc61c..cb6ebe673 100644 --- a/rust/sdk-processor/src/parquet_processors/parquet_stake_processor.rs +++ b/rust/sdk-processor/src/parquet_processors/parquet_stake_processor.rs @@ -1,6 +1,7 @@ use crate::{ config::{ - db_config::DbConfig, indexer_processor_config::IndexerProcessorConfig, + db_config::DbConfig, + indexer_processor_config::{IndexerProcessorConfig, ProcessorMode}, processor_config::ProcessorConfig, }, parquet_processors::{ @@ -111,6 +112,19 @@ impl ProcessorTrait for ParquetStakeProcessor { // Define processor transaction stream config let transaction_stream = TransactionStreamStep::new(TransactionStreamConfig { starting_version: Some(starting_version), + request_ending_version: match self.config.mode { + ProcessorMode::Default => None, + ProcessorMode::Backfill => self + .config + .backfill_config + .as_ref() + .map(|c| c.ending_version), + ProcessorMode::Testing => self + .config + .testing_config + .as_ref() + .map(|c| c.ending_version), + }, ..self.config.transaction_stream_config.clone() }) .await?; diff --git a/rust/sdk-processor/src/parquet_processors/parquet_token_v2_processor.rs b/rust/sdk-processor/src/parquet_processors/parquet_token_v2_processor.rs index 2403d3970..bb62aec24 100644 --- a/rust/sdk-processor/src/parquet_processors/parquet_token_v2_processor.rs +++ b/rust/sdk-processor/src/parquet_processors/parquet_token_v2_processor.rs @@ -1,6 +1,7 @@ use crate::{ config::{ - db_config::DbConfig, indexer_processor_config::IndexerProcessorConfig, + db_config::DbConfig, + indexer_processor_config::{IndexerProcessorConfig, ProcessorMode}, processor_config::ProcessorConfig, }, parquet_processors::{ @@ -115,6 +116,19 @@ impl ProcessorTrait for ParquetTokenV2Processor { // Define processor transaction stream config let transaction_stream = TransactionStreamStep::new(TransactionStreamConfig { starting_version: Some(starting_version), + request_ending_version: match self.config.mode { + ProcessorMode::Default => None, + ProcessorMode::Backfill => self + .config + .backfill_config + .as_ref() + .map(|c| c.ending_version), + ProcessorMode::Testing => self + .config + .testing_config + .as_ref() + .map(|c| c.ending_version), + }, ..self.config.transaction_stream_config.clone() }) .await?; diff --git a/rust/sdk-processor/src/parquet_processors/parquet_transaction_metadata_processor.rs b/rust/sdk-processor/src/parquet_processors/parquet_transaction_metadata_processor.rs index 8587593cb..a0cff63b0 100644 --- a/rust/sdk-processor/src/parquet_processors/parquet_transaction_metadata_processor.rs +++ b/rust/sdk-processor/src/parquet_processors/parquet_transaction_metadata_processor.rs @@ -1,6 +1,7 @@ use crate::{ config::{ - db_config::DbConfig, indexer_processor_config::IndexerProcessorConfig, + db_config::DbConfig, + indexer_processor_config::{IndexerProcessorConfig, ProcessorMode}, processor_config::ProcessorConfig, }, parquet_processors::{ @@ -107,6 +108,19 @@ impl ProcessorTrait for ParquetTransactionMetadataProcessor { // Define processor transaction stream config let transaction_stream = TransactionStreamStep::new(TransactionStreamConfig { starting_version: Some(starting_version), + request_ending_version: match self.config.mode { + ProcessorMode::Default => None, + ProcessorMode::Backfill => self + .config + .backfill_config + .as_ref() + .map(|c| c.ending_version), + ProcessorMode::Testing => self + .config + .testing_config + .as_ref() + .map(|c| c.ending_version), + }, ..self.config.transaction_stream_config.clone() }) .await?; diff --git a/rust/sdk-processor/src/parquet_processors/parquet_user_transaction_processor.rs b/rust/sdk-processor/src/parquet_processors/parquet_user_transaction_processor.rs index 29fe2353a..d59989956 100644 --- a/rust/sdk-processor/src/parquet_processors/parquet_user_transaction_processor.rs +++ b/rust/sdk-processor/src/parquet_processors/parquet_user_transaction_processor.rs @@ -1,6 +1,7 @@ use crate::{ config::{ - db_config::DbConfig, indexer_processor_config::IndexerProcessorConfig, + db_config::DbConfig, + indexer_processor_config::{IndexerProcessorConfig, ProcessorMode}, processor_config::ProcessorConfig, }, parquet_processors::{ @@ -107,6 +108,19 @@ impl ProcessorTrait for ParquetUserTransactionsProcessor { // Define processor transaction stream config let transaction_stream = TransactionStreamStep::new(TransactionStreamConfig { starting_version: Some(starting_version), + request_ending_version: match self.config.mode { + ProcessorMode::Default => None, + ProcessorMode::Backfill => self + .config + .backfill_config + .as_ref() + .map(|c| c.ending_version), + ProcessorMode::Testing => self + .config + .testing_config + .as_ref() + .map(|c| c.ending_version), + }, ..self.config.transaction_stream_config.clone() }) .await?; diff --git a/rust/sdk-processor/src/processors/account_restoration_processor.rs b/rust/sdk-processor/src/processors/account_restoration_processor.rs index 27ecc4a90..cd86053ea 100644 --- a/rust/sdk-processor/src/processors/account_restoration_processor.rs +++ b/rust/sdk-processor/src/processors/account_restoration_processor.rs @@ -1,6 +1,7 @@ use crate::{ config::{ - db_config::DbConfig, indexer_processor_config::IndexerProcessorConfig, + db_config::DbConfig, + indexer_processor_config::{IndexerProcessorConfig, ProcessorMode}, processor_config::ProcessorConfig, }, steps::{ @@ -99,6 +100,19 @@ impl ProcessorTrait for AccountRestorationProcessor { // Define processor steps. let transaction_stream = TransactionStreamStep::new(TransactionStreamConfig { starting_version: Some(starting_version), + request_ending_version: match self.config.mode { + ProcessorMode::Default => None, + ProcessorMode::Backfill => self + .config + .backfill_config + .as_ref() + .map(|c| c.ending_version), + ProcessorMode::Testing => self + .config + .testing_config + .as_ref() + .map(|c| c.ending_version), + }, ..self.config.transaction_stream_config.clone() }) .await?; diff --git a/rust/sdk-processor/src/processors/account_transactions_processor.rs b/rust/sdk-processor/src/processors/account_transactions_processor.rs index ff5e2624a..b18854114 100644 --- a/rust/sdk-processor/src/processors/account_transactions_processor.rs +++ b/rust/sdk-processor/src/processors/account_transactions_processor.rs @@ -1,6 +1,7 @@ use crate::{ config::{ - db_config::DbConfig, indexer_processor_config::IndexerProcessorConfig, + db_config::DbConfig, + indexer_processor_config::{IndexerProcessorConfig, ProcessorMode}, processor_config::ProcessorConfig, }, steps::{ @@ -98,6 +99,19 @@ impl ProcessorTrait for AccountTransactionsProcessor { // Define processor steps. let transaction_stream = TransactionStreamStep::new(TransactionStreamConfig { starting_version: Some(starting_version), + request_ending_version: match self.config.mode { + ProcessorMode::Default => None, + ProcessorMode::Backfill => self + .config + .backfill_config + .as_ref() + .map(|c| c.ending_version), + ProcessorMode::Testing => self + .config + .testing_config + .as_ref() + .map(|c| c.ending_version), + }, ..self.config.transaction_stream_config.clone() }) .await?; diff --git a/rust/sdk-processor/src/processors/ans_processor.rs b/rust/sdk-processor/src/processors/ans_processor.rs index 4f549b0bb..0dbbeced4 100644 --- a/rust/sdk-processor/src/processors/ans_processor.rs +++ b/rust/sdk-processor/src/processors/ans_processor.rs @@ -1,7 +1,7 @@ use crate::{ config::{ db_config::DbConfig, - indexer_processor_config::IndexerProcessorConfig, + indexer_processor_config::{IndexerProcessorConfig, ProcessorMode}, processor_config::{DefaultProcessorConfig, ProcessorConfig}, }, steps::{ @@ -113,6 +113,19 @@ impl ProcessorTrait for AnsProcessor { // Define processor steps. let transaction_stream = TransactionStreamStep::new(TransactionStreamConfig { starting_version: Some(starting_version), + request_ending_version: match self.config.mode { + ProcessorMode::Default => None, + ProcessorMode::Backfill => self + .config + .backfill_config + .as_ref() + .map(|c| c.ending_version), + ProcessorMode::Testing => self + .config + .testing_config + .as_ref() + .map(|c| c.ending_version), + }, ..self.config.transaction_stream_config.clone() }) .await?; diff --git a/rust/sdk-processor/src/processors/default_processor.rs b/rust/sdk-processor/src/processors/default_processor.rs index db3625e55..a0dd4b827 100644 --- a/rust/sdk-processor/src/processors/default_processor.rs +++ b/rust/sdk-processor/src/processors/default_processor.rs @@ -1,6 +1,7 @@ use crate::{ config::{ - db_config::DbConfig, indexer_processor_config::IndexerProcessorConfig, + db_config::DbConfig, + indexer_processor_config::{IndexerProcessorConfig, ProcessorMode}, processor_config::ProcessorConfig, }, steps::{ @@ -101,6 +102,19 @@ impl ProcessorTrait for DefaultProcessor { // Define processor steps let transaction_stream = TransactionStreamStep::new(TransactionStreamConfig { starting_version: Some(starting_version), + request_ending_version: match self.config.mode { + ProcessorMode::Default => None, + ProcessorMode::Backfill => self + .config + .backfill_config + .as_ref() + .map(|c| c.ending_version), + ProcessorMode::Testing => self + .config + .testing_config + .as_ref() + .map(|c| c.ending_version), + }, ..self.config.transaction_stream_config.clone() }) .await?; diff --git a/rust/sdk-processor/src/processors/events_processor.rs b/rust/sdk-processor/src/processors/events_processor.rs index 849a0f411..2f2c1e33e 100644 --- a/rust/sdk-processor/src/processors/events_processor.rs +++ b/rust/sdk-processor/src/processors/events_processor.rs @@ -1,6 +1,7 @@ use crate::{ config::{ - db_config::DbConfig, indexer_processor_config::IndexerProcessorConfig, + db_config::DbConfig, + indexer_processor_config::{IndexerProcessorConfig, ProcessorMode}, processor_config::ProcessorConfig, }, steps::{ @@ -98,6 +99,19 @@ impl ProcessorTrait for EventsProcessor { // Define processor steps let transaction_stream = TransactionStreamStep::new(TransactionStreamConfig { starting_version: Some(starting_version), + request_ending_version: match self.config.mode { + ProcessorMode::Default => None, + ProcessorMode::Backfill => self + .config + .backfill_config + .as_ref() + .map(|c| c.ending_version), + ProcessorMode::Testing => self + .config + .testing_config + .as_ref() + .map(|c| c.ending_version), + }, ..self.config.transaction_stream_config.clone() }) .await?; diff --git a/rust/sdk-processor/src/processors/fungible_asset_processor.rs b/rust/sdk-processor/src/processors/fungible_asset_processor.rs index 19a2ca9a2..dd0e91f81 100644 --- a/rust/sdk-processor/src/processors/fungible_asset_processor.rs +++ b/rust/sdk-processor/src/processors/fungible_asset_processor.rs @@ -1,6 +1,7 @@ use crate::{ config::{ - db_config::DbConfig, indexer_processor_config::IndexerProcessorConfig, + db_config::DbConfig, + indexer_processor_config::{IndexerProcessorConfig, ProcessorMode}, processor_config::ProcessorConfig, }, steps::{ @@ -98,6 +99,19 @@ impl ProcessorTrait for FungibleAssetProcessor { // Define processor steps let transaction_stream = TransactionStreamStep::new(TransactionStreamConfig { starting_version: Some(starting_version), + request_ending_version: match self.config.mode { + ProcessorMode::Default => None, + ProcessorMode::Backfill => self + .config + .backfill_config + .as_ref() + .map(|c| c.ending_version), + ProcessorMode::Testing => self + .config + .testing_config + .as_ref() + .map(|c| c.ending_version), + }, ..self.config.transaction_stream_config.clone() }) .await?; diff --git a/rust/sdk-processor/src/processors/monitoring_processor.rs b/rust/sdk-processor/src/processors/monitoring_processor.rs index 477760757..1d4d2c02c 100644 --- a/rust/sdk-processor/src/processors/monitoring_processor.rs +++ b/rust/sdk-processor/src/processors/monitoring_processor.rs @@ -1,6 +1,7 @@ use crate::{ config::{ - db_config::DbConfig, indexer_processor_config::IndexerProcessorConfig, + db_config::DbConfig, + indexer_processor_config::{IndexerProcessorConfig, ProcessorMode}, processor_config::ProcessorConfig, }, steps::common::get_processor_status_saver, @@ -96,6 +97,19 @@ impl ProcessorTrait for MonitoringProcessor { // Define processor steps let transaction_stream = TransactionStreamStep::new(TransactionStreamConfig { starting_version: Some(starting_version), + request_ending_version: match self.config.mode { + ProcessorMode::Default => None, + ProcessorMode::Backfill => self + .config + .backfill_config + .as_ref() + .map(|c| c.ending_version), + ProcessorMode::Testing => self + .config + .testing_config + .as_ref() + .map(|c| c.ending_version), + }, ..self.config.transaction_stream_config.clone() }) .await?; diff --git a/rust/sdk-processor/src/processors/objects_processor.rs b/rust/sdk-processor/src/processors/objects_processor.rs index eec91f6aa..bc7ae981d 100644 --- a/rust/sdk-processor/src/processors/objects_processor.rs +++ b/rust/sdk-processor/src/processors/objects_processor.rs @@ -2,7 +2,8 @@ use crate::{ config::{ db_config::DbConfig, indexer_processor_config::{ - IndexerProcessorConfig, QUERY_DEFAULT_RETRIES, QUERY_DEFAULT_RETRY_DELAY_MS, + IndexerProcessorConfig, ProcessorMode, QUERY_DEFAULT_RETRIES, + QUERY_DEFAULT_RETRY_DELAY_MS, }, processor_config::{DefaultProcessorConfig, ProcessorConfig}, }, @@ -120,6 +121,19 @@ impl ProcessorTrait for ObjectsProcessor { // Define processor steps let transaction_stream = TransactionStreamStep::new(TransactionStreamConfig { starting_version: Some(starting_version), + request_ending_version: match self.config.mode { + ProcessorMode::Default => None, + ProcessorMode::Backfill => self + .config + .backfill_config + .as_ref() + .map(|c| c.ending_version), + ProcessorMode::Testing => self + .config + .testing_config + .as_ref() + .map(|c| c.ending_version), + }, ..self.config.transaction_stream_config.clone() }) .await?; diff --git a/rust/sdk-processor/src/processors/stake_processor.rs b/rust/sdk-processor/src/processors/stake_processor.rs index 1d58452c8..7d8edb146 100644 --- a/rust/sdk-processor/src/processors/stake_processor.rs +++ b/rust/sdk-processor/src/processors/stake_processor.rs @@ -2,7 +2,8 @@ use crate::{ config::{ db_config::DbConfig, indexer_processor_config::{ - IndexerProcessorConfig, QUERY_DEFAULT_RETRIES, QUERY_DEFAULT_RETRY_DELAY_MS, + IndexerProcessorConfig, ProcessorMode, QUERY_DEFAULT_RETRIES, + QUERY_DEFAULT_RETRY_DELAY_MS, }, processor_config::{DefaultProcessorConfig, ProcessorConfig}, }, @@ -122,6 +123,19 @@ impl ProcessorTrait for StakeProcessor { // Define processor steps let transaction_stream = TransactionStreamStep::new(TransactionStreamConfig { starting_version: Some(starting_version), + request_ending_version: match self.config.mode { + ProcessorMode::Default => None, + ProcessorMode::Backfill => self + .config + .backfill_config + .as_ref() + .map(|c| c.ending_version), + ProcessorMode::Testing => self + .config + .testing_config + .as_ref() + .map(|c| c.ending_version), + }, ..self.config.transaction_stream_config.clone() }) .await?; diff --git a/rust/sdk-processor/src/processors/token_v2_processor.rs b/rust/sdk-processor/src/processors/token_v2_processor.rs index 5329f33d6..16d73faee 100644 --- a/rust/sdk-processor/src/processors/token_v2_processor.rs +++ b/rust/sdk-processor/src/processors/token_v2_processor.rs @@ -2,7 +2,8 @@ use crate::{ config::{ db_config::DbConfig, indexer_processor_config::{ - IndexerProcessorConfig, QUERY_DEFAULT_RETRIES, QUERY_DEFAULT_RETRY_DELAY_MS, + IndexerProcessorConfig, ProcessorMode, QUERY_DEFAULT_RETRIES, + QUERY_DEFAULT_RETRY_DELAY_MS, }, processor_config::{DefaultProcessorConfig, ProcessorConfig}, }, @@ -119,6 +120,19 @@ impl ProcessorTrait for TokenV2Processor { // Define processor steps let transaction_stream = TransactionStreamStep::new(TransactionStreamConfig { starting_version: Some(starting_version), + request_ending_version: match self.config.mode { + ProcessorMode::Default => None, + ProcessorMode::Backfill => self + .config + .backfill_config + .as_ref() + .map(|c| c.ending_version), + ProcessorMode::Testing => self + .config + .testing_config + .as_ref() + .map(|c| c.ending_version), + }, ..self.config.transaction_stream_config.clone() }) .await?; diff --git a/rust/sdk-processor/src/processors/user_transaction_processor.rs b/rust/sdk-processor/src/processors/user_transaction_processor.rs index f7ca14ccf..f39af893f 100644 --- a/rust/sdk-processor/src/processors/user_transaction_processor.rs +++ b/rust/sdk-processor/src/processors/user_transaction_processor.rs @@ -1,6 +1,7 @@ use crate::{ config::{ - db_config::DbConfig, indexer_processor_config::IndexerProcessorConfig, + db_config::DbConfig, + indexer_processor_config::{IndexerProcessorConfig, ProcessorMode}, processor_config::ProcessorConfig, }, steps::{ @@ -100,6 +101,19 @@ impl ProcessorTrait for UserTransactionProcessor { // Define processor steps let transaction_stream = TransactionStreamStep::new(TransactionStreamConfig { starting_version: Some(starting_version), + request_ending_version: match self.config.mode { + ProcessorMode::Default => None, + ProcessorMode::Backfill => self + .config + .backfill_config + .as_ref() + .map(|c| c.ending_version), + ProcessorMode::Testing => self + .config + .testing_config + .as_ref() + .map(|c| c.ending_version), + }, ..self.config.transaction_stream_config.clone() }) .await?; diff --git a/rust/sdk-processor/src/steps/common/processor_status_saver.rs b/rust/sdk-processor/src/steps/common/processor_status_saver.rs index 9b4057086..7a29cab70 100644 --- a/rust/sdk-processor/src/steps/common/processor_status_saver.rs +++ b/rust/sdk-processor/src/steps/common/processor_status_saver.rs @@ -1,5 +1,8 @@ use crate::{ - config::{db_config::DbConfig, indexer_processor_config::IndexerProcessorConfig}, + config::{ + db_config::DbConfig, + indexer_processor_config::{IndexerProcessorConfig, ProcessorMode}, + }, db::common::models::{ backfill_processor_status::{BackfillProcessorStatus, BackfillStatus}, processor_status::ProcessorStatus, @@ -24,30 +27,37 @@ pub fn get_processor_status_saver( conn_pool: ArcDbPool, config: IndexerProcessorConfig, ) -> ProcessorStatusSaverEnum { - if let Some(backfill_config) = config.backfill_config { - let txn_stream_cfg = config.transaction_stream_config; - let backfill_start_version = txn_stream_cfg.starting_version; - let backfill_end_version = txn_stream_cfg.request_ending_version; - let backfill_alias = backfill_config.backfill_alias.clone(); - ProcessorStatusSaverEnum::Backfill { - conn_pool, - backfill_alias, - backfill_start_version, - backfill_end_version, - } - } else { - let processor_name = config.processor_config.name().to_string(); - if let DbConfig::ParquetConfig(_) = config.db_config { - ProcessorStatusSaverEnum::Parquet { + match config.mode { + ProcessorMode::Backfill => { + let backfill_config = config.backfill_config.clone().unwrap(); + let backfill_start_version = backfill_config.initial_starting_version; + let backfill_end_version = backfill_config.ending_version; + let backfill_alias = format!( + "{}_{}", + config.processor_config.name(), + backfill_config.backfill_id + ); + ProcessorStatusSaverEnum::Backfill { conn_pool, - processor_name, + backfill_alias, + backfill_start_version, + backfill_end_version, } - } else { - ProcessorStatusSaverEnum::Postgres { - conn_pool, - processor_name, + }, + _ => { + let processor_name = config.processor_config.name().to_string(); + if let DbConfig::ParquetConfig(_) = config.db_config { + ProcessorStatusSaverEnum::Parquet { + conn_pool, + processor_name, + } + } else { + ProcessorStatusSaverEnum::Postgres { + conn_pool, + processor_name, + } } - } + }, } } @@ -59,8 +69,8 @@ pub enum ProcessorStatusSaverEnum { Backfill { conn_pool: ArcDbPool, backfill_alias: String, - backfill_start_version: Option, - backfill_end_version: Option, + backfill_start_version: u64, + backfill_end_version: u64, }, Parquet { conn_pool: ArcDbPool, @@ -154,21 +164,18 @@ impl ProcessorStatusSaverEnum { backfill_end_version, } => { let lst_success_version = last_success_batch.metadata.end_version as i64; - let backfill_status = if backfill_end_version.is_some_and(|backfill_end_version| { - lst_success_version >= backfill_end_version as i64 - }) { + let backfill_status = if lst_success_version >= *backfill_end_version as i64 { BackfillStatus::Complete } else { BackfillStatus::InProgress }; - let backfill_end_version_mapped = backfill_end_version.map(|v| v as i64); let status = BackfillProcessorStatus { backfill_alias: backfill_alias.clone(), backfill_status, last_success_version: lst_success_version, last_transaction_timestamp: end_timestamp, - backfill_start_version: backfill_start_version.unwrap_or(0) as i64, - backfill_end_version: backfill_end_version_mapped, + backfill_start_version: *backfill_start_version as i64, + backfill_end_version: *backfill_end_version as i64, }; execute_with_better_error( conn_pool.clone(), diff --git a/rust/sdk-processor/src/utils/starting_version.rs b/rust/sdk-processor/src/utils/starting_version.rs index 995203512..077842aaf 100644 --- a/rust/sdk-processor/src/utils/starting_version.rs +++ b/rust/sdk-processor/src/utils/starting_version.rs @@ -1,6 +1,6 @@ use super::database::ArcDbPool; use crate::{ - config::indexer_processor_config::IndexerProcessorConfig, + config::indexer_processor_config::{IndexerProcessorConfig, ProcessorMode}, db::common::models::{ backfill_processor_status::{ BackfillProcessorStatus, BackfillProcessorStatusQuery, BackfillStatus, @@ -15,31 +15,135 @@ use processor::schema::backfill_processor_status; /// Get the appropriate starting version for the processor. /// -/// If it is a regular processor, this will return the higher of the checkpointed version, -/// or `staring_version` from the config, or 0 if not set. +/// If it is a regular (`mode == default`) processor, this will return the higher of the (checkpointed version, +/// `initial_starting_version` || 0) from the bootstrap config. /// -/// If this is a backfill processor and threre is an in-progress backfill, this will return +/// If this is a backfill processor and there is an in-progress backfill, this will return /// the checkpointed version + 1. /// /// If this is a backfill processor and there is not an in-progress backfill (i.e., no checkpoint or -/// backfill status is COMPLETE), this will return `starting_version` from the config, or 0 if not set. +/// backfill status is COMPLETE), this will return `intial_starting_version` from the backfill config. +/// +/// If the backfill status is IN_PROGRESS, and `backfill_config.overwrite_checkpoint` is `true`, this will return +/// `initial_starting_version` from the backfill config, or 0 if not set. This allows users to restart a +/// backfill job if they want to. pub async fn get_starting_version( indexer_processor_config: &IndexerProcessorConfig, conn_pool: ArcDbPool, ) -> Result { - // Check if there's a checkpoint in the appropriate processor status table. - let latest_processed_version = - get_starting_version_from_db(indexer_processor_config, conn_pool) + let mut conn = conn_pool.get().await?; + + // Determine processor type. + match indexer_processor_config.mode { + ProcessorMode::Backfill => { + let backfill_config = indexer_processor_config.backfill_config.clone().unwrap(); + let backfill_status_option = BackfillProcessorStatusQuery::get_by_processor( + indexer_processor_config.processor_config.name(), + &backfill_config.backfill_id, + &mut conn, + ) + .await + .context("Failed to query backfill_processor_status table.")?; + // Return None if there is no checkpoint, if the backfill is old (complete), or if overwrite_checkpoint is true. + // Otherwise, return the checkpointed version + 1. + if let Some(status) = backfill_status_option { + // If the backfill is complete and overwrite_checkpoint is false, return the ending_version to end the backfill. + if status.backfill_status == BackfillStatus::Complete + && !backfill_config.overwrite_checkpoint + { + return Ok(backfill_config.ending_version); + } + // If status is Complete or overwrite_checkpoint is true, this is the start of a new backfill job. + if backfill_config.overwrite_checkpoint { + let backfill_alias = status.backfill_alias.clone(); + let status = BackfillProcessorStatus { + backfill_alias, + backfill_status: BackfillStatus::InProgress, + last_success_version: 0, + last_transaction_timestamp: None, + backfill_start_version: backfill_config.initial_starting_version as i64, + backfill_end_version: backfill_config.ending_version as i64, + }; + execute_with_better_error( + conn_pool.clone(), + diesel::insert_into(backfill_processor_status::table) + .values(&status) + .on_conflict(backfill_processor_status::backfill_alias) + .do_update() + .set(( + backfill_processor_status::backfill_status + .eq(excluded(backfill_processor_status::backfill_status)), + backfill_processor_status::last_success_version + .eq(excluded(backfill_processor_status::last_success_version)), + backfill_processor_status::last_updated + .eq(excluded(backfill_processor_status::last_updated)), + backfill_processor_status::last_transaction_timestamp.eq(excluded( + backfill_processor_status::last_transaction_timestamp, + )), + backfill_processor_status::backfill_start_version.eq(excluded( + backfill_processor_status::backfill_start_version, + )), + backfill_processor_status::backfill_end_version + .eq(excluded(backfill_processor_status::backfill_end_version)), + )), + None, + ) + .await?; + return Ok(backfill_config.initial_starting_version); + } + + // `backfill_config.initial_starting_version` is NOT respected. + // Return the last success version + 1. + let starting_version = status.last_success_version as u64 + 1; + log_ascii_warning(starting_version); + Ok(starting_version) + } else { + Ok(backfill_config.initial_starting_version) + } + }, + ProcessorMode::Default => { + // Return initial_starting_version if there is no checkpoint. Otherwise, + // return the higher of the checkpointed version and `initial_starting_version`. + let status = ProcessorStatusQuery::get_by_processor( + indexer_processor_config.processor_config.name(), + &mut conn, + ) .await - .context("Failed to get latest processed version from DB")?; - - // If nothing checkpointed, return the `starting_version` from the config, or 0 if not set. - Ok(latest_processed_version.unwrap_or( - indexer_processor_config - .transaction_stream_config - .starting_version - .unwrap_or(0), - )) + .context("Failed to query processor_status table.")?; + + let default_starting_version = indexer_processor_config + .bootstrap_config + .clone() + .map_or(0, |config| config.initial_starting_version); + + Ok(status.map_or(default_starting_version, |status| { + std::cmp::max(status.last_success_version as u64, default_starting_version) + })) + }, + ProcessorMode::Testing => { + // Always start from the override_starting_version. + let testing_config = indexer_processor_config.testing_config.clone().unwrap(); + Ok(testing_config.override_starting_version) + }, + } +} + +fn log_ascii_warning(version: u64) { + println!( + r#" + ██╗ ██╗ █████╗ ██████╗ ███╗ ██╗██╗███╗ ██╗ ██████╗ ██╗ + ██║ ██║██╔══██╗██╔══██╗████╗ ██║██║████╗ ██║██╔════╝ ██║ + ██║ █╗ ██║███████║██████╔╝██╔██╗ ██║██║██╔██╗ ██║██║ ███╗██║ + ██║███╗██║██╔══██║██╔══██╗██║╚██╗██║██║██║╚██╗██║██║ ██║╚═╝ + ╚███╔███╔╝██║ ██║██║ ██║██║ ╚████║██║██║ ╚████║╚██████╔╝██╗ + ╚══╝╚══╝ ╚═╝ ╚═╝╚═╝ ╚═╝╚═╝ ╚═══╝╚═╝╚═╝ ╚═══╝ ╚═════╝ ╚═╝ + +================================================================= + This backfill job is resuming progress at version {} +================================================================= +"#, + version + ); } /// Get the appropriate minimum last success version for the parquet processors. @@ -51,20 +155,25 @@ pub async fn get_min_last_success_version_parquet( conn_pool: ArcDbPool, table_names: Vec, ) -> Result { - let min_processed_version = if indexer_processor_config.backfill_config.is_some() { - get_starting_version_from_db(indexer_processor_config, conn_pool.clone()) - .await - .context("Failed to get latest processed version from DB")? - } else { - get_min_processed_version_from_db(conn_pool.clone(), table_names) + let min_processed_version = match indexer_processor_config.mode { + ProcessorMode::Testing => { + let testing_config = indexer_processor_config.testing_config.clone().unwrap(); + Some(testing_config.override_starting_version) + }, + ProcessorMode::Backfill => Some( + get_starting_version(indexer_processor_config, conn_pool.clone()) + .await + .context("Failed to get latest processed version from DB")?, + ), + ProcessorMode::Default => get_min_processed_version_from_db(conn_pool.clone(), table_names) .await - .context("Failed to get minimum last success version from DB")? + .context("Failed to get minimum last success version from DB")?, }; let config_starting_version = indexer_processor_config - .transaction_stream_config - .starting_version - .unwrap_or(0); + .bootstrap_config + .clone() + .map_or(0, |config| config.initial_starting_version); if let Some(min_processed_version) = min_processed_version { Ok(std::cmp::max( @@ -134,102 +243,16 @@ async fn get_min_processed_version_from_db( Ok(min_processed_version) } -async fn get_starting_version_from_db( - indexer_processor_config: &IndexerProcessorConfig, - conn_pool: ArcDbPool, -) -> Result> { - let mut conn = conn_pool.get().await?; - - if let Some(backfill_config) = &indexer_processor_config.backfill_config { - let backfill_status_option = BackfillProcessorStatusQuery::get_by_processor( - &backfill_config.backfill_alias, - &mut conn, - ) - .await - .context("Failed to query backfill_processor_status table.")?; - - // Return None if there is no checkpoint or if the backfill is old (complete). - // Otherwise, return the checkpointed version + 1. - if let Some(status) = backfill_status_option { - match status.backfill_status { - BackfillStatus::InProgress => { - return Ok(Some(status.last_success_version as u64 + 1)); - }, - // If status is Complete, this is the start of a new backfill job. - BackfillStatus::Complete => { - let backfill_alias = status.backfill_alias.clone(); - let backfill_end_version_mapped = status.backfill_end_version; - let status = BackfillProcessorStatus { - backfill_alias, - backfill_status: BackfillStatus::InProgress, - last_success_version: 0, - last_transaction_timestamp: None, - backfill_start_version: indexer_processor_config - .transaction_stream_config - .starting_version - .unwrap_or(0) as i64, - backfill_end_version: backfill_end_version_mapped, - }; - execute_with_better_error( - conn_pool.clone(), - diesel::insert_into(backfill_processor_status::table) - .values(&status) - .on_conflict(backfill_processor_status::backfill_alias) - .do_update() - .set(( - backfill_processor_status::backfill_status - .eq(excluded(backfill_processor_status::backfill_status)), - backfill_processor_status::last_success_version - .eq(excluded(backfill_processor_status::last_success_version)), - backfill_processor_status::last_updated - .eq(excluded(backfill_processor_status::last_updated)), - backfill_processor_status::last_transaction_timestamp.eq(excluded( - backfill_processor_status::last_transaction_timestamp, - )), - backfill_processor_status::backfill_start_version.eq(excluded( - backfill_processor_status::backfill_start_version, - )), - backfill_processor_status::backfill_end_version - .eq(excluded(backfill_processor_status::backfill_end_version)), - )), - None, - ) - .await?; - return Ok(None); - }, - } - } else { - return Ok(None); - } - } - - let status = ProcessorStatusQuery::get_by_processor( - indexer_processor_config.processor_config.name(), - &mut conn, - ) - .await - .context("Failed to query processor_status table.")?; - - // Return None if there is no checkpoint. Otherwise, - // return the higher of the checkpointed version + 1 and `starting_version`. - Ok(status.map(|status| { - std::cmp::max( - status.last_success_version as u64, - indexer_processor_config - .transaction_stream_config - .starting_version - .unwrap_or(0), - ) - })) -} - #[cfg(test)] mod tests { use super::*; use crate::{ config::{ db_config::{DbConfig, PostgresConfig}, - indexer_processor_config::{BackfillConfig, IndexerProcessorConfig}, + indexer_processor_config::{ + BackfillConfig, BootStrapConfig, IndexerProcessorConfig, ProcessorMode, + TestingConfig, + }, processor_config::{DefaultProcessorConfig, ProcessorConfig}, }, db::common::models::{ @@ -251,7 +274,9 @@ mod tests { fn create_indexer_config( db_url: String, backfill_config: Option, - starting_version: Option, + initial_starting_version: Option, + override_starting_version: Option, + mode: ProcessorMode, ) -> IndexerProcessorConfig { let default_processor_config = DefaultProcessorConfig { per_table_chunk_sizes: AHashMap::new(), @@ -264,11 +289,18 @@ mod tests { db_pool_size: 100, }; let db_config = DbConfig::PostgresConfig(postgres_config); + let bootstrap_config = Some(BootStrapConfig { + initial_starting_version: initial_starting_version.unwrap_or(0), + }); + let testing_config = Some(TestingConfig { + override_starting_version: override_starting_version.unwrap_or(0), + ending_version: 0, + }); IndexerProcessorConfig { db_config, transaction_stream_config: TransactionStreamConfig { indexer_grpc_data_service_address: Url::parse("https://test.com").unwrap(), - starting_version, + starting_version: None, request_ending_version: None, auth_token: "test".to_string(), request_name_header: "test".to_string(), @@ -280,6 +312,9 @@ mod tests { }, processor_config, backfill_config, + bootstrap_config, + testing_config, + mode, } } @@ -288,7 +323,8 @@ mod tests { async fn test_get_starting_version_no_checkpoint() { let mut db = PostgresTestDatabase::new(); db.setup().await.unwrap(); - let indexer_processor_config = create_indexer_config(db.get_db_url(), None, None); + let indexer_processor_config = + create_indexer_config(db.get_db_url(), None, None, None, ProcessorMode::Default); let conn_pool = new_db_pool(db.get_db_url().as_str(), Some(10)) .await .expect("Failed to create connection pool"); @@ -306,7 +342,8 @@ mod tests { async fn test_get_starting_version_no_checkpoint_with_start_ver() { let mut db = PostgresTestDatabase::new(); db.setup().await.unwrap(); - let indexer_processor_config = create_indexer_config(db.get_db_url(), None, Some(5)); + let indexer_processor_config = + create_indexer_config(db.get_db_url(), None, Some(5), None, ProcessorMode::Default); let conn_pool = new_db_pool(db.get_db_url().as_str(), Some(10)) .await .expect("Failed to create connection pool"); @@ -324,7 +361,8 @@ mod tests { async fn test_get_starting_version_with_checkpoint() { let mut db = PostgresTestDatabase::new(); db.setup().await.unwrap(); - let indexer_processor_config = create_indexer_config(db.get_db_url(), None, None); + let indexer_processor_config = + create_indexer_config(db.get_db_url(), None, None, None, ProcessorMode::Default); let conn_pool = new_db_pool(db.get_db_url().as_str(), Some(10)) .await .expect("Failed to create connection pool"); @@ -332,7 +370,7 @@ mod tests { diesel::insert_into(processor_status::table) .values(ProcessorStatus { processor: indexer_processor_config.processor_config.name().to_string(), - last_success_version: 10, + last_success_version: 12, last_transaction_timestamp: None, }) .execute(&mut conn_pool.clone().get().await.unwrap()) @@ -343,21 +381,26 @@ mod tests { .await .unwrap(); - assert_eq!(starting_version, 11) + assert_eq!(starting_version, 12) } #[tokio::test] #[allow(clippy::needless_return)] async fn test_backfill_get_starting_version_with_completed_checkpoint() { let mut db = PostgresTestDatabase::new(); - let backfill_alias = "backfill_processor".to_string(); db.setup().await.unwrap(); + let backfill_id = "1".to_string(); let indexer_processor_config = create_indexer_config( db.get_db_url(), Some(BackfillConfig { - backfill_alias: backfill_alias.clone(), + backfill_id: backfill_id.clone(), + initial_starting_version: 0, + ending_version: 10, + overwrite_checkpoint: false, }), None, + None, + ProcessorMode::Backfill, ); let conn_pool = new_db_pool(db.get_db_url().as_str(), Some(10)) .await @@ -365,12 +408,12 @@ mod tests { run_migrations(db.get_db_url(), conn_pool.clone()).await; diesel::insert_into(processor::schema::backfill_processor_status::table) .values(BackfillProcessorStatus { - backfill_alias: backfill_alias.clone(), + backfill_alias: backfill_id.clone(), backfill_status: BackfillStatus::Complete, last_success_version: 10, last_transaction_timestamp: None, backfill_start_version: 0, - backfill_end_version: Some(10), + backfill_end_version: 10, }) .execute(&mut conn_pool.clone().get().await.unwrap()) .await @@ -387,14 +430,18 @@ mod tests { #[allow(clippy::needless_return)] async fn test_backfill_get_starting_version_with_inprogress_checkpoint() { let mut db = PostgresTestDatabase::new(); - let backfill_alias = "backfill_processor".to_string(); db.setup().await.unwrap(); let indexer_processor_config = create_indexer_config( db.get_db_url(), Some(BackfillConfig { - backfill_alias: backfill_alias.clone(), + backfill_id: "1".to_string(), + initial_starting_version: 0, + ending_version: 10, + overwrite_checkpoint: false, }), None, + None, + ProcessorMode::Backfill, ); let conn_pool = new_db_pool(db.get_db_url().as_str(), Some(10)) .await @@ -402,12 +449,12 @@ mod tests { run_migrations(db.get_db_url(), conn_pool.clone()).await; diesel::insert_into(processor::schema::backfill_processor_status::table) .values(BackfillProcessorStatus { - backfill_alias: backfill_alias.clone(), + backfill_alias: "events_processor_1".to_string(), backfill_status: BackfillStatus::InProgress, last_success_version: 10, last_transaction_timestamp: None, backfill_start_version: 0, - backfill_end_version: Some(10), + backfill_end_version: 100, }) .execute(&mut conn_pool.clone().get().await.unwrap()) .await @@ -420,12 +467,95 @@ mod tests { assert_eq!(starting_version, 11) } + #[tokio::test] + #[allow(clippy::needless_return)] + async fn test_backfill_get_starting_version_with_inprogress_checkpoint_overwrite_checkpoint() { + let mut db = PostgresTestDatabase::new(); + db.setup().await.unwrap(); + let indexer_processor_config = create_indexer_config( + db.get_db_url(), + Some(BackfillConfig { + backfill_id: "1".to_string(), + initial_starting_version: 3, + ending_version: 10, + overwrite_checkpoint: true, + }), + None, + None, + ProcessorMode::Backfill, + ); + let conn_pool = new_db_pool(db.get_db_url().as_str(), Some(10)) + .await + .expect("Failed to create connection pool"); + run_migrations(db.get_db_url(), conn_pool.clone()).await; + diesel::insert_into(processor::schema::backfill_processor_status::table) + .values(BackfillProcessorStatus { + backfill_alias: "events_processor_1".to_string(), + backfill_status: BackfillStatus::InProgress, + last_success_version: 10, + last_transaction_timestamp: None, + backfill_start_version: 0, + backfill_end_version: 100, + }) + .execute(&mut conn_pool.clone().get().await.unwrap()) + .await + .expect("Failed to insert processor status"); + + let starting_version = get_starting_version(&indexer_processor_config, conn_pool) + .await + .unwrap(); + + assert_eq!(starting_version, 3) + } + + #[tokio::test] + #[allow(clippy::needless_return)] + async fn test_backfill_get_starting_version_with_checkpoint_test_mode() { + let mut db = PostgresTestDatabase::new(); + db.setup().await.unwrap(); + let indexer_processor_config = create_indexer_config( + db.get_db_url(), + Some(BackfillConfig { + backfill_id: "1".to_string(), + initial_starting_version: 3, + ending_version: 10, + overwrite_checkpoint: false, + }), + None, + Some(3), + ProcessorMode::Testing, + ); + let conn_pool = new_db_pool(db.get_db_url().as_str(), Some(10)) + .await + .expect("Failed to create connection pool"); + run_migrations(db.get_db_url(), conn_pool.clone()).await; + diesel::insert_into(processor::schema::backfill_processor_status::table) + .values(BackfillProcessorStatus { + backfill_alias: "events_processor_1".to_string(), + backfill_status: BackfillStatus::InProgress, + last_success_version: 10, + last_transaction_timestamp: None, + backfill_start_version: 0, + backfill_end_version: 100, + }) + .execute(&mut conn_pool.clone().get().await.unwrap()) + .await + .expect("Failed to insert processor status"); + + let starting_version = get_starting_version(&indexer_processor_config, conn_pool) + .await + .unwrap(); + + assert_eq!(starting_version, 3) + } + #[tokio::test] #[allow(clippy::needless_return)] async fn test_get_min_last_success_version_parquet_no_checkpoints() { let mut db = PostgresTestDatabase::new(); db.setup().await.unwrap(); - let indexer_processor_config = create_indexer_config(db.get_db_url(), None, Some(0)); + let indexer_processor_config = + create_indexer_config(db.get_db_url(), None, Some(0), None, ProcessorMode::Default); let conn_pool = new_db_pool(db.get_db_url().as_str(), Some(10)) .await .expect("Failed to create connection pool"); @@ -449,7 +579,8 @@ mod tests { async fn test_get_min_last_success_version_parquet_with_checkpoints() { let mut db = PostgresTestDatabase::new(); db.setup().await.unwrap(); - let indexer_processor_config = create_indexer_config(db.get_db_url(), None, Some(0)); + let indexer_processor_config = + create_indexer_config(db.get_db_url(), None, Some(0), None, ProcessorMode::Default); let conn_pool = new_db_pool(db.get_db_url().as_str(), Some(10)) .await .expect("Failed to create connection pool"); @@ -491,7 +622,8 @@ mod tests { async fn test_get_min_last_success_version_parquet_with_partial_checkpoints() { let mut db = PostgresTestDatabase::new(); db.setup().await.unwrap(); - let indexer_processor_config = create_indexer_config(db.get_db_url(), None, Some(0)); + let indexer_processor_config = + create_indexer_config(db.get_db_url(), None, Some(5), None, ProcessorMode::Default); let conn_pool = new_db_pool(db.get_db_url().as_str(), Some(10)) .await .expect("Failed to create connection pool"); @@ -522,6 +654,6 @@ mod tests { .unwrap(); // Since processor_2 has no checkpoint, the minimum version should be the starting version of processor_1 - assert_eq!(min_version, 0); + assert_eq!(min_version, 15); } }