Skip to content

Commit

Permalink
Added DataFrame::schema method to just read schema from dataset (#925)
Browse files Browse the repository at this point in the history
  • Loading branch information
norberttech authored Jan 19, 2024
1 parent e5d4aeb commit 2253b91
Show file tree
Hide file tree
Showing 7 changed files with 200 additions and 0 deletions.
8 changes: 8 additions & 0 deletions src/core/etl/src/Flow/ETL/DSL/functions.php
Original file line number Diff line number Diff line change
Expand Up @@ -984,6 +984,14 @@ function list_schema(string $name, ListType $type, bool $nullable = false, ?Sche
return Definition::list($name, $type, $nullable, $constraint, $metadata);
}

/**
* @param array<class-string<Row\Entry>> $entry_classes
*/
function union_schema(string $name, array $entry_classes, ?Schema\Constraint $constraint = null, ?Schema\Metadata $metadata = null) : Definition
{
return Definition::union($name, $entry_classes, $constraint, $metadata);
}

/**
* @param class-string<\UnitEnum> $type
*/
Expand Down
14 changes: 14 additions & 0 deletions src/core/etl/src/Flow/ETL/DataFrame.php
Original file line number Diff line number Diff line change
Expand Up @@ -769,6 +769,20 @@ public function saveMode(SaveMode $mode) : self
return $this->mode($mode);
}

/**
* @trigger
*/
public function schema() : Schema
{
$schema = new Schema();

foreach ($this->pipeline->process($this->context) as $rows) {
$schema = $schema->merge($rows->schema());
}

return $schema;
}

