From 990c5c0f279a3c01b8c4a427a71aad026e30292f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=9C=D0=B0=D1=80=D1=82=D1=8B=D0=BD=D0=BE=D0=B2=20=D0=9C?= =?UTF-8?q?=D0=B0=D0=BA=D1=81=D0=B8=D0=BC=20=D0=A1=D0=B5=D1=80=D0=B3=D0=B5?= =?UTF-8?q?=D0=B5=D0=B2=D0=B8=D1=87?= Date: Mon, 13 Jan 2025 13:56:42 +0000 Subject: [PATCH] [DOP-22144] Change logic of applying limits in FileConnection.walk --- docs/changelog/next_release/327.breaking.rst | 2 ++ onetl/base/base_file_connection.py | 6 ++--- .../file_connection/file_connection.py | 23 ++++++++----------- onetl/core/file_limit/file_limit.py | 2 +- onetl/file/limit/max_files_count.py | 2 +- onetl/file/limit/total_files_size.py | 2 +- .../test_file_downloader_integration.py | 4 ++-- .../test_file/test_file_limit_unit.py | 11 +++++---- .../test_limit/test_limits_are_reached.py | 3 +++ .../test_limit/test_limits_stop_at.py | 4 ++++ .../test_limit/test_max_files_count.py | 11 +++++---- .../test_file/test_limit/test_reset_limits.py | 2 +- .../test_limit/test_total_files_size.py | 2 +- 13 files changed, 43 insertions(+), 31 deletions(-) create mode 100644 docs/changelog/next_release/327.breaking.rst diff --git a/docs/changelog/next_release/327.breaking.rst b/docs/changelog/next_release/327.breaking.rst new file mode 100644 index 000000000..8707d816a --- /dev/null +++ b/docs/changelog/next_release/327.breaking.rst @@ -0,0 +1,2 @@ +Change the logic of ``FileConnection.walk`` and ``FileConnection.list_dir`` to exclude file what returned ``True`` from ``limit.stops_at(path)``, +instead of stopping just *after* this file (possibly exceeding the limit). diff --git a/onetl/base/base_file_connection.py b/onetl/base/base_file_connection.py index 289494999..70e96d34c 100644 --- a/onetl/base/base_file_connection.py +++ b/onetl/base/base_file_connection.py @@ -435,11 +435,11 @@ def walk( If ``True``, walk in top-down order, otherwise walk in bottom-up order. filters : list of :obj:`BaseFileFilter `, optional - Return only files/directories matching these filters. See :ref:`file-filters` + Return only files/directories matching these filters. See :ref:`file-filters`. limits : list of :obj:`BaseFileLimit `, optional - Apply limits to the list of files/directories, and stop if one of the limits is reached. - See :ref:`file-limits` + Apply limits to the list of files/directories, and immediately stop if any of these limits is reached. + See :ref:`file-limits`. Returns ------- diff --git a/onetl/connection/file_connection/file_connection.py b/onetl/connection/file_connection/file_connection.py index c85eead4b..180b9b44a 100644 --- a/onetl/connection/file_connection/file_connection.py +++ b/onetl/connection/file_connection/file_connection.py @@ -415,6 +415,9 @@ def list_dir( limits = reset_limits(limits or []) for entry in self._scan_entries(remote_dir): + if limits_reached(limits): + break + name = self._extract_name_from_entry(entry) stat = self._extract_stat_from_entry(remote_dir, entry) @@ -423,12 +426,9 @@ def list_dir( else: path = RemoteFile(path=name, stats=stat) - if match_all_filters(path, filters): + if match_all_filters(path, filters) and not limits_stop_at(path, limits): result.append(path) - if limits_stop_at(path, limits): - break - return result @slot @@ -491,6 +491,9 @@ def _walk( # noqa: WPS231 dirs, files = [], [] for entry in self._scan_entries(root): + if limits_reached(limits): + break + name = self._extract_name_from_entry(entry) stat = self._extract_stat_from_entry(root, entry) @@ -499,21 +502,15 @@ def _walk( # noqa: WPS231 yield from self._walk(root=root / name, topdown=topdown, filters=filters, limits=limits) path = RemoteDirectory(path=root / name, stats=stat) - if match_all_filters(path, filters): + if match_all_filters(path, filters) and not limits_stop_at(path, limits): dirs.append(RemoteDirectory(path=name, stats=stat)) - - if limits_stop_at(path, limits): - break else: path = RemoteFile(path=root / name, stats=stat) - if match_all_filters(path, filters): + if match_all_filters(path, filters) and not limits_stop_at(path, limits): files.append(RemoteFile(path=name, stats=stat)) - if limits_stop_at(path, limits): - break - - if topdown: + if topdown and not limits_reached(limits): for name in dirs: yield from self._walk(root=root / name, topdown=topdown, filters=filters, limits=limits) diff --git a/onetl/core/file_limit/file_limit.py b/onetl/core/file_limit/file_limit.py index de82dafec..c235547eb 100644 --- a/onetl/core/file_limit/file_limit.py +++ b/onetl/core/file_limit/file_limit.py @@ -67,7 +67,7 @@ def stops_at(self, path: PathProtocol) -> bool: @property def is_reached(self) -> bool: - return self._counter >= self.count_limit + return self._counter > self.count_limit @validator("count_limit") def _deprecated(cls, value): diff --git a/onetl/file/limit/max_files_count.py b/onetl/file/limit/max_files_count.py index 4e9309333..8b18e8417 100644 --- a/onetl/file/limit/max_files_count.py +++ b/onetl/file/limit/max_files_count.py @@ -76,4 +76,4 @@ def stops_at(self, path: PathProtocol) -> bool: @property def is_reached(self) -> bool: - return self._handled >= self.limit + return self._handled > self.limit diff --git a/onetl/file/limit/total_files_size.py b/onetl/file/limit/total_files_size.py index 5c9237476..980001f47 100644 --- a/onetl/file/limit/total_files_size.py +++ b/onetl/file/limit/total_files_size.py @@ -85,4 +85,4 @@ def stops_at(self, path: PathProtocol) -> bool: @property def is_reached(self) -> bool: - return self._handled >= self.limit + return self._handled > self.limit diff --git a/tests/tests_integration/tests_core_integration/test_file_downloader_integration.py b/tests/tests_integration/tests_core_integration/test_file_downloader_integration.py index f3f594e1a..a76841965 100644 --- a/tests/tests_integration/tests_core_integration/test_file_downloader_integration.py +++ b/tests/tests_integration/tests_core_integration/test_file_downloader_integration.py @@ -790,7 +790,7 @@ def finalizer(): downloader.run([not_a_file]) -def test_file_downloader_with_file_limit(file_connection_with_path_and_files, tmp_path_factory, caplog): +def test_file_downloader_with_limit(file_connection_with_path_and_files, tmp_path_factory, caplog): file_connection, remote_path, _ = file_connection_with_path_and_files limit = 2 local_path = tmp_path_factory.mktemp("local_path") @@ -814,7 +814,7 @@ def test_file_downloader_with_file_limit(file_connection_with_path_and_files, tm assert len(download_result.successful) == limit -def test_file_downloader_file_limit_is_ignored_by_user_input( +def test_file_downloader_limit_is_ignored_by_user_input( file_connection_with_path_and_files, tmp_path_factory, ): diff --git a/tests/tests_unit/test_file/test_file_limit_unit.py b/tests/tests_unit/test_file/test_file_limit_unit.py index d73c0d380..672ca1f19 100644 --- a/tests/tests_unit/test_file/test_file_limit_unit.py +++ b/tests/tests_unit/test_file/test_file_limit_unit.py @@ -44,10 +44,10 @@ def test_file_limit(): assert not file_limit.stops_at(directory) assert not file_limit.is_reached - # limit is reached - all check are True, input does not matter - assert file_limit.stops_at(file3) - assert file_limit.is_reached + assert not file_limit.stops_at(file3) + assert not file_limit.is_reached + # limit is reached - all check are True, input does not matter assert file_limit.stops_at(file4) assert file_limit.is_reached @@ -64,5 +64,8 @@ def test_file_limit(): assert not file_limit.stops_at(file1) assert not file_limit.is_reached - assert file_limit.stops_at(file1) + assert not file_limit.stops_at(file2) + assert not file_limit.is_reached + + assert file_limit.stops_at(file3) assert file_limit.is_reached diff --git a/tests/tests_unit/test_file/test_limit/test_limits_are_reached.py b/tests/tests_unit/test_file/test_limit/test_limits_are_reached.py index 77474becb..c0138f132 100644 --- a/tests/tests_unit/test_file/test_limit/test_limits_are_reached.py +++ b/tests/tests_unit/test_file/test_limit/test_limits_are_reached.py @@ -18,6 +18,9 @@ def test_limits_reached(caplog): assert not limit1.stops_at(file) assert not limits_reached(limits) + assert not limit1.stops_at(file) + assert not limits_reached(limits) + # limit is reached - all check are True, input does not matter with caplog.at_level(logging.DEBUG): assert limit1.stops_at(file) diff --git a/tests/tests_unit/test_file/test_limit/test_limits_stop_at.py b/tests/tests_unit/test_file/test_limit/test_limits_stop_at.py index 954f1b86c..1acaadb7f 100644 --- a/tests/tests_unit/test_file/test_limit/test_limits_stop_at.py +++ b/tests/tests_unit/test_file/test_limit/test_limits_stop_at.py @@ -19,6 +19,10 @@ def test_limits_stop_at(caplog): assert not limit1.is_reached assert not limit2.is_reached + assert not limits_stop_at(file, limits) + assert not limit1.is_reached + assert not limit2.is_reached + # limit is reached - all check are True, input does not matter with caplog.at_level(logging.DEBUG): assert limits_stop_at(file, limits) diff --git a/tests/tests_unit/test_file/test_limit/test_max_files_count.py b/tests/tests_unit/test_file/test_limit/test_max_files_count.py index 19e6ae1bd..71444ee6f 100644 --- a/tests/tests_unit/test_file/test_limit/test_max_files_count.py +++ b/tests/tests_unit/test_file/test_limit/test_max_files_count.py @@ -36,10 +36,10 @@ def test_max_files_count(): assert not limit.stops_at(directory) assert not limit.is_reached - # limit is reached - all check are True, input does not matter - assert limit.stops_at(file3) - assert limit.is_reached + assert not limit.stops_at(file3) + assert not limit.is_reached + # limit is reached - all check are True, input does not matter assert limit.stops_at(file4) assert limit.is_reached @@ -56,5 +56,8 @@ def test_max_files_count(): assert not limit.stops_at(file1) assert not limit.is_reached - assert limit.stops_at(file1) + assert not limit.stops_at(file3) + assert not limit.is_reached + + assert limit.stops_at(file4) assert limit.is_reached diff --git a/tests/tests_unit/test_file/test_limit/test_reset_limits.py b/tests/tests_unit/test_file/test_limit/test_reset_limits.py index 00d2ec1fb..5fe34e237 100644 --- a/tests/tests_unit/test_file/test_limit/test_reset_limits.py +++ b/tests/tests_unit/test_file/test_limit/test_reset_limits.py @@ -9,7 +9,7 @@ def test_reset_limits(): file = RemoteFile(path="file1.csv", stats=RemotePathStat(st_size=10 * 1024, st_mtime=50)) - for _ in range(3): + for _ in range(4): limit1.stops_at(file) limit2.stops_at(file) diff --git a/tests/tests_unit/test_file/test_limit/test_total_files_size.py b/tests/tests_unit/test_file/test_limit/test_total_files_size.py index 26b6798a3..75f92d60d 100644 --- a/tests/tests_unit/test_file/test_limit/test_total_files_size.py +++ b/tests/tests_unit/test_file/test_limit/test_total_files_size.py @@ -77,5 +77,5 @@ def test_total_files_size(): assert not limit.stops_at(file1) assert not limit.is_reached - assert limit.stops_at(file1) + assert limit.stops_at(file3) assert limit.is_reached