Skip to content
This repository has been archived by the owner on Nov 6, 2020. It is now read-only.

Commit

Permalink
Implement eth/65 (EIP-2464)
Browse files Browse the repository at this point in the history
  • Loading branch information
vorot93 committed Apr 11, 2020
1 parent 1b23af3 commit f8b22e2
Show file tree
Hide file tree
Showing 12 changed files with 222 additions and 43 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions ethcore/client-traits/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,9 @@ pub trait BlockChainClient:
/// Get transaction with given hash.
fn transaction(&self, id: TransactionId) -> Option<LocalizedTransaction>;

/// Get pool transaction with a given hash.
fn pooled_transaction(&self, hash: H256) -> Option<Arc<VerifiedTransaction>>;

/// Get uncle with given id.
fn uncle(&self, id: UncleId) -> Option<encoded::Header>;

Expand Down
4 changes: 4 additions & 0 deletions ethcore/src/client/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1889,6 +1889,10 @@ impl BlockChainClient for Client {
self.transaction_address(id).and_then(|address| self.chain.read().transaction(&address))
}

fn pooled_transaction(&self, hash: H256) -> Option<Arc<VerifiedTransaction>> {
self.importer.miner.transaction(&hash)
}

fn uncle(&self, id: UncleId) -> Option<encoded::Header> {
let index = id.position;
self.block_body(id.block).and_then(|body| body.view().uncle_rlp_at(index))
Expand Down
3 changes: 3 additions & 0 deletions ethcore/src/test_helpers/test_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -741,6 +741,9 @@ impl BlockChainClient for TestBlockChainClient {
fn transaction(&self, _id: TransactionId) -> Option<LocalizedTransaction> {
None // Simple default.
}
fn pooled_transaction(&self, _hash: H256) -> Option<Arc<VerifiedTransaction>> {
None
}

fn uncle(&self, _id: UncleId) -> Option<encoded::Header> {
None // Simple default.
Expand Down
1 change: 1 addition & 0 deletions ethcore/sync/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ rlp = "0.4.5"
snapshot = { path = "../snapshot" }
trace-time = "0.1"
triehash-ethereum = { version = "0.2", path = "../../util/triehash-ethereum" }
transaction-pool = "2"

[dev-dependencies]
env_logger = "0.5"
Expand Down
68 changes: 63 additions & 5 deletions ethcore/sync/src/chain/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,11 @@ use crate::{
sync_packet::{
PacketInfo,
SyncPacket::{
self, BlockBodiesPacket, BlockHeadersPacket, NewBlockHashesPacket, NewBlockPacket,
PrivateStatePacket, PrivateTransactionPacket, ReceiptsPacket, SignedPrivateTransactionPacket,
SnapshotDataPacket, SnapshotManifestPacket, StatusPacket,
self, *,
}
},
BlockSet, ChainSync, ForkConfirmation, PacketDecodeError, PeerAsking, PeerInfo, SyncRequester,
SyncState, ETH_PROTOCOL_VERSION_63, ETH_PROTOCOL_VERSION_64, MAX_NEW_BLOCK_AGE, MAX_NEW_HASHES,
SyncState, ETH_PROTOCOL_VERSION_63, ETH_PROTOCOL_VERSION_64, ETH_PROTOCOL_VERSION_65, MAX_NEW_BLOCK_AGE, MAX_NEW_HASHES,
PAR_PROTOCOL_VERSION_1, PAR_PROTOCOL_VERSION_3, PAR_PROTOCOL_VERSION_4,
}
};
Expand All @@ -53,6 +51,7 @@ use common_types::{
verification::Unverified,
snapshot::{ManifestData, RestorationStatus},
};
use transaction_pool::VerifiedTransaction;


/// The Chain Sync Handler: handles responses from peers
Expand All @@ -70,6 +69,8 @@ impl SyncHandler {
ReceiptsPacket => SyncHandler::on_peer_block_receipts(sync, io, peer, &rlp),
NewBlockPacket => SyncHandler::on_peer_new_block(sync, io, peer, &rlp),
NewBlockHashesPacket => SyncHandler::on_peer_new_hashes(sync, io, peer, &rlp),
NewPooledTransactionHashesPacket => SyncHandler::on_peer_new_pooled_transactions(sync, io, peer, &rlp),
PooledTransactionsPacket => SyncHandler::on_peer_pooled_transactions(sync, io, peer, &rlp),
SnapshotManifestPacket => SyncHandler::on_snapshot_manifest(sync, io, peer, &rlp),
SnapshotDataPacket => SyncHandler::on_snapshot_data(sync, io, peer, &rlp),
PrivateTransactionPacket => SyncHandler::on_private_transaction(sync, io, peer, &rlp),
Expand Down Expand Up @@ -590,9 +591,11 @@ impl SyncHandler {
difficulty,
latest_hash,
genesis,
unsent_pooled_hashes: if eth_protocol_version >= ETH_PROTOCOL_VERSION_65.0 { Some(io.chain().transactions_to_propagate().into_iter().map(|tx| *tx.hash()).collect()) } else { None },
asking: PeerAsking::Nothing,
asking_blocks: Vec::new(),
asking_hash: None,
asking_pooled_transactions: if eth_protocol_version >= ETH_PROTOCOL_VERSION_65.0 { Some(Vec::new()) } else { None },
asking_private_state: None,
ask_time: Instant::now(),
last_sent_transactions: Default::default(),
Expand Down Expand Up @@ -651,7 +654,7 @@ impl SyncHandler {

if false
|| (warp_protocol && (peer.protocol_version < PAR_PROTOCOL_VERSION_1.0 || peer.protocol_version > PAR_PROTOCOL_VERSION_4.0))
|| (!warp_protocol && (peer.protocol_version < ETH_PROTOCOL_VERSION_63.0 || peer.protocol_version > ETH_PROTOCOL_VERSION_64.0))
|| (!warp_protocol && (peer.protocol_version < ETH_PROTOCOL_VERSION_63.0 || peer.protocol_version > ETH_PROTOCOL_VERSION_65.0))
{
trace!(target: "sync", "Peer {} unsupported eth protocol ({})", peer_id, peer.protocol_version);
return Err(DownloaderImportError::Invalid);
Expand Down Expand Up @@ -698,6 +701,61 @@ impl SyncHandler {
Ok(())
}

/// Called when peer requests a set of pooled transactions
pub fn on_peer_new_pooled_transactions(sync: &mut ChainSync, io: &mut dyn SyncIo, peer_id: PeerId, tx_rlp: &Rlp) -> Result<(), DownloaderImportError> {
for item in tx_rlp {
let hash = item.as_val::<H256>().map_err(|_| DownloaderImportError::Invalid)?;

if io.chain().pooled_transaction(hash).is_none() {
let unfetched = sync.unfetched_pooled_transactions.entry(hash).or_insert_with(|| super::UnfetchedTransaction {
announcer: peer_id,
next_fetch: Instant::now(),
tries: 0,
});

// Only reset the budget if we hear from multiple sources
if unfetched.announcer != peer_id {
unfetched.next_fetch = Instant::now();
unfetched.tries = 0;
}
}
}

Ok(())
}

/// Called when peer sends us a list of pooled transactions
pub fn on_peer_pooled_transactions(sync: &ChainSync, io: &mut dyn SyncIo, peer_id: PeerId, tx_rlp: &Rlp) -> Result<(), DownloaderImportError> {
let peer = match sync.peers.get(&peer_id).filter(|p| p.can_sync()) {
Some(peer) => peer,
None => {
trace!(target: "sync", "{} Ignoring transactions from unconfirmed/unknown peer", peer_id);
return Ok(());
}
};

// TODO: actually check against asked hashes
let item_count = tx_rlp.item_count()?;
if let Some(p) = &peer.asking_pooled_transactions {
if item_count > p.len() {
trace!(target: "sync", "{} Peer sent us more transactions than was supposed to", peer_id);
return Err(DownloaderImportError::Invalid);
}
} else {
trace!(target: "sync", "{} Peer sent us pooled transactions but does not declare support for them", peer_id);
return Err(DownloaderImportError::Invalid);
}
trace!(target: "sync", "{:02} -> PooledTransactions ({} entries)", peer_id, item_count);
let mut transactions = Vec::with_capacity(item_count);
for i in 0 .. item_count {
let rlp = tx_rlp.at(i)?;
let tx = rlp.as_raw().to_vec();
transactions.push(tx);
}
io.chain().queue_transactions(transactions, peer_id);
Ok(())
}

/// Called when peer sends us signed private transaction packet
fn on_signed_private_transaction(sync: &mut ChainSync, _io: &mut dyn SyncIo, peer_id: PeerId, r: &Rlp) -> Result<(), DownloaderImportError> {
if !sync.peers.get(&peer_id).map_or(false, |p| p.can_sync()) {
Expand Down
66 changes: 63 additions & 3 deletions ethcore/sync/src/chain/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,8 @@ malloc_size_of_is_0!(PeerInfo);

pub type PacketDecodeError = DecoderError;

/// Version 65 of the Ethereum protocol and number of packet IDs reserved by the protocol (packet count).
pub const ETH_PROTOCOL_VERSION_65: (u8, u8) = (65, 0x11);
/// Version 64 of the Ethereum protocol and number of packet IDs reserved by the protocol (packet count).
pub const ETH_PROTOCOL_VERSION_64: (u8, u8) = (64, 0x11);
/// Version 63 of the Ethereum protocol and number of packet IDs reserved by the protocol (packet count).
Expand Down Expand Up @@ -200,6 +202,7 @@ const STATUS_TIMEOUT: Duration = Duration::from_secs(10);
const HEADERS_TIMEOUT: Duration = Duration::from_secs(15);
const BODIES_TIMEOUT: Duration = Duration::from_secs(20);
const RECEIPTS_TIMEOUT: Duration = Duration::from_secs(10);
const POOLED_TRANSACTIONS_TIMEOUT: Duration = Duration::from_secs(10);
const FORK_HEADER_TIMEOUT: Duration = Duration::from_secs(3);
/// Max time to wait for the Snapshot Manifest packet to arrive from a peer after it's being asked.
const SNAPSHOT_MANIFEST_TIMEOUT: Duration = Duration::from_secs(5);
Expand Down Expand Up @@ -301,6 +304,7 @@ pub enum PeerAsking {
BlockHeaders,
BlockBodies,
BlockReceipts,
PooledTransactions,
SnapshotManifest,
SnapshotData,
PrivateState,
Expand Down Expand Up @@ -335,6 +339,8 @@ pub struct PeerInfo {
network_id: u64,
/// Peer best block hash
latest_hash: H256,
/// Unpropagated tx pool hashes
unsent_pooled_hashes: Option<H256FastSet>,
/// Peer total difficulty if known
difficulty: Option<U256>,
/// Type of data currently being requested by us from a peer.
Expand All @@ -343,6 +349,8 @@ pub struct PeerInfo {
asking_blocks: Vec<H256>,
/// Holds requested header hash if currently requesting block header by hash
asking_hash: Option<H256>,
/// Holds requested transaction IDs
asking_pooled_transactions: Option<Vec<H256>>,
/// Holds requested private state hash
asking_private_state: Option<H256>,
/// Holds requested snapshot chunk hash if any.
Expand Down Expand Up @@ -641,6 +649,13 @@ enum PeerState {
SameBlock
}

#[derive(Clone, MallocSizeOf)]
struct UnfetchedTransaction {
announcer: PeerId,
next_fetch: Instant,
tries: usize,
}

/// Blockchain sync handler.
/// See module documentation for more details.
#[derive(MallocSizeOf)]
Expand Down Expand Up @@ -676,6 +691,8 @@ pub struct ChainSync {
sync_start_time: Option<Instant>,
/// Transactions propagation statistics
transactions_stats: TransactionsStats,
/// Unfetched transactions
unfetched_pooled_transactions: H256FastMap<UnfetchedTransaction>,
/// Enable ancient block downloading
download_old_blocks: bool,
/// Shared private tx service.
Expand Down Expand Up @@ -717,6 +734,7 @@ impl ChainSync {
snapshot: Snapshot::new(),
sync_start_time: None,
transactions_stats: TransactionsStats::default(),
unfetched_pooled_transactions: Default::default(),
private_tx_handler,
warp_sync: config.warp_sync,
status_sinks: Vec::new()
Expand All @@ -730,7 +748,7 @@ impl ChainSync {
let last_imported_number = self.new_blocks.last_imported_block_number();
SyncStatus {
state: self.state.clone(),
protocol_version: ETH_PROTOCOL_VERSION_64.0,
protocol_version: ETH_PROTOCOL_VERSION_65.0,
network_id: self.network_id,
start_block_number: self.starting_block,
last_imported_block_number: Some(last_imported_number),
Expand Down Expand Up @@ -764,8 +782,17 @@ impl ChainSync {

/// Updates the set of transactions recently sent to this peer to avoid spamming.
pub fn transactions_received(&mut self, txs: &[UnverifiedTransaction], peer_id: PeerId) {
if let Some(peer_info) = self.peers.get_mut(&peer_id) {
peer_info.last_sent_transactions.extend(txs.iter().map(|tx| tx.hash()));
for (id, peer) in &mut self.peers {
let hashes = txs.iter().map(|tx| tx.hash());
if *id == peer_id {
peer.last_sent_transactions.extend(hashes);
} else if let Some(s) = &mut peer.unsent_pooled_hashes {
s.extend(hashes);
}
}

for tx in txs {
self.unfetched_pooled_transactions.remove(&tx.hash());
}
}

Expand Down Expand Up @@ -1099,6 +1126,36 @@ impl ChainSync {
}
}

// get some peers to give us transaction pool
if !self.unfetched_pooled_transactions.is_empty() {
if let Some(s) = &mut self.peers.get_mut(&peer_id).unwrap().asking_pooled_transactions {
let now = Instant::now();

let mut new_asking_pooled_transactions = s.iter().copied().collect::<HashSet<_>>();
let mut new_unfetched_pooled_transactions = self.unfetched_pooled_transactions.clone();
while new_asking_pooled_transactions.len() <= 256 {
for (hash, mut item) in self.unfetched_pooled_transactions.drain() {
if item.next_fetch < now {
new_asking_pooled_transactions.insert(hash);
item.tries += 1;
if item.tries < 5 {
item.next_fetch = now + (POOLED_TRANSACTIONS_TIMEOUT / 2);
new_unfetched_pooled_transactions.insert(hash, item);
}
}
}
}

let new_asking_pooled_transactions = new_asking_pooled_transactions.into_iter().collect::<Vec<_>>();
SyncRequester::request_pooled_transactions(self, io, peer_id, &new_asking_pooled_transactions);

self.peers.get_mut(&peer_id).unwrap().asking_pooled_transactions = Some(new_asking_pooled_transactions);
self.unfetched_pooled_transactions = new_unfetched_pooled_transactions;

return;
}
}

// Only ask for old blocks if the peer has an equal or higher difficulty
let equal_or_higher_difficulty = peer_difficulty.map_or(true, |pd| pd >= syncing_difficulty);

Expand Down Expand Up @@ -1290,6 +1347,7 @@ impl ChainSync {
PeerAsking::BlockHeaders => elapsed > HEADERS_TIMEOUT,
PeerAsking::BlockBodies => elapsed > BODIES_TIMEOUT,
PeerAsking::BlockReceipts => elapsed > RECEIPTS_TIMEOUT,
PeerAsking::PooledTransactions => elapsed > POOLED_TRANSACTIONS_TIMEOUT,
PeerAsking::Nothing => false,
PeerAsking::ForkHeader => elapsed > FORK_HEADER_TIMEOUT,
PeerAsking::SnapshotManifest => elapsed > SNAPSHOT_MANIFEST_TIMEOUT,
Expand Down Expand Up @@ -1618,10 +1676,12 @@ pub mod tests {
genesis: H256::zero(),
network_id: 0,
latest_hash: peer_latest_hash,
unsent_pooled_hashes: Some(Default::default()),
difficulty: None,
asking: PeerAsking::Nothing,
asking_blocks: Vec::new(),
asking_hash: None,
asking_pooled_transactions: Some(Vec::new()),
asking_private_state: None,
ask_time: Instant::now(),
last_sent_transactions: Default::default(),
Expand Down
Loading

0 comments on commit f8b22e2

Please sign in to comment.