Skip to content

Commit

Permalink
Bypass schema inferring when reading from parquet files
Browse files Browse the repository at this point in the history
  • Loading branch information
norberttech committed Feb 9, 2024
1 parent a767a1e commit 9d7c4a6
Show file tree
Hide file tree
Showing 15 changed files with 370 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ final class ParquetExtractor implements Extractor, FileExtractor, LimitableExtra
use Limitable;
use PartitionFiltering;

private SchemaConverter $schemaConverter;

/**
* @param Path $path
* @param array<string> $columns
Expand All @@ -37,6 +39,7 @@ public function __construct(
private readonly ?int $offset = null
) {
$this->resetLimit();
$this->schemaConverter = new SchemaConverter();

if ($this->path->isPattern() && $this->offset !== null) {
throw new InvalidArgumentException('Offset can be used only with single file path, not with pattern');
Expand All @@ -48,12 +51,14 @@ public function extract(FlowContext $context) : \Generator
$shouldPutInputIntoRows = $context->config->shouldPutInputIntoRows();

foreach ($this->readers($context) as $fileData) {
$flowSchema = $this->schemaConverter->fromParquet($fileData['file']->schema());

foreach ($fileData['file']->values($this->columns, $this->limit(), $this->offset) as $row) {
if ($shouldPutInputIntoRows) {
$row['_input_file_uri'] = $fileData['uri'];
}

$signal = yield array_to_rows($row, $context->entryFactory(), $fileData['partitions']);
$signal = yield array_to_rows($row, $context->entryFactory(), $fileData['partitions'], $flowSchema);

$this->countRow();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,22 @@

namespace Flow\ETL\Adapter\Parquet;

use function Flow\ETL\DSL\list_schema;
use function Flow\ETL\DSL\map_schema;
use function Flow\ETL\DSL\struct_schema;
use function Flow\ETL\DSL\struct_type;
use function Flow\ETL\DSL\structure_element;
use function Flow\ETL\DSL\type_list;
use function Flow\ETL\DSL\type_map;
use function Flow\ETL\DSL\type_object;
use Flow\ETL\Exception\RuntimeException;
use Flow\ETL\PHP\Type\Logical\DateTimeType;
use Flow\ETL\PHP\Type\Logical\JsonType;
use Flow\ETL\PHP\Type\Logical\ListType;
use Flow\ETL\PHP\Type\Logical\Map\MapKey;
use Flow\ETL\PHP\Type\Logical\Map\MapValue;
use Flow\ETL\PHP\Type\Logical\MapType;
use Flow\ETL\PHP\Type\Logical\Structure\StructureElement;
use Flow\ETL\PHP\Type\Logical\StructureType;
use Flow\ETL\PHP\Type\Logical\UuidType;
use Flow\ETL\PHP\Type\Logical\XMLNodeType;
Expand All @@ -26,6 +35,17 @@

final class SchemaConverter
{
/**
 * Builds a Flow schema from a Parquet schema by converting each of its
 * columns into a Flow definition, in the order the Parquet schema yields them.
 */
public function fromParquet(ParquetSchema $schema) : Schema
{
    $flowDefinitions = [];

    foreach ($schema->columns() as $parquetColumn) {
        $flowDefinitions[] = $this->fromParquetColumnToFlowDefinition($parquetColumn);
    }

    return \Flow\ETL\DSL\schema(...$flowDefinitions);
}

public function toParquet(Schema $schema) : ParquetSchema
{
$columns = [];
Expand Down Expand Up @@ -242,4 +262,85 @@ private function flowTypeToParquetType(string $name, Type $type) : Column

throw new RuntimeException($type::class . ' is not supported.');
}

/**
 * Dispatches a Parquet column to the flat or nested conversion routine.
 *
 * Anything that is not a FlatColumn is treated as a NestedColumn.
 */
private function fromParquetColumnToFlowDefinition(Column $column) : Schema\Definition
{
    if (!$column instanceof FlatColumn) {
        /** @var NestedColumn $column */
        return $this->parquetNestedToFlowType($column);
    }

    return $this->parquetFlatToFlowType($column);
}

