Skip to content

Commit

Permalink
test workflow working for nestd output globs (resolves #371)
Browse files Browse the repository at this point in the history
  • Loading branch information
fmigneault committed Dec 9, 2021
1 parent 9c72588 commit dbf5414
Show file tree
Hide file tree
Showing 6 changed files with 133 additions and 62 deletions.
22 changes: 15 additions & 7 deletions tests/functional/application-packages/DockerCopyNestedOutDir.cwl
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,21 @@ requirements:
dockerPull: debian:stretch-slim
InitialWorkDirRequirement:
listing:
- entryname: script.sh
entry: |
set -x
echo "Input: $2"
echo "Output: $1"
mkdir -p nested/output/directory/
cp $1 "nested/output/directory/"
# copy the file to the output location with nested directory path
# add prefix within the contents to allow validation of final output
# note: Any $(cmd) or ${var} notation must be escaped to avoid conflict with CWL parsing.
- entryname: script.sh
entry: |
set -x
echo "Input: $2"
echo "Output: $1"
mkdir -p nested/output/directory/
for file in $1; do
name="\$(basename \${file})"
path="nested/output/directory/\${name}"
echo "COPY:" > "\${path}"
cat "\${file}" >> "\${path}"
done
inputs:
input_files:
type:
Expand Down
136 changes: 91 additions & 45 deletions tests/functional/test_workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,13 +45,15 @@
STATUS_RUNNING,
STATUS_SUCCEEDED
)
from weaver.utils import get_weaver_url, make_dirs, now, request_extra
from weaver.utils import fetch_file, get_weaver_url, make_dirs, now, request_extra
from weaver.visibility import VISIBILITY_PUBLIC
from weaver.wps_restapi.utils import get_wps_restapi_base_url

if TYPE_CHECKING:
from typing import Any, Callable, Dict, Iterable, Optional, Set, Union

from responses import RequestsMock

from weaver.typedefs import AnyResponseType, CookiesType, HeadersType, JSON, SettingsType


Expand Down Expand Up @@ -197,14 +199,36 @@ def setUpClass(cls):
else:
url = "http://{}".format(cls.WEAVER_TEST_SERVER_HOSTNAME)
cls.app = WebTestApp(url)
cls.WEAVER_URL = get_weaver_url(cls.settings())
cls.WEAVER_RESTAPI_URL = get_wps_restapi_base_url(cls.settings())
cls.WEAVER_URL = get_weaver_url(cls.settings.fget(cls))
cls.WEAVER_RESTAPI_URL = get_wps_restapi_base_url(cls.settings.fget(cls))

# validation
cls.setup_test_processes_before()
cls.setup_test_processes()
cls.setup_test_processes_after()

@property
def settings(self):
# type: (...) -> SettingsType
"""
Provide basic settings that must be defined to use various weaver utility functions.
"""
if not self.__settings__:
weaver_url = os.getenv("WEAVER_URL", "{}{}".format(self.WEAVER_TEST_SERVER_HOSTNAME,
self.WEAVER_TEST_SERVER_BASE_PATH))
if not weaver_url.startswith("http"):
if not weaver_url.startswith("/") and weaver_url != "":
weaver_url = "http://{}".format(weaver_url)
self.__settings__ = get_settings_from_testapp(self.app)
self.__settings__.update(get_settings_from_config_ini(self.WEAVER_TEST_CONFIG_INI_PATH))
self.__settings__.update({
"weaver.url": weaver_url,
"weaver.configuration": self.WEAVER_TEST_CONFIGURATION,
"weaver.wps_restapi_path": self.WEAVER_TEST_SERVER_API_PATH,
"weaver.request_options": {},
})
return self.__settings__

@classmethod
def tearDownClass(cls):
cls.clean_test_processes()
Expand Down Expand Up @@ -422,28 +446,6 @@ def current_case_name(cls):
def current_test_name(self):
return self.id().split(".")[-1]

@classmethod
def settings(cls):
# type: (...) -> SettingsType
"""
Provide basic settings that must be defined to use various weaver utility functions.
"""
if not cls.__settings__:
weaver_url = os.getenv("WEAVER_URL", "{}{}".format(cls.WEAVER_TEST_SERVER_HOSTNAME,
cls.WEAVER_TEST_SERVER_BASE_PATH))
if not weaver_url.startswith("http"):
if not weaver_url.startswith("/") and weaver_url != "":
weaver_url = "http://{}".format(weaver_url)
cls.__settings__ = get_settings_from_testapp(cls.app)
cls.__settings__.update(get_settings_from_config_ini(cls.WEAVER_TEST_CONFIG_INI_PATH))
cls.__settings__.update({
"weaver.url": weaver_url,
"weaver.configuration": cls.WEAVER_TEST_CONFIGURATION,
"weaver.wps_restapi_path": cls.WEAVER_TEST_SERVER_API_PATH,
"weaver.request_options": {},
})
return cls.__settings__

