Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cody/cbrotli #3

Open
wants to merge 14 commits into
base: develop
Choose a base branch
from
5 changes: 3 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ go 1.21

require (
github.com/BurntSushi/toml v1.3.2
github.com/DataDog/zstd v1.5.5
github.com/andybalholm/brotli v1.0.5
github.com/btcsuite/btcd v0.24.0
github.com/btcsuite/btcd/chaincfg/chainhash v1.1.0
github.com/cockroachdb/pebble v0.0.0-20231018212520-f6cde3fc2fa4
Expand All @@ -17,6 +19,7 @@ require (
github.com/go-chi/chi/v5 v5.0.12
github.com/go-chi/docgen v1.2.0
github.com/golang/snappy v0.0.5-0.20220116011046-fa5810519dcb
github.com/google/brotli/go/cbrotli v0.0.0-20240411182347-a813a6a1e436
github.com/google/go-cmp v0.6.0
github.com/google/gofuzz v1.2.1-0.20220503160820-4a35382e8fc8
github.com/google/uuid v1.6.0
Expand Down Expand Up @@ -57,13 +60,11 @@ require (
require (
github.com/CloudyKit/fastprinter v0.0.0-20200109182630-33d98a066a53 // indirect
github.com/CloudyKit/jet/v6 v6.2.0 // indirect
github.com/DataDog/zstd v1.5.2 // indirect
github.com/Joker/jade v1.1.3 // indirect
github.com/Microsoft/go-winio v0.6.1 // indirect
github.com/Shopify/goreferrer v0.0.0-20220729165902-8cddb4f5de06 // indirect
github.com/VictoriaMetrics/fastcache v1.12.1 // indirect
github.com/allegro/bigcache v1.2.1 // indirect
github.com/andybalholm/brotli v1.0.5 // indirect
github.com/armon/go-metrics v0.4.1 // indirect
github.com/aymerick/douceur v0.2.0 // indirect
github.com/benbjohnson/clock v1.3.5 // indirect
Expand Down
6 changes: 4 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ github.com/CloudyKit/jet/v6 v6.2.0 h1:EpcZ6SR9n28BUGtNJSvlBqf90IpjeFr36Tizxhn/oM
github.com/CloudyKit/jet/v6 v6.2.0/go.mod h1:d3ypHeIRNo2+XyqnGA8s+aphtcVpjP5hPwP/Lzo7Ro4=
github.com/DataDog/datadog-go v2.2.0+incompatible/go.mod h1:LButxg5PwREeZtORoXG3tL4fMGNddJ+vMq1mwgfaqoQ=
github.com/DataDog/datadog-go v3.2.0+incompatible/go.mod h1:LButxg5PwREeZtORoXG3tL4fMGNddJ+vMq1mwgfaqoQ=
github.com/DataDog/zstd v1.5.2 h1:vUG4lAyuPCXO0TLbXvPv7EB7cNK1QV/luu55UHLrrn8=
github.com/DataDog/zstd v1.5.2/go.mod h1:g4AWEaM3yOg3HYfnJ3YIawPnVdXJh9QME85blwSAmyw=
github.com/DataDog/zstd v1.5.5 h1:oWf5W7GtOLgp6bciQYDmhHHjdhYkALu6S/5Ni9ZgSvQ=
github.com/DataDog/zstd v1.5.5/go.mod h1:g4AWEaM3yOg3HYfnJ3YIawPnVdXJh9QME85blwSAmyw=
github.com/Joker/hpp v1.0.0 h1:65+iuJYdRXv/XyN62C1uEmmOx3432rNG/rKlX6V7Kkc=
github.com/Joker/hpp v1.0.0/go.mod h1:8x5n+M1Hp5hC0g8okX3sR3vFQwynaX/UgSOM9MeBKzY=
github.com/Joker/jade v1.1.3 h1:Qbeh12Vq6BxURXT1qZBRHsDxeURB8ztcL6f3EXSGeHk=
Expand Down Expand Up @@ -292,6 +292,8 @@ github.com/golang/snappy v0.0.5-0.20220116011046-fa5810519dcb/go.mod h1:/XxbfmMg
github.com/golangci/lint-1 v0.0.0-20181222135242-d2cdd8c08219/go.mod h1:/X8TswGSh1pIozq4ZwCfxS0WA5JGXguxk94ar/4c87Y=
github.com/gomarkdown/markdown v0.0.0-20230716120725-531d2d74bc12 h1:uK3X/2mt4tbSGoHvbLBHUny7CKiuwUip3MArtukol4E=
github.com/gomarkdown/markdown v0.0.0-20230716120725-531d2d74bc12/go.mod h1:JDGcbDT52eL4fju3sZ4TeHGsQwhG9nbDV21aMyhwPoA=
github.com/google/brotli/go/cbrotli v0.0.0-20240411182347-a813a6a1e436 h1:AeXhsYd0U2d7y0+lk44vuhKRiMB2FClBRJfZT2P8/Bo=
github.com/google/brotli/go/cbrotli v0.0.0-20240411182347-a813a6a1e436/go.mod h1:nOPhAkwVliJdNTkj3gXpljmWhjc4wCaVqbMJcPKWP4s=
github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ=
github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M=
github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
Expand Down
12 changes: 11 additions & 1 deletion op-batcher/batcher/channel_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ func NewChannelBuilder(cfg ChannelConfig, rollupCfg rollup.Config, latestL1Origi
}
var co derive.ChannelOut
if cfg.BatchType == derive.SpanBatchType {
co, err = derive.NewSpanChannelOut(rollupCfg.Genesis.L2Time, rollupCfg.L2ChainID, cfg.CompressorConfig.TargetOutputSize)
co, err = derive.NewSpanChannelOut(rollupCfg.Genesis.L2Time, rollupCfg.L2ChainID, cfg.CompressorConfig.TargetOutputSize, cfg.CompressorConfig.CompressionAlgo, cfg.CompressorConfig.BrotliQuality, cfg.CompressorConfig.BrotliWindow)
} else {
co, err = derive.NewSingularChannelOut(c)
}
Expand Down Expand Up @@ -406,3 +406,13 @@ func (c *ChannelBuilder) PushFrames(frames ...frameData) {
c.frames = append(c.frames, f)
}
}

// Reset resets the internal state of the channel builder so that it can be
// reused. Note that a new channel id is also generated by Reset.
func (c *ChannelBuilder) Reset() error {
c.blocks = c.blocks[:0]
c.frames = c.frames[:0]
c.timeout = 0
c.fullErr = nil
return c.co.Reset()
}
5 changes: 5 additions & 0 deletions op-batcher/compressor/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,11 @@ type Config struct {
// Kind of compressor to use. Must be one of KindKeys. If unset, NewCompressor
// will default to RatioKind.
Kind string

CompressionAlgo string

BrotliQuality int
BrotliWindow int
}

func (c Config) NewCompressor() (derive.Compressor, error) {
Expand Down
2 changes: 1 addition & 1 deletion op-batcher/compressor/shadow_compressor.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,4 +115,4 @@ func (t *ShadowCompressor) Flush() error {

func (t *ShadowCompressor) FullErr() error {
return t.fullErr
}
}
154 changes: 132 additions & 22 deletions op-node/rollup/derive/span_channel_out.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@ import (
"io"
"math/big"

"github.com/DataDog/zstd"
"github.com/google/brotli/go/cbrotli"

"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/rlp"

Expand All @@ -26,10 +29,6 @@ type SpanChannelOut struct {
// lastCompressedRLPSize tracks the *uncompressed* size of the last RLP buffer that was compressed
// it is used to measure the growth of the RLP buffer when adding a new batch to optimize compression
lastCompressedRLPSize int
// compressed contains compressed data for making output frames
compressed *bytes.Buffer
// compress is the zlib writer for the channel
compressor *zlib.Writer
// target is the target size of the compressed data
target uint64
// closed indicates if the channel is closed
Expand All @@ -38,6 +37,23 @@ type SpanChannelOut struct {
full error
// spanBatch is the batch being built, which immutably holds genesis timestamp and chain ID, but otherwise can be reset
spanBatch *SpanBatch

// Compressor related
compressorAlgo string

// compressed contains compressed data for making output frames
zlibCompressed *bytes.Buffer
// compress is the zlib writer for the channel
zlibCompressor *zlib.Writer

brotliCompressed *bytes.Buffer
brotliCompressor *cbrotli.Writer

zstdCompressed *bytes.Buffer
zstdCompressor *zstd.Writer

brotliQuality int
brotliWindow int
}

func (co *SpanChannelOut) ID() ChannelID {
Expand All @@ -49,34 +65,76 @@ func (co *SpanChannelOut) setRandomID() error {
return err
}

func NewSpanChannelOut(genesisTimestamp uint64, chainID *big.Int, targetOutputSize uint64) (*SpanChannelOut, error) {
func NewSpanChannelOut(genesisTimestamp uint64, chainID *big.Int, targetOutputSize uint64, compressorAlgo string, brotliQuality int, brotliWindow int) (*SpanChannelOut, error) {
c := &SpanChannelOut{
id: ChannelID{},
frame: 0,
spanBatch: NewSpanBatch(genesisTimestamp, chainID),
rlp: [2]*bytes.Buffer{{}, {}},
compressed: &bytes.Buffer{},
zlibCompressed: &bytes.Buffer{},
brotliCompressed: &bytes.Buffer{},
zstdCompressed: &bytes.Buffer{},
target: targetOutputSize,
compressorAlgo: compressorAlgo,
brotliQuality: brotliQuality,
brotliWindow: brotliWindow,
}
var err error
if err = c.setRandomID(); err != nil {
return nil, err
}
if c.compressor, err = zlib.NewWriterLevel(c.compressed, zlib.BestCompression); err != nil {

// zlib compressor
if c.zlibCompressor, err = zlib.NewWriterLevel(c.zlibCompressed, zlib.BestCompression); err != nil {
return nil, err
}

// brotli compressor
c.brotliCompressor = cbrotli.NewWriter(
c.brotliCompressed,
cbrotli.WriterOptions{
Quality: brotliQuality,
LGWin: brotliWindow,
},
)

// zstd compressor
c.zstdCompressor = zstd.NewWriterLevel(c.zstdCompressed, 22)


return c, nil
}

func (co *SpanChannelOut) compressorReset() {
if co.compressorAlgo == "zlib" {
co.zlibCompressed.Reset()
co.zlibCompressor.Reset(co.zlibCompressed)
} else if co.compressorAlgo == "brotli" {
co.brotliCompressed.Reset()
co.brotliCompressor = cbrotli.NewWriter(
co.brotliCompressed,
cbrotli.WriterOptions{
Quality: co.brotliQuality,
LGWin: co.brotliWindow,
},
)
} else if co.compressorAlgo == "zstd" {
co.zstdCompressed.Reset()
// no reset, start a new zstd compressor
co.zstdCompressor = zstd.NewWriterLevel(co.zstdCompressed, 22)
} else {
panic("unknown compressor")
}
}

func (co *SpanChannelOut) Reset() error {
co.closed = false
co.full = nil
co.frame = 0
co.rlp[0].Reset()
co.rlp[1].Reset()
co.lastCompressedRLPSize = 0
co.compressed.Reset()
co.compressor.Reset(co.compressed)
co.compressorReset()
co.spanBatch = NewSpanBatch(co.spanBatch.GenesisTimestamp, co.spanBatch.ChainID)
// setting the new randomID is the only part of the reset that can fail
return co.setRandomID()
Expand Down Expand Up @@ -153,7 +211,16 @@ func (co *SpanChannelOut) AddSingularBatch(batch *SingularBatch, seqNum uint64)
// if the compressed data *plus* the new rlp data is under the target size, return early
// this optimizes out cases where the compressor will obviously come in under the target size
rlpGrowth := co.activeRLP().Len() - co.lastCompressedRLPSize
if uint64(co.compressed.Len()+rlpGrowth) < co.target {
var compressedLen int
if co.compressorAlgo == "zlib" {
compressedLen = co.zlibCompressed.Len()
} else if co.compressorAlgo == "brotli" {
compressedLen = co.brotliCompressed.Len()
} else if co.compressorAlgo == "zstd" {
compressedLen = co.zstdCompressed.Len()
}

if uint64(compressedLen+rlpGrowth) < co.target {
return nil
}

Expand Down Expand Up @@ -186,13 +253,29 @@ func (co *SpanChannelOut) AddSingularBatch(batch *SingularBatch, seqNum uint64)
// compress compresses the active RLP buffer and checks if the compressed data is over the target size.
// it resets all the compression buffers because Span Batches aren't meant to be compressed incrementally.
func (co *SpanChannelOut) compress() error {
co.compressed.Reset()
co.compressor.Reset(co.compressed)
if _, err := co.compressor.Write(co.activeRLP().Bytes()); err != nil {
return err
}
if err := co.compressor.Close(); err != nil {
return err
co.compressorReset()
// write out this whole implementation for 3 different compressors
if co.compressorAlgo == "zlib" {
if _, err := co.zlibCompressor.Write(co.activeRLP().Bytes()); err != nil {
return err
}
if err := co.zlibCompressor.Close(); err != nil {
return err
}
} else if co.compressorAlgo == "brotli" {
if _, err := co.brotliCompressor.Write(co.activeRLP().Bytes()); err != nil {
return err
}
if err := co.brotliCompressor.Close(); err != nil {
return err
}
} else if co.compressorAlgo == "zstd" {
if _, err := co.zstdCompressor.Write(co.activeRLP().Bytes()); err != nil {
return err
}
if err := co.zstdCompressor.Close(); err != nil {
return err
}
}
co.checkFull()
return nil
Expand All @@ -207,7 +290,13 @@ func (co *SpanChannelOut) InputBytes() int {
// Span Channel Out does not provide early output, so this will always be 0 until the channel is closed or full
func (co *SpanChannelOut) ReadyBytes() int {
if co.closed || co.FullErr() != nil {
return co.compressed.Len()
if co.compressorAlgo == "zlib" {
return co.zlibCompressed.Len()
} else if co.compressorAlgo == "brotli" {
return co.brotliCompressed.Len()
} else if co.compressorAlgo == "zstd" {
return co.zstdCompressed.Len()
}
}
return 0
}
Expand All @@ -225,8 +314,19 @@ func (co *SpanChannelOut) checkFull() {
if co.full != nil {
return
}
if uint64(co.compressed.Len()) >= co.target {
co.full = ErrCompressorFull

if co.compressorAlgo == "zlib" {
if uint64(co.zlibCompressed.Len()) >= co.target {
co.full = ErrCompressorFull
}
} else if co.compressorAlgo == "brotli" {
if uint64(co.brotliCompressed.Len()) >= co.target {
co.full = ErrCompressorFull
}
} else if co.compressorAlgo == "zstd" {
if uint64(co.zstdCompressed.Len()) >= co.target {
co.full = ErrCompressorFull
}
}
}

Expand Down Expand Up @@ -264,8 +364,18 @@ func (co *SpanChannelOut) OutputFrame(w *bytes.Buffer, maxSize uint64) (uint16,

f := createEmptyFrame(co.id, co.frame, co.ReadyBytes(), co.closed, maxSize)

if _, err := io.ReadFull(co.compressed, f.Data); err != nil {
return 0, err
if co.compressorAlgo == "zlib" {
if _, err := io.ReadFull(co.zlibCompressed, f.Data); err != nil {
return 0, err
}
} else if co.compressorAlgo == "brotli" {
if _, err := io.ReadFull(co.brotliCompressed, f.Data); err != nil {
return 0, err
}
} else if co.compressorAlgo == "zstd" {
if _, err := io.ReadFull(co.zstdCompressed, f.Data); err != nil {
return 0, err
}
}

if err := f.MarshalBinary(w); err != nil {
Expand Down