Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce Envelope #185

Merged
merged 6 commits into from
Jan 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,9 @@
"rector/rector": "^0.19.0",
"roave/infection-static-analysis-plugin": "^1.16",
"spatie/phpunit-watcher": "^1.23",
"yiisoft/yii-debug": "dev-master",
"vimeo/psalm": "^4.30|^5.8",
"yiisoft/test-support": "^3.0"
"yiisoft/test-support": "^3.0",
"yiisoft/yii-debug": "dev-master|dev-php80"
},
"suggest": {
"ext-pcntl": "Need for process signals"
Expand Down
4 changes: 2 additions & 2 deletions src/Adapter/AdapterInterface.php
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,12 @@ public function runExisting(callable $handlerCallback): void;
*
* @return JobStatus
*/
public function status(string $id): JobStatus;
public function status(string|int $id): JobStatus;

/**
* Pushing a message to the queue. Adapter sets message ID if available.
*/
public function push(MessageInterface $message): void;
public function push(MessageInterface $message): MessageInterface;

/**
* Listen to the queue and pass messages to the given handler as they come.
Expand Down
7 changes: 4 additions & 3 deletions src/Adapter/SynchronousAdapter.php
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
use Yiisoft\Queue\QueueFactory;
use Yiisoft\Queue\QueueInterface;
use Yiisoft\Queue\Worker\WorkerInterface;
use Yiisoft\Queue\Message\IdEnvelope;

final class SynchronousAdapter implements AdapterInterface
{
Expand Down Expand Up @@ -42,7 +43,7 @@ public function runExisting(callable $handlerCallback): void
}
}

public function status(string $id): JobStatus
public function status(string|int $id): JobStatus
{
$id = (int) $id;

Expand All @@ -61,12 +62,12 @@ public function status(string $id): JobStatus
throw new InvalidArgumentException('There is no message with the given ID.');
}

public function push(MessageInterface $message): void
public function push(MessageInterface $message): MessageInterface
{
$key = count($this->messages) + $this->current;
$this->messages[] = $message;

$message->setId((string) $key);
return new IdEnvelope($message, $key);
}

public function subscribe(callable $handlerCallback): void
Expand Down
2 changes: 1 addition & 1 deletion src/Debug/QueueDecorator.php
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,10 @@
) {
}

public function status(string $id): JobStatus
public function status(string|int $id): JobStatus
{
$result = $this->queue->status($id);
$this->collector->collectStatus($id, $result);

Check warning on line 24 in src/Debug/QueueDecorator.php

View workflow job for this annotation

GitHub Actions / mutation / PHP 8.0-ubuntu-latest

Escaped Mutant for Mutator "MethodCallRemoval": --- Original +++ New @@ @@ public function status(string|int $id) : JobStatus { $result = $this->queue->status($id); - $this->collector->collectStatus($id, $result); + return $result; } public function push(MessageInterface $message, string|array|callable|MiddlewarePushInterface ...$middlewareDefinitions) : MessageInterface

return $result;
}
Expand All @@ -31,11 +31,11 @@
string|array|callable|MiddlewarePushInterface ...$middlewareDefinitions
): MessageInterface {
$message = $this->queue->push($message, ...$middlewareDefinitions);
$this->collector->collectPush($this->queue->getChannelName(), $message, ...$middlewareDefinitions);

Check warning on line 34 in src/Debug/QueueDecorator.php

View workflow job for this annotation

GitHub Actions / mutation / PHP 8.0-ubuntu-latest

Escaped Mutant for Mutator "MethodCallRemoval": --- Original +++ New @@ @@ public function push(MessageInterface $message, string|array|callable|MiddlewarePushInterface ...$middlewareDefinitions) : MessageInterface { $message = $this->queue->push($message, ...$middlewareDefinitions); - $this->collector->collectPush($this->queue->getChannelName(), $message, ...$middlewareDefinitions); + return $message; } public function run(int $max = 0) : void
return $message;
}

public function run(int $max = 0): void

Check warning on line 38 in src/Debug/QueueDecorator.php

View workflow job for this annotation

GitHub Actions / mutation / PHP 8.0-ubuntu-latest

Escaped Mutant for Mutator "DecrementInteger": --- Original +++ New @@ @@ $this->collector->collectPush($this->queue->getChannelName(), $message, ...$middlewareDefinitions); return $message; } - public function run(int $max = 0) : void + public function run(int $max = -1) : void { $this->queue->run($max); }

Check warning on line 38 in src/Debug/QueueDecorator.php

View workflow job for this annotation

GitHub Actions / mutation / PHP 8.0-ubuntu-latest

Escaped Mutant for Mutator "IncrementInteger": --- Original +++ New @@ @@ $this->collector->collectPush($this->queue->getChannelName(), $message, ...$middlewareDefinitions); return $message; } - public function run(int $max = 0) : void + public function run(int $max = 1) : void { $this->queue->run($max); }
{
$this->queue->run($max);
}
Expand Down
3 changes: 2 additions & 1 deletion src/Exception/JobFailureException.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,14 @@
use RuntimeException;
use Throwable;
use Yiisoft\Queue\Message\MessageInterface;
use Yiisoft\Queue\Message\IdEnvelope;

class JobFailureException extends RuntimeException
{
public function __construct(private MessageInterface $queueMessage, Throwable $previous)
{
$error = $previous->getMessage();
$messageId = $queueMessage->getId() ?? 'null';
$messageId = $queueMessage->getMetadata()[IdEnvelope::MESSAGE_ID_KEY] ?? 'null';
$messageText = "Processing of message #$messageId is stopped because of an exception:\n$error.";

parent::__construct($messageText, 0, $previous);
Expand Down
15 changes: 15 additions & 0 deletions src/Message/EnvelopeInterface.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
<?php

declare(strict_types=1);

namespace Yiisoft\Queue\Message;

/**
* Envelope is a message container that adds additional metadata.
*/
interface EnvelopeInterface extends MessageInterface
xepozz marked this conversation as resolved.
Show resolved Hide resolved
{
public function getMessage(): MessageInterface;

public function withMessage(MessageInterface $message): self;
}
38 changes: 38 additions & 0 deletions src/Message/EnvelopeTrait.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
<?php

declare(strict_types=1);

namespace Yiisoft\Queue\Message;

trait EnvelopeTrait
{
private MessageInterface $message;

public function getMessage(): MessageInterface
{
return $this->message;
}

public function withMessage(MessageInterface $message): self
{
$instance = clone $this;
$instance->message = $message;

return $instance;
}

public function getHandlerName(): string
{
return $this->message->getHandlerName();
}

public function getData(): mixed
{
return $this->message->getData();
}

public function getMetadata(): array
{
return $this->message->getMetadata();
}
}
38 changes: 38 additions & 0 deletions src/Message/IdEnvelope.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
<?php

declare(strict_types=1);

namespace Yiisoft\Queue\Message;

/**
* ID envelope allows to identify a message.
*/
final class IdEnvelope implements EnvelopeInterface
xepozz marked this conversation as resolved.
Show resolved Hide resolved
{
use EnvelopeTrait;

public const MESSAGE_ID_KEY = 'yii-message-id';

public function __construct(
private MessageInterface $message,
private string|int|null $id = null,
) {
}

public function setId(string|int|null $id): void
{
$this->id = $id;
}

public function getId(): string|int|null
{
return $this->id ?? $this->message->getMetadata()[self::MESSAGE_ID_KEY] ?? null;
}

public function getMetadata(): array
{
return array_merge($this->message->getMetadata(), [
self::MESSAGE_ID_KEY => $this->getId(),
]);
}
}
11 changes: 0 additions & 11 deletions src/Message/Message.php
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ public function __construct(
private string $handlerName,
private mixed $data,
private array $metadata = [],
private ?string $id = null
) {
}

Expand All @@ -29,16 +28,6 @@ public function getData(): mixed
return $this->data;
}

