Skip to content

Commit

Permalink
Router: fix 'channel lagged by' for composite price feed (#3)
Browse files Browse the repository at this point in the history
* Router: fix 'channel lagged by' for composite price feed

* Router: fix typo
  • Loading branch information
farnyser authored Sep 27, 2024
1 parent dbcbd42 commit ba138ed
Showing 1 changed file with 21 additions and 4 deletions.
25 changes: 21 additions & 4 deletions lib/router-lib/src/price_feeds/composite.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,13 @@ use crate::price_feeds::birdeye::BirdeyePriceFeed;
use crate::price_feeds::birdeye_single::BirdeyeSinglePriceFeed;
use crate::price_feeds::fillcity::FillCityPriceFeed;
use crate::price_feeds::price_feed::{PriceFeed, PriceUpdate};
use itertools::Itertools;
use router_config_lib::PriceFeedConfig;
use solana_sdk::pubkey::Pubkey;
use std::collections::HashSet;
use std::time::Duration;
use tokio::sync::broadcast;
use tokio::sync::broadcast::Sender;
use tokio::task::JoinHandle;
use tracing::{debug, error, info, warn};

Expand Down Expand Up @@ -79,11 +81,27 @@ impl PriceFeed for CompositePriceFeed {
impl CompositePriceFeed {
async fn refresh(
birdeye_token: &String,
birdeye_signle_mode: bool,
birdeye_single_mode: bool,
mints: &HashSet<Pubkey>,
sender: broadcast::Sender<PriceUpdate>,
) -> anyhow::Result<()> {
let (local_sender, mut local_receiver) = broadcast::channel::<PriceUpdate>(10_000);
let mints = mints.iter().copied().collect_vec();
for chunk in mints.chunks(10_000) {
let chunk = chunk.iter().copied().collect();

Self::refresh_chunk(birdeye_token, birdeye_single_mode, &chunk, sender.clone()).await?;
}

Ok(())
}

async fn refresh_chunk(
birdeye_token: &String,
birdeye_single_mode: bool,
mints: &HashSet<Pubkey>,
sender: Sender<PriceUpdate>,
) -> anyhow::Result<()> {
let (local_sender, mut local_receiver) = broadcast::channel::<PriceUpdate>(mints.len() + 1);

info!("Querying price with fill city for {} mints", mints.len());

Expand All @@ -94,7 +112,7 @@ impl CompositePriceFeed {
if !mints.is_empty() {
info!("Querying price with birdeye for {} mints", mints.len());

let result = if birdeye_signle_mode {
let result = if birdeye_single_mode {
BirdeyeSinglePriceFeed::refresh(
birdeye_token.to_string(),
&mints,
Expand All @@ -108,7 +126,6 @@ impl CompositePriceFeed {
Self::handle_source_results(&mints, sender.clone(), &mut local_receiver, result)
.await?;
}

Ok(())
}

Expand Down

0 comments on commit ba138ed

Please sign in to comment.