4
4
5
5
"""SmartIR json manager class."""
6
6
7
+ import glob
7
8
import json
8
- import time
9
+ import os
10
+ import platform
11
+ import re
12
+ import signal
13
+ import sys
9
14
from collections import namedtuple
10
15
from copy import deepcopy
11
16
from dataclasses import dataclass
12
17
from datetime import datetime
13
18
from enum import Enum
14
19
from itertools import product
15
20
from pathlib import Path
21
+ from threading import Event
16
22
from typing import Literal , Optional , Union
17
23
18
24
import click
@@ -75,11 +81,11 @@ class SkipReason:
75
81
_CombinationTupleNone = namedtuple ('_CombinationTupleNone' , ', ' .join (_combination_arguments_none )) # type: ignore
76
82
77
83
78
- def _countdown (msg : str ):
84
+ def _countdown (msg : str , event : Event ):
79
85
click .echo (msg )
80
- for i in range ( 5 , 0 , - 1 ):
81
- click . echo ( i )
82
- time . sleep ( 1 )
86
+ if event . is_set ( ):
87
+ input ( "-->> Press enter when ready <<--" )
88
+ event . clear ( )
83
89
84
90
85
91
class SmartIrManager : # pylint: disable=too-many-instance-attributes
@@ -107,7 +113,10 @@ def __init__( # pylint: disable=too-many-branches,too-many-statements
107
113
"""
108
114
self .__broadlink_manager = broadlink_mng
109
115
110
- self .__json_file_name = file_name
116
+ self .__partial_inc = 0
117
+ self .__prompt_event = Event ()
118
+ self .__prompt_event .clear ()
119
+ self .__json_file_name_path = Path (file_name )
111
120
with open (str (file_name ), "r" , encoding = 'utf-8' ) as in_file :
112
121
self .__smartir_dict = json .load (in_file )
113
122
@@ -137,6 +146,7 @@ def __init__( # pylint: disable=too-many-branches,too-many-statements
137
146
if not all (list (map (lambda x : x in self .__op_modes , no_temp_on_mode ))):
138
147
raise click .exceptions .UsageError ("no-temp-on-mode parameter is using a not-existent operating mode." )
139
148
149
+ # fill dict with all empty combination
140
150
_temp_dict = {
141
151
f"{ t } " : deepcopy ('' ) for t in range (self .__min_temp , self .__max_temp + 1 , self .__precision_temp )
142
152
}
@@ -161,6 +171,10 @@ def __init__( # pylint: disable=too-many-branches,too-many-statements
161
171
else :
162
172
_operation_dict = {f"{ o } " : deepcopy (_temp_dict ) for o in self .__op_modes } # type: ignore
163
173
self .__smartir_dict [_DictKeys .COMMANDS ].update (_operation_dict )
174
+
175
+ # overwrite combination if tmp file exist
176
+ self ._load_partial_dict ()
177
+
164
178
except KeyError as key_err :
165
179
raise click .exceptions .UsageError (f"Missing mandatory field in json file: { key_err } " ) from None
166
180
else :
@@ -169,6 +183,7 @@ def __init__( # pylint: disable=too-many-branches,too-many-statements
169
183
self .__operation_mode = ''
170
184
self .__fan_mode = ''
171
185
self .__swing_mode = ''
186
+ self ._setup_signal_handler ()
172
187
173
188
@property
174
189
def smartir_dict (self ) -> dict :
@@ -179,6 +194,22 @@ def smartir_dict(self) -> dict:
179
194
"""
180
195
return self .__smartir_dict
181
196
197
+ def _setup_signal_handler (self ):
198
+ _system = platform .system ().lower ()
199
+ signal .signal (signal .SIGINT , self ._signal_handler )
200
+ signal .signal (signal .SIGABRT , self ._signal_handler )
201
+
202
+ if _system in ('linux' , 'darwin' ):
203
+ signal .signal (signal .SIGTERM , self ._signal_handler )
204
+ if _system == 'windows' :
205
+ signal .signal (signal .SIGBREAK , self ._signal_handler ) # pylint: disable=no-member
206
+ signal .signal (signal .CTRL_C_EVENT , self ._signal_handler ) # pylint: disable=no-member
207
+ signal .signal (signal .CTRL_BREAK_EVENT , self ._signal_handler ) # pylint: disable=no-member
208
+
209
+ def _signal_handler (self , _signumber , _frame ):
210
+ self ._save_partial_dict ()
211
+ sys .exit (2 )
212
+
182
213
def _setup_combinations (self ):
183
214
_variable_args = [self .__fan_modes , self .__swing_modes ]
184
215
if all (_variable_args ):
@@ -307,6 +338,15 @@ def swing_mode(self, new_value: str) -> None:
307
338
"""
308
339
self .__swing_mode = new_value
309
340
341
+ @property
342
+ def partial_inc (self ) -> int :
343
+ """Partial increment value.
344
+
345
+ Returns:
346
+ int: last index of saved partial json files
347
+ """
348
+ return self .__partial_inc
349
+
310
350
def _set_dict_value (self , value : str ) -> None :
311
351
if _DictKeys .FAN_MODES in self .__combination_arguments :
312
352
if _DictKeys .SWING_MODES in self .__combination_arguments :
@@ -325,38 +365,100 @@ def _set_dict_value(self, value: str) -> None:
325
365
else :
326
366
self .__smartir_dict [_DictKeys .COMMANDS .value ][self .operation_mode ][self .temperature ] = value
327
367
368
+ def _get_dict_value (self ) -> str :
369
+ _value = ''
370
+ if _DictKeys .FAN_MODES in self .__combination_arguments :
371
+ if _DictKeys .SWING_MODES in self .__combination_arguments :
372
+ _value = self .__smartir_dict [_DictKeys .COMMANDS .value ][self .operation_mode ][self .fan_mode ][
373
+ self .swing_mode
374
+ ][self .temperature ]
375
+ else :
376
+ _value = self .__smartir_dict [_DictKeys .COMMANDS .value ][self .operation_mode ][self .fan_mode ][
377
+ self .temperature
378
+ ]
379
+ else :
380
+ if _DictKeys .SWING_MODES in self .__combination_arguments :
381
+ _value = self .__smartir_dict [_DictKeys .COMMANDS .value ][self .operation_mode ][self .swing_mode ][
382
+ self .temperature
383
+ ]
384
+ else :
385
+ _value = self .__smartir_dict [_DictKeys .COMMANDS .value ][self .operation_mode ][self .temperature ]
386
+ return _value
387
+
328
388
def save_dict (self ):
329
389
"""Save modified dict to output json file."""
330
390
now = datetime .now ()
331
- _modified_file_name = Path ( self .__json_file_name .parent ) .joinpath (
332
- f'{ self .__json_file_name .stem } _' f' { now .strftime ("%Y%m%d_%H%M%S" )} .json'
391
+ _modified_file_name = self .__json_file_name_path .parent .joinpath (
392
+ f'{ self .__json_file_name_path .stem } _{ now .strftime ("%Y%m%d_%H%M%S" )} .json'
333
393
)
334
394
with open (_modified_file_name , 'w' , encoding = 'utf-8' ) as out_file :
335
395
json .dump (self .__smartir_dict , out_file )
336
396
click .echo (f"Created new file { _modified_file_name } " )
397
+ _previous = glob .glob (
398
+ os .path .join (self .__json_file_name_path .parent , f'{ self .__json_file_name_path .stem } _tmp_*.json' )
399
+ )
400
+ for _file in _previous :
401
+ os .remove (_file )
402
+
403
+ def _save_partial_dict (self ):
404
+ # save with incremental 3 numbers to sort correctly when load
405
+ _modified_file_name = self .__json_file_name_path .parent .joinpath (
406
+ f'{ self .__json_file_name_path .stem } _tmp_{ self .__partial_inc :03} .json'
407
+ )
408
+
409
+ try :
410
+ _no_off = deepcopy (self .__smartir_dict )
411
+ del _no_off ['commands' ]['off' ]
412
+ except KeyError :
413
+ return
414
+ else :
415
+ with open (_modified_file_name , 'w' , encoding = 'utf-8' ) as out_file :
416
+ json .dump (_no_off [_DictKeys .COMMANDS .value ], out_file , indent = 2 )
417
+ self .__partial_inc += 1
418
+
419
+ def _load_partial_dict (self ):
420
+ _previous = glob .glob (
421
+ os .path .join (self .__json_file_name_path .parent , f'{ self .__json_file_name_path .stem } _tmp_*.json' )
422
+ )
423
+ _previous .sort ()
424
+ try :
425
+ # load last file that's the most updated
426
+ _last_file = _previous [- 1 ]
427
+ with open (str (_last_file ), "r" , encoding = 'utf-8' ) as partial_file :
428
+ self .__smartir_dict [_DictKeys .COMMANDS .value ].update (json .load (partial_file ))
429
+ _pattern = re .compile (fr'{ self .__json_file_name_path .stem } _tmp_(\d+).json' )
430
+ _res = _pattern .search (_last_file )
431
+ self .__partial_inc = int (_res .group (1 ))
432
+ except IndexError :
433
+ pass
337
434
338
435
def learn_off (self ):
339
436
"""Learn OFF command that's outside the combination.
340
437
341
438
Raises:
342
439
UsageError: if no IR signal is learnt within timeout
343
440
"""
441
+ self .__prompt_event .clear ()
344
442
_countdown (
345
- "First of all, let's learn OFF command: turn ON the remote and then turn it OFF when "
346
- "'Listening' message is on screen..."
443
+ "First of all, let's learn OFF command:\n turn ON the remote and then turn it OFF when "
444
+ "'Listening' message is on screen, or interrupt with CTRL-C..." ,
445
+ self .__prompt_event ,
347
446
)
447
+ # set event to wait for first code
448
+ self .__prompt_event .set ()
348
449
_off = self .__broadlink_manager .learn_single_code ()
349
450
if not _off :
350
451
raise click .exceptions .UsageError ("No IR signal learnt for OFF command within timeout." )
351
452
self .__smartir_dict [_DictKeys .COMMANDS .value ]["off" ] = _off
352
453
353
- def lear_all (self ): # pylint: disable=too-many-branches
454
+ def learn_all (self ): # pylint: disable=too-many-branches
354
455
"""Learn all the commands depending on calculated combination.
355
456
356
457
Raises:
357
458
UsageError: if no IR signal is learnt within timeout
358
459
"""
359
460
_previous_code = None
461
+ _previous_combination : Optional [tuple ] = None
360
462
for comb in self .__all_combinations : # pylint: disable=too-many-nested-blocks
361
463
self .operation_mode = comb .operationModes
362
464
if _DictKeys .FAN_MODES in comb ._fields :
@@ -365,6 +467,10 @@ def lear_all(self): # pylint: disable=too-many-branches
365
467
self .swing_mode = comb .swingModes
366
468
self .temperature = str (comb .temperature )
367
469
470
+ if self ._get_dict_value () != '' :
471
+ self .__prompt_event .set ()
472
+ continue
473
+
368
474
_do_skip = self ._skip_learning (comb )
369
475
if _do_skip .skip :
370
476
# must read the first temperature and then reuse the same for next combination
@@ -388,15 +494,24 @@ def lear_all(self): # pylint: disable=too-many-branches
388
494
self ._set_dict_value (_previous_code )
389
495
continue
390
496
497
+ if _previous_combination :
498
+ for i in range (0 , len (comb ) - 1 ):
499
+ if _previous_combination [i ] != comb [i ]: # pylint: disable=unsubscriptable-object
500
+ self .__prompt_event .set ()
501
+ self ._save_partial_dict ()
502
+ _previous_combination = comb
503
+
391
504
_combination_str = self ._get_combination (comb )
392
505
_countdown (
393
- f"Let's learn command of\n { _combination_str } \n "
394
- "Prepare the remote to this combination, then turn it OFF. When 'Listening' message"
395
- " is on screen, turn the remote ON to learn combination previously set..."
506
+ "-" * 30 + f"\n Let's learn IR command of\n { _combination_str } \n "
507
+ "Prepare the remote so Broadlink can listen the above combination when 'Listening' message"
508
+ " is on screen, or interrupt with CTRL-C..." ,
509
+ self .__prompt_event ,
396
510
)
397
511
_code = self .__broadlink_manager .learn_single_code ()
398
512
_previous_code = _code
399
513
if not _code :
514
+ self ._save_partial_dict ()
400
515
raise click .exceptions .UsageError (f"No IR signal learnt for { _combination_str } command within timeout." )
401
516
402
517
# swing modes must be saved because all temperature need to be listened
0 commit comments