Skip to content

Commit 507ac6a

Browse files
authored
fix(subscriptions): filter not found recreates polling filter (#78)
1 parent 53e596e commit 507ac6a

File tree

4 files changed

+141
-5
lines changed

4 files changed

+141
-5
lines changed

ethers.nimble

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,6 @@ requires "stew"
1515
requires "eth#c482b4c5b658a77cc96b49d4a397aa6d98472ac7"
1616

1717
task test, "Run the test suite":
18-
exec "nimble install -d -y"
18+
# exec "nimble install -d -y"
1919
withDir "testmodule":
2020
exec "nimble test"

ethers/providers/jsonrpc/subscriptions.nim

Lines changed: 25 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import std/tables
22
import std/sequtils
3+
import std/strutils
34
import pkg/chronos
45
import pkg/json_rpc/rpcclient
56
import ../../basics
@@ -132,16 +133,31 @@ type
132133
PollingSubscriptions = ref object of JsonRpcSubscriptions
133134
polling: Future[void]
134135

136+
# We need to keep around the filters that are used to create log filters on the RPC node
137+
# as there might be a time when they need to be recreated as RPC node might prune/forget
138+
# about them
139+
filters: Table[JsonNode, EventFilter]
140+
141+
# Used when filters are recreated to translate from the id that user
142+
# originally got returned to new filter id
143+
subscriptionMapping: Table[JsonNode, JsonNode]
144+
135145
proc new*(_: type JsonRpcSubscriptions,
136146
client: RpcHttpClient,
137147
pollingInterval = 4.seconds): JsonRpcSubscriptions =
138148

139149
let subscriptions = PollingSubscriptions(client: client)
140150

141-
proc getChanges(id: JsonNode): Future[JsonNode] {.async.} =
151+
proc getChanges(originalId: JsonNode): Future[JsonNode] {.async.} =
142152
try:
143-
return await subscriptions.client.eth_getFilterChanges(id)
144-
except CatchableError:
153+
let mappedId = subscriptions.subscriptionMapping[originalId]
154+
return await subscriptions.client.eth_getFilterChanges(mappedId)
155+
except CatchableError as e:
156+
if "filter not found" in e.msg:
157+
let filter = subscriptions.filters[originalId]
158+
let newId = await subscriptions.client.eth_newFilter(filter)
159+
subscriptions.subscriptionMapping[originalId] = newId
160+
145161
return newJArray()
146162

147163
proc poll(id: JsonNode) {.async.} =
@@ -180,6 +196,7 @@ method subscribeBlocks(subscriptions: PollingSubscriptions,
180196

181197
let id = await subscriptions.client.eth_newBlockFilter()
182198
subscriptions.callbacks[id] = callback
199+
subscriptions.subscriptionMapping[id] = id
183200
return id
184201

185202
method subscribeLogs(subscriptions: PollingSubscriptions,
@@ -194,10 +211,14 @@ method subscribeLogs(subscriptions: PollingSubscriptions,
194211

195212
let id = await subscriptions.client.eth_newFilter(filter)
196213
subscriptions.callbacks[id] = callback
214+
subscriptions.filters[id] = filter
215+
subscriptions.subscriptionMapping[id] = id
197216
return id
198217

199218
method unsubscribe*(subscriptions: PollingSubscriptions,
200219
id: JsonNode)
201220
{.async.} =
221+
discard await subscriptions.client.eth_uninstallFilter(subscriptions.subscriptionMapping[id])
222+
subscriptions.filters.del(id)
202223
subscriptions.callbacks.del(id)
203-
discard await subscriptions.client.eth_uninstallFilter(id)
224+
subscriptions.subscriptionMapping.del(id)
Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
import ../../examples
2+
import ../../../ethers/provider
3+
import ../../../ethers/providers/jsonrpc/conversions
4+
5+
import std/tables
6+
import pkg/stew/byteutils
7+
import pkg/json_rpc/rpcserver except `%`, `%*`
8+
import pkg/json_rpc/errors
9+
10+
11+
type MockRpcHttpServer* = ref object
12+
filters*: Table[string, bool]
13+
newFilterCounter*: int
14+
srv: RpcHttpServer
15+
16+
proc new*(_: type MockRpcHttpServer): MockRpcHttpServer =
17+
MockRpcHttpServer(filters: initTable[string, bool](), newFilterCounter: 0, srv: newRpcHttpServer(["127.0.0.1:0"]))
18+
19+
proc invalidateFilter*(server: MockRpcHttpServer, id: string) =
20+
server.filters[id] = false
21+
22+
proc start*(server: MockRpcHttpServer) =
23+
server.srv.router.rpc("eth_newFilter") do(filter: EventFilter) -> string:
24+
let filterId = "0x" & (array[16, byte].example).toHex
25+
server.filters[filterId] = true
26+
server.newFilterCounter += 1
27+
return filterId
28+
29+
server.srv.router.rpc("eth_getFilterChanges") do(id: string) -> seq[string]:
30+
if(not hasKey(server.filters, id) or not server.filters[id]):
31+
raise (ref ApplicationError)(code: -32000, msg: "filter not found")
32+
33+
return @[]
34+
35+
server.srv.router.rpc("eth_uninstallFilter") do(id: string) -> bool:
36+
if(not hasKey(server.filters, id)):
37+
raise (ref ApplicationError)(code: -32000, msg: "filter not found")
38+
39+
server.filters.del(id)
40+
return true
41+
42+
server.srv.start()
43+
44+
proc stop*(server: MockRpcHttpServer) {.async.} =
45+
await server.srv.stop()
46+
await server.srv.closeWait()
47+
48+
49+
proc localAddress*(server: MockRpcHttpServer): seq[TransportAddress] =
50+
return server.srv.localAddress()

testmodule/providers/jsonrpc/testJsonRpcSubscriptions.nim

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,15 @@
11
import std/json
2+
import std/sequtils
23
import pkg/asynctest
4+
import pkg/serde
35
import pkg/json_rpc/rpcclient
6+
import pkg/json_rpc/rpcserver
47
import ethers/provider
58
import ethers/providers/jsonrpc/subscriptions
69

10+
import ../../examples
11+
import ./rpc_mock
12+
713
suite "JsonRpcSubscriptions":
814

915
test "can be instantiated with an http client":
@@ -89,3 +95,62 @@ suite "HTTP polling subscriptions":
8995
await client.close()
9096

9197
subscriptionTests(subscriptions, client)
98+
99+
suite "HTTP polling subscriptions - filter not found":
100+
101+
var subscriptions: JsonRpcSubscriptions
102+
var client: RpcHttpClient
103+
var mockServer: MockRpcHttpServer
104+
105+
setup:
106+
mockServer = MockRpcHttpServer.new()
107+
mockServer.start()
108+
109+
client = newRpcHttpClient()
110+
await client.connect("http://" & $mockServer.localAddress()[0])
111+
112+
subscriptions = JsonRpcSubscriptions.new(client,
113+
pollingInterval = 15.millis)
114+
subscriptions.start()
115+
116+
teardown:
117+
await subscriptions.close()
118+
await client.close()
119+
await mockServer.stop()
120+
121+
test "filter not found error recreates filter":
122+
let filter = EventFilter(address: Address.example, topics: @[array[32, byte].example])
123+
let emptyHandler = proc(log: Log) = discard
124+
125+
check mockServer.newFilterCounter == 0
126+
let jsonId = await subscriptions.subscribeLogs(filter, emptyHandler)
127+
let id = string.fromJson(jsonId).tryGet
128+
check mockServer.newFilterCounter == 1
129+
130+
await sleepAsync(50.millis)
131+
mockServer.invalidateFilter(id)
132+
await sleepAsync(50.millis)
133+
check mockServer.newFilterCounter == 2
134+
135+
test "recreated filter can be still unsubscribed using the original id":
136+
let filter = EventFilter(address: Address.example, topics: @[array[32, byte].example])
137+
let emptyHandler = proc(log: Log) = discard
138+
139+
check mockServer.newFilterCounter == 0
140+
let jsonId = await subscriptions.subscribeLogs(filter, emptyHandler)
141+
let id = string.fromJson(jsonId).tryGet
142+
check mockServer.newFilterCounter == 1
143+
144+
await sleepAsync(50.millis)
145+
mockServer.invalidateFilter(id)
146+
check eventually mockServer.newFilterCounter == 2
147+
check mockServer.filters[id] == false
148+
check mockServer.filters.len() == 2
149+
await subscriptions.unsubscribe(jsonId)
150+
check mockServer.filters.len() == 1
151+
152+
# invalidateFilter sets the filter's value to false which will return the "filter not found"
153+
# unsubscribing will actually delete the key from filters table
154+
# hence after unsubscribing the only key left in the table should be the original id
155+
for key in mockServer.filters.keys():
156+
check key == id

0 commit comments

Comments
 (0)