5
5
import re
6
6
import logging
7
7
from ctypes import CFUNCTYPE , byref , create_string_buffer , sizeof
8
- from ctypes import Array , c_byte , c_char_p , c_int , c_int32 , c_uint16 , c_ulong , c_void_p
8
+ from ctypes import Array , _SimpleCData , c_byte , c_char_p , c_int , c_int32 , c_uint16 , c_ulong , c_void_p
9
9
from datetime import datetime
10
- from typing import Any , Callable , List , Optional , Tuple , Union
10
+ from typing import Any , Callable , Hashable , List , Optional , Tuple , Union , Type
11
+ from types import TracebackType
11
12
12
13
from ..common import check_error , ipv4 , load_library
14
+ from ..protocol import Snap7CliProtocol
13
15
from ..types import S7SZL , Areas , BlocksList , S7CpInfo , S7CpuInfo , S7DataItem
14
16
from ..types import S7OrderCode , S7Protection , S7SZLList , TS7BlockInfo , WordLen
15
17
from ..types import S7Object , buffer_size , buffer_type , cpu_statuses , param_types
18
20
logger = logging .getLogger (__name__ )
19
21
20
22
21
- def error_wrap (func ) :
23
+ def error_wrap (func : Callable [..., Any ]) -> Callable [..., Any ] :
22
24
"""Parses a s7 error code returned the decorated function."""
23
25
24
- def f (* args , ** kw ) :
25
- code = func (* args , ** kw )
26
+ def f (* args : tuple [ Any , ...], ** kwargs : dict [ Hashable , Any ]) -> None :
27
+ code = func (* args , ** kwargs )
26
28
check_error (code , context = "client" )
27
29
28
30
return f
@@ -47,10 +49,10 @@ class Client:
47
49
>>> client.db_write(1, 0, data)
48
50
"""
49
51
50
- _lib : Any # since this is dynamically loaded from a DLL we don't have the type signature.
52
+ _lib : Snap7CliProtocol
51
53
_read_callback = None
52
54
_callback = None
53
- _s7_client : Optional [ S7Object ] = None
55
+ _s7_client : S7Object
54
56
55
57
def __init__ (self , lib_location : Optional [str ] = None ):
56
58
"""Creates a new `Client` instance.
@@ -66,22 +68,24 @@ def __init__(self, lib_location: Optional[str] = None):
66
68
<snap7.client.Client object at 0x0000028B257128E0>
67
69
"""
68
70
69
- self ._lib = load_library (lib_location )
71
+ self ._lib : Snap7CliProtocol = load_library (lib_location )
70
72
self .create ()
71
73
72
- def __enter__ (self ):
74
+ def __enter__ (self ) -> "Client" :
73
75
return self
74
76
75
- def __exit__ (self , exc_type , exc_val , exc_tb ):
77
+ def __exit__ (
78
+ self , exc_type : Optional [Type [BaseException ]], exc_val : Optional [BaseException ], exc_tb : Optional [TracebackType ]
79
+ ) -> None :
76
80
self .destroy ()
77
81
78
- def __del__ (self ):
82
+ def __del__ (self ) -> None :
79
83
self .destroy ()
80
84
81
- def create (self ):
85
+ def create (self ) -> None :
82
86
"""Creates a SNAP7 client."""
83
87
logger .info ("creating snap7 client" )
84
- self ._lib .Cli_Create .restype = S7Object
88
+ self ._lib .Cli_Create .restype = S7Object # type: ignore[attr-defined]
85
89
self ._s7_client = S7Object (self ._lib .Cli_Create ())
86
90
87
91
def destroy (self ) -> Optional [int ]:
@@ -97,7 +101,7 @@ def destroy(self) -> Optional[int]:
97
101
logger .info ("destroying snap7 client" )
98
102
if self ._lib and self ._s7_client is not None :
99
103
return self ._lib .Cli_Destroy (byref (self ._s7_client ))
100
- self ._s7_client = None
104
+ self ._s7_client = None # type: ignore[assignment]
101
105
return None
102
106
103
107
def plc_stop (self ) -> int :
@@ -199,7 +203,7 @@ def connect(self, address: str, rack: int, slot: int, tcpport: int = 102) -> int
199
203
"""
200
204
logger .info (f"connecting to { address } :{ tcpport } rack { rack } slot { slot } " )
201
205
202
- self .set_param (RemotePort , tcpport )
206
+ self .set_param (number = RemotePort , value = tcpport )
203
207
return self ._lib .Cli_ConnectTo (self ._s7_client , c_char_p (address .encode ()), c_int (rack ), c_int (slot ))
204
208
205
209
def db_read (self , db_number : int , start : int , size : int ) -> bytearray :
@@ -441,7 +445,7 @@ def write_area(self, area: Areas, dbnumber: int, start: int, data: bytearray) ->
441
445
cdata = (type_ * len (data )).from_buffer_copy (data )
442
446
return self ._lib .Cli_WriteArea (self ._s7_client , area .value , dbnumber , start , size , wordlen .value , byref (cdata ))
443
447
444
- def read_multi_vars (self , items ) -> Tuple [int , S7DataItem ]:
448
+ def read_multi_vars (self , items : Array [ S7DataItem ] ) -> Tuple [int , Array [ S7DataItem ] ]:
445
449
"""Reads different kind of variables from a PLC simultaneously.
446
450
447
451
Args:
@@ -472,7 +476,7 @@ def list_blocks(self) -> BlocksList:
472
476
logger .debug (f"blocks: { blocksList } " )
473
477
return blocksList
474
478
475
- def list_blocks_of_type (self , blocktype : str , size : int ) -> Union [int , Array ]:
479
+ def list_blocks_of_type (self , blocktype : str , size : int ) -> Union [int , Array [ c_uint16 ] ]:
476
480
"""This function returns the AG list of a specified block type.
477
481
478
482
Args:
@@ -592,11 +596,11 @@ def set_connection_params(self, address: str, local_tsap: int, remote_tsap: int)
592
596
"""
593
597
if not re .match (ipv4 , address ):
594
598
raise ValueError (f"{ address } is invalid ipv4" )
595
- result = self ._lib .Cli_SetConnectionParams (self ._s7_client , address , c_uint16 (local_tsap ), c_uint16 (remote_tsap ))
599
+ result = self ._lib .Cli_SetConnectionParams (self ._s7_client , address . encode () , c_uint16 (local_tsap ), c_uint16 (remote_tsap ))
596
600
if result != 0 :
597
601
raise ValueError ("The parameter was invalid" )
598
602
599
- def set_connection_type (self , connection_type : int ):
603
+ def set_connection_type (self , connection_type : int ) -> None :
600
604
"""Sets the connection resource type, i.e the way in which the Clients connects to a PLC.
601
605
602
606
Args:
@@ -659,7 +663,7 @@ def ab_write(self, start: int, data: bytearray) -> int:
659
663
logger .debug (f"ab write: start: { start } : size: { size } : " )
660
664
return self ._lib .Cli_ABWrite (self ._s7_client , start , size , byref (cdata ))
661
665
662
- def as_ab_read (self , start : int , size : int , data ) -> int :
666
+ def as_ab_read (self , start : int , size : int , data : "Array[_SimpleCData[Any]]" ) -> int :
663
667
"""Reads a part of IPU area from a PLC asynchronously.
664
668
665
669
Args:
@@ -720,7 +724,7 @@ def as_copy_ram_to_rom(self, timeout: int = 1) -> int:
720
724
check_error (result , context = "client" )
721
725
return result
722
726
723
- def as_ct_read (self , start : int , amount : int , data ) -> int :
727
+ def as_ct_read (self , start : int , amount : int , data : "Array[_SimpleCData[Any]]" ) -> int :
724
728
"""Reads counters from a PLC asynchronously.
725
729
726
730
Args:
@@ -752,7 +756,7 @@ def as_ct_write(self, start: int, amount: int, data: bytearray) -> int:
752
756
check_error (result , context = "client" )
753
757
return result
754
758
755
- def as_db_fill (self , db_number : int , filler ) -> int :
759
+ def as_db_fill (self , db_number : int , filler : int ) -> int :
756
760
"""Fills a DB in AG with a given byte.
757
761
758
762
Args:
@@ -766,7 +770,7 @@ def as_db_fill(self, db_number: int, filler) -> int:
766
770
check_error (result , context = "client" )
767
771
return result
768
772
769
- def as_db_get (self , db_number : int , _buffer , size ) -> bytearray :
773
+ def as_db_get (self , db_number : int , _buffer : "Array[_SimpleCData[Any]]" , size : "_SimpleCData[Any]" ) -> int :
770
774
"""Uploads a DB from AG using DBRead.
771
775
772
776
Note:
@@ -784,7 +788,7 @@ def as_db_get(self, db_number: int, _buffer, size) -> bytearray:
784
788
check_error (result , context = "client" )
785
789
return result
786
790
787
- def as_db_read (self , db_number : int , start : int , size : int , data ) -> Array :
791
+ def as_db_read (self , db_number : int , start : int , size : int , data : "Array[_SimpleCData[Any]]" ) -> int :
788
792
"""Reads a part of a DB from a PLC.
789
793
790
794
Args:
@@ -807,7 +811,7 @@ def as_db_read(self, db_number: int, start: int, size: int, data) -> Array:
807
811
check_error (result , context = "client" )
808
812
return result
809
813
810
- def as_db_write (self , db_number : int , start : int , size : int , data ) -> int :
814
+ def as_db_write (self , db_number : int , start : int , size : int , data : "Array[_SimpleCData[Any]]" ) -> int :
811
815
"""Writes a part of a DB into a PLC.
812
816
813
817
Args:
@@ -943,7 +947,7 @@ def set_plc_datetime(self, dt: datetime) -> int:
943
947
944
948
return self ._lib .Cli_SetPlcDateTime (self ._s7_client , byref (buffer ))
945
949
946
- def check_as_completion (self , p_value ) -> int :
950
+ def check_as_completion (self , p_value : c_int ) -> int :
947
951
"""Method to check Status of an async request. Result contains if the check was successful, not the data value itself
948
952
949
953
Args:
@@ -952,7 +956,7 @@ def check_as_completion(self, p_value) -> int:
952
956
Returns:
953
957
Snap7 code. If 0 - Job is done successfully. If 1 - Job is either pending or contains s7errors
954
958
"""
955
- result = self ._lib .Cli_CheckAsCompletion (self ._s7_client , p_value )
959
+ result = self ._lib .Cli_CheckAsCompletion (self ._s7_client , byref ( p_value ) )
956
960
check_error (result , context = "client" )
957
961
return result
958
962
@@ -1000,7 +1004,7 @@ def wait_as_completion(self, timeout: int) -> int:
1000
1004
check_error (result , context = "client" )
1001
1005
return result
1002
1006
1003
- def _prepare_as_read_area (self , area : Areas , size : int ) -> Tuple [WordLen , Array ]:
1007
+ def _prepare_as_read_area (self , area : Areas , size : int ) -> Tuple [WordLen , " Array[_SimpleCData[int]]" ]:
1004
1008
if area not in Areas :
1005
1009
raise ValueError (f"{ area } is not implemented in types" )
1006
1010
elif area == Areas .TM :
@@ -1013,7 +1017,9 @@ def _prepare_as_read_area(self, area: Areas, size: int) -> Tuple[WordLen, Array]
1013
1017
usrdata = (type_ * size )()
1014
1018
return wordlen , usrdata
1015
1019
1016
- def as_read_area (self , area : Areas , dbnumber : int , start : int , size : int , wordlen : WordLen , pusrdata ) -> int :
1020
+ def as_read_area (
1021
+ self , area : Areas , dbnumber : int , start : int , size : int , wordlen : WordLen , pusrdata : "Array[_SimpleCData[Any]]"
1022
+ ) -> int :
1017
1023
"""Reads a data area from a PLC asynchronously.
1018
1024
With it you can read DB, Inputs, Outputs, Merkers, Timers and Counters.
1019
1025
@@ -1032,11 +1038,11 @@ def as_read_area(self, area: Areas, dbnumber: int, start: int, size: int, wordle
1032
1038
f"reading area: { area .name } dbnumber: { dbnumber } start: { start } amount: { size } "
1033
1039
f"wordlen: { wordlen .name } ={ wordlen .value } "
1034
1040
)
1035
- result = self ._lib .Cli_AsReadArea (self ._s7_client , area .value , dbnumber , start , size , wordlen .value , pusrdata )
1041
+ result = self ._lib .Cli_AsReadArea (self ._s7_client , area .value , dbnumber , start , size , wordlen .value , byref ( pusrdata ) )
1036
1042
check_error (result , context = "client" )
1037
1043
return result
1038
1044
1039
- def _prepare_as_write_area (self , area : Areas , data : bytearray ) -> Tuple [WordLen , Array ]:
1045
+ def _prepare_as_write_area (self , area : Areas , data : bytearray ) -> Tuple [WordLen , " Array[_SimpleCData[Any]]" ]:
1040
1046
if area not in Areas :
1041
1047
raise ValueError (f"{ area } is not implemented in types" )
1042
1048
elif area == Areas .TM :
@@ -1049,7 +1055,9 @@ def _prepare_as_write_area(self, area: Areas, data: bytearray) -> Tuple[WordLen,
1049
1055
cdata = (type_ * len (data )).from_buffer_copy (data )
1050
1056
return wordlen , cdata
1051
1057
1052
- def as_write_area (self , area : Areas , dbnumber : int , start : int , size : int , wordlen : WordLen , pusrdata ) -> int :
1058
+ def as_write_area (
1059
+ self , area : Areas , dbnumber : int , start : int , size : int , wordlen : WordLen , pusrdata : "Array[_SimpleCData[Any]]"
1060
+ ) -> int :
1053
1061
"""Writes a data area into a PLC asynchronously.
1054
1062
1055
1063
Args:
@@ -1072,7 +1080,7 @@ def as_write_area(self, area: Areas, dbnumber: int, start: int, size: int, wordl
1072
1080
check_error (res , context = "client" )
1073
1081
return res
1074
1082
1075
- def as_eb_read (self , start : int , size : int , data ) -> int :
1083
+ def as_eb_read (self , start : int , size : int , data : "Array[_SimpleCData[Any]]" ) -> int :
1076
1084
"""Reads a part of IPI area from a PLC asynchronously.
1077
1085
1078
1086
Args:
@@ -1124,7 +1132,7 @@ def as_full_upload(self, _type: str, block_num: int) -> int:
1124
1132
check_error (result , context = "client" )
1125
1133
return result
1126
1134
1127
- def as_list_blocks_of_type (self , blocktype : str , data , count ) -> int :
1135
+ def as_list_blocks_of_type (self , blocktype : str , data : "Array[_SimpleCData[Any]]" , count : "_SimpleCData[Any]" ) -> int :
1128
1136
"""Returns the AG blocks list of a given type.
1129
1137
1130
1138
Args:
@@ -1145,7 +1153,7 @@ def as_list_blocks_of_type(self, blocktype: str, data, count) -> int:
1145
1153
check_error (result , context = "client" )
1146
1154
return result
1147
1155
1148
- def as_mb_read (self , start : int , size : int , data ) -> int :
1156
+ def as_mb_read (self , start : int , size : int , data : "Array[_SimpleCData[Any]]" ) -> int :
1149
1157
"""Reads a part of Merkers area from a PLC.
1150
1158
1151
1159
Args:
@@ -1177,7 +1185,7 @@ def as_mb_write(self, start: int, size: int, data: bytearray) -> int:
1177
1185
check_error (result , context = "client" )
1178
1186
return result
1179
1187
1180
- def as_read_szl (self , ssl_id : int , index : int , s7_szl : S7SZL , size ) -> int :
1188
+ def as_read_szl (self , ssl_id : int , index : int , s7_szl : S7SZL , size : "_SimpleCData[Any]" ) -> int :
1181
1189
"""Reads a partial list of given ID and Index.
1182
1190
1183
1191
Args:
@@ -1193,7 +1201,7 @@ def as_read_szl(self, ssl_id: int, index: int, s7_szl: S7SZL, size) -> int:
1193
1201
check_error (result , context = "client" )
1194
1202
return result
1195
1203
1196
- def as_read_szl_list (self , szl_list , items_count ) -> int :
1204
+ def as_read_szl_list (self , szl_list : S7SZLList , items_count : "_SimpleCData[Any]" ) -> int :
1197
1205
"""Reads the list of partial lists available in the CPU.
1198
1206
1199
1207
Args:
@@ -1207,7 +1215,7 @@ def as_read_szl_list(self, szl_list, items_count) -> int:
1207
1215
check_error (result , context = "client" )
1208
1216
return result
1209
1217
1210
- def as_tm_read (self , start : int , amount : int , data ) -> bytearray :
1218
+ def as_tm_read (self , start : int , amount : int , data : "Array[_SimpleCData[Any]]" ) -> int :
1211
1219
"""Reads timers from a PLC.
1212
1220
1213
1221
Args:
@@ -1239,7 +1247,7 @@ def as_tm_write(self, start: int, amount: int, data: bytearray) -> int:
1239
1247
check_error (result )
1240
1248
return result
1241
1249
1242
- def as_upload (self , block_num : int , _buffer , size ) -> int :
1250
+ def as_upload (self , block_num : int , _buffer : "Array[_SimpleCData[Any]]" , size : "_SimpleCData[Any]" ) -> int :
1243
1251
"""Uploads a block from AG.
1244
1252
1245
1253
Note:
@@ -1363,7 +1371,7 @@ def error_text(self, error: int) -> str:
1363
1371
text_length = c_int (256 )
1364
1372
error_code = c_int32 (error )
1365
1373
text = create_string_buffer (buffer_size )
1366
- response = self ._lib .Cli_ErrorText (error_code , byref ( text ) , text_length )
1374
+ response = self ._lib .Cli_ErrorText (error_code , text , text_length )
1367
1375
check_error (response )
1368
1376
result = bytearray (text )[: text_length .value ].decode ().strip ("\x00 " )
1369
1377
return result
0 commit comments