Skip to content

Commit

Permalink
adjust results location resolution to pass references to following st…
Browse files Browse the repository at this point in the history
…eps during Workflow execution (fixes #358) + remove some useless file fetch steps (relates to #183)
  • Loading branch information
fmigneault committed Nov 11, 2021
1 parent 6a7381e commit 3a95d04
Show file tree
Hide file tree
Showing 6 changed files with 134 additions and 21 deletions.
16 changes: 16 additions & 0 deletions CHANGES.rst
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,22 @@ Changes

.. **REPLACE AND/OR ADD SECTION ENTRIES ACCORDINGLY WITH APPLIED CHANGES**
TMP FUTURE RELEASE
==================

Changes:
--------
- Add ``map_wps_output_location`` utility function to handle recurrent mapping of ``weaver.wps_output_dir`` back and
forth with resolved ``weaver.wps_output_url``.
- Add more detection of map-able WPS output location to avoid fetching files unnecessarily. Common cases
are ``Workflow`` running multiple steps on the same server or `Application Package` ``Process`` that reuses an output
produced by a previous execution. Relates to `#183 <https://github.com/crim-ca/weaver/issues/183>`_.

Fixes:
------
- Fix incorrect resolution of ``Process`` results endpoint to pass contents from one step to another
during ``Workflow`` execution (resolves `#358 <https://github.com/crim-ca/weaver/issues/358>`_).

`Unreleased <https://github.com/crim-ca/weaver/tree/master>`_ (latest)
========================================================================

Expand Down
1 change: 1 addition & 0 deletions docs/source/processes.rst
Original file line number Diff line number Diff line change
Expand Up @@ -459,6 +459,7 @@ Workflow Operations

.. todo:: same as prev + 'operations' (deploy based on data-source, visibility, exec-remote for each step, pull-result)

.. _file_reference_types:

File Reference Types
~~~~~~~~~~~~~~~~~~~~~~~~~~
Expand Down
73 changes: 57 additions & 16 deletions weaver/processes/wps3_process.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
from weaver.processes.utils import map_progress
from weaver.processes.wps_process_base import WpsProcessInterface
from weaver.utils import (
fetch_file,
get_any_id,
get_any_message,
get_any_value,
Expand All @@ -34,10 +35,11 @@
)
from weaver.visibility import VISIBILITY_PUBLIC
from weaver.warning import MissingParameterWarning
from weaver.wps.utils import map_wps_output_location
from weaver.wps_restapi import swagger_definitions as sd

if TYPE_CHECKING:
from typing import Union
from typing import List, Union

from pywps.app import WPSRequest

Expand Down Expand Up @@ -318,17 +320,33 @@ def execute(self, workflow_inputs, out_dir, expected_outputs):
REMOTE_JOB_PROGRESS_FETCH_OUT, status.STATUS_RUNNING)
results = self.get_job_results(job_status["jobID"])
for result in results:
if get_any_id(result) in expected_outputs:
# This is where cwl expect the output file to be written
# TODO We will probably need to handle multiple output value...
dst_fn = "/".join([out_dir.rstrip("/"), expected_outputs[get_any_id(result)]])

# TODO Should we handle other type than File reference?
resp = request_extra("get", get_any_value(result), allow_redirects=True, settings=self.settings)
LOGGER.debug("Fetching result output from [%s] to cwl output destination: [%s]",
get_any_value(result), dst_fn)
with open(dst_fn, mode="wb") as dst_fh:
dst_fh.write(resp.content)
res_id = get_any_id(result)
# CWL expect the output file to be written matching definition in 'expected_outputs',
# but this definition could be a glob pattern to match multiple file.
# Therefore, we cannot rely on a specific name from it.
# Furthermore, a glob pattern could match multiple files.
if res_id in expected_outputs:
# plan ahead when list of multiple output values could be supported
result_values = get_any_value(result)
if not isinstance(result_values, list):
result_values = [result_values]
cwl_out_dir = out_dir.rstrip("/")
for value in result_values:
src_name = value.split("/")[-1]
dst_path = "/".join([cwl_out_dir, src_name])
# performance improvement: bypass download if file can be resolved as local
map_path = map_wps_output_location(value, self.settings)
if map_path:
LOGGER.info("Detected result [%s] from [%s] as local reference to this instance. "
"Skipping fetch and using local copy in output destination: [%s]",
res_id, value, dst_path)
LOGGER.debug("Mapped result [%s] to local reference: [%s]", value, map_path)
src_path = map_path
else:
LOGGER.info("Fetching result [%s] from [%s] to CWL output destination: [%s]",
res_id, value, dst_path)
src_path = value
fetch_file(src_path, cwl_out_dir, settings=self.settings)

