Skip to content

Commit

Permalink
Combine base_data_to_db implementations
Browse files Browse the repository at this point in the history
  • Loading branch information
LukeParky committed Nov 24, 2024
1 parent 5b9ff6b commit 59114df
Show file tree
Hide file tree
Showing 5 changed files with 43 additions and 57 deletions.
2 changes: 1 addition & 1 deletion floodresilience/run_all.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
DEFAULT_MODULES_TO_PARAMETERS = {
retrieve_from_instructions: {
"log_level": LogLevel.INFO,
"instruction_json_path": pathlib.Path("floodresilience/static_boundary_instructions.json")
"instruction_json_path": pathlib.Path("floodresilience/static_boundary_instructions.json").as_posix()
},
process_hydro_dem: {
"log_level": LogLevel.INFO
Expand Down
31 changes: 4 additions & 27 deletions floodresilience/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@
import xarray

from src.digitaltwin import setup_environment, retrieve_from_instructions
from src.digitaltwin.utils import retry_function, setup_logging
from src.tasks import app, OnFailureStateTask, wkt_to_gdf # pylint: disable=cyclic-import
from src.digitaltwin.utils import setup_logging
from src.tasks import add_base_data_to_db, app, OnFailureStateTask, wkt_to_gdf # pylint: disable=cyclic-import
from floodresilience.dynamic_boundary_conditions.rainfall import main_rainfall
from floodresilience.dynamic_boundary_conditions.river import main_river
from floodresilience.dynamic_boundary_conditions.tide import main_tide_slr
Expand Down Expand Up @@ -58,7 +58,8 @@ def on_startup(sender: Consumer, **_kwargs: None) -> None: # pylint: disable=mi
# Gather area of interest from file.
aoi_wkt = gpd.read_file("selected_polygon.geojson").to_crs(4326).geometry[0].wkt
# Send a task to initialise this area of interest.
sender.app.send_task("floodresilience.tasks.add_base_data_to_db", args=[aoi_wkt], connection=conn)
base_data_parameters = DEFAULT_MODULES_TO_PARAMETERS[retrieve_from_instructions]
sender.app.send_task("src.tasks.add_base_data_to_db", args=[aoi_wkt, base_data_parameters], connection=conn)


def create_model_for_area(selected_polygon_wkt: str, scenario_options: dict) -> result.GroupResult:
Expand Down Expand Up @@ -87,30 +88,6 @@ def create_model_for_area(selected_polygon_wkt: str, scenario_options: dict) ->
)()


@app.task(base=OnFailureStateTask)
def add_base_data_to_db(selected_polygon_wkt: str) -> None:
    """
    Celery task ensuring the static base data for the given area exists in the database.

    Parameters
    ----------
    selected_polygon_wkt : str
        The polygon defining the selected area to add base data for. Defined in WKT form.
    """
    # Look up the configured parameters for the instruction-retrieval module.
    module_parameters = DEFAULT_MODULES_TO_PARAMETERS[retrieve_from_instructions]
    # Convert the WKT polygon into the GeoDataFrame form the retrieval module expects.
    area_of_interest = wkt_to_gdf(selected_polygon_wkt)
    # Retry on IntegrityError because concurrent initialisation attempts can collide in the database.
    max_attempts = 3
    wait_between_attempts_s = 30
    retry_function(
        retrieve_from_instructions.main,
        max_attempts,
        wait_between_attempts_s,
        sqlalchemy.exc.IntegrityError,
        area_of_interest,
        **module_parameters,
    )


@app.task(base=OnFailureStateTask)
def process_dem(selected_polygon_wkt: str) -> None:
"""
Expand Down
2 changes: 1 addition & 1 deletion otakaro/run_all.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
DEFAULT_MODULES_TO_PARAMETERS = {
retrieve_from_instructions: {
"log_level": LogLevel.INFO,
"instruction_json_path": pathlib.Path("otakaro/otakaro_data_instructions.json")
"instruction_json_path": pathlib.Path("otakaro/otakaro_data_instructions.json").as_posix()
},
initialise_db_with_files: {
"log_level": LogLevel.INFO,
Expand Down
36 changes: 9 additions & 27 deletions otakaro/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@
import geopandas as gpd

from src.digitaltwin import retrieve_from_instructions
from src.digitaltwin.utils import retry_function, setup_logging
from src.tasks import app, OnFailureStateTask, wkt_to_gdf
from src.digitaltwin.utils import setup_logging
from src.tasks import add_base_data_to_db, app, OnFailureStateTask, wkt_to_gdf
from otakaro import initialise_db_with_files
from otakaro.environmental.water_quality import surface_water_sites
from otakaro.pollution_model import run_medusa_2
Expand All @@ -36,34 +36,11 @@ def on_startup(sender: Consumer, **_kwargs: None) -> None: # pylint: disable=mi
# Gather area of interest from file.
aoi_wkt = gpd.read_file("selected_polygon.geojson").to_crs(4326).geometry[0].wkt
# Send a task to initialise this area of interest.
sender.app.send_task("otakaro.tasks.add_base_data_to_db", args=[aoi_wkt], connection=conn)
base_data_parameters = DEFAULT_MODULES_TO_PARAMETERS[retrieve_from_instructions]
sender.app.send_task("src.tasks.add_base_data_to_db", args=[aoi_wkt, base_data_parameters], connection=conn)
sender.app.send_task("otakaro.tasks.add_files_data_to_db", connection=conn)


@app.task(base=OnFailureStateTask)
def add_base_data_to_db(selected_polygon_wkt: str) -> None:
    """
    Make sure the static base data covering the selected area has been loaded into the database.

    Parameters
    ----------
    selected_polygon_wkt : str
        The polygon defining the selected area to add base data for. Defined in WKT form.
    """
    # Parse the WKT string into a GeoDataFrame before handing it to the retrieval module.
    selected_area = wkt_to_gdf(selected_polygon_wkt)
    # Fetch the default parameter set configured for the instruction-retrieval module.
    retrieval_params = DEFAULT_MODULES_TO_PARAMETERS[retrieve_from_instructions]
    # Concurrent workers may initialise the same data simultaneously, raising IntegrityError;
    # retry a few times with a delay so one attempt eventually succeeds.
    attempt_limit, retry_delay_seconds = 3, 30
    retry_function(retrieve_from_instructions.main,
                   attempt_limit,
                   retry_delay_seconds,
                   sqlalchemy.exc.IntegrityError,
                   selected_area,
                   **retrieval_params)


@app.task(base=OnFailureStateTask)
def add_files_data_to_db() -> None:
"""Read roof surface polygons data then store them into database."""
Expand Down Expand Up @@ -100,6 +77,11 @@ def run_medusa_model(selected_polygon_wkt: str,
"""
# Convert wkt string into a GeoDataFrame
selected_polygon = wkt_to_gdf(selected_polygon_wkt)

# Initialise base data
base_data_parameters = DEFAULT_MODULES_TO_PARAMETERS[retrieve_from_instructions]
add_base_data_to_db.delay(selected_polygon, base_data_parameters).get()

# Read log level from default parameters
log_level = DEFAULT_MODULES_TO_PARAMETERS[run_medusa_2]["log_level"]
# Run Medusa model
Expand Down
29 changes: 28 additions & 1 deletion src/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,12 @@
import billiard.einfo
import geopandas as gpd
import shapely
import sqlalchemy.exc
from celery import Celery, states

from src.config import EnvVariable
from src.digitaltwin.utils import setup_logging
from src.digitaltwin import retrieve_from_instructions
from src.digitaltwin.utils import retry_function, setup_logging

# Setup celery backend task management
message_broker_url = f"redis://{EnvVariable.MESSAGE_BROKER_HOST}:6379/0"
Expand Down Expand Up @@ -47,6 +49,31 @@ def on_failure(self,
})


