Merge pull request #430 from khaledsulayman/ks-chunking-refactor
Refactor Document Chunker to always use docling
mergify[bot] authored Jan 15, 2025
2 parents f67e7d7 + e3a3e1e commit 13bf595
Showing 8 changed files with 317 additions and 430 deletions.
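Before the file-by-file diff, here is a minimal usage sketch of the refactored chunker. It is an editorial illustration rather than part of the commit, assuming the constructor signature shown in chunkers.py below; the document path, output directory, and tokenizer model are hypothetical placeholders.

from pathlib import Path

from instructlab.sdg.utils.chunkers import DocumentChunker

# All documents passed in must share one supported filetype (".pdf" or ".md").
chunker = DocumentChunker(
    document_paths=[Path("knowledge/paper.pdf")],
    output_dir=Path("/tmp/docling-artifacts"),  # where parsed docling output is exported
    tokenizer_model_name="models/mixtral-8x7b-instruct",  # HF model name or local directory
    docling_model_path=None,  # None makes the chunker download the docling models
    server_ctx_size=4096,
    chunk_word_count=1024,
)
chunks = chunker.chunk_documents()  # returns a list of text chunks

The previous factory split (TextSplitChunker for Markdown, ContextAwareChunker for PDF) is collapsed into this single docling-backed class, as the chunkers.py diff shows.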
3 changes: 1 addition & 2 deletions src/instructlab/sdg/generate_data.py
@@ -395,8 +395,7 @@ def generate_data(
is_knowledge = False
leaf_node_path = leaf_node[0]["taxonomy_path"].replace("->", "_")
samples = leaf_node_to_samples(
leaf_node,
taxonomy,
leaf_node, # pylint: disable=duplicate-code
server_ctx_size,
chunk_word_count,
document_output_dir,
226 changes: 54 additions & 172 deletions src/instructlab/sdg/utils/chunkers.py
@@ -1,9 +1,7 @@
# Standard
from abc import ABC, abstractmethod
from collections import defaultdict
from enum import Enum
from pathlib import Path
from typing import DefaultDict, Iterable, List, Optional, Tuple
from typing import Dict, Iterable, List, Optional
import json
import logging
import re
@@ -26,6 +24,7 @@

logger = logging.getLogger(__name__)
_DEFAULT_CHUNK_OVERLAP = 100
SUPPORTED_FILETYPES = [".pdf", ".md"]


def _num_tokens_from_words(num_words) -> int:
@@ -68,186 +67,55 @@ def resolve_ocr_options() -> OcrOptions:
return None


class FileTypes(Enum):
MD = ".md"
PDF = ".pdf"
def split_docs_by_filetype(document_paths: List[Path]) -> Dict[str, List[Path]]:
"""Split document paths into a dict of lists based on their file extension."""
document_dict = defaultdict(list)
for path in document_paths:
filetype = path.suffix
if filetype not in SUPPORTED_FILETYPES:
raise ValueError(f"Provided unsupported filetype {filetype}")

document_dict[filetype].append(path)

class ChunkerBase(ABC):
@abstractmethod
def chunk_documents(self):
pass


class DocumentChunker:
"""A factory chunker class that instantiates the applicable chunker
Currently, only Markdown and PDF are supported. For Markdown, returns
TextSplitChunker, and for PDF, returns ContextAwareChunker"""

def __new__(
cls,
leaf_node,
taxonomy_path,
output_dir: Path,
server_ctx_size=4096,
chunk_word_count=1024,
tokenizer_model_name: Optional[str] = None,
docling_model_path: Optional[str] = None,
):
"""Insantiate the appropriate chunker for the provided document
Args:
leaf_node: a leaf node dict containing "documents",
"filepaths", and "taxonomy_path" keys
output_dir (Path): directory where artifacts should be stored
server_ctx_size (int): Context window size of server
chunk_word_count (int): Maximum number of words to chunk a document
tokenizer_model_name (Optional[str]): name of huggingface model to get
tokenizer from
Returns:
TextSplitChunker | ContextAwareChunker: Object of the appropriate
chunker class for the provided filetype
"""
documents = leaf_node[0]["documents"]

if not isinstance(taxonomy_path, Path):
taxonomy_path = Path(taxonomy_path)
return dict(document_dict)

if isinstance(documents, str):
documents = [documents]
logger.info(
"Converted single string into a list of string. Assumed the string passed in is the document. Normally, chunk_document() should take a list as input."
)
elif not isinstance(documents, list):
raise TypeError(
"Expected: documents to be a list, but got {}".format(type(documents))
)

filepaths = leaf_node[0]["filepaths"]

doc_dict = cls._split_docs_by_filetype(documents, filepaths)
if len(doc_dict.keys()) > 1:
raise ValueError("Received multiple document types")
if len(doc_dict.keys()) < 1:
raise ValueError("Received no document types")

if FileTypes.MD in doc_dict:
doc_contents = [d for d, _ in doc_dict[FileTypes.MD]]
return TextSplitChunker(
doc_contents,
server_ctx_size,
chunk_word_count,
output_dir,
)

if FileTypes.PDF in doc_dict:
doc_paths = [p for _, p in doc_dict[FileTypes.PDF]]
return ContextAwareChunker(
doc_paths,
filepaths,
output_dir,
chunk_word_count,
tokenizer_model_name,
docling_model_path=docling_model_path,
)

@staticmethod
def _split_docs_by_filetype(
documents: List[str], filepaths: List[Path]
) -> DefaultDict[FileTypes, List[Tuple[str, Path]]]:
"""Separate documents into lists based on their filetype.
Currently, only Markdown and PDF are supported.
Args:
documents (List[str]): A list of the document contents as strings
filepaths (List[Path]): Corresponding document filepaths
Returns:
DefaultDict: Dictionary with either ".md" or ".pdf" as a key.
Markdown items contain document contents, PDF items contain
paths to documents.
"""
doc_dict = defaultdict(list)
for doc, path in zip(documents, filepaths):
if path.suffix == ".md":
# append doc contents
doc_dict[FileTypes.MD].append((doc, path))
elif path.suffix == ".pdf":
# append doc paths
doc_dict[FileTypes.PDF].append((doc, path))
else:
raise ValueError(
f"Received document of type .{path.suffix}, which is not a supported filetype"
)
return doc_dict


class TextSplitChunker(ChunkerBase):
class DocumentChunker: # pylint: disable=too-many-instance-attributes
def __init__(
self,
document_contents: List | str,
server_ctx_size: int,
chunk_word_count: int,
document_paths: List[Path],
output_dir: Path,
tokenizer_model_name: str | Path,
docling_model_path: Optional[Path] = None,
server_ctx_size: int = 4096,
chunk_word_count: int = 1024,
):
self.document_contents = document_contents
self.server_ctx_size = server_ctx_size
self.chunk_word_count = chunk_word_count
self.output_dir = output_dir
if not document_paths:
raise ValueError("Provided empty list of documents")

def chunk_documents(self) -> List:
"""Naively chunk markdown documents based on the word count provided by the user.
Returns:
List[str]: List of chunked documents.
"""
num_tokens_per_doc = _num_tokens_from_words(self.chunk_word_count)
if num_tokens_per_doc > int(self.server_ctx_size - 1024):
raise ValueError(
"Error: {}".format(
str(
f"Given word count ({self.chunk_word_count}) per doc will exceed the server context window size ({self.server_ctx_size})"
)
)
)
if self.document_contents == []:
return []
document_dict = split_docs_by_filetype(document_paths)

chunk_size = _num_chars_from_tokens(num_tokens_per_doc)
return chunk_markdowns(self.document_contents, chunk_size)
if len(document_dict) > 1:
raise ValueError("Provided multiple document types")

# We know there is only 1 key, value pair, so we take the first
self.document_filetype, self.document_paths = next(iter(document_dict.items()))
self.docling_model_path = docling_model_path
self.converter = self._init_docling_converter()

class ContextAwareChunker(ChunkerBase): # pylint: disable=too-many-instance-attributes
def __init__(
self,
document_paths,
filepaths,
output_dir: Path,
chunk_word_count: int,
tokenizer_model_name: Optional[str],
docling_model_path=None,
):
self.document_paths = document_paths
self.filepaths = filepaths
self.output_dir = self._path_validator(output_dir)
self.server_ctx_size = server_ctx_size
self.chunk_word_count = chunk_word_count
self.tokenizer = self.create_tokenizer(tokenizer_model_name)
self.docling_model_path = docling_model_path

def chunk_documents(self) -> List:
"""Semantically chunk PDF documents.

Returns:
List: a list of chunks from the documents
"""
def _init_docling_converter(self):
"""Initialize docling converter with filetype-specific configurations"""
# triggers torch loading, import lazily
# pylint: disable=import-outside-toplevel
# Third Party
from docling.document_converter import DocumentConverter, PdfFormatOption
from docling.pipeline.standard_pdf_pipeline import StandardPdfPipeline

if self.document_paths == []:
return []

if self.docling_model_path is None:
logger.info("Docling models not found on disk, downloading models...")
self.docling_model_path = StandardPdfPipeline.download_models_hf()
@@ -258,17 +126,29 @@ def chunk_documents(self) -> List:
artifacts_path=self.docling_model_path,
do_ocr=False,
)

ocr_options = resolve_ocr_options()
if ocr_options is not None:
pipeline_options.do_ocr = True
pipeline_options.ocr_options = ocr_options

converter = DocumentConverter(
return DocumentConverter(
format_options={
InputFormat.PDF: PdfFormatOption(pipeline_options=pipeline_options)
}
)
parsed_documents = converter.convert_all(self.filepaths)

def chunk_documents(self) -> List:
"""Split a list of documents into chunks
Returns:
List: a list of chunks from the documents
"""

if self.document_paths == []:
return []

parsed_documents = self.converter.convert_all(self.document_paths)

docling_artifacts_path = self.export_documents(parsed_documents)

@@ -348,7 +228,7 @@ def fuse_texts(
return fused_texts

@staticmethod
def create_tokenizer(model_name: Optional[str]):
def create_tokenizer(model_path: str | Path):
"""
Create a tokenizer instance from a pre-trained model or a local directory.
@@ -363,10 +243,8 @@ def create_tokenizer(model_name: Optional[str]):
# Third Party
from transformers import AutoTokenizer

if model_name is None:
raise TypeError("No model path provided")

model_path = Path(model_name)
if not isinstance(model_path, Path):
model_path = Path(model_path)
error_info_message = (
"Please run `ilab model download {download_args}` and try again"
)
@@ -486,7 +364,7 @@ def get_table_page_number(self, json_book, idx):
prev_page_num = book_element["prov"][0]["page"]
break
for book_element in json_book["main-text"][idx:]:
if "prov" in book_element:
if "prov" in book_element and book_element["prov"]:
next_page_num = book_element["prov"][0]["page"]
break
if prev_page_num is not None and next_page_num is not None:
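As an aside (editorial, not part of the commit), here is a minimal sketch of what the added book_element["prov"] truthiness check protects against, assuming docling can emit elements whose provenance list is empty; the TODO in the next hunk points at the same case.

# Hypothetical docling JSON element whose provenance list is empty.
book_element = {"type": "paragraph", "text": "orphaned text", "prov": []}

# The old check only tested key membership, so book_element["prov"][0] could
# raise IndexError; the new check also requires the list to be non-empty.
if "prov" in book_element and book_element["prov"]:
    page = book_element["prov"][0]["page"]
else:
    page = None  # no page provenance available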
@@ -545,8 +423,14 @@ def build_chunks_from_docling_json(
current_book_page_number = self.get_table_page_number(
json_book, idx
)
book_text = self.get_table(json_book, book_element["$ref"])
elif book_element["prov"]:
current_book_page_number = book_element["prov"][0][
"page"
] # TODO export to function to handle empty ["prov"]
book_text = book_element["text"]
else:
current_book_page_number = book_element["prov"][0]["page"]
current_book_page_number = None
book_text = book_element["text"]

if book_element["type"] == "subtitle-level-1":
Expand Down Expand Up @@ -599,8 +483,6 @@ def build_chunks_from_docling_json(

if book_element["type"] == "paragraph":
book_text = self.add_heading_formatting(book_text)
elif book_element["type"] == "table":
book_text = self.get_table(json_book, book_element["$ref"])
if "## References" in book_text or "## Acknowledgements" in book_text:
# For research papers we ignore everything after these sections
break
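Finally, a short sketch (editorial, with hypothetical file names) of how the new module-level split_docs_by_filetype helper behaves, based on its implementation earlier in this diff.

from pathlib import Path

from instructlab.sdg.utils.chunkers import split_docs_by_filetype

grouped = split_docs_by_filetype([Path("a.pdf"), Path("b.pdf")])
# grouped == {".pdf": [Path("a.pdf"), Path("b.pdf")]}

split_docs_by_filetype([Path("notes.txt")])
# raises ValueError: Provided unsupported filetype .txt

DocumentChunker then requires exactly one key in this mapping, raising if the inputs mix Markdown and PDF or if the document list is empty.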
