Skip to content

Commit 0295428

Browse files
committed
fix more typos
1 parent b368065 commit 0295428

File tree

7 files changed

+94
-125
lines changed

7 files changed

+94
-125
lines changed

snap7/common.py

+2-2
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,8 @@ def _raise_error() -> NoReturn:
2424
error = f"""can't find snap7 shared library.
2525
2626
This probably means you are installing python-snap7 from source. When no binary wheel is found for you architecture, pip
27-
install falls back on a source install. For this to work, you need to manually install the snap7 library, which python-snap7
28-
uses under the hood.
27+
install falls back on a source install. For this to work, you need to manually install the snap7 library, which
28+
python-snap7 uses under the hood.
2929
3030
The shortest path to success is to try to get a binary wheel working. Probably you are running on an unsupported
3131
platform or python version. You are running:

snap7/logo.py

+42-64
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,35 @@
1515
logger = logging.getLogger(__name__)
1616

1717

18+
def parse_address(vm_address: str) -> tuple[int, WordLen]:
19+
logger.debug(f"read, vm_address:{vm_address}")
20+
if re.match(r"V[0-9]{1,4}\.[0-7]", vm_address):
21+
logger.info(f"read, Bit address: {vm_address}")
22+
address = vm_address[1:].split(".")
23+
# transform string to int
24+
address_byte = int(address[0])
25+
address_bit = int(address[1])
26+
start = (address_byte * 8) + address_bit
27+
return start, WordLen.Bit
28+
elif re.match("V[0-9]+", vm_address):
29+
# byte value
30+
logger.info(f"Byte address: {vm_address}")
31+
start = int(vm_address[1:])
32+
return start, WordLen.Byte
33+
elif re.match("VW[0-9]+", vm_address):
34+
# byte value
35+
logger.info(f"Word address: {vm_address}")
36+
start = int(vm_address[2:])
37+
return start, WordLen.Word
38+
elif re.match("VD[0-9]+", vm_address):
39+
# byte value
40+
logger.info(f"DWord address: {vm_address}")
41+
start = int(vm_address[2:])
42+
return start, WordLen.DWord
43+
else:
44+
raise ValueError("Unknown address format")
45+
46+
1847
class Logo(Client):
1948
"""
2049
A snap7 Siemens Logo client:
@@ -61,35 +90,8 @@ def read(self, vm_address: str) -> int:
6190
area = Area.DB
6291
db_number = 1
6392
size = 1
64-
wordlen: WordLen
6593
logger.debug(f"read, vm_address:{vm_address}")
66-
if re.match(r"V[0-9]{1,4}\.[0-7]", vm_address):
67-
# bit value
68-
logger.info(f"read, Bit address: {vm_address}")
69-
address = vm_address[1:].split(".")
70-
# transform string to int
71-
address_byte = int(address[0])
72-
address_bit = int(address[1])
73-
start = (address_byte * 8) + address_bit
74-
wordlen = WordLen.Bit
75-
elif re.match("V[0-9]+", vm_address):
76-
# byte value
77-
logger.info(f"Byte address: {vm_address}")
78-
start = int(vm_address[1:])
79-
wordlen = WordLen.Byte
80-
elif re.match("VW[0-9]+", vm_address):
81-
# byte value
82-
logger.info(f"Word address: {vm_address}")
83-
start = int(vm_address[2:])
84-
wordlen = WordLen.Word
85-
elif re.match("VD[0-9]+", vm_address):
86-
# byte value
87-
logger.info(f"DWord address: {vm_address}")
88-
start = int(vm_address[2:])
89-
wordlen = WordLen.DWord
90-
else:
91-
logger.info("Unknown address format")
92-
return 0
94+
start, wordlen = parse_address(vm_address)
9395

