Switch split task to token-based splitting #283

Open · wants to merge 13 commits into base: main
51 changes: 12 additions & 39 deletions client/src/nv_ingest_client/primitives/tasks/split.py
@@ -20,18 +20,8 @@


class SplitTaskSchema(BaseModel):
split_by: Optional[str] = "sentence"
split_length: Optional[int] = 10
split_overlap: Optional[int] = 0
max_character_length: Optional[int] = 1024
sentence_window_size: Optional[int] = 0

@validator("split_by")
def split_by_must_be_valid(cls, v):
valid_criteria = ["page", "size", "word", "sentence"]
if v not in valid_criteria:
raise ValueError(f"split_by must be one of {valid_criteria}")
return v
split_by: str = "intfloat/e5-large-unsupervised"
chunk_size: int = 300

class Config:
extra = "forbid"
@@ -42,37 +32,26 @@ class SplitTask(Task):
Object for document splitting task
"""

_TypeSplitBy = Literal["word", "sentence", "passage"]

def __init__(
self,
split_by: _TypeSplitBy = None,
split_length: int = None,
split_overlap: int = None,
max_character_length: int = None,
sentence_window_size: int = None,
tokenizer: str = None,
chunk_size: int = None,
) -> None:
"""
Setup Split Task Config
"""
super().__init__()
self._split_by = split_by
self._split_length = split_length
self._split_overlap = split_overlap
self._max_character_length = max_character_length
self._sentence_window_size = sentence_window_size
self._tokenizer = tokenizer
self._chunk_size = chunk_size

def __str__(self) -> str:
"""
Returns a string with the object's config and run time state
"""
info = ""
info += "Split Task:\n"
info += f" split_by: {self._split_by}\n"
info += f" split_length: {self._split_length}\n"
info += f" split_overlap: {self._split_overlap}\n"
info += f" split_max_character_length: {self._max_character_length}\n"
info += f" split_sentence_window_size: {self._sentence_window_size}\n"
info += f" tokenizer: {self._tokenizer}\n"
info += f" chunk_size: {self._chunk_size}\n"
return info

def to_dict(self) -> Dict:
@@ -81,15 +60,9 @@ def to_dict(self) -> Dict:
"""
split_params = {}

if self._split_by is not None:
split_params["split_by"] = self._split_by
if self._split_length is not None:
split_params["split_length"] = self._split_length
if self._split_overlap is not None:
split_params["split_overlap"] = self._split_overlap
if self._max_character_length is not None:
split_params["max_character_length"] = self._max_character_length
if self._sentence_window_size is not None:
split_params["sentence_window_size"] = self._sentence_window_size
if self._tokenizer is not None:
split_params["tokenizer"] = self._tokenizer
if self._chunk_size is not None:
split_params["chunk_size"] = self._chunk_size

return {"type": "split", "task_properties": split_params}
119 changes: 34 additions & 85 deletions src/nv_ingest/modules/transforms/nemo_doc_splitter.py
@@ -3,6 +3,7 @@
# SPDX-License-Identifier: Apache-2.0


import os
import copy
import logging
import traceback
@@ -13,6 +14,7 @@

import mrc
import pandas as pd
from transformers import AutoTokenizer
from more_itertools import windowed
from morpheus.messages import ControlMessage
from morpheus.messages import MessageMeta
@@ -33,24 +35,16 @@
logger = logging.getLogger(__name__)


def _build_split_documents(row, text_splits: List[str], sentence_window_size: int) -> List[dict[str, Any]]:
"""Build documents from text splits with window text."""
def _build_split_documents(row, chunks: List[str]) -> List[dict[str, Any]]:
"""Build documents from text chunks"""
documents: List[dict] = []

window_size = sentence_window_size
for i, text in enumerate(text_splits):
for i, text in enumerate(chunks):
if text is None or not text.strip():
continue

metadata = row.metadata if hasattr(row, "metadata") and isinstance(row.metadata, dict) else {}
metadata = copy.deepcopy(metadata)
if window_size > 0:
window_text = "".join(
text_splits[max(0, i - window_size) : min(i + 1 + window_size, len(text_splits))] # noqa: E203
)

metadata["window"] = window_text
metadata["original_text"] = text

metadata["content"] = text

@@ -59,70 +53,33 @@ def _build_split_documents(row, text_splits: List[str], sentence_window_size: in
return documents


def _split_into_units(text: str, split_by: Literal["word", "sentence", "passage"]) -> List[str]:
if split_by == "passage":
split_at = "\n\n"
elif split_by == "sentence":
split_at = "." # why not ?,!, etc..?
elif split_by == "word":
split_at = " "
else:
raise NotImplementedError("DocumentSplitter only supports 'passage', 'sentence'" " or 'word' split_by options.")
units = text.split(split_at)
# Add the delimiter back to all units except the last one
for i in range(len(units) - 1):
units[i] += split_at
def _split_into_chunks(text, tokenizer, chunk_size=300):
# Tokenize the text into token IDs
encoding = tokenizer.encode_plus(text, add_special_tokens=False, return_offsets_mapping=True)

return units
# Get the token IDs and offsets for splitting
tokens = encoding['input_ids']
offsets = encoding['offset_mapping']

# Split the tokens into chunks of the desired size
chunks = [tokens[i:i + chunk_size] for i in range(0, len(tokens), chunk_size)]

def _concatenate_units(units: List[str], split_length: int, split_overlap: int, max_character_length: int) -> List[str]:
text_splits = []
segments = windowed(units, n=split_length, step=split_length - split_overlap)
for seg in segments:
current_units = [unit for unit in seg if unit is not None]
txt = "".join(current_units)
if max_character_length and len(txt) > max_character_length:
text_splits.extend(_split_long_text(txt, max_character_length))
elif len(txt) > 0:
text_splits.append(txt)
# Convert token chunks back to text while preserving original spacing and case
text_chunks = []
for chunk in chunks:
# Find the start and end offsets for the current chunk
chunk_offsets = offsets[:len(chunk)]
start_offset = chunk_offsets[0][0]
end_offset = chunk_offsets[-1][1]

return text_splits
# Extract the original text for this chunk based on offsets
text_chunk = text[start_offset:end_offset]
text_chunks.append(text_chunk)

# Remove processed offsets for the next iteration
offsets = offsets[len(chunk):]

def _split_long_text(text: str, max_character_length: int) -> List[str]:
"""
Splits a long text into smaller segments that
do not exceed max_character_length.
"""
split_texts = []
while text:
# Take the maximum possible substring without exceeding max_character_length
segment = text[:max_character_length]
split_texts.append(segment)
text = text[max_character_length:] # noqa: E203

return split_texts


def _process_content(row, validated_config):
content = row["metadata"]["content"]

if content is None:
raise ValueError(
"DocumentSplitter only works with text documents but one or more 'content' " "values are None."
)

units = _split_into_units(content, validated_config.split_by)
text_splits = _concatenate_units(
units,
validated_config.split_length,
validated_config.split_overlap,
max_character_length=validated_config.max_character_length,
)
split_docs = _build_split_documents(row, text_splits, sentence_window_size=validated_config.sentence_window_size)

return split_docs
return text_chunks
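
A quick, self-contained illustration of the offset-mapping approach above (a sketch, not part of the diff; the sample text is made up): chunks are cut at token boundaries but sliced out of the original string, so casing, punctuation, and intra-chunk whitespace survive. Note that return_offsets_mapping requires a fast tokenizer, which the default model provides.

    from transformers import AutoTokenizer

    tokenizer = AutoTokenizer.from_pretrained("intfloat/e5-large-unsupervised")
    text = "NV-Ingest splits documents into token-bounded chunks. " * 200

    chunks = _split_into_chunks(text, tokenizer, chunk_size=300)
    print(len(chunks), "chunks of roughly 300 tokens each")

    for chunk in chunks:
        assert chunk in text  # every chunk is a verbatim slice of the source text
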


MODULE_NAME = "nemo_document_splitter"
@@ -167,16 +124,11 @@ def split_and_forward(message: ControlMessage):
return message

# Override parameters if set
split_by = task_props.get("split_by", validated_config.split_by)
split_length = task_props.get("split_length", validated_config.split_length)
split_overlap = task_props.get("split_overlap", validated_config.split_overlap)
max_character_length = task_props.get("max_character_length", validated_config.max_character_length)
sentence_window_size = task_props.get("sentence_window_size", validated_config.sentence_window_size)
tokenizer = task_props.get("tokenizer", validated_config.tokenizer)
chunk_size = task_props.get("chunk_size", validated_config.chunk_size)

logger.info(
f"Splitting documents with split_by: {split_by}, split_length: {split_length}, "
f"split_overlap: {split_overlap}, max_character_length: {max_character_length}, "
f"sentence_window_size: {sentence_window_size}"
f"Splitting documents with tokenizer: {tokenizer}, chunk_size: {chunk_size} tokens"
)

split_docs = []
@@ -188,14 +140,11 @@
"DocumentSplitter only works with text documents but one or more " "'content' values are None."
)

units = _split_into_units(content, split_by)
text_splits = _concatenate_units(
units,
split_length,
split_overlap,
max_character_length=max_character_length,
)
split_docs.extend(_build_split_documents(row, text_splits, sentence_window_size=sentence_window_size))
os.environ['TOKENIZERS_PARALLELISM'] = "False"
tokenizer_model = AutoTokenizer.from_pretrained(tokenizer)

chunks = _split_into_chunks(content, tokenizer_model, chunk_size)
split_docs.extend(_build_split_documents(row, chunks))

split_docs_df = pd.DataFrame(split_docs)

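
One design note, not part of this diff: AutoTokenizer.from_pretrained is invoked inside split_and_forward rather than once at module setup, so the tokenizer is re-created for every message that carries a split task. A possible refinement is to memoize the load; the helper below is a hypothetical sketch, not code from this PR.

    from functools import lru_cache

    from transformers import AutoTokenizer


    @lru_cache(maxsize=4)
    def _get_tokenizer(model_name: str):
        # Loaded once per model name and reused for subsequent messages.
        return AutoTokenizer.from_pretrained(model_name)
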
13 changes: 2 additions & 11 deletions src/nv_ingest/schemas/ingest_job_schema.py
@@ -61,17 +61,8 @@ class TracingOptionsSchema(BaseModelNoExt):


class IngestTaskSplitSchema(BaseModelNoExt):
split_by: Literal["word", "sentence", "passage"]
split_length: conint(gt=0)
split_overlap: conint(ge=0)
max_character_length: Optional[conint(gt=0)]
sentence_window_size: Optional[conint(ge=0)]

@validator("sentence_window_size")
def check_sentence_window_size(cls, v, values, **kwargs):
if v is not None and v > 0 and values["split_by"] != "sentence":
raise ValueError("When using sentence_window_size, split_by must be 'sentence'.")
return v
tokenizer: str
chunk_size: conint(gt=0)


class IngestTaskExtractSchema(BaseModelNoExt):
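
For context, a sketch of the task properties the updated server-side schema now accepts, using only the two fields defined above (the import path mirrors the file location; validation behaviour is standard pydantic, and the exact error text varies by version):

    from nv_ingest.schemas.ingest_job_schema import IngestTaskSplitSchema

    # Both fields are required; chunk_size must be a positive integer.
    props = IngestTaskSplitSchema(tokenizer="intfloat/e5-large-unsupervised", chunk_size=300)

    try:
        IngestTaskSplitSchema(tokenizer="intfloat/e5-large-unsupervised", chunk_size=0)
    except Exception as exc:
        print(type(exc).__name__)  # ValidationError
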
17 changes: 2 additions & 15 deletions src/nv_ingest/schemas/nemo_doc_splitter_schema.py
@@ -3,24 +3,11 @@
# SPDX-License-Identifier: Apache-2.0


from typing import Literal
from typing import Optional

from pydantic import BaseModel
from pydantic import conint
from pydantic import validator


class DocumentSplitterSchema(BaseModel):
split_by: Literal["word", "sentence", "passage"] = "word"
split_length: conint(gt=0) = 60
split_overlap: conint(ge=0) = 10
max_character_length: Optional[conint(gt=0)] = 450
sentence_window_size: Optional[conint(ge=0)] = 0
tokenizer: str = "intfloat/e5-large-unsupervised"
chunk_size: conint(gt=0) = 300
raise_on_failure: bool = False

@validator("sentence_window_size")
def check_sentence_window_size(cls, v, values, **kwargs):
if v is not None and v > 0 and values["split_by"] != "sentence":
raise ValueError("When using sentence_window_size, split_by must be 'sentence'.")
return v
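
Finally, a short sketch of how the new module defaults interact with per-task overrides, mirroring the task_props.get(...) fallback used in nemo_doc_splitter.py (the override value here is made up):

    from nv_ingest.schemas.nemo_doc_splitter_schema import DocumentSplitterSchema

    config = DocumentSplitterSchema()
    print(config.tokenizer, config.chunk_size)  # intfloat/e5-large-unsupervised 300

    task_props = {"chunk_size": 512}  # hypothetical per-job override
    tokenizer = task_props.get("tokenizer", config.tokenizer)
    chunk_size = task_props.get("chunk_size", config.chunk_size)
    print(tokenizer, chunk_size)  # intfloat/e5-large-unsupervised 512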