From b1a3c58ea130d13ca45861671e3195d625f7b266 Mon Sep 17 00:00:00 2001 From: Jesus Perez Date: Tue, 5 Mar 2024 11:15:33 +0100 Subject: [PATCH 1/5] Refs #20589: Set real non_blocking_send limitation Signed-off-by: Jesus Perez --- src/cpp/rtps/transport/TCPChannelResource.cpp | 3 ++- test/unittest/transport/TCPv4Tests.cpp | 6 ++++-- test/unittest/transport/TCPv6Tests.cpp | 3 ++- 3 files changed, 8 insertions(+), 4 deletions(-) diff --git a/src/cpp/rtps/transport/TCPChannelResource.cpp b/src/cpp/rtps/transport/TCPChannelResource.cpp index 2bb1e4b5db6..efc7a6903b9 100644 --- a/src/cpp/rtps/transport/TCPChannelResource.cpp +++ b/src/cpp/rtps/transport/TCPChannelResource.cpp @@ -362,7 +362,8 @@ bool TCPChannelResource::check_socket_send_buffer( size_t future_queue_size = size_t(bytesInSendQueue) + msg_size; - if (future_queue_size > size_t(parent_->configuration()->sendBufferSize)) + // TCP actually allocates twice the size of the buffer requested. + if (future_queue_size > size_t(2 * parent_->configuration()->sendBufferSize)) { return false; } diff --git a/test/unittest/transport/TCPv4Tests.cpp b/test/unittest/transport/TCPv4Tests.cpp index f0125f170db..7f8aec3512e 100644 --- a/test/unittest/transport/TCPv4Tests.cpp +++ b/test/unittest/transport/TCPv4Tests.cpp @@ -1434,7 +1434,8 @@ TEST_F(TCPv4Tests, secure_non_blocking_send) const octet* data = message.data(); size_t size = message.size(); - // Send the message with no header + // Send the message with no header. Since TCP actually allocates twice the size of the buffer requested + // and we want to guarantee that the buffer might be full, we send the message more than twice. for (int i = 0; i < 5; i++) { sender_channel_resource->send(nullptr, 0, data, size, ec); @@ -1932,7 +1933,8 @@ TEST_F(TCPv4Tests, non_blocking_send) const octet* data = message.data(); size_t size = message.size(); - // Send the message with no header + // Send the message with no header. Since TCP actually allocates twice the size of the buffer requested + // and we want to guarantee that the buffer might be full, we send the message more than twice. for (int i = 0; i < 5; i++) { sender_channel_resource->send(nullptr, 0, data, size, ec); diff --git a/test/unittest/transport/TCPv6Tests.cpp b/test/unittest/transport/TCPv6Tests.cpp index 7486ed20468..facca1cb7f0 100644 --- a/test/unittest/transport/TCPv6Tests.cpp +++ b/test/unittest/transport/TCPv6Tests.cpp @@ -372,7 +372,8 @@ TEST_F(TCPv6Tests, non_blocking_send) const octet* data = message.data(); size_t size = message.size(); - // Send the message with no header + // Send the message with no header. Since TCP actually allocates twice the size of the buffer requested + // and we want to guarantee that the buffer might be full, we send the message more than twice. for (int i = 0; i < 5; i++) { sender_channel_resource->send(nullptr, 0, data, size, ec); From 28139a7d4b9d6101de4bfac523aa6fa51d1d3731 Mon Sep 17 00:00:00 2001 From: Jesus Perez Date: Tue, 5 Mar 2024 11:31:44 +0100 Subject: [PATCH 2/5] Refs #20589: Readapt test Signed-off-by: Jesus Perez --- test/unittest/transport/TCPv4Tests.cpp | 10 ++++++++-- test/unittest/transport/TCPv6Tests.cpp | 5 ++++- 2 files changed, 12 insertions(+), 3 deletions(-) diff --git a/test/unittest/transport/TCPv4Tests.cpp b/test/unittest/transport/TCPv4Tests.cpp index 7f8aec3512e..2d5859f019c 100644 --- a/test/unittest/transport/TCPv4Tests.cpp +++ b/test/unittest/transport/TCPv4Tests.cpp @@ -1436,11 +1436,14 @@ TEST_F(TCPv4Tests, secure_non_blocking_send) // Send the message with no header. Since TCP actually allocates twice the size of the buffer requested // and we want to guarantee that the buffer might be full, we send the message more than twice. + size_t bytes_sent = 0; for (int i = 0; i < 5; i++) { - sender_channel_resource->send(nullptr, 0, data, size, ec); + bytes_sent += sender_channel_resource->send(nullptr, 0, data, size, ec); } + ASSERT_EQ(bytes_sent, size * 2); + secure_socket->lowest_layer().close(ec); } #endif // ifndef _WIN32 @@ -1935,11 +1938,14 @@ TEST_F(TCPv4Tests, non_blocking_send) // Send the message with no header. Since TCP actually allocates twice the size of the buffer requested // and we want to guarantee that the buffer might be full, we send the message more than twice. + size_t bytes_sent = 0; for (int i = 0; i < 5; i++) { - sender_channel_resource->send(nullptr, 0, data, size, ec); + bytes_sent += sender_channel_resource->send(nullptr, 0, data, size, ec); } + ASSERT_EQ(bytes_sent, size * 2); + socket.shutdown(asio::ip::tcp::socket::shutdown_both); socket.cancel(); socket.close(); diff --git a/test/unittest/transport/TCPv6Tests.cpp b/test/unittest/transport/TCPv6Tests.cpp index facca1cb7f0..0812c929328 100644 --- a/test/unittest/transport/TCPv6Tests.cpp +++ b/test/unittest/transport/TCPv6Tests.cpp @@ -374,11 +374,14 @@ TEST_F(TCPv6Tests, non_blocking_send) // Send the message with no header. Since TCP actually allocates twice the size of the buffer requested // and we want to guarantee that the buffer might be full, we send the message more than twice. + size_t bytes_sent = 0; for (int i = 0; i < 5; i++) { - sender_channel_resource->send(nullptr, 0, data, size, ec); + bytes_sent += sender_channel_resource->send(nullptr, 0, data, size, ec); } + ASSERT_EQ(bytes_sent, size * 2); + socket.shutdown(asio::ip::tcp::socket::shutdown_both); socket.cancel(); socket.close(); From 432d600b0212c3a547697c96478cf0bd366fdc4b Mon Sep 17 00:00:00 2001 From: Jesus Perez Date: Mon, 25 Mar 2024 15:16:46 +0100 Subject: [PATCH 3/5] Refs #20589: Fix failing test in macos/linux Signed-off-by: Jesus Perez --- test/unittest/transport/TCPv4Tests.cpp | 97 +++++++++++++++++--------- test/unittest/transport/TCPv6Tests.cpp | 31 ++++---- 2 files changed, 82 insertions(+), 46 deletions(-) diff --git a/test/unittest/transport/TCPv4Tests.cpp b/test/unittest/transport/TCPv4Tests.cpp index 2d5859f019c..ff4d782e393 100644 --- a/test/unittest/transport/TCPv4Tests.cpp +++ b/test/unittest/transport/TCPv4Tests.cpp @@ -1330,6 +1330,8 @@ TEST_F(TCPv4Tests, send_and_receive_between_both_secure_ports_with_sni) // destination that does not read or does it so slowly. TEST_F(TCPv4Tests, secure_non_blocking_send) { + eprosima::fastdds::dds::Log::SetVerbosity(eprosima::fastdds::dds::Log::Kind::Info); + uint16_t port = g_default_port; uint32_t msg_size = eprosima::fastdds::rtps::s_minimumSocketBuffer; // Create a TCP Server transport @@ -1338,10 +1340,13 @@ TEST_F(TCPv4Tests, secure_non_blocking_send) using TLSHSRole = TCPTransportDescriptor::TLSConfig::TLSHandShakeRole; TCPv4TransportDescriptor senderDescriptor; senderDescriptor.add_listener_port(port); + senderDescriptor.apply_security = true; senderDescriptor.non_blocking_send = true; senderDescriptor.sendBufferSize = msg_size; - senderDescriptor.tls_config.handshake_role = TLSHSRole::CLIENT; - senderDescriptor.tls_config.verify_file = "ca.crt"; + senderDescriptor.tls_config.password = "fastddspwd"; + senderDescriptor.tls_config.cert_chain_file = "fastdds.crt"; + senderDescriptor.tls_config.private_key_file = "fastdds.key"; + senderDescriptor.tls_config.tmp_dh_file = "dh_params.pem"; senderDescriptor.tls_config.verify_mode = TLSVerifyMode::VERIFY_PEER; senderDescriptor.tls_config.add_option(TLSOptions::DEFAULT_WORKAROUNDS); senderDescriptor.tls_config.add_option(TLSOptions::SINGLE_DH_USE); @@ -1356,8 +1361,8 @@ TEST_F(TCPv4Tests, secure_non_blocking_send) // saturate the reception socket of the datareader. This saturation requires // preventing the datareader from reading from the socket, what inevitably // happens continuously if instantiating and connecting the receiver transport. - // Hence, a raw socket is opened and connected to the server. There won't be read - // calls on that socket. + // Hence, a raw socket is opened and connected to the server. Read calls on that + // socket are controlled. Locator_t serverLoc; serverLoc.kind = LOCATOR_KIND_TCPv4; IPLocator::setIPv4(serverLoc, 127, 0, 0, 1); @@ -1370,13 +1375,7 @@ TEST_F(TCPv4Tests, secure_non_blocking_send) { return preverified; }); - ssl_context.set_password_callback([](std::size_t, asio::ssl::context_base::password_purpose) - { - return "fastddspwd"; - }); - ssl_context.use_certificate_chain_file("fastdds.crt"); - ssl_context.use_private_key_file("fastdds.key", asio::ssl::context::pem); - ssl_context.use_tmp_dh_file("dh_params.pem"); + ssl_context.load_verify_file("ca.crt"); uint32_t options = 0; options |= asio::ssl::context::default_workarounds; @@ -1385,8 +1384,19 @@ TEST_F(TCPv4Tests, secure_non_blocking_send) options |= asio::ssl::context::no_compression; ssl_context.set_options(options); - // TCPChannelResourceSecure::connect() like connection asio::io_service io_service; + auto ioServiceFunction = [&]() + { +#if ASIO_VERSION >= 101200 + asio::executor_work_guard work(io_service.get_executor()); +#else + io_service::work work(io_service_); +#endif // if ASIO_VERSION >= 101200 + io_service.run(); + }; + std::thread ioServiceThread(ioServiceFunction); + + // TCPChannelResourceSecure::connect() like connection asio::ip::tcp::resolver resolver(io_service); auto endpoints = resolver.resolve( IPLocator::ip_to_string(serverLoc), @@ -1407,7 +1417,7 @@ TEST_F(TCPv4Tests, secure_non_blocking_send) ) { ASSERT_TRUE(!ec); - asio::ssl::stream_base::handshake_type role = asio::ssl::stream_base::server; + asio::ssl::stream_base::handshake_type role = asio::ssl::stream_base::client; secure_socket->async_handshake(role, [](const std::error_code& ec) { @@ -1423,6 +1433,7 @@ TEST_F(TCPv4Tests, secure_non_blocking_send) a connection. This channel will not be present in the server's channel_resources_ map as communication lacks most of the discovery messages using a raw socket as participant. */ + // auto sender_unbound_channel_resources = senderTransportUnderTest.get_unbound_channel_resources(); auto sender_unbound_channel_resources = senderTransportUnderTest.get_unbound_channel_resources(); ASSERT_TRUE(sender_unbound_channel_resources.size() == 1u); auto sender_channel_resource = @@ -1430,21 +1441,32 @@ TEST_F(TCPv4Tests, secure_non_blocking_send) // Prepare the message asio::error_code ec; - std::vector message(msg_size, 0); + std::vector message(msg_size*2, 0); const octet* data = message.data(); size_t size = message.size(); // Send the message with no header. Since TCP actually allocates twice the size of the buffer requested - // and we want to guarantee that the buffer might be full, we send the message more than twice. - size_t bytes_sent = 0; - for (int i = 0; i < 5; i++) - { - bytes_sent += sender_channel_resource->send(nullptr, 0, data, size, ec); - } - - ASSERT_EQ(bytes_sent, size * 2); + // it should be able to send a message of msg_size*2. + size_t bytes_sent = sender_channel_resource->send(nullptr, 0, data, size, ec); + ASSERT_EQ(bytes_sent, size); + + // Now wait until the receive buffer is flushed (send buffer will be empty too) + std::vector buffer(size, 0); + size_t bytes_read = 0; + bytes_read = asio::read(*secure_socket, asio::buffer(buffer.data(), size), asio::transfer_exactly(size), ec); + ASSERT_EQ(ec, asio::error_code()); + ASSERT_EQ(bytes_read, size); + + // Now try to send a message that is bigger than the buffer size: (msg_size*2 + 1) + bytes_in_send_buffer(0) > 2*sendBufferSize + message.resize(msg_size*2 + 1); + data = message.data(); + size = message.size(); + bytes_sent = sender_channel_resource->send(nullptr, 0, data, size, ec); + ASSERT_EQ(bytes_sent, 0u); secure_socket->lowest_layer().close(ec); + io_service.stop(); + ioServiceThread.join(); } #endif // ifndef _WIN32 @@ -1866,7 +1888,7 @@ TEST_F(TCPv4Tests, client_announced_local_port_uniqueness) } #ifndef _WIN32 -// The primary purpose of this test is to check the non-blocking behavior of a secure socket sending data to a +// The primary purpose of this test is to check the non-blocking behavior of a socket sending data to a // destination that does not read or does it so slowly. TEST_F(TCPv4Tests, non_blocking_send) { @@ -1886,8 +1908,8 @@ TEST_F(TCPv4Tests, non_blocking_send) // saturate the reception socket of the datareader. This saturation requires // preventing the datareader from reading from the socket, what inevitably // happens continuously if instantiating and connecting the receiver transport. - // Hence, a raw socket is opened and connected to the server. There won't be read - // calls on that socket. + // Hence, a raw socket is opened and connected to the server. Read calls on that + // socket are controlled. Locator_t serverLoc; serverLoc.kind = LOCATOR_KIND_TCPv4; IPLocator::setIPv4(serverLoc, 127, 0, 0, 1); @@ -1932,19 +1954,26 @@ TEST_F(TCPv4Tests, non_blocking_send) // Prepare the message asio::error_code ec; - std::vector message(msg_size, 0); + std::vector message(msg_size*2, 0); const octet* data = message.data(); size_t size = message.size(); // Send the message with no header. Since TCP actually allocates twice the size of the buffer requested - // and we want to guarantee that the buffer might be full, we send the message more than twice. - size_t bytes_sent = 0; - for (int i = 0; i < 5; i++) - { - bytes_sent += sender_channel_resource->send(nullptr, 0, data, size, ec); - } - - ASSERT_EQ(bytes_sent, size * 2); + // it should be able to send a message of msg_size*2. + size_t bytes_sent = sender_channel_resource->send(nullptr, 0, data, size, ec); + ASSERT_EQ(bytes_sent, size); + + // Now wait until the receive buffer is flushed (send buffer will be empty too) + std::vector buffer(size, 0); + size_t bytes_read = asio::read(socket, asio::buffer(buffer, size), asio::transfer_exactly(size), ec); + ASSERT_EQ(bytes_read, size); + + // Now try to send a message that is bigger than the buffer size: (msg_size*2 + 1) + bytes_in_send_buffer(0) > 2*sendBufferSize + message.resize(msg_size*2 + 1); + data = message.data(); + size = message.size(); + bytes_sent = sender_channel_resource->send(nullptr, 0, data, size, ec); + ASSERT_EQ(bytes_sent, 0u); socket.shutdown(asio::ip::tcp::socket::shutdown_both); socket.cancel(); diff --git a/test/unittest/transport/TCPv6Tests.cpp b/test/unittest/transport/TCPv6Tests.cpp index 0812c929328..3b82193d370 100644 --- a/test/unittest/transport/TCPv6Tests.cpp +++ b/test/unittest/transport/TCPv6Tests.cpp @@ -302,7 +302,7 @@ TEST_F(TCPv6Tests, client_announced_local_port_uniqueness) } #ifndef _WIN32 -// The primary purpose of this test is to check the non-blocking behavior of a secure socket sending data to a +// The primary purpose of this test is to check the non-blocking behavior of a socket sending data to a // destination that does not read or does it so slowly. TEST_F(TCPv6Tests, non_blocking_send) { @@ -322,8 +322,8 @@ TEST_F(TCPv6Tests, non_blocking_send) // saturate the reception socket of the datareader. This saturation requires // preventing the datareader from reading from the socket, what inevitably // happens continuously if instantiating and connecting the receiver transport. - // Hence, a raw socket is opened and connected to the server. There won't be read - // calls on that socket. + // Hence, a raw socket is opened and connected to the server. Read calls on that + // socket are controlled. Locator_t serverLoc; serverLoc.kind = LOCATOR_KIND_TCPv6; IPLocator::setIPv6(serverLoc, "::1"); @@ -368,19 +368,26 @@ TEST_F(TCPv6Tests, non_blocking_send) // Prepare the message asio::error_code ec; - std::vector message(msg_size, 0); + std::vector message(msg_size*2, 0); const octet* data = message.data(); size_t size = message.size(); // Send the message with no header. Since TCP actually allocates twice the size of the buffer requested - // and we want to guarantee that the buffer might be full, we send the message more than twice. - size_t bytes_sent = 0; - for (int i = 0; i < 5; i++) - { - bytes_sent += sender_channel_resource->send(nullptr, 0, data, size, ec); - } - - ASSERT_EQ(bytes_sent, size * 2); + // it should be able to send a message of msg_size*2. + size_t bytes_sent = sender_channel_resource->send(nullptr, 0, data, size, ec); + ASSERT_EQ(bytes_sent, size); + + // Now wait until the receive buffer is flushed (send buffer will be empty too) + std::vector buffer(size, 0); + size_t bytes_read = asio::read(socket, asio::buffer(buffer, size), asio::transfer_exactly(size), ec); + ASSERT_EQ(bytes_read, size); + + // Now try to send a message that is bigger than the buffer size: (msg_size*2 + 1) + bytes_in_send_buffer(0) > 2*sendBufferSize + message.resize(msg_size*2 + 1); + data = message.data(); + size = message.size(); + bytes_sent = sender_channel_resource->send(nullptr, 0, data, size, ec); + ASSERT_EQ(bytes_sent, 0u); socket.shutdown(asio::ip::tcp::socket::shutdown_both); socket.cancel(); From 6b967d8f9d1b0b491abe80c7f432851a7071d679 Mon Sep 17 00:00:00 2001 From: Jesus Perez Date: Mon, 25 Mar 2024 15:47:17 +0100 Subject: [PATCH 4/5] Refs #20589: Uncrustify Signed-off-by: Jesus Perez --- test/unittest/transport/TCPv4Tests.cpp | 14 +++++++------- test/unittest/transport/TCPv6Tests.cpp | 6 +++--- 2 files changed, 10 insertions(+), 10 deletions(-) diff --git a/test/unittest/transport/TCPv4Tests.cpp b/test/unittest/transport/TCPv4Tests.cpp index ff4d782e393..316a9716176 100644 --- a/test/unittest/transport/TCPv4Tests.cpp +++ b/test/unittest/transport/TCPv4Tests.cpp @@ -1394,7 +1394,7 @@ TEST_F(TCPv4Tests, secure_non_blocking_send) #endif // if ASIO_VERSION >= 101200 io_service.run(); }; - std::thread ioServiceThread(ioServiceFunction); + std::thread ioServiceThread(ioServiceFunction); // TCPChannelResourceSecure::connect() like connection asio::ip::tcp::resolver resolver(io_service); @@ -1441,7 +1441,7 @@ TEST_F(TCPv4Tests, secure_non_blocking_send) // Prepare the message asio::error_code ec; - std::vector message(msg_size*2, 0); + std::vector message(msg_size * 2, 0); const octet* data = message.data(); size_t size = message.size(); @@ -1457,8 +1457,8 @@ TEST_F(TCPv4Tests, secure_non_blocking_send) ASSERT_EQ(ec, asio::error_code()); ASSERT_EQ(bytes_read, size); - // Now try to send a message that is bigger than the buffer size: (msg_size*2 + 1) + bytes_in_send_buffer(0) > 2*sendBufferSize - message.resize(msg_size*2 + 1); + // Now try to send a message that is bigger than the buffer size: (msg_size*2 + 1) + bytes_in_send_buffer(0) > 2*sendBufferSize + message.resize(msg_size * 2 + 1); data = message.data(); size = message.size(); bytes_sent = sender_channel_resource->send(nullptr, 0, data, size, ec); @@ -1954,7 +1954,7 @@ TEST_F(TCPv4Tests, non_blocking_send) // Prepare the message asio::error_code ec; - std::vector message(msg_size*2, 0); + std::vector message(msg_size * 2, 0); const octet* data = message.data(); size_t size = message.size(); @@ -1968,8 +1968,8 @@ TEST_F(TCPv4Tests, non_blocking_send) size_t bytes_read = asio::read(socket, asio::buffer(buffer, size), asio::transfer_exactly(size), ec); ASSERT_EQ(bytes_read, size); - // Now try to send a message that is bigger than the buffer size: (msg_size*2 + 1) + bytes_in_send_buffer(0) > 2*sendBufferSize - message.resize(msg_size*2 + 1); + // Now try to send a message that is bigger than the buffer size: (msg_size*2 + 1) + bytes_in_send_buffer(0) > 2*sendBufferSize + message.resize(msg_size * 2 + 1); data = message.data(); size = message.size(); bytes_sent = sender_channel_resource->send(nullptr, 0, data, size, ec); diff --git a/test/unittest/transport/TCPv6Tests.cpp b/test/unittest/transport/TCPv6Tests.cpp index 3b82193d370..c88a79ff063 100644 --- a/test/unittest/transport/TCPv6Tests.cpp +++ b/test/unittest/transport/TCPv6Tests.cpp @@ -368,7 +368,7 @@ TEST_F(TCPv6Tests, non_blocking_send) // Prepare the message asio::error_code ec; - std::vector message(msg_size*2, 0); + std::vector message(msg_size * 2, 0); const octet* data = message.data(); size_t size = message.size(); @@ -382,8 +382,8 @@ TEST_F(TCPv6Tests, non_blocking_send) size_t bytes_read = asio::read(socket, asio::buffer(buffer, size), asio::transfer_exactly(size), ec); ASSERT_EQ(bytes_read, size); - // Now try to send a message that is bigger than the buffer size: (msg_size*2 + 1) + bytes_in_send_buffer(0) > 2*sendBufferSize - message.resize(msg_size*2 + 1); + // Now try to send a message that is bigger than the buffer size: (msg_size*2 + 1) + bytes_in_send_buffer(0) > 2*sendBufferSize + message.resize(msg_size * 2 + 1); data = message.data(); size = message.size(); bytes_sent = sender_channel_resource->send(nullptr, 0, data, size, ec); From 854dd37b4d3b93947953ff1eb17443bcf9d0b60f Mon Sep 17 00:00:00 2001 From: Jesus Perez Date: Tue, 26 Mar 2024 08:11:02 +0100 Subject: [PATCH 5/5] Refs #20589: Fix unused variable Signed-off-by: Jesus Perez --- test/unittest/transport/TCPv4Tests.cpp | 1 - 1 file changed, 1 deletion(-) diff --git a/test/unittest/transport/TCPv4Tests.cpp b/test/unittest/transport/TCPv4Tests.cpp index 316a9716176..7c6ad31efb0 100644 --- a/test/unittest/transport/TCPv4Tests.cpp +++ b/test/unittest/transport/TCPv4Tests.cpp @@ -1337,7 +1337,6 @@ TEST_F(TCPv4Tests, secure_non_blocking_send) // Create a TCP Server transport using TLSOptions = TCPTransportDescriptor::TLSConfig::TLSOptions; using TLSVerifyMode = TCPTransportDescriptor::TLSConfig::TLSVerifyMode; - using TLSHSRole = TCPTransportDescriptor::TLSConfig::TLSHandShakeRole; TCPv4TransportDescriptor senderDescriptor; senderDescriptor.add_listener_port(port); senderDescriptor.apply_security = true;