Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support basic nested schema #188

Draft
wants to merge 2 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/reader.jl
Original file line number Diff line number Diff line change
Expand Up @@ -454,7 +454,7 @@ function is_par_file(io)
magic = Array{UInt8}(undef, 4)
read!(io, magic)
(String(magic) == PAR_MAGIC) || return false

seek(io, sz - SZ_PAR_MAGIC)
magic = Array{UInt8}(undef, 4)
read!(io, magic)
Expand Down
152 changes: 88 additions & 64 deletions src/schema.jl
Original file line number Diff line number Diff line change
Expand Up @@ -7,19 +7,34 @@
mutable struct Schema
schema::Vector{SchemaElement}
map_logical_types::TLogicalTypeMap
name_lookup::Dict{Vector{String},SchemaElement}
graph::Dict{Int,Vector{Int}}
name_lookup::Dict{Vector{String},Int}
type_lookup::Dict{Vector{String},Union{DataType,Union}}
nttype_lookup::Dict{Vector{String},Union{DataType,Union}}

function Schema(elems::Vector{SchemaElement}, map_logical_types::TLogicalTypeMap=TLogicalTypeMap())
name_lookup = Dict{Vector{String},SchemaElement}()
name_lookup = Dict{Vector{String},Int}()
graph = Dict{Int,Vector{Int}}(1 => [])
name_stack = String[]
nchildren_stack = Int[]
idx_stack = Vector{Int}()
base_idx = 1

for idx in 1:length(elems)
sch = elems[idx]
nested_name = [name_stack; sch.name]
name_lookup[nested_name] = sch
name_lookup[nested_name] = idx

if !isempty(idx_stack)
parent_idx = idx_stack[end]
if haskey(graph, parent_idx)
push!(graph[parent_idx], idx)
else
graph[parent_idx] = [idx]
end
else
base_idx != idx && push!(graph[base_idx], idx)
end

if !haskey(map_logical_types, nested_name)
if is_logical_string(sch)
Expand All @@ -34,40 +49,40 @@
if (idx > 1) && (num_children(sch) > 0)
push!(nchildren_stack, sch.num_children)
push!(name_stack, sch.name)
push!(idx_stack, idx)
elseif !isempty(nchildren_stack)
if nchildren_stack[end] == 1
while !isempty(nchildren_stack) && nchildren_stack[end] == 1
pop!(nchildren_stack)
pop!(name_stack)
else
pop!(idx_stack)
end
if !isempty(nchildren_stack)
nchildren_stack[end] -= 1
end
end
end
new(elems, map_logical_types, name_lookup, Dict{Vector{String},Union{DataType,Union}}(), Dict{Vector{String},Union{DataType,Union}}())
new(elems, map_logical_types, graph, name_lookup, Dict{Vector{String},Union{DataType,Union}}(), Dict{Vector{String},Union{DataType,Union}}())
end
end

leafname(schname::T) where {T <: AbstractVector{String}} = [schname[end]]
leafname(schname::T) where {T<:AbstractVector{String}} = [schname[end]]

Check warning on line 68 in src/schema.jl

View check run for this annotation

Codecov / codecov/patch

src/schema.jl#L68

Added line #L68 was not covered by tests

parentname(schname::T) where {T <: AbstractVector{String}} = istoplevel(schname) ? schname : schname[1:(end-1)]
parentname(schname::T) where {T<:AbstractVector{String}} = istoplevel(schname) ? schname : schname[1:(end-1)]

istoplevel(schname::Vector) = !(length(schname) > 1)
istoplevel(schname::Vector) = length(schname) == 1

elem(sch::Schema, schname::T) where {T <: AbstractVector{String}} = sch.name_lookup[schname]
function elemindex(sch::Schema, schname::T) where {T <: AbstractVector{String}}
schema_element = elem(sch, schname)
findfirst(x->x===schema_element, sch.schema)
end
elem(sch::Schema, schname::T) where {T<:AbstractVector{String}} = sch.schema[sch.name_lookup[schname]]
elemindex(sch::Schema, schname::T) where {T<:AbstractVector{String}} = sch.name_lookup[schname]

