From 59114df3dcf26e2c67c72d49bc93b2942afbabcb Mon Sep 17 00:00:00 2001 From: Luke Parkinson Date: Sun, 24 Nov 2024 16:07:31 +1300 Subject: [PATCH] Combine base_data_to_db implementations --- floodresilience/run_all.py | 2 +- floodresilience/tasks.py | 31 ++++--------------------------- otakaro/run_all.py | 2 +- otakaro/tasks.py | 36 +++++++++--------------------------- src/tasks.py | 29 ++++++++++++++++++++++++++++- 5 files changed, 43 insertions(+), 57 deletions(-) diff --git a/floodresilience/run_all.py b/floodresilience/run_all.py index 4a4bfcd66..d55c3042b 100644 --- a/floodresilience/run_all.py +++ b/floodresilience/run_all.py @@ -15,7 +15,7 @@ DEFAULT_MODULES_TO_PARAMETERS = { retrieve_from_instructions: { "log_level": LogLevel.INFO, - "instruction_json_path": pathlib.Path("floodresilience/static_boundary_instructions.json") + "instruction_json_path": pathlib.Path("floodresilience/static_boundary_instructions.json").as_posix() }, process_hydro_dem: { "log_level": LogLevel.INFO diff --git a/floodresilience/tasks.py b/floodresilience/tasks.py index bb4c76b39..6097cf71f 100644 --- a/floodresilience/tasks.py +++ b/floodresilience/tasks.py @@ -14,8 +14,8 @@ import xarray from src.digitaltwin import setup_environment, retrieve_from_instructions -from src.digitaltwin.utils import retry_function, setup_logging -from src.tasks import app, OnFailureStateTask, wkt_to_gdf # pylint: disable=cyclic-import +from src.digitaltwin.utils import setup_logging +from src.tasks import add_base_data_to_db, app, OnFailureStateTask, wkt_to_gdf # pylint: disable=cyclic-import from floodresilience.dynamic_boundary_conditions.rainfall import main_rainfall from floodresilience.dynamic_boundary_conditions.river import main_river from floodresilience.dynamic_boundary_conditions.tide import main_tide_slr @@ -58,7 +58,8 @@ def on_startup(sender: Consumer, **_kwargs: None) -> None: # pylint: disable=mi # Gather area of interest from file. aoi_wkt = gpd.read_file("selected_polygon.geojson").to_crs(4326).geometry[0].wkt # Send a task to initialise this area of interest. - sender.app.send_task("floodresilience.tasks.add_base_data_to_db", args=[aoi_wkt], connection=conn) + base_data_parameters = DEFAULT_MODULES_TO_PARAMETERS[retrieve_from_instructions] + sender.app.send_task("src.tasks.add_base_data_to_db", args=[aoi_wkt, base_data_parameters], connection=conn) def create_model_for_area(selected_polygon_wkt: str, scenario_options: dict) -> result.GroupResult: @@ -87,30 +88,6 @@ def create_model_for_area(selected_polygon_wkt: str, scenario_options: dict) -> )() -@app.task(base=OnFailureStateTask) -def add_base_data_to_db(selected_polygon_wkt: str) -> None: - """ - Task to ensure static base data for the given area is added to the database. - - Parameters - ---------- - selected_polygon_wkt : str - The polygon defining the selected area to add base data for. Defined in WKT form. - """ - parameters = DEFAULT_MODULES_TO_PARAMETERS[retrieve_from_instructions] - selected_polygon = wkt_to_gdf(selected_polygon_wkt) - # Set up retry/timeout controls - retries = 3 - delay_seconds = 30 - # Try to initialise db, with a retry set up in case of database exceptions that happen when concurrent access occurs - retry_function(retrieve_from_instructions.main, - retries, - delay_seconds, - sqlalchemy.exc.IntegrityError, - selected_polygon, - **parameters) - - @app.task(base=OnFailureStateTask) def process_dem(selected_polygon_wkt: str) -> None: """ diff --git a/otakaro/run_all.py b/otakaro/run_all.py index 1ac4cb05e..e7f62ae8e 100644 --- a/otakaro/run_all.py +++ b/otakaro/run_all.py @@ -12,7 +12,7 @@ DEFAULT_MODULES_TO_PARAMETERS = { retrieve_from_instructions: { "log_level": LogLevel.INFO, - "instruction_json_path": pathlib.Path("otakaro/otakaro_data_instructions.json") + "instruction_json_path": pathlib.Path("otakaro/otakaro_data_instructions.json").as_posix() }, initialise_db_with_files: { "log_level": LogLevel.INFO, diff --git a/otakaro/tasks.py b/otakaro/tasks.py index 486e8ed24..ac15afe9b 100644 --- a/otakaro/tasks.py +++ b/otakaro/tasks.py @@ -11,8 +11,8 @@ import geopandas as gpd from src.digitaltwin import retrieve_from_instructions -from src.digitaltwin.utils import retry_function, setup_logging -from src.tasks import app, OnFailureStateTask, wkt_to_gdf +from src.digitaltwin.utils import setup_logging +from src.tasks import add_base_data_to_db, app, OnFailureStateTask, wkt_to_gdf from otakaro import initialise_db_with_files from otakaro.environmental.water_quality import surface_water_sites from otakaro.pollution_model import run_medusa_2 @@ -36,34 +36,11 @@ def on_startup(sender: Consumer, **_kwargs: None) -> None: # pylint: disable=mi # Gather area of interest from file. aoi_wkt = gpd.read_file("selected_polygon.geojson").to_crs(4326).geometry[0].wkt # Send a task to initialise this area of interest. - sender.app.send_task("otakaro.tasks.add_base_data_to_db", args=[aoi_wkt], connection=conn) + base_data_parameters = DEFAULT_MODULES_TO_PARAMETERS[retrieve_from_instructions] + sender.app.send_task("src.tasks.add_base_data_to_db", args=[aoi_wkt, base_data_parameters], connection=conn) sender.app.send_task("otakaro.tasks.add_files_data_to_db", connection=conn) -@app.task(base=OnFailureStateTask) -def add_base_data_to_db(selected_polygon_wkt: str) -> None: - """ - Task to ensure static base data for the given area is added to the database. - - Parameters - ---------- - selected_polygon_wkt : str - The polygon defining the selected area to add base data for. Defined in WKT form. - """ - parameters = DEFAULT_MODULES_TO_PARAMETERS[retrieve_from_instructions] - selected_polygon = wkt_to_gdf(selected_polygon_wkt) - # Set up retry/timeout controls - retries = 3 - delay_seconds = 30 - # Try to initialise db, with a retry set up in case of database exceptions that happen when concurrent access occurs - retry_function(retrieve_from_instructions.main, - retries, - delay_seconds, - sqlalchemy.exc.IntegrityError, - selected_polygon, - **parameters) - - @app.task(base=OnFailureStateTask) def add_files_data_to_db() -> None: """Read roof surface polygons data then store them into database.""" @@ -100,6 +77,11 @@ def run_medusa_model(selected_polygon_wkt: str, """ # Convert wkt string into a GeoDataFrame selected_polygon = wkt_to_gdf(selected_polygon_wkt) + + # Initialise base data + base_data_parameters = DEFAULT_MODULES_TO_PARAMETERS[retrieve_from_instructions] + add_base_data_to_db.delay(selected_polygon, base_data_parameters).get() + # Read log level from default parameters log_level = DEFAULT_MODULES_TO_PARAMETERS[run_medusa_2]["log_level"] # Run Medusa model diff --git a/src/tasks.py b/src/tasks.py index ac8021cb7..69413de94 100644 --- a/src/tasks.py +++ b/src/tasks.py @@ -9,10 +9,12 @@ import billiard.einfo import geopandas as gpd import shapely +import sqlalchemy.exc from celery import Celery, states from src.config import EnvVariable -from src.digitaltwin.utils import setup_logging +from src.digitaltwin import retrieve_from_instructions +from src.digitaltwin.utils import retry_function, setup_logging # Setup celery backend task management message_broker_url = f"redis://{EnvVariable.MESSAGE_BROKER_HOST}:6379/0" @@ -47,6 +49,31 @@ def on_failure(self, }) +@app.task(base=OnFailureStateTask) +def add_base_data_to_db(selected_polygon_wkt: str, base_data_parameters: Dict[str, str]) -> None: + """ + Task to ensure static base data for the given area is added to the database. + + Parameters + ---------- + selected_polygon_wkt : str + The polygon defining the selected area to add base data for. Defined in WKT form. + base_data_parameters : Dict[str, str] + The parameters from DEFAULT_MODULES_TO_PARAMETERS[retrieve_from_instructions] for the particular module. + """ + selected_polygon = wkt_to_gdf(selected_polygon_wkt) + # Set up retry/timeout controls + retries = 3 + delay_seconds = 30 + # Try to initialise db, with a retry set up in case of database exceptions that happen when concurrent access occurs + retry_function(retrieve_from_instructions.main, + retries, + delay_seconds, + sqlalchemy.exc.IntegrityError, + selected_polygon, + **base_data_parameters) + + def wkt_to_gdf(wkt: str) -> gpd.GeoDataFrame: """ Transform a WKT string polygon into a GeoDataFrame.