Skip to content

Commit be1e70b

Browse files
committed
Add Yepkit YKUSH Usb driver
This USB Hub enables power control of the ports, making it useful to control the power of different targets as long as those targets can be USB powered.
1 parent b5ad370 commit be1e70b

File tree

9 files changed

+375
-21
lines changed

9 files changed

+375
-21
lines changed
Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
# Yepkit
2+
3+
Drivers for yepkit products.
4+
5+
## Ykush driver
6+
7+
This driver provides a client for the [Ykush USB switch](https://www.yepkit.com/products/ykush).
8+
**driver**: `jumpstarter_driver_yepkit.driver.Ykush`
9+
10+
### Driver configuration
11+
```yaml
12+
export:
13+
power:
14+
type: jumpstarter_driver_yepkit.driver.Ykush
15+
config:
16+
serial: "YK25838"
17+
port: "1"
18+
19+
power2:
20+
type: jumpstarter_driver_yepkit.driver.Ykush
21+
config:
22+
serial: "YK25838"
23+
port: "2"
24+
25+
```
26+
### Config parameters
27+
28+
| Parameter | Description | Type | Required | Default |
29+
|-----------|-------------|------|----------|---------|
30+
| serial | The serial number of the ykush hub, empty means auto-detection | no | None | |
31+
| port | The port number to be managed, "0", "1", "2", "a" which means all | str | yes | "a" |
32+
33+
34+
### PowerClient API
35+
36+
The yepkit ykush driver provides a `PowerClient` with the following API:
37+
38+
```{eval-rst}
39+
.. autoclass:: jumpstarter_driver_power.client.PowerClient
40+
:members: on, off
41+
```
42+
43+
### Examples
44+
Powering on and off a device
45+
```{testcode}
46+
client.power.on()
47+
time.sleep(1)
48+
client.power.off()
49+
```
50+
51+
### CLI access
52+
```bash
53+
$ sudo ~/.cargo/bin/uv run jmp exporter shell -c ./packages/jumpstarter-driver-yepkit/examples/exporter.yaml
54+
WARNING:Ykush:No serial number provided for ykush, using the first one found: YK25838
55+
INFO:Ykush:Power OFF for Ykush YK25838 on port 1
56+
INFO:Ykush:Power OFF for Ykush YK25838 on port 2
57+
58+
$$ j
59+
Usage: j [OPTIONS] COMMAND [ARGS]...
60+
61+
Generic composite device
62+
63+
Options:
64+
--help Show this message and exit.
65+
66+
Commands:
67+
power Generic power
68+
power2 Generic power
69+
70+
$$ j power on
71+
INFO:Ykush:Power ON for Ykush YK25838 on port 1
72+
73+
$$ exit
74+
```
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
# Jumpstarter Driver for the ykush USB Hub from Yepkit
2+
3+
This driver is for the ykush USB Hub from Yepkit. It allows you to control the power of each port of the hub.
4+
5+
If you want to test this locally, you can use the following commands from the root of the repository:
6+
7+
```bash
8+
sudo $(which uv) run jmp exporter shell --config ./packages/jumpstarter-driver-yepkit/examples/exporter.yaml
9+
```
10+
11+
Please note that sudo is necessary to gain access to the raw USB interfaces.
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
apiVersion: jumpstarter.dev/v1alpha1
2+
kind: ExporterConfig
3+
metadata:
4+
name: exporter
5+
namespace: default
6+
endpoint: grpc.jumpstarter.192.168.0.203.nip.io:8082
7+
token: "<token>"
8+
export:
9+
power:
10+
type: jumpstarter_driver_yepkit.driver.Ykush
11+
config:
12+
port: "1"
13+
14+
power2:
15+
type: jumpstarter_driver_yepkit.driver.Ykush
16+
config:
17+
serial: "YK25838"
18+
port: "2"
19+
20+
power3:
21+
type: jumpstarter_driver_yepkit.driver.Ykush
22+
config:
23+
port: "3"
24+
25+
all:
26+
type: jumpstarter_driver_yepkit.driver.Ykush
27+
config:
28+
port: "all"
29+

packages/jumpstarter-driver-yepkit/jumpstarter_driver_yepkit/__init__.py

Whitespace-only changes.
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
import pytest
2+
import usb
3+
4+
5+
def pytest_runtest_call(item):
6+
try:
7+
item.runtest()
8+
except FileNotFoundError:
9+
pytest.skip("yepkit not available")
10+
except usb.core.USBError:
11+
pytest.skip("USB not available, could need root permissions")
12+
except usb.core.NoBackendError:
13+
pytest.skip("No USB backend")
Lines changed: 145 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,145 @@
1+
import threading
2+
from collections.abc import AsyncGenerator
3+
from dataclasses import dataclass, field
4+
5+
import usb.core
6+
import usb.util
7+
from jumpstarter_driver_power.driver import PowerReading, PowerInterface
8+
9+
from jumpstarter.driver import Driver, export
10+
11+
VID = 0x04D8
12+
PID = 0xF2F7
13+
14+
PORT_UP_COMMANDS = {
15+
'1': 0x11,
16+
'2': 0x12,
17+
'3': 0x13,
18+
'all': 0x1A
19+
}
20+
21+
PORT_DOWN_COMMANDS = {
22+
'1': 0x01,
23+
'2': 0x02,
24+
'3': 0x03,
25+
'all': 0x0A
26+
}
27+
28+
PORT_STATUS_COMMANDS = {
29+
'1': 0x21,
30+
'2': 0x22,
31+
'3': 0x23
32+
}
33+
34+
VALID_DEFAULTS = ["on", "off", "keep"]
35+
36+
# static shared array of usb devices, interfaces on same device cannot be claimed multiple times
37+
_USB_DEVS = {}
38+
_USB_DEVS_LOCK = threading.Lock() # Lock for synchronizing access, we don't do multithread, but just in case..
39+
40+
@dataclass(kw_only=True)
41+
class Ykush(PowerInterface, Driver):
42+
""" driver for Yepkit Ykush USB Hub with Power control """
43+
serial: str | None = field(default=None)
44+
default: str = "off"
45+
port: str = "all"
46+
47+
dev: usb.core.Device = field(init=False)
48+
49+
def __post_init__(self):
50+
if hasattr(super(), "__post_init__"):
51+
super().__post_init__()
52+
53+
keys = PORT_UP_COMMANDS.keys()
54+
if self.port not in keys:
55+
raise ValueError(
56+
f"The ykush driver port must be any of the following values: {keys}")
57+
58+
if self.default not in VALID_DEFAULTS:
59+
raise ValueError(
60+
f"The ykush driver default must be any of the following values: {VALID_DEFAULTS}")
61+
62+
with _USB_DEVS_LOCK:
63+
# another instance already claimed this device?
64+
if self.serial is None and len(_USB_DEVS.keys()) > 0:
65+
self.serial = list(_USB_DEVS.keys())[0]
66+
self.dev = _USB_DEVS[self.serial]
67+
return
68+
69+
if self.serial in _USB_DEVS:
70+
self.dev = _USB_DEVS[self.serial]
71+
return
72+
73+
for dev in usb.core.find(idVendor=VID, idProduct=PID, find_all=True):
74+
serial = usb.util.get_string(dev, dev.iSerialNumber, 0)
75+
if serial == self.serial or self.serial is None:
76+
_USB_DEVS[serial] = dev
77+
if self.serial is None:
78+
self.logger.warning(
79+
f"No serial number provided for ykush, using the first one found: {serial}")
80+
self.serial = serial
81+
self.dev = dev
82+
return
83+
84+
raise FileNotFoundError("failed to find ykush device")
85+
86+
def _send_cmd(self, cmd, report_size=64):
87+
out_ep, in_ep = self._get_endpoints(self.dev)
88+
out_buf = [0x00] * report_size
89+
out_buf[0] = cmd # YKUSH command
90+
91+
# Write to the OUT endpoint
92+
out_ep.write(out_buf)
93+
94+
# Read from the IN endpoint
95+
in_buf = in_ep.read(report_size, timeout=2000)
96+
return list(in_buf)
97+
98+
def _get_endpoints(self, dev):
99+
"""
100+
From the active configuration, find the first IN and OUT endpoints.
101+
"""
102+
cfg = self.dev.get_active_configuration()
103+
interface = cfg[(0, 0)]
104+
105+
out_endpoint = usb.util.find_descriptor(
106+
interface,
107+
custom_match=lambda e: \
108+
usb.util.endpoint_direction(e.bEndpointAddress) == usb.util.ENDPOINT_OUT
109+
)
110+
111+
in_endpoint = usb.util.find_descriptor(
112+
interface,
113+
custom_match=lambda e: \
114+
usb.util.endpoint_direction(e.bEndpointAddress) == usb.util.ENDPOINT_IN
115+
)
116+
117+
if not out_endpoint or not in_endpoint:
118+
raise RuntimeError("Could not find both IN and OUT endpoints for ykush.")
119+
120+
return out_endpoint, in_endpoint
121+
122+
# reset function is called by the exporter to setup the default state
123+
def reset(self):
124+
if self.default == "on":
125+
self.on()
126+
elif self.default == "off":
127+
self.off()
128+
129+
@export
130+
def on(self):
131+
self.logger.info(f"Power ON for Ykush {self.serial} on port {self.port}")
132+
cmd = PORT_UP_COMMANDS.get(self.port)
133+
_ = self._send_cmd(cmd)
134+
return
135+
136+
@export
137+
def off(self):
138+
self.logger.info(f"Power OFF for Ykush {self.serial} on port {self.port}")
139+
cmd = PORT_DOWN_COMMANDS.get(self.port)
140+
_ = self._send_cmd(cmd)
141+
return
142+
143+
@export
144+
def read(self) -> AsyncGenerator[PowerReading, None]:
145+
raise NotImplementedError
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
from .driver import Ykush
2+
from jumpstarter.common.utils import serve
3+
4+
5+
def test_drivers_yepkit():
6+
instance = Ykush()
7+
8+
with serve(instance) as client:
9+
client.on()
10+
client.off()
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
[project]
2+
name = "jumpstarter-driver-yepkit"
3+
version = "0.1.0"
4+
description = "Add your description here"
5+
readme = "README.md"
6+
authors = [
7+
{ name = "Miguel Angel Ajo", email = "[email protected]" }
8+
]
9+
requires-python = ">=3.11"
10+
dependencies = [
11+
"anyio>=4.6.2.post1",
12+
"pyusb>=1.2.1",
13+
"jumpstarter_driver_power",
14+
"jumpstarter",
15+
]
16+
17+
[tool.hatch.version]
18+
source = "vcs"
19+
raw-options = { 'root' = '../../'}
20+
21+
[tool.hatch.metadata.hooks.vcs.urls]
22+
Homepage = "https://jumpstarter.dev"
23+
source_archive = "https://github.com/jumpstarter-dev/repo/archive/{commit_hash}.zip"
24+
25+
[tool.pytest.ini_options]
26+
addopts = "--cov --cov-report=html --cov-report=xml"
27+
log_cli = true
28+
log_cli_level = "INFO"
29+
testpaths = ["jumpstarter_driver_yepkit"]
30+
asyncio_mode = "auto"
31+
32+
[build-system]
33+
requires = ["hatchling", "hatch-vcs"]
34+
build-backend = "hatchling.build"
35+
36+
[dependency-groups]
37+
dev = [
38+
"pytest-cov>=6.0.0",
39+
"pytest>=8.3.3",
40+
]

0 commit comments

Comments
 (0)