2
2
mod epoll {
3
3
use std:: io;
4
4
5
- pub const EPOLL_CTL_ADD : i32 = 1 ;
6
- pub const EPOLL_CTL_DEL : i32 = 2 ;
7
- pub const EPOLL_CTL_MOD : i32 = 3 ;
8
- pub const EPOLLIN : i32 = 0x1 ;
9
- pub const EPOLLONESHOT : i32 = 1 << 30 ;
10
- pub const EPOLLET : i32 = 1 << 31 ;
11
-
12
5
// Module providing unsafe ffi for Linux's epoll and epoll-adjacent syscalls.
13
6
mod ffi {
14
7
#[ link( name = "c" ) ]
@@ -96,14 +89,17 @@ pub mod blockers {
96
89
fd : RawFd ,
97
90
}
98
91
92
+ // Abstraction around `epoll`. Allows for blocking until an I/O event is ready.
99
93
impl IoBlocker {
94
+ // Initialize an `epoll` instance.
100
95
pub fn new ( ) -> io:: Result < Self > {
101
96
match epoll:: create ( ) {
102
97
Ok ( fd) => Ok ( IoBlocker { fd } ) ,
103
98
Err ( e) => Err ( e) ,
104
99
}
105
100
}
106
101
102
+ // Generate a registrator that can register events to this IoBlocker's epoll.
107
103
pub fn registrator ( & self ) -> Registrator {
108
104
Registrator { fd : self . fd }
109
105
}
@@ -113,7 +109,6 @@ pub mod blockers {
113
109
const TIMEOUT : i32 = -1 ; // Wait forever.
114
110
const MAX_EVENTS : i32 = 1024 ;
115
111
events. clear ( ) ;
116
- // std::thread::sleep(core::time::Duration::from_secs(1));
117
112
118
113
// println!("IoBlocker::block --> begin blocking");
119
114
let n_events = epoll:: wait ( self . fd , events, MAX_EVENTS , TIMEOUT ) ?;
@@ -127,15 +122,19 @@ pub mod blockers {
127
122
}
128
123
}
129
124
130
- // Ensure that we close held file descriptor.
125
+ // Ensure that we close held `epoll` file descriptor.
131
126
impl Drop for IoBlocker {
132
127
fn drop ( & mut self ) {
133
128
// println!("IoBlocker::drop --> dropping");
134
129
epoll:: close ( self . fd ) . unwrap ( ) ;
135
130
}
136
131
}
137
132
138
- // Struct to get around Rust ownership issues
133
+ // There should only be a single IoBlocker for a given `epoll` instance,
134
+ // since IoBlocker is responsible for creating and destroying its file descriptor.
135
+ // However, we need some way for multiple different I/O events to register
136
+ // themselves to an `epoll`. For this reason, we need a class that can be
137
+ // duplicated such that each task can own its own copy. This is the Registrator.
139
138
#[ derive( Debug , Clone , Copy ) ] // Helpful default traits.
140
139
pub struct Registrator {
141
140
fd : RawFd ,
@@ -147,21 +146,21 @@ pub mod blockers {
147
146
let fd = interest. as_raw_fd ( ) ;
148
147
149
148
// Listen for file to become readable.
150
- let mut event = epoll:: Event :: new ( epoll :: EPOLLIN | epoll :: EPOLLONESHOT , token) ;
151
- match epoll:: ctl ( self . fd , epoll :: EPOLL_CTL_ADD , fd, & mut event) {
149
+ let mut event = epoll:: Event :: new ( libc :: EPOLLIN | libc :: EPOLLONESHOT , token) ;
150
+ match epoll:: ctl ( self . fd , libc :: EPOLL_CTL_ADD , fd, & mut event) {
152
151
Ok ( _) => Ok ( ( ) ) ,
153
152
Err ( e) if e. kind ( ) == io:: ErrorKind :: AlreadyExists => {
154
153
// one-shot epoll consumed, so needs to be re-armed.
155
- epoll:: ctl ( self . fd , epoll :: EPOLL_CTL_MOD , fd, & mut event)
156
- } ,
154
+ epoll:: ctl ( self . fd , libc :: EPOLL_CTL_MOD , fd, & mut event)
155
+ }
157
156
Err ( e) => Err ( e) ,
158
157
}
159
158
}
160
159
161
160
pub fn unregister ( & self , interest : & impl AsRawFd , token : usize ) -> io:: Result < ( ) > {
162
161
let fd = interest. as_raw_fd ( ) ;
163
- let mut event = epoll:: Event :: new ( epoll :: EPOLLIN | epoll :: EPOLLET , token) ;
164
- match epoll:: ctl ( self . fd , epoll :: EPOLL_CTL_DEL , fd, & mut event) {
162
+ let mut event = epoll:: Event :: new ( libc :: EPOLLIN | libc :: EPOLLET , token) ;
163
+ match epoll:: ctl ( self . fd , libc :: EPOLL_CTL_DEL , fd, & mut event) {
165
164
Ok ( _) => Ok ( ( ) ) ,
166
165
Err ( e) => Err ( e) ,
167
166
}
@@ -172,12 +171,14 @@ pub mod blockers {
172
171
// Provides future API for awaitable services. For use by external crates.
173
172
// In this case, the only provided service is a TcpStream.
174
173
pub mod services {
175
- use crate :: blockers;
174
+ use crate :: blockers:: { self , Registrator } ;
176
175
use futures:: io:: { AsyncRead , AsyncWrite } ;
176
+ use socket2:: { Domain , Protocol , SockAddr , Socket , Type } ;
177
177
use std:: io:: { self , Error , Read , Write } ;
178
178
use std:: net:: Shutdown ;
179
179
use std:: os:: unix:: io:: { AsRawFd , RawFd } ;
180
180
use std:: {
181
+ future:: Future ,
181
182
net,
182
183
pin:: Pin ,
183
184
task:: { Context , Poll } ,
@@ -190,23 +191,85 @@ pub mod services {
190
191
token : usize ,
191
192
}
192
193
194
+ pub struct ConnectStreamFuture < ' a > {
195
+ socket : & ' a Socket ,
196
+ addr : SockAddr ,
197
+ registrator : Registrator ,
198
+ token : usize ,
199
+ }
200
+
201
+ impl Future for ConnectStreamFuture < ' _ > {
202
+ type Output = Result < ( ) , io:: Error > ;
203
+
204
+ fn poll ( self : Pin < & mut Self > , cx : & mut Context < ' _ > ) -> Poll < Self :: Output > {
205
+ match self . socket . connect ( & self . addr ) {
206
+ // Finished connecting, done polling.
207
+ Ok ( _) => Poll :: Ready ( Ok ( ( ) ) ) ,
208
+ // Since we set this TcpStream to non-blocking, check if
209
+ // return value still not ready.
210
+ Err ( e)
211
+ if e. raw_os_error ( ) == Some ( libc:: EINPROGRESS )
212
+ || e. kind ( ) == io:: ErrorKind :: WouldBlock =>
213
+ {
214
+ self . registrator . register ( self . socket , self . token ) . unwrap ( ) ;
215
+ Poll :: Pending
216
+ }
217
+ // Error occurred, done polling.
218
+ Err ( e) => Poll :: Ready ( Err ( e) ) ,
219
+ }
220
+ }
221
+ }
222
+
193
223
impl TcpStream {
194
- pub fn connect (
195
- addr : impl net:: ToSocketAddrs ,
224
+ // Asynchronously connect this stream to a socket address.
225
+ // Reference: https://docs.rs/async-io/latest/src/async_io/lib.rs.html#1867
226
+ pub async fn connect (
227
+ addr_generator : impl net:: ToSocketAddrs ,
196
228
registrator : blockers:: Registrator ,
197
229
token : usize ,
198
230
) -> io:: Result < Self > {
199
- let stream = net:: TcpStream :: connect ( addr) ?;
200
- stream. set_nonblocking ( true ) ?;
231
+ let sock_type = Type :: STREAM . nonblocking ( ) ;
232
+ let protocol = Protocol :: TCP ;
233
+ let socket_addr = addr_generator. to_socket_addrs ( ) . unwrap ( ) . next ( ) . unwrap ( ) ;
234
+ let domain = Domain :: for_address ( socket_addr) ;
235
+
236
+ let socket = Socket :: new ( domain, sock_type, Some ( protocol) )
237
+ . unwrap_or_else ( |_| panic ! ( "Failed to create socket at address {}" , socket_addr) ) ;
238
+
239
+ // wait for the socket to become writeable.
240
+ TcpStream :: sock_connect (
241
+ & socket,
242
+ SockAddr :: from ( socket_addr) ,
243
+ registrator. clone ( ) ,
244
+ token,
245
+ )
246
+ . await ?;
247
+
248
+ let stream = net:: TcpStream :: from ( socket) ;
201
249
202
250
Ok ( TcpStream {
203
251
inner : stream,
204
252
registrator,
205
253
token,
206
254
} )
207
255
}
256
+
257
+ fn sock_connect (
258
+ socket : & Socket ,
259
+ addr : SockAddr ,
260
+ registrator : Registrator ,
261
+ token : usize ,
262
+ ) -> ConnectStreamFuture {
263
+ return ConnectStreamFuture {
264
+ socket,
265
+ addr,
266
+ registrator,
267
+ token,
268
+ } ;
269
+ }
208
270
}
209
271
272
+ // Allow calling async read operations on socket.
210
273
impl AsyncRead for TcpStream {
211
274
fn poll_read (
212
275
self : Pin < & mut Self > ,
@@ -230,6 +293,7 @@ pub mod services {
230
293
}
231
294
}
232
295
296
+ // Allow calling async write operations on socket.
233
297
impl AsyncWrite for TcpStream {
234
298
fn poll_write (
235
299
self : Pin < & mut Self > ,
@@ -308,6 +372,7 @@ pub mod runtime {
308
372
// We use a `SyncSender` as opposed to a regular `Sender` to allow for sharing
309
373
// the sender between threads.
310
374
ready_queue : mpsc:: SyncSender < usize > ,
375
+ // Unique identifier for this task.
311
376
token : usize ,
312
377
}
313
378
@@ -348,7 +413,6 @@ pub mod runtime {
348
413
}
349
414
}
350
415
351
-
352
416
impl Reactor {
353
417
// Create a new Reactor instance.
354
418
pub fn new ( sender : mpsc:: SyncSender < usize > ) -> Reactor {
@@ -400,7 +464,10 @@ pub mod runtime {
400
464
let mut finished = false ;
401
465
{
402
466
// println!("Executor::run --> received {}", token);
403
- let task = self . task_map . get ( & token) . unwrap_or_else ( || panic ! ( "Invalid task token {}" , token) ) ;
467
+ let task = self
468
+ . task_map
469
+ . get ( & token)
470
+ . unwrap_or_else ( || panic ! ( "Invalid task token {}" , token) ) ;
404
471
let future = & mut * task. future . lock ( ) . unwrap ( ) ;
405
472
406
473
let waker = Waker :: from ( Arc :: clone ( & task) ) ;
@@ -414,7 +481,7 @@ pub mod runtime {
414
481
// be pointing to garbage upon calling remove.
415
482
if finished {
416
483
self . task_map . remove ( & token) . unwrap ( ) ;
417
-
484
+
418
485
// Done processing all suspended events.
419
486
if self . task_map . is_empty ( ) {
420
487
break ;
0 commit comments