Skip to content

Commit

Permalink
Added DataFrame::dropPartitions() transformation (#922)
Browse files Browse the repository at this point in the history
  • Loading branch information
norberttech authored Jan 18, 2024
1 parent 349f4d2 commit 75d183d
Show file tree
Hide file tree
Showing 5 changed files with 115 additions and 0 deletions.
8 changes: 8 additions & 0 deletions src/core/etl/src/Flow/ETL/DataFrame.php
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
use Flow\ETL\Transformer\CallbackRowTransformer;
use Flow\ETL\Transformer\CrossJoinRowsTransformer;
use Flow\ETL\Transformer\DropDuplicatesTransformer;
use Flow\ETL\Transformer\DropPartitionsTransformer;
use Flow\ETL\Transformer\EntryNameStyleConverterTransformer;
use Flow\ETL\Transformer\JoinEachRowsTransformer;
use Flow\ETL\Transformer\JoinRowsTransformer;
Expand Down Expand Up @@ -300,6 +301,13 @@ public function dropDuplicates(string|Reference ...$entries) : self
return $this;
}

public function dropPartitions() : self
{
$this->pipeline->add(new DropPartitionsTransformer());

return $this;
}

/**
* Be aware that fetch is not memory safe and will load all rows into memory.
* If you want to safely iterate over Rows use oe of the following methods:.
Expand Down
5 changes: 5 additions & 0 deletions src/core/etl/src/Flow/ETL/Rows.php
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,11 @@ public function drop(int $size) : self
return self::partitioned(\array_slice($this->rows, $size), $this->partitions);
}

public function dropPartitions() : self
{
return new self(...$this->rows);
}

public function dropRight(int $size) : self
{
if ($size === 0) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
<?php

declare(strict_types=1);

namespace Flow\ETL\Transformer;

use Flow\ETL\FlowContext;
use Flow\ETL\Rows;
use Flow\ETL\Transformer;

final class DropPartitionsTransformer implements Transformer
{
public function transform(Rows $rows, FlowContext $context) : Rows
{
if ($rows->isPartitioned()) {
return $rows->dropPartitions();
}

return $rows;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,28 @@

final class PartitioningTest extends IntegrationTestCase
{
public function test_dropping_partitions() : void
{
$rows = df()
->read(from_rows(
rows_partitioned(
[
row(int_entry('id', 1), str_entry('country', 'PL'), int_entry('age', 20)),
row(int_entry('id', 2), str_entry('country', 'PL'), int_entry('age', 20)),
row(int_entry('id', 3), str_entry('country', 'PL'), int_entry('age', 25)),
row(int_entry('id', 4), str_entry('country', 'PL'), int_entry('age', 30)),
],
[
partition('country', 'PL'),
]
)
))
->dropPartitions()
->fetch();

$this->assertFalse($rows->isPartitioned());
}

public function test_partition_by() : void
{
$rows = df()
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
<?php

declare(strict_types=1);

namespace Flow\ETL\Tests\Unit\Transformer;

use function Flow\ETL\DSL\array_to_rows;
use function Flow\ETL\DSL\flow_context;
use function Flow\ETL\DSL\ref;
use Flow\ETL\Transformer\DropPartitionsTransformer;
use PHPUnit\Framework\TestCase;

final class DropPartitionsTransformerTest extends TestCase
{
public function test_dropping_partitions() : void
{
$partitioned = array_to_rows([
['id' => 1, 'name' => 'one', 'category' => 'a'],
['id' => 2, 'name' => 'two', 'category' => 'a'],
['id' => 3, 'name' => 'three', 'category' => 'a'],
['id' => 4, 'name' => 'four', 'category' => 'a'],
['id' => 5, 'name' => 'five', 'category' => 'a'],
['id' => 6, 'name' => 'six', 'category' => 'b'],
['id' => 7, 'name' => 'seven', 'category' => 'b'],
['id' => 8, 'name' => 'eight', 'category' => 'b'],
['id' => 9, 'name' => 'nine', 'category' => 'b'],
['id' => 10, 'name' => 'ten', 'category' => 'b'],
])->partitionBy(ref('category'));

foreach ($partitioned as $rows) {
$this->assertTrue($rows->isPartitioned());

$notPartitioned = (new DropPartitionsTransformer())->transform($rows, flow_context());

$this->assertFalse($notPartitioned->isPartitioned());
}
}

public function test_transforming_not_partitioned_rows() : void
{
$rows = array_to_rows([
['id' => 1, 'name' => 'one', 'category' => 'a'],
['id' => 2, 'name' => 'two', 'category' => 'a'],
['id' => 3, 'name' => 'three', 'category' => 'a'],
['id' => 4, 'name' => 'four', 'category' => 'a'],
['id' => 5, 'name' => 'five', 'category' => 'a'],
['id' => 6, 'name' => 'six', 'category' => 'b'],
['id' => 7, 'name' => 'seven', 'category' => 'b'],
['id' => 8, 'name' => 'eight', 'category' => 'b'],
['id' => 9, 'name' => 'nine', 'category' => 'b'],
['id' => 10, 'name' => 'ten', 'category' => 'b'],
]);

$this->assertSame(
$rows,
(new DropPartitionsTransformer())->transform($rows, flow_context())
);
}
}

0 comments on commit 75d183d

Please sign in to comment.