9496
type_ = wordlen.ctype
9597
data = (type_ * size)()
@@ -121,53 +123,29 @@ def write(self, vm_address: str, value: int) -> int:
121123
"""
122124
area = Area.DB
123125
db_number = 1
124-
amount = 1
125-
wordlen: WordLen
126-
logger.debug(f"write, vm_address:{vm_address}, value:{value}")
127-
if re.match(r"^V[0-9]{1,4}\.[0-7]$", vm_address):
128-
# bit value
129-
logger.info(f"read, Bit address: {vm_address}")
130-
address = vm_address[1:].split(".")
131-
# transform string to int
132-
address_byte = int(address[0])
133-
address_bit = int(address[1])
134-
start = (address_byte * 8) + address_bit
135-
wordlen = WordLen.Bit
126+
size = 1
127+
start, wordlen = parse_address(vm_address)
128+
type_ = wordlen.ctype
129+
130+
if wordlen == WordLen.Bit:
131+
type_ = WordLen.Byte.ctype
136132
if value > 0:
137133
data = bytearray([1])
138134
else:
139135
data = bytearray([0])
140-
elif re.match("^V[0-9]+$", vm_address):
141-
# byte value
142-
logger.info(f"Byte address: {vm_address}")
143-
start = int(vm_address[1:])
144-
wordlen = WordLen.Byte
136+
elif wordlen == WordLen.Byte:
145137
data = bytearray(struct.pack(">B", value))
146-
elif re.match("^VW[0-9]+$", vm_address):
147-
# byte value
148-
logger.info(f"Word address: {vm_address}")
149-
start = int(vm_address[2:])
150-
wordlen = WordLen.Word
138+
elif wordlen == WordLen.Word:
151139
data = bytearray(struct.pack(">h", value))
152-
elif re.match("^VD[0-9]+$", vm_address):
153-
# byte value
154-
logger.info(f"DWord address: {vm_address}")
155-
start = int(vm_address[2:])
156-
wordlen = WordLen.DWord
140+
elif wordlen == WordLen.DWord:
157141
data = bytearray(struct.pack(">l", value))
158142
else:
159-
logger.info(f"write, Unknown address format: {vm_address}")
160-
return 1
161-
162-
if wordlen == WordLen.Bit:
163-
type_ = WordLen.Byte.ctype
164-
else:
165-
type_ = wordlen.ctype
143+
raise ValueError(f"Unknown wordlen {wordlen}")
166144

167-
cdata = (type_ * amount).from_buffer_copy(data)
145+
cdata = (type_ * size).from_buffer_copy(data)
168146

169147
logger.debug(f"write, vm_address:{vm_address} value:{value}")
170148

171-
result = self._lib.Cli_WriteArea(self._s7_client, area, db_number, start, amount, wordlen, byref(cdata))
149+
result = self._lib.Cli_WriteArea(self._s7_client, area, db_number, start, size, wordlen, byref(cdata))
172150
check_error(result, context="client")
173151
return result

snap7/server/__init__.py

+34-34
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ def __init__(self, log: bool = True):
4444
event logging to python logging.
4545
4646
Args:
47-
log: `True` for enabling the event logging. Optinoal.
47+
log: `True` for enabling the event logging.
4848
"""
4949
self._lib: Snap7CliProtocol = load_library()
5050
self.create()
@@ -110,49 +110,49 @@ def set_events_callback(self, call_back: Callable[..., Any]) -> int:
110110
logger.info("setting event callback")
111111
callback_wrap: Callable[..., Any] = CFUNCTYPE(None, c_void_p, POINTER(SrvEvent), c_int)
112112

113-
def wrapper(_: Optional[c_void_p], pevent: SrvEvent, __: int) -> int:
113+
def wrapper(_: Optional[c_void_p], event: SrvEvent, __: int) -> int:
114114
"""Wraps python function into a ctypes function
115115
116116
Args:
117117
_: not used
118-
pevent: pointer to snap7 event struct
118+
event: pointer to snap7 event struct
119119
__: not used
120120
121121
Returns:
122122
Should return an int
123123
"""
124-
logger.info(f"callback event: {self.event_text(pevent.contents)}")
125-
call_back(pevent.contents)
124+
logger.info(f"callback event: {self.event_text(event.contents)}")
125+
call_back(event.contents)
126126
return 0
127127

128128
self._callback = cast(type[CFuncPtr], callback_wrap(wrapper))
129-
usrPtr = c_void_p()
130-
return self._lib.Srv_SetEventsCallback(self._s7_server, self._callback, usrPtr)
129+
data = c_void_p()
130+
return self._lib.Srv_SetEventsCallback(self._s7_server, self._callback, data)
131131

132132
@error_wrap(context="server")
133133
def set_read_events_callback(self, call_back: Callable[..., Any]) -> int:
134134
"""Sets the user callback that the Server object has to call when a Read
135135
event is created.
136136
137137
Args:
138-
call_back: a callback function that accepts a pevent argument.
138+
call_back: a callback function that accepts an event argument.
139139
"""
140140
logger.info("setting read event callback")
141141
callback_wrapper: Callable[..., Any] = CFUNCTYPE(None, c_void_p, POINTER(SrvEvent), c_int)
142142

