From 52da11687ed5638ee457cc4fcb97dca8e9527c40 Mon Sep 17 00:00:00 2001 From: numanelahi Date: Thu, 17 Apr 2025 11:09:10 +0000 Subject: [PATCH 1/2] Add SSE style delimiting for message streaming --- .../response_to_json_translator.h | 7 +- src/response_to_json_translator.cc | 38 ++++++++-- test/response_to_json_translator_test.cc | 75 +++++++++++++++++++ 3 files changed, 113 insertions(+), 7 deletions(-) diff --git a/src/include/grpc_transcoding/response_to_json_translator.h b/src/include/grpc_transcoding/response_to_json_translator.h index cb5f435..cec6cab 100644 --- a/src/include/grpc_transcoding/response_to_json_translator.h +++ b/src/include/grpc_transcoding/response_to_json_translator.h @@ -70,6 +70,11 @@ struct JsonResponseTranslateOptions { // If set to false, all streaming messages are treated as a JSON array and // separated by comma. bool stream_newline_delimited; + + // If true, enforces Server-Sent Events (SSE) message framing (`data: \n\n`) + // and, `stream_newline_delimited` is ignored. + // If false, message framing is determined by `stream_newline_delimited`. + bool stream_sse_style_delimited; }; class ResponseToJsonTranslator : public MessageStream { @@ -84,7 +89,7 @@ class ResponseToJsonTranslator : public MessageStream { ::google::protobuf::util::TypeResolver* type_resolver, std::string type_url, bool streaming, TranscoderInputStream* in, const JsonResponseTranslateOptions& options = { - ::google::protobuf::util::JsonPrintOptions(), false}); + ::google::protobuf::util::JsonPrintOptions(), false, false}); // MessageStream implementation bool NextMessage(std::string* message); diff --git a/src/response_to_json_translator.cc b/src/response_to_json_translator.cc index 720a855..6e9604e 100644 --- a/src/response_to_json_translator.cc +++ b/src/response_to_json_translator.cc @@ -65,10 +65,11 @@ bool ResponseToJsonTranslator::NextMessage(std::string* message) { return false; } } else if (streaming_ && reader_.Finished()) { - if (!options_.stream_newline_delimited) { - // This is a non-newline-delimited streaming call and the input is - // finished. Return the final ']' - // or "[]" in case this was an empty stream. + if (!options_.stream_newline_delimited && + !options_.stream_sse_style_delimited) { + // This is a non-newline-delimited and non-SSE-style-delimited streaming + // call and the input is finished. Return the final ']' or "[]" in case + // this was an empty stream. *message = first_ ? "[]" : "]"; } finished_ = true; @@ -95,6 +96,16 @@ bool WriteChar(::google::protobuf::io::ZeroCopyOutputStream* stream, char c) { return true; } +// A helper to write a string to a ZeroCopyOutputStream. +bool WriteString(::google::protobuf::io::ZeroCopyOutputStream* stream, std::string str) { + for (auto c : str) { + if (!WriteChar(stream, c)) { + return false; + } + } + return true; +} + } // namespace bool ResponseToJsonTranslator::TranslateMessage( @@ -102,7 +113,22 @@ bool ResponseToJsonTranslator::TranslateMessage( std::string* json_out) { ::google::protobuf::io::StringOutputStream json_stream(json_out); - if (streaming_ && !options_.stream_newline_delimited) { + if (streaming_ && options_.stream_sse_style_delimited) { + if (first_) { + if (!WriteString(&json_stream, "data: ")) { + status_ = absl::Status(absl::StatusCode::kInternal, + "Failed to build the response message."); + return false; + } + first_ = false; + } else { + if (!WriteString(&json_stream, "\n\ndata: ")) { + status_ = absl::Status(absl::StatusCode::kInternal, + "Failed to build the response message."); + return false; + } + } + } else if (streaming_ && !options_.stream_newline_delimited) { if (first_) { // This is a non-newline-delimited streaming call and this is the first // message, so prepend the @@ -134,7 +160,7 @@ bool ResponseToJsonTranslator::TranslateMessage( } // Append a newline delimiter after the message if needed. - if (streaming_ && options_.stream_newline_delimited) { + if (streaming_ && options_.stream_newline_delimited && !options_.stream_sse_style_delimited) { if (!WriteChar(&json_stream, '\n')) { status_ = absl::Status(absl::StatusCode::kInternal, "Failed to build the response message."); diff --git a/test/response_to_json_translator_test.cc b/test/response_to_json_translator_test.cc index edaf450..29de17d 100644 --- a/test/response_to_json_translator_test.cc +++ b/test/response_to_json_translator_test.cc @@ -911,6 +911,81 @@ TEST_F(ResponseToJsonTranslatorTest, StreamingNewlineDelimitedDirectTest) { EXPECT_FALSE(translator.NextMessage(&message)); } +TEST_F(ResponseToJsonTranslatorTest, StreamingSSEStyleDelimitedDirectTest) { + // Load the service config + ::google::api::Service service; + ASSERT_TRUE( + transcoding::testing::LoadService("bookstore_service.pb.txt", &service)); + + // Create a TypeHelper using the service config + TypeHelper type_helper(service.types(), service.enums()); + + // Messages to test + auto test_message1 = + GenerateGrpcMessage(R"(name : "1" theme : "Fiction")"); + auto test_message2 = + GenerateGrpcMessage(R"(name : "2" theme : "Fantasy")"); + auto test_message3 = + GenerateGrpcMessage(R"(name : "3" theme : "Children")"); + auto test_message4 = + GenerateGrpcMessage(R"(name : "4" theme : "Classics")"); + + TestZeroCopyInputStream input_stream; + ResponseToJsonTranslator translator( + type_helper.Resolver(), "type.googleapis.com/Shelf", true, &input_stream, + {pbutil::JsonPrintOptions(), true, true}); + + std::string message; + // There is nothing translated + EXPECT_FALSE(translator.NextMessage(&message)); + + // Add test_message1 to the stream + input_stream.AddChunk(test_message1); + + // Now we should have the test_message1 translated + EXPECT_TRUE(translator.NextMessage(&message)); + EXPECT_EQ("data: {\"name\":\"1\",\"theme\":\"Fiction\"}", message); + + // No more messages, but not finished yet + EXPECT_FALSE(translator.NextMessage(&message)); + EXPECT_FALSE(translator.Finished()); + + // Add the test_message2, test_message3 and part of test_message4 + input_stream.AddChunk(test_message2); + input_stream.AddChunk(test_message3); + input_stream.AddChunk(test_message4.substr(0, 10)); + + // Now we should have test_message2 & test_message3 translated + EXPECT_TRUE(translator.NextMessage(&message)); + EXPECT_EQ("\n\ndata: {\"name\":\"2\",\"theme\":\"Fantasy\"}", message); + + EXPECT_TRUE(translator.NextMessage(&message)); + EXPECT_EQ("\n\ndata: {\"name\":\"3\",\"theme\":\"Children\"}", message); + + // No more messages, but not finished yet + EXPECT_FALSE(translator.NextMessage(&message)); + EXPECT_FALSE(translator.Finished()); + + // Add the rest of test_message4 + input_stream.AddChunk(test_message4.substr(10)); + + // Now we should have the test_message4 translated + EXPECT_TRUE(translator.NextMessage(&message)); + EXPECT_EQ("\n\ndata: {\"name\":\"4\",\"theme\":\"Classics\"}", message); + + // No more messages, but not finished yet + EXPECT_FALSE(translator.NextMessage(&message)); + EXPECT_FALSE(translator.Finished()); + + // Now finish the stream + input_stream.Finish(); + + // All done! + EXPECT_TRUE(translator.NextMessage(&message)); + EXPECT_TRUE(translator.Finished()); + EXPECT_FALSE(translator.NextMessage(&message)); +} + TEST_F(ResponseToJsonTranslatorTest, Streaming5KMessages) { // Load the service config ::google::api::Service service; From 293bd5c2168bef05f4d8bab85f8cfc317ad29f10 Mon Sep 17 00:00:00 2001 From: numanelahi Date: Wed, 30 Apr 2025 13:36:44 +0000 Subject: [PATCH 2/2] Optimize the WriteStr function --- src/response_to_json_translator.cc | 20 +++++++++++++++++--- 1 file changed, 17 insertions(+), 3 deletions(-) diff --git a/src/response_to_json_translator.cc b/src/response_to_json_translator.cc index 6e9604e..fcb0eec 100644 --- a/src/response_to_json_translator.cc +++ b/src/response_to_json_translator.cc @@ -16,6 +16,8 @@ // #include "grpc_transcoding/response_to_json_translator.h" +#include +#include #include #include "google/protobuf/io/zero_copy_stream_impl_lite.h" @@ -97,11 +99,23 @@ bool WriteChar(::google::protobuf::io::ZeroCopyOutputStream* stream, char c) { } // A helper to write a string to a ZeroCopyOutputStream. -bool WriteString(::google::protobuf::io::ZeroCopyOutputStream* stream, std::string str) { - for (auto c : str) { - if (!WriteChar(stream, c)) { +bool WriteString(::google::protobuf::io::ZeroCopyOutputStream* stream, + const std::string& str) { + int bytes_to_write = str.size(); + int bytes_written = 0; + while (bytes_written < bytes_to_write) { + int size = 0; + void* data; + if (!stream->Next(&data, &size) || size == 0) { return false; } + int bytes_to_write_this_iteration = + std::min(bytes_to_write - bytes_written, size); + memcpy(data, str.data() + bytes_written, bytes_to_write_this_iteration); + bytes_written += bytes_to_write_this_iteration; + if (bytes_to_write_this_iteration < size) { + stream->BackUp(size - bytes_to_write_this_iteration); + } } return true; }