Skip to content

Commit af06aa5

Browse files
bretambroseBret Ambrose
and
Bret Ambrose
authored
make_request implementation (#644)
Co-authored-by: Bret Ambrose <[email protected]>
1 parent bb89973 commit af06aa5

File tree

5 files changed

+735
-2
lines changed

5 files changed

+735
-2
lines changed

awscrt/mqtt_request_response.py

+101-2
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,86 @@
66
# SPDX-License-Identifier: Apache-2.0.
77

88
from dataclasses import dataclass
9-
from typing import Callable, Union
10-
from awscrt import NativeResource, mqtt5, mqtt
9+
from typing import Union
10+
from awscrt import NativeResource, mqtt5, mqtt, exceptions
11+
from concurrent.futures import Future
1112
import _awscrt
13+
import collections.abc
14+
15+
16+
@dataclass
17+
class Response:
18+
"""
19+
Encapsulates a response to an AWS IoT Core MQTT-based service request
20+
21+
Args:
22+
topic (str): MQTT Topic that the response was received on.
23+
payload (Optional[bytes]): The payload of the response.
24+
"""
25+
topic: str
26+
payload: 'Optional[bytes]' = None
27+
28+
29+
@dataclass
30+
class ResponsePath:
31+
"""
32+
A response path is a pair of values - MQTT topic and a JSON path - that describe how a response to
33+
an MQTT-based request may arrive. For a given request type, there may be multiple response paths and each
34+
one is associated with a separate JSON schema for the response body.
35+
36+
Args:
37+
topic (str): MQTT topic that a response may arrive on.
38+
correlation_token_json_path (Optional[str]): JSON path for finding correlation tokens within payloads that arrive on this path's topic.
39+
"""
40+
topic: str
41+
correlation_token_json_path: 'Optional[str]' = None
42+
43+
def validate(self):
44+
assert isinstance(self.topic, str)
45+
assert isinstance(self.correlation_token_json_path, str) or self.correlation_token_json_path is None
46+
47+
48+
@dataclass
49+
class RequestResponseOperationOptions:
50+
"""
51+
Configuration options for an MQTT-based request-response operation.
52+
53+
Args:
54+
subscription_topic_filters (Sequence[str]): Set of topic filters that should be subscribed to in order to cover all possible response paths. Sometimes using wildcards can cut down on the subscriptions needed; other times that isn't valid.
55+
response_paths (Sequence[ResponsePath]): Set of all possible response paths associated with this request type.
56+
publish_topic (str): Topic to publish the request to once response subscriptions have been established.
57+
payload (bytes): Payload to publish to 'publishTopic' in order to initiate the request
58+
correlation_token (Optional[str]): Correlation token embedded in the request that must be found in a response message. This can be null to support certain services which don't use correlation tokens. In that case, the client only allows one token-less request at a time.
59+
"""
60+
subscription_topic_filters: 'Sequence[str]'
61+
response_paths: 'Sequence[ResponsePath]'
62+
publish_topic: str
63+
payload: bytes
64+
correlation_token: 'Optional[str]' = None
65+
66+
def validate(self):
67+
assert isinstance(self.subscription_topic_filters, collections.abc.Sequence)
68+
for topic_filter in self.subscription_topic_filters:
69+
assert isinstance(topic_filter, str)
70+
71+
assert isinstance(self.response_paths, collections.abc.Sequence)
72+
for response_path in self.response_paths:
73+
response_path.validate()
74+
75+
assert isinstance(self.publish_topic, str)
76+
assert isinstance(self.payload, bytes)
77+
assert isinstance(self.correlation_token, str) or self.correlation_token is None
78+
79+
80+
@dataclass
81+
class StreamingOperationOptions:
82+
"""
83+
Configuration options for an MQTT-based streaming operation.
84+
85+
Args:
86+
subscription_topic_filter (str): Topic filter that the streaming operation should listen on
87+
"""
88+
subscription_topic_filter: str
1289

1390

1491
@dataclass
@@ -58,3 +135,25 @@ def __init__(self, protocol_client: Union[mqtt5.Client, mqtt.Connection],
58135
self._binding = _awscrt.mqtt_request_response_client_new_from_5(protocol_client, client_options)
59136
else:
60137
self._binding = _awscrt.mqtt_request_response_client_new_from_311(protocol_client, client_options)
138+
139+
def make_request(self, options: RequestResponseOperationOptions):
140+
options.validate()
141+
142+
future = Future()
143+
144+
def on_request_complete(error_code, topic, payload):
145+
if error_code != 0:
146+
future.set_exception(exceptions.from_code(error_code))
147+
else:
148+
response = Response(topic=topic, payload=payload)
149+
future.set_result(response)
150+
151+
_awscrt.mqtt_request_response_client_make_request(self._binding,
152+
options.subscription_topic_filters,
153+
options.response_paths,
154+
options.publish_topic,
155+
options.payload,
156+
options.correlation_token,
157+
on_request_complete)
158+
159+
return future

source/module.c

+1
Original file line numberDiff line numberDiff line change
@@ -780,6 +780,7 @@ static PyMethodDef s_module_methods[] = {
780780
/* MQTT Request Response Client */
781781
AWS_PY_METHOD_DEF(mqtt_request_response_client_new_from_5, METH_VARARGS),
782782
AWS_PY_METHOD_DEF(mqtt_request_response_client_new_from_311, METH_VARARGS),
783+
AWS_PY_METHOD_DEF(mqtt_request_response_client_make_request, METH_VARARGS),
783784

784785
/* Cryptographic primitives */
785786
AWS_PY_METHOD_DEF(md5_new, METH_NOARGS),

0 commit comments

Comments
 (0)