self.update_status("Execution on remote ADES completed.",
REMOTE_JOB_PROGRESS_COMPLETED, status.STATUS_SUCCEEDED)
Expand Down Expand Up @@ -356,9 +374,32 @@ def get_job_status(self, job_status_uri, retry=True):
return job_status

def get_job_results(self, job_id):
# type: (str) -> List[JSON]
"""
Obtains produced output results from successful job status ID.
"""
# use results endpoint instead of '/outputs' to ensure support with other
result_url = self.url + sd.process_results_service.path.format(process_id=self.process, job_id=job_id)
response = self.make_request(method="GET",
url=result_url,
retry=True)
response = self.make_request(method="GET", url=result_url, retry=True)
response.raise_for_status()
return response.json().get("outputs", {})
contents = response.json()

# backward compatibility for ADES that returns output IDs nested under 'outputs'
if "outputs" in contents:
# ensure that we don't incorrectly pick a specific output ID named 'outputs'
maybe_outputs = contents["outputs"]
if isinstance(maybe_outputs, dict) and get_any_id(maybe_outputs) is None:
contents = maybe_outputs
# backward compatibility for ADES that returns list of outputs nested under 'outputs'
# (i.e.: as Weaver-specific '/outputs' endpoint)
elif isinstance(maybe_outputs, list) and all(get_any_id(out) is not None for out in maybe_outputs):
contents = maybe_outputs

# rebuild the expected (old) list format for calling method
if isinstance(contents, dict) and all(get_any_value(out) is not None for out in contents.values()):
outputs = []
for out_id, out_val in contents.items():
out_val.update({"id": out_id})
outputs.append(out_val)
contents = outputs
return contents
37 changes: 32 additions & 5 deletions weaver/processes/wps_package.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@
request_extra,
setup_loggers
)
from weaver.wps.utils import get_wps_output_dir, get_wps_output_url
from weaver.wps.utils import get_wps_output_dir, get_wps_output_url, map_wps_output_location
from weaver.wps_restapi.swagger_definitions import process_service

if TYPE_CHECKING:
Expand Down Expand Up @@ -1252,7 +1252,7 @@ def must_fetch(self, input_ref):
if input_ref.startswith("s3://"):
return True
return False
return True
return not os.path.isfile(input_ref)

def make_inputs(self,
wps_inputs, # type: Dict[str, Deque[WPS_Input_Type]]
Expand Down Expand Up @@ -1305,9 +1305,18 @@ def make_location_input(self, input_type, input_definition):
"""
Generates the JSON content required to specify a `CWL` ``File`` input definition from a location.
.. note::
If the process requires ``OpenSearch`` references that should be preserved as is, use scheme defined by
:py:data:`weaver.processes.constants.OPENSEARCH_LOCAL_FILE_SCHEME` prefix instead of ``http(s)://``.
If the input reference corresponds to an HTTP URL that is detected as matching the local WPS output endpoint,
implicitly convert the reference to the local WPS output directory to avoid useless download of available file.
Since that endpoint could be protected though, perform a minimal HEAD request to validate its accessibility.
Otherwise, this operation could incorrectly grant unauthorized access to protected files by forging the URL.
If the process requires ``OpenSearch`` references that should be preserved as is, scheme defined by
:py:data:`weaver.processes.constants.OPENSEARCH_LOCAL_FILE_SCHEME` prefix instead of ``http(s)://`` is expected.
Any other variant of file reference will be fetched as applicable by the relevant schemes.
.. seealso::
Documentation details of resolution based on schemes defined in :ref:`file_reference_types` section.
"""
# NOTE:
# When running as EMS, must not call data/file methods if URL reference, otherwise contents
Expand Down Expand Up @@ -1360,11 +1369,23 @@ def make_location_input(self, input_type, input_definition):
):
self.logger.debug("File input (%s) DROPPED. Detected default format as data.", input_definition.identifier)
return None

