Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add batch extender + persistent fixes #45

Open
wants to merge 11 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cmd/centralized-sequencer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@
if err != nil {
log.Fatalf("Failed to create metrics: %v", err)
}
centralizedSeq, err := sequencing.NewSequencer(da_address, da_auth_token, namespace, []byte(rollupId), batchTime, metrics, db_path)
centralizedSeq, err := sequencing.NewSequencer(da_address, da_auth_token, namespace, []byte(rollupId), batchTime, metrics, db_path, nil)

Check warning on line 94 in cmd/centralized-sequencer/main.go

View check run for this annotation

Codecov / codecov/patch

cmd/centralized-sequencer/main.go#L94

Added line #L94 was not covered by tests
if err != nil {
log.Fatalf("Failed to create centralized sequencer: %v", err)
}
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ go 1.22
toolchain go1.22.3

require (
github.com/dgraph-io/badger/v3 v3.2103.5
github.com/dgraph-io/badger/v4 v4.2.1-0.20231013074411-fb1b00959581
github.com/go-kit/kit v0.13.0
github.com/gogo/protobuf v1.3.2
github.com/ipfs/go-log/v2 v2.5.1
Expand Down
11 changes: 2 additions & 9 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,8 @@ github.com/decred/dcrd/dcrec/secp256k1/v4 v4.3.0 h1:rpfIENRNNilwHwZeG5+P150SMrnN
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.3.0/go.mod h1:v57UDF4pDQJcEfFUCRop3lJL149eHGSe9Jvczhzjo/0=
github.com/dgraph-io/badger/v2 v2.2007.4 h1:TRWBQg8UrlUhaFdco01nO2uXwzKS7zd+HVdwV/GHc4o=
github.com/dgraph-io/badger/v2 v2.2007.4/go.mod h1:vSw/ax2qojzbN6eXHIx6KPKtCSHJN/Uz0X0VPruTIhk=
github.com/dgraph-io/badger/v3 v3.2103.5 h1:ylPa6qzbjYRQMU6jokoj4wzcaweHylt//CH0AKt0akg=
github.com/dgraph-io/badger/v3 v3.2103.5/go.mod h1:4MPiseMeDQ3FNCYwRbbcBOGJLf5jsE0PPFzRiKjtcdw=
github.com/dgraph-io/badger/v4 v4.2.1-0.20231013074411-fb1b00959581 h1:yy45brf1ktmnkTCZlHynP1gRlVwZ9g19oz5D9wG81v4=
github.com/dgraph-io/badger/v4 v4.2.1-0.20231013074411-fb1b00959581/go.mod h1:T/uWAYxrXdaXw64ihI++9RMbKTCpKd/yE9+saARew7k=
github.com/dgraph-io/ristretto v0.0.3-0.20200630154024-f66de99634de/go.mod h1:KPxhHT9ZxKefz+PCeOGsrHpl1qZ7i70dGTu2u+Ahh6E=
github.com/dgraph-io/ristretto v0.1.1 h1:6CWw5tJNgpegArSHpNHJKldNeq03FQCwYvfMVWajOK8=
github.com/dgraph-io/ristretto v0.1.1/go.mod h1:S1GPSBCYCIhmVNfcth17y2zZtQT6wzkzgwUve0VDWWA=
Expand Down Expand Up @@ -120,7 +120,6 @@ github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q=
github.com/golang/glog v1.2.2 h1:1+mZ9upx1Dh6FmUTFR1naJ77miKiXgALjWOZ3NVFPmY=
github.com/golang/glog v1.2.2/go.mod h1:6AhwSGph0fcJtXVM/PEHPqZlFeoLxhs7/t5UDAwmO+w=
github.com/golang/groupcache v0.0.0-20190702054246-869f871628b6/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc=
github.com/golang/groupcache v0.0.0-20200121045136-8c9f03a8e57e/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc=
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da h1:oI5xCqsCo564l8iNU+DwB5epxmsaqB+rhGL0m5jtYqE=
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc=
Expand Down Expand Up @@ -151,7 +150,6 @@ github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMyw
github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.3/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/gopacket v1.1.19 h1:ves8RnFZPGiFnTS0uPQStjwru6uO6h+nlr9j6fL7kF8=
Expand Down Expand Up @@ -391,7 +389,6 @@ github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9dec
github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k=
go.etcd.io/bbolt v1.3.10 h1:+BqfJTcCzTItrop8mq/lbzL8wSGtj94UO/3U31shqG0=
go.etcd.io/bbolt v1.3.10/go.mod h1:bK3UQLPJZly7IlNmV7uVHJDxfe5aK9Ll93e/74Y9oEQ=
go.opencensus.io v0.22.5/go.mod h1:5pWMHQbX5EPX2/62yrJeAkowc+lfs/XD7Uxpq3pI6kk=
go.opencensus.io v0.24.0 h1:y73uSU6J157QMP2kn2r30vwW1A2W2WFwSCGnAVxeaD0=
go.opencensus.io v0.24.0/go.mod h1:vNK8G9p7aAivkbmorf4v+7Hgx+Zs0yY+0fOtgBfjQKo=
go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
Expand Down Expand Up @@ -447,7 +444,6 @@ golang.org/x/net v0.29.0/go.mod h1:gLkgy8jTGERgjzMic6DS9+SP0ajcu6Xu3Orq/SpETg0=
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20190227155943-e225da77a7e6/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
Expand All @@ -459,7 +455,6 @@ golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5h
golang.org/x/sys v0.0.0-20181205085412-a5c9d58dba9a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190502145724-3ef323f4f1fd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190626221950-04f50cda93cb/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190904154756-749cb33beabd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20191005200804-aed5e4c7ecf9/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
Expand Down Expand Up @@ -503,13 +498,11 @@ golang.org/x/xerrors v0.0.0-20231012003039-104605ab7028/go.mod h1:NDW/Ps6MPRej6f
google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM=
google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4=
google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc=
google.golang.org/genproto v0.0.0-20190425155659-357c62f0e4bb/go.mod h1:VzzqZJRnGkLBvHegQrXjBqPurQTc5/KpmUdxsrq26oE=
google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55/go.mod h1:DMBHOl98Agz4BDEuKkezgsaosCRResVns1a3J2ZsMNc=
google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013/go.mod h1:NbSheEEYHJ7i3ixzK3sjbqSGDJWnxyFXZblF3eUsNvo=
google.golang.org/genproto/googleapis/rpc v0.0.0-20240827150818-7e3bb234dfed h1:J6izYgfBXAI3xTKLgxzTmUltdYaLsuBxFCgDHWJ/eXg=
google.golang.org/genproto/googleapis/rpc v0.0.0-20240827150818-7e3bb234dfed/go.mod h1:UqMtugtsSgubUsoxbuAoiCXvqvErP7Gf0so0mK9tHxU=
google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c=
google.golang.org/grpc v1.20.1/go.mod h1:10oTOabMzJvdu6/UiuZezV6QK5dSlG84ov/aaiqXj38=
google.golang.org/grpc v1.23.0/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg=
google.golang.org/grpc v1.25.1/go.mod h1:c3i+UQWmh7LiEpx4sFZnkU36qjEYZ0imhYfXVyQciAY=
google.golang.org/grpc v1.27.0/go.mod h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8abTk=
Expand Down
140 changes: 140 additions & 0 deletions sequencing/batch_queue.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
package sequencing

import (
"sync"

"github.com/dgraph-io/badger/v4"

Check failure on line 6 in sequencing/batch_queue.go

View workflow job for this annotation

GitHub Actions / lint / golangci-lint

File is not properly formatted (goimports)

Check failure on line 6 in sequencing/batch_queue.go

View workflow job for this annotation

GitHub Actions / lint / golangci-lint

File is not properly formatted (goimports)
"github.com/rollkit/go-sequencing"
)

// BatchQueue ...
type BatchQueue struct {
queue []sequencing.Batch
mu sync.Mutex
}

// NewBatchQueue creates a new BatchQueue
func NewBatchQueue() *BatchQueue {
return &BatchQueue{
queue: make([]sequencing.Batch, 0),
}
}

// AddBatch adds a new transaction to the queue
func (bq *BatchQueue) AddBatch(batch sequencing.Batch, db *badger.DB) error {
bq.mu.Lock()
bq.queue = append(bq.queue, batch)
bq.mu.Unlock()

// Get the hash and bytes of the batch
h, err := batch.Hash()
if err != nil {
return err
}

Check warning on line 33 in sequencing/batch_queue.go

View check run for this annotation

Codecov / codecov/patch

sequencing/batch_queue.go#L32-L33

Added lines #L32 - L33 were not covered by tests

// Marshal the batch
batchBytes, err := batch.Marshal()
if err != nil {
return err
}

Check warning on line 39 in sequencing/batch_queue.go

View check run for this annotation

Codecov / codecov/patch

sequencing/batch_queue.go#L38-L39

Added lines #L38 - L39 were not covered by tests

// Store the batch in BadgerDB
err = db.Update(func(txn *badger.Txn) error {
key := append(keyPrefixBatch, h...)
return txn.Set(key, batchBytes)
})
return err
}

// AddBatch adds a new transaction to the queue

Check failure on line 49 in sequencing/batch_queue.go

View workflow job for this annotation

GitHub Actions / lint / golangci-lint

exported: comment on exported method BatchQueue.AddBatchToTheTop should be of the form "AddBatchToTheTop ..." (revive)

Check failure on line 49 in sequencing/batch_queue.go

View workflow job for this annotation

GitHub Actions / lint / golangci-lint

exported: comment on exported method BatchQueue.AddBatchToTheTop should be of the form "AddBatchToTheTop ..." (revive)
func (bq *BatchQueue) AddBatchToTheTop(batch sequencing.Batch, db *badger.DB) error {
bq.mu.Lock()
bq.queue = append([]sequencing.Batch{batch}, bq.queue...)
bq.mu.Unlock()

// Get the hash and bytes of the batch
h, err := batch.Hash()
if err != nil {
return err
}

Check warning on line 59 in sequencing/batch_queue.go

View check run for this annotation

Codecov / codecov/patch

sequencing/batch_queue.go#L50-L59

Added lines #L50 - L59 were not covered by tests

// Marshal the batch
batchBytes, err := batch.Marshal()
if err != nil {
return err
}

Check warning on line 65 in sequencing/batch_queue.go

View check run for this annotation

Codecov / codecov/patch

sequencing/batch_queue.go#L62-L65

Added lines #L62 - L65 were not covered by tests

// Store the batch in BadgerDB
err = db.Update(func(txn *badger.Txn) error {
key := append(keyPrefixBatch, h...)
return txn.Set(key, batchBytes)
})
return err

Check warning on line 72 in sequencing/batch_queue.go

View check run for this annotation

Codecov / codecov/patch

sequencing/batch_queue.go#L68-L72

Added lines #L68 - L72 were not covered by tests
}

// Next extracts a batch of transactions from the queue
func (bq *BatchQueue) Next(db *badger.DB) (*sequencing.Batch, error) {
bq.mu.Lock()
defer bq.mu.Unlock()
if len(bq.queue) == 0 {
return &sequencing.Batch{Transactions: nil}, nil
}
batch := bq.queue[0]
bq.queue = bq.queue[1:]

h, err := batch.Hash()
if err != nil {
return &sequencing.Batch{Transactions: nil}, err
}

Check warning on line 88 in sequencing/batch_queue.go

View check run for this annotation

Codecov / codecov/patch

sequencing/batch_queue.go#L87-L88

Added lines #L87 - L88 were not covered by tests

// Remove the batch from BadgerDB after processing
err = db.Update(func(txn *badger.Txn) error {
// Get the batch to ensure it exists in the DB before deleting
key := append(keyPrefixBatch, h...)
_, err := txn.Get(key)
if err != nil {
return err
}

Check warning on line 97 in sequencing/batch_queue.go

View check run for this annotation

Codecov / codecov/patch

sequencing/batch_queue.go#L96-L97

Added lines #L96 - L97 were not covered by tests
// Delete the batch from BadgerDB
return txn.Delete(h)
})
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Fix incorrect key in batch deletion.

The method uses h instead of key when deleting the batch from BadgerDB, which will cause the deletion to fail.

     err = db.Update(func(txn *badger.Txn) error {
         // Get the batch to ensure it exists in the DB before deleting
         key := append(keyPrefixBatch, h...)
         _, err := txn.Get(key)
         if err != nil {
             return err
         }
         // Delete the batch from BadgerDB
-        return txn.Delete(h)
+        return txn.Delete(key)
     })
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
err = db.Update(func(txn *badger.Txn) error {
// Get the batch to ensure it exists in the DB before deleting
key := append(keyPrefixBatch, h...)
_, err := txn.Get(key)
if err != nil {
return err
}
// Delete the batch from BadgerDB
return txn.Delete(h)
})
err = db.Update(func(txn *badger.Txn) error {
// Get the batch to ensure it exists in the DB before deleting
key := append(keyPrefixBatch, h...)
_, err := txn.Get(key)
if err != nil {
return err
}
// Delete the batch from BadgerDB
return txn.Delete(key)
})

if err != nil {
return &sequencing.Batch{Transactions: nil}, err
}

Check warning on line 103 in sequencing/batch_queue.go

View check run for this annotation

Codecov / codecov/patch

sequencing/batch_queue.go#L102-L103

Added lines #L102 - L103 were not covered by tests

return &batch, nil
}

// LoadFromDB reloads all batches from BadgerDB into the in-memory queue after a crash or restart.
func (bq *BatchQueue) LoadFromDB(db *badger.DB) error {
bq.mu.Lock()
defer bq.mu.Unlock()

err := db.View(func(txn *badger.Txn) error {
// Create an iterator to go through all batches stored in BadgerDB
opts := badger.DefaultIteratorOptions
opts.Prefix = keyPrefixBatch
it := txn.NewIterator(opts)
defer it.Close()

for it.Rewind(); it.Valid(); it.Next() {
item := it.Item()
err := item.Value(func(val []byte) error {
var batch sequencing.Batch
// Unmarshal the batch bytes and add them to the in-memory queue
err := batch.Unmarshal(val)
if err != nil {
return err
}
bq.queue = append(bq.queue, batch)
return nil

Check warning on line 130 in sequencing/batch_queue.go

View check run for this annotation

Codecov / codecov/patch

sequencing/batch_queue.go#L121-L130

Added lines #L121 - L130 were not covered by tests
})
if err != nil {
return err
}

Check warning on line 134 in sequencing/batch_queue.go

View check run for this annotation

Codecov / codecov/patch

sequencing/batch_queue.go#L132-L134

Added lines #L132 - L134 were not covered by tests
}
return nil
})

return err
}
Loading
Loading