Skip to content

Commit

Permalink
introduce threadsafe so last and some custom funcs can be fast
Browse files Browse the repository at this point in the history
  • Loading branch information
rafaqz committed Dec 4, 2023
1 parent ea20181 commit 58f1e19
Showing 1 changed file with 50 additions and 21 deletions.
71 changes: 50 additions & 21 deletions src/methods/rasterize.jl
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,31 @@ _reduce_op(::typeof(sum)) = Base.add_sum
_reduce_op(::typeof(prod)) = Base.mul_prod
_reduce_op(::typeof(minimum)) = min
_reduce_op(::typeof(maximum)) = max
_reduce_op(::typeof(last)) = (a, b) -> b
# _reduce_op(::typeof(first)) = (a, b) -> a # Note: `first` must also be special-cased on the first call
_reduce_op(x) = nothing

_reduce_init(reducer, st::AbstractRasterStack) = map(A -> _reduce_init(reducer, A), st)
_reduce_init(reducer, ::AbstractRaster{T}) where T = _reduce_init(reducer, T)
_reduce_init(reducer, nt::NamedTuple) = map(x -> _reduce_init(reducer, x), nt)
_reduce_init(f, x) = _reduce_init(f, typeof(x))

_reduce_init(::Nothing, x::Type{T}) where T = zero(T)
_reduce_init(f::Function, ::Type{T}) where T = zero(f((zero(nonmissingtype(T)), zero(nonmissingtype(T)))))
_reduce_init(::typeof(sum), ::Type{T}) where T = zero(nonmissingtype(T))
_reduce_init(::typeof(prod), ::Type{T}) where T = oneunit(nonmissingtype(T))
_reduce_init(::typeof(minimum), ::Type{T}) where T = typemax(nonmissingtype(T))
_reduce_init(::typeof(maximum), ::Type{T}) where T = typemin(nonmissingtype(T))
_is_op_threadsafe(::typeof(sum)) = true
_is_op_threadsafe(::typeof(prod)) = true
_is_op_threadsafe(::typeof(minimum)) = true
_is_op_threadsafe(::typeof(maximum)) = true
_is_op_threadsafe(::typeof(last)) = false
# _is_op_threadsafe(::typeof(first)) = false
_is_op_threadsafe(f) = false

_reduce_init(reducer, st::AbstractRasterStack, missingval) = map(A -> _reduce_init(reducer, A, missingval), st)
_reduce_init(reducer, ::AbstractRaster{T}, missingval) where T = _reduce_init(reducer, T, missingval)
_reduce_init(reducer, nt::NamedTuple, missingval) = map(x -> _reduce_init(reducer, x, missingval), nt)
_reduce_init(f, x, missingval) = _reduce_init(f, typeof(x), missingval)

_reduce_init(::Nothing, x::Type{T}, missingval) where T = zero(T)
_reduce_init(f::Function, ::Type{T}, missingval) where T = zero(f((zero(nonmissingtype(T)), zero(nonmissingtype(T)))))
_reduce_init(::typeof(sum), ::Type{T}, missingval) where T = zero(nonmissingtype(T))
_reduce_init(::typeof(prod), ::Type{T}, missingval) where T = oneunit(nonmissingtype(T))
_reduce_init(::typeof(minimum), ::Type{T}, missingval) where T = typemax(nonmissingtype(T))
_reduce_init(::typeof(maximum), ::Type{T}, missingval) where T = typemin(nonmissingtype(T))
_reduce_init(::typeof(last), ::Type{T}, missingval) where T = missingval
# _reduce_init(::typeof(first), ::Type{T}, missingval) where T = missingval

