Skip to content

Commit eefea4a

Browse files
authored
Improve performance of sorting operation by reducing number of writes to cache (#1036)
* Improve performance of sorting operation by reducing number of writes to cache * CS Fixes
1 parent 68bc7a3 commit eefea4a

File tree

4 files changed

+22
-28
lines changed

4 files changed

+22
-28
lines changed

src/core/etl/src/Flow/ETL/Cache/PSRSimpleCache.php

Lines changed: 14 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -40,28 +40,26 @@ public function has(string $id) : bool
4040
public function read(string $id) : \Generator
4141
{
4242
foreach ($this->index($id) as $entry) {
43-
if ($this->cache->has($entry)) {
44-
/**
45-
* @var Rows $rows
46-
*/
47-
$rows = $this->serializer->unserialize((string) $this->cache->get($entry), Rows::class);
43+
$serializedRows = $this->cache->get($entry);
4844

49-
yield $rows;
45+
if ($serializedRows === null) {
46+
continue;
5047
}
48+
49+
/**
50+
* @var Rows $rows
51+
*/
52+
$rows = $this->serializer->unserialize((string) $serializedRows, Rows::class);
53+
54+
yield $rows;
5155
}
5256
}
5357

5458
private function addToIndex(string $indexId, string $id) : void
5559
{
56-
if (!$this->cache->has($indexId)) {
57-
$this->cache->set($indexId, [$id], $this->ttl);
58-
59-
return;
60-
}
61-
62-
/** @var array<string> $index */
60+
/** @var null|array<string> $index */
6361
$index = $this->cache->get($indexId);
64-
$this->cache->set($indexId, \array_merge($index, [$id]), $this->ttl);
62+
$this->cache->set($indexId, \array_merge($index ?? [], [$id]), $this->ttl);
6563
}
6664

6765
/**
@@ -71,13 +69,9 @@ private function addToIndex(string $indexId, string $id) : void
7169
*/
7270
private function index(string $indexId) : array
7371
{
74-
if (!$this->cache->has($indexId)) {
75-
return [];
76-
}
77-
78-
/** @var array<string> $index */
72+
/** @var null|array<string> $index */
7973
$index = $this->cache->get($indexId);
8074

81-
return $index;
75+
return $index ?? [];
8276
}
8377
}

src/core/etl/src/Flow/ETL/ExternalSort/CacheExternalSort.php

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -36,13 +36,8 @@ public function sortBy(Reference ...$refs) : Extractor
3636
/** @var int $i */
3737
foreach ($this->cache->read($this->id) as $i => $rows) {
3838
$maxRowsSize = \max($maxRowsSize, $rows->count());
39-
/** @var Rows $singleRowRows */
4039
$partId = $this->id . '_tmp_' . $i;
41-
42-
foreach ($rows->sortBy(...$refs)->chunks(1) as $singleRowRows) {
43-
$this->cache->add($partId, $singleRowRows);
44-
}
45-
40+
$this->cache->add($partId, $rows->sortBy(...$refs));
4641
$cachedPartsArray[$partId] = $this->cache->read($partId);
4742
}
4843

src/core/etl/src/Flow/ETL/ExternalSort/CachedParts.php

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,12 @@ public function createHeap(Reference ...$refs) : RowsMinHeap
3333

3434
foreach ($this->generators as $cacheId => $generator) {
3535
if ($generator->valid()) {
36-
$heap->insert(CachedRow::fromRows($generator->current(), $cacheId));
36+
/** @var Rows $rows */
37+
$rows = $generator->current();
38+
39+
foreach ($rows->chunks(1) as $chunk) {
40+
$heap->insert(CachedRow::fromRows($chunk, $cacheId));
41+
}
3742
$generator->next();
3843
}
3944
}

src/core/etl/tests/Flow/ETL/Tests/Integration/DataFrame/SortTest.php

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ public function test_etl_sort_at_disk_in_memory() : void
3131
$cache = \array_diff(\scandir($this->cacheDir), ['..', '.']);
3232

3333
self::assertEmpty($cache);
34-
self::assertSame(2506, $cacheSpy->writes());
34+
self::assertSame(9, $cacheSpy->writes());
3535
self::assertSame(5, $cacheSpy->clears());
3636
}
3737

0 commit comments

Comments
 (0)