Skip to content

Commit 37a630b

Browse files
authored
Fixed writing nullable parquet values that are marked in the schema as optional (#1025)
* Fixed writing nullable parquet values that are marked in the schema as optional
* Fixed failing validation test
* Fixed reading empty chunks without snappy extension
1 parent 4e3e08f commit 37a630b

File tree

14 files changed

+222
-135
lines changed

14 files changed

+222
-135
lines changed

composer.lock

Lines changed: 44 additions & 38 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

src/lib/parquet/src/Flow/Parquet/ParquetFile/Data/RLEBitPackedHybrid.php

Lines changed: 4 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -37,6 +37,10 @@ public function decodeBitPacked(BinaryReader $reader, int $bitWidth, int $varInt
3737
$readBytes = $reader->readBytes(\min($remainingByteCount, $totalByteCount));
3838
$actualByteCount = $readBytes->count();
3939

40+
if ($actualByteCount === 0) {
41+
return;
42+
}
43+
4044
$bitMask = (1 << $bitWidth) - 1;
4145
$byteIndex = 0;
4246
$currentByte = $readBytes[$byteIndex];

src/lib/parquet/src/Flow/Parquet/ParquetFile/PageReader.php

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -79,7 +79,7 @@ public function readDictionary(FlatColumn $column, PageHeader $pageHeader, Compr
7979
(new Codec($this->options))
8080
->decompress(
8181
/** @phpstan-ignore-next-line */
82-
\fread($stream, $pageHeader->compressedPageSize()),
82+
$pageHeader->compressedPageSize() === 0 ? '' : \fread($stream, $pageHeader->compressedPageSize()),
8383
$codec
8484
),
8585
$column,

src/lib/parquet/src/Flow/Parquet/ParquetFile/RowGroupBuilder.php

Lines changed: 0 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -5,7 +5,6 @@
55
namespace Flow\Parquet\ParquetFile;
66

77
use Flow\Parquet\Data\DataConverter;
8-
use Flow\Parquet\Exception\InvalidArgumentException;
98
use Flow\Parquet\ParquetFile\RowGroupBuilder\Validator\{ColumnDataValidator, DisabledValidator};
109
use Flow\Parquet\ParquetFile\RowGroupBuilder\{ColumnChunkBuilder, Flattener, PageSizeCalculator, RowGroupContainer, RowGroupStatistics};
1110
use Flow\Parquet\{Option, Options};
@@ -49,9 +48,6 @@ public function addRow(array $row) : void
4948
$flatRow = [];
5049

5150
foreach ($this->schema->columns() as $column) {
52-
if (!\array_key_exists($column->name(), $row)) {
53-
throw new InvalidArgumentException(\sprintf("Column '%s' not found in row", $column->name()));
54-
}
5551
$flatRow[] = $this->flattener->flattenColumn($column, $row);
5652
}
5753

src/lib/parquet/src/Flow/Parquet/ParquetFile/RowGroupBuilder/Flattener.php

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -18,7 +18,7 @@ public function flattenColumn(Column $column, array $row) : array
1818
if (!\array_key_exists($column->name(), $row)) {
1919
$this->validator->validate($column, null);
2020

21-
return [];
21+
return [$column->name() => null];
2222
}
2323

2424
/**

src/lib/parquet/src/Flow/Parquet/ParquetFile/Schema/Column.php

Lines changed: 2 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -20,6 +20,8 @@ public function isMap() : bool;
2020

2121
public function isMapElement() : bool;
2222

23+
public function isRequired() : bool;
24+
2325
public function isStruct() : bool;
2426

2527
public function isStructElement() : bool;

src/lib/parquet/src/Flow/Parquet/ParquetFile/Schema/FlatColumn.php

Lines changed: 5 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -243,6 +243,11 @@ public function isMapElement() : bool
243243
return false;
244244
}
245245

246+
public function isRequired() : bool
247+
{
248+
return $this->repetition !== Repetition::OPTIONAL;
249+
}
250+
246251
public function isStruct() : bool
247252
{
248253
return false;

src/lib/parquet/src/Flow/Parquet/ParquetFile/Schema/NestedColumn.php

Lines changed: 5 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -264,6 +264,11 @@ public function isMapElement() : bool
264264
return false;
265265
}
266266

267+
public function isRequired() : bool
268+
{
269+
return $this->repetition !== Repetition::OPTIONAL;
270+
}
271+
267272
public function isStruct() : bool
268273
{
269274
if ($this->isMap()) {

0 commit comments

Comments
 (0)