Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bypass schema infering when reading from parquet files #975

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ final class ParquetExtractor implements Extractor, FileExtractor, LimitableExtra
use Limitable;
use PartitionFiltering;

private SchemaConverter $schemaConverter;

/**
* @param Path $path
* @param array<string> $columns
Expand All @@ -37,6 +39,7 @@ public function __construct(
private readonly ?int $offset = null
) {
$this->resetLimit();
$this->schemaConverter = new SchemaConverter();

if ($this->path->isPattern() && $this->offset !== null) {
throw new InvalidArgumentException('Offset can be used only with single file path, not with pattern');
Expand All @@ -48,12 +51,14 @@ public function extract(FlowContext $context) : \Generator
$shouldPutInputIntoRows = $context->config->shouldPutInputIntoRows();

foreach ($this->readers($context) as $fileData) {
$flowSchema = $this->schemaConverter->fromParquet($fileData['file']->schema());

foreach ($fileData['file']->values($this->columns, $this->limit(), $this->offset) as $row) {
if ($shouldPutInputIntoRows) {
$row['_input_file_uri'] = $fileData['uri'];
}

$signal = yield array_to_rows($row, $context->entryFactory(), $fileData['partitions']);
$signal = yield array_to_rows($row, $context->entryFactory(), $fileData['partitions'], $flowSchema);

$this->countRow();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,22 @@

namespace Flow\ETL\Adapter\Parquet;

use function Flow\ETL\DSL\list_schema;
use function Flow\ETL\DSL\map_schema;
use function Flow\ETL\DSL\struct_schema;
use function Flow\ETL\DSL\struct_type;
use function Flow\ETL\DSL\structure_element;
use function Flow\ETL\DSL\type_list;
use function Flow\ETL\DSL\type_map;
use function Flow\ETL\DSL\type_object;
use Flow\ETL\Exception\RuntimeException;
use Flow\ETL\PHP\Type\Logical\DateTimeType;
use Flow\ETL\PHP\Type\Logical\JsonType;
use Flow\ETL\PHP\Type\Logical\ListType;
use Flow\ETL\PHP\Type\Logical\Map\MapKey;
use Flow\ETL\PHP\Type\Logical\Map\MapValue;
use Flow\ETL\PHP\Type\Logical\MapType;
use Flow\ETL\PHP\Type\Logical\Structure\StructureElement;
use Flow\ETL\PHP\Type\Logical\StructureType;
use Flow\ETL\PHP\Type\Logical\UuidType;
use Flow\ETL\PHP\Type\Logical\XMLNodeType;
Expand All @@ -26,6 +35,17 @@

final class SchemaConverter
{
public function fromParquet(ParquetSchema $schema) : Schema
{
$definitions = [];

foreach ($schema->columns() as $column) {
$definitions[] = $this->fromParquetColumnToFlowDefinition($column);
}

return \Flow\ETL\DSL\schema(...$definitions);
}

public function toParquet(Schema $schema) : ParquetSchema
{
$columns = [];
Expand Down Expand Up @@ -242,4 +262,85 @@ private function flowTypeToParquetType(string $name, Type $type) : Column

throw new RuntimeException($type::class . ' is not supported.');
}

private function fromParquetColumnToFlowDefinition(Column $column) : Schema\Definition
{
if ($column instanceof FlatColumn) {
return $this->parquetFlatToFlowType($column);
}

/** @var NestedColumn $column */
return $this->parquetNestedToFlowType($column);
}

