Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Prepare for network protocol version upgrades #5084

Merged
merged 26 commits into from
Apr 21, 2022
Merged
Show file tree
Hide file tree
Changes from 21 commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
2bd9cb1
explicitly tag network requests with version
rphmeier Mar 11, 2022
2f4464d
fmt
rphmeier Mar 11, 2022
b1ed697
make PeerSet more aware of versioning
rphmeier Mar 11, 2022
2f61e49
some generalization of the network bridge to support upgrades
rphmeier Mar 11, 2022
0593b0c
walk back some renaming
rphmeier Mar 11, 2022
5ffc622
walk back some version stuff
rphmeier Mar 11, 2022
2e4f7b0
extract version from fallback
rphmeier Mar 11, 2022
f63da6a
remove V1 from NetworkBridgeUpdate
rphmeier Mar 11, 2022
98ee4da
Merge branch 'master' into rh-network-protocol-upgrades
rphmeier Apr 14, 2022
ebb52bb
add accidentally-removed timer
rphmeier Apr 14, 2022
812da02
implement focusing for versioned messages
rphmeier Apr 14, 2022
5f3de34
fmt
rphmeier Apr 14, 2022
d84984e
fix up network bridge & tests
rphmeier Apr 15, 2022
5cbd94b
remove inaccurate version check in bridge
rphmeier Apr 15, 2022
da8f234
remove some TODO [now]s
rphmeier Apr 15, 2022
91067d6
fix fallout in statement distribution
rphmeier Apr 15, 2022
708a6d6
fmt
rphmeier Apr 15, 2022
d46a3a8
fallout in gossip-support
rphmeier Apr 15, 2022
95d4463
fix fallout in collator-protocol
rphmeier Apr 15, 2022
5c811ec
fix fallout in bitfield-distribution
rphmeier Apr 15, 2022
5fc3b9c
fix fallout in approval-distribution
rphmeier Apr 15, 2022
3e08c99
Merge branch 'master' into rh-network-protocol-upgrades
rphmeier Apr 20, 2022
f30ead9
fmt
rphmeier Apr 20, 2022
382e45e
Merge branch 'master' into rh-network-protocol-upgrades
rphmeier Apr 21, 2022
ac07199
use never!
rphmeier Apr 21, 2022
3bc077e
fmt
rphmeier Apr 21, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

27 changes: 14 additions & 13 deletions node/network/approval-distribution/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@

