-
-
Notifications
You must be signed in to change notification settings - Fork 150
/
Copy pathtest_ws.py
155 lines (131 loc) Β· 5.79 KB
/
test_ws.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
# Copyright 2022 Arduino SA
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
import time
import json
import base64
import pytest
from sys import platform
from common import running_on_ci
message = []
@pytest.mark.skipif(
platform == "darwin",
reason="on macOS the user is prompted to install certificates",
)
def test_ws_connection(socketio):
print('my sid is', socketio.sid)
assert socketio.sid is not None
@pytest.mark.skipif(
platform == "darwin",
reason="on macOS the user is prompted to install certificates",
)
def test_list(socketio, message):
socketio.emit('command', 'list')
time.sleep(.2)
print (message)
assert any("list" in i for i in message)
assert any("Ports" in i for i in message)
# NOTE run the following tests with a board connected to the PC
@pytest.mark.skipif(
running_on_ci(),
reason="VMs have no serial ports",
)
def test_open_serial(socketio, serial_port, baudrate, message):
general_open_serial(socketio, serial_port, baudrate, message)
# NOTE run the following tests with a board connected to the PC and with the sketch found in tests/testdata/SerialEcho.ino on it be sure to change serial_address in conftest.py
@pytest.mark.skipif(
running_on_ci(),
reason="VMs have no serial ports",
)
def test_send_serial(socketio, close_port, serial_port, baudrate, message):
general_send_serial(socketio, close_port, serial_port, baudrate, message)
@pytest.mark.skipif(
running_on_ci(),
reason="VMs have no serial ports",
)
def test_send_emoji_serial(socketio, close_port, serial_port, baudrate, message):
general_send_emoji_serial(socketio, close_port, serial_port, baudrate, message)
def general_open_serial(socketio, serial_port, baudrate, message):
open_serial_port(socketio, serial_port, baudrate, message)
# test the closing of the serial port, we are gonna use close_port for the other tests
socketio.emit('command', 'close ' + serial_port)
time.sleep(.2)
print (message)
#check if port has been closed
assert any("\"IsOpen\": false," in i for i in message)
def general_send_serial(socketio, close_port, serial_port, baudrate, message):
open_serial_port(socketio, serial_port, baudrate, message)
# send the string "ciao" using the serial connection
socketio.emit('command', 'send ' + serial_port + ' ciao')
time.sleep(1)
print(message)
# check if the send command has been registered
assert any("send " + serial_port + " ciao" in i for i in message)
#check if message has been sent back by the connected board
output = extract_serial_data(message)
assert "ciao" in output
# the serial connection is closed by close_port() fixture: even if in case of test failure
def general_send_emoji_serial(socketio, close_port, serial_port, baudrate, message):
open_serial_port(socketio, serial_port, baudrate, message)
# send a lot of emoji: they can be messed up
socketio.emit('command', 'send ' + serial_port + ' /"π§π§π§π§π§π§π§π§π§π§/"')
time.sleep(1)
print(message)
# check if the send command has been registered
assert any("send " + serial_port + " /\"π§π§π§π§π§π§π§π§π§π§/\"" in i for i in message)
output = extract_serial_data(message)
assert "/\"π§π§π§π§π§π§π§π§π§π§/\"" in output
# the serial connection is closed by close_port() fixture: even if in case of test failure
def open_serial_port(socketio, serial_port, baudrate, message):
#open a new serial connection
socketio.emit('command', 'open ' + serial_port + ' ' + baudrate)
# give time to the message var to be filled
time.sleep(.5)
print(message)
# the serial connection should be open now
assert any("\"IsOpen\": true" in i for i in message)
@pytest.mark.skipif(
running_on_ci(),
reason="VMs have no serial ports",
)
def test_sendraw_serial(socketio, close_port, serial_port, baudrate, message):
open_serial_port(socketio, serial_port, baudrate, message)
#test with bytes
integers = [1, 2, 3, 4, 5]
bytes_array=bytearray(integers)
encoded_integers = base64.b64encode(bytes_array).decode('ascii')
socketio.emit('command', 'sendraw ' + serial_port + ' ' + encoded_integers)
time.sleep(1)
print(message)
# check if the send command has been registered
assert any(("sendraw " + serial_port + ' ' + encoded_integers) in i for i in message)
#check if message has been sent back by the connected board
output = extract_serial_data(message) # TODO use decode_output()
print (output)
assert encoded_integers in output
# helper function used to extract serial data from its JSON representation
# NOTE make sure to pass a clean message (maybe reinitialize the message global var before populating it)
def extract_serial_data(msg):
serial_data = ""
for i in msg:
if "{\"P\"" in i:
print (json.loads(i)["D"])
serial_data+=json.loads(i)["D"]
print("serialdata:"+serial_data)
return serial_data
def decode_output(raw_output):
# print(raw_output)
base64_bytes = raw_output.encode('ascii') #encode rawoutput message into a bytes-like object
output_bytes = base64.b64decode(base64_bytes)
return output_bytes.decode('utf-8')