diff --git a/queue_job/README.rst b/queue_job/README.rst index 4f4fa1fe1d..30032e33bc 100644 --- a/queue_job/README.rst +++ b/queue_job/README.rst @@ -137,18 +137,7 @@ Configuration .. [1] It works with the threaded Odoo server too, although this way of running Odoo is obviously not for production purposes. -* Be sure to check out *Jobs Garbage Collector* CRON and change *enqueued_delta* and *started_delta* parameters to your needs. - - * ``enqueued_delta``: Spent time in minutes after which an enqueued job is considered stuck. - Set it to 0 to disable this check. - * ``started_delta``: Spent time in minutes after which a started job is considered stuck. - This parameter should not be less than ``--limit-time-real // 60`` parameter in your configuration. - Set it to 0 to disable this check. Set it to -1 to automate it, based in the server's ``--limit-time-real`` config parameter. - - .. code-block:: python - - # `model` corresponds to 'queue.job' model - model.requeue_stuck_jobs(enqueued_delta=1, started_delta=-1) +* Jobs that remain in `enqueued` or `started` state (because, for instance, their worker has been killed) will be automatically re-queued. 
Usage ===== diff --git a/queue_job/__manifest__.py b/queue_job/__manifest__.py index efc1c1abb0..de6bfc4343 100644 --- a/queue_job/__manifest__.py +++ b/queue_job/__manifest__.py @@ -2,7 +2,7 @@ { "name": "Job Queue", - "version": "16.0.2.8.0", + "version": "16.0.2.9.0", "author": "Camptocamp,ACSONE SA/NV,Odoo Community Association (OCA)", "website": "https://github.com/OCA/queue", "license": "LGPL-3", diff --git a/queue_job/controllers/main.py b/queue_job/controllers/main.py index a93c644841..d1e56e8f77 100644 --- a/queue_job/controllers/main.py +++ b/queue_job/controllers/main.py @@ -31,6 +31,8 @@ def _try_perform_job(self, env, job): job.set_started() job.store() env.cr.commit() + job.lock() + _logger.debug("%s started", job) job.perform() diff --git a/queue_job/data/queue_data.xml b/queue_job/data/queue_data.xml index 55bcb3f5fc..a2680cc475 100644 @@ -1,17 +1,6 @@ - - Jobs Garbage Collector - 5 - minutes - -1 - - code - model.requeue_stuck_jobs(enqueued_delta=1, started_delta=-1) - Job failed diff --git a/queue_job/job.py b/queue_job/job.py index 288b3e0421..619a2d8aca 100644 --- a/queue_job/job.py +++ b/queue_job/job.py @@ -238,6 +238,61 @@ def load_many(cls, env, job_uuids): recordset = cls.db_records_from_uuids(env, job_uuids) return {cls._load_from_db_record(record) for record in recordset} + def add_lock_record(self): + """ + Create row in db to be locked once the job is performed + """ + self.env.cr.execute( + """ + INSERT INTO + queue_job_locks (id) + SELECT + id + FROM + queue_job + WHERE + uuid = %s + ON CONFLICT(id) + DO NOTHING; + """, + [self.uuid], + ) + + def lock(self): + """ + Lock row of job that is being performed + + If a job cannot be locked, + it means that the job wasn't started, + and a RetryableJobError is raised. 
+ """ + self.env.cr.execute( + """ + SELECT + * + FROM + queue_job_locks + WHERE + id in ( + SELECT + id + FROM + queue_job + WHERE + uuid = %s + AND state='started' + ) + FOR UPDATE; + """, + [self.uuid], + ) + + # 1 job should be locked + if 1 != len(self.env.cr.fetchall()): + raise RetryableJobError( + f"Trying to lock job that wasn't started, uuid: {self.uuid}" + ) + @classmethod def _load_from_db_record(cls, job_db_record): stored = job_db_record @@ -827,6 +882,7 @@ def set_started(self): self.state = STARTED self.date_started = datetime.now() self.worker_pid = os.getpid() + self.add_lock_record() def set_done(self, result=None): self.state = DONE diff --git a/queue_job/jobrunner/runner.py b/queue_job/jobrunner/runner.py index 025c228c62..b096942b11 100644 --- a/queue_job/jobrunner/runner.py +++ b/queue_job/jobrunner/runner.py @@ -114,22 +114,6 @@ * After creating a new database or installing queue_job on an existing database, Odoo must be restarted for the runner to detect it. -* When Odoo shuts down normally, it waits for running jobs to finish. - However, when the Odoo server crashes or is otherwise force-stopped, - running jobs are interrupted while the runner has no chance to know - they have been aborted. In such situations, jobs may remain in - ``started`` or ``enqueued`` state after the Odoo server is halted. - Since the runner has no way to know if they are actually running or - not, and does not know for sure if it is safe to restart the jobs, - it does not attempt to restart them automatically. Such stale jobs - therefore fill the running queue and prevent other jobs to start. - You must therefore requeue them manually, either from the Jobs view, - or by running the following SQL statement *before starting Odoo*: - -.. code-block:: sql - - update queue_job set state='pending' where state in ('started', 'enqueued') - .. rubric:: Footnotes .. 
[1] From a security standpoint, it is safe to have an anonymous HTTP @@ -155,7 +139,7 @@ from odoo.tools import config from . import queue_job_config -from .channels import ENQUEUED, NOT_DONE, PENDING, ChannelManager +from .channels import ENQUEUED, NOT_DONE, ChannelManager SELECT_TIMEOUT = 60 ERROR_RECOVERY_DELAY = 5 @@ -207,28 +191,6 @@ def _connection_info_for(db_name): def _async_http_get(scheme, host, port, user, password, db_name, job_uuid): - # Method to set failed job (due to timeout, etc) as pending, - # to avoid keeping it as enqueued. - def set_job_pending(): - connection_info = _connection_info_for(db_name) - conn = psycopg2.connect(**connection_info) - conn.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT) - with closing(conn.cursor()) as cr: - cr.execute( - "UPDATE queue_job SET state=%s, " - "date_enqueued=NULL, date_started=NULL " - "WHERE uuid=%s and state=%s " - "RETURNING uuid", - (PENDING, job_uuid, ENQUEUED), - ) - if cr.fetchone(): - _logger.warning( - "state of job %s was reset from %s to %s", - job_uuid, - ENQUEUED, - PENDING, - ) - # TODO: better way to HTTP GET asynchronously (grequest, ...)? 
# if this was python3 I would be doing this with # asyncio, aiohttp and aiopg @@ -236,6 +198,7 @@ def urlopen(): url = "{}://{}:{}/queue_job/runjob?db={}&job_uuid={}".format( scheme, host, port, db_name, job_uuid ) + # pylint: disable=except-pass try: auth = None if user: @@ -249,10 +212,10 @@ def urlopen(): # for codes between 500 and 600 response.raise_for_status() except requests.Timeout: - set_job_pending() + # A timeout is a normal behaviour, it shouldn't be logged as an exception + pass except Exception: _logger.exception("exception in GET %s", url) - set_job_pending() thread = threading.Thread(target=urlopen) thread.daemon = True @@ -343,6 +306,92 @@ def set_job_enqueued(self, uuid): (ENQUEUED, uuid), ) + def _query_requeue_dead_jobs(self): + return """ + UPDATE + queue_job + SET + state=( + CASE + WHEN + max_retries IS NOT NULL AND + retry IS NOT NULL AND + retry>max_retries + THEN 'failed' + ELSE 'pending' + END), + retry=(CASE WHEN state='started' THEN COALESCE(retry,0)+1 ELSE retry END), + exc_name=( + CASE + WHEN + max_retries IS NOT NULL AND + retry IS NOT NULL AND + retry>max_retries + THEN 'JobFoundDead' + ELSE exc_name + END), + exc_info=( + CASE + WHEN + max_retries IS NOT NULL AND + retry IS NOT NULL AND + retry>max_retries + THEN 'Job found dead after too many retries' + ELSE exc_info + END) + WHERE + id in ( + SELECT + id + FROM + queue_job_locks + WHERE + id in ( + SELECT + id + FROM + queue_job + WHERE + state IN ('enqueued','started') + AND date_enqueued < + (now() AT TIME ZONE 'utc' - INTERVAL '10 sec') + ) + FOR UPDATE SKIP LOCKED + ) + RETURNING uuid + """ + + def requeue_dead_jobs(self): + """ + Set started and enqueued jobs but not locked to pending + + A job is locked when it's being executed + When a job is killed, it releases the lock + + If the number of retries exceeds the number of max retries, + the job is set as 'failed' with the error 'JobFoundDead'. 
+ + Adding a buffer on 'date_enqueued' to check + that it has been enqueued for more than 10sec. + This prevents jobs from being requeued before they are actually started. + + When Odoo shuts down normally, it waits for running jobs to finish. + However, when the Odoo server crashes or is otherwise force-stopped, + running jobs are interrupted while the runner has no chance to know + they have been aborted. + """ + + with closing(self.conn.cursor()) as cr: + query = self._query_requeue_dead_jobs() + + cr.execute(query) + + for (uuid,) in cr.fetchall(): + _logger.warning( + "Re-queued job with uuid: %s", + uuid, + ) + + class QueueJobRunner: def __init__( @@ -424,6 +473,11 @@ def initialize_databases(self): self.channel_manager.notify(db_name, *job_data) _logger.info("queue job runner ready for db %s", db_name) + def requeue_dead_jobs(self): + for db in self.db_by_name.values(): + if db.has_queue_job: + db.requeue_dead_jobs() + def run_jobs(self): now = _odoo_now() for job in self.channel_manager.get_jobs_to_run(now): @@ -516,6 +570,7 @@ def run(self): _logger.info("database connections ready") # inner loop does the normal processing while not self._stop: + self.requeue_dead_jobs() self.process_notifications() self.run_jobs() self.wait_notification() diff --git a/queue_job/migrations/16.0.2.9.0/pre-migration.py b/queue_job/migrations/16.0.2.9.0/pre-migration.py new file mode 100644 index 0000000000..19c3ddb15c --- /dev/null +++ b/queue_job/migrations/16.0.2.9.0/pre-migration.py @@ -0,0 +1,35 @@ +# License LGPL-3.0 or later (http://www.gnu.org/licenses/lgpl.html) + + +def migrate(cr, version): + # Create job lock table + cr.execute( + """ + CREATE TABLE IF NOT EXISTS queue_job_locks ( + id INT PRIMARY KEY, + CONSTRAINT + queue_job_locks_queue_job_id_fkey + FOREIGN KEY (id) + REFERENCES queue_job (id) ON DELETE CASCADE + ); + """ + ) + + # Deactivate cron garbage collector + cr.execute( + """ + UPDATE + ir_cron + SET + active=False + WHERE id IN ( + SELECT res_id + FROM + 
ir_model_data + WHERE + module='queue_job' + AND model='ir.cron' + AND name='ir_cron_queue_job_garbage_collector' + ); + """ + ) diff --git a/queue_job/models/queue_job.py b/queue_job/models/queue_job.py index ff3723478b..075c8f0501 100644 --- a/queue_job/models/queue_job.py +++ b/queue_job/models/queue_job.py @@ -6,7 +6,6 @@ from datetime import datetime, timedelta from odoo import _, api, exceptions, fields, models -from odoo.osv import expression from odoo.tools import config, html_escape from odoo.addons.base_sparse_field.models.fields import Serialized @@ -419,58 +418,6 @@ def autovacuum(self): break return True - def requeue_stuck_jobs(self, enqueued_delta=1, started_delta=0): - """Fix jobs that are in a bad states - - :param in_queue_delta: lookup time in minutes for jobs - that are in enqueued state, - 0 means that it is not checked - - :param started_delta: lookup time in minutes for jobs - that are in started state, - 0 means that it is not checked, - -1 will use `--limit-time-real` config value - """ - if started_delta == -1: - started_delta = (config["limit_time_real"] // 60) + 1 - return self._get_stuck_jobs_to_requeue( - enqueued_delta=enqueued_delta, started_delta=started_delta - ).requeue() - - def _get_stuck_jobs_domain(self, queue_dl, started_dl): - domain = [] - now = fields.datetime.now() - if queue_dl: - queue_dl = now - timedelta(minutes=queue_dl) - domain.append( - [ - "&", - ("date_enqueued", "<=", fields.Datetime.to_string(queue_dl)), - ("state", "=", "enqueued"), - ] - ) - if started_dl: - started_dl = now - timedelta(minutes=started_dl) - domain.append( - [ - "&", - ("date_started", "<=", fields.Datetime.to_string(started_dl)), - ("state", "=", "started"), - ] - ) - if not domain: - raise exceptions.ValidationError( - _("If both parameters are 0, ALL jobs will be requeued!") - ) - return expression.OR(domain) - - def _get_stuck_jobs_to_requeue(self, enqueued_delta, started_delta): - job_model = self.env["queue.job"] - stuck_jobs = 
job_model.search( - self._get_stuck_jobs_domain(enqueued_delta, started_delta) - ) - return stuck_jobs - def related_action_open_record(self): """Open a form view with the record(s) of the job. diff --git a/queue_job/post_init_hook.py b/queue_job/post_init_hook.py index 1e1a469cdf..daaa7c98df 100644 --- a/queue_job/post_init_hook.py +++ b/queue_job/post_init_hook.py @@ -31,3 +31,16 @@ def post_init_hook(cr, registry): FOR EACH ROW EXECUTE PROCEDURE queue_job_notify(); """ ) + + # Create job lock table + cr.execute( + """ + CREATE TABLE IF NOT EXISTS queue_job_locks ( + id INT PRIMARY KEY, + CONSTRAINT + queue_job_locks_queue_job_id_fkey + FOREIGN KEY (id) + REFERENCES queue_job (id) ON DELETE CASCADE + ); + """ + ) diff --git a/queue_job/readme/CONFIGURE.rst b/queue_job/readme/CONFIGURE.rst index fdd3dd1598..0d24075c0a 100644 --- a/queue_job/readme/CONFIGURE.rst +++ b/queue_job/readme/CONFIGURE.rst @@ -47,15 +47,4 @@ .. [1] It works with the threaded Odoo server too, although this way of running Odoo is obviously not for production purposes. -* Be sure to check out *Jobs Garbage Collector* CRON and change *enqueued_delta* and *started_delta* parameters to your needs. - - * ``enqueued_delta``: Spent time in minutes after which an enqueued job is considered stuck. - Set it to 0 to disable this check. - * ``started_delta``: Spent time in minutes after which a started job is considered stuck. - This parameter should not be less than ``--limit-time-real // 60`` parameter in your configuration. - Set it to 0 to disable this check. Set it to -1 to automate it, based in the server's ``--limit-time-real`` config parameter. - - .. code-block:: python - - # `model` corresponds to 'queue.job' model - model.requeue_stuck_jobs(enqueued_delta=1, started_delta=-1) +* Jobs that remain in `enqueued` or `started` state (because, for instance, their worker has been killed) will be automatically re-queued. 
diff --git a/queue_job/static/description/index.html b/queue_job/static/description/index.html index 2a9504b5bf..ea8ea81e01 100644 --- a/queue_job/static/description/index.html +++ b/queue_job/static/description/index.html @@ -496,20 +496,8 @@

Configuration

of running Odoo is obviously not for production purposes. -
    -
  • Be sure to check out Jobs Garbage Collector CRON and change enqueued_delta and started_delta parameters to your needs.

      -
    • enqueued_delta: Spent time in minutes after which an enqueued job is considered stuck. -Set it to 0 to disable this check.
    • -
    • started_delta: Spent time in minutes after which a started job is considered stuck. -This parameter should not be less than --limit-time-real // 60 parameter in your configuration. -Set it to 0 to disable this check. Set it to -1 to automate it, based in the server’s --limit-time-real config parameter.
    • -
    -
    -# `model` corresponds to 'queue.job' model
    -model.requeue_stuck_jobs(enqueued_delta=1, started_delta=-1)
    -
    -
  • +
  • Jobs that remain in enqueued or started state (because, for instance, their worker has been killed) will be automatically re-queued.
diff --git a/queue_job/tests/__init__.py b/queue_job/tests/__init__.py index db53ac3a60..2fdff496bc 100644 --- a/queue_job/tests/__init__.py +++ b/queue_job/tests/__init__.py @@ -7,3 +7,4 @@ from . import test_model_job_function from . import test_queue_job_protected_write from . import test_wizards +from . import test_requeue_dead_job diff --git a/queue_job/tests/test_requeue_dead_job.py b/queue_job/tests/test_requeue_dead_job.py new file mode 100644 index 0000000000..3d63dd8780 --- /dev/null +++ b/queue_job/tests/test_requeue_dead_job.py @@ -0,0 +1,133 @@ +# Copyright 2025 ACSONE SA/NV +# License AGPL-3.0 or later (https://www.gnu.org/licenses/agpl). +from contextlib import closing +from datetime import datetime, timedelta + +from odoo.tests.common import TransactionCase + +from odoo.addons.queue_job.job import Job +from odoo.addons.queue_job.jobrunner.runner import Database + + +class TestRequeueDeadJob(TransactionCase): + def create_dummy_job(self, uuid): + """ + Create dummy job for tests + """ + return ( + self.env["queue.job"] + .with_context( + _job_edit_sentinel=self.env["queue.job"].EDIT_SENTINEL, + ) + .create( + { + "uuid": uuid, + "user_id": self.env.user.id, + "state": "pending", + "model_name": "queue.job", + "method_name": "write", + } + ) + ) + + def get_locks(self, uuid, cr=None): + """ + Retrieve lock rows + """ + if cr is None: + cr = self.env.cr + + cr.execute( + """ + SELECT + id + FROM + queue_job_locks + WHERE + id IN ( + SELECT + id + FROM + queue_job + WHERE + uuid = %s + ) + FOR UPDATE SKIP LOCKED + """, + [uuid], + ) + + return cr.fetchall() + + def test_add_lock_record(self): + queue_job = self.create_dummy_job("test_add_lock") + job_obj = Job.load(self.env, queue_job.uuid) + + job_obj.set_started() + self.assertEqual(job_obj.state, "started") + + locks = self.get_locks(job_obj.uuid) + + self.assertEqual(1, len(locks)) + + def test_lock(self): + queue_job = self.create_dummy_job("test_lock") + job_obj = Job.load(self.env, 
queue_job.uuid) + + job_obj.set_started() + job_obj.store() + + locks = self.get_locks(job_obj.uuid) + + self.assertEqual(1, len(locks)) + + # commit to update queue_job records in DB + self.env.cr.commit() # pylint: disable=E8102 + + job_obj.lock() + + with closing(self.env.registry.cursor()) as new_cr: + locks = self.get_locks(job_obj.uuid, new_cr) + + # Row should be locked + self.assertEqual(0, len(locks)) + + # clean up + queue_job.unlink() + + self.env.cr.commit() # pylint: disable=E8102 + + # because we committed the cursor, the savepoint of the test method is + # gone, and this would break TransactionCase cleanups + self.cr.execute("SAVEPOINT test_%d" % self._savepoint_id) + + def test_requeue_dead_jobs(self): + uuid = "test_requeue_dead_jobs" + + queue_job = self.create_dummy_job(uuid) + job_obj = Job.load(self.env, queue_job.uuid) + + job_obj.set_enqueued() + # simulate enqueuing was in the past + job_obj.date_enqueued = datetime.now() - timedelta(minutes=1) + job_obj.set_started() + + job_obj.store() + self.env.cr.commit() # pylint: disable=E8102 + + # requeue dead jobs using current cursor + query = Database(self.env.cr.dbname)._query_requeue_dead_jobs() + self.env.cr.execute(query) + + uuids_requeued = self.env.cr.fetchall() + + self.assertEqual(len(uuids_requeued), 1) + self.assertEqual(uuids_requeued[0][0], uuid) + + # clean up + queue_job.unlink() + self.env.cr.commit() # pylint: disable=E8102 + + # because we committed the cursor, the savepoint of the test method is + # gone, and this would break TransactionCase cleanups + self.cr.execute("SAVEPOINT test_%d" % self._savepoint_id)