Skip to content

Commit

Permalink
fix: streaming works
Browse files Browse the repository at this point in the history
  • Loading branch information
rafaelcr committed Oct 23, 2024
1 parent c211f08 commit 0a8002c
Show file tree
Hide file tree
Showing 11 changed files with 87 additions and 79 deletions.
2 changes: 0 additions & 2 deletions components/chainhook-cli/src/storage/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
pub mod signers;

use std::collections::VecDeque;
use std::path::PathBuf;

Expand Down
58 changes: 40 additions & 18 deletions components/chainhook-sdk/src/chainhooks/stacks/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -744,18 +744,25 @@ pub fn evaluate_stacks_chainhooks_on_chain_event<'a>(
}
#[cfg(feature = "stacks-signers")]
StacksChainEvent::ChainUpdatedWithNonConsensusEvents(data) => {
for chainhook in active_chainhooks.iter() {
evaluated_predicates.insert(chainhook.uuid.as_str(), &data.received_at_block);
let (occurrences, mut expirations) =
evaluate_stacks_predicate_on_non_consensus_events(&data.events, chainhook, ctx);
expired_predicates.append(&mut expirations);
if occurrences.len() > 0 {
triggered_predicates.push(StacksTriggerChainhook {
chainhook,
apply: vec![],
rollback: vec![],
events: occurrences,
});
if let Some(first_event) = data.events.first() {
for chainhook in active_chainhooks.iter() {
evaluated_predicates
.insert(chainhook.uuid.as_str(), &first_event.received_at_block);
let (occurrences, mut expirations) =
evaluate_stacks_predicate_on_non_consensus_events(
&data.events,
chainhook,
ctx,
);
expired_predicates.append(&mut expirations);
if occurrences.len() > 0 {
triggered_predicates.push(StacksTriggerChainhook {
chainhook,
apply: vec![],
rollback: vec![],
events: occurrences,
});
}
}
}
}
Expand Down Expand Up @@ -832,7 +839,7 @@ pub fn evaluate_stacks_predicate_on_block<'a>(
| StacksPredicate::PrintEvent(_)
| StacksPredicate::Txid(_) => unreachable!(),
#[cfg(feature = "stacks-signers")]
StacksPredicate::SignerMessage(_) => unreachable!(),
StacksPredicate::SignerMessage(_) => false,
}
}

