Skip to content

Commit

Permalink
make stage copiable and introduce allow to customize prefix of db
Browse files Browse the repository at this point in the history
  • Loading branch information
beer-1 committed Dec 6, 2024
1 parent 7be59a6 commit 6194495
Show file tree
Hide file tree
Showing 10 changed files with 46 additions and 65 deletions.
4 changes: 1 addition & 3 deletions challenger/child/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,9 +80,7 @@ func (ch *Child) endBlockHandler(ctx types.Context, args nodetypes.EndBlockArgs)
return err
}

err = ch.stage.ExecuteFnWithDB(ch.challenger.DB(), func() error {
return challengerdb.SavePendingChallenges(ch.stage, pendingChallenges)
})
err = challengerdb.SavePendingChallenges(ch.stage.WithPrefixedKey(ch.challenger.DB().PrefixedKey), pendingChallenges)
if err != nil {
return errors.Wrap(err, "failed to save pending events on child db")
}
Expand Down
8 changes: 2 additions & 6 deletions challenger/host/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,7 @@ func (h *Host) endBlockHandler(_ types.Context, args nodetypes.EndBlockArgs) err
}

// save all pending events to child db
err = h.stage.ExecuteFnWithDB(h.child.DB(), func() error {
return eventhandler.SavePendingEvents(h.stage, h.eventQueue)
})
err = eventhandler.SavePendingEvents(h.stage.WithPrefixedKey(h.child.DB().PrefixedKey), h.eventQueue)
if err != nil {
return errors.Wrap(err, "failed to save pending events on child db")
}
Expand Down Expand Up @@ -59,9 +57,7 @@ func (h *Host) endBlockHandler(_ types.Context, args nodetypes.EndBlockArgs) err
return err
}

err = h.stage.ExecuteFnWithDB(h.challenger.DB(), func() error {
return challengerdb.SavePendingChallenges(h.stage, pendingChallenges)
})
err = challengerdb.SavePendingChallenges(h.stage.WithPrefixedKey(h.challenger.DB().PrefixedKey), pendingChallenges)
if err != nil {
return errors.Wrap(err, "failed to save pending events on child db")
}
Expand Down
5 changes: 1 addition & 4 deletions db/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,8 +186,5 @@ func (db LevelDB) GetPrefix() []byte {
}

