-
Notifications
You must be signed in to change notification settings - Fork 122
/
Copy pathHTTPRequestStateMachine.swift
790 lines (671 loc) · 33.4 KB
/
HTTPRequestStateMachine.swift
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
//===----------------------------------------------------------------------===//
//
// This source file is part of the AsyncHTTPClient open source project
//
// Copyright (c) 2021 Apple Inc. and the AsyncHTTPClient project authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
// See CONTRIBUTORS.txt for the list of AsyncHTTPClient project authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//
import NIOCore
import NIOHTTP1
import NIOSSL
struct HTTPRequestStateMachine {
/// The top-level state of a single request/response exchange.
fileprivate enum State {
/// The initial state machine state. `startRequest()` transitions to:
/// - `.waitForChannelToBecomeWritable` (if the Channel is not writable)
/// - `.running(.streaming, .waitingForHead)` (if the Channel is writable and if a request body is expected)
/// - `.running(.endSent, .waitingForHead)` (if the Channel is writable and no request body is expected)
///
/// `requestCancelled()` and `channelInactive()` transition to `.failed` instead.
case initialized
/// Waiting for the channel to be writable. The request head and framing metadata are stored
/// until they can be sent. Valid transitions are:
/// - `.running(.streaming, .waitingForHead)` (once the Channel is writable again and if a request body is expected)
/// - `.running(.endSent, .waitingForHead)` (once the Channel is writable again and no request body is expected)
/// - `.failed` (if a connection error occurred or the request was cancelled)
case waitForChannelToBecomeWritable(HTTPRequestHead, RequestFramingMetadata)
/// A request is on the wire. The request and response directions are tracked independently
/// in the two associated sub states. Valid transitions are:
/// - `.finished`
/// - `.failed`
case running(RequestState, ResponseState)
/// The request has completed successfully
case finished
/// The request has failed. The first error is stored; later events do not overwrite it.
case failed(Error)
/// A transient placeholder without associated data. It is only set by
/// `avoidingStateMachineCoW` while the payload of the previous state is being mutated and
/// must have been replaced by the time that helper returns. Observing it anywhere else is a
/// programming error.
case modifying
}
/// A sub state for a running request. More specifically, it tracks the progress of sending the
/// request body.
fileprivate enum RequestState {
/// A sub state for sending a request body. Stores whether a producer should produce more
/// bytes or should pause.
enum ProducerControlState: String {
/// The request body producer should produce more body bytes. The channel is writable.
case producing
/// The request body producer should pause producing more bytes. This is the case if the
/// channel is not writable, or if a response head with a status >= 300 was received while
/// the body was still being streamed.
case paused
}
/// The request is streaming its request body. `expectedBodyLength` has a value, if the request header contained
/// a `"content-length"` header field. If the request header contained a `"transfer-encoding" = "chunked"`
/// header field, the `expectedBodyLength` is `nil`. `sentBodyBytes` counts the body bytes that
/// have been handed to the channel so far.
case streaming(expectedBodyLength: Int?, sentBodyBytes: Int, producer: ProducerControlState)
/// The request has sent its request body and end.
case endSent
}
/// A sub state for a running request. It tracks the progress of receiving the response.
fileprivate enum ResponseState {
/// A response head has not been received yet.
case waitingForHead
/// A response head has been received and we are ready to consume more data off the wire.
/// The received head is kept alongside the state of the response body stream.
case receivingBody(HTTPResponseHead, ResponseStreamState)
/// A response end has been received. We don't expect more bytes from the wire.
case endReceived
}
/// The instruction the state machine returns to its caller after every event. The state machine
/// itself performs no I/O; the caller is expected to execute the returned action.
enum Action {
/// An action to execute, when we consider a request "done".
enum FinalStreamAction {
/// Close the connection
case close
/// If the server has replied with a status below 300 before all data was sent, a request is considered succeeded,
/// as soon as we wrote the request end onto the wire.
case sendRequestEnd
/// Do nothing. This action is used, if the request failed before the request head was written onto the wire.
/// This might happen if the request is cancelled, or the request failed the soundness check.
case none
}
/// Send the request head. `startBody` is `true` if a request body is going to be streamed
/// after the head and `false` if the request has no body.
case sendRequestHead(HTTPRequestHead, startBody: Bool)
/// Send the given request body part.
case sendBodyPart(IOData)
/// Send the request end.
case sendRequestEnd
/// Ask the request body producer to stop producing more bytes.
case pauseRequestBodyStream
/// Ask the request body producer to continue producing bytes.
case resumeRequestBodyStream
/// Forward the response head. `pauseRequestBodyStream` is `true` if the request body was
/// still being produced when a head with a status >= 300 arrived.
case forwardResponseHead(HTTPResponseHead, pauseRequestBodyStream: Bool)
/// Forward the buffered response body parts.
case forwardResponseBodyParts(CircularBuffer<ByteBuffer>)
/// Fail the request with the given error and then execute the final stream action.
case failRequest(Error, FinalStreamAction)
/// Succeed the request, delivering the remaining buffered response body parts, and then
/// execute the final stream action.
case succeedRequest(FinalStreamAction, CircularBuffer<ByteBuffer>)
/// Pass the read event on, to get more data off the wire.
case read
/// Nothing to do right now.
case wait
}
/// The current state of the request. Every state machine starts out as `.initialized`.
private var state: State = .initialized
/// The most recent channel writability known to the state machine: the value passed at
/// creation, updated on every `writabilityChanged(writable:)` call.
private var isChannelWritable: Bool
/// Creates a state machine in the `.initialized` state.
///
/// - Parameter isChannelWritable: Whether the channel is writable at creation time.
init(isChannelWritable: Bool) {
self.isChannelWritable = isChannelWritable
}
/// Starts the request.
///
/// - Parameters:
///   - head: The request head to send.
///   - metadata: The framing metadata that tells whether (and how much of) a body follows.
/// - Returns: `.sendRequestHead` if the channel is writable. `.wait` if the channel is not
///   writable yet (the head is stored for later) or if the request has already failed.
mutating func startRequest(head: HTTPRequestHead, metadata: RequestFramingMetadata) -> Action {
    switch self.state {
    case .running, .finished, .waitForChannelToBecomeWritable, .modifying:
        preconditionFailure("`startRequest()` must be called first, and exactly once. Invalid state: \(self.state)")

    case .failed:
        // A request can already be marked as failed at this point: if it was cancelled before
        // it reached the channel handler. `willExecuteRequest` is invoked on
        // `HTTPExecutableRequest` before `startRequest`, and that call may loop back into this
        // state machine's cancel method.
        return .wait

    case .initialized:
        if self.isChannelWritable {
            return self.startSendingRequest(head: head, metadata: metadata)
        }
        // We can't write right now. Remember what to send once the channel becomes writable.
        self.state = .waitForChannelToBecomeWritable(head, metadata)
        return .wait
    }
}
/// Informs the state machine about a change of the channel's writability.
///
/// - Parameter writable: The new writability of the channel.
/// - Returns: The action produced by the writable or the not-writable handling respectively.
mutating func writabilityChanged(writable: Bool) -> Action {
    guard writable else {
        return self.channelIsNotWritable()
    }
    return self.channelIsWritable()
}
/// Handles the channel becoming writable.
///
/// Records the new writability and then either starts a request that was waiting for
/// writability, resumes a paused request body stream, or does nothing.
private mutating func channelIsWritable() -> Action {
    self.isChannelWritable = true

    switch self.state {
    case .modifying:
        preconditionFailure("Invalid state: \(self.state)")

    case .waitForChannelToBecomeWritable(let pendingHead, let pendingMetadata):
        // The request was only waiting for this event. Put it on the wire now.
        return self.startSendingRequest(head: pendingHead, metadata: pendingMetadata)

    case .running(.streaming(_, _, producer: .paused), .receivingBody(let responseHead, _))
        where responseHead.status.code >= 300:
        // The remote has already answered with a status >= 300 and thereby signaled that it is
        // not interested in the remaining request body. We keep the producer paused to save
        // bandwidth. This case must be checked before the general `.paused` case below.
        return .wait

    case .running(.streaming(let expectedLength, let sentBytes, producer: .paused), let response):
        self.state = .running(
            .streaming(expectedBodyLength: expectedLength, sentBodyBytes: sentBytes, producer: .producing),
            response
        )
        return .resumeRequestBodyStream

    case .initialized,
         .running(.streaming(_, _, producer: .producing), _),
         .running(.endSent, _),
         .finished,
         .failed:
        // Nothing is blocked on writability in these states.
        return .wait
    }
}
/// Handles the channel becoming not writable.
///
/// Records the new writability and pauses the request body stream if it is currently producing.
private mutating func channelIsNotWritable() -> Action {
    self.isChannelWritable = false

    switch self.state {
    case .modifying:
        preconditionFailure("Invalid state: \(self.state)")

    case .running(.streaming(let expectedLength, let sentBytes, producer: .producing), let response):
        self.state = .running(
            .streaming(expectedBodyLength: expectedLength, sentBodyBytes: sentBytes, producer: .paused),
            response
        )
        return .pauseRequestBodyStream

    case .initialized,
         .waitForChannelToBecomeWritable,
         .running(.streaming(_, _, producer: .paused), _),
         .running(.endSent, _),
         .finished,
         .failed:
        // There is no producing body stream that would have to be paused.
        return .wait
    }
}
/// Informs the state machine about an error.
///
/// `NIOSSLError.uncleanShutdown` is offered to `handleNIOSSLUncleanShutdownError()` first. All
/// other errors, and unclean shutdowns for which that method returns `nil`, fail a request that
/// has not completed yet.
///
/// - Parameter error: The error that occurred.
/// - Returns: `.failRequest` if the request was failed by this error, otherwise `.wait` (or the
///   action chosen by the unclean shutdown handling).
mutating func errorHappened(_ error: Error) -> Action {
    if let sslError = error as? NIOSSLError, sslError == .uncleanShutdown {
        if let sslAction = self.handleNIOSSLUncleanShutdownError() {
            return sslAction
        }
    }

    switch self.state {
    case .modifying:
        preconditionFailure("Invalid state: \(self.state)")

    case .initialized:
        preconditionFailure("After the state machine has been initialized, start must be called immediately. Thus this state is unreachable")

    case .finished, .failed:
        // The request is already done. The error is ignored and a previous error is kept.
        return .wait

    case .waitForChannelToBecomeWritable:
        // The request failed before anything was sent onto the wire.
        self.state = .failed(error)
        return .failRequest(error, .none)

    case .running:
        self.state = .failed(error)
        return .failRequest(error, .close)
    }
}
/// Decides how a `NIOSSLError.uncleanShutdown` is treated in the current state.
///
/// - Returns: The action to execute, or `nil` if the unclean shutdown needs no special
///   treatment in the current state. In the `nil` case the caller, `errorHappened(_:)`, falls
///   through to its generic error handling.
private mutating func handleNIOSSLUncleanShutdownError() -> Action? {
switch self.state {
case .running(.streaming, .waitingForHead),
.running(.endSent, .waitingForHead):
// if we received a NIOSSL.uncleanShutdown before we got an answer we should handle
// this like a normal connection close. We will receive a call to channelInactive after
// this error.
return .wait
case .running(.streaming, .receivingBody(let responseHead, _)),
.running(.endSent, .receivingBody(let responseHead, _)):
// This code is only reachable for request and responses, which we expect to have a body.
// We depend on logic from the HTTPResponseDecoder here. The decoder will emit an
// HTTPResponsePart.end right after the HTTPResponsePart.head, for every request with a
// CONNECT or HEAD method and every response with a 1xx, 204 or 304 response status.
//
// For this reason we only need to check the "content-length" or "transfer-encoding"
// headers here to determine if we are potentially in an EOF terminated response.
if responseHead.headers.contains(name: "content-length") || responseHead.headers.contains(name: "transfer-encoding") {
// If we have already received the response head, the parser will ensure that we
// receive a complete response, if the content-length or transfer-encoding header
// was set. In this case we can ignore the NIOSSLError.uncleanShutdown. We will see
// a HTTPParserError very soon.
return .wait
}
// If the response is EOF terminated, we need to rely on a clean tls shutdown to be sure
// we have received all necessary bytes. For this reason we forward the uncleanShutdown
// error to the user.
self.state = .failed(NIOSSLError.uncleanShutdown)
return .failRequest(NIOSSLError.uncleanShutdown, .close)
// The bare `.running` below only matches what the cases above have not: a running request
// whose response end was already received.
case .waitForChannelToBecomeWritable, .running, .finished, .failed, .initialized, .modifying:
return nil
}
}
/// Informs the state machine that the request body producer has produced another body part.
///
/// - Parameter part: The body part to send.
/// - Returns: `.sendBodyPart` if the part shall be written, `.failRequest` with
///   `HTTPClientError.bodyLengthMismatch` if the part would exceed the expected body length, or
///   `.wait` if the part is dropped (response status >= 300, or the request is already
///   finished or failed).
mutating func requestStreamPartReceived(_ part: IOData) -> Action {
switch self.state {
case .initialized,
.waitForChannelToBecomeWritable,
.running(.endSent, _):
preconditionFailure("We must be in the request streaming phase, if we receive further body parts. Invalid state: \(self.state)")
// This case must precede the general `.streaming` case below, which would otherwise match.
case .running(.streaming(_, _, let producerState), .receivingBody(let head, _)) where head.status.code >= 300:
// If we have already received a response head with status >= 300, we won't send out any
// further request body bytes. Since the remote signaled with status >= 300, that it
// won't be interested. We expect that the producer has been informed to pause
// producing.
assert(producerState == .paused)
return .wait
case .running(.streaming(let expectedBodyLength, var sentBodyBytes, let producerState), let responseState):
// We don't check the producer state here:
//
// No matter if the `producerState` is either `.producing` or `.paused` any bytes we
// receive shall be forwarded to the Channel right away. As long as we have not received
// a response with status >= 300.
//
// More streamed data is accepted, even though the producer may have been asked to
// pause. The reason for this is as follows: There might be thread synchronization
// situations in which the producer might not have received the plea to pause yet.
// Sending more bytes than announced (via the expected body length) fails the request.
if let expected = expectedBodyLength, sentBodyBytes + part.readableBytes > expected {
let error = HTTPClientError.bodyLengthMismatch
self.state = .failed(error)
return .failRequest(error, .close)
}
sentBodyBytes += part.readableBytes
let requestState: RequestState = .streaming(
expectedBodyLength: expectedBodyLength,
sentBodyBytes: sentBodyBytes,
producer: producerState
)
self.state = .running(requestState, responseState)
return .sendBodyPart(part)
case .failed:
return .wait
case .finished:
// A request may be finished, before we have sent all parts. This might be the case if
// the server responded with an HTTP status code that is equal to or larger than 300
// (Redirection, Client Error or Server Error). In those cases we pause the request body
// stream as soon as we have received the response head and we succeed the request
// when the response end is received. This may mean, that we succeed a request, even though
// we have not sent all its body parts.
// We may still receive something here, because of potential race conditions with the
// producing thread.
return .wait
case .modifying:
preconditionFailure("Invalid state: \(self.state)")
}
}
/// Informs the state machine that the request body producer has finished.
///
/// - Returns: `.sendRequestEnd` if the response is still outstanding or in flight,
///   `.succeedRequest` if the response end was already received, `.failRequest` with
///   `HTTPClientError.bodyLengthMismatch` if fewer or more bytes than expected were sent, or
///   `.wait` if the request is already finished or failed.
mutating func requestStreamFinished() -> Action {
    switch self.state {
    case .modifying:
        preconditionFailure("Invalid state: \(self.state)")

    case .initialized,
         .waitForChannelToBecomeWritable,
         .running(.endSent, _):
        preconditionFailure("A request body stream end is only expected if we are in state request streaming. Invalid state: \(self.state)")

    case .failed, .finished:
        // `.finished` is reachable here: if the server answered with a status >= 300, the
        // request body stream is paused when the head arrives and the request is succeeded
        // once the response end arrives, potentially before all body parts were sent. Because
        // of races with the producing thread the producer may still signal its end afterwards.
        return .wait

    case .running(.streaming(let expectedBodyLength, let sentBodyBytes, _), let responseState):
        if case .receivingBody(let head, _) = responseState {
            assert(head.status.code < 300)
        }

        // The announced body length must have been sent exactly.
        if let expected = expectedBodyLength, expected != sentBodyBytes {
            let error = HTTPClientError.bodyLengthMismatch
            self.state = .failed(error)
            return .failRequest(error, .close)
        }

        switch responseState {
        case .waitingForHead, .receivingBody:
            // The response is not complete yet. Only the request direction is done.
            self.state = .running(.endSent, responseState)
            return .sendRequestEnd

        case .endReceived:
            // The response was complete before the request. Now both directions are done.
            self.state = .finished
            return .succeedRequest(.sendRequestEnd, .init())
        }
    }
}
/// Informs the state machine that the request was cancelled.
///
/// - Returns: `.failRequest` with `HTTPClientError.cancelled` if the request had not completed
///   yet, otherwise `.wait`.
mutating func requestCancelled() -> Action {
    switch self.state {
    case .modifying:
        preconditionFailure("Invalid state: \(self.state)")

    case .finished, .failed:
        // The request is done already. There is nothing left to cancel.
        return .wait

    case .running:
        let cancellation = HTTPClientError.cancelled
        self.state = .failed(cancellation)
        return .failRequest(cancellation, .close)

    case .initialized, .waitForChannelToBecomeWritable:
        let cancellation = HTTPClientError.cancelled
        self.state = .failed(cancellation)
        // The semantics differ between HTTP/1 and HTTP/2 here. In HTTP/1 we don't want to close
        // the connection if nothing has been sent yet, so that the connection can be reused
        // for another request. In HTTP/2 the channel must be closed to ensure it is released
        // from the HTTP/2 multiplexer.
        return .failRequest(cancellation, .none)
    }
}
/// Informs the state machine that the channel became inactive.
///
/// - Returns: `.failRequest` with `HTTPClientError.remoteConnectionClosed` if the request had
///   not completed yet, otherwise `.wait`.
mutating func channelInactive() -> Action {
    switch self.state {
    case .modifying:
        preconditionFailure("Invalid state: \(self.state)")

    case .finished, .failed:
        // A finished request is not affected, and an earlier error must not be overwritten.
        return .wait

    case .initialized, .waitForChannelToBecomeWritable, .running:
        let connectionClosed = HTTPClientError.remoteConnectionClosed
        self.state = .failed(connectionClosed)
        return .failRequest(connectionClosed, .none)
    }
}
// MARK: - Response
/// Informs the state machine about a read event.
///
/// - Returns: `.read` whenever no response body is being streamed. While a response body is
///   being streamed the decision is delegated to the response stream state.
mutating func read() -> Action {
    switch self.state {
    case .modifying:
        preconditionFailure("Invalid state: \(self.state)")

    case .running(let request, .receivingBody(let responseHead, var bodyStream)):
        // A response body is being streamed, so the response stream state decides whether the
        // read is passed on right now.
        return self.avoidingStateMachineCoW { newState -> Action in
            let streamAction = bodyStream.read()
            newState = .running(request, .receivingBody(responseHead, bodyStream))
            return streamAction.toRequestAction()
        }

    case .initialized,
         .waitForChannelToBecomeWritable,
         .running(_, .waitingForHead),
         .running(_, .endReceived),
         .finished,
         .failed:
        // Outside of streaming a response body we always want to get more data.
        return .read
    }
}
/// Informs the state machine about a received response part and dispatches it to the handler
/// for the respective kind of part.
///
/// - Parameter part: The response part read from the channel.
/// - Returns: The action of the head, body part, or end handling respectively.
mutating func channelRead(_ part: HTTPClientResponsePart) -> Action {
    switch part {
    case .end:
        return self.receivedHTTPResponseEnd()
    case .body(let bodyPart):
        return self.receivedHTTPResponseBodyPart(bodyPart)
    case .head(let responseHead):
        return self.receivedHTTPResponseHead(responseHead)
    }
}
/// Informs the state machine that the current batch of reads is complete.
///
/// - Returns: `.forwardResponseBodyParts` if the response stream state hands out buffered body
///   parts, otherwise `.wait`.
mutating func channelReadComplete() -> Action {
    switch self.state {
    case .modifying:
        preconditionFailure("Invalid state: \(self.state)")

    case .running(let request, .receivingBody(let responseHead, var bodyStream)):
        // A response body is being streamed. Ask the response stream state whether there are
        // body parts to forward.
        return self.avoidingStateMachineCoW { newState -> Action in
            let flushed = bodyStream.channelReadComplete()
            newState = .running(request, .receivingBody(responseHead, bodyStream))
            guard let parts = flushed else {
                return .wait
            }
            return .forwardResponseBodyParts(parts)
        }

    case .initialized,
         .waitForChannelToBecomeWritable,
         .running(_, .waitingForHead),
         .running(_, .endReceived),
         .finished,
         .failed:
        // No response body is being streamed, so there is nothing to forward.
        return .wait
    }
}
/// Handles a received response head.
///
/// Informational (1xx) heads other than 101 are ignored. For every other head the response
/// state advances to `.receivingBody`. If the request body is still being produced and the
/// status is >= 300, the request body stream is paused, since the remote signaled that it is
/// not interested in the remaining body.
///
/// - Parameter head: The received response head.
/// - Returns: `.forwardResponseHead` if the head was accepted, `.wait` if it was ignored or the
///   request has already failed.
private mutating func receivedHTTPResponseHead(_ head: HTTPResponseHead) -> Action {
    guard head.status.code >= 200 || head.status == .switchingProtocols else {
        // We ignore any leading 1xx headers except for 101 (switching protocols). The
        // HTTP1ConnectionStateMachine ensures the connection close for 101 after the `.end` is
        // received.
        return .wait
    }

    switch self.state {
    case .initialized, .waitForChannelToBecomeWritable:
        preconditionFailure("How can we receive a response head before sending a request head ourselves. Invalid state: \(self.state)")

    case .running(.streaming(let expectedBodyLength, let sentBodyBytes, let producerState), .waitingForHead):
        // Only a stream that is currently producing needs to be paused explicitly. A stream
        // that is already paused stays paused: `channelIsWritable()` does not resume it while
        // the response status is >= 300.
        let mustPause = producerState == .producing && head.status.code >= 300
        self.state = .running(
            .streaming(
                expectedBodyLength: expectedBodyLength,
                sentBodyBytes: sentBodyBytes,
                producer: mustPause ? .paused : producerState
            ),
            .receivingBody(head, .init())
        )
        return .forwardResponseHead(head, pauseRequestBodyStream: mustPause)

    case .running(.endSent, .waitingForHead):
        self.state = .running(.endSent, .receivingBody(head, .init()))
        return .forwardResponseHead(head, pauseRequestBodyStream: false)

    case .running(_, .receivingBody), .running(_, .endReceived), .finished:
        preconditionFailure("How can we receive a response head, if another one was already received. Invalid state: \(self.state)")

    case .failed:
        return .wait

    case .modifying:
        preconditionFailure("Invalid state: \(self.state)")
    }
}
/// Handles a received response body part by handing it to the response stream state.
///
/// - Parameter body: The received response body bytes.
/// - Returns: Always `.wait`. Body parts are forwarded later through `channelReadComplete()`
///   (or with the response end), not from here.
mutating func receivedHTTPResponseBodyPart(_ body: ByteBuffer) -> Action {
    switch self.state {
    case .initialized, .waitForChannelToBecomeWritable:
        preconditionFailure("How can we receive a response body part before sending a request head ourselves. Invalid state: \(self.state)")

    case .running(_, .waitingForHead):
        preconditionFailure("How can we receive a response body, if we haven't received a head. Invalid state: \(self.state)")

    case .running(let requestState, .receivingBody(let head, var responseStreamState)):
        return self.avoidingStateMachineCoW { state -> Action in
            responseStreamState.receivedBodyPart(body)
            state = .running(requestState, .receivingBody(head, responseStreamState))
            return .wait
        }

    case .running(_, .endReceived), .finished:
        preconditionFailure("How can we receive a response body part, after the response end was received. Invalid state: \(self.state)")

    case .failed:
        // The request has failed already. Late body parts are dropped.
        return .wait

    case .modifying:
        preconditionFailure("Invalid state: \(self.state)")
    }
}
/// Handles the received response end.
///
/// What happens depends on the progress of the request body:
/// - Request end already sent: the request is succeeded.
/// - Request still streaming and status < 300: the remaining response body parts are forwarded
///   and the request continues until the request body is finished. If the response end was
///   signaled by a connection close, the request is failed instead.
/// - Request still streaming and status >= 300: the request is succeeded right away and the
///   connection is closed, since the request body will not be sent completely.
///
/// - Returns: The action to execute, `.wait` if the request has already failed.
private mutating func receivedHTTPResponseEnd() -> Action {
    switch self.state {
    case .initialized, .waitForChannelToBecomeWritable:
        preconditionFailure("How can we receive a response end before sending a request head ourselves. Invalid state: \(self.state)")

    case .running(_, .waitingForHead):
        preconditionFailure("How can we receive a response end, if we haven't received a head. Invalid state: \(self.state)")

    // This case must precede the general `.streaming` case below, which handles status >= 300.
    case .running(.streaming(let expectedBodyLength, let sentBodyBytes, let producerState), .receivingBody(let head, var responseStreamState))
        where head.status.code < 300:
        return self.avoidingStateMachineCoW { state -> Action in
            let (remainingBuffer, connectionAction) = responseStreamState.end()
            switch connectionAction {
            case .none:
                state = .running(
                    .streaming(expectedBodyLength: expectedBodyLength, sentBodyBytes: sentBodyBytes, producer: producerState),
                    .endReceived
                )
                return .forwardResponseBodyParts(remainingBuffer)
            case .close:
                // If we receive a `.close` as a connectionAction from the responseStreamState
                // this means, that the response end was signaled by a connection close. Since
                // the request is still uploading, we will not be able to finish the upload. For
                // this reason we can fail the request here.
                state = .failed(HTTPClientError.remoteConnectionClosed)
                return .failRequest(HTTPClientError.remoteConnectionClosed, .close)
            }
        }

    case .running(.streaming(_, _, let producerState), .receivingBody(let head, var responseStreamState)):
        assert(head.status.code >= 300)
        assert(producerState == .paused, "Expected to have paused the request body stream, when the head was received. Invalid state: \(self.state)")
        return self.avoidingStateMachineCoW { state -> Action in
            // We can ignore the connectionAction from the responseStreamState, since the
            // connection should be closed anyway.
            let (remainingBuffer, _) = responseStreamState.end()
            state = .finished
            return .succeedRequest(.close, remainingBuffer)
        }

    case .running(.endSent, .receivingBody(_, var responseStreamState)):
        return self.avoidingStateMachineCoW { state -> Action in
            let (remainingBuffer, action) = responseStreamState.end()
            state = .finished
            switch action {
            case .none:
                return .succeedRequest(.none, remainingBuffer)
            case .close:
                return .succeedRequest(.close, remainingBuffer)
            }
        }

    case .running(_, .endReceived), .finished:
        preconditionFailure("How can we receive a response end, if another one was already received. Invalid state: \(self.state)")

    case .failed:
        return .wait

    case .modifying:
        preconditionFailure("Invalid state: \(self.state)")
    }
}
/// Informs the state machine that the consumer of the response asks for more body parts.
///
/// - Returns: While the response body is being streamed, the action chosen by the response
///   stream state. `.wait` if the response is complete or the request has failed.
mutating func demandMoreResponseBodyParts() -> Action {
    switch self.state {
    case .modifying:
        preconditionFailure("Invalid state: \(self.state)")

    case .initialized,
         .running(_, .waitingForHead),
         .waitForChannelToBecomeWritable:
        preconditionFailure("The response is expected to only ask for more data after the response head was forwarded")

    case .running(_, .endReceived),
         .finished,
         .failed:
        // No more body parts will arrive in these states.
        return .wait

    case .running(let request, .receivingBody(let responseHead, var bodyStream)):
        return self.avoidingStateMachineCoW { newState -> Action in
            let streamAction = bodyStream.demandMoreResponseBodyParts()
            newState = .running(request, .receivingBody(responseHead, bodyStream))
            return streamAction.toRequestAction()
        }
    }
}
/// Informs the state machine that the idle read timeout has fired.
///
/// - Returns: `.failRequest` with `HTTPClientError.readTimeout` if the request was completely
///   sent and the response is still outstanding or incomplete. `.wait` if the request is
///   already finished or failed.
mutating func idleReadTimeoutTriggered() -> Action {
    switch self.state {
    case .modifying:
        preconditionFailure("Invalid state: \(self.state)")

    case .finished, .failed:
        // The request is done. A late timeout has no effect.
        return .wait

    case .running(.endSent, .endReceived):
        preconditionFailure("Invalid state. This state should be: .finished")

    case .initialized,
         .waitForChannelToBecomeWritable,
         .running(.streaming, _):
        preconditionFailure("We only schedule idle read timeouts after we have sent the complete request. Invalid state: \(self.state)")

    case .running(.endSent, .waitingForHead), .running(.endSent, .receivingBody):
        let timeout = HTTPClientError.readTimeout
        self.state = .failed(timeout)
        return .failRequest(timeout, .close)
    }
}
/// Moves the state machine into `.running` and produces the action that sends the request head.
///
/// - Parameters:
///   - head: The request head to send.
///   - metadata: The framing metadata. Its `body` decides whether a request body follows.
/// - Returns: `.sendRequestHead`, with `startBody` set to `false` only for a fixed size of zero.
private mutating func startSendingRequest(head: HTTPRequestHead, metadata: RequestFramingMetadata) -> Action {
    let expectedBodyLength: Int?

    switch metadata.body {
    case .fixedSize(0):
        // There is no body. The request is complete as soon as the head is written.
        self.state = .running(.endSent, .waitingForHead)
        return .sendRequestHead(head, startBody: false)

    case .fixedSize(let length):
        // The length is not zero (see the case above), so there is a body to send.
        expectedBodyLength = length

    case .stream:
        // The body length is not known up front.
        expectedBodyLength = nil
    }

    self.state = .running(
        .streaming(expectedBodyLength: expectedBodyLength, sentBodyBytes: 0, producer: .producing),
        .waitingForHead
    )
    return .sendRequestHead(head, startBody: true)
}
}
extension HTTPRequestStateMachine {
    /// Runs `body` with mutable access to the state, after the state was temporarily replaced
    /// by `.modifying`.
    ///
    /// Why this exists: all data of the state machine lives in associated values of
    /// enumeration cases. Modifying such data triggers copy on write for heap-allocated data,
    /// which would otherwise happen for _every operation on the state machine_. To avoid this,
    /// the state is set to a case without associated data before `body` runs.
    ///
    /// Every caller must assign a proper state inside `body`. An assertion verifies that the
    /// state machine is not left in `.modifying` when this function returns.
    ///
    /// The function is generic and takes a closure, so it has to be force-inlined at all call
    /// sites, which is not ideal.
    ///
    /// - Parameter body: The closure that must replace the state it is handed.
    /// - Returns: Whatever `body` returns.
    @inline(__always)
    private mutating func avoidingStateMachineCoW<ReturnType>(_ body: (inout State) -> ReturnType) -> ReturnType {
        self.state = .modifying
        defer {
            assert(!self.isModifying)
        }
        return body(&self.state)
    }

