Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

pindexer: add dex block summary #5063

Merged
merged 17 commits into from
Feb 13, 2025
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion crates/bin/pindexer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ network-integration = []
[dependencies]
anyhow = {workspace = true}
clap = {workspace = true}
chrono = {workspace = true}
chrono = {workspace = true, features = ["serde"] }
cometindex = {workspace = true}
ethnum = {workspace = true}
num-bigint = { version = "0.4" }
Expand All @@ -42,6 +42,7 @@ prost = {workspace = true}
tracing = {workspace = true}
tokio = {workspace = true, features = ["full"]}
serde_json = {workspace = true}
serde = { workspace = true, features = ["derive"] }
sqlx = { workspace = true, features = ["chrono", "postgres"] }

[dev-dependencies]
Expand Down
161 changes: 160 additions & 1 deletion crates/bin/pindexer/src/dex_ex/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ use cometindex::{
AppView, PgTransaction,
};
use penumbra_sdk_asset::asset;
use penumbra_sdk_dex::lp::position::{Id as PositionId, Position};
use penumbra_sdk_dex::{
event::{
EventBatchSwap, EventCandlestickData, EventPositionClose, EventPositionExecution,
Expand All @@ -14,10 +13,15 @@ use penumbra_sdk_dex::{
lp::Reserves,
DirectedTradingPair, SwapExecution, TradingPair,
};
use penumbra_sdk_dex::{
event::{EventSwap, EventSwapClaim},
lp::position::{Id as PositionId, Position},
};
use penumbra_sdk_num::Amount;
use penumbra_sdk_proto::event::EventDomainType;
use penumbra_sdk_proto::DomainType;
use penumbra_sdk_sct::event::EventBlockRoot;
use serde::{Deserialize, Serialize};
use sqlx::types::BigDecimal;
use sqlx::Row;
use std::collections::{BTreeMap, HashMap, HashSet};
Expand Down Expand Up @@ -668,6 +672,40 @@ struct PairMetrics {
liquidity_change: f64,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
struct BatchSwapSummary {
#[serde(serialize_with = "serialize_asset_id")]
asset_start: penumbra_sdk_asset::asset::Id,
#[serde(serialize_with = "serialize_asset_id")]
asset_end: penumbra_sdk_asset::asset::Id,
#[serde(serialize_with = "serialize_amount")]
input: Amount,
#[serde(serialize_with = "serialize_amount")]
output: Amount,
num_swaps: i32,
price_float: f64,
}

// Add these serialization helper functions
fn serialize_asset_id<S>(
asset_id: &penumbra_sdk_asset::asset::Id,
serializer: S,
) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
// Serialize the asset ID as bytes
serializer.serialize_bytes(asset_id.to_bytes().as_ref())
}

fn serialize_amount<S>(amount: &Amount, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
// Serialize the amount as a string to preserve precision
serializer.serialize_str(&amount.value().to_string())
}

#[derive(Debug)]
struct Events {
time: Option<DateTime>,
Expand All @@ -682,6 +720,8 @@ struct Events {
position_closes: Vec<EventPositionClose>,
position_withdrawals: Vec<EventPositionWithdraw>,
batch_swaps: Vec<EventBatchSwap>,
swaps: BTreeMap<TradingPair, Vec<EventSwap>>,
swap_claims: BTreeMap<TradingPair, Vec<EventSwapClaim>>,
// Track transaction hashes by position ID
position_open_txs: BTreeMap<PositionId, [u8; 32]>,
position_close_txs: BTreeMap<PositionId, [u8; 32]>,
Expand All @@ -701,6 +741,8 @@ impl Events {
position_closes: Vec::new(),
position_withdrawals: Vec::new(),
batch_swaps: Vec::new(),
swaps: BTreeMap::new(),
swap_claims: BTreeMap::new(),
position_open_txs: BTreeMap::new(),
position_close_txs: BTreeMap::new(),
position_withdrawal_txs: BTreeMap::new(),
Expand Down Expand Up @@ -859,6 +901,16 @@ impl Events {
}
} else if let Ok(e) = EventBatchSwap::try_from_event(&event.event) {
out.batch_swaps.push(e);
} else if let Ok(e) = EventSwap::try_from_event(&event.event) {
out.swaps
.entry(e.trading_pair)
.or_insert_with(Vec::new)
.push(e);
} else if let Ok(e) = EventSwapClaim::try_from_event(&event.event) {
out.swap_claims
.entry(e.trading_pair)
.or_insert_with(Vec::new)
.push(e);
}
}
Ok(out)
Expand Down Expand Up @@ -1081,6 +1133,109 @@ impl Component {
Ok(())
}

async fn record_block_summary(
&self,
dbtx: &mut PgTransaction<'_>,
time: DateTime,
height: i32,
events: &Events,
) -> anyhow::Result<()> {
let num_opened_lps = events.position_opens.len() as i32;
let num_closed_lps = events.position_closes.len() as i32;
let num_withdrawn_lps = events.position_withdrawals.len() as i32;
let num_swaps = events.swaps.iter().map(|(_, v)| v.len()).sum::<usize>() as i32;
let num_swap_claims = events
.swap_claims
.iter()
.map(|(_, v)| v.len())
.sum::<usize>() as i32;
let num_txs = events.batch_swaps.len() as i32;

let mut batch_swap_summaries = Vec::<BatchSwapSummary>::new();

// Deal with the batch summaries:
for event in &events.batch_swaps {
let trading_pair = event.batch_swap_output_data.trading_pair;

if let Some(swap_1_2) = &event.swap_execution_1_for_2 {
let asset_start = swap_1_2.input.asset_id;
let asset_end = swap_1_2.output.asset_id;
let input = swap_1_2.input.amount;
let output = swap_1_2.output.amount;
let price_float = (output.value() as f64) / (input.value() as f64);

let empty_vec = vec![];
let swaps_for_pair = events.swaps.get(&trading_pair).unwrap_or(&empty_vec);
let filtered_swaps: Vec<_> = swaps_for_pair
.iter()
.filter(|swap| swap.delta_1_i != Amount::zero())
.collect::<Vec<_>>();
let num_swaps = filtered_swaps.len() as i32;

batch_swap_summaries.push(BatchSwapSummary {
asset_start,
asset_end,
input,
output,
num_swaps,
price_float,
});
}

if let Some(swap_2_1) = &event.swap_execution_2_for_1 {
let asset_start = swap_2_1.input.asset_id;
let asset_end = swap_2_1.output.asset_id;
let input = swap_2_1.input.amount;
let output = swap_2_1.output.amount;
let price_float = (output.value() as f64) / (input.value() as f64);

let empty_vec = vec![];
let swaps_for_pair = events.swaps.get(&trading_pair).unwrap_or(&empty_vec);
let filtered_swaps: Vec<_> = swaps_for_pair
.iter()
.filter(|swap| swap.delta_2_i != Amount::zero())
.collect::<Vec<_>>();
let num_swaps = filtered_swaps.len() as i32;

batch_swap_summaries.push(BatchSwapSummary {
asset_start,
asset_end,
input,
output,
num_swaps,
price_float,
});
}
}

sqlx::query(
"INSERT INTO dex_ex_block_summary (
height,
time,
batch_swaps,
num_open_lps,
num_closed_lps,
num_withdrawn_lps,
num_swaps,
num_swap_claims,
num_txs
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)",
)
.bind(height)
.bind(time)
.bind(serde_json::to_value(&batch_swap_summaries)?)
.bind(num_opened_lps)
.bind(num_closed_lps)
.bind(num_withdrawn_lps)
.bind(num_swaps)
.bind(num_swap_claims)
.bind(num_txs)
.execute(dbtx.as_mut())
.await?;

Ok(())
}

async fn record_batch_swap_traces(
&self,
dbtx: &mut PgTransaction<'_>,
Expand Down Expand Up @@ -1304,6 +1459,10 @@ impl AppView for Component {
// Load any missing positions before processing events
events.load_positions(dbtx).await?;

// This is where we are going to build the block summary for the DEX.
self.record_block_summary(dbtx, time, block.height as i32, &events)
.await?;

// Record batch swap execution traces.
for event in &events.batch_swaps {
self.record_batch_swap_traces(dbtx, time, block.height as i32, event)
Expand Down
42 changes: 42 additions & 0 deletions crates/bin/pindexer/src/dex_ex/schema.sql
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,48 @@ CREATE INDEX ON dex_ex_batch_swap_traces (time, height);
CREATE INDEX ON dex_ex_batch_swap_traces (asset_start, asset_end);
-- TODO(erwan): We can add a GIN index on the position id later.

-- A high-level view of a batch swap.
CREATE TYPE batch_swap_summary AS (
-- The directed start asset of the batch swap.
asset_start BYTEA,
-- The directed end asset of the batch swap.
asset_end BYTEA,
-- The amount of asset was consumed by the batch swap.
input NUMERIC(39),
-- The amount of asset was produced by the batch swap.
output NUMERIC(39),
-- The number of swaps in the batch swap.
num_swaps INTEGER,
-- The price with `asset_end` as the quote asset.
price_float DOUBLE PRECISION
);

-- A summary of block data with a bias for DEX data.
CREATE TABLE IF NOT EXISTS dex_ex_block_summary (
-- Primary key
rowid SERIAL PRIMARY KEY,
-- The height of the block.
height INTEGER UNIQUE NOT NULL,
-- The timestamp for the block.
time TIMESTAMPTZ NOT NULL,
-- A list of batch swap summaries that occurred in this block.
batch_swaps batch_swap_summary[] NOT NULL,
-- The number of opened LPs in this block.
num_open_lps INTEGER NOT NULL,
-- The number of closed LPs in this block.
num_closed_lps INTEGER NOT NULL,
-- The number of withdrawn LPs in this block.
num_withdrawn_lps INTEGER NOT NULL,
-- The number of swaps in this block.
num_swaps INTEGER NOT NULL,
-- The number of swap claims in this block.
num_swap_claims INTEGER NOT NULL,
-- The number of transactions in this block.
num_txs INTEGER NOT NULL
);

CREATE INDEX ON dex_ex_block_summary (time, height);

ALTER TABLE dex_ex_position_executions
ADD CONSTRAINT fk_position_executions
FOREIGN KEY (position_id) REFERENCES dex_ex_position_state(position_id);
Expand Down
Loading