Skip to content

Commit

Permalink
Added possibility to set cache batch size (#1034)
Browse files Browse the repository at this point in the history
* Added possibility to set cache batch size

* Static analyze fixes

* Fixed failing tests
  • Loading branch information
norberttech authored Apr 2, 2024
1 parent 3343e03 commit 3580718
Show file tree
Hide file tree
Showing 23 changed files with 60 additions and 52 deletions.
1 change: 1 addition & 0 deletions phpstan.neon
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
parameters:
level: 8
treatPhpDocTypesAsCertain: false
checkGenericClassInNonGenericObjectType: false
checkMissingIterableValueType: false
paths:
Expand Down
1 change: 1 addition & 0 deletions psalm.xml
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
</ignoreFiles>
</projectFiles>
<issueHandlers>
<DocblockTypeContradiction errorLevel="suppress" />
<RiskyTruthyFalsyComparison errorLevel="suppress" />
<LessSpecificReturnStatement errorLevel="suppress" />
<MoreSpecificReturnType errorLevel="suppress" />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ final class CSVDetector
*/
public function __construct($resource, ?Option $fallback = new Option(',', '"', '\\'), ?Options $options = null)
{
/** @psalm-suppress DocblockTypeContradiction */
if (!\is_resource($resource)) {
throw new InvalidArgumentException('Argument must be a valid resource');
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,7 @@ public function extract(FlowContext $context) : \Generator
/**
* @var array[] $values
*
* @psalm-suppress RedundantConditionGivenDocblockType, DocblockTypeContradiction
*
* @phpstan-ignore-next-line
* @psalm-suppress RedundantConditionGivenDocblockType
*/
$values = $response->getValues() ?? [];

Expand Down Expand Up @@ -116,9 +114,7 @@ function (array $rowData) use ($headers, $shouldPutInputIntoRows) {
/**
* @var array[] $values
*
* @psalm-suppress RedundantConditionGivenDocblockType, DocblockTypeContradiction
*
* @phpstan-ignore-next-line
* @psalm-suppress RedundantConditionGivenDocblockType
*/
$values = $response->getValues() ?? [];
}
Expand Down
18 changes: 17 additions & 1 deletion src/core/etl/src/Flow/ETL/Config.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

namespace Flow\ETL;

use Flow\ETL\Exception\InvalidArgumentException;
use Flow\ETL\Filesystem\FilesystemStreams;
use Flow\ETL\Pipeline\Optimizer;
use Flow\ETL\Row\EntryFactory;
Expand All @@ -19,6 +20,9 @@ final class Config

public const EXTERNAL_SORT_MAX_MEMORY_ENV = 'FLOW_EXTERNAL_SORT_MAX_MEMORY';

/**
* @param int<1, max> $cacheBatchSize
*/
public function __construct(
private readonly string $id,
private readonly Serializer $serializer,
Expand All @@ -27,8 +31,12 @@ public function __construct(
private readonly FilesystemStreams $filesystemStreams,
private readonly Optimizer $optimizer,
private readonly bool $putInputIntoRows,
private readonly EntryFactory $entryFactory
private readonly EntryFactory $entryFactory,
private readonly int $cacheBatchSize
) {
if ($this->cacheBatchSize < 1) {
throw new InvalidArgumentException('Cache batch size must be greater than 0');
}
}

public static function builder() : ConfigBuilder
Expand All @@ -46,6 +54,14 @@ public function cache() : Cache
return $this->cache;
}

/**
* @return int<1, max>
*/
public function cacheBatchSize() : int
{
return $this->cacheBatchSize;
}

public function entryFactory() : EntryFactory
{
return $this->entryFactory;
Expand Down
27 changes: 24 additions & 3 deletions src/core/etl/src/Flow/ETL/ConfigBuilder.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,23 @@
namespace Flow\ETL;

use Flow\ETL\Cache\LocalFilesystemCache;
use Flow\ETL\Exception\InvalidArgumentException;
use Flow\ETL\ExternalSort\MemorySort;
use Flow\ETL\Filesystem\{FilesystemStreams, LocalFilesystem};
use Flow\ETL\Monitoring\Memory\Unit;
use Flow\ETL\Pipeline\Optimizer;
use Flow\ETL\Row\Factory\NativeEntryFactory;
use Flow\Serializer\{CompressingSerializer, NativePHPSerializer, Serializer};
use Flow\Serializer\{NativePHPSerializer, Serializer};

final class ConfigBuilder
{
private ?Cache $cache;

/**
* @var int<1, max>
*/
private int $cacheBatchSize = 1000;

private ?ExternalSort $externalSort;

private ?Filesystem $filesystem;
Expand Down Expand Up @@ -46,7 +52,7 @@ public function build() : Config
{
$this->id ??= \uniqid('flow_php', true);
$entryFactory = new NativeEntryFactory();
$this->serializer ??= new CompressingSerializer(new NativePHPSerializer());
$this->serializer ??= new NativePHPSerializer();
$cachePath = \is_string(\getenv(Config::CACHE_DIR_ENV)) && \realpath(\getenv(Config::CACHE_DIR_ENV))
? \getenv(Config::CACHE_DIR_ENV)
: \sys_get_temp_dir() . '/flow_php/cache';
Expand Down Expand Up @@ -91,7 +97,8 @@ public function build() : Config
new FilesystemStreams($this->filesystem),
$this->optimizer,
$this->putInputIntoRows,
$entryFactory
$entryFactory,
$this->cacheBatchSize
);
}

Expand All @@ -102,6 +109,20 @@ public function cache(Cache $cache) : self
return $this;
}

/**
* @throws InvalidArgumentException
*/
public function cacheBatchSize(int $cacheBatchSize) : self
{
if ($cacheBatchSize < 1) {
throw new InvalidArgumentException('Cache batch size must be greater than 0');
}

$this->cacheBatchSize = $cacheBatchSize;

return $this;
}

public function dontPutInputIntoRows() : self
{
$this->putInputIntoRows = false;
Expand Down
9 changes: 8 additions & 1 deletion src/core/etl/src/Flow/ETL/DataFrame.php
Original file line number Diff line number Diff line change
Expand Up @@ -144,9 +144,16 @@ public function batchSize(int $size) : self
* @lazy
*
* @param null|string $id
*
* @throws InvalidArgumentException
*/
public function cache(?string $id = null) : self
public function cache(?string $id = null, ?int $cacheBatchSize = null) : self
{
if ($cacheBatchSize !== null && $cacheBatchSize < 1) {
throw new InvalidArgumentException('Cache batch size must be greater than 0');
}

$this->batchSize($cacheBatchSize ?? $this->context->config->cacheBatchSize());
$this->pipeline = new CachingPipeline($this->pipeline, $id);

return $this;
Expand Down
2 changes: 0 additions & 2 deletions src/core/etl/src/Flow/ETL/Join/Expression.php
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@ public function __construct(
}

/**
* @psalm-suppress DocblockTypeContradiction
*
* @param array<string, string>|Comparison $comparison
*/
public static function on(array|Comparison $comparison, string $joinPrefix = '') : self
Expand Down
3 changes: 3 additions & 0 deletions src/core/etl/src/Flow/ETL/Monitoring/Memory/Unit.php
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,13 @@ public static function fromString(string $memoryString) : self

switch (\strtoupper($unit)) {
case 'K':
case 'B':
return self::fromKb((int) \substr($limit, 0, -1));
case 'M':
case 'MB':
return self::fromMb((int) \substr($limit, 0, -1));
case 'G':
case 'GB':
return self::fromGb((int) \substr($limit, 0, -1));

default:
Expand Down
5 changes: 0 additions & 5 deletions src/core/etl/src/Flow/ETL/Pipeline/BatchingPipeline.php
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,6 @@ public function __construct(private readonly Pipeline $pipeline, private readonl
{
$this->nextPipeline = $pipeline->cleanCopy();

/**
* @psalm-suppress DocblockTypeContradiction
*
* @phpstan-ignore-next-line
*/
if ($this->size <= 0) {
throw new InvalidArgumentException('Batch size must be greater than 0, given: ' . $this->size);
}
Expand Down
1 change: 0 additions & 1 deletion src/core/etl/src/Flow/ETL/Rows.php
Original file line number Diff line number Diff line change
Expand Up @@ -503,7 +503,6 @@ public function merge(self $rows) : self
*/
public function offsetExists($offset) : bool
{
/** @psalm-suppress DocblockTypeContradiction */
if (!\is_int($offset)) {
throw new InvalidArgumentException('Rows accepts only integer offsets');
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,15 +31,8 @@ public function test_etl_sort_at_disk_in_memory() : void
$cache = \array_diff(\scandir($this->cacheDir), ['..', '.']);

self::assertEmpty($cache);
// 50 initial writes
// 2500 single row writes
// 50 merged writes
self::assertSame(2600, $cacheSpy->writes());
// 1 main cache
// 50 tmp caches
// 1 sorted cache
// 1 extracted cache
self::assertSame(52, $cacheSpy->clears());
self::assertSame(2506, $cacheSpy->writes());
self::assertSame(5, $cacheSpy->clears());
}

public function test_etl_sort_by_in_memory() : void
Expand All @@ -60,6 +53,6 @@ public function test_etl_sort_by_in_memory() : void

self::assertEmpty($cache);
self::assertSame(\range(0, 39), $rows->reduceToArray('int'));
self::assertSame(20, $cacheSpy->writes());
self::assertSame(1, $cacheSpy->writes());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,15 +31,13 @@ public function test_rank_on_partitioned_window() : void
->sortBy(ref('department'), ref('rank'))
->get();

self::assertSame(
self::assertEquals(
[
[
['id' => 5, 'name' => 'Jane', 'department' => 'Finances', 'salary' => 14_000, 'rank' => 1],
['id' => 3, 'name' => 'Tomas', 'department' => 'Finances', 'salary' => 11_000, 'rank' => 2],
['id' => 4, 'name' => 'John', 'department' => 'Finances', 'salary' => 9000, 'rank' => 3],
['id' => 6, 'name' => 'Janet', 'department' => 'Finances', 'salary' => 4000, 'rank' => 4],
],
[
['id' => 1, 'name' => 'Greg', 'department' => 'IT', 'salary' => 6000, 'rank' => 1],
['id' => 2, 'name' => 'Michal', 'department' => 'IT', 'salary' => 5000, 'rank' => 2],
],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,15 +27,13 @@ public function __construct(array $rows)

$firstRow = \reset($rows);

/** @psalm-suppress DocblockTypeContradiction */
if (!\is_array($firstRow)) {
throw new RuntimeException('Each row must be an array');
}

$columns = \array_keys($firstRow);

foreach ($rows as $row) {
/** @psalm-suppress DocblockTypeContradiction */
if (!\is_array($row)) {
throw new RuntimeException('Each row must be an array');
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,8 +91,6 @@ public function read(ColumnChunk $columnChunk, FlatColumn $column, $stream) : \G

/**
* @param resource $stream
*
* @psalm-suppress DocblockTypeContradiction
*/
private function readHeader($stream, int $pageOffset) : ?PageHeader
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,6 @@ public function view(ColumnChunk $columnChunk, FlatColumn $column, $stream) : \G

/**
* @param resource $stream
*
* @psalm-suppress DocblockTypeContradiction
*/
private function readHeader($stream, int $pageOffset) : ?PageHeader
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ public function __construct(

public static function fromThrift(\Flow\Parquet\Thrift\DataPageHeaderV2 $thrift) : self
{
/** @psalm-suppress DocblockTypeContradiction */
return new self(
$thrift->num_values,
$thrift->num_nulls,
Expand All @@ -37,7 +36,6 @@ public static function fromThrift(\Flow\Parquet\Thrift\DataPageHeaderV2 $thrift)
$thrift->repetition_levels_byte_length,
/** @phpstan-ignore-next-line */
$thrift->is_compressed ?? null,
/** @phpstan-ignore-next-line */
$thrift->statistics ? Statistics::fromThrift($thrift->statistics) : null
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,6 @@ public function __construct(
) {
}

/**
* @psalm-suppress DocblockTypeContradiction
*/
public static function fromThrift(\Flow\Parquet\Thrift\PageHeader $thrift) : self
{
return new self(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ public function __construct(
}

/**
* @psalm-suppress DocblockTypeContradiction
* @psalm-suppress RedundantConditionGivenDocblockType
*/
public static function fromThrift(\Flow\Parquet\Thrift\ColumnChunk $thrift) : self
Expand All @@ -57,7 +56,6 @@ public static function fromThrift(\Flow\Parquet\Thrift\ColumnChunk $thrift) : se
$thrift->meta_data->dictionary_page_offset,
$thrift->meta_data->data_page_offset,
$thrift->meta_data->index_page_offset,
/** @phpstan-ignore-next-line */
$thrift->meta_data->statistics ? Statistics::fromThrift($thrift->meta_data->statistics) : null,
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,6 @@ public static function enum() : self
}

/**
* @psalm-suppress DocblockTypeContradiction
* @psalm-suppress RedundantConditionGivenDocblockType
*/
public static function fromThrift(\Flow\Parquet\Thrift\LogicalType $logicalType) : self
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ public static function create(string $name, array $columns) : self
}

/**
* @psalm-suppress DocblockTypeContradiction
* @psalm-suppress RedundantConditionGivenDocblockType
*
* @param array<Column> $children
Expand All @@ -50,7 +49,6 @@ public static function fromThrift(SchemaElement $schemaElement, array $children)
$schemaElement->repetition_type ? Repetition::from($schemaElement->repetition_type) : null,
$children,
$schemaElement->converted_type ? ConvertedType::from($schemaElement->converted_type) : null,
/** @phpstan-ignore-next-line */
$schemaElement->logicalType ? LogicalType::fromThrift($schemaElement->logicalType) : null
);
}
Expand Down
1 change: 0 additions & 1 deletion src/lib/parquet/src/Flow/Parquet/Reader.php
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ public function read(string $path) : ParquetFile
*/
public function readStream($stream) : ParquetFile
{
/** @psalm-suppress DocblockTypeContradiction */
if (!\is_resource($stream)) {
throw new InvalidArgumentException('Given argument is not a valid resource');
}
Expand Down
2 changes: 0 additions & 2 deletions src/lib/parquet/src/Flow/Parquet/Writer.php
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,6 @@ public function open(string $path, Schema $schema) : void
*/
public function openForStream($resource, Schema $schema) : void
{
/** @psalm-suppress DocblockTypeContradiction */
if (!\is_resource($resource)) {
throw new InvalidArgumentException('Given argument is not a valid resource');
}
Expand Down Expand Up @@ -227,7 +226,6 @@ public function reopenForStream($resource) : void
throw new RuntimeException('Writer is already open');
}

/** @psalm-suppress DocblockTypeContradiction */
if (!\is_resource($resource)) {
throw new InvalidArgumentException('Given argument is not a valid resource');
}
Expand Down

0 comments on commit 3580718

Please sign in to comment.