Skip to content

Commit 8c2959d

Browse files
fabianfettdnadoba
authored andcommitted
async/await
compile some more stuff Upload streaming works in tests more tests In progress
1 parent 60fc1d2 commit 8c2959d

File tree

7 files changed

+1715
-23
lines changed

7 files changed

+1715
-23
lines changed
Lines changed: 210 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,210 @@
1+
//===----------------------------------------------------------------------===//
2+
//
3+
// This source file is part of the AsyncHTTPClient open source project
4+
//
5+
// Copyright (c) 2021 Apple Inc. and the AsyncHTTPClient project authors
6+
// Licensed under Apache License v2.0
7+
//
8+
// See LICENSE.txt for license information
9+
// See CONTRIBUTORS.txt for the list of AsyncHTTPClient project authors
10+
//
11+
// SPDX-License-Identifier: Apache-2.0
12+
//
13+
//===----------------------------------------------------------------------===//
14+
15+
import struct Foundation.Data
16+
import Logging
17+
import NIO
18+
import NIOHTTP1
19+
20+
@available(macOS 12.0, iOS 15.0, watchOS 8.0, tvOS 15.0, *)
21+
// HTTPClient.AsyncRequest
22+
struct AsyncRequest {
23+
public struct Body {
24+
internal enum Mode {
25+
case asyncSequence((ByteBufferAllocator) async throws -> (IOData?))
26+
// case asyncSequenceFactory(() -> Mode) // typealias (ByteBufferAllocator) async throws -> IOData?
27+
case sequence((ByteBufferAllocator) throws -> ByteBuffer)
28+
case byteBuffer(ByteBuffer)
29+
}
30+
31+
var mode: Mode
32+
33+
private init(_ mode: Mode) {
34+
self.mode = mode
35+
}
36+
37+
static func byteBuffer(_ byteBuffer: ByteBuffer) -> Body {
38+
self.init(.byteBuffer(byteBuffer))
39+
}
40+
41+
static func bytes<S: Sequence>(_ sequence: S) -> Body where S.Element == UInt8 {
42+
self.init(.asyncSequence { allocator in
43+
if let buffer = sequence.withContiguousStorageIfAvailable({ allocator.buffer(bytes: $0) }) {
44+
// fastpath
45+
return .byteBuffer(buffer)
46+
}
47+
// potentially really slow path
48+
return .byteBuffer(allocator.buffer(bytes: sequence))
49+
})
50+
}
51+
52+
static func stream<S: AsyncSequence>(_ sequence: S) -> Body where S.Element == ByteBuffer {
53+
var iterator = sequence.makeAsyncIterator()
54+
let body = self.init(.asyncSequence { _ -> IOData? in
55+
if let byteBuffer = try await iterator.next() {
56+
return .byteBuffer(byteBuffer)
57+
}
58+
return nil
59+
})
60+
return body
61+
}
62+
63+
static func stream<S: AsyncSequence>(_ sequence: S) -> Body where S.Element == FileRegion {
64+
var iterator = sequence.makeAsyncIterator()
65+
let body = self.init(.asyncSequence { _ in
66+
if let fileRegion = try await iterator.next() {
67+
return .fileRegion(fileRegion)
68+
}
69+
return .none
70+
})
71+
return body
72+
}
73+
74+
static func stream<S: AsyncSequence>(_ sequence: S) -> Body where S.Element == UInt8 {
75+
var iterator = sequence.makeAsyncIterator()
76+
let body = self.init(.asyncSequence { allocator -> IOData? in
77+
var buffer = allocator.buffer(capacity: 1024) // TODO: Magic number
78+
while buffer.writableBytes > 0, let byte = try await iterator.next() {
79+
buffer.writeInteger(byte)
80+
}
81+
if buffer.readableBytes > 0 {
82+
return .byteBuffer(buffer)
83+
}
84+
return nil
85+
})
86+
return body
87+
}
88+
}
89+
90+
var url: String // TBD: URL?
91+
var method: HTTPMethod
92+
var headers: HTTPHeaders
93+
94+
var body: Body?
95+
96+
init(url: String) {
97+
self.url = url
98+
self.method = .GET
99+
self.headers = .init()
100+
self.body = .none
101+
}
102+
}
103+
104+
@available(macOS 12.0, iOS 15.0, watchOS 8.0, tvOS 15.0, *)
105+
public struct AsyncResponse {
106+
public var version: HTTPVersion
107+
public var status: HTTPResponseStatus
108+
public var headers: HTTPHeaders
109+
public var body: Body
110+
111+
public struct Body {
112+
private let bag: AsyncRequestBag
113+
114+
fileprivate init(_ bag: AsyncRequestBag) {
115+
self.bag = bag
116+
}
117+
}
118+
119+
init(
120+
bag: AsyncRequestBag,
121+
version: HTTPVersion,
122+
status: HTTPResponseStatus,
123+
headers: HTTPHeaders
124+
) {
125+
self.body = .init(bag)
126+
self.version = version
127+
self.status = status
128+
self.headers = headers
129+
}
130+
}
131+
132+
@available(macOS 12.0, iOS 15.0, watchOS 8.0, tvOS 15.0, *)
133+
extension AsyncResponse.Body: AsyncSequence {
134+
public typealias Element = ByteBuffer
135+
public typealias AsyncIterator = Iterator
136+
137+
public struct Iterator: AsyncIteratorProtocol {
138+
public typealias Element = ByteBuffer
139+
140+
private let stream: IteratorStream
141+
142+
fileprivate init(stream: IteratorStream) {
143+
self.stream = stream
144+
}
145+
146+
public func next() async throws -> ByteBuffer? {
147+
try await self.stream.next()
148+
}
149+
}
150+
151+
public func makeAsyncIterator() -> Iterator {
152+
Iterator(stream: IteratorStream(bag: self.bag))
153+
}
154+
155+
internal class IteratorStream {
156+
struct ID: Hashable {
157+
private let objectID: ObjectIdentifier
158+
159+
init(_ object: IteratorStream) {
160+
self.objectID = ObjectIdentifier(object)
161+
}
162+
}
163+
164+
var id: ID { ID(self) }
165+
private let bag: AsyncRequestBag
166+
167+
init(bag: AsyncRequestBag) {
168+
self.bag = bag
169+
}
170+
171+
deinit {
172+
self.bag.cancelResponseStream(streamID: self.id)
173+
}
174+
175+
func next() async throws -> ByteBuffer? {
176+
try await self.bag.nextResponsePart(streamID: self.id)
177+
}
178+
}
179+
}
180+
181+
@available(macOS 12.0, iOS 15.0, watchOS 8.0, tvOS 15.0, *)
182+
extension HTTPClient {
183+
func execute(_ request: AsyncRequest, deadline: NIODeadline, logger: Logger) async throws -> AsyncResponse {
184+
let bag = AsyncRequestBag(
185+
request: request,
186+
logger: logger,
187+
connectionDeadline: .now() + .seconds(10),
188+
eventLoopPreference: .indifferent
189+
)
190+
191+
return try await withTaskCancellationHandler {
192+
bag.cancel()
193+
} operation: { () -> AsyncResponse in
194+
// first register the completion
195+
async let result = bag.result()
196+
197+
// second throw it onto the connection pool for execution
198+
// self.pool.execute(bag)
199+
200+
// third await result
201+
return try await result
202+
}
203+
}
204+
}
205+
206+
// redirect!
207+
208+
// connection pool manager -> shutdown
209+
// config objects ... client.config (CoW struct)
210+
// request

0 commit comments

Comments
 (0)