
Commit

wip
spasmann committed Jan 24, 2024
1 parent b7098b0 commit 7a16c11
Showing 2 changed files with 311 additions and 21 deletions.
330 changes: 310 additions & 20 deletions mcdc/kernel.py
@@ -2961,6 +2961,74 @@ def weight_window(P, mcdc):
# ==============================================================================
# Quasi Monte Carlo
# ==============================================================================
import numba_mpi as mpi
from numba import typeof

@njit
def deconstruct_particles(bank):
    # uint64 fields (g, rng_seed) are not supported by numba_mpi, so they are skipped

    size = len(bank)
    if size == 0:
        # empty bank: return zero-length buffers so the send path still works
        return [
            np.zeros(0, dtype=np.float64),
            np.zeros(0, dtype=np.float64),
            np.zeros(0, dtype=np.float64),
            np.zeros(0, dtype=np.float64),
            np.zeros(0, dtype=np.float64),
            np.zeros(0, dtype=np.float64),
            np.zeros(0, dtype=np.float64),
            np.zeros(0, dtype=np.int64),
            np.zeros((0, 0), dtype=np.float64),
        ]

    numG = len(bank[0]["iqmc"]["w"])

    x = np.zeros(size, dtype=np.float64)
    y = np.zeros(size, dtype=np.float64)
    z = np.zeros(size, dtype=np.float64)
    ux = np.zeros(size, dtype=np.float64)
    uy = np.zeros(size, dtype=np.float64)
    uz = np.zeros(size, dtype=np.float64)
    w = np.zeros(size, dtype=np.float64)
    sensitivity_ID = np.zeros(size, dtype=np.int64)
    iqmc_w = np.zeros((size, numG), dtype=np.float64)
    # g = np.zeros(size, dtype=np.uint64)
    # rng_seed = np.zeros(size, dtype=np.uint64)

    i = 0
    for particle in bank:
        x[i] = particle["x"]
        y[i] = particle["y"]
        z[i] = particle["z"]
        ux[i] = particle["ux"]
        uy[i] = particle["uy"]
        uz[i] = particle["uz"]
        w[i] = particle["w"]
        sensitivity_ID[i] = particle["sensitivity_ID"]
        # g[i] = particle["g"]
        # rng_seed[i] = particle["rng_seed"]
        for j in range(numG):
            iqmc_w[i, j] = particle["iqmc"]["w"][j]
        i += 1

    return [x, y, z, ux, uy, uz, w, sensitivity_ID, iqmc_w]


@njit
def reconstruct_particles(numpy_array_list):
    size = len(numpy_array_list[0])
    bank = np.zeros(size, dtype=type_.particle_record)

    for i in range(size):
        bank[i]["x"] = numpy_array_list[0][i]
        bank[i]["y"] = numpy_array_list[1][i]
        bank[i]["z"] = numpy_array_list[2][i]
        bank[i]["ux"] = numpy_array_list[3][i]
        bank[i]["uy"] = numpy_array_list[4][i]
        bank[i]["uz"] = numpy_array_list[5][i]
        bank[i]["w"] = numpy_array_list[6][i]
        bank[i]["sensitivity_ID"] = numpy_array_list[7][i]
        bank[i]["iqmc"]["w"] = numpy_array_list[8][i]

    return bank
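
The two helpers above flatten a particle bank into a structure of arrays (one contiguous float64 or int64 buffer per field) that numba_mpi can transfer, then rebuild the record array on the receiving side. A minimal sketch of that round trip, using a hypothetical toy dtype in place of type_.particle_record:

import numpy as np

# hypothetical toy dtype standing in for type_.particle_record
toy_record = np.dtype(
    [("x", np.float64), ("w", np.float64), ("iqmc", [("w", np.float64, (2,))])]
)

bank = np.zeros(3, dtype=toy_record)
bank["x"] = np.arange(3.0)
bank["w"] = 1.0
bank["iqmc"]["w"] = 0.5

# deconstruct: one contiguous buffer per field, ready for numba_mpi transfer
x = np.ascontiguousarray(bank["x"])
w = np.ascontiguousarray(bank["w"])
iqmc_w = np.ascontiguousarray(bank["iqmc"]["w"])

# ... each buffer would be shipped with mpi.isend(...) and received into a
# preallocated buffer of the same shape with mpi.recv(...) ...

# reconstruct a record array on the receiving side
rebuilt = np.zeros(len(x), dtype=toy_record)
rebuilt["x"] = x
rebuilt["w"] = w
rebuilt["iqmc"]["w"] = iqmc_w
assert np.array_equal(rebuilt["iqmc"]["w"], bank["iqmc"]["w"])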


@njit
Expand Down Expand Up @@ -2990,7 +3058,7 @@ def iqmc_improved_kull(mcdc):
"zn_neigh",
]
requests = []
tot_messages = 0

# for each nieghbor surrounding the domain
for name in neighbors:
bank = mcdc["bank_domain_" + name[:2]]
@@ -3003,12 +3071,16 @@
                end = start + ratio
                if i == len(mcdc["technique"][name]) - 1:
                    end = size
                send_bank = np.array(bank["particles"][start:end])

                send_bank = bank["particles"][start:end]
                numpy_array_list = deconstruct_particles(send_bank)
                # print('\n rank ', mcdc["mpi_rank"], "sent message to rank ", mcdc["technique"][name][i], " of size ", send_bank.shape)
                requests.append(
                    MPI.COMM_WORLD.isend(send_bank, dest=mcdc["technique"][name][i])
                )
                tot_messages += 1
                # send each field array with its own tag so the receiver can match them up
                tag = 0
                for data in numpy_array_list:
                    requests.append(
                        mpi.isend(data, dest=mcdc["technique"][name][i], tag=tag)
                    )
                    tag += 1

        # =========================================================================
        # Blocking Receive
@@ -3018,25 +3090,26 @@
        # nonblocking probe + while loop.
        # source: https://stackoverflow.com/questions/43823458/mpi-iprobe-vs-mpi-probe

        # store received data to bankr
        bankr = np.zeros(0, dtype=type_.particle_record)
        received_messages = 0
        while received_messages < tot_messages:
            # for each neighbor
            for name in neighbors:
                # for each processor in neighbor
                for source in mcdc["technique"][name]:
                    # nonblocking test for a message:
                    if MPI.COMM_WORLD.iprobe(source=source):
                        received = MPI.COMM_WORLD.recv(source=source)
                        # print('\n rank ', mcdc["mpi_rank"], "received message from rank ", source, " of size ", received.shape)
                        bankr = np.append(bankr, received)
                        received_messages += 1

        # for each neighbor
        for name in neighbors:
            # for each processor in neighbor
            for source in mcdc["technique"][name]:
                # receive each field into the buffers left over from the send phase
                # (their sizes must match the incoming message sizes)
                tag = 0
                for data in numpy_array_list:
                    mpi.recv(data, source=source, tag=tag)
                    tag += 1
                received = reconstruct_particles(numpy_array_list)
                print('\n rank ', mcdc["mpi_rank"], "received message from rank ", source, " of size ", received.shape)
                bankr = np.append(bankr, received)

        # =========================================================================
        # Wait for all nonblocking sends and transfer particles to active bank
        # and add particles to source bank
        # =========================================================================
        MPI.Request.waitall(requests)
        mpi.waitall(requests)
        size = bankr.shape[0]
        # Set output buffer
        for i in range(size):
@@ -3056,6 +3129,223 @@
    mcdc["bank_domain_zp"]["size"] = 0
    mcdc["bank_domain_zn"]["size"] = 0
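
For reference, the numba_mpi calls used above move raw NumPy buffers and require the receiver to preallocate arrays of matching size and dtype; distinct tags keep the per-field messages apart. A minimal two-rank sketch mirroring the isend/recv/waitall calls in this commit, where the buffer length n is an assumed value both ranks agree on ahead of time:

import numpy as np
import numba_mpi as mpi

# run with two ranks, e.g. `mpirun -n 2 python demo.py`
n = 8  # assumed buffer length, known to sender and receiver
if mpi.rank() == 0:
    x = np.arange(n, dtype=np.float64)
    w = np.ones(n, dtype=np.float64)
    requests = []
    # one tag per field so the receiver can tell the buffers apart
    requests.append(mpi.isend(x, dest=1, tag=0))
    requests.append(mpi.isend(w, dest=1, tag=1))
    mpi.waitall(requests)
elif mpi.rank() == 1:
    # numba_mpi fills preallocated buffers of matching size and dtype
    x = np.empty(n, dtype=np.float64)
    w = np.empty(n, dtype=np.float64)
    mpi.recv(x, source=0, tag=0)
    mpi.recv(w, source=0, tag=1)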

# @njit
# def iqmc_improved_kull(mcdc):
# """
# implementation that avoids iprobe() but still sends particle bank


# Implementation of the Improved KULL algorithm originally developed for
# IMC codes at LLNL.

# Brunner, T. A., Urbatsch, T. J., Evans, T. M., & Gentile, N. A. (2006).
# Comparison of four parallel algorithms for domain decomposed implicit Monte
# Carlo. Journal of Computational Physics, 212, 527–539.

# """
# # =========================================================================
# # Nonblocking send of particles to neighbor
# # =========================================================================
# buff = np.zeros(
# mcdc["bank_domain_xp"]["particles"].shape[0], dtype=type_.particle_record
# )
# with objmode(size="int64"):
# neighbors = [
# "xp_neigh",
# "xn_neigh",
# "yp_neigh",
# "yn_neigh",
# "zp_neigh",
# "zn_neigh",
# ]
# requests = []
# neighbor_ranks = []
# # for each neighbor surrounding the domain
# for name in neighbors:
# bank = mcdc["bank_domain_" + name[:2]]
# neighbor_ranks.extend(mcdc["technique"][name])
# # and for each processor in that neighbor
# for i in range(len(mcdc["technique"][name])):
# # send an equal number of particles that crossed that domain
# size = bank["size"]
# ratio = int(size / len(mcdc["technique"][name]))
# start = ratio * i
# end = start + ratio
# if i == len(mcdc["technique"][name]) - 1:
# end = size
# send_bank = np.array(bank["particles"][start:end])
# print("\n Numba typeof ", typeof(send_bank))
# print('\n rank ', mcdc["mpi_rank"], "sent message to rank ", mcdc["technique"][name][i] , " of size ", send_bank.shape)
# # requests.append(
# # MPI.COMM_WORLD.isend(send_bank, dest=mcdc["technique"][name][i])
# # )
# requests.append(
# mpi.isend(send_bank, dest=mcdc["technique"][name][i])
# )

# # =========================================================================
# # Blocking Receive
# # =========================================================================
# # Here I break slightly from the original "improved KULL" algorithm
# # I use a blocking probe but this serves the same purpose as a
# # nonblocking probe + while loop.
# # source: https://stackoverflow.com/questions/43823458/mpi-iprobe-vs-mpi-probe

# # list of messages received (TODO: replace with np.array)
# received_messages = {rank: False for rank in neighbor_ranks}

# # store received data to bankr
# bankr = np.zeros(0, dtype=type_.particle_record)

# # while not any(received_messages[rank] for rank in neighbor_ranks):
# # # for each neighbor
# # for name in neighbors:
# # # for each processor in neighbor
# # for source in mcdc["technique"][name]:
# # # check to see if we have received a message from that rank
# # if not received_messages[source]:
# # request = MPI.COMM_WORLD.irecv(source=source)
# # request = mpi.irecv(source=source)
# # if request.test():
# # data = request.wait()
# # print('\n rank ', mcdc["mpi_rank"], "received message from rank ", source, " of size ", data.shape)
# # # if data is not None:
# # bankr = np.append(bankr, data)
# # received_messages[source] = True

# # for each neighbor
# for name in neighbors:
# # for each processor in neighbor
# for source in mcdc["technique"][name]:
# # check to see if we have received a message from that rank
# # received = MPI.COMM_WORLD.recv(source=source)
# received = mpi.irecv(source=source)
# print('\n rank ', mcdc["mpi_rank"], "received message from rank ", source, " of size ", received.shape)
# bankr = np.append(bankr, received)
# received_messages[source] = True


# # =========================================================================
# # Wait for all nonblocking sends and transfer particles to active bank
# # and add particles to source bank
# # =========================================================================
# MPI.Request.waitall(requests)
# size = bankr.shape[0]
# # Set output buffer
# for i in range(size):
# buff[i] = bankr[i]
# # reset the particle bank to zero
# mcdc["bank_source"]["size"] = 0
# # place received particles in bank_source
# for i in range(size):
# add_particle(buff[i], mcdc["bank_source"])

# # reset domain transfer banks to zero
# # for some reason doing this inside objmode() doesn't work in Numba mode ?
# mcdc["bank_domain_xp"]["size"] = 0
# mcdc["bank_domain_xn"]["size"] = 0
# mcdc["bank_domain_yp"]["size"] = 0
# mcdc["bank_domain_yn"]["size"] = 0
# mcdc["bank_domain_zp"]["size"] = 0
# mcdc["bank_domain_zn"]["size"] = 0


# @njit
# def iqmc_improved_kull(mcdc):
# """
# Original implementation


# Implementation of the Improved KULL algorithm originally developed for
# IMC codes at LLNL.

# Brunner, T. A., Urbatsch, T. J., Evans, T. M., & Gentile, N. A. (2006).
# Comparison of four parallel algorithms for domain decomposed implicit Monte
# Carlo. Journal of Computational Physics, 212, 527–539.

# """
# # =========================================================================
# # Nonblocking send of particles to neighbor
# # =========================================================================
# buff = np.zeros(
# mcdc["bank_domain_xp"]["particles"].shape[0], dtype=type_.particle_record
# )
# with objmode(size="int64"):
# neighbors = [
# "xp_neigh",
# "xn_neigh",
# "yp_neigh",
# "yn_neigh",
# "zp_neigh",
# "zn_neigh",
# ]
# requests = []
# tot_messages = 0
# # for each neighbor surrounding the domain
# for name in neighbors:
# bank = mcdc["bank_domain_" + name[:2]]
# # and for each processor in that neighbor
# for i in range(len(mcdc["technique"][name])):
# # send an equal number of particles that crossed that domain
# size = bank["size"]
# ratio = int(size / len(mcdc["technique"][name]))
# start = ratio * i
# end = start + ratio
# if i == len(mcdc["technique"][name]) - 1:
# end = size
# send_bank = np.array(bank["particles"][start:end])
# print('\n rank ', mcdc["mpi_rank"], "sent message to rank ", mcdc["technique"][name][i] , " of size ", send_bank.shape)
# requests.append(
# MPI.COMM_WORLD.isend(send_bank, dest=mcdc["technique"][name][i])
# )
# tot_messages += 1

# # =========================================================================
# # Blocking Receive
# # =========================================================================
# # Here I break slightly from the original "improved KULL" algorithm
# # I use a blocking probe but this serves the same purpose as a
# # nonblocking probe + while loop.
# # source: https://stackoverflow.com/questions/43823458/mpi-iprobe-vs-mpi-probe

# bankr = np.zeros(0, dtype=type_.particle_record)
# received_messages = 0
# while received_messages < tot_messages:
# # for each neighbor
# for name in neighbors:
# # for each processor in neighbor
# for source in mcdc["technique"][name]:
# # blocking test for a message:
# if MPI.COMM_WORLD.iprobe(source=source):
# received = MPI.COMM_WORLD.recv(source=source)
# print('\n rank ', mcdc["mpi_rank"], "received message from rank ", source, " of size ", received.shape)
# bankr = np.append(bankr, received)
# received_messages += 1

# # =========================================================================
# # Wait for all nonblocking sends and transfer particles to active bank
# # and add particles to source bank
# # =========================================================================
# MPI.Request.waitall(requests)
# size = bankr.shape[0]
# # Set output buffer
# for i in range(size):
# buff[i] = bankr[i]
# # reset the particle bank to zero
# mcdc["bank_source"]["size"] = 0
# # place received particles in bank_source
# for i in range(size):
# add_particle(buff[i], mcdc["bank_source"])

# # reset domain transfer banks to zero
# # for some reason doing this inside objmode() doesn't work in Numba mode ?
# mcdc["bank_domain_xp"]["size"] = 0
# mcdc["bank_domain_xn"]["size"] = 0
# mcdc["bank_domain_yp"]["size"] = 0
# mcdc["bank_domain_yn"]["size"] = 0
# mcdc["bank_domain_zp"]["size"] = 0
# mcdc["bank_domain_zn"]["size"] = 0


@njit
def iqmc_continuous_weight_reduction(P, distance, mcdc):
2 changes: 1 addition & 1 deletion mcdc/type_.py
@@ -84,7 +84,7 @@ def make_type_particle_record(iQMC, G):
    Ng = 1
    if iQMC:
        Ng = G
    iqmc_struct = [("w", float64, (Ng,)), ("d_id", int32)]
    iqmc_struct = [("w", float64, (Ng,))]
    struct += [("iqmc", iqmc_struct)]
    particle_record = np.dtype(struct)
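
For context, the change above drops the per-particle "d_id" field, leaving only the Ng-long per-group weight vector in the nested "iqmc" struct. A minimal sketch of the resulting layout, with a trimmed field list standing in for the full particle record and Ng = 3 as an assumed group count:

import numpy as np
from numpy import float64

Ng = 3  # assumed number of energy groups
struct = [("x", float64), ("w", float64)]  # trimmed; the real record has many more fields
iqmc_struct = [("w", float64, (Ng,))]
struct += [("iqmc", iqmc_struct)]
particle_record = np.dtype(struct)

P = np.zeros(1, dtype=particle_record)
P["iqmc"]["w"][:] = 0.5  # per-group iQMC weight vector, shape (Ng,) per particle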

