-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathexample.py
179 lines (131 loc) · 4.79 KB
/
example.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
# fmt: off
# ## Basic Example
#
# **Create the client** and connect to the MQTT broker.
import time
import typing
import tenta
import json
import ssl
tenta_client = tenta.TentaClient(
    mqtt_host="test.mosquitto.org",
    mqtt_port=1884,
    mqtt_identifier="rw",
    mqtt_password="readwrite",
    # Optional at this point: if it is left out here, a `sensor_identifier`
    # has to be passed when publishing messages instead (see below).
    sensor_identifier="81b...",
    # `receive_configs` defaults to `True`. A `ValueError` is raised when it
    # is `True` but no `sensor_identifier` has been specified.
    receive_configs=True,
)
# **Publish logs asynchronously**: your code keeps running while the
# message is being published. Messages can be sent one at a time or
# as a batch.
for greeting in ("Hello, world!", "Hello, to you too!"):
    tenta_client.publish(tenta.LogMessage(severity="info", message=greeting))

log_batch = [
    tenta.LogMessage(severity="warning", message="Excepteur voluptate proident"),
    tenta.LogMessage(severity="error", message="esse aliqua nisi elit"),
]
tenta_client.publish(log_batch)

tenta_client.wait_for_publish()
print("Logs 1-4 published!")
# **Publish logs synchronously**: your code waits until the message
# has been published successfully.
tenta_client.publish(
    tenta.LogMessage(severity="warning", message="do incididunt"),
    wait_for_publish=True,
    wait_for_publish_timeout=5,
)
print("Log 5 published!")

sync_log_batch = [
    tenta.LogMessage(severity="warning", message="dolor elit laboris ipsum"),
    tenta.LogMessage(severity="warning", message="Consequat laboris incididunt"),
]
tenta_client.publish(
    sync_log_batch,
    wait_for_publish=True,
    wait_for_publish_timeout=5,
)
print("Log 6-7 published!")
# The `sensor_identifier` can be set once for the whole client at
# creation (as done above) or handed to the `publish` function directly,
# in which case it overrides the client's value. If it is not specified
# in either place, a `ValueError` is raised.
override_log = tenta.LogMessage(severity="warning", message="Hello, to you too!")
tenta_client.publish(
    override_log,
    sensor_identifier="not-81b...",
    wait_for_publish=True,
    wait_for_publish_timeout=5,
)
# **Publish measurements asynchronously**.
sensor_readings = {
    "temperature": 20.0,
    "humidity": 50.0,
    "pressure": 1013.25,
    "voltage": 3.3,
}
tenta_client.publish(tenta.MeasurementMessage(value=sensor_readings))
tenta_client.wait_for_publish()
# Log messages and measurements accept a **timestamp**. When no
# timestamp is specified, the current time is used.
backdated_measurement = tenta.MeasurementMessage(
    value={"coolness": 80.0, "swagg": 9001},
    # 3600 seconds (one hour) before the current time
    timestamp=time.time() - 3600,
)
tenta_client.publish(backdated_measurement)
tenta_client.wait_for_publish()
# **Publish acknowledgment asynchronously**. An acknowledgment answers
# "did the sensor successfully process a new config revision?" with
# `True`/`False`.
acknowledgment = tenta.AcknowledgmentMessage(success=False, revision=20)
tenta_client.publish(acknowledgment)
tenta_client.wait_for_publish()
# Fetch the **latest received config** on demand. A `ValueError` is raised
# if `receive_configs` was not set to `True` when the client was created.
config_message: typing.Optional[tenta.ConfigurationMessage] = (
    tenta_client.get_latest_received_config_message()
)

# **Tear down** the MQTT connection and the client.
tenta_client.teardown()
# ## Advanced Example
#
# **Callbacks** can be passed to the client so that you get notified when a
# message has been published successfully or when the client receives a new
# config message.
#
# You can also check the current **length of the client's message queue**,
# i.e., the number of messages that have not been published yet. When
# offline, a full queue might raise Exceptions.
tenta_client = tenta.TentaClient(
    mqtt_host="localhost",
    mqtt_port=1884,
    mqtt_identifier="server",
    mqtt_password="password",
    sensor_identifier="81c...",
    # Both callbacks are optional.
    # NOTE(review): `json.dumps` only serializes built-in JSON types;
    # confirm that the received config message object qualifies.
    on_config_message=lambda config_message: print(
        f"New config message: {json.dumps(config_message)}"
    ),
    on_publish=lambda message_id: print(
        f"Message with id {message_id} has been published!"
    ),
)
tenta_client.teardown()
# Communication can be secured with **TLS encryption**.
#
# The TLS settings are optional. This server certificate is only valid
# for the MQTT broker hosted at test.mosquitto.org:8885.
tls_settings = tenta.TLSParameters(
    ca_certs=".../tests/mosquitto.org.crt",
    cert_reqs=ssl.CERT_REQUIRED,
    tls_version=ssl.PROTOCOL_TLS_CLIENT,
)
tenta_client_with_tls = tenta.TentaClient(
    mqtt_host="test.mosquitto.org",
    mqtt_port=8885,
    mqtt_identifier="rw",
    mqtt_password="readwrite",
    sensor_identifier="81b...",
    tls_parameters=tls_settings,
)
tenta_client_with_tls.teardown()