1
1
import asyncio
2
2
import socket
3
- import time
4
3
from dataclasses import dataclass , field
5
4
from enum import IntEnum
6
5
@@ -30,41 +29,70 @@ class SNMPServer(Driver):
30
29
timeout : int = 3
31
30
plug : int = field ()
32
31
oid : str = field (default = "1.3.6.1.4.1.13742.6.4.1.2.1.2.1" )
32
+ auth_protocol : str = field (default = None ) # 'MD5' or 'SHA'
33
+ auth_key : str = field (default = None )
34
+ priv_protocol : str = field (default = None ) # 'DES' or 'AES'
35
+ priv_key : str = field (default = None )
33
36
34
37
def __post_init__ (self ):
35
38
if hasattr (super (), "__post_init__" ):
36
39
super ().__post_init__ ()
37
40
38
41
try :
39
42
self .ip_address = socket .gethostbyname (self .host )
40
- self .logger .info (f"Resolved { self .host } to { self .ip_address } " )
43
+ self .logger .debug (f"Resolved { self .host } to { self .ip_address } " )
41
44
except socket .gaierror as e :
42
45
raise SNMPError (f"Failed to resolve hostname { self .host } : { e } " ) from e
43
46
44
47
self .full_oid = tuple (int (x ) for x in self .oid .split ('.' )) + (self .plug ,)
45
48
46
49
def _setup_snmp (self ):
47
50
try :
48
- # TODO: switch to anyio?
49
51
asyncio .get_running_loop ()
50
52
except RuntimeError :
51
53
loop = asyncio .new_event_loop ()
52
54
asyncio .set_event_loop (loop )
53
55
54
-
55
56
snmp_engine = engine .SnmpEngine ()
56
57
57
- config .add_v3_user (
58
- snmp_engine ,
59
- self .user ,
60
- config .USM_AUTH_NONE ,
61
- None
62
- )
58
+ if self .auth_protocol and self .auth_key :
59
+ if self .priv_protocol and self .priv_key :
60
+ security_level = 'authPriv'
61
+ auth_protocol = getattr (config , f'usmHMAC{ self .auth_protocol } AuthProtocol' )
62
+ priv_protocol = getattr (config , f'usmPriv{ self .priv_protocol } Protocol' )
63
+
64
+ config .add_v3_user (
65
+ snmp_engine ,
66
+ self .user ,
67
+ auth_protocol ,
68
+ self .auth_key ,
69
+ priv_protocol ,
70
+ self .priv_key
71
+ )
72
+ else :
73
+ security_level = 'authNoPriv'
74
+ auth_protocol = getattr (config , f'usmHMAC{ self .auth_protocol } AuthProtocol' )
75
+
76
+ config .add_v3_user (
77
+ snmp_engine ,
78
+ self .user ,
79
+ auth_protocol ,
80
+ self .auth_key
81
+ )
82
+ else :
83
+ security_level = 'noAuthNoPriv'
84
+ config .add_v3_user (
85
+ snmp_engine ,
86
+ self .user ,
87
+ config .USM_AUTH_NONE ,
88
+ None
89
+ )
90
+
63
91
config .add_target_parameters (
64
92
snmp_engine ,
65
93
"my-creds" ,
66
94
self .user ,
67
- "noAuthNoPriv"
95
+ security_level
68
96
)
69
97
70
98
config .add_transport (
@@ -92,12 +120,12 @@ def _snmp_set(self, state: PowerState):
92
120
93
121
def callback (snmpEngine , sendRequestHandle , errorIndication ,
94
122
errorStatus , errorIndex , varBinds , cbCtx ):
95
- self .logger .info (f"Callback { errorIndication } { errorStatus } { errorIndex } { varBinds } " )
123
+ self .logger .debug (f"Callback { errorIndication } { errorStatus } { errorIndex } { varBinds } " )
96
124
if errorIndication :
97
- self .logger .info (f"SNMP error: { errorIndication } " )
125
+ self .logger .error (f"SNMP error: { errorIndication } " )
98
126
result ["error" ] = f"SNMP error: { errorIndication } "
99
127
elif errorStatus :
100
- self .logger .info (f"SNMP status: { errorStatus } " )
128
+ self .logger .error (f"SNMP status: { errorStatus } " )
101
129
result ["error" ] = (
102
130
f"SNMP error: { errorStatus .prettyPrint ()} at "
103
131
f"{ varBinds [int (errorIndex ) - 1 ][0 ] if errorIndex else '?' } "
@@ -106,7 +134,7 @@ def callback(snmpEngine, sendRequestHandle, errorIndication,
106
134
result ["success" ] = True
107
135
for oid , val in varBinds :
108
136
self .logger .debug (f"{ oid .prettyPrint ()} = { val .prettyPrint ()} " )
109
- self .logger .info (f"SNMP set result: { result } " )
137
+ self .logger .debug (f"SNMP set result: { result } " )
110
138
111
139
try :
112
140
self .logger .info (f"Sending power { state .name } command to { self .host } " )
@@ -136,30 +164,15 @@ def callback(snmpEngine, sendRequestHandle, errorIndication,
136
164
raise SNMPError (error_msg ) from e
137
165
138
166
@export
139
- def power_on (self ):
167
+ def on (self ):
140
168
"""Turn power on"""
141
169
return self ._snmp_set (PowerState .ON )
142
170
143
171
@export
144
- def power_off (self ):
172
+ def off (self ):
145
173
"""Turn power off"""
146
174
return self ._snmp_set (PowerState .OFF )
147
175
148
- @export
149
- def power_cycle (self ):
150
- """Power cycle the device"""
151
- try :
152
- self .logger .info ("Starting power cycle sequence" )
153
- self .power_off ()
154
- self .logger .info (f"Waiting { self .quiescent_period } seconds..." )
155
- time .sleep (self .quiescent_period )
156
- self .power_on ()
157
- return "Power cycle completed successfully"
158
- except Exception as e :
159
- error_msg = f"Power cycle failed: { str (e )} "
160
- self .logger .error (error_msg )
161
- raise SNMPError (error_msg ) from e
162
-
163
176
def close (self ):
164
177
"""No cleanup needed since engines are created per operation"""
165
178
if hasattr (super (), "close" ):
0 commit comments