Skip to content

Commit

Permalink
[DOP-22144] Introduce TotalFilesSize limit
Browse files Browse the repository at this point in the history
  • Loading branch information
dolfinus committed Jan 13, 2025
1 parent dc1bc9a commit 9fa194a
Show file tree
Hide file tree
Showing 11 changed files with 231 additions and 13 deletions.
1 change: 1 addition & 0 deletions docs/changelog/next_release/325.feature.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Introduce ``TotalFilesSize(...)`` limit. Now users can set ``FileDownloader`` / ``FileMover`` to stop downloading/moving files after reaching a certain amount of data.
1 change: 1 addition & 0 deletions docs/file/file_limits/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ File Limits
:caption: File limits

max_files_count
total_files_size

.. toctree::
:maxdepth: 1
Expand Down
9 changes: 9 additions & 0 deletions docs/file/file_limits/total_files_size.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
.. _total-files-size-limit:

TotalFilesSize
==============

.. currentmodule:: onetl.file.limit.total_tiles_size

.. autoclass:: TotalFilesSize
:members: reset, stops_at, is_reached
4 changes: 2 additions & 2 deletions onetl/file/file_downloader/file_downloader.py
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ class FileDownloader(FrozenModel):
from onetl.connection import SFTP
from onetl.file import FileDownloader
from onetl.file.filter import Glob, ExcludeDir
from onetl.file.limit import MaxFilesCount
from onetl.file.limit import MaxFilesCount, TotalFileSize
sftp = SFTP(...)
Expand All @@ -196,7 +196,7 @@ class FileDownloader(FrozenModel):
Glob("*.txt"),
ExcludeDir("/path/to/remote/source/exclude_dir"),
],
limits=[MaxFilesCount(100)],
limits=[MaxFilesCount(100), TotalFileSize("10GiB")],
options=FileDownloader.Options(delete_source=True, if_exists="replace_file"),
)
Expand Down
4 changes: 2 additions & 2 deletions onetl/file/file_mover/file_mover.py
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ class FileMover(FrozenModel):
from onetl.connection import SFTP
from onetl.file import FileMover
from onetl.file.filter import Glob, ExcludeDir
from onetl.file.limit import MaxFilesCount
from onetl.file.limit import MaxFilesCount, TotalFilesSize
sftp = SFTP(...)
Expand All @@ -134,7 +134,7 @@ class FileMover(FrozenModel):
Glob("*.txt"),
ExcludeDir("/path/to/source/dir/exclude"),
],
limits=[MaxFilesCount(100)],
limits=[MaxFilesCount(100), TotalFileSize("10GiB")],
options=FileMover.Options(if_exists="replace_file"),
)
Expand Down
9 changes: 9 additions & 0 deletions onetl/file/limit/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,12 @@
from onetl.file.limit.limits_stop_at import limits_stop_at
from onetl.file.limit.max_files_count import MaxFilesCount
from onetl.file.limit.reset_limits import reset_limits
from onetl.file.limit.total_files_size import TotalFilesSize

__all__ = [
"limits_reached",
"limits_stop_at",
"MaxFilesCount",
"TotalFilesSize",
"reset_limits",
]
29 changes: 21 additions & 8 deletions onetl/file/limit/max_files_count.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,11 @@

import logging

try:
from pydantic.v1 import validator
except (ImportError, AttributeError):
from pydantic import validator # type: ignore[no-redef, assignment]

from onetl.base import BaseFileLimit, PathProtocol
from onetl.impl import FrozenModel

