From 4f1344328ad3974281814fdf5997fa564fdfc30c Mon Sep 17 00:00:00 2001 From: Joachim Ungar Date: Mon, 15 Jan 2024 14:11:04 +0100 Subject: [PATCH 1/5] keep CancelledError from dask instead of wrapping it within a MapcheteTaskFailed --- mapchete/commands/_execute.py | 15 ++++++------- mapchete/executor/future.py | 40 ++++++++++++++++++----------------- 2 files changed, 27 insertions(+), 28 deletions(-) diff --git a/mapchete/commands/_execute.py b/mapchete/commands/_execute.py index 76830f1b..9ce82735 100644 --- a/mapchete/commands/_execute.py +++ b/mapchete/commands/_execute.py @@ -171,21 +171,18 @@ def execute( ) # TODO it would be nice to track the time it took sending tasks to the executor try: - count = 0 - for task_info in mp.execute( - executor=executor, - tasks=tasks, - dask_settings=dask_settings, + for count, task_info in enumerate( + mp.execute( + executor=executor, + tasks=tasks, + dask_settings=dask_settings, + ) ): - count += 1 - all_observers.notify( progress=Progress(total=len(tasks), current=count), task_info=task_info, ) - all_observers.notify(status=Status.done) - return except cancel_on_exception: # special exception indicating job was cancelled from the outside diff --git a/mapchete/executor/future.py b/mapchete/executor/future.py index 5440caa0..8891fbf1 100644 --- a/mapchete/executor/future.py +++ b/mapchete/executor/future.py @@ -152,9 +152,16 @@ def exception(self, **kwargs) -> Union[Exception, None]: return self._exception - def cancelled(self) -> bool: # pragma: no cover + def cancelled(self) -> bool: """Sequential futures cannot be cancelled.""" - return self._cancelled + return self._cancelled or self.status == "cancelled" + + def failed(self) -> bool: + return ( + self.status == "error" + # concurrent.futures futures + or self.exception(timeout=mapchete_options.future_timeout) is not None + ) def _populate_from_future( self, timeout: int = mapchete_options.future_timeout, **kwargs @@ -184,21 +191,6 @@ def _set_result(self, result: Any) -> None: else: self._result = result - def failed_or_cancelled(self) -> bool: - """ - Return whether future is failed or cancelled. - - This is a workaround between the slightly different APIs of dask and concurrent.futures. - It also tries to avoid potentially expensive calls to the dask scheduler. - """ - if self.cancelled(): # pragma: no cover - return True - elif self.status: - return self.status in ["error", "cancelled"] - # concurrent.futures futures - else: - return self.exception(timeout=mapchete_options.future_timeout) is not None - def raise_if_failed(self) -> None: """ Checks whether future contains an exception and raises it as MapcheteTaskFailed. @@ -209,7 +201,16 @@ def raise_if_failed(self) -> None: # Let's directly re-raise these to be more transparent. keep_exceptions = (CancelledError, TimeoutError, CommClosedError) - if self.failed_or_cancelled(): + if self.cancelled(): + try: + raise self.exception(timeout=mapchete_options.future_timeout) + except Exception as exc: # pragma: no cover + raise CancelledError( + f"{self.name} got cancelled (status: {self.status}) but original " + f"exception could not be recovered due to {exc}" + ) + + elif self.failed(): try: exception = self.exception(timeout=mapchete_options.future_timeout) except Exception: # pragma: no cover @@ -218,7 +219,8 @@ def raise_if_failed(self) -> None: # sometimes, exceptions are simply empty or cannot be retreived if exception is None: # pragma: no cover raise MapcheteTaskFailed( - f"{self.name} failed (status: {self.status}), but exception could not be recovered" + f"{self.name} failed (status: {self.status}), but exception could " + "not be recovered" ) # keep some exceptions as they are From b85f5c9e3e8d069b7f2f0ffdbe28042a1ab13db6 Mon Sep 17 00:00:00 2001 From: Joachim Ungar Date: Mon, 15 Jan 2024 14:11:51 +0100 Subject: [PATCH 2/5] add ServerDisconnectedError and FSTimeoutError to retryable exceptions --- mapchete/settings.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/mapchete/settings.py b/mapchete/settings.py index 6b7af2f2..16d272fa 100644 --- a/mapchete/settings.py +++ b/mapchete/settings.py @@ -3,7 +3,9 @@ """ from typing import Tuple, Type +from aiohttp.client_exceptions import ServerDisconnectedError from fiona.errors import FionaError +from fsspec.exceptions import FSTimeoutError from pydantic_settings import BaseSettings, SettingsConfigDict from rasterio.errors import RasterioError @@ -59,6 +61,8 @@ class IORetrySettings(BaseSettings): TimeoutError, RasterioError, FionaError, + FSTimeoutError, + ServerDisconnectedError, ) # read from environment model_config = SettingsConfigDict(env_prefix="MAPCHETE_IO_RETRY_") From 8a98a2fdee9637b2964beb23e14afdd3dcba6161 Mon Sep 17 00:00:00 2001 From: Joachim Ungar Date: Mon, 15 Jan 2024 15:18:10 +0100 Subject: [PATCH 3/5] add return again --- mapchete/commands/_execute.py | 1 + 1 file changed, 1 insertion(+) diff --git a/mapchete/commands/_execute.py b/mapchete/commands/_execute.py index 9ce82735..948b1150 100644 --- a/mapchete/commands/_execute.py +++ b/mapchete/commands/_execute.py @@ -183,6 +183,7 @@ def execute( task_info=task_info, ) all_observers.notify(status=Status.done) + return except cancel_on_exception: # special exception indicating job was cancelled from the outside From c86035261b5bad9797498a4fdc789ae47aa4c1c5 Mon Sep 17 00:00:00 2001 From: Joachim Ungar Date: Mon, 15 Jan 2024 15:30:42 +0100 Subject: [PATCH 4/5] fix task count --- mapchete/commands/_execute.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/mapchete/commands/_execute.py b/mapchete/commands/_execute.py index 948b1150..9d2bf6e9 100644 --- a/mapchete/commands/_execute.py +++ b/mapchete/commands/_execute.py @@ -176,7 +176,8 @@ def execute( executor=executor, tasks=tasks, dask_settings=dask_settings, - ) + ), + 1, ): all_observers.notify( progress=Progress(total=len(tasks), current=count), From a9aa9e570f7a101708dd73a44c218a4fb78a3ffb Mon Sep 17 00:00:00 2001 From: Joachim Ungar Date: Mon, 15 Jan 2024 15:53:50 +0100 Subject: [PATCH 5/5] exclude from test coverage --- mapchete/executor/future.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/mapchete/executor/future.py b/mapchete/executor/future.py index 8891fbf1..351c82c3 100644 --- a/mapchete/executor/future.py +++ b/mapchete/executor/future.py @@ -201,7 +201,7 @@ def raise_if_failed(self) -> None: # Let's directly re-raise these to be more transparent. keep_exceptions = (CancelledError, TimeoutError, CommClosedError) - if self.cancelled(): + if self.cancelled(): # pragma: no cover try: raise self.exception(timeout=mapchete_options.future_timeout) except Exception as exc: # pragma: no cover