Skip to content

Commit

Permalink
fix(brc20): verify ordinal transfers in chunks instead of individually (
Browse files Browse the repository at this point in the history
#394)

* chore: group transfers

* fix: finish integration

* fix: chunk query

* chunk size

* test: indexing

* fix: comments
  • Loading branch information
rafaelcr authored Feb 5, 2025
1 parent 51d3a76 commit fe842e2
Show file tree
Hide file tree
Showing 7 changed files with 728 additions and 366 deletions.
5 changes: 5 additions & 0 deletions components/chainhook-postgres/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,11 @@ pub use tokio_postgres;

use tokio_postgres::{Client, Config, NoTls, Row};

/// Standard chunk size to use when we're batching multiple query inserts into a single SQL statement to save on DB round trips.
/// This number is designed to not hit the postgres limit of 65535 query parameters in a single SQL statement (the wire
/// protocol's Bind message counts parameters with a 16-bit integer), but results may
/// vary depending on column counts. Queries should use other custom chunk sizes as needed.
pub const BATCH_QUERY_CHUNK_SIZE: usize = 500;

/// A Postgres configuration for a single database.
#[derive(Clone, Debug)]
pub struct PgConnectionConfig {
Expand Down
77 changes: 54 additions & 23 deletions components/ordhook-core/src/core/meta_protocols/brc20/brc20_pg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use chainhook_postgres::{
deadpool_postgres::GenericClient,
tokio_postgres::{types::ToSql, Client},
types::{PgNumericU128, PgNumericU64},
utils, FromPgRow,
utils, FromPgRow, BATCH_QUERY_CHUNK_SIZE,
};
use chainhook_sdk::types::{
BitcoinBlockData, Brc20BalanceData, Brc20Operation, Brc20TokenDeployData, Brc20TransferData,
Expand Down Expand Up @@ -79,24 +79,43 @@ pub async fn get_token_available_balance_for_address<T: GenericClient>(
Ok(Some(supply.0))
}

pub async fn get_unsent_token_transfer<T: GenericClient>(
ordinal_number: u64,
pub async fn get_unsent_token_transfers<T: GenericClient>(
ordinal_numbers: &Vec<u64>,
client: &T,
) -> Result<Option<DbOperation>, String> {
let row = client
.query_opt(
"SELECT * FROM operations
WHERE ordinal_number = $1 AND operation = 'transfer'
AND NOT EXISTS (SELECT 1 FROM operations WHERE ordinal_number = $1 AND operation = 'transfer_send')
LIMIT 1",
&[&PgNumericU64(ordinal_number)],
)
.await
.map_err(|e| format!("get_unsent_token_transfer: {e}"))?;
let Some(row) = row else {
return Ok(None);
};
Ok(Some(DbOperation::from_pg_row(&row)))
) -> Result<Vec<DbOperation>, String> {
if ordinal_numbers.is_empty() {
return Ok(vec![]);
}
let mut results = vec![];
// We can afford a larger chunk size here because we're only using one parameter per ordinal number value.
for chunk in ordinal_numbers.chunks(5000) {
let mut wrapped = Vec::with_capacity(chunk.len());
for n in chunk {
wrapped.push(PgNumericU64(*n));
}
let mut params = vec![];
for number in wrapped.iter() {
params.push(number);
}
let rows = client
.query(
"SELECT *
FROM operations o
WHERE operation = 'transfer'
AND o.ordinal_number = ANY($1)
AND NOT EXISTS (
SELECT 1 FROM operations
WHERE ordinal_number = o.ordinal_number
AND operation = 'transfer_send'
)
LIMIT 1",
&[&params],
)
.await
.map_err(|e| format!("get_unsent_token_transfers: {e}"))?;
results.extend(rows.iter().map(|row| DbOperation::from_pg_row(row)));
}
Ok(results)
}

pub async fn insert_tokens<T: GenericClient>(
Expand All @@ -106,7 +125,7 @@ pub async fn insert_tokens<T: GenericClient>(
if tokens.len() == 0 {
return Ok(());
}
for chunk in tokens.chunks(500) {
for chunk in tokens.chunks(BATCH_QUERY_CHUNK_SIZE) {
let mut params: Vec<&(dyn ToSql + Sync)> = vec![];
for row in chunk.iter() {
params.push(&row.ticker);
Expand Down Expand Up @@ -148,7 +167,7 @@ pub async fn insert_operations<T: GenericClient>(
if operations.len() == 0 {
return Ok(());
}
for chunk in operations.chunks(500) {
for chunk in operations.chunks(BATCH_QUERY_CHUNK_SIZE) {
let mut params: Vec<&(dyn ToSql + Sync)> = vec![];
for row in chunk.iter() {
params.push(&row.ticker);
Expand Down Expand Up @@ -253,7 +272,11 @@ pub async fn update_address_operation_counts<T: GenericClient>(
if counts.len() == 0 {
return Ok(());
}
for chunk in counts.keys().collect::<Vec<&String>>().chunks(500) {
for chunk in counts
.keys()
.collect::<Vec<&String>>()
.chunks(BATCH_QUERY_CHUNK_SIZE)
{
let mut params: Vec<&(dyn ToSql + Sync)> = vec![];
let mut insert_rows = 0;
for address in chunk {
Expand Down Expand Up @@ -287,7 +310,11 @@ pub async fn update_token_operation_counts<T: GenericClient>(
if counts.len() == 0 {
return Ok(());
}
for chunk in counts.keys().collect::<Vec<&String>>().chunks(500) {
for chunk in counts
.keys()
.collect::<Vec<&String>>()
.chunks(BATCH_QUERY_CHUNK_SIZE)
{
let mut converted = HashMap::new();
for tick in chunk {
converted.insert(*tick, counts.get(*tick).unwrap().to_string());
Expand Down Expand Up @@ -324,7 +351,11 @@ pub async fn update_token_minted_supplies<T: GenericClient>(
if supplies.len() == 0 {
return Ok(());
}
for chunk in supplies.keys().collect::<Vec<&String>>().chunks(500) {
for chunk in supplies
.keys()
.collect::<Vec<&String>>()
.chunks(BATCH_QUERY_CHUNK_SIZE)
{
let mut converted = HashMap::new();
for tick in chunk {
converted.insert(*tick, supplies.get(*tick).unwrap().0.to_string());
Expand Down
61 changes: 39 additions & 22 deletions components/ordhook-core/src/core/meta_protocols/brc20/cache.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
use std::{collections::HashMap, num::NonZeroUsize};
use std::{
collections::{HashMap, HashSet},
num::NonZeroUsize,
};

use chainhook_postgres::{
deadpool_postgres::GenericClient,
Expand Down Expand Up @@ -146,30 +149,44 @@ impl Brc20MemoryCache {
return Ok(None);
}

pub async fn get_unsent_token_transfer<T: GenericClient>(
pub async fn get_unsent_token_transfers<T: GenericClient>(
&mut self,
ordinal_number: u64,
ordinal_numbers: &Vec<&u64>,
client: &T,
) -> Result<Option<DbOperation>, String> {
// Use `get` instead of `contains` so we promote this value in the LRU.
if let Some(_) = self.ignored_inscriptions.get(&ordinal_number) {
return Ok(None);
}
if let Some(row) = self.unsent_transfers.get(&ordinal_number) {
return Ok(Some(row.clone()));
) -> Result<Vec<DbOperation>, String> {
let mut results = vec![];
let mut cache_missed_ordinal_numbers = HashSet::new();
for ordinal_number in ordinal_numbers.iter() {
// Use `get` instead of `contains` so we promote this value in the LRU.
if let Some(_) = self.ignored_inscriptions.get(*ordinal_number) {
continue;
}
if let Some(row) = self.unsent_transfers.get(*ordinal_number) {
results.push(row.clone());
} else {
cache_missed_ordinal_numbers.insert(**ordinal_number);
}
}
self.handle_cache_miss(client).await?;
match brc20_pg::get_unsent_token_transfer(ordinal_number, client).await? {
Some(row) => {
self.unsent_transfers.put(ordinal_number, row.clone());
return Ok(Some(row));
if !cache_missed_ordinal_numbers.is_empty() {
// Some ordinal numbers were not in cache, check DB.
self.handle_cache_miss(client).await?;
let pending_transfers = brc20_pg::get_unsent_token_transfers(
&cache_missed_ordinal_numbers.iter().cloned().collect(),
client,
)
.await?;
for unsent_transfer in pending_transfers.into_iter() {
cache_missed_ordinal_numbers.remove(&unsent_transfer.ordinal_number.0);
self.unsent_transfers
.put(unsent_transfer.ordinal_number.0, unsent_transfer.clone());
results.push(unsent_transfer);
}
None => {
// Inscription is not relevant for BRC20.
self.ignore_inscription(ordinal_number);
return Ok(None);
// Ignore all irrelevant numbers.
for irrelevant_number in cache_missed_ordinal_numbers.iter() {
self.ignore_inscription(*irrelevant_number);
}
}
return Ok(results);
}

/// Marks an ordinal number as ignored so we don't bother computing its transfers for BRC20 purposes.
Expand Down Expand Up @@ -456,12 +473,12 @@ impl Brc20MemoryCache {
return Ok(transfer.clone());
}
self.handle_cache_miss(client).await?;
let Some(transfer) = brc20_pg::get_unsent_token_transfer(ordinal_number, client).await?
else {
let transfers = brc20_pg::get_unsent_token_transfers(&vec![ordinal_number], client).await?;
let Some(transfer) = transfers.first() else {
unreachable!("Invalid transfer ordinal number {}", ordinal_number)
};
self.unsent_transfers.put(ordinal_number, transfer.clone());
return Ok(transfer);
return Ok(transfer.clone());
}

async fn handle_cache_miss<T: GenericClient>(&mut self, client: &T) -> Result<(), String> {
Expand Down
Loading

0 comments on commit fe842e2

Please sign in to comment.