Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add new functions to batch messages #9

Open
wants to merge 7 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions src/Connector/Contract/Queue.php
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,11 @@ interface Queue
*/
public function queueMessage(string $queue, array $message) : void;

/**
* @param string[][] $messages
*/
public function queueBatch(string $queue, array $messages) : void;

public function consume(string $queue, callable $callback, callable $idleCallback) : void;

public function acknowledge(string $queue, Message $message) : void;
Expand Down
19 changes: 19 additions & 0 deletions src/Connector/RabbitMQ.php
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,25 @@ public function queueMessage(string $queue, array $message) : void
);
}

/**
* @param string[][] $messages
*/
public function queueBatch(string $queue, array $messages) : void
{
$this->connectToChannel();

foreach ($messages as $message) {
$this->channel->batch_basic_publish(
new AMQPMessage(json_encode($message)),
$queue
);
}

$this->channel->publish_batch();

$this->channel->confirm_select();
}

public function consume(string $queue, callable $callback, callable $idleCallback) : void
{
$this->connectToChannel();
Expand Down
34 changes: 31 additions & 3 deletions src/Connector/SQS.php
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,24 @@ class SQS extends SqsClient implements Queue
*/
public function queueMessage(string $queue, array $message) : void
{
$this->sendMessage([
$this->sendMessage(
['QueueUrl' => $queue] + $this->wrapMessageInSQSFormat($message)
);
}

/**
* @param string[][] $messages
*/
public function queueBatch(string $queue, array $messages) : void
{
$batchedMessages = [];
foreach ($messages as $index => $message) {
$batchedMessages[] = $this->wrapMessageInSQSFormat($message, $index);
}

$this->sendMessageBatch([
'QueueUrl' => $queue,
'MessageBody' => json_encode($message),
'MessageAttributes' => $this->getMessageAttributes($message),
'Entries' => $batchedMessages,
]);
}

Expand Down Expand Up @@ -95,6 +109,20 @@ public function setQueueOptions(array $queueOptions) : void
}
}

/**
* @param string[] $message
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This this type hint correct?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nope :( Should be array[] - I'll fix it 👍 - Just writing some tests around this

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah yeah sorry, this one is correct, the parameters for the batch message function wasn't - it should be string[][] $messages which I've updated :)

*
* @return string[][]
*/
private function wrapMessageInSQSFormat(array $message, ?int $idIndex = null) : array
{
return [
'Id' => $idIndex,
'MessageBody' => json_encode($message),
'MessageAttributes' => $this->getMessageAttributes($message),
];
}

/**
* @param string[] $message
*
Expand Down
42 changes: 38 additions & 4 deletions tests/Functional/Connector/RabbitMQTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ class RabbitMQTest extends RabbitMQTestCase

private const DEAD_LETTER_QUEUE_NAME = 'dead_letter';

private const NUMBER_OF_MESSAGES_IN_BATCH = 10;

public function setUp() : void
{
$this->rabbitMq = new RabbitMQ(
Expand All @@ -26,6 +28,9 @@ public function setUp() : void
);

$this->channel = $this->rabbitMq->channel();

$this->cleanUpMessages(self::QUEUE_NAME);
$this->cleanUpMessages(self::DEAD_LETTER_QUEUE_NAME);
}

public function testWeCanConnectToTheRabbitMQServer() : void
Expand Down Expand Up @@ -56,10 +61,12 @@ public function testWeCanRejectAMessageAndSendItToTheDeadLetterQueue() : void

public function testWeCanCallTheCallbackFunctionWhenWeHaveAMessage() : void
{
$this->rabbitMq->setQueueOptions([
'blockingConsumer' => false,
'non-existing-option' => true,
]);
$this->rabbitMq->setQueueOptions(
[
'blockingConsumer' => false,
'non-existing-option' => true,
]
);

$this->addMessageToQueue();

Expand All @@ -75,8 +82,35 @@ function () : void {
);
}

public function testWeCanBatchMultipleMessagesAndTheyAppearInTheQueue() : void
{
$numberOfMessagesToTestWith = self::NUMBER_OF_MESSAGES_IN_BATCH;

$this->addMultipleMessagesToQueueAsBatch($numberOfMessagesToTestWith);

while ($numberOfMessagesToTestWith-- > 0) {
$this->assertQueueHasAMessage(self::QUEUE_NAME);
}

$this->assertQueueIsEmpty(self::QUEUE_NAME);
}

private function addMessageToQueue() : void
{
$this->rabbitMq->queueMessage(self::QUEUE_NAME, ['example' => 'test']);
}

private function addMultipleMessagesToQueueAsBatch(int $numberOfMessagesInBatch) : void
{
$messageBatch = [];

while ($numberOfMessagesInBatch-- > 0) {
$messageBatch[] = [
'example' => 'test',
'number' => $numberOfMessagesInBatch,
];
}

$this->rabbitMq->queueBatch(self::QUEUE_NAME, $messageBatch);
}
}
98 changes: 36 additions & 62 deletions tests/Functional/Connector/SQSTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,10 @@
use SykesCottages\Qu\Message\Contract\Message;
use SykesCottages\Qu\Message\SQSMessage;
use Tests\Functional\Connector\Stubs\SQSCallable;
use Tests\Functional\FunctionalTestCase;
use function current;
use Tests\Functional\SQSTestCase;
use function getenv;

class SQSTest extends FunctionalTestCase
class SQSTest extends SQSTestCase
{
private const DEFAULT_NUMBER_OF_URLS = 2;

Expand All @@ -23,36 +22,35 @@ class SQSTest extends FunctionalTestCase

private const QUEUE_NAME = 'test';

/** @var SQS */
private $sqs;
/** @var string */
private $testingQueueUrl;
private const NUMBER_OF_MESSAGES_IN_BATCH = 10;

public function setUp() : void
{
$this->sqs = new SQS([
'service' => 'sqs',
'endpoint' => getenv('SQS_ENDPOINT'),
'region' => 'elasticmq',
'credentials' => [
'key' => 'X',
'secret' => 'X',
],
'version' => '2012-11-05',
'exception_class' => 'Aws\Exception\AwsException',
]);
$this->sqs = new SQS(
[
'service' => 'sqs',
'endpoint' => getenv('SQS_ENDPOINT'),
'region' => 'elasticmq',
'credentials' => [
'key' => 'X',
'secret' => 'X',
],
'version' => '2012-11-05',
'exception_class' => 'Aws\Exception\AwsException',
]
);

$this->testingQueueUrl = getenv('SQS_ENDPOINT') . '/queue/' . self::QUEUE_NAME;

$this->sqs->setQueueOptions([
'blockingConsumer' => false,
'pollTime' => 0,
'non-existing-option' => true,
]);
$this->sqs->setQueueOptions(
[
'blockingConsumer' => false,
'pollTime' => 0,
'non-existing-option' => true,
]
);

$this->sqs->purgeQueue([
'QueueUrl' => $this->testingQueueUrl,
]);
$this->purgeQueue();
}

public function testWeCanConnectToSQSAndReturnAListOfQueueUrls() : void
Expand All @@ -68,30 +66,20 @@ public function testWeCanAcknowledgeAMessageInTheQueue() : void
{
$this->addMessageToQueue();

$messages = $this->getMessages($this->testingQueueUrl);

$this->assertCount(1, $messages);
$this->assertQueueHasAtLeastOneMessageWithAcknowledgement();

$this->sqs->acknowledge($this->testingQueueUrl, new SQSMessage(current($messages)));

$this->assertEmpty($this->getMessages($this->testingQueueUrl));
$this->assertQueueIsEmpty();
}

public function testWeCanRejectAMessageInTheQueue() : void
{
$this->addMessageToQueue();

$messages = $this->getMessages($this->testingQueueUrl);

$this->assertCount(1, $messages);

$this->sqs->reject($this->testingQueueUrl, new SQSMessage(current($messages)));

$this->assertEmpty($this->getMessages($this->testingQueueUrl));
$this->assertQueueHasAtLeastOneMessageAndRejectMessage();

$messages = $this->getMessages($this->testingQueueUrl . '-deadletter');
$this->assertQueueIsEmpty();

$this->assertCount(1, $messages);
$this->assertDeadLetterQueueHasAtLeastOneMessage();
}

public function testWeCanCallTheCallbackFunctionOnConsume() : void
Expand Down Expand Up @@ -165,30 +153,16 @@ function () : void {
);
}

