Skip to content
This repository was archived by the owner on Dec 12, 2022. It is now read-only.

Implement backoff retry and sequential fetch#31

Open
ginesdt wants to merge 2 commits intomasterfrom
retry-on-error
Open

Implement backoff retry and sequential fetch#31
ginesdt wants to merge 2 commits intomasterfrom
retry-on-error

Conversation

@ginesdt
Copy link
Copy Markdown
Contributor

@ginesdt ginesdt commented Nov 16, 2022

Some web3 providers force rate limits such that the amount of request you can do per second is capped. In that case, the provider sends an error if many requests are done in very short time.

This pull requests implements a retry mechanism with exponential backoff in case an error is received from the provider.

Additionally, a config parameter has been added to allow fetching the events sequentially to limit the amount of requests. That is, a request is not attempted until previous one is completed.

@ginesdt ginesdt requested review from kwladyka, madis and madvas November 16, 2022 12:39
@kwladyka
Copy link
Copy Markdown
Contributor

kwladyka commented Nov 18, 2022

Hey, re-try mechanism sounds good.

But I am not sure if we need replay-sequential and replay-parallel and other changes.

(async/pipeline-async 10 ch-final-logs chunk->logs' ch-chunks-to-process) - the 10 is the number which you can use to control max chunks processed at once.

Instead of code changes, try change 10 to for example 1. It can be passed as parameter.

Although re-try has it own value.

Let me know if it works for you.

@ginesdt
Copy link
Copy Markdown
Contributor Author

ginesdt commented Nov 21, 2022

Hey, re-try mechanism sounds good.

But I am not sure if we need replay-sequential and replay-parallel and other changes.

(async/pipeline-async 10 ch-final-logs chunk->logs' ch-chunks-to-process) - the 10 is the number which you can use to control max chunks processed at once.

Instead of code changes, try change 10 to for example 1. It can be passed as parameter.

Although re-try has it own value.

Let me know if it works for you.

Thanks for the review. That's what I first thought, just changing the value to 1 would just use 1 processor and everything would be sequential. However this wasn't the case. Contrary to the logic, changing the value to 1 would still run 3 processes in parallel, so I end up implementing the additional -sequential functions you see.

@kwladyka
Copy link
Copy Markdown
Contributor

kwladyka commented Nov 21, 2022

There is caveat of this even with 1. Considering output buffer is (chan 1) it processing first item and put it into output chan, there is also "buffer" chan which is always 1. And processing at least once 1 which will want <!. So summary you have 3 items (chunks) in queue to chan. But everything after this will be run after output channel will have place in buffer again.

So there is a flow:
(async/pipeline-async N ch-final-logs chunk->logs' ch-chunks-to-process)
N processing at once -> buffer chan 1 -> output chan

So:
(async/pipeline-async 1 ch-final-logs chunk->logs' ch-chunks-to-process)
with
(ch-chunks-to-process 1)
will give
1 -> 1 -> 1 which is 3 chunks buffered minimum

To summary this up:

  1. even if output channel is (chan 1) and (async/pipeline-async 1 ch-final-logs chunk->logs' ch-chunks-to-process), then minimum 3 chunks will be buffered (queued).
  2. (async/pipeline-async 1 ch-final-logs chunk->logs' ch-chunks-to-process) still mean it is done "sequentially" so only 1 chunk processing at the same time.

The number 3 is about buffer (queue), not parallel or concurrency processing.

At least this is how it works in Java world.

@kwladyka
Copy link
Copy Markdown
Contributor

This can help understanding how it works under the hood:

(require 'clojure.core.async :as async)

(let [output (async/chan 1)]
    (async/pipeline-blocking 3 output (map (fn [x] (println "processing:" x) (Thread/sleep 3000) x)) (async/to-chan!! (range 10)))
    (Thread/sleep 5000)
    (loop []
      (when-let [x (async/<!! output)]
        (println "loop x:" x)
        (Thread/sleep 500)
        (recur))))
  • this is Clojure code on JVM

Copy link
Copy Markdown
Contributor

@kwladyka kwladyka left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Feel free to use your version or mine https://github.com/district0x/district-server-smart-contracts/pull/34/files

Notice I also changed

if (:error? (meta log))
                              (callback log nil)

which was a bug.

Please fix it also in your version if you decide to merge yours.

Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants