Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[DOP-22127] Implement FileModifiedTime filter #330

Merged
merged 1 commit into from
Jan 28, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/changelog/next_release/330.feature.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Implement ``FileModifiedTime(since=..., until=...)`` file filter. Now users can set ``FileDownloader`` / ``FileMover`` to download/move only files with specific file modification time.
9 changes: 9 additions & 0 deletions docs/file/file_filters/file_mtime_filter.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
.. _file-modificatiom-time:

FileModifiedTime
====================

.. currentmodule:: onetl.file.filter.file_mtime

.. autoclass:: FileModifiedTime
:members: match
1 change: 1 addition & 0 deletions docs/file/file_filters/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ File Filters
regexp
exclude_dir
file_size_filter
file_mtime_filter

.. toctree::
:maxdepth: 1
Expand Down
3 changes: 1 addition & 2 deletions onetl/base/base_file_filter.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,7 @@ def match(self, path: PathProtocol) -> bool:
Examples
--------

from onetl.impl import LocalPath

>>> from onetl.impl import LocalPath
>>> filter.match(LocalPath("/path/to/file.csv"))
True
>>> filter.match(LocalPath("/path/to/excluded.csv"))
Expand Down
15 changes: 8 additions & 7 deletions onetl/connection/file_connection/file_connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -172,24 +172,25 @@ def get_stat(self, path: os.PathLike | str) -> PathStatProtocol:
def resolve_dir(self, path: os.PathLike | str) -> RemoteDirectory:
is_dir = self.is_dir(path)
stat = self.get_stat(path)
remote_path = RemotePath(path)

if not is_dir:
raise NotADirectoryError(
f"{path_repr(RemoteFile(path, stats=stat))} is not a directory",
)
remote_file = RemoteFile(path=remote_path, stats=stat)
raise NotADirectoryError(f"{path_repr(remote_file)} is not a directory")

return RemoteDirectory(path=path, stats=stat)
return RemoteDirectory(path=remote_path, stats=stat)

@slot
def resolve_file(self, path: os.PathLike | str) -> RemoteFile:
is_file = self.is_file(path)
stat = self.get_stat(path)
remote_path = RemoteFile(path=path, stats=stat)
remote_path = RemotePath(path)

if not is_file:
raise NotAFileError(f"{path_repr(remote_path)} is not a file")
remote_directory = RemoteDirectory(path=remote_path, stats=stat)
raise NotAFileError(f"{path_repr(remote_directory)} is not a file")

return remote_path
return RemoteFile(path=remote_path, stats=stat)

@slot
def read_text(self, path: os.PathLike | str, encoding: str = "utf-8", **kwargs) -> str:
Expand Down
2 changes: 1 addition & 1 deletion onetl/connection/file_connection/webdav.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@
) from e

log = getLogger(__name__)
DATA_MODIFIED_FORMAT = "%a, %d %b %Y %H:%M:%S GMT"
DATA_MODIFIED_FORMAT = "%a, %d %b %Y %H:%M:%S %Z"


@support_hooks
Expand Down
5 changes: 2 additions & 3 deletions onetl/file/file_downloader/file_downloader.py
Original file line number Diff line number Diff line change
Expand Up @@ -468,16 +468,15 @@ def view_files(self) -> FileSet[RemoteFile]:
if not self._connection_checked:
self._check_source_path()

result = FileSet()

filters = self.filters.copy()
if self.hwm:
filters.append(FileHWMFilter(hwm=self._init_hwm(self.hwm)))

result = FileSet()
try:
for root, _dirs, files in self.connection.walk(self.source_path, filters=filters, limits=self.limits):
for file in files:
result.append(RemoteFile(path=root / file, stats=file.stats))
result.append(RemoteFile(path=root / file, stats=file.stat()))

