Skip to content

Commit

Permalink
pindexer: add dex block summary (#5063)
Browse files Browse the repository at this point in the history
## Describe your changes
WIP pairing

## Issue ticket number and link

penumbra-zone/dex-explorer#338

## Checklist before requesting a review

- [ ] I have added guiding text to explain how a reviewer should test
these changes.

- [ ] If this code contains consensus-breaking changes, I have added the
"consensus-breaking" label. Otherwise, I declare my belief that there
are not consensus-breaking changes, for the following reason:

  > REPLACE THIS TEXT WITH RATIONALE (CAN BE BRIEF)

---------

Co-authored-by: Jason M. Hasperhoven <[email protected]>
Co-authored-by: Conor Schaefer <[email protected]>
  • Loading branch information
3 people authored Feb 13, 2025
1 parent 6f36f59 commit 41c9a80
Show file tree
Hide file tree
Showing 4 changed files with 181 additions and 3 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion crates/bin/pindexer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ network-integration = []
[dependencies]
anyhow = {workspace = true}
clap = {workspace = true}
chrono = {workspace = true}
chrono = {workspace = true, features = ["serde"] }
cometindex = {workspace = true}
ethnum = {workspace = true}
num-bigint = { version = "0.4" }
Expand All @@ -42,6 +42,7 @@ prost = {workspace = true}
tracing = {workspace = true}
tokio = {workspace = true, features = ["full"]}
serde_json = {workspace = true}
serde = { workspace = true, features = ["derive"] }
sqlx = { workspace = true, features = ["chrono", "postgres"] }

[dev-dependencies]
Expand Down
138 changes: 136 additions & 2 deletions crates/bin/pindexer/src/dex_ex/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ use cometindex::{
AppView, PgTransaction,
};
use penumbra_sdk_asset::asset;
use penumbra_sdk_dex::lp::position::{Id as PositionId, Position};
use penumbra_sdk_dex::{
event::{
EventBatchSwap, EventCandlestickData, EventPositionClose, EventPositionExecution,
Expand All @@ -14,12 +13,16 @@ use penumbra_sdk_dex::{
lp::Reserves,
DirectedTradingPair, SwapExecution, TradingPair,
};
use penumbra_sdk_dex::{
event::{EventSwap, EventSwapClaim},
lp::position::{Id as PositionId, Position},
};
use penumbra_sdk_num::Amount;
use penumbra_sdk_proto::event::EventDomainType;
use penumbra_sdk_proto::DomainType;
use penumbra_sdk_sct::event::EventBlockRoot;
use sqlx::types::BigDecimal;
use sqlx::Row;
use sqlx::{prelude::Type, Row};
use std::collections::{BTreeMap, HashMap, HashSet};

type DateTime = sqlx::types::chrono::DateTime<sqlx::types::chrono::Utc>;
Expand Down Expand Up @@ -668,6 +671,17 @@ struct PairMetrics {
liquidity_change: f64,
}

#[derive(Debug, Clone, Type)]
#[sqlx(type_name = "batch_swap_summary")]
struct BatchSwapSummary {
asset_start: Vec<u8>,
asset_end: Vec<u8>,
input: BigDecimal,
output: BigDecimal,
num_swaps: i32,
price_float: f64,
}

#[derive(Debug)]
struct Events {
time: Option<DateTime>,
Expand All @@ -682,6 +696,8 @@ struct Events {
position_closes: Vec<EventPositionClose>,
position_withdrawals: Vec<EventPositionWithdraw>,
batch_swaps: Vec<EventBatchSwap>,
swaps: BTreeMap<TradingPair, Vec<EventSwap>>,
swap_claims: BTreeMap<TradingPair, Vec<EventSwapClaim>>,
// Track transaction hashes by position ID
position_open_txs: BTreeMap<PositionId, [u8; 32]>,
position_close_txs: BTreeMap<PositionId, [u8; 32]>,
Expand All @@ -701,6 +717,8 @@ impl Events {
position_closes: Vec::new(),
position_withdrawals: Vec::new(),
batch_swaps: Vec::new(),
swaps: BTreeMap::new(),
swap_claims: BTreeMap::new(),
position_open_txs: BTreeMap::new(),
position_close_txs: BTreeMap::new(),
position_withdrawal_txs: BTreeMap::new(),
Expand Down Expand Up @@ -859,6 +877,16 @@ impl Events {
}
} else if let Ok(e) = EventBatchSwap::try_from_event(&event.event) {
out.batch_swaps.push(e);
} else if let Ok(e) = EventSwap::try_from_event(&event.event) {
out.swaps
.entry(e.trading_pair)
.or_insert_with(Vec::new)
.push(e);
} else if let Ok(e) = EventSwapClaim::try_from_event(&event.event) {
out.swap_claims
.entry(e.trading_pair)
.or_insert_with(Vec::new)
.push(e);
}
}
Ok(out)
Expand Down Expand Up @@ -1081,6 +1109,108 @@ impl Component {
Ok(())
}

async fn record_block_summary(
&self,
dbtx: &mut PgTransaction<'_>,
time: DateTime,
height: i32,
events: &Events,
) -> anyhow::Result<()> {
let num_opened_lps = events.position_opens.len() as i32;
let num_closed_lps = events.position_closes.len() as i32;
let num_withdrawn_lps = events.position_withdrawals.len() as i32;
let num_swaps = events.swaps.iter().map(|(_, v)| v.len()).sum::<usize>() as i32;
let num_swap_claims = events
.swap_claims
.iter()
.map(|(_, v)| v.len())
.sum::<usize>() as i32;
let num_txs = events.batch_swaps.len() as i32;

let mut batch_swap_summaries = Vec::<BatchSwapSummary>::new();

for event in &events.batch_swaps {
let trading_pair = event.batch_swap_output_data.trading_pair;

if let Some(swap_1_2) = &event.swap_execution_1_for_2 {
let asset_start = swap_1_2.input.asset_id;
let asset_end = swap_1_2.output.asset_id;
let input = swap_1_2.input.amount;
let output = swap_1_2.output.amount;
let price_float = (output.value() as f64) / (input.value() as f64);

let empty_vec = vec![];
let swaps_for_pair = events.swaps.get(&trading_pair).unwrap_or(&empty_vec);
let filtered_swaps: Vec<_> = swaps_for_pair
.iter()
.filter(|swap| swap.delta_1_i != Amount::zero())
.collect::<Vec<_>>();
let num_swaps = filtered_swaps.len() as i32;

batch_swap_summaries.push(BatchSwapSummary {
asset_start: asset_start.to_bytes().to_vec(),
asset_end: asset_end.to_bytes().to_vec(),
input: BigDecimal::from(input.value()),
output: BigDecimal::from(output.value()),
num_swaps,
price_float,
});
}

if let Some(swap_2_1) = &event.swap_execution_2_for_1 {
let asset_start = swap_2_1.input.asset_id;
let asset_end = swap_2_1.output.asset_id;
let input = swap_2_1.input.amount;
let output = swap_2_1.output.amount;
let price_float = (output.value() as f64) / (input.value() as f64);

let empty_vec = vec![];
let swaps_for_pair = events.swaps.get(&trading_pair).unwrap_or(&empty_vec);
let filtered_swaps: Vec<_> = swaps_for_pair
.iter()
.filter(|swap| swap.delta_2_i != Amount::zero())
.collect::<Vec<_>>();
let num_swaps = filtered_swaps.len() as i32;

batch_swap_summaries.push(BatchSwapSummary {
asset_start: asset_start.to_bytes().to_vec(),
asset_end: asset_end.to_bytes().to_vec(),
input: BigDecimal::from(input.value()),
output: BigDecimal::from(output.value()),
num_swaps,
price_float,
});
}
}

sqlx::query(
"INSERT INTO dex_ex_block_summary (
height,
time,
batch_swaps,
num_open_lps,
num_closed_lps,
num_withdrawn_lps,
num_swaps,
num_swap_claims,
num_txs
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)",
)
.bind(height)
.bind(time)
.bind(&batch_swap_summaries)
.bind(num_opened_lps)
.bind(num_closed_lps)
.bind(num_withdrawn_lps)
.bind(num_swaps)
.bind(num_swap_claims)
.bind(num_txs)
.execute(dbtx.as_mut())
.await?;

Ok(())
}

async fn record_batch_swap_traces(
&self,
dbtx: &mut PgTransaction<'_>,
Expand Down Expand Up @@ -1304,6 +1434,10 @@ impl AppView for Component {
// Load any missing positions before processing events
events.load_positions(dbtx).await?;

// This is where we are going to build the block summary for the DEX.
self.record_block_summary(dbtx, time, block.height as i32, &events)
.await?;

// Record batch swap execution traces.
for event in &events.batch_swaps {
self.record_batch_swap_traces(dbtx, time, block.height as i32, event)
Expand Down
42 changes: 42 additions & 0 deletions crates/bin/pindexer/src/dex_ex/schema.sql
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,48 @@ CREATE INDEX ON dex_ex_batch_swap_traces (time, height);
CREATE INDEX ON dex_ex_batch_swap_traces (asset_start, asset_end);
-- TODO(erwan): We can add a GIN index on the position id later.

-- A high-level view of a batch swap.
CREATE TYPE batch_swap_summary AS (
-- The directed start asset of the batch swap.
asset_start BYTEA,
-- The directed end asset of the batch swap.
asset_end BYTEA,
-- The amount of asset was consumed by the batch swap.
input NUMERIC(39),
-- The amount of asset was produced by the batch swap.
output NUMERIC(39),
-- The number of swaps in the batch swap.
num_swaps INTEGER,
-- The price with `asset_end` as the quote asset.
price_float DOUBLE PRECISION
);

-- A summary of block data with a bias for DEX data.
CREATE TABLE IF NOT EXISTS dex_ex_block_summary (
-- Primary key
rowid SERIAL PRIMARY KEY,
-- The height of the block.
height INTEGER NOT NULL,
-- The timestamp for the block.
time TIMESTAMPTZ NOT NULL,
-- A list of batch swap summaries that occurred in this block.
batch_swaps batch_swap_summary[] NOT NULL,
-- The number of opened LPs in this block.
num_open_lps INTEGER NOT NULL,
-- The number of closed LPs in this block.
num_closed_lps INTEGER NOT NULL,
-- The number of withdrawn LPs in this block.
num_withdrawn_lps INTEGER NOT NULL,
-- The number of swaps in this block.
num_swaps INTEGER NOT NULL,
-- The number of swap claims in this block.
num_swap_claims INTEGER NOT NULL,
-- The number of transactions in this block.
num_txs INTEGER NOT NULL
);

CREATE INDEX ON dex_ex_block_summary (time, height);

ALTER TABLE dex_ex_position_executions
ADD CONSTRAINT fk_position_executions
FOREIGN KEY (position_id) REFERENCES dex_ex_position_state(position_id);
Expand Down

0 comments on commit 41c9a80

Please sign in to comment.