Skip to content

Commit

Permalink
Adding configuration to make number of accounts fetched by gma (#8)
Browse files Browse the repository at this point in the history
  • Loading branch information
godmodegalactus authored Oct 1, 2024
1 parent 1bd6d1a commit 20fd5e7
Show file tree
Hide file tree
Showing 6 changed files with 15 additions and 8 deletions.
1 change: 1 addition & 0 deletions bin/autobahn-router/examples/grpc_source_tester.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ pub async fn main() {
grpc_sources: Some(vec![]),
dedup_queue_size: 0,
request_timeout_in_seconds: None,
number_of_accounts_per_gma: None,
};

// Raydium
Expand Down
8 changes: 7 additions & 1 deletion bin/autobahn-router/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ async fn main() -> anyhow::Result<()> {
.unwrap_or_else(|| panic!("did not find a source config for region {}", region));

let rpc = build_rpc(&source_config);
let number_of_accounts_per_gma = source_config.number_of_accounts_per_gma.unwrap_or(100);

// handle sigint
let exit_flag: Arc<atomic::AtomicBool> = Arc::new(atomic::AtomicBool::new(false));
Expand Down Expand Up @@ -313,7 +314,12 @@ async fn main() -> anyhow::Result<()> {
info!("Using {} mints", mints.len(),);

let token_cache = {
let mint_metadata = request_mint_metadata(&source_config.rpc_http_url, &mints).await;
let mint_metadata = request_mint_metadata(
&source_config.rpc_http_url,
&mints,
number_of_accounts_per_gma,
)
.await;
let mut data: HashMap<Pubkey, token_cache::Decimals> = HashMap::new();
for (mint_pubkey, Token { mint, decimals }) in mint_metadata {
assert_eq!(mint_pubkey, mint);
Expand Down
4 changes: 2 additions & 2 deletions bin/autobahn-router/src/source/grpc_plugin_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@ use yellowstone_grpc_proto::geyser::{
use yellowstone_grpc_proto::tonic::codec::CompressionEncoding;

const MAX_GRPC_ACCOUNT_SUBSCRIPTIONS: usize = 100;
const MAX_GMA_ACCOUNTS: usize = 100;

// limit number of concurrent gMA/gPA requests
const MAX_PARALLEL_HEAVY_RPC_REQUESTS: usize = 4;
Expand Down Expand Up @@ -72,6 +71,7 @@ pub async fn feed_data_geyser(
sender: async_channel::Sender<SourceMessage>,
) -> anyhow::Result<()> {
let use_compression = snapshot_config.rpc_support_compression.unwrap_or(false);
let number_of_accounts_per_gma = snapshot_config.number_of_accounts_per_gma.unwrap_or(100);
let grpc_connection_string = match &grpc_config.connection_string.chars().next().unwrap() {
'$' => env::var(&grpc_config.connection_string[1..])
.expect("reading connection string from env"),
Expand Down Expand Up @@ -336,7 +336,7 @@ pub async fn feed_data_geyser(
let permits_parallel_rpc_requests = Arc::new(Semaphore::new(MAX_PARALLEL_HEAVY_RPC_REQUESTS));

info!("Requesting snapshot from gMA for {} filter accounts", accounts_filter.len());
for pubkey_chunk in accounts_filter.iter().chunks(MAX_GMA_ACCOUNTS).into_iter() {
for pubkey_chunk in accounts_filter.iter().chunks(number_of_accounts_per_gma).into_iter() {
let rpc_http_url = snapshot_rpc_http_url.clone();
let account_ids = pubkey_chunk.cloned().collect_vec();
let sender = snapshot_gma_sender.clone();
Expand Down
4 changes: 2 additions & 2 deletions bin/autobahn-router/src/source/mint_accounts_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ use std::time::Instant;
use tokio::sync::Semaphore;
use tracing::{info, trace};

const MAX_GMA_ACCOUNTS: usize = 100;
// 4: 388028 mints -> 61 sec
// 16: 388028 mints -> 35 sec
const MAX_PARALLEL_HEAVY_RPC_REQUESTS: usize = 16;
Expand All @@ -30,6 +29,7 @@ pub struct Token {
pub async fn request_mint_metadata(
rpc_http_url: &str,
mint_account_ids: &HashSet<Pubkey>,
max_gma_accounts: usize,
) -> HashMap<Pubkey, Token> {
info!(
"Requesting data for mint accounts via chunked gMA for {} pubkey ..",
Expand All @@ -51,7 +51,7 @@ pub async fn request_mint_metadata(

let mut threads = Vec::new();
let count = Arc::new(AtomicU64::new(0));
for pubkey_chunk in mint_account_ids.iter().chunks(MAX_GMA_ACCOUNTS).into_iter() {
for pubkey_chunk in mint_account_ids.iter().chunks(max_gma_accounts).into_iter() {
let pubkey_chunk = pubkey_chunk.into_iter().cloned().collect_vec();
let count = count.clone();
let rpc_client = rpc_client.clone();
Expand Down
5 changes: 2 additions & 3 deletions bin/autobahn-router/src/source/quic_plugin_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,6 @@ use router_feed_lib::get_program_account::{
use solana_program::clock::Slot;
use tokio::sync::Semaphore;

const MAX_GMA_ACCOUNTS: usize = 100;

// limit number of concurrent gMA/gPA requests
const MAX_PARALLEL_HEAVY_RPC_REQUESTS: usize = 4;

Expand All @@ -46,6 +44,7 @@ pub async fn feed_data_geyser(
sender: async_channel::Sender<SourceMessage>,
) -> anyhow::Result<()> {
let use_compression = snapshot_config.rpc_support_compression.unwrap_or(false);
let number_of_accounts_per_gma = snapshot_config.number_of_accounts_per_gma.unwrap_or(100);

let snapshot_rpc_http_url = match &snapshot_config.rpc_http_url.chars().next().unwrap() {
'$' => env::var(&snapshot_config.rpc_http_url[1..])
Expand Down Expand Up @@ -194,7 +193,7 @@ pub async fn feed_data_geyser(
let permits_parallel_rpc_requests = Arc::new(Semaphore::new(MAX_PARALLEL_HEAVY_RPC_REQUESTS));

info!("Requesting snapshot from gMA for {} filter accounts", subscribed_accounts.len());
for pubkey_chunk in subscribed_accounts.iter().chunks(MAX_GMA_ACCOUNTS).into_iter() {
for pubkey_chunk in subscribed_accounts.iter().chunks(number_of_accounts_per_gma).into_iter() {
let rpc_http_url = snapshot_rpc_http_url.clone();
let account_ids = pubkey_chunk.map(|x| *x).collect_vec();
let sender = snapshot_gma_sender.clone();
Expand Down
1 change: 1 addition & 0 deletions lib/router-config-lib/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ pub struct AccountDataSourceConfig {
pub grpc_sources: Option<Vec<GrpcSourceConfig>>,
pub dedup_queue_size: usize,
pub request_timeout_in_seconds: Option<u64>,
pub number_of_accounts_per_gma: Option<usize>,
}

#[derive(Clone, Debug, serde_derive::Deserialize)]
Expand Down

0 comments on commit 20fd5e7

Please sign in to comment.