Skip to content

Commit 3f50aaf

Browse files
committed
Async also working.
1 parent 91bfe7a commit 3f50aaf

File tree

2 files changed

+104
-64
lines changed

2 files changed

+104
-64
lines changed

async_substrate_interface/async_substrate.py

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -734,15 +734,14 @@ async def initialize(self):
734734
"""
735735
Initialize the connection to the chain.
736736
"""
737-
async with self._lock:
738-
self._initializing = True
739-
if not self.initialized:
740-
if not self._chain:
741-
chain = await self.rpc_request("system_chain", [])
742-
self._chain = chain.get("result")
743-
await self.init_runtime()
744-
self.initialized = True
745-
self._initializing = False
737+
self._initializing = True
738+
if not self.initialized:
739+
if not self._chain:
740+
chain = await self.rpc_request("system_chain", [])
741+
self._chain = chain.get("result")
742+
await self.init_runtime()
743+
self.initialized = True
744+
self._initializing = False
746745

747746
async def __aexit__(self, exc_type, exc_val, exc_tb):
748747
pass

async_substrate_interface/substrate_addons.py

Lines changed: 96 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,18 @@
1+
"""
2+
A number of "plugins" for SubstrateInterface (and AsyncSubstrateInterface). At initial creation, it contains only
3+
Retry (sync and async versions).
4+
"""
5+
16
import asyncio
27
import logging
8+
import socket
39
from functools import partial
410
from itertools import cycle
511
from typing import Optional
12+
613
from websockets.exceptions import ConnectionClosed
714

8-
from async_substrate_interface.async_substrate import AsyncSubstrateInterface
15+
from async_substrate_interface.async_substrate import AsyncSubstrateInterface, Websocket
916
from async_substrate_interface.errors import MaxRetriesExceeded
1017
from async_substrate_interface.sync_substrate import SubstrateInterface
1118

@@ -69,6 +76,34 @@
6976

7077

