Skip to content

Commit

Permalink
Fixed replacing instance of dataframe when using aggregate method (#1057
Browse files Browse the repository at this point in the history
)

* Fixed replacing instance of dataframe when using aggregate method

* Removed DataFrame::rebuild
  • Loading branch information
norberttech authored Apr 27, 2024
1 parent 9c06b72 commit 42fe036
Show file tree
Hide file tree
Showing 3 changed files with 13 additions and 20 deletions.
10 changes: 0 additions & 10 deletions src/core/etl/src/Flow/ETL/DataFrame.php
Original file line number Diff line number Diff line change
Expand Up @@ -605,16 +605,6 @@ public function printSchema(?int $limit = 20, Schema\SchemaFormatter $formatter
$clone->run();
}

/**
* @lazy
*
* @param callable(Pipeline $pipeline, FlowContext $context) : DataFrame $callback
*/
public function rebuild(callable $callback) : self
{
return $callback($this->pipeline, $this->context);
}

/**
* @lazy
*/
Expand Down
19 changes: 12 additions & 7 deletions src/core/etl/src/Flow/ETL/DataFrame/GroupedDataFrame.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,24 +6,29 @@

use Flow\ETL\Function\AggregatingFunction;
use Flow\ETL\Row\Reference;
use Flow\ETL\{DataFrame, FlowContext, GroupBy, Pipeline};
use Flow\ETL\{DataFrame, GroupBy, Pipeline};

final class GroupedDataFrame
{
/**
* @var \ReflectionClass<DataFrame>
*/
private \ReflectionClass $dataFrameReflection;

public function __construct(private readonly DataFrame $df, private readonly GroupBy $groupBy)
{
$this->dataFrameReflection = new \ReflectionClass($this->df);
}

public function aggregate(AggregatingFunction ...$aggregations) : DataFrame
{
$this->groupBy->aggregate(...$aggregations);

return $this->df->rebuild(function (Pipeline $pipeline, FlowContext $context) : DataFrame {
return new DataFrame(
new Pipeline\LinkedPipeline(new Pipeline\GroupByPipeline($this->groupBy, $pipeline)),
$context
);
});
$pipelineProperty = $this->dataFrameReflection->getProperty('pipeline');
$currentPipeline = $pipelineProperty->getValue($this->df);
$pipelineProperty->setValue($this->df, new Pipeline\LinkedPipeline(new Pipeline\GroupByPipeline($this->groupBy, $currentPipeline)));

return $this->df;
}

public function pivot(Reference $ref) : self
Expand Down
4 changes: 1 addition & 3 deletions src/core/etl/src/Flow/ETL/Pipeline/LinkedPipeline.php
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,7 @@ public function pipes() : Pipes

public function process(FlowContext $context) : \Generator
{
foreach ($this->nextPipeline->process($context) as $rows) {
yield $rows;
}
return $this->nextPipeline->process($context);
}

public function source() : Extractor
Expand Down

0 comments on commit 42fe036

Please sign in to comment.