Skip to content

Commit 65d97ff

Browse files
authored
Improve test utils for running request (#512)
### Motivation To land support for async/await, we need test utilities. We already have a `MockRequestExecutor`. Let's improve this to better handle on and off `EventLoop` request processing. ### Changes - Move `MockRequestExecutor` into its own file - Add blocking APIs to `MockRequestExecutor` when called from another thread
1 parent 5ce7377 commit 65d97ff

File tree

3 files changed

+301
-77
lines changed

3 files changed

+301
-77
lines changed

Tests/AsyncHTTPClientTests/HTTPClientTestUtils.swift

+1-1
Original file line numberDiff line numberDiff line change
@@ -641,7 +641,7 @@ internal struct HTTPResponseBuilder {
641641
}
642642
}
643643

644-
internal struct RequestInfo: Codable {
644+
internal struct RequestInfo: Codable, Equatable {
645645
var data: String
646646
var requestNumber: Int
647647
var connectionNumber: Int
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,272 @@
1+
//===----------------------------------------------------------------------===//
2+
//
3+
// This source file is part of the AsyncHTTPClient open source project
4+
//
5+
// Copyright (c) 2021 Apple Inc. and the AsyncHTTPClient project authors
6+
// Licensed under Apache License v2.0
7+
//
8+
// See LICENSE.txt for license information
9+
// See CONTRIBUTORS.txt for the list of AsyncHTTPClient project authors
10+
//
11+
// SPDX-License-Identifier: Apache-2.0
12+
//
13+
//===----------------------------------------------------------------------===//
14+
15+
@testable import AsyncHTTPClient
16+
import NIOConcurrencyHelpers
17+
import NIOCore
18+
19+
// This is a MockRequestExecutor, that is synchronized on its EventLoop.
20+
final class MockRequestExecutor {
21+
enum Errors: Error {
22+
case eof
23+
case unexpectedFileRegion
24+
case unexpectedByteBuffer
25+
}
26+
27+
enum RequestParts: Equatable {
28+
case body(IOData)
29+
case endOfStream
30+
31+
var isBody: Bool {
32+
switch self {
33+
case .body:
34+
return true
35+
case .endOfStream:
36+
return false
37+
}
38+
}
39+
}
40+
41+
let eventLoop: EventLoop
42+
let _blockingQueue = BlockingQueue<RequestParts>()
43+
let pauseRequestBodyPartStreamAfterASingleWrite: Bool
44+
45+
var isCancelled: Bool {
46+
if self.eventLoop.inEventLoop {
47+
return self._isCancelled
48+
} else {
49+
return try! self.eventLoop.submit { self._isCancelled }.wait()
50+
}
51+
}
52+
53+
var signalledDemandForResponseBody: Bool {
54+
if self.eventLoop.inEventLoop {
55+
return self._signaledDemandForResponseBody
56+
} else {
57+
return try! self.eventLoop.submit { self._signaledDemandForResponseBody }.wait()
58+
}
59+
}
60+
61+
var requestBodyPartsCount: Int {
62+
return self._blockingQueue.count
63+
}
64+
65+
private var request: HTTPExecutableRequest?
66+
private var _requestBodyParts = CircularBuffer<RequestParts>()
67+
private var _signaledDemandForRequestBody: Bool = false
68+
private var _signaledDemandForResponseBody: Bool = false
69+
private var _whenWritable: EventLoopPromise<RequestParts>?
70+
private var _isCancelled: Bool = false
71+
72+
init(pauseRequestBodyPartStreamAfterASingleWrite: Bool = false, eventLoop: EventLoop) {
73+
self.pauseRequestBodyPartStreamAfterASingleWrite = pauseRequestBodyPartStreamAfterASingleWrite
74+
self.eventLoop = eventLoop
75+
}
76+
77+
func runRequest(_ request: HTTPExecutableRequest) {
78+
if self.eventLoop.inEventLoop {
79+
self.runRequest0(request)
80+
} else {
81+
self.eventLoop.execute {
82+
self.runRequest0(request)
83+
}
84+
}
85+
}
86+
87+
private func runRequest0(_ request: HTTPExecutableRequest) {
88+
precondition(self.request == nil)
89+
self.request = request
90+
request.willExecuteRequest(self)
91+
}
92+
93+
func receiveRequestBody(deadline: NIODeadline = .now() + .seconds(60), _ verify: (ByteBuffer) throws -> Void) throws {
94+
enum ReceiveAction {
95+
case value(RequestParts)
96+
case future(EventLoopFuture<RequestParts>)
97+
}
98+
99+
switch try self._blockingQueue.popFirst(deadline: deadline) {
100+
case .body(.byteBuffer(let buffer)):
101+
try verify(buffer)
102+
case .body(.fileRegion):
103+
throw Errors.unexpectedFileRegion
104+
case .endOfStream:
105+
throw Errors.eof
106+
}
107+
}
108+
109+
func receiveEndOfStream(deadline: NIODeadline = .now() + .seconds(60)) throws {
110+
enum ReceiveAction {
111+
case value(RequestParts)
112+
case future(EventLoopFuture<RequestParts>)
113+
}
114+
115+
switch try self._blockingQueue.popFirst(deadline: deadline) {
116+
case .body(.byteBuffer):
117+
throw Errors.unexpectedByteBuffer
118+
case .body(.fileRegion):
119+
throw Errors.unexpectedFileRegion
120+
case .endOfStream:
121+
break
122+
}
123+
}
124+
125+
func pauseRequestBodyStream() {
126+
if self.eventLoop.inEventLoop {
127+
self.pauseRequestBodyStream0()
128+
} else {
129+
self.eventLoop.execute {
130+
self.pauseRequestBodyStream0()
131+
}
132+
}
133+
}
134+
135+
private func pauseRequestBodyStream0() {
136+
if self._signaledDemandForRequestBody == true {
137+
self._signaledDemandForRequestBody = false
138+
self.request!.pauseRequestBodyStream()
139+
}
140+
}
141+
142+
func resumeRequestBodyStream() {
143+
if self.eventLoop.inEventLoop {
144+
self.resumeRequestBodyStream0()
145+
} else {
146+
self.eventLoop.execute {
147+
self.resumeRequestBodyStream0()
148+
}
149+
}
150+
}
151+
152+
private func resumeRequestBodyStream0() {
153+
if self._signaledDemandForRequestBody == false {
154+
self._signaledDemandForRequestBody = true
155+
self.request!.resumeRequestBodyStream()
156+
}
157+
}
158+
159+
func resetResponseStreamDemandSignal() {
160+
if self.eventLoop.inEventLoop {
161+
self.resetResponseStreamDemandSignal0()
162+
} else {
163+
self.eventLoop.execute {
164+
self.resetResponseStreamDemandSignal0()
165+
}
166+
}
167+
}
168+
169+
func resetResponseStreamDemandSignal0() {
170+
self._signaledDemandForResponseBody = false
171+
}
172+
}
173+
174+
extension MockRequestExecutor: HTTPRequestExecutor {
175+
// this should always be called twice. When we receive the first call, the next call to produce
176+
// data is already scheduled. If we call pause here, once, after the second call new subsequent
177+
// calls should not be scheduled.
178+
func writeRequestBodyPart(_ part: IOData, request: HTTPExecutableRequest) {
179+
self.writeNextRequestPart(.body(part), request: request)
180+
}
181+
182+
func finishRequestBodyStream(_ request: HTTPExecutableRequest) {
183+
self.writeNextRequestPart(.endOfStream, request: request)
184+
}
185+
186+
private func writeNextRequestPart(_ part: RequestParts, request: HTTPExecutableRequest) {
187+
enum WriteAction {
188+
case pauseBodyStream
189+
case none
190+
}
191+
192+
let stateChange = { () -> WriteAction in
193+
var pause = false
194+
if self._blockingQueue.isEmpty && self.pauseRequestBodyPartStreamAfterASingleWrite && part.isBody {
195+
pause = true
196+
self._signaledDemandForRequestBody = false
197+
}
198+
199+
self._blockingQueue.append(.success(part))
200+
201+
return pause ? .pauseBodyStream : .none
202+
}
203+
204+
let action: WriteAction
205+
if self.eventLoop.inEventLoop {
206+
action = stateChange()
207+
} else {
208+
action = try! self.eventLoop.submit(stateChange).wait()
209+
}
210+
211+
switch action {
212+
case .pauseBodyStream:
213+
request.pauseRequestBodyStream()
214+
case .none:
215+
return
216+
}
217+
}
218+
219+
func demandResponseBodyStream(_: HTTPExecutableRequest) {
220+
if self.eventLoop.inEventLoop {
221+
self._signaledDemandForResponseBody = true
222+
} else {
223+
self.eventLoop.execute { self._signaledDemandForResponseBody = true }
224+
}
225+
}
226+
227+
func cancelRequest(_: HTTPExecutableRequest) {
228+
if self.eventLoop.inEventLoop {
229+
self._isCancelled = true
230+
} else {
231+
self.eventLoop.execute { self._isCancelled = true }
232+
}
233+
}
234+
}
235+
236+
extension MockRequestExecutor {
237+
final class BlockingQueue<Element> {
238+
private let condition = ConditionLock(value: false)
239+
private var buffer = CircularBuffer<Result<Element, Error>>()
240+
241+
public struct TimeoutError: Error {}
242+
243+
internal func append(_ element: Result<Element, Error>) {
244+
self.condition.lock()
245+
self.buffer.append(element)
246+
self.condition.unlock(withValue: true)
247+
}
248+
249+
internal var isEmpty: Bool {
250+
self.condition.lock()
251+
defer { self.condition.unlock() }
252+
return self.buffer.isEmpty
253+
}
254+
255+
internal var count: Int {
256+
self.condition.lock()
257+
defer { self.condition.unlock() }
258+
return self.buffer.count
259+
}
260+
261+
internal func popFirst(deadline: NIODeadline) throws -> Element {
262+
let secondsUntilDeath = deadline - NIODeadline.now()
263+
guard self.condition.lock(whenValue: true,
264+
timeoutSeconds: .init(secondsUntilDeath.nanoseconds / 1_000_000_000)) else {
265+
throw TimeoutError()
266+
}
267+
let first = self.buffer.removeFirst()
268+
self.condition.unlock(withValue: !self.buffer.isEmpty)
269+
return try first.get()
270+
}
271+
}
272+
}

0 commit comments

Comments
 (0)