Skip to content

Commit

Permalink
Fix apply performance (#63)
Browse files Browse the repository at this point in the history
* fix performance of apply

---------

Co-authored-by: Anshul Singhvi <[email protected]>
  • Loading branch information
rafaqz and asinghvi17 authored Mar 3, 2024
1 parent 99fd7cf commit 844338a
Show file tree
Hide file tree
Showing 4 changed files with 142 additions and 120 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/CI.yml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ jobs:
fail-fast: false
matrix:
version:
- '1.6'
- '1.9'
- '1'
# - 'nightly'
os:
Expand Down
2 changes: 1 addition & 1 deletion Project.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ ExactPredicates = "2"
GeoInterface = "1.2"
GeometryBasics = "0.4.7"
Proj = "1"
julia = "1.3"
julia = "1.9"

[extras]
ArchGDAL = "c9ce4bd3-c3d5-55b8-8973-c0e20141b8c3"
Expand Down
241 changes: 130 additions & 111 deletions src/primitives.jl
Original file line number Diff line number Diff line change
Expand Up @@ -93,42 +93,60 @@ flipped_geom = GO.apply(GI.PointTrait, geom) do p
(GI.y(p), GI.x(p))
end
"""
apply(f, ::Type{Target}, geom; kw...) where Target = _apply(f, Target, geom; kw...)
@inline function apply(
f::F, ::Type{Target}, geom; calc_extent=false, threaded=false, kw...
) where {F,Target}
threaded = _booltype(threaded)
calc_extent = _booltype(calc_extent)
_apply(f, Target, geom; threaded, calc_extent, kw...)
end

#=
We pass `threading` and `calc_extent` as types, not simple boolean values.
This is to help compilation - with a type to hold on to, it's easier for
the compiler to separate threaded and non-threaded code paths.
=#
struct _True end
struct _False end

@inline _booltype(x::Bool) = x ? _True() : _False()
@inline _booltype(x::Union{_True,_False}) = x

# Call _apply again with the trait of `geom`
_apply(f, ::Type{Target}, geom; kw...) where Target =
@inline _apply(f::F, ::Type{Target}, geom; kw...) where {Target,F} =
_apply(f, Target, GI.trait(geom), geom; kw...)
# There is no trait and this is an AbstractArray - so just iterate over it calling _apply on the contents
function _apply(f, ::Type{Target}, ::Nothing, A::AbstractArray; threaded=false, kw...) where Target
@inline function _apply(f::F, ::Type{Target}, ::Nothing, A::AbstractArray; threaded, kw...) where {F,Target}
# For an Array there is nothing else to do but map `_apply` over all values
# _maptasks may run this level threaded if `threaded==true`,
# but deeper `_apply` called in the closure will not be threaded
_maptasks(eachindex(A); threaded) do i
_apply(f, Target, A[i]; threaded=false, kw...)
end
apply_to_array(i) = _apply(f, Target, A[i]; threaded=_False(), kw...)
_maptasks(apply_to_array, eachindex(A), threaded)
end
# There is no trait and this is not an AbstractArray.
# Try to call _apply over it. We can't use threading
# as we don't know if we can can index into it. So just `map`.
function _apply(f, ::Type{Target}, ::Nothing, iterable; threaded=false, kw...) where Target
@inline function _apply(f::F, ::Type{Target}, ::Nothing, iterable; threaded, kw...) where {F,Target}
if threaded
# `collect` first so we can use threads
_apply(f, Target, collect(iterable); threaded, kw...)
else
map(x -> _apply(f, Target, x; kw...), iterable)
apply_to_iterable(x) = _apply(f, Target, x; kw...)
map(apply_to_iterable, iterable)
end
end
# Rewrap all FeatureCollectionTrait feature collections as GI.FeatureCollection
# Maybe use threads to call _apply on componenet features
function _apply(f, ::Type{Target}, ::GI.FeatureCollectionTrait, fc;
crs=GI.crs(fc), calc_extent=false, threaded=false
) where Target
@inline function _apply(f::F, ::Type{Target}, ::GI.FeatureCollectionTrait, fc;
crs=GI.crs(fc), calc_extent=_False(), threaded
) where {F,Target}

# Run _apply on all `features` in the feature collection, possibly threaded
features = _maptasks(1:GI.nfeature(fc); threaded) do i
feature = GI.getfeature(fc, i)
_apply(f, Target, feature; crs, calc_extent, threaded=false)::GI.Feature
end
if calc_extent
apply_to_feature(i) =
_apply(f, Target, GI.getfeature(fc, i); crs, calc_extent, threaded=_False())::GI.Feature
features = _maptasks(apply_to_feature, 1:GI.nfeature(fc), threaded)
if calc_extent isa _True
# Calculate the extent of the features
extent = mapreduce(GI.extent, Extents.union, features)
# Return a FeatureCollection with features, crs and caculated extent
Expand All @@ -139,14 +157,14 @@ function _apply(f, ::Type{Target}, ::GI.FeatureCollectionTrait, fc;
end
end
# Rewrap all FeatureTrait features as GI.Feature, keeping the properties
function _apply(f, ::Type{Target}, ::GI.FeatureTrait, feature;
crs=GI.crs(feature), calc_extent=false, threaded=false
) where Target
@inline function _apply(f::F, ::Type{Target}, ::GI.FeatureTrait, feature;
crs=GI.crs(feature), calc_extent=_False(), threaded
) where {F,Target}
# Run _apply on the contained geometry
geometry = _apply(f, Target, GI.geometry(feature); crs, calc_extent, threaded)
# Get the feature properties
properties = GI.properties(feature)
if calc_extent
if calc_extent isa _True
# Calculate the extent of the geometry
extent = GI.extent(geometry)
# Return a new Feature with the new geometry and calculated extent, but the oroginal properties and crs
Expand All @@ -158,41 +176,42 @@ function _apply(f, ::Type{Target}, ::GI.FeatureTrait, feature;
end
# Reconstruct nested geometries,
# maybe using threads to call _apply on component geoms
function _apply(f, ::Type{Target}, trait, geom;
crs=GI.crs(geom), calc_extent=false, threaded=false
)::(GI.geointerface_geomtype(trait)) where Target
@inline function _apply(f::F, ::Type{Target}, trait, geom;
crs=GI.crs(geom), calc_extent=_False(), threaded
)::(GI.geointerface_geomtype(trait)) where {F,Target}
# Map `_apply` over all sub geometries of `geom`
# to create a new vector of geometries
# TODO handle zero length
geoms = _maptasks(1:GI.ngeom(geom); threaded) do i
_apply(f, Target, GI.getgeom(geom, i); crs, calc_extent, threaded=false)
end
if calc_extent
# Calculate the extent of the sub geometries
extent = mapreduce(GI.extent, Extents.union, geoms)
# Return a new geometry of the same trait as `geom`,
# holding tnew `geoms` with `crs` and calcualted extent
return rebuild(geom, geoms; crs, extent)
else
# Return a new geometryof the same trait as `geom`, holding the new `geoms` with `crs`
return rebuild(geom, geoms; crs)
end
apply_to_geom(i) = _apply(f, Target, GI.getgeom(geom, i); crs, calc_extent, threaded=_False())
geoms = _maptasks(apply_to_geom, 1:GI.ngeom(geom), threaded)
return _apply_inner(geom, geoms, crs, calc_extent)
end
function _apply_inner(geom, geoms, crs, calc_extent::_True)
# Calculate the extent of the sub geometries
extent = mapreduce(GI.extent, Extents.union, geoms)
# Return a new geometry of the same trait as `geom`,
# holding tnew `geoms` with `crs` and calcualted extent
return rebuild(geom, geoms; crs, extent)
end
function _apply_inner(geom, geoms, crs, calc_extent::_False)
# Return a new geometryof the same trait as `geom`, holding the new `geoms` with `crs`
return rebuild(geom, geoms; crs)
end
# Fail loudly if we hit PointTrait without running `f`
# (after PointTrait there is no further to dig with `_apply`)
_apply(f, ::Type{Target}, trait::GI.PointTrait, geom; crs=nothing, kw...) where Target =
@inline _apply(f, ::Type{Target}, trait::GI.PointTrait, geom; crs=nothing, kw...) where Target =
throw(ArgumentError("target $Target not found, but reached a `PointTrait` leaf"))
# Finally, these short methods are the main purpose of `apply`.
# The `Trait` is a subtype of the `Target` (or identical to it)
# So the `Target` is found. We apply `f` to geom and return it to previous
# _apply calls to be wrapped with the outer geometries/feature/featurecollection/array.
_apply(f, ::Type{Target}, ::Trait, geom; crs=GI.crs(geom), kw...) where {Target,Trait<:Target} = f(geom)
_apply(f::F, ::Type{Target}, ::Trait, geom; crs=GI.crs(geom), kw...) where {F,Target,Trait<:Target} = f(geom)
# Define some specific cases of this match to avoid method ambiguity
for T in (
GI.PointTrait, GI.LinearRing, GI.LineString,
GI.MultiPoint, GI.FeatureTrait, GI.FeatureCollectionTrait
)
@eval _apply(f, target::Type{$T}, trait::$T, x; kw...) = f(x)
@eval _apply(f::F, target::Type{$T}, trait::$T, x; kw...) where F = f(x)
end

"""
Expand All @@ -206,54 +225,52 @@ The order and grouping of application of `op` is not guaranteed.
If `threaded==true` threads will be used over arrays and iterables,
feature collections and nested geometries.
"""
function applyreduce end
# Add dispatch argument for trait
applyreduce(f, op, target::Type, geom; threaded=false, init=nothing) =
_applyreduce(f, op, target, geom; threaded, init)
@inline function applyreduce(
f::F, op, ::Type{Target}, geom; threaded=false, init=nothing
) where {F,Target}
threaded = _booltype(threaded)
_applyreduce(f, op, Target, geom; threaded, init)
end

_applyreduce(f, op, target, geom; threaded, init) =
_applyreduce(f, op, target, GI.trait(geom), geom; threaded, init)
@inline _applyreduce(f::F, op, ::Type{Target}, geom; threaded, init) where {F,Target} =
_applyreduce(f, op, Target, GI.trait(geom), geom; threaded, init)
# Maybe use threads recucing over arrays
function _applyreduce(f, op, target::Type, ::Nothing, A::AbstractArray; threaded, init)
_mapreducetasks(op, eachindex(A); threaded, init) do i
_applyreduce(f, op, target, A[i]; threaded=false, init)
end
@inline function _applyreduce(f::F, op, ::Type{Target}, ::Nothing, A::AbstractArray; threaded, init) where {F,Target}
applyreduce_array(i) = _applyreduce(f, op, Target, A[i]; threaded=_False(), init)
_mapreducetasks(applyreduce_array, op, eachindex(A), threaded; init)
end
# Try to applyreduce over iterables
function _applyreduce(f, op, target::Type, ::Nothing, iterable; threaded, init)
@inline function _applyreduce(f::F, op, ::Type{Target}, ::Nothing, iterable; threaded, init) where {F,Target}
applyreduce_iterable(i) = _applyreduce(f, op, Target, x; threaded=_False(), init)
if threaded # Try to `collect` and reduce over the vector with threads
_applyreduce(f, op, target, collect(iterable); threaded, init)
_applyreduce(f, op, Target, collect(iterable); threaded, init)
else
# Try to `mapreduce` the iterable as-is
mapreduce(op, iterable; init) do x
_applyreduce(f, op, target, x; threaded=false, init)
end
mapreduce(applyreduce_iterable, op, iterable; init)
end
end
# Maybe use threads reducing over features of feature collections
function _applyreduce(f, op, target::Type, ::GI.FeatureCollectionTrait, fc; threaded, init)
_mapreducetasks(op, 1:GI.nfeature(fc); threaded, init) do i
_applyreduce(f, op, target, GI.getfeature(fc, i); threaded=false, init)
end
@inline function _applyreduce(f::F, op, ::Type{Target}, ::GI.FeatureCollectionTrait, fc; threaded, init) where {F,Target}
applyreduce_fc(i) = _applyreduce(f, op, Target, GI.getfeature(fc, i); threaded=_False(), init)
_mapreducetasks(applyreduce_fc, op, 1:GI.nfeature(fc), threaded; init)
end
# Features just applyreduce to their geometry
_applyreduce(f, op, target::Type, ::GI.FeatureTrait, feature; threaded, init) =
@inline _applyreduce(f::F, op, ::Type{Target}, ::GI.FeatureTrait, feature; threaded, init) where {F,Target} =
_applyreduce(f, op, target, GI.geometry(feature); threaded, init)
# Maybe use threads over components of nested geometries
function _applyreduce(f, op, target::Type, trait, geom; threaded, init)
_mapreducetasks(op, 1:GI.ngeom(geom); threaded, init) do i
_applyreduce(f, op, target, GI.getgeom(geom, i); threaded=false, init)
end
@inline function _applyreduce(f::F, op, ::Type{Target}, trait, geom; threaded, init) where {F,Target}
applyreduce_geom(i) = _applyreduce(f, op, Target, GI.getgeom(geom, i); threaded=_False(), init)
_mapreducetasks(applyreduce_geom, op, 1:GI.ngeom(geom), threaded; init)
end
# Don't thread over points it won't pay off
function _applyreduce(
f, op, target::Type, trait::Union{GI.LinearRing,GI.LineString,GI.MultiPoint}, geom;
@inline function _applyreduce(
f::F, op, ::Type{Target}, trait::Union{GI.LinearRing,GI.LineString,GI.MultiPoint}, geom;
threaded, init
)
_applyreduce(f, op, target, GI.getgeom(geom); threaded=false, init)
) where {F,Target}
_applyreduce(f, op, Target, GI.getgeom(geom); threaded=_False(), init)
end
# Apply f to the target
function _applyreduce(f, op, ::Type{Target}, ::Trait, x; kw...) where {Target<:GI.AbstractTrait,Trait<:Target}
@inline function _applyreduce(f::F, op, ::Type{Target}, ::Trait, x; kw...) where {F,Target<:GI.AbstractTrait,Trait<:Target}
f(x)
end
# Fail if we hit PointTrait
Expand All @@ -263,7 +280,7 @@ for T in (
GI.PointTrait, GI.LinearRing, GI.LineString,
GI.MultiPoint, GI.FeatureTrait, GI.FeatureCollectionTrait
)
@eval _applyreduce(f, op, target::Type{$T}, trait::$T, x; kw...) = f(x)
@eval _applyreduce(f::F, op, target::Type{$T}, trait::$T, x; kw...) where F = f(x)
end

"""
Expand Down Expand Up @@ -437,29 +454,32 @@ using Base.Threads: nthreads, @threads, @spawn
# Threading utility, modified Mason Protters threading PSA
# run `f` over ntasks, where f recieves an AbstractArray/range
# of linear indices
function _maptasks(f, taskrange; threaded=false)
if threaded
ntasks = length(taskrange)
# Customize this as needed.
# More tasks have more overhead, but better load balancing
tasks_per_thread = 2
chunk_size = max(1, ntasks ÷ (tasks_per_thread * nthreads()))
# partition the range into chunks
task_chunks = Iterators.partition(taskrange, chunk_size)
# Map over the chunks
tasks = map(task_chunks) do chunk
# Spawn a task to process this chunk
@spawn begin
# Where we map `f` over the chunk indices
map(f, chunk)
end
@inline function _maptasks(f::F, taskrange, threaded::_True)::Vector where F
ntasks = length(taskrange)
# Customize this as needed.
# More tasks have more overhead, but better load balancing
tasks_per_thread = 2
chunk_size = max(1, ntasks ÷ (tasks_per_thread * nthreads()))
# partition the range into chunks
task_chunks = Iterators.partition(taskrange, chunk_size)
# Map over the chunks
tasks = map(task_chunks) do chunk
# Spawn a task to process this chunk
@spawn begin
# Where we map `f` over the chunk indices
map(f, chunk)
end

# Finally we join the results into a new vector
return mapreduce(fetch, vcat, tasks)
else
return map(f, taskrange)
end

# Finally we join the results into a new vector
return mapreduce(fetch, vcat, tasks)
end
#=
Here we use the compiler directive `@assume_effects :foldable` to force the compiler
to lookup through the closure. This alone makes e.g. `flip` 2.5x faster!
=#
Base.@assume_effects :foldable @inline function _maptasks(f::F, taskrange, threaded::_False)::Vector where F
map(f, taskrange)
end

# Threading utility, modified Mason Protters threading PSA
Expand All @@ -468,27 +488,26 @@ end
#
# WARNING: this will not work for mean/median - only ops
# where grouping is possible
function _mapreducetasks(f, op, taskrange; threaded=false, init)
if threaded
ntasks = length(taskrange)
# Customize this as needed.
# More tasks have more overhead, but better load balancing
tasks_per_thread = 2
chunk_size = max(1, ntasks ÷ (tasks_per_thread * nthreads()))
# partition the range into chunks
task_chunks = Iterators.partition(taskrange, chunk_size)
# Map over the chunks
tasks = map(task_chunks) do chunk
# Spawn a task to process this chunk
@spawn begin
# Where we map `f` over the chunk indices
mapreduce(f, op, chunk; init)
end
@inline function _mapreducetasks(f::F, op, taskrange, threaded::_True; init) where F
ntasks = length(taskrange)
# Customize this as needed.
# More tasks have more overhead, but better load balancing
tasks_per_thread = 2
chunk_size = max(1, ntasks ÷ (tasks_per_thread * nthreads()))
# partition the range into chunks
task_chunks = Iterators.partition(taskrange, chunk_size)
# Map over the chunks
tasks = map(task_chunks) do chunk
# Spawn a task to process this chunk
@spawn begin
# Where we map `f` over the chunk indices
mapreduce(f, op, chunk; init)
end

# Finally we join the results into a new vector
return mapreduce(fetch, op, tasks; init)
else
return mapreduce(f, op, taskrange; init)
end

# Finally we join the results into a new vector
return mapreduce(fetch, op, tasks; init)
end
Base.@assume_effects :foldable function _mapreducetasks(f::F, op, taskrange, threaded::_False; init) where F
mapreduce(f, op, taskrange; init)
end
Loading

0 comments on commit 844338a

Please sign in to comment.