Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: config_monitor: use sha256 hash to monitor the change of config file; temporary disable ecu_info.yaml check #19

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion examples/otaclient-logger.service
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@ Environment=UPLOAD_LOGGING_SERVER_LOGS=true
Environment=SERVER_LOGGING_LEVEL=INFO
Restart=on-failure
RestartSec=10
Type=simple
Type=nofity
NotifyAccess=all

[Install]
WantedBy=multi-user.target
47 changes: 47 additions & 0 deletions src/otaclient_iot_logging_server/_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,12 @@

from __future__ import annotations

import hashlib
import io
import sys
import time
from functools import partial, wraps
from pathlib import Path
from typing import Any, Callable, Optional, TypeVar, overload

from pydantic import BaseModel, ConfigDict
Expand Down Expand Up @@ -144,3 +148,46 @@ def parse_pkcs11_uri(_pkcs11_uri: str) -> PKCS11URI:
k, v = opt.split("=", maxsplit=1)
pkcs11_opts_dict[k] = v
return PKCS11URI(**pkcs11_opts_dict)


DEFAULT_FILE_CHUNK_SIZE = 1024 * 1024 # 1MiB

if sys.version_info >= (3, 11):
from hashlib import file_digest as _file_digest

else:

def _file_digest(
fileobj: io.BufferedReader,
digest,
/,
*,
_bufsize: int = DEFAULT_FILE_CHUNK_SIZE,
) -> hashlib._Hash:
"""
Basically a simpified copy from 3.11's hashlib.file_digest.
"""
digestobj = hashlib.new(digest)

buf = bytearray(_bufsize) # Reusable buffer to reduce allocations.
view = memoryview(buf)
while True:
size = fileobj.readinto(buf)
if size == 0:
break # EOF
digestobj.update(view[:size])

return digestobj


def cal_file_digest(
fpath: str | Path,
algorithm: str,
chunk_size: int = DEFAULT_FILE_CHUNK_SIZE,
) -> str:
"""Generate file digest with <algorithm>.

A wrapper for the _file_digest method.
"""
with open(fpath, "rb") as f:
return _file_digest(f, algorithm, _bufsize=chunk_size).hexdigest()
53 changes: 21 additions & 32 deletions src/otaclient_iot_logging_server/config_file_monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,59 +23,48 @@

from __future__ import annotations

import atexit
import logging
import os
import signal
import threading
import time
from os import stat_result
from pathlib import Path
from typing import NamedTuple, NoReturn
from functools import partial

from otaclient_iot_logging_server._utils import cal_file_digest

logger = logging.getLogger(__name__)

_CHECK_INTERVAL = 3 # second
_global_shutdown = False

monitored_config_files: set[str] = set()
_monitored_files_stat: dict[str, _MCTime] = {}

def _python_exit():
global _global_shutdown
_global_shutdown = True

class _MCTime(NamedTuple):
mtime: int
ctime: int

def file_changed(self, new_mctime: _MCTime) -> bool:
# if create time is newer in <new_mctime>, it means the file is recreated.
# if modified time is newer in <new_mctime>, it means the file is modified.
return self.ctime < new_mctime.ctime or self.mtime < new_mctime.mtime
atexit.register(_python_exit)

@classmethod
def from_stat(cls, stat: stat_result) -> _MCTime:
return cls(int(stat.st_mtime), int(stat.st_ctime))
_CHECK_INTERVAL = 3 # second

file_digest_sha256 = partial(cal_file_digest, algorithm="sha256")

def _config_file_monitor() -> NoReturn:
monitored_config_files: set[str] = set()
_monitored_files_hash: dict[str, str] = {}


def _config_file_monitor() -> None:
# initialize, record the original status
logger.info(f"start to monitor the changes of {monitored_config_files}")
while True:
while not _global_shutdown:
for entry in monitored_config_files:
try:
f_stat = Path(entry).stat()
except Exception as e:
logger.debug(f"cannot query stat from {entry}, skip: {e!r}")
continue

new_f_mctime = _MCTime.from_stat(f_stat)
if entry not in _monitored_files_stat:
_monitored_files_stat[entry] = new_f_mctime
continue

f_mctime = _monitored_files_stat[entry]
if f_mctime.file_changed(new_f_mctime):
_f_digest = file_digest_sha256(entry)
_saved_digest = _monitored_files_hash.setdefault(entry, _f_digest)

if _f_digest != _saved_digest:
logger.warning(f"detect change on config file {entry}, exit")
# NOTE: sys.exit is not working in thread
os.kill(os.getpid(), signal.SIGINT)

time.sleep(_CHECK_INTERVAL)


Expand Down
13 changes: 7 additions & 6 deletions src/otaclient_iot_logging_server/ecu_info.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,6 @@
import yaml
from pydantic import BaseModel, ConfigDict, Field, IPvAnyAddress

from otaclient_iot_logging_server.config_file_monitor import monitored_config_files
from otaclient_iot_logging_server.configs import server_cfg

logger = logging.getLogger(__name__)


Expand Down Expand Up @@ -70,6 +67,10 @@ def parse_ecu_info(ecu_info_file: Path | str) -> Optional[ECUInfo]:
logger.info(f"{ecu_info_file=} is invalid or missing: {e!r}")


ecu_info = parse_ecu_info(server_cfg.ECU_INFO_YAML)
if ecu_info:
monitored_config_files.add(server_cfg.ECU_INFO_YAML)
# NOTE(20241113): since ecu_info.yaml file will still be modified regularly,
# temporary disable the logging filter with ecu_info.yaml.
# ecu_info = parse_ecu_info(server_cfg.ECU_INFO_YAML)
# if ecu_info:
# monitored_config_files.add(server_cfg.ECU_INFO_YAML)
logger.warning("disable logging filtering for now")
ecu_info = None