Check warning on line 75 in src/schema.jl

View check run for this annotation

Codecov / codecov/patch

src/schema.jl#L75

Added line #L75 was not covered by tests

isrepetitiontype(schelem::SchemaElement, repetition_type) = hasproperty(schelem, :repetition_type) && (schelem.repetition_type == repetition_type)

isrequired(sch::Schema, schname::T) where {T <: AbstractVector{String}} = isrequired(elem(sch, schname))
isrequired(sch::Schema, schname::T) where {T<:AbstractVector{String}} = isrequired(elem(sch, schname))
isrequired(schelem::SchemaElement) = isrepetitiontype(schelem, FieldRepetitionType.REQUIRED)

isoptional(sch::Schema, schname::T) where {T <: AbstractVector{String}} = isoptional(elem(sch, schname))
isoptional(sch::Schema, schname::T) where {T<:AbstractVector{String}} = isoptional(elem(sch, schname))

Check warning on line 82 in src/schema.jl

View check run for this annotation

Codecov / codecov/patch

src/schema.jl#L82

Added line #L82 was not covered by tests
isoptional(schelem::SchemaElement) = isrepetitiontype(schelem, FieldRepetitionType.OPTIONAL)

isrepeated(sch::Schema, schname::T) where {T <: AbstractVector{String}} = isrepeated(elem(sch, schname))
isrepeated(sch::Schema, schname::T) where {T<:AbstractVector{String}} = isrepeated(elem(sch, schname))
isrepeated(schelem::SchemaElement) = isrepetitiontype(schelem, FieldRepetitionType.REPEATED)

is_logical_string(sch::SchemaElement) = hasproperty(sch, :_type) && (sch._type === _Type.BYTE_ARRAY) && ((hasproperty(sch, :converted_type) && (sch.converted_type === ConvertedType.UTF8)) || (hasproperty(sch, :logicalType) && hasproperty(sch.logicalType, :STRING)))
Expand All @@ -87,14 +102,14 @@
end

function path_in_schema(sch::Schema, schelem::SchemaElement)
for (n,v) in sch.name_lookup
(v === schelem) && return n
for (n, v) in sch.name_lookup
(sch.schema[v] === schelem) && return n
end
error("schema element not found in schema")
end

function logical_converter(sch::Schema, schname::T) where {T <: AbstractVector{String}}
elem = sch.name_lookup[schname]
function logical_converter(sch::Schema, schname::T) where {T<:AbstractVector{String}}
elem = sch.schema[sch.name_lookup[schname]]

if schname in keys(sch.map_logical_types)
_logical_type, converter = sch.map_logical_types[schname]
Expand All @@ -107,7 +122,7 @@
end
end

function logical_convert(sch::Schema, schname::T, val) where {T <: AbstractVector{String}}
function logical_convert(sch::Schema, schname::T, val) where {T<:AbstractVector{String}}

Check warning on line 125 in src/schema.jl

View check run for this annotation

Codecov / codecov/patch

src/schema.jl#L125

Added line #L125 was not covered by tests
elem = sch.name_lookup[schname]

if schname in keys(sch.map_logical_types)
Expand All @@ -121,19 +136,20 @@
end
end

elemtype(sch::Schema, schname::T) where {T <: AbstractVector{String}} = get!(sch.type_lookup, schname) do
elem = sch.name_lookup[schname]

if schname in keys(sch.map_logical_types)
logical_type, _converter = sch.map_logical_types[schname]
logical_type
elseif hasproperty(elem, :_type) && (elem._type in keys(sch.map_logical_types))
logical_type, _converter = sch.map_logical_types[elem._type]
logical_type
else
elemtype(elem)
elemtype(sch::Schema, schname::T) where {T<:AbstractVector{String}} =
get!(sch.type_lookup, schname) do
elem = sch.schema[sch.name_lookup[schname]]

