From e983a57f6be1de6c789cf5c0d80a25718346e5e7 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=D0=9C=D0=B0=D1=80=D1=82=D1=8B=D0=BD=D0=BE=D0=B2=20=D0=9C?=
 =?UTF-8?q?=D0=B0=D0=BA=D1=81=D0=B8=D0=BC=20=D0=A1=D0=B5=D1=80=D0=B3=D0=B5?=
 =?UTF-8?q?=D0=B5=D0=B2=D0=B8=D1=87?=
Date: Mon, 27 Jan 2025 12:13:32 +0000
Subject: [PATCH] [DOP-22127] Implement FileModifiedTime filter

---
 docs/changelog/next_release/330.feature.rst   |   1 +
 docs/file/file_filters/file_mtime_filter.rst  |   9 +
 docs/file/file_filters/index.rst              |   1 +
 onetl/base/base_file_filter.py                |   3 +-
 .../file_connection/file_connection.py        |  15 +-
 onetl/connection/file_connection/webdav.py    |   2 +-
 onetl/file/file_downloader/file_downloader.py |   5 +-
 onetl/file/file_mover/file_mover.py           |   2 +-
 onetl/file/filter/__init__.py                 |   2 +
 onetl/file/filter/file_mtime.py               | 117 +++++++++++
 onetl/file/filter/file_size.py                |   8 +-
 onetl/impl/path_repr.py                       |   4 +-
 setup.cfg                                     |   6 +-
 .../test_file_downloader_integration.py       | 158 +++++++++++---
 .../test_file_mover_integration.py            | 198 ++++++++++++++----
 .../test_filter/test_file_modified_time.py    |  97 +++++++++
 .../test_filter/test_file_size_range.py       |  29 +--
 tests/tests_unit/test_impl_unit.py            |   8 +-
 18 files changed, 567 insertions(+), 98 deletions(-)
 create mode 100644 docs/changelog/next_release/330.feature.rst
 create mode 100644 docs/file/file_filters/file_mtime_filter.rst
 create mode 100644 onetl/file/filter/file_mtime.py
 create mode 100644 tests/tests_unit/test_file/test_filter/test_file_modified_time.py

diff --git a/docs/changelog/next_release/330.feature.rst b/docs/changelog/next_release/330.feature.rst
new file mode 100644
index 000000000..b5569e660
--- /dev/null
+++ b/docs/changelog/next_release/330.feature.rst
@@ -0,0 +1 @@
+Implement ``FileModifiedTime(since=..., until=...)`` file filter. Now users can configure ``FileDownloader`` / ``FileMover`` to download/move only files with a specific modification time.
diff --git a/docs/file/file_filters/file_mtime_filter.rst b/docs/file/file_filters/file_mtime_filter.rst
new file mode 100644
index 000000000..e239bfd54
--- /dev/null
+++ b/docs/file/file_filters/file_mtime_filter.rst
@@ -0,0 +1,9 @@
+.. _file-modification-time:
+
+FileModifiedTime
+================
+
+.. currentmodule:: onetl.file.filter.file_mtime
+
+.. autoclass:: FileModifiedTime
+    :members: match
diff --git a/docs/file/file_filters/index.rst b/docs/file/file_filters/index.rst
index 575a31667..c9fda3e5e 100644
--- a/docs/file/file_filters/index.rst
+++ b/docs/file/file_filters/index.rst
@@ -11,6 +11,7 @@ File Filters
     regexp
     exclude_dir
     file_size_filter
+    file_mtime_filter

 .. toctree::
     :maxdepth: 1
diff --git a/onetl/base/base_file_filter.py b/onetl/base/base_file_filter.py
index d2fb0a641..6361c2cd2 100644
--- a/onetl/base/base_file_filter.py
+++ b/onetl/base/base_file_filter.py
@@ -29,8 +29,7 @@ def match(self, path: PathProtocol) -> bool:
         Examples
         --------

-        from onetl.impl import LocalPath
-
+        >>> from onetl.impl import LocalPath
         >>> filter.match(LocalPath("/path/to/file.csv"))
         True
         >>> filter.match(LocalPath("/path/to/excluded.csv"))
diff --git a/onetl/connection/file_connection/file_connection.py b/onetl/connection/file_connection/file_connection.py
index 180b9b44a..e9e7a04dd 100644
--- a/onetl/connection/file_connection/file_connection.py
+++ b/onetl/connection/file_connection/file_connection.py
@@ -172,24 +172,25 @@ def get_stat(self, path: os.PathLike | str) -> PathStatProtocol:
     def resolve_dir(self, path: os.PathLike | str) -> RemoteDirectory:
         is_dir = self.is_dir(path)
         stat = self.get_stat(path)
+        remote_path = RemotePath(path)

         if not is_dir:
-            raise NotADirectoryError(
-                f"{path_repr(RemoteFile(path, stats=stat))} is not a directory",
-            )
+            remote_file = RemoteFile(path=remote_path, stats=stat)
+            raise NotADirectoryError(f"{path_repr(remote_file)} is not a directory")

-        return RemoteDirectory(path=path, stats=stat)
+        return RemoteDirectory(path=remote_path, stats=stat)

     @slot
     def resolve_file(self, path: os.PathLike | str) -> RemoteFile:
         is_file = self.is_file(path)
         stat = self.get_stat(path)
-        remote_path = RemoteFile(path=path, stats=stat)
+        remote_path = RemotePath(path)

         if not is_file:
-            raise NotAFileError(f"{path_repr(remote_path)} is not a file")
+            remote_directory = RemoteDirectory(path=remote_path, stats=stat)
+            raise NotAFileError(f"{path_repr(remote_directory)} is not a file")

-        return remote_path
+        return RemoteFile(path=remote_path, stats=stat)

     @slot
     def read_text(self, path: os.PathLike | str, encoding: str = "utf-8", **kwargs) -> str:
diff --git a/onetl/connection/file_connection/webdav.py b/onetl/connection/file_connection/webdav.py
index 2b9e50ebd..5943884e7 100644
--- a/onetl/connection/file_connection/webdav.py
+++ b/onetl/connection/file_connection/webdav.py
@@ -43,7 +43,7 @@
     ) from e

 log = getLogger(__name__)
-DATA_MODIFIED_FORMAT = "%a, %d %b %Y %H:%M:%S GMT"
+DATA_MODIFIED_FORMAT = "%a, %d %b %Y %H:%M:%S %Z"


 @support_hooks
diff --git a/onetl/file/file_downloader/file_downloader.py b/onetl/file/file_downloader/file_downloader.py
index 2d32be60c..d96d04f70 100644
--- a/onetl/file/file_downloader/file_downloader.py
+++ b/onetl/file/file_downloader/file_downloader.py
@@ -468,16 +468,15 @@ def view_files(self) -> FileSet[RemoteFile]:
         if not self._connection_checked:
             self._check_source_path()

-        result = FileSet()
-
         filters = self.filters.copy()
         if self.hwm:
             filters.append(FileHWMFilter(hwm=self._init_hwm(self.hwm)))

+        result = FileSet()
         try:
             for root, _dirs, files in self.connection.walk(self.source_path, filters=filters, limits=self.limits):
                 for file in files:
-                    result.append(RemoteFile(path=root / file, stats=file.stats))
+                    result.append(RemoteFile(path=root / file, stats=file.stat()))

         except Exception as e:
             raise RuntimeError(
diff --git a/onetl/file/file_mover/file_mover.py b/onetl/file/file_mover/file_mover.py
index 81c25e3bc..3c1a34b1a 100644
--- a/onetl/file/file_mover/file_mover.py
+++ b/onetl/file/file_mover/file_mover.py
@@ -358,7 +358,7 @@ def view_files(self) -> FileSet[RemoteFile]:
         try:
             for root, _dirs, files in self.connection.walk(self.source_path, filters=self.filters, limits=self.limits):
                 for file in files:
-                    result.append(RemoteFile(path=root / file, stats=file.stats))
+                    result.append(RemoteFile(path=root / file, stats=file.stat()))

         except Exception as e:
             raise RuntimeError(
diff --git a/onetl/file/filter/__init__.py b/onetl/file/filter/__init__.py
index 3449d3bb4..0d208cba7 100644
--- a/onetl/file/filter/__init__.py
+++ b/onetl/file/filter/__init__.py
@@ -2,6 +2,7 @@
 # SPDX-License-Identifier: Apache-2.0
 from onetl.file.filter.exclude_dir import ExcludeDir
 from onetl.file.filter.file_hwm import FileHWMFilter
+from onetl.file.filter.file_mtime import FileModifiedTime
 from onetl.file.filter.file_size import FileSizeRange
 from onetl.file.filter.glob import Glob
 from onetl.file.filter.match_all_filters import match_all_filters
@@ -10,6 +11,7 @@
 __all__ = [
     "ExcludeDir",
     "FileHWMFilter",
+    "FileModifiedTime",
     "FileSizeRange",
     "Glob",
     "match_all_filters",
diff --git a/onetl/file/filter/file_mtime.py b/onetl/file/filter/file_mtime.py
new file mode 100644
index 000000000..98047e324
--- /dev/null
+++ b/onetl/file/filter/file_mtime.py
@@ -0,0 +1,117 @@
+# SPDX-FileCopyrightText: 2021-2024 MTS PJSC
+# SPDX-License-Identifier: Apache-2.0
+from __future__ import annotations
+
+from datetime import datetime
+from typing import Optional
+
+from onetl.base.path_protocol import PathWithStatsProtocol
+
+try:
+    from pydantic.v1 import root_validator, validator
+except (ImportError, AttributeError):
+    from pydantic import root_validator, validator  # type: ignore[no-redef, assignment]
+
+from onetl.base import BaseFileFilter, PathProtocol
+from onetl.impl import FrozenModel
+
+
+class FileModifiedTime(BaseFileFilter, FrozenModel):
+    """Filter files matching a specified modification time.
+
+    If file modification time (``.stat().st_mtime``) doesn't match the range, it will be excluded.
+    Doesn't affect directories or paths without a ``.stat()`` method.
+
+    .. note::
+
+        Some filesystems return timestamps truncated to whole seconds (without millisecond part).
+        :obj:`~since` and :obj:`~until` values should be adjusted accordingly.
+
+    .. versionadded:: 0.13.0
+
+    Parameters
+    ----------
+
+    since : datetime, optional
+
+        Minimal allowed file modification time. ``None`` means no limit.
+
+    until : datetime, optional
+
+        Maximum allowed file modification time. ``None`` means no limit.
+
+    Examples
+    --------
+
+    Select files modified between the start of the day (``00:00:00``) and an hour ago:
+
+    .. code:: python
+
+        from datetime import datetime, timedelta
+        from onetl.file.filter import FileModifiedTime
+
+        hour_ago = datetime.now() - timedelta(hours=1)
+        day_start = datetime.now().replace(hour=0, minute=0, second=0, microsecond=0)
+        file_mtime = FileModifiedTime(since=day_start, until=hour_ago)
+
+    Select only files modified within the last hour:
+
+    .. code:: python
+
+        from datetime import datetime, timedelta
+        from onetl.file.filter import FileModifiedTime
+
+        hour_ago = datetime.now() - timedelta(hours=1)
+        file_mtime = FileModifiedTime(since=hour_ago)
+    """
+
+    since: Optional[datetime] = None
+    until: Optional[datetime] = None
+
+    @root_validator(skip_on_failure=True)
+    def _validate_since_until(cls, values):
+        since = values.get("since")
+        until = values.get("until")
+
+        if since is None and until is None:
+            raise ValueError("Either since or until must be specified")
+
+        # since and until can be a mix of tz-naive and tz-aware values, which cannot be compared directly.
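+        # comparing POSIX timestamps sidesteps this: .timestamp() yields an absolute
+        # value regardless of tzinfo, so the check below works for any combination.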
+        if since and until and since.timestamp() > until.timestamp():
+            raise ValueError("since cannot be greater than until")
+
+        return values
+
+    @validator("since", "until", pre=True)
+    def _parse_isoformat(cls, value):
+        if isinstance(value, str):
+            # Pydantic doesn't allow values like "YYYY-MM-DD" as input, but .fromisoformat() does
+            return datetime.fromisoformat(value)
+        return value
+
+    @validator("since", "until")
+    def _always_include_tz(cls, value):
+        if value is not None and value.tzinfo is None:
+            # tz-naive datetime should be converted to tz-aware
+            return value.astimezone()
+        return value
+
+    def __repr__(self):
+        since_human_readable = self.since.isoformat() if self.since is not None else None
+        until_human_readable = self.until.isoformat() if self.until is not None else None
+        return f"{self.__class__.__name__}(since={since_human_readable!r}, until={until_human_readable!r})"
+
+    def match(self, path: PathProtocol) -> bool:
+        if path.is_file() and isinstance(path, PathWithStatsProtocol):
+            file_mtime = path.stat().st_mtime
+            if not file_mtime:
+                return True
+
+            # mtime is always a POSIX timestamp, avoid converting it to datetime and dancing with timezones
+            if self.since is not None and file_mtime < self.since.timestamp():
+                return False
+
+            if self.until is not None and file_mtime > self.until.timestamp():
+                return False
+
+        return True
diff --git a/onetl/file/filter/file_size.py b/onetl/file/filter/file_size.py
index 39efab781..5744eb6fd 100644
--- a/onetl/file/filter/file_size.py
+++ b/onetl/file/filter/file_size.py
@@ -18,8 +18,8 @@ class FileSizeRange(BaseFileFilter, FrozenModel):
     """Filter files matching a specified size.

-    If file size doesn't match boundaries, it will be excluded.
-    Doesn't affect directories or paths without defined size.
+    If file size (``.stat().st_size``) doesn't match the range, it will be excluded.
+    Doesn't affect directories or paths without a ``.stat()`` method.

     .. versionadded:: 0.13.0
@@ -37,8 +37,6 @@ class FileSizeRange(BaseFileFilter, FrozenModel):

     max : int or str, optional

-        If file size is greater than this value, it will be excluded.
-
         Maximum allowed file size. ``None`` means no limit.
     Examples
@@ -72,7 +70,7 @@ class FileSizeRange(BaseFileFilter, FrozenModel):
     min: Optional[ByteSize] = None
     max: Optional[ByteSize] = None

-    @root_validator
+    @root_validator(skip_on_failure=True)
     def _validate_min_max(cls, values):
         min_value = values.get("min")
         max_value = values.get("max")
diff --git a/onetl/impl/path_repr.py b/onetl/impl/path_repr.py
index ad007280a..6b89e44a1 100644
--- a/onetl/impl/path_repr.py
+++ b/onetl/impl/path_repr.py
@@ -7,7 +7,7 @@
 import stat
 import textwrap
 from dataclasses import dataclass
-from datetime import datetime
+from datetime import datetime, timezone
 from pathlib import PurePath

 from humanize import naturalsize
@@ -49,7 +49,7 @@ def from_path(cls, path) -> PathRepr:
         if path.exists() and isinstance(path, PathWithStatsProtocol):
             details = path.stat()
             result.size = details.st_size
-            result.mtime = datetime.fromtimestamp(details.st_mtime) if details.st_mtime else None
+            result.mtime = datetime.fromtimestamp(details.st_mtime, tz=timezone.utc) if details.st_mtime else None
             result.mode = details.st_mode
             result.user = details.st_uid
             result.group = details.st_gid
diff --git a/setup.cfg b/setup.cfg
index 3d6d37b61..99726a097 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -433,7 +433,11 @@ per-file-ignores =
 # WPS202 Found too many module members: 40 > 35
     WPS202,
 # WPS210 Found too many local variables: 21 > 20
-    WPS210
+    WPS210,
+# WPS441 Found control variable used after block: file
+    WPS441,
+# WPS333 Found implicit complex compare
+    WPS333

 [darglint]
diff --git a/tests/tests_integration/tests_core_integration/test_file_downloader_integration.py b/tests/tests_integration/tests_core_integration/test_file_downloader_integration.py
index a76841965..274113f96 100644
--- a/tests/tests_integration/tests_core_integration/test_file_downloader_integration.py
+++ b/tests/tests_integration/tests_core_integration/test_file_downloader_integration.py
@@ -11,8 +11,7 @@
 from onetl.exception import DirectoryNotFoundError, NotAFileError
 from onetl.file import FileDownloader
-from onetl.file.file_set import FileSet
-from onetl.file.filter import ExcludeDir, Glob
+from onetl.file.filter import ExcludeDir, FileModifiedTime, FileSizeRange, Glob, Regexp
 from onetl.file.limit import MaxFilesCount
 from onetl.impl import (
     FailedRemoteFile,
@@ -39,6 +38,8 @@ def test_file_downloader_view_file(file_connection_with_path_and_files):
     remote_files = downloader.view_files()
     remote_files_list = []

+    # some clients return different st_mtime in .walk(root) and in .get_stat(file),
+    # so don't use .resolve_file(path) here
     for root, _dirs, files in file_connection.walk(remote_path):
         for file in files:
             remote_files_list.append(RemotePath(root) / file)
@@ -160,11 +161,7 @@ def test_file_downloader_run_delete_source(
         # S3 does not support creating directories
         return

-    remote_files = FileSet()
-    for root, _dirs, files in file_connection.walk(remote_path):
-        for file in files:
-            remote_files.add(RemoteFile(path=root / file.name, stats=file.stats))
-
+    remote_files = [file for _root, _dirs, files in file_connection.walk(remote_path) for file in files]
     assert not remote_files
@@ -175,7 +172,7 @@ def test_file_downloader_file_filter_exclude_dir(
     tmp_path_factory,
     caplog,
 ):
-    file_connection, remote_path, uploaded_files = file_connection_with_path_and_files
+    file_connection, remote_path, _ = file_connection_with_path_and_files
     local_path = tmp_path_factory.mktemp("local_path")

     downloader = FileDownloader(
         connection=file_connection,
         source_path=remote_path,
         local_path=local_path,
         filters=[ExcludeDir(path_type(remote_path / "exclude_dir"))],
     )

-    excluded = [
-        remote_path / "exclude_dir/excluded1.txt",
-        remote_path / "exclude_dir/nested/excluded2.txt",
-    ]
-
     with caplog.at_level(logging.INFO):
         download_result = downloader.run()
         assert " filters = [" in caplog.text
@@ -201,13 +193,16 @@
     assert not download_result.missing
     assert download_result.successful

-    assert sorted(download_result.successful) == sorted(
-        local_path / file.relative_to(remote_path) for file in uploaded_files if file not in excluded
-    )
+    assert sorted(download_result.successful) == [
+        local_path / "ascii.txt",
+        local_path / "nested/exclude_dir/excluded3.txt",
+        local_path / "some.csv",
+        local_path / "utf-8.txt",
+    ]


 def test_file_downloader_file_filter_glob(file_connection_with_path_and_files, tmp_path_factory, caplog):
-    file_connection, source_path, uploaded_files = file_connection_with_path_and_files
+    file_connection, source_path, _ = file_connection_with_path_and_files
     local_path = tmp_path_factory.mktemp("local_path")

     downloader = FileDownloader(
         connection=file_connection,
         source_path=source_path,
         local_path=local_path,
@@ -217,18 +212,65 @@
         filters=[Glob("*.csv")],
     )

-    excluded = [
-        source_path / "utf-8.txt",
-        source_path / "ascii.txt",
-        source_path / "exclude_dir/excluded1.txt",
-        source_path / "exclude_dir/nested/excluded2.txt",
-        source_path / "nested/exclude_dir/excluded3.txt",
+    with caplog.at_level(logging.INFO):
+        download_result = downloader.run()
+        assert " filters = [" in caplog.text
+        assert " Glob('*.csv')," in caplog.text
+        assert " ]" in caplog.text
+
+    assert not download_result.failed
+    assert not download_result.skipped
+    assert not download_result.missing
+    assert download_result.successful
+
+    assert sorted(download_result.successful) == [local_path / "some.csv"]
+
+
+def test_file_downloader_file_filter_regexp(file_connection_with_path_and_files, tmp_path_factory, caplog):
+    file_connection, source_path, _ = file_connection_with_path_and_files
+    local_path = tmp_path_factory.mktemp("local_path")
+
+    downloader = FileDownloader(
+        connection=file_connection,
+        source_path=source_path,
+        local_path=local_path,
+        filters=[Regexp(r"\d+\.txt")],
+    )
+
+    with caplog.at_level(logging.INFO):
+        download_result = downloader.run()
+        assert " filters = [" in caplog.text
+        assert " Regexp(re.compile('\\\\d+\\\\.txt', re.IGNORECASE|re.DOTALL))," in caplog.text
+        assert " ]" in caplog.text
+
+    assert not download_result.failed
+    assert not download_result.skipped
+    assert not download_result.missing
+    assert download_result.successful
+
+    assert sorted(download_result.successful) == [
+        local_path / "exclude_dir/excluded1.txt",
+        local_path / "exclude_dir/nested/excluded2.txt",
+        local_path / "nested/exclude_dir/excluded3.txt",
+        local_path / "utf-8.txt",
     ]
+
+
+def test_file_downloader_file_filter_file_size(file_connection_with_path_and_files, tmp_path_factory, caplog):
+    file_connection, source_path, _ = file_connection_with_path_and_files
+    local_path = tmp_path_factory.mktemp("local_path")
+
+    downloader = FileDownloader(
+        connection=file_connection,
+        source_path=source_path,
+        local_path=local_path,
+        filters=[FileSizeRange(min="1B")],
+    )

     with caplog.at_level(logging.INFO):
         download_result = downloader.run()
         assert " filters = [" in caplog.text
-        assert " Glob('*.csv')," in caplog.text
+        assert " FileSizeRange(min='1.0B', max=None)," in caplog.text
         assert " ]" in caplog.text

     assert not download_result.failed
@@ -236,11 +278,77 @@
     assert not download_result.missing
     assert download_result.successful

+    # other files are empty
+    assert sorted(download_result.successful) == [
+        local_path / "ascii.txt",
+        local_path / "some.csv",
+        local_path / "utf-8.txt",
+    ]
+
+
+def test_file_downloader_file_filter_file_mtime_since(file_connection_with_path_and_files, tmp_path_factory, caplog):
+    file_connection, source_path, uploaded_files = file_connection_with_path_and_files
+    local_path = tmp_path_factory.mktemp("local_path")
+
+    remote_files = [file_connection.resolve_file(file) for file in uploaded_files]
+    remote_files.sort(key=lambda file: file.stat().st_mtime)
+    minimal_mtime = remote_files[0].stat().st_mtime
+
+    downloader = FileDownloader(
+        connection=file_connection,
+        source_path=source_path,
+        local_path=local_path,
+        # some connectors return truncated timestamps in `.walk()`.
+        filters=[FileModifiedTime(since=int(minimal_mtime))],
+    )
+
+    with caplog.at_level(logging.INFO):
+        download_result = downloader.run()
+        assert " filters = [" in caplog.text
+        assert " FileModifiedTime(since=" in caplog.text
+        assert " ]" in caplog.text
+
+    assert not download_result.failed
+    assert not download_result.skipped
+    assert not download_result.missing
+    assert download_result.successful
+
+    # all files are newer than minimal_mtime
     assert sorted(download_result.successful) == sorted(
-        local_path / file.relative_to(source_path) for file in uploaded_files if file not in excluded
+        local_path / file.relative_to(source_path) for file in remote_files
     )


+def test_file_downloader_file_filter_file_mtime_until(file_connection_with_path_and_files, tmp_path_factory, caplog):
+    file_connection, source_path, uploaded_files = file_connection_with_path_and_files
+    local_path = tmp_path_factory.mktemp("local_path")
+
+    remote_files = [file_connection.resolve_file(file) for file in uploaded_files]
+    remote_files.sort(key=lambda file: file.stat().st_mtime)
+    minimal_mtime = remote_files[0].stat().st_mtime
+
+    downloader = FileDownloader(
+        connection=file_connection,
+        source_path=source_path,
+        local_path=local_path,
+        # some connectors return truncated timestamps in `.walk()`.
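+        # int() truncates the POSIX timestamp down to whole seconds to match that behavior;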
+        # also we should exclude files created exactly at minimal_mtime
+        filters=[FileModifiedTime(until=int(minimal_mtime) - 1)],
+    )
+
+    with caplog.at_level(logging.INFO):
+        download_result = downloader.run()
+        assert " filters = [" in caplog.text
+        assert " FileModifiedTime(since=" in caplog.text
+        assert " ]" in caplog.text
+
+    # all files are older than minimal_mtime
+    assert not download_result.failed
+    assert not download_result.skipped
+    assert not download_result.missing
+    assert not download_result.successful
+
+
 def test_file_downloader_file_filter_is_ignored_by_user_input(
     file_connection_with_path_and_files,
     tmp_path_factory,
diff --git a/tests/tests_integration/tests_core_integration/test_file_mover_integration.py b/tests/tests_integration/tests_core_integration/test_file_mover_integration.py
index 762e9b9aa..9b5798cf3 100644
--- a/tests/tests_integration/tests_core_integration/test_file_mover_integration.py
+++ b/tests/tests_integration/tests_core_integration/test_file_mover_integration.py
@@ -8,7 +8,7 @@
 from onetl.exception import DirectoryNotFoundError, NotAFileError
 from onetl.file import FileMover
-from onetl.file.filter import ExcludeDir, Glob
+from onetl.file.filter import ExcludeDir, FileModifiedTime, FileSizeRange, Glob, Regexp
 from onetl.file.limit import MaxFilesCount
 from onetl.impl import FailedRemoteFile, FileExistBehavior, RemoteFile, RemotePath
@@ -26,6 +26,8 @@ def test_file_mover_view_file(file_connection_with_path_and_files):
     remote_files = mover.view_files()
     remote_files_list = []

+    # some clients return different st_mtime in .walk(root) and in .get_stat(file),
+    # so don't use .resolve_file(path) here
     for root, _dirs, files in file_connection.walk(source_path):
         for file in files:
             remote_files_list.append(RemotePath(root) / file)
@@ -61,13 +63,8 @@ def finalizer():
     )

     # record files content and size before move
-    files_content = {}
-    files_size = {}
-    for root, _dirs, files in file_connection.walk(source_path):
-        for file_name in files:
-            file_path = root / file_name
-            files_content[file_path] = file_connection.read_bytes(file_path)
-            files_size[file_path] = file_connection.get_stat(file_path).st_size
+    files_content = {file: file_connection.read_bytes(file) for file in uploaded_files}
+    files_size = {file: file_connection.get_stat(file).st_size for file in uploaded_files}

     with caplog.at_level(logging.DEBUG):
         move_result = mover.run()
@@ -126,11 +123,6 @@ def finalizer():
         filters=[ExcludeDir(path_type(source_path / "exclude_dir"))],
     )

-    excluded = [
-        source_path / "exclude_dir/excluded1.txt",
-        source_path / "exclude_dir/nested/excluded2.txt",
-    ]
-
     with caplog.at_level(logging.INFO):
         move_result = mover.run()
         assert " filters = [" in caplog.text
@@ -142,8 +134,14 @@ def finalizer():
     assert not move_result.missing
     assert move_result.successful

+    included = [
+        source_path / "ascii.txt",
+        source_path / "nested/exclude_dir/excluded3.txt",
+        source_path / "some.csv",
+        source_path / "utf-8.txt",
+    ]
     assert sorted(move_result.successful) == sorted(
-        target_path / file.relative_to(source_path) for file in uploaded_files if file not in excluded
+        target_path / file.relative_to(source_path) for file in uploaded_files if file in included
     )
@@ -163,18 +161,81 @@ def finalizer():
         filters=[Glob("*.csv")],
     )

-    excluded = [
-        source_path / "utf-8.txt",
-        source_path / "ascii.txt",
+    with caplog.at_level(logging.INFO):
+        move_result = mover.run()
+        assert " filters = [" in caplog.text
+        assert " Glob('*.csv')," in caplog.text
+        assert " ]" in caplog.text
+
+    assert not move_result.failed
+    assert not move_result.skipped
+    assert not move_result.missing
+    assert move_result.successful
+
+    included = [source_path / "some.csv"]
+    assert sorted(move_result.successful) == sorted(
+        target_path / file.relative_to(source_path) for file in uploaded_files if file in included
+    )
+
+
+def test_file_mover_file_filter_regexp(request, file_connection_with_path_and_files, caplog):
+    file_connection, source_path, uploaded_files = file_connection_with_path_and_files
+    target_path = f"/tmp/test_move_{secrets.token_hex(5)}"
+
+    def finalizer():
+        file_connection.remove_dir(target_path, recursive=True)
+
+    request.addfinalizer(finalizer)
+
+    mover = FileMover(
+        connection=file_connection,
+        source_path=source_path,
+        target_path=target_path,
+        filters=[Regexp(r"\d+\.txt")],
+    )
+
+    with caplog.at_level(logging.INFO):
+        move_result = mover.run()
+        assert " filters = [" in caplog.text
+        assert " Regexp(re.compile('\\\\d+\\\\.txt', re.IGNORECASE|re.DOTALL))," in caplog.text
+        assert " ]" in caplog.text
+
+    assert not move_result.failed
+    assert not move_result.skipped
+    assert not move_result.missing
+    assert move_result.successful
+
+    included = [
         source_path / "exclude_dir/excluded1.txt",
         source_path / "exclude_dir/nested/excluded2.txt",
         source_path / "nested/exclude_dir/excluded3.txt",
+        source_path / "utf-8.txt",
     ]
+    assert sorted(move_result.successful) == sorted(
+        target_path / file.relative_to(source_path) for file in uploaded_files if file in included
+    )
+
+
+def test_file_mover_file_filter_file_size(request, file_connection_with_path_and_files, caplog):
+    file_connection, source_path, uploaded_files = file_connection_with_path_and_files
+    target_path = f"/tmp/test_move_{secrets.token_hex(5)}"
+
+    def finalizer():
+        file_connection.remove_dir(target_path, recursive=True)
+
+    request.addfinalizer(finalizer)
+
+    mover = FileMover(
+        connection=file_connection,
+        source_path=source_path,
+        target_path=target_path,
+        filters=[FileSizeRange(min="1B")],
+    )

     with caplog.at_level(logging.INFO):
         move_result = mover.run()
         assert " filters = [" in caplog.text
-        assert " Glob('*.csv')," in caplog.text
+        assert " FileSizeRange(min='1.0B', max=None)," in caplog.text
         assert " ]" in caplog.text

     assert not move_result.failed
@@ -182,13 +243,90 @@ def finalizer():
     assert not move_result.missing
     assert move_result.successful

+    # other files are empty
+    included = [
+        source_path / "ascii.txt",
+        source_path / "some.csv",
+        source_path / "utf-8.txt",
+    ]
     assert sorted(move_result.successful) == sorted(
-        target_path / file.relative_to(source_path)
-        for file in uploaded_files
-        if file not in excluded and source_path in file.parents
+        target_path / file.relative_to(source_path) for file in uploaded_files if file in included
     )


+def test_file_mover_file_filter_file_mtime_since(request, file_connection_with_path_and_files, caplog):
+    file_connection, source_path, uploaded_files = file_connection_with_path_and_files
+    target_path = f"/tmp/test_move_{secrets.token_hex(5)}"
+
+    def finalizer():
+        file_connection.remove_dir(target_path, recursive=True)
+
+    request.addfinalizer(finalizer)
+
+    remote_files = [file_connection.resolve_file(file) for file in uploaded_files]
+    remote_files.sort(key=lambda file: file.stat().st_mtime)
+    minimal_mtime = remote_files[0].stat().st_mtime
+
+    mover = FileMover(
+        connection=file_connection,
+        source_path=source_path,
+        target_path=target_path,
+        # some connectors return truncated timestamps in `.walk()`.
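+        # int() rounds the timestamp down, so the oldest file still matches `since`.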
+        filters=[FileModifiedTime(since=int(minimal_mtime))],
+    )
+
+    with caplog.at_level(logging.INFO):
+        move_result = mover.run()
+        assert " filters = [" in caplog.text
+        assert " FileModifiedTime(since=" in caplog.text
+        assert " ]" in caplog.text
+
+    assert not move_result.failed
+    assert not move_result.skipped
+    assert not move_result.missing
+    assert move_result.successful
+
+    # all files are newer than minimal_mtime
+    assert sorted(move_result.successful) == sorted(
+        target_path / file.relative_to(source_path) for file in remote_files
+    )
+
+
+def test_file_mover_file_filter_file_mtime_until(request, file_connection_with_path_and_files, caplog):
+    file_connection, source_path, uploaded_files = file_connection_with_path_and_files
+    target_path = f"/tmp/test_move_{secrets.token_hex(5)}"
+
+    def finalizer():
+        file_connection.remove_dir(target_path, recursive=True)
+
+    request.addfinalizer(finalizer)
+
+    remote_files = [file_connection.resolve_file(file) for file in uploaded_files]
+    remote_files.sort(key=lambda file: file.stat().st_mtime)
+    minimal_mtime = remote_files[0].stat().st_mtime
+
+    mover = FileMover(
+        connection=file_connection,
+        source_path=source_path,
+        target_path=target_path,
+        # some connectors return truncated timestamps in `.walk()`.
+        # also we should exclude files created exactly at minimal_mtime
+        filters=[FileModifiedTime(until=int(minimal_mtime) - 1)],
+    )
+
+    with caplog.at_level(logging.INFO):
+        move_result = mover.run()
+        assert " filters = [" in caplog.text
+        assert " FileModifiedTime(since=" in caplog.text
+        assert " ]" in caplog.text
+
+    # all files are older than minimal_mtime
+    assert not move_result.failed
+    assert not move_result.skipped
+    assert not move_result.missing
+    assert not move_result.successful
+
+
 def test_file_mover_file_filter_is_ignored_by_user_input(
     request,
     file_connection_with_path_and_files,
@@ -242,13 +380,8 @@ def finalizer():
     )

     # record files content and size before move
-    files_content = {}
-    files_size = {}
-    for root, _dirs, files in file_connection.walk(source_path):
-        for file_name in files:
-            file_path = root / file_name
-            files_content[file_path] = file_connection.read_bytes(file_path)
-            files_size[file_path] = file_connection.get_stat(file_path).st_size
+    files_content = {file: file_connection.read_bytes(file) for file in uploaded_files}
+    files_size = {file: file_connection.get_stat(file).st_size for file in uploaded_files}

     with caplog.at_level(logging.WARNING):
         move_result = mover.run(uploaded_files)
@@ -301,13 +434,8 @@ def finalizer():
     relative_files_path = [file.relative_to(source_path) for file in uploaded_files]

     # record files content and size before move
-    files_content = {}
-    files_size = {}
-    for root, _dirs, files in file_connection.walk(source_path):
-        for file_name in files:
-            file_path = root / file_name
-            files_content[file_path] = file_connection.read_bytes(file_path)
-            files_size[file_path] = file_connection.get_stat(file_path).st_size
+    files_content = {file: file_connection.read_bytes(file) for file in uploaded_files}
+    files_size = {file: file_connection.get_stat(file).st_size for file in uploaded_files}

     mover = FileMover(
         connection=file_connection,
diff --git a/tests/tests_unit/test_file/test_filter/test_file_modified_time.py b/tests/tests_unit/test_file/test_filter/test_file_modified_time.py
new file mode 100644
index 000000000..4953ecdc5
--- /dev/null
+++ b/tests/tests_unit/test_file/test_filter/test_file_modified_time.py
@@ -0,0 +1,97 @@
+from datetime import datetime, timedelta, timezone
+
+import pytest
+
+from onetl.file.filter import FileModifiedTime
+from onetl.impl import RemoteDirectory, RemoteFile, RemotePathStat
+
+
+def test_file_modified_time_invalid():
+    with pytest.raises(ValueError, match="Either since or until must be specified"):
+        FileModifiedTime()
+
+    with pytest.raises(ValueError, match="since cannot be greater than until"):
+        FileModifiedTime(since=datetime(2025, 1, 1), until=datetime(2024, 1, 1))
+
+    with pytest.raises(ValueError, match="Invalid isoformat string"):
+        FileModifiedTime(since="wtf")
+    with pytest.raises(ValueError, match="Invalid isoformat string"):
+        FileModifiedTime(until="wtf")
+
+
+# values are always timezone-aware
+@pytest.mark.parametrize(
+    ["input", "expected"],
+    [
+        (
+            datetime(2025, 1, 1),
+            datetime(2025, 1, 1).astimezone(),
+        ),
+        (
+            "2025-01-01",
+            datetime(2025, 1, 1).astimezone(),
+        ),
+        (
+            datetime(2025, 1, 1, 11, 22, 33, 456789),
+            datetime(2025, 1, 1, 11, 22, 33, 456789).astimezone(),
+        ),
+        (
+            "2025-01-01T11:22:33.456789",
+            datetime(2025, 1, 1, 11, 22, 33, 456789).astimezone(),
+        ),
+        (
+            datetime(2025, 1, 1, 11, 22, 33, 456789, tzinfo=timezone.utc),
+            datetime(2025, 1, 1, 11, 22, 33, 456789, tzinfo=timezone.utc),
+        ),
+        (
+            "2025-01-01T11:22:33.456789+00:00",
+            datetime(2025, 1, 1, 11, 22, 33, 456789, tzinfo=timezone.utc),
+        ),
+        (
+            datetime(2025, 1, 1, 11, 22, 33, 456789, tzinfo=timezone(timedelta(hours=3))),
+            datetime(2025, 1, 1, 11, 22, 33, 456789, tzinfo=timezone(timedelta(hours=3))),
+        ),
+        (
+            "2025-01-01T11:22:33.456789+03:00",
+            datetime(2025, 1, 1, 11, 22, 33, 456789, tzinfo=timezone(timedelta(hours=3))),
+        ),
+    ],
+)
+def test_file_modified_time_parse(input: str, expected: datetime):
+    assert FileModifiedTime(since=input).since == expected
+    assert FileModifiedTime(until=input).until == expected
+
+
+def test_file_modified_time_repr():
+    value = FileModifiedTime(
+        since=datetime(2025, 1, 1, 11, 22, 33, 456789, tzinfo=timezone.utc),
+        until=datetime(2025, 1, 1, 22, 33, 44, 567890, tzinfo=timezone(timedelta(hours=3))),
+    )
+
+    since_str = "2025-01-01T11:22:33.456789+00:00"
+    until_str = "2025-01-01T22:33:44.567890+03:00"
+    expected = f"FileModifiedTime(since='{since_str}', until='{until_str}')"
+    assert repr(value) == expected
+
+
+# only POSIX timestamps are compared, so all values are in UTC
+@pytest.mark.parametrize(
+    "matched, mtime",
+    [
+        (False, datetime(2025, 1, 1, 11, 22, 33, 456788, tzinfo=timezone.utc)),  # since - 1 microsecond
+        (True, datetime(2025, 1, 1, 11, 22, 33, 456789, tzinfo=timezone.utc)),
+        (True, datetime(2025, 1, 1, 22, 33, 44, 567890, tzinfo=timezone.utc)),
+        (False, datetime(2025, 1, 1, 22, 33, 44, 567891, tzinfo=timezone.utc)),  # until + 1 microsecond
+    ],
+)
+def test_file_modified_time_match(matched: bool, mtime: datetime):
+    file_filter = FileModifiedTime(
+        since=datetime(2025, 1, 1, 11, 22, 33, 456789, tzinfo=timezone.utc),
+        until=datetime(2025, 1, 1, 22, 33, 44, 567890, tzinfo=timezone.utc),
+    )
+
+    file = RemoteFile(path="file.csv", stats=RemotePathStat(st_size=0, st_mtime=mtime.timestamp()))
+    assert file_filter.match(file) == matched
+
+    directory = RemoteDirectory("some")
+    assert file_filter.match(directory)
diff --git a/tests/tests_unit/test_file/test_filter/test_file_size_range.py b/tests/tests_unit/test_file/test_filter/test_file_size_range.py
index f3bb1db40..e258132c4 100644
--- a/tests/tests_unit/test_file/test_filter/test_file_size_range.py
+++ b/tests/tests_unit/test_file/test_filter/test_file_size_range.py
@@ -40,25 +40,30 @@ def test_file_size_range_repr():
         ("10GiB", 10 * 1024 * 1024 * 1024),
     ],
 )
-def test_file_size_range_parse_units(input: str, expected: int):
-    assert FileSizeRange(min=input.replace("B", "b")).min == expected
+def test_file_size_range_parse(input: str, expected: int):
+    assert FileSizeRange(min=expected).min == expected
     assert FileSizeRange(min=input).min == expected
-    assert FileSizeRange(max=input.replace("B", "b")).max == expected
+    assert FileSizeRange(min=input.replace("B", "b")).min == expected
+    assert FileSizeRange(max=expected).max == expected
     assert FileSizeRange(max=input).max == expected
+    assert FileSizeRange(max=input.replace("B", "b")).max == expected


 @pytest.mark.parametrize(
-    "matched, path",
+    "matched, size",
     [
-        (False, RemoteFile(path="file.csv", stats=RemotePathStat(st_size=1024, st_mtime=50))),
-        (True, RemoteFile(path="file.csv", stats=RemotePathStat(st_size=10 * 1024, st_mtime=50))),
-        (True, RemoteFile(path="file.csv", stats=RemotePathStat(st_size=15 * 1024, st_mtime=50))),
-        (True, RemoteFile(path="file.csv", stats=RemotePathStat(st_size=20 * 1024, st_mtime=50))),
-        (False, RemoteFile(path="file.csv", stats=RemotePathStat(st_size=30 * 1024, st_mtime=50))),
-        (True, RemoteDirectory("some")),
+        (False, 1024),
+        (True, 10 * 1024),
+        (True, 15 * 1024),
+        (True, 20 * 1024),
+        (False, 30 * 1024),
     ],
 )
-def test_file_size_range_match(matched, path):
+def test_file_size_range_match(matched: bool, size: int):
     file_filter = FileSizeRange(min="10Kib", max="20Kib")
-    assert file_filter.match(path) == matched
+    file = RemoteFile(path="file.csv", stats=RemotePathStat(st_size=size, st_mtime=50))
+    assert file_filter.match(file) == matched
+
+    directory = RemoteDirectory("some")
+    assert file_filter.match(directory)
diff --git a/tests/tests_unit/test_impl_unit.py b/tests/tests_unit/test_impl_unit.py
index 5cf1b46e1..e6554ec41 100644
--- a/tests/tests_unit/test_impl_unit.py
+++ b/tests/tests_unit/test_impl_unit.py
@@ -1,8 +1,8 @@
 import os
 import stat
 import textwrap
-from datetime import datetime
-from time import time
+import time
+from datetime import datetime, timezone

 import pytest
@@ -342,8 +342,8 @@ def test_path_repr_stats_with_size(st_size, details):
     ],
 )
 def test_path_repr_stats_with_mtime(path_class, kind):
-    current_timestamp = time()
-    current_datetime = datetime.fromtimestamp(current_timestamp).isoformat()
+    current_timestamp = time.time()
+    current_datetime = datetime.fromtimestamp(current_timestamp, tz=timezone.utc).isoformat()
     options = {"with_size": False, "with_mode": False, "with_owner": False}

     file1 = path_class("a/b/c", stats=RemotePathStat(st_mtime=current_timestamp))