Skip to content

Commit 16f7593

Browse files
committed
test: add a test for multithreaded client usage
The previous commits adapted Tntcxx for multithreaded execution - this adds a test for the scenario. Part of #110
1 parent 0935f6d commit 16f7593

File tree

2 files changed

+213
-0
lines changed

2 files changed

+213
-0
lines changed

CMakeLists.txt

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,8 @@ IF (TNTCXX_BUILD_TESTING)
8686
# Retrieve the source directory to later get the header path.
8787
FETCHCONTENT_GETPROPERTIES(msgpuck)
8888
FETCHCONTENT_MAKEAVAILABLE(msgpuck)
89+
90+
find_package(Threads REQUIRED)
8991
ENDIF()
9092

9193
OPTION(TNTCXX_ENABLE_SANITIZERS
@@ -217,6 +219,11 @@ TNTCXX_TEST(NAME Client.test TYPE ctest
217219
LIBRARIES ${COMMON_LIB}
218220
)
219221

222+
TNTCXX_TEST(NAME ClientMultithread.test TYPE ctest
223+
SOURCES src/Client/Connector.hpp test/ClientMultithreadTest.cpp
224+
LIBRARIES ${COMMON_LIB} Threads::Threads
225+
)
226+
220227
IF (TNTCXX_ENABLE_SSL)
221228
TNTCXX_TEST(NAME ClientSSL.test TYPE ctest
222229
SOURCES src/Client/Connector.hpp test/ClientTest.cpp

test/ClientMultithreadTest.cpp

Lines changed: 206 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,206 @@
1+
/*
2+
* Copyright 2010-2020, Tarantool AUTHORS, please see AUTHORS file.
3+
*
4+
* Redistribution and use in source and binary forms, with or
5+
* without modification, are permitted provided that the following
6+
* conditions are met:
7+
*
8+
* 1. Redistributions of source code must retain the above
9+
* copyright notice, this list of conditions and the
10+
* following disclaimer.
11+
*
12+
* 2. Redistributions in binary form must reproduce the above
13+
* copyright notice, this list of conditions and the following
14+
* disclaimer in the documentation and/or other materials
15+
* provided with the distribution.
16+
*
17+
* THIS SOFTWARE IS PROVIDED BY <COPYRIGHT HOLDER> ``AS IS'' AND
18+
* ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
19+
* TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
20+
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
21+
* <COPYRIGHT HOLDER> OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
22+
* INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
23+
* DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
24+
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
25+
* BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
26+
* LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
27+
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
28+
* THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
29+
* SUCH DAMAGE.
30+
*/
31+
#include "Utils/Helpers.hpp"
32+
#include "Utils/System.hpp"
33+
#include "Utils/UserTuple.hpp"
34+
35+
#include "../src/Client/Connector.hpp"
36+
#include "../src/Client/LibevNetProvider.hpp"
37+
38+
#include <cmath>
39+
#include <thread>
40+
#include <tuple>
41+
42+
const char *localhost = "127.0.0.1";
43+
int port = 3301;
44+
int dummy_server_port = 3302;
45+
const char *unixsocket = "./tnt.sock";
46+
int WAIT_TIMEOUT = 1000; // milliseconds
47+
48+
using Buf_t = tnt::Buffer<16 * 1024>;
49+
50+
#ifdef TNTCXX_ENABLE_SSL
51+
constexpr bool enable_ssl = true;
52+
constexpr StreamTransport transport = STREAM_SSL;
53+
#else
54+
constexpr bool enable_ssl = false;
55+
constexpr StreamTransport transport = STREAM_PLAIN;
56+
#endif
57+
58+
#ifdef __linux__
59+
using NetProvider = EpollNetProvider<Buf_t, DefaultStream>;
60+
#else
61+
using NetProvider = LibevNetProvider<Buf_t, DefaultStream>;
62+
#endif
63+
64+
template <class Connector, class Connection>
65+
static int
66+
test_connect(Connector &client, Connection &conn, const std::string &addr, unsigned port, const std::string user = {},
67+
const std::string passwd = {})
68+
{
69+
std::string service = port == 0 ? std::string {} : std::to_string(port);
70+
return client.connect(conn,
71+
{
72+
.address = addr,
73+
.service = service,
74+
.transport = transport,
75+
.user = user,
76+
.passwd = passwd,
77+
});
78+
}
79+
80+
class PingRequestProcessor {
81+
public:
82+
static rid_t sendRequest(Connection<Buf_t, NetProvider> &conn, size_t thread_id, size_t iter)
83+
{
84+
(void)thread_id;
85+
(void)iter;
86+
rid_t f = conn.ping();
87+
fail_unless(!conn.futureIsReady(f));
88+
return f;
89+
}
90+
91+
static void processResponse(std::optional<Response<Buf_t>> &response, size_t thread_id, size_t iter)
92+
{
93+
(void)thread_id;
94+
(void)iter;
95+
fail_unless(response != std::nullopt);
96+
fail_unless(response->header.code == 0);
97+
}
98+
};
99+
100+
class ReplaceRequestProcessor {
101+
public:
102+
static rid_t sendRequest(Connection<Buf_t, NetProvider> &conn, size_t thread_id, size_t iter)
103+
{
104+
const size_t space_id = 512;
105+
std::tuple data = std::make_tuple(iter, "a", double(iter * thread_id));
106+
rid_t f = conn.space[space_id].replace(data);
107+
fail_unless(!conn.futureIsReady(f));
108+
return f;
109+
}
110+
111+
static void processResponse(std::optional<Response<Buf_t>> &response, size_t thread_id, size_t iter)
112+
{
113+
fail_unless(response != std::nullopt);
114+
fail_unless(response->header.code == 0);
115+
116+
fail_unless(response != std::nullopt);
117+
fail_unless(response->body.data != std::nullopt);
118+
fail_unless(response->body.error_stack == std::nullopt);
119+
120+
std::vector<std::tuple<size_t, std::string, double>> response_data;
121+
fail_unless(response->body.data->decode(response_data));
122+
fail_unless(response_data.size() == 1);
123+
fail_unless(std::get<0>(response_data[0]) == iter);
124+
fail_unless(std::get<1>(response_data[0]) == std::string("a"));
125+
fail_unless(std::fabs(std::get<2>(response_data[0]) - iter * thread_id)
126+
<= std::numeric_limits<double>::epsilon());
127+
}
128+
};
129+
130+
template <typename RequestProcessor, size_t ConnPerThread = 1>
131+
static void
132+
multithread_test(void)
133+
{
134+
TEST_INIT(0);
135+
static constexpr int ITER_NUM = 1000;
136+
static constexpr int THREAD_NUM = 24;
137+
std::vector<std::thread> threads;
138+
threads.reserve(THREAD_NUM);
139+
for (int t = 0; t < THREAD_NUM; t++) {
140+
threads.emplace_back([]() {
141+
Connector<Buf_t, NetProvider> client;
142+
std::vector<Connection<Buf_t, NetProvider>> conns;
143+
for (size_t i = 0; i < ConnPerThread; i++)
144+
conns.emplace_back(client);
145+
for (auto &conn : conns) {
146+
int rc = test_connect(client, conn, localhost, port);
147+
fail_unless(rc == 0);
148+
}
149+
150+
for (int iter = 0; iter < ITER_NUM; iter++) {
151+
std::array<rid_t, ConnPerThread> fs;
152+
153+
for (size_t t = 0; t < ConnPerThread; t++)
154+
fs[t] = RequestProcessor::sendRequest(conns[t], t, iter);
155+
156+
for (size_t t = 0; t < ConnPerThread; t++) {
157+
client.wait(conns[t], fs[t], WAIT_TIMEOUT);
158+
fail_unless(conns[t].futureIsReady(fs[t]));
159+
std::optional<Response<Buf_t>> response = conns[t].getResponse(fs[t]);
160+
RequestProcessor::processResponse(response, t, iter);
161+
}
162+
}
163+
});
164+
}
165+
for (auto &thread : threads)
166+
thread.join();
167+
}
168+
169+
int
170+
main()
171+
{
172+
/*
173+
* Send STDOUT to /dev/null - otherwise, there will be a ton of logs
174+
* and it will be impossible to inspect them on failure.
175+
*/
176+
if (freopen("/dev/null", "w", stdout) == NULL) {
177+
std::cerr << "Cannot send STDOUT to /dev/null" << std::endl;
178+
abort();
179+
}
180+
#ifdef TNTCXX_ENABLE_SSL
181+
#ifndef __FreeBSD__
182+
// There's no way to disable SIGPIPE for SSL on non-FreeBSD platforms,
183+
// so it is needed to disable signal handling.
184+
signal(SIGPIPE, SIG_IGN);
185+
#endif
186+
#endif
187+
188+
if (cleanDir() != 0)
189+
return -1;
190+
191+
#ifdef TNTCXX_ENABLE_SSL
192+
if (genSSLCert() != 0)
193+
return -1;
194+
#endif
195+
196+
if (launchTarantool(enable_ssl) != 0)
197+
return -1;
198+
199+
sleep(1);
200+
201+
multithread_test<PingRequestProcessor>();
202+
multithread_test<PingRequestProcessor, 5>();
203+
multithread_test<ReplaceRequestProcessor>();
204+
multithread_test<ReplaceRequestProcessor, 5>();
205+
return 0;
206+
}

0 commit comments

Comments
 (0)