Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

handle empty output & executor & disabled batch submitter #23

Merged
merged 3 commits into from
Sep 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 1 addition & 6 deletions executor/batch/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,13 +154,8 @@ func (bs *BatchSubmitter) Initialize(ctx context.Context, startHeight int64, hos
return nil
}

func (bs *BatchSubmitter) SetDANode(da executortypes.DANode) error {
if !da.HasKey() {
return errors.New("da has no key")
}

func (bs *BatchSubmitter) SetDANode(da executortypes.DANode) {
bs.da = da
return nil
}

func (bs *BatchSubmitter) Start(ctx context.Context) {
Expand Down
22 changes: 12 additions & 10 deletions executor/batch/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,12 +204,13 @@ func (bs *BatchSubmitter) finalizeBatch(ctx context.Context, blockHeight int64)
msg, err := bs.da.CreateBatchMsg(headerData)
if err != nil {
return err
} else if msg != nil {
bs.processedMsgs = append(bs.processedMsgs, btypes.ProcessedMsgs{
Msgs: []sdk.Msg{msg},
Timestamp: time.Now().UnixNano(),
Save: true,
})
}
bs.processedMsgs = append(bs.processedMsgs, btypes.ProcessedMsgs{
Msgs: []sdk.Msg{msg},
Timestamp: time.Now().UnixNano(),
Save: true,
})

for i, chunk := range chunks {
chunkData := executortypes.MarshalBatchDataChunk(
Expand All @@ -222,12 +223,13 @@ func (bs *BatchSubmitter) finalizeBatch(ctx context.Context, blockHeight int64)
msg, err := bs.da.CreateBatchMsg(chunkData)
if err != nil {
return err
} else if msg != nil {
bs.processedMsgs = append(bs.processedMsgs, btypes.ProcessedMsgs{
Msgs: []sdk.Msg{msg},
Timestamp: time.Now().UnixNano(),
Save: true,
})
}
bs.processedMsgs = append(bs.processedMsgs, btypes.ProcessedMsgs{
Msgs: []sdk.Msg{msg},
Timestamp: time.Now().UnixNano(),
Save: true,
})
}

bs.logger.Info("finalize batch",
Expand Down
29 changes: 29 additions & 0 deletions executor/batch/noop_da.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package batch

import (
"context"

sdk "github.com/cosmos/cosmos-sdk/types"
executortypes "github.com/initia-labs/opinit-bots/executor/types"
btypes "github.com/initia-labs/opinit-bots/node/broadcaster/types"
nodetypes "github.com/initia-labs/opinit-bots/node/types"
"github.com/initia-labs/opinit-bots/types"
)

var _ executortypes.DANode = &NoopDA{}

type NoopDA struct {
}

func NewNoopDA() *NoopDA {
return &NoopDA{}
}

func (n NoopDA) Start(_ context.Context) {}
func (n NoopDA) HasKey() bool { return false }
func (n NoopDA) CreateBatchMsg(_ []byte) (sdk.Msg, error) { return nil, nil }
func (n NoopDA) BroadcastMsgs(nil btypes.ProcessedMsgs) {}
func (n NoopDA) ProcessedMsgsToRawKV(_ []btypes.ProcessedMsgs, _ bool) ([]types.RawKV, error) {
return nil, nil
}
func (n NoopDA) GetNodeStatus() nodetypes.Status { return nodetypes.Status{} }
30 changes: 26 additions & 4 deletions executor/celestia/celestia.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package celestia
import (
"context"
"crypto/sha256"
"errors"

"go.uber.org/zap"

Expand Down Expand Up @@ -69,9 +70,11 @@ func NewDACelestia(
panic(err)
}

cfg.BroadcasterConfig.KeyringConfig.Address = batchSubmitter
cfg.BroadcasterConfig.BuildTxWithMessages = c.BuildTxWithMessages
cfg.BroadcasterConfig.PendingTxToProcessedMsgs = c.PendingTxToProcessedMsgs
if cfg.BroadcasterConfig != nil {
cfg.BroadcasterConfig.KeyringConfig.Address = batchSubmitter
cfg.BroadcasterConfig.BuildTxWithMessages = c.BuildTxWithMessages
cfg.BroadcasterConfig.PendingTxToProcessedMsgs = c.PendingTxToProcessedMsgs
}

node, err := node.NewNode(cfg, db, logger, appCodec, txConfig)
if err != nil {
Expand Down Expand Up @@ -141,8 +144,11 @@ func (c Celestia) GetHeight() int64 {
}

func (c Celestia) CreateBatchMsg(rawBlob []byte) (sdk.Msg, error) {
submitter, err := c.node.MustGetBroadcaster().GetAddressString()
submitter, err := c.GetAddressStr()
if err != nil {
if errors.Is(err, types.ErrKeyNotSet) {
return nil, nil
}
return nil, err
}
blob, err := sh.NewV0Blob(c.namespace, rawBlob)
Expand Down Expand Up @@ -184,3 +190,19 @@ func (c Celestia) NamespaceID() []byte {
chainIDhash := sha256.Sum256([]byte(c.batch.ChainID()))
return chainIDhash[:10]
}

func (c Celestia) GetAddress() (sdk.AccAddress, error) {
broadcaster, err := c.node.GetBroadcaster()
if err != nil {
return nil, err
}
return broadcaster.GetAddress(), nil
}

func (c Celestia) GetAddressStr() (string, error) {
broadcaster, err := c.node.GetBroadcaster()
if err != nil {
return "", err
}
return broadcaster.GetAddressString()
}
3 changes: 2 additions & 1 deletion executor/child/withdraw.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,8 +166,9 @@ func (ch *Child) handleOutput(blockHeight int64, version uint8, blockId []byte,
)
if err != nil {
return err
} else if msg != nil {
ch.AppendMsgQueue(msg)
}
ch.AppendMsgQueue(msg)
return nil
}

Expand Down
38 changes: 25 additions & 13 deletions executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,11 +111,7 @@ func (ex *Executor) Initialize(ctx context.Context) error {
if err != nil {
return err
}
err = ex.batch.SetDANode(da)
if err != nil {
return err
}

ex.batch.SetDANode(da)
ex.RegisterQuerier()
return nil
}
Expand Down Expand Up @@ -170,32 +166,48 @@ func (ex *Executor) RegisterQuerier() {
}

func (ex *Executor) makeDANode(ctx context.Context, bridgeInfo opchildtypes.BridgeInfo) (executortypes.DANode, error) {
if !ex.cfg.EnableBatchSubmitter {
return batch.NewNoopDA(), nil
}

batchInfo := ex.batch.BatchInfo()
switch batchInfo.BatchInfo.ChainType {
case ophosttypes.BatchInfo_CHAIN_TYPE_INITIA:
da := host.NewHostV1(
hostda := host.NewHostV1(
ex.cfg.DANodeConfig(ex.homePath),
ex.db.WithPrefix([]byte(types.DAHostName)),
ex.logger.Named(types.DAHostName),
ex.cfg.DANode.Bech32Prefix, batchInfo.BatchInfo.Submitter,
)
if ex.host.GetAddress().Equals(da.GetAddress()) {

// should exist
daAddr, err := hostda.GetAddress()
if err != nil {
return nil, err
}

// might not exist
hostAddr, err := ex.host.GetAddress()
if err != nil && !errors.Is(err, types.ErrKeyNotSet) {
return nil, err
} else if err == nil && hostAddr.Equals(daAddr) {
return ex.host, nil
}
err := da.InitializeDA(ctx, bridgeInfo)
return da, err

err = hostda.InitializeDA(ctx, bridgeInfo)
return hostda, err
case ophosttypes.BatchInfo_CHAIN_TYPE_CELESTIA:
da := celestia.NewDACelestia(ex.cfg.Version, ex.cfg.DANodeConfig(ex.homePath),
celestiada := celestia.NewDACelestia(ex.cfg.Version, ex.cfg.DANodeConfig(ex.homePath),
ex.db.WithPrefix([]byte(types.DACelestiaName)),
ex.logger.Named(types.DACelestiaName),
ex.cfg.DANode.Bech32Prefix, batchInfo.BatchInfo.Submitter,
)
err := da.Initialize(ctx, ex.batch, bridgeInfo.BridgeId)
err := celestiada.Initialize(ctx, ex.batch, bridgeInfo.BridgeId)
if err != nil {
return nil, err
}
da.RegisterDAHandlers()
return da, nil
celestiada.RegisterDAHandlers()
return celestiada, nil
}

return nil, fmt.Errorf("unsupported chain id for DA: %s", ophosttypes.BatchInfo_ChainType_name[int32(batchInfo.BatchInfo.ChainType)])
Expand Down
15 changes: 11 additions & 4 deletions executor/host/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,28 @@ package host

import (
"context"
"errors"

nodetypes "github.com/initia-labs/opinit-bots/node/types"
hostprovider "github.com/initia-labs/opinit-bots/provider/host"
"github.com/initia-labs/opinit-bots/types"
"go.uber.org/zap"
)

func (h *Host) recordBatchHandler(_ context.Context, args nodetypes.EventHandlerArgs) error {
submitter, err := hostprovider.ParseMsgRecordBatch(args.EventAttributes)
if err != nil {
return err
}
hostAddress, err := h.GetAddressStr()
if err != nil {
if errors.Is(err, types.ErrKeyNotSet) {
return nil
}
return nil
}

submitter, err := hostprovider.ParseMsgRecordBatch(args.EventAttributes)
if err != nil {
return err
}

if submitter != hostAddress {
return nil
}
Expand Down
4 changes: 2 additions & 2 deletions executor/host/deposit.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,9 @@ func (h *Host) initiateDepositHandler(_ context.Context, args nodetypes.EventHan
)
if err != nil {
return err
} else if msg != nil {
h.AppendMsgQueue(msg)
}

h.AppendMsgQueue(msg)
return nil
}

Expand Down
2 changes: 1 addition & 1 deletion executor/host/host.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func NewHostV1(
cfg nodetypes.NodeConfig,
db types.DB, logger *zap.Logger, bech32Prefix, batchSubmitter string,
) *Host {
if batchSubmitter != "" {
if cfg.BroadcasterConfig != nil && batchSubmitter != "" {
cfg.BroadcasterConfig.Bech32Prefix = bech32Prefix
cfg.BroadcasterConfig.KeyringConfig.Address = batchSubmitter
}
Expand Down
17 changes: 12 additions & 5 deletions executor/types/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,10 @@ type Config struct {
// If you don't want to use the bridge executor feature, you can leave it empty.
BridgeExecutor string `json:"bridge_executor"`

// EnableBatchSubmitter is the flag to enable the batch submitter.
// If it is false, the batch submitter will not be started.
EnableBatchSubmitter bool `json:"enable_batch_submitter"`

// MaxChunks is the maximum number of chunks in a batch.
MaxChunks int64 `json:"max_chunks"`
// MaxChunkSize is the maximum size of a chunk in a batch.
Expand Down Expand Up @@ -105,8 +109,9 @@ func DefaultConfig() *Config {
TxTimeout: 60,
},

OutputSubmitter: "",
BridgeExecutor: "",
OutputSubmitter: "",
BridgeExecutor: "",
EnableBatchSubmitter: true,

MaxChunks: 5000,
MaxChunkSize: 300000, // 300KB
Expand Down Expand Up @@ -214,7 +219,10 @@ func (cfg Config) DANodeConfig(homePath string) nodetypes.NodeConfig {
nc := nodetypes.NodeConfig{
RPC: cfg.DANode.RPCAddress,
ProcessType: nodetypes.PROCESS_TYPE_ONLY_BROADCAST,
BroadcasterConfig: &btypes.BroadcasterConfig{
}

if cfg.EnableBatchSubmitter {
nc.BroadcasterConfig = &btypes.BroadcasterConfig{
ChainID: cfg.DANode.ChainID,
GasPrice: cfg.DANode.GasPrice,
GasAdjustment: cfg.DANode.GasAdjustment,
Expand All @@ -223,9 +231,8 @@ func (cfg Config) DANodeConfig(homePath string) nodetypes.NodeConfig {
KeyringConfig: btypes.KeyringConfig{
HomePath: homePath,
},
},
}
}

return nc
}

Expand Down
2 changes: 1 addition & 1 deletion node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ func (n Node) HasBroadcaster() bool {

func (n Node) GetBroadcaster() (*broadcaster.Broadcaster, error) {
if n.broadcaster == nil {
return nil, errors.New("cannot get broadcaster without broadcaster")
return nil, types.ErrKeyNotSet
}

return n.broadcaster, nil
Expand Down
31 changes: 10 additions & 21 deletions provider/child/msgs.go
Original file line number Diff line number Diff line change
@@ -1,31 +1,14 @@
package child

import (
"errors"

opchildtypes "github.com/initia-labs/OPinit/x/opchild/types"
"github.com/initia-labs/opinit-bots/types"

sdk "github.com/cosmos/cosmos-sdk/types"
)

func (b BaseChild) GetMsgSetBridgeInfo(
bridgeInfo opchildtypes.BridgeInfo,
) (sdk.Msg, error) {
sender, err := b.node.MustGetBroadcaster().GetAddressString()
if err != nil {
return nil, err
}

msg := opchildtypes.NewMsgSetBridgeInfo(
sender,
bridgeInfo,
)
err = msg.Validate(b.node.AccountCodec())
if err != nil {
return nil, err
}
return msg, nil
}

func (b BaseChild) GetMsgFinalizeTokenDeposit(
from string,
to string,
Expand All @@ -35,8 +18,11 @@ func (b BaseChild) GetMsgFinalizeTokenDeposit(
l1Denom string,
data []byte,
) (sdk.Msg, error) {
sender, err := b.node.MustGetBroadcaster().GetAddressString()
sender, err := b.GetAddressStr()
if err != nil {
if errors.Is(err, types.ErrKeyNotSet) {
return nil, nil
}
return nil, err
}

Expand All @@ -61,8 +47,11 @@ func (b BaseChild) GetMsgUpdateOracle(
height int64,
data []byte,
) (sdk.Msg, error) {
sender, err := b.node.MustGetBroadcaster().GetAddressString()
sender, err := b.GetAddressStr()
if err != nil {
if errors.Is(err, types.ErrKeyNotSet) {
return nil, nil
}
return nil, err
}

Expand Down
Loading
Loading