From 8e755ee756abbc01e285ca2cba256155da327a6e Mon Sep 17 00:00:00 2001 From: Gus Cairo Date: Tue, 9 Jan 2024 17:49:26 +0000 Subject: [PATCH 01/14] Add tracing interceptors --- Package.swift | 25 ++++++- .../ClientTracingInterceptor.swift | 62 +++++++++++++++++ .../ServerTracingInterceptor.swift | 64 +++++++++++++++++ .../ClientRPCExecutorTestHarness.swift | 17 +++-- .../Call/TracingInterceptorTests.swift | 69 +++++++++++++++++++ .../Call/TracingTestsUtils.swift | 64 +++++++++++++++++ .../TracingInterceptorTests.swift | 26 +++++++ 7 files changed, 322 insertions(+), 5 deletions(-) create mode 100644 Sources/GRPCInterceptors/ClientTracingInterceptor.swift create mode 100644 Sources/GRPCInterceptors/ServerTracingInterceptor.swift create mode 100644 Tests/GRPCCoreTests/Call/TracingInterceptorTests.swift create mode 100644 Tests/GRPCCoreTests/Call/TracingTestsUtils.swift create mode 100644 Tests/GRPCInterceptorsTests/TracingInterceptorTests.swift diff --git a/Package.swift b/Package.swift index 201ba8e56..15d65ff82 100644 --- a/Package.swift +++ b/Package.swift @@ -72,6 +72,10 @@ let packageDependencies: [Package.Dependency] = [ url: "https://github.com/apple/swift-docc-plugin", from: "1.0.0" ), + .package( + url: "https://github.com/apple/swift-distributed-tracing.git", + from: "1.0.0" + ), ].appending( .package( url: "https://github.com/apple/swift-nio-ssl.git", @@ -131,9 +135,11 @@ extension Target.Dependency { ) static let dequeModule: Self = .product(name: "DequeModule", package: "swift-collections") static let atomics: Self = .product(name: "Atomics", package: "swift-atomics") + static let tracing: Self = .product(name: "Tracing", package: "swift-distributed-tracing") static let grpcCore: Self = .target(name: "GRPCCore") static let grpcInProcessTransport: Self = .target(name: "GRPCInProcessTransport") + static let grpcInterceptors: Self = .target(name: "GRPCInterceptors") static let grpcHTTP2Core: Self = .target(name: "GRPCHTTP2Core") static let grpcHTTP2TransportNIOPosix: Self = .target(name: "GRPCHTTP2TransportNIOPosix") static let grpcHTTP2TransportNIOTransportServices: Self = .target(name: "GRPCHTTP2TransportNIOTransportServices") @@ -181,6 +187,14 @@ extension Target { ] ) + static let grpcInterceptors: Target = .target( + name: "GRPCInterceptors", + dependencies: [ + .grpcCore, + .tracing + ] + ) + static let grpcHTTP2Core: Target = .target( name: "GRPCHTTP2Core", dependencies: [ @@ -265,6 +279,7 @@ extension Target { dependencies: [ .grpcCore, .grpcInProcessTransport, + .grpcInterceptors, .dequeModule, .atomics ] @@ -273,11 +288,17 @@ extension Target { static let grpcInProcessTransportTests: Target = .testTarget( name: "GRPCInProcessTransportTests", dependencies: [ - .grpcCore, .grpcInProcessTransport, ] ) + static let grpcInterceptorsTests: Target = .testTarget( + name: "GRPCInterceptorsTests", + dependencies: [ + .grpcInterceptors + ] + ) + static let grpcHTTP2CoreTests: Target = .testTarget( name: "GRPCHTTP2CoreTests", dependencies: [ @@ -638,6 +659,7 @@ let package = Package( .grpcCore, .grpcInProcessTransport, .grpcCodeGen, + .grpcInterceptors, .grpcHTTP2Core, .grpcHTTP2TransportNIOPosix, .grpcHTTP2TransportNIOTransportServices, @@ -646,6 +668,7 @@ let package = Package( .grpcCoreTests, .grpcInProcessTransportTests, .grpcCodeGenTests, + .grpcInterceptorsTests .grpcHTTP2CoreTests, .grpcHTTP2TransportNIOPosixTests, .grpcHTTP2TransportNIOTransportServicesTests diff --git a/Sources/GRPCInterceptors/ClientTracingInterceptor.swift b/Sources/GRPCInterceptors/ClientTracingInterceptor.swift new file mode 100644 index 000000000..fe58dc188 --- /dev/null +++ b/Sources/GRPCInterceptors/ClientTracingInterceptor.swift @@ -0,0 +1,62 @@ +/* + * Copyright 2024, gRPC Authors All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import GRPCCore +import Tracing + +/// A client interceptor that injects tracing information into the request. +/// +/// The tracing information will be taken from the current `ServiceContext`, and injected into the request's +/// metadata, to be picked up by the server-side ``ServerTracingInterceptor``. +/// For more information, refer to the documentation for `swift-distributed-tracing`. +@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *) +public struct ClientTracingInterceptor: ClientInterceptor { + + /// Create a new instance of a ``ClientTracingInterceptor``. + public init() {} + + /// This interceptor will inject as the request's metadata whatever `ServiceContext` key/value pairs + /// have been made available by the tracing implementation bootstrapped in your application. + /// + /// Which key/value pairs are injected will depend on the specific tracing implementation + /// that has been configured when bootstrapping `swift-distributed-tracing` in your application. + public func intercept( + request: ClientRequest.Stream, + context: ClientInterceptorContext, + next: @Sendable (ClientRequest.Stream, ClientInterceptorContext) async throws -> ClientResponse.Stream + ) async throws -> ClientResponse.Stream where Input: Sendable, Output: Sendable { + var request = request + if let serviceContext = ServiceContext.current { + InstrumentationSystem.instrument.inject( + serviceContext, + into: &request, + using: ClientRequestInjector() + ) + } + return try await next(request, context) + } +} + +/// An injector responsible for injecting the required instrumentation keys from the `ServiceContext` into +/// the request metadata. +@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *) +struct ClientRequestInjector: Instrumentation.Injector { + typealias Carrier = ClientRequest.Stream + + func inject(_ value: String, forKey key: String, into carrier: inout Carrier) { + carrier.metadata.addString(value, forKey: key) + } +} diff --git a/Sources/GRPCInterceptors/ServerTracingInterceptor.swift b/Sources/GRPCInterceptors/ServerTracingInterceptor.swift new file mode 100644 index 000000000..e4ccb7ccb --- /dev/null +++ b/Sources/GRPCInterceptors/ServerTracingInterceptor.swift @@ -0,0 +1,64 @@ +/* + * Copyright 2024, gRPC Authors All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import GRPCCore +import Tracing + +/// A server interceptor that extracts tracing information from the request. +/// +/// The extracted tracing information will be made available to user code via the current `ServiceContext`. +/// For more information, refer to the documentation for `swift-distributed-tracing`. +@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *) +public struct ServerTracingInterceptor: ServerInterceptor { + + /// Create a new instance of a ``ServerTracingInterceptor``. + public init() {} + + /// This interceptor will extract whatever `ServiceContext` key/value pairs have been inserted into the + /// request's metadata, and will make them available to user code via the `ServiceContext/current` + /// context. + /// + /// Which key/value pairs are extracted and made available will depend on the specific tracing implementation + /// that has been configured when bootstrapping `swift-distributed-tracing` in your application. + public func intercept( + request: ServerRequest.Stream, + context: ServerInterceptorContext, + next: @Sendable (ServerRequest.Stream, ServerInterceptorContext) async throws -> ServerResponse.Stream + ) async throws -> ServerResponse.Stream where Input : Sendable, Output : Sendable { + var serviceContext = ServiceContext.topLevel + InstrumentationSystem.instrument.extract( + request, + into: &serviceContext, + using: ServerRequestExtractor() + ) + + return try await ServiceContext.withValue(serviceContext) { + try await next(request, context) + } + } +} + +/// An extractor responsible for extracting the required instrumentation keys from request metadata. +@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *) +struct ServerRequestExtractor: Instrumentation.Extractor { + typealias Carrier = ServerRequest.Stream + + func extract(key: String, from carrier: Carrier) -> String? { + var values = carrier.metadata[stringValues: key].makeIterator() + // There should only be one value for each key. If more, pick just one. + return values.next() + } +} diff --git a/Tests/GRPCCoreTests/Call/Client/Internal/ClientRPCExecutorTestSupport/ClientRPCExecutorTestHarness.swift b/Tests/GRPCCoreTests/Call/Client/Internal/ClientRPCExecutorTestSupport/ClientRPCExecutorTestHarness.swift index 9aacff2d7..3e7aa6159 100644 --- a/Tests/GRPCCoreTests/Call/Client/Internal/ClientRPCExecutorTestSupport/ClientRPCExecutorTestHarness.swift +++ b/Tests/GRPCCoreTests/Call/Client/Internal/ClientRPCExecutorTestSupport/ClientRPCExecutorTestHarness.swift @@ -69,11 +69,13 @@ struct ClientRPCExecutorTestHarness { func unary( request: ClientRequest.Single<[UInt8]>, configuration: MethodConfiguration? = nil, + interceptors: [any ClientInterceptor] = [], handler: @escaping @Sendable (ClientResponse.Single<[UInt8]>) async throws -> Void ) async throws { try await self.bidirectional( request: ClientRequest.Stream(single: request), - configuration: configuration + configuration: configuration, + interceptors: interceptors ) { response in try await handler(ClientResponse.Single(stream: response)) } @@ -82,11 +84,13 @@ struct ClientRPCExecutorTestHarness { func clientStreaming( request: ClientRequest.Stream<[UInt8]>, configuration: MethodConfiguration? = nil, + interceptors: [any ClientInterceptor] = [], handler: @escaping @Sendable (ClientResponse.Single<[UInt8]>) async throws -> Void ) async throws { try await self.bidirectional( request: request, - configuration: configuration + configuration: configuration, + interceptors: interceptors ) { response in try await handler(ClientResponse.Single(stream: response)) } @@ -95,11 +99,13 @@ struct ClientRPCExecutorTestHarness { func serverStreaming( request: ClientRequest.Single<[UInt8]>, configuration: MethodConfiguration? = nil, + interceptors: [any ClientInterceptor] = [], handler: @escaping @Sendable (ClientResponse.Stream<[UInt8]>) async throws -> Void ) async throws { try await self.bidirectional( request: ClientRequest.Stream(single: request), - configuration: configuration + configuration: configuration, + interceptors: interceptors ) { response in try await handler(response) } @@ -108,6 +114,7 @@ struct ClientRPCExecutorTestHarness { func bidirectional( request: ClientRequest.Stream<[UInt8]>, configuration: MethodConfiguration? = nil, + interceptors: [any ClientInterceptor] = [], handler: @escaping @Sendable (ClientResponse.Stream<[UInt8]>) async throws -> Void ) async throws { try await self.execute( @@ -115,6 +122,7 @@ struct ClientRPCExecutorTestHarness { serializer: IdentitySerializer(), deserializer: IdentityDeserializer(), configuration: configuration, + interceptors: interceptors, handler: handler ) } @@ -124,6 +132,7 @@ struct ClientRPCExecutorTestHarness { serializer: some MessageSerializer, deserializer: some MessageDeserializer, configuration: MethodConfiguration?, + interceptors: [any ClientInterceptor], handler: @escaping @Sendable (ClientResponse.Stream) async throws -> Void ) async throws { try await withThrowingTaskGroup(of: Void.self) { group in @@ -160,7 +169,7 @@ struct ClientRPCExecutorTestHarness { serializer: serializer, deserializer: deserializer, transport: self.clientTransport, - interceptors: [], + interceptors: interceptors, handler: handler ) diff --git a/Tests/GRPCCoreTests/Call/TracingInterceptorTests.swift b/Tests/GRPCCoreTests/Call/TracingInterceptorTests.swift new file mode 100644 index 000000000..0c47f88b1 --- /dev/null +++ b/Tests/GRPCCoreTests/Call/TracingInterceptorTests.swift @@ -0,0 +1,69 @@ +/* + * Copyright 2024, gRPC Authors All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import GRPCInterceptors +import Tracing +import GRPCCore +import XCTest + +@available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *) +final class ClientTracingInterceptorTests: TracingTestCase { + func testClientInterceptor() async throws { + var serviceContext = ServiceContext.topLevel + let traceIDString = UUID().uuidString + serviceContext.traceID = traceIDString + try await ServiceContext.withValue(serviceContext) { + let tester = ClientRPCExecutorTestHarness(server: .echo) + try await tester.unary( + request: ClientRequest.Single(message: [1, 2, 3], metadata: ["foo": "bar"]), + interceptors: [ClientTracingInterceptor()] + ) { response in + XCTAssertEqual( + response.metadata, + [ + "foo": "bar", + "trace-id": "\(traceIDString)" + ] + ) + XCTAssertEqual(try response.message, [1, 2, 3]) + } + + XCTAssertEqual(tester.clientStreamsOpened, 1) + XCTAssertEqual(tester.serverStreamsAccepted, 1) + } + } + + func testServerInterceptor() async throws { + let harness = ServerRPCExecutorTestHarness(interceptors: [ServerTracingInterceptor()]) + try await harness.execute( + deserializer: IdentityDeserializer(), + serializer: IdentitySerializer() + ) { request in + guard let serviceContext = ServiceContext.current else { + XCTFail("There should be a service context present.") + return .init(error: .init(status: .init(code: .failedPrecondition, message: "There should be a service context present."))!) + } + + let traceID = serviceContext.traceID + XCTAssertEqual("some-trace-id", traceID) + + return .init(accepted: .success(.init(metadata: [], producer: { _ in [] }))) + } producer: { inbound in + try await inbound.write(.metadata(["trace-id": "some-trace-id"])) + inbound.finish() + } consumer: { _ in } + } +} diff --git a/Tests/GRPCCoreTests/Call/TracingTestsUtils.swift b/Tests/GRPCCoreTests/Call/TracingTestsUtils.swift new file mode 100644 index 000000000..44b11ad52 --- /dev/null +++ b/Tests/GRPCCoreTests/Call/TracingTestsUtils.swift @@ -0,0 +1,64 @@ +/* + * Copyright 2024, gRPC Authors All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import Tracing +import XCTest + +class TracingTestCase: XCTestCase { + override class func setUp() { + InstrumentationSystem.bootstrap(TestTracer()) + } +} + +public struct TestTracer: Instrumentation.Instrument { + public func extract( + _ carrier: Carrier, + into context: inout ServiceContextModule.ServiceContext, + using extractor: Extract + ) where Carrier == Extract.Carrier, Extract: Instrumentation.Extractor { + let traceID = extractor.extract(key: TraceID.keyName, from: carrier) + context[TraceID.self] = traceID + } + + public func inject( + _ context: ServiceContextModule.ServiceContext, + into carrier: inout Carrier, + using injector: Inject + ) where Carrier == Inject.Carrier, Inject: Instrumentation.Injector { + if let traceID = context.traceID { + injector.inject(traceID, forKey: TraceID.keyName, into: &carrier) + } + } +} + + +internal enum TraceID: ServiceContextModule.ServiceContextKey { + typealias Value = String + + static let keyName = "trace-id" +} + +extension ServiceContext { + var traceID: String? { + get { + self[TraceID.self] + } + set { + self[TraceID.self] = newValue + } + } +} + diff --git a/Tests/GRPCInterceptorsTests/TracingInterceptorTests.swift b/Tests/GRPCInterceptorsTests/TracingInterceptorTests.swift new file mode 100644 index 000000000..b63ccd7fa --- /dev/null +++ b/Tests/GRPCInterceptorsTests/TracingInterceptorTests.swift @@ -0,0 +1,26 @@ +/* + * Copyright 2024, gRPC Authors All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +import GRPCCore +import Tracing +import XCTest + +@testable import GRPCInterceptors + +@available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *) +final class TracingInterceptorTests: XCTestCase { + // TODO: move interceptor tests from GRPCCoreTests to here once we've + // split out test utils/harness into their own target. +} From 0df458be34985088d98275a642e7cffd8370361d Mon Sep 17 00:00:00 2001 From: Gus Cairo Date: Wed, 10 Jan 2024 15:58:57 +0000 Subject: [PATCH 02/14] Formatting --- .../ClientTracingInterceptor.swift | 11 +++++----- .../ServerTracingInterceptor.swift | 13 ++++++----- .../Call/TracingInterceptorTests.swift | 22 +++++++++++++------ .../Call/TracingTestsUtils.swift | 6 ++--- 4 files changed, 30 insertions(+), 22 deletions(-) diff --git a/Sources/GRPCInterceptors/ClientTracingInterceptor.swift b/Sources/GRPCInterceptors/ClientTracingInterceptor.swift index fe58dc188..5e071a5f3 100644 --- a/Sources/GRPCInterceptors/ClientTracingInterceptor.swift +++ b/Sources/GRPCInterceptors/ClientTracingInterceptor.swift @@ -24,11 +24,11 @@ import Tracing /// For more information, refer to the documentation for `swift-distributed-tracing`. @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *) public struct ClientTracingInterceptor: ClientInterceptor { - + /// Create a new instance of a ``ClientTracingInterceptor``. public init() {} - - /// This interceptor will inject as the request's metadata whatever `ServiceContext` key/value pairs + + /// This interceptor will inject as the request's metadata whatever `ServiceContext` key/value pairs /// have been made available by the tracing implementation bootstrapped in your application. /// /// Which key/value pairs are injected will depend on the specific tracing implementation @@ -36,7 +36,8 @@ public struct ClientTracingInterceptor: ClientInterceptor { public func intercept( request: ClientRequest.Stream, context: ClientInterceptorContext, - next: @Sendable (ClientRequest.Stream, ClientInterceptorContext) async throws -> ClientResponse.Stream + next: @Sendable (ClientRequest.Stream, ClientInterceptorContext) async throws -> + ClientResponse.Stream ) async throws -> ClientResponse.Stream where Input: Sendable, Output: Sendable { var request = request if let serviceContext = ServiceContext.current { @@ -55,7 +56,7 @@ public struct ClientTracingInterceptor: ClientInterceptor { @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *) struct ClientRequestInjector: Instrumentation.Injector { typealias Carrier = ClientRequest.Stream - + func inject(_ value: String, forKey key: String, into carrier: inout Carrier) { carrier.metadata.addString(value, forKey: key) } diff --git a/Sources/GRPCInterceptors/ServerTracingInterceptor.swift b/Sources/GRPCInterceptors/ServerTracingInterceptor.swift index e4ccb7ccb..8d973e3b3 100644 --- a/Sources/GRPCInterceptors/ServerTracingInterceptor.swift +++ b/Sources/GRPCInterceptors/ServerTracingInterceptor.swift @@ -23,10 +23,10 @@ import Tracing /// For more information, refer to the documentation for `swift-distributed-tracing`. @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *) public struct ServerTracingInterceptor: ServerInterceptor { - + /// Create a new instance of a ``ServerTracingInterceptor``. public init() {} - + /// This interceptor will extract whatever `ServiceContext` key/value pairs have been inserted into the /// request's metadata, and will make them available to user code via the `ServiceContext/current` /// context. @@ -36,15 +36,16 @@ public struct ServerTracingInterceptor: ServerInterceptor { public func intercept( request: ServerRequest.Stream, context: ServerInterceptorContext, - next: @Sendable (ServerRequest.Stream, ServerInterceptorContext) async throws -> ServerResponse.Stream - ) async throws -> ServerResponse.Stream where Input : Sendable, Output : Sendable { + next: @Sendable (ServerRequest.Stream, ServerInterceptorContext) async throws -> + ServerResponse.Stream + ) async throws -> ServerResponse.Stream where Input: Sendable, Output: Sendable { var serviceContext = ServiceContext.topLevel InstrumentationSystem.instrument.extract( request, into: &serviceContext, using: ServerRequestExtractor() ) - + return try await ServiceContext.withValue(serviceContext) { try await next(request, context) } @@ -55,7 +56,7 @@ public struct ServerTracingInterceptor: ServerInterceptor { @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *) struct ServerRequestExtractor: Instrumentation.Extractor { typealias Carrier = ServerRequest.Stream - + func extract(key: String, from carrier: Carrier) -> String? { var values = carrier.metadata[stringValues: key].makeIterator() // There should only be one value for each key. If more, pick just one. diff --git a/Tests/GRPCCoreTests/Call/TracingInterceptorTests.swift b/Tests/GRPCCoreTests/Call/TracingInterceptorTests.swift index 0c47f88b1..ddd5f3548 100644 --- a/Tests/GRPCCoreTests/Call/TracingInterceptorTests.swift +++ b/Tests/GRPCCoreTests/Call/TracingInterceptorTests.swift @@ -14,9 +14,9 @@ * limitations under the License. */ +import GRPCCore import GRPCInterceptors import Tracing -import GRPCCore import XCTest @available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *) @@ -35,7 +35,7 @@ final class ClientTracingInterceptorTests: TracingTestCase { response.metadata, [ "foo": "bar", - "trace-id": "\(traceIDString)" + "trace-id": "\(traceIDString)", ] ) XCTAssertEqual(try response.message, [1, 2, 3]) @@ -45,7 +45,7 @@ final class ClientTracingInterceptorTests: TracingTestCase { XCTAssertEqual(tester.serverStreamsAccepted, 1) } } - + func testServerInterceptor() async throws { let harness = ServerRPCExecutorTestHarness(interceptors: [ServerTracingInterceptor()]) try await harness.execute( @@ -54,16 +54,24 @@ final class ClientTracingInterceptorTests: TracingTestCase { ) { request in guard let serviceContext = ServiceContext.current else { XCTFail("There should be a service context present.") - return .init(error: .init(status: .init(code: .failedPrecondition, message: "There should be a service context present."))!) + return .init( + error: .init( + status: .init( + code: .failedPrecondition, + message: "There should be a service context present." + ) + )! + ) } - + let traceID = serviceContext.traceID XCTAssertEqual("some-trace-id", traceID) - + return .init(accepted: .success(.init(metadata: [], producer: { _ in [] }))) } producer: { inbound in try await inbound.write(.metadata(["trace-id": "some-trace-id"])) inbound.finish() - } consumer: { _ in } + } consumer: { _ in + } } } diff --git a/Tests/GRPCCoreTests/Call/TracingTestsUtils.swift b/Tests/GRPCCoreTests/Call/TracingTestsUtils.swift index 44b11ad52..9e9ffbd2a 100644 --- a/Tests/GRPCCoreTests/Call/TracingTestsUtils.swift +++ b/Tests/GRPCCoreTests/Call/TracingTestsUtils.swift @@ -32,7 +32,7 @@ public struct TestTracer: Instrumentation.Instrument { let traceID = extractor.extract(key: TraceID.keyName, from: carrier) context[TraceID.self] = traceID } - + public func inject( _ context: ServiceContextModule.ServiceContext, into carrier: inout Carrier, @@ -44,10 +44,9 @@ public struct TestTracer: Instrumentation.Instrument { } } - internal enum TraceID: ServiceContextModule.ServiceContextKey { typealias Value = String - + static let keyName = "trace-id" } @@ -61,4 +60,3 @@ extension ServiceContext { } } } - From f242b4dc5d1da83ee6ee318721c77e927b347ab7 Mon Sep 17 00:00:00 2001 From: Gus Cairo Date: Fri, 12 Jan 2024 13:24:29 +0000 Subject: [PATCH 03/14] Add spans --- .../ClientTracingInterceptor.swift | 21 +++-- .../ServerTracingInterceptor.swift | 13 ++- .../Call/TracingTestsUtils.swift | 83 ++++++++++++++++++- 3 files changed, 105 insertions(+), 12 deletions(-) diff --git a/Sources/GRPCInterceptors/ClientTracingInterceptor.swift b/Sources/GRPCInterceptors/ClientTracingInterceptor.swift index 5e071a5f3..1b587a605 100644 --- a/Sources/GRPCInterceptors/ClientTracingInterceptor.swift +++ b/Sources/GRPCInterceptors/ClientTracingInterceptor.swift @@ -40,14 +40,21 @@ public struct ClientTracingInterceptor: ClientInterceptor { ClientResponse.Stream ) async throws -> ClientResponse.Stream where Input: Sendable, Output: Sendable { var request = request - if let serviceContext = ServiceContext.current { - InstrumentationSystem.instrument.inject( - serviceContext, - into: &request, - using: ClientRequestInjector() - ) + let tracer = InstrumentationSystem.tracer + let serviceContext = ServiceContext.current ?? .topLevel + + tracer.inject( + serviceContext, + into: &request, + using: ClientRequestInjector() + ) + + return try await tracer.withSpan(context.descriptor.fullyQualifiedMethod, context: serviceContext, ofKind: .client) { span in + span.addEvent("Sending request") + let response = try await next(request, context) + span.addEvent("Received response") + return response } - return try await next(request, context) } } diff --git a/Sources/GRPCInterceptors/ServerTracingInterceptor.swift b/Sources/GRPCInterceptors/ServerTracingInterceptor.swift index 8d973e3b3..7e78c076a 100644 --- a/Sources/GRPCInterceptors/ServerTracingInterceptor.swift +++ b/Sources/GRPCInterceptors/ServerTracingInterceptor.swift @@ -40,14 +40,19 @@ public struct ServerTracingInterceptor: ServerInterceptor { ServerResponse.Stream ) async throws -> ServerResponse.Stream where Input: Sendable, Output: Sendable { var serviceContext = ServiceContext.topLevel - InstrumentationSystem.instrument.extract( + let tracer = InstrumentationSystem.tracer + + tracer.extract( request, into: &serviceContext, using: ServerRequestExtractor() ) - - return try await ServiceContext.withValue(serviceContext) { - try await next(request, context) + + return try await tracer.withSpan(context.descriptor.fullyQualifiedMethod, context: serviceContext, ofKind: .server) { span in + span.addEvent("Received request") + let response = try await next(request, context) + span.addEvent("Sending response") + return response } } } diff --git a/Tests/GRPCCoreTests/Call/TracingTestsUtils.swift b/Tests/GRPCCoreTests/Call/TracingTestsUtils.swift index 9e9ffbd2a..238c00dad 100644 --- a/Tests/GRPCCoreTests/Call/TracingTestsUtils.swift +++ b/Tests/GRPCCoreTests/Call/TracingTestsUtils.swift @@ -23,7 +23,9 @@ class TracingTestCase: XCTestCase { } } -public struct TestTracer: Instrumentation.Instrument { +public struct TestTracer: Tracer { + public typealias Span = TestSpan + public func extract( _ carrier: Carrier, into context: inout ServiceContextModule.ServiceContext, @@ -42,6 +44,70 @@ public struct TestTracer: Instrumentation.Instrument { injector.inject(traceID, forKey: TraceID.keyName, into: &carrier) } } + + public func forceFlush() { + // no-op + } + + public func startSpan( + _ operationName: String, + context: @autoclosure () -> ServiceContext, + ofKind kind: SpanKind, + at instant: @autoclosure () -> Instant, + function: String, + file fileID: String, + line: UInt + ) -> TestSpan where Instant: TracerInstant { + TestSpan(context: context(), operationName: operationName) + } +} + +public class TestSpan: Span { + public var context: ServiceContextModule.ServiceContext + public var operationName: String + public var attributes: Tracing.SpanAttributes + public var isRecording: Bool + public private(set) var status: Tracing.SpanStatus? + public private(set) var events: [Tracing.SpanEvent] = [] + + public init( + context: ServiceContextModule.ServiceContext, + operationName: String, + attributes: Tracing.SpanAttributes = [:], + isRecording: Bool = true + ) { + self.context = context + self.operationName = operationName + self.attributes = attributes + self.isRecording = isRecording + } + + public func setStatus(_ status: Tracing.SpanStatus) { + self.status = status + } + + public func addEvent(_ event: Tracing.SpanEvent) { + self.events.append(event) + } + + public func recordError( + _ error: any Error, + attributes: Tracing.SpanAttributes, + at instant: @autoclosure () -> Instant + ) where Instant: Tracing.TracerInstant { + self.setStatus(.init( + code: .error, + message: "Error: \(error), attributes: \(attributes), at instant: \(instant())" + )) + } + + public func addLink(_ link: Tracing.SpanLink) { + self.context.spanLinks?.append(link) + } + + public func end(at instant: @autoclosure () -> Instant) where Instant: Tracing.TracerInstant { + self.setStatus(.init(code: .ok, message: "Ended at instant: \(instant())")) + } } internal enum TraceID: ServiceContextModule.ServiceContextKey { @@ -50,6 +116,12 @@ internal enum TraceID: ServiceContextModule.ServiceContextKey { static let keyName = "trace-id" } +internal enum ServiceContextSpanLinksKey: ServiceContextModule.ServiceContextKey { + typealias Value = [SpanLink] + + static let keyName = "span-links" +} + extension ServiceContext { var traceID: String? { get { @@ -59,4 +131,13 @@ extension ServiceContext { self[TraceID.self] = newValue } } + + var spanLinks: [SpanLink]? { + get { + self[ServiceContextSpanLinksKey.self] + } + set { + self[ServiceContextSpanLinksKey.self] = newValue + } + } } From fa9eabfd1e513255e630cdefd8b955b91b25815d Mon Sep 17 00:00:00 2001 From: Gus Cairo Date: Fri, 12 Jan 2024 13:33:33 +0000 Subject: [PATCH 04/14] PR changes --- Package.swift | 3 ++- Sources/GRPCInterceptors/ClientTracingInterceptor.swift | 7 ++++--- Sources/GRPCInterceptors/ServerTracingInterceptor.swift | 6 +++--- 3 files changed, 9 insertions(+), 7 deletions(-) diff --git a/Package.swift b/Package.swift index 15d65ff82..51079c5f7 100644 --- a/Package.swift +++ b/Package.swift @@ -288,7 +288,8 @@ extension Target { static let grpcInProcessTransportTests: Target = .testTarget( name: "GRPCInProcessTransportTests", dependencies: [ - .grpcInProcessTransport, + .grpcCore, + .grpcInProcessTransport ] ) diff --git a/Sources/GRPCInterceptors/ClientTracingInterceptor.swift b/Sources/GRPCInterceptors/ClientTracingInterceptor.swift index 1b587a605..512bc8a87 100644 --- a/Sources/GRPCInterceptors/ClientTracingInterceptor.swift +++ b/Sources/GRPCInterceptors/ClientTracingInterceptor.swift @@ -19,8 +19,9 @@ import Tracing /// A client interceptor that injects tracing information into the request. /// -/// The tracing information will be taken from the current `ServiceContext`, and injected into the request's +/// The tracing information is taken from the current `ServiceContext`, and injected into the request's /// metadata, to be picked up by the server-side ``ServerTracingInterceptor``. +/// /// For more information, refer to the documentation for `swift-distributed-tracing`. @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *) public struct ClientTracingInterceptor: ClientInterceptor { @@ -28,10 +29,10 @@ public struct ClientTracingInterceptor: ClientInterceptor { /// Create a new instance of a ``ClientTracingInterceptor``. public init() {} - /// This interceptor will inject as the request's metadata whatever `ServiceContext` key/value pairs + /// This interceptor will inject as the request's metadata whatever `ServiceContext` key-value pairs /// have been made available by the tracing implementation bootstrapped in your application. /// - /// Which key/value pairs are injected will depend on the specific tracing implementation + /// Which key-value pairs are injected will depend on the specific tracing implementation /// that has been configured when bootstrapping `swift-distributed-tracing` in your application. public func intercept( request: ClientRequest.Stream, diff --git a/Sources/GRPCInterceptors/ServerTracingInterceptor.swift b/Sources/GRPCInterceptors/ServerTracingInterceptor.swift index 7e78c076a..b377f543d 100644 --- a/Sources/GRPCInterceptors/ServerTracingInterceptor.swift +++ b/Sources/GRPCInterceptors/ServerTracingInterceptor.swift @@ -19,7 +19,7 @@ import Tracing /// A server interceptor that extracts tracing information from the request. /// -/// The extracted tracing information will be made available to user code via the current `ServiceContext`. +/// The extracted tracing information is made available to user code via the current `ServiceContext`. /// For more information, refer to the documentation for `swift-distributed-tracing`. @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *) public struct ServerTracingInterceptor: ServerInterceptor { @@ -27,11 +27,11 @@ public struct ServerTracingInterceptor: ServerInterceptor { /// Create a new instance of a ``ServerTracingInterceptor``. public init() {} - /// This interceptor will extract whatever `ServiceContext` key/value pairs have been inserted into the + /// This interceptor will extract whatever `ServiceContext` key-value pairs have been inserted into the /// request's metadata, and will make them available to user code via the `ServiceContext/current` /// context. /// - /// Which key/value pairs are extracted and made available will depend on the specific tracing implementation + /// Which key-value pairs are extracted and made available will depend on the specific tracing implementation /// that has been configured when bootstrapping `swift-distributed-tracing` in your application. public func intercept( request: ServerRequest.Stream, From fa3332be6e730dc015c402f28c51facf920f3980 Mon Sep 17 00:00:00 2001 From: Gus Cairo Date: Tue, 16 Jan 2024 13:15:28 +0000 Subject: [PATCH 05/14] Move tests out of GRPCCore --- Package.swift | 2 +- .../Call/TracingInterceptorTests.swift | 77 ------------------- .../TracingInterceptorTests.swift | 43 ++++++++++- .../TracingTestsUtils.swift | 0 4 files changed, 41 insertions(+), 81 deletions(-) delete mode 100644 Tests/GRPCCoreTests/Call/TracingInterceptorTests.swift rename Tests/{GRPCCoreTests/Call => GRPCInterceptorsTests}/TracingTestsUtils.swift (100%) diff --git a/Package.swift b/Package.swift index 51079c5f7..4c77e0cd1 100644 --- a/Package.swift +++ b/Package.swift @@ -279,7 +279,6 @@ extension Target { dependencies: [ .grpcCore, .grpcInProcessTransport, - .grpcInterceptors, .dequeModule, .atomics ] @@ -296,6 +295,7 @@ extension Target { static let grpcInterceptorsTests: Target = .testTarget( name: "GRPCInterceptorsTests", dependencies: [ + .grpcCore, .grpcInterceptors ] ) diff --git a/Tests/GRPCCoreTests/Call/TracingInterceptorTests.swift b/Tests/GRPCCoreTests/Call/TracingInterceptorTests.swift deleted file mode 100644 index ddd5f3548..000000000 --- a/Tests/GRPCCoreTests/Call/TracingInterceptorTests.swift +++ /dev/null @@ -1,77 +0,0 @@ -/* - * Copyright 2024, gRPC Authors All rights reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -import GRPCCore -import GRPCInterceptors -import Tracing -import XCTest - -@available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *) -final class ClientTracingInterceptorTests: TracingTestCase { - func testClientInterceptor() async throws { - var serviceContext = ServiceContext.topLevel - let traceIDString = UUID().uuidString - serviceContext.traceID = traceIDString - try await ServiceContext.withValue(serviceContext) { - let tester = ClientRPCExecutorTestHarness(server: .echo) - try await tester.unary( - request: ClientRequest.Single(message: [1, 2, 3], metadata: ["foo": "bar"]), - interceptors: [ClientTracingInterceptor()] - ) { response in - XCTAssertEqual( - response.metadata, - [ - "foo": "bar", - "trace-id": "\(traceIDString)", - ] - ) - XCTAssertEqual(try response.message, [1, 2, 3]) - } - - XCTAssertEqual(tester.clientStreamsOpened, 1) - XCTAssertEqual(tester.serverStreamsAccepted, 1) - } - } - - func testServerInterceptor() async throws { - let harness = ServerRPCExecutorTestHarness(interceptors: [ServerTracingInterceptor()]) - try await harness.execute( - deserializer: IdentityDeserializer(), - serializer: IdentitySerializer() - ) { request in - guard let serviceContext = ServiceContext.current else { - XCTFail("There should be a service context present.") - return .init( - error: .init( - status: .init( - code: .failedPrecondition, - message: "There should be a service context present." - ) - )! - ) - } - - let traceID = serviceContext.traceID - XCTAssertEqual("some-trace-id", traceID) - - return .init(accepted: .success(.init(metadata: [], producer: { _ in [] }))) - } producer: { inbound in - try await inbound.write(.metadata(["trace-id": "some-trace-id"])) - inbound.finish() - } consumer: { _ in - } - } -} diff --git a/Tests/GRPCInterceptorsTests/TracingInterceptorTests.swift b/Tests/GRPCInterceptorsTests/TracingInterceptorTests.swift index b63ccd7fa..529085a9d 100644 --- a/Tests/GRPCInterceptorsTests/TracingInterceptorTests.swift +++ b/Tests/GRPCInterceptorsTests/TracingInterceptorTests.swift @@ -20,7 +20,44 @@ import XCTest @testable import GRPCInterceptors @available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *) -final class TracingInterceptorTests: XCTestCase { - // TODO: move interceptor tests from GRPCCoreTests to here once we've - // split out test utils/harness into their own target. +final class TracingInterceptorTests: TracingTestCase { + func testClientInterceptor() async throws { + var serviceContext = ServiceContext.topLevel + let traceIDString = UUID().uuidString + let interceptor = ClientTracingInterceptor() + + serviceContext.traceID = traceIDString + try await ServiceContext.withValue(serviceContext) { + let response = try await interceptor.intercept( + request: .init(producer: { writer in try await writer.write(["request"])}), + context: .init(descriptor: .init(service: "foo", method: "bar"))) { stream, _ in + XCTAssertEqual(stream.metadata, ["trace-id": "\(traceIDString)"]) + return .init(metadata: [], bodyParts: .init(wrapping: AsyncStream(unfolding: { .message(["response"]) }))) + } + + var messages = response.messages.makeAsyncIterator() + let message = try await messages.next() + XCTAssertEqual(message, ["response"]) + } + } + + func testServerInterceptor() async throws { + let interceptor = ServerTracingInterceptor() + let response = try await interceptor.intercept( + request: .init(single: .init(metadata: [], message: [])), + context: .init(descriptor: .init(service: "foo", method: "bar"))) { _, _ in + return .init(accepted: .success(.init(metadata: [], producer: { writer in + guard let serviceContext = ServiceContext.current else { + XCTFail("There should be a service context present.") + return [] + } + + let traceID = serviceContext.traceID + XCTAssertEqual("some-trace-id", traceID) + + try await writer.write("response") + return [] + }))) + } + } } diff --git a/Tests/GRPCCoreTests/Call/TracingTestsUtils.swift b/Tests/GRPCInterceptorsTests/TracingTestsUtils.swift similarity index 100% rename from Tests/GRPCCoreTests/Call/TracingTestsUtils.swift rename to Tests/GRPCInterceptorsTests/TracingTestsUtils.swift From f6ff4a1ec3c9f82240c0a3cf2de2f8b19bcd2672 Mon Sep 17 00:00:00 2001 From: Gus Cairo Date: Tue, 16 Jan 2024 14:59:08 +0000 Subject: [PATCH 06/14] Create injector/extractor in interceptor inits --- .../ClientTracingInterceptor.swift | 15 +++++++++------ .../ServerTracingInterceptor.swift | 13 ++++++++----- 2 files changed, 17 insertions(+), 11 deletions(-) diff --git a/Sources/GRPCInterceptors/ClientTracingInterceptor.swift b/Sources/GRPCInterceptors/ClientTracingInterceptor.swift index 512bc8a87..57e4f07e0 100644 --- a/Sources/GRPCInterceptors/ClientTracingInterceptor.swift +++ b/Sources/GRPCInterceptors/ClientTracingInterceptor.swift @@ -25,9 +25,12 @@ import Tracing /// For more information, refer to the documentation for `swift-distributed-tracing`. @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *) public struct ClientTracingInterceptor: ClientInterceptor { + private let injector: ClientRequestInjector /// Create a new instance of a ``ClientTracingInterceptor``. - public init() {} + public init() { + self.injector = ClientRequestInjector() + } /// This interceptor will inject as the request's metadata whatever `ServiceContext` key-value pairs /// have been made available by the tracing implementation bootstrapped in your application. @@ -46,8 +49,8 @@ public struct ClientTracingInterceptor: ClientInterceptor { tracer.inject( serviceContext, - into: &request, - using: ClientRequestInjector() + into: &request.metadata, + using: self.injector ) return try await tracer.withSpan(context.descriptor.fullyQualifiedMethod, context: serviceContext, ofKind: .client) { span in @@ -62,10 +65,10 @@ public struct ClientTracingInterceptor: ClientInterceptor { /// An injector responsible for injecting the required instrumentation keys from the `ServiceContext` into /// the request metadata. @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *) -struct ClientRequestInjector: Instrumentation.Injector { - typealias Carrier = ClientRequest.Stream +struct ClientRequestInjector: Instrumentation.Injector { + typealias Carrier = Metadata func inject(_ value: String, forKey key: String, into carrier: inout Carrier) { - carrier.metadata.addString(value, forKey: key) + carrier.addString(value, forKey: key) } } diff --git a/Sources/GRPCInterceptors/ServerTracingInterceptor.swift b/Sources/GRPCInterceptors/ServerTracingInterceptor.swift index b377f543d..2959184a3 100644 --- a/Sources/GRPCInterceptors/ServerTracingInterceptor.swift +++ b/Sources/GRPCInterceptors/ServerTracingInterceptor.swift @@ -23,9 +23,12 @@ import Tracing /// For more information, refer to the documentation for `swift-distributed-tracing`. @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *) public struct ServerTracingInterceptor: ServerInterceptor { + private let extractor: ServerRequestExtractor /// Create a new instance of a ``ServerTracingInterceptor``. - public init() {} + public init() { + self.extractor = ServerRequestExtractor() + } /// This interceptor will extract whatever `ServiceContext` key-value pairs have been inserted into the /// request's metadata, and will make them available to user code via the `ServiceContext/current` @@ -43,7 +46,7 @@ public struct ServerTracingInterceptor: ServerInterceptor { let tracer = InstrumentationSystem.tracer tracer.extract( - request, + request.metadata, into: &serviceContext, using: ServerRequestExtractor() ) @@ -59,11 +62,11 @@ public struct ServerTracingInterceptor: ServerInterceptor { /// An extractor responsible for extracting the required instrumentation keys from request metadata. @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *) -struct ServerRequestExtractor: Instrumentation.Extractor { - typealias Carrier = ServerRequest.Stream +struct ServerRequestExtractor: Instrumentation.Extractor { + typealias Carrier = Metadata func extract(key: String, from carrier: Carrier) -> String? { - var values = carrier.metadata[stringValues: key].makeIterator() + var values = carrier[stringValues: key].makeIterator() // There should only be one value for each key. If more, pick just one. return values.next() } From 7a54d8dc74e1a0f2ce29907fa03f7a0b808b6992 Mon Sep 17 00:00:00 2001 From: Gus Cairo Date: Wed, 17 Jan 2024 15:32:34 +0000 Subject: [PATCH 07/14] Add option to record more events in span, and other PR changes --- .../ClientTracingInterceptor.swift | 25 ++- Sources/GRPCInterceptors/HookedWriter.swift | 40 ++++ .../ServerTracingInterceptor.swift | 51 ++++- .../TracingInterceptorTests.swift | 209 ++++++++++++++++-- .../TracingTestsUtils.swift | 98 ++++---- 5 files changed, 354 insertions(+), 69 deletions(-) create mode 100644 Sources/GRPCInterceptors/HookedWriter.swift diff --git a/Sources/GRPCInterceptors/ClientTracingInterceptor.swift b/Sources/GRPCInterceptors/ClientTracingInterceptor.swift index 57e4f07e0..3763412f0 100644 --- a/Sources/GRPCInterceptors/ClientTracingInterceptor.swift +++ b/Sources/GRPCInterceptors/ClientTracingInterceptor.swift @@ -26,10 +26,12 @@ import Tracing @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *) public struct ClientTracingInterceptor: ClientInterceptor { private let injector: ClientRequestInjector + private let emitEventOnEachWrite: Bool /// Create a new instance of a ``ClientTracingInterceptor``. - public init() { + public init(emitEventOnEachWrite: Bool = false) { self.injector = ClientRequestInjector() + self.emitEventOnEachWrite = emitEventOnEachWrite } /// This interceptor will inject as the request's metadata whatever `ServiceContext` key-value pairs @@ -54,9 +56,26 @@ public struct ClientTracingInterceptor: ClientInterceptor { ) return try await tracer.withSpan(context.descriptor.fullyQualifiedMethod, context: serviceContext, ofKind: .client) { span in - span.addEvent("Sending request") + span.addEvent("Request started") + + if self.emitEventOnEachWrite { + let wrappedProducer = request.producer + request.producer = { writer in + let eventEmittingWriter = HookedWriter( + wrapping: writer, + beforeEachWrite: { + span.addEvent("Sending request part") + }, + afterEachWrite: { + span.addEvent("Sent request part") + }) + + try await wrappedProducer(RPCWriter(wrapping: eventEmittingWriter)) + } + } + let response = try await next(request, context) - span.addEvent("Received response") + span.addEvent("Received response end") return response } } diff --git a/Sources/GRPCInterceptors/HookedWriter.swift b/Sources/GRPCInterceptors/HookedWriter.swift new file mode 100644 index 000000000..6f3e91374 --- /dev/null +++ b/Sources/GRPCInterceptors/HookedWriter.swift @@ -0,0 +1,40 @@ +/* + * Copyright 2024, gRPC Authors All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +import GRPCCore +import Tracing + +@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *) +struct HookedWriter: RPCWriterProtocol { + private let writer: any RPCWriterProtocol + private let beforeEachWrite: @Sendable () -> Void + private let afterEachWrite: @Sendable () -> Void + + public init( + wrapping other: some RPCWriterProtocol, + beforeEachWrite: @Sendable @escaping () -> Void, + afterEachWrite: @Sendable @escaping () -> Void + ) { + self.writer = other + self.beforeEachWrite = beforeEachWrite + self.afterEachWrite = afterEachWrite + } + + public func write(contentsOf elements: some Sequence) async throws { + self.beforeEachWrite() + try await self.writer.write(contentsOf: elements) + self.afterEachWrite() + } +} diff --git a/Sources/GRPCInterceptors/ServerTracingInterceptor.swift b/Sources/GRPCInterceptors/ServerTracingInterceptor.swift index 2959184a3..fa94d9ba6 100644 --- a/Sources/GRPCInterceptors/ServerTracingInterceptor.swift +++ b/Sources/GRPCInterceptors/ServerTracingInterceptor.swift @@ -24,10 +24,12 @@ import Tracing @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *) public struct ServerTracingInterceptor: ServerInterceptor { private let extractor: ServerRequestExtractor + private let emitEventOnEachWrite: Bool /// Create a new instance of a ``ServerTracingInterceptor``. - public init() { + public init(emitEventOnEachWrite: Bool = false) { self.extractor = ServerRequestExtractor() + self.emitEventOnEachWrite = emitEventOnEachWrite } /// This interceptor will extract whatever `ServiceContext` key-value pairs have been inserted into the @@ -48,14 +50,49 @@ public struct ServerTracingInterceptor: ServerInterceptor { tracer.extract( request.metadata, into: &serviceContext, - using: ServerRequestExtractor() + using: self.extractor ) - return try await tracer.withSpan(context.descriptor.fullyQualifiedMethod, context: serviceContext, ofKind: .server) { span in - span.addEvent("Received request") - let response = try await next(request, context) - span.addEvent("Sending response") - return response + return try await ServiceContext.withValue(serviceContext) { + try await tracer.withSpan(context.descriptor.fullyQualifiedMethod, context: serviceContext, ofKind: .server) { span in + span.addEvent("Received request") + + var response = try await next(request, context) + + switch response.accepted { + case .success(var success): + let wrappedProducer = success.producer + + if self.emitEventOnEachWrite { + success.producer = { writer in + let eventEmittingWriter = HookedWriter( + wrapping: writer, + beforeEachWrite: { + span.addEvent("Sending response part") + }, + afterEachWrite: { + span.addEvent("Sent response part") + }) + + let wrappedResult = try await wrappedProducer(RPCWriter(wrapping: eventEmittingWriter)) + span.addEvent("Sent response end") + return wrappedResult + } + } else { + success.producer = { writer in + let wrappedResult = try await wrappedProducer(writer) + span.addEvent("Sent response end") + return wrappedResult + } + } + + response = .init(accepted: .success(success)) + case .failure: + span.addEvent("Sent error response") + } + + return response + } } } } diff --git a/Tests/GRPCInterceptorsTests/TracingInterceptorTests.swift b/Tests/GRPCInterceptorsTests/TracingInterceptorTests.swift index 529085a9d..884dfa0aa 100644 --- a/Tests/GRPCInterceptorsTests/TracingInterceptorTests.swift +++ b/Tests/GRPCInterceptorsTests/TracingInterceptorTests.swift @@ -13,6 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + import GRPCCore import Tracing import XCTest @@ -20,44 +21,216 @@ import XCTest @testable import GRPCInterceptors @available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *) -final class TracingInterceptorTests: TracingTestCase { +final class TracingInterceptorTests: XCTestCase { + override class func setUp() { + InstrumentationSystem.bootstrap(TestTracer()) + } + func testClientInterceptor() async throws { var serviceContext = ServiceContext.topLevel let traceIDString = UUID().uuidString - let interceptor = ClientTracingInterceptor() - + let interceptor = ClientTracingInterceptor(emitEventOnEachWrite: false) + let (stream, continuation) = AsyncStream.makeStream() serviceContext.traceID = traceIDString + try await ServiceContext.withValue(serviceContext) { let response = try await interceptor.intercept( - request: .init(producer: { writer in try await writer.write(["request"])}), + request: .init(producer: { writer in + try await writer.write(contentsOf: ["request1"]) + try await writer.write(contentsOf: ["request2"]) + }), context: .init(descriptor: .init(service: "foo", method: "bar"))) { stream, _ in + // Assert the metadata contains the injected context key-value. XCTAssertEqual(stream.metadata, ["trace-id": "\(traceIDString)"]) + + // Write into the response stream to make sure the `producer` closure's called. + let writer = RPCWriter(wrapping: TestWriter(streamContinuation: continuation)) + try await stream.producer(writer) + continuation.finish() + return .init(metadata: [], bodyParts: .init(wrapping: AsyncStream(unfolding: { .message(["response"]) }))) } + var streamIterator = stream.makeAsyncIterator() + var element = await streamIterator.next() + XCTAssertEqual(element, "request1") + element = await streamIterator.next() + XCTAssertEqual(element, "request2") + element = await streamIterator.next() + XCTAssertNil(element) + var messages = response.messages.makeAsyncIterator() let message = try await messages.next() XCTAssertEqual(message, ["response"]) + + let tracer = InstrumentationSystem.tracer as! TestTracer + XCTAssertEqual(tracer.latestSpanEvents.map { $0.name }, [ + "Request started", + "Received response end" + ]) } } + + func testClientInterceptorAllEventsRecorded() async throws { + var serviceContext = ServiceContext.topLevel + let traceIDString = UUID().uuidString + let interceptor = ClientTracingInterceptor(emitEventOnEachWrite: true) + let (stream, continuation) = AsyncStream.makeStream() + serviceContext.traceID = traceIDString + try await ServiceContext.withValue(serviceContext) { + let response = try await interceptor.intercept( + request: .init(producer: { writer in + try await writer.write(contentsOf: ["request1"]) + try await writer.write(contentsOf: ["request2"]) + }), + context: .init(descriptor: .init(service: "foo", method: "bar"))) { stream, _ in + // Assert the metadata contains the injected context key-value. + XCTAssertEqual(stream.metadata, ["trace-id": "\(traceIDString)"]) + + // Write into the response stream to make sure the `producer` closure's called. + let writer = RPCWriter(wrapping: TestWriter(streamContinuation: continuation)) + try await stream.producer(writer) + continuation.finish() + + return .init(metadata: [], bodyParts: .init(wrapping: AsyncStream(unfolding: { .message(["response"]) }))) + } + + var streamIterator = stream.makeAsyncIterator() + var element = await streamIterator.next() + XCTAssertEqual(element, "request1") + element = await streamIterator.next() + XCTAssertEqual(element, "request2") + element = await streamIterator.next() + XCTAssertNil(element) + + var messages = response.messages.makeAsyncIterator() + let message = try await messages.next() + XCTAssertEqual(message, ["response"]) + + let tracer = InstrumentationSystem.tracer as! TestTracer + XCTAssertEqual(tracer.latestSpanEvents.map { $0.name }, [ + "Request started", + // Recorded when `request1` is sent + "Sending request part", + "Sent request part", + // Recorded when `request2` is sent + "Sending request part", + "Sent request part", + // Recorded at end of `producer` + "Received response end" + ]) + } + } + + func testServerInterceptorErrorResponse() async throws { + let interceptor = ServerTracingInterceptor(emitEventOnEachWrite: false) + let response = try await interceptor.intercept( + request: .init(single: .init(metadata: ["trace-id": "some-trace-id"], message: [])), + context: .init(descriptor: .init(service: "foo", method: "bar"))) { _, _ in + ServerResponse.Stream(error: .init(code: .unknown, message: "Test error")) + } + XCTAssertThrowsError(try response.accepted.get()) + + let tracer = InstrumentationSystem.tracer as! TestTracer + XCTAssertEqual(tracer.latestSpanEvents.map { $0.name }, [ + "Received request", + "Sent error response" + ]) + } + func testServerInterceptor() async throws { - let interceptor = ServerTracingInterceptor() + let (stream, continuation) = AsyncStream.makeStream() + let interceptor = ServerTracingInterceptor(emitEventOnEachWrite: false) let response = try await interceptor.intercept( - request: .init(single: .init(metadata: [], message: [])), + request: .init(single: .init(metadata: ["trace-id": "some-trace-id"], message: [])), context: .init(descriptor: .init(service: "foo", method: "bar"))) { _, _ in - return .init(accepted: .success(.init(metadata: [], producer: { writer in - guard let serviceContext = ServiceContext.current else { - XCTFail("There should be a service context present.") - return [] - } - - let traceID = serviceContext.traceID - XCTAssertEqual("some-trace-id", traceID) - - try await writer.write("response") - return [] - }))) + { [serviceContext = ServiceContext.current] in + return ServerResponse.Stream(accepted: .success(.init(metadata: [], producer: { writer in + guard let serviceContext else { + XCTFail("There should be a service context present.") + return ["Result": "Test failed"] + } + + let traceID = serviceContext.traceID + XCTAssertEqual("some-trace-id", traceID) + + try await writer.write("response1") + try await writer.write("response2") + + return ["Result": "Trailing metadata"] + }))) + }() } + + let responseContents = try response.accepted.get() + let trailingMetadata = try await responseContents.producer(RPCWriter(wrapping: TestWriter(streamContinuation: continuation))) + continuation.finish() + XCTAssertEqual(trailingMetadata, ["Result": "Trailing metadata"]) + + var streamIterator = stream.makeAsyncIterator() + var element = await streamIterator.next() + XCTAssertEqual(element, "response1") + element = await streamIterator.next() + XCTAssertEqual(element, "response2") + element = await streamIterator.next() + XCTAssertNil(element) + + let tracer = InstrumentationSystem.tracer as! TestTracer + XCTAssertEqual(tracer.latestSpanEvents.map { $0.name }, [ + "Received request", + "Sent response end" + ]) + } + + func testServerInterceptorAllEventsRecorded() async throws { + let (stream, continuation) = AsyncStream.makeStream() + let interceptor = ServerTracingInterceptor(emitEventOnEachWrite: true) + let response = try await interceptor.intercept( + request: .init(single: .init(metadata: ["trace-id": "some-trace-id"], message: [])), + context: .init(descriptor: .init(service: "foo", method: "bar"))) { _, _ in + { [serviceContext = ServiceContext.current] in + return ServerResponse.Stream(accepted: .success(.init(metadata: [], producer: { writer in + guard let serviceContext else { + XCTFail("There should be a service context present.") + return ["Result": "Test failed"] + } + + let traceID = serviceContext.traceID + XCTAssertEqual("some-trace-id", traceID) + + try await writer.write("response1") + try await writer.write("response2") + + return ["Result": "Trailing metadata"] + }))) + }() + } + + let responseContents = try response.accepted.get() + let trailingMetadata = try await responseContents.producer(RPCWriter(wrapping: TestWriter(streamContinuation: continuation))) + continuation.finish() + XCTAssertEqual(trailingMetadata, ["Result": "Trailing metadata"]) + + var streamIterator = stream.makeAsyncIterator() + var element = await streamIterator.next() + XCTAssertEqual(element, "response1") + element = await streamIterator.next() + XCTAssertEqual(element, "response2") + element = await streamIterator.next() + XCTAssertNil(element) + + let tracer = InstrumentationSystem.tracer as! TestTracer + XCTAssertEqual(tracer.latestSpanEvents.map { $0.name }, [ + "Received request", + // Recorded when `response1` is sent + "Sending response part", + "Sent response part", + // Recorded when `response2` is sent + "Sending response part", + "Sent response part", + // Recorded when we're done sending response + "Sent response end" + ]) } } diff --git a/Tests/GRPCInterceptorsTests/TracingTestsUtils.swift b/Tests/GRPCInterceptorsTests/TracingTestsUtils.swift index 238c00dad..0b62d10de 100644 --- a/Tests/GRPCInterceptorsTests/TracingTestsUtils.swift +++ b/Tests/GRPCInterceptorsTests/TracingTestsUtils.swift @@ -14,19 +14,18 @@ * limitations under the License. */ +import GRPCCore import Tracing -import XCTest -class TracingTestCase: XCTestCase { - override class func setUp() { - InstrumentationSystem.bootstrap(TestTracer()) +class TestTracer: Tracer { + typealias Span = TestSpan + + private var latestTestSpan: TestSpan? + var latestSpanEvents: [SpanEvent] { + self.latestTestSpan?.events ?? [] } -} - -public struct TestTracer: Tracer { - public typealias Span = TestSpan - public func extract( + func extract( _ carrier: Carrier, into context: inout ServiceContextModule.ServiceContext, using extractor: Extract @@ -35,7 +34,7 @@ public struct TestTracer: Tracer { context[TraceID.self] = traceID } - public func inject( + func inject( _ context: ServiceContextModule.ServiceContext, into carrier: inout Carrier, using injector: Inject @@ -45,11 +44,11 @@ public struct TestTracer: Tracer { } } - public func forceFlush() { + func forceFlush() { // no-op } - public func startSpan( + func startSpan( _ operationName: String, context: @autoclosure () -> ServiceContext, ofKind kind: SpanKind, @@ -58,65 +57,66 @@ public struct TestTracer: Tracer { file fileID: String, line: UInt ) -> TestSpan where Instant: TracerInstant { - TestSpan(context: context(), operationName: operationName) + self.latestTestSpan = TestSpan(context: context(), operationName: operationName) + return latestTestSpan! } } -public class TestSpan: Span { - public var context: ServiceContextModule.ServiceContext - public var operationName: String - public var attributes: Tracing.SpanAttributes - public var isRecording: Bool - public private(set) var status: Tracing.SpanStatus? - public private(set) var events: [Tracing.SpanEvent] = [] +class TestSpan: Span { + var context: ServiceContextModule.ServiceContext + var operationName: String + var attributes: Tracing.SpanAttributes + var isRecording: Bool + private(set) var status: Tracing.SpanStatus? + private(set) var events: [Tracing.SpanEvent] = [] - public init( + init( context: ServiceContextModule.ServiceContext, operationName: String, attributes: Tracing.SpanAttributes = [:], isRecording: Bool = true ) { - self.context = context - self.operationName = operationName - self.attributes = attributes - self.isRecording = isRecording + self.context = context + self.operationName = operationName + self.attributes = attributes + self.isRecording = isRecording } - public func setStatus(_ status: Tracing.SpanStatus) { - self.status = status + func setStatus(_ status: Tracing.SpanStatus) { + self.status = status } - public func addEvent(_ event: Tracing.SpanEvent) { - self.events.append(event) + func addEvent(_ event: Tracing.SpanEvent) { + self.events.append(event) } - public func recordError( - _ error: any Error, - attributes: Tracing.SpanAttributes, - at instant: @autoclosure () -> Instant + func recordError( + _ error: any Error, + attributes: Tracing.SpanAttributes, + at instant: @autoclosure () -> Instant ) where Instant: Tracing.TracerInstant { - self.setStatus(.init( - code: .error, - message: "Error: \(error), attributes: \(attributes), at instant: \(instant())" - )) + self.setStatus(.init( + code: .error, + message: "Error: \(error), attributes: \(attributes), at instant: \(instant())" + )) } - public func addLink(_ link: Tracing.SpanLink) { + func addLink(_ link: Tracing.SpanLink) { self.context.spanLinks?.append(link) } - public func end(at instant: @autoclosure () -> Instant) where Instant: Tracing.TracerInstant { + func end(at instant: @autoclosure () -> Instant) where Instant: Tracing.TracerInstant { self.setStatus(.init(code: .ok, message: "Ended at instant: \(instant())")) } } -internal enum TraceID: ServiceContextModule.ServiceContextKey { +enum TraceID: ServiceContextModule.ServiceContextKey { typealias Value = String static let keyName = "trace-id" } -internal enum ServiceContextSpanLinksKey: ServiceContextModule.ServiceContextKey { +enum ServiceContextSpanLinksKey: ServiceContextModule.ServiceContextKey { typealias Value = [SpanLink] static let keyName = "span-links" @@ -141,3 +141,19 @@ extension ServiceContext { } } } + +struct TestWriter: RPCWriterProtocol { + typealias Element = WriterElement + + private let streamContinuation: AsyncStream.Continuation + + init(streamContinuation: AsyncStream.Continuation) { + self.streamContinuation = streamContinuation + } + + func write(contentsOf elements: some Sequence) async throws { + elements.forEach { element in + self.streamContinuation.yield(element) + } + } +} From 52829d3ddcfaf9aa660a4fd2a608258eea90d2aa Mon Sep 17 00:00:00 2001 From: Gus Cairo Date: Wed, 17 Jan 2024 15:34:25 +0000 Subject: [PATCH 08/14] Fix doc nit --- Sources/GRPCInterceptors/ClientTracingInterceptor.swift | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Sources/GRPCInterceptors/ClientTracingInterceptor.swift b/Sources/GRPCInterceptors/ClientTracingInterceptor.swift index 3763412f0..ae3ac37da 100644 --- a/Sources/GRPCInterceptors/ClientTracingInterceptor.swift +++ b/Sources/GRPCInterceptors/ClientTracingInterceptor.swift @@ -20,7 +20,7 @@ import Tracing /// A client interceptor that injects tracing information into the request. /// /// The tracing information is taken from the current `ServiceContext`, and injected into the request's -/// metadata, to be picked up by the server-side ``ServerTracingInterceptor``. +/// metadata. I twill then be picked up by the server-side ``ServerTracingInterceptor``. /// /// For more information, refer to the documentation for `swift-distributed-tracing`. @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *) From 6ddaa2f0f567c49975b0f6d005b1455dbc33cb76 Mon Sep 17 00:00:00 2001 From: Gus Cairo Date: Wed, 17 Jan 2024 15:37:39 +0000 Subject: [PATCH 09/14] Fix rebase mistake --- Package.swift | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Package.swift b/Package.swift index 4c77e0cd1..7f78ec35d 100644 --- a/Package.swift +++ b/Package.swift @@ -669,7 +669,7 @@ let package = Package( .grpcCoreTests, .grpcInProcessTransportTests, .grpcCodeGenTests, - .grpcInterceptorsTests + .grpcInterceptorsTests, .grpcHTTP2CoreTests, .grpcHTTP2TransportNIOPosixTests, .grpcHTTP2TransportNIOTransportServicesTests From 8be3c685634fbffde91991a4970cd57338f4f342 Mon Sep 17 00:00:00 2001 From: Gus Cairo Date: Wed, 17 Jan 2024 15:40:21 +0000 Subject: [PATCH 10/14] Formatting --- .../ClientTracingInterceptor.swift | 19 +- Sources/GRPCInterceptors/HookedWriter.swift | 4 +- .../ServerTracingInterceptor.swift | 23 +- .../TracingInterceptorTests.swift | 264 ++++++++++-------- .../TracingTestsUtils.swift | 46 +-- 5 files changed, 207 insertions(+), 149 deletions(-) diff --git a/Sources/GRPCInterceptors/ClientTracingInterceptor.swift b/Sources/GRPCInterceptors/ClientTracingInterceptor.swift index ae3ac37da..6c14394fd 100644 --- a/Sources/GRPCInterceptors/ClientTracingInterceptor.swift +++ b/Sources/GRPCInterceptors/ClientTracingInterceptor.swift @@ -21,7 +21,7 @@ import Tracing /// /// The tracing information is taken from the current `ServiceContext`, and injected into the request's /// metadata. I twill then be picked up by the server-side ``ServerTracingInterceptor``. -/// +/// /// For more information, refer to the documentation for `swift-distributed-tracing`. @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *) public struct ClientTracingInterceptor: ClientInterceptor { @@ -48,16 +48,20 @@ public struct ClientTracingInterceptor: ClientInterceptor { var request = request let tracer = InstrumentationSystem.tracer let serviceContext = ServiceContext.current ?? .topLevel - + tracer.inject( serviceContext, into: &request.metadata, using: self.injector ) - - return try await tracer.withSpan(context.descriptor.fullyQualifiedMethod, context: serviceContext, ofKind: .client) { span in + + return try await tracer.withSpan( + context.descriptor.fullyQualifiedMethod, + context: serviceContext, + ofKind: .client + ) { span in span.addEvent("Request started") - + if self.emitEventOnEachWrite { let wrappedProducer = request.producer request.producer = { writer in @@ -68,8 +72,9 @@ public struct ClientTracingInterceptor: ClientInterceptor { }, afterEachWrite: { span.addEvent("Sent request part") - }) - + } + ) + try await wrappedProducer(RPCWriter(wrapping: eventEmittingWriter)) } } diff --git a/Sources/GRPCInterceptors/HookedWriter.swift b/Sources/GRPCInterceptors/HookedWriter.swift index 6f3e91374..4c6c1ffbb 100644 --- a/Sources/GRPCInterceptors/HookedWriter.swift +++ b/Sources/GRPCInterceptors/HookedWriter.swift @@ -21,7 +21,7 @@ struct HookedWriter: RPCWriterProtocol { private let writer: any RPCWriterProtocol private let beforeEachWrite: @Sendable () -> Void private let afterEachWrite: @Sendable () -> Void - + public init( wrapping other: some RPCWriterProtocol, beforeEachWrite: @Sendable @escaping () -> Void, @@ -31,7 +31,7 @@ struct HookedWriter: RPCWriterProtocol { self.beforeEachWrite = beforeEachWrite self.afterEachWrite = afterEachWrite } - + public func write(contentsOf elements: some Sequence) async throws { self.beforeEachWrite() try await self.writer.write(contentsOf: elements) diff --git a/Sources/GRPCInterceptors/ServerTracingInterceptor.swift b/Sources/GRPCInterceptors/ServerTracingInterceptor.swift index fa94d9ba6..c2414cc65 100644 --- a/Sources/GRPCInterceptors/ServerTracingInterceptor.swift +++ b/Sources/GRPCInterceptors/ServerTracingInterceptor.swift @@ -46,23 +46,27 @@ public struct ServerTracingInterceptor: ServerInterceptor { ) async throws -> ServerResponse.Stream where Input: Sendable, Output: Sendable { var serviceContext = ServiceContext.topLevel let tracer = InstrumentationSystem.tracer - + tracer.extract( request.metadata, into: &serviceContext, using: self.extractor ) - + return try await ServiceContext.withValue(serviceContext) { - try await tracer.withSpan(context.descriptor.fullyQualifiedMethod, context: serviceContext, ofKind: .server) { span in + try await tracer.withSpan( + context.descriptor.fullyQualifiedMethod, + context: serviceContext, + ofKind: .server + ) { span in span.addEvent("Received request") var response = try await next(request, context) - + switch response.accepted { case .success(var success): let wrappedProducer = success.producer - + if self.emitEventOnEachWrite { success.producer = { writer in let eventEmittingWriter = HookedWriter( @@ -72,9 +76,12 @@ public struct ServerTracingInterceptor: ServerInterceptor { }, afterEachWrite: { span.addEvent("Sent response part") - }) - - let wrappedResult = try await wrappedProducer(RPCWriter(wrapping: eventEmittingWriter)) + } + ) + + let wrappedResult = try await wrappedProducer( + RPCWriter(wrapping: eventEmittingWriter) + ) span.addEvent("Sent response end") return wrappedResult } diff --git a/Tests/GRPCInterceptorsTests/TracingInterceptorTests.swift b/Tests/GRPCInterceptorsTests/TracingInterceptorTests.swift index 884dfa0aa..e6f67e020 100644 --- a/Tests/GRPCInterceptorsTests/TracingInterceptorTests.swift +++ b/Tests/GRPCInterceptorsTests/TracingInterceptorTests.swift @@ -39,18 +39,22 @@ final class TracingInterceptorTests: XCTestCase { try await writer.write(contentsOf: ["request1"]) try await writer.write(contentsOf: ["request2"]) }), - context: .init(descriptor: .init(service: "foo", method: "bar"))) { stream, _ in - // Assert the metadata contains the injected context key-value. - XCTAssertEqual(stream.metadata, ["trace-id": "\(traceIDString)"]) - - // Write into the response stream to make sure the `producer` closure's called. - let writer = RPCWriter(wrapping: TestWriter(streamContinuation: continuation)) - try await stream.producer(writer) - continuation.finish() - - return .init(metadata: [], bodyParts: .init(wrapping: AsyncStream(unfolding: { .message(["response"]) }))) - } - + context: .init(descriptor: .init(service: "foo", method: "bar")) + ) { stream, _ in + // Assert the metadata contains the injected context key-value. + XCTAssertEqual(stream.metadata, ["trace-id": "\(traceIDString)"]) + + // Write into the response stream to make sure the `producer` closure's called. + let writer = RPCWriter(wrapping: TestWriter(streamContinuation: continuation)) + try await stream.producer(writer) + continuation.finish() + + return .init( + metadata: [], + bodyParts: .init(wrapping: AsyncStream(unfolding: { .message(["response"]) })) + ) + } + var streamIterator = stream.makeAsyncIterator() var element = await streamIterator.next() XCTAssertEqual(element, "request1") @@ -58,19 +62,22 @@ final class TracingInterceptorTests: XCTestCase { XCTAssertEqual(element, "request2") element = await streamIterator.next() XCTAssertNil(element) - + var messages = response.messages.makeAsyncIterator() let message = try await messages.next() XCTAssertEqual(message, ["response"]) - + let tracer = InstrumentationSystem.tracer as! TestTracer - XCTAssertEqual(tracer.latestSpanEvents.map { $0.name }, [ - "Request started", - "Received response end" - ]) + XCTAssertEqual( + tracer.latestSpanEvents.map { $0.name }, + [ + "Request started", + "Received response end", + ] + ) } } - + func testClientInterceptorAllEventsRecorded() async throws { var serviceContext = ServiceContext.topLevel let traceIDString = UUID().uuidString @@ -84,18 +91,22 @@ final class TracingInterceptorTests: XCTestCase { try await writer.write(contentsOf: ["request1"]) try await writer.write(contentsOf: ["request2"]) }), - context: .init(descriptor: .init(service: "foo", method: "bar"))) { stream, _ in - // Assert the metadata contains the injected context key-value. - XCTAssertEqual(stream.metadata, ["trace-id": "\(traceIDString)"]) - - // Write into the response stream to make sure the `producer` closure's called. - let writer = RPCWriter(wrapping: TestWriter(streamContinuation: continuation)) - try await stream.producer(writer) - continuation.finish() - - return .init(metadata: [], bodyParts: .init(wrapping: AsyncStream(unfolding: { .message(["response"]) }))) - } - + context: .init(descriptor: .init(service: "foo", method: "bar")) + ) { stream, _ in + // Assert the metadata contains the injected context key-value. + XCTAssertEqual(stream.metadata, ["trace-id": "\(traceIDString)"]) + + // Write into the response stream to make sure the `producer` closure's called. + let writer = RPCWriter(wrapping: TestWriter(streamContinuation: continuation)) + try await stream.producer(writer) + continuation.finish() + + return .init( + metadata: [], + bodyParts: .init(wrapping: AsyncStream(unfolding: { .message(["response"]) })) + ) + } + var streamIterator = stream.makeAsyncIterator() var element = await streamIterator.next() XCTAssertEqual(element, "request1") @@ -103,23 +114,26 @@ final class TracingInterceptorTests: XCTestCase { XCTAssertEqual(element, "request2") element = await streamIterator.next() XCTAssertNil(element) - + var messages = response.messages.makeAsyncIterator() let message = try await messages.next() XCTAssertEqual(message, ["response"]) - + let tracer = InstrumentationSystem.tracer as! TestTracer - XCTAssertEqual(tracer.latestSpanEvents.map { $0.name }, [ - "Request started", - // Recorded when `request1` is sent - "Sending request part", - "Sent request part", - // Recorded when `request2` is sent - "Sending request part", - "Sent request part", - // Recorded at end of `producer` - "Received response end" - ]) + XCTAssertEqual( + tracer.latestSpanEvents.map { $0.name }, + [ + "Request started", + // Recorded when `request1` is sent + "Sending request part", + "Sent request part", + // Recorded when `request2` is sent + "Sending request part", + "Sent request part", + // Recorded at end of `producer` + "Received response end", + ] + ) } } @@ -127,47 +141,61 @@ final class TracingInterceptorTests: XCTestCase { let interceptor = ServerTracingInterceptor(emitEventOnEachWrite: false) let response = try await interceptor.intercept( request: .init(single: .init(metadata: ["trace-id": "some-trace-id"], message: [])), - context: .init(descriptor: .init(service: "foo", method: "bar"))) { _, _ in - ServerResponse.Stream(error: .init(code: .unknown, message: "Test error")) - } + context: .init(descriptor: .init(service: "foo", method: "bar")) + ) { _, _ in + ServerResponse.Stream(error: .init(code: .unknown, message: "Test error")) + } XCTAssertThrowsError(try response.accepted.get()) let tracer = InstrumentationSystem.tracer as! TestTracer - XCTAssertEqual(tracer.latestSpanEvents.map { $0.name }, [ - "Received request", - "Sent error response" - ]) + XCTAssertEqual( + tracer.latestSpanEvents.map { $0.name }, + [ + "Received request", + "Sent error response", + ] + ) } - + func testServerInterceptor() async throws { let (stream, continuation) = AsyncStream.makeStream() let interceptor = ServerTracingInterceptor(emitEventOnEachWrite: false) let response = try await interceptor.intercept( request: .init(single: .init(metadata: ["trace-id": "some-trace-id"], message: [])), - context: .init(descriptor: .init(service: "foo", method: "bar"))) { _, _ in - { [serviceContext = ServiceContext.current] in - return ServerResponse.Stream(accepted: .success(.init(metadata: [], producer: { writer in - guard let serviceContext else { - XCTFail("There should be a service context present.") - return ["Result": "Test failed"] - } - - let traceID = serviceContext.traceID - XCTAssertEqual("some-trace-id", traceID) - - try await writer.write("response1") - try await writer.write("response2") - - return ["Result": "Trailing metadata"] - }))) - }() - } - + context: .init(descriptor: .init(service: "foo", method: "bar")) + ) { _, _ in + { [serviceContext = ServiceContext.current] in + return ServerResponse.Stream( + accepted: .success( + .init( + metadata: [], + producer: { writer in + guard let serviceContext else { + XCTFail("There should be a service context present.") + return ["Result": "Test failed"] + } + + let traceID = serviceContext.traceID + XCTAssertEqual("some-trace-id", traceID) + + try await writer.write("response1") + try await writer.write("response2") + + return ["Result": "Trailing metadata"] + } + ) + ) + ) + }() + } + let responseContents = try response.accepted.get() - let trailingMetadata = try await responseContents.producer(RPCWriter(wrapping: TestWriter(streamContinuation: continuation))) + let trailingMetadata = try await responseContents.producer( + RPCWriter(wrapping: TestWriter(streamContinuation: continuation)) + ) continuation.finish() XCTAssertEqual(trailingMetadata, ["Result": "Trailing metadata"]) - + var streamIterator = stream.makeAsyncIterator() var element = await streamIterator.next() XCTAssertEqual(element, "response1") @@ -177,41 +205,54 @@ final class TracingInterceptorTests: XCTestCase { XCTAssertNil(element) let tracer = InstrumentationSystem.tracer as! TestTracer - XCTAssertEqual(tracer.latestSpanEvents.map { $0.name }, [ - "Received request", - "Sent response end" - ]) + XCTAssertEqual( + tracer.latestSpanEvents.map { $0.name }, + [ + "Received request", + "Sent response end", + ] + ) } - + func testServerInterceptorAllEventsRecorded() async throws { let (stream, continuation) = AsyncStream.makeStream() let interceptor = ServerTracingInterceptor(emitEventOnEachWrite: true) let response = try await interceptor.intercept( request: .init(single: .init(metadata: ["trace-id": "some-trace-id"], message: [])), - context: .init(descriptor: .init(service: "foo", method: "bar"))) { _, _ in - { [serviceContext = ServiceContext.current] in - return ServerResponse.Stream(accepted: .success(.init(metadata: [], producer: { writer in - guard let serviceContext else { - XCTFail("There should be a service context present.") - return ["Result": "Test failed"] - } - - let traceID = serviceContext.traceID - XCTAssertEqual("some-trace-id", traceID) - - try await writer.write("response1") - try await writer.write("response2") - - return ["Result": "Trailing metadata"] - }))) - }() - } - + context: .init(descriptor: .init(service: "foo", method: "bar")) + ) { _, _ in + { [serviceContext = ServiceContext.current] in + return ServerResponse.Stream( + accepted: .success( + .init( + metadata: [], + producer: { writer in + guard let serviceContext else { + XCTFail("There should be a service context present.") + return ["Result": "Test failed"] + } + + let traceID = serviceContext.traceID + XCTAssertEqual("some-trace-id", traceID) + + try await writer.write("response1") + try await writer.write("response2") + + return ["Result": "Trailing metadata"] + } + ) + ) + ) + }() + } + let responseContents = try response.accepted.get() - let trailingMetadata = try await responseContents.producer(RPCWriter(wrapping: TestWriter(streamContinuation: continuation))) + let trailingMetadata = try await responseContents.producer( + RPCWriter(wrapping: TestWriter(streamContinuation: continuation)) + ) continuation.finish() XCTAssertEqual(trailingMetadata, ["Result": "Trailing metadata"]) - + var streamIterator = stream.makeAsyncIterator() var element = await streamIterator.next() XCTAssertEqual(element, "response1") @@ -221,16 +262,19 @@ final class TracingInterceptorTests: XCTestCase { XCTAssertNil(element) let tracer = InstrumentationSystem.tracer as! TestTracer - XCTAssertEqual(tracer.latestSpanEvents.map { $0.name }, [ - "Received request", - // Recorded when `response1` is sent - "Sending response part", - "Sent response part", - // Recorded when `response2` is sent - "Sending response part", - "Sent response part", - // Recorded when we're done sending response - "Sent response end" - ]) + XCTAssertEqual( + tracer.latestSpanEvents.map { $0.name }, + [ + "Received request", + // Recorded when `response1` is sent + "Sending response part", + "Sent response part", + // Recorded when `response2` is sent + "Sending response part", + "Sent response part", + // Recorded when we're done sending response + "Sent response end", + ] + ) } } diff --git a/Tests/GRPCInterceptorsTests/TracingTestsUtils.swift b/Tests/GRPCInterceptorsTests/TracingTestsUtils.swift index 0b62d10de..058a4173d 100644 --- a/Tests/GRPCInterceptorsTests/TracingTestsUtils.swift +++ b/Tests/GRPCInterceptorsTests/TracingTestsUtils.swift @@ -19,12 +19,12 @@ import Tracing class TestTracer: Tracer { typealias Span = TestSpan - + private var latestTestSpan: TestSpan? var latestSpanEvents: [SpanEvent] { self.latestTestSpan?.events ?? [] } - + func extract( _ carrier: Carrier, into context: inout ServiceContextModule.ServiceContext, @@ -43,11 +43,11 @@ class TestTracer: Tracer { injector.inject(traceID, forKey: TraceID.keyName, into: &carrier) } } - + func forceFlush() { // no-op } - + func startSpan( _ operationName: String, context: @autoclosure () -> ServiceContext, @@ -69,42 +69,44 @@ class TestSpan: Span { var isRecording: Bool private(set) var status: Tracing.SpanStatus? private(set) var events: [Tracing.SpanEvent] = [] - + init( - context: ServiceContextModule.ServiceContext, - operationName: String, - attributes: Tracing.SpanAttributes = [:], - isRecording: Bool = true + context: ServiceContextModule.ServiceContext, + operationName: String, + attributes: Tracing.SpanAttributes = [:], + isRecording: Bool = true ) { self.context = context self.operationName = operationName self.attributes = attributes self.isRecording = isRecording } - + func setStatus(_ status: Tracing.SpanStatus) { self.status = status } - + func addEvent(_ event: Tracing.SpanEvent) { self.events.append(event) } - + func recordError( _ error: any Error, attributes: Tracing.SpanAttributes, at instant: @autoclosure () -> Instant ) where Instant: Tracing.TracerInstant { - self.setStatus(.init( - code: .error, - message: "Error: \(error), attributes: \(attributes), at instant: \(instant())" - )) + self.setStatus( + .init( + code: .error, + message: "Error: \(error), attributes: \(attributes), at instant: \(instant())" + ) + ) } - + func addLink(_ link: Tracing.SpanLink) { self.context.spanLinks?.append(link) } - + func end(at instant: @autoclosure () -> Instant) where Instant: Tracing.TracerInstant { self.setStatus(.init(code: .ok, message: "Ended at instant: \(instant())")) } @@ -131,7 +133,7 @@ extension ServiceContext { self[TraceID.self] = newValue } } - + var spanLinks: [SpanLink]? { get { self[ServiceContextSpanLinksKey.self] @@ -144,13 +146,13 @@ extension ServiceContext { struct TestWriter: RPCWriterProtocol { typealias Element = WriterElement - + private let streamContinuation: AsyncStream.Continuation - + init(streamContinuation: AsyncStream.Continuation) { self.streamContinuation = streamContinuation } - + func write(contentsOf elements: some Sequence) async throws { elements.forEach { element in self.streamContinuation.yield(element) From 941e2f86f062d1e388daee4c1f8e16ace953143a Mon Sep 17 00:00:00 2001 From: Gus Cairo Date: Wed, 17 Jan 2024 15:51:36 +0000 Subject: [PATCH 11/14] Fix build on older Swift versions --- ...tsUtils.swift => TracingTestsUtilities.swift} | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) rename Tests/GRPCInterceptorsTests/{TracingTestsUtils.swift => TracingTestsUtilities.swift} (88%) diff --git a/Tests/GRPCInterceptorsTests/TracingTestsUtils.swift b/Tests/GRPCInterceptorsTests/TracingTestsUtilities.swift similarity index 88% rename from Tests/GRPCInterceptorsTests/TracingTestsUtils.swift rename to Tests/GRPCInterceptorsTests/TracingTestsUtilities.swift index 058a4173d..a41156016 100644 --- a/Tests/GRPCInterceptorsTests/TracingTestsUtils.swift +++ b/Tests/GRPCInterceptorsTests/TracingTestsUtilities.swift @@ -159,3 +159,19 @@ struct TestWriter: RPCWriterProtocol { } } } + +#if swift(<5.9) +@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *) +extension AsyncStream { + static func makeStream( + of elementType: Element.Type = Element.self, + bufferingPolicy limit: AsyncStream.Continuation.BufferingPolicy = .unbounded + ) -> (stream: AsyncStream, continuation: AsyncStream.Continuation) { + var continuation: AsyncStream.Continuation! + let stream = AsyncStream(Element.self, bufferingPolicy: limit) { + continuation = $0 + } + return (stream, continuation) + } +} +#endif From 5111462d0ab7ee1865b75a6342e7eb538e824c45 Mon Sep 17 00:00:00 2001 From: Gus Cairo Date: Wed, 17 Jan 2024 18:33:39 +0000 Subject: [PATCH 12/14] PR Changes --- Package.swift | 1 + .../ClientTracingInterceptor.swift | 50 ++++++++++++++-- Sources/GRPCInterceptors/HookedWriter.swift | 4 +- .../OnFinishAsyncSequence.swift | 57 +++++++++++++++++++ .../ServerTracingInterceptor.swift | 41 +++++++++++-- .../ClientRPCExecutorTestHarness.swift | 17 ++---- .../TracingInterceptorTests.swift | 37 +++++++++--- .../TracingTestsUtilities.swift | 2 +- 8 files changed, 176 insertions(+), 33 deletions(-) create mode 100644 Sources/GRPCInterceptors/OnFinishAsyncSequence.swift diff --git a/Package.swift b/Package.swift index 7f78ec35d..b3920460d 100644 --- a/Package.swift +++ b/Package.swift @@ -296,6 +296,7 @@ extension Target { name: "GRPCInterceptorsTests", dependencies: [ .grpcCore, + .tracing, .grpcInterceptors ] ) diff --git a/Sources/GRPCInterceptors/ClientTracingInterceptor.swift b/Sources/GRPCInterceptors/ClientTracingInterceptor.swift index 6c14394fd..2bb9395c5 100644 --- a/Sources/GRPCInterceptors/ClientTracingInterceptor.swift +++ b/Sources/GRPCInterceptors/ClientTracingInterceptor.swift @@ -20,7 +20,7 @@ import Tracing /// A client interceptor that injects tracing information into the request. /// /// The tracing information is taken from the current `ServiceContext`, and injected into the request's -/// metadata. I twill then be picked up by the server-side ``ServerTracingInterceptor``. +/// metadata. It will then be picked up by the server-side ``ServerTracingInterceptor``. /// /// For more information, refer to the documentation for `swift-distributed-tracing`. @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *) @@ -29,6 +29,10 @@ public struct ClientTracingInterceptor: ClientInterceptor { private let emitEventOnEachWrite: Bool /// Create a new instance of a ``ClientTracingInterceptor``. + /// + /// - Parameter emitEventOnEachWrite: If `true`, each request part sent and response part + /// received will be recorded as a separate event in a tracing span. Otherwise, only the request/response + /// start and end will be recorded as events. public init(emitEventOnEachWrite: Bool = false) { self.injector = ClientRequestInjector() self.emitEventOnEachWrite = emitEventOnEachWrite @@ -75,12 +79,50 @@ public struct ClientTracingInterceptor: ClientInterceptor { } ) - try await wrappedProducer(RPCWriter(wrapping: eventEmittingWriter)) + do { + try await wrappedProducer(RPCWriter(wrapping: eventEmittingWriter)) + } catch { + span.addEvent("Error encountered") + throw error + } + + span.addEvent("Request end") + } + } + + var response: ClientResponse.Stream + do { + response = try await next(request, context) + } catch { + span.addEvent("Error encountered") + throw error + } + + switch response.accepted { + case .success(var success): + if self.emitEventOnEachWrite { + let onEachPartRecordingSequence = success.bodyParts.map { element in + span.addEvent("Received response part") + return element + } + let onFinishRecordingSequence = OnFinishAsyncSequence( + wrapping: onEachPartRecordingSequence + ) { + span.addEvent("Received response end") + } + success.bodyParts = RPCAsyncSequence(wrapping: onFinishRecordingSequence) + response.accepted = .success(success) + } else { + let onFinishRecordingSequence = OnFinishAsyncSequence(wrapping: success.bodyParts) { + span.addEvent("Received response end") + } + success.bodyParts = RPCAsyncSequence(wrapping: onFinishRecordingSequence) + response.accepted = .success(success) } + case .failure: + span.addEvent("Received error response") } - let response = try await next(request, context) - span.addEvent("Received response end") return response } } diff --git a/Sources/GRPCInterceptors/HookedWriter.swift b/Sources/GRPCInterceptors/HookedWriter.swift index 4c6c1ffbb..b4bb52eed 100644 --- a/Sources/GRPCInterceptors/HookedWriter.swift +++ b/Sources/GRPCInterceptors/HookedWriter.swift @@ -22,7 +22,7 @@ struct HookedWriter: RPCWriterProtocol { private let beforeEachWrite: @Sendable () -> Void private let afterEachWrite: @Sendable () -> Void - public init( + init( wrapping other: some RPCWriterProtocol, beforeEachWrite: @Sendable @escaping () -> Void, afterEachWrite: @Sendable @escaping () -> Void @@ -32,7 +32,7 @@ struct HookedWriter: RPCWriterProtocol { self.afterEachWrite = afterEachWrite } - public func write(contentsOf elements: some Sequence) async throws { + func write(contentsOf elements: some Sequence) async throws { self.beforeEachWrite() try await self.writer.write(contentsOf: elements) self.afterEachWrite() diff --git a/Sources/GRPCInterceptors/OnFinishAsyncSequence.swift b/Sources/GRPCInterceptors/OnFinishAsyncSequence.swift new file mode 100644 index 000000000..311e1fcd8 --- /dev/null +++ b/Sources/GRPCInterceptors/OnFinishAsyncSequence.swift @@ -0,0 +1,57 @@ +/* + * Copyright 2024, gRPC Authors All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *) +struct OnFinishAsyncSequence: AsyncSequence, Sendable { + private let _makeAsyncIterator: @Sendable () -> AsyncIterator + + init( + wrapping other: S, + onFinish: @escaping () -> Void + ) where S.Element == Element { + self._makeAsyncIterator = { + AsyncIterator(wrapping: other.makeAsyncIterator(), onFinish: onFinish) + } + } + + func makeAsyncIterator() -> AsyncIterator { + self._makeAsyncIterator() + } + + struct AsyncIterator: AsyncIteratorProtocol { + private var iterator: any AsyncIteratorProtocol + private var onFinish: (() -> Void)? + + fileprivate init( + wrapping other: Iterator, + onFinish: @escaping () -> Void + ) where Iterator: AsyncIteratorProtocol, Iterator.Element == Element { + self.iterator = other + self.onFinish = onFinish + } + + mutating func next() async throws -> Element? { + let elem = try await self.iterator.next() + + if elem == nil { + self.onFinish?() + self.onFinish = nil + } + + return elem as? Element + } + } +} diff --git a/Sources/GRPCInterceptors/ServerTracingInterceptor.swift b/Sources/GRPCInterceptors/ServerTracingInterceptor.swift index c2414cc65..d91da7051 100644 --- a/Sources/GRPCInterceptors/ServerTracingInterceptor.swift +++ b/Sources/GRPCInterceptors/ServerTracingInterceptor.swift @@ -27,6 +27,10 @@ public struct ServerTracingInterceptor: ServerInterceptor { private let emitEventOnEachWrite: Bool /// Create a new instance of a ``ServerTracingInterceptor``. + /// + /// - Parameter emitEventOnEachWrite: If `true`, each response part sent and request part + /// received will be recorded as a separate event in a tracing span. Otherwise, only the request/response + /// start and end will be recorded as events. public init(emitEventOnEachWrite: Bool = false) { self.extractor = ServerRequestExtractor() self.emitEventOnEachWrite = emitEventOnEachWrite @@ -59,10 +63,23 @@ public struct ServerTracingInterceptor: ServerInterceptor { context: serviceContext, ofKind: .server ) { span in - span.addEvent("Received request") + span.addEvent("Received request start") + + var request = request + + if self.emitEventOnEachWrite { + request.messages = RPCAsyncSequence( + wrapping: request.messages.map { element in + span.addEvent("Received request part") + return element + } + ) + } var response = try await next(request, context) + span.addEvent("Received request end") + switch response.accepted { case .success(var success): let wrappedProducer = success.producer @@ -79,15 +96,29 @@ public struct ServerTracingInterceptor: ServerInterceptor { } ) - let wrappedResult = try await wrappedProducer( - RPCWriter(wrapping: eventEmittingWriter) - ) + let wrappedResult: Metadata + do { + wrappedResult = try await wrappedProducer( + RPCWriter(wrapping: eventEmittingWriter) + ) + } catch { + span.addEvent("Error encountered") + throw error + } + span.addEvent("Sent response end") return wrappedResult } } else { success.producer = { writer in - let wrappedResult = try await wrappedProducer(writer) + let wrappedResult: Metadata + do { + wrappedResult = try await wrappedProducer(writer) + } catch { + span.addEvent("Error encountered") + throw error + } + span.addEvent("Sent response end") return wrappedResult } diff --git a/Tests/GRPCCoreTests/Call/Client/Internal/ClientRPCExecutorTestSupport/ClientRPCExecutorTestHarness.swift b/Tests/GRPCCoreTests/Call/Client/Internal/ClientRPCExecutorTestSupport/ClientRPCExecutorTestHarness.swift index 3e7aa6159..9aacff2d7 100644 --- a/Tests/GRPCCoreTests/Call/Client/Internal/ClientRPCExecutorTestSupport/ClientRPCExecutorTestHarness.swift +++ b/Tests/GRPCCoreTests/Call/Client/Internal/ClientRPCExecutorTestSupport/ClientRPCExecutorTestHarness.swift @@ -69,13 +69,11 @@ struct ClientRPCExecutorTestHarness { func unary( request: ClientRequest.Single<[UInt8]>, configuration: MethodConfiguration? = nil, - interceptors: [any ClientInterceptor] = [], handler: @escaping @Sendable (ClientResponse.Single<[UInt8]>) async throws -> Void ) async throws { try await self.bidirectional( request: ClientRequest.Stream(single: request), - configuration: configuration, - interceptors: interceptors + configuration: configuration ) { response in try await handler(ClientResponse.Single(stream: response)) } @@ -84,13 +82,11 @@ struct ClientRPCExecutorTestHarness { func clientStreaming( request: ClientRequest.Stream<[UInt8]>, configuration: MethodConfiguration? = nil, - interceptors: [any ClientInterceptor] = [], handler: @escaping @Sendable (ClientResponse.Single<[UInt8]>) async throws -> Void ) async throws { try await self.bidirectional( request: request, - configuration: configuration, - interceptors: interceptors + configuration: configuration ) { response in try await handler(ClientResponse.Single(stream: response)) } @@ -99,13 +95,11 @@ struct ClientRPCExecutorTestHarness { func serverStreaming( request: ClientRequest.Single<[UInt8]>, configuration: MethodConfiguration? = nil, - interceptors: [any ClientInterceptor] = [], handler: @escaping @Sendable (ClientResponse.Stream<[UInt8]>) async throws -> Void ) async throws { try await self.bidirectional( request: ClientRequest.Stream(single: request), - configuration: configuration, - interceptors: interceptors + configuration: configuration ) { response in try await handler(response) } @@ -114,7 +108,6 @@ struct ClientRPCExecutorTestHarness { func bidirectional( request: ClientRequest.Stream<[UInt8]>, configuration: MethodConfiguration? = nil, - interceptors: [any ClientInterceptor] = [], handler: @escaping @Sendable (ClientResponse.Stream<[UInt8]>) async throws -> Void ) async throws { try await self.execute( @@ -122,7 +115,6 @@ struct ClientRPCExecutorTestHarness { serializer: IdentitySerializer(), deserializer: IdentityDeserializer(), configuration: configuration, - interceptors: interceptors, handler: handler ) } @@ -132,7 +124,6 @@ struct ClientRPCExecutorTestHarness { serializer: some MessageSerializer, deserializer: some MessageDeserializer, configuration: MethodConfiguration?, - interceptors: [any ClientInterceptor], handler: @escaping @Sendable (ClientResponse.Stream) async throws -> Void ) async throws { try await withThrowingTaskGroup(of: Void.self) { group in @@ -169,7 +160,7 @@ struct ClientRPCExecutorTestHarness { serializer: serializer, deserializer: deserializer, transport: self.clientTransport, - interceptors: interceptors, + interceptors: [], handler: handler ) diff --git a/Tests/GRPCInterceptorsTests/TracingInterceptorTests.swift b/Tests/GRPCInterceptorsTests/TracingInterceptorTests.swift index e6f67e020..633b7d01e 100644 --- a/Tests/GRPCInterceptorsTests/TracingInterceptorTests.swift +++ b/Tests/GRPCInterceptorsTests/TracingInterceptorTests.swift @@ -51,7 +51,12 @@ final class TracingInterceptorTests: XCTestCase { return .init( metadata: [], - bodyParts: .init(wrapping: AsyncStream(unfolding: { .message(["response"]) })) + bodyParts: .init( + wrapping: AsyncStream { cont in + cont.yield(.message(["response"])) + cont.finish() + } + ) ) } @@ -64,8 +69,10 @@ final class TracingInterceptorTests: XCTestCase { XCTAssertNil(element) var messages = response.messages.makeAsyncIterator() - let message = try await messages.next() + var message = try await messages.next() XCTAssertEqual(message, ["response"]) + message = try await messages.next() + XCTAssertNil(message) let tracer = InstrumentationSystem.tracer as! TestTracer XCTAssertEqual( @@ -103,7 +110,12 @@ final class TracingInterceptorTests: XCTestCase { return .init( metadata: [], - bodyParts: .init(wrapping: AsyncStream(unfolding: { .message(["response"]) })) + bodyParts: .init( + wrapping: AsyncStream { cont in + cont.yield(.message(["response"])) + cont.finish() + } + ) ) } @@ -116,8 +128,10 @@ final class TracingInterceptorTests: XCTestCase { XCTAssertNil(element) var messages = response.messages.makeAsyncIterator() - let message = try await messages.next() + var message = try await messages.next() XCTAssertEqual(message, ["response"]) + message = try await messages.next() + XCTAssertNil(message) let tracer = InstrumentationSystem.tracer as! TestTracer XCTAssertEqual( @@ -130,7 +144,11 @@ final class TracingInterceptorTests: XCTestCase { // Recorded when `request2` is sent "Sending request part", "Sent request part", - // Recorded at end of `producer` + // Recorded after all request parts have been sent + "Request end", + // Recorded when receiving response part + "Received response part", + // Recorded at end of response "Received response end", ] ) @@ -151,7 +169,8 @@ final class TracingInterceptorTests: XCTestCase { XCTAssertEqual( tracer.latestSpanEvents.map { $0.name }, [ - "Received request", + "Received request start", + "Received request end", "Sent error response", ] ) @@ -208,7 +227,8 @@ final class TracingInterceptorTests: XCTestCase { XCTAssertEqual( tracer.latestSpanEvents.map { $0.name }, [ - "Received request", + "Received request start", + "Received request end", "Sent response end", ] ) @@ -265,7 +285,8 @@ final class TracingInterceptorTests: XCTestCase { XCTAssertEqual( tracer.latestSpanEvents.map { $0.name }, [ - "Received request", + "Received request start", + "Received request end", // Recorded when `response1` is sent "Sending response part", "Sent response part", diff --git a/Tests/GRPCInterceptorsTests/TracingTestsUtilities.swift b/Tests/GRPCInterceptorsTests/TracingTestsUtilities.swift index a41156016..f8b9afd19 100644 --- a/Tests/GRPCInterceptorsTests/TracingTestsUtilities.swift +++ b/Tests/GRPCInterceptorsTests/TracingTestsUtilities.swift @@ -17,7 +17,7 @@ import GRPCCore import Tracing -class TestTracer: Tracer { +final class TestTracer: Tracer { typealias Span = TestSpan private var latestTestSpan: TestSpan? From ce315322d6c42ef58fd359bb6039956dccc0ccc9 Mon Sep 17 00:00:00 2001 From: Gus Cairo Date: Thu, 18 Jan 2024 10:32:10 +0000 Subject: [PATCH 13/14] Make TestTracer thread-safe --- Package.swift | 1 + .../TracingInterceptorTests.swift | 50 +++++++++++++++---- .../TracingTestsUtilities.swift | 15 ++++-- 3 files changed, 51 insertions(+), 15 deletions(-) diff --git a/Package.swift b/Package.swift index b3920460d..cd9be155f 100644 --- a/Package.swift +++ b/Package.swift @@ -297,6 +297,7 @@ extension Target { dependencies: [ .grpcCore, .tracing, + .nioCore, .grpcInterceptors ] ) diff --git a/Tests/GRPCInterceptorsTests/TracingInterceptorTests.swift b/Tests/GRPCInterceptorsTests/TracingInterceptorTests.swift index 633b7d01e..11dfbc5ab 100644 --- a/Tests/GRPCInterceptorsTests/TracingInterceptorTests.swift +++ b/Tests/GRPCInterceptorsTests/TracingInterceptorTests.swift @@ -34,12 +34,16 @@ final class TracingInterceptorTests: XCTestCase { serviceContext.traceID = traceIDString try await ServiceContext.withValue(serviceContext) { + let methodDescriptor = MethodDescriptor( + service: "TracingInterceptorTests", + method: "testClientInterceptor" + ) let response = try await interceptor.intercept( request: .init(producer: { writer in try await writer.write(contentsOf: ["request1"]) try await writer.write(contentsOf: ["request2"]) }), - context: .init(descriptor: .init(service: "foo", method: "bar")) + context: .init(descriptor: methodDescriptor) ) { stream, _ in // Assert the metadata contains the injected context key-value. XCTAssertEqual(stream.metadata, ["trace-id": "\(traceIDString)"]) @@ -76,7 +80,9 @@ final class TracingInterceptorTests: XCTestCase { let tracer = InstrumentationSystem.tracer as! TestTracer XCTAssertEqual( - tracer.latestSpanEvents.map { $0.name }, + tracer.getEventsForTestSpan(ofOperationName: methodDescriptor.fullyQualifiedMethod).map { + $0.name + }, [ "Request started", "Received response end", @@ -86,6 +92,10 @@ final class TracingInterceptorTests: XCTestCase { } func testClientInterceptorAllEventsRecorded() async throws { + let methodDescriptor = MethodDescriptor( + service: "TracingInterceptorTests", + method: "testClientInterceptorAllEventsRecorded" + ) var serviceContext = ServiceContext.topLevel let traceIDString = UUID().uuidString let interceptor = ClientTracingInterceptor(emitEventOnEachWrite: true) @@ -98,7 +108,7 @@ final class TracingInterceptorTests: XCTestCase { try await writer.write(contentsOf: ["request1"]) try await writer.write(contentsOf: ["request2"]) }), - context: .init(descriptor: .init(service: "foo", method: "bar")) + context: .init(descriptor: methodDescriptor) ) { stream, _ in // Assert the metadata contains the injected context key-value. XCTAssertEqual(stream.metadata, ["trace-id": "\(traceIDString)"]) @@ -135,7 +145,9 @@ final class TracingInterceptorTests: XCTestCase { let tracer = InstrumentationSystem.tracer as! TestTracer XCTAssertEqual( - tracer.latestSpanEvents.map { $0.name }, + tracer.getEventsForTestSpan(ofOperationName: methodDescriptor.fullyQualifiedMethod).map { + $0.name + }, [ "Request started", // Recorded when `request1` is sent @@ -156,10 +168,14 @@ final class TracingInterceptorTests: XCTestCase { } func testServerInterceptorErrorResponse() async throws { + let methodDescriptor = MethodDescriptor( + service: "TracingInterceptorTests", + method: "testServerInterceptorErrorResponse" + ) let interceptor = ServerTracingInterceptor(emitEventOnEachWrite: false) let response = try await interceptor.intercept( request: .init(single: .init(metadata: ["trace-id": "some-trace-id"], message: [])), - context: .init(descriptor: .init(service: "foo", method: "bar")) + context: .init(descriptor: methodDescriptor) ) { _, _ in ServerResponse.Stream(error: .init(code: .unknown, message: "Test error")) } @@ -167,7 +183,9 @@ final class TracingInterceptorTests: XCTestCase { let tracer = InstrumentationSystem.tracer as! TestTracer XCTAssertEqual( - tracer.latestSpanEvents.map { $0.name }, + tracer.getEventsForTestSpan(ofOperationName: methodDescriptor.fullyQualifiedMethod).map { + $0.name + }, [ "Received request start", "Received request end", @@ -177,11 +195,15 @@ final class TracingInterceptorTests: XCTestCase { } func testServerInterceptor() async throws { + let methodDescriptor = MethodDescriptor( + service: "TracingInterceptorTests", + method: "testServerInterceptor" + ) let (stream, continuation) = AsyncStream.makeStream() let interceptor = ServerTracingInterceptor(emitEventOnEachWrite: false) let response = try await interceptor.intercept( request: .init(single: .init(metadata: ["trace-id": "some-trace-id"], message: [])), - context: .init(descriptor: .init(service: "foo", method: "bar")) + context: .init(descriptor: methodDescriptor) ) { _, _ in { [serviceContext = ServiceContext.current] in return ServerResponse.Stream( @@ -225,7 +247,9 @@ final class TracingInterceptorTests: XCTestCase { let tracer = InstrumentationSystem.tracer as! TestTracer XCTAssertEqual( - tracer.latestSpanEvents.map { $0.name }, + tracer.getEventsForTestSpan(ofOperationName: methodDescriptor.fullyQualifiedMethod).map { + $0.name + }, [ "Received request start", "Received request end", @@ -235,11 +259,15 @@ final class TracingInterceptorTests: XCTestCase { } func testServerInterceptorAllEventsRecorded() async throws { + let methodDescriptor = MethodDescriptor( + service: "TracingInterceptorTests", + method: "testServerInterceptorAllEventsRecorded" + ) let (stream, continuation) = AsyncStream.makeStream() let interceptor = ServerTracingInterceptor(emitEventOnEachWrite: true) let response = try await interceptor.intercept( request: .init(single: .init(metadata: ["trace-id": "some-trace-id"], message: [])), - context: .init(descriptor: .init(service: "foo", method: "bar")) + context: .init(descriptor: methodDescriptor) ) { _, _ in { [serviceContext = ServiceContext.current] in return ServerResponse.Stream( @@ -283,7 +311,9 @@ final class TracingInterceptorTests: XCTestCase { let tracer = InstrumentationSystem.tracer as! TestTracer XCTAssertEqual( - tracer.latestSpanEvents.map { $0.name }, + tracer.getEventsForTestSpan(ofOperationName: methodDescriptor.fullyQualifiedMethod).map { + $0.name + }, [ "Received request start", "Received request end", diff --git a/Tests/GRPCInterceptorsTests/TracingTestsUtilities.swift b/Tests/GRPCInterceptorsTests/TracingTestsUtilities.swift index f8b9afd19..ffed5213a 100644 --- a/Tests/GRPCInterceptorsTests/TracingTestsUtilities.swift +++ b/Tests/GRPCInterceptorsTests/TracingTestsUtilities.swift @@ -15,14 +15,16 @@ */ import GRPCCore +import NIOConcurrencyHelpers import Tracing final class TestTracer: Tracer { typealias Span = TestSpan - private var latestTestSpan: TestSpan? - var latestSpanEvents: [SpanEvent] { - self.latestTestSpan?.events ?? [] + private var testSpans: NIOLockedValueBox<[String: TestSpan]> = .init([:]) + + func getEventsForTestSpan(ofOperationName operationName: String) -> [SpanEvent] { + self.testSpans.withLockedValue({ $0[operationName] })?.events ?? [] } func extract( @@ -57,8 +59,11 @@ final class TestTracer: Tracer { file fileID: String, line: UInt ) -> TestSpan where Instant: TracerInstant { - self.latestTestSpan = TestSpan(context: context(), operationName: operationName) - return latestTestSpan! + return self.testSpans.withLockedValue { testSpans in + let span = TestSpan(context: context(), operationName: operationName) + testSpans[operationName] = span + return span + } } } From 859a0fe8d6c17f1c11e933ba0df118118b515488 Mon Sep 17 00:00:00 2001 From: Gus Cairo Date: Thu, 18 Jan 2024 10:52:40 +0000 Subject: [PATCH 14/14] Don't run client tests in 5.7 --- Tests/GRPCInterceptorsTests/TracingInterceptorTests.swift | 2 ++ 1 file changed, 2 insertions(+) diff --git a/Tests/GRPCInterceptorsTests/TracingInterceptorTests.swift b/Tests/GRPCInterceptorsTests/TracingInterceptorTests.swift index 11dfbc5ab..98fbb2558 100644 --- a/Tests/GRPCInterceptorsTests/TracingInterceptorTests.swift +++ b/Tests/GRPCInterceptorsTests/TracingInterceptorTests.swift @@ -26,6 +26,7 @@ final class TracingInterceptorTests: XCTestCase { InstrumentationSystem.bootstrap(TestTracer()) } + #if swift(>=5.8) // Compiling these tests fails in 5.7 func testClientInterceptor() async throws { var serviceContext = ServiceContext.topLevel let traceIDString = UUID().uuidString @@ -166,6 +167,7 @@ final class TracingInterceptorTests: XCTestCase { ) } } + #endif // swift >= 5.7 func testServerInterceptorErrorResponse() async throws { let methodDescriptor = MethodDescriptor(