/**
 * Converts a flat Parquet column into a Flow schema definition.
 *
 * When the column carries a logical type, its name selects the definition;
 * otherwise the physical type does. Whether the column repetition is OPTIONAL
 * is forwarded to the definition factory (presumably as the nullable flag).
 *
 * @throws RuntimeException when the physical or logical type has no mapping here
 */
private function parquetFlatToFlowType(FlatColumn $column) : Schema\Definition
{
    $name = $column->name();
    $isOptional = $column->repetition() === ParquetSchema\Repetition::OPTIONAL;
    $logicalType = $column->logicalType();

    if ($logicalType !== null) {
        return match ($logicalType->name()) {
            ParquetSchema\LogicalType::STRING => Schema\Definition::string($name, $isOptional),
            ParquetSchema\LogicalType::DATE,
            ParquetSchema\LogicalType::TIMESTAMP => Schema\Definition::dateTime($name, $isOptional),
            ParquetSchema\LogicalType::TIME => Schema\Definition::object($name, type_object(\DateInterval::class, $isOptional)),
            ParquetSchema\LogicalType::UUID => Schema\Definition::uuid($name, $isOptional),
            ParquetSchema\LogicalType::JSON => Schema\Definition::json($name, $isOptional),
            ParquetSchema\LogicalType::DECIMAL => Schema\Definition::float($name, $isOptional),
            ParquetSchema\LogicalType::INTEGER => Schema\Definition::integer($name, $isOptional),
            default => throw new RuntimeException($logicalType->name() . ' is not supported.')
        };
    }

    // No logical annotation: fall back to the physical storage type.
    return match ($column->type()) {
        ParquetSchema\PhysicalType::INT32,
        ParquetSchema\PhysicalType::INT64 => Schema\Definition::integer($name, $isOptional),
        ParquetSchema\PhysicalType::BOOLEAN => Schema\Definition::boolean($name, $isOptional),
        ParquetSchema\PhysicalType::DOUBLE,
        ParquetSchema\PhysicalType::FLOAT => Schema\Definition::float($name, $isOptional),
        ParquetSchema\PhysicalType::BYTE_ARRAY => Schema\Definition::string($name, $isOptional),
        default => throw new RuntimeException($column->type()->name . ' is not supported.')
    };
}

/**
 * Converts a nested Parquet column (list, map or struct) into a Flow schema definition.
 *
 * Lists and maps are recognised through isList() / isMap(); every other nested
 * column is treated as a structure built from its children. Child types are
 * resolved recursively through fromParquetColumnToFlowDefinition().
 *
 * @throws RuntimeException when a map key does not resolve to a scalar Flow type
 */
