From 2253b9162adc5b0d063a8c32b6953e88d9d7c407 Mon Sep 17 00:00:00 2001 From: Norbert Orzechowicz <1921950+norberttech@users.noreply.github.com> Date: Fri, 19 Jan 2024 12:08:17 +0100 Subject: [PATCH] Added DataFrame::schema method to just read schema from dataset (#925) --- src/core/etl/src/Flow/ETL/DSL/functions.php | 8 ++ src/core/etl/src/Flow/ETL/DataFrame.php | 14 +++ src/core/etl/src/Flow/ETL/Row/Schema.php | 11 ++ .../src/Flow/ETL/Row/Schema/Definition.php | 9 ++ .../Integration/DataFrame/SchemaTest.php | 103 ++++++++++++++++++ .../Tests/Unit/Row/Schema/DefinitionTest.php | 33 ++++++ .../Tests/Unit/ParquetFile/SchemaTest.php | 22 ++++ 7 files changed, 200 insertions(+) create mode 100644 src/core/etl/tests/Flow/ETL/Tests/Integration/DataFrame/SchemaTest.php diff --git a/src/core/etl/src/Flow/ETL/DSL/functions.php b/src/core/etl/src/Flow/ETL/DSL/functions.php index 5ad47e6f3..884028a15 100644 --- a/src/core/etl/src/Flow/ETL/DSL/functions.php +++ b/src/core/etl/src/Flow/ETL/DSL/functions.php @@ -984,6 +984,14 @@ function list_schema(string $name, ListType $type, bool $nullable = false, ?Sche return Definition::list($name, $type, $nullable, $constraint, $metadata); } +/** + * @param array> $entry_classes + */ +function union_schema(string $name, array $entry_classes, ?Schema\Constraint $constraint = null, ?Schema\Metadata $metadata = null) : Definition +{ + return Definition::union($name, $entry_classes, $constraint, $metadata); +} + /** * @param class-string<\UnitEnum> $type */ diff --git a/src/core/etl/src/Flow/ETL/DataFrame.php b/src/core/etl/src/Flow/ETL/DataFrame.php index b23bc9d00..7f2db4484 100644 --- a/src/core/etl/src/Flow/ETL/DataFrame.php +++ b/src/core/etl/src/Flow/ETL/DataFrame.php @@ -769,6 +769,20 @@ public function saveMode(SaveMode $mode) : self return $this->mode($mode); } + /** + * @trigger + */ + public function schema() : Schema + { + $schema = new Schema(); + + foreach ($this->pipeline->process($this->context) as $rows) { + $schema = $schema->merge($rows->schema()); + } + + return $schema; + } + /** * @lazy * Keep only given entries. diff --git a/src/core/etl/src/Flow/ETL/Row/Schema.php b/src/core/etl/src/Flow/ETL/Row/Schema.php index d19afd0c3..dbdadd9d9 100644 --- a/src/core/etl/src/Flow/ETL/Row/Schema.php +++ b/src/core/etl/src/Flow/ETL/Row/Schema.php @@ -104,6 +104,17 @@ public function merge(self $schema) : self return new self(...\array_values($newDefinitions)); } + public function narrow() : self + { + $definitions = []; + + foreach ($this->definitions as $definition) { + $definitions[] = $definition->narrow(); + } + + return new self(...$definitions); + } + public function nullable() : self { $definitions = []; diff --git a/src/core/etl/src/Flow/ETL/Row/Schema/Definition.php b/src/core/etl/src/Flow/ETL/Row/Schema/Definition.php index 648a0f2b0..53e635e56 100644 --- a/src/core/etl/src/Flow/ETL/Row/Schema/Definition.php +++ b/src/core/etl/src/Flow/ETL/Row/Schema/Definition.php @@ -283,6 +283,15 @@ public function metadata() : Metadata return $this->metadata; } + public function narrow() : self + { + if (!$this->isUnion()) { + return $this; + } + + return self::string($this->ref, $this->isNullable(), $this->constraint, $this->metadata); + } + public function nullable() : self { if (!\in_array(NullEntry::class, $this->classes, true)) { diff --git a/src/core/etl/tests/Flow/ETL/Tests/Integration/DataFrame/SchemaTest.php b/src/core/etl/tests/Flow/ETL/Tests/Integration/DataFrame/SchemaTest.php new file mode 100644 index 000000000..244d9e3bb --- /dev/null +++ b/src/core/etl/tests/Flow/ETL/Tests/Integration/DataFrame/SchemaTest.php @@ -0,0 +1,103 @@ + $i, + 'name' => 'name_' . $i, + 'active' => $i % 2 === 0, + ]; + }, + \range(1, 100) + )); + + $this->assertEquals( + schema( + int_schema('id'), + str_schema('name'), + bool_schema('active') + ), + df() + ->read(from_rows($rows)) + ->autoCast() + ->schema() + ); + } + + public function test_getting_schema_from_limited_rows() : void + { + $rows = array_to_rows(\array_map( + function ($i) { + return [ + 'id' => $i, + 'name' => 'name_' . $i, + 'active' => $i % 2 === 0, + 'union' => $i > 50 ? 'string' : 1, + ]; + }, + \range(1, 100) + )); + + $this->assertEquals( + schema( + int_schema('id'), + str_schema('name'), + bool_schema('active'), + int_schema('union') + ), + df() + ->read(from_rows($rows)) + ->autoCast() + ->limit(50) + ->schema() + ); + } + + public function test_getting_schema_with_union_type() : void + { + $rows = array_to_rows(\array_map( + function ($i) { + return [ + 'id' => $i, + 'name' => 'name_' . $i, + 'active' => $i % 2 === 0, + 'union' => $i > 50 ? 'string' : 1, + ]; + }, + \range(1, 100) + )); + + $this->assertEquals( + schema( + int_schema('id'), + str_schema('name'), + bool_schema('active'), + union_schema('union', [IntegerEntry::class, StringEntry::class]) + ), + df() + ->read(from_rows($rows)) + ->autoCast() + ->schema() + ); + } +} diff --git a/src/core/etl/tests/Flow/ETL/Tests/Unit/Row/Schema/DefinitionTest.php b/src/core/etl/tests/Flow/ETL/Tests/Unit/Row/Schema/DefinitionTest.php index 32a828f89..731dfa95b 100644 --- a/src/core/etl/tests/Flow/ETL/Tests/Unit/Row/Schema/DefinitionTest.php +++ b/src/core/etl/tests/Flow/ETL/Tests/Unit/Row/Schema/DefinitionTest.php @@ -6,8 +6,10 @@ use function Flow\ETL\DSL\bool_entry; use function Flow\ETL\DSL\int_entry; +use function Flow\ETL\DSL\int_schema; use function Flow\ETL\DSL\null_entry; use function Flow\ETL\DSL\str_entry; +use function Flow\ETL\DSL\str_schema; use function Flow\ETL\DSL\struct_element; use function Flow\ETL\DSL\struct_entry; use function Flow\ETL\DSL\struct_type; @@ -17,6 +19,7 @@ use Flow\ETL\PHP\Type\Logical\List\ListElement; use Flow\ETL\PHP\Type\Logical\ListType; use Flow\ETL\PHP\Type\Logical\StructureType; +use Flow\ETL\Row\Entry\DateTimeEntry; use Flow\ETL\Row\Entry\IntegerEntry; use Flow\ETL\Row\Entry\ListEntry; use Flow\ETL\Row\Entry\NullEntry; @@ -175,6 +178,36 @@ public function test_multi_types_is_not_union() : void $this->assertTrue(Definition::union('id', [IntegerEntry::class, StringEntry::class, NullEntry::class])->isUnion()); } + public function test_narrow_non_union_type() : void + { + $def = int_schema('int'); + + $this->assertSame( + $def, + $def->narrow() + ); + } + + public function test_narrow_nullable_union_type() : void + { + $def = Definition::union('test', [IntegerEntry::class, StringEntry::class, DateTimeEntry::class, NullEntry::class]); + + $this->assertEquals( + str_schema('test', true), + $def->narrow() + ); + } + + public function test_narrow_union_type() : void + { + $def = Definition::union('test', [IntegerEntry::class, StringEntry::class, DateTimeEntry::class]); + + $this->assertEquals( + str_schema('test'), + $def->narrow() + ); + } + public function test_not_matches_when_constraint_not_satisfied() : void { $constraint = $this->createMock(Constraint::class); diff --git a/src/lib/parquet/tests/Flow/Parquet/Tests/Unit/ParquetFile/SchemaTest.php b/src/lib/parquet/tests/Flow/Parquet/Tests/Unit/ParquetFile/SchemaTest.php index 7ff5df0f8..dee95a046 100644 --- a/src/lib/parquet/tests/Flow/Parquet/Tests/Unit/ParquetFile/SchemaTest.php +++ b/src/lib/parquet/tests/Flow/Parquet/Tests/Unit/ParquetFile/SchemaTest.php @@ -2,7 +2,13 @@ namespace Flow\Parquet\Tests\Unit\ParquetFile; +use function Flow\ETL\DSL\int_schema; +use function Flow\ETL\DSL\str_schema; +use function Flow\ETL\DSL\union_schema; use Flow\ETL\Adapter\Elasticsearch\Tests\Integration\TestCase; +use Flow\ETL\Row\Entry\IntegerEntry; +use Flow\ETL\Row\Entry\NullEntry; +use Flow\ETL\Row\Entry\StringEntry; use Flow\Parquet\ParquetFile\Schema; use Flow\Parquet\ParquetFile\Schema\FlatColumn; use Flow\Parquet\ParquetFile\Schema\ListElement; @@ -105,4 +111,20 @@ public function test_flattening_schema_to_receive_simple_array_of_flat_columns() $this->assertInstanceOf(FlatColumn::class, $column); } } + + public function test_narrowing_schema_with_union_types() : void + { + $schema = \Flow\ETL\DSL\schema( + int_schema('id'), + union_schema('tracking_number', [StringEntry::class, IntegerEntry::class, NullEntry::class]), + )->narrow(); + + $this->assertEquals( + \Flow\ETL\DSL\schema( + int_schema('id'), + str_schema('tracking_number', true), + ), + $schema + ); + } }