Skip to content

Commit

Permalink
replace Celery with Dramatiq.
Browse files Browse the repository at this point in the history
  • Loading branch information
bdon committed Sep 26, 2019
1 parent f47084f commit bba151e
Show file tree
Hide file tree
Showing 14 changed files with 47 additions and 73 deletions.
7 changes: 2 additions & 5 deletions core/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,2 @@
from __future__ import absolute_import

# This will make sure the app is always imported when
# Django starts so that shared_task will use this app.
from .celery import app as celery_app # NOQA
import os
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'core.settings.project')
17 changes: 0 additions & 17 deletions core/celery.py

This file was deleted.

16 changes: 0 additions & 16 deletions core/settings/celery.py

This file was deleted.

16 changes: 6 additions & 10 deletions core/settings/project.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,11 @@
import os

import dj_database_url
import dramatiq
from dramatiq.brokers.redis import RedisBroker

from .base import *
from .celery import * # NOQA
from .contrib import *
from .utils import ABS_PATH
from hdx.hdx_configuration import Configuration

Expand All @@ -20,14 +22,13 @@
'utils',
)

redis_broker = RedisBroker(host="localhost", port=6379)
dramatiq.set_broker(redis_broker)

DATABASES = {}

DATABASES['default'] = dj_database_url.config(default='postgis:///exports', conn_max_age=500)

CELERY_IMPORTS = (
'tasks.task_runners'
)

