diff --git a/.github/workflows/install.sh b/.github/workflows/install.sh
index 9500fab66..5d225a3b3 100755
--- a/.github/workflows/install.sh
+++ b/.github/workflows/install.sh
@@ -24,7 +24,8 @@ if [ "${flavor}" = "testing" ]; then
     --extras=radar \
     --extras=radarplus \
     --extras=restapi \
-    --extras=sql
+    --extras=sql \
+    --extras=bufr
 
 elif [ "${flavor}" = "docs" ]; then
   poetry install --verbose --no-interaction --with=docs --extras=interpolation
diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml
index 96c22dc24..4debd18f2 100644
--- a/.github/workflows/tests.yml
+++ b/.github/workflows/tests.yml
@@ -83,6 +83,12 @@ jobs:
           brew install eccodes
           export WD_ECCODES_DIR=$(brew --prefix eccodes)
 
+      - name: Install eccodes (Mac only)
+        if: runner.os == 'macOS'
+        run: |
+          brew install eccodes
+          echo "WD_ECCODES_DIR=$(brew --prefix eccodes)" >> "$GITHUB_ENV"
+
       - name: Install project
         run: .github/workflows/install.sh testing
 
diff --git a/tests/provider/dwd/radar/test_api_historic.py b/tests/provider/dwd/radar/test_api_historic.py
index 161df803c..8155b39a5 100644
--- a/tests/provider/dwd/radar/test_api_historic.py
+++ b/tests/provider/dwd/radar/test_api_historic.py
@@ -8,6 +8,7 @@
 import pytest
 from dirty_equals import IsDatetime, IsDict, IsInt, IsList, IsNumeric, IsStr
 
+from wetterdienst.eccodes import ensure_eccodes
 from wetterdienst.provider.dwd.radar import (
     DwdRadarDataFormat,
     DwdRadarDataSubset,
@@ -516,6 +517,26 @@ def test_radar_request_site_historic_pe_bufr(default_settings):
     decoder = pybufrkit.decoder.Decoder()
     decoder.process(payload, info_only=True)
 
+    if ensure_eccodes():
+        df = results[0].df
+
+        assert not df.empty
+
+        print(df.dropna().query("value != 0"))
+
+        assert df.columns.tolist() == [
+            "station_id",
+            "latitude",
+            "longitude",
+            "height",
+            "projectionType",
+            "pictureType",
+            "date",
+            "echotops",
+        ]
+
+        assert not df.dropna().empty
+
 
 @pytest.mark.xfail(reason="month_year not matching start_date")
 @pytest.mark.remote
@@ -569,6 +590,13 @@ def test_radar_request_site_historic_pe_timerange(fmt, default_settings):
     )
     assert re.match(bytes(header, encoding="ascii"), payload[:115])
 
+    first = results[0]
+
+    if fmt == DwdRadarDataFormat.BUFR:
+        assert not first.df.dropna().empty
+
+    assert first.df.columns == [""]
+
 
 @pytest.mark.remote
 def test_radar_request_site_historic_px250_bufr_yesterday(default_settings):
@@ -637,6 +665,10 @@ def test_radar_request_site_historic_px250_bufr_timerange(default_settings):
 
     assert len(results) == 12
 
+    first = results[0]
+
+    assert not first.df.dropna().empty
+
 
 @pytest.mark.remote
 def test_radar_request_site_historic_sweep_vol_v_hdf5_yesterday(default_settings):
diff --git a/tests/test_settings.py b/tests/test_settings.py
index 354b13324..849daf003 100644
--- a/tests/test_settings.py
+++ b/tests/test_settings.py
@@ -32,6 +32,7 @@ def test_default_settings(caplog):
     default_settings = Settings.default()
     assert not default_settings.cache_disable
     assert re.match(WD_CACHE_DIR_PATTERN, default_settings.cache_dir)
+    assert default_settings.eccodes_dir is None
     assert default_settings.fsspec_client_kwargs == {}
     assert default_settings.ts_humanize
     assert default_settings.ts_shape == "long"
@@ -44,6 +45,7 @@ def test_default_settings(caplog):
         "precipitation_height": 20.0,
     }
     assert default_settings.ts_interpolation_use_nearby_station_distance == 1
+    assert not default_settings.read_bufr
     log_message = caplog.messages[0]
     assert re.match(WD_CACHE_ENABLED_PATTERN, log_message)
 
