Skip to content

Commit 7fc1912

Browse files
committed
client: redesign waiting methods
Currently, call of any waiting method (`wait`, `waitAll` and so on) with zero timeout blocks execution until all required responses are received. That's not great because in some scenarios users want to check if there are any responses ready without blocking execution. Of course, one can use `wait(1)` which sleeps for only a millisecond, but anyway it's often unexpected that `wait(0)` sleeps forever. Let's stick to `epoll_wait` interface - `wait(0)` is non-blocking and `wait(-1)` blocks forever. Note that before the commit, passing `-1` would lead to an assertion failure. Note that we used to block forever in waiting methods by default. To maintain this behavior, the commit updates default value of timeout since the old default (zero) means non-blocking polling of connections now. Also, the commit makes `wait(0)` non-blocking, but it doesn't poll connections. We will implement that part in a separate commit for the sake of clarity of the patch. Part of #111
1 parent 0851b2f commit 7fc1912

File tree

7 files changed

+76
-9
lines changed

7 files changed

+76
-9
lines changed

src/Client/Connector.hpp

+4-4
Original file line numberDiff line numberDiff line change
@@ -75,12 +75,12 @@ class Connector
7575
const std::string& addr, unsigned port);
7676

7777
int wait(Connection<BUFFER, NetProvider> &conn, rid_t future,
78-
int timeout = 0, Response<BUFFER> *result = nullptr);
78+
int timeout = -1, Response<BUFFER> *result = nullptr);
7979
int waitAll(Connection<BUFFER, NetProvider> &conn,
80-
const std::vector<rid_t > &futures, int timeout = 0);
80+
const std::vector<rid_t > &futures, int timeout = -1);
8181
int waitCount(Connection<BUFFER, NetProvider> &conn,
82-
size_t feature_count, int timeout = 0);
83-
std::optional<Connection<BUFFER, NetProvider>> waitAny(int timeout = 0);
82+
size_t feature_count, int timeout = -1);
83+
std::optional<Connection<BUFFER, NetProvider>> waitAny(int timeout = -1);
8484
////////////////////////////Service interfaces//////////////////////////
8585
void readyToDecode(const Connection<BUFFER, NetProvider> &conn);
8686
void readyToSend(const Connection<BUFFER, NetProvider> &conn);

src/Client/EpollNetProvider.hpp