private function parquetFlatToFlowType(FlatColumn $column) : Schema\Definition
{
$logicalType = $column->logicalType();

if ($logicalType === null) {
return match ($column->type()) {
ParquetSchema\PhysicalType::INT32 => Schema\Definition::integer($column->name(), $column->repetition() === ParquetSchema\Repetition::OPTIONAL),
ParquetSchema\PhysicalType::INT64 => Schema\Definition::integer($column->name(), $column->repetition() === ParquetSchema\Repetition::OPTIONAL),
ParquetSchema\PhysicalType::BOOLEAN => Schema\Definition::boolean($column->name(), $column->repetition() === ParquetSchema\Repetition::OPTIONAL),
ParquetSchema\PhysicalType::DOUBLE => Schema\Definition::float($column->name(), $column->repetition() === ParquetSchema\Repetition::OPTIONAL),
ParquetSchema\PhysicalType::FLOAT => Schema\Definition::float($column->name(), $column->repetition() === ParquetSchema\Repetition::OPTIONAL),
ParquetSchema\PhysicalType::BYTE_ARRAY => Schema\Definition::string($column->name(), $column->repetition() === ParquetSchema\Repetition::OPTIONAL),
default => throw new RuntimeException($column->type()->name . ' is not supported.')
};
}

return match ($logicalType->name()) {
ParquetSchema\LogicalType::STRING => Schema\Definition::string($column->name(), $column->repetition() === ParquetSchema\Repetition::OPTIONAL),
ParquetSchema\LogicalType::DATE => Schema\Definition::dateTime($column->name(), $column->repetition() === ParquetSchema\Repetition::OPTIONAL),
ParquetSchema\LogicalType::TIME => Schema\Definition::object($column->name(), type_object(\DateInterval::class, $column->repetition() === ParquetSchema\Repetition::OPTIONAL)),
ParquetSchema\LogicalType::TIMESTAMP => Schema\Definition::dateTime($column->name(), $column->repetition() === ParquetSchema\Repetition::OPTIONAL),
ParquetSchema\LogicalType::UUID => Schema\Definition::uuid($column->name(), $column->repetition() === ParquetSchema\Repetition::OPTIONAL),
ParquetSchema\LogicalType::JSON => Schema\Definition::json($column->name(), $column->repetition() === ParquetSchema\Repetition::OPTIONAL),
ParquetSchema\LogicalType::DECIMAL => Schema\Definition::float($column->name(), $column->repetition() === ParquetSchema\Repetition::OPTIONAL),
ParquetSchema\LogicalType::INTEGER => Schema\Definition::integer($column->name(), $column->repetition() === ParquetSchema\Repetition::OPTIONAL),
default => throw new RuntimeException($logicalType->name() . ' is not supported.')
};
}

