Skip to content

Fix HTTP1 to HTTP2 migration while shutdown is in progress #530

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Dec 17, 2021
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,6 @@ import NIOCore

extension HTTPConnectionPool {
struct HTTP1StateMachine {
enum State: Equatable {
case running
case shuttingDown(unclean: Bool)
case shutDown
}

typealias Action = HTTPConnectionPool.StateMachine.Action
typealias ConnectionMigrationAction = HTTPConnectionPool.StateMachine.ConnectionMigrationAction
typealias EstablishedAction = HTTPConnectionPool.StateMachine.EstablishedAction
Expand All @@ -34,15 +28,20 @@ extension HTTPConnectionPool {
private var lastConnectFailure: Error?

private(set) var requests: RequestQueue
private var state: State = .running
private(set) var lifecycleState: StateMachine.LifecycleState

init(idGenerator: Connection.ID.Generator, maximumConcurrentConnections: Int) {
init(
idGenerator: Connection.ID.Generator,
maximumConcurrentConnections: Int,
lifecycleState: StateMachine.LifecycleState
) {
self.connections = HTTP1Connections(
maximumConcurrentConnections: maximumConcurrentConnections,
generator: idGenerator
)

self.requests = RequestQueue()
self.lifecycleState = lifecycleState
}

mutating func migrateFromHTTP2(
Expand Down Expand Up @@ -111,7 +110,7 @@ extension HTTPConnectionPool {
// MARK: - Events -

mutating func executeRequest(_ request: Request) -> Action {
switch self.state {
switch self.lifecycleState {
case .running:
if let eventLoop = request.requiredEventLoop {
return self.executeRequestOnRequiredEventLoop(request, eventLoop: eventLoop)
Expand Down Expand Up @@ -218,7 +217,7 @@ extension HTTPConnectionPool {
self.failedConsecutiveConnectionAttempts += 1
self.lastConnectFailure = error

switch self.state {
switch self.lifecycleState {
case .running:
// We don't care how many waiting requests we have at this point, we will schedule a
// retry. More tasks, may appear until the backoff has completed. The final
Expand All @@ -243,7 +242,7 @@ extension HTTPConnectionPool {
}

mutating func connectionCreationBackoffDone(_ connectionID: Connection.ID) -> Action {
switch self.state {
switch self.lifecycleState {
case .running:
// The naming of `failConnection` is a little confusing here. All it does is moving the
// connection state from `.backingOff` to `.closed` here. It also returns the
Expand Down Expand Up @@ -271,7 +270,7 @@ extension HTTPConnectionPool {
return .none
}

precondition(self.state == .running, "If we are shutting down, we must not have any idle connections")
precondition(self.lifecycleState == .running, "If we are shutting down, we must not have any idle connections")

return .init(
request: .none,
Expand Down Expand Up @@ -332,7 +331,7 @@ extension HTTPConnectionPool {
}

mutating func shutdown() -> Action {
precondition(self.state == .running, "Shutdown must only be called once")
precondition(self.lifecycleState == .running, "Shutdown must only be called once")

// If we have remaining request queued, we should fail all of them with a cancelled
// error.
Expand All @@ -350,10 +349,10 @@ extension HTTPConnectionPool {
let isShutdown: StateMachine.ConnectionAction.IsShutdown
let unclean = !(cleanupContext.cancel.isEmpty && waitingRequests.isEmpty)
if self.connections.isEmpty && self.http2Connections == nil {
self.state = .shutDown
self.lifecycleState = .shutDown
isShutdown = .yes(unclean: unclean)
} else {
self.state = .shuttingDown(unclean: unclean)
self.lifecycleState = .shuttingDown(unclean: unclean)
isShutdown = .no
}

Expand All @@ -371,7 +370,7 @@ extension HTTPConnectionPool {
at index: Int,
context: HTTP1Connections.IdleConnectionContext
) -> EstablishedAction {
switch self.state {
switch self.lifecycleState {
case .running:
switch context.use {
case .generalPurpose:
Expand Down Expand Up @@ -457,7 +456,7 @@ extension HTTPConnectionPool {
at index: Int,
context: HTTP1Connections.FailedConnectionContext
) -> Action {
switch self.state {
switch self.lifecycleState {
case .running:
switch context.use {
case .generalPurpose:
Expand Down Expand Up @@ -537,7 +536,7 @@ extension HTTPConnectionPool {
}

mutating func http2ConnectionClosed(_ connectionID: Connection.ID) -> Action {
switch self.state {
switch self.lifecycleState {
case .running:
_ = self.http2Connections?.failConnection(connectionID)
if self.http2Connections?.isEmpty == true {
Expand Down Expand Up @@ -570,7 +569,7 @@ extension HTTPConnectionPool {
mutating func http2ConnectionStreamClosed(_ connectionID: Connection.ID) -> Action {
// It is save to bang the http2Connections here. If we get this callback but we don't have
// http2 connections something has gone terribly wrong.
switch self.state {
switch self.lifecycleState {
case .running:
let (index, context) = self.http2Connections!.releaseStream(connectionID)
guard context.isIdle else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,6 @@ extension HTTPConnectionPool {
typealias EstablishedAction = HTTPConnectionPool.StateMachine.EstablishedAction
typealias EstablishedConnectionAction = HTTPConnectionPool.StateMachine.EstablishedConnectionAction

private enum State: Equatable {
case running
case shuttingDown(unclean: Bool)
case shutDown
}

private var lastConnectFailure: Error?
private var failedConsecutiveConnectionAttempts = 0

Expand All @@ -38,15 +32,17 @@ extension HTTPConnectionPool {

private let idGenerator: Connection.ID.Generator

private var state: State = .running
private(set) var lifecycleState: StateMachine.LifecycleState

init(
idGenerator: Connection.ID.Generator
idGenerator: Connection.ID.Generator,
lifecycleState: StateMachine.LifecycleState
) {
self.idGenerator = idGenerator
self.requests = RequestQueue()

self.connections = HTTP2Connections(generator: idGenerator)
self.lifecycleState = lifecycleState
}

mutating func migrateFromHTTP1(
Expand Down Expand Up @@ -112,7 +108,7 @@ extension HTTPConnectionPool {
}

mutating func executeRequest(_ request: Request) -> Action {
switch self.state {
switch self.lifecycleState {
case .running:
if let eventLoop = request.requiredEventLoop {
return self.executeRequest(request, onRequired: eventLoop)
Expand Down Expand Up @@ -236,7 +232,7 @@ extension HTTPConnectionPool {
at index: Int,
context: HTTP2Connections.EstablishedConnectionContext
) -> EstablishedAction {
switch self.state {
switch self.lifecycleState {
case .running:
// We prioritise requests with a required event loop over those without a requirement.
// This can cause starvation for request without a required event loop.
Expand Down Expand Up @@ -323,7 +319,7 @@ extension HTTPConnectionPool {
}

private mutating func nextActionForFailedConnection(at index: Int, on eventLoop: EventLoop) -> Action {
switch self.state {
switch self.lifecycleState {
case .running:
// we do not know if we have created this connection for a request with a required
// event loop or not. However, we do not need this information and can infer
Expand Down Expand Up @@ -378,7 +374,7 @@ extension HTTPConnectionPool {
}

private mutating func nextActionForClosingConnection(on eventLoop: EventLoop) -> Action {
switch self.state {
switch self.lifecycleState {
case .running:
let hasPendingRequest = !self.requests.isEmpty(for: eventLoop) || !self.requests.isEmpty(for: nil)
guard hasPendingRequest else {
Expand Down Expand Up @@ -463,7 +459,7 @@ extension HTTPConnectionPool {
return .none
}

precondition(self.state == .running, "If we are shutting down, we must not have any idle connections")
precondition(self.lifecycleState == .running, "If we are shutting down, we must not have any idle connections")

return .init(
request: .none,
Expand All @@ -479,7 +475,7 @@ extension HTTPConnectionPool {
if self.http1Connections!.isEmpty {
self.http1Connections = nil
}
switch self.state {
switch self.lifecycleState {
case .running:
return .none
case .shuttingDown(let unclean):
Expand Down Expand Up @@ -510,7 +506,7 @@ extension HTTPConnectionPool {
self.http1Connections = nil

// we must also check, if we are shutting down. Was this maybe out last connection?
switch self.state {
switch self.lifecycleState {
case .running:
return .init(request: .none, connection: .closeConnection(connection, isShutdown: .no))
case .shuttingDown(let unclean):
Expand Down Expand Up @@ -543,10 +539,10 @@ extension HTTPConnectionPool {
let unclean = !(cleanupContext.cancel.isEmpty && waitingRequests.isEmpty && self.http1Connections == nil)
if self.connections.isEmpty && self.http1Connections == nil {
isShutdown = .yes(unclean: unclean)
self.state = .shutDown
self.lifecycleState = .shutDown
} else {
isShutdown = .no
self.state = .shuttingDown(unclean: unclean)
self.lifecycleState = .shuttingDown(unclean: unclean)
}
return .init(
request: requestAction,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,12 @@ extension HTTPConnectionPool {
case none
}

enum LifecycleState: Equatable {
case running
case shuttingDown(unclean: Bool)
case shutDown
}

enum HTTPVersionState {
case http1(HTTP1StateMachine)
case http2(HTTP2StateMachine)
Expand All @@ -88,7 +94,6 @@ extension HTTPConnectionPool {
}

var state: HTTPVersionState
var isShuttingDown: Bool = false

let idGenerator: Connection.ID.Generator
let maximumConcurrentHTTP1Connections: Int
Expand All @@ -98,7 +103,8 @@ extension HTTPConnectionPool {
self.idGenerator = idGenerator
let http1State = HTTP1StateMachine(
idGenerator: idGenerator,
maximumConcurrentConnections: maximumConcurrentHTTP1Connections
maximumConcurrentConnections: maximumConcurrentHTTP1Connections,
lifecycleState: .running
)
self.state = .http1(http1State)
}
Expand All @@ -121,7 +127,8 @@ extension HTTPConnectionPool {
case .http2(let http2StateMachine):
var http1StateMachine = HTTP1StateMachine(
idGenerator: self.idGenerator,
maximumConcurrentConnections: self.maximumConcurrentHTTP1Connections
maximumConcurrentConnections: self.maximumConcurrentHTTP1Connections,
lifecycleState: http2StateMachine.lifecycleState
)

let newConnectionAction = http1StateMachine.migrateFromHTTP2(
Expand All @@ -140,7 +147,8 @@ extension HTTPConnectionPool {
case .http1(let http1StateMachine):

var http2StateMachine = HTTP2StateMachine(
idGenerator: self.idGenerator
idGenerator: self.idGenerator,
lifecycleState: http1StateMachine.lifecycleState
)
let migrationAction = http2StateMachine.migrateFromHTTP1(
http1Connections: http1StateMachine.connections,
Expand Down Expand Up @@ -263,10 +271,6 @@ extension HTTPConnectionPool {
}

mutating func shutdown() -> Action {
precondition(!self.isShuttingDown, "Shutdown must only be called once")

self.isShuttingDown = true

return self.state.modify(http1: { http1 in
http1.shutdown()
}, http2: { http2 in
Expand Down
5 changes: 2 additions & 3 deletions Tests/AsyncHTTPClientTests/AsyncAwaitEndToEndTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -322,7 +322,7 @@ final class AsyncAwaitEndToEndTests: XCTestCase {
#if compiler(>=5.5) && canImport(_Concurrency)
guard #available(macOS 12.0, iOS 15.0, watchOS 8.0, tvOS 15.0, *) else { return }
XCTAsyncTest(timeout: 5) {
let bin = HTTPBin()
let bin = HTTPBin(.http2(compress: false))
defer { XCTAssertNoThrow(try bin.shutdown()) }
let client = makeDefaultHTTPClient()
defer { XCTAssertNoThrow(try client.syncShutdown()) }
Expand Down Expand Up @@ -368,7 +368,7 @@ final class AsyncAwaitEndToEndTests: XCTestCase {
#if compiler(>=5.5) && canImport(_Concurrency)
guard #available(macOS 12.0, iOS 15.0, watchOS 8.0, tvOS 15.0, *) else { return }
XCTAsyncTest(timeout: 5) {
let bin = HTTPBin()
let bin = HTTPBin(.http2(compress: false))
defer { XCTAssertNoThrow(try bin.shutdown()) }
let client = makeDefaultHTTPClient()
defer { XCTAssertNoThrow(try client.syncShutdown()) }
Expand All @@ -381,7 +381,6 @@ final class AsyncAwaitEndToEndTests: XCTestCase {
await XCTAssertThrowsError(try await task.value) {
XCTAssertEqual($0 as? HTTPClientError, HTTPClientError.deadlineExceeded)
}
print("done")
}
#endif
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,10 @@ extension HTTPConnectionPool_HTTP2StateMachineTests {
("testGoAwayWithLeasedStream", testGoAwayWithLeasedStream),
("testGoAwayWithPendingRequestsStartsNewConnection", testGoAwayWithPendingRequestsStartsNewConnection),
("testMigrationFromHTTP1ToHTTP2", testMigrationFromHTTP1ToHTTP2),
("testMigrationFromHTTP1ToHTTP2WhileShuttingDown", testMigrationFromHTTP1ToHTTP2WhileShuttingDown),
("testMigrationFromHTTP1ToHTTP2WithAlreadyStartedHTTP1Connections", testMigrationFromHTTP1ToHTTP2WithAlreadyStartedHTTP1Connections),
("testHTTP2toHTTP1Migration", testHTTP2toHTTP1Migration),
("testHTTP2toHTTP1MigrationDuringShutdown", testHTTP2toHTTP1MigrationDuringShutdown),
("testConnectionIsImmediatelyCreatedAfterBackoffTimerFires", testConnectionIsImmediatelyCreatedAfterBackoffTimerFires),
("testMaxConcurrentStreamsIsRespected", testMaxConcurrentStreamsIsRespected),
("testEventsAfterConnectionIsClosed", testEventsAfterConnectionIsClosed),
Expand Down
Loading