From 0435abb37b8b6499e71ed92dcaf1534082bd220a Mon Sep 17 00:00:00 2001 From: Mark McLoughlin Date: Fri, 5 Jul 2024 13:22:05 +0100 Subject: [PATCH 01/10] Replace LLMBlock model_prompt param with model_family In preparation for custom pipeline configuration files, do not require model_prompt as an LLMBlock param - it can have built-in knowledge of the correct prompt to use per model_family. Signed-off-by: Mark McLoughlin --- src/instructlab/sdg/default_flows.py | 47 +++++++++------------------- src/instructlab/sdg/generate_data.py | 3 +- src/instructlab/sdg/llmblock.py | 26 ++++++++++++--- 3 files changed, 38 insertions(+), 38 deletions(-) diff --git a/src/instructlab/sdg/default_flows.py b/src/instructlab/sdg/default_flows.py index 818c4972..dd3e781e 100644 --- a/src/instructlab/sdg/default_flows.py +++ b/src/instructlab/sdg/default_flows.py @@ -10,23 +10,6 @@ from .llmblock import LLMBlock from .utilblocks import CombineColumnsBlock -MODEL_FAMILY_MIXTRAL = "mixtral" -MODEL_FAMILY_MERLINITE = "merlinite" - -_MODEL_PROMPT_MIXTRAL = " [INST] {prompt} [/INST]" -_MODEL_PROMPT_MERLINITE = "'<|system|>\nYou are an AI language model developed by IBM Research. You are a cautious assistant. You carefully follow instructions. You are helpful and harmless and you follow ethical guidelines and promote positive behavior.\n<|user|>\n{prompt}\n<|assistant|>\n'" - -_MODEL_PROMPTS = { - MODEL_FAMILY_MIXTRAL: _MODEL_PROMPT_MIXTRAL, - MODEL_FAMILY_MERLINITE: _MODEL_PROMPT_MERLINITE, -} - - -def _get_model_prompt(model_family): - if model_family not in _MODEL_PROMPTS: - raise ValueError(f"Unknown model family: {model_family}") - return _MODEL_PROMPTS[model_family] - class Flow(ABC): def __init__( @@ -53,7 +36,7 @@ def get_flow(self) -> list: "config_path": "", # must be set by subclass "client": self.client, "model_id": self.model_id, - "model_prompt": _get_model_prompt(self.model_family), + "model_family": self.model_family, "output_cols": ["output"], }, "gen_kwargs": { @@ -110,7 +93,7 @@ def get_flow(self) -> list: ), "client": self.client, "model_id": self.model_id, - "model_prompt": _get_model_prompt(self.model_family), + "model_family": self.model_family, "output_cols": ["mmlubench_question", "mmlubench_answer"], }, "gen_kwargs": { @@ -135,7 +118,7 @@ def get_flow(self) -> list: ), "client": self.client, "model_id": self.model_id, - "model_prompt": _get_model_prompt(self.model_family), + "model_family": self.model_family, "output_cols": ["question", "response"], "parser_kwargs": { "parser_name": "custom", @@ -157,7 +140,7 @@ def get_flow(self) -> list: ), "client": self.client, "model_id": self.model_id, - "model_prompt": _get_model_prompt(self.model_family), + "model_family": self.model_family, "output_cols": ["explanation", "judgment"], }, "gen_kwargs": { @@ -186,7 +169,7 @@ def get_flow(self) -> list: ), "client": self.client, "model_id": self.model_id, - "model_prompt": _get_model_prompt(self.model_family), + "model_family": self.model_family, "output_cols": ["feedback", "score"], }, "gen_kwargs": { @@ -216,7 +199,7 @@ def get_flow(self) -> list: ), "client": self.client, "model_id": self.model_id, - "model_prompt": _get_model_prompt(self.model_family), + "model_family": self.model_family, "output_cols": ["explanation", "rating"], }, "gen_kwargs": { @@ -253,7 +236,7 @@ def get_flow(self) -> list: ), "client": self.client, "model_id": self.model_id, - "model_prompt": _get_model_prompt(self.model_family), + "model_family": self.model_family, "output_cols": ["question"], "batch_kwargs": { "num_samples": 
self.num_instructions_to_generate, @@ -271,7 +254,7 @@ def get_flow(self) -> list: ), "client": self.client, "model_id": self.model_id, - "model_prompt": _get_model_prompt(self.model_family), + "model_family": self.model_family, "output_cols": ["evaluation", "score"], }, }, @@ -299,7 +282,7 @@ def get_flow(self) -> list: ), "client": self.client, "model_id": self.model_id, - "model_prompt": _get_model_prompt(self.model_family), + "model_family": self.model_family, "output_cols": ["response"], }, }, @@ -313,7 +296,7 @@ def get_flow(self) -> list: ), "client": self.client, "model_id": self.model_id, - "model_prompt": _get_model_prompt(self.model_family), + "model_family": self.model_family, "output_cols": ["evaluation", "score"], }, }, @@ -347,7 +330,7 @@ def get_flow(self) -> list: ), "client": self.client, "model_id": self.model_id, - "model_prompt": _get_model_prompt(self.model_family), + "model_family": self.model_family, "output_cols": ["context"], }, "gen_kwargs": { @@ -367,7 +350,7 @@ def get_flow(self) -> list: ), "client": self.client, "model_id": self.model_id, - "model_prompt": _get_model_prompt(self.model_family), + "model_family": self.model_family, "output_cols": ["question"], "batch_kwargs": { "num_samples": 3, @@ -385,7 +368,7 @@ def get_flow(self) -> list: ), "client": self.client, "model_id": self.model_id, - "model_prompt": _get_model_prompt(self.model_family), + "model_family": self.model_family, "output_cols": ["evaluation", "score"], }, }, @@ -413,7 +396,7 @@ def get_flow(self) -> list: ), "client": self.client, "model_id": self.model_id, - "model_prompt": _get_model_prompt(self.model_family), + "model_family": self.model_family, "output_cols": ["response"], }, }, @@ -427,7 +410,7 @@ def get_flow(self) -> list: ), "client": self.client, "model_id": self.model_id, - "model_prompt": _get_model_prompt(self.model_family), + "model_family": self.model_family, "output_cols": ["evaluation", "score"], }, }, diff --git a/src/instructlab/sdg/generate_data.py b/src/instructlab/sdg/generate_data.py index 36c6cad4..89a3ae5b 100644 --- a/src/instructlab/sdg/generate_data.py +++ b/src/instructlab/sdg/generate_data.py @@ -18,8 +18,6 @@ # pylint: disable=ungrouped-imports from instructlab.sdg import SDG, utils from instructlab.sdg.default_flows import ( - MODEL_FAMILY_MERLINITE, - MODEL_FAMILY_MIXTRAL, MMLUBenchFlow, SimpleFreeformSkillFlow, SimpleGroundedSkillFlow, @@ -28,6 +26,7 @@ SynthKnowledgeFlow, SynthSkillsFlow, ) +from instructlab.sdg.llmblock import MODEL_FAMILY_MERLINITE, MODEL_FAMILY_MIXTRAL from instructlab.sdg.pipeline import Pipeline from instructlab.sdg.utils import models from instructlab.sdg.utils.taxonomy import ( diff --git a/src/instructlab/sdg/llmblock.py b/src/instructlab/sdg/llmblock.py index 4153a191..ad21dd68 100644 --- a/src/instructlab/sdg/llmblock.py +++ b/src/instructlab/sdg/llmblock.py @@ -13,6 +13,23 @@ logger = setup_logger(__name__) +MODEL_FAMILY_MIXTRAL = "mixtral" +MODEL_FAMILY_MERLINITE = "merlinite" + +_MODEL_PROMPT_MIXTRAL = " [INST] {prompt} [/INST]" +_MODEL_PROMPT_MERLINITE = "'<|system|>\nYou are an AI language model developed by IBM Research. You are a cautious assistant. You carefully follow instructions. 
You are helpful and harmless and you follow ethical guidelines and promote positive behavior.\n<|user|>\n{prompt}\n<|assistant|>\n'" + +_MODEL_PROMPTS = { + MODEL_FAMILY_MIXTRAL: _MODEL_PROMPT_MIXTRAL, + MODEL_FAMILY_MERLINITE: _MODEL_PROMPT_MERLINITE, +} + + +def _get_model_prompt(model_family): + if model_family not in _MODEL_PROMPTS: + raise ValueError(f"Unknown model family: {model_family}") + return _MODEL_PROMPTS[model_family] + def server_supports_batched(client, model_id: str) -> bool: supported = getattr(client, "server_supports_batched", None) @@ -42,9 +59,9 @@ def __init__( config_path, client, model_id, + model_family, output_cols, parser_kwargs={}, - model_prompt="{prompt}", **batch_kwargs, ) -> None: super().__init__(block_name) @@ -55,7 +72,8 @@ def __init__( self.prompt_template = self.prompt_struct.format(**self.block_config) self.client = client self.model = model_id - self.model_prompt = model_prompt + self.model_family = model_family + self.model_prompt = _get_model_prompt(self.model_family) self.output_cols = output_cols self.batch_params = batch_kwargs.get("batch_kwargs", {}) self.parser_name = parser_kwargs.get("parser_name", None) @@ -193,10 +211,10 @@ def __init__( config_paths, client, model_id, + model_family, output_cols, selector_column_name, parser_kwargs={}, - model_prompt="{prompt}", **batch_kwargs, ) -> None: super().__init__( @@ -204,9 +222,9 @@ def __init__( config_paths[0][0], client, model_id, + model_family, output_cols, parser_kwargs=parser_kwargs, - model_prompt=model_prompt, **batch_kwargs, ) self.selector_column_name = selector_column_name From 49c87d57cdf390264fd1bfce3d41c99935c12b40 Mon Sep 17 00:00:00 2001 From: Mark McLoughlin Date: Tue, 2 Jul 2024 13:47:20 +0100 Subject: [PATCH 02/10] Add a PipelineContext class In order to prepare for pipeline definitions in YAML, move runtime parameters like the OpenAI client, model ID, and model family out of the pipeline definition and into a PipelineContext object that all blocks have access to.
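To illustrate the new usage pattern - this mirrors the updated test scripts in this patch - the runtime context is now constructed once and handed to each flow, rather than passing the client and model parameters into every block config:

```
ctx = PipelineContext(client, "mixtral", teacher_model, 1)

mmlu_flow = MMLUBenchFlow(ctx).get_flow()
knowledge_flow = SynthKnowledgeFlow(ctx).get_flow()
knowledge_pipe = Pipeline(mmlu_flow + knowledge_flow)
```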
Signed-off-by: Mark McLoughlin --- scripts/test_freeform_skills.py | 6 +- scripts/test_grounded_skills.py | 6 +- scripts/test_knowledge.py | 13 ++-- src/instructlab/sdg/block.py | 3 +- src/instructlab/sdg/default_flows.py | 100 +++++++-------------------- src/instructlab/sdg/filterblock.py | 11 ++- src/instructlab/sdg/generate_data.py | 48 +++++-------- src/instructlab/sdg/llmblock.py | 31 ++++----- src/instructlab/sdg/pipeline.py | 20 +++++- src/instructlab/sdg/utilblocks.py | 16 +++-- tests/test_filterblock.py | 3 + 11 files changed, 112 insertions(+), 145 deletions(-) diff --git a/scripts/test_freeform_skills.py b/scripts/test_freeform_skills.py index a8612c09..058fd64f 100644 --- a/scripts/test_freeform_skills.py +++ b/scripts/test_freeform_skills.py @@ -5,7 +5,7 @@ # First Party from src.instructlab.sdg import SDG from src.instructlab.sdg.default_flows import SynthSkillsFlow -from src.instructlab.sdg.pipeline import Pipeline +from src.instructlab.sdg.pipeline import Pipeline, PipelineContext # for vLLM endpoints, the api_key remains "EMPTY" openai_api_key = "EMPTY" @@ -49,7 +49,9 @@ ds = Dataset.from_list(samples) -skills_flow = SynthSkillsFlow(client, "mixtral", teacher_model, 1).get_flow() +ctx = PipelineContext(client, "mixtral", teacher_model, 1) + +skills_flow = SynthSkillsFlow(ctx).get_flow() skills_pipe = Pipeline(skills_flow) sdg = SDG([skills_pipe]) diff --git a/scripts/test_grounded_skills.py b/scripts/test_grounded_skills.py index 338edb6c..6d0bdc1b 100644 --- a/scripts/test_grounded_skills.py +++ b/scripts/test_grounded_skills.py @@ -5,7 +5,7 @@ # First Party from src.instructlab.sdg import SDG from src.instructlab.sdg.default_flows import SynthGroundedSkillsFlow -from src.instructlab.sdg.pipeline import Pipeline +from src.instructlab.sdg.pipeline import Pipeline, PipelineContext # for vLLM endpoints, the api_key remains "EMPTY" openai_api_key = "EMPTY" @@ -97,7 +97,9 @@ ds = Dataset.from_list(samples) -skills_flow = SynthGroundedSkillsFlow(client, "mixtral", teacher_model, 10).get_flow() +ctx = PipelineContext(client, "mixtral", teacher_model, 10) + +skills_flow = SynthGroundedSkillsFlow(ctx).get_flow() skills_pipe = Pipeline(skills_flow) sdg = SDG([skills_pipe]) diff --git a/scripts/test_knowledge.py b/scripts/test_knowledge.py index aeedcf59..2b534903 100644 --- a/scripts/test_knowledge.py +++ b/scripts/test_knowledge.py @@ -8,7 +8,7 @@ # First Party from src.instructlab.sdg import SDG from src.instructlab.sdg.default_flows import MMLUBenchFlow, SynthKnowledgeFlow -from src.instructlab.sdg.pipeline import Pipeline +from src.instructlab.sdg.pipeline import Pipeline, PipelineContext # Please don't add you vLLM endpoint key here openai_api_key = "EMPTY" @@ -38,12 +38,13 @@ ds = Dataset.from_list(samples) -mmlu_flow = MMLUBenchFlow(client, "mixtral", teacher_model, 1).get_flow() -knowledge_flow = SynthKnowledgeFlow(client, "mixtral", teacher_model, 1).get_flow() -knowledge_pipe = Pipeline(knowledge_flow) -mmlu_pipe = Pipeline(mmlu_flow) +ctx = PipelineContext(client, "mixtral", teacher_model, 1) -sdg = SDG([mmlu_pipe, knowledge_pipe]) +mmlu_flow = MMLUBenchFlow(ctx).get_flow() +knowledge_flow = SynthKnowledgeFlow(ctx).get_flow() +knowledge_pipe = Pipeline(mmlu_flow + knowledge_flow) + +sdg = SDG([knowledge_pipe]) mmlubench_data = sdg.generate(ds) print(mmlubench_data) diff --git a/src/instructlab/sdg/block.py b/src/instructlab/sdg/block.py index 09433f55..e8807420 100644 --- a/src/instructlab/sdg/block.py +++ b/src/instructlab/sdg/block.py @@ -14,7 +14,8 @@ class 
Block(ABC): - def __init__(self, block_name: str) -> None: + def __init__(self, ctx, block_name: str) -> None: + self.ctx = ctx self.block_name = block_name @staticmethod diff --git a/src/instructlab/sdg/default_flows.py b/src/instructlab/sdg/default_flows.py index dd3e781e..ab6396d2 100644 --- a/src/instructlab/sdg/default_flows.py +++ b/src/instructlab/sdg/default_flows.py @@ -1,7 +1,6 @@ # SPDX-License-Identifier: Apache-2.0 # Standard from abc import ABC, abstractmethod -from importlib import resources import operator import os @@ -12,14 +11,8 @@ class Flow(ABC): - def __init__( - self, client, model_family, model_id, num_instructions_to_generate - ) -> None: - self.client = client - self.model_family = model_family - self.model_id = model_id - self.num_instructions_to_generate = num_instructions_to_generate - self.sdg_base = resources.files(__package__) + def __init__(self, ctx) -> None: + self.ctx = ctx @abstractmethod def get_flow(self) -> list: @@ -34,15 +27,12 @@ def get_flow(self) -> list: "block_config": { "block_name": "", # must be set by subclass "config_path": "", # must be set by subclass - "client": self.client, - "model_id": self.model_id, - "model_family": self.model_family, "output_cols": ["output"], }, "gen_kwargs": { "max_tokens": 2048, "temperature": 0.7, - "n": self.num_instructions_to_generate, + "n": self.ctx.num_instructions_to_generate, }, "drop_duplicates": ["output"], } @@ -53,7 +43,7 @@ class SimpleKnowledgeFlow(_SimpleFlow): def get_flow(self) -> list: flow = super().get_flow() flow[0]["block_config"]["config_path"] = os.path.join( - self.sdg_base, "configs/knowledge/simple_generate_qa.yaml" + self.ctx.sdg_base, "configs/knowledge/simple_generate_qa.yaml" ) flow[0]["block_config"]["block_name"] = "gen_knowledge" return flow @@ -63,10 +53,9 @@ class SimpleFreeformSkillFlow(_SimpleFlow): def get_flow(self) -> list: flow = super().get_flow() flow[0]["block_config"]["config_path"] = os.path.join( - self.sdg_base, "configs/skills/simple_generate_qa_freeform.yaml" + self.ctx.sdg_base, "configs/skills/simple_generate_qa_freeform.yaml" ) flow[0]["block_config"]["block_name"] = "gen_skill_freeform" - flow[0]["block_config"]["block_name"] = "gen_skill_freeform" return flow @@ -74,7 +63,7 @@ class SimpleGroundedSkillFlow(_SimpleFlow): def get_flow(self) -> list: flow = super().get_flow() flow[0]["block_config"]["config_path"] = os.path.join( - self.sdg_base, "configs/skills/simple_generate_qa_grounded.yaml" + self.ctx.sdg_base, "configs/skills/simple_generate_qa_grounded.yaml" ) flow[0]["block_config"]["block_name"] = "gen_skill_grounded" return flow @@ -82,18 +71,14 @@ def get_flow(self) -> list: class MMLUBenchFlow(Flow): def get_flow(self) -> list: - self.sdg_base = resources.files(__package__) return [ { "block_type": LLMBlock, "block_config": { "block_name": "gen_mmlu_knowledge", "config_path": os.path.join( - self.sdg_base, "configs/knowledge/mcq_generation.yaml" + self.ctx.sdg_base, "configs/knowledge/mcq_generation.yaml" ), - "client": self.client, - "model_id": self.model_id, - "model_family": self.model_family, "output_cols": ["mmlubench_question", "mmlubench_answer"], }, "gen_kwargs": { @@ -113,12 +98,9 @@ def get_flow(self) -> list: "block_config": { "block_name": "gen_knowledge", "config_path": os.path.join( - self.sdg_base, + self.ctx.sdg_base, "configs/knowledge/generate_questions_responses.yaml", ), - "client": self.client, - "model_id": self.model_id, - "model_family": self.model_family, "output_cols": ["question", "response"], "parser_kwargs": { 
"parser_name": "custom", @@ -136,11 +118,9 @@ def get_flow(self) -> list: "block_config": { "block_name": "eval_faithfulness_qa_pair", "config_path": os.path.join( - self.sdg_base, "configs/knowledge/evaluate_faithfulness.yaml" + self.ctx.sdg_base, + "configs/knowledge/evaluate_faithfulness.yaml", ), - "client": self.client, - "model_id": self.model_id, - "model_family": self.model_family, "output_cols": ["explanation", "judgment"], }, "gen_kwargs": { @@ -165,11 +145,9 @@ def get_flow(self) -> list: "block_config": { "block_name": "eval_relevancy_qa_pair", "config_path": os.path.join( - self.sdg_base, "configs/knowledge/evaluate_relevancy.yaml" + self.ctx.sdg_base, + "configs/knowledge/evaluate_relevancy.yaml", ), - "client": self.client, - "model_id": self.model_id, - "model_family": self.model_family, "output_cols": ["feedback", "score"], }, "gen_kwargs": { @@ -195,11 +173,8 @@ def get_flow(self) -> list: "block_config": { "block_name": "eval_verify_question", "config_path": os.path.join( - self.sdg_base, "configs/knowledge/evaluate_question.yaml" + self.ctx.sdg_base, "configs/knowledge/evaluate_question.yaml" ), - "client": self.client, - "model_id": self.model_id, - "model_family": self.model_family, "output_cols": ["explanation", "rating"], }, "gen_kwargs": { @@ -231,15 +206,12 @@ def get_flow(self) -> list: "block_config": { "block_name": "gen_questions", "config_path": os.path.join( - self.sdg_base, + self.ctx.sdg_base, "configs/skills/freeform_questions.yaml", ), - "client": self.client, - "model_id": self.model_id, - "model_family": self.model_family, "output_cols": ["question"], "batch_kwargs": { - "num_samples": self.num_instructions_to_generate, + "num_samples": self.ctx.num_instructions_to_generate, }, }, "drop_duplicates": ["question"], @@ -249,12 +221,9 @@ def get_flow(self) -> list: "block_config": { "block_name": "eval_questions", "config_path": os.path.join( - self.sdg_base, + self.ctx.sdg_base, "configs/skills/evaluate_freeform_questions.yaml", ), - "client": self.client, - "model_id": self.model_id, - "model_family": self.model_family, "output_cols": ["evaluation", "score"], }, }, @@ -277,12 +246,9 @@ def get_flow(self) -> list: "block_config": { "block_name": "gen_responses", "config_path": os.path.join( - self.sdg_base, + self.ctx.sdg_base, "configs/skills/freeform_responses.yaml", ), - "client": self.client, - "model_id": self.model_id, - "model_family": self.model_family, "output_cols": ["response"], }, }, @@ -291,12 +257,9 @@ def get_flow(self) -> list: "block_config": { "block_name": "evaluate_qa_pair", "config_path": os.path.join( - self.sdg_base, + self.ctx.sdg_base, "configs/skills/evaluate_freeform_pair.yaml", ), - "client": self.client, - "model_id": self.model_id, - "model_family": self.model_family, "output_cols": ["evaluation", "score"], }, }, @@ -325,18 +288,15 @@ def get_flow(self) -> list: "block_config": { "block_name": "gen_contexts", "config_path": os.path.join( - self.sdg_base, + self.ctx.sdg_base, "configs/skills/contexts.yaml", ), - "client": self.client, - "model_id": self.model_id, - "model_family": self.model_family, "output_cols": ["context"], }, "gen_kwargs": { "temperature": 0.7, "max_tokens": 2048, - "n": self.num_instructions_to_generate, + "n": self.ctx.num_instructions_to_generate, }, "drop_duplicates": ["context"], }, @@ -345,12 +305,9 @@ def get_flow(self) -> list: "block_config": { "block_name": "gen_grounded_questions", "config_path": os.path.join( - self.sdg_base, + self.ctx.sdg_base, "configs/skills/grounded_questions.yaml", ), - 
"client": self.client, - "model_id": self.model_id, - "model_family": self.model_family, "output_cols": ["question"], "batch_kwargs": { "num_samples": 3, @@ -363,12 +320,9 @@ def get_flow(self) -> list: "block_config": { "block_name": "eval_grounded_questions", "config_path": os.path.join( - self.sdg_base, + self.ctx.sdg_base, "configs/skills/evaluate_grounded_questions.yaml", ), - "client": self.client, - "model_id": self.model_id, - "model_family": self.model_family, "output_cols": ["evaluation", "score"], }, }, @@ -391,12 +345,9 @@ def get_flow(self) -> list: "block_config": { "block_name": "gen_grounded_responses", "config_path": os.path.join( - self.sdg_base, + self.ctx.sdg_base, "configs/skills/grounded_responses.yaml", ), - "client": self.client, - "model_id": self.model_id, - "model_family": self.model_family, "output_cols": ["response"], }, }, @@ -405,12 +356,9 @@ def get_flow(self) -> list: "block_config": { "block_name": "evaluate_grounded_qa_pair", "config_path": os.path.join( - self.sdg_base, + self.ctx.sdg_base, "configs/skills/evaluate_grounded_pair.yaml", ), - "client": self.client, - "model_id": self.model_id, - "model_family": self.model_family, "output_cols": ["evaluation", "score"], }, }, diff --git a/src/instructlab/sdg/filterblock.py b/src/instructlab/sdg/filterblock.py index f5551b02..609ce142 100644 --- a/src/instructlab/sdg/filterblock.py +++ b/src/instructlab/sdg/filterblock.py @@ -11,12 +11,19 @@ class FilterByValueBlock(Block): def __init__( - self, filter_column, filter_value, operation, convert_dtype=None, **batch_kwargs + self, + ctx, + filter_column, + filter_value, + operation, + convert_dtype=None, + **batch_kwargs, ) -> None: """ Initializes a new instance of the FilterByValueBlock class. Parameters: + - ctx (PipelineContext): A PipelineContext object containing runtime parameters. - filter_column (str): The name of the column in the dataset to apply the filter on. - filter_value (any or list of any): The value(s) to filter by. - operation (callable): A function that takes two arguments (column value and filter value) and returns a boolean indicating whether the row should be included in the filtered dataset. 
@@ -26,7 +33,7 @@ def __init__( Returns: None """ - super().__init__(block_name=self.__class__.__name__) + super().__init__(ctx, block_name=self.__class__.__name__) self.value = filter_value if isinstance(filter_value, list) else [filter_value] self.column_name = filter_column self.operation = operation diff --git a/src/instructlab/sdg/generate_data.py b/src/instructlab/sdg/generate_data.py index 89a3ae5b..abcd6665 100644 --- a/src/instructlab/sdg/generate_data.py +++ b/src/instructlab/sdg/generate_data.py @@ -27,7 +27,7 @@ SynthSkillsFlow, ) from instructlab.sdg.llmblock import MODEL_FAMILY_MERLINITE, MODEL_FAMILY_MIXTRAL -from instructlab.sdg.pipeline import Pipeline +from instructlab.sdg.pipeline import Pipeline, PipelineContext from instructlab.sdg.utils import models from instructlab.sdg.utils.taxonomy import ( leaf_node_to_samples, @@ -183,37 +183,25 @@ def _sdg_init(pipeline, client, model_family, model_name, num_instructions_to_ge else: raise utils.GenerateException(f"Error: pipeline ({pipeline}) is not supported.") - sdg_knowledge = SDG( - [ - Pipeline( - flow_type( - client, model_family, model_name, num_instructions_to_generate - ).get_flow() - ) - for flow_type in knowledge_flow_types - ] - ) - sdg_freeform_skill = SDG( - [ - Pipeline( - flow_type( - client, model_family, model_name, num_instructions_to_generate - ).get_flow() - ) - for flow_type in freeform_skill_flow_types - ] + ctx = PipelineContext( + client, model_family, model_name, num_instructions_to_generate ) - sdg_grounded_skill = SDG( - [ - Pipeline( - flow_type( - client, model_family, model_name, num_instructions_to_generate - ).get_flow() - ) - for flow_type in grounded_skill_flow_types - ] + + def build_pipeline(flow_types): + block_configs = [] + for flow_type in flow_types: + block_configs.extend(flow_type(ctx).get_flow()) + return Pipeline(ctx, block_configs) + + knowledge_pipeline = build_pipeline(knowledge_flow_types) + freeform_skill_pipeline = build_pipeline(freeform_skill_flow_types) + grounded_skill_pipeline = build_pipeline(grounded_skill_flow_types) + + return ( + SDG([knowledge_pipeline]), + SDG([freeform_skill_pipeline]), + SDG([grounded_skill_pipeline]), ) - return sdg_knowledge, sdg_freeform_skill, sdg_grounded_skill # TODO - parameter removal needs to be done in sync with a CLI change. 
diff --git a/src/instructlab/sdg/llmblock.py b/src/instructlab/sdg/llmblock.py index ad21dd68..eaa58556 100644 --- a/src/instructlab/sdg/llmblock.py +++ b/src/instructlab/sdg/llmblock.py @@ -55,39 +55,36 @@ class LLMBlock(Block): # pylint: disable=too-many-instance-attributes def __init__( self, + ctx, block_name, config_path, - client, - model_id, - model_family, output_cols, parser_kwargs={}, **batch_kwargs, ) -> None: - super().__init__(block_name) + super().__init__(ctx, block_name) self.block_config = self._load_config(config_path) self.prompt_struct = ( """{system}\n{introduction}\n{principles}\n{examples}\n{generation}""" ) self.prompt_template = self.prompt_struct.format(**self.block_config) - self.client = client - self.model = model_id - self.model_family = model_family - self.model_prompt = _get_model_prompt(self.model_family) + self.model_prompt = _get_model_prompt(self.ctx.model_family) self.output_cols = output_cols self.batch_params = batch_kwargs.get("batch_kwargs", {}) self.parser_name = parser_kwargs.get("parser_name", None) self.parsing_pattern = parser_kwargs.get("parsing_pattern", None) self.parser_cleanup_tags = parser_kwargs.get("parser_cleanup_tags", None) self.defaults = { - "model": self.model, + "model": self.ctx.model_id, "temperature": 0, "max_tokens": 12000, } # Whether the LLM server supports a list of input prompts # and supports the n parameter to generate n outputs per input - self.server_supports_batched = server_supports_batched(client, model_id) + self.server_supports_batched = server_supports_batched( + self.ctx.client, self.ctx.model_id + ) def _parse(self, generated_string) -> dict: matches = {} @@ -137,14 +134,16 @@ def _generate(self, samples, **gen_kwargs) -> list: generate_args = {**self.defaults, **gen_kwargs} if self.server_supports_batched: - response = self.client.completions.create(prompt=prompts, **generate_args) + response = self.ctx.client.completions.create( + prompt=prompts, **generate_args + ) return [choice.text.strip() for choice in response.choices] n = gen_kwargs.get("n", 1) results = [] for prompt in prompts: for _ in range(n): - response = self.client.completions.create( + response = self.ctx.client.completions.create( prompt=prompt, **generate_args ) results.append(response.choices[0].text.strip()) @@ -207,22 +206,18 @@ def generate(self, samples: Dataset, **gen_kwargs) -> Dataset: class ConditionalLLMBlock(LLMBlock): def __init__( self, + ctx, block_name, config_paths, - client, - model_id, - model_family, output_cols, selector_column_name, parser_kwargs={}, **batch_kwargs, ) -> None: super().__init__( + ctx, block_name, config_paths[0][0], - client, - model_id, - model_family, output_cols, parser_kwargs=parser_kwargs, **batch_kwargs, diff --git a/src/instructlab/sdg/pipeline.py b/src/instructlab/sdg/pipeline.py index bc570a83..93464601 100644 --- a/src/instructlab/sdg/pipeline.py +++ b/src/instructlab/sdg/pipeline.py @@ -1,4 +1,7 @@ # SPDX-License-Identifier: Apache-2.0 +# Standard +from importlib import resources + # Third Party from datasets import Dataset @@ -8,12 +11,25 @@ logger = setup_logger(__name__) +class PipelineContext: + def __init__( + self, client, model_family, model_id, num_instructions_to_generate + ) -> None: + self.client = client + self.model_family = model_family + self.model_id = model_id + self.num_instructions_to_generate = num_instructions_to_generate + self.sdg_base = resources.files(__package__) + + class Pipeline: - def __init__(self, chained_blocks: list) -> None: + def __init__(self, ctx, 
chained_blocks: list) -> None: """ Initialize the Pipeline class with a configuration dictionary. config_dict: the run config py or yaml loaded into a dictionary """ + # ctx is a PipelineContext object that supplies context configuration to every block + self.ctx = ctx # pipeline config is the run configuration that consists of the pipeline steps self.chained_blocks = chained_blocks @@ -36,7 +52,7 @@ def generate(self, dataset) -> Dataset: drop_columns = block_prop.get("drop_columns", []) gen_kwargs = block_prop.get("gen_kwargs", {}) drop_duplicates_cols = block_prop.get("drop_duplicates", False) - block = block_type(**block_config) + block = block_type(self.ctx, **block_config) logger.info("Running block: %s", block_config["block_name"]) logger.info(dataset) diff --git a/src/instructlab/sdg/utilblocks.py b/src/instructlab/sdg/utilblocks.py index db04b5a1..a93a5742 100644 --- a/src/instructlab/sdg/utilblocks.py +++ b/src/instructlab/sdg/utilblocks.py @@ -10,9 +10,11 @@ class SamplePopulatorBlock(Block): - def __init__(self, config_paths, column_name, post_fix="", **batch_kwargs) -> None: + def __init__( + self, ctx, config_paths, column_name, post_fix="", **batch_kwargs + ) -> None: super().__init__( - block_name=self.__class__.__name__ + ctx, block_name=self.__class__.__name__ ) # Call the base class's __init__ self.configs = {} for config in config_paths: @@ -35,8 +37,8 @@ def generate(self, samples) -> Dataset: class SelectorBlock(Block): - def __init__(self, choice_map, choice_col, output_col, **batch_kwargs) -> None: - super().__init__(block_name=self.__class__.__name__) + def __init__(self, ctx, choice_map, choice_col, output_col, **batch_kwargs) -> None: + super().__init__(ctx, block_name=self.__class__.__name__) self.choice_map = choice_map self.choice_col = choice_col self.output_col = output_col @@ -52,8 +54,10 @@ def generate(self, samples: Dataset) -> Dataset: class CombineColumnsBlock(Block): - def __init__(self, columns, output_col, separator="\n\n", **batch_kwargs) -> None: - super().__init__(block_name=self.__class__.__name__) + def __init__( + self, ctx, columns, output_col, separator="\n\n", **batch_kwargs + ) -> None: + super().__init__(ctx, block_name=self.__class__.__name__) self.columns = columns self.output_col = output_col self.separator = separator diff --git a/tests/test_filterblock.py b/tests/test_filterblock.py index 7b8b1ce7..53531fd0 100644 --- a/tests/test_filterblock.py +++ b/tests/test_filterblock.py @@ -8,17 +8,20 @@ # First Party from instructlab.sdg.filterblock import FilterByValueBlock +from instructlab.sdg.pipeline import PipelineContext class TestFilterByValueBlock(unittest.TestCase): def setUp(self): self.block = FilterByValueBlock( + PipelineContext(None, None, None, None), filter_column="age", filter_value=30, operation=operator.eq, convert_dtype=int, ) self.block_with_list = FilterByValueBlock( + PipelineContext(None, None, None, None), filter_column="age", filter_value=[30, 35], operation=operator.eq, From 7cfbaa9dc11a6ad338a5aabc133cb1a8736142a8 Mon Sep 17 00:00:00 2001 From: Mark McLoughlin Date: Wed, 10 Jul 2024 13:13:06 -0400 Subject: [PATCH 03/10] Fix multiprocessing issues in FilterByValueBlock This addresses issues with using num_proc>1 with Dataset.map() and Dataset.filter(). 
The first issue is: ``` File "/usr/lib64/python3.11/pickle.py", line 578, in save rv = reduce(self.proto) ^^^^^^^^^^^^^^^^^^ TypeError: cannot pickle 'SSLContext' object ``` What was happening here is that the entire FilterByValueBlock object was being serialized to be sent to the multiprocessing worker. And now that this includes PipelineContext, which includes the OpenAI client object, which includes SSLContext, we hit a known issue: uqfoundation/dill#308 The second issue is specific to map(): ``` ValueError: The features can't be aligned because the key score of features {'task_description': Value(dtype='string', id=None), 'seed_question': Value(dtype='string', id=None), 'seed_response': Value(dtype='string', id=None), 'num_samples': Value(dtype='int64', id=None), 'question': Value(dtype='string', id=None), '__index_level_0__': Value(dtype='int64', id=None), 'evaluation': Value(dtype='string', id=None), 'score': Value(dtype='string', id=None)} has unexpected type - Value(dtype='string', id=None) (expected either Value(dtype='float64', id=None) or Value("null"). ``` It appears that in the datasets library, only in the case of num_proc>1, when we hit the "error converting dtype" case and set the column to None, the column ends up still being considered a string column rather than the new dtype. This second issue deserves further investigation and may require a fix to the datasets library. Signed-off-by: Mark McLoughlin --- src/instructlab/sdg/filterblock.py | 55 ++++++++++++++++++++---------- 1 file changed, 37 insertions(+), 18 deletions(-) diff --git a/src/instructlab/sdg/filterblock.py b/src/instructlab/sdg/filterblock.py index 609ce142..96d3e7af 100644 --- a/src/instructlab/sdg/filterblock.py +++ b/src/instructlab/sdg/filterblock.py @@ -9,6 +9,39 @@ logger = setup_logger(__name__) +# Note - this is not a method on the class below in order to avoid +# serializing the object itself when multi-processing is used. +# In particular, SSLContext - embedded in the OpenAI client object - +# cannot be pickled. +def _filter_by_values(samples, column, op, values, num_proc=1): + return samples.filter( + lambda x: any(op(x[column], value) for value in values), + num_proc=num_proc, + ) + + +def _map_dtype(samples, column, dtype, num_proc=1): + def convert_column(sample): + try: + sample[column] = dtype(sample[column]) + except ValueError as e: + logger.error( + "Error converting dtype: %s, filling with None to be filtered later", e + ) + sample[column] = None + return sample + + # FIXME: it appears multiprocessing map has issues with + # None columns. If we pass num_proc>1 here and the error + # case is triggered above, we get: + # ValueError: The features can't be aligned ... + # because the column is still considered a string not + # the new dtype.
+ num_proc = 1 + + return samples.map(convert_column, num_proc=num_proc) + + class FilterByValueBlock(Block): def __init__( self, @@ -40,26 +73,12 @@ def __init__( self.column_name = filter_column self.operation = operation self.convert_dtype = convert_dtype - self.num_procs = batch_kwargs.get("num_procs", 1) - def _convert_dtype(self, sample): - try: - sample[self.column_name] = self.convert_dtype(sample[self.column_name]) - except ValueError as e: - logger.error( - "Error converting dtype: %s, filling with None to be filtered later", e - ) - sample[self.column_name] = None - return sample - def generate(self, samples) -> Dataset: if self.convert_dtype: - samples = samples.map( - self._convert_dtype, - num_proc=self.num_procs, + samples = _map_dtype( + samples, self.column_name, self.convert_dtype, self.num_procs ) - return samples.filter( - lambda x: any( - self.operation(x[self.column_name], value) for value in self.value - ), - num_proc=self.num_procs, + return _filter_by_values( + samples, self.column_name, self.operation, self.value, self.num_procs ) From 9d925486db22b7ff3fed66822912e894ecbcbd39 Mon Sep 17 00:00:00 2001 From: Mark McLoughlin Date: Thu, 11 Jul 2024 22:08:43 +0100 Subject: [PATCH 04/10] Fix multiprocessing issues in utilblocks Address the following issue with using num_proc>1 with Dataset.map(): ``` File "/usr/lib64/python3.11/pickle.py", line 578, in save rv = reduce(self.proto) ^^^^^^^^^^^^^^^^^^ TypeError: cannot pickle 'SSLContext' object ``` The entire block object is being serialized to be sent to the multiprocessing worker. And now that this includes PipelineContext, which includes the OpenAI client object, which includes SSLContext, we hit a known issue: uqfoundation/dill#308 Signed-off-by: Mark McLoughlin --- src/instructlab/sdg/utilblocks.py | 53 +++++++++++++++++++---------- 1 file changed, 36 insertions(+), 17 deletions(-) diff --git a/src/instructlab/sdg/utilblocks.py b/src/instructlab/sdg/utilblocks.py index a93a5742..4da8330c 100644 --- a/src/instructlab/sdg/utilblocks.py +++ b/src/instructlab/sdg/utilblocks.py @@ -27,13 +27,18 @@ def __init__( self.column_name = column_name self.num_procs = batch_kwargs.get("num_procs", 8) - def _generate(self, sample) -> dict: - sample = {**sample, **self.configs[sample[self.column_name]]} - return sample + # Using a static method to avoid serializing self when using multiprocessing + @staticmethod + def _map_populate_samples(samples, configs, column_name, num_proc=1): + def populate(sample): + return {**sample, **configs[sample[column_name]]} + + return samples.map(populate, num_proc=num_proc) def generate(self, samples) -> Dataset: - samples = samples.map(self._generate, num_proc=self.num_procs) -
return samples + return self._map_populate_samples( + samples, self.configs, self.column_name, self.num_procs + ) class SelectorBlock(Block): @@ -44,13 +49,23 @@ def __init__(self, ctx, choice_map, choice_col, output_col, **batch_kwargs) -> N self.output_col = output_col self.num_procs = batch_kwargs.get("num_procs", 8) - def _generate(self, sample) -> dict: - sample[self.output_col] = sample[self.choice_map[sample[self.choice_col]]] - return sample + # Using a static method to avoid serializing self when using multiprocessing + @staticmethod + def _map_select_choice(samples, choice_map, choice_col, output_col, num_proc=1): + def select_choice(sample) -> dict: + sample[output_col] = sample[choice_map[sample[choice_col]]] + return sample + + return samples.map(select_choice, num_proc=num_proc) def generate(self, samples: Dataset) -> Dataset: - samples = samples.map(self._generate, num_proc=self.num_procs) - return samples + return self._map_select_choice( + samples, + self.choice_map, + self.choice_col, + self.output_col, + self.num_procs, + ) class CombineColumnsBlock(Block): @@ -63,12 +78,16 @@ def __init__( self.separator = separator self.num_procs = batch_kwargs.get("num_procs", 8) - def _generate(self, sample) -> dict: - sample[self.output_col] = self.separator.join( - [sample[col] for col in self.columns] - ) - return sample + # Using a static method to avoid serializing self when using multiprocessing + @staticmethod + def _map_combine(samples, columns, output_col, separator, num_proc=1): + def combine(sample): + sample[output_col] = separator.join([sample[col] for col in columns]) + return sample + + return samples.map(combine, num_proc=num_proc) def generate(self, samples: Dataset) -> Dataset: return self._map_combine( samples, self.columns, self.output_col, self.separator, self.num_procs ) From 23dd08ea73804a5d765dad0214f1bbb160c1ba66 Mon Sep 17 00:00:00 2001 From: Mark McLoughlin Date: Fri, 12 Jul 2024 00:02:39 +0100 Subject: [PATCH 05/10] Allow block_config.config_path to be relative In order to remove another runtime parameter from pipeline definitions to allow us to move to using YAML files. Signed-off-by: Mark McLoughlin --- src/instructlab/sdg/block.py | 6 +++ src/instructlab/sdg/default_flows.py | 81 +++++++--------------------- 2 files changed, 26 insertions(+), 61 deletions(-) diff --git a/src/instructlab/sdg/block.py b/src/instructlab/sdg/block.py index e8807420..a28136c4 100644 --- a/src/instructlab/sdg/block.py +++ b/src/instructlab/sdg/block.py @@ -3,6 +3,7 @@ from abc import ABC from collections import ChainMap from typing import Any, Dict, Union +import os.path # Third Party import yaml @@ -42,8 +43,13 @@ def _load_config(self, config_path: str) -> Union[Dict[str, Any], None]: """ Load the configuration file for this block. + If the supplied configuration file is a relative path, it is assumed + to be part of this Python package. + :param config_path: The path to the configuration file. :return: The loaded configuration. 
""" + if not os.path.isabs(config_path): + config_path = os.path.join(self.ctx.sdg_base, config_path) with open(config_path, "r", encoding="utf-8") as config_file: return yaml.safe_load(config_file) diff --git a/src/instructlab/sdg/default_flows.py b/src/instructlab/sdg/default_flows.py index ab6396d2..2839e212 100644 --- a/src/instructlab/sdg/default_flows.py +++ b/src/instructlab/sdg/default_flows.py @@ -2,7 +2,6 @@ # Standard from abc import ABC, abstractmethod import operator -import os # Local from .filterblock import FilterByValueBlock @@ -42,8 +41,8 @@ def get_flow(self) -> list: class SimpleKnowledgeFlow(_SimpleFlow): def get_flow(self) -> list: flow = super().get_flow() - flow[0]["block_config"]["config_path"] = os.path.join( - self.ctx.sdg_base, "configs/knowledge/simple_generate_qa.yaml" + flow[0]["block_config"]["config_path"] = ( + "configs/knowledge/simple_generate_qa.yaml" ) flow[0]["block_config"]["block_name"] = "gen_knowledge" return flow @@ -52,8 +51,8 @@ def get_flow(self) -> list: class SimpleFreeformSkillFlow(_SimpleFlow): def get_flow(self) -> list: flow = super().get_flow() - flow[0]["block_config"]["config_path"] = os.path.join( - self.ctx.sdg_base, "configs/skills/simple_generate_qa_freeform.yaml" + flow[0]["block_config"]["config_path"] = ( + "configs/skills/simple_generate_qa_freeform.yaml" ) flow[0]["block_config"]["block_name"] = "gen_skill_freeform" return flow @@ -62,8 +61,8 @@ def get_flow(self) -> list: class SimpleGroundedSkillFlow(_SimpleFlow): def get_flow(self) -> list: flow = super().get_flow() - flow[0]["block_config"]["config_path"] = os.path.join( - self.ctx.sdg_base, "configs/skills/simple_generate_qa_grounded.yaml" + flow[0]["block_config"]["config_path"] = ( + "configs/skills/simple_generate_qa_grounded.yaml" ) flow[0]["block_config"]["block_name"] = "gen_skill_grounded" return flow @@ -76,9 +75,7 @@ def get_flow(self) -> list: "block_type": LLMBlock, "block_config": { "block_name": "gen_mmlu_knowledge", - "config_path": os.path.join( - self.ctx.sdg_base, "configs/knowledge/mcq_generation.yaml" - ), + "config_path": "configs/knowledge/mcq_generation.yaml", "output_cols": ["mmlubench_question", "mmlubench_answer"], }, "gen_kwargs": { @@ -97,10 +94,7 @@ def get_flow(self) -> list: "block_type": LLMBlock, "block_config": { "block_name": "gen_knowledge", - "config_path": os.path.join( - self.ctx.sdg_base, - "configs/knowledge/generate_questions_responses.yaml", - ), + "config_path": "configs/knowledge/generate_questions_responses.yaml", "output_cols": ["question", "response"], "parser_kwargs": { "parser_name": "custom", @@ -117,10 +111,7 @@ def get_flow(self) -> list: "block_type": LLMBlock, "block_config": { "block_name": "eval_faithfulness_qa_pair", - "config_path": os.path.join( - self.ctx.sdg_base, - "configs/knowledge/evaluate_faithfulness.yaml", - ), + "config_path": "configs/knowledge/evaluate_faithfulness.yaml", "output_cols": ["explanation", "judgment"], }, "gen_kwargs": { @@ -144,10 +135,7 @@ def get_flow(self) -> list: "block_type": LLMBlock, "block_config": { "block_name": "eval_relevancy_qa_pair", - "config_path": os.path.join( - self.ctx.sdg_base, - "configs/knowledge/evaluate_relevancy.yaml", - ), + "config_path": "configs/knowledge/evaluate_relevancy.yaml", "output_cols": ["feedback", "score"], }, "gen_kwargs": { @@ -172,9 +160,7 @@ def get_flow(self) -> list: "block_type": LLMBlock, "block_config": { "block_name": "eval_verify_question", - "config_path": os.path.join( - self.ctx.sdg_base, "configs/knowledge/evaluate_question.yaml" - 
), + "config_path": "configs/knowledge/evaluate_question.yaml", "output_cols": ["explanation", "rating"], }, "gen_kwargs": { @@ -205,10 +191,7 @@ def get_flow(self) -> list: "block_type": LLMBlock, "block_config": { "block_name": "gen_questions", - "config_path": os.path.join( - self.ctx.sdg_base, - "configs/skills/freeform_questions.yaml", - ), + "config_path": "configs/skills/freeform_questions.yaml", "output_cols": ["question"], "batch_kwargs": { "num_samples": self.ctx.num_instructions_to_generate, @@ -220,10 +203,7 @@ def get_flow(self) -> list: "block_type": LLMBlock, "block_config": { "block_name": "eval_questions", - "config_path": os.path.join( - self.ctx.sdg_base, - "configs/skills/evaluate_freeform_questions.yaml", - ), + "config_path": "configs/skills/evaluate_freeform_questions.yaml", "output_cols": ["evaluation", "score"], }, }, @@ -245,10 +225,7 @@ def get_flow(self) -> list: "block_type": LLMBlock, "block_config": { "block_name": "gen_responses", - "config_path": os.path.join( - self.ctx.sdg_base, - "configs/skills/freeform_responses.yaml", - ), + "config_path": "configs/skills/freeform_responses.yaml", "output_cols": ["response"], }, }, @@ -256,10 +233,7 @@ def get_flow(self) -> list: "block_type": LLMBlock, "block_config": { "block_name": "evaluate_qa_pair", - "config_path": os.path.join( - self.ctx.sdg_base, - "configs/skills/evaluate_freeform_pair.yaml", - ), + "config_path": "configs/skills/evaluate_freeform_pair.yaml", "output_cols": ["evaluation", "score"], }, }, @@ -287,10 +261,7 @@ def get_flow(self) -> list: "block_type": LLMBlock, "block_config": { "block_name": "gen_contexts", - "config_path": os.path.join( - self.ctx.sdg_base, - "configs/skills/contexts.yaml", - ), + "config_path": "configs/skills/contexts.yaml", "output_cols": ["context"], }, "gen_kwargs": { @@ -304,10 +275,7 @@ def get_flow(self) -> list: "block_type": LLMBlock, "block_config": { "block_name": "gen_grounded_questions", - "config_path": os.path.join( - self.ctx.sdg_base, - "configs/skills/grounded_questions.yaml", - ), + "config_path": "configs/skills/grounded_questions.yaml", "output_cols": ["question"], "batch_kwargs": { "num_samples": 3, @@ -319,10 +287,7 @@ def get_flow(self) -> list: "block_type": LLMBlock, "block_config": { "block_name": "eval_grounded_questions", - "config_path": os.path.join( - self.ctx.sdg_base, - "configs/skills/evaluate_grounded_questions.yaml", - ), + "config_path": "configs/skills/evaluate_grounded_questions.yaml", "output_cols": ["evaluation", "score"], }, }, @@ -344,10 +309,7 @@ def get_flow(self) -> list: "block_type": LLMBlock, "block_config": { "block_name": "gen_grounded_responses", - "config_path": os.path.join( - self.ctx.sdg_base, - "configs/skills/grounded_responses.yaml", - ), + "config_path": "configs/skills/grounded_responses.yaml", "output_cols": ["response"], }, }, @@ -355,10 +317,7 @@ def get_flow(self) -> list: "block_type": LLMBlock, "block_config": { "block_name": "evaluate_grounded_qa_pair", - "config_path": os.path.join( - self.ctx.sdg_base, - "configs/skills/evaluate_grounded_pair.yaml", - ), + "config_path": "configs/skills/evaluate_grounded_pair.yaml", "output_cols": ["evaluation", "score"], }, }, From 9fc272ca1e01962410ba09c87def800b29629076 Mon Sep 17 00:00:00 2001 From: Mark McLoughlin Date: Fri, 12 Jul 2024 00:14:40 +0100 Subject: [PATCH 06/10] Fix block_name handling All Block subclasses but LLMBlock are failing to pass the block_name from block_config down to the base class, instead they are incorrectly passing the block type as its 
name. Signed-off-by: Mark McLoughlin --- src/instructlab/sdg/filterblock.py | 4 +++- src/instructlab/sdg/utilblocks.py | 16 ++++++++-------- tests/test_filterblock.py | 2 ++ 3 files changed, 13 insertions(+), 9 deletions(-) diff --git a/src/instructlab/sdg/filterblock.py b/src/instructlab/sdg/filterblock.py index 96d3e7af..afb58b7b 100644 --- a/src/instructlab/sdg/filterblock.py +++ b/src/instructlab/sdg/filterblock.py @@ -46,6 +46,7 @@ class FilterByValueBlock(Block): def __init__( self, ctx, + block_name, filter_column, filter_value, operation, @@ -57,6 +58,7 @@ def __init__( Parameters: - ctx (PipelineContext): A PipelineContext object containing runtime parameters. + - block_name (str): An identifier for this block. - filter_column (str): The name of the column in the dataset to apply the filter on. - filter_value (any or list of any): The value(s) to filter by. - operation (callable): A function that takes two arguments (column value and filter value) and returns a boolean indicating whether the row should be included in the filtered dataset. @@ -66,7 +68,7 @@ def __init__( Returns: None """ - super().__init__(ctx, block_name=self.__class__.__name__) + super().__init__(ctx, block_name) self.value = filter_value if isinstance(filter_value, list) else [filter_value] self.column_name = filter_column self.operation = operation diff --git a/src/instructlab/sdg/utilblocks.py b/src/instructlab/sdg/utilblocks.py index 4da8330c..871b2ce8 100644 --- a/src/instructlab/sdg/utilblocks.py +++ b/src/instructlab/sdg/utilblocks.py @@ -11,11 +11,9 @@ class SamplePopulatorBlock(Block): def __init__( - self, ctx, config_paths, column_name, post_fix="", **batch_kwargs + self, ctx, block_name, config_paths, column_name, post_fix="", **batch_kwargs ) -> None: - super().__init__( - ctx, block_name=self.__class__.__name__ - ) # Call the base class's __init__ + super().__init__(ctx, block_name) self.configs = {} for config in config_paths: if post_fix: @@ -42,8 +40,10 @@ def generate(self, samples) -> Dataset: class SelectorBlock(Block): - def __init__(self, ctx, choice_map, choice_col, output_col, **batch_kwargs) -> None: - super().__init__(ctx, block_name=self.__class__.__name__) + def __init__( + self, ctx, block_name, choice_map, choice_col, output_col, **batch_kwargs + ) -> None: + super().__init__(ctx, block_name) self.choice_map = choice_map self.choice_col = choice_col self.output_col = output_col @@ -70,9 +70,9 @@ def generate(self, samples: Dataset) -> Dataset: class CombineColumnsBlock(Block): def __init__( - self, ctx, columns, output_col, separator="\n\n", **batch_kwargs + self, ctx, block_name, columns, output_col, separator="\n\n", **batch_kwargs ) -> None: - super().__init__(ctx, block_name=self.__class__.__name__) + super().__init__(ctx, block_name) self.columns = columns self.output_col = output_col self.separator = separator diff --git a/tests/test_filterblock.py b/tests/test_filterblock.py index 53531fd0..5e00c80b 100644 --- a/tests/test_filterblock.py +++ b/tests/test_filterblock.py @@ -15,6 +15,7 @@ class TestFilterByValueBlock(unittest.TestCase): def setUp(self): self.block = FilterByValueBlock( PipelineContext(None, None, None, None), + block_name="filter_by_age", filter_column="age", filter_value=30, operation=operator.eq, @@ -22,6 +23,7 @@ def setUp(self): ) self.block_with_list = FilterByValueBlock( PipelineContext(None, None, None, None), + block_name="filter_by_ages", filter_column="age", filter_value=[30, 35], operation=operator.eq, From 8cb673b2b06d0734e08b6a018d5a4f3102589f67 
Mon Sep 17 00:00:00 2001 From: Mark McLoughlin Date: Fri, 12 Jul 2024 00:36:35 +0100 Subject: [PATCH 07/10] Move FilterByValue multiprocessing config to PipelineContext In every use of FilterByValue in the default flows, we use batch_kwargs to set num_proc=8. This doesn't appear to be a pipeline author concern, but rather a runtime parameter which should in future be based on the number of available CPUs and (perhaps) user configuration. For now, just move it from batch_kwargs to PipelineContext. Signed-off-by: Mark McLoughlin --- src/instructlab/sdg/default_flows.py | 25 ------------------------- src/instructlab/sdg/filterblock.py | 7 ++----- src/instructlab/sdg/pipeline.py | 2 ++ src/instructlab/sdg/utilblocks.py | 21 ++++++--------------- 4 files changed, 10 insertions(+), 45 deletions(-) diff --git a/src/instructlab/sdg/default_flows.py b/src/instructlab/sdg/default_flows.py index 2839e212..f7e0419e 100644 --- a/src/instructlab/sdg/default_flows.py +++ b/src/instructlab/sdg/default_flows.py @@ -125,9 +125,6 @@ def get_flow(self) -> list: "filter_column": "judgment", "filter_value": "YES", "operation": operator.eq, - "batch_kwargs": { - "num_procs": 8, - }, }, "drop_columns": ["judgment", "explanation"], }, @@ -150,9 +147,6 @@ def get_flow(self) -> list: "filter_value": 2.0, "operation": operator.eq, "convert_dtype": float, - "batch_kwargs": { - "num_procs": 8, - }, }, "drop_columns": ["feedback", "score"], }, @@ -175,9 +169,6 @@ def get_flow(self) -> list: "filter_value": 1.0, "operation": operator.eq, "convert_dtype": float, - "batch_kwargs": { - "num_procs": 8, - }, }, "drop_columns": ["explanation", "rating", "__index_level_0__"], }, @@ -215,9 +206,6 @@ def get_flow(self) -> list: "filter_value": 1.0, "operation": operator.eq, "convert_dtype": float, - "batch_kwargs": { - "num_procs": 8, - }, }, "drop_columns": ["evaluation", "score", "num_samples"], }, @@ -245,9 +233,6 @@ def get_flow(self) -> list: "filter_value": 2.0, "operation": operator.ge, "convert_dtype": float, - "batch_kwargs": { - "num_procs": 8, - }, }, "drop_columns": ["evaluation", "score"], }, @@ -299,9 +284,6 @@ def get_flow(self) -> list: "filter_value": 1.0, "operation": operator.eq, "convert_dtype": float, - "batch_kwargs": { - "num_procs": 8, - }, }, "drop_columns": ["evaluation", "score", "num_samples"], }, @@ -329,9 +311,6 @@ def get_flow(self) -> list: "filter_value": 2.0, "operation": operator.ge, "convert_dtype": float, - "batch_kwargs": { - "num_procs": 8, - }, }, }, { @@ -340,10 +319,6 @@ def get_flow(self) -> list: "block_name": "combine_question_and_context", "columns": ["context", "question"], "output_col": "question", - "batch_kwargs": { - "num_procs": 8, - "batched": True, - }, }, }, ] diff --git a/src/instructlab/sdg/filterblock.py b/src/instructlab/sdg/filterblock.py index afb58b7b..5b820df5 100644 --- a/src/instructlab/sdg/filterblock.py +++ b/src/instructlab/sdg/filterblock.py @@ -51,7 +51,6 @@ def __init__( filter_value, operation, convert_dtype=None, - **batch_kwargs, ) -> None: """ Initializes a new instance of the FilterByValueBlock class. @@ -63,7 +62,6 @@ def __init__( - filter_value (any or list of any): The value(s) to filter by. - operation (callable): A function that takes two arguments (column value and filter value) and returns a boolean indicating whether the row should be included in the filtered dataset. - convert_dtype (callable, optional): A function to convert the data type of the filter column before applying the filter. Defaults to None. 
- - **batch_kwargs: Additional kwargs for batch processing. Returns: None @@ -73,14 +71,13 @@ def __init__( self.column_name = filter_column self.operation = operation self.convert_dtype = convert_dtype - self.num_procs = batch_kwargs.get("num_procs", 1) def generate(self, samples) -> Dataset: if self.convert_dtype: samples = _map_dtype( - samples, self.column_name, self.convert_dtype, self.num_procs + samples, self.column_name, self.convert_dtype, self.ctx.num_procs ) return _filter_by_values( - samples, self.column_name, self.operation, self.value, self.num_procs + samples, self.column_name, self.operation, self.value, self.ctx.num_procs ) diff --git a/src/instructlab/sdg/pipeline.py b/src/instructlab/sdg/pipeline.py index 93464601..a9db1970 100644 --- a/src/instructlab/sdg/pipeline.py +++ b/src/instructlab/sdg/pipeline.py @@ -20,6 +20,8 @@ def __init__( self.model_id = model_id self.num_instructions_to_generate = num_instructions_to_generate self.sdg_base = resources.files(__package__) + # FIXME: base this on the available number of CPUs + self.num_procs = 8 class Pipeline: diff --git a/src/instructlab/sdg/utilblocks.py b/src/instructlab/sdg/utilblocks.py index 871b2ce8..b4e39a5b 100644 --- a/src/instructlab/sdg/utilblocks.py +++ b/src/instructlab/sdg/utilblocks.py @@ -10,9 +10,7 @@ class SamplePopulatorBlock(Block): - def __init__( - self, ctx, block_name, config_paths, column_name, post_fix="", **batch_kwargs - ) -> None: + def __init__(self, ctx, block_name, config_paths, column_name, post_fix="") -> None: super().__init__(ctx, block_name) self.configs = {} for config in config_paths: @@ -23,7 +21,6 @@ def __init__( config_key = config.split("/")[-1].split(".")[0] self.configs[config_key] = self._load_config(config_name) self.column_name = column_name - self.num_procs = batch_kwargs.get("num_procs", 8) # Using a static method to avoid serializing self when using multiprocessing @staticmethod @@ -35,19 +32,16 @@ def populate(sample): def generate(self, samples) -> Dataset: return self._map_populate_samples( - samples, self.configs, self.column_name, self.num_procs + samples, self.configs, self.column_name, self.ctx.num_procs ) class SelectorBlock(Block): - def __init__( - self, ctx, block_name, choice_map, choice_col, output_col, **batch_kwargs - ) -> None: + def __init__(self, ctx, block_name, choice_map, choice_col, output_col) -> None: super().__init__(ctx, block_name) self.choice_map = choice_map self.choice_col = choice_col self.output_col = output_col - self.num_procs = batch_kwargs.get("num_procs", 8) # Using a static method to avoid serializing self when using multiprocessing @staticmethod @@ -64,19 +58,16 @@ def generate(self, samples: Dataset) -> Dataset: self.choice_map, self.choice_col, self.output_col, - self.num_procs, + self.ctx.num_procs, ) class CombineColumnsBlock(Block): - def __init__( - self, ctx, block_name, columns, output_col, separator="\n\n", **batch_kwargs - ) -> None: + def __init__(self, ctx, block_name, columns, output_col, separator="\n\n") -> None: super().__init__(ctx, block_name) self.columns = columns self.output_col = output_col self.separator = separator - self.num_procs = batch_kwargs.get("num_procs", 8) # Using a static method to avoid serializing self when using multiprocessing @staticmethod @@ -89,5 +80,5 @@ def combine(sample): def generate(self, samples: Dataset) -> Dataset: return self._map_combine( - samples, self.columns, self.output_col, self.separator, self.num_procs + samples, self.columns, self.output_col, self.separator, 
self.ctx.num_procs ) From b956643bce940a2943d2c4c674d4115096fcb67b Mon Sep 17 00:00:00 2001 From: Mark McLoughlin Date: Mon, 8 Jul 2024 11:19:48 +0100 Subject: [PATCH 08/10] Add `add_num_samples` to LLMBlock config Two pipelines include an LLMBlock which use `{num_samples}` in their instructions to the teacher model. There needs to be some way to configure the LLMBlock so that `num_samples` will be included, but as per #82 (commit a01b04e) the value of `num_samples` should be based on the `num_instructions_to_generate` parameter. Signed-off-by: Mark McLoughlin --- src/instructlab/sdg/default_flows.py | 8 ++------ src/instructlab/sdg/llmblock.py | 11 ++++++++--- 2 files changed, 10 insertions(+), 9 deletions(-) diff --git a/src/instructlab/sdg/default_flows.py b/src/instructlab/sdg/default_flows.py index f7e0419e..056ac861 100644 --- a/src/instructlab/sdg/default_flows.py +++ b/src/instructlab/sdg/default_flows.py @@ -184,9 +184,7 @@ def get_flow(self) -> list: "block_name": "gen_questions", "config_path": "configs/skills/freeform_questions.yaml", "output_cols": ["question"], - "batch_kwargs": { - "num_samples": self.ctx.num_instructions_to_generate, - }, + "add_num_samples": True, }, "drop_duplicates": ["question"], }, @@ -262,9 +260,7 @@ def get_flow(self) -> list: "block_name": "gen_grounded_questions", "config_path": "configs/skills/grounded_questions.yaml", "output_cols": ["question"], - "batch_kwargs": { - "num_samples": 3, - }, + "add_num_samples": True, }, "drop_duplicates": ["question"], }, diff --git a/src/instructlab/sdg/llmblock.py b/src/instructlab/sdg/llmblock.py index eaa58556..4a32a708 100644 --- a/src/instructlab/sdg/llmblock.py +++ b/src/instructlab/sdg/llmblock.py @@ -59,6 +59,7 @@ def __init__( block_name, config_path, output_cols, + add_num_samples=False, parser_kwargs={}, **batch_kwargs, ) -> None: @@ -69,6 +70,7 @@ def __init__( ) self.prompt_template = self.prompt_struct.format(**self.block_config) self.model_prompt = _get_model_prompt(self.ctx.model_family) + self.add_num_samples = add_num_samples self.output_cols = output_cols self.batch_params = batch_kwargs.get("batch_kwargs", {}) self.parser_name = parser_kwargs.get("parser_name", None) @@ -156,11 +158,12 @@ def generate(self, samples: Dataset, **gen_kwargs) -> Dataset: :return: The parsed output after generation. """ - num_samples = self.batch_params.get("num_samples", None) logger.debug("Generating outputs for {} samples".format(len(samples))) - if (num_samples is not None) and ("num_samples" not in samples.column_names): - samples = samples.add_column("num_samples", [num_samples] * len(samples)) + if self.add_num_samples and ("num_samples" not in samples.column_names): + samples = samples.add_column( + "num_samples", [self.ctx.num_instructions_to_generate] * len(samples) + ) # validate each sample # Log errors and remove invalid samples @@ -211,6 +214,7 @@ def __init__( config_paths, output_cols, selector_column_name, + add_num_samples=False, parser_kwargs={}, **batch_kwargs, ) -> None: @@ -219,6 +223,7 @@ def __init__( block_name, config_paths[0][0], output_cols, + add_num_samples=add_num_samples, parser_kwargs=parser_kwargs, **batch_kwargs, ) From 18f1513897f6d31f0dd059e398305fa8792843bf Mon Sep 17 00:00:00 2001 From: Mark McLoughlin Date: Fri, 12 Jul 2024 01:10:14 +0100 Subject: [PATCH 09/10] Fix LLMBlock batch_kwargs constructor param It's hard to spot, but this: def __init__(self, ..., **batch_kwargs): ... 
self.batch_params = batch_kwargs.get("batch_kwargs", {}) is equivalent to this: def __init__(self, ..., **kwargs): ... self.batch_params = kwargs.get("batch_kwargs", {}) which is equivalent to this: def __init__(self, ..., batch_kwargs={}, **kwargs): ... self.batch_params = batch_kwargs except that trailing **kwargs meant we were silently accepting unknown block_config parameters. Signed-off-by: Mark McLoughlin --- src/instructlab/sdg/llmblock.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/instructlab/sdg/llmblock.py b/src/instructlab/sdg/llmblock.py index 4a32a708..fc794158 100644 --- a/src/instructlab/sdg/llmblock.py +++ b/src/instructlab/sdg/llmblock.py @@ -61,7 +61,7 @@ def __init__( output_cols, add_num_samples=False, parser_kwargs={}, - **batch_kwargs, + batch_kwargs={}, ) -> None: super().__init__(ctx, block_name) self.block_config = self._load_config(config_path) @@ -72,7 +72,7 @@ def __init__( self.model_prompt = _get_model_prompt(self.ctx.model_family) self.add_num_samples = add_num_samples self.output_cols = output_cols - self.batch_params = batch_kwargs.get("batch_kwargs", {}) + self.batch_params = batch_kwargs self.parser_name = parser_kwargs.get("parser_name", None) @@ -216,7 +216,7 @@ def __init__( selector_column_name, add_num_samples=False, parser_kwargs={}, - **batch_kwargs, + batch_kwargs={}, ) -> None: super().__init__( ctx, @@ -225,7 +225,7 @@ def __init__( output_cols, add_num_samples=add_num_samples, parser_kwargs=parser_kwargs, - **batch_kwargs, + batch_kwargs=batch_kwargs, ) self.selector_column_name = selector_column_name self.prompt_template = {} From 82aadd9f6582093ee2516728e47d8119a204e604 Mon Sep 17 00:00:00 2001 From: Mark McLoughlin Date: Fri, 12 Jul 2024 01:13:32 +0100 Subject: [PATCH 10/10] Remove batch_kwargs This appears to be unused now - no pipeline definitions include it, and it's not used in LLMBlock anywhere. Signed-off-by: Mark McLoughlin --- src/instructlab/sdg/llmblock.py | 4 ---- 1 file changed, 4 deletions(-) diff --git a/src/instructlab/sdg/llmblock.py b/src/instructlab/sdg/llmblock.py index fc794158..83e88621 100644 --- a/src/instructlab/sdg/llmblock.py +++ b/src/instructlab/sdg/llmblock.py @@ -61,7 +61,6 @@ def __init__( output_cols, add_num_samples=False, parser_kwargs={}, - batch_kwargs={}, ) -> None: super().__init__(ctx, block_name) self.block_config = self._load_config(config_path) @@ -72,7 +71,6 @@ def __init__( self.model_prompt = _get_model_prompt(self.ctx.model_family) self.add_num_samples = add_num_samples self.output_cols = output_cols - self.batch_params = batch_kwargs self.parser_name = parser_kwargs.get("parser_name", None) self.parsing_pattern = parser_kwargs.get("parsing_pattern", None) self.parser_cleanup_tags = parser_kwargs.get("parser_cleanup_tags", None) @@ -216,7 +214,6 @@ def __init__( selector_column_name, add_num_samples=False, parser_kwargs={}, - batch_kwargs={}, ) -> None: super().__init__( ctx, @@ -225,7 +222,6 @@ def __init__( output_cols, add_num_samples=add_num_samples, parser_kwargs=parser_kwargs, - batch_kwargs=batch_kwargs, ) self.selector_column_name = selector_column_name self.prompt_template = {}
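To make the equivalence described in PATCH 09 concrete, here is a minimal self-contained sketch (hypothetical function names, not code from the patches above):

```python
# implicit() mirrors the old `**batch_kwargs` form: the var-keyword dict
# only contains a "batch_kwargs" key when the caller literally passes
# batch_kwargs={...}, so .get("batch_kwargs") is an explicit keyword
# argument in disguise.
def implicit(**batch_kwargs):
    return batch_kwargs.get("batch_kwargs", {})

# adopted() mirrors the final form after PATCH 09/10: a real keyword
# parameter with no trailing **kwargs to swallow typos. (The mutable
# default dict mirrors the style used in the patches themselves.)
def adopted(batch_kwargs={}):
    return batch_kwargs

assert implicit(batch_kwargs={"num_samples": 3}) == {"num_samples": 3}
assert adopted(batch_kwargs={"num_samples": 3}) == {"num_samples": 3}

# The old form silently accepted unknown parameters...
assert implicit(batch_kwarg={"num_samples": 3}) == {}  # typo, no error
# ...while the explicit form fails loudly:
try:
    adopted(batch_kwarg={"num_samples": 3})
except TypeError as e:
    print(e)  # unexpected keyword argument 'batch_kwarg'
```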