# Reconstructed from a three-patch series against karton/core/karton.py
# (author: Stanislav Rakovsky, 7 Jan 2025):
#   [PATCH 1/3] 033802bca157846a0fe002139044346926943d46
#       Experimental feature: loop for crashed tasks
#   [PATCH 2/3] 6073d1a3cd881dbb9918c8cd5a40267d1c045222
#       Do not make task STARTED for not losing the task in this status
#   [PATCH 3/3] 944a4cbcbf577fc26b73fc0e307f34aca4a8e44d
#       linting
#
# Net result of the series: one new Consumer method, inserted after the
# `loop` method and before `class LogConsumer(KartonServiceBase)`.
# The series adds no imports, so `sys`, `time`, `traceback`, `TaskState` and
# `KartonMetrics` are expected to be in scope in that module already.
def loop_crashed_tasks(self) -> None:
    """
    Blocking loop that consumes crashed tasks.
    Intended for debugging issues that occurred in your service.

    Consumer.loop_crashed_tasks is different from Consumer.loop:
    - It does not rely on `karton.queue`. It finds crashed tasks by scanning
      `karton.task`. So RUN ONLY ONE REPLICA to avoid race conditions
      and large resource consumption.
    - It does not rely on task_timeout.
    - It does not register new binds.
    - It does not shut down other instances on binds / version mismatch.
    - It does not listen to the queue in the traditional way / it does not
      subscribe. It looks for tasks in `CRASHED` state whose `receiver`
      header equals this service's identity.
    - It does not increment `TASK_CRASHED` metrics.
    - It reimplements `Consumer.internal_process` in a simplified way.
    - It never marks a task as `STARTED`: the task stays `CRASHED` until
      processing ends, and is then set to `FINISHED` or back to `CRASHED`.
    - A task that crashes again is not retried until the service restarts,
      so one persistently failing task cannot starve the others.

    :meta private:
    """
    self.log.info("Service %s started in crash-consume mode", self.identity)

    # UIDs of tasks that crashed again during this run. Without this the
    # same task would be picked up immediately and retried forever.
    recrashed = set()

    with self.graceful_killer():
        while not self.shutdown:
            # The state check is essential: without it any task routed to
            # this consumer (e.g. one still being processed elsewhere)
            # would be re-run. `.get` is used because a task may carry no
            # "receiver" header at all - presumably tasks that were never
            # routed; confirm against the backend.
            task = next(
                (
                    candidate
                    for candidate in self.backend.iter_all_tasks(parse_resources=False)
                    if candidate.status == TaskState.CRASHED
                    and candidate.headers.get("receiver") == self.identity
                    and candidate.uid not in recrashed
                ),
                None,
            )
            if task is None:
                self.log.warning(
                    "Crashed task for consumer %s not found. "
                    "Sleeping and trying again...",
                    self.identity,
                )
                time.sleep(5)
                continue

            self.current_task = task
            self.log_handler.set_task(self.current_task)

            exception_str = None

            try:
                self.log.info("Received new task - %s", self.current_task.uid)
                # Deliberately NOT setting TaskState.STARTED here: this loop
                # only discovers CRASHED tasks, so a task left in STARTED
                # (e.g. after the process is killed) would be lost to it.

                self._run_pre_hooks()

                saved_exception = None
                try:
                    self.process(self.current_task)
                except Exception as exc:
                    # Remember the error so post-hooks can inspect it.
                    saved_exception = exc
                    raise
                finally:
                    self._run_post_hooks(saved_exception)

                self.log.info("Task done - %s", self.current_task.uid)
            except Exception:
                exception_str = traceback.format_exception(*sys.exc_info())
                self.log.exception(
                    "Failed to process task - %s", self.current_task.uid
                )
            finally:
                self.backend.increment_metrics(
                    KartonMetrics.TASK_CONSUMED, self.identity
                )

                task_state = TaskState.FINISHED

                # Report the task status as crashed
                # if an exception was caught while processing.
                if exception_str is not None:
                    task_state = TaskState.CRASHED
                    self.current_task.error = exception_str
                    recrashed.add(self.current_task.uid)

                self.backend.set_task_status(self.current_task, task_state)