Skip to content

Commit

Permalink
Merge branch 'main' into opendal-comments
Browse files Browse the repository at this point in the history
  • Loading branch information
NickCao authored Feb 3, 2025
2 parents bd2ffb3 + 1382071 commit 2016b8b
Show file tree
Hide file tree
Showing 21 changed files with 237 additions and 150 deletions.
3 changes: 2 additions & 1 deletion __templates__/driver/jumpstarter_driver/driver.py.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@ class ${DRIVER_CLASS}(Driver):
some_other_config: int = 69

def __post_init__(self):
super().__post_init__()
if hasattr(super(), "__post_init__"):
super().__post_init__()
# some initialization here.

@classmethod
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,13 @@ class CanClient(DriverClient, can.BusABC):
"""

def __post_init__(self):
if hasattr(super(), "__post_init__"):
super().__post_init__()

self._periodic_tasks: List[_SelfRemovingCyclicTask] = []
self._filters = None
self._is_shutdown: bool = False

super().__post_init__()

@property
@validate_call(validate_return=True)
def state(self) -> can.BusState:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,9 @@ def client(cls) -> str:
return "jumpstarter_driver_can.client.CanClient"

def __post_init__(self):
super().__post_init__()
if hasattr(super(), "__post_init__"):
super().__post_init__()

self.bus = can.Bus(channel=self.channel, interface=self.interface)

@export
Expand Down Expand Up @@ -195,7 +197,9 @@ def client(cls) -> str:
return "jumpstarter_driver_can.client.IsoTpClient"

def __post_init__(self):
super().__post_init__()
if hasattr(super(), "__post_init__"):
super().__post_init__()

self.bus = can.Bus(channel=self.channel, interface=self.interface)
self.notifier = can.Notifier(self.bus, [])
self.stack = isotp.NotifierBasedCanStack(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
import pytest
import usb


def pytest_runtest_call(item):
try:
item.runtest()
except FileNotFoundError:
pytest.skip("dutlink not available")
except usb.core.USBError:
pytest.skip("USB not available")
except usb.core.NoBackendError:
pytest.skip("No USB backend")
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@ class DutlinkConfig:
tty: str | None = field(init=False, default=None)

def __post_init__(self):
if hasattr(super(), "__post_init__"):
super().__post_init__()

for dev in usb.core.find(idVendor=0x2B23, idProduct=0x1012, find_all=True):
serial = usb.util.get_string(dev, dev.iSerialNumber)
if serial == self.serial or self.serial is None:
Expand Down Expand Up @@ -80,15 +83,17 @@ def control(self, direction, ty, actions, action, value):


@dataclass(kw_only=True)
class DutlinkSerial(DutlinkConfig, PySerial):
url: str | None = field(init=False, default=None)

class DutlinkSerialConfig(DutlinkConfig, Driver):
def __post_init__(self):
super().__post_init__()
if hasattr(super(), "__post_init__"):
super().__post_init__()

self.url = self.tty

super(PySerial, self).__post_init__()

@dataclass(kw_only=True)
class DutlinkSerial(PySerial, DutlinkSerialConfig):
url: str | None = field(init=False, default=None)


@dataclass(kw_only=True)
Expand Down Expand Up @@ -247,7 +252,8 @@ class Dutlink(DutlinkConfig, CompositeInterface, Driver):
"""

def __post_init__(self):
super().__post_init__()
if hasattr(super(), "__post_init__"):
super().__post_init__()

