|
| 1 | +import threading |
| 2 | + |
1 | 3 | from cassandra.connection import Connection, ConnectionShutdown
|
2 | 4 | import sys
|
3 | 5 | import asyncio
|
@@ -41,13 +43,12 @@ def end(self):
|
41 | 43 |
|
42 | 44 | def __init__(self, timeout, callback, loop):
|
43 | 45 | delayed = self._call_delayed_coro(timeout=timeout,
|
44 |
| - callback=callback, |
45 |
| - loop=loop) |
| 46 | + callback=callback) |
46 | 47 | self._handle = asyncio.run_coroutine_threadsafe(delayed, loop=loop)
|
47 | 48 |
|
48 | 49 | @staticmethod
|
49 |
| - async def _call_delayed_coro(timeout, callback, loop): |
50 |
| - await asyncio.sleep(timeout, loop=loop) |
| 50 | + async def _call_delayed_coro(timeout, callback): |
| 51 | + await asyncio.sleep(timeout) |
51 | 52 | return callback()
|
52 | 53 |
|
53 | 54 | def __lt__(self, other):
|
@@ -111,8 +112,11 @@ def initialize_reactor(cls):
|
111 | 112 | if cls._pid != os.getpid():
|
112 | 113 | cls._loop = None
|
113 | 114 | if cls._loop is None:
|
114 |
| - cls._loop = asyncio.new_event_loop() |
115 |
| - asyncio.set_event_loop(cls._loop) |
| 115 | + try: |
| 116 | + cls._loop = asyncio.get_running_loop() |
| 117 | + except RuntimeError: |
| 118 | + cls._loop = asyncio.new_event_loop() |
| 119 | + asyncio.set_event_loop(cls._loop) |
116 | 120 |
|
117 | 121 | if not cls._loop_thread:
|
118 | 122 | # daemonize so the loop will be shut down on interpreter
|
@@ -165,7 +169,7 @@ def push(self, data):
|
165 | 169 | else:
|
166 | 170 | chunks = [data]
|
167 | 171 |
|
168 |
| - if self._loop_thread.ident != get_ident(): |
| 172 | + if self._loop_thread != threading.current_thread(): |
169 | 173 | asyncio.run_coroutine_threadsafe(
|
170 | 174 | self._push_msg(chunks),
|
171 | 175 | loop=self._loop
|
|
0 commit comments