Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add more filters #154

Merged
merged 35 commits into from
Dec 6, 2024
Merged
Show file tree
Hide file tree
Changes from 25 commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
f348c46
Document the Filter interface
asinghvi17 Aug 20, 2024
9d765b3
Move filters to a folder
asinghvi17 Aug 20, 2024
cb374ce
Factor out variable-length filters to a new file
asinghvi17 Aug 20, 2024
12931a2
Add docstrings to filter API functions
asinghvi17 Aug 20, 2024
7d7606a
Add a Fletcher32 filter and test
asinghvi17 Aug 20, 2024
6a34368
re-add the dictionary entries for the vlen filters
asinghvi17 Aug 20, 2024
fbf911e
Semi-working fixed scale offset filter
asinghvi17 Aug 20, 2024
5a0a172
Merge remote-tracking branch 'origin/master' into as/filters
asinghvi17 Aug 21, 2024
b960c37
Add FixedScaleOffset tests
asinghvi17 Aug 22, 2024
dcae156
Add shuffle filter (buggy in the last few bytes, indexing issues)
asinghvi17 Aug 22, 2024
7a5a5a0
WIP quantize filter
asinghvi17 Aug 22, 2024
e55b398
Merge remote-tracking branch 'origin/master' into as/filters
asinghvi17 Aug 23, 2024
231c0a1
ShuffleFilter working and tested
asinghvi17 Aug 23, 2024
ecdbeea
Semi working quantize filter
asinghvi17 Aug 23, 2024
5b8210f
Format tests better
asinghvi17 Aug 23, 2024
16306be
Complete interface and test quantize
asinghvi17 Aug 23, 2024
eec1b0d
Uncomment the FixedScaleOffset tests
asinghvi17 Aug 24, 2024
42995b2
fix getfilter syntax
asinghvi17 Aug 28, 2024
594ffdc
Add delta filter
asinghvi17 Aug 29, 2024
d7ce424
Adapt for Kerchunk playing fast and loose with the spec
asinghvi17 Sep 4, 2024
7518c43
Fix the delta and quantize JSON.lower
asinghvi17 Oct 9, 2024
a3c7710
Change the tests to be more sensible/Julian and avoid truncation errors
asinghvi17 Oct 9, 2024
c233e42
Fix the FixedScaleOffset filter materializer
asinghvi17 Oct 9, 2024
c211b6f
Fix decoding for fill values to use `reinterpret` on unsigned -> integer
asinghvi17 Oct 9, 2024
086b3b8
If `getfilter` fails, show the filter name and then throw an error
asinghvi17 Oct 9, 2024
ffdc629
Apply reinterpret before multiplication in fixed-scale-offset filter
asinghvi17 Oct 21, 2024
24a68e6
Only reinterpret negative integers when decoding fill values to unsigned
asinghvi17 Oct 21, 2024
85c1189
Revert "Only reinterpret negative integers when decoding fill values …
asinghvi17 Oct 21, 2024
58d1f89
Merge remote-tracking branch 'origin/master' into as/filters
asinghvi17 Oct 21, 2024
9cd47d0
Merge remote-tracking branch 'origin/master' into as/filters
asinghvi17 Nov 22, 2024
3fca4eb
let Fletcher32 operate on n-dimensional arrays
asinghvi17 Nov 22, 2024
fdb5def
fix FixedScaleOffset in many ways
asinghvi17 Nov 22, 2024
cf60242
add filter tests in Python
asinghvi17 Nov 22, 2024
1fe11f6
Fix filter astype, id to conform to Python names
asinghvi17 Nov 22, 2024
4ca87a6
remove encoding validity check for quantize - it's pointless
asinghvi17 Nov 22, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/Compressors.jl
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,11 @@ function zcompress!(compressed, data, c, f)
end

function zuncompress!(data, compressed, c, f)
data2 = zuncompress(compressed, c, desttype(last(f)))
data2 = zuncompress(compressed, c, desttype(last(f)))
a2 = foldr(f, init = data2) do fnow, anow
zdecode(anow, fnow)
end
copyto!(data, a2)
copyto!(data, _reinterpret(Base.nonmissingtype(eltype(data)), a2))
end


Expand Down
95 changes: 95 additions & 0 deletions src/Filters/Filters.jl
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
import JSON

