From 3c9200891b7000319796c8fe350d76353370bd2b Mon Sep 17 00:00:00 2001 From: godmodegalactus Date: Wed, 13 Mar 2024 21:16:53 +0100 Subject: [PATCH] enabling smart account warmup for programs with lot of accounts --- Cargo.lock | 12 +++ accounts/Cargo.toml | 2 + accounts/src/account_service.rs | 128 +++++++++++++++++++++++--------- lite-rpc/src/cli.rs | 37 +++++++-- lite-rpc/src/main.rs | 20 +++-- 5 files changed, 151 insertions(+), 48 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 5170cf1b..9a25e379 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -414,6 +414,17 @@ dependencies = [ "event-listener", ] +[[package]] +name = "async-recursion" +version = "1.0.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5fd55a5ba1179988837d24ab4c7cc8ed6efdeff578ede0416b4225a5fca35bd0" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.48", +] + [[package]] name = "async-stream" version = "0.3.5" @@ -4347,6 +4358,7 @@ name = "solana-lite-rpc-accounts" version = "0.2.4" dependencies = [ "anyhow", + "async-recursion", "async-trait", "base64 0.21.7", "bincode", diff --git a/accounts/Cargo.toml b/accounts/Cargo.toml index 254370da..d497ab8f 100644 --- a/accounts/Cargo.toml +++ b/accounts/Cargo.toml @@ -43,6 +43,8 @@ lazy_static = { workspace = true } solana-lite-rpc-core = { workspace = true } solana-lite-rpc-util = { workspace = true } +async-recursion = "1.0.5" + [dev-dependencies] rand = "0.8.5" rand_chacha = "0.3.1" \ No newline at end of file diff --git a/accounts/src/account_service.rs b/accounts/src/account_service.rs index 7b9ab410..632a0564 100644 --- a/accounts/src/account_service.rs +++ b/accounts/src/account_service.rs @@ -1,6 +1,7 @@ use std::{str::FromStr, sync::Arc}; use anyhow::bail; +use async_recursion::async_recursion; use itertools::Itertools; use prometheus::{opts, register_int_gauge, IntGauge}; use solana_account_decoder::{UiAccount, UiDataSliceConfig}; @@ -8,7 +9,9 @@ use solana_lite_rpc_core::{ commitment_utils::Commitment, structures::{ account_data::{AccountData, AccountNotificationMessage, AccountStream}, - account_filter::AccountFilters, + account_filter::{ + AccountFilter, AccountFilterType, AccountFilters, MemcmpFilter, MemcmpFilterData, + }, }, types::BlockStream, AnyhowJoinHandle, @@ -38,6 +41,10 @@ lazy_static::lazy_static! { register_int_gauge!(opts!("literpc_get_account_called", "Account Updates by lite-rpc service")).unwrap(); } +// this is because for most of the anchor programs first 8 bytes are discriminators so the best discriminator is at index 9 +// but for general case 0 is better +const WARMUP_BYTE_INDEX: u64 = 0; + #[derive(Clone)] pub struct AccountService { account_store: Arc, @@ -55,51 +62,100 @@ impl AccountService { } } + #[async_recursion] + pub async fn get_account_keys( + rpc_client: &Arc, + filter: &AccountFilter, + enable_smart_warmup: bool, + ) -> anyhow::Result> { + if !filter.accounts.is_empty() { + Ok(filter + .accounts + .iter() + .map(|x| Pubkey::from_str(x).expect("Accounts in filters should be valid")) + .collect_vec()) + } else if let Some(program_id) = &filter.program_id { + let program_id = + Pubkey::from_str(program_id).expect("Program id in filters should be valid"); + + match rpc_client + .get_program_accounts_with_config( + &program_id, + RpcProgramAccountsConfig { + filters: filter.get_rpc_filter(), + account_config: RpcAccountInfoConfig { + encoding: Some(solana_account_decoder::UiAccountEncoding::Base64), + data_slice: Some(UiDataSliceConfig { + offset: 0, + length: 0, + }), + commitment: None, + min_context_slot: None, + }, + with_context: None, + }, + ) + .await + { + Ok(value) => Ok(value.iter().map(|(pk, _)| *pk).collect_vec()), + Err(e) => { + if enable_smart_warmup { + log::info!( + "gPA failed, using smart warmup feature for program {}", + filter.program_id.clone().unwrap_or_default() + ); + let mut accounts = vec![]; + for desc in 0..=u8::MAX { + let desc_filter = AccountFilterType::Memcmp(MemcmpFilter { + offset: WARMUP_BYTE_INDEX, + data: MemcmpFilterData::Bytes(vec![desc]), + }); + let program_id = filter.program_id.clone(); + let filter_list = match &filter.filters { + Some(filters) => { + let mut filters = filters.clone(); + filters.push(desc_filter); + filters + } + None => { + vec![desc_filter] + } + }; + let new_filter = AccountFilter { + accounts: vec![], + program_id, + filters: Some(filter_list), + }; + let mut acc_list = + Self::get_account_keys(rpc_client, &new_filter, false).await?; + accounts.append(&mut acc_list); + } + Ok(accounts) + } else { + log::error!("The account filter provided timed out while getProgramAccounts from RPC, try making filter smaller. error {e:?}"); + bail!("The account filter provided timed out while getProgramAccounts from RPC, try making filter smaller"); + } + } + } + } else { + bail!("Account filter does not contain accounts nor program filters"); + } + } + pub async fn populate_from_rpc( &self, rpc_client: Arc, filters: &AccountFilters, max_request_in_parallel: usize, + enable_smart_accounts_warmup: bool, ) -> anyhow::Result<()> { const NB_ACCOUNTS_IN_GMA: usize = 100; const NB_RETRY: usize = 10; let mut accounts = vec![]; for filter in filters.iter() { - if !filter.accounts.is_empty() { - let mut f_accounts = filter - .accounts - .iter() - .map(|x| Pubkey::from_str(x).expect("Accounts in filters should be valid")) - .collect(); - accounts.append(&mut f_accounts); - } - - if let Some(program_id) = &filter.program_id { - let program_id = - Pubkey::from_str(program_id).expect("Program id in filters should be valid"); - let mut rpc_acc = rpc_client - .get_program_accounts_with_config( - &program_id, - RpcProgramAccountsConfig { - filters: filter.get_rpc_filter(), - account_config: RpcAccountInfoConfig { - encoding: Some(solana_account_decoder::UiAccountEncoding::Base64), - data_slice: Some(UiDataSliceConfig { - offset: 0, - length: 0, - }), - commitment: None, - min_context_slot: None, - }, - with_context: None, - }, - ) - .await? - .iter() - .map(|(pk, _)| *pk) - .collect_vec(); - accounts.append(&mut rpc_acc); - } + let mut keys = + Self::get_account_keys(&rpc_client, filter, enable_smart_accounts_warmup).await?; + accounts.append(&mut keys); } log::info!("Fetching {} accounts", accounts.len()); for accounts in accounts.chunks(max_request_in_parallel * NB_ACCOUNTS_IN_GMA) { diff --git a/lite-rpc/src/cli.rs b/lite-rpc/src/cli.rs index 02ea57e6..1432f59e 100644 --- a/lite-rpc/src/cli.rs +++ b/lite-rpc/src/cli.rs @@ -12,6 +12,7 @@ use crate::{ use anyhow::Context; use clap::Parser; use dotenv::dotenv; +use serde::{Deserialize, Serialize}; use solana_rpc_client_api::client_error::reqwest::Url; #[derive(Parser, Debug, Clone)] @@ -86,10 +87,7 @@ pub struct Config { pub enable_address_lookup_tables: Option, #[serde(default)] - pub account_filters: Option, - - #[serde(default)] - pub enable_accounts_on_demand_accounts_service: Option, + pub accounts_configuration: AccountConfiguration, } impl Config { @@ -219,12 +217,25 @@ impl Config { .ok() .or(config.address_lookup_tables_binary); - config.account_filters = env::var("ACCOUNT_FILTERS").ok().or(config.account_filters); + config.accounts_configuration.account_filters = env::var("ACCOUNT_FILTERS") + .ok() + .or(config.accounts_configuration.account_filters); - config.enable_accounts_on_demand_accounts_service = env::var("ENABLE_ACCOUNT_ON_DEMAND") + config + .accounts_configuration + .enable_accounts_on_demand_accounts_service = env::var("ENABLE_SMART_ACCOUNT_WARMUP") .map(|value| value.parse::().unwrap()) .ok() - .or(config.enable_accounts_on_demand_accounts_service); + .or(config.accounts_configuration.enable_smart_accounts_warmup); + + config + .accounts_configuration + .enable_accounts_on_demand_accounts_service = env::var("ENABLE_ACCOUNT_ON_DEMAND") + .map(|value| value.parse::().unwrap()) + .ok() + .or(config + .accounts_configuration + .enable_accounts_on_demand_accounts_service); Ok(config) } @@ -349,3 +360,15 @@ fn obfuscate_token(token: &Option) -> String { } } } + +#[derive(Clone, Serialize, Deserialize, Debug, Default)] +pub struct AccountConfiguration { + #[serde(default)] + pub account_filters: Option, + + #[serde(default)] + pub enable_accounts_on_demand_accounts_service: Option, + + #[serde(default)] + pub enable_smart_accounts_warmup: Option, +} diff --git a/lite-rpc/src/main.rs b/lite-rpc/src/main.rs index e4707bbf..75ea50ba 100644 --- a/lite-rpc/src/main.rs +++ b/lite-rpc/src/main.rs @@ -134,8 +134,7 @@ pub async fn start_lite_rpc(args: Config, rpc_client: Arc) -> anyhow: enable_grpc_stream_inspection, enable_address_lookup_tables, address_lookup_tables_binary, - account_filters, - enable_accounts_on_demand_accounts_service, + accounts_configuration, .. } = args; @@ -149,15 +148,16 @@ pub async fn start_lite_rpc(args: Config, rpc_client: Arc) -> anyhow: let tpu_connection_path = configure_tpu_connection_path(quic_proxy_addr); - let account_filters = if let Some(account_filters) = account_filters { + let account_filters = if let Some(account_filters) = accounts_configuration.account_filters { serde_json::from_str::(account_filters.as_str()) .expect("Account filters should be valid") } else { vec![] }; - let enable_accounts_on_demand_accounts_service = - enable_accounts_on_demand_accounts_service.unwrap_or_default(); + let enable_accounts_on_demand_accounts_service = accounts_configuration + .enable_accounts_on_demand_accounts_service + .unwrap_or_default(); if enable_accounts_on_demand_accounts_service { log::info!("Accounts on demand service is enabled"); } else { @@ -232,11 +232,21 @@ pub async fn start_lite_rpc(args: Config, rpc_client: Arc) -> anyhow: account_service .process_account_stream(account_stream.resubscribe(), blocks_notifier.resubscribe()); + let enable_smart_accounts_warmup = accounts_configuration + .enable_smart_accounts_warmup + .unwrap_or_default(); + if enable_smart_accounts_warmup { + info!("Smart account warmup enabled"); + } else { + info!("Smart account warmup disabled"); + } + account_service .populate_from_rpc( rpc_client.clone(), &account_filters, MAX_CONNECTIONS_IN_PARALLEL, + enable_smart_accounts_warmup, ) .await?; Some(account_service)