Skip to content

Commit

Permalink
Merge pull request #509 from ExaWorks/mpirun_tag_output
Browse files Browse the repository at this point in the history
Better extraction of stdout/stderr streams from mpirun
  • Loading branch information
hategan authored Feb 27, 2025
2 parents 0c14103 + 9504745 commit 06120fc
Show file tree
Hide file tree
Showing 4 changed files with 47 additions and 6 deletions.
5 changes: 4 additions & 1 deletion src/psij/launchers/script_based_launcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -204,4 +204,7 @@ def is_launcher_failure(self, output: str) -> bool:

# NOTE(review): this listing is a rendered diff with the +/- markers stripped.
# The file header above this hunk reports "4 additions & 1 deletion", which
# matches the first return below being the removed line and the comment plus
# `return output` being its replacement -- confirm against the repository.
def get_launcher_failure_message(self, output: str) -> str:
"""See :func:`~psij.Launcher.get_launcher_failure_message`."""
# Returns the output with its last two newline-separated pieces dropped.
return '\n'.join(output.split('\n')[:-2])
# If, according to the above, it is a launcher failure, then
# the magic line should not be present (i.e., all of the output
# is the failure).
return output
23 changes: 20 additions & 3 deletions src/psij/launchers/scripts/mpi_launch.sh
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,29 @@ fi

pre_launch

# Filter stdin, keeping only lines that begin with a bracketed tag followed
# by "<stdout>:" and printing what comes after that prefix. Because of -n,
# lines that do not match are not printed at all.
# NOTE(review): presumably the prefix is the one mpirun adds when run with
# --tag-output -- confirm against the Open MPI documentation.
filter_out() {
sed -nE 's/^\[[^]]+\]<stdout>:(.*)/\1/p'
}

# Filter stdin, keeping only lines that begin with a bracketed tag followed
# by "<stderr>:" and printing what comes after that prefix. Because of -n,
# lines that do not match are not printed at all.
# NOTE(review): presumably the prefix is the one mpirun adds when run with
# --tag-output -- confirm against the Open MPI documentation.
filter_err() {
sed -nE 's/^\[[^]]+\]<stderr>:(.*)/\1/p'
}

# Filter stdin, keeping only lines that begin with a bracketed tag followed
# by "<stdout>: " (colon plus one space) and printing what comes after that
# prefix. Because of -n, lines that do not match are not printed at all.
# NOTE(review): presumably this is the prefix format produced by
# `mpirun --output TAG` in Open MPI 5 -- confirm against its documentation.
filter_out_5() {
sed -nE 's/^\[[^]]+\]<stdout>: (.*)/\1/p'
}

# Filter stdin, keeping only lines that begin with a bracketed tag followed
# by "<stderr>: " (colon plus one space) and printing what comes after that
# prefix. Because of -n, lines that do not match are not printed at all.
# NOTE(review): presumably this is the prefix format produced by
# `mpirun --output TAG` in Open MPI 5 -- confirm against its documentation.
filter_err_5() {
sed -nE 's/^\[[^]]+\]<stderr>: (.*)/\1/p'
}

# Launch the job through mpirun, choosing the command line by MPI flavor.
# NOTE(review): this listing is a rendered diff with the +/- markers stripped.
# In the first two branches the single-line mpirun command looks like the
# pre-change line and the two-line command after it like its replacement --
# confirm against the repository before treating both as live code.

# Turn off exit-on-error so that a failing mpirun does not abort the script.
# NOTE(review): presumably the exit status is read right after this block,
# which is outside the visible hunk -- verify.
set +e
if [ "$IS_OPENMPI_5" == "1" ]; then
# there is no -q parameter in OMPI 5
# Plain redirection of the job streams to the requested files.
mpirun --oversubscribe -n $_PSI_J_PROCESS_COUNT "$@" 1>$_PSI_J_STDOUT 2>$_PSI_J_STDERR <$_PSI_J_STDIN
# Tagged variant: stdout and stderr each go through a process substitution
# that runs the matching *_5 filter before writing the target file.
mpirun --oversubscribe --output TAG -n $_PSI_J_PROCESS_COUNT "$@" \
1> >(filter_out_5 > $_PSI_J_STDOUT) 2> >(filter_err_5 > $_PSI_J_STDERR) <$_PSI_J_STDIN
elif [ "$IS_OPENMPI" == "1" ]; then
# Plain redirection of the job streams to the requested files.
mpirun --oversubscribe -q -n $_PSI_J_PROCESS_COUNT "$@" 1>$_PSI_J_STDOUT 2>$_PSI_J_STDERR <$_PSI_J_STDIN
# Tagged variant: same idea as above, using --tag-output and the filters
# that expect no space after the "<stdout>:" / "<stderr>:" prefix.
mpirun --oversubscribe --tag-output -q -n $_PSI_J_PROCESS_COUNT "$@" \
1> >(filter_out > "$_PSI_J_STDOUT") 2> >(filter_err > $_PSI_J_STDERR) <$_PSI_J_STDIN
else
# Any other MPI: no tagging and no filtering, streams are redirected directly.
mpirun -n $_PSI_J_PROCESS_COUNT "$@" 1>$_PSI_J_STDOUT 2>$_PSI_J_STDERR <$_PSI_J_STDIN
fi
Expand Down
9 changes: 7 additions & 2 deletions tests/_test_tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,13 @@ def _read_file(path: Optional[Path]) -> str:
if path is None:
return ''

with open(path, 'r') as f:
return f.read()
try:
with open(path, 'r') as f:
return f.read()
except FileNotFoundError:
return '<missing>'
except Exception as ex:
return f'<error: {ex}>'


def assert_completed(job: Job, status: Optional[JobStatus], attached: bool = False) -> None:
Expand Down
16 changes: 16 additions & 0 deletions tests/test_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,22 @@ def test_simple_job_redirect(execparams: ExecutorTestParams) -> None:
assert contents == '_x_'


def test_stderr_redirect(execparams: ExecutorTestParams) -> None:
    """Check that a job's standard error ends up in ``stderr_path``.

    The job runs ``echo -n _x_ 1>&2`` under ``/bin/bash``, so the file passed
    as ``stderr_path`` is expected to contain exactly ``_x_``.
    """
    _make_test_dir()
    with TemporaryDirectory(dir=Path.home() / '.psij' / 'test') as td:
        outp = Path(td, 'stderr.txt')
        job = Job(JobSpec(executable='/bin/bash', arguments=['-c', 'echo -n _x_ 1>&2'],
                          stderr_path=outp))
        ex = _get_executor_instance(execparams, job)
        ex.submit(job)
        status = job.wait(timeout=_get_timeout(execparams))
        assert_completed(job, status)
        # Context manager so the handle is closed even if the read raises.
        with outp.open("r") as f:
            contents = f.read()
        assert contents == '_x_'


def test_attach(execparams: ExecutorTestParams) -> None:
job1 = Job(JobSpec(executable='/bin/sleep', arguments=['1']))
ex = _get_executor_instance(execparams, job1)
Expand Down

0 comments on commit 06120fc

Please sign in to comment.