Skip to content

Commit 73f8f5e

Browse files
committed
test: add a test for multithreaded client usage
The previous commits adapted Tntcxx for multithreaded execution - this adds a test for the scenario. Part of #110
1 parent 86d60ca commit 73f8f5e

File tree

2 files changed

+194
-0
lines changed

2 files changed

+194
-0
lines changed

CMakeLists.txt

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,8 @@ IF (TNTCXX_BUILD_TESTING)
8686
# Retrieve the source directory to later get the header path.
8787
FETCHCONTENT_GETPROPERTIES(msgpuck)
8888
FETCHCONTENT_MAKEAVAILABLE(msgpuck)
89+
90+
find_package(Threads REQUIRED)
8991
ENDIF()
9092

9193
OPTION(TNTCXX_ENABLE_SANITIZERS
@@ -209,6 +211,11 @@ TNTCXX_TEST(NAME Client.test TYPE ctest
209211
LIBRARIES ${COMMON_LIB}
210212
)
211213

214+
TNTCXX_TEST(NAME ClientMultithread.test TYPE ctest
215+
SOURCES src/Client/Connector.hpp test/ClientMultithreadTest.cpp
216+
LIBRARIES ${COMMON_LIB} Threads::Threads
217+
)
218+
212219
IF (TNTCXX_ENABLE_SSL)
213220
TNTCXX_TEST(NAME ClientSSL.test TYPE ctest
214221
SOURCES src/Client/Connector.hpp test/ClientTest.cpp

test/ClientMultithreadTest.cpp

Lines changed: 187 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,187 @@
1+
/*
2+
* SPDX-License-Identifier: BSD-2-Clause
3+
*
4+
* Copyright 2010-2025, Tarantool AUTHORS, please see AUTHORS file.
5+
*/
6+
#include "Utils/Helpers.hpp"
7+
#include "Utils/System.hpp"
8+
#include "Utils/UserTuple.hpp"
9+
10+
#include "Client/Connector.hpp"
11+
#include "Client/LibevNetProvider.hpp"
12+
13+
#include <cmath>
14+
#include <thread>
15+
#include <tuple>
16+
17+
const char *localhost = "127.0.0.1";
18+
int port = 3301;
19+
int dummy_server_port = 3302;
20+
const char *unixsocket = "./tnt.sock";
21+
int WAIT_TIMEOUT = 1000; // milliseconds
22+
23+
using Buf_t = tnt::Buffer<16 * 1024>;
24+
25+
#ifdef TNTCXX_ENABLE_SSL
26+
constexpr bool enable_ssl = true;
27+
constexpr StreamTransport transport = STREAM_SSL;
28+
#else
29+
constexpr bool enable_ssl = false;
30+
constexpr StreamTransport transport = STREAM_PLAIN;
31+
#endif
32+
33+
#ifdef __linux__
34+
using NetProvider = EpollNetProvider<Buf_t, DefaultStream>;
35+
#else
36+
using NetProvider = LibevNetProvider<Buf_t, DefaultStream>;
37+
#endif
38+
39+
template <class Connector, class Connection>
40+
static int
41+
test_connect(Connector &client, Connection &conn, const std::string &addr, unsigned port, const std::string user = {},
42+
const std::string passwd = {})
43+
{
44+
std::string service = port == 0 ? std::string {} : std::to_string(port);
45+
return client.connect(conn,
46+
{
47+
.address = addr,
48+
.service = service,
49+
.transport = transport,
50+
.user = user,
51+
.passwd = passwd,
52+
});
53+
}
54+
55+
class PingRequestProcessor {
56+
public:
57+
static rid_t sendRequest(Connection<Buf_t, NetProvider> &conn, size_t thread_id, size_t iter)
58+
{
59+
(void)thread_id;
60+
(void)iter;
61+
rid_t f = conn.ping();
62+
fail_unless(!conn.futureIsReady(f));
63+
return f;
64+
}
65+
66+
static void processResponse(std::optional<Response<Buf_t>> &response, size_t thread_id, size_t iter)
67+
{
68+
(void)thread_id;
69+
(void)iter;
70+
fail_unless(response != std::nullopt);
71+
fail_unless(response->header.code == 0);
72+
}
73+
};
74+
75+
class ReplaceRequestProcessor {
76+
public:
77+
static rid_t sendRequest(Connection<Buf_t, NetProvider> &conn, size_t thread_id, size_t iter)
78+
{
79+
const size_t space_id = 512;
80+
std::tuple data = std::make_tuple(iter, "a", double(iter * thread_id));
81+
rid_t f = conn.space[space_id].replace(data);
82+
fail_unless(!conn.futureIsReady(f));
83+
return f;
84+
}
85+
86+
static void processResponse(std::optional<Response<Buf_t>> &response, size_t thread_id, size_t iter)
87+
{
88+
fail_unless(response != std::nullopt);
89+
fail_unless(response->header.code == 0);
90+
91+
fail_unless(response != std::nullopt);
92+
fail_unless(response->body.data != std::nullopt);
93+
fail_unless(response->body.error_stack == std::nullopt);
94+
95+
std::vector<std::tuple<size_t, std::string, double>> response_data;
96+
fail_unless(response->body.data->decode(response_data));
97+
fail_unless(response_data.size() == 1);
98+
fail_unless(std::get<0>(response_data[0]) == iter);
99+
fail_unless(std::get<1>(response_data[0]) == std::string("a"));
100+
fail_unless(std::fabs(std::get<2>(response_data[0]) - iter * thread_id)
101+
<= std::numeric_limits<double>::epsilon());
102+
}
103+
};
104+
105+
template <typename RequestProcessor, size_t ConnPerThread = 1>
106+
static void
107+
multithread_test(void)
108+
{
109+
TEST_INIT(0);
110+
static constexpr int ITER_NUM = 1000;
111+
static constexpr int THREAD_NUM = 24;
112+
std::vector<std::thread> threads;
113+
threads.reserve(THREAD_NUM);
114+
for (int t = 0; t < THREAD_NUM; t++) {
115+
threads.emplace_back([]() {
116+
Connector<Buf_t, NetProvider> client;
117+
std::vector<Connection<Buf_t, NetProvider>> conns;
118+
for (size_t i = 0; i < ConnPerThread; i++)
119+
conns.emplace_back(client);
120+
for (auto &conn : conns) {
121+
int rc = test_connect(client, conn, localhost, port);
122+
fail_unless(rc == 0);
123+
}
124+
125+
for (int iter = 0; iter < ITER_NUM; iter++) {
126+
std::array<rid_t, ConnPerThread> fs;
127+
128+
for (size_t t = 0; t < ConnPerThread; t++)
129+
fs[t] = RequestProcessor::sendRequest(conns[t], t, iter);
130+
131+
for (size_t t = 0; t < ConnPerThread; t++) {
132+
client.wait(conns[t], fs[t], WAIT_TIMEOUT);
133+
fail_unless(conns[t].futureIsReady(fs[t]));
134+
std::optional<Response<Buf_t>> response = conns[t].getResponse(fs[t]);
135+
RequestProcessor::processResponse(response, t, iter);
136+
}
137+
}
138+
});
139+
}
140+
for (auto &thread : threads)
141+
thread.join();
142+
}
143+
144+
int
145+
main()
146+
{
147+
/*
148+
* Send STDOUT to /dev/null - otherwise, there will be a ton of debug
149+
* logs and it will be impossible to inspect them on failure.
150+
*
151+
* Note that one can disable debug logs by setting logging level of
152+
* `Logger`. But that's not the case here because we want to cover
153+
* the logger with multi-threading test as well to make sure that
154+
* it is thread-safe, so we shouldn't disable them, but we don't want
155+
* to read them either.
156+
*/
157+
if (freopen("/dev/null", "w", stdout) == NULL) {
158+
std::cerr << "Cannot send STDOUT to /dev/null" << std::endl;
159+
abort();
160+
}
161+
#ifdef TNTCXX_ENABLE_SSL
162+
#ifndef __FreeBSD__
163+
// There's no way to disable SIGPIPE for SSL on non-FreeBSD platforms,
164+
// so it is needed to disable signal handling.
165+
signal(SIGPIPE, SIG_IGN);
166+
#endif
167+
#endif
168+
169+
if (cleanDir() != 0)
170+
return -1;
171+
172+
#ifdef TNTCXX_ENABLE_SSL
173+
if (genSSLCert() != 0)
174+
return -1;
175+
#endif
176+
177+
if (launchTarantool(enable_ssl) != 0)
178+
return -1;
179+
180+
sleep(1);
181+
182+
multithread_test<PingRequestProcessor>();
183+
multithread_test<PingRequestProcessor, 5>();
184+
multithread_test<ReplaceRequestProcessor>();
185+
multithread_test<ReplaceRequestProcessor, 5>();
186+
return 0;
187+
}

0 commit comments

Comments
 (0)