Skip to content

Commit

Permalink
add desired capabilities to sender and amqp short type (#210)
Browse files Browse the repository at this point in the history
* add desired capabilities to sender and amqp short type

* update changelog

* add test/sample code for idempotent producer

* fix typo

* more typo fix
  • Loading branch information
yunhaoling authored Feb 24, 2021
1 parent ec32de9 commit 9ecc351
Show file tree
Hide file tree
Showing 11 changed files with 165 additions and 12 deletions.
6 changes: 6 additions & 0 deletions HISTORY.rst
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,12 @@
Release History
===============

1.2.15 (Unreleased)
+++++++++++++++++++

- Added desired-capabilities for `SendClient(Async)` and `MessageSender(Async)` as part of the AMQP protocol.
- Added types for AMQPShort and AMQPuShort for explicit handling of short and unsigned short encoding.

1.2.14 (2021-02-01)
+++++++++++++++++++

Expand Down
44 changes: 42 additions & 2 deletions samples/asynctests/test_azure_event_hubs_send_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
import types

import uamqp
from uamqp import authentication
from uamqp import types as uamqp_types, utils, authentication


def get_logger(level):
Expand Down Expand Up @@ -172,6 +172,46 @@ async def fake_get_token():
assert jwt_token.hostname == b"123.45.67.89"


@pytest.mark.asyncio
async def test_event_hubs_idempotent_producer(live_eventhub_config):

uri = "sb://{}/{}".format(live_eventhub_config['hostname'], live_eventhub_config['event_hub'])
sas_auth = authentication.SASTokenAsync.from_shared_access_key(
uri, live_eventhub_config['key_name'], live_eventhub_config['access_key'])

target = "amqps://{}/{}/Partitions/0".format(live_eventhub_config['hostname'], live_eventhub_config['event_hub'])

symbol_array = [uamqp_types.AMQPSymbol(b"com.microsoft:idempotent-producer")]
desired_capabilities = utils.data_factory(uamqp_types.AMQPArray(symbol_array))

link_properties = {
uamqp_types.AMQPSymbol(b"com.microsoft:timeout"): uamqp_types.AMQPLong(int(60 * 1000))
}

def on_attach(attach_source, attach_target, properties, error):
if str(attach_target) == target:
on_attach.owner_level = properties.get(b"com.microsoft:producer-epoch")
on_attach.producer_group_id = properties.get(b"com.microsoft:producer-id")
on_attach.starting_sequence_number = properties.get(b"com.microsoft:producer-sequence-number")

send_client = uamqp.SendClientAsync(
target,
auth=sas_auth,
desired_capabilities=desired_capabilities,
link_properties=link_properties,
on_attach=on_attach,
debug=True
)
await send_client.open_async()
while not await send_client.client_ready_async():
await asyncio.sleep(0.05)

assert on_attach.owner_level is not None
assert on_attach.producer_group_id is not None
assert on_attach.starting_sequence_number is not None
await send_client.close_async()


if __name__ == '__main__':
config = {}
config['hostname'] = os.environ['EVENT_HUB_HOSTNAME']
Expand All @@ -182,4 +222,4 @@ async def fake_get_token():
config['partition'] = "0"

loop = asyncio.get_event_loop()
loop.run_until_complete(test_event_hubs_send_timeout_async(config))
loop.run_until_complete(test_event_hubs_idempotent_producer(config))
42 changes: 40 additions & 2 deletions samples/test_azure_event_hubs_send.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
import pytest

import uamqp
from uamqp import authentication
from uamqp import types as uamqp_types, utils, authentication


def get_logger(level):
Expand Down Expand Up @@ -202,6 +202,44 @@ def fake_get_token():
assert jwt_token.hostname == b"123.45.67.89"


def test_event_hubs_idempotent_producer(live_eventhub_config):

uri = "sb://{}/{}".format(live_eventhub_config['hostname'], live_eventhub_config['event_hub'])
sas_auth = authentication.SASTokenAuth.from_shared_access_key(
uri, live_eventhub_config['key_name'], live_eventhub_config['access_key'])

target = "amqps://{}/{}/Partitions/0".format(live_eventhub_config['hostname'], live_eventhub_config['event_hub'])

symbol_array = [uamqp_types.AMQPSymbol(b"com.microsoft:idempotent-producer")]
desired_capabilities = utils.data_factory(uamqp_types.AMQPArray(symbol_array))

link_properties = {
uamqp_types.AMQPSymbol(b"com.microsoft:timeout"): uamqp_types.AMQPLong(int(60 * 1000))
}

def on_attach(attach_source, attach_target, properties, error):
if str(attach_target) == target:
on_attach.owner_level = properties.get(b"com.microsoft:producer-epoch")
on_attach.producer_group_id = properties.get(b"com.microsoft:producer-id")
on_attach.starting_sequence_number = properties.get(b"com.microsoft:producer-sequence-number")

send_client = uamqp.SendClient(
target,
auth=sas_auth,
desired_capabilities=desired_capabilities,
link_properties=link_properties,
on_attach=on_attach,
debug=True
)
send_client.open()
while not send_client.client_ready():
time.sleep(0.05)
assert on_attach.owner_level is not None
assert on_attach.producer_group_id is not None
assert on_attach.starting_sequence_number is not None
send_client.close()


if __name__ == '__main__':
config = {}
config['hostname'] = os.environ['EVENT_HUB_HOSTNAME']
Expand All @@ -211,4 +249,4 @@ def fake_get_token():
config['consumer_group'] = "$Default"
config['partition'] = "0"

test_event_hubs_send_timeout_sync(config)
test_event_hubs_idempotent_producer(config)
2 changes: 1 addition & 1 deletion uamqp/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
pass # Async not supported.


__version__ = "1.2.14"
__version__ = "1.2.15"


_logger = logging.getLogger(__name__)
Expand Down
8 changes: 7 additions & 1 deletion uamqp/async_ops/client_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -424,6 +424,11 @@ class SendClientAsync(client.SendClient, AMQPClientAsync):
will assume successful receipt of the message and clear it from the queue. The
default is `PeekLock`.
:type receive_settle_mode: ~uamqp.constants.ReceiverSettleMode
:param desired_capabilities: The extension capabilities desired from the peer endpoint.
To create a desired_capabilities object, please do as follows:
- 1. Create an array of desired capability symbols: `capabilities_symbol_array = [types.AMQPSymbol(string)]`
- 2. Transform the array to AMQPValue object: `utils.data_factory(types.AMQPArray(capabilities_symbol_array))`
:type desired_capabilities: ~uamqp.c_uamqp.AMQPValue
:param max_message_size: The maximum allowed message size negotiated for the Link.
:type max_message_size: int
:param link_properties: Metadata to be sent in the Link ATTACH frame.
Expand Down Expand Up @@ -507,6 +512,7 @@ async def _client_ready_async(self):
properties=self._link_properties,
error_policy=self._error_policy,
encoding=self._encoding,
desired_capabilities=self._desired_capabilities,
loop=self.loop)
await asyncio.shield(self.message_handler.open_async(), loop=self.loop)
return False
Expand Down Expand Up @@ -723,7 +729,7 @@ class ReceiveClientAsync(client.ReceiveClient, AMQPClientAsync):
default is `PeekLock`.
:type receive_settle_mode: ~uamqp.constants.ReceiverSettleMode
:param desired_capabilities: The extension capabilities desired from the peer endpoint.
To create an desired_capabilities object, please do as follows:
To create a desired_capabilities object, please do as follows:
- 1. Create an array of desired capability symbols: `capabilities_symbol_array = [types.AMQPSymbol(string)]`
- 2. Transform the array to AMQPValue object: `utils.data_factory(types.AMQPArray(capabilities_symbol_array))`
:type desired_capabilities: ~uamqp.c_uamqp.AMQPValue
Expand Down
2 changes: 1 addition & 1 deletion uamqp/async_ops/receiver_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ class MessageReceiverAsync(receiver.MessageReceiver):
the client will not wait for confirmation and assume success.
:type send_settle_mode: ~uamqp.constants.SenderSettleMode
:param desired_capabilities: The extension capabilities desired from the peer endpoint.
To create an desired_capabilities object, please do as follows:
To create a desired_capabilities object, please do as follows:
- 1. Create an array of desired capability symbols: `capabilities_symbol_array = [types.AMQPSymbol(string)]`
- 2. Transform the array to AMQPValue object: `utils.data_factory(types.AMQPArray(capabilities_symbol_array))`
:type desired_capabilities: ~uamqp.c_uamqp.AMQPValue
Expand Down
9 changes: 8 additions & 1 deletion uamqp/async_ops/sender_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,11 @@ class MessageSenderAsync(sender.MessageSender):
will assume successful receipt of the message and clear it from the queue. The
default is `PeekLock`.
:type receive_settle_mode: ~uamqp.constants.ReceiverSettleMode
:param desired_capabilities: The extension capabilities desired from the peer endpoint.
To create a desired_capabilities object, please do as follows:
- 1. Create an array of desired capability symbols: `capabilities_symbol_array = [types.AMQPSymbol(string)]`
- 2. Transform the array to AMQPValue object: `utils.data_factory(types.AMQPArray(capabilities_symbol_array))`
:type desired_capabilities: ~uamqp.c_uamqp.AMQPValue
:param max_message_size: The maximum allowed message size negotiated for the Link.
:type max_message_size: int
:param link_credit: The sender Link credit that determines how many
Expand Down Expand Up @@ -85,6 +90,7 @@ def __init__(self, session, source, target,
error_policy=None,
debug=False,
encoding='UTF-8',
desired_capabilities=None,
loop=None):
self.loop = loop or get_running_loop()
super(MessageSenderAsync, self).__init__(
Expand All @@ -97,7 +103,8 @@ def __init__(self, session, source, target,
properties=properties,
error_policy=error_policy,
debug=debug,
encoding=encoding)
encoding=encoding,
desired_capabilities=desired_capabilities)

