Skip to content

Commit

Permalink
Router: add a liquidity endpoint
Browse files Browse the repository at this point in the history
Fetch total liquidity (in US $) of a token reachable through autobahn
  • Loading branch information
farnyser committed Sep 26, 2024
1 parent ad94eca commit 645e150
Show file tree
Hide file tree
Showing 11 changed files with 284 additions and 45 deletions.
22 changes: 0 additions & 22 deletions bin/autobahn-router/src/edge_updater.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,28 +29,6 @@ pub struct Dex {
}

impl Dex {
pub fn _desc(&self) -> String {
match &self.subscription_mode {
DexSubscriptionMode::Disabled => {
format!("Dex {} mode=Disabled", self.name)
}
DexSubscriptionMode::Accounts(subscribed_pks) => {
format!("Dex {} mode=gMa #pks={}", self.name, subscribed_pks.len())
}
DexSubscriptionMode::Programs(subscribed_prgs) => format!(
"Dex {} mode=gPa program_ids={:?}",
self.name, subscribed_prgs
),
DexSubscriptionMode::Mixed(m) => format!(
"Dex {} mode=mix #pks={} program_ids={:?}, tokens_for_owners={:?}",
self.name,
m.accounts.len(),
m.programs,
m.token_accounts_for_owner
),
}
}

pub fn edges(&self) -> Vec<Arc<Edge>> {
let edges: Vec<Arc<Edge>> = self
.edges_per_pk
Expand Down
57 changes: 57 additions & 0 deletions bin/autobahn-router/src/liquidity/liquidity_computer.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
use crate::edge::Edge;
use router_lib::dex::AccountProviderView;
use std::sync::Arc;

pub fn compute_liquidity(
edge: &Arc<Edge>,
chain_data: &AccountProviderView,
) -> anyhow::Result<u64> {
let loaded = edge.prepare(&chain_data)?;

let first_in_amount = edge
.state
.read()
.unwrap()
.cached_prices
.first()
.map(|x| x.0);
let Some(first_in_amount) = first_in_amount else {
anyhow::bail!("Too early to compute liquidity");
};

let mut iter_counter = 0;
let mut has_failed = false;
let mut last_successful_in_amount = first_in_amount;
let mut next_in_amount = first_in_amount;
let mut last_successful_out_amount = 0;
let acceptable_price_impact = 0.3;

loop {
if next_in_amount == 0 || iter_counter > 50 {
break;
}
iter_counter = iter_counter + 1;

let quote = edge.quote(&loaded, &chain_data, next_in_amount);
let expected_output = (2.0 - acceptable_price_impact) * last_successful_out_amount as f64;

let out_amount = quote.map(|x| x.out_amount).unwrap_or(0);

if (out_amount as f64) < expected_output {
if has_failed {
break;
}
has_failed = true;
next_in_amount = next_in_amount
.saturating_add(last_successful_in_amount)
.saturating_div(2);
continue;
};

last_successful_in_amount = next_in_amount;
last_successful_out_amount = out_amount;
next_in_amount = next_in_amount.saturating_mul(2);
}

Ok(last_successful_out_amount)
}
66 changes: 66 additions & 0 deletions bin/autobahn-router/src/liquidity/liquidity_provider.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
use crate::token_cache::TokenCache;
use ordered_float::{FloatCore, Pow};
use router_lib::price_feeds::price_cache::PriceCache;
use solana_program::pubkey::Pubkey;
use std::collections::HashMap;
use std::sync::{Arc, RwLock};

pub struct Liquidity {
pub liquidity_by_pool: HashMap<Pubkey, u64>,
}

pub struct LiquidityProvider {
liquidity_by_mint: HashMap<Pubkey, Liquidity>,
token_cache: TokenCache,
price_cache: PriceCache,
}

pub type LiquidityProviderArcRw = Arc<RwLock<LiquidityProvider>>;

impl LiquidityProvider {
pub fn new(token_cache: TokenCache, price_cache: PriceCache) -> LiquidityProvider {
LiquidityProvider {
liquidity_by_mint: Default::default(),
token_cache,
price_cache,
}
}

pub fn set_liquidity(&mut self, mint: Pubkey, pool: Pubkey, liquidity: u64) {
if let Some(cache) = self.liquidity_by_mint.get_mut(&mint) {
cache.liquidity_by_pool.insert(pool, liquidity);
} else {
self.liquidity_by_mint.insert(
mint,
Liquidity {
liquidity_by_pool: HashMap::from([(pool, liquidity)]),
},
);
}
}

pub fn get_total_liquidity_native(&self, mint: Pubkey) -> u64 {
if let Some(cache) = self.liquidity_by_mint.get(&mint) {
let mut sum = 0u64;
for amount in cache.liquidity_by_pool.iter().map(|x| x.1) {
sum = sum.saturating_add(*amount);
}
sum
} else {
0
}
}

pub fn get_total_liquidity_in_dollars(&self, mint: Pubkey) -> anyhow::Result<f64> {
let liquidity_native = self.get_total_liquidity_native(mint);
let price = self
.price_cache
.price_ui(mint)
.ok_or(anyhow::format_err!("no price"))?;
let decimal = self.token_cache.token(mint).map(|x| x.decimals)?;

let liquidity_dollars = (liquidity_native as f64 / 10.0.pow(decimal as f64)) * price;

Ok(liquidity_dollars)
}
}
64 changes: 64 additions & 0 deletions bin/autobahn-router/src/liquidity/liquidity_updater.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
use crate::debug_tools;
use crate::edge::Edge;
use crate::liquidity::liquidity_computer::compute_liquidity;
use crate::liquidity::liquidity_provider::LiquidityProviderArcRw;
use crate::util::tokio_spawn;
use itertools::Itertools;
use router_lib::dex::AccountProviderView;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::broadcast;
use tokio::task::JoinHandle;
use tracing::{debug, info};

pub fn spawn_liquidity_updater_job(
provider: LiquidityProviderArcRw,
edges: Vec<Arc<Edge>>,
chain_data: AccountProviderView,
mut exit: broadcast::Receiver<()>,
) -> JoinHandle<()> {
let job = tokio_spawn("liquidity_updater", async move {
let mut refresh_all_interval = tokio::time::interval(Duration::from_secs(30));
refresh_all_interval.tick().await;

loop {
tokio::select! {
_ = exit.recv() => {
info!("shutting down liquidity_updater task");
break;
}
_ = refresh_all_interval.tick() => {
refresh_liquidity(&provider, &edges, &chain_data);
}
}
}
});

job
}

fn refresh_liquidity(
provider: &LiquidityProviderArcRw,
edges: &Vec<Arc<Edge>>,
account_provider: &AccountProviderView,
) {
for edge in edges {
let liquidity = compute_liquidity(&edge, &account_provider);
if let Ok(liquidity) = liquidity {
provider
.write()
.unwrap()
.set_liquidity(edge.output_mint, edge.id.key(), liquidity);
} else {
debug!("Could not compute liquidity for {}", edge.id.desc())
}
}

for mint in edges.iter().map(|x| x.output_mint).unique() {
debug!(
"Liquidity for {} -> {}",
debug_tools::name(&mint),
provider.read().unwrap().get_total_liquidity_native(mint)
)
}
}
7 changes: 7 additions & 0 deletions bin/autobahn-router/src/liquidity/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
mod liquidity_computer;
mod liquidity_provider;
mod liquidity_updater;

pub use liquidity_provider::LiquidityProvider;
pub use liquidity_provider::LiquidityProviderArcRw;
pub use liquidity_updater::spawn_liquidity_updater_job;
50 changes: 32 additions & 18 deletions bin/autobahn-router/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,24 +1,8 @@
use crate::edge_updater::{spawn_updater_job, Dex};
use crate::hot_mints::HotMintsCache;
use crate::ix_builder::{SwapInstructionsBuilderImpl, SwapStepInstructionBuilderImpl};
use crate::liquidity::{spawn_liquidity_updater_job, LiquidityProvider};
use crate::path_warmer::spawn_path_warmer_job;
use itertools::chain;
use mango_feeds_connector::chain_data::ChainData;
use mango_feeds_connector::SlotUpdate;
use prelude::*;
use router_lib::price_feeds::price_cache::PriceCache;
use router_lib::price_feeds::price_feed::PriceFeed;
use solana_client::nonblocking::rpc_client::RpcClient;
use solana_client::rpc_client::RpcClient as BlockingRpcClient;
use solana_sdk::commitment_config::CommitmentConfig;
use source::geyser;
use std::env;
use std::process::exit;
use std::sync::RwLockWriteGuard;
use std::time::{Duration, Instant};
use tokio::sync::broadcast;
use tokio::task::JoinHandle;

use crate::hot_mints::HotMintsCache;
use crate::prometheus_sync::PrometheusSync;
use crate::routing::Routing;
use crate::server::alt_provider::RpcAltProvider;
Expand All @@ -31,6 +15,10 @@ use crate::token_cache::{Decimals, TokenCache};
use crate::tx_watcher::spawn_tx_watcher_jobs;
use crate::util::tokio_spawn;
use dex_orca::OrcaDex;
use itertools::chain;
use mango_feeds_connector::chain_data::ChainData;
use mango_feeds_connector::SlotUpdate;
use prelude::*;
use router_config_lib::{string_or_env, AccountDataSourceConfig, Config};
use router_feed_lib::account_write::{AccountOrSnapshotUpdate, AccountWrite};
use router_feed_lib::get_program_account::FeedMetadata;
Expand All @@ -42,6 +30,18 @@ use router_lib::dex::{
};
use router_lib::mango;
use router_lib::price_feeds::composite::CompositePriceFeed;
use router_lib::price_feeds::price_cache::PriceCache;
use router_lib::price_feeds::price_feed::PriceFeed;
use solana_client::nonblocking::rpc_client::RpcClient;
use solana_client::rpc_client::RpcClient as BlockingRpcClient;
use solana_sdk::commitment_config::CommitmentConfig;
use source::geyser;
use std::env;
use std::process::exit;
use std::sync::RwLockWriteGuard;
use std::time::{Duration, Instant};
use tokio::sync::broadcast;
use tokio::task::JoinHandle;

mod alt;
mod debug_tools;
Expand All @@ -50,6 +50,7 @@ pub mod edge;
mod edge_updater;
mod hot_mints;
pub mod ix_builder;
mod liquidity;
mod metrics;
mod mock;
mod path_warmer;
Expand Down Expand Up @@ -378,11 +379,23 @@ async fn main() -> anyhow::Result<()> {
router_version as u8,
));

let liquidity_provider = Arc::new(RwLock::new(LiquidityProvider::new(
token_cache.clone(),
price_cache.clone(),
)));
let liquidity_job = spawn_liquidity_updater_job(
liquidity_provider.clone(),
edges.clone(),
chain_data_wrapper,
exit_sender.subscribe(),
);

let server_job = HttpServer::start(
route_provider.clone(),
hash_provider,
alt_provider,
live_account_provider,
liquidity_provider.clone(),
ix_builder,
config.clone(),
exit_sender.subscribe(),
Expand Down Expand Up @@ -495,6 +508,7 @@ async fn main() -> anyhow::Result<()> {
tx_sender_job,
tx_watcher_job,
account_update_job,
liquidity_job,
]
.into_iter()
.chain(update_jobs.into_iter())
Expand Down
Loading

0 comments on commit 645e150

Please sign in to comment.