15
15
import logging
16
16
from inspect import signature
17
17
from time import sleep
18
+ from time import time as timestamp
18
19
19
20
import paho .mqtt .client as paho
20
21
from math import ceil
@@ -737,8 +738,12 @@ def _send_request(self, _type, kwargs, timeout=DEFAULT_TIMEOUT, device=None,
737
738
if msg_rate_limit .has_limit ():
738
739
return self .__send_publish_with_limitations (kwargs , timeout , device , msg_rate_limit , dp_rate_limit )
739
740
else :
741
+ if self .__is_test_latency_message (kwargs ['payload' ]):
742
+ kwargs = self .__convert_test_latency_message (kwargs )
743
+
740
744
if "payload" in kwargs and not isinstance (kwargs ["payload" ], str ):
741
745
kwargs ["payload" ] = dumps (kwargs ["payload" ])
746
+
742
747
return TBPublishInfo (self ._client .publish (** kwargs ))
743
748
elif _type == TBSendMethod .SUBSCRIBE :
744
749
return self ._client .subscribe (** kwargs )
@@ -788,6 +793,8 @@ def __send_publish_with_limitations(self, kwargs, timeout, device=None, msg_rate
788
793
message_rate_limit = msg_rate_limit ,
789
794
dp_rate_limit = dp_rate_limit ,
790
795
amount = dp_rate_limit .get_minimal_limit ())
796
+ if self .__is_test_latency_message (kwargs ['payload' ]):
797
+ kwargs = self .__convert_test_latency_message (kwargs )
791
798
kwargs ["payload" ] = dumps (part ['message' ])
792
799
self .wait_until_current_queued_messages_processed ()
793
800
results .append (self ._client .publish (** kwargs ))
@@ -799,6 +806,8 @@ def __send_publish_with_limitations(self, kwargs, timeout, device=None, msg_rate
799
806
message_rate_limit = msg_rate_limit ,
800
807
dp_rate_limit = dp_rate_limit ,
801
808
amount = datapoints )
809
+ if self .__is_test_latency_message (kwargs ['payload' ]):
810
+ kwargs = self .__convert_test_latency_message (kwargs )
802
811
kwargs ["payload" ] = dumps (payload )
803
812
return TBPublishInfo (self ._client .publish (** kwargs ))
804
813
@@ -1076,3 +1085,24 @@ def _split_message(message_pack, max_size, max_payload_size):
1076
1085
if add_last_item :
1077
1086
split_messages .append (final_message_item )
1078
1087
return split_messages
1088
+
1089
+ @staticmethod
1090
+ def __is_test_latency_message (payload ):
1091
+ if isinstance (payload , list ) and payload [0 ].get ('values' , {}).get ('isTestLatencyMessageType' , False ):
1092
+ return True
1093
+
1094
+ return False
1095
+
1096
+ @staticmethod
1097
+ def __convert_test_latency_message (kwargs ):
1098
+ try :
1099
+ values = kwargs ['payload' ][0 ]['values' ]
1100
+ payload = {
1101
+ values ['connectorName' ]: {'receivedTs' : values ['receivedTs' ], 'publishedTs' : int (timestamp () * 1000 )}}
1102
+
1103
+ kwargs ['payload' ] = payload
1104
+ kwargs ['topic' ] = 'v1/gateway/latency'
1105
+ return kwargs
1106
+ except Exception as e :
1107
+ log .error (e )
1108
+ return kwargs
0 commit comments