Expand All @@ -13,6 +18,10 @@
class MaxFilesCount(BaseFileLimit, FrozenModel):
"""Limits the total number of files handled by :ref:`file-downloader` or :ref:`file-mover`.
All files until specified limit (including) will be downloaded/moved, but ``limit+1`` will not.
This doesn't apply to directories.
.. versionadded:: 0.8.0
Replaces deprecated ``onetl.core.FileLimit``
Expand All @@ -21,12 +30,10 @@ class MaxFilesCount(BaseFileLimit, FrozenModel):
limit : int
All files until ``limit`` (including) will be downloaded/moved, but ``limit+1`` will not.
Examples
--------
Create filter which allows to handle 100 files, but stops on 101:
Create filter which allows to download/move up to 100 files, but stops on 101:
.. code:: python
Expand All @@ -37,7 +44,7 @@ class MaxFilesCount(BaseFileLimit, FrozenModel):

limit: int

_counter: int = 0
_handled: int = 0

def __init__(self, limit: int):
# this is only to allow passing glob as positional argument
Expand All @@ -46,21 +53,27 @@ def __init__(self, limit: int):
def __repr__(self):
return f"{self.__class__.__name__}({self.limit})"

@validator("limit")
def _limit_cannot_be_negative(cls, value):
if value <= 0:
raise ValueError("Limit should be positive number")
return value

def reset(self):
self._counter = 0
self._handled = 0
return self

def stops_at(self, path: PathProtocol) -> bool:
if self.is_reached:
return True

if path.is_dir():
if not path.is_file():
# directories count does not matter
return False

self._counter += 1
self._handled += 1
return self.is_reached

@property
def is_reached(self) -> bool:
return self._counter >= self.limit
return self._handled >= self.limit
88 changes: 88 additions & 0 deletions onetl/file/limit/total_files_size.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
# SPDX-FileCopyrightText: 2021-2024 MTS PJSC
# SPDX-License-Identifier: Apache-2.0
from __future__ import annotations

import logging

from onetl.base.path_protocol import PathWithStatsProtocol

try:
from pydantic.v1 import ByteSize, validator
except (ImportError, AttributeError):
from pydantic import ByteSize, validator # type: ignore[no-redef, assignment]

from onetl.base import BaseFileLimit, PathProtocol
from onetl.impl import FrozenModel

log = logging.getLogger(__name__)


class TotalFilesSize(BaseFileLimit, FrozenModel):
"""Limits the total size of files handled by :ref:`file-downloader` or :ref:`file-mover`.
Sum of downloaded/moved files should be less or equal to specified size. After that all files with non-zero size will be ignored.
This doesn't apply to directories or files with no size information,
.. versionadded:: 0.13.0
..note::
SI unit prefixes means that ``1KB`` == ``1 kilobyte`` == ``1000 bytes``.
If you need ``1024 bytes``, use ``1 KiB`` == ``1 kibibyte``.
Parameters
----------
limit : int or str
Examples
--------
Create filter which allows to download/move files with total size up to 1GiB, but not higher:
.. code:: python
from onetl.file.limit import MaxFilesCount
limit = TotalFilesSize("1GiB")
"""

limit: ByteSize

_handled: int = 0

def __init__(self, limit: int | str):
# this is only to allow passing glob as positional argument
super().__init__(limit=limit) # type: ignore

def __repr__(self):
return f'{self.__class__.__name__}("{self.limit.human_readable()}")'

@validator("limit")
def _limit_cannot_be_negative(cls, value):
if value <= 0:
raise ValueError("Limit should be positive number")
return value

def reset(self):
self._handled = 0
return self

def stops_at(self, path: PathProtocol) -> bool:
if self.is_reached:
return True

if not path.is_file():
# directories count does not matter
return False

if not isinstance(path, PathWithStatsProtocol):
return False

self._handled += path.stat().st_size
return self.is_reached

@property
def is_reached(self) -> bool:
return self._handled >= self.limit
4 changes: 3 additions & 1 deletion setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,9 @@ ignore =
# WPS412 Found `__init__.py` module with logic
WPS412,
# WPS413 Found bad magic module function: __getattr__
WPS413
WPS413,
# WPS338 Found incorrect order of methods in a class
WPS338

# http://flake8.pycqa.org/en/latest/user/options.html?highlight=per-file-ignores#cmdoption-flake8-per-file-ignores
per-file-ignores =
Expand Down
14 changes: 14 additions & 0 deletions tests/tests_unit/test_file/test_limit/test_max_files_count.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,21 @@
import pytest

from onetl.file.limit import MaxFilesCount
from onetl.impl import RemoteDirectory, RemoteFile, RemotePathStat


def test_max_files_count_invalid():
with pytest.raises(ValueError, match="Limit should be positive number"):
MaxFilesCount(0)

with pytest.raises(ValueError, match="Limit should be positive number"):
MaxFilesCount(-1)


def test_max_files_count_repr():
assert repr(MaxFilesCount(10)) == "MaxFilesCount(10)"


def test_max_files_count():
limit = MaxFilesCount(3)
assert not limit.is_reached
Expand Down
81 changes: 81 additions & 0 deletions tests/tests_unit/test_file/test_limit/test_total_files_size.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
import pytest

from onetl.file.limit import TotalFilesSize
from onetl.impl import RemoteDirectory, RemoteFile, RemotePathStat


def test_total_files_size_invalid():
with pytest.raises(ValueError, match="Limit should be positive number"):
TotalFilesSize(0)

with pytest.raises(ValueError, match="Limit should be positive number"):
TotalFilesSize(-1)

with pytest.raises(ValueError, match="could not parse value and unit from byte string"):
TotalFilesSize("wtf")


def test_total_files_size_repr():
assert repr(TotalFilesSize("10KiB")) == 'TotalFilesSize("10.0KiB")'


@pytest.mark.parametrize(
["input", "expected"],
[
("10", 10),
("10B", 10),
("10KB", 10_000),
("10KiB", 10 * 1024),
("10MB", 10_000_000),
("10MiB", 10 * 1024 * 1024),
("10GB", 10_000_000_000),
("10GiB", 10 * 1024 * 1024 * 1024),
],
)
def test_total_files_size_parse_units(input: str, expected: int):
assert TotalFilesSize(input.replace("B", "b")).limit == expected
assert TotalFilesSize(input).limit == expected


def test_total_files_size():
limit = TotalFilesSize("30KiB")
assert not limit.is_reached

directory = RemoteDirectory("some")
file1 = RemoteFile(path="file1.csv", stats=RemotePathStat(st_size=10 * 1024, st_mtime=50))
file2 = RemoteFile(path="file2.csv", stats=RemotePathStat(st_size=10 * 1024, st_mtime=50))
file3 = RemoteFile(path="nested/file3.csv", stats=RemotePathStat(st_size=20 * 1024, st_mtime=50))
file4 = RemoteFile(path="nested/file4.csv", stats=RemotePathStat(st_size=20 * 1024, st_mtime=50))

assert not limit.stops_at(file1)
assert not limit.is_reached

assert not limit.stops_at(file2)
assert not limit.is_reached

# directories are not checked by limit
assert not limit.stops_at(directory)
assert not limit.is_reached

# limit is reached - all check are True, input does not matter
assert limit.stops_at(file3)
assert limit.is_reached

assert limit.stops_at(file4)
assert limit.is_reached

assert limit.stops_at(directory)
assert limit.is_reached

# reset internal state
limit.reset()

assert not limit.stops_at(file1)
assert not limit.is_reached

# limit does not remember each file, so if duplicates are present, they can affect the result
assert not limit.stops_at(file1)
assert not limit.is_reached

assert limit.stops_at(file1)
assert limit.is_reached

0 comments on commit 9fa194a

Please sign in to comment.