Skip to content

Commit 764b1c3

Browse files
mergify[bot]jepemi
andauthored
Set real TCP non_blocking_send limitation (#4502) (#4631)
* Refs #20589: Set real non_blocking_send limitation * Refs #20589: Readapt test * Refs #20589: Fix failing test in macos/linux * Refs #20589: Uncrustify * Refs #20589: Fix unused variable --------- Signed-off-by: Jesus Perez <[email protected]> Co-authored-by: Jesús Pérez <[email protected]>
1 parent eb52759 commit 764b1c3

File tree

3 files changed

+87
-39
lines changed

3 files changed

+87
-39
lines changed

src/cpp/rtps/transport/TCPChannelResource.cpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -310,7 +310,8 @@ bool TCPChannelResource::check_socket_send_buffer(
310310

311311

312312
size_t future_queue_size = size_t(bytesInSendQueue) + msg_size;
313-
if (future_queue_size > size_t(parent_->configuration()->sendBufferSize))
313+
// TCP actually allocates twice the size of the buffer requested.
314+
if (future_queue_size > size_t(2 * parent_->configuration()->sendBufferSize))
314315
{
315316
return false;
316317
}

test/unittest/transport/TCPv4Tests.cpp

Lines changed: 65 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -1222,17 +1222,21 @@ TEST_F(TCPv4Tests, send_and_receive_between_secure_ports_untrusted_server)
12221222
// destination that does not read or does it so slowly.
12231223
TEST_F(TCPv4Tests, secure_non_blocking_send)
12241224
{
1225+
eprosima::fastdds::dds::Log::SetVerbosity(eprosima::fastdds::dds::Log::Kind::Info);
1226+
12251227
uint16_t port = g_default_port;
12261228
uint32_t msg_size = eprosima::fastdds::rtps::s_minimumSocketBuffer;
12271229
// Create a TCP Server transport
12281230
using TLSOptions = TCPTransportDescriptor::TLSConfig::TLSOptions;
12291231
using TLSVerifyMode = TCPTransportDescriptor::TLSConfig::TLSVerifyMode;
1230-
using TLSHSRole = TCPTransportDescriptor::TLSConfig::TLSHandShakeRole;
12311232
TCPv4TransportDescriptor senderDescriptor;
12321233
senderDescriptor.add_listener_port(port);
1234+
senderDescriptor.apply_security = true;
12331235
senderDescriptor.sendBufferSize = msg_size;
1234-
senderDescriptor.tls_config.handshake_role = TLSHSRole::CLIENT;
1235-
senderDescriptor.tls_config.verify_file = "ca.crt";
1236+
senderDescriptor.tls_config.password = "fastddspwd";
1237+
senderDescriptor.tls_config.cert_chain_file = "fastdds.crt";
1238+
senderDescriptor.tls_config.private_key_file = "fastdds.key";
1239+
senderDescriptor.tls_config.tmp_dh_file = "dh_params.pem";
12361240
senderDescriptor.tls_config.verify_mode = TLSVerifyMode::VERIFY_PEER;
12371241
senderDescriptor.tls_config.add_option(TLSOptions::DEFAULT_WORKAROUNDS);
12381242
senderDescriptor.tls_config.add_option(TLSOptions::SINGLE_DH_USE);
@@ -1249,8 +1253,8 @@ TEST_F(TCPv4Tests, secure_non_blocking_send)
12491253
// saturate the reception socket of the datareader. This saturation requires
12501254
// preventing the datareader from reading from the socket, what inevitably
12511255
// happens continuously if instantiating and connecting the receiver transport.
1252-
// Hence, a raw socket is opened and connected to the server. There won't be read
1253-
// calls on that socket.
1256+
// Hence, a raw socket is opened and connected to the server. Read calls on that
1257+
// socket are controlled.
12541258
Locator_t serverLoc;
12551259
serverLoc.kind = LOCATOR_KIND_TCPv4;
12561260
IPLocator::setIPv4(serverLoc, 127, 0, 0, 1);
@@ -1263,13 +1267,7 @@ TEST_F(TCPv4Tests, secure_non_blocking_send)
12631267
{
12641268
return preverified;
12651269
});
1266-
ssl_context.set_password_callback([](std::size_t, asio::ssl::context_base::password_purpose)
1267-
{
1268-
return "fastddspwd";
1269-
});
1270-
ssl_context.use_certificate_chain_file("fastdds.crt");
1271-
ssl_context.use_private_key_file("fastdds.key", asio::ssl::context::pem);
1272-
ssl_context.use_tmp_dh_file("dh_params.pem");
1270+
ssl_context.load_verify_file("ca.crt");
12731271

12741272
uint32_t options = 0;
12751273
options |= asio::ssl::context::default_workarounds;
@@ -1278,8 +1276,19 @@ TEST_F(TCPv4Tests, secure_non_blocking_send)
12781276
options |= asio::ssl::context::no_compression;
12791277
ssl_context.set_options(options);
12801278

1281-
// TCPChannelResourceSecure::connect() like connection
12821279
asio::io_service io_service;
1280+
auto ioServiceFunction = [&]()
1281+
{
1282+
#if ASIO_VERSION >= 101200
1283+
asio::executor_work_guard<asio::io_service::executor_type> work(io_service.get_executor());
1284+
#else
1285+
io_service::work work(io_service_);
1286+
#endif // if ASIO_VERSION >= 101200
1287+
io_service.run();
1288+
};
1289+
std::thread ioServiceThread(ioServiceFunction);
1290+
1291+
// TCPChannelResourceSecure::connect() like connection
12831292
asio::ip::tcp::resolver resolver(io_service);
12841293
auto endpoints = resolver.resolve(
12851294
IPLocator::ip_to_string(serverLoc),
@@ -1300,7 +1309,7 @@ TEST_F(TCPv4Tests, secure_non_blocking_send)
13001309
)
13011310
{
13021311
ASSERT_TRUE(!ec);
1303-
asio::ssl::stream_base::handshake_type role = asio::ssl::stream_base::server;
1312+
asio::ssl::stream_base::handshake_type role = asio::ssl::stream_base::client;
13041313
secure_socket->async_handshake(role,
13051314
[](const std::error_code& ec)
13061315
{
@@ -1316,24 +1325,40 @@ TEST_F(TCPv4Tests, secure_non_blocking_send)
13161325
a connection. This channel will not be present in the server's channel_resources_ map
13171326
as communication lacks most of the discovery messages using a raw socket as participant.
13181327
*/
1328+
// auto sender_unbound_channel_resources = senderTransportUnderTest.get_unbound_channel_resources();
13191329
auto sender_unbound_channel_resources = senderTransportUnderTest.get_unbound_channel_resources();
13201330
ASSERT_TRUE(sender_unbound_channel_resources.size() == 1);
13211331
auto sender_channel_resource =
13221332
std::static_pointer_cast<TCPChannelResourceBasic>(sender_unbound_channel_resources[0]);
13231333

13241334
// Prepare the message
13251335
asio::error_code ec;
1326-
std::vector<octet> message(msg_size, 0);
1336+
std::vector<octet> message(msg_size * 2, 0);
13271337
const octet* data = message.data();
13281338
size_t size = message.size();
13291339

1330-
// Send the message with no header
1331-
for (int i = 0; i < 5; i++)
1332-
{
1333-
sender_channel_resource->send(nullptr, 0, data, size, ec);
1334-
}
1340+
// Send the message with no header. Since TCP actually allocates twice the size of the buffer requested
1341+
// it should be able to send a message of msg_size*2.
1342+
size_t bytes_sent = sender_channel_resource->send(nullptr, 0, data, size, ec);
1343+
ASSERT_EQ(bytes_sent, size);
1344+
1345+
// Now wait until the receive buffer is flushed (send buffer will be empty too)
1346+
std::vector<octet> buffer(size, 0);
1347+
size_t bytes_read = 0;
1348+
bytes_read = asio::read(*secure_socket, asio::buffer(buffer.data(), size), asio::transfer_exactly(size), ec);
1349+
ASSERT_EQ(ec, asio::error_code());
1350+
ASSERT_EQ(bytes_read, size);
1351+
1352+
// Now try to send a message that is bigger than the buffer size: (msg_size*2 + 1) + bytes_in_send_buffer(0) > 2*sendBufferSize
1353+
message.resize(msg_size * 2 + 1);
1354+
data = message.data();
1355+
size = message.size();
1356+
bytes_sent = sender_channel_resource->send(nullptr, 0, data, size, ec);
1357+
ASSERT_EQ(bytes_sent, 0u);
13351358

13361359
secure_socket->lowest_layer().close(ec);
1360+
io_service.stop();
1361+
ioServiceThread.join();
13371362
}
13381363
#endif // ifndef _WIN32
13391364

@@ -1769,7 +1794,7 @@ TEST_F(TCPv4Tests, client_announced_local_port_uniqueness)
17691794
}
17701795

17711796
#ifndef _WIN32
1772-
// The primary purpose of this test is to check the non-blocking behavior of a secure socket sending data to a
1797+
// The primary purpose of this test is to check the non-blocking behavior of a socket sending data to a
17731798
// destination that does not read or does it so slowly.
17741799
TEST_F(TCPv4Tests, non_blocking_send)
17751800
{
@@ -1790,8 +1815,8 @@ TEST_F(TCPv4Tests, non_blocking_send)
17901815
// saturate the reception socket of the datareader. This saturation requires
17911816
// preventing the datareader from reading from the socket, what inevitably
17921817
// happens continuously if instantiating and connecting the receiver transport.
1793-
// Hence, a raw socket is opened and connected to the server. There won't be read
1794-
// calls on that socket.
1818+
// Hence, a raw socket is opened and connected to the server. Read calls on that
1819+
// socket are controlled.
17951820
Locator_t serverLoc;
17961821
serverLoc.kind = LOCATOR_KIND_TCPv4;
17971822
IPLocator::setIPv4(serverLoc, 127, 0, 0, 1);
@@ -1836,15 +1861,26 @@ TEST_F(TCPv4Tests, non_blocking_send)
18361861

18371862
// Prepare the message
18381863
asio::error_code ec;
1839-
std::vector<octet> message(msg_size, 0);
1864+
std::vector<octet> message(msg_size * 2, 0);
18401865
const octet* data = message.data();
18411866
size_t size = message.size();
18421867

1843-
// Send the message with no header
1844-
for (int i = 0; i < 5; i++)
1845-
{
1846-
sender_channel_resource->send(nullptr, 0, data, size, ec);
1847-
}
1868+
// Send the message with no header. Since TCP actually allocates twice the size of the buffer requested
1869+
// it should be able to send a message of msg_size*2.
1870+
size_t bytes_sent = sender_channel_resource->send(nullptr, 0, data, size, ec);
1871+
ASSERT_EQ(bytes_sent, size);
1872+
1873+
// Now wait until the receive buffer is flushed (send buffer will be empty too)
1874+
std::vector<octet> buffer(size, 0);
1875+
size_t bytes_read = asio::read(socket, asio::buffer(buffer, size), asio::transfer_exactly(size), ec);
1876+
ASSERT_EQ(bytes_read, size);
1877+
1878+
// Now try to send a message that is bigger than the buffer size: (msg_size*2 + 1) + bytes_in_send_buffer(0) > 2*sendBufferSize
1879+
message.resize(msg_size * 2 + 1);
1880+
data = message.data();
1881+
size = message.size();
1882+
bytes_sent = sender_channel_resource->send(nullptr, 0, data, size, ec);
1883+
ASSERT_EQ(bytes_sent, 0u);
18481884

18491885
socket.shutdown(asio::ip::tcp::socket::shutdown_both);
18501886
socket.cancel();

test/unittest/transport/TCPv6Tests.cpp

Lines changed: 20 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -243,7 +243,7 @@ TEST_F(TCPv6Tests, client_announced_local_port_uniqueness)
243243
}
244244

245245
#ifndef _WIN32
246-
// The primary purpose of this test is to check the non-blocking behavior of a secure socket sending data to a
246+
// The primary purpose of this test is to check the non-blocking behavior of a socket sending data to a
247247
// destination that does not read or does it so slowly.
248248
TEST_F(TCPv6Tests, non_blocking_send)
249249
{
@@ -264,8 +264,8 @@ TEST_F(TCPv6Tests, non_blocking_send)
264264
// saturate the reception socket of the datareader. This saturation requires
265265
// preventing the datareader from reading from the socket, what inevitably
266266
// happens continuously if instantiating and connecting the receiver transport.
267-
// Hence, a raw socket is opened and connected to the server. There won't be read
268-
// calls on that socket.
267+
// Hence, a raw socket is opened and connected to the server. Read calls on that
268+
// socket are controlled.
269269
Locator_t serverLoc;
270270
serverLoc.kind = LOCATOR_KIND_TCPv6;
271271
IPLocator::setIPv6(serverLoc, "::1");
@@ -310,15 +310,26 @@ TEST_F(TCPv6Tests, non_blocking_send)
310310

311311
// Prepare the message
312312
asio::error_code ec;
313-
std::vector<octet> message(msg_size, 0);
313+
std::vector<octet> message(msg_size * 2, 0);
314314
const octet* data = message.data();
315315
size_t size = message.size();
316316

317-
// Send the message with no header
318-
for (int i = 0; i < 5; i++)
319-
{
320-
sender_channel_resource->send(nullptr, 0, data, size, ec);
321-
}
317+
// Send the message with no header. Since TCP actually allocates twice the size of the buffer requested
318+
// it should be able to send a message of msg_size*2.
319+
size_t bytes_sent = sender_channel_resource->send(nullptr, 0, data, size, ec);
320+
ASSERT_EQ(bytes_sent, size);
321+
322+
// Now wait until the receive buffer is flushed (send buffer will be empty too)
323+
std::vector<octet> buffer(size, 0);
324+
size_t bytes_read = asio::read(socket, asio::buffer(buffer, size), asio::transfer_exactly(size), ec);
325+
ASSERT_EQ(bytes_read, size);
326+
327+
// Now try to send a message that is bigger than the buffer size: (msg_size*2 + 1) + bytes_in_send_buffer(0) > 2*sendBufferSize
328+
message.resize(msg_size * 2 + 1);
329+
data = message.data();
330+
size = message.size();
331+
bytes_sent = sender_channel_resource->send(nullptr, 0, data, size, ec);
332+
ASSERT_EQ(bytes_sent, 0u);
322333

323334
socket.shutdown(asio::ip::tcp::socket::shutdown_both);
324335
socket.cancel();

0 commit comments

Comments
 (0)