1
- import asyncio
2
- import contextlib
3
- import time
4
-
5
- import numpy as np
6
- from bluesky .protocols import (
7
- Flyable ,
8
- Preparable ,
9
- Stoppable ,
10
- )
11
1
from ophyd_async .core import (
12
- AsyncStatus ,
13
- FlyMotorInfo ,
14
2
StandardReadable ,
15
- WatchableAsyncStatus ,
16
- WatcherUpdate ,
17
- observe_value ,
18
- soft_signal_r_and_setter ,
19
3
soft_signal_rw ,
20
4
)
21
- from ophyd_async .core import StandardReadableFormat as Format
22
- from ophyd_async .epics .motor import MotorLimitsException
5
+ from ophyd_async .sim import SimMotor
23
6
24
7
25
- class SimRealMotor (
26
- StandardReadable ,
27
- Stoppable ,
28
- Flyable ,
29
- Preparable ,
30
- ):
8
+ class SimMotorExtra (SimMotor ):
31
9
def __init__ (self , name = "" , instant = True ) -> None :
32
10
"""
33
- A simulated motor that mimics the behavior of a real motor. Unlike
34
- ophyd_async.sim.SimMotor, this class takes FlyMotorInfo rather than
35
- FlySimMotorInfo. This makes it better for testing plans, as it more
36
- closely represents a real motor's configuration.
37
- This only required for cases where plan supply motor speed.
38
-
39
- Simulated motor device
40
- args:
41
- - prefix: str: Signal names prefix
42
- - name: str: name of device
43
- - instant: bool: whether to move instantly, or with a delay
11
+ SimMotor with extra signal
44
12
"""
45
13
# Define some signals
46
- with self .add_children_as_readables (Format .HINTED_SIGNAL ):
47
- self .user_readback , self ._user_readback_set = soft_signal_r_and_setter (
48
- float , 0
49
- )
50
- with self .add_children_as_readables (Format .CONFIG_SIGNAL ):
51
- self .velocity = soft_signal_rw (float , 0 if instant else 88.88 )
52
- self .units = soft_signal_rw (str , "mm" )
53
- self .user_setpoint = soft_signal_rw (float , 0 )
14
+
54
15
self .max_velocity = soft_signal_rw (float , 100 )
55
- self .acceleration_time = soft_signal_rw (float , 0.0 )
56
- self .precision = soft_signal_rw (int , 3 )
57
- self .deadband = soft_signal_rw (float , 0.05 )
58
- self .motor_done_move = soft_signal_rw (int , 1 )
59
16
self .low_limit_travel = soft_signal_rw (float , - 100 )
60
17
self .high_limit_travel = soft_signal_rw (float , 100 )
61
- # Whether set() should complete successfully or not
62
- self ._set_success = True
63
- self ._move_status : AsyncStatus | None = None
64
- super ().__init__ (name = name )
65
-
66
- async def _move (self , old_position : float , new_position : float , move_time : float ):
67
- start = time .monotonic ()
68
- # Make an array of relative update times at 10Hz intervals
69
- update_times = np .arange (0.1 , move_time , 0.1 )
70
- # With the end position appended
71
- update_times = np .concatenate ((update_times , [move_time ]))
72
- # Interpolate the [old, new] position array with those update times
73
- new_positions = np .interp (
74
- update_times , [0 , move_time ], [old_position , new_position ]
75
- )
76
- for update_time , new_position in zip (update_times , new_positions , strict = True ): # type: ignore
77
- # Calculate how long to wait to get there
78
- relative_time = time .monotonic () - start
79
- await asyncio .sleep (update_time - relative_time )
80
- # Update the readback position
81
- self ._user_readback_set (new_position )
82
18
83
- async def stop (self , success = True ):
84
- """
85
- Stop the motor if it is moving
86
- """
87
- self ._set_success = success
88
- if self ._move_status :
89
- self ._move_status .task .cancel ()
90
- self ._move_status = None
91
- await self .user_setpoint .set (await self .user_readback .get_value ())
92
-
93
- @AsyncStatus .wrap
94
- async def prepare (self , value : FlyMotorInfo ):
95
- """Calculate required velocity and run-up distance, then if motor limits aren't
96
- breached, move to start position minus run-up distance"""
97
- self ._fly_timeout = value .timeout
98
- # Velocity, at which motor travels from start_position to end_position, in motor
99
- # egu/s.
100
- fly_velocity = await self ._prepare_velocity (
101
- value .start_position ,
102
- value .end_position ,
103
- value .time_for_move ,
104
- )
105
- # start_position with run_up_distance added on.
106
- fly_prepared_position = await self ._prepare_motor_path (
107
- abs (fly_velocity ), value .start_position , value .end_position
108
- )
109
- await self .set (fly_prepared_position )
110
-
111
- @AsyncStatus .wrap
112
- async def kickoff (self ):
113
- """Begin moving motor from prepared position to final position."""
114
- assert self ._fly_completed_position , (
115
- "Motor must be prepared before attempting to kickoff"
116
- )
117
- self ._fly_status = self .set (self ._fly_completed_position )
118
-
119
- def complete (self ) -> WatchableAsyncStatus :
120
- """Mark as complete once motor reaches completed position."""
121
- assert self ._fly_status , "kickoff not called"
122
- return self ._fly_status
123
-
124
- async def _prepare_velocity (
125
- self , start_position : float , end_position : float , time_for_move : float
126
- ) -> float :
127
- fly_velocity = (start_position - end_position ) / time_for_move
128
- max_speed , egu = await asyncio .gather (
129
- self .max_velocity .get_value (), self .units .get_value ()
130
- )
131
- if abs (fly_velocity ) > max_speed :
132
- raise MotorLimitsException (
133
- f"Motor speed of { abs (fly_velocity )} { egu } /s was requested for a motor "
134
- f" with max speed of { max_speed } { egu } /s"
135
- )
136
- await self .velocity .set (abs (fly_velocity ))
137
- return fly_velocity
138
-
139
- async def _prepare_motor_path (
140
- self , fly_velocity : float , start_position : float , end_position : float
141
- ) -> float :
142
- # Distance required for motor to accelerate from stationary to fly_velocity, and
143
- # distance required for motor to decelerate from fly_velocity to stationary
144
- run_up_distance = (
145
- (await self .acceleration_time .get_value ()) * fly_velocity * 0.5
146
- )
147
- self ._fly_completed_position = end_position + run_up_distance
148
- # Prepared position not used after prepare, so no need to store in self
149
- fly_prepared_position = start_position - run_up_distance
150
- motor_lower_limit , motor_upper_limit , egu = await asyncio .gather (
151
- self .low_limit_travel .get_value (),
152
- self .high_limit_travel .get_value (),
153
- self .units .get_value (),
154
- )
155
- if (
156
- not motor_upper_limit >= fly_prepared_position >= motor_lower_limit
157
- or not motor_upper_limit
158
- >= self ._fly_completed_position
159
- >= motor_lower_limit
160
- ):
161
- raise MotorLimitsException (
162
- f"Motor trajectory for requested fly is from "
163
- f"{ fly_prepared_position } { egu } to "
164
- f"{ self ._fly_completed_position } { egu } but motor limits are "
165
- f"{ motor_lower_limit } { egu } <= x <= { motor_upper_limit } { egu } "
166
- )
167
- return fly_prepared_position
168
-
169
- @WatchableAsyncStatus .wrap
170
- async def set (self , value : float ):
171
- """
172
- Asynchronously move the motor to a new position.
173
- """
174
- # Make sure any existing move tasks are stopped
175
- await self .stop ()
176
- old_position , units , velocity = await asyncio .gather (
177
- self .user_setpoint .get_value (),
178
- self .units .get_value (),
179
- self .velocity .get_value (),
180
- )
181
- # If zero velocity, do instant move
182
- move_time = abs (value - old_position ) / velocity if velocity else 0
183
- self ._move_status = AsyncStatus (self ._move (old_position , value , move_time ))
184
- # If stop is called then this will raise a CancelledError, ignore it
185
- with contextlib .suppress (asyncio .CancelledError ):
186
- async for current_position in observe_value (
187
- self .user_readback , done_status = self ._move_status
188
- ):
189
- yield WatcherUpdate (
190
- current = current_position ,
191
- initial = old_position ,
192
- target = value ,
193
- name = self .name ,
194
- unit = units ,
195
- )
196
- if not self ._set_success :
197
- raise RuntimeError ("Motor was stopped" )
19
+ super ().__init__ (name = name , instant = instant )
198
20
199
21
200
22
class SimStage (StandardReadable ):
@@ -203,8 +25,8 @@ class SimStage(StandardReadable):
203
25
def __init__ (self , name = "" , instant = True ) -> None :
204
26
# Define some child Devices
205
27
with self .add_children_as_readables ():
206
- self .x = SimRealMotor (instant = instant )
207
- self .y = SimRealMotor (instant = instant )
208
- self .z = SimRealMotor (instant = instant )
28
+ self .x = SimMotorExtra (instant = instant )
29
+ self .y = SimMotorExtra (instant = instant )
30
+ self .z = SimMotorExtra (instant = instant )
209
31
# Set name of device and child devices
210
32
super ().__init__ (name = name )
0 commit comments