Skip to content

Commit 74c38a0

Browse files
committed
add write lock for event store
1 parent 82a29b5 commit 74c38a0

File tree

4 files changed

+365
-90
lines changed

4 files changed

+365
-90
lines changed

phpstan.neon.dist

+3-1
Original file line numberDiff line numberDiff line change
@@ -5,4 +5,6 @@ parameters:
55
level: max
66
paths:
77
- src
8-
checkGenericClassInNonGenericObjectType: false
8+
ignoreErrors:
9+
-
10+
identifier: missingType.generics

src/Store/DoctrineDbalStore.php

+180-89
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,10 @@
77
use Closure;
88
use Doctrine\DBAL\Connection;
99
use Doctrine\DBAL\Exception\UniqueConstraintViolationException;
10+
use Doctrine\DBAL\Platforms\MariaDBPlatform;
11+
use Doctrine\DBAL\Platforms\MySQLPlatform;
1012
use Doctrine\DBAL\Platforms\PostgreSQLPlatform;
13+
use Doctrine\DBAL\Platforms\SQLitePlatform;
1114
use Doctrine\DBAL\Query\QueryBuilder;
1215
use Doctrine\DBAL\Schema\Schema;
1316
use Doctrine\DBAL\Types\Type;
@@ -48,12 +51,17 @@ final class DoctrineDbalStore implements Store, SubscriptionStore, DoctrineSchem
4851
*/
4952
private const MAX_UNSIGNED_SMALL_INT = 65_535;
5053

54+
/**
55+
* Default lock id for advisory lock.
56+
*/
57+
private const DEFAULT_LOCK_ID = 133742;
58+
5159
private readonly HeadersSerializer $headersSerializer;
5260

53-
/** @var array{table_name: string, aggregate_id_type: 'string'|'uuid'} */
61+
/** @var array{table_name: string, aggregate_id_type: 'string'|'uuid', locking: bool, lock_id: int, lock_timeout: int} */
5462
private readonly array $config;
5563

56-
/** @param array{table_name?: string, aggregate_id_type?: 'string'|'uuid'} $config */
64+
/** @param array{table_name?: string, aggregate_id_type?: 'string'|'uuid', locking?: bool, lock_id?: int, lock_timeout?: int} $config */
5765
public function __construct(
5866
private readonly Connection $connection,
5967
private readonly EventSerializer $eventSerializer,
@@ -65,6 +73,9 @@ public function __construct(
6573
$this->config = array_merge([
6674
'table_name' => 'eventstore',
6775
'aggregate_id_type' => 'uuid',
76+
'locking' => true,
77+
'lock_id' => self::DEFAULT_LOCK_ID,
78+
'lock_timeout' => -1,
6879
], $config);
6980
}
7081

@@ -155,115 +166,120 @@ public function save(Message ...$messages): void
155166
return;
156167
}
157168