TEMPLATES = [
{
'BACKEND': 'django.template.backends.django.DjangoTemplates',
Expand Down Expand Up @@ -158,11 +159,6 @@
'propagate': True,
'level': 'DEBUG',
},
'celery.task': {
'handlers': ['console'],
'propagate': True,
'level': 'DEBUG',
},
'jobs': {
'handlers': ['console'],
'propagate': True,
Expand Down
3 changes: 1 addition & 2 deletions doc/setup-development.md
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ yarn start # will watch for changes and re-compile as necessary
DEBUG=True python manage.py runserver

# in a different shell
DEBUG=True celery -A core worker
DEBUG=True DJANGO_SETTINGS_MODULE=core.settings.project dramatiq tasks.task_runners -p 1
```

[`direnv`](https://direnv.net/) is a useful tool for managing environment variables using a `.env` file.
Expand Down Expand Up @@ -110,7 +110,6 @@ Most of these environment variables have reasonable default settings.
* `OSMAND_MAP_CREATOR_DIR` path to directory where OsmAndMapCreator is installed
* `GARMIN_CONFIG`, `GARMIN_MKGMAP` absolute paths to garmin JARs
* `OVERPASS_API_URL` url of Overpass api endpoint
* `BROKER_URL` Celery broker URL. Defaults to `amqp://guest:guest@localhost:5672/`
* `DATABASE_URL` Database URL. Defaults to `postgres:///exports`
* `DEBUG` Whether to enable debug mode. Defaults to `False` (production).
* `DJANGO_ENV` Django environment. Set to `development` to enable development tools and email logging to console.
Expand Down
2 changes: 1 addition & 1 deletion jobs/management/commands/schedule.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ def handle(self, *args, **kwargs):
now.day == 1 and
region.schedule_hour == now.hour and
delta > timedelta(hours=2)):
ExportTaskRunner().run_task(job_uid=region.job.uid,queue="celery-scheduled")
ExportTaskRunner().run_task(job_uid=region.job.uid,ondemand=False)



4 changes: 2 additions & 2 deletions ops/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,8 @@ Determine the disk UUID with `sudo -i blkid` and add it to fstab to mount on boo

### Logging

Systemd's `journalctl` should be used to view logs. To tail celery logs, run: `journalctl -fu
docker.celery`.
Systemd's `journalctl` should be used to view logs. To view worker logs, run: `journalctl -fu
worker-ondemand` or `worker-scheduled`.

### Backups

Expand Down
8 changes: 4 additions & 4 deletions ops/packer/packer.json
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,13 @@
},
{
"type":"file",
"source": "celery.service",
"destination": "/tmp/celery.service"
"source": "worker-ondemand.service",
"destination": "/tmp/worker-ondemand.service"
},
{
"type":"file",
"source": "celery-scheduled.service",
"destination": "/tmp/celery-scheduled.service"
"source": "worker-scheduled.service",
"destination": "/tmp/worker-scheduled.service"
},
{
"type":"file",
Expand Down
4 changes: 2 additions & 2 deletions ops/packer/provision.sh
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ su - postgres -c 'psql exports -c "create extension hstore;"'

mv /tmp/nginx.conf /etc/nginx/nginx.conf
mv /tmp/django.service /etc/systemd/system/django.service
mv /tmp/celery.service /etc/systemd/system/celery.service
mv /tmp/celery-scheduled.service /etc/systemd/system/celery-scheduled.service
mv /tmp/worker-ondemand.service /etc/systemd/system/worker-ondemand.service
mv /tmp/worker-scheduled.service /etc/systemd/system/worker-scheduled.service

yarn global add tl @mapbox/mbtiles @mapbox/tilelive @mapbox/tilejson tilelive-http --prefix /usr/local/
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
[Unit]
Description=Celery task worker
Description=On-demand tasks
After=syslog.target

[Service]
Expand All @@ -19,9 +19,10 @@ Environment=GENERATOR_TOOL=/usr/local/bin/generator_tool
Environment=SYNC_TO_HDX=True
Environment=HDX_SITE=demo
Environment=HDX_API_KEY=
DJANGO_SETTINGS_MODULE=core.settings.project
User=exports
WorkingDirectory=/home/exports/osm-export-tool/
ExecStart=/home/exports/venv/bin/celery -A core worker -c 3
ExecStart=/home/exports/venv/bin/dramatiq tasks.task_runners --processes 3 --threads 1 --queues default
Restart=on-failure

[Install]
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
[Unit]
Description=Celery task worker
Description=Scheduled tasks
After=syslog.target

[Service]
Expand All @@ -16,9 +16,10 @@ Environment=GARMIN_MKGMAP=/usr/local/mkgmap/mkgmap.jar
Environment=OSMAND_MAP_CREATOR_DIR=/usr/local/OsmAndMapCreator
Environment=OSMCONVERT=/usr/bin/osmconvert
Environment=GENERATOR_TOOL=/usr/local/bin/generator_tool
Environment=DJANGO_SETTINGS_MODULE=core.settings.project
User=exports
WorkingDirectory=/home/exports/osm-export-tool/
ExecStart=/home/exports/venv/bin/celery -A core worker -Q celery-scheduled -c 1 -n scheduled
ExecStart=/home/exports/venv/bin/dramatiq tasks.task_runners --processes 1 --threads 1 --queues scheduled
Restart=on-failure

[Install]
Expand Down
4 changes: 1 addition & 3 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -1,14 +1,11 @@
Django~=1.11.3
billiard==3.6.0
cachetools
celery==4.3.0
dj_database_url
django-oauth-toolkit==1.1.2
django-cors-middleware
djangorestframework-gis~=0.11.2
djangorestframework~=3.6.3
hdx-python-api==2.1.5
kombu==4.4.0
mercantile~=0.10.0
psycopg2==2.8.3
python-openid==2.2.5
Expand All @@ -23,3 +20,4 @@ pyparsing==2.2.0
oauthlib==2.0.7
gunicorn==19.9.0
osmium~=2.15.2
dramatiq[redis,watch]~=1.7.0
2 changes: 1 addition & 1 deletion setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,4 @@ skip = migrations, static
not_skip = __init__.py
sections = FUTURE,STDLIB,THIRDPARTY,DJANGO,DJANGOTHIRDPARTY,FIRSTPARTY,LOCALFOLDER
known_django = django
known_djangothirdparty = celery, rest_framework, django_fitters
known_djangothirdparty = rest_framework, django_fitters
27 changes: 21 additions & 6 deletions tasks/task_runners.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,19 @@
import os
import shutil
import traceback

import dramatiq
import django
from django.apps import apps
from django.conf import settings

if not apps.ready and not settings.configured:
django.setup()

from django.contrib.gis.geos import GEOSGeometry
from django.utils import timezone
from django.utils.text import slugify
from django.db import IntegrityError

from celery import shared_task
from raven import Client

from jobs.models import Job, HDXExportRegion
Expand All @@ -35,7 +40,7 @@
LOG = logging.getLogger(__name__)

class ExportTaskRunner(object):
def run_task(self, job_uid=None, user=None, queue="celery"): # noqa
def run_task(self, job_uid=None, user=None, ondemand=True): # noqa
LOG.debug('Running Job with id: {0}'.format(job_uid))
job = Job.objects.get(uid=job_uid)
if not user:
Expand All @@ -53,11 +58,21 @@ def run_task(self, job_uid=None, user=None, queue="celery"): # noqa
)
LOG.debug('Saved task: {0}'.format(format_name))

run_task_remote.apply_async([run_uid],queue=queue)
if ondemand:
run_task_async_ondemand.send(run_uid)
else:
run_task_async_scheduled.send(run_uid)
return run

@shared_task(bind=True, ignore_result=True)
def run_task_remote(self, run_uid): # noqa
@dramatiq.actor(max_retries=0,queue_name='default')
def run_task_async_ondemand(run_uid):
run_task_remote(run_uid)

@dramatiq.actor(max_retries=0,queue_name='scheduled')
def run_task_async_scheduled(run_uid):
run_task_remote(run_uid)

def run_task_remote(run_uid):
stage_dir = os.path.join(settings.EXPORT_STAGING_ROOT, str(run_uid)) + '/'
try:
run = ExportRun.objects.get(uid=run_uid)
Expand Down

0 comments on commit bba151e

Please sign in to comment.