"""
abstract type Filter{T,TENC}

The supertype for all Zarr filters.

## Interface

All subtypes MUST implement the following methods:

- [`zencode(ain, filter::Filter)`](@ref zencode): Encodes data `ain` using the filter, and returns a vector of bytes.
- [`zdecode(ain, filter::Filter)`](@ref zdecode): Decodes data `ain`, a vector of bytes, using the filter, and returns the original data.
- [`JSON.lower`](@ref): Returns a JSON-serializable dictionary representing the filter, according to the Zarr specification.
- [`getfilter(::Type{<: Filter}, filterdict)`](@ref getfilter): Returns the filter type read from a given filter dictionary.

If the filter has type parameters, it MUST also implement:
- [`sourcetype(::Filter)::T`](@ref sourcetype): equivalent to `dtype` in the Python Zarr implementation.
- [`desttype(::Filter)::T`](@ref desttype): equivalent to `atype` in the Python Zarr implementation.

Finally, an entry MUST be added to the `filterdict` dictionary for each filter type.
This must also follow the Zarr specification's name for that filter. The name of the filter
is the key, and the value is the filter type (e.g. `VLenUInt8Filter` or `Fletcher32Filter`).


Subtypes include: [`VLenArrayFilter`](@ref), [`VLenUTF8Filter`](@ref), [`Fletcher32Filter`](@ref).
"""
abstract type Filter{T,TENC} end

"""
zencode(ain, filter::Filter)

Encodes data `ain` using the filter, and returns a vector of bytes.
"""
function zencode end

"""
zdecode(ain, filter::Filter)

Decodes data `ain`, a vector of bytes, using the filter, and returns the original data.
"""
function zdecode end

"""
getfilter(::Type{<: Filter}, filterdict)

Returns the filter type read from a given specification dictionary, which must follow the Zarr specification.
"""
function getfilter end

"""
sourcetype(::Filter)::T

Returns the source type of the filter.
"""
function sourcetype end

"""
desttype(::Filter)::T

Returns the destination type of the filter.
"""
function desttype end

filterdict = Dict{String,Type{<:Filter}}()

function getfilters(d::Dict)
if !haskey(d,"filters")
return nothing
else
if d["filters"] === nothing || isempty(d["filters"])
return nothing
end
f = map(d["filters"]) do f
try
getfilter(filterdict[f["id"]], f)
catch e
@show f
rethrow(e)
end
end
return (f...,)
end
end
sourcetype(::Filter{T}) where T = T
desttype(::Filter{<:Any,T}) where T = T

zencode(ain,::Nothing) = ain

include("vlenfilters.jl")
include("fletcher32.jl")
include("fixedscaleoffset.jl")
include("shuffle.jl")
include("quantize.jl")
include("delta.jl")
45 changes: 45 additions & 0 deletions src/Filters/delta.jl
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
#=
# Delta compression


=#

"""
DeltaFilter(; DecodingType, [EncodingType = DecodingType])

Delta-based compression for Zarr arrays. (Delta encoding is Julia `diff`, decoding is Julia `cumsum`).
"""
struct DeltaFilter{T, TENC} <: Filter{T, TENC}
end

function DeltaFilter(; DecodingType = Float16, EncodingType = DecodingType)
return DeltaFilter{DecodingType, EncodingType}()
end

DeltaFilter{T}() where T = DeltaFilter{T, T}()

function zencode(data::AbstractArray, filter::DeltaFilter{DecodingType, EncodingType}) where {DecodingType, EncodingType}
arr = reinterpret(DecodingType, vec(data))

enc = similar(arr, EncodingType)
# perform the delta operation
enc[begin] = arr[begin]
enc[begin+1:end] .= diff(arr)
return enc
end

function zdecode(data::AbstractArray, filter::DeltaFilter{DecodingType, EncodingType}) where {DecodingType, EncodingType}
encoded = reinterpret(EncodingType, vec(data))
decoded = DecodingType.(cumsum(encoded))
return decoded
end

function JSON.lower(filter::DeltaFilter{T, Tenc}) where {T, Tenc}
return Dict("id" => "delta", "dtype" => typestr(T), "atype" => typestr(Tenc))
end

