Skip to content

Commit 1bd6d1a

Browse files
authored
Router: add a liquidity endpoint (#2)
Fetch total liquidity (in US $) of a token reachable through autobahn
1 parent 9bd763f commit 1bd6d1a

File tree

10 files changed

+281
-40
lines changed

10 files changed

+281
-40
lines changed

bin/autobahn-router/src/edge_updater.rs

Lines changed: 0 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -29,28 +29,6 @@ pub struct Dex {
2929
}
3030

3131
impl Dex {
32-
pub fn _desc(&self) -> String {
33-
match &self.subscription_mode {
34-
DexSubscriptionMode::Disabled => {
35-
format!("Dex {} mode=Disabled", self.name)
36-
}
37-
DexSubscriptionMode::Accounts(subscribed_pks) => {
38-
format!("Dex {} mode=gMa #pks={}", self.name, subscribed_pks.len())
39-
}
40-
DexSubscriptionMode::Programs(subscribed_prgs) => format!(
41-
"Dex {} mode=gPa program_ids={:?}",
42-
self.name, subscribed_prgs
43-
),
44-
DexSubscriptionMode::Mixed(m) => format!(
45-
"Dex {} mode=mix #pks={} program_ids={:?}, tokens_for_owners={:?}",
46-
self.name,
47-
m.accounts.len(),
48-
m.programs,
49-
m.token_accounts_for_owner
50-
),
51-
}
52-
}
53-
5432
pub fn edges(&self) -> Vec<Arc<Edge>> {
5533
let edges: Vec<Arc<Edge>> = self
5634
.edges_per_pk
Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
use crate::edge::Edge;
2+
use router_lib::dex::AccountProviderView;
3+
use std::sync::Arc;
4+
5+
pub fn compute_liquidity(
6+
edge: &Arc<Edge>,
7+
chain_data: &AccountProviderView,
8+
) -> anyhow::Result<u64> {
9+
let loaded = edge.prepare(&chain_data)?;
10+
11+
let first_in_amount = edge
12+
.state
13+
.read()
14+
.unwrap()
15+
.cached_prices
16+
.first()
17+
.map(|x| x.0);
18+
let Some(first_in_amount) = first_in_amount else {
19+
anyhow::bail!("Too early to compute liquidity");
20+
};
21+
22+
let mut iter_counter = 0;
23+
let mut has_failed = false;
24+
let mut last_successful_in_amount = first_in_amount;
25+
let mut next_in_amount = first_in_amount;
26+
let mut last_successful_out_amount = 0;
27+
let acceptable_price_impact = 0.3;
28+
29+
loop {
30+
if next_in_amount == 0 || iter_counter > 50 {
31+
break;
32+
}
33+
iter_counter = iter_counter + 1;
34+
35+
let quote = edge.quote(&loaded, &chain_data, next_in_amount);
36+
let expected_output = (2.0 - acceptable_price_impact) * last_successful_out_amount as f64;
37+
38+
let out_amount = quote.map(|x| x.out_amount).unwrap_or(0);
39+
40+
if (out_amount as f64) < expected_output {
41+
if has_failed {
42+
break;
43+
}
44+
has_failed = true;
45+
next_in_amount = next_in_amount
46+
.saturating_add(last_successful_in_amount)
47+
.saturating_div(2);
48+
continue;
49+
};
50+
51+
last_successful_in_amount = next_in_amount;
52+
last_successful_out_amount = out_amount;
53+
next_in_amount = next_in_amount.saturating_mul(2);
54+
}
55+
56+
Ok(last_successful_out_amount)
57+
}
Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
use crate::token_cache::TokenCache;
2+
use ordered_float::{FloatCore, Pow};
3+
use router_lib::price_feeds::price_cache::PriceCache;
4+
use solana_program::pubkey::Pubkey;
5+
use std::collections::HashMap;
6+
use std::sync::{Arc, RwLock};
7+
8+
pub struct Liquidity {
9+
pub liquidity_by_pool: HashMap<Pubkey, u64>,
10+
}
11+
12+
pub struct LiquidityProvider {
13+
liquidity_by_mint: HashMap<Pubkey, Liquidity>,
14+
token_cache: TokenCache,
15+
price_cache: PriceCache,
16+
}
17+
18+
pub type LiquidityProviderArcRw = Arc<RwLock<LiquidityProvider>>;
19+
20+
impl LiquidityProvider {
21+
pub fn new(token_cache: TokenCache, price_cache: PriceCache) -> LiquidityProvider {
22+
LiquidityProvider {
23+
liquidity_by_mint: Default::default(),
24+
token_cache,
25+
price_cache,
26+
}
27+
}
28+
29+
pub fn set_liquidity(&mut self, mint: Pubkey, pool: Pubkey, liquidity: u64) {
30+
if let Some(cache) = self.liquidity_by_mint.get_mut(&mint) {
31+
cache.liquidity_by_pool.insert(pool, liquidity);
32+
} else {
33+
self.liquidity_by_mint.insert(
34+
mint,
35+
Liquidity {
36+
liquidity_by_pool: HashMap::from([(pool, liquidity)]),
37+
},
38+
);
39+
}
40+
}
41+
42+
pub fn get_total_liquidity_native(&self, mint: Pubkey) -> u64 {
43+
if let Some(cache) = self.liquidity_by_mint.get(&mint) {
44+
let mut sum = 0u64;
45+
for amount in cache.liquidity_by_pool.iter().map(|x| x.1) {
46+
sum = sum.saturating_add(*amount);
47+
}
48+
sum
49+
} else {
50+
0
51+
}
52+
}
53+
54+
pub fn get_total_liquidity_in_dollars(&self, mint: Pubkey) -> anyhow::Result<f64> {
55+
let liquidity_native = self.get_total_liquidity_native(mint);
56+
let price = self
57+
.price_cache
58+
.price_ui(mint)
59+
.ok_or(anyhow::format_err!("no price"))?;
60+
let decimal = self.token_cache.token(mint).map(|x| x.decimals)?;
61+
62+
let liquidity_dollars = (liquidity_native as f64 / 10.0.pow(decimal as f64)) * price;
63+
64+
Ok(liquidity_dollars)
65+
}
66+
}
Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
use crate::debug_tools;
2+
use crate::edge::Edge;
3+
use crate::liquidity::liquidity_computer::compute_liquidity;
4+
use crate::liquidity::liquidity_provider::LiquidityProviderArcRw;
5+
use crate::util::tokio_spawn;
6+
use itertools::Itertools;
7+
use router_lib::dex::AccountProviderView;
8+
use std::sync::Arc;
9+
use std::time::Duration;
10+
use tokio::sync::broadcast;
11+
use tokio::task::JoinHandle;
12+
use tracing::{debug, info};
13+
14+
pub fn spawn_liquidity_updater_job(
15+
provider: LiquidityProviderArcRw,
16+
edges: Vec<Arc<Edge>>,
17+
chain_data: AccountProviderView,
18+
mut exit: broadcast::Receiver<()>,
19+
) -> JoinHandle<()> {
20+
let job = tokio_spawn("liquidity_updater", async move {
21+
let mut refresh_all_interval = tokio::time::interval(Duration::from_secs(30));
22+
refresh_all_interval.tick().await;
23+
24+
loop {
25+
tokio::select! {
26+
_ = exit.recv() => {
27+
info!("shutting down liquidity_updater task");
28+
break;
29+
}
30+
_ = refresh_all_interval.tick() => {
31+
refresh_liquidity(&provider, &edges, &chain_data);
32+
}
33+
}
34+
}
35+
});
36+
37+
job
38+
}
39+
40+
fn refresh_liquidity(
41+
provider: &LiquidityProviderArcRw,
42+
edges: &Vec<Arc<Edge>>,
43+
account_provider: &AccountProviderView,
44+
) {
45+
for edge in edges {
46+
let liquidity = compute_liquidity(&edge, &account_provider);
47+
if let Ok(liquidity) = liquidity {
48+
provider
49+
.write()
50+
.unwrap()
51+
.set_liquidity(edge.output_mint, edge.id.key(), liquidity);
52+
} else {
53+
debug!("Could not compute liquidity for {}", edge.id.desc())
54+
}
55+
}
56+
57+
for mint in edges.iter().map(|x| x.output_mint).unique() {
58+
debug!(
59+
"Liquidity for {} -> {}",
60+
debug_tools::name(&mint),
61+
provider.read().unwrap().get_total_liquidity_native(mint)
62+
)
63+
}
64+
}
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
mod liquidity_computer;
2+
mod liquidity_provider;
3+
mod liquidity_updater;
4+
5+
pub use liquidity_provider::LiquidityProvider;
6+
pub use liquidity_provider::LiquidityProviderArcRw;
7+
pub use liquidity_updater::spawn_liquidity_updater_job;

bin/autobahn-router/src/main.rs

Lines changed: 32 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,24 +1,8 @@
11
use crate::edge_updater::{spawn_updater_job, Dex};
2+
use crate::hot_mints::HotMintsCache;
23
use crate::ix_builder::{SwapInstructionsBuilderImpl, SwapStepInstructionBuilderImpl};
4+
use crate::liquidity::{spawn_liquidity_updater_job, LiquidityProvider};
35
use crate::path_warmer::spawn_path_warmer_job;
4-
use itertools::chain;
5-
use mango_feeds_connector::chain_data::ChainData;
6-
use mango_feeds_connector::SlotUpdate;
7-
use prelude::*;
8-
use router_lib::price_feeds::price_cache::PriceCache;
9-
use router_lib::price_feeds::price_feed::PriceFeed;
10-
use solana_client::nonblocking::rpc_client::RpcClient;
11-
use solana_client::rpc_client::RpcClient as BlockingRpcClient;
12-
use solana_sdk::commitment_config::CommitmentConfig;
13-
use source::geyser;
14-
use std::env;
15-
use std::process::exit;
16-
use std::sync::RwLockWriteGuard;
17-
use std::time::{Duration, Instant};
18-
use tokio::sync::broadcast;
19-
use tokio::task::JoinHandle;
20-
21-
use crate::hot_mints::HotMintsCache;
226
use crate::prometheus_sync::PrometheusSync;
237
use crate::routing::Routing;
248
use crate::server::alt_provider::RpcAltProvider;
@@ -31,6 +15,10 @@ use crate::token_cache::{Decimals, TokenCache};
3115
use crate::tx_watcher::spawn_tx_watcher_jobs;
3216
use crate::util::tokio_spawn;
3317
use dex_orca::OrcaDex;
18+
use itertools::chain;
19+
use mango_feeds_connector::chain_data::ChainData;
20+
use mango_feeds_connector::SlotUpdate;
21+
use prelude::*;
3422
use router_config_lib::{string_or_env, AccountDataSourceConfig, Config};
3523
use router_feed_lib::account_write::{AccountOrSnapshotUpdate, AccountWrite};
3624
use router_feed_lib::get_program_account::FeedMetadata;
@@ -42,6 +30,18 @@ use router_lib::dex::{
4230
};
4331
use router_lib::mango;
4432
use router_lib::price_feeds::composite::CompositePriceFeed;
33+
use router_lib::price_feeds::price_cache::PriceCache;
34+
use router_lib::price_feeds::price_feed::PriceFeed;
35+
use solana_client::nonblocking::rpc_client::RpcClient;
36+
use solana_client::rpc_client::RpcClient as BlockingRpcClient;
37+
use solana_sdk::commitment_config::CommitmentConfig;
38+
use source::geyser;
39+
use std::env;
40+
use std::process::exit;
41+
use std::sync::RwLockWriteGuard;
42+
use std::time::{Duration, Instant};
43+
use tokio::sync::broadcast;
44+
use tokio::task::JoinHandle;
4545

4646
mod alt;
4747
mod debug_tools;
@@ -50,6 +50,7 @@ pub mod edge;
5050
mod edge_updater;
5151
mod hot_mints;
5252
pub mod ix_builder;
53+
mod liquidity;
5354
mod metrics;
5455
mod mock;
5556
mod path_warmer;
@@ -387,11 +388,23 @@ async fn main() -> anyhow::Result<()> {
387388
router_version as u8,
388389
));
389390

391+
let liquidity_provider = Arc::new(RwLock::new(LiquidityProvider::new(
392+
token_cache.clone(),
393+
price_cache.clone(),
394+
)));
395+
let liquidity_job = spawn_liquidity_updater_job(
396+
liquidity_provider.clone(),
397+
edges.clone(),
398+
chain_data_wrapper,
399+
exit_sender.subscribe(),
400+
);
401+
390402
let server_job = HttpServer::start(
391403
route_provider.clone(),
392404
hash_provider,
393405
alt_provider,
394406
live_account_provider,
407+
liquidity_provider.clone(),
395408
ix_builder,
396409
config.clone(),
397410
exit_sender.subscribe(),
@@ -504,6 +517,7 @@ async fn main() -> anyhow::Result<()> {
504517
tx_sender_job,
505518
tx_watcher_job,
506519
account_update_job,
520+
liquidity_job,
507521
]
508522
.into_iter()
509523
.chain(update_jobs.into_iter())

0 commit comments

Comments
 (0)