From 7c1d31b6b8d3274cdb8987b9a37fe4ad10a8f271 Mon Sep 17 00:00:00 2001 From: Samuel Laroche Date: Thu, 18 Apr 2024 12:07:52 -0400 Subject: [PATCH] Add timeout delay as a configuration job option --- src/saturn_engine/worker/executors/arq/__init__.py | 1 + src/saturn_engine/worker/executors/arq/executor.py | 4 +++- 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/src/saturn_engine/worker/executors/arq/__init__.py b/src/saturn_engine/worker/executors/arq/__init__.py index 6e014f21..efcaef9b 100644 --- a/src/saturn_engine/worker/executors/arq/__init__.py +++ b/src/saturn_engine/worker/executors/arq/__init__.py @@ -1,4 +1,5 @@ TIMEOUT = 1200 +TIMEOUT_DELAY = 60 EXECUTE_FUNC_NAME = "remote_execute" healthcheck_interval = 10 diff --git a/src/saturn_engine/worker/executors/arq/executor.py b/src/saturn_engine/worker/executors/arq/executor.py index c19fd41f..60f8f121 100644 --- a/src/saturn_engine/worker/executors/arq/executor.py +++ b/src/saturn_engine/worker/executors/arq/executor.py @@ -23,6 +23,7 @@ from .. import Executor from . import EXECUTE_FUNC_NAME from . import TIMEOUT +from . import TIMEOUT_DELAY from . import executor_healthcheck_key from . import healthcheck_interval from . import worker_healthcheck_key @@ -41,6 +42,7 @@ class Options: concurrency: int queue_name: str = "arq:queue" timeout: int = TIMEOUT + timeout_delay: int = TIMEOUT_DELAY def __init__(self, options: Options, services: Services) -> None: self.logger = getLogger(__name__, self) @@ -103,7 +105,7 @@ async def process_message(self, message: ExecutableMessage) -> PipelineResults: job = await (await self.redis_queue).enqueue_job( EXECUTE_FUNC_NAME, message.message.as_remote(), - _expires=options.timeout + 5, + _expires=options.timeout + options.timeout_delay, _queue_name=options.queue_name, ) except (OSError, RedisError):