"""Fetch CANedge log files, DBC-decode them and write the signals to InfluxDB.

Pipeline: configure the target InfluxDB connection, set up the filesystem
(local or S3), load the DBC decoding rules, list the device log files, then
decode each file and push the extracted signals to InfluxDB via DataWriter.
All runtime configuration comes from the project-local ``inputs`` module.
"""
from utils import setup_fs, load_dbc_files, list_log_files, SetupInflux, DataWriter
import inputs

# connection to the target InfluxDB bucket; its write method is handed to the
# writer below as the output callback
influx = SetupInflux(
    influx_url=inputs.influx_url,
    token=inputs.token,
    org_id=inputs.org_id,
    influx_bucket=inputs.influx_bucket,
)

# filesystem for reading the raw log files; inputs.s3 selects local vs. S3
fs = setup_fs(inputs.s3, inputs.key, inputs.secret, inputs.endpoint)

# DBC files describing how to decode the raw CAN frames into physical signals
db_list = load_dbc_files(inputs.dbc_paths)

# device log files to process
# NOTE(review): `dynamic` presumably enables incremental/periodic file listing
# (the previous revision tracked a "last run" timestamp) -- confirm in utils
log_files = list_log_files(fs, inputs.devices, inputs.dynamic)

# decode every listed log file, filter/resample the selected signals and
# write the result to InfluxDB through influx.write_influx
writer = DataWriter(
    fs=fs,
    db_list=db_list,
    signals=inputs.signals,
    res=inputs.res,
    db_func=influx.write_influx,
)
writer.decode_log_files(log_files)