Expand All @@ -845,17 +852,14 @@ pub fn evaluate_stacks_predicate_on_non_consensus_events<'a>(
Vec<&'a StacksNonConsensusEventData>,
BTreeMap<&'a str, &'a BlockIdentifier>,
) {
use crate::utils::AbstractStacksNonConsensusEvent;

let mut occurrences = vec![];
let expired_predicates = BTreeMap::new();
for event in events {
match &chainhook.predicate {
StacksPredicate::SignerMessage(StacksSignerMessagePredicate::AfterTimestamp(
timestamp,
)) => {
let StacksNonConsensusEventData::SignerMessage(chunk) = event;
if chunk.get_timestamp() >= *timestamp as i64 {
if event.received_at >= *timestamp {
occurrences.push(event);
}
}
Expand Down Expand Up @@ -1086,10 +1090,27 @@ pub fn evaluate_stacks_predicate_on_transaction<'a>(
}
StacksPredicate::BlockHeight(_) => unreachable!(),
#[cfg(feature = "stacks-signers")]
StacksPredicate::SignerMessage(_) => unreachable!(),
StacksPredicate::SignerMessage(_) => false,
}
}

#[cfg(feature = "stacks-signers")]
fn serialize_stacks_non_consensus_event(
event: &StacksNonConsensusEventData,
_ctx: &Context,
) -> serde_json::Value {
use chainhook_types::StacksNonConsensusEventPayloadData;

let payload = match &event.payload {
StacksNonConsensusEventPayloadData::SignerMessage(chunk) => chunk,
};
json!({
"payload": payload,
"received_at": event.received_at,
"received_at_block": event.received_at_block,
})
}

fn serialize_stacks_block(
block: &dyn AbstractStacksBlock,
transactions: Vec<&StacksTransactionData>,
Expand Down Expand Up @@ -1400,6 +1421,7 @@ pub fn serialize_stacks_payload_to_json<'a>(
"rollback": trigger.rollback.into_iter().map(|(transactions, block)| {
serialize_stacks_block(block, transactions, decode_clarity_values, include_contract_abi, ctx)
}).collect::<Vec<_>>(),
"events": trigger.events.into_iter().map(|event| serialize_stacks_non_consensus_event(event, ctx)).collect::<Vec<_>>(),
"chainhook": {
"uuid": trigger.chainhook.uuid,
"predicate": trigger.chainhook.predicate,
Expand Down
11 changes: 6 additions & 5 deletions components/chainhook-sdk/src/indexer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -176,25 +176,26 @@ impl Indexer {
) -> Result<Option<StacksChainEvent>, String> {
use chainhook_types::{
StacksChainUpdatedWithNonConsensusEventsData, StacksNonConsensusEventData,
StacksNonConsensusEventPayloadData,
};
let Some(chain_tip) = self.stacks_blocks_pool.get_canonical_fork_chain_tip() else {
return Err("StackerDB chunk received with no canonical chain tip".to_string());
};
let chunks = stacks::standardize_stacks_marshalled_stackerdb_chunks(
marshalled_stackerdb_chunks,
receipt_time,
chain_tip,
ctx,
)?;
if chunks.len() > 0 {
Ok(Some(StacksChainEvent::ChainUpdatedWithNonConsensusEvents(
StacksChainUpdatedWithNonConsensusEventsData {
events: chunks
.into_iter()
.map(|chunk| StacksNonConsensusEventData::SignerMessage(chunk))
.map(|chunk| StacksNonConsensusEventData {
payload: StacksNonConsensusEventPayloadData::SignerMessage(chunk),
received_at: receipt_time,
received_at_block: chain_tip.clone(),
})
.collect(),
received_at: receipt_time,
received_at_block: chain_tip.clone(),
},
)))
} else {
Expand Down
12 changes: 4 additions & 8 deletions components/chainhook-sdk/src/indexer/stacks/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -648,21 +648,17 @@ pub fn standardize_stacks_microblock_trail(
#[cfg(feature = "stacks-signers")]
pub fn standardize_stacks_marshalled_stackerdb_chunks(
marshalled_stackerdb_chunks: JsonValue,
receipt_time: u64,
chain_tip: &BlockIdentifier,
_ctx: &Context,
) -> Result<Vec<StacksStackerDbChunk>, String> {
let mut stackerdb_chunks: NewStackerDbChunks =
serde_json::from_value(marshalled_stackerdb_chunks)
.map_err(|e| format!("unable to parse stackerdb chunks {e}"))?;
standardize_stacks_stackerdb_chunks(&mut stackerdb_chunks, receipt_time, chain_tip)
standardize_stacks_stackerdb_chunks(&mut stackerdb_chunks)
}

#[cfg(feature = "stacks-signers")]
pub fn standardize_stacks_stackerdb_chunks(
stackerdb_chunks: &NewStackerDbChunks,
receipt_time: u64,
chain_tip: &BlockIdentifier,
) -> Result<Vec<StacksStackerDbChunk>, String> {
use stacks_codec::codec::BlockResponse;
use stacks_codec::codec::RejectCode;
Expand Down Expand Up @@ -761,16 +757,16 @@ pub fn standardize_stacks_stackerdb_chunks(
sig: slot.sig.clone(),
pubkey: get_signer_pubkey_from_stackerdb_chunk_slot(slot, &data_bytes)?,
message,
received_at: receipt_time,
received_at_block: chain_tip.clone(),
});
}

Ok(parsed_chunks)
}

#[cfg(feature = "stacks-signers")]
pub fn standardize_stacks_nakamoto_block(block: &stacks_codec::codec::NakamotoBlock) -> NakamotoBlockData {
pub fn standardize_stacks_nakamoto_block(
block: &stacks_codec::codec::NakamotoBlock,
) -> NakamotoBlockData {
use miniscript::bitcoin::hex::Case;
use miniscript::bitcoin::hex::DisplayHex;

Expand Down
12 changes: 2 additions & 10 deletions components/chainhook-sdk/src/indexer/stacks/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -402,7 +402,7 @@ fn into_chainhook_event_rejects_invalid_missing_event() {
#[test]
#[cfg(feature = "stacks-signers")]
fn stackerdb_chunks_covert_into_signer_messages() {
use chainhook_types::{BlockIdentifier, BlockResponseData, StacksSignerMessage};
use chainhook_types::{BlockResponseData, StacksSignerMessage};

use crate::indexer::tests::helpers::stacks_events::create_new_stackerdb_chunk;

Expand All @@ -413,15 +413,7 @@ fn stackerdb_chunks_covert_into_signer_messages() {
"01fc3c06f6e0ae5b13c9bb53763661817e55c8e7f1ecab8b4d4b65b283d2dd39f0099e3ea1e25e765f4f0e1dfb0a432309a16a2ec10940e1a14cb9e9b1cbf27edc".to_string(),
"010074aff146904763a787aa14c614d0dd1fc63b537bdb2fd351cdf881f6db75f986005eb55250597b25acbf99d3dd3c2fa8189046e1b5d21309a44cbaf2b327c09b0159a01ed3f0094bfa9e5f72f5d894e12ce252081eab5396eb8bba137bddfc365b".to_string()
);
let parsed_chunk = standardize_stacks_stackerdb_chunks(
&new_chunks,
1729013425,
&BlockIdentifier {
index: 170355,
hash: "0x519df2ad0d86a62dc078865486a1f3dbb2f1a8934da81b679561738820964fe0".to_string(),
},
)
.unwrap();
let parsed_chunk = standardize_stacks_stackerdb_chunks(&new_chunks).unwrap();

assert_eq!(parsed_chunk.len(), 1);
let message = &parsed_chunk[0];
Expand Down
15 changes: 12 additions & 3 deletions components/chainhook-sdk/src/observer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -897,7 +897,8 @@ pub async fn start_bitcoin_event_observer(
let ctx_moved = ctx.clone();
let config_moved = config.clone();
let _ = hiro_system_kit::thread_named("ZMQ handler").spawn(move || {
let future = zmq::start_zeromq_runloop(&config_moved, _observer_commands_tx, &ctx_moved);
let future =
zmq::start_zeromq_runloop(&config_moved, _observer_commands_tx, &ctx_moved);
hiro_system_kit::nestable_block_on(future);
});
}
Expand Down Expand Up @@ -1658,12 +1659,20 @@ pub async fn start_observer_commands_handler(
report.track_expiration(uuid, block_identifier);
}
for entry in predicates_triggered.iter() {
let blocks_ids = entry
let mut block_ids = entry
.apply
.iter()
.map(|e| e.1.get_identifier())
.collect::<Vec<&BlockIdentifier>>();
report.track_trigger(&entry.chainhook.uuid, &blocks_ids);
let mut event_block_ids = entry
.events
.iter()
.map(|e| &e.received_at_block)
.collect::<Vec<&BlockIdentifier>>();
if event_block_ids.len() > 0 {
block_ids.append(&mut event_block_ids);
}
report.track_trigger(&entry.chainhook.uuid, &block_ids);
}
ctx.try_log(|logger| {
slog::info!(
Expand Down
19 changes: 1 addition & 18 deletions components/chainhook-sdk/src/utils/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use std::{
};

use chainhook_types::{
BitcoinBlockData, BlockHeader, BlockIdentifier, StacksBlockData, StacksMicroblockData, StacksStackerDbChunk, StacksTransactionData
BitcoinBlockData, BlockHeader, BlockIdentifier, StacksBlockData, StacksMicroblockData, StacksTransactionData
};
use hiro_system_kit::slog::{self, Logger};
use reqwest::RequestBuilder;
Expand Down Expand Up @@ -92,23 +92,6 @@ impl AbstractStacksBlock for StacksMicroblockData {
}
}

/// Trait for Stacks events that are not part of the blockchain consensus but are otherwise broadcasted by Stacks nodes to event
/// listeners.
pub trait AbstractStacksNonConsensusEvent {
fn get_timestamp(&self) -> i64;
fn get_received_at_block(&self) -> BlockIdentifier;
}

impl AbstractStacksNonConsensusEvent for StacksStackerDbChunk {
fn get_timestamp(&self) -> i64 {
self.received_at as i64
}

fn get_received_at_block(&self) -> BlockIdentifier {
self.received_at_block.clone()
}
}

pub trait AbstractBlock {
fn get_identifier(&self) -> &BlockIdentifier;
fn get_parent_identifier(&self) -> &BlockIdentifier;
Expand Down
11 changes: 8 additions & 3 deletions components/chainhook-types-rs/src/rosetta.rs
Original file line number Diff line number Diff line change
Expand Up @@ -666,10 +666,17 @@ pub struct BlockchainUpdatedWithReorg {

#[derive(Clone, Debug, PartialEq, Serialize)]
#[serde(tag = "type", content = "data")]
pub enum StacksNonConsensusEventData {
pub enum StacksNonConsensusEventPayloadData {
SignerMessage(StacksStackerDbChunk),
}

#[derive(Clone, Debug, PartialEq, Serialize)]
pub struct StacksNonConsensusEventData {
pub payload: StacksNonConsensusEventPayloadData,
pub received_at: u64,
pub received_at_block: BlockIdentifier,
}

#[derive(Debug, Clone, PartialEq, Serialize)]
pub struct BlockHeader {
pub block_identifier: BlockIdentifier,
Expand Down Expand Up @@ -699,8 +706,6 @@ pub struct BitcoinChainUpdatedWithReorgData {
#[derive(Debug, Clone, PartialEq, Serialize)]
pub struct StacksChainUpdatedWithNonConsensusEventsData {
pub events: Vec<StacksNonConsensusEventData>,
pub received_at: u64,
pub received_at_block: BlockIdentifier,
}

#[allow(dead_code)]
Expand Down
2 changes: 0 additions & 2 deletions components/chainhook-types-rs/src/signers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,4 @@ pub struct StacksStackerDbChunk {
pub sig: String,
pub pubkey: String,
pub message: StacksSignerMessage,
pub received_at: u64,
pub received_at_block: BlockIdentifier,
}
6 changes: 5 additions & 1 deletion components/client/typescript/src/schemas/stacks/payload.ts
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,11 @@ export const StacksEventSchema = Type.Object({
});
export type StacksEvent = Static<typeof StacksEventSchema>;

export const StacksNonConsensusEventSchema = Type.Union([StacksSignerMessageEventSchema]);
export const StacksNonConsensusEventSchema = Type.Object({
payload: Type.Union([StacksSignerMessageEventSchema]),
received_at: Type.Integer(),
received_at_block: BlockIdentifierSchema,
});
export type StacksNonConsensusEvent = Static<typeof StacksNonConsensusEventSchema>;

export const StacksPayloadSchema = Type.Object({
Expand Down
18 changes: 9 additions & 9 deletions components/client/typescript/src/schemas/stacks/signers.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,4 @@
import { Static, Type } from '@fastify/type-provider-typebox';
import { BlockIdentifierSchema } from '../common';
import { StacksTransactionSchema } from './payload';

export const StacksNakamotoBlockHeaderSchema = Type.Object({
version: Type.Integer(),
Expand All @@ -19,7 +17,8 @@ export type StacksNakamotoBlockHeader = Static<typeof StacksNakamotoBlockHeaderS

export const StacksNakamotoBlockSchema = Type.Object({
header: StacksNakamotoBlockHeaderSchema,
transactions: Type.Array(StacksTransactionSchema),
// TODO(rafaelcr): Add transactions
// transactions: Type.Array(StacksTransactionSchema),
});
export type StacksNakamotoBlock = Static<typeof StacksNakamotoBlockSchema>;

Expand Down Expand Up @@ -94,11 +93,12 @@ export const StacksSignerMessageSchema = Type.Union([
export type StacksSignerMessage = Static<typeof StacksSignerMessageSchema>;

export const StacksSignerMessageEventSchema = Type.Object({
contract: Type.String(),
sig: Type.String(),
pubkey: Type.String(),
message: StacksSignerMessageSchema,
received_at: Type.Integer(),
received_at_block: BlockIdentifierSchema,
type: Type.Literal('signer_message'),
data: Type.Object({
contract: Type.String(),
sig: Type.String(),
pubkey: Type.String(),
message: StacksSignerMessageSchema,
}),
});
export type StacksSignerMessageEvent = Static<typeof StacksSignerMessageEventSchema>;

0 comments on commit 0a8002c

Please sign in to comment.