@app.task(base=OnFailureStateTask)
def add_base_data_to_db(selected_polygon_wkt: str, base_data_parameters: Dict[str, str]) -> None:
    """
    Task to ensure static base data for the given area is added to the database.

    Parameters
    ----------
    selected_polygon_wkt : str
        The polygon defining the selected area to add base data for. Defined in WKT form.
    base_data_parameters : Dict[str, str]
        The parameters from DEFAULT_MODULES_TO_PARAMETERS[retrieve_from_instructions] for the particular module.
        NOTE(review): callers appear to pass values that are not all str (e.g. a log_level enum) —
        confirm whether the value type should be broader than str.
    """
    # Convert the WKT string into a GeoDataFrame for the retrieval module.
    selected_polygon = wkt_to_gdf(selected_polygon_wkt)
    # Set up retry/timeout controls
    retries = 3
    delay_seconds = 30
    # Try to initialise db, with a retry set up in case of database exceptions that happen when concurrent access occurs
    retry_function(retrieve_from_instructions.main,
                   retries,
                   delay_seconds,
                   sqlalchemy.exc.IntegrityError,
                   selected_polygon,
                   **base_data_parameters)


def wkt_to_gdf(wkt: str) -> gpd.GeoDataFrame:
"""
Transform a WKT string polygon into a GeoDataFrame.
Expand Down

0 comments on commit 59114df

Please sign in to comment.