Skip to content

Commit 1078fb3

Browse files
committed
enable to start l1, l2, batch at specific height
1 parent 048b675 commit 1078fb3

File tree

10 files changed

+125
-27
lines changed

10 files changed

+125
-27
lines changed

executor/batch/batch.go

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,11 @@ func NewBatchSubmitter(
104104
return ch
105105
}
106106

107-
func (bs *BatchSubmitter) Initialize(host hostNode, bridgeInfo opchildtypes.BridgeInfo) error {
107+
func (bs *BatchSubmitter) Initialize(startHeight uint64, host hostNode, bridgeInfo opchildtypes.BridgeInfo) error {
108+
err := bs.node.Initialize(startHeight)
109+
if err != nil {
110+
return err
111+
}
108112
bs.host = host
109113
bs.bridgeInfo = bridgeInfo
110114

@@ -123,7 +127,13 @@ func (bs *BatchSubmitter) Initialize(host hostNode, bridgeInfo opchildtypes.Brid
123127
bs.DequeueBatchInfo()
124128
}
125129

126-
bs.batchFile, err = os.OpenFile(bs.homePath+"/batch", os.O_CREATE|os.O_APPEND|os.O_RDWR, 0666)
130+
fileFlag := os.O_CREATE | os.O_RDWR
131+
// if the node has already processed blocks, append to the file
132+
if bs.node.GetHeight()-1 != startHeight {
133+
fileFlag |= os.O_APPEND
134+
}
135+
136+
bs.batchFile, err = os.OpenFile(bs.homePath+"/batch", fileFlag, 0666)
127137
if err != nil {
128138
return err
129139
}

executor/batch/handler.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -256,6 +256,11 @@ func (bs *BatchSubmitter) UpdateBatchInfo(chain string, submitter string, output
256256
bs.batchInfoMu.Lock()
257257
defer bs.batchInfoMu.Unlock()
258258

259+
// check if the batch info is already updated
260+
if bs.batchInfos[len(bs.batchInfos)-1].Output.L2BlockNumber >= l2BlockNumber {
261+
return
262+
}
263+
259264
bs.batchInfos = append(bs.batchInfos, ophosttypes.BatchInfoWithOutput{
260265
BatchInfo: ophosttypes.BatchInfo{
261266
ChainType: ophosttypes.BatchInfo_ChainType(ophosttypes.BatchInfo_ChainType_value["CHAIN_TYPE_"+chain]),

executor/celestia/celestia.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -93,9 +93,13 @@ func createCodec(bech32Prefix string) (codec.Codec, client.TxConfig, error) {
9393
}
9494

9595
func (c *Celestia) Initialize(batch batchNode, bridgeId int64) error {
96+
err := c.node.Initialize(0)
97+
if err != nil {
98+
return err
99+
}
100+
96101
c.batch = batch
97102
c.bridgeId = bridgeId
98-
var err error
99103
c.namespace, err = sh.NewV0Namespace(c.NamespaceID())
100104
if err != nil {
101105
return err

executor/child/child.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,11 @@ func GetCodec(bech32Prefix string) (codec.Codec, client.TxConfig, error) {
109109
})
110110
}
111111

112-
func (ch *Child) Initialize(host hostNode, bridgeInfo opchildtypes.BridgeInfo) error {
112+
func (ch *Child) Initialize(startHeight uint64, host hostNode, bridgeInfo opchildtypes.BridgeInfo) error {
113+
err := ch.node.Initialize(startHeight)
114+
if err != nil {
115+
return err
116+
}
113117
ch.host = host
114118
ch.bridgeInfo = bridgeInfo
115119

executor/executor.go

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -87,15 +87,20 @@ func NewExecutor(cfg *executortypes.Config, db types.DB, sv *server.Server, logg
8787
zap.Duration("submission_interval", bridgeInfo.BridgeConfig.SubmissionInterval),
8888
)
8989

90-
err = executor.host.Initialize(executor.child, executor.batch, int64(bridgeInfo.BridgeId))
90+
hostStartHeight, childStartHeight, batchStartHeight, err := executor.getStartHeights()
9191
if err != nil {
9292
panic(err)
9393
}
94-
err = executor.child.Initialize(executor.host, bridgeInfo)
94+
95+
err = executor.host.Initialize(hostStartHeight, executor.child, executor.batch, int64(bridgeInfo.BridgeId))
96+
if err != nil {
97+
panic(err)
98+
}
99+
err = executor.child.Initialize(childStartHeight, executor.host, bridgeInfo)
95100
if err != nil {
96101
panic(err)
97102
}
98-
err = executor.batch.Initialize(executor.host, bridgeInfo)
103+
err = executor.batch.Initialize(batchStartHeight, executor.host, bridgeInfo)
99104
if err != nil {
100105
panic(err)
101106
}
@@ -201,3 +206,10 @@ func (ex *Executor) makeDANode(bridgeId int64) (executortypes.DANode, error) {
201206

202207
return nil, fmt.Errorf("unsupported chain id for DA: %s", ophosttypes.BatchInfo_ChainType_name[int32(batchInfo.BatchInfo.ChainType)])
203208
}
209+
210+
func (ex *Executor) getStartHeights() (l1StartHeight uint64, l2StartHeight uint64, batchStartHeight uint64, err error) {
211+
if ex.cfg.L2StartHeight != 0 {
212+
l1StartHeight, l2StartHeight, err = ex.host.QueryHeightsOfOutputTxWithL2BlockNumber(uint64(ex.cfg.L2StartHeight))
213+
}
214+
return l1StartHeight, l2StartHeight, uint64(ex.cfg.BatchStartHeight), err
215+
}

executor/host/host.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -111,7 +111,11 @@ func GetCodec(bech32Prefix string) (codec.Codec, client.TxConfig, error) {
111111
})
112112
}
113113

114-
func (h *Host) Initialize(child childNode, batch batchNode, bridgeId int64) (err error) {
114+
func (h *Host) Initialize(startHeight uint64, child childNode, batch batchNode, bridgeId int64) error {
115+
err := h.node.Initialize(startHeight)
116+
if err != nil {
117+
return err
118+
}
115119
h.child = child
116120
h.batch = batch
117121
h.bridgeId = bridgeId

executor/host/query.go

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,9 @@
11
package host
22

33
import (
4+
"fmt"
5+
"strconv"
6+
47
sdk "github.com/cosmos/cosmos-sdk/types"
58
query "github.com/cosmos/cosmos-sdk/types/query"
69

@@ -57,3 +60,46 @@ func (h Host) QueryBatchInfos() (*ophosttypes.QueryBatchInfosResponse, error) {
5760
defer cancel()
5861
return h.ophostQueryClient.BatchInfos(ctx, req)
5962
}
63+
64+
func (h Host) QueryHeightsOfOutputTxWithL2BlockNumber(l2BlockNumber uint64) (uint64, uint64, error) {
65+
ctx, cancel := rpcclient.GetQueryContext(0)
66+
defer cancel()
67+
68+
query := fmt.Sprintf("%s.%s = %d AND %s.%s <= %d", ophosttypes.EventTypeProposeOutput,
69+
ophosttypes.AttributeKeyBridgeId,
70+
h.bridgeId,
71+
ophosttypes.EventTypeProposeOutput,
72+
ophosttypes.AttributeKeyL2BlockNumber,
73+
l2BlockNumber,
74+
)
75+
76+
perPage := 1
77+
res, err := h.node.GetRPCClient().TxSearch(ctx, query, false, nil, &perPage, "desc")
78+
if err != nil {
79+
return 0, 0, err
80+
}
81+
if len(res.Txs) == 0 {
82+
// no output tx found
83+
return 0, 0, nil
84+
}
85+
86+
l2StartHeight := uint64(0)
87+
LOOP:
88+
for _, event := range res.Txs[0].TxResult.Events {
89+
if event.Type == ophosttypes.EventTypeProposeOutput {
90+
for _, attr := range event.Attributes {
91+
if attr.Key == ophosttypes.AttributeKeyL2BlockNumber {
92+
l2StartHeight, err = strconv.ParseUint(attr.Value, 10, 64)
93+
if err != nil {
94+
return 0, 0, err
95+
}
96+
break LOOP
97+
}
98+
}
99+
}
100+
}
101+
if l2StartHeight == 0 {
102+
return 0, 0, fmt.Errorf("something wrong: l2 block number not found in the output tx")
103+
}
104+
return uint64(res.Txs[0].Height), l2StartHeight, nil
105+
}

executor/types/config.go

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,15 @@ type Config struct {
5151
MaxChunkSize int64 `json:"max_chunk_size"`
5252
// MaxSubmissionTime is the maximum time to submit a batch.
5353
MaxSubmissionTime int64 `json:"max_submission_time"` // seconds
54+
55+
// L2StartHeight is the height to start the l2 node. If it is 0, it will start from the latest height.
56+
// If the latest height stored in the db is not 0, this config is ignored.
57+
// L2 starts from the last submitted output l2 block number + 1 before L2StartHeight.
58+
// L1 starts from the block number of the output tx + 1
59+
L2StartHeight int64 `json:"l2_start_height"`
60+
// StartBatchHeight is the height to start the batch. If it is 0, it will start from the latest height.
61+
// If the latest height stored in the db is not 0, this config is ignored.
62+
BatchStartHeight int64 `json:"batch_start_height"`
5463
}
5564

5665
type HostConfig struct {
@@ -73,7 +82,7 @@ func DefaultConfig() *Config {
7382

7483
L1ChainID: "testnet-l1-1",
7584
L2ChainID: "testnet-l2-1",
76-
DAChainID: "testnet-l3-1",
85+
DAChainID: "testnet-da-1",
7786

7887
L1Bech32Prefix: "init",
7988
L2Bech32Prefix: "init",
@@ -87,6 +96,9 @@ func DefaultConfig() *Config {
8796
MaxChunks: 5000,
8897
MaxChunkSize: 300000, // 300KB
8998
MaxSubmissionTime: 60 * 60, // 1 hour
99+
100+
L2StartHeight: 0,
101+
BatchStartHeight: 0,
90102
}
91103
}
92104

@@ -112,8 +124,9 @@ func (cfg Config) Validate() error {
112124
return errors.New("L2 chain ID is required")
113125
}
114126
if cfg.DAChainID == "" {
115-
return errors.New("L2 RPC URL is required")
127+
return errors.New("DA chain ID is required")
116128
}
129+
117130
if cfg.ListenAddress == "" {
118131
return errors.New("listen address is required")
119132
}

node/db.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,9 +14,10 @@ func (n *Node) SetSyncInfo(height uint64) {
1414
}
1515
}
1616

17-
func (n *Node) loadSyncInfo() error {
17+
func (n *Node) loadSyncInfo(startHeight uint64) error {
1818
data, err := n.db.Get(nodetypes.LastProcessedBlockHeightKey)
1919
if err == dbtypes.ErrNotFound {
20+
n.SetSyncInfo(startHeight)
2021
return nil
2122
} else if err != nil {
2223
return err

node/node.go

Lines changed: 15 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -62,39 +62,38 @@ func NewNode(cfg nodetypes.NodeConfig, db types.DB, logger *zap.Logger, cdc code
6262
cdc: cdc,
6363
txConfig: txConfig,
6464
}
65-
6665
// check if node is catching up
67-
status, err := rpcClient.Status(context.Background())
66+
status, err := n.rpcClient.Status(context.Background())
6867
if err != nil {
6968
return nil, err
7069
}
7170
if status.SyncInfo.CatchingUp {
7271
return nil, errors.New("node is catching up")
7372
}
74-
7573
// create broadcaster
76-
if cfg.BroadcasterConfig != nil {
74+
if n.cfg.BroadcasterConfig != nil {
7775
n.broadcaster, err = broadcaster.NewBroadcaster(
78-
*cfg.BroadcasterConfig,
79-
db,
80-
logger,
81-
cdc,
82-
txConfig,
83-
rpcClient,
76+
*n.cfg.BroadcasterConfig,
77+
n.db,
78+
n.logger,
79+
n.cdc,
80+
n.txConfig,
81+
n.rpcClient,
8482
status,
8583
)
8684
if err != nil {
8785
return nil, errors.Wrap(err, "failed to create broadcaster")
8886
}
8987
}
88+
return n, nil
89+
}
9090

91+
// StartHeight is the height to start processing.
92+
// If it is 0, the latest height is used.
93+
// If the latest height exists in the database, this is ignored.
94+
func (n *Node) Initialize(startHeight uint64) error {
9195
// load sync info
92-
err = n.loadSyncInfo()
93-
if err != nil {
94-
return nil, err
95-
}
96-
97-
return n, nil
96+
return n.loadSyncInfo(startHeight)
9897
}
9998

10099
func (n *Node) Start(ctx context.Context) {

0 commit comments

Comments
 (0)