158-
$this->connection->transactional(
159-
function (Connection $connection) use ($messages): void {
160-
/** @var array<string, int> $achievedUntilPlayhead */
161-
$achievedUntilPlayhead = [];
162-
163-
$booleanType = Type::getType(Types::BOOLEAN);
164-
$dateTimeType = Type::getType(Types::DATETIMETZ_IMMUTABLE);
165-
166-
$columns = [
167-
'aggregate',
168-
'aggregate_id',
169-
'playhead',
170-
'event',
171-
'payload',
172-
'recorded_on',
173-
'new_stream_start',
174-
'archived',
175-
'custom_headers',
176-
];
177-
178-
$columnsLength = count($columns);
179-
$batchSize = (int)floor(self::MAX_UNSIGNED_SMALL_INT / $columnsLength);
180-
$placeholder = implode(', ', array_fill(0, $columnsLength, '?'));
181-
182-
$parameters = [];
183-
$placeholders = [];
184-
/** @var array<int<0, max>, Type> $types */
185-
$types = [];
186-
$position = 0;
187-
foreach ($messages as $message) {
188-
/** @var int<0, max> $offset */
189-
$offset = $position * $columnsLength;
190-
$placeholders[] = $placeholder;
191-
192-
$data = $this->eventSerializer->serialize($message->event());
193-
194-
try {
195-
$aggregateHeader = $message->header(AggregateHeader::class);
196-
} catch (HeaderNotFound $e) {
197-
throw new MissingDataForStorage($e->name, $e);
198-
}
169+
$this->lock();
170+
try {
171+
$this->connection->transactional(
172+
function (Connection $connection) use ($messages): void {
173+
/** @var array<string, int> $achievedUntilPlayhead */
174+
$achievedUntilPlayhead = [];
175+
176+
$booleanType = Type::getType(Types::BOOLEAN);
177+
$dateTimeType = Type::getType(Types::DATETIMETZ_IMMUTABLE);
178+
179+
$columns = [
180+
'aggregate',
181+
'aggregate_id',
182+
'playhead',
183+
'event',
184+
'payload',
185+
'recorded_on',
186+
'new_stream_start',
187+
'archived',
188+
'custom_headers',
189+
];
190+
191+
$columnsLength = count($columns);
192+
$batchSize = (int)floor(self::MAX_UNSIGNED_SMALL_INT / $columnsLength);
193+
$placeholder = implode(', ', array_fill(0, $columnsLength, '?'));
199194

200-
$parameters[] = $aggregateHeader->aggregateName;
201-
$parameters[] = $aggregateHeader->aggregateId;
202-
$parameters[] = $aggregateHeader->playhead;
203-
$parameters[] = $data->name;
204-
$parameters[] = $data->payload;
195+
$parameters = [];
196+
$placeholders = [];
197+
/** @var array<int<0, max>, Type> $types */
198+
$types = [];
199+
$position = 0;
200+
foreach ($messages as $message) {
201+
/** @var int<0, max> $offset */
202+
$offset = $position * $columnsLength;
203+
$placeholders[] = $placeholder;
205204

206-
$parameters[] = $aggregateHeader->recordedOn;
207-
$types[$offset + 5] = $dateTimeType;
205+
$data = $this->eventSerializer->serialize($message->event());
208206

209-
$streamStart = $message->hasHeader(StreamStartHeader::class);
207+
try {
208+
$aggregateHeader = $message->header(AggregateHeader::class);
209+
} catch (HeaderNotFound $e) {
210+
throw new MissingDataForStorage($e->name, $e);
211+
}
210212

211-
if ($streamStart) {
212-
$key = $aggregateHeader->aggregateName . '/' . $aggregateHeader->aggregateId;
213-
$achievedUntilPlayhead[$key] = $aggregateHeader->playhead;
214-
}
213+
$parameters[] = $aggregateHeader->aggregateName;
214+
$parameters[] = $aggregateHeader->aggregateId;
215+
$parameters[] = $aggregateHeader->playhead;
216+
$parameters[] = $data->name;
217+
$parameters[] = $data->payload;
215218

216-
$parameters[] = $streamStart;
217-
$types[$offset + 6] = $booleanType;
219+
$parameters[] = $aggregateHeader->recordedOn;
220+
$types[$offset + 5] = $dateTimeType;
218221

219-
$parameters[] = $message->hasHeader(ArchivedHeader::class);
220-
$types[$offset + 7] = $booleanType;
222+
$streamStart = $message->hasHeader(StreamStartHeader::class);
221223

222-
$parameters[] = $this->headersSerializer->serialize($this->getCustomHeaders($message));
224+
if ($streamStart) {
225+
$key = $aggregateHeader->aggregateName . '/' . $aggregateHeader->aggregateId;
226+
$achievedUntilPlayhead[$key] = $aggregateHeader->playhead;
227+
}
223228

224-
$position++;
229+
$parameters[] = $streamStart;
230+
$types[$offset + 6] = $booleanType;
225231

226-
if ($position !== $batchSize) {
227-
continue;
228-
}
232+
$parameters[] = $message->hasHeader(ArchivedHeader::class);
233+
$types[$offset + 7] = $booleanType;
229234

230-
$this->executeSave($columns, $placeholders, $parameters, $types, $connection);
235+
$parameters[] = $this->headersSerializer->serialize($this->getCustomHeaders($message));
231236

232-
$parameters = [];
233-
$placeholders = [];
234-
$types = [];
237+
$position++;
235238

236-
$position = 0;
237-
}
239+
if ($position !== $batchSize) {
240+
continue;
241+
}
238242

239-
if ($position !== 0) {
240-
$this->executeSave($columns, $placeholders, $parameters, $types, $connection);
241-
}
243+
$this->executeSave($columns, $placeholders, $parameters, $types, $connection);
242244

243-
foreach ($achievedUntilPlayhead as $key => $playhead) {
244-
[$aggregateName, $aggregateId] = explode('/', $key);
245+
$parameters = [];
246+
$placeholders = [];
247+
$types = [];
245248

246-
$connection->executeStatement(
247-
sprintf(
248-
<<<'SQL'
249+
$position = 0;
250+
}
251+
252+
if ($position !== 0) {
253+
$this->executeSave($columns, $placeholders, $parameters, $types, $connection);
254+
}
255+
256+
foreach ($achievedUntilPlayhead as $key => $playhead) {
257+
[$aggregateName, $aggregateId] = explode('/', $key);
258+
259+
$connection->executeStatement(
260+
sprintf(
261+
<<<'SQL'
249262
UPDATE %s
250263
SET archived = true
251264
WHERE aggregate = :aggregate
252265
AND aggregate_id = :aggregate_id
253266
AND playhead < :playhead
254267
AND archived = false
255268
SQL,
256-
$this->config['table_name'],
257-
),
258-
[
259-
'aggregate' => $aggregateName,
260-
'aggregate_id' => $aggregateId,
261-
'playhead' => $playhead,
262-
],
263-
);
264-
}
265-
},
266-
);
269+
$this->config['table_name'],
270+
),
271+
[
272+
'aggregate' => $aggregateName,
273+
'aggregate_id' => $aggregateId,
274+
'playhead' => $playhead,
275+
],
276+
);
277+
}
278+
},
279+
);
280+
} finally {
281+
$this->unlock();
282+
}
267283
}
268284

