Skip to content

Commit

Permalink
v0.10.5
Browse files Browse the repository at this point in the history
- Fix: when showing the progress meter, modifying `JobGroup.x` was not thread-safe.
- Fix: when showing the progress meter, CPU and MEM are no longer recomputed from `JobGroup`; they are read from the global variable `RESOURCE(cpu, mem)::Resource`, which is updated by `update_queue!()`.
  • Loading branch information
cihga39871 committed Nov 8, 2024
1 parent ff14252 commit c34c396
Show file tree
Hide file tree
Showing 5 changed files with 94 additions and 77 deletions.
2 changes: 1 addition & 1 deletion Project.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
name = "JobSchedulers"
uuid = "eeff360b-c02d-44d3-ab26-4013c616a17e"
authors = ["Jiacheng Chuan <[email protected]>"]
version = "0.10.4"
version = "0.10.5"

[deps]
DataStructures = "864edb3b-99cc-5e75-8d2d-829cb0a9cfe8"
Expand Down
5 changes: 5 additions & 0 deletions docs/src/changelog.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
# Changelog

v0.10.5

- Fix: when showing the progress meter, modifying `JobGroup.x` was not thread-safe.
- Fix: when showing the progress meter, CPU and MEM are no longer recomputed from `JobGroup`; they are read from the global variable `RESOURCE(cpu, mem)::Resource`, which is updated by `update_queue!()`.

v0.10.4

- Optimize: remove extra `scheduler_status` check in `queue_progress(...)`.
Expand Down
1 change: 1 addition & 0 deletions src/JobQueue.jl
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,7 @@ function update_queue!()

# update running: update job states, cancel jobs that reached their wall time, move finished jobs from running to the other queues, add the next recurrence of each successfully finished job to the future queue, and compute the current resource usage.
used_ncpu, used_mem = update_running!(current)
update_resource(used_ncpu, used_mem)

free_ncpu = SCHEDULER_MAX_CPU - used_ncpu
free_mem = SCHEDULER_MAX_MEM - used_mem
Expand Down
121 changes: 72 additions & 49 deletions src/progress_computing.jl
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,17 @@ const BAR_LEFT = "▕"
const BAR_RIGHT = ""
const BLOCK = ""

CPU_RUNNING::Float64 = 0
MEM_RUNNING::Int = 0
"""
    Resource(cpu, mem)

Mutable snapshot of the resources currently in use: `cpu` is stored as a
`Float64` and `mem` as an `Int`.
"""
mutable struct Resource
    cpu::Float64
    mem::Int
end

# Single shared snapshot; it starts at zero usage and is overwritten in place
# by `update_resource`.
const RESOURCE = Resource(0.0, 0)

"""
    update_resource(cpu::Real, mem::Int)

Store `cpu` and `mem` in the global `RESOURCE` snapshot. `cpu` is converted to
`Float64` on assignment. Returns `mem`.
"""
function update_resource(cpu::Real, mem::Int)
    # `RESOURCE` is a `const` binding whose fields are mutated, so no `global`
    # declaration is needed here.
    RESOURCE.cpu = cpu
    RESOURCE.mem = mem
    return mem
end

"""
GROUP_SEPERATOR::Regex = r": *"
Expand Down Expand Up @@ -49,21 +58,23 @@ end
"""
mutable struct JobGroup
name::String
total::Int
queuing::Int
running::Int
done::Int
failed::Int
cancelled::Int
@atomic total::Int
@atomic queuing::Int
@atomic running::Int
@atomic done::Int
@atomic failed::Int
@atomic cancelled::Int
jobs::Set{Job} # running jobs only
lock_jobs::ReentrantLock # lock when updating/query jobs
end

function JobGroup(group_name::AbstractString)
JobGroup(group_name, 0, 0, 0, 0, 0, 0, Set{Job}())
JobGroup(group_name, 0, 0, 0, 0, 0, 0, Set{Job}(), ReentrantLock())
end