struct FillChooser{F,I,M}
fill::F
Expand Down Expand Up @@ -81,6 +93,7 @@ struct Rasterizer{T,G,F,R,O,I,M}
verbose::Bool
progress::Bool
threaded::Bool
threadsafe_op::Bool
end
function Rasterizer(geom, fill, fillitr;
reducer=nothing,
Expand All @@ -103,6 +116,8 @@ function Rasterizer(geom, fill, fillitr;

op = _reduce_op(reducer)

threadsafe_op = _is_op_threadsafe(op)

shape = if isnothing(shape)
if GI.isgeometry(geom)
_geom_shape(geom)
Expand Down Expand Up @@ -130,7 +145,7 @@ function Rasterizer(geom, fill, fillitr;
end

stable_reductions = (first, last, sum, prod, maximum, minimum)
init = isnothing(init) ? _reduce_init(reducer, filleltype) : init
init = isnothing(init) ? _reduce_init(reducer, filleltype, missingval) : init
if shape == :points &&
!GI.isgeometry(geom) &&
!GI.trait(first(geom)) isa GI.PointTrait &&
Expand All @@ -140,7 +155,7 @@ function Rasterizer(geom, fill, fillitr;
eltype, missingval = get_eltype_missingval(eltype, missingval, fillitr, init, filename, op, reducer)
lock = threaded ? Threads.SpinLock() : nothing

return Rasterizer(eltype, geom, fillitr, reducer, op, init, missingval, lock, shape, boundary, verbose, progress, threaded)
return Rasterizer(eltype, geom, fillitr, reducer, op, init, missingval, lock, shape, boundary, verbose, progress, threaded, threadsafe_op)
end
function Rasterizer(data::T; fill, geomcolumn=nothing, kw...) where T
if Tables.istable(T) # typeof so we dont check iterable table fallback in Tables.jl
Expand Down Expand Up @@ -324,6 +339,12 @@ const RASTERIZE_KEYWORDS = """
- `verbose`: print information and warnings whne there are problems with the rasterisation.
`true` by default.
$THREADED_KEYWORD
- `threadsafe`: specify that custom `reducer` and/or `op` functions are thread-safe,
in that the order of operation or blocking does not matter. For example,
`sum` and `maximum` are thread-safe, because the answer is approximately (besides
floating point error) the same after running on nested blocks, or on all the data.
In contrast, `median` or `last` are not, because the blocking (`median`) or order (`last`)
matters.
"""

const RASTERIZE_ARGUMENTS = """
Expand Down Expand Up @@ -358,6 +379,12 @@ $RASTERIZE_KEYWORDS
$FILENAME_KEYWORD
$SUFFIX_KEYWORD
Note on threading. Performance may be much better with `threaded=false`
if `reducer`/`op` are not `threadsafe`. `sum`, `prod`, `maximum`, `minimum`
`count` and `mean` (by combining `sum` and `count`) are threadsafe. If you know
your algorithm is threadsafe, use `threadsafe=true` to allow all optimisations.
Functions passed to `fill` are always threadsafe, and ignore the `threadsafe` argument.
# Example
Rasterize a shapefile for China and plot, with a border.
Expand Down Expand Up @@ -408,14 +435,14 @@ function rasterize(reducer::typeof(count), data; fill=nothing, init=nothing, kw.
isnothing(fill) || _count_fill_info(fill)
rasterize(data; kw..., name=:count, init=0, reducer=nothing, fill=_count_fill, missingval=0)
end
# `mean` is sum / count
# We can do better than this, but its easy for now
# `mean` is sum ./ count. This is actually optimal with threading,
# as its means order is irrelivent so its threadsafe.
function rasterize(reducer::typeof(DD.Statistics.mean), data; fill, kw...)
sums = rasterize(sum, data; kw..., fill)
counts = rasterize(count, data; kw..., fill=nothing)
rebuild(sums ./ counts; name=:mean)
end
function rasterize(data; to=nothing, fill, threaded=true, kw...)
function rasterize(data; to=nothing, fill, threaded=false, kw...)
r = Rasterizer(data; fill, threaded, kw...)
rc = RasterCreator(to, data; kw..., eltype=r.eltype, fill, missingval=r.missingval)
allocs = r.shape == :points ? nothing : _burning_allocs(rc.to; threaded)
Expand Down Expand Up @@ -541,7 +568,7 @@ function rasterize!(reducer::typeof(count), x::RasterStackOrArray, data; fill=no
isnothing(init) || @info _count_init_info(init)
rasterize!(x::RasterStackOrArray, data; kw..., reducer=nothing, op=nothing, fill=_count_fill, init=0)
end
function rasterize!(x::RasterStackOrArray, data; threaded=true, kw...)
function rasterize!(x::RasterStackOrArray, data; threaded=false, kw...)
if prod(size(x)) == 0
@warn "Destination is empty, rasterization skipped"
return x
Expand Down Expand Up @@ -583,8 +610,7 @@ function _rasterize!(A, ::GI.AbstractGeometryTrait, geom, fill, r::Rasterizer; a
end
# Fill points
function _rasterize!(A, trait::GI.AbstractPointTrait, point, fill, r::Rasterizer; allocs=nothing)
# Avoid race conditions whern Point is in a mixed set of Geometries
return _fill_point!(A, trait, point; fill, r.lock)
return _fill_point!(A, trait, point; fill, lock=r.lock)
end
function _rasterize!(A, trait::Nothing, geoms, fill, r::Rasterizer; allocs=nothing)
if r.shape === :point
Expand All @@ -598,10 +624,13 @@ end

# We rasterize all iterables from here
function _rasterize_iterable!(A, geoms, reducer, op, fillitr, r::Rasterizer, allocs)
# reduce
if isnothing(op) && !(fillitr isa Function)
# Rasterise into a bitarray and reduce it, on one or many threads.
# Memory-intensive for large workloads, but thread-safe and safe for e.g. `median`
if !(fillitr isa Function) && ((r.threaded && !r.threadsafe) || isnothing(op))
return _reduce_bitarray!(reducer, A, geoms, fillitr, r, allocs)
end
# Reduce by rasterizing directly on one or many threads,, with a lock
# TODO: use separate arrays and combine when `r.threadsafe == true`
range = _geomindices(geoms)
burnchecks = _alloc_burnchecks(range)
_run(range, r.threaded, r.progress, "Rasterizing...") do i
Expand Down

0 comments on commit 58f1e19

Please sign in to comment.