Skip to content

Commit

Permalink
Merge pull request #5135 from kobotoolbox/fix-transfer-ownership-race…
Browse files Browse the repository at this point in the history
…-condition

Fix project transfer failure in some circumstances
  • Loading branch information
noliveleger authored Oct 1, 2024
2 parents abe5c07 + 041aeb9 commit 2ff37e7
Show file tree
Hide file tree
Showing 10 changed files with 172 additions and 31 deletions.
4 changes: 2 additions & 2 deletions kobo/apps/openrosa/apps/logger/models/attachment.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
# coding: utf-8
import mimetypes
import os

Expand All @@ -10,6 +9,7 @@
from kpi.deployment_backends.kc_access.storage import (
default_kobocat_storage as default_storage,
)
from kpi.fields.file import ExtendedFileField
from .instance import Instance


Expand Down Expand Up @@ -41,7 +41,7 @@ class Attachment(models.Model):
instance = models.ForeignKey(
Instance, related_name='attachments', on_delete=models.CASCADE
)
media_file = models.FileField(
media_file = ExtendedFileField(
storage=default_storage,
upload_to=upload_to,
max_length=380,
Expand Down
4 changes: 4 additions & 0 deletions kobo/apps/project_ownership/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,7 @@ class AsyncTaskException(Exception):

class TransferAlreadyProcessedException(Exception):
pass


class TransferStillPendingException(Exception):
pass
Empty file.
Empty file.
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
from django.core.management import call_command
from django.core.management.base import BaseCommand

from ...models import (
Transfer,
TransferStatus,
TransferStatusChoices,
TransferStatusTypeChoices,
)
from ...utils import (
move_media_files,
move_attachments,
rewrite_mongo_userform_id,
)


class Command(BaseCommand):
help = (
'Resume project ownership transfers done under `2.024.25` which failed '
'with error: "Project A : previous_owner -> new_owner is not in progress"'
)

def handle(self, *args, **options):

usernames = set()
verbosity = options['verbosity']

for transfer_status in TransferStatus.objects.filter(
status=TransferStatusChoices.FAILED,
status_type=TransferStatusTypeChoices.GLOBAL,
error__icontains='is not in progress',
).iterator():
transfer = transfer_status.transfer
if transfer.asset.pending_delete:
if verbosity:
self.stdout.write(
f'Project `{transfer.asset}` is in trash bin, skip it!'
)
continue

if not self._validate_whether_transfer_can_be_fixed(transfer):
if verbosity:
self.stdout.write(
f'Project `{transfer.asset}` transfer cannot be fixed'
f' automatically'
)
continue

if not transfer.asset.has_deployment:
continue

if verbosity:
self.stdout.write(
f'Resuming `{transfer.asset}` transfer…'
)
self._move_data(transfer)
move_attachments(transfer)
move_media_files(transfer)
if verbosity:
self.stdout.write('\tDone!')
usernames.add(transfer.invite.recipient.username)

# Update attachment storage bytes counters
for username in usernames:
call_command(
'update_attachment_storage_bytes',
verbosity=verbosity,
force=True,
username=username,
)

def _move_data(self, transfer: Transfer):

# Sanity check
asset = transfer.asset
rewrite_mongo_userform_id(transfer)
number_of_submissions = asset.deployment.xform.num_of_submissions
submission_ids = [
s['_id']
for s in asset.deployment.get_submissions(asset.owner, fields=['_id'])
]

if number_of_submissions == (mongo_document_count := len(submission_ids)):
self.stdout.write(f'\tSuccess: {number_of_submissions} submissions moved!')
else:
missing_count = number_of_submissions - mongo_document_count
self.stdout.write(
f'\t⚠️ Only {mongo_document_count} submissions moved, '
f'{missing_count} are missing!'
)

def _validate_whether_transfer_can_be_fixed(self, transfer: Transfer) -> bool:
original_new_owner_id = transfer.invite.recipient_id
current_owner_id = transfer.asset.owner_id

return current_owner_id == original_new_owner_id
33 changes: 20 additions & 13 deletions kobo/apps/project_ownership/models/transfer.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ def status(self, value: Union[str, tuple[str]]):
global_status.status = value

global_status.save()

self.date_modified = timezone.now()
self.save(update_fields=['date_modified'])
self._update_invite_status()
Expand All @@ -100,6 +101,7 @@ def transfer_project(self):
success = False
try:
if not self.asset.has_deployment:
_rewrite_mongo = False
with transaction.atomic():
self._reassign_project_permissions(update_deployment=False)
self._sent_in_app_messages()
Expand All @@ -113,6 +115,7 @@ def transfer_project(self):
status=TransferStatusChoices.SUCCESS
)
else:
_rewrite_mongo = True
with transaction.atomic():
with kc_transaction_atomic():
deployment = self.asset.deployment
Expand All @@ -129,19 +132,9 @@ def transfer_project(self):

self._sent_in_app_messages()

# Move submissions, media files and attachments in background
# tasks because it can take a while to complete on big projects

# 1) Rewrite `_userform_id` in MongoDB
async_task.delay(
self.pk, TransferStatusTypeChoices.SUBMISSIONS
)

# 2) Move media files to new owner's home directory
async_task.delay(
self.pk, TransferStatusTypeChoices.MEDIA_FILES
)

# Do not delegate anything to Celery before the transaction has
# been validated. Otherwise, Celery could fetch outdated data.
transaction.on_commit(lambda: self._start_async_jobs(_rewrite_mongo))
success = True
finally:
if not success:
Expand Down Expand Up @@ -265,6 +258,20 @@ def _sent_in_app_messages(self):
]
)

