From 4f1fad4f36a776966cc588265c7a82e89fbd9623 Mon Sep 17 00:00:00 2001 From: Benny Zlotnik Date: Thu, 6 Feb 2025 17:18:36 +0200 Subject: [PATCH] add tests and doc Signed-off-by: Benny Zlotnik --- docs/source/api-reference/drivers/snmp.md | 82 ++++++++++ .../examples/exporter.yaml | 18 ++ .../jumpstarter_driver_snmp/driver.py | 116 +++++++------ .../jumpstarter_driver_snmp/driver_test.py | 154 ++++++++++++++++++ 4 files changed, 324 insertions(+), 46 deletions(-) create mode 100644 docs/source/api-reference/drivers/snmp.md create mode 100644 packages/jumpstarter-driver-snmp/examples/exporter.yaml create mode 100644 packages/jumpstarter-driver-snmp/jumpstarter_driver_snmp/driver_test.py diff --git a/docs/source/api-reference/drivers/snmp.md b/docs/source/api-reference/drivers/snmp.md new file mode 100644 index 00000000..c42f52fe --- /dev/null +++ b/docs/source/api-reference/drivers/snmp.md @@ -0,0 +1,82 @@ +# SNMP + +**driver**: `jumpstarter_driver_snmp.driver.SNMPServer` + +A driver for controlling power via SNMP-enabled PDUs (Power Distribution Units). + +## Driver configuration +```yaml +export: + power: + type: "jumpstarter_driver_snmp.driver.SNMPServer" + config: + host: "pdu.mgmt.com" + user: "labuser" + plug: 32 + port: 161 + oid: "1.3.6.1.4.1.13742.6.4.1.2.1.2.1" + auth_protocol: "NONE" + auth_key: null + priv_protocol: "NONE" + priv_key: null + timeout: 5.0 +``` + +### Config parameters + +| Parameter | Description | Type | Required | Default | +|-----------|-------------|------|----------|---------| +| host | Hostname or IP address of the SNMP-enabled PDU | str | yes | | +| user | SNMP v3 username | str | yes | | +| plug | PDU outlet number to control | int | yes | | +| port | SNMP port number | int | no | 161 | +| oid | Base OID for power control | str | no | "1.3.6.1.4.1.13742.6.4.1.2.1.2.1" | +| auth_protocol | Authentication protocol ("NONE", "MD5", "SHA") | str | no | "NONE" | +| auth_key | Authentication key when auth_protocol is not "NONE" | str | no | null | +| priv_protocol | Privacy protocol ("NONE", "DES", "AES") | str | no | "NONE" | +| priv_key | Privacy key when priv_protocol is not "NONE" | str | no | null | +| timeout | SNMP timeout in seconds | float | no | 5.0 | + +## SNMPServerClient API + +### Methods + +#### on() +Turn power on for the configured outlet. + +Returns: +- str: Confirmation message + +#### off() +Turn power off for the configured outlet. + +Returns: +- str: Confirmation message + +#### cycle(quiescent_period: int = 2) +Power cycle the device with a configurable wait period between off and on states. + +Parameters: +- quiescent_period: Time to wait in seconds between power off and power on + +Returns: +- str: Confirmation message + +## Examples + +Power cycling a device: +```python +snmp_client.cycle(quiescent_period=3) +``` + +Basic power control: +```python +snmp_client.off() +snmp_client.on() +``` + +Using the CLI: +```bash +j power on +j power off +j power cycle --wait 3 diff --git a/packages/jumpstarter-driver-snmp/examples/exporter.yaml b/packages/jumpstarter-driver-snmp/examples/exporter.yaml new file mode 100644 index 00000000..31a59aec --- /dev/null +++ b/packages/jumpstarter-driver-snmp/examples/exporter.yaml @@ -0,0 +1,18 @@ +apiVersion: jumpstarter.dev/v1alpha1 +kind: ExporterConfig +endpoint: grpc.jumpstarter.192.168.0.203.nip.io:8082 +metadata: + namespace: default + name: demo +tls: + ca: '' + insecure: true +token: +export: + power: + type: "jumpstarter_driver_snmp.driver.SNMPServer" + config: + host: "pdu.mgmt.com" + user: "labuser" + plug: 32 + oid: "1.3.6.1.4.1.13742.6.4.1.2.1.2.1" diff --git a/packages/jumpstarter-driver-snmp/jumpstarter_driver_snmp/driver.py b/packages/jumpstarter-driver-snmp/jumpstarter_driver_snmp/driver.py index 458b9c76..3f5cd857 100644 --- a/packages/jumpstarter-driver-snmp/jumpstarter_driver_snmp/driver.py +++ b/packages/jumpstarter-driver-snmp/jumpstarter_driver_snmp/driver.py @@ -1,7 +1,7 @@ import asyncio import socket from dataclasses import dataclass, field -from enum import IntEnum +from enum import Enum, IntEnum from pysnmp.carrier.asyncio.dgram import udp from pysnmp.entity import config, engine @@ -11,6 +11,16 @@ from jumpstarter.driver import Driver, export +class AuthProtocol(str, Enum): + NONE = "NONE" + MD5 = "MD5" + SHA = "SHA" + +class PrivProtocol(str, Enum): + NONE = "NONE" + DES = "DES" + AES = "AES" + class PowerState(IntEnum): OFF = 0 ON = 1 @@ -25,14 +35,13 @@ class SNMPServer(Driver): host: str = field() user: str = field() port: int = field(default=161) - quiescent_period: int = field(default=5) - timeout: int = 3 plug: int = field() oid: str = field(default="1.3.6.1.4.1.13742.6.4.1.2.1.2.1") - auth_protocol: str = field(default=None) # 'MD5' or 'SHA' - auth_key: str = field(default=None) - priv_protocol: str = field(default=None) # 'DES' or 'AES' - priv_key: str = field(default=None) + auth_protocol: AuthProtocol = field(default=AuthProtocol.NONE) + auth_key: str | None = field(default=None) + priv_protocol: PrivProtocol = field(default=PrivProtocol.NONE) + priv_key: str | None = field(default=None) + timeout: float = field(default=5.0) def __post_init__(self): if hasattr(super(), "__post_init__"): @@ -47,45 +56,54 @@ def __post_init__(self): self.full_oid = tuple(int(x) for x in self.oid.split('.')) + (self.plug,) def _setup_snmp(self): - try: - asyncio.get_running_loop() - except RuntimeError: - loop = asyncio.new_event_loop() - asyncio.set_event_loop(loop) - snmp_engine = engine.SnmpEngine() - if self.auth_protocol and self.auth_key: - if self.priv_protocol and self.priv_key: - security_level = 'authPriv' - auth_protocol = getattr(config, f'usmHMAC{self.auth_protocol}AuthProtocol') - priv_protocol = getattr(config, f'usmPriv{self.priv_protocol}Protocol') - - config.add_v3_user( - snmp_engine, - self.user, - auth_protocol, - self.auth_key, - priv_protocol, - self.priv_key - ) - else: - security_level = 'authNoPriv' - auth_protocol = getattr(config, f'usmHMAC{self.auth_protocol}AuthProtocol') - - config.add_v3_user( - snmp_engine, - self.user, - auth_protocol, - self.auth_key - ) + AUTH_PROTOCOLS = { + AuthProtocol.NONE: config.USM_AUTH_NONE, + AuthProtocol.MD5: config.USM_AUTH_HMAC96_MD5, + AuthProtocol.SHA: config.USM_AUTH_HMAC96_SHA, + } + + PRIV_PROTOCOLS = { + PrivProtocol.NONE: config.USM_PRIV_NONE, + PrivProtocol.DES: config. USM_PRIV_CBC56_DES, + PrivProtocol.AES: config.USM_PRIV_CFB128_AES, + } + + auth_protocol = AUTH_PROTOCOLS[self.auth_protocol] + priv_protocol = PRIV_PROTOCOLS[self.priv_protocol] + + if self.auth_protocol == AuthProtocol.NONE: + security_level = "noAuthNoPriv" + elif self.priv_protocol == PrivProtocol.NONE: + security_level = "authNoPriv" else: - security_level = 'noAuthNoPriv' + security_level = "authPriv" + + if security_level == "noAuthNoPriv": + config.add_v3_user( + snmp_engine, + self.user + ) + elif security_level == "authNoPriv": + if not self.auth_key: + raise SNMPError("Authentication key required when auth_protocol is specified") config.add_v3_user( snmp_engine, self.user, - config.USM_AUTH_NONE, - None + auth_protocol, + self.auth_key + ) + else: + if not self.auth_key or not self.priv_key: + raise SNMPError("Both auth_key and priv_key required for authenticated privacy") + config.add_v3_user( + snmp_engine, + self.user, + auth_protocol, + self.auth_key, + priv_protocol, + self.priv_key ) config.add_target_parameters( @@ -95,18 +113,19 @@ def _setup_snmp(self): security_level ) - config.add_transport( + config.add_target_address( snmp_engine, + "my-target", udp.DOMAIN_NAME, - udp.UdpAsyncioTransport().open_client_mode() + (self.ip_address, self.port), + "my-creds", + timeout=int(self.timeout * 100), ) - config.add_target_address( + config.add_transport( snmp_engine, - "my-target", udp.DOMAIN_NAME, - (self.ip_address, self.port), - "my-creds" + udp.UdpAsyncioTransport().open_client_mode() ) return snmp_engine @@ -138,6 +157,11 @@ def callback(snmpEngine, sendRequestHandle, errorIndication, try: self.logger.info(f"Sending power {state.name} command to {self.host}") + try: + asyncio.get_running_loop() + except RuntimeError: + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) snmp_engine = self._setup_snmp() diff --git a/packages/jumpstarter-driver-snmp/jumpstarter_driver_snmp/driver_test.py b/packages/jumpstarter-driver-snmp/jumpstarter_driver_snmp/driver_test.py new file mode 100644 index 00000000..880b3e2c --- /dev/null +++ b/packages/jumpstarter-driver-snmp/jumpstarter_driver_snmp/driver_test.py @@ -0,0 +1,154 @@ +from unittest.mock import MagicMock, patch + +import pytest +from pysnmp.entity import config as snmp_config + +from jumpstarter_driver_snmp.driver import AuthProtocol, PrivProtocol, SNMPServer + + +class MockMibObject: + def getInstIdFromIndices(self, *args): + return (1, 3, 6) + +def setup_mock_snmp_engine(): + mock_engine = MagicMock() + mock_builder = MagicMock() + + mock_entry = MockMibObject() + mock_builder.import_symbols.return_value = [mock_entry] + mock_engine.get_mib_builder.return_value = mock_builder + + mock_engine.transport_dispatcher = MagicMock() + mock_engine.transport_dispatcher.start = MagicMock() + mock_engine.transport_dispatcher.stop = MagicMock() + + return mock_engine + +@pytest.mark.parametrize("auth_config", [ + { + "user": "usr-no-auth", + "auth_protocol": AuthProtocol.NONE, + "auth_key": None, + "priv_protocol": PrivProtocol.NONE, + "priv_key": None, + "expected_args_len": 2 # only user and engine args for noAuth + }, + { + "user": "usr-md5-none", + "auth_protocol": AuthProtocol.MD5, + "auth_key": "authkey1", + "priv_protocol": PrivProtocol.NONE, + "priv_key": None, + "expected_args_len": 4 # engine, user, auth_protocol, auth_key + }, + { + "user": "usr-sha-des", + "auth_protocol": AuthProtocol.SHA, + "auth_key": "authkey1", + "priv_protocol": PrivProtocol.DES, + "priv_key": "privkey1", + "expected_args_len": 6 # engine, user, auth_protocol, auth_key, priv_protocol, priv_key + }, +]) +def test_snmp_auth_configurations(auth_config): + """Test different SNMP authentication configurations""" + with patch('pysnmp.entity.config.add_v3_user') as mock_add_user, \ + patch('pysnmp.entity.engine.SnmpEngine', return_value=setup_mock_snmp_engine()), \ + patch('pysnmp.entity.config.add_target_parameters'), \ + patch('pysnmp.entity.config.add_target_address'), \ + patch('pysnmp.entity.config.add_transport'): + + server = SNMPServer( + host="localhost", + user=auth_config["user"], + plug=1, + auth_protocol=auth_config["auth_protocol"], + auth_key=auth_config["auth_key"], + priv_protocol=auth_config["priv_protocol"], + priv_key=auth_config["priv_key"] + ) + + server._setup_snmp() + + args, _ = mock_add_user.call_args + + assert len(args) == auth_config["expected_args_len"] + + assert args[1] == auth_config["user"] + + if auth_config["auth_protocol"] != AuthProtocol.NONE: + if auth_config["auth_protocol"] == AuthProtocol.MD5: + expected_auth = snmp_config.USM_AUTH_HMAC96_MD5 + else: + expected_auth = snmp_config.USM_AUTH_HMAC96_SHA + + assert args[2] == expected_auth + assert args[3] == auth_config["auth_key"] + + if auth_config["priv_protocol"] != PrivProtocol.NONE: + if auth_config["priv_protocol"] == PrivProtocol.DES: + expected_priv = snmp_config.USM_PRIV_CBC56_DES + else: + expected_priv = snmp_config.USM_PRIV_CFB128_AES + + assert args[4] == expected_priv + assert args[5] == auth_config["priv_key"] + +@patch('pysnmp.entity.config.add_v3_user') +@patch('pysnmp.entity.engine.SnmpEngine') +def test_power_on_command(mock_engine, mock_add_user): + """Test power on command execution""" + mock_engine.return_value = setup_mock_snmp_engine() + + with patch('pysnmp.entity.rfc3413.cmdgen.SetCommandGenerator.send_varbinds') as mock_send, \ + patch('asyncio.get_running_loop', side_effect=RuntimeError), \ + patch('asyncio.new_event_loop'), \ + patch('asyncio.set_event_loop'), \ + patch('pysnmp.entity.config.add_target_parameters'), \ + patch('pysnmp.entity.config.add_target_address'), \ + patch('pysnmp.entity.config.add_transport'): + + server = SNMPServer( + host="localhost", + user="testuser", + plug=1 + ) + + def side_effect(*args): + callback = args[-1] + callback(None, None, None, None, None, [], None) + mock_send.side_effect = side_effect + + result = server.on() + assert "Power ON command sent successfully" in result + mock_send.assert_called_once() + + +@patch('pysnmp.entity.config.add_v3_user') +@patch('pysnmp.entity.engine.SnmpEngine') +def test_power_off_command(mock_engine, mock_add_user): + """Test power off command execution""" + mock_engine.return_value = setup_mock_snmp_engine() + + with patch('pysnmp.entity.rfc3413.cmdgen.SetCommandGenerator.send_varbinds') as mock_send, \ + patch('asyncio.get_running_loop', side_effect=RuntimeError), \ + patch('asyncio.new_event_loop'), \ + patch('asyncio.set_event_loop'), \ + patch('pysnmp.entity.config.add_target_parameters'), \ + patch('pysnmp.entity.config.add_target_address'), \ + patch('pysnmp.entity.config.add_transport'): + + server = SNMPServer( + host="localhost", + user="testuser", + plug=1 + ) + + def side_effect(*args): + callback = args[-1] + callback(None, None, None, None, None, [], None) + mock_send.side_effect = side_effect + + result = server.off() + assert "Power OFF command sent successfully" in result + mock_send.assert_called_once()