Skip to content

Commit

Permalink
Added missing logical types (#926)
Browse files: browse the repository at this point in the history
* Added missing logical types

* CS fixes

* Fixed typo in the code
  • Loading branch information
norberttech authored Jan 19, 2024
1 parent c20a191 commit 51d33a7
Show file tree
Hide file tree
Showing 57 changed files with 970 additions and 334 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,9 @@
use Flow\ETL\FlowContext;
use Flow\ETL\Loader;
use Flow\ETL\Loader\Closure;
use Flow\ETL\PHP\Type\Logical\DateTimeType;
use Flow\ETL\PHP\Type\Logical\ListType;
use Flow\ETL\PHP\Type\Native\ObjectType;
use Flow\ETL\PHP\Type\Logical\UuidType;
use Flow\ETL\Row;
use Flow\ETL\Row\Schema;
use Flow\ETL\Rows;
Expand Down Expand Up @@ -78,28 +79,26 @@ private function listEntryToValues(Row\Entry\ListEntry $entry) : array
$listType = $entry->definition()->metadata()->get(Schema\FlowMetadata::METADATA_LIST_ENTRY_TYPE);
$listElement = $listType->element();

if ($listElement->type() instanceof ObjectType) {
if (\is_a($listElement->type()->class, Row\Entry\Type\Uuid::class, true)) {
/** @var array<string> $data */
$data = [];
if ($listElement->type() instanceof UuidType) {
/** @var array<string> $data */
$data = [];

foreach ($entry->value() as $value) {
$data[] = $value->toString();
}

return $data;
foreach ($entry->value() as $value) {
$data[] = $value->toString();
}

if (\is_a($listElement->type()->class, \DateTimeInterface::class, true)) {
/** @var array<int> $data */
$data = [];
return $data;
}

foreach ($entry->value() as $value) {
$data[] = (int) $value->format('Uu');
}
if ($listElement->type() instanceof DateTimeType) {
/** @var array<int> $data */
$data = [];

return $data;
foreach ($entry->value() as $value) {
$data[] = (int) $value->format('Uu');
}

return $data;
}

return $entry->value();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,12 @@
namespace Flow\ETL\Adapter\Avro\FlixTech;

use Flow\ETL\Exception\RuntimeException;
use Flow\ETL\PHP\Type\Logical\DateTimeType;
use Flow\ETL\PHP\Type\Logical\ListType;
use Flow\ETL\PHP\Type\Logical\MapType;
use Flow\ETL\PHP\Type\Logical\Structure\StructureElement;
use Flow\ETL\PHP\Type\Logical\StructureType;
use Flow\ETL\PHP\Type\Logical\UuidType;
use Flow\ETL\PHP\Type\Native\ArrayType;
use Flow\ETL\PHP\Type\Native\ObjectType;
use Flow\ETL\PHP\Type\Native\ScalarType;
Expand Down Expand Up @@ -61,10 +63,8 @@ private function convert(Definition $definition) : array
};
}

if ($listElement->type() instanceof ObjectType) {
if (\is_a($listElement->type()->class, \DateTimeInterface::class, true)) {
return ['name' => $definition->entry()->name(), 'type' => ['type' => 'array', 'items' => 'long', \AvroSchema::LOGICAL_TYPE_ATTR => 'timestamp-micros']];
}
if ($listElement->type() instanceof DateTimeType) {
return ['name' => $definition->entry()->name(), 'type' => ['type' => 'array', 'items' => 'long', \AvroSchema::LOGICAL_TYPE_ATTR => 'timestamp-micros']];
}

throw new RuntimeException("List of {$listElement->toString()} is not supported yet supported.");
Expand Down Expand Up @@ -118,7 +118,7 @@ private function convert(Definition $definition) : array
}

$avroType = match ($type) {
Entry\StringEntry::class, Entry\JsonEntry::class, Entry\UuidEntry::class => ['name' => $definition->entry()->name(), 'type' => \AvroSchema::STRING_TYPE],
Entry\StringEntry::class, Entry\JsonEntry::class, Entry\UuidEntry::class, Entry\XMLEntry::class, Entry\XMLNodeEntry::class => ['name' => $definition->entry()->name(), 'type' => \AvroSchema::STRING_TYPE],
Entry\EnumEntry::class => [
'name' => $definition->entry()->name(),
'type' => [
Expand Down Expand Up @@ -172,15 +172,15 @@ private function structureElementToArvo(StructureElement $element) : array
throw new RuntimeException("ArrayEntry entry can't be saved in Avro file, try convert it to ListEntry, MapEntry or StructEntry");
}

if ($elementType instanceof ObjectType) {
if (\in_array($elementType->class, [\DateTimeImmutable::class, \DateTimeInterface::class, \DateTime::class], true)) {
return ['name' => $element->name(), 'type' => 'long', \AvroSchema::LOGICAL_TYPE_ATTR => 'timestamp-micros'];
}
if ($elementType instanceof DateTimeType) {
return ['name' => $element->name(), 'type' => 'long', \AvroSchema::LOGICAL_TYPE_ATTR => 'timestamp-micros'];
}

if ($elementType->class === Entry\Type\Uuid::class) {
return ['name' => $element->name(), 'type' => \AvroSchema::STRING_TYPE];
}
if ($elementType instanceof UuidType) {
return ['name' => $element->name(), 'type' => \AvroSchema::STRING_TYPE];
}

if ($elementType instanceof ObjectType) {
throw new RuntimeException($elementType->class . ' is not supported.');
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@
use function Flow\ETL\DSL\struct_element;
use function Flow\ETL\DSL\struct_entry;
use function Flow\ETL\DSL\struct_type;
use function Flow\ETL\DSL\type_datetime;
use function Flow\ETL\DSL\type_float;
use function Flow\ETL\DSL\type_list;
use function Flow\ETL\DSL\type_object;
use function Flow\ETL\DSL\type_string;
use Flow\ETL\Adapter\Avro\FlixTech\AvroExtractor;
use Flow\ETL\Config;
Expand Down Expand Up @@ -77,7 +77,7 @@ public function test_safe_writing_and_reading_avro_with_all_supported_types() :
json_object_entry('json_object', ['id' => 1, 'name' => 'test']),
json_entry('json', [['id' => 1, 'name' => 'test'], ['id' => 2, 'name' => 'test']]),
list_entry('list_of_strings', ['a', 'b', 'c'], type_list(type_string())),
list_entry('list_of_datetimes', [new \DateTimeImmutable(), new \DateTimeImmutable(), new \DateTimeImmutable()], type_list(type_object(\DateTimeImmutable::class)))
list_entry('list_of_datetimes', [new \DateTimeImmutable(), new \DateTimeImmutable(), new \DateTimeImmutable()], type_list(type_datetime()))
);
}, \range(1, 100))
)
Expand Down Expand Up @@ -155,7 +155,7 @@ public function test_writing_and_reading_avro_with_all_supported_types() : void
json_object_entry('json_object', ['id' => 1, 'name' => 'test']),
json_entry('json', [['id' => 1, 'name' => 'test'], ['id' => 2, 'name' => 'test']]),
list_entry('list_of_strings', ['a', 'b', 'c'], type_list(type_string())),
list_entry('list_of_datetimes', [new \DateTimeImmutable(), new \DateTimeImmutable(), new \DateTimeImmutable()], type_list(type_object(\DateTimeImmutable::class))),
list_entry('list_of_datetimes', [new \DateTimeImmutable(), new \DateTimeImmutable(), new \DateTimeImmutable()], type_list(type_datetime())),
struct_entry(
'address',
[
Expand All @@ -165,19 +165,19 @@ public function test_writing_and_reading_avro_with_all_supported_types() : void
'country' => 'country_' . $i,
'location' => ['lat' => 1.5, 'lon' => 1.5],
],
struct_type(
struct_type([
struct_element('street', type_string()),
struct_element('city', type_string()),
struct_element('zip', type_string()),
struct_element('country', type_string()),
struct_element(
'location',
struct_type(
struct_type([
struct_element('lat', type_float()),
struct_element('lon', type_float()),
)
)
),
])
),
]),
),
);
}, \range(1, 100))
Expand Down Expand Up @@ -219,7 +219,7 @@ public function test_writing_twice_to_the_same_location() : void
json_object_entry('json_object', ['id' => 1, 'name' => 'test']),
json_entry('json', [['id' => 1, 'name' => 'test'], ['id' => 2, 'name' => 'test']]),
list_entry('list_of_strings', ['a', 'b', 'c'], type_list(type_string())),
list_entry('list_of_datetimes', [new \DateTimeImmutable(), new \DateTimeImmutable(), new \DateTimeImmutable()], type_list(type_object(\DateTimeImmutable::class)))
list_entry('list_of_datetimes', [new \DateTimeImmutable(), new \DateTimeImmutable(), new \DateTimeImmutable()], type_list(type_datetime()))
);
}, \range(1, 100))
)
Expand All @@ -240,7 +240,7 @@ public function test_writing_twice_to_the_same_location() : void
json_object_entry('json_object', ['id' => 1, 'name' => 'test']),
json_entry('json', [['id' => 1, 'name' => 'test'], ['id' => 2, 'name' => 'test']]),
list_entry('list_of_strings', ['a', 'b', 'c'], type_list(type_string())),
list_entry('list_of_datetimes', [new \DateTimeImmutable(), new \DateTimeImmutable(), new \DateTimeImmutable()], type_list(type_object(\DateTimeImmutable::class)))
list_entry('list_of_datetimes', [new \DateTimeImmutable(), new \DateTimeImmutable(), new \DateTimeImmutable()], type_list(type_datetime()))
);
}, \range(1, 100))
)
Expand Down Expand Up @@ -277,7 +277,7 @@ public function test_writing_twice_to_the_same_location_with_ignore_mode() : voi
json_object_entry('json_object', ['id' => 1, 'name' => 'test']),
json_entry('json', [['id' => 1, 'name' => 'test'], ['id' => 2, 'name' => 'test']]),
list_entry('list_of_strings', ['a', 'b', 'c'], type_list(type_string())),
list_entry('list_of_datetimes', [new \DateTimeImmutable(), new \DateTimeImmutable(), new \DateTimeImmutable()], type_list(type_object(\DateTimeImmutable::class)))
list_entry('list_of_datetimes', [new \DateTimeImmutable(), new \DateTimeImmutable(), new \DateTimeImmutable()], type_list(type_datetime()))
);
}, \range(1, 100))
)
Expand All @@ -298,7 +298,7 @@ public function test_writing_twice_to_the_same_location_with_ignore_mode() : voi
json_object_entry('json_object', ['id' => 1, 'name' => 'test']),
json_entry('json', [['id' => 1, 'name' => 'test'], ['id' => 2, 'name' => 'test']]),
list_entry('list_of_strings', ['a', 'b', 'c'], type_list(type_string())),
list_entry('list_of_datetimes', [new \DateTimeImmutable(), new \DateTimeImmutable(), new \DateTimeImmutable()], type_list(type_object(\DateTimeImmutable::class)))
list_entry('list_of_datetimes', [new \DateTimeImmutable(), new \DateTimeImmutable(), new \DateTimeImmutable()], type_list(type_datetime()))
);
}, \range(1, 100))
)
Expand Down Expand Up @@ -336,7 +336,7 @@ public function test_writing_twice_to_the_same_location_with_overwrite_mode() :
json_object_entry('json_object', ['id' => 1, 'name' => 'test']),
json_entry('json', [['id' => 1, 'name' => 'test'], ['id' => 2, 'name' => 'test']]),
list_entry('list_of_strings', ['a', 'b', 'c'], type_list(type_string())),
list_entry('list_of_datetimes', [new \DateTimeImmutable(), new \DateTimeImmutable(), new \DateTimeImmutable()], type_list(type_object(\DateTimeImmutable::class)))
list_entry('list_of_datetimes', [new \DateTimeImmutable(), new \DateTimeImmutable(), new \DateTimeImmutable()], type_list(type_datetime()))
);
}, \range(1, 100))
)
Expand All @@ -357,7 +357,7 @@ public function test_writing_twice_to_the_same_location_with_overwrite_mode() :
json_object_entry('json_object', ['id' => 1, 'name' => 'test']),
json_entry('json', [['id' => 1, 'name' => 'test'], ['id' => 2, 'name' => 'test']]),
list_entry('list_of_strings', ['a', 'b', 'c'], type_list(type_string())),
list_entry('list_of_datetimes', [new \DateTimeImmutable(), new \DateTimeImmutable(), new \DateTimeImmutable()], type_list(type_object(\DateTimeImmutable::class)))
list_entry('list_of_datetimes', [new \DateTimeImmutable(), new \DateTimeImmutable(), new \DateTimeImmutable()], type_list(type_datetime()))
);
}, \range(1, 100))
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ public function test_convert_etl_entries_to_avro_json() : void
Schema\Definition::dateTime('datetime'),
Schema\Definition::json('json'),
Schema\Definition::list('list', new ListType(ListElement::string())),
Schema\Definition::structure('structure', new StructureType(new StructureElement('a', type_string()))),
Schema\Definition::structure('structure', new StructureType([new StructureElement('a', type_string())])),
Schema\Definition::map('map', new MapType(MapKey::string(), MapValue::integer()))
))
);
Expand Down
Loading

0 comments on commit 51d33a7

Please sign in to comment.