Skip to content

Commit

Permalink
Unstable: Update tasks and errors (#423)
Browse files Browse the repository at this point in the history
Several updates to the `unstable` package. See the changelog for a complete list of changes.
  • Loading branch information
mathialo authored Jan 22, 2025
1 parent 585ed5d commit 5663a39
Show file tree
Hide file tree
Showing 10 changed files with 354 additions and 81 deletions.
22 changes: 22 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,28 @@ Changes are grouped as follows
- `Fixed` for any bug fixes.
- `Security` in case of vulnerabilities.

## 7.5.9

### Added

* In the `unstable` package: Task description
* In the `unstable` package: Convenience methods to create scheduled tasks (i.e., `from_interval`, `from_cron`)
* In the `unstable` package: A `TaskContext` class, which exposes a logging interface to use for logging and error
reporting within a task.

### Changed

* In the `unstable` package: Don't base tasks on dataclasses
* In the `unstable` package: Error reporting has changed from a single `error(level=..., description=..., ...)` method
to several more specific methods that are more convenient to use, e.g. `self.fatal(desc)` or
`self.warning(desc)`. Long-lasting errors now have separate methods (`begin_error`, `begin_warning`, etc.), which avoids
the `self.warning(...).instant()` workaround.
* In the `unstable` package: Change task signature to take a `TaskContext` as an argument

### Fixed

* In the `unstable` package: Correctly set the `action` attribute on reported tasks

## 7.5.8

### Changed
Expand Down
2 changes: 1 addition & 1 deletion cognite/extractorutils/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
Cognite extractor utils is a Python package that simplifies the development of new extractors.
"""

__version__ = "7.5.8"
__version__ = "7.5.9"
from .base import Extractor

__all__ = ["Extractor"]
90 changes: 49 additions & 41 deletions cognite/extractorutils/unstable/core/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,10 @@
import logging.config
import time
from concurrent.futures import ThreadPoolExecutor
from contextvars import ContextVar, Token
from functools import partial
from logging.handlers import TimedRotatingFileHandler
from multiprocessing import Queue
from threading import RLock, Thread
from traceback import format_exception
from types import TracebackType
from typing import Generic, Literal, TypeVar

Expand All @@ -25,8 +24,9 @@
from cognite.extractorutils.unstable.core._dto import TaskUpdate
from cognite.extractorutils.unstable.core._messaging import RuntimeMessage
from cognite.extractorutils.unstable.core.errors import Error, ErrorLevel
from cognite.extractorutils.unstable.core.logger import CogniteLogger
from cognite.extractorutils.unstable.core.restart_policy import WHEN_CONTINUOUS_TASKS_CRASHES, RestartPolicy
from cognite.extractorutils.unstable.core.tasks import ContinuousTask, ScheduledTask, StartupTask, Task
from cognite.extractorutils.unstable.core.tasks import ContinuousTask, ScheduledTask, StartupTask, Task, TaskContext
from cognite.extractorutils.unstable.scheduling import TaskScheduler
from cognite.extractorutils.util import now

Expand All @@ -51,7 +51,7 @@ def __init__(
self.current_config_revision = current_config_revision


class Extractor(Generic[ConfigType]):
class Extractor(Generic[ConfigType], CogniteLogger):
NAME: str
EXTERNAL_ID: str
DESCRIPTION: str
Expand All @@ -62,6 +62,8 @@ class Extractor(Generic[ConfigType]):
RESTART_POLICY: RestartPolicy = WHEN_CONTINUOUS_TASKS_CRASHES

def __init__(self, config: FullConfig[ConfigType]) -> None:
self._logger = logging.getLogger(f"{self.EXTERNAL_ID}.main")

self.cancellation_token = CancellationToken()
self.cancellation_token.cancel_on_interrupt()

Expand All @@ -80,10 +82,6 @@ def __init__(self, config: FullConfig[ConfigType]) -> None:
self._task_updates: list[TaskUpdate] = []
self._errors: dict[str, Error] = {}

self.logger = logging.getLogger(f"{self.EXTERNAL_ID}.main")

self._current_task: ContextVar[str | None] = ContextVar("current_task", default=None)

self.__init_tasks__()

def _setup_logging(self) -> None:
Expand Down Expand Up @@ -175,36 +173,34 @@ def _checkin(self) -> None:
def _run_checkin(self) -> None:
while not self.cancellation_token.is_cancelled:
try:
self.logger.debug("Running checkin")
self._logger.debug("Running checkin")
self._checkin()
except Exception:
self.logger.exception("Error during checkin")
self._logger.exception("Error during checkin")
self.cancellation_token.wait(10)

def _report_error(self, error: Error) -> None:
with self._checkin_lock:
self._errors[error.external_id] = error

def error(
def _new_error(
self,
level: ErrorLevel,
description: str,
details: str | None = None,
*,
force_global: bool = False,
details: str | None = None,
task_name: str | None = None,
) -> Error:
task_name = self._current_task.get()

return Error(
level=level,
description=description,
details=details,
extractor=self,
task_name=None if force_global else task_name,
task_name=task_name,
)

def restart(self) -> None:
self.logger.info("Restarting extractor")
self._logger.info("Restarting extractor")
if self._runtime_messages:
self._runtime_messages.put(RuntimeMessage.RESTART)
self.cancellation_token.cancel()
Expand All @@ -217,7 +213,7 @@ def add_task(self, task: Task) -> None:
# Store this for later, since we'll override it with the wrapped version
target = task.target

def run_task() -> None:
def run_task(task_context: TaskContext) -> None:
"""
A wrapped version of the task's target, with tracking and error handling
"""
Expand All @@ -227,33 +223,22 @@ def run_task() -> None:
TaskUpdate(type="started", name=task.name, timestamp=now()),
)

context_token: Token[str | None] | None = None

try:
# Set the current task context var, used to track that we're in a task for error reporting
context_token = self._current_task.set(task.name)

# Run task
target()
target(task_context)

except Exception as e:
self.logger.exception(f"Unexpected error in {task.name}")

# Task crashed, record it as a fatal error
self.error(
ErrorLevel.fatal,
description="Task crashed unexpectedly",
details="".join(format_exception(e)),
).instant()
task_context.exception(
f"Task {task.name} crashed unexpectedly",
e,
level=ErrorLevel.fatal,
)

if self.__class__.RESTART_POLICY(task, e):
self.restart()

finally:
# Unset the current task
if context_token is not None:
self._current_task.reset(context_token)

# Record task end
with self._checkin_lock:
self._task_updates.append(
Expand All @@ -265,7 +250,16 @@ def run_task() -> None:

match task:
case ScheduledTask() as t:
self._scheduler.schedule_task(name=t.name, schedule=t.schedule, task=t.target)
self._scheduler.schedule_task(
name=t.name,
schedule=t.schedule,
task=lambda: t.target(
TaskContext(
task=task,
extractor=self,
)
),
)

def _report_extractor_info(self) -> None:
self.cognite_client.post(
Expand All @@ -281,6 +275,8 @@ def _report_extractor_info(self) -> None:
{
"name": t.name,
"type": "continuous" if isinstance(t, ContinuousTask) else "batch",
"action": True if isinstance(t, ScheduledTask) else False,
"description": t.description,
}
for t in self._tasks
],
Expand Down Expand Up @@ -310,7 +306,7 @@ def __exit__(
with self._checkin_lock:
self._checkin()

self.logger.info("Shutting down extractor")
self._logger.info("Shutting down extractor")
return exc_val is None

def run(self) -> None:
Expand All @@ -333,15 +329,27 @@ def run(self) -> None:
case _:
assert_never(task)

self.logger.info("Starting extractor")
self._logger.info("Starting extractor")
if startup:
with ThreadPoolExecutor() as pool:
for task in startup:
pool.submit(task.target)
self.logger.info("Startup done")
pool.submit(
partial(
task.target,
TaskContext(
task=task,
extractor=self,
),
)
)
self._logger.info("Startup done")

for task in continuous:
Thread(name=pascalize(task.name), target=task.target).start()
Thread(
name=pascalize(task.name),
target=task.target,
args=(TaskContext(task=task, extractor=self),),
).start()

if has_scheduled:
self._scheduler.run()
Expand Down
19 changes: 17 additions & 2 deletions cognite/extractorutils/unstable/core/errors.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
import typing
import logging
from enum import Enum
from types import TracebackType
from typing import TYPE_CHECKING
from uuid import uuid4

from typing_extensions import assert_never

from cognite.extractorutils.util import now

if typing.TYPE_CHECKING:
if TYPE_CHECKING:
from .base import Extractor

__all__ = ["Error", "ErrorLevel"]
Expand All @@ -16,6 +19,18 @@ class ErrorLevel(Enum):
error = "error"
fatal = "fatal"

# Translate this error level into the matching integer level constant of the
# standard `logging` module.
@property
def log_level(self) -> int:
match self:
# warning -> logging.WARNING
case ErrorLevel.warning:
return logging.WARNING
# error -> logging.ERROR
case ErrorLevel.error:
return logging.ERROR
# fatal -> logging.CRITICAL
case ErrorLevel.fatal:
return logging.CRITICAL
# Exhaustiveness guard: every member listed above is handled, so this
# branch is only hit if a new member is added without a mapping.
case _:
assert_never(self)


class Error:
def __init__(
Expand Down
Loading

0 comments on commit 5663a39

Please sign in to comment.