-
Notifications
You must be signed in to change notification settings - Fork 113
/
Copy pathControlPlaneRequestEncoder.swift
130 lines (111 loc) · 5.05 KB
/
ControlPlaneRequestEncoder.swift
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
//===----------------------------------------------------------------------===//
//
// This source file is part of the SwiftAWSLambdaRuntime open source project
//
// Copyright (c) 2021 Apple Inc. and the SwiftAWSLambdaRuntime project authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
// See CONTRIBUTORS.txt for the list of SwiftAWSLambdaRuntime project authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//
import NIOCore
/// Encodes `ControlPlaneRequest` values as HTTP/1.1 requests and writes them to the channel.
///
/// The request head is assembled in a scratch `ByteBuffer` that is allocated in
/// `writerAdded(context:)`, cleared at the start of every request, and released in
/// `writerRemoved(context:)`.
struct ControlPlaneRequestEncoder: _EmittingChannelHandler {
    typealias OutboundOut = ByteBuffer

    /// Value sent in the `host` header of every request; fixed at initialization.
    private let host: String

    /// Scratch buffer for the request currently being encoded.
    ///
    /// Only set between `writerAdded(context:)` and `writerRemoved(context:)`; calling
    /// `writeRequest(_:context:promise:)` outside that window traps on the implicit unwrap.
    private var byteBuffer: ByteBuffer!

    /// Creates an encoder.
    ///
    /// - Parameter host: Value to send in the `host` header of every request.
    init(host: String) {
        self.host = host
    }

    /// Encodes `request`, writes it to `context`, and flushes.
    ///
    /// - Parameters:
    ///   - request: The control plane request to encode.
    ///   - context: The channel handler context that receives the writes and the flush.
    ///   - promise: Promise attached to the last write issued for this request.
    mutating func writeRequest(
        _ request: ControlPlaneRequest,
        context: ChannelHandlerContext,
        promise: EventLoopPromise<Void>?
    ) {
        // Reset the scratch buffer, asking for at least the capacity it already has.
        self.byteBuffer.clear(minimumCapacity: self.byteBuffer.storageCapacity)

        switch request {
        case .next:
            self.byteBuffer.writeString(.nextInvocationRequestLine)
            self.byteBuffer.writeHostHeader(host: self.host)
            self.byteBuffer.writeString(.userAgentHeader)
            self.byteBuffer.writeString(.CRLF)  // end of head
            context.write(self.wrapOutboundOut(self.byteBuffer), promise: promise)

        case .invocationResponse(let requestID, let payload):
            let contentLength = payload?.readableBytes ?? 0
            self.byteBuffer.writeInvocationResultRequestLine(requestID)
            self.byteBuffer.writeHostHeader(host: self.host)
            self.byteBuffer.writeString(.userAgentHeader)
            self.byteBuffer.writeContentLengthHeader(length: contentLength)
            self.byteBuffer.writeString(.CRLF)  // end of head
            if let payload = payload, contentLength > 0 {
                // The payload is written as its own buffer instead of being copied into the
                // head buffer; the caller's promise rides on the final write.
                context.write(self.wrapOutboundOut(self.byteBuffer), promise: nil)
                context.write(self.wrapOutboundOut(payload), promise: promise)
            } else {
                context.write(self.wrapOutboundOut(self.byteBuffer), promise: promise)
            }

        case .invocationError(let requestID, let errorMessage):
            let payload = errorMessage.toJSONBytes()
            self.byteBuffer.writeInvocationErrorRequestLine(requestID)
            self.writeErrorReport(body: payload, context: context, promise: promise)

        case .initializationError(let errorMessage):
            let payload = errorMessage.toJSONBytes()
            self.byteBuffer.writeString(.runtimeInitErrorRequestLine)
            self.writeErrorReport(body: payload, context: context, promise: promise)
        }

        // Every request kind ends with a flush.
        context.flush()
    }

