11package batch
22
33import (
4+ "compress/gzip"
45 "context"
56 "errors"
6- "io"
77 "os"
88 "sync"
9- "time"
109
1110 "go.uber.org/zap"
1211
1312 opchildtypes "github.com/initia-labs/OPinit/x/opchild/types"
1413 ophosttypes "github.com/initia-labs/OPinit/x/ophost/types"
1514
16- dbtypes "github.com/initia-labs/opinit-bots/db/types"
1715 executortypes "github.com/initia-labs/opinit-bots/executor/types"
1816 "github.com/initia-labs/opinit-bots/node"
1917 btypes "github.com/initia-labs/opinit-bots/node/broadcaster/types"
@@ -26,14 +24,6 @@ type hostNode interface {
2624 QueryBatchInfos (context.Context , uint64 ) (* ophosttypes.QueryBatchInfosResponse , error )
2725}
2826
29- type compressionFunc interface {
30- Write ([]byte ) (int , error )
31- Reset (io.Writer )
32- Close () error
33- }
34-
35- var SubmissionKey = []byte ("submission_time" )
36-
3727type BatchSubmitter struct {
3828 version uint8
3929
@@ -50,19 +40,17 @@ type BatchSubmitter struct {
5040
5141 opchildQueryClient opchildtypes.QueryClient
5242
53- batchInfoMu * sync.Mutex
54- batchInfos []ophosttypes.BatchInfoWithOutput
55- batchWriter compressionFunc
56- batchFile * os.File
57- batchHeader * executortypes.BatchHeader
43+ batchInfoMu * sync.Mutex
44+ batchInfos []ophosttypes.BatchInfoWithOutput
45+ batchWriter * gzip. Writer
46+ batchFile * os.File
47+ localBatchInfo * executortypes.LocalBatchInfo
5848
5949 processedMsgs []btypes.ProcessedMsgs
6050
6151 chainID string
6252 homePath string
6353
64- lastSubmissionTime time.Time
65-
6654 // status info
6755 LastBatchEndBlockNumber uint64
6856}
@@ -98,7 +86,8 @@ func NewBatchSubmitterV0(
9886
9987 opchildQueryClient : opchildtypes .NewQueryClient (node .GetRPCClient ()),
10088
101- batchInfoMu : & sync.Mutex {},
89+ batchInfoMu : & sync.Mutex {},
90+ localBatchInfo : & executortypes.LocalBatchInfo {},
10291
10392 processedMsgs : make ([]btypes.ProcessedMsgs , 0 ),
10493 homePath : homePath ,
@@ -131,17 +120,26 @@ func (bs *BatchSubmitter) Initialize(ctx context.Context, startHeight uint64, ho
131120 }
132121
133122 fileFlag := os .O_CREATE | os .O_RDWR
134- // if the node has already processed blocks, append to the file
135- if ! bs .node .HeightInitialized () {
123+ if bs .node .HeightInitialized () {
124+ bs .localBatchInfo .Start = bs .node .GetHeight ()
125+ bs .localBatchInfo .End = 0
126+ bs .localBatchInfo .BatchFileSize = 0
127+
128+ err = bs .saveLocalBatchInfo ()
129+ if err != nil {
130+ return err
131+ }
132+ } else {
133+ // if the node has already processed blocks, append to the file
136134 fileFlag |= os .O_APPEND
137135 }
138136
139137 bs .batchFile , err = os .OpenFile (bs .homePath + "/batch" , fileFlag , 0666 )
140138 if err != nil {
141139 return err
142140 }
143-
144- err = bs . LoadSubmissionInfo ( )
141+ // linux command gzip use level 6 as default
142+ bs . batchWriter , err = gzip . NewWriterLevel ( bs . batchFile , 6 )
145143 if err != nil {
146144 return err
147145 }
@@ -168,25 +166,6 @@ func (bs *BatchSubmitter) SetBridgeInfo(bridgeInfo opchildtypes.BridgeInfo) {
168166 bs .bridgeInfo = bridgeInfo
169167}
170168
171- func (bs * BatchSubmitter ) LoadSubmissionInfo () error {
172- val , err := bs .db .Get (SubmissionKey )
173- if err != nil {
174- if err == dbtypes .ErrNotFound {
175- return nil
176- }
177- return err
178- }
179- bs .lastSubmissionTime = time .Unix (0 , dbtypes .ToInt64 (val ))
180- return nil
181- }
182-
183- func (bs * BatchSubmitter ) SubmissionInfoToRawKV (timestamp int64 ) types.RawKV {
184- return types.RawKV {
185- Key : bs .db .PrefixedKey (SubmissionKey ),
186- Value : dbtypes .FromInt64 (timestamp ),
187- }
188- }
189-
190169func (bs * BatchSubmitter ) ChainID () string {
191170 return bs .chainID
192171}
0 commit comments