
Commit

Merge branch 'update_backup_gfs_downloader' into main
JanvE97 authored Oct 30, 2024
2 parents aedd984 + f60b812 commit 29022f2
Showing 10 changed files with 263 additions and 140 deletions.
3 changes: 3 additions & 0 deletions .gitignore
@@ -9,12 +9,15 @@
.env
secrets.py
secrets2.py
local_secret.py
data/*
test/*
*.vrt
*.ipynb
*.tif
*.tif.aux.xml
*.nc
*.csv

# Local data files
pipeline/data
304 changes: 183 additions & 121 deletions flash_flood_pipeline/data_download/collect_data.py
@@ -1,60 +1,28 @@
from datetime import datetime, timedelta
import numpy as np
import pandas as pd
import plotly.graph_objects as go
import requests

from math import pi
from numpy import cos, sin
import netCDF4 as nc
from pathlib import Path
import rioxarray
from rasterio.enums import Resampling
import os
import json
from data_download.get_gauge_from_gmail import get_satellite_data
from data_download.utils.tunnel_fast import tunnel_fast
from data_download.utils.extract_lat_lon import extract_lat_lon
from data_download.process_compacted_iridium_data import (
process_compacted_data,
)
from utils.general_utils.round_to_nearest_hour import (
round_to_nearest_hour,
)
from settings.base import (
HISTORIC_TIME_PERIOD_DAYS,
WATERLEVEL_SENSOR,
METEO_RAIN_SENSOR,
)

from itertools import compress


def tunnel_fast(latvar, lonvar, lat0, lon0):
"""
Find the closest point in a set of (lat,lon) points to a specified query point
latvar - 2D latitude variable from an open netCDF dataset
lonvar - 2D longitude variable from an open netCDF dataset
lat0,lon0 - query point
Returns iy,ix such that the squared tunnel distance
between (latvals[iy,ix],lonvals[iy,ix]) and (lat0,lon0)
is minimal.
"""
rad_factor = pi / 180.0  # for trigonometry, need angles in radians
# Read latitude and longitude from file into numpy arrays
latvals = latvar[:] * rad_factor
lonvals = lonvar[:] * rad_factor
ny, nx = latvals.shape
lat0_rad = lat0 * rad_factor
lon0_rad = lon0 * rad_factor
# Compute numpy arrays for all values, no loops
clat, clon = cos(latvals), cos(lonvals)
slat, slon = sin(latvals), sin(lonvals)
delX = cos(lat0_rad) * cos(lon0_rad) - clat * clon
delY = cos(lat0_rad) * sin(lon0_rad) - clat * slon
delZ = sin(lat0_rad) - slat
dist_sq = delX**2 + delY**2 + delZ**2
minindex_1d = dist_sq.argmin() # 1D index of minimum element
iy_min, ix_min = np.unravel_index(minindex_1d, latvals.shape)
return iy_min, ix_min
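
For orientation, a minimal usage sketch (not part of this commit) of the nearest-grid-cell lookup above; the grid extent, spacing, and query point are illustrative assumptions:

```python
import numpy as np

# Build a hypothetical 0.25-degree 2D lat/lon grid roughly covering Malawi.
lats_1d = np.arange(-17.0, -9.0, 0.25)
lons_1d = np.arange(32.0, 36.0, 0.25)
lonvar, latvar = np.meshgrid(lons_1d, lats_1d)  # both shaped (n_lat, n_lon)

# Nearest grid cell to an (illustrative) query point.
iy, ix = tunnel_fast(latvar, lonvar, lat0=-13.96, lon0=33.79)
print(latvar[iy, ix], lonvar[iy, ix])  # centre of the closest cell
```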


class dataGetter:
def __init__(self, ta_gdf):
self.ta_gdf = ta_gdf
@@ -226,102 +194,196 @@ def get_rain_gauge(self):
return rain_df

def get_rain_forecast(self):
gfs_data = {}
# historic_start = round_to_nearest_hour(datetime.now()) - timedelta(
# days=2, hours=3
# )
# historic_date_list = [
# historic_start + timedelta(hours=3 * x) for x in range(18)
# ]
forecast_start = round_to_nearest_hour(datetime.now()) - timedelta(hours=9)
# forecast_date_list = [
# forecast_start + timedelta(hours=3 * x) for x in range(30)
# ]
forecast_start_hour = round(forecast_start.hour / 6) * 6
if forecast_start_hour < 12:
forecast_start_hour = "0" + str(forecast_start_hour)
if forecast_start_hour == 24:
forecast_start_hour = "00"
forecast_start = forecast_start + timedelta(days=1)
# nc_dataset_historic = nc.Dataset(
# "https://nomads.ncep.noaa.gov/dods/gfs_0p25/gfs{}/gfs_0p25_00z".format(
# historic_start.strftime("%Y%m%d")
# )
# )
# nc_dataset_forecast = nc.Dataset(
# "https://nomads.ncep.noaa.gov/dods/gfs_0p25/gfs{}/gfs_0p25_{}z".format(
# forecast_start.strftime("%Y%m%d"), forecast_start_hour
# )
# )
# latvar = nc_dataset_historic.variables["lat"][:]
# lat_dim = len(latvar)
# lonvar = nc_dataset_historic.variables["lon"][:]
# lon_dim = len(lonvar)
# latvar = np.stack([latvar for _ in range(lon_dim)], axis=0)
# lonvar = np.stack([lonvar for _ in range(lat_dim)], axis=1)
ta_gdf_4326 = self.ta_gdf.copy()
ta_gdf_4326.to_crs(4326, inplace=True)
now = datetime.now()

xds = rioxarray.open_rasterio(
r"data/cosmo/COSMO_MLW_{}T00_prec.nc".format(now.strftime("%Y%m%d"))
)
xds.rio.write_crs("epsg:4326", inplace=True)

upscale_factor = 8
new_width = xds.rio.width * upscale_factor
new_height = xds.rio.height * upscale_factor
xds_upsampled_forecast = xds.rio.reproject(
xds.rio.crs,
shape=(new_height, new_width),
resampling=Resampling.bilinear,
)
datetime_list_forecast = [
datetime.strptime(x.isoformat(), "%Y-%m-%dT%H:%M:%S")
for x in xds_upsampled_forecast.time.data[:]
]

past = now - timedelta(days=5)
xds = rioxarray.open_rasterio(

expected_cosmo_hindcast_path = Path(
r"data/cosmo/COSMO_MLW_{}T00_prec.nc".format(past.strftime("%Y%m%d"))
)
xds.rio.write_crs("epsg:4326", inplace=True)

xds_upsampled_hindcast = xds.rio.reproject(
xds.rio.crs,
shape=(new_height, new_width),
resampling=Resampling.bilinear,
expected_cosmo_forecast_path = Path(
r"data/cosmo/COSMO_MLW_{}T00_prec.nc".format(now.strftime("%Y%m%d"))
)
datetime_list_hindcast = [
datetime.strptime(x.isoformat(), "%Y-%m-%dT%H:%M:%S")
for x in xds_upsampled_hindcast.time.data[:]
]

for _, row in ta_gdf_4326.iterrows():
xds_clipped = xds_upsampled_hindcast.rio.clip(
[row["geometry"]], ta_gdf_4326.crs
gfs_data = {}

if (
not expected_cosmo_forecast_path.exists()
or not expected_cosmo_hindcast_path.exists()
):
# switch to gfs
hindcast_start = round_to_nearest_hour(datetime.now()) - timedelta(
days=2, hours=3
)
xds_clipped.data[xds_clipped.data > 1000] = np.nan
cum_mean_rain_ts = [np.nanmean(x) for x in xds_clipped.data]
mean_rain_ts = [cum_mean_rain_ts[0]]
for x in range(1, len(cum_mean_rain_ts)):
mean_rain_ts.append(cum_mean_rain_ts[x] - cum_mean_rain_ts[x - 1])
gfs_data[row["placeCode"]] = pd.DataFrame(
{"datetime": datetime_list_hindcast, "precipitation": mean_rain_ts}

forecast_start = round_to_nearest_hour(datetime.now()) - timedelta(hours=9)

forecast_start_hour = round(forecast_start.hour / 6) * 6

if forecast_start_hour < 12:
forecast_start_hour = "0" + str(forecast_start_hour)

if forecast_start_hour == 24:
forecast_start_hour = "00"
forecast_start = forecast_start + timedelta(days=1)
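# Note (illustrative, not from the commit): the two checks above zero-pad the
# 00/06 GFS cycles and roll a rounded-up hour of 24 over to the next day's 00z
# run, e.g. 23:00 -> round(23 / 6) * 6 = 24 -> cycle "00" on the following date.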

nc_dataset_hindcast = nc.Dataset(
"https://nomads.ncep.noaa.gov/dods/gfs_0p25/gfs{}/gfs_0p25_00z".format(
hindcast_start.strftime("%Y%m%d")
)
)

xds_clipped = xds_upsampled_forecast.rio.clip(
[row["geometry"]], ta_gdf_4326.crs
nc_dataset_forecast = nc.Dataset(
"https://nomads.ncep.noaa.gov/dods/gfs_0p25/gfs{}/gfs_0p25_{}z".format(
forecast_start.strftime("%Y%m%d"), forecast_start_hour
)
)
xds_clipped.data[xds_clipped.data > 1000] = np.nan
cum_mean_rain_ts = [np.nanmean(x) for x in xds_clipped.data]
mean_rain_ts = [cum_mean_rain_ts[0]]
for x in range(1, len(cum_mean_rain_ts)):
mean_rain_ts.append(cum_mean_rain_ts[x] - cum_mean_rain_ts[x - 1])
gfs_data[row["placeCode"]] = pd.DataFrame(
{"datetime": datetime_list_forecast, "precipitation": mean_rain_ts}

latvar_hind, lonvar_hind = extract_lat_lon(ds=nc_dataset_hindcast)
latvar_fc, lonvar_fc = extract_lat_lon(ds=nc_dataset_forecast)

ta_gdf_4326 = self.ta_gdf.copy()
ta_gdf_4326.to_crs(4326, inplace=True)

ta_gdf_4326["centr"] = ta_gdf_4326.centroid

time_hindcast = [
datetime(year=1, month=1, day=1) + timedelta(days=d)
for d in nc_dataset_hindcast["time"][:]
]

time_forecast = [
datetime(year=1, month=1, day=1) + timedelta(days=d)
for d in nc_dataset_forecast["time"][:]
]

for _, row in ta_gdf_4326.iterrows():
iy_min_hist, ix_min_hist = tunnel_fast(
latvar=latvar_hind,
lonvar=lonvar_hind,
lat0=row.centr.x,
lon0=row.centr.y,
)
iy_min_fc, ix_min_fc = tunnel_fast(
latvar=latvar_fc,
lonvar=lonvar_fc,
lat0=row.centr.x,
lon0=row.centr.y,
)

gfs_hindcast_and_forecast = []

for ix_min, iy_min, time in [
(ix_min_hist, iy_min_hist, time_hindcast),
(ix_min_fc, iy_min_fc, time_forecast),
]:
gfs_rain_ts_cumulative = [
float(x) if x != "--" else 0.0
for x in nc_dataset_hindcast["apcpsfc"][:, ix_min, iy_min]
]
gfs_rain_ts_incremental = [
val - gfs_rain_ts_cumulative[i - 1] if i > 0 else val
for i, val in enumerate(gfs_rain_ts_cumulative)
]
gfs_hindcast_and_forecast.append(
pd.DataFrame(
data={
"datetime": time,
"precipitation": gfs_rain_ts_incremental,
}
)
)

gfs_precipitation = pd.concat(gfs_hindcast_and_forecast, axis=0)
gfs_precipitation = gfs_precipitation.sort_values("datetime")
gfs_precipitation = gfs_precipitation.drop_duplicates(
subset="datetime", keep="last"
)
gfs_data[row["placeCode"]] = gfs_precipitation
else:
ta_gdf_4326 = self.ta_gdf.copy()
ta_gdf_4326.to_crs(4326, inplace=True)

xds = rioxarray.open_rasterio(expected_cosmo_forecast_path)
xds.rio.write_crs("epsg:4326", inplace=True)

upscale_factor = 8
new_width = xds.rio.width * upscale_factor
new_height = xds.rio.height * upscale_factor
xds_upsampled_forecast = xds.rio.reproject(
xds.rio.crs,
shape=(new_height, new_width),
resampling=Resampling.bilinear,
)
gfs_data[row["placeCode"]] = gfs_data[row["placeCode"]].drop_duplicates(
subset="datetime", keep="last"
datetime_list_forecast = [
datetime.strptime(x.isoformat(), "%Y-%m-%dT%H:%M:%S")
for x in xds_upsampled_forecast.time.data[:]
]

past = now - timedelta(days=5)
xds = rioxarray.open_rasterio(expected_cosmo_hindcast_path)
xds.rio.write_crs("epsg:4326", inplace=True)

xds_upsampled_hindcast = xds.rio.reproject(
xds.rio.crs,
shape=(new_height, new_width),
resampling=Resampling.bilinear,
)
datetime_list_hindcast = [
datetime.strptime(x.isoformat(), "%Y-%m-%dT%H:%M:%S")
for x in xds_upsampled_hindcast.time.data[:]
]

for _, row in ta_gdf_4326.iterrows():
xds_clipped = xds_upsampled_hindcast.rio.clip(
[row["geometry"]], ta_gdf_4326.crs
)
xds_clipped.data[xds_clipped.data > 1000] = np.nan
cum_mean_rain_ts = [np.nanmean(x) for x in xds_clipped.data]
mean_rain_ts = [cum_mean_rain_ts[0]]

for x in range(1, len(cum_mean_rain_ts)):
mean_rain_ts.append(cum_mean_rain_ts[x] - cum_mean_rain_ts[x - 1])

gfs_data[row["placeCode"]] = pd.DataFrame(
{"datetime": datetime_list_hindcast, "precipitation": mean_rain_ts}
)
xds_clipped = xds_upsampled_forecast.rio.clip(
[row["geometry"]], ta_gdf_4326.crs
)
xds_clipped.data[xds_clipped.data > 1000] = np.nan
cum_mean_rain_ts = [np.nanmean(x) for x in xds_clipped.data]
mean_rain_ts = [cum_mean_rain_ts[0]]

for x in range(1, len(cum_mean_rain_ts)):
mean_rain_ts.append(cum_mean_rain_ts[x] - cum_mean_rain_ts[x - 1])

gfs_data[row["placeCode"]] = pd.concat(
[
gfs_data[row["placeCode"]],
pd.DataFrame(
{
"datetime": datetime_list_forecast,
"precipitation": mean_rain_ts,
}
),
],
axis=0,
)
gfs_data[row["placeCode"]] = gfs_data[row["placeCode"]].drop_duplicates(
subset="datetime", keep="last"
)
gfs_data[row["placeCode"]] = gfs_data[row["placeCode"]].sort_values(
"datetime", ascending=True
)

# combined_rainfall = []
# for _, val in gfs_data.items():
# val = val.set_index("datetime")
# combined_rainfall.append(val)

# print(pd.concat(combined_rainfall, axis=1).head)
# pd.concat(combined_rainfall, axis=1).to_csv(
# r"d:\VSCode\IBF-flash-flood-pipeline\flash_flood_pipeline\rainfall_test_gfs_sampling.csv"
# )
return gfs_data
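
The heart of get_rain_forecast, in both the GFS and COSMO branches, is the same two-step pattern: difference a cumulative precipitation series into per-interval values, then stitch hindcast and forecast together, keeping the forecast value wherever timestamps overlap. A minimal sketch of that pattern with made-up numbers (not data from the pipeline):

```python
import pandas as pd
from datetime import datetime, timedelta

def cumulative_to_incremental(cum):
    """Per-interval rainfall from a cumulative series (same differencing rule as above)."""
    return [cum[0]] + [cum[i] - cum[i - 1] for i in range(1, len(cum))]

t0 = datetime(2024, 10, 30, 0, 0)
hindcast = pd.DataFrame(
    {
        "datetime": [t0 + timedelta(hours=3 * i) for i in range(4)],
        "precipitation": cumulative_to_incremental([0.0, 1.2, 1.2, 4.0]),
    }
)
forecast = pd.DataFrame(
    {
        "datetime": [t0 + timedelta(hours=9 + 3 * i) for i in range(4)],
        "precipitation": cumulative_to_incremental([0.5, 0.5, 2.0, 2.0]),
    }
)

# Stitch the two series, mirroring the drop_duplicates/sort_values calls in the diff:
# on the overlapping timestamp the forecast row comes last and is kept.
merged = (
    pd.concat([hindcast, forecast], axis=0)
    .drop_duplicates(subset="datetime", keep="last")
    .sort_values("datetime", ascending=True)
)
print(merged)
```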
19 changes: 19 additions & 0 deletions flash_flood_pipeline/data_download/utils/extract_lat_lon.py
@@ -0,0 +1,19 @@
import numpy as np


def extract_lat_lon(ds):
"""
Create 2D arrays with latitudes and longitudes from netCDF dataset variables.
ds - netCDF dataset
Returns latvar, lonvar
"""
latvar = ds.variables["lat"][:]
lat_dim = len(latvar)

lonvar = ds.variables["lon"][:]
lon_dim = len(lonvar)

latvar = np.stack([latvar for _ in range(lon_dim)], axis=0)
lonvar = np.stack([lonvar for _ in range(lat_dim)], axis=1)
return latvar, lonvar
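
A small sanity check (not part of the commit) of the shapes this helper produces; the mock object below only mimics the `variables["lat"]` / `variables["lon"]` access pattern of a real netCDF4 Dataset:

```python
import numpy as np

class _FakeDataset:
    """Hypothetical stand-in exposing 1D lat/lon arrays like a netCDF dataset."""
    variables = {
        "lat": np.array([-12.0, -11.75, -11.5]),  # lat_dim = 3
        "lon": np.array([34.0, 34.25]),           # lon_dim = 2
    }

latvar, lonvar = extract_lat_lon(ds=_FakeDataset())
print(latvar.shape, lonvar.shape)  # both (2, 3): rows follow longitude, columns follow latitude
print(latvar[0])                   # [-12.   -11.75 -11.5 ]
print(lonvar[:, 0])                # [34.   34.25]
```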