async def __aenter__(self):
"""Open the MessageSender in an async context manager."""
Expand Down
10 changes: 8 additions & 2 deletions uamqp/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -439,6 +439,11 @@ class SendClient(AMQPClient):
will assume successful receipt of the message and clear it from the queue. The
default is `PeekLock`.
:type receive_settle_mode: ~uamqp.constants.ReceiverSettleMode
:param desired_capabilities: The extension capabilities desired from the peer endpoint.
To create a desired_capabilities object, please do as follows:
- 1. Create an array of desired capability symbols: `capabilities_symbol_array = [types.AMQPSymbol(string)]`
- 2. Transform the array to AMQPValue object: `utils.data_factory(types.AMQPArray(capabilities_symbol_array))`
:type desired_capabilities: ~uamqp.c_uamqp.AMQPValue
:param max_message_size: The maximum allowed message size negotiated for the Link.
:type max_message_size: int
:param link_properties: Metadata to be sent in the Link ATTACH frame.
Expand Down Expand Up @@ -521,7 +526,8 @@ def _client_ready(self):
link_credit=self._link_credit,
properties=self._link_properties,
error_policy=self._error_policy,
encoding=self._encoding)
encoding=self._encoding,
desired_capabilities=self._desired_capabilities)
self.message_handler.open()
return False
if self.message_handler.get_state() == constants.MessageSenderState.Error:
Expand Down Expand Up @@ -832,7 +838,7 @@ class ReceiveClient(AMQPClient):
default is `PeekLock`.
:type receive_settle_mode: ~uamqp.constants.ReceiverSettleMode
:param desired_capabilities: The extension capabilities desired from the peer endpoint.
To create an desired_capabilities object, please do as follows:
To create a desired_capabilities object, please do as follows:
- 1. Create an array of desired capability symbols: `capabilities_symbol_array = [types.AMQPSymbol(string)]`
- 2. Transform the array to AMQPValue object: `utils.data_factory(types.AMQPArray(capabilities_symbol_array))`
:type desired_capabilities: ~uamqp.c_uamqp.AMQPValue
Expand Down
2 changes: 1 addition & 1 deletion uamqp/receiver.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ class MessageReceiver(object):
the client will not wait for confirmation and assume success.
:type send_settle_mode: ~uamqp.constants.SenderSettleMode
:param desired_capabilities: The extension capabilities desired from the peer endpoint.
To create an desired_capabilities object, please do as follows:
To create a desired_capabilities object, please do as follows:
- 1. Create an array of desired capability symbols: `capabilities_symbol_array = [types.AMQPSymbol(string)]`
- 2. Transform the array to AMQPValue object: `utils.data_factory(types.AMQPArray(capabilities_symbol_array))`
:type desired_capabilities: ~uamqp.c_uamqp.AMQPValue
Expand Down
10 changes: 9 additions & 1 deletion uamqp/sender.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,11 @@ class MessageSender(object):
will assume successful receipt of the message and clear it from the queue. The
default is `PeekLock`.
:type receive_settle_mode: ~uamqp.constants.ReceiverSettleMode
:param desired_capabilities: The extension capabilities desired from the peer endpoint.
To create a desired_capabilities object, please do as follows:
- 1. Create an array of desired capability symbols: `capabilities_symbol_array = [types.AMQPSymbol(string)]`
- 2. Transform the array to AMQPValue object: `utils.data_factory(types.AMQPArray(capabilities_symbol_array))`
:type desired_capabilities: ~uamqp.c_uamqp.AMQPValue
:param max_message_size: The maximum allowed message size negotiated for the Link.
:type max_message_size: int
:param link_credit: The sender Link credit that determines how many
Expand Down Expand Up @@ -77,7 +82,8 @@ def __init__(self, session, source, target,
properties=None,
error_policy=None,
debug=False,
encoding='UTF-8'):
encoding='UTF-8',
desired_capabilities=None):
# pylint: disable=protected-access
if name:
self.name = name.encode(encoding) if isinstance(name, six.text_type) else name
Expand Down Expand Up @@ -105,6 +111,8 @@ def __init__(self, session, source, target,
self.receive_settle_mode = receive_settle_mode
if max_message_size:
self.max_message_size = max_message_size
if desired_capabilities:
self._link.set_desired_capabilities(desired_capabilities)

self._sender = c_uamqp.create_message_sender(self._link, self)
self._sender.set_trace(debug)
Expand Down
42 changes: 42 additions & 0 deletions uamqp/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,48 @@ def _c_wrapper(self, value):
raise ValueError("Value {} is too large for an unsigned int value.".format(value))


class AMQPShort(AMQPType):
"""An AMQP short object.
:ivar value: The Python value of the AMQP type.
:vartype value: int
:ivar c_data: The C AMQP encoded object.
:vartype c_data: uamqp.c_uamqp.ShortValue
:param value: The value to encode as an AMQP short.
:type value: int
:raises: ValueError if value is not within allowed range.
"""

def _c_wrapper(self, value):
try:
return c_uamqp.short_value(int(value))
except TypeError:
raise ValueError("Value must be an integer")
except OverflowError:
raise ValueError("Value {} is too large for a short value.".format(value))


class AMQPuShort(AMQPType):
"""An AMQP unsigned short object.
:ivar value: The Python value of the AMQP uInt.
:vartype value: int
:ivar c_data: The C AMQP encoded object.
:vartype c_data: uamqp.c_uamqp.UShortValue
:param value: The value to encode as an AMQP unsigned short.
:type value: int
:raises: ValueError if value is not within allowed range.
"""

def _c_wrapper(self, value):
try:
return c_uamqp.ushort_value(int(value))
except TypeError:
raise ValueError("Value must be an integer")
except OverflowError:
raise ValueError("Value {} is too large for an unsigned short value.".format(value))


class AMQPArray(AMQPType):
"""An AMQP Array object. All the values in the array
must be of the same type.
Expand Down

0 comments on commit 9ecc351

Please sign in to comment.