Skip to content

Commit 33e4701

Browse files
committed
Added basic handling for CONNACK properties
1 parent 0b093f7 commit 33e4701

File tree

2 files changed

+113
-59
lines changed

2 files changed

+113
-59
lines changed

tb_device_mqtt.py

Lines changed: 109 additions & 55 deletions
Original file line number | Diff line number | Diff line change
@@ -40,6 +40,7 @@ def check_tb_paho_mqtt_installed():
4040
raise ImportError("tb-paho-mqtt-client is not installed, please install it manually.") from e
4141

4242
import paho.mqtt.client as paho
43+
from paho.mqtt.enums import CallbackAPIVersion
4344
from math import ceil
4445

4546
try:
@@ -395,6 +396,7 @@ def reach_limit(self):
395396
log.info("Received disconnection due to rate limit for \"%s\" rate limit, waiting for tokens in bucket for %s seconds",
396397
self.name,
397398
target_duration)
399+
return self.__reached_limit_index, self.__reached_limit_index_time
398400

399401
@property
400402
def __dict__(self):
@@ -475,7 +477,7 @@ def __init__(self, host, port=1883, username=None, password=None, quality_of_ser
475477
messages_rate_limit = messages_rate_limit if kwargs.get('rate_limit') == "DEFAULT_RATE_LIMIT" else kwargs.get('rate_limit', messages_rate_limit) # noqa
476478
telemetry_rate_limit = telemetry_rate_limit if kwargs.get('rate_limit') == "DEFAULT_RATE_LIMIT" else kwargs.get('rate_limit', telemetry_rate_limit) # noqa
477479
telemetry_dp_rate_limit = telemetry_dp_rate_limit if kwargs.get('dp_rate_limit') == "DEFAULT_RATE_LIMIT" else kwargs.get('dp_rate_limit', telemetry_dp_rate_limit) # noqa
478-
self._client = paho.Client(protocol=5, client_id=client_id)
480+
self._client = paho.Client(protocol=5, client_id=client_id, callback_api_version=CallbackAPIVersion.VERSION2)
479481
self.quality_of_service = quality_of_service if quality_of_service is not None else 1
480482
self.__host = host
481483
self.__port = port
@@ -512,7 +514,7 @@ def __init__(self, host, port=1883, username=None, password=None, quality_of_ser
512514
self.__timeout_thread.daemon = True
513515
self.__timeout_thread.start()
514516
self._client.on_connect = self._on_connect
515-
# self._client.on_publish = self._on_publish
517+
self._client.on_publish = self._on_publish
516518
self._client.on_message = self._on_message
517519
self._client.on_disconnect = self._on_disconnect
518520
self.current_firmware_info = {
@@ -528,6 +530,8 @@ def __init__(self, host, port=1883, username=None, password=None, quality_of_ser
528530
self.__request_service_configuration_required = False
529531
self.__service_loop = Thread(target=self.__service_loop, name="Service loop", daemon=True)
530532
self.__service_loop.start()
533+
self.__messages_limit_reached_set_time = (0,0)
534+
self.__datapoints_limit_reached_set_time = (0,0)
531535

532536
def __service_loop(self):
533537
while not self.stopped:
@@ -550,51 +554,74 @@ def __service_loop(self):
550554
self.firmware_received = False
551555
sleep(0.05)
552556

553-
def _on_publish(self, client, userdata, mid):
554-
# log.debug("Message %s was published, by client with id: %r", mid ,id(client))
555-
pass
556-
557-
def _on_disconnect(self, client: paho.Client, userdata, result_code, properties=None):
557+
def _on_publish(self, client, userdata, mid, rc=None, properties=None):
558+
if isinstance(rc, ReasonCodes) and rc.value != 0:
559+
log.debug("Publish failed with result code %s (%s) ", str(rc.value), rc.getName())
560+
if rc.value in [151, 131]:
561+
if self.__messages_limit_reached_set_time[1] - monotonic() > self.__messages_limit_reached_set_time[0]:
562+
self.__messages_limit_reached_set_time = self._messages_rate_limit.reach_limit()
563+
if self.__datapoints_limit_reached_set_time[1] - monotonic() > self.__datapoints_limit_reached_set_time[0]:
564+
self._telemetry_dp_rate_limit.reach_limit()
565+
if rc.value == 0:
566+
if self.__messages_limit_reached_set_time[0] > 0 and self.__messages_limit_reached_set_time[1] > 0:
567+
self.__messages_limit_reached_set_time = (0, 0)
568+
if self.__datapoints_limit_reached_set_time[0] > 0 and self.__datapoints_limit_reached_set_time[1] > 0:
569+
self.__datapoints_limit_reached_set_time = (0, 0)
570+
571+
def _on_disconnect(self, client: paho.Client, userdata, disconnect_flags, reason=None, properties=None):
558572
self.__is_connected = False
559-
client._out_packet.clear()
560-
client._out_messages.clear()
573+
with self._client._out_message_mutex:
574+
client._out_packet.clear()
575+
client._out_messages.clear()
561576
client._in_messages.clear()
562577
self.__attr_request_number = 0
563578
self.__device_max_sub_id = 0
564579
self.__device_client_rpc_number = 0
565580
self.__device_sub_dict = {}
566581
self.__device_client_rpc_dict = {}
567582
self.__attrs_request_timeout = {}
568-
log.warning("MQTT client was disconnected with reason code %s (%s) ",
569-
str(result_code), TBPublishInfo.ERRORS_DESCRIPTION.get(result_code, "Description not found."))
583+
result_code = reason.value
584+
if disconnect_flags.is_disconnect_packet_from_server:
585+
log.warning("MQTT client was disconnected by server with reason code %s (%s) ",
586+
str(result_code), reason.getName())
587+
else:
588+
log.info("MQTT client was disconnected by client with reason code %s (%s) ",
589+
str(result_code), reason.getName())
570590
log.debug("Client: %s, user data: %s, result code: %s. Description: %s",
571591
str(client), str(userdata),
572-
str(result_code), TBPublishInfo.ERRORS_DESCRIPTION.get(result_code, "Description not found."))
592+
str(result_code), reason.getName())
573593

574-
def _on_connect(self, client, userdata, flags, result_code, *extra_params):
594+
def _on_connect(self, client, userdata, connect_flags, result_code, properties, *extra_params):
575595
if result_code == 0:
576596
self.__is_connected = True
577597
log.info("MQTT client %r - Connected!", client)
598+
if properties:
599+
log.debug("MQTT client %r - CONACK Properties: %r", client, properties)
600+
config = {
601+
"maxPayloadSize": int(properties.MaximumPacketSize * DEFAULT_RATE_LIMIT_PERCENTAGE / 100),
602+
"maxInflightMessages": properties.ReceiveMaximum,
603+
}
604+
self.on_service_configuration(None, config)
578605
self._subscribe_to_topic(ATTRIBUTES_TOPIC, qos=self.quality_of_service)
579606
self._subscribe_to_topic(ATTRIBUTES_TOPIC + "/response/+", qos=self.quality_of_service)
580607
self._subscribe_to_topic(RPC_REQUEST_TOPIC + '+', qos=self.quality_of_service)
581608
self._subscribe_to_topic(RPC_RESPONSE_TOPIC + '+', qos=self.quality_of_service)
582609
self.__request_service_configuration_required = True
583610
else:
584-
if isinstance(result_code, int):
585-
if result_code in RESULT_CODES:
586-
log.error("connection FAIL with error %s %s", result_code, RESULT_CODES[result_code])
587-
else:
588-
log.error("connection FAIL with unknown error")
589-
elif isinstance(result_code, ReasonCodes):
590-
log.error("connection FAIL with error %s %s", result_code, result_code.getName())
611+
log.error("Connection failed with result code %s (%s) ",
612+
str(result_code.value), result_code.getName())
591613

592614
if callable(self.__connect_callback):
593615
sleep(.2)
594616
if "tb_client" in signature(self.__connect_callback).parameters:
595-
self.__connect_callback(client, userdata, flags, result_code, *extra_params, tb_client=self)
617+
self.__connect_callback(client, userdata, connect_flags, result_code, properties, *extra_params, tb_client=self)
596618
else:
597-
self.__connect_callback(client, userdata, flags, result_code, *extra_params)
619+
self.__connect_callback(client, userdata, connect_flags, result_code, *extra_params)
620+
621+
if result_code.value in [159, 151]:
622+
log.debug("Connection rate limit reached, waiting before reconnecting...")
623+
sleep(1) # Wait for 1 second before reconnecting, if connection rate limit is reached
624+
log.debug("Reconnecting allowed...")
598625

599626
def get_firmware_update(self):
600627
self._client.subscribe("v2/fw/response/+")
@@ -954,40 +981,67 @@ def _wait_for_rate_limit_released(self, timeout, message_rate_limit, dp_rate_lim
954981
log.debug("Rate limit released, sending data to ThingsBoard...")
955982

956983
def _wait_until_current_queued_messages_processed(self):
957-
previous_notification_time = 0
958-
current_out_messages = len(self._client._out_messages) * 2
959-
max_inflight_messages = self._client._max_inflight_messages if self._client._max_inflight_messages > 0 else 5
960984
logger = None
961-
waiting_started = int(monotonic())
962-
connection_was_lost = False
963-
timeout_for_break = 300
964-
965-
if current_out_messages > 0:
966-
while current_out_messages >= max_inflight_messages and not self.stopped:
967-
current_out_messages = len(self._client._out_messages)
968-
elapsed = monotonic() - waiting_started
969-
remaining = timeout_for_break - elapsed
970-
971-
if int(monotonic()) - previous_notification_time > 5 and current_out_messages > max_inflight_messages:
972-
if logger is None:
973-
logger = logging.getLogger('tb_connection')
974-
logger.debug(
975-
"Waiting for messages to be processed: current queue size: %r, max inflight: %r. "
976-
"Elapsed time: %.2f seconds, remaining timeout: %.2f seconds",
977-
current_out_messages, max_inflight_messages, elapsed, remaining
978-
)
979-
previous_notification_time = int(monotonic())
980-
981-
connection_was_lost = True
982-
983-
if current_out_messages >= max_inflight_messages:
984-
sleep(.01)
985-
986-
if (elapsed > timeout_for_break and not connection_was_lost) or self.stopped:
987-
logger.debug("Breaking wait loop after %.2f seconds due to timeout or stop signal.", elapsed)
988-
break
989-
990-
sleep(.001)
985+
986+
max_wait_time = 300
987+
log_interval = 5
988+
stuck_threshold = 15
989+
polling_interval = 0.05
990+
max_inflight = self._client._max_inflight_messages
991+
992+
if len(self._client._out_messages) < max_inflight or max_inflight == 0:
993+
return
994+
995+
waiting_start = monotonic()
996+
last_log_time = waiting_start
997+
last_queue_size = len(self._client._out_messages)
998+
last_queue_change_time = waiting_start
999+
1000+
while not self.stopped:
1001+
now = monotonic()
1002+
elapsed = now - waiting_start
1003+
current_queue_size = len(self._client._out_messages)
1004+
1005+
if current_queue_size < max_inflight:
1006+
return
1007+
1008+
if current_queue_size != last_queue_size:
1009+
last_queue_size = current_queue_size
1010+
last_queue_change_time = now
1011+
1012+
if (now - last_queue_change_time > stuck_threshold
1013+
and not self._client.is_connected()):
1014+
if logger is None:
1015+
logger = logging.getLogger('tb_connection')
1016+
logger.warning(
1017+
"MQTT out_messages queue is stuck (%d messages) and client is disconnected. "
1018+
"Clearing queue after %.2f seconds.",
1019+
current_queue_size, now - last_queue_change_time
1020+
)
1021+
with self._client._out_message_mutex:
1022+
self._client._out_packet.clear()
1023+
return
1024+
1025+
if now - last_log_time >= log_interval:
1026+
if logger is None:
1027+
logger = logging.getLogger('tb_connection')
1028+
logger.debug(
1029+
"Waiting for MQTT queue to drain: %d messages (max inflight %d). "
1030+
"Elapsed: %.2f s",
1031+
current_queue_size, max_inflight, elapsed
1032+
)
1033+
last_log_time = now
1034+
1035+
if elapsed > max_wait_time:
1036+
if logger is None:
1037+
logger = logging.getLogger('tb_connection')
1038+
logger.warning(
1039+
"MQTT wait timeout reached (%.2f s). Queue still has %d messages.",
1040+
elapsed, current_queue_size
1041+
)
1042+
return
1043+
1044+
sleep(polling_interval)
9911045

9921046
def _send_request(self, _type, kwargs, timeout=DEFAULT_TIMEOUT, device=None,
9931047
msg_rate_limit=None, dp_rate_limit=None):

tb_gateway_mqtt.py

Lines changed: 4 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -82,8 +82,8 @@ def __init__(self, host, port=1883, username=None, password=None, gateway=None,
8282
self._gw_subscriptions = {}
8383
self.gateway = gateway
8484

85-
def _on_connect(self, client, userdata, flags, result_code, *extra_params):
86-
super()._on_connect(client, userdata, flags, result_code, *extra_params)
85+
def _on_connect(self, client, userdata, flags, result_code, properties, *extra_params):
86+
super()._on_connect(client, userdata, flags, result_code, properties, *extra_params)
8787
if result_code == 0:
8888
gateway_attributes_topic_sub_id = int(self._subscribe_to_topic(GATEWAY_ATTRIBUTES_TOPIC, qos=1)[1])
8989
self._add_or_delete_subscription(GATEWAY_ATTRIBUTES_TOPIC, gateway_attributes_topic_sub_id)
@@ -94,7 +94,7 @@ def _on_connect(self, client, userdata, flags, result_code, *extra_params):
9494
gateway_rpc_topic_sub_id = int(self._subscribe_to_topic(GATEWAY_RPC_TOPIC, qos=1)[1])
9595
self._add_or_delete_subscription(GATEWAY_RPC_TOPIC, gateway_rpc_topic_sub_id)
9696

97-
def _on_subscribe(self, client, userdata, mid, reasoncodes, properties=None):
97+
def _on_subscribe(self, client, userdata, mid, reasoncodes, properties=None, *extra_params):
9898
subscription = self._gw_subscriptions.get(mid)
9999
if subscription is not None:
100100
if mid == 128:
@@ -116,7 +116,7 @@ def _add_or_delete_subscription(self, topic, subscription_id):
116116

117117
@staticmethod
118118
def _on_unsubscribe(*args):
119-
log.debug(args)
119+
log.debug("Unsubscribe callback called with args: %r", args)
120120

121121
def get_subscriptions_in_progress(self):
122122
return True if self._gw_subscriptions else False

0 commit comments

Comments
 (0)