Skip to content

Commit

Permalink
BlockNameInErrors: Limit try/except to the generate call
Browse files · Browse the repository at this point in the history
instructlab#128

Signed-off-by: Gabe Goodhart <[email protected]>
  • Loading branch information
gabe-l-hart committed Jul 16, 2024
1 parent 1cfc8af commit 10955fe
Showing 1 changed file with 39 additions and 37 deletions.
76 changes: 39 additions & 37 deletions src/instructlab/sdg/pipeline.py
Original file line number Diff line number Diff line change
def generate(self, dataset) -> Dataset:
    """Generate the dataset by running the pipeline steps.

    Args:
        dataset: the input dataset to run the chained blocks over

    Returns:
        The dataset produced by the final block in the chain.

    Raises:
        EmptyDatasetError: if any block leaves the dataset empty.
        Exception: any error raised by a block's generate() call is
            re-raised with the block type/name prepended to the message
            (same exception type when possible, RuntimeError otherwise).
    """
    for block_prop in self.chained_blocks:
        # Parse and instantiate the block
        block_name = block_prop["name"]
        block_type = _lookup_block_type(block_prop["type"])
        block_config = block_prop["config"]
        drop_columns = block_prop.get("drop_columns", [])
        drop_duplicates_cols = block_prop.get("drop_duplicates", False)
        block = block_type(self.ctx, self, block_name, **block_config)
        logger.info("Running block: %s", block_name)
        logger.info(dataset)

        # Execute the block and wrap errors with the block name/type.
        # The try is limited to the generate call itself so that errors
        # raised by the pipeline's own post-processing below (e.g.
        # EmptyDatasetError) are not re-wrapped.
        try:
            dataset = block.generate(dataset)
        except Exception as err:
            block_exc_err = f"BLOCK ERROR [{block_type.__name__}/{block_name}]: {err}"

            # Try to raise the same exception type. This can fail if the
            # exception is a non-standard type that has a different init
            # signature, so fall back to raising a RuntimeError in that case.
            try:
                wrapper_err = type(err)(block_exc_err)
            except TypeError:
                wrapper_err = RuntimeError(block_exc_err)
            raise wrapper_err from err

        # If at any point we end up with an empty data set, the pipeline has failed
        if len(dataset) == 0:
            raise EmptyDatasetError(
                f"Pipeline stopped: Empty dataset after running block: {block_name}"
            )

        # Drop only the requested columns that actually exist in the dataset
        drop_columns_in_ds = [e for e in drop_columns if e in dataset.column_names]
        if drop_columns:
            dataset = dataset.remove_columns(drop_columns_in_ds)

        if drop_duplicates_cols:
            dataset = self._drop_duplicates(dataset, cols=drop_duplicates_cols)

    return dataset

Expand Down

0 comments on commit 10955fe

Please sign in to comment.