From d78a5328698fa5a1b2ab9b1c7373279c5c5e315c Mon Sep 17 00:00:00 2001
From: Joachim Ungar
Date: Mon, 4 Dec 2023 11:29:30 +0100
Subject: [PATCH] refactor task batch generation

---
 mapchete/executor/future.py |   2 -
 mapchete/processing/base.py | 116 ++++++++++++++++++------------------
 2 files changed, 59 insertions(+), 59 deletions(-)

diff --git a/mapchete/executor/future.py b/mapchete/executor/future.py
index cac3d15b..8a4e93b6 100644
--- a/mapchete/executor/future.py
+++ b/mapchete/executor/future.py
@@ -90,7 +90,6 @@ def from_future(
         profiling = future.profiling if hasattr(future, "profiling") else {}
 
         if lazy:
-            # TODO: this is not covered by tests, why?
             # keep around Future for later and don't call Future.result()
             return MFuture(
                 result=result,
@@ -162,7 +161,6 @@ def cancelled(self) -> bool:  # pragma: no cover
     def _populate_from_future(self, timeout: int = FUTURE_TIMEOUT, **kwargs):
         """Fill internal cache with future.result() if future was provided."""
         # only check if there is a cached future but no result nor exception
-        # TODO: this is because lazy loading, right?
         if (
             self._future is not None
             and self._result is None
diff --git a/mapchete/processing/base.py b/mapchete/processing/base.py
index 483f68b0..a710be7f 100644
--- a/mapchete/processing/base.py
+++ b/mapchete/processing/base.py
@@ -166,8 +166,6 @@ def tasks(
         zoom: Optional[ZoomLevelsLike] = None,
         tile: Optional[TileLike] = None,
         mode: Optional[ProcessingMode] = None,
-        skip_output_check: bool = False,
-        propagate_results: bool = True,
         profilers: Optional[List[Profiler]] = None,
     ) -> Tasks:
         """
@@ -178,8 +176,6 @@ def tasks(
                 self,
                 zoom=zoom,
                 tile=tile,
-                skip_output_check=skip_output_check,
-                propagate_results=propagate_results,
                 profilers=profilers,
             ),
             mode=mode or self.config.mode,
@@ -667,28 +663,27 @@ def _task_batches(
     process: Mapchete,
     zoom: Optional[ZoomLevelsLike] = None,
     tile: Optional[TileLike] = None,
-    skip_output_check: bool = False,
-    propagate_results: bool = True,
     profilers: Optional[List[Profiler]] = None,
 ) -> Iterator[Union[TaskBatch, TileTaskBatch]]:
     """Create task batches for each processing stage."""
     profilers = profilers or []
     if tile:
         tile = process.config.process_pyramid.tile(*tile)
-    yield from _preprocessing_task_batches(process=process, profilers=profilers)
-    yield from _tile_task_batches(
+    tile_task_batches = _tile_task_batches(
         process=process,
         zoom=zoom,
         tile=tile,
-        skip_output_check=skip_output_check,
         profilers=profilers,
     )
+    # TODO: create processing AOI (i.e. processing area without overviews)
+    yield from _preprocessing_task_batches(process=process, profilers=profilers)
+    yield from tile_task_batches
 
 
 def _preprocessing_task_batches(
     process: Mapchete,
     profilers: Optional[List[Profiler]] = None,
-) -> Iterator[Union[TaskBatch, TileTaskBatch]]:
+) -> Iterator[TaskBatch]:
     with Timer() as duration:
         # preprocessing tasks
         preprocessing_batch = TaskBatch(
@@ -705,29 +700,47 @@ def _tile_task_batches(
     process: Mapchete,
     zoom: Optional[ZoomLevelsLike] = None,
     tile: Optional[TileLike] = None,
-    skip_output_check: bool = False,
-    propagate_results: bool = True,
     profilers: Optional[List[Profiler]] = None,
-) -> Iterator[Union[TaskBatch, TileTaskBatch]]:
+) -> List[TileTaskBatch]:
     with Timer() as duration:
+        batches = []
+
         if tile:
-            zoom_levels = ZoomLevels.from_inp(tile.zoom)
-            skip_output_check = True
-            tiles = {tile.zoom: [(tile, False)]}
+            # nothing to process here
+            if (
+                process.config.mode == "continue"
+                and process.config.output_reader.tiles_exist(tile)
+            ):
+                pass
+
+            # process this tile
+            else:
+                batches = [
+                    TileTaskBatch(
+                        id=f"zoom-{tile.zoom}",
+                        tasks=[
+                            TileTask(
+                                tile=tile,
+                                config=process.config,
+                            )
+                        ],
+                        profilers=profilers,
+                    )
+                ]
+
         else:
             zoom_levels = (
                 process.config.zoom_levels
                 if zoom is None
                 else ZoomLevels.from_inp(zoom)
             )
-            tiles = {}
             # here we store the parents of tiles about to be processed so we can update overviews
             # also in "continue" mode in case there were updates at the baselevel
             overview_parents = set()
             for i, zoom in enumerate(zoom_levels.descending()):
-                tiles[zoom] = []
+                tile_tasks = []
 
-                for tile, skip, _ in _filter_skipable(
+                for tile in _filter_skipable(
                     process=process,
                     tiles_batches=process.get_process_tiles(
                         zoom,
@@ -735,59 +748,48 @@ def _tile_task_batches(
                         batch_by=batch_sort_property(
                             process.config.output_reader.tile_path_schema
                         ),
                     ),
-                    target_set=(
+                    overview_tiles=(
                         overview_parents if process.config.baselevels and i else None
                     ),
-                    skip_output_check=skip_output_check,
                 ):
-                    tiles[zoom].append((tile, skip))
+                    # we don't need the current tile anymore
+                    overview_parents.discard(tile)
+
+                    # add tile task to batch
+                    tile_tasks.append(TileTask(tile=tile, config=process.config))
+
                     # in case of building overviews from baselevels, remember which parent
                     # tile needs to be updated later on
-                    if (
-                        not skip_output_check
-                        and process.config.baselevels
-                        and tile.zoom > 0
-                    ):
-                        # add parent tile
+                    if process.config.baselevels and tile.zoom > 0:
+                        # add parent tile to be reprocessed
                         overview_parents.add(tile.get_parent())
-                    # we don't need the current tile anymore
-                    overview_parents.discard(tile)
-        # tile tasks
-        for zoom in zoom_levels.descending():
-            yield TileTaskBatch(
-                id=f"zoom-{zoom}",
-                tasks=(
-                    TileTask(
-                        tile=tile,
-                        config=process.config,
-                    )
-                    for tile, skip in tiles[zoom]
-                    if not skip
-                    or not (
-                        process.config.mode == "continue"
-                        and process.config.output_reader.tiles_exist(tile)
+                batches.append(
+                    TileTaskBatch(
+                        id=f"zoom-{zoom}",
+                        tasks=tile_tasks,
+                        profilers=profilers,
                     )
-                ),
-                profilers=profilers,
-            )
+                )
     logger.debug("tile task batches generated in %s", duration)
+    return batches
 
 
 def _filter_skipable(
     process: Mapchete,
     tiles_batches: Iterator[Iterator[BufferedTile]],
-    target_set: Optional[set] = None,
-    skip_output_check: bool = False,
-) -> Iterator[Tuple[BufferedTile, bool, Optional[str]]]:
-    if skip_output_check:  # pragma: no cover
+    overview_tiles: Optional[set] = None,
+) -> Iterator[BufferedTile]:
+    # we don't filter here
+    if process.config.mode == ProcessingMode.OVERWRITE:
         for batch in tiles_batches:
-            for tile in batch:
-                yield (tile, False, None)
+            yield from batch
+
+    # only yield tiles which don't yet exist or need to be reprocessed
     else:
-        target_set = target_set or set()
+        overview_tiles = overview_tiles or set()
         for tile, skip in process.skip_tiles(tiles_batches=tiles_batches):
-            if skip and tile not in target_set:
-                yield (tile, True, "output already exists")
+            if skip and tile not in overview_tiles:
+                pass
             else:
-                yield tile
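
Usage sketch: a minimal, illustrative example of exercising the refactored API
after this change, assuming a process opened with mapchete.open(); the
configuration path and zoom range below are placeholders, not taken from the
patch. With skip_output_check and propagate_results removed, whether existing
output tiles are skipped is now decided internally from the configured mode.

    import mapchete

    # open a process in "continue" mode; "example.mapchete" is a placeholder config
    with mapchete.open("example.mapchete", mode="continue") as mp:
        # build preprocessing and per-zoom tile task batches; tiles whose output
        # already exists are filtered out internally in "continue" mode
        tasks = mp.tasks(zoom=[0, 8])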