@@ -623,6 +623,37 @@ mrb_zmq_z85_encode(mrb_state *mrb, mrb_value self)
623
623
return dest ;
624
624
}
625
625
626
+ static void
627
+ mrb_zmq_thread_fn_cb (mrb_state * mrb , const mrb_value mrb_zmq_thread_data_ )
628
+ {
629
+ mrb_zmq_thread_data_t * mrb_zmq_thread_data = (mrb_zmq_thread_data_t * ) mrb_cptr (mrb_zmq_thread_data_ );
630
+
631
+ struct RClass * zmq_mod = mrb_module_get (mrb , "ZMQ" );
632
+ mrb_value pipe_val = mrb_obj_value (mrb_obj_alloc (mrb , MRB_TT_DATA , mrb_class_get_under (mrb , zmq_mod , "Pair" )));
633
+ mrb_data_init (pipe_val , mrb_zmq_thread_data -> backend , & mrb_zmq_socket_type );
634
+ mrb_value timeo = mrb_int_value (mrb , 120000 );
635
+ mrb_funcall (mrb , pipe_val , "sndtimeo=" , 1 , timeo );
636
+ mrb_funcall (mrb , pipe_val , "rcvtimeo=" , 1 , timeo );
637
+
638
+ mrb_value argv = mrb_msgpack_unpack (mrb , mrb_zmq_thread_data -> argv_str );
639
+ mrb_value block = mrb_msgpack_unpack (mrb , mrb_zmq_thread_data -> block_str );
640
+ if (mrb_type (mrb_ary_ref (mrb , argv , 0 )) == MRB_TT_CLASS ) {
641
+ mrb_value bg_class = mrb_ary_shift (mrb , argv );
642
+ enum mrb_vtype ttype = MRB_INSTANCE_TT (mrb_class_ptr (bg_class ));
643
+ if (ttype == 0 ) ttype = MRB_TT_OBJECT ;
644
+ mrb_zmq_thread_data -> thread_fn = mrb_obj_value (mrb_obj_alloc (mrb , ttype , mrb_class_ptr (bg_class )));
645
+ } else {
646
+ mrb_zmq_thread_data -> thread_fn = mrb_obj_value (mrb_obj_alloc (mrb , MRB_TT_OBJECT , mrb_class_get_under (mrb , mrb_class_get_under (mrb , zmq_mod , "Thread" ), "Thread_fn" )));
647
+ }
648
+ mrb_iv_set (mrb , mrb_zmq_thread_data -> thread_fn , mrb_intern_lit (mrb , "@pipe" ), pipe_val );
649
+ mrb_funcall_with_block (mrb , mrb_zmq_thread_data -> thread_fn , mrb_intern_lit (mrb , "initialize" ), RARRAY_LEN (argv ), RARRAY_PTR (argv ), block );
650
+ int success = TRUE;
651
+ int rc = zmq_send (mrb_zmq_thread_data -> backend , & success , sizeof (success ), 0 );
652
+ if (unlikely (rc == -1 )) {
653
+ mrb_zmq_handle_error (mrb , "zmq_send" );
654
+ }
655
+ }
656
+
626
657
static int
627
658
mrb_zmq_thread_fn (void * mrb_zmq_thread_data_ )
628
659
{
@@ -634,49 +665,14 @@ mrb_zmq_thread_fn(void *mrb_zmq_thread_data_)
634
665
mrb_state * mrb = mrb_open_allocf (mrb_zmq_thread_data -> mrb_parent -> allocf , mrb_zmq_thread_data -> mrb_parent -> allocf_ud );
635
666
if (likely (mrb )) {
636
667
mrb_zmq_thread_data -> backend_ctx = MRB_LIBZMQ_CONTEXT (mrb );
637
- mrb_value thread_fn = mrb_nil_value ();
638
-
639
- struct mrb_jmpbuf * prev_jmp = mrb -> jmp ;
640
- struct mrb_jmpbuf c_jmp ;
641
-
642
- MRB_TRY (& c_jmp )
643
- {
644
- mrb -> jmp = & c_jmp ;
645
- struct RClass * zmq_mod = mrb_module_get (mrb , "ZMQ" );
646
- mrb_value pipe_val = mrb_obj_value (mrb_obj_alloc (mrb , MRB_TT_DATA , mrb_class_get_under (mrb , zmq_mod , "Pair" )));
647
- mrb_data_init (pipe_val , mrb_zmq_thread_data -> backend , & mrb_zmq_socket_type );
648
- mrb_value timeo = mrb_int_value (mrb , 120000 );
649
- mrb_funcall (mrb , pipe_val , "sndtimeo=" , 1 , timeo );
650
- mrb_funcall (mrb , pipe_val , "rcvtimeo=" , 1 , timeo );
651
-
652
- mrb_value argv = mrb_msgpack_unpack (mrb , mrb_zmq_thread_data -> argv_str );
653
- mrb_value block = mrb_msgpack_unpack (mrb , mrb_zmq_thread_data -> block_str );
654
- if (mrb_type (mrb_ary_ref (mrb , argv , 0 )) == MRB_TT_CLASS ) {
655
- mrb_value bg_class = mrb_ary_shift (mrb , argv );
656
- enum mrb_vtype ttype = MRB_INSTANCE_TT (mrb_class_ptr (bg_class ));
657
- if (ttype == 0 ) ttype = MRB_TT_OBJECT ;
658
- thread_fn = mrb_obj_value (mrb_obj_alloc (mrb , ttype , mrb_class_ptr (bg_class )));
659
- } else {
660
- thread_fn = mrb_obj_value (mrb_obj_alloc (mrb , MRB_TT_OBJECT , mrb_class_get_under (mrb , mrb_class_get_under (mrb , zmq_mod , "Thread" ), "Thread_fn" )));
661
- }
662
- mrb_iv_set (mrb , thread_fn , mrb_intern_lit (mrb , "@pipe" ), pipe_val );
663
- mrb_funcall_with_block (mrb , thread_fn , mrb_intern_lit (mrb , "initialize" ), RARRAY_LEN (argv ), RARRAY_PTR (argv ), block );
664
- success = TRUE;
665
-
666
- rc = zmq_send (mrb_zmq_thread_data -> backend , & success , sizeof (success ), 0 );
667
- if (unlikely (rc == -1 )) {
668
- mrb_zmq_handle_error (mrb , "zmq_send" );
669
- }
670
- mrb -> jmp = prev_jmp ;
671
- }
672
- MRB_CATCH (& c_jmp )
673
- {
674
- mrb -> jmp = prev_jmp ;
675
- success = FALSE;
676
- mrb_value old_exc = mrb_obj_value (mrb -> exc );
677
- mrb -> exc = NULL ;
678
- if (mrb_test (old_exc ) && mrb_exception_p (old_exc )) {
679
- mrb_value exc = mrb_msgpack_pack (mrb , old_exc );
668
+ mrb_zmq_thread_data -> thread_fn = mrb_nil_value ();
669
+
670
+ mrb_bool error = FALSE;
671
+ mrb_value exc = mrb_protect (mrb , mrb_zmq_thread_fn_cb , mrb_cptr_value (mrb , mrb_zmq_thread_data ), & error );
672
+ success = !error ;
673
+ if (unlikely (error )) {
674
+ if (mrb_test (exc ) && mrb_exception_p (exc )) {
675
+ exc = mrb_msgpack_pack (mrb , exc );
680
676
if (mrb_string_p (exc )) {
681
677
zmq_send (mrb_zmq_thread_data -> backend , & success , sizeof (success ), ZMQ_SNDMORE );
682
678
zmq_send (mrb_zmq_thread_data -> backend , RSTRING_PTR (exc ), RSTRING_LEN (exc ), 0 );
@@ -688,17 +684,18 @@ mrb_zmq_thread_fn(void *mrb_zmq_thread_data_)
688
684
}
689
685
zmq_close (mrb_zmq_thread_data -> backend );
690
686
}
691
- MRB_END_EXC (& c_jmp );
692
687
693
688
mrb_gc_arena_restore (mrb , 0 );
694
689
if (likely (success )) {
695
- mrb_gc_protect (mrb , thread_fn );
696
- mrb_funcall (mrb , thread_fn , "run" , 0 );
690
+ mrb_gc_register (mrb , mrb_zmq_thread_data -> thread_fn );
691
+ mrb_funcall (mrb , mrb_zmq_thread_data -> thread_fn , "run" , 0 );
697
692
}
693
+
698
694
if (mrb -> exc && !mrb_obj_is_kind_of (mrb , mrb_obj_value (mrb -> exc ), E_ETERM_ERROR )) {
699
695
success = FALSE;
700
696
mrb_print_error (mrb );
701
697
}
698
+
702
699
mrb_close (mrb );
703
700
} else {
704
701
zmq_send (mrb_zmq_thread_data -> backend , & success , sizeof (success ), 0 );
@@ -725,8 +722,6 @@ mrb_zmq_threadstart(mrb_state *mrb, mrb_value thread_class)
725
722
mrb_zmq_thread_data_t * mrb_zmq_thread_data = (mrb_zmq_thread_data_t * ) mrb_realloc (mrb , DATA_PTR (self ), sizeof (* mrb_zmq_thread_data ));
726
723
memset (mrb_zmq_thread_data , 0 , sizeof (* mrb_zmq_thread_data ));
727
724
mrb_data_init (self , mrb_zmq_thread_data , & mrb_zmq_thread_type );
728
- mrb_value threads = mrb_iv_get (mrb , mrb_obj_value (mrb_class_get_under (mrb , mrb_module_get (mrb , "ZMQ" ), "Thread" )), mrb_intern_lit (mrb , "threads" ));
729
- mrb_hash_set (mrb , threads , mrb_int_value (mrb , (intptr_t ) mrb_zmq_thread_data ), self );
730
725
mrb_zmq_thread_data -> mrb_parent = mrb ;
731
726
mrb_zmq_thread_data -> argv_str = mrb_msgpack_pack_argv (mrb , argv , argv_len );
732
727
mrb_zmq_thread_data -> block_str = mrb_msgpack_pack (mrb , block );
@@ -793,8 +788,6 @@ mrb_zmq_threadclose(mrb_state *mrb, mrb_value self)
793
788
mrb_free (mrb , mrb_zmq_thread_data );
794
789
mrb_data_init (thread_val , NULL , NULL );
795
790
mrb_iv_remove (mrb , thread_val , mrb_intern_lit (mrb , "@pipe" ));
796
- mrb_value threads = mrb_iv_get (mrb , mrb_obj_value (mrb_class_get_under (mrb , mrb_module_get (mrb , "ZMQ" ), "Thread" )), mrb_intern_lit (mrb , "threads" ));
797
- mrb_hash_delete_key (mrb , threads , mrb_int_value (mrb , (intptr_t ) mrb_zmq_thread_data ));
798
791
return mrb_bool_value (success );
799
792
}
800
793
@@ -1468,8 +1461,6 @@ mrb_mruby_zmq_gem_init(mrb_state* mrb)
1468
1461
MRB_SET_INSTANCE_TT (zmq_thread_class , MRB_TT_DATA );
1469
1462
mrb_define_class_method (mrb , zmq_thread_class , "new" , mrb_zmq_threadstart , MRB_ARGS_ANY ());
1470
1463
1471
- mrb_iv_set (mrb , mrb_obj_value (zmq_thread_class ), mrb_intern_lit (mrb , "threads" ), mrb_hash_new (mrb ));
1472
-
1473
1464
struct RClass * zmq_poller_class = mrb_define_class_under (mrb , zmq_mod , "Poller" , mrb -> object_class );
1474
1465
MRB_SET_INSTANCE_TT (zmq_poller_class , MRB_TT_DATA );
1475
1466
mrb_define_const (mrb , zmq_poller_class , "In" , mrb_int_value (mrb , ZMQ_POLLIN ));
@@ -1543,10 +1534,8 @@ mrb_mruby_zmq_gem_init(mrb_state* mrb)
1543
1534
void
1544
1535
mrb_mruby_zmq_gem_final (mrb_state * mrb )
1545
1536
{
1546
- struct RClass * zmq_mod = mrb_module_get (mrb , "ZMQ" );
1547
- mrb_objspace_each_objects (mrb , mrb_zmq_thread_close_gem_final , mrb_class_get_under (mrb , zmq_mod , "Thread" ));
1548
1537
void * context = MRB_LIBZMQ_CONTEXT (mrb );
1549
1538
zmq_ctx_shutdown (context );
1550
- mrb_objspace_each_objects (mrb , mrb_zmq_zmq_close_gem_final , mrb_class_get_under (mrb , zmq_mod , "Socket" ));
1539
+ mrb_objspace_each_objects (mrb , mrb_zmq_zmq_close_gem_final , mrb_class_get_under (mrb , mrb_module_get ( mrb , "ZMQ" ) , "Socket" ));
1551
1540
zmq_ctx_term (context );
1552
1541
}
0 commit comments