-
Notifications
You must be signed in to change notification settings - Fork 7
/
Copy pathserver.py
84 lines (71 loc) · 2.26 KB
/
server.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
# WebSocket server used to control the turret remotely.
import asyncio
import logging
import os
import signal
import sys
import websockets
# Route the `websockets` library's log records (INFO and above) through a
# StreamHandler so connection activity is visible while the server runs.
_log_stream = logging.StreamHandler()
logger = logging.getLogger('websockets')
logger.addHandler(_log_stream)
logger.setLevel(logging.INFO)
# Prefer the real stepper driver; fall back to the dummy implementation when
# the real module is missing or raises RuntimeError while being imported.
# NOTE(review): the RuntimeError case is presumably the GPIO library refusing
# to load on non-target hardware — confirm against stepper_control_thread.
try:
    from stepper_control_thread import StepperThread
except (ModuleNotFoundError, RuntimeError):
    from dummy_stepper_control_thread import StepperThread
# Value handed to every StepperThread as `sleep_time`.
# NOTE(review): presumably the delay between step pulses, in seconds — confirm.
SLEEP_TIME = 0.0005

# Pin assigned to both steppers as `enable_pin`.
ENABLE_PIN = 8

# X-axis stepper pins.
STEP_X_PIN = 12
DIR_X_PIN = 16

# Y-axis stepper pins.
STEP_Y_PIN = 38
DIR_Y_PIN = 40
# NOTE(review): the pin numbering scheme (board vs. chip numbering) is decided
# inside StepperThread, which is not visible here — verify before rewiring.
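# Received message format: two characters, a command letter followed by a digit.
#   letter: "x" or "y" selects a stepper axis, "s" is the shoot command
#   digit:  the handler subtracts 1 from it (so "1" becomes 0 and "2" becomes 1)
# Examples:
#   x1
#   y1
#   s1
#   x2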
def _start_stepper(step_pin, direction_pin):
    """Create a StepperThread for one axis, start it, and return start()'s result."""
    # NOTE(review): callers keep the return value of start(), so this relies on
    # StepperThread.start() returning the stepper object itself. A plain
    # threading.Thread.start() returns None — confirm against StepperThread.
    return StepperThread(
        step_pin=step_pin,
        direction_pin=direction_pin,
        enable_pin=ENABLE_PIN,
        sleep_time=SLEEP_TIME,
    ).start()


# One stepper per axis; both share the same enable pin and sleep time.
x_stepper = _start_stepper(STEP_X_PIN, DIR_X_PIN)
y_stepper = _start_stepper(STEP_Y_PIN, DIR_Y_PIN)
def thread_cleanup(_):
    """Clean up both steppers, then exit the process with status 0.

    The single positional argument is accepted and ignored so the function
    can be registered as a callback that is invoked with one argument.
    """
    for stepper in (x_stepper, y_stepper):
        stepper.cleanup()
    sys.exit(0)
async def handler(websocket):
    """Process turret commands arriving on one WebSocket connection.

    Each message is a command letter followed by a digit (bytes payloads are
    decoded as UTF-8 first). The digit minus one is the direction value:

    * "x" / "y": the value is passed to the matching stepper's
      set_direction().
    * "s": prints "shoot" when the value is 0 or 1, otherwise "no shoot".
    * any other letter: no action is taken.

    Every successfully parsed message is echoed to stdout.

    Malformed input — undecodable bytes, fewer than two characters, or a
    non-digit second character — is answered with "Invalid message" and
    skipped, so a bad frame does not terminate the connection.

    Args:
        websocket: the connection object; it is iterated asynchronously for
            incoming messages and its send() is used for error replies.
    """
    async for message in websocket:
        try:
            if isinstance(message, bytes):
                # UnicodeDecodeError is a ValueError subclass, so a bad
                # payload lands in the handler below.
                message = message.decode("utf-8")
            axis = message[0]
            # IndexError here means the message was empty or had no digit.
            direction = int(message[1]) - 1
        except (IndexError, ValueError):
            await websocket.send("Invalid message")
            continue

        if axis == "x":
            x_stepper.set_direction(direction)
        elif axis == "y":
            y_stepper.set_direction(direction)
        elif axis == "s":
            if direction in (0, 1):
                print("shoot")
            else:
                print("no shoot")
        print(message)
async def main():
    """Serve the WebSocket endpoint until a termination signal arrives.

    The listening port comes from the PORT environment variable and
    defaults to 8001.
    """
    event_loop = asyncio.get_running_loop()

    # This future is only awaited, never resolved. Per the original author's
    # note, wiring SIGTERM/SIGINT to stop.set_result did not seem to work, so
    # the signals are routed to thread_cleanup instead, which ends the
    # process itself via sys.exit.
    stop = event_loop.create_future()
    for signum in (signal.SIGTERM, signal.SIGINT):
        event_loop.add_signal_handler(signum, thread_cleanup, None)

    listen_port = int(os.environ.get("PORT", "8001"))
    async with websockets.serve(handler, "", listen_port):
        await stop
# Script entry point: run the server coroutine on a fresh event loop.
if __name__ == "__main__":
    server_coroutine = main()
    asyncio.run(server_coroutine)