Skip to content

Commit

Permalink
v0.11.1: compat: 32bit system
Browse files Browse the repository at this point in the history
  • Loading branch information
cihga39871 committed Jan 11, 2025
2 parents b4f3a40 + 3a60eae commit a11ba98
Show file tree
Hide file tree
Showing 16 changed files with 90 additions and 104 deletions.
2 changes: 1 addition & 1 deletion Project.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
name = "JobSchedulers"
uuid = "eeff360b-c02d-44d3-ab26-4013c616a17e"
authors = ["Jiacheng Chuan <[email protected]>"]
version = "0.11.0"
version = "0.11.1"

[deps]
DataStructures = "864edb3b-99cc-5e75-8d2d-829cb0a9cfe8"
Expand Down
4 changes: 4 additions & 0 deletions docs/src/changelog.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
# Changelog

v0.11.1

- Compat: 32-bit system.

v0.11.0

- Fix and compat: `Cron` has been rewritten based on the standard crontab, including its bug described [here](https://crontab.guru/cron-bug.html).
Expand Down
8 changes: 4 additions & 4 deletions src/JobQueue.jl
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ function n_job_remaining()
n
end

function are_remaining_jobs_more_than(x::Int)
function are_remaining_jobs_more_than(x::Integer)
n = length(JOB_QUEUE.running)
n > x && return true

Expand Down Expand Up @@ -182,13 +182,13 @@ end
update running: update state, cancel jobs reaching wall time, moving finished from running to others
"""
function update_running!(current::DateTime)
isempty(JOB_QUEUE.running) && return (0.0, 0)
isempty(JOB_QUEUE.running) && return (0.0, Int64(0))

@debug "update_running! lock_running"
used_ncpu, used_mem = lock(JOB_QUEUE.lock_running) do
id_delete = Int[]
used_ncpu = 0.0
used_mem = 0
used_mem = Int64(0)
for (i, job) in enumerate(JOB_QUEUE.running)

if job.state === CANCELLED
Expand Down Expand Up @@ -282,7 +282,7 @@ function move_future_to_queuing(current::DateTime)
@debug "move_future_to_queuing lock_queuing ok"
end

function run_queuing!(current::DateTime, free_ncpu::Float64, free_mem::Int)
function run_queuing!(current::DateTime, free_ncpu::Real, free_mem::Int64)
@debug "run_queuing! lock_queuing"
lock(JOB_QUEUE.lock_queuing)
try
Expand Down
4 changes: 2 additions & 2 deletions src/JobSchedulers.jl
Original file line number Diff line number Diff line change
Expand Up @@ -85,8 +85,8 @@ function __init__()

# SCHEDULER_MAX_CPU must be the same as THREAD_POOL (if nthreads > 1), or the scheduler will stop.
global SCHEDULER_MAX_CPU = default_ncpu()
global SCHEDULER_MAX_MEM = round(Int, Sys.total_memory() * 0.9)
global SCHEDULER_UPDATE_SECOND = ifelse(nthreads() > 1, 0.01, 0.05)
global SCHEDULER_MAX_MEM = round(Int64, Sys.total_memory() * 0.9)
global SCHEDULER_UPDATE_SECOND = Float64(ifelse(nthreads() > 1, 0.01, 0.05))
scheduler_start(verbose=false)
end

Expand Down
2 changes: 1 addition & 1 deletion src/backup.jl
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ function recover_backup(filepath::AbstractString; recover_settings::Bool = true,
@debug "recover_backup($filepath; recover_settings = $recover_settings, recover_queue = $recover_queue)"

# get the current jobs, to avoid copying existing jobs
current_job_ids = Set{Int}()
current_job_ids = Set{Int64}()
lock(JOB_QUEUE.lock_queuing) do
for jobs in values(JOB_QUEUE.queuing)
for job in jobs
Expand Down
14 changes: 7 additions & 7 deletions src/bit.jl
Original file line number Diff line number Diff line change
@@ -1,24 +1,24 @@

## Indexing

@inline function unsafe_bitgetindex(uint::UInt64, i::Int)
@inline function unsafe_bitgetindex(uint::UInt64, i::Integer)
u = UInt64(1) << i
r = (uint & u) != 0
return r
end

function bitcheckbounds(uint::Unsigned, i::Int)
function bitcheckbounds(uint::Unsigned, i::Integer)
if i < 0 || i > (sizeof(uint) * 8 - 1)
error("BoundsError: attempt to access bits of $(typeof(uint)) at index [$i]")
end
end

@inline function bitgetindex(uint::UInt64, i::Int)
@inline function bitgetindex(uint::UInt64, i::Integer)
@boundscheck bitcheckbounds(uint, i)
unsafe_bitgetindex(uint, i)
end

function unsafe_bitfindnext(uint::UInt64, start::Int)
function unsafe_bitfindnext(uint::UInt64, start::Integer)
mask = 0xffffffffffffffff << start
if uint & mask != 0
return trailing_zeros(uint & mask)
Expand All @@ -43,7 +43,7 @@ end
Returns the index of the next true element in the `range` of `uint`, or nothing if all false. Index of unit starts from 0.
"""
function bitfindnext(uint::UInt64, start::Integer, r::UnitRange{Int64}; not_found = nothing)
function bitfindnext(uint::UInt64, start::Integer, r::UnitRange{<:Integer}; not_found = nothing)
start = Int(start)
if start == r.stop + 1 # carry (in math)
start = r.start
Expand Down Expand Up @@ -75,8 +75,8 @@ function bitfindnext(uint::UInt64, start::Integer, r::UnitRange{Int64}; not_foun
end


function bitsfind(uint::UInt64, r::UnitRange{Int64}; empty_add_0::Bool = false)
res = Vector{Int64}()
function bitsfind(uint::UInt64, r::UnitRange{<:Integer}; empty_add_0::Bool = false)
res = Vector{Int}()
for i in r
if bitgetindex(uint, i)
push!(res, i)
Expand Down
6 changes: 3 additions & 3 deletions src/compat_pipelines.jl
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,13 @@ function Job(p::Program;
name::AbstractString = p.name,
user::AbstractString = "",
ncpu::Real = 1.0,
mem::Int64 = 0,
mem::Integer = 0,
schedule_time::Union{DateTime,Period} = DateTime(0),
wall_time::Period = Year(1),
cron::Cron = Cron(:none),
until::Union{DateTime,Period} = DateTime(9999),
priority::Int = 20,
dependency = Vector{Pair{Symbol,Union{Int64,Job}}}(),
dependency = Vector{Pair{Symbol,Union{Int,Job}}}(),
stdout = nothing,
stderr = nothing,
dir::AbstractString = "",
Expand Down Expand Up @@ -64,7 +64,7 @@ function Job(p::Program;
end
elseif pair.second == :mem
if mem == 0
mem = val isa Number ? Int(val) : parse(Int, val)
mem = val isa Number ? Int64(val) : parse(Int64, val)
end
end
end
Expand Down
26 changes: 13 additions & 13 deletions src/control.jl
Original file line number Diff line number Diff line change
Expand Up @@ -130,14 +130,14 @@ function scheduler_status(; verbose=true)
end

"""
set_scheduler_update_second(s::Float64 = 0.6)
set_scheduler_update_second(s::AbstractFloat = 0.6)
Set the update interval of scheduler.
"""
function set_scheduler_update_second(s::Float64 = 0.6)
function set_scheduler_update_second(s::AbstractFloat = 0.6)
@warn "set_scheduler_update_second(s) is no longer required. The Job's scheduler updates when needed automatically." maxlog=1
s <= 0.001 && error("schedular update interval cannot be less than 0.001.")
global SCHEDULER_UPDATE_SECOND = s
global SCHEDULER_UPDATE_SECOND = Float64(s)
end
set_scheduler_update_second(s) = set_scheduler_update_second(convert(Float64, s))

Expand Down Expand Up @@ -183,10 +183,10 @@ function set_scheduler_max_cpu(percent::Float64)
end
end

default_mem() = round(Int, Sys.total_memory() * 0.8)
default_mem() = round(Int64, Sys.total_memory() * 0.8)
"""
set_scheduler_max_mem(mem::Int = default_mem())
set_scheduler_max_mem(percent::Float64)
set_scheduler_max_mem(mem::Integer = default_mem())
set_scheduler_max_mem(percent::AbstractFloat)
Set the maximum RAM the scheduler can use.
Expand All @@ -200,16 +200,16 @@ Set the maximum RAM the scheduler can use.
set_scheduler_max_mem(0.5) # use 50% of total memory
"""
function set_scheduler_max_mem(mem::Int = default_mem())
function set_scheduler_max_mem(mem::Integer = default_mem())
mem < 1 && error("number of memory cannot be less than 1.")
if mem > Sys.total_memory() * 0.9
@warn "Assigning memory > 90% of total memory."
end
global SCHEDULER_MAX_MEM = mem
end
function set_scheduler_max_mem(percent::Float64)
function set_scheduler_max_mem(percent::AbstractFloat)
if 0.0 < percent < 1.0
set_scheduler_max_mem(round(Int, Sys.total_memory() * percent))
set_scheduler_max_mem(round(Int64, Sys.total_memory() * percent))
else
@error "Percent::Float64 should be between 0 and 1. Are you looking for set_scheduler_max_mem(mem::Int) ?"
end
Expand All @@ -231,8 +231,8 @@ end

"""
set_scheduler(;
max_cpu::Union{Int,Float64} = JobSchedulers.SCHEDULER_MAX_CPU,
max_mem::Union{Int,Float64} = JobSchedulers.SCHEDULER_MAX_MEM,
max_cpu::Real = JobSchedulers.SCHEDULER_MAX_CPU,
max_mem::Real = JobSchedulers.SCHEDULER_MAX_MEM,
max_job::Int = JobSchedulers.JOB_QUEUE.max_done,
max_cancelled_job::Int = JobSchedulers.JOB_QUEUE.max_cancelled_job
)
Expand All @@ -246,8 +246,8 @@ See details:
[`set_scheduler_max_job`](@ref)
"""
function set_scheduler(;
max_cpu::Union{Int,Float64} = JobSchedulers.SCHEDULER_MAX_CPU,
max_mem::Union{Int,Float64} = JobSchedulers.SCHEDULER_MAX_MEM,
max_cpu::Real = JobSchedulers.SCHEDULER_MAX_CPU,
max_mem::Real = JobSchedulers.SCHEDULER_MAX_MEM,
max_job::Int = JobSchedulers.JOB_QUEUE.max_done,
max_cancelled_job::Int = JobSchedulers.JOB_QUEUE.max_cancelled_job,
update_second = JobSchedulers.SCHEDULER_UPDATE_SECOND
Expand Down
12 changes: 6 additions & 6 deletions src/job_recur.jl
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ end
Jobs are executed when the **second, minute, hour and month** fields match the current time. If neither `day_of_month` nor `day_of_week` **starts with** `*`, cron takes the union (∪) of their values `day_of_month ∪ day_of_week`. Otherwise cron takes the intersection (∩) of their values `day_of_month ∩ day_of_week`.
## When an argument is an `Int64`:
## When an argument is an `Int`:
| Field | Allowed values |
| -------------- | -------------------------- |
Expand Down Expand Up @@ -373,8 +373,8 @@ end
c.month & 0x0000000000001ffe == 0
end

@inline function is_one_at(uint::UInt64, idx::Int64)
x = 1 << idx
@inline function is_one_at(uint::Unsigned, idx::Signed)
x = UInt64(1) << idx
uint & x == x
end

Expand Down Expand Up @@ -501,7 +501,7 @@ function get_time_description(c::Cron)
str
end

function get_second_description(seconds::Vector{Int64})
function get_second_description(seconds::Vector{Int})
if length(seconds) == 0
"0 second"
elseif length(seconds) == 1
Expand All @@ -511,7 +511,7 @@ function get_second_description(seconds::Vector{Int64})
"$(str) seconds"
end
end
function get_minute_description(minutes::Vector{Int64})
function get_minute_description(minutes::Vector{Int})
if length(minutes) == 0
"0 minute"
elseif length(minutes) == 1
Expand All @@ -521,7 +521,7 @@ function get_minute_description(minutes::Vector{Int64})
"$(str) minutes"
end
end
function get_hour_description(hours::Vector{Int64})
function get_hour_description(hours::Vector{Int})
if length(hours) == 0
"0 hour"
elseif length(hours) == 1
Expand Down
31 changes: 17 additions & 14 deletions src/jobs.jl
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@

const JOB_ID = Ref{Int}()
const JOB_ID = Ref{Int64}()
const JOB_ID_INCREMENT_LOCK = ReentrantLock()
"""
generate_id() :: Int64
Expand Down Expand Up @@ -28,7 +28,7 @@ end
- `name::String = ""`: job name.
- `user::String = ""`: user that job belongs to.
- `ncpu::Real = 1.0`: number of CPU this job is about to use (can be `Float64`, eg: `1.5` will use 150% CPU).
- `mem::Int64 = 0`: number of memory this job is about to use (supports TB, GB, MB, KB, B=1).
- `mem::Integer = 0`: number of memory this job is about to use (supports TB, GB, MB, KB, B=1).
- `schedule_time::Union{DateTime,Period} = DateTime(0)`: The expected time to run.
- `dependency`: defer job until specified jobs reach specified state (QUEUING, RUNNING, DONE, FAILED, CANCELLED, PAST). PAST is the super set of DONE, FAILED, CANCELLED, which means the job will not run in the future. Eg: `DONE => job`, `[DONE => job1; PAST => job2]`.
Expand Down Expand Up @@ -79,10 +79,10 @@ mutable struct Job
_group::String
_group_state::Symbol

function Job(id::Int64, name::String, user::String, ncpu::Real, mem::Int64, schedule_time::ST, submit_time::DateTime, start_time::DateTime, stop_time::DateTime, wall_time::Period, cron::Cron, until::ST2, state::Symbol, priority::Int, dependency, task::Union{Task,Nothing}, stdout::Union{IO,AbstractString,Nothing}, stderr::Union{IO,AbstractString,Nothing}, _thread_id::Int, _func::Union{Function,Nothing}, _need_redirect::Bool = check_need_redirect(stdout, stderr), _group::AbstractString = "") where {ST<:Union{DateTime,Period}, ST2<:Union{DateTime,Period}}
function Job(id::Integer, name::String, user::String, ncpu::Real, mem::Integer, schedule_time::ST, submit_time::DateTime, start_time::DateTime, stop_time::DateTime, wall_time::Period, cron::Cron, until::ST2, state::Symbol, priority::Int, dependency, task::Union{Task,Nothing}, stdout::Union{IO,AbstractString,Nothing}, stderr::Union{IO,AbstractString,Nothing}, _thread_id::Int, _func::Union{Function,Nothing}, _need_redirect::Bool = check_need_redirect(stdout, stderr), _group::AbstractString = "") where {ST<:Union{DateTime,Period}, ST2<:Union{DateTime,Period}}
check_ncpu_mem(ncpu, mem)
check_priority(priority)
new(id, name, user, Float64(ncpu), mem, period2datetime(schedule_time), submit_time, start_time, stop_time, wall_time, cron, period2datetime(until), state, priority, convert_dependency(dependency), task, stdout, stderr, _thread_id, _func, _need_redirect, _group, :nothing)
new(Int64(id), name, user, Float64(ncpu), Int64(mem), period2datetime(schedule_time), submit_time, start_time, stop_time, wall_time, cron, period2datetime(until), state, priority, convert_dependency(dependency), task, stdout, stderr, _thread_id, _func, _need_redirect, _group, :nothing)
end
end

Expand All @@ -93,13 +93,13 @@ function Job(task::Task;
name::AbstractString = "",
user::AbstractString = "",
ncpu::Real = 1.0,
mem::Int64 = 0,
mem::Integer = 0,
schedule_time::Union{DateTime,Period} = DateTime(0),
wall_time::Period = Year(1),
cron::Cron = cron_none,
until::Union{DateTime,Period} = DateTime(9999),
priority::Int = 20,
dependency = Vector{Pair{Symbol,Int64}}(),
dependency = Vector{Pair{Symbol,Int}}(),
stdout=nothing, stderr=nothing, append::Bool=false
)
if ncpu > 1.001
Expand Down Expand Up @@ -146,13 +146,13 @@ function Job(f::Function;
name::AbstractString = "",
user::AbstractString = "",
ncpu::Real = 1.0,
mem::Int64 = 0,
mem::Integer = 0,
schedule_time::Union{DateTime,Period} = DateTime(0),
wall_time::Period = Year(1),
cron::Cron = cron_none,
until::Union{DateTime,Period} = DateTime(9999),
priority::Int = 20,
dependency = Vector{Pair{Symbol,Int64}}(),
dependency = Vector{Pair{Symbol,Int}}(),
stdout=nothing, stderr=nothing, append::Bool=false
)
if ncpu > 1.001
Expand Down Expand Up @@ -198,13 +198,13 @@ function Job(command::Base.AbstractCmd;
name::AbstractString = "",
user::AbstractString = "",
ncpu::Real = 1.0,
mem::Int64 = 0,
mem::Integer = 0,
schedule_time::Union{DateTime,Period} = DateTime(0),
wall_time::Period = Year(1),
cron::Cron = cron_none,
until::Union{DateTime,Period} = DateTime(9999),
priority::Int = 20,
dependency = Vector{Pair{Symbol,Int64}}(),
dependency = Vector{Pair{Symbol,Int}}(),
stdout=nothing, stderr=nothing, append::Bool=false
)
f() = run(command)
Expand Down Expand Up @@ -259,7 +259,7 @@ end
period2datetime(t::DateTime) = t
period2datetime(t::Period) = now() + t

function check_ncpu_mem(ncpu::Real, mem::Int64)
function check_ncpu_mem(ncpu::Real, mem::Integer)
if ncpu < 0
error("ncpu < 0 is not supported for Job.")
elseif 0.001 <= ncpu <= 0.999
Expand Down Expand Up @@ -296,9 +296,12 @@ end
function convert_dependency_element(p::Pair{Symbol,T}) where T # do not specify T's type!!!
p
end
function convert_dependency_element(job::T) where T <: Union{Int64, Job}
function convert_dependency_element(job::Job)
DONE => job
end
function convert_dependency_element(job::Integer)
DONE => Int64(job)
end

"""
isqueuing(j::Job) :: Bool
Expand Down Expand Up @@ -355,7 +358,7 @@ get_priority(job::Job) = job.priority

"""
solve_optimized_ncpu(default::Int;
ncpu_range::UnitRange{Int64} = 1:total_cpu,
ncpu_range::UnitRange{Int} = 1:total_cpu,
njob::Int = 1,
total_cpu::Int = JobSchedulers.SCHEDULER_MAX_CPU,
side_jobs_cpu::Int = 0)
Expand All @@ -368,7 +371,7 @@ Find the optimized number of CPU for a job.
- `total_cpu`: the total CPU that can be used by JobSchedulers.
- `side_jobs_cpu`: some small jobs that might be run when the job is running, so the job won't use up all of the resources and stop small tasks.
"""
function solve_optimized_ncpu(default::Int; njob::Int = 1, total_cpu::Int = JobSchedulers.SCHEDULER_MAX_CPU, ncpu_range::UnitRange{Int64} = 1:total_cpu, side_jobs_cpu::Int = 0)
function solve_optimized_ncpu(default::Int; njob::Int = 1, total_cpu::Int = JobSchedulers.SCHEDULER_MAX_CPU, ncpu_range::UnitRange{Int} = 1:total_cpu, side_jobs_cpu::Int = 0)
mincpu = max(ncpu_range.start, 1)
maxcpu = ncpu_range.stop
if !(mincpu <= maxcpu <= total_cpu)
Expand Down
Loading

0 comments on commit a11ba98

Please sign in to comment.