Skip to content

Commit

Permalink
Add new function to handle etas and limits together (celery#4251)
Browse files Browse the repository at this point in the history
* Add new function to handle etas and limits together

* Adding unit test

* Fixing indentation
  • Loading branch information
arpanshah29 authored and ask committed Sep 12, 2017
1 parent 9ca61fa commit ca962fa
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 13 deletions.
6 changes: 6 additions & 0 deletions celery/worker/consumer/consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -305,6 +305,12 @@ def _limit_task(self, request, bucket, tokens):
return bucket.add(request)
return self._schedule_bucket_request(request, bucket, tokens)

def _limit_post_eta(self, request, bucket, tokens):
self.qos.decrement_eventually()
if bucket.contents:
return bucket.add(request)
return self._schedule_bucket_request(request, bucket, tokens)

def start(self):
blueprint = self.blueprint
while blueprint.state not in STOP_CONDITIONS:
Expand Down
34 changes: 21 additions & 13 deletions celery/worker/strategy.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ def default(task, app, consumer,
get_bucket = consumer.task_buckets.__getitem__
handle = consumer.on_task_request
limit_task = consumer._limit_task
limit_post_eta = consumer._limit_post_eta
body_can_be_buffer = consumer.pool.body_can_be_buffer
Request = symbol_by_name(task.Request)
Req = create_request_cls(Request, task, consumer.pool, hostname, eventer)
Expand Down Expand Up @@ -123,6 +124,8 @@ def task_message_handler(message, body, ack, reject, callbacks,
expires=req.expires and req.expires.isoformat(),
)

bucket = None
eta = None
if req.eta:
try:
if req.utc:
Expand All @@ -133,17 +136,22 @@ def task_message_handler(message, body, ack, reject, callbacks,
error("Couldn't convert ETA %r to timestamp: %r. Task: %r",
req.eta, exc, req.info(safe=True), exc_info=True)
req.reject(requeue=False)
else:
consumer.qos.increment_eventually()
call_at(eta, apply_eta_task, (req,), priority=6)
else:
if rate_limits_enabled:
bucket = get_bucket(task.name)
if bucket:
return limit_task(req, bucket, 1)
task_reserved(req)
if callbacks:
[callback(req) for callback in callbacks]
handle(req)

if rate_limits_enabled:
bucket = get_bucket(task.name)

if eta and bucket:
consumer.qos.increment_eventually()
return call_at(eta, limit_post_eta, (req, bucket, 1),
priority=6)
if eta:
consumer.qos.increment_eventually()
call_at(eta, apply_eta_task, (req,), priority=6)
return task_message_handler
if bucket:
return limit_task(req, bucket, 1)

task_reserved(req)
if callbacks:
[callback(req) for callback in callbacks]
handle(req)
return task_message_handler
15 changes: 15 additions & 0 deletions t/unit/worker/test_strategy.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,14 @@ def was_rate_limited(self):
assert not self.was_reserved()
return self.consumer._limit_task.called

def was_limited_with_eta(self):
assert not self.was_reserved()
called = self.consumer.timer.call_at.called
if called:
assert self.consumer.timer.call_at.call_args[0][1] == \
self.consumer._limit_post_eta
return called

def was_scheduled(self):
assert not self.was_reserved()
assert not self.was_rate_limited()
Expand Down Expand Up @@ -186,6 +194,13 @@ def test_when_rate_limited(self):
C()
assert C.was_rate_limited()

def test_when_rate_limited_with_eta(self):
task = self.add.s(2, 2).set(countdown=10)
with self._context(task, rate_limits=True, limit='1/m') as C:
C()
assert C.was_limited_with_eta()
C.consumer.qos.increment_eventually.assert_called_with()

def test_when_rate_limited__limits_disabled(self):
task = self.add.s(2, 2)
with self._context(task, rate_limits=False, limit='1/m') as C:
Expand Down

0 comments on commit ca962fa

Please sign in to comment.