Skip to content

Commit dfe5389

Browse files
committed
client: poll connections on non-blocking wait
In the previous commits, we made `wait(0)` non-blocking. However, it must poll connections (otherwise, it is just pointless) - for this purpose, we should check if the timer has expired after the first call of `NetProvider.wait()`. Also, `LibevNetProvider` should run event loop with a special `EVRUN_NOWAIT` flag when the timeout is zero for non-blocking poll. Closes #111
1 parent 0342f3b commit dfe5389

File tree

3 files changed

+60
-6
lines changed

3 files changed

+60
-6
lines changed

src/Client/Connector.hpp

+13-5
Original file line numberDiff line numberDiff line change
@@ -185,8 +185,7 @@ Connector<BUFFER, NetProvider>::wait(Connection<BUFFER, NetProvider> &conn,
185185
timer.start();
186186
if (connectionDecodeResponses(conn, result) != 0)
187187
return -1;
188-
while (!conn.hasError() && !conn.futureIsReady(future) &&
189-
!timer.isExpired()) {
188+
while (!conn.hasError() && !conn.futureIsReady(future)) {
190189
if (m_NetProvider.wait(timer.timeLeft()) != 0) {
191190
conn.setError(std::string("Failed to poll: ") +
192191
strerror(errno), errno);
@@ -203,6 +202,8 @@ Connector<BUFFER, NetProvider>::wait(Connection<BUFFER, NetProvider> &conn,
203202
if (!hasDataToDecode(conn))
204203
m_ReadyToDecode.erase(conn);
205204
}
205+
if (timer.isExpired())
206+
break;
206207
}
207208
if (conn.hasError()) {
208209
LOG_ERROR("Connection got an error: ", conn.getError().msg);
@@ -226,7 +227,7 @@ Connector<BUFFER, NetProvider>::waitAll(Connection<BUFFER, NetProvider> &conn,
226227
Timer timer{timeout};
227228
timer.start();
228229
size_t last_not_ready = 0;
229-
while (!conn.hasError() && !timer.isExpired()) {
230+
while (!conn.hasError()) {
230231
if (m_NetProvider.wait(timer.timeLeft()) != 0) {
231232
conn.setError(std::string("Failed to poll: ") +
232233
strerror(errno), errno);
@@ -249,6 +250,8 @@ Connector<BUFFER, NetProvider>::waitAll(Connection<BUFFER, NetProvider> &conn,
249250
}
250251
if (finish)
251252
return 0;
253+
if (timer.isExpired())
254+
break;
252255
}
253256
if (conn.hasError()) {
254257
LOG_ERROR("Connection got an error: ", conn.getError().msg);
@@ -264,8 +267,11 @@ Connector<BUFFER, NetProvider>::waitAny(int timeout)
264267
{
265268
Timer timer{timeout};
266269
timer.start();
267-
while (m_ReadyToDecode.empty() && !timer.isExpired())
270+
while (m_ReadyToDecode.empty()) {
268271
m_NetProvider.wait(timer.timeLeft());
272+
if (timer.isExpired())
273+
break;
274+
}
269275
if (m_ReadyToDecode.empty()) {
270276
LOG_ERROR("wait() has been timed out! No responses are received");
271277
return std::nullopt;
@@ -287,7 +293,7 @@ Connector<BUFFER, NetProvider>::waitCount(Connection<BUFFER, NetProvider> &conn,
287293
Timer timer{timeout};
288294
timer.start();
289295
size_t ready_futures = conn.getFutureCount();
290-
while (!conn.hasError() && !timer.isExpired()) {
296+
while (!conn.hasError()) {
291297
if (m_NetProvider.wait(timer.timeLeft()) != 0) {
292298
conn.setError(std::string("Failed to poll: ") +
293299
strerror(errno), errno);
@@ -302,6 +308,8 @@ Connector<BUFFER, NetProvider>::waitCount(Connection<BUFFER, NetProvider> &conn,
302308
}
303309
if ((conn.getFutureCount() - ready_futures) >= future_count)
304310
return 0;
311+
if (timer.isExpired())
312+
break;
305313
}
306314
if (conn.hasError()) {
307315
LOG_ERROR("Connection got an error: ", conn.getError().msg);

src/Client/LibevNetProvider.hpp

+3-1
Original file line numberDiff line numberDiff line change
@@ -381,6 +381,8 @@ LibevNetProvider<BUFFER, Stream>::wait(int timeout)
381381

382382
}
383383
}
384-
ev_run(m_Loop, EVRUN_ONCE);
384+
/* Work in non-blocking mode when the timeout is zero. */
385+
int flags = timeout == 0 ? EVRUN_NOWAIT : EVRUN_ONCE;
386+
ev_run(m_Loop, flags);
385387
return 0;
386388
}

test/ClientTest.cpp

+44
Original file line numberDiff line numberDiff line change
@@ -1166,13 +1166,35 @@ test_wait(Connector<BUFFER, NetProvider> &client)
11661166
std::optional<Response<Buf_t>> response = conn.getResponse(f);
11671167
fail_unless(response.has_value());
11681168

1169+
TEST_CASE("wait(0) polls connections");
1170+
f = conn.call("remote_sleep", std::forward_as_tuple(SLEEP_TIME));
1171+
fail_unless(!conn.futureIsReady(f));
1172+
while (!conn.futureIsReady(f)) {
1173+
client.wait(conn, f, 0);
1174+
usleep(10 * 1000); /* 10ms */
1175+
}
1176+
fail_unless(conn.futureIsReady(f));
1177+
response = conn.getResponse(f);
1178+
fail_unless(response.has_value());
1179+
11691180
TEST_CASE("waitAny(0) and waitAny(-1)");
11701181
f = conn.call("remote_sleep", std::forward_as_tuple(SLEEP_TIME));
11711182
fail_unless(!client.waitAny(0).has_value());
11721183
fail_unless(client.waitAny(-1).has_value());
11731184
response = conn.getResponse(f);
11741185
fail_unless(response.has_value());
11751186

1187+
TEST_CASE("waitAny(0) polls connections");
1188+
f = conn.call("remote_sleep", std::forward_as_tuple(SLEEP_TIME));
1189+
fail_unless(!conn.futureIsReady(f));
1190+
while (!conn.futureIsReady(f)) {
1191+
client.waitAny(0);
1192+
usleep(10 * 1000); /* 10ms */
1193+
}
1194+
fail_unless(conn.futureIsReady(f));
1195+
response = conn.getResponse(f);
1196+
fail_unless(response.has_value());
1197+
11761198
TEST_CASE("waitAll(0) and waitAll(-1)");
11771199
std::vector<rid_t> fs;
11781200
fs.push_back(conn.call("remote_sleep", std::forward_as_tuple(SLEEP_TIME)));
@@ -1184,6 +1206,17 @@ test_wait(Connector<BUFFER, NetProvider> &client)
11841206
response = conn.getResponse(fs[1]);
11851207
fail_unless(response.has_value());
11861208

1209+
TEST_CASE("waitAll(0) polls connections");
1210+
f = conn.call("remote_sleep", std::forward_as_tuple(SLEEP_TIME));
1211+
fail_unless(!conn.futureIsReady(f));
1212+
while (!conn.futureIsReady(f)) {
1213+
client.waitAll(conn, std::vector<rid_t>{f}, 0);
1214+
usleep(10 * 1000); /* 10ms */
1215+
}
1216+
fail_unless(conn.futureIsReady(f));
1217+
response = conn.getResponse(f);
1218+
fail_unless(response.has_value());
1219+
11871220
TEST_CASE("waitCount(0) and waitCount(-1)");
11881221
fs.clear();
11891222
fs.push_back(conn.call("remote_sleep", std::forward_as_tuple(SLEEP_TIME)));
@@ -1195,6 +1228,17 @@ test_wait(Connector<BUFFER, NetProvider> &client)
11951228
response = conn.getResponse(fs[1]);
11961229
fail_unless(response.has_value());
11971230

1231+
TEST_CASE("waitCount(0) polls connections");
1232+
f = conn.call("remote_sleep", std::forward_as_tuple(SLEEP_TIME));
1233+
fail_unless(!conn.futureIsReady(f));
1234+
while (!conn.futureIsReady(f)) {
1235+
client.waitCount(conn, 1, 0);
1236+
usleep(10 * 1000); /* 10ms */
1237+
}
1238+
fail_unless(conn.futureIsReady(f));
1239+
response = conn.getResponse(f);
1240+
fail_unless(response.has_value());
1241+
11981242
client.close(conn);
11991243
}
12001244

0 commit comments

Comments
 (0)