Skip to content

Commit 8b0c30f

Browse files
author
Sebastian Gronewold
committed
added configurable QOS support
1 parent ef478f1 commit 8b0c30f

File tree

5 files changed

+71
-2
lines changed

5 files changed

+71
-2
lines changed

DependencyInjection/Configuration.php

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,15 @@ public function getConfigTreeBuilder()
5656
->children()
5757
->scalarNode('connection')->defaultValue('default')->end()
5858
->scalarNode('callback')->isRequired()->end()
59+
->arrayNode('qos_options')
60+
->canBeUnset()
61+
->useAttributeAsKey('key')
62+
->children()
63+
->scalarNode('prefetch_size')->defaultValue(0)->end()
64+
->scalarNode('prefetch_count')->defaultValue(0)->end()
65+
->booleanNode('global')->defaultValue(false)->end()
66+
->end()
67+
->end()
5968
->end()
6069
->end()
6170
->end()

DependencyInjection/OldSoundRabbitMqExtension.php

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -97,8 +97,16 @@ protected function loadConsumers()
9797
$definition
9898
->addMethodCall('setExchangeOptions', array($consumer['exchange_options']))
9999
->addMethodCall('setQueueOptions', array($consumer['queue_options']))
100-
->addMethodCall('setCallback', array(array(new Reference($consumer['callback']), 'execute')))
101-
;
100+
->addMethodCall('setCallback', array(array(new Reference($consumer['callback']), 'execute')));
101+
102+
if (array_key_exists('qos_options', $consumer)) {
103+
$definition->addMethodCall('setQosOptions', array(
104+
$consumer['qos_options']['prefetch_size'],
105+
$consumer['qos_options']['prefetch_count'],
106+
$consumer['qos_options']['global']
107+
));
108+
}
109+
102110
$this->injectConnection($definition, $consumer['connection']);
103111
if ($this->collectorEnabled) {
104112
$this->injectLoggedChannel($definition, $key, $consumer['connection']);

RabbitMq/BaseConsumer.php

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,4 +71,17 @@ public function getConsumerTag()
7171
{
7272
return $this->consumerTag;
7373
}
74+
75+
/**
76+
* Sets the qos settings for the current channel
77+
* Consider that prefetchSize and global do not work with rabbitMQ version <= 8.0
78+
*
79+
* @param int $prefetchSize
80+
* @param int $prefetchCount
81+
* @param bool $global
82+
*/
83+
public function setQosOptions($prefetchSize = 0, $prefetchCount = 0, $global = false)
84+
{
85+
$this->ch->basic_qos($prefetchSize, $prefetchCount, $global);
86+
}
7487
}

Tests/DependencyInjection/Fixtures/test.yml

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,19 @@ old_sound_rabbit_mq:
8585
type: direct
8686
callback: default_anon.callback
8787

88+
qos_test_consumer:
89+
connection: foo_connection
90+
exchange_options:
91+
name: foo_exchange
92+
type: direct
93+
queue_options:
94+
name: foo_queue
95+
qos_options:
96+
prefetch_size: 1024
97+
prefetch_count: 1
98+
global: true
99+
callback: foo.callback
100+
88101
rpc_clients:
89102
foo_client:
90103
connection: foo_connection

Tests/DependencyInjection/OldSoundRabbitMqExtensionTest.php

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -201,6 +201,32 @@ public function testDefaultConsumerDefinition()
201201
$this->assertEquals('%old_sound_rabbit_mq.consumer.class%', $definition->getClass());
202202
}
203203

204+
public function testConsumerWithQosOptions()
205+
{
206+
$container = $this->getContainer('test.yml');
207+
208+
$this->assertTrue($container->has('old_sound_rabbit_mq.qos_test_consumer_consumer'));
209+
$definition = $container->getDefinition('old_sound_rabbit_mq.qos_test_consumer_consumer');
210+
$methodCalls = $definition->getMethodCalls();
211+
212+
$setQosParameters = null;
213+
foreach ($methodCalls as $methodCall) {
214+
if ($methodCall[0] === 'setQosOptions') {
215+
$setQosParameters = $methodCall[1];
216+
}
217+
}
218+
219+
$this->assertInternalType('array', $setQosParameters);
220+
$this->assertEquals(
221+
array(
222+
1024,
223+
1,
224+
true
225+
),
226+
$setQosParameters
227+
);
228+
}
229+
204230
public function testFooAnonConsumerDefinition()
205231
{
206232
$container = $this->getContainer('test.yml');

0 commit comments

Comments
 (0)