|
| 1 | +from __future__ import absolute_import, unicode_literals |
| 2 | + |
| 3 | +from case import Mock, ANY |
| 4 | +import pytest |
| 5 | +import amqp |
| 6 | + |
| 7 | + |
| 8 | +@pytest.mark.env('rabbitmq') |
| 9 | +def test_connect(): |
| 10 | + connection = amqp.Connection() |
| 11 | + connection.connect() |
| 12 | + connection.close() |
| 13 | + |
| 14 | + |
| 15 | +@pytest.mark.env('rabbitmq') |
| 16 | +class test_rabbitmq_operations(): |
| 17 | + |
| 18 | + @pytest.fixture(autouse=True) |
| 19 | + def setup_conn(self): |
| 20 | + self.connection = amqp.Connection() |
| 21 | + self.connection.connect() |
| 22 | + self.channel = self.connection.channel() |
| 23 | + yield |
| 24 | + self.connection.close() |
| 25 | + |
| 26 | + @pytest.mark.parametrize( |
| 27 | + "publish_method", ('basic_publish', 'basic_publish_confirm') |
| 28 | + ) |
| 29 | + def test_publish_consume(self, publish_method): |
| 30 | + callback = Mock() |
| 31 | + self.channel.queue_declare( |
| 32 | + queue='py-amqp-unittest', durable=False, exclusive=True |
| 33 | + ) |
| 34 | + getattr(self.channel, publish_method)( |
| 35 | + amqp.Message('Unittest'), routing_key='py-amqp-unittest' |
| 36 | + ) |
| 37 | + self.channel.basic_consume( |
| 38 | + queue='py-amqp-unittest', |
| 39 | + callback=callback, |
| 40 | + consumer_tag='amq.ctag-PCmzXGkhCw_v0Zq7jXyvkg' |
| 41 | + ) |
| 42 | + self.connection.drain_events() |
| 43 | + callback.assert_called_once_with(ANY) |
| 44 | + msg = callback.call_args[0][0] |
| 45 | + assert isinstance(msg, amqp.Message) |
| 46 | + assert msg.body_size == len('Unittest') |
| 47 | + assert msg.body == 'Unittest' |
| 48 | + assert msg.frame_method == amqp.spec.Basic.Deliver |
| 49 | + assert msg.delivery_tag == 1 |
| 50 | + assert msg.ready is True |
| 51 | + assert msg.delivery_info == { |
| 52 | + 'consumer_tag': 'amq.ctag-PCmzXGkhCw_v0Zq7jXyvkg', |
| 53 | + 'delivery_tag': 1, |
| 54 | + 'redelivered': False, |
| 55 | + 'exchange': '', |
| 56 | + 'routing_key': 'py-amqp-unittest' |
| 57 | + } |
| 58 | + assert msg.properties == {'content_encoding': 'utf-8'} |
| 59 | + |
| 60 | + self.channel.basic_ack(msg.delivery_tag) |
| 61 | + |
| 62 | + def test_publish_get(self): |
| 63 | + self.channel.queue_declare( |
| 64 | + queue='py-amqp-unittest', durable=False, exclusive=True |
| 65 | + ) |
| 66 | + self.channel.basic_publish( |
| 67 | + amqp.Message('Unittest'), routing_key='py-amqp-unittest' |
| 68 | + ) |
| 69 | + msg = self.channel.basic_get( |
| 70 | + queue='py-amqp-unittest', |
| 71 | + ) |
| 72 | + assert msg.body_size == 8 |
| 73 | + assert msg.body == 'Unittest' |
| 74 | + assert msg.frame_method == amqp.spec.Basic.GetOk |
| 75 | + assert msg.delivery_tag == 1 |
| 76 | + assert msg.ready is True |
| 77 | + assert msg.delivery_info == { |
| 78 | + 'delivery_tag': 1, 'redelivered': False, |
| 79 | + 'exchange': '', |
| 80 | + 'routing_key': 'py-amqp-unittest', 'message_count': 0 |
| 81 | + } |
| 82 | + assert msg.properties == { |
| 83 | + 'content_encoding': 'utf-8' |
| 84 | + } |
| 85 | + |
| 86 | + self.channel.basic_ack(msg.delivery_tag) |
| 87 | + |
| 88 | + msg = self.channel.basic_get( |
| 89 | + queue='py-amqp-unittest', |
| 90 | + ) |
| 91 | + assert msg is None |
0 commit comments