Skip to content

Commit

Permalink
Improved schema manipulation (#991)
Browse files Browse the repository at this point in the history
  • Loading branch information
norberttech authored Feb 15, 2024
1 parent 38b2bda commit ce6acd6
Show file tree
Hide file tree
Showing 5 changed files with 253 additions and 25 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
<?php

declare(strict_types=1);

namespace Flow\ETL\Exception;

final class SchemaDefinitionNotUniqueException extends InvalidArgumentException
{
}
127 changes: 111 additions & 16 deletions src/core/etl/src/Flow/ETL/Row/Schema.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,28 +6,19 @@

use Flow\ETL\Exception\InvalidArgumentException;
use Flow\ETL\Exception\SchemaDefinitionNotFoundException;
use Flow\ETL\Exception\SchemaDefinitionNotUniqueException;
use Flow\ETL\Row\Schema\Definition;

final class Schema implements \Countable
{
/**
* @var array<string, Definition>
*/
private readonly array $definitions;
private array $definitions;

public function __construct(Definition ...$definitions)
{
$uniqueDefinitions = [];

foreach ($definitions as $definition) {
$uniqueDefinitions[$definition->entry()->name()] = $definition;
}

if (\count($uniqueDefinitions) !== \count($definitions)) {
throw new InvalidArgumentException(\sprintf('Entry definitions must be unique, given: [%s]', \implode(', ', \array_map(fn (Definition $d) => $d->entry()->name(), $definitions))));
}

$this->definitions = $uniqueDefinitions;
$this->setDefinitions(...$definitions);
}

public static function fromArray(array $definitions) : self
Expand All @@ -45,6 +36,13 @@ public static function fromArray(array $definitions) : self
return new self(...$schema);
}

public function add(Definition ...$definitions) : self
{
$this->setDefinitions(...\array_merge(\array_values($this->definitions), $definitions));

return $this;
}

public function count() : int
{
return \count($this->definitions);
Expand Down Expand Up @@ -97,6 +95,26 @@ public function getDefinition(string|Reference $ref) : Definition
return $this->findDefinition($ref) ?: throw new SchemaDefinitionNotFoundException((string) $ref);
}

/**
* Gracefully remove entries from schema without throwing an exception if entry does not exist.
*/
public function gracefulRemove(string|Reference ...$entries) : self
{
$refs = References::init(...$entries);

$definitions = [];

foreach ($this->definitions as $definition) {
if (!$refs->has($definition->entry())) {
$definitions[] = $definition;
}
}

$this->setDefinitions(...$definitions);

return $this;
}

public function merge(self $schema) : self
{
$newDefinitions = $this->definitions;
Expand All @@ -123,7 +141,9 @@ public function merge(self $schema) : self
}
}

return new self(...\array_values($newDefinitions));
$this->setDefinitions(...\array_values($newDefinitions));

return $this;
}

public function normalize() : array
Expand All @@ -149,21 +169,96 @@ public function nullable() : self
}
}

return new self(...$definitions);
$this->setDefinitions(...$definitions);

return $this;
}

public function without(string|Reference ...$entries) : self
public function remove(string|Reference ...$entries) : self
{
$refs = References::init(...$entries);

$definitions = [];

foreach ($entries as $entry) {
if (!$this->findDefinition($entry)) {
throw new SchemaDefinitionNotFoundException((string) $entry);
}
}

foreach ($this->definitions as $definition) {
if (!$refs->has($definition->entry())) {
$definitions[] = $definition;
}
}

return new self(...$definitions);
$this->setDefinitions(...$definitions);

return $this;
}

public function rename(string|Reference $entry, string $newName) : self
{
$definitions = [];

if (!$this->findDefinition($entry)) {
throw new SchemaDefinitionNotFoundException((string) $entry);
}

foreach ($this->definitions as $nextDefinition) {
if ($nextDefinition->entry()->is(EntryReference::init($entry))) {
$definitions[] = $nextDefinition->rename($newName);
} else {
$definitions[] = $nextDefinition;
}
}

$this->setDefinitions(...$definitions);

return $this;
}

public function replace(string|Reference $entry, Definition $definition) : self
{
$definitions = [];

if (!$this->findDefinition($entry)) {
throw new SchemaDefinitionNotFoundException((string) $entry);
}

foreach ($this->definitions as $nextDefinition) {
if ($nextDefinition->entry()->is(EntryReference::init($entry))) {
$definitions[] = $definition;
} else {
$definitions[] = $nextDefinition;
}
}

$this->setDefinitions(...$definitions);

return $this;
}

/**
* @deprecated Use `remove` instead
*/
public function without(string|Reference ...$entries) : self
{
return $this->remove(...$entries);
}

private function setDefinitions(Definition ...$definitions) : void
{
$uniqueDefinitions = [];

foreach ($definitions as $definition) {
$uniqueDefinitions[$definition->entry()->name()] = $definition;
}

if (\count($uniqueDefinitions) !== \count($definitions)) {
throw new SchemaDefinitionNotUniqueException(\sprintf('Entry definitions must be unique, given: [%s]', \implode(', ', \array_map(fn (Definition $d) => $d->entry()->name(), $definitions))));
}

$this->definitions = $uniqueDefinitions;
}
}
10 changes: 10 additions & 0 deletions src/core/etl/src/Flow/ETL/Row/Schema/Definition.php
Original file line number Diff line number Diff line change
Expand Up @@ -363,6 +363,16 @@ public function nullable() : self
return new self($this->ref, $this->entryClass, $this->type->makeNullable(true), $this->metadata);
}

