Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DBTP-743 - Add celery beat service #57

Merged
merged 12 commits into from
Mar 15, 2024
Merged
1 change: 1 addition & 0 deletions Procfile
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
web: python migrate.py && python manage.py load_defaults && opentelemetry-instrument gunicorn -b 0.0.0.0:$PORT demodjango.wsgi:application
celery-worker: celery --app demodjango.celery worker --task-events --loglevel INFO
celery-beat: celery --app demodjango.celery beat --loglevel INFO
check: python manage.py check
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,15 @@ By default, it will use SQLite for the backend database and the app will work fi
To connect to an Aurora Postgres instance, set the following env var:

```
DATABASE_CREDENTIALS = DATABASE_CREDENTIALS_STRING
AURORA_POSTGRES_CREDENTIALS = AURORA_POSTGRES_CREDENTIALS_STRING
```

## RDS Postgres

To connect to an RDS Postgres instance, set the following env var:

```
RDS_DATABASE_CREDENTIALS = "{"db_credential_key": "db_credential_value"}"
RDS_POSTGRES_CREDENTIALS = "{"db_credential_key": "db_credential_value"}"
```

## Redis
Expand Down
21 changes: 21 additions & 0 deletions app/migrations/0003_scheduledtask.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
# Generated by Django 4.2.9 on 2024-03-12 11:30

from django.db import migrations, models


class Migration(migrations.Migration):

dependencies = [
('app', '0002_sampletable_sample_email'),
]

operations = [
migrations.CreateModel(
name='ScheduledTask',
fields=[
('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
('taskid', models.CharField(max_length=50)),
('timestamp', models.DateTimeField()),
],
),
]
5 changes: 5 additions & 0 deletions app/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,8 @@ class SampleTable(models.Model):
sampleid = models.CharField(max_length=90)
sample_name = models.CharField(max_length=60)
sample_email = models.EmailField(max_length=256, null=True)


class ScheduledTask(models.Model):
taskid = models.CharField(max_length=50)
timestamp = models.DateTimeField()
32 changes: 30 additions & 2 deletions app/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,13 @@

from celery_worker.tasks import demodjango_task
from .check.check_http import HTTPCheck

from .util import render_connection_info

logger = logging.getLogger("django")

CELERY = 'celery'
BEAT = 'beat'
GIT_INFORMATION = 'git_information'
OPENSEARCH = 'opensearch'
POSTGRES_AURORA = 'postgres_aurora'
Expand All @@ -30,6 +32,7 @@
HTTP_CONNECTION = 'http'

ALL_CHECKS = {
BEAT: 'Celery Beat',
CELERY: 'Celery Worker',
GIT_INFORMATION: 'Git information',
OPENSEARCH: 'OpenSearch',
Expand All @@ -42,6 +45,8 @@
HTTP_CONNECTION: 'HTTP Checks',
}

RDS_POSTGRES_CREDENTIALS = os.environ.get("RDS_POSTGRES_CREDENTIALS", "")


def index(request):
logger.info("Rendering landing page")
Expand All @@ -62,6 +67,7 @@ def index(request):
S3: s3_bucket_check,
OPENSEARCH: opensearch_check,
CELERY: celery_worker_check,
BEAT: celery_beat_check,
HTTP_CONNECTION: http_check,
}

Expand Down Expand Up @@ -91,7 +97,10 @@ def server_time_check():
def postgres_rds_check():
addon_type = ALL_CHECKS[POSTGRES_RDS]
try:
with connections['rds'].cursor() as c:
if not RDS_POSTGRES_CREDENTIALS:
raise Exception("No RDS database")

with connections['default'].cursor() as c:
c.execute('SELECT version()')
return render_connection_info(addon_type, True, c.fetchone()[0])
except Exception as e:
Expand All @@ -111,7 +120,11 @@ def postgres_aurora_check():
def sqlite_check():
addon_type = ALL_CHECKS[SQLITE]
try:
with connections['default'].cursor() as c:
db_name = "default"
if RDS_POSTGRES_CREDENTIALS:
db_name = "sqlite"

