Skip to content

Commit 0a2004b

Browse files
authored
[HTTP1] Tolerate immediate write errors (swift-server#579)
Same fix for HTTP/1 that landed for HTTP/2 in swift-server#558. ### Motivation `HTTP1ClientChannelHandler` currently does not tolerate immediate write errors. ### Changes Make `HTTP1ClientChannelHandler` resilient to failing writes. ### Result Less crashes in AHC HTTP/1.
1 parent d2da15c commit 0a2004b

File tree

3 files changed

+108
-19
lines changed

3 files changed

+108
-19
lines changed

Sources/AsyncHTTPClient/ConnectionPool/HTTP1/HTTP1ClientChannelHandler.swift

+44-19
Original file line numberDiff line numberDiff line change
@@ -183,23 +183,7 @@ final class HTTP1ClientChannelHandler: ChannelDuplexHandler {
183183
private func run(_ action: HTTP1ConnectionStateMachine.Action, context: ChannelHandlerContext) {
184184
switch action {
185185
case .sendRequestHead(let head, startBody: let startBody):
186-
if startBody {
187-
context.write(self.wrapOutboundOut(.head(head)), promise: nil)
188-
context.flush()
189-
190-
self.request!.requestHeadSent()
191-
self.request!.resumeRequestBodyStream()
192-
} else {
193-
context.write(self.wrapOutboundOut(.head(head)), promise: nil)
194-
context.write(self.wrapOutboundOut(.end(nil)), promise: nil)
195-
context.flush()
196-
197-
self.request!.requestHeadSent()
198-
199-
if let timeoutAction = self.idleReadTimeoutStateMachine?.requestEndSent() {
200-
self.runTimeoutAction(timeoutAction, context: context)
201-
}
202-
}
186+
self.sendRequestHead(head, startBody: startBody, context: context)
203187

204188
case .sendBodyPart(let part):
205189
context.writeAndFlush(self.wrapOutboundOut(.body(part)), promise: nil)
@@ -212,9 +196,13 @@ final class HTTP1ClientChannelHandler: ChannelDuplexHandler {
212196
}
213197

214198
case .pauseRequestBodyStream:
199+
// We can force unwrap the request here, as we have just validated in the state machine,
200+
// that the request is neither failed nor finished yet
215201
self.request!.pauseRequestBodyStream()
216202

217203
case .resumeRequestBodyStream:
204+
// We can force unwrap the request here, as we have just validated in the state machine,
205+
// that the request is neither failed nor finished yet
218206
self.request!.resumeRequestBodyStream()
219207

220208
case .fireChannelActive:
@@ -239,15 +227,25 @@ final class HTTP1ClientChannelHandler: ChannelDuplexHandler {
239227
break
240228

241229
case .forwardResponseHead(let head, let pauseRequestBodyStream):
230+
// We can force unwrap the request here, as we have just validated in the state machine,
231+
// that the request is neither failed nor finished yet
242232
self.request!.receiveResponseHead(head)
243-
if pauseRequestBodyStream {
244-
self.request!.pauseRequestBodyStream()
233+
if pauseRequestBodyStream, let request = self.request {
234+
// The above response head forward might lead the request to mark itself as
235+
// cancelled, which in turn might pop the request of the handler. For this reason we
236+
// must check if the request is still present here.
237+
request.pauseRequestBodyStream()
245238
}
246239

247240
case .forwardResponseBodyParts(let buffer):
241+
// We can force unwrap the request here, as we have just validated in the state machine,
242+
// that the request is neither failed nor finished yet
248243
self.request!.receiveResponseBodyParts(buffer)
249244

250245
case .succeedRequest(let finalAction, let buffer):
246+
// We can force unwrap the request here, as we have just validated in the state machine,
247+
// that the request is neither failed nor finished yet
248+
251249
// The order here is very important...
252250
// We first nil our own task property! `taskCompleted` will potentially lead to
253251
// situations in which we get a new request right away. We should finish the task
@@ -293,6 +291,33 @@ final class HTTP1ClientChannelHandler: ChannelDuplexHandler {
293291
}
294292
}
295293

294+
private func sendRequestHead(_ head: HTTPRequestHead, startBody: Bool, context: ChannelHandlerContext) {
295+
if startBody {
296+
context.writeAndFlush(self.wrapOutboundOut(.head(head)), promise: nil)
297+
298+
// The above write might trigger an error, which may lead to a call to `errorCaught`,
299+
// which in turn, may fail the request and pop it from the handler. For this reason
300+
// we must check if the request is still present here.
301+
guard let request = self.request else { return }
302+
request.requestHeadSent()
303+
request.resumeRequestBodyStream()
304+
} else {
305+
context.write(self.wrapOutboundOut(.head(head)), promise: nil)
306+
context.write(self.wrapOutboundOut(.end(nil)), promise: nil)
307+
context.flush()
308+
309+
// The above write might trigger an error, which may lead to a call to `errorCaught`,
310+
// which in turn, may fail the request and pop it from the handler. For this reason
311+
// we must check if the request is still present here.
312+
guard let request = self.request else { return }
313+
request.requestHeadSent()
314+
315+
if let timeoutAction = self.idleReadTimeoutStateMachine?.requestEndSent() {
316+
self.runTimeoutAction(timeoutAction, context: context)
317+
}
318+
}
319+
}
320+
296321
private func runTimeoutAction(_ action: IdleReadStateMachine.Action, context: ChannelHandlerContext) {
297322
switch action {
298323
case .startIdleReadTimeoutTimer(let timeAmount):

Tests/AsyncHTTPClientTests/HTTP1ClientChannelHandlerTests+XCTest.swift

+1
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ extension HTTP1ClientChannelHandlerTests {
3131
("testIdleReadTimeout", testIdleReadTimeout),
3232
("testIdleReadTimeoutIsCanceledIfRequestIsCanceled", testIdleReadTimeoutIsCanceledIfRequestIsCanceled),
3333
("testFailHTTPRequestWithContentLengthBecauseOfChannelInactiveWaitingForDemand", testFailHTTPRequestWithContentLengthBecauseOfChannelInactiveWaitingForDemand),
34+
("testWriteHTTPHeadFails", testWriteHTTPHeadFails),
3435
]
3536
}
3637
}

Tests/AsyncHTTPClientTests/HTTP1ClientChannelHandlerTests.swift

+63
Original file line numberDiff line numberDiff line change
@@ -394,6 +394,69 @@ class HTTP1ClientChannelHandlerTests: XCTestCase {
394394
XCTAssertEqual($0 as? HTTPClientError, .remoteConnectionClosed)
395395
}
396396
}
397+
398+
func testWriteHTTPHeadFails() {
399+
struct WriteError: Error, Equatable {}
400+
401+
class FailWriteHandler: ChannelOutboundHandler {
402+
typealias OutboundIn = HTTPClientRequestPart
403+
typealias OutboundOut = HTTPClientRequestPart
404+
405+
func write(context: ChannelHandlerContext, data: NIOAny, promise: EventLoopPromise<Void>?) {
406+
let error = WriteError()
407+
promise?.fail(error)
408+
context.fireErrorCaught(error)
409+
}
410+
}
411+
412+
let bodies: [HTTPClient.Body?] = [
413+
.none,
414+
.some(.byteBuffer(ByteBuffer(string: "hello world"))),
415+
]
416+
417+
for body in bodies {
418+
let embedded = EmbeddedChannel()
419+
var maybeTestUtils: HTTP1TestTools?
420+
XCTAssertNoThrow(maybeTestUtils = try embedded.setupHTTP1Connection())
421+
guard let testUtils = maybeTestUtils else { return XCTFail("Expected connection setup works") }
422+
423+
XCTAssertNoThrow(try embedded.pipeline.syncOperations.addHandler(FailWriteHandler(), position: .after(testUtils.readEventHandler)))
424+
425+
let logger = Logger(label: "test")
426+
427+
var maybeRequest: HTTPClient.Request?
428+
XCTAssertNoThrow(maybeRequest = try HTTPClient.Request(url: "http://localhost/", method: .POST, body: body))
429+
guard let request = maybeRequest else { return XCTFail("Expected to be able to create a request") }
430+
431+
let delegate = ResponseAccumulator(request: request)
432+
var maybeRequestBag: RequestBag<ResponseAccumulator>?
433+
XCTAssertNoThrow(maybeRequestBag = try RequestBag(
434+
request: request,
435+
eventLoopPreference: .delegate(on: embedded.eventLoop),
436+
task: .init(eventLoop: embedded.eventLoop, logger: logger),
437+
redirectHandler: nil,
438+
connectionDeadline: .now() + .seconds(30),
439+
requestOptions: .forTests(idleReadTimeout: .milliseconds(200)),
440+
delegate: delegate
441+
))
442+
guard let requestBag = maybeRequestBag else { return XCTFail("Expected to be able to create a request bag") }
443+
444+
embedded.isWritable = false
445+
XCTAssertNoThrow(try embedded.connect(to: .makeAddressResolvingHost("localhost", port: 0)).wait())
446+
embedded.write(requestBag, promise: nil)
447+
448+
// the handler only writes once the channel is writable
449+
XCTAssertEqual(try embedded.readOutbound(as: HTTPClientRequestPart.self), .none)
450+
embedded.isWritable = true
451+
embedded.pipeline.fireChannelWritabilityChanged()
452+
453+
XCTAssertThrowsError(try requestBag.task.futureResult.wait()) {
454+
XCTAssertEqual($0 as? WriteError, WriteError())
455+
}
456+
457+
XCTAssertEqual(embedded.isActive, false)
458+
}
459+
}
397460
}
398461

399462
class TestBackpressureWriter {

0 commit comments

Comments
 (0)