Skip to content

Commit

Permalink
Merge pull request #3989 from kobotoolbox/3988-celery-priority-queues
Browse files Browse the repository at this point in the history
Move background tasks for REST Services to their own queue
  • Loading branch information
jnm authored Sep 9, 2022
2 parents a86a6d9 + a2bc740 commit fdfcae7
Show file tree
Hide file tree
Showing 7 changed files with 31 additions and 6 deletions.
2 changes: 2 additions & 0 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ RUN mkdir -p "${NGINX_STATIC_DIR}" && \
mkdir -p ${CELERY_PID_DIR} && \
mkdir -p ${SERVICES_DIR}/uwsgi && \
mkdir -p ${SERVICES_DIR}/celery && \
mkdir -p ${SERVICES_DIR}/celery_low_priority && \
mkdir -p ${SERVICES_DIR}/celery_beat && \
mkdir -p "${INIT_PATH}"

Expand Down Expand Up @@ -155,6 +156,7 @@ RUN rm -rf /etc/runit/runsvdir/default/getty-tty*
# Create symlinks for runsv services
RUN ln -s "${KPI_SRC_DIR}/docker/run_uwsgi.bash" "${SERVICES_DIR}/uwsgi/run" && \
ln -s "${KPI_SRC_DIR}/docker/run_celery.bash" "${SERVICES_DIR}/celery/run" && \
ln -s "${KPI_SRC_DIR}/docker/run_celery_low_priority.bash" "${SERVICES_DIR}/celery_low_priority/run" && \
ln -s "${KPI_SRC_DIR}/docker/run_celery_beat.bash" "${SERVICES_DIR}/celery_beat/run"


Expand Down
5 changes: 3 additions & 2 deletions docker/run_celery.bash
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
set -e
source /etc/profile

# Run the main Celery worker (will not process `sync_kobocat_xforms` jobs).
# Run the main Celery worker (will not process low priority jobs).
# Start 2 processes by default; this will be overridden later, in Python code,
# according to the user's preference saved by django-constance
cd "${KPI_SRC_DIR}"
Expand All @@ -11,7 +11,8 @@ exec celery -A kobo worker --loglevel=info \
--hostname=kpi_main_worker@%h \
--logfile=${KPI_LOGS_DIR}/celery.log \
--pidfile=/tmp/celery.pid \
--exclude-queues=sync_kobocat_xforms_queue \
--queues=kpi_queue \
--exclude-queues=kpi_low_priority_queue \
--uid=${UWSGI_USER} \
--gid=${UWSGI_GROUP} \
--autoscale 2,2
18 changes: 18 additions & 0 deletions docker/run_celery_low_priority.bash
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
#!/bin/bash
set -e
source /etc/profile

# Run the main Celery worker (will process only low priority jobs).
# Start 2 processes by default; this will be overridden later, in Python code,
# according to the user's preference saved by django-constance
cd "${KPI_SRC_DIR}"

exec celery -A kobo worker --loglevel=info \
--hostname=kpi_main_worker@%h \
--logfile=${KPI_LOGS_DIR}/celery_low_priority.log \
--pidfile=/tmp/celery_low_priority.pid \
--queues=kpi_low_priority_queue \
--exclude-queues=kpi_queue \
--uid=${UWSGI_USER} \
--gid=${UWSGI_GROUP} \
--autoscale 2,2
2 changes: 1 addition & 1 deletion kobo/apps/hook/tests/test_email.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ def _create_periodic_task(self):
def test_notifications(self):
self._create_periodic_task()
first_log_response = self._send_and_fail()
failures_reports.delay()
failures_reports.apply_async(queue='kpi_low_priority_queue')
self.assertEqual(len(mail.outbox), 1)

expected_record = {
Expand Down
4 changes: 3 additions & 1 deletion kobo/apps/hook/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ def call_services(asset: 'kpi.models.asset.Asset', submission_id: int):
submission_id=submission_id, hook_id=hook_id
).exists():
success = True
service_definition_task.delay(hook_id, submission_id)
service_definition_task.apply_async(
queue='kpi_low_priority_queue', args=(hook_id, submission_id)
)

return success
4 changes: 3 additions & 1 deletion kobo/apps/hook/views/v2/hook.py
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,9 @@ def retry(self, request, uid=None, *args, **kwargs):
# Mark all logs as PENDING
HookLog.objects.filter(id__in=hooklogs_ids).update(status=HOOK_LOG_PENDING)
# Delegate to Celery
retry_all_task.delay(hooklogs_ids)
retry_all_task.apply_async(
queue='kpi_low_priority_queue', args=(hooklogs_ids,)
)
response.update({
"pending_uids": hooklogs_uids
})
Expand Down
2 changes: 1 addition & 1 deletion kobo/settings/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -613,7 +613,7 @@ def __init__(self, *args, **kwargs):
"send-hooks-failures-reports": {
"task": "kobo.apps.hook.tasks.failures_reports",
"schedule": crontab(hour=0, minute=0),
'options': {'queue': 'kpi_queue'}
'options': {'queue': 'kpi_low_priority_queue'}
},
}

Expand Down

0 comments on commit fdfcae7

Please sign in to comment.