    /// Appends the headers and body shared by both error reports to the scratch buffer and
    /// writes the buffer to `context`.
    ///
    /// The request line must already have been written to the scratch buffer. This method
    /// does not flush.
    ///
    /// - Parameters:
    ///   - body: Bytes of the error body; its count becomes the `content-length` value.
    ///   - context: The channel handler context that receives the write.
    ///   - promise: Promise attached to the write.
    private mutating func writeErrorReport<Body: Collection>(
        body: Body,
        context: ChannelHandlerContext,
        promise: EventLoopPromise<Void>?
    ) where Body.Element == UInt8 {
        self.byteBuffer.writeContentLengthHeader(length: body.count)
        self.byteBuffer.writeHostHeader(host: self.host)
        self.byteBuffer.writeString(.userAgentHeader)
        self.byteBuffer.writeString(.unhandledErrorHeader)
        self.byteBuffer.writeString(.CRLF)  // end of head
        self.byteBuffer.writeBytes(body)
        context.write(self.wrapOutboundOut(self.byteBuffer), promise: promise)
    }

    /// Allocates the scratch buffer from the channel's allocator with an initial capacity of
    /// 256 bytes.
    mutating func writerAdded(context: ChannelHandlerContext) {
        self.byteBuffer = context.channel.allocator.buffer(capacity: 256)
    }

    /// Releases the scratch buffer.
    mutating func writerRemoved(context: ChannelHandlerContext) {
        self.byteBuffer = nil
    }
}
/// Fixed HTTP fragments used when encoding control plane requests.
extension String {
    /// Line terminator used after request lines and headers, and to end the request head.
    static let CRLF = "\r\n"

    // Request lines

    /// Request line of the "next invocation" request.
    static let nextInvocationRequestLine = "GET /2018-06-01/runtime/invocation/next HTTP/1.1\r\n"

    /// Request line of the "initialization error" request.
    static let runtimeInitErrorRequestLine = "POST /2018-06-01/runtime/init/error HTTP/1.1\r\n"

    // Headers (each one already terminated by CRLF)

    /// `user-agent` header line.
    static let userAgentHeader = "user-agent: Swift-Lambda/Unknown\r\n"

    /// Header line that marks a reported error as `Unhandled`.
    static let unhandledErrorHeader = "lambda-runtime-function-error-type: Unhandled\r\n"
}
/// Helpers that append HTTP request lines and header lines to a buffer.
extension ByteBuffer {
    /// Appends `POST /2018-06-01/runtime/invocation/<requestID>` followed by `suffix`.
    ///
    /// Shared by the result and error request-line writers, which differ only in `suffix`.
    private mutating func writeInvocationRequestLine(requestID: String, suffix: String) {
        self.writeString("POST /2018-06-01/runtime/invocation/")
        self.writeString(requestID)
        self.writeString(suffix)
    }

    /// Appends the request line that posts the response for the invocation `requestID`.
    fileprivate mutating func writeInvocationResultRequestLine(_ requestID: String) {
        self.writeInvocationRequestLine(requestID: requestID, suffix: "/response HTTP/1.1\r\n")
    }

    /// Appends the request line that posts an error for the invocation `requestID`.
    fileprivate mutating func writeInvocationErrorRequestLine(_ requestID: String) {
        self.writeInvocationRequestLine(requestID: requestID, suffix: "/error HTTP/1.1\r\n")
    }

    /// Appends a `host` header line carrying `host`.
    fileprivate mutating func writeHostHeader(host: String) {
        let name = "host: "
        self.writeString(name)
        self.writeString(host)
        self.writeString(String.CRLF)
    }

    /// Appends a `content-length` header line carrying `length` in decimal.
    fileprivate mutating func writeContentLengthHeader(length: Int) {
        let name = "content-length: "
        let value = String(length)
        self.writeString(name)
        self.writeString(value)
        self.writeString(String.CRLF)
    }
}