diff --git a/go.mod b/go.mod index c2a8f7733a20..4c1b13be81ad 100644 --- a/go.mod +++ b/go.mod @@ -4,6 +4,8 @@ go 1.21 require ( github.com/BurntSushi/toml v1.3.2 + github.com/DataDog/zstd v1.5.5 + github.com/andybalholm/brotli v1.0.5 github.com/btcsuite/btcd v0.24.0 github.com/btcsuite/btcd/chaincfg/chainhash v1.1.0 github.com/cockroachdb/pebble v0.0.0-20231018212520-f6cde3fc2fa4 @@ -17,6 +19,7 @@ require ( github.com/go-chi/chi/v5 v5.0.12 github.com/go-chi/docgen v1.2.0 github.com/golang/snappy v0.0.5-0.20220116011046-fa5810519dcb + github.com/google/brotli/go/cbrotli v0.0.0-20240411182347-a813a6a1e436 github.com/google/go-cmp v0.6.0 github.com/google/gofuzz v1.2.1-0.20220503160820-4a35382e8fc8 github.com/google/uuid v1.6.0 @@ -57,13 +60,11 @@ require ( require ( github.com/CloudyKit/fastprinter v0.0.0-20200109182630-33d98a066a53 // indirect github.com/CloudyKit/jet/v6 v6.2.0 // indirect - github.com/DataDog/zstd v1.5.2 // indirect github.com/Joker/jade v1.1.3 // indirect github.com/Microsoft/go-winio v0.6.1 // indirect github.com/Shopify/goreferrer v0.0.0-20220729165902-8cddb4f5de06 // indirect github.com/VictoriaMetrics/fastcache v1.12.1 // indirect github.com/allegro/bigcache v1.2.1 // indirect - github.com/andybalholm/brotli v1.0.5 // indirect github.com/armon/go-metrics v0.4.1 // indirect github.com/aymerick/douceur v0.2.0 // indirect github.com/benbjohnson/clock v1.3.5 // indirect diff --git a/go.sum b/go.sum index 4681bd373b2b..60d26d5659ff 100644 --- a/go.sum +++ b/go.sum @@ -18,8 +18,8 @@ github.com/CloudyKit/jet/v6 v6.2.0 h1:EpcZ6SR9n28BUGtNJSvlBqf90IpjeFr36Tizxhn/oM github.com/CloudyKit/jet/v6 v6.2.0/go.mod h1:d3ypHeIRNo2+XyqnGA8s+aphtcVpjP5hPwP/Lzo7Ro4= github.com/DataDog/datadog-go v2.2.0+incompatible/go.mod h1:LButxg5PwREeZtORoXG3tL4fMGNddJ+vMq1mwgfaqoQ= github.com/DataDog/datadog-go v3.2.0+incompatible/go.mod h1:LButxg5PwREeZtORoXG3tL4fMGNddJ+vMq1mwgfaqoQ= -github.com/DataDog/zstd v1.5.2 h1:vUG4lAyuPCXO0TLbXvPv7EB7cNK1QV/luu55UHLrrn8= -github.com/DataDog/zstd v1.5.2/go.mod h1:g4AWEaM3yOg3HYfnJ3YIawPnVdXJh9QME85blwSAmyw= +github.com/DataDog/zstd v1.5.5 h1:oWf5W7GtOLgp6bciQYDmhHHjdhYkALu6S/5Ni9ZgSvQ= +github.com/DataDog/zstd v1.5.5/go.mod h1:g4AWEaM3yOg3HYfnJ3YIawPnVdXJh9QME85blwSAmyw= github.com/Joker/hpp v1.0.0 h1:65+iuJYdRXv/XyN62C1uEmmOx3432rNG/rKlX6V7Kkc= github.com/Joker/hpp v1.0.0/go.mod h1:8x5n+M1Hp5hC0g8okX3sR3vFQwynaX/UgSOM9MeBKzY= github.com/Joker/jade v1.1.3 h1:Qbeh12Vq6BxURXT1qZBRHsDxeURB8ztcL6f3EXSGeHk= @@ -292,6 +292,8 @@ github.com/golang/snappy v0.0.5-0.20220116011046-fa5810519dcb/go.mod h1:/XxbfmMg github.com/golangci/lint-1 v0.0.0-20181222135242-d2cdd8c08219/go.mod h1:/X8TswGSh1pIozq4ZwCfxS0WA5JGXguxk94ar/4c87Y= github.com/gomarkdown/markdown v0.0.0-20230716120725-531d2d74bc12 h1:uK3X/2mt4tbSGoHvbLBHUny7CKiuwUip3MArtukol4E= github.com/gomarkdown/markdown v0.0.0-20230716120725-531d2d74bc12/go.mod h1:JDGcbDT52eL4fju3sZ4TeHGsQwhG9nbDV21aMyhwPoA= +github.com/google/brotli/go/cbrotli v0.0.0-20240411182347-a813a6a1e436 h1:AeXhsYd0U2d7y0+lk44vuhKRiMB2FClBRJfZT2P8/Bo= +github.com/google/brotli/go/cbrotli v0.0.0-20240411182347-a813a6a1e436/go.mod h1:nOPhAkwVliJdNTkj3gXpljmWhjc4wCaVqbMJcPKWP4s= github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ= github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M= github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= diff --git a/op-batcher/batcher/channel_builder.go b/op-batcher/batcher/channel_builder.go index e364570d48b0..39cfa49eddc6 100644 --- a/op-batcher/batcher/channel_builder.go +++ b/op-batcher/batcher/channel_builder.go @@ -86,7 +86,7 @@ func NewChannelBuilder(cfg ChannelConfig, rollupCfg rollup.Config, latestL1Origi } var co derive.ChannelOut if cfg.BatchType == derive.SpanBatchType { - co, err = derive.NewSpanChannelOut(rollupCfg.Genesis.L2Time, rollupCfg.L2ChainID, cfg.CompressorConfig.TargetOutputSize) + co, err = derive.NewSpanChannelOut(rollupCfg.Genesis.L2Time, rollupCfg.L2ChainID, cfg.CompressorConfig.TargetOutputSize, cfg.CompressorConfig.CompressionAlgo, cfg.CompressorConfig.BrotliQuality, cfg.CompressorConfig.BrotliWindow) } else { co, err = derive.NewSingularChannelOut(c) } @@ -406,3 +406,13 @@ func (c *ChannelBuilder) PushFrames(frames ...frameData) { c.frames = append(c.frames, f) } } + +// Reset resets the internal state of the channel builder so that it can be +// reused. Note that a new channel id is also generated by Reset. +func (c *ChannelBuilder) Reset() error { + c.blocks = c.blocks[:0] + c.frames = c.frames[:0] + c.timeout = 0 + c.fullErr = nil + return c.co.Reset() +} diff --git a/op-batcher/compressor/config.go b/op-batcher/compressor/config.go index 8befc43812ae..d47db0affa54 100644 --- a/op-batcher/compressor/config.go +++ b/op-batcher/compressor/config.go @@ -16,6 +16,11 @@ type Config struct { // Kind of compressor to use. Must be one of KindKeys. If unset, NewCompressor // will default to RatioKind. Kind string + + CompressionAlgo string + + BrotliQuality int + BrotliWindow int } func (c Config) NewCompressor() (derive.Compressor, error) { diff --git a/op-batcher/compressor/shadow_compressor.go b/op-batcher/compressor/shadow_compressor.go index 7e6172460392..7731761b3727 100644 --- a/op-batcher/compressor/shadow_compressor.go +++ b/op-batcher/compressor/shadow_compressor.go @@ -115,4 +115,4 @@ func (t *ShadowCompressor) Flush() error { func (t *ShadowCompressor) FullErr() error { return t.fullErr -} +} \ No newline at end of file diff --git a/op-node/rollup/derive/span_channel_out.go b/op-node/rollup/derive/span_channel_out.go index e549e862e6bf..a20925b7295b 100644 --- a/op-node/rollup/derive/span_channel_out.go +++ b/op-node/rollup/derive/span_channel_out.go @@ -8,6 +8,9 @@ import ( "io" "math/big" + "github.com/DataDog/zstd" + "github.com/google/brotli/go/cbrotli" + "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/rlp" @@ -26,10 +29,6 @@ type SpanChannelOut struct { // lastCompressedRLPSize tracks the *uncompressed* size of the last RLP buffer that was compressed // it is used to measure the growth of the RLP buffer when adding a new batch to optimize compression lastCompressedRLPSize int - // compressed contains compressed data for making output frames - compressed *bytes.Buffer - // compress is the zlib writer for the channel - compressor *zlib.Writer // target is the target size of the compressed data target uint64 // closed indicates if the channel is closed @@ -38,6 +37,23 @@ type SpanChannelOut struct { full error // spanBatch is the batch being built, which immutably holds genesis timestamp and chain ID, but otherwise can be reset spanBatch *SpanBatch + + // Compressor related + compressorAlgo string + + // compressed contains compressed data for making output frames + zlibCompressed *bytes.Buffer + // compress is the zlib writer for the channel + zlibCompressor *zlib.Writer + + brotliCompressed *bytes.Buffer + brotliCompressor *cbrotli.Writer + + zstdCompressed *bytes.Buffer + zstdCompressor *zstd.Writer + + brotliQuality int + brotliWindow int } func (co *SpanChannelOut) ID() ChannelID { @@ -49,25 +65,68 @@ func (co *SpanChannelOut) setRandomID() error { return err } -func NewSpanChannelOut(genesisTimestamp uint64, chainID *big.Int, targetOutputSize uint64) (*SpanChannelOut, error) { +func NewSpanChannelOut(genesisTimestamp uint64, chainID *big.Int, targetOutputSize uint64, compressorAlgo string, brotliQuality int, brotliWindow int) (*SpanChannelOut, error) { c := &SpanChannelOut{ id: ChannelID{}, frame: 0, spanBatch: NewSpanBatch(genesisTimestamp, chainID), rlp: [2]*bytes.Buffer{{}, {}}, - compressed: &bytes.Buffer{}, + zlibCompressed: &bytes.Buffer{}, + brotliCompressed: &bytes.Buffer{}, + zstdCompressed: &bytes.Buffer{}, target: targetOutputSize, + compressorAlgo: compressorAlgo, + brotliQuality: brotliQuality, + brotliWindow: brotliWindow, } var err error if err = c.setRandomID(); err != nil { return nil, err } - if c.compressor, err = zlib.NewWriterLevel(c.compressed, zlib.BestCompression); err != nil { + + // zlib compressor + if c.zlibCompressor, err = zlib.NewWriterLevel(c.zlibCompressed, zlib.BestCompression); err != nil { return nil, err } + + // brotli compressor + c.brotliCompressor = cbrotli.NewWriter( + c.brotliCompressed, + cbrotli.WriterOptions{ + Quality: brotliQuality, + LGWin: brotliWindow, + }, + ) + + // zstd compressor + c.zstdCompressor = zstd.NewWriterLevel(c.zstdCompressed, 22) + + return c, nil } +func (co *SpanChannelOut) compressorReset() { + if co.compressorAlgo == "zlib" { + co.zlibCompressed.Reset() + co.zlibCompressor.Reset(co.zlibCompressed) + } else if co.compressorAlgo == "brotli" { + co.brotliCompressed.Reset() + co.brotliCompressor = cbrotli.NewWriter( + co.brotliCompressed, + cbrotli.WriterOptions{ + Quality: co.brotliQuality, + LGWin: co.brotliWindow, + }, + ) + } else if co.compressorAlgo == "zstd" { + co.zstdCompressed.Reset() + // no reset, start a new zstd compressor + co.zstdCompressor = zstd.NewWriterLevel(co.zstdCompressed, 22) + } else { + panic("unknown compressor") + } +} + func (co *SpanChannelOut) Reset() error { co.closed = false co.full = nil @@ -75,8 +134,7 @@ func (co *SpanChannelOut) Reset() error { co.rlp[0].Reset() co.rlp[1].Reset() co.lastCompressedRLPSize = 0 - co.compressed.Reset() - co.compressor.Reset(co.compressed) + co.compressorReset() co.spanBatch = NewSpanBatch(co.spanBatch.GenesisTimestamp, co.spanBatch.ChainID) // setting the new randomID is the only part of the reset that can fail return co.setRandomID() @@ -153,7 +211,16 @@ func (co *SpanChannelOut) AddSingularBatch(batch *SingularBatch, seqNum uint64) // if the compressed data *plus* the new rlp data is under the target size, return early // this optimizes out cases where the compressor will obviously come in under the target size rlpGrowth := co.activeRLP().Len() - co.lastCompressedRLPSize - if uint64(co.compressed.Len()+rlpGrowth) < co.target { + var compressedLen int + if co.compressorAlgo == "zlib" { + compressedLen = co.zlibCompressed.Len() + } else if co.compressorAlgo == "brotli" { + compressedLen = co.brotliCompressed.Len() + } else if co.compressorAlgo == "zstd" { + compressedLen = co.zstdCompressed.Len() + } + + if uint64(compressedLen+rlpGrowth) < co.target { return nil } @@ -186,13 +253,29 @@ func (co *SpanChannelOut) AddSingularBatch(batch *SingularBatch, seqNum uint64) // compress compresses the active RLP buffer and checks if the compressed data is over the target size. // it resets all the compression buffers because Span Batches aren't meant to be compressed incrementally. func (co *SpanChannelOut) compress() error { - co.compressed.Reset() - co.compressor.Reset(co.compressed) - if _, err := co.compressor.Write(co.activeRLP().Bytes()); err != nil { - return err - } - if err := co.compressor.Close(); err != nil { - return err + co.compressorReset() + // write out this whole implementation for 3 different compressors + if co.compressorAlgo == "zlib" { + if _, err := co.zlibCompressor.Write(co.activeRLP().Bytes()); err != nil { + return err + } + if err := co.zlibCompressor.Close(); err != nil { + return err + } + } else if co.compressorAlgo == "brotli" { + if _, err := co.brotliCompressor.Write(co.activeRLP().Bytes()); err != nil { + return err + } + if err := co.brotliCompressor.Close(); err != nil { + return err + } + } else if co.compressorAlgo == "zstd" { + if _, err := co.zstdCompressor.Write(co.activeRLP().Bytes()); err != nil { + return err + } + if err := co.zstdCompressor.Close(); err != nil { + return err + } } co.checkFull() return nil @@ -207,7 +290,13 @@ func (co *SpanChannelOut) InputBytes() int { // Span Channel Out does not provide early output, so this will always be 0 until the channel is closed or full func (co *SpanChannelOut) ReadyBytes() int { if co.closed || co.FullErr() != nil { - return co.compressed.Len() + if co.compressorAlgo == "zlib" { + return co.zlibCompressed.Len() + } else if co.compressorAlgo == "brotli" { + return co.brotliCompressed.Len() + } else if co.compressorAlgo == "zstd" { + return co.zstdCompressed.Len() + } } return 0 } @@ -225,8 +314,19 @@ func (co *SpanChannelOut) checkFull() { if co.full != nil { return } - if uint64(co.compressed.Len()) >= co.target { - co.full = ErrCompressorFull + + if co.compressorAlgo == "zlib" { + if uint64(co.zlibCompressed.Len()) >= co.target { + co.full = ErrCompressorFull + } + } else if co.compressorAlgo == "brotli" { + if uint64(co.brotliCompressed.Len()) >= co.target { + co.full = ErrCompressorFull + } + } else if co.compressorAlgo == "zstd" { + if uint64(co.zstdCompressed.Len()) >= co.target { + co.full = ErrCompressorFull + } } } @@ -264,8 +364,18 @@ func (co *SpanChannelOut) OutputFrame(w *bytes.Buffer, maxSize uint64) (uint16, f := createEmptyFrame(co.id, co.frame, co.ReadyBytes(), co.closed, maxSize) - if _, err := io.ReadFull(co.compressed, f.Data); err != nil { - return 0, err + if co.compressorAlgo == "zlib" { + if _, err := io.ReadFull(co.zlibCompressed, f.Data); err != nil { + return 0, err + } + } else if co.compressorAlgo == "brotli" { + if _, err := io.ReadFull(co.brotliCompressed, f.Data); err != nil { + return 0, err + } + } else if co.compressorAlgo == "zstd" { + if _, err := io.ReadFull(co.zstdCompressed, f.Data); err != nil { + return 0, err + } } if err := f.MarshalBinary(w); err != nil {