diff --git a/build.c b/build.c index fd174ff10..a8fbba68c 100644 --- a/build.c +++ b/build.c @@ -9,7 +9,7 @@ int main(int argc, char **argv) { char *CXX = strncpy(calloc(1024, 1), or_else(getenv("CXX"), "g++"), 1024); char *EXEC_SUFFIX = strncpy(calloc(1024, 1), maybe(getenv("EXEC_SUFFIX")), 1024); - char *EXAMPLE_FILES[] = {"EchoBody", "HelloWorldThreaded", "Http3Server", "Broadcast", "HelloWorld", "Crc32", "ServerName", + char *EXAMPLE_FILES[] = {"Precompress", "EchoBody", "HelloWorldThreaded", "Http3Server", "Broadcast", "HelloWorld", "Crc32", "ServerName", "EchoServer", "BroadcastingEchoServer", "UpgradeSync", "UpgradeAsync", "ParameterRoutes"}; strcat(CXXFLAGS, " -march=native -O3 -Wpedantic -Wall -Wextra -Wsign-conversion -Wconversion -std=c++20 -Isrc -IuSockets/src"); diff --git a/examples/Precompress.cpp b/examples/Precompress.cpp new file mode 100644 index 000000000..d169b5d1e --- /dev/null +++ b/examples/Precompress.cpp @@ -0,0 +1,77 @@ +#include "App.h" + +int main() { + /* ws->getUserData returns one of these */ + struct PerSocketData { + /* Fill with user data */ + }; + + /* Keeping track of last precompressed message both in original and compressed format */ + std::string originalMessage; + std::string compressedMessage; + std::mutex m; + + /* For demo, we create a thread that will update the precompressed message every second */ + std::thread t2([&originalMessage, &compressedMessage, &m]() { + uWS::ZlibContext zlibContext; + uWS::DeflationStream compressor(uWS::DEDICATED_COMPRESSOR); + int counter = 0; + + while (true) { + counter++; + + m.lock(); + originalMessage = "Hello you are looking at message number " + std::to_string(counter) + " and this text should be precompressed"; + compressedMessage = compressor.deflate(&zlibContext, {originalMessage.data(), originalMessage.length()}, true); + m.unlock(); + + std::this_thread::sleep_for(std::chrono::milliseconds(500)); + } + + }); + + uWS::App().ws("/*", { + /* You must only use SHARED_COMPRESSOR with precompression (can't use dedicated_compressor) */ + .compression = uWS::CompressOptions(uWS::SHARED_COMPRESSOR | uWS::DEDICATED_DECOMPRESSOR), + /* Handlers */ + .upgrade = nullptr, + .open = [](auto */*ws*/) { + /* Open event here, you may access ws->getUserData() which points to a PerSocketData struct */ + + }, + .message = [&originalMessage, &compressedMessage, &m](auto *ws, std::string_view message, uWS::OpCode opCode) { + + /* First respond by echoing what they send us, without compression */ + ws->send(message, opCode, false); + + /* This should be wrapped up into ws->sendPrepared(PreparedMessage) in the future, experimental for now */ + m.lock(); + if (ws->hasNegotiatedCompression() && compressedMessage.length() < originalMessage.length()) { + std::cout << "Responding with precompressed message saving " << (originalMessage.length() - compressedMessage.length()) << " bytes" << std::endl; + ws->send({compressedMessage.data(), compressedMessage.length()}, uWS::OpCode::TEXT, uWS::CompressFlags::ALREADY_COMPRESSED); + } else { + ws->send({originalMessage.data(), originalMessage.length()}, uWS::OpCode::TEXT); + } + m.unlock(); + }, + .dropped = [](auto */*ws*/, std::string_view /*message*/, uWS::OpCode /*opCode*/) { + /* A message was dropped due to set maxBackpressure and closeOnBackpressureLimit limit */ + }, + .drain = [](auto */*ws*/) { + /* Check ws->getBufferedAmount() here */ + }, + .ping = [](auto */*ws*/, std::string_view) { + /* Not implemented yet */ + }, + .pong = [](auto */*ws*/, std::string_view) { + /* Not implemented yet */ + }, + .close = [](auto */*ws*/, int /*code*/, std::string_view /*message*/) { + /* You may access ws->getUserData() here */ + } + }).listen(9001, [&t2](auto *listen_socket) { + if (listen_socket) { + std::cout << "Listening on port " << 9001 << std::endl; + } + }).run(); +} diff --git a/src/WebSocket.h b/src/WebSocket.h index a433e9d4f..08cc13b21 100644 --- a/src/WebSocket.h +++ b/src/WebSocket.h @@ -27,6 +27,13 @@ namespace uWS { +/* Experimental */ +enum CompressFlags : int { + NO_ACTION, + COMPRESS, + ALREADY_COMPRESSED +}; + template struct WebSocket : AsyncSocket { template friend struct TemplatedApp; @@ -87,9 +94,15 @@ struct WebSocket : AsyncSocket { return send(message, CONTINUATION, compress, true); } + /* Experimental */ + bool hasNegotiatedCompression() { + WebSocketData *webSocketData = (WebSocketData *) Super::getAsyncSocketData(); + return webSocketData->compressionStatus == WebSocketData::ENABLED; + } + /* Send or buffer a WebSocket frame, compressed or not. Returns BACKPRESSURE on increased user space backpressure, * DROPPED on dropped message (due to backpressure) or SUCCCESS if you are free to send even more now. */ - SendStatus send(std::string_view message, OpCode opCode = OpCode::BINARY, bool compress = false, bool fin = true) { + SendStatus send(std::string_view message, OpCode opCode = OpCode::BINARY, int compress = false, bool fin = true) { WebSocketContextData *webSocketContextData = (WebSocketContextData *) us_socket_context_ext(SSL, (us_socket_context_t *) us_socket_context(SSL, (us_socket_t *) this) ); @@ -145,12 +158,15 @@ struct WebSocket : AsyncSocket { /* Check and correct the compress hint. It is never valid to compress 0 bytes */ if (message.length() && opCode < 3 && webSocketData->compressionStatus == WebSocketData::ENABLED) { - LoopData *loopData = Super::getLoopData(); - /* Compress using either shared or dedicated deflationStream */ - if (webSocketData->deflationStream) { - message = webSocketData->deflationStream->deflate(loopData->zlibContext, message, false); - } else { - message = loopData->deflationStream->deflate(loopData->zlibContext, message, true); + /* If compress is 2 (IS_PRE_COMPRESSED), skip this step (experimental) */ + if (compress != CompressFlags::ALREADY_COMPRESSED) { + LoopData *loopData = Super::getLoopData(); + /* Compress using either shared or dedicated deflationStream */ + if (webSocketData->deflationStream) { + message = webSocketData->deflationStream->deflate(loopData->zlibContext, message, false); + } else { + message = loopData->deflationStream->deflate(loopData->zlibContext, message, true); + } } } else { compress = false;