const ALL_JOB_GROUP = JobGroup("ALL JOBS")
const JOB_GROUPS = OrderedDict{String, JobGroup}()
const JOB_GROUPS_LOCK = ReentrantLock()
const OTHER_JOB_GROUP = JobGroup("OTHERS")

"""
Expand All @@ -78,41 +89,45 @@ function update_group_state!(job::Job)
# initial job
job._group = get_group(job.name)

if haskey(JOB_GROUPS, job._group)
jg = JOB_GROUPS[job._group]
else
jg = JobGroup(job._group)
JOB_GROUPS[job._group] = jg
jg = lock(JOB_GROUPS_LOCK) do
if haskey(JOB_GROUPS, job._group)
jg = JOB_GROUPS[job._group]
else
jg = JobGroup(job._group)
JOB_GROUPS[job._group] = jg
end
end

jg.total += 1
ALL_JOB_GROUP.total += 1
@atomic jg.total += 1
@atomic ALL_JOB_GROUP.total += 1
else
jg = JOB_GROUPS[job._group]
jg = lock(JOB_GROUPS_LOCK) do
JOB_GROUPS[job._group]
end
# group state before updating
setfield!(jg, job._group_state, getfield(jg, job._group_state) - 1)
setfield!(ALL_JOB_GROUP, job._group_state, getfield(ALL_JOB_GROUP, job._group_state) - 1)
modifyfield!(jg, job._group_state, -, 1, :sequentially_consistent)
modifyfield!(ALL_JOB_GROUP, job._group_state, -, 1, :sequentially_consistent)
end

# group state before updating
if job._group_state === RUNNING # previous running, not current
try
pop!(jg.jobs, job)
lock(jg.lock_jobs) do
pop!(jg.jobs, job)
end
catch
end
global CPU_RUNNING -= job.ncpu
global MEM_RUNNING -= job.mem
end

# updated group state
job._group_state = job.state
setfield!(jg, job._group_state, getfield(jg, job._group_state) + 1)
setfield!(ALL_JOB_GROUP, job._group_state, getfield(ALL_JOB_GROUP, job._group_state) + 1)
modifyfield!(jg, job._group_state, +, 1, :sequentially_consistent)
modifyfield!(ALL_JOB_GROUP, job._group_state, +, 1, :sequentially_consistent)

if job._group_state === RUNNING # current running
push!(jg.jobs, job)
global CPU_RUNNING += job.ncpu
global MEM_RUNNING += job.mem
lock(jg.lock_jobs) do
push!(jg.jobs, job)
end
end

nothing
Expand All @@ -125,7 +140,9 @@ Prepare group state for existing jobs
"""
function init_group_state!()
clear_job_group!(ALL_JOB_GROUP)
empty!(JOB_GROUPS)
lock(JOB_GROUPS_LOCK) do
empty!(JOB_GROUPS)
end
# clear_job_group!(OTHER_JOB_GROUP) # no need to init, will compute later anyway

lock(JOB_QUEUE.lock_queuing) do
Expand All @@ -152,13 +169,15 @@ function init_group_state!(job::Job)
end

function clear_job_group!(g::JobGroup)
g.total = 0
g.queuing = 0
g.running = 0
g.done = 0
g.failed = 0
g.cancelled = 0
empty!(g.jobs)
@atomic g.total = 0
@atomic g.queuing = 0
@atomic g.running = 0
@atomic g.done = 0
@atomic g.failed = 0
@atomic g.cancelled = 0
lock(g.lock_jobs) do
empty!(g.jobs)
end
end

