diff --git a/Cargo.lock b/Cargo.lock index 367c0504aa..4681e50097 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5815,6 +5815,7 @@ name = "pindexer" version = "0.80.6" dependencies = [ "anyhow", + "chrono", "clap", "cometindex", "num-bigint", @@ -5827,8 +5828,10 @@ dependencies = [ "penumbra-keys", "penumbra-num", "penumbra-proto", + "penumbra-sct", "penumbra-shielded-pool", "penumbra-stake", + "prost", "serde_json", "sqlx", "tokio", diff --git a/crates/bin/pindexer/Cargo.toml b/crates/bin/pindexer/Cargo.toml index 88e062b8ca..610d43e780 100644 --- a/crates/bin/pindexer/Cargo.toml +++ b/crates/bin/pindexer/Cargo.toml @@ -13,6 +13,7 @@ publish = false [dependencies] anyhow = {workspace = true} clap = {workspace = true} +chrono = {workspace = true} cometindex = {workspace = true} num-bigint = { version = "0.4" } penumbra-shielded-pool = {workspace = true, default-features = false} @@ -26,6 +27,8 @@ penumbra-governance = {workspace = true, default-features = false} penumbra-num = {workspace = true, default-features = false} penumbra-asset = {workspace = true, default-features = false} penumbra-proto = {workspace = true, default-features = false} +penumbra-sct = {workspace = true, default-features = false} +prost = {workspace = true} tracing = {workspace = true} tokio = {workspace = true, features = ["full"]} serde_json = {workspace = true} diff --git a/crates/bin/pindexer/src/dex_ex/mod.rs b/crates/bin/pindexer/src/dex_ex/mod.rs new file mode 100644 index 0000000000..ec3882d0c9 --- /dev/null +++ b/crates/bin/pindexer/src/dex_ex/mod.rs @@ -0,0 +1,510 @@ +use std::fmt::Display; + +use anyhow::{anyhow, Context}; +use chrono::{Datelike, Days, TimeZone, Timelike as _, Utc}; +use cometindex::{async_trait, AppView, ContextualizedEvent, PgTransaction}; +use penumbra_asset::asset; +use penumbra_dex::{event::EventCandlestickData, CandlestickData}; +use penumbra_proto::{event::EventDomainType, DomainType}; +use penumbra_sct::event::EventBlockRoot; +use prost::Name as _; +use sqlx::PgPool; + +type DateTime = sqlx::types::chrono::DateTime; + +/// Candlestick data, unmoored from the prison of a particular block height. +/// +/// In other words, this can represent candlesticks which span arbitrary windows, +/// and not just a single block. +#[derive(Debug, Clone, Copy)] +struct Candle { + open: f64, + close: f64, + low: f64, + high: f64, + direct_volume: f64, + swap_volume: f64, +} + +impl Candle { + fn from_candlestick_data(data: &CandlestickData) -> Self { + Self { + open: data.open, + close: data.close, + low: data.low, + high: data.high, + direct_volume: data.direct_volume, + swap_volume: data.swap_volume, + } + } + + fn merge(&self, that: &Self) -> Self { + Self { + open: self.open, + close: that.close, + low: self.low.min(that.low), + high: self.high.max(that.high), + direct_volume: self.direct_volume + that.direct_volume, + swap_volume: self.swap_volume + that.swap_volume, + } + } +} + +impl From for Candle { + fn from(value: CandlestickData) -> Self { + Self::from(&value) + } +} + +impl From<&CandlestickData> for Candle { + fn from(value: &CandlestickData) -> Self { + Self::from_candlestick_data(value) + } +} + +#[derive(Clone, Copy, Debug)] +enum Window { + W1m, + W15m, + W1h, + W4h, + W1d, + W1w, + W1mo, +} + +impl Window { + fn all() -> impl Iterator { + [ + Window::W1m, + Window::W15m, + Window::W1h, + Window::W4h, + Window::W1d, + Window::W1w, + Window::W1mo, + ] + .into_iter() + } + + /// Get the anchor for a given time. + /// + /// This is the latest time that "snaps" to a given anchor, dependent on the window. + /// + /// For example, the 1 minute window has an anchor every minute, the day window + /// every day, etc. + fn anchor(&self, time: DateTime) -> DateTime { + let (y, mo, d, h, m) = ( + time.year(), + time.month(), + time.day(), + time.hour(), + time.minute(), + ); + let out = match self { + Window::W1m => Utc.with_ymd_and_hms(y, mo, d, h, m, 0).single(), + Window::W15m => Utc.with_ymd_and_hms(y, mo, d, h, m - (m % 15), 0).single(), + Window::W1h => Utc.with_ymd_and_hms(y, mo, d, h, 0, 0).single(), + Window::W4h => Utc.with_ymd_and_hms(y, mo, d, h - (h % 4), 0, 0).single(), + Window::W1d => Utc.with_ymd_and_hms(y, mo, d, 0, 0, 0).single(), + Window::W1w => Utc + .with_ymd_and_hms(y, mo, d, 0, 0, 0) + .single() + .and_then(|x| { + x.checked_sub_days(Days::new(time.weekday().num_days_from_monday().into())) + }), + Window::W1mo => Utc.with_ymd_and_hms(y, mo, 1, 0, 0, 0).single(), + }; + out.unwrap() + } +} + +impl Display for Window { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + use Window::*; + let str = match self { + W1m => "1m", + W15m => "15m", + W1h => "1h", + W4h => "4h", + W1d => "1d", + W1w => "1w", + W1mo => "1mo", + }; + write!(f, "{}", str) + } +} + +mod price_chart { + use super::*; + + /// A context when processing a price chart. + #[derive(Debug)] + pub struct Context<'tx, 'db> { + dbtx: &'tx mut PgTransaction<'db>, + asset_start: asset::Id, + asset_end: asset::Id, + window: Window, + } + + impl<'tx, 'db> Context<'tx, 'db> { + pub fn new( + dbtx: &'tx mut PgTransaction<'db>, + asset_start: asset::Id, + asset_end: asset::Id, + window: Window, + ) -> Self { + Self { + dbtx, + asset_start, + asset_end, + window, + } + } + + /// Get the candle we should update, based on the current timestamp. + async fn relevant_candle( + &mut self, + anchor: DateTime, + ) -> anyhow::Result> { + let stuff: Option<(i32, f64, f64, f64, f64, f64, f64)> = sqlx::query_as( + r#" + SELECT + dex_ex_candlesticks.id, + open, + close, + high, + low, + direct_volume, + swap_volume + FROM dex_ex_price_charts + JOIN dex_ex_candlesticks ON dex_ex_candlesticks.id = candlestick_id + WHERE asset_start = $1 + AND asset_end = $2 + AND the_window = $3 + AND start_time >= $4 + "#, + ) + .bind(self.asset_start.to_bytes().as_slice()) + .bind(self.asset_end.to_bytes().as_slice()) + .bind(self.window.to_string()) + .bind(anchor) + .fetch_optional(self.dbtx.as_mut()) + .await?; + Ok( + stuff.map(|(id, open, close, high, low, direct_volume, swap_volume)| { + ( + id, + Candle { + open, + close, + high, + low, + direct_volume, + swap_volume, + }, + ) + }), + ) + } + + async fn create_candle(&mut self, anchor: DateTime, candle: Candle) -> anyhow::Result<()> { + let id: i32 = sqlx::query_scalar( + r#" + INSERT INTO dex_ex_candlesticks VALUES (DEFAULT, $1, $2, $3, $4, $5, $6) RETURNING id + "#, + ) + .bind(candle.open) + .bind(candle.close) + .bind(candle.high) + .bind(candle.low) + .bind(candle.direct_volume) + .bind(candle.swap_volume) + .fetch_one(self.dbtx.as_mut()) + .await?; + sqlx::query( + r#" + INSERT INTO dex_ex_price_charts VALUES (DEFAULT, $1, $2, $3, $4, $5) + "#, + ) + .bind(self.asset_start.to_bytes().as_slice()) + .bind(self.asset_end.to_bytes().as_slice()) + .bind(self.window.to_string()) + .bind(anchor) + .bind(id) + .execute(self.dbtx.as_mut()) + .await?; + Ok(()) + } + + async fn update_candle(&mut self, id: i32, candle: Candle) -> anyhow::Result<()> { + sqlx::query( + r#" + UPDATE dex_ex_candlesticks + SET (open, close, high, low, direct_volume, swap_volume) = + ($1, $2, $3, $4, $5, $6) + WHERE id = $7 + "#, + ) + .bind(candle.open) + .bind(candle.close) + .bind(candle.high) + .bind(candle.low) + .bind(candle.direct_volume) + .bind(candle.swap_volume) + .bind(id) + .execute(self.dbtx.as_mut()) + .await?; + Ok(()) + } + + pub async fn update(&mut self, time: DateTime, candle: Candle) -> anyhow::Result<()> { + let anchor = self.window.anchor(time); + match self.relevant_candle(anchor).await? { + None => self.create_candle(anchor, candle).await?, + Some((id, old_candle)) => self.update_candle(id, old_candle.merge(&candle)).await?, + }; + Ok(()) + } + } +} + +use price_chart::Context as PriceChartContext; + +mod summary { + use super::*; + + #[derive(Debug)] + pub struct Context<'tx, 'db> { + dbtx: &'tx mut PgTransaction<'db>, + asset_start: asset::Id, + asset_end: asset::Id, + } + + impl<'tx, 'db> Context<'tx, 'db> { + pub fn new( + dbtx: &'tx mut PgTransaction<'db>, + asset_start: asset::Id, + asset_end: asset::Id, + ) -> Self { + Self { + dbtx, + asset_start, + asset_end, + } + } + + pub async fn add_candle(&mut self, time: DateTime, candle: Candle) -> anyhow::Result<()> { + let asset_start = self.asset_start.to_bytes(); + let asset_end = self.asset_end.to_bytes(); + sqlx::query( + r#" + INSERT INTO _dex_ex_summary_backing VALUES ($1, $2, $3, $4, $5, $6) + "#, + ) + .bind(asset_start.as_slice()) + .bind(asset_end.as_slice()) + .bind(time) + .bind(candle.close) + .bind(candle.direct_volume) + .bind(candle.swap_volume) + .execute(self.dbtx.as_mut()) + .await?; + Ok(()) + } + } + + pub async fn update_all(dbtx: &mut PgTransaction<'_>, time: DateTime) -> anyhow::Result<()> { + let time_24h_ago = time + .checked_sub_days(Days::new(1)) + .ok_or(anyhow!("should be able to get time 24h ago from {}", time))?; + sqlx::query( + r#" + DELETE FROM _dex_ex_summary_backing WHERE time < $1 + "#, + ) + .bind(time_24h_ago) + .execute(dbtx.as_mut()) + .await?; + // Update all of the summaries with relevant backing data. + // + // We choose this one as being responsible for creating the first summary. + sqlx::query( + r#" + INSERT INTO dex_ex_summary + SELECT DISTINCT ON (asset_start, asset_end) + asset_start, + asset_end, + FIRST_VALUE(price) OVER w AS price_24h_ago, + price AS current_price, + MAX(price) OVER w AS high_24h, + MIN(price) OVER w AS low_24h, + SUM(direct_volume) OVER w AS direct_volume_24h, + SUM(swap_volume) OVER w AS swap_volume_24h + FROM _dex_ex_summary_backing + WINDOW w AS ( + PARTITION BY + asset_start, asset_end + ORDER BY asset_start, asset_end, time DESC + ) ORDER by asset_start, asset_end, time ASC + ON CONFLICT (asset_start, asset_end) DO UPDATE SET + price_24h_ago = EXCLUDED.price_24h_ago, + current_price = EXCLUDED.current_price, + high_24h = EXCLUDED.high_24h, + low_24h = EXCLUDED.low_24h, + direct_volume_24h = EXCLUDED.direct_volume_24h, + swap_volume_24h = EXCLUDED.swap_volume_24h + "#, + ) + .execute(dbtx.as_mut()) + .await?; + // When we don't have backing data, we should nonetheless update to reflect this + sqlx::query( + r#" + UPDATE dex_ex_summary + SET + price_24h_ago = current_price, + high_24h = current_price, + low_24h = current_price, + direct_volume_24h = 0, + swap_volume_24h = 0 + WHERE NOT EXISTS ( + SELECT 1 + FROM _dex_ex_summary_backing + WHERE + _dex_ex_summary_backing.asset_start = dex_ex_summary.asset_start + AND + _dex_ex_summary_backing.asset_end = dex_ex_summary.asset_end + ) + "#, + ) + .execute(dbtx.as_mut()) + .await?; + Ok(()) + } +} + +use summary::Context as SummaryContext; + +async fn queue_event_candlestick_data( + dbtx: &mut PgTransaction<'_>, + height: u64, + event: EventCandlestickData, +) -> anyhow::Result<()> { + sqlx::query("INSERT INTO _dex_ex_queue VALUES (DEFAULT, $1, $2)") + .bind(i64::try_from(height)?) + .bind(event.encode_to_vec().as_slice()) + .execute(dbtx.as_mut()) + .await?; + Ok(()) +} + +async fn unqueue_event_candlestick_data( + dbtx: &mut PgTransaction<'_>, + height: u64, +) -> anyhow::Result> { + let values: Vec> = + sqlx::query_scalar("DELETE FROM _dex_ex_queue WHERE height = $1 RETURNING data") + .bind(i64::try_from(height)?) + .fetch_all(dbtx.as_mut()) + .await?; + values + .into_iter() + .map(|x| EventCandlestickData::decode(x.as_slice())) + .collect() +} + +async fn on_event_candlestick_data( + dbtx: &mut PgTransaction<'_>, + event_time: DateTime, + event: EventCandlestickData, +) -> anyhow::Result<()> { + let asset_start = event.pair.start; + let asset_end = event.pair.end; + let candle = event.stick.into(); + for window in Window::all() { + let mut ctx = PriceChartContext::new(dbtx, asset_start, asset_end, window); + ctx.update(event_time, candle).await?; + } + let mut ctx = SummaryContext::new(dbtx, asset_start, asset_end); + ctx.add_candle(event_time, candle).await?; + Ok(()) +} + +async fn fetch_height_time( + dbtx: &mut PgTransaction<'_>, + height: u64, +) -> anyhow::Result> { + const CTX: &'static str = r#" +The `dex_ex` component relies on the `block` component to be running, to provide the `block_details` with timestamps. +Make sure that is running as well. +"#; + sqlx::query_scalar("SELECT timestamp FROM block_details WHERE height = $1") + .bind(i64::try_from(height)?) + .fetch_optional(dbtx.as_mut()) + .await + .context(CTX) +} + +#[derive(Debug)] +pub struct Component {} + +impl Component { + pub fn new() -> Self { + Self {} + } +} + +#[async_trait] +impl AppView for Component { + async fn init_chain( + &self, + dbtx: &mut PgTransaction, + _: &serde_json::Value, + ) -> Result<(), anyhow::Error> { + for statement in include_str!("schema.sql").split(";") { + sqlx::query(statement).execute(dbtx.as_mut()).await?; + } + Ok(()) + } + + fn is_relevant(&self, type_str: &str) -> bool { + [ + ::Proto::full_name(), + ::Proto::full_name(), + ] + .into_iter() + .any(|x| type_str == x) + } + + async fn index_event( + &self, + dbtx: &mut PgTransaction, + event: &ContextualizedEvent, + _src_db: &PgPool, + ) -> Result<(), anyhow::Error> { + if let Ok(e) = EventCandlestickData::try_from_event(&event.event) { + let height = event.block_height; + match fetch_height_time(dbtx, height).await? { + None => { + queue_event_candlestick_data(dbtx, height, e).await?; + } + Some(time) => { + on_event_candlestick_data(dbtx, time, e).await?; + } + } + } else if let Ok(e) = EventBlockRoot::try_from_event(&event.event) { + let height = e.height; + let time = DateTime::from_timestamp(e.timestamp_seconds, 0).ok_or(anyhow!( + "creating timestamp should succeed; timestamp: {}", + e.timestamp_seconds + ))?; + for event in unqueue_event_candlestick_data(dbtx, height).await? { + on_event_candlestick_data(dbtx, time, event).await?; + } + summary::update_all(dbtx, time).await?; + } + tracing::debug!(?event, "unrecognized event"); + Ok(()) + } +} diff --git a/crates/bin/pindexer/src/dex_ex/schema.sql b/crates/bin/pindexer/src/dex_ex/schema.sql new file mode 100644 index 0000000000..53f85ceb8a --- /dev/null +++ b/crates/bin/pindexer/src/dex_ex/schema.sql @@ -0,0 +1,73 @@ +CREATE TABLE IF NOT EXISTS dex_ex_candlesticks ( + id SERIAL PRIMARY KEY, + -- The price at the start of a window. + open FLOAT8 NOT NULL, + -- The price at the close of a window. + close FLOAT8 NOT NULL, + -- The highest price reached during a window. + high FLOAT8 NOT NULL, + -- The lowest price reached during a window. + low FLOAT8 NOT NULL, + -- The volume traded directly through position executions. + direct_volume FLOAT8 NOT NULL, + -- The volume that traded indirectly, possibly through several positions. + swap_volume FLOAT8 NOT NULL +); + +-- Contains, for each directed asset pair and window type, candle sticks for each window. +CREATE TABLE IF NOT EXISTS dex_ex_price_charts ( + -- We just want a simple primary key to have here. + id SERIAL PRIMARY KEY, + -- The bytes for the first asset in the directed pair. + asset_start BYTEA NOT NULL, + -- The bytes for the second asset in the directed pair. + asset_end BYTEA NOT NULL, + -- The window type for this stick. + -- + -- Enum types are annoying. + the_window TEXT NOT NULL, + -- The start time of this window. + start_time TIMESTAMPTZ NOT NULL, + -- The start time for the window this stick is about. + candlestick_id INTEGER UNIQUE REFERENCES dex_ex_candlesticks (id) +); + +CREATE UNIQUE INDEX ON dex_ex_price_charts (asset_start, asset_end, the_window, start_time); + +CREATE TABLE IF NOT EXISTS _dex_ex_summary_backing ( + asset_start BYTEA NOT NULL, + asset_end BYTEA NOT NULL, + -- The time for this bit of information. + time TIMESTAMPTZ NOT NULL, + -- The price at this point. + price FLOAT8 NOT NULL, + -- The volume for this particular candle. + direct_volume FLOAT8 NOT NULL, + swap_volume FLOAT8 NOT NULL, + PRIMARY KEY (asset_start, asset_end, time) +); + +CREATE TABLE IF NOT EXISTS dex_ex_summary ( + -- The first asset of the directed pair. + asset_start BYTEA NOT NULL, + -- The second asset of the directed pair. + asset_end BYTEA NOT NULL, + -- The current price (in terms of asset2) + current_price FLOAT8 NOT NULL, + -- The price 24h ago. + price_24h_ago FLOAT8 NOT NULL, + -- The highest price over the past 24h. + high_24h FLOAT8 NOT NULL, + -- The lowest price over the past 24h. + low_24h FLOAT8 NOT NULL, + -- c.f. candlesticks for the difference between these two + direct_volume_24h FLOAT8 NOT NULL, + swap_volume_24h FLOAT8 NOT NULL, + PRIMARY KEY (asset_start, asset_end) +); + +CREATE TABLE IF NOT EXISTS _dex_ex_queue ( + id SERIAL PRIMARY KEY, + height BIGINT NOT NULL, + data BYTEA NOT NULL +); diff --git a/crates/bin/pindexer/src/indexer_ext.rs b/crates/bin/pindexer/src/indexer_ext.rs index 6a7c5e8208..950000fdd3 100644 --- a/crates/bin/pindexer/src/indexer_ext.rs +++ b/crates/bin/pindexer/src/indexer_ext.rs @@ -10,7 +10,7 @@ impl IndexerExt for cometindex::Indexer { .with_index(crate::stake::DelegationTxs {}) .with_index(crate::stake::UndelegationTxs {}) .with_index(crate::governance::GovernanceProposals {}) - .with_index(crate::dex::Component::new()) + .with_index(crate::dex_ex::Component::new()) .with_index(crate::supply::Component::new()) .with_index(crate::ibc::Component::new()) } diff --git a/crates/bin/pindexer/src/lib.rs b/crates/bin/pindexer/src/lib.rs index 353aaaf0b2..e2c2d63476 100644 --- a/crates/bin/pindexer/src/lib.rs +++ b/crates/bin/pindexer/src/lib.rs @@ -4,6 +4,7 @@ mod indexer_ext; pub use indexer_ext::IndexerExt; pub mod block; pub mod dex; +pub mod dex_ex; pub mod ibc; mod parsing; pub mod shielded_pool; diff --git a/crates/core/component/sct/src/component/tree.rs b/crates/core/component/sct/src/component/tree.rs index b0d987a109..9bbd83e263 100644 --- a/crates/core/component/sct/src/component/tree.rs +++ b/crates/core/component/sct/src/component/tree.rs @@ -1,7 +1,7 @@ use anyhow::{anyhow, Result}; use async_trait::async_trait; use cnidarium::{StateRead, StateWrite}; -use penumbra_proto::{StateReadProto, StateWriteProto}; +use penumbra_proto::{DomainType as _, StateReadProto, StateWriteProto}; use penumbra_tct as tct; use tct::builder::{block, epoch}; use tracing::instrument; @@ -85,7 +85,14 @@ pub trait SctManager: StateWrite { self.put(state_key::tree::anchor_by_height(height), sct_anchor); self.record_proto(event::anchor(height, sct_anchor, block_timestamp)); - self.record_proto(event::block_root(height, block_root, block_timestamp)); + self.record_proto( + event::EventBlockRoot { + height, + root: block_root, + timestamp_seconds: block_timestamp, + } + .to_proto(), + ); // Only record an epoch root event if we are ending the epoch. if let Some(epoch_root) = epoch_root { let index = self diff --git a/crates/core/component/sct/src/event.rs b/crates/core/component/sct/src/event.rs index 65a462b0f8..43bef0da20 100644 --- a/crates/core/component/sct/src/event.rs +++ b/crates/core/component/sct/src/event.rs @@ -1,8 +1,12 @@ +use anyhow::{anyhow, Context as _}; use pbjson_types::Timestamp; use penumbra_tct as tct; use tct::builder::{block, epoch}; -use penumbra_proto::core::component::sct::v1 as pb; +use penumbra_proto::{ + core::component::sct::v1::{self as pb}, + DomainType, Name as _, +}; use crate::CommitmentSource; @@ -17,17 +21,48 @@ pub fn anchor(height: u64, anchor: tct::Root, timestamp: i64) -> pb::EventAnchor } } -pub fn block_root(height: u64, root: block::Root, timestamp: i64) -> pb::EventBlockRoot { - pb::EventBlockRoot { - height, - root: Some(root.into()), - timestamp: Some(Timestamp { - seconds: timestamp, - nanos: 0, - }), +#[derive(Debug, Clone)] +pub struct EventBlockRoot { + pub height: u64, + pub root: block::Root, + pub timestamp_seconds: i64, +} + +impl TryFrom for EventBlockRoot { + type Error = anyhow::Error; + + fn try_from(value: pb::EventBlockRoot) -> Result { + fn inner(value: pb::EventBlockRoot) -> anyhow::Result { + Ok(EventBlockRoot { + height: value.height, + root: value.root.ok_or(anyhow!("missing `root`"))?.try_into()?, + timestamp_seconds: value + .timestamp + .ok_or(anyhow!("missing `timestamp`"))? + .seconds, + }) + } + inner(value).context(format!("parsing {}", pb::EventBlockRoot::NAME)) } } +impl From for pb::EventBlockRoot { + fn from(value: EventBlockRoot) -> Self { + Self { + height: value.height, + root: Some(value.root.into()), + timestamp: Some(Timestamp { + seconds: value.timestamp_seconds, + nanos: 0, + }), + } + } +} + +impl DomainType for EventBlockRoot { + type Proto = pb::EventBlockRoot; +} + pub fn epoch_root(index: u64, root: epoch::Root, timestamp: i64) -> pb::EventEpochRoot { pb::EventEpochRoot { index,