diff --git a/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+HTTP1StateMachine.swift b/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+HTTP1StateMachine.swift index 36d8328a0..6b3f7352e 100644 --- a/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+HTTP1StateMachine.swift +++ b/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+HTTP1StateMachine.swift @@ -16,12 +16,6 @@ import NIOCore extension HTTPConnectionPool { struct HTTP1StateMachine { - enum State: Equatable { - case running - case shuttingDown(unclean: Bool) - case shutDown - } - typealias Action = HTTPConnectionPool.StateMachine.Action typealias ConnectionMigrationAction = HTTPConnectionPool.StateMachine.ConnectionMigrationAction typealias EstablishedAction = HTTPConnectionPool.StateMachine.EstablishedAction @@ -34,15 +28,20 @@ extension HTTPConnectionPool { private var lastConnectFailure: Error? private(set) var requests: RequestQueue - private var state: State = .running + private(set) var lifecycleState: StateMachine.LifecycleState - init(idGenerator: Connection.ID.Generator, maximumConcurrentConnections: Int) { + init( + idGenerator: Connection.ID.Generator, + maximumConcurrentConnections: Int, + lifecycleState: StateMachine.LifecycleState + ) { self.connections = HTTP1Connections( maximumConcurrentConnections: maximumConcurrentConnections, generator: idGenerator ) self.requests = RequestQueue() + self.lifecycleState = lifecycleState } mutating func migrateFromHTTP2( @@ -111,7 +110,7 @@ extension HTTPConnectionPool { // MARK: - Events - mutating func executeRequest(_ request: Request) -> Action { - switch self.state { + switch self.lifecycleState { case .running: if let eventLoop = request.requiredEventLoop { return self.executeRequestOnRequiredEventLoop(request, eventLoop: eventLoop) @@ -218,7 +217,7 @@ extension HTTPConnectionPool { self.failedConsecutiveConnectionAttempts += 1 self.lastConnectFailure = error - switch self.state { + switch self.lifecycleState { case .running: // We don't care how many waiting requests we have at this point, we will schedule a // retry. More tasks, may appear until the backoff has completed. The final @@ -243,7 +242,7 @@ extension HTTPConnectionPool { } mutating func connectionCreationBackoffDone(_ connectionID: Connection.ID) -> Action { - switch self.state { + switch self.lifecycleState { case .running: // The naming of `failConnection` is a little confusing here. All it does is moving the // connection state from `.backingOff` to `.closed` here. It also returns the @@ -271,7 +270,7 @@ extension HTTPConnectionPool { return .none } - precondition(self.state == .running, "If we are shutting down, we must not have any idle connections") + precondition(self.lifecycleState == .running, "If we are shutting down, we must not have any idle connections") return .init( request: .none, @@ -332,7 +331,7 @@ extension HTTPConnectionPool { } mutating func shutdown() -> Action { - precondition(self.state == .running, "Shutdown must only be called once") + precondition(self.lifecycleState == .running, "Shutdown must only be called once") // If we have remaining request queued, we should fail all of them with a cancelled // error. @@ -350,10 +349,10 @@ extension HTTPConnectionPool { let isShutdown: StateMachine.ConnectionAction.IsShutdown let unclean = !(cleanupContext.cancel.isEmpty && waitingRequests.isEmpty) if self.connections.isEmpty && self.http2Connections == nil { - self.state = .shutDown + self.lifecycleState = .shutDown isShutdown = .yes(unclean: unclean) } else { - self.state = .shuttingDown(unclean: unclean) + self.lifecycleState = .shuttingDown(unclean: unclean) isShutdown = .no } @@ -371,7 +370,7 @@ extension HTTPConnectionPool { at index: Int, context: HTTP1Connections.IdleConnectionContext ) -> EstablishedAction { - switch self.state { + switch self.lifecycleState { case .running: switch context.use { case .generalPurpose: @@ -457,7 +456,7 @@ extension HTTPConnectionPool { at index: Int, context: HTTP1Connections.FailedConnectionContext ) -> Action { - switch self.state { + switch self.lifecycleState { case .running: switch context.use { case .generalPurpose: @@ -537,7 +536,7 @@ extension HTTPConnectionPool { } mutating func http2ConnectionClosed(_ connectionID: Connection.ID) -> Action { - switch self.state { + switch self.lifecycleState { case .running: _ = self.http2Connections?.failConnection(connectionID) if self.http2Connections?.isEmpty == true { @@ -570,7 +569,7 @@ extension HTTPConnectionPool { mutating func http2ConnectionStreamClosed(_ connectionID: Connection.ID) -> Action { // It is save to bang the http2Connections here. If we get this callback but we don't have // http2 connections something has gone terribly wrong. - switch self.state { + switch self.lifecycleState { case .running: let (index, context) = self.http2Connections!.releaseStream(connectionID) guard context.isIdle else { diff --git a/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+HTTP2StateMachine.swift b/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+HTTP2StateMachine.swift index 932175696..d3e6fbdcd 100644 --- a/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+HTTP2StateMachine.swift +++ b/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+HTTP2StateMachine.swift @@ -22,12 +22,6 @@ extension HTTPConnectionPool { typealias EstablishedAction = HTTPConnectionPool.StateMachine.EstablishedAction typealias EstablishedConnectionAction = HTTPConnectionPool.StateMachine.EstablishedConnectionAction - private enum State: Equatable { - case running - case shuttingDown(unclean: Bool) - case shutDown - } - private var lastConnectFailure: Error? private var failedConsecutiveConnectionAttempts = 0 @@ -38,15 +32,17 @@ extension HTTPConnectionPool { private let idGenerator: Connection.ID.Generator - private var state: State = .running + private(set) var lifecycleState: StateMachine.LifecycleState init( - idGenerator: Connection.ID.Generator + idGenerator: Connection.ID.Generator, + lifecycleState: StateMachine.LifecycleState ) { self.idGenerator = idGenerator self.requests = RequestQueue() self.connections = HTTP2Connections(generator: idGenerator) + self.lifecycleState = lifecycleState } mutating func migrateFromHTTP1( @@ -112,7 +108,7 @@ extension HTTPConnectionPool { } mutating func executeRequest(_ request: Request) -> Action { - switch self.state { + switch self.lifecycleState { case .running: if let eventLoop = request.requiredEventLoop { return self.executeRequest(request, onRequired: eventLoop) @@ -236,7 +232,7 @@ extension HTTPConnectionPool { at index: Int, context: HTTP2Connections.EstablishedConnectionContext ) -> EstablishedAction { - switch self.state { + switch self.lifecycleState { case .running: // We prioritise requests with a required event loop over those without a requirement. // This can cause starvation for request without a required event loop. @@ -323,7 +319,7 @@ extension HTTPConnectionPool { } private mutating func nextActionForFailedConnection(at index: Int, on eventLoop: EventLoop) -> Action { - switch self.state { + switch self.lifecycleState { case .running: // we do not know if we have created this connection for a request with a required // event loop or not. However, we do not need this information and can infer @@ -378,7 +374,7 @@ extension HTTPConnectionPool { } private mutating func nextActionForClosingConnection(on eventLoop: EventLoop) -> Action { - switch self.state { + switch self.lifecycleState { case .running: let hasPendingRequest = !self.requests.isEmpty(for: eventLoop) || !self.requests.isEmpty(for: nil) guard hasPendingRequest else { @@ -463,7 +459,7 @@ extension HTTPConnectionPool { return .none } - precondition(self.state == .running, "If we are shutting down, we must not have any idle connections") + precondition(self.lifecycleState == .running, "If we are shutting down, we must not have any idle connections") return .init( request: .none, @@ -479,7 +475,7 @@ extension HTTPConnectionPool { if self.http1Connections!.isEmpty { self.http1Connections = nil } - switch self.state { + switch self.lifecycleState { case .running: return .none case .shuttingDown(let unclean): @@ -510,7 +506,7 @@ extension HTTPConnectionPool { self.http1Connections = nil // we must also check, if we are shutting down. Was this maybe out last connection? - switch self.state { + switch self.lifecycleState { case .running: return .init(request: .none, connection: .closeConnection(connection, isShutdown: .no)) case .shuttingDown(let unclean): @@ -543,10 +539,10 @@ extension HTTPConnectionPool { let unclean = !(cleanupContext.cancel.isEmpty && waitingRequests.isEmpty && self.http1Connections == nil) if self.connections.isEmpty && self.http1Connections == nil { isShutdown = .yes(unclean: unclean) - self.state = .shutDown + self.lifecycleState = .shutDown } else { isShutdown = .no - self.state = .shuttingDown(unclean: unclean) + self.lifecycleState = .shuttingDown(unclean: unclean) } return .init( request: requestAction, diff --git a/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+StateMachine.swift b/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+StateMachine.swift index 964d9b794..4d912633c 100644 --- a/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+StateMachine.swift +++ b/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+StateMachine.swift @@ -66,6 +66,12 @@ extension HTTPConnectionPool { case none } + enum LifecycleState: Equatable { + case running + case shuttingDown(unclean: Bool) + case shutDown + } + enum HTTPVersionState { case http1(HTTP1StateMachine) case http2(HTTP2StateMachine) @@ -88,7 +94,6 @@ extension HTTPConnectionPool { } var state: HTTPVersionState - var isShuttingDown: Bool = false let idGenerator: Connection.ID.Generator let maximumConcurrentHTTP1Connections: Int @@ -98,7 +103,8 @@ extension HTTPConnectionPool { self.idGenerator = idGenerator let http1State = HTTP1StateMachine( idGenerator: idGenerator, - maximumConcurrentConnections: maximumConcurrentHTTP1Connections + maximumConcurrentConnections: maximumConcurrentHTTP1Connections, + lifecycleState: .running ) self.state = .http1(http1State) } @@ -121,7 +127,8 @@ extension HTTPConnectionPool { case .http2(let http2StateMachine): var http1StateMachine = HTTP1StateMachine( idGenerator: self.idGenerator, - maximumConcurrentConnections: self.maximumConcurrentHTTP1Connections + maximumConcurrentConnections: self.maximumConcurrentHTTP1Connections, + lifecycleState: http2StateMachine.lifecycleState ) let newConnectionAction = http1StateMachine.migrateFromHTTP2( @@ -140,7 +147,8 @@ extension HTTPConnectionPool { case .http1(let http1StateMachine): var http2StateMachine = HTTP2StateMachine( - idGenerator: self.idGenerator + idGenerator: self.idGenerator, + lifecycleState: http1StateMachine.lifecycleState ) let migrationAction = http2StateMachine.migrateFromHTTP1( http1Connections: http1StateMachine.connections, @@ -263,10 +271,6 @@ extension HTTPConnectionPool { } mutating func shutdown() -> Action { - precondition(!self.isShuttingDown, "Shutdown must only be called once") - - self.isShuttingDown = true - return self.state.modify(http1: { http1 in http1.shutdown() }, http2: { http2 in diff --git a/Tests/AsyncHTTPClientTests/AsyncAwaitEndToEndTests.swift b/Tests/AsyncHTTPClientTests/AsyncAwaitEndToEndTests.swift index 7b43d8b96..7c5f61681 100644 --- a/Tests/AsyncHTTPClientTests/AsyncAwaitEndToEndTests.swift +++ b/Tests/AsyncHTTPClientTests/AsyncAwaitEndToEndTests.swift @@ -322,7 +322,7 @@ final class AsyncAwaitEndToEndTests: XCTestCase { #if compiler(>=5.5) && canImport(_Concurrency) guard #available(macOS 12.0, iOS 15.0, watchOS 8.0, tvOS 15.0, *) else { return } XCTAsyncTest(timeout: 5) { - let bin = HTTPBin() + let bin = HTTPBin(.http2(compress: false)) defer { XCTAssertNoThrow(try bin.shutdown()) } let client = makeDefaultHTTPClient() defer { XCTAssertNoThrow(try client.syncShutdown()) } @@ -368,7 +368,7 @@ final class AsyncAwaitEndToEndTests: XCTestCase { #if compiler(>=5.5) && canImport(_Concurrency) guard #available(macOS 12.0, iOS 15.0, watchOS 8.0, tvOS 15.0, *) else { return } XCTAsyncTest(timeout: 5) { - let bin = HTTPBin() + let bin = HTTPBin(.http2(compress: false)) defer { XCTAssertNoThrow(try bin.shutdown()) } let client = makeDefaultHTTPClient() defer { XCTAssertNoThrow(try client.syncShutdown()) } @@ -381,7 +381,6 @@ final class AsyncAwaitEndToEndTests: XCTestCase { await XCTAssertThrowsError(try await task.value) { XCTAssertEqual($0 as? HTTPClientError, HTTPClientError.deadlineExceeded) } - print("done") } #endif } diff --git a/Tests/AsyncHTTPClientTests/HTTPConnectionPool+HTTP2StateMachineTests+XCTest.swift b/Tests/AsyncHTTPClientTests/HTTPConnectionPool+HTTP2StateMachineTests+XCTest.swift index ef1ed1f04..9dca0c934 100644 --- a/Tests/AsyncHTTPClientTests/HTTPConnectionPool+HTTP2StateMachineTests+XCTest.swift +++ b/Tests/AsyncHTTPClientTests/HTTPConnectionPool+HTTP2StateMachineTests+XCTest.swift @@ -37,8 +37,10 @@ extension HTTPConnectionPool_HTTP2StateMachineTests { ("testGoAwayWithLeasedStream", testGoAwayWithLeasedStream), ("testGoAwayWithPendingRequestsStartsNewConnection", testGoAwayWithPendingRequestsStartsNewConnection), ("testMigrationFromHTTP1ToHTTP2", testMigrationFromHTTP1ToHTTP2), + ("testMigrationFromHTTP1ToHTTP2WhileShuttingDown", testMigrationFromHTTP1ToHTTP2WhileShuttingDown), ("testMigrationFromHTTP1ToHTTP2WithAlreadyStartedHTTP1Connections", testMigrationFromHTTP1ToHTTP2WithAlreadyStartedHTTP1Connections), ("testHTTP2toHTTP1Migration", testHTTP2toHTTP1Migration), + ("testHTTP2toHTTP1MigrationDuringShutdown", testHTTP2toHTTP1MigrationDuringShutdown), ("testConnectionIsImmediatelyCreatedAfterBackoffTimerFires", testConnectionIsImmediatelyCreatedAfterBackoffTimerFires), ("testMaxConcurrentStreamsIsRespected", testMaxConcurrentStreamsIsRespected), ("testEventsAfterConnectionIsClosed", testEventsAfterConnectionIsClosed), diff --git a/Tests/AsyncHTTPClientTests/HTTPConnectionPool+HTTP2StateMachineTests.swift b/Tests/AsyncHTTPClientTests/HTTPConnectionPool+HTTP2StateMachineTests.swift index 049d18f7f..825ffc9b3 100644 --- a/Tests/AsyncHTTPClientTests/HTTPConnectionPool+HTTP2StateMachineTests.swift +++ b/Tests/AsyncHTTPClientTests/HTTPConnectionPool+HTTP2StateMachineTests.swift @@ -29,7 +29,7 @@ class HTTPConnectionPool_HTTP2StateMachineTests: XCTestCase { let el1 = elg.next() var connections = MockConnectionPool() var queuer = MockRequestQueuer() - var state = HTTPConnectionPool.HTTP2StateMachine(idGenerator: .init()) + var state = HTTPConnectionPool.HTTP2StateMachine(idGenerator: .init(), lifecycleState: .running) /// first request should create a new connection let mockRequest = MockHTTPRequest(eventLoop: el1) @@ -137,7 +137,8 @@ class HTTPConnectionPool_HTTP2StateMachineTests: XCTestCase { defer { XCTAssertNoThrow(try elg.syncShutdownGracefully()) } var state = HTTPConnectionPool.HTTP2StateMachine( - idGenerator: .init() + idGenerator: .init(), + lifecycleState: .running ) let mockRequest = MockHTTPRequest(eventLoop: elg.next()) @@ -193,7 +194,8 @@ class HTTPConnectionPool_HTTP2StateMachineTests: XCTestCase { defer { XCTAssertNoThrow(try elg.syncShutdownGracefully()) } var state = HTTPConnectionPool.HTTP2StateMachine( - idGenerator: .init() + idGenerator: .init(), + lifecycleState: .running ) let mockRequest = MockHTTPRequest(eventLoop: elg.next()) @@ -230,7 +232,8 @@ class HTTPConnectionPool_HTTP2StateMachineTests: XCTestCase { defer { XCTAssertNoThrow(try elg.syncShutdownGracefully()) } var state = HTTPConnectionPool.HTTP2StateMachine( - idGenerator: .init() + idGenerator: .init(), + lifecycleState: .running ) let mockRequest = MockHTTPRequest(eventLoop: elg.next()) @@ -284,7 +287,7 @@ class HTTPConnectionPool_HTTP2StateMachineTests: XCTestCase { let el1 = elg.next() let idGenerator = HTTPConnectionPool.Connection.ID.Generator() - var http1State = HTTPConnectionPool.HTTP1StateMachine(idGenerator: idGenerator, maximumConcurrentConnections: 8) + var http1State = HTTPConnectionPool.HTTP1StateMachine(idGenerator: idGenerator, maximumConcurrentConnections: 8, lifecycleState: .running) let mockRequest1 = MockHTTPRequest(eventLoop: el1) let request1 = HTTPConnectionPool.Request(mockRequest1) @@ -310,7 +313,7 @@ class HTTPConnectionPool_HTTP2StateMachineTests: XCTestCase { // second connection is a HTTP2 connection and we need to migrate let conn2: HTTPConnectionPool.Connection = .__testOnly_connection(id: conn2ID, eventLoop: el1) - var http2State = HTTPConnectionPool.HTTP2StateMachine(idGenerator: idGenerator) + var http2State = HTTPConnectionPool.HTTP2StateMachine(idGenerator: idGenerator, lifecycleState: .running) let http2ConnectAction = http2State.migrateFromHTTP1( http1Connections: http1State.connections, @@ -350,7 +353,7 @@ class HTTPConnectionPool_HTTP2StateMachineTests: XCTestCase { let idGenerator = HTTPConnectionPool.Connection.ID.Generator() var http1Conns = HTTPConnectionPool.HTTP1Connections(maximumConcurrentConnections: 8, generator: idGenerator) let conn1ID = http1Conns.createNewConnection(on: el1) - var state = HTTPConnectionPool.HTTP2StateMachine(idGenerator: idGenerator) + var state = HTTPConnectionPool.HTTP2StateMachine(idGenerator: idGenerator, lifecycleState: .running) let conn1 = HTTPConnectionPool.Connection.__testOnly_connection(id: conn1ID, eventLoop: el1) let connectAction = state.migrateFromHTTP1(http1Connections: http1Conns, requests: .init(), newHTTP2Connection: conn1, maxConcurrentStreams: 100) @@ -395,7 +398,7 @@ class HTTPConnectionPool_HTTP2StateMachineTests: XCTestCase { let idGenerator = HTTPConnectionPool.Connection.ID.Generator() var http1Conns = HTTPConnectionPool.HTTP1Connections(maximumConcurrentConnections: 8, generator: idGenerator) let conn1ID = http1Conns.createNewConnection(on: el1) - var state = HTTPConnectionPool.HTTP2StateMachine(idGenerator: idGenerator) + var state = HTTPConnectionPool.HTTP2StateMachine(idGenerator: idGenerator, lifecycleState: .running) let conn1 = HTTPConnectionPool.Connection.__testOnly_connection(id: conn1ID, eventLoop: el1) let connectAction = state.migrateFromHTTP1(http1Connections: http1Conns, requests: .init(), newHTTP2Connection: conn1, maxConcurrentStreams: 100) @@ -423,7 +426,7 @@ class HTTPConnectionPool_HTTP2StateMachineTests: XCTestCase { let idGenerator = HTTPConnectionPool.Connection.ID.Generator() var http1Conns = HTTPConnectionPool.HTTP1Connections(maximumConcurrentConnections: 8, generator: idGenerator) let conn1ID = http1Conns.createNewConnection(on: el1) - var state = HTTPConnectionPool.HTTP2StateMachine(idGenerator: idGenerator) + var state = HTTPConnectionPool.HTTP2StateMachine(idGenerator: idGenerator, lifecycleState: .running) let conn1 = HTTPConnectionPool.Connection.__testOnly_connection(id: conn1ID, eventLoop: el1) let connectAction = state.migrateFromHTTP1(http1Connections: http1Conns, requests: .init(), newHTTP2Connection: conn1, maxConcurrentStreams: 100) XCTAssertEqual(connectAction.request, .none) @@ -458,7 +461,7 @@ class HTTPConnectionPool_HTTP2StateMachineTests: XCTestCase { let idGenerator = HTTPConnectionPool.Connection.ID.Generator() var http1Conns = HTTPConnectionPool.HTTP1Connections(maximumConcurrentConnections: 8, generator: idGenerator) let conn1ID = http1Conns.createNewConnection(on: el1) - var state = HTTPConnectionPool.HTTP2StateMachine(idGenerator: idGenerator) + var state = HTTPConnectionPool.HTTP2StateMachine(idGenerator: idGenerator, lifecycleState: .running) let conn1 = HTTPConnectionPool.Connection.__testOnly_connection(id: conn1ID, eventLoop: el1) @@ -488,7 +491,7 @@ class HTTPConnectionPool_HTTP2StateMachineTests: XCTestCase { let idGenerator = HTTPConnectionPool.Connection.ID.Generator() var http1Conns = HTTPConnectionPool.HTTP1Connections(maximumConcurrentConnections: 8, generator: idGenerator) let conn1ID = http1Conns.createNewConnection(on: el1) - var state = HTTPConnectionPool.HTTP2StateMachine(idGenerator: idGenerator) + var state = HTTPConnectionPool.HTTP2StateMachine(idGenerator: idGenerator, lifecycleState: .running) let conn1 = HTTPConnectionPool.Connection.__testOnly_connection(id: conn1ID, eventLoop: el1) let connectAction = state.migrateFromHTTP1( @@ -529,7 +532,7 @@ class HTTPConnectionPool_HTTP2StateMachineTests: XCTestCase { let idGenerator = HTTPConnectionPool.Connection.ID.Generator() var http1Conns = HTTPConnectionPool.HTTP1Connections(maximumConcurrentConnections: 8, generator: idGenerator) let conn1ID = http1Conns.createNewConnection(on: el1) - var state = HTTPConnectionPool.HTTP2StateMachine(idGenerator: idGenerator) + var state = HTTPConnectionPool.HTTP2StateMachine(idGenerator: idGenerator, lifecycleState: .running) let conn1 = HTTPConnectionPool.Connection.__testOnly_connection(id: conn1ID, eventLoop: el1) let connectAction1 = state.migrateFromHTTP1( @@ -670,6 +673,48 @@ class HTTPConnectionPool_HTTP2StateMachineTests: XCTestCase { XCTAssertTrue(queuer.isEmpty) } + func testMigrationFromHTTP1ToHTTP2WhileShuttingDown() { + let elg = EmbeddedEventLoopGroup(loops: 1) + let el1 = elg.next() + var connections = MockConnectionPool() + var queuer = MockRequestQueuer() + var state = HTTPConnectionPool.StateMachine(idGenerator: .init(), maximumConcurrentHTTP1Connections: 8) + + /// create a new connection + let mockRequest = MockHTTPRequest(eventLoop: el1) + let request = HTTPConnectionPool.Request(mockRequest) + let action = state.executeRequest(request) + guard case .createConnection(let conn1ID, let eventLoop) = action.connection else { + return XCTFail("Unexpected connection action \(action.connection)") + } + + XCTAssertTrue(eventLoop === el1) + XCTAssertEqual(action.request, .scheduleRequestTimeout(for: request, on: mockRequest.eventLoop)) + XCTAssertNoThrow(try connections.createConnection(conn1ID, on: el1)) + XCTAssertNoThrow(try queuer.queue(mockRequest, id: request.id)) + + /// we now no longer want anything of it + let shutdownAction = state.shutdown() + guard case .failRequestsAndCancelTimeouts(let requestsToCancel, let error) = shutdownAction.request else { + return XCTFail("unexpected shutdown action \(shutdownAction)") + } + XCTAssertEqualTypeAndValue(error, HTTPClientError.cancelled) + + for request in requestsToCancel { + XCTAssertNoThrow(try queuer.cancel(request.id)) + } + XCTAssertTrue(queuer.isEmpty) + + /// new HTTP2 connection should migrate from HTTP1 to HTTP2, close the connection and shutdown the pool + let conn1: HTTPConnectionPool.Connection = .__testOnly_connection(id: conn1ID, eventLoop: el1) + XCTAssertNoThrow(try connections.succeedConnectionCreationHTTP2(conn1ID, maxConcurrentStreams: 10)) + let migrationAction = state.newHTTP2ConnectionCreated(conn1, maxConcurrentStreams: 10) + XCTAssertEqual(migrationAction.request, .none) + XCTAssertEqual(migrationAction.connection, .closeConnection(conn1, isShutdown: .yes(unclean: true))) + XCTAssertNoThrow(try connections.closeConnection(conn1)) + XCTAssertTrue(connections.isEmpty) + } + func testMigrationFromHTTP1ToHTTP2WithAlreadyStartedHTTP1Connections() { let elg = EmbeddedEventLoopGroup(loops: 1) let el1 = elg.next() @@ -870,6 +915,78 @@ class HTTPConnectionPool_HTTP2StateMachineTests: XCTestCase { XCTAssertNoThrow(try connections.closeConnection(http2Conn)) } + func testHTTP2toHTTP1MigrationDuringShutdown() { + let elg = EmbeddedEventLoopGroup(loops: 2) + let el1 = elg.next() + let el2 = elg.next() + var connections = MockConnectionPool() + var queuer = MockRequestQueuer() + var state = HTTPConnectionPool.StateMachine(idGenerator: .init(), maximumConcurrentHTTP1Connections: 8) + + // create http2 connection + let mockRequest = MockHTTPRequest(eventLoop: el1) + let request1 = HTTPConnectionPool.Request(mockRequest) + let action1 = state.executeRequest(request1) + guard case .createConnection(let http2ConnID, let http2EventLoop) = action1.connection else { + return XCTFail("Unexpected connection action \(action1.connection)") + } + XCTAssertTrue(http2EventLoop === el1) + XCTAssertEqual(action1.request, .scheduleRequestTimeout(for: request1, on: mockRequest.eventLoop)) + XCTAssertNoThrow(try connections.createConnection(http2ConnID, on: el1)) + XCTAssertNoThrow(try queuer.queue(mockRequest, id: request1.id)) + let http2Conn: HTTPConnectionPool.Connection = .__testOnly_connection(id: http2ConnID, eventLoop: el1) + XCTAssertNoThrow(try connections.succeedConnectionCreationHTTP2(http2ConnID, maxConcurrentStreams: 10)) + let migrationAction1 = state.newHTTP2ConnectionCreated(http2Conn, maxConcurrentStreams: 10) + guard case .executeRequestsAndCancelTimeouts(let requests, http2Conn) = migrationAction1.request else { + return XCTFail("unexpected request action \(migrationAction1.request)") + } + XCTAssertEqual(migrationAction1.connection, .migration(createConnections: [], closeConnections: [], scheduleTimeout: nil)) + XCTAssertEqual(requests.count, 1) + for request in requests { + XCTAssertNoThrow(try queuer.get(request.id, request: request.__testOnly_wrapped_request())) + XCTAssertNoThrow(try connections.execute(request.__testOnly_wrapped_request(), on: http2Conn)) + } + + // a request with new required event loop should create a new connection + let mockRequestWithRequiredEventLoop = MockHTTPRequest(eventLoop: el2, requiresEventLoopForChannel: true) + let requestWithRequiredEventLoop = HTTPConnectionPool.Request(mockRequestWithRequiredEventLoop) + let action2 = state.executeRequest(requestWithRequiredEventLoop) + guard case .createConnection(let http1ConnId, let http1EventLoop) = action2.connection else { + return XCTFail("Unexpected connection action \(action2.connection)") + } + XCTAssertTrue(http1EventLoop === el2) + XCTAssertEqual(action2.request, .scheduleRequestTimeout(for: requestWithRequiredEventLoop, on: mockRequestWithRequiredEventLoop.eventLoop)) + XCTAssertNoThrow(try connections.createConnection(http1ConnId, on: el2)) + XCTAssertNoThrow(try queuer.queue(mockRequestWithRequiredEventLoop, id: requestWithRequiredEventLoop.id)) + + /// we now no longer want anything of it + let shutdownAction = state.shutdown() + guard case .failRequestsAndCancelTimeouts(let requestsToCancel, let error) = shutdownAction.request else { + return XCTFail("unexpected shutdown action \(shutdownAction)") + } + XCTAssertEqualTypeAndValue(error, HTTPClientError.cancelled) + + for request in requestsToCancel { + XCTAssertNoThrow(try queuer.cancel(request.id)) + } + XCTAssertTrue(queuer.isEmpty) + + // if we established a new http/1 connection we should migrate back to http/1, + // close the connection and shutdown the pool + let http1Conn: HTTPConnectionPool.Connection = .__testOnly_connection(id: http1ConnId, eventLoop: el2) + XCTAssertNoThrow(try connections.succeedConnectionCreationHTTP1(http1ConnId)) + let migrationAction2 = state.newHTTP1ConnectionCreated(http1Conn) + XCTAssertEqual(migrationAction2.request, .none) + XCTAssertEqual(migrationAction2.connection, .migration(createConnections: [], closeConnections: [http1Conn], scheduleTimeout: nil)) + + // in http/1 state, we should close idle http2 connections + XCTAssertNoThrow(try connections.finishExecution(http2Conn.id)) + let releaseAction = state.http2ConnectionStreamClosed(http2Conn.id) + XCTAssertEqual(releaseAction.connection, .closeConnection(http2Conn, isShutdown: .yes(unclean: true))) + XCTAssertEqual(releaseAction.request, .none) + XCTAssertNoThrow(try connections.closeConnection(http2Conn)) + } + func testConnectionIsImmediatelyCreatedAfterBackoffTimerFires() { let elg = EmbeddedEventLoopGroup(loops: 2) let el1 = elg.next() @@ -1108,3 +1225,26 @@ class HTTPConnectionPool_HTTP2StateMachineTests: XCTestCase { XCTAssertEqual(state.http2ConnectionClosed(connection.id), .none) } } + +/// Should be used if you have a value of statically unknown type and want to compare its value to another `Equatable` value. +/// The assert will fail if both values don't have the same type or don't have the same value. +/// - Note: if the type of both values are statically know, prefer `XCTAssertEqual`. +/// - Parameters: +/// - lhs: value of a statically unknown type +/// - rhs: value of statically known and `Equatable` type +func XCTAssertEqualTypeAndValue( + _ lhs: @autoclosure () throws -> Left, + _ rhs: @autoclosure () throws -> Right, + file: StaticString = #file, + line: UInt = #line +) { + XCTAssertNoThrow(try { + let lhs = try lhs() + let rhs = try rhs() + guard let lhsAsRhs = lhs as? Right else { + XCTFail("could not cast \(lhs) of type \(Right.self) to \(Left.self)") + return + } + XCTAssertEqual(lhsAsRhs, rhs) + }(), file: file, line: line) +}