diff --git a/ethers/providers/jsonrpc/subscriptions.nim b/ethers/providers/jsonrpc/subscriptions.nim index a3f0810..ba1756b 100644 --- a/ethers/providers/jsonrpc/subscriptions.nim +++ b/ethers/providers/jsonrpc/subscriptions.nim @@ -1,5 +1,6 @@ import std/tables import std/sequtils +import std/strutils import pkg/chronos import pkg/json_rpc/rpcclient import ../../basics @@ -132,16 +133,31 @@ type PollingSubscriptions = ref object of JsonRpcSubscriptions polling: Future[void] + # We need to keep around the filters that are used to create log filters on the RPC node + # as there might be a time when they need to be recreated as RPC node might prune/forget + # about them + filters: Table[JsonNode, EventFilter] + + # Used when filters are recreated to translate from the id that user + # originally got returned to new filter id + subscriptionMapping: Table[JsonNode, JsonNode] + proc new*(_: type JsonRpcSubscriptions, client: RpcHttpClient, pollingInterval = 4.seconds): JsonRpcSubscriptions = let subscriptions = PollingSubscriptions(client: client) - proc getChanges(id: JsonNode): Future[JsonNode] {.async.} = + proc getChanges(originalId: JsonNode): Future[JsonNode] {.async.} = try: - return await subscriptions.client.eth_getFilterChanges(id) - except CatchableError: + let mappedId = subscriptions.subscriptionMapping[originalId] + return await subscriptions.client.eth_getFilterChanges(mappedId) + except CatchableError as e: + if "filter not found" in e.msg: + let filter = subscriptions.filters[originalId] + let newId = await subscriptions.client.eth_newFilter(filter) + subscriptions.subscriptionMapping[originalId] = newId + return newJArray() proc poll(id: JsonNode) {.async.} = @@ -180,6 +196,7 @@ method subscribeBlocks(subscriptions: PollingSubscriptions, let id = await subscriptions.client.eth_newBlockFilter() subscriptions.callbacks[id] = callback + subscriptions.subscriptionMapping[id] = id return id method subscribeLogs(subscriptions: PollingSubscriptions, @@ -194,10 +211,14 @@ method subscribeLogs(subscriptions: PollingSubscriptions, let id = await subscriptions.client.eth_newFilter(filter) subscriptions.callbacks[id] = callback + subscriptions.filters[id] = filter + subscriptions.subscriptionMapping[id] = id return id method unsubscribe*(subscriptions: PollingSubscriptions, id: JsonNode) {.async.} = + subscriptions.filters.del(id) subscriptions.callbacks.del(id) - discard await subscriptions.client.eth_uninstallFilter(id) + discard await subscriptions.client.eth_uninstallFilter(subscriptions.subscriptionMapping[id]) + subscriptions.subscriptionMapping.del(id) diff --git a/testmodule/providers/jsonrpc/rpc_mock.nim b/testmodule/providers/jsonrpc/rpc_mock.nim new file mode 100644 index 0000000..57cf6a8 --- /dev/null +++ b/testmodule/providers/jsonrpc/rpc_mock.nim @@ -0,0 +1,50 @@ +import ../../examples +import ../../../ethers/provider +import ../../../ethers/providers/jsonrpc/conversions + +import std/tables +import pkg/stew/byteutils +import pkg/json_rpc/rpcserver except `%`, `%*` +import pkg/json_rpc/errors + + +type MockRpcHttpServer* = ref object + filters*: Table[string, bool] + newFilterCounter*: int + srv: RpcHttpServer + +proc new*(_: type MockRpcHttpServer): MockRpcHttpServer = + MockRpcHttpServer(filters: initTable[string, bool](), newFilterCounter: 0, srv: newRpcHttpServer(["127.0.0.1:0"])) + +proc invalidateFilter*(server: MockRpcHttpServer, id: string) = + server.filters[id] = false + +proc start*(server: MockRpcHttpServer) = + server.srv.router.rpc("eth_newFilter") do(filter: EventFilter) -> string: + let filterId = "0x" & (array[16, byte].example).toHex + server.filters[filterId] = true + server.newFilterCounter += 1 + return filterId + + server.srv.router.rpc("eth_getFilterChanges") do(id: string) -> seq[string]: + if(not hasKey(server.filters, id) or not server.filters[id]): + raise (ref ApplicationError)(code: -32000, msg: "filter not found") + + return @[] + + server.srv.router.rpc("eth_uninstallFilter") do(id: string) -> bool: + if(not hasKey(server.filters, id)): + raise (ref ApplicationError)(code: -32000, msg: "filter not found") + + del(server.filters, id) + return true + + server.srv.start() + +proc stop*(server: MockRpcHttpServer) {.async.} = + await server.srv.stop() + await server.srv.closeWait() + + +proc localAddress*(server: MockRpcHttpServer): seq[TransportAddress] = + return server.srv.localAddress() diff --git a/testmodule/providers/jsonrpc/testJsonRpcSubscriptions.nim b/testmodule/providers/jsonrpc/testJsonRpcSubscriptions.nim index a402774..003490d 100644 --- a/testmodule/providers/jsonrpc/testJsonRpcSubscriptions.nim +++ b/testmodule/providers/jsonrpc/testJsonRpcSubscriptions.nim @@ -1,9 +1,14 @@ import std/json import pkg/asynctest +import pkg/serde import pkg/json_rpc/rpcclient +import pkg/json_rpc/rpcserver import ethers/provider import ethers/providers/jsonrpc/subscriptions +import ../../examples +import ./rpc_mock + suite "JsonRpcSubscriptions": test "can be instantiated with an http client": @@ -89,3 +94,55 @@ suite "HTTP polling subscriptions": await client.close() subscriptionTests(subscriptions, client) + +suite "HTTP polling subscriptions - filter not found": + + var subscriptions: JsonRpcSubscriptions + var client: RpcHttpClient + var mockServer: MockRpcHttpServer + + setup: + mockServer = MockRpcHttpServer.new() + mockServer.start() + + client = newRpcHttpClient() + await client.connect("http://" & $mockServer.localAddress()[0]) + + subscriptions = JsonRpcSubscriptions.new(client, + pollingInterval = 100.millis) + subscriptions.start() + + teardown: + await subscriptions.close() + await client.close() + await mockServer.stop() + + test "filter not found error recreates filter": + let filter = EventFilter(address: Address.example, topics: @[array[32, byte].example]) + let emptyHandler = proc(log: Log) = discard + + check mockServer.newFilterCounter == 0 + let jsonId = await subscriptions.subscribeLogs(filter, emptyHandler) + let id = string.fromJson(jsonId).tryGet + check mockServer.newFilterCounter == 1 + + await sleepAsync(300.millis) + mockServer.invalidateFilter(id) + await sleepAsync(300.millis) + check mockServer.newFilterCounter == 2 + + test "recreated filter can be still unsubscribed using the original id": + let filter = EventFilter(address: Address.example, topics: @[array[32, byte].example]) + let emptyHandler = proc(log: Log) = discard + + check mockServer.newFilterCounter == 0 + let jsonId = await subscriptions.subscribeLogs(filter, emptyHandler) + let id = string.fromJson(jsonId).tryGet + check mockServer.newFilterCounter == 1 + + await sleepAsync(300.millis) + mockServer.invalidateFilter(id) + await sleepAsync(300.millis) + check mockServer.newFilterCounter == 2 + await subscriptions.unsubscribe(jsonId) +