with connections[db_name].cursor() as c:
c.execute('SELECT SQLITE_VERSION()')
return render_connection_info(addon_type, True, c.fetchone()[0])
except Exception as e:
Expand Down Expand Up @@ -193,6 +206,21 @@ def get_result_from_celery_backend():
return render_connection_info(addon_type, False, str(e))


def celery_beat_check():
from .models import ScheduledTask
addon_type = ALL_CHECKS[BEAT]

try:
if not RDS_POSTGRES_CREDENTIALS:
raise Exception("Database not found")

latest_task = ScheduledTask.objects.all().order_by('-timestamp').first()
connection_info = f"Latest task scheduled with task_id {latest_task.taskid} at {latest_task.timestamp}"
return render_connection_info(addon_type, True, connection_info)
except Exception as e:
return render_connection_info(addon_type, False, str(e))


def git_information():
git_commit = os.environ.get("GIT_COMMIT", "Unknown")
git_branch = os.environ.get("GIT_BRANCH", "Unknown")
Expand Down
17 changes: 17 additions & 0 deletions celery_worker/tasks.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,28 @@
from datetime import datetime
import logging

from celery import shared_task


logger = logging.getLogger("django")


@shared_task()
def demodjango_task(timestamp):
logger.info("Running demodjango_task")
return f"demodjango_task queued at {timestamp}"


@shared_task(bind=True)
def demodjango_scheduled_task(self):
from app.models import ScheduledTask

timestamp = datetime.utcnow()

task = ScheduledTask()
task.taskid = self.request.id
task.timestamp = timestamp
task.save()

logger.info(f"Running demodjango_scheduled_task")
return f"demodjango_scheduled_task queued at {timestamp}"
7 changes: 7 additions & 0 deletions demodjango/celery.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,10 @@
celery_app.autodiscover_tasks()

celery_app = healthcheck.setup(celery_app)

celery_app.conf.beat_schedule = {
"schedule-demodjango-task": {
"task": "celery_worker.tasks.demodjango_scheduled_task",
"schedule": 30.0,
},
}
40 changes: 24 additions & 16 deletions demodjango/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@
# Application definition

