From 54a3a68041ecd53bebc05ff0b0f7e5d396444a92 Mon Sep 17 00:00:00 2001 From: David Nadoba Date: Fri, 17 Dec 2021 09:58:23 +0100 Subject: [PATCH 1/4] Fix HTTP1 to HTTP2 migration while shutdown is in progress ### Motivation Calling `HTTPClient.shutdown()` may never return if connections are still starting and one new established connection results in a state migration (i.e. from HTTP1 to HTTP2 or vice versa). We forgot to migrate the shutdown state. This could result in a large dealy until `.shutdown()` returns because we wait until connections are closed because of idle timeout. Worse, it could also never return if more requests are queued because the connections would not be idle and therefore not close itself. ###Changes - Mirgrate shutdown state too - add tests for this specific case --- ...HTTPConnectionPool+HTTP1StateMachine.swift | 37 +++-- ...HTTPConnectionPool+HTTP2StateMachine.swift | 30 ++-- .../HTTPConnectionPool+StateMachine.swift | 20 +-- .../AsyncAwaitEndToEndTests.swift | 5 +- ...onPool+HTTP2StateMachineTests+XCTest.swift | 1 + ...onnectionPool+HTTP2StateMachineTests.swift | 133 ++++++++++++++++-- 6 files changed, 167 insertions(+), 59 deletions(-) diff --git a/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+HTTP1StateMachine.swift b/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+HTTP1StateMachine.swift index 36d8328a0..6b3f7352e 100644 --- a/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+HTTP1StateMachine.swift +++ b/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+HTTP1StateMachine.swift @@ -16,12 +16,6 @@ import NIOCore extension HTTPConnectionPool { struct HTTP1StateMachine { - enum State: Equatable { - case running - case shuttingDown(unclean: Bool) - case shutDown - } - typealias Action = HTTPConnectionPool.StateMachine.Action typealias ConnectionMigrationAction = HTTPConnectionPool.StateMachine.ConnectionMigrationAction typealias EstablishedAction = HTTPConnectionPool.StateMachine.EstablishedAction @@ -34,15 +28,20 @@ extension HTTPConnectionPool { private var lastConnectFailure: Error? private(set) var requests: RequestQueue - private var state: State = .running + private(set) var lifecycleState: StateMachine.LifecycleState - init(idGenerator: Connection.ID.Generator, maximumConcurrentConnections: Int) { + init( + idGenerator: Connection.ID.Generator, + maximumConcurrentConnections: Int, + lifecycleState: StateMachine.LifecycleState + ) { self.connections = HTTP1Connections( maximumConcurrentConnections: maximumConcurrentConnections, generator: idGenerator ) self.requests = RequestQueue() + self.lifecycleState = lifecycleState } mutating func migrateFromHTTP2( @@ -111,7 +110,7 @@ extension HTTPConnectionPool { // MARK: - Events - mutating func executeRequest(_ request: Request) -> Action { - switch self.state { + switch self.lifecycleState { case .running: if let eventLoop = request.requiredEventLoop { return self.executeRequestOnRequiredEventLoop(request, eventLoop: eventLoop) @@ -218,7 +217,7 @@ extension HTTPConnectionPool { self.failedConsecutiveConnectionAttempts += 1 self.lastConnectFailure = error - switch self.state { + switch self.lifecycleState { case .running: // We don't care how many waiting requests we have at this point, we will schedule a // retry. More tasks, may appear until the backoff has completed. The final @@ -243,7 +242,7 @@ extension HTTPConnectionPool { } mutating func connectionCreationBackoffDone(_ connectionID: Connection.ID) -> Action { - switch self.state { + switch self.lifecycleState { case .running: // The naming of `failConnection` is a little confusing here. All it does is moving the // connection state from `.backingOff` to `.closed` here. It also returns the @@ -271,7 +270,7 @@ extension HTTPConnectionPool { return .none } - precondition(self.state == .running, "If we are shutting down, we must not have any idle connections") + precondition(self.lifecycleState == .running, "If we are shutting down, we must not have any idle connections") return .init( request: .none, @@ -332,7 +331,7 @@ extension HTTPConnectionPool { } mutating func shutdown() -> Action { - precondition(self.state == .running, "Shutdown must only be called once") + precondition(self.lifecycleState == .running, "Shutdown must only be called once") // If we have remaining request queued, we should fail all of them with a cancelled // error. @@ -350,10 +349,10 @@ extension HTTPConnectionPool { let isShutdown: StateMachine.ConnectionAction.IsShutdown let unclean = !(cleanupContext.cancel.isEmpty && waitingRequests.isEmpty) if self.connections.isEmpty && self.http2Connections == nil { - self.state = .shutDown + self.lifecycleState = .shutDown isShutdown = .yes(unclean: unclean) } else { - self.state = .shuttingDown(unclean: unclean) + self.lifecycleState = .shuttingDown(unclean: unclean) isShutdown = .no } @@ -371,7 +370,7 @@ extension HTTPConnectionPool { at index: Int, context: HTTP1Connections.IdleConnectionContext ) -> EstablishedAction { - switch self.state { + switch self.lifecycleState { case .running: switch context.use { case .generalPurpose: @@ -457,7 +456,7 @@ extension HTTPConnectionPool { at index: Int, context: HTTP1Connections.FailedConnectionContext ) -> Action { - switch self.state { + switch self.lifecycleState { case .running: switch context.use { case .generalPurpose: @@ -537,7 +536,7 @@ extension HTTPConnectionPool { } mutating func http2ConnectionClosed(_ connectionID: Connection.ID) -> Action { - switch self.state { + switch self.lifecycleState { case .running: _ = self.http2Connections?.failConnection(connectionID) if self.http2Connections?.isEmpty == true { @@ -570,7 +569,7 @@ extension HTTPConnectionPool { mutating func http2ConnectionStreamClosed(_ connectionID: Connection.ID) -> Action { // It is save to bang the http2Connections here. If we get this callback but we don't have // http2 connections something has gone terribly wrong. - switch self.state { + switch self.lifecycleState { case .running: let (index, context) = self.http2Connections!.releaseStream(connectionID) guard context.isIdle else { diff --git a/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+HTTP2StateMachine.swift b/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+HTTP2StateMachine.swift index 932175696..d3e6fbdcd 100644 --- a/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+HTTP2StateMachine.swift +++ b/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+HTTP2StateMachine.swift @@ -22,12 +22,6 @@ extension HTTPConnectionPool { typealias EstablishedAction = HTTPConnectionPool.StateMachine.EstablishedAction typealias EstablishedConnectionAction = HTTPConnectionPool.StateMachine.EstablishedConnectionAction - private enum State: Equatable { - case running - case shuttingDown(unclean: Bool) - case shutDown - } - private var lastConnectFailure: Error? private var failedConsecutiveConnectionAttempts = 0 @@ -38,15 +32,17 @@ extension HTTPConnectionPool { private let idGenerator: Connection.ID.Generator - private var state: State = .running + private(set) var lifecycleState: StateMachine.LifecycleState init( - idGenerator: Connection.ID.Generator + idGenerator: Connection.ID.Generator, + lifecycleState: StateMachine.LifecycleState ) { self.idGenerator = idGenerator self.requests = RequestQueue() self.connections = HTTP2Connections(generator: idGenerator) + self.lifecycleState = lifecycleState } mutating func migrateFromHTTP1( @@ -112,7 +108,7 @@ extension HTTPConnectionPool { } mutating func executeRequest(_ request: Request) -> Action { - switch self.state { + switch self.lifecycleState { case .running: if let eventLoop = request.requiredEventLoop { return self.executeRequest(request, onRequired: eventLoop) @@ -236,7 +232,7 @@ extension HTTPConnectionPool { at index: Int, context: HTTP2Connections.EstablishedConnectionContext ) -> EstablishedAction { - switch self.state { + switch self.lifecycleState { case .running: // We prioritise requests with a required event loop over those without a requirement. // This can cause starvation for request without a required event loop. @@ -323,7 +319,7 @@ extension HTTPConnectionPool { } private mutating func nextActionForFailedConnection(at index: Int, on eventLoop: EventLoop) -> Action { - switch self.state { + switch self.lifecycleState { case .running: // we do not know if we have created this connection for a request with a required // event loop or not. However, we do not need this information and can infer @@ -378,7 +374,7 @@ extension HTTPConnectionPool { } private mutating func nextActionForClosingConnection(on eventLoop: EventLoop) -> Action { - switch self.state { + switch self.lifecycleState { case .running: let hasPendingRequest = !self.requests.isEmpty(for: eventLoop) || !self.requests.isEmpty(for: nil) guard hasPendingRequest else { @@ -463,7 +459,7 @@ extension HTTPConnectionPool { return .none } - precondition(self.state == .running, "If we are shutting down, we must not have any idle connections") + precondition(self.lifecycleState == .running, "If we are shutting down, we must not have any idle connections") return .init( request: .none, @@ -479,7 +475,7 @@ extension HTTPConnectionPool { if self.http1Connections!.isEmpty { self.http1Connections = nil } - switch self.state { + switch self.lifecycleState { case .running: return .none case .shuttingDown(let unclean): @@ -510,7 +506,7 @@ extension HTTPConnectionPool { self.http1Connections = nil // we must also check, if we are shutting down. Was this maybe out last connection? - switch self.state { + switch self.lifecycleState { case .running: return .init(request: .none, connection: .closeConnection(connection, isShutdown: .no)) case .shuttingDown(let unclean): @@ -543,10 +539,10 @@ extension HTTPConnectionPool { let unclean = !(cleanupContext.cancel.isEmpty && waitingRequests.isEmpty && self.http1Connections == nil) if self.connections.isEmpty && self.http1Connections == nil { isShutdown = .yes(unclean: unclean) - self.state = .shutDown + self.lifecycleState = .shutDown } else { isShutdown = .no - self.state = .shuttingDown(unclean: unclean) + self.lifecycleState = .shuttingDown(unclean: unclean) } return .init( request: requestAction, diff --git a/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+StateMachine.swift b/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+StateMachine.swift index 964d9b794..4d912633c 100644 --- a/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+StateMachine.swift +++ b/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+StateMachine.swift @@ -66,6 +66,12 @@ extension HTTPConnectionPool { case none } + enum LifecycleState: Equatable { + case running + case shuttingDown(unclean: Bool) + case shutDown + } + enum HTTPVersionState { case http1(HTTP1StateMachine) case http2(HTTP2StateMachine) @@ -88,7 +94,6 @@ extension HTTPConnectionPool { } var state: HTTPVersionState - var isShuttingDown: Bool = false let idGenerator: Connection.ID.Generator let maximumConcurrentHTTP1Connections: Int @@ -98,7 +103,8 @@ extension HTTPConnectionPool { self.idGenerator = idGenerator let http1State = HTTP1StateMachine( idGenerator: idGenerator, - maximumConcurrentConnections: maximumConcurrentHTTP1Connections + maximumConcurrentConnections: maximumConcurrentHTTP1Connections, + lifecycleState: .running ) self.state = .http1(http1State) } @@ -121,7 +127,8 @@ extension HTTPConnectionPool { case .http2(let http2StateMachine): var http1StateMachine = HTTP1StateMachine( idGenerator: self.idGenerator, - maximumConcurrentConnections: self.maximumConcurrentHTTP1Connections + maximumConcurrentConnections: self.maximumConcurrentHTTP1Connections, + lifecycleState: http2StateMachine.lifecycleState ) let newConnectionAction = http1StateMachine.migrateFromHTTP2( @@ -140,7 +147,8 @@ extension HTTPConnectionPool { case .http1(let http1StateMachine): var http2StateMachine = HTTP2StateMachine( - idGenerator: self.idGenerator + idGenerator: self.idGenerator, + lifecycleState: http1StateMachine.lifecycleState ) let migrationAction = http2StateMachine.migrateFromHTTP1( http1Connections: http1StateMachine.connections, @@ -263,10 +271,6 @@ extension HTTPConnectionPool { } mutating func shutdown() -> Action { - precondition(!self.isShuttingDown, "Shutdown must only be called once") - - self.isShuttingDown = true - return self.state.modify(http1: { http1 in http1.shutdown() }, http2: { http2 in diff --git a/Tests/AsyncHTTPClientTests/AsyncAwaitEndToEndTests.swift b/Tests/AsyncHTTPClientTests/AsyncAwaitEndToEndTests.swift index 7b43d8b96..7c5f61681 100644 --- a/Tests/AsyncHTTPClientTests/AsyncAwaitEndToEndTests.swift +++ b/Tests/AsyncHTTPClientTests/AsyncAwaitEndToEndTests.swift @@ -322,7 +322,7 @@ final class AsyncAwaitEndToEndTests: XCTestCase { #if compiler(>=5.5) && canImport(_Concurrency) guard #available(macOS 12.0, iOS 15.0, watchOS 8.0, tvOS 15.0, *) else { return } XCTAsyncTest(timeout: 5) { - let bin = HTTPBin() + let bin = HTTPBin(.http2(compress: false)) defer { XCTAssertNoThrow(try bin.shutdown()) } let client = makeDefaultHTTPClient() defer { XCTAssertNoThrow(try client.syncShutdown()) } @@ -368,7 +368,7 @@ final class AsyncAwaitEndToEndTests: XCTestCase { #if compiler(>=5.5) && canImport(_Concurrency) guard #available(macOS 12.0, iOS 15.0, watchOS 8.0, tvOS 15.0, *) else { return } XCTAsyncTest(timeout: 5) { - let bin = HTTPBin() + let bin = HTTPBin(.http2(compress: false)) defer { XCTAssertNoThrow(try bin.shutdown()) } let client = makeDefaultHTTPClient() defer { XCTAssertNoThrow(try client.syncShutdown()) } @@ -381,7 +381,6 @@ final class AsyncAwaitEndToEndTests: XCTestCase { await XCTAssertThrowsError(try await task.value) { XCTAssertEqual($0 as? HTTPClientError, HTTPClientError.deadlineExceeded) } - print("done") } #endif } diff --git a/Tests/AsyncHTTPClientTests/HTTPConnectionPool+HTTP2StateMachineTests+XCTest.swift b/Tests/AsyncHTTPClientTests/HTTPConnectionPool+HTTP2StateMachineTests+XCTest.swift index ef1ed1f04..9c1f4cd79 100644 --- a/Tests/AsyncHTTPClientTests/HTTPConnectionPool+HTTP2StateMachineTests+XCTest.swift +++ b/Tests/AsyncHTTPClientTests/HTTPConnectionPool+HTTP2StateMachineTests+XCTest.swift @@ -37,6 +37,7 @@ extension HTTPConnectionPool_HTTP2StateMachineTests { ("testGoAwayWithLeasedStream", testGoAwayWithLeasedStream), ("testGoAwayWithPendingRequestsStartsNewConnection", testGoAwayWithPendingRequestsStartsNewConnection), ("testMigrationFromHTTP1ToHTTP2", testMigrationFromHTTP1ToHTTP2), + ("testMigrationFromHTTP1ToHTTP2WhileShuttingDown", testMigrationFromHTTP1ToHTTP2WhileShuttingDown), ("testMigrationFromHTTP1ToHTTP2WithAlreadyStartedHTTP1Connections", testMigrationFromHTTP1ToHTTP2WithAlreadyStartedHTTP1Connections), ("testHTTP2toHTTP1Migration", testHTTP2toHTTP1Migration), ("testConnectionIsImmediatelyCreatedAfterBackoffTimerFires", testConnectionIsImmediatelyCreatedAfterBackoffTimerFires), diff --git a/Tests/AsyncHTTPClientTests/HTTPConnectionPool+HTTP2StateMachineTests.swift b/Tests/AsyncHTTPClientTests/HTTPConnectionPool+HTTP2StateMachineTests.swift index 049d18f7f..939b1c0b8 100644 --- a/Tests/AsyncHTTPClientTests/HTTPConnectionPool+HTTP2StateMachineTests.swift +++ b/Tests/AsyncHTTPClientTests/HTTPConnectionPool+HTTP2StateMachineTests.swift @@ -29,7 +29,7 @@ class HTTPConnectionPool_HTTP2StateMachineTests: XCTestCase { let el1 = elg.next() var connections = MockConnectionPool() var queuer = MockRequestQueuer() - var state = HTTPConnectionPool.HTTP2StateMachine(idGenerator: .init()) + var state = HTTPConnectionPool.HTTP2StateMachine(idGenerator: .init(), lifecycleState: .running) /// first request should create a new connection let mockRequest = MockHTTPRequest(eventLoop: el1) @@ -137,7 +137,8 @@ class HTTPConnectionPool_HTTP2StateMachineTests: XCTestCase { defer { XCTAssertNoThrow(try elg.syncShutdownGracefully()) } var state = HTTPConnectionPool.HTTP2StateMachine( - idGenerator: .init() + idGenerator: .init(), + lifecycleState: .running ) let mockRequest = MockHTTPRequest(eventLoop: elg.next()) @@ -193,7 +194,8 @@ class HTTPConnectionPool_HTTP2StateMachineTests: XCTestCase { defer { XCTAssertNoThrow(try elg.syncShutdownGracefully()) } var state = HTTPConnectionPool.HTTP2StateMachine( - idGenerator: .init() + idGenerator: .init(), + lifecycleState: .running ) let mockRequest = MockHTTPRequest(eventLoop: elg.next()) @@ -230,7 +232,8 @@ class HTTPConnectionPool_HTTP2StateMachineTests: XCTestCase { defer { XCTAssertNoThrow(try elg.syncShutdownGracefully()) } var state = HTTPConnectionPool.HTTP2StateMachine( - idGenerator: .init() + idGenerator: .init(), + lifecycleState: .running ) let mockRequest = MockHTTPRequest(eventLoop: elg.next()) @@ -284,7 +287,7 @@ class HTTPConnectionPool_HTTP2StateMachineTests: XCTestCase { let el1 = elg.next() let idGenerator = HTTPConnectionPool.Connection.ID.Generator() - var http1State = HTTPConnectionPool.HTTP1StateMachine(idGenerator: idGenerator, maximumConcurrentConnections: 8) + var http1State = HTTPConnectionPool.HTTP1StateMachine(idGenerator: idGenerator, maximumConcurrentConnections: 8, lifecycleState: .running) let mockRequest1 = MockHTTPRequest(eventLoop: el1) let request1 = HTTPConnectionPool.Request(mockRequest1) @@ -310,7 +313,7 @@ class HTTPConnectionPool_HTTP2StateMachineTests: XCTestCase { // second connection is a HTTP2 connection and we need to migrate let conn2: HTTPConnectionPool.Connection = .__testOnly_connection(id: conn2ID, eventLoop: el1) - var http2State = HTTPConnectionPool.HTTP2StateMachine(idGenerator: idGenerator) + var http2State = HTTPConnectionPool.HTTP2StateMachine(idGenerator: idGenerator, lifecycleState: .running) let http2ConnectAction = http2State.migrateFromHTTP1( http1Connections: http1State.connections, @@ -350,7 +353,7 @@ class HTTPConnectionPool_HTTP2StateMachineTests: XCTestCase { let idGenerator = HTTPConnectionPool.Connection.ID.Generator() var http1Conns = HTTPConnectionPool.HTTP1Connections(maximumConcurrentConnections: 8, generator: idGenerator) let conn1ID = http1Conns.createNewConnection(on: el1) - var state = HTTPConnectionPool.HTTP2StateMachine(idGenerator: idGenerator) + var state = HTTPConnectionPool.HTTP2StateMachine(idGenerator: idGenerator, lifecycleState: .running) let conn1 = HTTPConnectionPool.Connection.__testOnly_connection(id: conn1ID, eventLoop: el1) let connectAction = state.migrateFromHTTP1(http1Connections: http1Conns, requests: .init(), newHTTP2Connection: conn1, maxConcurrentStreams: 100) @@ -395,7 +398,7 @@ class HTTPConnectionPool_HTTP2StateMachineTests: XCTestCase { let idGenerator = HTTPConnectionPool.Connection.ID.Generator() var http1Conns = HTTPConnectionPool.HTTP1Connections(maximumConcurrentConnections: 8, generator: idGenerator) let conn1ID = http1Conns.createNewConnection(on: el1) - var state = HTTPConnectionPool.HTTP2StateMachine(idGenerator: idGenerator) + var state = HTTPConnectionPool.HTTP2StateMachine(idGenerator: idGenerator, lifecycleState: .running) let conn1 = HTTPConnectionPool.Connection.__testOnly_connection(id: conn1ID, eventLoop: el1) let connectAction = state.migrateFromHTTP1(http1Connections: http1Conns, requests: .init(), newHTTP2Connection: conn1, maxConcurrentStreams: 100) @@ -423,7 +426,7 @@ class HTTPConnectionPool_HTTP2StateMachineTests: XCTestCase { let idGenerator = HTTPConnectionPool.Connection.ID.Generator() var http1Conns = HTTPConnectionPool.HTTP1Connections(maximumConcurrentConnections: 8, generator: idGenerator) let conn1ID = http1Conns.createNewConnection(on: el1) - var state = HTTPConnectionPool.HTTP2StateMachine(idGenerator: idGenerator) + var state = HTTPConnectionPool.HTTP2StateMachine(idGenerator: idGenerator, lifecycleState: .running) let conn1 = HTTPConnectionPool.Connection.__testOnly_connection(id: conn1ID, eventLoop: el1) let connectAction = state.migrateFromHTTP1(http1Connections: http1Conns, requests: .init(), newHTTP2Connection: conn1, maxConcurrentStreams: 100) XCTAssertEqual(connectAction.request, .none) @@ -458,7 +461,7 @@ class HTTPConnectionPool_HTTP2StateMachineTests: XCTestCase { let idGenerator = HTTPConnectionPool.Connection.ID.Generator() var http1Conns = HTTPConnectionPool.HTTP1Connections(maximumConcurrentConnections: 8, generator: idGenerator) let conn1ID = http1Conns.createNewConnection(on: el1) - var state = HTTPConnectionPool.HTTP2StateMachine(idGenerator: idGenerator) + var state = HTTPConnectionPool.HTTP2StateMachine(idGenerator: idGenerator, lifecycleState: .running) let conn1 = HTTPConnectionPool.Connection.__testOnly_connection(id: conn1ID, eventLoop: el1) @@ -488,7 +491,7 @@ class HTTPConnectionPool_HTTP2StateMachineTests: XCTestCase { let idGenerator = HTTPConnectionPool.Connection.ID.Generator() var http1Conns = HTTPConnectionPool.HTTP1Connections(maximumConcurrentConnections: 8, generator: idGenerator) let conn1ID = http1Conns.createNewConnection(on: el1) - var state = HTTPConnectionPool.HTTP2StateMachine(idGenerator: idGenerator) + var state = HTTPConnectionPool.HTTP2StateMachine(idGenerator: idGenerator, lifecycleState: .running) let conn1 = HTTPConnectionPool.Connection.__testOnly_connection(id: conn1ID, eventLoop: el1) let connectAction = state.migrateFromHTTP1( @@ -529,7 +532,7 @@ class HTTPConnectionPool_HTTP2StateMachineTests: XCTestCase { let idGenerator = HTTPConnectionPool.Connection.ID.Generator() var http1Conns = HTTPConnectionPool.HTTP1Connections(maximumConcurrentConnections: 8, generator: idGenerator) let conn1ID = http1Conns.createNewConnection(on: el1) - var state = HTTPConnectionPool.HTTP2StateMachine(idGenerator: idGenerator) + var state = HTTPConnectionPool.HTTP2StateMachine(idGenerator: idGenerator, lifecycleState: .running) let conn1 = HTTPConnectionPool.Connection.__testOnly_connection(id: conn1ID, eventLoop: el1) let connectAction1 = state.migrateFromHTTP1( @@ -670,6 +673,89 @@ class HTTPConnectionPool_HTTP2StateMachineTests: XCTestCase { XCTAssertTrue(queuer.isEmpty) } + func testMigrationFromHTTP1ToHTTP2WhileShuttingDown() { + let elg = EmbeddedEventLoopGroup(loops: 1) + let el1 = elg.next() + var connections = MockConnectionPool() + var queuer = MockRequestQueuer() + var state = HTTPConnectionPool.StateMachine(idGenerator: .init(), maximumConcurrentHTTP1Connections: 8) + + /// first 8 request should create a new connection + var connectionIDs: [HTTPConnectionPool.Connection.ID] = [] + for _ in 0..<8 { + let mockRequest = MockHTTPRequest(eventLoop: el1) + let request = HTTPConnectionPool.Request(mockRequest) + let action = state.executeRequest(request) + guard case .createConnection(let connID, let eventLoop) = action.connection else { + return XCTFail("Unexpected connection action \(action.connection)") + } + connectionIDs.append(connID) + XCTAssertTrue(eventLoop === el1) + XCTAssertEqual(action.request, .scheduleRequestTimeout(for: request, on: mockRequest.eventLoop)) + XCTAssertNoThrow(try connections.createConnection(connID, on: el1)) + XCTAssertNoThrow(try queuer.queue(mockRequest, id: request.id)) + } + + guard let conn1ID = connectionIDs.first else { + return XCTFail("could not create connection") + } + + /// after we reached the `maximumConcurrentHTTP1Connections`, we will not create new connections + for _ in 0..<8 { + let mockRequest = MockHTTPRequest(eventLoop: el1) + let request = HTTPConnectionPool.Request(mockRequest) + let action = state.executeRequest(request) + XCTAssertEqual(action.connection, .none) + XCTAssertEqual(action.request, .scheduleRequestTimeout(for: request, on: mockRequest.eventLoop)) + + XCTAssertNoThrow(try queuer.queue(mockRequest, id: request.id)) + } + + /// we now no longer want anything of it + let shutdownAction = state.shutdown() + + guard case .failRequestsAndCancelTimeouts(let requestsToCancel, let error) = shutdownAction.request else { + return XCTFail("unexpected shutdown action \(shutdownAction)") + } + + XCTAssertEqualTypeAndValue(error, HTTPClientError.cancelled) + + for request in requestsToCancel { + XCTAssertNoThrow(try queuer.cancel(request.id)) + } + XCTAssertTrue(queuer.isEmpty) + + /// first new HTTP2 connection should migrate from HTTP1 to HTTP2 and execute requests + let conn1: HTTPConnectionPool.Connection = .__testOnly_connection(id: conn1ID, eventLoop: el1) + XCTAssertNoThrow(try connections.succeedConnectionCreationHTTP2(conn1ID, maxConcurrentStreams: 10)) + let migrationAction = state.newHTTP2ConnectionCreated(conn1, maxConcurrentStreams: 10) + XCTAssertEqual(migrationAction.request, .none) + XCTAssertEqual(migrationAction.connection, .migration( + createConnections: [], + closeConnections: [conn1], + scheduleTimeout: nil + )) + XCTAssertNoThrow(try connections.closeConnection(conn1)) + + /// remaining connections should be closed immediately without executing any request + for connID in connectionIDs.dropFirst().dropLast() { + let conn: HTTPConnectionPool.Connection = .__testOnly_connection(id: connID, eventLoop: el1) + XCTAssertNoThrow(try connections.succeedConnectionCreationHTTP2(connID, maxConcurrentStreams: 10)) + let action = state.newHTTP2ConnectionCreated(conn, maxConcurrentStreams: 10) + XCTAssertEqual(action.request, .none) + XCTAssertEqual(action.connection, .closeConnection(conn, isShutdown: .no)) + XCTAssertNoThrow(try connections.closeConnection(conn)) + } + let lastConnID = connectionIDs.last! + let lastConn: HTTPConnectionPool.Connection = .__testOnly_connection(id: lastConnID, eventLoop: el1) + XCTAssertNoThrow(try connections.succeedConnectionCreationHTTP2(lastConnID, maxConcurrentStreams: 10)) + let action = state.newHTTP2ConnectionCreated(lastConn, maxConcurrentStreams: 10) + XCTAssertEqual(action.request, .none) + XCTAssertEqual(action.connection, .closeConnection(lastConn, isShutdown: .yes(unclean: true))) + XCTAssertNoThrow(try connections.closeConnection(lastConn)) + XCTAssertTrue(connections.isEmpty) + } + func testMigrationFromHTTP1ToHTTP2WithAlreadyStartedHTTP1Connections() { let elg = EmbeddedEventLoopGroup(loops: 1) let el1 = elg.next() @@ -1108,3 +1194,26 @@ class HTTPConnectionPool_HTTP2StateMachineTests: XCTestCase { XCTAssertEqual(state.http2ConnectionClosed(connection.id), .none) } } + +/// Should be used if you have a value of statically unknown type and want to compare its value to an `Equatable` type. +/// The assert will fail if the boths +/// - Note: if the type of both values are statically know, prefer `XCTAssertEqual`. +/// - Parameters: +/// - lhs: value of a statically unknown type +/// - rhs: value of statically known and `Equatable` type +func XCTAssertEqualTypeAndValue( + _ lhs: @autoclosure () throws -> Left, + _ rhs: @autoclosure () throws -> Right, + file: StaticString = #file, + line: UInt = #line +) { + XCTAssertNoThrow(try { + let lhs = try lhs() + let rhs = try rhs() + guard let lhsAsRhs = lhs as? Right else { + XCTFail("could not cast \(lhs) of type \(Right.self) to \(Left.self)") + return + } + XCTAssertEqual(lhsAsRhs, rhs) + }(), file: file, line: line) +} From 75ede301825f0fd45af2a195596ce1bab5258203 Mon Sep 17 00:00:00 2001 From: David Nadoba Date: Fri, 17 Dec 2021 13:31:07 +0100 Subject: [PATCH 2/4] simplify testMigrationFromHTTP1ToHTTP2WhileShuttingDown --- ...onnectionPool+HTTP2StateMachineTests.swift | 66 ++++--------------- 1 file changed, 13 insertions(+), 53 deletions(-) diff --git a/Tests/AsyncHTTPClientTests/HTTPConnectionPool+HTTP2StateMachineTests.swift b/Tests/AsyncHTTPClientTests/HTTPConnectionPool+HTTP2StateMachineTests.swift index 939b1c0b8..93b10f689 100644 --- a/Tests/AsyncHTTPClientTests/HTTPConnectionPool+HTTP2StateMachineTests.swift +++ b/Tests/AsyncHTTPClientTests/HTTPConnectionPool+HTTP2StateMachineTests.swift @@ -680,40 +680,21 @@ class HTTPConnectionPool_HTTP2StateMachineTests: XCTestCase { var queuer = MockRequestQueuer() var state = HTTPConnectionPool.StateMachine(idGenerator: .init(), maximumConcurrentHTTP1Connections: 8) - /// first 8 request should create a new connection - var connectionIDs: [HTTPConnectionPool.Connection.ID] = [] - for _ in 0..<8 { - let mockRequest = MockHTTPRequest(eventLoop: el1) - let request = HTTPConnectionPool.Request(mockRequest) - let action = state.executeRequest(request) - guard case .createConnection(let connID, let eventLoop) = action.connection else { - return XCTFail("Unexpected connection action \(action.connection)") - } - connectionIDs.append(connID) - XCTAssertTrue(eventLoop === el1) - XCTAssertEqual(action.request, .scheduleRequestTimeout(for: request, on: mockRequest.eventLoop)) - XCTAssertNoThrow(try connections.createConnection(connID, on: el1)) - XCTAssertNoThrow(try queuer.queue(mockRequest, id: request.id)) - } - - guard let conn1ID = connectionIDs.first else { - return XCTFail("could not create connection") - } - - /// after we reached the `maximumConcurrentHTTP1Connections`, we will not create new connections - for _ in 0..<8 { - let mockRequest = MockHTTPRequest(eventLoop: el1) - let request = HTTPConnectionPool.Request(mockRequest) - let action = state.executeRequest(request) - XCTAssertEqual(action.connection, .none) - XCTAssertEqual(action.request, .scheduleRequestTimeout(for: request, on: mockRequest.eventLoop)) - - XCTAssertNoThrow(try queuer.queue(mockRequest, id: request.id)) + /// create a new connection + let mockRequest = MockHTTPRequest(eventLoop: el1) + let request = HTTPConnectionPool.Request(mockRequest) + let action = state.executeRequest(request) + guard case .createConnection(let conn1ID, let eventLoop) = action.connection else { + return XCTFail("Unexpected connection action \(action.connection)") } + + XCTAssertTrue(eventLoop === el1) + XCTAssertEqual(action.request, .scheduleRequestTimeout(for: request, on: mockRequest.eventLoop)) + XCTAssertNoThrow(try connections.createConnection(conn1ID, on: el1)) + XCTAssertNoThrow(try queuer.queue(mockRequest, id: request.id)) /// we now no longer want anything of it let shutdownAction = state.shutdown() - guard case .failRequestsAndCancelTimeouts(let requestsToCancel, let error) = shutdownAction.request else { return XCTFail("unexpected shutdown action \(shutdownAction)") } @@ -725,34 +706,13 @@ class HTTPConnectionPool_HTTP2StateMachineTests: XCTestCase { } XCTAssertTrue(queuer.isEmpty) - /// first new HTTP2 connection should migrate from HTTP1 to HTTP2 and execute requests + /// new HTTP2 connection should migrate from HTTP1 to HTTP2, close the connection and shutdown the pool let conn1: HTTPConnectionPool.Connection = .__testOnly_connection(id: conn1ID, eventLoop: el1) XCTAssertNoThrow(try connections.succeedConnectionCreationHTTP2(conn1ID, maxConcurrentStreams: 10)) let migrationAction = state.newHTTP2ConnectionCreated(conn1, maxConcurrentStreams: 10) XCTAssertEqual(migrationAction.request, .none) - XCTAssertEqual(migrationAction.connection, .migration( - createConnections: [], - closeConnections: [conn1], - scheduleTimeout: nil - )) + XCTAssertEqual(migrationAction.connection, .closeConnection(conn1, isShutdown: .yes(unclean: true))) XCTAssertNoThrow(try connections.closeConnection(conn1)) - - /// remaining connections should be closed immediately without executing any request - for connID in connectionIDs.dropFirst().dropLast() { - let conn: HTTPConnectionPool.Connection = .__testOnly_connection(id: connID, eventLoop: el1) - XCTAssertNoThrow(try connections.succeedConnectionCreationHTTP2(connID, maxConcurrentStreams: 10)) - let action = state.newHTTP2ConnectionCreated(conn, maxConcurrentStreams: 10) - XCTAssertEqual(action.request, .none) - XCTAssertEqual(action.connection, .closeConnection(conn, isShutdown: .no)) - XCTAssertNoThrow(try connections.closeConnection(conn)) - } - let lastConnID = connectionIDs.last! - let lastConn: HTTPConnectionPool.Connection = .__testOnly_connection(id: lastConnID, eventLoop: el1) - XCTAssertNoThrow(try connections.succeedConnectionCreationHTTP2(lastConnID, maxConcurrentStreams: 10)) - let action = state.newHTTP2ConnectionCreated(lastConn, maxConcurrentStreams: 10) - XCTAssertEqual(action.request, .none) - XCTAssertEqual(action.connection, .closeConnection(lastConn, isShutdown: .yes(unclean: true))) - XCTAssertNoThrow(try connections.closeConnection(lastConn)) XCTAssertTrue(connections.isEmpty) } From d7aa73365ebc49c93d3ad033f49c04f44735a178 Mon Sep 17 00:00:00 2001 From: David Nadoba Date: Fri, 17 Dec 2021 14:09:51 +0100 Subject: [PATCH 3/4] add http2 to http1 migration test --- ...onPool+HTTP2StateMachineTests+XCTest.swift | 1 + ...onnectionPool+HTTP2StateMachineTests.swift | 75 ++++++++++++++++++- 2 files changed, 74 insertions(+), 2 deletions(-) diff --git a/Tests/AsyncHTTPClientTests/HTTPConnectionPool+HTTP2StateMachineTests+XCTest.swift b/Tests/AsyncHTTPClientTests/HTTPConnectionPool+HTTP2StateMachineTests+XCTest.swift index 9c1f4cd79..9dca0c934 100644 --- a/Tests/AsyncHTTPClientTests/HTTPConnectionPool+HTTP2StateMachineTests+XCTest.swift +++ b/Tests/AsyncHTTPClientTests/HTTPConnectionPool+HTTP2StateMachineTests+XCTest.swift @@ -40,6 +40,7 @@ extension HTTPConnectionPool_HTTP2StateMachineTests { ("testMigrationFromHTTP1ToHTTP2WhileShuttingDown", testMigrationFromHTTP1ToHTTP2WhileShuttingDown), ("testMigrationFromHTTP1ToHTTP2WithAlreadyStartedHTTP1Connections", testMigrationFromHTTP1ToHTTP2WithAlreadyStartedHTTP1Connections), ("testHTTP2toHTTP1Migration", testHTTP2toHTTP1Migration), + ("testHTTP2toHTTP1MigrationDuringShutdown", testHTTP2toHTTP1MigrationDuringShutdown), ("testConnectionIsImmediatelyCreatedAfterBackoffTimerFires", testConnectionIsImmediatelyCreatedAfterBackoffTimerFires), ("testMaxConcurrentStreamsIsRespected", testMaxConcurrentStreamsIsRespected), ("testEventsAfterConnectionIsClosed", testEventsAfterConnectionIsClosed), diff --git a/Tests/AsyncHTTPClientTests/HTTPConnectionPool+HTTP2StateMachineTests.swift b/Tests/AsyncHTTPClientTests/HTTPConnectionPool+HTTP2StateMachineTests.swift index 93b10f689..e92523796 100644 --- a/Tests/AsyncHTTPClientTests/HTTPConnectionPool+HTTP2StateMachineTests.swift +++ b/Tests/AsyncHTTPClientTests/HTTPConnectionPool+HTTP2StateMachineTests.swift @@ -687,7 +687,7 @@ class HTTPConnectionPool_HTTP2StateMachineTests: XCTestCase { guard case .createConnection(let conn1ID, let eventLoop) = action.connection else { return XCTFail("Unexpected connection action \(action.connection)") } - + XCTAssertTrue(eventLoop === el1) XCTAssertEqual(action.request, .scheduleRequestTimeout(for: request, on: mockRequest.eventLoop)) XCTAssertNoThrow(try connections.createConnection(conn1ID, on: el1)) @@ -698,7 +698,6 @@ class HTTPConnectionPool_HTTP2StateMachineTests: XCTestCase { guard case .failRequestsAndCancelTimeouts(let requestsToCancel, let error) = shutdownAction.request else { return XCTFail("unexpected shutdown action \(shutdownAction)") } - XCTAssertEqualTypeAndValue(error, HTTPClientError.cancelled) for request in requestsToCancel { @@ -916,6 +915,78 @@ class HTTPConnectionPool_HTTP2StateMachineTests: XCTestCase { XCTAssertNoThrow(try connections.closeConnection(http2Conn)) } + func testHTTP2toHTTP1MigrationDuringShutdown() { + let elg = EmbeddedEventLoopGroup(loops: 2) + let el1 = elg.next() + let el2 = elg.next() + var connections = MockConnectionPool() + var queuer = MockRequestQueuer() + var state = HTTPConnectionPool.StateMachine(idGenerator: .init(), maximumConcurrentHTTP1Connections: 8) + + // create http2 connection + let mockRequest = MockHTTPRequest(eventLoop: el1) + let request1 = HTTPConnectionPool.Request(mockRequest) + let action1 = state.executeRequest(request1) + guard case .createConnection(let http2ConnID, let http2EventLoop) = action1.connection else { + return XCTFail("Unexpected connection action \(action1.connection)") + } + XCTAssertTrue(http2EventLoop === el1) + XCTAssertEqual(action1.request, .scheduleRequestTimeout(for: request1, on: mockRequest.eventLoop)) + XCTAssertNoThrow(try connections.createConnection(http2ConnID, on: el1)) + XCTAssertNoThrow(try queuer.queue(mockRequest, id: request1.id)) + let http2Conn: HTTPConnectionPool.Connection = .__testOnly_connection(id: http2ConnID, eventLoop: el1) + XCTAssertNoThrow(try connections.succeedConnectionCreationHTTP2(http2ConnID, maxConcurrentStreams: 10)) + let migrationAction1 = state.newHTTP2ConnectionCreated(http2Conn, maxConcurrentStreams: 10) + guard case .executeRequestsAndCancelTimeouts(let requests, http2Conn) = migrationAction1.request else { + return XCTFail("unexpected request action \(migrationAction1.request)") + } + XCTAssertEqual(migrationAction1.connection, .migration(createConnections: [], closeConnections: [], scheduleTimeout: nil)) + XCTAssertEqual(requests.count, 1) + for request in requests { + XCTAssertNoThrow(try queuer.get(request.id, request: request.__testOnly_wrapped_request())) + XCTAssertNoThrow(try connections.execute(request.__testOnly_wrapped_request(), on: http2Conn)) + } + + // a request with new required event loop should create a new connection + let mockRequestWithRequiredEventLoop = MockHTTPRequest(eventLoop: el2, requiresEventLoopForChannel: true) + let requestWithRequiredEventLoop = HTTPConnectionPool.Request(mockRequestWithRequiredEventLoop) + let action2 = state.executeRequest(requestWithRequiredEventLoop) + guard case .createConnection(let http1ConnId, let http1EventLoop) = action2.connection else { + return XCTFail("Unexpected connection action \(action2.connection)") + } + XCTAssertTrue(http1EventLoop === el2) + XCTAssertEqual(action2.request, .scheduleRequestTimeout(for: requestWithRequiredEventLoop, on: mockRequestWithRequiredEventLoop.eventLoop)) + XCTAssertNoThrow(try connections.createConnection(http1ConnId, on: el2)) + XCTAssertNoThrow(try queuer.queue(mockRequestWithRequiredEventLoop, id: requestWithRequiredEventLoop.id)) + + /// we now no longer want anything of it + let shutdownAction = state.shutdown() + guard case .failRequestsAndCancelTimeouts(let requestsToCancel, let error) = shutdownAction.request else { + return XCTFail("unexpected shutdown action \(shutdownAction)") + } + XCTAssertEqualTypeAndValue(error, HTTPClientError.cancelled) + + for request in requestsToCancel { + XCTAssertNoThrow(try queuer.cancel(request.id)) + } + XCTAssertTrue(queuer.isEmpty) + + // if we established a new http/1 connection we should migrate back to http/1, + // close the connection and shutdown the pool + let http1Conn: HTTPConnectionPool.Connection = .__testOnly_connection(id: http1ConnId, eventLoop: el2) + XCTAssertNoThrow(try connections.succeedConnectionCreationHTTP1(http1ConnId)) + let migrationAction2 = state.newHTTP1ConnectionCreated(http1Conn) + XCTAssertEqual(migrationAction2.request, .none) + XCTAssertEqual(migrationAction2.connection, .migration(createConnections: [], closeConnections: [http1Conn], scheduleTimeout: nil)) + + // in http/1 state, we should close idle http2 connections + XCTAssertNoThrow(try connections.finishExecution(http2Conn.id)) + let releaseAction = state.http2ConnectionStreamClosed(http2Conn.id) + XCTAssertEqual(releaseAction.connection, .closeConnection(http2Conn, isShutdown: .yes(unclean: true))) + XCTAssertEqual(releaseAction.request, .none) + XCTAssertNoThrow(try connections.closeConnection(http2Conn)) + } + func testConnectionIsImmediatelyCreatedAfterBackoffTimerFires() { let elg = EmbeddedEventLoopGroup(loops: 2) let el1 = elg.next() From 8a7db1908f2437efc146fb088f40f81a7c070052 Mon Sep 17 00:00:00 2001 From: David Nadoba Date: Fri, 17 Dec 2021 14:11:44 +0100 Subject: [PATCH 4/4] fix documentation of `XCTAssertEqualTypeAndValue` --- .../HTTPConnectionPool+HTTP2StateMachineTests.swift | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Tests/AsyncHTTPClientTests/HTTPConnectionPool+HTTP2StateMachineTests.swift b/Tests/AsyncHTTPClientTests/HTTPConnectionPool+HTTP2StateMachineTests.swift index e92523796..825ffc9b3 100644 --- a/Tests/AsyncHTTPClientTests/HTTPConnectionPool+HTTP2StateMachineTests.swift +++ b/Tests/AsyncHTTPClientTests/HTTPConnectionPool+HTTP2StateMachineTests.swift @@ -1226,8 +1226,8 @@ class HTTPConnectionPool_HTTP2StateMachineTests: XCTestCase { } } -/// Should be used if you have a value of statically unknown type and want to compare its value to an `Equatable` type. -/// The assert will fail if the boths +/// Should be used if you have a value of statically unknown type and want to compare its value to another `Equatable` value. +/// The assert will fail if both values don't have the same type or don't have the same value. /// - Note: if the type of both values are statically know, prefer `XCTAssertEqual`. /// - Parameters: /// - lhs: value of a statically unknown type