diff --git a/metaflow/cli_components/step_cmd.py b/metaflow/cli_components/step_cmd.py
index f4bef099e42..982d2e6c718 100644
--- a/metaflow/cli_components/step_cmd.py
+++ b/metaflow/cli_components/step_cmd.py
@@ -102,7 +102,7 @@ def step(
     input_paths_filename=None,
     split_index=None,
     opt_namespace=None,
-    retry_count=None,
+    retry_count=None,  # TODO remove this from the interface; look for affected codepaths
     max_user_code_retries=None,
     clone_only=None,
     clone_run_id=None,
@@ -155,6 +155,15 @@ def step(
         ctx.obj.monitor,
         ubf_context,
     )
+    latest_done_attempt = task.flow_datastore.get_latest_done_attempt(
+        run_id=run_id, step_name=step_name, task_id=task_id
+    )
+    if latest_done_attempt is not None:
+        retry_count = latest_done_attempt + 1
+        # Not sure what the side effects of this are.
+        if retry_count >= max_user_code_retries:
+            max_user_code_retries = retry_count
+
     if clone_only:
         task.clone_only(
             step_name,
diff --git a/metaflow/datastore/flow_datastore.py b/metaflow/datastore/flow_datastore.py
index 16318ed7693..d8beec6d9c7 100644
--- a/metaflow/datastore/flow_datastore.py
+++ b/metaflow/datastore/flow_datastore.py
@@ -1,5 +1,6 @@
 import itertools
 import json
+from typing import Optional
 
 from .. import metaflow_config
 
@@ -67,6 +68,13 @@ def __init__(
     def datastore_root(self):
         return self._storage_impl.datastore_root
 
+    def get_latest_done_attempt(self, run_id, step_name, task_id) -> Optional[int]:
+        t_datastores = self.get_task_datastores(
+            pathspecs=[f"{run_id}/{step_name}/{task_id}"],
+            include_prior=True,
+        )
+        return max([t.attempt for t in t_datastores], default=None)  # default (None) if no attempt is done yet
+
     def get_task_datastores(
         self,
         run_id=None,
diff --git a/metaflow/metaflow_config.py b/metaflow/metaflow_config.py
index 415a934cbe4..9bb44d2b871 100644
--- a/metaflow/metaflow_config.py
+++ b/metaflow/metaflow_config.py
@@ -507,7 +507,7 @@
 #
 # Note also that DataStoreSet resolves the latest attempt_id using
 # lexicographic ordering of attempts. This won't work if MAX_ATTEMPTS > 99.
-MAX_ATTEMPTS = 6
+MAX_ATTEMPTS = from_conf("MAX_ATTEMPTS", 6)
 
 
 # Feature flag (experimental features that are *explicitly* unsupported)
diff --git a/metaflow/mflog/save_logs.py b/metaflow/mflog/save_logs.py
index beedfcb2a11..729eecd8cea 100644
--- a/metaflow/mflog/save_logs.py
+++ b/metaflow/mflog/save_logs.py
@@ -21,7 +21,6 @@ def _read_file(path):
 
     # these env vars are set by mflog.mflog_env
     pathspec = os.environ["MF_PATHSPEC"]
-    attempt = os.environ["MF_ATTEMPT"]
     ds_type = os.environ["MF_DATASTORE"]
     ds_root = os.environ.get("MF_DATASTORE_ROOT")
     paths = (os.environ["MFLOG_STDOUT"], os.environ["MFLOG_STDERR"])
@@ -37,8 +36,10 @@ def print_clean(line, **kwargs):
     flow_datastore = FlowDataStore(
         flow_name, None, storage_impl=storage_impl, ds_root=ds_root
     )
+    # Use the inferred attempt to save task_stdout.log and task_stderr.log.
+    latest_done_attempt = flow_datastore.get_latest_done_attempt(run_id=run_id, step_name=step_name, task_id=task_id)
     task_datastore = flow_datastore.get_task_datastore(
-        run_id, step_name, task_id, int(attempt), mode="w"
+        run_id, step_name, task_id, latest_done_attempt, mode="w"
     )
 
     try:
diff --git a/metaflow/plugins/argo/argo_workflows.py b/metaflow/plugins/argo/argo_workflows.py
index 336d880da0c..d9978c66d72 100644
--- a/metaflow/plugins/argo/argo_workflows.py
+++ b/metaflow/plugins/argo/argo_workflows.py
@@ -46,6 +46,7 @@
     SERVICE_HEADERS,
     SERVICE_INTERNAL_URL,
     UI_URL,
+    MAX_ATTEMPTS,
 )
 from metaflow.metaflow_config_funcs import config_values
 from metaflow.mflog import BASH_SAVE_LOGS, bash_capture_logs, export_mflog_env_vars
@@ -1733,6 +1734,7 @@ def _container_templates(self):
                 "METAFLOW_KUBERNETES_FETCH_EC2_METADATA": KUBERNETES_FETCH_EC2_METADATA,
                 "METAFLOW_RUNTIME_ENVIRONMENT": "kubernetes",
                 "METAFLOW_OWNER": self.username,
+                "METAFLOW_MAX_ATTEMPTS": MAX_ATTEMPTS,
             },
             **{
                 # Configuration for Argo Events. Keep these in sync with the
diff --git a/metaflow/plugins/kubernetes/kubernetes.py b/metaflow/plugins/kubernetes/kubernetes.py
index 6625047395a..71eb7d79cdc 100644
--- a/metaflow/plugins/kubernetes/kubernetes.py
+++ b/metaflow/plugins/kubernetes/kubernetes.py
@@ -39,6 +39,7 @@
     SERVICE_HEADERS,
     KUBERNETES_SECRETS,
     SERVICE_INTERNAL_URL,
+    MAX_ATTEMPTS,
 )
 from metaflow.unbounded_foreach import UBF_CONTROL, UBF_TASK
 from metaflow.metaflow_config_funcs import config_values
@@ -299,6 +300,7 @@ def create_jobset(
             # assumes metadata is stored in DATASTORE_LOCAL_DIR on the Kubernetes
             # pod; this happens when METAFLOW_DATASTORE_SYSROOT_LOCAL is NOT set (
             # see get_datastore_root_from_config in datastore/local.py).
+            .environment_variable("METAFLOW_MAX_ATTEMPTS", MAX_ATTEMPTS)
         )
 
         for k in list(
@@ -602,6 +604,7 @@ def create_job_object(
             # assumes metadata is stored in DATASTORE_LOCAL_DIR on the Kubernetes
             # pod; this happens when METAFLOW_DATASTORE_SYSROOT_LOCAL is NOT set (
             # see get_datastore_root_from_config in datastore/local.py).
+            .environment_variable("METAFLOW_MAX_ATTEMPTS", MAX_ATTEMPTS)
         )
 
         # Temporary passing of *some* environment variables. Do not rely on this
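
A minimal, self-contained sketch of the retry-count inference that the step_cmd.py hunk introduces, extracted as a standalone function for review. The name infer_retry_counts and the fake datastore are illustrative only, not part of the patch:

# Sketch only: mirrors the logic added to step() above, assuming a
# flow_datastore object that exposes the new get_latest_done_attempt() helper.
def infer_retry_counts(flow_datastore, run_id, step_name, task_id,
                       retry_count, max_user_code_retries):
    latest_done_attempt = flow_datastore.get_latest_done_attempt(
        run_id=run_id, step_name=step_name, task_id=task_id
    )
    if latest_done_attempt is not None:
        # The attempt after the latest finished one supersedes the
        # CLI-provided retry_count. Note the explicit None check: attempt 0
        # is a valid "done" attempt but is falsy.
        retry_count = latest_done_attempt + 1
        # Lift the user-code retry ceiling so the inferred attempt is not
        # treated as exceeding it.
        if retry_count >= max_user_code_retries:
            max_user_code_retries = retry_count
    return retry_count, max_user_code_retries

# Example: attempts 0 and 1 are done, so the next launch runs as attempt 2.
class _FakeDatastore:
    def get_latest_done_attempt(self, run_id, step_name, task_id):
        return 1

print(infer_retry_counts(_FakeDatastore(), "42", "train", "7", None, 1))
# -> (2, 2)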
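The MAX_ATTEMPTS and launcher hunks work together: from_conf makes the attempt ceiling overridable through the environment, and the Kubernetes/Argo changes export METAFLOW_MAX_ATTEMPTS so the remote pod resolves the same value as the submitting machine. A simplified sketch of the lookup, covering only the env-var path (the real from_conf also consults the Metaflow config file):

import os

# Simplified stand-in for metaflow's from_conf; illustrative only.
def from_conf_sketch(name, default=None):
    return os.environ.get("METAFLOW_%s" % name, default)

print(from_conf_sketch("MAX_ATTEMPTS", 6))   # 6 (the int default)

# Inside a pod the override survives because the launcher exports it via
# .environment_variable("METAFLOW_MAX_ATTEMPTS", MAX_ATTEMPTS):
os.environ["METAFLOW_MAX_ATTEMPTS"] = "10"
print(from_conf_sketch("MAX_ATTEMPTS", 6))   # "10" (env values arrive as
                                             # strings; consumers may cast)

Per the comment kept in metaflow_config.py, any override should stay at or below 99, since DataStoreSet resolves the latest attempt id lexicographically.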