Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

TangoShutter: refactor (changes from https://github.com/mxcube/mxcubecore/pull/847/) #1158

Open
wants to merge 3 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
81 changes: 45 additions & 36 deletions mxcubecore/HardwareObjects/TangoShutter.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,24 +31,35 @@
<command type="tango" name="Open">Open</command>
<command type="tango" name="Close">Close</command>
<channel type="tango" name="State" polling="1000">State</channel>
<values>{"open": "OPEN", "cloded": "CLOSED", "DISABLE" : "DISABLE"}</values>
<values>{"OPEN": "MYOPEN", "NEWSTATE": ["MYSTATE", "BUSY"]}</values>
</object>

In this example the <values> tag contains a json dictionary that maps spectific tango shutter states to the
convantional states defined in the TangoShutter Class. This tag is not necessay in cases where the tango shutter states
are all covered by the TangoShuter class conventional states.
In the example the <values> property contains a dictionary that redefines or
adds specific tango shutter states.
When redefining a known state, only the VALUES Enum will be updated.
When defining a new state (new key), the dictionary value should be a
list. The new state is added to both the VALUES and the SPECIFIC_STATES Enum.
Attention:
- do not use tuples or the python json parser will fail!
- make sure only double quotes are used inside the values dictionary. No single quotes (') are allowed !
- the second element of the list should be a standard HardwareObjectState name
(UNKNOWN, WARNING, BUSY, READY, FAULT, OFF - see in BaseHardwareObjects.py)!
The <values> property is optional.
"""

import json
import logging
from enum import (
Enum,
unique,
)

from tango import DevState

from mxcubecore.BaseHardwareObjects import HardwareObjectState
from mxcubecore.HardwareObjects.abstract.AbstractShutter import AbstractShutter

__copyright__ = """ Copyright © 2023 by the MXCuBE collaboration """
__copyright__ = """ Copyright © by the MXCuBE collaboration """
__license__ = "LGPLv3+"


Expand All @@ -63,20 +74,21 @@ class TangoShutterStates(Enum):
AUTOMATIC = HardwareObjectState.READY, "RUNNING"
UNKNOWN = HardwareObjectState.UNKNOWN, "RUNNING"
FAULT = HardwareObjectState.WARNING, "FAULT"
STANDBY = HardwareObjectState.WARNING, "STANDBY"


class TangoShutter(AbstractShutter):
"""TANGO implementation of AbstractShutter"""

SPECIFIC_STATES = TangoShutterStates

def __init__(self, name):
def __init__(self, name: str):
super().__init__(name)
self.open_cmd = None
self.close_cmd = None
self.state_channel = None

def init(self):
def init(self) -> None:
"""Initilise the predefined values"""
super().init()
self.open_cmd = self.get_command_object("Open")
Expand All @@ -86,26 +98,18 @@ def init(self):
self.state_channel.connect_signal("update", self._update_value)
self.update_state()

try:
self.config_values = json.loads(self.get_property("values"))
except:
self.config_values = None

def _update_value(self, value):
def _update_value(self, value: DevState) -> None:
"""Update the value.
Args:
value(str): The value reported by the state channel.
value: The value reported by the state channel.
"""
if self.config_values:
value = self.config_values[str(value)]
else:
value = str(value)

super().update_value(self.value_to_enum(value))
super().update_value(self.value_to_enum(str(value)))

def _initialise_values(self):
"""Add the tango states to VALUES"""
def _initialise_values(self) -> None:
"""Add specific tango states to VALUES and, if configured
in the xml file, to SPECIFIC_STATES"""
values_dict = {item.name: item.value for item in self.VALUES}
states_dict = {item.name: item.value for item in self.SPECIFIC_STATES}
values_dict.update(
{
"MOVING": "MOVING",
Expand All @@ -114,36 +118,41 @@ def _initialise_values(self):
"FAULT": "FAULT",
}
)
try:
config_values = json.loads(self.get_property("values"))
for key, val in config_values.items():
if isinstance(val, (tuple, list)):
values_dict.update({key: val[1]})
states_dict.update({key: (HardwareObjectState[val[1]], val[0])})
else:
values_dict.update({key: val})
except (ValueError, TypeError):
logging.exception("Exception in _initialise_values()")

self.VALUES = Enum("ValueEnum", values_dict)
self.SPECIFIC_STATES = Enum("TangoShutterStates", states_dict)

def get_state(self):
def get_state(self) -> HardwareObjectState:
"""Get the device state.
Returns:
(enum 'HardwareObjectState'): Device state.
"""
try:
if self.config_values:
_state = self.config_values[str(self.state_channel.get_value())]
else:
_state = str(self.state_channel.get_value())

_state = self.get_value().name
return self.SPECIFIC_STATES[_state].value[0]
except (AttributeError, KeyError):
logging.exception("Exception in get_state()")
return self.STATES.UNKNOWN

return self.SPECIFIC_STATES[_state].value[0]

def get_value(self):
def get_value(self) -> TangoShutterStates:
"""Get the device value
Returns:
(Enum): Enum member, corresponding to the 'VALUE' or UNKNOWN.
"""
if self.config_values:
_val = self.config_values[str(self.state_channel.get_value())]
else:
_val = str(self.state_channel.get_value())
_val = str(self.state_channel.get_value())
return self.value_to_enum(_val)

def _set_value(self, value):
def _set_value(self, value: TangoShutterStates) -> None:
if value.name == "OPEN":
self.open_cmd()
elif value.name == "CLOSED":
Expand Down
3 changes: 3 additions & 0 deletions ruff.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7118,3 +7118,6 @@ convention = "google"
"PT022",
"S108",
]
"test/pytest/test_hwo_tango_shutter.py" = [
"N802"
]
123 changes: 123 additions & 0 deletions test/pytest/test_hwo_tango_shutter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
"""Test the HardwareObjects.TangoShutter shutter hardware object."""

from typing import Callable

import gevent
import pytest
from gevent import Timeout
from tango import (
DeviceProxy,
DevState,
)
from tango.server import (
Device,
command,
)
from tango.test_context import DeviceTestContext

from mxcubecore.HardwareObjects import TangoShutter

TANGO_SHUTTER_STATES_MAPPING = """
{"OPEN": "OPEN", "CLOSED": "CLOSE", "MOVING" : "MOVING"}
"""


class Shutter(Device):
"""Very simple Tango shutter device, that only goes
between 'open' and 'close' states."""

def __init__(self, *args, **kwargs):
self._is_open = False
super().__init__(*args, **kwargs)

@command()
def Open(self):
self._is_open = True

@command()
def Close(self):
self._is_open = False

def dev_state(self):
return DevState.OPEN if self._is_open else DevState.CLOSE


@pytest.fixture
def shutter():
tangods_test_context = DeviceTestContext(Shutter, process=True)
tangods_test_context.start()

#
# set up the TangoShutter hardware object
#
hwo_shutter = TangoShutter.TangoShutter("/random_name")
hwo_shutter.tangoname = tangods_test_context.get_device_access()
hwo_shutter.set_property("values", TANGO_SHUTTER_STATES_MAPPING)
hwo_shutter.add_channel(
{
"name": "State",
"type": "tango",
},
"State",
add_now=True,
)
hwo_shutter.add_command(
{
"name": "Open",
"type": "tango",
},
"Open",
add_now=True,
)
hwo_shutter.add_command(
{
"name": "Close",
"type": "tango",
},
"Close",
add_now=True,
)

hwo_shutter.init()

yield hwo_shutter

tangods_test_context.stop()
tangods_test_context.join()


def _wait_until(condition: Callable, condition_desc: str):
with Timeout(1.2, Exception(f"timed out while waiting for {condition_desc}")):
while not condition():
gevent.sleep(0.01)


def test_open(shutter):
"""
test opening the shutter
"""
dev = DeviceProxy(shutter.tangoname)

assert dev.State() == DevState.CLOSE
assert not shutter.is_open

shutter.open()

_wait_until(lambda: shutter.is_open, "shutter to open")
assert dev.State() == DevState.OPEN


def test_close(shutter):
"""
test closing the shutter
"""
dev = DeviceProxy(shutter.tangoname)
dev.Open()

assert dev.State() == DevState.OPEN
assert shutter.is_open

shutter.close()

_wait_until(lambda: not shutter.is_open, "shutter to close")
assert dev.State() == DevState.CLOSE