From 2803539126af8a1284c6ad274b5d9e366b09b13c Mon Sep 17 00:00:00 2001 From: Liam Gray Date: Thu, 19 Dec 2024 12:46:15 -0800 Subject: [PATCH] feat(pipeline): pass the same key object to task with multiple process arguments --- caput/pipeline.py | 47 ++++++++++++++++++++++++++--------------------- 1 file changed, 26 insertions(+), 21 deletions(-) diff --git a/caput/pipeline.py b/caput/pipeline.py index 010b7ccd..19960613 100644 --- a/caput/pipeline.py +++ b/caput/pipeline.py @@ -1527,33 +1527,38 @@ def _pipeline_queue_product(self, key, product): # First, check requires keys if key in self._requires_keys: - ii = self._requires_keys.index(key) - logger.debug( - f"{self!s} stowing data product with key {key} for `requires`." - ) - if self._requires is None: - raise PipelineRuntimeError( - "Tried to set 'requires' data product, but `setup()` already run." - ) - if self._requires[ii] is not None: - raise PipelineRuntimeError( - "'requires' data product set more than once." + # It's possible that the same key could be passed multiple times + indices = (ii for ii, k in enumerate(self._requires_keys) if k == key) + + for ii in indices: + logger.debug( + f"{self!s} stowing data product with key {key} for `requires`." ) - self._requires[ii] = product + if self._requires is None: + raise PipelineRuntimeError( + "Tried to set 'requires' data product, but `setup()` already run." + ) + if self._requires[ii] is not None: + raise PipelineRuntimeError( + "'requires' data product set more than once." + ) + self._requires[ii] = product - result = True + result = True if key in self._in_keys: - ii = self._in_keys.index(key) - logger.debug(f"{self!s} stowing data product with key {key} for `in`.") - if self._in is None: - raise PipelineRuntimeError( - "Tried to queue 'in' data product, but `next()` already run." - ) + indices = (ii for ii, k in enumerate(self._in_keys) if k == key) + + for ii in indices: + logger.debug(f"{self!s} stowing data product with key {key} for `in`.") + if self._in is None: + raise PipelineRuntimeError( + "Tried to queue 'in' data product, but `next()` already run." + ) - self._in[ii].put(product) + self._in[ii].put(product) - result = True + result = True return result