private function parquetNestedToFlowType(NestedColumn $column) : Schema\Definition
{
    $name = $column->name();
    $isOptional = $column->repetition() === ParquetSchema\Repetition::OPTIONAL;

    if ($column->isList()) {
        $elementType = $this->fromParquetColumnToFlowDefinition($column->getListElement())->type();

        return list_schema($name, type_list($elementType, $isOptional));
    }

    if ($column->isMap()) {
        $keyType = $this->fromParquetColumnToFlowDefinition($column->getMapKeyColumn())->type();

        // Validate the key before resolving the value type.
        if (!$keyType instanceof ScalarType) {
            throw new RuntimeException('Flow expects map key type to be scalar type.');
        }

        $valueType = $this->fromParquetColumnToFlowDefinition($column->getMapValueColumn())->type();

        return map_schema($name, type_map($keyType, $valueType, $isOptional));
    }

    /** @var array<StructureElement> $structElements */
    $structElements = [];

    foreach ($column->children() as $child) {
        $childType = $this->fromParquetColumnToFlowDefinition($child)->type();
        $structElements[] = structure_element($child->name(), $childType);
    }

    return struct_schema($name, struct_type($structElements, $isOptional));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -84,12 +84,13 @@ public function test_writing_with_partitioning() : void
->run();

$this->assertEquals(
$rows,
$rows->toArray(),
(new Flow())
->read(from_parquet($path . '/**/*.parquet'))
->drop('date')
->sortBy(ref('datetime')->asc())
->fetch()
->toArray()
);

$this->assertSame(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
use Flow\Parquet\ParquetFile\Schema\NestedColumn;
use PHPUnit\Framework\TestCase;

final class SchemaConverterTest extends TestCase
final class FlowToParquetSchemaTest extends TestCase
{
public function test_convert_array_entry_to_parquet_array() : void
{
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
<?php

declare(strict_types=1);

namespace Flow\ETL\Adapter\Parquet\Tests\Unit;

use function Flow\ETL\DSL\bool_schema;
use function Flow\ETL\DSL\datetime_schema;
use function Flow\ETL\DSL\float_schema;
use function Flow\ETL\DSL\int_schema;
use function Flow\ETL\DSL\json_schema;
use function Flow\ETL\DSL\list_schema;
use function Flow\ETL\DSL\map_schema;
use function Flow\ETL\DSL\object_schema;
use function Flow\ETL\DSL\str_schema;
use function Flow\ETL\DSL\struct_element;
use function Flow\ETL\DSL\struct_schema;
use function Flow\ETL\DSL\type_boolean;
use function Flow\ETL\DSL\type_int;
use function Flow\ETL\DSL\type_list;
use function Flow\ETL\DSL\type_map;
use function Flow\ETL\DSL\type_object;
use function Flow\ETL\DSL\type_string;
use function Flow\ETL\DSL\type_structure;
use function Flow\ETL\DSL\type_uuid;
use function Flow\ETL\DSL\uuid_schema;
use Flow\ETL\Adapter\Parquet\SchemaConverter;
use Flow\Parquet\ParquetFile\Schema;
use Flow\Parquet\ParquetFile\Schema\MapKey;
use Flow\Parquet\ParquetFile\Schema\MapValue;
use PHPUnit\Framework\TestCase;

/**
 * Verifies SchemaConverter::fromParquet() for flat columns, lists, maps and structures.
 */
final class ParquetToFlowSchemaTest extends TestCase
{
    public function test_converting_flat_fields_to_flow_schema() : void
    {
        $expected = \Flow\ETL\DSL\schema(
            int_schema('int32', true),
            int_schema('int64', true),
            str_schema('string', true),
            float_schema('float', true),
            float_schema('double', true),
            float_schema('decimal', true),
            bool_schema('boolean', true),
            datetime_schema('date', true),
            object_schema('time', type_object(\DateInterval::class, true)),
            datetime_schema('datetime', true),
            uuid_schema('uuid', true),
            json_schema('json', true),
        );

        $parquetSchema = Schema::with(
            Schema\FlatColumn::int32('int32'),
            Schema\FlatColumn::int64('int64'),
            Schema\FlatColumn::string('string'),
            Schema\FlatColumn::float('float'),
            Schema\FlatColumn::double('double'),
            Schema\FlatColumn::decimal('decimal'),
            Schema\FlatColumn::boolean('boolean'),
            Schema\FlatColumn::date('date'),
            Schema\FlatColumn::time('time'),
            Schema\FlatColumn::dateTime('datetime'),
            Schema\FlatColumn::uuid('uuid'),
            Schema\FlatColumn::json('json'),
        );

        self::assertEquals($expected, (new SchemaConverter())->fromParquet($parquetSchema));
    }

    public function test_converting_list_to_flow_schema() : void
    {
        $expected = \Flow\ETL\DSL\schema(
            list_schema('list', type_list(type_string(true), true))
        );

        $parquetSchema = Schema::with(
            Schema\NestedColumn::list('list', Schema\ListElement::string()),
        );

        self::assertEquals($expected, (new SchemaConverter())->fromParquet($parquetSchema));
    }

    public function test_converting_map_to_flow_schema() : void
    {
        $expected = \Flow\ETL\DSL\schema(
            map_schema('map', type_map(type_string(), type_int(true), true))
        );

        $parquetSchema = Schema::with(
            Schema\NestedColumn::map('map', MapKey::string(), MapValue::int64()),
        );

        self::assertEquals($expected, (new SchemaConverter())->fromParquet($parquetSchema));
    }

    public function test_converting_struct_to_flow_schema() : void
    {
        $expectedStructType = type_structure(
            [
                struct_element('uuid', type_uuid(true)),
                struct_element('name', type_string(true)),
                struct_element('active', type_boolean(true)),
            ],
            true
        );
        $expected = \Flow\ETL\DSL\schema(struct_schema('struct', $expectedStructType));

        $parquetSchema = Schema::with(
            Schema\NestedColumn::struct(
                'struct',
                [
                    Schema\FlatColumn::uuid('uuid'),
                    Schema\FlatColumn::string('name'),
                    Schema\FlatColumn::boolean('active'),
                ]
            ),
        );

        self::assertEquals($expected, (new SchemaConverter())->fromParquet($parquetSchema));
    }
}
7 changes: 5 additions & 2 deletions src/lib/parquet/src/Flow/Parquet/BinaryReader/Bytes.php
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

final class Bytes implements \ArrayAccess, \Countable, \IteratorAggregate
{
private readonly \ArrayIterator $iterator;
private \ArrayIterator|null $iterator = null;

private readonly DataSize $size;

Expand All @@ -18,7 +18,6 @@ public function __construct(
private readonly ByteOrder $byteOrder = ByteOrder::LITTLE_ENDIAN
) {
$this->size = new DataSize(\count($this->bytes) * 8);
$this->iterator = new \ArrayIterator($this->bytes);
}

public static function fromString(string $string, ByteOrder $byteOrder = ByteOrder::LITTLE_ENDIAN) : self
Expand All @@ -36,6 +35,10 @@ public function count() : int
// IteratorAggregate methods
/**
 * Returns the iterator over the wrapped bytes, creating it on first use
 * and handing back the same instance on later calls.
 */
public function getIterator() : \ArrayIterator
{
    return $this->iterator ??= new \ArrayIterator($this->bytes);
}

Expand Down
39 changes: 39 additions & 0 deletions src/lib/parquet/src/Flow/Parquet/Data/Converter/JsonConverter.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
<?php declare(strict_types=1);

namespace Flow\Parquet\Data\Converter;

use Flow\Parquet\Data\Converter;
use Flow\Parquet\Exception\RuntimeException;
use Flow\Parquet\Options;
use Flow\Parquet\ParquetFile\Schema\FlatColumn;
use Flow\Parquet\ParquetFile\Schema\LogicalType;

/**
 * Converter for Parquet columns annotated with the JSON logical type.
 *
 * Values pass through unchanged in both directions; the converter only
 * enforces that the value is a string.
 */
final class JsonConverter implements Converter
{
    /**
     * @throws RuntimeException when the value read from the file is not a string
     */
    public function fromParquetType(mixed $data) : string
    {
        if (!\is_string($data)) {
            throw new RuntimeException('Json must be read as a string from Parquet file');
        }

        return $data;
    }

    /**
     * Tells whether this converter handles the given column: true only when
     * the column's logical type is JSON.
     */
    public function isFor(FlatColumn $column, Options $options) : bool
    {
        // Nullsafe call: a column without a logical type yields null, which never equals JSON.
        return $column->logicalType()?->name() === LogicalType::JSON;
    }

    /**
     * @throws RuntimeException when the value to be written is not a string
     */
    public function toParquetType(mixed $data) : string
    {
        if (!\is_string($data)) {
            // Write path: the data goes *to* the file, so the message must not say "from".
            throw new RuntimeException('Json must be written as a string to Parquet file');
        }

        return $data;
    }
}
Loading

0 comments on commit 9d7c4a6

Please sign in to comment.