Skip to content

Commit

Permalink
introduce threadsafe rasterize so last and some custom funcs can be f…
Browse files Browse the repository at this point in the history
…ast (#575)

* introduce threadsafe so last and some custom funcs can be fast

* better rasterize eltype

* fix maybe missing

* f
  • Loading branch information
rafaqz authored Dec 10, 2023
1 parent f5f3cfd commit e69356b
Show file tree
Hide file tree
Showing 3 changed files with 97 additions and 51 deletions.
136 changes: 86 additions & 50 deletions src/methods/rasterize.jl
Original file line number Diff line number Diff line change
@@ -1,22 +1,36 @@
struct _TakeFirst{MV}
missingval::MV
end
(tf::_TakeFirst)(a, b) = a === tf.missingval ? b : a
_take_last(a, b) = b

_reduce_op(::typeof(sum)) = Base.add_sum
_reduce_op(::typeof(prod)) = Base.mul_prod
_reduce_op(::typeof(minimum)) = min
_reduce_op(::typeof(maximum)) = max
_reduce_op(::typeof(last)) = _take_last
_reduce_op(f, missingval) = _reduce_op(f)
_reduce_op(::typeof(first), missingval) = _TakeFirst(missingval)
_reduce_op(x) = nothing

_reduce_init(reducer, st::AbstractRasterStack) = map(A -> _reduce_init(reducer, A), st)
_reduce_init(reducer, ::AbstractRaster{T}) where T = _reduce_init(reducer, T)
_reduce_init(reducer, nt::NamedTuple) = map(x -> _reduce_init(reducer, x), nt)
_reduce_init(f, x) = _reduce_init(f, typeof(x))

_reduce_init(::Nothing, x::Type{T}) where T = zero(T)
_reduce_init(f::Function, ::Type{T}) where T = zero(f((zero(nonmissingtype(T)), zero(nonmissingtype(T)))))
_reduce_init(::typeof(sum), ::Type{T}) where T = zero(nonmissingtype(T))
_reduce_init(::typeof(prod), ::Type{T}) where T = oneunit(nonmissingtype(T))
_reduce_init(::typeof(minimum), ::Type{T}) where T = typemax(nonmissingtype(T))
_reduce_init(::typeof(maximum), ::Type{T}) where T = typemin(nonmissingtype(T))
_is_op_threadsafe(::typeof(sum)) = true
_is_op_threadsafe(::typeof(prod)) = true
_is_op_threadsafe(::typeof(minimum)) = true
_is_op_threadsafe(::typeof(maximum)) = true
_is_op_threadsafe(f) = false

_reduce_init(reducer, st::AbstractRasterStack, missingval) = map(A -> _reduce_init(reducer, A, missingval), st)
_reduce_init(reducer, ::AbstractRaster{T}, missingval) where T = _reduce_init(reducer, T, missingval)
_reduce_init(reducer, nt::NamedTuple, missingval) = map(x -> _reduce_init(reducer, x, missingval), nt)
_reduce_init(f, x, missingval) = _reduce_init(f, typeof(x), missingval)

_reduce_init(::Nothing, x::Type{T}, missingval) where T = zero(T)
_reduce_init(f::Function, ::Type{T}, missingval) where T = zero(f((zero(nonmissingtype(T)), zero(nonmissingtype(T)))))
_reduce_init(::typeof(sum), ::Type{T}, missingval) where T = zero(nonmissingtype(T))
_reduce_init(::typeof(prod), ::Type{T}, missingval) where T = oneunit(nonmissingtype(T))
_reduce_init(::typeof(minimum), ::Type{T}, missingval) where T = typemax(nonmissingtype(T))
_reduce_init(::typeof(maximum), ::Type{T}, missingval) where T = typemin(nonmissingtype(T))
_reduce_init(::typeof(last), ::Type{T}, missingval) where T = _maybe_nothing_to_missing(missingval)

struct FillChooser{F,I,M}
fill::F
Expand Down Expand Up @@ -81,6 +95,7 @@ struct Rasterizer{T,G,F,R,O,I,M}
verbose::Bool
progress::Bool
threaded::Bool
threadsafe_op::Bool
end
function Rasterizer(geom, fill, fillitr;
reducer=nothing,
Expand All @@ -94,6 +109,7 @@ function Rasterizer(geom, fill, fillitr;
verbose=true,
progress=true,
threaded=false,
threadsafe=nothing,
kw...
)
# A single geometry does not need a reducing function
Expand All @@ -103,6 +119,8 @@ function Rasterizer(geom, fill, fillitr;

op = _reduce_op(reducer)

threadsafe_op = isnothing(threadsafe) ? _is_op_threadsafe(op) : threadsafe

shape = if isnothing(shape)
if GI.isgeometry(geom)
_geom_shape(geom)
Expand All @@ -113,34 +131,17 @@ function Rasterizer(geom, fill, fillitr;
shape
end

filleltype = if fillitr isa NamedTuple
if all(map(x -> x isa Number, fillitr))
map(typeof, fillitr)
elseif all(map(x -> Base.IteratorEltype(x) isa Base.HasEltype, fillitr))
map(Base.eltype, fillitr)
else
map(typeof first, fillitr) # This is not really correct.
end
elseif fill isa Function
isnothing(init) ? typeof(missingval) : typeof(fill(init))
elseif Base.IteratorEltype(fillitr) isa Base.HasEltype
Base.eltype(fillitr)
else
typeof(first(fillitr)) # This is not really correct
end

stable_reductions = (first, last, sum, prod, maximum, minimum)
init = isnothing(init) ? _reduce_init(reducer, filleltype) : init
if shape == :points &&
!GI.isgeometry(geom) &&
!GI.trait(first(geom)) isa GI.PointTrait &&
!(reducer in stable_reductions)
@warn "currently `:points` rasterization of multiple non-`PointTrait` geometries may be innaccurate for `reducer` methods besides $stable_reductions. Make a Rasters.jl github issue if you need this to work"
end
eltype, missingval = get_eltype_missingval(eltype, missingval, fillitr, init, filename, op, reducer)
eltype, missingval, init = get_eltype_missingval(eltype, missingval, fill, fillitr, init, filename, op, reducer)
lock = threaded ? Threads.SpinLock() : nothing

return Rasterizer(eltype, geom, fillitr, reducer, op, init, missingval, lock, shape, boundary, verbose, progress, threaded)
return Rasterizer(eltype, geom, fillitr, reducer, op, init, missingval, lock, shape, boundary, verbose, progress, threaded, threadsafe_op)
end
function Rasterizer(data::T; fill, geomcolumn=nothing, kw...) where T
if Tables.istable(T) # typeof so we dont check iterable table fallback in Tables.jl
Expand Down Expand Up @@ -193,35 +194,55 @@ function Rasterizer(::GI.AbstractGeometryTrait, geom; fill, kw...)
Rasterizer(geom, fill, _iterable_fill(geom, fill); kw...)
end

function get_eltype_missingval(eltype, missingval, fillitr, init::NamedTuple, filename, op, reducer)
function get_eltype_missingval(eltype, missingval, fill, fillitr, init, filename, op, reducer)
filleltype = if fillitr isa NamedTuple
if all(map(x -> x isa Number, fillitr))
map(typeof, fillitr)
elseif all(map(x -> Base.IteratorEltype(x) isa Base.HasEltype, fillitr))
map(Base.eltype, fillitr)
else
map(typeof first, fillitr) # This is not really correct.
end
elseif fill isa Function
isnothing(init) ? typeof(missingval) : typeof(fill(init))
elseif Base.IteratorEltype(fillitr) isa Base.HasEltype
Base.eltype(fillitr)
else
typeof(first(fillitr)) # This is not really correct
end
init = isnothing(init) ? _reduce_init(reducer, filleltype, missingval) : init
_get_eltype_missingval(eltype, missingval, filleltype, fillitr, init, filename, op, reducer)
end
function _get_eltype_missingval(eltype, missingval, filleltype, fillitr, init::NamedTuple, filename, op, reducer)
eltype = eltype isa NamedTuple ? eltype : map(_ -> eltype, init)
missingval = missingval isa NamedTuple ? missingval : map(_ -> missingval, init)
em = map(eltype, missingval, fillitr, init) do et, mv, fi, i
get_eltype_missingval(et, mv, fi, i, filename, op, reducer)
filleltype = filleltype isa NamedTuple ? filleltype : map(_ -> filleltype, init)
em = map(eltype, missingval, filleltype, fillitr, init) do et, mv, fe, fi, i
_get_eltype_missingval(et, mv, fe, fi, i, filename, op, reducer)
end
eltype = map(first, em)
missingval = map(last, em)
return eltype, missingval
missingval = map(x -> x[2], em)
return eltype, missingval, init
end
function get_eltype_missingval(known_eltype, missingval, fillitr, init, filename, op, reducer)
function _get_eltype_missingval(known_eltype, missingval, filleltype, fillitr, init, filename, op, reducer)
fillzero = zero(filleltype)
eltype = if isnothing(known_eltype)
if fillitr isa Function
typeof(fillitr(init))
promote_type(typeof(fillitr(init)), typeof(fillitr(fillzero)))
elseif op isa Function
typeof(op(init, init))
promote_type(typeof(op(init, init)), typeof(op(init, fillzero)), typeof(op(fillzero, fillzero)))
elseif reducer isa Function
typeof(reducer((init, init)))
promote_type(typeof(reducer((init, init))), typeof(reducer((init, fillzero))), typeof(reducer((fillzero, fillzero))))
else
typeof(init)
promote_type(filleltype, typeof(init))
end
else
known_eltype
end

missingval = isnothing(missingval) ? _writeable_missing(filename, eltype) : missingval
# eltype was not the actually array eltype, so promote it with the missingval
eltype = isnothing(known_eltype) ? promote_type(typeof(missingval), eltype) : eltype
return eltype, missingval
return eltype, missingval, init
end

_fill_key_error(names, fill) = throw(ArgumentError("fill key $fill not found in table, use one of: $(Tuple(names))"))
Expand Down Expand Up @@ -324,6 +345,12 @@ const RASTERIZE_KEYWORDS = """
- `verbose`: print information and warnings whne there are problems with the rasterisation.
`true` by default.
$THREADED_KEYWORD
- `threadsafe`: specify that custom `reducer` and/or `op` functions are thread-safe,
in that the order of operation or blocking does not matter. For example,
`sum` and `maximum` are thread-safe, because the answer is approximately (besides
floating point error) the same after running on nested blocks, or on all the data.
In contrast, `median` or `last` are not, because the blocking (`median`) or order (`last`)
matters.
"""

const RASTERIZE_ARGUMENTS = """
Expand Down Expand Up @@ -358,6 +385,12 @@ $RASTERIZE_KEYWORDS
$FILENAME_KEYWORD
$SUFFIX_KEYWORD
Note on threading. Performance may be much better with `threaded=false`
if `reducer`/`op` are not `threadsafe`. `sum`, `prod`, `maximum`, `minimum`
`count` and `mean` (by combining `sum` and `count`) are threadsafe. If you know
your algorithm is threadsafe, use `threadsafe=true` to allow all optimisations.
Functions passed to `fill` are always threadsafe, and ignore the `threadsafe` argument.
# Example
Rasterize a shapefile for China and plot, with a border.
Expand Down Expand Up @@ -408,14 +441,14 @@ function rasterize(reducer::typeof(count), data; fill=nothing, init=nothing, kw.
isnothing(fill) || _count_fill_info(fill)
rasterize(data; kw..., name=:count, init=0, reducer=nothing, fill=_count_fill, missingval=0)
end
# `mean` is sum / count
# We can do better than this, but its easy for now
# `mean` is sum ./ count. This is actually optimal with threading,
# as its means order is irrelivent so its threadsafe.
function rasterize(reducer::typeof(DD.Statistics.mean), data; fill, kw...)
sums = rasterize(sum, data; kw..., fill)
counts = rasterize(count, data; kw..., fill=nothing)
rebuild(sums ./ counts; name=:mean)
end
function rasterize(data; to=nothing, fill, threaded=true, kw...)
function rasterize(data; to=nothing, fill, threaded=false, kw...)
r = Rasterizer(data; fill, threaded, kw...)
rc = RasterCreator(to, data; kw..., eltype=r.eltype, fill, missingval=r.missingval)
allocs = r.shape == :points ? nothing : _burning_allocs(rc.to; threaded)
Expand Down Expand Up @@ -541,7 +574,7 @@ function rasterize!(reducer::typeof(count), x::RasterStackOrArray, data; fill=no
isnothing(init) || @info _count_init_info(init)
rasterize!(x::RasterStackOrArray, data; kw..., reducer=nothing, op=nothing, fill=_count_fill, init=0)
end
function rasterize!(x::RasterStackOrArray, data; threaded=true, kw...)
function rasterize!(x::RasterStackOrArray, data; threaded=false, kw...)
if prod(size(x)) == 0
@warn "Destination is empty, rasterization skipped"
return x
Expand Down Expand Up @@ -583,8 +616,7 @@ function _rasterize!(A, ::GI.AbstractGeometryTrait, geom, fill, r::Rasterizer; a
end
# Fill points
function _rasterize!(A, trait::GI.AbstractPointTrait, point, fill, r::Rasterizer; allocs=nothing)
# Avoid race conditions whern Point is in a mixed set of Geometries
return _fill_point!(A, trait, point; fill, r.lock)
return _fill_point!(A, trait, point; fill, lock=r.lock)
end
function _rasterize!(A, trait::Nothing, geoms, fill, r::Rasterizer; allocs=nothing)
if r.shape === :point
Expand All @@ -598,10 +630,14 @@ end

# We rasterize all iterables from here
function _rasterize_iterable!(A, geoms, reducer, op, fillitr, r::Rasterizer, allocs)
# reduce
if isnothing(op) && !(fillitr isa Function)
# Rasterise into a bitarray and reduce it, on one or many threads.
# Memory-intensive for large workloads, but thread-safe and safe for e.g. `median`
if !(fillitr isa Function) && ((r.threaded && !r.threadsafe_op) || isnothing(op))
(r.threaded && !r.threadsafe_op) && r.verbose && @warn "if `op` is not threadsafe, `threaded=true` may be slower than `threaded=false`"
return _reduce_bitarray!(reducer, A, geoms, fillitr, r, allocs)
end
# Reduce by rasterizing directly on one or many threads,, with a lock
# TODO: use separate arrays and combine when `r.threadsafe_op == true` ?
range = _geomindices(geoms)
burnchecks = _alloc_burnchecks(range)
_run(range, r.threaded, r.progress, "Rasterizing...") do i
Expand Down
5 changes: 4 additions & 1 deletion src/utils.jl
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,10 @@ end
_unwrap(::Val{X}) where X = X
_unwrap(x) = x

_missingval_or_missing(x) = missingval(x) isa Nothing ? missing : missingval(x)
_missingval_or_missing(x) = _maybe_nothing_to_missing(missingval(x))

_maybe_nothing_to_missing(::Nothing) = missing
_maybe_nothing_to_missing(missingval) = missingval

maybe_eps(dims::DimTuple) = map(maybe_eps, dims)
maybe_eps(dim::Dimension) = maybe_eps(eltype(dim))
Expand Down
7 changes: 7 additions & 0 deletions test/rasterize.jl
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ st = RasterStack((A1, copy(A1)))
geom = pointvec
geom = line_collection
geom = poly_collection
threaded = true

for A in (A1, A2),
geom in (pointvec, pointfc, multi_point, linestring, multi_linestring, linearring, polygon, multi_polygon, table, line_collection, poly_collection),
Expand All @@ -57,6 +58,8 @@ st = RasterStack((A1, copy(A1)))
rasterize!(sum, A, geom; shape=:point, fill=1, threaded);
@test sum(A) == 5.0
@test sum(rasterize(sum, geom; to=A, shape=:point, fill=1, missingval=0, threaded)) == 5.0
@test sum(rasterize(xs -> sum(xs), geom; to=A, shape=:point, fill=1, missingval=0, threaded)) == 5.0
@test sum(rasterize(xs -> sum(xs), geom; to=A, shape=:point, fill=1, missingval=0, threaded, threadsafe=true)) == 5.0
rasterize!(last, A, geom; shape=:point, fill=1, threaded);
@test sum(A) == 4.0
@test sum(rasterize(last, geom; to=A, shape=:point, fill=1, missingval=0, threaded)) == 4.0
Expand Down Expand Up @@ -124,6 +127,8 @@ end
rasterize!(sum, A, geom; shape=:line, fill=1, threaded)
@test sum(A) == 20 + 20 + 20 + 20
@test sum(rasterize(sum, geom; to=A, shape=:line, fill=1, missingval=0, threaded)) == 80
@test sum(rasterize(xs -> sum(xs), geom; to=A, shape=:line, fill=1, missingval=0, threaded, threadsafe=true)) == 80
@test sum(rasterize(xs -> sum(xs), geom; to=A, shape=:line, fill=1, missingval=0, threaded, threadsafe=false)) == 80
end
@testset ":line is detected for line geometries" begin
for A in (A1, A2), geom in (linestring, multi_linestring), threaded in (true, false)
Expand All @@ -139,6 +144,7 @@ end
A = A1
poly = polygon
poly = poly_collection
threaded = false
for A in (A1, A2), poly in (polygon, multi_polygon, poly_collection), threaded in (true, false)
A .= 0
ra = rasterize(last, poly; to=A, missingval=0, shape=:polygon, fill=1, boundary=:center, threaded)
Expand Down Expand Up @@ -195,6 +201,7 @@ end
end
@testset "Single value fill makes an array (ignoring table vals)" begin
ra = rasterize(sum, data; to=A, fill=0x03, missingval=0x00)
@test ra isa Raster
@test eltype(ra) == UInt64
@test sum(ra) === 0x000000000000000f
end
Expand Down

0 comments on commit e69356b

Please sign in to comment.