@@ -79,9 +79,22 @@ public final class NIOThreadPool {
79
79
/// It should never be "leaked" outside of the lock block.
80
80
case modifying
81
81
}
82
- private let semaphore = DispatchSemaphore ( value: 0 )
83
- private let lock = NIOLock ( )
84
- private var threads : [ NIOThread ] ? = nil // protected by `lock`
82
+
83
+ /// Whether threads in the pool have work.
84
+ private enum WorkState : Hashable {
85
+ case hasWork
86
+ case hasNoWork
87
+ }
88
+
89
+ // The condition lock is used in place of a lock and a semaphore to avoid warnings from the
90
+ // thread performance checker.
91
+ //
92
+ // Only the worker threads wait for the condition lock to take a value, no other threads need
93
+ // to wait for a given value. The value indicates whether the thread has some work to do. Work
94
+ // in this case can be either processing a work item or exiting the threads processing
95
+ // loop (i.e. shutting down).
96
+ private let conditionLock : ConditionLock < WorkState >
97
+ private var threads : [ NIOThread ] ? = nil // protected by `conditionLock`
85
98
private var state : State = . stopped
86
99
87
100
// WorkItems don't have a handle so they can't be cancelled directly. Instead an ID is assigned
@@ -124,7 +137,7 @@ public final class NIOThreadPool {
124
137
return
125
138
}
126
139
127
- let threadsToJoin = self . lock . withLock { ( ) -> [ NIOThread ] in
140
+ let threadsToJoin = self . conditionLock . withLock {
128
141
switch self . state {
129
142
case . running( let items) :
130
143
self . state = . modifying
@@ -133,17 +146,17 @@ public final class NIOThreadPool {
133
146
item. workItem ( . cancelled)
134
147
}
135
148
}
136
- self . state = . shuttingDown( Array ( repeating: true , count: numberOfThreads) )
137
- for _ in ( 0 ..< numberOfThreads) {
138
- self . semaphore. signal ( )
139
- }
149
+ self . state = . shuttingDown( Array ( repeating: true , count: self . numberOfThreads) )
150
+
140
151
let threads = self . threads!
141
- defer {
142
- self . threads = nil
143
- }
144
- return threads
152
+ self . threads = nil
153
+
154
+ // Each thread has work to do: shutdown.
155
+ return ( unlockWith: . hasWork, result: threads)
156
+
145
157
case . shuttingDown, . stopped:
146
- return [ ]
158
+ return ( unlockWith: nil , result: [ ] )
159
+
147
160
case . modifying:
148
161
fatalError ( " .modifying state misuse " )
149
162
}
@@ -171,22 +184,33 @@ public final class NIOThreadPool {
171
184
}
172
185
173
186
private func _submit( id: Int ? , _ body: @escaping WorkItem ) {
174
- let item = self . lock. withLock { ( ) -> WorkItem ? in
187
+ let submitted = self . conditionLock. withLock {
188
+ let workState : WorkState
189
+ let submitted : Bool
190
+
175
191
switch self . state {
176
192
case . running( var items) :
177
193
self . state = . modifying
178
194
items. append ( . init( workItem: body, id: id) )
179
195
self . state = . running( items)
180
- self . semaphore. signal ( )
181
- return nil
196
+ workState = items. isEmpty ? . hasNoWork : . hasWork
197
+ submitted = true
198
+
182
199
case . shuttingDown, . stopped:
183
- return body
200
+ workState = . hasNoWork
201
+ submitted = false
202
+
184
203
case . modifying:
185
204
fatalError ( " .modifying state misuse " )
186
205
}
206
+
207
+ return ( unlockWith: workState, result: submitted)
187
208
}
209
+
188
210
// if item couldn't be added run it immediately indicating that it couldn't be run
189
- item. map { $0 ( . cancelled) }
211
+ if !submitted {
212
+ body ( . cancelled)
213
+ }
190
214
}
191
215
192
216
/// Initialize a `NIOThreadPool` thread pool with `numberOfThreads` threads.
@@ -209,17 +233,18 @@ public final class NIOThreadPool {
209
233
private init ( numberOfThreads: Int , canBeStopped: Bool ) {
210
234
self . numberOfThreads = numberOfThreads
211
235
self . canBeStopped = canBeStopped
236
+ self . conditionLock = ConditionLock ( value: . hasNoWork)
212
237
}
213
238
214
239
private func process( identifier: Int ) {
215
240
var itemAndState : ( item: WorkItem , state: WorkItemState ) ? = nil
216
241
217
242
repeat {
218
- // wait until work has become available
219
- itemAndState = nil // ensure previous work item is not retained for duration of semaphore wait
220
- self . semaphore. wait ( )
243
+ itemAndState = nil // ensure previous work item is not retained while waiting for the condition
244
+ itemAndState = self . conditionLock. withLock ( when: . hasWork) {
245
+ let workState : WorkState
246
+ let result : ( WorkItem , WorkItemState ) ?
221
247
222
- itemAndState = self . lock. withLock { ( ) -> ( WorkItem , WorkItemState ) ? in
223
248
switch self . state {
224
249
case . running( var items) :
225
250
self . state = . modifying
@@ -233,18 +258,32 @@ public final class NIOThreadPool {
233
258
}
234
259
235
260
self . state = . running( items)
236
- return ( itemAndID. workItem, state)
261
+
262
+ workState = items. isEmpty ? . hasNoWork : . hasWork
263
+ result = ( itemAndID. workItem, state)
264
+
237
265
case . shuttingDown( var aliveStates) :
266
+ self . state = . modifying
238
267
assert ( aliveStates [ identifier] )
239
268
aliveStates [ identifier] = false
240
269
self . state = . shuttingDown( aliveStates)
241
- return nil
270
+
271
+ // Unlock with '.hasWork' to resume any other threads waiting to shutdown.
272
+ workState = . hasWork
273
+ result = nil
274
+
242
275
case . stopped:
243
- return nil
276
+ // Unreachable: 'stopped' is the initial state which is left when starting the
277
+ // thread pool, and before any thread calls this function.
278
+ fatalError ( " Invalid state " )
279
+
244
280
case . modifying:
245
281
fatalError ( " .modifying state misuse " )
246
282
}
283
+
284
+ return ( unlockWith: workState, result: result)
247
285
}
286
+
248
287
// if there was a work item popped, run it
249
288
itemAndState. map { item, state in item ( state) }
250
289
} while itemAndState != nil
@@ -256,16 +295,24 @@ public final class NIOThreadPool {
256
295
}
257
296
258
297
public func _start( threadNamePrefix: String ) {
259
- let alreadyRunning : Bool = self . lock . withLock {
298
+ let alreadyRunning = self . conditionLock . withLock {
260
299
switch self . state {
261
- case . running( _) :
262
- return true
263
- case . shuttingDown( _) :
300
+ case . running:
301
+ // Already running, this has no effect on whether there is more work for the
302
+ // threads to run.
303
+ return ( unlockWith: nil , result: true )
304
+
305
+ case . shuttingDown:
264
306
// This should never happen
265
307
fatalError ( " start() called while in shuttingDown " )
308
+
266
309
case . stopped:
267
310
self . state = . running( Deque ( minimumCapacity: 16 ) )
268
- return false
311
+ assert ( self . threads == nil )
312
+ self . threads = [ ]
313
+ self . threads!. reserveCapacity ( self . numberOfThreads)
314
+ return ( unlockWith: . hasNoWork, result: false )
315
+
269
316
case . modifying:
270
317
fatalError ( " .modifying state misuse " )
271
318
}
@@ -278,31 +325,50 @@ public final class NIOThreadPool {
278
325
// We use this condition lock as a tricky kind of semaphore.
279
326
// This is done to sidestep the thread performance checker warning
280
327
// that would otherwise be emitted.
281
- let cond = ConditionLock ( value: 0 )
282
-
283
- self . lock. withLock {
284
- assert ( self . threads == nil )
285
- self . threads = [ ]
286
- self . threads? . reserveCapacity ( self . numberOfThreads)
287
- }
288
-
328
+ let readyThreads = ConditionLock ( value: 0 )
289
329
for id in 0 ..< self . numberOfThreads {
290
330
// We should keep thread names under 16 characters because Linux doesn't allow more.
291
331
NIOThread . spawnAndRun ( name: " \( threadNamePrefix) \( id) " , detachThread: false ) { thread in
292
- self . lock. withLock {
293
- self . threads!. append ( thread)
294
- cond. lock ( )
295
- cond. unlock ( withValue: self . threads!. count)
332
+ readyThreads. withLock {
333
+ let threadCount = self . conditionLock. withLock {
334
+ self . threads!. append ( thread)
335
+ let workState : WorkState
336
+
337
+ switch self . state {
338
+ case . running( let items) :
339
+ workState = items. isEmpty ? . hasNoWork : . hasWork
340
+ case . shuttingDown:
341
+ // The thread has work to do: it's shutting down.
342
+ workState = . hasWork
343
+ case . stopped:
344
+ // Unreachable: .stopped always transitions to .running in the function
345
+ // and .stopped is never entered again.
346
+ fatalError ( " Invalid state " )
347
+ case . modifying:
348
+ fatalError ( " .modifying state misuse " )
349
+ }
350
+
351
+ let threadCount = self . threads!. count
352
+ return ( unlockWith: workState, result: threadCount)
353
+ }
354
+
355
+ return ( unlockWith: threadCount, result: ( ) )
296
356
}
297
357
298
358
self . process ( identifier: id)
299
359
return ( )
300
360
}
301
361
}
302
362
303
- cond. lock ( whenValue: self . numberOfThreads)
304
- cond. unlock ( )
305
- assert ( self . lock. withLock { self . threads? . count ?? - 1 } == self . numberOfThreads)
363
+ readyThreads. lock ( whenValue: self . numberOfThreads)
364
+ readyThreads. unlock ( )
365
+
366
+ func threadCount( ) -> Int {
367
+ self . conditionLock. withLock {
368
+ ( unlockWith: nil , result: self . threads? . count ?? - 1 )
369
+ }
370
+ }
371
+ assert ( threadCount ( ) == self . numberOfThreads)
306
372
}
307
373
308
374
deinit {
@@ -374,8 +440,9 @@ extension NIOThreadPool {
374
440
}
375
441
}
376
442
} onCancel: {
377
- self . lock . withLockVoid {
443
+ self . conditionLock . withLock {
378
444
self . cancelledWorkIDs. insert ( workID)
445
+ return ( unlockWith: nil , result: ( ) )
379
446
}
380
447
}
381
448
}
@@ -427,3 +494,31 @@ extension NIOThreadPool {
427
494
}
428
495
}
429
496
}
497
+
498
+ extension ConditionLock {
499
+ @inlinable
500
+ func _lock( when value: T ? ) {
501
+ if let value = value {
502
+ self . lock ( whenValue: value)
503
+ } else {
504
+ self . lock ( )
505
+ }
506
+ }
507
+
508
+ @inlinable
509
+ func _unlock( with value: T ? ) {
510
+ if let value = value {
511
+ self . unlock ( withValue: value)
512
+ } else {
513
+ self . unlock ( )
514
+ }
515
+ }
516
+
517
+ @inlinable
518
+ func withLock< Result> ( when value: T ? = nil , _ body: ( ) -> ( unlockWith: T ? , result: Result ) ) -> Result {
519
+ self . _lock ( when: value)
520
+ let ( unlockValue, result) = body ( )
521
+ self . _unlock ( with: unlockValue)
522
+ return result
523
+ }
524
+ }
0 commit comments