Skip to content

Commit

Permalink
remove duplicated codes
Browse files Browse the repository at this point in the history
  • Loading branch information
sh-cha committed Aug 6, 2024
1 parent c23cc14 commit 8c36887
Showing 1 changed file with 0 additions and 56 deletions.
56 changes: 0 additions & 56 deletions executor/batch/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,11 @@ func (bs *BatchSubmitter) rawBlockHandler(args nodetypes.RawBlockArgs) error {
err := proto.Unmarshal(args.BlockBytes, pbb)
if err != nil {
return errors.Wrap(err, "failed to unmarshal block")
return errors.Wrap(err, "failed to unmarshal block")
}

err = bs.prepareBatch(args.BlockHeight, pbb.Header.Time)
if err != nil {
return errors.Wrap(err, "failed to prepare batch")
return errors.Wrap(err, "failed to prepare batch")
}

blockBytes, err := bs.emptyOracleData(pbb)
Expand All @@ -54,70 +52,50 @@ func (bs *BatchSubmitter) rawBlockHandler(args nodetypes.RawBlockArgs) error {
err = bs.checkBatch(args.BlockHeight, pbb.Header.Time)
if err != nil {
return errors.Wrap(err, "failed to check batch")
return errors.Wrap(err, "failed to check batch")
}

// store the processed state into db with batch operation
// store the processed state into db with batch operation
batchKVs := make([]types.RawKV, 0)
batchKVs = append(batchKVs, bs.node.SyncInfoToRawKV(args.BlockHeight))
batchMsgKVs, err := bs.da.ProcessedMsgsToRawKV(bs.processedMsgs, false)
batchMsgKVs, err := bs.da.ProcessedMsgsToRawKV(bs.processedMsgs, false)
if err != nil {
return errors.Wrap(err, "failed to convert processed messages to raw key value")
return errors.Wrap(err, "failed to convert processed messages to raw key value")
}
batchKVs = append(batchKVs, batchMsgKVs...)
if len(batchMsgKVs) > 0 {
batchKVs = append(batchKVs, batchMsgKVs...)
if len(batchMsgKVs) > 0 {
batchKVs = append(batchKVs, bs.SubmissionInfoToRawKV(pbb.Header.Time.UnixNano()))
}
err = bs.db.RawBatchSet(batchKVs...)
if err != nil {
return errors.Wrap(err, "failed to set raw batch")
return errors.Wrap(err, "failed to set raw batch")
}

// broadcast processed messages

// broadcast processed messages
for _, processedMsg := range bs.processedMsgs {
bs.da.BroadcastMsgs(processedMsg)
}

// clear processed messages

// clear processed messages
bs.processedMsgs = bs.processedMsgs[:0]


return nil
}

func (bs *BatchSubmitter) prepareBatch(blockHeight uint64, blockTime time.Time) error {
// check whether the requested block height is reached to the l2 block number of the next batch info.
if nextBatchInfo := bs.NextBatchInfo(); nextBatchInfo != nil && nextBatchInfo.Output.L2BlockNumber < blockHeight {

// if the next batch info is reached, finalize the current batch and update the batch info.

// if the next batch info is reached, finalize the current batch and update the batch info.
if bs.batchWriter != nil {
err := bs.batchWriter.Close()
if err != nil {
return errors.Wrap(err, "failed to close batch writer")
return errors.Wrap(err, "failed to close batch writer")
}
}
err := bs.batchFile.Truncate(0)
if err != nil {
return errors.Wrap(err, "failed to truncate batch file")
return errors.Wrap(err, "failed to truncate batch file")
}
_, err = bs.batchFile.Seek(0, 0)
if err != nil {
return errors.Wrap(err, "failed to seek batch file")
return errors.Wrap(err, "failed to seek batch file")
}

// save sync info
Expand All @@ -126,58 +104,44 @@ func (bs *BatchSubmitter) prepareBatch(blockHeight uint64, blockTime time.Time)
err = bs.node.SaveSyncInfo(nextBatchInfo.Output.L2BlockNumber)
if err != nil {
return errors.Wrap(err, "failed to save sync info")
return errors.Wrap(err, "failed to save sync info")
}


// set last processed block height to l2 block number
bs.node.SetSyncInfo(nextBatchInfo.Output.L2BlockNumber)
bs.DequeueBatchInfo()
bs.DequeueBatchInfo()

// error will restart block process from nextBatchInfo.Output.L2BlockNumber + 1
panic(fmt.Errorf("batch info updated: reset from %d", nextBatchInfo.Output.L2BlockNumber))
}

if bs.batchHeader != nil {
// if the batch header end is not set, it means the batch is not finalized yet.
// if the batch header end is not set, it means the batch is not finalized yet.
if bs.batchHeader.End == 0 {
return nil
}


err := bs.finalizeBatch(blockHeight)
if err != nil {
return errors.Wrap(err, "failed to finalize batch")
return errors.Wrap(err, "failed to finalize batch")
}

// update last submission time

// update last submission time
bs.lastSubmissionTime = blockTime
}

// reset batch header
var err error

// reset batch header
var err error
bs.batchHeader = &executortypes.BatchHeader{}


// linux command gzip use level 6 as default
bs.batchWriter, err = gzip.NewWriterLevel(bs.batchFile, 6)
if err != nil {
return err
}


return nil
}

// write block bytes to batch file
// write block bytes to batch file
func (bs *BatchSubmitter) handleBatch(blockBytes []byte) error {
_, err := bs.batchWriter.Write(prependLength(blockBytes))
Expand All @@ -187,31 +151,26 @@ func (bs *BatchSubmitter) handleBatch(blockBytes []byte) error {
return nil
}

// finalize batch and create batch messages
// finalize batch and create batch messages
func (bs *BatchSubmitter) finalizeBatch(blockHeight uint64) error {

// write last block's commit to batch file
rawCommit, err := bs.node.GetRPCClient().QueryRawCommit(int64(blockHeight))
if err != nil {
return errors.Wrap(err, "failed to query raw commit")
return errors.Wrap(err, "failed to query raw commit")
}
_, err = bs.batchWriter.Write(prependLength(rawCommit))
if err != nil {
return errors.Wrap(err, "failed to write raw commit")
return errors.Wrap(err, "failed to write raw commit")
}
err = bs.batchWriter.Close()
if err != nil {
return errors.Wrap(err, "failed to close batch writer")
return errors.Wrap(err, "failed to close batch writer")
}

batchBuffer := make([]byte, bs.batchCfg.MaxChunkSize)
checksums := make([][]byte, 0)


// room for batch header
bs.processedMsgs = append(bs.processedMsgs, btypes.ProcessedMsgs{
Timestamp: time.Now().UnixNano(),
Expand Down Expand Up @@ -244,8 +203,6 @@ func (bs *BatchSubmitter) finalizeBatch(blockHeight uint64) error {
}
}

// update batch header

// update batch header
bs.batchHeader.Chunks = checksums
headerBytes, err := json.Marshal(bs.batchHeader)
Expand All @@ -258,8 +215,6 @@ func (bs *BatchSubmitter) finalizeBatch(blockHeight uint64) error {
}
bs.processedMsgs[0].Msgs = []sdk.Msg{msg}

// reset batch file

// reset batch file
err = bs.batchFile.Truncate(0)
if err != nil {
Expand All @@ -270,21 +225,15 @@ func (bs *BatchSubmitter) finalizeBatch(blockHeight uint64) error {
return err
}


return nil
}

func (bs *BatchSubmitter) checkBatch(blockHeight uint64, blockTime time.Time) error {
info, err := bs.batchFile.Stat()
if err != nil {
return errors.Wrap(err, "failed to get batch file stat")
return errors.Wrap(err, "failed to get batch file stat")
}

// if the block time is after the last submission time + submission interval * 2/3
// or the block time is after the last submission time + max submission time
// or the batch file size is greater than (max chunks - 1) * max chunk size
// then finalize the batch
// if the block time is after the last submission time + submission interval * 2/3
// or the block time is after the last submission time + max submission time
// or the batch file size is greater than (max chunks - 1) * max chunk size
Expand All @@ -299,7 +248,6 @@ func (bs *BatchSubmitter) checkBatch(blockHeight uint64, blockTime time.Time) er
bs.batchHeader.End = blockHeight
}


return nil
}

Expand All @@ -319,7 +267,6 @@ func (bs *BatchSubmitter) UpdateBatchInfo(chain string, submitter string, output
})
}

// BatchInfo returns the current batch info
// BatchInfo returns the current batch info
func (bs *BatchSubmitter) BatchInfo() *ophosttypes.BatchInfoWithOutput {
bs.batchInfoMu.Lock()
Expand All @@ -328,7 +275,6 @@ func (bs *BatchSubmitter) BatchInfo() *ophosttypes.BatchInfoWithOutput {
return &bs.batchInfos[0]
}

// NextBatchInfo returns the next batch info in the queue
// NextBatchInfo returns the next batch info in the queue
func (bs *BatchSubmitter) NextBatchInfo() *ophosttypes.BatchInfoWithOutput {
bs.batchInfoMu.Lock()
Expand All @@ -339,8 +285,6 @@ func (bs *BatchSubmitter) NextBatchInfo() *ophosttypes.BatchInfoWithOutput {
return &bs.batchInfos[1]
}

// DequeueBatchInfo removes the first batch info from the queue
func (bs *BatchSubmitter) DequeueBatchInfo() {
// DequeueBatchInfo removes the first batch info from the queue
func (bs *BatchSubmitter) DequeueBatchInfo() {
bs.batchInfoMu.Lock()
Expand Down

0 comments on commit 8c36887

Please sign in to comment.