INSTALLED_APPS = [
'django_celery_beat',
'django.contrib.admin',
'django.contrib.auth',
'django.contrib.contenttypes',
Expand Down Expand Up @@ -149,25 +150,31 @@

sqlite_db_root = BASE_DIR if is_copilot() else Path(tempfile.gettempdir())

DATABASES = {
'default': {
'ENGINE': 'django.db.backends.sqlite3',
'NAME': sqlite_db_root / "demodjango.sqlite3",
# Django requires a default database. If RDS is present make it the default
# database to enable celery-beat, otherwise use SQLite
RDS_POSTGRES_CREDENTIALS = os.getenv("RDS_POSTGRES_CREDENTIALS", "")
if RDS_POSTGRES_CREDENTIALS:
DATABASES = {
"default": dj_database_url.config(
default=database_url_from_env("RDS_POSTGRES_CREDENTIALS")
),
"sqlite": {
'ENGINE': 'django.db.backends.sqlite3',
'NAME': sqlite_db_root / "demodjango.sqlite3",
}
}
else:
DATABASES = {
'default': {
'ENGINE': 'django.db.backends.sqlite3',
'NAME': sqlite_db_root / "demodjango.sqlite3",
}
}
}

RDS_DATABASE_CREDENTIALS = os.getenv("RDS_DATABASE_CREDENTIALS", "")

if RDS_DATABASE_CREDENTIALS:
DATABASES["rds"] = dj_database_url.config(
default=database_url_from_env("RDS_DATABASE_CREDENTIALS")
)

DATABASE_CREDENTIALS = os.getenv("DATABASE_CREDENTIALS", "")

if DATABASE_CREDENTIALS:
AURORA_POSTGRES_CREDENTIALS = os.getenv("AURORA_POSTGRES_CREDENTIALS", "")
if AURORA_POSTGRES_CREDENTIALS:
DATABASES['aurora'] = dj_database_url.config(
default=database_url_from_env("DATABASE_CREDENTIALS")
default=database_url_from_env("AURORA_POSTGRES_CREDENTIALS")
)

# Password validation
Expand Down Expand Up @@ -222,3 +229,4 @@
CELERY_ACCEPT_CONTENT = ["application/json"]
CELERY_RESULT_SERIALIZER = "json"
CELERY_BROKER_CONNECTION_RETRY_ON_STARTUP = True
CELERY_BEAT_SCHEDULER = "django_celery_beat.schedulers.DatabaseScheduler"
26 changes: 24 additions & 2 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,11 @@ services:
environment:
ALLOWED_HOSTS: '*'
AWS_ENDPOINT_URL: 'http://s3:9090'
DATABASE_CREDENTIALS: '{"password":"pgSecretPassword","dbname":"main","engine":"postgres","port":5432,"dbInstanceIdentifier":"xxx","host":"postgres-aurora","username":"postgres"}'
AURORA_POSTGRES_CREDENTIALS: '{"password":"pgSecretPassword","dbname":"main","engine":"postgres","port":5432,"dbInstanceIdentifier":"xxx","host":"postgres-aurora","username":"postgres"}'
DEBUG: true
DJANGO_SECRET_KEY: this_is_an_example_use_a_proper_key_in_production
OPENSEARCH_ENDPOINT: 'http://opensearch:9200'
RDS_DATABASE_CREDENTIALS: '{"password":"pgSecretPassword","dbname":"main","engine":"postgres","port":5432,"dbInstanceIdentifier":"xxx","host":"postgres-rds","username":"postgres"}'
RDS_POSTGRES_CREDENTIALS: '{"password":"pgSecretPassword","dbname":"main","engine":"postgres","port":5432,"dbInstanceIdentifier":"xxx","host":"postgres-rds","username":"postgres"}'
REDIS_ENDPOINT: 'redis://redis:6379'
S3_BUCKET_NAME: test-bucket
AWS_ACCESS_KEY_ID: access-key-id
Expand Down Expand Up @@ -66,11 +66,33 @@ services:
start_period: 5s
depends_on:
- redis
- postgres-rds
environment:
REDIS_ENDPOINT: redis://redis:6379
DEBUG: true
DJANGO_SECRET_KEY: this_is_an_example_use_a_proper_key_in_production
DJANGO_SETTINGS_MODULE: demodjango.settings
RDS_POSTGRES_CREDENTIALS: '{"password":"pgSecretPassword","dbname":"main","engine":"postgres","port":5432,"dbInstanceIdentifier":"xxx","host":"postgres-rds","username":"postgres"}'

celery-beat:
build:
context: .
cache_from:
- demodjango/application:latest
image: demodjango/application:latest
command: celery --app demodjango.celery beat --loglevel INFO
entrypoint: ''
volumes:
- .:/app
depends_on:
- redis
- postgres-rds
environment:
REDIS_ENDPOINT: redis://redis:6379
DEBUG: true
DJANGO_SECRET_KEY: this_is_an_example_use_a_proper_key_in_production
DJANGO_SETTINGS_MODULE: demodjango.settings
RDS_POSTGRES_CREDENTIALS: '{"password":"pgSecretPassword","dbname":"main","engine":"postgres","port":5432,"dbInstanceIdentifier":"xxx","host":"postgres-rds","username":"postgres"}'

postgres-rds:
image: postgres
Expand Down
2 changes: 1 addition & 1 deletion entrypoint.sh
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
dockerize -wait tcp://opensearch:9200 -timeout 60s

python manage.py migrate
python manage.py migrate --database rds
python manage.py migrate --database sqlite
python manage.py migrate --database aurora
python manage.py load_defaults

Expand Down
7 changes: 5 additions & 2 deletions migrate.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,14 @@
import subprocess

ACTIVE_CHECKS = [x.strip() for x in os.getenv("ACTIVE_CHECKS", "").split(",")]
RDS_POSTGRES_CREDENTIALS = os.getenv("RDS_POSTGRES_CREDENTIALS", "")

migrations = ["python manage.py migrate"]
if RDS_POSTGRES_CREDENTIALS:
migrations = ["python manage.py migrate", "python manage.py migrate --database sqlite"]
else:
migrations = ["python manage.py migrate"]

optional_migrations = {
"postgres_rds": "python manage.py migrate --database rds",
"postgres_aurora": "python manage.py migrate --database aurora",
}

Expand Down
Loading