Skip to content

Commit

Permalink
refactor task batch generation
Browse files Browse the repository at this point in the history
  • Loading branch information
ungarj committed Dec 4, 2023
1 parent 9e39718 commit d78a532
Show file tree
Hide file tree
Showing 2 changed files with 59 additions and 59 deletions.
2 changes: 0 additions & 2 deletions mapchete/executor/future.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,6 @@ def from_future(
profiling = future.profiling if hasattr(future, "profiling") else {}

if lazy:
# TODO: this is not covered by tests, why?
# keep around Future for later and don't call Future.result()
return MFuture(
result=result,
Expand Down Expand Up @@ -162,7 +161,6 @@ def cancelled(self) -> bool: # pragma: no cover
def _populate_from_future(self, timeout: int = FUTURE_TIMEOUT, **kwargs):
"""Fill internal cache with future.result() if future was provided."""
# only check if there is a cached future but no result nor exception
# TODO: this is because lazy loading, right?
if (
self._future is not None
and self._result is None
Expand Down
116 changes: 59 additions & 57 deletions mapchete/processing/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -166,8 +166,6 @@ def tasks(
zoom: Optional[ZoomLevelsLike] = None,
tile: Optional[TileLike] = None,
mode: Optional[ProcessingMode] = None,
skip_output_check: bool = False,
propagate_results: bool = True,
profilers: Optional[List[Profiler]] = None,
) -> Tasks:
"""
Expand All @@ -178,8 +176,6 @@ def tasks(
self,
zoom=zoom,
tile=tile,
skip_output_check=skip_output_check,
propagate_results=propagate_results,
profilers=profilers,
),
mode=mode or self.config.mode,
Expand Down Expand Up @@ -667,28 +663,27 @@ def _task_batches(
process: Mapchete,
zoom: Optional[ZoomLevelsLike] = None,
tile: Optional[TileLike] = None,
skip_output_check: bool = False,
propagate_results: bool = True,
profilers: Optional[List[Profiler]] = None,
) -> Iterator[Union[TaskBatch, TileTaskBatch]]:
"""Create task batches for each processing stage."""
profilers = profilers or []
if tile:
tile = process.config.process_pyramid.tile(*tile)
yield from _preprocessing_task_batches(process=process, profilers=profilers)
yield from _tile_task_batches(
tile_task_batches = _tile_task_batches(
process=process,
zoom=zoom,
tile=tile,
skip_output_check=skip_output_check,
profilers=profilers,
)
# TODO: create processing AOI (i.e. processing area without overviews)
yield from _preprocessing_task_batches(process=process, profilers=profilers)
yield from tile_task_batches


def _preprocessing_task_batches(
process: Mapchete,
profilers: Optional[List[Profiler]] = None,
) -> Iterator[Union[TaskBatch, TileTaskBatch]]:
) -> Iterator[TaskBatch]:
with Timer() as duration:
# preprocessing tasks
preprocessing_batch = TaskBatch(
Expand All @@ -705,89 +700,96 @@ def _tile_task_batches(
process: Mapchete,
zoom: Optional[ZoomLevelsLike] = None,
tile: Optional[TileLike] = None,
skip_output_check: bool = False,
propagate_results: bool = True,
profilers: Optional[List[Profiler]] = None,
) -> Iterator[Union[TaskBatch, TileTaskBatch]]:
) -> List[TileTaskBatch]:
with Timer() as duration:
batches = []

if tile:
zoom_levels = ZoomLevels.from_inp(tile.zoom)
skip_output_check = True
tiles = {tile.zoom: [(tile, False)]}
# nothing to process here
if (
process.config.mode == "continue"
and process.config.output_reader.tiles_exist(tile)
):
pass

# process this tile
else:
batches = [
TileTaskBatch(
id=f"zoom-{tile.zoom}",
tasks=[
TileTask(
tile=tile,
config=process.config,
)
],
profilers=profilers,
)
]

else:
zoom_levels = (
process.config.zoom_levels
if zoom is None
else ZoomLevels.from_inp(zoom)
)
tiles = {}
# here we store the parents of tiles about to be processed so we can update overviews
# also in "continue" mode in case there were updates at the baselevel
overview_parents = set()
for i, zoom in enumerate(zoom_levels.descending()):
tiles[zoom] = []
tile_tasks = []

for tile, skip, _ in _filter_skipable(
for tile in _filter_skipable(
process=process,
tiles_batches=process.get_process_tiles(
zoom,
batch_by=batch_sort_property(
process.config.output_reader.tile_path_schema
),
),
target_set=(
overview_tiles=(
overview_parents if process.config.baselevels and i else None
),
skip_output_check=skip_output_check,
):
tiles[zoom].append((tile, skip))
# we don't need the current tile anymore
overview_parents.discard(tile)

# add tile task to batch
tile_tasks.append(TileTask(tile=tile, config=process.config))

# in case of building overviews from baselevels, remember which parent
# tile needs to be updated later on
if (
not skip_output_check
and process.config.baselevels
and tile.zoom > 0
):
# add parent tile
if process.config.baselevels and tile.zoom > 0:
# add parent tile to be reprocessed
overview_parents.add(tile.get_parent())
# we don't need the current tile anymore
overview_parents.discard(tile)

# tile tasks
for zoom in zoom_levels.descending():
yield TileTaskBatch(
id=f"zoom-{zoom}",
tasks=(
TileTask(
tile=tile,
config=process.config,
)
for tile, skip in tiles[zoom]
if not skip
or not (
process.config.mode == "continue"
and process.config.output_reader.tiles_exist(tile)
batches.append(
TileTaskBatch(
id=f"zoom-{zoom}",
tasks=tile_tasks,
profilers=profilers,
)
),
profilers=profilers,
)
)
logger.debug("tile task batches generated in %s", duration)
return batches


def _filter_skipable(
process: Mapchete,
tiles_batches: Iterator[Iterator[BufferedTile]],
target_set: Optional[set] = None,
skip_output_check: bool = False,
) -> Iterator[Tuple[BufferedTile, bool, Optional[str]]]:
if skip_output_check: # pragma: no cover
overview_tiles: Optional[set] = None,
) -> Iterator[Tuple[BufferedTile, bool]]:
# we don't filter here
if process.config.mode == ProcessingMode.OVERWRITE:
for batch in tiles_batches:
for tile in batch:
yield (tile, False, None)
yield from batch

# only yield tiles which don't yet exist or need to be reprocessed
else:
target_set = target_set or set()
overview_tiles = overview_tiles or set()
for tile, skip in process.skip_tiles(tiles_batches=tiles_batches):
if skip and tile not in target_set:
yield (tile, True, "output already exists")
if skip and tile not in overview_tiles:
pass
else:
yield (tile, False, None)
yield tile

0 comments on commit d78a532

Please sign in to comment.