From 048e72fb9eb2ee8df7b2b60176298637c8401ba9 Mon Sep 17 00:00:00 2001
From: Arthur M Meskelis
Date: Thu, 7 Nov 2024 12:55:21 -0300
Subject: [PATCH 1/2] rpc-downloader daemonized

---
 src/bin/rpc_downloader.rs | 29 ++++++++++++++++++++++-------
 src/config.rs             |  4 ++++
 2 files changed, 26 insertions(+), 7 deletions(-)

diff --git a/src/bin/rpc_downloader.rs b/src/bin/rpc_downloader.rs
index 4821eb774..2b31a2e0e 100644
--- a/src/bin/rpc_downloader.rs
+++ b/src/bin/rpc_downloader.rs
@@ -48,14 +48,29 @@ async fn run(config: RpcDownloaderConfig) -> anyhow::Result<()> {
     let rpc_storage = config.rpc_storage.init().await?;
     let chain = Arc::new(BlockchainClient::new_http(&config.external_rpc, config.external_rpc_timeout).await?);
 
-    let block_end = match config.block_end {
-        Some(end) => BlockNumber::from(end),
-        None => chain.fetch_block_number().await?,
-    };
+    loop {
+        let block_end = match config.block_end {
+            Some(end) => BlockNumber::from(end),
+            None => chain.fetch_block_number().await?,
+        };
+
+        // download balances and blocks
+        download_balances(Arc::clone(&rpc_storage), &chain, config.initial_accounts.clone()).await?;
+        download_blocks(rpc_storage.clone(), chain.clone(), config.paralellism, block_end).await?;
+
+        if !config.daemon {
+            break;
+        }
 
-    // download balances and blocks
-    download_balances(Arc::clone(&rpc_storage), &chain, config.initial_accounts).await?;
-    download_blocks(rpc_storage, chain, config.paralellism, block_end).await?;
+        if GlobalState::is_shutdown_warn("rpc-downloader::main") {
+            break;
+        }
+
+        // wait for 1 minute before restarting the download
+        let wait_duration = Duration::from_secs(60);
+        tracing::info!("Daemon mode enabled, waiting {} seconds before restarting...", wait_duration.as_secs());
+        tokio::time::sleep(wait_duration).await;
+    }
 
     Ok(())
 }
diff --git a/src/config.rs b/src/config.rs
index 47ca026d1..556ba57ea 100644
--- a/src/config.rs
+++ b/src/config.rs
@@ -241,6 +241,10 @@ pub struct RpcDownloaderConfig {
     #[arg(short = 'p', long = "paralellism", env = "PARALELLISM", default_value = "1")]
     pub paralellism: usize,
 
+    /// Daemon mode.
+    #[arg(short = 'd', long = "daemon", env = "DAEMON", default_value = "false")]
+    pub daemon: bool,
+
     /// Accounts to retrieve initial balance information.
     ///
     /// For Cloudwalk networks, provide these addresses:

From 5021f7c4a52af596f111d1b703679a0d23a516cf Mon Sep 17 00:00:00 2001
From: Arthur M Meskelis
Date: Thu, 7 Nov 2024 14:41:40 -0300
Subject: [PATCH 2/2] importer-offline daemonized

---
 src/bin/importer_offline.rs | 94 ++++++++++++++++++++++---------
 src/config.rs               |  8 ++++
 2 files changed, 64 insertions(+), 38 deletions(-)

diff --git a/src/bin/importer_offline.rs b/src/bin/importer_offline.rs
index c10b4153c..265beb5c1 100644
--- a/src/bin/importer_offline.rs
+++ b/src/bin/importer_offline.rs
@@ -10,6 +10,7 @@
 use std::cmp::min;
 use std::sync::Arc;
+use std::time::Duration;
 
 use anyhow::anyhow;
 use futures::StreamExt;
@@ -73,7 +74,7 @@ async fn run(config: ImporterOfflineConfig) -> anyhow::Result<()> {
     let initial_accounts = rpc_storage.read_initial_accounts().await?;
     storage.save_accounts(initial_accounts.clone())?;
 
-    let storage_loader = execute_external_rpc_storage_loader(rpc_storage, config.blocks_by_fetch, config.paralellism, block_start, block_end, backlog_tx);
+    let storage_loader = execute_external_rpc_storage_loader(rpc_storage, config.blocks_by_fetch, config.paralellism, config.daemon, config.daemon_interval, block_start, block_end, backlog_tx);
     spawn_named("storage-loader", async move {
         if let Err(e) = storage_loader.await {
             tracing::error!(parent: None, reason = ?e, "'storage-loader' task failed");
@@ -185,57 +186,74 @@ async fn execute_external_rpc_storage_loader(
     // data
     blocks_by_fetch: usize,
     paralellism: usize,
+    daemon: bool,
+    daemon_interval: u64,
     mut start: BlockNumber,
-    end: BlockNumber,
+    mut end: BlockNumber,
     backlog: mpsc::Sender,
 ) -> anyhow::Result<()> {
     const TASK_NAME: &str = "external-block-loader";
-    tracing::info!(parent: None, %start, %end, "creating task {}", TASK_NAME);
-    // prepare loads to be executed in parallel
-    let mut tasks = Vec::new();
-    while start <= end {
-        let end = min(start + (blocks_by_fetch - 1), end);
-
-        let task = load_blocks_and_receipts(Arc::clone(&rpc_storage), start, end);
-        tasks.push(task);
-
-        start += blocks_by_fetch;
-    }
-
-    // execute loads in parallel
-    let mut tasks = futures::stream::iter(tasks).buffered(paralellism);
     loop {
+        // prepare loads to be executed in parallel
+        let mut tasks = Vec::new();
+
         if GlobalState::is_shutdown_warn(TASK_NAME) {
             return Ok(());
         };
 
-        // retrieve next batch of loaded blocks
-        // if finished, do not cancel, it is expected to finish
-        let Some(result) = tasks.next().await else {
-            tracing::info!(parent: None, "{} has no more blocks to process", TASK_NAME);
-            return Ok(());
-        };
+        tracing::info!(parent: None, %start, %end, "creating task {}", TASK_NAME);
+        while start <= end {
+            end = min(start + (blocks_by_fetch - 1), end);
 
-        // check if executed correctly
-        let blocks = match result {
-            Ok(blocks) => blocks,
-            Err(e) => {
-                let message = GlobalState::shutdown_from(TASK_NAME, "failed to fetch block or receipt");
-                return log_and_err!(reason = e, message);
-            }
-        };
+            let task = load_blocks_and_receipts(Arc::clone(&rpc_storage), start, end);
+            tasks.push(task);
 
-        // check blocks were really loaded
-        if blocks.is_empty() {
-            let message = GlobalState::shutdown_from(TASK_NAME, "no blocks returned when they were expected");
-            return log_and_err!(message);
+            start += blocks_by_fetch;
         }
 
-        // send to backlog
-        if backlog.send(blocks).await.is_err() {
-            return Err(anyhow!(GlobalState::shutdown_from(TASK_NAME, "failed to send task to importer")));
-        };
+        // execute loads in parallel
+        let mut iterator = futures::stream::iter(tasks).buffered(paralellism);
+        loop {
+            if GlobalState::is_shutdown_warn(TASK_NAME) {
+                return Ok(());
+            };
+
+            // retrieve next batch of loaded blocks
+            // if finished, do not cancel, it is expected to finish
+            let Some(result) = iterator.next().await else {
+                tracing::info!(parent: None, "{} has no more blocks to process", TASK_NAME);
+
+                if daemon {
+                    tracing::info!(parent: None, "{} is in daemon mode, waiting for new blocks", TASK_NAME);
+                    tokio::time::sleep(Duration::from_secs(daemon_interval)).await;
+                    break;
+                } else {
+                    tracing::info!(parent: None, "{} is not in daemon mode, exiting", TASK_NAME);
+                    return Ok(());
+                }
+            };
+
+            // check if executed correctly
+            let blocks = match result {
+                Ok(blocks) => blocks,
+                Err(e) => {
+                    let message = GlobalState::shutdown_from(TASK_NAME, "failed to fetch block or receipt");
+                    return log_and_err!(reason = e, message);
+                }
+            };
+
+            // check blocks were really loaded
+            if blocks.is_empty() {
+                let message = GlobalState::shutdown_from(TASK_NAME, "no blocks returned when they were expected");
+                return log_and_err!(message);
+            }
+
+            // send to backlog
+            if backlog.send(blocks).await.is_err() {
+                return Err(anyhow!(GlobalState::shutdown_from(TASK_NAME, "failed to send task to importer")));
+            };
+        }
     }
 }
diff --git a/src/config.rs b/src/config.rs
index 556ba57ea..7408aa2d1 100644
--- a/src/config.rs
+++ b/src/config.rs
@@ -299,6 +299,14 @@ pub struct ImporterOfflineConfig {
     #[clap(flatten)]
     pub rpc_storage: ExternalRpcStorageConfig,
 
+    /// Daemon mode - keeps running and checking for new blocks
+    #[arg(short = 'd', long = "daemon", env = "DAEMON", default_value = "false")]
+    pub daemon: bool,
+
+    /// Interval between daemon checks in seconds
+    #[arg(long = "daemon-interval", env = "DAEMON_INTERVAL", default_value = "60")]
+    pub daemon_interval: u64,
+
    #[deref]
    #[clap(flatten)]
    pub common: CommonConfig,
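
Note on the shared pattern (not part of the patches themselves): both binaries now follow the same shape: run one download/import pass, stop unless daemon mode is on, check whether a shutdown was requested, then sleep for the configured interval and go again. Below is a minimal, self-contained sketch of that loop. `run_one_pass` and `shutdown_requested` are hypothetical stand-ins for the per-binary work and for the `GlobalState::is_shutdown_warn` check used in the patches; only `tokio`, `anyhow`, and `std` are assumed.

    use std::time::Duration;

    /// Stand-in for one download/import pass (hypothetical, not the real function).
    async fn run_one_pass() -> anyhow::Result<()> {
        Ok(())
    }

    /// Stand-in for the global shutdown check used by the patched binaries (hypothetical).
    fn shutdown_requested() -> bool {
        false
    }

    /// Daemon loop: one pass per iteration, exiting after the first pass when daemon
    /// mode is off or a shutdown was requested, otherwise sleeping between passes.
    async fn daemon_loop(daemon: bool, interval_secs: u64) -> anyhow::Result<()> {
        loop {
            run_one_pass().await?;

            if !daemon || shutdown_requested() {
                break;
            }

            tokio::time::sleep(Duration::from_secs(interval_secs)).await;
        }
        Ok(())
    }

    #[tokio::main]
    async fn main() -> anyhow::Result<()> {
        // Mirrors the defaults added in config.rs: daemon off, 60-second interval.
        daemon_loop(false, 60).await
    }

With the patches applied, daemon mode stays opt-in: rpc-downloader gains `-d`/`--daemon` (env `DAEMON`) with a fixed 60-second wait between passes, while importer-offline gains `--daemon` plus a configurable `--daemon-interval` (env `DAEMON_INTERVAL`, default 60 seconds).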