self.children["power"] = DutlinkPower(serial=self.serial, timeout_s=self.timeout_s)
self.children["storage"] = DutlinkStorageMux(
Expand Down
Original file line number Diff line number Diff line change
@@ -1,42 +1,64 @@
import pytest
import usb
from time import sleep

from jumpstarter_driver_network.adapters import PexpectAdapter

from jumpstarter_driver_dutlink.driver import Dutlink
from jumpstarter_driver_dutlink.driver import Dutlink, DutlinkPower, DutlinkSerial, DutlinkStorageMux

from jumpstarter.common.utils import serve

STORAGE_DEVICE = "/dev/null" # MANUAL: replace with path to block device

def test_drivers_dutlink():
try:
instance = Dutlink(
storage_device="/dev/null",
)
except FileNotFoundError:
pytest.skip("dutlink not available")
except usb.core.USBError:
pytest.skip("USB not available")
except usb.core.NoBackendError:
pytest.skip("No USB backend")

def power_test(power):
power.on() # MANUAL: led DUT_ON should be turned on
sleep(1)
assert next(power.read()).current != 0
power.off() # MANUAL: led DUT_ON should be turned off


def storage_test(storage):
storage.write_local_file("/dev/null")


def serial_test(serial):
with PexpectAdapter(client=serial) as expect:
expect.send("\x02" * 5)

expect.send("about\r\n")
expect.expect("Jumpstarter test-harness")

expect.send("console\r\n")
expect.expect("Entering console mode")

expect.send("hello")
expect.expect("hello")


def test_drivers_dutlink_power():
instance = DutlinkPower()

with serve(instance) as client:
with PexpectAdapter(client=client.console) as expect:
expect.send("\x02" * 5)
power_test(client)

expect.send("about\r\n")
expect.expect("Jumpstarter test-harness")

expect.send("console\r\n")
expect.expect("Entering console mode")
def test_drivers_dutlink_storage_mux():
instance = DutlinkStorageMux(storage_device=STORAGE_DEVICE)

client.power.off()
with serve(instance) as client:
storage_test(client)

client.storage.write_local_file("/dev/null")
client.storage.dut()

client.power.on()
def test_drivers_dutlink_serial():
instance = DutlinkSerial() # MANUAL: connect tx to rx

expect.send("\x02" * 5)
expect.expect("Exiting console mode")
with serve(instance) as client:
serial_test(client)

client.power.off()

def test_drivers_dutlink():
instance = Dutlink(storage_device=STORAGE_DEVICE)

with serve(instance) as client:
power_test(client.power)
storage_test(client.storage)
serial_test(client.console)
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,9 @@ class HttpServer(Driver):
runner: Optional[web.AppRunner] = field(init=False, default=None)

def __post_init__(self):
super().__post_init__()
if hasattr(super(), "__post_init__"):
super().__post_init__()

os.makedirs(self.root_dir, exist_ok=True)
self.app.router.add_routes(
[
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,9 @@ class PySerial(Driver):
baudrate: int = field(default=115200)

def __post_init__(self):
super().__post_init__()
if hasattr(super(), "__post_init__"):
super().__post_init__()

self.device = serial_for_url(self.url, baudrate=self.baudrate)

@classmethod
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@ def client(cls) -> str:
return "jumpstarter_driver_raspberrypi.client.DigitalOutputClient"

def __post_init__(self):
super().__post_init__()
if hasattr(super(), "__post_init__"):
super().__post_init__()
# Initialize as InputDevice first
self.device = InputDevice(pin=self.pin)

Expand Down Expand Up @@ -49,7 +50,8 @@ def client(cls) -> str:
return "jumpstarter_driver_raspberrypi.client.DigitalInputClient"

def __post_init__(self):
super().__post_init__()
if hasattr(super(), "__post_init__"):
super().__post_init__()
self.device = DigitalInputDevice(pin=self.pin)

@export
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@ class SDWire(StorageMuxInterface, Driver):
storage_device: str | None = field(default=None)

def __post_init__(self):
super().__post_init__()
if hasattr(super(), "__post_init__"):
super().__post_init__()

for dev in usb.core.find(idVendor=0x04E8, idProduct=0x6001, find_all=True):
if self.storage_device is None:
context = pyudev.Context()
Expand Down
62 changes: 24 additions & 38 deletions packages/jumpstarter-driver-tftp/examples/tftp_test.py
Original file line number Diff line number Diff line change
@@ -1,51 +1,37 @@
import logging
import time

import pytest
from jumpstarter_driver_tftp.driver import FileNotFound, TftpError
from jumpstarter_testing.pytest import JumpstarterTest

log = logging.getLogger(__name__)


class TestResource(JumpstarterTest):
filter_labels = {"board": "rpi4"}

@pytest.fixture()
def test_tftp_upload(self, client):
def setup_tftp(self, client):
# Move the setup code to a fixture
client.tftp.start()
yield client
client.tftp.stop()

def test_tftp_operations(self, setup_tftp):
client = setup_tftp
test_file = "test.bin"

# Create test file
with open(test_file, "wb") as f:
f.write(b"Hello from TFTP streaming test!")

try:
client.tftp.start()
print("TFTP server started")

time.sleep(1)

test_file = "test.bin"
with open(test_file, "wb") as f:
f.write(b"Hello from TFTP streaming test!")

try:
client.tftp.put_local_file(test_file)
print(f"Successfully uploaded {test_file}")

files = client.tftp.list_files()
print(f"Files in TFTP root: {files}")

if test_file in files:
client.tftp.delete_file(test_file)
print(f"Successfully deleted {test_file}")
else:
print(f"Warning: {test_file} not found in TFTP root")

except TftpError as e:
print(f"TFTP operation failed: {e}")
except FileNotFound as e:
print(f"File not found: {e}")

except Exception as e:
print(f"Error: {e}")
finally:
try:
client.tftp.stop()
print("TFTP server stopped")
except Exception as e:
print(f"Error stopping server: {e}")
# Test upload
client.tftp.put_local_file(test_file)
assert test_file in client.tftp.list_files()

# Test delete
client.tftp.delete_file(test_file)
assert test_file not in client.tftp.list_files()

except (TftpError, FileNotFound) as e:
pytest.fail(f"Test failed: {e}")
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
CHUNK_SIZE = 1024 * 1024 * 4 # 4MB
Loading

0 comments on commit 2016b8b

Please sign in to comment.