269285
/**
@@ -423,4 +439,79 @@ private function executeSave(
423439
throw new UniqueConstraintViolation($e);
424440
}
425441
}
442+
443+
private function lock(): void
444+
{
445+
if (!$this->config['locking']) {
446+
return;
447+
}
448+
449+
$platform = $this->connection->getDatabasePlatform();
450+
451+
if ($platform instanceof PostgreSQLPlatform) {
452+
$this->connection->executeStatement(
453+
sprintf(
454+
'SELECT pg_advisory_lock(%s)',
455+
$this->config['lock_id'],
456+
),
457+
);
458+
459+
return;
460+
}
461+
462+
if ($platform instanceof MariaDBPlatform || $platform instanceof MySQLPlatform) {
463+
$this->connection->fetchAllAssociative(
464+
sprintf(
465+
'SELECT GET_LOCK("%s", %d)',
466+
$this->config['lock_id'],
467+
$this->config['lock_timeout'],
468+
),
469+
);
470+
471+
return;
472+
}
473+
474+
if ($platform instanceof SQLitePlatform) {
475+
return; // locking is not supported
476+
}
477+
478+
throw new LockingNotImplemented($platform::class);
479+
}
480+
481+
private function unlock(): void
482+
{
483+
if (!$this->config['locking']) {
484+
return;
485+
}
486+
487+
$platform = $this->connection->getDatabasePlatform();
488+
489+
if ($platform instanceof PostgreSQLPlatform) {
490+
$this->connection->executeStatement(
491+
sprintf(
492+
'SELECT pg_advisory_unlock(%s)',
493+
$this->config['lock_id'],
494+
),
495+
);
496+
497+
return;
498+
}
499+
500+
if ($platform instanceof MariaDBPlatform || $platform instanceof MySQLPlatform) {
501+
$this->connection->fetchAllAssociative(
502+
sprintf(
503+
'SELECT RELEASE_LOCK("%s")',
504+
$this->config['lock_id'],
505+
),
506+
);
507+
508+
return;
509+
}
510+
511+
if ($platform instanceof SQLitePlatform) {
512+
return; // locking is not supported
513+
}
514+
515+
throw new LockingNotImplemented($platform::class);
516+
}
426517
}

src/Store/LockingNotImplemented.php

+21
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
namespace Patchlevel\EventSourcing\Store;
6+
7+
use function sprintf;
8+
9+
final class LockingNotImplemented extends StoreException
10+
{
11+
/** @param class-string $platform */
12+
public function __construct(string $platform)
13+
{
14+
parent::__construct(
15+
sprintf(
16+
'Locking is not implemented on platform %s. Disable locking in the store options.',
17+
$platform,
18+
),
19+
);
20+
}
21+
}

0 commit comments

Comments
 (0)