From 987298f1ea1693c8f8a81b6ff00dab02dbdfbc34 Mon Sep 17 00:00:00 2001 From: Valentin Churavy Date: Sun, 21 Feb 2021 16:26:03 -0500 Subject: [PATCH 01/16] Add initial support for Distributed.jl rpc over UCX AM --- Project.toml | 1 + examples/benchmarks/README.md | 12 + examples/benchmarks/legacy/latency.jl | 69 +++++ examples/distributed.jl | 60 ++++ src/UCX.jl | 2 + src/legacy.jl | 393 ++++++++++++++++++++++++++ 6 files changed, 537 insertions(+) create mode 100644 examples/benchmarks/README.md create mode 100644 examples/benchmarks/legacy/latency.jl create mode 100644 examples/distributed.jl create mode 100644 src/legacy.jl diff --git a/Project.toml b/Project.toml index 8371653..b0feb78 100644 --- a/Project.toml +++ b/Project.toml @@ -6,6 +6,7 @@ version = "0.3.0" CEnum = "fa961155-64e5-5f13-b03f-caf6b980ea82" FileWatching = "7b1f6079-737a-58dc-b8bc-7a2ca5c1b5ee" FunctionWrappers = "069b7b12-0de2-55c6-9aab-29f3d0a68a2e" +Distributed = "8ba89e20-285c-5b6f-9357-94700520ee1b" Libdl = "8f399da3-3557-5675-b5ff-fb832c97cbdb" Printf = "de0858da-6303-5e67-8744-51eddeeeb8d7" Random = "9a3f8284-a2c9-5f02-9a11-845980a1fd5c" diff --git a/examples/benchmarks/README.md b/examples/benchmarks/README.md new file mode 100644 index 0000000..b40a333 --- /dev/null +++ b/examples/benchmarks/README.md @@ -0,0 +1,12 @@ +# Run the benchmarks + +## Setup +``` +julia --project=examples/benchmarks +pkg> dev . 
+``` + +## Running +``` +JULIA_PROJECT=(pwd)/examples/benchmarks julia examples/benchmarks/legacy/latency.jl +``` \ No newline at end of file diff --git a/examples/benchmarks/legacy/latency.jl b/examples/benchmarks/legacy/latency.jl new file mode 100644 index 0000000..5a28913 --- /dev/null +++ b/examples/benchmarks/legacy/latency.jl @@ -0,0 +1,69 @@ +using Distributed + +include(joinpath(@__DIR__, "..", "config.jl")) + +addprocs(1) + +@everywhere using UCX +@everywhere UCX.Legacy.wireup() + +@everywhere function target(A) + nothing +end + +const MAX_MESSAGE_SIZE = 1<<22 +# const MAX_MESSAGE_SIZE = 4096 +const LARGE_MESSAGE_SIZE = 8192 + +const LAT_LOOP_SMALL = 10000 +const LAT_SKIP_SMALL = 100 +const LAT_LOOP_LARGE = 1000 +const LAT_SKIP_LARGE = 10 + +function touch_data(send_buf, size) + send_buf[1:size] .= 'A' % UInt8 +end + +function benchmark() + send_buf = Vector{UInt8}(undef, MAX_MESSAGE_SIZE) + t = Table(msg_size = Int[], latency = Float64[], kind=Symbol[]) + + size = 1 + while size <= MAX_MESSAGE_SIZE + @info "sending" size + flush(stderr) + touch_data(send_buf, size) + + if size > LARGE_MESSAGE_SIZE + loop = LAT_LOOP_LARGE + skip = LAT_SKIP_LARGE + else + loop = LAT_LOOP_SMALL + skip = LAT_SKIP_SMALL + end + + t_start = 0 + for i in -skip:loop + if i == 1 + t_start = Base.time_ns() + end + + UCX.Legacy.remotecall_wait(target, 2, view(send_buf, 1:size)) + + end + t_end = Base.time_ns() + + t_delta = t_end-t_start + t_op = t_delta / loop + + push!(t, (msg_size = size, latency = t_op, kind = :distributed)) + + size *= 2 + end + + CSV.write(joinpath(@__DIR__, "latency.csv"), t) +end + +if !isinteractive() + benchmark() +end diff --git a/examples/distributed.jl b/examples/distributed.jl new file mode 100644 index 0000000..dba55e8 --- /dev/null +++ b/examples/distributed.jl @@ -0,0 +1,60 @@ +using Distributed +using Test +using BenchmarkTools + +addprocs(1) + +@everywhere begin + using Pkg + Pkg.activate(@__DIR__) +end + +@everywhere using UCX +@everywhere 
UCX.Legacy.wireup() + +@test UCX.Legacy.remotecall_fetch(()->true, 2) +@test fetch(UCX.Legacy.remotecall(()->true, 2)) + +# f() = for i in 1:1000 +# UCX.Legacy.remotecall_wait(()->true, 2) +# end + +# g() = for i in 1:1000 +# remotecall_wait(()->true, 2) +# end + +# @profview f() +# @profview g() + +# @benchmark UCX.Legacy.remotecall(()->true, 2) # 2.502 μs +# @benchmark remotecall(()->true, 2) # 11.502 μs + +# data = Array{UInt8}(undef, 8192) +# @benchmark UCX.Legacy.remotecall((x)->true, 2, $data) # 2.767 μs +# @benchmark remotecall((x)->true, 2, $data) # 17.380 μs + +# @benchmark UCX.Legacy.remote_do(()->true, 2) # 1.802 μs +# @benchmark remote_do(()->true, 2) # 10.190 μs + +# @benchmark UCX.Legacy.remotecall_wait(()->true, 2) # 1ms (Timer) 20.320 μs (busy) 42μs (poll_fd) +# @benchmark remotecall_wait(()->true, 2) # 40 μs + +# @benchmark UCX.Legacy.remotecall_fetch(()->true, 2) # 1ms (Timer) 14.560 μs (busy) 31μs (poll_fd) +# @benchmark remotecall_fetch(()->true, 2) # 40 μs + +# # Base line +# @benchmark(wait(@async(nothing))) # 1 μs + + +# @everywhere using Profile, PProf + +# Profile.clear() +# remotecall_wait(Profile.clear, 2) +# remotecall_wait(Profile.start_timer, 2) +# Profile.start_timer() +# @benchmark UCX.Legacy.remotecall_wait(()->true, 2) +# Profile.stop_timer() +# remotecall_wait(Profile.stop_timer, 2) +# remotecall_wait(PProf.pprof, 2, web=false, out="proc2.pb.gz") +# PProf.pprof(web=false, out="proc1.pb.gz") + diff --git a/src/UCX.jl b/src/UCX.jl index 4821323..41501b3 100644 --- a/src/UCX.jl +++ b/src/UCX.jl @@ -993,4 +993,6 @@ function stream_recv(ep::Endpoint, args...) stream_recv(ep.ep, args...) end +include("legacy.jl") + end #module diff --git a/src/legacy.jl b/src/legacy.jl new file mode 100644 index 0000000..27039b1 --- /dev/null +++ b/src/legacy.jl @@ -0,0 +1,393 @@ +module Legacy + +#= +md""" +# The nature of a `remotecall` + +1. Sender: + - remotecall + - `AMHeader` --> Ref (heap allocated) + - `msg` --> serialized into IOBuffer :/ +2. 
Receiver: + - AMHandler + - ccall to `am_recv_callback` + - dynamic call to `AMHandler.func` :/ -- can we precompute this -- FunctionWrapper.jl + - `AMHandler.func` == `am_remotecall` + Check: `code_typed(UCX.Legacy.am_remotecall, (Ptr{Cvoid}, Csize_t, Ptr{Cvoid}, Csize_t, Ptr{UCX.API.ucp_am_recv_param_t}))` + - `deserialize()::Distributed.CallMsg{:call}` + - handle_msg -> creates closure... :/ + - call to `schedule_call` + - creates closure + task (@async) +""" +=# + +import ..UCX +import Distributed + +struct AMHeader + from::Int + hdr::Distributed.MsgHeader +end + +function handle_msg(msg::Distributed.CallMsg{:call}, header) + Distributed.schedule_call(header.response_oid, ()->msg.f(msg.args...; msg.kwargs...)) +end + +function handle_msg(msg::Distributed.CallMsg{:call_fetch}, header) + UCX.@async_showerr begin + v = Distributed.run_work_thunk(()->msg.f(msg.args...; msg.kwargs...), false) + if isa(v, Distributed.SyncTake) + try + req = deliver_result(:call_fetch, header.notify_oid, v) + finally + unlock(v.rv.synctake) + end + else + req = deliver_result(:call_fetch, header.notify_oid, v) + end + if @isdefined(req) + wait(req) + end + end +end + +function handle_msg(msg::Distributed.CallWaitMsg, header) + UCX.@async_showerr begin + rv = Distributed.schedule_call(header.response_oid, ()->msg.f(msg.args...; msg.kwargs...)) + req = deliver_result(:call_wait, header.notify_oid, fetch(rv.c)) + wait(req) + end +end + +function handle_msg(msg::Distributed.RemoteDoMsg, header) + UCX.@async_showerr begin + Distributed.run_work_thunk(()->msg.f(msg.args...; msg.kwargs...), true) + end +end + +function handle_msg(msg::Distributed.ResultMsg, header) + put!(Distributed.lookup_ref(header.response_oid), msg.value) +end + +@inline function am_handler(::Type{Msg}, worker, header, header_length, data, length, _param) where Msg + @assert header_length == sizeof(AMHeader) + phdr = Base.unsafe_convert(Ptr{AMHeader}, header) + am_hdr = Base.unsafe_load(phdr) + + param = 
Base.unsafe_load(_param)::UCX.API.ucp_am_recv_param_t + if (param.recv_attr & UCX.API.UCP_AM_RECV_ATTR_FLAG_RNDV) == 0 + if false + ptr = Base.unsafe_convert(Ptr{UInt8}, data) + buf = IOBuffer(Base.unsafe_wrap(Array, ptr, length)) + + msg = lock(proc_to_serializer(am_hdr.from)) do serializer + prev_io = serializer.io + serializer.io = buf + msg = Distributed.deserialize_msg(serializer)::Msg + serializer.io = prev_io + msg + end + + handle_msg(msg, am_hdr.hdr) + return UCX.API.UCS_OK + else + UCX.@async_showerr begin + ptr = Base.unsafe_convert(Ptr{UInt8}, data) + buf = IOBuffer(Base.unsafe_wrap(Array, ptr, length)) + + # We could do this asynchronous + # Would need to return `IN_PROGRESS` and use UCX.am_data_release + msg = lock(proc_to_serializer(am_hdr.from)) do serializer + prev_io = serializer.io + serializer.io = buf + msg = Distributed.deserialize_msg(serializer)::Msg + serializer.io = prev_io + msg + end + UCX.am_data_release(worker, data) + + handle_msg(msg, am_hdr.hdr) + end + return UCX.API.UCS_INPROGRESS + end + else + @assert (param.recv_attr & UCX.API.UCP_AM_RECV_ATTR_FLAG_RNDV) != 0 + UCX.@async_showerr begin + # Allocate rendezvous buffer + # XXX: Support CuArray etc. 
+ buffer = Array{UInt8}(undef, length) + req = UCX.am_recv(worker, data, buffer, length) + wait(req) + # UCX.am_data_release not necessary due to am_recv + + buf = IOBuffer(buffer) + peer_id = am_hdr.from + msg = lock(proc_to_serializer(am_hdr.from)) do serializer + prev_io = serializer.io + serializer.io = buf + msg = Base.invokelatest(Distributed.deserialize_msg, serializer)::Msg + serializer.io = prev_io + msg + end + + handle_msg(msg, am_hdr.hdr) + end + return UCX.API.UCS_INPROGRESS + end +end + +const AM_REMOTECALL = 1 +function am_remotecall(worker, header, header_length, data, length, param) + am_handler(Distributed.CallMsg{:call}, worker, header, header_length, data, length, param) +end + +const AM_REMOTECALL_FETCH = 2 +function am_remotecall_fetch(worker, header, header_length, data, length, param) + am_handler(Distributed.CallMsg{:call_fetch}, worker, header, header_length, data, length, param) +end + +const AM_REMOTECALL_WAIT = 3 +function am_remotecall_wait(worker, header, header_length, data, length, param) + am_handler(Distributed.CallWaitMsg, worker, header, header_length, data, length, param) +end + +const AM_REMOTE_DO = 4 +function am_remote_do(worker, header, header_length, data, length, param) + am_handler(Distributed.RemoteDoMsg, worker, header, header_length, data, length, param) +end + +const AM_RESULT = 5 +function am_result(worker, header, header_length, data, length, param) + am_handler(Distributed.ResultMsg, worker, header, header_length, data, length, param) +end + +function start() + ctx = UCX.UCXContext() + worker = UCX.UCXWorker(ctx) + + UCX.AMHandler(worker, am_remotecall, AM_REMOTECALL) + UCX.AMHandler(worker, am_remotecall_fetch, AM_REMOTECALL_FETCH) + UCX.AMHandler(worker, am_remotecall_wait, AM_REMOTECALL_WAIT) + UCX.AMHandler(worker, am_remote_do, AM_REMOTE_DO) + UCX.AMHandler(worker, am_result, AM_RESULT) + + global UCX_WORKER = worker + atexit() do + close(worker) + end + + @async begin + while isopen(worker) + wait(worker) + 
end + close(worker) + end + + addr = UCX.UCXAddress(worker) + GC.@preserve addr begin + ptr = Base.unsafe_convert(Ptr{UInt8}, addr.handle) + addr_buf = Base.unsafe_wrap(Array, ptr, addr.len; own=false) + bind_addr = similar(addr_buf) + copyto!(bind_addr, addr_buf) + end + + return bind_addr +end + +struct UCXSerializer + serializer::Distributed.ClusterSerializer{Base.GenericIOBuffer{Array{UInt8,1}}} + lock::Base.ReentrantLock +end +function Base.lock(f, ucx::UCXSerializer) + lock(ucx.lock) do + f(ucx.serializer) + end +end + +const UCX_PROC_ENDPOINT = Dict{Int, UCX.UCXEndpoint}() +const UCX_ADDR_LISTING = Dict{Int, Vector{UInt8}}() +const UCX_SERIALIZERS = Dict{Int, UCXSerializer}() + +function wireup(procs=Distributed.procs()) + # Ideally we would use FluxRM or PMI and use their + # distributed KVS. + ucx_addr = Dict{Int, Vector{UInt8}}() + @sync for p in procs + @async begin + ucx_addr[p] = Distributed.remotecall_fetch(start, p) + end + end + + @sync for p in procs + @async begin + Distributed.remotecall_wait(p, ucx_addr) do ucx_addr + merge!(UCX_ADDR_LISTING, ucx_addr) + end + end + end +end + +function proc_to_endpoint(p) + get!(UCX_PROC_ENDPOINT, p) do + worker = UCX_WORKER::UCX.UCXWorker + UCX.UCXEndpoint(worker, UCX_ADDR_LISTING[p]) + end +end + +function proc_to_serializer(p) + this = get!(UCX_SERIALIZERS, p) do + cs = Distributed.ClusterSerializer(IOBuffer()) + cs.pid = p + UCXSerializer(cs, Base.ReentrantLock()) + end +end + + +function send_msg(pid, hdr, msg, id) + ep = proc_to_endpoint(pid) + data = lock(proc_to_serializer(pid)) do serializer + Base.invokelatest(Distributed.serialize_msg, serializer, msg) + take!(serializer.io) + end + + header = Ref(hdr) + + req = UCX.am_send(ep, id, header, data) + UCX.fence(ep.worker) # Gurantuee order + req +end + +abstract type UCXRemoteRef <: Distributed.AbstractRemoteRef end + +function Distributed.call_on_owner(f, rr::UCXRemoteRef, args...) 
+ rid = Distributed.remoteref_id(rr) + remotecall_fetch(f, rr.rr.where, rid, args...) +end + +struct UCXFuture <:UCXRemoteRef + rr::Distributed.Future +end +Distributed.remoteref_id(rr::UCXFuture) = Distributed.remoteref_id(rr.rr) + +function Distributed.fetch(ur::UCXFuture) + r = ur.rr + r.v !== nothing && return something(r.v) + v = Distributed.call_on_owner(Distributed.fetch_ref, ur) + r.v = Some(v) + Distributed.send_del_client(r) + v +end + +function Distributed.isready(ur::UCXFuture) + rr = ur.rr + rr.v === nothing || return true + + rid = remoteref_id(rr) + return if rr.where == myid() + isready(Distributed.lookup_ref(rid).c) + else + remotecall_fetch(rid->isready(Distributed.lookup_ref(rid).c), rr.where, rid) + end +end + +function Distributed.wait(ur::UCXFuture) + r = ur.rr + if r.v !== nothing + return ur + else + Distributed.call_on_owner(Distributed.wait_ref, ur, Distributed.myid()) + return ur + end +end + +function Distributed.put!(ur::UCXFuture, v) + rr = ur.rr + rr.v !== nothing && error("Future can be set only once") + call_on_owner(put_future, ur, v, myid()) + rr.v = Some(v) + ur +end + +# struct UCXRemoteChannel{RC<:Distributed.RemoteChannel} <: Distributed.AbstractRemoteRef +# rc::RC +# end +# Distributed.remoteref_id(rr::UCXRemoteChannel) = Distributed.remoteref_id(rr.rc) +# Base.eltype(::Type{UCXRemoteChannel{RC}}) where {RC} = eltype(RC) + +function remotecall(f, pid, args...; kwargs...) + rr = Distributed.Future(pid) + + hdr = Distributed.MsgHeader(Distributed.remoteref_id(rr)) + header = AMHeader(Distributed.myid(), hdr) + msg = Distributed.CallMsg{:call}(f, args, kwargs) + + req = send_msg(pid, header, msg, AM_REMOTECALL) + # XXX: ensure that req is making progress + UCXFuture(rr) +end + +function remotecall_fetch(f, pid, args...; kwargs...) 
+ oid = Distributed.RRID() + rv = Distributed.lookup_ref(oid) + rv.waitingfor = pid + + hdr = Distributed.MsgHeader(Distributed.RRID(0,0), oid) + header = AMHeader(Distributed.myid(), hdr) + msg = Distributed.CallMsg{:call_fetch}(f, args, kwargs) + + req = send_msg(pid, header, msg, AM_REMOTECALL_FETCH) + wait(req) + v = take!(rv) + lock(Distributed.client_refs) do + delete!(Distributed.PGRP.refs, oid) + end + return isa(v, Distributed.RemoteException) ? throw(v) : v +end + +function remotecall_wait(f, pid, args...; kwargs...) + prid = Distributed.RRID() + rv = Distributed.lookup_ref(prid) + rv.waitingfor = pid + rr = Distributed.Future(pid) + ur = UCXFuture(rr) + + hdr = Distributed.MsgHeader(Distributed.remoteref_id(rr), prid) + header = AMHeader(Distributed.myid(), hdr) + msg = Distributed.CallWaitMsg(f, args, kwargs) + + req = send_msg(pid, header, msg, AM_REMOTECALL_WAIT) + wait(req) + v = fetch(rv.c) + lock(Distributed.client_refs) do + delete!(Distributed.PGRP.refs, prid) + end + isa(v, Distributed.RemoteException) && throw(v) + return ur +end + +function remote_do(f, pid, args...; kwargs...) 
+ + hdr = Distributed.MsgHeader() + header = AMHeader(Distributed.myid(), hdr) + + msg = Distributed.RemoteDoMsg(f, args, kwargs) + send_msg(pid, header, msg, AM_REMOTE_DO) + # XXX: ensure that req is making progress + nothing +end + +function deliver_result(msg, oid, value) + if msg === :call_fetch || isa(value, Distributed.RemoteException) + val = value + else + val = :OK + end + + hdr = Distributed.MsgHeader(oid) + header = AMHeader(Distributed.myid(), hdr) + _msg = Distributed.ResultMsg(val) + + send_msg(oid.whence, header, _msg, AM_RESULT) +end + +end # module \ No newline at end of file From e61e3e3b2b26a8bcbb4fbe4cf19920084ea2e25b Mon Sep 17 00:00:00 2001 From: Valentin Churavy Date: Mon, 22 Feb 2021 11:58:03 -0500 Subject: [PATCH 02/16] fix UCX benchmarks --- examples/benchmarks/README.md | 12 ++++++++++- examples/benchmarks/ucx/latency.jl | 34 +++++++++++++++++------------- 2 files changed, 30 insertions(+), 16 deletions(-) diff --git a/examples/benchmarks/README.md b/examples/benchmarks/README.md index b40a333..3fa9500 100644 --- a/examples/benchmarks/README.md +++ b/examples/benchmarks/README.md @@ -7,6 +7,16 @@ pkg> dev . 
``` ## Running + +### MPI benchmarks (TCP) + +``` +julia --project=examples/benchmarks -e 'ENV["JULIA_MPI_BINARY"]="system"; using Pkg; Pkg.build("MPI"; verbose=true)' +mpiexec --mca btl tcp,self -n 2 julia --project=examples/benchmarks examples/benchmarks/mpi/latency.jl +``` + + + ``` JULIA_PROJECT=(pwd)/examples/benchmarks julia examples/benchmarks/legacy/latency.jl -``` \ No newline at end of file +``` diff --git a/examples/benchmarks/ucx/latency.jl b/examples/benchmarks/ucx/latency.jl index e95d395..fadae65 100644 --- a/examples/benchmarks/ucx/latency.jl +++ b/examples/benchmarks/ucx/latency.jl @@ -38,14 +38,18 @@ function benchmark(ep, myid) t_start = Base.time_ns() end - UCX.stream_send(ep, send_buf, size) - UCX.stream_recv(ep, recv_buf, size) + req1 = UCX.stream_send(ep, send_buf, size) + req2 = UCX.stream_recv(ep, recv_buf, size) + wait(req1) + wait(req2) end t_end = Base.time_ns() else for i in -skip:loop - UCX.stream_recv(ep, recv_buf, size) - UCX.stream_send(ep, send_buf, size) + req1 = UCX.stream_recv(ep, recv_buf, size) + req2 = UCX.stream_send(ep, send_buf, size) + wait(req1) + wait(req2) end end @@ -69,23 +73,23 @@ function start_server() ctx = UCX.UCXContext() worker = UCX.UCXWorker(ctx) - function listener_callback(conn_request_h::UCX.API.ucp_conn_request_h, args::Ptr{Cvoid}) - conn_request = UCX.UCXConnectionRequest(conn_request_h) - Threads.@spawn begin + function listener_callback(::UCX.UCXListener, conn_request::UCX.UCXConnectionRequest) + UCX.@spawn_showerr begin try benchmark(UCX.UCXEndpoint($worker, $conn_request), 0) - catch err - showerror(stderr, err, catch_backtrace()) - exit(-1) + finally + close($worker) end end nothing end - cb = @cfunction($listener_callback, Cvoid, (UCX.API.ucp_conn_request_h, Ptr{Cvoid})) - listener = UCX.UCXListener(worker, port, cb) - while true - UCX.progress(worker) - yield() + listener = UCX.UCXListener(worker, listener_callback, port) + + GC.@preserve listener begin + while isopen(worker) + wait(worker) + end 
+ close(worker) end end From a26e5ec952d6fd46a0b3d72630ae7b1a8a393398 Mon Sep 17 00:00:00 2001 From: Valentin Churavy Date: Mon, 22 Feb 2021 13:46:00 -0500 Subject: [PATCH 03/16] Fix benchmark plotting --- examples/benchmarks/plot.jl | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/examples/benchmarks/plot.jl b/examples/benchmarks/plot.jl index 63eedae..37d9328 100644 --- a/examples/benchmarks/plot.jl +++ b/examples/benchmarks/plot.jl @@ -5,19 +5,22 @@ using CSV # Latency dist_latency = Table(CSV.File("distributed/latency.csv")) -ucx_latency = Table(CSV.File("ucx/latency.csv")) +legacy_latency = Table(CSV.File("legacy/latency_tcp.csv")) +ucx_latency = Table(CSV.File("ucx/latency_tcp.csv")) mpi_latency = Table(CSV.File("mpi/latency.csv")) let f = Figure() - fig = f[1, 1] = Axis(f, palette = (color = [:black],)) + fig = f[1, 1] = Axis(f) fig.xlabel = "Message size (bytes)" fig.ylabel = "Latency (ns)" - lines!(dist_latency.msg_size, dist_latency.latency, label = "Distributed", linewidth = 2) - lines!(ucx_latency.msg_size, ucx_latency.latency, label = "UCX", linewidth = 2) - lines!(mpi_latency.msg_size, mpi_latency.latency, label = "MPI", linewidth = 2) + lines!(dist_latency.msg_size, dist_latency.latency, label = "Distributed", linewidth = 2, color=:red) + lines!(legacy_latency.msg_size, legacy_latency.latency, label = "Distributed over UCX", linewidth = 2, color=:blue) + lines!(ucx_latency.msg_size, ucx_latency.latency, label = "UCX", linewidth = 2, color=:green) + lines!(mpi_latency.msg_size, mpi_latency.latency, label = "MPI", linewidth = 2, color=:black) f[1, 2] = Legend(f, fig) f -end \ No newline at end of file + save("latency.png", f) +end From c41341f9e9669211f68a0e4dfef5ee97409edad0 Mon Sep 17 00:00:00 2001 From: Valentin Churavy Date: Mon, 22 Feb 2021 15:07:23 -0500 Subject: [PATCH 04/16] synchronous am handler for small messages --- src/legacy.jl | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git 
a/src/legacy.jl b/src/legacy.jl index 27039b1..50f4ae7 100644 --- a/src/legacy.jl +++ b/src/legacy.jl @@ -76,7 +76,8 @@ end param = Base.unsafe_load(_param)::UCX.API.ucp_am_recv_param_t if (param.recv_attr & UCX.API.UCP_AM_RECV_ATTR_FLAG_RNDV) == 0 - if false + # For small messages do a synchronous receive + if length < 512 ptr = Base.unsafe_convert(Ptr{UInt8}, data) buf = IOBuffer(Base.unsafe_wrap(Array, ptr, length)) @@ -91,12 +92,10 @@ end handle_msg(msg, am_hdr.hdr) return UCX.API.UCS_OK else - UCX.@async_showerr begin + UCX.@spawn_showerr begin ptr = Base.unsafe_convert(Ptr{UInt8}, data) buf = IOBuffer(Base.unsafe_wrap(Array, ptr, length)) - # We could do this asynchronous - # Would need to return `IN_PROGRESS` and use UCX.am_data_release msg = lock(proc_to_serializer(am_hdr.from)) do serializer prev_io = serializer.io serializer.io = buf @@ -112,7 +111,7 @@ end end else @assert (param.recv_attr & UCX.API.UCP_AM_RECV_ATTR_FLAG_RNDV) != 0 - UCX.@async_showerr begin + UCX.@spawn_showerr begin # Allocate rendezvous buffer # XXX: Support CuArray etc. 
buffer = Array{UInt8}(undef, length) From 64c41a3f094c9abcd332ade0679212b02d02c1c5 Mon Sep 17 00:00:00 2001 From: Valentin Churavy Date: Mon, 22 Feb 2021 17:04:53 -0500 Subject: [PATCH 05/16] short-circuit self --- examples/distributed.jl | 24 ++++++++++++++++++++++++ src/legacy.jl | 34 +++++++++++++++++++++------------- 2 files changed, 45 insertions(+), 13 deletions(-) diff --git a/examples/distributed.jl b/examples/distributed.jl index dba55e8..f0b9888 100644 --- a/examples/distributed.jl +++ b/examples/distributed.jl @@ -15,6 +15,30 @@ end @test UCX.Legacy.remotecall_fetch(()->true, 2) @test fetch(UCX.Legacy.remotecall(()->true, 2)) + +@test UCX.Legacy.remotecall_fetch(()->true, 1) +@test fetch(UCX.Legacy.remotecall(()->true, 1)) + +# f() = for i in 1:1000 +# UCX.Legacy.remotecall_wait(()->true, 2) +# end + +# g() = for i in 1:1000 +# remotecall_wait(()->true, 2) +# end + +# @profview f() +# @profview g() + +# @benchmark UCX.Legacy.remotecall(()->true, 2) # 2.502 μs +# @benchmark remotecall(()->true, 2) # 11.502 μs + +# data = Array{UInt8}(undef, 8192) +# @benchmark UCX.Legacy.remotecall((x)->true, 2, $data) # 2.767 μs +# @benchmark remotecall((x)->true, 2, $data) # 17.380 μs + +# @benchmark UCX.Legacy.remote_do(()->true, 2) # 1.802 μs + # f() = for i in 1:1000 # UCX.Legacy.remotecall_wait(()->true, 2) # end diff --git a/src/legacy.jl b/src/legacy.jl index 50f4ae7..c994e1c 100644 --- a/src/legacy.jl +++ b/src/legacy.jl @@ -195,7 +195,7 @@ end struct UCXSerializer serializer::Distributed.ClusterSerializer{Base.GenericIOBuffer{Array{UInt8,1}}} - lock::Base.ReentrantLock + lock::Base.Threads.SpinLock end function Base.lock(f, ucx::UCXSerializer) lock(ucx.lock) do @@ -237,23 +237,31 @@ function proc_to_serializer(p) this = get!(UCX_SERIALIZERS, p) do cs = Distributed.ClusterSerializer(IOBuffer()) cs.pid = p - UCXSerializer(cs, Base.ReentrantLock()) + UCXSerializer(cs, Base.Threads.SpinLock()) end end +@inline function send_msg(pid, hdr, msg, id) + # Short circuit 
self send + if pid == hdr.from + req = UCX.UCXRequest(UCX_WORKER, nothing) + UCX.unroot(req) + handle_msg(msg, hdr.hdr) + notify(req) + req + else + ep = proc_to_endpoint(pid) + data = lock(proc_to_serializer(pid)) do serializer + Base.invokelatest(Distributed.serialize_msg, serializer, msg) + take!(serializer.io) + end -function send_msg(pid, hdr, msg, id) - ep = proc_to_endpoint(pid) - data = lock(proc_to_serializer(pid)) do serializer - Base.invokelatest(Distributed.serialize_msg, serializer, msg) - take!(serializer.io) - end - - header = Ref(hdr) + header = Ref(hdr) - req = UCX.am_send(ep, id, header, data) - UCX.fence(ep.worker) # Gurantuee order - req + req = UCX.am_send(ep, id, header, data) + UCX.fence(ep.worker) # Gurantuee order + req + end end abstract type UCXRemoteRef <: Distributed.AbstractRemoteRef end From de644c3591732a79049b59eff4190fa3bcbf8315 Mon Sep 17 00:00:00 2001 From: Valentin Churavy Date: Tue, 23 Feb 2021 15:36:51 -0500 Subject: [PATCH 06/16] refactore deserialization_msg --- src/legacy.jl | 49 ++++++++++++++++--------------------------------- 1 file changed, 16 insertions(+), 33 deletions(-) diff --git a/src/legacy.jl b/src/legacy.jl index c994e1c..3f9bb05 100644 --- a/src/legacy.jl +++ b/src/legacy.jl @@ -69,42 +69,36 @@ function handle_msg(msg::Distributed.ResultMsg, header) put!(Distributed.lookup_ref(header.response_oid), msg.value) end +@inline function deserialize_msg(::Type{Msg}, from, data) where Msg + buf = IOBuffer(data) + msg = lock(proc_to_serializer(from)) do serializer + prev_io = serializer.io + serializer.io = buf + msg = Distributed.deserialize_msg(serializer)::Msg + serializer.io = prev_io + msg + end +end + @inline function am_handler(::Type{Msg}, worker, header, header_length, data, length, _param) where Msg @assert header_length == sizeof(AMHeader) phdr = Base.unsafe_convert(Ptr{AMHeader}, header) am_hdr = Base.unsafe_load(phdr) + from = am_hdr.from param = Base.unsafe_load(_param)::UCX.API.ucp_am_recv_param_t if 
(param.recv_attr & UCX.API.UCP_AM_RECV_ATTR_FLAG_RNDV) == 0 # For small messages do a synchronous receive if length < 512 ptr = Base.unsafe_convert(Ptr{UInt8}, data) - buf = IOBuffer(Base.unsafe_wrap(Array, ptr, length)) - - msg = lock(proc_to_serializer(am_hdr.from)) do serializer - prev_io = serializer.io - serializer.io = buf - msg = Distributed.deserialize_msg(serializer)::Msg - serializer.io = prev_io - msg - end - + msg = deserialize_msg(Msg, from, Base.unsafe_wrap(Array, ptr, length))::Msg handle_msg(msg, am_hdr.hdr) return UCX.API.UCS_OK else UCX.@spawn_showerr begin ptr = Base.unsafe_convert(Ptr{UInt8}, data) - buf = IOBuffer(Base.unsafe_wrap(Array, ptr, length)) - - msg = lock(proc_to_serializer(am_hdr.from)) do serializer - prev_io = serializer.io - serializer.io = buf - msg = Distributed.deserialize_msg(serializer)::Msg - serializer.io = prev_io - msg - end + msg = deserialize_msg(Msg, from, Base.unsafe_wrap(Array, ptr, length))::Msg UCX.am_data_release(worker, data) - handle_msg(msg, am_hdr.hdr) end return UCX.API.UCS_INPROGRESS @@ -113,22 +107,11 @@ end @assert (param.recv_attr & UCX.API.UCP_AM_RECV_ATTR_FLAG_RNDV) != 0 UCX.@spawn_showerr begin # Allocate rendezvous buffer - # XXX: Support CuArray etc. 
buffer = Array{UInt8}(undef, length) req = UCX.am_recv(worker, data, buffer, length) wait(req) # UCX.am_data_release not necessary due to am_recv - - buf = IOBuffer(buffer) - peer_id = am_hdr.from - msg = lock(proc_to_serializer(am_hdr.from)) do serializer - prev_io = serializer.io - serializer.io = buf - msg = Base.invokelatest(Distributed.deserialize_msg, serializer)::Msg - serializer.io = prev_io - msg - end - + msg = deserialize_msg(Msg, from, buffer)::Msg handle_msg(msg, am_hdr.hdr) end return UCX.API.UCS_INPROGRESS @@ -258,8 +241,8 @@ end header = Ref(hdr) - req = UCX.am_send(ep, id, header, data) UCX.fence(ep.worker) # Gurantuee order + req = UCX.am_send(ep, id, header, data) req end end From 578ef722bda877dbc23a2fef12192852ff00d95d Mon Sep 17 00:00:00 2001 From: Valentin Churavy Date: Tue, 23 Feb 2021 15:37:06 -0500 Subject: [PATCH 07/16] add send_arg --- src/legacy.jl | 23 +++++++++++++++++++++++ 1 file changed, 23 insertions(+) diff --git a/src/legacy.jl b/src/legacy.jl index 3f9bb05..0919510 100644 --- a/src/legacy.jl +++ b/src/legacy.jl @@ -247,6 +247,29 @@ end end end +@inline function send_arg(pid, arg::Array{T, N}) where {T, N} + self = Distributed.myid() + if self != pid && Base.isbitstype(T) + rr = Distributed.RRID() + shape = size(arg) + alloc = ()->Array{T,N}(undef, shape) + header = AMArgHeader(self, rr, alloc) + + ep = proc_to_endpoint(pid) + raw_header = lock(proc_to_serializer(pid)) do serializer + write(serializer.io, Int(header.from)) # yes... + Base.invokelatest(Distributed.serialize, serializer, header) + take!(serializer.io) + end + + UCX.am_send(ep, AM_ARGUMENT, raw_header, arg) + return AMArg(rr) + else + return arg + end +end +send_arg(pid, arg::Any) = arg + abstract type UCXRemoteRef <: Distributed.AbstractRemoteRef end function Distributed.call_on_owner(f, rr::UCXRemoteRef, args...) 
From 387ebc5c63b5279cfec5d1b10d37e6bb8f5be191 Mon Sep 17 00:00:00 2001 From: Valentin Churavy Date: Mon, 22 Feb 2021 21:33:49 -0500 Subject: [PATCH 08/16] send arguments early and without serialization if possible --- examples/benchmarks/legacy/latency.jl | 4 +- examples/benchmarks/plot.jl | 4 +- examples/distributed.jl | 6 +- src/legacy.jl | 82 ++++++++++++++++++++++++++- 4 files changed, 90 insertions(+), 6 deletions(-) diff --git a/examples/benchmarks/legacy/latency.jl b/examples/benchmarks/legacy/latency.jl index 5a28913..634913e 100644 --- a/examples/benchmarks/legacy/latency.jl +++ b/examples/benchmarks/legacy/latency.jl @@ -25,13 +25,13 @@ function touch_data(send_buf, size) end function benchmark() - send_buf = Vector{UInt8}(undef, MAX_MESSAGE_SIZE) t = Table(msg_size = Int[], latency = Float64[], kind=Symbol[]) size = 1 while size <= MAX_MESSAGE_SIZE @info "sending" size flush(stderr) + send_buf = Vector{UInt8}(undef, size) touch_data(send_buf, size) if size > LARGE_MESSAGE_SIZE @@ -48,7 +48,7 @@ function benchmark() t_start = Base.time_ns() end - UCX.Legacy.remotecall_wait(target, 2, view(send_buf, 1:size)) + UCX.Legacy.remotecall_wait(target, 2, send_buf) end t_end = Base.time_ns() diff --git a/examples/benchmarks/plot.jl b/examples/benchmarks/plot.jl index 37d9328..6a3599d 100644 --- a/examples/benchmarks/plot.jl +++ b/examples/benchmarks/plot.jl @@ -6,6 +6,7 @@ using CSV dist_latency = Table(CSV.File("distributed/latency.csv")) legacy_latency = Table(CSV.File("legacy/latency_tcp.csv")) +amarg_latency = Table(CSV.File("legacy/latency_amarg.csv")) ucx_latency = Table(CSV.File("ucx/latency_tcp.csv")) mpi_latency = Table(CSV.File("mpi/latency.csv")) @@ -16,7 +17,8 @@ let fig.ylabel = "Latency (ns)" lines!(dist_latency.msg_size, dist_latency.latency, label = "Distributed", linewidth = 2, color=:red) - lines!(legacy_latency.msg_size, legacy_latency.latency, label = "Distributed over UCX", linewidth = 2, color=:blue) + lines!(legacy_latency.msg_size, 
legacy_latency.latency, label = "Distributed(UCX) -- Serialization", linewidth = 2, color=:blue) + lines!(amarg_latency.msg_size, amarg_latency.latency, label = "Distributed(UCX) -- AMArguments", linewidth = 2, color=:cyan) lines!(ucx_latency.msg_size, ucx_latency.latency, label = "UCX", linewidth = 2, color=:green) lines!(mpi_latency.msg_size, mpi_latency.latency, label = "MPI", linewidth = 2, color=:black) diff --git a/examples/distributed.jl b/examples/distributed.jl index f0b9888..8987f6c 100644 --- a/examples/distributed.jl +++ b/examples/distributed.jl @@ -12,8 +12,12 @@ end @everywhere using UCX @everywhere UCX.Legacy.wireup() -@test UCX.Legacy.remotecall_fetch(()->true, 2) @test fetch(UCX.Legacy.remotecall(()->true, 2)) +@test UCX.Legacy.remotecall_fetch(()->true, 2) + +data = rand(8192) +UCX.Legacy.remotecall_wait(sum, 2, data) +@test UCX.Legacy.remotecall_fetch(sum, 2, data) == sum(data) @test UCX.Legacy.remotecall_fetch(()->true, 1) diff --git a/src/legacy.jl b/src/legacy.jl index 0919510..6ad973b 100644 --- a/src/legacy.jl +++ b/src/legacy.jl @@ -29,13 +29,28 @@ struct AMHeader hdr::Distributed.MsgHeader end +struct AMArg + rr::Distributed.RRID +end + +function ensure_args(args) + map(args) do arg + if arg isa AMArg + Distributed.fetch_ref(arg.rr) + else + return arg + end + end +end + function handle_msg(msg::Distributed.CallMsg{:call}, header) Distributed.schedule_call(header.response_oid, ()->msg.f(msg.args...; msg.kwargs...)) end function handle_msg(msg::Distributed.CallMsg{:call_fetch}, header) UCX.@async_showerr begin - v = Distributed.run_work_thunk(()->msg.f(msg.args...; msg.kwargs...), false) + args = ensure_args(msg.args) + v = Distributed.run_work_thunk(()->msg.f(args...; msg.kwargs...), false) if isa(v, Distributed.SyncTake) try req = deliver_result(:call_fetch, header.notify_oid, v) @@ -53,7 +68,8 @@ end function handle_msg(msg::Distributed.CallWaitMsg, header) UCX.@async_showerr begin - rv = Distributed.schedule_call(header.response_oid, 
()->msg.f(msg.args...; msg.kwargs...)) + args = ensure_args(msg.args) + rv = Distributed.schedule_call(header.response_oid, ()->msg.f(args...; msg.kwargs...)) req = deliver_result(:call_wait, header.notify_oid, fetch(rv.c)) wait(req) end @@ -143,6 +159,63 @@ function am_result(worker, header, header_length, data, length, param) am_handler(Distributed.ResultMsg, worker, header, header_length, data, length, param) end +struct AMArgHeader + from::Int + rr::Distributed.RRID + alloc::Any +end + +function unsafe_copyto!(out, data) + ptr = Base.unsafe_convert(Ptr{eltype(out)}, data) + in = Base.unsafe_wrap(typeof(out), ptr, size(out)) + copyto!(out, in) +end + +const AM_ARGUMENT = 6 +function am_argument(worker, header, header_length, data, length, _param) + # Very different from the other am endpoints. We send the type in the header + # instead of the actual data, so that we can allocate it on the output + buf = IOBuffer(Base.unsafe_wrap(Array, Base.unsafe_convert(Ptr{UInt8}, header), header_length)) + from = read(buf, Int) + amarg = lock(proc_to_serializer(from)) do serializer + prev_io = serializer.io + serializer.io = buf + amarg = Distributed.deserialize(serializer)::AMArgHeader + serializer.io = prev_io + amarg + end + + param = Base.unsafe_load(_param)::UCX.API.ucp_am_recv_param_t + if (param.recv_attr & UCX.API.UCP_AM_RECV_ATTR_FLAG_RNDV) == 0 + # For small messages do a synchronous receive + if length < 512 + out = amarg.alloc() + unsafe_copyto!(out, data) + put!(Distributed.lookup_ref(amarg.rr), out) + return UCX.API.UCS_OK + else + UCX.@spawn_showerr begin + out = amarg.alloc() + unsafe_copyto!(out, data) + put!(Distributed.lookup_ref(amarg.rr), out) + UCX.am_data_release(worker, data) + end + return UCX.API.UCS_INPROGRESS + end + else + @assert (param.recv_attr & UCX.API.UCP_AM_RECV_ATTR_FLAG_RNDV) != 0 + UCX.@spawn_showerr begin + # Allocate rendezvous buffer + out = amarg.alloc() + req = UCX.am_recv(worker, data, out, length) + wait(req) + # 
UCX.am_data_release not necessary due to am_recv + put!(Distributed.lookup_ref(amarg.rr), out) + end + return UCX.API.UCS_INPROGRESS + end +end + function start() ctx = UCX.UCXContext() worker = UCX.UCXWorker(ctx) @@ -152,6 +225,7 @@ function start() UCX.AMHandler(worker, am_remotecall_wait, AM_REMOTECALL_WAIT) UCX.AMHandler(worker, am_remote_do, AM_REMOTE_DO) UCX.AMHandler(worker, am_result, AM_RESULT) + UCX.AMHandler(worker, am_argument, AM_ARGUMENT) global UCX_WORKER = worker atexit() do @@ -247,6 +321,8 @@ end end end +# TODO: +# views @inline function send_arg(pid, arg::Array{T, N}) where {T, N} self = Distributed.myid() if self != pid && Base.isbitstype(T) @@ -346,6 +422,7 @@ function remotecall_fetch(f, pid, args...; kwargs...) hdr = Distributed.MsgHeader(Distributed.RRID(0,0), oid) header = AMHeader(Distributed.myid(), hdr) + args = map((arg)->send_arg(pid, arg), args) msg = Distributed.CallMsg{:call_fetch}(f, args, kwargs) req = send_msg(pid, header, msg, AM_REMOTECALL_FETCH) @@ -366,6 +443,7 @@ function remotecall_wait(f, pid, args...; kwargs...) 
hdr = Distributed.MsgHeader(Distributed.remoteref_id(rr), prid) header = AMHeader(Distributed.myid(), hdr) + args = map((arg)->send_arg(pid, arg), args) msg = Distributed.CallWaitMsg(f, args, kwargs) req = send_msg(pid, header, msg, AM_REMOTECALL_WAIT) From a01a6dcff0fd5e88b9f843d583f86bb200171c9f Mon Sep 17 00:00:00 2001 From: Valentin Churavy Date: Mon, 22 Feb 2021 22:11:01 -0500 Subject: [PATCH 09/16] improve worker wait --- src/legacy.jl | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/src/legacy.jl b/src/legacy.jl index 6ad973b..e70e5a1 100644 --- a/src/legacy.jl +++ b/src/legacy.jl @@ -298,13 +298,13 @@ function proc_to_serializer(p) end end -@inline function send_msg(pid, hdr, msg, id) +@inline function send_msg(pid, hdr, msg, id, notify=false) # Short circuit self send if pid == hdr.from req = UCX.UCXRequest(UCX_WORKER, nothing) UCX.unroot(req) handle_msg(msg, hdr.hdr) - notify(req) + Base.notify(req) req else ep = proc_to_endpoint(pid) @@ -317,6 +317,7 @@ end UCX.fence(ep.worker) # Gurantuee order req = UCX.am_send(ep, id, header, data) + notify && Base.notify(ep.worker) req end end @@ -339,6 +340,7 @@ end end UCX.am_send(ep, AM_ARGUMENT, raw_header, arg) + notify(ep.worker) # wake worker up to make progress quicker return AMArg(rr) else return arg @@ -410,8 +412,7 @@ function remotecall(f, pid, args...; kwargs...) header = AMHeader(Distributed.myid(), hdr) msg = Distributed.CallMsg{:call}(f, args, kwargs) - req = send_msg(pid, header, msg, AM_REMOTECALL) - # XXX: ensure that req is making progress + req = send_msg(pid, header, msg, AM_REMOTECALL, #=notify=# true) UCXFuture(rr) end @@ -462,8 +463,7 @@ function remote_do(f, pid, args...; kwargs...) 
header = AMHeader(Distributed.myid(), hdr) msg = Distributed.RemoteDoMsg(f, args, kwargs) - send_msg(pid, header, msg, AM_REMOTE_DO) - # XXX: ensure that req is making progress + send_msg(pid, header, msg, AM_REMOTE_DO, #=notify=# true) nothing end From 64b95250162e9fdc739d8c728d784f95365c140e Mon Sep 17 00:00:00 2001 From: Valentin Churavy Date: Mon, 22 Feb 2021 22:53:13 -0500 Subject: [PATCH 10/16] fast view --- examples/benchmarks/legacy/latency.jl | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/examples/benchmarks/legacy/latency.jl b/examples/benchmarks/legacy/latency.jl index 634913e..350d2ab 100644 --- a/examples/benchmarks/legacy/latency.jl +++ b/examples/benchmarks/legacy/latency.jl @@ -26,12 +26,12 @@ end function benchmark() t = Table(msg_size = Int[], latency = Float64[], kind=Symbol[]) + send_buf = Vector{UInt8}(undef, MAX_MESSAGE_SIZE) size = 1 while size <= MAX_MESSAGE_SIZE @info "sending" size flush(stderr) - send_buf = Vector{UInt8}(undef, size) touch_data(send_buf, size) if size > LARGE_MESSAGE_SIZE @@ -48,7 +48,12 @@ function benchmark() t_start = Base.time_ns() end - UCX.Legacy.remotecall_wait(target, 2, send_buf) + GC.@preserve send_buf begin + ptr = pointer(send_buf) + subset = Base.unsafe_wrap(Array, ptr, size) + # avoid view + UCX.Legacy.remotecall_wait(target, 2, subset) + end end t_end = Base.time_ns() From ce692a6824918916ccef06dd2461c5b4a2036e76 Mon Sep 17 00:00:00 2001 From: Valentin Churavy Date: Tue, 23 Feb 2021 11:13:37 -0500 Subject: [PATCH 11/16] improve plotting --- examples/benchmarks/plot.jl | 46 ++++++++++++++++++++++++++++++------- 1 file changed, 38 insertions(+), 8 deletions(-) diff --git a/examples/benchmarks/plot.jl b/examples/benchmarks/plot.jl index 6a3599d..71bb80a 100644 --- a/examples/benchmarks/plot.jl +++ b/examples/benchmarks/plot.jl @@ -1,6 +1,7 @@ using CairoMakie using TypedTables using CSV +using Printf # Latency @@ -10,17 +11,46 @@ amarg_latency = 
Table(CSV.File("legacy/latency_amarg.csv")) ucx_latency = Table(CSV.File("ucx/latency_tcp.csv")) mpi_latency = Table(CSV.File("mpi/latency.csv")) +function bytes_label(bytes) + bytes = 2^round(Int, bytes) # data in log space + if bytes < 1024 + return string(bytes) + else + bytes = bytes ÷ 1024 + end + if bytes < 1024 + return string(bytes, 'K') + else + bytes = bytes ÷ 1024 + end + return string(bytes, 'M') +end + +function prettytime(t) + if t < 1e3 + value, units = t, "ns" + elseif t < 1e6 + value, units = t / 1e3, "μs" + elseif t < 1e9 + value, units = t / 1e6, "ms" + else + value, units = t / 1e9, "s" + end + return string(@sprintf("%.1f", value), " ", units) +end + let - f = Figure() - fig = f[1, 1] = Axis(f) + f = Figure(resolution = (1200, 900)) + fig = f[1, 1] = Axis(f, xticks = LinearTicks(16), + xtickformat = ticks -> bytes_label.(ticks), + ytickformat = ticks -> prettytime.(ticks)) fig.xlabel = "Message size (bytes)" - fig.ylabel = "Latency (ns)" + fig.ylabel = "Latency" - lines!(dist_latency.msg_size, dist_latency.latency, label = "Distributed", linewidth = 2, color=:red) - lines!(legacy_latency.msg_size, legacy_latency.latency, label = "Distributed(UCX) -- Serialization", linewidth = 2, color=:blue) - lines!(amarg_latency.msg_size, amarg_latency.latency, label = "Distributed(UCX) -- AMArguments", linewidth = 2, color=:cyan) - lines!(ucx_latency.msg_size, ucx_latency.latency, label = "UCX", linewidth = 2, color=:green) - lines!(mpi_latency.msg_size, mpi_latency.latency, label = "MPI", linewidth = 2, color=:black) + lines!(log.(2, dist_latency.msg_size), dist_latency.latency, label = "Distributed", linewidth = 2, color=:red) + lines!(log.(2, amarg_latency.msg_size), amarg_latency.latency, label = "Distributed (UCX)", linewidth = 2, color=:cyan) + lines!(log.(2, ucx_latency.msg_size), ucx_latency.latency, label = "Raw UCX", linewidth = 2, color=:green) + lines!(log.(2, mpi_latency.msg_size), mpi_latency.latency, label = "MPI", linewidth = 2, 
color=:black) f[1, 2] = Legend(f, fig) f From f255fe153f9ae9b32fbde175eea11bb85d8308b6 Mon Sep 17 00:00:00 2001 From: Valentin Churavy Date: Tue, 23 Feb 2021 16:52:26 -0500 Subject: [PATCH 12/16] add distributed.jl to tested examples --- examples/distributed.jl | 71 ----------------------------------------- examples/profiling.md | 66 ++++++++++++++++++++++++++++++++++++++ test/runtests.jl | 5 +++ 3 files changed, 71 insertions(+), 71 deletions(-) create mode 100644 examples/profiling.md diff --git a/examples/distributed.jl b/examples/distributed.jl index 8987f6c..b4ecd33 100644 --- a/examples/distributed.jl +++ b/examples/distributed.jl @@ -1,14 +1,8 @@ using Distributed using Test -using BenchmarkTools addprocs(1) -@everywhere begin - using Pkg - Pkg.activate(@__DIR__) -end - @everywhere using UCX @everywhere UCX.Legacy.wireup() @@ -19,70 +13,5 @@ data = rand(8192) UCX.Legacy.remotecall_wait(sum, 2, data) @test UCX.Legacy.remotecall_fetch(sum, 2, data) == sum(data) - @test UCX.Legacy.remotecall_fetch(()->true, 1) @test fetch(UCX.Legacy.remotecall(()->true, 1)) - -# f() = for i in 1:1000 -# UCX.Legacy.remotecall_wait(()->true, 2) -# end - -# g() = for i in 1:1000 -# remotecall_wait(()->true, 2) -# end - -# @profview f() -# @profview g() - -# @benchmark UCX.Legacy.remotecall(()->true, 2) # 2.502 μs -# @benchmark remotecall(()->true, 2) # 11.502 μs - -# data = Array{UInt8}(undef, 8192) -# @benchmark UCX.Legacy.remotecall((x)->true, 2, $data) # 2.767 μs -# @benchmark remotecall((x)->true, 2, $data) # 17.380 μs - -# @benchmark UCX.Legacy.remote_do(()->true, 2) # 1.802 μs - -# f() = for i in 1:1000 -# UCX.Legacy.remotecall_wait(()->true, 2) -# end - -# g() = for i in 1:1000 -# remotecall_wait(()->true, 2) -# end - -# @profview f() -# @profview g() - -# @benchmark UCX.Legacy.remotecall(()->true, 2) # 2.502 μs -# @benchmark remotecall(()->true, 2) # 11.502 μs - -# data = Array{UInt8}(undef, 8192) -# @benchmark UCX.Legacy.remotecall((x)->true, 2, $data) # 2.767 μs -# 
@benchmark remotecall((x)->true, 2, $data) # 17.380 μs - -# @benchmark UCX.Legacy.remote_do(()->true, 2) # 1.802 μs -# @benchmark remote_do(()->true, 2) # 10.190 μs - -# @benchmark UCX.Legacy.remotecall_wait(()->true, 2) # 1ms (Timer) 20.320 μs (busy) 42μs (poll_fd) -# @benchmark remotecall_wait(()->true, 2) # 40 μs - -# @benchmark UCX.Legacy.remotecall_fetch(()->true, 2) # 1ms (Timer) 14.560 μs (busy) 31μs (poll_fd) -# @benchmark remotecall_fetch(()->true, 2) # 40 μs - -# # Base line -# @benchmark(wait(@async(nothing))) # 1 μs - - -# @everywhere using Profile, PProf - -# Profile.clear() -# remotecall_wait(Profile.clear, 2) -# remotecall_wait(Profile.start_timer, 2) -# Profile.start_timer() -# @benchmark UCX.Legacy.remotecall_wait(()->true, 2) -# Profile.stop_timer() -# remotecall_wait(Profile.stop_timer, 2) -# remotecall_wait(PProf.pprof, 2, web=false, out="proc2.pb.gz") -# PProf.pprof(web=false, out="proc1.pb.gz") - diff --git a/examples/profiling.md b/examples/profiling.md new file mode 100644 index 0000000..5764b5a --- /dev/null +++ b/examples/profiling.md @@ -0,0 +1,66 @@ +# VSCode + +```julia +f() = for i in 1:1000 + UCX.Legacy.remotecall_wait(()->true, 2) +end + +g() = for i in 1:1000 + remotecall_wait(()->true, 2) +end + +@profview f() +@profview g() +``` + +# Microbenchmarks + +```julia +using BenchmarkTools + + +@benchmark UCX.Legacy.remotecall(()->true, 2) # 2.502 μs +@benchmark remotecall(()->true, 2) # 11.502 μs + +data = Array{UInt8}(undef, 8192) +@benchmark UCX.Legacy.remotecall((x)->true, 2, $data) # 2.767 μs +@benchmark remotecall((x)->true, 2, $data) # 17.380 μs + +@benchmark UCX.Legacy.remote_do(()->true, 2) # 1.802 μs + + +@benchmark UCX.Legacy.remotecall(()->true, 2) # 2.502 μs +@benchmark remotecall(()->true, 2) # 11.502 μs + +data = Array{UInt8}(undef, 8192) +@benchmark UCX.Legacy.remotecall((x)->true, 2, $data) # 2.767 μs +@benchmark remotecall((x)->true, 2, $data) # 17.380 μs + +@benchmark UCX.Legacy.remote_do(()->true, 2) # 1.802 μs 
+@benchmark remote_do(()->true, 2) # 10.190 μs + +@benchmark UCX.Legacy.remotecall_wait(()->true, 2) # 1ms (Timer) 20.320 μs (busy) 42μs (poll_fd) +@benchmark remotecall_wait(()->true, 2) # 40 μs + +@benchmark UCX.Legacy.remotecall_fetch(()->true, 2) # 1ms (Timer) 14.560 μs (busy) 31μs (poll_fd) +@benchmark remotecall_fetch(()->true, 2) # 40 μs + +# Base line +@benchmark(wait(@async(nothing))) # 1 μs +``` + +# PProf + +```julia +@everywhere using Profile, PProf + +Profile.clear() +remotecall_wait(Profile.clear, 2) +remotecall_wait(Profile.start_timer, 2) +Profile.start_timer() +@benchmark UCX.Legacy.remotecall_wait(()->true, 2) +Profile.stop_timer() +remotecall_wait(Profile.stop_timer, 2) +remotecall_wait(PProf.pprof, 2, web=false, out="proc2.pb.gz") +PProf.pprof(web=false, out="proc1.pb.gz") +``` \ No newline at end of file diff --git a/test/runtests.jl b/test/runtests.jl index 4158d27..90370e9 100644 --- a/test/runtests.jl +++ b/test/runtests.jl @@ -78,4 +78,9 @@ end @test success(pipeline(`$cmd $script test $(2^i)`, stderr=stderr, stdout=stdout)) end end + + @testset "Distributed.jl over UCX" begin + script = joinpath(examples_dir, "distributed.jl") + @test success(pipeline(`$cmd $script`, stderr=stderr, stdout=stdout)) + end end From 4b72a7d4136369f05050d09f0efec123437ce4d0 Mon Sep 17 00:00:00 2001 From: Valentin Churavy Date: Wed, 24 Feb 2021 17:57:32 -0500 Subject: [PATCH 13/16] don't leak memory --- src/legacy.jl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/legacy.jl b/src/legacy.jl index e70e5a1..c9d7242 100644 --- a/src/legacy.jl +++ b/src/legacy.jl @@ -36,7 +36,7 @@ end function ensure_args(args) map(args) do arg if arg isa AMArg - Distributed.fetch_ref(arg.rr) + Distributed.take_ref(arg.rr, Distributed.myid()) else return arg end From 79114f57b62258a8bb05fa6c3f821eaaae19cdd6 Mon Sep 17 00:00:00 2001 From: Valentin Churavy Date: Wed, 24 Feb 2021 21:31:07 -0500 Subject: [PATCH 14/16] prototypical cuda support --- Project.toml | 5 +-
examples/benchmarks/distributed/latency.jl | 9 ++- examples/benchmarks/distributed/setup.jl | 2 + examples/benchmarks/legacy/latency.jl | 17 +++--- examples/benchmarks/legacy/latency_cuda.jl | 67 ++++++++++++++++++++++ examples/benchmarks/legacy/setup.jl | 2 + examples/benchmarks/satori/Project.toml | 12 ++++ examples/benchmarks/satori/driver.jl | 3 + examples/benchmarks/satori/launch.sh | 35 +++++++++++ examples/benchmarks/satori/setup.jl | 52 +++++++++++++++++ examples/distributed.jl | 7 +++ src/UCX.jl | 25 ++++++-- src/buffer.jl | 26 +++++++++ src/cuda.jl | 30 ++++++++++ src/legacy.jl | 29 +++++----- 15 files changed, 290 insertions(+), 31 deletions(-) create mode 100644 examples/benchmarks/distributed/setup.jl create mode 100644 examples/benchmarks/legacy/latency_cuda.jl create mode 100644 examples/benchmarks/legacy/setup.jl create mode 100644 examples/benchmarks/satori/Project.toml create mode 100644 examples/benchmarks/satori/driver.jl create mode 100644 examples/benchmarks/satori/launch.sh create mode 100644 examples/benchmarks/satori/setup.jl create mode 100644 src/buffer.jl create mode 100644 src/cuda.jl diff --git a/Project.toml b/Project.toml index b0feb78..18fca4b 100644 --- a/Project.toml +++ b/Project.toml @@ -4,17 +4,18 @@ version = "0.3.0" [deps] CEnum = "fa961155-64e5-5f13-b03f-caf6b980ea82" +Distributed = "8ba89e20-285c-5b6f-9357-94700520ee1b" FileWatching = "7b1f6079-737a-58dc-b8bc-7a2ca5c1b5ee" FunctionWrappers = "069b7b12-0de2-55c6-9aab-29f3d0a68a2e" -Distributed = "8ba89e20-285c-5b6f-9357-94700520ee1b" Libdl = "8f399da3-3557-5675-b5ff-fb832c97cbdb" Printf = "de0858da-6303-5e67-8744-51eddeeeb8d7" Random = "9a3f8284-a2c9-5f02-9a11-845980a1fd5c" +Requires = "ae029012-a4dd-5104-9daa-d747884805df" Sockets = "6462fe0b-24de-5631-8697-dd941f90decc" UCX_jll = "16e4e860-d6b8-5056-a518-93e88b6392ae" [compat] CEnum = "0.4" +FunctionWrappers = "1.1" UCX_jll = "1.10" julia = "1.5" -FunctionWrappers = "1.1" diff --git 
a/examples/benchmarks/distributed/latency.jl b/examples/benchmarks/distributed/latency.jl index fca6973..c63ee04 100644 --- a/examples/benchmarks/distributed/latency.jl +++ b/examples/benchmarks/distributed/latency.jl @@ -2,8 +2,6 @@ using Distributed include(joinpath(@__DIR__, "..", "config.jl")) -addprocs(1) - @everywhere function target(A) nothing end @@ -42,7 +40,12 @@ function benchmark() t_start = Base.time_ns() end - remotecall_wait(target, 2, view(send_buf, 1:size)) + GC.@preserve send_buf begin + ptr = pointer(send_buf) + subset = Base.unsafe_wrap(Array, ptr, size) + # avoid view + remotecall_wait(target, 2, subset) + end end t_end = Base.time_ns() diff --git a/examples/benchmarks/distributed/setup.jl b/examples/benchmarks/distributed/setup.jl new file mode 100644 index 0000000..43a6024 --- /dev/null +++ b/examples/benchmarks/distributed/setup.jl @@ -0,0 +1,2 @@ +using Distributed +addprocs(1) \ No newline at end of file diff --git a/examples/benchmarks/legacy/latency.jl b/examples/benchmarks/legacy/latency.jl index 350d2ab..4898a2f 100644 --- a/examples/benchmarks/legacy/latency.jl +++ b/examples/benchmarks/legacy/latency.jl @@ -1,13 +1,9 @@ -using Distributed +@everywhere using UCX +UCX.Legacy.wireup() include(joinpath(@__DIR__, "..", "config.jl")) -addprocs(1) - -@everywhere using UCX -@everywhere UCX.Legacy.wireup() - -@everywhere function target(A) +@everywhere function target(::Any) nothing end @@ -66,7 +62,12 @@ function benchmark() size *= 2 end - CSV.write(joinpath(@__DIR__, "latency.csv"), t) + if length(ARGS) > 0 + suffix = string("_", ARGS[1]) + else + suffix = "" + end + CSV.write(joinpath(@__DIR__, "latency$suffix.csv"), t) end if !isinteractive() diff --git a/examples/benchmarks/legacy/latency_cuda.jl b/examples/benchmarks/legacy/latency_cuda.jl new file mode 100644 index 0000000..0d52320 --- /dev/null +++ b/examples/benchmarks/legacy/latency_cuda.jl @@ -0,0 +1,67 @@ +@everywhere using UCX +UCX.Legacy.wireup() + +@everywhere using CUDA + 
+include(joinpath(@__DIR__, "..", "config.jl")) + +@everywhere function target(::Any) + nothing +end + +const MAX_MESSAGE_SIZE = 1<<22 +# const MAX_MESSAGE_SIZE = 4096 +const LARGE_MESSAGE_SIZE = 8192 + +const LAT_LOOP_SMALL = 10000 +const LAT_SKIP_SMALL = 100 +const LAT_LOOP_LARGE = 1000 +const LAT_SKIP_LARGE = 10 + +function touch_data(send_buf, size) + send_buf[1:size] .= 'A' % UInt8 +end + +function benchmark() + t = Table(msg_size = Int[], latency = Float64[], kind=Symbol[]) + send_buf = CuArray{UInt8, 1}(undef, MAX_MESSAGE_SIZE) + + size = 1 + while size <= MAX_MESSAGE_SIZE + @info "sending" size + flush(stderr) + touch_data(send_buf, size) + + if size > LARGE_MESSAGE_SIZE + loop = LAT_LOOP_LARGE + skip = LAT_SKIP_LARGE + else + loop = LAT_LOOP_SMALL + skip = LAT_SKIP_SMALL + end + + t_start = 0 + for i in -skip:loop + if i == 1 + t_start = Base.time_ns() + end + + UCX.Legacy.remotecall_wait(target, 2, view(send_buf, 1:size)) + + end + t_end = Base.time_ns() + + t_delta = t_end-t_start + t_op = t_delta / loop + + push!(t, (msg_size = size, latency = t_op, kind = :distributed)) + + size *= 2 + end + + CSV.write(joinpath(@__DIR__, "latency_cuda.csv"), t) +end + +if !isinteractive() + benchmark() +end diff --git a/examples/benchmarks/legacy/setup.jl b/examples/benchmarks/legacy/setup.jl new file mode 100644 index 0000000..43a6024 --- /dev/null +++ b/examples/benchmarks/legacy/setup.jl @@ -0,0 +1,2 @@ +using Distributed +addprocs(1) \ No newline at end of file diff --git a/examples/benchmarks/satori/Project.toml b/examples/benchmarks/satori/Project.toml new file mode 100644 index 0000000..7ad37b5 --- /dev/null +++ b/examples/benchmarks/satori/Project.toml @@ -0,0 +1,12 @@ +[deps] +BenchmarkTools = "6e4b80f9-dd63-53aa-95a3-0cdb28fa8baf" +CSV = "336ed68f-0bac-5ca0-87d4-7b16caf5d00b" +CUDA = "052768ef-5323-5732-b1bb-66c8b64840ba" +ClusterManagers = "34f1f09b-3a8b-5176-ab39-66d58a4d544e" +Dates = "ade2ca70-3891-5945-98fb-dc099432e06a" +Logging = 
"56ddb016-857b-54e1-b83d-db4d58db5568" +LoggingExtras = "e6f89c97-d47a-5376-807f-9c37f3926c36" +PProf = "e4faabce-9ead-11e9-39d9-4379958e3056" +Printf = "de0858da-6303-5e67-8744-51eddeeeb8d7" +TypedTables = "9d95f2ec-7b3d-5a63-8d20-e2491e220bb9" +UCX = "6b349878-927d-5bd5-ab28-bc3aa4175a33" diff --git a/examples/benchmarks/satori/driver.jl b/examples/benchmarks/satori/driver.jl new file mode 100644 index 0000000..e49de9a --- /dev/null +++ b/examples/benchmarks/satori/driver.jl @@ -0,0 +1,3 @@ +@everywhere using UCX +UCX.Legacy.wireup() + diff --git a/examples/benchmarks/satori/launch.sh b/examples/benchmarks/satori/launch.sh new file mode 100644 index 0000000..dab6624 --- /dev/null +++ b/examples/benchmarks/satori/launch.sh @@ -0,0 +1,35 @@ +#!/bin/bash +# Begin SLURM Directives +#SBATCH --job-name=UCX +#SBATCH --time=1:00:00 +#SBATCH --mem=0 +#SBATCH --ntasks-per-node=1 +#SBATCH --gpus-per-node=0 +#SBATCH --cpus-per-task=1 + +# Clear the environment from any previously loaded modules +module purge > /dev/null 2>&1 + +module add spack + +module load julia/1.5.3 +module load cuda/10.1.243 + +spack env activate pappa +export UCX_LOG_LEVEL=debug + +export HOME2=/nobackup/users/vchuravy + +export JULIA_PROJECT=`pwd` +export JULIA_DEPOT_PATH=${HOME2}/julia_depot + +export JULIA_CUDA_USE_BINARYBUILDER=false + +julia -e 'using Pkg; pkg"instantiate"' +julia -e 'using Pkg; pkg"precompile"' + +UCX_TLS=all julia -L setup.jl ../legacy/latency.jl +UCX_TLS=tcp,self julia -L setup.jl ../legacy/latency.jl tcp +UCX_TLS=ib,self julia -L setup.jl ../legacy/latency.jl ib +UCX_TLS=ib,self julia -L setup.jl ../legacy/latency_cuda.jl +julia -L setup.jl ../distributed/latency.jl diff --git a/examples/benchmarks/satori/setup.jl b/examples/benchmarks/satori/setup.jl new file mode 100644 index 0000000..138042f --- /dev/null +++ b/examples/benchmarks/satori/setup.jl @@ -0,0 +1,52 @@ +using Distributed +using ClusterManagers + +# Usage: +# - Set `export JULIA_PROJECT=`pwd`` + +if haskey(ENV, 
"SLURM_JOB_ID") + jobid = ENV["SLURM_JOB_ID"] + ntasks = parse(Int, ENV["SLURM_NTASKS"]) + cpus_per_task = parse(Int, ENV["SLURM_CPUS_PER_TASK"]) + @info "Running on Slurm cluster" jobid ntasks cpus_per_task + manager = SlurmManager(ntasks) +else + ntasks = 2 + cpus_per_task = div(Sys.CPU_THREADS, ntasks) + @info "Running locally" ntasks + manager = Distributed.LocalManager(ntasks, false) +end +flush(stderr) + +addprocs(manager; exeflags = ["-t $cpus_per_task"]) + +@everywhere begin + import Dates + using Logging, LoggingExtras + const date_format = "HH:MM:SS" + + function distributed_logger(logger) + logger = MinLevelLogger(logger, Logging.Info) + logger = TransformerLogger(logger) do log + merge(log, (; message = "$(Dates.format(Dates.now(), date_format)) ($(myid())) $(log.message)")) + end + return logger + end + + # set the global logger + if !(stderr isa IOStream) + ConsoleLogger(stderr) + else + FileLogger(stderr, always_flush=true) + end |> distributed_logger |> global_logger +end + +@everywhere begin + if myid() != 1 + @info "Worker started" Base.Threads.nthreads() + end + sysimg = unsafe_string((Base.JLOptions()).image_file) + project = Base.active_project() + host = strip(read(`hostname`, String)) + @info "Environment" sysimg project host +end diff --git a/examples/distributed.jl b/examples/distributed.jl index b4ecd33..702ff7d 100644 --- a/examples/distributed.jl +++ b/examples/distributed.jl @@ -15,3 +15,10 @@ UCX.Legacy.remotecall_wait(sum, 2, data) @test UCX.Legacy.remotecall_fetch(()->true, 1) @test fetch(UCX.Legacy.remotecall(()->true, 1)) + +@everywhere using CUDA +data = CuArray(data) + +@info "sending cuda" +flush(stderr) +@test UCX.Legacy.remotecall_fetch(sum, 2, data) == sum(data) diff --git a/src/UCX.jl b/src/UCX.jl index 41501b3..5950dfe 100644 --- a/src/UCX.jl +++ b/src/UCX.jl @@ -3,6 +3,7 @@ module UCX using Sockets: InetAddr, IPv4, listenany using Random import FunctionWrappers: FunctionWrapper +using Requires const PROGRESS_MODE = 
Ref(:idling) @@ -21,6 +22,7 @@ function __init__() ccall((:ucs_debug_disable_signals, API.libucs), Cvoid, ()) @assert version() >= VersionNumber(API.UCP_API_MAJOR, API.UCP_API_MINOR) + mode = get(ENV, "JLUCX_PROGRESS_MODE", "idling") if mode == "busy" PROGRESS_MODE[] = :busy @@ -32,8 +34,13 @@ function __init__() error("JLUCX_PROGRESS_MODE set to unkown progress mode: $mode") end @debug "UCX progress mode" mode + + @require CUDA="052768ef-5323-5732-b1bb-66c8b64840ba" include("cuda.jl") end +primitive type UCXPtr Sys.WORD_SIZE end +include("buffer.jl") + function memzero!(ref::Ref) ccall(:memset, Ptr{Cvoid}, (Ptr{Cvoid}, Cint, Csize_t), ref, 0, sizeof(ref)) end @@ -749,7 +756,7 @@ function recv_callback(req::Ptr{Cvoid}, status::API.ucs_status_t, info::Ptr{API. nothing end -@inline function request_param(dt, request, cb, flags=nothing) +@inline function request_param(dt, request, cb, flags=nothing, mem_type=nothing) attr_mask = API.UCP_OP_ATTR_FIELD_CALLBACK | API.UCP_OP_ATTR_FIELD_USER_DATA | API.UCP_OP_ATTR_FIELD_DATATYPE @@ -758,6 +765,10 @@ end attr_mask |= API.UCP_OP_ATTR_FIELD_FLAGS end + if mem_type !== nothing + attr_mask |= API.UCP_OP_ATTR_FIELD_MEMORY_TYPE + end + param = Ref{API.ucp_request_param_t}() memzero!(param) set!(param, :op_attr_mask, attr_mask) @@ -767,6 +778,9 @@ end if flags !== nothing set!(param, :flags, flags) end + if mem_type !== nothing + set!(param, :memory_type, mem_type) + end param end @@ -885,7 +899,7 @@ function am_send(ep::UCXEndpoint, id, header, buffer=nothing, flags=nothing) data = C_NULL nbytes = 0 else - data = Base.unsafe_convert(Ptr{Cvoid}, Base.cconvert(Ptr{Cvoid}, buffer)) + data = reinterpret(Ptr{Cvoid}, Base.unsafe_convert(UCXPtr, Base.cconvert(UCXPtr, buffer))) nbytes = sizeof(buffer) end header_ptr = Base.unsafe_convert(Ptr{Cvoid}, Base.cconvert(Ptr{Cvoid}, header)) @@ -905,13 +919,14 @@ function am_data_recv_callback(req::Ptr{Cvoid}, status::API.ucs_status_t, length end function am_recv(worker::UCXWorker, data_desc, 
buffer, nbytes) + request = UCXRequest(worker, buffer) # rooted through ep.worker + mem_type = memory_type(buffer) dt = ucp_dt_make_contig(1) # since we are sending nbytes cb = @cfunction(am_data_recv_callback, Cvoid, (Ptr{Cvoid}, API.ucs_status_t, Csize_t, Ptr{Cvoid})) - request = UCXRequest(worker, buffer) # rooted through ep.worker - param = request_param(dt, request, cb) + param = request_param(dt, request, cb, #=flags=# nothing, mem_type) GC.@preserve buffer begin - data = pointer(buffer) + data = reinterpret(Ptr{Cvoid}, Base.unsafe_convert(UCXPtr, Base.cconvert(UCXPtr, buffer))) ptr = API.ucp_am_recv_data_nbx(worker, data_desc, data, nbytes, param) end return handle_request(request, ptr) diff --git a/src/buffer.jl b/src/buffer.jl new file mode 100644 index 0000000..2c5a090 --- /dev/null +++ b/src/buffer.jl @@ -0,0 +1,26 @@ +UCXBuffertype{T} = Union{Ptr{T}, Array{T}, Ref{T}} + +Base.cconvert(::Type{UCXPtr}, x::Union{Ptr{T}, Array{T}, Ref{T}}) where T = Base.cconvert(Ptr{T}, x) +function Base.unsafe_convert(::Type{UCXPtr}, x::UCXBuffertype{T}) where T + ptr = Base.unsafe_convert(Ptr{T}, x) + reinterpret(UCXPtr, ptr) +end + +function Base.cconvert(::Type{UCXPtr}, x::String) + x +end +function Base.unsafe_convert(::Type{UCXPtr}, x::String) + reinterpret(UCXPtr, pointer(x)) +end + +# TODO: alignment & datatype...
+function unsafe_copyto!(out::UCXBuffertype, in) + GC.@preserve out begin + ptr = Base.unsafe_convert(Ptr{Cvoid}, Base.cconvert(Ptr{Cvoid}, out)) + ptr = Base.unsafe_convert(Ptr{UInt8}, ptr) + in = Base.unsafe_convert(Ptr{UInt8}, in) + Base.unsafe_copyto!(ptr, in, sizeof(out)) + end +end + +memory_type(::UCXBuffertype) = UCX.API.UCS_MEMORY_TYPE_HOST \ No newline at end of file diff --git a/src/cuda.jl b/src/cuda.jl new file mode 100644 index 0000000..cf7e9e2 --- /dev/null +++ b/src/cuda.jl @@ -0,0 +1,30 @@ +import .CUDA + +function Base.cconvert(::Type{UCXPtr}, buf::CUDA.CuArray{T}) where T + Base.cconvert(CUDA.CuPtr{T}, buf) # returns DeviceBuffer +end + +function Base.unsafe_convert(::Type{UCXPtr}, X::CUDA.CuArray{T}) where T + reinterpret(UCXPtr, Base.unsafe_convert(CUDA.CuPtr{T}, X)) +end + +UCX.Legacy.direct(::CUDA.CuArray{T}) where T = isbitstype(T) +function UCX.Legacy.alloc_func(arg::CUDA.CuArray{T,N}) where {T,N} + shape = size(arg) + ()->CUDA.CuArray{T, N}(undef, shape) +end + +function UCX.unsafe_copyto!(out::CUDA.CuArray, in) + # XXX: It seems that these are always host buffers + inbuf = unsafe_wrap(Array, Base.unsafe_convert(Ptr{UInt8}, in), sizeof(out)) + copyto!(out, inbuf) + + # GC.@preserve out begin + # ptr = pointer(out) + # ptr = Base.unsafe_convert(CUDA.CuPtr{UInt8}, ptr) + # in = Base.reinterpret(CUDA.CuPtr{UInt8}, in) + # Base.unsafe_copyto!(ptr, in, sizeof(out)) + # end +end + +UCX.memory_type(::Union{CUDA.CuArray, CUDA.CuPtr}) = UCX.API.UCS_MEMORY_TYPE_CUDA \ No newline at end of file diff --git a/src/legacy.jl b/src/legacy.jl index c9d7242..db163f0 100644 --- a/src/legacy.jl +++ b/src/legacy.jl @@ -165,12 +165,6 @@ struct AMArgHeader alloc::Any end -function unsafe_copyto!(out, data) - ptr = Base.unsafe_convert(Ptr{eltype(out)}, data) - in = Base.unsafe_wrap(typeof(out), ptr, size(out)) - copyto!(out, in) -end - const AM_ARGUMENT = 6 function am_argument(worker, header, header_length, data, length, _param) # Very different from the other 
am endpoints. We send the type in the header @@ -190,13 +184,13 @@ function am_argument(worker, header, header_length, data, length, _param) # For small messages do a synchronous receive if length < 512 out = amarg.alloc() - unsafe_copyto!(out, data) + UCX.unsafe_copyto!(out, data) put!(Distributed.lookup_ref(amarg.rr), out) return UCX.API.UCS_OK else UCX.@spawn_showerr begin out = amarg.alloc() - unsafe_copyto!(out, data) + UCX.unsafe_copyto!(out, data) put!(Distributed.lookup_ref(amarg.rr), out) UCX.am_data_release(worker, data) end @@ -281,6 +275,7 @@ function wireup(procs=Distributed.procs()) end end end + @info "UCX Config" UCX_WORKER.context.config... end function proc_to_endpoint(p) @@ -324,12 +319,21 @@ end # TODO: # views -@inline function send_arg(pid, arg::Array{T, N}) where {T, N} +function alloc_func(arg::Array{T, N}) where {T, N} + shape = size(arg) + () -> Array{T, N}(undef, shape) +end + +function direct(::Array{T}) where T + return Base.isbitstype(T) +end +direct(::Any) = false + +@inline function send_arg(pid, arg) self = Distributed.myid() - if self != pid && Base.isbitstype(T) + if self != pid && direct(arg) rr = Distributed.RRID() - shape = size(arg) - alloc = ()->Array{T,N}(undef, shape) + alloc = alloc_func(arg) header = AMArgHeader(self, rr, alloc) ep = proc_to_endpoint(pid) @@ -346,7 +350,6 @@ end return arg end end -send_arg(pid, arg::Any) = arg abstract type UCXRemoteRef <: Distributed.AbstractRemoteRef end From 9d1d5896f0ad49a8e1c8d216041392f49270f619 Mon Sep 17 00:00:00 2001 From: Valentin Churavy Date: Thu, 25 Feb 2021 16:09:37 -0500 Subject: [PATCH 15/16] add latency measurements from satori --- .../benchmarks/distributed/latency_cuda.jl | 64 +++++++++++++++++++ .../distributed/latency_cuda_n1.csv | 24 +++++++ .../distributed/latency_cuda_n2.csv | 24 +++++++ .../benchmarks/distributed/latency_n1.csv | 24 +++++++ .../benchmarks/distributed/latency_n2.csv | 24 +++++++ examples/benchmarks/legacy/latency_cuda.jl | 3 +-
.../benchmarks/legacy/latency_cuda_n1.csv | 24 +++++++ .../benchmarks/legacy/latency_cuda_n2.csv | 24 +++++++ examples/benchmarks/legacy/latency_n1.csv | 24 +++++++ examples/benchmarks/legacy/latency_n2.csv | 24 +++++++ examples/benchmarks/satori/Project.toml | 1 + examples/benchmarks/satori/launch.sh | 40 +++--------- examples/benchmarks/satori/setup.jl | 39 ++++++----- examples/benchmarks/satori/slurm.sh | 42 ++++++++++++ 14 files changed, 334 insertions(+), 47 deletions(-) create mode 100644 examples/benchmarks/distributed/latency_cuda.jl create mode 100644 examples/benchmarks/distributed/latency_cuda_n1.csv create mode 100644 examples/benchmarks/distributed/latency_cuda_n2.csv create mode 100644 examples/benchmarks/distributed/latency_n1.csv create mode 100644 examples/benchmarks/distributed/latency_n2.csv create mode 100644 examples/benchmarks/legacy/latency_cuda_n1.csv create mode 100644 examples/benchmarks/legacy/latency_cuda_n2.csv create mode 100644 examples/benchmarks/legacy/latency_n1.csv create mode 100644 examples/benchmarks/legacy/latency_n2.csv mode change 100644 => 100755 examples/benchmarks/satori/launch.sh create mode 100644 examples/benchmarks/satori/slurm.sh diff --git a/examples/benchmarks/distributed/latency_cuda.jl b/examples/benchmarks/distributed/latency_cuda.jl new file mode 100644 index 0000000..7c5eb5d --- /dev/null +++ b/examples/benchmarks/distributed/latency_cuda.jl @@ -0,0 +1,64 @@ +@everywhere using CUDA + +include(joinpath(@__DIR__, "..", "config.jl")) + +@everywhere function target(::Any) + nothing +end + +const MAX_MESSAGE_SIZE = 1<<22 +# const MAX_MESSAGE_SIZE = 4096 +const LARGE_MESSAGE_SIZE = 8192 + +const LAT_LOOP_SMALL = 10000 +const LAT_SKIP_SMALL = 100 +const LAT_LOOP_LARGE = 1000 +const LAT_SKIP_LARGE = 10 + +function touch_data(send_buf, size) + send_buf[1:size] .= 'A' % UInt8 +end + +function benchmark() + t = Table(msg_size = Int[], latency = Float64[], kind=Symbol[]) + send_buf = CuArray{UInt8, 1}(undef, 
MAX_MESSAGE_SIZE) + + size = 1 + while size <= MAX_MESSAGE_SIZE + @info "sending" size + flush(stderr) + touch_data(send_buf, size) + + if size > LARGE_MESSAGE_SIZE + loop = LAT_LOOP_LARGE + skip = LAT_SKIP_LARGE + else + loop = LAT_LOOP_SMALL + skip = LAT_SKIP_SMALL + end + + t_start = 0 + for i in -skip:loop + if i == 1 + t_start = Base.time_ns() + end + + remotecall_wait(target, 2, view(send_buf, 1:size)) + + end + t_end = Base.time_ns() + + t_delta = t_end-t_start + t_op = t_delta / loop + + push!(t, (msg_size = size, latency = t_op, kind = :distributed)) + + size *= 2 + end + + CSV.write(joinpath(@__DIR__, "latency_cuda.csv"), t) +end + +if !isinteractive() + benchmark() +end diff --git a/examples/benchmarks/distributed/latency_cuda_n1.csv b/examples/benchmarks/distributed/latency_cuda_n1.csv new file mode 100644 index 0000000..ba5baf7 --- /dev/null +++ b/examples/benchmarks/distributed/latency_cuda_n1.csv @@ -0,0 +1,24 @@ +msg_size,latency,kind +1,194326.9893,distributed +2,258492.2302,distributed +4,163384.0434,distributed +8,181657.0711,distributed +16,161660.3484,distributed +32,170364.3234,distributed +64,183142.3,distributed +128,189219.9591,distributed +256,191429.4955,distributed +512,169848.7063,distributed +1024,178409.7519,distributed +2048,182247.2716,distributed +4096,181372.9255,distributed +8192,186916.8803,distributed +16384,227877.354,distributed +32768,262931.695,distributed +65536,299395.474,distributed +131072,400552.752,distributed +262144,726228.433,distributed +524288,1.123765e6,distributed +1048576,1.941801365e6,distributed +2097152,3.637946629e6,distributed +4194304,8.655691419e6,distributed diff --git a/examples/benchmarks/distributed/latency_cuda_n2.csv b/examples/benchmarks/distributed/latency_cuda_n2.csv new file mode 100644 index 0000000..848d8cf --- /dev/null +++ b/examples/benchmarks/distributed/latency_cuda_n2.csv @@ -0,0 +1,24 @@ +msg_size,latency,kind +1,225954.7719,distributed +2,317755.7089,distributed 
+4,216636.6364,distributed +8,215174.4908,distributed +16,215454.6626,distributed +32,258174.5308,distributed +64,216194.8179,distributed +128,231967.9421,distributed +256,225013.8801,distributed +512,236940.8486,distributed +1024,231660.4506,distributed +2048,222922.7825,distributed +4096,232425.0653,distributed +8192,246477.4929,distributed +16384,298064.405,distributed +32768,342866.96,distributed +65536,371927.251,distributed +131072,502949.961,distributed +262144,788521.308,distributed +524288,1.369897892e6,distributed +1048576,2.571240613e6,distributed +2097152,4.916306884e6,distributed +4194304,1.1224723223e7,distributed diff --git a/examples/benchmarks/distributed/latency_n1.csv b/examples/benchmarks/distributed/latency_n1.csv new file mode 100644 index 0000000..619d26e --- /dev/null +++ b/examples/benchmarks/distributed/latency_n1.csv @@ -0,0 +1,24 @@ +msg_size,latency,kind +1,96009.0072,distributed +2,97592.2913,distributed +4,104222.7204,distributed +8,96774.2828,distributed +16,106855.5027,distributed +32,113018.8739,distributed +64,109522.7947,distributed +128,110182.2696,distributed +256,159410.2898,distributed +512,128404.4605,distributed +1024,132115.1629,distributed +2048,114721.292,distributed +4096,115280.7431,distributed +8192,110971.0442,distributed +16384,169943.913,distributed +32768,123538.35,distributed +65536,148656.547,distributed +131072,139377.027,distributed +262144,157263.064,distributed +524288,196206.107,distributed +1048576,280161.416,distributed +2097152,450651.949,distributed +4194304,875355.147,distributed diff --git a/examples/benchmarks/distributed/latency_n2.csv b/examples/benchmarks/distributed/latency_n2.csv new file mode 100644 index 0000000..aba4cef --- /dev/null +++ b/examples/benchmarks/distributed/latency_n2.csv @@ -0,0 +1,24 @@ +msg_size,latency,kind +1,131763.9244,distributed +2,133011.9576,distributed +4,143194.8101,distributed +8,132870.4986,distributed +16,141274.7225,distributed +32,132614.0035,distributed 
+64,174522.3768,distributed +128,144640.3753,distributed +256,131596.4903,distributed +512,145297.8343,distributed +1024,136729.8732,distributed +2048,135132.3934,distributed +4096,156707.2372,distributed +8192,152365.9384,distributed +16384,196623.648,distributed +32768,188031.68,distributed +65536,228107.612,distributed +131072,269893.838,distributed +262144,387071.053,distributed +524288,617363.447,distributed +1048576,1.06839952e6,distributed +2097152,2.069559307e6,distributed +4194304,3.721738061e6,distributed diff --git a/examples/benchmarks/legacy/latency_cuda.jl b/examples/benchmarks/legacy/latency_cuda.jl index 0d52320..be2c553 100644 --- a/examples/benchmarks/legacy/latency_cuda.jl +++ b/examples/benchmarks/legacy/latency_cuda.jl @@ -9,6 +9,7 @@ include(joinpath(@__DIR__, "..", "config.jl")) nothing end +const MIN_MESSAGE_SIZE = 1 const MAX_MESSAGE_SIZE = 1<<22 # const MAX_MESSAGE_SIZE = 4096 const LARGE_MESSAGE_SIZE = 8192 @@ -26,7 +27,7 @@ function benchmark() t = Table(msg_size = Int[], latency = Float64[], kind=Symbol[]) send_buf = CuArray{UInt8, 1}(undef, MAX_MESSAGE_SIZE) - size = 1 + size = MIN_MESSAGE_SIZE while size <= MAX_MESSAGE_SIZE @info "sending" size flush(stderr) diff --git a/examples/benchmarks/legacy/latency_cuda_n1.csv b/examples/benchmarks/legacy/latency_cuda_n1.csv new file mode 100644 index 0000000..6e289d7 --- /dev/null +++ b/examples/benchmarks/legacy/latency_cuda_n1.csv @@ -0,0 +1,24 @@ +msg_size,latency,kind +1,375444.6842,distributed +2,180356.4877,distributed +4,190866.5862,distributed +8,189152.3119,distributed +16,184727.9465,distributed +32,225978.3438,distributed +64,188077.658,distributed +128,190211.7779,distributed +256,194756.171,distributed +512,201423.7476,distributed +1024,201161.3452,distributed +2048,201616.0648,distributed +4096,208915.9859,distributed +8192,205980.812,distributed +16384,246849.56,distributed +32768,257380.612,distributed +65536,346178.696,distributed +131072,361134.603,distributed 
+262144,559074.239,distributed +524288,847497.23,distributed +1048576,1.602522563e6,distributed +2097152,3.014586021e6,distributed +4194304,6.378893333e6,distributed diff --git a/examples/benchmarks/legacy/latency_cuda_n2.csv b/examples/benchmarks/legacy/latency_cuda_n2.csv new file mode 100644 index 0000000..e8b0136 --- /dev/null +++ b/examples/benchmarks/legacy/latency_cuda_n2.csv @@ -0,0 +1,24 @@ +msg_size,latency,kind +1,355919.3739,distributed +2,155100.8949,distributed +4,170991.3227,distributed +8,186267.0693,distributed +16,173908.5767,distributed +32,168154.5784,distributed +64,201975.2871,distributed +128,176074.8021,distributed +256,175530.2833,distributed +512,173217.0391,distributed +1024,200978.0787,distributed +2048,191857.5288,distributed +4096,187442.9623,distributed +8192,192548.356,distributed +16384,208170.155,distributed +32768,575150.29,distributed +65536,677563.087,distributed +131072,799885.68,distributed +262144,1.007147862e6,distributed +524288,1.492842212e6,distributed +1048576,2.431816274e6,distributed +2097152,4.231298012e6,distributed +4194304,9.034090592e6,distributed diff --git a/examples/benchmarks/legacy/latency_n1.csv b/examples/benchmarks/legacy/latency_n1.csv new file mode 100644 index 0000000..3f66bb6 --- /dev/null +++ b/examples/benchmarks/legacy/latency_n1.csv @@ -0,0 +1,24 @@ +msg_size,latency,kind +1,131796.25,distributed +2,134239.8956,distributed +4,129024.5421,distributed +8,133010.5551,distributed +16,148679.6656,distributed +32,144703.0073,distributed +64,136514.8048,distributed +128,121965.4292,distributed +256,152139.1169,distributed +512,144215.4731,distributed +1024,143750.7439,distributed +2048,158625.7127,distributed +4096,145384.2171,distributed +8192,135458.2598,distributed +16384,238937.911,distributed +32768,321149.099,distributed +65536,207582.956,distributed +131072,232304.223,distributed +262144,308064.234,distributed +524288,350314.565,distributed +1048576,489330.873,distributed 
+2097152,553359.073,distributed +4194304,685112.372,distributed diff --git a/examples/benchmarks/legacy/latency_n2.csv b/examples/benchmarks/legacy/latency_n2.csv new file mode 100644 index 0000000..5a84ed8 --- /dev/null +++ b/examples/benchmarks/legacy/latency_n2.csv @@ -0,0 +1,24 @@ +msg_size,latency,kind +1,144703.8142,distributed +2,144063.0384,distributed +4,133647.0162,distributed +8,142267.1021,distributed +16,145387.4857,distributed +32,152134.5153,distributed +64,156303.8734,distributed +128,130868.0653,distributed +256,147170.6158,distributed +512,149794.4918,distributed +1024,148227.5345,distributed +2048,143802.1492,distributed +4096,160296.3501,distributed +8192,156855.8773,distributed +16384,198224.901,distributed +32768,197301.937,distributed +65536,203463.3,distributed +131072,240399.663,distributed +262144,230375.214,distributed +524288,296092.0,distributed +1048576,393183.222,distributed +2097152,547875.317,distributed +4194304,836790.5,distributed diff --git a/examples/benchmarks/satori/Project.toml b/examples/benchmarks/satori/Project.toml index 7ad37b5..4c510c6 100644 --- a/examples/benchmarks/satori/Project.toml +++ b/examples/benchmarks/satori/Project.toml @@ -6,6 +6,7 @@ ClusterManagers = "34f1f09b-3a8b-5176-ab39-66d58a4d544e" Dates = "ade2ca70-3891-5945-98fb-dc099432e06a" Logging = "56ddb016-857b-54e1-b83d-db4d58db5568" LoggingExtras = "e6f89c97-d47a-5376-807f-9c37f3926c36" +PMI = "430b9b6a-ac6d-4acb-aa7c-f6c2aea375ca" PProf = "e4faabce-9ead-11e9-39d9-4379958e3056" Printf = "de0858da-6303-5e67-8744-51eddeeeb8d7" TypedTables = "9d95f2ec-7b3d-5a63-8d20-e2491e220bb9" diff --git a/examples/benchmarks/satori/launch.sh b/examples/benchmarks/satori/launch.sh old mode 100644 new mode 100755 index dab6624..485264e --- a/examples/benchmarks/satori/launch.sh +++ b/examples/benchmarks/satori/launch.sh @@ -1,35 +1,13 @@ #!/bin/bash -# Begin SLURM Directives -#SBATCH --job-name=UCX -#SBATCH --time=1:00:00 -#SBATCH --mem=0 -#SBATCH --ntasks-per-node=1 
-#SBATCH --gpus-per-node=0 -#SBATCH --cpus-per-task=1 -# Clear the environment from any previously loaded modules -module purge > /dev/null 2>&1 +# UCX_TLS=all julia -L setup.jl ../legacy/latency.jl +# UCX_TLS=tcp,self julia -L setup.jl ../legacy/latency.jl tcp +# UCX_TLS=ib,self julia -L setup.jl ../legacy/latency.jl ib +# UCX_TLS=ib,self julia -L setup.jl ../legacy/latency_cuda.jl -module add spack +export JULIA_WORKER_TIMEOUT=180 -module load julia/1.5.3 -module load cuda/10.1.243 - -spack env activate pappa -export UCX_LOG_LEVEL=debug - -export HOME2=/nobackup/users/vchuravy - -export JULIA_PROJECT=`pwd` -export JULIA_DEPOT_PATH=${HOME2}/julia_depot - -export JULIA_CUDA_USE_BINARYBUILDER=false - -julia -e 'using Pkg; pkg"instantiate"' -julia -e 'using Pkg; pkg"precompile"' - -UCX_TLS=all julia -L setup.jl ../legacy/latency.jl -UCX_TLS=tcp,self julia -L setup.jl ../legacy/latency.jl tcp -UCX_TLS=ib,self julia -L setup.jl ../legacy/latency.jl ib -UCX_TLS=ib,self julia -L setup.jl ../legacy/latency_cuda.jl -julia -L setup.jl ../distributed/latency.jl +flux mini run --nodes=1 --ntasks=2 julia -L setup.jl ../legacy/latency.jl +flux mini run --nodes=1 --ntasks=2 julia -L setup.jl ../legacy/latency_cuda.jl +flux mini run --nodes=1 --ntasks=2 julia -L setup.jl ../distributed/latency.jl +flux mini run --nodes=1 --ntasks=2 julia -L setup.jl ../distributed/latency_cuda.jl \ No newline at end of file diff --git a/examples/benchmarks/satori/setup.jl b/examples/benchmarks/satori/setup.jl index 138042f..931a608 100644 --- a/examples/benchmarks/satori/setup.jl +++ b/examples/benchmarks/satori/setup.jl @@ -1,24 +1,33 @@ using Distributed -using ClusterManagers + # Usage: # - Set `export JULIA_PROJECT=`pwd`` -if haskey(ENV, "SLURM_JOB_ID") - jobid = ENV["SLURM_JOB_ID"] - ntasks = parse(Int, ENV["SLURM_NTASKS"]) - cpus_per_task = parse(Int, ENV["SLURM_CPUS_PER_TASK"]) - @info "Running on Slurm cluster" jobid ntasks cpus_per_task - manager = SlurmManager(ntasks) -else - ntasks = 2 
- cpus_per_task = div(Sys.CPU_THREADS, ntasks) - @info "Running locally" ntasks - manager = Distributed.LocalManager(ntasks, false) -end -flush(stderr) +# using ClusterManagers +# if haskey(ENV, "SLURM_JOB_ID") +# jobid = ENV["SLURM_JOB_ID"] +# ntasks = parse(Int, ENV["SLURM_NTASKS"]) +# cpus_per_task = parse(Int, ENV["SLURM_CPUS_PER_TASK"]) +# @info "Running on Slurm cluster" jobid ntasks cpus_per_task +# manager = SlurmManager(ntasks) +# else +# ntasks = 2 +# cpus_per_task = div(Sys.CPU_THREADS, ntasks) +# @info "Running locally" ntasks +# manager = Distributed.LocalManager(ntasks, false) +# end +# flush(stderr) + +# addprocs(manager; exeflags = ["-t $cpus_per_task"]) + +@info "Using PMI" +using PMI + +include(joinpath(dirname(pathof(PMI)), "..", "examples", "distributed.jl")) +WireUp.wireup() -addprocs(manager; exeflags = ["-t $cpus_per_task"]) +@everywhere ENV["CUDA_VISIBLE_DEVICES"] = myid() @everywhere begin import Dates diff --git a/examples/benchmarks/satori/slurm.sh b/examples/benchmarks/satori/slurm.sh new file mode 100644 index 0000000..7e2316c --- /dev/null +++ b/examples/benchmarks/satori/slurm.sh @@ -0,0 +1,42 @@ +#!/bin/bash +# Begin SLURM Directives +#SBATCH --job-name=UCX +#SBATCH --time=1:00:00 +#SBATCH --mem=0 +#SBATCH --gpus-per-node=4 +#SBATCH --exclusive + +# Clear the environment from any previously loaded modules +module purge > /dev/null 2>&1 + +module add spack + +module load julia/1.5.3 +module load cuda/10.2 + +# spack install cuda@10.2 +# spack install ucx@1.10-dev +cuda +thread_multiple +logging +# spack install flux-core +cuda + +spack env activate pappa3 +# export UCX_LOG_LEVEL=debug + +export HOME2=/nobackup/users/vchuravy + +export JULIA_PROJECT=`pwd` +export JULIA_DEPOT_PATH=${HOME2}/julia_depot + +export JULIA_CUDA_USE_BINARYBUILDER=false +export JULIA_SYSTEM_UCX=1 + +julia -e 'using Pkg; pkg"instantiate"' +julia -e 'using Pkg; pkg"precompile"' + +srun -N ${SLURM_NNODES} -n ${SLURM_NNODES} --mpi=none --cpu-bind=none flux start 
./launch.sh + +# UCX_TLS=all julia -L setup.jl ../legacy/latency.jl +# # UCX_TLS=tcp,self julia -L setup.jl ../legacy/latency.jl tcp +# # UCX_TLS=ib,self julia -L setup.jl ../legacy/latency.jl ib +# UCX_TLS=all julia -L setup.jl ../legacy/latency_cuda.jl +# julia -L setup.jl ../distributed/latency.jl + From 977e68a3d22e0f1abba7fa43ef8428421bc494e2 Mon Sep 17 00:00:00 2001 From: Valentin Churavy Date: Fri, 26 Feb 2021 10:47:50 -0500 Subject: [PATCH 16/16] prettier plots --- examples/benchmarks/plot.jl | 51 ++++++++++++++++++++++++------------- 1 file changed, 34 insertions(+), 17 deletions(-) diff --git a/examples/benchmarks/plot.jl b/examples/benchmarks/plot.jl index 71bb80a..da17faa 100644 --- a/examples/benchmarks/plot.jl +++ b/examples/benchmarks/plot.jl @@ -5,11 +5,14 @@ using Printf # Latency -dist_latency = Table(CSV.File("distributed/latency.csv")) -legacy_latency = Table(CSV.File("legacy/latency_tcp.csv")) -amarg_latency = Table(CSV.File("legacy/latency_amarg.csv")) -ucx_latency = Table(CSV.File("ucx/latency_tcp.csv")) -mpi_latency = Table(CSV.File("mpi/latency.csv")) +am_n2_latency = Table(CSV.File("legacy/latency_n2.csv")) +am_n1_latency = Table(CSV.File("legacy/latency_n1.csv")) +am_gpu_n2_latency = Table(CSV.File("legacy/latency_cuda_n2.csv")) +am_gpu_n1_latency = Table(CSV.File("legacy/latency_cuda_n1.csv")) +dist_n2_latency = Table(CSV.File("distributed/latency_n2.csv")) +dist_n1_latency = Table(CSV.File("distributed/latency_n1.csv")) +dist_gpu_n2_latency = Table(CSV.File("distributed/latency_cuda_n2.csv")) +dist_gpu_n1_latency = Table(CSV.File("distributed/latency_cuda_n1.csv")) function bytes_label(bytes) bytes = 2^round(Int, bytes) # data in log space @@ -27,6 +30,7 @@ function bytes_label(bytes) end function prettytime(t) + t = exp10(t) if t < 1e3 value, units = t, "ns" elseif t < 1e6 @@ -39,20 +43,33 @@ function prettytime(t) return string(@sprintf("%.1f", value), " ", units) end -let - f = Figure(resolution = (1200, 900)) - fig = f[1, 1] = 
Axis(f, xticks = LinearTicks(16), + +function vis(f, label, am, dist) + + fig = Axis(f, xticks = LinearTicks(16), yticks=LinearTicks(10), xtickformat = ticks -> bytes_label.(ticks), ytickformat = ticks -> prettytime.(ticks)) - fig.xlabel = "Message size (bytes)" - fig.ylabel = "Latency" + fig.xlabel = "Message size (bytes) - Logscale" + fig.ylabel = "Latency - Logscale" + fig.title = label - lines!(log.(2, dist_latency.msg_size), dist_latency.latency, label = "Distributed", linewidth = 2, color=:red) - lines!(log.(2, amarg_latency.msg_size), amarg_latency.latency, label = "Distributed (UCX)", linewidth = 2, color=:cyan) - lines!(log.(2, ucx_latency.msg_size), ucx_latency.latency, label = "Raw UCX", linewidth = 2, color=:green) - lines!(log.(2, mpi_latency.msg_size), mpi_latency.latency, label = "MPI", linewidth = 2, color=:black) + lines!(log.(2, dist.msg_size), log.(10, dist.latency), label = "Distributed", linewidth = 2, color=:red) + lines!(log.(2, am.msg_size), log.(10, am.latency), label = "UCX", linewidth = 2, color=:blue) - f[1, 2] = Legend(f, fig) - f - save("latency.png", f) + axislegend(position=:lt) + fig end + +f = Figure(resolution = (2000, 1600)) +f[1, 1] = cpu1 = vis(f, "Single Node CPU Latency", am_n1_latency, dist_n1_latency) +f[2, 1] = gpu1 = vis(f, "Single Node GPU Latency", am_gpu_n1_latency, dist_gpu_n1_latency) +f[1, 2] = cpu2 = vis(f, "Two Node CPU Latency", am_n2_latency, dist_n2_latency) +f[2, 2] = gpu2 =vis(f, "Two Node GPU Latency", am_gpu_n2_latency, dist_gpu_n2_latency) + +linkyaxes!(cpu1, cpu2) +linkyaxes!(gpu1, gpu2) +linkxaxes!(cpu1, cpu2, gpu1, gpu2) + +Label(f[0, :], "Latency test on IBM Power9 & NVidia V100", textsize = 30) +f +save("latency.png", f)