This repository has been archived by the owner on Jun 6, 2022. It is now read-only.

Updating the celery_periodic_task_changed triggers ALL the existing tasks now. #28

Open
akapit opened this issue Feb 16, 2021 · 1 comment

akapit commented Feb 16, 2021

Whenever I need to update the scheduled tasks, I set the timestamp in the celery_periodic_task_changed table to now so that beat picks up the changes.

The problem is that doing so triggers ALL the scheduled tasks to run immediately, when I just need them to run at their scheduled times.

Is there any workaround for this?

Thanks

minsis commented Mar 25, 2021

This is a known issue: celery/celery#4806. The gist of it is that when a change is made to any one of your scheduled tasks, beat reloads the table to make sure everything is current. In doing so, it also checks each task's last_run_at timestamp, and if more time has elapsed since then than the task's schedule allows, it schedules the task for immediate execution. There are some logic issues with how that works. There's a larger workaround in the same issue that you can use if you want to fork your own copy. Note that the same thing can also happen when you restart beat.
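To make this failure mode concrete, here is a simplified, stdlib-only model of an interval schedule's due check. It is an illustration, not Celery's implementation: `interval_is_due` is a hypothetical helper, and treating a missing `last_run_at` as "just ran" is an assumption of this model. Crontab schedules compute their next run differently but are likewise driven by `last_run_at`.

```python
from datetime import datetime, timedelta


def interval_is_due(last_run_at, run_every, now):
    """Simplified interval due check (illustration only, not Celery's code).

    Returns (is_due, seconds_until_next_check).
    """
    if last_run_at is None:
        # ASSUMPTION of this model: no recorded run means "treat it as having
        # just run", so the entry is not due until a full interval passes.
        last_run_at = now
    remaining = (last_run_at + run_every - now).total_seconds()
    if remaining <= 0:
        # Overdue by any amount (seconds or days): fire immediately,
        # then check again one full interval later.
        return True, run_every.total_seconds()
    return False, remaining


if __name__ == "__main__":
    now = datetime(2021, 3, 25, 12, 0)
    hourly = timedelta(hours=1)
    print(interval_is_due(now - timedelta(days=2), hourly, now))      # (True, 3600.0)
    print(interval_is_due(now - timedelta(minutes=15), hourly, now))  # (False, 2700.0)
    print(interval_is_due(None, hourly, now))                         # (False, 3600.0)
```

A `last_run_at` left two days stale by a long-running beat or a reload makes the hourly task due on the very next tick. The `None` case mirrors the "first time" behaviour described in the next paragraph; in this model it also shows the trade-off, since the reset task then waits a full interval from the moment of the reset.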

The way I've worked around it: on startup, and whenever a change is made to any of my scheduled entries, I set last_run_at to null, because the last_run_at logic won't trigger an execution if it's the 'first time'. Of course, this is a problem if you actually care about the last execution time. In my use case it doesn't matter whether anyone can see this value, since I use Flower to track all my task executions, along with a custom audit table.
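For the "changes made" half of this workaround, one option is to clear `last_run_at` in the same transaction that bumps the change marker, so other readers see both writes or neither. This sketch uses stdlib `sqlite3` against a minimal, assumed schema: the name `celery_periodic_task_changed` comes from this thread, but `celery_periodic_task`, the column sets, the `last_update` column, the single marker row with `id = 1`, and the `reset_and_notify` helper are all hypothetical, and `INSERT OR REPLACE` is SQLite-specific.

```python
import sqlite3
from datetime import datetime, timezone

# ASSUMED minimal schema for illustration; real scheduler tables have more columns.
SCHEMA = """
CREATE TABLE celery_periodic_task (
    id INTEGER PRIMARY KEY,
    name TEXT NOT NULL,
    last_run_at TEXT
);
CREATE TABLE celery_periodic_task_changed (
    id INTEGER PRIMARY KEY,
    last_update TEXT NOT NULL
);
"""


def reset_and_notify(conn: sqlite3.Connection) -> None:
    """Clear every last_run_at and bump the change marker in one transaction.

    Hypothetical helper: call it after editing your scheduled entries.
    """
    now = datetime.now(timezone.utc).isoformat()
    with conn:  # commits on success, rolls back if either statement fails
        # With no recorded run, beat treats each entry as running for the first time.
        conn.execute("UPDATE celery_periodic_task SET last_run_at = NULL")
        # ASSUMED single marker row with id = 1; INSERT OR REPLACE is SQLite-only.
        conn.execute(
            "INSERT OR REPLACE INTO celery_periodic_task_changed (id, last_update) "
            "VALUES (1, ?)",
            (now,),
        )


if __name__ == "__main__":
    conn = sqlite3.connect(":memory:")
    conn.executescript(SCHEMA)
    conn.executemany(
        "INSERT INTO celery_periodic_task (name, last_run_at) VALUES (?, ?)",
        [("report", "2021-03-20T08:00:00"), ("cleanup", "2021-03-24T23:00:00")],
    )
    conn.commit()
    reset_and_notify(conn)
    stale = conn.execute(
        "SELECT COUNT(*) FROM celery_periodic_task WHERE last_run_at IS NOT NULL"
    ).fetchone()[0]
    print(stale)  # 0
```

Whether beat later writes its own in-memory last_run_at values back over the cleared ones depends on the scheduler implementation, so check that against the version you run.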

This is what my beat startup hook looks like (replace SessionManager() with whatever gives you a database session):

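```python
import logging

from celery.signals import beat_init
# Assumed import path for the PeriodicTask model; adjust to your project.
from celery_sqlalchemy_scheduler.models import PeriodicTask

logger = logging.getLogger(__name__)


@beat_init.connect
def reset_last_run(**_):
    """
    Reset all the last run times in the celery database. This prevents accidental
    runs when a new service is deployed or beat is restarted.
    """
    logger.info("Resetting last_run_at for the PeriodicTask table")
    # SessionManager() stands in for your own session context manager.
    with SessionManager() as session:
        for pt in session.query(PeriodicTask).all():
            # No recorded run: beat treats the entry as running for the first time.
            pt.last_run_at = None
        session.commit()
```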
