Skip to content

Commit

Permalink
Removed GroupedDataFrame::toDF()
Browse files Browse the repository at this point in the history
  • Loading branch information
norberttech committed Apr 25, 2024
1 parent 3b0313d commit 656bcc3
Show file tree
Hide file tree
Showing 7 changed files with 44 additions and 50 deletions.
5 changes: 5 additions & 0 deletions UPGRADE.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,11 @@ Above changes were introduced in all 3 types of joins:
- `DataFrame::joinEach()`
- `DataFrame::crossJoin()`

### 2) GroupBy

From now on, the `DataFrame::groupBy()` method returns a `GroupedDataFrame` object, which is nothing more than a builder for the GroupBy
statement. To get the results, you first need to define the aggregation functions with `aggregate()`, optionally calling `pivot()` on the builder beforehand.

## Upgrading from 0.6.x to 0.7.x

### 1) DataFrame::appendSafe() method was removed
Expand Down
18 changes: 4 additions & 14 deletions src/core/etl/src/Flow/ETL/DataFrame/GroupedDataFrame.php
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,6 @@ public function aggregate(AggregatingFunction ...$aggregations) : DataFrame
{
$this->groupBy->aggregate(...$aggregations);

return $this->toDataFrame();
}

/**
 * Forwards the given reference to the pivot() call of the wrapped group-by object.
 *
 * @param Reference $ref reference passed straight through to $this->groupBy->pivot()
 *
 * @return self this same instance, so further calls can be chained
 */
public function pivot(Reference $ref) : self
{
$this->groupBy->pivot($ref);

return $this;
}

public function toDataFrame() : DataFrame
{
return $this->df->rebuild(function (Pipeline $pipeline, FlowContext $context) : DataFrame {
return new DataFrame(
new Pipeline\LinkedPipeline(new Pipeline\GroupByPipeline($this->groupBy, $pipeline)),
Expand All @@ -38,8 +26,10 @@ public function toDataFrame() : DataFrame
});
}

public function toDF() : DataFrame
public function pivot(Reference $ref) : self
{
return $this->toDataFrame();
$this->groupBy->pivot($ref);

return $this;
}
}
7 changes: 0 additions & 7 deletions src/core/etl/src/Flow/ETL/Pipeline/HashJoinPipeline.php
Original file line number Diff line number Diff line change
Expand Up @@ -114,13 +114,6 @@ public function process(FlowContext $context) : \Generator
}
}

/**
 * Replaces the extractor held by this pipeline.
 *
 * @param Extractor $extractor extractor stored in $this->extractor
 *
 * @return Pipeline this same instance, so further calls can be chained
 */
public function setSource(Extractor $extractor) : Pipeline
{
$this->extractor = $extractor;

return $this;
}

public function source() : Extractor
{
return $this->extractor;
Expand Down
7 changes: 0 additions & 7 deletions src/core/etl/src/Flow/ETL/Pipeline/SynchronousPipeline.php
Original file line number Diff line number Diff line change
Expand Up @@ -82,13 +82,6 @@ public function process(FlowContext $context) : \Generator
}
}

/**
 * Replaces the extractor held by this pipeline.
 *
 * @param Extractor $extractor extractor stored in $this->extractor
 *
 * @return self this same instance, so further calls can be chained
 */
public function setSource(Extractor $extractor) : self
{
$this->extractor = $extractor;

return $this;
}

public function source() : Extractor
{
return $this->extractor;
Expand Down
5 changes: 0 additions & 5 deletions src/core/etl/src/Flow/ETL/Pipeline/VoidPipeline.php
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,6 @@ public function process(FlowContext $context) : \Generator
yield new Rows();
}

/**
 * Accepts an extractor but neither stores nor uses it; the pipeline is returned unchanged.
 *
 * @param Extractor $extractor ignored by this implementation
 *
 * @return self this same instance
 */
public function setSource(Extractor $extractor) : self
{
return $this;
}

public function source() : Extractor
{
return $this->pipeline->source();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,23 @@

namespace Flow\ETL\Tests\Integration\DataFrame;

use function Flow\ETL\DSL\{average, count, df, float_entry, from_all, from_array, from_memory, from_rows, int_entry, integer_entry, lit, max, rank, ref, str_entry, sum, window};
use function Flow\ETL\DSL\{average,
count,
df,
float_entry,
from_all,
from_array,
from_memory,
from_rows,
int_entry,
integer_entry,
lit,
max,
rank,
ref,
str_entry,
sum,
window};
use Flow\ETL\Memory\ArrayMemory;
use Flow\ETL\Tests\Integration\IntegrationTestCase;
use Flow\ETL\{Loader, Row, Rows};
Expand Down Expand Up @@ -32,19 +48,20 @@ public function test_group_by_multiple_columns_and_batch_size() : void
)
))
->groupBy('country', 'gender')
->toDF()
->aggregate(average(ref('age')))
->withEntry('age_avg', ref('age_avg')->round(lit(2)))
->batchSize(1)
->write($loader)
->fetch();

self::assertEquals(
new Rows(
Row::create(str_entry('country', 'PL'), str_entry('gender', 'male')),
Row::create(str_entry('country', 'PL'), str_entry('gender', 'female')),
Row::create(str_entry('country', 'US'), str_entry('gender', 'female')),
Row::create(str_entry('country', 'US'), str_entry('gender', 'male')),
),
$rows
[
['country' => 'PL', 'gender' => 'male', 'age_avg' => 21.67],
['country' => 'PL', 'gender' => 'female', 'age_avg' => 30.0],
['country' => 'US', 'gender' => 'female', 'age_avg' => 42.5],
['country' => 'US', 'gender' => 'male', 'age_avg' => 45],
],
$rows->toArray()
);
}

Expand Down Expand Up @@ -125,15 +142,15 @@ public function test_group_by_single_column() : void
)
))
->groupBy('country')
->toDF()
->aggregate(sum(ref('age')))
->fetch();

self::assertEquals(
new Rows(
Row::create(str_entry('country', 'PL')),
Row::create(str_entry('country', 'US')),
),
$rows
[
['country' => 'PL', 'age_sum' => 95],
['country' => 'US', 'age_sum' => 175],
],
$rows->toArray()
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@
namespace Flow\ETL\Tests\Integration\DataFrame;

use function Flow\ETL\Adapter\Text\{from_text, to_text};
use function Flow\ETL\DSL\{df,
use function Flow\ETL\DSL\{collect,
df,
from_array,
from_rows,
int_entry,
Expand Down Expand Up @@ -195,7 +196,7 @@ public function test_pruning_single_partition() : void
->select('year')
->withEntry('year', ref('year')->cast('int'))
->groupBy(ref('year'))
->toDF()
->aggregate(collect(ref('year')))
->fetch();

self::assertCount(1, $rows);
Expand Down

0 comments on commit 656bcc3

Please sign in to comment.