From 46e97a08cc90c4c8e4e35ae21fe9bf5eb17a00d2 Mon Sep 17 00:00:00 2001 From: Omer Lachish Date: Mon, 15 Feb 2021 22:52:53 +0200 Subject: [PATCH] Upgrade RQ to v1.5 (#5207) * upgrade RQ to v1.5 * set job's started_at * update healthcheck to match string worker names * delay worker healthcheck for 5 minutes from start to allow enough time to load in case many workers try to load simultaneously * log when worker cannot be found --- redash/cli/rq.py | 56 +++++++++++++++---------------- redash/tasks/__init__.py | 1 - redash/tasks/general.py | 39 ++------------------- redash/tasks/queries/execution.py | 1 + redash/tasks/schedule.py | 4 +-- redash/tasks/worker.py | 4 ++- redash/worker.py | 2 +- requirements.txt | 5 +-- worker.conf | 1 + 9 files changed, 40 insertions(+), 73 deletions(-) diff --git a/redash/cli/rq.py b/redash/cli/rq.py index 80ec7eb7ee..97fdaab43d 100644 --- a/redash/cli/rq.py +++ b/redash/cli/rq.py @@ -50,30 +50,22 @@ def worker(queues): class WorkerHealthcheck(base.BaseCheck): - NAME = 'RQ Worker Healthcheck' - INTERVAL = datetime.timedelta(minutes=5) - _last_check_time = {} - - def time_to_check(self, pid): - now = datetime.datetime.utcnow() - - if pid not in self._last_check_time: - self._last_check_time[pid] = now - - if now - self._last_check_time[pid] >= self.INTERVAL: - self._last_check_time[pid] = now - return True - - return False + NAME = "RQ Worker Healthcheck" def __call__(self, process_spec): - pid = process_spec['pid'] - if not self.time_to_check(pid): - return True - + pid = process_spec["pid"] all_workers = Worker.all(connection=rq_redis_connection) - worker = [w for w in all_workers if w.hostname == socket.gethostname().encode() and - w.pid == pid].pop() + workers = [ + w + for w in all_workers + if w.hostname == socket.gethostname() and w.pid == pid + ] + + if not workers: + self._log(f"Cannot find worker for hostname {socket.gethostname()} and pid {pid}. ==> Is healthy? False") + return False + + worker = workers.pop() is_busy = worker.get_state() == WorkerStatus.BUSY @@ -85,12 +77,19 @@ def __call__(self, process_spec): is_healthy = is_busy or seen_lately or has_nothing_to_do - self._log("Worker %s healthcheck: Is busy? %s. " - "Seen lately? %s (%d seconds ago). " - "Has nothing to do? %s (%d jobs in watched queues). " - "==> Is healthy? %s", - worker.key, is_busy, seen_lately, time_since_seen.seconds, - has_nothing_to_do, total_jobs_in_watched_queues, is_healthy) + self._log( + "Worker %s healthcheck: Is busy? %s. " + "Seen lately? %s (%d seconds ago). " + "Has nothing to do? %s (%d jobs in watched queues). " + "==> Is healthy? %s", + worker.key, + is_busy, + seen_lately, + time_since_seen.seconds, + has_nothing_to_do, + total_jobs_in_watched_queues, + is_healthy, + ) return is_healthy @@ -98,4 +97,5 @@ def __call__(self, process_spec): @manager.command() def healthcheck(): return check_runner.CheckRunner( - 'worker_healthcheck', 'worker', None, [(WorkerHealthcheck, {})]).run() + "worker_healthcheck", "worker", None, [(WorkerHealthcheck, {})] + ).run() diff --git a/redash/tasks/__init__.py b/redash/tasks/__init__.py index 42dae5b796..4f68fe0ef9 100644 --- a/redash/tasks/__init__.py +++ b/redash/tasks/__init__.py @@ -3,7 +3,6 @@ version_check, send_mail, sync_user_details, - purge_failed_jobs, ) from .queries import ( enqueue_query, diff --git a/redash/tasks/general.py b/redash/tasks/general.py index 5b3fcca496..c482d1f450 100644 --- a/redash/tasks/general.py +++ b/redash/tasks/general.py @@ -2,13 +2,10 @@ from datetime import datetime from flask_mail import Message -from rq import Connection, Queue -from rq.registry import FailedJobRegistry -from rq.job import Job -from redash import mail, models, settings, rq_redis_connection +from redash import mail, models, settings from redash.models import users from redash.version_check import run_version_check -from redash.worker import job, get_job_logger, default_operational_queues +from redash.worker import job, get_job_logger from redash.tasks.worker import Queue from redash.query_runner import NotSupported @@ -94,35 +91,3 @@ def get_schema(data_source_id, refresh): def sync_user_details(): users.sync_last_active_at() - - -def purge_failed_jobs(): - with Connection(rq_redis_connection): - queues = [q for q in Queue.all() if q.name not in default_operational_queues] - for queue in queues: - failed_job_ids = FailedJobRegistry(queue=queue).get_job_ids() - failed_jobs = Job.fetch_many(failed_job_ids, rq_redis_connection) - stale_jobs = [] - for failed_job in failed_jobs: - # the job may not actually exist anymore in Redis - if not failed_job: - continue - # the job could have an empty ended_at value in case - # of a worker dying before it can save the ended_at value, - # in which case we also consider them stale - if not failed_job.ended_at: - stale_jobs.append(failed_job) - elif ( - datetime.utcnow() - failed_job.ended_at - ).total_seconds() > settings.JOB_DEFAULT_FAILURE_TTL: - stale_jobs.append(failed_job) - - for stale_job in stale_jobs: - stale_job.delete() - - if stale_jobs: - logger.info( - "Purged %d old failed jobs from the %s queue.", - len(stale_jobs), - queue.name, - ) diff --git a/redash/tasks/queries/execution.py b/redash/tasks/queries/execution.py index 24ffea929d..0c7495e458 100644 --- a/redash/tasks/queries/execution.py +++ b/redash/tasks/queries/execution.py @@ -90,6 +90,7 @@ def enqueue_query( "scheduled_query_id": scheduled_query_id, "is_api_key": is_api_key, "job_timeout": time_limit, + "failure_ttl": settings.JOB_DEFAULT_FAILURE_TTL, "meta": { "data_source_id": data_source.id, "org_id": data_source.org_id, diff --git a/redash/tasks/schedule.py b/redash/tasks/schedule.py index 9e0cffd55f..5ed4b580e7 100644 --- a/redash/tasks/schedule.py +++ b/redash/tasks/schedule.py @@ -15,7 +15,6 @@ empty_schedules, refresh_schemas, cleanup_query_results, - purge_failed_jobs, version_check, send_aggregated_errors, Queue, @@ -71,14 +70,13 @@ def periodic_job_definitions(): { "func": refresh_schemas, "interval": timedelta(minutes=settings.SCHEMAS_REFRESH_SCHEDULE), - }, + }, { "func": sync_user_details, "timeout": 60, "interval": timedelta(minutes=1), "result_ttl": 600, }, - {"func": purge_failed_jobs, "timeout": 3600, "interval": timedelta(days=1)}, { "func": send_aggregated_errors, "interval": timedelta(minutes=settings.SEND_FAILURE_EMAIL_INTERVAL), diff --git a/redash/tasks/worker.py b/redash/tasks/worker.py index d4ca454a31..3502c60b08 100644 --- a/redash/tasks/worker.py +++ b/redash/tasks/worker.py @@ -101,12 +101,13 @@ def enforce_hard_limit(self, job): ) self.kill_horse() - def monitor_work_horse(self, job): + def monitor_work_horse(self, job, queue): """The worker will monitor the work horse and make sure that it either executes successfully or the status of the job is set to failed """ self.monitor_started = utcnow() + job.started_at = utcnow() while True: try: with UnixSignalDeathPenalty( @@ -158,6 +159,7 @@ def monitor_work_horse(self, job): self.handle_job_failure( job, + queue=queue, exc_string="Work-horse process was terminated unexpectedly " "(waitpid returned %s)" % ret_val, ) diff --git a/redash/worker.py b/redash/worker.py index 63f62f70fb..28b13ee22a 100644 --- a/redash/worker.py +++ b/redash/worker.py @@ -30,7 +30,7 @@ class StatsdRecordingJobDecorator(rq_job): # noqa queue_class = RedashQueue -job = partial(StatsdRecordingJobDecorator, connection=rq_redis_connection) +job = partial(StatsdRecordingJobDecorator, connection=rq_redis_connection, failure_ttl=settings.JOB_DEFAULT_FAILURE_TTL) class CurrentJobFilter(logging.Filter): diff --git a/requirements.txt b/requirements.txt index 434e705237..62e64f3a36 100644 --- a/requirements.txt +++ b/requirements.txt @@ -24,7 +24,7 @@ psycopg2==2.8.3 python-dateutil==2.8.0 pytz>=2019.3 PyYAML==5.1.2 -redis==3.3.11 +redis==3.5.0 requests==2.21.0 SQLAlchemy==1.3.10 # We can't upgrade SQLAlchemy-Searchable version as newer versions require PostgreSQL > 9.6, but we target older versions at the moment. @@ -34,8 +34,9 @@ pyparsing==2.3.0 SQLAlchemy-Utils==0.34.2 sqlparse==0.3.0 statsd==3.3.0 +greenlet==0.4.16 gunicorn==20.0.4 -rq==1.1.0 +rq==1.5.0 rq-scheduler==0.9.1 jsonschema==3.1.1 RestrictedPython==5.0 diff --git a/worker.conf b/worker.conf index c77b381275..c817a3a04d 100644 --- a/worker.conf +++ b/worker.conf @@ -17,6 +17,7 @@ directory=/app stopsignal=TERM autostart=true autorestart=true +startsecs=300 stdout_logfile=/dev/stdout stdout_logfile_maxbytes=0 stderr_logfile=/dev/stderr