-
-
Notifications
You must be signed in to change notification settings - Fork 254
Add as checks with tests #192
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 14 commits
17c3817
0889fda
2af69ee
269b6b1
588bffa
2ec55fa
4e41d59
fa1f068
1a4cf21
31d353f
7e697e1
3554715
0d8e647
51a1fd0
d235aa7
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -26,6 +26,8 @@ | |
|
||
class TestClient(unittest.TestCase): | ||
|
||
process = None | ||
|
||
@classmethod | ||
def setUpClass(cls): | ||
cls.process = Process(target=mainloop) | ||
|
@@ -309,8 +311,9 @@ def test_as_ct_write(self): | |
def test_as_db_fill(self): | ||
self.client.as_db_fill() | ||
|
||
@unittest.skip("TODO: not yet fully implemented") | ||
def test_as_db_get(self): | ||
self.client.db_get(db_number=db_number) | ||
self.client.as_db_get(db_number=db_number) | ||
|
||
@unittest.skip("TODO: crash client: FATAL: exception not rethrown") | ||
def test_as_db_read(self): | ||
|
@@ -328,6 +331,7 @@ def test_as_db_write(self): | |
data = bytearray(size) | ||
self.client.as_db_write(db_number=1, start=0, data=data) | ||
|
||
@unittest.skip("TODO: not yet fully implemented") | ||
def test_as_download(self): | ||
data = bytearray(128) | ||
self.client.as_download(block_num=-1, data=data) | ||
|
@@ -421,6 +425,7 @@ def test_ab_write_with_byte_literal_does_not_throw(self): | |
finally: | ||
self.client._library.Cli_ABWrite = original | ||
|
||
@unittest.skip("TODO: not yet fully implemented") | ||
def test_as_ab_write_with_byte_literal_does_not_throw(self): | ||
mock_write = mock.MagicMock() | ||
mock_write.return_value = None | ||
|
@@ -437,6 +442,7 @@ def test_as_ab_write_with_byte_literal_does_not_throw(self): | |
finally: | ||
self.client._library.Cli_AsABWrite = original | ||
|
||
@unittest.skip("TODO: not yet fully implemented") | ||
def test_as_db_write_with_byte_literal_does_not_throw(self): | ||
mock_write = mock.MagicMock() | ||
mock_write.return_value = None | ||
|
@@ -451,6 +457,7 @@ def test_as_db_write_with_byte_literal_does_not_throw(self): | |
finally: | ||
self.client._library.Cli_AsDBWrite = original | ||
|
||
@unittest.skip("TODO: not yet fully implemented") | ||
def test_as_download_with_byte_literal_does_not_throw(self): | ||
mock_download = mock.MagicMock() | ||
mock_download.return_value = None | ||
|
@@ -474,6 +481,86 @@ def test_set_plc_datetime(self): | |
# Can't actual set datetime in emulated PLC, get_plc_datetime always returns system time. | ||
# self.assertEqual(new_dt, self.client.get_plc_datetime()) | ||
|
||
def test_wait_as_completion_pass(self, timeout=10): | ||
# Cli_WaitAsCompletion | ||
# prepare Server with values | ||
area = snap7.types.areas.DB | ||
dbnumber = 1 | ||
size = 1 | ||
start = 1 | ||
data = bytearray(size) | ||
self.client.write_area(area, dbnumber, start, data) | ||
# start as_request and test | ||
p_data = self.client.as_db_read(dbnumber, start, size) | ||
try: | ||
self.client.wait_as_completion(ctypes.c_ulong(timeout)) | ||
swamper123 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
except Snap7Exception as s7_err: | ||
if s7_err.args[0] == b'CLI : Job Timeout': | ||
self.fail(f"Request was timeouted after {timeout} seconds - FAIL") | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. the try except can be removed here, the test will then just fail if an exception is raised, no reason to capture it and manually fail |
||
else: | ||
self.fail(f"Snap7Exception was thrown: {s7_err} - FAIL") | ||
swamper123 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
except BaseException as pyt_err: | ||
self.fail(f"Exception was thrown: {pyt_err}") | ||
self.assertEqual(bytearray(p_data), data) | ||
|
||
def test_wait_as_completion_timeouted(self, timeout=0, tries=500): | ||
# Cli_WaitAsCompletion | ||
# prepare Server | ||
area = snap7.types.areas.DB | ||
dbnumber = 1 | ||
size = 1 | ||
start = 1 | ||
data = bytearray(size) | ||
self.client.write_area(area, dbnumber, start, data) | ||
# start as_request and wait for zero seconds to try trigger timeout | ||
for i in range(tries): | ||
swamper123 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
self.client.as_db_read(dbnumber, start, size) | ||
try: | ||
self.client.wait_as_completion(ctypes.c_ulong(0)) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. the try except can be removed here, the test will then just fail if an exception is raised, no reason to capture it and manually fail There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The method itself is raising an Snap7Exception in case of a timeout, which is the original behaviour of the snap7 version as well. |
||
except Snap7Exception as s7_err: | ||
if not s7_err.args[0] == b'CLI : Job Timeout': | ||
self.fail(f"While waiting another error appeared: {s7_err}") | ||
return | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. why do you return here? shouldn't it just continue? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Otherwise I would stuck in the loop, testing the behaviour multiple times(which is unnecessarry) and then, if the loop would be done, the test would fail. So if once a Job Timeout was raised correctly, the test shall stop here and pass (via return). There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ah yes, skimmed too quickly. |
||
except BaseException as pyt_err: | ||
self.fail(f"Exception was thrown: {pyt_err}") | ||
self.skipTest(f"After {tries} tries, no timout could be envoked. Skip test.") | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. you don't want to skip on timeout, just fail |
||
|
||
def test_check_as_completion(self, timeout=5): | ||
# Cli_CheckAsCompletion | ||
check_status = ctypes.c_int(-1) | ||
pending_checked = False | ||
# preparing Server values | ||
size = 40 | ||
start = 0 | ||
db = 1 | ||
data = bytearray(40) | ||
try: | ||
self.client.db_write(db_number=db, start=start, data=data) | ||
gijzelaerr marked this conversation as resolved.
Show resolved
Hide resolved
|
||
except Snap7Exception as s7_err: | ||
self.fail(f"Snap7Exception raised while preparing as_check_completion test: {s7_err}") | ||
except BaseException as python_err: | ||
self.fail(f"Other exception raised while preparing as_check_completion test: {python_err}") | ||
# Execute test | ||
p_data = self.client.as_db_read(db, start, size) | ||
for i in range(10): | ||
try: | ||
self.client.check_as_completion(ctypes.byref(check_status)) | ||
if check_status.value == 0: | ||
swamper123 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
data_result = bytearray(p_data) | ||
self.assertEqual(data_result, data) | ||
break | ||
pending_checked = True | ||
time.sleep(1) | ||
except Snap7Exception as s7_err: | ||
gijzelaerr marked this conversation as resolved.
Show resolved
Hide resolved
|
||
self.fail(f"Snap7Exception raised: {s7_err}") | ||
except BaseException as python_err: | ||
self.fail(f"Other exception raised: {python_err}") | ||
else: | ||
self.fail(f"TimeoutError - Process pends for more than {timeout} seconds") | ||
if pending_checked is False: | ||
logging.warning("Pending was never reached, because Server was to fast," | ||
" but request to server was successfull.") | ||
|
||
def test_asebread(self): | ||
# Cli_AsEBRead | ||
with self.assertRaises(NotImplementedError): | ||
|
@@ -539,11 +626,6 @@ def test_aswritearea(self): | |
with self.assertRaises(NotImplementedError): | ||
self.client.aswritearea() | ||
|
||
def test_checkascompletion(self): | ||
# Cli_CheckAsCompletion | ||
with self.assertRaises(NotImplementedError): | ||
self.client.checkascompletion() | ||
|
||
def test_copyramtorom(self): | ||
# Cli_CopyRamToRom | ||
with self.assertRaises(NotImplementedError): | ||
|
@@ -659,11 +741,6 @@ def test_readszllist(self): | |
with self.assertRaises(NotImplementedError): | ||
self.client.readszllist() | ||
|
||
def test_setascallback(self): | ||
# Cli_SetAsCallback | ||
with self.assertRaises(NotImplementedError): | ||
self.client.setascallback() | ||
|
||
def test_setparam(self): | ||
# Cli_SetParam | ||
with self.assertRaises(NotImplementedError): | ||
|
@@ -689,11 +766,6 @@ def test_tmwrite(self): | |
with self.assertRaises(NotImplementedError): | ||
self.client.tmwrite() | ||
|
||
def test_waitascompletion(self): | ||
# Cli_WaitAsCompletion | ||
with self.assertRaises(NotImplementedError): | ||
self.client.waitascompletion() | ||
|
||
def test_writemultivars(self): | ||
# Cli_WriteMultiVars | ||
with self.assertRaises(NotImplementedError): | ||
|
Uh oh!
There was an error while loading. Please reload this page.