diff --git a/mapchete/commands/_execute.py b/mapchete/commands/_execute.py index 76830f1b..9d2bf6e9 100644 --- a/mapchete/commands/_execute.py +++ b/mapchete/commands/_execute.py @@ -171,19 +171,18 @@ def execute( ) # TODO it would be nice to track the time it took sending tasks to the executor try: - count = 0 - for task_info in mp.execute( - executor=executor, - tasks=tasks, - dask_settings=dask_settings, + for count, task_info in enumerate( + mp.execute( + executor=executor, + tasks=tasks, + dask_settings=dask_settings, + ), + 1, ): - count += 1 - all_observers.notify( progress=Progress(total=len(tasks), current=count), task_info=task_info, ) - all_observers.notify(status=Status.done) return diff --git a/mapchete/executor/future.py b/mapchete/executor/future.py index 5440caa0..351c82c3 100644 --- a/mapchete/executor/future.py +++ b/mapchete/executor/future.py @@ -152,9 +152,16 @@ def exception(self, **kwargs) -> Union[Exception, None]: return self._exception - def cancelled(self) -> bool: # pragma: no cover + def cancelled(self) -> bool: """Sequential futures cannot be cancelled.""" - return self._cancelled + return self._cancelled or self.status == "cancelled" + + def failed(self) -> bool: + return ( + self.status == "error" + # concurrent.futures futures + or self.exception(timeout=mapchete_options.future_timeout) is not None + ) def _populate_from_future( self, timeout: int = mapchete_options.future_timeout, **kwargs @@ -184,21 +191,6 @@ def _set_result(self, result: Any) -> None: else: self._result = result - def failed_or_cancelled(self) -> bool: - """ - Return whether future is failed or cancelled. - - This is a workaround between the slightly different APIs of dask and concurrent.futures. - It also tries to avoid potentially expensive calls to the dask scheduler. - """ - if self.cancelled(): # pragma: no cover - return True - elif self.status: - return self.status in ["error", "cancelled"] - # concurrent.futures futures - else: - return self.exception(timeout=mapchete_options.future_timeout) is not None - def raise_if_failed(self) -> None: """ Checks whether future contains an exception and raises it as MapcheteTaskFailed. @@ -209,7 +201,16 @@ def raise_if_failed(self) -> None: # Let's directly re-raise these to be more transparent. keep_exceptions = (CancelledError, TimeoutError, CommClosedError) - if self.failed_or_cancelled(): + if self.cancelled(): # pragma: no cover + try: + raise self.exception(timeout=mapchete_options.future_timeout) + except Exception as exc: # pragma: no cover + raise CancelledError( + f"{self.name} got cancelled (status: {self.status}) but original " + f"exception could not be recovered due to {exc}" + ) + + elif self.failed(): try: exception = self.exception(timeout=mapchete_options.future_timeout) except Exception: # pragma: no cover @@ -218,7 +219,8 @@ def raise_if_failed(self) -> None: # sometimes, exceptions are simply empty or cannot be retreived if exception is None: # pragma: no cover raise MapcheteTaskFailed( - f"{self.name} failed (status: {self.status}), but exception could not be recovered" + f"{self.name} failed (status: {self.status}), but exception could " + "not be recovered" ) # keep some exceptions as they are diff --git a/mapchete/settings.py b/mapchete/settings.py index 6b7af2f2..16d272fa 100644 --- a/mapchete/settings.py +++ b/mapchete/settings.py @@ -3,7 +3,9 @@ """ from typing import Tuple, Type +from aiohttp.client_exceptions import ServerDisconnectedError from fiona.errors import FionaError +from fsspec.exceptions import FSTimeoutError from pydantic_settings import BaseSettings, SettingsConfigDict from rasterio.errors import RasterioError @@ -59,6 +61,8 @@ class IORetrySettings(BaseSettings): TimeoutError, RasterioError, FionaError, + FSTimeoutError, + ServerDisconnectedError, ) # read from environment model_config = SettingsConfigDict(env_prefix="MAPCHETE_IO_RETRY_")