Skip to content

Commit a150d12

Browse files
authored
Merge branch 'master' into jw-3-el
2 parents ec1b726 + 8e60b94 commit a150d12

33 files changed

+5278
-1541
lines changed

Diff for: .swiftformat

+2-1
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,14 @@
11
# file options
22

3+
--swiftversion 5.0
34
--exclude .build
45

56
# format options
67

78
--self insert
89
--patternlet inline
910
--stripunusedargs unnamed-only
10-
--comments ignore
1111
--ranges nospace
12+
--disable typeSugar # https://github.com/nicklockwood/SwiftFormat/issues/636
1213

1314
# rules

Diff for: Package.swift

+8-4
Original file line numberDiff line numberDiff line change
@@ -21,18 +21,22 @@ let package = Package(
2121
.library(name: "AsyncHTTPClient", targets: ["AsyncHTTPClient"]),
2222
],
2323
dependencies: [
24-
.package(url: "https://github.com/apple/swift-nio.git", from: "2.13.1"),
25-
.package(url: "https://github.com/apple/swift-nio-ssl.git", from: "2.4.1"),
24+
.package(url: "https://github.com/apple/swift-nio.git", from: "2.18.0"),
25+
.package(url: "https://github.com/apple/swift-nio-ssl.git", from: "2.7.0"),
2626
.package(url: "https://github.com/apple/swift-nio-extras.git", from: "1.3.0"),
27+
.package(url: "https://github.com/apple/swift-nio-transport-services.git", from: "1.5.1"),
28+
.package(url: "https://github.com/apple/swift-log.git", from: "1.0.0"),
2729
],
2830
targets: [
2931
.target(
3032
name: "AsyncHTTPClient",
31-
dependencies: ["NIO", "NIOHTTP1", "NIOSSL", "NIOConcurrencyHelpers", "NIOHTTPCompression", "NIOFoundationCompat"]
33+
dependencies: ["NIO", "NIOHTTP1", "NIOSSL", "NIOConcurrencyHelpers", "NIOHTTPCompression",
34+
"NIOFoundationCompat", "NIOTransportServices", "Logging"]
3235
),
3336
.testTarget(
3437
name: "AsyncHTTPClientTests",
35-
dependencies: ["NIO", "NIOConcurrencyHelpers", "NIOSSL", "AsyncHTTPClient", "NIOFoundationCompat", "NIOTestUtils"]
38+
dependencies: ["NIO", "NIOConcurrencyHelpers", "NIOSSL", "AsyncHTTPClient", "NIOFoundationCompat",
39+
"NIOTestUtils", "Logging"]
3640
),
3741
]
3842
)

Diff for: Sources/AsyncHTTPClient/ConnectionPool.swift

+452-520
Large diffs are not rendered by default.

Diff for: Sources/AsyncHTTPClient/ConnectionsState.swift

