From 8f5c9010c15bdbdc81cc36e78d6c90146d47aaa1 Mon Sep 17 00:00:00 2001 From: Norbert Orzechowicz <1921950+norberttech@users.noreply.github.com> Date: Mon, 29 Jan 2024 13:13:57 +0100 Subject: [PATCH] Added offset option to parquet extractor (#940) --- .../ETL/Adapter/Parquet/ParquetExtractor.php | 10 +++++-- .../Integration/ParquetExtractorTest.php | 21 +++++++++++++-- .../Tests/Unit/ParquetExtractorTest.php | 26 +++++++++++++++++++ .../ColumnChunkStatisticsTest.php | 2 +- 4 files changed, 54 insertions(+), 5 deletions(-) create mode 100644 src/adapter/etl-adapter-parquet/tests/Flow/ETL/Adapter/Parquet/Tests/Unit/ParquetExtractorTest.php rename src/{adapter/etl-adapter-parquet/tests/Flow/ETL/Adapter/Parquet/Tests/Unit => lib/parquet/tests/Flow/Parquet/Tests/Integration}/ParquetFile/RowGroupBuilder/ColumnChunkStatisticsTest.php (99%) diff --git a/src/adapter/etl-adapter-parquet/src/Flow/ETL/Adapter/Parquet/ParquetExtractor.php b/src/adapter/etl-adapter-parquet/src/Flow/ETL/Adapter/Parquet/ParquetExtractor.php index 45a3f86a7..4fdc2fd58 100644 --- a/src/adapter/etl-adapter-parquet/src/Flow/ETL/Adapter/Parquet/ParquetExtractor.php +++ b/src/adapter/etl-adapter-parquet/src/Flow/ETL/Adapter/Parquet/ParquetExtractor.php @@ -3,6 +3,7 @@ namespace Flow\ETL\Adapter\Parquet; use function Flow\ETL\DSL\array_to_rows; +use Flow\ETL\Exception\InvalidArgumentException; use Flow\ETL\Extractor; use Flow\ETL\Extractor\FileExtractor; use Flow\ETL\Extractor\Limitable; @@ -32,9 +33,14 @@ public function __construct( private readonly Path $path, private readonly Options $options, private readonly ByteOrder $byteOrder = ByteOrder::LITTLE_ENDIAN, - private readonly array $columns = [] + private readonly array $columns = [], + private readonly ?int $offset = null ) { $this->resetLimit(); + + if ($this->path->isPattern() && $this->offset !== null) { + throw new InvalidArgumentException('Offset can be used only with single file path, not with pattern'); + } } public function extract(FlowContext $context) : \Generator @@ -42,7 +48,7 @@ public function extract(FlowContext $context) : \Generator $shouldPutInputIntoRows = $context->config->shouldPutInputIntoRows(); foreach ($this->readers($context) as $fileData) { - foreach ($fileData['file']->values($this->columns, $this->limit()) as $row) { + foreach ($fileData['file']->values($this->columns, $this->limit(), $this->offset) as $row) { if ($shouldPutInputIntoRows) { $row['_input_file_uri'] = $fileData['uri']; } diff --git a/src/adapter/etl-adapter-parquet/tests/Flow/ETL/Adapter/Parquet/Tests/Integration/ParquetExtractorTest.php b/src/adapter/etl-adapter-parquet/tests/Flow/ETL/Adapter/Parquet/Tests/Integration/ParquetExtractorTest.php index 6f8ea591e..34fcf8f3e 100644 --- a/src/adapter/etl-adapter-parquet/tests/Flow/ETL/Adapter/Parquet/Tests/Integration/ParquetExtractorTest.php +++ b/src/adapter/etl-adapter-parquet/tests/Flow/ETL/Adapter/Parquet/Tests/Integration/ParquetExtractorTest.php @@ -11,13 +11,14 @@ use Flow\ETL\Flow; use Flow\ETL\FlowContext; use Flow\Parquet\Options; +use Flow\Parquet\Reader; use PHPUnit\Framework\TestCase; final class ParquetExtractorTest extends TestCase { public function test_limit() : void { - $path = \sys_get_temp_dir() . '/parquet_extractor_signal_stop.csv'; + $path = \sys_get_temp_dir() . '/parquet_extractor_signal_stop.parquet'; if (\file_exists($path)) { \unlink($path); @@ -36,9 +37,25 @@ public function test_limit() : void ); } + public function test_reading_file_from_given_offset() : void + { + $totalRows = (new Reader())->read(__DIR__ . '/../Fixtures/orders_flow.parquet')->metadata()->rowsNumber(); + + $extractor = new ParquetExtractor( + Path::realpath(__DIR__ . '/../Fixtures/orders_flow.parquet'), + Options::default(), + offset: $totalRows - 100 + ); + + $this->assertCount( + 100, + \iterator_to_array($extractor->extract(new FlowContext(Config::default()))) + ); + } + public function test_signal_stop() : void { - $path = \sys_get_temp_dir() . '/parquet_extractor_signal_stop.csv'; + $path = \sys_get_temp_dir() . '/parquet_extractor_signal_stop.parquet'; if (\file_exists($path)) { \unlink($path); diff --git a/src/adapter/etl-adapter-parquet/tests/Flow/ETL/Adapter/Parquet/Tests/Unit/ParquetExtractorTest.php b/src/adapter/etl-adapter-parquet/tests/Flow/ETL/Adapter/Parquet/Tests/Unit/ParquetExtractorTest.php new file mode 100644 index 000000000..f9c6eae57 --- /dev/null +++ b/src/adapter/etl-adapter-parquet/tests/Flow/ETL/Adapter/Parquet/Tests/Unit/ParquetExtractorTest.php @@ -0,0 +1,26 @@ +expectException(InvalidArgumentException::class); + $this->expectExceptionMessage('Offset can be used only with single file path, not with pattern'); + + new ParquetExtractor( + new Path('/tmp/*.parquet'), + Options::default(), + offset: 100 + ); + } +} diff --git a/src/adapter/etl-adapter-parquet/tests/Flow/ETL/Adapter/Parquet/Tests/Unit/ParquetFile/RowGroupBuilder/ColumnChunkStatisticsTest.php b/src/lib/parquet/tests/Flow/Parquet/Tests/Integration/ParquetFile/RowGroupBuilder/ColumnChunkStatisticsTest.php similarity index 99% rename from src/adapter/etl-adapter-parquet/tests/Flow/ETL/Adapter/Parquet/Tests/Unit/ParquetFile/RowGroupBuilder/ColumnChunkStatisticsTest.php rename to src/lib/parquet/tests/Flow/Parquet/Tests/Integration/ParquetFile/RowGroupBuilder/ColumnChunkStatisticsTest.php index fddecd836..08e1f7f2b 100644 --- a/src/adapter/etl-adapter-parquet/tests/Flow/ETL/Adapter/Parquet/Tests/Unit/ParquetFile/RowGroupBuilder/ColumnChunkStatisticsTest.php +++ b/src/lib/parquet/tests/Flow/Parquet/Tests/Integration/ParquetFile/RowGroupBuilder/ColumnChunkStatisticsTest.php @@ -1,6 +1,6 @@