def _start_async_jobs(self, rewrite_mongo: bool = True):
# Move submissions, media files and attachments in background
# tasks because it can take a while to complete on big projects
if rewrite_mongo:
# 1) Rewrite `_userform_id` in MongoDB
async_task.delay(
self.pk, TransferStatusTypeChoices.SUBMISSIONS
)

# 2) Move media files to new owner's home directory
async_task.delay(
self.pk, TransferStatusTypeChoices.MEDIA_FILES
)

def _update_invite_status(self):
"""
Update the status of the invite based on the status of each transfer
Expand Down
11 changes: 9 additions & 2 deletions kobo/apps/project_ownership/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@

from kobo.celery import celery_app
from kpi.utils.mailer import EmailMessage, Mailer
from .exceptions import AsyncTaskException
from .exceptions import AsyncTaskException, TransferStillPendingException
from .models.choices import (
InviteStatusChoices,
TransferStatusChoices,
Expand All @@ -28,6 +28,7 @@
autoretry_for=(
SoftTimeLimitExceeded,
TimeLimitExceeded,
TransferStillPendingException,
),
max_retry=5,
retry_backoff=60,
Expand All @@ -43,8 +44,14 @@ def async_task(transfer_id: int, async_task_type: str):

transfer = Transfer.objects.get(pk=transfer_id)

if transfer.status == TransferStatusChoices.PENDING:
# Sometimes, a race condition occurs: the Celery task starts, but
# `transfer.status` has not been updated fast enough.
# Raise an exception which allows retry.
raise TransferStillPendingException

if transfer.status != TransferStatusChoices.IN_PROGRESS:
raise AsyncTaskException(f'`{transfer}` is not in progress')
raise AsyncTaskException(f'`{transfer}` is not in progress: {transfer.status}')

TransferStatus.update_status(
transfer_id=transfer_id,
Expand Down
23 changes: 11 additions & 12 deletions kobo/apps/project_ownership/tests/api/v2/test_api.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,12 @@
import uuid

from constance.test import override_config
from datetime import timedelta
from dateutil.parser import isoparse
from django.conf import settings
from django.contrib.auth import get_user_model
from django.utils import timezone
from mock import patch, MagicMock
from rest_framework import status
from rest_framework.reverse import reverse
from unittest.mock import ANY

from kobo.apps.project_ownership.models import (
Invite,
Expand All @@ -18,11 +15,11 @@
)
from kobo.apps.project_ownership.tests.utils import MockServiceUsageSerializer
from kobo.apps.trackers.utils import update_nlp_counter

from kpi.constants import PERM_VIEW_ASSET
from kpi.models import Asset
from kpi.tests.base_test_case import BaseAssetTestCase
from kpi.tests.kpi_test_case import KpiTestCase
from kpi.tests.utils.transaction import immediate_on_commit
from kpi.urls.router_api_v2 import URL_NAMESPACE as ROUTER_URL_NAMESPACE


Expand Down Expand Up @@ -432,7 +429,7 @@ def test_account_usage_transferred_to_new_user(self):
response = self.client.get(service_usage_url)
assert response.data == expected_empty_data

# Transfer project from someuser to anotheruser
# Transfer the project from someuser to anotheruser
self.client.login(username='someuser', password='someuser')
payload = {
'recipient': self.absolute_reverse(
Expand All @@ -445,9 +442,10 @@ def test_account_usage_transferred_to_new_user(self):
'kpi.deployment_backends.backends.MockDeploymentBackend.xform',
MagicMock(),
):
response = self.client.post(
self.invite_url, data=payload, format='json'
)
with immediate_on_commit():
response = self.client.post(
self.invite_url, data=payload, format='json'
)
assert response.status_code == status.HTTP_201_CREATED

# someuser should have no usage reported anymore
Expand Down Expand Up @@ -495,7 +493,7 @@ def test_data_accessible_to_new_user(self):
) == 0
)

# Transfer project from someuser to anotheruser
# Transfer the project from someuser to anotheruser
self.client.login(username='someuser', password='someuser')
payload = {
'recipient': self.absolute_reverse(
Expand All @@ -508,9 +506,10 @@ def test_data_accessible_to_new_user(self):
'kpi.deployment_backends.backends.MockDeploymentBackend.xform',
MagicMock(),
):
response = self.client.post(
self.invite_url, data=payload, format='json'
)
with immediate_on_commit():
response = self.client.post(
self.invite_url, data=payload, format='json'
)
assert response.status_code == status.HTTP_201_CREATED

# anotheruser is the owner and should see the project
Expand Down
7 changes: 5 additions & 2 deletions kobo/apps/project_ownership/tests/test_transfer_status.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
from django.test import TestCase

from kpi.models import Asset
from kpi.tests.utils.transaction import immediate_on_commit
from ..models import (
Invite,
InviteStatusChoices,
Expand Down Expand Up @@ -105,9 +106,11 @@ def test_calculated_failed_transfer_status(self):
assert self.invite.status == InviteStatusChoices.FAILED

def test_draft_project_transfer(self):
# when project is a draft, there are no celery tasks called to move
# When a project is a draft, there are no celery tasks called to move
# submissions (and related attachments).
self.transfer.transfer_project()
with immediate_on_commit():
self.transfer.transfer_project()

assert self.transfer.status == TransferStatusChoices.SUCCESS

# However, the status of each async task should still be updated to
Expand Down
25 changes: 25 additions & 0 deletions kpi/tests/utils/transaction.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
from contextlib import contextmanager
from unittest import mock

from django.contrib.auth.management import DEFAULT_DB_ALIAS


@contextmanager
def immediate_on_commit(using=None):
"""
Context manager executing transaction.on_commit() hooks immediately as
if the connection was in auto-commit mode. This is required when
using a subclass of django.test.TestCase as all tests are wrapped in
a transaction that never gets committed.
Source: https://code.djangoproject.com/ticket/30457#comment:1
"""
immediate_using = DEFAULT_DB_ALIAS if using is None else using

def on_commit(func, using=None):
using = DEFAULT_DB_ALIAS if using is None else using
if using == immediate_using:
func()

with mock.patch('django.db.transaction.on_commit', side_effect=on_commit) as patch:
yield patch

0 comments on commit 2ff37e7

Please sign in to comment.