Skip to content

Commit 7728066

Browse files
authored
Added header when reporting failing invocations or initializations #116 (#128)
* allow the users of client to provide headers * add initialization error header * add error header to failed invocations * fix external setting of ip and port via config * add tests
1 parent d1a9276 commit 7728066

File tree

5 files changed

+134
-11
lines changed

5 files changed

+134
-11
lines changed

Package.swift

+2
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,8 @@ let package = Package(
3535
]),
3636
.testTarget(name: "AWSLambdaRuntimeCoreTests", dependencies: [
3737
.byName(name: "AWSLambdaRuntimeCore"),
38+
.product(name: "NIOTestUtils", package: "swift-nio"),
39+
.product(name: "NIOFoundationCompat", package: "swift-nio"),
3840
]),
3941
.testTarget(name: "AWSLambdaRuntimeTests", dependencies: [
4042
.byName(name: "AWSLambdaRuntimeCore"),

Sources/AWSLambdaRuntimeCore/HTTPClient.swift

+4-6
Original file line numberDiff line numberDiff line change
@@ -27,27 +27,25 @@ internal final class HTTPClient {
2727
private var state = State.disconnected
2828
private var executing = false
2929

30-
private static let headers = HTTPHeaders([("user-agent", "Swift-Lambda/Unknown")])
31-
3230
init(eventLoop: EventLoop, configuration: Lambda.Configuration.RuntimeEngine) {
3331
self.eventLoop = eventLoop
3432
self.configuration = configuration
3533
self.targetHost = "\(self.configuration.ip):\(self.configuration.port)"
3634
}
3735

38-
func get(url: String, timeout: TimeAmount? = nil) -> EventLoopFuture<Response> {
36+
func get(url: String, headers: HTTPHeaders, timeout: TimeAmount? = nil) -> EventLoopFuture<Response> {
3937
self.execute(Request(targetHost: self.targetHost,
4038
url: url,
4139
method: .GET,
42-
headers: HTTPClient.headers,
40+
headers: headers,
4341
timeout: timeout ?? self.configuration.requestTimeout))
4442
}
4543

46-
func post(url: String, body: ByteBuffer?, timeout: TimeAmount? = nil) -> EventLoopFuture<Response> {
44+
func post(url: String, headers: HTTPHeaders, body: ByteBuffer?, timeout: TimeAmount? = nil) -> EventLoopFuture<Response> {
4745
self.execute(Request(targetHost: self.targetHost,
4846
url: url,
4947
method: .POST,
50-
headers: HTTPClient.headers,
48+
headers: headers,
5149
body: body,
5250
timeout: timeout ?? self.configuration.requestTimeout))
5351
}

Sources/AWSLambdaRuntimeCore/LambdaConfiguration.swift

+2-2
Original file line numberDiff line numberDiff line change
@@ -67,8 +67,8 @@ extension Lambda {
6767
let keepAlive: Bool
6868
let requestTimeout: TimeAmount?
6969

70-
init(baseURL: String? = nil, keepAlive: Bool? = nil, requestTimeout: TimeAmount? = nil) {
71-
let ipPort = env("AWS_LAMBDA_RUNTIME_API")?.split(separator: ":") ?? ["127.0.0.1", "7000"]
70+
init(address: String? = nil, keepAlive: Bool? = nil, requestTimeout: TimeAmount? = nil) {
71+
let ipPort = (address ?? env("AWS_LAMBDA_RUNTIME_API"))?.split(separator: ":") ?? ["127.0.0.1", "7000"]
7272
guard ipPort.count == 2, let port = Int(ipPort[1]) else {
7373
preconditionFailure("invalid ip+port configuration \(ipPort)")
7474
}

Sources/AWSLambdaRuntimeCore/LambdaRuntimeClient.swift

+17-3
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ extension Lambda {
3636
func getNextInvocation(logger: Logger) -> EventLoopFuture<(Invocation, ByteBuffer)> {
3737
let url = Consts.invocationURLPrefix + Consts.getNextInvocationURLSuffix
3838
logger.debug("requesting work from lambda runtime engine using \(url)")
39-
return self.httpClient.get(url: url).flatMapThrowing { response in
39+
return self.httpClient.get(url: url, headers: RuntimeClient.defaultHeaders).flatMapThrowing { response in
4040
guard response.status == .ok else {
4141
throw RuntimeError.badStatusCode(response.status)
4242
}
@@ -61,19 +61,23 @@ extension Lambda {
6161
func reportResults(logger: Logger, invocation: Invocation, result: Result<ByteBuffer?, Error>) -> EventLoopFuture<Void> {
6262
var url = Consts.invocationURLPrefix + "/" + invocation.requestID
6363
var body: ByteBuffer?
64+
let headers: HTTPHeaders
65+
6466
switch result {
6567
case .success(let buffer):
6668
url += Consts.postResponseURLSuffix
6769
body = buffer
70+
headers = RuntimeClient.defaultHeaders
6871
case .failure(let error):
6972
url += Consts.postErrorURLSuffix
7073
let errorResponse = ErrorResponse(errorType: Consts.functionError, errorMessage: "\(error)")
7174
let bytes = errorResponse.toJSONBytes()
7275
body = self.allocator.buffer(capacity: bytes.count)
7376
body!.writeBytes(bytes)
77+
headers = RuntimeClient.errorHeaders
7478
}
7579
logger.debug("reporting results to lambda runtime engine using \(url)")
76-
return self.httpClient.post(url: url, body: body).flatMapThrowing { response in
80+
return self.httpClient.post(url: url, headers: headers, body: body).flatMapThrowing { response in
7781
guard response.status == .accepted else {
7882
throw RuntimeError.badStatusCode(response.status)
7983
}
@@ -98,7 +102,7 @@ extension Lambda {
98102
var body = self.allocator.buffer(capacity: bytes.count)
99103
body.writeBytes(bytes)
100104
logger.warning("reporting initialization error to lambda runtime engine using \(url)")
101-
return self.httpClient.post(url: url, body: body).flatMapThrowing { response in
105+
return self.httpClient.post(url: url, headers: RuntimeClient.errorHeaders, body: body).flatMapThrowing { response in
102106
guard response.status == .accepted else {
103107
throw RuntimeError.badStatusCode(response.status)
104108
}
@@ -186,3 +190,13 @@ extension Lambda {
186190
}
187191
}
188192
}
193+
194+
extension Lambda.RuntimeClient {
195+
internal static let defaultHeaders = HTTPHeaders([("user-agent", "Swift-Lambda/Unknown")])
196+
197+
/// These headers must be sent along an invocation or initialization error report
198+
internal static let errorHeaders = HTTPHeaders([
199+
("user-agent", "Swift-Lambda/Unknown"),
200+
("lambda-runtime-function-error-type", "Unhandled"),
201+
])
202+
}

Tests/AWSLambdaRuntimeCoreTests/LambdaRuntimeClientTest.swift

+109
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,11 @@
1313
//===----------------------------------------------------------------------===//
1414

1515
@testable import AWSLambdaRuntimeCore
16+
import Logging
17+
import NIO
18+
import NIOFoundationCompat
19+
import NIOHTTP1
20+
import NIOTestUtils
1621
import XCTest
1722

1823
class LambdaRuntimeClientTest: XCTestCase {
@@ -209,6 +214,110 @@ class LambdaRuntimeClientTest: XCTestCase {
209214
XCTAssertEqual(#"{"errorType":"error","errorMessage":"🥑👨‍👩‍👧‍👧👩‍👩‍👧‍👧👨‍👨‍👧"}"#, String(decoding: emojiBytes, as: Unicode.UTF8.self))
210215
}
211216

217+
func testInitializationErrorReport() {
218+
let eventLoopGroup = MultiThreadedEventLoopGroup(numberOfThreads: 1)
219+
defer { XCTAssertNoThrow(try eventLoopGroup.syncShutdownGracefully()) }
220+
221+
let server = NIOHTTP1TestServer(group: eventLoopGroup)
222+
defer { XCTAssertNoThrow(try server.stop()) }
223+
224+
let logger = Logger(label: "TestLogger")
225+
let client = Lambda.RuntimeClient(eventLoop: eventLoopGroup.next(), configuration: .init(address: "127.0.0.1:\(server.serverPort)"))
226+
let result = client.reportInitializationError(logger: logger, error: TestError("boom"))
227+
228+
var inboundHeader: HTTPServerRequestPart?
229+
XCTAssertNoThrow(inboundHeader = try server.readInbound())
230+
guard case .head(let head) = try? XCTUnwrap(inboundHeader) else { XCTFail("Expected to get a head first"); return }
231+
XCTAssertEqual(head.headers["lambda-runtime-function-error-type"], ["Unhandled"])
232+
XCTAssertEqual(head.headers["user-agent"], ["Swift-Lambda/Unknown"])
233+
234+
var inboundBody: HTTPServerRequestPart?
235+
XCTAssertNoThrow(inboundBody = try server.readInbound())
236+
guard case .body(let body) = try? XCTUnwrap(inboundBody) else { XCTFail("Expected body after head"); return }
237+
XCTAssertEqual(try JSONDecoder().decode(ErrorResponse.self, from: body).errorMessage, "boom")
238+
239+
XCTAssertEqual(try server.readInbound(), .end(nil))
240+
241+
XCTAssertNoThrow(try server.writeOutbound(.head(.init(version: .init(major: 1, minor: 1), status: .accepted))))
242+
XCTAssertNoThrow(try server.writeOutbound(.end(nil)))
243+
XCTAssertNoThrow(try result.wait())
244+
}
245+
246+
func testInvocationErrorReport() {
247+
let eventLoopGroup = MultiThreadedEventLoopGroup(numberOfThreads: 1)
248+
defer { XCTAssertNoThrow(try eventLoopGroup.syncShutdownGracefully()) }
249+
250+
let server = NIOHTTP1TestServer(group: eventLoopGroup)
251+
defer { XCTAssertNoThrow(try server.stop()) }
252+
253+
let logger = Logger(label: "TestLogger")
254+
let client = Lambda.RuntimeClient(eventLoop: eventLoopGroup.next(), configuration: .init(address: "127.0.0.1:\(server.serverPort)"))
255+
256+
let header = HTTPHeaders([
257+
(AmazonHeaders.requestID, "test"),
258+
(AmazonHeaders.deadline, String(Date(timeIntervalSinceNow: 60).millisSinceEpoch)),
259+
(AmazonHeaders.invokedFunctionARN, "arn:aws:lambda:us-east-1:123456789012:function:custom-runtime"),
260+
(AmazonHeaders.traceID, "Root=1-5bef4de7-ad49b0e87f6ef6c87fc2e700;Parent=9a9197af755a6419;Sampled=1"),
261+
])
262+
var inv: Lambda.Invocation?
263+
XCTAssertNoThrow(inv = try Lambda.Invocation(headers: header))
264+
guard let invocation = inv else { return }
265+
266+
let result = client.reportResults(logger: logger, invocation: invocation, result: Result.failure(TestError("boom")))
267+
268+
var inboundHeader: HTTPServerRequestPart?
269+
XCTAssertNoThrow(inboundHeader = try server.readInbound())
270+
guard case .head(let head) = try? XCTUnwrap(inboundHeader) else { XCTFail("Expected to get a head first"); return }
271+
XCTAssertEqual(head.headers["lambda-runtime-function-error-type"], ["Unhandled"])
272+
XCTAssertEqual(head.headers["user-agent"], ["Swift-Lambda/Unknown"])
273+
274+
var inboundBody: HTTPServerRequestPart?
275+
XCTAssertNoThrow(inboundBody = try server.readInbound())
276+
guard case .body(let body) = try? XCTUnwrap(inboundBody) else { XCTFail("Expected body after head"); return }
277+
XCTAssertEqual(try JSONDecoder().decode(ErrorResponse.self, from: body).errorMessage, "boom")
278+
279+
XCTAssertEqual(try server.readInbound(), .end(nil))
280+
281+
XCTAssertNoThrow(try server.writeOutbound(.head(.init(version: .init(major: 1, minor: 1), status: .accepted))))
282+
XCTAssertNoThrow(try server.writeOutbound(.end(nil)))
283+
XCTAssertNoThrow(try result.wait())
284+
}
285+
286+
func testInvocationSuccessResponse() {
287+
let eventLoopGroup = MultiThreadedEventLoopGroup(numberOfThreads: 1)
288+
defer { XCTAssertNoThrow(try eventLoopGroup.syncShutdownGracefully()) }
289+
290+
let server = NIOHTTP1TestServer(group: eventLoopGroup)
291+
defer { XCTAssertNoThrow(try server.stop()) }
292+
293+
let logger = Logger(label: "TestLogger")
294+
let client = Lambda.RuntimeClient(eventLoop: eventLoopGroup.next(), configuration: .init(address: "127.0.0.1:\(server.serverPort)"))
295+
296+
let header = HTTPHeaders([
297+
(AmazonHeaders.requestID, "test"),
298+
(AmazonHeaders.deadline, String(Date(timeIntervalSinceNow: 60).millisSinceEpoch)),
299+
(AmazonHeaders.invokedFunctionARN, "arn:aws:lambda:us-east-1:123456789012:function:custom-runtime"),
300+
(AmazonHeaders.traceID, "Root=1-5bef4de7-ad49b0e87f6ef6c87fc2e700;Parent=9a9197af755a6419;Sampled=1"),
301+
])
302+
var inv: Lambda.Invocation?
303+
XCTAssertNoThrow(inv = try Lambda.Invocation(headers: header))
304+
guard let invocation = inv else { return }
305+
306+
let result = client.reportResults(logger: logger, invocation: invocation, result: Result.success(nil))
307+
308+
var inboundHeader: HTTPServerRequestPart?
309+
XCTAssertNoThrow(inboundHeader = try server.readInbound())
310+
guard case .head(let head) = try? XCTUnwrap(inboundHeader) else { XCTFail("Expected to get a head first"); return }
311+
XCTAssertFalse(head.headers.contains(name: "lambda-runtime-function-error-type"))
312+
XCTAssertEqual(head.headers["user-agent"], ["Swift-Lambda/Unknown"])
313+
314+
XCTAssertEqual(try server.readInbound(), .end(nil))
315+
316+
XCTAssertNoThrow(try server.writeOutbound(.head(.init(version: .init(major: 1, minor: 1), status: .accepted))))
317+
XCTAssertNoThrow(try server.writeOutbound(.end(nil)))
318+
XCTAssertNoThrow(try result.wait())
319+
}
320+
212321
class Behavior: LambdaServerBehavior {
213322
var state = 0
214323

0 commit comments

Comments
 (0)