Skip to content

Commit

Permalink
- renamed internal psij functions to have a _psij_ prefix in order …
Browse files Browse the repository at this point in the history
…to avoid

clashes with other things that might be defined in the environment.
- use `StageOutFlags` to express when cleanup happens rather than a boolean
flag
- fixed equality operator in `JobSpec`
- added `__str__`, `__eq__` and `__hash__` methods for staging objects
  • Loading branch information
hategan committed Mar 12, 2024
1 parent 30d0d34 commit 2190c87
Show file tree
Hide file tree
Showing 14 changed files with 121 additions and 69 deletions.
2 changes: 1 addition & 1 deletion src/psij/executors/batch/cobalt/cobalt.mustache
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ PSIJ_NODEFILE="$COBALT_NODEFILE"
export PSIJ_NODEFILE

{{> stagein}}
update_status ACTIVE
_psij_update_status ACTIVE

{{#psij.launch_command}}{{.}} {{/psij.launch_command}}
_PSIJ_JOB_EC=$?
Expand Down
98 changes: 48 additions & 50 deletions src/psij/executors/batch/common/batch_lib.mustache
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
update_status() {
_psij_update_status() {
STATUS="$1"
ADDRS={{psij.us_addrs}}
Expand All @@ -7,55 +7,55 @@ update_status() {
done
}

fail() {
[ "{{psij.debug}}" != "0" ] && update_status "LOG Failing: $2"
_psij_fail() {
[ "{{psij.debug}}" != "0" ] && _psij_update_status "LOG Failing: $2"
echo $2
exit $1
}

check_remote() {
_psij_check_remote() {
SCHEME="$1"
HOSTPORT="$2"
if [ "$SCHEME" != "" ] && [ "$SCHEME" != "file" ]; then
fail 121 "$SCHEME staging is not supported"
_psij_fail 121 "$SCHEME staging is not supported"
fi
if [ "$HOSTPORT" != "" ] && [ "$HOSTPORT" != "localhost" ]; then
fail 121 "The host, if specified, must be \"localhost\". Got \"$HOSTPORT\"."
_psij_fail 121 "The host, if specified, must be \"localhost\". Got \"$HOSTPORT\"."
fi
}

do_stagein() {
_psij_do_stagein() {
SOURCE="$1"
TARGET="$2"
MODE="$3"
SCHEME="$6"
HOSTPORT="$7"
check_remote "$SCHEME" "$HOSTPORT" || exit $?
_psij_check_remote "$SCHEME" "$HOSTPORT" || exit $?
do_stage "$SOURCE" "$TARGET" "$MODE" 0
_psij_do_stage "$SOURCE" "$TARGET" "$MODE" 0
}

do_stage() {
_psij_do_stage() {
SOURCE="$1"
TARGET="$2"
MODE="$3"
MISSING_OK="$4"
[ "{{psij.debug}}" != "0" ] && update_status "LOG Stage $SOURCE -> $TARGET, mode: $MODE, missingok: $MISSING_OK"
[ "{{psij.debug}}" != "0" ] && _psij_update_status "LOG Stage $SOURCE -> $TARGET, mode: $MODE, missingok: $MISSING_OK"
if [ ! -e "$SOURCE" ]; then
if [ "$MISSING_OK" == "0" ]; then
[ "{{psij.debug}}" != "0" ] && update_status "LOG Missing source file: $SOURCE"
fail 121 "Missing source file: $SOURCE"
[ "{{psij.debug}}" != "0" ] && _psij_update_status "LOG Missing source file: $SOURCE"
_psij_fail 121 "Missing source file: $SOURCE"
else
[ "{{psij.debug}}" != "0" ] && update_status "LOG Skipping staging of missing file $SOURCE"
[ "{{psij.debug}}" != "0" ] && _psij_update_status "LOG Skipping staging of missing file $SOURCE"
return 0
fi
fi
[ "{{psij.debug}}" != "0" ] && update_status "LOG Staging $SOURCE to $TARGET"
[ "{{psij.debug}}" != "0" ] && _psij_update_status "LOG Staging $SOURCE to $TARGET"
TARGET_DIR=`dirname "$TARGET"`
Expand All @@ -64,22 +64,22 @@ do_stage() {
fi
if [ -d "$TARGET" ] && [ ! -d "$SOURCE" ]; then
fail 121 "Target is a directory: $TARGET"
_psij_fail 121 "Target is a directory: $TARGET"
fi
if [ "$MODE" == "1" ]; then
# copy
cp -r -T "$SOURCE" "$TARGET" || fail 121 "Failed to copy \"$SOURCE\" to \"$TARGET\""
cp -r -T "$SOURCE" "$TARGET" || _psij_fail 121 "Failed to copy \"$SOURCE\" to \"$TARGET\""
elif [ "$MODE" == "2" ]; then
# link
{{!we want the same semantics as cp and mv, which is "overwrite if exists"}}
{{!we resolve the source since it may be a path relative to the job dir}}
rm -f "$TARGET"
SOURCE=`readlink -m $SOURCE`
ln -s "$SOURCE" "$TARGET" || fail 121 "Failed to link \"$SOURCE\" to \"$TARGET\""
ln -s "$SOURCE" "$TARGET" || _psij_fail 121 "Failed to link \"$SOURCE\" to \"$TARGET\""
elif [ "$MODE" == "3" ]; then
# move
mv -T -f "$SOURCE" "$TARGET" || fail 121 "Failed to move \"$SOURCE\" to \"$TARGET\""
mv -T -f "$SOURCE" "$TARGET" || _psij_fail 121 "Failed to move \"$SOURCE\" to \"$TARGET\""
fi
}

Expand All @@ -88,7 +88,7 @@ _FLAG_ON_SUCCESS=2
_FLAG_ON_ERROR=4
_FLAG_ON_CANCEL=8

do_stageout() {
_psij_do_stageout() {
SOURCE="$1"
TARGET="$2"
MODE="$3"
Expand All @@ -97,44 +97,42 @@ do_stageout() {
SCHEME="$6"
HOSTPORT="$7"
check_remote "$SCHEME" "$HOSTPORT"
_psij_check_remote "$SCHEME" "$HOSTPORT"
[ "{{psij.debug}}" != "0" ] && update_status "LOG do_stageout $SOURCE -> $TARGET, mode: $MODE, flags: $FLAGS, failed: $FAILED"
[ "{{psij.debug}}" != "0" ] && _psij_update_status "LOG do_stageout $SOURCE -> $TARGET, mode: $MODE, flags: $FLAGS, failed: $FAILED"
if [ "$FAILED" == "0" ] && [ "$((FLAGS & _FLAG_ON_SUCCESS))" != "0" ]; then
do_stage "$SOURCE" "$TARGET" "$MODE" $((FLAGS & _FLAG_IF_PRESENT))
elif [ "$FAILED" != "0" ] && [ "$((FLAGS & _FLAG_ON_ERROR))" != "0" ]; then
do_stage "$SOURCE" "$TARGET" "$MODE" $((FLAGS & _FLAG_IF_PRESENT))
if [ "$FAILED" == "0" ] && [ "$((FLAGS & _FLAG_ON_SUCCESS))" == "0" ]; then
return 0
fi
if [ "$FAILED" != "0" ] && [ "$((FLAGS & _FLAG_ON_ERROR))" == "0" ]; then
return 0
fi
_psij_do_stage "$SOURCE" "$TARGET" "$MODE" $((FLAGS & _FLAG_IF_PRESENT))
}

do_cleanup() {
_psij_do_cleanup() {
TARGET="$1"
FAILED="$2"
if [ "$FAILED" == "0" ] || [ "{{job.spec.cleanup_on_failure}}" != "0" ]; then
TARGET=`readlink -m "$TARGET"`
DIR=`readlink -m "{{job.spec.directory}}"`
[ "{{psij.debug}}" != "0" ] && update_status "LOG Cleaning up $TARGET"
FLAGS="$2"
FAILED="$3"
case "$TARGET" in
"$DIR"*)
rm -rf "$TARGET"
;;
*)
fail 121 "Cannot clean $TARGET outside of job directory $DIR"
;;
esac
if [ "$FAILED" == "0" ] && [ "$((FLAGS & _FLAG_ON_SUCCESS))" == "0" ]; then
return 0
fi
}
if [ "$FAILED" != "0" ] && [ "$((FLAGS & _FLAG_ON_ERROR))" == "0" ]; then
return 0
fi
TARGET=`readlink -m "$TARGET"`
DIR=`readlink -m "{{job.spec.directory}}"`
stagein() {
update_status STAGE_IN
[ "{{psij.debug}}" != "0" ] && _psij_update_status "LOG Cleaning up $TARGET"
{{#job.spec.stage_in}}
do_stagein "{{source.path}}" "{{target}}" {{mode}} \
"{{{source.scheme}}}" "{{#source.hostname}}{{{.}}}{{#source.port}}:{{{.}}}{{/source.port}}{{/source.hostname}}"
{{/job.spec.stage_in}}
case "$TARGET" in
"$DIR"*)
rm -rf "$TARGET"
;;
*)
_psij_fail 121 "Cannot clean $TARGET outside of job directory $DIR"
;;
esac
}
4 changes: 2 additions & 2 deletions src/psij/executors/batch/common/cleanup.mustache
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
update_status CLEANUP
_psij_update_status CLEANUP

{{#job.spec.cleanup}}
do_cleanup {{.}} $_PSIJ_JOB_EC
_psij_do_cleanup {{.}} {{job.spec.cleanup_flags}} $_PSIJ_JOB_EC
{{/job.spec.cleanup}}
4 changes: 2 additions & 2 deletions src/psij/executors/batch/common/stagein.mustache
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
update_status STAGE_IN
_psij_update_status STAGE_IN
{{#job.spec.stage_in}}
do_stagein "{{source.path}}" "{{target}}" {{mode}} \
_psij_do_stagein "{{source.path}}" "{{target}}" {{mode}} \
"{{{source.scheme}}}" "{{#source.hostname}}{{{.}}}{{#source.port}}:{{{.}}}{{/source.port}}{{/source.hostname}}"
{{/job.spec.stage_in}}
4 changes: 2 additions & 2 deletions src/psij/executors/batch/common/stageout.mustache
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
update_status STAGE_OUT
_psij_update_status STAGE_OUT
{{#job.spec.stage_out}}
do_stageout "{{source}}" "{{target.path}}" {{mode}} {{flags}} $_PSIJ_JOB_EC \
_psij_do_stageout "{{source}}" "{{target.path}}" {{mode}} {{flags}} $_PSIJ_JOB_EC \
"{{{target.scheme}}}" "{{#target.hostname}}{{{.}}}{{#target.port}}:{{{.}}}{{/target.port}}{{/target.hostname}}"
{{/job.spec.stage_out}}
2 changes: 1 addition & 1 deletion src/psij/executors/batch/lsf/lsf.mustache
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ PSIJ_NODEFILE="$LSB_HOSTS"
export PSIJ_NODEFILE

{{> stagein}}
update_status ACTIVE
_psij_update_status ACTIVE

{{#psij.launch_command}}{{.}} {{/psij.launch_command}}
_PSIJ_JOB_EC=$?
Expand Down
2 changes: 1 addition & 1 deletion src/psij/executors/batch/pbs/pbs_classic.mustache
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ cd "{{.}}"
{{/job.spec.directory}}

{{> stagein}}
update_status ACTIVE
_psij_update_status ACTIVE

{{#psij.launch_command}}{{.}} {{/psij.launch_command}}
_PSIJ_JOB_EC=$?
Expand Down
2 changes: 1 addition & 1 deletion src/psij/executors/batch/pbs/pbspro.mustache
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ cd "{{.}}"
{{/job.spec.directory}}

{{> stagein}}
update_status ACTIVE
_psij_update_status ACTIVE

{{#psij.launch_command}}{{.}} {{/psij.launch_command}}
_PSIJ_JOB_EC=$?
Expand Down
2 changes: 1 addition & 1 deletion src/psij/executors/batch/slurm/slurm.mustache
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ fi
export PSIJ_NODEFILE

{{> stagein}}
update_status ACTIVE
_psij_update_status ACTIVE

{{#psij.launch_command}}{{.}} {{/psij.launch_command}}
_PSIJ_JOB_EC=$?
Expand Down
2 changes: 1 addition & 1 deletion src/psij/executors/local/local.mustache
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ cd "{{.}}"
{{/job.spec.directory}}

{{> stagein}}
update_status ACTIVE
_psij_update_status ACTIVE

set +e
{{#job.spec.inherit_environment}}env \{{/job.spec.inherit_environment}}{{^job.spec.inherit_environment}}env --ignore-environment \{{/job.spec.inherit_environment}}{{#env}}
Expand Down
13 changes: 9 additions & 4 deletions src/psij/job_spec.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

import psij.resource_spec
import psij.job_attributes
from psij.staging import StageIn, StageOut
from psij.staging import StageIn, StageOut, StageOutFlags


def _to_path(arg: Union[str, pathlib.Path, None]) -> Optional[pathlib.Path]:
Expand Down Expand Up @@ -70,7 +70,7 @@ def __init__(self, executable: Optional[str] = None, arguments: Optional[List[st
stage_in: Optional[Set[StageIn]] = None,
stage_out: Optional[Set[StageOut]] = None,
cleanup: Optional[Set[Union[str, pathlib.Path]]] = None,
cleanup_on_failure: bool = True):
cleanup_flags: StageOutFlags = StageOutFlags.ALWAYS):
"""
:param executable: An executable, such as "/bin/date".
:param arguments: The argument list to be passed to the executable. Unlike with execve(),
Expand Down Expand Up @@ -107,6 +107,10 @@ def __init__(self, executable: Optional[str] = None, arguments: Optional[List[st
:param stage_in: Specifies a set of files to be staged in before the job is launched.
:param stage_out: Specifies a set of files to be staged out after the job terminates.
:param cleanup: Specifies a set of files to remove after the stage out process.
:param cleanup_flags: Specifies the conditions under which the files in `cleanup` should
be removed, such as when the job completes successfully. The flag
`StageOutFlags.IF_PRESENT` is ignored and no error condition is triggered if a file
specified by the `cleanup` argument is not present.
All constructor parameters are accessible as properties.
Expand Down Expand Up @@ -171,7 +175,7 @@ def __init__(self, executable: Optional[str] = None, arguments: Optional[List[st
self.stage_in = stage_in
self.stage_out = stage_out
self._cleanup = _all_to_path(cleanup)
self.cleanup_on_failure = cleanup_on_failure
self.cleanup_flags = cleanup_flags

# TODO: `resources` is of type `ResourceSpec`, not `ResourceSpecV1`. An
# connector trying to access `job.spec.resources.process_count`
Expand Down Expand Up @@ -284,7 +288,8 @@ def __eq__(self, o: object) -> bool:

for prop_name in ['name', 'executable', 'arguments', 'directory', 'inherit_environment',
'environment', 'stdin_path', 'stdout_path', 'stderr_path', 'resources',
'attributes', 'pre_launch', 'post_launch', 'launcher']:
'attributes', 'pre_launch', 'post_launch', 'launcher', 'stage_in',
'stage_out', 'cleanup', 'cleanup_flags']:
if getattr(self, prop_name) != getattr(o, prop_name):
return False

Expand Down
51 changes: 50 additions & 1 deletion src/psij/staging.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,9 +112,25 @@ def password(self) -> Optional[str]:
return self.parts.password

Check warning on line 112 in src/psij/staging.py

View check run for this annotation

Codecov / codecov/patch

src/psij/staging.py#L112

Added line #L112 was not covered by tests

def __str__(self) -> str:
"""Returns a string representation of this URL."""
"""Returns a string representation of this URI."""
return self.parts.geturl()

Check warning on line 116 in src/psij/staging.py

View check run for this annotation

Codecov / codecov/patch

src/psij/staging.py#L116

Added line #L116 was not covered by tests

def __eq__(self, other: object) -> bool:
"""
Tests if the parameter `other` is equal to this `URI`.
Returns `True` if `other` is a `URI` and if it represents the same
resource as this `URI`.
"""
if isinstance(other, URI):
return self.parts == other.parts

Check warning on line 126 in src/psij/staging.py

View check run for this annotation

Codecov / codecov/patch

src/psij/staging.py#L125-L126

Added lines #L125 - L126 were not covered by tests
else:
return False

Check warning on line 128 in src/psij/staging.py

View check run for this annotation

Codecov / codecov/patch

src/psij/staging.py#L128

Added line #L128 was not covered by tests

def __hash__(self) -> int:
"""Computes a hash of this object."""
return hash(self.parts)


class StagingMode(Enum):
"""
Expand Down Expand Up @@ -223,6 +239,22 @@ def __init__(self, source: Union[URI, Path, str], target: Union[str, Path],
self.target = target
self.mode = mode

def __str__(self) -> str:
"""Returns a string representation of this object."""
return 'StageIn[%s -> %s, %s]' % (self.source, self.target, self.mode)

Check warning on line 244 in src/psij/staging.py

View check run for this annotation

Codecov / codecov/patch

src/psij/staging.py#L244

Added line #L244 was not covered by tests

def __eq__(self, other: object) -> bool:
"""Compares `other` to this object."""
if isinstance(other, StageIn):
return (self.source == other.source and self.target == other.target

Check warning on line 249 in src/psij/staging.py

View check run for this annotation

Codecov / codecov/patch

src/psij/staging.py#L248-L249

Added lines #L248 - L249 were not covered by tests
and self.mode == other.mode)
else:
return False

Check warning on line 252 in src/psij/staging.py

View check run for this annotation

Codecov / codecov/patch

src/psij/staging.py#L252

Added line #L252 was not covered by tests

def __hash__(self) -> int:
"""Computes a hash of this object."""
return (hash(self.source) << 16) + (hash(self.target) << 8) + hash(self.mode)


def _normalize_flags(flags: StageOutFlags) -> StageOutFlags:
if (flags & StageOutFlags.ALWAYS).value == 0:
Expand Down Expand Up @@ -285,3 +317,20 @@ def flags(self) -> StageOutFlags:
@flags.setter
def flags(self, flags: StageOutFlags) -> None:
self._flags = _normalize_flags(flags)

def __str__(self) -> str:
"""Returns a string representation of this object."""
return 'StageOut[%s -> %s, %s, %s]' % (self.source, self.target, self.flags, self.mode)

Check warning on line 323 in src/psij/staging.py

View check run for this annotation

Codecov / codecov/patch

src/psij/staging.py#L323

Added line #L323 was not covered by tests

def __eq__(self, other: object) -> bool:
"""Compares `other` to this object."""
if isinstance(other, StageOut):
return (self.source == other.source and self.target == other.target

Check warning on line 328 in src/psij/staging.py

View check run for this annotation

Codecov / codecov/patch

src/psij/staging.py#L327-L328

Added lines #L327 - L328 were not covered by tests
and self.mode == other.mode and self.flags == other.flags)
else:
return False

Check warning on line 331 in src/psij/staging.py

View check run for this annotation

Codecov / codecov/patch

src/psij/staging.py#L331

Added line #L331 was not covered by tests

def __hash__(self) -> int:
"""Computes a hash of this object."""
return ((hash(self.source) << 24) + (hash(self.target) << 16) + (hash(self.mode) << 8)
+ hash(self.flags))
2 changes: 1 addition & 1 deletion tests/plugins1/_batch_test/test/test.mustache
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ done
export PSIJ_NODEFILE

{{> stagein}}
update_status ACTIVE
_psij_update_status ACTIVE

{{#job.spec.inherit_environment}}env \{{/job.spec.inherit_environment}}{{^job.spec.inherit_environment}}env --ignore-environment \{{/job.spec.inherit_environment}}{{#env}}
{{name}}="{{value}}" \{{/env}}
Expand Down
2 changes: 1 addition & 1 deletion tests/test_staging.py
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ def test_cleanup2(execparams: ExecutorTestParams) -> None:
StageOut('out.txt', out_path, flags=StageOutFlags.IF_PRESENT),
}
job.spec.cleanup = {Path('out.txt')}
job.spec.cleanup_on_failure = False
job.spec.cleanup_flags = StageOutFlags.ON_SUCCESS
ex = _get_executor_instance(execparams, job)
ex.submit(job)
status = job.wait(timeout=_get_timeout(execparams))
Expand Down

0 comments on commit 2190c87

Please sign in to comment.