diff --git a/docs/gallery/howto/autogen/parallel.py b/docs/gallery/howto/autogen/parallel.py
index 97ce88a3..61f46c20 100644
--- a/docs/gallery/howto/autogen/parallel.py
+++ b/docs/gallery/howto/autogen/parallel.py
@@ -1,148 +1,295 @@
 """
-=======================
+=====================
 Run tasks in parallel
-=======================
+=====================
 """
 # %%
 # Introduction
 # ============
-# In this tutorial, you will learn how to run task in parallel.
-#
-# Load the AiiDA profile.
-#
-
+# In this tutorial, you will learn how to run tasks and WorkGraphs in parallel.
+# When you define the dependencies of a WorkGraph by linking tasks, the
+# WorkGraph engine automatically takes care of parallelizing the independent
+# tasks. One caveat is that we cannot use calcfunctions for this purpose, as
+# they all run in the same runner environment and therefore block each other.
+# For that reason we need to use ``CalcJob``s, which can run in different
+# runner environments and therefore in parallel.
+
+# Load the AiiDA profile.
 from aiida import load_profile

 load_profile()

 # %%
-# First workflow
-# ==============
-# Suppose we want to calculate ```(x + y) * z ``` in two steps. First, add `x` and `y`, then multiply the result with `z`. And `X` is a list of values. We want to calculate these in parallel.
-#
-# Create task
-# ------------
-# First, one should know that we can not launch a subprocess inside a `task` or a `calcfunction`. We need a create a `WorkGraph` to run tasksin parallel. And then treat this `WorkGraph` as a task.
-#
+# Parallel addition workflow
+# ==========================
+# Suppose we want to calculate ``x + y + u + v`` in parallel. Instead of
+# computing it sequentially as ``(((x + y) + u) + v)``, we compute it as
+# ``((x + y) + (u + v))``, so that ``x + y`` and ``u + v`` can run in
+# parallel. aiida-core already provides an ``ArithmeticAddCalculation``
+# CalcJob for performing addition, which we will use for this example.
+
+from aiida_workgraph import WorkGraph, task
+from aiida.calculations.arithmetic.add import ArithmeticAddCalculation
+from aiida.orm import InstalledCode, load_computer, load_code, load_node
+from aiida.common.exceptions import NotExistent
+
+# The ArithmeticAddCalculation needs to know where bash is stored
+try:
+    code = load_code("add@localhost")  # the computer label can also be omitted here
+except NotExistent:
+    code = InstalledCode(
+        computer=load_computer("localhost"),
+        filepath_executable="/bin/bash",
+        label="add",
+        default_calc_job_plugin="core.arithmetic.add",
+    ).store()
+
+wg = WorkGraph("parallel")
+x, y, u, v = (1, 2, 3, 4)
+add_xy = wg.add_task(ArithmeticAddCalculation, name="add_xy", x=x, y=y, code=code)
+add_xy.set({"metadata.options.sleep": 3})  # the CalcJob will sleep for 3 seconds
+add_uv = wg.add_task(ArithmeticAddCalculation, name="add_uv", x=u, y=v, code=code)
+add_uv.set({"metadata.options.sleep": 3})  # the CalcJob will sleep for 3 seconds
+add_xyuv = wg.add_task(
+    ArithmeticAddCalculation,
+    name="add_xyuv",
+    x=add_xy.outputs["sum"],
+    y=add_uv.outputs["sum"],
+    code=code,
+)
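+# %%
+# As a side note, the same ``InstalledCode`` can also be created from the
+# command line. This is only a sketch of the equivalent command, assuming the
+# ``localhost`` computer is already configured:
+#
+# .. code-block:: bash
+#
+#    verdi code create core.code.installed \
+#        --computer localhost \
+#        --filepath-executable /bin/bash \
+#        --label add \
+#        --default-calc-job-plugin core.arithmetic.add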
+# %%
+# We can verify that the tasks add_xy and add_uv are independent of each other
+# and will therefore automatically be run in parallel.
+wg.to_html()

-from aiida_workgraph import task, WorkGraph

-# define multiply task
-@task.calcfunction()
-def multiply(x, y):
-    return x * y

+# %%
+# Run the workgraph
 wg.submit(wait=True)

-# Create a WorkGraph as a task
-@task.graph_builder()
-def multiply_parallel(X, y):
-    wg = WorkGraph()
-    # here the task `multiply` is created and will run in parallel
-    for key, value in X.items():
-        wg.add_task(multiply, name=f"multiply_{key}", x=value, y=y)
-    return wg

+# %%
+# We look at the ctime (the creation time, set when the task is submitted or
+# run) and the mtime (the time the task was last modified, which is when its
+# state changed to finished).
+print("add_xy created at:", add_xy.ctime.time(), "finished at:", add_xy.mtime.time())
+print("add_uv created at:", add_uv.ctime.time(), "finished at:", add_uv.mtime.time())

+# %%
+# We can see that both CalcJobs were created at almost the same time.

 # %%
-# Create the workflow
-# ---------------------
+# Comparison with a calcfunction
+# ------------------------------
 #
-from aiida_workgraph import WorkGraph
-from aiida.orm import Int, List

-X = {"a": Int(1), "b": Int(2), "c": Int(3)}
-y = Int(2)
-z = Int(3)
-wg = WorkGraph("parallel_tasks")
-multiply_parallel1 = wg.add_task(multiply_parallel, name="multiply_parallel1", X=X, y=y)
+@task.calcfunction()
+def add(x, y, sleep):
+    import time
+
+    time.sleep(sleep.value)
+    return x + y
+
+
+wg = WorkGraph("parallel")
+x, y, u, v = (1, 2, 3, 4)
+add_xy = wg.add_task(add, x=x, y=y, sleep=3)
+add_uv = wg.add_task(add, x=u, y=v, sleep=3)
+add_xyuv = wg.add_task(
+    add, x=add_xy.outputs["result"], y=add_uv.outputs["result"], sleep=0
+)
+
+wg.to_html()
+
+# %%
 wg.submit(wait=True)

+# %%
+# Printing the timings
+
+print("add_xy created at", add_xy.ctime.time(), "finished at", add_xy.mtime.time())
+print("add_uv created at", add_uv.ctime.time(), "finished at", add_uv.mtime.time())

 # %%
-# Check the status and results
-# -----------------------------
-#
+# We can see that the second calcfunction only started once the first one had
+# finished, i.e. they ran sequentially with a delay of 3 seconds.


+# %%
+# Parallelizing WorkGraphs
+# ========================
+# We will parallelize a workgraph in two ways: first we submit each WorkGraph
+# separately, then we use the graph builder to submit the whole workflow at
+# once.

+# This is the initial WorkGraph we want to parallelize
+@task.graph_builder(
+    inputs=[{"name": "integer"}], outputs=[{"name": "sum", "from": "sum_task.result"}]
+)
+def add10(integer):
+    wg = WorkGraph()
+    code = load_code("add@localhost")  # the code needs to be loaded inside the graph builder
+    add = wg.add_task(
+        ArithmeticAddCalculation, name="sum_task", x=10, y=integer, code=code
+    )
+    add.set({"metadata.options.sleep": 3})
+    return wg

-print("State of WorkGraph: {}".format(wg.state))

 # %%
-# Generate node graph from the AiiDA process:
-#
-from aiida_workgraph.utils import generate_node_graph
+wgs = []
+tasks = []
+for i in range(2):
+    wg = WorkGraph(f"parallel_wg{i}")
+    tasks.append(wg.add_task(add10, name="add10", integer=i))
+    wgs.append(wg)

-generate_node_graph(wg.pk)
+# We use wait=False (the default) so we can continue submitting
+wgs[0].submit()  # do not wait, so that we can submit the next WorkGraph
+wgs[1].submit(wait=True)
+# we also wait for the first WorkGraph to finish
+wgs[0].wait()

 # %%
-# Second workflow: gather results
-# ================================
-# Now I want to gather the results from the previous `multiply_parallel` tasks and calculate the sum of all their results.
-# Let's update the `multiply_parallel` function to `multiply_parallel_gather`.
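+# The same submit-then-wait pattern generalizes to any number of WorkGraphs.
+# A minimal sketch, using only the ``submit`` and ``wait`` calls seen above:
+#
+# .. code-block:: python
+#
+#    for wg_ in wgs:
+#        wg_.submit()  # returns immediately, the daemon runs the WorkGraphs
+#    for wg_ in wgs:
+#        wg_.wait()  # block until this WorkGraph is finished
+
+# %%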
-#
+# We print the ctime (the creation time) and the mtime (the time the node was
+# last changed). Since a WorkGraph's state is updated when it finishes, the
+# difference between the two gives a good estimate of the running time.
+print(
+    "add10 task of WG0 created:",
+    load_node(tasks[0].pk).ctime.time(),
+    "finished:",
+    load_node(tasks[0].pk).mtime.time(),
+)
+print(
+    "add10 task of WG1 created:",
+    load_node(tasks[1].pk).ctime.time(),
+    "finished:",
+    load_node(tasks[1].pk).mtime.time(),
+)
+
+
+# %%
+# Using the graph builder
+# -----------------------

-@task.graph_builder(outputs=[{"name": "result", "from": "context.mul"}])
-def multiply_parallel_gather(X, y):
+# This graph builder runs add10 in a loop, adding one task per iteration
+@task.graph_builder()
+def parallel_add(nb_iterations):
     wg = WorkGraph()
-    for key, value in X.items():
-        multiply1 = wg.add_task(multiply, x=value, y=y)
-        # add result of multiply1 to `self.context.mul`
-        # self.context.mul is a dict {"a": value1, "b": value2, "c": value3}
-        multiply1.set_context({"result": f"mul.{key}"})
+    for i in range(nb_iterations):
+        wg.add_task(add10, name=f"add10_{i}", integer=i)
     return wg

-@task.calcfunction()
-# the input is dynamic, we must use a variable kewword argument. **datas
-def sum(**datas):
-    from aiida.orm import Float
+# Submit a workflow that adds 10 to two different numbers in parallel
+wg = WorkGraph("parallel_graph_builder")
+parallel_add_task = wg.add_task(parallel_add, name="parallel_add", nb_iterations=2)
+wg.to_html()
+
+# %%
+wg.submit(wait=True)
+
+# %%
+parallel_add_wg = WorkGraph.load(parallel_add_task.pk)
+add10_0_task = parallel_add_wg.tasks["add10_0"]
+add10_1_task = parallel_add_wg.tasks["add10_1"]
+print(
+    "add10_0 task created:",
+    add10_0_task.ctime.time(),
+    "finished:",
+    add10_0_task.mtime.time(),
+)
+print(
+    "add10_1 task created:",
+    add10_1_task.ctime.time(),
+    "finished:",
+    add10_1_task.mtime.time(),
+)

-    total = 0
-    for key, data in datas.items():
-        total += data
-    return Float(total)
+# %%
+# We can see that the total time is less than 6 seconds, which means that the
+# two additions were performed in parallel.
+
+# %%
+# Increasing the number of daemon workers
+# ---------------------------------------
+# Since each daemon worker can only manage one WorkGraph (handling its
+# results) at a time, one can experience slowdowns when running many jobs
+# that could run in parallel. The optimal number of workers depends highly on
+# the jobs that are run.
+from aiida.engine.daemon.client import get_daemon_client

 # %%
-# Now, let's create a `WorkGraph` to use the new task:
-#
+# We run 10 iterations with one daemon worker

-from aiida_workgraph import WorkGraph
-from aiida.orm import Int, List
+client = get_daemon_client()
+print(f"Number of current daemon workers: {client.get_numprocesses()['numprocesses']}")
+wg = WorkGraph("wg_daemon_worker_1")
+parallel_add_task = wg.add_task(parallel_add, name="parallel_add", nb_iterations=10)
+wg.to_html()

-X = {"a": Int(1), "b": Int(2), "c": Int(3)}
-y = Int(2)
-z = Int(3)
-wg = WorkGraph("parallel_tasks")
-multiply_parallel_gather1 = wg.add_task(multiply_parallel_gather, X=X, y=y)
-sum1 = wg.add_task(sum, name="sum1")
-# wg.add_link(add1.outputs[0], multiply_parallel_gather1.inputs["uuids"])
-wg.add_link(multiply_parallel_gather1.outputs[0], sum1.inputs[0])
+# %%
 wg.submit(wait=True)

 # %%
-# Get the result of the tasks:
-#
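+# While a WorkGraph is running, you can monitor its progress from a separate
+# terminal with the standard AiiDA command line tools, for example:
+#
+# .. code-block:: bash
+#
+#    verdi process list   # list the active processes
+#    verdi daemon status  # show the daemon and its workers
+
+# %%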
+# Then we look at the total time, which now also includes the overhead costs.
+
+print(
+    "Time for running the parallelized graph builder:",
+    parallel_add_task.mtime - parallel_add_task.ctime,
+)

-print("State of WorkGraph: {}".format(wg.state))
-print("Result of task add1: {}".format(wg.tasks["sum1"].outputs["result"].value))

 # %%
-# Generate node graph from the AiiDA process:
-#
+# We now rerun it with 2 daemon workers
+client.increase_workers(1)
+print(f"Number of current daemon workers: {client.get_numprocesses()['numprocesses']}")
+wg = WorkGraph("wg_daemon_worker_2")
+parallel_add_task_2 = wg.add_task(parallel_add, name="parallel_add", nb_iterations=10)
+wg.to_html()
+
+# %%
+wg.submit(wait=True)
+
+# %%
+print(
+    "Time for running the parallelized graph builder with 2 daemon workers:",
+    parallel_add_task_2.mtime - parallel_add_task_2.ctime,
+)
+
+# %%
+# The time has not changed much, since handling the CalcJobs is not the
+# bottleneck here. One can increase the number of iterations to see a more
+# significant difference.

-from aiida_workgraph.utils import generate_node_graph
-generate_node_graph(wg.pk)

+# %%
+# Reset back to one worker
+client.decrease_workers(1)

 # %%
-# You can see that the outputs of `multiply_parallel_gather` workgraph is linked to the input of the `sum` task.
+# Maximum number of active WorkGraphs
+# -----------------------------------
+# Be aware that, for the moment, AiiDA can only run 200 WorkGraphs at the same
+# time. To increase that limit, one can set the following configuration option
+# to a higher value:
 #
+# .. code-block:: bash
+#
+#    verdi config set daemon.worker_process_slots 200
+#    verdi daemon restart
+#
+# For more information about improving performance, please refer to the
+# `"Tuning performance" section in the official AiiDA documentation `_.

+# %%
+# Further reading
+# ---------------
+# Now that you have learned how to run tasks in parallel, you might want to
+# know how to aggregate the results of all these parallel tasks (e.g. taking
+# the mean of all computed values). For this, continue with
+# `how to aggregate outputs `_.
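+
+# %%
+# As a teaser for that how-to, a calcfunction with dynamic inputs is one way
+# to aggregate the results of parallel tasks. This is only a sketch; the
+# ``aggregate`` function and its wiring are illustrative, see the linked
+# how-to for the full pattern:
+#
+# .. code-block:: python
+#
+#    from aiida import orm
+#
+#    @task.calcfunction()
+#    def aggregate(**sums):  # dynamic inputs: one entry per parallel result
+#        return orm.Int(sum(data.value for data in sums.values()))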