Skip to content

Commit

Permalink
Added transformers to serialize/unserialize entire row into/from entry (
Browse files Browse the repository at this point in the history
  • Loading branch information
norberttech authored Jan 12, 2025
1 parent 79cdd40 commit 6b3bbe1
Show file tree
Hide file tree
Showing 6 changed files with 318 additions and 2 deletions.
27 changes: 27 additions & 0 deletions src/core/etl/src/Flow/ETL/Transformer/SerializeTransformer.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
<?php

declare(strict_types=1);

namespace Flow\ETL\Transformer;

use function Flow\ETL\DSL\{ref, row, str_entry};
use Flow\ETL\Row\Reference;
use Flow\ETL\{FlowContext, Row, Rows, Transformer};

final readonly class SerializeTransformer implements Transformer
{
public function __construct(private Reference|string $target, private bool $standalone = false)
{
}

public function transform(Rows $rows, FlowContext $context) : Rows
{
$target = $this->target instanceof Reference ? $this->target : ref($this->target);

return $rows->map(
fn (Row $row) => $this->standalone
? row(str_entry($target->name(), $context->config->serializer()->serialize($row)))
: $row->add(str_entry($target->name(), $context->config->serializer()->serialize($row)))
);
}
}
49 changes: 49 additions & 0 deletions src/core/etl/src/Flow/ETL/Transformer/UnserializeTransformer.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
<?php

declare(strict_types=1);

namespace Flow\ETL\Transformer;

use function Flow\ETL\DSL\ref;
use Flow\ETL\Row\Reference;
use Flow\ETL\{FlowContext, Row, Rows, Transformer};
use Flow\Serializer\Exception\SerializationException;

final readonly class UnserializeTransformer implements Transformer
{
/**
* @param Reference|string $source
* @param bool $merge
* @param string $mergePrefix - used only when merge is set to true
*/
public function __construct(private Reference|string $source, private bool $merge = true, private string $mergePrefix = '')
{
}

public function transform(Rows $rows, FlowContext $context) : Rows
{
$source = $this->source instanceof Reference ? $this->source : ref($this->source);

return $rows->map(
function (Row $row) use ($source, $context) : Row {
if (!$row->has($source->name())) {
return $row;
}

$serialized = $row->valueOf($source->name());

if (!\is_string($serialized)) {
return $row;
}

try {
return $this->merge
? $row->merge($context->config->serializer()->unserialize($serialized, [Row::class]), $this->mergePrefix)
: $context->config->serializer()->unserialize($serialized, [Row::class]);
} catch (SerializationException) {
return $row;
}
}
);
}
}
11 changes: 9 additions & 2 deletions src/core/etl/src/Flow/Serializer/Base64Serializer.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

namespace Flow\Serializer;

use Flow\Serializer\Exception\SerializationException;

final readonly class Base64Serializer implements Serializer
{
public function __construct(private Serializer $serializer)
Expand All @@ -17,7 +19,12 @@ public function serialize(object $serializable) : string

public function unserialize(string $serialized, array $classes) : object
{
/** @phpstan-ignore-next-line */
return $this->serializer->unserialize(\base64_decode($serialized, true), $classes);
$decodedString = \base64_decode($serialized, true);

if ($decodedString === false) {
throw new SerializationException('Base64Serializer::unserialize failed to decode string');
}

return $this->serializer->unserialize($decodedString, $classes);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
<?php

declare(strict_types=1);

namespace Flow\Serializer\Exception;

use Flow\ETL\Exception\Exception;

final class SerializationException extends Exception
{
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
<?php

declare(strict_types=1);

namespace Flow\ETL\Tests\Unit\Transformer;

use function Flow\ETL\DSL\{bool_entry, flow_context, int_entry, list_entry, row, rows, str_entry, type_list, type_string};
use Flow\ETL\Tests\FlowTestCase;
use Flow\ETL\Transformer\SerializeTransformer;
use Flow\Serializer\{Base64Serializer, NativePHPSerializer};

final class SerializeTransformerTest extends FlowTestCase
{
public function test_serializing_empty_row_under_one_entry() : void
{
$rows = rows(
$row1 = row(),
);

$transformer = new SerializeTransformer('serialized');
$transformedRows = $transformer->transform($rows, flow_context());

self::assertEquals(
[
[
'serialized' => (new Base64Serializer(new NativePHPSerializer()))->serialize($row1),
],
],
$transformedRows->toArray(),
);
}

public function test_serializing_row_under_one_entry() : void
{
$rows = rows(
$row1 = row(
int_entry('id', 1),
str_entry('name', 'John'),
bool_entry('active', true),
list_entry('tags', ['tag1', 'tag2'], type_list(type_string())),
),
$row2 = row(
int_entry('id', 2),
str_entry('name', 'Jane'),
bool_entry('active', false),
list_entry('tags', ['tag3', 'tag4'], type_list(type_string())),
),
);

$transformer = new SerializeTransformer('serialized');

$transformedRows = $transformer->transform($rows, flow_context());

self::assertEquals(
[
[
'id' => 1,
'name' => 'John',
'active' => true,
'tags' => ['tag1', 'tag2'],
'serialized' => (new Base64Serializer(new NativePHPSerializer()))->serialize($row1),
],
[
'id' => 2,
'name' => 'Jane',
'active' => false,
'tags' => ['tag3', 'tag4'],
'serialized' => (new Base64Serializer(new NativePHPSerializer()))->serialize($row2),
],
],
$transformedRows->toArray(),
);
}

public function test_serializing_row_under_standalone_entry() : void
{
$rows = rows(
$row1 = row(
int_entry('id', 1),
str_entry('name', 'John'),
bool_entry('active', true),
list_entry('tags', ['tag1', 'tag2'], type_list(type_string())),
),
$row2 = row(
int_entry('id', 2),
str_entry('name', 'Jane'),
bool_entry('active', false),
list_entry('tags', ['tag3', 'tag4'], type_list(type_string())),
),
);

$transformer = new SerializeTransformer('serialized', true);

$transformedRows = $transformer->transform($rows, flow_context());

self::assertEquals(
[
[
'serialized' => (new Base64Serializer(new NativePHPSerializer()))->serialize($row1),
],
[
'serialized' => (new Base64Serializer(new NativePHPSerializer()))->serialize($row2),
],
],
$transformedRows->toArray(),
);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
<?php

declare(strict_types=1);

namespace Flow\ETL\Tests\Unit\Transformer;

use function Flow\ETL\DSL\{bool_entry, flow_context, int_entry, list_entry, row, rows, str_entry, type_list, type_string};
use Flow\ETL\Tests\FlowTestCase;
use Flow\ETL\Transformer\UnserializeTransformer;
use Flow\Serializer\{Base64Serializer, NativePHPSerializer};

final class UnserializeTransformerTest extends FlowTestCase
{
public function test_unserializing_row_from_entry() : void
{
$row1 = row(
int_entry('id', 1),
str_entry('name', 'John'),
bool_entry('active', true),
list_entry('tags', ['tag1', 'tag2'], type_list(type_string())),
);
$row2 = row(
int_entry('id', 2),
str_entry('name', 'Jane'),
bool_entry('active', false),
list_entry('tags', ['tag3', 'tag4'], type_list(type_string())),
);

$rows = rows(
row(str_entry('serialized', (new Base64Serializer(new NativePHPSerializer()))->serialize($row1))),
row(str_entry('serialized', (new Base64Serializer(new NativePHPSerializer()))->serialize($row2))),
);

$transformer = new UnserializeTransformer('serialized');

$transformedRows = $transformer->transform($rows, flow_context());

self::assertEquals(
[
[
'serialized' => (new Base64Serializer(new NativePHPSerializer()))->serialize($row1),
'id' => 1,
'name' => 'John',
'active' => true,
'tags' => ['tag1', 'tag2'],
],
[
'serialized' => (new Base64Serializer(new NativePHPSerializer()))->serialize($row2),
'id' => 2,
'name' => 'Jane',
'active' => false,
'tags' => ['tag3', 'tag4'],
],
],
$transformedRows->toArray(),
);
}

public function test_unserializing_something_that_is_not_serialized_row() : void
{
$rows = rows(
row(str_entry('serialized', 'not-serialized')),
);

$transformer = new UnserializeTransformer('serialized');

$transformedRows = $transformer->transform($rows, flow_context());

self::assertEquals($rows, $transformedRows);
}

public function test_unserializing_without_merge() : void
{
$row1 = row(
int_entry('id', 1),
str_entry('name', 'John'),
bool_entry('active', true),
list_entry('tags', ['tag1', 'tag2'], type_list(type_string())),
);
$row2 = row(
int_entry('id', 2),
str_entry('name', 'Jane'),
bool_entry('active', false),
list_entry('tags', ['tag3', 'tag4'], type_list(type_string())),
);

$rows = rows(
row(str_entry('serialized', (new Base64Serializer(new NativePHPSerializer()))->serialize($row1))),
row(str_entry('serialized', (new Base64Serializer(new NativePHPSerializer()))->serialize($row2))),
);

$transformer = new UnserializeTransformer('serialized', false);

$transformedRows = $transformer->transform($rows, flow_context());

self::assertEquals(
[
[
'id' => 1,
'name' => 'John',
'active' => true,
'tags' => ['tag1', 'tag2'],
],
[
'id' => 2,
'name' => 'Jane',
'active' => false,
'tags' => ['tag3', 'tag4'],
],
],
$transformedRows->toArray(),
);
}
}

0 comments on commit 6b3bbe1

Please sign in to comment.