/**
* @lazy
* Keep only given entries.
Expand Down
11 changes: 11 additions & 0 deletions src/core/etl/src/Flow/ETL/Row/Schema.php
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,17 @@ public function merge(self $schema) : self
return new self(...\array_values($newDefinitions));
}

public function narrow() : self
{
$definitions = [];

foreach ($this->definitions as $definition) {
$definitions[] = $definition->narrow();
}

return new self(...$definitions);
}

public function nullable() : self
{
$definitions = [];
Expand Down
9 changes: 9 additions & 0 deletions src/core/etl/src/Flow/ETL/Row/Schema/Definition.php
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,15 @@ public function metadata() : Metadata
return $this->metadata;
}

public function narrow() : self
{
if (!$this->isUnion()) {
return $this;
}

return self::string($this->ref, $this->isNullable(), $this->constraint, $this->metadata);
}

public function nullable() : self
{
if (!\in_array(NullEntry::class, $this->classes, true)) {
Expand Down
103 changes: 103 additions & 0 deletions src/core/etl/tests/Flow/ETL/Tests/Integration/DataFrame/SchemaTest.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
<?php

declare(strict_types=1);

namespace Flow\ETL\Tests\Integration\DataFrame;

use function Flow\ETL\DSL\array_to_rows;
use function Flow\ETL\DSL\bool_schema;
use function Flow\ETL\DSL\df;
use function Flow\ETL\DSL\from_rows;
use function Flow\ETL\DSL\int_schema;
use function Flow\ETL\DSL\schema;
use function Flow\ETL\DSL\str_schema;
use function Flow\ETL\DSL\union_schema;
use Flow\ETL\Row\Entry\IntegerEntry;
use Flow\ETL\Row\Entry\StringEntry;
use Flow\ETL\Tests\Integration\IntegrationTestCase;

final class SchemaTest extends IntegrationTestCase
{
public function test_getting_schema() : void
{
$rows = array_to_rows(\array_map(
function ($i) {
return [
'id' => $i,
'name' => 'name_' . $i,
'active' => $i % 2 === 0,
];
},
\range(1, 100)
));

$this->assertEquals(
schema(
int_schema('id'),
str_schema('name'),
bool_schema('active')
),
df()
->read(from_rows($rows))
->autoCast()
->schema()
);
}

public function test_getting_schema_from_limited_rows() : void
{
$rows = array_to_rows(\array_map(
function ($i) {
return [
'id' => $i,
'name' => 'name_' . $i,
'active' => $i % 2 === 0,
'union' => $i > 50 ? 'string' : 1,
];
},
\range(1, 100)
));

$this->assertEquals(
schema(
int_schema('id'),
str_schema('name'),
bool_schema('active'),
int_schema('union')
),
df()
->read(from_rows($rows))
->autoCast()
->limit(50)
->schema()
);
}

public function test_getting_schema_with_union_type() : void
{
$rows = array_to_rows(\array_map(
function ($i) {
return [
'id' => $i,
'name' => 'name_' . $i,
'active' => $i % 2 === 0,
'union' => $i > 50 ? 'string' : 1,
];
},
\range(1, 100)
));

$this->assertEquals(
schema(
int_schema('id'),
str_schema('name'),
bool_schema('active'),
union_schema('union', [IntegerEntry::class, StringEntry::class])
),
df()
->read(from_rows($rows))
->autoCast()
->schema()
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,10 @@

use function Flow\ETL\DSL\bool_entry;
use function Flow\ETL\DSL\int_entry;
use function Flow\ETL\DSL\int_schema;
use function Flow\ETL\DSL\null_entry;
use function Flow\ETL\DSL\str_entry;
use function Flow\ETL\DSL\str_schema;
use function Flow\ETL\DSL\struct_element;
use function Flow\ETL\DSL\struct_entry;
use function Flow\ETL\DSL\struct_type;
Expand All @@ -17,6 +19,7 @@
use Flow\ETL\PHP\Type\Logical\List\ListElement;
use Flow\ETL\PHP\Type\Logical\ListType;
use Flow\ETL\PHP\Type\Logical\StructureType;
use Flow\ETL\Row\Entry\DateTimeEntry;
use Flow\ETL\Row\Entry\IntegerEntry;
use Flow\ETL\Row\Entry\ListEntry;
use Flow\ETL\Row\Entry\NullEntry;
Expand Down Expand Up @@ -175,6 +178,36 @@ public function test_multi_types_is_not_union() : void
$this->assertTrue(Definition::union('id', [IntegerEntry::class, StringEntry::class, NullEntry::class])->isUnion());
}

public function test_narrow_non_union_type() : void
{
$def = int_schema('int');

$this->assertSame(
$def,
$def->narrow()
);
}

public function test_narrow_nullable_union_type() : void
{
$def = Definition::union('test', [IntegerEntry::class, StringEntry::class, DateTimeEntry::class, NullEntry::class]);

$this->assertEquals(
str_schema('test', true),
$def->narrow()
);
}

public function test_narrow_union_type() : void
{
$def = Definition::union('test', [IntegerEntry::class, StringEntry::class, DateTimeEntry::class]);

$this->assertEquals(
str_schema('test'),
$def->narrow()
);
}

public function test_not_matches_when_constraint_not_satisfied() : void
{
$constraint = $this->createMock(Constraint::class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,13 @@

namespace Flow\Parquet\Tests\Unit\ParquetFile;

use function Flow\ETL\DSL\int_schema;
use function Flow\ETL\DSL\str_schema;
use function Flow\ETL\DSL\union_schema;
use Flow\ETL\Adapter\Elasticsearch\Tests\Integration\TestCase;
use Flow\ETL\Row\Entry\IntegerEntry;
use Flow\ETL\Row\Entry\NullEntry;
use Flow\ETL\Row\Entry\StringEntry;
use Flow\Parquet\ParquetFile\Schema;
use Flow\Parquet\ParquetFile\Schema\FlatColumn;
use Flow\Parquet\ParquetFile\Schema\ListElement;
Expand Down Expand Up @@ -105,4 +111,20 @@ public function test_flattening_schema_to_receive_simple_array_of_flat_columns()
$this->assertInstanceOf(FlatColumn::class, $column);
}
}

public function test_narrowing_schema_with_union_types() : void
{
$schema = \Flow\ETL\DSL\schema(
int_schema('id'),
union_schema('tracking_number', [StringEntry::class, IntegerEntry::class, NullEntry::class]),
)->narrow();

$this->assertEquals(
\Flow\ETL\DSL\schema(
int_schema('id'),
str_schema('tracking_number', true),
),
$schema
);
}
}

0 comments on commit 2253b91

Please sign in to comment.