Skip to content

Commit 4547b70

Browse files
committed
add Socket and Thread.close!
1 parent f334336 commit 4547b70

File tree

3 files changed

+55
-32
lines changed

3 files changed

+55
-32
lines changed

mrblib/socket.rb

+8-2
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,14 @@ def bind(endpoint)
55
self
66
end
77

8-
def close(blocky = true)
9-
LibZMQ.close(self, blocky)
8+
def close
9+
LibZMQ.close(self)
10+
nil
11+
end
12+
13+
# this immediately closes the socket, discarding any messages which couldn't be sent yet.
14+
def close!
15+
LibZMQ.close!(self)
1016
nil
1117
end
1218

mrblib/thread.rb

+6-2
Original file line numberDiff line numberDiff line change
@@ -68,8 +68,12 @@ def finalize(object_id)
6868
LibZMQ.send(@pipe, [FINALIZE, object_id].to_msgpack, 0)
6969
end
7070

71-
def close(blocky = true)
72-
LibZMQ.threadclose(self, blocky)
71+
def close
72+
LibZMQ.threadclose(self)
73+
end
74+
75+
def close!
76+
LibZMQ.threadclose!(self)
7377
end
7478

7579
class Thread_fn

src/mrb_libzmq.c

+41-28
Original file line numberDiff line numberDiff line change
@@ -19,14 +19,28 @@ static mrb_value
1919
mrb_zmq_close(mrb_state *mrb, mrb_value self)
2020
{
2121
mrb_value socket_val;
22-
mrb_bool blocky = TRUE;
23-
mrb_get_args(mrb, "o|b", &socket_val, &blocky);
22+
mrb_get_args(mrb, "o", &socket_val);
2423

2524
if (mrb_type(socket_val) == MRB_TT_DATA && DATA_TYPE(socket_val) == &mrb_zmq_socket_type) {
26-
if (!blocky) {
27-
int linger = 0;
28-
zmq_setsockopt(DATA_PTR(socket_val), ZMQ_LINGER, &linger, sizeof(linger));
25+
int rc = zmq_close(DATA_PTR(socket_val));
26+
if (unlikely(rc == -1)) {
27+
mrb_zmq_handle_error(mrb, "zmq_close");
2928
}
29+
mrb_data_init(socket_val, NULL, NULL);
30+
}
31+
32+
return mrb_nil_value();
33+
}
34+
35+
static mrb_value
36+
mrb_zmq_close_mark(mrb_state *mrb, mrb_value self)
37+
{
38+
mrb_value socket_val;
39+
mrb_get_args(mrb, "o", &socket_val);
40+
41+
if (mrb_type(socket_val) == MRB_TT_DATA && DATA_TYPE(socket_val) == &mrb_zmq_socket_type) {
42+
int disable = 0;
43+
zmq_setsockopt(DATA_PTR(socket_val), ZMQ_LINGER, &disable, sizeof(disable));
3044
int rc = zmq_close(DATA_PTR(socket_val));
3145
if (unlikely(rc == -1)) {
3246
mrb_zmq_handle_error(mrb, "zmq_close");
@@ -516,7 +530,7 @@ mrb_zmq_socket(mrb_state *mrb, mrb_value self)
516530
mrb_get_args(mrb, "i", &type);
517531
mrb_assert_int_fit(mrb_int, type, int, INT_MAX);
518532

519-
void *socket = zmq_socket(MRB_LIBZMQ_CONTEXT(mrb), type);
533+
void *socket = zmq_socket(MRB_LIBZMQ_CONTEXT(mrb), (int) type);
520534
if (likely(socket)) {
521535
mrb_data_init(self, socket, &mrb_zmq_socket_type);
522536
} else {
@@ -535,7 +549,7 @@ mrb_zmq_socket_monitor(mrb_state *mrb, mrb_value self)
535549
mrb_get_args(mrb, "dzi", &socket, &mrb_zmq_socket_type, &addr, &events);
536550
mrb_assert_int_fit(mrb_int, events, int, INT_MAX);
537551

538-
int rc = zmq_socket_monitor(socket, addr, events);
552+
int rc = zmq_socket_monitor(socket, addr, (int) events);
539553
if (unlikely(rc == -1)) {
540554
mrb_zmq_handle_error(mrb, "zmq_socket_monitor");
541555
}
@@ -558,7 +572,7 @@ mrb_zmq_socket_recv(mrb_state *mrb, mrb_value self)
558572
mrb_value msg_val = mrb_obj_new(mrb, zmq_msg_class, 0, NULL);
559573
zmq_msg_t *msg = (zmq_msg_t *) DATA_PTR(msg_val);
560574

561-
int rc = zmq_msg_recv (msg, DATA_PTR(self), flags);
575+
int rc = zmq_msg_recv (msg, DATA_PTR(self), (int) flags);
562576
if (unlikely(rc == -1)) {
563577
mrb_zmq_handle_error(mrb, "zmq_msg_recv");
564578
}
@@ -629,11 +643,11 @@ mrb_zmq_thread_fn_cb(mrb_state *mrb, const mrb_value mrb_zmq_thread_data_)
629643
mrb_zmq_thread_data_t *mrb_zmq_thread_data = (mrb_zmq_thread_data_t *) mrb_cptr(mrb_zmq_thread_data_);
630644

631645
struct RClass* zmq_mod = mrb_module_get(mrb, "ZMQ");
632-
mrb_value pipe_val = mrb_obj_value(mrb_obj_alloc(mrb, MRB_TT_DATA, mrb_class_get_under(mrb, zmq_mod, "Pair")));
633-
mrb_data_init(pipe_val, mrb_zmq_thread_data->backend, &mrb_zmq_socket_type);
646+
mrb_value backend_pipe = mrb_obj_value(mrb_obj_alloc(mrb, MRB_TT_DATA, mrb_class_get_under(mrb, zmq_mod, "Pair")));
647+
mrb_data_init(backend_pipe, mrb_zmq_thread_data->backend, &mrb_zmq_socket_type);
634648
mrb_value timeo = mrb_int_value(mrb, 120000);
635-
mrb_funcall(mrb, pipe_val, "sndtimeo=", 1, timeo);
636-
mrb_funcall(mrb, pipe_val, "rcvtimeo=", 1, timeo);
649+
mrb_funcall(mrb, backend_pipe, "sndtimeo=", 1, timeo);
650+
mrb_funcall(mrb, backend_pipe, "rcvtimeo=", 1, timeo);
637651

638652
mrb_value argv = mrb_msgpack_unpack(mrb, mrb_zmq_thread_data->argv_str);
639653
mrb_value block = mrb_msgpack_unpack(mrb, mrb_zmq_thread_data->block_str);
@@ -645,7 +659,7 @@ mrb_zmq_thread_fn_cb(mrb_state *mrb, const mrb_value mrb_zmq_thread_data_)
645659
} else {
646660
mrb_zmq_thread_data->thread_fn = mrb_obj_value(mrb_obj_alloc(mrb, MRB_TT_OBJECT, mrb_class_get_under(mrb, mrb_class_get_under(mrb, zmq_mod, "Thread"), "Thread_fn")));
647661
}
648-
mrb_iv_set(mrb, mrb_zmq_thread_data->thread_fn, mrb_intern_lit(mrb, "@pipe"), pipe_val);
662+
mrb_iv_set(mrb, mrb_zmq_thread_data->thread_fn, mrb_intern_lit(mrb, "@pipe"), backend_pipe);
649663
mrb_funcall_with_block(mrb, mrb_zmq_thread_data->thread_fn, mrb_intern_lit(mrb, "initialize"), RARRAY_LEN(argv), RARRAY_PTR(argv), block);
650664
int success = TRUE;
651665
int rc = zmq_send(mrb_zmq_thread_data->backend, &success, sizeof(success), 0);
@@ -731,12 +745,12 @@ mrb_zmq_threadstart(mrb_state *mrb, mrb_value thread_class)
731745
mrb_true_value()
732746
};
733747
mrb_zmq_thread_data->endpoint = mrb_string_value_cstr(mrb, &pipe_args[0]);
734-
mrb_value frontend_val = mrb_obj_new(mrb, mrb_class_get_under(mrb, mrb_module_get(mrb, "ZMQ"), "Pair"), NELEMS(pipe_args), pipe_args);
735-
mrb_zmq_thread_data->frontend = DATA_PTR(frontend_val);
748+
mrb_value frontend_pipe = mrb_obj_new(mrb, mrb_class_get_under(mrb, mrb_module_get(mrb, "ZMQ"), "Pair"), NELEMS(pipe_args), pipe_args);
749+
mrb_zmq_thread_data->frontend = DATA_PTR(frontend_pipe);
736750
mrb_value timeo = mrb_int_value(mrb, 120000);
737-
mrb_funcall(mrb, frontend_val, "sndtimeo=", 1, timeo);
738-
mrb_funcall(mrb, frontend_val, "rcvtimeo=", 1, timeo);
739-
mrb_iv_set(mrb, self, mrb_intern_lit(mrb, "@pipe"), frontend_val);
751+
mrb_funcall(mrb, frontend_pipe, "sndtimeo=", 1, timeo);
752+
mrb_funcall(mrb, frontend_pipe, "rcvtimeo=", 1, timeo);
753+
mrb_iv_set(mrb, self, mrb_intern_lit(mrb, "@pipe"), frontend_pipe);
740754
mrb_zmq_thread_data->backend = zmq_socket(MRB_LIBZMQ_CONTEXT(mrb), ZMQ_PAIR);
741755
if (unlikely(!mrb_zmq_thread_data->backend)) {
742756
mrb_zmq_handle_error(mrb, "zmq_socket");
@@ -751,10 +765,9 @@ mrb_zmq_threadstart(mrb_state *mrb, mrb_value thread_class)
751765
int success = FALSE;
752766
zmq_recv(mrb_zmq_thread_data->frontend, &success, sizeof(success), 0);
753767
if (unlikely(!success)) {
754-
mrb_bool rcvmore = mrb_bool(mrb_funcall(mrb, frontend_val, "rcvmore?", 0));
768+
mrb_bool rcvmore = mrb_bool(mrb_funcall(mrb, frontend_pipe, "rcvmore?", 0));
755769
if (rcvmore) {
756-
mrb_value exc = mrb_msgpack_unpack(mrb, mrb_funcall(mrb, mrb_funcall(mrb, frontend_val, "recv", 0), "to_str", 0));
757-
mrb_exc_raise(mrb, exc);
770+
mrb_exc_raise(mrb, mrb_msgpack_unpack(mrb, mrb_funcall(mrb, mrb_funcall(mrb, frontend_pipe, "recv", 0), "to_str", 0)));
758771
} else {
759772
mrb_raise(mrb, E_RUNTIME_ERROR, "Cannot initialize ZMQ Thread");
760773
}
@@ -996,7 +1009,9 @@ mrb_zmq_poller_wait(mrb_state *mrb, mrb_value self)
9961009
return self;
9971010
}
9981011
}
1012+
9991013
#else
1014+
10001015
static mrb_value
10011016
mrb_zmq_poller_new(mrb_state *mrb, mrb_value self)
10021017
{
@@ -1108,10 +1123,7 @@ mrb_zmq_poller_remove(mrb_state *mrb, mrb_value self)
11081123
++ptr;
11091124
}
11101125
mrb_funcall(mrb, sockets, "delete_at", 1, mrb_int_value(mrb, i));
1111-
pollitems = (struct zmq_pollitem_t *) mrb_realloc(mrb, DATA_PTR(self), RARRAY_LEN(sockets) * sizeof(*pollitems));
1112-
if (DATA_PTR(self) != pollitems) {
1113-
mrb_data_init(self, pollitems, &mrb_zmq_poller_type);
1114-
}
1126+
DATA_PTR(self) = (struct zmq_pollitem_t *) mrb_realloc(mrb, DATA_PTR(self), RARRAY_LEN(sockets) * sizeof(*pollitems));
11151127

11161128
return self;
11171129
}
@@ -1216,7 +1228,7 @@ mrb_zmq_timers_add(mrb_state *mrb, mrb_value self)
12161228
timer_fn_arg->block = block;
12171229
mrb_iv_set(mrb, timer, mrb_intern_lit(mrb, "block"), block);
12181230

1219-
int timer_id = zmq_timers_add(DATA_PTR(self), interval, mrb_zmq_timer_fn, timer_fn_arg);
1231+
int timer_id = zmq_timers_add(DATA_PTR(self), (size_t) interval, mrb_zmq_timer_fn, timer_fn_arg);
12201232
if (unlikely(timer_id == -1)) {
12211233
mrb_zmq_handle_error(mrb, "zmq_timers_add");
12221234
}
@@ -1234,7 +1246,7 @@ mrb_zmq_timers_set_interval(mrb_state *mrb, mrb_value self)
12341246
mrb_assert_int_fit(mrb_int, interval, size_t, SIZE_MAX);
12351247

12361248
mrb_zmq_timers_fn_t *timer_fn_arg = (mrb_zmq_timers_fn_t *) DATA_PTR(self);
1237-
int rc = zmq_timers_set_interval(DATA_PTR(timer_fn_arg->timers), timer_fn_arg->timer_id, interval);
1249+
int rc = zmq_timers_set_interval(DATA_PTR(timer_fn_arg->timers), timer_fn_arg->timer_id, (size_t) interval);
12381250
if (unlikely(rc == -1)) {
12391251
mrb_zmq_handle_error(mrb, "zmq_timers_set_interval");
12401252
}
@@ -1426,7 +1438,8 @@ mrb_mruby_zmq_gem_init(mrb_state* mrb)
14261438
libzmq_mod = mrb_define_module(mrb, "LibZMQ");
14271439
mrb_define_const(mrb, libzmq_mod, "_Context", mrb_cptr_value(mrb, context));
14281440
mrb_define_module_function(mrb, libzmq_mod, "bind", mrb_zmq_bind, MRB_ARGS_REQ(2));
1429-
mrb_define_module_function(mrb, libzmq_mod, "close", mrb_zmq_close, MRB_ARGS_ARG(1, 1));
1441+
mrb_define_module_function(mrb, libzmq_mod, "close", mrb_zmq_close, MRB_ARGS_REQ(1));
1442+
mrb_define_module_function(mrb, libzmq_mod, "close!", mrb_zmq_close_mark, MRB_ARGS_REQ(1));
14301443
mrb_define_module_function(mrb, libzmq_mod, "connect", mrb_zmq_connect, MRB_ARGS_REQ(2));
14311444
mrb_define_module_function(mrb, libzmq_mod, "ctx_get", mrb_zmq_ctx_get, MRB_ARGS_REQ(1));
14321445
mrb_define_module_function(mrb, libzmq_mod, "ctx_set", mrb_zmq_ctx_set, MRB_ARGS_REQ(2));

0 commit comments

Comments
 (0)