From 743fffe766e66e4e2c5f1c2e4d516cdbd1f92ac8 Mon Sep 17 00:00:00 2001
From: Joachim Ungar
Date: Tue, 5 Dec 2023 09:31:16 +0100
Subject: [PATCH] analyze tasks and only use preprocessing tasks intersecting
 with actual process_aoi; add MapcheteConfig.overview_levels and
 MapcheteConfig.processing_levels; clean up dead code

---
 mapchete/commands/_execute.py             | 71 +++++++++++++----------
 mapchete/config/base.py                   | 18 +++++-
 mapchete/executor/__init__.py             |  3 +
 mapchete/executor/concurrent_futures.py   |  3 +
 mapchete/executor/dask.py                 |  6 ++
 mapchete/executor/sequential.py           |  3 +
 mapchete/pretty.py                        | 31 ++++++++++
 mapchete/processing/base.py               | 37 +++++++++---
 mapchete/processing/profilers/__init__.py | 13 -----
 mapchete/processing/tasks.py              | 32 ++++++++--
 mapchete/timer.py                         | 11 +---
 mapchete/types.py                         |  7 +++
 test/test_commands.py                     |  6 +-
 13 files changed, 173 insertions(+), 68 deletions(-)
 create mode 100644 mapchete/pretty.py

diff --git a/mapchete/commands/_execute.py b/mapchete/commands/_execute.py
index 572f0ddc..323271f2 100644
--- a/mapchete/commands/_execute.py
+++ b/mapchete/commands/_execute.py
@@ -2,7 +2,7 @@
 import logging
 from contextlib import AbstractContextManager
 from multiprocessing import cpu_count
-from typing import Any, List, Optional, Tuple, Type, Union
+from typing import List, Optional, Tuple, Type, Union
 
 from rasterio.crs import CRS
 from shapely.geometry.base import BaseGeometry
@@ -14,7 +14,11 @@
 from mapchete.enums import Concurrency, ProcessingMode, Status
 from mapchete.errors import JobCancelledError
 from mapchete.executor import Executor
-from mapchete.processing.profilers import preconfigured_profilers, pretty_bytes
+from mapchete.executor.types import Profiler
+from mapchete.pretty import pretty_bytes, pretty_seconds
+from mapchete.processing.profilers import preconfigured_profilers
+from mapchete.processing.profilers.time import measure_time
+from mapchete.processing.tasks import TaskInfo
 from mapchete.types import MPathLike, Progress
 
 logger = logging.getLogger(__name__)
@@ -35,7 +39,6 @@ def execute(
     concurrency: Concurrency = Concurrency.processes,
     workers: int = None,
     multiprocessing_start_method: str = None,
-    dask_client: Optional[Any] = None,
    dask_settings: DaskSettings = DaskSettings(),
     executor_getter: AbstractContextManager = Executor,
     profiling: bool = False,
@@ -83,7 +86,6 @@ def execute(
         Reusable Client instance if required. Otherwise a new client will be created.
""" try: - print_task_details = True mode = "overwrite" if overwrite else mode all_observers = Observers(observers) @@ -132,21 +134,23 @@ def execute( ) all_observers.notify(status=Status.initializing) + # determine tasks - tasks = mp.tasks(zoom=zoom, tile=tile, mode=mode) + tasks = mp.tasks(zoom=zoom, tile=tile) if len(tasks) == 0: - all_observers.notify(status=Status.done) + all_observers.notify( + status=Status.done, message="no tasks to process" + ) return + all_observers.notify( message=f"processing {len(tasks)} tasks on {workers} worker(s)" ) all_observers.notify(message="waiting for executor ...") with executor_getter( - concurrency="dask" - if dask_settings.scheduler or dask_settings.client - else concurrency, + concurrency=concurrency, dask_scheduler=dask_settings.scheduler, dask_client=dask_settings.client, multiprocessing_start_method=multiprocessing_start_method, @@ -155,6 +159,10 @@ def execute( if profiling: for profiler in preconfigured_profilers: executor.add_profiler(profiler) + else: + executor.add_profiler( + Profiler(name="time", decorator=measure_time) + ) all_observers.notify( status=Status.running, progress=Progress(total=len(tasks)), @@ -167,34 +175,19 @@ def execute( for task_info in mp.execute( executor=executor, tasks=tasks, - profiling=profiling, dask_settings=dask_settings, ): count += 1 - if print_task_details: - msg = f"task {task_info.id}: {task_info.process_msg}" - if task_info.profiling: # pragma: no cover - max_allocated = task_info.profiling[ - "memory" - ].max_allocated - head_requests = task_info.profiling[ - "requests" - ].head_count - get_requests = task_info.profiling[ - "requests" - ].get_count - requests = head_requests + get_requests - transferred = task_info.profiling[ - "requests" - ].get_bytes - msg += f" (max memory usage: {pretty_bytes(max_allocated)}" - msg += f", {requests} GET and HEAD requests" - msg += f", {pretty_bytes(transferred)} transferred)" - all_observers.notify(message=msg) + + msg = f"task {task_info.id}: {task_info.process_msg}" + + if task_info.profiling: + msg += profiling_info(task_info) all_observers.notify( progress=Progress(total=len(tasks), current=count), task_result=task_info, + message=msg, ) all_observers.notify(status=Status.done) @@ -217,3 +210,21 @@ def execute( except Exception as exception: all_observers.notify(status=Status.failed, exception=exception) raise + + +def profiling_info(task_info: TaskInfo) -> str: + profiling_info = [] + if task_info.profiling.get("time"): + elapsed = task_info.profiling["time"].elapsed + profiling_info.append(f"time: {pretty_seconds(elapsed)}") + if task_info.profiling.get("memory"): # pragma: no cover + max_allocated = task_info.profiling["memory"].max_allocated + profiling_info.append(f"max memory usage: {pretty_bytes(max_allocated)}") + if task_info.profiling.get("memory"): # pragma: no cover + head_requests = task_info.profiling["requests"].head_count + get_requests = task_info.profiling["requests"].get_count + requests = head_requests + get_requests + transferred = task_info.profiling["requests"].get_bytes + profiling_info.append(f"{requests} GET and HEAD requests") + profiling_info.append(f"{pretty_bytes(transferred)} transferred") + return f" ({', '.join(profiling_info)})" diff --git a/mapchete/config/base.py b/mapchete/config/base.py index 9fed322e..5883309e 100644 --- a/mapchete/config/base.py +++ b/mapchete/config/base.py @@ -474,7 +474,9 @@ def input(self): @cached_property def baselevels(self): """ - Optional baselevels configuration. 
+        Base levels are zoom levels which are processed but not generated from other zoom levels.
+
+        If base levels are not defined, all zoom levels will be processed.
 
         baselevels:
             min: <zoom level>
             max: <zoom level>
             lower: <resampling method>
             higher: <resampling method>
@@ -516,6 +518,20 @@ def baselevels(self):
             ),
         )
 
+    @cached_property
+    def overview_levels(self) -> Union[ZoomLevels, None]:
+        if self.baselevels:
+            return self.zoom_levels.difference(self.baselevels["zooms"])
+        else:
+            return None
+
+    @cached_property
+    def processing_levels(self) -> ZoomLevels:
+        if self.overview_levels:
+            return self.zoom_levels.difference(self.overview_levels)
+        else:
+            return self.zoom_levels
+
     def get_process_func_params(self, zoom):
         """Return function kwargs."""
         return self.process.filter_parameters(
diff --git a/mapchete/executor/__init__.py b/mapchete/executor/__init__.py
index 3e1f9b33..b1abdf97 100644
--- a/mapchete/executor/__init__.py
+++ b/mapchete/executor/__init__.py
@@ -16,6 +16,9 @@ class Executor:
     """
 
     def __new__(cls, *args, concurrency=None, **kwargs) -> ExecutorBase:
+        if kwargs.get("dask_scheduler") or kwargs.get("dask_client"):
+            concurrency = "dask"
+
         if concurrency == "dask":
             return DaskExecutor(*args, **kwargs)
 
diff --git a/mapchete/executor/concurrent_futures.py b/mapchete/executor/concurrent_futures.py
index ba53390b..e83e6da9 100644
--- a/mapchete/executor/concurrent_futures.py
+++ b/mapchete/executor/concurrent_futures.py
@@ -73,6 +73,9 @@ def __init__(
         )
         super().__init__(*args, **kwargs)
 
+    def __str__(self) -> str:
+        return f"<ConcurrentFuturesExecutor max_workers={self.max_workers}>"
+
     def as_completed(
         self,
         func,
diff --git a/mapchete/executor/dask.py b/mapchete/executor/dask.py
index cfc53950..9fa5c0e5 100644
--- a/mapchete/executor/dask.py
+++ b/mapchete/executor/dask.py
@@ -1,6 +1,7 @@
 import logging
 import os
 from functools import cached_property
+from sys import getsizeof
 from typing import Any, Callable, Iterable, Iterator, List, Optional, Union
 
 from dask.delayed import Delayed, DelayedLeaf
@@ -10,6 +11,7 @@
 from mapchete.executor.base import ExecutorBase
 from mapchete.executor.future import MFuture
 from mapchete.executor.types import Result
+from mapchete.pretty import pretty_bytes
 from mapchete.timer import Timer
 
 logger = logging.getLogger(__name__)
@@ -46,6 +48,9 @@ def __init__(
         self._submitted = 0
         super().__init__(*args, **kwargs)
 
+    def __str__(self) -> str:
+        return "<DaskExecutor>"
+
     def map(
         self,
         func: Callable,
@@ -206,6 +211,7 @@ def compute_task_graph(
         raise_errors: bool = False,
     ) -> Iterator[MFuture]:
         # send to scheduler
+        logger.debug("task graph has %s", pretty_bytes(getsizeof(dask_collection)))
         with Timer() as t:
             futures = self._executor.compute(
diff --git a/mapchete/executor/sequential.py b/mapchete/executor/sequential.py
index 281a6827..d5ee08dc 100644
--- a/mapchete/executor/sequential.py
+++ b/mapchete/executor/sequential.py
@@ -15,6 +15,9 @@ def __init__(self, *args, **kwargs):
         logger.debug("init SequentialExecutor")
         super().__init__(*args, **kwargs)
 
+    def __str__(self) -> str:
+        return "<SequentialExecutor>"
+
     def as_completed(
         self, func, iterable, fargs=None, fkwargs=None, item_skip_bool=False, **kwargs
     ) -> Iterator[MFuture]:
diff --git a/mapchete/pretty.py b/mapchete/pretty.py
new file mode 100644
index 00000000..a381299d
--- /dev/null
+++ b/mapchete/pretty.py
@@ -0,0 +1,31 @@
+def pretty_bytes(count: float, round_value: int = 2) -> str:
+    """Return human readable bytes."""
+    for measurement in [
+        "bytes",
+        "KiB",
+        "MiB",
+        "GiB",
+        "TiB",
+        "PiB",
+        "EiB",
+        "ZiB",
+        "YiB",
+    ]:
+        out = f"{round(count, round_value)} {measurement}"
+        if count < 1024.0:
+            break
+        count /= 1024.0
+
+    return out
+
+
+def pretty_seconds(elapsed_seconds: float, round_value: int = 3) -> str:
+    """Return human readable seconds."""
+    minutes, seconds = divmod(elapsed_seconds, 60)
+    hours, minutes = divmod(minutes, 60)
+    if hours:
+        return f"{int(hours)}h {int(minutes)}m {int(seconds)}s"
+    elif minutes:
+        return f"{int(minutes)}m {int(seconds)}s"
+    else:
+        return f"{round(seconds, round_value)}s"
diff --git a/mapchete/processing/base.py b/mapchete/processing/base.py
index a710be7f..2271641d 100644
--- a/mapchete/processing/base.py
+++ b/mapchete/processing/base.py
@@ -8,6 +8,8 @@
 from typing import Any, Iterator, List, Optional, Tuple, Union
 
 from cachetools import LRUCache
+from shapely.geometry import Polygon, base
+from shapely.ops import unary_union
 
 from mapchete.config import DaskSettings, MapcheteConfig
 from mapchete.enums import Concurrency, ProcessingMode
@@ -165,7 +167,6 @@ def tasks(
         self,
         zoom: Optional[ZoomLevelsLike] = None,
         tile: Optional[TileLike] = None,
-        mode: Optional[ProcessingMode] = None,
         profilers: Optional[List[Profiler]] = None,
     ) -> Tasks:
         """
@@ -178,12 +179,10 @@ def tasks(
                 tile=tile,
                 profilers=profilers,
             ),
-            mode=mode or self.config.mode,
         )
 
     def preprocessing_tasks(
         self,
-        mode: Optional[ProcessingMode] = None,
         profilers: Optional[List[Profiler]] = None,
     ) -> Tasks:
         """
@@ -194,7 +193,6 @@ def preprocessing_tasks(
                 self,
                 profilers=profilers,
             ),
-            mode=mode or self.config.mode,
         )
 
     def execute(
@@ -207,7 +205,6 @@
         workers: int = os.cpu_count(),
         propagate_results: bool = False,
         dask_settings: DaskSettings = DaskSettings(),
-        profiling: bool = False,
         remember_preprocessing_results: bool = False,
     ) -> Iterator[TaskInfo]:
         """
@@ -669,26 +666,50 @@
     profilers = profilers or []
     if tile:
         tile = process.config.process_pyramid.tile(*tile)
+
+    # first, materialize tile task batches
     tile_task_batches = _tile_task_batches(
         process=process,
         zoom=zoom,
         tile=tile,
         profilers=profilers,
     )
-    # TODO: create processing AOI (i.e. processing area without overviews)
-    yield from _preprocessing_task_batches(process=process, profilers=profilers)
+
+    # create processing AOI (i.e. processing area without overviews)
+    if process.config.preprocessing_tasks().values():
+        zoom_aois = []
+        for zoom in process.config.processing_levels:
+            for batch in tile_task_batches:
+                if batch.id == f"zoom-{zoom}":
+                    zoom_aois.append(batch.geometry)
+        process_aoi = unary_union(zoom_aois) if zoom_aois else Polygon()
+    else:
+        process_aoi = None
+
+    yield from _preprocessing_task_batches(
+        process=process, profilers=profilers, process_aoi=process_aoi
+    )
+
     yield from tile_task_batches
 
 
 def _preprocessing_task_batches(
     process: Mapchete,
     profilers: Optional[List[Profiler]] = None,
+    process_aoi: Optional[base.BaseGeometry] = None,
 ) -> Iterator[TaskBatch]:
     with Timer() as duration:
+        tasks = []
+        for task in process.config.preprocessing_tasks().values():
+            if process_aoi and task.has_geometry():
+                if task.geometry.intersects(process_aoi):
+                    tasks.append(task)
+            else:
+                tasks.append(task)
+
         # preprocessing tasks
         preprocessing_batch = TaskBatch(
             id="preprocessing_tasks",
-            tasks=process.config.preprocessing_tasks().values(),
+            tasks=tasks,
             profilers=profilers,
         )
         if len(preprocessing_batch):
diff --git a/mapchete/processing/profilers/__init__.py b/mapchete/processing/profilers/__init__.py
index c5f0cec7..a9bc68e4 100644
--- a/mapchete/processing/profilers/__init__.py
+++ b/mapchete/processing/profilers/__init__.py
@@ -10,16 +10,3 @@
     Profiler(name="requests", decorator=measure_requests),
     Profiler(name="memory", decorator=measure_memory),
 ]
-
-
-def pretty_bytes(count: float, round_value: int = 2) -> str:
-    """Return human readable bytes."""
-    count = float(count)
-
-    for measurement in ["bytes", "KiB", "MiB", "GiB", "TiB"]:
-        out = f"{round(count, round_value)} {measurement}"
-        if count < 1024.0:
-            break
-        count /= 1024.0
-
-    return out
diff --git a/mapchete/processing/tasks.py b/mapchete/processing/tasks.py
index 7972d8b5..7e4e0905 100644
--- a/mapchete/processing/tasks.py
+++ b/mapchete/processing/tasks.py
@@ -1,22 +1,23 @@
 import logging
 from abc import ABC
 from enum import Enum
+from functools import cached_property
 from typing import Any, Callable, Dict, Iterator, List, Optional, Tuple, Union
 from uuid import uuid4
 
 import numpy.ma as ma
 from dask.delayed import Delayed, DelayedLeaf, delayed
-from shapely.geometry import base, box, mapping, shape
+from shapely.geometry import Polygon, base, box, mapping, shape
+from shapely.ops import unary_union
 
 from mapchete.config import MapcheteConfig
 from mapchete.config.process_func import ProcessFunc
-from mapchete.enums import ProcessingMode
 from mapchete.errors import (
     MapcheteNodataTile,
     MapcheteProcessOutputError,
     NoTaskGeometry,
 )
-from mapchete.executor.base import Profiler, func_partial
+from mapchete.executor.base import Profiler
 from mapchete.io import raster
 from mapchete.io._geometry_operations import to_shape
 from mapchete.io.vector import IndexedFeatures
@@ -139,6 +140,17 @@ def __init__(
         self.fkwargs = fkwargs or {}
         self.profilers = profilers or []
 
+    @cached_property
+    def geometry(self) -> base.BaseGeometry:
+        if self.tasks:
+            return unary_union([shape(task) for task in self.tasks])
+        else:
+            return Polygon()
+
+    @property
+    def __geo_interface__(self) -> dict:
+        return mapping(self.geometry)
+
     def __repr__(self):  # pragma: no cover
         return f"TaskBatch(id={self.id}, bounds={self.bounds}, tasks={len(self.tasks)})"
 
@@ -438,6 +450,13 @@ def __init__(
         self.fkwargs = fkwargs or {}
         self.profilers = profilers or []
 
+    @cached_property
+    def geometry(self) -> base.BaseGeometry:
+        if self.tasks:
+            return unary_union([task.bbox for task in self.tasks])
+        else:
+            return Polygon()
+
     def __repr__(self):  # pragma: no cover
         return f"TileTaskBatch(id={self.id}, bounds={self.bounds}, tasks={len(self.tasks)})"
 
@@ -507,7 +526,6 @@ class Tasks:
     def __init__(
         self,
         task_batches_generator: Iterator[Union[TaskBatch, TileTaskBatch]],
-        mode: ProcessingMode = ProcessingMode.CONTINUE,
     ):
         self._task_batches_generator = task_batches_generator
 
@@ -577,12 +595,15 @@ def to_dask_collection(
     previous_batch = None
     for batch in batches:
         logger.debug("converting batch %s", batch)
+
         if batch.id == "preprocessing_tasks":
             task_func = preprocessing_task_wrapper or batch.func
         else:
             task_func = tile_task_wrapper or batch.func
+
         if previous_batch:
             logger.debug("previous batch had %s tasks", len(previous_batch))
+
         for task in batch.values():
             if previous_batch:
                 dependencies = {
@@ -596,6 +617,7 @@
                 )
             else:
                 dependencies = {}
+
             tasks[task] = delayed(
                 task_func,
                 pure=True,
@@ -607,5 +629,7 @@
                 **batch.fkwargs,
                 dask_key_name=f"{task.result_key_name}",
             )
+
         previous_batch = batch
+
     return list(tasks.values())
diff --git a/mapchete/timer.py b/mapchete/timer.py
index 57d257e4..51630897 100644
--- a/mapchete/timer.py
+++ b/mapchete/timer.py
@@ -1,5 +1,7 @@
 import time
 
+from mapchete.pretty import pretty_seconds
+
 
 class Timer:
     """
@@ -60,14 +62,7 @@ def __repr__(self):
         )
 
     def __str__(self):
-        minutes, seconds = divmod(self.elapsed, 60)
-        hours, minutes = divmod(minutes, 60)
-        if hours:
-            return "%sh %sm %ss" % (int(hours), int(minutes), int(seconds))
-        elif minutes:
-            return "%sm %ss" % (int(minutes), int(seconds))
-        else:
-            return "%ss" % round(seconds, self._str_round)
+        return pretty_seconds(self.elapsed, self._str_round)
 
     @property
     def elapsed(self):
diff --git a/mapchete/types.py b/mapchete/types.py
index 803194dc..c3167ba8 100644
--- a/mapchete/types.py
+++ b/mapchete/types.py
@@ -292,6 +292,13 @@ def intersection(self, other: ZoomLevelsLike) -> ZoomLevels:
             raise ValueError("ZoomLevels do not intersect")
         return ZoomLevels(min(intersection), max(intersection))
 
+    def difference(self, other: ZoomLevelsLike) -> ZoomLevels:
+        other = other if isinstance(other, ZoomLevels) else ZoomLevels(other)
+        difference = set(self).difference(set(other))
+        if len(difference) == 0:
+            raise ValueError("ZoomLevels do not differ")
+        return ZoomLevels(min(difference), max(difference))
+
     def intersects(self, other: ZoomLevelsLike) -> bool:
         other = other if isinstance(other, ZoomLevels) else ZoomLevels(other)
         try:
diff --git a/test/test_commands.py b/test/test_commands.py
index 41312722..39525f4e 100644
--- a/test/test_commands.py
+++ b/test/test_commands.py
@@ -264,7 +264,7 @@ def test_execute_point(mp_tmpdir, example_mapchete, dummy2_tif):
     ],
 )
 def test_execute_preprocessing_tasks(
-    concurrency, preprocess_cache_raster_vector, dask_executor, process_graph
+    concurrency, preprocess_cache_raster_vector, process_graph
 ):
     execute_kwargs = dict(concurrency=concurrency)
     if concurrency == "dask":
@@ -287,9 +287,7 @@ def test_execute_preprocessing_tasks(
         (None, False),
     ],
 )
-def test_execute_profiling(
-    cleantopo_br_metatiling_1, concurrency, process_graph, dask_executor
-):
+def test_execute_profiling(cleantopo_br_metatiling_1, concurrency, process_graph):
     execute_kwargs = dict(concurrency=concurrency)
     if concurrency == "dask":
         execute_kwargs.update(dask_settings=DaskSettings(process_graph=process_graph))
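
Illustration (not part of the patch): the core of this change is filtering preprocessing tasks by the processing AOI, i.e. the union of the tile task batch geometries of all non-overview zoom levels. A condensed, self-contained sketch of that filter, with a hypothetical DummyTask standing in for real preprocessing tasks (which expose has_geometry() and geometry the same way):

    from shapely.geometry import box
    from shapely.ops import unary_union


    class DummyTask:
        """Hypothetical stand-in for a preprocessing task with optional geometry."""

        def __init__(self, geometry=None):
            self._geometry = geometry

        def has_geometry(self) -> bool:
            return self._geometry is not None

        @property
        def geometry(self):
            return self._geometry


    # AOI: union of tile task batch geometries of the processing zoom levels
    process_aoi = unary_union([box(0, 0, 10, 10), box(10, 0, 20, 10)])

    tasks = [
        DummyTask(box(5, 5, 6, 6)),      # intersects the AOI -> kept
        DummyTask(box(50, 50, 60, 60)),  # outside the AOI -> dropped
        DummyTask(),                     # no geometry -> always kept
    ]

    # same branching as in _preprocessing_task_batches() above
    kept = []
    for task in tasks:
        if process_aoi and task.has_geometry():
            if task.geometry.intersects(process_aoi):
                kept.append(task)
        else:
            kept.append(task)

    assert len(kept) == 2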
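The zoom-level arithmetic behind MapcheteConfig.overview_levels and MapcheteConfig.processing_levels, worked through with hypothetical values (a configuration spanning zooms 0-13 whose baselevels cover 10-13):

    from mapchete.types import ZoomLevels

    zoom_levels = ZoomLevels(0, 13)  # all configured zoom levels
    baselevels = ZoomLevels(10, 13)  # the baselevels "zooms" range

    # overview_levels: zoom levels not processed directly but derived from baselevels
    overview_levels = zoom_levels.difference(baselevels)
    assert (min(overview_levels), max(overview_levels)) == (0, 9)

    # processing_levels: everything minus the overview levels
    processing_levels = zoom_levels.difference(overview_levels)
    assert (min(processing_levels), max(processing_levels)) == (10, 13)

Since difference() collapses the set difference into the range between its minimum and maximum, this arithmetic implicitly assumes the difference is contiguous, which holds as long as the baselevels sit at one end of the configured zoom range.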
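Expected output of the new pretty helpers, derived from the implementation in mapchete/pretty.py above:

    from mapchete.pretty import pretty_bytes, pretty_seconds

    print(pretty_bytes(1023))         # 1023 bytes
    print(pretty_bytes(1536))         # 1.5 KiB
    print(pretty_bytes(5 * 1024**3))  # 5.0 GiB

    print(pretty_seconds(0.1234))  # 0.123s
    print(pretty_seconds(75))      # 1m 15s
    print(pretty_seconds(3725))    # 1h 2m 5s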
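Finally, the effect of the Executor.__new__ change: dask-specific keywords now force the dask executor regardless of the requested concurrency, mirroring how execute() passes dask_scheduler and dask_client through. A sketch assuming a local dask.distributed client is available:

    from dask.distributed import Client

    from mapchete.executor import Executor
    from mapchete.executor.dask import DaskExecutor

    # local in-process cluster purely for demonstration
    with Client(processes=False) as client:
        # the concurrency argument is overridden because a dask client is passed
        executor = Executor(concurrency="processes", dask_client=client)
        assert isinstance(executor, DaskExecutor)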