Skip to content

Commit 91bfe7a

Browse files
committed
Sync substrate retry working.
1 parent 951d558 commit 91bfe7a

File tree

2 files changed

+47
-29
lines changed

2 files changed

+47
-29
lines changed

async_substrate_interface/substrate_addons.py

Lines changed: 37 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,14 @@
33
from functools import partial
44
from itertools import cycle
55
from typing import Optional
6+
from websockets.exceptions import ConnectionClosed
67

78
from async_substrate_interface.async_substrate import AsyncSubstrateInterface
89
from async_substrate_interface.errors import MaxRetriesExceeded
910
from async_substrate_interface.sync_substrate import SubstrateInterface
1011

12+
logger = logging.getLogger("async_substrate_interface")
13+
1114

1215
RETRY_METHODS = [
1316
"_get_block_handler",
@@ -106,65 +109,64 @@ def __init__(
106109
max_retries=max_retries,
107110
)
108111
initialized = True
112+
logger.info(f"Connected to {chain_url}")
109113
break
110114
except ConnectionError:
111-
logging.warning(f"Unable to connect to {chain_url}")
115+
logger.warning(f"Unable to connect to {chain_url}")
112116
if not initialized:
113117
raise ConnectionError(
114118
f"Unable to connect at any chains specified: {[url] + fallback_chains}"
115119
)
120+
retry_methods = ["connect"] + RETRY_METHODS
116121
self._original_methods = {
117-
method: getattr(self, method) for method in RETRY_METHODS
122+
method: getattr(self, method) for method in retry_methods
118123
}
119-
for method in RETRY_METHODS:
124+
for method in retry_methods:
120125
setattr(self, method, partial(self._retry, method))
121126

122127
def _retry(self, method, *args, **kwargs):
123128
try:
124129
method_ = self._original_methods[method]
125130
return method_(*args, **kwargs)
126-
except (MaxRetriesExceeded, ConnectionError, ConnectionRefusedError) as e:
131+
except (MaxRetriesExceeded, ConnectionError, EOFError, ConnectionClosed) as e:
127132
try:
128133
self._reinstantiate_substrate(e)
129-
method_ = getattr(self, method)
130-
return self._retry(method_(*args, **kwargs))
134+
method_ = self._original_methods[method]
135+
return method_(*args, **kwargs)
131136
except StopIteration:
132-
logging.error(
137+
logger.error(
133138
f"Max retries exceeded with {self.url}. No more fallback chains."
134139
)
135140
raise MaxRetriesExceeded
136141

137142
def _retry_property(self, property_):
138143
try:
139144
return getattr(self, property_)
140-
except (MaxRetriesExceeded, ConnectionError, ConnectionRefusedError) as e:
145+
except (MaxRetriesExceeded, ConnectionError, EOFError, ConnectionClosed) as e:
141146
try:
142147
self._reinstantiate_substrate(e)
143148
return self._retry_property(property_)
144149
except StopIteration:
145-
logging.error(
150+
logger.error(
146151
f"Max retries exceeded with {self.url}. No more fallback chains."
147152
)
148153
raise MaxRetriesExceeded
149154

150155
def _reinstantiate_substrate(self, e: Optional[Exception] = None) -> None:
151156
next_network = next(self.fallback_chains)
157+
self.ws.close()
152158
if e.__class__ == MaxRetriesExceeded:
153-
logging.error(
159+
logger.error(
154160
f"Max retries exceeded with {self.url}. Retrying with {next_network}."
155161
)
156162
else:
157-
print(f"Connection error. Trying again with {next_network}")
158-
super().__init__(
159-
url=next_network,
160-
ss58_format=self.ss58_format,
161-
type_registry=self.type_registry,
162-
use_remote_preset=self.use_remote_preset,
163-
chain_name=self.chain_name,
164-
_mock=self._mock,
165-
retry_timeout=self.retry_timeout,
166-
max_retries=self.max_retries,
167-
)
163+
logger.error(f"Connection error. Trying again with {next_network}")
164+
self.url = next_network
165+
self.chain_endpoint = next_network
166+
self.initialized = False
167+
self.ws = self.connect(init=True)
168+
if not self._mock:
169+
self.initialize()
168170

169171

170172
class RetryAsyncSubstrate(AsyncSubstrateInterface):
@@ -213,7 +215,7 @@ def __init__(
213215
def _reinstantiate_substrate(self, e: Optional[Exception] = None) -> None:
214216
next_network = next(self.fallback_chains)
215217
if e.__class__ == MaxRetriesExceeded:
216-
logging.error(
218+
logger.error(
217219
f"Max retries exceeded with {self.url}. Retrying with {next_network}."
218220
)
219221
else:
@@ -228,12 +230,22 @@ def _reinstantiate_substrate(self, e: Optional[Exception] = None) -> None:
228230
retry_timeout=self.retry_timeout,
229231
max_retries=self.max_retries,
230232
)
233+
self._original_methods = {
234+
method: getattr(self, method) for method in RETRY_METHODS
235+
}
236+
for method in RETRY_METHODS:
237+
setattr(self, method, partial(self._retry, method))
231238

232239
async def _retry(self, method, *args, **kwargs):
233240
try:
234241
method_ = self._original_methods[method]
235242
return await method_(*args, **kwargs)
236-
except (MaxRetriesExceeded, ConnectionError, ConnectionRefusedError) as e:
243+
except (
244+
MaxRetriesExceeded,
245+
ConnectionError,
246+
ConnectionRefusedError,
247+
EOFError,
248+
) as e:
237249
try:
238250
self._reinstantiate_substrate(e)
239251
await self.initialize()
@@ -243,7 +255,7 @@ async def _retry(self, method, *args, **kwargs):
243255
else:
244256
return method_(*args, **kwargs)
245257
except StopIteration:
246-
logging.error(
258+
logger.error(
247259
f"Max retries exceeded with {self.url}. No more fallback chains."
248260
)
249261
raise MaxRetriesExceeded
@@ -256,7 +268,7 @@ async def _retry_property(self, property_):
256268
self._reinstantiate_substrate(e)
257269
return await self._retry_property(property_)
258270
except StopIteration:
259-
logging.error(
271+
logger.error(
260272
f"Max retries exceeded with {self.url}. No more fallback chains."
261273
)
262274
raise MaxRetriesExceeded

async_substrate_interface/sync_substrate.py

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import functools
22
import logging
3+
import socket
34
from hashlib import blake2b
45
from typing import Optional, Union, Callable, Any
56

@@ -511,7 +512,6 @@ def __init__(
511512
"strict_scale_decode": True,
512513
}
513514
self.initialized = False
514-
self._forgettable_task = None
515515
self.ss58_format = ss58_format
516516
self.type_registry = type_registry
517517
self.type_registry_preset = type_registry_preset
@@ -587,13 +587,19 @@ def name(self):
587587

588588
def connect(self, init=False):
589589
if init is True:
590-
return connect(self.chain_endpoint, max_size=self.ws_max_size)
590+
try:
591+
return connect(self.chain_endpoint, max_size=self.ws_max_size)
592+
except (ConnectionError, socket.gaierror) as e:
593+
raise ConnectionError(e)
591594
else:
592595
if not self.ws.close_code:
593596
return self.ws
594597
else:
595-
self.ws = connect(self.chain_endpoint, max_size=self.ws_max_size)
596-
return self.ws
598+
try:
599+
self.ws = connect(self.chain_endpoint, max_size=self.ws_max_size)
600+
return self.ws
601+
except (ConnectionError, socket.gaierror) as e:
602+
raise ConnectionError(e)
597603

598604
def get_storage_item(
599605
self, module: str, storage_function: str, block_hash: str = None

0 commit comments

Comments
 (0)