Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use channels as a transport abstraction. #14

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 9 additions & 3 deletions renet/examples/echo.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
use renet::{ConnectToken, RenetClient, RenetConnectionConfig, RenetServer, ServerConfig, ServerEvent, NETCODE_USER_DATA_BYTES};
use renet::{
udp_transport, ConnectToken, RenetClient, RenetConnectionConfig, RenetServer, ServerConfig, ServerEvent, NETCODE_USER_DATA_BYTES,
};
use renetcode::NETCODE_KEY_BYTES;
use std::collections::HashMap;
use std::thread;
Expand Down Expand Up @@ -67,7 +69,9 @@ fn server(addr: SocketAddr) {
let connection_config = RenetConnectionConfig::default();
let server_config = ServerConfig::new(64, PROTOCOL_ID, addr, *PRIVATE_KEY);
let current_time = SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap();
let mut server: RenetServer = RenetServer::new(current_time, server_config, connection_config, socket).unwrap();
let (sender, receiver) = udp_transport(socket);

let mut server: RenetServer = RenetServer::new(current_time, server_config, connection_config, sender, receiver);

let mut usernames: HashMap<u64, String> = HashMap::new();
let mut received_messages = vec![];
Expand Down Expand Up @@ -127,7 +131,9 @@ fn client(server_addr: SocketAddr, username: Username) {
PRIVATE_KEY,
)
.unwrap();
let mut client = RenetClient::new(current_time, socket, client_id, connect_token, connection_config).unwrap();

let (sender, receiver) = udp_transport(socket);
let mut client = RenetClient::new(current_time, client_id, connect_token, connection_config, sender, receiver);
let stdin_channel = spawn_stdin_channel();

let mut last_updated = Instant::now();
Expand Down
49 changes: 28 additions & 21 deletions renet/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,41 +7,44 @@ use rechannel::{
error::RechannelError,
remote_connection::{NetworkInfo, RemoteConnection},
};
use renetcode::{ConnectToken, NetcodeClient, NetcodeError, PacketToSend, NETCODE_MAX_PACKET_BYTES};
use renetcode::{ConnectToken, NetcodeClient, NetcodeError, PacketToSend};

use log::debug;

use std::io;
use std::net::UdpSocket;
use std::sync::mpsc::TryRecvError;
use std::time::Duration;
use std::{
net::SocketAddr,
sync::mpsc::{Receiver, Sender},
};

/// A client that establishes an authenticated connection with a server.
/// Can send/receive encrypted messages from/to the server.
pub struct RenetClient {
netcode_client: NetcodeClient,
socket: UdpSocket,
reliable_connection: RemoteConnection,
buffer: [u8; NETCODE_MAX_PACKET_BYTES],
packet_sender: Sender<(SocketAddr, Vec<u8>)>,
packet_receiver: Receiver<(SocketAddr, Vec<u8>)>,
}

impl RenetClient {
pub fn new(
current_time: Duration,
socket: UdpSocket,
client_id: u64,
connect_token: ConnectToken,
config: RenetConnectionConfig,
) -> Result<Self, std::io::Error> {
socket.set_nonblocking(true)?;
packet_sender: Sender<(SocketAddr, Vec<u8>)>,
packet_receiver: Receiver<(SocketAddr, Vec<u8>)>,
) -> Self {
let reliable_connection = RemoteConnection::new(config.to_connection_config());
let netcode_client = NetcodeClient::new(current_time, client_id, connect_token);

Ok(Self {
buffer: [0u8; NETCODE_MAX_PACKET_BYTES],
socket,
Self {
reliable_connection,
netcode_client,
})
packet_receiver,
packet_sender,
}
}

pub fn client_id(&self) -> u64 {
Expand Down Expand Up @@ -70,7 +73,7 @@ impl RenetClient {
match self.netcode_client.disconnect() {
Ok(PacketToSend { packet, address }) => {
for _ in 0..NUM_DISCONNECT_PACKETS_TO_SEND {
if let Err(e) = self.socket.send_to(packet, address) {
if let Err(e) = self.packet_sender.send((address, packet.to_vec())) {
log::error!("failed to send disconnect packet to server: {}", e);
}
}
Expand Down Expand Up @@ -104,7 +107,9 @@ impl RenetClient {
let packets = self.reliable_connection.get_packets_to_send()?;
for packet in packets.into_iter() {
let PacketToSend { packet, address } = self.netcode_client.generate_payload_packet(&packet)?;
self.socket.send_to(packet, address)?;
self.packet_sender
.send((address, packet.to_vec()))
.map_err(|_| RenetError::SenderDisconnected)?;
}
}
Ok(())
Expand All @@ -123,27 +128,29 @@ impl RenetClient {
}

loop {
let packet = match self.socket.recv_from(&mut self.buffer) {
Ok((len, addr)) => {
let mut packet = match self.packet_receiver.try_recv() {
Ok((addr, payload)) => {
if addr != self.netcode_client.server_addr() {
debug!("Discarded packet from unknown server {:?}", addr);
continue;
}

&mut self.buffer[..len]
payload
}
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => break,
Err(e) => return Err(RenetError::IO(e)),
Err(TryRecvError::Empty) => break,
Err(TryRecvError::Disconnected) => return Err(RenetError::ReceiverDisconnected),
};

if let Some(payload) = self.netcode_client.process_packet(packet) {
if let Some(payload) = self.netcode_client.process_packet(&mut packet) {
self.reliable_connection.process_packet(payload)?;
}
}

self.reliable_connection.update()?;
if let Some((packet, addr)) = self.netcode_client.update(duration) {
self.socket.send_to(packet, addr)?;
self.packet_sender
.send((addr, packet.to_vec()))
.map_err(|_| RenetError::SenderDisconnected)?;
}

Ok(())
Expand Down
12 changes: 4 additions & 8 deletions renet/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@ use std::fmt;
pub enum RenetError {
Netcode(renetcode::NetcodeError),
Rechannel(rechannel::error::RechannelError),
IO(std::io::Error),
ReceiverDisconnected,
SenderDisconnected,
}

impl Error for RenetError {}
Expand All @@ -19,7 +20,8 @@ impl fmt::Display for RenetError {
match *self {
RenetError::Netcode(ref err) => err.fmt(fmt),
RenetError::Rechannel(ref err) => err.fmt(fmt),
RenetError::IO(ref err) => err.fmt(fmt),
RenetError::SenderDisconnected => write!(fmt, "packet sender has been disconnected"),
RenetError::ReceiverDisconnected => write!(fmt, "packet receiver has been disconnected"),
}
}
}
Expand All @@ -36,12 +38,6 @@ impl From<rechannel::error::RechannelError> for RenetError {
}
}

impl From<std::io::Error> for RenetError {
fn from(inner: std::io::Error) -> Self {
RenetError::IO(inner)
}
}

pub enum DisconnectionReason {
Rechannel(RechannelDisconnectReason),
Netcode(NetcodeDisconnectReason),
Expand Down
37 changes: 37 additions & 0 deletions renet/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,16 @@ pub use rechannel::error::{ChannelError, DisconnectionReason, RechannelError};
pub use rechannel::remote_connection::NetworkInfo;

use rechannel::{remote_connection::ConnectionConfig, FragmentConfig};
use renetcode::NETCODE_MAX_PACKET_BYTES;
pub use renetcode::{ConnectToken, NetcodeError};
pub use renetcode::{NETCODE_KEY_BYTES, NETCODE_MAX_PAYLOAD_BYTES, NETCODE_USER_DATA_BYTES};

pub use client::RenetClient;
pub use error::RenetError;
pub use server::{RenetServer, ServerConfig, ServerEvent};

use std::net::{SocketAddr, UdpSocket};
use std::sync::mpsc::{channel, Receiver, Sender, TryRecvError};
use std::time::Duration;

const NUM_DISCONNECT_PACKETS_TO_SEND: u32 = 5;
Expand Down Expand Up @@ -74,3 +77,37 @@ impl RenetConnectionConfig {
}
}
}

#[allow(clippy::type_complexity)]
pub fn udp_transport(socket: UdpSocket) -> (Sender<(SocketAddr, Vec<u8>)>, Receiver<(SocketAddr, Vec<u8>)>) {
let (packet_sender, packet_receiver) = channel::<(SocketAddr, Vec<u8>)>();
let (transport_sender, transport_receiver) = channel::<(SocketAddr, Vec<u8>)>();
socket.set_nonblocking(true).unwrap();

std::thread::spawn(move || {
let mut buffer = vec![0u8; NETCODE_MAX_PACKET_BYTES].into_boxed_slice();
loop {
match socket.recv_from(&mut buffer) {
Ok((len, addr)) => {
if transport_sender.send((addr, buffer[..len].to_vec())).is_err() {
break;
}
}
Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => {}
Err(_) => break,
};

match packet_receiver.try_recv() {
Ok((addr, packet)) => {
if socket.send_to(&packet, addr).is_err() {
break;
}
}
Err(TryRecvError::Empty) => {}
Err(TryRecvError::Disconnected) => break,
}
}
});

(packet_sender, transport_receiver)
}
Loading