@@ -2,29 +2,19 @@
 
 use crate::{
     errors::PipelineError,
-    stages::{decompress_brotli, BatchStreamProvider},
+    stages::BatchStreamProvider,
     traits::{OriginAdvancer, OriginProvider, SignalReceiver},
     types::{PipelineResult, Signal},
 };
-use alloc::{boxed::Box, sync::Arc, vec::Vec};
+use alloc::{boxed::Box, sync::Arc};
 use alloy_primitives::Bytes;
-use alloy_rlp::Decodable;
 use async_trait::async_trait;
 use core::fmt::Debug;
-use miniz_oxide::inflate::decompress_to_vec_zlib;
 use op_alloy_genesis::{
     RollupConfig, MAX_RLP_BYTES_PER_CHANNEL_BEDROCK, MAX_RLP_BYTES_PER_CHANNEL_FJORD,
 };
-use op_alloy_protocol::{Batch, BlockInfo};
-
-/// ZLIB Deflate Compression Method.
-pub(crate) const ZLIB_DEFLATE_COMPRESSION_METHOD: u8 = 8;
-
-/// ZLIB Reserved Compression Info.
-pub(crate) const ZLIB_RESERVED_COMPRESSION_METHOD: u8 = 15;
-
-/// Brotili Compression Channel Version.
-pub(crate) const CHANNEL_VERSION_BROTLI: u8 = 1;
+use op_alloy_protocol::{Batch, BatchReader, BlockInfo};
+use tracing::{debug, warn};
 
 /// The [ChannelReader] provider trait.
 #[async_trait]
@@ -171,88 +161,6 @@ where
     }
 }
 
-/// Batch Reader provides a function that iteratively consumes batches from the reader.
-/// The L1Inclusion block is also provided at creation time.
-/// Warning: the batch reader can read every batch-type.
-/// The caller of the batch-reader should filter the results.
-#[derive(Debug)]
-pub(crate) struct BatchReader {
-    /// The raw data to decode.
-    data: Option<Vec<u8>>,
-    /// Decompressed data.
-    decompressed: Vec<u8>,
-    /// The current cursor in the `decompressed` data.
-    cursor: usize,
-    /// The maximum RLP bytes per channel.
-    max_rlp_bytes_per_channel: usize,
-}
-
-impl BatchReader {
-    /// Creates a new [BatchReader] from the given data and max decompressed RLP bytes per channel.
-    pub(crate) fn new<T>(data: T, max_rlp_bytes_per_channel: usize) -> Self
-    where
-        T: Into<Vec<u8>>,
-    {
-        Self {
-            data: Some(data.into()),
-            decompressed: Vec::new(),
-            cursor: 0,
-            max_rlp_bytes_per_channel,
-        }
-    }
-
-    /// Pulls out the next batch from the reader.
-    pub(crate) fn next_batch(&mut self, cfg: &RollupConfig) -> Option<Batch> {
-        // If the data is not already decompressed, decompress it.
-        let mut brotli_used = false;
-
-        if let Some(data) = self.data.take() {
-            // Peek at the data to determine the compression type.
-            if data.is_empty() {
-                warn!(target: "batch-reader", "Data is too short to determine compression type, skipping batch");
-                return None;
-            }
-
-            let compression_type = data[0];
-            if (compression_type & 0x0F) == ZLIB_DEFLATE_COMPRESSION_METHOD ||
-                (compression_type & 0x0F) == ZLIB_RESERVED_COMPRESSION_METHOD
-            {
-                self.decompressed = decompress_to_vec_zlib(&data).ok()?;
-
-                // Check the size of the decompressed channel RLP.
-                if self.decompressed.len() > self.max_rlp_bytes_per_channel {
-                    return None;
-                }
-            } else if compression_type == CHANNEL_VERSION_BROTLI {
-                brotli_used = true;
-                self.decompressed =
-                    decompress_brotli(&data[1..], self.max_rlp_bytes_per_channel).ok()?;
-            } else {
-                error!(target: "batch-reader", "Unsupported compression type: {:x}, skipping batch", compression_type);
-                return None;
-            }
-        }
-
-        // Decompress and RLP decode the batch data, before finally decoding the batch itself.
-        let decompressed_reader = &mut self.decompressed.as_slice()[self.cursor..].as_ref();
-        let bytes = Bytes::decode(decompressed_reader).ok()?;
-        let Ok(batch) = Batch::decode(&mut bytes.as_ref(), cfg) else {
-            error!(target: "batch-reader", "Failed to decode batch, skipping batch");
-            return None;
-        };
-
-        // Confirm that brotli decompression was performed *after* the Fjord hardfork.
-        if brotli_used && !cfg.is_fjord_active(batch.timestamp()) {
-            warn!(target: "batch-reader", "Brotli compression used before Fjord hardfork, skipping batch");
-            return None;
-        }
-
-        // Advance the cursor on the reader.
-        self.cursor = self.decompressed.len() - decompressed_reader.len();
-        Some(batch)
-    }
-}
-
 #[cfg(test)]
 mod test {
     use super::*;
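
For orientation, the removed `next_batch` dispatched on the first byte of the channel data: a low nibble of 8 (deflate) or 15 (reserved) marks a zlib stream, whose header byte remains part of the stream handed to the decompressor, while a leading version byte of 1 marks Fjord's brotli encoding and is stripped before decompression. Below is a minimal, dependency-free sketch of that dispatch; the constants come from the removed code, but the `Compression` enum and `detect` helper are illustrative names, not part of either crate:

    /// Compression header constants, as defined in the removed code.
    const ZLIB_DEFLATE_COMPRESSION_METHOD: u8 = 8;
    const ZLIB_RESERVED_COMPRESSION_METHOD: u8 = 15;
    const CHANNEL_VERSION_BROTLI: u8 = 1;

    /// Illustrative classification of a channel payload (hypothetical type).
    enum Compression<'a> {
        /// Zlib stream; the header byte is part of the stream itself.
        Zlib(&'a [u8]),
        /// Brotli payload; the version byte has been stripped.
        Brotli(&'a [u8]),
    }

    /// Mirrors the first-byte dispatch performed by the removed `next_batch`.
    fn detect(data: &[u8]) -> Option<Compression<'_>> {
        let first = *data.first()?; // empty data: skip the batch
        // The low nibble of a zlib header byte encodes the compression method.
        if first & 0x0F == ZLIB_DEFLATE_COMPRESSION_METHOD
            || first & 0x0F == ZLIB_RESERVED_COMPRESSION_METHOD
        {
            Some(Compression::Zlib(data))
        } else if first == CHANNEL_VERSION_BROTLI {
            Some(Compression::Brotli(&data[1..]))
        } else {
            // Unsupported compression type: the reader logs and skips the batch.
            None
        }
    }

In the removed implementation, the decompressed RLP was additionally capped at `max_rlp_bytes_per_channel`, and a brotli channel was rejected after decoding if the batch timestamp predated Fjord activation.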
@@ -334,24 +242,6 @@ mod test
         assert!(reader.next_batch.is_some());
     }
 
-    #[test]
-    fn test_batch_reader() {
-        let raw = new_compressed_batch_data();
-        let decompressed_len = decompress_to_vec_zlib(&raw).unwrap().len();
-        let mut reader = BatchReader::new(raw, MAX_RLP_BYTES_PER_CHANNEL_BEDROCK as usize);
-        reader.next_batch(&RollupConfig::default()).unwrap();
-        assert_eq!(reader.cursor, decompressed_len);
-    }
-
-    #[test]
-    fn test_batch_reader_fjord() {
-        let raw = new_compressed_batch_data();
-        let decompressed_len = decompress_to_vec_zlib(&raw).unwrap().len();
-        let mut reader = BatchReader::new(raw, MAX_RLP_BYTES_PER_CHANNEL_FJORD as usize);
-        reader.next_batch(&RollupConfig { fjord_time: Some(0), ..Default::default() }).unwrap();
-        assert_eq!(reader.cursor, decompressed_len);
-    }
-
     #[tokio::test]
     async fn test_flush_post_holocene() {
         let raw = new_compressed_batch_data();
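
With the local type removed, callers now take `BatchReader` from `op_alloy_protocol`. A sketch of a call site, assuming the upstream type preserves the `new`/`next_batch` shape that the deleted tests exercised (`drain_channel` is a hypothetical helper, not part of this crate):

    use op_alloy_genesis::{RollupConfig, MAX_RLP_BYTES_PER_CHANNEL_BEDROCK};
    use op_alloy_protocol::BatchReader;

    /// Drains every batch from a raw channel payload (illustrative only).
    fn drain_channel(raw: Vec<u8>, cfg: &RollupConfig) {
        // Assumed constructor shape: raw channel bytes plus the per-channel
        // decompressed-RLP cap, mirroring the removed local `BatchReader::new`.
        let mut reader = BatchReader::new(raw, MAX_RLP_BYTES_PER_CHANNEL_BEDROCK as usize);
        while let Some(batch) = reader.next_batch(cfg) {
            // The reader yields every batch type; callers must filter the results.
            let _ = batch;
        }
    }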