"""
Expand All @@ -178,20 +197,22 @@ function get_group(name::AbstractString, group_seperator = GROUP_SEPERATOR)
end

function compute_other_job_group!(groups_shown::Vector{JobGroup})
OTHER_JOB_GROUP.total = ALL_JOB_GROUP.total
OTHER_JOB_GROUP.queuing = ALL_JOB_GROUP.queuing
OTHER_JOB_GROUP.running = ALL_JOB_GROUP.running
OTHER_JOB_GROUP.done = ALL_JOB_GROUP.done
OTHER_JOB_GROUP.failed = ALL_JOB_GROUP.failed
OTHER_JOB_GROUP.cancelled = ALL_JOB_GROUP.cancelled
empty!(OTHER_JOB_GROUP.jobs)
@atomic OTHER_JOB_GROUP.total = ALL_JOB_GROUP.total
@atomic OTHER_JOB_GROUP.queuing = ALL_JOB_GROUP.queuing
@atomic OTHER_JOB_GROUP.running = ALL_JOB_GROUP.running
@atomic OTHER_JOB_GROUP.done = ALL_JOB_GROUP.done
@atomic OTHER_JOB_GROUP.failed = ALL_JOB_GROUP.failed
@atomic OTHER_JOB_GROUP.cancelled = ALL_JOB_GROUP.cancelled
lock(OTHER_JOB_GROUP.lock_jobs) do
empty!(OTHER_JOB_GROUP.jobs)
end
for g in groups_shown
OTHER_JOB_GROUP.total -= g.total
OTHER_JOB_GROUP.queuing -= g.queuing
OTHER_JOB_GROUP.running -= g.running
OTHER_JOB_GROUP.done -= g.done
OTHER_JOB_GROUP.failed -= g.failed
OTHER_JOB_GROUP.cancelled -= g.cancelled
@atomic OTHER_JOB_GROUP.total -= g.total
@atomic OTHER_JOB_GROUP.queuing -= g.queuing
@atomic OTHER_JOB_GROUP.running -= g.running
@atomic OTHER_JOB_GROUP.done -= g.done
@atomic OTHER_JOB_GROUP.failed -= g.failed
@atomic OTHER_JOB_GROUP.cancelled -= g.cancelled
end
if OTHER_JOB_GROUP.running > 0
# find one that is running
Expand All @@ -201,7 +222,9 @@ function compute_other_job_group!(groups_shown::Vector{JobGroup})
if job._group in shown_group_names
continue
end
push!(OTHER_JOB_GROUP.jobs, job)
lock(OTHER_JOB_GROUP.lock_jobs) do
push!(OTHER_JOB_GROUP.jobs, job)
end
break
end
end
Expand Down
42 changes: 15 additions & 27 deletions src/progress_view.jl
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@


# https://gist.github.com/ConnerWill/d4b6c776b509add763e17f9f113fd25b#erase-functions
erase_rest_line = Terming.CSI * "0K"
erase_current_line = Terming.CSI * "2K"
const erase_rest_line = Terming.CSI * "0K"
const erase_current_line = Terming.CSI * "2K"

"""
progress_bar(percent::Float64, width::Int = 20)
Expand Down Expand Up @@ -100,22 +100,16 @@ function queue_progress(;remove_tmp_files::Bool = true, kwargs...)
stderr_tmp_file = joinpath(homedir(), "julia_$(now_str).err")
stderr_tmp = open(stderr_tmp_file, "w+")

# stdlog_tmp_file = joinpath(homedir(), "julia_$(now_str).log")
# stdlog_tmp_io = open(stdlog_tmp_file, "w+")
# stdlog_tmp = Logging.SimpleLogger(stdlog_tmp_io)

try
queue_progress(stdout_tmp, stderr_tmp; kwargs...)
catch
rethrow()
finally
close(stdout_tmp)
close(stderr_tmp)
# close(stdlog_tmp_io)
if remove_tmp_files
rm(stdout_tmp_file)
rm(stderr_tmp_file)
# rm(stdlog_tmp_file)
end
end
end
Expand Down Expand Up @@ -307,7 +301,7 @@ function style_line(line::String, log_style::Symbol)
end

function view_update_resources(h::Int, w::Int; row::Int = 2, max_cpu::Int = JobSchedulers.SCHEDULER_MAX_CPU, max_mem::Int = JobSchedulers.SCHEDULER_MAX_MEM, is_in_terminal::Bool = true)