public function setId(?string $id): void
{
$this->id = $id;
}

public function getId(): ?string
{
return $this->id;
}

public function getMetadata(): array
{
return $this->metadata;
Expand Down
9 changes: 0 additions & 9 deletions src/Message/MessageInterface.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,6 @@

interface MessageInterface
{
public function setId(?string $id): void;

/**
* Returns unique message ID.
*
* @return string|null
*/
public function getId(): ?string;

/**
* Returns handler name.
*
Expand Down
25 changes: 25 additions & 0 deletions src/Middleware/FailureHandling/FailureEnvelope.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
<?php

declare(strict_types=1);

namespace Yiisoft\Queue\Middleware\FailureHandling;

use Yiisoft\Queue\Message\EnvelopeInterface;
use Yiisoft\Queue\Message\EnvelopeTrait;
use Yiisoft\Queue\Message\MessageInterface;

final class FailureEnvelope implements EnvelopeInterface
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Need phpdoc.

{
use EnvelopeTrait;

public function __construct(
private MessageInterface $message,
private array $meta,
) {
}

public function getMetadata(): array
{
return array_merge($this->message->getMetadata(), $this->meta);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
use Yiisoft\Queue\Middleware\FailureHandling\MiddlewareFailureInterface;
use Yiisoft\Queue\Middleware\Push\Implementation\DelayMiddlewareInterface;
use Yiisoft\Queue\QueueInterface;
use Yiisoft\Queue\Middleware\FailureHandling\FailureEnvelope;

/**
* Failure strategy which resends the given message to a queue with an exponentially increasing delay.
Expand Down Expand Up @@ -62,15 +63,12 @@ public function processFailure(
): FailureHandlingRequest {
$message = $request->getMessage();
if ($this->suites($message)) {
$messageNew = new Message(
handlerName: $message->getHandlerName(),
data: $message->getData(),
metadata: $this->formNewMeta($message),
id: $message->getId(),
);
($this->queue ?? $request->getQueue())->push(
$messageNew,
$this->delayMiddleware->withDelay($this->getDelay($message))
$envelope = new FailureEnvelope($message, $this->createNewMeta($message));
$queue = $this->queue ?? $request->getQueue();
$middlewareDefinitions = $this->delayMiddleware->withDelay($this->getDelay($envelope));
$messageNew = $queue->push(
$envelope,
$middlewareDefinitions
);

return $request->withMessage($messageNew);
Expand All @@ -84,7 +82,7 @@ private function suites(MessageInterface $message): bool
return $this->maxAttempts > $this->getAttempts($message);
}

private function formNewMeta(MessageInterface $message): array
private function createNewMeta(MessageInterface $message): array
{
$meta = $message->getMetadata();
$meta[self::META_KEY_DELAY . "-$this->id"] = $this->getDelay($message);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@
namespace Yiisoft\Queue\Middleware\FailureHandling\Implementation;

use InvalidArgumentException;
use Yiisoft\Queue\Message\Message;
use Yiisoft\Queue\Message\MessageInterface;
use Yiisoft\Queue\Middleware\FailureHandling\FailureEnvelope;
use Yiisoft\Queue\Middleware\FailureHandling\FailureHandlingRequest;
use Yiisoft\Queue\Middleware\FailureHandling\MessageFailureHandlerInterface;
use Yiisoft\Queue\Middleware\FailureHandling\MiddlewareFailureInterface;
Expand Down Expand Up @@ -40,15 +40,11 @@ public function processFailure(
): FailureHandlingRequest {
$message = $request->getMessage();
if ($this->suites($message)) {
$message = new Message(
handlerName: $message->getHandlerName(),
data: $message->getData(),
metadata: $this->createMeta($message),
id: $message->getId(),
);
$message = $this->queue?->push($message) ?? $request->getQueue()->push($message);
$envelope = new FailureEnvelope($message, $this->createMeta($message));
$envelope = ($this->queue ?? $request->getQueue())->push($envelope);

return $request->withMessage($message)->withQueue($this->queue ?? $request->getQueue());
return $request->withMessage($envelope)
->withQueue($this->queue ?? $request->getQueue());
}

return $handler->handleFailure($request);
Expand All @@ -59,11 +55,6 @@ private function suites(MessageInterface $message): bool
return $this->getAttempts($message) < $this->maxAttempts;
}

private function getMetaKey(): string
{
return self::META_KEY_RESEND . "-$this->id";
}

private function createMeta(MessageInterface $message): array
{
$metadata = $message->getMetadata();
Expand All @@ -81,4 +72,9 @@ private function getAttempts(MessageInterface $message): int

return (int) $result;
}

private function getMetaKey(): string
{
return self::META_KEY_RESEND . "-$this->id";
}
}
6 changes: 2 additions & 4 deletions src/Middleware/Push/AdapterPushHandler.php
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,9 @@ final class AdapterPushHandler implements MessageHandlerPushInterface
{
public function handlePush(PushRequest $request): PushRequest
{
if ($request->getAdapter() === null) {
if (($adapter = $request->getAdapter()) === null) {
throw new AdapterNotConfiguredException();
}
$request->getAdapter()->push($request->getMessage());

return $request;
return $request->withMessage($adapter->push($request->getMessage()));
}
}
8 changes: 5 additions & 3 deletions src/Queue.php
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
use Yiisoft\Queue\Middleware\Push\PushMiddlewareDispatcher;
use Yiisoft\Queue\Middleware\Push\PushRequest;
use Yiisoft\Queue\Worker\WorkerInterface;
use Yiisoft\Queue\Message\IdEnvelope;

final class Queue implements QueueInterface
{
Expand Down Expand Up @@ -57,9 +58,10 @@ public function push(
->dispatch($request, $this->createPushHandler($middlewareDefinitions))
->getMessage();

$messageId = $message->getMetadata()[IdEnvelope::MESSAGE_ID_KEY] ?? 'null';
$this->logger->info(
'Pushed message with handler name "{handlerName}" to the queue. Assigned ID #{id}.',
['handlerName' => $message->getHandlerName(), 'id' => $message->getId() ?? 'null']
['handlerName' => $message->getHandlerName(), 'id' => $messageId]
);

return $message;
Expand Down Expand Up @@ -100,7 +102,7 @@ public function listen(): void
$this->logger->info('Finish listening to the queue.');
}

public function status(string $id): JobStatus
public function status(string|int $id): JobStatus
{
$this->checkAdapter();

Expand Down Expand Up @@ -159,7 +161,7 @@ private function createPushHandler(array $middlewares): MessageHandlerPushInterf
return new class (
$this->adapterPushHandler,
$this->pushMiddlewareDispatcher,
[...array_values($this->middlewareDefinitions), ...array_values($middlewares)]
array_merge($this->middlewareDefinitions, $middlewares)
) implements MessageHandlerPushInterface {
public function __construct(
private AdapterPushHandler $adapterPushHandler,
Expand Down
2 changes: 1 addition & 1 deletion src/QueueInterface.php
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ public function listen(): void;
*
* @return JobStatus
*/
public function status(string $id): JobStatus;
public function status(string|int $id): JobStatus;

public function withAdapter(AdapterInterface $adapter): self;

Expand Down
Loading
Loading