-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathpi_zero_thermostat.py
229 lines (181 loc) · 6.83 KB
/
pi_zero_thermostat.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
#!/usr/bin/python
# -*- coding: utf-8 -*-
#
import json
import datetime
from w1thermsensor import W1ThermSensor
from rich.console import Console
from convenience.conversions import *
from convenience.radio import *
from convenience.display import Display
from convenience.display import Colors
from thermostat import Thermostat
import os.path
topic = 'home-assistant/thermostat'
local_temp_topic = f'{topic}/local_temperature'
remote_temp_topic = f'{topic}/remote_temperature'
import logging
from rich.logging import RichHandler
# Set up logging. This makes a file that the logger will write to
logfile_console = Console(
file=open(f'logfiles/thermostat_{time.strftime("%m_%d_%Y-%H_%M")}.log', "a"),
log_time_format="[%x_%X]",
)
# By specifying two loggers here, I can display some stuff to the console, and other stuff to a logfile.
# Props to https://stackoverflow.com/a/11784984
FORMAT = "%(message)s"
# noinspection PyArgumentList
logging.basicConfig(
format=FORMAT,
level=logging.DEBUG,
datefmt="[%x %X]",
handlers=[RichHandler(), RichHandler(console=logfile_console)],
)
TEMP_LEVEL_NUM = 5
logging.addLevelName(TEMP_LEVEL_NUM, "TEMP")
def temp_log(self, message, *args, **kws):
if self.isEnabledFor(TEMP_LEVEL_NUM):
self._log(TEMP_LEVEL_NUM, message, args, **kws)
def update_thermostat_schedule(_thermostat, _schedule_path, last_updated=None):
file_update = os.path.getmtime(_schedule_path)
if last_updated is not None:
need_update = file_update > last_updated
else:
need_update = True
if need_update:
_thermostat.schedule.clear_ranges()
with open(_schedule_path, "r") as schedule_file:
schedules = json.load(schedule_file)["schedules"]
for schedule in schedules:
_thermostat.schedule.add_range(
[*schedule["temp_range"], schedule["hysteresis"]],
[datetime.time(*schedule["start"]), datetime.time(*schedule["end"])],
)
return file_update
logging.Logger.temp = temp_log
log = logging.getLogger()
thermostat_thermometer_calibration = (38, 211.4)
remote_thermometer_calibration = (40, 210.76)
pipes = [[0xC2, 0xC2, 0xC2, 0xC2, 0xC2], [0xF0, 0xF0, 0xF0, 0xF0, 0xF0]]
if __name__ == "__main__":
display = Display(
port=0,
cs=1,
dc=20,
rst=21,
rotation=90,
offset_top=3,
)
last_modified_time = -1
# Initialize thermostat
thermostat = Thermostat(temp_control_pin=22, temp_select_pin=18, fan_pin=22)
schedule_path = 'schedule.json'
last_file_update = update_thermostat_schedule(_thermostat=thermostat, _schedule_path=schedule_path)
thermostat.all_off()
# Initialize radio
radio2 = open_radio(
ce_pin=19,
csn_pin=0,
pa_level=NRF24.PA_LOW,
datarate=NRF24.BR_2MBPS,
write_pipe=pipes[1],
read_pipe=pipes[0],
crc=NRF24.CRC_16,
)
log.info("Entering main loop")
sensor = W1ThermSensor()
invalid_responses = 0
while True:
raw_local_temp = to_fahrenheit(sensor.get_temperature())
calibrated_local_temp = calibrate_temp(
raw_local_temp, *thermostat_thermometer_calibration
)
thermostat.local_temp = calibrated_local_temp
thermostat.mqtt_client.publish(local_temp_topic, f"{thermostat.local_temp:.1f}")
log.info("Waiting on temp")
retries = 0
while True:
raw_remote_temp = 0.0
calibrated_remote_temp = 0.0
try:
raw_remote_temp = get_remote_temp(radio2)
calibrated_remote_temp = calibrate_temp(
raw_remote_temp, *remote_thermometer_calibration
)
recv_temp = calibrated_remote_temp
thermostat.mqtt_client.publish(remote_temp_topic, f"{recv_temp:.1f}")
break
except TimeoutError:
# If you fail to read the radio, reset everything and try again a couple times.
thermostat.mqtt_client.publish(remote_temp_topic, "")
log.error("Failed to get response - trying again...")
radio2.stopListening()
radio2.flush_rx()
radio2.flush_tx()
radio2.end()
radio2 = open_radio(
ce_pin=19,
csn_pin=0,
pa_level=NRF24.PA_LOW,
datarate=NRF24.BR_2MBPS,
write_pipe=pipes[1],
read_pipe=pipes[0],
crc=NRF24.CRC_16,
)
radio2.startListening()
retries += 1
if retries > 2:
log.error("Couldn't get temperature. Falling back to local.")
recv_temp = calibrated_local_temp
break
if 100 > recv_temp > 0:
invalid_responses = 0
cur_temp = recv_temp
log.info(
f"Remote| Current: {raw_remote_temp:.2f}. Calibrated: {calibrated_remote_temp:.2f}"
)
log.info(
f"Local| Current: {raw_local_temp:.2f}. Calibrated: {calibrated_local_temp:.2f}"
)
# Check the schedule file for changes
last_file_update = update_thermostat_schedule(_thermostat=thermostat, _schedule_path=schedule_path,
last_updated=last_file_update)
# Only update thermostat state if we're actually getting input
thermostat.update_state(current_temp=cur_temp)
else:
invalid_responses += 1
continue
if invalid_responses > 100:
log.warning(
">100 invalid responses received. Turning off thermostat until I get valid responses."
)
thermostat.all_off()
log.temp(
f"{time.time():.0f},{calibrated_remote_temp:.2f},{calibrated_local_temp:.2f}"
)
time.sleep(1.0)
status = "NONE"
if thermostat.heating:
status = "HEAT"
stat_color = Colors.RED
elif thermostat.cooling:
status = "COOL"
stat_color = Colors.BLUE
else:
status = "OFF"
stat_color = Colors.WHITE
# Make the temperature color yellow if you had to fall back to local temp
if retries > 2:
cur_color = Colors.YELLOW
else:
cur_color = Colors.WHITE
display.update_thermostat_display(
low_temp=thermostat.low_temp,
high_temp=thermostat.high_temp,
local_temp=calibrated_local_temp,
cur_temp=cur_temp,
status=status,
last_update=datetime.datetime.now().time(),
cur_color=cur_color,
stat_color=stat_color,
)