Skip to content

Commit

Permalink
[DOP-22127] Implement FileModificationTime filter
Browse files Browse the repository at this point in the history
  • Loading branch information
dolfinus committed Jan 27, 2025
1 parent 5a3d541 commit e983a57
Show file tree
Hide file tree
Showing 18 changed files with 567 additions and 98 deletions.
1 change: 1 addition & 0 deletions docs/changelog/next_release/330.feature.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Implement ``FileModifiedTime(since=..., until=...)`` file filter. Now users can set ``FileDownloader`` / ``FileMover`` to download/move only files with specific file modification time.
9 changes: 9 additions & 0 deletions docs/file/file_filters/file_mtime_filter.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
.. _file-modificatiom-time:

FileModifiedTime
====================

.. currentmodule:: onetl.file.filter.file_mtime

.. autoclass:: FileModifiedTime
:members: match
1 change: 1 addition & 0 deletions docs/file/file_filters/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ File Filters
regexp
exclude_dir
file_size_filter
file_mtime_filter

.. toctree::
:maxdepth: 1
Expand Down
3 changes: 1 addition & 2 deletions onetl/base/base_file_filter.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,7 @@ def match(self, path: PathProtocol) -> bool:
Examples
--------
from onetl.impl import LocalPath
>>> from onetl.impl import LocalPath
>>> filter.match(LocalPath("/path/to/file.csv"))
True
>>> filter.match(LocalPath("/path/to/excluded.csv"))
Expand Down
15 changes: 8 additions & 7 deletions onetl/connection/file_connection/file_connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -172,24 +172,25 @@ def get_stat(self, path: os.PathLike | str) -> PathStatProtocol:
def resolve_dir(self, path: os.PathLike | str) -> RemoteDirectory:
is_dir = self.is_dir(path)
stat = self.get_stat(path)
remote_path = RemotePath(path)

if not is_dir:
raise NotADirectoryError(
f"{path_repr(RemoteFile(path, stats=stat))} is not a directory",
)
remote_file = RemoteFile(path=remote_path, stats=stat)
raise NotADirectoryError(f"{path_repr(remote_file)} is not a directory")

return RemoteDirectory(path=path, stats=stat)
return RemoteDirectory(path=remote_path, stats=stat)

@slot
def resolve_file(self, path: os.PathLike | str) -> RemoteFile:
is_file = self.is_file(path)
stat = self.get_stat(path)
remote_path = RemoteFile(path=path, stats=stat)
remote_path = RemotePath(path)

if not is_file:
raise NotAFileError(f"{path_repr(remote_path)} is not a file")
remote_directory = RemoteDirectory(path=remote_path, stats=stat)
raise NotAFileError(f"{path_repr(remote_directory)} is not a file")

return remote_path
return RemoteFile(path=remote_path, stats=stat)

@slot
def read_text(self, path: os.PathLike | str, encoding: str = "utf-8", **kwargs) -> str:
Expand Down
2 changes: 1 addition & 1 deletion onetl/connection/file_connection/webdav.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@
) from e

log = getLogger(__name__)
DATA_MODIFIED_FORMAT = "%a, %d %b %Y %H:%M:%S GMT"
DATA_MODIFIED_FORMAT = "%a, %d %b %Y %H:%M:%S %Z"


@support_hooks
Expand Down
5 changes: 2 additions & 3 deletions onetl/file/file_downloader/file_downloader.py
Original file line number Diff line number Diff line change
Expand Up @@ -468,16 +468,15 @@ def view_files(self) -> FileSet[RemoteFile]:
if not self._connection_checked:
self._check_source_path()

result = FileSet()

filters = self.filters.copy()
if self.hwm:
filters.append(FileHWMFilter(hwm=self._init_hwm(self.hwm)))

result = FileSet()
try:
for root, _dirs, files in self.connection.walk(self.source_path, filters=filters, limits=self.limits):
for file in files:
result.append(RemoteFile(path=root / file, stats=file.stats))
result.append(RemoteFile(path=root / file, stats=file.stat()))

