diff --git a/Cargo.lock b/Cargo.lock index 91dd894a17..692da0cb07 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1364,10 +1364,12 @@ dependencies = [ "clap", "futures", "hex", + "prost 0.13.4", "serde_json", "sqlx", "tap", "tendermint 0.40.1", + "tendermint-proto 0.40.1", "tokio", "tracing", "tracing-subscriber 0.3.18", diff --git a/crates/bin/pindexer/src/dex_ex/mod.rs b/crates/bin/pindexer/src/dex_ex/mod.rs index 0157582936..ec933ea5c4 100644 --- a/crates/bin/pindexer/src/dex_ex/mod.rs +++ b/crates/bin/pindexer/src/dex_ex/mod.rs @@ -796,9 +796,9 @@ impl Events { pub fn extract(block: &BlockEvents) -> anyhow::Result<Self> { let mut out = Self::new(); - out.height = block.height as i32; + out.height = block.height() as i32; - for event in &block.events { + for event in block.events() { if let Ok(e) = EventCandlestickData::try_from_event(&event.event) { let candle = Candle::from_candlestick_data(&e.stick); out.with_candle(e.pair, candle); @@ -818,7 +818,7 @@ impl Events { }, false, ); - if let Some(tx_hash) = event.tx_hash { + if let Some(tx_hash) = event.tx_hash() { out.position_open_txs.insert(e.position_id, tx_hash); } // A newly opened position might be executed against in this block, @@ -838,7 +838,7 @@ impl Events { }, true, ); - if let Some(tx_hash) = event.tx_hash { + if let Some(tx_hash) = event.tx_hash() { out.position_withdrawal_txs.insert(e.position_id, tx_hash); } out.position_withdrawals.push(e); @@ -873,7 +873,7 @@ impl Events { } else if let Ok(e) = EventQueuePositionClose::try_from_event(&event.event) { // The position close event is emitted by the dex module at EOB, // so we need to track it with the tx hash of the closure tx.
- if let Some(tx_hash) = event.tx_hash { + if let Some(tx_hash) = event.tx_hash() { out.position_close_txs.insert(e.position_id, tx_hash); } } else if let Ok(e) = EventBatchSwap::try_from_event(&event.event) { @@ -1429,19 +1429,19 @@ impl AppView for Component { let mut events = Events::extract(&block)?; let time = events .time - .expect(&format!("no block root event at height {}", block.height)); + .expect(&format!("no block root event at height {}", block.height())); last_time = Some(time); // Load any missing positions before processing events events.load_positions(dbtx).await?; // This is where we are going to build the block summary for the DEX. - self.record_block_summary(dbtx, time, block.height as i32, &events) + self.record_block_summary(dbtx, time, block.height() as i32, &events) .await?; // Record batch swap execution traces. for event in &events.batch_swaps { - self.record_batch_swap_traces(dbtx, time, block.height as i32, event) + self.record_batch_swap_traces(dbtx, time, block.height() as i32, event) .await?; } diff --git a/crates/bin/pindexer/src/governance.rs b/crates/bin/pindexer/src/governance.rs index ed8d1e2777..f947fdca4f 100644 --- a/crates/bin/pindexer/src/governance.rs +++ b/crates/bin/pindexer/src/governance.rs @@ -30,7 +30,7 @@ impl GovernanceProposals { async fn index_event( &self, dbtx: &mut PgTransaction<'_>, - event: &ContextualizedEvent, + event: ContextualizedEvent<'_>, ) -> Result<(), anyhow::Error> { match event.event.kind.as_str() { EVENT_PROPOSAL_SUBMIT => { diff --git a/crates/bin/pindexer/src/ibc/mod.rs b/crates/bin/pindexer/src/ibc/mod.rs index 2f3923905c..975a7179c8 100644 --- a/crates/bin/pindexer/src/ibc/mod.rs +++ b/crates/bin/pindexer/src/ibc/mod.rs @@ -71,10 +71,10 @@ enum Event { }, } -impl TryFrom<&ContextualizedEvent> for Event { +impl TryFrom<ContextualizedEvent<'_>> for Event { type Error = anyhow::Error; - fn try_from(event: &ContextualizedEvent) -> Result<Self, Self::Error> { + fn try_from(event: ContextualizedEvent<'_>) -> Result<Self, Self::Error> { match
EventKind::try_from(event.event.kind.as_str())? { EventKind::InboundTransfer => { let pe = pb::EventInboundFungibleTokenTransfer::from_event(&event.event)?; diff --git a/crates/bin/pindexer/src/insights/mod.rs b/crates/bin/pindexer/src/insights/mod.rs index 272c7ff73b..88a07e7973 100644 --- a/crates/bin/pindexer/src/insights/mod.rs +++ b/crates/bin/pindexer/src/insights/mod.rs @@ -290,7 +290,7 @@ impl Component { async fn index_event( &self, dbtx: &mut PgTransaction<'_>, - event: &ContextualizedEvent, + event: ContextualizedEvent<'_>, ) -> Result<(), anyhow::Error> { let height = event.block_height; if let Ok(e) = EventUndelegate::try_from_event(&event.event) { diff --git a/crates/bin/pindexer/src/stake/delegation_txs.rs b/crates/bin/pindexer/src/stake/delegation_txs.rs index 2744ad305f..eddec1faa6 100644 --- a/crates/bin/pindexer/src/stake/delegation_txs.rs +++ b/crates/bin/pindexer/src/stake/delegation_txs.rs @@ -74,7 +74,7 @@ impl AppView for DelegationTxs { .bind(ik.to_string()) .bind(amount.value() as i64) .bind(event.block_height as i64) - .bind(event.tx_hash.ok_or_else(|| anyhow!("missing tx hash in event"))?) + .bind(event.tx_hash().ok_or_else(|| anyhow!("missing tx hash in event"))?) 
.execute(dbtx.as_mut()) .await?; } diff --git a/crates/bin/pindexer/src/stake/slashings.rs b/crates/bin/pindexer/src/stake/slashings.rs index 47f529f3c1..abe4d34aa4 100644 --- a/crates/bin/pindexer/src/stake/slashings.rs +++ b/crates/bin/pindexer/src/stake/slashings.rs @@ -13,7 +13,7 @@ impl Slashings { async fn index_event( &self, dbtx: &mut PgTransaction<'_>, - event: &ContextualizedEvent, + event: ContextualizedEvent<'_>, ) -> Result<(), anyhow::Error> { let pe = match pb::EventSlashingPenaltyApplied::from_event(event.as_ref()) { Ok(pe) => pe, diff --git a/crates/bin/pindexer/src/stake/undelegation_txs.rs b/crates/bin/pindexer/src/stake/undelegation_txs.rs index b045b79c57..bc6edf7dc8 100644 --- a/crates/bin/pindexer/src/stake/undelegation_txs.rs +++ b/crates/bin/pindexer/src/stake/undelegation_txs.rs @@ -74,7 +74,7 @@ impl AppView for UndelegationTxs { .bind(ik.to_string()) .bind(amount.value() as i64) .bind(event.block_height as i64) - .bind(event.tx_hash.ok_or_else(|| anyhow!("missing tx hash in event"))?) + .bind(event.tx_hash().ok_or_else(|| anyhow!("missing tx hash in event"))?) 
.execute(dbtx.as_mut()) .await?; } diff --git a/crates/bin/pindexer/src/stake/validator_set.rs b/crates/bin/pindexer/src/stake/validator_set.rs index 2d637b3ea4..fbc20d560f 100644 --- a/crates/bin/pindexer/src/stake/validator_set.rs +++ b/crates/bin/pindexer/src/stake/validator_set.rs @@ -23,7 +23,7 @@ impl ValidatorSet { async fn index_event( &self, dbtx: &mut PgTransaction<'_>, - event: &ContextualizedEvent, + event: ContextualizedEvent<'_>, ) -> Result<(), anyhow::Error> { match event.event.kind.as_str() { "penumbra.core.component.stake.v1.EventValidatorDefinitionUpload" => { diff --git a/crates/bin/pindexer/src/supply.rs b/crates/bin/pindexer/src/supply.rs index 0226ab55aa..f1356630f4 100644 --- a/crates/bin/pindexer/src/supply.rs +++ b/crates/bin/pindexer/src/supply.rs @@ -650,10 +650,10 @@ impl Event { } } -impl<'a> TryFrom<&'a ContextualizedEvent> for Event { +impl TryFrom<ContextualizedEvent<'_>> for Event { type Error = anyhow::Error; - fn try_from(event: &'a ContextualizedEvent) -> Result<Self, Self::Error> { + fn try_from(event: ContextualizedEvent<'_>) -> Result<Self, Self::Error> { match event.event.kind.as_str() { // undelegation x if x == Event::NAMES[0] => { diff --git a/crates/util/cometindex/Cargo.toml b/crates/util/cometindex/Cargo.toml index 2e3421b93b..0dd80e04bf 100644 --- a/crates/util/cometindex/Cargo.toml +++ b/crates/util/cometindex/Cargo.toml @@ -18,7 +18,9 @@ tracing = {workspace = true} tracing-subscriber = {workspace = true} sqlx = {workspace = true, features = ["postgres", "json", "runtime-tokio", "tls-rustls"] } async-trait = {workspace = true} +prost = {workspace = true} tendermint = {workspace = true} +tendermint-proto = {workspace = true} serde_json = {workspace = true} futures = {workspace = true} hex = {workspace = true} diff --git a/crates/util/cometindex/src/contextualized.rs b/crates/util/cometindex/src/contextualized.rs index 1805a3df41..c9e422d333 100644 --- a/crates/util/cometindex/src/contextualized.rs +++ b/crates/util/cometindex/src/contextualized.rs @@ -1,10 +1,10 @@ use
tendermint::abci::Event; -#[derive(Clone, Debug)] -pub struct ContextualizedEvent { - pub event: Event, +#[derive(Clone, Copy, Debug)] +pub struct ContextualizedEvent<'block> { + pub event: &'block Event, pub block_height: u64, - pub tx_hash: Option<[u8; 32]>, + pub tx: Option<([u8; 32], &'block [u8])>, /// The rowid of the event in the local database. /// /// Note that this is a purely local identifier and won't be the same across @@ -12,7 +12,17 @@ pub struct ContextualizedEvent { pub local_rowid: i64, } -impl AsRef<Event> for ContextualizedEvent { +impl<'block> ContextualizedEvent<'block> { + pub fn tx_hash(&self) -> Option<[u8; 32]> { + self.tx.map(|x| x.0) + } + + pub fn tx_data(&self) -> Option<&'block [u8]> { + self.tx.map(|x| x.1) + } +} + +impl<'tx> AsRef<Event> for ContextualizedEvent<'tx> { fn as_ref(&self) -> &Event { &self.event } diff --git a/crates/util/cometindex/src/index.rs b/crates/util/cometindex/src/index.rs index 58ac084d2c..3901816d81 100644 --- a/crates/util/cometindex/src/index.rs +++ b/crates/util/cometindex/src/index.rs @@ -1,20 +1,88 @@ -use std::sync::Arc; +use std::{collections::BTreeMap, sync::Arc}; use async_trait::async_trait; pub use sqlx::PgPool; use sqlx::{Postgres, Transaction}; +use tendermint::abci::Event; use crate::ContextualizedEvent; pub type PgTransaction<'a> = Transaction<'a, Postgres>; +#[derive(Clone, Copy, Debug)] +struct EventReference { + /// Which event in the block this is. + pub event_index: usize, + pub tx_hash: Option<[u8; 32]>, + pub local_rowid: i64, +} + /// Represents all of the events in a given block #[derive(Clone, Debug)] pub struct BlockEvents { - /// The height of this block. - pub height: u64, - /// The events contained in this block, in order. - pub events: Vec<ContextualizedEvent>, + height: u64, + event_refs: Vec<EventReference>, + events: Vec<Event>, + transactions: BTreeMap<[u8; 32], Vec<u8>>, +} + +// The builder interface for our own crate.
+impl BlockEvents { + pub(crate) fn new(height: u64) -> Self { + const EXPECTED_EVENTS: usize = 32; + + Self { + height, + event_refs: Vec::with_capacity(EXPECTED_EVENTS), + events: Vec::with_capacity(EXPECTED_EVENTS), + transactions: BTreeMap::new(), + } + } + + /// Register a transaction in this block. + pub(crate) fn push_tx(&mut self, hash: [u8; 32], data: Vec<u8>) { + self.transactions.insert(hash, data); + } + + /// Register an event in this block. + pub(crate) fn push_event(&mut self, event: Event, tx_hash: Option<[u8; 32]>, local_rowid: i64) { + let event_index = self.events.len(); + self.events.push(event); + self.event_refs.push(EventReference { + event_index, + tx_hash, + local_rowid, + }); + } +} + +impl BlockEvents { + pub fn height(&self) -> u64 { + self.height + } + + fn contextualize(&self, event_ref: EventReference) -> ContextualizedEvent<'_> { + let event = &self.events[event_ref.event_index]; + let tx = event_ref + .tx_hash + .and_then(|h| Some((h, self.transactions.get(&h)?.as_slice()))); + ContextualizedEvent { + event, + block_height: self.height, + tx, + local_rowid: event_ref.local_rowid, + } + } + + /// Iterate over the events in this block, in the order that they appear. + pub fn events(&self) -> impl Iterator<Item = ContextualizedEvent<'_>> { + self.event_refs.iter().map(|x| self.contextualize(*x)) + } + + /// Iterate over transactions (and their hashes) in the order they appear in the block.
+ pub fn transactions(&self) -> impl Iterator<Item = ([u8; 32], &[u8])> { + self.transactions.iter().map(|x| (*x.0, x.1.as_slice())) + } } #[derive(Clone, Debug)] @@ -71,8 +139,8 @@ impl EventBatch { self.by_height.iter().skip(skip) } - pub fn events(&self) -> impl Iterator<Item = &ContextualizedEvent> { - self.events_by_block().flat_map(|x| x.events.iter()) + pub fn events(&self) -> impl Iterator<Item = ContextualizedEvent<'_>> { + self.events_by_block().flat_map(|x| x.events()) } } diff --git a/crates/util/cometindex/src/indexer/indexing_state.rs b/crates/util/cometindex/src/indexer/indexing_state.rs index 07ee7b6b80..34d1424f6d 100644 --- a/crates/util/cometindex/src/indexer/indexing_state.rs +++ b/crates/util/cometindex/src/indexer/indexing_state.rs @@ -1,13 +1,11 @@ -use std::collections::HashMap; - -use futures::TryStreamExt; +use anyhow::anyhow; +use futures::{Stream, StreamExt, TryStreamExt}; +use prost::Message as _; use sqlx::{postgres::PgPoolOptions, PgPool, Postgres, Transaction}; -use tendermint::abci; +use std::collections::HashMap; +use tendermint::abci::{self, Event}; -use crate::{ - index::{BlockEvents, EventBatch}, - ContextualizedEvent, -}; +use crate::index::{BlockEvents, EventBatch}; /// Create a Database, with, for sanity, some read only settings. /// @@ -158,24 +156,60 @@ impl IndexingState { Ok(()) } - pub async fn event_batch(&self, first: Height, last: Height) -> anyhow::Result<EventBatch> { - // The amount of events we expect a block to have. - const WORKING_CAPACITY: usize = 32; + fn transactions_between( + &self, + first: Height, + last: Height, + ) -> impl Stream<Item = anyhow::Result<(Height, [u8; 32], Vec<u8>)>> + '_ { + async fn parse_row( + row: (i64, String, Vec<u8>), + ) -> anyhow::Result<(Height, [u8; 32], Vec<u8>)> { + let tx_hash: [u8; 32] = hex::decode(row.1)?
+ .try_into() + .map_err(|_| anyhow!("expected 32 byte hash"))?; + let tx_result = tendermint_proto::v0_37::abci::TxResult::decode(row.2.as_slice())?; + let transaction = tx_result.tx.to_vec(); + let height = Height::try_from(row.0)?; + Ok((height, tx_hash, transaction)) + } - let mut by_height = Vec::with_capacity((last.0 - first.0 + 1) as usize); - let mut event_stream = - sqlx::query_as::<_, (i64, String, i64, Option<String>, serde_json::Value)>( - // This query does some shenanigans to ensure good performance. - // The main trick is that we know that each event has 1 block and <= 1 transaction associated - // with it, so we can "encourage" (force) Postgres to avoid doing a hash join and - // then a sort, and instead work from the events in a linear fashion. - // Basically, this query ends up doing: - // - // for event in events >= id: - // attach attributes - // attach block - // attach transaction? - r#" + sqlx::query_as::<_, (i64, String, Vec<u8>)>( + r#" +SELECT height, tx_hash, tx_result +FROM tx_results +LEFT JOIN LATERAL ( + SELECT height FROM blocks WHERE blocks.rowid = tx_results.block_id LIMIT 1 +) ON TRUE +WHERE + block_id >= (SELECT rowid FROM blocks where height = $1) +AND + block_id <= (SELECT rowid FROM blocks where height = $2) +"#, + ) + .bind(first) + .bind(last) + .fetch(&self.src) + .map_err(|e| anyhow::Error::from(e).context("error reading from database")) + .and_then(parse_row) + } + + fn events_between( + &self, + first: Height, + last: Height, + ) -> impl Stream<Item = anyhow::Result<(Height, Event, Option<[u8; 32]>, i64)>> + '_ { + sqlx::query_as::<_, (i64, String, i64, Option<String>, serde_json::Value)>( + // This query does some shenanigans to ensure good performance. + // The main trick is that we know that each event has 1 block and <= 1 transaction associated + // with it, so we can "encourage" (force) Postgres to avoid doing a hash join and + // then a sort, and instead work from the events in a linear fashion.
+ // Basically, this query ends up doing: + // + // for event in events >= id: + // attach attributes + // attach block + // attach transaction hash? + r#" SELECT events.rowid, events.type, @@ -213,85 +247,54 @@ ON TRUE ORDER BY events.rowid ASC "#, - ) - .bind(first) - .bind(last) - .fetch(&self.src) - .map_ok(|(local_rowid, type_str, height, tx_hash, attrs)| { - tracing::debug!(?local_rowid, type_str, height, ?tx_hash); - let tx_hash: Option<[u8; 32]> = tx_hash.map(|s| { - hex::decode(s) - .expect("invalid tx_hash") - .try_into() - .expect("expected 32 bytes") - }); - let block_height = height as u64; - - let serde_json::Value::Object(attrs) = attrs else { - // saves an allocation below bc we can take ownership - panic!("expected JSON object"); - }; - - let event = abci::Event { - kind: type_str, - attributes: attrs - .into_iter() - .filter_map(|(k, v)| match v { - serde_json::Value::String(s) => Some((k, s)), - // we never hit this because of how we constructed the query - _ => None, - }) - .map(Into::into) - .collect(), - }; - - let ce = ContextualizedEvent { - event, - block_height, - tx_hash, - local_rowid, - }; + ) + .bind(first) + .bind(last) + .fetch(&self.src) + .map_ok(|(local_rowid, type_str, height, tx_hash, attrs)| { + tracing::debug!(?local_rowid, type_str, height, ?tx_hash); + let tx_hash: Option<[u8; 32]> = tx_hash.map(|s| { + hex::decode(s) + .expect("invalid tx_hash") + .try_into() + .expect("expected 32 bytes") + }); + let serde_json::Value::Object(attrs) = attrs else { + // saves an allocation below bc we can take ownership + panic!("expected JSON object"); + }; - ce - }) - .map_err(|e| anyhow::Error::from(e).context("error reading from database")); + let event = abci::Event { + kind: type_str, + attributes: attrs + .into_iter() + .filter_map(|(k, v)| match v { + serde_json::Value::String(s) => Some((k, s)), + // we never hit this because of how we constructed the query + _ => None, + }) + .map(Into::into) + .collect(), + }; + let height = 
Height::try_from(height).expect("failed to decode height"); + (height, event, tx_hash, local_rowid) + }) + .map_err(|e| anyhow::Error::from(e).context("error reading from database")) + } - let mut height = first.0; - let mut current_batch = BlockEvents { - height: first.0, - events: Vec::with_capacity(WORKING_CAPACITY), - }; - while let Some(e) = event_stream.try_next().await? { - assert!(e.block_height >= height); - if e.block_height > height { - by_height.push(current_batch); - height = e.block_height; - current_batch = BlockEvents { - height, - events: Vec::with_capacity(WORKING_CAPACITY), - }; - } - current_batch.events.push(e); + pub async fn event_batch(&self, first: Height, last: Height) -> anyhow::Result<EventBatch> { + let mut out = (first.0..=last.0) + .map(|height| BlockEvents::new(height)) + .collect::<Vec<_>>(); + let mut tx_stream = self.transactions_between(first, last).boxed(); + while let Some((height, tx_hash, tx_data)) = tx_stream.try_next().await? { + out[(height.0 - first.0) as usize].push_tx(tx_hash, tx_data); } - // Flush the current block, and create empty ones for the remaining heights. - // - // This is the correct behavior *assuming* that the caller has already checked - // that the raw events database has indexed all the blocks up to and including - // the provided last height. In that case, imagine if there were never any events - // at all. In that case, what we would need to do is to push empty blocks - // starting from `first` and up to and including `last`. - // - // Usually, there are events every block, so this code just serves to push - // the final block. - while height <= last.0 { - by_height.push(current_batch); - height += 1; - current_batch = BlockEvents { - height, - events: Vec::new(), - }; + let mut events_stream = self.events_between(first, last).boxed(); + while let Some((height, event, tx_hash, local_rowid)) = events_stream.try_next().await?
{ + out[(height.0 - first.0) as usize].push_event(event, tx_hash, local_rowid); } - Ok(EventBatch::new(by_height)) + Ok(EventBatch::new(out)) } pub async fn init(src_url: &str, dst_url: &str) -> anyhow::Result<Self> {