Skip to content

Commit

Permalink
Add debug query to ProcessorError::DBStoreError (#48)
Browse files Browse the repository at this point in the history
* Add debug query to processor error

* address comments
  • Loading branch information
just-in-chang authored Sep 12, 2024
1 parent 2ec35d6 commit b55f725
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -94,9 +94,7 @@ where
.eq(excluded(processor_status::last_transaction_timestamp)),
)),
Some(" WHERE processor_status.last_success_version <= EXCLUDED.last_success_version "),
).await.map_err(|e| ProcessorError::DBStoreError {
message: format!("Failed to update processor status: {}", e),
})?;
).await?;
}
Ok(())
}
Expand Down
41 changes: 21 additions & 20 deletions rust/sdk-examples/src/utils/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
#![allow(clippy::extra_unused_lifetimes)]

use ahash::AHashMap;
use aptos_indexer_processor_sdk::utils::convert::remove_null_bytes;
use aptos_indexer_processor_sdk::utils::{convert::remove_null_bytes, errors::ProcessorError};
use diesel::{
query_builder::{AstPass, Query, QueryFragment, QueryId},
ConnectionResult, QueryResult,
Expand All @@ -20,7 +20,7 @@ use diesel_async::{
use diesel_migrations::{embed_migrations, EmbeddedMigrations, MigrationHarness};
use futures_util::{future::BoxFuture, FutureExt};
use std::sync::Arc;
use tracing::{debug, info, warn};
use tracing::{info, warn};

pub type Backend = diesel::pg::Pg;

Expand Down Expand Up @@ -129,7 +129,7 @@ pub async fn execute_in_chunks<U, T>(
build_query: fn(Vec<T>) -> (U, Option<&'static str>),
items_to_insert: &[T],
chunk_size: usize,
) -> Result<(), diesel::result::Error>
) -> Result<(), ProcessorError>
where
U: QueryFragment<Backend> + diesel::query_builder::QueryId + Send + 'static,
T: serde::Serialize + for<'de> serde::Deserialize<'de> + Clone + Send + 'static,
Expand Down Expand Up @@ -161,7 +161,7 @@ pub async fn execute_with_better_error<U>(
pool: ArcDbPool,
query: U,
mut additional_where_clause: Option<&'static str>,
) -> QueryResult<usize>
) -> Result<usize, ProcessorError>
where
U: QueryFragment<Backend> + diesel::query_builder::QueryId + Send,
{
Expand All @@ -176,19 +176,23 @@ where
where_clause: additional_where_clause,
};
let debug_string = diesel::debug_query::<Backend, _>(&final_query).to_string();
debug!("Executing query: {:?}", debug_string);
let conn = &mut pool.get().await.map_err(|e| {
warn!("Error getting connection from pool: {:?}", e);
diesel::result::Error::DatabaseError(
diesel::result::DatabaseErrorKind::UnableToSendCommand,
Box::new(e.to_string()),
)
ProcessorError::DBStoreError {
message: format!("{:#}", e),
query: Some(debug_string.clone()),
}
})?;
let res = final_query.execute(conn).await;
if let Err(ref e) = res {
warn!("Error running query: {:?}\n{:?}", e, debug_string);
}
res
final_query
.execute(conn)
.await
.inspect_err(|e| {
warn!("Error running query: {:?}\n{:?}", e, debug_string);
})
.map_err(|e| ProcessorError::DBStoreError {
message: format!("{:#}", e),
query: Some(debug_string),
})
}

/// Returns the entry for the config hashmap, or the default field count for the insert
Expand Down Expand Up @@ -223,12 +227,9 @@ where
where_clause: additional_where_clause,
};
let debug_string = diesel::debug_query::<Backend, _>(&final_query).to_string();
debug!("Executing query: {:?}", debug_string);
let res = final_query.execute(conn).await;
if let Err(ref e) = res {
final_query.execute(conn).await.inspect_err(|e| {
warn!("Error running query: {:?}\n{:?}", e, debug_string);
}
res
})
}

async fn execute_or_retry_cleaned<U, T>(
Expand All @@ -237,7 +238,7 @@ async fn execute_or_retry_cleaned<U, T>(
items: Vec<T>,
query: U,
additional_where_clause: Option<&'static str>,
) -> Result<(), diesel::result::Error>
) -> Result<(), ProcessorError>
where
U: QueryFragment<Backend> + diesel::query_builder::QueryId + Send,
T: serde::Serialize + for<'de> serde::Deserialize<'de> + Clone,
Expand Down
7 changes: 5 additions & 2 deletions rust/sdk/src/utils/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@ pub enum ProcessorError {
ProcessError { message: String },
#[error("Poll Error: {message}")]
PollError { message: String },
#[error("DB Store Error: {message}")]
DBStoreError { message: String },
#[error("DB Store Error: {message}, Query: {query:?}")]
DBStoreError {
message: String,
query: Option<String>,
},
}

0 comments on commit b55f725

Please sign in to comment.