Skip to content

Commit

Permalink
Adds a closure method to TransformerLoader (#1495)
Browse files Browse the repository at this point in the history
* Adds a closure method to TransformerLoader to delegate generator stop signals to proxied loaders

* Fix cs issues

* Add TransformerLoader test case to test Closure loaders
  • Loading branch information
christianc1 authored Feb 23, 2025
1 parent 8dbfd2f commit 48ec14a
Show file tree
Hide file tree
Showing 2 changed files with 66 additions and 16 deletions.
9 changes: 8 additions & 1 deletion src/core/etl/src/Flow/ETL/Loader/TransformerLoader.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,21 @@
use function Flow\ETL\DSL\{df, from_rows};
use Flow\ETL\{FlowContext, Loader, Rows, Transformation, Transformer};

final readonly class TransformerLoader implements Loader, OverridingLoader
final readonly class TransformerLoader implements Closure, Loader, OverridingLoader
{
public function __construct(
private Transformer|Transformation $transformer,
private Loader $loader,
) {
}

public function closure(FlowContext $context) : void
{
if ($this->loader instanceof Closure) {
$this->loader->closure($context);
}
}

public function load(Rows $rows, FlowContext $context) : void
{
if ($this->transformer instanceof Transformer) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

use function Flow\ETL\DSL\{config, rows};
use function Flow\ETL\DSL\{df, flow_context, from_array, ref, to_memory, to_transformation, type_string};
use Flow\ETL\{DataFrame, Loader, Memory\ArrayMemory, Tests\FlowTestCase, Transformation, Transformer};
use Flow\ETL\{DataFrame, FlowContext, Loader, Loader\Closure, Memory\ArrayMemory, Tests\FlowTestCase, Transformation, Transformer};

final class TransformerLoaderTest extends FlowTestCase
{
Expand All @@ -29,23 +29,66 @@ public function test_transformer_loader() : void
$transformer->load(rows(), flow_context(config()));
}

/**
* Tests that the closure method is called when using a Closure loader.
*/
public function test_transformer_loader_with_closure() : void
{
$closure_loader = $this->createMockForIntersectionOfInterfaces([Loader::class, Closure::class]);

$closure_loader->expects(self::once())
->method('closure')
->with(self::isInstanceOf(FlowContext::class));

$transformer = to_transformation(
new class implements Transformation {
public function transform(DataFrame $data_frame) : DataFrame
{
return $data_frame;
}
},
$closure_loader
);

df()
->read(
from_array(
[
['id' => 1],
['id' => 2],
['id' => 3],
]
)
)
->write(
$transformer
)
->run();
}

public function test_transformer_loader_with_transformation() : void
{
df()
->read(from_array([
['id' => 1],
['id' => 2],
['id' => 3],
]))
->write(to_transformation(
new class implements Transformation {
public function transform(DataFrame $dataFrame) : DataFrame
{
return $dataFrame->withEntry('id_string', ref('id')->cast(type_string()));
}
},
to_memory($memory = new ArrayMemory())
))
->read(
from_array(
[
['id' => 1],
['id' => 2],
['id' => 3],
]
)
)
->write(
to_transformation(
new class implements Transformation {
public function transform(DataFrame $dataFrame) : DataFrame
{
return $dataFrame->withEntry('id_string', ref('id')->cast(type_string()));
}
},
to_memory($memory = new ArrayMemory())
)
)
->run();

self::assertEquals(
Expand Down

0 comments on commit 48ec14a

Please sign in to comment.