Skip to content

Commit dd79352

Browse files
committed
Removed Pipeline::setSource
1 parent fa94e65 commit dd79352

16 files changed

+49
-110
lines changed

src/core/etl/src/Flow/ETL/DataFrame.php

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -39,16 +39,7 @@ final class DataFrame
3939

4040
public function __construct(private Pipeline $pipeline, Config|FlowContext $context)
4141
{
42-
$this->context = $context instanceof FlowContext ? $context : new FlowContext($context);
43-
}
44-
45-
/**
46-
* @lazy
47-
* @param callable(Pipeline $pipeline, FlowContext $context) : DataFrame $callback
48-
*/
49-
public function rebuild(callable $callback) : self
50-
{
51-
return $callback($this->pipeline, $this->context);
42+
$this->context = $context instanceof FlowContext ? $context : new FlowContext($context);
5243
}
5344

5445
/**
@@ -614,6 +605,16 @@ public function printSchema(?int $limit = 20, Schema\SchemaFormatter $formatter
614605
$clone->run();
615606
}
616607

608+
/**
609+
* @lazy
610+
*
611+
* @param callable(Pipeline $pipeline, FlowContext $context) : DataFrame $callback
612+
*/
613+
public function rebuild(callable $callback) : self
614+
{
615+
return $callback($this->pipeline, $this->context);
616+
}
617+
617618
/**
618619
* @lazy
619620
*/
@@ -772,8 +773,7 @@ public function sortBy(Reference ...$entries) : self
772773
->cache($this->context->config->id())
773774
->run();
774775

775-
$this->pipeline = new SynchronousPipeline();
776-
$this->pipeline->setSource($this->context->config->externalSort()->sortBy(...$entries));
776+
$this->pipeline = new SynchronousPipeline($this->context->config->externalSort()->sortBy(...$entries));
777777

778778
return $this;
779779
}

src/core/etl/src/Flow/ETL/DataFrame/GroupedDataFrame.php

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,17 +28,17 @@ public function pivot(Reference $ref) : self
2828
return $this;
2929
}
3030

