-
Notifications
You must be signed in to change notification settings - Fork 777
Feat/monitor poll #414
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Feat/monitor poll #414
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -74,7 +74,7 @@ TEST_CASE("monitor move assign", "[monitor]") | |
} | ||
} | ||
|
||
TEST_CASE("monitor init event count", "[monitor]") | ||
TEST_CASE("monitor init check event count", "[monitor]") | ||
{ | ||
common_server_client_setup s{false}; | ||
mock_monitor_t monitor; | ||
|
@@ -92,6 +92,93 @@ TEST_CASE("monitor init event count", "[monitor]") | |
CHECK(monitor.total == expected_event_count); | ||
} | ||
|
||
TEST_CASE("monitor init get event count", "[monitor]") | ||
{ | ||
common_server_client_setup s{ false }; | ||
zmq::monitor_t monitor; | ||
|
||
const int expected_event_count = 2; | ||
monitor.init(s.client, "inproc://foo"); | ||
|
||
int total{ 0 }; | ||
int connect_delayed{ 0 }; | ||
int connected{ 0 }; | ||
|
||
auto lbd_count_event = [&](const zmq_event_t& event) { | ||
switch (event.event) | ||
{ | ||
case ZMQ_EVENT_CONNECT_DELAYED: | ||
connect_delayed++; | ||
total++; | ||
break; | ||
|
||
case ZMQ_EVENT_CONNECTED: | ||
connected++; | ||
total++; | ||
break; | ||
} | ||
}; | ||
|
||
zmq_event_t eventMsg; | ||
std::string address; | ||
CHECK_FALSE(monitor.get_event(eventMsg, address, zmq::recv_flags::dontwait)); | ||
s.init(); | ||
|
||
SECTION("get_event") | ||
{ | ||
while (total < expected_event_count) | ||
{ | ||
if (!monitor.get_event(eventMsg, address)) | ||
continue; | ||
|
||
lbd_count_event(eventMsg); | ||
} | ||
|
||
} | ||
|
||
SECTION("poll get_event") | ||
{ | ||
while (total < expected_event_count) | ||
{ | ||
zmq::pollitem_t items[] = { | ||
{ monitor.handle(), 0, ZMQ_POLLIN, 0 }, | ||
}; | ||
|
||
zmq::poll(&items[0], 1, 100); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Please use the overload taking There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Roger that |
||
|
||
if (!(items[0].revents & ZMQ_POLLIN)) { | ||
continue; | ||
} | ||
|
||
CHECK(monitor.get_event(eventMsg, address)); | ||
|
||
lbd_count_event(eventMsg); | ||
} | ||
} | ||
|
||
SECTION("poller_t get_event") | ||
{ | ||
zmq::poller_t<> poller; | ||
CHECK_NOTHROW(poller.add(monitor, zmq::event_flags::pollin)); | ||
|
||
while (total < expected_event_count) | ||
{ | ||
std::vector<zmq::poller_event<>> events(1); | ||
if(0 == poller.wait_all(events, std::chrono::milliseconds{ 100 })) | ||
continue; | ||
|
||
CHECK(zmq::event_flags::pollin == events[0].events); | ||
CHECK(monitor.get_event(eventMsg, address)); | ||
|
||
lbd_count_event(eventMsg); | ||
} | ||
} | ||
|
||
CHECK(connect_delayed == 1); | ||
CHECK(connected == 1); | ||
CHECK(total == expected_event_count); | ||
} | ||
|
||
TEST_CASE("monitor init abort", "[monitor]") | ||
{ | ||
class mock_monitor : public mock_monitor_t | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -2224,6 +2224,63 @@ class monitor_t | |
on_monitor_started(); | ||
} | ||
|
||
operator void *() ZMQ_NOTHROW { return handle(); } | ||
|
||
operator void const *() const ZMQ_NOTHROW { return handle(); } | ||
Comment on lines
+2227
to
+2229
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'd rather not add these operators. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Agreed. |
||
|
||
ZMQ_NODISCARD void *handle() ZMQ_NOTHROW { return _monitor_socket.handle(); } | ||
|
||
ZMQ_NODISCARD const void *handle() const ZMQ_NOTHROW { return _monitor_socket.handle(); } | ||
Comment on lines
+2231
to
+2233
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Similar to the comment above, I'd use more specific names for these functions. We are in Also, the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Agreed. |
||
|
||
operator socket_ref() ZMQ_NOTHROW { return (zmq::socket_ref) _monitor_socket; } | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Again, I'd not add an operator here but rather a named There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Agreed. |
||
|
||
#if ZMQ_VERSION_MAJOR >= 4 | ||
bool get_event(zmq_event_t& eventMsg, std::string& address, zmq::recv_flags flags = zmq::recv_flags::none) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I am not sure about the signature of this method. What about
(This requires C++17 this way, but we can also do this with the fallback to This removes the ambiguity of whether the original value of There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ok, I'm not very familiar with std::optional and will try to implement this way |
||
{ | ||
assert(_monitor_socket); | ||
|
||
eventMsg.event = 0; | ||
eventMsg.value = 0; | ||
address = std::string(); | ||
|
||
{ | ||
message_t msg; | ||
int rc = zmq_msg_recv(msg.handle(), _monitor_socket.handle(), | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can't this use There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Of course, but then it is not possible to use the same behaviour as the old check_event. Otherwise I will need to use a try catch statement |
||
static_cast<int>(flags)); | ||
|
||
if (rc == -1) | ||
{ | ||
if (zmq_errno() == ETERM || zmq_errno() == EAGAIN) | ||
return false; | ||
Comment on lines
+2253
to
+2254
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why does this also returns false on There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Check https://github.com/zeromq/cppzmq/blob/master/zmq.hpp#L2274. I just kept same behaviour implemented in check_event(). If this is not mandatory, I can just throw an exception and not use a std::optionnal as described above, but just std::pair<> There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Hm, yes, that's right. It's inconsistent one way or the other, but I think the behaviour of There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. what we should implement in this case? Maybe we shall throw an exception? |
||
else | ||
throw error_t(); | ||
} | ||
|
||
const char *data = msg.data<char>(); | ||
memcpy(&eventMsg.event, data, sizeof(uint16_t)); | ||
data += sizeof(uint16_t); | ||
memcpy(&eventMsg.value, data, sizeof(int32_t)); | ||
} | ||
|
||
message_t addrMsg; | ||
int rc = zmq_msg_recv(addrMsg.handle(), _monitor_socket.handle(), | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Like above, can't this use There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. See prev comment
Could you add this to the contribution guidelines of the project?
I don't like to put effort on something that will be not merged. I was waiting for a green thumb before putting more work on this, as I did not asked anyone about the design of this PR ;) Thanks for reviewing, will try to solve all of those next week. BR |
||
static_cast<int>(flags)); | ||
|
||
if (rc == -1) | ||
{ | ||
if (zmq_errno() == ETERM) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Again, why does this return There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. See my comment above |
||
return false; | ||
else | ||
throw error_t(); | ||
} | ||
|
||
const char *str = addrMsg.data<char>(); | ||
address = std::string(str, str + addrMsg.size()); | ||
|
||
return true; | ||
} | ||
#endif | ||
|
||
bool check_event(int timeout = 0) | ||
{ | ||
assert(_monitor_socket); | ||
|
@@ -2357,6 +2414,15 @@ class monitor_t | |
_socket = socket_ref(); | ||
} | ||
#endif | ||
|
||
void close() ZMQ_NOTHROW | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Please also add a test for this. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Will be done. |
||
{ | ||
#ifdef ZMQ_EVENT_MONITOR_STOPPED | ||
abort(); | ||
#endif | ||
_monitor_socket = socket_t(); | ||
} | ||
|
||
virtual void on_monitor_started() {} | ||
virtual void on_event_connected(const zmq_event_t &event_, const char *addr_) | ||
{ | ||
|
@@ -2461,13 +2527,6 @@ class monitor_t | |
|
||
socket_ref _socket; | ||
socket_t _monitor_socket; | ||
|
||
void close() ZMQ_NOTHROW | ||
{ | ||
if (_socket) | ||
zmq_socket_monitor(_socket.handle(), ZMQ_NULLPTR, 0); | ||
_monitor_socket.close(); | ||
} | ||
}; | ||
|
||
#if defined(ZMQ_BUILD_DRAFT_API) && defined(ZMQ_CPP11) && defined(ZMQ_HAVE_POLLER) | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What does
lbd
stand for?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
"lbd" is short for "lambda". I like to differentiate "callback" methods over classical variable in my code.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hm. I am not sure if it's necessary to use a prefix at all (if it were a function pointer, would it make a difference?), but if you feel it's good to have, please use
lambda_
.