@@ -1107,14 +1107,14 @@ def __send_publish_with_limitations(self, kwargs, timeout, device=None, msg_rate
1107
1107
attributes_format = topic .endswith ('attributes' )
1108
1108
if topic .endswith ('telemetry' ) or attributes_format :
1109
1109
if device is None or data .get (device ) is None :
1110
- device_split_messages = self ._split_message (data , dp_rate_limit .get_minimal_limit (), self .max_payload_size ) # noqa
1110
+ device_split_messages = self ._split_message (data , int ( dp_rate_limit .get_minimal_limit () ), self .max_payload_size ) # noqa
1111
1111
if attributes_format :
1112
1112
split_messages = [{'message' : msg_data , 'datapoints' : len (msg_data )} for split_message in device_split_messages for msg_data in split_message ['data' ]] # noqa
1113
1113
else :
1114
1114
split_messages = [{'message' : split_message ['data' ], 'datapoints' : split_message ['datapoints' ]} for split_message in device_split_messages ] # noqa
1115
1115
else :
1116
1116
device_data = data .get (device )
1117
- device_split_messages = self ._split_message (device_data , dp_rate_limit .get_minimal_limit (), self .max_payload_size ) # noqa
1117
+ device_split_messages = self ._split_message (device_data , int ( dp_rate_limit .get_minimal_limit () ), self .max_payload_size ) # noqa
1118
1118
if attributes_format :
1119
1119
split_messages = [{'message' : {device : msg_data }, 'datapoints' : len (msg_data )} for split_message in device_split_messages for msg_data in split_message ['data' ]] # noqa
1120
1120
else :
@@ -1457,16 +1457,23 @@ def split_and_add_chunk(chunk, chunk_datapoints):
1457
1457
current_size += chunk_size
1458
1458
return
1459
1459
1460
- max_step = datapoints_max_count if datapoints_max_count > 0 else len (keys )
1460
+ max_step = int ( datapoints_max_count ) if datapoints_max_count > 0 else len (keys )
1461
1461
if max_step < 1 :
1462
1462
max_step = 1
1463
1463
1464
1464
for i in range (0 , len (keys ), max_step ):
1465
- sub_values = {k : chunk ["values" ][k ] for k in keys [i :i + max_step ]} if "values" in chunk else {
1466
- k : chunk [k ] for k in keys [i :i + max_step ]}
1467
- sub_chunk = {"ts" : chunk .get ("ts" ), "values" : sub_values } if "values" in chunk else sub_values
1468
- if "metadata" in chunk :
1469
- sub_chunk ["metadata" ] = chunk ["metadata" ]
1465
+ sub_values = (
1466
+ {k : chunk ["values" ][k ] for k in keys [i :i + max_step ]}
1467
+ if "values" in chunk else
1468
+ {k : chunk [k ] for k in keys [i :i + max_step ]}
1469
+ )
1470
+
1471
+ if "ts" in chunk :
1472
+ sub_chunk = {"ts" : chunk ["ts" ], "values" : sub_values }
1473
+ if "metadata" in chunk :
1474
+ sub_chunk ["metadata" ] = chunk ["metadata" ]
1475
+ else :
1476
+ sub_chunk = sub_values .copy ()
1470
1477
1471
1478
sub_datapoints = len (sub_values )
1472
1479
sub_size = estimate_chunk_size (sub_chunk )
@@ -1508,14 +1515,17 @@ def flush_ts_group(ts_key, ts, metadata_repr):
1508
1515
return
1509
1516
values , _ , metadata = ts_group_cache .pop (ts_key )
1510
1517
keys = list (values .keys ())
1511
- step = datapoints_max_count if datapoints_max_count > 0 else len (keys )
1518
+ step = int ( datapoints_max_count ) if datapoints_max_count > 0 else len (keys )
1512
1519
if step < 1 :
1513
1520
step = 1
1514
1521
for i in range (0 , len (keys ), step ):
1515
1522
chunk_values = {k : values [k ] for k in keys [i :i + step ]}
1516
- chunk = {"ts" : ts , "values" : chunk_values }
1517
- if metadata :
1518
- chunk ["metadata" ] = metadata
1523
+ if ts is not None :
1524
+ chunk = {"ts" : ts , "values" : chunk_values }
1525
+ if metadata :
1526
+ chunk ["metadata" ] = metadata
1527
+ else :
1528
+ chunk = chunk_values .copy ()
1519
1529
add_chunk_to_current_message (chunk , len (chunk_values ))
1520
1530
1521
1531
for message in message_pack :
0 commit comments