diff --git a/wetterdienst/eccodes.py b/wetterdienst/eccodes.py
new file mode 100644
index 000000000..dc8fb7a00
--- /dev/null
+++ b/wetterdienst/eccodes.py
@@ -0,0 +1,13 @@
+# -*- coding: utf-8 -*-
+# Copyright (c) 2018-2022, earthobservations developers.
+# Distributed under the MIT License. See LICENSE for more info.
+def ensure_eccodes() -> bool:
+    """Return True when the ecCodes bindings import and report an API version."""
+    try:
+        import eccodes
+
+        eccodes.eccodes.codes_get_api_version()
+    except (ImportError, RuntimeError):
+        return False
+
+    return True
diff --git a/wetterdienst/provider/dwd/radar/api.py b/wetterdienst/provider/dwd/radar/api.py
index 33258299d..e6027ce11 100644
--- a/wetterdienst/provider/dwd/radar/api.py
+++ b/wetterdienst/provider/dwd/radar/api.py
@@ -55,6 +55,18 @@
 
 log = logging.getLogger(__name__)
 
+BUFR_PARAMETER_MAPPING = {
+    DwdRadarParameter.PE_ECHO_TOP: ["echoTops"],
+    DwdRadarParameter.PG_REFLECTIVITY: ["horizontalReflectivity"],
+    DwdRadarParameter.LMAX_VOLUME_SCAN: ["horizontalReflectivity"],
+    DwdRadarParameter.PX250_REFLECTIVITY: ["horizontalReflectivity"],
+}
+
+from wetterdienst.eccodes import ensure_eccodes  # noqa: E402
+
+# Probed once at import time; BUFR payloads are only decoded when this is True.
+ECCODES_FOUND = ensure_eccodes()
+
 
 @dataclass
 class RadarResult:
@@ -64,6 +76,8 @@ class RadarResult:
     """
 
     data: BytesIO
+    # placeholder for bufr files, which are read into pandas.DataFrame if eccodes available
+    df: pl.DataFrame = field(default_factory=pl.DataFrame)
     timestamp: dt.datetime = None
     url: str = None
     filename: str = None
@@ -415,6 +429,71 @@ def query(self) -> Iterator[RadarResult]:
                     verify_hdf5(result.data)
                 except Exception as e:  # pragma: no cover
                     log.exception(f"Unable to read HDF5 file. {e}")
+
+            if self.format == DwdRadarDataFormat.BUFR:
+                if ECCODES_FOUND and self.settings.read_bufr:
+                    # Imported lazily: the decoding stack is only needed when
+                    # BUFR decoding is enabled and ecCodes was found.
+                    import tempfile
+
+                    import pandas as pd
+                    import pdbufr
+
+                    # TODO: pdbufr currently doesn't seem to allow reading directly from BytesIO
+                    # getvalue() does not move the stream position of result.data, so
+                    # consumers of the yielded result can still read the raw payload.
+                    with tempfile.NamedTemporaryFile("w+b") as tf:
+                        tf.write(result.data.getvalue())
+                        tf.flush()
+                        df = pdbufr.read_bufr(tf.name, columns="data", flat=True)
+
+                    # Value columns are those whose name contains one of the BUFR
+                    # keys mapped to the requested parameter; column order is kept.
+                    parameters = BUFR_PARAMETER_MAPPING[self.parameter]
+                    value_vars = [col for col in df.columns if any(par in col for par in parameters)]
+                    id_vars = [col for col in df.columns.difference(value_vars) if col.startswith("#1#")]
+
+                    df = df.melt(
+                        id_vars=id_vars,
+                        value_vars=value_vars,
+                        var_name="parameter",
+                        value_name="value",
+                    )
+                    df.columns = [col[3:] if col.startswith("#1#") else col for col in df.columns]
+
+                    df = df.rename(
+                        columns={
+                            "stationNumber": Columns.STATION_ID.value,
+                            "latitude": Columns.LATITUDE.value,
+                            "longitude": Columns.LONGITUDE.value,
+                            "heightOfStation": Columns.HEIGHT.value,
+                        }
+                    )
+
+                    # Assemble the timestamp from its component columns in one
+                    # vectorized call instead of building a datetime per row.
+                    date_columns = ["year", "month", "day", "hour", "minute"]
+                    dates = pd.to_datetime(df[date_columns])
+                    df.insert(len(df.columns) - 1, Columns.DATE.value, dates)
+                    df = df.drop(columns=date_columns)
+
+                    def split_index_parameter(text: str):
+                        """Split a flat column name "#<index>#<key>" into (key, index)."""
+                        # str.find() returns -1 when there is no second "#", whereas
+                        # str.index() raises ValueError and never reaches the fallback.
+                        split_index = text.find("#", 1)
+                        if split_index == -1:
+                            return text, None
+                        index = text[1:split_index]
+                        parameter = text[split_index + 1 :]
+                        return parameter, float(index)
+
+                    df[["parameter", "index"]] = df.pop("parameter").map(split_index_parameter).tolist()
+
+                    df = df.sort_values(["parameter", "index"])
+
+                    result.df = df
+
             yield result
 
     @staticmethod