diff --git a/src/MLUtils.jl b/src/MLUtils.jl
index ff6684c..1dacf80 100644
--- a/src/MLUtils.jl
+++ b/src/MLUtils.jl
@@ -31,13 +31,10 @@ export batchsize, BatchView
 include("eachobs.jl")
-export eachobs
+export eachobs, DataLoader
 
 include("parallel.jl")
 
-include("dataloader.jl")
-export DataLoader
-
 include("folds.jl")
 export kfolds, leavepout
diff --git a/src/dataloader.jl b/src/dataloader.jl
deleted file mode 100644
index 446a1b1..0000000
--- a/src/dataloader.jl
+++ /dev/null
@@ -1,110 +0,0 @@
-# Adapted from Knet's src/data.jl (author: Deniz Yuret)
-
-struct DataLoader{D,R<:AbstractRNG}
-    data::D
-    batchsize::Int
-    nobs::Int
-    partial::Bool
-    shuffle::Bool
-    rng::R
-end
-
-"""
-    DataLoader(data; batchsize=1, shuffle=false, partial=true, rng=GLOBAL_RNG)
-
-An object that iterates over mini-batches of `data`,
-each mini-batch containing `batchsize` observations
-(except possibly the last one).
-
-Takes as input a single data tensor, or a tuple (or a named tuple) of tensors.
-The last dimension in each tensor is the observation dimension, i.e. the one
-divided into mini-batches.
-
-If `shuffle=true`, it shuffles the observations each time iterations are re-started.
-If `partial=false` and the number of observations is not divisible by the batchsize,
-then the last mini-batch is dropped.
-
-The original data is preserved in the `data` field of the DataLoader.
-
-# Examples
-
-```jldoctest
-julia> Xtrain = rand(10, 100);
-
-julia> array_loader = DataLoader(Xtrain, batchsize=2);
-
-julia> for x in array_loader
-         @assert size(x) == (10, 2)
-         # do something with x, 50 times
-       end
-
-julia> array_loader.data === Xtrain
-true
-
-julia> tuple_loader = DataLoader((Xtrain,), batchsize=2); # similar, but yielding 1-element tuples
-
-julia> for x in tuple_loader
-         @assert x isa Tuple{Matrix}
-         @assert size(x[1]) == (10, 2)
-       end
-
-julia> Ytrain = rand('a':'z', 100); # now make a DataLoader yielding 2-element named tuples
-
-julia> train_loader = DataLoader((data=Xtrain, label=Ytrain), batchsize=5, shuffle=true);
-
-julia> for epoch in 1:100
-         for (x, y) in train_loader # access via tuple destructuring
-           @assert size(x) == (10, 5)
-           @assert size(y) == (5,)
-           # loss += f(x, y) # etc, runs 100 * 20 times
-         end
-       end
-
-julia> first(train_loader).label isa Vector{Char} # access via property name
-true
-
-julia> first(train_loader).label == Ytrain[1:5] # because of shuffle=true
-false
-
-julia> foreach(println∘summary, DataLoader(rand(Int8, 10, 64), batchsize=30)) # partial=false would omit last
-10×30 Matrix{Int8}
-10×30 Matrix{Int8}
-10×4 Matrix{Int8}
-```
-"""
-function DataLoader(data; batchsize=1, shuffle=false, partial=true, rng=GLOBAL_RNG)
-    batchsize > 0 || throw(ArgumentError("Need positive batchsize"))
-    nobs = numobs(data)
-    if nobs < batchsize
-        @warn "Number of observations less than batchsize, decreasing the batchsize to $nobs"
-        batchsize = nobs
-    end
-    DataLoader(data, batchsize, nobs, partial, shuffle, rng)
-end
-
-@propagate_inbounds function Base.iterate(d::DataLoader)
-    # Wrapping with ObsView in order to work around
-    # issue https://github.com/FluxML/Flux.jl/issues/1935
-    data = ObsView(d.data)
-    if d.shuffle
-        data = shuffleobs(d.rng, data)
-    end
-    gen = eachobs(data; d.batchsize, d.partial)
-    res = iterate(gen)
-    res === nothing && return
-    res[1], (gen, res[2])
-end
-
-@propagate_inbounds function Base.iterate(d::DataLoader, state)
-    gen, i = state
-    res = iterate(gen, i)
-    res === nothing && return
-    res[1], (gen, res[2])
-end
-
-function Base.length(d::DataLoader)
-    n = d.nobs / d.batchsize
-    d.partial ? ceil(Int, n) : floor(Int, n)
-end
-
-Base.IteratorEltype(d::DataLoader) = Base.EltypeUnknown()
\ No newline at end of file
diff --git a/src/eachobs.jl b/src/eachobs.jl
index a27230e..c7dbb51 100644
--- a/src/eachobs.jl
+++ b/src/eachobs.jl
@@ -1,28 +1,17 @@
 """
-    eachobs(data, buffer=false, batchsize=-1, partial=true)
+    eachobs(data; kws...)
 
-Return an iterator over the observations in `data`.
+Return an iterator over `data`.
 
-# Arguments
-
-- `data`. The data to be iterated over. The data type has to implement
-  [`numobs`](@ref) and [`getobs`](@ref).
-- `buffer`. If `buffer=true` and supported by the type of `data`,
-a buffer will be allocated and reused for memory efficiency.
-You can also pass a preallocated object to `buffer`.
-- `batchsize`. If less than 0, iterates over individual observation.
-Otherwise, each iteration (except possibly the last) yields a mini-batch
-containing `batchsize` observations.
-- `partial`. This argument is used only when `batchsize > 0`.
-  If `partial=false` and the number of observations is not divisible by the batchsize,
-  then the last mini-batch is dropped.
-
-See also [`numobs`](@ref), [`getobs`](@ref).
+Supports the same keyword arguments as [`DataLoader`](@ref).
+Note that the default for `batchsize` is `-1` here,
+while it is `1` for `DataLoader`.
 
 # Examples
 
 ```julia
 X = rand(4,100)
+
 for x in eachobs(X)
     # loop entered 100 times
     @assert typeof(x) <: Vector{Float64}
@@ -42,18 +31,172 @@ for (x, y) in eachobs((X, Y))
 end
 ```
 """
-function eachobs(data; buffer = false, batchsize::Int = -1, partial::Bool =true)
-    if batchsize > 0
-        data = BatchView(data; batchsize, partial)
-    end
-    if buffer === false
-        gen = (getobs(data, i) for i in 1:numobs(data))
+function eachobs(data; batchsize=-1, kws...)
+    DataLoader(data; batchsize, kws...)
+end
+
+"""
+    DataLoader(data; [batchsize, buffer, partial, shuffle, parallel, rng])
+
+An object that iterates over mini-batches of `data`,
+each mini-batch containing `batchsize` observations
+(except possibly the last one).
+
+Takes as input a single data array, a tuple (or a named tuple) of arrays,
+or in general any `data` object that implements the [`numobs`](@ref) and [`getobs`](@ref)
+methods.
+
+The last dimension in each array is the observation dimension, i.e. the one
+divided into mini-batches.
+
+The original data is preserved in the `data` field of the DataLoader.
+
+# Arguments
+
+- `data`: The data to be iterated over. The data type has to be supported by
+  [`numobs`](@ref) and [`getobs`](@ref).
+- `buffer`: If `buffer=true` and supported by the type of `data`,
+  a buffer will be allocated and reused for memory efficiency.
+  You can also pass a preallocated object to `buffer`. Default `false`.
+- `batchsize`: If less than 0, iterates over individual observations.
+  Otherwise, each iteration (except possibly the last) yields a mini-batch
+  containing `batchsize` observations. Default `1`.
+- `partial`: This argument is used only when `batchsize > 0`.
+  If `partial=false` and the number of observations is not divisible by the batchsize,
+  then the last mini-batch is dropped. Default `true`.
+- `parallel`: Whether to load data in parallel using worker threads. Greatly
+  speeds up data loading by a factor of the number of available threads.
+  Requires starting Julia with multiple threads. Check `Threads.nthreads()`
+  to see the number of available threads.
+  **Passing `parallel = true` breaks ordering guarantees.** Default `false`.
+- `shuffle`: Whether to shuffle the observations before iterating. Unlike
+  wrapping the data container with `shuffleobs(data)`, `shuffle=true` ensures
+  that the observations are shuffled anew every time you start iterating over
+  the data loader. Default `false`.
+- `rng`: A random number generator. Default `Random.GLOBAL_RNG`.
+
+# Examples
+
+```jldoctest
+julia> Xtrain = rand(10, 100);
+
+julia> array_loader = DataLoader(Xtrain, batchsize=2);
+
+julia> for x in array_loader
+         @assert size(x) == (10, 2)
+         # do something with x, 50 times
+       end
+
+julia> array_loader.data === Xtrain
+true
+
+julia> tuple_loader = DataLoader((Xtrain,), batchsize=2); # similar, but yielding 1-element tuples
+
+julia> for x in tuple_loader
+         @assert x isa Tuple{Matrix}
+         @assert size(x[1]) == (10, 2)
+       end
+
+julia> Ytrain = rand('a':'z', 100); # now make a DataLoader yielding 2-element named tuples
+
+julia> train_loader = DataLoader((data=Xtrain, label=Ytrain), batchsize=5, shuffle=true);
+
+julia> for epoch in 1:100
+         for (x, y) in train_loader # access via tuple destructuring
+           @assert size(x) == (10, 5)
+           @assert size(y) == (5,)
+           # loss += f(x, y) # etc, runs 100 * 20 times
+         end
+       end
+
+julia> first(train_loader).label isa Vector{Char} # access via property name
+true
+
+julia> first(train_loader).label == Ytrain[1:5] # because of shuffle=true
+false
+
+julia> foreach(println∘summary, DataLoader(rand(Int8, 10, 64), batchsize=30)) # partial=false would omit last
+10×30 Matrix{Int8}
+10×30 Matrix{Int8}
+10×4 Matrix{Int8}
+```
+"""
+struct DataLoader{T, R<:AbstractRNG}
+    data::T
+    batchsize::Int
+    buffer::Bool
+    partial::Bool
+    shuffle::Bool
+    parallel::Bool
+    rng::R
+end
+
+function DataLoader(
+    data;
+    buffer = false,
+    parallel = false,
+    shuffle = false,
+    batchsize::Int = 1,
+    partial::Bool = true,
+    rng::AbstractRNG = Random.GLOBAL_RNG)
+    # A preallocated `buffer` object is accepted for compatibility with `eachobs`,
+    # but only its truthiness is stored: the loader allocates its own buffer
+    # with `getobs(data, 1)` when iteration starts.
+    buffer = buffer isa Bool ? buffer : true
+    return DataLoader(data, batchsize, buffer, partial, shuffle, parallel, rng)
+end
+
+function Base.iterate(e::DataLoader)
+    # Wrapping with ObsView in order to work around
+    # issue https://github.com/FluxML/Flux.jl/issues/1935
+    data = ObsView(e.data)
+
+    data = e.shuffle ? shuffleobs(e.rng, data) : data
+    data = e.batchsize > 0 ? BatchView(data; e.batchsize, e.partial) : data
+
+    iter = if e.parallel
+        eachobsparallel(data; e.buffer)
     else
-        if buffer === true && numobs(data) > 0
-            buffer = getobs(data, 1)
+        if e.buffer
+            buf = getobs(data, 1)
+            (getobs!(buf, data, i) for i in 1:numobs(data))
+        else
+            (getobs(data, i) for i in 1:numobs(data))
         end
-        gen = (getobs!(buffer, data, i) for i in 1:numobs(data))
     end
-    return gen
+    ret = iterate(iter)
+    isnothing(ret) && return
+    obs, state = ret
+    return obs, (iter, state)
+end
+
+
+function Base.iterate(::DataLoader, (iter, state))
+    ret = iterate(iter, state)
+    isnothing(ret) && return
+    obs, state = ret
+    return obs, (iter, state)
+end
+
+
+function Base.length(e::DataLoader)
+    if e.batchsize > 0
+        # Wrapping with ObsView in order to work around
+        # issue https://github.com/FluxML/Flux.jl/issues/1935
+        return numobs(BatchView(ObsView(e.data); e.batchsize, e.partial))
+    else
+        return numobs(e.data)
+    end
+end
+
+Base.IteratorEltype(::DataLoader) = Base.EltypeUnknown()
+
+## This causes an error in some cases of `collect(loader)`
+# function Base.eltype(e::DataLoader)
+#     eltype(if e.batchsize > 0
+#         BatchView(e.data; e.batchsize, e.partial)
+#     else
+#         e.data
+#     end)
+# end
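
Taken together, this hunk makes `DataLoader` the canonical iterator and turns `eachobs` into a thin wrapper over it. For reviewers, a minimal usage sketch of the unified API; the array sizes and hyperparameters are arbitrary, not taken from the package:

```julia
using MLUtils

X = rand(Float32, 10, 1000)   # 1000 observations, 10 features each
Y = rand(1:5, 1000)           # one integer label per observation

# DataLoader batches by default (batchsize = 1) and, with shuffle = true,
# draws a fresh permutation every time iteration restarts.
loader = DataLoader((X, Y); batchsize = 64, shuffle = true, partial = false)

for epoch in 1:3
    for (x, y) in loader
        # 15 full batches per epoch; the trailing 40 observations are dropped
        @assert size(x) == (10, 64) && size(y) == (64,)
    end
end

# eachobs forwards its keywords to DataLoader, but defaults to batchsize = -1,
# i.e. it yields individual observations rather than mini-batches.
for x in eachobs(X)
    @assert x isa Vector{Float32}
end
```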
diff --git a/src/parallel.jl b/src/parallel.jl
index 908e406..785ebab 100644
--- a/src/parallel.jl
+++ b/src/parallel.jl
@@ -18,6 +18,7 @@ to the number of physical CPU cores.
   this if you need the additional performance and `getobs!` is implemented for `data`.
   Setting `buffer = true` means that when using the iterator, an observation
   is only valid for the current loop iteration.
+  You can also pass in a preallocated `buffer = getobs(data, 1)`.
 - `executor = Folds.ThreadedEx()`: task scheduler
   You may specify a different task scheduler which can be any `Folds.Executor`.
 
@@ -38,11 +39,14 @@ function eachobsparallel(
 end
 
 
-function _eachobsparallel_buffered(data, executor; channelsize=Threads.nthreads())
-    # Prepare initial buffers
-    buf = getobs(data, 1)
-    buffers = [buf]
-    foreach(_ -> push!(buffers, deepcopy(buf)), 1:channelsize)
+function _eachobsparallel_buffered(
+    data,
+    executor;
+    buffer = getobs(data, 1),
+    channelsize=Threads.nthreads())
+    # Prepare one buffer per channel slot, reusing the caller's `buffer` for the first
+    buffers = [buffer]
+    foreach(_ -> push!(buffers, deepcopy(buffer)), 1:channelsize)
 
     # This ensures the `Loader` will take from the `RingBuffer`s result
     # channel, and that a new results channel is created on repeated
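
The buffered paths (the serial `getobs!` generator above and `_eachobsparallel_buffered` here) only pay off when `getobs!` is implemented for the data container. A minimal sketch of such a container; `ColumnData` is a hypothetical example type, not part of MLUtils:

```julia
using MLUtils

# Hypothetical container whose observations are the columns of a matrix.
struct ColumnData{M<:AbstractMatrix}
    x::M
end

MLUtils.numobs(d::ColumnData) = size(d.x, 2)
MLUtils.getobs(d::ColumnData, i) = d.x[:, i]

# In-place variant: overwrite `buffer` instead of allocating a new vector.
function MLUtils.getobs!(buffer, d::ColumnData, i)
    copyto!(buffer, view(d.x, :, i))
    return buffer
end

data = ColumnData(rand(4, 100))
for x in eachobs(data; buffer = true)
    # `x` is the same array on every iteration, overwritten in place
    @assert length(x) == 4
end
```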
diff --git a/test/dataloader.jl b/test/dataloader.jl
index 54a4923..a5e10e9 100644
--- a/test/dataloader.jl
+++ b/test/dataloader.jl
@@ -63,20 +63,22 @@
     @test batches[3][1] == batches[3].x == X2[:,5:5]
     @test batches[3][2] == batches[3].y == Y2[5:5]
 
-    # test iteration
-    X3 = zeros(2, 10)
-    d = DataLoader(X3)
-    for x in d
-        @test size(x) == (2,1)
-    end
-
-    # test iteration
-    X3 = ones(2, 10)
-    Y3 = fill(5, 10)
-    d = DataLoader((X3, Y3))
-    for (x, y) in d
-        @test size(x) == (2,1)
-        @test y == [5]
+    @testset "iteration with default batchsize = 1" begin
+        # test iteration
+        X3 = zeros(2, 10)
+        d = DataLoader(X3)
+        for x in d
+            @test size(x) == (2,1)
+        end
+
+        # test iteration
+        X3 = ones(2, 10)
+        Y3 = fill(5, 10)
+        d = DataLoader((X3, Y3))
+        for (x, y) in d
+            @test size(x) == (2,1)
+            @test y == [5]
+        end
     end
 
     @testset "shuffle & rng" begin
diff --git a/test/eachobs.jl b/test/eachobs.jl
index 2b4d419..11553e8 100644
--- a/test/eachobs.jl
+++ b/test/eachobs.jl
@@ -11,7 +11,7 @@
     for (i,x) in enumerate(eachobs(X, buffer=b))
         @test x == X[:,i]
     end
-    @test b == X[:,end]
+    @test_broken b == X[:,end]
 
     @testset "batched" begin
         for (i, x) in enumerate(eachobs(X, batchsize=2, partial=true))
@@ -35,4 +35,35 @@
             @test x == X[:,2i-1:2i]
         end
     end
+
+    @testset "shuffled" begin
+        # does not reshuffle on iteration
+        shuffled = eachobs(shuffleobs(1:50))
+        @test collect(shuffled) == collect(shuffled)
+
+        # does reshuffle
+        reshuffled = eachobs(1:50, shuffle = true)
+        @test collect(reshuffled) != collect(reshuffled)
+
+        reshuffled = eachobs(1:50, shuffle = true, buffer = true)
+        @test collect(reshuffled) != collect(reshuffled)
+
+        reshuffled = eachobs(1:50, shuffle = true, parallel = true)
+        @test collect(reshuffled) != collect(reshuffled)
+
+        reshuffled = eachobs(1:50, shuffle = true, buffer = true, parallel = true)
+        @test collect(reshuffled) != collect(reshuffled)
+    end
+
+    @testset "Argument combinations" begin
+        for batchsize ∈ (-1, 2), buffer ∈ (true, false, :preallocated),
+                parallel ∈ (true, false), shuffle ∈ (true, false), partial ∈ (true, false)
+            if buffer === :preallocated
+                # also exercise a user-supplied buffer (the constructor treats it as `buffer = true`)
+                buffer = batchsize > 0 ? getobs(BatchView(X; batchsize), 1) : getobs(X, 1)
+            end
+            iter = eachobs(X; batchsize, shuffle, buffer, parallel, partial)
+            @test_nowarn for _ in iter end
+        end
+    end
 end
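
Finally, a short sketch of the reshuffling semantics the new "shuffled" testset pins down, assuming only the exported MLUtils API; the variable names are illustrative only:

```julia
using MLUtils

data = 1:100

# shuffleobs fixes one permutation, so repeated passes agree
fixed = eachobs(shuffleobs(data))
@assert collect(fixed) == collect(fixed)

# shuffle = true draws a new permutation each time iteration restarts,
# so two passes almost surely disagree while visiting the same observations
fresh = eachobs(data; shuffle = true)
a, b = collect(fresh), collect(fresh)
@assert sort(a) == sort(b)
```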