-
Notifications
You must be signed in to change notification settings - Fork 183
Commit
- Loading branch information
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,43 @@ | ||
package badger_test | ||
|
||
import ( | ||
"errors" | ||
"testing" | ||
|
||
"github.com/dgraph-io/badger/v2" | ||
"github.com/stretchr/testify/assert" | ||
"github.com/stretchr/testify/require" | ||
|
||
"github.com/onflow/flow-go/module/metrics" | ||
"github.com/onflow/flow-go/storage" | ||
"github.com/onflow/flow-go/utils/unittest" | ||
|
||
badgerstorage "github.com/onflow/flow-go/storage/badger" | ||
) | ||
|
||
// TestCommitsStoreAndRetrieve tests that a commit can be stored, retrieved and attempted to be stored again without an error | ||
func TestCommitsStoreAndRetrieve(t *testing.T) { | ||
unittest.RunWithBadgerDB(t, func(db *badger.DB) { | ||
metrics := metrics.NewNoopCollector() | ||
store := badgerstorage.NewCommits(metrics, db) | ||
|
||
// attempt to get a invalid commit | ||
_, err := store.ByBlockID(unittest.IdentifierFixture()) | ||
assert.True(t, errors.Is(err, storage.ErrNotFound)) | ||
|
||
// store a commit in db | ||
blockID := unittest.IdentifierFixture() | ||
expected := unittest.StateCommitmentFixture() | ||
err = store.Store(blockID, expected) | ||
require.NoError(t, err) | ||
|
||
// retrieve the commit by ID | ||
actual, err := store.ByBlockID(blockID) | ||
require.NoError(t, err) | ||
assert.Equal(t, expected, actual) | ||
|
||
// re-insert the commit - should be idempotent | ||
err = store.Store(blockID, expected) | ||
require.NoError(t, err) | ||
}) | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,89 @@ | ||
package badger | ||
|
||
import ( | ||
"github.com/dgraph-io/badger/v2" | ||
|
||
"github.com/onflow/flow-go/model/flow" | ||
"github.com/onflow/flow-go/module" | ||
"github.com/onflow/flow-go/module/metrics" | ||
"github.com/onflow/flow-go/storage" | ||
"github.com/onflow/flow-go/storage/badger/operation" | ||
"github.com/onflow/flow-go/storage/badger/transaction" | ||
) | ||
|
||
type Commits struct { | ||
db *badger.DB | ||
cache *Cache[flow.Identifier, flow.StateCommitment] | ||
} | ||
|
||
func NewCommits(collector module.CacheMetrics, db *badger.DB) *Commits { | ||
|
||
store := func(blockID flow.Identifier, commit flow.StateCommitment) func(*transaction.Tx) error { | ||
return transaction.WithTx(operation.SkipDuplicates(operation.IndexStateCommitment(blockID, commit))) | ||
} | ||
|
||
retrieve := func(blockID flow.Identifier) func(tx *badger.Txn) (flow.StateCommitment, error) { | ||
return func(tx *badger.Txn) (flow.StateCommitment, error) { | ||
var commit flow.StateCommitment | ||
err := operation.LookupStateCommitment(blockID, &commit)(tx) | ||
return commit, err | ||
} | ||
} | ||
|
||
c := &Commits{ | ||
db: db, | ||
cache: newCache[flow.Identifier, flow.StateCommitment](collector, metrics.ResourceCommit, | ||
withLimit[flow.Identifier, flow.StateCommitment](1000), | ||
withStore(store), | ||
withRetrieve(retrieve), | ||
), | ||
} | ||
|
||
return c | ||
} | ||
|
||
func (c *Commits) storeTx(blockID flow.Identifier, commit flow.StateCommitment) func(*transaction.Tx) error { | ||
return c.cache.PutTx(blockID, commit) | ||
} | ||
|
||
func (c *Commits) retrieveTx(blockID flow.Identifier) func(tx *badger.Txn) (flow.StateCommitment, error) { | ||
return func(tx *badger.Txn) (flow.StateCommitment, error) { | ||
val, err := c.cache.Get(blockID)(tx) | ||
if err != nil { | ||
return flow.DummyStateCommitment, err | ||
} | ||
return val, nil | ||
} | ||
} | ||
|
||
func (c *Commits) Store(blockID flow.Identifier, commit flow.StateCommitment) error { | ||
return operation.RetryOnConflictTx(c.db, transaction.Update, c.storeTx(blockID, commit)) | ||
} | ||
|
||
// BatchStore stores Commit keyed by blockID in provided batch | ||
// No errors are expected during normal operation, even if no entries are matched. | ||
// If Badger unexpectedly fails to process the request, the error is wrapped in a generic error and returned. | ||
func (c *Commits) BatchStore(blockID flow.Identifier, commit flow.StateCommitment, batch storage.BatchStorage) error { | ||
// we can't cache while using batches, as it's unknown at this point when, and if | ||
// the batch will be committed. Cache will be populated on read however. | ||
writeBatch := batch.GetWriter() | ||
return operation.BatchIndexStateCommitment(blockID, commit)(writeBatch) | ||
} | ||
|
||
func (c *Commits) ByBlockID(blockID flow.Identifier) (flow.StateCommitment, error) { | ||
tx := c.db.NewTransaction(false) | ||
defer tx.Discard() | ||
return c.retrieveTx(blockID)(tx) | ||
} | ||
|
||
func (c *Commits) RemoveByBlockID(blockID flow.Identifier) error { | ||
return c.db.Update(operation.SkipNonExist(operation.RemoveStateCommitment(blockID))) | ||
} | ||
|
||
// BatchRemoveByBlockID removes Commit keyed by blockID in provided batch | ||
// No errors are expected during normal operation, even if no entries are matched. | ||
// If Badger unexpectedly fails to process the request, the error is wrapped in a generic error and returned. | ||
func (c *Commits) BatchRemoveByBlockID(blockID flow.Identifier, batch storage.BatchStorage) error { | ||
writeBatch := batch.GetWriter() | ||
return operation.BatchRemoveStateCommitment(blockID)(writeBatch) | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,21 @@ | ||
package badger | ||
|
||
import ( | ||
"errors" | ||
"fmt" | ||
|
||
"github.com/dgraph-io/badger/v2" | ||
|
||
"github.com/onflow/flow-go/storage" | ||
) | ||
|
||
func handleError(err error, t interface{}) error { | ||
if err != nil { | ||
if errors.Is(err, badger.ErrKeyNotFound) { | ||
return storage.ErrNotFound | ||
} | ||
|
||
return fmt.Errorf("could not retrieve %T: %w", t, err) | ||
} | ||
return nil | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,50 @@ | ||
package badger | ||
|
||
import ( | ||
"fmt" | ||
|
||
"github.com/dgraph-io/badger/v2" | ||
|
||
"github.com/onflow/flow-go/storage/badger/operation" | ||
) | ||
|
||
type ConsumerProgress struct { | ||
db *badger.DB | ||
consumer string // to distinguish the consume progress between different consumers | ||
} | ||
|
||
func NewConsumerProgress(db *badger.DB, consumer string) *ConsumerProgress { | ||
return &ConsumerProgress{ | ||
db: db, | ||
consumer: consumer, | ||
} | ||
} | ||
|
||
func (cp *ConsumerProgress) ProcessedIndex() (uint64, error) { | ||
var processed uint64 | ||
err := cp.db.View(operation.RetrieveProcessedIndex(cp.consumer, &processed)) | ||
Check failure on line 25 in storage/badger/consumer_progress.go
|
||
if err != nil { | ||
return 0, fmt.Errorf("failed to retrieve processed index: %w", err) | ||
} | ||
return processed, nil | ||
} | ||
|
||
// InitProcessedIndex insert the default processed index to the storage layer, can only be done once. | ||
// initialize for the second time will return storage.ErrAlreadyExists | ||
func (cp *ConsumerProgress) InitProcessedIndex(defaultIndex uint64) error { | ||
err := operation.RetryOnConflict(cp.db.Update, operation.InsertProcessedIndex(cp.consumer, defaultIndex)) | ||
Check failure on line 35 in storage/badger/consumer_progress.go
|
||
if err != nil { | ||
return fmt.Errorf("could not update processed index: %w", err) | ||
} | ||
|
||
return nil | ||
} | ||
|
||
func (cp *ConsumerProgress) SetProcessedIndex(processed uint64) error { | ||
err := operation.RetryOnConflict(cp.db.Update, operation.SetProcessedIndex(cp.consumer, processed)) | ||
Check failure on line 44 in storage/badger/consumer_progress.go
|
||
if err != nil { | ||
return fmt.Errorf("could not update processed index: %w", err) | ||
} | ||
|
||
return nil | ||
} |