Skip to content

Commit

Permalink
Make backfill behavior more explicit (#700)
Browse files Browse the repository at this point in the history
  • Loading branch information
dermanyang authored Feb 8, 2025
1 parent e943d0f commit bd642dd
Show file tree
Hide file tree
Showing 39 changed files with 832 additions and 209 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use ahash::AHashMap;
use aptos_indexer_testing_framework::sdk_test_context::SdkTestContext;
use sdk_processor::config::{
db_config::{DbConfig, PostgresConfig},
indexer_processor_config::IndexerProcessorConfig,
indexer_processor_config::{IndexerProcessorConfig, ProcessorMode, TestingConfig},
processor_config::{DefaultProcessorConfig, ProcessorConfig},
};
use std::collections::HashSet;
Expand All @@ -26,13 +26,21 @@ pub fn setup_account_restoration_processor_config(

let processor_config = ProcessorConfig::AccountRestorationProcessor(default_processor_config);

let testing_config: TestingConfig = TestingConfig {
override_starting_version: transaction_stream_config.starting_version.unwrap(),
ending_version: transaction_stream_config.request_ending_version.unwrap(),
};

let processor_name = processor_config.name();
(
IndexerProcessorConfig {
processor_config,
transaction_stream_config,
db_config,
backfill_config: None,
bootstrap_config: None,
testing_config: Some(testing_config),
mode: ProcessorMode::Testing,
},
processor_name,
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use ahash::AHashMap;
use aptos_indexer_testing_framework::sdk_test_context::SdkTestContext;
use sdk_processor::config::{
db_config::{DbConfig, PostgresConfig},
indexer_processor_config::IndexerProcessorConfig,
indexer_processor_config::{IndexerProcessorConfig, ProcessorMode, TestingConfig},
processor_config::{DefaultProcessorConfig, ProcessorConfig},
};
use std::collections::HashSet;
Expand All @@ -26,12 +26,21 @@ pub fn setup_acc_txn_processor_config(

let processor_config = ProcessorConfig::AccountTransactionsProcessor(default_processor_config);
let processor_name = processor_config.name();

let testing_config: TestingConfig = TestingConfig {
override_starting_version: transaction_stream_config.starting_version.unwrap(),
ending_version: transaction_stream_config.request_ending_version.unwrap(),
};

(
IndexerProcessorConfig {
processor_config,
transaction_stream_config,
db_config,
backfill_config: None,
bootstrap_config: None,
testing_config: Some(testing_config),
mode: ProcessorMode::Testing,
},
processor_name,
)
Expand Down
11 changes: 10 additions & 1 deletion rust/integration-tests/src/sdk_tests/ans_processor_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use aptos_indexer_testing_framework::sdk_test_context::SdkTestContext;
use sdk_processor::{
config::{
db_config::{DbConfig, PostgresConfig},
indexer_processor_config::IndexerProcessorConfig,
indexer_processor_config::{IndexerProcessorConfig, ProcessorMode, TestingConfig},
processor_config::{DefaultProcessorConfig, ProcessorConfig},
},
processors::ans_processor::AnsProcessorConfig,
Expand Down Expand Up @@ -38,12 +38,21 @@ pub fn setup_ans_processor_config(

let processor_config = ProcessorConfig::AnsProcessor(ans_processor_config);
let processor_name = processor_config.name();

let testing_config: TestingConfig = TestingConfig {
override_starting_version: transaction_stream_config.starting_version.unwrap(),
ending_version: transaction_stream_config.request_ending_version.unwrap(),
};

(
IndexerProcessorConfig {
processor_config,
transaction_stream_config,
db_config,
backfill_config: None,
bootstrap_config: None,
testing_config: Some(testing_config),
mode: ProcessorMode::Testing,
},
processor_name,
)
Expand Down
11 changes: 10 additions & 1 deletion rust/integration-tests/src/sdk_tests/default_processor_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use ahash::AHashMap;
use aptos_indexer_testing_framework::sdk_test_context::SdkTestContext;
use sdk_processor::config::{
db_config::{DbConfig, PostgresConfig},
indexer_processor_config::IndexerProcessorConfig,
indexer_processor_config::{IndexerProcessorConfig, ProcessorMode, TestingConfig},
processor_config::{DefaultProcessorConfig, ProcessorConfig},
};
use std::collections::HashSet;
Expand All @@ -26,12 +26,21 @@ pub fn setup_default_processor_config(

let processor_config = ProcessorConfig::DefaultProcessor(default_processor_config);
let processor_name = processor_config.name();

let testing_config: TestingConfig = TestingConfig {
override_starting_version: transaction_stream_config.starting_version.unwrap(),
ending_version: transaction_stream_config.request_ending_version.unwrap(),
};

(
IndexerProcessorConfig {
processor_config,
transaction_stream_config,
db_config,
backfill_config: None,
bootstrap_config: None,
testing_config: Some(testing_config),
mode: ProcessorMode::Testing,
},
processor_name,
)
Expand Down
10 changes: 9 additions & 1 deletion rust/integration-tests/src/sdk_tests/events_processor_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use ahash::AHashMap;
use aptos_indexer_testing_framework::sdk_test_context::SdkTestContext;
use sdk_processor::config::{
db_config::{DbConfig, PostgresConfig},
indexer_processor_config::IndexerProcessorConfig,
indexer_processor_config::{IndexerProcessorConfig, ProcessorMode, TestingConfig},
processor_config::{DefaultProcessorConfig, ProcessorConfig},
};
use std::collections::HashSet;
Expand All @@ -26,12 +26,20 @@ pub fn setup_events_processor_config(

let processor_config = ProcessorConfig::EventsProcessor(default_processor_config);
let processor_name = processor_config.name();
let testing_config: TestingConfig = TestingConfig {
override_starting_version: transaction_stream_config.starting_version.unwrap(),
ending_version: transaction_stream_config.request_ending_version.unwrap(),
};

(
IndexerProcessorConfig {
processor_config,
transaction_stream_config,
db_config,
backfill_config: None,
bootstrap_config: None,
testing_config: Some(testing_config),
mode: ProcessorMode::Testing,
},
processor_name,
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use ahash::AHashMap;
use aptos_indexer_testing_framework::sdk_test_context::SdkTestContext;
use sdk_processor::config::{
db_config::{DbConfig, PostgresConfig},
indexer_processor_config::IndexerProcessorConfig,
indexer_processor_config::{IndexerProcessorConfig, ProcessorMode, TestingConfig},
processor_config::{DefaultProcessorConfig, ProcessorConfig},
};
use std::collections::HashSet;
Expand All @@ -27,12 +27,20 @@ pub fn setup_fa_processor_config(
let processor_config = ProcessorConfig::FungibleAssetProcessor(default_processor_config);

let processor_name = processor_config.name();
let testing_config: TestingConfig = TestingConfig {
override_starting_version: transaction_stream_config.starting_version.unwrap(),
ending_version: transaction_stream_config.request_ending_version.unwrap(),
};

(
IndexerProcessorConfig {
processor_config,
transaction_stream_config,
db_config,
backfill_config: None,
bootstrap_config: None,
testing_config: Some(testing_config),
mode: ProcessorMode::Testing,
},
processor_name,
)
Expand Down
10 changes: 9 additions & 1 deletion rust/integration-tests/src/sdk_tests/objects_processor_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use aptos_indexer_testing_framework::sdk_test_context::SdkTestContext;
use sdk_processor::{
config::{
db_config::{DbConfig, PostgresConfig},
indexer_processor_config::IndexerProcessorConfig,
indexer_processor_config::{IndexerProcessorConfig, ProcessorMode, TestingConfig},
processor_config::{DefaultProcessorConfig, ProcessorConfig},
},
processors::objects_processor::ObjectsProcessorConfig,
Expand Down Expand Up @@ -37,12 +37,20 @@ pub fn setup_objects_processor_config(
let processor_config = ProcessorConfig::ObjectsProcessor(objects_processor_config);

let processor_name = processor_config.name();
let testing_config: TestingConfig = TestingConfig {
override_starting_version: transaction_stream_config.starting_version.unwrap(),
ending_version: transaction_stream_config.request_ending_version.unwrap(),
};

(
IndexerProcessorConfig {
processor_config,
transaction_stream_config,
db_config,
backfill_config: None,
bootstrap_config: None,
testing_config: Some(testing_config),
mode: ProcessorMode::Testing,
},
processor_name,
)
Expand Down
10 changes: 9 additions & 1 deletion rust/integration-tests/src/sdk_tests/stake_processor_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use aptos_indexer_testing_framework::sdk_test_context::SdkTestContext;
use sdk_processor::{
config::{
db_config::{DbConfig, PostgresConfig},
indexer_processor_config::IndexerProcessorConfig,
indexer_processor_config::{IndexerProcessorConfig, ProcessorMode, TestingConfig},
processor_config::{DefaultProcessorConfig, ProcessorConfig},
},
processors::stake_processor::StakeProcessorConfig,
Expand Down Expand Up @@ -34,12 +34,20 @@ pub fn setup_stake_processor_config(

let processor_config = ProcessorConfig::StakeProcessor(default_processor_config);
let processor_name = processor_config.name();
let testing_config: TestingConfig = TestingConfig {
override_starting_version: transaction_stream_config.starting_version.unwrap(),
ending_version: transaction_stream_config.request_ending_version.unwrap(),
};

(
IndexerProcessorConfig {
processor_config,
transaction_stream_config,
db_config,
backfill_config: None,
bootstrap_config: None,
testing_config: Some(testing_config),
mode: ProcessorMode::Testing,
},
processor_name,
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use aptos_indexer_testing_framework::sdk_test_context::SdkTestContext;
use sdk_processor::{
config::{
db_config::{DbConfig, PostgresConfig},
indexer_processor_config::IndexerProcessorConfig,
indexer_processor_config::{IndexerProcessorConfig, ProcessorMode, TestingConfig},
processor_config::{DefaultProcessorConfig, ProcessorConfig},
},
processors::token_v2_processor::TokenV2ProcessorConfig,
Expand Down Expand Up @@ -36,12 +36,20 @@ pub fn setup_token_v2_processor_config(
let processor_config = ProcessorConfig::TokenV2Processor(token_v2_processor_config);

let processor_name = processor_config.name();
let testing_config: TestingConfig = TestingConfig {
override_starting_version: transaction_stream_config.starting_version.unwrap(),
ending_version: transaction_stream_config.request_ending_version.unwrap(),
};

(
IndexerProcessorConfig {
processor_config,
transaction_stream_config,
db_config,
backfill_config: None,
bootstrap_config: None,
testing_config: Some(testing_config),
mode: ProcessorMode::Testing,
},
processor_name,
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use ahash::AHashMap;
use aptos_indexer_testing_framework::sdk_test_context::SdkTestContext;
use sdk_processor::config::{
db_config::{DbConfig, PostgresConfig},
indexer_processor_config::IndexerProcessorConfig,
indexer_processor_config::{IndexerProcessorConfig, ProcessorMode, TestingConfig},
processor_config::{DefaultProcessorConfig, ProcessorConfig},
};
use std::collections::HashSet;
Expand All @@ -27,12 +27,20 @@ pub fn setup_user_txn_processor_config(
let processor_config = ProcessorConfig::UserTransactionProcessor(default_processor_config);

let processor_name = processor_config.name();
let testing_config: TestingConfig = TestingConfig {
override_starting_version: transaction_stream_config.starting_version.unwrap(),
ending_version: transaction_stream_config.request_ending_version.unwrap(),
};

(
IndexerProcessorConfig {
processor_config,
transaction_stream_config,
db_config,
backfill_config: None,
bootstrap_config: None,
testing_config: Some(testing_config),
mode: ProcessorMode::Testing,
},
processor_name,
)
Expand Down
3 changes: 2 additions & 1 deletion rust/processor/parser.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,5 @@ server_config:
type: default_processor
postgres_connection_string: postgresql://postgres:@localhost:5432/default_processor
indexer_grpc_data_service_address: http://127.0.0.1:50051
auth_token: AUTH_TOKEN
auth_token: AUTH_TOKEN

Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,6 @@ CREATE TABLE backfill_processor_status (
last_updated TIMESTAMP NOT NULL DEFAULT NOW(),
last_transaction_timestamp TIMESTAMP NULL,
backfill_start_version BIGINT NOT NULL,
backfill_end_version BIGINT NULL,
backfill_end_version BIGINT NOT NULL,
PRIMARY KEY (backfill_alias)
);
2 changes: 1 addition & 1 deletion rust/processor/src/db/postgres/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ diesel::table! {
last_updated -> Timestamp,
last_transaction_timestamp -> Nullable<Timestamp>,
backfill_start_version -> Int8,
backfill_end_version -> Nullable<Int8>,
backfill_end_version -> Int8,
}
}

Expand Down
23 changes: 19 additions & 4 deletions rust/sdk-processor/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,10 @@ If you want to index a custom contract, we recommend using the [Quickstart Guide
processor_config:
type: "fungible_asset_processor"
channel_size: 100
bootstrap_config:
initial_starting_version: 0
transaction_stream_config:
indexer_grpc_data_service_address: "https://grpc.mainnet.aptoslabs.com:443"
starting_version: 0
# request_ending_version: 1409805
auth_token: "{AUTH_TOKEN}"
request_name_header: "fungible_asset_processor"
db_config:
Expand All @@ -40,11 +40,26 @@ If you want to index a custom contract, we recommend using the [Quickstart Guide
- `type`: which processor to run
- `channel_size`: size of channel in between steps
- Individual processors may have different configuration required. See the full list of configs [here](https://github.com/aptos-labs/aptos-indexer-processors/blob/main/rust/sdk-processor/src/config/processor_config.rs#L89).

- `backfill_config` (optional)
- `backfill_id`: appended to `processor_type` for a unique backfill identifier
- `initial_starting_version`: processor starts here unless there is a greater checkpointed version
- `ending_version`: ending version of the backfill
- `overwrite_checkpoint`: overwrite checkpoints if it exists, restarting the backfill from `initial_starting_version`.

- `testing_config` (optional)
- `override_starting_version`: starting version of the testing. always starts from this version
- `ending_version`: ending version of the testing

- `bootstrap_config` (optional) used for regular, non-backfill, processors
- `initial_starting_version`: processor starts here unless there is a greater checkpointed version.
Note: no ending version for bootstrap config since its meant to keep running at HEAD.

- `mode`: (optional) `default`, `testing` or `backfill`. Set to `default` if no mode specified. If backfill/testing/bootstrap configs are not specified, processor will start from 0 or the last successfully processed version.

- `transaction_stream_config`
- `indexer_grpc_data_service_address`: Data service non-TLS endpoint address.
- `auth_token`: Auth token used for connection.
- `starting_version`: start processor at starting_version.
- `request_ending_version`: stop processor after ending_version.
- `request_name_header`: request name header to append to the grpc request; name of the processor
- `additional_headers`: addtional headers to append to the grpc request
- `indexer_grpc_http2_ping_interval_in_secs`: client-side grpc HTTP2 ping interval.
Expand Down
Loading

0 comments on commit bd642dd

Please sign in to comment.