Commit 48ad0f6

Adjust message splitting and grouping
1 parent 5f8c8e9 commit 48ad0f6

1 file changed: tb_device_mqtt.py (119 additions, 41 deletions)

@@ -1410,72 +1410,150 @@ def _split_message(message_pack, datapoints_max_count, max_payload_size):
         if not isinstance(message_pack, list):
             message_pack = [message_pack]

-        datapoints_max_count = max(datapoints_max_count - 1, 0)
-
-        append_split_message = split_messages.append
-
-        # Group cache key = (ts, metadata_repr or None)
-        ts_group_cache = {}
-
         def _get_metadata_repr(metadata):
             if isinstance(metadata, dict):
                 return tuple(sorted(metadata.items()))
             return None

-        def flush_ts_group(ts_key):
-            if ts_key in ts_group_cache:
-                ts, metadata_repr = ts_key
-                values, size, metadata = ts_group_cache.pop(ts_key)
-                if ts is not None:
-                    chunk = {"ts": ts, "values": values}
-                    if metadata:
-                        chunk["metadata"] = metadata
-                else:
-                    chunk = values  # Raw mode, no ts
+        def estimate_chunk_size(chunk):
+            if isinstance(chunk, dict) and "values" in chunk:
+                size = sum(len(str(k)) + len(str(v)) for k, v in chunk["values"].items())
+                size += len(str(chunk.get("ts", "")))
+                if "metadata" in chunk:
+                    size += sum(len(str(k)) + len(str(v)) for k, v in chunk["metadata"].items())
+                return size + 40
+            elif isinstance(chunk, dict):
+                return sum(len(str(k)) + len(str(v)) for k, v in chunk.items()) + 20
+            else:
+                return len(str(chunk)) + 20

-                message = {
-                    "data": [chunk],
-                    "datapoints": len(values)
-                }
-                append_split_message(message)

-        for message_index, message in enumerate(message_pack):
+        ts_group_cache = {}
+        current_message = {"data": [], "datapoints": 0}
+        current_datapoints = 0
+        current_size = 0
+
+        def flush_current_message():
+            nonlocal current_message, current_datapoints, current_size
+            if current_message["data"]:
+                split_messages.append(current_message)
+            current_message = {"data": [], "datapoints": 0}
+            current_datapoints = 0
+            current_size = 0
+
+        def split_and_add_chunk(chunk, chunk_datapoints):
+            nonlocal current_message, current_datapoints, current_size
+            chunk_size = estimate_chunk_size(chunk)
+
+            if (datapoints_max_count > 0 and current_datapoints + chunk_datapoints > datapoints_max_count) or \
+                    (current_size + chunk_size > max_payload_size):
+                flush_current_message()
+
+            if chunk_datapoints > datapoints_max_count > 0 or chunk_size > max_payload_size:
+                keys = list(chunk["values"].keys()) if "values" in chunk else list(chunk.keys())
+                if len(keys) == 1:
+                    current_message["data"].append(chunk)
+                    current_message["datapoints"] += chunk_datapoints
+                    current_size += chunk_size
+                    return
+
+                max_step = datapoints_max_count if datapoints_max_count > 0 else len(keys)
+                if max_step < 1:
+                    max_step = 1
+
+                for i in range(0, len(keys), max_step):
+                    sub_values = {k: chunk["values"][k] for k in keys[i:i + max_step]} if "values" in chunk else {
+                        k: chunk[k] for k in keys[i:i + max_step]}
+                    sub_chunk = {"ts": chunk.get("ts"), "values": sub_values} if "values" in chunk else sub_values
+                    if "metadata" in chunk:
+                        sub_chunk["metadata"] = chunk["metadata"]
+
+                    sub_datapoints = len(sub_values)
+                    sub_size = estimate_chunk_size(sub_chunk)
+
+                    if sub_size > max_payload_size:
+                        current_message["data"].append(sub_chunk)
+                        current_message["datapoints"] += sub_datapoints
+                        current_size += sub_size
+                        continue
+
+                    split_and_add_chunk(sub_chunk, sub_datapoints)
+                return
+
+            current_message["data"].append(chunk)
+            current_message["datapoints"] += chunk_datapoints
+            current_size += chunk_size
+
+        def add_chunk_to_current_message(chunk, chunk_datapoints):
+            nonlocal current_message, current_datapoints, current_size
+            chunk_size = estimate_chunk_size(chunk)
+
+            if (datapoints_max_count > 0 and chunk_datapoints > datapoints_max_count) or chunk_size > max_payload_size:
+                split_and_add_chunk(chunk, chunk_datapoints)
+                return
+
+            if (datapoints_max_count > 0 and current_datapoints + chunk_datapoints > datapoints_max_count) or \
+                    (current_size + chunk_size > max_payload_size):
+                flush_current_message()
+
+            current_message["data"].append(chunk)
+            current_message["datapoints"] += chunk_datapoints
+            current_size += chunk_size
+
+            if datapoints_max_count > 0 and current_message["datapoints"] == datapoints_max_count:
+                flush_current_message()
+
+        def flush_ts_group(ts_key, ts, metadata_repr):
+            if ts_key not in ts_group_cache:
+                return
+            values, _, metadata = ts_group_cache.pop(ts_key)
+            keys = list(values.keys())
+            step = datapoints_max_count if datapoints_max_count > 0 else len(keys)
+            if step < 1:
+                step = 1
+            for i in range(0, len(keys), step):
+                chunk_values = {k: values[k] for k in keys[i:i + step]}
+                chunk = {"ts": ts, "values": chunk_values}
+                if metadata:
+                    chunk["metadata"] = metadata
+                add_chunk_to_current_message(chunk, len(chunk_values))
+
+        for message in message_pack:
             if not isinstance(message, dict):
-                log.error("Message is not a dictionary!")
-                log.debug("Message: %s", message)
                 continue

-            ts = message.get("ts")
-            values = message.get("values", message)
-            metadata = message.get("metadata") if "metadata" in message and isinstance(message["metadata"],
-                                                                                       dict) else None
-            metadata_repr = _get_metadata_repr(metadata)
+            ts = message.get("ts", None)
+            metadata = message.get("metadata") if isinstance(message.get("metadata"), dict) else None
+            values = message.get("values") if isinstance(message.get("values"), dict) else \
+                message if isinstance(message, dict) else {}

+            metadata_repr = _get_metadata_repr(metadata)
             ts_key = (ts, metadata_repr)

-            for data_key, value in values.items():
-                data_key_size = len(data_key) + len(str(value))
-
+            for key, value in values.items():
+                pair_size = len(str(key)) + len(str(value)) + 4
                 if ts_key not in ts_group_cache:
                     ts_group_cache[ts_key] = ({}, 0, metadata)

-                ts_values, current_size, current_metadata = ts_group_cache[ts_key]
+                group_values, group_size, group_metadata = ts_group_cache[ts_key]

                 can_add = (
-                    (datapoints_max_count == 0 or len(ts_values) < datapoints_max_count)
-                    and (current_size + data_key_size < max_payload_size)
+                    (datapoints_max_count == 0 or len(group_values) < datapoints_max_count) and
+                    (group_size + pair_size <= max_payload_size)
                 )

                 if can_add:
-                    ts_values[data_key] = value
-                    ts_group_cache[ts_key] = (ts_values, current_size + data_key_size, metadata)
+                    group_values[key] = value
+                    ts_group_cache[ts_key] = (group_values, group_size + pair_size, group_metadata)
                 else:
-                    flush_ts_group(ts_key)
-                    ts_group_cache[ts_key] = ({data_key: value}, data_key_size, metadata)
+                    flush_ts_group(ts_key, ts, metadata_repr)
+                    ts_group_cache[ts_key] = ({key: value}, pair_size, metadata)

         for ts_key in list(ts_group_cache.keys()):
-            flush_ts_group(ts_key)
+            ts, metadata_repr = ts_key
+            flush_ts_group(ts_key, ts, metadata_repr)

+        flush_current_message()
         return split_messages

     @staticmethod
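
The new estimate_chunk_size helper sizes a chunk by summing the stringified lengths of its keys and values (plus the ts field and any metadata), then adding a fixed allowance of 40 (20 for chunks without a "values" wrapper), presumably headroom for the surrounding JSON syntax. A quick worked example with a made-up chunk, using the exact arithmetic from the diff:

    # Hypothetical chunk; the arithmetic mirrors estimate_chunk_size above.
    chunk = {"ts": 1000, "values": {"temperature": 23.4}}
    size = sum(len(str(k)) + len(str(v)) for k, v in chunk["values"].items())  # 11 + 4 = 15
    size += len(str(chunk.get("ts", "")))                                      # + 4 -> 19
    print(size + 40)                                                           # 59, the estimated size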
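
To see the grouping behaviour end to end, here is a minimal standalone sketch of the idea this commit implements: datapoints that share the same (ts, metadata) key are merged into one values dict, and a group is flushed as soon as adding another pair would exceed datapoints_max_count or max_payload_size. The function name group_telemetry and the sample pack are illustrative only; the real method also wraps flushed chunks into {"data": [...], "datapoints": N} envelopes via add_chunk_to_current_message and recursively splits chunks that are still too large, which is omitted here.

    # Minimal illustrative sketch, not the SDK code.
    def group_telemetry(message_pack, datapoints_max_count, max_payload_size):
        groups = {}   # (ts, metadata_repr) -> merged values dict
        chunks = []

        def flush(key):
            ts, _ = key
            values = groups.pop(key, None)
            if values:
                chunks.append({"ts": ts, "values": values})

        for message in message_pack:
            ts = message.get("ts")
            metadata = message.get("metadata")
            values = message.get("values", message)
            key = (ts, tuple(sorted(metadata.items())) if isinstance(metadata, dict) else None)
            for k, v in values.items():
                group = groups.setdefault(key, {})
                group_size = sum(len(str(a)) + len(str(b)) + 4 for a, b in group.items())
                pair_size = len(str(k)) + len(str(v)) + 4
                # Flush the group if this pair would push it past either limit.
                if (datapoints_max_count and len(group) >= datapoints_max_count) or \
                        group_size + pair_size > max_payload_size:
                    flush(key)
                    group = groups.setdefault(key, {})
                group[k] = v

        for key in list(groups):
            flush(key)
        return chunks

    pack = [
        {"ts": 1000, "values": {"temperature": 23.4}},
        {"ts": 1000, "values": {"humidity": 55}},      # same ts -> merged with temperature
        {"ts": 2000, "values": {"temperature": 23.9}},
    ]
    print(group_telemetry(pack, datapoints_max_count=10, max_payload_size=65535))
    # [{'ts': 1000, 'values': {'temperature': 23.4, 'humidity': 55}},
    #  {'ts': 2000, 'values': {'temperature': 23.9}}]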
