-
Notifications
You must be signed in to change notification settings - Fork 122
/
Copy pathHTTP2Connection.swift
372 lines (310 loc) · 13.4 KB
/
HTTP2Connection.swift
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
//===----------------------------------------------------------------------===//
//
// This source file is part of the AsyncHTTPClient open source project
//
// Copyright (c) 2021 Apple Inc. and the AsyncHTTPClient project authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
// See CONTRIBUTORS.txt for the list of AsyncHTTPClient project authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//
import Logging
import NIOCore
import NIOHTTP2
import NIOHTTPCompression
protocol HTTP2ConnectionDelegate {
func http2Connection(_: HTTP2Connection, newMaxStreamSetting: Int)
func http2ConnectionStreamClosed(_: HTTP2Connection, availableStreams: Int)
func http2ConnectionGoAwayReceived(_: HTTP2Connection)
func http2ConnectionClosed(_: HTTP2Connection)
}
struct HTTP2PushNotSupportedError: Error {}
struct HTTP2ReceivedGoAwayBeforeSettingsError: Error {}
final class HTTP2Connection {
internal static let defaultSettings = nioDefaultSettings + [HTTP2Setting(parameter: .enablePush, value: 0)]
let channel: Channel
let multiplexer: HTTP2StreamMultiplexer
let logger: Logger
/// A method with access to the stream channel that is called when creating the stream.
let streamChannelDebugInitializer: (@Sendable (Channel) -> EventLoopFuture<Void>)?
/// the connection pool that created the connection
let delegate: HTTP2ConnectionDelegate
enum State {
case initialized
case starting(EventLoopPromise<Int>)
case active(maxStreams: Int)
case closing
case closed
}
/// A structure to store a http/2 stream channel in a set.
private struct ChannelBox: Hashable {
struct ID: Hashable {
private let id: ObjectIdentifier
init(_ channel: Channel) {
self.id = ObjectIdentifier(channel)
}
}
let channel: Channel
var id: ID {
ID(self.channel)
}
init(_ channel: Channel) {
self.channel = channel
}
static func == (lhs: Self, rhs: Self) -> Bool {
lhs.id == rhs.id
}
func hash(into hasher: inout Hasher) {
hasher.combine(self.id)
}
}
private var state: State
/// We use this channel set to remember, which open streams we need to inform that
/// we want to close the connection. The channels shall than cancel their currently running
/// request. This property must only be accessed from the connections `EventLoop`.
private var openStreams = Set<ChannelBox>()
let id: HTTPConnectionPool.Connection.ID
let decompression: HTTPClient.Decompression
let maximumConnectionUses: Int?
var closeFuture: EventLoopFuture<Void> {
self.channel.closeFuture
}
init(
channel: Channel,
connectionID: HTTPConnectionPool.Connection.ID,
decompression: HTTPClient.Decompression,
maximumConnectionUses: Int?,
delegate: HTTP2ConnectionDelegate,
logger: Logger,
streamChannelDebugInitializer: (@Sendable (Channel) -> EventLoopFuture<Void>)? = nil
) {
self.channel = channel
self.id = connectionID
self.decompression = decompression
self.maximumConnectionUses = maximumConnectionUses
self.logger = logger
self.multiplexer = HTTP2StreamMultiplexer(
mode: .client,
channel: channel,
targetWindowSize: 8 * 1024 * 1024, // 8mb
outboundBufferSizeHighWatermark: 8196,
outboundBufferSizeLowWatermark: 4092,
inboundStreamInitializer: { channel -> EventLoopFuture<Void> in
channel.eventLoop.makeFailedFuture(HTTP2PushNotSupportedError())
}
)
self.delegate = delegate
self.state = .initialized
self.streamChannelDebugInitializer = streamChannelDebugInitializer
}
deinit {
guard case .closed = self.state else {
preconditionFailure("Connection must be closed, before we can deinit it. Current state: \(self.state)")
}
}
static func start(
channel: Channel,
connectionID: HTTPConnectionPool.Connection.ID,
delegate: HTTP2ConnectionDelegate,
decompression: HTTPClient.Decompression,
maximumConnectionUses: Int?,
logger: Logger,
streamChannelDebugInitializer: (@Sendable (Channel) -> EventLoopFuture<Void>)? = nil
) -> EventLoopFuture<(HTTP2Connection, Int)> {
let connection = HTTP2Connection(
channel: channel,
connectionID: connectionID,
decompression: decompression,
maximumConnectionUses: maximumConnectionUses,
delegate: delegate,
logger: logger,
streamChannelDebugInitializer: streamChannelDebugInitializer
)
return connection._start0().map { maxStreams in (connection, maxStreams) }
}
func executeRequest(_ request: HTTPExecutableRequest) {
if self.channel.eventLoop.inEventLoop {
self.executeRequest0(request)
} else {
self.channel.eventLoop.execute {
self.executeRequest0(request)
}
}
}
/// shuts down the connection by cancelling all running tasks and closing the connection once
/// all child streams/channels are closed.
func shutdown() {
if self.channel.eventLoop.inEventLoop {
self.shutdown0()
} else {
self.channel.eventLoop.execute {
self.shutdown0()
}
}
}
func close(promise: EventLoopPromise<Void>?) {
self.channel.close(mode: .all, promise: promise)
}
func close() -> EventLoopFuture<Void> {
let promise = self.channel.eventLoop.makePromise(of: Void.self)
self.close(promise: promise)
return promise.futureResult
}
func _start0() -> EventLoopFuture<Int> {
self.channel.eventLoop.assertInEventLoop()
let readyToAcceptConnectionsPromise = self.channel.eventLoop.makePromise(of: Int.self)
self.state = .starting(readyToAcceptConnectionsPromise)
self.channel.closeFuture.whenComplete { _ in
switch self.state {
case .initialized, .closed:
preconditionFailure("invalid state \(self.state)")
case .starting(let readyToAcceptConnectionsPromise):
self.state = .closed
readyToAcceptConnectionsPromise.fail(HTTPClientError.remoteConnectionClosed)
case .active, .closing:
self.state = .closed
self.delegate.http2ConnectionClosed(self)
}
}
do {
// We create and add the http handlers ourselves here, since we need to inject an
// `HTTP2IdleHandler` between the `NIOHTTP2Handler` and the `HTTP2StreamMultiplexer`.
// The purpose of the `HTTP2IdleHandler` is to count open streams in the multiplexer.
// We use the HTTP2IdleHandler's information to notify our delegate, whether more work
// can be scheduled on this connection.
let sync = self.channel.pipeline.syncOperations
let http2Handler = NIOHTTP2Handler(mode: .client, initialSettings: Self.defaultSettings)
let idleHandler = HTTP2IdleHandler(
delegate: self,
logger: self.logger,
maximumConnectionUses: self.maximumConnectionUses
)
try sync.addHandler(http2Handler, position: .last)
try sync.addHandler(idleHandler, position: .last)
try sync.addHandler(self.multiplexer, position: .last)
} catch {
self.channel.close(mode: .all, promise: nil)
readyToAcceptConnectionsPromise.fail(error)
}
return readyToAcceptConnectionsPromise.futureResult
}
private func executeRequest0(_ request: HTTPExecutableRequest) {
self.channel.eventLoop.assertInEventLoop()
switch self.state {
case .initialized, .starting:
preconditionFailure("Invalid state: \(self.state). Sending requests is not allowed before we are started.")
case .active:
let createStreamChannelPromise = self.channel.eventLoop.makePromise(of: Channel.self)
self.multiplexer.createStreamChannel(promise: createStreamChannelPromise) {
channel -> EventLoopFuture<Void> in
do {
// the connection may have been asked to shutdown while we created the child. in
// this
// channel.
guard case .active = self.state else {
throw HTTPClientError.cancelled
}
// We only support http/2 over an https connection – using the Application-Layer
// Protocol Negotiation (ALPN). For this reason it is safe to fix this to `.https`.
let translate = HTTP2FramePayloadToHTTP1ClientCodec(httpProtocol: .https)
try channel.pipeline.syncOperations.addHandler(translate)
if case .enabled(let limit) = self.decompression {
let decompressHandler = NIOHTTPResponseDecompressor(limit: limit)
try channel.pipeline.syncOperations.addHandler(decompressHandler)
}
let handler = HTTP2ClientRequestHandler(eventLoop: channel.eventLoop)
try channel.pipeline.syncOperations.addHandler(handler)
// We must add the new channel to the list of open channels BEFORE we write the
// request to it. In case of an error, we are sure that the channel was added
// before.
let box = ChannelBox(channel)
self.openStreams.insert(box)
channel.closeFuture.whenComplete { _ in
self.openStreams.remove(box)
}
if let streamChannelDebugInitializer = self.streamChannelDebugInitializer {
return streamChannelDebugInitializer(channel).map { _ in
channel.write(request, promise: nil)
}
} else {
channel.write(request, promise: nil)
return channel.eventLoop.makeSucceededVoidFuture()
}
} catch {
return channel.eventLoop.makeFailedFuture(error)
}
}
createStreamChannelPromise.futureResult.whenFailure { error in
request.fail(error)
}
case .closing, .closed:
// Because of race conditions requests might reach this point, even though the
// connection is already closing
return request.fail(HTTPClientError.cancelled)
}
}
private func shutdown0() {
self.channel.eventLoop.assertInEventLoop()
switch self.state {
case .active:
self.state = .closing
// inform all open streams, that the currently running request should be cancelled.
for box in self.openStreams {
box.channel.triggerUserOutboundEvent(HTTPConnectionEvent.shutdownRequested, promise: nil)
}
// inform the idle connection handler, that connection should be closed, once all streams
// are closed.
self.channel.triggerUserOutboundEvent(HTTPConnectionEvent.shutdownRequested, promise: nil)
case .closed, .closing:
// we are already closing/closed and we need to tolerate this
break
case .initialized, .starting:
preconditionFailure("invalid state \(self.state)")
}
}
func __forTesting_getStreamChannels() -> [Channel] {
self.channel.eventLoop.preconditionInEventLoop()
return self.openStreams.map { $0.channel }
}
}
extension HTTP2Connection: HTTP2IdleHandlerDelegate {
func http2SettingsReceived(maxStreams: Int) {
self.channel.eventLoop.assertInEventLoop()
switch self.state {
case .initialized:
preconditionFailure("Invalid state: \(self.state)")
case .starting(let promise):
self.state = .active(maxStreams: maxStreams)
promise.succeed(maxStreams)
case .active:
self.state = .active(maxStreams: maxStreams)
self.delegate.http2Connection(self, newMaxStreamSetting: maxStreams)
case .closing, .closed:
// ignore. we only wait for all connections to be closed anyway.
break
}
}
func http2GoAwayReceived() {
self.channel.eventLoop.assertInEventLoop()
switch self.state {
case .initialized:
preconditionFailure("Invalid state: \(self.state)")
case .starting(let promise):
self.state = .closing
promise.fail(HTTP2ReceivedGoAwayBeforeSettingsError())
case .active:
self.state = .closing
self.delegate.http2ConnectionGoAwayReceived(self)
case .closing, .closed:
// we are already closing. Nothing new
break
}
}
func http2StreamClosed(availableStreams: Int) {
self.channel.eventLoop.assertInEventLoop()
self.delegate.http2ConnectionStreamClosed(self, availableStreams: availableStreams)
}
}