From 58f1e198e9f8988ce2f7b348e0a3210a8752c833 Mon Sep 17 00:00:00 2001 From: Rafael Schouten Date: Mon, 4 Dec 2023 18:45:21 +0100 Subject: [PATCH] introduce threadsafe so last and some custom funcs can be fast --- src/methods/rasterize.jl | 71 ++++++++++++++++++++++++++++------------ 1 file changed, 50 insertions(+), 21 deletions(-) diff --git a/src/methods/rasterize.jl b/src/methods/rasterize.jl index 592d68692..99f2454be 100644 --- a/src/methods/rasterize.jl +++ b/src/methods/rasterize.jl @@ -4,19 +4,31 @@ _reduce_op(::typeof(sum)) = Base.add_sum _reduce_op(::typeof(prod)) = Base.mul_prod _reduce_op(::typeof(minimum)) = min _reduce_op(::typeof(maximum)) = max +_reduce_op(::typeof(last)) = (a, b) -> b +# _reduce_op(::typeof(first)) = (a, b) -> a # Note: `first` must also be special-cased on the first call _reduce_op(x) = nothing -_reduce_init(reducer, st::AbstractRasterStack) = map(A -> _reduce_init(reducer, A), st) -_reduce_init(reducer, ::AbstractRaster{T}) where T = _reduce_init(reducer, T) -_reduce_init(reducer, nt::NamedTuple) = map(x -> _reduce_init(reducer, x), nt) -_reduce_init(f, x) = _reduce_init(f, typeof(x)) - -_reduce_init(::Nothing, x::Type{T}) where T = zero(T) -_reduce_init(f::Function, ::Type{T}) where T = zero(f((zero(nonmissingtype(T)), zero(nonmissingtype(T))))) -_reduce_init(::typeof(sum), ::Type{T}) where T = zero(nonmissingtype(T)) -_reduce_init(::typeof(prod), ::Type{T}) where T = oneunit(nonmissingtype(T)) -_reduce_init(::typeof(minimum), ::Type{T}) where T = typemax(nonmissingtype(T)) -_reduce_init(::typeof(maximum), ::Type{T}) where T = typemin(nonmissingtype(T)) +_is_op_threadsafe(::typeof(sum)) = true +_is_op_threadsafe(::typeof(prod)) = true +_is_op_threadsafe(::typeof(minimum)) = true +_is_op_threadsafe(::typeof(maximum)) = true +_is_op_threadsafe(::typeof(last)) = false +# _is_op_threadsafe(::typeof(first)) = false +_is_op_threadsafe(f) = false + +_reduce_init(reducer, st::AbstractRasterStack, missingval) = map(A -> _reduce_init(reducer, A, missingval), st) +_reduce_init(reducer, ::AbstractRaster{T}, missingval) where T = _reduce_init(reducer, T, missingval) +_reduce_init(reducer, nt::NamedTuple, missingval) = map(x -> _reduce_init(reducer, x, missingval), nt) +_reduce_init(f, x, missingval) = _reduce_init(f, typeof(x), missingval) + +_reduce_init(::Nothing, x::Type{T}, missingval) where T = zero(T) +_reduce_init(f::Function, ::Type{T}, missingval) where T = zero(f((zero(nonmissingtype(T)), zero(nonmissingtype(T))))) +_reduce_init(::typeof(sum), ::Type{T}, missingval) where T = zero(nonmissingtype(T)) +_reduce_init(::typeof(prod), ::Type{T}, missingval) where T = oneunit(nonmissingtype(T)) +_reduce_init(::typeof(minimum), ::Type{T}, missingval) where T = typemax(nonmissingtype(T)) +_reduce_init(::typeof(maximum), ::Type{T}, missingval) where T = typemin(nonmissingtype(T)) +_reduce_init(::typeof(last), ::Type{T}, missingval) where T = missingval +# _reduce_init(::typeof(first), ::Type{T}, missingval) where T = missingval struct FillChooser{F,I,M} fill::F @@ -81,6 +93,7 @@ struct Rasterizer{T,G,F,R,O,I,M} verbose::Bool progress::Bool threaded::Bool + threadsafe_op::Bool end function Rasterizer(geom, fill, fillitr; reducer=nothing, @@ -103,6 +116,8 @@ function Rasterizer(geom, fill, fillitr; op = _reduce_op(reducer) + threadsafe_op = _is_op_threadsafe(op) + shape = if isnothing(shape) if GI.isgeometry(geom) _geom_shape(geom) @@ -130,7 +145,7 @@ function Rasterizer(geom, fill, fillitr; end stable_reductions = (first, last, sum, prod, maximum, minimum) - init = isnothing(init) ? _reduce_init(reducer, filleltype) : init + init = isnothing(init) ? _reduce_init(reducer, filleltype, missingval) : init if shape == :points && !GI.isgeometry(geom) && !GI.trait(first(geom)) isa GI.PointTrait && @@ -140,7 +155,7 @@ function Rasterizer(geom, fill, fillitr; eltype, missingval = get_eltype_missingval(eltype, missingval, fillitr, init, filename, op, reducer) lock = threaded ? Threads.SpinLock() : nothing - return Rasterizer(eltype, geom, fillitr, reducer, op, init, missingval, lock, shape, boundary, verbose, progress, threaded) + return Rasterizer(eltype, geom, fillitr, reducer, op, init, missingval, lock, shape, boundary, verbose, progress, threaded, threadsafe_op) end function Rasterizer(data::T; fill, geomcolumn=nothing, kw...) where T if Tables.istable(T) # typeof so we dont check iterable table fallback in Tables.jl @@ -324,6 +339,12 @@ const RASTERIZE_KEYWORDS = """ - `verbose`: print information and warnings whne there are problems with the rasterisation. `true` by default. $THREADED_KEYWORD +- `threadsafe`: specify that custom `reducer` and/or `op` functions are thread-safe, + in that the order of operation or blocking does not matter. For example, + `sum` and `maximum` are thread-safe, because the answer is approximately (besides + floating point error) the same after running on nested blocks, or on all the data. + In contrast, `median` or `last` are not, because the blocking (`median`) or order (`last`) + matters. """ const RASTERIZE_ARGUMENTS = """ @@ -358,6 +379,12 @@ $RASTERIZE_KEYWORDS $FILENAME_KEYWORD $SUFFIX_KEYWORD +Note on threading. Performance may be much better with `threaded=false` +if `reducer`/`op` are not `threadsafe`. `sum`, `prod`, `maximum`, `minimum` +`count` and `mean` (by combining `sum` and `count`) are threadsafe. If you know +your algorithm is threadsafe, use `threadsafe=true` to allow all optimisations. +Functions passed to `fill` are always threadsafe, and ignore the `threadsafe` argument. + # Example Rasterize a shapefile for China and plot, with a border. @@ -408,14 +435,14 @@ function rasterize(reducer::typeof(count), data; fill=nothing, init=nothing, kw. isnothing(fill) || _count_fill_info(fill) rasterize(data; kw..., name=:count, init=0, reducer=nothing, fill=_count_fill, missingval=0) end -# `mean` is sum / count -# We can do better than this, but its easy for now +# `mean` is sum ./ count. This is actually optimal with threading, +# as its means order is irrelivent so its threadsafe. function rasterize(reducer::typeof(DD.Statistics.mean), data; fill, kw...) sums = rasterize(sum, data; kw..., fill) counts = rasterize(count, data; kw..., fill=nothing) rebuild(sums ./ counts; name=:mean) end -function rasterize(data; to=nothing, fill, threaded=true, kw...) +function rasterize(data; to=nothing, fill, threaded=false, kw...) r = Rasterizer(data; fill, threaded, kw...) rc = RasterCreator(to, data; kw..., eltype=r.eltype, fill, missingval=r.missingval) allocs = r.shape == :points ? nothing : _burning_allocs(rc.to; threaded) @@ -541,7 +568,7 @@ function rasterize!(reducer::typeof(count), x::RasterStackOrArray, data; fill=no isnothing(init) || @info _count_init_info(init) rasterize!(x::RasterStackOrArray, data; kw..., reducer=nothing, op=nothing, fill=_count_fill, init=0) end -function rasterize!(x::RasterStackOrArray, data; threaded=true, kw...) +function rasterize!(x::RasterStackOrArray, data; threaded=false, kw...) if prod(size(x)) == 0 @warn "Destination is empty, rasterization skipped" return x @@ -583,8 +610,7 @@ function _rasterize!(A, ::GI.AbstractGeometryTrait, geom, fill, r::Rasterizer; a end # Fill points function _rasterize!(A, trait::GI.AbstractPointTrait, point, fill, r::Rasterizer; allocs=nothing) - # Avoid race conditions whern Point is in a mixed set of Geometries - return _fill_point!(A, trait, point; fill, r.lock) + return _fill_point!(A, trait, point; fill, lock=r.lock) end function _rasterize!(A, trait::Nothing, geoms, fill, r::Rasterizer; allocs=nothing) if r.shape === :point @@ -598,10 +624,13 @@ end # We rasterize all iterables from here function _rasterize_iterable!(A, geoms, reducer, op, fillitr, r::Rasterizer, allocs) - # reduce - if isnothing(op) && !(fillitr isa Function) + # Rasterise into a bitarray and reduce it, on one or many threads. + # Memory-intensive for large workloads, but thread-safe and safe for e.g. `median` + if !(fillitr isa Function) && ((r.threaded && !r.threadsafe) || isnothing(op)) return _reduce_bitarray!(reducer, A, geoms, fillitr, r, allocs) end + # Reduce by rasterizing directly on one or many threads,, with a lock + # TODO: use separate arrays and combine when `r.threadsafe == true` range = _geomindices(geoms) burnchecks = _alloc_burnchecks(range) _run(range, r.threaded, r.progress, "Rasterizing...") do i