Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Keep CancelledError #615

Merged
merged 7 commits into from
Jan 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 7 additions & 8 deletions mapchete/commands/_execute.py
Original file line number Diff line number Diff line change
Expand Up @@ -171,19 +171,18 @@ def execute(
)
# TODO it would be nice to track the time it took sending tasks to the executor
try:
count = 0
for task_info in mp.execute(
executor=executor,
tasks=tasks,
dask_settings=dask_settings,
for count, task_info in enumerate(
mp.execute(
executor=executor,
tasks=tasks,
dask_settings=dask_settings,
),
1,
):
count += 1

all_observers.notify(
progress=Progress(total=len(tasks), current=count),
task_info=task_info,
)

all_observers.notify(status=Status.done)
return

Expand Down
40 changes: 21 additions & 19 deletions mapchete/executor/future.py
Original file line number Diff line number Diff line change
Expand Up @@ -152,9 +152,16 @@ def exception(self, **kwargs) -> Union[Exception, None]:

return self._exception

def cancelled(self) -> bool: # pragma: no cover
def cancelled(self) -> bool:
"""Sequential futures cannot be cancelled."""
return self._cancelled
return self._cancelled or self.status == "cancelled"

def failed(self) -> bool:
return (
self.status == "error"
# concurrent.futures futures
or self.exception(timeout=mapchete_options.future_timeout) is not None
)

def _populate_from_future(
self, timeout: int = mapchete_options.future_timeout, **kwargs
Expand Down Expand Up @@ -184,21 +191,6 @@ def _set_result(self, result: Any) -> None:
else:
self._result = result

def failed_or_cancelled(self) -> bool:
"""
Return whether future is failed or cancelled.

This is a workaround between the slightly different APIs of dask and concurrent.futures.
It also tries to avoid potentially expensive calls to the dask scheduler.
"""
if self.cancelled(): # pragma: no cover
return True
elif self.status:
return self.status in ["error", "cancelled"]
# concurrent.futures futures
else:
return self.exception(timeout=mapchete_options.future_timeout) is not None

def raise_if_failed(self) -> None:
"""
Checks whether future contains an exception and raises it as MapcheteTaskFailed.
Expand All @@ -209,7 +201,16 @@ def raise_if_failed(self) -> None:
# Let's directly re-raise these to be more transparent.
keep_exceptions = (CancelledError, TimeoutError, CommClosedError)

if self.failed_or_cancelled():
if self.cancelled(): # pragma: no cover
try:
raise self.exception(timeout=mapchete_options.future_timeout)
except Exception as exc: # pragma: no cover
raise CancelledError(
f"{self.name} got cancelled (status: {self.status}) but original "
f"exception could not be recovered due to {exc}"
)

elif self.failed():
try:
exception = self.exception(timeout=mapchete_options.future_timeout)
except Exception: # pragma: no cover
Expand All @@ -218,7 +219,8 @@ def raise_if_failed(self) -> None:
# sometimes, exceptions are simply empty or cannot be retrieved
if exception is None: # pragma: no cover
raise MapcheteTaskFailed(
f"{self.name} failed (status: {self.status}), but exception could not be recovered"
f"{self.name} failed (status: {self.status}), but exception could "
"not be recovered"
)

# keep some exceptions as they are
Expand Down
4 changes: 4 additions & 0 deletions mapchete/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@
"""
from typing import Tuple, Type

from aiohttp.client_exceptions import ServerDisconnectedError
from fiona.errors import FionaError
from fsspec.exceptions import FSTimeoutError
from pydantic_settings import BaseSettings, SettingsConfigDict
from rasterio.errors import RasterioError

Expand Down Expand Up @@ -59,6 +61,8 @@ class IORetrySettings(BaseSettings):
TimeoutError,
RasterioError,
FionaError,
FSTimeoutError,
ServerDisconnectedError,
)
# read from environment
model_config = SettingsConfigDict(env_prefix="MAPCHETE_IO_RETRY_")
Expand Down
Loading