Skip to content

Commit 8cccdc9

Browse files
authored
Merge pull request #68 from samson0v/master
Added service configuration processing, small fixes for examples
2 parents 2693a89 + af0f80f commit 8cccdc9

File tree

5 files changed

+191
-47
lines changed

5 files changed

+191
-47
lines changed

examples/device/client_rpc_request.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ def callback(request_id, resp_body, exception=None):
2323
logging.error("Exception: " + str(exception))
2424
else:
2525
logging.info("request id: {request_id}, response body: {resp_body}".format(request_id=request_id,
26-
resp_body=resp_body))
26+
resp_body=resp_body))
2727

2828

2929
def main():

examples/device/hardware_specs_sender.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ def on_upload_frequency_change(value, error):
4242
def on_server_side_rpc_request(request_id, request_body):
4343
print(client, request_id, request_body)
4444
if request_body["method"] == "getCPULoad":
45-
client.send_rpc_reply(request_id, {"CPU percent": psutil.cpu_percent()})
45+
client.send_rpc_reply(request_id, {"CPU percent": psutil.cpu_percent(interval=0.1)})
4646
elif request_body["method"] == "getMemoryUsage":
4747
client.send_rpc_reply(request_id, {"Memory": psutil.virtual_memory().percent})
4848
else:

examples/gateway/respond_to_rpc.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ def rpc_request_response(gateway, request_body):
3232
req_id = request_body["data"]["id"]
3333
# dependently of request method we send different data back
3434
if method == 'getCPULoad':
35-
gateway.gw_send_rpc_reply(device, req_id, psutil.cpu_percent())
35+
gateway.gw_send_rpc_reply(device, req_id, psutil.cpu_percent(interval=0.1))
3636
elif method == 'getMemoryLoad':
3737
gateway.gw_send_rpc_reply(device, req_id, psutil.virtual_memory().percent)
3838
else:

tb_device_mqtt.py

Lines changed: 122 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -198,7 +198,6 @@ def __init__(self, rate_limit):
198198
self.__no_limit = False
199199
if ''.join(c for c in rate_limit if c not in [' ', ',', ';']) in ("", "0:0"):
200200
self.__no_limit = True
201-
self.__config = rate_limit
202201
self.__rate_limit_dict = {}
203202
self.__lock = RLock()
204203
rate_configs = rate_limit.split(";")
@@ -248,9 +247,38 @@ def get_minimal_timeout(self):
248247
def has_limit(self):
249248
return not self.__no_limit
250249

250+
def set_limit(self, rate_limit, percentage=0):
251+
self.__rate_limit_dict = {}
252+
rate_configs = rate_limit.split(";")
253+
if "," in rate_limit:
254+
rate_configs = rate_limit.split(",")
255+
for rate in rate_configs:
256+
if rate == "":
257+
continue
258+
rate = rate.split(":")
259+
self.__rate_limit_dict[int(rate[1])] = {"counter": self.__rate_limit_dict[int(rate[1])]['counter'],
260+
"start": self.__rate_limit_dict[int(rate[1])]['start'],
261+
"limit": int(rate[0]) * percentage / 100}
262+
251263
@staticmethod
252264
def get_rate_limits_by_host(host, rate_limit, dp_rate_limit):
253-
if rate_limit == "DEFAULT_RATE_LIMIT":
265+
rate_limit = RateLimit.get_rate_limit_by_host(host, rate_limit)
266+
dp_rate_limit = RateLimit.get_dp_rate_limit_by_host(host, dp_rate_limit)
267+
268+
return rate_limit, dp_rate_limit
269+
270+
@staticmethod
271+
def get_rate_limit_by_host(host, rate_limit):
272+
if rate_limit == "DEFAULT_TELEMETRY_RATE_LIMIT":
273+
if "thingsboard.cloud" in host:
274+
rate_limit = "100:1,4000:60,70000:3600,"
275+
elif "tb" in host and "cloud" in host:
276+
rate_limit = "10:1,300:60,7000:3600,"
277+
elif "demo.thingsboard.io" in host:
278+
rate_limit = "10:1,300:60,"
279+
else:
280+
rate_limit = "0:0,"
281+
elif rate_limit == "DEFAULT_MESSAGES_RATE_LIMIT":
254282
if "thingsboard.cloud" in host:
255283
rate_limit = "100:1,4000:60,70000:3600,"
256284
elif "tb" in host and "cloud" in host:
@@ -262,7 +290,11 @@ def get_rate_limits_by_host(host, rate_limit, dp_rate_limit):
262290
else:
263291
rate_limit = rate_limit
264292