use futures::{channel::oneshot, FutureExt as _};
use polkadot_node_network_protocol::{
v1 as protocol_v1, PeerId, UnifiedReputationChange as Rep, View,
self as net_protocol, v1 as protocol_v1, PeerId, UnifiedReputationChange as Rep, Versioned,
View,
};
use polkadot_node_primitives::approval::{
AssignmentCert, BlockApprovalMeta, IndirectAssignmentCert, IndirectSignedApprovalVote,
Expand Down Expand Up @@ -227,10 +228,10 @@ impl State {
ctx: &mut (impl SubsystemContext<Message = ApprovalDistributionMessage>
+ overseer::SubsystemContext<Message = ApprovalDistributionMessage>),
metrics: &Metrics,
event: NetworkBridgeEvent<protocol_v1::ApprovalDistributionMessage>,
event: NetworkBridgeEvent<net_protocol::ApprovalDistributionMessage>,
) {
match event {
NetworkBridgeEvent::PeerConnected(peer_id, role, _) => {
NetworkBridgeEvent::PeerConnected(peer_id, role, _, _) => {
// insert a blank view if none already present
gum::trace!(target: LOG_TARGET, ?peer_id, ?role, "Peer connected");
self.peer_views.entry(peer_id).or_default();
Expand Down Expand Up @@ -275,7 +276,7 @@ impl State {
live
});
},
NetworkBridgeEvent::PeerMessage(peer_id, msg) => {
NetworkBridgeEvent::PeerMessage(peer_id, Versioned::V1(msg)) => {
self.process_incoming_peer_message(ctx, metrics, peer_id, msg).await;
},
}
Expand Down Expand Up @@ -751,9 +752,9 @@ impl State {

ctx.send_message(NetworkBridgeMessage::SendValidationMessage(
peers,
protocol_v1::ValidationProtocol::ApprovalDistribution(
Versioned::V1(protocol_v1::ValidationProtocol::ApprovalDistribution(
protocol_v1::ApprovalDistributionMessage::Assignments(assignments),
),
)),
))
.await;
}
Expand Down Expand Up @@ -973,9 +974,9 @@ impl State {

ctx.send_message(NetworkBridgeMessage::SendValidationMessage(
peers,
protocol_v1::ValidationProtocol::ApprovalDistribution(
Versioned::V1(protocol_v1::ValidationProtocol::ApprovalDistribution(
protocol_v1::ApprovalDistributionMessage::Approvals(approvals),
),
)),
))
.await;
}
Expand Down Expand Up @@ -1181,9 +1182,9 @@ impl State {

ctx.send_message(NetworkBridgeMessage::SendValidationMessage(
vec![peer_id.clone()],
protocol_v1::ValidationProtocol::ApprovalDistribution(
Versioned::V1(protocol_v1::ValidationProtocol::ApprovalDistribution(
protocol_v1::ApprovalDistributionMessage::Assignments(assignments),
),
)),
))
.await;
}
Expand All @@ -1199,9 +1200,9 @@ impl State {

ctx.send_message(NetworkBridgeMessage::SendValidationMessage(
vec![peer_id],
protocol_v1::ValidationProtocol::ApprovalDistribution(
Versioned::V1(protocol_v1::ValidationProtocol::ApprovalDistribution(
protocol_v1::ApprovalDistributionMessage::Approvals(approvals),
),
)),
))
.await;
}
Expand Down Expand Up @@ -1284,7 +1285,7 @@ impl ApprovalDistribution {
Context: overseer::SubsystemContext<Message = ApprovalDistributionMessage>,
{
match msg {
ApprovalDistributionMessage::NetworkBridgeUpdateV1(event) => {
ApprovalDistributionMessage::NetworkBridgeUpdate(event) => {
state.handle_network_msg(ctx, metrics, event).await;
},
ApprovalDistributionMessage::NewBlocks(metas) => {
Expand Down
51 changes: 26 additions & 25 deletions node/network/approval-distribution/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,16 +106,17 @@ async fn setup_peer_with_view(
) {
overseer_send(
virtual_overseer,
ApprovalDistributionMessage::NetworkBridgeUpdateV1(NetworkBridgeEvent::PeerConnected(
ApprovalDistributionMessage::NetworkBridgeUpdate(NetworkBridgeEvent::PeerConnected(
peer_id.clone(),
ObservedRole::Full,
1,
None,
)),
)
.await;
overseer_send(
virtual_overseer,
ApprovalDistributionMessage::NetworkBridgeUpdateV1(NetworkBridgeEvent::PeerViewChange(
ApprovalDistributionMessage::NetworkBridgeUpdate(NetworkBridgeEvent::PeerViewChange(
peer_id.clone(),
view,
)),
Expand All @@ -130,9 +131,9 @@ async fn send_message_from_peer(
) {
overseer_send(
virtual_overseer,
ApprovalDistributionMessage::NetworkBridgeUpdateV1(NetworkBridgeEvent::PeerMessage(
ApprovalDistributionMessage::NetworkBridgeUpdate(NetworkBridgeEvent::PeerMessage(
peer_id.clone(),
msg,
Versioned::V1(msg),
)),
)
.await;
Expand Down Expand Up @@ -234,9 +235,9 @@ fn try_import_the_same_assignment() {
overseer_recv(overseer).await,
AllMessages::NetworkBridge(NetworkBridgeMessage::SendValidationMessage(
peers,
protocol_v1::ValidationProtocol::ApprovalDistribution(
Versioned::V1(protocol_v1::ValidationProtocol::ApprovalDistribution(
protocol_v1::ApprovalDistributionMessage::Assignments(assignments)
)
))
)) => {
assert_eq!(peers.len(), 2);
assert_eq!(assignments.len(), 1);
Expand Down Expand Up @@ -323,7 +324,7 @@ fn spam_attack_results_in_negative_reputation_change() {
// send a view update that removes block B from peer's view by bumping the finalized_number
overseer_send(
overseer,
ApprovalDistributionMessage::NetworkBridgeUpdateV1(NetworkBridgeEvent::PeerViewChange(
ApprovalDistributionMessage::NetworkBridgeUpdate(NetworkBridgeEvent::PeerViewChange(
peer.clone(),
View::with_finalized(2),
)),
Expand Down Expand Up @@ -382,7 +383,7 @@ fn peer_sending_us_the_same_we_just_sent_them_is_ok() {
// update peer view to include the hash
overseer_send(
overseer,
ApprovalDistributionMessage::NetworkBridgeUpdateV1(NetworkBridgeEvent::PeerViewChange(
ApprovalDistributionMessage::NetworkBridgeUpdate(NetworkBridgeEvent::PeerViewChange(
peer.clone(),
view![hash],
)),
Expand All @@ -394,9 +395,9 @@ fn peer_sending_us_the_same_we_just_sent_them_is_ok() {
overseer_recv(overseer).await,
AllMessages::NetworkBridge(NetworkBridgeMessage::SendValidationMessage(
peers,
protocol_v1::ValidationProtocol::ApprovalDistribution(
Versioned::V1(protocol_v1::ValidationProtocol::ApprovalDistribution(
protocol_v1::ApprovalDistributionMessage::Assignments(assignments)
)
))
)) => {
assert_eq!(peers.len(), 1);
assert_eq!(assignments.len(), 1);
Expand Down Expand Up @@ -460,9 +461,9 @@ fn import_approval_happy_path() {
overseer_recv(overseer).await,
AllMessages::NetworkBridge(NetworkBridgeMessage::SendValidationMessage(
peers,
protocol_v1::ValidationProtocol::ApprovalDistribution(
Versioned::V1(protocol_v1::ValidationProtocol::ApprovalDistribution(
protocol_v1::ApprovalDistributionMessage::Assignments(assignments)
)
))
)) => {
assert_eq!(peers.len(), 2);
assert_eq!(assignments.len(), 1);
Expand Down Expand Up @@ -496,9 +497,9 @@ fn import_approval_happy_path() {
overseer_recv(overseer).await,
AllMessages::NetworkBridge(NetworkBridgeMessage::SendValidationMessage(
peers,
protocol_v1::ValidationProtocol::ApprovalDistribution(
Versioned::V1(protocol_v1::ValidationProtocol::ApprovalDistribution(
protocol_v1::ApprovalDistributionMessage::Approvals(approvals)
)
))
)) => {
assert_eq!(peers.len(), 1);
assert_eq!(approvals.len(), 1);
Expand Down Expand Up @@ -712,9 +713,9 @@ fn update_peer_view() {
overseer_recv(overseer).await,
AllMessages::NetworkBridge(NetworkBridgeMessage::SendValidationMessage(
peers,
protocol_v1::ValidationProtocol::ApprovalDistribution(
Versioned::V1(protocol_v1::ValidationProtocol::ApprovalDistribution(
protocol_v1::ApprovalDistributionMessage::Assignments(assignments)
)
))
)) => {
assert_eq!(peers.len(), 1);
assert_eq!(assignments.len(), 1);
Expand Down Expand Up @@ -743,7 +744,7 @@ fn update_peer_view() {
// update peer's view
overseer_send(
overseer,
ApprovalDistributionMessage::NetworkBridgeUpdateV1(NetworkBridgeEvent::PeerViewChange(
ApprovalDistributionMessage::NetworkBridgeUpdate(NetworkBridgeEvent::PeerViewChange(
peer.clone(),
View::new(vec![hash_b, hash_c, hash_d], 2),
)),
Expand All @@ -763,9 +764,9 @@ fn update_peer_view() {
overseer_recv(overseer).await,
AllMessages::NetworkBridge(NetworkBridgeMessage::SendValidationMessage(
peers,
protocol_v1::ValidationProtocol::ApprovalDistribution(
Versioned::V1(protocol_v1::ValidationProtocol::ApprovalDistribution(
protocol_v1::ApprovalDistributionMessage::Assignments(assignments)
)
))
)) => {
assert_eq!(peers.len(), 1);
assert_eq!(assignments.len(), 1);
Expand Down Expand Up @@ -796,7 +797,7 @@ fn update_peer_view() {
// update peer's view
overseer_send(
overseer,
ApprovalDistributionMessage::NetworkBridgeUpdateV1(NetworkBridgeEvent::PeerViewChange(
ApprovalDistributionMessage::NetworkBridgeUpdate(NetworkBridgeEvent::PeerViewChange(
peer.clone(),
View::with_finalized(finalized_number),
)),
Expand Down Expand Up @@ -948,9 +949,9 @@ fn sends_assignments_even_when_state_is_approved() {
overseer_recv(overseer).await,
AllMessages::NetworkBridge(NetworkBridgeMessage::SendValidationMessage(
peers,
protocol_v1::ValidationProtocol::ApprovalDistribution(
Versioned::V1(protocol_v1::ValidationProtocol::ApprovalDistribution(
protocol_v1::ApprovalDistributionMessage::Assignments(sent_assignments)
)
))
)) => {
assert_eq!(peers, vec![peer.clone()]);
assert_eq!(sent_assignments, assignments);
Expand All @@ -961,9 +962,9 @@ fn sends_assignments_even_when_state_is_approved() {
overseer_recv(overseer).await,
AllMessages::NetworkBridge(NetworkBridgeMessage::SendValidationMessage(
peers,
protocol_v1::ValidationProtocol::ApprovalDistribution(
Versioned::V1(protocol_v1::ValidationProtocol::ApprovalDistribution(
protocol_v1::ApprovalDistributionMessage::Approvals(sent_approvals)
)
))
)) => {
assert_eq!(peers, vec![peer.clone()]);
assert_eq!(sent_approvals, approvals);
Expand Down Expand Up @@ -1007,7 +1008,7 @@ fn race_condition_in_local_vs_remote_view_update() {
// Send our view update to include a new head
overseer_send(
overseer,
ApprovalDistributionMessage::NetworkBridgeUpdateV1(NetworkBridgeEvent::OurViewChange(
ApprovalDistributionMessage::NetworkBridgeUpdate(NetworkBridgeEvent::OurViewChange(
our_view![hash_b],
)),
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ where
Recipient::Authority(authority_id.clone()),
PoVFetchingRequest { candidate_hash },
);
let full_req = Requests::PoVFetching(req);
let full_req = Requests::PoVFetchingV1(req);

ctx.send_message(NetworkBridgeMessage::SendRequests(
vec![full_req],
Expand Down Expand Up @@ -200,7 +200,7 @@ mod tests {
AllMessages::NetworkBridge(NetworkBridgeMessage::SendRequests(mut reqs, _)) => {
let req = assert_matches!(
reqs.pop(),
Some(Requests::PoVFetching(outgoing)) => {outgoing}
Some(Requests::PoVFetchingV1(outgoing)) => {outgoing}
);
req.pending_response
.send(Ok(PoVFetchingResponse::PoV(pov.clone()).encode()))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -330,7 +330,7 @@ impl RunningTask {
) -> std::result::Result<ChunkFetchingResponse, TaskError> {
let (full_request, response_recv) =
OutgoingRequest::new(Recipient::Authority(validator.clone()), self.request);
let requests = Requests::ChunkFetching(full_request);
let requests = Requests::ChunkFetchingV1(full_request);

self.sender
.send(FromFetchTask::Message(AllMessages::NetworkBridge(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,7 @@ impl TestRun {
let mut valid_responses = 0;
for req in reqs {
let req = match req {
Requests::ChunkFetching(req) => req,
Requests::ChunkFetchingV1(req) => req,
_ => panic!("Unexpected request"),
};
let response =
Expand Down
2 changes: 1 addition & 1 deletion node/network/availability-distribution/src/tests/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -313,7 +313,7 @@ fn to_incoming_req(
outgoing: Requests,
) -> IncomingRequest<v1::ChunkFetchingRequest> {
match outgoing {
Requests::ChunkFetching(OutgoingRequest { payload, pending_response, .. }) => {
Requests::ChunkFetchingV1(OutgoingRequest { payload, pending_response, .. }) => {
let (tx, rx): (oneshot::Sender<netconfig::OutgoingResponse>, oneshot::Receiver<_>) =
oneshot::channel();
executor.spawn(
Expand Down
4 changes: 2 additions & 2 deletions node/network/availability-recovery/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ impl RequestFromBackers {
sender
.send_message(
NetworkBridgeMessage::SendRequests(
vec![Requests::AvailableDataFetching(req)],
vec![Requests::AvailableDataFetchingV1(req)],
IfDisconnected::ImmediateError,
)
.into(),
Expand Down Expand Up @@ -325,7 +325,7 @@ impl RequestChunksFromValidators {

let (req, res) =
OutgoingRequest::new(Recipient::Authority(validator), raw_request.clone());
requests.push(Requests::ChunkFetching(req));
requests.push(Requests::ChunkFetchingV1(req));

params.metrics.on_chunk_request_issued();
let timer = params.metrics.time_chunk_request();
Expand Down
4 changes: 2 additions & 2 deletions node/network/availability-recovery/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,7 @@ impl TestState {
i += 1;
assert_matches!(
req,
Requests::ChunkFetching(req) => {
Requests::ChunkFetchingV1(req) => {
assert_eq!(req.payload.candidate_hash, candidate_hash);

let validator_index = req.payload.index.0 as usize;
Expand Down Expand Up @@ -341,7 +341,7 @@ impl TestState {

assert_matches!(
requests.pop().unwrap(),
Requests::AvailableDataFetching(req) => {
Requests::AvailableDataFetchingV1(req) => {
assert_eq!(req.payload.candidate_hash, candidate_hash);
let validator_index = self.validator_authority_id
.iter()
Expand Down
Loading