public function rename(string $newName) : self
{
return new self(
$newName,
$this->entryClass,
$this->type,
$this->metadata
);
}

public function type() : Type
{
return $this->type;
Expand Down
4 changes: 2 additions & 2 deletions src/core/etl/src/Flow/ETL/Rows.php
Original file line number Diff line number Diff line change
Expand Up @@ -361,7 +361,7 @@ public function joinLeft(self $right, Expression $expression) : self
*/
$joined = [];

$rightSchema = $right->schema()->without(...$expression->right());
$rightSchema = $right->schema()->gracefulRemove(...$expression->right());

foreach ($this->rows as $leftRow) {
/** @var ?Row $joinedRow */
Expand Down Expand Up @@ -436,7 +436,7 @@ public function joinRight(self $right, Expression $expression) : self
*/
$joined = [];

$leftSchema = $this->schema()->without(...$expression->left());
$leftSchema = $this->schema()->gracefulRemove(...$expression->left());

foreach ($right->rows as $rightRow) {
/** @var ?Row $joinedRow */
Expand Down
128 changes: 121 additions & 7 deletions src/core/etl/tests/Flow/ETL/Tests/Unit/Row/SchemaTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

namespace Flow\ETL\Tests\Unit\Row;

use function Flow\ETL\DSL\bool_schema;
use function Flow\ETL\DSL\int_schema;
use function Flow\ETL\DSL\json_schema;
use function Flow\ETL\DSL\list_schema;
Expand All @@ -21,15 +22,44 @@
use function Flow\ETL\DSL\type_structure;
use function Flow\ETL\DSL\uuid_schema;
use Flow\ETL\Exception\InvalidArgumentException;
use Flow\ETL\Exception\SchemaDefinitionNotFoundException;
use Flow\ETL\Exception\SchemaDefinitionNotUniqueException;
use Flow\ETL\Row\EntryReference;
use Flow\ETL\Row\Schema;
use PHPUnit\Framework\TestCase;

final class SchemaTest extends TestCase
{
public function test_adding_duplicated_definitions() : void
{
$this->expectException(SchemaDefinitionNotUniqueException::class);
schema(
int_schema('id'),
str_schema('str', true),
)->add(int_schema('str'));
}

public function test_adding_new_definitions() : void
{
$schema = schema(
int_schema('id'),
str_schema('str', true),
)->add(int_schema('number'), bool_schema('bool'));

$this->assertEquals(
schema(
int_schema('id'),
str_schema('str', true),
int_schema('number'),
bool_schema('bool'),
),
$schema
);
}

public function test_allowing_only_unique_definitions() : void
{
$this->expectException(InvalidArgumentException::class);
$this->expectException(SchemaDefinitionNotUniqueException::class);

new Schema(
Schema\Definition::integer('id'),
Expand Down Expand Up @@ -71,6 +101,21 @@ public function test_creating_schema_from_invalid_json_format_at_definition_leve
schema_from_json('[{"ref": "id", "type": "test", "metadata": []}]');
}

public function test_graceful_remove_non_existing_definition() : void
{

$this->assertEquals(
schema(
int_schema('id'),
str_schema('name'),
),
schema(
int_schema('id'),
str_schema('name'),
)->gracefulRemove('not-existing')
);
}

public function test_making_whole_schema_nullable() : void
{
$schema = new Schema(
Expand Down Expand Up @@ -159,18 +204,87 @@ public function test_normalizing_and_recreating_schema() : void
);
}

public function test_remove_non_existing_definition() : void
{
$this->expectException(SchemaDefinitionNotFoundException::class);

schema(
int_schema('id'),
str_schema('name'),
)->remove('not-existing');
}

public function test_removing_elements_from_schema() : void
{
$schema = new Schema(
Schema\Definition::integer('id'),
Schema\Definition::string('name'),
$this->assertEquals(
schema(
int_schema('id'),
),
schema(
int_schema('id'),
str_schema('name'),
)->without('name')
);
$this->assertEquals(
schema(
int_schema('id'),
),
schema(
int_schema('id'),
str_schema('name'),
)->remove('name')
);
}

public function test_rename() : void
{
$schema = schema(
int_schema('id'),
str_schema('name'),
);

$this->assertEquals(
new Schema(
Schema\Definition::integer('id'),
schema(
int_schema('id'),
str_schema('new_name'),
),
$schema->rename('name', 'new_name')
);
}

public function test_rename_non_existing() : void
{
$this->expectException(SchemaDefinitionNotFoundException::class);

schema(
int_schema('id'),
str_schema('name'),
)->rename('not-existing', 'new_name');
}

public function test_replace_non_existing_reference() : void
{
$this->expectException(SchemaDefinitionNotFoundException::class);

schema(
int_schema('id'),
str_schema('str', true),
)->replace('not-existing', int_schema('number'));
}

public function test_replace_reference() : void
{
$schema = schema(
int_schema('id'),
str_schema('str', true),
)->replace('str', int_schema('number'));

$this->assertEquals(
schema(
int_schema('id'),
int_schema('number'),
),
$schema->without('name', 'tags')
$schema
);
}

Expand Down

0 comments on commit ce6acd6

Please sign in to comment.