Skip to content

Commit 61ecafc

Browse files
committed
Refactor to allow configurable network adapters
Signed-off-by: Konrad Gräfe <[email protected]>
1 parent 681187c commit 61ecafc

File tree

10 files changed

+172
-34
lines changed

10 files changed

+172
-34
lines changed

src/adapters/framed_tcp.rs

+6-3
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ use crate::network::adapter::{
22
Resource, Remote, Local, Adapter, SendStatus, AcceptedType, ReadStatus, ConnectionInfo,
33
ListeningInfo, PendingStatus,
44
};
5-
use crate::network::{RemoteAddr, Readiness};
5+
use crate::network::{RemoteAddr, Readiness, TransportConnect, TransportListen};
66
use crate::util::encoding::{self, Decoder, MAX_ENCODED_SIZE};
77

88
use mio::net::{TcpListener, TcpStream};
@@ -45,7 +45,10 @@ impl Resource for RemoteResource {
4545
}
4646

4747
impl Remote for RemoteResource {
48-
fn connect(remote_addr: RemoteAddr) -> io::Result<ConnectionInfo<Self>> {
48+
fn connect_with(
49+
_: TransportConnect,
50+
remote_addr: RemoteAddr,
51+
) -> io::Result<ConnectionInfo<Self>> {
4952
let peer_addr = *remote_addr.socket_addr();
5053
let stream = TcpStream::connect(peer_addr)?;
5154
let local_addr = stream.local_addr()?;
@@ -129,7 +132,7 @@ impl Resource for LocalResource {
129132
impl Local for LocalResource {
130133
type Remote = RemoteResource;
131134

132-
fn listen(addr: SocketAddr) -> io::Result<ListeningInfo<Self>> {
135+
fn listen_with(_: TransportListen, addr: SocketAddr) -> io::Result<ListeningInfo<Self>> {
133136
let listener = TcpListener::bind(addr)?;
134137
let local_addr = listener.local_addr().unwrap();
135138
Ok(ListeningInfo { local: { LocalResource { listener } }, local_addr })

src/adapters/tcp.rs

+6-3
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ use crate::network::adapter::{
22
Resource, Remote, Local, Adapter, SendStatus, AcceptedType, ReadStatus, ConnectionInfo,
33
ListeningInfo, PendingStatus,
44
};
5-
use crate::network::{RemoteAddr, Readiness};
5+
use crate::network::{RemoteAddr, Readiness, TransportConnect, TransportListen};
66

77
use mio::net::{TcpListener, TcpStream};
88
use mio::event::{Source};
@@ -40,7 +40,10 @@ impl Resource for RemoteResource {
4040
}
4141

4242
impl Remote for RemoteResource {
43-
fn connect(remote_addr: RemoteAddr) -> io::Result<ConnectionInfo<Self>> {
43+
fn connect_with(
44+
_: TransportConnect,
45+
remote_addr: RemoteAddr,
46+
) -> io::Result<ConnectionInfo<Self>> {
4447
let peer_addr = *remote_addr.socket_addr();
4548
let stream = TcpStream::connect(peer_addr)?;
4649
let local_addr = stream.local_addr()?;
@@ -131,7 +134,7 @@ impl Resource for LocalResource {
131134
impl Local for LocalResource {
132135
type Remote = RemoteResource;
133136

134-
fn listen(addr: SocketAddr) -> io::Result<ListeningInfo<Self>> {
137+
fn listen_with(_: TransportListen, addr: SocketAddr) -> io::Result<ListeningInfo<Self>> {
135138
let listener = TcpListener::bind(addr)?;
136139
let local_addr = listener.local_addr().unwrap();
137140
Ok(ListeningInfo { local: { LocalResource { listener } }, local_addr })

src/adapters/template.rs

+6-3
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ use crate::network::adapter::{
44
Resource, Remote, Local, Adapter, SendStatus, AcceptedType, ReadStatus, ConnectionInfo,
55
ListeningInfo, PendingStatus,
66
};
7-
use crate::network::{RemoteAddr, Readiness};
7+
use crate::network::{RemoteAddr, Readiness, TransportConnect, TransportListen};
88

99
use mio::event::{Source};
1010

@@ -25,7 +25,10 @@ impl Resource for RemoteResource {
2525
}
2626

2727
impl Remote for RemoteResource {
28-
fn connect(remote_addr: RemoteAddr) -> io::Result<ConnectionInfo<Self>> {
28+
fn connect_with(
29+
config: TransportConnect,
30+
remote_addr: RemoteAddr,
31+
) -> io::Result<ConnectionInfo<Self>> {
2932
todo!()
3033
}
3134

@@ -52,7 +55,7 @@ impl Resource for LocalResource {
5255
impl Local for LocalResource {
5356
type Remote = RemoteResource;
5457

55-
fn listen(addr: SocketAddr) -> io::Result<ListeningInfo<Self>> {
58+
fn listen_with(config: TransportListen, addr: SocketAddr) -> io::Result<ListeningInfo<Self>> {
5659
todo!()
5760
}
5861

src/adapters/udp.rs

+6-3
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ use crate::network::adapter::{
22
Resource, Remote, Local, Adapter, SendStatus, AcceptedType, ReadStatus, ConnectionInfo,
33
ListeningInfo, PendingStatus,
44
};
5-
use crate::network::{RemoteAddr, Readiness};
5+
use crate::network::{RemoteAddr, Readiness, TransportConnect, TransportListen};
66

77
use mio::net::{UdpSocket};
88
use mio::event::{Source};
@@ -43,7 +43,10 @@ impl Resource for RemoteResource {
4343
}
4444

4545
impl Remote for RemoteResource {
46-
fn connect(remote_addr: RemoteAddr) -> io::Result<ConnectionInfo<Self>> {
46+
fn connect_with(
47+
_: TransportConnect,
48+
remote_addr: RemoteAddr,
49+
) -> io::Result<ConnectionInfo<Self>> {
4750
let socket = UdpSocket::bind("0.0.0.0:0".parse().unwrap())?;
4851
let peer_addr = *remote_addr.socket_addr();
4952
socket.connect(peer_addr)?;
@@ -95,7 +98,7 @@ impl Resource for LocalResource {
9598
impl Local for LocalResource {
9699
type Remote = RemoteResource;
97100

98-
fn listen(addr: SocketAddr) -> io::Result<ListeningInfo<Self>> {
101+
fn listen_with(_: TransportListen, addr: SocketAddr) -> io::Result<ListeningInfo<Self>> {
99102
let socket = match addr {
100103
SocketAddr::V4(addr) if addr.ip().is_multicast() => {
101104
let listening_addr = SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, addr.port());

src/adapters/ws.rs

+6-2
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ use crate::network::adapter::{
44
};
55
use crate::network::{RemoteAddr, Readiness};
66
use crate::util::thread::{OTHER_THREAD_ERR};
7+
use crate::network::{TransportConnect, TransportListen};
78

89
use mio::event::{Source};
910
use mio::net::{TcpStream, TcpListener};
@@ -76,7 +77,10 @@ impl Resource for RemoteResource {
7677
}
7778

7879
impl Remote for RemoteResource {
79-
fn connect(remote_addr: RemoteAddr) -> io::Result<ConnectionInfo<Self>> {
80+
fn connect_with(
81+
_: TransportConnect,
82+
remote_addr: RemoteAddr,
83+
) -> io::Result<ConnectionInfo<Self>> {
8084
let (peer_addr, url) = match remote_addr {
8185
RemoteAddr::Socket(addr) => {
8286
(addr, Url::parse(&format!("ws://{addr}/message-io-default")).unwrap())
@@ -328,7 +332,7 @@ impl Resource for LocalResource {
328332
impl Local for LocalResource {
329333
type Remote = RemoteResource;
330334

331-
fn listen(addr: SocketAddr) -> io::Result<ListeningInfo<Self>> {
335+
fn listen_with(_: TransportListen, addr: SocketAddr) -> io::Result<ListeningInfo<Self>> {
332336
let listener = TcpListener::bind(addr)?;
333337
let local_addr = listener.local_addr().unwrap();
334338
Ok(ListeningInfo { local: LocalResource { listener }, local_addr })

src/network.rs

+38-10
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ pub use adapter::{SendStatus};
1717
pub use resource_id::{ResourceId, ResourceType};
1818
pub use endpoint::{Endpoint};
1919
pub use remote_addr::{RemoteAddr, ToRemoteAddr};
20-
pub use transport::{Transport};
20+
pub use transport::{Transport, TransportConnect, TransportListen};
2121
pub use driver::{NetEvent};
2222
pub use poll::{Readiness};
2323

@@ -101,12 +101,22 @@ impl NetworkController {
101101
&self,
102102
transport: Transport,
103103
addr: impl ToRemoteAddr,
104+
) -> io::Result<(Endpoint, SocketAddr)> {
105+
self.connect_with(transport.into(), addr)
106+
}
107+
108+
pub fn connect_with(
109+
&self,
110+
transport_connect: TransportConnect,
111+
addr: impl ToRemoteAddr,
104112
) -> io::Result<(Endpoint, SocketAddr)> {
105113
let addr = addr.to_remote_addr().unwrap();
106-
self.controllers[transport.id() as usize].connect(addr).map(|(endpoint, addr)| {
107-
log::trace!("Connect to {}", endpoint);
108-
(endpoint, addr)
109-
})
114+
self.controllers[transport_connect.id() as usize].connect_with(transport_connect, addr).map(
115+
|(endpoint, addr)| {
116+
log::trace!("Connect to {}", endpoint);
117+
(endpoint, addr)
118+
},
119+
)
110120
}
111121

112122
/// Creates a connection to the specified address.
@@ -147,7 +157,15 @@ impl NetworkController {
147157
transport: Transport,
148158
addr: impl ToRemoteAddr,
149159
) -> io::Result<(Endpoint, SocketAddr)> {
150-
let (endpoint, addr) = self.connect(transport, addr)?;
160+
self.connect_sync_with(transport.into(), addr)
161+
}
162+
163+
pub fn connect_sync_with(
164+
&self,
165+
transport_connect: TransportConnect,
166+
addr: impl ToRemoteAddr,
167+
) -> io::Result<(Endpoint, SocketAddr)> {
168+
let (endpoint, addr) = self.connect_with(transport_connect, addr)?;
151169
loop {
152170
std::thread::sleep(Duration::from_millis(1));
153171
match self.is_ready(endpoint.resource_id()) {
@@ -173,12 +191,22 @@ impl NetworkController {
173191
&self,
174192
transport: Transport,
175193
addr: impl ToSocketAddrs,
194+
) -> io::Result<(ResourceId, SocketAddr)> {
195+
self.listen_with(transport.into(), addr)
196+
}
197+
198+
pub fn listen_with(
199+
&self,
200+
transport_listen: TransportListen,
201+
addr: impl ToSocketAddrs,
176202
) -> io::Result<(ResourceId, SocketAddr)> {
177203
let addr = addr.to_socket_addrs().unwrap().next().unwrap();
178-
self.controllers[transport.id() as usize].listen(addr).map(|(resource_id, addr)| {
179-
log::trace!("Listening at {} by {}", addr, resource_id);
180-
(resource_id, addr)
181-
})
204+
self.controllers[transport_listen.id() as usize].listen_with(transport_listen, addr).map(
205+
|(resource_id, addr)| {
206+
log::trace!("Listening at {} by {}", addr, resource_id);
207+
(resource_id, addr)
208+
},
209+
)
182210
}
183211

184212
/// Send the data message thought the connection represented by the given endpoint.

src/network/adapter.rs

+7-2
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
use crate::network::transport::{TransportConnect, TransportListen};
2+
13
use super::remote_addr::{RemoteAddr};
24
use super::poll::{Readiness};
35

@@ -120,7 +122,10 @@ pub trait Remote: Resource + Sized {
120122
/// The [`RemoteAddr`] contains either a [`SocketAddr`] or a [`url::Url`].
121123
/// It is in charge of deciding what to do in both cases.
122124
/// It also must return the extracted address as `SocketAddr`.
123-
fn connect(remote_addr: RemoteAddr) -> io::Result<ConnectionInfo<Self>>;
125+
fn connect_with(
126+
config: TransportConnect,
127+
remote_addr: RemoteAddr,
128+
) -> io::Result<ConnectionInfo<Self>>;
124129

125130
/// Called when a remote resource received an event.
126131
/// The resource must be *ready* to receive this call.
@@ -193,7 +198,7 @@ pub trait Local: Resource + Sized {
193198
/// The **implementator** is in change of creating the corresponding local resource.
194199
/// It also must returned the listening address since it could not be the same as param `addr`
195200
/// (e.g. listening from port `0`).
196-
fn listen(addr: SocketAddr) -> io::Result<ListeningInfo<Self>>;
201+
fn listen_with(config: TransportListen, addr: SocketAddr) -> io::Result<ListeningInfo<Self>>;
197202

198203
/// Called when a local resource received an event.
199204
/// It means that some resource have tried to connect.

src/network/driver.rs

+23-6
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ use super::poll::{Poll, Readiness};
44
use super::registry::{ResourceRegistry, Register};
55
use super::remote_addr::{RemoteAddr};
66
use super::adapter::{Adapter, Remote, Local, SendStatus, AcceptedType, ReadStatus, PendingStatus};
7+
use super::transport::{TransportConnect, TransportListen};
78

89
use std::net::{SocketAddr};
910
use std::sync::{
@@ -68,8 +69,16 @@ impl std::fmt::Debug for NetEvent<'_> {
6869
}
6970

7071
pub trait ActionController: Send + Sync {
71-
fn connect(&self, addr: RemoteAddr) -> io::Result<(Endpoint, SocketAddr)>;
72-
fn listen(&self, addr: SocketAddr) -> io::Result<(ResourceId, SocketAddr)>;
72+
fn connect_with(
73+
&self,
74+
config: TransportConnect,
75+
addr: RemoteAddr,
76+
) -> io::Result<(Endpoint, SocketAddr)>;
77+
fn listen_with(
78+
&self,
79+
config: TransportListen,
80+
addr: SocketAddr,
81+
) -> io::Result<(ResourceId, SocketAddr)>;
7382
fn send(&self, endpoint: Endpoint, data: &[u8]) -> SendStatus;
7483
fn remove(&self, id: ResourceId) -> bool;
7584
fn is_ready(&self, id: ResourceId) -> Option<bool>;
@@ -136,8 +145,12 @@ impl<R: Remote, L: Local> Clone for Driver<R, L> {
136145
}
137146

138147
impl<R: Remote, L: Local> ActionController for Driver<R, L> {
139-
fn connect(&self, addr: RemoteAddr) -> io::Result<(Endpoint, SocketAddr)> {
140-
R::connect(addr).map(|info| {
148+
fn connect_with(
149+
&self,
150+
config: TransportConnect,
151+
addr: RemoteAddr,
152+
) -> io::Result<(Endpoint, SocketAddr)> {
153+
R::connect_with(config, addr).map(|info| {
141154
let id = self.remote_registry.register(
142155
info.remote,
143156
RemoteProperties::new(info.peer_addr, None),
@@ -147,8 +160,12 @@ impl<R: Remote, L: Local> ActionController for Driver<R, L> {
147160
})
148161
}
149162

150-
fn listen(&self, addr: SocketAddr) -> io::Result<(ResourceId, SocketAddr)> {
151-
L::listen(addr).map(|info| {
163+
fn listen_with(
164+
&self,
165+
config: TransportListen,
166+
addr: SocketAddr,
167+
) -> io::Result<(ResourceId, SocketAddr)> {
168+
L::listen_with(config, addr).map(|info| {
152169
let id = self.local_registry.register(info.local, LocalProperties, false);
153170
(id, info.local_addr)
154171
})

src/network/loader.rs

+12-2
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
use crate::network::{TransportConnect, TransportListen};
2+
13
use super::endpoint::{Endpoint};
24
use super::resource_id::{ResourceId};
35
use super::poll::{Poll, Readiness};
@@ -62,11 +64,19 @@ const UNIMPLEMENTED_DRIVER_ERR: &str =
6264

6365
struct UnimplementedDriver;
6466
impl ActionController for UnimplementedDriver {
65-
fn connect(&self, _: RemoteAddr) -> io::Result<(Endpoint, SocketAddr)> {
67+
fn connect_with(
68+
&self,
69+
_: TransportConnect,
70+
_: RemoteAddr,
71+
) -> io::Result<(Endpoint, SocketAddr)> {
6672
panic!("{}", UNIMPLEMENTED_DRIVER_ERR);
6773
}
6874

69-
fn listen(&self, _: SocketAddr) -> io::Result<(ResourceId, SocketAddr)> {
75+
fn listen_with(
76+
&self,
77+
_: TransportListen,
78+
_: SocketAddr,
79+
) -> io::Result<(ResourceId, SocketAddr)> {
7080
panic!("{}", UNIMPLEMENTED_DRIVER_ERR);
7181
}
7282

0 commit comments

Comments
 (0)