Skip to content

Commit 951033e

Browse files
committed
refactor actor into class with a lock
1 parent a288b60 commit 951033e

File tree

2 files changed

+132
-123
lines changed

2 files changed

+132
-123
lines changed

Sources/AsyncHTTPClient/AsyncAwait/AsyncRequestBag.swift

+131-122
Original file line numberDiff line numberDiff line change
@@ -13,12 +13,13 @@
1313
//===----------------------------------------------------------------------===//
1414

1515
import Logging
16-
import NIO
16+
import NIOCore
17+
import NIOConcurrencyHelpers
1718
import NIOSSL
1819
import NIOHTTP1
1920

2021
@available(macOS 12.0, iOS 15.0, watchOS 8.0, tvOS 15.0, *)
21-
actor AsyncRequestBag {
22+
class AsyncRequestBag {
2223
// TODO: We should drop the request after sending to free up resource ASAP
2324
let request: HTTPClientRequest
2425

@@ -32,8 +33,8 @@ actor AsyncRequestBag {
3233
let preferredEventLoop: EventLoop
3334
let poolKey: ConnectionPool.Key
3435

36+
private let stateLock = Lock()
3537
private var state: StateMachine = .init()
36-
private var isCancelled = false
3738

3839
init(
3940
request: HTTPClientRequest,
@@ -54,12 +55,6 @@ actor AsyncRequestBag {
5455
self.requestFramingMetadata = validatedRequest.requestFramingMetadata
5556
}
5657

57-
nonisolated func cancel() {
58-
Task.detached {
59-
await self.cancel0()
60-
}
61-
}
62-
6358
func result() async throws -> HTTPClientResponse {
6459
try await withUnsafeThrowingContinuation { continuation in
6560
self.state.registerContinuation(continuation)
@@ -68,48 +63,63 @@ actor AsyncRequestBag {
6863

6964
// MARK: Scheduled request
7065

71-
private func cancel0() {
72-
self.isCancelled = true
73-
self.fail0(HTTPClientError.cancelled)
66+
func cancel() {
67+
self.fail(HTTPClientError.cancelled)
7468
}
7569

76-
private func requestWasQueued0(_ scheduler: HTTPRequestScheduler) {
77-
self.state.requestWasQueued(scheduler)
70+
func requestWasQueued(_ scheduler: HTTPRequestScheduler) {
71+
self.stateLock.withLock {
72+
self.state.requestWasQueued(scheduler)
73+
}
7874
}
7975

80-
private func fail0(_ error: Error) {
81-
switch self.state.fail(error) {
76+
func fail(_ error: Error) {
77+
let action = self.stateLock.withLock {
78+
self.state.fail(error)
79+
}
80+
81+
switch action {
8282
case .none:
8383
break
8484

85-
case .failResponseStream(let continuation, let error, let executor):
86-
continuation.resume(throwing: error)
87-
executor.cancelRequest(self)
88-
8985
case .failContinuation(let continuation, let error, let scheduler, let executor):
86+
// TODO: better name needed. This is actually fail response before head received
9087
continuation.resume(throwing: error)
91-
scheduler?.cancelRequest(self)
88+
scheduler?.cancelRequest(self) // NOTE: scheduler and executor are exclusive here
9289
executor?.cancelRequest(self)
90+
91+
case .failResponseStream(let continuation, let error, let executor):
92+
continuation.resume(throwing: error)
93+
executor.cancelRequest(self)
9394
}
9495
}
9596

9697
// MARK: Scheduled request
9798

98-
private func willExecuteRequest0(_ executor: HTTPRequestExecutor) {
99-
if !self.state.willExecuteRequest(executor) {
99+
func willExecuteRequest(_ executor: HTTPRequestExecutor) {
100+
let cancelExecutor = self.stateLock.withLock {
101+
// TODO: willExecuteRequest should return an action enum. We dislike magic bools
102+
!self.state.willExecuteRequest(executor)
103+
}
104+
105+
if cancelExecutor {
100106
return executor.cancelRequest(self)
101107
}
102108
}
103109

104-
private func resumeRequestBodyStream0() async {
105-
switch self.state.resumeRequestBodyStream() {
110+
func resumeRequestBodyStream() {
111+
let action = self.stateLock.withLock {
112+
self.state.resumeRequestBodyStream()
113+
}
114+
115+
switch action {
106116
case .none:
107117
break
108118
case .resumeStream(let allocator):
109119
switch self.request.body?.mode {
110120
case .asyncSequence(_, let next):
111121
// it is safe to call this async here. it dispatches...
112-
await self.writeRequestStream(allocator, next: next)
122+
self.continueRequestBodyStream(allocator, next: next)
113123

114124
case .byteBuffer(let byteBuffer):
115125
guard case .write(let part, let executor, true) = self.state.producedNextRequestPart(.byteBuffer(byteBuffer)) else {
@@ -153,13 +163,73 @@ actor AsyncRequestBag {
153163
}
154164
}
155165
}
166+
167+
enum AfterNextBodyPartAction {
168+
case `continue`
169+
case pause
170+
}
171+
172+
private func requestBodyStreamNextPart(_ part: IOData) -> AfterNextBodyPartAction {
173+
let writeAction = self.stateLock.withLock {
174+
self.state.producedNextRequestPart(part)
175+
}
176+
177+
switch writeAction {
178+
case .write(let part, let executor, let continueAfter):
179+
executor.writeRequestBodyPart(part, request: self)
180+
if continueAfter {
181+
return .continue
182+
} else {
183+
return .pause
184+
}
185+
186+
case .ignore:
187+
// we only ignore reads, if the request has failed anyway. we should leave
188+
// the reader loop
189+
return .pause
190+
}
191+
}
192+
193+
private func requestBodyStreamFinished() {
194+
let finishAction = self.stateLock.withLock {
195+
self.state.finishRequestBodyStream()
196+
}
197+
// no more data to produce
198+
switch finishAction {
199+
case .none:
200+
break
201+
case .forwardStreamFinished(let executor):
202+
executor.finishRequestBodyStream(self)
203+
}
204+
return
205+
}
206+
207+
private func requestBodyStreamFailed(_ error: Error) {
208+
let failAction = self.stateLock.withLock {
209+
self.state.failedToProduceNextRequestPart(error)
210+
}
211+
212+
switch failAction {
213+
case .none:
214+
break
215+
case .informRequestAboutFailure(let error, cancelExecutor: let executor, let continuation):
216+
executor.cancelRequest(self)
217+
self.fail(error)
218+
continuation?.resume(throwing: error)
219+
}
220+
}
156221

157-
private func pauseRequestBodyStream0() {
158-
self.state.pauseRequestBodyStream()
222+
func pauseRequestBodyStream() {
223+
self.stateLock.withLock {
224+
self.state.pauseRequestBodyStream()
225+
}
159226
}
160227

161-
private func receiveResponseHead0(_ head: HTTPResponseHead) {
162-
switch self.state.receiveResponseHead(head) {
228+
func receiveResponseHead(_ head: HTTPResponseHead) {
229+
let action = self.stateLock.withLock {
230+
self.state.receiveResponseHead(head)
231+
}
232+
switch action {
163233
case .none:
164234
break
165235
case .succeedResponseHead(let head, let continuation):
@@ -173,17 +243,23 @@ actor AsyncRequestBag {
173243
}
174244
}
175245

176-
private func receiveResponseBodyParts0(_ buffer: CircularBuffer<ByteBuffer>) {
177-
switch self.state.receiveResponseBodyParts(buffer) {
246+
func receiveResponseBodyParts(_ buffer: CircularBuffer<ByteBuffer>) {
247+
let action = self.stateLock.withLock {
248+
self.state.receiveResponseBodyParts(buffer)
249+
}
250+
switch action {
178251
case .none:
179252
break
180253
case .succeedContinuation(let continuation, let bytes):
181254
continuation.resume(returning: bytes)
182255
}
183256
}
184257

185-
private func succeedRequest0(_ buffer: CircularBuffer<ByteBuffer>?) {
186-
switch self.state.succeedRequest(buffer) {
258+
func succeedRequest(_ buffer: CircularBuffer<ByteBuffer>?) {
259+
let succeedAction = self.stateLock.withLock {
260+
self.state.succeedRequest(buffer)
261+
}
262+
switch succeedAction {
187263
case .succeedRequest(let continuation):
188264
continuation.resume(returning: nil)
189265
case .succeedContinuation(let continuation, let byteBuffer):
@@ -195,111 +271,48 @@ actor AsyncRequestBag {
195271

196272
// MARK: Other methods
197273

198-
private func writeRequestStream(
274+
private func continueRequestBodyStream(
199275
_ allocator: ByteBufferAllocator,
200276
next: @escaping ((ByteBufferAllocator) async throws -> IOData?)
201-
) async {
202-
while true {
203-
do {
204-
guard let part = try await next(allocator) else { // <---- dispatch point!
205-
// no more data to produce
206-
switch self.state.finishRequestBodyStream() {
207-
case .none:
208-
break
209-
case .forwardStreamFinished(let executor):
210-
executor.finishRequestBodyStream(self)
277+
) {
278+
Task {
279+
while true {
280+
do {
281+
guard let part = try await next(allocator) else { // <---- dispatch point!
282+
return self.requestBodyStreamFinished()
211283
}
212-
return
213-
}
214-
215-
let action = self.state.producedNextRequestPart(part)
216-
switch action {
217-
case .write(let part, let executor, let continueAfter):
218-
executor.writeRequestBodyPart(part, request: self)
219-
if !continueAfter {
284+
285+
switch self.requestBodyStreamNextPart(part) {
286+
case .pause:
220287
return
288+
case .continue:
289+
continue
221290
}
222-
case .ignore:
291+
292+
} catch {
293+
// producing more failed
294+
self.requestBodyStreamFailed(error)
223295
return
224296
}
225-
} catch {
226-
// producing more failed
227-
switch self.state.failedToProduceNextRequestPart(error) {
228-
case .none:
229-
break
230-
case .informRequestAboutFailure(let error, cancelExecutor: let executor, let continuation):
231-
executor.cancelRequest(self)
232-
self.fail(error)
233-
continuation?.resume(throwing: error)
234-
}
235-
return
236297
}
237298
}
238299
}
239300
}
240301

241302
@available(macOS 12.0, iOS 15.0, watchOS 8.0, tvOS 15.0, *)
242303
extension AsyncRequestBag: HTTPSchedulableRequest {
243-
nonisolated var tlsConfiguration: TLSConfiguration? {
304+
var tlsConfiguration: TLSConfiguration? {
244305
return nil
245306
}
246307

247-
nonisolated var requiredEventLoop: EventLoop? {
308+
var requiredEventLoop: EventLoop? {
248309
return nil
249310
}
250-
251-
nonisolated func requestWasQueued(_ scheduler: HTTPRequestScheduler) {
252-
Task.detached {
253-
await self.requestWasQueued0(scheduler)
254-
}
255-
}
256-
257-
nonisolated func fail(_ error: Error) {
258-
Task.detached {
259-
await self.fail0(error)
260-
}
261-
}
262311
}
263312

264313
@available(macOS 12.0, iOS 15.0, watchOS 8.0, tvOS 15.0, *)
265314
extension AsyncRequestBag: HTTPExecutableRequest {
266-
nonisolated func willExecuteRequest(_ executor: HTTPRequestExecutor) {
267-
Task.detached {
268-
await self.willExecuteRequest0(executor)
269-
}
270-
}
271-
272-
nonisolated func requestHeadSent() {}
273-
274-
nonisolated func resumeRequestBodyStream() {
275-
Task.detached {
276-
await self.resumeRequestBodyStream0()
277-
}
278-
}
279-
280-
nonisolated func pauseRequestBodyStream() {
281-
Task.detached {
282-
await self.pauseRequestBodyStream0()
283-
}
284-
}
285-
286-
nonisolated func receiveResponseHead(_ head: HTTPResponseHead) {
287-
Task.detached {
288-
await self.receiveResponseHead0(head)
289-
}
290-
}
291-
292-
nonisolated func receiveResponseBodyParts(_ buffer: CircularBuffer<ByteBuffer>) {
293-
Task.detached {
294-
await self.receiveResponseBodyParts0(buffer)
295-
}
296-
}
297-
298-
nonisolated func succeedRequest(_ buffer: CircularBuffer<ByteBuffer>?) {
299-
Task.detached {
300-
await self.succeedRequest0(buffer)
301-
}
302-
}
315+
func requestHeadSent() {}
303316
}
304317

305318
@available(macOS 12.0, iOS 15.0, watchOS 8.0, tvOS 15.0, *)
@@ -317,11 +330,7 @@ extension AsyncRequestBag {
317330
}
318331
}
319332

320-
func cancelResponseStream0(streamID: HTTPClientResponse.Body.IteratorStream.ID) {}
321-
322-
nonisolated func cancelResponseStream(streamID: HTTPClientResponse.Body.IteratorStream.ID) {
323-
Task.detached {
324-
await self.cancelResponseStream0(streamID: streamID)
325-
}
333+
func cancelResponseStream(streamID: HTTPClientResponse.Body.IteratorStream.ID) {
334+
self.cancel()
326335
}
327336
}

Sources/AsyncHTTPClient/AsyncAwait/HTTPClientRequest.swift

+1-1
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ struct HTTPClientRequest {
2424
internal enum Mode {
2525
case asyncSequence(length: Int?, (ByteBufferAllocator) async throws -> (IOData?))
2626
// case asyncSequenceFactory(() -> Mode) // typealias (ByteBufferAllocator) async throws -> IOData?
27-
case sequence(length: Int?, (ByteBufferAllocator) throws -> ByteBuffer)
27+
case sequence(length: Int?, (ByteBufferAllocator) throws -> ByteBuffer) // TODO: remove `throws`
2828
case byteBuffer(ByteBuffer)
2929
}
3030

0 commit comments

Comments
 (0)