@classmethod
def get_test_process(cls, process_id):
# type: (WorkflowProcesses) -> ProcessInfo
Expand Down Expand Up @@ -619,7 +621,7 @@ def request(cls, method, url, ignore_errors=False, force_requests=False, log_ena
if with_requests:
kw.update({"verify": False, "timeout": cls.WEAVER_TEST_REQUEST_TIMEOUT})
# retry request if the error was caused by some connection error
resp = request_extra(method, url, json=json_body, data=data_body, retries=3, settings=cls.settings(), **kw)
resp = request_extra(method, url, json=json_body, data=data_body, retries=3, settings=cls.settings, **kw)

# add some properties similar to `webtest.TestApp`
resp_body = getattr(resp, "body", None) # if error is pyramid HTTPException, body is byte only
Expand Down Expand Up @@ -649,7 +651,7 @@ def request(cls, method, url, ignore_errors=False, force_requests=False, log_ena
if with_mock_req:
# NOTE:
# Very important to mock requests only matching local test application.
# Otherwise, other mocks like 'mock_wps_output' cannot do their job since no real request gets fired.
# Otherwise, mocks like 'mocked_wps_output' cannot do their job since real requests won't be sent.
resp = mocked_sub_requests(cls.app, method, url, only_local=True, **kw)
else:
resp = cls.app._gen_request(method, url, **kw)
Expand Down Expand Up @@ -681,25 +683,49 @@ def request(cls, method, url, ignore_errors=False, force_requests=False, log_ena
cls.indent("Headers: {headers}\n".format(headers=headers), 1))
return resp

def workflow_runner(self, test_workflow_id, test_application_ids, log_full_trace=False, allow_conflict=False):
# type: (WorkflowProcesses, Iterable[WorkflowProcesses], bool, bool) -> None
"""
Simplify test for demonstration purpose.
def workflow_runner(self,
test_workflow_id, # type: WorkflowProcesses
test_application_ids, # type: Iterable[WorkflowProcesses]
log_full_trace=False, # type: bool
requests_mock_callback=None, # type: Optional[Callable[[RequestsMock], None]]
): # type: (...) -> JSON
"""
Main runner method that prepares and evaluates the full :term:`Workflow` execution and its step dependencies.
.. note::
When running on a local :class:`WebTestApp`, mocks :func:`mocked_wps_output`
(with sub-call to :func:`mocked_file_server`) and :func:`mocked_sub_requests` are already being applied.
If further request methods/endpoints need to be added for a given test case, they should be defined using
a function provided with :paramref:`mock_request_callback` to extend the existing response mock.
This is because it is not possible to apply multiple responses mocks one on top of another due to the
patching methodology to catch all sent requests that matches any criteria.
:param test_workflow_id:
Identifier of the :term:`Workflow` to test.
Must be a member amongst preloaded :attr:`WEAVER_TEST_WORKFLOW_SET` definitions.
:param test_application_ids:
Identifiers of all intermediate :term:`Process` steps expected by the :term:`Workflow` to test.
Must be members amongst preloaded :attr:`WEAVER_TEST_APPLICATION_SET` definitions.
:param log_full_trace:
Flag to provide extensive trace logs of all request and response details for each operation.
:param requests_mock_callback:
Function to add further requests mock specifications as needed by the calling test case.
:returns: Response contents of the final :term:`Workflow` results for further validations if needed.
"""

# test will log basic information
self.__class__.log_full_trace = log_full_trace

# deploy processes and make them visible for workflow
has_duplicate_apps = len(set(test_application_ids)) != len(list(test_application_ids))
path_deploy = "/processes"
for process_id in test_application_ids:
path_visible = "{}/{}/visibility".format(path_deploy, self.test_processes_info[process_id].test_id)
data_visible = {"value": VISIBILITY_PUBLIC}
allowed_status = [HTTPCreated.code, HTTPConflict.code] if allow_conflict else HTTPCreated.code
self.request("POST", path_deploy, headers=self.headers,
allowed_status = [HTTPCreated.code, HTTPConflict.code] if has_duplicate_apps else HTTPCreated.code
self.request("POST", path_deploy, status=allowed_status, headers=self.headers,
json=self.test_processes_info[process_id].deploy_payload,
message="Expect deployed application process.",
ignore_errors=allow_conflict, status=allowed_status)
message="Expect deployed application process.")
self.request("PUT", path_visible, status=HTTPOk.code, headers=self.headers, json=data_visible,
message="Expect visible application process.")

Expand All @@ -726,7 +752,9 @@ def workflow_runner(self, test_workflow_id, test_application_ids, log_full_trace
for mock_exec in mocked_execute_process():
stack_exec.enter_context(mock_exec)
# mock HTTP HEAD request to validate WPS output access (see 'setUpClass' details)
stack_exec.enter_context(mocked_wps_output(self.settings(), mock_head=True, mock_get=False))
mock_req = stack_exec.enter_context(mocked_wps_output(self.settings, mock_head=True, mock_get=False))
if requests_mock_callback:
requests_mock_callback(mock_req)

# execute workflow
execute_body = workflow_info.execute_payload
Expand All @@ -739,10 +767,11 @@ def workflow_runner(self, test_workflow_id, test_application_ids, log_full_trace
job_id = resp.json.get("jobID")
self.assert_test(lambda: job_id and job_location and job_location.endswith(job_id),
message="Response process execution job ID must match to validate results.")
self.validate_test_job_execution(job_location, None, None)
resp = self.validate_test_job_execution(job_location, None, None)
return resp.json

def validate_test_job_execution(self, job_location_url, user_headers=None, user_cookies=None):
# type: (str, Optional[HeadersType], Optional[CookiesType]) -> None
# type: (str, Optional[HeadersType], Optional[CookiesType]) -> AnyResponseType
"""
Validates that the job is stated, running, and polls it until completed successfully.
Expand Down Expand Up @@ -781,8 +810,9 @@ def validate_test_job_execution(self, job_location_url, user_headers=None, user_
self.assert_test(lambda: not failed, message=msg)
break
self.assert_test(lambda: False, message="Unknown job execution status: '{}'.".format(status))
self.request("GET", "{}/result".format(job_location_url),
headers=user_headers, cookies=user_cookies, status=HTTPOk.code)
resp = self.request("GET", "{}/results".format(job_location_url),
headers=user_headers, cookies=user_cookies, status=HTTPOk.code)
return resp

def try_retrieve_logs(self, workflow_job_url):
"""
Expand Down Expand Up @@ -866,10 +896,26 @@ def test_workflow_subdir_output_glob(self):
with contextlib.ExitStack() as stack:
tmp_host = "https://mocked-file-server.com" # must match in 'Execute_WorkflowCopyNestedOutDir.json'
tmp_dir = stack.enter_context(tempfile.TemporaryDirectory())
stack.enter_context(mocked_file_server(tmp_dir, tmp_host, self.settings))
with open(os.path.join(tmp_dir, "test-file.txt"), "w") as tmp_file: # must match execution body
tmp_file.write("DUMMY DATA")
self.workflow_runner(WorkflowProcesses.WORKFLOW_CHAIN_COPY,
[WorkflowProcesses.APP_DOCKER_COPY_NESTED_OUTDIR,
WorkflowProcesses.APP_DOCKER_COPY_NESTED_OUTDIR],
log_full_trace=True, allow_conflict=True)

def mock_tmp_input(requests_mock):
mocked_file_server(tmp_dir, tmp_host, self.settings, requests_mock=requests_mock)

results = self.workflow_runner(WorkflowProcesses.WORKFLOW_CHAIN_COPY,
[WorkflowProcesses.APP_DOCKER_COPY_NESTED_OUTDIR,
WorkflowProcesses.APP_DOCKER_COPY_NESTED_OUTDIR],
log_full_trace=True, requests_mock_callback=mock_tmp_input)

stack.enter_context(mocked_wps_output(self.settings)) # allow retrieval of HTTP WPS output
stage_out_tmp_dir = stack.enter_context(tempfile.TemporaryDirectory()) # different dir to avoid override
final_output = results.get("output", {}).get("href", "")
self.assert_test(lambda: final_output.startswith("http") and final_output.endswith("test-file.txt"),
message="Workflow output file with nested directory globs should have been automatically"
"mapped between steps until the final staging WPS output URL.")
output_path = fetch_file(final_output, stage_out_tmp_dir, self.settings)
with open(output_path, "r") as out_file:
output_data = out_file.read()
self.assert_test(lambda: output_data == "COPY:\nCOPY:\nDUMMY DATA",
message="Workflow output file with nested directory globs should contain "
"two COPY prefixes, one added by each intermediate step of the Workflow.")
28 changes: 23 additions & 5 deletions tests/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -630,7 +630,14 @@ def mock_requests_wps1(*args, **kwargs):
return mocked_remote_server_wrapper


def mocked_file_server(directory, url, settings, mock_get=True, mock_head=True, headers_override=None):
def mocked_file_server(directory, # type: str
url, # type: str
settings, # type: SettingsType
mock_get=True, # type: bool
mock_head=True, # type: bool
headers_override=None, # type: Optional[AnyHeadersContainer]
requests_mock=None, # type: Optional[responses.RequestsMock]
): # type: (...) -> responses.RequestsMock
"""
Mocks a file server endpoint hosting some local directory files.
Expand All @@ -639,6 +646,11 @@ def mocked_file_server(directory, url, settings, mock_get=True, mock_head=True,
and the targeted :paramref:`url` should differ from the :class:`TestApp` URL to avoid incorrect handling
by different mocks.
.. note::
Multiple requests patch operations by calling this function more than once can be applied by providing back
the mock returned on a previous call to the subsequent ones as input. In such case, each mock call should
refer to distinct endpoints that will not cause conflicting request patching configurations.
.. seealso::
For WPS output directory/endpoint, consider using :func:`mocked_wps_output` instead.
Expand All @@ -648,6 +660,7 @@ def mocked_file_server(directory, url, settings, mock_get=True, mock_head=True,
:param mock_get: Whether to mock HTTP GET methods received on WPS output URL.
:param mock_head: Whether to mock HTTP HEAD methods received on WPS output URL.
:param headers_override: Override specified headers in produced response.
:param requests_mock: Previously defined request mock instance to extend with new definitions.
:return: Mocked response that would normally be obtained by a file server hosting WPS output directory.
"""
if directory.startswith("file://"):
Expand Down Expand Up @@ -690,7 +703,7 @@ def request_callback(request):
return 405, {}, ""
return 404, {}, ""

mock_req = responses.RequestsMock(assert_all_requests_are_fired=False)
mock_req = requests_mock or responses.RequestsMock(assert_all_requests_are_fired=False)
any_file_url = re.compile(r"{}/[\w\-_/.]+".format(url)) # match any sub-directory/file structure
if mock_get:
mock_req.add_callback(responses.GET, any_file_url, callback=request_callback)
Expand All @@ -699,8 +712,12 @@ def request_callback(request):
return mock_req


def mocked_wps_output(settings, mock_get=True, mock_head=True, headers_override=None):
# type: (SettingsType, bool, bool, Optional[AnyHeadersContainer]) -> Union[responses.RequestsMock, MockPatch]
def mocked_wps_output(settings, # type: SettingsType
mock_get=True, # type: bool
mock_head=True, # type: bool
headers_override=None, # type: Optional[AnyHeadersContainer]
requests_mock=None, # type: Optional[responses.RequestsMock]
): # type: (...) -> Union[responses.RequestsMock, MockPatch]
"""
Mocks the mapping resolution from HTTP WPS output URL to hosting of matched local file in WPS output directory.
Expand All @@ -718,11 +735,12 @@ def mocked_wps_output(settings, mock_get=True, mock_head=True, headers_override=
:param mock_get: Whether to mock HTTP GET methods received on WPS output URL.
:param mock_head: Whether to mock HTTP HEAD methods received on WPS output URL.
:param headers_override: Override specified headers in produced response.
:param requests_mock: Previously defined request mock instance to extend with new definitions.
:return: Mocked response that would normally be obtained by a file server hosting WPS output directory.
"""
wps_url = get_wps_output_url(settings)
wps_dir = get_wps_output_dir(settings)
return mocked_file_server(wps_dir, wps_url, settings, mock_get, mock_head, headers_override)
return mocked_file_server(wps_dir, wps_url, settings, mock_get, mock_head, headers_override, requests_mock)


def mocked_execute_process():
Expand Down
1 change: 0 additions & 1 deletion weaver/processes/wps1_process.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@
JobInputs,
JobOutputs,
JobResults,
UpdateStatusPartialFunction,
OWS_InputDataValues,
ProcessOWS,
UpdateStatusPartialFunction
Expand Down
Loading

0 comments on commit dbf5414

Please sign in to comment.