private function parquetNestedToFlowType(NestedColumn $column) : Schema\Definition
{
if ($column->isList()) {
return list_schema(
$column->name(),
type_list(
$this->fromParquetColumnToFlowDefinition($column->getListElement())->type(),
$column->repetition() === ParquetSchema\Repetition::OPTIONAL
)
);
}

if ($column->isMap()) {
$keyType = $this->fromParquetColumnToFlowDefinition($column->getMapKeyColumn())->type();

if (!$keyType instanceof ScalarType) {
throw new RuntimeException('Flow expects map key type to be scalar type.');
}

return map_schema(
$column->name(),
type_map(
$keyType,
$this->fromParquetColumnToFlowDefinition($column->getMapValueColumn())->type(),
$column->repetition() === ParquetSchema\Repetition::OPTIONAL
)
);
}

/** @var array<StructureElement> $elements */
$elements = [];

foreach ($column->children() as $structColumn) {
$elements[] = structure_element(
$structColumn->name(),
$this->fromParquetColumnToFlowDefinition($structColumn)->type()
);
}

return struct_schema($column->name(), struct_type($elements, $column->repetition() === ParquetSchema\Repetition::OPTIONAL));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,8 @@

use function Flow\ETL\Adapter\Parquet\from_parquet;
use function Flow\ETL\Adapter\Parquet\to_parquet;
use function Flow\ETL\DSL\str_entry;
use Flow\ETL\Config;
use Flow\ETL\FlowContext;
use Flow\ETL\Row;
use Flow\ETL\Rows;
use PhpBench\Attributes\Groups;

Expand All @@ -27,10 +25,6 @@ public function __construct()
$this->rows = new Rows();

foreach (from_parquet(__DIR__ . '/../Fixtures/orders_flow.parquet')->extract($this->context) as $rows) {
$rows = $rows->map(static function (Row $row) : Row {
return $row->set(str_entry('order_id', $row->valueOf('order_id')->toString()));
});

$this->rows = $this->rows->merge($rows);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,12 +84,13 @@ public function test_writing_with_partitioning() : void
->run();

$this->assertEquals(
$rows,
$rows->toArray(),
(new Flow())
->read(from_parquet($path . '/**/*.parquet'))
->drop('date')
->sortBy(ref('datetime')->asc())
->fetch()
->toArray()
);

$this->assertSame(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
use Flow\Parquet\ParquetFile\Schema\NestedColumn;
use PHPUnit\Framework\TestCase;

final class SchemaConverterTest extends TestCase
final class FlowToParquetSchemaTest extends TestCase
{
public function test_convert_array_entry_to_parquet_array() : void
{
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
<?php

declare(strict_types=1);

namespace Flow\ETL\Adapter\Parquet\Tests\Unit;

use function Flow\ETL\DSL\bool_schema;
use function Flow\ETL\DSL\datetime_schema;
use function Flow\ETL\DSL\float_schema;
use function Flow\ETL\DSL\int_schema;
use function Flow\ETL\DSL\json_schema;
use function Flow\ETL\DSL\list_schema;
use function Flow\ETL\DSL\map_schema;
use function Flow\ETL\DSL\object_schema;
use function Flow\ETL\DSL\str_schema;
use function Flow\ETL\DSL\struct_element;
use function Flow\ETL\DSL\struct_schema;
use function Flow\ETL\DSL\type_boolean;
use function Flow\ETL\DSL\type_int;
use function Flow\ETL\DSL\type_list;
use function Flow\ETL\DSL\type_map;
use function Flow\ETL\DSL\type_object;
use function Flow\ETL\DSL\type_string;
use function Flow\ETL\DSL\type_structure;
use function Flow\ETL\DSL\type_uuid;
use function Flow\ETL\DSL\uuid_schema;
use Flow\ETL\Adapter\Parquet\SchemaConverter;
use Flow\Parquet\ParquetFile\Schema;
use Flow\Parquet\ParquetFile\Schema\MapKey;
use Flow\Parquet\ParquetFile\Schema\MapValue;
use PHPUnit\Framework\TestCase;

final class ParquetToFlowSchemaTest extends TestCase
{
public function test_converting_flat_fields_to_flow_schema() : void
{
$converted = new SchemaConverter();

$flowSchema = $converted->fromParquet(Schema::with(
Schema\FlatColumn::int32('int32'),
Schema\FlatColumn::int64('int64'),
Schema\FlatColumn::string('string'),
Schema\FlatColumn::float('float'),
Schema\FlatColumn::double('double'),
Schema\FlatColumn::decimal('decimal'),
Schema\FlatColumn::boolean('boolean'),
Schema\FlatColumn::date('date'),
Schema\FlatColumn::time('time'),
Schema\FlatColumn::dateTime('datetime'),
Schema\FlatColumn::uuid('uuid'),
Schema\FlatColumn::json('json'),
));

$this->assertEquals(
\Flow\ETL\DSL\schema(
int_schema('int32', true),
int_schema('int64', true),
str_schema('string', true),
float_schema('float', true),
float_schema('double', true),
float_schema('decimal', true),
bool_schema('boolean', true),
datetime_schema('date', true),
object_schema('time', type_object(\DateInterval::class, true)),
datetime_schema('datetime', true),
uuid_schema('uuid', true),
json_schema('json', true),
),
$flowSchema
);
}

public function test_converting_list_to_flow_schema() : void
{
$converted = new SchemaConverter();

$flowSchema = $converted->fromParquet(Schema::with(
Schema\NestedColumn::list('list', Schema\ListElement::string()),
));

$this->assertEquals(
\Flow\ETL\DSL\schema(
list_schema('list', type_list(type_string(true), true))
),
$flowSchema,
);
}

public function test_converting_map_to_flow_schema() : void
{
$converted = new SchemaConverter();

$flowSchema = $converted->fromParquet(Schema::with(
Schema\NestedColumn::map('map', MapKey::string(), MapValue::int64()),
));

$this->assertEquals(
\Flow\ETL\DSL\schema(
map_schema('map', type_map(type_string(), type_int(true), true))
),
$flowSchema,
);
}

public function test_converting_struct_to_flow_schema() : void
{
$converted = new SchemaConverter();

$flowSchema = $converted->fromParquet(Schema::with(
Schema\NestedColumn::struct(
'struct',
[
Schema\FlatColumn::uuid('uuid'),
Schema\FlatColumn::string('name'),
Schema\FlatColumn::boolean('active'),
]
),
));

$this->assertEquals(
\Flow\ETL\DSL\schema(
struct_schema(
'struct',
type_structure(
[
struct_element('uuid', type_uuid(true)),
struct_element('name', type_string(true)),
struct_element('active', type_boolean(true)),
],
true
),
)
),
$flowSchema,
);
}
}
7 changes: 5 additions & 2 deletions src/lib/parquet/src/Flow/Parquet/BinaryReader/Bytes.php
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

final class Bytes implements \ArrayAccess, \Countable, \IteratorAggregate
{
private readonly \ArrayIterator $iterator;
private \ArrayIterator|null $iterator = null;

private readonly DataSize $size;

Expand All @@ -18,7 +18,6 @@ public function __construct(
private readonly ByteOrder $byteOrder = ByteOrder::LITTLE_ENDIAN
) {
$this->size = new DataSize(\count($this->bytes) * 8);
$this->iterator = new \ArrayIterator($this->bytes);
}

public static function fromString(string $string, ByteOrder $byteOrder = ByteOrder::LITTLE_ENDIAN) : self
Expand All @@ -36,6 +35,10 @@ public function count() : int
// IteratorAggregate methods
public function getIterator() : \ArrayIterator
{
if ($this->iterator === null) {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just this mnemonic is reducing memory consumption by half due to not keeping repeated bytes in memory.

$this->iterator = new \ArrayIterator($this->bytes);
}

return $this->iterator;
}

Expand Down
Loading
Loading