3
3
import logging
4
4
import time
5
5
6
- from pubsub import pub # type: ignore[import-untyped]
7
-
8
- from meshtastic .protobuf import portnums_pb2
9
- from meshtastic .protobuf .powermon_pb2 import PowerStressMessage
6
+ from ..protobuf import ( portnums_pb2 , powermon_pb2 )
10
7
11
8
12
9
def onPowerStressResponse (packet , interface ):
@@ -20,7 +17,7 @@ class PowerStressClient:
20
17
The client stub for talking to the firmware PowerStress module.
21
18
"""
22
19
23
- def __init__ (self , iface , node_id = None ):
20
+ def __init__ (self , iface , node_id = None ):
24
21
"""
25
22
Create a new PowerStressClient instance.
26
23
@@ -31,14 +28,18 @@ def __init__(self, iface, node_id = None):
31
28
if not node_id :
32
29
node_id = iface .myInfo .my_node_num
33
30
34
- self .node_id = node_id
31
+ self .node_id = node_id
35
32
# No need to subscribe - because we
36
33
# pub.subscribe(onGPIOreceive, "meshtastic.receive.powerstress")
37
34
38
35
def sendPowerStress (
39
- self , cmd : PowerStressMessage .Opcode .ValueType , num_seconds : float = 0.0 , onResponse = None
36
+ self ,
37
+ cmd : powermon_pb2 .PowerStressMessage .Opcode .ValueType ,
38
+ num_seconds : float = 0.0 ,
39
+ onResponse = None ,
40
40
):
41
- r = PowerStressMessage ()
41
+ """Client goo for talking with the device side agent."""
42
+ r = powermon_pb2 .PowerStressMessage ()
42
43
r .cmd = cmd
43
44
r .num_seconds = num_seconds
44
45
@@ -49,16 +50,16 @@ def sendPowerStress(
49
50
wantAck = True ,
50
51
wantResponse = True ,
51
52
onResponse = onResponse ,
52
- onResponseAckPermitted = True
53
+ onResponseAckPermitted = True ,
53
54
)
54
55
56
+
55
57
class PowerStress :
56
58
"""Walk the UUT through a set of power states so we can capture repeatable power consumption measurements."""
57
59
58
60
def __init__ (self , iface ):
59
61
self .client = PowerStressClient (iface )
60
62
61
-
62
63
def run (self ):
63
64
"""Run the power stress test."""
64
65
# Send the power stress command
@@ -68,11 +69,13 @@ def onResponse(packet: dict): # pylint: disable=unused-argument
68
69
nonlocal gotAck
69
70
gotAck = True
70
71
71
- logging .info ("Starting power stress test, attempting to contact UUT..." )
72
- self .client .sendPowerStress (PowerStressMessage .PRINT_INFO , onResponse = onResponse )
72
+ logging .info ("Starting power stress test, attempting to contact UUT..." )
73
+ self .client .sendPowerStress (
74
+ powermon_pb2 .PowerStressMessage .PRINT_INFO , onResponse = onResponse
75
+ )
73
76
74
77
# Wait for the response
75
78
while not gotAck :
76
79
time .sleep (0.1 )
77
80
78
- logging .info ("Power stress test complete." )
81
+ logging .info ("Power stress test complete." )
0 commit comments