|
8 | 8 | from threading import Event
|
9 | 9 | from threading import Thread
|
10 | 10 | from typing import Any
|
11 |
| -from typing import Awaitable |
12 | 11 | from typing import Dict
|
13 | 12 | from typing import List
|
14 | 13 | from typing import Optional
|
15 |
| -from typing import Union |
16 | 14 |
|
17 | 15 | import zmq
|
18 | 16 | from traitlets import Instance
|
|
30 | 28 | # during garbage collection of threads at exit
|
31 | 29 |
|
32 | 30 |
|
33 |
| -async def get_msg(msg: Awaitable) -> Union[List[bytes], List[zmq.Message]]: |
34 |
| - return await msg |
35 |
| - |
36 |
| - |
37 | 31 | class ThreadedZMQSocketChannel(object):
|
38 | 32 | """A ZMQ socket invoking a callback in the ioloop"""
|
39 | 33 |
|
@@ -68,6 +62,7 @@ def __init__(
|
68 | 62 | evt = Event()
|
69 | 63 |
|
70 | 64 | def setup_stream():
|
| 65 | + assert self.socket is not None |
71 | 66 | self.stream = zmqstream.ZMQStream(self.socket, self.ioloop)
|
72 | 67 | self.stream.on_recv(self._handle_recv)
|
73 | 68 | evt.set()
|
@@ -113,13 +108,11 @@ def thread_send():
|
113 | 108 | assert self.ioloop is not None
|
114 | 109 | self.ioloop.add_callback(thread_send)
|
115 | 110 |
|
116 |
| - def _handle_recv(self, future_msg: Awaitable) -> None: |
| 111 | + def _handle_recv(self, msg_list: List[bytes]) -> None: |
117 | 112 | """Callback for stream.on_recv.
|
118 | 113 |
|
119 | 114 | Unpacks message, and calls handlers with it.
|
120 | 115 | """
|
121 |
| - assert self.ioloop is not None |
122 |
| - msg_list = self.ioloop._asyncio_event_loop.run_until_complete(get_msg(future_msg)) |
123 | 116 | assert self.session is not None
|
124 | 117 | ident, smsg = self.session.feed_identities(msg_list)
|
125 | 118 | msg = self.session.deserialize(smsg)
|
|
0 commit comments