From 8edf042639ffcf354e975c4675fa2de137a87093 Mon Sep 17 00:00:00 2001 From: Zina Rasoamanana Date: Fri, 6 Dec 2024 11:32:24 +0100 Subject: [PATCH] [IMP] queue_job: remove cron garbage collector and automatically requeue jobs in timeout [IMP] queue_job: increment 'retry' when re-queuing job that have been killed --- queue_job/README.rst | 13 +--- queue_job/__manifest__.py | 2 +- queue_job/controllers/main.py | 2 + queue_job/data/queue_data.xml | 11 --- queue_job/job.py | 50 ++++++++++++ queue_job/jobrunner/runner.py | 78 +++++++++++++++---- .../migrations/16.0.2.7.0/pre-migration.py | 35 +++++++++ queue_job/models/queue_job.py | 53 ------------- queue_job/post_init_hook.py | 13 ++++ queue_job/readme/CONFIGURE.rst | 13 +--- queue_job/static/description/index.html | 14 +--- 11 files changed, 166 insertions(+), 118 deletions(-) create mode 100644 queue_job/migrations/16.0.2.7.0/pre-migration.py diff --git a/queue_job/README.rst b/queue_job/README.rst index 791e3f05ce..0472cb2dcf 100644 --- a/queue_job/README.rst +++ b/queue_job/README.rst @@ -137,18 +137,7 @@ Configuration .. [1] It works with the threaded Odoo server too, although this way of running Odoo is obviously not for production purposes. -* Be sure to check out *Jobs Garbage Collector* CRON and change *enqueued_delta* and *started_delta* parameters to your needs. - - * ``enqueued_delta``: Spent time in minutes after which an enqueued job is considered stuck. - Set it to 0 to disable this check. - * ``started_delta``: Spent time in minutes after which a started job is considered stuck. - This parameter should not be less than ``--limit-time-real // 60`` parameter in your configuration. - Set it to 0 to disable this check. Set it to -1 to automate it, based in the server's ``--limit-time-real`` config parameter. - - .. code-block:: python - - # `model` corresponds to 'queue.job' model - model.requeue_stuck_jobs(enqueued_delta=1, started_delta=-1) +* Jobs that remain in `enqueued` or `started` state (because, for instance, their worker has been killed) will be automatically re-queued. Usage ===== diff --git a/queue_job/__manifest__.py b/queue_job/__manifest__.py index 359786abd0..750f9ab7af 100644 --- a/queue_job/__manifest__.py +++ b/queue_job/__manifest__.py @@ -2,7 +2,7 @@ { "name": "Job Queue", - "version": "16.0.2.6.8", + "version": "16.0.2.7.0", "author": "Camptocamp,ACSONE SA/NV,Odoo Community Association (OCA)", "website": "https://github.com/OCA/queue", "license": "LGPL-3", diff --git a/queue_job/controllers/main.py b/queue_job/controllers/main.py index a93c644841..d1e56e8f77 100644 --- a/queue_job/controllers/main.py +++ b/queue_job/controllers/main.py @@ -31,6 +31,8 @@ def _try_perform_job(self, env, job): job.set_started() job.store() env.cr.commit() + job.lock() + _logger.debug("%s started", job) job.perform() diff --git a/queue_job/data/queue_data.xml b/queue_job/data/queue_data.xml index 55bcb3f5fc..a2680cc475 100644 --- a/queue_job/data/queue_data.xml +++ b/queue_job/data/queue_data.xml @@ -1,17 +1,6 @@ - - Jobs Garbage Collector - 5 - minutes - -1 - - code - model.requeue_stuck_jobs(enqueued_delta=1, started_delta=-1) - Job failed diff --git a/queue_job/job.py b/queue_job/job.py index 920a8a0781..4cd9581929 100644 --- a/queue_job/job.py +++ b/queue_job/job.py @@ -238,6 +238,34 @@ def load_many(cls, env, job_uuids): recordset = cls.db_records_from_uuids(env, job_uuids) return {cls._load_from_db_record(record) for record in recordset} + def lock(self): + self.env.cr.execute( + """ + SELECT + * + FROM + queue_job_locks + WHERE + id in ( + SELECT + id + FROM + queue_job + WHERE + uuid = %s + AND state='started' + ) + FOR UPDATE; + """, + [self.uuid], + ) + + # 1 job should be locked + if not 1 == len(self.env.cr.fetchall()): + raise RetryableJobError( + f"Trying to lock job that wasn't started, uuid: {self.uuid}" + ) + @classmethod def _load_from_db_record(cls, job_db_record): stored = job_db_record @@ -517,6 +545,11 @@ def perform(self): The job is executed with the user which has initiated it. """ + if self.max_retries and self.retry >= self.max_retries: + raise FailedJobError( + "Job: %s, Max. retries (%d) reached" % (self.uuid, self.max_retries) + ) + self.retry += 1 try: self.result = self.func(*tuple(self.args), **self.kwargs) @@ -820,6 +853,23 @@ def set_started(self): self.date_started = datetime.now() self.worker_pid = os.getpid() + # add job to list of lockable jobs + self.env.cr.execute( + """ + INSERT INTO + queue_job_locks (id) + SELECT + id + FROM + queue_job + WHERE + uuid = %s + ON CONFLICT(id) + DO NOTHING; + """, + [self.uuid], + ) + def set_done(self, result=None): self.state = DONE self.exc_name = None diff --git a/queue_job/jobrunner/runner.py b/queue_job/jobrunner/runner.py index 25823a9973..bd7313f937 100644 --- a/queue_job/jobrunner/runner.py +++ b/queue_job/jobrunner/runner.py @@ -114,22 +114,6 @@ * After creating a new database or installing queue_job on an existing database, Odoo must be restarted for the runner to detect it. -* When Odoo shuts down normally, it waits for running jobs to finish. - However, when the Odoo server crashes or is otherwise force-stopped, - running jobs are interrupted while the runner has no chance to know - they have been aborted. In such situations, jobs may remain in - ``started`` or ``enqueued`` state after the Odoo server is halted. - Since the runner has no way to know if they are actually running or - not, and does not know for sure if it is safe to restart the jobs, - it does not attempt to restart them automatically. Such stale jobs - therefore fill the running queue and prevent other jobs to start. - You must therefore requeue them manually, either from the Jobs view, - or by running the following SQL statement *before starting Odoo*: - -.. code-block:: sql - - update queue_job set state='pending' where state in ('started', 'enqueued') - .. rubric:: Footnotes .. [1] From a security standpoint, it is safe to have an anonymous HTTP @@ -343,6 +327,62 @@ def set_job_enqueued(self, uuid): (ENQUEUED, uuid), ) + def requeue_dead_jobs(self): + """ + Set started and enqueued jobs but not locked to pending + + A job is locked when it's being executed + When a job is killed, it releases the lock + + Adding a buffer on 'date_enqueued' to check + that it has been enqueued for more than 10sec. + This prevents from requeuing jobs before they are actually started. + + When Odoo shuts down normally, it waits for running jobs to finish. + However, when the Odoo server crashes or is otherwise force-stopped, + running jobs are interrupted while the runner has no chance to know + they have been aborted. + """ + + with closing(self.conn.cursor()) as cr: + query = """ + UPDATE + queue_job + SET + state='pending', + retry=(CASE WHEN state='started' THEN retry+1 ELSE retry END) + WHERE + id in ( + SELECT + id + FROM + queue_job_locks + WHERE + id in ( + SELECT + id + FROM + queue_job + WHERE + state IN ('enqueued','started') + AND date_enqueued < + (now() AT TIME ZONE 'utc' - INTERVAL '10 sec') + ) + FOR UPDATE SKIP LOCKED + ) + RETURNING uuid + """ + + cr.execute(query) + + job_uuids_to_requeue = [job_uuid[0] for job_uuid in cr.fetchall()] + if job_uuids_to_requeue: + for uuid in job_uuids_to_requeue: + _logger.warning( + "Re-queued job with uuid: %s", + str(uuid), + ) + class QueueJobRunner(object): def __init__( @@ -424,6 +464,11 @@ def initialize_databases(self): self.channel_manager.notify(db_name, *job_data) _logger.info("queue job runner ready for db %s", db_name) + def requeue_dead_jobs(self): + for db in self.db_by_name.values(): + if db.has_queue_job: + db.requeue_dead_jobs() + def run_jobs(self): now = _odoo_now() for job in self.channel_manager.get_jobs_to_run(now): @@ -516,6 +561,7 @@ def run(self): _logger.info("database connections ready") # inner loop does the normal processing while not self._stop: + self.requeue_dead_jobs() self.process_notifications() self.run_jobs() self.wait_notification() diff --git a/queue_job/migrations/16.0.2.7.0/pre-migration.py b/queue_job/migrations/16.0.2.7.0/pre-migration.py new file mode 100644 index 0000000000..19c3ddb15c --- /dev/null +++ b/queue_job/migrations/16.0.2.7.0/pre-migration.py @@ -0,0 +1,35 @@ +# License LGPL-3.0 or later (http://www.gnu.org/licenses/lgpl.html) + + +def migrate(cr, version): + # Create job lock table + cr.execute( + """ + CREATE TABLE IF NOT EXISTS queue_job_locks ( + id INT PRIMARY KEY, + CONSTRAINT + queue_job_locks_queue_job_id_fkey + FOREIGN KEY (id) + REFERENCES queue_job (id) ON DELETE CASCADE + ); + """ + ) + + # Deactivate cron garbage collector + cr.execute( + """ + UPDATE + ir_cron + SET + active=False + WHERE id IN ( + SELECT res_id + FROM + ir_model_data + WHERE + module='queue_job' + AND model='ir.cron' + AND name='ir_cron_queue_job_garbage_collector' + ); + """ + ) diff --git a/queue_job/models/queue_job.py b/queue_job/models/queue_job.py index 8af7468b7c..ab50873861 100644 --- a/queue_job/models/queue_job.py +++ b/queue_job/models/queue_job.py @@ -6,7 +6,6 @@ from datetime import datetime, timedelta from odoo import _, api, exceptions, fields, models -from odoo.osv import expression from odoo.tools import config, html_escape from odoo.addons.base_sparse_field.models.fields import Serialized @@ -417,58 +416,6 @@ def autovacuum(self): break return True - def requeue_stuck_jobs(self, enqueued_delta=1, started_delta=0): - """Fix jobs that are in a bad states - - :param in_queue_delta: lookup time in minutes for jobs - that are in enqueued state, - 0 means that it is not checked - - :param started_delta: lookup time in minutes for jobs - that are in started state, - 0 means that it is not checked, - -1 will use `--limit-time-real` config value - """ - if started_delta == -1: - started_delta = (config["limit_time_real"] // 60) + 1 - return self._get_stuck_jobs_to_requeue( - enqueued_delta=enqueued_delta, started_delta=started_delta - ).requeue() - - def _get_stuck_jobs_domain(self, queue_dl, started_dl): - domain = [] - now = fields.datetime.now() - if queue_dl: - queue_dl = now - timedelta(minutes=queue_dl) - domain.append( - [ - "&", - ("date_enqueued", "<=", fields.Datetime.to_string(queue_dl)), - ("state", "=", "enqueued"), - ] - ) - if started_dl: - started_dl = now - timedelta(minutes=started_dl) - domain.append( - [ - "&", - ("date_started", "<=", fields.Datetime.to_string(started_dl)), - ("state", "=", "started"), - ] - ) - if not domain: - raise exceptions.ValidationError( - _("If both parameters are 0, ALL jobs will be requeued!") - ) - return expression.OR(domain) - - def _get_stuck_jobs_to_requeue(self, enqueued_delta, started_delta): - job_model = self.env["queue.job"] - stuck_jobs = job_model.search( - self._get_stuck_jobs_domain(enqueued_delta, started_delta) - ) - return stuck_jobs - def related_action_open_record(self): """Open a form view with the record(s) of the job. diff --git a/queue_job/post_init_hook.py b/queue_job/post_init_hook.py index 1e1a469cdf..daaa7c98df 100644 --- a/queue_job/post_init_hook.py +++ b/queue_job/post_init_hook.py @@ -31,3 +31,16 @@ def post_init_hook(cr, registry): FOR EACH ROW EXECUTE PROCEDURE queue_job_notify(); """ ) + + # Create job lock table + cr.execute( + """ + CREATE TABLE IF NOT EXISTS queue_job_locks ( + id INT PRIMARY KEY, + CONSTRAINT + queue_job_locks_queue_job_id_fkey + FOREIGN KEY (id) + REFERENCES queue_job (id) ON DELETE CASCADE + ); + """ + ) diff --git a/queue_job/readme/CONFIGURE.rst b/queue_job/readme/CONFIGURE.rst index fdd3dd1598..0d24075c0a 100644 --- a/queue_job/readme/CONFIGURE.rst +++ b/queue_job/readme/CONFIGURE.rst @@ -47,15 +47,4 @@ .. [1] It works with the threaded Odoo server too, although this way of running Odoo is obviously not for production purposes. -* Be sure to check out *Jobs Garbage Collector* CRON and change *enqueued_delta* and *started_delta* parameters to your needs. - - * ``enqueued_delta``: Spent time in minutes after which an enqueued job is considered stuck. - Set it to 0 to disable this check. - * ``started_delta``: Spent time in minutes after which a started job is considered stuck. - This parameter should not be less than ``--limit-time-real // 60`` parameter in your configuration. - Set it to 0 to disable this check. Set it to -1 to automate it, based in the server's ``--limit-time-real`` config parameter. - - .. code-block:: python - - # `model` corresponds to 'queue.job' model - model.requeue_stuck_jobs(enqueued_delta=1, started_delta=-1) +* Jobs that remain in `enqueued` or `started` state (because, for instance, their worker has been killed) will be automatically re-queued. diff --git a/queue_job/static/description/index.html b/queue_job/static/description/index.html index b430c0ea4f..5abda84847 100644 --- a/queue_job/static/description/index.html +++ b/queue_job/static/description/index.html @@ -496,20 +496,8 @@

Configuration

of running Odoo is obviously not for production purposes. -
    -
  • Be sure to check out Jobs Garbage Collector CRON and change enqueued_delta and started_delta parameters to your needs.

      -
    • enqueued_delta: Spent time in minutes after which an enqueued job is considered stuck. -Set it to 0 to disable this check.
    • -
    • started_delta: Spent time in minutes after which a started job is considered stuck. -This parameter should not be less than --limit-time-real // 60 parameter in your configuration. -Set it to 0 to disable this check. Set it to -1 to automate it, based in the server’s --limit-time-real config parameter.
    • -
    -
    -# `model` corresponds to 'queue.job' model
    -model.requeue_stuck_jobs(enqueued_delta=1, started_delta=-1)
    -
    -
  • +
  • Jobs that remain in enqueued or started state (because, for instance, their worker has been killed) will be automatically re-queued.