|
1 | 1 | """Classes for logging power consumption of meshtastic devices."""
|
2 | 2 |
|
3 | 3 | import logging
|
| 4 | +import threading |
| 5 | +import time |
4 | 6 | from typing import Optional
|
5 | 7 |
|
6 | 8 | from ppk2_api import ppk2_api # type: ignore[import-untyped]
|
@@ -31,27 +33,70 @@ def __init__(self, portName: Optional[str] = None):
|
31 | 33 |
|
32 | 34 | self.r = r = ppk2_api.PPK2_MP(portName) # serial port will be different for you
|
33 | 35 | r.get_modifiers()
|
34 |
| - self.r.start_measuring() # start measuring |
35 | 36 |
|
36 |
| - logging.info("Connected to PPK2 power supply") |
| 37 | + self.r.start_measuring() # send command to ppk2 |
| 38 | + self.current_measurements = [0.0] # reset current measurements to 0mA |
| 39 | + self.measuring = True |
| 40 | + |
| 41 | + self.measurement_thread = threading.Thread( |
| 42 | + target=self.measurement_loop, daemon=True, name="ppk2 measurement" |
| 43 | + ) |
| 44 | + self.measurement_thread.start() |
| 45 | + |
| 46 | + logging.info("Connected to Power Profiler Kit II (PPK2)") |
37 | 47 |
|
38 | 48 | super().__init__() # we call this late so that the port is already open and _getRawWattHour callback works
|
39 | 49 |
|
| 50 | + def measurement_loop(self): |
| 51 | + """Endless measurement loop will run in a thread.""" |
| 52 | + while self.measuring: |
| 53 | + read_data = self.r.get_data() |
| 54 | + if read_data != b"": |
| 55 | + samples, _ = self.r.get_samples(read_data) |
| 56 | + self.current_measurements += samples |
| 57 | + time.sleep(0.001) # FIXME figure out correct sleep duration |
| 58 | + |
| 59 | + def get_min_current_mA(self): |
| 60 | + """Returns max current in mA (since last call to this method).""" |
| 61 | + return min(self.current_measurements) / 1000 |
| 62 | + |
| 63 | + def get_max_current_mA(self): |
| 64 | + """Returns max current in mA (since last call to this method).""" |
| 65 | + return max(self.current_measurements) / 1000 |
| 66 | + |
| 67 | + def get_average_current_mA(self): |
| 68 | + """Returns average current in mA (since last call to this method).""" |
| 69 | + average_current_mA = ( |
| 70 | + sum(self.current_measurements) / len(self.current_measurements) |
| 71 | + ) / 1000 # measurements are in microamperes, divide by 1000 |
| 72 | + |
| 73 | + return average_current_mA |
| 74 | + |
| 75 | + def reset_measurements(self): |
| 76 | + """Reset current measurements.""" |
| 77 | + # Use the last reading as the new only reading (to ensure we always have a valid current reading) |
| 78 | + self.current_measurements = [ self.current_measurements[-1] ] |
| 79 | + |
40 | 80 | def close(self) -> None:
|
41 | 81 | """Close the power meter."""
|
42 |
| - self.r.stop_measuring() |
| 82 | + self.measuring = False |
| 83 | + self.r.stop_measuring() # send command to ppk2 |
| 84 | + self.measurement_thread.join() # wait for our thread to finish |
43 | 85 | super().close()
|
44 | 86 |
|
45 | 87 | def setIsSupply(self, s: bool):
|
46 | 88 | """If in supply mode we will provide power ourself, otherwise we are just an amp meter."""
|
| 89 | + |
| 90 | + self.r.set_source_voltage( |
| 91 | + int(self.v * 1000) |
| 92 | + ) # set source voltage in mV BEFORE setting source mode |
| 93 | + # Note: source voltage must be set even if we are using the amp meter mode |
| 94 | + |
47 | 95 | if (
|
48 | 96 | not s
|
49 | 97 | ): # min power outpuf of PPK2. If less than this assume we want just meter mode.
|
50 | 98 | self.r.use_ampere_meter()
|
51 | 99 | else:
|
52 |
| - self.r.set_source_voltage( |
53 |
| - int(self.v * 1000) |
54 |
| - ) # set source voltage in mV BEFORE setting source mode |
55 | 100 | self.r.use_source_meter() # set source meter mode
|
56 | 101 |
|
57 | 102 | def powerOn(self):
|
|
0 commit comments