# auto-map local if possible after security check
input_local_ref = map_wps_output_location(input_location, self.settings)
if input_local_ref:
resp = request_extra("HEAD", input_location, settings=self.settings)
if resp.status_code == 200: # if failed, following fetch will produce the appropriate HTTP error
self.logger.debug("Detected and validated remotely accessible reference [%s] "
"matching local WPS outputs [%s]. Skipping fetch using direct reference.",
input_location, input_local_ref)
input_location = input_local_ref

if self.must_fetch(input_location):
self.logger.info("File input (%s) ATTEMPT fetch: [%s]", input_definition.identifier, input_location)
input_location = fetch_file(input_location, input_definition.workdir, settings=self.settings)
else:
self.logger.info("File input (%s) SKIPPED fetch: [%s]", input_definition.identifier, input_location)

location = {"location": input_location, "class": input_type}
if input_definition.data_format is not None and input_definition.data_format.mime_type:
fmt = get_cwl_file_format(input_definition.data_format.mime_type, make_reference=True)
Expand All @@ -1385,6 +1406,12 @@ def make_outputs(self, cwl_result):
"Dropping additional output values (%s total), only 1 supported per identifier.",
len(cwl_result[output_id])
)
# provide more details than poorly descriptive IndexError
if not len(cwl_result[output_id]):
raise PackageExecutionError(
"Process output '{}' expects at least one value but none was found. "
"Possible incorrect glob pattern definition in CWL Application Package.".format(output_id)
)
cwl_result[output_id] = cwl_result[output_id][0] # expect only one output

if "location" not in cwl_result[output_id] and os.path.isfile(str(cwl_result[output_id])):
Expand Down
3 changes: 3 additions & 0 deletions weaver/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -1009,6 +1009,9 @@ def fetch_file(file_reference, file_outdir, settings=None, **request_kwargs):
with open(file_path, "wb") as file:
resp = request_extra("get", file_reference, stream=True, retries=3, settings=settings, **request_kwargs)
if resp.status_code >= 400:
# use method since response object does not derive from Exception, therefore cannot be raised directly
if hasattr(resp, "raise_for_status"):
resp.raise_for_status()
raise resp
# NOTE:
# Setting 'chunk_size=None' lets the request find a suitable size according to
Expand Down
25 changes: 25 additions & 0 deletions weaver/wps/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,31 @@ def get_wps_local_status_location(url_status_location, container, must_exist=Tru
return out_path


def map_wps_output_location(reference, container, reverse=False, exists=True):
# type: (str, AnySettingsContainer, bool, bool) -> Optional[str]
"""
Obtains the mapped WPS output location of a file where applicable.
:param reference: local file path (normal) or file URL (reverse) to be mapped.
:param container: retrieve application settings.
:param reverse: perform the reverse operation (local path -> URL endpoint), or process normally (URL -> local path).
:param exists: ensure that the mapped file exists, otherwise don't map it.
:returns: mapped reference that corresponds to the local WPS output location.
"""
settings = get_settings(container)
wps_out_dir = get_wps_output_dir(settings)
wps_out_url = get_wps_output_url(settings)
if reverse and reference.startswith(wps_out_dir):
wps_out_ref = reference.replace(wps_out_dir, wps_out_url)
if not exists or os.path.isfile(wps_out_ref):
return wps_out_ref
elif not reverse and reference.startswith(wps_out_url):
wps_out_ref = reference.replace(wps_out_url, wps_out_dir)
if not exists or os.path.isfile(wps_out_ref):
return wps_out_ref
return None


@cache_region("request")
def _describe_process_cached(self, identifier, xml=None):
# type: (WebProcessingService, str, Optional[xml_util.XML]) -> ProcessOWS
Expand Down

0 comments on commit 3a95d04

Please sign in to comment.