+2-2
Original file line numberDiff line numberDiff line change
@@ -258,8 +258,8 @@ template<class BUFFER, class Stream>
258258
int
259259
EpollNetProvider<BUFFER, Stream>::wait(int timeout)
260260
{
261-
assert(timeout >= 0);
262-
if (timeout == 0)
261+
assert(timeout >= -1);
262+
if (timeout == -1)
263263
timeout = TIMEOUT_INFINITY;
264264
LOG_DEBUG("Network engine wait for ", timeout, " milliseconds");
265265
/* Send pending requests. */

src/Client/LibevNetProvider.hpp

+1-1
Original file line numberDiff line numberDiff line change
@@ -363,7 +363,7 @@ template<class BUFFER, class Stream>
363363
int
364364
LibevNetProvider<BUFFER, Stream>::wait(int timeout)
365365
{
366-
assert(timeout >= 0);
366+
assert(timeout >= -1);
367367
if (timeout > 0) {
368368
ev_timer_init(&m_TimeoutWatcher, &timeout_cb, timeout / MILLISECONDS, 0 /* repeat */);
369369
ev_timer_start(m_Loop, &m_TimeoutWatcher);

src/Utils/Timer.hpp

+2-2
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ class Timer {
4040
}
4141
bool isExpired() const
4242
{
43-
if (m_Timeout == std::chrono::milliseconds{0})
43+
if (m_Timeout == std::chrono::milliseconds{-1})
4444
return false;
4545
std::chrono::time_point<std::chrono::steady_clock> end =
4646
std::chrono::steady_clock::now();
@@ -50,7 +50,7 @@ class Timer {
5050
}
5151
int elapsed() const
5252
{
53-
if (m_Timeout == std::chrono::milliseconds{0})
53+
if (m_Timeout == std::chrono::milliseconds{-1})
5454
return 0;
5555
std::chrono::time_point<std::chrono::steady_clock> end =
5656
std::chrono::steady_clock::now();

test/ClientTest.cpp

+55
Original file line numberDiff line numberDiff line change
@@ -1144,6 +1144,60 @@ response_decoding(Connector<BUFFER, NetProvider> &client)
11441144
client.close(conn);
11451145
}
11461146

1147+
/** Checks all available `wait` methods of connector. */
1148+
template <class BUFFER, class NetProvider>
1149+
void
1150+
test_wait(Connector<BUFFER, NetProvider> &client)
1151+
{
1152+
TEST_INIT(0);
1153+
static constexpr double SLEEP_TIME = 0.1;
1154+
1155+
Connection<Buf_t, NetProvider> conn(client);
1156+
int rc = test_connect(client, conn, localhost, port);
1157+
fail_unless(rc == 0);
1158+
1159+
TEST_CASE("wait(0) and wait(-1)");
1160+
rid_t f = conn.call("remote_sleep", std::forward_as_tuple(SLEEP_TIME));
1161+
fail_unless(!conn.futureIsReady(f));
1162+
client.wait(conn, f, 0);
1163+
fail_unless(!conn.futureIsReady(f));
1164+
client.wait(conn, f, -1);
1165+
fail_unless(conn.futureIsReady(f));
1166+
std::optional<Response<Buf_t>> response = conn.getResponse(f);
1167+
fail_unless(response.has_value());
1168+
1169+
TEST_CASE("waitAny(0) and waitAny(-1)");
1170+
f = conn.call("remote_sleep", std::forward_as_tuple(SLEEP_TIME));
1171+
fail_unless(!client.waitAny(0).has_value());
1172+
fail_unless(client.waitAny(-1).has_value());
1173+
response = conn.getResponse(f);
1174+
fail_unless(response.has_value());
1175+
1176+
TEST_CASE("waitAll(0) and waitAll(-1)");
1177+
std::vector<rid_t> fs;
1178+
fs.push_back(conn.call("remote_sleep", std::forward_as_tuple(SLEEP_TIME)));
1179+
fs.push_back(conn.call("remote_sleep", std::forward_as_tuple(SLEEP_TIME)));
1180+
fail_unless(client.waitAll(conn, fs, 0) == -1);
1181+
fail_unless(client.waitAll(conn, fs, -1) == 0);
1182+
response = conn.getResponse(fs[0]);
1183+
fail_unless(response.has_value());
1184+
response = conn.getResponse(fs[1]);
1185+
fail_unless(response.has_value());
1186+
1187+
TEST_CASE("waitCount(0) and waitCount(-1)");
1188+
fs.clear();
1189+
fs.push_back(conn.call("remote_sleep", std::forward_as_tuple(SLEEP_TIME)));
1190+
fs.push_back(conn.call("remote_sleep", std::forward_as_tuple(SLEEP_TIME)));
1191+
fail_unless(client.waitCount(conn, 2, 0) == -1);
1192+
fail_unless(client.waitCount(conn, 2, -1) == 0);
1193+
response = conn.getResponse(fs[0]);
1194+
fail_unless(response.has_value());
1195+
response = conn.getResponse(fs[1]);
1196+
fail_unless(response.has_value());
1197+
1198+
client.close(conn);
1199+
}
1200+
11471201
int main()
11481202
{
11491203
#ifdef TNTCXX_ENABLE_SSL
@@ -1193,5 +1247,6 @@ int main()
11931247
#endif
11941248
::test_dead_connection_wait();
11951249
response_decoding(client);
1250+
test_wait(client);
11961251
return 0;
11971252
}

test/cfg.lua

+6
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,12 @@ function remote_echo(...)
3434
return {...}
3535
end
3636

37+
function remote_sleep(timeout)
38+
local fiber = require('fiber')
39+
fiber.sleep(timeout)
40+
return nil
41+
end
42+
3743
function get_rps()
3844
return box.stat.net().REQUESTS.rps
3945
end

test/cfg_ssl.lua

+6
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,12 @@ function remote_echo(...)
3232
return {...}
3333
end
3434

35+
function remote_sleep(timeout)
36+
local fiber = require('fiber')
37+
fiber.sleep(timeout)
38+
return nil
39+
end
40+
3541
function get_rps()
3642
return box.stat.net().REQUESTS.rps
3743
end

0 commit comments

Comments
 (0)