Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: fix hybrid chunker token constraint #131

Merged
merged 1 commit into from
Jan 17, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
185 changes: 79 additions & 106 deletions docling_core/transforms/chunker/hybrid_chunker.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,13 +65,13 @@ def _patch_tokenizer_and_max_tokens(self) -> Self:
)
return self

def _count_tokens(self, text: Optional[Union[str, list[str]]]):
def _count_text_tokens(self, text: Optional[Union[str, list[str]]]):
if text is None:
return 0
elif isinstance(text, list):
total = 0
for t in text:
total += self._count_tokens(t)
total += self._count_text_tokens(t)
return total
return len(self._tokenizer.tokenize(text, max_length=None))

Expand All @@ -80,102 +80,83 @@ class _ChunkLengthInfo(BaseModel):
text_len: int
other_len: int

def _count_chunk_tokens(self, doc_chunk: DocChunk):
    """Return the token count of the chunk's full serialized form.

    Serializes the chunk (text plus metadata such as headings/captions)
    and tokenizes the result without truncation.
    """
    serialized = self.serialize(chunk=doc_chunk)
    return len(self._tokenizer.tokenize(text=serialized, max_length=None))

def _doc_chunk_length(self, doc_chunk: DocChunk):
    """Compute token-length statistics for *doc_chunk*.

    The visible span interleaved the pre- and post-change versions of this
    method (a scraped diff); this is the merged, valid version: the total
    is measured on the fully serialized chunk so that delimiters and
    serialization overhead are included, while ``other_len`` is derived as
    the non-text remainder (headings, captions, etc.).
    """
    text_length = self._count_text_tokens(doc_chunk.text)
    # Total is based on the serialized chunk, not a sum of parts, so the
    # length used here matches what the tokenizer will actually see.
    total = self._count_chunk_tokens(doc_chunk=doc_chunk)
    return self._ChunkLengthInfo(
        total_len=total,
        text_len=text_length,
        other_len=total - text_length,
    )

def _make_chunk_from_doc_items(
    self, doc_chunk: DocChunk, window_start: int, window_end: int
):
    """Build a sub-chunk of *doc_chunk* covering doc items [window_start, window_end].

    The visible span interleaved both sides of a diff (two signatures, two
    ``doc_items`` keyword lines); this is the merged, valid version. The
    window bounds are inclusive. Headings, captions, and origin are carried
    over unchanged from the parent chunk's metadata.
    """
    doc_items = doc_chunk.meta.doc_items[window_start : window_end + 1]
    meta = DocMeta(
        doc_items=doc_items,
        headings=doc_chunk.meta.headings,
        captions=doc_chunk.meta.captions,
        origin=doc_chunk.meta.origin,
    )
    # Single-item chunks keep the original text verbatim; multi-item windows
    # re-join the text of the TextItems only (non-text items contribute none).
    window_text = (
        doc_chunk.text
        if len(doc_chunk.meta.doc_items) == 1
        else self.delim.join(
            [
                doc_item.text
                for doc_item in doc_items
                if isinstance(doc_item, TextItem)
            ]
        )
    )
    new_chunk = DocChunk(text=window_text, meta=meta)
    return new_chunk

def _merge_text(self, t1, t2):
if t1 == "":
return t2
elif t2 == "":
return t1
else:
return f"{t1}{self.delim}{t2}"

