diff --git a/tests/functional/application-packages/DockerCopyNestedOutDir.cwl b/tests/functional/application-packages/DockerCopyNestedOutDir.cwl index d8cc10664..e38f54517 100644 --- a/tests/functional/application-packages/DockerCopyNestedOutDir.cwl +++ b/tests/functional/application-packages/DockerCopyNestedOutDir.cwl @@ -8,13 +8,21 @@ requirements: dockerPull: debian:stretch-slim InitialWorkDirRequirement: listing: - - entryname: script.sh - entry: | - set -x - echo "Input: $2" - echo "Output: $1" - mkdir -p nested/output/directory/ - cp $1 "nested/output/directory/" + # copy the file to the output location with nested directory path + # add prefix within the contents to allow validation of final output + # note: Any $(cmd) or ${var} notation must be escaped to avoid conflict with CWL parsing. + - entryname: script.sh + entry: | + set -x + echo "Input: $2" + echo "Output: $1" + mkdir -p nested/output/directory/ + for file in $1; do + name="\$(basename \${file})" + path="nested/output/directory/\${name}" + echo "COPY:" > "\${path}" + cat "\${file}" >> "\${path}" + done inputs: input_files: type: diff --git a/tests/functional/test_workflow.py b/tests/functional/test_workflow.py index 5a9183b7b..438ef3c19 100644 --- a/tests/functional/test_workflow.py +++ b/tests/functional/test_workflow.py @@ -45,13 +45,15 @@ STATUS_RUNNING, STATUS_SUCCEEDED ) -from weaver.utils import get_weaver_url, make_dirs, now, request_extra +from weaver.utils import fetch_file, get_weaver_url, make_dirs, now, request_extra from weaver.visibility import VISIBILITY_PUBLIC from weaver.wps_restapi.utils import get_wps_restapi_base_url if TYPE_CHECKING: from typing import Any, Callable, Dict, Iterable, Optional, Set, Union + from responses import RequestsMock + from weaver.typedefs import AnyResponseType, CookiesType, HeadersType, JSON, SettingsType @@ -197,14 +199,36 @@ def setUpClass(cls): else: url = "http://{}".format(cls.WEAVER_TEST_SERVER_HOSTNAME) cls.app = WebTestApp(url) - cls.WEAVER_URL = get_weaver_url(cls.settings()) - cls.WEAVER_RESTAPI_URL = get_wps_restapi_base_url(cls.settings()) + cls.WEAVER_URL = get_weaver_url(cls.settings.fget(cls)) + cls.WEAVER_RESTAPI_URL = get_wps_restapi_base_url(cls.settings.fget(cls)) # validation cls.setup_test_processes_before() cls.setup_test_processes() cls.setup_test_processes_after() + @property + def settings(self): + # type: (...) -> SettingsType + """ + Provide basic settings that must be defined to use various weaver utility functions. + """ + if not self.__settings__: + weaver_url = os.getenv("WEAVER_URL", "{}{}".format(self.WEAVER_TEST_SERVER_HOSTNAME, + self.WEAVER_TEST_SERVER_BASE_PATH)) + if not weaver_url.startswith("http"): + if not weaver_url.startswith("/") and weaver_url != "": + weaver_url = "http://{}".format(weaver_url) + self.__settings__ = get_settings_from_testapp(self.app) + self.__settings__.update(get_settings_from_config_ini(self.WEAVER_TEST_CONFIG_INI_PATH)) + self.__settings__.update({ + "weaver.url": weaver_url, + "weaver.configuration": self.WEAVER_TEST_CONFIGURATION, + "weaver.wps_restapi_path": self.WEAVER_TEST_SERVER_API_PATH, + "weaver.request_options": {}, + }) + return self.__settings__ + @classmethod def tearDownClass(cls): cls.clean_test_processes() @@ -422,28 +446,6 @@ def current_case_name(cls): def current_test_name(self): return self.id().split(".")[-1] - @classmethod - def settings(cls): - # type: (...) -> SettingsType - """ - Provide basic settings that must be defined to use various weaver utility functions. - """ - if not cls.__settings__: - weaver_url = os.getenv("WEAVER_URL", "{}{}".format(cls.WEAVER_TEST_SERVER_HOSTNAME, - cls.WEAVER_TEST_SERVER_BASE_PATH)) - if not weaver_url.startswith("http"): - if not weaver_url.startswith("/") and weaver_url != "": - weaver_url = "http://{}".format(weaver_url) - cls.__settings__ = get_settings_from_testapp(cls.app) - cls.__settings__.update(get_settings_from_config_ini(cls.WEAVER_TEST_CONFIG_INI_PATH)) - cls.__settings__.update({ - "weaver.url": weaver_url, - "weaver.configuration": cls.WEAVER_TEST_CONFIGURATION, - "weaver.wps_restapi_path": cls.WEAVER_TEST_SERVER_API_PATH, - "weaver.request_options": {}, - }) - return cls.__settings__ - @classmethod def get_test_process(cls, process_id): # type: (WorkflowProcesses) -> ProcessInfo @@ -619,7 +621,7 @@ def request(cls, method, url, ignore_errors=False, force_requests=False, log_ena if with_requests: kw.update({"verify": False, "timeout": cls.WEAVER_TEST_REQUEST_TIMEOUT}) # retry request if the error was caused by some connection error - resp = request_extra(method, url, json=json_body, data=data_body, retries=3, settings=cls.settings(), **kw) + resp = request_extra(method, url, json=json_body, data=data_body, retries=3, settings=cls.settings, **kw) # add some properties similar to `webtest.TestApp` resp_body = getattr(resp, "body", None) # if error is pyramid HTTPException, body is byte only @@ -649,7 +651,7 @@ def request(cls, method, url, ignore_errors=False, force_requests=False, log_ena if with_mock_req: # NOTE: # Very important to mock requests only matching local test application. - # Otherwise, other mocks like 'mock_wps_output' cannot do their job since no real request gets fired. + # Otherwise, mocks like 'mocked_wps_output' cannot do their job since real requests won't be sent. resp = mocked_sub_requests(cls.app, method, url, only_local=True, **kw) else: resp = cls.app._gen_request(method, url, **kw) @@ -681,25 +683,49 @@ def request(cls, method, url, ignore_errors=False, force_requests=False, log_ena cls.indent("Headers: {headers}\n".format(headers=headers), 1)) return resp - def workflow_runner(self, test_workflow_id, test_application_ids, log_full_trace=False, allow_conflict=False): - # type: (WorkflowProcesses, Iterable[WorkflowProcesses], bool, bool) -> None - """ - Simplify test for demonstration purpose. + def workflow_runner(self, + test_workflow_id, # type: WorkflowProcesses + test_application_ids, # type: Iterable[WorkflowProcesses] + log_full_trace=False, # type: bool + requests_mock_callback=None, # type: Optional[Callable[[RequestsMock], None]] + ): # type: (...) -> JSON + """ + Main runner method that prepares and evaluates the full :term:`Workflow` execution and its step dependencies. + + .. note:: + When running on a local :class:`WebTestApp`, mocks :func:`mocked_wps_output` + (with sub-call to :func:`mocked_file_server`) and :func:`mocked_sub_requests` are already being applied. + If further request methods/endpoints need to be added for a given test case, they should be defined using + a function provided with :paramref:`mock_request_callback` to extend the existing response mock. + This is because it is not possible to apply multiple responses mocks one on top of another due to the + patching methodology to catch all sent requests that matches any criteria. + + :param test_workflow_id: + Identifier of the :term:`Workflow` to test. + Must be a member amongst preloaded :attr:`WEAVER_TEST_WORKFLOW_SET` definitions. + :param test_application_ids: + Identifiers of all intermediate :term:`Process` steps expected by the :term:`Workflow` to test. + Must be members amongst preloaded :attr:`WEAVER_TEST_APPLICATION_SET` definitions. + :param log_full_trace: + Flag to provide extensive trace logs of all request and response details for each operation. + :param requests_mock_callback: + Function to add further requests mock specifications as needed by the calling test case. + :returns: Response contents of the final :term:`Workflow` results for further validations if needed. """ # test will log basic information self.__class__.log_full_trace = log_full_trace # deploy processes and make them visible for workflow + has_duplicate_apps = len(set(test_application_ids)) != len(list(test_application_ids)) path_deploy = "/processes" for process_id in test_application_ids: path_visible = "{}/{}/visibility".format(path_deploy, self.test_processes_info[process_id].test_id) data_visible = {"value": VISIBILITY_PUBLIC} - allowed_status = [HTTPCreated.code, HTTPConflict.code] if allow_conflict else HTTPCreated.code - self.request("POST", path_deploy, headers=self.headers, + allowed_status = [HTTPCreated.code, HTTPConflict.code] if has_duplicate_apps else HTTPCreated.code + self.request("POST", path_deploy, status=allowed_status, headers=self.headers, json=self.test_processes_info[process_id].deploy_payload, - message="Expect deployed application process.", - ignore_errors=allow_conflict, status=allowed_status) + message="Expect deployed application process.") self.request("PUT", path_visible, status=HTTPOk.code, headers=self.headers, json=data_visible, message="Expect visible application process.") @@ -726,7 +752,9 @@ def workflow_runner(self, test_workflow_id, test_application_ids, log_full_trace for mock_exec in mocked_execute_process(): stack_exec.enter_context(mock_exec) # mock HTTP HEAD request to validate WPS output access (see 'setUpClass' details) - stack_exec.enter_context(mocked_wps_output(self.settings(), mock_head=True, mock_get=False)) + mock_req = stack_exec.enter_context(mocked_wps_output(self.settings, mock_head=True, mock_get=False)) + if requests_mock_callback: + requests_mock_callback(mock_req) # execute workflow execute_body = workflow_info.execute_payload @@ -739,10 +767,11 @@ def workflow_runner(self, test_workflow_id, test_application_ids, log_full_trace job_id = resp.json.get("jobID") self.assert_test(lambda: job_id and job_location and job_location.endswith(job_id), message="Response process execution job ID must match to validate results.") - self.validate_test_job_execution(job_location, None, None) + resp = self.validate_test_job_execution(job_location, None, None) + return resp.json def validate_test_job_execution(self, job_location_url, user_headers=None, user_cookies=None): - # type: (str, Optional[HeadersType], Optional[CookiesType]) -> None + # type: (str, Optional[HeadersType], Optional[CookiesType]) -> AnyResponseType """ Validates that the job is stated, running, and polls it until completed successfully. @@ -781,8 +810,9 @@ def validate_test_job_execution(self, job_location_url, user_headers=None, user_ self.assert_test(lambda: not failed, message=msg) break self.assert_test(lambda: False, message="Unknown job execution status: '{}'.".format(status)) - self.request("GET", "{}/result".format(job_location_url), - headers=user_headers, cookies=user_cookies, status=HTTPOk.code) + resp = self.request("GET", "{}/results".format(job_location_url), + headers=user_headers, cookies=user_cookies, status=HTTPOk.code) + return resp def try_retrieve_logs(self, workflow_job_url): """ @@ -866,10 +896,26 @@ def test_workflow_subdir_output_glob(self): with contextlib.ExitStack() as stack: tmp_host = "https://mocked-file-server.com" # must match in 'Execute_WorkflowCopyNestedOutDir.json' tmp_dir = stack.enter_context(tempfile.TemporaryDirectory()) - stack.enter_context(mocked_file_server(tmp_dir, tmp_host, self.settings)) with open(os.path.join(tmp_dir, "test-file.txt"), "w") as tmp_file: # must match execution body tmp_file.write("DUMMY DATA") - self.workflow_runner(WorkflowProcesses.WORKFLOW_CHAIN_COPY, - [WorkflowProcesses.APP_DOCKER_COPY_NESTED_OUTDIR, - WorkflowProcesses.APP_DOCKER_COPY_NESTED_OUTDIR], - log_full_trace=True, allow_conflict=True) + + def mock_tmp_input(requests_mock): + mocked_file_server(tmp_dir, tmp_host, self.settings, requests_mock=requests_mock) + + results = self.workflow_runner(WorkflowProcesses.WORKFLOW_CHAIN_COPY, + [WorkflowProcesses.APP_DOCKER_COPY_NESTED_OUTDIR, + WorkflowProcesses.APP_DOCKER_COPY_NESTED_OUTDIR], + log_full_trace=True, requests_mock_callback=mock_tmp_input) + + stack.enter_context(mocked_wps_output(self.settings)) # allow retrieval of HTTP WPS output + stage_out_tmp_dir = stack.enter_context(tempfile.TemporaryDirectory()) # different dir to avoid override + final_output = results.get("output", {}).get("href", "") + self.assert_test(lambda: final_output.startswith("http") and final_output.endswith("test-file.txt"), + message="Workflow output file with nested directory globs should have been automatically" + "mapped between steps until the final staging WPS output URL.") + output_path = fetch_file(final_output, stage_out_tmp_dir, self.settings) + with open(output_path, "r") as out_file: + output_data = out_file.read() + self.assert_test(lambda: output_data == "COPY:\nCOPY:\nDUMMY DATA", + message="Workflow output file with nested directory globs should contain " + "two COPY prefixes, one added by each intermediate step of the Workflow.") diff --git a/tests/utils.py b/tests/utils.py index 63af2a2c0..bff6c9ab6 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -630,7 +630,14 @@ def mock_requests_wps1(*args, **kwargs): return mocked_remote_server_wrapper -def mocked_file_server(directory, url, settings, mock_get=True, mock_head=True, headers_override=None): +def mocked_file_server(directory, # type: str + url, # type: str + settings, # type: SettingsType + mock_get=True, # type: bool + mock_head=True, # type: bool + headers_override=None, # type: Optional[AnyHeadersContainer] + requests_mock=None, # type: Optional[responses.RequestsMock] + ): # type: (...) -> responses.RequestsMock """ Mocks a file server endpoint hosting some local directory files. @@ -639,6 +646,11 @@ def mocked_file_server(directory, url, settings, mock_get=True, mock_head=True, and the targeted :paramref:`url` should differ from the :class:`TestApp` URL to avoid incorrect handling by different mocks. + .. note:: + Multiple requests patch operations by calling this function more than once can be applied by providing back + the mock returned on a previous call to the subsequent ones as input. In such case, each mock call should + refer to distinct endpoints that will not cause conflicting request patching configurations. + .. seealso:: For WPS output directory/endpoint, consider using :func:`mocked_wps_output` instead. @@ -648,6 +660,7 @@ def mocked_file_server(directory, url, settings, mock_get=True, mock_head=True, :param mock_get: Whether to mock HTTP GET methods received on WPS output URL. :param mock_head: Whether to mock HTTP HEAD methods received on WPS output URL. :param headers_override: Override specified headers in produced response. + :param requests_mock: Previously defined request mock instance to extend with new definitions. :return: Mocked response that would normally be obtained by a file server hosting WPS output directory. """ if directory.startswith("file://"): @@ -690,7 +703,7 @@ def request_callback(request): return 405, {}, "" return 404, {}, "" - mock_req = responses.RequestsMock(assert_all_requests_are_fired=False) + mock_req = requests_mock or responses.RequestsMock(assert_all_requests_are_fired=False) any_file_url = re.compile(r"{}/[\w\-_/.]+".format(url)) # match any sub-directory/file structure if mock_get: mock_req.add_callback(responses.GET, any_file_url, callback=request_callback) @@ -699,8 +712,12 @@ def request_callback(request): return mock_req -def mocked_wps_output(settings, mock_get=True, mock_head=True, headers_override=None): - # type: (SettingsType, bool, bool, Optional[AnyHeadersContainer]) -> Union[responses.RequestsMock, MockPatch] +def mocked_wps_output(settings, # type: SettingsType + mock_get=True, # type: bool + mock_head=True, # type: bool + headers_override=None, # type: Optional[AnyHeadersContainer] + requests_mock=None, # type: Optional[responses.RequestsMock] + ): # type: (...) -> Union[responses.RequestsMock, MockPatch] """ Mocks the mapping resolution from HTTP WPS output URL to hosting of matched local file in WPS output directory. @@ -718,11 +735,12 @@ def mocked_wps_output(settings, mock_get=True, mock_head=True, headers_override= :param mock_get: Whether to mock HTTP GET methods received on WPS output URL. :param mock_head: Whether to mock HTTP HEAD methods received on WPS output URL. :param headers_override: Override specified headers in produced response. + :param requests_mock: Previously defined request mock instance to extend with new definitions. :return: Mocked response that would normally be obtained by a file server hosting WPS output directory. """ wps_url = get_wps_output_url(settings) wps_dir = get_wps_output_dir(settings) - return mocked_file_server(wps_dir, wps_url, settings, mock_get, mock_head, headers_override) + return mocked_file_server(wps_dir, wps_url, settings, mock_get, mock_head, headers_override, requests_mock) def mocked_execute_process(): diff --git a/weaver/processes/wps1_process.py b/weaver/processes/wps1_process.py index cc620b43f..a5dd85854 100644 --- a/weaver/processes/wps1_process.py +++ b/weaver/processes/wps1_process.py @@ -37,7 +37,6 @@ JobInputs, JobOutputs, JobResults, - UpdateStatusPartialFunction, OWS_InputDataValues, ProcessOWS, UpdateStatusPartialFunction diff --git a/weaver/processes/wps3_process.py b/weaver/processes/wps3_process.py index 208967867..cb4a0b6b4 100644 --- a/weaver/processes/wps3_process.py +++ b/weaver/processes/wps3_process.py @@ -72,7 +72,7 @@ def __init__(self, ): super(Wps3Process, self).__init__( request, - lambda _message, _progress, _status: update_status(_message, _progress, _status, self.provider) + lambda _message, _progress, _status: update_status(_message, _progress, _status, self.provider or "local") ) self.provider, self.url, self.deploy_body = self.resolve_data_source(step_payload, joborder) self.process = process @@ -98,8 +98,8 @@ def resolve_data_source(self, step_payload, joborder): except (IndexError, KeyError) as exc: raise PackageExecutionError("Failed to save package outputs. [{!r}]".format(exc)) - self.provider = data_source # fix immediately for `update_status` - self.update_status("{provider} is selected {reason}.".format(provider=data_source, reason=reason), + self.provider = data_source # fix immediately for below `update_status` call + self.update_status("Provider {provider} is selected {reason}.".format(provider=data_source, reason=reason), REMOTE_JOB_PROGRESS_PROVIDER, status.STATUS_RUNNING) return data_source, url, deploy_body diff --git a/weaver/processes/wps_package.py b/weaver/processes/wps_package.py index f8ba6fa5b..11725087d 100644 --- a/weaver/processes/wps_package.py +++ b/weaver/processes/wps_package.py @@ -1150,7 +1150,7 @@ def step_update_status(self, message, progress, start_step_progress, end_step_pr target_host, status): # type: (str, Number, Number, Number, str, AnyValue, str) -> None self.update_status( - message="{0} [{1}] - {2}".format(target_host, step_name, str(message).strip()), + message="[provider: {0}, step: {1}] - {2}".format(target_host, step_name, str(message).strip()), progress=map_progress(progress, start_step_progress, end_step_progress), status=status, )