Skip to content

Commit

Permalink
Add timeout delay as a configuration job option
Browse files Browse the repository at this point in the history
  • Loading branch information
Samuel Laroche authored and isra17 committed Apr 18, 2024
1 parent a8e86a5 commit 7c1d31b
Show file tree
Hide file tree
Showing 2 changed files with 4 additions and 1 deletion.
1 change: 1 addition & 0 deletions src/saturn_engine/worker/executors/arq/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
TIMEOUT = 1200
TIMEOUT_DELAY = 60
EXECUTE_FUNC_NAME = "remote_execute"

healthcheck_interval = 10
Expand Down
4 changes: 3 additions & 1 deletion src/saturn_engine/worker/executors/arq/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
from .. import Executor
from . import EXECUTE_FUNC_NAME
from . import TIMEOUT
from . import TIMEOUT_DELAY
from . import executor_healthcheck_key
from . import healthcheck_interval
from . import worker_healthcheck_key
Expand All @@ -41,6 +42,7 @@ class Options:
concurrency: int
queue_name: str = "arq:queue"
timeout: int = TIMEOUT
timeout_delay: int = TIMEOUT_DELAY

def __init__(self, options: Options, services: Services) -> None:
self.logger = getLogger(__name__, self)
Expand Down Expand Up @@ -103,7 +105,7 @@ async def process_message(self, message: ExecutableMessage) -> PipelineResults:
job = await (await self.redis_queue).enqueue_job(
EXECUTE_FUNC_NAME,
message.message.as_remote(),
_expires=options.timeout + 5,
_expires=options.timeout + options.timeout_delay,
_queue_name=options.queue_name,
)
except (OSError, RedisError):
Expand Down

0 comments on commit 7c1d31b

Please sign in to comment.