From 0a8002c4d4bf3afd9a0c4c0b008e6c0be85ebc45 Mon Sep 17 00:00:00 2001 From: Rafael Cardenas Date: Wed, 23 Oct 2024 11:06:03 -0600 Subject: [PATCH] fix: streaming works --- components/chainhook-cli/src/storage/mod.rs | 2 - .../src/chainhooks/stacks/mod.rs | 58 +++++++++++++------ components/chainhook-sdk/src/indexer/mod.rs | 11 ++-- .../chainhook-sdk/src/indexer/stacks/mod.rs | 12 ++-- .../chainhook-sdk/src/indexer/stacks/tests.rs | 12 +--- components/chainhook-sdk/src/observer/mod.rs | 15 ++++- components/chainhook-sdk/src/utils/mod.rs | 19 +----- components/chainhook-types-rs/src/rosetta.rs | 11 +++- components/chainhook-types-rs/src/signers.rs | 2 - .../typescript/src/schemas/stacks/payload.ts | 6 +- .../typescript/src/schemas/stacks/signers.ts | 18 +++--- 11 files changed, 87 insertions(+), 79 deletions(-) diff --git a/components/chainhook-cli/src/storage/mod.rs b/components/chainhook-cli/src/storage/mod.rs index 820fbe7dd..55fa9e8f9 100644 --- a/components/chainhook-cli/src/storage/mod.rs +++ b/components/chainhook-cli/src/storage/mod.rs @@ -1,5 +1,3 @@ -pub mod signers; - use std::collections::VecDeque; use std::path::PathBuf; diff --git a/components/chainhook-sdk/src/chainhooks/stacks/mod.rs b/components/chainhook-sdk/src/chainhooks/stacks/mod.rs index 997f5a2cb..d1cf96ab2 100644 --- a/components/chainhook-sdk/src/chainhooks/stacks/mod.rs +++ b/components/chainhook-sdk/src/chainhooks/stacks/mod.rs @@ -744,18 +744,25 @@ pub fn evaluate_stacks_chainhooks_on_chain_event<'a>( } #[cfg(feature = "stacks-signers")] StacksChainEvent::ChainUpdatedWithNonConsensusEvents(data) => { - for chainhook in active_chainhooks.iter() { - evaluated_predicates.insert(chainhook.uuid.as_str(), &data.received_at_block); - let (occurrences, mut expirations) = - evaluate_stacks_predicate_on_non_consensus_events(&data.events, chainhook, ctx); - expired_predicates.append(&mut expirations); - if occurrences.len() > 0 { - triggered_predicates.push(StacksTriggerChainhook { - chainhook, - apply: vec![], - rollback: vec![], - events: occurrences, - }); + if let Some(first_event) = data.events.first() { + for chainhook in active_chainhooks.iter() { + evaluated_predicates + .insert(chainhook.uuid.as_str(), &first_event.received_at_block); + let (occurrences, mut expirations) = + evaluate_stacks_predicate_on_non_consensus_events( + &data.events, + chainhook, + ctx, + ); + expired_predicates.append(&mut expirations); + if occurrences.len() > 0 { + triggered_predicates.push(StacksTriggerChainhook { + chainhook, + apply: vec![], + rollback: vec![], + events: occurrences, + }); + } } } } @@ -832,7 +839,7 @@ pub fn evaluate_stacks_predicate_on_block<'a>( | StacksPredicate::PrintEvent(_) | StacksPredicate::Txid(_) => unreachable!(), #[cfg(feature = "stacks-signers")] - StacksPredicate::SignerMessage(_) => unreachable!(), + StacksPredicate::SignerMessage(_) => false, } } @@ -845,8 +852,6 @@ pub fn evaluate_stacks_predicate_on_non_consensus_events<'a>( Vec<&'a StacksNonConsensusEventData>, BTreeMap<&'a str, &'a BlockIdentifier>, ) { - use crate::utils::AbstractStacksNonConsensusEvent; - let mut occurrences = vec![]; let expired_predicates = BTreeMap::new(); for event in events { @@ -854,8 +859,7 @@ pub fn evaluate_stacks_predicate_on_non_consensus_events<'a>( StacksPredicate::SignerMessage(StacksSignerMessagePredicate::AfterTimestamp( timestamp, )) => { - let StacksNonConsensusEventData::SignerMessage(chunk) = event; - if chunk.get_timestamp() >= *timestamp as i64 { + if event.received_at >= *timestamp { occurrences.push(event); } } @@ -1086,10 +1090,27 @@ pub fn evaluate_stacks_predicate_on_transaction<'a>( } StacksPredicate::BlockHeight(_) => unreachable!(), #[cfg(feature = "stacks-signers")] - StacksPredicate::SignerMessage(_) => unreachable!(), + StacksPredicate::SignerMessage(_) => false, } } +#[cfg(feature = "stacks-signers")] +fn serialize_stacks_non_consensus_event( + event: &StacksNonConsensusEventData, + _ctx: &Context, +) -> serde_json::Value { + use chainhook_types::StacksNonConsensusEventPayloadData; + + let payload = match &event.payload { + StacksNonConsensusEventPayloadData::SignerMessage(chunk) => chunk, + }; + json!({ + "payload": payload, + "received_at": event.received_at, + "received_at_block": event.received_at_block, + }) +} + fn serialize_stacks_block( block: &dyn AbstractStacksBlock, transactions: Vec<&StacksTransactionData>, @@ -1400,6 +1421,7 @@ pub fn serialize_stacks_payload_to_json<'a>( "rollback": trigger.rollback.into_iter().map(|(transactions, block)| { serialize_stacks_block(block, transactions, decode_clarity_values, include_contract_abi, ctx) }).collect::>(), + "events": trigger.events.into_iter().map(|event| serialize_stacks_non_consensus_event(event, ctx)).collect::>(), "chainhook": { "uuid": trigger.chainhook.uuid, "predicate": trigger.chainhook.predicate, diff --git a/components/chainhook-sdk/src/indexer/mod.rs b/components/chainhook-sdk/src/indexer/mod.rs index bffc030b5..9aaea364f 100644 --- a/components/chainhook-sdk/src/indexer/mod.rs +++ b/components/chainhook-sdk/src/indexer/mod.rs @@ -176,14 +176,13 @@ impl Indexer { ) -> Result, String> { use chainhook_types::{ StacksChainUpdatedWithNonConsensusEventsData, StacksNonConsensusEventData, + StacksNonConsensusEventPayloadData, }; let Some(chain_tip) = self.stacks_blocks_pool.get_canonical_fork_chain_tip() else { return Err("StackerDB chunk received with no canonical chain tip".to_string()); }; let chunks = stacks::standardize_stacks_marshalled_stackerdb_chunks( marshalled_stackerdb_chunks, - receipt_time, - chain_tip, ctx, )?; if chunks.len() > 0 { @@ -191,10 +190,12 @@ impl Indexer { StacksChainUpdatedWithNonConsensusEventsData { events: chunks .into_iter() - .map(|chunk| StacksNonConsensusEventData::SignerMessage(chunk)) + .map(|chunk| StacksNonConsensusEventData { + payload: StacksNonConsensusEventPayloadData::SignerMessage(chunk), + received_at: receipt_time, + received_at_block: chain_tip.clone(), + }) .collect(), - received_at: receipt_time, - received_at_block: chain_tip.clone(), }, ))) } else { diff --git a/components/chainhook-sdk/src/indexer/stacks/mod.rs b/components/chainhook-sdk/src/indexer/stacks/mod.rs index 8de21e9b7..1c9308dc8 100644 --- a/components/chainhook-sdk/src/indexer/stacks/mod.rs +++ b/components/chainhook-sdk/src/indexer/stacks/mod.rs @@ -648,21 +648,17 @@ pub fn standardize_stacks_microblock_trail( #[cfg(feature = "stacks-signers")] pub fn standardize_stacks_marshalled_stackerdb_chunks( marshalled_stackerdb_chunks: JsonValue, - receipt_time: u64, - chain_tip: &BlockIdentifier, _ctx: &Context, ) -> Result, String> { let mut stackerdb_chunks: NewStackerDbChunks = serde_json::from_value(marshalled_stackerdb_chunks) .map_err(|e| format!("unable to parse stackerdb chunks {e}"))?; - standardize_stacks_stackerdb_chunks(&mut stackerdb_chunks, receipt_time, chain_tip) + standardize_stacks_stackerdb_chunks(&mut stackerdb_chunks) } #[cfg(feature = "stacks-signers")] pub fn standardize_stacks_stackerdb_chunks( stackerdb_chunks: &NewStackerDbChunks, - receipt_time: u64, - chain_tip: &BlockIdentifier, ) -> Result, String> { use stacks_codec::codec::BlockResponse; use stacks_codec::codec::RejectCode; @@ -761,8 +757,6 @@ pub fn standardize_stacks_stackerdb_chunks( sig: slot.sig.clone(), pubkey: get_signer_pubkey_from_stackerdb_chunk_slot(slot, &data_bytes)?, message, - received_at: receipt_time, - received_at_block: chain_tip.clone(), }); } @@ -770,7 +764,9 @@ pub fn standardize_stacks_stackerdb_chunks( } #[cfg(feature = "stacks-signers")] -pub fn standardize_stacks_nakamoto_block(block: &stacks_codec::codec::NakamotoBlock) -> NakamotoBlockData { +pub fn standardize_stacks_nakamoto_block( + block: &stacks_codec::codec::NakamotoBlock, +) -> NakamotoBlockData { use miniscript::bitcoin::hex::Case; use miniscript::bitcoin::hex::DisplayHex; diff --git a/components/chainhook-sdk/src/indexer/stacks/tests.rs b/components/chainhook-sdk/src/indexer/stacks/tests.rs index b129121bc..b26a284be 100644 --- a/components/chainhook-sdk/src/indexer/stacks/tests.rs +++ b/components/chainhook-sdk/src/indexer/stacks/tests.rs @@ -402,7 +402,7 @@ fn into_chainhook_event_rejects_invalid_missing_event() { #[test] #[cfg(feature = "stacks-signers")] fn stackerdb_chunks_covert_into_signer_messages() { - use chainhook_types::{BlockIdentifier, BlockResponseData, StacksSignerMessage}; + use chainhook_types::{BlockResponseData, StacksSignerMessage}; use crate::indexer::tests::helpers::stacks_events::create_new_stackerdb_chunk; @@ -413,15 +413,7 @@ fn stackerdb_chunks_covert_into_signer_messages() { "01fc3c06f6e0ae5b13c9bb53763661817e55c8e7f1ecab8b4d4b65b283d2dd39f0099e3ea1e25e765f4f0e1dfb0a432309a16a2ec10940e1a14cb9e9b1cbf27edc".to_string(), "010074aff146904763a787aa14c614d0dd1fc63b537bdb2fd351cdf881f6db75f986005eb55250597b25acbf99d3dd3c2fa8189046e1b5d21309a44cbaf2b327c09b0159a01ed3f0094bfa9e5f72f5d894e12ce252081eab5396eb8bba137bddfc365b".to_string() ); - let parsed_chunk = standardize_stacks_stackerdb_chunks( - &new_chunks, - 1729013425, - &BlockIdentifier { - index: 170355, - hash: "0x519df2ad0d86a62dc078865486a1f3dbb2f1a8934da81b679561738820964fe0".to_string(), - }, - ) - .unwrap(); + let parsed_chunk = standardize_stacks_stackerdb_chunks(&new_chunks).unwrap(); assert_eq!(parsed_chunk.len(), 1); let message = &parsed_chunk[0]; diff --git a/components/chainhook-sdk/src/observer/mod.rs b/components/chainhook-sdk/src/observer/mod.rs index 22bf564f5..038642b4d 100644 --- a/components/chainhook-sdk/src/observer/mod.rs +++ b/components/chainhook-sdk/src/observer/mod.rs @@ -897,7 +897,8 @@ pub async fn start_bitcoin_event_observer( let ctx_moved = ctx.clone(); let config_moved = config.clone(); let _ = hiro_system_kit::thread_named("ZMQ handler").spawn(move || { - let future = zmq::start_zeromq_runloop(&config_moved, _observer_commands_tx, &ctx_moved); + let future = + zmq::start_zeromq_runloop(&config_moved, _observer_commands_tx, &ctx_moved); hiro_system_kit::nestable_block_on(future); }); } @@ -1658,12 +1659,20 @@ pub async fn start_observer_commands_handler( report.track_expiration(uuid, block_identifier); } for entry in predicates_triggered.iter() { - let blocks_ids = entry + let mut block_ids = entry .apply .iter() .map(|e| e.1.get_identifier()) .collect::>(); - report.track_trigger(&entry.chainhook.uuid, &blocks_ids); + let mut event_block_ids = entry + .events + .iter() + .map(|e| &e.received_at_block) + .collect::>(); + if event_block_ids.len() > 0 { + block_ids.append(&mut event_block_ids); + } + report.track_trigger(&entry.chainhook.uuid, &block_ids); } ctx.try_log(|logger| { slog::info!( diff --git a/components/chainhook-sdk/src/utils/mod.rs b/components/chainhook-sdk/src/utils/mod.rs index f2de01d3e..b74f2edbe 100644 --- a/components/chainhook-sdk/src/utils/mod.rs +++ b/components/chainhook-sdk/src/utils/mod.rs @@ -6,7 +6,7 @@ use std::{ }; use chainhook_types::{ - BitcoinBlockData, BlockHeader, BlockIdentifier, StacksBlockData, StacksMicroblockData, StacksStackerDbChunk, StacksTransactionData + BitcoinBlockData, BlockHeader, BlockIdentifier, StacksBlockData, StacksMicroblockData, StacksTransactionData }; use hiro_system_kit::slog::{self, Logger}; use reqwest::RequestBuilder; @@ -92,23 +92,6 @@ impl AbstractStacksBlock for StacksMicroblockData { } } -/// Trait for Stacks events that are not part of the blockchain consensus but are otherwise broadcasted by Stacks nodes to event -/// listeners. -pub trait AbstractStacksNonConsensusEvent { - fn get_timestamp(&self) -> i64; - fn get_received_at_block(&self) -> BlockIdentifier; -} - -impl AbstractStacksNonConsensusEvent for StacksStackerDbChunk { - fn get_timestamp(&self) -> i64 { - self.received_at as i64 - } - - fn get_received_at_block(&self) -> BlockIdentifier { - self.received_at_block.clone() - } -} - pub trait AbstractBlock { fn get_identifier(&self) -> &BlockIdentifier; fn get_parent_identifier(&self) -> &BlockIdentifier; diff --git a/components/chainhook-types-rs/src/rosetta.rs b/components/chainhook-types-rs/src/rosetta.rs index 8597f527c..8507a70c6 100644 --- a/components/chainhook-types-rs/src/rosetta.rs +++ b/components/chainhook-types-rs/src/rosetta.rs @@ -666,10 +666,17 @@ pub struct BlockchainUpdatedWithReorg { #[derive(Clone, Debug, PartialEq, Serialize)] #[serde(tag = "type", content = "data")] -pub enum StacksNonConsensusEventData { +pub enum StacksNonConsensusEventPayloadData { SignerMessage(StacksStackerDbChunk), } +#[derive(Clone, Debug, PartialEq, Serialize)] +pub struct StacksNonConsensusEventData { + pub payload: StacksNonConsensusEventPayloadData, + pub received_at: u64, + pub received_at_block: BlockIdentifier, +} + #[derive(Debug, Clone, PartialEq, Serialize)] pub struct BlockHeader { pub block_identifier: BlockIdentifier, @@ -699,8 +706,6 @@ pub struct BitcoinChainUpdatedWithReorgData { #[derive(Debug, Clone, PartialEq, Serialize)] pub struct StacksChainUpdatedWithNonConsensusEventsData { pub events: Vec, - pub received_at: u64, - pub received_at_block: BlockIdentifier, } #[allow(dead_code)] diff --git a/components/chainhook-types-rs/src/signers.rs b/components/chainhook-types-rs/src/signers.rs index 7fb2efb4a..3fcd15cb3 100644 --- a/components/chainhook-types-rs/src/signers.rs +++ b/components/chainhook-types-rs/src/signers.rs @@ -93,6 +93,4 @@ pub struct StacksStackerDbChunk { pub sig: String, pub pubkey: String, pub message: StacksSignerMessage, - pub received_at: u64, - pub received_at_block: BlockIdentifier, } diff --git a/components/client/typescript/src/schemas/stacks/payload.ts b/components/client/typescript/src/schemas/stacks/payload.ts index ef5969eb3..5d69d59a5 100644 --- a/components/client/typescript/src/schemas/stacks/payload.ts +++ b/components/client/typescript/src/schemas/stacks/payload.ts @@ -102,7 +102,11 @@ export const StacksEventSchema = Type.Object({ }); export type StacksEvent = Static; -export const StacksNonConsensusEventSchema = Type.Union([StacksSignerMessageEventSchema]); +export const StacksNonConsensusEventSchema = Type.Object({ + payload: Type.Union([StacksSignerMessageEventSchema]), + received_at: Type.Integer(), + received_at_block: BlockIdentifierSchema, +}); export type StacksNonConsensusEvent = Static; export const StacksPayloadSchema = Type.Object({ diff --git a/components/client/typescript/src/schemas/stacks/signers.ts b/components/client/typescript/src/schemas/stacks/signers.ts index 701486901..2982f7044 100644 --- a/components/client/typescript/src/schemas/stacks/signers.ts +++ b/components/client/typescript/src/schemas/stacks/signers.ts @@ -1,6 +1,4 @@ import { Static, Type } from '@fastify/type-provider-typebox'; -import { BlockIdentifierSchema } from '../common'; -import { StacksTransactionSchema } from './payload'; export const StacksNakamotoBlockHeaderSchema = Type.Object({ version: Type.Integer(), @@ -19,7 +17,8 @@ export type StacksNakamotoBlockHeader = Static; @@ -94,11 +93,12 @@ export const StacksSignerMessageSchema = Type.Union([ export type StacksSignerMessage = Static; export const StacksSignerMessageEventSchema = Type.Object({ - contract: Type.String(), - sig: Type.String(), - pubkey: Type.String(), - message: StacksSignerMessageSchema, - received_at: Type.Integer(), - received_at_block: BlockIdentifierSchema, + type: Type.Literal('signer_message'), + data: Type.Object({ + contract: Type.String(), + sig: Type.String(), + pubkey: Type.String(), + message: StacksSignerMessageSchema, + }), }); export type StacksSignerMessageEvent = Static;