Skip to content

Commit

Permalink
Add blocksapi blocks loader
Browse files Browse the repository at this point in the history
  • Loading branch information
mrdimidium committed Oct 8, 2024
1 parent 4be29a0 commit ebc9bcd
Show file tree
Hide file tree
Showing 8 changed files with 660 additions and 431 deletions.
36 changes: 19 additions & 17 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
module github.com/aurora-is-near/relayer2-public

go 1.18
go 1.21

toolchain go1.23.0

//replace github.com/aurora-is-near/relayer2-base => "../relayer2-base"
//replace github.com/aurora-is-near/near-api-go => github.com/aurora-is-near/near-api-go
Expand All @@ -10,18 +12,20 @@ go 1.18
replace github.com/btcsuite/btcd => github.com/btcsuite/btcd v0.23.2

require (
github.com/aurora-is-near/borealis-prototypes-go v0.0.0-20240612174445-a1ca8d036252
github.com/aurora-is-near/near-api-go v0.0.14
github.com/aurora-is-near/relayer2-base v1.1.7
github.com/btcsuite/btcutil v1.0.2
github.com/buger/jsonparser v1.1.1
github.com/ethereum/go-ethereum v1.10.25
github.com/google/uuid v1.3.0
github.com/google/uuid v1.6.0
github.com/json-iterator/go v1.1.12
github.com/planetscale/vtprotobuf v0.6.0
github.com/spf13/cobra v1.6.0
github.com/spf13/viper v1.13.0
github.com/stretchr/testify v1.8.0
github.com/stretchr/testify v1.8.4
github.com/valyala/fasthttp v1.47.0
golang.org/x/net v0.17.0
google.golang.org/grpc v1.65.0
)

