diff --git a/tests/processes/__init__.py b/tests/processes/__init__.py index 0b847ace4..5882a9ef1 100644 --- a/tests/processes/__init__.py +++ b/tests/processes/__init__.py @@ -99,3 +99,38 @@ def bbox(request, response): area = request.inputs['area'][0].data response.outputs['extent'].data = area return response + + +class Sleep(Process): + """A long running process, just sleeping.""" + def __init__(self): + inputs = [ + LiteralInput('seconds', title='Seconds', data_type='float') + ] + outputs = [ + LiteralOutput('finished', title='Finished', data_type='boolean') + ] + + super(Sleep, self).__init__( + self._handler, + identifier='sleep', + title='Sleep', + abstract='Wait for specified number of seconds.', + inputs=inputs, + outputs=outputs, + store_supported=True, + status_supported=True + ) + + @staticmethod + def _handler(request, response): + import time + + seconds = request.inputs['seconds'][0].data + step = seconds / 3 + for i in range(3): + response.update_status('Sleep in progress...', i / 3 * 100) + time.sleep(step) + + response.outputs['finished'].data = "True" + return response diff --git a/tests/test_assync.py b/tests/test_assync.py index 2db2eb074..f64a53b79 100644 --- a/tests/test_assync.py +++ b/tests/test_assync.py @@ -4,67 +4,67 @@ ################################################################## import unittest +import pytest import time -from pywps import Service, Process, LiteralInput, LiteralOutput +from pywps import Service, configuration from pywps import get_ElementMakerForVersion -from pywps.tests import client_for, assert_response_accepted +from pywps.tests import client_for, assert_response_accepted, assert_response_success +from .processes import Sleep +from owslib.wps import WPSExecution +from pathlib import Path VERSION = "1.0.0" WPS, OWS = get_ElementMakerForVersion(VERSION) -def create_sleep(): +class ExecuteTest(unittest.TestCase): + def setUp(self) -> None: + # Running processes using the MultiProcessing scheduler and a file-based database + configuration.CONFIG.set('processing', 'mode', 'distributed') + configuration.CONFIG.set("logging", "database", "sqlite:////tmp/test-pywps-logs.sqlite3") - def sleep(request, response): - seconds = request.inputs['seconds'][0].data - assert isinstance(seconds, float) + def tearDown(self) -> None: + configuration.load_configuration() - step = seconds / 3 - for i in range(3): - # How is status working in version 4 ? - #self.status.set("Waiting...", i * 10) - time.sleep(step) + def test_async(self): + client = client_for(Service(processes=[Sleep()])) + wps = WPSExecution() - response.outputs['finished'].data = "True" - return response + # Build an asynchronous request (requires specifying outputs and setting the mode). + doc = wps.buildRequest('sleep', + inputs=[('seconds', '.01')], + output=[('finished', None, None)], + mode='async') - return Process(handler=sleep, - identifier='sleep', - title='Sleep', - inputs=[ - LiteralInput('seconds', title='Seconds', data_type='float') - ], - outputs=[ - LiteralOutput('finished', title='Finished', data_type='boolean') - ] - ) + resp = client.post_xml(doc=doc) + wps.parseResponse(resp.xml) + assert_response_accepted(resp) + # The process should not have finished by now. If it does, it's running in sync mode. + with pytest.raises(AssertionError): + assert_response_success(resp) -class ExecuteTest(unittest.TestCase): + # Parse response to extract the status file path + url = resp.xml.xpath("//@statusLocation")[0] - def test_assync(self): - client = client_for(Service(processes=[create_sleep()])) - request_doc = WPS.Execute( - OWS.Identifier('sleep'), - WPS.DataInputs( - WPS.Input( - OWS.Identifier('seconds'), - WPS.Data( - WPS.LiteralData( - "0.3" - ) - ) - ) - ), - version="1.0.0" - ) - resp = client.post_xml(doc=request_doc) - assert_response_accepted(resp) + # OWSlib only reads from URLs, not local files. So we need to read the response manually. + p = Path(url[6:]) + + # Poll the process until it completes + total_time = 0 + sleep_time = .01 + while wps.status not in ["ProcessSucceeded", "ProcessFailed"]: + resp = p.read_bytes() + if resp: + wps.checkStatus(response=resp, sleepSecs=0.01) + else: + time.sleep(sleep_time) + total_time += sleep_time + if total_time > 1: + raise TimeoutError - # TODO: - # . extract the status URL from the response - # . send a status request + assert wps.status == 'ProcessSucceeded' def load_tests(loader=None, tests=None, pattern=None):