Skip to content

Commit f6a6645

Browse files
committed
Add Yepkit YKUSH Usb driver
This USB Hub enables power control of the ports, making it useful to control the power of different targets as long as those targets can be USB powered.
1 parent 4f3f59a commit f6a6645

File tree

9 files changed

+367
-21
lines changed

9 files changed

+367
-21
lines changed
Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
# Yepkit
2+
3+
Drivers for yepkit products.
4+
5+
## Ykush driver
6+
7+
This driver provides a client for the [Ykush USB switch](https://www.yepkit.com/products/ykush).
8+
**driver**: `jumpstarter_driver_yepkit.driver.Ykush`
9+
10+
### Driver configuration
11+
```yaml
12+
export:
13+
power:
14+
type: jumpstarter_driver_yepkit.driver.Ykush
15+
config:
16+
serial: "YK25838"
17+
port: "1"
18+
19+
power2:
20+
type: jumpstarter_driver_yepkit.driver.Ykush
21+
config:
22+
serial: "YK25838"
23+
port: "2"
24+
25+
```
26+
### Config parameters
27+
28+
| Parameter | Description | Type | Required | Default |
29+
|-----------|-------------|------|----------|---------|
30+
| serial | The serial number of the ykush hub, empty means auto-detection | no | None | |
31+
| port | The port number to be managed, "0", "1", "2", "a" which means all | str | yes | "a" |
32+
33+
34+
### PowerClient API
35+
36+
The yepkit ykush driver provides a `PowerClient` with the following API:
37+
38+
```{eval-rst}
39+
.. autoclass:: jumpstarter_driver_power.client.PowerClient
40+
:members: on, off
41+
```
42+
43+
### Examples
44+
Powering on and off a device
45+
```{testcode}
46+
client.power.on()
47+
time.sleep(1)
48+
client.power.off()
49+
```
50+
51+
### CLI access
52+
```bash
53+
$ sudo ~/.cargo/bin/uv run jmp exporter shell -c ./packages/jumpstarter-driver-yepkit/examples/exporter.yaml
54+
WARNING:Ykush:No serial number provided for ykush, using the first one found: YK25838
55+
INFO:Ykush:Power OFF for Ykush YK25838 on port 1
56+
INFO:Ykush:Power OFF for Ykush YK25838 on port 2
57+
58+
$$ j
59+
Usage: j [OPTIONS] COMMAND [ARGS]...
60+
61+
Generic composite device
62+
63+
Options:
64+
--help Show this message and exit.
65+
66+
Commands:
67+
power Generic power
68+
power2 Generic power
69+
70+
$$ j power on
71+
INFO:Ykush:Power ON for Ykush YK25838 on port 1
72+
73+
$$ exit
74+
```
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
# Jumpstarter Driver for ...
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
apiVersion: jumpstarter.dev/v1alpha1
2+
kind: ExporterConfig
3+
endpoint: grpc.jumpstarter.192.168.0.203.nip.io:8082
4+
token: "<token>"
5+
export:
6+
power:
7+
type: jumpstarter_driver_yepkit.driver.Ykush
8+
config:
9+
port: "1"
10+
11+
power2:
12+
type: jumpstarter_driver_yepkit.driver.Ykush
13+
config:
14+
serial: "YK25838"
15+
port: "2"
16+
17+
power3:
18+
type: jumpstarter_driver_yepkit.driver.Ykush
19+
config:
20+
port: "3"
21+
22+
all:
23+
type: jumpstarter_driver_yepkit.driver.Ykush
24+
config:
25+
port: "a"
26+

packages/jumpstarter-driver-yepkit/jumpstarter_driver_yepkit/__init__.py

Whitespace-only changes.
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
import pytest
2+
import usb
3+
4+
5+
def pytest_runtest_call(item):
6+
try:
7+
item.runtest()
8+
except FileNotFoundError:
9+
pytest.skip("dutlink not available")
10+
except usb.core.USBError:
11+
pytest.skip("USB not available, could need root permissions")
12+
except usb.core.NoBackendError:
13+
pytest.skip("No USB backend")
Lines changed: 150 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,150 @@
1+
import threading
2+
from collections.abc import AsyncGenerator
3+
from dataclasses import dataclass, field
4+
5+
import usb.core
6+
import usb.util
7+
from jumpstarter_driver_power.driver import PowerReading
8+
9+
from jumpstarter.driver import Driver, export
10+
11+
VID = 0x04D8
12+
PID = 0xF2F7
13+
14+
PORT_UP_COMMANDS = {
15+
'1': 0x11,
16+
'2': 0x12,
17+
'3': 0x13,
18+
'a': 0x1A
19+
}
20+
21+
PORT_DOWN_COMMANDS = {
22+
'1': 0x01,
23+
'2': 0x02,
24+
'3': 0x03,
25+
'a': 0x0A
26+
}
27+
28+
PORT_STATUS_COMMANDS = {
29+
'1': 0x21,
30+
'2': 0x22,
31+
'3': 0x23
32+
}
33+
34+
VALID_DEFAULTS = ["on", "off", "keep"]
35+
36+
# static shared array of usb devices, interfaces on same device cannot be claimed multiple times
37+
_USB_DEVS = {}
38+
_USB_DEVS_LOCK = threading.Lock() # Lock for synchronizing access, we don't do multithread, but just in case..
39+
40+
@dataclass(kw_only=True)
41+
class Ykush(Driver):
42+
""" driver for Yepkit Ykush USB Hub with Power control """
43+
serial: str | None = field(default=None)
44+
default: str = "off"
45+
port: str = "a"
46+
47+
dev: usb.core.Device = field(init=False)
48+
49+
@classmethod
50+
def client(cls) -> str:
51+
# we use the standard power interface, even though we cannot provide power readings
52+
return "jumpstarter_driver_power.client.PowerClient"
53+
54+
def __post_init__(self):
55+
if hasattr(super(), "__post_init__"):
56+
super().__post_init__()
57+
58+
keys = PORT_UP_COMMANDS.keys()
59+
if self.port not in keys:
60+
raise ValueError(
61+
f"The ykush driver port must be any of the following values: {keys}")
62+
63+
if self.default not in VALID_DEFAULTS:
64+
raise ValueError(
65+
f"The ykush driver default must be any of the following values: {VALID_DEFAULTS}")
66+
67+
with _USB_DEVS_LOCK:
68+
# another instance already claimed this device?
69+
if self.serial is None and len(_USB_DEVS.keys()) > 0:
70+
self.serial = list(_USB_DEVS.keys())[0]
71+
self.dev = _USB_DEVS[self.serial]
72+
return
73+
74+
if self.serial in _USB_DEVS:
75+
self.dev = _USB_DEVS[self.serial]
76+
return
77+
78+
for dev in usb.core.find(idVendor=VID, idProduct=PID, find_all=True):
79+
serial = usb.util.get_string(dev, dev.iSerialNumber, 0)
80+
if serial == self.serial or self.serial is None:
81+
_USB_DEVS[serial] = dev
82+
if self.serial is None:
83+
self.logger.warning(
84+
f"No serial number provided for ykush, using the first one found: {serial}")
85+
self.serial = serial
86+
self.dev = dev
87+
return
88+
89+
raise FileNotFoundError("failed to find ykush device")
90+
91+
def _send_cmd(self, cmd, report_size=64):
92+
out_ep, in_ep = self._get_endpoints(self.dev)
93+
out_buf = [0x00] * report_size
94+
out_buf[0] = cmd # YKUSH command
95+
96+
# Write to the OUT endpoint
97+
out_ep.write(out_buf)
98+
99+
# Read from the IN endpoint
100+
in_buf = in_ep.read(report_size, timeout=2000)
101+
return list(in_buf)
102+
103+
def _get_endpoints(self, dev):
104+
"""
105+
From the active configuration, find the first IN and OUT endpoints.
106+
"""
107+
cfg = self.dev.get_active_configuration()
108+
interface = cfg[(0, 0)]
109+
110+
out_endpoint = usb.util.find_descriptor(
111+
interface,
112+
custom_match=lambda e: \
113+
usb.util.endpoint_direction(e.bEndpointAddress) == usb.util.ENDPOINT_OUT
114+
)
115+
116+
in_endpoint = usb.util.find_descriptor(
117+
interface,
118+
custom_match=lambda e: \
119+
usb.util.endpoint_direction(e.bEndpointAddress) == usb.util.ENDPOINT_IN
120+
)
121+
122+
if not out_endpoint or not in_endpoint:
123+
raise RuntimeError("Could not find both IN and OUT endpoints for ykush.")
124+
125+
return out_endpoint, in_endpoint
126+
127+
# reset function is called by the exporter to setup the default state
128+
def reset(self):
129+
if self.default == "on":
130+
self.on()
131+
elif self.default == "off":
132+
self.off()
133+
134+
@export
135+
def on(self):
136+
self.logger.info(f"Power ON for Ykush {self.serial} on port {self.port}")
137+
cmd = PORT_UP_COMMANDS.get(self.port)
138+
_ = self._send_cmd(cmd)
139+
return
140+
141+
@export
142+
def off(self):
143+
self.logger.info(f"Power OFF for Ykush {self.serial} on port {self.port}")
144+
cmd = PORT_DOWN_COMMANDS.get(self.port)
145+
_ = self._send_cmd(cmd)
146+
return
147+
148+
@export
149+
def read(self) -> AsyncGenerator[PowerReading, None]:
150+
raise NotImplementedError
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
from .driver import Ykush
2+
from jumpstarter.common.utils import serve
3+
4+
5+
def test_drivers_yepkit():
6+
instance = Ykush()
7+
8+
with serve(instance) as client:
9+
client.on()
10+
client.off()
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
[project]
2+
name = "jumpstarter-driver-yepkit"
3+
version = "0.1.0"
4+
description = "Add your description here"
5+
readme = "README.md"
6+
authors = [
7+
{ name = "Miguel Angel Ajo", email = "[email protected]" }
8+
]
9+
requires-python = ">=3.11"
10+
dependencies = [
11+
"anyio>=4.6.2.post1",
12+
"pyusb>=1.2.1",
13+
"jumpstarter_driver_power",
14+
"jumpstarter",
15+
]
16+
17+
[tool.hatch.version]
18+
source = "vcs"
19+
raw-options = { 'root' = '../../'}
20+
21+
[tool.hatch.metadata.hooks.vcs.urls]
22+
Homepage = "https://jumpstarter.dev"
23+
source_archive = "https://github.com/jumpstarter-dev/repo/archive/{commit_hash}.zip"
24+
25+
[tool.pytest.ini_options]
26+
addopts = "--cov --cov-report=html --cov-report=xml"
27+
log_cli = true
28+
log_cli_level = "INFO"
29+
testpaths = ["jumpstarter_driver_yepkit"]
30+
asyncio_mode = "auto"
31+
32+
[build-system]
33+
requires = ["hatchling", "hatch-vcs"]
34+
build-backend = "hatchling.build"
35+
36+
[dependency-groups]
37+
dev = [
38+
"pytest-cov>=6.0.0",
39+
"pytest>=8.3.3",
40+
]

0 commit comments

Comments
 (0)