Skip to content

Commit

Permalink
refactor wps-process in workflow (reuse staging operations) + fix wor…
Browse files Browse the repository at this point in the history
…kflow collect output glob (fix #371)
  • Loading branch information
fmigneault committed Nov 25, 2021
1 parent 7ef3d4b commit 4a7477c
Show file tree
Hide file tree
Showing 6 changed files with 405 additions and 131 deletions.
23 changes: 20 additions & 3 deletions weaver/processes/wps1_process.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,17 @@
if TYPE_CHECKING:
from pywps.app import WPSRequest

from weaver.typedefs import CWL_RuntimeInputsMap, OWS_InputDataValues, ProcessOWS, UpdateStatusPartialFunction
from weaver.typedefs import (
CWL_ExpectedOutputs,
CWL_RuntimeInputsMap,
OWS_InputDataValues,
ProcessOWS,
UpdateStatusPartialFunction
)

LOGGER = logging.getLogger(__name__)

REMOTE_JOB_PROGRESS_REQ_PREP = 2
REMOTE_JOB_PROGRESS_PREPARE = 2
REMOTE_JOB_PROGRESS_EXECUTION = 5
REMOTE_JOB_PROGRESS_MONITORING = 10
REMOTE_JOB_PROGRESS_FETCH_OUT = 90
Expand Down Expand Up @@ -113,8 +119,10 @@ def get_input_values(self, process, workflow_inputs):
return wps_inputs

def execute(self, workflow_inputs, out_dir, expected_outputs):
# type: (CWL_RuntimeInputsMap, str, CWL_ExpectedOutputs) -> None

self.update_status("Preparing execute request for remote WPS1 provider.",
REMOTE_JOB_PROGRESS_REQ_PREP, status.STATUS_RUNNING)
REMOTE_JOB_PROGRESS_PREPARE, status.STATUS_RUNNING)
LOGGER.debug("Execute process WPS request for %s", self.process)
try:
try:
Expand All @@ -127,6 +135,9 @@ def execute(self, workflow_inputs, out_dir, expected_outputs):
except Exception as ex:
raise OWSNoApplicableCode("Failed to retrieve WPS process description. Error: [{}].".format(str(ex)))

# FIXME: above -> prepare() operation
# FIXME: here -> stage_job_inputs (must adapt dict/list formats accordingly)

wps_inputs = self.get_input_values(process, workflow_inputs)

# prepare outputs
Expand All @@ -138,6 +149,8 @@ def execute(self, workflow_inputs, out_dir, expected_outputs):
self.update_status("Executing job on remote WPS1 provider.",
REMOTE_JOB_PROGRESS_EXECUTION, status.STATUS_RUNNING)

# FIXME: below -> execute_process() [aka execute() becomes partial impl with calls to sub-operations)

mode = EXECUTE_MODE_ASYNC
execution = wps.execute(self.process, inputs=wps_inputs, output=outputs_as_ref, mode=mode, lineage=True)
if not execution.process and execution.errors:
Expand Down Expand Up @@ -181,6 +194,8 @@ def execute(self, workflow_inputs, out_dir, expected_outputs):
execution.percentCompleted, exec_msg, execution.statusLocation))
raise Exception(execution.statusMessage or "Job failed.")

# FIXME: below -> stage_job_results()

self.update_status("Fetching job outputs from remote WPS1 provider.",
REMOTE_JOB_PROGRESS_FETCH_OUT, status.STATUS_RUNNING)

Expand Down Expand Up @@ -209,6 +224,8 @@ def execute(self, workflow_inputs, out_dir, expected_outputs):
with open(dst_fn, mode="wb") as dst_fh:
dst_fh.write(resp.content)

# FIXME: below -> move to execute()

except Exception as exc:
exception_class = "{}.{}".format(type(exc).__module__, type(exc).__name__)
errors = "{0}: {1!s}".format(exception_class, exc)
Expand Down
118 changes: 27 additions & 91 deletions weaver/processes/wps3_process.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,10 @@
from weaver.execute import EXECUTE_MODE_ASYNC, EXECUTE_RESPONSE_DOCUMENT, EXECUTE_TRANSMISSION_MODE_REFERENCE
from weaver.formats import CONTENT_TYPE_APP_FORM, CONTENT_TYPE_APP_JSON
from weaver.processes import opensearch
from weaver.processes.constants import OPENSEARCH_LOCAL_FILE_SCHEME
from weaver.processes.sources import get_data_source_from_url, retrieve_data_source_url
from weaver.processes.utils import map_progress
from weaver.processes.wps_process_base import WpsProcessInterface
from weaver.utils import (
fetch_file,
get_any_id,
get_any_message,
get_any_value,
Expand All @@ -35,15 +33,21 @@
)
from weaver.visibility import VISIBILITY_PUBLIC
from weaver.warning import MissingParameterWarning
from weaver.wps.utils import map_wps_output_location
from weaver.wps_restapi import swagger_definitions as sd