except Exception as e:
raise RuntimeError(
Expand Down
2 changes: 1 addition & 1 deletion onetl/file/file_mover/file_mover.py
Original file line number Diff line number Diff line change
Expand Up @@ -358,7 +358,7 @@ def view_files(self) -> FileSet[RemoteFile]:
try:
for root, _dirs, files in self.connection.walk(self.source_path, filters=self.filters, limits=self.limits):
for file in files:
result.append(RemoteFile(path=root / file, stats=file.stats))
result.append(RemoteFile(path=root / file, stats=file.stat()))

except Exception as e:
raise RuntimeError(
Expand Down
2 changes: 2 additions & 0 deletions onetl/file/filter/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
# SPDX-License-Identifier: Apache-2.0
from onetl.file.filter.exclude_dir import ExcludeDir
from onetl.file.filter.file_hwm import FileHWMFilter
from onetl.file.filter.file_mtime import FileModifiedTime
from onetl.file.filter.file_size import FileSizeRange
from onetl.file.filter.glob import Glob
from onetl.file.filter.match_all_filters import match_all_filters
Expand All @@ -10,6 +11,7 @@
__all__ = [
"ExcludeDir",
"FileHWMFilter",
"FileModifiedTime",
"FileSizeRange",
"Glob",
"match_all_filters",
Expand Down
117 changes: 117 additions & 0 deletions onetl/file/filter/file_mtime.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
# SPDX-FileCopyrightText: 2021-2024 MTS PJSC
# SPDX-License-Identifier: Apache-2.0
from __future__ import annotations

from datetime import datetime
from typing import Optional

from onetl.base.path_protocol import PathWithStatsProtocol

try:
from pydantic.v1 import root_validator, validator
except (ImportError, AttributeError):
from pydantic import root_validator, validator # type: ignore[no-redef, assignment]

Check warning on line 13 in onetl/file/filter/file_mtime.py

View check run for this annotation

Codecov / codecov/patch

onetl/file/filter/file_mtime.py#L12-L13

Added lines #L12 - L13 were not covered by tests

from onetl.base import BaseFileFilter, PathProtocol
from onetl.impl import FrozenModel


class FileModifiedTime(BaseFileFilter, FrozenModel):
"""Filter files matching a specified modification time.
If file modification time (``.stat().st_mtime``) doesn't match range, it will be excluded.
Doesn't affect directories or paths without ``.stat()`` method.
.. note::
Some filesystems return timestamps truncated to whole seconds (without millisecond part).
obj:`~since` and :obj`~until`` values should be adjusted accordingly.
.. versionadded:: 0.13.0
Parameters
----------
since : datetime, optional
Minimal allowed file modification time. ``None`` means no limit.
until : datetime, optional
Maximum allowed file modification time. ``None`` means no limit.
Examples
--------
Select files modified between start of the day (``00:00:00``) and hour ago:
.. code:: python
from datetime import datetime, timedelta
from onetl.file.filter import FileModifiedTime
hour_ago = datetime.now() - timedelta(hours=1)
day_start = datetime.now().replace(hour=0, minute=0, second=0, microsecond=0)
file_mtime = FileModifiedTime(since=day_start, until=hour_ago)
Select only files modified since hour ago:
.. code:: python
from datetime import datetime, timedelta
from onetl.file.filter import FileModifiedTime
hour_ago = datetime.now() - timedelta(hours=1)
file_mtime = FileModifiedTime(since=hour_ago)
"""

since: Optional[datetime] = None
until: Optional[datetime] = None

@root_validator(skip_on_failure=True)
def _validate_since_until(cls, values):
since = values.get("since")
until = values.get("until")

if since is None and until is None:
raise ValueError("Either since or until must be specified")

# since and until can be tz-naive and tz-aware, which are cannot be compared.
if since and until and since.timestamp() > until.timestamp():
raise ValueError("since cannot be greater than until")

return values

@validator("since", "until", pre=True)
def _parse_isoformat(cls, value):
if isinstance(value, str):
# Pydantic doesn't allow values like "YYYY-MM-DD" as input, but .fromisoformat() does
return datetime.fromisoformat(value)
return value

@validator("since", "until")
def _always_include_tz(cls, value):
if value.tzinfo is None:
# tz-naive datetime should be converted to tz-aware
return value.astimezone()
return value

def __repr__(self):
since_human_readable = self.since.isoformat() if self.since is not None else None
until_human_readable = self.until.isoformat() if self.until is not None else None
return f"{self.__class__.__name__}(since={since_human_readable!r}, until={until_human_readable!r})"

def match(self, path: PathProtocol) -> bool:
if path.is_file() and isinstance(path, PathWithStatsProtocol):
file_mtime = path.stat().st_mtime
if not file_mtime:
return True

Check warning on line 108 in onetl/file/filter/file_mtime.py

View check run for this annotation

Codecov / codecov/patch

onetl/file/filter/file_mtime.py#L108

Added line #L108 was not covered by tests

# mtime is always POSIX timestamp, avoid converting it to datetime and dancing with timezones
if self.since is not None and file_mtime < self.since.timestamp():
return False

if self.until is not None and file_mtime > self.until.timestamp():
return False

return True
8 changes: 3 additions & 5 deletions onetl/file/filter/file_size.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@
class FileSizeRange(BaseFileFilter, FrozenModel):
"""Filter files matching a specified size.
If file size doesn't match boundaries, it will be excluded.
Doesn't affect directories or paths without defined size.
If file size (``.stat().st_size``) doesn't match the range, it will be excluded.
Doesn't affect directories or paths without ``.stat()`` method.
.. versionadded:: 0.13.0
Expand All @@ -37,8 +37,6 @@ class FileSizeRange(BaseFileFilter, FrozenModel):
max : int or str, optional
If file size is greater than this value, it will be excluded.
Maximum allowed file size. ``None`` means no limit.
Examples
Expand Down Expand Up @@ -72,7 +70,7 @@ class FileSizeRange(BaseFileFilter, FrozenModel):
min: Optional[ByteSize] = None
max: Optional[ByteSize] = None

@root_validator
@root_validator(skip_on_failure=True)
def _validate_min_max(cls, values):
min_value = values.get("min")
max_value = values.get("max")
Expand Down
4 changes: 2 additions & 2 deletions onetl/impl/path_repr.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import stat
import textwrap
from dataclasses import dataclass
from datetime import datetime
from datetime import datetime, timezone
from pathlib import PurePath

from humanize import naturalsize
Expand Down Expand Up @@ -49,7 +49,7 @@ def from_path(cls, path) -> PathRepr:
if path.exists() and isinstance(path, PathWithStatsProtocol):
details = path.stat()
result.size = details.st_size
result.mtime = datetime.fromtimestamp(details.st_mtime) if details.st_mtime else None
result.mtime = datetime.fromtimestamp(details.st_mtime, tz=timezone.utc) if details.st_mtime else None
result.mode = details.st_mode
result.user = details.st_uid
result.group = details.st_gid
Expand Down
6 changes: 5 additions & 1 deletion setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -433,7 +433,11 @@ per-file-ignores =
# WPS202 Found too many module members: 40 > 35
WPS202,
# WPS210 Found too many local variables: 21 > 20
WPS210
WPS210,
# WPS441 Found control variable used after block: file
WPS441,
# WPS333 Found implicit complex compare
WPS333


[darglint]
Expand Down
Loading

0 comments on commit e983a57

Please sign in to comment.