diff --git a/CHANGELOG.md b/CHANGELOG.md index c1ce5c1..4111a41 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,12 @@ Notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +## Unreleased + +## Fixed + +- Block announcements via `inv` trigger a `getheaders` request instead of being ignored + ## 0.6.0 ## Changed diff --git a/src/network/mod.rs b/src/network/mod.rs index 6073642..0125617 100644 --- a/src/network/mod.rs +++ b/src/network/mod.rs @@ -448,6 +448,7 @@ pub(crate) enum PeerMessage { FilterHeaders(CFHeaders), Filter(CFilter), Block(Block), + NewBlocks(Vec), FeeFilter(FeeRate), } diff --git a/src/network/peer.rs b/src/network/peer.rs index 9cfc76c..6808e0f 100644 --- a/src/network/peer.rs +++ b/src/network/peer.rs @@ -259,6 +259,15 @@ impl Peer { .await?; Ok(()) } + ReaderMessage::NewBlocks(block_hashes) => { + self.main_thread_sender + .send(PeerThreadMessage { + nonce: self.nonce, + message: PeerMessage::NewBlocks(block_hashes), + }) + .await?; + Ok(()) + } ReaderMessage::GetData(requests) => { let mut tx_queue = self.tx_queue.lock().await; for inv in requests { diff --git a/src/network/reader.rs b/src/network/reader.rs index 0f95593..4937b8a 100644 --- a/src/network/reader.rs +++ b/src/network/reader.rs @@ -9,7 +9,7 @@ use bitcoin::{ message_network::VersionMessage, ServiceFlags, }, - Block, + Block, BlockHash, }; use bitcoin::{FeeRate, Wtxid}; use tokio::io::AsyncBufReadExt; @@ -59,7 +59,19 @@ impl Reader { if inventory.len() > MAX_INV { return Some(ReaderMessage::Disconnect); } - None + let blocks: Vec = inventory + .into_iter() + .filter_map(|inv| match inv { + Inventory::Block(hash) + | Inventory::CompactBlock(hash) + | Inventory::WitnessBlock(hash) => Some(hash), + _ => None, + }) + .collect(); + if blocks.is_empty() { + return None; + } + Some(ReaderMessage::NewBlocks(blocks)) } NetworkMessage::GetData(inventory) => Some(ReaderMessage::GetData(inventory)), NetworkMessage::NotFound(_) => None, @@ -146,6 +158,7 @@ pub(in crate::network) enum ReaderMessage { FilterHeaders(CFHeaders), Filter(CFilter), Block(Block), + NewBlocks(Vec), Reject(RejectPayload), Disconnect, Verack, @@ -171,3 +184,40 @@ impl ReaderMessage { } } } + +#[cfg(test)] +mod tests { + use super::*; + + fn test_reader() -> Reader { + let (tx, _rx) = tokio::sync::mpsc::channel(1); + Reader::new( + MessageParser::V1(tokio::io::empty(), bitcoin::Network::Regtest), + tx, + ) + } + + #[test] + fn inv_parsing_surfaces_only_block_hashes() { + let reader = test_reader(); + let block = BlockHash::from_byte_array([1; 32]); + let witness_block = BlockHash::from_byte_array([2; 32]); + let txid = bitcoin::Txid::from_byte_array([3; 32]); + // Mixed inventory: only block hashes surface, in order. + let parsed = reader.parse_message(NetworkMessage::Inv(vec![ + Inventory::Transaction(txid), + Inventory::Block(block), + Inventory::WitnessBlock(witness_block), + ])); + assert!( + matches!(parsed, Some(ReaderMessage::NewBlocks(hashes)) if hashes == vec![block, witness_block]) + ); + // Transaction-only inventory remains ignored. + let parsed = reader.parse_message(NetworkMessage::Inv(vec![Inventory::Transaction(txid)])); + assert!(parsed.is_none()); + // Oversized inventory still disconnects. + let oversized = vec![Inventory::Block(block); MAX_INV + 1]; + let parsed = reader.parse_message(NetworkMessage::Inv(oversized)); + assert!(matches!(parsed, Some(ReaderMessage::Disconnect))); + } +} diff --git a/src/node.rs b/src/node.rs index ea35773..5d0a0ea 100644 --- a/src/node.rs +++ b/src/node.rs @@ -188,6 +188,15 @@ impl Node { } None => continue, }, + PeerMessage::NewBlocks(blocks) => { + crate::debug!(format!("[{}]: inv", peer_thread.nonce)); + match self.handle_inventory_blocks(blocks) { + Some(response) => { + self.peer_map.send_message(peer_thread.nonce, response).await; + } + None => continue, + } + } PeerMessage::FeeFilter(feerate) => { self.peer_map.set_broadcast_min(peer_thread.nonce, feerate); } @@ -599,6 +608,32 @@ impl Node { None } + // A peer announced new blocks with an `inv` instead of `headers`. Bitcoin Core + // falls back to inv-of-tip, even after BIP-130 `sendheaders`, when more than + // eight blocks connect in a single announcement round or when a block queued + // for announcement was reorganized away. Probe the announcing peer with + // `getheaders` and let the response drive any state changes through the usual + // `handle_headers` path. Deliberately no `NodeState` mutation, no filter queue + // changes, no tip assumption, and no `LastBlockMonitor` reset on the inv itself. + fn handle_inventory_blocks(&mut self, blocks: Vec) -> Option { + // A header sync is already in progress. + if self.state == NodeState::Behind { + return None; + } + if blocks + .into_iter() + .all(|block| self.chain.header_chain.contains(block)) + { + return None; + } + let next_headers = GetHeadersMessage { + version: WTXID_VERSION, + locator_hashes: self.chain.header_chain.locators(), + stop_hash: BlockHash::all_zeros(), + }; + Some(MainThreadMessage::GetHeaders(next_headers)) + } + // Clear the filter hash cache and redownload the filters. fn rescan(&mut self, height_opt: Option) -> Option { match self.state { diff --git a/tests/core.rs b/tests/core.rs index 05cbea7..f794edf 100644 --- a/tests/core.rs +++ b/tests/core.rs @@ -724,3 +724,37 @@ async fn whitelist_only_sync() { requester.shutdown().unwrap(); rpc.stop().unwrap(); } + +#[tokio::test] +async fn inv_fallback_after_burst_mine() { + let (bitcoind, socket_addr) = start_bitcoind(true).unwrap(); + let rpc = &bitcoind.client; + let tempdir = tempfile::TempDir::new().unwrap().path().to_owned(); + let miner = rpc.new_address().unwrap(); + mine_blocks(rpc, &miner, 10, 2).await; + let best = best_hash(rpc); + let (node, client) = new_node( + socket_addr, + tempdir, + ChainState::Checkpoint(HashCheckpoint::from_genesis(bitcoin::Network::Regtest)), + ); + tokio::task::spawn(async move { node.run().await }); + let Client { + requester, + info_rx, + warn_rx, + event_rx: mut channel, + } = client; + tokio::task::spawn(async move { print_logs(info_rx, warn_rx).await }); + sync_assert(&best, &mut channel).await; + invalidate_block(rpc, &best).await; + let stale = best_hash(rpc); + invalidate_block(rpc, &stale).await; + mine_blocks(rpc, &miner, 9, 1).await; + let best = best_hash(rpc); + tokio::time::timeout(Duration::from_secs(120), sync_assert(&best, &mut channel)) + .await + .expect("node did not learn the new tip after a block burst"); + requester.shutdown().unwrap(); + rpc.stop().unwrap(); +}