From 6194495c7f16f845c78de0196c5e54c3713e9bff Mon Sep 17 00:00:00 2001 From: beer-1 Date: Fri, 6 Dec 2024 16:53:02 +0900 Subject: [PATCH] make stage copiable and introduce allow to customize prefix of db --- challenger/child/handler.go | 4 +-- challenger/host/handler.go | 8 ++--- db/db.go | 5 +-- db/db_test.go | 2 +- db/stage.go | 55 +++++++++++++++--------------- db/stage_test.go | 23 ++++++------- executor/batchsubmitter/handler.go | 4 +-- executor/child/handler.go | 4 +-- executor/host/handler.go | 4 +-- types/db.go | 2 +- 10 files changed, 46 insertions(+), 65 deletions(-) diff --git a/challenger/child/handler.go b/challenger/child/handler.go index eb6acde..4b71cc9 100644 --- a/challenger/child/handler.go +++ b/challenger/child/handler.go @@ -80,9 +80,7 @@ func (ch *Child) endBlockHandler(ctx types.Context, args nodetypes.EndBlockArgs) return err } - err = ch.stage.ExecuteFnWithDB(ch.challenger.DB(), func() error { - return challengerdb.SavePendingChallenges(ch.stage, pendingChallenges) - }) + err = challengerdb.SavePendingChallenges(ch.stage.WithPrefixedKey(ch.challenger.DB().PrefixedKey), pendingChallenges) if err != nil { return errors.Wrap(err, "failed to save pending events on child db") } diff --git a/challenger/host/handler.go b/challenger/host/handler.go index 49b2f58..edfb844 100644 --- a/challenger/host/handler.go +++ b/challenger/host/handler.go @@ -24,9 +24,7 @@ func (h *Host) endBlockHandler(_ types.Context, args nodetypes.EndBlockArgs) err } // save all pending events to child db - err = h.stage.ExecuteFnWithDB(h.child.DB(), func() error { - return eventhandler.SavePendingEvents(h.stage, h.eventQueue) - }) + err = eventhandler.SavePendingEvents(h.stage.WithPrefixedKey(h.child.DB().PrefixedKey), h.eventQueue) if err != nil { return errors.Wrap(err, "failed to save pending events on child db") } @@ -59,9 +57,7 @@ func (h *Host) endBlockHandler(_ types.Context, args nodetypes.EndBlockArgs) err return err } - err = h.stage.ExecuteFnWithDB(h.challenger.DB(), func() error { - return challengerdb.SavePendingChallenges(h.stage, pendingChallenges) - }) + err = challengerdb.SavePendingChallenges(h.stage.WithPrefixedKey(h.challenger.DB().PrefixedKey), pendingChallenges) if err != nil { return errors.Wrap(err, "failed to save pending events on child db") } diff --git a/db/db.go b/db/db.go index a7a46c5..939d8c4 100644 --- a/db/db.go +++ b/db/db.go @@ -186,8 +186,5 @@ func (db LevelDB) GetPrefix() []byte { } func (db *LevelDB) NewStage() types.CommitDB { - return &Stage{ - kvmap: make(map[string][]byte), - parent: db, - } + return newStage(db) } diff --git a/db/db_test.go b/db/db_test.go index 0d945c7..92cad6d 100644 --- a/db/db_test.go +++ b/db/db_test.go @@ -364,7 +364,7 @@ func TestNewStage(t *testing.T) { tstage := db.NewStage() require.NotNil(t, tstage) - stage, ok := tstage.(*Stage) + stage, ok := tstage.(Stage) require.True(t, ok) require.Equal(t, stage.batch.Len(), 0) require.Equal(t, len(stage.kvmap), 0) diff --git a/db/stage.go b/db/stage.go index 680f524..f1bbf89 100644 --- a/db/stage.go +++ b/db/stage.go @@ -1,29 +1,45 @@ package db import ( - dbtypes "github.com/initia-labs/opinit-bots/db/types" "github.com/initia-labs/opinit-bots/types" "github.com/syndtr/goleveldb/leveldb" "golang.org/x/exp/maps" ) type Stage struct { - batch leveldb.Batch + batch *leveldb.Batch kvmap map[string][]byte parent *LevelDB + + prefixedKey func(key []byte) []byte +} + +func newStage(parent *LevelDB) Stage { + return Stage{ + batch: new(leveldb.Batch), + kvmap: make(map[string][]byte), + parent: parent, + + prefixedKey: parent.PrefixedKey, + } +} + +func (s Stage) WithPrefixedKey(prefixedKey func(key []byte) []byte) types.CommitDB { + s.prefixedKey = prefixedKey + return s } -var _ types.CommitDB = (*Stage)(nil) +var _ types.CommitDB = Stage{} -func (s *Stage) Set(key []byte, value []byte) error { - prefixedKey := s.parent.PrefixedKey(key) +func (s Stage) Set(key []byte, value []byte) error { + prefixedKey := s.prefixedKey(key) s.batch.Put(prefixedKey, value) s.kvmap[string(prefixedKey)] = value return nil } func (s Stage) Get(key []byte) ([]byte, error) { - prefixedKey := s.parent.PrefixedKey(key) + prefixedKey := s.prefixedKey(key) value, ok := s.kvmap[string(prefixedKey)] if ok { return value, nil @@ -31,43 +47,26 @@ func (s Stage) Get(key []byte) ([]byte, error) { return s.parent.Get(key) } -func (s *Stage) Delete(key []byte) error { - prefixedKey := s.parent.PrefixedKey(key) +func (s Stage) Delete(key []byte) error { + prefixedKey := s.prefixedKey(key) s.batch.Delete(prefixedKey) s.kvmap[string(prefixedKey)] = nil return nil } -func (s *Stage) Commit() error { - err := s.parent.db.Write(&s.batch, nil) +func (s Stage) Commit() error { + err := s.parent.db.Write(s.batch, nil) if err != nil { return err } return nil } -// ExecuteFnWithDB executes the given function with the given db. -// It temporarily sets the given db as the parent db of the stage and restores the original parent db after the function execution. -func (s *Stage) ExecuteFnWithDB(db types.DB, fn func() error) error { - existing := s.parent - defer func() { - s.parent = existing - }() - - leveldb, ok := db.(*LevelDB) - if !ok { - return dbtypes.ErrInvalidParentDBType - } - s.parent = leveldb - - return fn() -} - func (s Stage) Len() int { return s.batch.Len() } -func (s *Stage) Reset() { +func (s Stage) Reset() { s.batch.Reset() maps.Clear(s.kvmap) } diff --git a/db/stage_test.go b/db/stage_test.go index 6be4a78..346dfc7 100644 --- a/db/stage_test.go +++ b/db/stage_test.go @@ -7,14 +7,14 @@ import ( "golang.org/x/exp/maps" ) -func CreateTestStage(t *testing.T, db *LevelDB) (*LevelDB, *Stage, error) { +func CreateTestStage(t *testing.T, db *LevelDB) (*LevelDB, Stage, error) { var err error if db == nil { db, err = NewMemDB() require.NoError(t, err) } tstage := db.NewStage() - stage, ok := tstage.(*Stage) + stage, ok := tstage.(Stage) require.True(t, ok) return db, stage, nil } @@ -200,7 +200,7 @@ func TestStageCommit(t *testing.T) { require.Error(t, err) } -func TestExecuteFnWithDB(t *testing.T) { +func TestWithPrefixedKey(t *testing.T) { db, err := NewMemDB() require.NoError(t, err) @@ -214,14 +214,15 @@ func TestExecuteFnWithDB(t *testing.T) { err = stage1.Set([]byte("key1"), []byte("value1")) require.NoError(t, err) - err = stage1.ExecuteFnWithDB(db2, func() error { - err := stage1.Set([]byte("key1"), []byte("value2")) - require.NoError(t, err) - return err - }) + err = stage1.WithPrefixedKey(db2.PrefixedKey).Set([]byte("key1"), []byte("value2")) + require.NoError(t, err) + + // previous WithPrefixedKey should not affect the new one + err = stage1.Set([]byte("key2"), []byte("value2")) require.NoError(t, err) require.Equal(t, stage1.kvmap[string(db1.PrefixedKey([]byte("key1")))], []byte("value1")) + require.Equal(t, stage1.kvmap[string(db1.PrefixedKey([]byte("key2")))], []byte("value2")) require.Equal(t, stage1.kvmap[string(db2.PrefixedKey([]byte("key1")))], []byte("value2")) } @@ -239,11 +240,7 @@ func TestStageAll(t *testing.T) { err = stage1.Set([]byte("key1"), []byte("value1")) require.NoError(t, err) - err = stage1.ExecuteFnWithDB(db2, func() error { - err := stage1.Set([]byte("key1"), []byte("value2")) - require.NoError(t, err) - return err - }) + err = stage1.WithPrefixedKey(db2.PrefixedKey).Set([]byte("key1"), []byte("value2")) require.NoError(t, err) allKVs := stage1.All() diff --git a/executor/batchsubmitter/handler.go b/executor/batchsubmitter/handler.go index 8fd3714..b801c6f 100644 --- a/executor/batchsubmitter/handler.go +++ b/executor/batchsubmitter/handler.go @@ -70,9 +70,7 @@ func (bs *BatchSubmitter) rawBlockHandler(ctx types.Context, args nodetypes.RawB } if bs.da.HasBroadcaster() { // save processed msgs to stage using host db - err := bs.stage.ExecuteFnWithDB(bs.da.DB(), func() error { - return broadcaster.SaveProcessedMsgsBatch(bs.stage, bs.da.Codec(), bs.processedMsgs) - }) + err := broadcaster.SaveProcessedMsgsBatch(bs.stage.WithPrefixedKey(bs.da.DB().PrefixedKey), bs.da.Codec(), bs.processedMsgs) if err != nil { return errors.Wrap(err, "failed to save processed msgs") } diff --git a/executor/child/handler.go b/executor/child/handler.go index ad87c69..f48000e 100644 --- a/executor/child/handler.go +++ b/executor/child/handler.go @@ -54,9 +54,7 @@ func (ch *Child) endBlockHandler(ctx types.Context, args nodetypes.EndBlockArgs) ch.AppendProcessedMsgs(broadcaster.MsgsToProcessedMsgs(ch.GetMsgQueue())...) // save processed msgs to stage using host db - err := ch.stage.ExecuteFnWithDB(ch.host.DB(), func() error { - return broadcaster.SaveProcessedMsgsBatch(ch.stage, ch.host.Codec(), ch.GetProcessedMsgs()) - }) + err := broadcaster.SaveProcessedMsgsBatch(ch.stage.WithPrefixedKey(ch.host.DB().PrefixedKey), ch.host.Codec(), ch.GetProcessedMsgs()) if err != nil { return errors.Wrap(err, "failed to save processed msgs") } diff --git a/executor/host/handler.go b/executor/host/handler.go index 88c94f7..34c02af 100644 --- a/executor/host/handler.go +++ b/executor/host/handler.go @@ -30,9 +30,7 @@ func (h *Host) endBlockHandler(_ types.Context, args nodetypes.EndBlockArgs) err h.AppendProcessedMsgs(broadcaster.MsgsToProcessedMsgs(h.GetMsgQueue())...) // save processed msgs to stage using child db - err := h.stage.ExecuteFnWithDB(h.child.DB(), func() error { - return broadcaster.SaveProcessedMsgsBatch(h.stage, h.child.Codec(), h.GetProcessedMsgs()) - }) + err := broadcaster.SaveProcessedMsgsBatch(h.stage.WithPrefixedKey(h.child.DB().PrefixedKey), h.child.Codec(), h.GetProcessedMsgs()) if err != nil { return errors.Wrap(err, "failed to save processed msgs on child db") } diff --git a/types/db.go b/types/db.go index 553a968..92674e7 100644 --- a/types/db.go +++ b/types/db.go @@ -30,7 +30,7 @@ type DB interface { type CommitDB interface { BasicDB - ExecuteFnWithDB(DB, func() error) error + WithPrefixedKey(prefixedKey func(key []byte) []byte) CommitDB Commit() error Reset() Len() int