require (
Expand All @@ -37,10 +41,11 @@ require (
github.com/dustin/go-humanize v1.0.0 // indirect
github.com/fasthttp/websocket v1.5.2 // indirect
github.com/fxamacker/cbor/v2 v2.4.0 // indirect
github.com/go-ole/go-ole v1.2.6 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/glog v1.0.0 // indirect
github.com/golang/glog v1.2.1 // indirect
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
github.com/golang/protobuf v1.5.3 // indirect
github.com/golang/protobuf v1.5.4 // indirect
github.com/golang/snappy v0.0.4 // indirect
github.com/google/flatbuffers v22.9.29+incompatible // indirect
github.com/holiman/uint256 v1.2.0 // indirect
Expand All @@ -63,38 +68,35 @@ require (
github.com/valyala/bytebufferpool v1.0.0 // indirect
github.com/valyala/tcplisten v1.0.0 // indirect
github.com/x448/float16 v0.8.4 // indirect
github.com/yusufpapurcu/wmi v1.2.2 // indirect
go.opencensus.io v0.23.0 // indirect
go.uber.org/atomic v1.10.0 // indirect
golang.org/x/sync v0.1.0 // indirect
google.golang.org/protobuf v1.30.0 // indirect
golang.org/x/net v0.25.0 // indirect
golang.org/x/sync v0.7.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240528184218-531527333157 // indirect
google.golang.org/protobuf v1.34.1 // indirect
gopkg.in/ini.v1 v1.67.0 // indirect
gopkg.in/natefinch/lumberjack.v2 v2.0.0 // indirect
)

require (
github.com/btcsuite/btcd/btcec/v2 v2.2.1 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/cespare/xxhash/v2 v2.3.0 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.1.0 // indirect
github.com/fsnotify/fsnotify v1.6.0
github.com/go-stack/stack v1.8.1 // indirect
github.com/hashicorp/hcl v1.0.0 // indirect
github.com/inconshreveable/mousetrap v1.0.1 // indirect
github.com/magiconair/properties v1.8.6 // indirect
github.com/mitchellh/mapstructure v1.5.0 // indirect
github.com/pelletier/go-toml v1.9.5 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/shirou/gopsutil v3.21.11+incompatible // indirect
github.com/spf13/afero v1.9.2 // indirect
github.com/spf13/cast v1.5.0 // indirect
github.com/spf13/jwalterweatherman v1.1.0 // indirect
github.com/spf13/pflag v1.0.5 // indirect
github.com/tklauser/go-sysconf v0.3.10 // indirect
github.com/tklauser/numcpus v0.5.0 // indirect
golang.org/x/crypto v0.14.0 // indirect
golang.org/x/sys v0.13.0 // indirect
golang.org/x/text v0.13.0 // indirect
golang.org/x/crypto v0.23.0 // indirect
golang.org/x/sys v0.20.0 // indirect
golang.org/x/text v0.15.0 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
92 changes: 53 additions & 39 deletions go.sum

Large diffs are not rendered by default.

173 changes: 173 additions & 0 deletions indexer/blockapi.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,173 @@
package indexer

import (
"context"
"errors"
"io"
"sync"

blocksapi "github.com/aurora-is-near/borealis-prototypes-go/generated/blocksapi"
"github.com/aurora-is-near/relayer2-base/broker"
"github.com/aurora-is-near/relayer2-base/db"
"github.com/aurora-is-near/relayer2-base/log"
"github.com/aurora-is-near/relayer2-base/types"
"github.com/aurora-is-near/relayer2-base/types/common"
"github.com/aurora-is-near/relayer2-base/types/indexer"
"github.com/aurora-is-near/relayer2-base/utils"
jsoniter "github.com/json-iterator/go"

vtgrpc "github.com/planetscale/vtprotobuf/codec/grpc"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/encoding"
_ "google.golang.org/grpc/encoding/proto"
"google.golang.org/grpc/metadata"
)

type IndexerBlocksAPI struct {
token string
stream string

dbh db.Handler
l *log.Logger
b broker.Broker
grpc *grpc.ClientConn
height uint64
mu sync.Mutex
}

func init() {
// Register protobuf plugin for generate optimized marshall & unmarshal code
encoding.RegisterCodec(vtgrpc.Codec{})
}

func NewIndexerBlocksApi(config *Config, dbh db.Handler, b broker.Broker) (*IndexerBlocksAPI, error) {
logger := log.Log()

client, err := grpc.NewClient(
// TODO: use client-side load balancer
config.BlocksApiUrl[0],
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithInitialConnWindowSize(64*1024*1024), // 64 MB
grpc.WithInitialWindowSize(64*1024*1024), // 64 MB
grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(1*1024*1024*1024)), // 1GB
)
if err != nil {
return nil, err
}

return &IndexerBlocksAPI{
stream: config.BlocksApiStream,
token: config.BlocksApiToken,
dbh: dbh,
l: logger,
b: b,
grpc: client,
height: config.FromBlock,
}, nil
}

func (i *IndexerBlocksAPI) Start(ctx context.Context) {
i.mu.Lock()
defer i.mu.Unlock()

// Attaching authorization token
md := metadata.New(make(map[string]string))
md.Set("authorization", "Bearer "+i.token)

callCtx := metadata.NewOutgoingContext(ctx, md)

go i.run(callCtx)
}

func (i *IndexerBlocksAPI) run(callCtx context.Context) {
blocksProviderClient := blocksapi.NewBlocksProviderClient(i.grpc)

request := &blocksapi.ReceiveBlocksRequest{
StreamName: i.stream,
StartPolicy: blocksapi.ReceiveBlocksRequest_START_EXACTLY_ON_TARGET,
StopPolicy: blocksapi.ReceiveBlocksRequest_STOP_NEVER,
StartTarget: &blocksapi.BlockMessage_ID{
Kind: blocksapi.BlockMessage_MSG_WHOLE,
Height: i.height,
},
}

callClient, err := blocksProviderClient.ReceiveBlocks(callCtx, request)
if err != nil {
i.l.Error().Err(err).Msgf("unable to call ReceiveBlocks")
return
}

defer callClient.CloseSend()

for {
select {
case <-callCtx.Done():
return
default:
}

response, err := callClient.Recv()
if errors.Is(err, io.EOF) {
break
} else if err != nil {
i.l.Error().Err(err).Msg("unable to receive next response")
return
}

switch r := response.Response.(type) {
case *blocksapi.ReceiveBlocksResponse_Message:
var block indexer.Block

var payload []byte
if rawPayload, ok := r.Message.Message.Payload.(*blocksapi.BlockMessage_RawPayload); ok {
payload = rawPayload.RawPayload
} else {
i.l.Fatal().Msg("invalid payload type")
return
}

err = jsoniter.Unmarshal(payload, &block)
if err != nil {
i.l.Error().Err(err).Msg("couln't parse block")
return
}

err := i.dbh.InsertBlock(&block)
if err != nil {
i.l.Error().Err(err).Msg("couln't insert block")
return
}

if i.b != nil {
ctx := utils.PutChainId(callCtx, block.ChainId)
bn := common.UintToBN64(block.Height)
block, err := i.dbh.GetBlockByNumber(ctx, bn, true)
if err != nil {
// just log, this is a best-effort operation
i.l.Error().Err(err).Msg("couln't get block number")
} else {
i.b.PublishNewHeads(block)
}

nfilter := &(types.Filter{FromBlock: bn.Uint64(), ToBlock: bn.Uint64()})
logs, err := i.dbh.GetLogs(ctx, nfilter.ToLogFilter())
if err != nil {
// just log, this is a best-effort operation
i.l.Error().Err(err).Msg("couln't get block logs")
} else {
i.b.PublishLogs(logs)
}
}

i.height += 1
}
}
}

func (i *IndexerBlocksAPI) Close() {
if err := i.grpc.Close(); err != nil {
log.Log().Printf("Unable to close connection: %v", err)
}
}
62 changes: 0 additions & 62 deletions indexer/config.go

This file was deleted.

Loading

0 comments on commit ebc9bcd

Please sign in to comment.