if TYPE_CHECKING:
from typing import List, Union
from typing import Any, Union

from pywps.app import WPSRequest

from weaver.typedefs import JSON, UpdateStatusPartialFunction
from weaver.typedefs import (
JSON,
JobInputs,
JobOutputs,
JobResults,
UpdateStatusPartialFunction
)


LOGGER = logging.getLogger(__name__)

Expand All @@ -66,10 +70,10 @@ def __init__(self,
request, # type: WPSRequest
update_status, # type: UpdateStatusPartialFunction
):
super(Wps3Process, self).__init__(request)
self.provider = None # overridden if data source properly resolved
self.update_status = lambda _message, _progress, _status: update_status(
self.provider, _message, _progress, _status)
super(Wps3Process, self).__init__(
request,
lambda _message, _progress, _status: update_status(_message, _progress, _status, self.provider)
)
self.provider, self.url, self.deploy_body = self.resolve_data_source(step_payload, joborder)
self.process = process

Expand Down Expand Up @@ -240,39 +244,28 @@ def prepare(self):
except Exception as exc:
pass_http_error(exc, HTTPNotFound)

def execute(self, workflow_inputs, out_dir, expected_outputs):
self.update_status("Preparing process on remote ADES.",
REMOTE_JOB_PROGRESS_PREPARE, status.STATUS_RUNNING)
self.prepare()
def format_outputs(self, workflow_outputs):
# type: (JobOutputs) -> JobOutputs
for output in workflow_outputs:
output.update({"transmissionMode": EXECUTE_TRANSMISSION_MODE_REFERENCE})
return workflow_outputs

self.update_status("Process ready for execute request on remote ADES.",
REMOTE_JOB_PROGRESS_READY, status.STATUS_RUNNING)
def dispatch(self, process_inputs, process_outputs):
# type: (JobInputs, JobOutputs) -> Any
LOGGER.debug("Execute process WPS request for [%s]", self.process)
execute_body_inputs = self.stage_job_inputs(workflow_inputs)
execute_body_outputs = [
{"id": output, "transmissionMode": EXECUTE_TRANSMISSION_MODE_REFERENCE} for output in expected_outputs
]
self.update_status("Executing job on remote ADES.", REMOTE_JOB_PROGRESS_EXECUTION, status.STATUS_RUNNING)
execute_body = {
"mode": EXECUTE_MODE_ASYNC,
"response": EXECUTE_RESPONSE_DOCUMENT,
"inputs": execute_body_inputs,
"outputs": execute_body_outputs
"inputs": process_inputs,
"outputs": process_outputs
}
request_url = self.url + sd.process_jobs_service.path.format(process_id=self.process)
response = self.make_request(method="POST", url=request_url, json=execute_body, retry=True)
if response.status_code != 201:
raise Exception("Was expecting a 201 status code from the execute request : {0}".format(request_url))

job_status_uri = response.headers["Location"]
job_id = self.monitor(job_status_uri)

self.update_status("Fetching job outputs from remote ADES.",
REMOTE_JOB_PROGRESS_FETCH_OUT, status.STATUS_RUNNING)
results = self.get_job_results(job_id)
self.stage_job_results(results, expected_outputs, out_dir)
self.update_status("Execution on remote ADES completed.",
REMOTE_JOB_PROGRESS_COMPLETED, status.STATUS_SUCCEEDED)
return job_status_uri

def monitor(self, job_status_uri):
job_status = self.get_job_status(job_status_uri)
Expand Down Expand Up @@ -327,13 +320,13 @@ def get_job_status(self, job_status_uri, retry=True):
job_status["status"] = status.map_status(job_status["status"])
return job_status

def get_job_results(self, job_id):
# type: (str) -> List[JSON]
def get_results(self, job_status_uri):
# type: (str) -> JobResults
"""
Obtains produced output results from successful job status ID.
"""
# use results endpoint instead of '/outputs' to ensure support with other
result_url = self.url + sd.process_results_service.path.format(process_id=self.process, job_id=job_id)
# use '/results' endpoint instead of '/outputs' to ensure support with other
result_url = job_status_uri + "/results"
response = self.make_request(method="GET", url=result_url, retry=True)
response.raise_for_status()
contents = response.json()
Expand All @@ -357,60 +350,3 @@ def get_job_results(self, job_id):
outputs.append(out_val)
contents = outputs
return contents