if schname in keys(sch.map_logical_types)
logical_type, _converter = sch.map_logical_types[schname]
logical_type
elseif hasproperty(elem, :_type) && (elem._type in keys(sch.map_logical_types))
logical_type, _converter = sch.map_logical_types[elem._type]
logical_type

Check warning on line 148 in src/schema.jl

View check run for this annotation

Codecov / codecov/patch

src/schema.jl#L147-L148

Added lines #L147 - L148 were not covered by tests
else
elemtype(elem)
end
end
end
function elemtype(schelem::SchemaElement)
jtype = Nothing

Expand All @@ -151,47 +167,55 @@
jtype
end

ntcolstype(sch::Schema, schname::T) where {T <: AbstractVector{String}} = get!(sch.nttype_lookup, schname) do
ntcolstype(sch, sch.name_lookup[schname])
end
function ntcolstype(sch::Schema, schelem::SchemaElement)
@assert num_children(schelem) > 0
idx = findfirst(x->x===schelem, sch.schema)
children_range = (idx+1):(idx+schelem.num_children)
names = [Symbol(x.name) for x in sch.schema[children_range]]
types = [(num_children(x) > 0) ? ntelemtype(sch, path_in_schema(sch, x)) : elemtype(sch, path_in_schema(sch, x)) for x in sch.schema[children_range]]
optionals = [isoptional(x) for x in sch.schema[children_range]]
types = [Vector{opt ? Union{t,Missing} : t} for (t,opt) in zip(types, optionals)]
ntcolstype(sch::Schema, schname::T) where {T<:AbstractVector{String}} =

Check warning on line 170 in src/schema.jl

View check run for this annotation

Codecov / codecov/patch

src/schema.jl#L170

Added line #L170 was not covered by tests
get!(sch.nttype_lookup, schname) do
ntcolstype(sch, sch.name_lookup[schname])

Check warning on line 172 in src/schema.jl

View check run for this annotation

Codecov / codecov/patch

src/schema.jl#L172

Added line #L172 was not covered by tests
end
function ntcolstype(sch::Schema, idx::Int)
child_indices = sch.graph[idx] # will error out in case of idx having no children
names = [Symbol(x.name) for x in sch.schema[child_indices]]
types = [(num_children(x) > 0) ? ntelemtype(sch, path_in_schema(sch, x)) : elemtype(sch, path_in_schema(sch, x)) for x in sch.schema[child_indices]]
optionals = [isoptional(x) for x in sch.schema[child_indices]]
types = [Vector{opt ? Union{t,Missing} : t} for (t, opt) in zip(types, optionals)]
NamedTuple{(names...,),Tuple{types...}}
end

ntelemtype(sch::Schema, schname::T) where {T <: AbstractVector{String}} = get!(sch.nttype_lookup, schname) do
ntelemtype(sch, sch.name_lookup[schname])
function ntcolstype(sch::Schema, schelem::SchemaElement)
idx = findfirst(x -> x === schelem, sch.schema)
return ntcolstype(sch, idx)
end
function ntelemtype(sch::Schema, schelem::SchemaElement)
@assert num_children(schelem) > 0
idx = findfirst(x->x===schelem, sch.schema)
children_range = (idx+1):(idx+schelem.num_children)

