Skip to content

Commit 2a0f24a

Browse files
committed
Refactor to allow configurable network adapters
Signed-off-by: Konrad Gräfe <[email protected]>
1 parent 681187c commit 2a0f24a

File tree

10 files changed

+236
-37
lines changed

10 files changed

+236
-37
lines changed

src/adapters/framed_tcp.rs

+6-3
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ use crate::network::adapter::{
22
Resource, Remote, Local, Adapter, SendStatus, AcceptedType, ReadStatus, ConnectionInfo,
33
ListeningInfo, PendingStatus,
44
};
5-
use crate::network::{RemoteAddr, Readiness};
5+
use crate::network::{RemoteAddr, Readiness, TransportConnect, TransportListen};
66
use crate::util::encoding::{self, Decoder, MAX_ENCODED_SIZE};
77

88
use mio::net::{TcpListener, TcpStream};
@@ -45,7 +45,10 @@ impl Resource for RemoteResource {
4545
}
4646

4747
impl Remote for RemoteResource {
48-
fn connect(remote_addr: RemoteAddr) -> io::Result<ConnectionInfo<Self>> {
48+
fn connect_with(
49+
_: TransportConnect,
50+
remote_addr: RemoteAddr,
51+
) -> io::Result<ConnectionInfo<Self>> {
4952
let peer_addr = *remote_addr.socket_addr();
5053
let stream = TcpStream::connect(peer_addr)?;
5154
let local_addr = stream.local_addr()?;
@@ -129,7 +132,7 @@ impl Resource for LocalResource {
129132
impl Local for LocalResource {
130133
type Remote = RemoteResource;
131134

132-
fn listen(addr: SocketAddr) -> io::Result<ListeningInfo<Self>> {
135+
fn listen_with(_: TransportListen, addr: SocketAddr) -> io::Result<ListeningInfo<Self>> {
133136
let listener = TcpListener::bind(addr)?;
134137
let local_addr = listener.local_addr().unwrap();
135138
Ok(ListeningInfo { local: { LocalResource { listener } }, local_addr })

src/adapters/tcp.rs

+6-3
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ use crate::network::adapter::{
22
Resource, Remote, Local, Adapter, SendStatus, AcceptedType, ReadStatus, ConnectionInfo,
33
ListeningInfo, PendingStatus,
44
};
5-
use crate::network::{RemoteAddr, Readiness};
5+
use crate::network::{RemoteAddr, Readiness, TransportConnect, TransportListen};
66

77
use mio::net::{TcpListener, TcpStream};
88
use mio::event::{Source};
@@ -40,7 +40,10 @@ impl Resource for RemoteResource {
4040
}
4141

4242
impl Remote for RemoteResource {
43-
fn connect(remote_addr: RemoteAddr) -> io::Result<ConnectionInfo<Self>> {
43+
fn connect_with(
44+
_: TransportConnect,
45+
remote_addr: RemoteAddr,
46+
) -> io::Result<ConnectionInfo<Self>> {
4447
let peer_addr = *remote_addr.socket_addr();
4548
let stream = TcpStream::connect(peer_addr)?;
4649
let local_addr = stream.local_addr()?;
@@ -131,7 +134,7 @@ impl Resource for LocalResource {
131134
impl Local for LocalResource {
132135
type Remote = RemoteResource;
133136

134-
fn listen(addr: SocketAddr) -> io::Result<ListeningInfo<Self>> {
137+
fn listen_with(_: TransportListen, addr: SocketAddr) -> io::Result<ListeningInfo<Self>> {
135138
let listener = TcpListener::bind(addr)?;
136139
let local_addr = listener.local_addr().unwrap();
137140
Ok(ListeningInfo { local: { LocalResource { listener } }, local_addr })

src/adapters/template.rs

+6-3
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ use crate::network::adapter::{
44
Resource, Remote, Local, Adapter, SendStatus, AcceptedType, ReadStatus, ConnectionInfo,
55
ListeningInfo, PendingStatus,
66
};
7-
use crate::network::{RemoteAddr, Readiness};
7+
use crate::network::{RemoteAddr, Readiness, TransportConnect, TransportListen};
88

99
use mio::event::{Source};
1010

@@ -25,7 +25,10 @@ impl Resource for RemoteResource {
2525
}
2626

2727
impl Remote for RemoteResource {
28-
fn connect(remote_addr: RemoteAddr) -> io::Result<ConnectionInfo<Self>> {
28+
fn connect_with(
29+
config: TransportConnect,
30+
remote_addr: RemoteAddr,
31+
) -> io::Result<ConnectionInfo<Self>> {
2932
todo!()
3033
}
3134

@@ -52,7 +55,7 @@ impl Resource for LocalResource {
5255
impl Local for LocalResource {
5356
type Remote = RemoteResource;
5457

55-
fn listen(addr: SocketAddr) -> io::Result<ListeningInfo<Self>> {
58+
fn listen_with(config: TransportListen, addr: SocketAddr) -> io::Result<ListeningInfo<Self>> {
5659
todo!()
5760
}
5861

src/adapters/udp.rs

+6-3
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ use crate::network::adapter::{
22
Resource, Remote, Local, Adapter, SendStatus, AcceptedType, ReadStatus, ConnectionInfo,
33
ListeningInfo, PendingStatus,
44
};
5-
use crate::network::{RemoteAddr, Readiness};
5+
use crate::network::{RemoteAddr, Readiness, TransportConnect, TransportListen};
66

77
use mio::net::{UdpSocket};
88
use mio::event::{Source};
@@ -43,7 +43,10 @@ impl Resource for RemoteResource {
4343
}
4444

4545
impl Remote for RemoteResource {
46-
fn connect(remote_addr: RemoteAddr) -> io::Result<ConnectionInfo<Self>> {
46+
fn connect_with(
47+
_: TransportConnect,
48+
remote_addr: RemoteAddr,
49+
) -> io::Result<ConnectionInfo<Self>> {
4750
let socket = UdpSocket::bind("0.0.0.0:0".parse().unwrap())?;
4851
let peer_addr = *remote_addr.socket_addr();
4952
socket.connect(peer_addr)?;
@@ -95,7 +98,7 @@ impl Resource for LocalResource {
9598
impl Local for LocalResource {
9699
type Remote = RemoteResource;
97100

98-
fn listen(addr: SocketAddr) -> io::Result<ListeningInfo<Self>> {
101+
fn listen_with(_: TransportListen, addr: SocketAddr) -> io::Result<ListeningInfo<Self>> {
99102
let socket = match addr {
100103
SocketAddr::V4(addr) if addr.ip().is_multicast() => {
101104
let listening_addr = SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, addr.port());

src/adapters/ws.rs

+6-2
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ use crate::network::adapter::{
44
};
55
use crate::network::{RemoteAddr, Readiness};
66
use crate::util::thread::{OTHER_THREAD_ERR};
7+
use crate::network::{TransportConnect, TransportListen};
78

89
use mio::event::{Source};
910
use mio::net::{TcpStream, TcpListener};
@@ -76,7 +77,10 @@ impl Resource for RemoteResource {
7677
}
7778

7879
impl Remote for RemoteResource {
79-
fn connect(remote_addr: RemoteAddr) -> io::Result<ConnectionInfo<Self>> {
80+
fn connect_with(
81+
_: TransportConnect,
82+
remote_addr: RemoteAddr,
83+
) -> io::Result<ConnectionInfo<Self>> {
8084
let (peer_addr, url) = match remote_addr {
8185
RemoteAddr::Socket(addr) => {
8286
(addr, Url::parse(&format!("ws://{addr}/message-io-default")).unwrap())
@@ -328,7 +332,7 @@ impl Resource for LocalResource {
328332
impl Local for LocalResource {
329333
type Remote = RemoteResource;
330334

331-
fn listen(addr: SocketAddr) -> io::Result<ListeningInfo<Self>> {
335+
fn listen_with(_: TransportListen, addr: SocketAddr) -> io::Result<ListeningInfo<Self>> {
332336
let listener = TcpListener::bind(addr)?;
333337
let local_addr = listener.local_addr().unwrap();
334338
Ok(ListeningInfo { local: LocalResource { listener }, local_addr })

src/network.rs

+70-11
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ pub use adapter::{SendStatus};
1717
pub use resource_id::{ResourceId, ResourceType};
1818
pub use endpoint::{Endpoint};
1919
pub use remote_addr::{RemoteAddr, ToRemoteAddr};
20-
pub use transport::{Transport};
20+
pub use transport::{Transport, TransportConnect, TransportListen};
2121
pub use driver::{NetEvent};
2222
pub use poll::{Readiness};
2323

@@ -101,12 +101,33 @@ impl NetworkController {
101101
&self,
102102
transport: Transport,
103103
addr: impl ToRemoteAddr,
104+
) -> io::Result<(Endpoint, SocketAddr)> {
105+
self.connect_with(transport.into(), addr)
106+
}
107+
108+
/// Creates a connection to the specified address with custom transport options for transports
109+
/// that support it.
110+
/// The endpoint, an identifier of the new connection, will be returned.
111+
/// This function will generate a [`NetEvent::Connected`] event with the result of the
112+
/// connection. This call will **NOT** block to perform the connection.
113+
///
114+
/// Note that this function can return an error in the case the internal socket
115+
/// could not be binded or open in the OS, but never will return an error regarding
116+
/// the connection itself.
117+
/// If you want to check if the connection has been established or not you have to read the
118+
/// boolean indicator in the [`NetEvent::Connected`] event.
119+
pub fn connect_with(
120+
&self,
121+
transport_connect: TransportConnect,
122+
addr: impl ToRemoteAddr,
104123
) -> io::Result<(Endpoint, SocketAddr)> {
105124
let addr = addr.to_remote_addr().unwrap();
106-
self.controllers[transport.id() as usize].connect(addr).map(|(endpoint, addr)| {
107-
log::trace!("Connect to {}", endpoint);
108-
(endpoint, addr)
109-
})
125+
self.controllers[transport_connect.id() as usize].connect_with(transport_connect, addr).map(
126+
|(endpoint, addr)| {
127+
log::trace!("Connect to {}", endpoint);
128+
(endpoint, addr)
129+
},
130+
)
110131
}
111132

112133
/// Creates a connection to the specified address.
@@ -147,7 +168,28 @@ impl NetworkController {
147168
transport: Transport,
148169
addr: impl ToRemoteAddr,
149170
) -> io::Result<(Endpoint, SocketAddr)> {
150-
let (endpoint, addr) = self.connect(transport, addr)?;
171+
self.connect_sync_with(transport.into(), addr)
172+
}
173+
174+
/// Creates a connection to the specified address with custom transport options for transports
175+
/// that support it.
176+
/// This function is similar to [`NetworkController::connect_with()`] but will block
177+
/// until for the connection is ready.
178+
/// If the connection can not be established, a `ConnectionRefused` error will be returned.
179+
///
180+
/// Note that the `Connect` event will be also generated.
181+
///
182+
/// Since this function blocks the current thread, it must NOT be used inside
183+
/// the network callback because the internal event could not be processed.
184+
///
185+
/// In order to get the best scalability and performance, use the non-blocking
186+
/// [`NetworkController::connect_with()`] version.
187+
pub fn connect_sync_with(
188+
&self,
189+
transport_connect: TransportConnect,
190+
addr: impl ToRemoteAddr,
191+
) -> io::Result<(Endpoint, SocketAddr)> {
192+
let (endpoint, addr) = self.connect_with(transport_connect, addr)?;
151193
loop {
152194
std::thread::sleep(Duration::from_millis(1));
153195
match self.is_ready(endpoint.resource_id()) {
@@ -164,7 +206,7 @@ impl NetworkController {
164206
}
165207

166208
/// Listen messages from specified transport.
167-
/// The giver address will be used as interface and listening port.
209+
/// The given address will be used as interface and listening port.
168210
/// If the port can be opened, a [ResourceId] identifying the listener is returned
169211
/// along with the local address, or an error if not.
170212
/// The address is returned despite you passed as parameter because
@@ -173,12 +215,29 @@ impl NetworkController {
173215
&self,
174216
transport: Transport,
175217
addr: impl ToSocketAddrs,
218+
) -> io::Result<(ResourceId, SocketAddr)> {
219+
self.listen_with(transport.into(), addr)
220+
}
221+
222+
/// Listen messages from specified transport with custom transport options for transports that
223+
/// support it.
224+
/// The given address will be used as interface and listening port.
225+
/// If the port can be opened, a [ResourceId] identifying the listener is returned
226+
/// along with the local address, or an error if not.
227+
/// The address is returned despite you passed as parameter because
228+
/// when a `0` port is specified, the OS will give choose the value.
229+
pub fn listen_with(
230+
&self,
231+
transport_listen: TransportListen,
232+
addr: impl ToSocketAddrs,
176233
) -> io::Result<(ResourceId, SocketAddr)> {
177234
let addr = addr.to_socket_addrs().unwrap().next().unwrap();
178-
self.controllers[transport.id() as usize].listen(addr).map(|(resource_id, addr)| {
179-
log::trace!("Listening at {} by {}", addr, resource_id);
180-
(resource_id, addr)
181-
})
235+
self.controllers[transport_listen.id() as usize].listen_with(transport_listen, addr).map(
236+
|(resource_id, addr)| {
237+
log::trace!("Listening at {} by {}", addr, resource_id);
238+
(resource_id, addr)
239+
},
240+
)
182241
}
183242

184243
/// Send the data message thought the connection represented by the given endpoint.

src/network/adapter.rs

+15-4
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
use crate::network::transport::{TransportConnect, TransportListen};
2+
13
use super::remote_addr::{RemoteAddr};
24
use super::poll::{Readiness};
35

@@ -31,7 +33,7 @@ pub trait Resource: Send + Sync {
3133
fn source(&mut self) -> &mut dyn Source;
3234
}
3335

34-
/// Plain struct used as a returned value of [`Remote::connect()`]
36+
/// Plain struct used as a returned value of [`Remote::connect_with()`]
3537
pub struct ConnectionInfo<R: Remote> {
3638
/// The new created remote resource
3739
pub remote: R,
@@ -43,7 +45,7 @@ pub struct ConnectionInfo<R: Remote> {
4345
pub peer_addr: SocketAddr,
4446
}
4547

46-
/// Plain struct used as a returned value of [`Local::listen()`]
48+
/// Plain struct used as a returned value of [`Local::listen_with()`]
4749
pub struct ListeningInfo<L: Local> {
4850
/// The new created local resource
4951
pub local: L,
@@ -117,10 +119,16 @@ pub enum PendingStatus {
117119
pub trait Remote: Resource + Sized {
118120
/// Called when the user performs a connection request to an specific remote address.
119121
/// The **implementator** is in change of creating the corresponding remote resource.
122+
/// The [`TransportConnect`] wraps custom transport options for transports that support it. It
123+
/// is guaranteed by the upper level to be of the variant matching the adapter. Therefore other
124+
/// variants can be safely ignored.
120125
/// The [`RemoteAddr`] contains either a [`SocketAddr`] or a [`url::Url`].
121126
/// It is in charge of deciding what to do in both cases.
122127
/// It also must return the extracted address as `SocketAddr`.
123-
fn connect(remote_addr: RemoteAddr) -> io::Result<ConnectionInfo<Self>>;
128+
fn connect_with(
129+
config: TransportConnect,
130+
remote_addr: RemoteAddr,
131+
) -> io::Result<ConnectionInfo<Self>>;
124132

125133
/// Called when a remote resource received an event.
126134
/// The resource must be *ready* to receive this call.
@@ -193,7 +201,10 @@ pub trait Local: Resource + Sized {
193201
/// The **implementator** is in change of creating the corresponding local resource.
194202
/// It also must returned the listening address since it could not be the same as param `addr`
195203
/// (e.g. listening from port `0`).
196-
fn listen(addr: SocketAddr) -> io::Result<ListeningInfo<Self>>;
204+
/// The [`TransportListen`] wraps custom transport options for transports that support it. It
205+
/// is guaranteed by the upper level to be of the variant matching the adapter. Therefore other
206+
/// variants can be safely ignored.
207+
fn listen_with(config: TransportListen, addr: SocketAddr) -> io::Result<ListeningInfo<Self>>;
197208

198209
/// Called when a local resource received an event.
199210
/// It means that some resource have tried to connect.

src/network/driver.rs

+23-6
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ use super::poll::{Poll, Readiness};
44
use super::registry::{ResourceRegistry, Register};
55
use super::remote_addr::{RemoteAddr};
66
use super::adapter::{Adapter, Remote, Local, SendStatus, AcceptedType, ReadStatus, PendingStatus};
7+
use super::transport::{TransportConnect, TransportListen};
78

89
use std::net::{SocketAddr};
910
use std::sync::{
@@ -68,8 +69,16 @@ impl std::fmt::Debug for NetEvent<'_> {
6869
}
6970

7071
pub trait ActionController: Send + Sync {
71-
fn connect(&self, addr: RemoteAddr) -> io::Result<(Endpoint, SocketAddr)>;
72-
fn listen(&self, addr: SocketAddr) -> io::Result<(ResourceId, SocketAddr)>;
72+
fn connect_with(
73+
&self,
74+
config: TransportConnect,
75+
addr: RemoteAddr,
76+
) -> io::Result<(Endpoint, SocketAddr)>;
77+
fn listen_with(
78+
&self,
79+
config: TransportListen,
80+
addr: SocketAddr,
81+
) -> io::Result<(ResourceId, SocketAddr)>;
7382
fn send(&self, endpoint: Endpoint, data: &[u8]) -> SendStatus;
7483
fn remove(&self, id: ResourceId) -> bool;
7584
fn is_ready(&self, id: ResourceId) -> Option<bool>;
@@ -136,8 +145,12 @@ impl<R: Remote, L: Local> Clone for Driver<R, L> {
136145
}
137146

138147
impl<R: Remote, L: Local> ActionController for Driver<R, L> {
139-
fn connect(&self, addr: RemoteAddr) -> io::Result<(Endpoint, SocketAddr)> {
140-
R::connect(addr).map(|info| {
148+
fn connect_with(
149+
&self,
150+
config: TransportConnect,
151+
addr: RemoteAddr,
152+
) -> io::Result<(Endpoint, SocketAddr)> {
153+
R::connect_with(config, addr).map(|info| {
141154
let id = self.remote_registry.register(
142155
info.remote,
143156
RemoteProperties::new(info.peer_addr, None),
@@ -147,8 +160,12 @@ impl<R: Remote, L: Local> ActionController for Driver<R, L> {
147160
})
148161
}
149162

150-
fn listen(&self, addr: SocketAddr) -> io::Result<(ResourceId, SocketAddr)> {
151-
L::listen(addr).map(|info| {
163+
fn listen_with(
164+
&self,
165+
config: TransportListen,
166+
addr: SocketAddr,
167+
) -> io::Result<(ResourceId, SocketAddr)> {
168+
L::listen_with(config, addr).map(|info| {
152169
let id = self.local_registry.register(info.local, LocalProperties, false);
153170
(id, info.local_addr)
154171
})

0 commit comments

Comments
 (0)