forked from telephantast/bunny-transport
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathBunnyConsume.php
73 lines (61 loc) · 2.01 KB
/
BunnyConsume.php
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
<?php
declare(strict_types=1);
namespace Telephantast\BunnyTransport;
use Bunny\Message;
use Telephantast\MessageBus\Async\Consumer;
use Telephantast\MessageBus\Async\ObjectDenormalizer;
use Telephantast\MessageBus\Async\TransportConsume;
use function dump;
use function React\Async\await;
/**
 * Consume side of the Bunny transport: starts and stops queue consumers on
 * channels obtained from the connection pool.
 *
 * @api
 */
final class BunnyConsume implements TransportConsume
{
    private const DEFAULT_PREFETCH_COUNT = 1;

    private BunnyMessageDecoder $messageDecoder;

    /**
     * Teardown closure registered by runConsumer() for each consumer.
     *
     * @var \WeakMap<Consumer, \Closure(): void>
     */
    private \WeakMap $consumerToCancel;

    /**
     * @param BunnyConnectionPool $connectionPool source of the connection a channel is opened on for every consumer
     * @param ObjectDenormalizer $objectDenormalizer handed to the message decoder
     * @param int $prefetchCount value passed as `prefetchCount` to `qos()` on every consumer channel
     */
    public function __construct(
        private readonly BunnyConnectionPool $connectionPool,
        ObjectDenormalizer $objectDenormalizer,
        private readonly int $prefetchCount = self::DEFAULT_PREFETCH_COUNT,
    ) {
        $this->messageDecoder = new BunnyMessageDecoder($objectDenormalizer);
        /** @var \WeakMap<Consumer, \Closure(): void> */
        $this->consumerToCancel = new \WeakMap();
    }

    /**
     * Opens a new channel, applies the configured prefetch count, subscribes to
     * the consumer's queue and registers a teardown closure for stopConsumer().
     *
     * NOTE(review): calling this twice for the same consumer replaces the stored
     * teardown closure, so the first channel can no longer be stopped through
     * stopConsumer() — confirm callers never do that.
     *
     * @psalm-suppress MissingThrowsDocblock
     */
    public function runConsumer(Consumer $consumer): void
    {
        $channel = $this->connectionPool->get()->channel();
        $channel->qos(prefetchCount: $this->prefetchCount);

        $consumerTag = $channel->consume(
            callback: function (Message $message) use ($channel, $consumer): void {
                // Ack only after handle() has returned; if it throws, the ack is skipped.
                $consumer->handle($this->messageDecoder->decode($message));
                $channel->ack($message);
            },
            queue: $consumer->queue,
        )->consumerTag;

        $this->consumerToCancel[$consumer] = static function () use ($channel, $consumerTag): void {
            try {
                $channel->cancel($consumerTag);
            } finally {
                // Close the channel even when cancelling fails, otherwise it is leaked.
                $channel->close();
            }
        };
    }

    /**
     * Cancels the subscription started by runConsumer() for this consumer and
     * closes its channel. Does nothing when no teardown closure is registered.
     *
     * @throws \Throwable
     */
    public function stopConsumer(Consumer $consumer): void
    {
        $cancel = $this->consumerToCancel[$consumer] ?? null;

        if ($cancel === null) {
            return;
        }

        // Drop the entry before invoking the closure so that a failing teardown
        // does not leave a stale closure registered for this consumer.
        unset($this->consumerToCancel[$consumer]);
        $cancel();
    }
}