-
Notifications
You must be signed in to change notification settings - Fork 7
/
Copy pathstepper_control_thread.py
125 lines (109 loc) · 3.6 KB
/
stepper_control_thread.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
import time
from multiprocessing import Process, Queue
import RPi.GPIO as GPIO
class StepperThread:
'''
Thread that controls a stepper motor.
'''
def __init__(self, step_pin, direction_pin, enable_pin, sleep_time=0.0005, continuous=True, steps_per_rev=1600, max_step=400, min_step=0):
GPIO.setmode(GPIO.BOARD)
self.direction_pin = direction_pin
self.step_pin = step_pin
self.sleep_time = sleep_time
self.enable_pin = enable_pin
self.kill = False
self.direction = -1 # this means the steppers are stopped
self.last_step = 0
self.process = None
self.dir_queue = None
self.step_queue = None
self.continuous = continuous
# need to finish implementing software
# endstops. Should be able to detect if the stepper is at the end
self.steps_per_rev = steps_per_rev
self.max_step = max_step
self.min_step = min_step
self.current_step = 0
def start(self):
'''
Sets up the GPIO pins and starts the thread.
'''
self.setup()
self.dir_queue = Queue()
self.step_queue = Queue()
self.process = Process(target=self.run, args=())
self.process.start()
return self
def setup(self):
'''
Sets up the GPIO pins.
'''
GPIO.setup(self.step_pin, GPIO.OUT)
GPIO.setup(self.direction_pin, GPIO.OUT)
GPIO.setup(self.enable_pin, GPIO.OUT)
GPIO.output(self.enable_pin, GPIO.HIGH)
# direction is 0 for forward, 1 for backwards, -1 for stopped
def set_direction(self, direction):
'''
Sets the direction of the stepper motor.
'''
if self.direction != direction:
self.direction = direction
self.dir_queue.put(direction)
def stop(self):
'''
Stops the stepper motor.
'''
self.set_direction(-1)
def cleanup(self):
'''
Stops the thread gracefully.
'''
self.kill = True
self.process.join()
def close(self):
'''
Stops the thread gracefully.
'''
self.cleanup()
def step(self, direction, steps=200):
'''
Steps the stepper motor.
'''
if not self.continuous:
self.set_direction(direction)
self.step_queue.put(steps)
def run(self):
'''
The main loop of the thread.
'''
last_direction = -1
while not self.kill:
if self.dir_queue.empty():
direction = last_direction
else:
direction = self.dir_queue.get()
if direction != -1 and direction != last_direction:
GPIO.output(self.direction_pin, direction)
last_direction = direction
if self.continuous:
if direction != -1:
GPIO.output(self.step_pin, GPIO.HIGH)
time.sleep(self.sleep_time)
GPIO.output(self.step_pin, GPIO.LOW)
time.sleep(self.sleep_time)
else:
if self.step_queue.empty():
steps = 0
else:
steps = self.step_queue.get()
for _ in range(steps):
GPIO.output(self.step_pin, GPIO.HIGH)
time.sleep(self.sleep_time)
GPIO.output(self.step_pin, GPIO.LOW)
time.sleep(self.sleep_time)
GPIO.output(self.enable_pin, GPIO.LOW)
try:
GPIO.cleanup()
except:
pass