Skip to content

Commit

Permalink
Rechannel: don't ack old chunk_ids in the block channel
Browse files Browse the repository at this point in the history
Discard the ack packet if the chunk_id of the packet does not match the current chunk_id of the message being sent.
  • Loading branch information
lucaspoffo committed Jun 29, 2022
1 parent 49dd33f commit 453f5ef
Showing 1 changed file with 34 additions and 6 deletions.
40 changes: 34 additions & 6 deletions rechannel/src/channel/block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ pub(crate) struct SliceMessage {
#[derive(Debug, Clone)]
struct PacketSent {
acked: bool,
chunk_id: u16,
slice_ids: Vec<u32>,
}

Expand Down Expand Up @@ -100,8 +101,12 @@ impl Default for BlockChannelConfig {
}

impl PacketSent {
fn new(slice_ids: Vec<u32>) -> Self {
Self { acked: false, slice_ids }
fn new(chunk_id: u16, slice_ids: Vec<u32>) -> Self {
Self {
chunk_id,
slice_ids,
acked: false,
}
}
}

Expand Down Expand Up @@ -193,7 +198,7 @@ impl ChunkSender {

fn process_ack(&mut self, ack: u16) {
if let Some(sent_packet) = self.packets_sent.get_mut(ack) {
if sent_packet.acked {
if sent_packet.acked || sent_packet.chunk_id != self.chunk_id {
return;
}
sent_packet.acked = true;
Expand Down Expand Up @@ -379,7 +384,7 @@ impl Channel for BlockChannel {
}
}

let packet_sent = PacketSent::new(slice_ids);
let packet_sent = PacketSent::new(self.sender.chunk_id, slice_ids);
self.sender.packets_sent.insert(sequence, packet_sent);

Some(ChannelPacketData {
Expand Down Expand Up @@ -511,12 +516,11 @@ mod tests {
fn block_channel_queue() {
let mut channel = BlockChannel::new(BlockChannelConfig {
resend_time: Duration::ZERO,
..Default::default()
..Default::default()
});
let first_message = Bytes::from(vec![3; 2000]);
let second_message = Bytes::from(vec![5; 2000]);
channel.send_message(first_message.clone());
// Add second message to queue
channel.send_message(second_message.clone());

// First message
Expand All @@ -540,4 +544,28 @@ mod tests {
let block_channel_data = channel.get_messages_to_send(u64::MAX, 2);
assert!(block_channel_data.is_none());
}

#[test]
fn acking_packet_with_old_chunk_id() {
let mut channel = BlockChannel::new(BlockChannelConfig {
resend_time: Duration::ZERO,
..Default::default()
});
let first_message = Bytes::from(vec![5; 400 * 3]);
let second_message = Bytes::from(vec![3; 400]);
channel.send_message(first_message.clone());
channel.send_message(second_message.clone());

let _ = channel.get_messages_to_send(u64::MAX, 0).unwrap();
let _ = channel.get_messages_to_send(u64::MAX, 1).unwrap();

channel.process_ack(0);
let _ = channel.get_messages_to_send(u64::MAX, 2).unwrap();

channel.process_ack(1);
assert!(channel.sender.sending);

channel.process_ack(2);
assert!(!channel.sender.sending);
}
}

0 comments on commit 453f5ef

Please sign in to comment.