Skip to content

Commit

Permalink
Improved DX on adding path filter to file extractors (#1235)
Browse files (browse the repository at this point in the history)
* Improved DX on adding path filter to file extractors

* Removed PartitionExtractor and replaced it with the FileExtractor interface
  • Loading branch information
norberttech authored Sep 26, 2024
1 parent f065daf commit 926b66e
Show file tree
Hide file tree
Showing 12 changed files with 43 additions and 42 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,11 @@

namespace Flow\ETL\Adapter\Avro\FlixTech;

use Flow\ETL\Extractor\{FileExtractor, Limitable, LimitableExtractor, PartitionExtractor};
use Flow\ETL\Extractor\{FileExtractor, Limitable, LimitableExtractor};
use Flow\ETL\{Exception\RuntimeException, Extractor, FlowContext};
use Flow\Filesystem\Path;

final class AvroExtractor implements Extractor, FileExtractor, LimitableExtractor, PartitionExtractor
final class AvroExtractor implements Extractor, FileExtractor, LimitableExtractor
{
use Extractor\PathFiltering;
use Limitable;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,12 @@
namespace Flow\ETL\Adapter\CSV;

use function Flow\ETL\DSL\array_to_rows;
use Flow\ETL\Extractor\{FileExtractor, Limitable, LimitableExtractor, PartitionExtractor, PathFiltering, Signal};
use Flow\ETL\Extractor\{FileExtractor, Limitable, LimitableExtractor, PathFiltering, Signal};
use Flow\ETL\Row\Schema;
use Flow\ETL\{Exception\InvalidArgumentException, Extractor, FlowContext};
use Flow\Filesystem\Path;

final class CSVExtractor implements Extractor, FileExtractor, LimitableExtractor, PartitionExtractor
final class CSVExtractor implements Extractor, FileExtractor, LimitableExtractor
{
use Limitable;
use PathFiltering;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,14 @@
namespace Flow\ETL\Adapter\JSON\JSONMachine;

use function Flow\ETL\DSL\{array_to_rows};
use Flow\ETL\Extractor\{FileExtractor, Limitable, LimitableExtractor, PartitionExtractor, PathFiltering, Signal};
use Flow\ETL\Extractor\{FileExtractor, Limitable, LimitableExtractor, PathFiltering, Signal};
use Flow\ETL\Row\Schema;
use Flow\ETL\{Extractor, FlowContext};
use Flow\Filesystem\Path;
use JsonMachine\Items;
use JsonMachine\JsonDecoder\ExtJsonDecoder;

final class JsonExtractor implements Extractor, FileExtractor, LimitableExtractor, PartitionExtractor
final class JsonExtractor implements Extractor, FileExtractor, LimitableExtractor
{
use Limitable;
use PathFiltering;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,12 @@
namespace Flow\ETL\Adapter\Parquet;

use function Flow\ETL\DSL\array_to_rows;
use Flow\ETL\Extractor\{FileExtractor, Limitable, LimitableExtractor, PartitionExtractor, PathFiltering, Signal};
use Flow\ETL\Extractor\{FileExtractor, Limitable, LimitableExtractor, PathFiltering, Signal};
use Flow\ETL\{Exception\InvalidArgumentException, Extractor, FlowContext};
use Flow\Filesystem\{Path, SourceStream};
use Flow\Parquet\{ByteOrder, Options, ParquetFile, Reader};

final class ParquetExtractor implements Extractor, FileExtractor, LimitableExtractor, PartitionExtractor
final class ParquetExtractor implements Extractor, FileExtractor, LimitableExtractor
{
use Limitable;
use PathFiltering;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,11 @@
namespace Flow\ETL\Adapter\Text;

use function Flow\ETL\DSL\array_to_rows;
use Flow\ETL\Extractor\{FileExtractor, Limitable, LimitableExtractor, PartitionExtractor, PathFiltering, Signal};
use Flow\ETL\Extractor\{FileExtractor, Limitable, LimitableExtractor, PathFiltering, Signal};
use Flow\ETL\{Extractor, FlowContext};
use Flow\Filesystem\Path;

final class TextExtractor implements Extractor, FileExtractor, LimitableExtractor, PartitionExtractor
final class TextExtractor implements Extractor, FileExtractor, LimitableExtractor
{
use Limitable;
use PathFiltering;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,11 @@

use function Flow\ETL\DSL\array_to_rows;
use Flow\ETL\Exception\RuntimeException;
use Flow\ETL\Extractor\{FileExtractor, Limitable, LimitableExtractor, PartitionExtractor, PathFiltering, Signal};
use Flow\ETL\Extractor\{FileExtractor, Limitable, LimitableExtractor, PathFiltering, Signal};
use Flow\ETL\{Extractor, FlowContext};
use Flow\Filesystem\Path;

final class XMLParserExtractor implements Extractor, FileExtractor, LimitableExtractor, PartitionExtractor
final class XMLParserExtractor implements Extractor, FileExtractor, LimitableExtractor
{
use Limitable;
use PathFiltering;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,11 @@
namespace Flow\ETL\Adapter\XML;

use function Flow\ETL\DSL\array_to_rows;
use Flow\ETL\Extractor\{FileExtractor, Limitable, LimitableExtractor, PartitionExtractor, PathFiltering, Signal};
use Flow\ETL\Extractor\{FileExtractor, Limitable, LimitableExtractor, PathFiltering, Signal};
use Flow\ETL\{Exception\InvalidArgumentException, Extractor, FlowContext};
use Flow\Filesystem\Path;

final class XMLReaderExtractor implements Extractor, FileExtractor, LimitableExtractor, PartitionExtractor
final class XMLReaderExtractor implements Extractor, FileExtractor, LimitableExtractor
{
use Limitable;
use PathFiltering;
Expand Down
10 changes: 5 additions & 5 deletions src/core/etl/src/Flow/ETL/DataFrame.php
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
use Flow\ETL\DataFrame\GroupedDataFrame;
use Flow\ETL\Dataset\{Report, Statistics};
use Flow\ETL\Exception\{InvalidArgumentException, InvalidFileFormatException, RuntimeException};
use Flow\ETL\Extractor\{PartitionExtractor};
use Flow\ETL\Extractor\FileExtractor;
use Flow\ETL\Filesystem\{SaveMode, ScalarFunctionFilter};
use Flow\ETL\Formatter\AsciiTableFormatter;
use Flow\ETL\Function\{AggregatingFunction, ScalarFunction, WindowFunction};
Expand Down Expand Up @@ -368,17 +368,17 @@ public function filterPartitions(Filter|ScalarFunction $filter) : self
{
$extractor = $this->pipeline->source();

if (!$extractor instanceof PartitionExtractor) {
throw new RuntimeException('filterPartitions can be used only with extractors that implement PartitionsExtractor interface');
if (!$extractor instanceof FileExtractor) {
throw new RuntimeException('filterPartitions can be used only with extractors that implement FileExtractor interface');
}

if ($filter instanceof Filter) {
$extractor->addFilter($filter);
$extractor->withPathFilter($filter);

return $this;
}

$extractor->addFilter(
$extractor->withPathFilter(
new ScalarFunctionFilter(
$filter,
$this->context->entryFactory(),
Expand Down
7 changes: 6 additions & 1 deletion src/core/etl/src/Flow/ETL/Extractor/FileExtractor.php
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,14 @@

interface FileExtractor
{
public function addFilter(Filter $filter) : void;
/**
* @deprecated Use withPathFilter instead
*/
public function addFilter(Filter $filter) : self;

public function filter() : Filter;

public function source() : Path;

public function withPathFilter(Filter $filter) : self;
}
14 changes: 0 additions & 14 deletions src/core/etl/src/Flow/ETL/Extractor/PartitionExtractor.php

This file was deleted.

24 changes: 17 additions & 7 deletions src/core/etl/src/Flow/ETL/Extractor/PathFiltering.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,25 +11,35 @@ trait PathFiltering
{
private ?Filter $filter = null;

public function addFilter(Filter $filter) : void
/**
* @deprecated Use withPathFilter instead
*/
public function addFilter(Filter $filter) : self
{
return $this->withPathFilter($filter);
}

public function filter() : Filter
{
return $this->filter ?? new OnlyFiles();
}

public function withPathFilter(Filter $filter) : self
{
if ($this->filter === null) {
$this->filter = $filter;

return;
return $this;
}

if ($this->filter instanceof Filters) {
$this->filter = $this->filter->add($filter);

return;
return $this;
}

$this->filter = new Filters($this->filter, $filter);
}

public function filter() : Filter
{
return $this->filter ?? new OnlyFiles();
return $this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
use Flow\ETL\{Extractor, FlowContext};
use Flow\Filesystem\{Partition, Path};

final class PathPartitionsExtractor implements Extractor, FileExtractor, LimitableExtractor, PartitionExtractor
final class PathPartitionsExtractor implements Extractor, FileExtractor, LimitableExtractor
{
use Limitable;
use PathFiltering;
Expand Down

0 comments on commit 926b66e

Please sign in to comment.