diff --git a/phpstan.neon b/phpstan.neon index fe525bffd..776a18153 100644 --- a/phpstan.neon +++ b/phpstan.neon @@ -1,5 +1,6 @@ parameters: level: 8 + treatPhpDocTypesAsCertain: false checkGenericClassInNonGenericObjectType: false checkMissingIterableValueType: false paths: diff --git a/psalm.xml b/psalm.xml index ff6295297..2c384dd52 100644 --- a/psalm.xml +++ b/psalm.xml @@ -38,6 +38,7 @@ + diff --git a/src/adapter/etl-adapter-csv/src/Flow/ETL/Adapter/CSV/CSVDetector.php b/src/adapter/etl-adapter-csv/src/Flow/ETL/Adapter/CSV/CSVDetector.php index 75af6adde..031da5bbe 100644 --- a/src/adapter/etl-adapter-csv/src/Flow/ETL/Adapter/CSV/CSVDetector.php +++ b/src/adapter/etl-adapter-csv/src/Flow/ETL/Adapter/CSV/CSVDetector.php @@ -26,7 +26,6 @@ final class CSVDetector */ public function __construct($resource, ?Option $fallback = new Option(',', '"', '\\'), ?Options $options = null) { - /** @psalm-suppress DocblockTypeContradiction */ if (!\is_resource($resource)) { throw new InvalidArgumentException('Argument must be a valid resource'); } diff --git a/src/adapter/etl-adapter-google-sheet/src/Flow/ETL/Adapter/GoogleSheet/GoogleSheetExtractor.php b/src/adapter/etl-adapter-google-sheet/src/Flow/ETL/Adapter/GoogleSheet/GoogleSheetExtractor.php index 2bb782435..77041a602 100644 --- a/src/adapter/etl-adapter-google-sheet/src/Flow/ETL/Adapter/GoogleSheet/GoogleSheetExtractor.php +++ b/src/adapter/etl-adapter-google-sheet/src/Flow/ETL/Adapter/GoogleSheet/GoogleSheetExtractor.php @@ -48,9 +48,7 @@ public function extract(FlowContext $context) : \Generator /** * @var array[] $values * - * @psalm-suppress RedundantConditionGivenDocblockType, DocblockTypeContradiction - * - * @phpstan-ignore-next-line + * @psalm-suppress RedundantConditionGivenDocblockType */ $values = $response->getValues() ?? []; @@ -116,9 +114,7 @@ function (array $rowData) use ($headers, $shouldPutInputIntoRows) { /** * @var array[] $values * - * @psalm-suppress RedundantConditionGivenDocblockType, DocblockTypeContradiction - * - * @phpstan-ignore-next-line + * @psalm-suppress RedundantConditionGivenDocblockType */ $values = $response->getValues() ?? []; } diff --git a/src/core/etl/src/Flow/ETL/Config.php b/src/core/etl/src/Flow/ETL/Config.php index 62a0e90a9..0d12bbef9 100644 --- a/src/core/etl/src/Flow/ETL/Config.php +++ b/src/core/etl/src/Flow/ETL/Config.php @@ -4,6 +4,7 @@ namespace Flow\ETL; +use Flow\ETL\Exception\InvalidArgumentException; use Flow\ETL\Filesystem\FilesystemStreams; use Flow\ETL\Pipeline\Optimizer; use Flow\ETL\Row\EntryFactory; @@ -19,6 +20,9 @@ final class Config public const EXTERNAL_SORT_MAX_MEMORY_ENV = 'FLOW_EXTERNAL_SORT_MAX_MEMORY'; + /** + * @param int<1, max> $cacheBatchSize + */ public function __construct( private readonly string $id, private readonly Serializer $serializer, @@ -27,8 +31,12 @@ public function __construct( private readonly FilesystemStreams $filesystemStreams, private readonly Optimizer $optimizer, private readonly bool $putInputIntoRows, - private readonly EntryFactory $entryFactory + private readonly EntryFactory $entryFactory, + private readonly int $cacheBatchSize ) { + if ($this->cacheBatchSize < 1) { + throw new InvalidArgumentException('Cache batch size must be greater than 0'); + } } public static function builder() : ConfigBuilder @@ -46,6 +54,14 @@ public function cache() : Cache return $this->cache; } + /** + * @return int<1, max> + */ + public function cacheBatchSize() : int + { + return $this->cacheBatchSize; + } + public function entryFactory() : EntryFactory { return $this->entryFactory; diff --git a/src/core/etl/src/Flow/ETL/ConfigBuilder.php b/src/core/etl/src/Flow/ETL/ConfigBuilder.php index 942c4c13a..cb5911f10 100644 --- a/src/core/etl/src/Flow/ETL/ConfigBuilder.php +++ b/src/core/etl/src/Flow/ETL/ConfigBuilder.php @@ -5,17 +5,23 @@ namespace Flow\ETL; use Flow\ETL\Cache\LocalFilesystemCache; +use Flow\ETL\Exception\InvalidArgumentException; use Flow\ETL\ExternalSort\MemorySort; use Flow\ETL\Filesystem\{FilesystemStreams, LocalFilesystem}; use Flow\ETL\Monitoring\Memory\Unit; use Flow\ETL\Pipeline\Optimizer; use Flow\ETL\Row\Factory\NativeEntryFactory; -use Flow\Serializer\{CompressingSerializer, NativePHPSerializer, Serializer}; +use Flow\Serializer\{NativePHPSerializer, Serializer}; final class ConfigBuilder { private ?Cache $cache; + /** + * @var int<1, max> + */ + private int $cacheBatchSize = 1000; + private ?ExternalSort $externalSort; private ?Filesystem $filesystem; @@ -46,7 +52,7 @@ public function build() : Config { $this->id ??= \uniqid('flow_php', true); $entryFactory = new NativeEntryFactory(); - $this->serializer ??= new CompressingSerializer(new NativePHPSerializer()); + $this->serializer ??= new NativePHPSerializer(); $cachePath = \is_string(\getenv(Config::CACHE_DIR_ENV)) && \realpath(\getenv(Config::CACHE_DIR_ENV)) ? \getenv(Config::CACHE_DIR_ENV) : \sys_get_temp_dir() . '/flow_php/cache'; @@ -91,7 +97,8 @@ public function build() : Config new FilesystemStreams($this->filesystem), $this->optimizer, $this->putInputIntoRows, - $entryFactory + $entryFactory, + $this->cacheBatchSize ); } @@ -102,6 +109,20 @@ public function cache(Cache $cache) : self return $this; } + /** + * @throws InvalidArgumentException + */ + public function cacheBatchSize(int $cacheBatchSize) : self + { + if ($cacheBatchSize < 1) { + throw new InvalidArgumentException('Cache batch size must be greater than 0'); + } + + $this->cacheBatchSize = $cacheBatchSize; + + return $this; + } + public function dontPutInputIntoRows() : self { $this->putInputIntoRows = false; diff --git a/src/core/etl/src/Flow/ETL/DataFrame.php b/src/core/etl/src/Flow/ETL/DataFrame.php index 1c28e84f3..2178f9216 100644 --- a/src/core/etl/src/Flow/ETL/DataFrame.php +++ b/src/core/etl/src/Flow/ETL/DataFrame.php @@ -144,9 +144,16 @@ public function batchSize(int $size) : self * @lazy * * @param null|string $id + * + * @throws InvalidArgumentException */ - public function cache(?string $id = null) : self + public function cache(?string $id = null, ?int $cacheBatchSize = null) : self { + if ($cacheBatchSize !== null && $cacheBatchSize < 1) { + throw new InvalidArgumentException('Cache batch size must be greater than 0'); + } + + $this->batchSize($cacheBatchSize ?? $this->context->config->cacheBatchSize()); $this->pipeline = new CachingPipeline($this->pipeline, $id); return $this; diff --git a/src/core/etl/src/Flow/ETL/Join/Expression.php b/src/core/etl/src/Flow/ETL/Join/Expression.php index 4c01f4a2d..8ba8e10f8 100644 --- a/src/core/etl/src/Flow/ETL/Join/Expression.php +++ b/src/core/etl/src/Flow/ETL/Join/Expression.php @@ -18,8 +18,6 @@ public function __construct( } /** - * @psalm-suppress DocblockTypeContradiction - * * @param array|Comparison $comparison */ public static function on(array|Comparison $comparison, string $joinPrefix = '') : self diff --git a/src/core/etl/src/Flow/ETL/Monitoring/Memory/Unit.php b/src/core/etl/src/Flow/ETL/Monitoring/Memory/Unit.php index bf2084fe8..9e03a10db 100644 --- a/src/core/etl/src/Flow/ETL/Monitoring/Memory/Unit.php +++ b/src/core/etl/src/Flow/ETL/Monitoring/Memory/Unit.php @@ -43,10 +43,13 @@ public static function fromString(string $memoryString) : self switch (\strtoupper($unit)) { case 'K': + case 'B': return self::fromKb((int) \substr($limit, 0, -1)); case 'M': + case 'MB': return self::fromMb((int) \substr($limit, 0, -1)); case 'G': + case 'GB': return self::fromGb((int) \substr($limit, 0, -1)); default: diff --git a/src/core/etl/src/Flow/ETL/Pipeline/BatchingPipeline.php b/src/core/etl/src/Flow/ETL/Pipeline/BatchingPipeline.php index a81b07dd4..e2afd9c77 100644 --- a/src/core/etl/src/Flow/ETL/Pipeline/BatchingPipeline.php +++ b/src/core/etl/src/Flow/ETL/Pipeline/BatchingPipeline.php @@ -22,11 +22,6 @@ public function __construct(private readonly Pipeline $pipeline, private readonl { $this->nextPipeline = $pipeline->cleanCopy(); - /** - * @psalm-suppress DocblockTypeContradiction - * - * @phpstan-ignore-next-line - */ if ($this->size <= 0) { throw new InvalidArgumentException('Batch size must be greater than 0, given: ' . $this->size); } diff --git a/src/core/etl/src/Flow/ETL/Rows.php b/src/core/etl/src/Flow/ETL/Rows.php index 46e6a4afa..fffd48a6a 100644 --- a/src/core/etl/src/Flow/ETL/Rows.php +++ b/src/core/etl/src/Flow/ETL/Rows.php @@ -503,7 +503,6 @@ public function merge(self $rows) : self */ public function offsetExists($offset) : bool { - /** @psalm-suppress DocblockTypeContradiction */ if (!\is_int($offset)) { throw new InvalidArgumentException('Rows accepts only integer offsets'); } diff --git a/src/core/etl/tests/Flow/ETL/Tests/Integration/DataFrame/SortTest.php b/src/core/etl/tests/Flow/ETL/Tests/Integration/DataFrame/SortTest.php index ffba57d0b..9ee3b1213 100644 --- a/src/core/etl/tests/Flow/ETL/Tests/Integration/DataFrame/SortTest.php +++ b/src/core/etl/tests/Flow/ETL/Tests/Integration/DataFrame/SortTest.php @@ -31,15 +31,8 @@ public function test_etl_sort_at_disk_in_memory() : void $cache = \array_diff(\scandir($this->cacheDir), ['..', '.']); self::assertEmpty($cache); - // 50 initial writes - // 2500 single row writes - // 50 merged writes - self::assertSame(2600, $cacheSpy->writes()); - // 1 main cache - // 50 tmp caches - // 1 sorted cache - // 1 extracted cache - self::assertSame(52, $cacheSpy->clears()); + self::assertSame(2506, $cacheSpy->writes()); + self::assertSame(5, $cacheSpy->clears()); } public function test_etl_sort_by_in_memory() : void @@ -60,6 +53,6 @@ public function test_etl_sort_by_in_memory() : void self::assertEmpty($cache); self::assertSame(\range(0, 39), $rows->reduceToArray('int')); - self::assertSame(20, $cacheSpy->writes()); + self::assertSame(1, $cacheSpy->writes()); } } diff --git a/src/core/etl/tests/Flow/ETL/Tests/Integration/Window/WindowFunctionsTest.php b/src/core/etl/tests/Flow/ETL/Tests/Integration/Window/WindowFunctionsTest.php index 1ce7c7ba6..3c625c1d1 100644 --- a/src/core/etl/tests/Flow/ETL/Tests/Integration/Window/WindowFunctionsTest.php +++ b/src/core/etl/tests/Flow/ETL/Tests/Integration/Window/WindowFunctionsTest.php @@ -31,15 +31,13 @@ public function test_rank_on_partitioned_window() : void ->sortBy(ref('department'), ref('rank')) ->get(); - self::assertSame( + self::assertEquals( [ [ ['id' => 5, 'name' => 'Jane', 'department' => 'Finances', 'salary' => 14_000, 'rank' => 1], ['id' => 3, 'name' => 'Tomas', 'department' => 'Finances', 'salary' => 11_000, 'rank' => 2], ['id' => 4, 'name' => 'John', 'department' => 'Finances', 'salary' => 9000, 'rank' => 3], ['id' => 6, 'name' => 'Janet', 'department' => 'Finances', 'salary' => 4000, 'rank' => 4], - ], - [ ['id' => 1, 'name' => 'Greg', 'department' => 'IT', 'salary' => 6000, 'rank' => 1], ['id' => 2, 'name' => 'Michal', 'department' => 'IT', 'salary' => 5000, 'rank' => 2], ], diff --git a/src/lib/doctrine-dbal-bulk/src/Flow/Doctrine/Bulk/BulkData.php b/src/lib/doctrine-dbal-bulk/src/Flow/Doctrine/Bulk/BulkData.php index a66478557..5190edb42 100644 --- a/src/lib/doctrine-dbal-bulk/src/Flow/Doctrine/Bulk/BulkData.php +++ b/src/lib/doctrine-dbal-bulk/src/Flow/Doctrine/Bulk/BulkData.php @@ -27,7 +27,6 @@ public function __construct(array $rows) $firstRow = \reset($rows); - /** @psalm-suppress DocblockTypeContradiction */ if (!\is_array($firstRow)) { throw new RuntimeException('Each row must be an array'); } @@ -35,7 +34,6 @@ public function __construct(array $rows) $columns = \array_keys($firstRow); foreach ($rows as $row) { - /** @psalm-suppress DocblockTypeContradiction */ if (!\is_array($row)) { throw new RuntimeException('Each row must be an array'); } diff --git a/src/lib/parquet/src/Flow/Parquet/ParquetFile/ColumnChunkReader/WholeChunkReader.php b/src/lib/parquet/src/Flow/Parquet/ParquetFile/ColumnChunkReader/WholeChunkReader.php index e94e9ed75..41f7fbf1f 100644 --- a/src/lib/parquet/src/Flow/Parquet/ParquetFile/ColumnChunkReader/WholeChunkReader.php +++ b/src/lib/parquet/src/Flow/Parquet/ParquetFile/ColumnChunkReader/WholeChunkReader.php @@ -91,8 +91,6 @@ public function read(ColumnChunk $columnChunk, FlatColumn $column, $stream) : \G /** * @param resource $stream - * - * @psalm-suppress DocblockTypeContradiction */ private function readHeader($stream, int $pageOffset) : ?PageHeader { diff --git a/src/lib/parquet/src/Flow/Parquet/ParquetFile/ColumnChunkViewer/WholeChunkViewer.php b/src/lib/parquet/src/Flow/Parquet/ParquetFile/ColumnChunkViewer/WholeChunkViewer.php index 3c95722ae..61975617c 100644 --- a/src/lib/parquet/src/Flow/Parquet/ParquetFile/ColumnChunkViewer/WholeChunkViewer.php +++ b/src/lib/parquet/src/Flow/Parquet/ParquetFile/ColumnChunkViewer/WholeChunkViewer.php @@ -52,8 +52,6 @@ public function view(ColumnChunk $columnChunk, FlatColumn $column, $stream) : \G /** * @param resource $stream - * - * @psalm-suppress DocblockTypeContradiction */ private function readHeader($stream, int $pageOffset) : ?PageHeader { diff --git a/src/lib/parquet/src/Flow/Parquet/ParquetFile/Page/Header/DataPageHeaderV2.php b/src/lib/parquet/src/Flow/Parquet/ParquetFile/Page/Header/DataPageHeaderV2.php index 4f15d797c..434aa1f27 100644 --- a/src/lib/parquet/src/Flow/Parquet/ParquetFile/Page/Header/DataPageHeaderV2.php +++ b/src/lib/parquet/src/Flow/Parquet/ParquetFile/Page/Header/DataPageHeaderV2.php @@ -27,7 +27,6 @@ public function __construct( public static function fromThrift(\Flow\Parquet\Thrift\DataPageHeaderV2 $thrift) : self { - /** @psalm-suppress DocblockTypeContradiction */ return new self( $thrift->num_values, $thrift->num_nulls, @@ -37,7 +36,6 @@ public static function fromThrift(\Flow\Parquet\Thrift\DataPageHeaderV2 $thrift) $thrift->repetition_levels_byte_length, /** @phpstan-ignore-next-line */ $thrift->is_compressed ?? null, - /** @phpstan-ignore-next-line */ $thrift->statistics ? Statistics::fromThrift($thrift->statistics) : null ); } diff --git a/src/lib/parquet/src/Flow/Parquet/ParquetFile/Page/PageHeader.php b/src/lib/parquet/src/Flow/Parquet/ParquetFile/Page/PageHeader.php index 20cbc5129..95afe4177 100644 --- a/src/lib/parquet/src/Flow/Parquet/ParquetFile/Page/PageHeader.php +++ b/src/lib/parquet/src/Flow/Parquet/ParquetFile/Page/PageHeader.php @@ -22,9 +22,6 @@ public function __construct( ) { } - /** - * @psalm-suppress DocblockTypeContradiction - */ public static function fromThrift(\Flow\Parquet\Thrift\PageHeader $thrift) : self { return new self( diff --git a/src/lib/parquet/src/Flow/Parquet/ParquetFile/RowGroup/ColumnChunk.php b/src/lib/parquet/src/Flow/Parquet/ParquetFile/RowGroup/ColumnChunk.php index 066042d11..4e610cb16 100644 --- a/src/lib/parquet/src/Flow/Parquet/ParquetFile/RowGroup/ColumnChunk.php +++ b/src/lib/parquet/src/Flow/Parquet/ParquetFile/RowGroup/ColumnChunk.php @@ -40,7 +40,6 @@ public function __construct( } /** - * @psalm-suppress DocblockTypeContradiction * @psalm-suppress RedundantConditionGivenDocblockType */ public static function fromThrift(\Flow\Parquet\Thrift\ColumnChunk $thrift) : self @@ -57,7 +56,6 @@ public static function fromThrift(\Flow\Parquet\Thrift\ColumnChunk $thrift) : se $thrift->meta_data->dictionary_page_offset, $thrift->meta_data->data_page_offset, $thrift->meta_data->index_page_offset, - /** @phpstan-ignore-next-line */ $thrift->meta_data->statistics ? Statistics::fromThrift($thrift->meta_data->statistics) : null, ); } diff --git a/src/lib/parquet/src/Flow/Parquet/ParquetFile/Schema/LogicalType.php b/src/lib/parquet/src/Flow/Parquet/ParquetFile/Schema/LogicalType.php index 083fb908a..c871c065d 100644 --- a/src/lib/parquet/src/Flow/Parquet/ParquetFile/Schema/LogicalType.php +++ b/src/lib/parquet/src/Flow/Parquet/ParquetFile/Schema/LogicalType.php @@ -65,7 +65,6 @@ public static function enum() : self } /** - * @psalm-suppress DocblockTypeContradiction * @psalm-suppress RedundantConditionGivenDocblockType */ public static function fromThrift(\Flow\Parquet\Thrift\LogicalType $logicalType) : self diff --git a/src/lib/parquet/src/Flow/Parquet/ParquetFile/Schema/NestedColumn.php b/src/lib/parquet/src/Flow/Parquet/ParquetFile/Schema/NestedColumn.php index e6024daa6..c20a5538a 100644 --- a/src/lib/parquet/src/Flow/Parquet/ParquetFile/Schema/NestedColumn.php +++ b/src/lib/parquet/src/Flow/Parquet/ParquetFile/Schema/NestedColumn.php @@ -38,7 +38,6 @@ public static function create(string $name, array $columns) : self } /** - * @psalm-suppress DocblockTypeContradiction * @psalm-suppress RedundantConditionGivenDocblockType * * @param array $children @@ -50,7 +49,6 @@ public static function fromThrift(SchemaElement $schemaElement, array $children) $schemaElement->repetition_type ? Repetition::from($schemaElement->repetition_type) : null, $children, $schemaElement->converted_type ? ConvertedType::from($schemaElement->converted_type) : null, - /** @phpstan-ignore-next-line */ $schemaElement->logicalType ? LogicalType::fromThrift($schemaElement->logicalType) : null ); } diff --git a/src/lib/parquet/src/Flow/Parquet/Reader.php b/src/lib/parquet/src/Flow/Parquet/Reader.php index 6ef6b01ba..f73cb8adb 100644 --- a/src/lib/parquet/src/Flow/Parquet/Reader.php +++ b/src/lib/parquet/src/Flow/Parquet/Reader.php @@ -41,7 +41,6 @@ public function read(string $path) : ParquetFile */ public function readStream($stream) : ParquetFile { - /** @psalm-suppress DocblockTypeContradiction */ if (!\is_resource($stream)) { throw new InvalidArgumentException('Given argument is not a valid resource'); } diff --git a/src/lib/parquet/src/Flow/Parquet/Writer.php b/src/lib/parquet/src/Flow/Parquet/Writer.php index 828c75d18..6478bd087 100644 --- a/src/lib/parquet/src/Flow/Parquet/Writer.php +++ b/src/lib/parquet/src/Flow/Parquet/Writer.php @@ -130,7 +130,6 @@ public function open(string $path, Schema $schema) : void */ public function openForStream($resource, Schema $schema) : void { - /** @psalm-suppress DocblockTypeContradiction */ if (!\is_resource($resource)) { throw new InvalidArgumentException('Given argument is not a valid resource'); } @@ -227,7 +226,6 @@ public function reopenForStream($resource) : void throw new RuntimeException('Writer is already open'); } - /** @psalm-suppress DocblockTypeContradiction */ if (!\is_resource($resource)) { throw new InvalidArgumentException('Given argument is not a valid resource'); }