def _split_by_doc_items(self, doc_chunk: DocChunk) -> list[DocChunk]:
    """Split *doc_chunk* along its doc-item boundaries so each piece fits max_tokens.

    The visible span interleaved the pre- and post-change loop bodies from a
    scraped diff (dead ``window_text`` bookkeeping, two ``return chunks``
    statements); this is the merged, valid version. Each candidate window is
    measured via ``_count_chunk_tokens`` on the fully serialized chunk, so the
    token constraint accounts for headings/captions/serialization overhead.

    Chunks that cannot fit even as a single item are emitted oversized here
    and are expected to be further split by the plain-text splitter.
    """
    if doc_chunk.meta.doc_items is None or len(doc_chunk.meta.doc_items) <= 1:
        return [doc_chunk]
    chunks = []
    window_start = 0
    window_end = 0  # an inclusive index
    num_items = len(doc_chunk.meta.doc_items)
    while window_end < num_items:
        new_chunk = self._make_chunk_from_doc_items(
            doc_chunk=doc_chunk,
            window_start=window_start,
            window_end=window_end,
        )
        if self._count_chunk_tokens(doc_chunk=new_chunk) <= self.max_tokens:
            if window_end < num_items - 1:
                # Still room left to add more to this chunk AND still at least
                # one item left.
                window_end += 1
                continue
            else:
                # All the items in the window fit into the chunk and there are
                # no other items left.
                chunks.append(new_chunk)
                window_end = num_items  # signalizing the last loop
        elif window_start == window_end:
            # Only one item in the window and it doesn't fit into the chunk.
            # So we'll just make it a chunk for now and it will get split in
            # the plain text splitter.
            chunks.append(new_chunk)
            window_end += 1
            window_start = window_end
        else:
            # Multiple items in the window but they don't fit into the chunk.
            # However, the existing items must have fit or we wouldn't have
            # gotten here. So we put everything but the last item into the
            # chunk and then start a new window INCLUDING the current window
            # end.
            new_chunk = self._make_chunk_from_doc_items(
                doc_chunk=doc_chunk,
                window_start=window_start,
                window_end=window_end - 1,
            )
            window_start = window_end
            chunks.append(new_chunk)
    return chunks

def _split_using_plain_text(
self,
Expand Down Expand Up @@ -204,53 +185,45 @@ def _split_using_plain_text(
def _merge_chunks_with_matching_metadata(self, chunks: list[DocChunk]):
output_chunks = []
window_start = 0
window_end = 0
window_end = 0 # an inclusive index
num_chunks = len(chunks)
while window_end < num_chunks:
chunk = chunks[window_end]
lengths = self._doc_chunk_length(chunk)
headings_and_captions = (chunk.meta.headings, chunk.meta.captions)
ready_to_append = False
if window_start == window_end:
# starting a new block of chunks to potentially merge
current_headings_and_captions = headings_and_captions
window_text = chunk.text
window_other_length = lengths.other_len
window_text_length = lengths.text_len
window_items = chunk.meta.doc_items
window_end += 1
first_chunk_of_window = chunk
elif (
headings_and_captions == current_headings_and_captions
and window_text_length + window_other_length + lengths.text_len
<= self.max_tokens
):
# there is room to include the new chunk so add it to the window and
# continue
window_text = self._merge_text(window_text, chunk.text)
window_text_length += lengths.text_len
window_items = window_items + chunk.meta.doc_items
window_end += 1
else:
ready_to_append = True

chks = chunks[window_start : window_end + 1]
doc_items = [it for chk in chks for it in chk.meta.doc_items]
candidate = DocChunk(
text=self.delim.join([chk.text for chk in chks]),
meta=DocMeta(
doc_items=doc_items,
headings=current_headings_and_captions[0],
captions=current_headings_and_captions[1],
origin=chunk.meta.origin,
),
)
if (
headings_and_captions == current_headings_and_captions
and self._count_chunk_tokens(doc_chunk=candidate) <= self.max_tokens
):
# there is room to include the new chunk so add it to the window and
# continue
window_end += 1
new_chunk = candidate
else:
ready_to_append = True
if ready_to_append or window_end == num_chunks:
# no more room OR the start of new metadata. Either way, end the block
# and use the current window_end as the start of a new block
if window_start + 1 == window_end:
# just one chunk so use it as is
output_chunks.append(first_chunk_of_window)
else:
new_meta = DocMeta(
doc_items=window_items,
headings=current_headings_and_captions[0],
captions=current_headings_and_captions[1],
origin=chunk.meta.origin,
)
new_chunk = DocChunk(
text=window_text,
meta=new_meta,
)
output_chunks.append(new_chunk)
# no need to reset window_text, etc. because that will be reset in the
# next iteration in the if window_start == window_end block
Expand Down
Loading
Loading