Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(relayer): Add flag to be able to index past blocks to crawl for missed messages #15547

Merged
merged 6 commits into from
Jan 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions packages/relayer/cmd/flags/indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,13 @@ var (
Category: indexerCategory,
EnvVars: []string{"SRC_TAIKO_ADDRESS"},
}
NumLatestBlocksToIgnoreWhenCrawling = &cli.Uint64Flag{
Name: "numLatestBlocksToIgnoreWhenCrawling",
Usage: "Number of blocks to ingore when crawling chain, should be higher for L2-L1 indexing due to delay",
Value: 1000,
Category: indexerCategory,
EnvVars: []string{"NUM_LATEST_BLOCKS_TO_IGNORE_WHEN_CRAWLING"},
}
)

var IndexerFlags = MergeFlags(CommonFlags, QueueFlags, []cli.Flag{
Expand All @@ -80,4 +87,5 @@ var IndexerFlags = MergeFlags(CommonFlags, QueueFlags, []cli.Flag{
SyncMode,
WatchMode,
DestBridgeAddress,
NumLatestBlocksToIgnoreWhenCrawling,
})
66 changes: 34 additions & 32 deletions packages/relayer/indexer/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,43 +31,45 @@ type Config struct {
QueueHost string
QueuePort uint64
// rpc configs
SrcRPCUrl string
DestRPCUrl string
ETHClientTimeout uint64
BlockBatchSize uint64
NumGoroutines uint64
SubscriptionBackoff uint64
SyncMode SyncMode
WatchMode WatchMode
OpenQueueFunc func() (queue.Queue, error)
OpenDBFunc func() (DB, error)
SrcRPCUrl string
DestRPCUrl string
ETHClientTimeout uint64
BlockBatchSize uint64
NumGoroutines uint64
SubscriptionBackoff uint64
SyncMode SyncMode
WatchMode WatchMode
NumLatestBlocksToIgnoreWhenCrawling uint64
OpenQueueFunc func() (queue.Queue, error)
OpenDBFunc func() (DB, error)
}

// NewConfigFromCliContext creates a new config instance from command line flags.
func NewConfigFromCliContext(c *cli.Context) (*Config, error) {
return &Config{
SrcBridgeAddress: common.HexToAddress(c.String(flags.SrcBridgeAddress.Name)),
SrcTaikoAddress: common.HexToAddress(c.String(flags.SrcTaikoAddress.Name)),
DestBridgeAddress: common.HexToAddress(c.String(flags.DestBridgeAddress.Name)),
DatabaseUsername: c.String(flags.DatabaseUsername.Name),
DatabasePassword: c.String(flags.DatabasePassword.Name),
DatabaseName: c.String(flags.DatabaseName.Name),
DatabaseHost: c.String(flags.DatabaseHost.Name),
DatabaseMaxIdleConns: c.Uint64(flags.DatabaseMaxIdleConns.Name),
DatabaseMaxOpenConns: c.Uint64(flags.DatabaseMaxOpenConns.Name),
DatabaseMaxConnLifetime: c.Uint64(flags.DatabaseConnMaxLifetime.Name),
QueueUsername: c.String(flags.QueueUsername.Name),
QueuePassword: c.String(flags.QueuePassword.Name),
QueuePort: c.Uint64(flags.QueuePort.Name),
QueueHost: c.String(flags.QueueHost.Name),
SrcRPCUrl: c.String(flags.SrcRPCUrl.Name),
DestRPCUrl: c.String(flags.DestRPCUrl.Name),
BlockBatchSize: c.Uint64(flags.BlockBatchSize.Name),
NumGoroutines: c.Uint64(flags.MaxNumGoroutines.Name),
SubscriptionBackoff: c.Uint64(flags.SubscriptionBackoff.Name),
WatchMode: WatchMode(c.String(flags.WatchMode.Name)),
SyncMode: SyncMode(c.String(flags.SyncMode.Name)),
ETHClientTimeout: c.Uint64(flags.ETHClientTimeout.Name),
SrcBridgeAddress: common.HexToAddress(c.String(flags.SrcBridgeAddress.Name)),
SrcTaikoAddress: common.HexToAddress(c.String(flags.SrcTaikoAddress.Name)),
DestBridgeAddress: common.HexToAddress(c.String(flags.DestBridgeAddress.Name)),
DatabaseUsername: c.String(flags.DatabaseUsername.Name),
DatabasePassword: c.String(flags.DatabasePassword.Name),
DatabaseName: c.String(flags.DatabaseName.Name),
DatabaseHost: c.String(flags.DatabaseHost.Name),
DatabaseMaxIdleConns: c.Uint64(flags.DatabaseMaxIdleConns.Name),
DatabaseMaxOpenConns: c.Uint64(flags.DatabaseMaxOpenConns.Name),
DatabaseMaxConnLifetime: c.Uint64(flags.DatabaseConnMaxLifetime.Name),
QueueUsername: c.String(flags.QueueUsername.Name),
QueuePassword: c.String(flags.QueuePassword.Name),
QueuePort: c.Uint64(flags.QueuePort.Name),
QueueHost: c.String(flags.QueueHost.Name),
SrcRPCUrl: c.String(flags.SrcRPCUrl.Name),
DestRPCUrl: c.String(flags.DestRPCUrl.Name),
BlockBatchSize: c.Uint64(flags.BlockBatchSize.Name),
NumGoroutines: c.Uint64(flags.MaxNumGoroutines.Name),
SubscriptionBackoff: c.Uint64(flags.SubscriptionBackoff.Name),
WatchMode: WatchMode(c.String(flags.WatchMode.Name)),
SyncMode: SyncMode(c.String(flags.SyncMode.Name)),
ETHClientTimeout: c.Uint64(flags.ETHClientTimeout.Name),
NumLatestBlocksToIgnoreWhenCrawling: c.Uint64(flags.NumLatestBlocksToIgnoreWhenCrawling.Name),
OpenDBFunc: func() (DB, error) {
return db.OpenDBConnection(db.DBConnectionOpts{
Name: c.String(flags.DatabaseUsername.Name),
Expand Down
6 changes: 6 additions & 0 deletions packages/relayer/indexer/detect_and_handle_reorg.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,12 @@ import (
)

func (i *Indexer) detectAndHandleReorg(ctx context.Context, eventType string, msgHash string) error {
// dont check on crawling past blocks, it will be a secondary indexer.
// we expect to see duplicates in this mode.
if i.watchMode == CrawlPastBlocks {
return nil
}

e, err := i.eventRepo.FirstByEventAndMsgHash(ctx, eventType, msgHash)
if err != nil {
return errors.Wrap(err, "svc.eventRepo.FirstByMsgHash")
Expand Down
62 changes: 42 additions & 20 deletions packages/relayer/indexer/handle_event.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,11 @@ func (i *Indexer) handleEvent(
return errors.Wrap(err, "svc.eventStatusFromMsgHash")
}

if i.watchMode == CrawlPastBlocks && eventStatus != relayer.EventStatusNew {
// we can return early, this message has been processed as expected.
return nil
}

marshaled, err := json.Marshal(event)
if err != nil {
return errors.Wrap(err, "json.Marshal(event)")
Expand All @@ -58,32 +63,49 @@ func (i *Indexer) handleEvent(
return errors.Wrap(err, "eventTypeAmountAndCanonicalTokenFromEvent(event)")
}

opts := relayer.SaveEventOpts{
Name: relayer.EventNameMessageSent,
Data: string(marshaled),
ChainID: chainID,
Status: eventStatus,
EventType: eventType,
Amount: amount.String(),
MsgHash: common.Hash(event.MsgHash).Hex(),
MessageOwner: event.Message.Owner.Hex(),
Event: relayer.EventNameMessageSent,
existingEvent, err := i.eventRepo.FirstByEventAndMsgHash(
ctx,
relayer.EventNameMessageSent,
common.Hash(event.MsgHash).Hex(),
)
if err != nil {
return errors.Wrap(err, "i.eventRepo.FirstByEventAndMsgHash")
}

if canonicalToken != nil {
opts.CanonicalTokenAddress = canonicalToken.Address().Hex()
opts.CanonicalTokenSymbol = canonicalToken.ContractSymbol()
opts.CanonicalTokenName = canonicalToken.ContractName()
opts.CanonicalTokenDecimals = canonicalToken.TokenDecimals()
}
var id int

if existingEvent == nil {
opts := relayer.SaveEventOpts{
Name: relayer.EventNameMessageSent,
Data: string(marshaled),
ChainID: chainID,
Status: eventStatus,
EventType: eventType,
Amount: amount.String(),
MsgHash: common.Hash(event.MsgHash).Hex(),
MessageOwner: event.Message.Owner.Hex(),
Event: relayer.EventNameMessageSent,
}

e, err := i.eventRepo.Save(ctx, opts)
if err != nil {
return errors.Wrap(err, "svc.eventRepo.Save")
if canonicalToken != nil {
opts.CanonicalTokenAddress = canonicalToken.Address().Hex()
opts.CanonicalTokenSymbol = canonicalToken.ContractSymbol()
opts.CanonicalTokenName = canonicalToken.ContractName()
opts.CanonicalTokenDecimals = canonicalToken.TokenDecimals()
}

e, err := i.eventRepo.Save(ctx, opts)
if err != nil {
return errors.Wrap(err, "svc.eventRepo.Save")
}

id = e.ID
} else {
id = existingEvent.ID
}

msg := queue.QueueMessageBody{
ID: e.ID,
ID: id,
Event: event,
}

Expand Down
80 changes: 62 additions & 18 deletions packages/relayer/indexer/indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,8 @@ var (
Filter WatchMode = "filter"
Subscribe WatchMode = "subscribe"
FilterAndSubscribe WatchMode = "filter-and-subscribe"
WatchModes = []WatchMode{Filter, Subscribe, FilterAndSubscribe}
CrawlPastBlocks WatchMode = "crawl-past-blocks"
WatchModes = []WatchMode{Filter, Subscribe, FilterAndSubscribe, CrawlPastBlocks}
)

type SyncMode string
Expand Down Expand Up @@ -90,6 +91,8 @@ type Indexer struct {

wg *sync.WaitGroup

numLatestBlocksToIgnoreWhenCrawling uint64

ctx context.Context
}

Expand Down Expand Up @@ -185,6 +188,8 @@ func InitFromConfig(ctx context.Context, i *Indexer, cfg *Config) (err error) {

i.ethClientTimeout = time.Duration(cfg.ETHClientTimeout) * time.Second

i.numLatestBlocksToIgnoreWhenCrawling = cfg.NumLatestBlocksToIgnoreWhenCrawling

return nil
}

Expand Down Expand Up @@ -241,7 +246,14 @@ func (i *Indexer) filter(ctx context.Context) error {
return i.subscribe(ctx, i.srcChainId)
}

if err := i.setInitialProcessingBlockByMode(ctx, i.syncMode, i.srcChainId); err != nil {
syncMode := i.syncMode

// always use Resync when crawling past blocks
if i.watchMode == CrawlPastBlocks {
syncMode = Resync
}

if err := i.setInitialProcessingBlockByMode(ctx, syncMode, i.srcChainId); err != nil {
return errors.Wrap(err, "i.setInitialProcessingBlockByMode")
}

Expand All @@ -262,12 +274,20 @@ func (i *Indexer) filter(ctx context.Context) error {
"batchsize", i.blockBatchSize,
)

for j := i.processingBlockHeight; j < header.Number.Uint64(); j += i.blockBatchSize {
endBlockID := header.Number.Uint64()

// ignore latest N blocks, they are probably in queue already
// and are not "missed".
if i.watchMode == CrawlPastBlocks {
endBlockID -= i.numLatestBlocksToIgnoreWhenCrawling
}

for j := i.processingBlockHeight; j < endBlockID; j += i.blockBatchSize {
end := i.processingBlockHeight + i.blockBatchSize
// if the end of the batch is greater than the latest block number, set end
// to the latest block number
if end > header.Number.Uint64() {
end = header.Number.Uint64()
if end > endBlockID {
end = endBlockID
}

// filter exclusive of the end block.
Expand All @@ -283,17 +303,21 @@ func (i *Indexer) filter(ctx context.Context) error {
Context: ctx,
}

messageStatusChangedEvents, err := i.bridge.FilterMessageStatusChanged(filterOpts, nil)
if err != nil {
return errors.Wrap(err, "bridge.FilterMessageStatusChanged")
}

// we don't need to do anything with msgStatus events except save them to the DB.
// we don't need to process them. they are for exposing via the API.

err = i.saveMessageStatusChangedEvents(ctx, i.srcChainId, messageStatusChangedEvents)
if err != nil {
return errors.Wrap(err, "bridge.saveMessageStatusChangedEvents")
// we dont want to watch for message status changed events
// when crawling past blocks on a loop.
if i.watchMode != CrawlPastBlocks {
messageStatusChangedEvents, err := i.bridge.FilterMessageStatusChanged(filterOpts, nil)
if err != nil {
return errors.Wrap(err, "bridge.FilterMessageStatusChanged")
}

// we don't need to do anything with msgStatus events except save them to the DB.
// we don't need to process them. they are for exposing via the API.

err = i.saveMessageStatusChangedEvents(ctx, i.srcChainId, messageStatusChangedEvents)
if err != nil {
return errors.Wrap(err, "bridge.saveMessageStatusChangedEvents")
}
}

messageSentEvents, err := i.bridge.FilterMessageSent(filterOpts, nil)
Expand Down Expand Up @@ -333,15 +357,33 @@ func (i *Indexer) filter(ctx context.Context) error {
}

slog.Info(
"indexer fully caught up, checking latest block number to see if it's advanced",
"indexer fully caught up",
)

if i.watchMode == CrawlPastBlocks {
slog.Info("restarting filtering from genesis")
return i.filter(ctx)
}

slog.Info("getting latest block to see if header has advanced")

latestBlock, err := i.srcEthClient.HeaderByNumber(ctx, nil)
if err != nil {
return errors.Wrap(err, "i.srcEthClient.HeaderByNumber")
}

if i.processingBlockHeight < latestBlock.Number.Uint64() {
latestBlockIDToCompare := latestBlock.Number.Uint64()

if i.watchMode == CrawlPastBlocks && latestBlockIDToCompare > i.numLatestBlocksToIgnoreWhenCrawling {
latestBlockIDToCompare -= i.numLatestBlocksToIgnoreWhenCrawling
}

if i.processingBlockHeight < latestBlockIDToCompare {
slog.Info("header has advanced",
"processingBlockHeight", i.processingBlockHeight,
"latestBlock", latestBlockIDToCompare,
)

return i.filter(ctx)
}

Expand All @@ -350,6 +392,8 @@ func (i *Indexer) filter(ctx context.Context) error {
return nil
}

slog.Info("processing is caught up to latest block, subscribing to new blocks")

return i.subscribe(ctx, i.srcChainId)
}

Expand Down
Loading