From c34c396079442579ef95839de66c3b948934cedb Mon Sep 17 00:00:00 2001 From: Jiacheng Chuan Date: Fri, 8 Nov 2024 17:22:38 -0400 Subject: [PATCH] v0.10.5 - Fix: when showing progress meter, changing `JobGroup.x` was not thread safe. # - Fix: when showing progress meter, now CPU and MEM was not computed again from JobGroup, but just fetch from a global variable `RESOURCE(cpu, mem)::Resource`. `RESOURCE` is computed when `update_queue!()` --- Project.toml | 2 +- docs/src/changelog.md | 5 ++ src/JobQueue.jl | 1 + src/progress_computing.jl | 121 +++++++++++++++++++++++--------------- src/progress_view.jl | 42 +++++-------- 5 files changed, 94 insertions(+), 77 deletions(-) diff --git a/Project.toml b/Project.toml index cefc8c1..a72cf54 100644 --- a/Project.toml +++ b/Project.toml @@ -1,7 +1,7 @@ name = "JobSchedulers" uuid = "eeff360b-c02d-44d3-ab26-4013c616a17e" authors = ["Jiacheng Chuan "] -version = "0.10.4" +version = "0.10.5" [deps] DataStructures = "864edb3b-99cc-5e75-8d2d-829cb0a9cfe8" diff --git a/docs/src/changelog.md b/docs/src/changelog.md index 3ccb4a3..2ab4864 100644 --- a/docs/src/changelog.md +++ b/docs/src/changelog.md @@ -1,5 +1,10 @@ # Changelog +v0.10.5 + +- Fix: when showing progress meter, changing `JobGroup.x` was not thread safe. +- Fix: when showing progress meter, now CPU and MEM was not computed again from JobGroup, but just fetch from a global variable `RESOURCE(cpu, mem)::Resource`. `RESOURCE` is computed when `update_queue!()` + v0.10.4 - Optimize: remove extra `scheduler_status` check in `queue_progress(...)`. diff --git a/src/JobQueue.jl b/src/JobQueue.jl index 5436a6e..84b4be6 100644 --- a/src/JobQueue.jl +++ b/src/JobQueue.jl @@ -165,6 +165,7 @@ function update_queue!() # update running: update state, cancel jobs reaching wall time, moving finished from running to others, add next recur of successfully finished job to future queue, and compute current usage. used_ncpu, used_mem = update_running!(current) + update_resource(used_ncpu, used_mem) free_ncpu = SCHEDULER_MAX_CPU - used_ncpu free_mem = SCHEDULER_MAX_MEM - used_mem diff --git a/src/progress_computing.jl b/src/progress_computing.jl index b798e3c..43f484f 100644 --- a/src/progress_computing.jl +++ b/src/progress_computing.jl @@ -10,8 +10,17 @@ const BAR_LEFT = "▕" const BAR_RIGHT = "▎" const BLOCK = "█" -CPU_RUNNING::Float64 = 0 -MEM_RUNNING::Int = 0 +mutable struct Resource + cpu::Float64 + mem::Int +end +const RESOURCE = Resource(0,0) + +function update_resource(cpu::Real, mem::Int) + global RESOURCE + RESOURCE.cpu = cpu + RESOURCE.mem = mem +end """ GROUP_SEPERATOR::Regex = r": *" @@ -49,21 +58,23 @@ end """ mutable struct JobGroup name::String - total::Int - queuing::Int - running::Int - done::Int - failed::Int - cancelled::Int + @atomic total::Int + @atomic queuing::Int + @atomic running::Int + @atomic done::Int + @atomic failed::Int + @atomic cancelled::Int jobs::Set{Job} # running jobs only + lock_jobs::ReentrantLock # lock when updating/query jobs end function JobGroup(group_name::AbstractString) - JobGroup(group_name, 0, 0, 0, 0, 0, 0, Set{Job}()) + JobGroup(group_name, 0, 0, 0, 0, 0, 0, Set{Job}(), ReentrantLock()) end const ALL_JOB_GROUP = JobGroup("ALL JOBS") const JOB_GROUPS = OrderedDict{String, JobGroup}() +const JOB_GROUPS_LOCK = ReentrantLock() const OTHER_JOB_GROUP = JobGroup("OTHERS") """ @@ -78,41 +89,45 @@ function update_group_state!(job::Job) # initial job job._group = get_group(job.name) - if haskey(JOB_GROUPS, job._group) - jg = JOB_GROUPS[job._group] - else - jg = JobGroup(job._group) - JOB_GROUPS[job._group] = jg + jg = lock(JOB_GROUPS_LOCK) do + if haskey(JOB_GROUPS, job._group) + jg = JOB_GROUPS[job._group] + else + jg = JobGroup(job._group) + JOB_GROUPS[job._group] = jg + end end - jg.total += 1 - ALL_JOB_GROUP.total += 1 + @atomic jg.total += 1 + @atomic ALL_JOB_GROUP.total += 1 else - jg = JOB_GROUPS[job._group] + jg = lock(JOB_GROUPS_LOCK) do + JOB_GROUPS[job._group] + end # group state before updating - setfield!(jg, job._group_state, getfield(jg, job._group_state) - 1) - setfield!(ALL_JOB_GROUP, job._group_state, getfield(ALL_JOB_GROUP, job._group_state) - 1) + modifyfield!(jg, job._group_state, -, 1, :sequentially_consistent) + modifyfield!(ALL_JOB_GROUP, job._group_state, -, 1, :sequentially_consistent) end # group state before updating if job._group_state === RUNNING # previous running, not current try - pop!(jg.jobs, job) + lock(jg.lock_jobs) do + pop!(jg.jobs, job) + end catch end - global CPU_RUNNING -= job.ncpu - global MEM_RUNNING -= job.mem end # updated group state job._group_state = job.state - setfield!(jg, job._group_state, getfield(jg, job._group_state) + 1) - setfield!(ALL_JOB_GROUP, job._group_state, getfield(ALL_JOB_GROUP, job._group_state) + 1) + modifyfield!(jg, job._group_state, +, 1, :sequentially_consistent) + modifyfield!(ALL_JOB_GROUP, job._group_state, +, 1, :sequentially_consistent) if job._group_state === RUNNING # current running - push!(jg.jobs, job) - global CPU_RUNNING += job.ncpu - global MEM_RUNNING += job.mem + lock(jg.lock_jobs) do + push!(jg.jobs, job) + end end nothing @@ -125,7 +140,9 @@ Prepare group state for existing jobs """ function init_group_state!() clear_job_group!(ALL_JOB_GROUP) - empty!(JOB_GROUPS) + lock(JOB_GROUPS_LOCK) do + empty!(JOB_GROUPS) + end # clear_job_group!(OTHER_JOB_GROUP) # no need to init, will compute later anyway lock(JOB_QUEUE.lock_queuing) do @@ -152,13 +169,15 @@ function init_group_state!(job::Job) end function clear_job_group!(g::JobGroup) - g.total = 0 - g.queuing = 0 - g.running = 0 - g.done = 0 - g.failed = 0 - g.cancelled = 0 - empty!(g.jobs) + @atomic g.total = 0 + @atomic g.queuing = 0 + @atomic g.running = 0 + @atomic g.done = 0 + @atomic g.failed = 0 + @atomic g.cancelled = 0 + lock(g.lock_jobs) do + empty!(g.jobs) + end end """ @@ -178,20 +197,22 @@ function get_group(name::AbstractString, group_seperator = GROUP_SEPERATOR) end function compute_other_job_group!(groups_shown::Vector{JobGroup}) - OTHER_JOB_GROUP.total = ALL_JOB_GROUP.total - OTHER_JOB_GROUP.queuing = ALL_JOB_GROUP.queuing - OTHER_JOB_GROUP.running = ALL_JOB_GROUP.running - OTHER_JOB_GROUP.done = ALL_JOB_GROUP.done - OTHER_JOB_GROUP.failed = ALL_JOB_GROUP.failed - OTHER_JOB_GROUP.cancelled = ALL_JOB_GROUP.cancelled - empty!(OTHER_JOB_GROUP.jobs) + @atomic OTHER_JOB_GROUP.total = ALL_JOB_GROUP.total + @atomic OTHER_JOB_GROUP.queuing = ALL_JOB_GROUP.queuing + @atomic OTHER_JOB_GROUP.running = ALL_JOB_GROUP.running + @atomic OTHER_JOB_GROUP.done = ALL_JOB_GROUP.done + @atomic OTHER_JOB_GROUP.failed = ALL_JOB_GROUP.failed + @atomic OTHER_JOB_GROUP.cancelled = ALL_JOB_GROUP.cancelled + lock(OTHER_JOB_GROUP.lock_jobs) do + empty!(OTHER_JOB_GROUP.jobs) + end for g in groups_shown - OTHER_JOB_GROUP.total -= g.total - OTHER_JOB_GROUP.queuing -= g.queuing - OTHER_JOB_GROUP.running -= g.running - OTHER_JOB_GROUP.done -= g.done - OTHER_JOB_GROUP.failed -= g.failed - OTHER_JOB_GROUP.cancelled -= g.cancelled + @atomic OTHER_JOB_GROUP.total -= g.total + @atomic OTHER_JOB_GROUP.queuing -= g.queuing + @atomic OTHER_JOB_GROUP.running -= g.running + @atomic OTHER_JOB_GROUP.done -= g.done + @atomic OTHER_JOB_GROUP.failed -= g.failed + @atomic OTHER_JOB_GROUP.cancelled -= g.cancelled end if OTHER_JOB_GROUP.running > 0 # find one that is running @@ -201,7 +222,9 @@ function compute_other_job_group!(groups_shown::Vector{JobGroup}) if job._group in shown_group_names continue end - push!(OTHER_JOB_GROUP.jobs, job) + lock(OTHER_JOB_GROUP.lock_jobs) do + push!(OTHER_JOB_GROUP.jobs, job) + end break end end diff --git a/src/progress_view.jl b/src/progress_view.jl index 8356838..56e4d0e 100644 --- a/src/progress_view.jl +++ b/src/progress_view.jl @@ -1,8 +1,8 @@ # https://gist.github.com/ConnerWill/d4b6c776b509add763e17f9f113fd25b#erase-functions -erase_rest_line = Terming.CSI * "0K" -erase_current_line = Terming.CSI * "2K" +const erase_rest_line = Terming.CSI * "0K" +const erase_current_line = Terming.CSI * "2K" """ progress_bar(percent::Float64, width::Int = 20) @@ -100,10 +100,6 @@ function queue_progress(;remove_tmp_files::Bool = true, kwargs...) stderr_tmp_file = joinpath(homedir(), "julia_$(now_str).err") stderr_tmp = open(stderr_tmp_file, "w+") - # stdlog_tmp_file = joinpath(homedir(), "julia_$(now_str).log") - # stdlog_tmp_io = open(stdlog_tmp_file, "w+") - # stdlog_tmp = Logging.SimpleLogger(stdlog_tmp_io) - try queue_progress(stdout_tmp, stderr_tmp; kwargs...) catch @@ -111,11 +107,9 @@ function queue_progress(;remove_tmp_files::Bool = true, kwargs...) finally close(stdout_tmp) close(stderr_tmp) - # close(stdlog_tmp_io) if remove_tmp_files rm(stdout_tmp_file) rm(stderr_tmp_file) - # rm(stdlog_tmp_file) end end end @@ -307,7 +301,7 @@ function style_line(line::String, log_style::Symbol) end function view_update_resources(h::Int, w::Int; row::Int = 2, max_cpu::Int = JobSchedulers.SCHEDULER_MAX_CPU, max_mem::Int = JobSchedulers.SCHEDULER_MAX_MEM, is_in_terminal::Bool = true) - + global RESOURCE if h - row < 5 # no render: height not enough return row @@ -316,9 +310,9 @@ function view_update_resources(h::Int, w::Int; row::Int = 2, max_cpu::Int = JobS title = is_in_terminal ? @bold("CURRENT RESOURCES:") : "CURRENT RESOURCES:" cpu_text = (" CPU: ") - cpu_val = "$CPU_RUNNING/$max_cpu" + cpu_val = "$(RESOURCE.cpu)/$max_cpu" cpu_width = 9 + length(cpu_val) - if CPU_RUNNING < max_cpu + if RESOURCE.cpu < max_cpu cpu_text *= is_in_terminal ? @green(cpu_val) : cpu_val else cpu_text *= is_in_terminal ? @yellow(cpu_val) : cpu_val @@ -326,9 +320,9 @@ function view_update_resources(h::Int, w::Int; row::Int = 2, max_cpu::Int = JobS mem_text = (" MEM: ") - mem_percent = @sprintf("%3.2f%%", MEM_RUNNING / max_mem * 100) + mem_percent = @sprintf("%3.2f%%", RESOURCE.mem / max_mem * 100) mem_width = 9 + length(mem_percent) - if MEM_RUNNING < max_mem + if RESOURCE.mem < max_mem mem_text *= is_in_terminal ? @green(mem_percent) : mem_percent else mem_text *= is_in_terminal ? @yellow(mem_percent) : mem_percent @@ -353,15 +347,8 @@ end function view_update_job_group_title(h::Int, w::Int; row::Int = 2, is_in_terminal::Bool = true) - # description_plain = "[$(ALL_JOB_GROUP.running) running, $(ALL_JOB_GROUP.failed) failed + $(ALL_JOB_GROUP.cancelled) cancelled, $(ALL_JOB_GROUP.done) done / $(ALL_JOB_GROUP.total) total]" - if is_in_terminal title = @bold("JOB PROGRESS:") - # description = "[" * @green("$(ALL_JOB_GROUP.running) running") * ", " * - # @red("$(ALL_JOB_GROUP.failed) failed") * - # @yellow(" + $(ALL_JOB_GROUP.cancelled) cancelled") * ", " * - # "$(ALL_JOB_GROUP.done) done / " * - # @bold("$(ALL_JOB_GROUP.total) total") * "]" description = @dim("[") * @green("running") * @dim(", ") * @red("failed") * @yellow("+cancelled") * @dim(", ") * @@ -369,7 +356,6 @@ function view_update_job_group_title(h::Int, w::Int; row::Int = 2, is_in_termina @bold("total") * @dim("]") else title = "JOB PROGRESS:" - # description = "[$(ALL_JOB_GROUP.running) running, $(ALL_JOB_GROUP.failed) failed + $(ALL_JOB_GROUP.cancelled) cancelled, $(ALL_JOB_GROUP.done) done / $(ALL_JOB_GROUP.total) total]" description = "[running, failed+cancelled, done/total]" end width_description = 43 # 4 + length(description_plain) @@ -592,13 +578,15 @@ function view_update(h, w; row = 1, groups_shown::Vector{JobGroup} = JobGroup[], row = view_update_job_group(h, w; row = row, job_group = ALL_JOB_GROUP, highlight = true, is_in_terminal = is_in_terminal, group_seperator_at_begining = group_seperator_at_begining) # specific job groups - for job_group in values(JOB_GROUPS) - job_group.total < 2 && continue - if row >= h - 1 - break + lock(JOB_GROUPS_LOCK) do + for job_group in values(JOB_GROUPS) + job_group.total < 2 && continue + if row >= h - 1 + break + end + row = view_update_job_group(h, w; row = row, job_group = job_group, is_in_terminal = is_in_terminal, group_seperator_at_begining = group_seperator_at_begining) + push!(groups_shown, job_group) end - row = view_update_job_group(h, w; row = row, job_group = job_group, is_in_terminal = is_in_terminal, group_seperator_at_begining = group_seperator_at_begining) - push!(groups_shown, job_group) end compute_other_job_group!(groups_shown)