Skip to content

Commit ebde1dd

Browse files
committed
switch to functional design
1 parent 42657bc commit ebde1dd

File tree

1 file changed

+89
-79
lines changed

1 file changed

+89
-79
lines changed

Diff for: src/phyto_arm/src/digital_logger_node.py

+89-79
Original file line numberDiff line numberDiff line change
@@ -1,99 +1,109 @@
11
#!/usr/bin/env python3
2+
"""
3+
Functionality for monitoring and controlling a Digital Loggers Web Power Switch Pro model.
4+
"""
25
import base64
36
import urllib.request
47

58
import rospy
69

710
from phyto_arm.msg import OutletStatus
811

12+
# Global variables relevant to digital logger control. These variables are defined once the digital
13+
# logger node is initialized.
14+
AUTH = ''
15+
ADDRESS = ''
16+
OUTLETS = ''
17+
OUTLET_NAMES = ''
918

10-
class DigitalLogger:
11-
def __init__(self):
12-
rospy.init_node('digital_logger')
1319

14-
# subscribe to the digital logger control topic
15-
rospy.Subscriber('/digital_logger/control', OutletStatus, self.control_outlet)
20+
def control_outlet(msg):
21+
"""
22+
Send the given msg to the digital logger as an HTTP request.
23+
"""
1624

17-
self.username = rospy.get_param('~username')
18-
self.password = rospy.get_param('~password')
19-
self.address = rospy.get_param('~address')
20-
self.outlets = rospy.get_param('~outlets')
21-
self.outlet_names = {outlet['name']: int(outlet['outlet']) for outlet in self.outlets}
25+
outlet_num = OUTLET_NAMES.get(msg.name)
2226

23-
self.auth = f'Basic {base64.b64encode(f"{self.username}:{self.password}".encode()).decode()}'
27+
data = f'value={str(msg.is_active).lower()}'
28+
url = f'http://{ADDRESS}/restapi/relay/outlets/{outlet_num}/state/'
2429

25-
self.outlet_publishers = []
26-
for outlet_num, _ in enumerate(self.outlets):
27-
self.outlet_publishers.append(rospy.Publisher(f'/digital_logger/outlet/{outlet_num}/status/', OutletStatus, queue_size=10))
30+
# Create a PUT request
31+
req = urllib.request.Request(url, data=data.encode("utf-8"), method="PUT")
32+
req.add_header("Authorization", AUTH)
33+
req.add_header("X-CSRF", 'x')
2834

