Improved efficiency
xicko7 committed Jun 13, 2023
1 parent 93beabd commit b422385
Showing 9 changed files with 252 additions and 168 deletions.
1 change: 1 addition & 0 deletions .gitignore
@@ -13,3 +13,4 @@ Makefile
config.yml
address.var
get_times.sh
+exp_aux.sh
22 changes: 12 additions & 10 deletions Launcher.sh
@@ -24,37 +24,39 @@ echo -e "Running in $MAIN_DIR\n"
pdirun make -B simulation

# MPI VALUES
-PARALLELISM1=8 # MPI nodes axis x
-PARALLELISM2=8 # MPI nodes axis y
+PARALLELISM1=4 # MPI nodes axis x
+PARALLELISM2=4 # MPI nodes axis y
MPI_PER_NODE=16 # MPI processes per simulation node

# DATASIZE
DATASIZE1=$((4000*$PARALLELISM1)) # Number of elements axis x
DATASIZE2=$((4000*$PARALLELISM2)) # Number of elements axis y

-# STEPS
-GENERATION=250 # Number of iterations of the simulation
+# STEPS
+GENERATION=10 # Number of iterations of the simulation

# ANALYTICS HARDWARE
-WORKER_NODES=$(($PARALLELISM1*$PARALLELISM2/16)) # DEISA uses (MPI_PROCESSES/4) worker nodes with 48 threads each
-CPUS_PER_WORKER=32 # 24 # Parallelism on each worker
+WORKER_NODES=4 # DEISA uses (MPI_PROCESSES/4) worker nodes with 48 threads each
+CPUS_PER_WORKER=40 # 24 # Parallelism on each worker

# AUXILIARY VALUES
SIMUNODES=$(($PARALLELISM2 * $PARALLELISM1 / $MPI_PER_NODE)) # NUMBER OF SIMULATION NODES
NNODES=$(($WORKER_NODES + $SIMUNODES + 1)) # WORKERS + HEAD + SIMULATION (CLIENT WILL BE WITHIN THE HEAD NODE)
NPROC=$(($PARALLELISM2 * $PARALLELISM1 + $NNODES + 1)) # NUMBER OF DEPLOYED TASKS (MPI + ALL RAY INSTANCES + CLIENT)
+MPI_TASKS=$(($PARALLELISM2 * $PARALLELISM1)) # TOTAL NUMBER OF MPI PROCESSES
+GLOBAL_SIZE=$(($DATASIZE1 * $DATASIZE2 * 8 / 1000000)) # TOTAL DATA SIZE PER ITERATION IN MB (8-BYTE ELEMENTS)
+LOCAL_SIZE=$(($GLOBAL_SIZE / $MPI_TASKS)) # DATA SIZE PER MPI PROCESS IN MB

# MANAGING FILES
date=$(date +%Y-%m-%d_%X)
-OUTPUT=outputs/$date
+OUTPUT=outputs/$date\_P$MPI_TASKS\_SN$SIMUNODES\_LS$LOCAL_SIZE\_GS$GLOBAL_SIZE\_I$GENERATION\_AN$WORKER_NODES
`which python` prescript.py $DATASIZE1 $DATASIZE2 $PARALLELISM1 $PARALLELISM2 $GENERATION $WORKER_NODES $MPI_PER_NODE $CPUS_PER_WORKER $WORKER_THREADING # Create config.yml
mkdir -p $OUTPUT
mkdir logs 2>/dev/null
touch logs/jobs.log
-cp *.yml client.py reisa.py simulation Script.sh $OUTPUT
+cp *.yml *.py simulation Script.sh $OUTPUT

# RUNNING
cd $OUTPUT
echo $1 > comment.txt
echo -e "Executing $(sbatch --parsable -N $NNODES --partition cpu_short --ntasks=$NPROC Script.sh $SIMUNODES $MPI_PER_NODE $CPUS_PER_WORKER) in $OUTPUT" >> $MAIN_DIR/logs/jobs.log
echo -e "Executing $(sbatch --parsable -N $NNODES --partition cpu_short --ntasks=$NPROC Script.sh $SIMUNODES $MPI_PER_NODE $CPUS_PER_WORKER) in $PWD " >> $MAIN_DIR/logs/jobs.log
cd $MAIN_DIR
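
For reference, a minimal Python sketch (not repository code) of the sizing arithmetic the updated Launcher.sh performs; every number follows directly from the assignments shown in the diff above:

PARALLELISM1, PARALLELISM2 = 4, 4   # new MPI grid
MPI_PER_NODE = 16
WORKER_NODES = 4

DATASIZE1 = 4000 * PARALLELISM1                            # 16000 elements on x
DATASIZE2 = 4000 * PARALLELISM2                            # 16000 elements on y
SIMUNODES = PARALLELISM1 * PARALLELISM2 // MPI_PER_NODE    # 1 simulation node
NNODES = WORKER_NODES + SIMUNODES + 1                      # 6 nodes: workers + simulation + head
MPI_TASKS = PARALLELISM1 * PARALLELISM2                    # 16 MPI processes
NPROC = MPI_TASKS + NNODES + 1                             # 23 Slurm tasks: MPI + Ray instances + client
GLOBAL_SIZE = DATASIZE1 * DATASIZE2 * 8 // 1_000_000       # 2048 MB per iteration (8-byte elements)
LOCAL_SIZE = GLOBAL_SIZE // MPI_TASKS                      # 128 MB per MPI process

print(SIMUNODES, NNODES, NPROC, GLOBAL_SIZE, LOCAL_SIZE)   # -> 1 6 23 2048 128
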
36 changes: 21 additions & 15 deletions Script.sh
@@ -1,5 +1,5 @@
#!/bin/bash
-#SBATCH --time=00:45:00
+#SBATCH --time=01:00:00
#SBATCH -o reisa.log
#SBATCH --error reisa.log
#SBATCH --mem-per-cpu=4G
@@ -14,10 +14,10 @@ unset RAY_ADDRESS;
export RAY_record_ref_creation_sites=0
export RAY_SCHEDULER_EVENTS=0
export OMP_NUM_THREADS=1 # To prevent errors
-export RAY_PROFILING=1
-export RAY_task_events_report_interval_ms=200
-export RAY_memory_monitor_refresh_ms=250
-export RAY_memory_usage_threshold=0.85
+export RAY_PROFILING=0
+# export RAY_task_events_report_interval_ms=1000
+export RAY_memory_monitor_refresh_ms=500
+export RAY_memory_usage_threshold=0.99
export RAY_verbose_spill_logs=0
export REISA_DIR=$PWD

@@ -28,7 +28,7 @@ MPI_PER_NODE=$2
CPUS_PER_WORKER=$3
NUM_SIM_NODES=$1
WORKER_NUM=$(($SLURM_JOB_NUM_NODES - 1 - $NUM_SIM_NODES))
-IN_SITU_RESOURCES=8
+IN_SITU_RESOURCES=0

# GET ALLOCATED NODES
NODES=$(scontrol show hostnames "$SLURM_JOB_NODELIST")
@@ -42,16 +42,16 @@ echo -e "Initializing Ray (1 head node + $WORKER_NUM worker nodes + $NUM_SIM_NODES si

# GET HEAD NODE INFO
head_node=${NODES_ARRAY[0]}
head_node_ip=$(srun -N 1 -n 1 --relative=0 echo $(ip -f inet addr show ib0 | sed -En -e 's/.*inet ([0-9.]+).*/\1/p') &)
-ulimit -u 16384
+ulimit -n 16384 >> reisa.log
port=6379
echo -e "Head node: $head_node_ip:$port\n"
export RAY_ADDRESS=$head_node_ip:$port

# START RAY IN THE HEAD NODE
srun --nodes=1 --ntasks=1 --relative=0 --cpus-per-task=$CPUS_PER_WORKER \
-    ray start --head --node-ip-address="$head_node_ip" --port=$port --redis-password "$REDIS_PASSWORD" --include-dashboard True \
+    ray start --head --node-ip-address="$head_node_ip" --port=$port --redis-password "$REDIS_PASSWORD" --include-dashboard False \
    --num-cpus $CPUS_PER_WORKER --block --resources='{"compute": 0, "head":1}' --system-config='{"local_fs_capacity_threshold":0.999}' 1>/dev/null 2>&1 &
echo $RAY_ADDRESS > ../../address.var

cnt=0
Expand All @@ -69,15 +69,16 @@ for ((i = 1; i <= WORKER_NUM; i++)); do
node_i=${NODES_ARRAY[$i]}
srun --nodes=1 --ntasks=1 --relative=$i --cpus-per-task=$CPUS_PER_WORKER --mem=128G \
    ray start --address $RAY_ADDRESS --redis-password "$REDIS_PASSWORD" \
-    --num-cpus $CPUS_PER_WORKER --block --resources="{\"compute\": ${CPUS_PER_WORKER}, \"transit\": 1}" --object-store-memory=$((72*10**9)) 1>/dev/null 2>&1 &
+    --num-cpus $CPUS_PER_WORKER --block --resources="{\"compute\": ${CPUS_PER_WORKER}, \"transit\": 1}" --object-store-memory=$((96*10**9)) 1>/dev/null 2>&1 &
done


# START RAY IN SIMULATION NODES
for ((; i < $SLURM_JOB_NUM_NODES; i++)); do
node_i=${NODES_ARRAY[$i]}
-srun --nodes=1 --ntasks=1 --relative=$i --cpus-per-task=$(($MPI_PER_NODE+2)) \
-    ray start --address $RAY_ADDRESS --block --num-cpus=$((1+$IN_SITU_RESOURCES)) --resources="{\"actor\": 1, \"compute\": ${IN_SITU_RESOURCES}}" --object-store-memory $((64*10**9)) 1>/dev/null 2>&1 &
+srun --nodes=1 --ntasks=1 --relative=$i --cpus-per-task=$(($MPI_PER_NODE+2)) --mem=128G \
+    ray start --address $RAY_ADDRESS --redis-password "$REDIS_PASSWORD" \
+    --num-cpus=$((1+$IN_SITU_RESOURCES)) --block --resources="{\"actor\": 1, \"compute\": ${IN_SITU_RESOURCES}}" --object-store-memory $((96*10**9)) 1>/dev/null 2>&1 &
done

cnt=0
@@ -93,15 +94,18 @@ end=$(date +%s%N)

# LAUNCH THE CLIENT WITHIN THE HEAD NODE
srun --oversubscribe --overcommit --nodes=1 --ntasks=1 --relative=0 -c 1 \
-    `which python` client.py --ray-timeline &
+    `which python` reduction.py &
+    # `which python` derivative.py &
+    # `which python` client.py --ray-timeline &
client=$!

-ray status --address=$RAY_ADDRESS
+# ray status --address=$RAY_ADDRESS

# LAUNCH THE SIMULATION
pdirun srun --oversubscribe --overcommit -N $NUM_SIM_NODES --ntasks-per-node=$MPI_PER_NODE\
-n $MPI_TASKS --nodelist=$SIM_NODE_LIST --cpus-per-task=1\
./simulation $SLURM_JOB_ID &
+sim=$!

# PRINT CLUSTER DEPLOYING TIME
elapsed=$((end-start))
@@ -111,4 +115,6 @@ printf "\n%-21s%s\n" "RAY_DEPLOY_TIME:" "$elapsed"

# WAIT FOR THE RESULTS
wait $client
+wait $sim

echo -e "\nSlurm job finished at $(date +%d/%m/%Y_%X)"
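
The --resources flags above tag each Ray node with custom resources: the head node advertises "head", worker nodes "compute" and "transit", simulation nodes "actor" (plus "compute" capacity when IN_SITU_RESOURCES > 0). A short sketch of how analytics code can target those tags with Ray's standard custom-resource scheduling; the task and actor bodies are hypothetical, not taken from this repository:

import ray

ray.init(address="auto")  # attach to the cluster deployed by Script.sh

# Schedulable only on nodes advertising "compute" capacity, i.e. the workers
# (simulation nodes advertise zero "compute" now that IN_SITU_RESOURCES=0).
@ray.remote(resources={"compute": 1})
def analytics_task(chunk):
    return chunk.sum()

# Schedulable only on a simulation node: each one advertises exactly one "actor" unit.
@ray.remote(resources={"actor": 1})
class DataActor:
    def ready(self):
        return True
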
38 changes: 15 additions & 23 deletions client.py
@@ -5,45 +5,37 @@

# The user can decide which task is executed on each level of the following tree
'''
-  p0 p1 p2 p3      p0 p1 p2 p3    # One task per process per iteration
-   \ /   \ /        \ /   \ /
-    \/     \/         \/     \/
-   iteration 0      iteration 1   # One task per iteration (typically gathering the results of all the actors in that iteration)
-         \              /
-          \            /
-           \          /
-             result               # Array with iteration-level results
+ [p0 p1 p2 p3] --> [p0 p1 p2 p3]  # One task per process per iteration (we can get previous iterations' data)
+   \ /   \ /         \ /   \ /
+    \/     \/          \/     \/
+  iteration 0  -->  iteration 1  --> [...] --> result
+ # One task per iteration (typically gathering the results of all the actors in that iteration)
+ # We can get the result of the previous iterations
'''

# Get infiniband address
address = os.environ.get("RAY_ADDRESS").split(":")[0]
# Starting reisa (mandatory)
handler = Reisa("config.yml", address)
max = handler.iterations

# Process-level analytics code
def process_func(rank: int, i: int, queue):
    # "queue" will be the available data for the "rank" process since the simulation started (from iteration 0 to "i")
    # We must take into account that some data will be erased to free memory
-    c0 = 2. / 3.
-    dx = 1
-    data = queue[-5:] # Using square brackets to select the data
-    F = np.array(data)
-    dFdx = np.average(c0 / dx * (F[3:-1] - F[1:-3] - (F[4:] - F[:-4]) / 8.))
-    return dFdx
+    gt = np.array(queue[i])
+    return np.sum(gt)

# Iteration-level analytics code
-def iter_func(i: int, process_results):
-    res = np.average(process_results[i][:]) # get the data with square brackets
-    return res
+def iter_func(i: int, current_results, previous_iterations):
+    return np.sum(current_results[:])

# The iterations that will be executed (from 0 to end by default)
-iterations = [i for i in range(4, handler.iterations)]
+iterations = [i for i in range(0, max)]

# Launch the analytics (blocking operation); the kept_iters parameter sets how many iterations are kept in memory before the current one
-result = handler.get_result(process_task=process_func, iter_task=iter_func, selected_iters=iterations, kept_iters=5, timeline=True)
+result = handler.get_result(process_func, iter_func, selected_iters=iterations, kept_iters=1, timeline=False)

# Write the results
with open("results.log", "a") as f:
-    f.write("\nResult: " + str(result) + ".\n")
+    f.write("\nResults per iteration: " + str(result) + ".\n")

handler.shutdown()
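
To make the task tree in the docstring concrete, here is a Ray-free sketch of the two-level reduction that client.py now expresses, run on synthetic per-process queues (the process count, iteration count, and chunk size are invented for illustration; the real scheduling is done by REISA):

import numpy as np

ranks, n_iters, n = 4, 3, 8   # hypothetical process count, iterations, chunk size
queues = {r: [np.random.rand(n) for _ in range(n_iters)] for r in range(ranks)}

def process_func(rank, i, queue):              # same role as in client.py
    return np.sum(np.array(queue[i]))

def iter_func(i, current_results, previous_iterations):
    return np.sum(current_results[:])

results = []
for i in range(n_iters):
    per_process = np.array([process_func(r, i, queues[r]) for r in range(ranks)])
    results.append(iter_func(i, per_process, results[:i]))
print(results)                                 # one value per iteration
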
47 changes: 47 additions & 0 deletions derivative.py
@@ -0,0 +1,47 @@
import time
import numpy as np
from reisa import Reisa # Mandatory import
import os

# The user can decide which task is executed on each level of the following tree
'''
 [p0 p1 p2 p3] --> [p0 p1 p2 p3]  # One task per process per iteration (we can get previous iterations' data)
   \ /   \ /         \ /   \ /
    \/     \/          \/     \/
  iteration 0  -->  iteration 1  --> [...] --> result
 # One task per iteration (typically gathering the results of all the actors in that iteration)
 # We can get the result of the previous iterations
'''

# Get infiniband address
address = os.environ.get("RAY_ADDRESS").split(":")[0]
# Starting reisa (mandatory)
handler = Reisa("config.yml", address)
max = handler.iterations

# Process-level analytics code
def process_func(rank: int, i: int, queue):
    # "queue" will be the available data for the "rank" process since the simulation started (from iteration 0 to "i")
    # We must take into account that some data will be erased to free memory
    gt = np.array(queue[-5:])
    c0 = 2. / 3.
    return np.average(c0 / 1 * (gt[3] - gt[1] - (gt[4] - gt[0]) / 8.))

# Iteration-level analytics code
def iter_func(i: int, current_results):
    return np.average(current_results[:])

def global_func(final_results):
    return np.average(final_results[:])

# The iterations that will be executed (from 0 to end by default); this analysis needs the 4 previous timesteps to be available
iterations = [i for i in range(4, max)]

# Launch the analytics (blocking operation); the kept_iters parameter sets how many iterations are kept in memory before the current one
result = handler.get_result(process_func, iter_func, global_func=global_func, selected_iters=iterations, kept_iters=5, timeline=False)

# Write the results
with open("results.log", "a") as f:
    f.write("\nResult: " + str(result) + ".\n")

handler.shutdown()
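
The stencil in process_func is the classical fourth-order central difference f'(t) ≈ (2/3)(f[t+1] - f[t-1]) - (1/12)(f[t+2] - f[t-2]) with a unit step, evaluated at the middle of the five kept iterations. A quick check on the toy signal f(t) = t**3, for which the formula is exact:

import numpy as np

gt = np.array([float(t**3) for t in range(5)])          # the 5 kept iterations of a toy signal
c0 = 2. / 3.
deriv = c0 / 1 * (gt[3] - gt[1] - (gt[4] - gt[0]) / 8.)
print(deriv)   # 12.0 == d/dt t**3 at t = 2, the centre of the window
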
41 changes: 41 additions & 0 deletions reduction.py
@@ -0,0 +1,41 @@
import time
import numpy as np
from reisa import Reisa # Mandatory import
import os

# The user can decide which task is executed on each level of the following tree
'''
 [p0 p1 p2 p3] --> [p0 p1 p2 p3]  # One task per process per iteration (we can get previous iterations' data)
   \ /   \ /         \ /   \ /
    \/     \/          \/     \/
  iteration 0  -->  iteration 1  --> [...] --> result
 # One task per iteration (typically gathering the results of all the actors in that iteration)
 # We can get the result of the previous iterations
'''

# Get infiniband address
address = os.environ.get("RAY_ADDRESS").split(":")[0]
# Starting reisa (mandatory)
handler = Reisa("config.yml", address)
max = handler.iterations

# Process-level analytics code
def process_func(rank: int, i: int, queue):
    gt = np.array(queue[i])
    return np.sum(gt)

# Iteration-level analytics code
def iter_func(i: int, current_results):
    return np.sum(current_results[:])

# The iterations that will be executed (from 0 to end by default)
iterations = [i for i in range(0, max)]

# Launch the analytics (blocking operation); the kept_iters parameter sets how many iterations are kept in memory before the current one
result = handler.get_result(process_func, iter_func, selected_iters=iterations, kept_iters=1, timeline=False)

# Write the results
with open("results.log", "a") as f:
    f.write("\nResults per iteration: " + str(result) + ".\n")

handler.shutdown()
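
A toy model (not repository code) of what kept_iters controls, assuming the retained window ends at the current iteration: reduction.py only reads queue[i], so kept_iters=1 suffices, while derivative.py reads queue[-5:] and therefore asks for kept_iters=5:

def visible_window(i, kept_iters):
    # iteration indices assumed still in memory when iteration i is processed
    return list(range(max(0, i - kept_iters + 1), i + 1))

print(visible_window(9, kept_iters=1))   # [9]: enough for reduction.py (queue[i])
print(visible_window(9, kept_iters=5))   # [5, 6, 7, 8, 9]: what derivative.py's queue[-5:] needs
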