Merge pull request #109 from c-hydro/dev
Dev
ltrotter authored Oct 21, 2024
2 parents 63092d1 + e7595a2 commit 8a32ae9
Showing 9 changed files with 168 additions and 36 deletions.
door/base_downloaders.py (15 changes: 11 additions & 4 deletions)
@@ -71,16 +71,23 @@ def get_source(cls, source: Optional[str] = None):
         elif hasattr(cls, 'source'):
             return cls.source
 
-    def set_bounds(self, bounds: None|BoundingBox|list[float]|tuple[float]) -> None:
+    def set_bounds(self, bounds: None|BoundingBox|list[float]|tuple[float]|str|Dataset) -> None:
         """
         Set the bounds of the data to download.
         """
         if bounds is None:
             return
         elif isinstance(bounds, (list, tuple)):
-            bounds = BoundingBox(*bounds)
-
-        self.bounds = bounds
+            _bounds = BoundingBox(*bounds)
+        elif isinstance(bounds, str):
+            _bounds = BoundingBox.from_file(bounds)
+        else:
+            try:
+                _bounds = BoundingBox.from_dataset(bounds)
+            except Exception:
+                raise ValueError('Invalid bounds')
+
+        self.bounds = _bounds
 
     def set_destination(self, destination: Dataset|dict|str|None) -> None:
         """
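A minimal usage sketch of the new dispatch (the downloader instance and inputs here are hypothetical; from_file and from_dataset are the BoundingBox constructors used above). Note that a pre-built BoundingBox now falls through to the from_dataset branch:

    downloader.set_bounds(None)                      # leaves bounds unset
    downloader.set_bounds([6.6, 36.0, 18.5, 47.1])   # list/tuple -> BoundingBox(*bounds)
    downloader.set_bounds('path/to/grid.tif')        # str -> BoundingBox.from_file(...)
    downloader.set_bounds(some_dataset)              # Dataset -> BoundingBox.from_dataset(...)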
door/data_sources/cds/cds_downloader.py (8 changes: 6 additions & 2 deletions)
@@ -7,12 +7,16 @@ class CDSDownloader(APIDownloader):
 
     name = "CDS_downloader"
     apikey_env_vars = 'CDSAPI_KEY' # this should be in the form UID:API_KEY already
-    cds_url = 'https://cds-beta.climate.copernicus.eu/api'
+    cds_url = 'https://cds.climate.copernicus.eu/api'
 
     def __init__(self, dataset) -> None:
 
         # if key is None, this will automatically look for the .cdsapirc file
-        client = cdsapi.Client(url=self.cds_url, key=os.getenv(self.apikey_env_vars))
+        key = os.getenv(self.apikey_env_vars, None)
+        if isinstance(key, str):
+            if key.startswith(("'", '"')) and key.endswith(("'", '"')):
+                key = key[1:-1]
+        client = cdsapi.Client(url=self.cds_url, key=key)
 
         super().__init__(client)
         self.dataset = dataset
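A quick sketch of what the quote-stripping guard does (the key value here is made up; the real one comes from the CDSAPI_KEY environment variable):

    import os

    os.environ['CDSAPI_KEY'] = '"12345:abcde-fghij"'  # quoted, e.g. as exported from a .env file
    key = os.getenv('CDSAPI_KEY')
    if key.startswith(("'", '"')) and key.endswith(("'", '"')):
        key = key[1:-1]
    print(key)  # 12345:abcde-fghij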
door/data_sources/cds/era5_downloader.py (42 changes: 34 additions & 8 deletions)
@@ -9,6 +9,7 @@
 
 from ...tools import timestepping as ts
 from ...tools.timestepping.timestep import TimeStep
+from ...tools.timestepping.fixed_num_timestep import FixedNTimeStep
 
 class ERA5Downloader(CDSDownloader):
 
@@ -152,7 +153,28 @@ def build_request(self,
         }
 
         return request
 
+    def get_last_published_ts(self, ts_per_year = None, **kwargs) -> ts.TimeRange:
+        """
+        Get the last published timestep for the dataset.
+        """
+        if ts_per_year is None:
+            ts_per_year = self.ts_per_year
+
+        # get the date of the last published data
+        last_published = self.get_last_published_date()
+        if ts_per_year == 365:
+            TimeStep = ts.Day
+        else:
+            TimeStep = FixedNTimeStep.get_subclass(ts_per_year)
+        return TimeStep.from_date(last_published + dt.timedelta(days=1)) - 1
+
+    def get_last_published_date(self, **kwargs) -> dt.datetime:
+        now = dt.datetime.now()
+        now = now.replace(hour=0, minute=0, second=0, microsecond=0)
+        return now - dt.timedelta(days=6)
+
     def _get_data_ts(self,
                      timestep: TimeStep,
                      space_bounds: BoundingBox,
@@ -251,22 +273,26 @@ def _get_data_ts(self,
             timestep_start = agg_timestep.start
             timestep_end = agg_timestep.end
 
+            # filter data to the aggregation timestep
+            inrange = (vardata.time.dt.date >= timestep_start.date()) & (vardata.time.dt.date <= timestep_end.date())
+            vardata_ts = vardata.sel(time = inrange)
+
             # add start and end time as attributes
-            vardata.attrs['start_time'] = timestep_start
-            vardata.attrs['end_time'] = timestep_end
+            vardata_ts.attrs['start_time'] = timestep_start
+            vardata_ts.attrs['end_time'] = timestep_end
 
             # do the necessary aggregations:
             for agg in varopts['agg_method']:
 
-                vardata.attrs['agg_function'] = agg
+                vardata_ts.attrs['agg_function'] = agg
                 if agg == 'mean':
-                    aggdata = vardata.mean(dim='time', skipna = False)
+                    aggdata = vardata_ts.mean(dim='time', skipna = False)
                 elif agg == 'max':
-                    aggdata = vardata.max(dim='time', skipna = False)
+                    aggdata = vardata_ts.max(dim='time', skipna = False)
                 elif agg == 'min':
-                    aggdata = vardata.min(dim='time', skipna = False)
+                    aggdata = vardata_ts.min(dim='time', skipna = False)
                 elif agg == 'sum':
-                    aggdata = vardata.sum(dim='time', skipna = False)
+                    aggdata = vardata_ts.sum(dim='time', skipna = False)
 
                 aggdata = aggdata.rio.set_spatial_dims('longitude', 'latitude')
                 aggdata = aggdata.rio.write_crs(self.spatial_ref)
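The aggregation fix above reduces each aggregation window separately instead of the whole downloaded series. A self-contained sketch of the same pattern on synthetic data (window dates are arbitrary):

    import numpy as np
    import pandas as pd
    import xarray as xr

    time = pd.date_range('2024-10-01', periods=240, freq='h')  # 10 days of hourly values
    vardata = xr.DataArray(np.random.rand(240), coords={'time': time}, dims='time')

    # restrict to one aggregation window before reducing, as _get_data_ts now does
    start, end = pd.Timestamp('2024-10-03'), pd.Timestamp('2024-10-05')
    inrange = (vardata.time.dt.date >= start.date()) & (vardata.time.dt.date <= end.date())
    vardata_ts = vardata.sel(time=inrange)

    print(vardata_ts.mean(dim='time', skipna=False).item())  # mean over the window only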
door/data_sources/chirps/chirps_downloader.py (40 changes: 40 additions & 0 deletions)
@@ -2,9 +2,13 @@
 from typing import Generator
 import numpy as np
 import xarray as xr
+import datetime as dt
+import requests
 
 from ...base_downloaders import URLDownloader
+from ...tools import timestepping as ts
 from ...tools.timestepping.timestep import TimeStep
+from ...tools.timestepping.fixed_num_timestep import FixedNTimeStep
 
 from ...utils.space import BoundingBox, crop_to_bb
 from ...utils.io import decompress_gz
@@ -64,6 +68,42 @@ def set_product(self, product: str) -> None:
         self.nodata = self.available_products[product]["nodata"]
         self.prelim_nodata = self.available_products[product]["prelim_nodata"]
 
+    def get_last_published_ts(self, prelim = None, product = None, **kwargs) -> ts.TimeRange:
+        """
+        Get the last published timestep for the dataset.
+        """
+        if prelim is None:
+            prelim = self.get_prelim
+
+        if product is None:
+            product = self.product
+
+        ts_per_year = self.available_products[product]["ts_per_year"]
+        url = self.available_products[product]["url"] if not prelim else self.available_products[product]["prelim_url"]
+
+        if ts_per_year == 365:
+            TimeStep = ts.Day
+        else:
+            TimeStep = FixedNTimeStep.get_subclass(ts_per_year)
+
+        current_timestep = TimeStep.from_date(dt.datetime.now())
+        while True:
+            current_url = url.format(timestep = current_timestep)
+
+            # send a HEAD request to the url
+            response = requests.head(current_url)
+
+            # if the request is successful, the last published timestep is the current timestep
+            if response.status_code == 200:
+                return current_timestep
+
+            # if the request is not successful, move to the previous timestep
+            current_timestep -= 1
+
+    def get_last_published_date(self, **kwargs) -> dt.datetime:
+        return self.get_last_published_ts(**kwargs).end
+
     def _get_data_ts(self,
                      timestep: TimeStep,
                      space_bounds: BoundingBox,
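The probe loop added above can be exercised on its own; a hedged sketch with a hypothetical daily URL template (the real templates live in available_products and use the {timestep} placeholder):

    import datetime as dt
    import requests

    url = 'https://example.org/chirps/{t:%Y}/chirps-v2.0.{t:%Y.%m.%d}.tif.gz'  # illustrative only

    t = dt.datetime.now()
    while True:
        if requests.head(url.format(t=t)).status_code == 200:
            print('last published:', t.date())  # first date found on the server, walking backwards
            break
        t -= dt.timedelta(days=1)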
door/data_sources/earthdata/cmr_downloader.py (39 changes: 21 additions & 18 deletions)
@@ -37,6 +37,7 @@ class CMRDownloader(DOORDownloader):
         'make_mosaic': True,
         'crop_to_bounds': True,
         'keep_tiles_naming': False,
+        'selected_tiles': None
     }
 
     file_ext = ['.hdf', '.h5']
@@ -167,7 +168,7 @@ def build_cmr_query(self, time_start: datetime, time_end: datetime, bounding_box
 
         cmr_base_url = ('{0}provider={1}'
                         '&sort_key=start_date&sort_key=producer_granule_id'
-                        '&scroll=true&page_size={2}'.format(self.cmr_url, self.provider, self.cmr_page_size))
+                        '&page_size={2}'.format(self.cmr_url, self.provider, self.cmr_page_size))
 
         product_query = self.fomat_product(self.product_id)
         version_query = self.format_version(self.version)
@@ -189,33 +190,31 @@ def cmr_search(self, time: ts.TimeRange, space_bounds: BoundingBox) -> dict:
         time_end = time.end
 
         cmr_query_url = self.build_cmr_query(time_start, time_end, bounding_box)
-        cmr_scroll_id = None
+        cmr_searchafter = None
         ctx = ssl.create_default_context()
         ctx.check_hostname = False
         ctx.verify_mode = ssl.CERT_NONE
         try:
             urls = []
             while True:
                 req = Request(cmr_query_url)
-                if cmr_scroll_id:
-                    req.add_header('cmr-scroll-id', cmr_scroll_id)
+                if cmr_searchafter:
+                    req.add_header('CMR-Search-After', cmr_searchafter)
                 response = urlopen(req, context=ctx)
-                if not cmr_scroll_id:
-                    # Python 2 and 3 have different case for the http headers
-                    headers = {k.lower(): v for k, v in dict(response.info()).items()}
-                    cmr_scroll_id = headers['cmr-scroll-id']
-                    hits = int(headers['cmr-hits'])
+
+                # the header 'cmr-search-after' is used to get the next page of results
+                # once we hit a page with no 'cmr-search-after' header, we have all the results
+                headers = {k.lower(): v for k, v in dict(response.info()).items()}
+                cmr_searchafter = headers.get('cmr-search-after', None)
+                if not cmr_searchafter:
+                    break
 
                 search_page = response.read()
                 search_page = json.loads(search_page.decode('utf-8'))
-                url_scroll_results = cmr_filter_urls(search_page, extensions=self.file_ext)
-                if not url_scroll_results:
-                    break
-                if hits > self.cmr_page_size:
-                    sys.stdout.flush()
-                urls += url_scroll_results
+                valid_results = cmr_filter_urls(search_page, extensions=self.file_ext, selected_tiles=self.selected_tiles)
+
+                urls += valid_results
 
-                if hits > self.cmr_page_size:
-                    print()
             return urls
         except KeyboardInterrupt:
             quit()
@@ -273,7 +272,7 @@ def format_filename_filter(time: datetime) -> str:
     filename_filter = time.strftime('*A%Y%j*')
    return f'&producer_granule_id[]={filename_filter}&options[producer_granule_id][pattern]=true'
 
-def cmr_filter_urls(search_results, extensions=['.hdf', '.h5']) -> list[str]:
+def cmr_filter_urls(search_results, extensions=['.hdf', '.h5'], selected_tiles=None) -> list[str]:
     """Select only the desired data files from CMR response."""
     if 'feed' not in search_results or 'entry' not in search_results['feed']:
         return []
@@ -312,6 +311,10 @@ def cmr_filter_urls(search_results, extensions=['.hdf', '.h5']) -> list[str]:
                 continue
             unique_filenames.add(filename)
 
+            if selected_tiles is not None:
+                if not any(tile in filename for tile in selected_tiles):
+                    continue
+
             urls.append(link['href'])
 
     return urls
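For reference, the CMR-Search-After paging pattern in a standalone, hedged sketch (endpoint, product and page size are illustrative; requests is used here instead of urllib):

    import requests

    url = 'https://cmr.earthdata.nasa.gov/search/granules.json'
    params = {'short_name': 'MOD13Q1', 'page_size': 200}

    headers, entries = {}, []
    while True:
        resp = requests.get(url, params=params, headers=headers)
        entries += resp.json()['feed']['entry']
        search_after = resp.headers.get('CMR-Search-After')
        if not search_after:  # no header means the last page was reached
            break
        headers['CMR-Search-After'] = search_after

    print(len(entries), 'granules found')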
door/data_sources/hsaf/hsaf_downloader.py (40 changes: 40 additions & 0 deletions)
@@ -8,7 +8,13 @@
 
 from ...utils.auth import get_credentials
 from ...utils.io import decompress_bz2
 
+import datetime as dt
+import requests
+
 from ...base_downloaders import URLDownloader
+from ...tools import timestepping as ts
+from ...tools.timestepping.timestep import TimeStep
+from ...tools.timestepping.fixed_num_timestep import FixedNTimeStep
 
 # from dam.utils.io_geotiff import read_geotiff_asXarray, write_geotiff_fromXarray
 
@@ -124,6 +130,40 @@ def find_parents_of_custom_variables(self, custom_variables: dict) -> dict:
                 parents[var]['weights'].append(overlap / size)
 
         return parents
 
+    def get_last_published_ts(self, product = None, **kwargs) -> ts.TimeRange:
+        """
+        Get the last published timestep for the dataset.
+        """
+        if product is None:
+            product = self.product
+
+        ts_per_year = self.available_products[product]["ts_per_year"]
+        url = self.available_products[product]["url"]
+
+        if ts_per_year == 365:
+            TimeStep = ts.Day
+        else:
+            TimeStep = FixedNTimeStep.get_subclass(ts_per_year)
+
+        current_timestep = TimeStep.from_date(dt.datetime.now())
+        while True:
+            current_url = url.format(timestep = current_timestep)
+
+            # send a HEAD request to the url
+            response = requests.head(current_url)
+
+            # if the request is successful, the last published timestep is the current timestep
+            if response.status_code == 200:
+                return current_timestep
+
+            # if the request is not successful, move to the previous timestep
+            current_timestep -= 1
+
+    def get_last_published_date(self, **kwargs) -> dt.datetime:
+        return self.get_last_published_ts(**kwargs).end
+
     def _get_data_ts(self,
                      timestep: TimeStep,
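A hedged usage sketch of the two new helpers (the constructor arguments and product key are hypothetical; only get_last_published_ts and get_last_published_date come from this diff):

    downloader = HSAFDownloader(product='h60')        # hypothetical construction
    last_ts = downloader.get_last_published_ts()      # probes the server backwards from today
    last_date = downloader.get_last_published_date()  # same as last_ts.end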
door/utils/space.py (11 changes: 11 additions & 0 deletions)
@@ -8,6 +8,8 @@
 import xarray as xr
 import rioxarray as rxr
 
+from ..tools.data import Dataset
+
 class BoundingBox():
 
     default_datum = 'EPSG:4326'
@@ -34,6 +36,15 @@ def __init__(self,
 
         # buffer the bounding box
         self.buffer_bbox(buffer)
 
+    @staticmethod
+    def from_dataset(dataset: Dataset, buffer: float = 0.0):
+        data: xr.DataArray = dataset.get_data()
+
+        # get the bounding box of the data
+        left, bottom, right, top = data.rio.bounds()
+
+        return BoundingBox(left, bottom, right, top, datum = data.rio.crs.to_wkt(), buffer = buffer)
+
     @staticmethod
     def from_file(grid_file, buffer: float = 0.0):
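A self-contained sketch of what from_dataset extracts, using a synthetic global grid (rioxarray provides the .rio accessor with bounds() and CRS handling):

    import numpy as np
    import rioxarray  # noqa: F401, registers the .rio accessor
    import xarray as xr

    da = xr.DataArray(
        np.zeros((180, 360)),
        coords={'latitude': np.arange(89.5, -90, -1), 'longitude': np.arange(-179.5, 180, 1)},
        dims=('latitude', 'longitude'),
    )
    da = da.rio.set_spatial_dims('longitude', 'latitude').rio.write_crs('EPSG:4326')

    left, bottom, right, top = da.rio.bounds()
    print(left, bottom, right, top)  # -180.0 -90.0 180.0 90.0 (outer cell edges)
    print(da.rio.crs.to_wkt()[:30])  # the WKT string passed as datum above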
setup.py (7 changes: 4 additions & 3 deletions)
@@ -6,7 +6,7 @@
 
 setup(
     name='door',
-    version='2.2.0-alpha',
+    version='2.2.0',
     packages=find_packages(),
     description='A package for operational retrieval of raster data from different sources',
     author='Luca Trotter',
@@ -32,13 +32,14 @@
         'dask',
         'scipy',
         'ecmwf-opendata >= 0.2.0',
-        'cdsapi',
+        'cdsapi >= 0.7.2',
         'ftpretty',
         'pandas',
         'drops2 @ git+https://github.com/CIMAFoundation/drops2.git',
         'matplotlib',
         'geopandas',
-        'boto3'
+        'boto3',
+        'img2pdf'
     ],
     python_requires='>=3.10',
     test_suite='tests',