265-
if dp_rate_limit == "DEFAULT_RATE_LIMIT":
293+
return rate_limit
294+
295+
@staticmethod
296+
def get_dp_rate_limit_by_host(host, dp_rate_limit):
297+
if dp_rate_limit == "DEFAULT_TELEMETRY_DP_RATE_LIMIT":
266298
if "thingsboard.cloud" in host:
267299
dp_rate_limit = "190:1,5900:60,13900:3600,"
268300
elif "tb" in host and "cloud" in host:
@@ -274,13 +306,18 @@ def get_rate_limits_by_host(host, rate_limit, dp_rate_limit):
274306
else:
275307
dp_rate_limit = dp_rate_limit
276308

277-
return rate_limit, dp_rate_limit
309+
return dp_rate_limit
278310

279311

280312
class TBDeviceMqttClient:
281313
"""ThingsBoard MQTT client. This class provides interface to send data to ThingsBoard and receive data from"""
314+
315+
EMPTY_RATE_LIMIT = RateLimit('0:0,')
316+
282317
def __init__(self, host, port=1883, username=None, password=None, quality_of_service=None, client_id="",
283-
chunk_size=0, rate_limit="DEFAULT_RATE_LIMIT", dp_rate_limit="DEFAULT_RATE_LIMIT"):
318+
chunk_size=0, messages_rate_limit="DEFAULT_MESSAGES_RATE_LIMIT",
319+
telemetry_rate_limit="DEFAULT_TELEMETRY_RATE_LIMIT",
320+
telemetry_dp_rate_limit="DEFAULT_TELEMETRY_DP_RATE_LIMIT", max_payload_size=8196):
284321
self._client = paho.Client(protocol=5, client_id=client_id)
285322
self.quality_of_service = quality_of_service if quality_of_service is not None else 1
286323
self.__host = host
@@ -301,10 +338,17 @@ def __init__(self, host, port=1883, username=None, password=None, quality_of_ser
301338
self.__device_sub_dict = {}
302339
self.__device_client_rpc_dict = {}
303340
self.__attr_request_number = 0
304-
rate_limit, dp_rate_limit = RateLimit.get_rate_limits_by_host(self.__host, rate_limit, dp_rate_limit)
305-
self.__rate_limit = RateLimit(rate_limit)
306-
self.__dp_rate_limit = RateLimit(dp_rate_limit)
307-
self._client.max_inflight_messages_set(self.__rate_limit.get_minimal_limit())
341+
self.max_payload_size = max_payload_size
342+
self.service_configuration_callback = self.__on_service_configuration
343+
telemetry_rate_limit, telemetry_dp_rate_limit = RateLimit.get_rate_limits_by_host(self.__host,
344+
telemetry_rate_limit,
345+
telemetry_dp_rate_limit)
346+
messages_rate_limit = RateLimit.get_rate_limit_by_host(self.__host, messages_rate_limit)
347+
348+
self._messages_rate_limit = RateLimit(messages_rate_limit)
349+
self.__telemetry_rate_limit = RateLimit(telemetry_rate_limit)
350+
self.__telemetry_dp_rate_limit = RateLimit(telemetry_dp_rate_limit)
351+
self._client.max_inflight_messages_set(self.__telemetry_rate_limit.get_minimal_limit())
308352
self.__attrs_request_timeout = {}
309353
self.__timeout_thread = Thread(target=self.__timeout_check)
310354
self.__timeout_thread.daemon = True
@@ -345,6 +389,7 @@ def _on_connect(self, client, userdata, flags, result_code, *extra_params):
345389
self._subscribe_to_topic(ATTRIBUTES_TOPIC + "/response/+", qos=self.quality_of_service)
346390
self._subscribe_to_topic(RPC_REQUEST_TOPIC + '+', qos=self.quality_of_service)
347391
self._subscribe_to_topic(RPC_RESPONSE_TOPIC + '+', qos=self.quality_of_service)
392+
self.request_service_configuration(self.service_configuration_callback)
348393
else:
349394
if isinstance(result_code, int):
350395
if result_code in RESULT_CODES:
@@ -599,20 +644,39 @@ def send_rpc_call(self, method, params, callback):
599644
RPC_REQUEST_TOPIC + str(rpc_request_id),
600645
self.quality_of_service)
601646