29-
# Monitor outlets at 1Hz
30-
self.rate = rospy.Rate(1)
31-
32-
def run(self):
33-
"""
34-
Run the digital logger node. Publishes outlet statuses at
35-
/digital_logger/outlets/{outlet num}/status. The outlets can be controlled by publishing a
36-
OutletStatus message to /digital_logger/control.
37-
"""
38-
while not rospy.is_shutdown():
39-
# send a status request for each available outlet
40-
for outlet_index, _ in enumerate(self.outlets):
41-
# Construct request
42-
url = f'http://{self.address}/restapi/relay/outlets/{outlet_index}/state/'
43-
request = urllib.request.Request(url)
44-
request.add_header("Authorization", self.auth)
45-
46-
try:
47-
# Send the GET request
48-
with urllib.request.urlopen(request) as response:
49-
# Read and decode the response
50-
response_data = response.read().decode('utf-8')
51-
except urllib.error.HTTPError as http_err:
52-
raise ValueError(f"HTTP Error: {http_err.code} : {http_err.reason}") from http_err
53-
except urllib.error.URLError as url_err:
54-
raise ValueError(f"URL Error: {url_err.reason}") from url_err
55-
56-
# publish the status of each outlet to its specific topic
57-
outlet_status = OutletStatus()
58-
outlet_status.name = self.outlets[outlet_index]['name']
59-
# DL API uses 'true' and 'false' to denote outlet status, which need to be converted to Python bools
60-
outlet_status.is_active = response_data == 'true'
61-
62-
self.outlet_publishers[outlet_index].publish(outlet_status)
63-
64-
self.rate.sleep()
65-
66-
def control_outlet(self, msg):
67-
"""
68-
Send the given msg to the digital logger as an HTTP request.
69-
"""
70-
71-
outlet_num = self.outlet_names.get(msg.name)
72-
73-
data = f'value={str(msg.is_active).lower()}'
74-
url = f'http://{self.address}/restapi/relay/outlets/{outlet_num}/state/'
75-
76-
# Create a PUT request
77-
req = urllib.request.Request(url, data=data.encode("utf-8"), method="PUT")
78-
req.add_header("Authorization", self.auth)
79-
req.add_header("X-CSRF", 'x')
80-
81-
try:
82-
# Send the PUT request
83-
response = urllib.request.urlopen(req)
84-
except urllib.error.HTTPError as http_err:
85-
raise ValueError(f"HTTP Error: {http_err.code} : {http_err.reason}") from http_err
86-
except urllib.error.URLError as url_err:
87-
raise ValueError(f"URL Error: {url_err.reason}") from url_err
88-
89-
result = response.read().decode('utf-8')
90-
91-
rospy.loginfo(f'sent status={str(msg.is_active).lower()} to {self.address}:{url}, received: code {response.status} : {result}')
35+
try:
36+
# Send the PUT request
37+
response = urllib.request.urlopen(req)
38+
except urllib.error.HTTPError as http_err:
39+
raise ValueError(f"HTTP Error: {http_err.code} : {http_err.reason}") from http_err
40+
except urllib.error.URLError as url_err:
41+
raise ValueError(f"URL Error: {url_err.reason}") from url_err
42+
43+
result = response.read().decode('utf-8')
44+
45+
rospy.loginfo(f'sent status={str(msg.is_active).lower()} to {ADDRESS}:{url}, received: code {response.status} : {result}')
46+
47+
48+
def run_digital_logger_node():
49+
"""
50+
Run the digital logger node. Publishes outlet statuses at
51+
/digital_logger/OUTLETS/{outlet num}/status. The OUTLETS can be controlled by publishing a
52+
OutletStatus message to /digital_logger/control.
53+
"""
54+
global AUTH, ADDRESS, OUTLETS, OUTLET_NAMES
55+
56+
rospy.init_node('digital_logger')
57+
58+
rospy.init_node('digital_logger')
59+
username = rospy.get_param('~username')
60+
password = rospy.get_param('~password')
61+
AUTH = f"Basic {base64.b64encode(f'{username}:{password}'.encode()).decode()}"
62+
ADDRESS = rospy.get_param('~address')
63+
OUTLETS = rospy.get_param('~outlets')
64+
OUTLET_NAMES = {outlet['name']: int(outlet['outlet']) for outlet in OUTLETS}
65+
66+
# subscribe to the digital logger control topic
67+
rospy.Subscriber('/digital_logger/control', OutletStatus, control_outlet)
68+
69+
outlet_publishers = []
70+
for outlet_num, _ in enumerate(OUTLETS):
71+
outlet_publishers.append(rospy.Publisher(f'/digital_logger/outlet/{outlet_num}/status/', OutletStatus, queue_size=10))
72+
73+
# Monitor outlets at 1Hz
74+
rate = rospy.Rate(1)
75+
76+
while not rospy.is_shutdown():
77+
# send a status request for each available outlet
78+
for outlet_index, _ in enumerate(OUTLETS):
79+
# Construct request
80+
url = f'http://{ADDRESS}/restapi/relay/outlets/{outlet_index}/state/'
81+
request = urllib.request.Request(url)
82+
request.add_header("Authorization", AUTH)
83+
84+
try:
85+
# Send the GET request
86+
with urllib.request.urlopen(request) as response:
87+
# Read and decode the response
88+
response_data = response.read().decode('utf-8')
89+
except urllib.error.HTTPError as http_err:
90+
raise ValueError(f"HTTP Error: {http_err.code} : {http_err.reason}") from http_err
91+
except urllib.error.URLError as url_err:
92+
raise ValueError(f"URL Error: {url_err.reason}") from url_err
93+
94+
# publish the status of each outlet to its specific topic
95+
outlet_status = OutletStatus()
96+
outlet_status.name = OUTLETS[outlet_index]['name']
97+
# DL API uses 'true' and 'false' to denote outlet status, which need to be converted to Python booleans
98+
outlet_status.is_active = response_data == 'true'
99+
100+
outlet_publishers[outlet_index].publish(outlet_status)
101+
102+
rate.sleep()
92103

93104

94105
if __name__ == '__main__':
95106
try:
96-
digital_logger = DigitalLogger()
97-
digital_logger.run()
107+
run_digital_logger_node()
98108
except rospy.ROSInterruptException:
99109
pass

0 commit comments

Comments
 (0)