Skip to content

Commit fb97b97

Browse files
committed
Use DispatchSourceTimer for delayed background tasks
1 parent 454c3b4 commit fb97b97

File tree

6 files changed

+134
-79
lines changed

6 files changed

+134
-79
lines changed

AirMessage/Connection/ClientConnection.swift

+31-21
Original file line numberDiff line numberDiff line change
@@ -60,49 +60,59 @@ class ClientConnection {
6060
case pingExpiry
6161
}
6262

63-
struct RunningTimer {
64-
let timer: Timer
65-
let callback: (ClientConnection) -> Void
66-
}
67-
68-
private var timerDict: [TimerType: RunningTimer] = [:]
63+
private var timerDict: [TimerType: DispatchSourceTimer] = [:]
6964

7065
/**
7166
Cancels all pending expiry timers
7267
*/
7368
func cancelAllTimers() {
7469
for (_, runningTimer) in timerDict {
75-
runningTimer.timer.invalidate()
70+
runningTimer.cancel()
7671
}
7772
timerDict.removeAll()
7873
}
7974

8075
/**
8176
Cancels the expiry timer of the specified type
8277
*/
83-
func cancelTimer(ofType type: TimerType) {
84-
timerDict[type]?.timer.invalidate()
78+
func cancelTimer(ofType type: TimerType, queue: DispatchQueue) {
79+
//Make sure we're on the provided dispatch queue
80+
assertDispatchQueue(queue)
81+
82+
timerDict[type]?.cancel()
8583
timerDict[type] = nil
8684
}
8785

8886
/**
8987
Schedules a timer of the specified type after interval to run the callback
9088
If a timer was previously scheduled of this type, that timer is cancelled and replaced with this one
9189
*/
92-
func startTimer(ofType type: TimerType, interval: TimeInterval, callback: @escaping (ClientConnection) -> Void) {
90+
func startTimer(ofType type: TimerType, interval: TimeInterval, queue: DispatchQueue, callback: @escaping (ClientConnection) -> Void) {
91+
//Make sure we're on the provided dispatch queue
92+
assertDispatchQueue(queue)
93+
9394
//Cancel existing timers
94-
cancelTimer(ofType: type)
95+
cancelTimer(ofType: type, queue: queue)
9596

9697
//Create and start the timer
97-
let timer = Timer(timeInterval: interval, target: self, selector: #selector(onTimerExpire), userInfo: type, repeats: false)
98-
RunLoop.main.add(timer, forMode: .common)
99-
timerDict[type] = RunningTimer(timer: timer, callback: callback)
100-
}
101-
102-
@objc private func onTimerExpire(timer: Timer) {
103-
//Invoke the callback of the specified type
104-
let type = timer.userInfo as! TimerType
105-
timerDict[type]?.callback(self)
106-
timerDict[type] = nil
98+
let timer = DispatchSource.makeTimerSource(queue: queue)
99+
timer.schedule(deadline: .now() + interval, repeating: .never)
100+
timer.setEventHandler { [weak self] in
101+
//Make sure we're still on the same queue
102+
assertDispatchQueue(queue)
103+
104+
//Check our reference to self
105+
guard let self = self else {
106+
return
107+
}
108+
109+
//Invoke the callback
110+
callback(self)
111+
112+
//Remove this timer
113+
self.timerDict[type] = nil
114+
}
115+
timer.resume()
116+
timerDict[type] = timer
107117
}
108118
}

AirMessage/Connection/CommConst.swift

+1-1
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ class CommConst {
1313
//Timeouts
1414
static let handshakeTimeout: TimeInterval = 10 //10 seconds
1515
static let pingTimeout: TimeInterval = 30 //30 seconds
16-
static let keepAliveMillis: TimeInterval = 30 * 60 //30 minutes
16+
static let keepAliveInterval: TimeInterval = 30 * 60 //30 minutes
1717

1818
static let maxPacketAllocation = 50 * 1024 * 1024 //50 MB
1919

AirMessage/Connection/Connect/DataProxyConnect.swift

+61-31
Original file line numberDiff line numberDiff line change
@@ -23,10 +23,11 @@ class DataProxyConnect: DataProxy {
2323
private var idToken: String?
2424

2525
private var webSocket: WebSocket?
26-
private var handshakeTimer: Timer?
27-
private var pingTimer: Timer?
26+
private var handshakeTimer: DispatchSourceTimer?
27+
private var pingTimer: DispatchSourceTimer?
28+
private static let pingFrequency: TimeInterval = 5 * 60
2829

29-
private var connectionRecoveryTimer: Timer?
30+
private var connectionRecoveryTimer: DispatchSourceTimer?
3031
private var connectionRecoveryCount = 0
3132
//The max num of attempts before capping the delay time - not before giving up
3233
private static let connectionRecoveryCountMax = 8
@@ -40,12 +41,24 @@ class DataProxyConnect: DataProxy {
4041
self.idToken = idToken
4142
}
4243

43-
@objc func startServer() {
44+
deinit {
45+
//Ensure the server proxy isn't running when we go out of scope
46+
assert(!isActive, "DataProxyConnect was deinitialized while active")
47+
48+
//Ensure timers are cleaned up
49+
assert(pingTimer == nil, "DataProxyConnect was deinitialized with an active ping timer")
50+
assert(handshakeTimer == nil, "DataProxyConnect was deinitialized with an active handshake timer")
51+
assert(connectionRecoveryTimer == nil, "DataProxyConnect was deinitialized with an active connection recovery timer")
52+
}
53+
54+
func startServer() {
4455
//Ignore if we're already connecting or connected
4556
guard !isActive else { return }
4657

4758
//Cancel any active connection recover timers (in case the user initiated a reconnect)
48-
stopConnectionRecoveryTimer()
59+
processingQueue.sync {
60+
stopConnectionRecoveryTimer()
61+
}
4962

5063
//Build the URL
5164
var queryItems = [
@@ -76,29 +89,24 @@ class DataProxyConnect: DataProxy {
7689
}
7790

7891
func stopServer() {
92+
//Ignore if we're not running
93+
guard isActive else { return }
94+
7995
processingQueue.sync {
80-
//Ignore if we're not connected
81-
guard isActive, let socket = webSocket else { return }
82-
83-
//Cancel the handshake timer
84-
stopHandshakeTimer()
96+
let socket = webSocket!
8597

8698
//Clear connected clients
8799
connectionsLock.withWriteLock {
88100
connectionsMap.removeAll()
89101
}
90102
NotificationNames.postUpdateConnectionCount(0)
91103

92-
//Disconect
104+
//Disconnect
93105
socket.disconnect()
94-
isActive = false
95106
delegate?.dataProxy(self, didStopWithState: .stopped, isRecoverable: false)
96107
}
97-
}
98-
99-
deinit {
100-
//Make sure we stop the server when we go out of scope
101-
stopServer()
108+
109+
isActive = false
102110
}
103111

104112
func send(message data: Data, to client: ClientConnection?, encrypt: Bool, onSent: (() -> ())?) {
@@ -193,23 +201,28 @@ class DataProxyConnect: DataProxy {
193201
assertDispatchQueue(processingQueue)
194202

195203
//Cancel the old timer
196-
pingTimer?.invalidate()
204+
pingTimer?.cancel()
197205

198206
//Create the new timer
199-
let timer = Timer(timeInterval: 5 * 60, target: self, selector: #selector(onPingTimer), userInfo: nil, repeats: true)
200-
RunLoop.main.add(timer, forMode: .common)
207+
let timer = DispatchSource.makeTimerSource(queue: processingQueue)
208+
timer.schedule(deadline: .now() + DataProxyConnect.pingFrequency, repeating: DataProxyConnect.pingFrequency)
209+
timer.setEventHandler(handler: onPingTimer)
210+
timer.resume()
201211
pingTimer = timer
202212
}
203213

204214
private func stopPingTimer() {
205215
//Make sure we're on the processing queue
206216
assertDispatchQueue(processingQueue)
207217

208-
pingTimer?.invalidate()
218+
pingTimer?.cancel()
209219
pingTimer = nil
210220
}
211221

212-
@objc private func onPingTimer() {
222+
private func onPingTimer() {
223+
//Make sure we're on the processing queue
224+
assertDispatchQueue(processingQueue)
225+
213226
//Ping
214227
webSocket?.write(ping: Data())
215228
}
@@ -221,40 +234,54 @@ class DataProxyConnect: DataProxy {
221234
assertDispatchQueue(processingQueue)
222235

223236
//Cancel the old timer
224-
handshakeTimer?.invalidate()
237+
handshakeTimer?.cancel()
225238

226239
//Create the new timer
227-
let timer = Timer(timeInterval: ConnectConstants.handshakeTimeout, target: self, selector: #selector(onHandshakeTimer), userInfo: nil, repeats: false)
228-
RunLoop.main.add(timer, forMode: .common)
240+
let timer = DispatchSource.makeTimerSource(queue: processingQueue)
241+
timer.schedule(deadline: .now() + ConnectConstants.handshakeTimeout, repeating: .never)
242+
timer.setEventHandler(handler: onHandshakeTimer)
243+
timer.resume()
229244
handshakeTimer = timer
230245
}
231246

232247
private func stopHandshakeTimer() {
233248
//Make sure we're on the processing queue
234249
assertDispatchQueue(processingQueue)
235250

236-
handshakeTimer?.invalidate()
251+
handshakeTimer?.cancel()
237252
handshakeTimer = nil
238253
}
239254

240-
@objc private func onHandshakeTimer() {
255+
private func onHandshakeTimer() {
256+
//Make sure we're on the processing queue
257+
assertDispatchQueue(processingQueue)
258+
241259
//Disconnect
242260
webSocket?.disconnect()
261+
262+
//Clean up
263+
stopHandshakeTimer()
243264
}
244265

245266
//MARK: Connection recovery timer
246267

247268
private func startConnectionRecoveryTimer() {
269+
//Make sure we're on the processing queue
270+
assertDispatchQueue(processingQueue)
271+
248272
//Cancel the old timer
249-
connectionRecoveryTimer?.invalidate()
273+
connectionRecoveryTimer?.cancel()
250274

251275
//Wait an exponentially increasing wait period + a random delay
252276
let randomOffset: TimeInterval = Double(arc4random()) / Double(UInt32.max)
253277
let delay: TimeInterval = pow(2, Double(connectionRecoveryCount)) + randomOffset
254278

255279
//Create the new timer
256-
let timer = Timer(timeInterval: delay, target: self, selector: #selector(startServer), userInfo: nil, repeats: false)
257-
RunLoop.main.add(timer, forMode: .common)
280+
//startServer() must be invoked from the main thread
281+
let timer = DispatchSource.makeTimerSource(queue: DispatchQueue.main)
282+
timer.schedule(deadline: .now() + delay, repeating: .never)
283+
timer.setEventHandler(handler: startServer)
284+
timer.resume()
258285
connectionRecoveryTimer = timer
259286

260287
//Add to the attempt counter
@@ -264,7 +291,10 @@ class DataProxyConnect: DataProxy {
264291
}
265292

266293
private func stopConnectionRecoveryTimer() {
267-
connectionRecoveryTimer?.invalidate()
294+
//Make sure we're on the processing queue
295+
assertDispatchQueue(processingQueue)
296+
297+
connectionRecoveryTimer?.cancel()
268298
connectionRecoveryTimer = nil
269299
}
270300

AirMessage/Connection/ConnectionManager.swift

+20-13
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ class ConnectionManager {
1212
private let timerQueue = DispatchQueue(label: Bundle.main.bundleIdentifier! + ".connection.timer", qos: .utility)
1313

1414
private var dataProxy: DataProxy?
15-
private var keepaliveTimer: Timer?
15+
private var keepaliveTimer: DispatchSourceTimer?
1616
private let fileDownloadRequestMapLock = ReadWriteLock()
1717
private var fileDownloadRequestMap: [Int16: FileDownloadRequest] = [:]
1818

@@ -77,19 +77,20 @@ class ConnectionManager {
7777

7878
//MARK: - Timers
7979

80-
@objc private func runKeepalive() {
80+
private func runKeepalive() {
8181
guard let dataProxy = dataProxy else { return }
8282

83+
//Make sure we're running on the timer queue
84+
assertDispatchQueue(timerQueue)
85+
8386
//Send a ping to all clients
8487
ConnectionManager.sendMessageHeaderOnly(dataProxy, to: nil, ofType: NHT.ping, encrypt: false)
8588

8689
//Start ping response timers
8790
dataProxy.connectionsLock.withReadLock {
8891
for connection in dataProxy.connections {
89-
timerQueue.sync {
90-
connection.startTimer(ofType: .pingExpiry, interval: CommConst.pingTimeout) { [weak self] client in
91-
self?.dataProxy?.disconnect(client: client)
92-
}
92+
connection.startTimer(ofType: .pingExpiry, interval: CommConst.pingTimeout, queue: timerQueue) { [weak self] client in
93+
self?.dataProxy?.disconnect(client: client)
9394
}
9495
}
9596
}
@@ -288,7 +289,7 @@ class ConnectionManager {
288289
private func handleMessageAuthentication(dataProxy: DataProxy, packer messagePacker: inout AirPacker, from client: C) throws {
289290
//Cancel the handshake expiry timer
290291
timerQueue.sync {
291-
client.cancelTimer(ofType: .handshakeExpiry)
292+
client.cancelTimer(ofType: .handshakeExpiry, queue: timerQueue)
292293
}
293294

294295
//Sends an authorization rejected message and closes the connection
@@ -1544,9 +1545,13 @@ extension ConnectionManager: DataProxyDelegate {
15441545

15451546
//Start the keepalive timer
15461547
if dataProxy.requiresPersistence {
1547-
let timer = Timer(timeInterval: CommConst.keepAliveMillis, target: self, selector: #selector(runKeepalive), userInfo: nil, repeats: true)
1548-
RunLoop.main.add(timer, forMode: .common)
1549-
keepaliveTimer = timer
1548+
timerQueue.sync {
1549+
let timer = DispatchSource.makeTimerSource(queue: timerQueue)
1550+
timer.schedule(deadline: .now() + CommConst.keepAliveInterval, repeating: CommConst.keepAliveInterval)
1551+
timer.setEventHandler(handler: runKeepalive)
1552+
timer.resume()
1553+
keepaliveTimer = timer
1554+
}
15501555
}
15511556

15521557
LogManager.log("Server started", level: .info)
@@ -1558,8 +1563,10 @@ extension ConnectionManager: DataProxyDelegate {
15581563
NotificationNames.postUpdateConnectionCount(0)
15591564

15601565
//Stop the keepalive timer
1561-
keepaliveTimer?.invalidate()
1562-
keepaliveTimer = nil
1566+
timerQueue.sync {
1567+
keepaliveTimer?.cancel()
1568+
keepaliveTimer = nil
1569+
}
15631570

15641571
if isRecoverable {
15651572
LogManager.log("Server paused", level: .info)
@@ -1612,7 +1619,7 @@ extension ConnectionManager: DataProxyDelegate {
16121619

16131620
//Start the expiry timer
16141621
timerQueue.sync {
1615-
client.startTimer(ofType: .handshakeExpiry, interval: CommConst.handshakeTimeout) { [weak self] client in
1622+
client.startTimer(ofType: .handshakeExpiry, interval: CommConst.handshakeTimeout, queue: timerQueue) { [weak self] client in
16161623
LogManager.log("Handshake response for client \(client.readableID) timed out, disconnecting", level: .debug)
16171624
self?.dataProxy?.disconnect(client: client)
16181625
}

AirMessage/Connection/Direct/DataProxyTCP.swift

+2-2
Original file line numberDiff line numberDiff line change
@@ -190,8 +190,8 @@ class DataProxyTCP: DataProxy {
190190
}
191191

192192
deinit {
193-
//Make sure we stop the server when we go out of scope
194-
stopServer()
193+
//Ensure the server proxy isn't running when we go out of scope
194+
assert(!serverRunning, "DataProxyTCP was deinitialized while active")
195195
}
196196

197197
func send(message data: Data, to client: ClientConnection?, encrypt: Bool, onSent: (() -> ())?) {

0 commit comments

Comments
 (0)