Skip to content

Commit 3b0313d

Browse files
committed
Cleanups after refactoring
1 parent 90a0ff7 commit 3b0313d

File tree

5 files changed

+13
-22
lines changed

5 files changed

+13
-22
lines changed

examples/topics/aggregations/group_by/code.php

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
['id' => 9, 'group' => 'A'],
2020
['id' => 10, 'group' => 'B'],
2121
]))
22-
->groupBy(ref('group'))
22+
->groupBy(ref('group')) // GroupedDataFrame
23+
->toDF() // DataFrame
2324
->write(to_stream(__DIR__ . '/output.txt', truncate: false))
2425
->run();

src/core/etl/src/Flow/ETL/DataFrame.php

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,7 @@ public function aggregate(AggregatingFunction ...$aggregations) : self
109109
$groupBy = new GroupBy();
110110
$groupBy->aggregate(...$aggregations);
111111

112-
$this->pipeline = new LinkedPipeline(new GroupByPipeline($groupBy, $this->pipeline), new SynchronousPipeline());
112+
$this->pipeline = new LinkedPipeline(new GroupByPipeline($groupBy, $this->pipeline));
113113

114114
return $this;
115115
}
@@ -135,7 +135,7 @@ public function autoCast() : self
135135
*/
136136
public function batchSize(int $size) : self
137137
{
138-
$this->pipeline = new LinkedPipeline(new BatchingPipeline($this->pipeline, $size), new SynchronousPipeline());
138+
$this->pipeline = new LinkedPipeline(new BatchingPipeline($this->pipeline, $size));
139139

140140
return $this;
141141
}
@@ -159,7 +159,7 @@ public function cache(?string $id = null, ?int $cacheBatchSize = null) : self
159159
}
160160

161161
$this->batchSize($cacheBatchSize ?? $this->context->config->cacheBatchSize());
162-
$this->pipeline = new LinkedPipeline(new CachingPipeline($this->pipeline, $id), new SynchronousPipeline());
162+
$this->pipeline = new LinkedPipeline(new CachingPipeline($this->pipeline, $id));
163163

164164
return $this;
165165
}
@@ -172,7 +172,7 @@ public function cache(?string $id = null, ?int $cacheBatchSize = null) : self
172172
*/
173173
public function collect() : self
174174
{
175-
$this->pipeline = new LinkedPipeline(new CollectingPipeline($this->pipeline), new SynchronousPipeline());
175+
$this->pipeline = new LinkedPipeline(new CollectingPipeline($this->pipeline));
176176

177177
return $this;
178178
}
@@ -459,7 +459,7 @@ public function join(self $dataFrame, Expression $on, string|Join $type = Join::
459459
$type = Join::from($type);
460460
}
461461

462-
$this->pipeline = new LinkedPipeline(new HashJoinPipeline($this->pipeline, $dataFrame, $on, $type), new SynchronousPipeline());
462+
$this->pipeline = new LinkedPipeline(new HashJoinPipeline($this->pipeline, $dataFrame, $on, $type));
463463

464464
return $this;
465465
}
@@ -556,7 +556,7 @@ public function partitionBy(string|Reference $entry, string|Reference ...$entrie
556556
{
557557
\array_unshift($entries, $entry);
558558

559-
$this->pipeline = new LinkedPipeline(new PartitioningPipeline($this->pipeline, References::init(...$entries)->all()), new SynchronousPipeline());
559+
$this->pipeline = new LinkedPipeline(new PartitioningPipeline($this->pipeline, References::init(...$entries)->all()));
560560

561561
return $this;
562562
}
@@ -852,7 +852,7 @@ public function withEntry(string $entryName, ScalarFunction|WindowFunction $ref)
852852
{
853853
if ($ref instanceof WindowFunction) {
854854
if (\count($ref->window()->partitions())) {
855-
$this->pipeline = new LinkedPipeline(new PartitioningPipeline($this->pipeline, $ref->window()->partitions(), $ref->window()->order()), new SynchronousPipeline());
855+
$this->pipeline = new LinkedPipeline(new PartitioningPipeline($this->pipeline, $ref->window()->partitions(), $ref->window()->order()));
856856
} else {
857857
$this->collect();
858858

src/core/etl/src/Flow/ETL/DataFrame/GroupedDataFrame.php

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ public function toDataFrame() : DataFrame
3232
{
3333
return $this->df->rebuild(function (Pipeline $pipeline, FlowContext $context) : DataFrame {
3434
return new DataFrame(
35-
new Pipeline\LinkedPipeline(new Pipeline\GroupByPipeline($this->groupBy, $pipeline), new Pipeline\SynchronousPipeline()),
35+
new Pipeline\LinkedPipeline(new Pipeline\GroupByPipeline($this->groupBy, $pipeline)),
3636
$context
3737
);
3838
});

src/core/etl/src/Flow/ETL/Pipeline/HashJoinPipeline.php

Lines changed: 2 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@
88
use Flow\ETL\Exception\{DuplicatedEntriesException, JoinException};
99
use Flow\ETL\Hash\NativePHPHash;
1010
use Flow\ETL\Join\{Expression, Join};
11-
use Flow\ETL\Loader\Closure;
1211
use Flow\ETL\Pipeline\HashJoin\HashTable;
1312
use Flow\ETL\Row\Entry;
1413
use Flow\ETL\{DataFrame, Extractor, FlowContext, Loader, Pipeline, Row, Rows, Transformer};
@@ -29,23 +28,14 @@ public function __construct(
2928

3029
public function add(Loader|Transformer $pipe) : Pipeline
3130
{
32-
$this->left->pipes()->add($pipe);
31+
$this->left->add($pipe);
3332

3433
return $this;
3534
}
3635

37-
public function closure(FlowContext $context) : void
38-
{
39-
foreach ($this->left->pipes()->all() as $pipe) {
40-
if ($pipe instanceof Loader && $pipe instanceof Closure) {
41-
$pipe->closure($context);
42-
}
43-
}
44-
}
45-
4636
public function has(string $transformerClass) : bool
4737
{
48-
return $this->left->pipes()->has($transformerClass);
38+
return $this->left->has($transformerClass);
4939
}
5040

5141
public function pipes() : Pipes

src/core/etl/src/Flow/ETL/Pipeline/Optimizer/BatchSizeOptimization.php

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ public function optimize(Loader|Transformer $element, Pipeline $pipeline) : Pipe
7575
return $pipeline;
7676
}
7777

78-
$pipeline = new Pipeline\LinkedPipeline(new BatchingPipeline($pipeline, $this->batchSize), new Pipeline\SynchronousPipeline());
78+
$pipeline = new Pipeline\LinkedPipeline(new BatchingPipeline($pipeline, $this->batchSize));
7979
$pipeline->add($element);
8080

8181
return $pipeline;

0 commit comments

Comments
 (0)