From 84f0846ac6aae996e037f2c5e174a144d13f3b5d Mon Sep 17 00:00:00 2001 From: bhimrazy Date: Mon, 28 Oct 2024 01:15:39 +0545 Subject: [PATCH 1/8] wip: adds support for numpy datatypes in tokensloader --- src/litdata/streaming/item_loader.py | 23 ++++++++++++++++++----- 1 file changed, 18 insertions(+), 5 deletions(-) diff --git a/src/litdata/streaming/item_loader.py b/src/litdata/streaming/item_loader.py index 43e981e9..cdc59548 100644 --- a/src/litdata/streaming/item_loader.py +++ b/src/litdata/streaming/item_loader.py @@ -23,9 +23,7 @@ import numpy as np import torch -from litdata.constants import ( - _TORCH_DTYPES_MAPPING, -) +from litdata.constants import _NUMPY_DTYPES_MAPPING, _TORCH_DTYPES_MAPPING from litdata.streaming.serializers import Serializer from litdata.utilities._pytree import PyTree, tree_unflatten from litdata.utilities.encryption import Encryption, EncryptionLevel @@ -281,7 +279,17 @@ def setup( region_of_interest: Optional[List[Tuple[int, int]]] = None, ) -> None: super().setup(config, chunks, serializers, region_of_interest) - self._dtype = _TORCH_DTYPES_MAPPING[int(config["data_format"][0].split(":")[1])] + + serializer_name, dtype_index = self._data_format[0].split(":") + if serializer_name not in ["no_header_numpy", "no_header_tensor"]: + raise ValueError("The provided data format isn't supported.") + + self._serializer_name = serializer_name + self._dtype = ( + _TORCH_DTYPES_MAPPING[int(dtype_index)] + if serializer_name == "no_header_tensor" + else _NUMPY_DTYPES_MAPPING[int(dtype_index)] + ) if all(chunk["dim"] is None for chunk in self._chunks): raise ValueError("The provided chunks isn't properly setup.") @@ -350,7 +358,12 @@ def load_item_from_chunk( buffer: bytes = self._buffers[chunk_index] offset = self._dtype.itemsize * (index - begin) * self._block_size - return torch.frombuffer(buffer, dtype=self._dtype, count=self._block_size, offset=offset) + if self._serializer_name == "no_header_tensor": + data = torch.frombuffer(buffer, 
dtype=self._dtype, count=self._block_size, offset=offset) + else: + data = np.frombuffer(buffer, dtype=self._dtype, count=self._block_size, offset=offset) + data = torch.from_numpy(data) + return data def delete(self, chunk_index: int, chunk_filepath: str) -> None: if os.path.exists(chunk_filepath): From 3bad2004ba06a3a1eecc6f1d934d637148613920 Mon Sep 17 00:00:00 2001 From: bhimrazy Date: Mon, 28 Oct 2024 01:17:02 +0545 Subject: [PATCH 2/8] updated itemloader --- src/litdata/streaming/item_loader.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/litdata/streaming/item_loader.py b/src/litdata/streaming/item_loader.py index cdc59548..43590e42 100644 --- a/src/litdata/streaming/item_loader.py +++ b/src/litdata/streaming/item_loader.py @@ -358,6 +358,8 @@ def load_item_from_chunk( buffer: bytes = self._buffers[chunk_index] offset = self._dtype.itemsize * (index - begin) * self._block_size + + # These serilizers steps could be optimized using serializer class and passing other options as kwargs if self._serializer_name == "no_header_tensor": data = torch.frombuffer(buffer, dtype=self._dtype, count=self._block_size, offset=offset) else: From defe9087854173cb230da2f253d9cede4308ee27 Mon Sep 17 00:00:00 2001 From: bhimrazy Date: Mon, 28 Oct 2024 01:19:02 +0545 Subject: [PATCH 3/8] update --- src/litdata/streaming/item_loader.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/litdata/streaming/item_loader.py b/src/litdata/streaming/item_loader.py index 43590e42..44e0df6b 100644 --- a/src/litdata/streaming/item_loader.py +++ b/src/litdata/streaming/item_loader.py @@ -359,7 +359,7 @@ def load_item_from_chunk( buffer: bytes = self._buffers[chunk_index] offset = self._dtype.itemsize * (index - begin) * self._block_size - # These serilizers steps could be optimized using serializer class and passing other options as kwargs + # These deserialize steps could be optimized using serializer class and passing other options as kwargs if 
self._serializer_name == "no_header_tensor": data = torch.frombuffer(buffer, dtype=self._dtype, count=self._block_size, offset=offset) else: From e7709d16e5e9fda46def5403390753f204129b6a Mon Sep 17 00:00:00 2001 From: bhimrazy Date: Tue, 29 Oct 2024 01:05:29 +0545 Subject: [PATCH 4/8] removed numpy array to tensor conversion in tokens loader --- src/litdata/streaming/item_loader.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/src/litdata/streaming/item_loader.py b/src/litdata/streaming/item_loader.py index 44e0df6b..a6ccd220 100644 --- a/src/litdata/streaming/item_loader.py +++ b/src/litdata/streaming/item_loader.py @@ -359,12 +359,10 @@ def load_item_from_chunk( buffer: bytes = self._buffers[chunk_index] offset = self._dtype.itemsize * (index - begin) * self._block_size - # These deserialize steps could be optimized using serializer class and passing other options as kwargs if self._serializer_name == "no_header_tensor": data = torch.frombuffer(buffer, dtype=self._dtype, count=self._block_size, offset=offset) else: data = np.frombuffer(buffer, dtype=self._dtype, count=self._block_size, offset=offset) - data = torch.from_numpy(data) return data def delete(self, chunk_index: int, chunk_filepath: str) -> None: From 667a777b93b8ebaa2efbf27c89b98c715fa42bc8 Mon Sep 17 00:00:00 2001 From: bhimrazy Date: Tue, 29 Oct 2024 01:18:20 +0545 Subject: [PATCH 5/8] update --- src/litdata/streaming/item_loader.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/litdata/streaming/item_loader.py b/src/litdata/streaming/item_loader.py index a6ccd220..64941d14 100644 --- a/src/litdata/streaming/item_loader.py +++ b/src/litdata/streaming/item_loader.py @@ -362,7 +362,7 @@ def load_item_from_chunk( if self._serializer_name == "no_header_tensor": data = torch.frombuffer(buffer, dtype=self._dtype, count=self._block_size, offset=offset) else: - data = np.frombuffer(buffer, dtype=self._dtype, count=self._block_size, offset=offset) + data = np.frombuffer(buffer, 
dtype=self._dtype, count=self._block_size, offset=offset) # type: ignore return data def delete(self, chunk_index: int, chunk_filepath: str) -> None: if os.path.exists(chunk_filepath): From 97d09d39fb77ef913d739453bc9e265165c64274 Mon Sep 17 00:00:00 2001 From: bhimrazy Date: Tue, 5 Nov 2024 12:13:19 +0545 Subject: [PATCH 6/8] fix: add type ignore for dtype mapping in TokensLoader --- src/litdata/streaming/item_loader.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/litdata/streaming/item_loader.py b/src/litdata/streaming/item_loader.py index 64941d14..9c58cabf 100644 --- a/src/litdata/streaming/item_loader.py +++ b/src/litdata/streaming/item_loader.py @@ -286,7 +286,7 @@ def setup( self._serializer_name = serializer_name self._dtype = ( - _TORCH_DTYPES_MAPPING[int(dtype_index)] + _TORCH_DTYPES_MAPPING[int(dtype_index)] # type: ignore if serializer_name == "no_header_tensor" else _NUMPY_DTYPES_MAPPING[int(dtype_index)] ) From ad23f5f667019b52d12d18f2316fba9ac369cda1 Mon Sep 17 00:00:00 2001 From: bhimrazy Date: Tue, 5 Nov 2024 12:51:50 +0545 Subject: [PATCH 7/8] feat: add test for numpy array in tokens loader --- tests/streaming/test_item_loader.py | 33 +++++++++++++++++++++++++++-- 1 file changed, 31 insertions(+), 2 deletions(-) diff --git a/tests/streaming/test_item_loader.py b/tests/streaming/test_item_loader.py index ecb8e6f8..b0a943cf 100644 --- a/tests/streaming/test_item_loader.py +++ b/tests/streaming/test_item_loader.py @@ -1,10 +1,12 @@ from unittest.mock import MagicMock +import numpy as np import torch -from litdata.constants import _TORCH_DTYPES_MAPPING + +from litdata.constants import _NUMPY_DTYPES_MAPPING, _TORCH_DTYPES_MAPPING from litdata.streaming import Cache from litdata.streaming.dataset import StreamingDataset -from litdata.streaming.item_loader import PyTreeLoader +from litdata.streaming.item_loader import PyTreeLoader, TokensLoader def test_serializer_setup(): @@ -38,3 +40,30 @@ def test_pytreeloader_with_no_header_tensor_serializer(tmpdir): item = 
dataset[i] assert torch.allclose(i * torch.ones(10).to(_TORCH_DTYPES_MAPPING[dtype_index_float]), item["float"]) assert torch.allclose(i * torch.ones(10).to(_TORCH_DTYPES_MAPPING[dtype_index_long]), item["long"]) + + +def test_tokensloader_with_no_header_numpy_serializer(tmpdir): + cache = Cache(str(tmpdir), chunk_size=512, item_loader=TokensLoader()) + assert isinstance(cache._reader._item_loader, TokensLoader) + + dtype_index_int32 = 3 + dtype = _NUMPY_DTYPES_MAPPING[dtype_index_int32] + + for i in range(10): + data = np.random.randint(0, 100, size=(256), dtype=dtype) + cache._add_item(i, data) + + data_format = [f"no_header_numpy:{dtype_index_int32}"] + assert cache._writer.get_config()["data_format"] == data_format + cache.done() + cache.merge() + + dataset = StreamingDataset( + input_dir=str(tmpdir), + drop_last=True, + item_loader=TokensLoader(block_size=256), + ) + + for data in dataset: + assert data.shape == (256,) + assert data.dtype == dtype From 69719df38af1c64e605d4376575f6717a10ffa23 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Tue, 5 Nov 2024 07:07:59 +0000 Subject: [PATCH 8/8] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- tests/streaming/test_item_loader.py | 1 - 1 file changed, 1 deletion(-) diff --git a/tests/streaming/test_item_loader.py b/tests/streaming/test_item_loader.py index b0a943cf..a5828b24 100644 --- a/tests/streaming/test_item_loader.py +++ b/tests/streaming/test_item_loader.py @@ -2,7 +2,6 @@ import numpy as np import torch - from litdata.constants import _NUMPY_DTYPES_MAPPING, _TORCH_DTYPES_MAPPING from litdata.streaming import Cache from litdata.streaming.dataset import StreamingDataset