Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add blocksapi blocks loader #122

Open
wants to merge 12 commits into
base: main
Choose a base branch
from
Open

Add blocksapi blocks loader #122

wants to merge 12 commits into from

Conversation

mrdimidium
Copy link

Adds a new way to get blocks from the Aurora API. Set source: "blocksapi" in the relay config to use this method.

@strokovok
Copy link
Member

Amazing work! I will have some comments though, already in process of review

Copy link
Member

@strokovok strokovok left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hm btw do you have lints? There are a couple of places where it doesn't compile :)

ToBlock uint64 `mapstructure:"toBlock"`
ForceReindex bool `mapstructure:"forceReindex"`

RetryCountOnFailure uint8 `mapstructure:"retryCountOnFailure"`
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nitpick: those two parameters are also refiner-specific

fromBlockUpdated = true
}
}
}

if config.FromBlock < fromBlock {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you broke the case when config.FromBlock < config.GenesisBlock
In this case config.FromBlock must become equal to config.GenesisBlock

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It will be checked in indexer.go:108 for both implementations

if (config.ToBlock > DefaultToBlock) && (config.ToBlock <= config.FromBlock) {
if fromBlockUpdated && (cfgFromBlockOrigin < config.ToBlock) {
if fromBlockUpdated && (config.FromBlock < config.ToBlock) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Aha, this condition becomes broken as well actually.
It should essentially track wether it's config that is wrong, or we just already progressed further.
So let's restore that cfgFromBlockOrigin variable, it's actually useful here

main.go Outdated
signal.Notify(sig, syscall.SIGINT, syscall.SIGTERM)
<-sig

// graceful interrupt net requests and io
cancel()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nitpick: better to defer it. That will also handle panics

@@ -0,0 +1,298 @@
package indexer

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Somehow git doesn't recognize this as a renamed file.
Just a suggestion on how to try achieving this next time (no worries now):

git mv indexer.go refiner.go
git commit ...

And only then modify and create another indexer.go

l *log.Logger
b broker.Broker
grpc *grpc.ClientConn
height uint64
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would name it nextHeight, a bit more descriptive

return
}

switch r := response.Response.(type) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There's also two other cases that have to be handled:

  • case *blocksapi.ReceiveBlocksResponse_Error_ - basically error on server side. Need to be logged and then you get back to reconnection loop
  • case *blocksapi.ReceiveBlocksResponse_Done_ - you have reached the StopTarget successfully


callCtx := metadata.NewOutgoingContext(ctx, md)

go i.run(callCtx)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Now on the reconnection loop.
It can happen that connection was interrupted, or server has returned error because of some temporary problem.
Currently this code only does single attempt and will basically hang in case of any error.
What needs to be done is a loop.
Each time connection error, EOF or receive error happens - run needs to be called again. So just make a loop around that

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Something like:

wg.Add(1)
go func() {
defer wg.Done()
for {
	if indexerCtx.Err() != nil {
		return
	}
	if err := run(indexerCtx); err == nil {
		// Maybe we finished completely (if !force-reindex && toblock != 0)
		// in that case just return
		// otherwise handle switch from reindex to normal endless indexing
	}
	// TODO: Log error

	// That function doesn't exist but I guess you understand what I mean
	if !sleepCtx(indexerCtx, time.Second/5) {
		return
	}
}
}()

callCtx := metadata.NewOutgoingContext(ctx, md)

go i.run(callCtx)
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And we should also repeat the logic from here:

// after re-indexing a range of blocks, continue with the latest block in db

Basically in case of ForceReindex - we should first do the reindex, and once reindex is done - we should start reading from whatever is the current state.

@@ -263,7 +263,7 @@ func TestConfiguration(t *testing.T) {
args[0].(*db.StoreHandler).Close()
},
call: func(args ...interface{}) (interface{}, error) {
i, err := New(args[0].(*db.StoreHandler))
i, err := NewIndexerRefiner(args[0].(*db.StoreHandler), nil)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not enough arguments in call to NewIndexerRefiner
(here and below)

// Register protobuf plugin for generate optimized marshall & unmarshal code
encoding.RegisterCodec(vtgrpc.Codec{})
}

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also please add some logging so the progress of indexing is visible.
I would simply start second goroutine that would every couple of seconds log the current height, speed, etc
Something like I did here: https://github.com/aurora-is-near/block-storage/blob/main/cmd/testconsumer/testconsumer.go#L213
Indexer and this logging goroutine can exchange state via atomic vars easily

@maharifu maharifu requested a review from strokovok December 2, 2024 11:16
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants