Skip to content

Commit eee754f

Browse files
Adjust shared subscription sample to better fit docs (#442)
1 parent d0ae85a commit eee754f

File tree

2 files changed

+25
-45
lines changed

2 files changed

+25
-45
lines changed

samples/mqtt5_shared_subscription.md

+6-8
Original file line numberDiff line numberDiff line change
@@ -10,14 +10,14 @@ MQTT5 introduces additional features and enhancements that improve the developme
1010

1111
Note: MQTT5 support is currently in **developer preview**. We encourage feedback at all times, but feedback during the preview window is especially valuable in shaping the final product. During the preview period we may make backwards-incompatible changes to the public API, but in general, this is something we will try our best to avoid.
1212

13-
[Shared Subscriptions](https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901250) allow IoT devices to connect to a group where messages sent to a topic are then relayed to the group in a round-robin-like fashion. This is useful for distributing message load across multiple subscribing MQTT5 clients automatically. This is helpful for load balancing when you have many messages that need to be processed.
13+
[Shared Subscriptions](https://docs.aws.amazon.com/iot/latest/developerguide/mqtt.html#mqtt5-shared-subscription) allow IoT devices to connect to a group where messages sent to a topic are then relayed to the group in a round-robin-like fashion. This is useful for distributing message load across multiple subscribing MQTT5 clients automatically. This is helpful for load balancing when you have many messages that need to be processed.
1414

15-
Shared Subscriptions rely on a group identifier, which tells the MQTT5 broker/server which IoT devices to treat as a group for message distribution. This is done when subscribing by formatting the subscription topic like the following: `$share/<group identifier>/<topic>`.
15+
Shared Subscriptions rely on a group name/identifier, which tells the MQTT5 broker/server which IoT devices to treat as a group for message distribution. This is done when subscribing by formatting the subscription topic like the following: `$share/<ShareName>/<TopicFilter>`.
1616
* `$share`: Tells the MQTT5 broker/server that the device is subscribing to a Shared Subscription.
17-
* `<group identifier>`: Tells the MQTT5 broker/server which group to add this Shared Subscription to. Messages published to a matching topic will be distributed round-robin amongst the group.
18-
* `<topic>`: The topic that the Shared Subscription is for. Messages published to this topic will be processed in a round-robin fashion. For example, `test/topic`.
17+
* `<ShareName>`: Tells the MQTT5 broker/server which group to add this Shared Subscription to. Messages published to a matching topic will be distributed round-robin amongst the group.
18+
* `<TopicFilter>`: The topic that the Shared Subscription is for. Messages published to this topic will be processed in a round-robin fashion. For example, `test/topic`.
1919

20-
Shared Subscriptions use a round-robbin like method of distributing messages. For example, say you have three MQTT5 clients all subscribed to the same Shared Subscription group and topic. If five messages are sent to the Shared Subscription topic, the messages will likely be delivered in the following order:
20+
Shared Subscriptions use a round-robbin like method of distributing messages for the subscribed clients. For example, say you have three MQTT5 clients all subscribed to the same Shared Subscription group and topic. If five messages are sent to the Shared Subscription topic, the messages will likely be delivered in the following order:
2121
* Message 1 -> Client one
2222
* Message 2 -> Client two
2323
* Message 3 -> Client three
@@ -71,14 +71,12 @@ Replace with the following with the data from your AWS account:
7171
* `<region>`: The AWS IoT Core region where you created your AWS IoT Core thing you wish to use with this sample. For example `us-east-1`.
7272
* `<account>`: Your AWS IoT Core account ID. This is the set of numbers in the top right next to your AWS account name when using the AWS IoT Core website.
7373

74-
Note that in a real application, you may want to avoid the use of wildcards in your ClientID or use them selectively. Please follow best practices when working with AWS on production applications using the SDK. Also, for the purposes of this sample, please make sure your policy allows a client ID of `test-*` to connect or use `--client_id <client ID here>` to send the client ID your policy supports.
74+
Note that in a real application, you may want to avoid the use of wildcards in your ClientID and shared subscription group names/identifiers. Wildcards should only be used selectively. Please follow best practices when working with AWS on production applications using the SDK. Also, for the purposes of this sample, please make sure your policy allows a client ID of `test-*` to connect or use `--client_id <client ID here>` to send the client ID your policy supports.
7575

7676
</details>
7777

7878
## How to run
7979

80-
### Direct MQTT via mTLS
81-
8280
To Run this sample using a direct MQTT connection with a key and certificate, use the following command:
8381

8482
```sh

samples/mqtt5_shared_subscription.py

+19-37
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@
33

44
from awscrt import mqtt5
55
from awsiot import mqtt5_client_builder
6-
from uuid import uuid4
76
import threading
87
from concurrent.futures import Future
98
import time
@@ -16,8 +15,6 @@ class sample_mqtt5_client:
1615
client: mqtt5.Client
1716
name: str
1817
count: int
19-
received_count: int
20-
received_all_event = threading.Event()
2118
future_stopped: Future
2219
future_connection_success: Future
2320

@@ -29,11 +26,8 @@ def __init__(
2926
input_key,
3027
input_ca,
3128
input_client_id,
32-
input_count,
3329
input_client_name) -> None:
3430
try:
35-
self.count = input_count
36-
self.received_count = 0
3731
self.name = input_client_name
3832
self.future_stopped = Future()
3933
self.future_connection_success = Future()
@@ -68,10 +62,6 @@ def on_publish_received(self, publish_packet_data):
6862
user_property = publish_packet.user_properties[i]
6963
print(f"\t\twith UserProperty ({user_property.name}, {user_property.value})")
7064

71-
self.received_count += 1
72-
if self.received_count == self.count:
73-
self.received_all_event.set()
74-
7565
# Callback for the lifecycle event Stopped
7666
def on_lifecycle_stopped(self, lifecycle_stopped_data: mqtt5.LifecycleStoppedData):
7767
print(f"[{self.name}]: Lifecycle Stopped")
@@ -108,22 +98,18 @@ def on_lifecycle_disconnection(self, disconnect_data: mqtt5.LifecycleDisconnectD
10898
# Construct the shared topic
10999
input_shared_topic = f"$share/{cmdData.input_group_identifier}/{cmdData.input_topic}"
110100

111-
# Make sure the message count is even
112-
if (cmdData.input_count % 2 > 0):
113-
exit(ValueError("Error: '--count' is an odd number. '--count' must be even or zero for this sample."))
114-
115101
if __name__ == '__main__':
116102
try:
117103
# Create the MQTT5 clients: one publisher and two subscribers
118104
publisher = sample_mqtt5_client(
119105
cmdData.input_endpoint, cmdData.input_cert, cmdData.input_key, cmdData.input_ca,
120-
cmdData.input_clientId + "1", cmdData.input_count / 2, "Publisher")
106+
cmdData.input_clientId + "1", "Publisher")
121107
subscriber_one = sample_mqtt5_client(
122108
cmdData.input_endpoint, cmdData.input_cert, cmdData.input_key, cmdData.input_ca,
123-
cmdData.input_clientId + "2", cmdData.input_count / 2, "Subscriber One")
109+
cmdData.input_clientId + "2", "Subscriber One")
124110
subscriber_two = sample_mqtt5_client(
125111
cmdData.input_endpoint, cmdData.input_cert, cmdData.input_key, cmdData.input_ca,
126-
cmdData.input_clientId + "3", cmdData.input_count, "Subscriber Two")
112+
cmdData.input_clientId + "3", "Subscriber Two")
127113

128114
# Connect all the clients
129115
publisher.client.start()
@@ -136,25 +122,20 @@ def on_lifecycle_disconnection(self, disconnect_data: mqtt5.LifecycleDisconnectD
136122
subscriber_two.future_connection_success.result(60)
137123
print(f"[{subscriber_two.name}]: Connected")
138124

139-
# Subscribe to the shared topic on the two subscribers
125+
# Subscribe to the shared topic on both subscribers
140126
subscribe_packet = mqtt5.SubscribePacket(
141127
subscriptions=[mqtt5.Subscription(
142128
topic_filter=input_shared_topic,
143129
qos=mqtt5.QoS.AT_LEAST_ONCE)]
144130
)
145-
try:
146-
subscribe_one_future = subscriber_one.client.subscribe(subscribe_packet)
147-
suback_one = subscribe_one_future.result(60)
148-
print(f"[{subscriber_one.name}]: Subscribed with: {suback_one.reason_codes}")
149-
subscribe_two_future = subscriber_two.client.subscribe(subscribe_packet)
150-
suback_two = subscribe_two_future.result(60)
151-
print(f"[{subscriber_two.name}]: Subscribed with: {suback_two.reason_codes}")
152-
except Exception as ex:
153-
# TMP: If this fails subscribing in CI, just exit the sample gracefully.
154-
if (cmdData.input_is_ci is not None):
155-
exit(0)
156-
else:
157-
raise ex
131+
subscribe_one_future = subscriber_one.client.subscribe(subscribe_packet)
132+
suback_one = subscribe_one_future.result(60)
133+
print(f"[{subscriber_one.name}]: Subscribed to topic '{cmdData.input_topic}' in shared subscription group '{cmdData.input_group_identifier}'.")
134+
print(f"[{subscriber_one.name}]: Full subscribed topic is: '{input_shared_topic}' with SubAck code: {suback_one.reason_codes}")
135+
subscribe_two_future = subscriber_two.client.subscribe(subscribe_packet)
136+
suback_two = subscribe_two_future.result(60)
137+
print(f"[{subscriber_two.name}]: Subscribed to topic '{cmdData.input_topic}' in shared subscription group '{cmdData.input_group_identifier}'.")
138+
print(f"[{subscriber_two.name}]: Full subscribed topic is: '{input_shared_topic}' with SubAck code: {suback_two.reason_codes}")
158139

159140
# Publish using the publisher client
160141
if (cmdData.input_count > 0):
@@ -167,24 +148,25 @@ def on_lifecycle_disconnection(self, disconnect_data: mqtt5.LifecycleDisconnectD
167148
qos=mqtt5.QoS.AT_LEAST_ONCE
168149
))
169150
publish_completion_data = publish_future.result(60)
170-
print(f"[{publisher.name}]: Sent publish and got PubAck with {repr(publish_completion_data.puback.reason_code)}")
151+
print(f"[{publisher.name}]: Sent publish and got PubAck code: {repr(publish_completion_data.puback.reason_code)}")
171152
time.sleep(1)
172153
publish_count += 1
173154

174-
# Make sure all the messages were gotten on the subscribers
175-
subscriber_one.received_all_event.wait(60)
176-
subscriber_two.received_all_event.wait(60)
155+
# Wait 5 seconds to let the last publish go out before unsubscribing
156+
time.sleep(5)
177157
else:
178158
print("Skipping publishing messages due to message count being zero...")
179159

180160
# Unsubscribe from the shared topic on the two subscribers
181161
unsubscribe_packet = mqtt5.UnsubscribePacket(topic_filters=[input_shared_topic])
182162
unsubscribe_one_future = subscriber_one.client.unsubscribe(unsubscribe_packet)
183163
unsuback_one = unsubscribe_one_future.result(60)
184-
print(f"[{subscriber_one.name}]: Unsubscribed with {unsuback_one.reason_codes}")
164+
print(f"[{subscriber_one.name}]: Unsubscribed to topic '{cmdData.input_topic}' in shared subscription group '{cmdData.input_group_identifier}'.")
165+
print(f"[{subscriber_one.name}]: Full unsubscribed topic is: '{input_shared_topic}' with UnsubAck code: {unsuback_one.reason_codes}")
185166
unsubscribe_two_future = subscriber_two.client.unsubscribe(unsubscribe_packet)
186167
unsuback_two = unsubscribe_two_future.result(60)
187-
print(f"[{subscriber_two.name}]: Unsubscribed with {unsuback_two.reason_codes}")
168+
print(f"[{subscriber_two.name}]: Unsubscribed to topic '{cmdData.input_topic}' in shared subscription group '{cmdData.input_group_identifier}'.")
169+
print(f"[{subscriber_two.name}]: Full unsubscribed topic is: '{input_shared_topic}' with UnsubAck code {unsuback_two.reason_codes}")
188170

189171
# Disconnect all the clients
190172
publisher.client.stop()

0 commit comments

Comments
 (0)