diff --git a/cmd/centralized-sequencer/main.go b/cmd/centralized-sequencer/main.go index 29117f2..914f173 100644 --- a/cmd/centralized-sequencer/main.go +++ b/cmd/centralized-sequencer/main.go @@ -91,7 +91,7 @@ func main() { if err != nil { log.Fatalf("Failed to create metrics: %v", err) } - centralizedSeq, err := sequencing.NewSequencer(da_address, da_auth_token, namespace, []byte(rollupId), batchTime, metrics, db_path) + centralizedSeq, err := sequencing.NewSequencer(da_address, da_auth_token, namespace, []byte(rollupId), batchTime, metrics, db_path, nil) if err != nil { log.Fatalf("Failed to create centralized sequencer: %v", err) } diff --git a/go.mod b/go.mod index 34541ab..c5e4388 100644 --- a/go.mod +++ b/go.mod @@ -5,7 +5,7 @@ go 1.22 toolchain go1.22.3 require ( - github.com/dgraph-io/badger/v3 v3.2103.5 + github.com/dgraph-io/badger/v4 v4.2.1-0.20231013074411-fb1b00959581 github.com/go-kit/kit v0.13.0 github.com/gogo/protobuf v1.3.2 github.com/ipfs/go-log/v2 v2.5.1 diff --git a/go.sum b/go.sum index 7b15b0a..a652fee 100644 --- a/go.sum +++ b/go.sum @@ -69,8 +69,8 @@ github.com/decred/dcrd/dcrec/secp256k1/v4 v4.3.0 h1:rpfIENRNNilwHwZeG5+P150SMrnN github.com/decred/dcrd/dcrec/secp256k1/v4 v4.3.0/go.mod h1:v57UDF4pDQJcEfFUCRop3lJL149eHGSe9Jvczhzjo/0= github.com/dgraph-io/badger/v2 v2.2007.4 h1:TRWBQg8UrlUhaFdco01nO2uXwzKS7zd+HVdwV/GHc4o= github.com/dgraph-io/badger/v2 v2.2007.4/go.mod h1:vSw/ax2qojzbN6eXHIx6KPKtCSHJN/Uz0X0VPruTIhk= -github.com/dgraph-io/badger/v3 v3.2103.5 h1:ylPa6qzbjYRQMU6jokoj4wzcaweHylt//CH0AKt0akg= -github.com/dgraph-io/badger/v3 v3.2103.5/go.mod h1:4MPiseMeDQ3FNCYwRbbcBOGJLf5jsE0PPFzRiKjtcdw= +github.com/dgraph-io/badger/v4 v4.2.1-0.20231013074411-fb1b00959581 h1:yy45brf1ktmnkTCZlHynP1gRlVwZ9g19oz5D9wG81v4= +github.com/dgraph-io/badger/v4 v4.2.1-0.20231013074411-fb1b00959581/go.mod h1:T/uWAYxrXdaXw64ihI++9RMbKTCpKd/yE9+saARew7k= github.com/dgraph-io/ristretto v0.0.3-0.20200630154024-f66de99634de/go.mod h1:KPxhHT9ZxKefz+PCeOGsrHpl1qZ7i70dGTu2u+Ahh6E= github.com/dgraph-io/ristretto v0.1.1 h1:6CWw5tJNgpegArSHpNHJKldNeq03FQCwYvfMVWajOK8= github.com/dgraph-io/ristretto v0.1.1/go.mod h1:S1GPSBCYCIhmVNfcth17y2zZtQT6wzkzgwUve0VDWWA= @@ -120,7 +120,6 @@ github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69 github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= github.com/golang/glog v1.2.2 h1:1+mZ9upx1Dh6FmUTFR1naJ77miKiXgALjWOZ3NVFPmY= github.com/golang/glog v1.2.2/go.mod h1:6AhwSGph0fcJtXVM/PEHPqZlFeoLxhs7/t5UDAwmO+w= -github.com/golang/groupcache v0.0.0-20190702054246-869f871628b6/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= github.com/golang/groupcache v0.0.0-20200121045136-8c9f03a8e57e/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da h1:oI5xCqsCo564l8iNU+DwB5epxmsaqB+rhGL0m5jtYqE= github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= @@ -151,7 +150,6 @@ github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMyw github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.3/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= -github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.6.0 
h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/google/gopacket v1.1.19 h1:ves8RnFZPGiFnTS0uPQStjwru6uO6h+nlr9j6fL7kF8= @@ -391,7 +389,6 @@ github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9dec github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k= go.etcd.io/bbolt v1.3.10 h1:+BqfJTcCzTItrop8mq/lbzL8wSGtj94UO/3U31shqG0= go.etcd.io/bbolt v1.3.10/go.mod h1:bK3UQLPJZly7IlNmV7uVHJDxfe5aK9Ll93e/74Y9oEQ= -go.opencensus.io v0.22.5/go.mod h1:5pWMHQbX5EPX2/62yrJeAkowc+lfs/XD7Uxpq3pI6kk= go.opencensus.io v0.24.0 h1:y73uSU6J157QMP2kn2r30vwW1A2W2WFwSCGnAVxeaD0= go.opencensus.io v0.24.0/go.mod h1:vNK8G9p7aAivkbmorf4v+7Hgx+Zs0yY+0fOtgBfjQKo= go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= @@ -447,7 +444,6 @@ golang.org/x/net v0.29.0/go.mod h1:gLkgy8jTGERgjzMic6DS9+SP0ajcu6Xu3Orq/SpETg0= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sync v0.0.0-20190227155943-e225da77a7e6/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= @@ -459,7 +455,6 @@ golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5h golang.org/x/sys v0.0.0-20181205085412-a5c9d58dba9a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20190502145724-3ef323f4f1fd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190626221950-04f50cda93cb/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190904154756-749cb33beabd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20191005200804-aed5e4c7ecf9/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= @@ -503,13 +498,11 @@ golang.org/x/xerrors v0.0.0-20231012003039-104605ab7028/go.mod h1:NDW/Ps6MPRej6f google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM= google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4= google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc= -google.golang.org/genproto v0.0.0-20190425155659-357c62f0e4bb/go.mod h1:VzzqZJRnGkLBvHegQrXjBqPurQTc5/KpmUdxsrq26oE= google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55/go.mod h1:DMBHOl98Agz4BDEuKkezgsaosCRResVns1a3J2ZsMNc= google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013/go.mod h1:NbSheEEYHJ7i3ixzK3sjbqSGDJWnxyFXZblF3eUsNvo= google.golang.org/genproto/googleapis/rpc v0.0.0-20240827150818-7e3bb234dfed h1:J6izYgfBXAI3xTKLgxzTmUltdYaLsuBxFCgDHWJ/eXg= google.golang.org/genproto/googleapis/rpc v0.0.0-20240827150818-7e3bb234dfed/go.mod 
h1:UqMtugtsSgubUsoxbuAoiCXvqvErP7Gf0so0mK9tHxU= google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= -google.golang.org/grpc v1.20.1/go.mod h1:10oTOabMzJvdu6/UiuZezV6QK5dSlG84ov/aaiqXj38= google.golang.org/grpc v1.23.0/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg= google.golang.org/grpc v1.25.1/go.mod h1:c3i+UQWmh7LiEpx4sFZnkU36qjEYZ0imhYfXVyQciAY= google.golang.org/grpc v1.27.0/go.mod h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8abTk= diff --git a/sequencing/batch_queue.go b/sequencing/batch_queue.go new file mode 100644 index 0000000..e4b58d4 --- /dev/null +++ b/sequencing/batch_queue.go @@ -0,0 +1,140 @@ +package sequencing + +import ( + "sync" + + "github.com/dgraph-io/badger/v4" + "github.com/rollkit/go-sequencing" +) + +// BatchQueue ... +type BatchQueue struct { + queue []sequencing.Batch + mu sync.Mutex +} + +// NewBatchQueue creates a new BatchQueue +func NewBatchQueue() *BatchQueue { + return &BatchQueue{ + queue: make([]sequencing.Batch, 0), + } +} + +// AddBatch adds a new batch to the queue +func (bq *BatchQueue) AddBatch(batch sequencing.Batch, db *badger.DB) error { + bq.mu.Lock() + bq.queue = append(bq.queue, batch) + bq.mu.Unlock() + + // Get the hash and bytes of the batch + h, err := batch.Hash() + if err != nil { + return err + } + + // Marshal the batch + batchBytes, err := batch.Marshal() + if err != nil { + return err + } + + // Store the batch in BadgerDB + err = db.Update(func(txn *badger.Txn) error { + key := append(keyPrefixBatch, h...) + return txn.Set(key, batchBytes) + }) + return err +} + +// AddBatchToTheTop adds a new batch to the queue, at index 0 +func (bq *BatchQueue) AddBatchToTheTop(batch sequencing.Batch, db *badger.DB) error { + bq.mu.Lock() + bq.queue = append([]sequencing.Batch{batch}, bq.queue...) + bq.mu.Unlock() + + // Get the hash and bytes of the batch + h, err := batch.Hash() + if err != nil { + return err + } + + // Marshal the batch + batchBytes, err := batch.Marshal() + if err != nil { + return err + } + + // Store the batch in BadgerDB + err = db.Update(func(txn *badger.Txn) error { + key := append(keyPrefixBatch, h...) + return txn.Set(key, batchBytes) + }) + return err +} + +// Next extracts a batch of transactions from the queue +func (bq *BatchQueue) Next(db *badger.DB) (*sequencing.Batch, error) { + bq.mu.Lock() + defer bq.mu.Unlock() + if len(bq.queue) == 0 { + return &sequencing.Batch{Transactions: nil}, nil + } + batch := bq.queue[0] + bq.queue = bq.queue[1:] + + h, err := batch.Hash() + if err != nil { + return &sequencing.Batch{Transactions: nil}, err + } + + // Remove the batch from BadgerDB after processing + err = db.Update(func(txn *badger.Txn) error { + // Get the batch to ensure it exists in the DB before deleting + key := append(keyPrefixBatch, h...) + _, err := txn.Get(key) + if err != nil { + return err + } + // Delete the batch from BadgerDB + return txn.Delete(key) + }) + if err != nil { + return &sequencing.Batch{Transactions: nil}, err + } + + return &batch, nil +} + +// LoadFromDB reloads all batches from BadgerDB into the in-memory queue after a crash or restart. 
+func (bq *BatchQueue) LoadFromDB(db *badger.DB) error { + bq.mu.Lock() + defer bq.mu.Unlock() + + err := db.View(func(txn *badger.Txn) error { + // Create an iterator to go through all batches stored in BadgerDB + opts := badger.DefaultIteratorOptions + opts.Prefix = keyPrefixBatch + it := txn.NewIterator(opts) + defer it.Close() + + for it.Rewind(); it.Valid(); it.Next() { + item := it.Item() + err := item.Value(func(val []byte) error { + var batch sequencing.Batch + // Unmarshal the batch bytes and add them to the in-memory queue + err := batch.Unmarshal(val) + if err != nil { + return err + } + bq.queue = append(bq.queue, batch) + return nil + }) + if err != nil { + return err + } + } + return nil + }) + + return err +} diff --git a/sequencing/sequencer.go b/sequencing/sequencer.go index dd7c9bb..b69b911 100644 --- a/sequencing/sequencer.go +++ b/sequencing/sequencer.go @@ -3,7 +3,6 @@ package sequencing import ( "bytes" "context" - "crypto/sha256" "encoding/hex" "errors" "fmt" @@ -12,7 +11,7 @@ import ( "sync/atomic" "time" - "github.com/dgraph-io/badger/v3" + "github.com/dgraph-io/badger/v4" logging "github.com/ipfs/go-log/v2" "github.com/rollkit/centralized-sequencer/da" @@ -32,235 +31,13 @@ const defaultMempoolTTL = 25 var initialBackoff = 100 * time.Millisecond -// BatchQueue ... -type BatchQueue struct { - queue []sequencing.Batch - mu sync.Mutex -} - -// NewBatchQueue creates a new TransactionQueue -func NewBatchQueue() *BatchQueue { - return &BatchQueue{ - queue: make([]sequencing.Batch, 0), - } -} - -// AddBatch adds a new transaction to the queue -func (bq *BatchQueue) AddBatch(batch sequencing.Batch, db *badger.DB) error { - bq.mu.Lock() - bq.queue = append(bq.queue, batch) - bq.mu.Unlock() - - // Get the hash and bytes of the batch - h, err := batch.Hash() - if err != nil { - return err - } - - // Marshal the batch - batchBytes, err := batch.Marshal() - if err != nil { - return err - } - - // Store the batch in BadgerDB - err = db.Update(func(txn *badger.Txn) error { - return txn.Set(h, batchBytes) - }) - return err -} - -// Next extracts a batch of transactions from the queue -func (bq *BatchQueue) Next(db *badger.DB) (*sequencing.Batch, error) { - bq.mu.Lock() - defer bq.mu.Unlock() - if len(bq.queue) == 0 { - return &sequencing.Batch{Transactions: nil}, nil - } - batch := bq.queue[0] - bq.queue = bq.queue[1:] - - h, err := batch.Hash() - if err != nil { - return &sequencing.Batch{Transactions: nil}, err - } - - // Remove the batch from BadgerDB after processing - err = db.Update(func(txn *badger.Txn) error { - // Get the batch to ensure it exists in the DB before deleting - _, err := txn.Get(h) - if err != nil { - return err - } - // Delete the batch from BadgerDB - return txn.Delete(h) - }) - if err != nil { - return &sequencing.Batch{Transactions: nil}, err - } - - return &batch, nil -} - -// LoadFromDB reloads all batches from BadgerDB into the in-memory queue after a crash or restart. 
-func (bq *BatchQueue) LoadFromDB(db *badger.DB) error { - bq.mu.Lock() - defer bq.mu.Unlock() - - err := db.View(func(txn *badger.Txn) error { - // Create an iterator to go through all batches stored in BadgerDB - it := txn.NewIterator(badger.DefaultIteratorOptions) - defer it.Close() - - for it.Rewind(); it.Valid(); it.Next() { - item := it.Item() - err := item.Value(func(val []byte) error { - var batch sequencing.Batch - // Unmarshal the batch bytes and add them to the in-memory queue - err := batch.Unmarshal(val) - if err != nil { - return err - } - bq.queue = append(bq.queue, batch) - return nil - }) - if err != nil { - return err - } - } - return nil - }) - - return err -} - -// TransactionQueue is a queue of transactions -type TransactionQueue struct { - queue []sequencing.Tx - mu sync.Mutex -} - -// NewTransactionQueue creates a new TransactionQueue -func NewTransactionQueue() *TransactionQueue { - return &TransactionQueue{ - queue: make([]sequencing.Tx, 0), - } -} - -// GetTransactionHash to get hash from transaction bytes using SHA-256 -func GetTransactionHash(txBytes []byte) string { - hashBytes := sha256.Sum256(txBytes) - return hex.EncodeToString(hashBytes[:]) -} - -// AddTransaction adds a new transaction to the queue -func (tq *TransactionQueue) AddTransaction(tx sequencing.Tx, db *badger.DB) error { - tq.mu.Lock() - tq.queue = append(tq.queue, tx) - tq.mu.Unlock() - - // Store transaction in BadgerDB - err := db.Update(func(txn *badger.Txn) error { - return txn.Set([]byte(GetTransactionHash(tx)), tx) - }) - return err -} - -// GetNextBatch extracts a batch of transactions from the queue -func (tq *TransactionQueue) GetNextBatch(max uint64, db *badger.DB) sequencing.Batch { - tq.mu.Lock() - defer tq.mu.Unlock() - - var batch [][]byte - batchSize := len(tq.queue) - if batchSize == 0 { - return sequencing.Batch{Transactions: nil} - } - for { - batch = tq.queue[:batchSize] - blobSize := totalBytes(batch) - if uint64(blobSize) <= max { - break - } - batchSize = batchSize - 1 - } - - // Retrieve transactions from BadgerDB and remove processed ones - for _, tx := range batch { - txHash := GetTransactionHash(tx) - err := db.Update(func(txn *badger.Txn) error { - // Get and then delete the transaction from BadgerDB - _, err := txn.Get([]byte(txHash)) - if err != nil { - return err - } - return txn.Delete([]byte(txHash)) // Remove processed transaction - }) - if err != nil { - return sequencing.Batch{Transactions: nil} // Return empty batch if any transaction retrieval fails - } - } - tq.queue = tq.queue[batchSize:] - return sequencing.Batch{Transactions: batch} -} - -// LoadFromDB reloads all transactions from BadgerDB into the in-memory queue after a crash. -func (tq *TransactionQueue) LoadFromDB(db *badger.DB) error { - tq.mu.Lock() - defer tq.mu.Unlock() - - // Start a read-only transaction - err := db.View(func(txn *badger.Txn) error { - // Create an iterator to go through all transactions stored in BadgerDB - it := txn.NewIterator(badger.DefaultIteratorOptions) - defer it.Close() // Ensure that the iterator is properly closed - - // Iterate through all items in the database - for it.Rewind(); it.Valid(); it.Next() { - item := it.Item() - err := item.Value(func(val []byte) error { - // Load each transaction from DB and add to the in-memory queue - tq.queue = append(tq.queue, val) - return nil - }) - if err != nil { - return err - } - } - return nil - }) - - return err -} - -// AddBatchBackToQueue re-adds the batch to the transaction queue (and BadgerDB) after a failure. 
-func (tq *TransactionQueue) AddBatchBackToQueue(batch sequencing.Batch, db *badger.DB) error { - tq.mu.Lock() - defer tq.mu.Unlock() - - // Add the batch back to the in-memory transaction queue - tq.queue = append(tq.queue, batch.Transactions...) - - // Optionally, persist the batch back to BadgerDB - for _, tx := range batch.Transactions { - err := db.Update(func(txn *badger.Txn) error { - return txn.Set([]byte(GetTransactionHash(tx)), tx) // Store transaction back in DB - }) - if err != nil { - return fmt.Errorf("failed to revert transaction to DB: %w", err) - } - } - - return nil -} - -func totalBytes(data [][]byte) int { - total := 0 - for _, sub := range data { - total += len(sub) - } - return total -} +// Key prefixes for BadgerDB +var ( + keyLastBatchHash = []byte("lastBatchHash") + keyPrefixSeenBatch = []byte("seenBatch") + keyPrefixBatch = []byte("batch") + keyPrefixTx = []byte("tx") +) // Sequencer implements go-sequencing interface type Sequencer struct { @@ -286,7 +63,7 @@ type Sequencer struct { } // NewSequencer ... -func NewSequencer(daAddress, daAuthToken string, daNamespace []byte, rollupId []byte, batchTime time.Duration, metrics *Metrics, dbPath string) (*Sequencer, error) { +func NewSequencer(daAddress, daAuthToken string, daNamespace []byte, rollupId []byte, batchTime time.Duration, metrics *Metrics, dbPath string, extender BatchExtender) (*Sequencer, error) { ctx := context.Background() dac, err := proxyda.NewClient(daAddress, daAuthToken) if err != nil { @@ -316,7 +93,7 @@ func NewSequencer(daAddress, daAuthToken string, daNamespace []byte, rollupId [] ctx: ctx, maxSize: maxBlobSize, rollupId: rollupId, - tq: NewTransactionQueue(), + tq: NewTransactionQueue(extender), bq: NewBatchQueue(), seenBatches: make(map[string]struct{}), db: db, @@ -384,7 +161,7 @@ func (c *Sequencer) LoadLastBatchHashFromDB() error { var hash []byte // Load the last batch hash from BadgerDB if it exists err := c.db.View(func(txn *badger.Txn) error { - item, err := txn.Get([]byte("lastBatchHash")) + item, err := txn.Get(keyLastBatchHash) if errors.Is(err, badger.ErrKeyNotFound) { // If no last batch hash exists, it's the first time or nothing was processed c.lastBatchHash = nil @@ -411,7 +188,9 @@ func (c *Sequencer) LoadSeenBatchesFromDB() error { err := c.db.View(func(txn *badger.Txn) error { // Create an iterator to go through all entries in BadgerDB - it := txn.NewIterator(badger.DefaultIteratorOptions) + opts := badger.DefaultIteratorOptions + opts.Prefix = keyPrefixSeenBatch + it := txn.NewIterator(opts) defer it.Close() for it.Rewind(); it.Valid(); it.Next() { @@ -439,8 +218,9 @@ func (c *Sequencer) addSeenBatch(hash []byte) error { c.dbMux.Lock() defer c.dbMux.Unlock() + key := append([]byte(keyPrefixSeenBatch), hash...) 
return c.db.Update(func(txn *badger.Txn) error { - return txn.Set(hash, []byte{1}) // Just to mark the batch as seen + return txn.Set(key, []byte{1}) // Just to mark the batch as seen }) } @@ -656,7 +436,7 @@ func (c *Sequencer) GetNextBatch(ctx context.Context, req sequencing.GetNextBatc func (c *Sequencer) recover(batch sequencing.Batch, err error) (*sequencing.GetNextBatchResponse, error) { // Revert the batch if Hash() errors out by adding it back to the BatchQueue - revertErr := c.bq.AddBatch(batch, c.db) + revertErr := c.bq.AddBatchToTheTop(batch, c.db) if revertErr != nil { return nil, fmt.Errorf("failed to revert batch: %w", revertErr) } diff --git a/sequencing/sequencer_test.go b/sequencing/sequencer_test.go index 58be4e5..30b0872 100644 --- a/sequencing/sequencer_test.go +++ b/sequencing/sequencer_test.go @@ -9,7 +9,7 @@ import ( "testing" "time" - "github.com/dgraph-io/badger/v3" + "github.com/dgraph-io/badger/v4" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -52,7 +52,7 @@ func startMockDAServJSONRPC(ctx context.Context, da_address string) (*proxy.Serv func TestNewSequencer(t *testing.T) { // Create a new sequencer with mock DA client metrics, _ := NopMetrics() - seq, err := NewSequencer(MockDAAddressHTTP, "authToken", []byte("namespace"), []byte("rollup1"), 10*time.Second, metrics, "") + seq, err := NewSequencer(MockDAAddressHTTP, "authToken", []byte("namespace"), []byte("rollup1"), 10*time.Second, metrics, "", nil) require.NoError(t, err) defer func() { err := seq.Close() @@ -69,7 +69,7 @@ func TestNewSequencer(t *testing.T) { func TestSequencer_SubmitRollupTransaction(t *testing.T) { // Initialize a new sequencer metrics, _ := NopMetrics() - seq, err := NewSequencer(MockDAAddressHTTP, "authToken", []byte("namespace"), []byte("rollup1"), 10*time.Second, metrics, "") + seq, err := NewSequencer(MockDAAddressHTTP, "authToken", []byte("namespace"), []byte("rollup1"), 10*time.Second, metrics, "", nil) require.NoError(t, err) defer func() { err := seq.Close() diff --git a/sequencing/transaction_queue.go b/sequencing/transaction_queue.go new file mode 100644 index 0000000..1bb555a --- /dev/null +++ b/sequencing/transaction_queue.go @@ -0,0 +1,181 @@ +package sequencing + +import ( + "crypto/sha256" + "encoding/hex" + "errors" + "fmt" + "sync" + + "github.com/dgraph-io/badger/v4" + + "github.com/rollkit/go-sequencing" +) + +// BatchExtender is an interface for extending a batch of transactions +type BatchExtender interface { + Head(max uint64) ([]byte, error) + Tail(max uint64) ([]byte, error) +} + +// TransactionQueue is a queue of transactions +type TransactionQueue struct { + queue []sequencing.Tx + mu sync.Mutex + extender BatchExtender +} + +// NewTransactionQueue creates a new TransactionQueue +func NewTransactionQueue(extender BatchExtender) *TransactionQueue { + return &TransactionQueue{ + queue: make([]sequencing.Tx, 0), + extender: extender, + } +} + +// GetTransactionHash to get hash from transaction bytes using SHA-256 +func GetTransactionHash(txBytes []byte) string { + hashBytes := sha256.Sum256(txBytes) + return hex.EncodeToString(hashBytes[:]) +} + +// AddTransaction adds a new transaction to the queue +func (tq *TransactionQueue) AddTransaction(tx sequencing.Tx, db *badger.DB) error { + tq.mu.Lock() + tq.queue = append(tq.queue, tx) + tq.mu.Unlock() + + // Store transaction in BadgerDB + err := db.Update(func(txn *badger.Txn) error { + key := append(keyPrefixTx, []byte(GetTransactionHash(tx))...) 
+ return txn.Set(key, tx) + }) + return err +} + +// GetNextBatch extracts a batch of transactions from the queue +func (tq *TransactionQueue) GetNextBatch(max uint64, db *badger.DB) sequencing.Batch { + tq.mu.Lock() + defer tq.mu.Unlock() + + // Add head of batch if extender is provided, also ask for the tail of the batch + var head, tail []byte + if tq.extender != nil { + var err error + head, err = tq.extender.Head(max) + if err != nil { + return sequencing.Batch{Transactions: nil} + } + tail, err = tq.extender.Tail(max) + if err != nil { + return sequencing.Batch{Transactions: nil} + } + } + + batch := tq.queue + headTailSize := len(head) + len(tail) + + for uint64(totalBytes(batch)+headTailSize) > max { + batch = batch[:len(batch)-1] + } + + // batchLen before adding head and tail, to remove the correct number of transactions from the queue + batchLen := len(batch) + + // Add head and tail of the batch + if head != nil { + batch = append([][]byte{head}, batch...) + } + + if tail != nil { + batch = append(batch, tail) + } + + if len(batch) == 0 { + return sequencing.Batch{Transactions: nil} + } + + // Retrieve transactions from BadgerDB and remove processed ones + for i, tx := range batch { + txHash := GetTransactionHash(tx) + err := db.Update(func(txn *badger.Txn) error { + // Get and then delete the transaction from BadgerDB + key := append(keyPrefixTx, []byte(txHash)...) + _, err := txn.Get(key) + if err != nil { + // If the transaction not found is the head or tail, skip it as they are not in the queue + if errors.Is(err, badger.ErrKeyNotFound) && (i == 0 || i == len(batch)-1) { + return nil + } + return err + } + return txn.Delete(key) // Remove processed transaction + }) + if err != nil { + return sequencing.Batch{Transactions: nil} // Return empty batch if any transaction retrieval fails + } + } + tq.queue = tq.queue[batchLen:] + return sequencing.Batch{Transactions: batch} +} + +// LoadFromDB reloads all transactions from BadgerDB into the in-memory queue after a crash. +func (tq *TransactionQueue) LoadFromDB(db *badger.DB) error { + tq.mu.Lock() + defer tq.mu.Unlock() + + // Start a read-only transaction + err := db.View(func(txn *badger.Txn) error { + // Create an iterator to go through all transactions stored in BadgerDB + opts := badger.DefaultIteratorOptions + opts.Prefix = keyPrefixTx + it := txn.NewIterator(opts) + defer it.Close() // Ensure that the iterator is properly closed + + // Iterate through all items in the database + for it.Rewind(); it.Valid(); it.Next() { + item := it.Item() + err := item.Value(func(val []byte) error { + // Load each transaction from DB and add to the in-memory queue + tq.queue = append(tq.queue, val) + return nil + }) + if err != nil { + return err + } + } + return nil + }) + + return err +} + +// AddBatchBackToQueue re-adds the batch to the transaction queue (and BadgerDB) after a failure. +func (tq *TransactionQueue) AddBatchBackToQueue(batch sequencing.Batch, db *badger.DB) error { + tq.mu.Lock() + defer tq.mu.Unlock() + + // Add the batch back to the in-memory transaction queue + tq.queue = append(tq.queue, batch.Transactions...) + + // Optionally, persist the batch back to BadgerDB + for _, tx := range batch.Transactions { + err := db.Update(func(txn *badger.Txn) error { + key := append(keyPrefixTx, []byte(GetTransactionHash(tx))...) 
+ return txn.Set(key, tx) // Store transaction back in DB + }) + if err != nil { + return fmt.Errorf("failed to revert transaction to DB: %w", err) + } + } + + return nil +} + +func totalBytes(data [][]byte) int { + total := 0 + for _, sub := range data { + total += len(sub) + } + return total +}
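
The behavioural core of this diff is the new `BatchExtender` interface plus the extra parameter on `sequencing.NewSequencer`: when an extender is supplied, `GetNextBatch` asks it for a `Head` and `Tail` blob and brackets each batch with them, while passing `nil` (as the updated `cmd/centralized-sequencer/main.go` does) keeps the previous behaviour. Below is a minimal sketch of wiring a custom extender into the sequencer. `fixedExtender`, the DA address, namespace, and rollup ID are placeholder assumptions for illustration, not part of this change, and the call still requires a reachable DA JSON-RPC endpoint.

```go
package main

import (
	"log"
	"time"

	"github.com/rollkit/centralized-sequencer/sequencing"
)

// fixedExtender is a hypothetical BatchExtender that brackets every batch
// with a constant header and footer blob. Head and Tail both receive the
// max blob size so an implementation can size its output to fit.
type fixedExtender struct {
	header, footer []byte
}

func (e *fixedExtender) Head(max uint64) ([]byte, error) { return e.header, nil }
func (e *fixedExtender) Tail(max uint64) ([]byte, error) { return e.footer, nil }

func main() {
	metrics, err := sequencing.NopMetrics()
	if err != nil {
		log.Fatalf("failed to create metrics: %v", err)
	}

	// The trailing argument is the new BatchExtender; pass nil to keep the
	// pre-change batching behaviour.
	seq, err := sequencing.NewSequencer(
		"http://localhost:26658", // DA address (placeholder)
		"",                       // DA auth token
		[]byte("namespace"),
		[]byte("rollup1"),
		10*time.Second,
		metrics,
		"", // dbPath; the package tests also pass ""
		&fixedExtender{header: []byte("head"), footer: []byte("tail")},
	)
	if err != nil {
		log.Fatalf("failed to create sequencer: %v", err)
	}
	defer func() { _ = seq.Close() }()
}
```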
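The persistence change is that every stored record now carries a type prefix (`batch`, `tx`, `seenBatch`, `lastBatchHash`) and the recovery paths set `opts.Prefix` on their iterators, so batches, transactions, and seen-batch markers no longer share one flat keyspace. The sketch below, assuming a sequencer database at a placeholder path, simply counts entries under each prefix using the same prefix-scoped iterator pattern as `LoadFromDB` — a quick way to inspect what state survives a restart.

```go
package main

import (
	"fmt"
	"log"

	"github.com/dgraph-io/badger/v4"
)

func main() {
	// Placeholder path; point it at the sequencer's dbPath.
	db, err := badger.Open(badger.DefaultOptions("/tmp/sequencer-db"))
	if err != nil {
		log.Fatal(err)
	}
	defer func() { _ = db.Close() }()

	// The same key prefixes the sequencer now uses for its persisted state.
	prefixes := [][]byte{
		[]byte("batch"), []byte("tx"), []byte("seenBatch"), []byte("lastBatchHash"),
	}

	err = db.View(func(txn *badger.Txn) error {
		for _, p := range prefixes {
			opts := badger.DefaultIteratorOptions
			opts.Prefix = p
			opts.PrefetchValues = false // key-only scan is enough for counting
			it := txn.NewIterator(opts)
			n := 0
			for it.Rewind(); it.Valid(); it.Next() {
				n++
			}
			it.Close()
			fmt.Printf("%-14s %d entries\n", p, n)
		}
		return nil
	})
	if err != nil {
		log.Fatal(err)
	}
}
```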