Skip to content

Commit b815d14

Browse files
authored
Fix/audit2 (#79)
* fix batch info selection logic * wait until all batch txs are processed before changing batch info * handle msg update oracle config * db actions out of iteration * add some test cases * add different bridge id test case * handle multiple output events
1 parent e4ebae9 commit b815d14

File tree

23 files changed

+504
-37
lines changed

23 files changed

+504
-37
lines changed

challenger/eventhandler/pending_events.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,13 @@ import (
77
challengertypes "github.com/initia-labs/opinit-bots/challenger/types"
88
)
99

10+
func (ch *ChallengeEventHandler) SetPendingEvent(event challengertypes.ChallengeEvent) {
11+
ch.pendingEventsMu.Lock()
12+
defer ch.pendingEventsMu.Unlock()
13+
14+
ch.pendingEvents[event.Id()] = event
15+
}
16+
1017
func (ch *ChallengeEventHandler) SetPendingEvents(events []challengertypes.ChallengeEvent) {
1118
if len(events) == 0 {
1219
return

challenger/host/handler.go

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -29,24 +29,21 @@ func (h *Host) endBlockHandler(_ types.Context, args nodetypes.EndBlockArgs) err
2929
return errors.Wrap(err, "failed to save pending events on child db")
3030
}
3131

32-
// save all pending events to host db
33-
// currently, only output event is considered as pending event
34-
if len(h.outputPendingEventQueue) > 1 || (len(h.outputPendingEventQueue) == 1 && h.outputPendingEventQueue[0].Type() != challengertypes.EventTypeOutput) {
35-
panic("must not happen, outputPendingEventQueue should have only one output event")
36-
}
32+
prevEvents := make([]challengertypes.ChallengeEvent, 0)
3733

38-
err = eventhandler.SavePendingEvents(h.stage, h.outputPendingEventQueue)
39-
if err != nil {
40-
return err
41-
}
34+
if len(h.outputPendingEventQueue) > 0 {
35+
// save the last output event to host db
36+
err = eventhandler.SavePendingEvent(h.stage, h.outputPendingEventQueue[len(h.outputPendingEventQueue)-1])
37+
if err != nil {
38+
return errors.Wrap(err, "failed to save pending event on host db")
39+
}
4240

43-
prevEvents := make([]challengertypes.ChallengeEvent, 0)
44-
for _, pendingEvent := range h.outputPendingEventQueue {
45-
prevEvent, ok := h.eventHandler.GetPrevPendingEvent(pendingEvent)
41+
prevEvent, ok := h.eventHandler.GetPrevPendingEvent(h.outputPendingEventQueue[0])
4642
if ok {
4743
prevEvents = append(prevEvents, prevEvent)
4844
}
4945
}
46+
5047
unprocessedEvents := h.eventHandler.GetUnprocessedPendingEvents(prevEvents)
5148
pendingChallenges, processedEvents := h.eventHandler.CheckTimeout(args.Block.Header.Time, unprocessedEvents)
5249
processedEvents = append(processedEvents, prevEvents...)
@@ -69,7 +66,10 @@ func (h *Host) endBlockHandler(_ types.Context, args nodetypes.EndBlockArgs) err
6966

7067
h.child.SetPendingEvents(h.eventQueue)
7168
h.eventHandler.DeletePendingEvents(processedEvents)
72-
h.eventHandler.SetPendingEvents(h.outputPendingEventQueue)
69+
// save the last output event
70+
if len(h.outputPendingEventQueue) > 0 {
71+
h.eventHandler.SetPendingEvent(h.outputPendingEventQueue[len(h.outputPendingEventQueue)-1])
72+
}
7373
h.challenger.SendPendingChallenges(pendingChallenges)
7474
return nil
7575
}

challenger/host/handler_test.go

Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -214,6 +214,79 @@ func TestEndBlockHandler(t *testing.T) {
214214
},
215215
},
216216
err: false,
217+
}, {
218+
name: "update multiple output events",
219+
child: NewMockChild(nil, 1, 1),
220+
challenger: NewMockChallenger(nil),
221+
pendingEvents: []challengertypes.ChallengeEvent{
222+
&challengertypes.Output{
223+
EventType: "Output",
224+
L2BlockNumber: 2,
225+
OutputIndex: 1,
226+
OutputRoot: []byte(""),
227+
Time: time.Unix(0, 10000).UTC(),
228+
Timeout: false,
229+
},
230+
},
231+
eventQueue: []challengertypes.ChallengeEvent{},
232+
outputPendingEventQueue: []challengertypes.ChallengeEvent{
233+
&challengertypes.Output{
234+
EventType: "Output",
235+
L2BlockNumber: 2,
236+
OutputIndex: 2,
237+
OutputRoot: []byte(""),
238+
Time: time.Unix(0, 11000).UTC(),
239+
Timeout: false,
240+
},
241+
&challengertypes.Output{
242+
EventType: "Output",
243+
L2BlockNumber: 2,
244+
OutputIndex: 3,
245+
OutputRoot: []byte(""),
246+
Time: time.Unix(0, 11000).UTC(),
247+
Timeout: false,
248+
},
249+
&challengertypes.Output{
250+
EventType: "Output",
251+
L2BlockNumber: 2,
252+
OutputIndex: 4,
253+
OutputRoot: []byte(""),
254+
Time: time.Unix(0, 11000).UTC(),
255+
Timeout: false,
256+
},
257+
},
258+
dbChanges: []types.KV{},
259+
endBlockArgs: nodetypes.EndBlockArgs{
260+
Block: cmtproto.Block{
261+
Header: cmtproto.Header{
262+
Height: 10,
263+
Time: time.Unix(0, 11000).UTC(),
264+
},
265+
},
266+
},
267+
expectedPendingEvents: []challengertypes.ChallengeEvent{
268+
&challengertypes.Output{
269+
EventType: "Output",
270+
L2BlockNumber: 2,
271+
OutputIndex: 4,
272+
OutputRoot: []byte(""),
273+
Time: time.Unix(0, 11000).UTC(),
274+
Timeout: false,
275+
},
276+
},
277+
expectedEventQueue: []challengertypes.ChallengeEvent{},
278+
expectedChallenges: []challengertypes.Challenge{},
279+
expectedDB: []types.KV{
280+
{
281+
Key: []byte("test_host/synced_height"),
282+
Value: []byte("10"),
283+
},
284+
{
285+
Key: append([]byte("test_host/pending_event/"), []byte{0x1, '/', 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x4}...),
286+
Value: []byte(`{"event_type":"Output","l2_block_number":2,"output_index":4,"output_root":"","time":"1970-01-01T00:00:00.000011Z","timeout":false}`),
287+
},
288+
},
289+
err: false,
217290
},
218291
{
219292
name: "output event timeout",

challenger/host/oracle.go

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,13 @@ import (
44
"time"
55

66
comettypes "github.com/cometbft/cometbft/types"
7+
"github.com/pkg/errors"
8+
"go.uber.org/zap"
79

810
challengertypes "github.com/initia-labs/opinit-bots/challenger/types"
11+
nodetypes "github.com/initia-labs/opinit-bots/node/types"
12+
hostprovider "github.com/initia-labs/opinit-bots/provider/host"
13+
"github.com/initia-labs/opinit-bots/types"
914
)
1015

1116
func (h *Host) oracleTxHandler(blockHeight int64, blockTime time.Time, oracleDataBytes comettypes.Tx) {
@@ -17,3 +22,21 @@ func (h *Host) oracleTxHandler(blockHeight int64, blockTime time.Time, oracleDat
1722

1823
h.eventQueue = append(h.eventQueue, oracle)
1924
}
25+
26+
func (h *Host) updateOracleConfigHandler(ctx types.Context, args nodetypes.EventHandlerArgs) error {
27+
bridgeId, oracleEnabled, err := hostprovider.ParseMsgUpdateOracleConfig(args.EventAttributes)
28+
if err != nil {
29+
return errors.Wrap(err, "failed to parse update oracle config event")
30+
}
31+
if bridgeId != h.BridgeId() {
32+
return nil
33+
}
34+
35+
ctx.Logger().Info("update oracle config",
36+
zap.Uint64("bridge_id", bridgeId),
37+
zap.Bool("oracle_enabled", oracleEnabled),
38+
)
39+
40+
h.UpdateOracleEnabled(oracleEnabled)
41+
return nil
42+
}

challenger/host/oracle_test.go

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package host
22

33
import (
4+
"context"
45
"testing"
56
"time"
67

@@ -10,7 +11,9 @@ import (
1011
"github.com/initia-labs/opinit-bots/node"
1112
nodetypes "github.com/initia-labs/opinit-bots/node/types"
1213
hostprovider "github.com/initia-labs/opinit-bots/provider/host"
14+
"github.com/initia-labs/opinit-bots/types"
1315
"github.com/stretchr/testify/require"
16+
"go.uber.org/zap"
1417
"golang.org/x/crypto/sha3"
1518
)
1619

@@ -74,3 +77,59 @@ func TestOracleTxHandler(t *testing.T) {
7477
}
7578
require.NoError(t, err)
7679
}
80+
81+
func TestUpdateOracleConfigHandler(t *testing.T) {
82+
db, err := db.NewMemDB()
83+
require.NoError(t, err)
84+
hostNode := node.NewTestNode(nodetypes.NodeConfig{}, db.WithPrefix([]byte("test_host")), nil, nil, nil, nil)
85+
86+
h := Host{
87+
BaseHost: hostprovider.NewTestBaseHost(0, hostNode, ophosttypes.QueryBridgeResponse{
88+
BridgeId: 1,
89+
BridgeConfig: ophosttypes.BridgeConfig{
90+
OracleEnabled: false,
91+
},
92+
}, nodetypes.NodeConfig{}, nil),
93+
}
94+
95+
cases := []struct {
96+
name string
97+
bridgeId uint64
98+
oracleEnabled bool
99+
expectedOracleEnabled bool
100+
err bool
101+
}{
102+
{
103+
name: "oracle enabled",
104+
bridgeId: 1,
105+
oracleEnabled: true,
106+
expectedOracleEnabled: true,
107+
},
108+
{
109+
name: "oracle disabled",
110+
bridgeId: 1,
111+
oracleEnabled: false,
112+
expectedOracleEnabled: false,
113+
},
114+
{
115+
name: "another bridge id",
116+
bridgeId: 2,
117+
oracleEnabled: true,
118+
expectedOracleEnabled: false,
119+
},
120+
}
121+
122+
for _, tc := range cases {
123+
t.Run(tc.name, func(t *testing.T) {
124+
err := h.updateOracleConfigHandler(types.NewContext(context.Background(), zap.NewNop(), ""), nodetypes.EventHandlerArgs{
125+
EventAttributes: hostprovider.UpdateOracleConfigEvents(tc.bridgeId, tc.oracleEnabled),
126+
})
127+
if tc.err {
128+
require.Error(t, err)
129+
} else {
130+
require.NoError(t, err)
131+
require.Equal(t, tc.expectedOracleEnabled, h.OracleEnabled())
132+
}
133+
})
134+
}
135+
}

executor/batchsubmitter/batch.go

Lines changed: 30 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ import (
2121
// prepareBatch prepares the batch info for the given block height.
2222
// if there is more than one batch info in the queue and the start block height of the local batch info is greater than the l2 block number of the next batch info,
2323
// it finalizes the current batch and occurs a panic to restart the block process from the next batch info's l2 block number + 1.
24-
func (bs *BatchSubmitter) prepareBatch(blockHeight int64) error {
24+
func (bs *BatchSubmitter) prepareBatch(ctx types.Context, blockHeight int64) error {
2525
localBatchInfo, err := GetLocalBatchInfo(bs.DB())
2626
if err != nil {
2727
return errors.Wrap(err, "failed to get local batch info")
@@ -32,6 +32,35 @@ func (bs *BatchSubmitter) prepareBatch(blockHeight int64) error {
3232
if nextBatchInfo := bs.NextBatchInfo(); nextBatchInfo != nil &&
3333
types.MustUint64ToInt64(nextBatchInfo.Output.L2BlockNumber) < bs.localBatchInfo.Start {
3434
// if the next batch info is reached, finalize the current batch and update the batch info.
35+
36+
// wait until all processed msgs and pending txs are processed
37+
timer := time.NewTicker(1 * time.Second)
38+
defer timer.Stop()
39+
for {
40+
lenProcessedBatchMsgs, err := bs.da.LenProcessedBatchMsgs()
41+
if err != nil {
42+
return errors.Wrap(err, "failed to get processed msgs length")
43+
}
44+
lenPendingBatchTxs, err := bs.da.LenPendingBatchTxs()
45+
if err != nil {
46+
return errors.Wrap(err, "failed to get pending txs length")
47+
}
48+
if lenProcessedBatchMsgs == 0 && lenPendingBatchTxs == 0 {
49+
break
50+
}
51+
52+
ctx.Logger().Info("waiting for processed batch msgs and pending batch txs to be processed before changing batch info",
53+
zap.Int("processed_batch_msgs", lenProcessedBatchMsgs),
54+
zap.Int("pending_batch_txs", lenPendingBatchTxs),
55+
)
56+
57+
select {
58+
case <-ctx.Done():
59+
return ctx.Err()
60+
case <-timer.C:
61+
}
62+
}
63+
3564
if bs.batchWriter != nil {
3665
err := bs.batchWriter.Close()
3766
if err != nil {
@@ -56,15 +85,6 @@ func (bs *BatchSubmitter) prepareBatch(blockHeight int64) error {
5685
return errors.Wrap(err, "failed to set synced height")
5786
}
5887

59-
err = node.DeleteProcessedMsgs(bs.da.DB())
60-
if err != nil {
61-
return errors.Wrap(err, "failed to delete processed msgs")
62-
}
63-
err = node.DeletePendingTxs(bs.da.DB())
64-
if err != nil {
65-
return errors.Wrap(err, "failed to delete pending txs")
66-
}
67-
6888
// error will restart block process from nextBatchInfo.Output.L2BlockNumber + 1
6989
panic(fmt.Errorf("batch info updated: reset from %d", nextBatchInfo.Output.L2BlockNumber))
7090
}

executor/batchsubmitter/batch_submitter.go

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -107,11 +107,14 @@ func (bs *BatchSubmitter) Initialize(ctx types.Context, syncedHeight int64, host
107107
if len(bs.batchInfos) == 0 {
108108
return errors.New("no batch info")
109109
}
110-
for _, batchInfo := range bs.batchInfos {
111-
if len(bs.batchInfos) == 1 || types.MustUint64ToInt64(batchInfo.Output.L2BlockNumber+1) >= bs.node.GetHeight() {
110+
111+
for i, batchInfo := range bs.batchInfos {
112+
if batchInfo.Output.L2BlockNumber != 0 && types.MustUint64ToInt64(batchInfo.Output.L2BlockNumber+1) > bs.node.GetHeight() {
112113
break
114+
} else if i > 0 {
115+
// dequeue the previous batch info
116+
bs.DequeueBatchInfo()
113117
}
114-
bs.DequeueBatchInfo()
115118
}
116119

117120
fileFlag := os.O_CREATE | os.O_RDWR | os.O_APPEND

executor/batchsubmitter/batch_test.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -191,6 +191,7 @@ func TestPrepareBatch(t *testing.T) {
191191
node: batchNode,
192192
batchInfoMu: &sync.Mutex{},
193193
batchInfos: tc.batchInfoQueue,
194+
da: NewMockDA(nil, nil, 1, ""),
194195
}
195196

196197
err = SaveLocalBatchInfo(batchDB, tc.existingLocalBatchInfo)
@@ -204,9 +205,10 @@ func TestPrepareBatch(t *testing.T) {
204205
require.NoError(t, err)
205206
defer batchSubmitter.batchWriter.Close()
206207

208+
ctx := types.NewContext(context.Background(), zap.NewNop(), "")
207209
if tc.panic {
208210
require.Panics(t, func() {
209-
batchSubmitter.prepareBatch(tc.blockHeight) //nolint:errcheck
211+
batchSubmitter.prepareBatch(ctx, tc.blockHeight) //nolint:errcheck
210212
})
211213
for _, expectedKV := range tc.expectedChanges {
212214
value, err := baseDB.Get(expectedKV.Key)
@@ -218,7 +220,7 @@ func TestPrepareBatch(t *testing.T) {
218220
require.NoError(t, err)
219221
require.Equal(t, int64(0), fileSize)
220222
} else {
221-
err := batchSubmitter.prepareBatch(tc.blockHeight)
223+
err := batchSubmitter.prepareBatch(ctx, tc.blockHeight)
222224
if tc.err {
223225
require.Error(t, err)
224226
} else {

executor/batchsubmitter/common_test.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,14 @@ func (m *mockDA) BroadcastProcessedMsgs(msgs ...btypes.ProcessedMsgs) {
104104
m.processedMsgs = append(m.processedMsgs, msgs...)
105105
}
106106

107+
func (m mockDA) LenProcessedBatchMsgs() (int, error) {
108+
return len(m.processedMsgs), nil
109+
}
110+
111+
func (m mockDA) LenPendingBatchTxs() (int, error) {
112+
return 0, nil
113+
}
114+
107115
var _ executortypes.DANode = (*mockDA)(nil)
108116

109117
func logCapturer() (*zap.Logger, *observer.ObservedLogs) {

executor/batchsubmitter/handler.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ func (bs *BatchSubmitter) rawBlockHandler(ctx types.Context, args nodetypes.RawB
1818
bs.processedMsgs = bs.processedMsgs[:0]
1919
bs.stage.Reset()
2020

21-
err := bs.prepareBatch(args.BlockHeight)
21+
err := bs.prepareBatch(ctx, args.BlockHeight)
2222
if err != nil {
2323
return errors.Wrap(err, "failed to prepare batch")
2424
}

0 commit comments

Comments
 (0)