global RESOURCE
if h - row < 5
# no render: height not enough
return row
Expand All @@ -316,19 +310,19 @@ function view_update_resources(h::Int, w::Int; row::Int = 2, max_cpu::Int = JobS
title = is_in_terminal ? @bold("CURRENT RESOURCES:") : "CURRENT RESOURCES:"

cpu_text = (" CPU: ")
cpu_val = "$CPU_RUNNING/$max_cpu"
cpu_val = "$(RESOURCE.cpu)/$max_cpu"
cpu_width = 9 + length(cpu_val)
if CPU_RUNNING < max_cpu
if RESOURCE.cpu < max_cpu
cpu_text *= is_in_terminal ? @green(cpu_val) : cpu_val
else
cpu_text *= is_in_terminal ? @yellow(cpu_val) : cpu_val
end


mem_text = (" MEM: ")
mem_percent = @sprintf("%3.2f%%", MEM_RUNNING / max_mem * 100)
mem_percent = @sprintf("%3.2f%%", RESOURCE.mem / max_mem * 100)
mem_width = 9 + length(mem_percent)
if MEM_RUNNING < max_mem
if RESOURCE.mem < max_mem
mem_text *= is_in_terminal ? @green(mem_percent) : mem_percent
else
mem_text *= is_in_terminal ? @yellow(mem_percent) : mem_percent
Expand All @@ -353,23 +347,15 @@ end

function view_update_job_group_title(h::Int, w::Int; row::Int = 2, is_in_terminal::Bool = true)

# description_plain = "[$(ALL_JOB_GROUP.running) running, $(ALL_JOB_GROUP.failed) failed + $(ALL_JOB_GROUP.cancelled) cancelled, $(ALL_JOB_GROUP.done) done / $(ALL_JOB_GROUP.total) total]"

if is_in_terminal
title = @bold("JOB PROGRESS:")
# description = "[" * @green("$(ALL_JOB_GROUP.running) running") * ", " *
# @red("$(ALL_JOB_GROUP.failed) failed") *
# @yellow(" + $(ALL_JOB_GROUP.cancelled) cancelled") * ", " *
# "$(ALL_JOB_GROUP.done) done / " *
# @bold("$(ALL_JOB_GROUP.total) total") * "]"
description = @dim("[") * @green("running") * @dim(", ") *
@red("failed") *
@yellow("+cancelled") * @dim(", ") *
"done/" *
@bold("total") * @dim("]")
else
title = "JOB PROGRESS:"
# description = "[$(ALL_JOB_GROUP.running) running, $(ALL_JOB_GROUP.failed) failed + $(ALL_JOB_GROUP.cancelled) cancelled, $(ALL_JOB_GROUP.done) done / $(ALL_JOB_GROUP.total) total]"
description = "[running, failed+cancelled, done/total]"
end
width_description = 43 # 4 + length(description_plain)
Expand Down Expand Up @@ -592,13 +578,15 @@ function view_update(h, w; row = 1, groups_shown::Vector{JobGroup} = JobGroup[],
row = view_update_job_group(h, w; row = row, job_group = ALL_JOB_GROUP, highlight = true, is_in_terminal = is_in_terminal, group_seperator_at_begining = group_seperator_at_begining)

# specific job groups
for job_group in values(JOB_GROUPS)
job_group.total < 2 && continue
if row >= h - 1
break
lock(JOB_GROUPS_LOCK) do
for job_group in values(JOB_GROUPS)
job_group.total < 2 && continue
if row >= h - 1
break
end
row = view_update_job_group(h, w; row = row, job_group = job_group, is_in_terminal = is_in_terminal, group_seperator_at_begining = group_seperator_at_begining)
push!(groups_shown, job_group)
end
row = view_update_job_group(h, w; row = row, job_group = job_group, is_in_terminal = is_in_terminal, group_seperator_at_begining = group_seperator_at_begining)
push!(groups_shown, job_group)
end

compute_other_job_group!(groups_shown)
Expand Down

0 comments on commit c34c396

Please sign in to comment.