647+
def request_service_configuration(self, callback):
648+
self.send_rpc_call("getServiceConfiguration", {}, callback)
649+
650+
def __on_service_configuration(self, _, service_config, *args, **kwargs):
651+
if service_config.get("deviceMsgRateLimit"):
652+
self._messages_rate_limit.set_limit(service_config.get("deviceMsgRateLimit"), percentage=80)
653+
if service_config.get('deviceTelemetryMsgRateLimit'):
654+
self.__telemetry_rate_limit.set_limit(service_config.get('deviceTelemetryMsgRateLimit'), percentage=80)
655+
if service_config.get('deviceTelemetryDataPointsRateLimit'):
656+
self.__telemetry_dp_rate_limit.set_limit(service_config.get('deviceTelemetryDataPointsRateLimit'),
657+
percentage=80)
658+
if service_config.get('maxInflightMessages'):
659+
self.max_inflight_messages_set(int(service_config.get('maxInflightMessages')))
660+
if service_config.get('maxPayloadSize'):
661+
self.max_payload_size = int(service_config.get('maxPayloadSize'))
662+
if service_config.get('payloadType'):
663+
pass
664+
602665
def set_server_side_rpc_request_handler(self, handler):
603666
"""Set the callback that will be called when a server-side RPC is received."""
604667
self.__device_on_server_side_rpc_response = handler
605668

606-
def _wait_for_rate_limit_released(self, timeout, message_rate_limit, dp_rate_limit, amount=1):
669+
def _wait_for_rate_limit_released(self, timeout, message_rate_limit, dp_rate_limit=None, amount=1):
607670
start_time = int(time())
608-
timeout = max(message_rate_limit.get_minimal_timeout(), dp_rate_limit.get_minimal_timeout(), timeout) + 10
671+
dp_rate_limit_timeout = dp_rate_limit.get_minimal_timeout() if dp_rate_limit is not None else 0
672+
timeout = max(message_rate_limit.get_minimal_timeout(), dp_rate_limit_timeout, timeout) + 10
609673
timeout_updated = False
610674
disconnected = False
611675
limit_reached_check = True
612676
log_posted = False
613677
while limit_reached_check:
614678
limit_reached_check = (message_rate_limit.check_limit_reached()
615-
or dp_rate_limit.check_limit_reached(amount=amount)
679+
or (dp_rate_limit is not None and dp_rate_limit.check_limit_reached(amount=amount))
616680
or not self.is_connected())
617681
if not timeout_updated and limit_reached_check:
618682
timeout = max(timeout, limit_reached_check) + 10
@@ -642,48 +706,69 @@ def wait_until_current_queued_messages_processed(self):
642706
previous_notification_time = int(time())
643707
sleep(.001)
644708

