feat: custom log file behaviour #159

Merged: 39 commits from `feat/logfeatures` into `main` on Jan 8, 2025
Commits
f50794c
feat: added flags for intended logfile behaviour - no code, yet
cmeesters Oct 20, 2024
b64b3a9
fix: syntax errors and formatting
cmeesters Oct 21, 2024
a7d9df7
fix: working interface
cmeesters Oct 31, 2024
5133877
Merge branch 'main' into feat/logfeatures
cmeesters Nov 7, 2024
9c68936
fix: log prefix instead of logdir. - not working
cmeesters Nov 7, 2024
099292e
Merge branch 'main' into feat/logfeatures
cmeesters Nov 11, 2024
db4172a
feat: implementing all features as described
cmeesters Nov 18, 2024
0d6a200
fix: removed unnecessary pathlib import
cmeesters Nov 18, 2024
34adb4e
fix: lininting issues
cmeesters Nov 18, 2024
730433f
fix: using atexit to decouple the function from __del__, moved all co…
cmeesters Nov 19, 2024
490a7c3
fix: deleted unused import
cmeesters Nov 19, 2024
a9ff35c
fix: deleted unused import
cmeesters Nov 19, 2024
e567349
fix: linting issues
cmeesters Nov 19, 2024
4770c8b
fix: linting issues II
cmeesters Nov 19, 2024
9cc019b
feat: not rellying on '/home/$USER' any more, this is dangerous. Inst…
cmeesters Nov 19, 2024
bfd9cd6
fix: removed trailing whitespace
cmeesters Nov 19, 2024
d7e0e93
fix: using os.path.join for path concatenation, like it should be
cmeesters Nov 19, 2024
39cf201
Update snakemake_executor_plugin_slurm/__init__.py
cmeesters Nov 19, 2024
568080a
Merge branch 'feat/logfeatures' of github.com:snakemake/snakemake-exe…
cmeesters Nov 19, 2024
e727989
fix: formatting and linting
cmeesters Nov 19, 2024
51ae157
fix: moved cleanup code before __post_init__
cmeesters Nov 19, 2024
e43f108
fix: removed one more trailing whitespace
cmeesters Nov 19, 2024
88b6705
fix: those who want to keep all logs should be pleased
cmeesters Nov 19, 2024
009e216
docs: documenting the new feature
cmeesters Nov 19, 2024
8260f6b
fix: removed table of command line flags special to the executor - it…
cmeesters Dec 5, 2024
f750600
feat: same code - based on on the pathlib library
cmeesters Dec 9, 2024
b661dd8
Update snakemake_executor_plugin_slurm/utils.py
cmeesters Jan 2, 2025
aaad25d
fix: no multiline warnings
cmeesters Jan 2, 2025
6f74d18
fix: removed outcommented code
cmeesters Jan 2, 2025
63b4f59
fix: reordered such that functions follow 'post_init'
cmeesters Jan 2, 2025
829a889
fix: converted help strings to single line strings
cmeesters Jan 2, 2025
c9c0eed
fix: reverted to previous default of logging in workdir
cmeesters Jan 6, 2025
8a18089
fix: back to default SLURM logdir NOT being in HOME, all code now bas…
cmeesters Jan 7, 2025
1ef9e98
fix: removed (once more) the additional flags section
cmeesters Jan 7, 2025
9764842
feat: documentation on the new features
cmeesters Jan 7, 2025
59cf40e
Update snakemake_executor_plugin_slurm/utils.py
cmeesters Jan 7, 2025
c9c7b8e
fix: recursively deleting log subdirs
cmeesters Jan 8, 2025
739f878
Update snakemake_executor_plugin_slurm/__init__.py
johanneskoester Jan 8, 2025
d6f5567
Update snakemake_executor_plugin_slurm/__init__.py
johanneskoester Jan 8, 2025
39 changes: 13 additions & 26 deletions docs/further.md
@@ -104,22 +104,7 @@ A workflow rule may support several
[resource specifications](https://snakemake.readthedocs.io/en/latest/snakefiles/rules.html#resources).
For a SLURM cluster, a mapping between Snakemake and SLURM needs to be performed.

You can use the following specifications:

| SLURM | Snakemake | Description |
|----------------|------------|---------------------------------------|
| `--partition` | `slurm_partition` | the partition a rule/job is to use |
| `--time` | `runtime` | the walltime per job in minutes |
| `--constraint` | `constraint` | may hold features on some clusters |
| `--mem` | `mem`, `mem_mb` | memory a cluster node must provide (`mem`: string with unit), `mem_mb`: i |
| `--mem-per-cpu` | `mem_mb_per_cpu` | memory per reserved CPU |
| `--ntasks` | `tasks` | number of concurrent tasks / ranks |
| `--cpus-per-task` | `cpus_per_task` | number of cpus per task (in case of SMP, rather use `threads`) |
| `--nodes` | `nodes` | number of nodes |
| `--clusters` | `clusters` | comma separated string of clusters |

Each of these can be part of a rule, e.g.:
Each of the listed command line flags can be part of a rule, e.g.:

``` python
rule:
@@ -158,16 +143,6 @@ set-resources:
cpus_per_task: 40
```

#### Additional Command Line Flags

This plugin defines additional command line flags.
As always, these can be set on the command line or in a profile.

| Flag | Meaning |
|-------------|----------|
| `--slurm_init_seconds_before_status_checks`| modify time before initial job status check; the default of 40 seconds avoids load on querying slurm databases, but shorter wait times are for example useful during workflow development |
| `--slurm_requeue` | allows jobs to be resubmitted automatically if they fail or are preempted. See the [section "retries" for details](#retries)|

#### Multicluster Support

For scheduling reasons, multicluster support is provided by the `clusters` flag in resources sections. Note that you have to write `clusters`, not `cluster`!
@@ -279,6 +254,18 @@ export SNAKEMAKE_PROFILE="$HOME/.config/snakemake"

==This is ongoing development. Eventually you will be able to annotate different file access patterns.==

### Log Files - Getting Information on Failures

Snakemake, via this SLURM executor, submits itself as a job. This ensures that all of its features are preserved in the job context. SLURM requires a log file to be written for _every_ job; this log is redundant, as it only contains the Snakemake output that is already printed to the terminal. If a rule is equipped with a `log` directive, the SLURM log only contains Snakemake's own output.

This executor removes the SLURM logs of successful jobs immediately after they finish. You can change this behaviour with the flag `--slurm-keep-successful-logs`. A log file for a failed job is preserved for 10 days by default; you may change this value using the `--slurm-delete-logfiles-older-than` flag.
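
For example, to adjust these defaults, the corresponding settings can go into your profile. The keys follow the same spelling pattern as the command line flags (compare the `slurm-logdir` example below); the 30-day value is purely illustrative:

```YAML
# keep SLURM logs of failed jobs for 30 days instead of 10
slurm-delete-logfiles-older-than: 30
# uncomment to keep the logs of successful jobs as well
# slurm-keep-successful-logs: true
```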

The default location of the SLURM log files written by Snakemake is relative to the directory where the workflow is started, or relative to the directory indicated with `--directory`. These logs can be redirected using `--slurm-logdir`. If you want to avoid log files accumulating in different directories, you can store them in your home directory. In that case, it is best to put the parameter in your profile, e.g.:

```YAML
slurm-logdir: "/home/<username>/.snakemake/slurm_logs"
```

### Retries - Or Trying again when a Job failed

Some cluster jobs may fail. In this case, Snakemake can be instructed to attempt another submission before the entire workflow fails, in this example up to 3 times:
112 changes: 95 additions & 17 deletions snakemake_executor_plugin_slurm/__init__.py
@@ -3,9 +3,11 @@
__email__ = "[email protected]"
__license__ = "MIT"

import atexit
import csv
from io import StringIO
import os
from pathlib import Path
import re
import shlex
import subprocess
@@ -26,30 +28,59 @@
from snakemake_interface_common.exceptions import WorkflowError
from snakemake_executor_plugin_slurm_jobstep import get_cpus_per_task

from .utils import delete_slurm_environment
from .utils import delete_slurm_environment, delete_empty_dirs


@dataclass
class ExecutorSettings(ExecutorSettingsBase):
logdir: Optional[Path] = field(
default=None,
metadata={
"help": "Per default the SLURM log directory is relative to "
"the working directory."
"This flag allows to set an alternative directory.",
"env_var": False,
"required": False,
},
)
keep_successful_logs: bool = field(
default=False,
metadata={
"help": "Per default SLURM log files will be deleted upon sucessful "
"completion of a job. Whenever a SLURM job fails, its log "
"file will be preserved. "
"This flag allows to keep all SLURM log files, even those "
"of successful jobs.",
"env_var": False,
"required": False,
},
)
delete_logfiles_older_than: Optional[int] = field(
default=10,
metadata={
"help": "Per default SLURM log files in the SLURM log directory "
"of a workflow will be deleted after 10 days. For this, "
"best leave the default log directory unaltered. "
"Setting this flag allows to change this behaviour. "
"If set to <=0, no old files will be deleted. ",
},
)
init_seconds_before_status_checks: Optional[int] = field(
default=40,
metadata={
"help": """
Defines the time in seconds before the first status
check is performed after job submission.
""",
"help": "Defines the time in seconds before the first status "
"check is performed after job submission.",
"env_var": False,
"required": False,
},
)
requeue: bool = field(
default=False,
metadata={
"help": """
Allow requeuing preempted of failed jobs,
if no cluster default. Results in `sbatch ... --requeue ...`
This flag has no effect, if not set.
""",
"help": "Allow requeuing preempted of failed jobs, "
"if no cluster default. Results in "
"`sbatch ... --requeue ...` "
"This flag has no effect, if not set.",
"env_var": False,
"required": False,
},
@@ -91,6 +122,32 @@ def __post_init__(self):
self._fallback_account_arg = None
self._fallback_partition = None
self._preemption_warning = False # no preemption warning has been issued
self.slurm_logdir = None
atexit.register(self.clean_old_logs)

def clean_old_logs(self) -> None:
"""Delete files older than specified age from the SLURM log directory."""
# shorthands:
age_cutoff = self.workflow.executor_settings.delete_logfiles_older_than
keep_all = self.workflow.executor_settings.keep_successful_logs
if age_cutoff <= 0 or keep_all or self.slurm_logdir is None:
return
cutoff_secs = age_cutoff * 86400
current_time = time.time()
self.logger.info(f"Cleaning up log files older than {age_cutoff} day(s)")
for path in self.slurm_logdir.rglob("*.log"):
if path.is_file():
try:
file_age = current_time - path.stat().st_mtime
if file_age > cutoff_secs:
path.unlink()
except (OSError, FileNotFoundError) as e:
self.logger.warning(f"Could not delete logfile {path}: {e}")
# we need a 2nd iteration to remove putatively empty directories
try:
delete_empty_dirs(self.slurm_logdir)
except (OSError, FileNotFoundError) as e:
self.logger.warning(f"Could not delete empty directory {path}: {e}")

def warn_on_jobcontext(self, done=None):
if not done:
@@ -123,18 +180,22 @@ def run_job(self, job: JobExecutorInterface):
except AttributeError:
wildcard_str = ""

slurm_logfile = os.path.abspath(
f".snakemake/slurm_logs/{group_or_rule}/{wildcard_str}/%j.log"
self.slurm_logdir = (
Path(self.workflow.executor_settings.logdir)
if self.workflow.executor_settings.logdir
else Path(".snakemake/slurm_logs").resolve()
)
logdir = os.path.dirname(slurm_logfile)

self.slurm_logdir.mkdir(parents=True, exist_ok=True)
slurm_logfile = self.slurm_logdir / group_or_rule / wildcard_str / "%j.log"
slurm_logfile.parent.mkdir(parents=True, exist_ok=True)
# this behavior has been fixed in slurm 23.02, but there might be plenty of
# older versions around, hence we should rather be conservative here.
assert "%j" not in logdir, (
assert "%j" not in str(self.slurm_logdir), (
"bug: jobid placeholder in parent dir of logfile. This does not work as "
"we have to create that dir before submission in order to make sbatch "
"happy. Otherwise we get silent fails without logfiles being created."
)
os.makedirs(logdir, exist_ok=True)

# generic part of a submission string:
# we use a run_uuid as the job-name, to allow `--name`-based
@@ -247,7 +308,9 @@ def run_job(self, job: JobExecutorInterface):
slurm_jobid = out.strip().split(";")[0]
if not slurm_jobid:
raise WorkflowError("Failed to retrieve SLURM job ID from sbatch output.")
slurm_logfile = slurm_logfile.replace("%j", slurm_jobid)
slurm_logfile = slurm_logfile.with_name(
slurm_logfile.name.replace("%j", slurm_jobid)
)
self.logger.info(
f"Job {job.jobid} has been submitted with SLURM jobid {slurm_jobid} "
f"(log: {slurm_logfile})."
@@ -380,6 +443,19 @@ async def check_active_jobs(
self.report_job_success(j)
any_finished = True
active_jobs_seen_by_sacct.remove(j.external_jobid)
if not self.workflow.executor_settings.keep_successful_logs:
self.logger.debug(
"removing log for successful job "
f"with SLURM ID '{j.external_jobid}'"
)
try:
if j.aux["slurm_logfile"].exists():
j.aux["slurm_logfile"].unlink()
except (OSError, FileNotFoundError) as e:
self.logger.warning(
"Could not remove log file"
f" {j.aux['slurm_logfile']}: {e}"
)
elif status == "PREEMPTED" and not self._preemption_warning:
self._preemption_warning = True
self.logger.warning(
@@ -404,7 +480,9 @@ async def check_active_jobs(
# with a new sentence
f"'{status}'. "
)
self.report_job_error(j, msg=msg, aux_logs=[j.aux["slurm_logfile"]])
self.report_job_error(
j, msg=msg, aux_logs=[str(j.aux["slurm_logfile"])]
)
active_jobs_seen_by_sacct.remove(j.external_jobid)
else: # still running?
yield j
26 changes: 26 additions & 0 deletions snakemake_executor_plugin_slurm/utils.py
@@ -1,6 +1,7 @@
# utility functions for the SLURM executor plugin

import os
from pathlib import Path


def delete_slurm_environment():
@@ -14,3 +15,28 @@ def delete_slurm_environment():
for var in os.environ:
if var.startswith("SLURM_"):
del os.environ[var]


def delete_empty_dirs(path: Path) -> None:
"""
Function to delete all empty directories in a given path.
This is needed to clean up the working directory after
a job has sucessfully finished. This function is needed because
the shutil.rmtree() function does not delete empty
directories.
"""
if not path.is_dir():
return

# Process subdirectories first (bottom-up)
for child in path.iterdir():
if child.is_dir():
delete_empty_dirs(child)

try:
# Check if directory is now empty after processing children
if not any(path.iterdir()):
path.rmdir()
except (OSError, FileNotFoundError) as e:
# Provide more context in the error message
raise OSError(f"Failed to remove empty directory {path}: {e}") from e