diff --git a/docs/changelog/next_release/325.feature.rst b/docs/changelog/next_release/325.feature.rst new file mode 100644 index 000000000..b1cd835c2 --- /dev/null +++ b/docs/changelog/next_release/325.feature.rst @@ -0,0 +1 @@ +Introduce ``TotalFilesSize(...)`` limit. Now users can set ``FileDownloader`` / ``FileMover`` to stop downloading/moving files after reaching a certain amount of data. diff --git a/docs/file/file_limits/index.rst b/docs/file/file_limits/index.rst index bb8042bac..3af8a4406 100644 --- a/docs/file/file_limits/index.rst +++ b/docs/file/file_limits/index.rst @@ -8,6 +8,7 @@ File Limits :caption: File limits max_files_count + total_files_size .. toctree:: :maxdepth: 1 diff --git a/docs/file/file_limits/total_files_size.rst b/docs/file/file_limits/total_files_size.rst new file mode 100644 index 000000000..af1e3c03f --- /dev/null +++ b/docs/file/file_limits/total_files_size.rst @@ -0,0 +1,9 @@ +.. _total-files-size-limit: + +TotalFilesSize +============== + +.. currentmodule:: onetl.file.limit.total_tiles_size + +.. autoclass:: TotalFilesSize + :members: reset, stops_at, is_reached diff --git a/onetl/file/file_downloader/file_downloader.py b/onetl/file/file_downloader/file_downloader.py index ffd4925c0..2d32be60c 100644 --- a/onetl/file/file_downloader/file_downloader.py +++ b/onetl/file/file_downloader/file_downloader.py @@ -182,7 +182,7 @@ class FileDownloader(FrozenModel): from onetl.connection import SFTP from onetl.file import FileDownloader from onetl.file.filter import Glob, ExcludeDir - from onetl.file.limit import MaxFilesCount + from onetl.file.limit import MaxFilesCount, TotalFileSize sftp = SFTP(...) @@ -196,7 +196,7 @@ class FileDownloader(FrozenModel): Glob("*.txt"), ExcludeDir("/path/to/remote/source/exclude_dir"), ], - limits=[MaxFilesCount(100)], + limits=[MaxFilesCount(100), TotalFileSize("10GiB")], options=FileDownloader.Options(delete_source=True, if_exists="replace_file"), ) diff --git a/onetl/file/file_mover/file_mover.py b/onetl/file/file_mover/file_mover.py index 3fd7e5c6e..81c25e3bc 100644 --- a/onetl/file/file_mover/file_mover.py +++ b/onetl/file/file_mover/file_mover.py @@ -121,7 +121,7 @@ class FileMover(FrozenModel): from onetl.connection import SFTP from onetl.file import FileMover from onetl.file.filter import Glob, ExcludeDir - from onetl.file.limit import MaxFilesCount + from onetl.file.limit import MaxFilesCount, TotalFilesSize sftp = SFTP(...) @@ -134,7 +134,7 @@ class FileMover(FrozenModel): Glob("*.txt"), ExcludeDir("/path/to/source/dir/exclude"), ], - limits=[MaxFilesCount(100)], + limits=[MaxFilesCount(100), TotalFileSize("10GiB")], options=FileMover.Options(if_exists="replace_file"), ) diff --git a/onetl/file/limit/__init__.py b/onetl/file/limit/__init__.py index 2d353fc29..ccc511efd 100644 --- a/onetl/file/limit/__init__.py +++ b/onetl/file/limit/__init__.py @@ -4,3 +4,12 @@ from onetl.file.limit.limits_stop_at import limits_stop_at from onetl.file.limit.max_files_count import MaxFilesCount from onetl.file.limit.reset_limits import reset_limits +from onetl.file.limit.total_files_size import TotalFilesSize + +__all__ = [ + "limits_reached", + "limits_stop_at", + "MaxFilesCount", + "TotalFilesSize", + "reset_limits", +] diff --git a/onetl/file/limit/max_files_count.py b/onetl/file/limit/max_files_count.py index c62292fa8..9f8939de3 100644 --- a/onetl/file/limit/max_files_count.py +++ b/onetl/file/limit/max_files_count.py @@ -4,6 +4,11 @@ import logging +try: + from pydantic.v1 import validator +except (ImportError, AttributeError): + from pydantic import validator # type: ignore[no-redef, assignment] + from onetl.base import BaseFileLimit, PathProtocol from onetl.impl import FrozenModel @@ -13,6 +18,10 @@ class MaxFilesCount(BaseFileLimit, FrozenModel): """Limits the total number of files handled by :ref:`file-downloader` or :ref:`file-mover`. + All files until specified limit (including) will be downloaded/moved, but ``limit+1`` will not. + + This doesn't apply to directories. + .. versionadded:: 0.8.0 Replaces deprecated ``onetl.core.FileLimit`` @@ -21,12 +30,10 @@ class MaxFilesCount(BaseFileLimit, FrozenModel): limit : int - All files until ``limit`` (including) will be downloaded/moved, but ``limit+1`` will not. - Examples -------- - Create filter which allows to handle 100 files, but stops on 101: + Create filter which allows to download/move up to 100 files, but stops on 101: .. code:: python @@ -37,7 +44,7 @@ class MaxFilesCount(BaseFileLimit, FrozenModel): limit: int - _counter: int = 0 + _handled: int = 0 def __init__(self, limit: int): # this is only to allow passing glob as positional argument @@ -46,21 +53,27 @@ def __init__(self, limit: int): def __repr__(self): return f"{self.__class__.__name__}({self.limit})" + @validator("limit") + def _limit_cannot_be_negative(cls, value): + if value <= 0: + raise ValueError("Limit should be positive number") + return value + def reset(self): - self._counter = 0 + self._handled = 0 return self def stops_at(self, path: PathProtocol) -> bool: if self.is_reached: return True - if path.is_dir(): + if not path.is_file(): # directories count does not matter return False - self._counter += 1 + self._handled += 1 return self.is_reached @property def is_reached(self) -> bool: - return self._counter >= self.limit + return self._handled >= self.limit diff --git a/onetl/file/limit/total_files_size.py b/onetl/file/limit/total_files_size.py new file mode 100644 index 000000000..5c9237476 --- /dev/null +++ b/onetl/file/limit/total_files_size.py @@ -0,0 +1,88 @@ +# SPDX-FileCopyrightText: 2021-2024 MTS PJSC +# SPDX-License-Identifier: Apache-2.0 +from __future__ import annotations + +import logging + +from onetl.base.path_protocol import PathWithStatsProtocol + +try: + from pydantic.v1 import ByteSize, validator +except (ImportError, AttributeError): + from pydantic import ByteSize, validator # type: ignore[no-redef, assignment] + +from onetl.base import BaseFileLimit, PathProtocol +from onetl.impl import FrozenModel + +log = logging.getLogger(__name__) + + +class TotalFilesSize(BaseFileLimit, FrozenModel): + """Limits the total size of files handled by :ref:`file-downloader` or :ref:`file-mover`. + + Sum of downloaded/moved files should be less or equal to specified size. After that all files with non-zero size will be ignored. + + This doesn't apply to directories or files with no size information, + + .. versionadded:: 0.13.0 + + ..note:: + + SI unit prefixes means that ``1KB`` == ``1 kilobyte`` == ``1000 bytes``. + If you need ``1024 bytes``, use ``1 KiB`` == ``1 kibibyte``. + + Parameters + ---------- + + limit : int or str + + Examples + -------- + + Create filter which allows to download/move files with total size up to 1GiB, but not higher: + + .. code:: python + + from onetl.file.limit import MaxFilesCount + + limit = TotalFilesSize("1GiB") + """ + + limit: ByteSize + + _handled: int = 0 + + def __init__(self, limit: int | str): + # this is only to allow passing glob as positional argument + super().__init__(limit=limit) # type: ignore + + def __repr__(self): + return f'{self.__class__.__name__}("{self.limit.human_readable()}")' + + @validator("limit") + def _limit_cannot_be_negative(cls, value): + if value <= 0: + raise ValueError("Limit should be positive number") + return value + + def reset(self): + self._handled = 0 + return self + + def stops_at(self, path: PathProtocol) -> bool: + if self.is_reached: + return True + + if not path.is_file(): + # directories count does not matter + return False + + if not isinstance(path, PathWithStatsProtocol): + return False + + self._handled += path.stat().st_size + return self.is_reached + + @property + def is_reached(self) -> bool: + return self._handled >= self.limit diff --git a/setup.cfg b/setup.cfg index 3fcecbbaa..3d6d37b61 100644 --- a/setup.cfg +++ b/setup.cfg @@ -281,7 +281,9 @@ ignore = # WPS412 Found `__init__.py` module with logic WPS412, # WPS413 Found bad magic module function: __getattr__ - WPS413 + WPS413, +# WPS338 Found incorrect order of methods in a class + WPS338 # http://flake8.pycqa.org/en/latest/user/options.html?highlight=per-file-ignores#cmdoption-flake8-per-file-ignores per-file-ignores = diff --git a/tests/tests_unit/test_file/test_limit/test_max_files_count.py b/tests/tests_unit/test_file/test_limit/test_max_files_count.py index 0f948e3a6..19e6ae1bd 100644 --- a/tests/tests_unit/test_file/test_limit/test_max_files_count.py +++ b/tests/tests_unit/test_file/test_limit/test_max_files_count.py @@ -1,7 +1,21 @@ +import pytest + from onetl.file.limit import MaxFilesCount from onetl.impl import RemoteDirectory, RemoteFile, RemotePathStat +def test_max_files_count_invalid(): + with pytest.raises(ValueError, match="Limit should be positive number"): + MaxFilesCount(0) + + with pytest.raises(ValueError, match="Limit should be positive number"): + MaxFilesCount(-1) + + +def test_max_files_count_repr(): + assert repr(MaxFilesCount(10)) == "MaxFilesCount(10)" + + def test_max_files_count(): limit = MaxFilesCount(3) assert not limit.is_reached diff --git a/tests/tests_unit/test_file/test_limit/test_total_files_size.py b/tests/tests_unit/test_file/test_limit/test_total_files_size.py new file mode 100644 index 000000000..26b6798a3 --- /dev/null +++ b/tests/tests_unit/test_file/test_limit/test_total_files_size.py @@ -0,0 +1,81 @@ +import pytest + +from onetl.file.limit import TotalFilesSize +from onetl.impl import RemoteDirectory, RemoteFile, RemotePathStat + + +def test_total_files_size_invalid(): + with pytest.raises(ValueError, match="Limit should be positive number"): + TotalFilesSize(0) + + with pytest.raises(ValueError, match="Limit should be positive number"): + TotalFilesSize(-1) + + with pytest.raises(ValueError, match="could not parse value and unit from byte string"): + TotalFilesSize("wtf") + + +def test_total_files_size_repr(): + assert repr(TotalFilesSize("10KiB")) == 'TotalFilesSize("10.0KiB")' + + +@pytest.mark.parametrize( + ["input", "expected"], + [ + ("10", 10), + ("10B", 10), + ("10KB", 10_000), + ("10KiB", 10 * 1024), + ("10MB", 10_000_000), + ("10MiB", 10 * 1024 * 1024), + ("10GB", 10_000_000_000), + ("10GiB", 10 * 1024 * 1024 * 1024), + ], +) +def test_total_files_size_parse_units(input: str, expected: int): + assert TotalFilesSize(input.replace("B", "b")).limit == expected + assert TotalFilesSize(input).limit == expected + + +def test_total_files_size(): + limit = TotalFilesSize("30KiB") + assert not limit.is_reached + + directory = RemoteDirectory("some") + file1 = RemoteFile(path="file1.csv", stats=RemotePathStat(st_size=10 * 1024, st_mtime=50)) + file2 = RemoteFile(path="file2.csv", stats=RemotePathStat(st_size=10 * 1024, st_mtime=50)) + file3 = RemoteFile(path="nested/file3.csv", stats=RemotePathStat(st_size=20 * 1024, st_mtime=50)) + file4 = RemoteFile(path="nested/file4.csv", stats=RemotePathStat(st_size=20 * 1024, st_mtime=50)) + + assert not limit.stops_at(file1) + assert not limit.is_reached + + assert not limit.stops_at(file2) + assert not limit.is_reached + + # directories are not checked by limit + assert not limit.stops_at(directory) + assert not limit.is_reached + + # limit is reached - all check are True, input does not matter + assert limit.stops_at(file3) + assert limit.is_reached + + assert limit.stops_at(file4) + assert limit.is_reached + + assert limit.stops_at(directory) + assert limit.is_reached + + # reset internal state + limit.reset() + + assert not limit.stops_at(file1) + assert not limit.is_reached + + # limit does not remember each file, so if duplicates are present, they can affect the result + assert not limit.stops_at(file1) + assert not limit.is_reached + + assert limit.stops_at(file1) + assert limit.is_reached