Merge pull request #4443 from kobotoolbox/improve-import-export-tasks-deletion

Reduce the duration of long-running trash-bin tasks
jnm authored May 22, 2023
2 parents 2ffec2c + e2e0c03 commit 47cd19b
Showing 6 changed files with 149 additions and 5 deletions.
5 changes: 4 additions & 1 deletion kobo/apps/superuser_stats/tasks.py
@@ -227,7 +227,10 @@ def generate_domain_report(output_filename: str, start_date: str, end_date: str)
         writer.writerow(row)


-@shared_task(soft_time_limit=4200, time_limit=4260)
+@shared_task(
+    soft_time_limit=settings.CELERY_LONG_RUNNING_TASK_SOFT_TIME_LIMIT,
+    time_limit=settings.CELERY_LONG_RUNNING_TASK_TIME_LIMIT
+)
 def generate_forms_count_by_submission_range(output_filename: str):
     # List of submissions count ranges
     ranges = [
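The hard-coded 4200/4260-second limits move into settings (see kobo/settings/base.py below). A minimal sketch, assuming the new settings, of how Celery's two limits interact; the task body and helpers here are hypothetical, not part of this PR:

from celery import shared_task
from celery.exceptions import SoftTimeLimitExceeded
from django.conf import settings


@shared_task(
    soft_time_limit=settings.CELERY_LONG_RUNNING_TASK_SOFT_TIME_LIMIT,
    time_limit=settings.CELERY_LONG_RUNNING_TASK_TIME_LIMIT,
)
def example_long_report():  # hypothetical task, for illustration only
    try:
        do_heavy_work()  # placeholder for the real report generation
    except SoftTimeLimitExceeded:
        # The soft limit (default 4200 s) raises this exception inside the
        # task, leaving roughly 60 s for cleanup before the hard limit
        # (default 4260 s) terminates the worker process outright.
        clean_up_partial_output()  # hypothetical cleanup helper
        raise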
7 changes: 7 additions & 0 deletions kobo/apps/trash_bin/tasks.py
@@ -1,6 +1,7 @@
 import logging

 from celery.signals import task_failure, task_retry
+from django.conf import settings
 from django.contrib.auth import get_user_model
 from django.db import transaction
 from django.db.models.signals import post_delete
@@ -33,6 +34,9 @@
     retry_backoff_max=600,
     max_retries=5,
     retry_jitter=False,
+    queue='kpi_low_priority_queue',
+    soft_time_limit=settings.CELERY_LONG_RUNNING_TASK_SOFT_TIME_LIMIT,
+    time_limit=settings.CELERY_LONG_RUNNING_TASK_TIME_LIMIT,
 )
 def empty_account(account_trash_id: int):
     with transaction.atomic():
@@ -158,6 +162,9 @@ def empty_account(account_trash_id: int):
     retry_backoff_max=600,
     max_retries=5,
     retry_jitter=False,
+    queue='kpi_low_priority_queue',
+    soft_time_limit=settings.CELERY_LONG_RUNNING_TASK_SOFT_TIME_LIMIT,
+    time_limit=settings.CELERY_LONG_RUNNING_TASK_TIME_LIMIT,
 )
 def empty_project(project_trash_id: int):
     with transaction.atomic():
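Beyond the longer limits, both tasks now run on a dedicated low-priority queue, so multi-hour deletions cannot starve short tasks on the default queue. A hedged sketch of what that implies operationally (the worker invocation is an assumption, not taken from this PR):

# A worker must subscribe to the new queue or these tasks will never run;
# something along the lines of (hypothetical invocation):
#
#   celery -A kobo worker --queues=kpi_low_priority_queue
#
# Enqueueing needs no extra arguments, because queue='kpi_low_priority_queue'
# on the decorator is the task's default route:
from kobo.apps.trash_bin.tasks import empty_project

empty_project.apply_async(kwargs={'project_trash_id': 42})  # hypothetical id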
10 changes: 7 additions & 3 deletions kobo/apps/trash_bin/utils.py
@@ -5,8 +5,9 @@
 from datetime import timedelta

 from django.contrib.auth import get_user_model
+from django.conf import settings
 from django.db import IntegrityError, models, transaction
-from django.db.models import F
+from django.db.models import F, Q
 from django.db.models.signals import pre_delete
 from django.utils.timezone import now
 from django_celery_beat.models import (
@@ -38,19 +39,22 @@ def delete_asset(request_author: 'auth.User', asset: 'kpi.Asset'):

     asset_id = asset.pk
     asset_uid = asset.uid
+    host = settings.KOBOFORM_URL
     owner_username = asset.owner.username
     project_exports = []
     if asset.has_deployment:
         _delete_submissions(request_author, asset)
         asset.deployment.delete()
         project_exports = ExportTask.objects.filter(
-            data__source__endswith=f'/api/v2/assets/{asset.uid}/'
+            Q(data__source=f'{host}/api/v2/assets/{asset.uid}/')
+            | Q(data__source=f'{host}/assets/{asset.uid}/')
         )

     with transaction.atomic():
         # Delete imports
         ImportTask.objects.filter(
-            data__destination__endswith=f'/api/v2/assets/{asset.uid}/'
+            Q(data__destination=f'{host}/api/v2/assets/{asset.uid}/')
+            | Q(data__destination=f'{host}/assets/{asset.uid}/')
         ).delete()
         # Delete exports (and related files on storage)
         for export in project_exports:
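The filter rewrite is the heart of the speed-up: `__endswith` on a JSONField compiles to a leading-wildcard LIKE, which no index can serve, while exact equality on the extracted JSON value can be answered by the expression indexes this PR adds. A sketch of the two query shapes, with a hypothetical uid:

from django.conf import settings
from django.db.models import Q

from kpi.models.import_export_task import ExportTask

host = settings.KOBOFORM_URL
uid = 'aTESTuid123'  # hypothetical asset uid

# Old shape (assumed SQL): ("data" -> 'source') LIKE '%/api/v2/assets/<uid>/'
# which forces a sequential scan over every export ever created.
slow = ExportTask.objects.filter(data__source__endswith=f'/api/v2/assets/{uid}/')

# New shape: exact matches that the btree/hash expression indexes can serve,
# at the cost of pinning the URLs to the configured KOBOFORM_URL host.
fast = ExportTask.objects.filter(
    Q(data__source=f'{host}/api/v2/assets/{uid}/')
    | Q(data__source=f'{host}/assets/{uid}/')
)

The trade-off: records created under a different domain than the current KOBOFORM_URL will no longer match, which is presumably acceptable for this cleanup path.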
9 changes: 9 additions & 0 deletions kobo/settings/base.py
@@ -804,6 +804,15 @@ def __init__(self, *args, **kwargs):
 CELERY_BROKER_URL = os.environ.get('KPI_BROKER_URL', 'redis://localhost:6379/1')
 CELERY_RESULT_BACKEND = CELERY_BROKER_URL

+# Increase limits for long-running tasks
+# Note: these are custom names, not part of the `CELERY_*` namespace.
+CELERY_LONG_RUNNING_TASK_TIME_LIMIT = int(
+    os.environ.get('CELERY_LONG_RUNNING_TASK_TIME_LIMIT', 4260)  # seconds
+)
+
+CELERY_LONG_RUNNING_TASK_SOFT_TIME_LIMIT = int(
+    os.environ.get('CELERY_LONG_RUNNING_TASK_SOFT_TIME_LIMIT', 4200)  # seconds
+)

 ''' Django allauth configuration '''
 ACCOUNT_ADAPTER = 'kobo.apps.accounts.adapter.AccountAdapter'
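Because these are custom names outside the `CELERY_*` namespace that Celery reads automatically, they only take effect where a task decorator references them explicitly, as the trash-bin tasks above do. A minimal sketch (hypothetical values) of the environment override:

import os

# The environment supplies strings; the int() cast mirrors the settings code.
os.environ['CELERY_LONG_RUNNING_TASK_TIME_LIMIT'] = '7200'
limit = int(os.environ.get('CELERY_LONG_RUNNING_TASK_TIME_LIMIT', 4260))
assert limit == 7200  # falls back to 4260 seconds when the variable is unset

If the limits are overridden, the soft limit should stay comfortably below the hard one so tasks keep a cleanup window.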
101 changes: 101 additions & 0 deletions kpi/migrations/0050_add_indexes_to_import_and_export_tasks.py
@@ -0,0 +1,101 @@
# Generated by Django 3.2.15 on 2023-05-18 21:37

import django.contrib.postgres.indexes
import django.db.models.expressions
from django.conf import settings
from django.db import migrations


def manually_create_indexes_instructions(apps, schema_editor):
    print(
        """
        !!! ATTENTION !!!
        You need to run the SQL queries below in PostgreSQL directly:
        > CREATE INDEX CONCURRENTLY "data__destination_idx" ON "kpi_importtask" USING btree ((("data" -> 'destination')));
        > CREATE INDEX CONCURRENTLY "data__source_idx" ON "kpi_exporttask" USING btree ((("data" -> 'source')));
        > CREATE INDEX CONCURRENTLY "data__destination_hash_idx" ON "kpi_importtask" USING hash ((("data" -> 'destination')));
        > CREATE INDEX CONCURRENTLY "data__source_hash_idx" ON "kpi_exporttask" USING hash ((("data" -> 'source')));
        Otherwise, project deletions will perform very poorly.
        You may create one index per table simultaneously, i.e. you may run the
        first two queries in parallel (within different psql sessions) and then
        run the next two in parallel afterwards. You cannot run all of them
        at the same time.
        """
    )


def manually_drop_indexes_instructions(apps, schema_editor):
    print(
        """
        !!! ATTENTION !!!
        Run the SQL queries below in PostgreSQL directly:
        > DROP INDEX CONCURRENTLY IF EXISTS "data__destination_idx";
        > DROP INDEX CONCURRENTLY IF EXISTS "data__source_idx";
        > DROP INDEX CONCURRENTLY IF EXISTS "data__destination_hash_idx";
        > DROP INDEX CONCURRENTLY IF EXISTS "data__source_hash_idx";
        You may remove one index per table simultaneously, i.e. you may run the
        first two queries in parallel (within different psql sessions) and then
        run the next two in parallel afterwards. You cannot run all of them
        at the same time.
        """
    )


class Migration(migrations.Migration):

    dependencies = [
        ('kpi', '0049_add_pending_delete_to_asset'),
    ]

    if not settings.SKIP_HEAVY_MIGRATIONS:
        print(
            """
            This might take a while. If it is too slow, you may want to
            interrupt this migration, cancel any outstanding `CREATE…` or
            `DROP INDEX` queries on `kpi_importtask` and `kpi_exporttask`,
            re-run the migration with `SKIP_HEAVY_MIGRATIONS=True`, and then
            follow the printed instructions to set up the indexes concurrently
            (without downtime) using raw SQL.
            """
        )
        operations = [
            migrations.AddIndex(
                model_name='importtask',
                index=django.contrib.postgres.indexes.BTreeIndex(
                    django.db.models.expressions.F('data__destination'),
                    name='data__destination_idx',
                ),
            ),
            migrations.AddIndex(
                model_name='importtask',
                index=django.contrib.postgres.indexes.HashIndex(
                    django.db.models.expressions.F('data__destination'),
                    name='data__destination_hash_idx',
                ),
            ),
            migrations.AddIndex(
                model_name='exporttask',
                index=django.contrib.postgres.indexes.BTreeIndex(
                    django.db.models.expressions.F('data__source'),
                    name='data__source_idx',
                ),
            ),
            migrations.AddIndex(
                model_name='exporttask',
                index=django.contrib.postgres.indexes.HashIndex(
                    django.db.models.expressions.F('data__source'),
                    name='data__source_hash_idx',
                ),
            ),
        ]
    else:
        operations = [
            migrations.RunPython(
                manually_create_indexes_instructions,
                manually_drop_indexes_instructions,
            )
        ]
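For operators taking the `SKIP_HEAVY_MIGRATIONS=True` path, a hedged sketch of one way to confirm afterwards that all four indexes exist (the catalog query is standard PostgreSQL, not part of this PR):

from django.db import connection

EXPECTED = {
    'data__destination_idx',
    'data__destination_hash_idx',
    'data__source_idx',
    'data__source_hash_idx',
}

with connection.cursor() as cursor:
    cursor.execute(
        "SELECT indexname FROM pg_indexes "
        "WHERE tablename IN ('kpi_importtask', 'kpi_exporttask')"
    )
    found = {row[0] for row in cursor.fetchall()}

missing = EXPECTED - found
if missing:
    print(f'Still missing: {missing}')  # re-check the CREATE INDEX sessions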
22 changes: 21 additions & 1 deletion kpi/models/import_export_task.py
@@ -18,8 +18,10 @@
 import constance
 import requests
 from django.conf import settings
+from django.contrib.postgres.indexes import BTreeIndex, HashIndex
 from django.core.files.storage import FileSystemStorage
 from django.db import models, transaction
+from django.db.models import F
 from django.urls import reverse
 from django.utils.translation import gettext as t
 import formpack
@@ -36,7 +38,6 @@
 from formpack.utils.kobo_locking import get_kobo_locking_profiles
 from formpack.utils.string import ellipsize
 from private_storage.fields import PrivateFileField
-from pyxform import xls2json_backends
 from rest_framework import exceptions
 from werkzeug.http import parse_options_header
 from openpyxl.utils.exceptions import InvalidFileException
@@ -228,6 +229,16 @@ class ImportTask(ImportExportTask):
     ...although we probably would need to store the file in a blob
     """

+    class Meta(ImportExportTask.Meta):
+        indexes = [
+            BTreeIndex(
+                F('data__destination'), name='data__destination_idx'
+            ),
+            HashIndex(
+                F('data__destination'), name='data__destination_hash_idx'
+            ),
+        ]
+
     def _run_task(self, messages):
         self.status = self.PROCESSING
         self.save(update_fields=['status'])
@@ -569,6 +580,14 @@ def __str__(self):
     class Meta:
         abstract = True
         ordering = ['-date_created']
+        indexes = [
+            BTreeIndex(
+                F('data__source'), name='data__source_idx'
+            ),
+            HashIndex(
+                F('data__source'), name='data__source_hash_idx'
+            ),
+        ]

     def _build_export_filename(self, export, export_type):
         """
@@ -919,6 +938,7 @@ class ExportTask(ExportTaskBase):
"""
An asynchronous export task, to be run with Celery
"""

def _run_task(self, messages):
try:
source_url = self.data['source']
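Declaring the same indexes on the models keeps Django's migration state in sync with what migration 0050 (or the raw SQL) created; pairing a hash index with the btree one presumably gives PostgreSQL a compact equality-only option for long URL values. A quick hedged check that lookups actually use them, with a hypothetical URL:

from kpi.models.import_export_task import ImportTask

url = 'https://kf.example.org/api/v2/assets/aTESTuid123/'  # hypothetical
# QuerySet.explain() prints the PostgreSQL plan; an index or bitmap scan on
# data__destination_idx / data__destination_hash_idx confirms the indexes
# are being used instead of a sequential scan.
print(ImportTask.objects.filter(data__destination=url).explain())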
