diff --git a/rq_scheduler/scheduler.py b/rq_scheduler/scheduler.py index 775a45c..ef0b8a0 100644 --- a/rq_scheduler/scheduler.py +++ b/rq_scheduler/scheduler.py @@ -109,7 +109,7 @@ def stop(signum, frame): def _create_job(self, func, args=None, kwargs=None, commit=True, result_ttl=None, ttl=None, id=None, description=None, - queue_name=None, timeout=None, meta=None): + queue_name=None, timeout=None, meta=None, depends_on=None): """ Creates an RQ job and saves it to Redis. The job is assigned to the given queue name if not None else it is assigned to scheduler queue by @@ -122,7 +122,7 @@ def _create_job(self, func, args=None, kwargs=None, commit=True, job = self.job_class.create( func, args=args, connection=self.connection, kwargs=kwargs, result_ttl=result_ttl, ttl=ttl, id=id, - description=description, timeout=timeout, meta=meta) + description=description, timeout=timeout, meta=meta, depends_on=depends_on) if queue_name: job.origin = queue_name else: @@ -144,6 +144,9 @@ def enqueue_at(self, scheduled_time, func, *args, **kwargs): - job_ttl - job_result_ttl - job_description + - depends_on + - meta + - queue_name Usage: @@ -162,12 +165,13 @@ def enqueue_at(self, scheduled_time, func, *args, **kwargs): job_ttl = kwargs.pop('job_ttl', None) job_result_ttl = kwargs.pop('job_result_ttl', None) job_description = kwargs.pop('job_description', None) + depends_on = kwargs.pop('depends_on', None) meta = kwargs.pop('meta', None) queue_name = kwargs.pop('queue_name', None) job = self._create_job(func, args=args, kwargs=kwargs, timeout=timeout, id=job_id, result_ttl=job_result_ttl, ttl=job_ttl, - description=job_description, meta=meta, queue_name=queue_name) + description=job_description, meta=meta, queue_name=queue_name, depends_on=depends_on) self.connection.zadd(self.scheduled_jobs_key, {job.id: to_unix(scheduled_time)}) return job @@ -183,20 +187,21 @@ def enqueue_in(self, time_delta, func, *args, **kwargs): job_ttl = kwargs.pop('job_ttl', None) job_result_ttl = kwargs.pop('job_result_ttl', None) job_description = kwargs.pop('job_description', None) + depends_on = kwargs.pop('depends_on', None) meta = kwargs.pop('meta', None) queue_name = kwargs.pop('queue_name', None) job = self._create_job(func, args=args, kwargs=kwargs, timeout=timeout, id=job_id, result_ttl=job_result_ttl, ttl=job_ttl, - description=job_description, meta=meta, queue_name=queue_name) + description=job_description, meta=meta, queue_name=queue_name, depends_on=depends_on) self.connection.zadd(self.scheduled_jobs_key, {job.id: to_unix(datetime.utcnow() + time_delta)}) return job def schedule(self, scheduled_time, func, args=None, kwargs=None, interval=None, repeat=None, result_ttl=None, ttl=None, - timeout=None, id=None, description=None, queue_name=None, - meta=None): + timeout=None, id=None, description=None, + queue_name=None, meta=None, depends_on=None): """ Schedule a job to be periodically executed, at a certain interval. """ @@ -206,7 +211,7 @@ def schedule(self, scheduled_time, func, args=None, kwargs=None, job = self._create_job(func, args=args, kwargs=kwargs, commit=False, result_ttl=result_ttl, ttl=ttl, id=id, description=description, queue_name=queue_name, - timeout=timeout, meta=meta) + timeout=timeout, meta=meta, depends_on=depends_on) if interval is not None: job.meta['interval'] = int(interval) @@ -220,7 +225,7 @@ def schedule(self, scheduled_time, func, args=None, kwargs=None, return job def cron(self, cron_string, func, args=None, kwargs=None, repeat=None, - queue_name=None, id=None, timeout=None, description=None, meta=None, use_local_timezone=False): + queue_name=None, id=None, timeout=None, description=None, meta=None, use_local_timezone=False, depends_on=None): """ Schedule a cronjob """ @@ -230,7 +235,7 @@ def cron(self, cron_string, func, args=None, kwargs=None, repeat=None, # Otherwise the job would expire after 500 sec. job = self._create_job(func, args=args, kwargs=kwargs, commit=False, result_ttl=-1, id=id, queue_name=queue_name, - description=description, timeout=timeout, meta=meta) + description=description, timeout=timeout, meta=meta, depends_on=depends_on) job.meta['cron_string'] = cron_string job.meta['use_local_timezone'] = use_local_timezone diff --git a/tests/test_scheduler.py b/tests/test_scheduler.py index 1af1ded..4aeb45d 100644 --- a/tests/test_scheduler.py +++ b/tests/test_scheduler.py @@ -101,6 +101,14 @@ def test_create_job_with_description(self): job_from_queue = Job.fetch(job.id, connection=self.testconn) self.assertEqual('description', job_from_queue.description) + def test_create_job_with_depends_on(self): + """ + Ensure that depends_on is passed to RQ. + """ + job = self.scheduler._create_job(say_hello, depends_on="dependency", args=(), kwargs={}) + job_from_queue = Job.fetch(job.id, connection=self.testconn) + self.assertEqual(["dependency"], job_from_queue._dependency_ids) + def test_create_job_with_timeout(self): """ Ensure that timeout is passed to RQ.