Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

enhancement: make MAX_ATTEMPTS configurable #2275

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
ccdd76b
add custom log
dhpikolo Feb 11, 2025
0427c73
use echo_always
dhpikolo Feb 11, 2025
9ab0b1c
debug log to spot attempt number
dhpikolo Feb 12, 2025
8617d44
pass kwargs get_task_datastore()
dhpikolo Feb 12, 2025
cfc0624
use try-except to mitigate datastore error
dhpikolo Feb 12, 2025
47b8dd3
enable allow_not_done
dhpikolo Feb 12, 2025
f2126ec
remove try/except block
dhpikolo Feb 12, 2025
966ac66
print datastore metadata
dhpikolo Feb 12, 2025
3576162
use flow_datastore instead
dhpikolo Feb 12, 2025
326d847
fix attrs
dhpikolo Feb 12, 2025
f4d0acd
Update metaflow/cli_components/step_cmd.py
dhpikolo Feb 12, 2025
8f5044f
Update metaflow/cli_components/step_cmd.py
dhpikolo Feb 12, 2025
f1aa3a6
add try except block
dhpikolo Feb 13, 2025
cc2a622
correct math
dhpikolo Feb 13, 2025
bba6504
do not use ca_store attrs
dhpikolo Feb 13, 2025
5603de0
improve logs
dhpikolo Feb 13, 2025
e605958
use getattr
dhpikolo Feb 13, 2025
f2829d2
set allow_not_done = True
dhpikolo Feb 13, 2025
5f7d50d
use taskdatastores instead
dhpikolo Feb 14, 2025
8fc9b37
add logs
dhpikolo Feb 14, 2025
ca8736f
Update metaflow/cli_components/step_cmd.py
dhpikolo Feb 14, 2025
98b7579
save task std logs
dhpikolo Feb 14, 2025
250b1ef
Merge branch 'logmle-debug-step-cli' of https://github.com/dhpikolo/m…
dhpikolo Feb 14, 2025
7f89598
Revert "save task std logs"
dhpikolo Feb 14, 2025
44e05cc
refactor + infer attempt in mflog.save_logs
dhpikolo Feb 14, 2025
b5bb050
change default to 0
dhpikolo Feb 17, 2025
a405118
Merge remote-tracking branch 'origin' into logmle-debug-step-cli
dhpikolo Feb 17, 2025
b69f81d
remove casting to int
dhpikolo Feb 17, 2025
10ec80f
set default to None
dhpikolo Feb 17, 2025
e815cf6
read from conf
dhpikolo Feb 17, 2025
8959ae5
move get latest done attempts before if-else
dhpikolo Feb 18, 2025
7177064
mount MAX_ATTEMPTS on kubernetes job
dhpikolo Feb 19, 2025
b6f82d2
Merge branch 'logmle-make-max-attempts-configurable' of https://githu…
dhpikolo Feb 19, 2025
9e2faed
Revert "mount MAX_ATTEMPTS on kubernetes job"
dhpikolo Feb 19, 2025
0f135b6
Revert "Revert "mount MAX_ATTEMPTS on kubernetes job""
dhpikolo Feb 19, 2025
55126a1
add max attempts envar in container template
dhpikolo Feb 19, 2025
0fd14d3
add in k8s create job
dhpikolo Feb 19, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion metaflow/cli_components/step_cmd.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ def step(
input_paths_filename=None,
split_index=None,
opt_namespace=None,
retry_count=None,
retry_count=None, # TODO remove this from interface, look for effecting codepaths
max_user_code_retries=None,
clone_only=None,
clone_run_id=None,
Expand Down Expand Up @@ -155,6 +155,15 @@ def step(
ctx.obj.monitor,
ubf_context,
)
latest_done_attempt = task.flow_datastore.get_latest_done_attempt(
run_id=run_id, step_name=step_name, task_id=task_id
)
if latest_done_attempt:
retry_count = latest_done_attempt + 1
# Not sure what are the side effects to this.
if retry_count >= max_user_code_retries:
max_user_code_retries = retry_count

if clone_only:
task.clone_only(
step_name,
Expand Down
8 changes: 8 additions & 0 deletions metaflow/datastore/flow_datastore.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import itertools
import json
from typing import Optional

from .. import metaflow_config

Expand Down Expand Up @@ -67,6 +68,13 @@ def __init__(
def datastore_root(self):
return self._storage_impl.datastore_root

def get_latest_done_attempt(self, run_id, step_name, task_id) -> Optional[int]:
    """Return the highest attempt number that has completed ("done") for a task.

    Parameters
    ----------
    run_id : str
        Run ID the task belongs to.
    step_name : str
        Step name the task belongs to.
    task_id : str
        Task ID.

    Returns
    -------
    Optional[int]
        The latest done attempt number, or None if no attempt has
        completed yet (i.e. this is the first attempt).
    """
    task_datastores = self.get_task_datastores(
        pathspecs=[f"{run_id}/{step_name}/{task_id}"],
        include_prior=True,
    )
    # Generator avoids materializing a throwaway list; default=None
    # covers the case where no attempt has finished yet.
    return max((t.attempt for t in task_datastores), default=None)

def get_task_datastores(
self,
run_id=None,
Expand Down
2 changes: 1 addition & 1 deletion metaflow/metaflow_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -507,7 +507,7 @@
#
# Note also that DataStoreSet resolves the latest attempt_id using
# lexicographic ordering of attempts. This won't work if MAX_ATTEMPTS > 99.
MAX_ATTEMPTS = 6
MAX_ATTEMPTS = from_conf("MAX_ATTEMPTS", 6)

# Feature flag (experimental features that are *explicitly* unsupported)

Expand Down
5 changes: 3 additions & 2 deletions metaflow/mflog/save_logs.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ def _read_file(path):

# these env vars are set by mflog.mflog_env
pathspec = os.environ["MF_PATHSPEC"]
attempt = os.environ["MF_ATTEMPT"]
ds_type = os.environ["MF_DATASTORE"]
ds_root = os.environ.get("MF_DATASTORE_ROOT")
paths = (os.environ["MFLOG_STDOUT"], os.environ["MFLOG_STDERR"])
Expand All @@ -37,8 +36,10 @@ def print_clean(line, **kwargs):
flow_datastore = FlowDataStore(
flow_name, None, storage_impl=storage_impl, ds_root=ds_root
)
# Use inferred attempt - to save task_stdout.log and task_stderr.log
latest_done_attempt = flow_datastore.get_latest_done_attempt(run_id=run_id, step_name=step_name, task_id=task_id)
task_datastore = flow_datastore.get_task_datastore(
run_id, step_name, task_id, int(attempt), mode="w"
run_id, step_name, task_id, latest_done_attempt, mode="w"
)

try:
Expand Down
2 changes: 2 additions & 0 deletions metaflow/plugins/argo/argo_workflows.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
SERVICE_HEADERS,
SERVICE_INTERNAL_URL,
UI_URL,
MAX_ATTEMPTS,
)
from metaflow.metaflow_config_funcs import config_values
from metaflow.mflog import BASH_SAVE_LOGS, bash_capture_logs, export_mflog_env_vars
Expand Down Expand Up @@ -1733,6 +1734,7 @@ def _container_templates(self):
"METAFLOW_KUBERNETES_FETCH_EC2_METADATA": KUBERNETES_FETCH_EC2_METADATA,
"METAFLOW_RUNTIME_ENVIRONMENT": "kubernetes",
"METAFLOW_OWNER": self.username,
"METAFLOW_MAX_ATTEMPTS": MAX_ATTEMPTS,
},
**{
# Configuration for Argo Events. Keep these in sync with the
Expand Down
3 changes: 3 additions & 0 deletions metaflow/plugins/kubernetes/kubernetes.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
SERVICE_HEADERS,
KUBERNETES_SECRETS,
SERVICE_INTERNAL_URL,
MAX_ATTEMPTS,
)
from metaflow.unbounded_foreach import UBF_CONTROL, UBF_TASK
from metaflow.metaflow_config_funcs import config_values
Expand Down Expand Up @@ -299,6 +300,7 @@ def create_jobset(
# assumes metadata is stored in DATASTORE_LOCAL_DIR on the Kubernetes
# pod; this happens when METAFLOW_DATASTORE_SYSROOT_LOCAL is NOT set (
# see get_datastore_root_from_config in datastore/local.py).
.environment_variable("METAFLOW_MAX_ATTEMPTS", MAX_ATTEMPTS)
)

for k in list(
Expand Down Expand Up @@ -602,6 +604,7 @@ def create_job_object(
# assumes metadata is stored in DATASTORE_LOCAL_DIR on the Kubernetes
# pod; this happens when METAFLOW_DATASTORE_SYSROOT_LOCAL is NOT set (
# see get_datastore_root_from_config in datastore/local.py).
.environment_variable("METAFLOW_MAX_ATTEMPTS", MAX_ATTEMPTS)
)

# Temporary passing of *some* environment variables. Do not rely on this
Expand Down