143-
def wrapper(usrptr: Optional[c_void_p], pevent: SrvEvent, size: int) -> int:
143+
def wrapper(_: Optional[c_void_p], event: SrvEvent, __: int) -> int:
144144
"""Wraps python function into a ctypes function
145145
146146
Args:
147-
usrptr: not used
148-
pevent: pointer to snap7 event struct
149-
size:
147+
_: data, not used
148+
event: pointer to snap7 event struct
149+
__: size, not used
150150
151151
Returns:
152152
Should return an int
153153
"""
154-
logger.info(f"callback event: {self.event_text(pevent.contents)}")
155-
call_back(pevent.contents)
154+
logger.info(f"callback event: {self.event_text(event.contents)}")
155+
call_back(event.contents)
156156
return 0
157157

158158
self._read_callback = callback_wrapper(wrapper)
@@ -168,16 +168,16 @@ def log_callback(event: SrvEvent) -> None:
168168
self.set_events_callback(log_callback)
169169

170170
@error_wrap(context="server")
171-
def start(self, tcpport: int = 102) -> int:
171+
def start(self, tcp_port: int = 102) -> int:
172172
"""Starts the server.
173173
174174
Args:
175-
tcpport: port that the server will listen. Optional.
175+
tcp_port: port that the server will listen. Optional.
176176
"""
177-
if tcpport != 102:
178-
logger.info(f"setting server TCP port to {tcpport}")
179-
self.set_param(Parameter.LocalPort, tcpport)
180-
logger.info(f"starting server on 0.0.0.0:{tcpport}")
177+
if tcp_port != 102:
178+
logger.info(f"setting server TCP port to {tcp_port}")
179+
self.set_param(Parameter.LocalPort, tcp_port)
180+
logger.info(f"starting server on 0.0.0.0:{tcp_port}")
181181
return self._lib.Srv_Start(self._s7_server)
182182

183183
@error_wrap(context="server")
@@ -208,11 +208,11 @@ def get_status(self) -> Tuple[str, str, int]:
208208
error = self._lib.Srv_GetStatus(self._s7_server, byref(server_status), byref(cpu_status), byref(clients_count))
209209
check_error(error)
210210
logger.debug(f"status server {server_status.value} cpu {cpu_status.value} clients {clients_count.value}")
211-
return (server_statuses[server_status.value], cpu_statuses[cpu_status.value], clients_count.value)
211+
return server_statuses[server_status.value], cpu_statuses[cpu_status.value], clients_count.value
212212