    /// `true` if, and only if, the state is the transient `.modifying` placeholder.
    private var isModifying: Bool {
        guard case .modifying = self.state else {
            return false
        }
        return true
    }
}
extension HTTPRequestStateMachine.ResponseStreamState.Action {
    /// Translates an action of the response stream state into the equally named action of the
    /// request state machine.
    func toRequestAction() -> HTTPRequestStateMachine.Action {
        switch self {
        case .wait:
            return .wait
        case .read:
            return .read
        }
    }
}
// MARK: - CustomStringConvertible

extension HTTPRequestStateMachine: CustomStringConvertible {
    /// A textual representation of the current state together with the known channel
    /// writability. The payload of `.waitForChannelToBecomeWritable` is not included.
    var description: String {
        switch self.state {
        case .modifying:
            return "HTTPRequestStateMachine(.modifying, isWritable: \(self.isChannelWritable))"
        case .failed(let error):
            return "HTTPRequestStateMachine(.failed(\(error)), isWritable: \(self.isChannelWritable))"
        case .finished:
            return "HTTPRequestStateMachine(.finished, isWritable: \(self.isChannelWritable))"
        case .running(let requestState, let responseState):
            return "HTTPRequestStateMachine(.running(request: \(requestState), response: \(responseState)), isWritable: \(self.isChannelWritable))"
        case .waitForChannelToBecomeWritable:
            return "HTTPRequestStateMachine(.waitForChannelToBecomeWritable, isWritable: \(self.isChannelWritable))"
        case .initialized:
            return "HTTPRequestStateMachine(.initialized, isWritable: \(self.isChannelWritable))"
        }
    }
}
extension HTTPRequestStateMachine.RequestState: CustomStringConvertible {
    /// A textual representation of the request sub state.
    ///
    /// For `.streaming` it shows the expected body length (`-` if the length is unknown), the
    /// number of body bytes sent so far, and the producer state.
    var description: String {
        switch self {
        case .streaming(expectedBodyLength: let expected, let sent, producer: let producer):
            // An unknown body length is rendered as "-".
            let expectedDescription = expected.map { String($0) } ?? "-"
            return ".streaming(expected: \(expectedDescription), sent: \(sent), producer: \(producer))"
        case .endSent:
            return ".endSent"
        }
    }
}
extension HTTPRequestStateMachine.ResponseState: CustomStringConvertible {
    /// A textual representation of the response sub state. For `.receivingBody` it includes the
    /// response head and the response stream state.
    var description: String {
        switch self {
        case .endReceived:
            return ".endReceived"
        case .receivingBody(let head, let streamState):
            return ".receivingBody(\(head), streamState: \(streamState))"
        case .waitingForHead:
            return ".waitingForHead"
        }
    }
}