private function addMultipleMessagesToQueue(int $messagesToSend) : void
public function testWeCanSendABatchOfMessagesToQueue() : void
{
$countOfMessages = 0;
while ($countOfMessages < $messagesToSend) {
$this->addMessageToQueue();
$countOfMessages++;
}
}
$numberOfMessagesToTestWith = self::NUMBER_OF_MESSAGES_IN_BATCH;

private function addMessageToQueue() : void
{
$this->sqs->queueMessage($this->testingQueueUrl, ['example' => 'test']);
}
$this->addMultipleMessagesToQueueAsBatch($numberOfMessagesToTestWith);

/**
* @return SQSMessage[]
*/
private function getMessages(string $queueUrl) : array
{
$message = $this->sqs->receiveMessage([
'QueueUrl' => $queueUrl,
'WaitTimeSeconds' => 0,
]);
while ($numberOfMessagesToTestWith-- > 0) {
$this->assertQueueHasAtLeastOneMessage();
}

return $message->get('Messages') ?? [];
$this->assertQueueIsEmpty();
}
}
1 change: 1 addition & 0 deletions tests/Functional/RabbitMQTestCase.php
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ protected function assertQueueIsEmpty(string $queueName) : void
protected function assertQueueHasAMessage(string $queueName) : void
{
$message = $this->channel->basic_get($queueName);
$this->channel->basic_ack($message->getDeliveryTag());
$this->assertInstanceOf(AMQPMessage::class, $message);
}

Expand Down
Loading