Skip to content

Commit

Permalink
DaggerMPI: changes to occupancy for execute and copy
Browse files Browse the repository at this point in the history
  • Loading branch information
fda-tome committed Jan 13, 2025
1 parent b4e2611 commit fc5e15f
Show file tree
Hide file tree
Showing 3 changed files with 9 additions and 9 deletions.
1 change: 0 additions & 1 deletion src/Dagger.jl
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,6 @@ include("utils/scopes.jl")
include("queue.jl")
include("thunk.jl")
include("submission.jl")
include("cancellation.jl")
include("memory-spaces.jl")
include("chunks.jl")

Expand Down
10 changes: 5 additions & 5 deletions src/datadeps.jl
Original file line number Diff line number Diff line change
Expand Up @@ -752,7 +752,7 @@ function distribute_tasks!(queue::DataDepsTaskQueue)
get_write_deps!(state, ainfo, task, write_num, copy_to_syncdeps)
@dagdebug nothing :spawn_datadeps "($(repr(spec.f)))[$idx][$dep_mod] $(length(copy_to_syncdeps)) syncdeps"
#@dagdebug nothing :mpi "[$(MPI.Comm_rank(current_acceleration().comm))] Scheduled move from $(arg_local.handle.id) into $(arg_remote.handle.id)\n"
copy_to = Dagger.@spawn scope=copy_to_scope occupancy=Dict(MPIProcessor=>0) syncdeps=copy_to_syncdeps meta=true Dagger.move!(dep_mod, our_space, data_space, arg_remote, arg_local)
copy_to = Dagger.@spawn scope=copy_to_scope occupancy=Dict(Any=>0) syncdeps=copy_to_syncdeps meta=true Dagger.move!(dep_mod, our_space, data_space, arg_remote, arg_local)
add_writer!(state, ainfo, copy_to, write_num)
astate.data_locality[ainfo] = our_space
else
Expand All @@ -772,7 +772,7 @@ function distribute_tasks!(queue::DataDepsTaskQueue)
copy_to_syncdeps = Set{Any}()
get_write_deps!(state, arg, task, write_num, copy_to_syncdeps)
@dagdebug nothing :spawn_datadeps "($(repr(spec.f)))[$idx] $(length(copy_to_syncdeps)) syncdeps"
copy_to = Dagger.@spawn scope=copy_to_scope occupancy=Dict(MPIProcessor=>0) syncdeps=copy_to_syncdeps Dagger.move!(identity, our_space, data_space, arg_remote, arg_local)
copy_to = Dagger.@spawn scope=copy_to_scope occupancy=Dict(Any=>0) syncdeps=copy_to_syncdeps Dagger.move!(identity, our_space, data_space, arg_remote, arg_local)
add_writer!(state, arg, copy_to, write_num)

astate.data_locality[arg] = our_space
Expand Down Expand Up @@ -824,7 +824,7 @@ function distribute_tasks!(queue::DataDepsTaskQueue)

# Launch user's task
task_scope = our_scope
spec.options = merge(spec.options, (;syncdeps, scope=task_scope))
spec.options = merge(spec.options, (;syncdeps, scope=task_scope, occupancy=Dict(Any=>0)))
enqueue!(upper_queue, spec=>task)

# Update read/write tracking for arguments
Expand Down Expand Up @@ -917,7 +917,7 @@ function distribute_tasks!(queue::DataDepsTaskQueue)
get_write_deps!(state, ainfo, nothing, write_num, copy_from_syncdeps)
@dagdebug nothing :spawn_datadeps "$(length(copy_from_syncdeps)) syncdeps"
#@dagdebug nothing :mpi "[$(MPI.Comm_rank(current_acceleration().comm))] Scheduled move from $(arg_remote.handle.id) into $(arg_local.handle.id)\n"
copy_from = Dagger.@spawn scope=copy_from_scope occupancy=Dict(MPIProcessor=>0) syncdeps=copy_from_syncdeps meta=true Dagger.move!(dep_mod, data_local_space, data_remote_space, arg_local, arg_remote)
copy_from = Dagger.@spawn scope=copy_from_scope occupancy=Dict(Any=>0) syncdeps=copy_from_syncdeps meta=true Dagger.move!(dep_mod, data_local_space, data_remote_space, arg_local, arg_remote)
else
@dagdebug nothing :spawn_datadeps "[$dep_mod] Skipped copy-from (local): $data_remote_space"
end
Expand Down Expand Up @@ -945,7 +945,7 @@ function distribute_tasks!(queue::DataDepsTaskQueue)
copy_from_syncdeps = Set()
get_write_deps!(state, arg, nothing, write_num, copy_from_syncdeps)
@dagdebug nothing :spawn_datadeps "$(length(copy_from_syncdeps)) syncdeps"
copy_from = Dagger.@spawn scope=copy_from_scope occupancy=Dict(MPIProcessor=>0) syncdeps=copy_from_syncdeps meta=true Dagger.move!(identity, data_local_space, data_remote_space, arg_local, arg_remote)
copy_from = Dagger.@spawn scope=copy_from_scope occupancy=Dict(Any=>0) syncdeps=copy_from_syncdeps meta=true Dagger.move!(identity, data_local_space, data_remote_space, arg_local, arg_remote)
else
@dagdebug nothing :spawn_datadeps "Skipped copy-from (local): $data_remote_space"
end
Expand Down
7 changes: 4 additions & 3 deletions src/sch/util.jl
Original file line number Diff line number Diff line change
Expand Up @@ -328,6 +328,7 @@ end

function can_use_proc(state, task, gproc, proc, opts, scope)
# Check against proclist
pid = Dagger.root_worker_id(gproc)
if opts.proclist !== nothing
@warn "The `proclist` option is deprecated, please use scopes instead\nSee https://juliaparallel.org/Dagger.jl/stable/scopes/ for details" maxlog=1
if opts.proclist isa Function
Expand Down Expand Up @@ -355,8 +356,8 @@ function can_use_proc(state, task, gproc, proc, opts, scope)
# Check against single
if opts.single !== nothing
@warn "The `single` option is deprecated, please use scopes instead\nSee https://juliaparallel.org/Dagger.jl/stable/scopes/ for details" maxlog=1
if gproc.pid != opts.single
@dagdebug task :scope "Rejected $proc: gproc.pid ($(gproc.pid)) != single ($(opts.single))"
if pid != opts.single
@dagdebug task :scope "Rejected $proc: pid ($(pid)) != single ($(opts.single))"
return false, scope
end
scope = constrain(scope, Dagger.ProcessScope(opts.single))
Expand Down Expand Up @@ -438,7 +439,7 @@ function populate_processor_cache_list!(state, procs)
# Populate the cache if empty
if state.procs_cache_list[] === nothing
current = nothing
for p in map(x->x.pid, procs)
for p in map(x->Dagger.root_worker_id(x), procs)
for proc in get_processors(OSProc(p))
next = ProcessorCacheEntry(OSProc(p), proc)
if current === nothing
Expand Down

0 comments on commit fc5e15f

Please sign in to comment.