From 9933ad8ee3c8ff0ddc0be37b078edd23a6899e28 Mon Sep 17 00:00:00 2001 From: Facundo Date: Tue, 4 Feb 2025 16:37:04 +0100 Subject: [PATCH 1/9] add batch extender --- sequencing/sequencer.go | 135 +------------------------ sequencing/sequencer_test.go | 4 +- sequencing/transaction_queue.go | 173 ++++++++++++++++++++++++++++++++ 3 files changed, 178 insertions(+), 134 deletions(-) create mode 100644 sequencing/transaction_queue.go diff --git a/sequencing/sequencer.go b/sequencing/sequencer.go index dd7c9bb..da80626 100644 --- a/sequencing/sequencer.go +++ b/sequencing/sequencer.go @@ -3,7 +3,6 @@ package sequencing import ( "bytes" "context" - "crypto/sha256" "encoding/hex" "errors" "fmt" @@ -38,7 +37,7 @@ type BatchQueue struct { mu sync.Mutex } -// NewBatchQueue creates a new TransactionQueue +// NewBatchQueue creates a new BatchQueue func NewBatchQueue() *BatchQueue { return &BatchQueue{ queue: make([]sequencing.Batch, 0), @@ -134,134 +133,6 @@ func (bq *BatchQueue) LoadFromDB(db *badger.DB) error { return err } -// TransactionQueue is a queue of transactions -type TransactionQueue struct { - queue []sequencing.Tx - mu sync.Mutex -} - -// NewTransactionQueue creates a new TransactionQueue -func NewTransactionQueue() *TransactionQueue { - return &TransactionQueue{ - queue: make([]sequencing.Tx, 0), - } -} - -// GetTransactionHash to get hash from transaction bytes using SHA-256 -func GetTransactionHash(txBytes []byte) string { - hashBytes := sha256.Sum256(txBytes) - return hex.EncodeToString(hashBytes[:]) -} - -// AddTransaction adds a new transaction to the queue -func (tq *TransactionQueue) AddTransaction(tx sequencing.Tx, db *badger.DB) error { - tq.mu.Lock() - tq.queue = append(tq.queue, tx) - tq.mu.Unlock() - - // Store transaction in BadgerDB - err := db.Update(func(txn *badger.Txn) error { - return txn.Set([]byte(GetTransactionHash(tx)), tx) - }) - return err -} - -// GetNextBatch extracts a batch of transactions from the queue -func (tq *TransactionQueue) GetNextBatch(max uint64, db *badger.DB) sequencing.Batch { - tq.mu.Lock() - defer tq.mu.Unlock() - - var batch [][]byte - batchSize := len(tq.queue) - if batchSize == 0 { - return sequencing.Batch{Transactions: nil} - } - for { - batch = tq.queue[:batchSize] - blobSize := totalBytes(batch) - if uint64(blobSize) <= max { - break - } - batchSize = batchSize - 1 - } - - // Retrieve transactions from BadgerDB and remove processed ones - for _, tx := range batch { - txHash := GetTransactionHash(tx) - err := db.Update(func(txn *badger.Txn) error { - // Get and then delete the transaction from BadgerDB - _, err := txn.Get([]byte(txHash)) - if err != nil { - return err - } - return txn.Delete([]byte(txHash)) // Remove processed transaction - }) - if err != nil { - return sequencing.Batch{Transactions: nil} // Return empty batch if any transaction retrieval fails - } - } - tq.queue = tq.queue[batchSize:] - return sequencing.Batch{Transactions: batch} -} - -// LoadFromDB reloads all transactions from BadgerDB into the in-memory queue after a crash. -func (tq *TransactionQueue) LoadFromDB(db *badger.DB) error { - tq.mu.Lock() - defer tq.mu.Unlock() - - // Start a read-only transaction - err := db.View(func(txn *badger.Txn) error { - // Create an iterator to go through all transactions stored in BadgerDB - it := txn.NewIterator(badger.DefaultIteratorOptions) - defer it.Close() // Ensure that the iterator is properly closed - - // Iterate through all items in the database - for it.Rewind(); it.Valid(); it.Next() { - item := it.Item() - err := item.Value(func(val []byte) error { - // Load each transaction from DB and add to the in-memory queue - tq.queue = append(tq.queue, val) - return nil - }) - if err != nil { - return err - } - } - return nil - }) - - return err -} - -// AddBatchBackToQueue re-adds the batch to the transaction queue (and BadgerDB) after a failure. -func (tq *TransactionQueue) AddBatchBackToQueue(batch sequencing.Batch, db *badger.DB) error { - tq.mu.Lock() - defer tq.mu.Unlock() - - // Add the batch back to the in-memory transaction queue - tq.queue = append(tq.queue, batch.Transactions...) - - // Optionally, persist the batch back to BadgerDB - for _, tx := range batch.Transactions { - err := db.Update(func(txn *badger.Txn) error { - return txn.Set([]byte(GetTransactionHash(tx)), tx) // Store transaction back in DB - }) - if err != nil { - return fmt.Errorf("failed to revert transaction to DB: %w", err) - } - } - - return nil -} - -func totalBytes(data [][]byte) int { - total := 0 - for _, sub := range data { - total += len(sub) - } - return total -} - // Sequencer implements go-sequencing interface type Sequencer struct { dalc *da.DAClient @@ -286,7 +157,7 @@ type Sequencer struct { } // NewSequencer ... -func NewSequencer(daAddress, daAuthToken string, daNamespace []byte, rollupId []byte, batchTime time.Duration, metrics *Metrics, dbPath string) (*Sequencer, error) { +func NewSequencer(daAddress, daAuthToken string, daNamespace []byte, rollupId []byte, batchTime time.Duration, metrics *Metrics, dbPath string, extender BatchExtender) (*Sequencer, error) { ctx := context.Background() dac, err := proxyda.NewClient(daAddress, daAuthToken) if err != nil { @@ -316,7 +187,7 @@ func NewSequencer(daAddress, daAuthToken string, daNamespace []byte, rollupId [] ctx: ctx, maxSize: maxBlobSize, rollupId: rollupId, - tq: NewTransactionQueue(), + tq: NewTransactionQueue(extender), bq: NewBatchQueue(), seenBatches: make(map[string]struct{}), db: db, diff --git a/sequencing/sequencer_test.go b/sequencing/sequencer_test.go index 58be4e5..09b3b0b 100644 --- a/sequencing/sequencer_test.go +++ b/sequencing/sequencer_test.go @@ -52,7 +52,7 @@ func startMockDAServJSONRPC(ctx context.Context, da_address string) (*proxy.Serv func TestNewSequencer(t *testing.T) { // Create a new sequencer with mock DA client metrics, _ := NopMetrics() - seq, err := NewSequencer(MockDAAddressHTTP, "authToken", []byte("namespace"), []byte("rollup1"), 10*time.Second, metrics, "") + seq, err := NewSequencer(MockDAAddressHTTP, "authToken", []byte("namespace"), []byte("rollup1"), 10*time.Second, metrics, "", nil) require.NoError(t, err) defer func() { err := seq.Close() @@ -69,7 +69,7 @@ func TestNewSequencer(t *testing.T) { func TestSequencer_SubmitRollupTransaction(t *testing.T) { // Initialize a new sequencer metrics, _ := NopMetrics() - seq, err := NewSequencer(MockDAAddressHTTP, "authToken", []byte("namespace"), []byte("rollup1"), 10*time.Second, metrics, "") + seq, err := NewSequencer(MockDAAddressHTTP, "authToken", []byte("namespace"), []byte("rollup1"), 10*time.Second, metrics, "", nil) require.NoError(t, err) defer func() { err := seq.Close() diff --git a/sequencing/transaction_queue.go b/sequencing/transaction_queue.go new file mode 100644 index 0000000..7350e05 --- /dev/null +++ b/sequencing/transaction_queue.go @@ -0,0 +1,173 @@ +package sequencing + +import ( + "crypto/sha256" + "encoding/hex" + "fmt" + "sync" + + "github.com/dgraph-io/badger/v3" + "github.com/rollkit/go-sequencing" +) + +type BatchExtender interface { + Head(max uint64) ([]byte, error) + Tail(max uint64) ([]byte, error) +} + +// TransactionQueue is a queue of transactions +type TransactionQueue struct { + queue []sequencing.Tx + mu sync.Mutex + extender BatchExtender +} + +// NewTransactionQueue creates a new TransactionQueue +func NewTransactionQueue(extender BatchExtender) *TransactionQueue { + return &TransactionQueue{ + queue: make([]sequencing.Tx, 0), + extender: extender, + } +} + +// GetTransactionHash to get hash from transaction bytes using SHA-256 +func GetTransactionHash(txBytes []byte) string { + hashBytes := sha256.Sum256(txBytes) + return hex.EncodeToString(hashBytes[:]) +} + +// AddTransaction adds a new transaction to the queue +func (tq *TransactionQueue) AddTransaction(tx sequencing.Tx, db *badger.DB) error { + tq.mu.Lock() + tq.queue = append(tq.queue, tx) + tq.mu.Unlock() + + // Store transaction in BadgerDB + err := db.Update(func(txn *badger.Txn) error { + return txn.Set([]byte(GetTransactionHash(tx)), tx) + }) + return err +} + +// GetNextBatch extracts a batch of transactions from the queue +func (tq *TransactionQueue) GetNextBatch(max uint64, db *badger.DB) sequencing.Batch { + tq.mu.Lock() + defer tq.mu.Unlock() + + // Add head of batch if extender is provided, also ask for the tail of the batch + var head, tail []byte + if tq.extender != nil { + var err error + head, err = tq.extender.Head(max) + if err != nil { + return sequencing.Batch{Transactions: nil} + } + tail, err = tq.extender.Tail(max) + if err != nil { + return sequencing.Batch{Transactions: nil} + } + } + + batch := tq.queue + headTailSize := len(head) + len(tail) + + for uint64(totalBytes(batch)+headTailSize) > max { + batch = batch[:len(batch)-1] + } + + // batchLen before adding head and tail, to remove the correct number of transactions from the queue + batchLen := len(batch) + + // Add head and tail of the batch + if head != nil { + batch = append([][]byte{head}, batch...) + } + + if tail != nil { + batch = append(batch, tail) + } + + if len(batch) == 0 { + return sequencing.Batch{Transactions: nil} + } + + // Retrieve transactions from BadgerDB and remove processed ones + for i, tx := range batch { + txHash := GetTransactionHash(tx) + err := db.Update(func(txn *badger.Txn) error { + // Get and then delete the transaction from BadgerDB + _, err := txn.Get([]byte(txHash)) + if err != nil { + // If the transaction not found is the head or tail, skip it as they are not in the queue + if err == badger.ErrKeyNotFound && (i == 0 || i == len(batch)-1) { + return nil + } + return err + } + return txn.Delete([]byte(txHash)) // Remove processed transaction + }) + if err != nil { + return sequencing.Batch{Transactions: nil} // Return empty batch if any transaction retrieval fails + } + } + tq.queue = tq.queue[batchLen:] + return sequencing.Batch{Transactions: batch} +} + +// LoadFromDB reloads all transactions from BadgerDB into the in-memory queue after a crash. +func (tq *TransactionQueue) LoadFromDB(db *badger.DB) error { + tq.mu.Lock() + defer tq.mu.Unlock() + + // Start a read-only transaction + err := db.View(func(txn *badger.Txn) error { + // Create an iterator to go through all transactions stored in BadgerDB + it := txn.NewIterator(badger.DefaultIteratorOptions) + defer it.Close() // Ensure that the iterator is properly closed + + // Iterate through all items in the database + for it.Rewind(); it.Valid(); it.Next() { + item := it.Item() + err := item.Value(func(val []byte) error { + // Load each transaction from DB and add to the in-memory queue + tq.queue = append(tq.queue, val) + return nil + }) + if err != nil { + return err + } + } + return nil + }) + + return err +} + +// AddBatchBackToQueue re-adds the batch to the transaction queue (and BadgerDB) after a failure. +func (tq *TransactionQueue) AddBatchBackToQueue(batch sequencing.Batch, db *badger.DB) error { + tq.mu.Lock() + defer tq.mu.Unlock() + + // Add the batch back to the in-memory transaction queue + tq.queue = append(tq.queue, batch.Transactions...) + + // Optionally, persist the batch back to BadgerDB + for _, tx := range batch.Transactions { + err := db.Update(func(txn *badger.Txn) error { + return txn.Set([]byte(GetTransactionHash(tx)), tx) // Store transaction back in DB + }) + if err != nil { + return fmt.Errorf("failed to revert transaction to DB: %w", err) + } + } + + return nil +} + +func totalBytes(data [][]byte) int { + total := 0 + for _, sub := range data { + total += len(sub) + } + return total +} From 13dfccd643d12b0490bd793a8f674db4a9dfa711 Mon Sep 17 00:00:00 2001 From: Facundo Date: Tue, 4 Feb 2025 16:43:15 +0100 Subject: [PATCH 2/9] lint fix --- cmd/centralized-sequencer/main.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cmd/centralized-sequencer/main.go b/cmd/centralized-sequencer/main.go index 29117f2..914f173 100644 --- a/cmd/centralized-sequencer/main.go +++ b/cmd/centralized-sequencer/main.go @@ -91,7 +91,7 @@ func main() { if err != nil { log.Fatalf("Failed to create metrics: %v", err) } - centralizedSeq, err := sequencing.NewSequencer(da_address, da_auth_token, namespace, []byte(rollupId), batchTime, metrics, db_path) + centralizedSeq, err := sequencing.NewSequencer(da_address, da_auth_token, namespace, []byte(rollupId), batchTime, metrics, db_path, nil) if err != nil { log.Fatalf("Failed to create centralized sequencer: %v", err) } From 270e90632b368d99d1a120436c3204010e536e30 Mon Sep 17 00:00:00 2001 From: Facundo Date: Tue, 4 Feb 2025 16:46:12 +0100 Subject: [PATCH 3/9] fix --- sequencing/transaction_queue.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/sequencing/transaction_queue.go b/sequencing/transaction_queue.go index 7350e05..032cb21 100644 --- a/sequencing/transaction_queue.go +++ b/sequencing/transaction_queue.go @@ -3,6 +3,7 @@ package sequencing import ( "crypto/sha256" "encoding/hex" + "errors" "fmt" "sync" @@ -10,6 +11,7 @@ import ( "github.com/rollkit/go-sequencing" ) +// BatchExtender is an interface for extending a batch of transactions type BatchExtender interface { Head(max uint64) ([]byte, error) Tail(max uint64) ([]byte, error) @@ -99,7 +101,7 @@ func (tq *TransactionQueue) GetNextBatch(max uint64, db *badger.DB) sequencing.B _, err := txn.Get([]byte(txHash)) if err != nil { // If the transaction not found is the head or tail, skip it as they are not in the queue - if err == badger.ErrKeyNotFound && (i == 0 || i == len(batch)-1) { + if errors.Is(err, badger.ErrKeyNotFound) && (i == 0 || i == len(batch)-1) { return nil } return err From 4218fbf73d4bd12612a8d3a229ac274b0ab5af01 Mon Sep 17 00:00:00 2001 From: Facundo Date: Tue, 4 Feb 2025 16:47:41 +0100 Subject: [PATCH 4/9] annoying lint --- sequencing/transaction_queue.go | 1 + 1 file changed, 1 insertion(+) diff --git a/sequencing/transaction_queue.go b/sequencing/transaction_queue.go index 032cb21..0e29fe0 100644 --- a/sequencing/transaction_queue.go +++ b/sequencing/transaction_queue.go @@ -8,6 +8,7 @@ import ( "sync" "github.com/dgraph-io/badger/v3" + "github.com/rollkit/go-sequencing" ) From e32958054e568ce351bb05443b3c24426f7117db Mon Sep 17 00:00:00 2001 From: Facundo Date: Wed, 5 Feb 2025 12:49:39 +0100 Subject: [PATCH 5/9] upgrade to badger v4 --- go.mod | 2 +- go.sum | 11 ++--------- sequencing/sequencer.go | 2 +- sequencing/sequencer_test.go | 2 +- sequencing/transaction_queue.go | 2 +- 5 files changed, 6 insertions(+), 13 deletions(-) diff --git a/go.mod b/go.mod index 95165de..c10fb44 100644 --- a/go.mod +++ b/go.mod @@ -5,7 +5,7 @@ go 1.22 toolchain go1.22.3 require ( - github.com/dgraph-io/badger/v3 v3.2103.5 + github.com/dgraph-io/badger/v4 v4.2.1-0.20231013074411-fb1b00959581 github.com/go-kit/kit v0.13.0 github.com/gogo/protobuf v1.3.2 github.com/ipfs/go-log/v2 v2.5.1 diff --git a/go.sum b/go.sum index f4c9db0..ea859e6 100644 --- a/go.sum +++ b/go.sum @@ -69,8 +69,8 @@ github.com/decred/dcrd/dcrec/secp256k1/v4 v4.3.0 h1:rpfIENRNNilwHwZeG5+P150SMrnN github.com/decred/dcrd/dcrec/secp256k1/v4 v4.3.0/go.mod h1:v57UDF4pDQJcEfFUCRop3lJL149eHGSe9Jvczhzjo/0= github.com/dgraph-io/badger/v2 v2.2007.4 h1:TRWBQg8UrlUhaFdco01nO2uXwzKS7zd+HVdwV/GHc4o= github.com/dgraph-io/badger/v2 v2.2007.4/go.mod h1:vSw/ax2qojzbN6eXHIx6KPKtCSHJN/Uz0X0VPruTIhk= -github.com/dgraph-io/badger/v3 v3.2103.5 h1:ylPa6qzbjYRQMU6jokoj4wzcaweHylt//CH0AKt0akg= -github.com/dgraph-io/badger/v3 v3.2103.5/go.mod h1:4MPiseMeDQ3FNCYwRbbcBOGJLf5jsE0PPFzRiKjtcdw= +github.com/dgraph-io/badger/v4 v4.2.1-0.20231013074411-fb1b00959581 h1:yy45brf1ktmnkTCZlHynP1gRlVwZ9g19oz5D9wG81v4= +github.com/dgraph-io/badger/v4 v4.2.1-0.20231013074411-fb1b00959581/go.mod h1:T/uWAYxrXdaXw64ihI++9RMbKTCpKd/yE9+saARew7k= github.com/dgraph-io/ristretto v0.0.3-0.20200630154024-f66de99634de/go.mod h1:KPxhHT9ZxKefz+PCeOGsrHpl1qZ7i70dGTu2u+Ahh6E= github.com/dgraph-io/ristretto v0.1.1 h1:6CWw5tJNgpegArSHpNHJKldNeq03FQCwYvfMVWajOK8= github.com/dgraph-io/ristretto v0.1.1/go.mod h1:S1GPSBCYCIhmVNfcth17y2zZtQT6wzkzgwUve0VDWWA= @@ -120,7 +120,6 @@ github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69 github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= github.com/golang/glog v1.2.2 h1:1+mZ9upx1Dh6FmUTFR1naJ77miKiXgALjWOZ3NVFPmY= github.com/golang/glog v1.2.2/go.mod h1:6AhwSGph0fcJtXVM/PEHPqZlFeoLxhs7/t5UDAwmO+w= -github.com/golang/groupcache v0.0.0-20190702054246-869f871628b6/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= github.com/golang/groupcache v0.0.0-20200121045136-8c9f03a8e57e/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da h1:oI5xCqsCo564l8iNU+DwB5epxmsaqB+rhGL0m5jtYqE= github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= @@ -151,7 +150,6 @@ github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMyw github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.3/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= -github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/google/gopacket v1.1.19 h1:ves8RnFZPGiFnTS0uPQStjwru6uO6h+nlr9j6fL7kF8= @@ -391,7 +389,6 @@ github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9dec github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k= go.etcd.io/bbolt v1.3.10 h1:+BqfJTcCzTItrop8mq/lbzL8wSGtj94UO/3U31shqG0= go.etcd.io/bbolt v1.3.10/go.mod h1:bK3UQLPJZly7IlNmV7uVHJDxfe5aK9Ll93e/74Y9oEQ= -go.opencensus.io v0.22.5/go.mod h1:5pWMHQbX5EPX2/62yrJeAkowc+lfs/XD7Uxpq3pI6kk= go.opencensus.io v0.24.0 h1:y73uSU6J157QMP2kn2r30vwW1A2W2WFwSCGnAVxeaD0= go.opencensus.io v0.24.0/go.mod h1:vNK8G9p7aAivkbmorf4v+7Hgx+Zs0yY+0fOtgBfjQKo= go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= @@ -447,7 +444,6 @@ golang.org/x/net v0.29.0/go.mod h1:gLkgy8jTGERgjzMic6DS9+SP0ajcu6Xu3Orq/SpETg0= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sync v0.0.0-20190227155943-e225da77a7e6/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= @@ -459,7 +455,6 @@ golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5h golang.org/x/sys v0.0.0-20181205085412-a5c9d58dba9a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20190502145724-3ef323f4f1fd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190626221950-04f50cda93cb/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190904154756-749cb33beabd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20191005200804-aed5e4c7ecf9/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= @@ -503,13 +498,11 @@ golang.org/x/xerrors v0.0.0-20231012003039-104605ab7028/go.mod h1:NDW/Ps6MPRej6f google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM= google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4= google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc= -google.golang.org/genproto v0.0.0-20190425155659-357c62f0e4bb/go.mod h1:VzzqZJRnGkLBvHegQrXjBqPurQTc5/KpmUdxsrq26oE= google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55/go.mod h1:DMBHOl98Agz4BDEuKkezgsaosCRResVns1a3J2ZsMNc= google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013/go.mod h1:NbSheEEYHJ7i3ixzK3sjbqSGDJWnxyFXZblF3eUsNvo= google.golang.org/genproto/googleapis/rpc v0.0.0-20240827150818-7e3bb234dfed h1:J6izYgfBXAI3xTKLgxzTmUltdYaLsuBxFCgDHWJ/eXg= google.golang.org/genproto/googleapis/rpc v0.0.0-20240827150818-7e3bb234dfed/go.mod h1:UqMtugtsSgubUsoxbuAoiCXvqvErP7Gf0so0mK9tHxU= google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= -google.golang.org/grpc v1.20.1/go.mod h1:10oTOabMzJvdu6/UiuZezV6QK5dSlG84ov/aaiqXj38= google.golang.org/grpc v1.23.0/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg= google.golang.org/grpc v1.25.1/go.mod h1:c3i+UQWmh7LiEpx4sFZnkU36qjEYZ0imhYfXVyQciAY= google.golang.org/grpc v1.27.0/go.mod h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8abTk= diff --git a/sequencing/sequencer.go b/sequencing/sequencer.go index da80626..f2fdd7c 100644 --- a/sequencing/sequencer.go +++ b/sequencing/sequencer.go @@ -11,7 +11,7 @@ import ( "sync/atomic" "time" - "github.com/dgraph-io/badger/v3" + "github.com/dgraph-io/badger/v4" logging "github.com/ipfs/go-log/v2" "github.com/rollkit/centralized-sequencer/da" diff --git a/sequencing/sequencer_test.go b/sequencing/sequencer_test.go index 09b3b0b..30b0872 100644 --- a/sequencing/sequencer_test.go +++ b/sequencing/sequencer_test.go @@ -9,7 +9,7 @@ import ( "testing" "time" - "github.com/dgraph-io/badger/v3" + "github.com/dgraph-io/badger/v4" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" diff --git a/sequencing/transaction_queue.go b/sequencing/transaction_queue.go index 0e29fe0..c64dc4d 100644 --- a/sequencing/transaction_queue.go +++ b/sequencing/transaction_queue.go @@ -7,7 +7,7 @@ import ( "fmt" "sync" - "github.com/dgraph-io/badger/v3" + "github.com/dgraph-io/badger/v4" "github.com/rollkit/go-sequencing" ) From 01a97b18123ed90fcf30e2d3a88c882851d22360 Mon Sep 17 00:00:00 2001 From: Facundo Date: Thu, 20 Feb 2025 13:58:43 +0100 Subject: [PATCH 6/9] fix db store --- sequencing/batch_queue.go | 140 ++++++++++++++++++++++++++++++++ sequencing/sequencer.go | 108 ++---------------------- sequencing/transaction_queue.go | 15 ++-- 3 files changed, 157 insertions(+), 106 deletions(-) create mode 100644 sequencing/batch_queue.go diff --git a/sequencing/batch_queue.go b/sequencing/batch_queue.go new file mode 100644 index 0000000..07a5b6d --- /dev/null +++ b/sequencing/batch_queue.go @@ -0,0 +1,140 @@ +package sequencing + +import ( + "sync" + + "github.com/dgraph-io/badger/v4" + "github.com/rollkit/go-sequencing" +) + +// BatchQueue ... +type BatchQueue struct { + queue []sequencing.Batch + mu sync.Mutex +} + +// NewBatchQueue creates a new BatchQueue +func NewBatchQueue() *BatchQueue { + return &BatchQueue{ + queue: make([]sequencing.Batch, 0), + } +} + +// AddBatch adds a new transaction to the queue +func (bq *BatchQueue) AddBatch(batch sequencing.Batch, db *badger.DB) error { + bq.mu.Lock() + bq.queue = append(bq.queue, batch) + bq.mu.Unlock() + + // Get the hash and bytes of the batch + h, err := batch.Hash() + if err != nil { + return err + } + + // Marshal the batch + batchBytes, err := batch.Marshal() + if err != nil { + return err + } + + // Store the batch in BadgerDB + err = db.Update(func(txn *badger.Txn) error { + key := append(keyPrefixBatch, h...) + return txn.Set(key, batchBytes) + }) + return err +} + +// AddBatch adds a new transaction to the queue +func (bq *BatchQueue) AddBatchToTheTop(batch sequencing.Batch, db *badger.DB) error { + bq.mu.Lock() + bq.queue = append([]sequencing.Batch{batch}, bq.queue...) + bq.mu.Unlock() + + // Get the hash and bytes of the batch + h, err := batch.Hash() + if err != nil { + return err + } + + // Marshal the batch + batchBytes, err := batch.Marshal() + if err != nil { + return err + } + + // Store the batch in BadgerDB + err = db.Update(func(txn *badger.Txn) error { + key := append(keyPrefixBatch, h...) + return txn.Set(key, batchBytes) + }) + return err +} + +// Next extracts a batch of transactions from the queue +func (bq *BatchQueue) Next(db *badger.DB) (*sequencing.Batch, error) { + bq.mu.Lock() + defer bq.mu.Unlock() + if len(bq.queue) == 0 { + return &sequencing.Batch{Transactions: nil}, nil + } + batch := bq.queue[0] + bq.queue = bq.queue[1:] + + h, err := batch.Hash() + if err != nil { + return &sequencing.Batch{Transactions: nil}, err + } + + // Remove the batch from BadgerDB after processing + err = db.Update(func(txn *badger.Txn) error { + // Get the batch to ensure it exists in the DB before deleting + key := append(keyPrefixBatch, h...) + _, err := txn.Get(key) + if err != nil { + return err + } + // Delete the batch from BadgerDB + return txn.Delete(h) + }) + if err != nil { + return &sequencing.Batch{Transactions: nil}, err + } + + return &batch, nil +} + +// LoadFromDB reloads all batches from BadgerDB into the in-memory queue after a crash or restart. +func (bq *BatchQueue) LoadFromDB(db *badger.DB) error { + bq.mu.Lock() + defer bq.mu.Unlock() + + err := db.View(func(txn *badger.Txn) error { + // Create an iterator to go through all batches stored in BadgerDB + opts := badger.DefaultIteratorOptions + opts.Prefix = keyPrefixBatch + it := txn.NewIterator(opts) + defer it.Close() + + for it.Rewind(); it.Valid(); it.Next() { + item := it.Item() + err := item.Value(func(val []byte) error { + var batch sequencing.Batch + // Unmarshal the batch bytes and add them to the in-memory queue + err := batch.Unmarshal(val) + if err != nil { + return err + } + bq.queue = append(bq.queue, batch) + return nil + }) + if err != nil { + return err + } + } + return nil + }) + + return err +} diff --git a/sequencing/sequencer.go b/sequencing/sequencer.go index f2fdd7c..02a3439 100644 --- a/sequencing/sequencer.go +++ b/sequencing/sequencer.go @@ -31,107 +31,13 @@ const defaultMempoolTTL = 25 var initialBackoff = 100 * time.Millisecond -// BatchQueue ... -type BatchQueue struct { - queue []sequencing.Batch - mu sync.Mutex -} - -// NewBatchQueue creates a new BatchQueue -func NewBatchQueue() *BatchQueue { - return &BatchQueue{ - queue: make([]sequencing.Batch, 0), - } -} - -// AddBatch adds a new transaction to the queue -func (bq *BatchQueue) AddBatch(batch sequencing.Batch, db *badger.DB) error { - bq.mu.Lock() - bq.queue = append(bq.queue, batch) - bq.mu.Unlock() - - // Get the hash and bytes of the batch - h, err := batch.Hash() - if err != nil { - return err - } - - // Marshal the batch - batchBytes, err := batch.Marshal() - if err != nil { - return err - } - - // Store the batch in BadgerDB - err = db.Update(func(txn *badger.Txn) error { - return txn.Set(h, batchBytes) - }) - return err -} - -// Next extracts a batch of transactions from the queue -func (bq *BatchQueue) Next(db *badger.DB) (*sequencing.Batch, error) { - bq.mu.Lock() - defer bq.mu.Unlock() - if len(bq.queue) == 0 { - return &sequencing.Batch{Transactions: nil}, nil - } - batch := bq.queue[0] - bq.queue = bq.queue[1:] - - h, err := batch.Hash() - if err != nil { - return &sequencing.Batch{Transactions: nil}, err - } - - // Remove the batch from BadgerDB after processing - err = db.Update(func(txn *badger.Txn) error { - // Get the batch to ensure it exists in the DB before deleting - _, err := txn.Get(h) - if err != nil { - return err - } - // Delete the batch from BadgerDB - return txn.Delete(h) - }) - if err != nil { - return &sequencing.Batch{Transactions: nil}, err - } - - return &batch, nil -} - -// LoadFromDB reloads all batches from BadgerDB into the in-memory queue after a crash or restart. -func (bq *BatchQueue) LoadFromDB(db *badger.DB) error { - bq.mu.Lock() - defer bq.mu.Unlock() - - err := db.View(func(txn *badger.Txn) error { - // Create an iterator to go through all batches stored in BadgerDB - it := txn.NewIterator(badger.DefaultIteratorOptions) - defer it.Close() - - for it.Rewind(); it.Valid(); it.Next() { - item := it.Item() - err := item.Value(func(val []byte) error { - var batch sequencing.Batch - // Unmarshal the batch bytes and add them to the in-memory queue - err := batch.Unmarshal(val) - if err != nil { - return err - } - bq.queue = append(bq.queue, batch) - return nil - }) - if err != nil { - return err - } - } - return nil - }) - - return err -} +// Key prefixes for BadgerDB +var ( + keyLastBatchHash = []byte("lastBatchHash") + keyPrefixSeenBatch = []byte("seenBatch") + keyPrefixBatch = []byte("batch") + keyPrefixTx = []byte("tx") +) // Sequencer implements go-sequencing interface type Sequencer struct { diff --git a/sequencing/transaction_queue.go b/sequencing/transaction_queue.go index c64dc4d..1bb555a 100644 --- a/sequencing/transaction_queue.go +++ b/sequencing/transaction_queue.go @@ -47,7 +47,8 @@ func (tq *TransactionQueue) AddTransaction(tx sequencing.Tx, db *badger.DB) erro // Store transaction in BadgerDB err := db.Update(func(txn *badger.Txn) error { - return txn.Set([]byte(GetTransactionHash(tx)), tx) + key := append(keyPrefixTx, []byte(GetTransactionHash(tx))...) + return txn.Set(key, tx) }) return err } @@ -99,7 +100,8 @@ func (tq *TransactionQueue) GetNextBatch(max uint64, db *badger.DB) sequencing.B txHash := GetTransactionHash(tx) err := db.Update(func(txn *badger.Txn) error { // Get and then delete the transaction from BadgerDB - _, err := txn.Get([]byte(txHash)) + key := append(keyPrefixTx, []byte(txHash)...) + _, err := txn.Get(key) if err != nil { // If the transaction not found is the head or tail, skip it as they are not in the queue if errors.Is(err, badger.ErrKeyNotFound) && (i == 0 || i == len(batch)-1) { @@ -107,7 +109,7 @@ func (tq *TransactionQueue) GetNextBatch(max uint64, db *badger.DB) sequencing.B } return err } - return txn.Delete([]byte(txHash)) // Remove processed transaction + return txn.Delete(key) // Remove processed transaction }) if err != nil { return sequencing.Batch{Transactions: nil} // Return empty batch if any transaction retrieval fails @@ -125,7 +127,9 @@ func (tq *TransactionQueue) LoadFromDB(db *badger.DB) error { // Start a read-only transaction err := db.View(func(txn *badger.Txn) error { // Create an iterator to go through all transactions stored in BadgerDB - it := txn.NewIterator(badger.DefaultIteratorOptions) + opts := badger.DefaultIteratorOptions + opts.Prefix = keyPrefixTx + it := txn.NewIterator(opts) defer it.Close() // Ensure that the iterator is properly closed // Iterate through all items in the database @@ -157,7 +161,8 @@ func (tq *TransactionQueue) AddBatchBackToQueue(batch sequencing.Batch, db *badg // Optionally, persist the batch back to BadgerDB for _, tx := range batch.Transactions { err := db.Update(func(txn *badger.Txn) error { - return txn.Set([]byte(GetTransactionHash(tx)), tx) // Store transaction back in DB + key := append(keyPrefixTx, []byte(GetTransactionHash(tx))...) + return txn.Set(key, tx) // Store transaction back in DB }) if err != nil { return fmt.Errorf("failed to revert transaction to DB: %w", err) From d24fde91c2b40d6bda10bf81b6a649394ba7f4c6 Mon Sep 17 00:00:00 2001 From: Facundo Date: Thu, 20 Feb 2025 14:05:18 +0100 Subject: [PATCH 7/9] fixes --- sequencing/batch_queue.go | 4 ++-- sequencing/sequencer.go | 9 ++++++--- 2 files changed, 8 insertions(+), 5 deletions(-) diff --git a/sequencing/batch_queue.go b/sequencing/batch_queue.go index 07a5b6d..73bcd4d 100644 --- a/sequencing/batch_queue.go +++ b/sequencing/batch_queue.go @@ -20,7 +20,7 @@ func NewBatchQueue() *BatchQueue { } } -// AddBatch adds a new transaction to the queue +// AddBatch adds a new batch to the queue func (bq *BatchQueue) AddBatch(batch sequencing.Batch, db *badger.DB) error { bq.mu.Lock() bq.queue = append(bq.queue, batch) @@ -46,7 +46,7 @@ func (bq *BatchQueue) AddBatch(batch sequencing.Batch, db *badger.DB) error { return err } -// AddBatch adds a new transaction to the queue +// AddBatchToTheTop adds a new batch to the queue, at index 0 func (bq *BatchQueue) AddBatchToTheTop(batch sequencing.Batch, db *badger.DB) error { bq.mu.Lock() bq.queue = append([]sequencing.Batch{batch}, bq.queue...) diff --git a/sequencing/sequencer.go b/sequencing/sequencer.go index 02a3439..83eb376 100644 --- a/sequencing/sequencer.go +++ b/sequencing/sequencer.go @@ -188,7 +188,9 @@ func (c *Sequencer) LoadSeenBatchesFromDB() error { err := c.db.View(func(txn *badger.Txn) error { // Create an iterator to go through all entries in BadgerDB - it := txn.NewIterator(badger.DefaultIteratorOptions) + opts := badger.DefaultIteratorOptions + opts.Prefix = keyPrefixSeenBatch + it := txn.NewIterator(opts) defer it.Close() for it.Rewind(); it.Valid(); it.Next() { @@ -216,8 +218,9 @@ func (c *Sequencer) addSeenBatch(hash []byte) error { c.dbMux.Lock() defer c.dbMux.Unlock() + key := append([]byte(keyPrefixSeenBatch), hash...) return c.db.Update(func(txn *badger.Txn) error { - return txn.Set(hash, []byte{1}) // Just to mark the batch as seen + return txn.Set(key, []byte{1}) // Just to mark the batch as seen }) } @@ -433,7 +436,7 @@ func (c *Sequencer) GetNextBatch(ctx context.Context, req sequencing.GetNextBatc func (c *Sequencer) recover(batch sequencing.Batch, err error) (*sequencing.GetNextBatchResponse, error) { // Revert the batch if Hash() errors out by adding it back to the BatchQueue - revertErr := c.bq.AddBatch(batch, c.db) + revertErr := c.bq.AddBatchToTheTop(batch, c.db) if revertErr != nil { return nil, fmt.Errorf("failed to revert batch: %w", revertErr) } From 49c27faa619a0e271e042f508607bb2e3185569c Mon Sep 17 00:00:00 2001 From: Facundo Date: Thu, 20 Feb 2025 14:07:13 +0100 Subject: [PATCH 8/9] fix --- sequencing/batch_queue.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sequencing/batch_queue.go b/sequencing/batch_queue.go index 73bcd4d..e4b58d4 100644 --- a/sequencing/batch_queue.go +++ b/sequencing/batch_queue.go @@ -96,7 +96,7 @@ func (bq *BatchQueue) Next(db *badger.DB) (*sequencing.Batch, error) { return err } // Delete the batch from BadgerDB - return txn.Delete(h) + return txn.Delete(key) }) if err != nil { return &sequencing.Batch{Transactions: nil}, err From 15e3e3c24a548db8c1217a768ba98a72bef92caf Mon Sep 17 00:00:00 2001 From: Facundo Date: Thu, 20 Feb 2025 14:11:27 +0100 Subject: [PATCH 9/9] fix --- sequencing/sequencer.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sequencing/sequencer.go b/sequencing/sequencer.go index 83eb376..b69b911 100644 --- a/sequencing/sequencer.go +++ b/sequencing/sequencer.go @@ -161,7 +161,7 @@ func (c *Sequencer) LoadLastBatchHashFromDB() error { var hash []byte // Load the last batch hash from BadgerDB if it exists err := c.db.View(func(txn *badger.Txn) error { - item, err := txn.Get([]byte("lastBatchHash")) + item, err := txn.Get(keyLastBatchHash) if errors.Is(err, badger.ErrKeyNotFound) { // If no last batch hash exists, it's the first time or nothing was processed c.lastBatchHash = nil