func (db *LevelDB) NewStage() types.CommitDB {
return &Stage{
kvmap: make(map[string][]byte),
parent: db,
}
return newStage(db)
}
2 changes: 1 addition & 1 deletion db/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -364,7 +364,7 @@ func TestNewStage(t *testing.T) {
tstage := db.NewStage()
require.NotNil(t, tstage)

stage, ok := tstage.(*Stage)
stage, ok := tstage.(Stage)
require.True(t, ok)
require.Equal(t, stage.batch.Len(), 0)
require.Equal(t, len(stage.kvmap), 0)
Expand Down
55 changes: 27 additions & 28 deletions db/stage.go
Original file line number Diff line number Diff line change
@@ -1,73 +1,72 @@
package db

import (
dbtypes "github.com/initia-labs/opinit-bots/db/types"
"github.com/initia-labs/opinit-bots/types"
"github.com/syndtr/goleveldb/leveldb"
"golang.org/x/exp/maps"
)

type Stage struct {
batch leveldb.Batch
batch *leveldb.Batch
kvmap map[string][]byte
parent *LevelDB

prefixedKey func(key []byte) []byte
}

func newStage(parent *LevelDB) Stage {
return Stage{
batch: new(leveldb.Batch),
kvmap: make(map[string][]byte),
parent: parent,

prefixedKey: parent.PrefixedKey,
}
}

func (s Stage) WithPrefixedKey(prefixedKey func(key []byte) []byte) types.CommitDB {
s.prefixedKey = prefixedKey
return s
}

var _ types.CommitDB = (*Stage)(nil)
var _ types.CommitDB = Stage{}

func (s *Stage) Set(key []byte, value []byte) error {
prefixedKey := s.parent.PrefixedKey(key)
func (s Stage) Set(key []byte, value []byte) error {
prefixedKey := s.prefixedKey(key)
s.batch.Put(prefixedKey, value)
s.kvmap[string(prefixedKey)] = value
return nil
}

func (s Stage) Get(key []byte) ([]byte, error) {
prefixedKey := s.parent.PrefixedKey(key)
prefixedKey := s.prefixedKey(key)
value, ok := s.kvmap[string(prefixedKey)]
if ok {
return value, nil
}
return s.parent.Get(key)
}

func (s *Stage) Delete(key []byte) error {
prefixedKey := s.parent.PrefixedKey(key)
func (s Stage) Delete(key []byte) error {
prefixedKey := s.prefixedKey(key)
s.batch.Delete(prefixedKey)
s.kvmap[string(prefixedKey)] = nil
return nil
}

func (s *Stage) Commit() error {
err := s.parent.db.Write(&s.batch, nil)
func (s Stage) Commit() error {
err := s.parent.db.Write(s.batch, nil)
if err != nil {
return err
}
return nil
}

// ExecuteFnWithDB executes the given function with the given db.
// It temporarily sets the given db as the parent db of the stage and restores the original parent db after the function execution.
func (s *Stage) ExecuteFnWithDB(db types.DB, fn func() error) error {
existing := s.parent
defer func() {
s.parent = existing
}()

leveldb, ok := db.(*LevelDB)
if !ok {
return dbtypes.ErrInvalidParentDBType
}
s.parent = leveldb

return fn()
}

func (s Stage) Len() int {
return s.batch.Len()
}

func (s *Stage) Reset() {
func (s Stage) Reset() {
s.batch.Reset()
maps.Clear(s.kvmap)
}
Expand Down
23 changes: 10 additions & 13 deletions db/stage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,14 @@ import (
"golang.org/x/exp/maps"
)

func CreateTestStage(t *testing.T, db *LevelDB) (*LevelDB, *Stage, error) {
func CreateTestStage(t *testing.T, db *LevelDB) (*LevelDB, Stage, error) {
var err error
if db == nil {
db, err = NewMemDB()
require.NoError(t, err)
}
tstage := db.NewStage()
stage, ok := tstage.(*Stage)
stage, ok := tstage.(Stage)
require.True(t, ok)
return db, stage, nil
}
Expand Down Expand Up @@ -200,7 +200,7 @@ func TestStageCommit(t *testing.T) {
require.Error(t, err)
}

func TestExecuteFnWithDB(t *testing.T) {
func TestWithPrefixedKey(t *testing.T) {
db, err := NewMemDB()
require.NoError(t, err)

Expand All @@ -214,14 +214,15 @@ func TestExecuteFnWithDB(t *testing.T) {
err = stage1.Set([]byte("key1"), []byte("value1"))
require.NoError(t, err)

err = stage1.ExecuteFnWithDB(db2, func() error {
err := stage1.Set([]byte("key1"), []byte("value2"))
require.NoError(t, err)
return err
})
err = stage1.WithPrefixedKey(db2.PrefixedKey).Set([]byte("key1"), []byte("value2"))
require.NoError(t, err)

// previous WithPrefixedKey should not affect the new one
err = stage1.Set([]byte("key2"), []byte("value2"))
require.NoError(t, err)

require.Equal(t, stage1.kvmap[string(db1.PrefixedKey([]byte("key1")))], []byte("value1"))
require.Equal(t, stage1.kvmap[string(db1.PrefixedKey([]byte("key2")))], []byte("value2"))
require.Equal(t, stage1.kvmap[string(db2.PrefixedKey([]byte("key1")))], []byte("value2"))
}

Expand All @@ -239,11 +240,7 @@ func TestStageAll(t *testing.T) {
err = stage1.Set([]byte("key1"), []byte("value1"))
require.NoError(t, err)

err = stage1.ExecuteFnWithDB(db2, func() error {
err := stage1.Set([]byte("key1"), []byte("value2"))
require.NoError(t, err)
return err
})
err = stage1.WithPrefixedKey(db2.PrefixedKey).Set([]byte("key1"), []byte("value2"))
require.NoError(t, err)

allKVs := stage1.All()
Expand Down
4 changes: 1 addition & 3 deletions executor/batchsubmitter/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,9 +70,7 @@ func (bs *BatchSubmitter) rawBlockHandler(ctx types.Context, args nodetypes.RawB
}
if bs.da.HasBroadcaster() {
// save processed msgs to stage using host db
err := bs.stage.ExecuteFnWithDB(bs.da.DB(), func() error {
return broadcaster.SaveProcessedMsgsBatch(bs.stage, bs.da.Codec(), bs.processedMsgs)
})
err := broadcaster.SaveProcessedMsgsBatch(bs.stage.WithPrefixedKey(bs.da.DB().PrefixedKey), bs.da.Codec(), bs.processedMsgs)
if err != nil {
return errors.Wrap(err, "failed to save processed msgs")
}
Expand Down
4 changes: 1 addition & 3 deletions executor/child/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,7 @@ func (ch *Child) endBlockHandler(ctx types.Context, args nodetypes.EndBlockArgs)
ch.AppendProcessedMsgs(broadcaster.MsgsToProcessedMsgs(ch.GetMsgQueue())...)

// save processed msgs to stage using host db
err := ch.stage.ExecuteFnWithDB(ch.host.DB(), func() error {
return broadcaster.SaveProcessedMsgsBatch(ch.stage, ch.host.Codec(), ch.GetProcessedMsgs())
})
err := broadcaster.SaveProcessedMsgsBatch(ch.stage.WithPrefixedKey(ch.host.DB().PrefixedKey), ch.host.Codec(), ch.GetProcessedMsgs())
if err != nil {
return errors.Wrap(err, "failed to save processed msgs")
}
Expand Down
4 changes: 1 addition & 3 deletions executor/host/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,7 @@ func (h *Host) endBlockHandler(_ types.Context, args nodetypes.EndBlockArgs) err
h.AppendProcessedMsgs(broadcaster.MsgsToProcessedMsgs(h.GetMsgQueue())...)

// save processed msgs to stage using child db
err := h.stage.ExecuteFnWithDB(h.child.DB(), func() error {
return broadcaster.SaveProcessedMsgsBatch(h.stage, h.child.Codec(), h.GetProcessedMsgs())
})
err := broadcaster.SaveProcessedMsgsBatch(h.stage.WithPrefixedKey(h.child.DB().PrefixedKey), h.child.Codec(), h.GetProcessedMsgs())
if err != nil {
return errors.Wrap(err, "failed to save processed msgs on child db")
}
Expand Down
2 changes: 1 addition & 1 deletion types/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ type DB interface {
type CommitDB interface {
BasicDB

ExecuteFnWithDB(DB, func() error) error
WithPrefixedKey(prefixedKey func(key []byte) []byte) CommitDB
Commit() error
Reset()
Len() int
Expand Down

0 comments on commit 6194495

Please sign in to comment.