31-
public function toDataFrame(): DataFrame
31+
public function toDataFrame() : DataFrame
3232
{
33-
return $this->df->rebuild(function (Pipeline $pipeline, FlowContext $context): DataFrame {
33+
return $this->df->rebuild(function (Pipeline $pipeline, FlowContext $context) : DataFrame {
3434
return new DataFrame(
3535
new Pipeline\LinkedPipeline(new Pipeline\GroupByPipeline($this->groupBy, $pipeline), new Pipeline\SynchronousPipeline()),
3636
$context
3737
);
3838
});
3939
}
4040

41-
public function toDF(): DataFrame
41+
public function toDF() : DataFrame
4242
{
4343
return $this->toDataFrame();
4444
}

src/core/etl/src/Flow/ETL/Flow.php

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ public static function setUp(ConfigBuilder|Config $config) : self
2828
public function extract(Extractor $extractor) : DataFrame
2929
{
3030
return new DataFrame(
31-
(new SynchronousPipeline())->setSource($extractor),
31+
(new SynchronousPipeline($extractor)),
3232
$this->config
3333
);
3434
}
@@ -41,7 +41,7 @@ public function from(Extractor $extractor) : DataFrame
4141
public function process(Rows ...$rows) : DataFrame
4242
{
4343
return new DataFrame(
44-
(new SynchronousPipeline())->setSource(new ProcessExtractor(...$rows)),
44+
(new SynchronousPipeline(new ProcessExtractor(...$rows))),
4545
$this->config
4646
);
4747
}

src/core/etl/src/Flow/ETL/Pipeline.php

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,5 @@ public function pipes() : Pipes;
2424
*/
2525
public function process(FlowContext $context) : \Generator;
2626

27-
public function setSource(Extractor $extractor) : self;
28-
2927
public function source() : Extractor;
3028
}

src/core/etl/src/Flow/ETL/Pipeline/BatchingPipeline.php

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@
1010

1111
final class BatchingPipeline implements Pipeline
1212
{
13-
1413
/**
1514
* @param Pipeline $pipeline
1615
* @param int<1, max> $size
@@ -51,13 +50,6 @@ public function process(FlowContext $context) : \Generator
5150
return chunks_from(from_pipeline($this->pipeline), $this->size)->extract($context);
5251
}
5352

54-
public function setSource(Extractor $extractor) : self
55-
{
56-
$this->pipeline->setSource($extractor);
57-
58-
return $this;
59-
}
60-
6153
public function source() : Extractor
6254
{
6355
return $this->pipeline->source();

src/core/etl/src/Flow/ETL/Pipeline/CachingPipeline.php

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@
44

55
namespace Flow\ETL\Pipeline;
66

7-
use function Flow\ETL\DSL\from_rows;
87
use Flow\ETL\{Extractor, FlowContext, Loader, Pipeline, Transformer};
98

109
final class CachingPipeline implements Pipeline
@@ -49,13 +48,6 @@ public function process(FlowContext $context) : \Generator
4948
}
5049
}
5150

52-
public function setSource(Extractor $extractor) : Pipeline
53-
{
54-
$this->pipeline->setSource($extractor);
55-
56-
return $this;
57-
}
58-
5951
public function source() : Extractor
6052
{
6153
return $this->pipeline->source();

src/core/etl/src/Flow/ETL/Pipeline/CollectingPipeline.php

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@
44

55
namespace Flow\ETL\Pipeline;
66

7-
use function Flow\ETL\DSL\from_rows;
87
use Flow\ETL\Exception\InvalidArgumentException;
98
use Flow\ETL\{Extractor, FlowContext, Loader, Pipeline, Rows, Transformer};
109

@@ -55,13 +54,6 @@ public function process(FlowContext $context) : \Generator
5554
yield $rows;
5655
}
5756

58-
public function setSource(Extractor $extractor) : self
59-
{
60-
$this->pipeline->setSource($extractor);
61-
62-
return $this;
63-
}
64-
6557
public function source() : Extractor
6658
{
6759
return $this->pipeline->source();

src/core/etl/src/Flow/ETL/Pipeline/GroupByPipeline.php

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -46,13 +46,6 @@ public function process(FlowContext $context) : \Generator
4646
yield $this->groupBy->result($context);
4747
}
4848

49-
public function setSource(Extractor $extractor) : self
50-
{
51-
$this->pipeline->setSource($extractor);
52-
53-
return $this;
54-
}
55-
5649
public function source() : Extractor
5750
{
5851
return $this->pipeline->source();

src/core/etl/src/Flow/ETL/Pipeline/LinkedPipeline.php

Lines changed: 4 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -8,10 +8,12 @@
88

99
final class LinkedPipeline implements OverridingPipeline, Pipeline
1010
{
11+
private readonly Pipeline $nextPipeline;
12+
1113
public function __construct(
1214
private readonly Pipeline $pipeline,
13-
private readonly Pipeline $nextPipeline
1415
) {
16+
$this->nextPipeline = new SynchronousPipeline(new Extractor\PipelineExtractor($this->pipeline));
1517
}
1618

1719
public function add(Loader|Transformer $pipe) : Pipeline
@@ -60,18 +62,11 @@ public function pipes() : Pipes
6062

6163
public function process(FlowContext $context) : \Generator
6264
{
63-
foreach ($this->nextPipeline->setSource(new Extractor\PipelineExtractor($this->pipeline))->process($context) as $rows) {
65+
foreach ($this->nextPipeline->process($context) as $rows) {
6466
yield $rows;
6567
}
6668
}
6769

68-
public function setSource(Extractor $extractor) : Pipeline
69-
{
70-
$this->pipeline->setSource($extractor);
71-
72-
return $this;
73-
}
74-
7570
public function source() : Extractor
7671
{
7772
return $this->pipeline->source();

src/core/etl/src/Flow/ETL/Pipeline/PartitioningPipeline.php

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -78,13 +78,6 @@ public function process(FlowContext $context) : \Generator
7878
)->extract($context);
7979
}
8080

81-
public function setSource(Extractor $extractor) : Pipeline
82-
{
83-
$this->pipeline->setSource($extractor);
84-
85-
return $this;
86-
}
87-
8881
public function source() : Extractor
8982
{
9083
return $this->pipeline->source();

0 commit comments

Comments
 (0)