ntelemtype(sch::Schema, schname::T) where {T<:AbstractVector{String}} =
get!(sch.nttype_lookup, schname) do
ntelemtype(sch, sch.name_lookup[schname])
end
function ntelemtype(sch::Schema, idx::Int)
schelem = sch.schema[idx] # will error out in case of idx having no children
child_indices = sch.graph[idx]
names = [Symbol(x.name) for x in sch.schema[child_indices]]
repeated = hasproperty(schelem, :repetition_type) && (schelem.repetition_type == FieldRepetitionType.REPEATED)
names = [Symbol(x.name) for x in sch.schema[children_range]]
types = [(num_children(x) > 0) ? ntelemtype(sch, path_in_schema(sch, x)) : elemtype(sch, path_in_schema(sch, x)) for x in sch.schema[children_range]]
optionals = [isoptional(x) for x in sch.schema[children_range]]
types = [opt ? Union{t,Missing} : t for (t,opt) in zip(types, optionals)]
types = [(num_children(x) > 0) ? ntelemtype(sch, path_in_schema(sch, x)) : elemtype(sch, path_in_schema(sch, x)) for x in sch.schema[child_indices]]
optionals = [isoptional(x) for x in sch.schema[child_indices]]
types = [opt ? Union{t,Missing} : t for (t, opt) in zip(types, optionals)]
T = NamedTuple{(names...,),Tuple{types...}}
repeated ? Vector{T} : T
end
function ntelemtype(sch::Schema, schelem::SchemaElement)
idx = findfirst(x -> x === schelem, sch.schema)
return ntelemtype(sch, idx)
end

bit_or_byte_length(sch::Schema, schname::Vector{String}) = bit_or_byte_length(elem(sch, schname))
bit_or_byte_length(schelem::SchemaElement) = hasproperty(schelem, :type_length) ? schelem.type_length : 0

num_children(schelem::SchemaElement) = hasproperty(schelem, :num_children) ? schelem.num_children : 0

function max_repetition_level(sch::Schema, schname::T) where {T <: AbstractVector{String}}
function max_repetition_level(sch::Schema, schname::T) where {T<:AbstractVector{String}}
lev = isrepeated(sch, schname) ? 1 : 0
istoplevel(schname) ? lev : (lev + max_repetition_level(sch, parentname(schname)))
end
end

function max_definition_level(sch::Schema, schname::T) where {T <: AbstractVector{String}}
function max_definition_level(sch::Schema, schname::T) where {T<:AbstractVector{String}}
lev = isrequired(sch, schname) ? 0 : 1
istoplevel(schname) ? lev : (lev + max_definition_level(sch, parentname(schname)))
end
Expand All @@ -205,19 +229,19 @@
end

logical_decimal_unscaled_type(precision::Int32) = (precision < 5) ? UInt16 :
(precision < 10) ? UInt32 :
(precision < 19) ? UInt64 : UInt128
(precision < 10) ? UInt32 :
(precision < 19) ? UInt64 : UInt128

function map_logical_decimal(precision::Int32, scale::Int32; use_float::Bool=false)
T = logical_decimal_unscaled_type(precision)
if scale == 0
# integral values
return (signed(T), (bytes)->logical_decimal_integer(bytes, T))
return (signed(T), (bytes) -> logical_decimal_integer(bytes, T))
elseif use_float
# use Float64
return (Float64, (bytes)->logical_decimal_float64(bytes, T, scale))
return (Float64, (bytes) -> logical_decimal_float64(bytes, T, scale))

Check warning on line 242 in src/schema.jl

View check run for this annotation

Codecov / codecov/patch

src/schema.jl#L242

Added line #L242 was not covered by tests
else
# use Decimal
return (Decimal, (bytes)->logical_decimal_scaled(bytes, T, scale))
return (Decimal, (bytes) -> logical_decimal_scaled(bytes, T, scale))
end
end
end
8 changes: 5 additions & 3 deletions src/show.jl
Original file line number Diff line number Diff line change
Expand Up @@ -49,14 +49,16 @@ function show(io::IO, schema::SchemaElement, indent::AbstractString="", nchildre
if hasproperty(schema, :num_children) && (getproperty(schema, :num_children) > 0)
push!(nchildren, schema.num_children)
print(io, " {")
elseif lchildren > 0
nchildren[lchildren] -= 1
if nchildren[lchildren] == 0
else
while !isempty(nchildren) && nchildren[end] == 1
pop!(nchildren)
println(io, "")
print_indent(io, length(nchildren))
print(io, indent, "}")
end
if !isempty(nchildren)
nchildren[end] -= 1
end
end

println(io, "")
Expand Down
Loading