function getfilter(::Type{<: DeltaFilter}, d)
return DeltaFilter{typestr(d["dtype"], haskey(d, "atype") ? typestr(d["atype"]) : d["dtype"])}()
end

filterdict["delta"] = DeltaFilter
50 changes: 50 additions & 0 deletions src/Filters/fixedscaleoffset.jl
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@

"""
FixedScaleOffsetFilter{T,TENC}(scale, offset)

A compressor that scales and offsets the data.

!!! note
The geographic CF standards define scale/offset decoding as `x * scale + offset`,
but this filter defines it as `x / scale + offset`. Constructing a `FixedScaleOffsetFilter`
from CF data means `FixedScaleOffsetFilter(1/cf_scale_factor, cf_add_offset)`.
"""
struct FixedScaleOffsetFilter{ScaleOffsetType, T, Tenc} <: Filter{T, Tenc}
scale::ScaleOffsetType
offset::ScaleOffsetType
end

FixedScaleOffsetFilter{T}(scale::ScaleOffsetType, offset::ScaleOffsetType) where {T, ScaleOffsetType} = FixedScaleOffsetFilter{T, ScaleOffsetType}(scale, offset)
FixedScaleOffsetFilter(scale::ScaleOffsetType, offset::ScaleOffsetType) where {ScaleOffsetType} = FixedScaleOffsetFilter{ScaleOffsetType, ScaleOffsetType}(scale, offset)

function FixedScaleOffsetFilter(; scale::ScaleOffsetType, offset::ScaleOffsetType, T, Tenc = T) where ScaleOffsetType
return FixedScaleOffsetFilter{ScaleOffsetType, T, Tenc}(scale, offset)
end

function zencode(a::AbstractArray, c::FixedScaleOffsetFilter{ScaleOffsetType, T, Tenc}) where {T, Tenc, ScaleOffsetType}
return @. convert(Tenc, # convert to the encoding type after applying the scale and offset
round((a - c.offset) * c.scale) # apply scale and offset, and round to nearest integer
)
end

function zdecode(a::AbstractArray, c::FixedScaleOffsetFilter{ScaleOffsetType, T, Tenc}) where {T, Tenc, ScaleOffsetType}
return _reinterpret(Base.nonmissingtype(T), @. a / c.scale + c.offset)
end


function getfilter(::Type{<: FixedScaleOffsetFilter}, d::Dict)
scale = d["scale"]
offset = d["offset"]
# Types must be converted from strings to the actual Julia types they represent.
string_T = d["dtype"]
string_Tenc = get(d, "atype", string_T)
T = typestr(string_T)
Tenc = typestr(string_Tenc)
return FixedScaleOffsetFilter{Tenc, T, Tenc}(scale, offset)
end

function JSON.lower(c::FixedScaleOffsetFilter{ScaleOffsetType, T, Tenc}) where {ScaleOffsetType, T, Tenc}
return Dict("id" => "fixedscaleoffset", "scale" => c.scale, "offset" => c.offset, "dtype" => typestr(T), "atype" => typestr(Tenc))
end

filterdict["fixedscaleoffset"] = FixedScaleOffsetFilter
85 changes: 85 additions & 0 deletions src/Filters/fletcher32.jl
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
#=
# Fletcher32 filter

This "filter" basically injects a 4-byte checksum at the end of the data, to ensure data integrity.

