Skip to content

Commit

Permalink
add comments
Browse files Browse the repository at this point in the history
  • Loading branch information
beer-1 committed Jul 30, 2024
1 parent 9d3481b commit f6e90c0
Show file tree
Hide file tree
Showing 8 changed files with 88 additions and 31 deletions.
4 changes: 1 addition & 3 deletions executor/batch/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import (
opchildtypes "github.com/initia-labs/OPinit/x/opchild/types"
ophosttypes "github.com/initia-labs/OPinit/x/ophost/types"
executortypes "github.com/initia-labs/opinit-bots-go/executor/types"
"github.com/initia-labs/opinit-bots-go/merkle"
nodetypes "github.com/initia-labs/opinit-bots-go/node/types"
"github.com/initia-labs/opinit-bots-go/types"
"go.uber.org/zap"
Expand Down Expand Up @@ -44,7 +43,6 @@ type BatchSubmitter struct {
node *node.Node
host hostNode
da executortypes.DANode
mk *merkle.Merkle

bridgeInfo opchildtypes.BridgeInfo

Expand Down Expand Up @@ -115,7 +113,7 @@ func (bs *BatchSubmitter) Initialize(host hostNode, da executortypes.DANode, bri
if len(bs.batchInfos) == 1 || batchInfo.Output.L2BlockNumber >= bs.node.GetHeight() {
break
}
bs.PopBatchInfo()
bs.DequeueBatchInfo()
}
// TODO: set da and key that match the current batch info
bs.da = da
Expand Down
92 changes: 70 additions & 22 deletions executor/batch/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import (
"io"
"time"

"github.com/pkg/errors"

sdk "github.com/cosmos/cosmos-sdk/types"
executortypes "github.com/initia-labs/opinit-bots-go/executor/types"
nodetypes "github.com/initia-labs/opinit-bots-go/node/types"
Expand All @@ -27,92 +29,115 @@ func (bs *BatchSubmitter) rawBlockHandler(args nodetypes.RawBlockArgs) error {
pbb := new(cmtproto.Block)
err := proto.Unmarshal(args.BlockBytes, pbb)
if err != nil {
return err
return errors.Wrap(err, "failed to unmarshal block")
}

err = bs.prepareBatch(args.BlockHeight, pbb.Header.Time)
if err != nil {
return err
return errors.Wrap(err, "failed to prepare batch")
}

err = bs.handleBatch(args.BlockBytes)
if err != nil {
return err
return errors.Wrap(err, "failed to handle batch")
}

err = bs.checkBatch(args.BlockHeight, pbb.Header.Time)
if err != nil {
return err
return errors.Wrap(err, "failed to check batch")
}

// store the processed state into db with batch operation
batchKVs := make([]types.RawKV, 0)
batchKVs = append(batchKVs, bs.node.SyncInfoToRawKV(args.BlockHeight))
batchMsgkvs, err := bs.da.ProcessedMsgsToRawKV(bs.processedMsgs, false)
batchMsgKVs, err := bs.da.ProcessedMsgsToRawKV(bs.processedMsgs, false)
if err != nil {
return err
return errors.Wrap(err, "failed to convert processed messages to raw key value")
}
batchKVs = append(batchKVs, batchMsgkvs...)
if len(batchMsgkvs) > 0 {
batchKVs = append(batchKVs, batchMsgKVs...)
if len(batchMsgKVs) > 0 {
batchKVs = append(batchKVs, bs.SubmissionInfoToRawKV(pbb.Header.Time.UnixNano()))
}
err = bs.db.RawBatchSet(batchKVs...)
if err != nil {
return err
return errors.Wrap(err, "failed to set raw batch")
}

// broadcast processed messages
for _, processedMsg := range bs.processedMsgs {
bs.da.BroadcastMsgs(processedMsg)
}

// clear processed messages
bs.processedMsgs = bs.processedMsgs[:0]

return nil
}

func (bs *BatchSubmitter) prepareBatch(blockHeight uint64, blockTime time.Time) error {

// check whether the requested block height is reached to the l2 block number of the next batch info.
if nextBatchInfo := bs.NextBatchInfo(); nextBatchInfo != nil && nextBatchInfo.Output.L2BlockNumber < blockHeight {

// if the next batch info is reached, finalize the current batch and update the batch info.
if bs.batchWriter != nil {
err := bs.batchWriter.Close()
if err != nil {
return err
return errors.Wrap(err, "failed to close batch writer")
}
}
err := bs.batchFile.Truncate(0)
if err != nil {
return err
return errors.Wrap(err, "failed to truncate batch file")
}
_, err = bs.batchFile.Seek(0, 0)
if err != nil {
return err
return errors.Wrap(err, "failed to seek batch file")
}

// save sync info
err = bs.node.SaveSyncInfo(nextBatchInfo.Output.L2BlockNumber)
if err != nil {
return err
return errors.Wrap(err, "failed to save sync info")
}

// set last processed block height to l2 block number
bs.node.SetSyncInfo(nextBatchInfo.Output.L2BlockNumber)
bs.PopBatchInfo()
bs.DequeueBatchInfo()

// error will restart block process from nextBatchInfo.Output.L2BlockNumber + 1
return fmt.Errorf("batch info updated: reset from %d", nextBatchInfo.Output.L2BlockNumber)
}

if bs.batchHeader != nil {
// if the batch header end is not set, it means the batch is not finalized yet.
if bs.batchHeader.End == 0 {
return nil
}

err := bs.finalizeBatch(blockHeight)
if err != nil {
return err
return errors.Wrap(err, "failed to finalize batch")
}

// update last submission time
bs.lastSubmissionTime = blockTime
}
bs.batchHeader = &executortypes.BatchHeader{}

// reset batch header
var err error
bs.batchHeader = &executortypes.BatchHeader{}

// linux command gzip use level 6 as default
bs.batchWriter, err = gzip.NewWriterLevel(bs.batchFile, 6)
if err != nil {
return err
}

return nil
}

// write block bytes to batch file
func (bs *BatchSubmitter) handleBatch(blockBytes []byte) error {
encodedBlockBytes := base64.StdEncoding.EncodeToString(blockBytes)
_, err := bs.batchWriter.Write(append([]byte(encodedBlockBytes), ','))
Expand All @@ -122,23 +147,27 @@ func (bs *BatchSubmitter) handleBatch(blockBytes []byte) error {
return nil
}

// finalize batch and create batch messages
func (bs *BatchSubmitter) finalizeBatch(blockHeight uint64) error {

// write last block's commit to batch file
rawCommit, err := bs.node.QueryRawCommit(int64(blockHeight))
if err != nil {
return err
return errors.Wrap(err, "failed to query raw commit")
}
encodedRawCommit := base64.StdEncoding.EncodeToString(rawCommit)
_, err = bs.batchWriter.Write([]byte(encodedRawCommit))
if err != nil {
return err
return errors.Wrap(err, "failed to write raw commit")
}
err = bs.batchWriter.Close()
if err != nil {
return err
return errors.Wrap(err, "failed to close batch writer")
}

batchBuffer := make([]byte, bs.batchCfg.MaxChunkSize)
checksums := make([][]byte, 0)

// room for batch header
bs.processedMsgs = append(bs.processedMsgs, nodetypes.ProcessedMsgs{
Timestamp: time.Now().UnixNano(),
Expand All @@ -152,7 +181,9 @@ func (bs *BatchSubmitter) finalizeBatch(blockHeight uint64) error {
} else if readLength == 0 {
break
}
batchBuffer = batchBuffer[:readLength]

// trim the buffer to the actual read length
batchBuffer := batchBuffer[:readLength]
msg, err := bs.createBatchMsg(batchBuffer)
if err != nil {
return err
Expand All @@ -168,6 +199,8 @@ func (bs *BatchSubmitter) finalizeBatch(blockHeight uint64) error {
break
}
}

// update batch header
bs.batchHeader.Chunks = checksums
headerBytes, err := json.Marshal(bs.batchHeader)
if err != nil {
Expand All @@ -178,6 +211,8 @@ func (bs *BatchSubmitter) finalizeBatch(blockHeight uint64) error {
return err
}
bs.processedMsgs[0].Msgs = []sdk.Msg{msg}

// reset batch file
err = bs.batchFile.Truncate(0)
if err != nil {
return err
Expand All @@ -186,23 +221,32 @@ func (bs *BatchSubmitter) finalizeBatch(blockHeight uint64) error {
if err != nil {
return err
}

return nil
}

func (bs *BatchSubmitter) checkBatch(blockHeight uint64, blockTime time.Time) error {
info, err := bs.batchFile.Stat()
if err != nil {
return err
return errors.Wrap(err, "failed to get batch file stat")
}

// if the block time is after the last submission time + submission interval * 2/3
// or the block time is after the last submission time + max submission time
// or the batch file size is greater than (max chunks - 1) * max chunk size
// then finalize the batch
if blockTime.After(bs.lastSubmissionTime.Add(bs.bridgeInfo.BridgeConfig.SubmissionInterval*2/3)) ||
blockTime.After(bs.lastSubmissionTime.Add(time.Duration(bs.batchCfg.MaxSubmissionTime)*time.Second)) ||
info.Size() > (bs.batchCfg.MaxChunks-1)*bs.batchCfg.MaxChunkSize {

// finalize the batch
bs.batchHeader.End = blockHeight
}

return nil
}

// TODO: support celestia
func (bs *BatchSubmitter) createBatchMsg(batchBytes []byte) (sdk.Msg, error) {
submitter, err := bs.da.GetAddressStr()
if err != nil {
Expand All @@ -216,6 +260,7 @@ func (bs *BatchSubmitter) createBatchMsg(batchBytes []byte) (sdk.Msg, error) {
), nil
}

// UpdateBatchInfo appends the batch info with the given chain, submitter, output index, and l2 block number
func (bs *BatchSubmitter) UpdateBatchInfo(chain string, submitter string, outputIndex uint64, l2BlockNumber uint64) {
bs.batchInfoMu.Lock()
defer bs.batchInfoMu.Unlock()
Expand All @@ -231,13 +276,15 @@ func (bs *BatchSubmitter) UpdateBatchInfo(chain string, submitter string, output
})
}

// BatchInfo returns the current batch info
func (bs *BatchSubmitter) BatchInfo() *ophosttypes.BatchInfoWithOutput {
bs.batchInfoMu.Lock()
defer bs.batchInfoMu.Unlock()

return &bs.batchInfos[0]
}

// NextBatchInfo returns the next batch info in the queue
func (bs *BatchSubmitter) NextBatchInfo() *ophosttypes.BatchInfoWithOutput {
bs.batchInfoMu.Lock()
defer bs.batchInfoMu.Unlock()
Expand All @@ -247,7 +294,8 @@ func (bs *BatchSubmitter) NextBatchInfo() *ophosttypes.BatchInfoWithOutput {
return &bs.batchInfos[1]
}

func (bs *BatchSubmitter) PopBatchInfo() {
// DequeueBatchInfo removes the first batch info from the queue
func (bs *BatchSubmitter) DequeueBatchInfo() {
bs.batchInfoMu.Lock()
defer bs.batchInfoMu.Unlock()

Expand Down
6 changes: 5 additions & 1 deletion executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,11 @@ func NewExecutor(cfg *executortypes.Config, db types.DB, sv *server.Server, logg
db.WithPrefix([]byte(executortypes.ChildNodeName)),
logger.Named(executortypes.ChildNodeName), cdc, txConfig,
),
batch: batch.NewBatchSubmitter(cfg.Version, cfg.DANodeConfig(), cfg.BatchConfig(), db.WithPrefix([]byte(executortypes.BatchNodeName)), logger.Named(executortypes.BatchNodeName), cdc, txConfig, homePath),
batch: batch.NewBatchSubmitter(
cfg.Version, cfg.DANodeConfig(), cfg.BatchConfig(),
db.WithPrefix([]byte(executortypes.BatchNodeName)),
logger.Named(executortypes.BatchNodeName), cdc, txConfig, homePath,
),

cfg: cfg,
db: db,
Expand Down
2 changes: 1 addition & 1 deletion executor/types/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ type DANode interface {
GetAddressStr() (string, error)
HasKey() bool
BroadcastMsgs(nodetypes.ProcessedMsgs)
ProcessedMsgsToRawKV([]nodetypes.ProcessedMsgs, bool) ([]types.RawKV, error)
ProcessedMsgsToRawKV(processedMsgs []nodetypes.ProcessedMsgs, delete bool) ([]types.RawKV, error)
}

type BatchHeader struct {
Expand Down
7 changes: 5 additions & 2 deletions executor/types/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,11 @@ type Config struct {
// RelayOracle is the flag to enable the oracle relay feature.
RelayOracle bool `json:"relay_oracle"`

MaxChunks int64 `json:"max_chunks"`
MaxChunkSize int64 `json:"max_chunk_size"`
// MaxChunks is the maximum number of chunks in a batch.
MaxChunks int64 `json:"max_chunks"`
// MaxChunkSize is the maximum size of a chunk in a batch.
MaxChunkSize int64 `json:"max_chunk_size"`
// MaxSubmissionTime is the maximum time to submit a batch.
MaxSubmissionTime int64 `json:"max_submission_time"` // seconds
}

Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ require (
github.com/oasisprotocol/curve25519-voi v0.0.0-20230904125328-1f23a7beb09a // indirect
github.com/pelletier/go-toml/v2 v2.2.2 // indirect
github.com/petermattis/goid v0.0.0-20231207134359-e60b3f734c67 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/pkg/errors v0.9.1
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
github.com/prometheus/client_golang v1.19.0 // indirect
github.com/prometheus/client_model v0.6.1 // indirect
Expand Down
2 changes: 2 additions & 0 deletions node/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,12 +153,14 @@ func GetQueryContext(height uint64) (context.Context, context.CancelFunc) {
return ctx, cancel
}

// QueryRawCommit queries the raw commit at a given height.
func (n *Node) QueryRawCommit(height int64) ([]byte, error) {
ctx, cancel := GetQueryContext(uint64(height))
defer cancel()
return n.RawCommit(ctx, &height)
}

// QueryBlockBulk queries blocks in bulk.
func (n *Node) QueryBlockBulk(start uint64, end uint64) ([][]byte, error) {
ctx, cancel := GetQueryContext(0)
defer cancel()
Expand Down
4 changes: 3 additions & 1 deletion node/tx.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import (
"strings"
"time"

"github.com/pkg/errors"

sdkerrors "cosmossdk.io/errors"
abci "github.com/cometbft/cometbft/abci/types"
comettypes "github.com/cometbft/cometbft/types"
Expand Down Expand Up @@ -55,7 +57,7 @@ func (n *Node) txBroadcastLooper(ctx context.Context) error {
}

if err != nil {
panic(err)
return errors.Wrap(err, "failed to handle processed msgs")
}
}
}
Expand Down

0 comments on commit f6e90c0

Please sign in to comment.