diff --git a/Cargo.lock b/Cargo.lock index 2b8d657bda..95442babfb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6115,6 +6115,7 @@ dependencies = [ "prost-reflect", "reqwest 0.12.9", "rstest", + "serde", "serde_json", "sqlx", "tokio", diff --git a/crates/bin/pindexer/Cargo.toml b/crates/bin/pindexer/Cargo.toml index f350ab6f25..c8d5af4f00 100644 --- a/crates/bin/pindexer/Cargo.toml +++ b/crates/bin/pindexer/Cargo.toml @@ -21,7 +21,7 @@ network-integration = [] [dependencies] anyhow = {workspace = true} clap = {workspace = true} -chrono = {workspace = true} +chrono = {workspace = true, features = ["serde"] } cometindex = {workspace = true} ethnum = {workspace = true} num-bigint = { version = "0.4" } @@ -42,6 +42,7 @@ prost = {workspace = true} tracing = {workspace = true} tokio = {workspace = true, features = ["full"]} serde_json = {workspace = true} +serde = { workspace = true, features = ["derive"] } sqlx = { workspace = true, features = ["chrono", "postgres"] } [dev-dependencies] diff --git a/crates/bin/pindexer/src/dex_ex/mod.rs b/crates/bin/pindexer/src/dex_ex/mod.rs index 39fb3d8a88..d12bef946c 100644 --- a/crates/bin/pindexer/src/dex_ex/mod.rs +++ b/crates/bin/pindexer/src/dex_ex/mod.rs @@ -5,7 +5,6 @@ use cometindex::{ AppView, PgTransaction, }; use penumbra_sdk_asset::asset; -use penumbra_sdk_dex::lp::position::{Id as PositionId, Position}; use penumbra_sdk_dex::{ event::{ EventBatchSwap, EventCandlestickData, EventPositionClose, EventPositionExecution, @@ -14,12 +13,16 @@ use penumbra_sdk_dex::{ lp::Reserves, DirectedTradingPair, SwapExecution, TradingPair, }; +use penumbra_sdk_dex::{ + event::{EventSwap, EventSwapClaim}, + lp::position::{Id as PositionId, Position}, +}; use penumbra_sdk_num::Amount; use penumbra_sdk_proto::event::EventDomainType; use penumbra_sdk_proto::DomainType; use penumbra_sdk_sct::event::EventBlockRoot; use sqlx::types::BigDecimal; -use sqlx::Row; +use sqlx::{prelude::Type, Row}; use std::collections::{BTreeMap, HashMap, HashSet}; type DateTime = sqlx::types::chrono::DateTime; @@ -668,6 +671,17 @@ struct PairMetrics { liquidity_change: f64, } +#[derive(Debug, Clone, Type)] +#[sqlx(type_name = "batch_swap_summary")] +struct BatchSwapSummary { + asset_start: Vec, + asset_end: Vec, + input: BigDecimal, + output: BigDecimal, + num_swaps: i32, + price_float: f64, +} + #[derive(Debug)] struct Events { time: Option, @@ -682,6 +696,8 @@ struct Events { position_closes: Vec, position_withdrawals: Vec, batch_swaps: Vec, + swaps: BTreeMap>, + swap_claims: BTreeMap>, // Track transaction hashes by position ID position_open_txs: BTreeMap, position_close_txs: BTreeMap, @@ -701,6 +717,8 @@ impl Events { position_closes: Vec::new(), position_withdrawals: Vec::new(), batch_swaps: Vec::new(), + swaps: BTreeMap::new(), + swap_claims: BTreeMap::new(), position_open_txs: BTreeMap::new(), position_close_txs: BTreeMap::new(), position_withdrawal_txs: BTreeMap::new(), @@ -859,6 +877,16 @@ impl Events { } } else if let Ok(e) = EventBatchSwap::try_from_event(&event.event) { out.batch_swaps.push(e); + } else if let Ok(e) = EventSwap::try_from_event(&event.event) { + out.swaps + .entry(e.trading_pair) + .or_insert_with(Vec::new) + .push(e); + } else if let Ok(e) = EventSwapClaim::try_from_event(&event.event) { + out.swap_claims + .entry(e.trading_pair) + .or_insert_with(Vec::new) + .push(e); } } Ok(out) @@ -1081,6 +1109,108 @@ impl Component { Ok(()) } + async fn record_block_summary( + &self, + dbtx: &mut PgTransaction<'_>, + time: DateTime, + height: i32, + events: &Events, + ) -> anyhow::Result<()> { + let num_opened_lps = events.position_opens.len() as i32; + let num_closed_lps = events.position_closes.len() as i32; + let num_withdrawn_lps = events.position_withdrawals.len() as i32; + let num_swaps = events.swaps.iter().map(|(_, v)| v.len()).sum::() as i32; + let num_swap_claims = events + .swap_claims + .iter() + .map(|(_, v)| v.len()) + .sum::() as i32; + let num_txs = events.batch_swaps.len() as i32; + + let mut batch_swap_summaries = Vec::::new(); + + for event in &events.batch_swaps { + let trading_pair = event.batch_swap_output_data.trading_pair; + + if let Some(swap_1_2) = &event.swap_execution_1_for_2 { + let asset_start = swap_1_2.input.asset_id; + let asset_end = swap_1_2.output.asset_id; + let input = swap_1_2.input.amount; + let output = swap_1_2.output.amount; + let price_float = (output.value() as f64) / (input.value() as f64); + + let empty_vec = vec![]; + let swaps_for_pair = events.swaps.get(&trading_pair).unwrap_or(&empty_vec); + let filtered_swaps: Vec<_> = swaps_for_pair + .iter() + .filter(|swap| swap.delta_1_i != Amount::zero()) + .collect::>(); + let num_swaps = filtered_swaps.len() as i32; + + batch_swap_summaries.push(BatchSwapSummary { + asset_start: asset_start.to_bytes().to_vec(), + asset_end: asset_end.to_bytes().to_vec(), + input: BigDecimal::from(input.value()), + output: BigDecimal::from(output.value()), + num_swaps, + price_float, + }); + } + + if let Some(swap_2_1) = &event.swap_execution_2_for_1 { + let asset_start = swap_2_1.input.asset_id; + let asset_end = swap_2_1.output.asset_id; + let input = swap_2_1.input.amount; + let output = swap_2_1.output.amount; + let price_float = (output.value() as f64) / (input.value() as f64); + + let empty_vec = vec![]; + let swaps_for_pair = events.swaps.get(&trading_pair).unwrap_or(&empty_vec); + let filtered_swaps: Vec<_> = swaps_for_pair + .iter() + .filter(|swap| swap.delta_2_i != Amount::zero()) + .collect::>(); + let num_swaps = filtered_swaps.len() as i32; + + batch_swap_summaries.push(BatchSwapSummary { + asset_start: asset_start.to_bytes().to_vec(), + asset_end: asset_end.to_bytes().to_vec(), + input: BigDecimal::from(input.value()), + output: BigDecimal::from(output.value()), + num_swaps, + price_float, + }); + } + } + + sqlx::query( + "INSERT INTO dex_ex_block_summary ( + height, + time, + batch_swaps, + num_open_lps, + num_closed_lps, + num_withdrawn_lps, + num_swaps, + num_swap_claims, + num_txs + ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)", + ) + .bind(height) + .bind(time) + .bind(&batch_swap_summaries) + .bind(num_opened_lps) + .bind(num_closed_lps) + .bind(num_withdrawn_lps) + .bind(num_swaps) + .bind(num_swap_claims) + .bind(num_txs) + .execute(dbtx.as_mut()) + .await?; + + Ok(()) + } + async fn record_batch_swap_traces( &self, dbtx: &mut PgTransaction<'_>, @@ -1304,6 +1434,10 @@ impl AppView for Component { // Load any missing positions before processing events events.load_positions(dbtx).await?; + // This is where we are going to build the block summary for the DEX. + self.record_block_summary(dbtx, time, block.height as i32, &events) + .await?; + // Record batch swap execution traces. for event in &events.batch_swaps { self.record_batch_swap_traces(dbtx, time, block.height as i32, event) diff --git a/crates/bin/pindexer/src/dex_ex/schema.sql b/crates/bin/pindexer/src/dex_ex/schema.sql index 15122ab946..8a5615a912 100644 --- a/crates/bin/pindexer/src/dex_ex/schema.sql +++ b/crates/bin/pindexer/src/dex_ex/schema.sql @@ -209,6 +209,48 @@ CREATE INDEX ON dex_ex_batch_swap_traces (time, height); CREATE INDEX ON dex_ex_batch_swap_traces (asset_start, asset_end); -- TODO(erwan): We can add a GIN index on the position id later. +-- A high-level view of a batch swap. +CREATE TYPE batch_swap_summary AS ( + -- The directed start asset of the batch swap. + asset_start BYTEA, + -- The directed end asset of the batch swap. + asset_end BYTEA, + -- The amount of asset was consumed by the batch swap. + input NUMERIC(39), + -- The amount of asset was produced by the batch swap. + output NUMERIC(39), + -- The number of swaps in the batch swap. + num_swaps INTEGER, + -- The price with `asset_end` as the quote asset. + price_float DOUBLE PRECISION +); + +-- A summary of block data with a bias for DEX data. +CREATE TABLE IF NOT EXISTS dex_ex_block_summary ( + -- Primary key + rowid SERIAL PRIMARY KEY, + -- The height of the block. + height INTEGER NOT NULL, + -- The timestamp for the block. + time TIMESTAMPTZ NOT NULL, + -- A list of batch swap summaries that occurred in this block. + batch_swaps batch_swap_summary[] NOT NULL, + -- The number of opened LPs in this block. + num_open_lps INTEGER NOT NULL, + -- The number of closed LPs in this block. + num_closed_lps INTEGER NOT NULL, + -- The number of withdrawn LPs in this block. + num_withdrawn_lps INTEGER NOT NULL, + -- The number of swaps in this block. + num_swaps INTEGER NOT NULL, + -- The number of swap claims in this block. + num_swap_claims INTEGER NOT NULL, + -- The number of transactions in this block. + num_txs INTEGER NOT NULL +); + +CREATE INDEX ON dex_ex_block_summary (time, height); + ALTER TABLE dex_ex_position_executions ADD CONSTRAINT fk_position_executions FOREIGN KEY (position_id) REFERENCES dex_ex_position_state(position_id);