|
4 | 4 | ##################################################################
|
5 | 5 |
|
6 | 6 | import unittest
|
| 7 | +import pytest |
7 | 8 | import time
|
8 |
| -from pywps import Service, Process, LiteralInput, LiteralOutput |
| 9 | +from pywps import Service, configuration |
9 | 10 | from pywps import get_ElementMakerForVersion
|
10 |
| -from pywps.tests import client_for, assert_response_accepted |
| 11 | +from pywps.tests import client_for, assert_response_accepted, assert_response_success |
11 | 12 | from .processes import Sleep
|
| 13 | +from owslib.wps import WPSExecution |
| 14 | +from pathlib import Path |
12 | 15 |
|
13 | 16 | VERSION = "1.0.0"
|
14 | 17 |
|
15 | 18 | WPS, OWS = get_ElementMakerForVersion(VERSION)
|
16 | 19 |
|
17 | 20 |
|
18 | 21 | class ExecuteTest(unittest.TestCase):
|
| 22 | + def setUp(self) -> None: |
| 23 | + # Running processes using the MultiProcessing scheduler and a file-based database |
| 24 | + configuration.CONFIG.set('processing', 'mode', 'distributed') |
| 25 | + configuration.CONFIG.set("logging", "database", "sqlite:////tmp/test-pywps-logs.sqlite3") |
19 | 26 |
|
20 |
| - def test_assync(self): |
| 27 | + def tearDown(self) -> None: |
| 28 | + configuration.load_configuration() |
| 29 | + |
| 30 | + def test_async(self): |
21 | 31 | client = client_for(Service(processes=[Sleep()]))
|
22 |
| - request_doc = WPS.Execute( |
23 |
| - OWS.Identifier('sleep'), |
24 |
| - WPS.DataInputs( |
25 |
| - WPS.Input( |
26 |
| - OWS.Identifier('seconds'), |
27 |
| - WPS.Data( |
28 |
| - WPS.LiteralData( |
29 |
| - "0.3" |
30 |
| - ) |
31 |
| - ) |
32 |
| - ) |
33 |
| - ), |
34 |
| - version="1.0.0" |
35 |
| - ) |
36 |
| - resp = client.post_xml(doc=request_doc) |
| 32 | + wps = WPSExecution() |
| 33 | + |
| 34 | + # Build an asynchronous request (requires specifying outputs and setting the mode). |
| 35 | + doc = wps.buildRequest('sleep', |
| 36 | + inputs=[('seconds', '.01')], |
| 37 | + output=[('finished', None, None)], |
| 38 | + mode='async') |
| 39 | + |
| 40 | + resp = client.post_xml(doc=doc) |
| 41 | + wps.parseResponse(resp.xml) |
37 | 42 | assert_response_accepted(resp)
|
38 | 43 |
|
39 |
| - # TODO: |
40 |
| - # . extract the status URL from the response |
41 |
| - # . send a status request |
| 44 | + # The process should not have finished by now. If it does, it's running in sync mode. |
| 45 | + with pytest.raises(AssertionError): |
| 46 | + assert_response_success(resp) |
| 47 | + |
| 48 | + # Parse response to extract the status file path |
| 49 | + url = resp.xml.xpath("//@statusLocation")[0] |
| 50 | + |
| 51 | + # OWSlib only reads from URLs, not local files. So we need to read the response manually. |
| 52 | + p = Path(url[6:]) |
| 53 | + |
| 54 | + # Poll the process until it completes |
| 55 | + total_time = 0 |
| 56 | + sleep_time = .01 |
| 57 | + while wps.status not in ["ProcessSucceeded", "ProcessFailed"]: |
| 58 | + resp = p.read_bytes() |
| 59 | + if resp: |
| 60 | + wps.checkStatus(response=resp, sleepSecs=0.01) |
| 61 | + else: |
| 62 | + time.sleep(sleep_time) |
| 63 | + total_time += sleep_time |
| 64 | + if total_time > 1: |
| 65 | + raise TimeoutError |
| 66 | + |
| 67 | + assert wps.status == 'ProcessSucceeded' |
42 | 68 |
|
43 | 69 |
|
44 | 70 | def load_tests(loader=None, tests=None, pattern=None):
|
|
0 commit comments