except Exception as e:
raise RuntimeError(
Expand Down
2 changes: 1 addition & 1 deletion onetl/file/file_mover/file_mover.py
Original file line number Diff line number Diff line change
Expand Up @@ -358,7 +358,7 @@ def view_files(self) -> FileSet[RemoteFile]:
try:
for root, _dirs, files in self.connection.walk(self.source_path, filters=self.filters, limits=self.limits):
for file in files:
result.append(RemoteFile(path=root / file, stats=file.stats))
result.append(RemoteFile(path=root / file, stats=file.stat()))

except Exception as e:
raise RuntimeError(
Expand Down
2 changes: 2 additions & 0 deletions onetl/file/filter/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
# SPDX-License-Identifier: Apache-2.0
from onetl.file.filter.exclude_dir import ExcludeDir
from onetl.file.filter.file_hwm import FileHWMFilter
from onetl.file.filter.file_mtime import FileModifiedTime
from onetl.file.filter.file_size import FileSizeRange
from onetl.file.filter.glob import Glob
from onetl.file.filter.match_all_filters import match_all_filters
Expand All @@ -10,6 +11,7 @@
__all__ = [
"ExcludeDir",
"FileHWMFilter",
"FileModifiedTime",
"FileSizeRange",
"Glob",
"match_all_filters",
Expand Down
117 changes: 117 additions & 0 deletions onetl/file/filter/file_mtime.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
# SPDX-FileCopyrightText: 2021-2024 MTS PJSC
# SPDX-License-Identifier: Apache-2.0
from __future__ import annotations

from datetime import datetime
from typing import Optional

from onetl.base.path_protocol import PathWithStatsProtocol

try:
from pydantic.v1 import root_validator, validator
except (ImportError, AttributeError):
from pydantic import root_validator, validator # type: ignore[no-redef, assignment]

Check warning on line 13 in onetl/file/filter/file_mtime.py

View check run for this annotation

Codecov / codecov/patch

onetl/file/filter/file_mtime.py#L12-L13

Added lines #L12 - L13 were not covered by tests

from onetl.base import BaseFileFilter, PathProtocol
from onetl.impl import FrozenModel


class FileModifiedTime(BaseFileFilter, FrozenModel):
"""Filter files matching a specified modification time.

If file modification time (``.stat().st_mtime``) doesn't match range, it will be excluded.
Doesn't affect directories or paths without ``.stat()`` method.

.. note::

Some filesystems return timestamps truncated to whole seconds (without millisecond part).
obj:`~since` and :obj`~until`` values should be adjusted accordingly.

.. versionadded:: 0.13.0

Parameters
----------

since : datetime, optional

Minimal allowed file modification time. ``None`` means no limit.

until : datetime, optional

Maximum allowed file modification time. ``None`` means no limit.

Examples
--------

Select files modified between start of the day (``00:00:00``) and hour ago:

.. code:: python

from datetime import datetime, timedelta
from onetl.file.filter import FileModifiedTime

hour_ago = datetime.now() - timedelta(hours=1)
day_start = datetime.now().replace(hour=0, minute=0, second=0, microsecond=0)
file_mtime = FileModifiedTime(since=day_start, until=hour_ago)

Select only files modified since hour ago:

.. code:: python

from datetime import datetime, timedelta
from onetl.file.filter import FileModifiedTime

hour_ago = datetime.now() - timedelta(hours=1)
file_mtime = FileModifiedTime(since=hour_ago)
"""

since: Optional[datetime] = None
until: Optional[datetime] = None

@root_validator(skip_on_failure=True)
def _validate_since_until(cls, values):
since = values.get("since")
until = values.get("until")

if since is None and until is None:
raise ValueError("Either since or until must be specified")

# since and until can be tz-naive and tz-aware, which are cannot be compared.
if since and until and since.timestamp() > until.timestamp():
raise ValueError("since cannot be greater than until")

return values

@validator("since", "until", pre=True)
def _parse_isoformat(cls, value):
if isinstance(value, str):
# Pydantic doesn't allow values like "YYYY-MM-DD" as input, but .fromisoformat() does
return datetime.fromisoformat(value)
return value

@validator("since", "until")
def _always_include_tz(cls, value):
if value.tzinfo is None:
# tz-naive datetime should be converted to tz-aware
return value.astimezone()
return value

def __repr__(self):
since_human_readable = self.since.isoformat() if self.since is not None else None
until_human_readable = self.until.isoformat() if self.until is not None else None
return f"{self.__class__.__name__}(since={since_human_readable!r}, until={until_human_readable!r})"

def match(self, path: PathProtocol) -> bool:
if path.is_file() and isinstance(path, PathWithStatsProtocol):
file_mtime = path.stat().st_mtime
if not file_mtime:
return True

Check warning on line 108 in onetl/file/filter/file_mtime.py

View check run for this annotation

Codecov / codecov/patch

onetl/file/filter/file_mtime.py#L108

Added line #L108 was not covered by tests

# mtime is always POSIX timestamp, avoid converting it to datetime and dancing with timezones
if self.since is not None and file_mtime < self.since.timestamp():
return False

if self.until is not None and file_mtime > self.until.timestamp():
return False

return True
8 changes: 3 additions & 5 deletions onetl/file/filter/file_size.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@
class FileSizeRange(BaseFileFilter, FrozenModel):
"""Filter files matching a specified size.

If file size doesn't match boundaries, it will be excluded.
Doesn't affect directories or paths without defined size.
If file size (``.stat().st_size``) doesn't match the range, it will be excluded.
Doesn't affect directories or paths without ``.stat()`` method.

.. versionadded:: 0.13.0

Expand All @@ -37,8 +37,6 @@ class FileSizeRange(BaseFileFilter, FrozenModel):

max : int or str, optional

If file size is greater than this value, it will be excluded.

Maximum allowed file size. ``None`` means no limit.

Examples
Expand Down Expand Up @@ -72,7 +70,7 @@ class FileSizeRange(BaseFileFilter, FrozenModel):
min: Optional[ByteSize] = None
max: Optional[ByteSize] = None

@root_validator
@root_validator(skip_on_failure=True)
def _validate_min_max(cls, values):
min_value = values.get("min")
max_value = values.get("max")
Expand Down
4 changes: 2 additions & 2 deletions onetl/impl/path_repr.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import stat
import textwrap
from dataclasses import dataclass
from datetime import datetime
from datetime import datetime, timezone
from pathlib import PurePath

from humanize import naturalsize
Expand Down Expand Up @@ -49,7 +49,7 @@ def from_path(cls, path) -> PathRepr:
if path.exists() and isinstance(path, PathWithStatsProtocol):
details = path.stat()
result.size = details.st_size
result.mtime = datetime.fromtimestamp(details.st_mtime) if details.st_mtime else None
result.mtime = datetime.fromtimestamp(details.st_mtime, tz=timezone.utc) if details.st_mtime else None
result.mode = details.st_mode
result.user = details.st_uid
result.group = details.st_gid
Expand Down
6 changes: 5 additions & 1 deletion setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -433,7 +433,11 @@ per-file-ignores =
# WPS202 Found too many module members: 40 > 35
WPS202,
# WPS210 Found too many local variables: 21 > 20
WPS210
WPS210,
# WPS441 Found control variable used after block: file
WPS441,
# WPS333 Found implicit complex compare
WPS333


[darglint]
Expand Down
Loading
Loading