From 8eca959aa9b596662411e41fea104d43d45a019b Mon Sep 17 00:00:00 2001 From: Erwan Or Date: Thu, 6 Feb 2025 10:39:21 -0500 Subject: [PATCH 01/17] pindexer(dex_ex): add dex block summary table --- crates/bin/pindexer/src/dex_ex/schema.sql | 42 +++++++++++++++++++++++ 1 file changed, 42 insertions(+) diff --git a/crates/bin/pindexer/src/dex_ex/schema.sql b/crates/bin/pindexer/src/dex_ex/schema.sql index 15122ab946..29c3c60455 100644 --- a/crates/bin/pindexer/src/dex_ex/schema.sql +++ b/crates/bin/pindexer/src/dex_ex/schema.sql @@ -209,6 +209,48 @@ CREATE INDEX ON dex_ex_batch_swap_traces (time, height); CREATE INDEX ON dex_ex_batch_swap_traces (asset_start, asset_end); -- TODO(erwan): We can add a GIN index on the position id later. +CREATE TYPE batch_swap AS ( + -- The directed start asset of the batch swap. + asset_start BYTEA, + -- The directed end asset of the batch swap. + asset_end BYTEA, + -- The amount of asset was consumed by the batch swap. + input NUMERIC(39), + -- The amount of asset was produced by the batch swap. + output NUMERIC(39), + -- The number of swaps in the batch swap. + num_swaps INTEGER, + -- The price with `asset_end` as the quote asset. + price_float DOUBLE PRECISION NOT NULL +); + + +-- A summary of block data with a bias for DEX data. +CREATE TABLE IF NOT EXISTS block_summary ( + -- Primary key + rowid SERIAL PRIMARY KEY, + -- The height of the block. + height INTEGER PRIMARY KEY, + -- The timestamp for the block. + time TIMESTAMPTZ NOT NULL, + + batch_swaps batch_swap[] NOT NULL, + -- The number of opened LPs in this block. + num_open_lps INTEGER NOT NULL, + -- The number of closed LPs in this block. + num_closed_lps INTEGER NOT NULL, + -- The number of withdrawn LPs in this block. + num_withdrawn_lps INTEGER NOT NULL, + -- The number of swaps in this block. + num_swaps INTEGER NOT NULL, + -- The number of swap claims in this block. + num_swap_claims INTEGER NOT NULL, + -- The number of transactions in this block. + num_txs INTEGER NOT NULL +); + +CREATE INDEX ON dex_ex_batch_swap_traces (time, height); + ALTER TABLE dex_ex_position_executions ADD CONSTRAINT fk_position_executions FOREIGN KEY (position_id) REFERENCES dex_ex_position_state(position_id); From 93ac969d32fa38f44a772f0e761738fb4c4c1d24 Mon Sep 17 00:00:00 2001 From: Erwan Or Date: Thu, 6 Feb 2025 11:20:35 -0500 Subject: [PATCH 02/17] pindexer(dex_ex): basic versions without inserting --- crates/bin/pindexer/src/dex_ex/mod.rs | 97 ++++++++++++++++++++++- crates/bin/pindexer/src/dex_ex/schema.sql | 7 +- 2 files changed, 100 insertions(+), 4 deletions(-) diff --git a/crates/bin/pindexer/src/dex_ex/mod.rs b/crates/bin/pindexer/src/dex_ex/mod.rs index 39fb3d8a88..a43fd6a1b5 100644 --- a/crates/bin/pindexer/src/dex_ex/mod.rs +++ b/crates/bin/pindexer/src/dex_ex/mod.rs @@ -5,7 +5,6 @@ use cometindex::{ AppView, PgTransaction, }; use penumbra_sdk_asset::asset; -use penumbra_sdk_dex::lp::position::{Id as PositionId, Position}; use penumbra_sdk_dex::{ event::{ EventBatchSwap, EventCandlestickData, EventPositionClose, EventPositionExecution, @@ -14,6 +13,10 @@ use penumbra_sdk_dex::{ lp::Reserves, DirectedTradingPair, SwapExecution, TradingPair, }; +use penumbra_sdk_dex::{ + event::{EventSwap, EventSwapClaim}, + lp::position::{Id as PositionId, Position}, +}; use penumbra_sdk_num::Amount; use penumbra_sdk_proto::event::EventDomainType; use penumbra_sdk_proto::DomainType; @@ -682,6 +685,8 @@ struct Events { position_closes: Vec, position_withdrawals: Vec, batch_swaps: Vec, + swaps: BTreeMap>, + swap_claims: BTreeMap>, // Track transaction hashes by position ID position_open_txs: BTreeMap, position_close_txs: BTreeMap, @@ -701,6 +706,8 @@ impl Events { position_closes: Vec::new(), position_withdrawals: Vec::new(), batch_swaps: Vec::new(), + swaps: BTreeMap::new(), + swap_claims: BTreeMap::new(), position_open_txs: BTreeMap::new(), position_close_txs: BTreeMap::new(), position_withdrawal_txs: BTreeMap::new(), @@ -859,6 +866,16 @@ impl Events { } } else if let Ok(e) = EventBatchSwap::try_from_event(&event.event) { out.batch_swaps.push(e); + } else if let Ok(e) = EventSwap::try_from_event(&event.event) { + out.swaps + .entry(e.trading_pair) + .or_insert_with(Vec::new) + .push(e); + } else if let Ok(e) = EventSwapClaim::try_from_event(&event.event) { + out.swap_claims + .entry(e.trading_pair) + .or_insert_with(Vec::new) + .push(e); } } Ok(out) @@ -1081,6 +1098,80 @@ impl Component { Ok(()) } + async fn record_block_summary( + &self, + dbtx: &mut PgTransaction<'_>, + time: DateTime, + height: i32, + events: &Events, + ) { + let num_opened_lps = events.position_opens.len(); + let num_closed_lps = events.position_closes.len(); + let num_withdrawn_lps = events.position_withdrawals.len(); + let num_swaps: usize = events.swaps.iter().map(|(_, v)| v.len()).sum::(); + let num_swap_claims = events + .swap_claims + .iter() + .map(|(_, v)| v.len()) + .sum::(); + + let mut batch_swap_summaries = vec![]; + + // Deal with the batch summaries: + for event in &events.batch_swaps { + let trading_pair = event.batch_swap_output_data.trading_pair; + + // TODO: refactor around a domain type for the sql schema. + if let Some(swap_1_2) = &event.swap_execution_1_for_2 { + let asset_start = swap_1_2.input.asset_id; + let asset_end = swap_1_2.output.asset_id; + let input = swap_1_2.input.amount; + let output = swap_1_2.output.amount; + let price_float = (output.value() as f64) / (input.value() as f64); + + let swaps_for_pair = events.swaps.get(&trading_pair).unwrap_or_default(); + let filtered_swaps = swaps_for_pair + .iter() + .filter(|swap| swap.delta_1_i != Amount::zero()) + .collect::>(); + let num_swaps = filtered_swaps.len(); + batch_swap_summaries.push(( + trading_pair, + asset_start, + asset_end, + input, + output, + price_float, + num_swaps, + )); + } + + if let Some(swap_2_1) = &event.swap_execution_2_for_1 { + let asset_start = swap_2_1.input.asset_id; + let asset_end = swap_2_1.output.asset_id; + let input = swap_2_1.input.amount; + let output = swap_2_1.output.amount; + let price_float = (output.value() as f64) / (input.value() as f64); + + let swaps_for_pair = events.swaps.get(&trading_pair).unwrap_or(&vec![]); + let filtered_swaps = swaps_for_pair + .iter() + .filter(|swap| swap.delta_2_i != Amount::zero()) + .collect::>(); + let num_swaps = filtered_swaps.len(); + batch_swap_summaries.push(( + trading_pair, + asset_start, + asset_end, + input, + output, + price_float, + num_swaps, + )); + } + } + } + async fn record_batch_swap_traces( &self, dbtx: &mut PgTransaction<'_>, @@ -1304,6 +1395,10 @@ impl AppView for Component { // Load any missing positions before processing events events.load_positions(dbtx).await?; + // This is where we are going to build the block summary for the DEX. + self.record_block_summary(dbtx, time, block.height as i32, &events) + .await; + // Record batch swap execution traces. for event in &events.batch_swaps { self.record_batch_swap_traces(dbtx, time, block.height as i32, event) diff --git a/crates/bin/pindexer/src/dex_ex/schema.sql b/crates/bin/pindexer/src/dex_ex/schema.sql index 29c3c60455..408ef11153 100644 --- a/crates/bin/pindexer/src/dex_ex/schema.sql +++ b/crates/bin/pindexer/src/dex_ex/schema.sql @@ -209,7 +209,8 @@ CREATE INDEX ON dex_ex_batch_swap_traces (time, height); CREATE INDEX ON dex_ex_batch_swap_traces (asset_start, asset_end); -- TODO(erwan): We can add a GIN index on the position id later. -CREATE TYPE batch_swap AS ( +-- A high-level view of a batch swap. +CREATE TYPE batch_swap_summary AS ( -- The directed start asset of the batch swap. asset_start BYTEA, -- The directed end asset of the batch swap. @@ -233,8 +234,8 @@ CREATE TABLE IF NOT EXISTS block_summary ( height INTEGER PRIMARY KEY, -- The timestamp for the block. time TIMESTAMPTZ NOT NULL, - - batch_swaps batch_swap[] NOT NULL, + -- A list of batch swap summaries that occurred in this block. + batch_swaps batch_swap_summary[] NOT NULL, -- The number of opened LPs in this block. num_open_lps INTEGER NOT NULL, -- The number of closed LPs in this block. From bb5e9395bb1e92c4b9d55b368df05d5d14a8e02c Mon Sep 17 00:00:00 2001 From: "Jason M. Hasperhoven" Date: Mon, 10 Feb 2025 21:18:42 +0400 Subject: [PATCH 03/17] Insert block_summary into pindexer --- crates/bin/pindexer/src/dex_ex/mod.rs | 58 ++++++++++++++++++++------- 1 file changed, 43 insertions(+), 15 deletions(-) diff --git a/crates/bin/pindexer/src/dex_ex/mod.rs b/crates/bin/pindexer/src/dex_ex/mod.rs index a43fd6a1b5..32ee2c33c9 100644 --- a/crates/bin/pindexer/src/dex_ex/mod.rs +++ b/crates/bin/pindexer/src/dex_ex/mod.rs @@ -1104,16 +1104,17 @@ impl Component { time: DateTime, height: i32, events: &Events, - ) { - let num_opened_lps = events.position_opens.len(); - let num_closed_lps = events.position_closes.len(); - let num_withdrawn_lps = events.position_withdrawals.len(); - let num_swaps: usize = events.swaps.iter().map(|(_, v)| v.len()).sum::(); + ) -> anyhow::Result<()> { + let num_opened_lps = events.position_opens.len() as i32; + let num_closed_lps = events.position_closes.len() as i32; + let num_withdrawn_lps = events.position_withdrawals.len() as i32; + let num_swaps = events.swaps.iter().map(|(_, v)| v.len()).sum::() as i32; let num_swap_claims = events .swap_claims .iter() .map(|(_, v)| v.len()) - .sum::(); + .sum::() as i32; + let num_txs = events.batch_swaps.len() as i32; let mut batch_swap_summaries = vec![]; @@ -1129,20 +1130,20 @@ impl Component { let output = swap_1_2.output.amount; let price_float = (output.value() as f64) / (input.value() as f64); - let swaps_for_pair = events.swaps.get(&trading_pair).unwrap_or_default(); - let filtered_swaps = swaps_for_pair + let empty_vec = vec![]; + let swaps_for_pair = events.swaps.get(&trading_pair).unwrap_or(&empty_vec); + let filtered_swaps: Vec<_> = swaps_for_pair .iter() .filter(|swap| swap.delta_1_i != Amount::zero()) .collect::>(); let num_swaps = filtered_swaps.len(); batch_swap_summaries.push(( - trading_pair, asset_start, asset_end, input, output, - price_float, num_swaps, + price_float, )); } @@ -1153,23 +1154,50 @@ impl Component { let output = swap_2_1.output.amount; let price_float = (output.value() as f64) / (input.value() as f64); - let swaps_for_pair = events.swaps.get(&trading_pair).unwrap_or(&vec![]); - let filtered_swaps = swaps_for_pair + let empty_vec = vec![]; + let swaps_for_pair = events.swaps.get(&trading_pair).unwrap_or(&empty_vec); + let filtered_swaps: Vec<_> = swaps_for_pair .iter() .filter(|swap| swap.delta_2_i != Amount::zero()) .collect::>(); let num_swaps = filtered_swaps.len(); batch_swap_summaries.push(( - trading_pair, asset_start, asset_end, input, output, - price_float, num_swaps, + price_float, )); } } + + sqlx::query( + "INSERT INTO dex_ex_block_summary ( + height, + time, + batch_swaps, + num_open_lps, + num_closed_lps, + num_withdrawn_lps, + num_swaps, + num_swap_claims + num_txs + ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)", + ) + .bind(height) + .bind(time) + .bind(serde_json::to_value(&batch_swap_summaries).unwrap()) + .bind(num_opened_lps) + .bind(num_closed_lps) + .bind(num_withdrawn_lps) + .bind(num_swaps) + .bind(num_swap_claims) + .bind(num_txs) + .execute(dbtx.as_mut()) + .await?; + + Ok(()) } async fn record_batch_swap_traces( @@ -1397,7 +1425,7 @@ impl AppView for Component { // This is where we are going to build the block summary for the DEX. self.record_block_summary(dbtx, time, block.height as i32, &events) - .await; + .await?; // Record batch swap execution traces. for event in &events.batch_swaps { From f63499c8c0bc2aa0706d54e525a3c8a40b89e57e Mon Sep 17 00:00:00 2001 From: "Jason M. Hasperhoven" Date: Tue, 11 Feb 2025 17:30:47 +0400 Subject: [PATCH 04/17] Add missing comma --- crates/bin/pindexer/src/dex_ex/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/bin/pindexer/src/dex_ex/mod.rs b/crates/bin/pindexer/src/dex_ex/mod.rs index 32ee2c33c9..300c3d3263 100644 --- a/crates/bin/pindexer/src/dex_ex/mod.rs +++ b/crates/bin/pindexer/src/dex_ex/mod.rs @@ -1181,7 +1181,7 @@ impl Component { num_closed_lps, num_withdrawn_lps, num_swaps, - num_swap_claims + num_swap_claims, num_txs ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)", ) From 62b19c4ccc21b21e9789cd715b4d512fc147f2ff Mon Sep 17 00:00:00 2001 From: "Jason M. Hasperhoven" Date: Tue, 11 Feb 2025 18:13:45 +0400 Subject: [PATCH 05/17] Change height table props --- crates/bin/pindexer/src/dex_ex/schema.sql | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/crates/bin/pindexer/src/dex_ex/schema.sql b/crates/bin/pindexer/src/dex_ex/schema.sql index 408ef11153..107af4d08a 100644 --- a/crates/bin/pindexer/src/dex_ex/schema.sql +++ b/crates/bin/pindexer/src/dex_ex/schema.sql @@ -225,13 +225,12 @@ CREATE TYPE batch_swap_summary AS ( price_float DOUBLE PRECISION NOT NULL ); - -- A summary of block data with a bias for DEX data. CREATE TABLE IF NOT EXISTS block_summary ( -- Primary key rowid SERIAL PRIMARY KEY, -- The height of the block. - height INTEGER PRIMARY KEY, + height INTEGER UNIQUE NOT NULL, -- The timestamp for the block. time TIMESTAMPTZ NOT NULL, -- A list of batch swap summaries that occurred in this block. From bea4ceb0b19fc2f7ab34cb0e962e4ada42d14d3e Mon Sep 17 00:00:00 2001 From: "Jason M. Hasperhoven" Date: Tue, 11 Feb 2025 18:16:08 +0400 Subject: [PATCH 06/17] Fix syntax error for schema type --- crates/bin/pindexer/src/dex_ex/schema.sql | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/bin/pindexer/src/dex_ex/schema.sql b/crates/bin/pindexer/src/dex_ex/schema.sql index 107af4d08a..94cf7559bd 100644 --- a/crates/bin/pindexer/src/dex_ex/schema.sql +++ b/crates/bin/pindexer/src/dex_ex/schema.sql @@ -222,7 +222,7 @@ CREATE TYPE batch_swap_summary AS ( -- The number of swaps in the batch swap. num_swaps INTEGER, -- The price with `asset_end` as the quote asset. - price_float DOUBLE PRECISION NOT NULL + price_float DOUBLE PRECISION ); -- A summary of block data with a bias for DEX data. From e0bc3ffec72ee82da7baccb54cef78cd9ea99d49 Mon Sep 17 00:00:00 2001 From: "Jason M. Hasperhoven" Date: Tue, 11 Feb 2025 19:10:50 +0400 Subject: [PATCH 07/17] Add BatchSwapSummary type --- Cargo.lock | 1 + crates/bin/pindexer/Cargo.toml | 3 +- crates/bin/pindexer/src/dex_ex/mod.rs | 51 ++++++++++++++++++++++----- 3 files changed, 45 insertions(+), 10 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 2b8d657bda..95442babfb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6115,6 +6115,7 @@ dependencies = [ "prost-reflect", "reqwest 0.12.9", "rstest", + "serde", "serde_json", "sqlx", "tokio", diff --git a/crates/bin/pindexer/Cargo.toml b/crates/bin/pindexer/Cargo.toml index f350ab6f25..c8d5af4f00 100644 --- a/crates/bin/pindexer/Cargo.toml +++ b/crates/bin/pindexer/Cargo.toml @@ -21,7 +21,7 @@ network-integration = [] [dependencies] anyhow = {workspace = true} clap = {workspace = true} -chrono = {workspace = true} +chrono = {workspace = true, features = ["serde"] } cometindex = {workspace = true} ethnum = {workspace = true} num-bigint = { version = "0.4" } @@ -42,6 +42,7 @@ prost = {workspace = true} tracing = {workspace = true} tokio = {workspace = true, features = ["full"]} serde_json = {workspace = true} +serde = { workspace = true, features = ["derive"] } sqlx = { workspace = true, features = ["chrono", "postgres"] } [dev-dependencies] diff --git a/crates/bin/pindexer/src/dex_ex/mod.rs b/crates/bin/pindexer/src/dex_ex/mod.rs index 300c3d3263..ce4fed322b 100644 --- a/crates/bin/pindexer/src/dex_ex/mod.rs +++ b/crates/bin/pindexer/src/dex_ex/mod.rs @@ -24,6 +24,7 @@ use penumbra_sdk_sct::event::EventBlockRoot; use sqlx::types::BigDecimal; use sqlx::Row; use std::collections::{BTreeMap, HashMap, HashSet}; +use serde::{Deserialize, Serialize}; type DateTime = sqlx::types::chrono::DateTime; @@ -671,6 +672,37 @@ struct PairMetrics { liquidity_change: f64, } +#[derive(Debug, Clone, Serialize, Deserialize)] +struct BatchSwapSummary { + #[serde(serialize_with = "serialize_asset_id")] + asset_start: penumbra_sdk_asset::asset::Id, + #[serde(serialize_with = "serialize_asset_id")] + asset_end: penumbra_sdk_asset::asset::Id, + #[serde(serialize_with = "serialize_amount")] + input: Amount, + #[serde(serialize_with = "serialize_amount")] + output: Amount, + num_swaps: i32, + price_float: f64, +} + +// Add these serialization helper functions +fn serialize_asset_id(asset_id: &penumbra_sdk_asset::asset::Id, serializer: S) -> Result +where + S: serde::Serializer, +{ + // Serialize the asset ID as bytes + serializer.serialize_bytes(asset_id.to_bytes().as_ref()) +} + +fn serialize_amount(amount: &Amount, serializer: S) -> Result +where + S: serde::Serializer, +{ + // Serialize the amount as a string to preserve precision + serializer.serialize_str(&amount.value().to_string()) +} + #[derive(Debug)] struct Events { time: Option, @@ -1116,13 +1148,12 @@ impl Component { .sum::() as i32; let num_txs = events.batch_swaps.len() as i32; - let mut batch_swap_summaries = vec![]; + let mut batch_swap_summaries = Vec::::new(); // Deal with the batch summaries: for event in &events.batch_swaps { let trading_pair = event.batch_swap_output_data.trading_pair; - // TODO: refactor around a domain type for the sql schema. if let Some(swap_1_2) = &event.swap_execution_1_for_2 { let asset_start = swap_1_2.input.asset_id; let asset_end = swap_1_2.output.asset_id; @@ -1136,15 +1167,16 @@ impl Component { .iter() .filter(|swap| swap.delta_1_i != Amount::zero()) .collect::>(); - let num_swaps = filtered_swaps.len(); - batch_swap_summaries.push(( + let num_swaps = filtered_swaps.len() as i32; + + batch_swap_summaries.push(BatchSwapSummary { asset_start, asset_end, input, output, num_swaps, price_float, - )); + }); } if let Some(swap_2_1) = &event.swap_execution_2_for_1 { @@ -1160,15 +1192,16 @@ impl Component { .iter() .filter(|swap| swap.delta_2_i != Amount::zero()) .collect::>(); - let num_swaps = filtered_swaps.len(); - batch_swap_summaries.push(( + let num_swaps = filtered_swaps.len() as i32; + + batch_swap_summaries.push(BatchSwapSummary { asset_start, asset_end, input, output, num_swaps, price_float, - )); + }); } } @@ -1187,7 +1220,7 @@ impl Component { ) .bind(height) .bind(time) - .bind(serde_json::to_value(&batch_swap_summaries).unwrap()) + .bind(serde_json::to_value(&batch_swap_summaries)?) .bind(num_opened_lps) .bind(num_closed_lps) .bind(num_withdrawn_lps) From f5bbf9de37118a49d0b638b2e65e90cc6b1899f4 Mon Sep 17 00:00:00 2001 From: "Jason M. Hasperhoven" Date: Tue, 11 Feb 2025 19:19:49 +0400 Subject: [PATCH 08/17] Fix lint error --- crates/bin/pindexer/src/dex_ex/mod.rs | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/crates/bin/pindexer/src/dex_ex/mod.rs b/crates/bin/pindexer/src/dex_ex/mod.rs index ce4fed322b..fe22e2c83d 100644 --- a/crates/bin/pindexer/src/dex_ex/mod.rs +++ b/crates/bin/pindexer/src/dex_ex/mod.rs @@ -687,7 +687,10 @@ struct BatchSwapSummary { } // Add these serialization helper functions -fn serialize_asset_id(asset_id: &penumbra_sdk_asset::asset::Id, serializer: S) -> Result +fn serialize_asset_id( + asset_id: &penumbra_sdk_asset::asset::Id, + serializer: S, +) -> Result where S: serde::Serializer, { From 6468ef2b9c9aaffa08ea26e39e04d01f54d65bc2 Mon Sep 17 00:00:00 2001 From: "Jason M. Hasperhoven" Date: Tue, 11 Feb 2025 19:22:33 +0400 Subject: [PATCH 09/17] Fix lint error --- crates/bin/pindexer/src/dex_ex/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/bin/pindexer/src/dex_ex/mod.rs b/crates/bin/pindexer/src/dex_ex/mod.rs index fe22e2c83d..976f56ea68 100644 --- a/crates/bin/pindexer/src/dex_ex/mod.rs +++ b/crates/bin/pindexer/src/dex_ex/mod.rs @@ -21,10 +21,10 @@ use penumbra_sdk_num::Amount; use penumbra_sdk_proto::event::EventDomainType; use penumbra_sdk_proto::DomainType; use penumbra_sdk_sct::event::EventBlockRoot; +use serde::{Deserialize, Serialize}; use sqlx::types::BigDecimal; use sqlx::Row; use std::collections::{BTreeMap, HashMap, HashSet}; -use serde::{Deserialize, Serialize}; type DateTime = sqlx::types::chrono::DateTime; From 5ded81f6ae686e6ec62aef6592541976ec2bde71 Mon Sep 17 00:00:00 2001 From: "Jason M. Hasperhoven" Date: Tue, 11 Feb 2025 20:07:31 +0400 Subject: [PATCH 10/17] Fix table name --- crates/bin/pindexer/src/dex_ex/schema.sql | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/bin/pindexer/src/dex_ex/schema.sql b/crates/bin/pindexer/src/dex_ex/schema.sql index 94cf7559bd..08b95941fc 100644 --- a/crates/bin/pindexer/src/dex_ex/schema.sql +++ b/crates/bin/pindexer/src/dex_ex/schema.sql @@ -226,7 +226,7 @@ CREATE TYPE batch_swap_summary AS ( ); -- A summary of block data with a bias for DEX data. -CREATE TABLE IF NOT EXISTS block_summary ( +CREATE TABLE IF NOT EXISTS dex_ex_block_summary ( -- Primary key rowid SERIAL PRIMARY KEY, -- The height of the block. From a5ed522d6ec821c630f31d08c865b91e8953a85c Mon Sep 17 00:00:00 2001 From: "Jason M. Hasperhoven" Date: Tue, 11 Feb 2025 20:08:23 +0400 Subject: [PATCH 11/17] Fix index on dex_ex_block_summary --- crates/bin/pindexer/src/dex_ex/schema.sql | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/bin/pindexer/src/dex_ex/schema.sql b/crates/bin/pindexer/src/dex_ex/schema.sql index 08b95941fc..e6f7594dfc 100644 --- a/crates/bin/pindexer/src/dex_ex/schema.sql +++ b/crates/bin/pindexer/src/dex_ex/schema.sql @@ -249,7 +249,7 @@ CREATE TABLE IF NOT EXISTS dex_ex_block_summary ( num_txs INTEGER NOT NULL ); -CREATE INDEX ON dex_ex_batch_swap_traces (time, height); +CREATE INDEX ON dex_ex_block_summary (time, height); ALTER TABLE dex_ex_position_executions ADD CONSTRAINT fk_position_executions From deb055b987582ee99a97929a6dac52bc4b794d46 Mon Sep 17 00:00:00 2001 From: Conor Schaefer Date: Tue, 11 Feb 2025 12:26:07 -0800 Subject: [PATCH 12/17] partial: impl sqlx::Encode for BatchSwapSummary --- crates/bin/pindexer/src/dex_ex/mod.rs | 46 ++++++++++++++++++++++++++- 1 file changed, 45 insertions(+), 1 deletion(-) diff --git a/crates/bin/pindexer/src/dex_ex/mod.rs b/crates/bin/pindexer/src/dex_ex/mod.rs index 976f56ea68..0aa1689ac7 100644 --- a/crates/bin/pindexer/src/dex_ex/mod.rs +++ b/crates/bin/pindexer/src/dex_ex/mod.rs @@ -24,6 +24,11 @@ use penumbra_sdk_sct::event::EventBlockRoot; use serde::{Deserialize, Serialize}; use sqlx::types::BigDecimal; use sqlx::Row; +use sqlx::{encode::IsNull, postgres::PgArgumentBuffer, Encode, Postgres}; + +use sqlx::postgres::PgHasArrayType; +use sqlx::postgres::PgTypeInfo; +// use sqlx::types::ArrayType; use std::collections::{BTreeMap, HashMap, HashSet}; type DateTime = sqlx::types::chrono::DateTime; @@ -706,6 +711,45 @@ where serializer.serialize_str(&amount.value().to_string()) } +/// Ensure that the domain type [BatchSwapSummary] can be stored +/// directly in Postgres as a custom type. +/// +/// The per-field encodings must match what's declared in the `CREATE TYPE batch_swap_summary` +/// statement in the SQL schema. +impl Encode<'_, Postgres> for BatchSwapSummary { + fn encode_by_ref( + &self, + buf: &mut PgArgumentBuffer, + ) -> Result> { + // Convert asset_start and asset_end to bytes + let asset_start_bytes = self.asset_start.to_bytes(); // or however you get bytes from asset::Id + let asset_end_bytes = self.asset_end.to_bytes(); + + // Convert Amount to Decimal/Numeric + let input_decimal = BigDecimal::from(self.input.value()); + let output_decimal = BigDecimal::from(self.output.value()); + + // Encode each field + asset_start_bytes.encode_by_ref(buf); + asset_end_bytes.encode_by_ref(buf); + input_decimal.encode_by_ref(buf); + output_decimal.encode_by_ref(buf); + + // Encode the primitive types directly + >::encode(self.num_swaps, buf); + >::encode(self.price_float, buf); + + Ok(IsNull::No) + } +} + +/// Make sure that we can reference arrays of the [BatchSwapSummary] types. +impl PgHasArrayType for BatchSwapSummary { + fn array_type_info() -> PgTypeInfo { + PgTypeInfo::with_name("_batch_swap_summary") // Note the underscore prefix for array type + } +} + #[derive(Debug)] struct Events { time: Option, @@ -1223,7 +1267,7 @@ impl Component { ) .bind(height) .bind(time) - .bind(serde_json::to_value(&batch_swap_summaries)?) + .bind(&batch_swap_summaries) .bind(num_opened_lps) .bind(num_closed_lps) .bind(num_withdrawn_lps) From ce6f058a9c5e0a3ed85aac37cb887f587e37c6d9 Mon Sep 17 00:00:00 2001 From: Erwan Or Date: Tue, 11 Feb 2025 17:30:04 -0500 Subject: [PATCH 13/17] Revert "partial: impl sqlx::Encode for BatchSwapSummary" This reverts commit deb055b987582ee99a97929a6dac52bc4b794d46. --- crates/bin/pindexer/src/dex_ex/mod.rs | 46 +-------------------------- 1 file changed, 1 insertion(+), 45 deletions(-) diff --git a/crates/bin/pindexer/src/dex_ex/mod.rs b/crates/bin/pindexer/src/dex_ex/mod.rs index 0aa1689ac7..976f56ea68 100644 --- a/crates/bin/pindexer/src/dex_ex/mod.rs +++ b/crates/bin/pindexer/src/dex_ex/mod.rs @@ -24,11 +24,6 @@ use penumbra_sdk_sct::event::EventBlockRoot; use serde::{Deserialize, Serialize}; use sqlx::types::BigDecimal; use sqlx::Row; -use sqlx::{encode::IsNull, postgres::PgArgumentBuffer, Encode, Postgres}; - -use sqlx::postgres::PgHasArrayType; -use sqlx::postgres::PgTypeInfo; -// use sqlx::types::ArrayType; use std::collections::{BTreeMap, HashMap, HashSet}; type DateTime = sqlx::types::chrono::DateTime; @@ -711,45 +706,6 @@ where serializer.serialize_str(&amount.value().to_string()) } -/// Ensure that the domain type [BatchSwapSummary] can be stored -/// directly in Postgres as a custom type. -/// -/// The per-field encodings must match what's declared in the `CREATE TYPE batch_swap_summary` -/// statement in the SQL schema. -impl Encode<'_, Postgres> for BatchSwapSummary { - fn encode_by_ref( - &self, - buf: &mut PgArgumentBuffer, - ) -> Result> { - // Convert asset_start and asset_end to bytes - let asset_start_bytes = self.asset_start.to_bytes(); // or however you get bytes from asset::Id - let asset_end_bytes = self.asset_end.to_bytes(); - - // Convert Amount to Decimal/Numeric - let input_decimal = BigDecimal::from(self.input.value()); - let output_decimal = BigDecimal::from(self.output.value()); - - // Encode each field - asset_start_bytes.encode_by_ref(buf); - asset_end_bytes.encode_by_ref(buf); - input_decimal.encode_by_ref(buf); - output_decimal.encode_by_ref(buf); - - // Encode the primitive types directly - >::encode(self.num_swaps, buf); - >::encode(self.price_float, buf); - - Ok(IsNull::No) - } -} - -/// Make sure that we can reference arrays of the [BatchSwapSummary] types. -impl PgHasArrayType for BatchSwapSummary { - fn array_type_info() -> PgTypeInfo { - PgTypeInfo::with_name("_batch_swap_summary") // Note the underscore prefix for array type - } -} - #[derive(Debug)] struct Events { time: Option, @@ -1267,7 +1223,7 @@ impl Component { ) .bind(height) .bind(time) - .bind(&batch_swap_summaries) + .bind(serde_json::to_value(&batch_swap_summaries)?) .bind(num_opened_lps) .bind(num_closed_lps) .bind(num_withdrawn_lps) From 9ac0e3122184cdfa39bceda62cb3aca35379fd16 Mon Sep 17 00:00:00 2001 From: Erwan Or Date: Tue, 11 Feb 2025 17:36:15 -0500 Subject: [PATCH 14/17] pindexer: use a composite type, derive `Encode` --- crates/bin/pindexer/src/dex_ex/mod.rs | 57 ++++++++------------------- 1 file changed, 16 insertions(+), 41 deletions(-) diff --git a/crates/bin/pindexer/src/dex_ex/mod.rs b/crates/bin/pindexer/src/dex_ex/mod.rs index 976f56ea68..415da0cc31 100644 --- a/crates/bin/pindexer/src/dex_ex/mod.rs +++ b/crates/bin/pindexer/src/dex_ex/mod.rs @@ -21,9 +21,8 @@ use penumbra_sdk_num::Amount; use penumbra_sdk_proto::event::EventDomainType; use penumbra_sdk_proto::DomainType; use penumbra_sdk_sct::event::EventBlockRoot; -use serde::{Deserialize, Serialize}; -use sqlx::types::BigDecimal; -use sqlx::Row; +use sqlx::{prelude::Type, Row}; +use sqlx::{types::BigDecimal}; use std::collections::{BTreeMap, HashMap, HashSet}; type DateTime = sqlx::types::chrono::DateTime; @@ -672,40 +671,16 @@ struct PairMetrics { liquidity_change: f64, } -#[derive(Debug, Clone, Serialize, Deserialize)] +#[derive(Debug, Clone, Type)] struct BatchSwapSummary { - #[serde(serialize_with = "serialize_asset_id")] - asset_start: penumbra_sdk_asset::asset::Id, - #[serde(serialize_with = "serialize_asset_id")] - asset_end: penumbra_sdk_asset::asset::Id, - #[serde(serialize_with = "serialize_amount")] - input: Amount, - #[serde(serialize_with = "serialize_amount")] - output: Amount, + asset_start: Vec, + asset_end: Vec, + input: BigDecimal, + output: BigDecimal, num_swaps: i32, price_float: f64, } -// Add these serialization helper functions -fn serialize_asset_id( - asset_id: &penumbra_sdk_asset::asset::Id, - serializer: S, -) -> Result -where - S: serde::Serializer, -{ - // Serialize the asset ID as bytes - serializer.serialize_bytes(asset_id.to_bytes().as_ref()) -} - -fn serialize_amount(amount: &Amount, serializer: S) -> Result -where - S: serde::Serializer, -{ - // Serialize the amount as a string to preserve precision - serializer.serialize_str(&amount.value().to_string()) -} - #[derive(Debug)] struct Events { time: Option, @@ -1173,10 +1148,10 @@ impl Component { let num_swaps = filtered_swaps.len() as i32; batch_swap_summaries.push(BatchSwapSummary { - asset_start, - asset_end, - input, - output, + asset_start: asset_start.to_bytes().to_vec(), + asset_end: asset_end.to_bytes().to_vec(), + input: BigDecimal::from(input.value()), + output: BigDecimal::from(output.value()), num_swaps, price_float, }); @@ -1198,10 +1173,10 @@ impl Component { let num_swaps = filtered_swaps.len() as i32; batch_swap_summaries.push(BatchSwapSummary { - asset_start, - asset_end, - input, - output, + asset_start: asset_start.to_bytes().to_vec(), + asset_end: asset_end.to_bytes().to_vec(), + input: BigDecimal::from(input.value()), + output: BigDecimal::from(output.value()), num_swaps, price_float, }); @@ -1223,7 +1198,7 @@ impl Component { ) .bind(height) .bind(time) - .bind(serde_json::to_value(&batch_swap_summaries)?) + .bind(&batch_swap_summaries) .bind(num_opened_lps) .bind(num_closed_lps) .bind(num_withdrawn_lps) From b0193d4d5fb0254b396b89340addb5482f45ce8e Mon Sep 17 00:00:00 2001 From: Erwan Or Date: Tue, 11 Feb 2025 22:17:15 -0500 Subject: [PATCH 15/17] penumbra: run `fmt` --- crates/bin/pindexer/src/dex_ex/mod.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/crates/bin/pindexer/src/dex_ex/mod.rs b/crates/bin/pindexer/src/dex_ex/mod.rs index 415da0cc31..571b236ef3 100644 --- a/crates/bin/pindexer/src/dex_ex/mod.rs +++ b/crates/bin/pindexer/src/dex_ex/mod.rs @@ -21,8 +21,8 @@ use penumbra_sdk_num::Amount; use penumbra_sdk_proto::event::EventDomainType; use penumbra_sdk_proto::DomainType; use penumbra_sdk_sct::event::EventBlockRoot; +use sqlx::types::BigDecimal; use sqlx::{prelude::Type, Row}; -use sqlx::{types::BigDecimal}; use std::collections::{BTreeMap, HashMap, HashSet}; type DateTime = sqlx::types::chrono::DateTime; @@ -1128,7 +1128,6 @@ impl Component { let mut batch_swap_summaries = Vec::::new(); - // Deal with the batch summaries: for event in &events.batch_swaps { let trading_pair = event.batch_swap_output_data.trading_pair; From a6af04dfcc1150fb0a8adc655eaa5970a6b55b9c Mon Sep 17 00:00:00 2001 From: Erwan Or Date: Tue, 11 Feb 2025 22:38:44 -0500 Subject: [PATCH 16/17] pindexer: `batch_swap_summary` --- crates/bin/pindexer/src/dex_ex/mod.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/crates/bin/pindexer/src/dex_ex/mod.rs b/crates/bin/pindexer/src/dex_ex/mod.rs index 571b236ef3..d12bef946c 100644 --- a/crates/bin/pindexer/src/dex_ex/mod.rs +++ b/crates/bin/pindexer/src/dex_ex/mod.rs @@ -672,6 +672,7 @@ struct PairMetrics { } #[derive(Debug, Clone, Type)] +#[sqlx(type_name = "batch_swap_summary")] struct BatchSwapSummary { asset_start: Vec, asset_end: Vec, From 071d53f3a313adc0e1a4fb18916cd2744a92991b Mon Sep 17 00:00:00 2001 From: Erwan Or Date: Tue, 11 Feb 2025 23:34:17 -0500 Subject: [PATCH 17/17] pindexer(dex_ex): remove constraint on height for now --- crates/bin/pindexer/src/dex_ex/schema.sql | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/bin/pindexer/src/dex_ex/schema.sql b/crates/bin/pindexer/src/dex_ex/schema.sql index e6f7594dfc..8a5615a912 100644 --- a/crates/bin/pindexer/src/dex_ex/schema.sql +++ b/crates/bin/pindexer/src/dex_ex/schema.sql @@ -230,7 +230,7 @@ CREATE TABLE IF NOT EXISTS dex_ex_block_summary ( -- Primary key rowid SERIAL PRIMARY KEY, -- The height of the block. - height INTEGER UNIQUE NOT NULL, + height INTEGER NOT NULL, -- The timestamp for the block. time TIMESTAMPTZ NOT NULL, -- A list of batch swap summaries that occurred in this block.