
Commit

analyze tasks and only use preprocessing tasks intersecting with actual process_aoi; add MapcheteConfig.overview_levels and MapcheteConfig.processing_levels; clean up dead code
ungarj committed Dec 5, 2023
1 parent d78a532 commit 743fffe
Showing 13 changed files with 173 additions and 68 deletions.
71 changes: 41 additions & 30 deletions mapchete/commands/_execute.py
@@ -2,7 +2,7 @@
import logging
from contextlib import AbstractContextManager
from multiprocessing import cpu_count
from typing import Any, List, Optional, Tuple, Type, Union
from typing import List, Optional, Tuple, Type, Union

from rasterio.crs import CRS
from shapely.geometry.base import BaseGeometry
@@ -14,7 +14,11 @@
from mapchete.enums import Concurrency, ProcessingMode, Status
from mapchete.errors import JobCancelledError
from mapchete.executor import Executor
from mapchete.processing.profilers import preconfigured_profilers, pretty_bytes
from mapchete.executor.types import Profiler
from mapchete.pretty import pretty_bytes, pretty_seconds
from mapchete.processing.profilers import preconfigured_profilers
from mapchete.processing.profilers.time import measure_time
from mapchete.processing.tasks import TaskInfo
from mapchete.types import MPathLike, Progress

logger = logging.getLogger(__name__)
@@ -35,7 +39,6 @@ def execute(
concurrency: Concurrency = Concurrency.processes,
workers: int = None,
multiprocessing_start_method: str = None,
dask_client: Optional[Any] = None,
dask_settings: DaskSettings = DaskSettings(),
executor_getter: AbstractContextManager = Executor,
profiling: bool = False,
@@ -83,7 +86,6 @@
Reusable Client instance if required. Otherwise a new client will be created.
"""
try:
print_task_details = True
mode = "overwrite" if overwrite else mode
all_observers = Observers(observers)

@@ -132,21 +134,23 @@
)

all_observers.notify(status=Status.initializing)

# determine tasks
tasks = mp.tasks(zoom=zoom, tile=tile, mode=mode)
tasks = mp.tasks(zoom=zoom, tile=tile)

if len(tasks) == 0:
all_observers.notify(status=Status.done)
all_observers.notify(
status=Status.done, message="no tasks to process"
)
return

all_observers.notify(
message=f"processing {len(tasks)} tasks on {workers} worker(s)"
)

all_observers.notify(message="waiting for executor ...")
with executor_getter(
concurrency="dask"
if dask_settings.scheduler or dask_settings.client
else concurrency,
concurrency=concurrency,
dask_scheduler=dask_settings.scheduler,
dask_client=dask_settings.client,
multiprocessing_start_method=multiprocessing_start_method,
@@ -155,6 +159,10 @@
if profiling:
for profiler in preconfigured_profilers:
executor.add_profiler(profiler)
else:
executor.add_profiler(
Profiler(name="time", decorator=measure_time)
)
all_observers.notify(
status=Status.running,
progress=Progress(total=len(tasks)),
@@ -167,34 +175,19 @@
for task_info in mp.execute(
executor=executor,
tasks=tasks,
profiling=profiling,
dask_settings=dask_settings,
):
count += 1
if print_task_details:
msg = f"task {task_info.id}: {task_info.process_msg}"
if task_info.profiling: # pragma: no cover
max_allocated = task_info.profiling[
"memory"
].max_allocated
head_requests = task_info.profiling[
"requests"
].head_count
get_requests = task_info.profiling[
"requests"
].get_count
requests = head_requests + get_requests
transferred = task_info.profiling[
"requests"
].get_bytes
msg += f" (max memory usage: {pretty_bytes(max_allocated)}"
msg += f", {requests} GET and HEAD requests"
msg += f", {pretty_bytes(transferred)} transferred)"
all_observers.notify(message=msg)

msg = f"task {task_info.id}: {task_info.process_msg}"

if task_info.profiling:
msg += profiling_info(task_info)

all_observers.notify(
progress=Progress(total=len(tasks), current=count),
task_result=task_info,
message=msg,
)

all_observers.notify(status=Status.done)
@@ -217,3 +210,21 @@ def execute(
except Exception as exception:
all_observers.notify(status=Status.failed, exception=exception)
raise


def profiling_info(task_info: TaskInfo) -> str:
profiling_info = []
if task_info.profiling.get("time"):
elapsed = task_info.profiling["time"].elapsed
profiling_info.append(f"time: {pretty_seconds(elapsed)}")
if task_info.profiling.get("memory"): # pragma: no cover
max_allocated = task_info.profiling["memory"].max_allocated
profiling_info.append(f"max memory usage: {pretty_bytes(max_allocated)}")
if task_info.profiling.get("memory"): # pragma: no cover
head_requests = task_info.profiling["requests"].head_count
get_requests = task_info.profiling["requests"].get_count
requests = head_requests + get_requests
transferred = task_info.profiling["requests"].get_bytes
profiling_info.append(f"{requests} GET and HEAD requests")
profiling_info.append(f"{pretty_bytes(transferred)} transferred")
return f" ({', '.join(profiling_info)})"
18 changes: 17 additions & 1 deletion mapchete/config/base.py
@@ -474,7 +474,9 @@ def input(self):
@cached_property
def baselevels(self):
"""
Optional baselevels configuration.
Base levels are zoom levels which are processed but not generated by other zoom levels.
If base levels are not defined, all zoom levels will be processed.
baselevels:
min: <zoom>
@@ -516,6 +518,20 @@ def baselevels(self):
),
)

@cached_property
def overview_levels(self) -> Union[ZoomLevels, None]:
if self.baselevels:
return self.zoom_levels.difference(self.baselevels["zooms"])
else:
return None

@cached_property
def processing_levels(self) -> ZoomLevels:
if self.overview_levels:
return self.zoom_levels.difference(self.overview_levels)
else:
return self.zoom_levels

def get_process_func_params(self, zoom):
"""Return function kwargs."""
return self.process.filter_parameters(
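
For illustration, the relationship between the two new properties, sketched with plain sets standing in for ZoomLevels.difference() (the zoom values are made up):

# hypothetical pyramid with zooms 0-12 and baselevels 10-12
zoom_levels = set(range(0, 13))
baselevel_zooms = {10, 11, 12}

overview_levels = zoom_levels - baselevel_zooms    # {0, ..., 9}: generated from base levels
processing_levels = zoom_levels - overview_levels  # {10, 11, 12}: actually processed

# without baselevels, overview_levels is None and processing_levels equals zoom_levels
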
3 changes: 3 additions & 0 deletions mapchete/executor/__init__.py
@@ -16,6 +16,9 @@ class Executor:
"""

def __new__(cls, *args, concurrency=None, **kwargs) -> ExecutorBase:
if kwargs.get("dask_scheduler") or kwargs.get("dask_client"):
concurrency = "dask"

if concurrency == "dask":
return DaskExecutor(*args, **kwargs)

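
A short sketch of the dispatch change (the scheduler address is made up; executor selection for non-dask concurrency is assumed to work as before):

from mapchete.executor import Executor

# without any dask hint, the concurrency argument decides as before
executor = Executor(concurrency=None)
print(executor)  # e.g. <SequentialExecutor>

# any dask hint now wins over the concurrency argument, i.e. this would
# return a DaskExecutor:
# Executor(concurrency="processes", dask_scheduler="tcp://127.0.0.1:8786")
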
3 changes: 3 additions & 0 deletions mapchete/executor/concurrent_futures.py
@@ -73,6 +73,9 @@ def __init__(
)
super().__init__(*args, **kwargs)

def __str__(self) -> str:
return f"<ConcurrentFuturesExecutor max_workers={self.max_workers}, cls={self._executor_cls}>"

def as_completed(
self,
func,
6 changes: 6 additions & 0 deletions mapchete/executor/dask.py
@@ -1,6 +1,7 @@
import logging
import os
from functools import cached_property
from sys import getsizeof
from typing import Any, Callable, Iterable, Iterator, List, Optional, Union

from dask.delayed import Delayed, DelayedLeaf
@@ -10,6 +11,7 @@
from mapchete.executor.base import ExecutorBase
from mapchete.executor.future import MFuture
from mapchete.executor.types import Result
from mapchete.pretty import pretty_bytes
from mapchete.timer import Timer

logger = logging.getLogger(__name__)
@@ -46,6 +48,9 @@ def __init__(
self._submitted = 0
super().__init__(*args, **kwargs)

def __str__(self) -> str:
return f"<DaskExecutor dashboard_link={self._executor.dashboard_link}>"

def map(
self,
func: Callable,
@@ -206,6 +211,7 @@ def compute_task_graph(
raise_errors: bool = False,
) -> Iterator[MFuture]:
# send to scheduler
logger.debug("task graph has %s", pretty_bytes(getsizeof(dask_collection)))

with Timer() as t:
futures = self._executor.compute(
3 changes: 3 additions & 0 deletions mapchete/executor/sequential.py
@@ -15,6 +15,9 @@ def __init__(self, *args, **kwargs):
logger.debug("init SequentialExecutor")
super().__init__(*args, **kwargs)

def __str__(self) -> str:
return "<SequentialExecutor>"

def as_completed(
self, func, iterable, fargs=None, fkwargs=None, item_skip_bool=False, **kwargs
) -> Iterator[MFuture]:
31 changes: 31 additions & 0 deletions mapchete/pretty.py
@@ -0,0 +1,31 @@
def pretty_bytes(count: float, round_value: int = 2) -> str:
"""Return human readable bytes."""
for measurement in [
"bytes",
"KiB",
"MiB",
"GiB",
"TiB",
"PiB",
"EiB",
"ZiB",
"YiB",
]:
out = f"{round(count, round_value)} {measurement}"
if count < 1024.0:
break
count /= 1024.0

return out


def pretty_seconds(elapsed_seconds: float, round_value: int = 3):
"""Return human readable seconds."""
minutes, seconds = divmod(elapsed_seconds, 60)
hours, minutes = divmod(minutes, 60)
if hours:
return f"{int(hours)}h {int(minutes)}m {int(seconds)}s"
elif minutes:
return f"{int(minutes)}m {int(seconds)}s"
else:
return f"{round(seconds, round_value)}s"
37 changes: 29 additions & 8 deletions mapchete/processing/base.py
@@ -8,6 +8,8 @@
from typing import Any, Iterator, List, Optional, Tuple, Union

from cachetools import LRUCache
from shapely.geometry import Polygon, base
from shapely.ops import unary_union

from mapchete.config import DaskSettings, MapcheteConfig
from mapchete.enums import Concurrency, ProcessingMode
@@ -165,7 +167,6 @@ def tasks(
self,
zoom: Optional[ZoomLevelsLike] = None,
tile: Optional[TileLike] = None,
mode: Optional[ProcessingMode] = None,
profilers: Optional[List[Profiler]] = None,
) -> Tasks:
"""
@@ -178,12 +179,10 @@
tile=tile,
profilers=profilers,
),
mode=mode or self.config.mode,
)

def preprocessing_tasks(
self,
mode: Optional[ProcessingMode] = None,
profilers: Optional[List[Profiler]] = None,
) -> Tasks:
"""
@@ -194,7 +193,6 @@
self,
profilers=profilers,
),
mode=mode or self.config.mode,
)

def execute(
@@ -207,7 +205,6 @@
workers: int = os.cpu_count(),
propagate_results: bool = False,
dask_settings: DaskSettings = DaskSettings(),
profiling: bool = False,
remember_preprocessing_results: bool = False,
) -> Iterator[TaskInfo]:
"""
@@ -669,26 +666,50 @@ def _task_batches(
profilers = profilers or []
if tile:
tile = process.config.process_pyramid.tile(*tile)

# first, materialize tile task batches
tile_task_batches = _tile_task_batches(
process=process,
zoom=zoom,
tile=tile,
profilers=profilers,
)
# TODO: create processing AOI (i.e. processing area without overviews)
yield from _preprocessing_task_batches(process=process, profilers=profilers)
# create processing AOI (i.e. processing area without overviews)
if process.config.preprocessing_tasks().values():
zoom_aois = []
for zoom in process.config.processing_levels:
for batch in tile_task_batches:
if batch.id == f"zoom-{zoom}":
zoom_aois.append(batch.geometry)
process_aoi = unary_union(zoom_aois) if zoom_aois else Polygon()
else:
process_aoi = None

yield from _preprocessing_task_batches(
process=process, profilers=profilers, process_aoi=process_aoi
)

yield from tile_task_batches


def _preprocessing_task_batches(
process: Mapchete,
profilers: Optional[List[Profiler]] = None,
process_aoi: Optional[base.BaseGeometry] = None,
) -> Iterator[TaskBatch]:
with Timer() as duration:
tasks = []
for task in process.config.preprocessing_tasks().values():
if process_aoi and task.has_geometry():
if task.geometry.intersects(process_aoi):
tasks.append(task)
else:
tasks.append(task)

# preprocessing tasks
preprocessing_batch = TaskBatch(
id="preprocessing_tasks",
tasks=process.config.preprocessing_tasks().values(),
tasks=tasks,
profilers=profilers,
)
if len(preprocessing_batch):
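
The intersection filter above, sketched with shapely geometries only (the dict stands in for real preprocessing tasks, which carry their geometry via has_geometry()/geometry):

from shapely.geometry import Point, box

process_aoi = box(0, 0, 10, 10)
task_geometries = {"inside": Point(5, 5), "outside": Point(50, 50), "no_geometry": None}

# tasks without a geometry are always kept; tasks with one must intersect the process AOI
kept = [
    name
    for name, geometry in task_geometries.items()
    if geometry is None or geometry.intersects(process_aoi)
]
# kept == ["inside", "no_geometry"]
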
13 changes: 0 additions & 13 deletions mapchete/processing/profilers/__init__.py
@@ -10,16 +10,3 @@
Profiler(name="requests", decorator=measure_requests),
Profiler(name="memory", decorator=measure_memory),
]


def pretty_bytes(count: float, round_value: int = 2) -> str:
"""Return human readable bytes."""
count = float(count)

for measurement in ["bytes", "KiB", "MiB", "GiB", "TiB"]:
out = f"{round(count, round_value)} {measurement}"
if count < 1024.0:
break
count /= 1024.0

return out