diff --git a/executor/batch/batch.go b/executor/batch/batch.go
index a1c6b43..79b4587 100644
--- a/executor/batch/batch.go
+++ b/executor/batch/batch.go
@@ -13,7 +13,6 @@ import (
 	opchildtypes "github.com/initia-labs/OPinit/x/opchild/types"
 	ophosttypes "github.com/initia-labs/OPinit/x/ophost/types"
 	executortypes "github.com/initia-labs/opinit-bots-go/executor/types"
-	"github.com/initia-labs/opinit-bots-go/merkle"
 	nodetypes "github.com/initia-labs/opinit-bots-go/node/types"
 	"github.com/initia-labs/opinit-bots-go/types"
 	"go.uber.org/zap"
@@ -44,7 +43,6 @@ type BatchSubmitter struct {
 	node *node.Node
 	host hostNode
 	da   executortypes.DANode
-	mk   *merkle.Merkle

 	bridgeInfo opchildtypes.BridgeInfo

@@ -115,7 +113,7 @@ func (bs *BatchSubmitter) Initialize(host hostNode, da executortypes.DANode, bri
 		if len(bs.batchInfos) == 1 || batchInfo.Output.L2BlockNumber >= bs.node.GetHeight() {
 			break
 		}
-		bs.PopBatchInfo()
+		bs.DequeueBatchInfo()
 	}
 	// TODO: set da and key that match the current batch info
 	bs.da = da
diff --git a/executor/batch/handler.go b/executor/batch/handler.go
index baa1236..c05438b 100644
--- a/executor/batch/handler.go
+++ b/executor/batch/handler.go
@@ -9,6 +9,8 @@ import (
 	"io"
 	"time"

+	"github.com/pkg/errors"
+
 	sdk "github.com/cosmos/cosmos-sdk/types"
 	executortypes "github.com/initia-labs/opinit-bots-go/executor/types"
 	nodetypes "github.com/initia-labs/opinit-bots-go/node/types"
@@ -27,92 +29,115 @@ func (bs *BatchSubmitter) rawBlockHandler(args nodetypes.RawBlockArgs) error {
 	pbb := new(cmtproto.Block)
 	err := proto.Unmarshal(args.BlockBytes, pbb)
 	if err != nil {
-		return err
+		return errors.Wrap(err, "failed to unmarshal block")
 	}

 	err = bs.prepareBatch(args.BlockHeight, pbb.Header.Time)
 	if err != nil {
-		return err
+		return errors.Wrap(err, "failed to prepare batch")
 	}
+
 	err = bs.handleBatch(args.BlockBytes)
 	if err != nil {
-		return err
+		return errors.Wrap(err, "failed to handle batch")
 	}

 	err = bs.checkBatch(args.BlockHeight, pbb.Header.Time)
 	if err != nil {
-		return err
+		return errors.Wrap(err, "failed to check batch")
 	}

+	// store the processed state into db with batch operation
 	batchKVs := make([]types.RawKV, 0)
 	batchKVs = append(batchKVs, bs.node.SyncInfoToRawKV(args.BlockHeight))
-	batchMsgkvs, err := bs.da.ProcessedMsgsToRawKV(bs.processedMsgs, false)
+	batchMsgKVs, err := bs.da.ProcessedMsgsToRawKV(bs.processedMsgs, false)
 	if err != nil {
-		return err
+		return errors.Wrap(err, "failed to convert processed messages to raw key value")
 	}
-	batchKVs = append(batchKVs, batchMsgkvs...)
-	if len(batchMsgkvs) > 0 {
+	batchKVs = append(batchKVs, batchMsgKVs...)
+	if len(batchMsgKVs) > 0 {
 		batchKVs = append(batchKVs, bs.SubmissionInfoToRawKV(pbb.Header.Time.UnixNano()))
 	}
 	err = bs.db.RawBatchSet(batchKVs...)
 	if err != nil {
-		return err
+		return errors.Wrap(err, "failed to set raw batch")
 	}
+
+	// broadcast processed messages
 	for _, processedMsg := range bs.processedMsgs {
 		bs.da.BroadcastMsgs(processedMsg)
 	}
+
+	// clear processed messages
 	bs.processedMsgs = bs.processedMsgs[:0]
+
 	return nil
 }

 func (bs *BatchSubmitter) prepareBatch(blockHeight uint64, blockTime time.Time) error {
+
+	// check whether the requested block height has reached the l2 block number of the next batch info.
 	if nextBatchInfo := bs.NextBatchInfo(); nextBatchInfo != nil && nextBatchInfo.Output.L2BlockNumber < blockHeight {
+
+		// if the next batch info is reached, finalize the current batch and update the batch info.
 		if bs.batchWriter != nil {
 			err := bs.batchWriter.Close()
 			if err != nil {
-				return err
+				return errors.Wrap(err, "failed to close batch writer")
 			}
 		}
 		err := bs.batchFile.Truncate(0)
 		if err != nil {
-			return err
+			return errors.Wrap(err, "failed to truncate batch file")
 		}
 		_, err = bs.batchFile.Seek(0, 0)
 		if err != nil {
-			return err
+			return errors.Wrap(err, "failed to seek batch file")
 		}
+
+		// save sync info
 		err = bs.node.SaveSyncInfo(nextBatchInfo.Output.L2BlockNumber)
 		if err != nil {
-			return err
+			return errors.Wrap(err, "failed to save sync info")
 		}
+
 		// set last processed block height to l2 block number
 		bs.node.SetSyncInfo(nextBatchInfo.Output.L2BlockNumber)
-		bs.PopBatchInfo()
+		bs.DequeueBatchInfo()

 		// error will restart block process from nextBatchInfo.Output.L2BlockNumber + 1
 		return fmt.Errorf("batch info updated: reset from %d", nextBatchInfo.Output.L2BlockNumber)
 	}

 	if bs.batchHeader != nil {
+		// if the batch header end is not set, it means the batch is not finalized yet.
 		if bs.batchHeader.End == 0 {
 			return nil
 		}
+
 		err := bs.finalizeBatch(blockHeight)
 		if err != nil {
-			return err
+			return errors.Wrap(err, "failed to finalize batch")
 		}
+
+		// update last submission time
 		bs.lastSubmissionTime = blockTime
 	}
-	bs.batchHeader = &executortypes.BatchHeader{}
+
+	// reset batch header
 	var err error
+	bs.batchHeader = &executortypes.BatchHeader{}
+	// linux command gzip uses level 6 as default
 	bs.batchWriter, err = gzip.NewWriterLevel(bs.batchFile, 6)
 	if err != nil {
 		return err
 	}
+
 	return nil
 }

+// write block bytes to batch file
 func (bs *BatchSubmitter) handleBatch(blockBytes []byte) error {
 	encodedBlockBytes := base64.StdEncoding.EncodeToString(blockBytes)
 	_, err := bs.batchWriter.Write(append([]byte(encodedBlockBytes), ','))
@@ -122,23 +147,27 @@ func (bs *BatchSubmitter) handleBatch(blockBytes []byte) error {
 	return nil
 }

+// finalize batch and create batch messages
 func (bs *BatchSubmitter) finalizeBatch(blockHeight uint64) error {
+
+	// write last block's commit to batch file
 	rawCommit, err := bs.node.QueryRawCommit(int64(blockHeight))
 	if err != nil {
-		return err
+		return errors.Wrap(err, "failed to query raw commit")
 	}
 	encodedRawCommit := base64.StdEncoding.EncodeToString(rawCommit)
 	_, err = bs.batchWriter.Write([]byte(encodedRawCommit))
 	if err != nil {
-		return err
+		return errors.Wrap(err, "failed to write raw commit")
 	}
 	err = bs.batchWriter.Close()
 	if err != nil {
-		return err
+		return errors.Wrap(err, "failed to close batch writer")
 	}

 	batchBuffer := make([]byte, bs.batchCfg.MaxChunkSize)
 	checksums := make([][]byte, 0)
+
 	// room for batch header
 	bs.processedMsgs = append(bs.processedMsgs, nodetypes.ProcessedMsgs{
 		Timestamp: time.Now().UnixNano(),
@@ -152,7 +181,9 @@ func (bs *BatchSubmitter) finalizeBatch(blockHeight uint64) error {
 		} else if readLength == 0 {
 			break
 		}
-		batchBuffer = batchBuffer[:readLength]
+
+		// trim the buffer to the actual read length
+		batchBuffer := batchBuffer[:readLength]
 		msg, err := bs.createBatchMsg(batchBuffer)
 		if err != nil {
 			return err
 		}
@@ -168,6 +199,8 @@ func (bs *BatchSubmitter) finalizeBatch(blockHeight uint64) error {
 			break
 		}
 	}
+
+	// update batch header
 	bs.batchHeader.Chunks = checksums
 	headerBytes, err := json.Marshal(bs.batchHeader)
 	if err != nil {
@@ -178,6 +211,8 @@ func (bs *BatchSubmitter) finalizeBatch(blockHeight uint64) error {
 		return err
 	}
 	bs.processedMsgs[0].Msgs = []sdk.Msg{msg}
+
+	// reset batch file
 	err = bs.batchFile.Truncate(0)
 	if err != nil {
 		return err
 	}
@@ -186,23 +221,32 @@ func (bs *BatchSubmitter) finalizeBatch(blockHeight uint64) error {
 	if err != nil {
 		return err
 	}
+
 	return nil
 }

 func (bs *BatchSubmitter) checkBatch(blockHeight uint64, blockTime time.Time) error {
 	info, err := bs.batchFile.Stat()
 	if err != nil {
-		return err
+		return errors.Wrap(err, "failed to get batch file stat")
 	}

+	// if the block time is after the last submission time + submission interval * 2/3
+	// or the block time is after the last submission time + max submission time
+	// or the batch file size is greater than (max chunks - 1) * max chunk size
+	// then finalize the batch
 	if blockTime.After(bs.lastSubmissionTime.Add(bs.bridgeInfo.BridgeConfig.SubmissionInterval*2/3)) ||
 		blockTime.After(bs.lastSubmissionTime.Add(time.Duration(bs.batchCfg.MaxSubmissionTime)*time.Second)) ||
 		info.Size() > (bs.batchCfg.MaxChunks-1)*bs.batchCfg.MaxChunkSize {
+
+		// finalize the batch
 		bs.batchHeader.End = blockHeight
 	}
+
 	return nil
 }

+// TODO: support celestia
 func (bs *BatchSubmitter) createBatchMsg(batchBytes []byte) (sdk.Msg, error) {
 	submitter, err := bs.da.GetAddressStr()
 	if err != nil {
@@ -216,6 +260,7 @@ func (bs *BatchSubmitter) createBatchMsg(batchBytes []byte) (sdk.Msg, error) {
 	), nil
 }

+// UpdateBatchInfo appends the batch info with the given chain, submitter, output index, and l2 block number
 func (bs *BatchSubmitter) UpdateBatchInfo(chain string, submitter string, outputIndex uint64, l2BlockNumber uint64) {
 	bs.batchInfoMu.Lock()
 	defer bs.batchInfoMu.Unlock()
@@ -231,6 +276,7 @@ func (bs *BatchSubmitter) UpdateBatchInfo(chain string, submitter string, output
 	})
 }

+// BatchInfo returns the current batch info
 func (bs *BatchSubmitter) BatchInfo() *ophosttypes.BatchInfoWithOutput {
 	bs.batchInfoMu.Lock()
 	defer bs.batchInfoMu.Unlock()
@@ -238,6 +284,7 @@ func (bs *BatchSubmitter) BatchInfo() *ophosttypes.BatchInfoWithOutput {
 	return &bs.batchInfos[0]
 }

+// NextBatchInfo returns the next batch info in the queue
 func (bs *BatchSubmitter) NextBatchInfo() *ophosttypes.BatchInfoWithOutput {
 	bs.batchInfoMu.Lock()
 	defer bs.batchInfoMu.Unlock()
@@ -247,7 +294,8 @@ func (bs *BatchSubmitter) NextBatchInfo() *ophosttypes.BatchInfoWithOutput {
 	return &bs.batchInfos[1]
 }

-func (bs *BatchSubmitter) PopBatchInfo() {
+// DequeueBatchInfo removes the first batch info from the queue
+func (bs *BatchSubmitter) DequeueBatchInfo() {
 	bs.batchInfoMu.Lock()
 	defer bs.batchInfoMu.Unlock()

diff --git a/executor/executor.go b/executor/executor.go
index 1b3546a..5ad3265 100644
--- a/executor/executor.go
+++ b/executor/executor.go
@@ -53,7 +53,11 @@ func NewExecutor(cfg *executortypes.Config, db types.DB, sv *server.Server, logg
 			db.WithPrefix([]byte(executortypes.ChildNodeName)),
 			logger.Named(executortypes.ChildNodeName), cdc, txConfig,
 		),
-		batch: batch.NewBatchSubmitter(cfg.Version, cfg.DANodeConfig(), cfg.BatchConfig(), db.WithPrefix([]byte(executortypes.BatchNodeName)), logger.Named(executortypes.BatchNodeName), cdc, txConfig, homePath),
+		batch: batch.NewBatchSubmitter(
+			cfg.Version, cfg.DANodeConfig(), cfg.BatchConfig(),
+			db.WithPrefix([]byte(executortypes.BatchNodeName)),
+			logger.Named(executortypes.BatchNodeName), cdc, txConfig, homePath,
+		),

 		cfg: cfg,
 		db:  db,
diff --git a/executor/types/batch.go b/executor/types/batch.go
index 56bf815..9ea92ed 100644
--- a/executor/types/batch.go
+++ b/executor/types/batch.go
@@ -9,7 +9,7 @@ type DANode interface {
 	GetAddressStr() (string, error)
 	HasKey() bool
 	BroadcastMsgs(nodetypes.ProcessedMsgs)
-	ProcessedMsgsToRawKV([]nodetypes.ProcessedMsgs, bool) ([]types.RawKV, error)
+	ProcessedMsgsToRawKV(processedMsgs []nodetypes.ProcessedMsgs, delete bool) ([]types.RawKV, error)
 }

 type BatchHeader struct {
diff --git a/executor/types/config.go b/executor/types/config.go
index 2dceffb..ae67e76 100644
--- a/executor/types/config.go
+++ b/executor/types/config.go
@@ -46,8 +46,11 @@ type Config struct {
 	// RelayOracle is the flag to enable the oracle relay feature.
 	RelayOracle bool `json:"relay_oracle"`

-	MaxChunks    int64 `json:"max_chunks"`
-	MaxChunkSize int64 `json:"max_chunk_size"`
+	// MaxChunks is the maximum number of chunks in a batch.
+	MaxChunks int64 `json:"max_chunks"`
+	// MaxChunkSize is the maximum size of a chunk in a batch.
+	MaxChunkSize int64 `json:"max_chunk_size"`
+	// MaxSubmissionTime is the maximum time to wait since the last submission before the batch is finalized.
 	MaxSubmissionTime int64 `json:"max_submission_time"` // seconds
 }

diff --git a/go.mod b/go.mod
index c9be55b..29ed031 100644
--- a/go.mod
+++ b/go.mod
@@ -178,7 +178,7 @@ require (
 	github.com/oasisprotocol/curve25519-voi v0.0.0-20230904125328-1f23a7beb09a // indirect
 	github.com/pelletier/go-toml/v2 v2.2.2 // indirect
 	github.com/petermattis/goid v0.0.0-20231207134359-e60b3f734c67 // indirect
-	github.com/pkg/errors v0.9.1 // indirect
+	github.com/pkg/errors v0.9.1
 	github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
 	github.com/prometheus/client_golang v1.19.0 // indirect
 	github.com/prometheus/client_model v0.6.1 // indirect
diff --git a/node/query.go b/node/query.go
index 0e1244e..b00996a 100644
--- a/node/query.go
+++ b/node/query.go
@@ -153,12 +153,14 @@ func GetQueryContext(height uint64) (context.Context, context.CancelFunc) {
 	return ctx, cancel
 }

+// QueryRawCommit queries the raw commit at a given height.
 func (n *Node) QueryRawCommit(height int64) ([]byte, error) {
 	ctx, cancel := GetQueryContext(uint64(height))
 	defer cancel()
 	return n.RawCommit(ctx, &height)
 }

+// QueryBlockBulk queries blocks in bulk.
 func (n *Node) QueryBlockBulk(start uint64, end uint64) ([][]byte, error) {
 	ctx, cancel := GetQueryContext(0)
 	defer cancel()
diff --git a/node/tx.go b/node/tx.go
index 9779101..9d031e3 100644
--- a/node/tx.go
+++ b/node/tx.go
@@ -9,6 +9,8 @@ import (
 	"strings"
 	"time"

+	"github.com/pkg/errors"
+
 	sdkerrors "cosmossdk.io/errors"
 	abci "github.com/cometbft/cometbft/abci/types"
 	comettypes "github.com/cometbft/cometbft/types"
@@ -55,7 +57,7 @@ func (n *Node) txBroadcastLooper(ctx context.Context) error {
 		}

 		if err != nil {
-			panic(err)
+			return errors.Wrap(err, "failed to handle processed msgs")
 		}
 	}
 }