diff --git a/src/Connector/Contract/Queue.php b/src/Connector/Contract/Queue.php index 0387b4f..adad9d7 100644 --- a/src/Connector/Contract/Queue.php +++ b/src/Connector/Contract/Queue.php @@ -13,6 +13,11 @@ interface Queue */ public function queueMessage(string $queue, array $message) : void; + /** + * @param string[][] $messages + */ + public function queueBatch(string $queue, array $messages) : void; + public function consume(string $queue, callable $callback, callable $idleCallback) : void; public function acknowledge(string $queue, Message $message) : void; diff --git a/src/Connector/RabbitMQ.php b/src/Connector/RabbitMQ.php index bd64e5c..cf313ab 100644 --- a/src/Connector/RabbitMQ.php +++ b/src/Connector/RabbitMQ.php @@ -47,6 +47,25 @@ public function queueMessage(string $queue, array $message) : void ); } + /** + * @param string[][] $messages + */ + public function queueBatch(string $queue, array $messages) : void + { + $this->connectToChannel(); + + foreach ($messages as $message) { + $this->channel->batch_basic_publish( + new AMQPMessage(json_encode($message)), + $queue + ); + } + + $this->channel->publish_batch(); + + $this->channel->confirm_select(); + } + public function consume(string $queue, callable $callback, callable $idleCallback) : void { $this->connectToChannel(); diff --git a/src/Connector/SQS.php b/src/Connector/SQS.php index 8ab8875..6724bf6 100644 --- a/src/Connector/SQS.php +++ b/src/Connector/SQS.php @@ -31,10 +31,24 @@ class SQS extends SqsClient implements Queue */ public function queueMessage(string $queue, array $message) : void { - $this->sendMessage([ + $this->sendMessage( + ['QueueUrl' => $queue] + $this->wrapMessageInSQSFormat($message) + ); + } + + /** + * @param string[][] $messages + */ + public function queueBatch(string $queue, array $messages) : void + { + $batchedMessages = []; + foreach ($messages as $index => $message) { + $batchedMessages[] = $this->wrapMessageInSQSFormat($message, $index); + } + + $this->sendMessageBatch([ 'QueueUrl' => $queue, - 'MessageBody' => json_encode($message), - 'MessageAttributes' => $this->getMessageAttributes($message), + 'Entries' => $batchedMessages, ]); } @@ -95,6 +109,20 @@ public function setQueueOptions(array $queueOptions) : void } } + /** + * @param string[] $message + * + * @return string[][] + */ + private function wrapMessageInSQSFormat(array $message, ?int $idIndex = null) : array + { + return [ + 'Id' => $idIndex, + 'MessageBody' => json_encode($message), + 'MessageAttributes' => $this->getMessageAttributes($message), + ]; + } + /** * @param string[] $message * diff --git a/tests/Functional/Connector/RabbitMQTest.php b/tests/Functional/Connector/RabbitMQTest.php index 6369fac..d7e2e47 100644 --- a/tests/Functional/Connector/RabbitMQTest.php +++ b/tests/Functional/Connector/RabbitMQTest.php @@ -16,6 +16,8 @@ class RabbitMQTest extends RabbitMQTestCase private const DEAD_LETTER_QUEUE_NAME = 'dead_letter'; + private const NUMBER_OF_MESSAGES_IN_BATCH = 10; + public function setUp() : void { $this->rabbitMq = new RabbitMQ( @@ -26,6 +28,9 @@ public function setUp() : void ); $this->channel = $this->rabbitMq->channel(); + + $this->cleanUpMessages(self::QUEUE_NAME); + $this->cleanUpMessages(self::DEAD_LETTER_QUEUE_NAME); } public function testWeCanConnectToTheRabbitMQServer() : void @@ -56,10 +61,12 @@ public function testWeCanRejectAMessageAndSendItToTheDeadLetterQueue() : void public function testWeCanCallTheCallbackFunctionWhenWeHaveAMessage() : void { - $this->rabbitMq->setQueueOptions([ - 'blockingConsumer' => false, - 'non-existing-option' => true, - ]); + $this->rabbitMq->setQueueOptions( + [ + 'blockingConsumer' => false, + 'non-existing-option' => true, + ] + ); $this->addMessageToQueue(); @@ -75,8 +82,35 @@ function () : void { ); } + public function testWeCanBatchMultipleMessagesAndTheyAppearInTheQueue() : void + { + $numberOfMessagesToTestWith = self::NUMBER_OF_MESSAGES_IN_BATCH; + + $this->addMultipleMessagesToQueueAsBatch($numberOfMessagesToTestWith); + + while ($numberOfMessagesToTestWith-- > 0) { + $this->assertQueueHasAMessage(self::QUEUE_NAME); + } + + $this->assertQueueIsEmpty(self::QUEUE_NAME); + } + private function addMessageToQueue() : void { $this->rabbitMq->queueMessage(self::QUEUE_NAME, ['example' => 'test']); } + + private function addMultipleMessagesToQueueAsBatch(int $numberOfMessagesInBatch) : void + { + $messageBatch = []; + + while ($numberOfMessagesInBatch-- > 0) { + $messageBatch[] = [ + 'example' => 'test', + 'number' => $numberOfMessagesInBatch, + ]; + } + + $this->rabbitMq->queueBatch(self::QUEUE_NAME, $messageBatch); + } } diff --git a/tests/Functional/Connector/SQSTest.php b/tests/Functional/Connector/SQSTest.php index f6457c5..abf1409 100644 --- a/tests/Functional/Connector/SQSTest.php +++ b/tests/Functional/Connector/SQSTest.php @@ -9,11 +9,10 @@ use SykesCottages\Qu\Message\Contract\Message; use SykesCottages\Qu\Message\SQSMessage; use Tests\Functional\Connector\Stubs\SQSCallable; -use Tests\Functional\FunctionalTestCase; -use function current; +use Tests\Functional\SQSTestCase; use function getenv; -class SQSTest extends FunctionalTestCase +class SQSTest extends SQSTestCase { private const DEFAULT_NUMBER_OF_URLS = 2; @@ -23,36 +22,35 @@ class SQSTest extends FunctionalTestCase private const QUEUE_NAME = 'test'; - /** @var SQS */ - private $sqs; - /** @var string */ - private $testingQueueUrl; + private const NUMBER_OF_MESSAGES_IN_BATCH = 10; public function setUp() : void { - $this->sqs = new SQS([ - 'service' => 'sqs', - 'endpoint' => getenv('SQS_ENDPOINT'), - 'region' => 'elasticmq', - 'credentials' => [ - 'key' => 'X', - 'secret' => 'X', - ], - 'version' => '2012-11-05', - 'exception_class' => 'Aws\Exception\AwsException', - ]); + $this->sqs = new SQS( + [ + 'service' => 'sqs', + 'endpoint' => getenv('SQS_ENDPOINT'), + 'region' => 'elasticmq', + 'credentials' => [ + 'key' => 'X', + 'secret' => 'X', + ], + 'version' => '2012-11-05', + 'exception_class' => 'Aws\Exception\AwsException', + ] + ); $this->testingQueueUrl = getenv('SQS_ENDPOINT') . '/queue/' . self::QUEUE_NAME; - $this->sqs->setQueueOptions([ - 'blockingConsumer' => false, - 'pollTime' => 0, - 'non-existing-option' => true, - ]); + $this->sqs->setQueueOptions( + [ + 'blockingConsumer' => false, + 'pollTime' => 0, + 'non-existing-option' => true, + ] + ); - $this->sqs->purgeQueue([ - 'QueueUrl' => $this->testingQueueUrl, - ]); + $this->purgeQueue(); } public function testWeCanConnectToSQSAndReturnAListOfQueueUrls() : void @@ -68,30 +66,20 @@ public function testWeCanAcknowledgeAMessageInTheQueue() : void { $this->addMessageToQueue(); - $messages = $this->getMessages($this->testingQueueUrl); - - $this->assertCount(1, $messages); + $this->assertQueueHasAtLeastOneMessageWithAcknowledgement(); - $this->sqs->acknowledge($this->testingQueueUrl, new SQSMessage(current($messages))); - - $this->assertEmpty($this->getMessages($this->testingQueueUrl)); + $this->assertQueueIsEmpty(); } public function testWeCanRejectAMessageInTheQueue() : void { $this->addMessageToQueue(); - $messages = $this->getMessages($this->testingQueueUrl); - - $this->assertCount(1, $messages); - - $this->sqs->reject($this->testingQueueUrl, new SQSMessage(current($messages))); - - $this->assertEmpty($this->getMessages($this->testingQueueUrl)); + $this->assertQueueHasAtLeastOneMessageAndRejectMessage(); - $messages = $this->getMessages($this->testingQueueUrl . '-deadletter'); + $this->assertQueueIsEmpty(); - $this->assertCount(1, $messages); + $this->assertDeadLetterQueueHasAtLeastOneMessage(); } public function testWeCanCallTheCallbackFunctionOnConsume() : void @@ -165,30 +153,16 @@ function () : void { ); } - private function addMultipleMessagesToQueue(int $messagesToSend) : void + public function testWeCanSendABatchOfMessagesToQueue() : void { - $countOfMessages = 0; - while ($countOfMessages < $messagesToSend) { - $this->addMessageToQueue(); - $countOfMessages++; - } - } + $numberOfMessagesToTestWith = self::NUMBER_OF_MESSAGES_IN_BATCH; - private function addMessageToQueue() : void - { - $this->sqs->queueMessage($this->testingQueueUrl, ['example' => 'test']); - } + $this->addMultipleMessagesToQueueAsBatch($numberOfMessagesToTestWith); - /** - * @return SQSMessage[] - */ - private function getMessages(string $queueUrl) : array - { - $message = $this->sqs->receiveMessage([ - 'QueueUrl' => $queueUrl, - 'WaitTimeSeconds' => 0, - ]); + while ($numberOfMessagesToTestWith-- > 0) { + $this->assertQueueHasAtLeastOneMessage(); + } - return $message->get('Messages') ?? []; + $this->assertQueueIsEmpty(); } } diff --git a/tests/Functional/RabbitMQTestCase.php b/tests/Functional/RabbitMQTestCase.php index 077cb75..510a012 100644 --- a/tests/Functional/RabbitMQTestCase.php +++ b/tests/Functional/RabbitMQTestCase.php @@ -33,6 +33,7 @@ protected function assertQueueIsEmpty(string $queueName) : void protected function assertQueueHasAMessage(string $queueName) : void { $message = $this->channel->basic_get($queueName); + $this->channel->basic_ack($message->getDeliveryTag()); $this->assertInstanceOf(AMQPMessage::class, $message); } diff --git a/tests/Functional/SQSTestCase.php b/tests/Functional/SQSTestCase.php new file mode 100644 index 0000000..c312be0 --- /dev/null +++ b/tests/Functional/SQSTestCase.php @@ -0,0 +1,104 @@ +sqs->receiveMessage( + [ + 'QueueUrl' => $queueUrl, + 'WaitTimeSeconds' => 0, + ] + ); + + return $message->get('Messages') ?? []; + } + + protected function addMultipleMessagesToQueueAsBatch(int $numberOfMessagesInBatch) : void + { + $messageBatch = []; + + while ($numberOfMessagesInBatch-- > 0) { + $messageBatch[] = [ + 'example' => 'test', + 'number' => $numberOfMessagesInBatch, + ]; + } + + $this->sqs->queueBatch($this->testingQueueUrl, $messageBatch); + } + + protected function addMultipleMessagesToQueue(int $messagesToSend) : void + { + $countOfMessages = 0; + while ($countOfMessages < $messagesToSend) { + $this->addMessageToQueue(); + $countOfMessages++; + } + } + + protected function addMessageToQueue() : void + { + $this->sqs->queueMessage($this->testingQueueUrl, ['example' => 'test']); + } + + protected function assertQueueIsEmpty() : void + { + $this->assertEmpty( + $this->getMessages($this->testingQueueUrl) + ); + } + + protected function assertQueueHasAtLeastOneMessage() : void + { + $messages = $this->getMessages($this->testingQueueUrl); + $this->assertCount(1, $messages); + } + + protected function assertDeadLetterQueueHasAtLeastOneMessage() : void + { + $messages = $this->getMessages($this->testingQueueUrl . '-deadletter'); + $this->assertCount(1, $messages); + } + + protected function assertQueueHasAtLeastOneMessageWithAcknowledgement() : void + { + $messages = $this->getMessages($this->testingQueueUrl); + $this->assertCount(1, $messages); + + $this->sqs->acknowledge($this->testingQueueUrl, new SQSMessage(current($messages))); + } + + protected function assertQueueHasAtLeastOneMessageAndRejectMessage() : void + { + $messages = $this->getMessages($this->testingQueueUrl); + $this->assertCount(1, $messages); + + $this->sqs->reject($this->testingQueueUrl, new SQSMessage(current($messages))); + } + + protected function purgeQueue() : void + { + $this->sqs->purgeQueue( + [ + 'QueueUrl' => $this->testingQueueUrl, + ] + ); + } +} diff --git a/tests/Unit/Connector/SQSTest.php b/tests/Unit/Connector/SQSTest.php index f40d0ce..0b21910 100644 --- a/tests/Unit/Connector/SQSTest.php +++ b/tests/Unit/Connector/SQSTest.php @@ -24,12 +24,14 @@ public function setUp() : void { $this->genericMessage = Mockery::mock(Message::class); - $this->sqs = new SQS([ - 'service' => 'sqs', - 'region' => 'elasticmq', - 'version' => '2012-11-05', - 'exception_class' => 'Aws\Exception\AwsException', - ]); + $this->sqs = new SQS( + [ + 'service' => 'sqs', + 'region' => 'elasticmq', + 'version' => '2012-11-05', + 'exception_class' => 'Aws\Exception\AwsException', + ] + ); } /**