Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,12 @@ Notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## Unreleased

## Fixed

- Block announcements via `inv` trigger a `getheaders` request instead of being ignored

## 0.6.0

## Changed
Expand Down
1 change: 1 addition & 0 deletions src/network/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -448,6 +448,7 @@ pub(crate) enum PeerMessage {
FilterHeaders(CFHeaders),
Filter(CFilter),
Block(Block),
NewBlocks(Vec<BlockHash>),
FeeFilter(FeeRate),
}

Expand Down
9 changes: 9 additions & 0 deletions src/network/peer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,15 @@ impl Peer {
.await?;
Ok(())
}
ReaderMessage::NewBlocks(block_hashes) => {
self.main_thread_sender
.send(PeerThreadMessage {
nonce: self.nonce,
message: PeerMessage::NewBlocks(block_hashes),
})
.await?;
Ok(())
}
ReaderMessage::GetData(requests) => {
let mut tx_queue = self.tx_queue.lock().await;
for inv in requests {
Expand Down
54 changes: 52 additions & 2 deletions src/network/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use bitcoin::{
message_network::VersionMessage,
ServiceFlags,
},
Block,
Block, BlockHash,
};
use bitcoin::{FeeRate, Wtxid};
use tokio::io::AsyncBufReadExt;
Expand Down Expand Up @@ -59,7 +59,19 @@ impl<R: AsyncBufReadExt + Send + Sync + Unpin> Reader<R> {
if inventory.len() > MAX_INV {
return Some(ReaderMessage::Disconnect);
}
None
let blocks: Vec<BlockHash> = inventory
.into_iter()
.filter_map(|inv| match inv {
Inventory::Block(hash)
| Inventory::CompactBlock(hash)
| Inventory::WitnessBlock(hash) => Some(hash),
_ => None,
})
.collect();
if blocks.is_empty() {
return None;
}
Some(ReaderMessage::NewBlocks(blocks))
}
NetworkMessage::GetData(inventory) => Some(ReaderMessage::GetData(inventory)),
NetworkMessage::NotFound(_) => None,
Expand Down Expand Up @@ -146,6 +158,7 @@ pub(in crate::network) enum ReaderMessage {
FilterHeaders(CFHeaders),
Filter(CFilter),
Block(Block),
NewBlocks(Vec<BlockHash>),
Reject(RejectPayload),
Disconnect,
Verack,
Expand All @@ -171,3 +184,40 @@ impl ReaderMessage {
}
}
}

#[cfg(test)]
mod tests {
use super::*;

fn test_reader() -> Reader<tokio::io::Empty> {
let (tx, _rx) = tokio::sync::mpsc::channel(1);
Reader::new(
MessageParser::V1(tokio::io::empty(), bitcoin::Network::Regtest),
tx,
)
}

#[test]
fn inv_parsing_surfaces_only_block_hashes() {
let reader = test_reader();
let block = BlockHash::from_byte_array([1; 32]);
let witness_block = BlockHash::from_byte_array([2; 32]);
let txid = bitcoin::Txid::from_byte_array([3; 32]);
// Mixed inventory: only block hashes surface, in order.
let parsed = reader.parse_message(NetworkMessage::Inv(vec![
Inventory::Transaction(txid),
Inventory::Block(block),
Inventory::WitnessBlock(witness_block),
]));
assert!(
matches!(parsed, Some(ReaderMessage::NewBlocks(hashes)) if hashes == vec![block, witness_block])
);
// Transaction-only inventory remains ignored.
let parsed = reader.parse_message(NetworkMessage::Inv(vec![Inventory::Transaction(txid)]));
assert!(parsed.is_none());
// Oversized inventory still disconnects.
let oversized = vec![Inventory::Block(block); MAX_INV + 1];
let parsed = reader.parse_message(NetworkMessage::Inv(oversized));
assert!(matches!(parsed, Some(ReaderMessage::Disconnect)));
}
}
35 changes: 35 additions & 0 deletions src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,15 @@ impl Node {
}
None => continue,
},
PeerMessage::NewBlocks(blocks) => {
crate::debug!(format!("[{}]: inv", peer_thread.nonce));
match self.handle_inventory_blocks(blocks) {
Some(response) => {
self.peer_map.send_message(peer_thread.nonce, response).await;
}
None => continue,
}
}
PeerMessage::FeeFilter(feerate) => {
self.peer_map.set_broadcast_min(peer_thread.nonce, feerate);
}
Expand Down Expand Up @@ -599,6 +608,32 @@ impl Node {
None
}

// A peer announced new blocks with an `inv` instead of `headers`. Bitcoin Core
// falls back to inv-of-tip, even after BIP-130 `sendheaders`, when more than
// eight blocks connect in a single announcement round or when a block queued
// for announcement was reorganized away. Probe the announcing peer with
// `getheaders` and let the response drive any state changes through the usual
// `handle_headers` path. Deliberately no `NodeState` mutation, no filter queue
// changes, no tip assumption, and no `LastBlockMonitor` reset on the inv itself.
fn handle_inventory_blocks(&mut self, blocks: Vec<BlockHash>) -> Option<MainThreadMessage> {
// A header sync is already in progress.
if self.state == NodeState::Behind {
return None;
}
if blocks
.into_iter()
.all(|block| self.chain.header_chain.contains(block))
{
return None;
}
let next_headers = GetHeadersMessage {
version: WTXID_VERSION,
locator_hashes: self.chain.header_chain.locators(),
stop_hash: BlockHash::all_zeros(),
};
Some(MainThreadMessage::GetHeaders(next_headers))
}

// Clear the filter hash cache and redownload the filters.
fn rescan(&mut self, height_opt: Option<u32>) -> Option<MainThreadMessage> {
match self.state {
Expand Down
34 changes: 34 additions & 0 deletions tests/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -724,3 +724,37 @@ async fn whitelist_only_sync() {
requester.shutdown().unwrap();
rpc.stop().unwrap();
}

#[tokio::test]
async fn inv_fallback_after_burst_mine() {
let (bitcoind, socket_addr) = start_bitcoind(true).unwrap();
let rpc = &bitcoind.client;
let tempdir = tempfile::TempDir::new().unwrap().path().to_owned();
let miner = rpc.new_address().unwrap();
mine_blocks(rpc, &miner, 10, 2).await;
let best = best_hash(rpc);
let (node, client) = new_node(
socket_addr,
tempdir,
ChainState::Checkpoint(HashCheckpoint::from_genesis(bitcoin::Network::Regtest)),
);
tokio::task::spawn(async move { node.run().await });
let Client {
requester,
info_rx,
warn_rx,
event_rx: mut channel,
} = client;
tokio::task::spawn(async move { print_logs(info_rx, warn_rx).await });
sync_assert(&best, &mut channel).await;
invalidate_block(rpc, &best).await;
let stale = best_hash(rpc);
invalidate_block(rpc, &stale).await;
mine_blocks(rpc, &miner, 9, 1).await;
let best = best_hash(rpc);
tokio::time::timeout(Duration::from_secs(120), sync_assert(&best, &mut channel))
.await
.expect("node did not learn the new tip after a block burst");
requester.shutdown().unwrap();
rpc.stop().unwrap();
}
Loading