datamixing: include auxiliary instructions in pipeline config
We have been trying to figure out the best way to discover this file
on disk, but consider a case where the instructions for the full
pipeline are installed with the Python package and two custom
pipelines are installed in /usr/share/instructlab/sdg/pipelines, each
with its own set of instructions. At the very least, the location of
the instructions needs to be included in the pipeline config.

However, the instructions are clearly tightly coupled with the
pipeline config, so it makes sense to embed them in it directly, e.g.

version: "1.0"
blocks:
...
  - name: flatten_auxiliary_columns
    type: FlattenColumnsBlock
    config:
      var_cols:
        - spellcheck
        - base_document
      value_name: corrected_document
      var_name: dataset_type
...
datamixing:
  auxiliary_instructions:
    spellcheck:
      - Correct any spelling errors in the document and output the corrected version.
      - Rewrite the document to remove any spelling errors.
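
With the instructions embedded this way, the pipeline loader returns
them alongside the blocks and they can be handed straight to the data
mixer. A rough sketch using the names from this commit (illustrative,
not verbatim code):

    pipe = Pipeline.from_file(ctx, "pipelines/full/knowledge.yaml")
    # from_file() unpacks the (blocks, auxiliary_inst) tuple returned
    # by _parse_pipeline_config_file(), so the instructions ride along
    # on the Pipeline object:
    mixer = DataMixer(data_dirs, output_dir, date_suffix, sys_prompt,
                      num_procs, auxiliary_inst=pipe.auxiliary_inst)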

Signed-off-by: Mark McLoughlin <[email protected]>
markmc committed Jul 29, 2024
1 parent 477e88d commit 658d7f3
Showing 8 changed files with 65 additions and 51 deletions.
54 changes: 28 additions & 26 deletions src/instructlab/sdg/datamixing.py
@@ -1,6 +1,5 @@
# Standard
from importlib import resources
from typing import Optional
from typing import Dict, List, Optional
import json
import logging
import os.path
@@ -375,29 +374,15 @@ def _conv_pretrain(rec):
    return rec


def _load_auxiliary_instructions():
    """
    Load the auxiliary instructions yaml from disk, returning the loaded
    object if found or None if the instructions yaml file does not exist.
    """
    auxiliary_path = resources.files(__package__).joinpath(
        "instructions/auxiliary_knowledge.yaml"
    )
    if os.path.isfile(auxiliary_path):
        with open(auxiliary_path, "r", encoding="utf-8") as fp:
            return yaml.safe_load(fp)
    else:
        return None


def _create_auxiliary_dataset(generated_dataset: Dataset):
def _create_auxiliary_dataset(
    generated_dataset: Dataset, auxiliary_inst: Optional[Dict[str, List[str]]]
):
    # Samples that went through the auxiliary generation pipeline will
    # have a dataset_type column created by that pipeline. If that's
    # not present, then we may be running in a pipeline without any
    # auxiliary dataset generation enabled.
    if "dataset_type" not in generated_dataset.column_names:
        return None
    auxiliary_inst = _load_auxiliary_instructions()
    # If we didn't find any auxiliary instructions to load, then
    # that's also another sign that we're not running with any
    # auxiliary datasets enabled.
@@ -450,7 +435,9 @@ def __create_auxiliary_ds(rec):
    return unique_document_auxiliary


def _create_phase10_ds(generated_dataset: Dataset):
def _create_phase10_ds(
    generated_dataset: Dataset, auxiliary_inst: Optional[Dict[str, List[str]]]
):
    """
    Create a dataset for Phase 1.0 of downstream training.
@@ -463,15 +450,17 @@ def _create_phase10_ds(generated_dataset: Dataset):
    )
    knowledge_ds = _add_extra_contexts_to_samples(knowledge_ds, p=0.4)

    auxiliary_dataset = _create_auxiliary_dataset(generated_dataset)
    auxiliary_dataset = _create_auxiliary_dataset(generated_dataset, auxiliary_inst)
    if auxiliary_dataset is not None:
        phase10 = concatenate_datasets([knowledge_ds, auxiliary_dataset])
    else:
        phase10 = knowledge_ds
    return phase10


def _create_phase07_ds(generated_dataset: Dataset):
def _create_phase07_ds(
    generated_dataset: Dataset, auxiliary_inst: Optional[Dict[str, List[str]]]
):
    """
    Create a dataset for Phase 0.7 of downstream training.
@@ -485,7 +474,7 @@ def _create_phase07_ds(generated_dataset: Dataset):
    )
    knowledge_ds = knowledge_ds.map(_conv_pretrain)

    auxiliary_dataset = _create_auxiliary_dataset(generated_dataset)
    auxiliary_dataset = _create_auxiliary_dataset(generated_dataset, auxiliary_inst)
    if auxiliary_dataset is not None:
        auxiliary_dataset = auxiliary_dataset.map(_conv_pretrain)
        phase07 = concatenate_datasets([knowledge_ds, auxiliary_dataset])
@@ -527,12 +516,21 @@ class DataMixer:
    # once.
    NUM_SYNTH_SKILLS = 30

    def __init__(self, data_dirs, output_dir, date_suffix, sys_prompt, num_procs):
    def __init__(
        self,
        data_dirs,
        output_dir,
        date_suffix,
        sys_prompt,
        num_procs,
        auxiliary_inst=None,
    ):
        self.data_dirs = data_dirs
        self.output_dir = output_dir
        self.sys_prompt = sys_prompt
        self.date_suffix = date_suffix
        self.num_procs = num_procs
        self.auxiliary_inst = auxiliary_inst

        self.knowledge_recipe = self._load_default_recipe("knowledge.yaml")
        self.skills_recipe = self._load_default_recipe("skills.yaml")
@@ -569,7 +567,9 @@ def _gen_leaf_node_data(

    def collect(self, leaf_node_path, new_generated_data, is_knowledge):
        if is_knowledge:
            knowledge_phase_data = _create_phase07_ds(new_generated_data)
            knowledge_phase_data = _create_phase07_ds(
                new_generated_data, self.auxiliary_inst
            )
            output_file_leaf_knowledge = (
                f"node_datasets_{self.date_suffix}/{leaf_node_path}_p07.jsonl"
            )
@@ -579,7 +579,9 @@ def collect(self, leaf_node_path, new_generated_data, is_knowledge):
                output_file_leaf_knowledge,
            )

            skills_phase_data = _create_phase10_ds(new_generated_data)
            skills_phase_data = _create_phase10_ds(
                new_generated_data, self.auxiliary_inst
            )
            output_file_leaf_skills = (
                f"node_datasets_{self.date_suffix}/{leaf_node_path}_p10.jsonl"
            )
5 changes: 3 additions & 2 deletions src/instructlab/sdg/generate_data.py
@@ -247,7 +247,7 @@ def load_pipeline(yaml_basename):
    )


def _mixer_init(ctx, output_dir, date_suffix):
def _mixer_init(ctx, output_dir, date_suffix, knowledge_auxiliary_inst):
    pd = platformdirs.PlatformDirs(
        appname=os.path.join("instructlab", "sdg"), multipath=True
    )
@@ -258,6 +258,7 @@ def _mixer_init(ctx, output_dir, date_suffix):
        date_suffix,
        _SYS_PROMPT,
        ctx.dataset_num_procs,
        knowledge_auxiliary_inst,
    )


@@ -367,7 +368,7 @@ def generate_data(
    mmlu_ctx = dataclasses.replace(ctx, checkpoint_dir=None)
    mmlu_bench_pipe = mmlubench_pipe_init(mmlu_ctx)

    mixer = _mixer_init(ctx, output_dir, date_suffix)
    mixer = _mixer_init(ctx, output_dir, date_suffix, sdg_knowledge.auxiliary_inst)

    if console_output:
        logger.info(
Empty file.
3 changes: 0 additions & 3 deletions src/instructlab/sdg/instructions/auxiliary_knowledge.yaml

This file was deleted.

13 changes: 10 additions & 3 deletions src/instructlab/sdg/pipeline.py
@@ -3,7 +3,7 @@
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass
from importlib import resources
from typing import Iterable, Optional
from typing import Dict, Iterable, List, Optional
import logging
import math
import os.path
@@ -109,6 +109,7 @@ def __init__(
        ctx: PipelineContext,
        config_path: str,
        chained_blocks: list[dict],
        auxiliary_inst: Optional[Dict[str, List[str]]] = None,
    ) -> None:
        """
        Initialize the Pipeline class with a configuration dictionary.
@@ -120,12 +121,14 @@ def __init__(
        self.config_path = config_path
        # pipeline config is the run configuration that consists of the pipeline steps
        self.chained_blocks = chained_blocks
        # datamixing instructions for auxiliary data generated by this pipeline
        self.auxiliary_inst = auxiliary_inst

    @classmethod
    def from_file(cls, ctx, pipeline_yaml):
        if not os.path.isabs(pipeline_yaml):
            pipeline_yaml = os.path.join(resources.files(__package__), pipeline_yaml)
        return cls(ctx, pipeline_yaml, _parse_pipeline_config_file(pipeline_yaml))
        return cls(ctx, pipeline_yaml, *_parse_pipeline_config_file(pipeline_yaml))

    def generate(self, dataset) -> Dataset:
        """
@@ -296,7 +299,11 @@ def _parse_pipeline_config_file(pipeline_yaml):
            "The pipeline config file contains no 'blocks' section"
        )

    return content["blocks"]
    auxiliary_inst = None
    if "datamixing" in content and "auxiliary_instructions" in content["datamixing"]:
        auxiliary_inst = content["datamixing"]["auxiliary_instructions"]

    return content["blocks"], auxiliary_inst


# This is part of the public API.
6 changes: 6 additions & 0 deletions src/instructlab/sdg/pipelines/full/knowledge.yaml
@@ -104,3 +104,9 @@ blocks:
        - explanation
        - rating
        - __index_level_0__

datamixing:
auxiliary_instructions:
spellcheck:
- Correct any spelling errors in the document and output the corrected version.
- Rewrite the document to remove any spelling errors.
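
A custom pipeline installed under /usr/share/instructlab/sdg/pipelines
can carry its own section in the same way; the key under
auxiliary_instructions must match a dataset_type value produced by the
pipeline's auxiliary blocks. A sketch with a hypothetical dataset type
and wording (not part of this commit):

    datamixing:
      auxiliary_instructions:
        summarization:
          - Summarize the document in a single paragraph.
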
17 changes: 17 additions & 0 deletions src/instructlab/sdg/pipelines/schema/v1.json
@@ -364,6 +364,23 @@
          }
        }
      }
    },
    "datamixing": {
      "type": "object",
      "additionalProperties": false,
      "properties": {
        "auxiliary_instructions": {
          "type": "object",
          "patternProperties": {
            ".*": {
              "type": "array",
              "items": {
                "type": "string"
              }
            }
          }
        }
      }
    }
  }
}
18 changes: 1 addition & 17 deletions tests/test_datamixing.py
@@ -11,12 +11,7 @@
from datasets import Dataset

# First Party
from instructlab.sdg.datamixing import (
    DataMixer,
    Recipe,
    _add_extra_contexts_to_samples,
    _load_auxiliary_instructions,
)
from instructlab.sdg.datamixing import DataMixer, Recipe, _add_extra_contexts_to_samples

# We mock out the actual things that use num_procs anyway, but just
# for a consistent value in the tests...
@@ -168,14 +163,3 @@ def test_add_extra_contexts_to_samples_with_six_samples():
    )
    dataset = _add_extra_contexts_to_samples(samples, p=0.4)
    assert len(dataset) == 6


def test_load_auxiliary_instructions_finds_yaml_file():
    """
    Test that the _load_auxiliary_instructions function actually
    finds its yaml file and loads it. There have been a few iterations
    of bugs causing the file to not get loaded and the resulting
    auxiliary logic skipped.
    """
    auxiliary_inst = _load_auxiliary_instructions()
    assert auxiliary_inst is not None
