Skip to content

Commit

Permalink
Github Action for GlobusComputeExecutor (#3619)
Browse files Browse the repository at this point in the history
* Support for testing GlobusComputeExecutor in a github action
* Adding shared_fs and staging_required tags to tests
* Adding GlobusComputeExecutor test config
  • Loading branch information
yadudoc committed Oct 17, 2024
1 parent 920319a commit a3cad96
Show file tree
Hide file tree
Showing 26 changed files with 191 additions and 15 deletions.
112 changes: 112 additions & 0 deletions .github/workflows/gce_test.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
name: GlobusComputeExecutor tests

on:
pull_request:
types:
- opened
- synchronize

workflow_dispatch:
inputs:
tags:
description: 'Test scenario tags'
required: false
type: boolean

jobs:
main-test-suite:
strategy:
matrix:
python-version: ["3.11"]
runs-on: ubuntu-20.04
timeout-minutes: 60

steps:
- uses: actions/checkout@master

- name: Set up Python ${{ matrix.python-version }}
uses: actions/setup-python@v4
with:
python-version: ${{ matrix.python-version }}

- name: Collect Job Information
id: job-info
run: |
echo "Python Version: ${{ matrix.python-version }}" >> ci_job_info.txt
echo "CI Triggering Event: ${{ github.event_name }}" >> ci_job_info.txt
echo "Triggering Git Ref: ${{ github.ref }}" >> ci_job_info.txt
echo "Triggering Git SHA: ${{ github.sha }}" >> ci_job_info.txt
echo "Workflow Run: ${{ github.run_number }}" >> ci_job_info.txt
echo "Workflow Attempt: ${{ github.run_attempt }}" >> ci_job_info.txt
as_ascii="$(echo "${{ github.ref_name }}" | perl -pe "s/[^A-z0-9-]+/-/g; s/^-+|-+\$//g; s/--+/-/g;")"
echo "as-ascii=$as_ascii" >> $GITHUB_OUTPUT
- name: Non-requirements based install
run: |
# libpython3.5: make workqueue binary installer happy
# mpich: required by radical executor
sudo apt-get update -q
sudo apt-get install -qy libpython3.5 mpich
- name: setup virtual env
run: |
make virtualenv
source .venv/bin/activate
- name: make deps clean_coverage
run: |
source .venv/bin/activate
make deps
make clean_coverage
# Installing parsl into venv required for GCendpoint
pip3 install .
# Temporary fix, until changes make it into compute releases
git clone -b configure_tasks_working_dir https://github.com/globus/globus-compute.git
pip3 install globus-compute/compute_sdk globus-compute/compute_endpoint
- name: start globus_compute_endpoint
env:
GLOBUS_COMPUTE_CLIENT_ID: ${{ secrets.GLOBUS_COMPUTE_CLIENT_ID }}
GLOBUS_COMPUTE_CLIENT_SECRET: ${{ secrets.GLOBUS_COMPUTE_SECRET_KEY }}
run: |
source /home/runner/work/parsl/parsl/.venv/bin/activate
globus-compute-endpoint configure default
which globus-compute-endpoint
python3 -c "import globus_compute_sdk; print(globus_compute_sdk.__version__)"
python3 -c "import globus_compute_endpoint; print(globus_compute_endpoint.__version__)"
cat << EOF > /home/runner/.globus_compute/default/config.yaml
engine:
type: ThreadPoolEngine
max_workers: 4
working_dir: /home/runner/.globus_compute/default/tasks_working_dir
EOF
cat /home/runner/.globus_compute/default/config.yaml
mkdir ~/.globus_compute/default/tasks_working_dir
globus-compute-endpoint start default
globus-compute-endpoint list
- name: make test
env:
GLOBUS_COMPUTE_CLIENT_ID: ${{ secrets.GLOBUS_COMPUTE_CLIENT_ID }}
GLOBUS_COMPUTE_CLIENT_SECRET: ${{ secrets.GLOBUS_COMPUTE_SECRET_KEY }}
run: |
source .venv/bin/activate
export GLOBUS_COMPUTE_ENDPOINT=$(globus-compute-endpoint list | grep default | cut -c 3-38)
echo "GLOBUS_COMPUTE_ENDPOINT = $GLOBUS_COMPUTE_ENDPOINT"
# temporary; until test-matrixification
export PARSL_TEST_PRESERVE_NUM_RUNS=7
make gce_test
ln -s .pytest/parsltest-current test_runinfo
- name: Archive runinfo logs
if: ${{ always() }}
uses: actions/upload-artifact@v4
with:
name: runinfo-${{ matrix.python-version }}-${{ steps.job-info.outputs.as-ascii }}-${{ github.sha }}
path: |
runinfo/
.pytest/
ci_job_info.txt
compression-level: 9
4 changes: 4 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,10 @@ clean_coverage:
mypy: ## run mypy checks
MYPYPATH=$(CWD)/mypy-stubs mypy parsl/

.PHONY: gce_test
gce_test: ## Run tests with GlobusComputeExecutor
pytest -v -k "not shared_fs and not issue_3620 and not staging_required" --config parsl/tests/configs/globus_compute.py parsl/tests/ --random-order --durations 10

.PHONY: local_thread_test
local_thread_test: ## run all tests with local_thread config
pytest parsl/tests/ -k "not cleannet" --config parsl/tests/configs/local_threads.py --random-order --durations 10
Expand Down
1 change: 1 addition & 0 deletions docs/reference.rst
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ Executors
parsl.executors.taskvine.TaskVineExecutor
parsl.executors.FluxExecutor
parsl.executors.radical.RadicalPilotExecutor
parsl.executors.globus_compute.GlobusComputeExecutor

Manager Selectors
=================
Expand Down
12 changes: 3 additions & 9 deletions parsl/executors/globus_compute.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,13 @@
from concurrent.futures import Future
from typing import Any, Callable, Dict, Optional, Union

import typeguard

from parsl.errors import OptionalModuleMissing
from parsl.executors.base import ParslExecutor
from parsl.utils import RepresentationMixin

UUID_LIKE_T = Union[uuid.UUID, str]



class GlobusComputeExecutor(ParslExecutor, RepresentationMixin):
""" GlobusComputeExecutor enables remote execution on Globus Compute endpoints
Expand All @@ -25,15 +22,14 @@ def __init__(
self,
endpoint_id: Optional[UUID_LIKE_T] = None,
task_group_id: Optional[UUID_LIKE_T] = None,
resource_specification: Optional[dict[str, Any]] = None,
user_endpoint_config: Optional[dict[str, Any]] = None,
resource_specification: Optional[Dict[str, Any]] = None,
user_endpoint_config: Optional[Dict[str, Any]] = None,
label: str = "GlobusComputeExecutor",
batch_size: int = 128,
amqp_port: Optional[int] = None,
**kwargs,
):
):
"""
Parameters
----------
Expand Down Expand Up @@ -141,5 +137,3 @@ def shutdown(self, wait=True, *, cancel_futures=False):
Tasks cannot be cancelled once they are registered.
"""
return self._executor.shutdown()


18 changes: 18 additions & 0 deletions parsl/tests/configs/globus_compute.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
import os

from parsl.config import Config
from parsl.executors import GlobusComputeExecutor


def fresh_config():

endpoint_id = os.environ["GLOBUS_COMPUTE_ENDPOINT"]

return Config(
executors=[
GlobusComputeExecutor(
label="globus_compute",
endpoint_id=endpoint_id
)
]
)
12 changes: 12 additions & 0 deletions parsl/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,18 @@ def pytest_configure(config):
'markers',
'executor_supports_std_stream_tuples: Marks tests that require tuple support for stdout/stderr'
)
config.addinivalue_line(
'markers',
'globus_compute: Marks tests that require a valid globus_compute target'
)
config.addinivalue_line(
'markers',
'shared_fs: Marks tests that require a shared_fs between the workers are the test client'
)
config.addinivalue_line(
'markers',
'issue_3620: Marks tests that do not work correctly on GlobusComputeExecutor (ref: issue 3620)'
)


@pytest.fixture(autouse=True, scope='session')
Expand Down
3 changes: 3 additions & 0 deletions parsl/tests/test_bash_apps/test_basic.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ def foo(x, y, z=10, stdout=None, label=None):
return f"echo {x} {y} {z}"


@pytest.mark.shared_fs
def test_command_format_1(tmpd_cwd):
"""Testing command format for BashApps"""

Expand All @@ -38,6 +39,7 @@ def test_command_format_1(tmpd_cwd):
assert so_content == "1 4 10"


@pytest.mark.shared_fs
def test_auto_log_filename_format(caplog):
"""Testing auto log filename format for BashApps
"""
Expand Down Expand Up @@ -66,6 +68,7 @@ def test_auto_log_filename_format(caplog):
assert record.levelno < logging.ERROR


@pytest.mark.shared_fs
def test_parallel_for(tmpd_cwd, n=3):
"""Testing a simple parallel for loop"""
outdir = tmpd_cwd / "outputs/test_parallel"
Expand Down
4 changes: 4 additions & 0 deletions parsl/tests/test_bash_apps/test_error_codes.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ def bad_format(stderr='std.err', stdout='std.out'):
whitelist = os.path.join(os.path.dirname(os.path.dirname(__file__)), 'configs', '*threads*')


@pytest.mark.shared_fs
def test_div_0(test_fn=div_0):
err_code = test_matrix[test_fn]['exit_code']
f = test_fn()
Expand All @@ -73,6 +74,7 @@ def test_div_0(test_fn=div_0):
os.remove('std.out')


@pytest.mark.shared_fs
def test_bash_misuse(test_fn=bash_misuse):
err_code = test_matrix[test_fn]['exit_code']
f = test_fn()
Expand All @@ -87,6 +89,7 @@ def test_bash_misuse(test_fn=bash_misuse):
os.remove('std.out')


@pytest.mark.shared_fs
def test_command_not_found(test_fn=command_not_found):
err_code = test_matrix[test_fn]['exit_code']
f = test_fn()
Expand All @@ -103,6 +106,7 @@ def test_command_not_found(test_fn=command_not_found):
return True


@pytest.mark.shared_fs
def test_not_executable(test_fn=not_executable):
err_code = test_matrix[test_fn]['exit_code']
f = test_fn()
Expand Down
1 change: 1 addition & 0 deletions parsl/tests/test_bash_apps/test_kwarg_storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ def foo(z=2, stdout=None):
return f"echo {z}"


@pytest.mark.shared_fs
def test_command_format_1(tmpd_cwd):
"""Testing command format for BashApps
"""
Expand Down
8 changes: 2 additions & 6 deletions parsl/tests/test_bash_apps/test_memoize.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,7 @@ def fail_on_presence(outputs=()):
return 'if [ -f {0} ] ; then exit 1 ; else touch {0}; fi'.format(outputs[0])


# This test is an oddity that requires a shared-FS and simply
# won't work if there's a staging provider.
# @pytest.mark.sharedFS_required
@pytest.mark.shared_fs
def test_bash_memoization(tmpd_cwd, n=2):
"""Testing bash memoization
"""
Expand All @@ -29,9 +27,7 @@ def fail_on_presence_kw(outputs=(), foo=None):
return 'if [ -f {0} ] ; then exit 1 ; else touch {0}; fi'.format(outputs[0])


# This test is an oddity that requires a shared-FS and simply
# won't work if there's a staging provider.
# @pytest.mark.sharedFS_required
@pytest.mark.shared_fs
def test_bash_memoization_keywords(tmpd_cwd, n=2):
"""Testing bash memoization
"""
Expand Down
3 changes: 3 additions & 0 deletions parsl/tests/test_bash_apps/test_memoize_ignore_args.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import os

import pytest

import parsl
from parsl.app.app import bash_app

Expand All @@ -21,6 +23,7 @@ def no_checkpoint_stdout_app_ignore_args(stdout=None):
return "echo X"


@pytest.mark.shared_fs
def test_memo_stdout(tmpd_cwd):
path_x = tmpd_cwd / "test.memo.stdout.x"

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ def no_checkpoint_stdout_app(stdout=None):
return "echo X"


@pytest.mark.shared_fs
def test_memo_stdout(tmpd_cwd):
assert const_list_x == const_list_x_arg

Expand Down
1 change: 1 addition & 0 deletions parsl/tests/test_bash_apps/test_multiline.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ def multiline(inputs=(), outputs=(), stderr=None, stdout=None):
""".format(inputs=inputs, outputs=outputs)


@pytest.mark.shared_fs
def test_multiline(tmpd_cwd):
so, se = tmpd_cwd / "std.out", tmpd_cwd / "std.err"
f = multiline(
Expand Down
3 changes: 3 additions & 0 deletions parsl/tests/test_bash_apps/test_stdout.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ def echo_to_streams(msg, stderr=None, stdout=None):
]


@pytest.mark.shared_fs
@pytest.mark.parametrize('spec', speclist, ids=testids)
def test_bad_stdout_specs(spec):
"""Testing bad stdout spec cases"""
Expand Down Expand Up @@ -91,6 +92,7 @@ def test_bad_stderr_file():


@pytest.mark.executor_supports_std_stream_tuples
@pytest.mark.shared_fs
def test_stdout_truncate(tmpd_cwd, caplog):
"""Testing truncation of prior content of stdout"""

Expand All @@ -110,6 +112,7 @@ def test_stdout_truncate(tmpd_cwd, caplog):
assert record.levelno < logging.ERROR


@pytest.mark.shared_fs
def test_stdout_append(tmpd_cwd, caplog):
"""Testing appending to prior content of stdout (default open() mode)"""

Expand Down
3 changes: 3 additions & 0 deletions parsl/tests/test_docs/test_from_slides.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import os

import pytest

from parsl.app.app import bash_app, python_app
from parsl.data_provider.files import File

Expand All @@ -15,6 +17,7 @@ def cat(inputs=[]):
return f.readlines()


@pytest.mark.staging_required
def test_slides():
"""Testing code snippet from slides """

Expand Down
3 changes: 3 additions & 0 deletions parsl/tests/test_docs/test_kwargs.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
"""Functions used to explain kwargs"""
from pathlib import Path

import pytest

from parsl import File, python_app


Expand All @@ -19,6 +21,7 @@ def reduce_app(inputs=()):
assert reduce_future.result() == 6


@pytest.mark.shared_fs
def test_outputs(tmpd_cwd):
@python_app()
def write_app(message, outputs=()):
Expand Down
1 change: 1 addition & 0 deletions parsl/tests/test_docs/test_workflow1.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ def save(message, outputs=[]):
return 'echo {m} &> {o}'.format(m=message, o=outputs[0])


@pytest.mark.shared_fs
@pytest.mark.staging_required
def test_procedural(N=2):
"""Procedural workflow example from docs on
Expand Down
Loading

0 comments on commit a3cad96

Please sign in to comment.