[major] Refactor the Executor interface #548
@@ -0,0 +1,199 @@
from typing import Callable, Optional

from executorlib.interactive.create import create_executor as _create_executor
from executorlib.interactive.executor import (
    ExecutorWithDependencies as _ExecutorWithDependencies,
)
from executorlib.standalone.inputcheck import (
    check_plot_dependency_graph as _check_plot_dependency_graph,
)
from executorlib.standalone.inputcheck import (
    check_refresh_rate as _check_refresh_rate,
)


class LocalExecutor:
""" | ||
The executorlib.Executor leverages either the message passing interface (MPI), the SLURM workload manager or | ||
preferable the flux framework for distributing python functions within a given resource allocation. In contrast to | ||
the mpi4py.futures.MPIPoolExecutor the executorlib.Executor can be executed in a serial python process and does not | ||
require the python script to be executed with MPI. It is even possible to execute the executorlib.Executor directly | ||
in an interactive Jupyter notebook. | ||
|
||
Args: | ||
max_workers (int): for backwards compatibility with the standard library, max_workers also defines the number of | ||
cores which can be used in parallel - just like the max_cores parameter. Using max_cores is | ||
recommended, as computers have a limited number of compute cores. | ||
cache_directory (str, optional): The directory to store cache files. Defaults to "cache". | ||
max_cores (int): defines the number cores which can be used in parallel | ||
resource_dict (dict): A dictionary of resources required by the task. With the following keys: | ||
- cores (int): number of MPI cores to be used for each function call | ||
- threads_per_core (int): number of OpenMP threads to be used for each function call | ||
- gpus_per_core (int): number of GPUs per worker - defaults to 0 | ||
- cwd (str/None): current working directory where the parallel python task is executed | ||
- openmpi_oversubscribe (bool): adds the `--oversubscribe` command line flag (OpenMPI and | ||
SLURM only) - default False | ||
- slurm_cmd_args (list): Additional command line arguments for the srun call (SLURM only) | ||
hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. In the | ||
context of an HPC cluster this essential to be able to communicate to an | ||
Executor running on a different compute node within the same allocation. And | ||
in principle any computer should be able to resolve that their own hostname | ||
points to the same address as localhost. Still MacOS >= 12 seems to disable | ||
this look up for security reasons. So on MacOS it is required to set this | ||
option to true | ||
block_allocation (boolean): To accelerate the submission of a series of python functions with the same resource | ||
requirements, executorlib supports block allocation. In this case all resources have | ||
to be defined on the executor, rather than during the submission of the individual | ||
function. | ||
init_function (None): optional function to preset arguments for functions which are submitted later | ||
disable_dependencies (boolean): Disable resolving future objects during the submission. | ||
refresh_rate (float): Set the refresh rate in seconds, how frequently the input queue is checked. | ||
plot_dependency_graph (bool): Plot the dependencies of multiple future objects without executing them. For | ||
debugging purposes and to get an overview of the specified dependencies. | ||
plot_dependency_graph_filename (str): Name of the file to store the plotted graph in. | ||
|
||
Examples: | ||
``` | ||
>>> import numpy as np | ||
>>> from executorlib.interfaces.local import LocalExecutor | ||
>>> | ||
>>> def calc(i, j, k): | ||
>>> from mpi4py import MPI | ||
>>> size = MPI.COMM_WORLD.Get_size() | ||
>>> rank = MPI.COMM_WORLD.Get_rank() | ||
>>> return np.array([i, j, k]), size, rank | ||
>>> | ||
>>> def init_k(): | ||
>>> return {"k": 3} | ||
>>> | ||
>>> with LocalExecutor(cores=2, init_function=init_k) as p: | ||
>>> fs = p.submit(calc, 2, j=4) | ||
>>> print(fs.result()) | ||
[(array([2, 4, 3]), 2, 0), (array([2, 4, 3]), 2, 1)] | ||
``` | ||
""" | ||
|
||
    def __init__(
        self,
        max_workers: Optional[int] = None,
        cache_directory: Optional[str] = None,
        max_cores: Optional[int] = None,
        resource_dict: Optional[dict] = None,
        hostname_localhost: Optional[bool] = None,
        block_allocation: bool = False,
        init_function: Optional[Callable] = None,
        disable_dependencies: bool = False,
        refresh_rate: float = 0.01,
        plot_dependency_graph: bool = False,
        plot_dependency_graph_filename: Optional[str] = None,
    ):
        # Use __new__() instead of __init__(). This function is only implemented to enable auto-completion.
        pass

    def __new__(
        cls,
        max_workers: Optional[int] = None,
        cache_directory: Optional[str] = None,
        max_cores: Optional[int] = None,
        resource_dict: Optional[dict] = None,
        hostname_localhost: Optional[bool] = None,
        block_allocation: bool = False,
        init_function: Optional[Callable] = None,
        disable_dependencies: bool = False,
        refresh_rate: float = 0.01,
        plot_dependency_graph: bool = False,
        plot_dependency_graph_filename: Optional[str] = None,
    ):
""" | ||
Instead of returning a executorlib.Executor object this function returns either a executorlib.mpi.PyMPIExecutor, | ||
executorlib.slurm.PySlurmExecutor or executorlib.flux.PyFluxExecutor depending on which backend is available. The | ||
executorlib.flux.PyFluxExecutor is the preferred choice while the executorlib.mpi.PyMPIExecutor is primarily used | ||
for development and testing. The executorlib.flux.PyFluxExecutor requires flux-core from the flux-framework to be | ||
installed and in addition flux-sched to enable GPU scheduling. Finally, the executorlib.slurm.PySlurmExecutor | ||
requires the SLURM workload manager to be installed on the system. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🛠️ Refactor suggestion Update backend references in the docstring. The docstring still references old executor classes ( |
||
|
||
        Args:
            max_workers (int): for backwards compatibility with the standard library, max_workers also defines the
                               number of cores which can be used in parallel - just like the max_cores parameter.
                               Using max_cores is recommended, as computers have a limited number of compute cores.
            cache_directory (str, optional): The directory to store cache files. Defaults to "cache".
            max_cores (int): defines the number of cores which can be used in parallel
            resource_dict (dict): A dictionary of resources required by the task, with the following keys:
                                  - cores (int): number of MPI cores to be used for each function call
                                  - threads_per_core (int): number of OpenMP threads to be used for each function
                                    call
                                  - gpus_per_core (int): number of GPUs per worker - defaults to 0
                                  - cwd (str/None): current working directory where the parallel Python task is
                                    executed
                                  - openmpi_oversubscribe (bool): adds the `--oversubscribe` command line flag
                                    (OpenMPI and SLURM only) - default False
                                  - slurm_cmd_args (list): Additional command line arguments for the srun call
                                    (SLURM only)
            hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. In
                                          the context of an HPC cluster this is essential to be able to communicate
                                          with an Executor running on a different compute node within the same
                                          allocation. In principle, any computer should be able to resolve that its
                                          own hostname points to the same address as localhost; still, macOS >= 12
                                          seems to disable this lookup for security reasons, so on macOS it is
                                          required to set this option to true.
            block_allocation (boolean): To accelerate the submission of a series of Python functions with the same
                                        resource requirements, executorlib supports block allocation. In this case
                                        all resources have to be defined on the executor, rather than during the
                                        submission of the individual function.
            init_function (Callable, optional): optional function to preset arguments for functions which are
                                                submitted later
            disable_dependencies (boolean): Disable resolving future objects during the submission.
            refresh_rate (float): Set the refresh rate in seconds; how frequently the input queue is checked.
            plot_dependency_graph (bool): Plot the dependencies of multiple future objects without executing them.
                                          For debugging purposes and to get an overview of the specified
                                          dependencies.
            plot_dependency_graph_filename (str): Name of the file to store the plotted graph in.
        """
        default_resource_dict: dict = {
            "cores": 1,
            "threads_per_core": 1,
            "gpus_per_core": 0,
            "cwd": None,
            "openmpi_oversubscribe": False,
            "slurm_cmd_args": [],
        }
        if resource_dict is None:
            resource_dict = {}
        # Fill in defaults for any resource settings the caller did not specify explicitly.
        resource_dict.update(
            {k: v for k, v in default_resource_dict.items() if k not in resource_dict}
        )
        if not disable_dependencies:
            # Wrap the executor so that Future objects passed as arguments are resolved before execution.
            return _ExecutorWithDependencies(
                executor=_create_executor(
                    max_workers=max_workers,
                    backend="local",
                    cache_directory=cache_directory,
                    max_cores=max_cores,
                    resource_dict=resource_dict,
                    flux_executor=None,
                    flux_executor_pmi_mode=None,
                    flux_executor_nesting=False,
                    flux_log_files=False,
                    hostname_localhost=hostname_localhost,
                    block_allocation=block_allocation,
                    init_function=init_function,
                ),
                max_cores=max_cores,
                refresh_rate=refresh_rate,
                plot_dependency_graph=plot_dependency_graph,
                plot_dependency_graph_filename=plot_dependency_graph_filename,
            )
        else:
            # Validate options which only apply when dependency resolution is enabled.
            _check_plot_dependency_graph(plot_dependency_graph=plot_dependency_graph)
            _check_refresh_rate(refresh_rate=refresh_rate)
            return _create_executor(
                max_workers=max_workers,
                backend="local",
                cache_directory=cache_directory,
                max_cores=max_cores,
                resource_dict=resource_dict,
                flux_executor=None,
                flux_executor_pmi_mode=None,
                flux_executor_nesting=False,
                flux_log_files=False,
                hostname_localhost=hostname_localhost,
                block_allocation=block_allocation,
                init_function=init_function,
            )
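To make the behavior documented above concrete, here is a minimal usage sketch. It assumes the module path from this diff (executorlib.interfaces.local) and the default dependency resolution handled by _ExecutorWithDependencies in __new__; the add function and the resource values are illustrative only and are not part of the PR.

```
# Minimal sketch: submitting chained tasks to the LocalExecutor defined in this file.
from executorlib.interfaces.local import LocalExecutor


def add(x, y):
    # Plain Python function; a single core per call is sufficient here.
    return x + y


with LocalExecutor(max_cores=2, resource_dict={"cores": 1}) as exe:
    future_sum = exe.submit(add, 1, 2)
    # With disable_dependencies=False (the default), a Future can be passed as an
    # argument and is resolved before the dependent task is executed.
    future_total = exe.submit(add, future_sum, 10)
    print(future_total.result())  # expected output: 13
```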
Reviewer comment (🛠️ Refactor suggestion): Update references to the deprecated executorlib.Executor in the docstring. The docstring still mentions "executorlib.Executor" instead of LocalExecutor, which may confuse users. Consider updating these references to reflect the new class name and usage.
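The reviewer's committable suggestion is not captured in this extract. As a rough sketch of what the comment appears to ask for (an assumption, not the reviewer's actual patch), the opening of the class docstring would swap the deprecated name for the new one:

```
# Hypothetical docstring rewrite illustrating the review comment above (not the actual committable suggestion).
class LocalExecutor:
    """
    The LocalExecutor leverages either the message passing interface (MPI), the SLURM workload manager or,
    preferably, the flux framework for distributing Python functions within a given resource allocation. In
    contrast to the mpi4py.futures.MPIPoolExecutor, the LocalExecutor can be executed in a serial Python
    process and does not require the Python script to be executed with MPI.
    """
```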