Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

enabling smart account warmup for programs with lot of accounts #358

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions accounts/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ lazy_static = { workspace = true }
solana-lite-rpc-core = { workspace = true }
solana-lite-rpc-util = { workspace = true }

async-recursion = "1.0.5"

[dev-dependencies]
rand = "0.8.5"
rand_chacha = "0.3.1"
128 changes: 92 additions & 36 deletions accounts/src/account_service.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,17 @@
use std::{str::FromStr, sync::Arc};

use anyhow::bail;
use async_recursion::async_recursion;
use itertools::Itertools;
use prometheus::{opts, register_int_gauge, IntGauge};
use solana_account_decoder::{UiAccount, UiDataSliceConfig};
use solana_lite_rpc_core::{
commitment_utils::Commitment,
structures::{
account_data::{AccountData, AccountNotificationMessage, AccountStream},
account_filter::AccountFilters,
account_filter::{
AccountFilter, AccountFilterType, AccountFilters, MemcmpFilter, MemcmpFilterData,
},
},
types::BlockStream,
AnyhowJoinHandle,
Expand Down Expand Up @@ -38,6 +41,10 @@ lazy_static::lazy_static! {
register_int_gauge!(opts!("literpc_get_account_called", "Account Updates by lite-rpc service")).unwrap();
}

// this is because for most of the anchor programs first 8 bytes are discriminators so the best discriminator is at index 9
// but for general case 0 is better
const WARMUP_BYTE_INDEX: u64 = 0;

#[derive(Clone)]
pub struct AccountService {
account_store: Arc<dyn AccountStorageInterface>,
Expand All @@ -55,51 +62,100 @@ impl AccountService {
}
}

#[async_recursion]
pub async fn get_account_keys(
rpc_client: &Arc<RpcClient>,
filter: &AccountFilter,
enable_smart_warmup: bool,
) -> anyhow::Result<Vec<Pubkey>> {
if !filter.accounts.is_empty() {
Ok(filter
.accounts
.iter()
.map(|x| Pubkey::from_str(x).expect("Accounts in filters should be valid"))
.collect_vec())
} else if let Some(program_id) = &filter.program_id {
let program_id =
Pubkey::from_str(program_id).expect("Program id in filters should be valid");

match rpc_client
.get_program_accounts_with_config(
&program_id,
RpcProgramAccountsConfig {
filters: filter.get_rpc_filter(),
account_config: RpcAccountInfoConfig {
encoding: Some(solana_account_decoder::UiAccountEncoding::Base64),
data_slice: Some(UiDataSliceConfig {
offset: 0,
length: 0,
}),
commitment: None,
min_context_slot: None,
},
with_context: None,
},
)
.await
{
Ok(value) => Ok(value.iter().map(|(pk, _)| *pk).collect_vec()),
Err(e) => {
if enable_smart_warmup {
log::info!(
"gPA failed, using smart warmup feature for program {}",
filter.program_id.clone().unwrap_or_default()
);
let mut accounts = vec![];
for desc in 0..=u8::MAX {
let desc_filter = AccountFilterType::Memcmp(MemcmpFilter {
offset: WARMUP_BYTE_INDEX,
data: MemcmpFilterData::Bytes(vec![desc]),
});
let program_id = filter.program_id.clone();
let filter_list = match &filter.filters {
Some(filters) => {
let mut filters = filters.clone();
filters.push(desc_filter);
filters
}
None => {
vec![desc_filter]
}
};
let new_filter = AccountFilter {
accounts: vec![],
program_id,
filters: Some(filter_list),
};
let mut acc_list =
Self::get_account_keys(rpc_client, &new_filter, false).await?;
accounts.append(&mut acc_list);
}
Ok(accounts)
} else {
log::error!("The account filter provided timed out while getProgramAccounts from RPC, try making filter smaller. error {e:?}");
bail!("The account filter provided timed out while getProgramAccounts from RPC, try making filter smaller");
}
}
}
} else {
bail!("Account filter does not contain accounts nor program filters");
}
}

pub async fn populate_from_rpc(
&self,
rpc_client: Arc<RpcClient>,
filters: &AccountFilters,
max_request_in_parallel: usize,
enable_smart_accounts_warmup: bool,
) -> anyhow::Result<()> {
const NB_ACCOUNTS_IN_GMA: usize = 100;
const NB_RETRY: usize = 10;
let mut accounts = vec![];
for filter in filters.iter() {
if !filter.accounts.is_empty() {
let mut f_accounts = filter
.accounts
.iter()
.map(|x| Pubkey::from_str(x).expect("Accounts in filters should be valid"))
.collect();
accounts.append(&mut f_accounts);
}

if let Some(program_id) = &filter.program_id {
let program_id =
Pubkey::from_str(program_id).expect("Program id in filters should be valid");
let mut rpc_acc = rpc_client
.get_program_accounts_with_config(
&program_id,
RpcProgramAccountsConfig {
filters: filter.get_rpc_filter(),
account_config: RpcAccountInfoConfig {
encoding: Some(solana_account_decoder::UiAccountEncoding::Base64),
data_slice: Some(UiDataSliceConfig {
offset: 0,
length: 0,
}),
commitment: None,
min_context_slot: None,
},
with_context: None,
},
)
.await?
.iter()
.map(|(pk, _)| *pk)
.collect_vec();
accounts.append(&mut rpc_acc);
}
let mut keys =
Self::get_account_keys(&rpc_client, filter, enable_smart_accounts_warmup).await?;
accounts.append(&mut keys);
}
log::info!("Fetching {} accounts", accounts.len());
for accounts in accounts.chunks(max_request_in_parallel * NB_ACCOUNTS_IN_GMA) {
Expand Down
37 changes: 30 additions & 7 deletions lite-rpc/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use crate::{
use anyhow::Context;
use clap::Parser;
use dotenv::dotenv;
use serde::{Deserialize, Serialize};
use solana_rpc_client_api::client_error::reqwest::Url;

#[derive(Parser, Debug, Clone)]
Expand Down Expand Up @@ -86,10 +87,7 @@ pub struct Config {
pub enable_address_lookup_tables: Option<bool>,

#[serde(default)]
pub account_filters: Option<String>,

#[serde(default)]
pub enable_accounts_on_demand_accounts_service: Option<bool>,
pub accounts_configuration: AccountConfiguration,
}

impl Config {
Expand Down Expand Up @@ -219,12 +217,25 @@ impl Config {
.ok()
.or(config.address_lookup_tables_binary);

config.account_filters = env::var("ACCOUNT_FILTERS").ok().or(config.account_filters);
config.accounts_configuration.account_filters = env::var("ACCOUNT_FILTERS")
.ok()
.or(config.accounts_configuration.account_filters);

config.enable_accounts_on_demand_accounts_service = env::var("ENABLE_ACCOUNT_ON_DEMAND")
config
.accounts_configuration
.enable_accounts_on_demand_accounts_service = env::var("ENABLE_SMART_ACCOUNT_WARMUP")
.map(|value| value.parse::<bool>().unwrap())
.ok()
.or(config.enable_accounts_on_demand_accounts_service);
.or(config.accounts_configuration.enable_smart_accounts_warmup);

config
.accounts_configuration
.enable_accounts_on_demand_accounts_service = env::var("ENABLE_ACCOUNT_ON_DEMAND")
.map(|value| value.parse::<bool>().unwrap())
.ok()
.or(config
.accounts_configuration
.enable_accounts_on_demand_accounts_service);
Ok(config)
}

Expand Down Expand Up @@ -349,3 +360,15 @@ fn obfuscate_token(token: &Option<String>) -> String {
}
}
}

#[derive(Clone, Serialize, Deserialize, Debug, Default)]
pub struct AccountConfiguration {
#[serde(default)]
pub account_filters: Option<String>,

#[serde(default)]
pub enable_accounts_on_demand_accounts_service: Option<bool>,

#[serde(default)]
pub enable_smart_accounts_warmup: Option<bool>,
}
20 changes: 15 additions & 5 deletions lite-rpc/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,8 +134,7 @@ pub async fn start_lite_rpc(args: Config, rpc_client: Arc<RpcClient>) -> anyhow:
enable_grpc_stream_inspection,
enable_address_lookup_tables,
address_lookup_tables_binary,
account_filters,
enable_accounts_on_demand_accounts_service,
accounts_configuration,
..
} = args;

Expand All @@ -149,15 +148,16 @@ pub async fn start_lite_rpc(args: Config, rpc_client: Arc<RpcClient>) -> anyhow:

let tpu_connection_path = configure_tpu_connection_path(quic_proxy_addr);

let account_filters = if let Some(account_filters) = account_filters {
let account_filters = if let Some(account_filters) = accounts_configuration.account_filters {
serde_json::from_str::<AccountFilters>(account_filters.as_str())
.expect("Account filters should be valid")
} else {
vec![]
};

let enable_accounts_on_demand_accounts_service =
enable_accounts_on_demand_accounts_service.unwrap_or_default();
let enable_accounts_on_demand_accounts_service = accounts_configuration
.enable_accounts_on_demand_accounts_service
.unwrap_or_default();
if enable_accounts_on_demand_accounts_service {
log::info!("Accounts on demand service is enabled");
} else {
Expand Down Expand Up @@ -232,11 +232,21 @@ pub async fn start_lite_rpc(args: Config, rpc_client: Arc<RpcClient>) -> anyhow:
account_service
.process_account_stream(account_stream.resubscribe(), blocks_notifier.resubscribe());

let enable_smart_accounts_warmup = accounts_configuration
.enable_smart_accounts_warmup
.unwrap_or_default();
if enable_smart_accounts_warmup {
info!("Smart account warmup enabled");
} else {
info!("Smart account warmup disabled");
}

account_service
.populate_from_rpc(
rpc_client.clone(),
&account_filters,
MAX_CONNECTIONS_IN_PARALLEL,
enable_smart_accounts_warmup,
)
.await?;
Some(account_service)
Expand Down
Loading