+338
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,338 @@
1+
//===----------------------------------------------------------------------===//
2+
//
3+
// This source file is part of the AsyncHTTPClient open source project
4+
//
5+
// Copyright (c) 2019-2020 Apple Inc. and the AsyncHTTPClient project authors
6+
// Licensed under Apache License v2.0
7+
//
8+
// See LICENSE.txt for license information
9+
// See CONTRIBUTORS.txt for the list of AsyncHTTPClient project authors
10+
//
11+
// SPDX-License-Identifier: Apache-2.0
12+
//
13+
//===----------------------------------------------------------------------===//
14+
15+
import NIO
16+
17+
extension HTTP1ConnectionProvider {
18+
enum Action {
19+
case lease(Connection, Waiter)
20+
case create(Waiter)
21+
case replace(Connection, Waiter)
22+
case closeProvider
23+
case park(Connection)
24+
case none
25+
case fail(Waiter, Error)
26+
case cancel(Connection, Bool)
27+
indirect case closeAnd(Connection, Action)
28+
indirect case parkAnd(Connection, Action)
29+
}
30+
31+
struct ConnectionsState {
32+
enum State {
33+
case active
34+
case closed
35+
}
36+
37+
struct Snapshot {
38+
var state: State
39+
var availableConnections: CircularBuffer<Connection>
40+
var leasedConnections: Set<ConnectionKey>
41+
var waiters: CircularBuffer<Waiter>
42+
var openedConnectionsCount: Int
43+
var pending: Int
44+
}
45+
46+
let maximumConcurrentConnections: Int
47+
let eventLoop: EventLoop
48+
49+
private var state: State = .active
50+
51+
/// Opened connections that are available.
52+
private var availableConnections: CircularBuffer<Connection> = .init(initialCapacity: 8)
53+
54+
/// Opened connections that are leased to the user.
55+
private var leasedConnections: Set<ConnectionKey> = .init()
56+
57+
/// Consumers that weren't able to get a new connection without exceeding
58+
/// `maximumConcurrentConnections` get a `Future<Connection>`
59+
/// whose associated promise is stored in `Waiter`. The promise is completed
60+
/// as soon as possible by the provider, in FIFO order.
61+
private var waiters: CircularBuffer<Waiter> = .init(initialCapacity: 8)
62+
63+
/// Number of opened or opening connections, used to keep track of all connections and enforcing `maximumConcurrentConnections` limit.
64+
private var openedConnectionsCount: Int = 0
65+
66+
/// Number of enqueued requests, used to track if it is safe to delete the provider.
67+
private var pending: Int = 0
68+
69+
init(maximumConcurrentConnections: Int = 8, eventLoop: EventLoop) {
70+
self.maximumConcurrentConnections = maximumConcurrentConnections
71+
self.eventLoop = eventLoop
72+
}
73+
74+
func testsOnly_getInternalState() -> Snapshot {
75+
return Snapshot(state: self.state, availableConnections: self.availableConnections, leasedConnections: self.leasedConnections, waiters: self.waiters, openedConnectionsCount: self.openedConnectionsCount, pending: self.pending)
76+
}
77+
78+
mutating func testsOnly_setInternalState(_ snapshot: Snapshot) {
79+
self.state = snapshot.state
80+
self.availableConnections = snapshot.availableConnections
81+
self.leasedConnections = snapshot.leasedConnections
82+
self.waiters = snapshot.waiters
83+
self.openedConnectionsCount = snapshot.openedConnectionsCount
84+
self.pending = snapshot.pending
85+
}
86+
87+
func assertInvariants() {
88+
assert(self.waiters.isEmpty)
89+
assert(self.availableConnections.isEmpty)
90+
assert(self.leasedConnections.isEmpty)
91+
assert(self.openedConnectionsCount == 0)
92+
assert(self.pending == 0)
93+
}
94+
95+
mutating func enqueue() -> Bool {
96+
switch self.state {
97+
case .active:
98+
self.pending += 1
99+
return true
100+
case .closed:
101+
return false
102+
}
103+
}
104+
105+
private var hasCapacity: Bool {
106+
return self.openedConnectionsCount < self.maximumConcurrentConnections
107+
}
108+
109+
private var isEmpty: Bool {
110+
return self.openedConnectionsCount == 0 && self.pending == 0
111+
}
112+
113+
mutating func acquire(waiter: Waiter) -> Action {
114+
switch self.state {
115+
case .active:
116+
self.pending -= 1
117+
118+
let (eventLoop, required) = self.resolvePreference(waiter.preference)
119+
if required {
120+
// If there is an opened connection on the same EL - use it
121+
if let found = self.availableConnections.firstIndex(where: { $0.channel.eventLoop === eventLoop }) {
122+
let connection = self.availableConnections.remove(at: found)
123+
self.leasedConnections.insert(ConnectionKey(connection))
124+
return .lease(connection, waiter)
125+
}
126+
127+
// If we can create additional connection, create
128+
if self.hasCapacity {
129+
self.openedConnectionsCount += 1
130+
return .create(waiter)
131+
}
132+
133+
// If we cannot create additional connection, but there is one in the pool, replace it
134+
if let connection = self.availableConnections.popFirst() {
135+
return .replace(connection, waiter)
136+
}
137+
138+
self.waiters.append(waiter)
139+
return .none
140+
} else if let connection = self.availableConnections.popFirst() {
141+
self.leasedConnections.insert(ConnectionKey(connection))
142+
return .lease(connection, waiter)
143+
} else if self.hasCapacity {
144+
self.openedConnectionsCount += 1
145+
return .create(waiter)
146+
} else {
147+
self.waiters.append(waiter)
148+
return .none
149+
}
150+
case .closed:
151+
return .fail(waiter, HTTPClientError.alreadyShutdown)
152+
}
153+
}
154+
155+
mutating func release(connection: Connection, closing: Bool) -> Action {
156+
switch self.state {
157+
case .active:
158+
assert(self.leasedConnections.contains(ConnectionKey(connection)))
159+
160+
if connection.isActiveEstimation, !closing { // If connection is alive, we can offer it to a next waiter
161+
if let waiter = self.waiters.popFirst() {
162+
let (eventLoop, required) = self.resolvePreference(waiter.preference)
163+
164+
// If returned connection is on same EL or we do not require special EL - lease it
165+
if connection.channel.eventLoop === eventLoop || !required {
166+
return .lease(connection, waiter)
167+
}
168+
169+
// If there is an opened connection on the same loop, lease it and park returned
170+
if let found = self.availableConnections.firstIndex(where: { $0.channel.eventLoop === eventLoop }) {
171+
self.leasedConnections.remove(ConnectionKey(connection))
172+
let replacement = self.availableConnections.swap(at: found, with: connection)
173+
self.leasedConnections.insert(ConnectionKey(replacement))
174+
return .parkAnd(connection, .lease(replacement, waiter))
175+
}
176+
177+
// If we can create new connection - do it
178+
if self.hasCapacity {
179+
self.leasedConnections.remove(ConnectionKey(connection))
180+
self.availableConnections.append(connection)
181+
self.openedConnectionsCount += 1
182+
return .parkAnd(connection, .create(waiter))
183+
}
184+
185+
// If we cannot create new connections, we will have to replace returned connection with a new one on the required loop
186+
return .replace(connection, waiter)
187+
} else { // or park, if there are no waiters
188+
self.leasedConnections.remove(ConnectionKey(connection))
189+
self.availableConnections.append(connection)
190+
return .park(connection)
191+
}
192+
} else { // if connection is not alive, we delete it and process the next waiter
193+
// this connections is now gone, we will either create new connection or do nothing
194+
self.openedConnectionsCount -= 1
195+
self.leasedConnections.remove(ConnectionKey(connection))
196+
197+
return self.processNextWaiter()
198+
}
199+
case .closed:
200+
self.openedConnectionsCount -= 1
201+
self.leasedConnections.remove(ConnectionKey(connection))
202+
203+
return self.processNextWaiter()
204+
}
205+
}
206+
207+
mutating func offer(connection: Connection) -> Action {
208+
switch self.state {
209+
case .active:
210+
self.leasedConnections.insert(ConnectionKey(connection))
211+
return .none
212+
case .closed: // This can happen when we close the client while connections was being estableshed
213+
return .cancel(connection, self.isEmpty)
214+
}
215+
}
216+
217+
mutating func drop(connection: Connection) {
218+
switch self.state {
219+
case .active:
220+
self.leasedConnections.remove(ConnectionKey(connection))
221+
case .closed:
222+
assertionFailure("should not happen")
223+
}
224+
}
225+
226+
mutating func connectFailed() -> Action {
227+
switch self.state {
228+
case .active:
229+
self.openedConnectionsCount -= 1
230+
return self.processNextWaiter()
231+
case .closed:
232+
// This can happen in the following scenario: user initiates a connection that will fail to connect,
233+
// user calls `syncShutdown` before we received an error from the bootstrap. In this scenario,
234+
// pool will be `.closed` but connection will be still in the process of being established/failed,
235+
// so then this process finishes, it will get to this point.
236+
return .none
237+
}
238+
}
239+
240+
mutating func remoteClosed(connection: Connection) -> Action {
241+
switch self.state {
242+
case .active:
243+
// Connection can be closed remotely while we wait for `.lease` action to complete.
244+
// If this happens when connections is leased, we do not remove it from leased connections,
245+
// it will be done when a new replacement will be ready for it.
246+
if self.leasedConnections.contains(ConnectionKey(connection)) {
247+
return .none
248+
}
249+
250+
// If this connection is not in use, the have to release it as well
251+
self.openedConnectionsCount -= 1
252+
self.availableConnections.removeAll { $0 === connection }
253+
254+
return self.processNextWaiter()
255+
case .closed:
256+
self.openedConnectionsCount -= 1
257+
return self.processNextWaiter()
258+
}
259+
}
260+
261+
mutating func timeout(connection: Connection) -> Action {
262+
switch self.state {
263+
case .active:
264+
// We can get timeout and inUse = true when we decided to lease the connection, but this action is not executed yet.
265+
// In this case we can ignore timeout notification.
266+
if self.leasedConnections.contains(ConnectionKey(connection)) {
267+
return .none
268+
}
269+
270+
// If connection was not in use, we release it from the pool, increasing available capacity
271+
self.openedConnectionsCount -= 1
272+
self.availableConnections.removeAll { $0 === connection }
273+
274+
return .closeAnd(connection, self.processNextWaiter())
275+
case .closed:
276+
return .none
277+
}
278+
}
279+
280+
mutating func processNextWaiter() -> Action {
281+
if let waiter = self.waiters.popFirst() {
282+
let (eventLoop, required) = self.resolvePreference(waiter.preference)
283+
284+
// If specific EL is required, we have only two options - find open one or create a new one
285+
if required, let found = self.availableConnections.firstIndex(where: { $0.channel.eventLoop === eventLoop }) {
286+
let connection = self.availableConnections.remove(at: found)
287+
self.leasedConnections.insert(ConnectionKey(connection))
288+
return .lease(connection, waiter)
289+
} else if !required, let connection = self.availableConnections.popFirst() {
290+
self.leasedConnections.insert(ConnectionKey(connection))
291+
return .lease(connection, waiter)
292+
} else {
293+
self.openedConnectionsCount += 1
294+
return .create(waiter)
295+
}
296+
}
297+
298+
// if capacity is at max and the are no waiters and no in-flight requests for connection, we are closing this provider
299+
if self.isEmpty {
300+
// deactivate and remove
301+
self.state = .closed
302+
return .closeProvider
303+
}
304+
305+
return .none
306+
}
307+
308+
mutating func close() -> (CircularBuffer<Waiter>, CircularBuffer<Connection>, Set<ConnectionKey>, Bool)? {
309+
switch self.state {
310+
case .active:
311+
let waiters = self.waiters
312+
self.waiters.removeAll()
313+
314+
let available = self.availableConnections
315+
self.availableConnections.removeAll()
316+
317+
let leased = self.leasedConnections
318+
319+
self.state = .closed
320+
321+
return (waiters, available, leased, self.openedConnectionsCount - available.count == 0)
322+
case .closed:
323+
return nil
324+
}
325+
}
326+
327+
private func resolvePreference(_ preference: HTTPClient.EventLoopPreference) -> (EventLoop, Bool) {
328+
switch preference.preference {
329+
case .indifferent:
330+
return (self.eventLoop, false)
331+
case .delegate(let el):
332+
return (el, false)
333+
case .delegateAndChannel(let el), .testOnly_exact(let el, _):
334+
return (el, true)
335+
}
336+
}
337+
}
338+
}

0 commit comments

Comments
 (0)