Skip to content

Commit

Permalink
fix: improve error handling, and more! (#524)
Browse files Browse the repository at this point in the history
This PR introduces a few fixes in an effort to improve reliability and
debugging problems when running Chainhook as a service:
- Revisits log levels throughout the tool (fixes #498, fixes #521). The
general approach for the logs were:
- `crit` - fatal errors that will crash mission critical component of
Chainhook. In these cases, Chainhook should automatically kill all main
threads (not individual scanning threads, which is tracked by #404) to
crash the service.
- `erro` - something went wrong the could lead to a critical error, or
that could impact all users
- `warn` - something went wrong that could impact an end user (usually
due to user error)
- `info` - control flow logging and updates on the state of _all_
registered predicates
   - `debug` - updates on the state of _a_ predicate
- Crash the service if a mission critical thread fails (see
#517 (comment)
for a list of these threads). Previously, if one of these threads
failed, the remaining services would keep running. For example, if the
event observer handler crashed, the event observer API would keep
running. This means that the stacks node is successfully emitting blocks
that Chainhook is acknowledging but not ingesting. This causes gaps in
our database Fixes #517
- Removes an infinite loop with bitcoin ingestion, crashing the service
instead: Fixes #506
- Fixes how we delete predicates from our db when one is deregistered.
This should reduce the number of logs we have on startup. Fixes #510
 - Warns on all reorgs. Fixes #519
  • Loading branch information
MicaiahReid authored Mar 27, 2024
1 parent a2865b7 commit d6b8816
Show file tree
Hide file tree
Showing 19 changed files with 461 additions and 246 deletions.
44 changes: 23 additions & 21 deletions components/chainhook-cli/src/archive/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ pub async fn download_tsv_file(config: &Config) -> Result<(), String> {
println!("{}", e.to_string());
});

let remote_sha_url = config.expected_remote_stacks_tsv_sha256();
let remote_sha_url = config.expected_remote_stacks_tsv_sha256()?;
let res = reqwest::get(&remote_sha_url)
.await
.or(Err(format!("Failed to GET from '{}'", &remote_sha_url)))?
Expand All @@ -34,7 +34,7 @@ pub async fn download_tsv_file(config: &Config) -> Result<(), String> {

write_file_content_at_path(&local_sha_file_path, &res.to_vec())?;

let file_url = config.expected_remote_stacks_tsv_url();
let file_url = config.expected_remote_stacks_tsv_url()?;
let res = reqwest::get(&file_url)
.await
.or(Err(format!("Failed to GET from '{}'", &file_url)))?;
Expand All @@ -55,14 +55,17 @@ pub async fn download_tsv_file(config: &Config) -> Result<(), String> {
Ok(0) => break,
Ok(n) => {
if let Err(e) = file.write_all(&buffer[..n]) {
let err =
format!("unable to update compressed archive: {}", e.to_string());
return Err(err);
return Err(format!(
"unable to update compressed archive: {}",
e.to_string()
));
}
}
Err(e) => {
let err = format!("unable to write compressed archive: {}", e.to_string());
return Err(err);
return Err(format!(
"unable to write compressed archive: {}",
e.to_string()
));
}
}
}
Expand All @@ -83,12 +86,11 @@ pub async fn download_tsv_file(config: &Config) -> Result<(), String> {
.map_err(|e| format!("unable to download stacks archive: {}", e.to_string()))?;
}
drop(tx);

tokio::task::spawn_blocking(|| decoder_thread.join())
.await
.unwrap()
.unwrap()
.unwrap();
.map_err(|e| format!("failed to spawn thread: {e}"))?
.map_err(|e| format!("decoder thread failed when downloading tsv: {:?}", e))?
.map_err(|e| format!("failed to download tsv: {}", e))?;
}

Ok(())
Expand Down Expand Up @@ -124,11 +126,14 @@ impl Read for ChannelRead {
}
}