645-
def _send_request(self, type, kwargs, timeout=DEFAULT_TIMEOUT, device=None,
709+
def _send_request(self, _type, kwargs, timeout=DEFAULT_TIMEOUT, device=None,
646710
msg_rate_limit=None, dp_rate_limit=None):
647711
if msg_rate_limit is None:
648-
msg_rate_limit = self.__rate_limit
712+
if kwargs.get('topic') == TELEMETRY_TOPIC:
713+
msg_rate_limit = self.__telemetry_rate_limit
714+
else:
715+
msg_rate_limit = self._messages_rate_limit
649716
if dp_rate_limit is None:
650-
dp_rate_limit = self.__dp_rate_limit
717+
if kwargs.get('topic') == TELEMETRY_TOPIC:
718+
dp_rate_limit = self.__telemetry_dp_rate_limit
719+
else:
720+
dp_rate_limit = self.EMPTY_RATE_LIMIT
651721
if msg_rate_limit.has_limit() or dp_rate_limit.has_limit():
652722
msg_rate_limit.increase_rate_limit_counter()
653723
is_reached = self._wait_for_rate_limit_released(timeout, msg_rate_limit, dp_rate_limit)
654724
if is_reached:
655725
return is_reached
656726

657-
if type == TBSendMethod.PUBLISH:
658-
if self.__rate_limit.has_limit():
727+
if _type == TBSendMethod.PUBLISH:
728+
if msg_rate_limit.has_limit():
659729
return self.__send_publish_with_limitations(kwargs, timeout, device, msg_rate_limit, dp_rate_limit)
660730
else:
661731
if "payload" in kwargs and not isinstance(kwargs["payload"], str):
662732
kwargs["payload"] = dumps(kwargs["payload"])
663733
return TBPublishInfo(self._client.publish(**kwargs))
664-
elif type == TBSendMethod.SUBSCRIBE:
734+
elif _type == TBSendMethod.SUBSCRIBE:
665735
return self._client.subscribe(**kwargs)
666-
elif type == TBSendMethod.UNSUBSCRIBE:
736+
elif _type == TBSendMethod.UNSUBSCRIBE:
667737
return self._client.unsubscribe(**kwargs)
668738

669-
def __send_publish_with_limitations(self, kwargs, timeout, device=None, msg_rate_limit=None, dp_rate_limit=None):
739+
def __get_rate_limits_by_topic(self, topic, device=None, msg_rate_limit=None, dp_rate_limit=None):
740+
if device is not None:
741+
return msg_rate_limit, dp_rate_limit
742+
else:
743+
if topic == TELEMETRY_TOPIC:
744+
return self.__telemetry_rate_limit, self.__telemetry_dp_rate_limit
745+
else:
746+
return self._messages_rate_limit, None
747+
748+
def __send_publish_with_limitations(self, kwargs, timeout, device=None, msg_rate_limit: RateLimit = None,
749+
dp_rate_limit: RateLimit = None):
670750
data_for_analysis = data = kwargs.get("payload")
671751
if isinstance(data, str):
672752
data_for_analysis = loads(data)
673-
datapoints = self._count_datapoints_in_message(data_for_analysis, device=device)
753+
datapoints = -1
754+
if dp_rate_limit.has_limit():
755+
datapoints = self._count_datapoints_in_message(data_for_analysis, device=device)
674756
payload = data
675-
if self.__dp_rate_limit.get_minimal_limit() < datapoints:
757+
if dp_rate_limit.has_limit() and datapoints >= 0 and dp_rate_limit.get_minimal_limit() < datapoints:
676758
log.debug("Rate limit is too low, cannot send message with %i datapoints, "
677759
"splitting to messages with %i datapoints",
678760
datapoints, dp_rate_limit.get_minimal_limit())
679761
if device is None or data.get(device) is None:
680-
device_split_messages = self._split_message(data, dp_rate_limit.get_minimal_limit())
762+
device_split_messages = self._split_message(data, dp_rate_limit.get_minimal_limit(),
763+
self.max_payload_size)
681764
split_messages = [{'message': split_message['data'], 'datapoints': split_message['datapoints']}
682765
for split_message in device_split_messages]
683766
else:
684767
device_data = data.get(device)
685-
device_split_messages = self._split_message(device_data, dp_rate_limit.get_minimal_limit())
686-
split_messages = [{'message': {device: [split_message['data']]}, 'datapoints': split_message['datapoints']}
768+
device_split_messages = self._split_message(device_data, dp_rate_limit.get_minimal_limit(),
769+
self.max_payload_size)
770+
split_messages = [
771+
{'message': {device: [split_message['data']]}, 'datapoints': split_message['datapoints']}
687772
for split_message in device_split_messages]
688773
if len(split_messages) == 0:
689774
log.debug("Cannot split message to smaller parts!")
@@ -699,11 +784,12 @@ def __send_publish_with_limitations(self, kwargs, timeout, device=None, msg_rate
699784
results.append(self._client.publish(**kwargs))
700785
return TBPublishInfo(results)
701786
else:
702-
dp_rate_limit.increase_rate_limit_counter(datapoints)
703-
self._wait_for_rate_limit_released(timeout,
704-
message_rate_limit=msg_rate_limit,
705-
dp_rate_limit=dp_rate_limit,
706-
amount=datapoints)
787+
if dp_rate_limit is not None:
788+
dp_rate_limit.increase_rate_limit_counter(datapoints)
789+
self._wait_for_rate_limit_released(timeout,
790+
message_rate_limit=msg_rate_limit,
791+
dp_rate_limit=dp_rate_limit,
792+
amount=datapoints)
707793
kwargs["payload"] = dumps(payload)
708794
return TBPublishInfo(self._client.publish(**kwargs))
709795

@@ -720,7 +806,7 @@ def _subscribe_to_topic(self, topic, qos=None, timeout=DEFAULT_TIMEOUT):
720806
waiting_for_connection_message_time = time()
721807
sleep(0.01)
722808

723-
return self._send_request(TBSendMethod.SUBSCRIBE, {"topic": topic, "qos": qos}, timeout)
809+
return self._send_request(TBSendMethod.SUBSCRIBE, {"topic": topic, "qos": qos}, timeout, msg_rate_limit=self._messages_rate_limit)
724810

725811
def _publish_data(self, data, topic, qos, timeout=DEFAULT_TIMEOUT, device=None,
726812
msg_rate_limit=None, dp_rate_limit=None):
@@ -921,7 +1007,7 @@ def provision(host,
9211007
return provisioning_client.get_credentials()
9221008

9231009
@staticmethod
924-
def _split_message(message_pack, max_size):
1010+
def _split_message(message_pack, max_size, max_payload_size):
9251011
if message_pack is None:
9261012
return []
9271013
split_messages = []
@@ -961,10 +1047,12 @@ def _split_message(message_pack, max_size):
9611047
message_item_values_with_allowed_size = {}
9621048
for current_data_key_index in range(len(values_data_keys)):
9631049
data_key = values_data_keys[current_data_key_index]
964-
if len(message_item_values_with_allowed_size.keys()) < max_size:
1050+
if len(message_item_values_with_allowed_size.keys()) < max_size and len(
1051+
str(message_item_values_with_allowed_size)) < max_payload_size:
9651052
message_item_values_with_allowed_size[data_key] = values[data_key]
9661053
if (len(message_item_values_with_allowed_size.keys()) >= max_size
967-
or current_data_key_index == len(values_data_keys) - 1):
1054+
or current_data_key_index == len(values_data_keys) - 1) or len(
1055+
str(message_item_values_with_allowed_size)) >= max_payload_size:
9681056
if ts is not None:
9691057
final_message_item['data'] = {"ts": ts, "values": message_item_values_with_allowed_size}
9701058
else:

0 commit comments

Comments
 (0)