213213
@error_wrap(context="server")
214214
def unregister_area(self, area: SrvArea, index: int) -> int:
215-
"""'Unshares' a memory area previously shared with Srv_RegisterArea().
215+
"""Unregisters a memory area previously registered with Srv_RegisterArea().
216216
217217
Notes:
218218
That memory block will be no longer visible by the clients.
@@ -345,7 +345,7 @@ def get_param(self, number: int) -> int:
345345
Returns:
346346
Value of the parameter.
347347
"""
348-
logger.debug(f"retreiving param number {number}")
348+
logger.debug(f"retrieving param number {number}")
349349
value = c_int()
350350
code = self._lib.Srv_GetParam(self._s7_server, number, byref(value))
351351
check_error(code)
@@ -377,32 +377,32 @@ def clear_events(self) -> int:
377377
return self._lib.Srv_ClearEvents(self._s7_server)
378378

379379

380-
def mainloop(tcpport: int = 1102, init_standard_values: bool = False) -> None:
380+
def mainloop(tcp_port: int = 1102, init_standard_values: bool = False) -> None:
381381
"""Init a fake Snap7 server with some default values.
382382
383383
Args:
384-
tcpport: port that the server will listen.
384+
tcp_port: port that the server will listen.
385385
init_standard_values: if `True` will init some defaults values to be read on DB0.
386386
"""
387387

388388
server = Server()
389389
size = 100
390-
DBdata: CDataArrayType = (WordLen.Byte.ctype * size)()
391-
PAdata: CDataArrayType = (WordLen.Byte.ctype * size)()
392-
TMdata: CDataArrayType = (WordLen.Byte.ctype * size)()
393-
CTdata: CDataArrayType = (WordLen.Byte.ctype * size)()
394-
server.register_area(SrvArea.DB, 1, DBdata)
395-
server.register_area(SrvArea.PA, 1, PAdata)
396-
server.register_area(SrvArea.TM, 1, TMdata)
397-
server.register_area(SrvArea.CT, 1, CTdata)
390+
db_data: CDataArrayType = (WordLen.Byte.ctype * size)()
391+
pa_data: CDataArrayType = (WordLen.Byte.ctype * size)()
392+
tm_data: CDataArrayType = (WordLen.Byte.ctype * size)()
393+
ct_data: CDataArrayType = (WordLen.Byte.ctype * size)()
394+
server.register_area(SrvArea.DB, 1, db_data)
395+
server.register_area(SrvArea.PA, 1, pa_data)
396+
server.register_area(SrvArea.TM, 1, tm_data)
397+
server.register_area(SrvArea.CT, 1, ct_data)
398398

399399
if init_standard_values:
400400
logger.info("initialising with standard values")
401401
ba = _init_standard_values()
402402
userdata = WordLen.Byte.ctype * len(ba)
403403
server.register_area(SrvArea.DB, 0, userdata.from_buffer(ba))
404404

405-
server.start(tcpport=tcpport)
405+
server.start(tcp_port=tcp_port)
406406
while True:
407407
while True:
408408
event = server.pick_event()

snap7/util/getters.py

+10-19
Original file line numberDiff line numberDiff line change
@@ -62,8 +62,7 @@ def get_word(bytearray_: bytearray, byte_index: int) -> bytearray:
6262
Word value.
6363
6464
Examples:
65-
>>> data = bytearray([0, 100]) # two bytes for a word
66-
>>> get_word(data, 0)
65+
>>> get_word(bytearray([0, 100]), 0)
6766
100
6867
"""
6968
data = bytearray_[byte_index : byte_index + 2]
@@ -88,8 +87,7 @@ def get_int(bytearray_: bytearray, byte_index: int) -> int:
8887
Value read.
8988
9089
Examples:
91-
>>> data = bytearray([0, 255])
92-
>>> get_int(data, 0)
90+
>>> get_int(bytearray([0, 255]), 0)
9391
255
9492
"""
9593
data = bytearray_[byte_index : byte_index + 2]
@@ -120,12 +118,7 @@ def get_uint(bytearray_: bytearray, byte_index: int) -> int:
120118
>>> get_uint(data, 0)
121119
65535
122120
"""
123-
data = bytearray_[byte_index : byte_index + 2]
124-
data[1] = data[1] & 0xFF
125-
data[0] = data[0] & 0xFF
126-
packed = struct.pack("2B", *data)
127-
value: int = struct.unpack(">H", packed)[0]
128-
return value
121+
return int(get_word(bytearray_, byte_index))
129122

130123

131124
def get_real(bytearray_: bytearray, byte_index: int) -> float:
@@ -467,7 +460,7 @@ def get_sint(bytearray_: bytearray, byte_index: int) -> int:
467460
return value
468461

469462

470-
def get_lint(bytearray_: bytearray, byte_index: int) -> NoReturn:
463+
def get_lint(bytearray_: bytearray, byte_index: int) -> int:
471464
"""Get the long int
472465
473466
THIS VALUE IS NEITHER TESTED NOR VERIFIED BY A REAL PLC AT THE MOMENT
@@ -492,10 +485,9 @@ def get_lint(bytearray_: bytearray, byte_index: int) -> NoReturn:
492485
12345
493486
"""
494487

495-
# raw_lint = bytearray_[byte_index:byte_index + 8]
496-
# lint = struct.unpack('>q', struct.pack('8B', *raw_lint))[0]
497-
# return lint
498-
raise NotImplementedError
488+
raw_lint = bytearray_[byte_index : byte_index + 8]
489+
lint = struct.unpack(">q", struct.pack("8B", *raw_lint))[0]
490+
return int(lint)
499491

500492

501493
def get_lreal(bytearray_: bytearray, byte_index: int) -> float:
@@ -548,10 +540,9 @@ def get_lword(bytearray_: bytearray, byte_index: int) -> bytearray:
548540
>>> get_lword(data, 0)
549541
bytearray(b"\\x00\\x00\\x00\\x00\\x00\\x00\\xAB\\xCD")
550542
"""
551-
# data = bytearray_[byte_index:byte_index + 4]
552-
# dword = struct.unpack('>Q', struct.pack('8B', *data))[0]
553-
# return bytearray(dword)
554-
raise NotImplementedError
543+
data = bytearray_[byte_index : byte_index + 4]
544+
dword = struct.unpack(">Q", struct.pack("8B", *data))[0]
545+
return bytearray(dword)
555546

556547

557548
def get_ulint(bytearray_: bytearray, byte_index: int) -> int:

0 commit comments

Comments
 (0)