diff --git a/Sources/GRPCCore/Call/Client/ClientResponse.swift b/Sources/GRPCCore/Call/Client/ClientResponse.swift index b99f6f3ae..73fcde0d5 100644 --- a/Sources/GRPCCore/Call/Client/ClientResponse.swift +++ b/Sources/GRPCCore/Call/Client/ClientResponse.swift @@ -363,7 +363,7 @@ extension StreamingClientResponse { /// Returns the messages received from the server. /// - /// For rejected RPCs the `RPCAsyncSequence` throws a `RPCError``. + /// For rejected RPCs (in other words, where ``accepted`` is `failure`), the `RPCAsyncSequence` throws a ``RPCError``. public var messages: RPCAsyncSequence { switch self.accepted { case let .success(contents): @@ -382,4 +382,19 @@ extension StreamingClientResponse { return RPCAsyncSequence.throwing(error) } } + + /// Returns the body parts (i.e. `messages` and `trailingMetadata`) returned from the server. + /// + /// For rejected RPCs (in other words, where ``accepted`` is `failure`), the `RPCAsyncSequence` throws a ``RPCError``. + public var bodyParts: RPCAsyncSequence { + switch self.accepted { + case let .success(contents): + return contents.bodyParts + + case let .failure(error): + return RPCAsyncSequence.throwing(error) + } + } } + +extension StreamingClientResponse.Contents.BodyPart: Equatable where Message: Equatable {} diff --git a/Tests/GRPCCoreTests/Call/Client/ClientResponseTests.swift b/Tests/GRPCCoreTests/Call/Client/ClientResponseTests.swift index 01557d02c..46b0f19ae 100644 --- a/Tests/GRPCCoreTests/Call/Client/ClientResponseTests.swift +++ b/Tests/GRPCCoreTests/Call/Client/ClientResponseTests.swift @@ -53,7 +53,7 @@ final class ClientResponseTests: XCTestCase { XCTAssertEqual(response.trailingMetadata, ["bar": "baz"]) } - func testAcceptedStreamResponseConvenienceMethods() async throws { + func testAcceptedStreamResponseConvenienceMethods_Messages() async throws { let response = StreamingClientResponse( of: String.self, metadata: ["foo": "bar"], @@ -73,6 +73,29 @@ final class ClientResponseTests: XCTestCase { XCTAssertEqual(messages, ["foo", "bar", "baz"]) } + func testAcceptedStreamResponseConvenienceMethods_BodyParts() async throws { + let response = StreamingClientResponse( + of: String.self, + metadata: ["foo": "bar"], + bodyParts: RPCAsyncSequence( + wrapping: AsyncThrowingStream { + $0.yield(.message("foo")) + $0.yield(.message("bar")) + $0.yield(.message("baz")) + $0.yield(.trailingMetadata(["baz": "baz"])) + $0.finish() + } + ) + ) + + XCTAssertEqual(response.metadata, ["foo": "bar"]) + let bodyParts = try await response.bodyParts.collect() + XCTAssertEqual( + bodyParts, + [.message("foo"), .message("bar"), .message("baz"), .trailingMetadata(["baz": "baz"])] + ) + } + func testRejectedStreamResponseConvenienceMethods() async throws { let error = RPCError(code: .aborted, message: "error message", metadata: ["bar": "baz"]) let response = StreamingClientResponse(of: String.self, error: error) @@ -83,6 +106,11 @@ final class ClientResponseTests: XCTestCase { } errorHandler: { XCTAssertEqual($0, error) } + await XCTAssertThrowsRPCErrorAsync { + try await response.bodyParts.collect() + } errorHandler: { + XCTAssertEqual($0, error) + } } func testStreamToSingleConversionForValidStream() async throws {