From 3aed55dc652aad96cdacf920cb4575e8805a0cb2 Mon Sep 17 00:00:00 2001 From: Norbert Orzechowicz <1921950+norberttech@users.noreply.github.com> Date: Fri, 9 Feb 2024 12:13:35 +0100 Subject: [PATCH] Bypass schema infering when reading from parquet files (#975) * Bypass schema infering when reading from parquet files * Fixed failing benchmarks --- .../ETL/Adapter/Parquet/ParquetExtractor.php | 7 +- .../ETL/Adapter/Parquet/SchemaConverter.php | 101 +++++++++++++ .../Tests/Benchmark/ParquetLoaderBench.php | 6 - .../Parquet/Tests/Integration/ParquetTest.php | 3 +- ...erTest.php => FlowToParquetSchemaTest.php} | 2 +- .../Tests/Unit/ParquetToFlowSchemaTest.php | 137 ++++++++++++++++++ .../src/Flow/Parquet/BinaryReader/Bytes.php | 7 +- .../Parquet/Data/Converter/JsonConverter.php | 39 +++++ .../Parquet/Data/Converter/UuidConverter.php | 39 +++++ .../src/Flow/Parquet/Data/DataConverter.php | 4 + .../ParquetFile/Data/PlainValueUnpacker.php | 2 +- .../ParquetFile/Data/PlainValuesPacker.php | 2 +- .../Parquet/ParquetFile/Schema/FlatColumn.php | 4 +- .../ParquetFile/Schema/LogicalType.php | 5 + .../Integration/IO/SchemaReadingTest.php | 10 +- .../Unit/Data/Converter/UuidConverterTest.php | 22 +++ 16 files changed, 370 insertions(+), 20 deletions(-) rename src/adapter/etl-adapter-parquet/tests/Flow/ETL/Adapter/Parquet/Tests/Unit/{SchemaConverterTest.php => FlowToParquetSchemaTest.php} (98%) create mode 100644 src/adapter/etl-adapter-parquet/tests/Flow/ETL/Adapter/Parquet/Tests/Unit/ParquetToFlowSchemaTest.php create mode 100644 src/lib/parquet/src/Flow/Parquet/Data/Converter/JsonConverter.php create mode 100644 src/lib/parquet/src/Flow/Parquet/Data/Converter/UuidConverter.php create mode 100644 src/lib/parquet/tests/Flow/Parquet/Tests/Unit/Data/Converter/UuidConverterTest.php diff --git a/src/adapter/etl-adapter-parquet/src/Flow/ETL/Adapter/Parquet/ParquetExtractor.php b/src/adapter/etl-adapter-parquet/src/Flow/ETL/Adapter/Parquet/ParquetExtractor.php index 4fdc2fd58..5b38d9da8 100644 --- a/src/adapter/etl-adapter-parquet/src/Flow/ETL/Adapter/Parquet/ParquetExtractor.php +++ b/src/adapter/etl-adapter-parquet/src/Flow/ETL/Adapter/Parquet/ParquetExtractor.php @@ -25,6 +25,8 @@ final class ParquetExtractor implements Extractor, FileExtractor, LimitableExtra use Limitable; use PartitionFiltering; + private SchemaConverter $schemaConverter; + /** * @param Path $path * @param array $columns @@ -37,6 +39,7 @@ public function __construct( private readonly ?int $offset = null ) { $this->resetLimit(); + $this->schemaConverter = new SchemaConverter(); if ($this->path->isPattern() && $this->offset !== null) { throw new InvalidArgumentException('Offset can be used only with single file path, not with pattern'); @@ -48,12 +51,14 @@ public function extract(FlowContext $context) : \Generator $shouldPutInputIntoRows = $context->config->shouldPutInputIntoRows(); foreach ($this->readers($context) as $fileData) { + $flowSchema = $this->schemaConverter->fromParquet($fileData['file']->schema()); + foreach ($fileData['file']->values($this->columns, $this->limit(), $this->offset) as $row) { if ($shouldPutInputIntoRows) { $row['_input_file_uri'] = $fileData['uri']; } - $signal = yield array_to_rows($row, $context->entryFactory(), $fileData['partitions']); + $signal = yield array_to_rows($row, $context->entryFactory(), $fileData['partitions'], $flowSchema); $this->countRow(); diff --git a/src/adapter/etl-adapter-parquet/src/Flow/ETL/Adapter/Parquet/SchemaConverter.php b/src/adapter/etl-adapter-parquet/src/Flow/ETL/Adapter/Parquet/SchemaConverter.php index dcc156c29..454c51cd1 100644 --- a/src/adapter/etl-adapter-parquet/src/Flow/ETL/Adapter/Parquet/SchemaConverter.php +++ b/src/adapter/etl-adapter-parquet/src/Flow/ETL/Adapter/Parquet/SchemaConverter.php @@ -2,6 +2,14 @@ namespace Flow\ETL\Adapter\Parquet; +use function Flow\ETL\DSL\list_schema; +use function Flow\ETL\DSL\map_schema; +use function Flow\ETL\DSL\struct_schema; +use function Flow\ETL\DSL\struct_type; +use function Flow\ETL\DSL\structure_element; +use function Flow\ETL\DSL\type_list; +use function Flow\ETL\DSL\type_map; +use function Flow\ETL\DSL\type_object; use Flow\ETL\Exception\RuntimeException; use Flow\ETL\PHP\Type\Logical\DateTimeType; use Flow\ETL\PHP\Type\Logical\JsonType; @@ -9,6 +17,7 @@ use Flow\ETL\PHP\Type\Logical\Map\MapKey; use Flow\ETL\PHP\Type\Logical\Map\MapValue; use Flow\ETL\PHP\Type\Logical\MapType; +use Flow\ETL\PHP\Type\Logical\Structure\StructureElement; use Flow\ETL\PHP\Type\Logical\StructureType; use Flow\ETL\PHP\Type\Logical\UuidType; use Flow\ETL\PHP\Type\Logical\XMLNodeType; @@ -26,6 +35,17 @@ final class SchemaConverter { + public function fromParquet(ParquetSchema $schema) : Schema + { + $definitions = []; + + foreach ($schema->columns() as $column) { + $definitions[] = $this->fromParquetColumnToFlowDefinition($column); + } + + return \Flow\ETL\DSL\schema(...$definitions); + } + public function toParquet(Schema $schema) : ParquetSchema { $columns = []; @@ -242,4 +262,85 @@ private function flowTypeToParquetType(string $name, Type $type) : Column throw new RuntimeException($type::class . ' is not supported.'); } + + private function fromParquetColumnToFlowDefinition(Column $column) : Schema\Definition + { + if ($column instanceof FlatColumn) { + return $this->parquetFlatToFlowType($column); + } + + /** @var NestedColumn $column */ + return $this->parquetNestedToFlowType($column); + } + + private function parquetFlatToFlowType(FlatColumn $column) : Schema\Definition + { + $logicalType = $column->logicalType(); + + if ($logicalType === null) { + return match ($column->type()) { + ParquetSchema\PhysicalType::INT32 => Schema\Definition::integer($column->name(), $column->repetition() === ParquetSchema\Repetition::OPTIONAL), + ParquetSchema\PhysicalType::INT64 => Schema\Definition::integer($column->name(), $column->repetition() === ParquetSchema\Repetition::OPTIONAL), + ParquetSchema\PhysicalType::BOOLEAN => Schema\Definition::boolean($column->name(), $column->repetition() === ParquetSchema\Repetition::OPTIONAL), + ParquetSchema\PhysicalType::DOUBLE => Schema\Definition::float($column->name(), $column->repetition() === ParquetSchema\Repetition::OPTIONAL), + ParquetSchema\PhysicalType::FLOAT => Schema\Definition::float($column->name(), $column->repetition() === ParquetSchema\Repetition::OPTIONAL), + ParquetSchema\PhysicalType::BYTE_ARRAY => Schema\Definition::string($column->name(), $column->repetition() === ParquetSchema\Repetition::OPTIONAL), + default => throw new RuntimeException($column->type()->name . ' is not supported.') + }; + } + + return match ($logicalType->name()) { + ParquetSchema\LogicalType::STRING => Schema\Definition::string($column->name(), $column->repetition() === ParquetSchema\Repetition::OPTIONAL), + ParquetSchema\LogicalType::DATE => Schema\Definition::dateTime($column->name(), $column->repetition() === ParquetSchema\Repetition::OPTIONAL), + ParquetSchema\LogicalType::TIME => Schema\Definition::object($column->name(), type_object(\DateInterval::class, $column->repetition() === ParquetSchema\Repetition::OPTIONAL)), + ParquetSchema\LogicalType::TIMESTAMP => Schema\Definition::dateTime($column->name(), $column->repetition() === ParquetSchema\Repetition::OPTIONAL), + ParquetSchema\LogicalType::UUID => Schema\Definition::uuid($column->name(), $column->repetition() === ParquetSchema\Repetition::OPTIONAL), + ParquetSchema\LogicalType::JSON => Schema\Definition::json($column->name(), $column->repetition() === ParquetSchema\Repetition::OPTIONAL), + ParquetSchema\LogicalType::DECIMAL => Schema\Definition::float($column->name(), $column->repetition() === ParquetSchema\Repetition::OPTIONAL), + ParquetSchema\LogicalType::INTEGER => Schema\Definition::integer($column->name(), $column->repetition() === ParquetSchema\Repetition::OPTIONAL), + default => throw new RuntimeException($logicalType->name() . ' is not supported.') + }; + } + + private function parquetNestedToFlowType(NestedColumn $column) : Schema\Definition + { + if ($column->isList()) { + return list_schema( + $column->name(), + type_list( + $this->fromParquetColumnToFlowDefinition($column->getListElement())->type(), + $column->repetition() === ParquetSchema\Repetition::OPTIONAL + ) + ); + } + + if ($column->isMap()) { + $keyType = $this->fromParquetColumnToFlowDefinition($column->getMapKeyColumn())->type(); + + if (!$keyType instanceof ScalarType) { + throw new RuntimeException('Flow expects map key type to be scalar type.'); + } + + return map_schema( + $column->name(), + type_map( + $keyType, + $this->fromParquetColumnToFlowDefinition($column->getMapValueColumn())->type(), + $column->repetition() === ParquetSchema\Repetition::OPTIONAL + ) + ); + } + + /** @var array $elements */ + $elements = []; + + foreach ($column->children() as $structColumn) { + $elements[] = structure_element( + $structColumn->name(), + $this->fromParquetColumnToFlowDefinition($structColumn)->type() + ); + } + + return struct_schema($column->name(), struct_type($elements, $column->repetition() === ParquetSchema\Repetition::OPTIONAL)); + } } diff --git a/src/adapter/etl-adapter-parquet/tests/Flow/ETL/Adapter/Parquet/Tests/Benchmark/ParquetLoaderBench.php b/src/adapter/etl-adapter-parquet/tests/Flow/ETL/Adapter/Parquet/Tests/Benchmark/ParquetLoaderBench.php index b863c8b28..f0ddb0b16 100644 --- a/src/adapter/etl-adapter-parquet/tests/Flow/ETL/Adapter/Parquet/Tests/Benchmark/ParquetLoaderBench.php +++ b/src/adapter/etl-adapter-parquet/tests/Flow/ETL/Adapter/Parquet/Tests/Benchmark/ParquetLoaderBench.php @@ -4,10 +4,8 @@ use function Flow\ETL\Adapter\Parquet\from_parquet; use function Flow\ETL\Adapter\Parquet\to_parquet; -use function Flow\ETL\DSL\str_entry; use Flow\ETL\Config; use Flow\ETL\FlowContext; -use Flow\ETL\Row; use Flow\ETL\Rows; use PhpBench\Attributes\Groups; @@ -27,10 +25,6 @@ public function __construct() $this->rows = new Rows(); foreach (from_parquet(__DIR__ . '/../Fixtures/orders_flow.parquet')->extract($this->context) as $rows) { - $rows = $rows->map(static function (Row $row) : Row { - return $row->set(str_entry('order_id', $row->valueOf('order_id')->toString())); - }); - $this->rows = $this->rows->merge($rows); } } diff --git a/src/adapter/etl-adapter-parquet/tests/Flow/ETL/Adapter/Parquet/Tests/Integration/ParquetTest.php b/src/adapter/etl-adapter-parquet/tests/Flow/ETL/Adapter/Parquet/Tests/Integration/ParquetTest.php index 59b2a1bec..6e4d9cf1c 100644 --- a/src/adapter/etl-adapter-parquet/tests/Flow/ETL/Adapter/Parquet/Tests/Integration/ParquetTest.php +++ b/src/adapter/etl-adapter-parquet/tests/Flow/ETL/Adapter/Parquet/Tests/Integration/ParquetTest.php @@ -84,12 +84,13 @@ public function test_writing_with_partitioning() : void ->run(); $this->assertEquals( - $rows, + $rows->toArray(), (new Flow()) ->read(from_parquet($path . '/**/*.parquet')) ->drop('date') ->sortBy(ref('datetime')->asc()) ->fetch() + ->toArray() ); $this->assertSame( diff --git a/src/adapter/etl-adapter-parquet/tests/Flow/ETL/Adapter/Parquet/Tests/Unit/SchemaConverterTest.php b/src/adapter/etl-adapter-parquet/tests/Flow/ETL/Adapter/Parquet/Tests/Unit/FlowToParquetSchemaTest.php similarity index 98% rename from src/adapter/etl-adapter-parquet/tests/Flow/ETL/Adapter/Parquet/Tests/Unit/SchemaConverterTest.php rename to src/adapter/etl-adapter-parquet/tests/Flow/ETL/Adapter/Parquet/Tests/Unit/FlowToParquetSchemaTest.php index e2a16ab8d..c02c4fbb9 100644 --- a/src/adapter/etl-adapter-parquet/tests/Flow/ETL/Adapter/Parquet/Tests/Unit/SchemaConverterTest.php +++ b/src/adapter/etl-adapter-parquet/tests/Flow/ETL/Adapter/Parquet/Tests/Unit/FlowToParquetSchemaTest.php @@ -23,7 +23,7 @@ use Flow\Parquet\ParquetFile\Schema\NestedColumn; use PHPUnit\Framework\TestCase; -final class SchemaConverterTest extends TestCase +final class FlowToParquetSchemaTest extends TestCase { public function test_convert_array_entry_to_parquet_array() : void { diff --git a/src/adapter/etl-adapter-parquet/tests/Flow/ETL/Adapter/Parquet/Tests/Unit/ParquetToFlowSchemaTest.php b/src/adapter/etl-adapter-parquet/tests/Flow/ETL/Adapter/Parquet/Tests/Unit/ParquetToFlowSchemaTest.php new file mode 100644 index 000000000..cc988722c --- /dev/null +++ b/src/adapter/etl-adapter-parquet/tests/Flow/ETL/Adapter/Parquet/Tests/Unit/ParquetToFlowSchemaTest.php @@ -0,0 +1,137 @@ +fromParquet(Schema::with( + Schema\FlatColumn::int32('int32'), + Schema\FlatColumn::int64('int64'), + Schema\FlatColumn::string('string'), + Schema\FlatColumn::float('float'), + Schema\FlatColumn::double('double'), + Schema\FlatColumn::decimal('decimal'), + Schema\FlatColumn::boolean('boolean'), + Schema\FlatColumn::date('date'), + Schema\FlatColumn::time('time'), + Schema\FlatColumn::dateTime('datetime'), + Schema\FlatColumn::uuid('uuid'), + Schema\FlatColumn::json('json'), + )); + + $this->assertEquals( + \Flow\ETL\DSL\schema( + int_schema('int32', true), + int_schema('int64', true), + str_schema('string', true), + float_schema('float', true), + float_schema('double', true), + float_schema('decimal', true), + bool_schema('boolean', true), + datetime_schema('date', true), + object_schema('time', type_object(\DateInterval::class, true)), + datetime_schema('datetime', true), + uuid_schema('uuid', true), + json_schema('json', true), + ), + $flowSchema + ); + } + + public function test_converting_list_to_flow_schema() : void + { + $converted = new SchemaConverter(); + + $flowSchema = $converted->fromParquet(Schema::with( + Schema\NestedColumn::list('list', Schema\ListElement::string()), + )); + + $this->assertEquals( + \Flow\ETL\DSL\schema( + list_schema('list', type_list(type_string(true), true)) + ), + $flowSchema, + ); + } + + public function test_converting_map_to_flow_schema() : void + { + $converted = new SchemaConverter(); + + $flowSchema = $converted->fromParquet(Schema::with( + Schema\NestedColumn::map('map', MapKey::string(), MapValue::int64()), + )); + + $this->assertEquals( + \Flow\ETL\DSL\schema( + map_schema('map', type_map(type_string(), type_int(true), true)) + ), + $flowSchema, + ); + } + + public function test_converting_struct_to_flow_schema() : void + { + $converted = new SchemaConverter(); + + $flowSchema = $converted->fromParquet(Schema::with( + Schema\NestedColumn::struct( + 'struct', + [ + Schema\FlatColumn::uuid('uuid'), + Schema\FlatColumn::string('name'), + Schema\FlatColumn::boolean('active'), + ] + ), + )); + + $this->assertEquals( + \Flow\ETL\DSL\schema( + struct_schema( + 'struct', + type_structure( + [ + struct_element('uuid', type_uuid(true)), + struct_element('name', type_string(true)), + struct_element('active', type_boolean(true)), + ], + true + ), + ) + ), + $flowSchema, + ); + } +} diff --git a/src/lib/parquet/src/Flow/Parquet/BinaryReader/Bytes.php b/src/lib/parquet/src/Flow/Parquet/BinaryReader/Bytes.php index a7186cec9..b39fb70f8 100644 --- a/src/lib/parquet/src/Flow/Parquet/BinaryReader/Bytes.php +++ b/src/lib/parquet/src/Flow/Parquet/BinaryReader/Bytes.php @@ -9,7 +9,7 @@ final class Bytes implements \ArrayAccess, \Countable, \IteratorAggregate { - private readonly \ArrayIterator $iterator; + private \ArrayIterator|null $iterator = null; private readonly DataSize $size; @@ -18,7 +18,6 @@ public function __construct( private readonly ByteOrder $byteOrder = ByteOrder::LITTLE_ENDIAN ) { $this->size = new DataSize(\count($this->bytes) * 8); - $this->iterator = new \ArrayIterator($this->bytes); } public static function fromString(string $string, ByteOrder $byteOrder = ByteOrder::LITTLE_ENDIAN) : self @@ -36,6 +35,10 @@ public function count() : int // IteratorAggregate methods public function getIterator() : \ArrayIterator { + if ($this->iterator === null) { + $this->iterator = new \ArrayIterator($this->bytes); + } + return $this->iterator; } diff --git a/src/lib/parquet/src/Flow/Parquet/Data/Converter/JsonConverter.php b/src/lib/parquet/src/Flow/Parquet/Data/Converter/JsonConverter.php new file mode 100644 index 000000000..37e6cbd5a --- /dev/null +++ b/src/lib/parquet/src/Flow/Parquet/Data/Converter/JsonConverter.php @@ -0,0 +1,39 @@ +logicalType()?->name() === LogicalType::JSON) { + return true; + } + + return false; + } + + public function toParquetType(mixed $data) : string + { + if (!\is_string($data)) { + throw new RuntimeException('Json must be written as a string from Parquet file'); + } + + return $data; + } +} diff --git a/src/lib/parquet/src/Flow/Parquet/Data/Converter/UuidConverter.php b/src/lib/parquet/src/Flow/Parquet/Data/Converter/UuidConverter.php new file mode 100644 index 000000000..2784c3946 --- /dev/null +++ b/src/lib/parquet/src/Flow/Parquet/Data/Converter/UuidConverter.php @@ -0,0 +1,39 @@ +logicalType()?->name() === LogicalType::UUID) { + return true; + } + + return false; + } + + public function toParquetType(mixed $data) : string + { + if (!\is_string($data)) { + throw new RuntimeException('UUID must be written as a string from Parquet file'); + } + + return $data; + } +} diff --git a/src/lib/parquet/src/Flow/Parquet/Data/DataConverter.php b/src/lib/parquet/src/Flow/Parquet/Data/DataConverter.php index 4cf893a43..333c413eb 100644 --- a/src/lib/parquet/src/Flow/Parquet/Data/DataConverter.php +++ b/src/lib/parquet/src/Flow/Parquet/Data/DataConverter.php @@ -7,7 +7,9 @@ use Flow\Parquet\Data\Converter\Int32DateTimeConverter; use Flow\Parquet\Data\Converter\Int64DateTimeConverter; use Flow\Parquet\Data\Converter\Int96DateTimeConverter; +use Flow\Parquet\Data\Converter\JsonConverter; use Flow\Parquet\Data\Converter\TimeConverter; +use Flow\Parquet\Data\Converter\UuidConverter; use Flow\Parquet\Exception\DataConversionException; use Flow\Parquet\Options; use Flow\Parquet\ParquetFile\Schema\FlatColumn; @@ -37,6 +39,8 @@ public static function initialize(Options $options) : self new Int64DateTimeConverter(), new Int96DateTimeConverter(), new BytesStringConverter(), + new UuidConverter(), + new JsonConverter(), ], $options ); diff --git a/src/lib/parquet/src/Flow/Parquet/ParquetFile/Data/PlainValueUnpacker.php b/src/lib/parquet/src/Flow/Parquet/ParquetFile/Data/PlainValueUnpacker.php index 122565fe3..6bfdadaa2 100644 --- a/src/lib/parquet/src/Flow/Parquet/ParquetFile/Data/PlainValueUnpacker.php +++ b/src/lib/parquet/src/Flow/Parquet/ParquetFile/Data/PlainValueUnpacker.php @@ -29,7 +29,7 @@ public function unpack(FlatColumn $column, int $total) : array PhysicalType::FLOAT => $this->reader->readFloats($total), PhysicalType::DOUBLE => $this->reader->readDoubles($total), PhysicalType::BYTE_ARRAY => match ($column->logicalType()?->name()) { - LogicalType::STRING => $this->reader->readStrings($total), + LogicalType::STRING, LogicalType::JSON, LogicalType::UUID => $this->reader->readStrings($total), default => $this->reader->readByteArrays($total) }, PhysicalType::FIXED_LEN_BYTE_ARRAY => match ($column->logicalType()?->name()) { diff --git a/src/lib/parquet/src/Flow/Parquet/ParquetFile/Data/PlainValuesPacker.php b/src/lib/parquet/src/Flow/Parquet/ParquetFile/Data/PlainValuesPacker.php index e5cacb73b..abc7b1686 100644 --- a/src/lib/parquet/src/Flow/Parquet/ParquetFile/Data/PlainValuesPacker.php +++ b/src/lib/parquet/src/Flow/Parquet/ParquetFile/Data/PlainValuesPacker.php @@ -87,8 +87,8 @@ public function packValues(FlatColumn $column, array $values) : void break; case PhysicalType::BYTE_ARRAY: switch ($column->logicalType()?->name()) { - case LogicalType::JSON: case LogicalType::UUID: + case LogicalType::JSON: case LogicalType::STRING: $this->writer->writeStrings($parquetValues); diff --git a/src/lib/parquet/src/Flow/Parquet/ParquetFile/Schema/FlatColumn.php b/src/lib/parquet/src/Flow/Parquet/ParquetFile/Schema/FlatColumn.php index baa65784e..f84587b2a 100644 --- a/src/lib/parquet/src/Flow/Parquet/ParquetFile/Schema/FlatColumn.php +++ b/src/lib/parquet/src/Flow/Parquet/ParquetFile/Schema/FlatColumn.php @@ -118,7 +118,7 @@ public static function int64(string $name, Repetition $repetition = Repetition:: public static function json(string $string, Repetition $repetition = Repetition::OPTIONAL) : self { - return new self($string, PhysicalType::BYTE_ARRAY, ConvertedType::JSON, LogicalType::string(), $repetition); + return new self($string, PhysicalType::BYTE_ARRAY, ConvertedType::JSON, LogicalType::json(), $repetition); } public static function string(string $name, Repetition $repetition = Repetition::OPTIONAL) : self @@ -137,7 +137,7 @@ public static function time(string $name, Repetition $repetition = Repetition::O public static function uuid(string $string, Repetition $repetition = Repetition::OPTIONAL) : self { - return new self($string, PhysicalType::BYTE_ARRAY, null, LogicalType::string(), $repetition); + return new self($string, PhysicalType::BYTE_ARRAY, null, LogicalType::uuid(), $repetition); } /** diff --git a/src/lib/parquet/src/Flow/Parquet/ParquetFile/Schema/LogicalType.php b/src/lib/parquet/src/Flow/Parquet/ParquetFile/Schema/LogicalType.php index 1af887f55..ab0690ea7 100644 --- a/src/lib/parquet/src/Flow/Parquet/ParquetFile/Schema/LogicalType.php +++ b/src/lib/parquet/src/Flow/Parquet/ParquetFile/Schema/LogicalType.php @@ -176,6 +176,11 @@ public static function unknown() : self return new self(self::UNKNOWN); } + public static function uuid() : self + { + return new self(self::UUID); + } + public function decimalData() : ?Decimal { return $this->decimal; diff --git a/src/lib/parquet/tests/Flow/Parquet/Tests/Integration/IO/SchemaReadingTest.php b/src/lib/parquet/tests/Flow/Parquet/Tests/Integration/IO/SchemaReadingTest.php index 025ea5bac..38dfdd268 100644 --- a/src/lib/parquet/tests/Flow/Parquet/Tests/Integration/IO/SchemaReadingTest.php +++ b/src/lib/parquet/tests/Flow/Parquet/Tests/Integration/IO/SchemaReadingTest.php @@ -93,16 +93,16 @@ public function test_reading_primitives_schema_ddl() : void FlatColumn::boolean('bool_nullable'), FlatColumn::string('string'), FlatColumn::string('string_nullable'), - FlatColumn::json('json'), - FlatColumn::json('json_nullable'), + FlatColumn::string('json'), // files generated by python are storing json as strings + FlatColumn::string('json_nullable'), FlatColumn::date('date'), FlatColumn::date('date_nullable'), FlatColumn::dateTime('timestamp'), FlatColumn::dateTime('timestamp_nullable'), FlatColumn::time('time'), FlatColumn::time('time_nullable'), - FlatColumn::uuid('uuid'), - FlatColumn::uuid('uuid_nullable'), + FlatColumn::string('uuid'), // files generated by python are storing uuid as strings + FlatColumn::string('uuid_nullable'), FlatColumn::enum('enum'), FlatColumn::enum('enum_nullable'), FlatColumn::float('float'), @@ -218,7 +218,7 @@ public function test_reading_structs_schema_ddl() : void FlatColumn::float('float'), NestedColumn::struct('struct_4', [ FlatColumn::string('string'), - FlatColumn::json('json'), + FlatColumn::string('json'), // files generated by python are storing json as strings ]), ]), ]), diff --git a/src/lib/parquet/tests/Flow/Parquet/Tests/Unit/Data/Converter/UuidConverterTest.php b/src/lib/parquet/tests/Flow/Parquet/Tests/Unit/Data/Converter/UuidConverterTest.php new file mode 100644 index 000000000..057b86104 --- /dev/null +++ b/src/lib/parquet/tests/Flow/Parquet/Tests/Unit/Data/Converter/UuidConverterTest.php @@ -0,0 +1,22 @@ +toString(); + + $converter = new UuidConverter(); + + $this->assertEquals( + $uuid, + $converter->fromParquetType($converter->toParquetType($uuid)) + ); + } +}