1+ use crate :: lock:: waiter:: WaiterSet ;
12use futures_core:: future:: { FusedFuture , Future } ;
2- use futures_core:: task:: { Context , Poll , Waker } ;
3- use slab:: Slab ;
3+ use futures_core:: task:: { Context , Poll } ;
44use std:: cell:: UnsafeCell ;
5+ use std:: fmt;
56use std:: ops:: { Deref , DerefMut } ;
67use std:: pin:: Pin ;
78use std:: process;
89use std:: sync:: atomic:: { AtomicUsize , Ordering } ;
9- use std:: sync:: Mutex as StdMutex ;
10- use std:: { fmt, mem} ;
1110
1211/// A futures-aware read-write lock.
1312pub struct RwLock < T : ?Sized > {
1413 state : AtomicUsize ,
15- read_waiters : StdMutex < Slab < Waiter > > ,
16- write_waiters : StdMutex < Slab < Waiter > > ,
14+ readers : WaiterSet ,
15+ writers : WaiterSet ,
1716 value : UnsafeCell < T > ,
1817}
1918
@@ -22,49 +21,24 @@ impl<T: ?Sized> fmt::Debug for RwLock<T> {
2221 let state = self . state . load ( Ordering :: SeqCst ) ;
2322 f. debug_struct ( "RwLock" )
2423 . field ( "is_locked" , & ( ( state & IS_LOCKED ) != 0 ) )
25- . field ( "has_writers" , & ( ( state & HAS_WRITERS ) != 0 ) )
26- . field ( "has_readers" , & ( ( state & HAS_READERS ) != 0 ) )
27- . field ( "active_readers" , & ( ( state & READ_COUNT_MASK ) >> 3 ) )
24+ . field ( "readers" , & ( ( state & READ_COUNT ) >> 1 ) )
2825 . finish ( )
2926 }
3027}
3128
32- enum Waiter {
33- Waiting ( Waker ) ,
34- Woken ,
35- }
36-
37- impl Waiter {
38- fn register ( & mut self , waker : & Waker ) {
39- match self {
40- Waiter :: Waiting ( w) if waker. will_wake ( w) => { }
41- _ => * self = Waiter :: Waiting ( waker. clone ( ) ) ,
42- }
43- }
44-
45- fn wake ( & mut self ) {
46- match mem:: replace ( self , Waiter :: Woken ) {
47- Waiter :: Waiting ( waker) => waker. wake ( ) ,
48- Waiter :: Woken => { }
49- }
50- }
51- }
52-
5329#[ allow( clippy:: identity_op) ]
5430const IS_LOCKED : usize = 1 << 0 ;
55- const HAS_WRITERS : usize = 1 << 1 ;
56- const HAS_READERS : usize = 1 << 2 ;
57- const ONE_READER : usize = 1 << 3 ;
58- const READ_COUNT_MASK : usize = !( ONE_READER - 1 ) ;
59- const MAX_READERS : usize = usize:: max_value ( ) >> 3 ;
31+ const ONE_READER : usize = 1 << 1 ;
32+ const READ_COUNT : usize = !( ONE_READER - 1 ) ;
33+ const MAX_READERS : usize = usize:: max_value ( ) >> 1 ;
6034
6135impl < T > RwLock < T > {
6236 /// Creates a new futures-aware read-write lock.
6337 pub fn new ( t : T ) -> RwLock < T > {
6438 RwLock {
6539 state : AtomicUsize :: new ( 0 ) ,
66- read_waiters : StdMutex :: new ( Slab :: new ( ) ) ,
67- write_waiters : StdMutex :: new ( Slab :: new ( ) ) ,
40+ readers : WaiterSet :: new ( ) ,
41+ writers : WaiterSet :: new ( ) ,
6842 value : UnsafeCell :: new ( t) ,
6943 }
7044 }
@@ -165,40 +139,6 @@ impl<T: ?Sized> RwLock<T> {
165139 pub fn get_mut ( & mut self ) -> & mut T {
166140 unsafe { & mut * self . value . get ( ) }
167141 }
168-
169- fn remove_reader ( & self , wait_key : usize ) {
170- if wait_key != WAIT_KEY_NONE {
171- let mut readers = self . read_waiters . lock ( ) . unwrap ( ) ;
172- // No need to check whether another waiter needs to be
173- // woken up since no other readers depend on this.
174- readers. remove ( wait_key) ;
175- if readers. is_empty ( ) {
176- self . state . fetch_and ( !HAS_READERS , Ordering :: Relaxed ) ;
177- }
178- }
179- }
180-
181- fn remove_writer ( & self , wait_key : usize , wake_another : bool ) {
182- if wait_key != WAIT_KEY_NONE {
183- let mut writers = self . write_waiters . lock ( ) . unwrap ( ) ;
184- match writers. remove ( wait_key) {
185- Waiter :: Waiting ( _) => { }
186- Waiter :: Woken => {
187- // We were awoken, but then dropped before we could
188- // wake up to acquire the lock. Wake up another
189- // waiter.
190- if wake_another {
191- if let Some ( ( _, waiter) ) = writers. iter_mut ( ) . next ( ) {
192- waiter. wake ( ) ;
193- }
194- }
195- }
196- }
197- if writers. is_empty ( ) {
198- self . state . fetch_and ( !HAS_WRITERS , Ordering :: Relaxed ) ;
199- }
200- }
201- }
202142}
203143
204144// Sentinel for when no slot in the `Slab` has been dedicated to this object.
@@ -244,27 +184,23 @@ impl<'a, T: ?Sized> Future for RwLockReadFuture<'a, T> {
244184 . expect ( "polled RwLockReadFuture after completion" ) ;
245185
246186 if let Some ( lock) = rwlock. try_read ( ) {
247- rwlock. remove_reader ( self . wait_key ) ;
187+ if self . wait_key != WAIT_KEY_NONE {
188+ rwlock. readers . remove ( self . wait_key ) ;
189+ }
248190 self . rwlock = None ;
249191 return Poll :: Ready ( lock) ;
250192 }
251193
252- {
253- let mut readers = rwlock. read_waiters . lock ( ) . unwrap ( ) ;
254- if self . wait_key == WAIT_KEY_NONE {
255- self . wait_key = readers. insert ( Waiter :: Waiting ( cx. waker ( ) . clone ( ) ) ) ;
256- if readers. len ( ) == 1 {
257- rwlock. state . fetch_or ( HAS_READERS , Ordering :: Relaxed ) ;
258- }
259- } else {
260- readers[ self . wait_key ] . register ( cx. waker ( ) ) ;
261- }
194+ if self . wait_key == WAIT_KEY_NONE {
195+ self . wait_key = rwlock. readers . insert ( cx. waker ( ) ) ;
196+ } else {
197+ rwlock. readers . register ( self . wait_key , cx. waker ( ) ) ;
262198 }
263199
264200 // Ensure that we haven't raced `RwLockWriteGuard::drop`'s unlock path by
265201 // attempting to acquire the lock again.
266202 if let Some ( lock) = rwlock. try_read ( ) {
267- rwlock. remove_reader ( self . wait_key ) ;
203+ rwlock. readers . remove ( self . wait_key ) ;
268204 self . rwlock = None ;
269205 return Poll :: Ready ( lock) ;
270206 }
@@ -276,7 +212,9 @@ impl<'a, T: ?Sized> Future for RwLockReadFuture<'a, T> {
276212impl < T : ?Sized > Drop for RwLockReadFuture < ' _ , T > {
277213 fn drop ( & mut self ) {
278214 if let Some ( rwlock) = self . rwlock {
279- rwlock. remove_reader ( self . wait_key ) ;
215+ if self . wait_key != WAIT_KEY_NONE {
216+ rwlock. readers . remove ( self . wait_key ) ;
217+ }
280218 }
281219 }
282220}
@@ -320,28 +258,24 @@ impl<'a, T: ?Sized> Future for RwLockWriteFuture<'a, T> {
320258 . expect ( "polled RwLockWriteFuture after completion" ) ;
321259
322260 if let Some ( lock) = rwlock. try_write ( ) {
323- rwlock. remove_writer ( self . wait_key , false ) ;
261+ if self . wait_key != WAIT_KEY_NONE {
262+ rwlock. writers . remove ( self . wait_key ) ;
263+ }
324264 self . rwlock = None ;
325265 return Poll :: Ready ( lock) ;
326266 }
327267
328- {
329- let mut writers = rwlock. write_waiters . lock ( ) . unwrap ( ) ;
330- if self . wait_key == WAIT_KEY_NONE {
331- self . wait_key = writers. insert ( Waiter :: Waiting ( cx. waker ( ) . clone ( ) ) ) ;
332- if writers. len ( ) == 1 {
333- rwlock. state . fetch_or ( HAS_WRITERS , Ordering :: Relaxed ) ;
334- }
335- } else {
336- writers[ self . wait_key ] . register ( cx. waker ( ) ) ;
337- }
268+ if self . wait_key == WAIT_KEY_NONE {
269+ self . wait_key = rwlock. writers . insert ( cx. waker ( ) ) ;
270+ } else {
271+ rwlock. writers . register ( self . wait_key , cx. waker ( ) ) ;
338272 }
339273
340274 // Ensure that we haven't raced `RwLockWriteGuard::drop` or
341275 // `RwLockReadGuard::drop`'s unlock path by attempting to acquire
342276 // the lock again.
343277 if let Some ( lock) = rwlock. try_write ( ) {
344- rwlock. remove_writer ( self . wait_key , false ) ;
278+ rwlock. writers . remove ( self . wait_key ) ;
345279 self . rwlock = None ;
346280 return Poll :: Ready ( lock) ;
347281 }
@@ -353,11 +287,13 @@ impl<'a, T: ?Sized> Future for RwLockWriteFuture<'a, T> {
353287impl < T : ?Sized > Drop for RwLockWriteFuture < ' _ , T > {
354288 fn drop ( & mut self ) {
355289 if let Some ( rwlock) = self . rwlock {
356- // This future was dropped before it acquired the rwlock.
357- //
358- // Remove ourselves from the map, waking up another waiter if we
359- // had been awoken to acquire the lock.
360- rwlock. remove_writer ( self . wait_key , true ) ;
290+ if self . wait_key != WAIT_KEY_NONE {
291+ // This future was dropped before it acquired the rwlock.
292+ //
293+ // Remove ourselves from the map, waking up another waiter if we
294+ // had been awoken to acquire the lock.
295+ rwlock. writers . cancel ( self . wait_key ) ;
296+ }
361297 }
362298 }
363299}
@@ -381,11 +317,8 @@ impl<T: ?Sized + fmt::Debug> fmt::Debug for RwLockReadGuard<'_, T> {
381317impl < T : ?Sized > Drop for RwLockReadGuard < ' _ , T > {
382318 fn drop ( & mut self ) {
383319 let old_state = self . rwlock . state . fetch_sub ( ONE_READER , Ordering :: SeqCst ) ;
384- if old_state & READ_COUNT_MASK == ONE_READER && old_state & HAS_WRITERS != 0 {
385- let mut writers = self . rwlock . write_waiters . lock ( ) . unwrap ( ) ;
386- if let Some ( ( _, waiter) ) = writers. iter_mut ( ) . next ( ) {
387- waiter. wake ( ) ;
388- }
320+ if old_state & READ_COUNT == ONE_READER {
321+ self . rwlock . writers . notify_any ( ) ;
389322 }
390323 }
391324}
@@ -421,21 +354,9 @@ impl<T: ?Sized + fmt::Debug> fmt::Debug for RwLockWriteGuard<'_, T> {
421354
422355impl < T : ?Sized > Drop for RwLockWriteGuard < ' _ , T > {
423356 fn drop ( & mut self ) {
424- let old_state = self . rwlock . state . fetch_and ( !IS_LOCKED , Ordering :: AcqRel ) ;
425- match ( old_state & HAS_WRITERS , old_state & HAS_READERS ) {
426- ( 0 , 0 ) => { }
427- ( 0 , _) => {
428- let mut readers = self . rwlock . read_waiters . lock ( ) . unwrap ( ) ;
429- for ( _, waiter) in readers. iter_mut ( ) {
430- waiter. wake ( ) ;
431- }
432- }
433- _ => {
434- let mut writers = self . rwlock . write_waiters . lock ( ) . unwrap ( ) ;
435- if let Some ( ( _, waiter) ) = writers. iter_mut ( ) . next ( ) {
436- waiter. wake ( ) ;
437- }
438- }
357+ self . rwlock . state . store ( 0 , Ordering :: SeqCst ) ;
358+ if !self . rwlock . readers . notify_all ( ) {
359+ self . rwlock . writers . notify_any ( ) ;
439360 }
440361 }
441362}
0 commit comments