@@ -2,8 +2,10 @@ import std/tables
2
2
import std/ sequtils
3
3
import std/ strutils
4
4
import pkg/ chronos
5
+ import pkg/ questionable
5
6
import pkg/ json_rpc/ rpcclient
6
7
import ../../ basics
8
+ import ../../ errors
7
9
import ../../ provider
8
10
include ../../ nimshims/ hashes
9
11
import ./ rpccalls
16
18
callbacks: Table [JsonNode , SubscriptionCallback ]
17
19
methodHandlers: Table [string , MethodHandler ]
18
20
MethodHandler * = proc (j: JsonNode ) {.gcsafe , raises : [].}
19
- SubscriptionCallback = proc (id, arguments: JsonNode ) {.gcsafe , raises :[].}
20
- SubscriptionError * = object of EthersError
21
+ SubscriptionCallback = proc (id, arguments: SubscriptionResult [JsonNode ]) {.gcsafe , raises :[CancelledError ].}
21
22
22
23
{.push raises :[].}
23
24
@@ -53,7 +54,7 @@ proc setMethodHandler(
53
54
method subscribeBlocks * (subscriptions: JsonRpcSubscriptions ,
54
55
onBlock: BlockHandler ):
55
56
Future [JsonNode ]
56
- {.async , base .} =
57
+ {.async , base , raises : [ CancelledError ] .} =
57
58
raiseAssert " not implemented"
58
59
59
60
method subscribeLogs * (subscriptions: JsonRpcSubscriptions ,
@@ -74,14 +75,16 @@ method close*(subscriptions: JsonRpcSubscriptions) {.async, base.} =
74
75
await subscriptions.unsubscribe (id)
75
76
76
77
proc getCallback (subscriptions: JsonRpcSubscriptions ,
77
- id: JsonNode ): ? SubscriptionCallback =
78
+ id: JsonNode ): ? SubscriptionCallback {. raises :[].} =
78
79
try :
79
80
if not id.isNil and id in subscriptions.callbacks:
80
- subscriptions.callbacks[id].some
81
+ try :
82
+ return subscriptions.callbacks[id].some
83
+ except : discard
81
84
else :
82
- SubscriptionCallback .none
85
+ return SubscriptionCallback .none
83
86
except KeyError :
84
- SubscriptionCallback .none
87
+ return SubscriptionCallback .none
85
88
86
89
# Web sockets
87
90
@@ -95,17 +98,22 @@ proc new*(_: type JsonRpcSubscriptions,
95
98
proc subscriptionHandler (arguments: JsonNode ) {.raises :[].} =
96
99
let id = arguments{" subscription" } or newJString (" " )
97
100
if callback =? subscriptions.getCallback (id):
98
- callback (id, arguments)
101
+ callback (id, success ( arguments) )
99
102
subscriptions.setMethodHandler (" eth_subscription" , subscriptionHandler)
100
103
subscriptions
101
104
102
105
method subscribeBlocks (subscriptions: WebSocketSubscriptions ,
103
106
onBlock: BlockHandler ):
104
107
Future [JsonNode ]
105
- {.async .} =
106
- proc callback (id, arguments: JsonNode ) {.raises : [].} =
108
+ {.async , raises : [].} =
109
+ proc callback (id, argumentsResult: SubscriptionResult [JsonNode ]) {.raises : [].} =
110
+ if argumentsResult.isErr:
111
+ onBlock (SubscriptionResult [Block ].err (argumentsResult.error))
112
+ return
113
+ let arguments = argumentsResult.value
107
114
if blck =? Block .fromJson (arguments{" result" }):
108
- onBlock (blck)
115
+ onBlock (SubscriptionResult [Block ].ok (blck))
116
+
109
117
let id = await subscriptions.client.eth_subscribe (" newHeads" )
110
118
subscriptions.callbacks[id] = callback
111
119
return id
@@ -115,9 +123,15 @@ method subscribeLogs(subscriptions: WebSocketSubscriptions,
115
123
onLog: LogHandler ):
116
124
Future [JsonNode ]
117
125
{.async .} =
118
- proc callback (id, arguments: JsonNode ) =
126
+ proc callback (id, argumentsResult: SubscriptionResult [JsonNode ]) =
127
+ if argumentsResult.isErr:
128
+ onLog (SubscriptionResult [Log ].err (argumentsResult.error))
129
+ return
130
+
131
+ let arguments = argumentsResult.value
119
132
if log =? Log .fromJson (arguments{" result" }):
120
- onLog (log)
133
+ onLog (SubscriptionResult [Log ].ok (log))
134
+
121
135
let id = await subscriptions.client.eth_subscribe (" logs" , filter)
122
136
subscriptions.callbacks[id] = callback
123
137
return id
@@ -149,7 +163,7 @@ proc new*(_: type JsonRpcSubscriptions,
149
163
150
164
let subscriptions = PollingSubscriptions (client: client)
151
165
152
- proc getChanges (originalId: JsonNode ): Future [JsonNode ] {.async .} =
166
+ proc getChanges (originalId: JsonNode ): Future [JsonNode ] {.async , raises :[ CancelledError , SubscriptionError ] .} =
153
167
try :
154
168
let mappedId = subscriptions.subscriptionMapping[originalId]
155
169
let changes = await subscriptions.client.eth_getFilterChanges (mappedId)
@@ -175,12 +189,19 @@ proc new*(_: type JsonRpcSubscriptions,
175
189
subscriptions.subscriptionMapping[originalId] = newId
176
190
return await getChanges (originalId)
177
191
else :
178
- raise e
192
+ raise newException ( SubscriptionError , " HTTP polling: There was an exception while getting subscription changes: " & e.msg, e)
179
193
180
- proc poll (id: JsonNode ) {.async .} =
181
- for change in await getChanges (id):
182
- if callback =? subscriptions.getCallback (id):
183
- callback (id, change)
194
+ proc poll (id: JsonNode ) {.async , raises : [CancelledError , SubscriptionError ].} =
195
+ without callback =? subscriptions.getCallback (id):
196
+ return
197
+
198
+ try :
199
+ for change in await getChanges (id):
200
+ callback (id, success (change))
201
+ except CancelledError as e:
202
+ raise e
203
+ except CatchableError as e:
204
+ callback (id, failure (e))
184
205
185
206
proc poll {.async .} =
186
207
untilCancelled:
@@ -198,16 +219,24 @@ method close*(subscriptions: PollingSubscriptions) {.async.} =
198
219
method subscribeBlocks (subscriptions: PollingSubscriptions ,
199
220
onBlock: BlockHandler ):
200
221
Future [JsonNode ]
201
- {.async .} =
222
+ {.async , raises :[ CancelledError ] .} =
202
223
203
- proc getBlock (hash: BlockHash ) {.async .} =
224
+ proc getBlock (hash: BlockHash ) {.async , raises :[ CancelledError ] .} =
204
225
try :
205
226
if blck =? (await subscriptions.client.eth_getBlockByHash (hash, false )):
206
- onBlock (blck)
207
- except CatchableError :
208
- discard
227
+ onBlock (SubscriptionResult [Block ].ok (blck))
228
+ except CancelledError as e:
229
+ raise e
230
+ except CatchableError as e:
231
+ let wrappedErr = newException (SubscriptionError , " HTTP polling: There was an exception while getting subscription's block: " & e.msg, e)
232
+ onBlock (SubscriptionResult [Block ].err (wrappedErr))
233
+
234
+ proc callback (id, changeResult: SubscriptionResult [JsonNode ]) {.raises :[CancelledError ].} =
235
+ if changeResult.isErr:
236
+ onBlock (SubscriptionResult [Block ].err (changeResult.error))
237
+ return
209
238
210
- proc callback (id, change: JsonNode ) =
239
+ let change = changeResult.value
211
240
if hash =? BlockHash .fromJson (change):
212
241
asyncSpawn getBlock (hash)
213
242
@@ -222,9 +251,14 @@ method subscribeLogs(subscriptions: PollingSubscriptions,
222
251
Future [JsonNode ]
223
252
{.async .} =
224
253
225
- proc callback (id, change: JsonNode ) =
254
+ proc callback (id, changeResult: SubscriptionResult [JsonNode ]) =
255
+ if changeResult.isErr:
256
+ onLog (SubscriptionResult [Log ].err (changeResult.error))
257
+ return
258
+
259
+ let change = changeResult.value
226
260
if log =? Log .fromJson (change):
227
- onLog (log)
261
+ onLog (SubscriptionResult [ Log ]. ok ( log) )
228
262
229
263
let id = await subscriptions.client.eth_newFilter (filter)
230
264
subscriptions.callbacks[id] = callback
0 commit comments