From fec22d30e4f19fe93ef7e770bbd84005785f3f21 Mon Sep 17 00:00:00 2001 From: apiyo Date: Tue, 16 Aug 2022 21:53:36 +0300 Subject: [PATCH 01/18] Add endpoint for fetching currently running csv imports --- .pylintrc | 2 +- onadata/apps/api/viewsets/xform_viewset.py | 12 ++++ onadata/libs/tests/utils/test_async_status.py | 42 ++++++++++++- onadata/libs/utils/async_status.py | 62 ++++++++++++++++--- 4 files changed, 107 insertions(+), 11 deletions(-) diff --git a/.pylintrc b/.pylintrc index 056fb09489..bf1e2490f0 100644 --- a/.pylintrc +++ b/.pylintrc @@ -124,7 +124,7 @@ indent-string=' ' max-line-length=100 # Maximum number of lines in a module -max-module-lines=1000 +max-module-lines=1005 # List of optional constructs for which whitespace checking is disabled. `dict- # separator` is used to allow tabulation in dicts, etc.: {1 : 1,\n222: 2}. diff --git a/onadata/apps/api/viewsets/xform_viewset.py b/onadata/apps/api/viewsets/xform_viewset.py index 5c75c71281..8516908405 100644 --- a/onadata/apps/api/viewsets/xform_viewset.py +++ b/onadata/apps/api/viewsets/xform_viewset.py @@ -97,6 +97,7 @@ get_form_url, ) from onadata.settings.common import CSV_EXTENSION, XLS_EXTENSIONS +from onadata.libs.utils.async_status import get_active_tasks ENKETO_AUTH_COOKIE = getattr(settings, "ENKETO_AUTH_COOKIE", "__enketo") ENKETO_META_UID_COOKIE = getattr( @@ -877,6 +878,17 @@ def versions(self, request, *args, **kwargs): return Response(data=serializer.data, status=status.HTTP_200_OK) + @action(methods=["GET"], detail=True) + def active_imports(self, request, *args, **kwargs): + """Returns csv import async tasks that belong to this form""" + xform = self.get_object() + task_names = ["onadata.libs.utils.csv_import.submit_csv_async"] + return Response( + data=get_active_tasks(task_names, xform), + status=status.HTTP_200_OK, + content_type="application/json", + ) + @action(methods=["GET"], detail=True) def export_async(self, request, *args, **kwargs): """Returns the status of an async 
export.""" diff --git a/onadata/libs/tests/utils/test_async_status.py b/onadata/libs/tests/utils/test_async_status.py index ea088b4f7b..eac712fc57 100644 --- a/onadata/libs/tests/utils/test_async_status.py +++ b/onadata/libs/tests/utils/test_async_status.py @@ -1,7 +1,13 @@ -from onadata.apps.main.tests.test_base import TestBase +""" +tests for celery asyncronous task utilities +""" +from unittest.mock import MagicMock + from celery import states +from onadata.apps.main.tests.test_base import TestBase +from onadata.celeryapp import app from onadata.libs.utils import async_status - +from onadata.apps.logger.models.xform import XForm class TestAsyncStatus(TestBase): @@ -34,3 +40,35 @@ def test_async_status(self): .get('error')) self.assertFalse(async_status. async_status(async_status.SUCCESSFUL).get('error')) + + def test_get_active_tasks(self): + """test get_active_tasks""" + xform = XForm() + self.assertEqual( + async_status.get_active_tasks( + ['onadata.libs.utils.csv_import.submit_csv_async'], xform + ), + '[]', + ) + inspect = MagicMock() + inspect.active = MagicMock( + return_value={ + 'celery-worker@onadata-id-1': [ + { + 'args': [None, xform.pk], + 'id': '11', + 'time_start': '2021-02-26T03:28:19.512875-05:00', + 'name': 'onadata.libs.utils.csv_import.submit_csv_async', + } + ] + } + ) + app.control.inspect = MagicMock(return_value=inspect) + + self.assertEqual( + async_status.get_active_tasks( + ['onadata.libs.utils.csv_import.submit_csv_async'], xform + ), + '[{"job_uuid": "11", "time_start"' + + ': "2021-02-26T03:28:19.512875-05:00"}]', + ) diff --git a/onadata/libs/utils/async_status.py b/onadata/libs/utils/async_status.py index e494188fd9..ec48e6f012 100644 --- a/onadata/libs/utils/async_status.py +++ b/onadata/libs/utils/async_status.py @@ -1,4 +1,14 @@ +""" +Utilities for celery asyncronous tasks +""" +import json + +from typing import List from celery import states +from django.utils.translation import gettext + +from onadata.celeryapp import app 
+from onadata.apps.logger.models.xform import XForm PENDING = 0 SUCCESSFUL = 1 @@ -7,21 +17,57 @@ RETRY = 4 STARTED = 5 -status_msg = {PENDING: 'PENDING', SUCCESSFUL: 'SUCCESS', FAILED: 'FAILURE', - PROGRESS: 'PROGRESS', RETRY: 'RETRY', STARTED: 'STARTED'} +status_msg = { + PENDING: 'PENDING', + SUCCESSFUL: 'SUCCESS', + FAILED: 'FAILURE', + PROGRESS: 'PROGRESS', + RETRY: 'RETRY', + STARTED: 'STARTED', +} def celery_state_to_status(state): - status_map = {states.PENDING: PENDING, states.STARTED: STARTED, - states.RETRY: RETRY, states.SUCCESS: SUCCESSFUL, - states.FAILURE: FAILED, 'PROGRESS': PROGRESS} + status_map = { + states.PENDING: PENDING, + states.STARTED: STARTED, + states.RETRY: RETRY, + states.SUCCESS: SUCCESSFUL, + states.FAILURE: FAILED, + 'PROGRESS': PROGRESS, + } return status_map[state] if state in status_map else FAILED def async_status(status, error=None): - status = { - 'job_status': status_msg[status] - } + status = {'job_status': status_msg[status]} if error: status['error'] = error return status + + +def get_active_tasks(task_names: List[str], xform: XForm): + """Get active celery tasks""" + inspect = app.control.inspect() + inspect_active = inspect.active() + data = [] + if inspect_active: + task_list = list(inspect_active.values()) + data = list( + filter( + lambda task: xform.pk == task['args'][1] and task['name'] in task_names, + task_list[0], + ) + ) + + return json.dumps( + list( + map( + lambda i: { + 'job_uuid': gettext(i['id']), + 'time_start': i['time_start'], + }, + data, + ) + ) + ) From 7f9c3a853e27ed7754dcd2df923117f9142c50db Mon Sep 17 00:00:00 2001 From: apiyo Date: Thu, 29 Sep 2022 09:01:33 +0300 Subject: [PATCH 02/18] Add documentation for form's running csv imports endpoint --- .pylintrc | 2 +- docs/forms.rst | 26 ++++++++++++++++++++++++++ 2 files changed, 27 insertions(+), 1 deletion(-) diff --git a/.pylintrc b/.pylintrc index bf1e2490f0..c7efbd5dee 100644 --- a/.pylintrc +++ b/.pylintrc @@ -124,7 +124,7 @@ indent-string=' ' 
max-line-length=100 # Maximum number of lines in a module -max-module-lines=1005 +max-module-lines=1010 # List of optional constructs for which whitespace checking is disabled. `dict- # separator` is used to allow tabulation in dicts, etc.: {1 : 1,\n222: 2}. diff --git a/docs/forms.rst b/docs/forms.rst index 5c4184b859..d800626c1a 100644 --- a/docs/forms.rst +++ b/docs/forms.rst @@ -1628,3 +1628,29 @@ If the upload is still running: { "job_status": "PENDING" } + + +Check active imports +-------------------- + +.. raw:: html + +
GET /api/v1/forms/{form_id}/active_imports
+ +Example +^^^^^^^ +:: + + curl -X GET https://api.ona.io/api/v1/forms/{form_id}/active_imports + +Response +^^^^^^^^ + +:: + + [ + { + "job_uuid": "256dcef5-1baa-48ee-83a3-f7100123f5d2", + "time_start": 1664372983.8631873 + } + ] From 8ff3e31ce2d6340d21ab9a2598afa0eba61eb237 Mon Sep 17 00:00:00 2001 From: apiyo Date: Thu, 29 Sep 2022 16:23:22 +0300 Subject: [PATCH 03/18] Code clean up: Format job_start datetime --- docs/forms.rst | 2 +- onadata/libs/tests/utils/test_async_status.py | 8 ++++++-- onadata/libs/utils/async_status.py | 5 ++++- 3 files changed, 11 insertions(+), 4 deletions(-) diff --git a/docs/forms.rst b/docs/forms.rst index d800626c1a..8b04cbb5e0 100644 --- a/docs/forms.rst +++ b/docs/forms.rst @@ -1651,6 +1651,6 @@ Response [ { "job_uuid": "256dcef5-1baa-48ee-83a3-f7100123f5d2", - "time_start": 1664372983.8631873 + "time_start": "2022-09-29T09:08:59" } ] diff --git a/onadata/libs/tests/utils/test_async_status.py b/onadata/libs/tests/utils/test_async_status.py index eac712fc57..2654aa2580 100644 --- a/onadata/libs/tests/utils/test_async_status.py +++ b/onadata/libs/tests/utils/test_async_status.py @@ -1,6 +1,7 @@ """ tests for celery asyncronous task utilities """ +from datetime import datetime from unittest.mock import MagicMock from celery import states @@ -44,6 +45,7 @@ def test_async_status(self): def test_get_active_tasks(self): """test get_active_tasks""" xform = XForm() + time_start = 1664372983.8631873 self.assertEqual( async_status.get_active_tasks( ['onadata.libs.utils.csv_import.submit_csv_async'], xform @@ -57,7 +59,7 @@ def test_get_active_tasks(self): { 'args': [None, xform.pk], 'id': '11', - 'time_start': '2021-02-26T03:28:19.512875-05:00', + 'time_start': time_start, 'name': 'onadata.libs.utils.csv_import.submit_csv_async', } ] @@ -70,5 +72,7 @@ def test_get_active_tasks(self): ['onadata.libs.utils.csv_import.submit_csv_async'], xform ), '[{"job_uuid": "11", "time_start"' - + ': "2021-02-26T03:28:19.512875-05:00"}]', + + ": \"" + + 
datetime.fromtimestamp(time_start).strftime("%Y-%m-%dT%H:%M:%S") + + "\"}]", ) diff --git a/onadata/libs/utils/async_status.py b/onadata/libs/utils/async_status.py index ec48e6f012..2f62f659ed 100644 --- a/onadata/libs/utils/async_status.py +++ b/onadata/libs/utils/async_status.py @@ -2,6 +2,7 @@ Utilities for celery asyncronous tasks """ import json +from datetime import datetime from typing import List from celery import states @@ -65,7 +66,9 @@ def get_active_tasks(task_names: List[str], xform: XForm): map( lambda i: { 'job_uuid': gettext(i['id']), - 'time_start': i['time_start'], + 'time_start': datetime.fromtimestamp(i["time_start"]).strftime( + "%Y-%m-%dT%H:%M:%S" + ), }, data, ) From cb01a967b59769c26458a42fcc2b2f2a5d7a89af Mon Sep 17 00:00:00 2001 From: apiyo Date: Tue, 4 Oct 2022 15:12:22 +0300 Subject: [PATCH 04/18] Add imports viewset and register with v2 router --- onadata/apps/api/urls/v2_urls.py | 4 +- .../apps/api/viewsets/v2/imports_viewset.py | 88 +++++++++++++++++++ 2 files changed, 91 insertions(+), 1 deletion(-) create mode 100644 onadata/apps/api/viewsets/v2/imports_viewset.py diff --git a/onadata/apps/api/urls/v2_urls.py b/onadata/apps/api/urls/v2_urls.py index 0751997480..72cbfefb05 100644 --- a/onadata/apps/api/urls/v2_urls.py +++ b/onadata/apps/api/urls/v2_urls.py @@ -3,8 +3,10 @@ Custom rest_framework Router V2 """ from onadata.apps.api.viewsets.v2.tableau_viewset import TableauViewSet +from onadata.apps.api.viewsets.v2.imports_viewset import ImportsViewSet from .v1_urls import MultiLookupRouter router = MultiLookupRouter(trailing_slash=False) -router.register(r"open-data", TableauViewSet, basename="open-data") +router.register(r'open-data', TableauViewSet, basename='open-data') +router.register(r'imports', ImportsViewSet, basename='imports') diff --git a/onadata/apps/api/viewsets/v2/imports_viewset.py b/onadata/apps/api/viewsets/v2/imports_viewset.py new file mode 100644 index 0000000000..c4f7279978 --- /dev/null +++ 
b/onadata/apps/api/viewsets/v2/imports_viewset.py @@ -0,0 +1,88 @@ +import os + +from django.conf import settings +from django.shortcuts import get_object_or_404 +from django.core.files.storage import default_storage +from rest_framework import viewsets, status +from rest_framework.response import Response +from rest_framework.exceptions import ParseError +from rest_framework.decorators import action + +from onadata.apps.api.tools import get_baseviewset_class +from onadata.apps.api.permissions import XFormPermissions +from onadata.apps.logger.models import XForm +from onadata.libs.mixins.cache_control_mixin import CacheControlMixin +from onadata.libs.mixins.etags_mixin import ETagsMixin +from onadata.libs.utils.async_status import get_active_tasks +from onadata.libs.utils.csv_import import submission_xls_to_csv, submit_csv, submit_csv_async +from onadata.settings.common import CSV_EXTENSION, XLS_EXTENSIONS + +BaseViewset = get_baseviewset_class() + + +class ImportsViewSet(ETagsMixin, CacheControlMixin, + viewsets.ViewSet): + permission_classes = [XFormPermissions] + queryset = XForm.objects.filter(deleted_at__isnull=True) + + def get_queryset(self): + return XForm.objects.filter(deleted_at__isnull=True) + + def get_object(self, pk: int): + queryset = self.get_queryset() + obj = get_object_or_404(queryset, pk=pk) + self.check_object_permissions(self.request, obj) + return obj + + @action(detail=True, methods=["POST"]) + def start(self, request, pk: int = None): + xform = self.get_object(pk) + resp = {} + import ipdb; ipdb.set_trace() + csv_file = request.FILES.get("csv_file", None) + xls_file = request.FILES.get("xls_file", None) + if csv_file is None and xls_file is None: + resp.update({"error": "csv_file and xls_file field empty"}) + elif xls_file and xls_file.name.split(".")[-1] not in XLS_EXTENSIONS: + resp.update({"error": "xls_file not an excel file"}) + elif csv_file and csv_file.name.split(".")[-1] != CSV_EXTENSION: + resp.update({"error": "csv_file not a 
csv file"}) + else: + if xls_file and xls_file.name.split(".")[-1] in XLS_EXTENSIONS: + csv_file = submission_xls_to_csv(xls_file) + overwrite = request.query_params.get("overwrite") + overwrite = (overwrite.lower() == "true" if isinstance( + overwrite, str) else overwrite) + size_threshold = settings.CSV_FILESIZE_IMPORT_ASYNC_THRESHOLD + try: + csv_size = csv_file.size + except AttributeError: + csv_size = csv_file.__sizeof__() + csv_file.seek(0) + upload_to = os.path.join(request.user.username, "csv_imports", + csv_file.name) + file_name = default_storage.save(upload_to, csv_file) + task = submit_csv_async.delay(request.user.username, + self.object.pk, file_name, overwrite) + if task is None: + raise ParseError("Task not found") + resp.update({"task_id": task.task_id}) + + return Response( + data=resp, + status=status.HTTP_200_OK + if resp.get("error") is None else status.HTTP_400_BAD_REQUEST, + content_type="application/json" + ) + + def retrieve(self, request, pk: int = None): + """Returns csv import async tasks that belong to this form""" + xform = self.get_object(pk) + + task_names = ["onadata.libs.utils.csv_import.submit_csv_async"] + if xform: + return Response( + data=get_active_tasks(task_names, xform), + status=status.HTTP_200_OK, + content_type="application/json", + ) From b4c52caa93e1978f167b255de8d4b988c233292a Mon Sep 17 00:00:00 2001 From: Davis Raymond Muro Date: Wed, 5 Oct 2022 10:24:23 +0300 Subject: [PATCH 05/18] Include `file` and `overwrite` kwargs on the Task status - Format the file using black --- onadata/libs/utils/async_status.py | 26 ++++++++++++++------------ 1 file changed, 14 insertions(+), 12 deletions(-) diff --git a/onadata/libs/utils/async_status.py b/onadata/libs/utils/async_status.py index 2f62f659ed..cf851e3de9 100644 --- a/onadata/libs/utils/async_status.py +++ b/onadata/libs/utils/async_status.py @@ -19,12 +19,12 @@ STARTED = 5 status_msg = { - PENDING: 'PENDING', - SUCCESSFUL: 'SUCCESS', - FAILED: 'FAILURE', - PROGRESS: 
'PROGRESS', - RETRY: 'RETRY', - STARTED: 'STARTED', + PENDING: "PENDING", + SUCCESSFUL: "SUCCESS", + FAILED: "FAILURE", + PROGRESS: "PROGRESS", + RETRY: "RETRY", + STARTED: "STARTED", } @@ -35,15 +35,15 @@ def celery_state_to_status(state): states.RETRY: RETRY, states.SUCCESS: SUCCESSFUL, states.FAILURE: FAILED, - 'PROGRESS': PROGRESS, + "PROGRESS": PROGRESS, } return status_map[state] if state in status_map else FAILED def async_status(status, error=None): - status = {'job_status': status_msg[status]} + status = {"job_status": status_msg[status]} if error: - status['error'] = error + status["error"] = error return status @@ -56,7 +56,7 @@ def get_active_tasks(task_names: List[str], xform: XForm): task_list = list(inspect_active.values()) data = list( filter( - lambda task: xform.pk == task['args'][1] and task['name'] in task_names, + lambda task: xform.pk == task["args"][1] and task["name"] in task_names, task_list[0], ) ) @@ -65,10 +65,12 @@ def get_active_tasks(task_names: List[str], xform: XForm): list( map( lambda i: { - 'job_uuid': gettext(i['id']), - 'time_start': datetime.fromtimestamp(i["time_start"]).strftime( + "job_uuid": gettext(i["id"]), + "time_start": datetime.fromtimestamp(i["time_start"]).strftime( "%Y-%m-%dT%H:%M:%S" ), + "file": gettext(i["args"][2]), + "overwrite": gettext(i["args"][3]), }, data, ) From 0d484b1d63cd44b3fd5a36a0a0bfa56579cabbe7 Mon Sep 17 00:00:00 2001 From: Davis Raymond Muro Date: Wed, 5 Oct 2022 10:25:32 +0300 Subject: [PATCH 06/18] Add `DetailedPostRouter` class The class is used to allow the `create` route of a viewset to have detail passed to it i.e // --- onadata/apps/api/urls/v2_urls.py | 61 ++++++++++++++++++++++++++++++-- 1 file changed, 58 insertions(+), 3 deletions(-) diff --git a/onadata/apps/api/urls/v2_urls.py b/onadata/apps/api/urls/v2_urls.py index 72cbfefb05..011deed4e4 100644 --- a/onadata/apps/api/urls/v2_urls.py +++ b/onadata/apps/api/urls/v2_urls.py @@ -2,11 +2,66 @@ """ Custom rest_framework Router V2 """ 
+from rest_framework import routers + from onadata.apps.api.viewsets.v2.tableau_viewset import TableauViewSet from onadata.apps.api.viewsets.v2.imports_viewset import ImportsViewSet from .v1_urls import MultiLookupRouter -router = MultiLookupRouter(trailing_slash=False) -router.register(r'open-data', TableauViewSet, basename='open-data') -router.register(r'imports', ImportsViewSet, basename='imports') + +class DetailedPostRouter(routers.DefaultRouter): + routes = [ + # List route. + routers.Route( + url=r"^{prefix}{trailing_slash}$", + mapping={"get": "list", "post": "create"}, + name="{basename}-list", + detail=False, + initkwargs={"suffix": "List"}, + ), + # Dynamically generated list routes. Generated using + # @action(detail=False) decorator on methods of the viewset. + routers.DynamicRoute( + url=r"^{prefix}/{url_path}{trailing_slash}$", + name="{basename}-{url_name}", + detail=False, + initkwargs={}, + ), + # Detail route. + routers.Route( + url=r"^{prefix}/{lookup}{trailing_slash}$", + mapping={ + "get": "retrieve", + "put": "update", + "patch": "partial_update", + "delete": "destroy", + "post": "create", + }, + name="{basename}-detail", + detail=True, + initkwargs={"suffix": "Instance"}, + ), + # Dynamically generated detail routes. Generated using + # @action(detail=True) decorator on methods of the viewset. 
+ routers.DynamicRoute( + url=r"^{prefix}/{lookup}/{url_path}{trailing_slash}$", + name="{basename}-{url_name}", + detail=True, + initkwargs={}, + ), + ] + + def extend(self, router): + """ + Extends the routers routes with the routes from another router + """ + self.registry.extend(router.registry) + + +base_router = MultiLookupRouter(trailing_slash=False) +router = DetailedPostRouter(trailing_slash=False) +base_router.register(r"open-data", TableauViewSet, basename="open-data") +router.register(r"imports", ImportsViewSet, basename="imports") + +router.extend(base_router) From e9b0afce737a8ec6698fc7b98d6bca08d436d1a1 Mon Sep 17 00:00:00 2001 From: Davis Raymond Muro Date: Wed, 5 Oct 2022 10:26:44 +0300 Subject: [PATCH 07/18] Add `create` route to the `ImportsViewSet` - Add _get_active_tasks utility function - Modify `retrieve` route; Remove uneeded if statement since the get_object function raises a 404 error if object is missing --- .../apps/api/viewsets/v2/imports_viewset.py | 127 +++++++++++++----- 1 file changed, 91 insertions(+), 36 deletions(-) diff --git a/onadata/apps/api/viewsets/v2/imports_viewset.py b/onadata/apps/api/viewsets/v2/imports_viewset.py index c4f7279978..5145a85934 100644 --- a/onadata/apps/api/viewsets/v2/imports_viewset.py +++ b/onadata/apps/api/viewsets/v2/imports_viewset.py @@ -1,3 +1,4 @@ +import json import os from django.conf import settings @@ -14,16 +15,20 @@ from onadata.libs.mixins.cache_control_mixin import CacheControlMixin from onadata.libs.mixins.etags_mixin import ETagsMixin from onadata.libs.utils.async_status import get_active_tasks -from onadata.libs.utils.csv_import import submission_xls_to_csv, submit_csv, submit_csv_async +from onadata.libs.utils.csv_import import ( + submission_xls_to_csv, + submit_csv, + submit_csv_async, +) from onadata.settings.common import CSV_EXTENSION, XLS_EXTENSIONS BaseViewset = get_baseviewset_class() -class ImportsViewSet(ETagsMixin, CacheControlMixin, - viewsets.ViewSet): +class 
ImportsViewSet(ETagsMixin, CacheControlMixin, viewsets.ViewSet): permission_classes = [XFormPermissions] queryset = XForm.objects.filter(deleted_at__isnull=True) + task_names = ["onadata.libs.utils.csv_import.submit_csv_async"] def get_queryset(self): return XForm.objects.filter(deleted_at__isnull=True) @@ -34,13 +39,46 @@ def get_object(self, pk: int): self.check_object_permissions(self.request, obj) return obj - @action(detail=True, methods=["POST"]) - def start(self, request, pk: int = None): + def _get_active_tasks(self, xform: XForm) -> str: + """ + Utility function to retrieve the active tasks of an XForm + """ + return get_active_tasks(self.task_names, xform) + + def create(self, request, pk: int = None) -> Response: + """ + Starts a new Import task for a given form; The route processes imports asynchronously + unless the `DISABLE_ASYNCHRONOUS_IMPORTS` setting is set to false. + + Route: + + Curl example: + + $ curl -X POST "http:///api/v2/imports/" + + Supported Query Parameters: + + - overwrite: bool = Whether the server should permanently delete the data currently available on + the form then reimport the data using the csv_file/xls_file sent with the request. + + Required Request Arguements: + + - csv_file: file = A CSV File containing the forms data + - xls_file: file = An XLSX file containing the forms data + + Possible Response status codes: + + - 202 Accepted: Server has successfully accepted your request for data import and has queued the task + - 200 Ok: Server has successfully imported your data to the form; Only returned when asynchronous imports are disabled + - 400 Bad Request: Request has been refused due to incorrect/missing csv_file or xls_file file + - 403 Forbidden: The request was valid but the server refused to process it. 
An explanation on why it was refused can be found in the JSON Response + """ xform = self.get_object(pk) resp = {} - import ipdb; ipdb.set_trace() csv_file = request.FILES.get("csv_file", None) xls_file = request.FILES.get("xls_file", None) + status_code = status.HTTP_400_BAD_REQUEST + if csv_file is None and xls_file is None: resp.update({"error": "csv_file and xls_file field empty"}) elif xls_file and xls_file.name.split(".")[-1] not in XLS_EXTENSIONS: @@ -50,39 +88,56 @@ def start(self, request, pk: int = None): else: if xls_file and xls_file.name.split(".")[-1] in XLS_EXTENSIONS: csv_file = submission_xls_to_csv(xls_file) + overwrite = request.query_params.get("overwrite") - overwrite = (overwrite.lower() == "true" if isinstance( - overwrite, str) else overwrite) - size_threshold = settings.CSV_FILESIZE_IMPORT_ASYNC_THRESHOLD - try: - csv_size = csv_file.size - except AttributeError: - csv_size = csv_file.__sizeof__() - csv_file.seek(0) - upload_to = os.path.join(request.user.username, "csv_imports", - csv_file.name) - file_name = default_storage.save(upload_to, csv_file) - task = submit_csv_async.delay(request.user.username, - self.object.pk, file_name, overwrite) - if task is None: - raise ParseError("Task not found") - resp.update({"task_id": task.task_id}) - - return Response( - data=resp, - status=status.HTTP_200_OK - if resp.get("error") is None else status.HTTP_400_BAD_REQUEST, - content_type="application/json" + overwrite = ( + overwrite.lower() == "true" if isinstance(overwrite, str) else overwrite ) - def retrieve(self, request, pk: int = None): + # Block overwrite imports from running in parallel + active_tasks = json.load(self._get_active_tasks(xform)) + for task in active_tasks: + if task.get("overwrite", False): + task_id = task.get("id") + resp.update( + { + "reason": "An ongoing overwrite request with the ID {task_id} is being processed" + } + ) + status_code = status.HTTP_403_FORBIDDEN + break + + if not status_code == 
status.HTTP_403_FORBIDDEN: + try: + csv_size = csv_file.size + except AttributeError: + csv_size = csv_file.__sizeof__() + csv_file.seek(0) + + if getattr(settings, "DISABLE_ASYNCHRONOUS_IMPORTS", False): + resp.update(submit_csv(request.user.username, xform, csv_file)) + status_code = status.HTTP_200_OK + else: + upload_to = os.path.join( + request.user.username, "csv_imports", csv_file.name + ) + file_name = default_storage.save(upload_to, csv_file) + task = submit_csv_async.delay( + request.user.username, xform.pk, file_name, overwrite + ) + if task is None: + raise ParseError("Task not found") + resp.update({"task_id": task.task_id}) + status_code = status.HTTP_202_ACCEPTED + + return Response(data=resp, status=status_code, content_type="application/json") + + def retrieve(self, request, pk: int = None) -> Response: """Returns csv import async tasks that belong to this form""" xform = self.get_object(pk) - task_names = ["onadata.libs.utils.csv_import.submit_csv_async"] - if xform: - return Response( - data=get_active_tasks(task_names, xform), - status=status.HTTP_200_OK, - content_type="application/json", - ) + return Response( + data=self._get_active_tasks(xform), + status=status.HTTP_200_OK, + content_type="application/json", + ) From da13b14644f7f21b4f16bb326d0cc2b55ea40d6d Mon Sep 17 00:00:00 2001 From: Davis Raymond Muro Date: Wed, 5 Oct 2022 10:30:10 +0300 Subject: [PATCH 08/18] Update docstring --- onadata/apps/api/viewsets/v2/imports_viewset.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/onadata/apps/api/viewsets/v2/imports_viewset.py b/onadata/apps/api/viewsets/v2/imports_viewset.py index 5145a85934..7f02a91dbe 100644 --- a/onadata/apps/api/viewsets/v2/imports_viewset.py +++ b/onadata/apps/api/viewsets/v2/imports_viewset.py @@ -50,8 +50,6 @@ def create(self, request, pk: int = None) -> Response: Starts a new Import task for a given form; The route processes imports asynchronously unless the `DISABLE_ASYNCHRONOUS_IMPORTS` setting is set to false. 
- Route: - Curl example: $ curl -X POST "http:///api/v2/imports/" From 3057ed7830c68b4d554288619c67c8c5a4ba4b76 Mon Sep 17 00:00:00 2001 From: Davis Raymond Muro Date: Wed, 5 Oct 2022 10:40:20 +0300 Subject: [PATCH 09/18] Update `get_active_tasks` function - Update test; ensure that the expected args are present --- onadata/libs/tests/utils/test_async_status.py | 4 ++-- onadata/libs/utils/async_status.py | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/onadata/libs/tests/utils/test_async_status.py b/onadata/libs/tests/utils/test_async_status.py index 2654aa2580..66d327a910 100644 --- a/onadata/libs/tests/utils/test_async_status.py +++ b/onadata/libs/tests/utils/test_async_status.py @@ -57,7 +57,7 @@ def test_get_active_tasks(self): return_value={ 'celery-worker@onadata-id-1': [ { - 'args': [None, xform.pk], + 'args': [None, xform.pk, "/home/ona/import.csv", True], 'id': '11', 'time_start': time_start, 'name': 'onadata.libs.utils.csv_import.submit_csv_async', @@ -74,5 +74,5 @@ def test_get_active_tasks(self): '[{"job_uuid": "11", "time_start"' + ": \"" + datetime.fromtimestamp(time_start).strftime("%Y-%m-%dT%H:%M:%S") - + "\"}]", + + '", "file": "/home/ona/import.csv", "overwrite": true}]', ) diff --git a/onadata/libs/utils/async_status.py b/onadata/libs/utils/async_status.py index cf851e3de9..944322edba 100644 --- a/onadata/libs/utils/async_status.py +++ b/onadata/libs/utils/async_status.py @@ -70,7 +70,7 @@ def get_active_tasks(task_names: List[str], xform: XForm): "%Y-%m-%dT%H:%M:%S" ), "file": gettext(i["args"][2]), - "overwrite": gettext(i["args"][3]), + "overwrite": i["args"][3], }, data, ) From de7fa8c83a1ba880337ab3a89db8d103bca66017 Mon Sep 17 00:00:00 2001 From: Davis Raymond Muro Date: Wed, 5 Oct 2022 11:13:57 +0300 Subject: [PATCH 10/18] Move version 2 viewset tests to separate folder --- onadata/apps/api/tests/viewsets/v2/__init__.py | 0 onadata/apps/api/tests/viewsets/{ => v2}/test_tableau_viewset.py | 0 2 files changed, 0 
insertions(+), 0 deletions(-) create mode 100644 onadata/apps/api/tests/viewsets/v2/__init__.py rename onadata/apps/api/tests/viewsets/{ => v2}/test_tableau_viewset.py (100%) diff --git a/onadata/apps/api/tests/viewsets/v2/__init__.py b/onadata/apps/api/tests/viewsets/v2/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/onadata/apps/api/tests/viewsets/test_tableau_viewset.py b/onadata/apps/api/tests/viewsets/v2/test_tableau_viewset.py similarity index 100% rename from onadata/apps/api/tests/viewsets/test_tableau_viewset.py rename to onadata/apps/api/tests/viewsets/v2/test_tableau_viewset.py From 112007eca0b036b05b2705b7d972bff3a423e429 Mon Sep 17 00:00:00 2001 From: Davis Raymond Muro Date: Wed, 5 Oct 2022 12:17:03 +0300 Subject: [PATCH 11/18] Add tests for the ImportsViewSet `create` route --- .../tests/viewsets/v2/test_imports_viewset.py | 175 ++++++++++++++++++ .../apps/api/viewsets/v2/imports_viewset.py | 8 +- 2 files changed, 179 insertions(+), 4 deletions(-) create mode 100644 onadata/apps/api/tests/viewsets/v2/test_imports_viewset.py diff --git a/onadata/apps/api/tests/viewsets/v2/test_imports_viewset.py b/onadata/apps/api/tests/viewsets/v2/test_imports_viewset.py new file mode 100644 index 0000000000..d72d6ab0d7 --- /dev/null +++ b/onadata/apps/api/tests/viewsets/v2/test_imports_viewset.py @@ -0,0 +1,175 @@ +""" +Module containing tests for the ImportsViewSet +""" +import os + +from typing import IO, Any +from unittest.mock import patch + +from django.conf import settings +from django.test import override_settings +from httmock import HTTMock + +from onadata.celeryapp import app +from onadata.apps.api.viewsets.v2.imports_viewset import ImportsViewSet +from onadata.apps.api.tests.viewsets.test_abstract_viewset import TestAbstractViewSet +from onadata.apps.api.tests.mocked_data import enketo_mock + + +def fixtures_path(filepath: str) -> IO[Any]: + """Returns the file object at the given filepath.""" + return open( + os.path.join( + 
settings.PROJECT_ROOT, "libs", "tests", "utils", "fixtures", filepath + ), + "rb", + ) + + +class TestImportsViewSet(TestAbstractViewSet): + def setUp(self): + super(TestImportsViewSet, self).setUp() + self.view = ImportsViewSet.as_view({"post": "create", "get": "retrieve"}) + + @override_settings(DISABLE_ASYNCHRONOUS_IMPORTS=True) + def test_create_expected_synchronous_response(self): + """ + Tests that the `api/v2/imports/` route processes a request + successfully when `DISABLE_ASYNCHRONOUS_IMPORTS` is set to `True` + """ + with HTTMock(enketo_mock): + xls_path = os.path.join( + settings.PROJECT_ROOT, + "apps", + "main", + "tests", + "fixtures", + "tutorial.xlsx", + ) + self._publish_xls_form_to_project(xlsform_path=xls_path) + form = self.project.xform_set.first() + csv_import = fixtures_path("good.csv") + post_data = {"csv_file": csv_import} + request = self.factory.post( + f"/api/v2/imports/{form.pk}", data=post_data, **self.extra + ) + response = self.view(request, pk=self.xform.id) + + self.assertEqual(response.status_code, 200) + self.assertEqual(response.content_type, "application/json") + self.assertEqual(response.get("Cache-Control"), None) + self.assertEqual(response.data.get("additions"), 9) + self.assertEqual(response.data.get("updates"), 0) + + def test_create_expected_async_response(self): + """ + Tests that the `api/v2/imports/` route processes a request + successfully when `DISABLE_ASYNCHRONOUS_IMPORTS` is set to `False` + """ + with HTTMock(enketo_mock): + xls_path = os.path.join( + settings.PROJECT_ROOT, + "apps", + "main", + "tests", + "fixtures", + "tutorial.xlsx", + ) + self._publish_xls_form_to_project(xlsform_path=xls_path) + form = self.project.xform_set.first() + csv_import = fixtures_path("good.csv") + post_data = {"csv_file": csv_import} + request = self.factory.post( + "/api/v2/imports/{form.pk}", data=post_data, **self.extra + ) + response = self.view(request, pk=self.xform.id) + + self.assertEqual(response.status_code, 202) + 
self.assertEqual(response.content_type, "application/json") + expected_fields = ["task_id"] + self.assertEqual(expected_fields, list(response.data.keys())) + + @patch("onadata.apps.api.viewsets.v2.imports_viewset.get_active_tasks") + def test_create_ongoing_overwrite_task(self, mocked_get_active_tasks): + """ + Test that the `api/v2/imports/` route refuses to process request + when an overwrite import task is ongoing + """ + with HTTMock(enketo_mock): + xls_path = os.path.join( + settings.PROJECT_ROOT, + "apps", + "main", + "tests", + "fixtures", + "tutorial.xlsx", + ) + self._publish_xls_form_to_project(xlsform_path=xls_path) + form = self.project.xform_set.first() + path = "/api/v2/imports/{form.pk}" + mocked_get_active_tasks.return_value = '[{"job_uuid": "11", "time_start": "1664372983.8631873", "file": "good.csv", "overwrite": true}]' + + # Test that request fails when csv_file & xls_file are not sent + csv_import = fixtures_path("good.csv") + post_data = {"csv_file": csv_import} + request = self.factory.post(path, data=post_data, **self.extra) + response = self.view(request, pk=self.xform.id) + + self.assertEqual(response.status_code, 403) + self.assertEqual(response.content_type, "application/json") + expected_response = { + "reason": "An ongoing overwrite request with the ID 11 is being processed" + } + self.assertEqual(expected_response, response.data) + + def test_create_request_validation(self): + """ + Tests that the `api/v2/imports/` route validates requests. 
+ + Expected Validations: + - Checks that either `xls_file` or `csv_file` is sent + - Checks that `xls_file` is an XLS File + - Checks that `csv_file` is a CSV File + """ + with HTTMock(enketo_mock): + xls_path = os.path.join( + settings.PROJECT_ROOT, + "apps", + "main", + "tests", + "fixtures", + "tutorial.xlsx", + ) + self._publish_xls_form_to_project(xlsform_path=xls_path) + form = self.project.xform_set.first() + path = "/api/v2/imports/{form.pk}" + + # Test that request fails when csv_file & xls_file are not sent + request = self.factory.post(path, **self.extra) + response = self.view(request, pk=self.xform.id) + + self.assertEqual(response.status_code, 400) + self.assertEqual(response.content_type, "application/json") + expected_response = {"error": "csv_file and xls_file field empty"} + self.assertEqual(expected_response, response.data) + + # Test that request fails when csv_file or xls_file + # has the incorrect extension + csv_import = fixtures_path("good.csv") + post_data = {"xls_file": csv_import} + request = self.factory.post(path, data=post_data, **self.extra) + response = self.view(request, pk=self.xform.id) + + self.assertEqual(response.status_code, 400) + self.assertEqual(response.content_type, "application/json") + expected_response = {"error": "xls_file not an excel file"} + self.assertEqual(expected_response, response.data) + + post_data = {"csv_file": open(xls_path, "rb")} + request = self.factory.post(path, data=post_data, **self.extra) + response = self.view(request, pk=self.xform.id) + + self.assertEqual(response.status_code, 400) + self.assertEqual(response.content_type, "application/json") + expected_response = {"error": "csv_file not a csv file"} + self.assertEqual(expected_response, response.data) diff --git a/onadata/apps/api/viewsets/v2/imports_viewset.py b/onadata/apps/api/viewsets/v2/imports_viewset.py index 7f02a91dbe..157aaff1c9 100644 --- a/onadata/apps/api/viewsets/v2/imports_viewset.py +++ 
b/onadata/apps/api/viewsets/v2/imports_viewset.py @@ -92,14 +92,14 @@ def create(self, request, pk: int = None) -> Response: overwrite.lower() == "true" if isinstance(overwrite, str) else overwrite ) - # Block overwrite imports from running in parallel - active_tasks = json.load(self._get_active_tasks(xform)) + # Block imports from running when an overwrite is ongoing + active_tasks = json.loads(self._get_active_tasks(xform)) for task in active_tasks: if task.get("overwrite", False): - task_id = task.get("id") + task_id = task.get("job_uuid") resp.update( { - "reason": "An ongoing overwrite request with the ID {task_id} is being processed" + "reason": f"An ongoing overwrite request with the ID {task_id} is being processed" } ) status_code = status.HTTP_403_FORBIDDEN From c60dd97f6712f9a035d9b872d2393b4e7db3da2b Mon Sep 17 00:00:00 2001 From: Davis Raymond Muro Date: Wed, 5 Oct 2022 13:57:27 +0300 Subject: [PATCH 12/18] Add `ImportPermissions` class --- onadata/apps/api/permissions.py | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/onadata/apps/api/permissions.py b/onadata/apps/api/permissions.py index 8d5db54b2b..01da241101 100644 --- a/onadata/apps/api/permissions.py +++ b/onadata/apps/api/permissions.py @@ -127,6 +127,17 @@ class DjangoObjectPermissionsAllowAnon(DjangoObjectPermissions): authenticated_users_only = False +class ImportPermissions(DjangoObjectPermissions): + """ + ImportPermissions - custom permissions check on Imports viewset. + """ + + authenticated_users_only = True + + def has_object_permission(self, request, view, obj): + return request.user.has_perm(CAN_CHANGE_XFORM, obj) + + class XFormPermissions(DjangoObjectPermissions): """ XFormPermissions - custom permissions check on XForm viewset. 
From a6c436d6ba1fa9d08dcbbf2778fae55a9eedba7f Mon Sep 17 00:00:00 2001 From: Davis Raymond Muro Date: Wed, 5 Oct 2022 13:58:00 +0300 Subject: [PATCH 13/18] Add `delete` route for the ImportsViewSet --- .../tests/viewsets/v2/test_imports_viewset.py | 226 ++++++++++++++++-- .../apps/api/viewsets/v2/imports_viewset.py | 59 ++++- 2 files changed, 256 insertions(+), 29 deletions(-) diff --git a/onadata/apps/api/tests/viewsets/v2/test_imports_viewset.py b/onadata/apps/api/tests/viewsets/v2/test_imports_viewset.py index d72d6ab0d7..1dfeb6336d 100644 --- a/onadata/apps/api/tests/viewsets/v2/test_imports_viewset.py +++ b/onadata/apps/api/tests/viewsets/v2/test_imports_viewset.py @@ -6,6 +6,7 @@ from typing import IO, Any from unittest.mock import patch +from django.contrib.auth.models import User from django.conf import settings from django.test import override_settings from httmock import HTTMock @@ -14,6 +15,8 @@ from onadata.apps.api.viewsets.v2.imports_viewset import ImportsViewSet from onadata.apps.api.tests.viewsets.test_abstract_viewset import TestAbstractViewSet from onadata.apps.api.tests.mocked_data import enketo_mock +from onadata.apps.main.models import UserProfile +from onadata.libs.permissions import DataEntryRole, EditorRole def fixtures_path(filepath: str) -> IO[Any]: @@ -29,12 +32,78 @@ def fixtures_path(filepath: str) -> IO[Any]: class TestImportsViewSet(TestAbstractViewSet): def setUp(self): super(TestImportsViewSet, self).setUp() - self.view = ImportsViewSet.as_view({"post": "create", "get": "retrieve"}) + self.view = ImportsViewSet.as_view( + {"post": "create", "get": "retrieve", "delete": "destroy"} + ) + + def test_create_permissions(self): + """ + Tests that only users with Editor role or role superseding it can + create imports + """ + with HTTMock(enketo_mock): + xls_path = os.path.join( + settings.PROJECT_ROOT, + "apps", + "main", + "tests", + "fixtures", + "tutorial.xlsx", + ) + self._publish_xls_form_to_project(xlsform_path=xls_path) + user = 
User.objects.create( + username="joe", email="joe@example.com", first_name="Joe" + ) + _ = UserProfile.objects.create(user=user) + extra = {"HTTP_AUTHORIZATION": f"Token {user.auth_token}"} + csv_import = fixtures_path("good.csv") + post_data = {"csv_file": csv_import} + + # Unauthenticated request fails + request = self.factory.post( + f"/api/v2/imports/{self.xform.pk}", data=post_data + ) + response = self.view(request, pk=self.xform.pk) + self.assertEqual(response.status_code, 401) + + # User without permissions can not import data + request = self.factory.post( + f"/api/v2/imports/{self.xform.pk}", data=post_data, **extra + ) + response = self.view(request, pk=self.xform.pk) + self.assertEqual(response.status_code, 403) + self.assertEqual( + "You do not have permission to perform this action.", + str(response.data.get("detail")), + ) + + # User with dataentry role can not import data + DataEntryRole.add(user, self.xform) + + request = self.factory.post( + f"/api/v2/imports/{self.xform.pk}", data=post_data, **extra + ) + response = self.view(request, pk=self.xform.pk) + self.assertEqual(response.status_code, 403) + self.assertEqual( + "You do not have permission to perform this action.", + str(response.data.get("detail")), + ) + + # User with editor role can import data + DataEntryRole.remove_obj_permissions(user, self.xform) + EditorRole.add(user, self.xform) + + request = self.factory.post( + f"/api/v2/imports/{self.xform.pk}", data=post_data, **extra + ) + response = self.view(request, pk=self.xform.pk) + self.assertEqual(response.status_code, 202) @override_settings(DISABLE_ASYNCHRONOUS_IMPORTS=True) def test_create_expected_synchronous_response(self): """ - Tests that the `api/v2/imports/` route processes a request + Tests that the `api/v2/imports/` route processes a request successfully when `DISABLE_ASYNCHRONOUS_IMPORTS` is set to `True` """ with HTTMock(enketo_mock): @@ -47,13 +116,12 @@ def test_create_expected_synchronous_response(self): 
"tutorial.xlsx", ) self._publish_xls_form_to_project(xlsform_path=xls_path) - form = self.project.xform_set.first() csv_import = fixtures_path("good.csv") post_data = {"csv_file": csv_import} request = self.factory.post( - f"/api/v2/imports/{form.pk}", data=post_data, **self.extra + f"/api/v2/imports/{self.xform.pk}", data=post_data, **self.extra ) - response = self.view(request, pk=self.xform.id) + response = self.view(request, pk=self.xform.pk) self.assertEqual(response.status_code, 200) self.assertEqual(response.content_type, "application/json") @@ -63,7 +131,7 @@ def test_create_expected_synchronous_response(self): def test_create_expected_async_response(self): """ - Tests that the `api/v2/imports/` route processes a request + Tests that the `api/v2/imports/` route processes a request successfully when `DISABLE_ASYNCHRONOUS_IMPORTS` is set to `False` """ with HTTMock(enketo_mock): @@ -76,13 +144,12 @@ def test_create_expected_async_response(self): "tutorial.xlsx", ) self._publish_xls_form_to_project(xlsform_path=xls_path) - form = self.project.xform_set.first() csv_import = fixtures_path("good.csv") post_data = {"csv_file": csv_import} request = self.factory.post( - "/api/v2/imports/{form.pk}", data=post_data, **self.extra + "/api/v2/imports/{self.xform.pk}", data=post_data, **self.extra ) - response = self.view(request, pk=self.xform.id) + response = self.view(request, pk=self.xform.pk) self.assertEqual(response.status_code, 202) self.assertEqual(response.content_type, "application/json") @@ -92,7 +159,7 @@ def test_create_expected_async_response(self): @patch("onadata.apps.api.viewsets.v2.imports_viewset.get_active_tasks") def test_create_ongoing_overwrite_task(self, mocked_get_active_tasks): """ - Test that the `api/v2/imports/` route refuses to process request + Test that the `api/v2/imports/` route refuses to process request when an overwrite import task is ongoing """ with HTTMock(enketo_mock): @@ -105,26 +172,25 @@ def 
test_create_ongoing_overwrite_task(self, mocked_get_active_tasks): "tutorial.xlsx", ) self._publish_xls_form_to_project(xlsform_path=xls_path) - form = self.project.xform_set.first() - path = "/api/v2/imports/{form.pk}" + path = "/api/v2/imports/{self.xform.pk}" mocked_get_active_tasks.return_value = '[{"job_uuid": "11", "time_start": "1664372983.8631873", "file": "good.csv", "overwrite": true}]' # Test that request fails when csv_file & xls_file are not sent csv_import = fixtures_path("good.csv") post_data = {"csv_file": csv_import} request = self.factory.post(path, data=post_data, **self.extra) - response = self.view(request, pk=self.xform.id) + response = self.view(request, pk=self.xform.pk) self.assertEqual(response.status_code, 403) self.assertEqual(response.content_type, "application/json") - expected_response = { - "reason": "An ongoing overwrite request with the ID 11 is being processed" - } - self.assertEqual(expected_response, response.data) + self.assertEqual( + "An ongoing overwrite request with the ID 11 is being processed", + str(response.data.get("detail")), + ) def test_create_request_validation(self): """ - Tests that the `api/v2/imports/` route validates requests. + Tests that the `api/v2/imports/` route validates requests. 
Expected Validations: - Checks that either `xls_file` or `csv_file` is sent @@ -141,12 +207,11 @@ def test_create_request_validation(self): "tutorial.xlsx", ) self._publish_xls_form_to_project(xlsform_path=xls_path) - form = self.project.xform_set.first() - path = "/api/v2/imports/{form.pk}" + path = "/api/v2/imports/{self.xform.pk}" # Test that request fails when csv_file & xls_file are not sent request = self.factory.post(path, **self.extra) - response = self.view(request, pk=self.xform.id) + response = self.view(request, pk=self.xform.pk) self.assertEqual(response.status_code, 400) self.assertEqual(response.content_type, "application/json") @@ -158,7 +223,7 @@ def test_create_request_validation(self): csv_import = fixtures_path("good.csv") post_data = {"xls_file": csv_import} request = self.factory.post(path, data=post_data, **self.extra) - response = self.view(request, pk=self.xform.id) + response = self.view(request, pk=self.xform.pk) self.assertEqual(response.status_code, 400) self.assertEqual(response.content_type, "application/json") @@ -167,9 +232,124 @@ def test_create_request_validation(self): post_data = {"csv_file": open(xls_path, "rb")} request = self.factory.post(path, data=post_data, **self.extra) - response = self.view(request, pk=self.xform.id) + response = self.view(request, pk=self.xform.pk) self.assertEqual(response.status_code, 400) self.assertEqual(response.content_type, "application/json") expected_response = {"error": "csv_file not a csv file"} self.assertEqual(expected_response, response.data) + + def test_delete_permissions(self): + """ + Tests that only users with Editor role or role superseding it can + terminate imports + """ + with HTTMock(enketo_mock): + xls_path = os.path.join( + settings.PROJECT_ROOT, + "apps", + "main", + "tests", + "fixtures", + "tutorial.xlsx", + ) + self._publish_xls_form_to_project(xlsform_path=xls_path) + path = "/api/v2/imports/{self.xform.pk}?task_uuid=11" + user = User.objects.create( + username="joe", 
email="joe@example.com", first_name="Joe" + ) + _ = UserProfile.objects.create(user=user) + extra = {"HTTP_AUTHORIZATION": f"Token {user.auth_token}"} + + # Test that unauthenticated users can not terminate imports + request = self.factory.delete(path) + response = self.view(request, pk=self.xform.pk) + self.assertEqual(response.status_code, 401) + + # Test that users without permissions to the form can not terminate imports + request = self.factory.delete(path, **extra) + response = self.view(request, pk=self.xform.pk) + self.assertEqual(response.status_code, 403) + self.assertEqual( + "You do not have permission to perform this action.", + str(response.data.get("detail")), + ) + + # Test that users with data entry permissions can not terminate imports + DataEntryRole.add(user, self.xform) + + request = self.factory.delete(path, **extra) + response = self.view(request, pk=self.xform.pk) + self.assertEqual(response.status_code, 403) + self.assertEqual( + "You do not have permission to perform this action.", + str(response.data.get("detail")), + ) + + # Test that users with editor role can terminate imports + DataEntryRole.remove_obj_permissions(user, self.xform) + EditorRole.add(user, self.xform) + + request = self.factory.delete(path, **extra) + response = self.view(request, pk=self.xform.pk) + self.assertEqual(response.status_code, 400) + self.assertEqual( + {"error": "Queued task with ID 11 does not exist"}, response.data + ) + + @patch("onadata.apps.api.viewsets.v2.imports_viewset.terminate_import_task") + def test_delete_expected_response(self, mocked_terminate_import_task): + with HTTMock(enketo_mock): + xls_path = os.path.join( + settings.PROJECT_ROOT, + "apps", + "main", + "tests", + "fixtures", + "tutorial.xlsx", + ) + self._publish_xls_form_to_project(xlsform_path=xls_path) + path = "/api/v2/imports/{self.xform.pk}?task_uuid=11" + mocked_terminate_import_task.return_value = True + + request = self.factory.delete(path, **self.extra) + response = 
self.view(request, pk=self.xform.pk) + + self.assertEqual(response.status_code, 204) + + @patch("onadata.apps.api.viewsets.v2.imports_viewset.app") + def test_delete_validation(self, mocked_celery_app): + with HTTMock(enketo_mock): + xls_path = os.path.join( + settings.PROJECT_ROOT, + "apps", + "main", + "tests", + "fixtures", + "tutorial.xlsx", + ) + + self._publish_xls_form_to_project(xlsform_path=xls_path) + path = "/api/v2/imports/{self.xform.pk}" + + # Test that request fails without task_uuid query param + request = self.factory.delete(path, **self.extra) + response = self.view(request, pk=self.xform.pk) + + self.assertEqual(response.status_code, 400) + self.assertEqual( + {"error": "The task_uuid query parameter is required"}, response.data + ) + + # Test that request fails if task_uuid does not exist for form + mocked_celery_app.control.inspect().query_task.return_value = { + "id": 11, + "args": [None, "0", "good.csv", True], + } + request = self.factory.delete(f"{path}?task_uuid=11", **self.extra) + response = self.view(request, pk=self.xform.pk) + + self.assertEqual(response.status_code, 400) + self.assertEqual( + {"error": "Queued task with ID 11 does not exist"}, response.data + ) diff --git a/onadata/apps/api/viewsets/v2/imports_viewset.py b/onadata/apps/api/viewsets/v2/imports_viewset.py index 157aaff1c9..71f232e9cb 100644 --- a/onadata/apps/api/viewsets/v2/imports_viewset.py +++ b/onadata/apps/api/viewsets/v2/imports_viewset.py @@ -6,13 +6,15 @@ from django.core.files.storage import default_storage from rest_framework import viewsets, status from rest_framework.response import Response -from rest_framework.exceptions import ParseError +from rest_framework.exceptions import ParseError, ErrorDetail from rest_framework.decorators import action +from onadata.celeryapp import app from onadata.apps.api.tools import get_baseviewset_class -from onadata.apps.api.permissions import XFormPermissions +from onadata.apps.api.permissions import ImportPermissions 
from onadata.apps.logger.models import XForm from onadata.libs.mixins.cache_control_mixin import CacheControlMixin +from onadata.libs.mixins.authenticate_header_mixin import AuthenticateHeaderMixin from onadata.libs.mixins.etags_mixin import ETagsMixin from onadata.libs.utils.async_status import get_active_tasks from onadata.libs.utils.csv_import import ( @@ -25,8 +27,18 @@ BaseViewset = get_baseviewset_class() -class ImportsViewSet(ETagsMixin, CacheControlMixin, viewsets.ViewSet): - permission_classes = [XFormPermissions] +def terminate_import_task(task_uuid: str, xform_pk: int) -> bool: + task_details = app.control.inspect().query_task(task_uuid) + if task_details and task_details["args"][1] == xform_pk: + app.control.terminate(task_uuid) + return True + return False + + +class ImportsViewSet( + AuthenticateHeaderMixin, ETagsMixin, CacheControlMixin, viewsets.ViewSet +): + permission_classes = [ImportPermissions] queryset = XForm.objects.filter(deleted_at__isnull=True) task_names = ["onadata.libs.utils.csv_import.submit_csv_async"] @@ -56,7 +68,7 @@ def create(self, request, pk: int = None) -> Response: Supported Query Parameters: - - overwrite: bool = Whether the server should permanently delete the data currently available on + - [Optional] overwrite: bool = Whether the server should permanently delete the data currently available on the form then reimport the data using the csv_file/xls_file sent with the request. Required Request Arguements: @@ -70,6 +82,7 @@ def create(self, request, pk: int = None) -> Response: - 200 Ok: Server has successfully imported your data to the form; Only returned when asynchronous imports are disabled - 400 Bad Request: Request has been refused due to incorrect/missing csv_file or xls_file file - 403 Forbidden: The request was valid but the server refused to process it. 
An explanation on why it was refused can be found in the JSON Response + - 401 Unauthorized: The request has been refused due to missing authentication """ xform = self.get_object(pk) resp = {} @@ -99,7 +112,9 @@ def create(self, request, pk: int = None) -> Response: task_id = task.get("job_uuid") resp.update( { - "reason": f"An ongoing overwrite request with the ID {task_id} is being processed" + "detail": ErrorDetail( + f"An ongoing overwrite request with the ID {task_id} is being processed" + ) } ) status_code = status.HTTP_403_FORBIDDEN @@ -139,3 +154,35 @@ def retrieve(self, request, pk: int = None) -> Response: status=status.HTTP_200_OK, content_type="application/json", ) + + def destroy(self, request, pk: int = None) -> Response: + """ + Stops a queued/on-going import task + + Supported Query Parameters: + + - [Required] task_uuid: str = The unique task uuid for the form + + Possible Response status codes: + + - 204 No Content: Request was successfully processed. Task was terminated. 
+ - 400 Bad Request: Request was rejected either due to a missing `task_uuid` query parameter or because the `task_uuid` does not exist for the XForm + """ + xform = self.get_object(pk) + task_uuid = request.query_params.get("task_uuid") + + if not task_uuid: + return Response( + data={"error": "The task_uuid query parameter is required"}, + status=status.HTTP_400_BAD_REQUEST, + content_type="application/json", + ) + + successful = terminate_import_task(task_uuid, xform.pk) + if not successful: + return Response( + data={"error": f"Queued task with ID {task_uuid} does not exist"}, + status=status.HTTP_400_BAD_REQUEST, + content_type="application/json", + ) + return Response(status=status.HTTP_204_NO_CONTENT) From c48b672bd9793885ffa72ec1e50e4ac56ee83ff7 Mon Sep 17 00:00:00 2001 From: Davis Raymond Muro Date: Wed, 5 Oct 2022 14:03:59 +0300 Subject: [PATCH 14/18] Add test docstring --- .../api/tests/viewsets/v2/test_imports_viewset.py | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/onadata/apps/api/tests/viewsets/v2/test_imports_viewset.py b/onadata/apps/api/tests/viewsets/v2/test_imports_viewset.py index 1dfeb6336d..726eeced78 100644 --- a/onadata/apps/api/tests/viewsets/v2/test_imports_viewset.py +++ b/onadata/apps/api/tests/viewsets/v2/test_imports_viewset.py @@ -299,6 +299,10 @@ def test_delete_permissions(self): @patch("onadata.apps.api.viewsets.v2.imports_viewset.terminate_import_task") def test_delete_expected_response(self, mocked_terminate_import_task): + """ + Test that the `api/v2/imports/` DELETE route returns the + expected successful response + """ with HTTMock(enketo_mock): xls_path = os.path.join( settings.PROJECT_ROOT, @@ -319,6 +323,13 @@ def test_delete_expected_response(self, mocked_terminate_import_task): @patch("onadata.apps.api.viewsets.v2.imports_viewset.app") def test_delete_validation(self, mocked_celery_app): + """ + Test that the `api/v2/imports/` DELETE route validates requests. 
+ + Expected validation checks: + - Checks that `task_uuid` is sent + - Checks that the task_uuid actually exists and is tied to the form + """ with HTTMock(enketo_mock): xls_path = os.path.join( settings.PROJECT_ROOT, From cbf7af03cc704b50cf7f49ff8c86cfc5c1ec40bd Mon Sep 17 00:00:00 2001 From: apiyo Date: Fri, 16 Dec 2022 11:36:34 +0300 Subject: [PATCH 15/18] Add documentation for imports endpoint --- docs/forms.rst | 25 ------------------- docs/imports.rst | 64 ++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 64 insertions(+), 25 deletions(-) create mode 100644 docs/imports.rst diff --git a/docs/forms.rst b/docs/forms.rst index 8b04cbb5e0..1c932edd28 100644 --- a/docs/forms.rst +++ b/docs/forms.rst @@ -1629,28 +1629,3 @@ If the upload is still running: "job_status": "PENDING" } - -Check active imports --------------------- - -.. raw:: html - -
GET /api/v1/forms/{form_id}/active_imports
- -Example -^^^^^^^ -:: - - curl -X GET https://api.ona.io/api/v1/forms/{form_id}/active_imports - -Response -^^^^^^^^ - -:: - - [ - { - "job_uuid": "256dcef5-1baa-48ee-83a3-f7100123f5d2", - "time_start": "2022-09-29T09:08:59" - } - ] diff --git a/docs/imports.rst b/docs/imports.rst new file mode 100644 index 0000000000..f94a4bd403 --- /dev/null +++ b/docs/imports.rst @@ -0,0 +1,64 @@ +Check active imports +-------------------- + +.. raw:: html + +
GET /api/v2/imports/{form_id}
+ +Example +^^^^^^^ +:: + + curl -X GET https://api.ona.io/api/v2/imports/{form_id} + +Response +^^^^^^^^ + +:: + + [ + { + "job_uuid": "256dcef5-1baa-48ee-83a3-f7100123f5d2", + "time_start": "2022-09-29T09:08:59" + } + ] + + +Stops a queued/on-going import task +----------------------------------- + +.. raw:: html + +
DELETE /api/v2/imports/{form_id}?task_uuid={task_uuid}
+ +Example +^^^^^^^ +:: + + curl -X DELETE https://api.ona.io/api/v2/imports/{form_id}?task_uuid={task_uuid} + +Response +^^^^^^^^ + +:: + HTTP 204 NO CONTENT + + +Starts a new Import task for a given form +----------------------------------------- + +.. raw:: html + +
POST /api/v2/imports/{form_id}
+ +Example +^^^^^^^ +:: + + cur-X POST -F 'csv_file=@' https://api.ona.io/api/v2/imports/{form_id} + +Response +^^^^^^^^ + +:: + {"task_id":"0819b3b0-f0e5-4a6b-83aa-addab20fe208"} From f55cbf3433f44eb29abcf73b6d88832e939ef8d8 Mon Sep 17 00:00:00 2001 From: apiyo Date: Fri, 16 Dec 2022 12:58:39 +0300 Subject: [PATCH 16/18] Ensure retreive endpoint for imports viewset returns json and fix failing tests --- .../tests/viewsets/v2/test_imports_viewset.py | 95 +++++++++++-------- .../apps/api/viewsets/v2/imports_viewset.py | 56 ++++++----- onadata/libs/utils/async_status.py | 6 +- 3 files changed, 88 insertions(+), 69 deletions(-) diff --git a/onadata/apps/api/tests/viewsets/v2/test_imports_viewset.py b/onadata/apps/api/tests/viewsets/v2/test_imports_viewset.py index 726eeced78..52892932d2 100644 --- a/onadata/apps/api/tests/viewsets/v2/test_imports_viewset.py +++ b/onadata/apps/api/tests/viewsets/v2/test_imports_viewset.py @@ -22,19 +22,21 @@ def fixtures_path(filepath: str) -> IO[Any]: """Returns the file object at the given filepath.""" return open( - os.path.join( - settings.PROJECT_ROOT, "libs", "tests", "utils", "fixtures", filepath - ), + os.path.join(settings.PROJECT_ROOT, "libs", "tests", "utils", + "fixtures", filepath), "rb", ) class TestImportsViewSet(TestAbstractViewSet): + def setUp(self): super(TestImportsViewSet, self).setUp() - self.view = ImportsViewSet.as_view( - {"post": "create", "get": "retrieve", "delete": "destroy"} - ) + self.view = ImportsViewSet.as_view({ + "post": "create", + "get": "retrieve", + "delete": "destroy" + }) def test_create_permissions(self): """ @@ -51,25 +53,24 @@ def test_create_permissions(self): "tutorial.xlsx", ) self._publish_xls_form_to_project(xlsform_path=xls_path) - user = User.objects.create( - username="joe", email="joe@example.com", first_name="Joe" - ) + user = User.objects.create(username="joe", + email="joe@example.com", + first_name="Joe") _ = UserProfile.objects.create(user=user) extra = {"HTTP_AUTHORIZATION": 
f"Token {user.auth_token}"} csv_import = fixtures_path("good.csv") post_data = {"csv_file": csv_import} # Unauthenticated request fails - request = self.factory.post( - f"/api/v2/imports/{self.xform.pk}", data=post_data - ) + request = self.factory.post(f"/api/v2/imports/{self.xform.pk}", + data=post_data) response = self.view(request, pk=self.xform.pk) self.assertEqual(response.status_code, 401) # User without permissions can not import data - request = self.factory.post( - f"/api/v2/imports/{self.xform.pk}", data=post_data, **extra - ) + request = self.factory.post(f"/api/v2/imports/{self.xform.pk}", + data=post_data, + **extra) response = self.view(request, pk=self.xform.pk) self.assertEqual(response.status_code, 403) self.assertEqual( @@ -80,9 +81,9 @@ def test_create_permissions(self): # User with dataentry role can not import data DataEntryRole.add(user, self.xform) - request = self.factory.post( - f"/api/v2/imports/{self.xform.pk}", data=post_data, **extra - ) + request = self.factory.post(f"/api/v2/imports/{self.xform.pk}", + data=post_data, + **extra) response = self.view(request, pk=self.xform.pk) self.assertEqual(response.status_code, 403) self.assertEqual( @@ -94,9 +95,9 @@ def test_create_permissions(self): DataEntryRole.remove_obj_permissions(user, self.xform) EditorRole.add(user, self.xform) - request = self.factory.post( - f"/api/v2/imports/{self.xform.pk}", data=post_data, **extra - ) + request = self.factory.post(f"/api/v2/imports/{self.xform.pk}", + data=post_data, + **extra) response = self.view(request, pk=self.xform.pk) self.assertEqual(response.status_code, 202) @@ -118,9 +119,9 @@ def test_create_expected_synchronous_response(self): self._publish_xls_form_to_project(xlsform_path=xls_path) csv_import = fixtures_path("good.csv") post_data = {"csv_file": csv_import} - request = self.factory.post( - f"/api/v2/imports/{self.xform.pk}", data=post_data, **self.extra - ) + request = self.factory.post(f"/api/v2/imports/{self.xform.pk}", + 
data=post_data, + **self.extra) response = self.view(request, pk=self.xform.pk) self.assertEqual(response.status_code, 200) @@ -146,9 +147,11 @@ def test_create_expected_async_response(self): self._publish_xls_form_to_project(xlsform_path=xls_path) csv_import = fixtures_path("good.csv") post_data = {"csv_file": csv_import} - request = self.factory.post( - "/api/v2/imports/{self.xform.pk}", data=post_data, **self.extra - ) + + # import ipdb; ipdb.set_trace() + request = self.factory.post("/api/v2/imports/{self.xform.pk}", + data=post_data, + **self.extra) response = self.view(request, pk=self.xform.pk) self.assertEqual(response.status_code, 202) @@ -173,7 +176,12 @@ def test_create_ongoing_overwrite_task(self, mocked_get_active_tasks): ) self._publish_xls_form_to_project(xlsform_path=xls_path) path = "/api/v2/imports/{self.xform.pk}" - mocked_get_active_tasks.return_value = '[{"job_uuid": "11", "time_start": "1664372983.8631873", "file": "good.csv", "overwrite": true}]' + mocked_get_active_tasks.return_value = [{ + "job_uuid": "11", + "time_start": "1664372983.8631873", + "file": "good.csv", + "overwrite": True + }] # Test that request fails when csv_file & xls_file are not sent csv_import = fixtures_path("good.csv") @@ -255,9 +263,9 @@ def test_delete_permissions(self): ) self._publish_xls_form_to_project(xlsform_path=xls_path) path = "/api/v2/imports/{self.xform.pk}?task_uuid=11" - user = User.objects.create( - username="joe", email="joe@example.com", first_name="Joe" - ) + user = User.objects.create(username="joe", + email="joe@example.com", + first_name="Joe") _ = UserProfile.objects.create(user=user) extra = {"HTTP_AUTHORIZATION": f"Token {user.auth_token}"} @@ -294,10 +302,11 @@ def test_delete_permissions(self): response = self.view(request, pk=self.xform.pk) self.assertEqual(response.status_code, 400) self.assertEqual( - {"error": "Queued task with ID 11 does not exist"}, response.data - ) + {"error": "Queued task with ID 11 does not exist"}, + 
response.data) - @patch("onadata.apps.api.viewsets.v2.imports_viewset.terminate_import_task") + @patch("onadata.apps.api.viewsets.v2.imports_viewset.terminate_import_task" + ) def test_delete_expected_response(self, mocked_terminate_import_task): """ Test that the `api/v2/imports/` DELETE route returns the @@ -349,18 +358,24 @@ def test_delete_validation(self, mocked_celery_app): self.assertEqual(response.status_code, 400) self.assertEqual( - {"error": "The task_uuid query parameter is required"}, response.data - ) + {"error": "The task_uuid query parameter is required"}, + response.data) # Test that request fails if task_uuid does not exist for form mocked_celery_app.control.inspect().query_task.return_value = { - "id": 11, - "args": [None, "0", "good.csv", True], + 'hostname': { + "11": [ + "active", { + "id": 11, + "args": [None, "0", "good.csv", True], + } + ] + } } request = self.factory.delete(f"{path}?task_uuid=11", **self.extra) response = self.view(request, pk=self.xform.pk) self.assertEqual(response.status_code, 400) self.assertEqual( - {"error": "Queued task with ID 11 does not exist"}, response.data - ) + {"error": "Queued task with ID 11 does not exist"}, + response.data) diff --git a/onadata/apps/api/viewsets/v2/imports_viewset.py b/onadata/apps/api/viewsets/v2/imports_viewset.py index 71f232e9cb..d4c44942d5 100644 --- a/onadata/apps/api/viewsets/v2/imports_viewset.py +++ b/onadata/apps/api/viewsets/v2/imports_viewset.py @@ -29,15 +29,19 @@ def terminate_import_task(task_uuid: str, xform_pk: int) -> bool: task_details = app.control.inspect().query_task(task_uuid) - if task_details and task_details["args"][1] == xform_pk: + if task_details: + task_details = list(task_details.values()) + if task_details and task_uuid in task_details[0]: + task_details = task_details[0][task_uuid] + + if task_details and task_details[1]["args"][1] == xform_pk: app.control.terminate(task_uuid) return True return False -class ImportsViewSet( - AuthenticateHeaderMixin, 
ETagsMixin, CacheControlMixin, viewsets.ViewSet -): +class ImportsViewSet(AuthenticateHeaderMixin, ETagsMixin, CacheControlMixin, + viewsets.ViewSet): permission_classes = [ImportPermissions] queryset = XForm.objects.filter(deleted_at__isnull=True) task_names = ["onadata.libs.utils.csv_import.submit_csv_async"] @@ -101,22 +105,20 @@ def create(self, request, pk: int = None) -> Response: csv_file = submission_xls_to_csv(xls_file) overwrite = request.query_params.get("overwrite") - overwrite = ( - overwrite.lower() == "true" if isinstance(overwrite, str) else overwrite - ) + overwrite = (overwrite.lower() == "true" if isinstance( + overwrite, str) else overwrite) # Block imports from running when an overwrite is ongoing - active_tasks = json.loads(self._get_active_tasks(xform)) + active_tasks = self._get_active_tasks(xform) for task in active_tasks: if task.get("overwrite", False): task_id = task.get("job_uuid") - resp.update( - { - "detail": ErrorDetail( - f"An ongoing overwrite request with the ID {task_id} is being processed" - ) - } - ) + resp.update({ + "detail": + ErrorDetail( + f"An ongoing overwrite request with the ID {task_id} is being processed" + ) + }) status_code = status.HTTP_403_FORBIDDEN break @@ -128,24 +130,26 @@ def create(self, request, pk: int = None) -> Response: csv_file.seek(0) if getattr(settings, "DISABLE_ASYNCHRONOUS_IMPORTS", False): - resp.update(submit_csv(request.user.username, xform, csv_file)) + resp.update( + submit_csv(request.user.username, xform, csv_file)) status_code = status.HTTP_200_OK else: - upload_to = os.path.join( - request.user.username, "csv_imports", csv_file.name - ) + upload_to = os.path.join(request.user.username, + "csv_imports", csv_file.name) file_name = default_storage.save(upload_to, csv_file) - task = submit_csv_async.delay( - request.user.username, xform.pk, file_name, overwrite - ) + task = submit_csv_async.delay(request.user.username, + xform.pk, file_name, + overwrite) if task is None: raise 
ParseError("Task not found") resp.update({"task_id": task.task_id}) status_code = status.HTTP_202_ACCEPTED - return Response(data=resp, status=status_code, content_type="application/json") + return Response(data=resp, + status=status_code, + content_type="application/json") - def retrieve(self, request, pk: int = None) -> Response: + def retrieve(self, request, pk: int = None, format=None) -> Response: """Returns csv import async tasks that belong to this form""" xform = self.get_object(pk) @@ -181,7 +185,9 @@ def destroy(self, request, pk: int = None) -> Response: successful = terminate_import_task(task_uuid, xform.pk) if not successful: return Response( - data={"error": f"Queued task with ID {task_uuid} does not exist"}, + data={ + "error": f"Queued task with ID {task_uuid} does not exist" + }, status=status.HTTP_400_BAD_REQUEST, content_type="application/json", ) diff --git a/onadata/libs/utils/async_status.py b/onadata/libs/utils/async_status.py index 944322edba..79414eee00 100644 --- a/onadata/libs/utils/async_status.py +++ b/onadata/libs/utils/async_status.py @@ -61,8 +61,7 @@ def get_active_tasks(task_names: List[str], xform: XForm): ) ) - return json.dumps( - list( + return list( map( lambda i: { "job_uuid": gettext(i["id"]), @@ -74,5 +73,4 @@ def get_active_tasks(task_names: List[str], xform: XForm): }, data, ) - ) - ) + ) \ No newline at end of file From 2488cb1dfc799bf8b401e6529102e94b60ac50af Mon Sep 17 00:00:00 2001 From: apiyo Date: Fri, 16 Dec 2022 12:59:37 +0300 Subject: [PATCH 17/18] Remove active_imports endpoint from api/v1/ xform_viewset --- onadata/apps/api/viewsets/xform_viewset.py | 11 ----------- 1 file changed, 11 deletions(-) diff --git a/onadata/apps/api/viewsets/xform_viewset.py b/onadata/apps/api/viewsets/xform_viewset.py index 8516908405..a918c8c29c 100644 --- a/onadata/apps/api/viewsets/xform_viewset.py +++ b/onadata/apps/api/viewsets/xform_viewset.py @@ -878,17 +878,6 @@ def versions(self, request, *args, **kwargs): return 
Response(data=serializer.data, status=status.HTTP_200_OK) - @action(methods=["GET"], detail=True) - def active_imports(self, request, *args, **kwargs): - """Returns csv import async tasks that belong to this form""" - xform = self.get_object() - task_names = ["onadata.libs.utils.csv_import.submit_csv_async"] - return Response( - data=get_active_tasks(task_names, xform), - status=status.HTTP_200_OK, - content_type="application/json", - ) - @action(methods=["GET"], detail=True) def export_async(self, request, *args, **kwargs): """Returns the status of an async export.""" From d54af21f1531d293c7518359c8ef8812401c55ea Mon Sep 17 00:00:00 2001 From: apiyo Date: Fri, 16 Dec 2022 15:07:03 +0300 Subject: [PATCH 18/18] Fix failing tests and formatting errors --- onadata/apps/api/permissions.py | 1 + .../apps/api/tests/viewsets/test_ona_api.py | 2 +- .../tests/viewsets/v2/test_imports_viewset.py | 31 +++++----- onadata/apps/api/urls/v2_urls.py | 4 ++ .../apps/api/viewsets/v2/imports_viewset.py | 56 +++++++++++++------ onadata/apps/api/viewsets/xform_viewset.py | 1 - onadata/libs/tests/utils/test_async_status.py | 19 +++---- onadata/libs/utils/async_status.py | 26 ++++----- 8 files changed, 82 insertions(+), 58 deletions(-) diff --git a/onadata/apps/api/permissions.py b/onadata/apps/api/permissions.py index 01da241101..3d1bc2a119 100644 --- a/onadata/apps/api/permissions.py +++ b/onadata/apps/api/permissions.py @@ -127,6 +127,7 @@ class DjangoObjectPermissionsAllowAnon(DjangoObjectPermissions): authenticated_users_only = False +# pylint: disable=too-few-public-methods class ImportPermissions(DjangoObjectPermissions): """ ImportPermissions - custom permissions check on Imports viewset. 
diff --git a/onadata/apps/api/tests/viewsets/test_ona_api.py b/onadata/apps/api/tests/viewsets/test_ona_api.py index 84f7e2b38e..291c41518c 100644 --- a/onadata/apps/api/tests/viewsets/test_ona_api.py +++ b/onadata/apps/api/tests/viewsets/test_ona_api.py @@ -30,4 +30,4 @@ def test_number_of_v2_viewsets(self): request = self.factory.get(path) request.resolver_match = resolve(path) response = view(request) - self.assertEqual(len(response.data), 1) + self.assertEqual(len(response.data), 2) diff --git a/onadata/apps/api/tests/viewsets/v2/test_imports_viewset.py b/onadata/apps/api/tests/viewsets/v2/test_imports_viewset.py index 52892932d2..873759a116 100644 --- a/onadata/apps/api/tests/viewsets/v2/test_imports_viewset.py +++ b/onadata/apps/api/tests/viewsets/v2/test_imports_viewset.py @@ -6,12 +6,11 @@ from typing import IO, Any from unittest.mock import patch -from django.contrib.auth.models import User +from django.contrib.auth import get_user_model from django.conf import settings from django.test import override_settings from httmock import HTTMock -from onadata.celeryapp import app from onadata.apps.api.viewsets.v2.imports_viewset import ImportsViewSet from onadata.apps.api.tests.viewsets.test_abstract_viewset import TestAbstractViewSet from onadata.apps.api.tests.mocked_data import enketo_mock @@ -29,9 +28,12 @@ def fixtures_path(filepath: str) -> IO[Any]: class TestImportsViewSet(TestAbstractViewSet): + """ + Test for ImportsViewSet + """ def setUp(self): - super(TestImportsViewSet, self).setUp() + super().setUp() self.view = ImportsViewSet.as_view({ "post": "create", "get": "retrieve", @@ -53,9 +55,9 @@ def test_create_permissions(self): "tutorial.xlsx", ) self._publish_xls_form_to_project(xlsform_path=xls_path) - user = User.objects.create(username="joe", - email="joe@example.com", - first_name="Joe") + user = get_user_model().objects.create(username="joe", + email="joe@example.com", + first_name="Joe") _ = UserProfile.objects.create(user=user) extra = 
{"HTTP_AUTHORIZATION": f"Token {user.auth_token}"} csv_import = fixtures_path("good.csv") @@ -102,7 +104,7 @@ def test_create_permissions(self): self.assertEqual(response.status_code, 202) @override_settings(DISABLE_ASYNCHRONOUS_IMPORTS=True) - def test_create_expected_synchronous_response(self): + def test_synchronous_response(self): """ Tests that the `api/v2/imports/` route processes a request successfully when `DISABLE_ASYNCHRONOUS_IMPORTS` is set to `True` @@ -130,7 +132,7 @@ def test_create_expected_synchronous_response(self): self.assertEqual(response.data.get("additions"), 9) self.assertEqual(response.data.get("updates"), 0) - def test_create_expected_async_response(self): + def test_expected_async_response(self): """ Tests that the `api/v2/imports/` route processes a request successfully when `DISABLE_ASYNCHRONOUS_IMPORTS` is set to `False` @@ -160,7 +162,7 @@ def test_create_expected_async_response(self): self.assertEqual(expected_fields, list(response.data.keys())) @patch("onadata.apps.api.viewsets.v2.imports_viewset.get_active_tasks") - def test_create_ongoing_overwrite_task(self, mocked_get_active_tasks): + def test_ongoing_overwrite_task(self, mocked_get_active_tasks): """ Test that the `api/v2/imports/` route refuses to process request when an overwrite import task is ongoing @@ -238,8 +240,9 @@ def test_create_request_validation(self): expected_response = {"error": "xls_file not an excel file"} self.assertEqual(expected_response, response.data) - post_data = {"csv_file": open(xls_path, "rb")} - request = self.factory.post(path, data=post_data, **self.extra) + with open(xls_path, "rb") as xls_file: + post_data = {"csv_file": xls_file} + request = self.factory.post(path, data=post_data, **self.extra) response = self.view(request, pk=self.xform.pk) self.assertEqual(response.status_code, 400) @@ -263,9 +266,9 @@ def test_delete_permissions(self): ) self._publish_xls_form_to_project(xlsform_path=xls_path) path = 
"/api/v2/imports/{self.xform.pk}?task_uuid=11" - user = User.objects.create(username="joe", - email="joe@example.com", - first_name="Joe") + user = get_user_model().objects.create(username="joe", + email="joe@example.com", + first_name="Joe") _ = UserProfile.objects.create(user=user) extra = {"HTTP_AUTHORIZATION": f"Token {user.auth_token}"} diff --git a/onadata/apps/api/urls/v2_urls.py b/onadata/apps/api/urls/v2_urls.py index 011deed4e4..36081a40b2 100644 --- a/onadata/apps/api/urls/v2_urls.py +++ b/onadata/apps/api/urls/v2_urls.py @@ -11,6 +11,9 @@ class DetailedPostRouter(routers.DefaultRouter): + """ + Custom router + """ routes = [ # List route. routers.Route( @@ -52,6 +55,7 @@ class DetailedPostRouter(routers.DefaultRouter): ), ] + # pylint: disable=redefined-outer-name def extend(self, router): """ Extends the routers routes with the routes from another router diff --git a/onadata/apps/api/viewsets/v2/imports_viewset.py b/onadata/apps/api/viewsets/v2/imports_viewset.py index d4c44942d5..0509dd82a0 100644 --- a/onadata/apps/api/viewsets/v2/imports_viewset.py +++ b/onadata/apps/api/viewsets/v2/imports_viewset.py @@ -1,4 +1,6 @@ -import json +""" +Imports v2 viewset +""" import os from django.conf import settings @@ -7,7 +9,6 @@ from rest_framework import viewsets, status from rest_framework.response import Response from rest_framework.exceptions import ParseError, ErrorDetail -from rest_framework.decorators import action from onadata.celeryapp import app from onadata.apps.api.tools import get_baseviewset_class @@ -28,28 +29,40 @@ def terminate_import_task(task_uuid: str, xform_pk: int) -> bool: + """Terminate an import task given a uuid and xform_pk""" task_details = app.control.inspect().query_task(task_uuid) if task_details: task_details = list(task_details.values()) if task_details and task_uuid in task_details[0]: task_details = task_details[0][task_uuid] - if task_details and task_details[1]["args"][1] == xform_pk: + if task_details and task_details[0] == 
'active' and \ + task_details[1]["args"] and task_details[1]["args"][1] == xform_pk: app.control.terminate(task_uuid) return True return False +# pylint: disable=too-many-ancestors class ImportsViewSet(AuthenticateHeaderMixin, ETagsMixin, CacheControlMixin, viewsets.ViewSet): + """ + Implements api/v2/imports endpoints + """ permission_classes = [ImportPermissions] queryset = XForm.objects.filter(deleted_at__isnull=True) task_names = ["onadata.libs.utils.csv_import.submit_csv_async"] def get_queryset(self): + """ + Retreive all XForms that have not been deleted + """ return XForm.objects.filter(deleted_at__isnull=True) - def get_object(self, pk: int): + def get_object(self, pk: int): # pylint: disable=invalid-name + """ + Retreive XForm with specified pk + """ queryset = self.get_queryset() obj = get_object_or_404(queryset, pk=pk) self.check_object_permissions(self.request, obj) @@ -61,10 +74,12 @@ def _get_active_tasks(self, xform: XForm) -> str: """ return get_active_tasks(self.task_names, xform) + # pylint: disable=invalid-name def create(self, request, pk: int = None) -> Response: """ - Starts a new Import task for a given form; The route processes imports asynchronously - unless the `DISABLE_ASYNCHRONOUS_IMPORTS` setting is set to false. + Starts a new Import task for a given form; The route processes imports + asynchronously unless the `DISABLE_ASYNCHRONOUS_IMPORTS` setting + is set to false. Curl example: @@ -72,8 +87,9 @@ def create(self, request, pk: int = None) -> Response: Supported Query Parameters: - - [Optional] overwrite: bool = Whether the server should permanently delete the data currently available on - the form then reimport the data using the csv_file/xls_file sent with the request. + - [Optional] overwrite: bool = Whether the server should permanently + delete the data currently available on the form then reimport the + data using the csv_file/xls_file sent with the request. 
Required Request Arguements: @@ -82,10 +98,15 @@ def create(self, request, pk: int = None) -> Response: Possible Response status codes: - - 202 Accepted: Server has successfully accepted your request for data import and has queued the task - - 200 Ok: Server has successfully imported your data to the form; Only returned when asynchronous imports are disabled - - 400 Bad Request: Request has been refused due to incorrect/missing csv_file or xls_file file - - 403 Forbidden: The request was valid but the server refused to process it. An explanation on why it was refused can be found in the JSON Response + - 202 Accepted: Server has successfully accepted your request for data + import and has queued the task + - 200 Ok: Server has successfully imported your data to the form; + Only returned when asynchronous imports are disabled + - 400 Bad Request: Request has been refused due to incorrect/missing + csv_file or xls_file file + - 403 Forbidden: The request was valid but the server refused to + process it. 
An explanation on why it was refused can be found + in the JSON Response - 401 Unauthorized: The request has been refused due to missing authentication """ xform = self.get_object(pk) @@ -116,17 +137,14 @@ def create(self, request, pk: int = None) -> Response: resp.update({ "detail": ErrorDetail( - f"An ongoing overwrite request with the ID {task_id} is being processed" + "An ongoing overwrite request with the ID " + + f"{task_id} is being processed" ) }) status_code = status.HTTP_403_FORBIDDEN break if not status_code == status.HTTP_403_FORBIDDEN: - try: - csv_size = csv_file.size - except AttributeError: - csv_size = csv_file.__sizeof__() csv_file.seek(0) if getattr(settings, "DISABLE_ASYNCHRONOUS_IMPORTS", False): @@ -149,6 +167,7 @@ def create(self, request, pk: int = None) -> Response: status=status_code, content_type="application/json") + # pylint: disable=redefined-builtin, unused-argument def retrieve(self, request, pk: int = None, format=None) -> Response: """Returns csv import async tasks that belong to this form""" xform = self.get_object(pk) @@ -170,7 +189,8 @@ def destroy(self, request, pk: int = None) -> Response: Possible Response status codes: - 204 No Content: Request was successfully processed. Task was terminated. 
- - 400 Bad Request: Request was rejected either due to a missing `task_uuid` query parameter or because the `task_uuid` does not exist for the XForm + - 400 Bad Request: Request was rejected either due to a missing `task_uuid` + query parameter or because the `task_uuid` does not exist for the XForm """ xform = self.get_object(pk) task_uuid = request.query_params.get("task_uuid") diff --git a/onadata/apps/api/viewsets/xform_viewset.py b/onadata/apps/api/viewsets/xform_viewset.py index a918c8c29c..5c75c71281 100644 --- a/onadata/apps/api/viewsets/xform_viewset.py +++ b/onadata/apps/api/viewsets/xform_viewset.py @@ -97,7 +97,6 @@ get_form_url, ) from onadata.settings.common import CSV_EXTENSION, XLS_EXTENSIONS -from onadata.libs.utils.async_status import get_active_tasks ENKETO_AUTH_COOKIE = getattr(settings, "ENKETO_AUTH_COOKIE", "__enketo") ENKETO_META_UID_COOKIE = getattr( diff --git a/onadata/libs/tests/utils/test_async_status.py b/onadata/libs/tests/utils/test_async_status.py index 66d327a910..301867c99d 100644 --- a/onadata/libs/tests/utils/test_async_status.py +++ b/onadata/libs/tests/utils/test_async_status.py @@ -10,6 +10,7 @@ from onadata.libs.utils import async_status from onadata.apps.logger.models.xform import XForm + class TestAsyncStatus(TestBase): def test_celery_state_to_status(self): @@ -50,7 +51,7 @@ def test_get_active_tasks(self): async_status.get_active_tasks( ['onadata.libs.utils.csv_import.submit_csv_async'], xform ), - '[]', + [], ) inspect = MagicMock() inspect.active = MagicMock( @@ -67,12 +68,10 @@ def test_get_active_tasks(self): ) app.control.inspect = MagicMock(return_value=inspect) - self.assertEqual( - async_status.get_active_tasks( - ['onadata.libs.utils.csv_import.submit_csv_async'], xform - ), - '[{"job_uuid": "11", "time_start"' - + ": \"" - + datetime.fromtimestamp(time_start).strftime("%Y-%m-%dT%H:%M:%S") - + '", "file": "/home/ona/import.csv", "overwrite": true}]', - ) + self.assertEqual(async_status.get_active_tasks( + 
['onadata.libs.utils.csv_import.submit_csv_async'], + xform), + [{'job_uuid': '11', + 'time_start': datetime.fromtimestamp(time_start).strftime( + "%Y-%m-%dT%H:%M:%S"), + "file": "/home/ona/import.csv", "overwrite": True}]) diff --git a/onadata/libs/utils/async_status.py b/onadata/libs/utils/async_status.py index 79414eee00..1ced064621 100644 --- a/onadata/libs/utils/async_status.py +++ b/onadata/libs/utils/async_status.py @@ -1,7 +1,6 @@ """ Utilities for celery asyncronous tasks """ -import json from datetime import datetime from typing import List @@ -47,6 +46,17 @@ def async_status(status, error=None): return status +def format_task(task): + """Format a celery task element""" + return {"job_uuid": gettext(task["id"]), + "time_start": datetime.fromtimestamp(task["time_start"]).strftime( + "%Y-%m-%dT%H:%M:%S" + ), + "file": gettext(task["args"][2]), + "overwrite": task["args"][3], + } + + def get_active_tasks(task_names: List[str], xform: XForm): """Get active celery tasks""" inspect = app.control.inspect() @@ -61,16 +71,4 @@ def get_active_tasks(task_names: List[str], xform: XForm): ) ) - return list( - map( - lambda i: { - "job_uuid": gettext(i["id"]), - "time_start": datetime.fromtimestamp(i["time_start"]).strftime( - "%Y-%m-%dT%H:%M:%S" - ), - "file": gettext(i["args"][2]), - "overwrite": i["args"][3], - }, - data, - ) - ) \ No newline at end of file + return list(map(format_task, data))