Skip to content

Commit 936cd7d

Browse files
author
Martin
committed
Added basic tag example to the code
1 parent 307c0f0 commit 936cd7d

File tree

2 files changed

+25
-5
lines changed

2 files changed

+25
-5
lines changed

README.md

+3
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,9 @@ df_phys = proc.extract_phys(df_raw, tp_type=tp_type)
9494
...
9595
```
9696

97+
### Add InfluxDB tags
98+
You can add tags to your data when using InfluxDB. This effectively adds additional dimensions to your data that you can e.g. use to color timeseries based on events or to further segment your queries when visualizing the data. The `utils_db.py` contains a basic example via the `add_signal_tags` functions that you can use as outset for building your own logic.
99+
97100
---
98101
### Regarding InfluxDB and S3 usage costs
99102
Note that if you use the paid InfluxDB cloud and a paid S3 server, we recommend that you monitor usage during your tests early on to ensure that no unexpected cost developments occur.

dashboard-writer/utils_db.py

+22-5
Original file line numberDiff line numberDiff line change
@@ -49,13 +49,30 @@ def get_start_times(self, devices, default_start, dynamic):
4949

5050
return start_times
5151

52+
def add_signal_tags(self, df_signal):
53+
"""Advanced: This can be used to add custom tags to the signals
54+
based on a specific use case logic. In effect, this will
55+
split the signal into multiple timeseries
56+
"""
57+
tag_columns = ["my_tag"]
58+
59+
def event_test(row):
60+
return "event" if row[0] > 1200 else "no event"
61+
62+
for tag in tag_columns:
63+
df_signal[tag] = df_signal.apply(lambda row: event_test(row), axis=1)
64+
65+
return tag_columns, df_signal
66+
5267
def write_signals(self, device_id, df_phys):
5368
"""Given a device ID and a dataframe of physical values,
5469
resample and write each signal to a time series database
5570
5671
:param device_id: ID of device (used as the 'measurement name')
5772
:param df_phys: Dataframe of physical values (e.g. as per output of can_decoder)
5873
"""
74+
tag_columns = []
75+
5976
if not df_phys.empty:
6077
for signal, group in df_phys.groupby("Signal")["Physical Value"]:
6178
df_signal = group.to_frame().rename(columns={"Physical Value": signal})
@@ -68,9 +85,11 @@ def write_signals(self, device_id, df_phys):
6885
f"Signal: {signal} (mean: {round(df_signal[signal].mean(),2)} | records: {len(df_signal)} | resampling: {self.res})"
6986
)
7087

71-
self.write_influx(device_id, df_signal)
88+
# tag_columns, df_signal = self.add_signal_tags(df_signal)
7289

73-
def write_influx(self, name, df):
90+
self.write_influx(device_id, df_signal, tag_columns)
91+
92+
def write_influx(self, name, df, tag_columns):
7493
"""Helper function to write signal dataframes to InfluxDB
7594
"""
7695
from influxdb_client import WriteOptions
@@ -83,9 +102,7 @@ def write_influx(self, name, df):
83102
write_options=WriteOptions(batch_size=5000, flush_interval=1_000, jitter_interval=2_000, retry_interval=5_000,)
84103
)
85104

86-
_write_client.write(
87-
self.influx_bucket, record=df, data_frame_measurement_name=name,
88-
)
105+
_write_client.write(self.influx_bucket, record=df, data_frame_measurement_name=name, data_frame_tag_columns=tag_columns)
89106

90107
if self.verbose:
91108
print(f"- SUCCESS: {len(df.index)} records of {name} written to InfluxDB\n\n")

0 commit comments

Comments
 (0)