From 727b3f9d03e6d618b008745eb069d2082fe99399 Mon Sep 17 00:00:00 2001 From: sh-cha Date: Thu, 26 Sep 2024 20:24:53 +0900 Subject: [PATCH 1/3] handle empty output & executor & disabled batch submitter --- executor/batch/batch.go | 4 ---- executor/batch/handler.go | 22 ++++++++++++---------- executor/celestia/celestia.go | 30 ++++++++++++++++++++++++++---- executor/child/withdraw.go | 3 ++- executor/executor.go | 13 +++++++++++-- executor/host/batch.go | 15 +++++++++++---- executor/host/deposit.go | 4 ++-- executor/host/host.go | 2 +- executor/types/config.go | 17 ++++++++++++----- node/node.go | 28 +++++++++++++--------------- provider/child/msgs.go | 31 ++++++++++--------------------- provider/child/query.go | 14 +++++++++++--- provider/host/msgs.go | 12 ++++++++++-- provider/host/query.go | 14 +++++++++++--- types/errors.go | 5 +++++ 15 files changed, 137 insertions(+), 77 deletions(-) create mode 100644 types/errors.go diff --git a/executor/batch/batch.go b/executor/batch/batch.go index de8bc7f..786bfdf 100644 --- a/executor/batch/batch.go +++ b/executor/batch/batch.go @@ -155,10 +155,6 @@ func (bs *BatchSubmitter) Initialize(ctx context.Context, startHeight int64, hos } func (bs *BatchSubmitter) SetDANode(da executortypes.DANode) error { - if !da.HasKey() { - return errors.New("da has no key") - } - bs.da = da return nil } diff --git a/executor/batch/handler.go b/executor/batch/handler.go index a3b150f..b3b5cde 100644 --- a/executor/batch/handler.go +++ b/executor/batch/handler.go @@ -204,12 +204,13 @@ func (bs *BatchSubmitter) finalizeBatch(ctx context.Context, blockHeight int64) msg, err := bs.da.CreateBatchMsg(headerData) if err != nil { return err + } else if msg != nil { + bs.processedMsgs = append(bs.processedMsgs, btypes.ProcessedMsgs{ + Msgs: []sdk.Msg{msg}, + Timestamp: time.Now().UnixNano(), + Save: true, + }) } - bs.processedMsgs = append(bs.processedMsgs, btypes.ProcessedMsgs{ - Msgs: []sdk.Msg{msg}, - Timestamp: time.Now().UnixNano(), - Save: true, - }) for i, chunk := range chunks { chunkData := executortypes.MarshalBatchDataChunk( @@ -222,12 +223,13 @@ func (bs *BatchSubmitter) finalizeBatch(ctx context.Context, blockHeight int64) msg, err := bs.da.CreateBatchMsg(chunkData) if err != nil { return err + } else if msg != nil { + bs.processedMsgs = append(bs.processedMsgs, btypes.ProcessedMsgs{ + Msgs: []sdk.Msg{msg}, + Timestamp: time.Now().UnixNano(), + Save: true, + }) } - bs.processedMsgs = append(bs.processedMsgs, btypes.ProcessedMsgs{ - Msgs: []sdk.Msg{msg}, - Timestamp: time.Now().UnixNano(), - Save: true, - }) } bs.logger.Info("finalize batch", diff --git a/executor/celestia/celestia.go b/executor/celestia/celestia.go index d60eaba..004a123 100644 --- a/executor/celestia/celestia.go +++ b/executor/celestia/celestia.go @@ -3,6 +3,7 @@ package celestia import ( "context" "crypto/sha256" + "errors" "go.uber.org/zap" @@ -69,9 +70,11 @@ func NewDACelestia( panic(err) } - cfg.BroadcasterConfig.KeyringConfig.Address = batchSubmitter - cfg.BroadcasterConfig.BuildTxWithMessages = c.BuildTxWithMessages - cfg.BroadcasterConfig.PendingTxToProcessedMsgs = c.PendingTxToProcessedMsgs + if cfg.BroadcasterConfig != nil { + cfg.BroadcasterConfig.KeyringConfig.Address = batchSubmitter + cfg.BroadcasterConfig.BuildTxWithMessages = c.BuildTxWithMessages + cfg.BroadcasterConfig.PendingTxToProcessedMsgs = c.PendingTxToProcessedMsgs + } node, err := node.NewNode(cfg, db, logger, appCodec, txConfig) if err != nil { @@ -141,8 +144,11 @@ func (c Celestia) GetHeight() int64 { } func (c Celestia) CreateBatchMsg(rawBlob []byte) (sdk.Msg, error) { - submitter, err := c.node.MustGetBroadcaster().GetAddressString() + submitter, err := c.GetAddressStr() if err != nil { + if errors.Is(err, types.ErrKeyNotSet) { + return nil, nil + } return nil, err } blob, err := sh.NewV0Blob(c.namespace, rawBlob) @@ -184,3 +190,19 @@ func (c Celestia) NamespaceID() []byte { chainIDhash := sha256.Sum256([]byte(c.batch.ChainID())) return chainIDhash[:10] } + +func (c Celestia) GetAddress() (sdk.AccAddress, error) { + broadcaster, err := c.node.GetBroadcaster() + if err != nil { + return nil, err + } + return broadcaster.GetAddress(), nil +} + +func (c Celestia) GetAddressStr() (string, error) { + broadcaster, err := c.node.GetBroadcaster() + if err != nil { + return "", err + } + return broadcaster.GetAddressString() +} diff --git a/executor/child/withdraw.go b/executor/child/withdraw.go index 2ea32e5..26b7e3e 100644 --- a/executor/child/withdraw.go +++ b/executor/child/withdraw.go @@ -166,8 +166,9 @@ func (ch *Child) handleOutput(blockHeight int64, version uint8, blockId []byte, ) if err != nil { return err + } else if msg != nil { + ch.AppendMsgQueue(msg) } - ch.AppendMsgQueue(msg) return nil } diff --git a/executor/executor.go b/executor/executor.go index 6b29a0f..0e0465e 100644 --- a/executor/executor.go +++ b/executor/executor.go @@ -179,10 +179,19 @@ func (ex *Executor) makeDANode(ctx context.Context, bridgeInfo opchildtypes.Brid ex.logger.Named(types.DAHostName), ex.cfg.DANode.Bech32Prefix, batchInfo.BatchInfo.Submitter, ) - if ex.host.GetAddress().Equals(da.GetAddress()) { + daAddr, err := da.GetAddress() + if err != nil && !errors.Is(err, types.ErrKeyNotSet) { + return nil, err + } + + hostAddr, err := ex.host.GetAddress() + if err == nil && hostAddr.Equals(daAddr) { return ex.host, nil + } else if err != nil && !errors.Is(err, types.ErrKeyNotSet) { + return nil, err } - err := da.InitializeDA(ctx, bridgeInfo) + + err = da.InitializeDA(ctx, bridgeInfo) return da, err case ophosttypes.BatchInfo_CHAIN_TYPE_CELESTIA: da := celestia.NewDACelestia(ex.cfg.Version, ex.cfg.DANodeConfig(ex.homePath), diff --git a/executor/host/batch.go b/executor/host/batch.go index c0b2cc8..b4009fd 100644 --- a/executor/host/batch.go +++ b/executor/host/batch.go @@ -2,21 +2,28 @@ package host import ( "context" + "errors" nodetypes "github.com/initia-labs/opinit-bots/node/types" hostprovider "github.com/initia-labs/opinit-bots/provider/host" + "github.com/initia-labs/opinit-bots/types" "go.uber.org/zap" ) func (h *Host) recordBatchHandler(_ context.Context, args nodetypes.EventHandlerArgs) error { - submitter, err := hostprovider.ParseMsgRecordBatch(args.EventAttributes) - if err != nil { - return err - } hostAddress, err := h.GetAddressStr() if err != nil { + if errors.Is(err, types.ErrKeyNotSet) { + return nil + } return nil } + + submitter, err := hostprovider.ParseMsgRecordBatch(args.EventAttributes) + if err != nil { + return err + } + if submitter != hostAddress { return nil } diff --git a/executor/host/deposit.go b/executor/host/deposit.go index 94ab88b..3e3be4c 100644 --- a/executor/host/deposit.go +++ b/executor/host/deposit.go @@ -37,9 +37,9 @@ func (h *Host) initiateDepositHandler(_ context.Context, args nodetypes.EventHan ) if err != nil { return err + } else if msg != nil { + h.AppendMsgQueue(msg) } - - h.AppendMsgQueue(msg) return nil } diff --git a/executor/host/host.go b/executor/host/host.go index a5ea223..c782d31 100644 --- a/executor/host/host.go +++ b/executor/host/host.go @@ -52,7 +52,7 @@ func NewHostV1( cfg nodetypes.NodeConfig, db types.DB, logger *zap.Logger, bech32Prefix, batchSubmitter string, ) *Host { - if batchSubmitter != "" { + if cfg.BroadcasterConfig != nil && batchSubmitter != "" { cfg.BroadcasterConfig.Bech32Prefix = bech32Prefix cfg.BroadcasterConfig.KeyringConfig.Address = batchSubmitter } diff --git a/executor/types/config.go b/executor/types/config.go index aad1676..0d2f144 100644 --- a/executor/types/config.go +++ b/executor/types/config.go @@ -56,6 +56,10 @@ type Config struct { // If you don't want to use the bridge executor feature, you can leave it empty. BridgeExecutor string `json:"bridge_executor"` + // EnableBatchSubmitter is the flag to enable the batch submitter. + // If it is false, the batch submitter will not be started. + EnableBatchSubmitter bool `json:"enable_batch_submitter"` + // MaxChunks is the maximum number of chunks in a batch. MaxChunks int64 `json:"max_chunks"` // MaxChunkSize is the maximum size of a chunk in a batch. @@ -105,8 +109,9 @@ func DefaultConfig() *Config { TxTimeout: 60, }, - OutputSubmitter: "", - BridgeExecutor: "", + OutputSubmitter: "", + BridgeExecutor: "", + EnableBatchSubmitter: true, MaxChunks: 5000, MaxChunkSize: 300000, // 300KB @@ -214,7 +219,10 @@ func (cfg Config) DANodeConfig(homePath string) nodetypes.NodeConfig { nc := nodetypes.NodeConfig{ RPC: cfg.DANode.RPCAddress, ProcessType: nodetypes.PROCESS_TYPE_ONLY_BROADCAST, - BroadcasterConfig: &btypes.BroadcasterConfig{ + } + + if cfg.EnableBatchSubmitter { + nc.BroadcasterConfig = &btypes.BroadcasterConfig{ ChainID: cfg.DANode.ChainID, GasPrice: cfg.DANode.GasPrice, GasAdjustment: cfg.DANode.GasAdjustment, @@ -223,9 +231,8 @@ func (cfg Config) DANodeConfig(homePath string) nodetypes.NodeConfig { KeyringConfig: btypes.KeyringConfig{ HomePath: homePath, }, - }, + } } - return nc } diff --git a/node/node.go b/node/node.go index d7c6cd8..4bc213c 100644 --- a/node/node.go +++ b/node/node.go @@ -134,21 +134,19 @@ func (n *Node) Start(ctx context.Context) { } if n.cfg.ProcessType == nodetypes.PROCESS_TYPE_ONLY_BROADCAST { - if n.broadcaster == nil { - panic("broadcaster cannot be nil with nodetypes.PROCESS_TYPE_ONLY_BROADCAST") + if n.broadcaster != nil { + errGrp.Go(func() (err error) { + defer func() { + n.logger.Info("tx checker looper stopped") + if r := recover(); r != nil { + n.logger.Error("tx checker panic", zap.Any("recover", r)) + err = fmt.Errorf("tx checker panic: %v", r) + } + }() + + return n.txChecker(ctx) + }) } - - errGrp.Go(func() (err error) { - defer func() { - n.logger.Info("tx checker looper stopped") - if r := recover(); r != nil { - n.logger.Error("tx checker panic", zap.Any("recover", r)) - err = fmt.Errorf("tx checker panic: %v", r) - } - }() - - return n.txChecker(ctx) - }) } else { errGrp.Go(func() (err error) { defer func() { @@ -182,7 +180,7 @@ func (n Node) HasBroadcaster() bool { func (n Node) GetBroadcaster() (*broadcaster.Broadcaster, error) { if n.broadcaster == nil { - return nil, errors.New("cannot get broadcaster without broadcaster") + return nil, types.ErrKeyNotSet } return n.broadcaster, nil diff --git a/provider/child/msgs.go b/provider/child/msgs.go index 6e2aa52..85f9ad1 100644 --- a/provider/child/msgs.go +++ b/provider/child/msgs.go @@ -1,31 +1,14 @@ package child import ( + "errors" + opchildtypes "github.com/initia-labs/OPinit/x/opchild/types" "github.com/initia-labs/opinit-bots/types" sdk "github.com/cosmos/cosmos-sdk/types" ) -func (b BaseChild) GetMsgSetBridgeInfo( - bridgeInfo opchildtypes.BridgeInfo, -) (sdk.Msg, error) { - sender, err := b.node.MustGetBroadcaster().GetAddressString() - if err != nil { - return nil, err - } - - msg := opchildtypes.NewMsgSetBridgeInfo( - sender, - bridgeInfo, - ) - err = msg.Validate(b.node.AccountCodec()) - if err != nil { - return nil, err - } - return msg, nil -} - func (b BaseChild) GetMsgFinalizeTokenDeposit( from string, to string, @@ -35,8 +18,11 @@ func (b BaseChild) GetMsgFinalizeTokenDeposit( l1Denom string, data []byte, ) (sdk.Msg, error) { - sender, err := b.node.MustGetBroadcaster().GetAddressString() + sender, err := b.GetAddressStr() if err != nil { + if errors.Is(err, types.ErrKeyNotSet) { + return nil, nil + } return nil, err } @@ -61,8 +47,11 @@ func (b BaseChild) GetMsgUpdateOracle( height int64, data []byte, ) (sdk.Msg, error) { - sender, err := b.node.MustGetBroadcaster().GetAddressString() + sender, err := b.GetAddressStr() if err != nil { + if errors.Is(err, types.ErrKeyNotSet) { + return nil, nil + } return nil, err } diff --git a/provider/child/query.go b/provider/child/query.go index 135c0ee..e829de7 100644 --- a/provider/child/query.go +++ b/provider/child/query.go @@ -10,12 +10,20 @@ import ( "github.com/initia-labs/opinit-bots/node/rpcclient" ) -func (b BaseChild) GetAddress() sdk.AccAddress { - return b.node.MustGetBroadcaster().GetAddress() +func (b BaseChild) GetAddress() (sdk.AccAddress, error) { + broadcaster, err := b.node.GetBroadcaster() + if err != nil { + return nil, err + } + return broadcaster.GetAddress(), nil } func (b BaseChild) GetAddressStr() (string, error) { - return b.node.MustGetBroadcaster().GetAddressString() + broadcaster, err := b.node.GetBroadcaster() + if err != nil { + return "", err + } + return broadcaster.GetAddressString() } func (b BaseChild) QueryBridgeInfo(ctx context.Context) (opchildtypes.BridgeInfo, error) { diff --git a/provider/host/msgs.go b/provider/host/msgs.go index fcafd1c..8604a93 100644 --- a/provider/host/msgs.go +++ b/provider/host/msgs.go @@ -1,6 +1,8 @@ package host import ( + "errors" + ophosttypes "github.com/initia-labs/OPinit/x/ophost/types" "github.com/initia-labs/opinit-bots/types" @@ -13,8 +15,11 @@ func (b BaseHost) GetMsgProposeOutput( l2BlockNumber int64, outputRoot []byte, ) (sdk.Msg, error) { - sender, err := b.node.MustGetBroadcaster().GetAddressString() + sender, err := b.GetAddressStr() if err != nil { + if errors.Is(err, types.ErrKeyNotSet) { + return nil, nil + } return nil, err } @@ -33,8 +38,11 @@ func (b BaseHost) GetMsgProposeOutput( } func (b BaseHost) CreateBatchMsg(batchBytes []byte) (sdk.Msg, error) { - submitter, err := b.node.MustGetBroadcaster().GetAddressString() + submitter, err := b.GetAddressStr() if err != nil { + if errors.Is(err, types.ErrKeyNotSet) { + return nil, nil + } return nil, err } diff --git a/provider/host/query.go b/provider/host/query.go index c653ba3..109d6d0 100644 --- a/provider/host/query.go +++ b/provider/host/query.go @@ -17,12 +17,20 @@ import ( "github.com/initia-labs/opinit-bots/types" ) -func (b BaseHost) GetAddress() sdk.AccAddress { - return b.node.MustGetBroadcaster().GetAddress() +func (b BaseHost) GetAddress() (sdk.AccAddress, error) { + broadcaster, err := b.node.GetBroadcaster() + if err != nil { + return nil, err + } + return broadcaster.GetAddress(), nil } func (b BaseHost) GetAddressStr() (string, error) { - return b.node.MustGetBroadcaster().GetAddressString() + broadcaster, err := b.node.GetBroadcaster() + if err != nil { + return "", err + } + return broadcaster.GetAddressString() } func (b BaseHost) QueryBridgeConfig(ctx context.Context, bridgeId uint64) (*ophosttypes.QueryBridgeResponse, error) { diff --git a/types/errors.go b/types/errors.go new file mode 100644 index 0000000..d8d8ee1 --- /dev/null +++ b/types/errors.go @@ -0,0 +1,5 @@ +package types + +import "errors" + +var ErrKeyNotSet = errors.New("key not set") From 0909e6f6b994c8a5d73719d3c6e4a0e8f894aa53 Mon Sep 17 00:00:00 2001 From: sh-cha Date: Fri, 27 Sep 2024 11:58:55 +0900 Subject: [PATCH 2/3] add noop da --- executor/batch/batch.go | 3 +-- executor/batch/noop_da.go | 29 +++++++++++++++++++++++++++++ executor/executor.go | 37 ++++++++++++++++++++----------------- 3 files changed, 50 insertions(+), 19 deletions(-) create mode 100644 executor/batch/noop_da.go diff --git a/executor/batch/batch.go b/executor/batch/batch.go index 786bfdf..955d113 100644 --- a/executor/batch/batch.go +++ b/executor/batch/batch.go @@ -154,9 +154,8 @@ func (bs *BatchSubmitter) Initialize(ctx context.Context, startHeight int64, hos return nil } -func (bs *BatchSubmitter) SetDANode(da executortypes.DANode) error { +func (bs *BatchSubmitter) SetDANode(da executortypes.DANode) { bs.da = da - return nil } func (bs *BatchSubmitter) Start(ctx context.Context) { diff --git a/executor/batch/noop_da.go b/executor/batch/noop_da.go new file mode 100644 index 0000000..49f3ecb --- /dev/null +++ b/executor/batch/noop_da.go @@ -0,0 +1,29 @@ +package batch + +import ( + "context" + + sdk "github.com/cosmos/cosmos-sdk/types" + executortypes "github.com/initia-labs/opinit-bots/executor/types" + btypes "github.com/initia-labs/opinit-bots/node/broadcaster/types" + nodetypes "github.com/initia-labs/opinit-bots/node/types" + "github.com/initia-labs/opinit-bots/types" +) + +var _ executortypes.DANode = &NoopDA{} + +type NoopDA struct { +} + +func NewNoopDA() *NoopDA { + return &NoopDA{} +} + +func (n NoopDA) Start(_ context.Context) {} +func (n NoopDA) HasKey() bool { return false } +func (n NoopDA) CreateBatchMsg(_ []byte) (sdk.Msg, error) { return nil, nil } +func (n NoopDA) BroadcastMsgs(nil btypes.ProcessedMsgs) {} +func (n NoopDA) ProcessedMsgsToRawKV(_ []btypes.ProcessedMsgs, _ bool) ([]types.RawKV, error) { + return nil, nil +} +func (n NoopDA) GetNodeStatus() nodetypes.Status { return nodetypes.Status{} } diff --git a/executor/executor.go b/executor/executor.go index 0e0465e..d733144 100644 --- a/executor/executor.go +++ b/executor/executor.go @@ -111,11 +111,7 @@ func (ex *Executor) Initialize(ctx context.Context) error { if err != nil { return err } - err = ex.batch.SetDANode(da) - if err != nil { - return err - } - + ex.batch.SetDANode(da) ex.RegisterQuerier() return nil } @@ -170,41 +166,48 @@ func (ex *Executor) RegisterQuerier() { } func (ex *Executor) makeDANode(ctx context.Context, bridgeInfo opchildtypes.BridgeInfo) (executortypes.DANode, error) { + if !ex.cfg.EnableBatchSubmitter { + return batch.NewNoopDA(), nil + } + batchInfo := ex.batch.BatchInfo() switch batchInfo.BatchInfo.ChainType { case ophosttypes.BatchInfo_CHAIN_TYPE_INITIA: - da := host.NewHostV1( + hostda := host.NewHostV1( ex.cfg.DANodeConfig(ex.homePath), ex.db.WithPrefix([]byte(types.DAHostName)), ex.logger.Named(types.DAHostName), ex.cfg.DANode.Bech32Prefix, batchInfo.BatchInfo.Submitter, ) - daAddr, err := da.GetAddress() - if err != nil && !errors.Is(err, types.ErrKeyNotSet) { + + // should exist + daAddr, err := hostda.GetAddress() + if err != nil { return nil, err } + // might not exist hostAddr, err := ex.host.GetAddress() - if err == nil && hostAddr.Equals(daAddr) { - return ex.host, nil - } else if err != nil && !errors.Is(err, types.ErrKeyNotSet) { + if err != nil && !errors.Is(err, types.ErrKeyNotSet) { return nil, err + } else if err == nil && hostAddr.Equals(daAddr) { + return ex.host, nil } - err = da.InitializeDA(ctx, bridgeInfo) - return da, err + err = hostda.InitializeDA(ctx, bridgeInfo) + return hostda, err case ophosttypes.BatchInfo_CHAIN_TYPE_CELESTIA: - da := celestia.NewDACelestia(ex.cfg.Version, ex.cfg.DANodeConfig(ex.homePath), + celestiada := celestia.NewDACelestia(ex.cfg.Version, ex.cfg.DANodeConfig(ex.homePath), ex.db.WithPrefix([]byte(types.DACelestiaName)), ex.logger.Named(types.DACelestiaName), ex.cfg.DANode.Bech32Prefix, batchInfo.BatchInfo.Submitter, ) - err := da.Initialize(ctx, ex.batch, bridgeInfo.BridgeId) + err := celestiada.Initialize(ctx, ex.batch, bridgeInfo.BridgeId) if err != nil { return nil, err } - da.RegisterDAHandlers() - return da, nil + celestiada.RegisterDAHandlers() + return celestiada, nil } return nil, fmt.Errorf("unsupported chain id for DA: %s", ophosttypes.BatchInfo_ChainType_name[int32(batchInfo.BatchInfo.ChainType)]) From 754876591c66b1de130696f8d1d55db2b3b73afa Mon Sep 17 00:00:00 2001 From: sh-cha Date: Fri, 27 Sep 2024 13:16:27 +0900 Subject: [PATCH 3/3] check empty broadcaster --- node/node.go | 26 ++++++++++++++------------ 1 file changed, 14 insertions(+), 12 deletions(-) diff --git a/node/node.go b/node/node.go index 4bc213c..b346b45 100644 --- a/node/node.go +++ b/node/node.go @@ -134,19 +134,21 @@ func (n *Node) Start(ctx context.Context) { } if n.cfg.ProcessType == nodetypes.PROCESS_TYPE_ONLY_BROADCAST { - if n.broadcaster != nil { - errGrp.Go(func() (err error) { - defer func() { - n.logger.Info("tx checker looper stopped") - if r := recover(); r != nil { - n.logger.Error("tx checker panic", zap.Any("recover", r)) - err = fmt.Errorf("tx checker panic: %v", r) - } - }() - - return n.txChecker(ctx) - }) + if n.broadcaster == nil { + panic("broadcaster cannot be nil with nodetypes.PROCESS_TYPE_ONLY_BROADCAST") } + + errGrp.Go(func() (err error) { + defer func() { + n.logger.Info("tx checker looper stopped") + if r := recover(); r != nil { + n.logger.Error("tx checker panic", zap.Any("recover", r)) + err = fmt.Errorf("tx checker panic: %v", r) + } + }() + + return n.txChecker(ctx) + }) } else { errGrp.Go(func() (err error) { defer func() {