diff --git a/docker/Dockerfile b/docker/Dockerfile index bbbd9a2af..783a306f4 100644 --- a/docker/Dockerfile +++ b/docker/Dockerfile @@ -148,12 +148,6 @@ RUN \ && useradd --uid 911 --user-group --create-home abc \ && mkdir -p /home/abc/bin /segments -VOLUME /config -VOLUME /recordings -VOLUME /segments -VOLUME /snapshots -VOLUME /thumbnails - ENTRYPOINT ["/init"] COPY docker/ffprobe_wrapper /home/abc/bin/ffprobe diff --git a/rootfs/etc/services.d/viseron/finish b/rootfs/etc/services.d/viseron/finish index 4cbd04f2b..342568ddf 100644 --- a/rootfs/etc/services.d/viseron/finish +++ b/rootfs/etc/services.d/viseron/finish @@ -4,7 +4,11 @@ define VISERON_RESTART_EXIT_CODE 100 define SIGNAL_EXIT_CODE 256 define SIGTERM 15 +# Log the exit code and time foreground { s6-echo "[viseron-finish] Viseron exit code ${1}" } +backtick -D "unknown time" date { /bin/date } +importas -i date date +foreground { s6-echo "[viseron-finish] Shutdown completed at ${date}" } # Exit without stopping the supervisor so the Viseron service restarts on its own if { s6-test ${1} -ne ${VISERON_RESTART_EXIT_CODE} } diff --git a/tests/components/codeprojectai/test_object_detector.py b/tests/components/codeprojectai/test_object_detector.py index 74a47f12c..0af1847e0 100644 --- a/tests/components/codeprojectai/test_object_detector.py +++ b/tests/components/codeprojectai/test_object_detector.py @@ -87,7 +87,7 @@ def test_object_detector_init(vis: MockViseron, config): """ _ = MockComponent(COMPONENT, vis) _ = MockCamera(vis, identifier=CAMERA_IDENTIFIER) - with patch("codeprojectai.core.CodeProjectAIObject"): + with patch("viseron.components.codeprojectai.object_detector.CodeProjectAIObject"): detector = ObjectDetector(vis, config["codeprojectai"], CAMERA_IDENTIFIER) assert detector._image_resolution == ( # pylint: disable=protected-access 640, @@ -108,7 +108,7 @@ def test_preprocess(vis: Viseron, config): """ _ = MockComponent(COMPONENT, vis) _ = MockCamera(vis, identifier=CAMERA_IDENTIFIER) - with patch("codeprojectai.core.CodeProjectAIObject"): + with patch("viseron.components.codeprojectai.object_detector.CodeProjectAIObject"): detector = ObjectDetector(vis, config["codeprojectai"], CAMERA_IDENTIFIER) frame = np.zeros((480, 640, 3), dtype=np.uint8) processed = detector.preprocess(frame) @@ -125,7 +125,7 @@ def test_postprocess(vis: Viseron, config): """ _ = MockComponent(COMPONENT, vis) _ = MockCamera(vis, identifier=CAMERA_IDENTIFIER) - with patch("codeprojectai.core.CodeProjectAIObject"): + with patch("viseron.components.codeprojectai.object_detector.CodeProjectAIObject"): detector = ObjectDetector(vis, config["codeprojectai"], CAMERA_IDENTIFIER) detections = [ { @@ -142,7 +142,7 @@ def test_postprocess(vis: Viseron, config): assert isinstance(objects[0], DetectedObject) -@patch("codeprojectai.core.CodeProjectAIObject.detect") +@patch("viseron.components.codeprojectai.object_detector.CodeProjectAIObject.detect") def test_return_objects_success(mock_detect, vis: Viseron, config): """ Test the return_objects method of the ObjectDetector class for successful detection. @@ -203,7 +203,7 @@ def test_object_detector_init_no_image_size(vis: Viseron, config, mock_detected_ config (dict): The configuration dictionary. mock_detected_object (MagicMock): Mocked DetectedObject class. 
""" - with patch("codeprojectai.core.CodeProjectAIObject"): + with patch("viseron.components.codeprojectai.object_detector.CodeProjectAIObject"): # Set non-square image resolution config["codeprojectai"]["object_detector"]["image_size"] = None @@ -255,7 +255,7 @@ def test_postprocess_square_resolution(vis: Viseron, config, mock_detected_objec config (dict): The configuration dictionary. mock_detected_object (MagicMock): Mocked DetectedObject class. """ - with patch("codeprojectai.core.CodeProjectAIObject"): + with patch("viseron.components.codeprojectai.object_detector.CodeProjectAIObject"): # Set square image resolution config["codeprojectai"]["object_detector"]["image_size"] = 640 diff --git a/tests/components/ffmpeg/test_stream.py b/tests/components/ffmpeg/test_stream.py index 4ec3a6847..d8e135d01 100644 --- a/tests/components/ffmpeg/test_stream.py +++ b/tests/components/ffmpeg/test_stream.py @@ -31,6 +31,7 @@ DEFAULT_CODEC, DEFAULT_FPS, DEFAULT_HEIGHT, + DEFAULT_PASSWORD, DEFAULT_PROTOCOL, DEFAULT_RECORDER_AUDIO_CODEC, DEFAULT_STREAM_FORMAT, @@ -48,7 +49,7 @@ from tests.common import MockCamera, return_any -CONFIG = { +CONFIG: dict[str, Any] = { CONFIG_HOST: "test_host", CONFIG_PORT: 1234, CONFIG_PATH: "/", @@ -211,22 +212,31 @@ def test_get_encoder_audio_codec( stream.get_encoder_audio_codec(stream_audio_codec) == expected_audio_cmd ) - def test_get_stream_url(self) -> None: + @pytest.mark.parametrize( + "username, password, expected_url", + [ + ( + "test_username", + "test_password", + "rtsp://test_username:test_password@test_host:1234/", + ), + ("admin", "", "rtsp://admin:@test_host:1234/"), + (DEFAULT_USERNAME, DEFAULT_PASSWORD, "rtsp://test_host:1234/"), + ], + ) + def test_get_stream_url(self, username, password, expected_url) -> None: """Test that the correct stream url is returned.""" mocked_camera = MockCamera(identifier="test_camera_identifier") + config = dict(CONFIG) + config[CONFIG_USERNAME] = username + config[CONFIG_PASSWORD] = password + with patch.object( Stream, "__init__", MagicMock(spec=Stream, return_value=None) ): stream = Stream(CONFIG, mocked_camera, "test_camera_identifier") - stream._config = CONFIG # pylint: disable=protected-access - assert ( - stream.get_stream_url(CONFIG) - == "rtsp://test_username:test_password@test_host:1234/" - ) - stream._config[ # pylint: disable=protected-access - CONFIG_USERNAME - ] = DEFAULT_USERNAME - assert stream.get_stream_url(CONFIG) == "rtsp://test_host:1234/" + stream._config = config # pylint: disable=protected-access + assert stream.get_stream_url(config) == expected_url def test_get_stream_information(self): """Test that the correct stream information is returned.""" diff --git a/tests/components/storage/test__init__.py b/tests/components/storage/test__init__.py index db79707bd..e2f508309 100644 --- a/tests/components/storage/test__init__.py +++ b/tests/components/storage/test__init__.py @@ -6,7 +6,7 @@ import tempfile from pathlib import Path from typing import TYPE_CHECKING -from unittest.mock import MagicMock, Mock +from unittest.mock import MagicMock, Mock, patch import pytest @@ -133,7 +133,8 @@ class TestStorage: def setup_method(self, vis: Viseron) -> None: """Set up the test.""" - self._storage = Storage(vis, MagicMock()) + with patch("viseron.components.storage.CleanupManager"): + self._storage = Storage(vis, MagicMock()) def test_search_file(self) -> None: """Test the search_file method.""" diff --git a/tests/conftest.py b/tests/conftest.py index ea78e0903..692b2bb9f 100644 --- a/tests/conftest.py +++ 
b/tests/conftest.py @@ -28,8 +28,10 @@ class MockViseron(Viseron): def __init__(self) -> None: super().__init__() - self.register_domain = Mock(side_effect=self.register_domain) # type: ignore - self.mocked_register_domain = self.register_domain # type: ignore + self.register_domain = Mock( # type: ignore[method-assign] + side_effect=self.register_domain, + ) + self.mocked_register_domain = self.register_domain @pytest.fixture diff --git a/viseron/__init__.py b/viseron/__init__.py index c0c917708..d440f8807 100644 --- a/viseron/__init__.py +++ b/viseron/__init__.py @@ -69,6 +69,7 @@ ) from viseron.states import States from viseron.types import SupportedDomains +from viseron.watchdog.process_watchdog import ProcessWatchDog from viseron.watchdog.subprocess_watchdog import SubprocessWatchDog from viseron.watchdog.thread_watchdog import ThreadWatchDog @@ -190,8 +191,7 @@ def setup_viseron() -> Viseron: else: vis.critical_components_config_store.save(config) - end = timer() - LOGGER.info("Viseron initialized in %.1f seconds", end - start) + LOGGER.info("Viseron initialized in %.1f seconds", timer() - start) return vis @@ -218,10 +218,11 @@ def __init__(self) -> None: self._domain_register_lock = threading.Lock() self.data[REGISTERED_DOMAINS] = {} - self._thread_watchdog = ThreadWatchDog() - self._subprocess_watchdog = SubprocessWatchDog() self.background_scheduler = BackgroundScheduler(timezone="UTC", daemon=True) self.background_scheduler.start() + self._thread_watchdog = ThreadWatchDog(self) + self._subprocess_watchdog = SubprocessWatchDog(self) + self._process_watchdog = ProcessWatchDog(self) self.storage: Storage | None = None @@ -265,7 +266,7 @@ def register_signal_handler(self, viseron_signal, callback): return False return self.data[DATA_STREAM_COMPONENT].subscribe_data( - f"viseron/signal/{viseron_signal}", callback + VISERON_SIGNALS[viseron_signal], callback, stage=viseron_signal ) def listen_event(self, event: str, callback, ioloop=None) -> Callable[[], None]: @@ -482,15 +483,14 @@ def get_registered_identifiers(self, domain: SupportedDomains): def shutdown(self) -> None: """Shut down Viseron.""" + start = timer() LOGGER.info("Initiating shutdown") if self.data.get(DATA_STREAM_COMPONENT, None): data_stream: DataStream = self.data[DATA_STREAM_COMPONENT] - self._thread_watchdog.stop() - self._subprocess_watchdog.stop() try: - self.background_scheduler.shutdown() + self.background_scheduler.shutdown(wait=False) except SchedulerNotRunningError as err: LOGGER.warning(f"Failed to shutdown scheduler: {err}") @@ -504,7 +504,7 @@ def shutdown(self) -> None: self, data_stream, VISERON_SIGNAL_STOPPING ) - LOGGER.info("Shutdown complete") + LOGGER.info("Shutdown complete in %.1f seconds", timer() - start) def add_entity(self, component: str, entity: Entity): """Add entity to states registry.""" @@ -543,9 +543,10 @@ def wait_for_threads_and_processes_to_exit( stage: Literal["shutdown", "last_write", "stopping"], ) -> None: """Wait for all threads and processes to exit.""" + LOGGER.debug(f"Sending signal for stage {stage}") vis.shutdown_stage = stage data_stream.publish_data(VISERON_SIGNALS[stage]) - + time.sleep(0.1) # Wait for signal to be processed LOGGER.debug(f"Waiting for threads and processes to exit in stage {stage}") def join( @@ -553,8 +554,20 @@ def join( | multiprocessing.Process | multiprocessing.process.BaseProcess, ) -> None: - thread_or_process.join(timeout=10) - time.sleep(0.5) # Wait for process to exit properly + start_time = time.time() + LOGGER.debug(f"Waiting for 
{thread_or_process.name} to exit") + try: + thread_or_process.join(timeout=5) + except RuntimeError: + LOGGER.debug(f"Failed to join {thread_or_process.name}") + time.sleep(0.1) + thread_or_process.join(timeout=5) + LOGGER.debug( + f"Finished waiting for {thread_or_process.name} " + f"after {time.time() - start_time:.2f}s" + ) + + time.sleep(0.1) # Wait for process to exit properly if thread_or_process.is_alive(): LOGGER.error(f"{thread_or_process.name} did not exit in time") if isinstance(thread_or_process, multiprocessing.Process): diff --git a/viseron/__main__.py b/viseron/__main__.py index b1f18a412..dc4f225b5 100644 --- a/viseron/__main__.py +++ b/viseron/__main__.py @@ -1,8 +1,12 @@ """Start Viseron.""" from __future__ import annotations +import multiprocessing as mp +import os import signal import sys +import threading +from threading import Timer from viseron import Viseron, setup_viseron @@ -15,6 +19,16 @@ def signal_term(*_) -> None: if viseron: viseron.shutdown() + def shutdown_failed(): + print("Shutdown failed. Exiting forcefully.") + print(f"Active threads: {threading.enumerate()}") + print(f"Active processes: {mp.active_children()}") + os.kill(os.getpid(), signal.SIGKILL) + + shutdown_timer = Timer(2, shutdown_failed, args=()) + shutdown_timer.daemon = True + shutdown_timer.start() + # Listen to signals signal.signal(signal.SIGTERM, signal_term) signal.signal(signal.SIGINT, signal_term) diff --git a/viseron/components/codeprojectai/object_detector.py b/viseron/components/codeprojectai/object_detector.py index ec41b70de..256e58274 100644 --- a/viseron/components/codeprojectai/object_detector.py +++ b/viseron/components/codeprojectai/object_detector.py @@ -39,7 +39,7 @@ def __init__(self, vis: Viseron, config, camera_identifier) -> None: ) self._ds_config = config - self._detector = cpai.CodeProjectAIObject( + self._detector = CodeProjectAIObject( ip=config[CONFIG_HOST], port=config[CONFIG_PORT], timeout=config[CONFIG_TIMEOUT], @@ -109,3 +109,21 @@ def return_objects(self, frame): return [] return self.postprocess(detections) + + +class CodeProjectAIObject(cpai.CodeProjectAIObject): + """CodeProject.AI object detection.""" + + def __init__(self, ip, port, timeout, min_confidence, custom_model): + super().__init__(ip, port, timeout, min_confidence, custom_model) + + def detect(self, image_bytes: bytes): + """Process image_bytes and detect.""" + response = cpai.process_image( + url=self._url_detect, + image_bytes=image_bytes, + min_confidence=self.min_confidence, + timeout=self.timeout, + ) + LOGGER.debug("CodeProject.AI response: %s", response) + return response["predictions"] diff --git a/viseron/components/darknet/__init__.py b/viseron/components/darknet/__init__.py index fc673eb63..8bfda444d 100644 --- a/viseron/components/darknet/__init__.py +++ b/viseron/components/darknet/__init__.py @@ -7,10 +7,11 @@ import os import pwd from abc import ABC, abstractmethod -from queue import Queue +from queue import Empty, Queue from typing import Any import cv2 +import numpy as np import voluptuous as vol from viseron import Viseron @@ -272,7 +273,7 @@ def spawn_subprocess(self) -> RestartablePopen: stderr=self._log_pipe, ) - def preprocess(self, frame): + def preprocess(self, frame) -> np.ndarray: """Pre process frame before detection.""" return cv2.resize( frame, @@ -421,11 +422,17 @@ def work_output(self, item) -> None: """Put result into queue.""" pop_if_full(self._result_queues[item["camera_identifier"]], item) - def preprocess(self, frame): + def preprocess(self, frame) -> bytes: 
"""Pre process frame before detection.""" return letterbox_resize(frame, self.model_width, self.model_height).tobytes() - def detect(self, frame, camera_identifier, result_queue, min_confidence): + def detect( + self, + frame: np.ndarray, + camera_identifier: str, + result_queue, + min_confidence: float, + ): """Perform detection.""" self._result_queues[camera_identifier] = result_queue pop_if_full( @@ -436,7 +443,10 @@ def detect(self, frame, camera_identifier, result_queue, min_confidence): "min_confidence": min_confidence, }, ) - item = result_queue.get() + try: + item = result_queue.get(timeout=3) + except Empty: + return None return item["result"] def post_process(self, detections, camera_resolution): diff --git a/viseron/components/darknet/object_detector.py b/viseron/components/darknet/object_detector.py index 3919e914b..4c28ee882 100644 --- a/viseron/components/darknet/object_detector.py +++ b/viseron/components/darknet/object_detector.py @@ -13,6 +13,7 @@ if TYPE_CHECKING: from viseron import Viseron from viseron.components.darknet import BaseDarknet + from viseron.domains.camera.shared_frames import SharedFrame from viseron.domains.object_detector.detected_object import DetectedObject @@ -36,11 +37,11 @@ def __init__(self, vis: Viseron, config, camera_identifier) -> None: vis.register_domain(DOMAIN, camera_identifier, self) - def preprocess(self, frame): + def preprocess(self, frame: SharedFrame): """Return preprocessed frame before performing object detection.""" return self._darknet.preprocess(frame) - def return_objects(self, frame) -> list[DetectedObject]: + def return_objects(self, frame: SharedFrame) -> list[DetectedObject] | None: """Perform object detection.""" detections = self._darknet.detect( frame, @@ -48,6 +49,8 @@ def return_objects(self, frame) -> list[DetectedObject]: self._object_result_queue, self.min_confidence, ) + if detections is None: + return None return self._darknet.post_process(detections, self._camera.resolution) @property diff --git a/viseron/components/data_stream/__init__.py b/viseron/components/data_stream/__init__.py index f7b66d76d..fb9cfa58f 100644 --- a/viseron/components/data_stream/__init__.py +++ b/viseron/components/data_stream/__init__.py @@ -28,6 +28,7 @@ class DataSubscriber(TypedDict): callback: Callable | Queue | tornado_queue ioloop: IOLoop | None + stage: str | None class Subscribe(TypedDict): @@ -105,7 +106,10 @@ def publish_data(data_topic: str, data: Any = None) -> None: @staticmethod def subscribe_data( - data_topic: str, callback: Callable | Queue | tornado_queue, ioloop=None + data_topic: str, + callback: Callable | Queue | tornado_queue, + ioloop=None, + stage=None, ) -> uuid.UUID: """Subscribe to data on a topic. 
@@ -120,12 +124,14 @@ def subscribe_data( ] = DataSubscriber( callback=callback, ioloop=ioloop, + stage=stage, ) return unique_id DataStream._subscribers.setdefault(data_topic, {})[unique_id] = DataSubscriber( callback=callback, ioloop=ioloop, + stage=stage, ) return unique_id @@ -147,16 +153,24 @@ def run_callbacks( """Run callbacks or put to queues.""" for callback in callbacks.copy().values(): if callable(callback["callback"]) and callback["ioloop"] is None: + name = f"data_stream.callback.{callback['callback']}" + daemon = bool(callback["stage"] is None) if data: - thread = threading.Thread( + thread = RestartableThread( + name=name, target=callback["callback"], args=(data,), - daemon=True, + daemon=daemon, + register=False, + stage=callback["stage"], ) else: - thread = threading.Thread( + thread = RestartableThread( + name=name, target=callback["callback"], - daemon=True, + daemon=daemon, + register=False, + stage=callback["stage"], ) while True: diff --git a/viseron/components/ffmpeg/camera.py b/viseron/components/ffmpeg/camera.py index 18ec85aa3..868274890 100644 --- a/viseron/components/ffmpeg/camera.py +++ b/viseron/components/ffmpeg/camera.py @@ -11,11 +11,11 @@ from viseron import Viseron from viseron.const import ENV_CUDA_SUPPORTED, ENV_VAAPI_SUPPORTED -from viseron.domains.camera import ( +from viseron.domains.camera import AbstractCamera +from viseron.domains.camera.config import ( BASE_CONFIG_SCHEMA as BASE_CAMERA_CONFIG_SCHEMA, DEFAULT_RECORDER, RECORDER_SCHEMA as BASE_RECORDER_SCHEMA, - AbstractCamera, ) from viseron.domains.camera.const import DOMAIN from viseron.exceptions import DomainNotReady, FFprobeError, FFprobeTimeout diff --git a/viseron/components/ffmpeg/stream.py b/viseron/components/ffmpeg/stream.py index 235ea84f0..3d8d7bd6f 100644 --- a/viseron/components/ffmpeg/stream.py +++ b/viseron/components/ffmpeg/stream.py @@ -206,14 +206,11 @@ def output_fps(self, fps) -> None: def get_stream_url(self, stream_config: dict[str, Any]) -> str: """Return stream url.""" + username = self._config[CONFIG_USERNAME] + password = self._config[CONFIG_PASSWORD] auth = "" - if self._config[CONFIG_USERNAME] and self._config[CONFIG_PASSWORD]: - auth = ( - f"{self._config[CONFIG_USERNAME]}" - ":" - f"{escape_string(self._config[CONFIG_PASSWORD])}" - "@" - ) + if username is not None and password is not None: + auth = f"{username}:{escape_string(password)}@" protocol = ( stream_config[CONFIG_PROTOCOL] @@ -435,8 +432,8 @@ def build_segment_command(self): Only used when a substream is configured. 
""" - if self._config[CONFIG_SUBSTREAM][CONFIG_RAW_COMMAND]: - return self._config[CONFIG_SUBSTREAM][CONFIG_RAW_COMMAND].split(" ") + if self._config[CONFIG_RAW_COMMAND]: + return self._config[CONFIG_RAW_COMMAND].split(" ") stream_input_command = self.stream_command( self._config, self._mainstream.codec, self._mainstream.url @@ -452,10 +449,9 @@ def build_segment_command(self): def build_command(self): """Return full FFmpeg command.""" - if self._config[CONFIG_RAW_COMMAND]: - return self._config[CONFIG_RAW_COMMAND].split(" ") - if self._substream: + if self._config[CONFIG_SUBSTREAM][CONFIG_RAW_COMMAND]: + return self._config[CONFIG_SUBSTREAM][CONFIG_RAW_COMMAND].split(" ") stream_input_command = self.stream_command( self._substream.config, self._substream.codec, @@ -463,6 +459,8 @@ def build_command(self): ) camera_segment_args = [] else: + if self._config[CONFIG_RAW_COMMAND]: + return self._config[CONFIG_RAW_COMMAND].split(" ") stream_input_command = self.stream_command( self._mainstream.config, self._mainstream.codec, diff --git a/viseron/components/gstreamer/camera.py b/viseron/components/gstreamer/camera.py index b55380122..c17ed702a 100644 --- a/viseron/components/gstreamer/camera.py +++ b/viseron/components/gstreamer/camera.py @@ -34,11 +34,11 @@ DESC_RECORDER_OUTPUT_ARGS, DESC_RECORDER_VIDEO_FILTERS, ) -from viseron.domains.camera import ( +from viseron.domains.camera import AbstractCamera +from viseron.domains.camera.config import ( BASE_CONFIG_SCHEMA as BASE_CAMERA_CONFIG_SCHEMA, DEFAULT_RECORDER, RECORDER_SCHEMA as BASE_RECORDER_SCHEMA, - AbstractCamera, ) from viseron.domains.camera.const import DOMAIN from viseron.exceptions import DomainNotReady, FFprobeError, FFprobeTimeout diff --git a/viseron/components/mqtt/__init__.py b/viseron/components/mqtt/__init__.py index 195f7e701..cb44fbb3d 100644 --- a/viseron/components/mqtt/__init__.py +++ b/viseron/components/mqtt/__init__.py @@ -250,10 +250,12 @@ def on_message(self, _client, _userdata, msg) -> None: ) for callback in self._subscriptions[msg.topic]: # Run callback in thread to not block the message queue - threading.Thread( + RestartableThread( + name=f"mqtt_callback.{callback}", target=callback, args=(msg,), daemon=True, + register=False, ).start() def connect(self) -> None: diff --git a/viseron/components/nvr/const.py b/viseron/components/nvr/const.py index d477f74ee..a847ebe64 100644 --- a/viseron/components/nvr/const.py +++ b/viseron/components/nvr/const.py @@ -11,6 +11,8 @@ NO_DETECTOR: Final = "no_detector" NO_DETECTOR_FPS: Final = 1 +SCANNER_RESULT_RETRIES: Final = 5 + # Data stream topic constants DATA_PROCESSED_FRAME_TOPIC = "{camera_identifier}/nvr/processed_frame" diff --git a/viseron/components/nvr/nvr.py b/viseron/components/nvr/nvr.py index 2b4de2edd..16ac0072c 100644 --- a/viseron/components/nvr/nvr.py +++ b/viseron/components/nvr/nvr.py @@ -6,6 +6,7 @@ """ from __future__ import annotations +import datetime import logging import threading import time @@ -31,6 +32,7 @@ from viseron.domains.object_detector.detected_object import DetectedObject from viseron.events import EventData from viseron.exceptions import DomainNotRegisteredError +from viseron.helpers import utcnow from viseron.watchdog.thread_watchdog import RestartableThread from .const import ( @@ -43,6 +45,7 @@ NO_DETECTOR, NO_DETECTOR_FPS, OBJECT_DETECTOR, + SCANNER_RESULT_RETRIES, ) from .sensor import OperationStateSensor @@ -227,7 +230,8 @@ def __init__( self._trigger_type: TriggerTypes | None = None self._start_recorder = False - self._idle_frames 
= 0 + self._stop_recorder_at: datetime.datetime | None = None + self._seconds_left = 0 self._kill_received = False self._data_stream: DataStream = vis.data[DATA_STREAM_COMPONENT] self._removal_timers: list[threading.Timer] = [] @@ -399,19 +403,20 @@ def scanner_results(self) -> None: self._frame_scanner_errors = [] for name, frame_scanner in self._current_frame_scanners.items(): frame_scanner.scan_error = False - try: - # Wait for scanner to return. - # We dont care about the result since its referenced directly - # from the scanner instead of storing it locally - frame_scanner.result_queue.get( - timeout=3, - ) - except Empty: # Make sure we dont wait forever - if self._kill_received: - return - self._logger.error(f"Failed to retrieve result for {name}") - frame_scanner.scan_error = True - self._frame_scanner_errors.append(name) + retry_count = 0 + while retry_count < SCANNER_RESULT_RETRIES and not self._kill_received: + try: + # Wait for scanner to return. + # We dont care about the result since its referenced directly + # from the scanner instead of storing it locally + frame_scanner.result_queue.get(timeout=1) + break + except Empty: # Make sure we dont wait forever + retry_count += 1 + if retry_count == SCANNER_RESULT_RETRIES: + self._logger.error(f"Failed to retrieve result for {name}") + frame_scanner.scan_error = True + self._frame_scanner_errors.append(name) def event_over_check_motion( self, obj: DetectedObject, object_filters: dict[str, Filter] @@ -586,7 +591,7 @@ def start_recorder( self, shared_frame: SharedFrame, trigger_type: TriggerTypes ) -> None: """Start recorder.""" - self._idle_frames = 0 + self._stop_recorder_at = None self._camera.start_recorder( shared_frame, self._object_detector.objects_in_fov if self._object_detector else None, @@ -605,26 +610,28 @@ def stop_recorder(self, force=False) -> None: """Stop recorder.""" def _stop(): - self._idle_frames = 0 + self._stop_recorder_at = None + self._seconds_left = 0 self._camera.stop_recorder() if force: _stop() return - if self._idle_frames % self._camera.output_fps == 0: - self._logger.info( - "Stopping recording in: {}".format( - int( - self._camera.recorder.idle_timeout - - (self._idle_frames / self._camera.output_fps) - ) - ) + if not self._stop_recorder_at: + self._stop_recorder_at = utcnow() + datetime.timedelta( + seconds=self._camera.recorder.idle_timeout ) - if self._idle_frames >= ( - self._camera.output_fps * self._camera.recorder.idle_timeout - ): + if self._stop_recorder_at: + seconds_left = max( + round((self._stop_recorder_at - utcnow()).total_seconds()), 0 + ) + if seconds_left != self._seconds_left: + self._logger.info(f"Stopping recording in: {seconds_left}s") + self._seconds_left = seconds_left + + if utcnow() > self._stop_recorder_at: if ( self._motion_detector and self._object_detector @@ -656,19 +663,28 @@ def process_recorder(self, shared_frame: SharedFrame) -> None: self._logger.info("Max recording time exceeded, stopping recorder") self.stop_recorder(force=True) elif self._camera.is_recording and self.event_over(): - self._idle_frames += 1 self.stop_recorder() else: - self._idle_frames = 0 + self._stop_recorder_at = None - def remove_frame(self, shared_frame) -> None: + def remove_frame(self, shared_frame: SharedFrame) -> None: """Remove frame after a delay. This makes sure all frames are cleaned up eventually. 
""" + + def _remove(): + self._camera.shared_frames.remove(shared_frame, self._camera) + self._removal_timers.remove(timer) + timer = threading.Timer( - 2, self._camera.shared_frames.remove, args=(shared_frame, self._camera) + 2, + _remove, + args=(), ) + timer.name = f"{str(self)}.remove_frame.{shared_frame.name}" + timer.daemon = True + self._removal_timers.append(timer) timer.start() def run(self) -> None: diff --git a/viseron/components/storage/__init__.py b/viseron/components/storage/__init__.py index ed721a5d0..e963d77b7 100644 --- a/viseron/components/storage/__init__.py +++ b/viseron/components/storage/__init__.py @@ -6,7 +6,7 @@ import os import pathlib from collections.abc import Callable -from typing import TYPE_CHECKING, Any, Literal, TypedDict +from typing import TYPE_CHECKING, Any, TypedDict import voluptuous as vol from alembic import command, script @@ -34,6 +34,7 @@ DEFAULT_COMPONENT, DESC_COMPONENT, ) +from viseron.components.storage.jobs import CleanupManager from viseron.components.storage.models import Base, Motion, Recordings from viseron.components.storage.tier_handler import ( RecordingsTierHandler, @@ -52,6 +53,7 @@ from viseron.domains.camera.const import CONFIG_STORAGE, DOMAIN as CAMERA_DOMAIN from viseron.helpers import utcnow from viseron.helpers.logs import StreamToLogger +from viseron.types import SnapshotDomain if TYPE_CHECKING: from viseron import Event, Viseron @@ -179,6 +181,9 @@ def __init__(self, vis: Viseron, config: dict[str, Any]) -> None: self.engine: Engine | None = None self._get_session: Callable[[], Session] | None = None + self._cleanup_manager = CleanupManager(vis, self) + self._cleanup_manager.start() + @property def camera_tier_handlers(self): """Return camera tier handlers.""" @@ -250,12 +255,22 @@ def create_database(self) -> None: self._run_migrations() self._get_session = scoped_session(sessionmaker(bind=self.engine, future=True)) + self._get_session_expire = scoped_session( + sessionmaker(bind=self.engine, future=True, expire_on_commit=True) + ) startup_chores(self._get_session) - def get_session(self) -> Session: - """Get a new sqlalchemy session.""" - if self._get_session is None: + def get_session(self, expire_on_commit: bool = False) -> Session: + """Get a new sqlalchemy session. + + Args: + expire_on_commit: Whether to expire objects when committing. 
+ """ + if self._get_session is None or self._get_session_expire is None: raise RuntimeError("The database connection has not been established") + + if expire_on_commit: + return self._get_session_expire() return self._get_session() def get_recordings_path(self, camera: AbstractCamera) -> str: @@ -298,15 +313,14 @@ def get_thumbnails_path(self, camera: AbstractCamera) -> str: def get_snapshots_path( self, camera: AbstractCamera, - domain: Literal["object_detector"] - | Literal["face_recognition"] - | Literal["license_plate_recognition"] - | Literal["motion_detector"], + domain: SnapshotDomain, ) -> str: """Get snapshots path for camera.""" self.create_tier_handlers(camera) return get_snapshots_path( - self._camera_tier_handlers[camera.identifier]["snapshots"][0][domain].tier, + self._camera_tier_handlers[camera.identifier]["snapshots"][0][ + domain.value + ].tier, camera, domain, ) diff --git a/viseron/components/storage/alembic/versions/19e8b884f943_add_indexes_for_cleanup_jobs.py b/viseron/components/storage/alembic/versions/19e8b884f943_add_indexes_for_cleanup_jobs.py new file mode 100644 index 000000000..86c25ff7b --- /dev/null +++ b/viseron/components/storage/alembic/versions/19e8b884f943_add_indexes_for_cleanup_jobs.py @@ -0,0 +1,56 @@ +# pylint: disable=invalid-name +"""Add indexes for cleanup jobs. + +Revision ID: 19e8b884f943 +Revises: 31851b6eb50c +Create Date: 2024-11-12 11:04:52.728159 + +""" +from __future__ import annotations + +from alembic import op + +# revision identifiers, used by Alembic. +revision: str | None = "19e8b884f943" +down_revision: str | None = "31851b6eb50c" +branch_labels: str | None = None +depends_on: str | None = None + + +def upgrade() -> None: + """Run the upgrade migrations.""" + op.create_index("idx_files_camera_id", "files", ["camera_identifier"], unique=False) + op.create_index("idx_files_path", "files", ["path"], unique=False) + op.create_index( + "idx_files_meta_orig_ctime", "files_meta", ["orig_ctime"], unique=False + ) + op.create_index("idx_files_meta_path", "files_meta", ["path"], unique=False) + op.create_index("idx_motion_snapshot", "motion", ["snapshot_path"], unique=False) + op.create_index("idx_objects_snapshot", "objects", ["snapshot_path"], unique=False) + op.create_index( + "idx_ppr_snapshot", "post_processor_results", ["snapshot_path"], unique=False + ) + op.create_index( + "idx_recordings_camera_times", + "recordings", + ["camera_identifier", "start_time", "end_time"], + unique=False, + ) + op.create_index("idx_recordings_clip", "recordings", ["clip_path"], unique=False) + op.create_index( + "idx_recordings_thumbnail", "recordings", ["thumbnail_path"], unique=False + ) + + +def downgrade() -> None: + """Run the downgrade migrations.""" + op.drop_index("idx_recordings_thumbnail", table_name="recordings") + op.drop_index("idx_recordings_clip", table_name="recordings") + op.drop_index("idx_recordings_camera_times", table_name="recordings") + op.drop_index("idx_ppr_snapshot", table_name="post_processor_results") + op.drop_index("idx_objects_snapshot", table_name="objects") + op.drop_index("idx_motion_snapshot", table_name="motion") + op.drop_index("idx_files_meta_path", table_name="files_meta") + op.drop_index("idx_files_meta_orig_ctime", table_name="files_meta") + op.drop_index("idx_files_path", table_name="files") + op.drop_index("idx_files_camera_id", table_name="files") diff --git a/viseron/components/storage/jobs.py b/viseron/components/storage/jobs.py new file mode 100644 index 000000000..fd44da5f3 --- /dev/null +++ 
b/viseron/components/storage/jobs.py @@ -0,0 +1,785 @@ +"""Cleanup jobs for removing orphaned files and database records.""" +from __future__ import annotations + +import datetime +import logging +import multiprocessing as mp +import os +import time +from abc import ABC, abstractmethod +from typing import TYPE_CHECKING + +import setproctitle +from apscheduler.triggers.cron import CronTrigger +from apscheduler.triggers.interval import IntervalTrigger +from sqlalchemy import and_, delete, exists, func, select + +from viseron.components.storage.models import ( + Events, + Files, + FilesMeta, + Motion, + Objects, + PostProcessorResults, + Recordings, +) +from viseron.const import VISERON_SIGNAL_SHUTDOWN +from viseron.domains.camera.const import DOMAIN as CAMERA_DOMAIN +from viseron.exceptions import DomainNotRegisteredError +from viseron.helpers import utcnow +from viseron.types import SnapshotDomain +from viseron.watchdog.process_watchdog import RestartableProcess + +if TYPE_CHECKING: + from viseron import Viseron + from viseron.components.storage import Storage + from viseron.domains.camera import AbstractCamera + +LOGGER = logging.getLogger(__name__) + +BATCH_SIZE = 100 + + +class BaseCleanupJob(ABC): + """Base class for cleanup jobs.""" + + def __init__( + self, vis: Viseron, storage: Storage, interval_trigger: IntervalTrigger + ) -> None: + self._vis = vis + self._storage = storage + self._interval_trigger = interval_trigger + + self._last_log_time = 0.0 + self.kill_event = mp.Event() + + def _get_cameras(self) -> dict[str, AbstractCamera] | None: + """Get list of registered camera identifiers.""" + try: + return self._vis.get_registered_identifiers(CAMERA_DOMAIN) + except DomainNotRegisteredError: + return None + + @property + @abstractmethod + def name(self) -> str: + """Name of the cleanup job.""" + + @property + def interval_trigger(self) -> IntervalTrigger: + """Return the trigger interval for the cleanup job.""" + return self._interval_trigger + + @abstractmethod + def _run(self) -> None: + """Run the cleanup job.""" + + def _wrapped_run(self): + setproctitle.setproctitle(f"viseron_{self.name}") + if self._storage.engine is None: + LOGGER.debug("Engine has not been created, skipping cleanup job") + return + + self._storage.engine.dispose(close=False) + self._run() + + def run(self) -> None: + """Run the cleanup job using multiprocessing.""" + process = RestartableProcess( + name=self.name, target=self._wrapped_run, daemon=True, register=False + ) + process.start() + process.join() + + def log_progress(self, message: str): + """Log progress of the cleanup job. + + Throttled to only run every 5s so that it doesn't spam the logs. + """ + now = time.time() + if now - self._last_log_time > 5: + LOGGER.debug(message) + self._last_log_time = now + + +class BaseTableCleanupJob(BaseCleanupJob): + """Base class for database table cleanup jobs that use batch processing.""" + + def batch_delete_orphaned(self, session, table, path_column): + """Delete orphaned records in batches using cursor-based pagination. 
+
+        Args:
+            session: Database session
+            table: SQLAlchemy table to clean up
+            path_column: Column containing the file path to check against Files table
+        """
+        total_deleted = 0
+        last_id = 0
+        now = time.time()
+
+        # Calculate cutoff time for 5 minutes ago
+        cutoff_time = utcnow() - datetime.timedelta(minutes=5)
+
+        while True:
+            if self.kill_event.is_set():
+                break
+
+            # Get next batch of records that need checking
+            batch = session.execute(
+                select(table.id, path_column)
+                .where(
+                    and_(
+                        table.id > last_id,
+                        ~exists().where(Files.path == path_column),
+                        table.created_at
+                        < cutoff_time,  # Only consider records older than 5 minutes
+                    )
+                )
+                .order_by(table.id)
+                .limit(BATCH_SIZE)
+            ).all()
+
+            if not batch:
+                break
+
+            # Update cursor
+            last_id = batch[-1][0]
+
+            # Get IDs to delete
+            batch_ids = [row[0] for row in batch]
+
+            # Delete the batch
+            result = session.execute(delete(table).where(table.id.in_(batch_ids)))
+            session.commit()
+            total_deleted += result.rowcount
+            self.log_progress(f"{self.name} deleted {result.rowcount} records in batch")
+            time.sleep(1)
+
+        LOGGER.debug(
+            "%s deleted %d total records, took %s",
+            self.name,
+            total_deleted,
+            time.time() - now,
+        )
+
+
+class OrphanedFilesCleanup(BaseCleanupJob):
+    """Cleanup job that removes files with no corresponding database records.
+
+    Walks through recordings, segments and snapshots directories to find and delete
+    any files that don't have a matching record in the Files table.
+    """
+
+    @property
+    def name(self) -> str:
+        """Return job name."""
+        return "cleanup_orphaned_files"
+
+    def _run(self) -> None:
+        """Run the job."""
+        now = time.time()
+        LOGGER.debug("Running %s", self.name)
+        deleted_count = 0
+        cameras = self._get_cameras()
+        if not cameras:
+            return
+
+        # Collect the storage paths for each camera once; snapshot paths are added
+        # per camera and domain (not repeated for every camera in the loop)
+        paths = []
+        for camera in cameras.values():
+            paths += [
+                self._storage.get_recordings_path(camera),
+                self._storage.get_segments_path(camera),
+                self._storage.get_thumbnails_path(camera),
+            ] + [
+                self._storage.get_snapshots_path(camera, domain)
+                for domain in SnapshotDomain
+            ]
+
+        total_files_processed = 0
+        with self._storage.get_session() as session:
+            for path in paths:
+                if self.kill_event.is_set():
+                    break
+                LOGGER.debug("%s checking %s", self.name, path)
+                for root, _, files in os.walk(path):
+                    if self.kill_event.is_set():
+                        break
+
+                    files_processed = 0
+                    for file in files:
+                        if self.kill_event.is_set():
+                            break
+
+                        total_files_processed += 1
+                        files_processed += 1
+                        if file in self._storage.ignored_files:
+                            continue
+                        file_path = os.path.join(root, file)
+                        file_exists = session.execute(
+                            select(Files).where(Files.path == file_path)
+                        ).first()
+                        if not file_exists and os.path.exists(file_path):
+                            os.remove(file_path)
+                            LOGGER.debug("%s deleted %s", self.name, file_path)
+                            deleted_count += 1
+                        self.log_progress(
+                            f"{self.name} processed {files_processed}/{len(files)} "
+                            f"files in {root}",
+                        )
+                        if total_files_processed % 100 == 0:
+                            time.sleep(1)
+                    LOGGER.debug(
+                        f"{self.name} processed {files_processed}/{len(files)} "
+                        f"files in {root}",
+                    )
+
+        LOGGER.debug(
+            "%s deleted %d/%d processed files, took %s",
+            self.name,
+            deleted_count,
+            total_files_processed,
+            time.time() - now,
+        )
+
+
+class OrphanedDatabaseFilesCleanup(BaseCleanupJob):
+    """Cleanup job that removes rows from Files with no corresponding files on disk."""
+
+    @property
+    def name(self) -> str:
+        """Return job name."""
+        return "cleanup_orphaned_db_files"
+
+    def _run(self) -> None:
+        """Run the job."""
+        now = time.time()
+
LOGGER.debug("Running %s", self.name) + total_deleted = 0 + last_id = 0 + total_files_processed = 0 + + with self._storage.get_session() as session: + count = session.execute( + select( + func.count(), # pylint: disable=not-callable + ).select_from(Files) + ).scalar() + + while True: + if self.kill_event.is_set(): + break + + # Get next batch of files to check + time.sleep(1) + files = session.execute( + select(Files.id, Files.path) + .where(Files.id > last_id) + .order_by(Files.id) + .limit(BATCH_SIZE) + ).all() + + if not files: + break + + # Update cursor + last_id = files[-1][0] + + # Find records where files don't exist + to_delete = [ + file_id + for file_id, file_path in files + if not os.path.exists(file_path) + ] + + if to_delete: + result = session.execute( + delete(Files).where(Files.id.in_(to_delete)) + ) + session.commit() + total_deleted += result.rowcount + LOGGER.debug( + "%s deleted %d rows in batch", self.name, result.rowcount + ) + total_files_processed += len(files) + self.log_progress( + f"{self.name} processed {total_files_processed}/{count} files" + ) + + LOGGER.debug( + "%s deleted %d total database records for non-existent files, took %s", + self.name, + total_deleted, + time.time() - now, + ) + + +class EmptyFoldersCleanup(BaseCleanupJob): + """Cleanup job that removes empty directories from the storage locations. + + Walks through all storage paths (recordings, segments, thumbnails, snapshots) + and removes any empty directories encountered. Uses a bottom-up traversal to + ensure nested empty directories are handled properly. + """ + + @property + def name(self) -> str: + """Return job name.""" + return "cleanup_empty_folders" + + def _run(self) -> None: + """Run the job.""" + now = time.time() + LOGGER.debug("Running %s", self.name) + deleted_count = 0 + processed_count = 0 + cameras = self._get_cameras() + if not cameras: + return + + for camera in cameras.values(): + for path in [ + self._storage.get_recordings_path(camera), + self._storage.get_segments_path(camera), + self._storage.get_thumbnails_path(camera), + ] + [ + self._storage.get_snapshots_path(camera, domain) + for domain in SnapshotDomain + ]: + time.sleep(1) + for root, dirs, files in os.walk(path, topdown=False): + processed_count += 1 + self.log_progress( + f"{self.name} processed {processed_count} folders" + ) + if root == path: + continue + if not dirs and not files: + LOGGER.debug("Deleting folder %s", root) + os.rmdir(root) + deleted_count += 1 + + LOGGER.debug( + "%s deleted %d empty folders, took %s", + self.name, + deleted_count, + time.time() - now, + ) + + +class OrphanedThumbnailsCleanup(BaseCleanupJob): + """Cleanup job that removes thumbnail files with no database records.""" + + @property + def name(self) -> str: + """Return job name.""" + return "cleanup_orphaned_thumbnails" + + def _run(self) -> None: + """Run the job.""" + now = time.time() + LOGGER.debug("Running %s", self.name) + deleted_count = 0 + total_files_processed = 0 + + cameras = self._get_cameras() + if not cameras: + return + + with self._storage.get_session() as session: + for camera in cameras.values(): + files_processed = 0 + thumbnails_path = self._storage.get_thumbnails_path(camera) + if not os.path.exists(thumbnails_path): + continue + + files_to_check: list[str] = [] + for root, _, files in os.walk(thumbnails_path): + files_to_check.extend( + os.path.join(root, f) + for f in files + if f not in self._storage.ignored_files + ) + + # Process files in batches + for i in range(0, len(files_to_check), BATCH_SIZE): + 
if self.kill_event.is_set(): + break + + batch = files_to_check[i : i + BATCH_SIZE] + existing_thumbnails = { + row[0] + for row in session.execute( + select(Recordings.thumbnail_path).where( + Recordings.thumbnail_path.in_(batch) + ) + ).all() + } + + # Delete files that don't exist in database + for file_path in batch: + if file_path not in existing_thumbnails and os.path.exists( + file_path + ): + os.remove(file_path) + deleted_count += 1 + total_files_processed += 1 + files_processed += 1 + + self.log_progress( + f"{self.name} processed " + f"{files_processed}/{len(files_to_check)} " + f"files in {thumbnails_path}" + ) + time.sleep(1) + LOGGER.debug( + f"{self.name} processed " + f"{files_processed}/{len(files_to_check)} " + f"files in {thumbnails_path}" + ) + + LOGGER.debug( + "%s deleted %d/%d orphaned thumbnails, took %s", + self.name, + deleted_count, + total_files_processed, + time.time() - now, + ) + + +class OrphanedClipsCleanup(BaseCleanupJob): + """Cleanup job that removes clip files with no corresponding database records.""" + + @property + def name(self) -> str: + """Return job name.""" + return "cleanup_orphaned_clips" + + def _run(self) -> None: + """Run the job.""" + now = time.time() + LOGGER.debug("Running %s", self.name) + deleted_count = 0 + total_files_processed = 0 + + cameras = self._get_cameras() + if not cameras: + return + + with self._storage.get_session() as session: + for camera in cameras.values(): + files_processed = 0 + recordings_path = self._storage.get_recordings_path(camera) + if not os.path.exists(recordings_path): + continue + + # Collect all files first + files_to_check: list[str] = [] + for root, _, files in os.walk(recordings_path): + files_to_check.extend( + os.path.join(root, f) + for f in files + if f not in self._storage.ignored_files + ) + + # Process files in batches + for i in range(0, len(files_to_check), BATCH_SIZE): + if self.kill_event.is_set(): + break + + batch = files_to_check[i : i + BATCH_SIZE] + existing_clips = { + row[0] + for row in session.execute( + select(Recordings.clip_path).where( + Recordings.clip_path.in_(batch) + ) + ).all() + } + + # Delete files that don't exist in database + for file_path in batch: + if file_path not in existing_clips and os.path.exists( + file_path + ): + os.remove(file_path) + deleted_count += 1 + total_files_processed += 1 + files_processed += 1 + + self.log_progress( + f"{self.name} processed " + f"{files_processed}/{len(files_to_check)} " + f"files in {recordings_path}" + ) + time.sleep(1) + LOGGER.debug( + f"{self.name} processed " + f"{files_processed}/{len(files_to_check)} " + f"files in {recordings_path}" + ) + + LOGGER.debug( + "%s deleted %d/%d orphaned clips, took %s", + self.name, + deleted_count, + total_files_processed, + time.time() - now, + ) + + +class OrphanedRecordingsCleanup(BaseCleanupJob): + """Cleanup job that removes orphaned recording entries from the database.""" + + @property + def name(self) -> str: + """Return job name.""" + return "cleanup_orphaned_recordings" + + def _run(self) -> None: + """Run the job.""" + now = time.time() + LOGGER.debug("Running %s", self.name) + total_deleted = 0 + total_processed = 0 + + with self._storage.get_session() as session: + count = session.execute( + select(func.count()) # pylint: disable=not-callable + .select_from(Recordings) + .where(Recordings.end_time.is_not(None)) + ).scalar() + + if not count: + return + + last_id = 0 + while True: + if self.kill_event.is_set(): + break + + # Get next batch of recordings + batch = 
session.execute( + select(Recordings) + .where( + and_(Recordings.id > last_id, Recordings.end_time.is_not(None)) + ) + .order_by(Recordings.id) + .limit(BATCH_SIZE) + ).all() + + if not batch: + break + + # Update cursor + last_id = batch[-1][0].id + + # Find orphaned recordings + to_delete = [] + for recording in batch: + if self.kill_event.is_set(): + break + has_segments = session.execute( + select(1) + .select_from(Files) + .join(FilesMeta, Files.path == FilesMeta.path) + .where( + and_( + Files.camera_identifier + == recording[0].camera_identifier, + FilesMeta.orig_ctime.between( + recording[0].start_time, recording[0].end_time + ), + ) + ) + .limit(1) + ).first() + time.sleep(0.1) + + if not has_segments: + to_delete.append(recording[0].id) + + if to_delete: + result = session.execute( + delete(Recordings).where(Recordings.id.in_(to_delete)) + ) + session.commit() + total_deleted += result.rowcount + + total_processed += len(batch) + self.log_progress( + f"{self.name} processed {total_processed}/{count} recordings" + ) + time.sleep(1) + + LOGGER.debug( + "%s deleted %d/%d orphaned recordings, took %s", + self.name, + total_deleted, + total_processed, + time.time() - now, + ) + + +class OrphanedPostProcessorResultsCleanup(BaseTableCleanupJob): + """Cleanup job that removes orphaned post-processor results from the database. + + Deletes records from the PostProcessorResults table where the referenced + snapshot file no longer exists in the Files table, ensuring that results + without associated files are removed. + """ + + @property + def name(self) -> str: + """Return job name.""" + return "cleanup_orphaned_postprocessor_results" + + def _run(self) -> None: + """Run the job.""" + LOGGER.debug("Running %s", self.name) + with self._storage.get_session() as session: + self.batch_delete_orphaned( + session, PostProcessorResults, PostProcessorResults.snapshot_path + ) + + +class OrphanedObjectsCleanup(BaseTableCleanupJob): + """Cleanup job that removes orphaned object detection records from the database. + + Deletes records from the Objects table where the referenced snapshot file + no longer exists in the Files table, ensuring that object detections + without associated files are removed. + """ + + @property + def name(self) -> str: + """Return job name.""" + return "cleanup_orphaned_objects" + + def _run(self) -> None: + """Run the job.""" + LOGGER.debug("Running %s", self.name) + with self._storage.get_session() as session: + self.batch_delete_orphaned(session, Objects, Objects.snapshot_path) + + +class OrphanedMotionCleanup(BaseTableCleanupJob): + """Cleanup job that removes orphaned motion detection records from the database. + + Deletes records from the Motion table where the referenced snapshot file + no longer exists in the Files table, ensuring that motion detections + without associated files are removed. + """ + + @property + def name(self) -> str: + """Return job name.""" + return "cleanup_orphaned_motion" + + def _run(self) -> None: + """Run the job.""" + LOGGER.debug("Running %s", self.name) + with self._storage.get_session() as session: + self.batch_delete_orphaned(session, Motion, Motion.snapshot_path) + + +class OrphanedFilesMetaCleanup(BaseTableCleanupJob): + """Cleanup job that removes orphaned file meta records from the database. + + Deletes records from the FilesMeta table where the referenced path + no longer exists in the Files table. 
+ """ + + @property + def name(self) -> str: + """Return job name.""" + return "cleanup_orphaned_files_meta" + + def _run(self) -> None: + """Run the job.""" + LOGGER.debug("Running %s", self.name) + with self._storage.get_session() as session: + self.batch_delete_orphaned(session, FilesMeta, FilesMeta.path) + + +class OldEventsCleanup(BaseCleanupJob): + """Cleanup job that removes old events from the database. + + Deletes records from the Events table that are older than a specified + number of days. + """ + + @property + def name(self) -> str: + """Return job name.""" + return "cleanup_old_events" + + def _run(self) -> None: + """Run the job.""" + now = time.time() + LOGGER.debug("Running %s", self.name) + with self._storage.get_session() as session: + stmt = delete(Events).where( + Events.created_at < utcnow() - datetime.timedelta(days=7) + ) + result = session.execute(stmt) + session.commit() + LOGGER.debug( + "%s deleted %d old events, took %s", + self.name, + result.rowcount, + time.time() - now, + ) + + +class CleanupManager: + """Manager class that handles scheduling and running of cleanup jobs. + + Initializes all cleanup jobs and manages their execution through a background + scheduler. Provides functionality to start and stop the cleanup process for + both filesystem and database maintenance tasks. + """ + + def __init__(self, vis: Viseron, storage: Storage): + self._vis = vis + self.jobs: list[BaseCleanupJob] = [ + OrphanedFilesCleanup( + vis, storage, CronTrigger(day_of_week="mon", hour=3, minute=0) + ), + OrphanedDatabaseFilesCleanup( + vis, storage, CronTrigger(day_of_week="wed", hour=3, minute=0) + ), + OrphanedFilesMetaCleanup( + vis, storage, CronTrigger(day_of_week="fri", hour=3, minute=0) + ), + EmptyFoldersCleanup(vis, storage, CronTrigger(hour=0, jitter=3600)), + OrphanedThumbnailsCleanup(vis, storage, CronTrigger(hour=0, jitter=3600)), + OrphanedClipsCleanup(vis, storage, CronTrigger(hour=0, jitter=3600)), + OrphanedRecordingsCleanup(vis, storage, CronTrigger(hour=0, jitter=3600)), + OrphanedPostProcessorResultsCleanup( + vis, storage, CronTrigger(hour=0, jitter=3600) + ), + OrphanedObjectsCleanup(vis, storage, CronTrigger(hour=0, jitter=3600)), + OrphanedMotionCleanup(vis, storage, CronTrigger(hour=0, jitter=3600)), + OldEventsCleanup(vis, storage, CronTrigger(hour=0, jitter=3600)), + ] + vis.register_signal_handler(VISERON_SIGNAL_SHUTDOWN, self.stop) + + def start(self): + """Start the cleanup scheduler.""" + for job in self.jobs: + self._vis.background_scheduler.add_job( + job.run, + trigger=job.interval_trigger, + name=job.name, + id=job.name, + max_instances=1, + coalesce=True, + ) + + def stop(self): + """Stop the cleanup scheduler.""" + LOGGER.debug("Stopping cleanup jobs") + for job in self.jobs: + LOGGER.debug("Sending kill event to %s", job.name) + job.kill_event.set() diff --git a/viseron/components/storage/models.py b/viseron/components/storage/models.py index 3eb0e95d2..0a663ed35 100644 --- a/viseron/components/storage/models.py +++ b/viseron/components/storage/models.py @@ -10,6 +10,7 @@ ColumnElement, DateTime, Float, + Index, Integer, Label, LargeBinary, @@ -103,6 +104,11 @@ class Files(Base): __tablename__ = "files" + __table_args__ = ( + Index("idx_files_path", "path"), + Index("idx_files_camera_id", "camera_identifier"), + ) + id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True) tier_id: Mapped[int] = mapped_column(Integer) tier_path: Mapped[str] = mapped_column(String) @@ -129,6 +135,11 @@ class FilesMeta(Base): 
__tablename__ = "files_meta" + __table_args__ = ( + Index("idx_files_meta_path", "path"), + Index("idx_files_meta_orig_ctime", "orig_ctime"), + ) + id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True) path: Mapped[str] = mapped_column(String, unique=True) orig_ctime = mapped_column(UTCDateTime(timezone=False), nullable=False) @@ -153,6 +164,14 @@ class Recordings(Base): __tablename__ = "recordings" + __table_args__ = ( + Index( + "idx_recordings_camera_times", "camera_identifier", "start_time", "end_time" + ), + Index("idx_recordings_thumbnail", "thumbnail_path"), + Index("idx_recordings_clip", "clip_path"), + ) + id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True) camera_identifier: Mapped[str] = mapped_column(String) start_time: Mapped[datetime.datetime] = mapped_column(UTCDateTime(timezone=False)) @@ -192,6 +211,8 @@ class Objects(Base): __tablename__ = "objects" + __table_args__ = (Index("idx_objects_snapshot", "snapshot_path"),) + id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True) camera_identifier: Mapped[str] = mapped_column(String) label: Mapped[str] = mapped_column(String) @@ -217,6 +238,8 @@ class Motion(Base): __tablename__ = "motion" + __table_args__ = (Index("idx_motion_snapshot", "snapshot_path"),) + id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True) camera_identifier: Mapped[str] = mapped_column(String) start_time: Mapped[datetime.datetime] = mapped_column(UTCDateTime(timezone=False)) @@ -253,6 +276,8 @@ class PostProcessorResults(Base): __tablename__ = "post_processor_results" + __table_args__ = (Index("idx_ppr_snapshot", "snapshot_path"),) + id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True) camera_identifier: Mapped[str] = mapped_column(String) domain: Mapped[str] = mapped_column(String) diff --git a/viseron/components/storage/tier_handler.py b/viseron/components/storage/tier_handler.py index bb35fdc72..541dd866d 100644 --- a/viseron/components/storage/tier_handler.py +++ b/viseron/components/storage/tier_handler.py @@ -92,7 +92,7 @@ def __init__( next_tier: dict[str, Any] | None, ) -> None: self._logger = logging.getLogger( - f"{__name__}.{camera.identifier}.tier_{tier_id}" + f"{__name__}.{camera.identifier}.tier_{tier_id}.{category}.{subcategory}" ) super().__init__() @@ -191,9 +191,9 @@ def check_tier(self) -> None: time_since_last_call = now - self._time_of_last_call if time_since_last_call < self._throttle_period: return + self._time_of_last_call = now self._check_tier(self._storage.get_session) - self._time_of_last_call = now def _check_tier(self, get_session: Callable[[], Session]) -> None: file_ids = None @@ -320,6 +320,8 @@ def _on_deleted(self, event: FileDeletedEvent) -> None: with self._storage.get_session() as session: stmt = delete(Files).where(Files.path == event.src_path) session.execute(stmt) + stmt = delete(FilesMeta).where(FilesMeta.path == event.src_path) + session.execute(stmt) session.commit() self._vis.dispatch_event( @@ -347,6 +349,7 @@ def _shutdown(self) -> None: self._storage, self._storage.get_session, self._category, + self._subcategory, self._tier_id, self._camera.identifier, self._tier, @@ -665,16 +668,11 @@ def initialize(self): self._camera.identifier, ) self.add_file_handler(self._path, rf"{self._path}/(.*.jpg$)") + self._storage.ignore_file("latest_thumbnail.jpg") def check_tier(self) -> None: """Do nothing, as we don't want to move thumbnails.""" - def on_any_event(self, event: FileSystemEvent) -> 
None: - """Ignore changes to latest_thumbnail.jpg.""" - if os.path.basename(event.src_path) == "latest_thumbnail.jpg": - return - return super().on_any_event(event) - def _on_created(self, event: FileCreatedEvent) -> None: try: with self._storage.get_session() as session: @@ -865,6 +863,8 @@ def handle_file( with get_session() as session: stmt = delete(Files).where(Files.path == path) session.execute(stmt) + stmt = delete(FilesMeta).where(FilesMeta.path == path) + session.execute(stmt) session.commit() @@ -890,18 +890,31 @@ def move_file( ) session.execute(ins) session.commit() - except IntegrityError: - logger.error(f"Failed to insert metadata for {dst}", exc_info=True) + except IntegrityError as error: + logger.debug(f"Failed to insert metadata for {dst}: {error}") + except NoResultFound as error: + logger.debug(f"Failed to find metadata for {src}: {error}") + with get_session() as session: + stmt = delete(Files).where(Files.path == src) + session.execute(stmt) + session.commit() + try: + os.remove(src) + except FileNotFoundError as _error: + logger.debug(f"Failed to delete file {src}: {_error}") + return try: os.makedirs(os.path.dirname(dst), exist_ok=True) shutil.copy(src, dst) os.remove(src) except FileNotFoundError as error: - logger.error(f"Failed to move file {src} to {dst}: {error}") + logger.debug(f"Failed to move file {src} to {dst}: {error}") with get_session() as session: stmt = delete(Files).where(Files.path == src) session.execute(stmt) + stmt = delete(FilesMeta).where(FilesMeta.path == src) + session.execute(stmt) session.commit() @@ -915,12 +928,14 @@ def delete_file( with get_session() as session: stmt = delete(Files).where(Files.path == path) session.execute(stmt) + stmt = delete(FilesMeta).where(FilesMeta.path == path) + session.execute(stmt) session.commit() try: os.remove(path) except FileNotFoundError as error: - logger.error(f"Failed to delete file {path}: {error}") + logger.debug(f"Failed to delete file {path}: {error}") def get_files_to_move( @@ -1005,6 +1020,7 @@ def force_move_files( storage: Storage, get_session: Callable[..., Session], category: str, + subcategory: str, tier_id: int, camera_identifier: str, curr_tier: dict[str, Any], @@ -1012,14 +1028,15 @@ def force_move_files( logger: logging.Logger, ) -> None: """Get and move/delete all files in tier.""" - with get_session() as session: + with get_session(expire_on_commit=False) as session: stmt = ( - select(Files) - .where(Files.category == category) - .where(Files.tier_id == tier_id) + select(Files.path, Files.tier_path) .where(Files.camera_identifier == camera_identifier) + .where(Files.tier_id == tier_id) + .where(Files.category == category) + .where(Files.subcategory == subcategory) ) - result = session.execute(stmt) + result = session.execute(stmt).all() for file in result: handle_file( get_session, @@ -1027,8 +1044,8 @@ def force_move_files( camera_identifier, curr_tier, next_tier, - file.path, - file.tier_path, + file[0], + file[1], logger, ) session.commit() diff --git a/viseron/components/storage/triggers.py b/viseron/components/storage/triggers.py index 08180fe77..06ac9d977 100644 --- a/viseron/components/storage/triggers.py +++ b/viseron/components/storage/triggers.py @@ -2,7 +2,7 @@ import logging -from sqlalchemy import Connection, delete, event +from sqlalchemy import Connection, event from sqlalchemy.dialects.postgresql import insert from viseron.components.storage.models import Files, FilesMeta @@ -32,22 +32,6 @@ def insert_into_files_meta( ) -def delete_from_files_meta( - conn: Connection, 
- clauseelement, - _multiparams, - _params, - _execution_options, -) -> None: - """Delete a row from FilesMeta when a row is deleted from Files.""" - if clauseelement.is_delete and clauseelement.table.name == Files.__tablename__: - compiled = clauseelement.compile() - conn.execute( - delete(FilesMeta).where(FilesMeta.path == compiled.params["path_1"]) - ) - - def setup_triggers(engine) -> None: """Set up database triggers.""" event.listen(engine, "before_execute", insert_into_files_meta) - event.listen(engine, "after_execute", delete_from_files_meta) diff --git a/viseron/components/storage/util.py b/viseron/components/storage/util.py index 7ed3b5978..d09ecf527 100644 --- a/viseron/components/storage/util.py +++ b/viseron/components/storage/util.py @@ -17,6 +17,7 @@ CONFIG_PATH, ) from viseron.events import EventData +from viseron.types import SnapshotDomain if TYPE_CHECKING: from viseron.domains.camera import AbstractCamera, FailedCamera @@ -71,10 +72,10 @@ def get_thumbnails_path( def get_snapshots_path( tier: dict[str, Any], camera: AbstractCamera | FailedCamera, - domain: str, + domain: SnapshotDomain, ) -> str: """Get snapshots path for camera.""" - return os.path.join(tier[CONFIG_PATH], "snapshots", domain, camera.identifier) + return os.path.join(tier[CONFIG_PATH], "snapshots", domain.value, camera.identifier) def files_to_move_overlap(events_file_ids, continuous_file_ids): diff --git a/viseron/components/webserver/request_handler.py b/viseron/components/webserver/request_handler.py index 1e637d953..cf14411bc 100644 --- a/viseron/components/webserver/request_handler.py +++ b/viseron/components/webserver/request_handler.py @@ -3,7 +3,6 @@ import hmac import logging -import time from collections.abc import Callable from datetime import timedelta from http import HTTPStatus @@ -18,7 +17,7 @@ from viseron.const import DOMAIN_FAILED from viseron.domains.camera.const import DOMAIN as CAMERA_DOMAIN from viseron.exceptions import DomainNotRegisteredError -from viseron.helpers import utcnow +from viseron.helpers import get_utc_offset, utcnow if TYPE_CHECKING: from viseron import Viseron @@ -93,7 +92,7 @@ def utc_offset(self) -> timedelta: return timedelta(minutes=int(header)) if cookie := self.get_cookie("X-Client-UTC-Offset", None): return timedelta(minutes=int(cookie)) - return timedelta(seconds=time.localtime().tm_gmtoff) + return get_utc_offset() def on_finish(self) -> None: """Log requests with failed authentication.""" diff --git a/viseron/components/webserver/stream_handler.py b/viseron/components/webserver/stream_handler.py index f5a69b27e..1042d13d3 100644 --- a/viseron/components/webserver/stream_handler.py +++ b/viseron/components/webserver/stream_handler.py @@ -15,7 +15,7 @@ from viseron.components.nvr.const import DATA_PROCESSED_FRAME_TOPIC from viseron.components.nvr.nvr import NVR, DataProcessedFrame from viseron.const import TOPIC_STATIC_MJPEG_STREAMS -from viseron.domains.camera import MJPEG_STREAM_SCHEMA +from viseron.domains.camera.config import MJPEG_STREAM_SCHEMA from viseron.domains.motion_detector import AbstractMotionDetectorScanner from viseron.helpers import ( draw_contours, diff --git a/viseron/domains/camera/__init__.py b/viseron/domains/camera/__init__.py index 357ee0077..6aeffe518 100644 --- a/viseron/domains/camera/__init__.py +++ b/viseron/domains/camera/__init__.py @@ -9,31 +9,20 @@ from dataclasses import dataclass from functools import lru_cache from threading import Event, Timer -from typing import TYPE_CHECKING, Any, Literal +from typing import 
TYPE_CHECKING, Any from uuid import uuid4 import cv2 import imutils -import voluptuous as vol from sqlalchemy import or_, select +from typing_extensions import assert_never from viseron.components import DomainToSetup from viseron.components.data_stream import ( COMPONENT as DATA_STREAM_COMPONENT, DataStream, ) -from viseron.components.storage.config import TIER_SCHEMA_BASE, TIER_SCHEMA_RECORDER -from viseron.components.storage.const import ( - COMPONENT as STORAGE_COMPONENT, - CONFIG_CONTINUOUS, - CONFIG_EVENTS, - CONFIG_TIERS, - DEFAULT_CONTINUOUS, - DEFAULT_EVENTS, - DESC_CONTINUOUS, - DESC_EVENTS, - DESC_RECORDER_TIERS, -) +from viseron.components.storage.const import COMPONENT as STORAGE_COMPONENT from viseron.components.storage.models import Files from viseron.components.webserver.const import COMPONENT as WEBSERVER_COMPONENT from viseron.const import TEMP_DIR @@ -51,113 +40,21 @@ zoom_boundingbox, ) from viseron.helpers.logs import SensitiveInformationFilter -from viseron.helpers.validators import CoerceNoneToDict, Deprecated, Maybe, Slug +from viseron.types import SnapshotDomain from .const import ( - AUTHENTICATION_BASIC, - AUTHENTICATION_DIGEST, - CONFIG_AUTHENTICATION, - CONFIG_CREATE_EVENT_CLIP, - CONFIG_EXTENSION, - CONFIG_FILENAME_PATTERN, - CONFIG_FOLDER, - CONFIG_IDLE_TIMEOUT, - CONFIG_LOOKBACK, - CONFIG_MAX_RECORDING_TIME, - CONFIG_MJPEG_DRAW_MOTION, - CONFIG_MJPEG_DRAW_MOTION_MASK, - CONFIG_MJPEG_DRAW_OBJECT_MASK, - CONFIG_MJPEG_DRAW_OBJECTS, - CONFIG_MJPEG_DRAW_ZONES, - CONFIG_MJPEG_HEIGHT, - CONFIG_MJPEG_MIRROR, - CONFIG_MJPEG_ROTATE, CONFIG_MJPEG_STREAMS, - CONFIG_MJPEG_WIDTH, CONFIG_NAME, CONFIG_PASSWORD, - CONFIG_RECORDER, CONFIG_REFRESH_INTERVAL, - CONFIG_RETAIN, - CONFIG_SAVE_TO_DISK, CONFIG_STILL_IMAGE, - CONFIG_STORAGE, - CONFIG_THUMBNAIL, - CONFIG_URL, - CONFIG_USERNAME, - DEFAULT_AUTHENTICATION, - DEFAULT_CREATE_EVENT_CLIP, - DEFAULT_FILENAME_PATTERN, - DEFAULT_IDLE_TIMEOUT, - DEFAULT_LOOKBACK, - DEFAULT_MAX_RECORDING_TIME, - DEFAULT_MJPEG_DRAW_MOTION, - DEFAULT_MJPEG_DRAW_MOTION_MASK, - DEFAULT_MJPEG_DRAW_OBJECT_MASK, - DEFAULT_MJPEG_DRAW_OBJECTS, - DEFAULT_MJPEG_DRAW_ZONES, - DEFAULT_MJPEG_HEIGHT, - DEFAULT_MJPEG_MIRROR, - DEFAULT_MJPEG_ROTATE, - DEFAULT_MJPEG_STREAMS, - DEFAULT_MJPEG_WIDTH, - DEFAULT_NAME, - DEFAULT_PASSWORD, - DEFAULT_RECORDER, - DEFAULT_REFRESH_INTERVAL, - DEFAULT_SAVE_TO_DISK, - DEFAULT_STILL_IMAGE, - DEFAULT_STORAGE, - DEFAULT_THUMBNAIL, - DEFAULT_URL, - DEFAULT_USERNAME, - DEPRECATED_EXTENSION, - DEPRECATED_FILENAME_PATTERN_THUMBNAIL, - DEPRECATED_FOLDER, - DEPRECATED_RETAIN, - DESC_AUTHENTICATION, - DESC_CREATE_EVENT_CLIP, - DESC_EXTENSION, - DESC_FILENAME_PATTERN, - DESC_FILENAME_PATTERN_THUMBNAIL, - DESC_FOLDER, - DESC_IDLE_TIMEOUT, - DESC_LOOKBACK, - DESC_MAX_RECORDING_TIME, - DESC_MJPEG_DRAW_MOTION, - DESC_MJPEG_DRAW_MOTION_MASK, - DESC_MJPEG_DRAW_OBJECT_MASK, - DESC_MJPEG_DRAW_OBJECTS, - DESC_MJPEG_DRAW_ZONES, - DESC_MJPEG_HEIGHT, - DESC_MJPEG_MIRROR, - DESC_MJPEG_ROTATE, - DESC_MJPEG_STREAM, - DESC_MJPEG_STREAMS, - DESC_MJPEG_WIDTH, - DESC_NAME, - DESC_PASSWORD, - DESC_RECORDER, - DESC_REFRESH_INTERVAL, - DESC_RETAIN, - DESC_SAVE_TO_DISK, - DESC_STILL_IMAGE, - DESC_STORAGE, - DESC_THUMBNAIL, - DESC_URL, - DESC_USERNAME, EVENT_CAMERA_STARTED, EVENT_CAMERA_STOPPED, EVENT_STATUS, EVENT_STATUS_CONNECTED, EVENT_STATUS_DISCONNECTED, - INCLUSION_GROUP_AUTHENTICATION, UPDATE_TOKEN_INTERVAL_MINUTES, VIDEO_CONTAINER, - WARNING_EXTENSION, - WARNING_FILENAME_PATTERN_THUMBNAIL, - WARNING_FOLDER, - WARNING_RETAIN, ) from .entity.binary_sensor 
import ConnectionStatusBinarySensor from .entity.toggle import CameraConnectionToggle @@ -175,200 +72,6 @@ from .shared_frames import SharedFrame -MJPEG_STREAM_SCHEMA = vol.Schema( - { - vol.Optional( - CONFIG_MJPEG_WIDTH, - default=DEFAULT_MJPEG_WIDTH, - description=DESC_MJPEG_WIDTH, - ): vol.Coerce(int), - vol.Optional( - CONFIG_MJPEG_HEIGHT, - default=DEFAULT_MJPEG_HEIGHT, - description=DESC_MJPEG_HEIGHT, - ): vol.Coerce(int), - vol.Optional( - CONFIG_MJPEG_DRAW_OBJECTS, - default=DEFAULT_MJPEG_DRAW_OBJECTS, - description=DESC_MJPEG_DRAW_OBJECTS, - ): vol.Coerce(bool), - vol.Optional( - CONFIG_MJPEG_DRAW_MOTION, - default=DEFAULT_MJPEG_DRAW_MOTION, - description=DESC_MJPEG_DRAW_MOTION, - ): vol.Coerce(bool), - vol.Optional( - CONFIG_MJPEG_DRAW_MOTION_MASK, - default=DEFAULT_MJPEG_DRAW_MOTION_MASK, - description=DESC_MJPEG_DRAW_MOTION_MASK, - ): vol.Coerce(bool), - vol.Optional( - CONFIG_MJPEG_DRAW_OBJECT_MASK, - default=DEFAULT_MJPEG_DRAW_OBJECT_MASK, - description=DESC_MJPEG_DRAW_OBJECT_MASK, - ): vol.Coerce(bool), - vol.Optional( - CONFIG_MJPEG_DRAW_ZONES, - default=DEFAULT_MJPEG_DRAW_ZONES, - description=DESC_MJPEG_DRAW_ZONES, - ): vol.Coerce(bool), - vol.Optional( - CONFIG_MJPEG_ROTATE, - default=DEFAULT_MJPEG_ROTATE, - description=DESC_MJPEG_ROTATE, - ): vol.Coerce(int), - vol.Optional( - CONFIG_MJPEG_MIRROR, - default=DEFAULT_MJPEG_MIRROR, - description=DESC_MJPEG_MIRROR, - ): vol.Coerce(bool), - } -) - -THUMBNAIL_SCHEMA = vol.Schema( - { - vol.Optional( - CONFIG_SAVE_TO_DISK, - default=DEFAULT_SAVE_TO_DISK, - description=DESC_SAVE_TO_DISK, - ): bool, - Deprecated( - CONFIG_FILENAME_PATTERN, - description=DESC_FILENAME_PATTERN_THUMBNAIL, - message=DEPRECATED_FILENAME_PATTERN_THUMBNAIL, - warning=WARNING_FILENAME_PATTERN_THUMBNAIL, - ): str, - } -) - - -RECORDER_SCHEMA = vol.Schema( - { - vol.Optional( - CONFIG_LOOKBACK, default=DEFAULT_LOOKBACK, description=DESC_LOOKBACK - ): vol.All(int, vol.Range(min=0)), - vol.Optional( - CONFIG_IDLE_TIMEOUT, - default=DEFAULT_IDLE_TIMEOUT, - description=DESC_IDLE_TIMEOUT, - ): vol.All(int, vol.Range(min=0)), - vol.Optional( - CONFIG_MAX_RECORDING_TIME, - default=DEFAULT_MAX_RECORDING_TIME, - description=DESC_MAX_RECORDING_TIME, - ): vol.All(int, vol.Range(min=0)), - Deprecated( - CONFIG_RETAIN, - description=DESC_RETAIN, - message=DEPRECATED_RETAIN, - warning=WARNING_RETAIN, - ): vol.All(int, vol.Range(min=1)), - Deprecated( - CONFIG_FOLDER, - description=DESC_FOLDER, - message=DEPRECATED_FOLDER, - warning=WARNING_FOLDER, - ): str, - vol.Optional( - CONFIG_FILENAME_PATTERN, - default=DEFAULT_FILENAME_PATTERN, - description=DESC_FILENAME_PATTERN, - ): str, - Deprecated( - CONFIG_EXTENSION, - description=DESC_EXTENSION, - message=DEPRECATED_EXTENSION, - warning=WARNING_EXTENSION, - ): str, - vol.Optional( - CONFIG_THUMBNAIL, default=DEFAULT_THUMBNAIL, description=DESC_THUMBNAIL - ): vol.All(CoerceNoneToDict(), THUMBNAIL_SCHEMA), - vol.Optional( - CONFIG_STORAGE, - default=DEFAULT_STORAGE, - description=DESC_STORAGE, - ): Maybe( - { - vol.Required(CONFIG_TIERS, description=DESC_RECORDER_TIERS,): vol.All( - [TIER_SCHEMA_RECORDER], - vol.Length(min=1), - ) - }, - ), - vol.Optional( - CONFIG_CONTINUOUS, - default=DEFAULT_CONTINUOUS, - description=DESC_CONTINUOUS, - ): Maybe(TIER_SCHEMA_BASE), - vol.Optional( - CONFIG_EVENTS, - default=DEFAULT_EVENTS, - description=DESC_EVENTS, - ): Maybe(TIER_SCHEMA_BASE), - vol.Optional( - CONFIG_CREATE_EVENT_CLIP, - default=DEFAULT_CREATE_EVENT_CLIP, - description=DESC_CREATE_EVENT_CLIP, - ): bool, - } -) - 
-STILL_IMAGE_SCHEMA = vol.Schema( - { - vol.Optional( - CONFIG_URL, - default=DEFAULT_URL, - description=DESC_URL, - ): Maybe(str), - vol.Inclusive( - CONFIG_USERNAME, - INCLUSION_GROUP_AUTHENTICATION, - default=DEFAULT_USERNAME, - description=DESC_USERNAME, - ): Maybe(str), - vol.Inclusive( - CONFIG_PASSWORD, - INCLUSION_GROUP_AUTHENTICATION, - default=DEFAULT_PASSWORD, - description=DESC_PASSWORD, - ): Maybe(str), - vol.Optional( - CONFIG_AUTHENTICATION, - default=DEFAULT_AUTHENTICATION, - description=DESC_AUTHENTICATION, - ): Maybe(vol.In([AUTHENTICATION_BASIC, AUTHENTICATION_DIGEST])), - vol.Optional( - CONFIG_REFRESH_INTERVAL, - default=DEFAULT_REFRESH_INTERVAL, - description=DESC_REFRESH_INTERVAL, - ): vol.All(int, vol.Range(min=1)), - } -) - -BASE_CONFIG_SCHEMA = vol.Schema( - { - vol.Optional(CONFIG_NAME, default=DEFAULT_NAME, description=DESC_NAME): vol.All( - str, vol.Length(min=1) - ), - vol.Optional( - CONFIG_MJPEG_STREAMS, - default=DEFAULT_MJPEG_STREAMS, - description=DESC_MJPEG_STREAMS, - ): vol.All( - CoerceNoneToDict(), - {Slug(description=DESC_MJPEG_STREAM): MJPEG_STREAM_SCHEMA}, - ), - vol.Optional( - CONFIG_RECORDER, default=DEFAULT_RECORDER, description=DESC_RECORDER - ): vol.All(CoerceNoneToDict(), RECORDER_SCHEMA), - vol.Optional( - CONFIG_STILL_IMAGE, - default=DEFAULT_STILL_IMAGE, - description=DESC_STILL_IMAGE, - ): vol.All(CoerceNoneToDict(), STILL_IMAGE_SCHEMA), - } -) - LOGGER = logging.getLogger(__name__) @@ -394,6 +97,7 @@ def __init__(self, vis: Viseron, component: str, config, identifier: str) -> Non self._connected: bool = False self.stopped = Event() + self.stopped.set() self._data_stream: DataStream = vis.data[DATA_STREAM_COMPONENT] self.current_frame: SharedFrame | None = None self.shared_frames = SharedFrames(vis) @@ -421,16 +125,16 @@ def __init__(self, vis: Viseron, component: str, config, identifier: str) -> Non self.thumbnails_folder: str = self._storage.get_thumbnails_path(self) self.temp_segments_folder: str = TEMP_DIR + self.segments_folder self.snapshots_object_folder: str = self._storage.get_snapshots_path( - self, "object_detector" + self, SnapshotDomain.OBJECT_DETECTOR ) self.snapshots_face_folder: str = self._storage.get_snapshots_path( - self, "face_recognition" + self, SnapshotDomain.FACE_RECOGNITION ) self.snapshots_license_plate_folder: str = self._storage.get_snapshots_path( - self, "license_plate_recognition" + self, SnapshotDomain.LICENSE_PLATE_RECOGNITION ) self.snapshots_motion_folder: str = self._storage.get_snapshots_path( - self, "motion_detector" + self, SnapshotDomain.MOTION_DETECTOR ) self.fragmenter: Fragmenter = Fragmenter(vis, self) @@ -659,13 +363,21 @@ def get_snapshot( return ret, jpg.tobytes() return ret, False + def _get_folder(self, domain: SnapshotDomain) -> str: + if domain is SnapshotDomain.OBJECT_DETECTOR: + return self.snapshots_object_folder + if domain is SnapshotDomain.FACE_RECOGNITION: + return self.snapshots_face_folder + if domain is SnapshotDomain.LICENSE_PLATE_RECOGNITION: + return self.snapshots_license_plate_folder + if domain == SnapshotDomain.MOTION_DETECTOR: + return self.snapshots_motion_folder + assert_never(domain) + def save_snapshot( self, shared_frame: SharedFrame, - domain: Literal["object_detector"] - | Literal["face_recognition"] - | Literal["license_plate_recognition"] - | Literal["motion_detector"], + domain: SnapshotDomain, zoom_coordinates: tuple[float, float, float, float] | None = None, detected_object: DetectedObject | None = None, bbox: tuple[float, float, float, float] | None = 
None, @@ -692,16 +404,7 @@ def save_snapshot( crop_correction_factor=1.2, ) - if domain == "object_detector": - folder = self.snapshots_object_folder - elif domain == "face_recognition": - folder = self.snapshots_face_folder - elif domain == "license_plate_recognition": - folder = self.snapshots_license_plate_folder - elif domain == "motion_detector": - folder = self.snapshots_motion_folder - else: - raise ValueError(f"Invalid domain {domain}") + folder = self._get_folder(domain) if subfolder: folder = os.path.join(folder, subfolder) diff --git a/viseron/domains/camera/config.py b/viseron/domains/camera/config.py new file mode 100644 index 000000000..fac16fe4e --- /dev/null +++ b/viseron/domains/camera/config.py @@ -0,0 +1,309 @@ +"""Camera domain config.""" +import voluptuous as vol + +from viseron.components.storage.config import TIER_SCHEMA_BASE, TIER_SCHEMA_RECORDER +from viseron.components.storage.const import ( + CONFIG_CONTINUOUS, + CONFIG_EVENTS, + CONFIG_TIERS, + DEFAULT_CONTINUOUS, + DEFAULT_EVENTS, + DESC_CONTINUOUS, + DESC_EVENTS, + DESC_RECORDER_TIERS, +) +from viseron.helpers.validators import CoerceNoneToDict, Deprecated, Maybe, Slug + +from .const import ( + AUTHENTICATION_BASIC, + AUTHENTICATION_DIGEST, + CONFIG_AUTHENTICATION, + CONFIG_CREATE_EVENT_CLIP, + CONFIG_EXTENSION, + CONFIG_FILENAME_PATTERN, + CONFIG_FOLDER, + CONFIG_IDLE_TIMEOUT, + CONFIG_LOOKBACK, + CONFIG_MAX_RECORDING_TIME, + CONFIG_MJPEG_DRAW_MOTION, + CONFIG_MJPEG_DRAW_MOTION_MASK, + CONFIG_MJPEG_DRAW_OBJECT_MASK, + CONFIG_MJPEG_DRAW_OBJECTS, + CONFIG_MJPEG_DRAW_ZONES, + CONFIG_MJPEG_HEIGHT, + CONFIG_MJPEG_MIRROR, + CONFIG_MJPEG_ROTATE, + CONFIG_MJPEG_STREAMS, + CONFIG_MJPEG_WIDTH, + CONFIG_NAME, + CONFIG_PASSWORD, + CONFIG_RECORDER, + CONFIG_REFRESH_INTERVAL, + CONFIG_RETAIN, + CONFIG_SAVE_TO_DISK, + CONFIG_STILL_IMAGE, + CONFIG_STORAGE, + CONFIG_THUMBNAIL, + CONFIG_URL, + CONFIG_USERNAME, + DEFAULT_AUTHENTICATION, + DEFAULT_CREATE_EVENT_CLIP, + DEFAULT_FILENAME_PATTERN, + DEFAULT_IDLE_TIMEOUT, + DEFAULT_LOOKBACK, + DEFAULT_MAX_RECORDING_TIME, + DEFAULT_MJPEG_DRAW_MOTION, + DEFAULT_MJPEG_DRAW_MOTION_MASK, + DEFAULT_MJPEG_DRAW_OBJECT_MASK, + DEFAULT_MJPEG_DRAW_OBJECTS, + DEFAULT_MJPEG_DRAW_ZONES, + DEFAULT_MJPEG_HEIGHT, + DEFAULT_MJPEG_MIRROR, + DEFAULT_MJPEG_ROTATE, + DEFAULT_MJPEG_STREAMS, + DEFAULT_MJPEG_WIDTH, + DEFAULT_NAME, + DEFAULT_PASSWORD, + DEFAULT_RECORDER, + DEFAULT_REFRESH_INTERVAL, + DEFAULT_SAVE_TO_DISK, + DEFAULT_STILL_IMAGE, + DEFAULT_STORAGE, + DEFAULT_THUMBNAIL, + DEFAULT_URL, + DEFAULT_USERNAME, + DEPRECATED_EXTENSION, + DEPRECATED_FILENAME_PATTERN_THUMBNAIL, + DEPRECATED_FOLDER, + DEPRECATED_RETAIN, + DESC_AUTHENTICATION, + DESC_CREATE_EVENT_CLIP, + DESC_EXTENSION, + DESC_FILENAME_PATTERN, + DESC_FILENAME_PATTERN_THUMBNAIL, + DESC_FOLDER, + DESC_IDLE_TIMEOUT, + DESC_LOOKBACK, + DESC_MAX_RECORDING_TIME, + DESC_MJPEG_DRAW_MOTION, + DESC_MJPEG_DRAW_MOTION_MASK, + DESC_MJPEG_DRAW_OBJECT_MASK, + DESC_MJPEG_DRAW_OBJECTS, + DESC_MJPEG_DRAW_ZONES, + DESC_MJPEG_HEIGHT, + DESC_MJPEG_MIRROR, + DESC_MJPEG_ROTATE, + DESC_MJPEG_STREAM, + DESC_MJPEG_STREAMS, + DESC_MJPEG_WIDTH, + DESC_NAME, + DESC_PASSWORD, + DESC_RECORDER, + DESC_REFRESH_INTERVAL, + DESC_RETAIN, + DESC_SAVE_TO_DISK, + DESC_STILL_IMAGE, + DESC_STORAGE, + DESC_THUMBNAIL, + DESC_URL, + DESC_USERNAME, + INCLUSION_GROUP_AUTHENTICATION, + WARNING_EXTENSION, + WARNING_FILENAME_PATTERN_THUMBNAIL, + WARNING_FOLDER, + WARNING_RETAIN, +) + +MJPEG_STREAM_SCHEMA = vol.Schema( + { + vol.Optional( + CONFIG_MJPEG_WIDTH, + 
default=DEFAULT_MJPEG_WIDTH, + description=DESC_MJPEG_WIDTH, + ): vol.Coerce(int), + vol.Optional( + CONFIG_MJPEG_HEIGHT, + default=DEFAULT_MJPEG_HEIGHT, + description=DESC_MJPEG_HEIGHT, + ): vol.Coerce(int), + vol.Optional( + CONFIG_MJPEG_DRAW_OBJECTS, + default=DEFAULT_MJPEG_DRAW_OBJECTS, + description=DESC_MJPEG_DRAW_OBJECTS, + ): vol.Coerce(bool), + vol.Optional( + CONFIG_MJPEG_DRAW_MOTION, + default=DEFAULT_MJPEG_DRAW_MOTION, + description=DESC_MJPEG_DRAW_MOTION, + ): vol.Coerce(bool), + vol.Optional( + CONFIG_MJPEG_DRAW_MOTION_MASK, + default=DEFAULT_MJPEG_DRAW_MOTION_MASK, + description=DESC_MJPEG_DRAW_MOTION_MASK, + ): vol.Coerce(bool), + vol.Optional( + CONFIG_MJPEG_DRAW_OBJECT_MASK, + default=DEFAULT_MJPEG_DRAW_OBJECT_MASK, + description=DESC_MJPEG_DRAW_OBJECT_MASK, + ): vol.Coerce(bool), + vol.Optional( + CONFIG_MJPEG_DRAW_ZONES, + default=DEFAULT_MJPEG_DRAW_ZONES, + description=DESC_MJPEG_DRAW_ZONES, + ): vol.Coerce(bool), + vol.Optional( + CONFIG_MJPEG_ROTATE, + default=DEFAULT_MJPEG_ROTATE, + description=DESC_MJPEG_ROTATE, + ): vol.Coerce(int), + vol.Optional( + CONFIG_MJPEG_MIRROR, + default=DEFAULT_MJPEG_MIRROR, + description=DESC_MJPEG_MIRROR, + ): vol.Coerce(bool), + } +) + +THUMBNAIL_SCHEMA = vol.Schema( + { + vol.Optional( + CONFIG_SAVE_TO_DISK, + default=DEFAULT_SAVE_TO_DISK, + description=DESC_SAVE_TO_DISK, + ): bool, + Deprecated( + CONFIG_FILENAME_PATTERN, + description=DESC_FILENAME_PATTERN_THUMBNAIL, + message=DEPRECATED_FILENAME_PATTERN_THUMBNAIL, + warning=WARNING_FILENAME_PATTERN_THUMBNAIL, + ): str, + } +) + + +RECORDER_SCHEMA = vol.Schema( + { + vol.Optional( + CONFIG_LOOKBACK, default=DEFAULT_LOOKBACK, description=DESC_LOOKBACK + ): vol.All(int, vol.Range(min=0)), + vol.Optional( + CONFIG_IDLE_TIMEOUT, + default=DEFAULT_IDLE_TIMEOUT, + description=DESC_IDLE_TIMEOUT, + ): vol.All(int, vol.Range(min=0)), + vol.Optional( + CONFIG_MAX_RECORDING_TIME, + default=DEFAULT_MAX_RECORDING_TIME, + description=DESC_MAX_RECORDING_TIME, + ): vol.All(int, vol.Range(min=0)), + Deprecated( + CONFIG_RETAIN, + description=DESC_RETAIN, + message=DEPRECATED_RETAIN, + warning=WARNING_RETAIN, + ): vol.All(int, vol.Range(min=1)), + Deprecated( + CONFIG_FOLDER, + description=DESC_FOLDER, + message=DEPRECATED_FOLDER, + warning=WARNING_FOLDER, + ): str, + vol.Optional( + CONFIG_FILENAME_PATTERN, + default=DEFAULT_FILENAME_PATTERN, + description=DESC_FILENAME_PATTERN, + ): str, + Deprecated( + CONFIG_EXTENSION, + description=DESC_EXTENSION, + message=DEPRECATED_EXTENSION, + warning=WARNING_EXTENSION, + ): str, + vol.Optional( + CONFIG_THUMBNAIL, default=DEFAULT_THUMBNAIL, description=DESC_THUMBNAIL + ): vol.All(CoerceNoneToDict(), THUMBNAIL_SCHEMA), + vol.Optional( + CONFIG_STORAGE, + default=DEFAULT_STORAGE, + description=DESC_STORAGE, + ): Maybe( + { + vol.Required(CONFIG_TIERS, description=DESC_RECORDER_TIERS,): vol.All( + [TIER_SCHEMA_RECORDER], + vol.Length(min=1), + ) + }, + ), + vol.Optional( + CONFIG_CONTINUOUS, + default=DEFAULT_CONTINUOUS, + description=DESC_CONTINUOUS, + ): Maybe(TIER_SCHEMA_BASE), + vol.Optional( + CONFIG_EVENTS, + default=DEFAULT_EVENTS, + description=DESC_EVENTS, + ): Maybe(TIER_SCHEMA_BASE), + vol.Optional( + CONFIG_CREATE_EVENT_CLIP, + default=DEFAULT_CREATE_EVENT_CLIP, + description=DESC_CREATE_EVENT_CLIP, + ): bool, + } +) + +STILL_IMAGE_SCHEMA = vol.Schema( + { + vol.Optional( + CONFIG_URL, + default=DEFAULT_URL, + description=DESC_URL, + ): Maybe(str), + vol.Inclusive( + CONFIG_USERNAME, + INCLUSION_GROUP_AUTHENTICATION, + default=DEFAULT_USERNAME, + 
description=DESC_USERNAME, + ): Maybe(str), + vol.Inclusive( + CONFIG_PASSWORD, + INCLUSION_GROUP_AUTHENTICATION, + default=DEFAULT_PASSWORD, + description=DESC_PASSWORD, + ): Maybe(str), + vol.Optional( + CONFIG_AUTHENTICATION, + default=DEFAULT_AUTHENTICATION, + description=DESC_AUTHENTICATION, + ): Maybe(vol.In([AUTHENTICATION_BASIC, AUTHENTICATION_DIGEST])), + vol.Optional( + CONFIG_REFRESH_INTERVAL, + default=DEFAULT_REFRESH_INTERVAL, + description=DESC_REFRESH_INTERVAL, + ): vol.All(int, vol.Range(min=1)), + } +) + +BASE_CONFIG_SCHEMA = vol.Schema( + { + vol.Optional(CONFIG_NAME, default=DEFAULT_NAME, description=DESC_NAME): vol.All( + str, vol.Length(min=1) + ), + vol.Optional( + CONFIG_MJPEG_STREAMS, + default=DEFAULT_MJPEG_STREAMS, + description=DESC_MJPEG_STREAMS, + ): vol.All( + CoerceNoneToDict(), + {Slug(description=DESC_MJPEG_STREAM): MJPEG_STREAM_SCHEMA}, + ), + vol.Optional( + CONFIG_RECORDER, default=DEFAULT_RECORDER, description=DESC_RECORDER + ): vol.All(CoerceNoneToDict(), RECORDER_SCHEMA), + vol.Optional( + CONFIG_STILL_IMAGE, + default=DEFAULT_STILL_IMAGE, + description=DESC_STILL_IMAGE, + ): vol.All(CoerceNoneToDict(), STILL_IMAGE_SCHEMA), + } +) diff --git a/viseron/domains/camera/entity/image.py b/viseron/domains/camera/entity/image.py index 5a520c512..b4343a896 100644 --- a/viseron/domains/camera/entity/image.py +++ b/viseron/domains/camera/entity/image.py @@ -44,7 +44,6 @@ def extra_attributes(self): """Return extra attributes.""" return { "start_time": self._attr_start_time, - "path": self._attr_path, "thumbnail_path": self._attr_thumbnail_path, } @@ -52,7 +51,6 @@ def handle_event(self, event_data: Event[EventRecorderData]) -> None: """Handle recorder start event.""" recording = event_data.data.recording self._attr_start_time = recording.start_time.isoformat() - self._attr_path = recording.path self._attr_thumbnail_path = recording.thumbnail_path self._image = recording.thumbnail self.set_state() diff --git a/viseron/domains/camera/fragmenter.py b/viseron/domains/camera/fragmenter.py index a85c791f2..54997c85e 100644 --- a/viseron/domains/camera/fragmenter.py +++ b/viseron/domains/camera/fragmenter.py @@ -7,7 +7,6 @@ import re import shutil import subprocess as sp -import time import uuid from collections.abc import Callable from dataclasses import dataclass @@ -23,6 +22,7 @@ from viseron.components.storage.queries import get_time_period_fragments from viseron.const import CAMERA_SEGMENT_DURATION, TEMP_DIR, VISERON_SIGNAL_SHUTDOWN from viseron.domains.camera.const import CONFIG_FFMPEG_LOGLEVEL, CONFIG_RECORDER +from viseron.helpers import get_utc_offset from viseron.helpers.logs import LogPipe if TYPE_CHECKING: @@ -95,10 +95,9 @@ def _extract_program_date_time( # Remove timezone information since fromisoformat does not support it no_tz = re.sub(r"\+[0-9]{4}$", "", match.group(1)) # Adjust according to local timezone - return ( - datetime.datetime.fromisoformat(no_tz) - - datetime.timedelta(seconds=time.localtime().tm_gmtoff) - ).replace(tzinfo=datetime.timezone.utc) + return (datetime.datetime.fromisoformat(no_tz) - get_utc_offset()).replace( + tzinfo=datetime.timezone.utc + ) except ValueError: pass return None @@ -135,6 +134,7 @@ def __init__( seconds=1, id=self._fragment_job_id, max_instances=1, + coalesce=True, ) def _mp4box_command(self, file: str): @@ -214,9 +214,10 @@ def _write_files_metadata( if program_date_time: orig_ctime = program_date_time else: - orig_ctime = datetime.datetime.fromtimestamp( - int(file.split(".")[0]), tz=None - ) - 
datetime.timedelta(seconds=time.localtime().tm_gmtoff) + orig_ctime = ( + datetime.datetime.fromtimestamp(int(file.split(".")[0]), tz=None) + - get_utc_offset() + ) orig_ctime = orig_ctime.replace(tzinfo=datetime.timezone.utc) stmt = insert(FilesMeta).values( @@ -311,7 +312,8 @@ def _create_fragmented_mp4(self): def _shutdown(self) -> None: """Handle shutdown event.""" self._logger.debug("Shutting down fragment thread") - self._camera.stopped.wait() + if not self._camera.stopped.is_set(): + self._camera.stopped.wait(timeout=5) self._logger.debug("Camera stopped, running final fragmentation") self._create_fragmented_mp4() self._logger.debug("Fragment thread shutdown complete") diff --git a/viseron/domains/camera/recorder.py b/viseron/domains/camera/recorder.py index d9867d801..6da8a7097 100644 --- a/viseron/domains/camera/recorder.py +++ b/viseron/domains/camera/recorder.py @@ -5,8 +5,6 @@ import logging import os import shutil -import threading -import time from abc import ABC, abstractmethod from collections.abc import Callable, Sequence from dataclasses import dataclass @@ -24,7 +22,8 @@ from viseron.domains.camera.fragmenter import Fragment from viseron.domains.object_detector.detected_object import DetectedObject from viseron.events import EventData -from viseron.helpers import create_directory, draw_objects, utcnow +from viseron.helpers import create_directory, draw_objects, get_utc_offset, utcnow +from viseron.watchdog.thread_watchdog import RestartableThread from .const import ( CONFIG_CREATE_EVENT_CLIP, @@ -91,8 +90,6 @@ class Recording: end_time: datetime.datetime | None end_timestamp: float | None date: str - path: str - filename: str thumbnail: np.ndarray | None thumbnail_path: str | None objects: list[DetectedObject] @@ -106,8 +103,6 @@ def as_dict(self): "end_time": self.end_time, "end_timestamp": self.end_timestamp, "date": self.date, - "path": self.path, - "filename": self.filename, "thumbnail_path": self.thumbnail_path, "objects": self.objects, } @@ -212,9 +207,7 @@ def __init__(self, vis: Viseron, component, config, camera: AbstractCamera) -> N def as_dict(self) -> dict[str, dict[int, RecordingDict]]: """Return recorder information as dict.""" - return self.get_recordings( - datetime.timedelta(seconds=time.localtime().tm_gmtoff) - ) + return self.get_recordings(get_utc_offset()) def create_thumbnail( self, @@ -255,18 +248,6 @@ def start( self.is_recording = True start_time = utcnow() - # Create filename - filename_pattern = start_time.strftime( - self._config[CONFIG_RECORDER][CONFIG_FILENAME_PATTERN] - ) - video_name = f"{filename_pattern}.{self._camera.extension}" - - # Create foldername - full_path = os.path.join( - self._camera.recordings_folder, start_time.date().isoformat() - ) - create_directory(full_path) - with self._storage.get_session() as session: stmt = ( insert(Recordings) @@ -304,8 +285,6 @@ def start( end_time=None, end_timestamp=None, date=start_time.date().isoformat(), - path=os.path.join(full_path, video_name), - filename=video_name, thumbnail=thumbnail, thumbnail_path=thumbnail_path if self._config[CONFIG_RECORDER][CONFIG_THUMBNAIL][CONFIG_SAVE_TO_DISK] @@ -367,8 +346,11 @@ def stop(self, recording: Recording | None) -> None: self.is_recording = False if self._config[CONFIG_RECORDER][CONFIG_CREATE_EVENT_CLIP]: - concat_thread = threading.Thread( - target=self._concatenate_fragments, args=(recording,) + concat_thread = RestartableThread( + name=f"viseron.camera.{self._camera.identifier}.concatenate_fragments", + target=self._concatenate_fragments, + 
args=(recording,), + register=False, ) concat_thread.start() @@ -388,18 +370,32 @@ def _concatenate_fragments(self, recording: Recording) -> None: if not event_clip: return + # Create filename + filename_pattern = recording.start_time.strftime( + self._config[CONFIG_RECORDER][CONFIG_FILENAME_PATTERN] + ) + video_name = f"{filename_pattern}.{self._camera.extension}" + + # Create foldername + full_path = os.path.join( + self._camera.recordings_folder, recording.start_time.date().isoformat() + ) + + clip_path = os.path.join(full_path, video_name) + + create_directory(os.path.dirname(clip_path)) shutil.move( event_clip, - recording.path, + clip_path, ) - self._logger.debug(f"Moved event clip to {recording.path}") + self._logger.debug(f"Moved event clip to {clip_path}") with self._storage.get_session() as session: stmt = ( update(Recordings) .where(Recordings.id == recording.id) .values( - clip_path=recording.path, + clip_path=clip_path, ) ) session.execute(stmt) diff --git a/viseron/domains/face_recognition/__init__.py b/viseron/domains/face_recognition/__init__.py index 6d23d7f1d..f4a0e1d3a 100644 --- a/viseron/domains/face_recognition/__init__.py +++ b/viseron/domains/face_recognition/__init__.py @@ -20,6 +20,7 @@ from viseron.helpers import calculate_relative_coords from viseron.helpers.schemas import FLOAT_MIN_ZERO from viseron.helpers.validators import Deprecated +from viseron.types import SnapshotDomain from .binary_sensor import FaceDetectionBinarySensor from .const import ( @@ -147,7 +148,7 @@ def _save_face( if shared_frame: snapshot_path = self._camera.save_snapshot( shared_frame, - DOMAIN, + SnapshotDomain.FACE_RECOGNITION, zoom_coordinates=calculate_relative_coords( coordinates, self._camera.resolution ), diff --git a/viseron/domains/license_plate_recognition/__init__.py b/viseron/domains/license_plate_recognition/__init__.py index 401a85170..b2e038124 100644 --- a/viseron/domains/license_plate_recognition/__init__.py +++ b/viseron/domains/license_plate_recognition/__init__.py @@ -15,6 +15,7 @@ from viseron.domains.post_processor import BASE_CONFIG_SCHEMA, AbstractPostProcessor from viseron.events import EventData from viseron.helpers.schemas import FLOAT_MIN_ZERO, FLOAT_MIN_ZERO_MAX_ONE +from viseron.types import SnapshotDomain from .const import ( CONFIG_EXPIRE_AFTER, @@ -206,7 +207,7 @@ def _save_plate( if shared_frame: snapshot_path = self._camera.save_snapshot( shared_frame=shared_frame, - domain=DOMAIN, + domain=SnapshotDomain.LICENSE_PLATE_RECOGNITION, zoom_coordinates=plate.detected_object.rel_coordinates, bbox=plate.rel_coordinates, text=f"{plate.plate} {int(plate.confidence * 100)}%", diff --git a/viseron/domains/motion_detector/__init__.py b/viseron/domains/motion_detector/__init__.py index b3d47a85e..33a5ce6ed 100644 --- a/viseron/domains/motion_detector/__init__.py +++ b/viseron/domains/motion_detector/__init__.py @@ -66,6 +66,7 @@ FLOAT_MIN_ZERO_MAX_ONE, ) from viseron.helpers.validators import CameraIdentifier +from viseron.types import SnapshotDomain from viseron.watchdog.thread_watchdog import RestartableThread if TYPE_CHECKING: @@ -234,7 +235,7 @@ def _motion_detected_setter( if shared_frame: snapshot_path = self._camera.save_snapshot( shared_frame, - DOMAIN, + SnapshotDomain.MOTION_DETECTOR, ) self._insert_motion(snapshot_path) self._vis.dispatch_event( diff --git a/viseron/domains/object_detector/__init__.py b/viseron/domains/object_detector/__init__.py index 4998f22fc..d35518b30 100644 --- a/viseron/domains/object_detector/__init__.py +++ 
b/viseron/domains/object_detector/__init__.py @@ -32,6 +32,7 @@ FLOAT_MIN_ZERO_MAX_ONE, ) from viseron.helpers.validators import CameraIdentifier +from viseron.types import SnapshotDomain from viseron.watchdog.thread_watchdog import RestartableThread from .binary_sensor import ( @@ -409,7 +410,7 @@ def _insert_objects( if shared_frame: snapshot_path = self._camera.save_snapshot( shared_frame, - DOMAIN, + SnapshotDomain.OBJECT_DETECTOR, ( obj.rel_x1, obj.rel_y1, @@ -489,6 +490,9 @@ def _detect(self, shared_frame: SharedFrame, frame_time: float): frame_time = time.time() objects = self.return_objects(preprocessed_frame) + if objects is None: + return + self._inference_fps.append(1 / (time.time() - frame_time)) self.filter_fov(shared_frame, objects) @@ -503,7 +507,7 @@ def _detect(self, shared_frame: SharedFrame, frame_time: float): self._theoretical_max_fps.append(1 / (time.time() - frame_time)) @abstractmethod - def return_objects(self, frame) -> list[DetectedObject]: + def return_objects(self, frame) -> list[DetectedObject] | None: """Perform object detection.""" @property diff --git a/viseron/helpers/__init__.py b/viseron/helpers/__init__.py index 0a4410577..c1575890d 100644 --- a/viseron/helpers/__init__.py +++ b/viseron/helpers/__init__.py @@ -8,6 +8,7 @@ import multiprocessing as mp import os import socket +import time import tracemalloc import urllib.parse from queue import Full, Queue @@ -32,6 +33,11 @@ def utcnow() -> datetime.datetime: return datetime.datetime.now(tz=datetime.timezone.utc) +def get_utc_offset() -> datetime.timedelta: + """Return the current UTC offset.""" + return datetime.timedelta(seconds=time.localtime().tm_gmtoff) + + def daterange_to_utc( date: str, utc_offset: datetime.timedelta ) -> tuple[datetime.datetime, datetime.datetime]: diff --git a/viseron/helpers/child_process_worker.py b/viseron/helpers/child_process_worker.py index ddbdaf0de..786a17afb 100644 --- a/viseron/helpers/child_process_worker.py +++ b/viseron/helpers/child_process_worker.py @@ -13,6 +13,7 @@ from viseron.const import VISERON_SIGNAL_SHUTDOWN from viseron.helpers import pop_if_full from viseron.helpers.mprt_monkeypatch import remove_shm_from_resource_tracker +from viseron.watchdog.process_watchdog import RestartableProcess from viseron.watchdog.thread_watchdog import RestartableThread if TYPE_CHECKING: @@ -44,7 +45,7 @@ def __init__(self, vis: Viseron, name) -> None: ) input_thread.start() - self._output_queue: Any = mp.Queue(maxsize=100) + self._output_queue: mp.Queue = mp.Queue(maxsize=100) output_thread = RestartableThread( target=self._process_output_queue, name=f"child_process.{self._name}.output_thread", @@ -53,8 +54,31 @@ def __init__(self, vis: Viseron, name) -> None: ) output_thread.start() - self._process_queue: Any = mp.Queue(maxsize=100) - self._process_frames_proc = mp.Process( + self._process_queue: mp.Queue = mp.Queue(maxsize=100) + self._process_frames_proc = RestartableProcess( + name=self.child_process_name, + create_process_method=self.create_process, + ) + self._process_frames_proc.start() + + vis.register_signal_handler(VISERON_SIGNAL_SHUTDOWN, self.stop) + + def create_process(self) -> mp.Process: + """Return process used by RestartableProcess. + + This method is called by RestartableProcess to create a new process. + Queue is recreated for each new process to avoid freezes when the process is + killed. + Also closes the old queues to avoid freezing Viseron restarts. 
+ """ + LOGGER.debug(f"Setting process queues for {self.child_process_name}") + if self._process_queue: + self._process_queue.close() + if self._output_queue: + self._output_queue.close() + self._process_queue = mp.Queue(maxsize=100) + self._output_queue = mp.Queue(maxsize=100) + return mp.Process( target=self._process_frames, name=self.child_process_name, args=( @@ -64,9 +88,6 @@ def __init__(self, vis: Viseron, name) -> None: ), daemon=True, ) - self._process_frames_proc.start() - - vis.register_signal_handler(VISERON_SIGNAL_SHUTDOWN, self.stop) @property def child_process_name(self) -> str: diff --git a/viseron/types.py b/viseron/types.py index d14adc623..5feede996 100644 --- a/viseron/types.py +++ b/viseron/types.py @@ -1,5 +1,6 @@ """Viseron types.""" +import enum from typing import Literal SupportedDomains = Literal[ @@ -12,4 +13,14 @@ "object_detector", ] + +class SnapshotDomain(enum.Enum): + """Snapshot domains.""" + + FACE_RECOGNITION = "face_recognition" + LICENSE_PLATE_RECOGNITION = "license_plate_recognition" + MOTION_DETECTOR = "motion_detector" + OBJECT_DETECTOR = "object_detector" + + DatabaseOperations = Literal["insert", "update", "delete"] diff --git a/viseron/watchdog/__init__.py b/viseron/watchdog/__init__.py index 26cdb5e1c..a39a72cba 100644 --- a/viseron/watchdog/__init__.py +++ b/viseron/watchdog/__init__.py @@ -4,12 +4,6 @@ import logging from abc import ABC, abstractmethod -from apscheduler.schedulers import ( - SchedulerAlreadyRunningError, - SchedulerNotRunningError, -) -from apscheduler.schedulers.background import BackgroundScheduler - LOGGER = logging.getLogger(__name__) @@ -17,14 +11,9 @@ class WatchDog(ABC): """A watchdog for long running items.""" registered_items: list = [] - _scheduler = BackgroundScheduler(timezone="UTC", daemon=True) def __init__(self) -> None: - try: - self._scheduler.start() - LOGGER.debug("Starting scheduler") - except SchedulerAlreadyRunningError: - pass + """Initialize the watchdog.""" @classmethod def register( @@ -50,11 +39,3 @@ def unregister( @abstractmethod def watchdog(self): """Watchdog.""" - - def stop(self) -> None: - """Stop the watchdog.""" - try: - self._scheduler.shutdown() - LOGGER.debug("Stopping scheduler") - except SchedulerNotRunningError: - pass diff --git a/viseron/watchdog/process_watchdog.py b/viseron/watchdog/process_watchdog.py index b8342b2a2..c2bc2c076 100644 --- a/viseron/watchdog/process_watchdog.py +++ b/viseron/watchdog/process_watchdog.py @@ -3,10 +3,16 @@ import logging import multiprocessing as mp +from collections.abc import Callable +from typing import TYPE_CHECKING +from viseron.const import VISERON_SIGNAL_SHUTDOWN from viseron.helpers import utcnow from viseron.watchdog import WatchDog +if TYPE_CHECKING: + from viseron import Viseron + LOGGER = logging.getLogger(__name__) @@ -18,16 +24,28 @@ class RestartableProcess: """ def __init__( - self, *args, name=None, grace_period=20, register=True, **kwargs + self, + *args, + name=None, + grace_period=20, + register=True, + stage: str | None = VISERON_SIGNAL_SHUTDOWN, + create_process_method: Callable[[], mp.Process] | None = None, + **kwargs, ) -> None: self._args = args self._name = name self._grace_period = grace_period self._kwargs = kwargs + self._kwargs["name"] = name self._process: mp.Process | None = None self._started = False self._start_time: float | None = None self._register = register + self._create_process_method = create_process_method + if self._register: + ProcessWatchDog.register(self) + setattr(self, "__stage__", stage) def 
__getattr__(self, attr): """Forward all undefined attribute calls to mp.Process.""" @@ -69,15 +87,16 @@ def exitcode(self) -> int | None: def start(self) -> None: """Start the process.""" - self._process = mp.Process( - *self._args, - **self._kwargs, - ) + if self._create_process_method: + self._process = self._create_process_method() + else: + self._process = mp.Process( + *self._args, + **self._kwargs, + ) self._start_time = utcnow().timestamp() self._started = True self._process.start() - if self._register: - ProcessWatchDog.register(self) def restart(self, timeout: float | None = None) -> None: """Restart the process.""" @@ -119,9 +138,11 @@ class ProcessWatchDog(WatchDog): registered_items: list[RestartableProcess] = [] - def __init__(self) -> None: + def __init__(self, vis: Viseron) -> None: super().__init__() - self._scheduler.add_job(self.watchdog, "interval", seconds=15) + vis.background_scheduler.add_job( + self.watchdog, "interval", name="process_watchdog", seconds=15 + ) def watchdog(self) -> None: """Check for stopped processes and restart them.""" diff --git a/viseron/watchdog/subprocess_watchdog.py b/viseron/watchdog/subprocess_watchdog.py index dc8eabb88..84b00e5a9 100644 --- a/viseron/watchdog/subprocess_watchdog.py +++ b/viseron/watchdog/subprocess_watchdog.py @@ -3,10 +3,14 @@ import logging import subprocess as sp +from typing import TYPE_CHECKING +from viseron.const import VISERON_SIGNAL_SHUTDOWN from viseron.helpers import utcnow from viseron.watchdog import WatchDog +if TYPE_CHECKING: + from viseron import Viseron LOGGER = logging.getLogger(__name__) @@ -18,7 +22,13 @@ class RestartablePopen: """ def __init__( - self, *args, name=None, grace_period=20, register=True, **kwargs + self, + *args, + name=None, + grace_period=20, + register=True, + stage: str | None = VISERON_SIGNAL_SHUTDOWN, + **kwargs, ) -> None: self._args = args self._name = name @@ -29,6 +39,7 @@ def __init__( self.start() if register: SubprocessWatchDog.register(self) + setattr(self, "__stage__", stage) def __getattr__(self, attr): """Forward all undefined attribute calls to sp.Popen.""" @@ -96,9 +107,11 @@ class SubprocessWatchDog(WatchDog): registered_items: list[RestartablePopen] = [] - def __init__(self) -> None: + def __init__(self, vis: Viseron) -> None: super().__init__() - self._scheduler.add_job(self.watchdog, "interval", seconds=15) + vis.background_scheduler.add_job( + self.watchdog, "interval", name="subprocess_watchdog", seconds=15 + ) def watchdog(self) -> None: """Check for stopped processes and restart them.""" diff --git a/viseron/watchdog/thread_watchdog.py b/viseron/watchdog/thread_watchdog.py index 47bc380b9..07ecebd5e 100644 --- a/viseron/watchdog/thread_watchdog.py +++ b/viseron/watchdog/thread_watchdog.py @@ -1,12 +1,16 @@ """Watchdog for long-running threads.""" +from __future__ import annotations + import logging import threading from collections.abc import Callable -from typing import overload +from typing import TYPE_CHECKING, overload from viseron.const import VISERON_SIGNAL_SHUTDOWN from viseron.watchdog import WatchDog +if TYPE_CHECKING: + from viseron import Viseron LOGGER = logging.getLogger(__name__) @@ -123,7 +127,7 @@ def __init__( self._base_class = base_class self._base_class_args = base_class_args self._stage = stage - setattr(self, "stage", stage) + setattr(self, "__stage__", stage) @property def started(self): @@ -188,9 +192,11 @@ class ThreadWatchDog(WatchDog): registered_items: list[RestartableThread] = [] - def __init__(self) -> None: + def __init__(self, 
vis: Viseron) -> None: super().__init__() - self._scheduler.add_job(self.watchdog, "interval", seconds=15) + vis.background_scheduler.add_job( + self.watchdog, "interval", name="thread_watchdog", seconds=15 + ) def watchdog(self) -> None: """Check for stopped threads and restart them."""