def stage_job_results(self, results, expected_outputs, out_dir):
for result in results:
res_id = get_any_id(result)
# CWL expect the output file to be written matching definition in 'expected_outputs',
# but this definition could be a glob pattern to match multiple file.
# Therefore, we cannot rely on a specific name from it.
if res_id in expected_outputs:
# plan ahead when list of multiple output values could be supported
result_values = get_any_value(result)
if not isinstance(result_values, list):
result_values = [result_values]
cwl_out_dir = out_dir.rstrip("/")
for value in result_values:
src_name = value.split("/")[-1]
dst_path = "/".join([cwl_out_dir, src_name])
# performance improvement:
# Bypass download if file can be resolved as local resource (already fetched or same server).
# Because CWL expects the file to be in specified 'out_dir', make a link for it to be found
# even though the file is stored in the full job output location instead (already staged by step).
map_path = map_wps_output_location(value, self.settings)
as_link = False
if map_path:
LOGGER.info("Detected result [%s] from [%s] as local reference to this instance. "
"Skipping fetch and using local copy in output destination: [%s]",
res_id, value, dst_path)
LOGGER.debug("Mapped result [%s] to local reference: [%s]", value, map_path)
src_path = map_path
as_link = True
else:
LOGGER.info("Fetching result [%s] from [%s] to CWL output destination: [%s]",
res_id, value, dst_path)
src_path = value
fetch_file(src_path, cwl_out_dir, settings=self.settings, link=as_link)

def stage_job_inputs(self, workflow_inputs):
execute_body_inputs = []
for workflow_input_key, workflow_input_value in workflow_inputs.items():
if not isinstance(workflow_input_value, list):
workflow_input_value = [workflow_input_value]
for workflow_input_value_item in workflow_input_value:
if isinstance(workflow_input_value_item, dict) and "location" in workflow_input_value_item:
location = workflow_input_value_item["location"]
execute_body_inputs.append({"id": workflow_input_key, "href": location})
else:
execute_body_inputs.append({"id": workflow_input_key, "data": workflow_input_value_item})

for exec_input in execute_body_inputs:
if "href" in exec_input and isinstance(exec_input["href"], str):
LOGGER.debug("Original input location [%s] : [%s]", exec_input["id"], exec_input["href"])
if exec_input["href"].startswith("{0}://".format(OPENSEARCH_LOCAL_FILE_SCHEME)):
exec_input["href"] = "file{0}".format(exec_input["href"][len(OPENSEARCH_LOCAL_FILE_SCHEME):])
LOGGER.debug("OpenSearch intermediate input [%s] : [%s]", exec_input["id"], exec_input["href"])
elif exec_input["href"].startswith("file://"):
exec_input["href"] = self.host_file(exec_input["href"])
LOGGER.debug("Hosting intermediate input [%s] : [%s]", exec_input["id"], exec_input["href"])
return execute_body_inputs
6 changes: 3 additions & 3 deletions weaver/processes/wps_package.py
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@
CWL_Results,
JSON,
Number,
ToolPathObjectType,
CWL_ToolPathObjectType,
ValueType
)

Expand Down Expand Up @@ -1596,7 +1596,7 @@ def make_location_output(self, cwl_result, output_id):
self.logger.info("Resolved WPS output [%s] as file reference: [%s]", output_id, result_wps)

def make_tool(self, toolpath_object, loading_context):
# type: (ToolPathObjectType, LoadingContext) -> ProcessCWL
# type: (CWL_ToolPathObjectType, LoadingContext) -> ProcessCWL
from weaver.processes.wps_workflow import default_make_tool
return default_make_tool(toolpath_object, loading_context, self.get_job_process_definition)

Expand Down Expand Up @@ -1638,7 +1638,7 @@ def get_job_process_definition(self, jobname, joborder, tool): # noqa: E811
self.update_status("Preparing to launch {type} {name}.".format(type=jobtype, name=jobname),
start_step_progress, STATUS_RUNNING)

def _update_status_dispatch(_provider, _message, _progress, _status):
def _update_status_dispatch(_message, _progress, _status, _provider):
self.step_update_status(
_message, _progress, start_step_progress, end_step_progress, jobname, _provider, _status
)
Expand Down
Loading

0 comments on commit 4a7477c

Please sign in to comment.