diff --git a/CHANGELOG.md b/CHANGELOG.md index 113f3783..af9950e8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,6 +12,28 @@ Changes are grouped as follows - `Fixed` for any bug fixes. - `Security` in case of vulnerabilities. +## 7.5.9 + +### Added + +* In the `unstable` package: Task description +* In the `unstable` package: Convenience methods to create scheduled tasks (ie, `from_interval`, `from_cron`) +* In the `unstable` package: A `TaskContext` class, which exposes a logging interface to use for logging and error + reporting within a task. + +### Changed + +* In the `unstable` package: Don't base tasks on dataclasses +* In the `unstable` package: Error reporting has changed from a single `error(level=..., description=..., ...)` method + to several more specific methods, with the goal of being more convenient to use. E.g. `self.fatal(desc)` or + `self.warning(desc)`. As well as separating out `begin_error`, `begin_warning` etc for long-lasting errors, to avoid + the `self.warning(...).instant()` workaround. +* In the `unstable` package: Change task signature to take a `TaskContext` as an arg + +### Fixed + +* In the `unstable` package: Correctly set the `action` attribute on reported tasks + ## 7.5.8 ### Changed diff --git a/cognite/extractorutils/__init__.py b/cognite/extractorutils/__init__.py index e531ba97..5fa33026 100644 --- a/cognite/extractorutils/__init__.py +++ b/cognite/extractorutils/__init__.py @@ -16,7 +16,7 @@ Cognite extractor utils is a Python package that simplifies the development of new extractors. """ -__version__ = "7.5.8" +__version__ = "7.5.9" from .base import Extractor __all__ = ["Extractor"] diff --git a/cognite/extractorutils/unstable/core/base.py b/cognite/extractorutils/unstable/core/base.py index b00dcf46..40dd4b50 100644 --- a/cognite/extractorutils/unstable/core/base.py +++ b/cognite/extractorutils/unstable/core/base.py @@ -2,11 +2,10 @@ import logging.config import time from concurrent.futures import ThreadPoolExecutor -from contextvars import ContextVar, Token +from functools import partial from logging.handlers import TimedRotatingFileHandler from multiprocessing import Queue from threading import RLock, Thread -from traceback import format_exception from types import TracebackType from typing import Generic, Literal, TypeVar @@ -25,8 +24,9 @@ from cognite.extractorutils.unstable.core._dto import TaskUpdate from cognite.extractorutils.unstable.core._messaging import RuntimeMessage from cognite.extractorutils.unstable.core.errors import Error, ErrorLevel +from cognite.extractorutils.unstable.core.logger import CogniteLogger from cognite.extractorutils.unstable.core.restart_policy import WHEN_CONTINUOUS_TASKS_CRASHES, RestartPolicy -from cognite.extractorutils.unstable.core.tasks import ContinuousTask, ScheduledTask, StartupTask, Task +from cognite.extractorutils.unstable.core.tasks import ContinuousTask, ScheduledTask, StartupTask, Task, TaskContext from cognite.extractorutils.unstable.scheduling import TaskScheduler from cognite.extractorutils.util import now @@ -51,7 +51,7 @@ def __init__( self.current_config_revision = current_config_revision -class Extractor(Generic[ConfigType]): +class Extractor(Generic[ConfigType], CogniteLogger): NAME: str EXTERNAL_ID: str DESCRIPTION: str @@ -62,6 +62,8 @@ class Extractor(Generic[ConfigType]): RESTART_POLICY: RestartPolicy = WHEN_CONTINUOUS_TASKS_CRASHES def __init__(self, config: FullConfig[ConfigType]) -> None: + self._logger = logging.getLogger(f"{self.EXTERNAL_ID}.main") + self.cancellation_token = CancellationToken() self.cancellation_token.cancel_on_interrupt() @@ -80,10 +82,6 @@ def __init__(self, config: FullConfig[ConfigType]) -> None: self._task_updates: list[TaskUpdate] = [] self._errors: dict[str, Error] = {} - self.logger = logging.getLogger(f"{self.EXTERNAL_ID}.main") - - self._current_task: ContextVar[str | None] = ContextVar("current_task", default=None) - self.__init_tasks__() def _setup_logging(self) -> None: @@ -175,36 +173,34 @@ def _checkin(self) -> None: def _run_checkin(self) -> None: while not self.cancellation_token.is_cancelled: try: - self.logger.debug("Running checkin") + self._logger.debug("Running checkin") self._checkin() except Exception: - self.logger.exception("Error during checkin") + self._logger.exception("Error during checkin") self.cancellation_token.wait(10) def _report_error(self, error: Error) -> None: with self._checkin_lock: self._errors[error.external_id] = error - def error( + def _new_error( self, level: ErrorLevel, description: str, - details: str | None = None, *, - force_global: bool = False, + details: str | None = None, + task_name: str | None = None, ) -> Error: - task_name = self._current_task.get() - return Error( level=level, description=description, details=details, extractor=self, - task_name=None if force_global else task_name, + task_name=task_name, ) def restart(self) -> None: - self.logger.info("Restarting extractor") + self._logger.info("Restarting extractor") if self._runtime_messages: self._runtime_messages.put(RuntimeMessage.RESTART) self.cancellation_token.cancel() @@ -217,7 +213,7 @@ def add_task(self, task: Task) -> None: # Store this for later, since we'll override it with the wrapped version target = task.target - def run_task() -> None: + def run_task(task_context: TaskContext) -> None: """ A wrapped version of the task's target, with tracking and error handling """ @@ -227,33 +223,22 @@ def run_task() -> None: TaskUpdate(type="started", name=task.name, timestamp=now()), ) - context_token: Token[str | None] | None = None - try: - # Set the current task context var, used to track that we're in a task for error reporting - context_token = self._current_task.set(task.name) - # Run task - target() + target(task_context) except Exception as e: - self.logger.exception(f"Unexpected error in {task.name}") - # Task crashed, record it as a fatal error - self.error( - ErrorLevel.fatal, - description="Task crashed unexpectedly", - details="".join(format_exception(e)), - ).instant() + task_context.exception( + f"Task {task.name} crashed unexpectedly", + e, + level=ErrorLevel.fatal, + ) if self.__class__.RESTART_POLICY(task, e): self.restart() finally: - # Unset the current task - if context_token is not None: - self._current_task.reset(context_token) - # Record task end with self._checkin_lock: self._task_updates.append( @@ -265,7 +250,16 @@ def run_task() -> None: match task: case ScheduledTask() as t: - self._scheduler.schedule_task(name=t.name, schedule=t.schedule, task=t.target) + self._scheduler.schedule_task( + name=t.name, + schedule=t.schedule, + task=lambda: t.target( + TaskContext( + task=task, + extractor=self, + ) + ), + ) def _report_extractor_info(self) -> None: self.cognite_client.post( @@ -281,6 +275,8 @@ def _report_extractor_info(self) -> None: { "name": t.name, "type": "continuous" if isinstance(t, ContinuousTask) else "batch", + "action": True if isinstance(t, ScheduledTask) else False, + "description": t.description, } for t in self._tasks ], @@ -310,7 +306,7 @@ def __exit__( with self._checkin_lock: self._checkin() - self.logger.info("Shutting down extractor") + self._logger.info("Shutting down extractor") return exc_val is None def run(self) -> None: @@ -333,15 +329,27 @@ def run(self) -> None: case _: assert_never(task) - self.logger.info("Starting extractor") + self._logger.info("Starting extractor") if startup: with ThreadPoolExecutor() as pool: for task in startup: - pool.submit(task.target) - self.logger.info("Startup done") + pool.submit( + partial( + task.target, + TaskContext( + task=task, + extractor=self, + ), + ) + ) + self._logger.info("Startup done") for task in continuous: - Thread(name=pascalize(task.name), target=task.target).start() + Thread( + name=pascalize(task.name), + target=task.target, + args=(TaskContext(task=task, extractor=self),), + ).start() if has_scheduled: self._scheduler.run() diff --git a/cognite/extractorutils/unstable/core/errors.py b/cognite/extractorutils/unstable/core/errors.py index d6b0b40a..01540e2e 100644 --- a/cognite/extractorutils/unstable/core/errors.py +++ b/cognite/extractorutils/unstable/core/errors.py @@ -1,11 +1,14 @@ -import typing +import logging from enum import Enum from types import TracebackType +from typing import TYPE_CHECKING from uuid import uuid4 +from typing_extensions import assert_never + from cognite.extractorutils.util import now -if typing.TYPE_CHECKING: +if TYPE_CHECKING: from .base import Extractor __all__ = ["Error", "ErrorLevel"] @@ -16,6 +19,18 @@ class ErrorLevel(Enum): error = "error" fatal = "fatal" + @property + def log_level(self) -> int: + match self: + case ErrorLevel.warning: + return logging.WARNING + case ErrorLevel.error: + return logging.ERROR + case ErrorLevel.fatal: + return logging.CRITICAL + case _: + assert_never(self) + class Error: def __init__( diff --git a/cognite/extractorutils/unstable/core/logger.py b/cognite/extractorutils/unstable/core/logger.py new file mode 100644 index 00000000..159e83a6 --- /dev/null +++ b/cognite/extractorutils/unstable/core/logger.py @@ -0,0 +1,149 @@ +from abc import ABC, abstractmethod +from logging import Logger, getLogger +from traceback import format_exception +from typing import Literal + +from typing_extensions import assert_never + +from cognite.extractorutils.unstable.core.errors import Error, ErrorLevel + + +class CogniteLogger(ABC): + def __init__(self) -> None: + self._logger: Logger = getLogger() + + @abstractmethod + def _new_error( + self, + level: ErrorLevel, + description: str, + *, + details: str | None = None, + task_name: str | None = None, + ) -> Error: + pass + + def debug(self, message: str) -> None: + self._logger.debug(message) + + def info(self, message: str) -> None: + self._logger.info(message) + + def begin_warning( + self, + message: str, + *, + details: str | None = None, + auto_log: bool = True, + ) -> Error: + if auto_log: + self._logger.warning(message) + return self._new_error( + level=ErrorLevel.warning, + description=message, + details=details, + ) + + def begin_error( + self, + message: str, + *, + details: str | None = None, + auto_log: bool = True, + ) -> Error: + if auto_log: + self._logger.error(message) + return self._new_error( + level=ErrorLevel.error, + description=message, + details=details, + ) + + def begin_fatal( + self, + message: str, + *, + details: str | None = None, + auto_log: bool = True, + ) -> Error: + if auto_log: + self._logger.critical(message) + return self._new_error( + level=ErrorLevel.fatal, + description=message, + details=details, + ) + + def warning( + self, + message: str, + *, + details: str | None = None, + auto_log: bool = True, + ) -> None: + if auto_log: + self._logger.warning(message) + self._new_error( + level=ErrorLevel.warning, + description=message, + details=details, + ).instant() + + def error( + self, + message: str, + *, + details: str | None = None, + auto_log: bool = True, + ) -> None: + if auto_log: + self._logger.error(message) + self._new_error( + level=ErrorLevel.error, + description=message, + details=details, + ).instant() + + def fatal( + self, + message: str, + *, + details: str | None = None, + auto_log: bool = True, + ) -> None: + if auto_log: + self._logger.critical(message) + self._new_error( + level=ErrorLevel.fatal, + description=message, + details=details, + ).instant() + + def exception( + self, + message: str, + exception: Exception, + *, + level: ErrorLevel = ErrorLevel.error, + include_details: Literal["stack_trace"] | Literal["exception_message"] | bool = "exception_message", + auto_log: bool = True, + ) -> None: + if auto_log: + self._logger.log(level=level.log_level, msg=message, exc_info=exception) + + details: str | None + match include_details: + case "stack_trace": + details = "".join(format_exception(exception)) + case "exception_message" | True: + details = str(exception) + case False: + details = None + case _: + assert_never(include_details) + + self._new_error( + level=level, + description=message, + details=details, + ).instant() diff --git a/cognite/extractorutils/unstable/core/tasks.py b/cognite/extractorutils/unstable/core/tasks.py index 1ae02618..0736bfcc 100644 --- a/cognite/extractorutils/unstable/core/tasks.py +++ b/cognite/extractorutils/unstable/core/tasks.py @@ -1,30 +1,115 @@ -from abc import ABC +import logging from collections.abc import Callable -from dataclasses import dataclass +from typing import TYPE_CHECKING -from cognite.extractorutils.unstable.configuration.models import ScheduleConfig +from cognite.extractorutils.unstable.configuration.models import ( + CronConfig, + IntervalConfig, + ScheduleConfig, + TimeIntervalConfig, +) +from cognite.extractorutils.unstable.core.errors import Error, ErrorLevel +from cognite.extractorutils.unstable.core.logger import CogniteLogger -__all__ = ["ScheduledTask", "ContinuousTask", "StartupTask", "Task"] +if TYPE_CHECKING: + from cognite.extractorutils.unstable.core.base import Extractor +__all__ = ["ScheduledTask", "ContinuousTask", "StartupTask", "Task", "TaskContext"] -@dataclass -class _Task(ABC): - name: str - target: Callable[[], None] + +class TaskContext(CogniteLogger): + def __init__(self, task: "Task", extractor: "Extractor"): + super().__init__() + self._task = task + self._extractor = extractor + + self._logger = logging.getLogger(f"{self._extractor.EXTERNAL_ID}.{self._task.name.replace(' ', '')}") + + def _new_error( + self, + level: ErrorLevel, + description: str, + *, + details: str | None = None, + task_name: str | None = None, + ) -> Error: + return self._extractor._new_error( + level=level, + description=description, + details=details, + task_name=self._task.name, + ) + + +TaskTarget = Callable[[TaskContext], None] + + +class _Task: + def __init__( + self, + *, + name: str, + target: TaskTarget, + description: str | None = None, + ) -> None: + self.name = name + self.target = target + self.description = description -@dataclass class ScheduledTask(_Task): - schedule: ScheduleConfig + def __init__( + self, + *, + name: str, + target: TaskTarget, + description: str | None = None, + schedule: ScheduleConfig, + ): + super().__init__(name=name, target=target, description=description) + self.schedule = schedule + + @classmethod + def from_interval( + cls, *, interval: str, name: str, target: TaskTarget, description: str | None = None + ) -> "ScheduledTask": + return ScheduledTask( + name=name, + target=target, + description=description, + schedule=IntervalConfig(type="interval", expression=TimeIntervalConfig(interval)), + ) + + @classmethod + def from_cron(cls, *, cron: str, name: str, target: TaskTarget, description: str | None = None) -> "ScheduledTask": + return ScheduledTask( + name=name, + target=target, + description=description, + schedule=CronConfig(type="cron", expression=cron), + ) -@dataclass class ContinuousTask(_Task): - pass + def __init__( + self, + *, + name: str, + target: TaskTarget, + description: str | None = None, + ) -> None: + super().__init__(name=name, target=target, description=description) class StartupTask(_Task): - pass + def __init__( + self, + *, + name: str, + target: TaskTarget, + description: str | None = None, + ) -> None: + super().__init__(name=name, target=target, description=description) # Making a type union to help with exhaustion checks in matches diff --git a/cognite/extractorutils/unstable/scheduling/_scheduler.py b/cognite/extractorutils/unstable/scheduling/_scheduler.py index afb983b6..cf3e4abb 100644 --- a/cognite/extractorutils/unstable/scheduling/_scheduler.py +++ b/cognite/extractorutils/unstable/scheduling/_scheduler.py @@ -71,7 +71,7 @@ def wrap() -> None: with self._running_lock: self._running.remove(job) - Thread(target=wrap, name=f"{pascalize(job.name)}").start() + Thread(target=wrap, name=f"{pascalize(job.name.replace(' ', '_'))}").start() return True def trigger(self, name: str) -> bool: diff --git a/pyproject.toml b/pyproject.toml index e1be5cbb..1a68d9f5 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "cognite-extractor-utils" -version = "7.5.8" +version = "7.5.9" description = "Utilities for easier development of extractors for CDF" authors = [ {name = "Mathias Lohne", email = "mathias.lohne@cognite.com"} diff --git a/tests/test_unstable/test_base.py b/tests/test_unstable/test_base.py index 88f524f5..c15420cb 100644 --- a/tests/test_unstable/test_base.py +++ b/tests/test_unstable/test_base.py @@ -30,7 +30,7 @@ def test_simple_task_report( extractor.add_task( ScheduledTask( name="TestTask", - target=mock, + target=lambda _t: mock(), schedule=IntervalConfig(type="interval", expression=TimeIntervalConfig("15m")), ) ) diff --git a/tests/test_unstable/test_errors.py b/tests/test_unstable/test_errors.py index 5e00628c..e5b8e4f4 100644 --- a/tests/test_unstable/test_errors.py +++ b/tests/test_unstable/test_errors.py @@ -2,10 +2,10 @@ import pytest -from cognite.extractorutils.unstable.configuration.models import ConnectionConfig, IntervalConfig, TimeIntervalConfig +from cognite.extractorutils.unstable.configuration.models import ConnectionConfig from cognite.extractorutils.unstable.core.base import FullConfig from cognite.extractorutils.unstable.core.errors import ErrorLevel -from cognite.extractorutils.unstable.core.tasks import ScheduledTask +from cognite.extractorutils.unstable.core.tasks import ScheduledTask, TaskContext from test_unstable.conftest import TestConfig, TestExtractor @@ -21,7 +21,7 @@ def test_global_error( ) ) - err = extractor.error(level=ErrorLevel.error, description="Oh no!", details="There was an error") + err = extractor.begin_error("Oh no!", details="There was an error") assert len(extractor._errors) == 1 assert err.external_id in extractor._errors @@ -51,7 +51,7 @@ def test_instant_error( ) ) - err = extractor.error(level=ErrorLevel.error, description="Oh no!", details="There was an error") + err = extractor.begin_error("Oh no!", details="There was an error") assert len(extractor._errors) == 1 assert err.external_id in extractor._errors @@ -78,19 +78,16 @@ def test_task_error( ) ) - def task() -> None: + def task(tc: TaskContext) -> None: sleep(0.05) - extractor.error(level=ErrorLevel.warning, description="Hey now").instant() + tc.warning("Hey now") sleep(0.05) extractor.add_task( - ScheduledTask( - "TestTask", + ScheduledTask.from_interval( + interval="15m", + name="TestTask", target=task, - schedule=IntervalConfig( - type="interval", - expression=TimeIntervalConfig("15m"), - ), ) ) @@ -122,18 +119,15 @@ def test_crashing_task( ) ) - def task() -> None: + def task(_tc: TaskContext) -> None: sleep(0.05) raise ValueError("Try catching this!") extractor.add_task( - ScheduledTask( - "TestTask", + ScheduledTask.from_interval( + interval="15m", + name="TestTask", target=task, - schedule=IntervalConfig( - type="interval", - expression=TimeIntervalConfig("15m"), - ), ) ) @@ -146,7 +140,7 @@ def task() -> None: assert len(extractor._errors) == 1 error = list(extractor._errors.values())[0] - assert error.description == "Task crashed unexpectedly" + assert error.description == "Task TestTask crashed unexpectedly" assert error.level == ErrorLevel.fatal # Make sure error was recorded as a task error @@ -167,7 +161,7 @@ def test_reporting_errors( ) ) - err = extractor.error(level=ErrorLevel.error, description="Oh no!", details="There was an error") + err = extractor.begin_error("Oh no!", details="There was an error") assert len(extractor._errors) == 1 assert err.external_id in extractor._errors