Skip to content

Commit

Permalink
Merge branch '1.x' into integration/symfony-title
Browse files Browse the repository at this point in the history
  • Loading branch information
f-lapinski authored Feb 13, 2025
2 parents ea60e45 + b0e1149 commit d3fc904
Show file tree
Hide file tree
Showing 18 changed files with 424 additions and 40 deletions.
10 changes: 9 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
## [Unreleased] - 2025-02-11
## [Unreleased] - 2025-02-13

### Added
- [#1475](https://github.com/flow-php/flow/pull/1475) - **Early detection of XML type in dbal bulk** - [@norberttech](https://github.com/norberttech)
- [#1473](https://github.com/flow-php/flow/pull/1473) - **Added IndexOf function with $offset and $ignoreCase parameters and Tests** - [@f-lapinski](https://github.com/f-lapinski)
- [#1470](https://github.com/flow-php/flow/pull/1470) - **ByteString IsUTF8 function with Test** - [@f-lapinski](https://github.com/f-lapinski)
- [#1465](https://github.com/flow-php/flow/pull/1465) - **jsonl writing example** - [@norberttech](https://github.com/norberttech)
- [#1465](https://github.com/flow-php/flow/pull/1465) - **csv writing example** - [@norberttech](https://github.com/norberttech)
- [#1465](https://github.com/flow-php/flow/pull/1465) - **xml writing example** - [@norberttech](https://github.com/norberttech)
Expand Down Expand Up @@ -84,6 +87,8 @@
- [#1244](https://github.com/flow-php/flow/pull/1244) - **Added CLI command to read schema from a file** - [@norberttech](https://github.com/norberttech)

### Changed
- [#1476](https://github.com/flow-php/flow/pull/1476) - **Allow to pass single type to is_type() method** - [@norberttech](https://github.com/norberttech)
- [#1476](https://github.com/flow-php/flow/pull/1476) - **Allow using concat ws on collections** - [@norberttech](https://github.com/norberttech)
- [#1465](https://github.com/flow-php/flow/pull/1465) - **Improved contributing md** - [@norberttech](https://github.com/norberttech)
- [#1462](https://github.com/flow-php/flow/pull/1462) - **Improve error message when schema definition is duplicated** - [@norberttech](https://github.com/norberttech)
- [#1453](https://github.com/flow-php/flow/pull/1453) - **Change StringCamel to StringStyle so it works with the existing StringStyles enum and takes a StringStyle as parameter** - [@f-lapinski](https://github.com/f-lapinski)
Expand Down Expand Up @@ -155,6 +160,9 @@
- [#1240](https://github.com/flow-php/flow/pull/1240) - **Update Homebrew TAP formula: flow-php to version: 0.10.0** - [@norberttech](https://github.com/norberttech)

### Fixed
- [#1475](https://github.com/flow-php/flow/pull/1475) - **Inconsistency between XMLEntry::toString and Casting XML's to strings** - [@norberttech](https://github.com/norberttech)
- [#1470](https://github.com/flow-php/flow/pull/1470) - **Add Missing Test for return type of function** - [@f-lapinski](https://github.com/f-lapinski)
- [#1469](https://github.com/flow-php/flow/pull/1469) - **JSONLines Loader would occasionally write a newline to the start of the file.** - [@jmortlock](https://github.com/jmortlock)
- [#1457](https://github.com/flow-php/flow/pull/1457) - **missing entry types to JSON/CSV entry normalizers** - [@norberttech](https://github.com/norberttech)
- [d82381](https://github.com/flow-php/flow/commit/d823813af9be12d5cc21d4b8e2a01171ce0e0c90) - **failing example test** - [@norberttech](https://github.com/norberttech)
- [#1448](https://github.com/flow-php/flow/pull/1448) - **ParquetOutput didn't implement Output interface** - [@radozato](https://github.com/radozato)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,96 @@ public function test_inserts_new_rows_or_updates_already_existed_based_on_primar
);
}

/**
 * Rows whose `description` value is the root `<xml>` element of a parsed document
 * are inserted into a string column and read back as the element's markup.
 */
public function test_inserts_xml_element_entry() : void
{
    $table = 'flow_doctrine_bulk_test';

    $schema = new Table(
        $table,
        [
            new Column('id', Type::getType(Types::INTEGER), ['notnull' => true]),
            new Column('name', Type::getType(Types::STRING), ['notnull' => true, 'length' => 255]),
            new Column('description', Type::getType(Types::STRING), ['notnull' => true, 'length' => 255]),
        ],
    );

    $this->pgsqlDatabaseContext->createTable($schema->setPrimaryKey(['id']));

    // Parses the markup and hands back the first <xml> element of the resulting document.
    $xmlElement = static function (string $markup) : ?\DOMNode {
        $document = new \DOMDocument();
        $document->loadXML($markup);

        return $document->getElementsByTagName('xml')[0];
    };

    $rows = [
        ['id' => 1, 'name' => 'Name One', 'description' => $xmlElement('<xml>Description One</xml>')],
        ['id' => 2, 'name' => 'Name Two', 'description' => $xmlElement('<xml>Description Two</xml>')],
        ['id' => 3, 'name' => 'Name Three', 'description' => $xmlElement('<xml>Description Three</xml>')],
    ];

    (data_frame())
        ->read(from_array($rows))
        ->load(to_dbal_table_insert($this->connectionParams(), $table))
        ->run();

    $expected = [
        ['id' => 1, 'name' => 'Name One', 'description' => '<xml>Description One</xml>'],
        ['id' => 2, 'name' => 'Name Two', 'description' => '<xml>Description Two</xml>'],
        ['id' => 3, 'name' => 'Name Three', 'description' => '<xml>Description Three</xml>'],
    ];

    self::assertEquals(3, $this->pgsqlDatabaseContext->tableCount($table));
    self::assertEquals($expected, $this->pgsqlDatabaseContext->selectAll($table));
}

/**
 * Rows whose `description` value is a whole DOMDocument are inserted into a string
 * column and read back as the document's markup (including a non-`<xml>` root).
 */
public function test_inserts_xml_entry() : void
{
    $table = 'flow_doctrine_bulk_test';

    $schema = new Table(
        $table,
        [
            new Column('id', Type::getType(Types::INTEGER), ['notnull' => true]),
            new Column('name', Type::getType(Types::STRING), ['notnull' => true, 'length' => 255]),
            new Column('description', Type::getType(Types::STRING), ['notnull' => true, 'length' => 255]),
        ],
    );

    $this->pgsqlDatabaseContext->createTable($schema->setPrimaryKey(['id']));

    // Parses the markup into a fresh document; the document itself is used as the entry value.
    $xmlDocument = static function (string $markup) : \DOMDocument {
        $document = new \DOMDocument();
        $document->loadXML($markup);

        return $document;
    };

    $rows = [
        ['id' => 1, 'name' => 'Name One', 'description' => $xmlDocument('<xml>Description One</xml>')],
        ['id' => 2, 'name' => 'Name Two', 'description' => $xmlDocument('<xml>Description Two</xml>')],
        ['id' => 3, 'name' => 'Name Three', 'description' => $xmlDocument('<b>Description Three</b>')],
    ];

    (data_frame())
        ->read(from_array($rows))
        ->load(to_dbal_table_insert($this->connectionParams(), $table))
        ->run();

    $expected = [
        ['id' => 1, 'name' => 'Name One', 'description' => '<xml>Description One</xml>'],
        ['id' => 2, 'name' => 'Name Two', 'description' => '<xml>Description Two</xml>'],
        ['id' => 3, 'name' => 'Name Three', 'description' => '<b>Description Three</b>'],
    ];

    self::assertEquals(3, $this->pgsqlDatabaseContext->tableCount($table));
    self::assertEquals($expected, $this->pgsqlDatabaseContext->selectAll($table));
}

public function test_update_multiple_rows_at_once() : void
{
$this->pgsqlDatabaseContext->createTable((new Table(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,6 @@ final class JsonLinesLoader implements Closure, Loader, Loader\FileLoader

private int $flags = JSON_THROW_ON_ERROR;

/**
* @var array<string, int>
*/
private array $writes = [];

public function __construct(private readonly Path $path)
{
}
Expand Down Expand Up @@ -65,16 +60,7 @@ public function write(Rows $nextRows, array $partitions, FlowContext $context) :
$streams = $context->streams();
$normalizer = new RowsNormalizer(new EntryNormalizer($this->dateTimeFormat));

if (!$streams->isOpen($this->path, $partitions)) {
$stream = $streams->writeTo($this->path, $partitions);

if (!\array_key_exists($stream->path()->path(), $this->writes)) {
$this->writes[$stream->path()->path()] = 0;
}

} else {
$stream = $streams->writeTo($this->path, $partitions);
}
$stream = $streams->writeTo($this->path, $partitions);

$this->writeJSON($nextRows, $stream, $normalizer);
}
Expand Down Expand Up @@ -103,11 +89,7 @@ private function writeJSON(Rows $rows, DestinationStream $stream, RowsNormalizer
throw new RuntimeException('Failed to encode JSON: ' . $e->getMessage(), 0, $e);
}

$json = ($this->writes[$stream->path()->path()] > 0) ? ("\n" . $json) : $json;

$stream->append($json);

$this->writes[$stream->path()->path()]++;
$stream->append($json . "\n");
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,34 @@
use function Flow\ETL\DSL\{average, df, from_array, overwrite, ref};
use function Flow\ETL\DSL\{config, flow_context, rows};
use function Flow\Filesystem\DSL\path;

use Flow\ETL\Adapter\JSON\JsonLoader;
use Flow\ETL\Tests\Double\FakeExtractor;
use Flow\ETL\{Tests\FlowTestCase};

final class JsonTest extends FlowTestCase
{
/**
 * A DOMDocument entry written through the JSON loader ends up in the file as an
 * escaped markup string next to the scalar entries of the same row.
 */
public function test_domdocument_json_file() : void
{
    $path = __DIR__ . '/var/test_domdocument.json';

    $document = new \DOMDocument();
    $document->loadXML('<b>red</b>');

    $rows = [
        ['id' => 1, 'descriptionHtml' => $document, 'size' => 'small'],
    ];

    df()
        ->read(from_array($rows))
        ->saveMode(overwrite())
        ->write(to_json($path))
        ->run();

    // Single-quoted on purpose: the backslash in "<\/b>" must reach the assertion untouched.
    $expectedJson = '[{"id":1,"descriptionHtml":"<b>red<\/b>","size":"small"}]';

    self::assertStringContainsString($expectedJson, \file_get_contents($path));
}

public function test_json_loader() : void
{
$path = __DIR__ . '/var/test_json_loader.json';
Expand Down
22 changes: 13 additions & 9 deletions src/core/etl/src/Flow/ETL/DSL/functions.php
Original file line number Diff line number Diff line change
Expand Up @@ -1550,31 +1550,35 @@ function compare_entries_by_type_and_name(array $priorities = Transformer\OrderE
}

/**
* @param array<string|Type<mixed>> $types
* @param array<string|Type<mixed>>|Type<mixed> $type
* @param mixed $value
*/
#[DocumentationDSL(module: Module::CORE, type: DSLType::DATA_FRAME)]
function is_type(array $types, mixed $value) : bool
function is_type(Type|array $type, mixed $value) : bool
{
foreach ($types as $type) {
if (\is_string($type)) {
if (match (\strtolower($type)) {
if ($type instanceof Type) {
$type = [$type];
}

foreach ($type as $nextType) {
if (\is_string($nextType)) {
if (match (\strtolower($nextType)) {
'str', 'string' => \is_string($value),
'int', 'integer' => \is_int($value),
'float' => \is_float($value),
'null' => null === $value,
'object' => \is_object($value),
'array' => \is_array($value),
'list' => \is_array($value) && \array_is_list($value),
default => match (\class_exists($type) || \enum_exists($type)) {
true => $value instanceof $type,
false => throw new RuntimeException('Unexpected type: ' . $type),
default => match (\class_exists($nextType) || \enum_exists($nextType)) {
true => $value instanceof $nextType,
false => throw new RuntimeException('Unexpected type: ' . $nextType),
},
}) {
return true;
}
} else {
if ($type->isValid($value)) {
if ($nextType->isValid($value)) {
return true;
}
}
Expand Down
15 changes: 11 additions & 4 deletions src/core/etl/src/Flow/ETL/Function/ConcatWithSeparator.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

namespace Flow\ETL\Function;

use function Flow\ETL\DSL\type_string;
use function Flow\ETL\DSL\{is_type, type_list, type_string};
use Flow\ETL\PHP\Type\Caster;
use Flow\ETL\Row;

Expand Down Expand Up @@ -34,10 +34,17 @@ public function eval(Row $row) : mixed
$concatValues = [];

foreach ($this->refs as $value) {
$value = \is_string($value) ? $value : Caster::default()->to(type_string(true))->value($value->eval($row));
$value = (new Parameter($value))->eval($row);

if (\is_string($value)) {
$concatValues[] = $value;
if (is_type(type_list(type_string()), $value)) {
/** @var list<string> $value */
$concatValues = \array_merge($concatValues, $value);
} else {
$value = \is_string($value) ? $value : Caster::default()->to(type_string(true))->value($value);

if (\is_string($value)) {
$concatValues[] = $value;
}
}
}

Expand Down
45 changes: 45 additions & 0 deletions src/core/etl/src/Flow/ETL/Function/IndexOf.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
<?php

declare(strict_types=1);

namespace Flow\ETL\Function;

use function Flow\ETL\DSL\{type_int};
use function Symfony\Component\String\u;
use Flow\ETL\Function\ScalarFunction\TypedScalarFunction;
use Flow\ETL\PHP\Type\Type;
use Flow\ETL\Row;

/**
 * Scalar function that looks up the position of a needle inside a string.
 *
 * Every constructor argument may be a literal or a scalar function; each one is
 * resolved against the row passed to eval().
 */
final class IndexOf extends ScalarFunctionChain implements TypedScalarFunction
{
    /**
     * @param ScalarFunction|string $string string to search in
     * @param ScalarFunction|string $needle value to search for
     * @param bool|ScalarFunction $ignoreCase when it resolves to a truthy value the search is case-insensitive
     * @param int|ScalarFunction $offset offset handed to the underlying indexOf() call
     */
    public function __construct(
        private readonly ScalarFunction|string $string,
        private readonly ScalarFunction|string $needle,
        private readonly ScalarFunction|bool $ignoreCase = false,
        private readonly ScalarFunction|int $offset = 0,
    ) {
    }

    /**
     * Resolves all four parameters for the row, then delegates to Symfony's indexOf().
     *
     * @return null|false|int false when the string or the needle resolves to null, otherwise the
     *                        value produced by indexOf() (null there means "not found" per Symfony docs)
     */
    public function eval(Row $row) : int|false|null
    {
        $haystack = (new Parameter($this->string))->asString($row);
        $needle = (new Parameter($this->needle))->asString($row);
        $offset = (new Parameter($this->offset))->as($row, type_int());
        $caseInsensitive = (new Parameter($this->ignoreCase))->asBoolean($row);

        if (null === $haystack || null === $needle) {
            return false;
        }

        $subject = u($haystack);

        if ($caseInsensitive) {
            $subject = $subject->ignoreCase();
        }

        return $subject->indexOf($needle, $offset);
    }

    /**
     * NOTE(review): eval() may also yield false or null while this advertises int only - confirm this is intended.
     */
    public function returns() : Type
    {
        return type_int();
    }
}
34 changes: 34 additions & 0 deletions src/core/etl/src/Flow/ETL/Function/IsUtf8.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
<?php

declare(strict_types=1);

namespace Flow\ETL\Function;

use function Flow\ETL\DSL\{type_boolean};
use function Symfony\Component\String\{b};
use Flow\ETL\Function\ScalarFunction\TypedScalarFunction;
use Flow\ETL\PHP\Type\Type;
use Flow\ETL\Row;

/**
 * Scalar function reporting whether a string value is valid UTF-8.
 *
 * The check itself is delegated to Symfony's ByteString::isUtf8().
 */
final class IsUtf8 extends ScalarFunctionChain implements TypedScalarFunction
{
    /**
     * @param ScalarFunction|string $string literal or scalar function resolved against the row in eval()
     */
    public function __construct(private readonly ScalarFunction|string $string)
    {
    }

    /**
     * @return mixed null when the parameter resolves to null, otherwise the result of isUtf8()
     */
    public function eval(Row $row) : mixed
    {
        $candidate = (new Parameter($this->string))->asString($row);

        return null === $candidate ? null : b($candidate)->isUtf8();
    }

    public function returns() : Type
    {
        return type_boolean();
    }
}
12 changes: 11 additions & 1 deletion src/core/etl/src/Flow/ETL/Function/ScalarFunctionChain.php
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ public function concat(ScalarFunction|string ...$params) : self

public function concatWithSeparator(ScalarFunction|string $separator, ScalarFunction|string ...$params) : self
{
return new ConcatWithSeparator($separator, ...$params);
return new ConcatWithSeparator($separator, $this, ...$params);
}

public function contains(ScalarFunction|string $needle) : self
Expand Down Expand Up @@ -259,6 +259,11 @@ public function hash(Algorithm $algorithm = new NativePHPHash()) : self
return new Hash($this, $algorithm);
}

/**
 * Chains an IndexOf function that searches the current function's value for $needle.
 *
 * @param ScalarFunction|string $needle value to search for
 * @param bool|ScalarFunction $ignoreCase forwarded to IndexOf as its case-insensitivity flag
 * @param int|ScalarFunction $offset forwarded to IndexOf as its search offset
 */
public function indexOf(ScalarFunction|string $needle, ScalarFunction|bool $ignoreCase = false, ScalarFunction|int $offset = 0) : self
{
    return new IndexOf(
        string: $this,
        needle: $needle,
        ignoreCase: $ignoreCase,
        offset: $offset,
    );
}

public function isEven() : self
{
return new Equals(new Mod($this, lit(2)), lit(0));
Expand Down Expand Up @@ -316,6 +321,11 @@ public function isType(string|Type ...$types) : self
return new IsType($this, ...$types);
}

/**
 * Chains an IsUtf8 function that evaluates the current function's value.
 */
public function isUtf8() : IsUtf8
{
    return new IsUtf8(string: $this);
}

public function jsonDecode(ScalarFunction|int $flags = JSON_THROW_ON_ERROR) : self
{
return new JsonDecode($this, $flags);
Expand Down
Loading

0 comments on commit d3fc904

Please sign in to comment.