Skip to content

Commit

Permalink
Added DataFrame::with() function (#1392)
Browse files Browse the repository at this point in the history
  • Loading branch information
norberttech authored Jan 19, 2025
1 parent e89fae7 commit deaf5b3
Show file tree
Hide file tree
Showing 9 changed files with 31 additions and 21 deletions.
2 changes: 1 addition & 1 deletion examples/topics/data_reading/elasticsearch/code.php
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,6 @@
]
))
->write(to_stream(__DIR__ . '/output.raw.txt', truncate: false))
->transform(es_hits_to_rows())
->with(es_hits_to_rows())
->write(to_stream(__DIR__ . '/output.txt', truncate: false))
->run();
2 changes: 1 addition & 1 deletion examples/topics/data_writing/elasticsearch/code.php
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,6 @@
]
))
->write(to_stream(__DIR__ . '/output.raw.txt', truncate: false))
->transform(es_hits_to_rows())
->with(es_hits_to_rows())
->write(to_stream(__DIR__ . '/output.txt', truncate: false))
->run();
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ public function test_extraction_index_with_from_and_size() : void

$results = (data_frame())
->extract(from_es($this->elasticsearchContext->clientConfig(), $params))
->transform(es_hits_to_rows(DocumentDataSource::fields))
->with(es_hits_to_rows(DocumentDataSource::fields))
->fetch();

self::assertCount(2000, $results);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ public function test_extraction_index_with_from_and_size() : void

$results = (data_frame())
->extract(from_meilisearch($this->meilisearchContext->clientConfig(), $params, self::INDEX_NAME))
->transform(meilisearch_hits_to_rows())
->with(meilisearch_hits_to_rows())
->fetch();

self::assertCount(49, $results);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ private function stream() : void
{
df($this->config)
->read($this->extractor)
->transform($this->transformations)
->with($this->transformations)
->dropPartitions()
->write($this->output->loader())
->run();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ public function test_masking_columns_transformation() : void
['id' => 3, 'name' => 'John Smith', 'salary' => 9000, 'currency' => 'USD'],
['id' => 4, 'name' => 'Jane Smith', 'salary' => 10000, 'currency' => 'USD'],
]))
->transform(new MaskColumns(['salary']))
->with(new MaskColumns(['salary']))
->fetch()
->toArray();

Expand Down
30 changes: 20 additions & 10 deletions src/core/etl/src/Flow/ETL/DataFrame.php
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ public function collect() : self
#[DSLMethod(exclude: true)]
public function collectRefs(References $references) : self
{
$this->transform(new CallbackRowTransformer(function (Row $row) use ($references) : Row {
$this->with(new CallbackRowTransformer(function (Row $row) use ($references) : Row {
foreach ($row->entries()->all() as $entry) {
$references->add($entry->ref());
}
Expand Down Expand Up @@ -739,7 +739,7 @@ public function reorderEntries(Comparator $comparator = new TypeComparator()) :
*/
public function rows(Transformer|Transformation $transformer) : self
{
return $this->transform($transformer);
return $this->with($transformer);
}

/**
Expand Down Expand Up @@ -826,17 +826,13 @@ public function sortBy(Reference ...$entries) : self
}

/**
* Alias for DataFrame::with().
*
* @lazy
*/
public function transform(Transformer|Transformation $transformer) : self
{
if ($transformer instanceof Transformer) {
$this->pipeline->add($transformer);

return $this;
}

return $transformer->transform($this);
return $this->with($transformer);
}

/**
Expand Down Expand Up @@ -878,6 +874,20 @@ public function void() : self
return $this;
}

/**
* Applies the given transformer or transformation to this DataFrame.
*
* A Transformer is added to the pipeline and this DataFrame is returned.
* A Transformation instead receives this DataFrame through its transform()
* method, and whatever that call returns is returned to the caller.
*
* @lazy
*/
public function with(Transformer|Transformation $transformer) : self
{
// Transformer instances are appended directly to the pipeline.
if ($transformer instanceof Transformer) {
$this->pipeline->add($transformer);

return $this;
}

// Otherwise it is a Transformation: delegate to it, passing this DataFrame.
return $transformer->transform($this);
}

/**
* @lazy
*
Expand Down Expand Up @@ -910,7 +920,7 @@ public function withEntry(string|Definition $entry, ScalarFunction|WindowFunctio

$this->pipeline->add(new WindowFunctionTransformer($entry, $reference));
} else {
$this->transform(new ScalarFunctionTransformer($entry, $reference));
$this->with(new ScalarFunctionTransformer($entry, $reference));
}

return $this;
Expand Down
4 changes: 2 additions & 2 deletions src/core/etl/tests/Flow/ETL/Tests/Unit/DataFrameTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -508,7 +508,7 @@ public function extract(FlowContext $context) : \Generator
}
}
)
->transform(
->with(
new class implements Transformer {
public function transform(Rows $rows, FlowContext $context) : Rows
{
Expand Down Expand Up @@ -545,7 +545,7 @@ public function extract(FlowContext $context) : \Generator
}
}
)
->transform(
->with(
new class implements Transformer {
public function transform(Rows $rows, FlowContext $context) : Rows
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public function load(Rows $rows, FlowContext $context) : void
(data_frame())
->extract($extractor)
->onError(throw_error_handler())
->transform($brokenTransformer)
->with($brokenTransformer)
->load($loader)
->run();
}
Expand Down Expand Up @@ -90,7 +90,7 @@ public function load(Rows $rows, FlowContext $context) : void
(data_frame())
->extract($extractor)
->onError(ignore_error_handler())
->transform($brokenTransformer)
->with($brokenTransformer)
->load($loader)
->run();

Expand Down Expand Up @@ -152,7 +152,7 @@ public function load(Rows $rows, FlowContext $context) : void
(data_frame())
->extract($extractor)
->onError(skip_rows_handler())
->transform($brokenTransformer)
->with($brokenTransformer)
->load($loader)
->run();

Expand Down

0 comments on commit deaf5b3

Please sign in to comment.