7178
class RetrySyncSubstrate(SubstrateInterface):
79+
"""
80+
A subclass of SubstrateInterface that allows for handling chain failures by using backup chains. If a sustained
81+
network failure is encountered on a chain endpoint, the object will initialize a new connection on the next chain in
82+
the `fallback_chains` list. If the `retry_forever` flag is set, upon reaching the last chain in `fallback_chains`,
83+
the connection will attempt to iterate over the list (starting with `url`) again.
84+
85+
E.g.
86+
```
87+
substrate = RetrySyncSubstrate(
88+
"wss://entrypoint-finney.opentensor.ai:443",
89+
fallback_chains=["ws://127.0.0.1:9946"]
90+
)
91+
```
92+
In this case, if there is a failure on entrypoint-finney, the connection will next attempt to hit localhost. If this
93+
also fails, a `MaxRetriesExceeded` exception will be raised.
94+
95+
```
96+
substrate = RetrySyncSubstrate(
97+
"wss://entrypoint-finney.opentensor.ai:443",
98+
fallback_chains=["ws://127.0.0.1:9946"],
99+
retry_forever=True
100+
)
101+
```
102+
In this case, rather than a MaxRetriesExceeded exception being raised upon failure of the second chain (localhost),
103+
the object will again being to initialize a new connection on entrypoint-finney, and then localhost, and so on and
104+
so forth.
105+
"""
106+
72107
def __init__(
73108
self,
74109
url: str,
@@ -117,6 +152,7 @@ def __init__(
117152
raise ConnectionError(
118153
f"Unable to connect at any chains specified: {[url] + fallback_chains}"
119154
)
155+
# "connect" is only used by SubstrateInterface, not AsyncSubstrateInterface
120156
retry_methods = ["connect"] + RETRY_METHODS
121157
self._original_methods = {
122158
method: getattr(self, method) for method in retry_methods
@@ -125,33 +161,19 @@ def __init__(
125161
setattr(self, method, partial(self._retry, method))
126162

127163
def _retry(self, method, *args, **kwargs):
164+
method_ = self._original_methods[method]
128165
try:
129-
method_ = self._original_methods[method]
130166
return method_(*args, **kwargs)
131167
except (MaxRetriesExceeded, ConnectionError, EOFError, ConnectionClosed) as e:
132168
try:
133169
self._reinstantiate_substrate(e)
134-
method_ = self._original_methods[method]
135170
return method_(*args, **kwargs)
136171
except StopIteration:
137172
logger.error(
138173
f"Max retries exceeded with {self.url}. No more fallback chains."
139174
)
140175
raise MaxRetriesExceeded
141176

142-
def _retry_property(self, property_):
143-
try:
144-
return getattr(self, property_)
145-
except (MaxRetriesExceeded, ConnectionError, EOFError, ConnectionClosed) as e:
146-
try:
147-
self._reinstantiate_substrate(e)
148-
return self._retry_property(property_)
149-
except StopIteration:
150-
logger.error(
151-
f"Max retries exceeded with {self.url}. No more fallback chains."
152-
)
153-
raise MaxRetriesExceeded
154-
155177
def _reinstantiate_substrate(self, e: Optional[Exception] = None) -> None:
156178
next_network = next(self.fallback_chains)
157179
self.ws.close()
@@ -170,6 +192,34 @@ def _reinstantiate_substrate(self, e: Optional[Exception] = None) -> None:
170192

171193

172194
class RetryAsyncSubstrate(AsyncSubstrateInterface):
195+
"""
196+
A subclass of AsyncSubstrateInterface that allows for handling chain failures by using backup chains. If a
197+
sustained network failure is encountered on a chain endpoint, the object will initialize a new connection on
198+
the next chain in the `fallback_chains` list. If the `retry_forever` flag is set, upon reaching the last chain
199+
in `fallback_chains`, the connection will attempt to iterate over the list (starting with `url`) again.
200+
201+
E.g.
202+
```
203+
substrate = RetryAsyncSubstrate(
204+
"wss://entrypoint-finney.opentensor.ai:443",
205+
fallback_chains=["ws://127.0.0.1:9946"]
206+
)
207+
```
208+
In this case, if there is a failure on entrypoint-finney, the connection will next attempt to hit localhost. If this
209+
also fails, a `MaxRetriesExceeded` exception will be raised.
210+
211+
```
212+
substrate = RetryAsyncSubstrate(
213+
"wss://entrypoint-finney.opentensor.ai:443",
214+
fallback_chains=["ws://127.0.0.1:9946"],
215+
retry_forever=True
216+
)
217+
```
218+
In this case, rather than a MaxRetriesExceeded exception being raised upon failure of the second chain (localhost),
219+
the object will again being to initialize a new connection on entrypoint-finney, and then localhost, and so on and
220+
so forth.
221+
"""
222+
173223
def __init__(
174224
self,
175225
url: str,
@@ -212,62 +262,53 @@ def __init__(
212262
for method in RETRY_METHODS:
213263
setattr(self, method, partial(self._retry, method))
214264

215-
def _reinstantiate_substrate(self, e: Optional[Exception] = None) -> None:
265+
async def _reinstantiate_substrate(self, e: Optional[Exception] = None) -> None:
216266
next_network = next(self.fallback_chains)
217267
if e.__class__ == MaxRetriesExceeded:
218268
logger.error(
219269
f"Max retries exceeded with {self.url}. Retrying with {next_network}."
220270
)
221271
else:
222-
print(f"Connection error. Trying again with {next_network}")
223-
super().__init__(
224-
url=next_network,
225-
ss58_format=self.ss58_format,
226-
type_registry=self.type_registry,
227-
use_remote_preset=self.use_remote_preset,
228-
chain_name=self.chain_name,
229-
_mock=self._mock,
230-
retry_timeout=self.retry_timeout,
231-
max_retries=self.max_retries,
272+
logger.error(f"Connection error. Trying again with {next_network}")
273+
try:
274+
await self.ws.shutdown()
275+
except AttributeError:
276+
pass
277+
if self._forgettable_task is not None:
278+
self._forgettable_task: asyncio.Task
279+
self._forgettable_task.cancel()
280+
try:
281+
await self._forgettable_task
282+
except asyncio.CancelledError:
283+
pass
284+
self.chain_endpoint = next_network
285+
self.url = next_network
286+
self.ws = Websocket(
287+
next_network,
288+
options={
289+
"max_size": self.ws_max_size,
290+
"write_limit": 2**16,
291+
},
232292
)
233-
self._original_methods = {
234-
method: getattr(self, method) for method in RETRY_METHODS
235-
}
236-
for method in RETRY_METHODS:
237-
setattr(self, method, partial(self._retry, method))
293+
self._initialized = False
294+
self._initializing = False
295+
await self.initialize()
238296

239297
async def _retry(self, method, *args, **kwargs):
298+
method_ = self._original_methods[method]
240299
try:
241-
method_ = self._original_methods[method]
242300
return await method_(*args, **kwargs)
243301
except (
244302
MaxRetriesExceeded,
245303
ConnectionError,
246-
ConnectionRefusedError,
304+
ConnectionClosed,
247305
EOFError,
306+
socket.gaierror,
248307
) as e:
249308
try:
250-
self._reinstantiate_substrate(e)
251-
await self.initialize()
252-
method_ = getattr(self, method)
253-
if asyncio.iscoroutinefunction(method_):
254-
return await method_(*args, **kwargs)
255-
else:
256-
return method_(*args, **kwargs)
257-
except StopIteration:
258-
logger.error(
259-
f"Max retries exceeded with {self.url}. No more fallback chains."
260-
)
261-
raise MaxRetriesExceeded
262-
263-
async def _retry_property(self, property_):
264-
try:
265-
return await getattr(self, property_)
266-
except (MaxRetriesExceeded, ConnectionError, ConnectionRefusedError) as e:
267-
try:
268-
self._reinstantiate_substrate(e)
269-
return await self._retry_property(property_)
270-
except StopIteration:
309+
await self._reinstantiate_substrate(e)
310+
return await method_(*args, **kwargs)
311+
except StopAsyncIteration:
271312
logger.error(
272313
f"Max retries exceeded with {self.url}. No more fallback chains."
273314
)

0 commit comments

Comments
 (0)