-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmp_deye_sensor.py
139 lines (116 loc) · 5.09 KB
/
mp_deye_sensor.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
# from abc import abstractmethod
class Sensor():
"""
Models solar inverter sensor.
This is an abstract class. Method 'read_value' must be provided by the extending subclass.
"""
def __init__(self, name: str, mqtt_topic_suffix='', print_format='{:s}', groups={}):
self.name = name
self.mqtt_topic_suffix = mqtt_topic_suffix
self.print_format = print_format
self.groups = groups
# @abstractmethod
def read_value(self, registers: dict[int, int]):
"""
Reads sensor value from Modbus registers
"""
pass
def format_value(self, value):
"""
Formats sensor value using configured format string
"""
return self.print_format.format(value)
def in_any_group(self, active_groups: set[str]) -> bool:
"""
Checks if this sensor is included in at least one of the given active_groups.
Sensor matches any group when its groups set is empty (default behavior)
"""
#return not self.groups or len(active_groups.intersection(self.groups)) > 0
return not self.groups or len(active_groups & self.groups) > 0 # See: https://www.geeksforgeeks.org/intersection-function-python/
class SingleRegisterSensor(Sensor):
"""
Solar inverter sensor with value stored as 32-bit integer in a single Modbus register.
"""
def __init__(
self, name: str, reg_address: int, factor: float, offset: float = 0,
mqtt_topic_suffix='', print_format='{:0.1f}', groups={}):
super().__init__(name, mqtt_topic_suffix, print_format, groups)
self.reg_address = reg_address
self.factor = factor
self.offset = offset
def read_value(self, registers: dict[int, int]):
if self.reg_address in registers:
reg_value = registers[self.reg_address]
return int.from_bytes(reg_value, 'big') * self.factor + self.offset
else:
return None
class DoubleRegisterSensor(Sensor):
"""
Solar inverter sensor with value stored as 64-bit integer in two Modbus registers.
"""
def __init__(
self, name: str, reg_address: int, factor: float, offset: float = 0,
mqtt_topic_suffix='', print_format='{:0.1f}', groups={}):
super().__init__(name, mqtt_topic_suffix, print_format, groups)
self.reg_address = reg_address
self.factor = factor
self.offset = offset
def read_value(self, registers: dict[int, int]):
low_word_reg_address = self.reg_address
high_word_reg_address = self.reg_address + 1
if low_word_reg_address in registers and high_word_reg_address in registers:
low_word = registers[low_word_reg_address]
high_word = registers[high_word_reg_address]
return (int.from_bytes(high_word, 'big') * 65536 + int.from_bytes(low_word, 'big')) * self.factor + self.offset
else:
return None
class ComputedPowerSensor(Sensor):
"""
Electric Power sensor with value computed as multiplication of values read by voltage and current sensors.
"""
def __init__(
self, name: str, voltage_sensor: Sensor, current_sensor: Sensor, mqtt_topic_suffix='',
print_format='{:0.1f}', groups={}):
super().__init__(name, mqtt_topic_suffix, print_format, groups)
self.voltage_sensor = voltage_sensor
self.current_sensor = current_sensor
def read_value(self, registers: dict[int, int]):
voltage = self.voltage_sensor.read_value(registers)
current = self.current_sensor.read_value(registers)
if voltage is not None and current is not None:
return voltage * current
else:
return None
class ComputedSumSensor(Sensor):
"""
Computes a sum of values read by given list of sensors.
"""
def __init__(
self, name: str, sensors: list[Sensor], mqtt_topic_suffix='',
print_format='{:0.1f}', groups={}):
super().__init__(name, mqtt_topic_suffix, print_format, groups)
self.sensors = sensors
def read_value(self, registers: dict[int, int]):
result = 0
sensor_values = [s.read_value(registers) for s in self.sensors]
for value in sensor_values:
if value is None:
return None
result += value
return result