Skip to content

Commit

Permalink
feat(migrations): introduce long-running migration system using Celery (
Browse files Browse the repository at this point in the history
#5379)

### 📣 Summary
Added a new system for handling long-running migrations using Celery.

### 📖 Description
A new long-running migration system has been implemented, leveraging
Celery to process migrations in the background. This system is designed
to handle large-scale data updates that would otherwise cause require
significant downtime with regular Django migrations

### 💭 Notes

This long-running migration system leverages Celery for asynchronous
task processing. However, a similar (and likely more robust) feature is
being developed in Django, as detailed in [DEP 14 - Background
Workers](https://github.com/django/deps/blob/main/accepted/0014-background-workers.rst).
Once Django officially releases its built-in background worker system,
this custom solution will be phased out in favor of the native
implementation to better align with the Django ecosystem.
  • Loading branch information
noliveleger authored Dec 18, 2024
1 parent edd6352 commit c71dde8
Show file tree
Hide file tree
Showing 21 changed files with 472 additions and 1 deletion.
1 change: 1 addition & 0 deletions dependencies/pip/dev_requirements.in
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ pytest
pytest-cov
pytest-django
pytest-env
freezegun


# Kobocat
Expand Down
3 changes: 3 additions & 0 deletions dependencies/pip/dev_requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,8 @@ fabric==3.2.2
# via -r dependencies/pip/dev_requirements.in
flower==2.0.1
# via -r dependencies/pip/requirements.in
freezegun==1.5.1
# via -r dependencies/pip/dev_requirements.in
frozenlist==1.4.1
# via
# aiohttp
Expand Down Expand Up @@ -488,6 +490,7 @@ python-dateutil==2.9.0.post0
# -r dependencies/pip/requirements.in
# botocore
# celery
# freezegun
# pandas
# python-crontab
python3-openid==3.2.0
Expand Down
71 changes: 71 additions & 0 deletions kobo/apps/long_running_migrations/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
# Long Running Migrations

This feature allows you to execute long-running migrations using Celery. Each migration will attempt to complete within the maximum time allowed by Celery (see settings.CELERY_LONG_RUNNING_TASK_TIME_LIMIT`). If it does not complete within this time, the periodic task will retry and resume the migration from where it left off, continuing until the long-running migration either successfully completes or raises an exception.

## How to Use

1. **Create your migration**
Define your migrations in the `jobs` folder. Each migration should have a unique name, following Django's migration naming convention (e.g., `0001_description`). The migration file must contain a function called `run()`.

2. **Register the migration**
Create a `LongRunningMigration` entry by running:

```python
LongRunningMigration.objects.create(name='0001_sample')
```

You can automate this step by adding it to a Django migration with `RunPython`.

```python
from django.db import migrations


def add_long_running_migration(apps, schema_editor):
LongRunningMigration = apps.get_model('long_running_migrations', 'LongRunningMigration') # noqa
LongRunningMigration.objects.create(
name='0001_sample'
)


def noop(*args, **kwargs):
pass


class Migration(migrations.Migration):

dependencies = [
('long_running_migrations', '0001_initial'),
]

operations = [
migrations.RunPython(add_long_running_migration, noop),
]


```



3. **Execute the migration**
Wait for the periodic task `execute_long_running_migrations` to run automatically or trigger it manually (beware of the lock, it can only run one at a time).


## Writing a good long-running migration

When writing long-running migrations, ensure they are both **atomic** and **tolerant** to interruptions at any point in their execution.

```python
# 2024-10-13
from django.db import transaction

def run():
for foo in Foo.objects.filter(is_barred=False): # Checks actually needs to run still
with transaction.atomic(): # Atomic!
foo.make_it_bar() # Perhaps this does multiple things that could succeed or fail
```

* Notice that if the task is interrupted, it will simply continue in the next run.

* Because tasks are slow, your code should run regardless of when the data migration takes place.

* Add a timestamp to your migration definition to help future developers identify when it can be safely removed (if needed).
Empty file.
8 changes: 8 additions & 0 deletions kobo/apps/long_running_migrations/admin.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
from django.contrib import admin

from .models import LongRunningMigration


@admin.register(LongRunningMigration)
class LongRunningMigrationAdmin(admin.ModelAdmin):
readonly_fields=('date_created', 'date_modified')
8 changes: 8 additions & 0 deletions kobo/apps/long_running_migrations/jobs/0001_sample.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
# Generated on 2024-12-18

def run():
"""
Describe your long-running migration
"""

pass
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
# Generated on 2024-12-18
from django.db.models import Q, OuterRef, Subquery

from kobo.apps.openrosa.apps.logger.models import XForm
from kobo.apps.openrosa.apps.main.models import MetaData
from kpi.models.asset import Asset
from kpi.models.asset_file import AssetFile
from kobo.apps.project_ownership.models import Transfer


def run():
"""
Update OpenRosa MetaData objects that were not updated when project
ownership was transferred to someone else. This fixes a bug introduced
and later addressed in KPI (issue #5365).
"""

# Step 1: Retrieve all assets that were transferred since the bug was present and
# use media files
asset_uids = Asset.objects.filter(
Q(
pk__in=AssetFile.objects.values_list('asset_id', flat=True).exclude(
file_type=AssetFile.PAIRED_DATA
)
)
& Q(
pk__in=Transfer.objects.values_list('asset_id', flat=True).filter(
invite__date_created__date__gte='2024-09-15'
)
)
).values_list('uid', flat=True)

username_subquery = XForm.objects.filter(pk=OuterRef('xform_id')).values(
'user__username'
)[:1]

# Step 2: Iterate through relevant MetaData objects and fix their data_file fields
for metadata in (
MetaData.objects.filter(
xform_id__in=XForm.objects.filter(
kpi_asset_uid__in=list(asset_uids)
),
)
.exclude(
Q(data_file__startswith=Subquery(username_subquery))
| Q(data_file__isnull=True)
| Q(data_file='')
)
.select_related('xform', 'xform__user')
.iterator()
):
data_file = str(metadata.data_file)
old_username, *other_parts = data_file.split('/')
other_parts.insert(0, metadata.xform.user.username)
metadata.data_file = '/'.join(other_parts)
metadata.save(update_fields=['data_file'])
Empty file.
29 changes: 29 additions & 0 deletions kobo/apps/long_running_migrations/migrations/0001_initial.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
# Generated by Django 4.2.15 on 2024-12-13 19:53
from django.db import migrations, models

import kpi.models.abstract_models


class Migration(migrations.Migration):

initial = True

dependencies = [
]

operations = [
migrations.CreateModel(
name='LongRunningMigration',
fields=[
('id', models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
('date_created', models.DateTimeField(default=kpi.models.abstract_models._get_default_datetime)),
('date_modified', models.DateTimeField(default=kpi.models.abstract_models._get_default_datetime)),
('name', models.CharField(max_length=255, unique=True)),
('status', models.CharField(choices=[('created', 'Created'), ('in_progress', 'In Progress'), ('failed', 'Failed'), ('completed', 'Completed')], default='created', max_length=20)),
('attempts', models.PositiveSmallIntegerField(default=0)),
],
options={
'abstract': False,
},
),
]
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
# Generated by Django 4.2.15 on 2024-12-18 20:00

from django.db import migrations


def add_long_running_migration(apps, schema_editor):
LongRunningMigration = apps.get_model('long_running_migrations', 'LongRunningMigration') # noqa
LongRunningMigration.objects.create(
name='0002_fix_project_ownership_transfer_with_media_files'
)


def noop(*args, **kwargs):
pass


class Migration(migrations.Migration):

dependencies = [
('long_running_migrations', '0001_initial'),
]

operations = [
migrations.RunPython(add_long_running_migration, noop),
]
Empty file.
108 changes: 108 additions & 0 deletions kobo/apps/long_running_migrations/models.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
import os
from importlib.util import module_from_spec, spec_from_file_location

from django.conf import settings
from django.core.exceptions import SuspiciousOperation
from django.db import models

from kpi.models.abstract_models import AbstractTimeStampedModel
from kpi.utils.log import logging


class LongRunningMigrationStatus(models.TextChoices):
CREATED = 'created'
IN_PROGRESS = 'in_progress'
FAILED = 'failed'
COMPLETED = 'completed'


class LongRunningMigration(AbstractTimeStampedModel):

LONG_RUNNING_MIGRATIONS_DIR = os.path.join(
'kobo',
'apps',
'long_running_migrations',
'jobs'
)

name = models.CharField(max_length=255, unique=True)
status = models.CharField(
default=LongRunningMigrationStatus.CREATED,
choices=LongRunningMigrationStatus.choices,
max_length=20,
)
attempts = models.PositiveSmallIntegerField(default=0)

def clean(self):
super().clean()
if '..' in self.name or '/' in self.name or '\\' in self.name:
raise SuspiciousOperation(
f"Invalid migration name '{self.name}'. "
f"Migration names cannot contain directory traversal characters "
f"such as '..', '/', or '\\'."
)

def execute(self):
# Skip execution if the migration is already completed
if self.status == LongRunningMigrationStatus.COMPLETED:
return

if not (module := self._load_module()):
return

self.status = LongRunningMigrationStatus.IN_PROGRESS
self.attempts += 1
self.save(update_fields=['status', 'attempts'])

try:
module.run()
except Exception as e:
# Log the error and update the status to 'failed'
logging.error(f'LongRunningMigration.execute(): {str(e)}')
self.status = LongRunningMigrationStatus.FAILED
self.save(update_fields=['status'])
return

self.status = LongRunningMigrationStatus.COMPLETED
self.save(update_fields=['status'])

def save(self, **kwargs):

self.clean()

if self._state.adding:
file_path = os.path.join(
settings.BASE_DIR, self.LONG_RUNNING_MIGRATIONS_DIR, f'{self.name}.py'
)
if not os.path.exists(file_path):
raise ValueError('Task does not exist in tasks directory')
super().save(**kwargs)

def _load_module(self):
"""
This function allows you to load a Python module from a file path even if
the module's name does not follow Python's standard naming conventions
(e.g., starting with numbers or containing special characters). Normally,
Python identifiers must adhere to specific rules, but this method bypasses
those restrictions by dynamically creating a module from its file.
"""
module_path = f'{self.LONG_RUNNING_MIGRATIONS_DIR}/{self.name}.py'
if not os.path.exists(f'{settings.BASE_DIR}/{module_path}'):
logging.error(
f'LongRunningMigration._load_module():'
f'File not found `{module_path}`'
)
return

spec = spec_from_file_location(self.name, module_path)
try:
module = module_from_spec(spec)
except (ModuleNotFoundError, AttributeError):
logging.error(
f'LongRunningMigration._load_module():'
f'Failed to import migration module `{self.name}`'
)
return

spec.loader.exec_module(module)
return module
41 changes: 41 additions & 0 deletions kobo/apps/long_running_migrations/tasks.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
from kobo.celery import celery_app

from django.conf import settings
from django.core.cache import cache
from django.db.models import Q
from django.utils import timezone
from dateutil.relativedelta import relativedelta

from .models import LongRunningMigration, LongRunningMigrationStatus


@celery_app.task(
queue='kpi_low_priority_queue',
soft_time_limit=settings.CELERY_LONG_RUNNING_TASK_SOFT_TIME_LIMIT,
time_limit=settings.CELERY_LONG_RUNNING_TASK_TIME_LIMIT,
)
def execute_long_running_migrations():
lock_key = 'execute_long_running_migrations'

if cache.add(
lock_key, 'true', timeout=settings.CELERY_LONG_RUNNING_TASK_TIME_LIMIT
):
try:
# Adding an offset to account for potential delays in task execution and
# clock drift between the Celery workers and the database, ensuring tasks
# are not prematurely skipped.
offset = 5 * 60
task_expiry_time = timezone.now() - relativedelta(
seconds=settings.CELERY_LONG_RUNNING_TASK_TIME_LIMIT + offset
)
# Run tasks that were just created or are in progress but have exceeded
# the maximum runtime allowed for a Celery task, causing Celery to terminate
# them and raise a SoftTimeLimitExceeded exception.
for migration in LongRunningMigration.objects.filter(
Q(status=LongRunningMigrationStatus.CREATED)
| Q(status=LongRunningMigrationStatus.IN_PROGRESS)
& Q(date_modified__lte=task_expiry_time)
).order_by('date_created'):
migration.execute()
finally:
cache.delete(lock_key)
Empty file.
Empty file.
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
def run():
raise Exception
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
def run():
print('hello from long running migration')
Loading

0 comments on commit c71dde8

Please sign in to comment.