From c0782c37ccadaa311704abdeb719972bdf5470be Mon Sep 17 00:00:00 2001 From: noncomputable Date: Tue, 18 Feb 2020 00:59:00 -0500 Subject: [PATCH 1/3] added rq.job's dependency parameter to rq_scheduler --- rq_scheduler/scheduler.py | 23 ++++++++++++++--------- 1 file changed, 14 insertions(+), 9 deletions(-) diff --git a/rq_scheduler/scheduler.py b/rq_scheduler/scheduler.py index 775a45c..3176509 100644 --- a/rq_scheduler/scheduler.py +++ b/rq_scheduler/scheduler.py @@ -109,7 +109,7 @@ def stop(signum, frame): def _create_job(self, func, args=None, kwargs=None, commit=True, result_ttl=None, ttl=None, id=None, description=None, - queue_name=None, timeout=None, meta=None): + depends_on=None, queue_name=None, timeout=None, meta=None): """ Creates an RQ job and saves it to Redis. The job is assigned to the given queue name if not None else it is assigned to scheduler queue by @@ -122,7 +122,7 @@ def _create_job(self, func, args=None, kwargs=None, commit=True, job = self.job_class.create( func, args=args, connection=self.connection, kwargs=kwargs, result_ttl=result_ttl, ttl=ttl, id=id, - description=description, timeout=timeout, meta=meta) + description=description, depends_on=depends_on, timeout=timeout, meta=meta) if queue_name: job.origin = queue_name else: @@ -144,6 +144,9 @@ def enqueue_at(self, scheduled_time, func, *args, **kwargs): - job_ttl - job_result_ttl - job_description + - depends_on + - meta + - queue_name Usage: @@ -162,12 +165,13 @@ def enqueue_at(self, scheduled_time, func, *args, **kwargs): job_ttl = kwargs.pop('job_ttl', None) job_result_ttl = kwargs.pop('job_result_ttl', None) job_description = kwargs.pop('job_description', None) + depends_on = kwargs.pop('depends_on', None) meta = kwargs.pop('meta', None) queue_name = kwargs.pop('queue_name', None) job = self._create_job(func, args=args, kwargs=kwargs, timeout=timeout, id=job_id, result_ttl=job_result_ttl, ttl=job_ttl, - description=job_description, meta=meta, queue_name=queue_name) + description=job_description, depends_on=depends_on, meta=meta, queue_name=queue_name) self.connection.zadd(self.scheduled_jobs_key, {job.id: to_unix(scheduled_time)}) return job @@ -183,20 +187,21 @@ def enqueue_in(self, time_delta, func, *args, **kwargs): job_ttl = kwargs.pop('job_ttl', None) job_result_ttl = kwargs.pop('job_result_ttl', None) job_description = kwargs.pop('job_description', None) + depends_on = kwargs.pop('depends_on', None) meta = kwargs.pop('meta', None) queue_name = kwargs.pop('queue_name', None) job = self._create_job(func, args=args, kwargs=kwargs, timeout=timeout, id=job_id, result_ttl=job_result_ttl, ttl=job_ttl, - description=job_description, meta=meta, queue_name=queue_name) + description=job_description, depends_on=depends_on, meta=meta, queue_name=queue_name) self.connection.zadd(self.scheduled_jobs_key, {job.id: to_unix(datetime.utcnow() + time_delta)}) return job def schedule(self, scheduled_time, func, args=None, kwargs=None, interval=None, repeat=None, result_ttl=None, ttl=None, - timeout=None, id=None, description=None, queue_name=None, - meta=None): + timeout=None, id=None, description=None, depends_on=None, + queue_name=None, meta=None): """ Schedule a job to be periodically executed, at a certain interval. """ @@ -205,7 +210,7 @@ def schedule(self, scheduled_time, func, args=None, kwargs=None, result_ttl = -1 job = self._create_job(func, args=args, kwargs=kwargs, commit=False, result_ttl=result_ttl, ttl=ttl, id=id, - description=description, queue_name=queue_name, + description=description, depends_on=depends_on, queue_name=queue_name, timeout=timeout, meta=meta) if interval is not None: @@ -220,7 +225,7 @@ def schedule(self, scheduled_time, func, args=None, kwargs=None, return job def cron(self, cron_string, func, args=None, kwargs=None, repeat=None, - queue_name=None, id=None, timeout=None, description=None, meta=None, use_local_timezone=False): + queue_name=None, id=None, timeout=None, description=None, depends_on=None, meta=None, use_local_timezone=False): """ Schedule a cronjob """ @@ -230,7 +235,7 @@ def cron(self, cron_string, func, args=None, kwargs=None, repeat=None, # Otherwise the job would expire after 500 sec. job = self._create_job(func, args=args, kwargs=kwargs, commit=False, result_ttl=-1, id=id, queue_name=queue_name, - description=description, timeout=timeout, meta=meta) + description=description, depends_on=depends_on, timeout=timeout, meta=meta) job.meta['cron_string'] = cron_string job.meta['use_local_timezone'] = use_local_timezone From 7a49007c74dfd65a0bf91dd50840c87741c1e427 Mon Sep 17 00:00:00 2001 From: noncomputable Date: Tue, 18 Feb 2020 06:15:49 +0000 Subject: [PATCH 2/3] added test for job dependency parameter --- tests/test_scheduler.py | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/tests/test_scheduler.py b/tests/test_scheduler.py index 1af1ded..4aeb45d 100644 --- a/tests/test_scheduler.py +++ b/tests/test_scheduler.py @@ -101,6 +101,14 @@ def test_create_job_with_description(self): job_from_queue = Job.fetch(job.id, connection=self.testconn) self.assertEqual('description', job_from_queue.description) + def test_create_job_with_depends_on(self): + """ + Ensure that depends_on is passed to RQ. + """ + job = self.scheduler._create_job(say_hello, depends_on="dependency", args=(), kwargs={}) + job_from_queue = Job.fetch(job.id, connection=self.testconn) + self.assertEqual(["dependency"], job_from_queue._dependency_ids) + def test_create_job_with_timeout(self): """ Ensure that timeout is passed to RQ. From f783ade2a86198701fdb7ea741d2ebf38579de3d Mon Sep 17 00:00:00 2001 From: Andrew Date: Tue, 18 Feb 2020 03:01:51 -0500 Subject: [PATCH 3/3] moved depends_on to end of args --- rq_scheduler/scheduler.py | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/rq_scheduler/scheduler.py b/rq_scheduler/scheduler.py index 3176509..ef0b8a0 100644 --- a/rq_scheduler/scheduler.py +++ b/rq_scheduler/scheduler.py @@ -109,7 +109,7 @@ def stop(signum, frame): def _create_job(self, func, args=None, kwargs=None, commit=True, result_ttl=None, ttl=None, id=None, description=None, - depends_on=None, queue_name=None, timeout=None, meta=None): + queue_name=None, timeout=None, meta=None, depends_on=None): """ Creates an RQ job and saves it to Redis. The job is assigned to the given queue name if not None else it is assigned to scheduler queue by @@ -122,7 +122,7 @@ def _create_job(self, func, args=None, kwargs=None, commit=True, job = self.job_class.create( func, args=args, connection=self.connection, kwargs=kwargs, result_ttl=result_ttl, ttl=ttl, id=id, - description=description, depends_on=depends_on, timeout=timeout, meta=meta) + description=description, timeout=timeout, meta=meta, depends_on=depends_on) if queue_name: job.origin = queue_name else: @@ -171,7 +171,7 @@ def enqueue_at(self, scheduled_time, func, *args, **kwargs): job = self._create_job(func, args=args, kwargs=kwargs, timeout=timeout, id=job_id, result_ttl=job_result_ttl, ttl=job_ttl, - description=job_description, depends_on=depends_on, meta=meta, queue_name=queue_name) + description=job_description, meta=meta, queue_name=queue_name, depends_on=depends_on) self.connection.zadd(self.scheduled_jobs_key, {job.id: to_unix(scheduled_time)}) return job @@ -193,15 +193,15 @@ def enqueue_in(self, time_delta, func, *args, **kwargs): job = self._create_job(func, args=args, kwargs=kwargs, timeout=timeout, id=job_id, result_ttl=job_result_ttl, ttl=job_ttl, - description=job_description, depends_on=depends_on, meta=meta, queue_name=queue_name) + description=job_description, meta=meta, queue_name=queue_name, depends_on=depends_on) self.connection.zadd(self.scheduled_jobs_key, {job.id: to_unix(datetime.utcnow() + time_delta)}) return job def schedule(self, scheduled_time, func, args=None, kwargs=None, interval=None, repeat=None, result_ttl=None, ttl=None, - timeout=None, id=None, description=None, depends_on=None, - queue_name=None, meta=None): + timeout=None, id=None, description=None, + queue_name=None, meta=None, depends_on=None): """ Schedule a job to be periodically executed, at a certain interval. """ @@ -210,8 +210,8 @@ def schedule(self, scheduled_time, func, args=None, kwargs=None, result_ttl = -1 job = self._create_job(func, args=args, kwargs=kwargs, commit=False, result_ttl=result_ttl, ttl=ttl, id=id, - description=description, depends_on=depends_on, queue_name=queue_name, - timeout=timeout, meta=meta) + description=description, queue_name=queue_name, + timeout=timeout, meta=meta, depends_on=depends_on) if interval is not None: job.meta['interval'] = int(interval) @@ -225,7 +225,7 @@ def schedule(self, scheduled_time, func, args=None, kwargs=None, return job def cron(self, cron_string, func, args=None, kwargs=None, repeat=None, - queue_name=None, id=None, timeout=None, description=None, depends_on=None, meta=None, use_local_timezone=False): + queue_name=None, id=None, timeout=None, description=None, meta=None, use_local_timezone=False, depends_on=None): """ Schedule a cronjob """ @@ -235,7 +235,7 @@ def cron(self, cron_string, func, args=None, kwargs=None, repeat=None, # Otherwise the job would expire after 500 sec. job = self._create_job(func, args=args, kwargs=kwargs, commit=False, result_ttl=-1, id=id, queue_name=queue_name, - description=description, depends_on=depends_on, timeout=timeout, meta=meta) + description=description, timeout=timeout, meta=meta, depends_on=depends_on) job.meta['cron_string'] = cron_string job.meta['use_local_timezone'] = use_local_timezone