Skip to content

Commit

Permalink
enabling smart account warmup for programs with lot of accounts
Browse files Browse the repository at this point in the history
  • Loading branch information
godmodegalactus committed Mar 13, 2024
1 parent ce215e0 commit 3c92008
Show file tree
Hide file tree
Showing 5 changed files with 151 additions and 48 deletions.
12 changes: 12 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions accounts/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ lazy_static = { workspace = true }
solana-lite-rpc-core = { workspace = true }
solana-lite-rpc-util = { workspace = true }

async-recursion = "1.0.5"

[dev-dependencies]
rand = "0.8.5"
rand_chacha = "0.3.1"
128 changes: 92 additions & 36 deletions accounts/src/account_service.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,17 @@
use std::{str::FromStr, sync::Arc};

use anyhow::bail;
use async_recursion::async_recursion;
use itertools::Itertools;
use prometheus::{opts, register_int_gauge, IntGauge};
use solana_account_decoder::{UiAccount, UiDataSliceConfig};
use solana_lite_rpc_core::{
commitment_utils::Commitment,
structures::{
account_data::{AccountData, AccountNotificationMessage, AccountStream},
account_filter::AccountFilters,
account_filter::{
AccountFilter, AccountFilterType, AccountFilters, MemcmpFilter, MemcmpFilterData,
},
},
types::BlockStream,
AnyhowJoinHandle,
Expand Down Expand Up @@ -38,6 +41,10 @@ lazy_static::lazy_static! {
register_int_gauge!(opts!("literpc_get_account_called", "Account Updates by lite-rpc service")).unwrap();
}

// this is because for most of the anchor programs first 8 bytes are discriminators so the best discriminator is at index 9
// but for general case 0 is better
const WARMUP_BYTE_INDEX: u64 = 0;

#[derive(Clone)]
pub struct AccountService {
account_store: Arc<dyn AccountStorageInterface>,
Expand All @@ -55,51 +62,100 @@ impl AccountService {
}
}

#[async_recursion]
pub async fn get_account_keys(
rpc_client: &Arc<RpcClient>,
filter: &AccountFilter,
enable_smart_warmup: bool,
) -> anyhow::Result<Vec<Pubkey>> {
if !filter.accounts.is_empty() {
Ok(filter
.accounts
.iter()
.map(|x| Pubkey::from_str(x).expect("Accounts in filters should be valid"))
.collect_vec())
} else if let Some(program_id) = &filter.program_id {
let program_id =
Pubkey::from_str(program_id).expect("Program id in filters should be valid");

match rpc_client
.get_program_accounts_with_config(
&program_id,
RpcProgramAccountsConfig {
filters: filter.get_rpc_filter(),
account_config: RpcAccountInfoConfig {
encoding: Some(solana_account_decoder::UiAccountEncoding::Base64),
data_slice: Some(UiDataSliceConfig {
offset: 0,
length: 0,
}),
commitment: None,
min_context_slot: None,
},
with_context: None,
},
)
.await
{
Ok(value) => Ok(value.iter().map(|(pk, _)| *pk).collect_vec()),
Err(e) => {
if enable_smart_warmup {
log::info!(
"gPA failed, using smart warmup feature for program {}",
filter.program_id.clone().unwrap_or_default()
);
let mut accounts = vec![];
for desc in 0..=u8::MAX {
let desc_filter = AccountFilterType::Memcmp(MemcmpFilter {
offset: WARMUP_BYTE_INDEX,
data: MemcmpFilterData::Bytes(vec![desc]),
});
let program_id = filter.program_id.clone();
let filter_list = match &filter.filters {
Some(filters) => {
let mut filters = filters.clone();
filters.push(desc_filter);
filters
}
None => {
vec![desc_filter]
}
};
let new_filter = AccountFilter {
accounts: vec![],
program_id,
filters: Some(filter_list),
};
let mut acc_list =
Self::get_account_keys(rpc_client, &new_filter, false).await?;
accounts.append(&mut acc_list);
}
Ok(accounts)
} else {
log::error!("The account filter provided timed out while getProgramAccounts from RPC, try making filter smaller. error {e:?}");
bail!("The account filter provided timed out while getProgramAccounts from RPC, try making filter smaller");
}
}
}
} else {
bail!("Account filter does not contain accounts nor program filters");
}
}

pub async fn populate_from_rpc(
&self,
rpc_client: Arc<RpcClient>,
filters: &AccountFilters,
max_request_in_parallel: usize,
enable_smart_accounts_warmup: bool,
) -> anyhow::Result<()> {
const NB_ACCOUNTS_IN_GMA: usize = 100;
const NB_RETRY: usize = 10;
let mut accounts = vec![];
for filter in filters.iter() {
if !filter.accounts.is_empty() {
let mut f_accounts = filter
.accounts
.iter()
.map(|x| Pubkey::from_str(x).expect("Accounts in filters should be valid"))
.collect();
accounts.append(&mut f_accounts);
}

if let Some(program_id) = &filter.program_id {
let program_id =
Pubkey::from_str(program_id).expect("Program id in filters should be valid");
let mut rpc_acc = rpc_client
.get_program_accounts_with_config(
&program_id,
RpcProgramAccountsConfig {
filters: filter.get_rpc_filter(),
account_config: RpcAccountInfoConfig {
encoding: Some(solana_account_decoder::UiAccountEncoding::Base64),
data_slice: Some(UiDataSliceConfig {
offset: 0,
length: 0,
}),
commitment: None,
min_context_slot: None,
},
with_context: None,
},
)
.await?
.iter()
.map(|(pk, _)| *pk)
.collect_vec();
accounts.append(&mut rpc_acc);
}
let mut keys =
Self::get_account_keys(&rpc_client, filter, enable_smart_accounts_warmup).await?;
accounts.append(&mut keys);
}
log::info!("Fetching {} accounts", accounts.len());
for accounts in accounts.chunks(max_request_in_parallel * NB_ACCOUNTS_IN_GMA) {
Expand Down
37 changes: 30 additions & 7 deletions lite-rpc/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use crate::{
use anyhow::Context;
use clap::Parser;
use dotenv::dotenv;
use serde::{Deserialize, Serialize};
use solana_rpc_client_api::client_error::reqwest::Url;

#[derive(Parser, Debug, Clone)]
Expand Down Expand Up @@ -86,10 +87,7 @@ pub struct Config {
pub enable_address_lookup_tables: Option<bool>,

#[serde(default)]
pub account_filters: Option<String>,

#[serde(default)]
pub enable_accounts_on_demand_accounts_service: Option<bool>,
pub accounts_configuration: AccountConfiguration,
}

impl Config {
Expand Down Expand Up @@ -219,12 +217,25 @@ impl Config {
.ok()
.or(config.address_lookup_tables_binary);

config.account_filters = env::var("ACCOUNT_FILTERS").ok().or(config.account_filters);
config.accounts_configuration.account_filters = env::var("ACCOUNT_FILTERS")
.ok()
.or(config.accounts_configuration.account_filters);

config.enable_accounts_on_demand_accounts_service = env::var("ENABLE_ACCOUNT_ON_DEMAND")
config
.accounts_configuration
.enable_accounts_on_demand_accounts_service = env::var("ENABLE_SMART_ACCOUNT_WARMUP")
.map(|value| value.parse::<bool>().unwrap())
.ok()
.or(config.enable_accounts_on_demand_accounts_service);
.or(config.accounts_configuration.enable_smart_accounts_warmup);

config
.accounts_configuration
.enable_accounts_on_demand_accounts_service = env::var("ENABLE_ACCOUNT_ON_DEMAND")
.map(|value| value.parse::<bool>().unwrap())
.ok()
.or(config
.accounts_configuration
.enable_accounts_on_demand_accounts_service);
Ok(config)
}

Expand Down Expand Up @@ -349,3 +360,15 @@ fn obfuscate_token(token: &Option<String>) -> String {
}
}
}

#[derive(Clone, Serialize, Deserialize, Debug, Default)]
pub struct AccountConfiguration {
#[serde(default)]
pub account_filters: Option<String>,

#[serde(default)]
pub enable_accounts_on_demand_accounts_service: Option<bool>,

#[serde(default)]
pub enable_smart_accounts_warmup: Option<bool>,
}
20 changes: 15 additions & 5 deletions lite-rpc/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,8 +134,7 @@ pub async fn start_lite_rpc(args: Config, rpc_client: Arc<RpcClient>) -> anyhow:
enable_grpc_stream_inspection,
enable_address_lookup_tables,
address_lookup_tables_binary,
account_filters,
enable_accounts_on_demand_accounts_service,
accounts_configuration,
..
} = args;

Expand All @@ -149,15 +148,16 @@ pub async fn start_lite_rpc(args: Config, rpc_client: Arc<RpcClient>) -> anyhow:

let tpu_connection_path = configure_tpu_connection_path(quic_proxy_addr);

let account_filters = if let Some(account_filters) = account_filters {
let account_filters = if let Some(account_filters) = accounts_configuration.account_filters {
serde_json::from_str::<AccountFilters>(account_filters.as_str())
.expect("Account filters should be valid")
} else {
vec![]
};

let enable_accounts_on_demand_accounts_service =
enable_accounts_on_demand_accounts_service.unwrap_or_default();
let enable_accounts_on_demand_accounts_service = accounts_configuration
.enable_accounts_on_demand_accounts_service
.unwrap_or_default();
if enable_accounts_on_demand_accounts_service {
log::info!("Accounts on demand service is enabled");
} else {
Expand Down Expand Up @@ -232,11 +232,21 @@ pub async fn start_lite_rpc(args: Config, rpc_client: Arc<RpcClient>) -> anyhow:
account_service
.process_account_stream(account_stream.resubscribe(), blocks_notifier.resubscribe());

let enable_smart_accounts_warmup = accounts_configuration
.enable_smart_accounts_warmup
.unwrap_or_default();
if enable_smart_accounts_warmup {
info!("Smart account warmup enabled");
} else {
info!("Smart account warmup disabled");
}

account_service
.populate_from_rpc(
rpc_client.clone(),
&account_filters,
MAX_CONNECTIONS_IN_PARALLEL,
enable_smart_accounts_warmup,
)
.await?;
Some(account_service)
Expand Down

0 comments on commit 3c92008

Please sign in to comment.