Introduce sync utility with aria2c. (#69)
* Introduce sync utility with aria2c.

* Implement comments.

* Fix search with args.
evetion authored Jan 9, 2024
1 parent 6f4c280 commit b5829fe
Showing 9 changed files with 169 additions and 46 deletions.
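
For orientation, a minimal sketch of the new entry point introduced by this commit, assuming a hypothetical local archive folder; the full docstring is in the src/granule.jl diff below.

using SpaceLiDAR

# Hypothetical archive folder; `sync` searches for granules newer than
# the newest one found locally and downloads the missing ones via aria2c.
sync("archive/")

# Restrict the sync to a single product when the folder mixes products.
sync(:GLAH14, "archive/")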
1 change: 1 addition & 0 deletions Project.toml
@@ -5,6 +5,7 @@ version = "0.4.0"

[deps]
AWSS3 = "1c724243-ef5b-51ab-93f4-b0a88ac62a95"
Aria2_jll = "9ab3bdc3-1250-5043-8fac-ac7e82d2cbc9"
CategoricalArrays = "324d7699-5711-5eae-9e2f-1d82baa6b597"
DataAPI = "9a962f9c-6df0-11e9-0e5d-c546b8b5ee8a"
DataFrames = "a93c6f00-e57d-5684-b7b6-d8193f3e46c0"
3 changes: 3 additions & 0 deletions src/GEDI/GEDI.jl
@@ -17,6 +17,9 @@ Base.@kwdef mutable struct GEDI_Granule{product} <: Granule
polygons::MultiPolygonType = MultiPolygonType()
end

sproduct(::GEDI_Granule{product}) where {product} = product
mission(::GEDI_Granule) = :GEDI

function Base.copy(g::GEDI_Granule{product}) where {product}
GEDI_Granule{product}(g.id, g.url, g.info, copy(g.polygons))
end
3 changes: 3 additions & 0 deletions src/ICESat-2/ICESat-2.jl
@@ -23,6 +23,9 @@ Base.@kwdef mutable struct ICESat2_Granule{product} <: Granule
polygons::MultiPolygonType = MultiPolygonType()
end

sproduct(::ICESat2_Granule{product}) where {product} = product
mission(::ICESat2_Granule) = :ICESat2

function Base.copy(g::ICESat2_Granule{product}) where {product}
ICESat2_Granule{product}(g.id, g.url, g.info, copy(g.polygons))
end
14 changes: 10 additions & 4 deletions src/ICESat/ICESat.jl
@@ -17,6 +17,9 @@ Base.@kwdef mutable struct ICESat_Granule{product} <: Granule
polygons::MultiPolygonType = MultiPolygonType()
end

sproduct(::ICESat_Granule{product}) where {product} = product
mission(::ICESat_Granule) = :ICESat

function Base.copy(g::ICESat_Granule{product}) where {product}
return ICESat_Granule{product}(g.id, g.url, g.info, copy(g.polygons))
end
@@ -47,17 +50,20 @@ end

function icesat_info(filename)
id, _ = splitext(basename(filename))
type, revision, orbit, cycle, track, segment, version, filetype =
type, release, orbit, cycle, track, segment, revision, filetype =
split(id, "_")
return (
type = Symbol(type),
phase = parse(Int, orbit[1]),
rgt = parse(Int, track[2]),
instance = parse(Int, track[3:4]),
rgt = parse(Int, orbit[2]),
instance = parse(Int, orbit[3:4]),
cycle = parse(Int, cycle),
track = parse(Int, track),
segment = parse(Int, segment),
version = parse(Int, version),
revision = parse(Int, revision),
calibration = parse(Int, release[1]),
filetype = parse(Int, filetype),
version = parse(Int, release[2:3]),
)
end

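An illustrative parse with the renamed fields; the filename below is a plausible GLAH06 granule id, not one taken from this commit.

julia> SpaceLiDAR.icesat_info("GLAH06_634_2131_002_0084_4_01_0001.h5")
(type = :GLAH06, phase = 2, rgt = 1, instance = 31, cycle = 2, track = 84,
 segment = 4, revision = 1, calibration = 6, filetype = 1, version = 34)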
2 changes: 1 addition & 1 deletion src/SpaceLiDAR.jl
@@ -32,7 +32,7 @@ include("search.jl")
include("geointerface.jl")
include("env.jl")

export find, search, download!, download, netrc!, instantiate, info, angle, angle!, shift
export find, search, sync, download!, download, netrc!, instantiate, info, angle, angle!, shift
export lines, points, in_bbox, bounds, classify, isvalid, rm, to_egm2008!
export ICESat_Granule, ICESat2_Granule, GEDI_Granule, convert
export granule_from_file, granules_from_folder, write_granule_urls!
92 changes: 87 additions & 5 deletions src/granule.jl
@@ -1,6 +1,7 @@
using HDF5
import Downloads
import AWSS3
using Aria2_jll

# Custom downloader for Julia 1.6, which doesn't have NETRC + Cookie support
# This is a method because it will segfault if precompiled.
@@ -58,7 +59,6 @@ function _show(io, g::T) where {T<:Granule}
print(io, "$T with id $(g.id)")
end


MultiPolygonType = Vector{Vector{Vector{Vector{Float64}}}}

function HDF5.h5open(granule::Granule)
@@ -124,22 +124,104 @@ end
download!(granules::Vector{<:Granule}, folder=".")
Like [`download!`](@ref), but for a vector of `granules`.
Downloads in parallel using aria2c.
"""
function download!(granules::Vector{Granule}, folder::AbstractString = ".")
function download!(granules::Vector{<:Granule}, folder::AbstractString = ".")

# Download serially if s3 links are present
if any(g -> startswith(g.url, "s3"), granules)
return map(g -> download!(g, folder), granules)
end

f = write_urls(granules)
cmd = `$(Aria2_jll.aria2c()) -i $f -c -d $folder`
local io
try
io = run(pipeline(cmd, stdout = stdout, stderr = stderr), wait = false)
while process_running(io)
sleep(1)
end
catch e
kill(io)
println()
throw(e)
end

for granule in granules
download!(granule, folder)
granule.url = joinpath(folder, granule.id)
end
granules
end

"""
download(granules::Vector{<:Granule}, folder=".")
Like [`download`](@ref), but for a vector of `granules`.
"""
function download(granules::Vector{Granule}, folder::AbstractString = ".")
map(granule -> download(granule, folder), granules)
function download(granules::Vector{<:Granule}, folder::AbstractString = ".")

# Download serially if s3 links are present
if any(g -> startswith(g.url, "s3"), granules)
return map(g -> download(g, folder), granules)
else
download!(copy.(granules), folder)
end
end

function Base.filesize(granule::T) where {T<:Granule}
filesize(granule.url)
end

Base.isequal(a::Granule, b::Granule) = a.id == b.id
Base.hash(g::Granule, h::UInt) = hash(g.id, h)

"""
sync(folder::AbstractString, all::Bool=false; kwargs...)
sync(folders::AbstractVector{<:AbstractString}, all::Bool=false; kwargs...)
sync(product::Symbol, folder::AbstractString, all::Bool=false; kwargs...)
sync(product::Symbol, folders::AbstractVector{<:AbstractString}, all::Bool=false; kwargs...)
Synchronize an existing archive of local granules in `folder(s)` with the latest granules available.
Specifically, this runs [`search`](@ref) and [`download`](@ref) for any granules not yet
present in the folder(s), saving them to the *first* folder in the list.
!!! warning
Using sync could result in downloading significant (TB+) amounts of data.
Assumes all folders contain granules of the same product. If not, pass the
product as Symbol: [`sync(::Symbol, folders, all)`](@ref) instead.
When `all` is false (the default), sync will search only for granules past the date of
the latest granule found in `folders`. If true, it will search for all granules.
Note that ICESat granules are not timestamped, so sync will try to download
*all* ICESat granules not yet present, regardless of this setting.
Any `kwargs...` are passed to the [`search`](@ref) function. This enables
sync to only download granules within a certain extent, for example.
"""
function sync(folders::AbstractVector{<:AbstractString}, all::Bool = false; kwargs...)
grans = reduce(vcat, granules.(folders))
_sync!(grans, first(folders), all; kwargs...)
end
sync(folder::AbstractString, all::Bool = false; kwargs...) = sync([folder], all; kwargs...)

function sync(product::Symbol, folders::AbstractVector{<:AbstractString}, all::Bool = false; kwargs...)
grans = reduce(vcat, granules.(folders))
filter!(g -> sproduct(g) == product, grans)
_sync!(grans, first(folders), all; kwargs...)
end
sync(product::Symbol, folder::AbstractString, all::Bool = false; kwargs...) = sync(product, [folder], all; kwargs...)

function _sync!(granules, folder, all; kwargs...)
isempty(granules) && error("No granules found in provided folder(s).")
g = first(granules)
ngranules = if length(granules) == 0 || !haskey(info(granules[end]), :date) || all
Set(search(g; kwargs...))
else
sort!(granules, by = x -> x.id)
Set(search(g; after = info(granules[end]).date, kwargs...))
end
setdiff!(ngranules, Set(granules))
download!(collect(ngranules), folder)
end
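
A hedged usage sketch of the vectorized download! and the new sync methods above; the folder name, product, and extent are illustrative assumptions, not taken from the commit.

using SpaceLiDAR, Extents

extent = Extent(X = (4.0, 6.0), Y = (50.0, 54.0))

# Batch download: granule URLs are written to a tempfile and handed to
# aria2c for parallel, resumable (-c) retrieval.
granules = search(:ICESat2, :ATL08, bbox = extent)
download!(granules, "archive/")

# Later, fetch only granules newer than the newest one in archive/,
# restricted to the same extent (keywords are forwarded to search).
sync(:ATL08, "archive/", bbox = extent)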
10 changes: 9 additions & 1 deletion src/search.jl
@@ -113,7 +113,10 @@ function search(
end

search(::Mission{X}, product, args...; kwargs...) where {X} =
throw(ArgumentError("Mission $X not supported. Currently supported are :ICESat, :ICESat2, and :GEDI."))
throw(ArgumentError("Search doesn't support arguments $args. Did you mean to use keywords?"))

search(::Mission{X}, product; kwargs...) where {X} =
throw(ArgumentError("Combination of Mission $X and Product $product not supported. Please make an issue."))

@deprecate find(mission::Symbol, product::AbstractString, bbox, version) search(
mission,
@@ -134,6 +137,11 @@ function search(mission::Symbol, product::Symbol, args...; kwargs...)
search(Mission(mission), product, args...; kwargs...)
end

function search(g::Granule; kwargs...)
initial = (; version = info(g).version)
search(mission(g), sproduct(g); merge(initial, kwargs)...)
end

function parse_polygon(polygons, T = Float64)
o = Vector{Vector{Vector{Vector{T}}}}()
for polygon in polygons
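A sketch of the new Granule-based search method above; the product and date are illustrative. It pins the mission, product, and version of an existing granule and forwards any extra keywords.

using SpaceLiDAR, Dates

# Take any granule (here the first ATL08 hit) and find later granules
# of the same mission, product, and version.
g = first(search(:ICESat2, :ATL08))
others = search(g, after = DateTime(2022, 1, 1))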
20 changes: 15 additions & 5 deletions src/utils.jl
@@ -35,7 +35,7 @@ Create mission specific granules from a folder with .h5 files, using [`granule`]
function granules(foldername::AbstractString)
return [
granule(joinpath(foldername, file)) for
file in readdir(foldername) if lowercase(splitext(file)[end]) == ".h5"
file in readdir(foldername) if lowercase(splitext(file)[end]) == ".h5" && !isfile(joinpath(foldername, "$(file).aria2"))
]
end
@deprecate granules_from_folder(foldername::AbstractString) granules(foldername::AbstractString)
@@ -101,16 +101,26 @@ urls(g::Vector{<:Granule}) = getfield.(g, :url)
Write all granule urls to a file.
"""
function write_urls(fn::String, granules::Vector{<:Granule})
function write_urls(fn::String, granules::AbstractVector{<:Granule})
open(fn, "w") do f
for granule in granules
println(f, url(granule))
end
write_urls(f, granules)
end
abspath(fn)
end
@deprecate write_granule_urls! write_urls

function write_urls(granules::AbstractVector{<:Granule})
fn, io = mktemp()
write_urls(io, granules)
close(io)
fn
end

function write_urls(f::IOStream, granules::AbstractVector{<:Granule})
for granule in granules
println(f, url(granule))
end
end

"""
isvalid(g::Granule)
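A short sketch of the refactored write_urls helpers above; the product and output filename are illustrative.

using SpaceLiDAR

granules = search(:GEDI, :GEDI02_A)

# Explicit filename: writes one URL per line and returns the absolute path.
path = SpaceLiDAR.write_urls("urls.txt", granules)

# No filename: writes to a fresh temporary file, as download! now does.
tmp = SpaceLiDAR.write_urls(granules)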
70 changes: 40 additions & 30 deletions test/runtests.jl
@@ -87,36 +87,40 @@ empty_bbox = (min_x = 4.0, min_y = 40.0, max_x = 5.0, max_y = 50.0)
@test_throws ErrorException SpaceLiDAR.search(:ICESat2, :ATL08, after = now() - Month(47), before = now() - Month(48))
end

# @testset "download" begin
# if "EARTHDATA_USER" in keys(ENV)
# @info "Setting up Earthdata credentials for Github Actions"
# SpaceLiDAR.netrc!(
# get(ENV, "EARTHDATA_USER", ""),
# get(ENV, "EARTHDATA_PW", ""),
# )
# end
# granules = search(:ICESat, :GLAH06, bbox = convert(Extent, (min_x = 4.0, min_y = 40.0, max_x = 5.0, max_y = 50.0)))
# g = granules[1]

# try
# SL.download!(g)
# @test isfile(g)
# catch e
# if e isa Downloads.RequestError
# @error "Could not download granule due to network error(s)"
# else
# rethrow(e)
# end
# end
# rm(g)

# # This only works on us-west-2 region in AWS
# # granules = search(:ICESat2, :ATL08, bbox = convert(Extent, (min_x = 4.0, min_y = 40.0, max_x = 5.0, max_y = 50.0)), s3 = true)
# # g = granules[1]
# # SL.download!(g)
# # @test isfile(g)
# # rm(g)
# end
@testset "download" begin
if "EARTHDATA_USER" in keys(ENV)
@info "Setting up Earthdata credentials for Github Actions"
SpaceLiDAR.netrc!(
get(ENV, "EARTHDATA_USER", ""),
get(ENV, "EARTHDATA_PW", ""),
)
end
granules = search(:ICESat, :GLAH06, bbox = convert(Extent, (min_x = 4.0, min_y = 40.0, max_x = 5.0, max_y = 50.0)))
g = granules[1]

try
SL.download!(g)
@test isfile(g)
catch e
if e isa Downloads.RequestError
@error "Could not download granule due to network error(s)"
else
rethrow(e)
end
end
rm(g)

# Test syncing of granules
sync(["data/"], after = now(), bbox = convert(Extent, (min_x = 4.0, min_y = 40.0, max_x = 5.0, max_y = 50.0)))
sync(:GLAH14, "data/", after = now(), bbox = convert(Extent, (min_x = 4.0, min_y = 40.0, max_x = 5.0, max_y = 50.0)))

# This only works on us-west-2 region in AWS
# granules = search(:ICESat2, :ATL08, bbox = convert(Extent, (min_x = 4.0, min_y = 40.0, max_x = 5.0, max_y = 50.0)), s3 = true)
# g = granules[1]
# SL.download!(g)
# @test isfile(g)
# rm(g)
end

@testset "granules" begin
og = SL.granule_from_file(GLAH06_fn)
@@ -129,6 +133,12 @@
@test length(gs) == 7
copies = copy.(gs)

# Set different path, but same id
og.url = "data"
@test !(og === g)
@test isequal(og, g)
@test hash(og) == hash(g)

fgs = SL.in_bbox(gs, (min_x = 4.0, min_y = 40.0, max_x = 5.0, max_y = 50.0))
@test length(fgs) == 2
SL.bounds.(fgs)