pub async fn download_stacks_dataset_if_required(config: &mut Config, ctx: &Context) -> bool {
pub async fn download_stacks_dataset_if_required(
config: &mut Config,
ctx: &Context,
) -> Result<bool, String> {
if config.is_initial_ingestion_required() {
// Download default tsv.
if config.rely_on_remote_stacks_tsv() && config.should_download_remote_stacks_tsv() {
let url = config.expected_remote_stacks_tsv_url();
let url = config.expected_remote_stacks_tsv_url()?;
let mut tsv_file_path = config.expected_cache_path();
tsv_file_path.push(default_tsv_file_path(&config.network.stacks_network));
let mut tsv_sha_file_path = config.expected_cache_path();
Expand All @@ -137,7 +142,7 @@ pub async fn download_stacks_dataset_if_required(config: &mut Config, ctx: &Cont
// Download archive if not already present in cache
// Load the local
let local_sha_file = read_file_content_at_path(&tsv_sha_file_path);
let sha_url = config.expected_remote_stacks_tsv_sha256();
let sha_url = config.expected_remote_stacks_tsv_sha256()?;

let remote_sha_file = match reqwest::get(&sha_url).await {
Ok(response) => response.bytes().await,
Expand All @@ -164,28 +169,25 @@ pub async fn download_stacks_dataset_if_required(config: &mut Config, ctx: &Cont
"Stacks archive file already up to date"
);
config.add_local_stacks_tsv_source(&tsv_file_path);
return false;
return Ok(false);
}

info!(ctx.expect_logger(), "Downloading {}", url);
match download_tsv_file(&config).await {
Ok(_) => {}
Err(e) => {
error!(ctx.expect_logger(), "{}", e);
std::process::exit(1);
}
Err(e) => return Err(e),
}
info!(ctx.expect_logger(), "Successfully downloaded tsv file");
config.add_local_stacks_tsv_source(&tsv_file_path);
}
true
Ok(true)
} else {
info!(
ctx.expect_logger(),
"Streaming blocks from stacks-node {}",
config.network.get_stacks_node_config().rpc_url
);
false
Ok(false)
}
}

Expand Down
10 changes: 8 additions & 2 deletions components/chainhook-cli/src/archive/tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,14 @@ async fn it_downloads_stacks_dataset_if_required() {
tracer: false,
};
let mut config_clone = config.clone();
assert!(download_stacks_dataset_if_required(&mut config, &ctx).await);
assert!(!download_stacks_dataset_if_required(&mut config_clone, &ctx).await);
assert!(download_stacks_dataset_if_required(&mut config, &ctx)
.await
.unwrap());
assert!(
!download_stacks_dataset_if_required(&mut config_clone, &ctx)
.await
.unwrap()
);

let mut tsv_file_path = config.expected_cache_path();
tsv_file_path.push(default_tsv_file_path(&config.network.stacks_network));
Expand Down
4 changes: 2 additions & 2 deletions components/chainhook-cli/src/cli/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -277,14 +277,14 @@ pub fn main() {
let opts: Opts = match Opts::try_parse() {
Ok(opts) => opts,
Err(e) => {
error!(ctx.expect_logger(), "{e}");
crit!(ctx.expect_logger(), "{e}");
process::exit(1);
}
};

match hiro_system_kit::nestable_block_on(handle_command(opts, ctx.clone())) {
Err(e) => {
error!(ctx.expect_logger(), "{e}");
crit!(ctx.expect_logger(), "{e}");
process::exit(1);
}
Ok(_) => {}
Expand Down
22 changes: 12 additions & 10 deletions components/chainhook-cli/src/config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -256,13 +256,13 @@ impl Config {
}
}

pub fn expected_local_stacks_tsv_file(&self) -> &PathBuf {
pub fn expected_local_stacks_tsv_file(&self) -> Result<&PathBuf, String> {
for source in self.event_sources.iter() {
if let EventSourceConfig::StacksTsvPath(config) = source {
return &config.file_path;
return Ok(&config.file_path);
}
}
panic!("expected local-tsv source")
Err("could not find expected local tsv source")?
}

pub fn expected_cache_path(&self) -> PathBuf {
Expand All @@ -271,21 +271,23 @@ impl Config {
destination_path
}

fn expected_remote_stacks_tsv_base_url(&self) -> &String {
fn expected_remote_stacks_tsv_base_url(&self) -> Result<&String, String> {
for source in self.event_sources.iter() {
if let EventSourceConfig::StacksTsvUrl(config) = source {
return &config.file_url;
return Ok(&config.file_url);
}
}
panic!("expected remote-tsv source")
Err("could not find expected remote tsv source")?
}

pub fn expected_remote_stacks_tsv_sha256(&self) -> String {
format!("{}.sha256", self.expected_remote_stacks_tsv_base_url())
pub fn expected_remote_stacks_tsv_sha256(&self) -> Result<String, String> {
self.expected_remote_stacks_tsv_base_url()
.map(|url| format!("{}.sha256", url))
}

pub fn expected_remote_stacks_tsv_url(&self) -> String {
format!("{}.gz", self.expected_remote_stacks_tsv_base_url())
pub fn expected_remote_stacks_tsv_url(&self) -> Result<String, String> {
self.expected_remote_stacks_tsv_base_url()
.map(|url| format!("{}.gz", url))
}

pub fn rely_on_remote_stacks_tsv(&self) -> bool {
Expand Down
28 changes: 21 additions & 7 deletions components/chainhook-cli/src/config/tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,34 +108,48 @@ fn should_download_remote_stacks_tsv_handles_both_modes() {
}

#[test]
#[should_panic(expected = "expected remote-tsv source")]
fn expected_remote_stacks_tsv_base_url_panics_if_missing() {
let url_src = EventSourceConfig::StacksTsvUrl(super::UrlConfig {
file_url: format!("test"),
});
let mut config = Config::default(true, false, false, &None).unwrap();

config.event_sources = vec![url_src.clone()];
assert_eq!(config.expected_remote_stacks_tsv_base_url(), "test");
match config.expected_remote_stacks_tsv_base_url() {
Ok(tsv_url) => assert_eq!(tsv_url, "test"),
Err(e) => {
panic!("expected tsv file: {e}")
}
}

config.event_sources = vec![];
config.expected_remote_stacks_tsv_base_url();
match config.expected_remote_stacks_tsv_base_url() {
Ok(tsv_url) => panic!("expected no tsv file, found {}", tsv_url),
Err(e) => assert_eq!(e, "could not find expected remote tsv source".to_string()),
};
}

#[test]
#[should_panic(expected = "expected local-tsv source")]
fn expected_local_stacks_tsv_base_url_panics_if_missing() {
fn expected_local_stacks_tsv_base_url_errors_if_missing() {
let path = PathBuf::from("test");
let path_src = EventSourceConfig::StacksTsvPath(PathConfig {
file_path: path.clone(),
});
let mut config = Config::default(true, false, false, &None).unwrap();

config.event_sources = vec![path_src.clone()];
assert_eq!(config.expected_local_stacks_tsv_file(), &path);
match config.expected_local_stacks_tsv_file() {
Ok(tsv_path) => assert_eq!(tsv_path, &path),
Err(e) => {
panic!("expected tsv file: {e}")
}
}

config.event_sources = vec![];
config.expected_local_stacks_tsv_file();
match config.expected_local_stacks_tsv_file() {
Ok(tsv_path) => panic!("expected no tsv file, found {}", tsv_path.to_string_lossy()),
Err(e) => assert_eq!(e, "could not find expected local tsv source".to_string()),
};
}

#[test]
Expand Down
13 changes: 9 additions & 4 deletions components/chainhook-cli/src/scan/bitcoin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ pub async fn scan_bitcoin_chainstate_via_rpc_using_predicate(
config: &Config,
ctx: &Context,
) -> Result<bool, String> {
let predicate_uuid = &predicate_spec.uuid;
let auth = Auth::UserPass(
config.network.bitcoind_rpc_username.clone(),
config.network.bitcoind_rpc_password.clone(),
Expand Down Expand Up @@ -71,9 +72,9 @@ pub async fn scan_bitcoin_chainstate_via_rpc_using_predicate(
PredicatesApi::Off => None,
};

info!(
debug!(
ctx.expect_logger(),
"Starting predicate evaluation on Bitcoin blocks",
"Starting predicate evaluation on Bitcoin blocks for predicate {predicate_uuid}",
);

let mut last_block_scanned = BlockIdentifier::default();
Expand Down Expand Up @@ -200,7 +201,7 @@ pub async fn scan_bitcoin_chainstate_via_rpc_using_predicate(

info!(
ctx.expect_logger(),
"{number_of_blocks_scanned} blocks scanned, {actions_triggered} actions triggered"
"Predicate {predicate_uuid} scan completed. {number_of_blocks_scanned} blocks scanned, {actions_triggered} actions triggered."
);

if let Some(ref mut predicates_db_conn) = predicates_db_conn {
Expand Down Expand Up @@ -269,9 +270,13 @@ pub async fn execute_predicates_action<'a>(
if trigger.chainhook.include_proof {
gather_proofs(&trigger, &mut proofs, &config, &ctx);
}
let predicate_uuid = &trigger.chainhook.uuid;
match handle_bitcoin_hook_action(trigger, &proofs) {
Err(e) => {
error!(ctx.expect_logger(), "unable to handle action {}", e);
warn!(
ctx.expect_logger(),
"unable to handle action for predicate {}: {}", predicate_uuid, e
);
}
Ok(action) => {
actions_triggered += 1;
Expand Down
Loading

0 comments on commit d6b8816

Please sign in to comment.