The implementation is based on the [numcodecs implementation here](https://github.com/zarr-developers/numcodecs/blob/79d1a8d4f9c89d3513836aba0758e0d2a2a1cfaf/numcodecs/fletcher32.pyx)
and the [original C implementation for NetCDF](https://github.com/Unidata/netcdf-c/blob/main/plugins/H5checksum.c#L109) linked therein.

=#

"""
Fletcher32Filter()

A compressor that uses the Fletcher32 checksum algorithm to compress and uncompress data.

Note that this goes from UInt8 to UInt8, and is effectively only checking
the checksum and cropping the last 4 bytes of the data during decoding.
"""
struct Fletcher32Filter <: Filter{UInt8, UInt8}
end

getfilter(::Type{<: Fletcher32Filter}, d::Dict) = Fletcher32Filter()
JSON.lower(::Fletcher32Filter) = Dict("id" => "fletcher32")
filterdict["fletcher32"] = Fletcher32Filter

function _checksum_fletcher32(data::AbstractVector{UInt8})
len = length(data) / 2 # length in 16-bit words
sum1::UInt32 = 0
sum2::UInt32 = 0
data_idx = 1

#=
Compute the checksum for pairs of bytes.
The magic `360` value is the largest number of sums that can be performed without overflow in UInt32.
=#
while len > 0
tlen = len > 360 ? 360 : len
len -= tlen
while tlen > 0
sum1 += begin # create a 16 bit word from two bytes, the first one shifted to the end of the word
(UInt16(data[data_idx]) << 8) | UInt16(data[data_idx + 1])
end
sum2 += sum1
data_idx += 2
tlen -= 1
if tlen < 1
break
end
end
sum1 = (sum1 & 0xffff) + (sum1 >> 16)
sum2 = (sum2 & 0xffff) + (sum2 >> 16)
end

# if the length of the data is odd, add the first byte to the checksum again (?!)
if length(data) % 2 == 1
sum1 += UInt16(data[1]) << 8
sum2 += sum1
sum1 = (sum1 & 0xffff) + (sum1 >> 16)
sum2 = (sum2 & 0xffff) + (sum2 >> 16)
end
return (sum2 << 16) | sum1
end

function zencode(data, ::Fletcher32Filter)
bytes = reinterpret(UInt8, data)
checksum = _checksum_fletcher32(bytes)
result = copy(bytes)
append!(result, reinterpret(UInt8, [checksum])) # TODO: decompose this without the extra allocation of wrapping in Array
return result
end

function zdecode(data, ::Fletcher32Filter)
bytes = reinterpret(UInt8, data)
checksum = _checksum_fletcher32(view(bytes, 1:length(bytes) - 4))
stored_checksum = only(reinterpret(UInt32, view(bytes, (length(bytes) - 3):length(bytes))))
if checksum != stored_checksum
throw(ErrorException("""
Checksum mismatch in Fletcher32 decoding.

The computed value is $(checksum) and the stored value is $(stored_checksum).
This might be a sign that the data is corrupted.
""")) # TODO: make this a custom error type
end
return view(bytes, 1:length(bytes) - 4)
end
56 changes: 56 additions & 0 deletions src/Filters/quantize.jl
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
#=
# Quantize compression


=#

"""
QuantizeFilter(; digits, DecodingType, [EncodingType = DecodingType])

Quantization based compression for Zarr arrays.
"""
struct QuantizeFilter{T, TENC} <: Filter{T, TENC}
digits::Int32
end

function QuantizeFilter(; digits = 10, T = Float16, Tenc = T)
return QuantizeFilter{T, Tenc}(digits)
end

QuantizeFilter{T, Tenc}(; digits = 10) where {T, Tenc} = QuantizeFilter{T, Tenc}(digits)
QuantizeFilter{T}(; digits = 10) where T = QuantizeFilter{T, T}(digits)

function zencode(data::AbstractArray, filter::QuantizeFilter{DecodingType, EncodingType}) where {DecodingType, EncodingType}
arr = reinterpret(DecodingType, vec(data))

precision = 10.0^(-filter.digits)

_exponent = log(10, precision) # log 10 in base `precision`
exponent = _exponent < 0 ? floor(Int, _exponent) : ceil(Int, _exponent)

bits = ceil(log(2, 10.0^(-exponent)))
scale = 2.0^bits

enc = @. round(scale * arr) / scale

if EncodingType == DecodingType
return enc
else
return reinterpret(EncodingType, enc)
end
end

# Decoding is a no-op; quantization is a lossy filter but data is encoded directly.
function zdecode(data::AbstractArray, filter::QuantizeFilter{DecodingType, EncodingType}) where {DecodingType, EncodingType}
return data
end

function JSON.lower(filter::QuantizeFilter{T, Tenc}) where {T, Tenc}
return Dict("type" => "quantize", "digits" => filter.digits, "dtype" => typestr(T), "atype" => typestr(Tenc))
end

function getfilter(::Type{<: QuantizeFilter}, d)
return QuantizeFilter{typestr(d["dtype"], typestr(d["atype"]))}(; digits = d["digits"])
end

filterdict["quantize"] = QuantizeFilter
Loading
Loading