diff --git a/bin/install_test_requirements.sh b/bin/install_test_requirements.sh index 9b9d51441..57a92528a 100755 --- a/bin/install_test_requirements.sh +++ b/bin/install_test_requirements.sh @@ -8,8 +8,8 @@ pip install -e /__w/ckanext-switzerland/ckanext-switzerland/ # Install ckanext dependencies pip install -e git+https://github.com/ckan/ckanext-dcat.git@v1.5.1#egg=ckanext-dcat pip install -r https://raw.githubusercontent.com/ckan/ckanext-dcat/v1.5.1/requirements.txt -pip install -e git+https://gitlab.liip.ch/odp_oev_schweiz/ckanext-harvest.git#egg=ckanext-harvest -pip install -r https://gitlab.liip.ch/odp_oev_schweiz/ckanext-harvest/-/raw/main/requirements.txt +pip install -e git+https://gitlab.liip.ch/odp_oev_schweiz/ckanext-harvest.git@v1.0.2#egg=ckanext-harvest +pip install -r https://gitlab.liip.ch/odp_oev_schweiz/ckanext-harvest/-/raw/v1.0.2/requirements.txt pip install -e git+https://github.com/ckan/ckanext-scheming.git@release-3.0.0#egg=ckanext-scheming pip install -e git+https://github.com/ckan/ckanext-fluent.git#egg=ckanext-fluent pip install -r https://raw.githubusercontent.com/ckan/ckanext-fluent/master/requirements.txt diff --git a/ckanext/switzerland/harvester/base_sbb_harvester.py b/ckanext/switzerland/harvester/base_sbb_harvester.py index e36100c87..f50a9c2ad 100644 --- a/ckanext/switzerland/harvester/base_sbb_harvester.py +++ b/ckanext/switzerland/harvester/base_sbb_harvester.py @@ -38,7 +38,6 @@ from ckanext.harvest.harvesters.base import HarvesterBase from ckanext.switzerland.harvester.storage_adapter_factory import StorageAdapterFactory -from ckanext.switzerland.helpers import resource_filename log = logging.getLogger(__name__) @@ -445,7 +444,7 @@ def fetch_stage(self, harvest_object): ) return False - def _fetch_stage(self, harvest_object): # noqa + def _fetch_stage(self, harvest_object): # noqa: C901 """ Fetching of resources. Runs once for each gathered resource. @@ -603,7 +602,7 @@ def import_stage(self, harvest_object): ) return False - def _import_stage(self, harvest_object): # noqa + def _import_stage(self, harvest_object): # noqa: C901 """ Importing the fetched files into CKAN storage. Runs once for each fetched resource. @@ -939,15 +938,17 @@ def _import_stage(self, harvest_object): # noqa if old_resource_id: log.info("Deleting old resource: %s", old_resource_id) - # delete the datastore table try: - get_action("datastore_delete")( - context, {"resource_id": old_resource_id, "force": True} - ) + self._fully_delete_resource(context, old_resource_meta) except NotFound: - pass # Sometimes importing the data into the datastore fails - - get_action("resource_delete")(context, {"id": old_resource_id}) + self._save_object_error( + f"Error deleting old resource {old_resource_id} for " + f"filename {file_name}. This could be due to a failed " + f"connection to the database. " + f"{traceback.format_exc()}", + harvest_object, + stage, + ) Session.commit() @@ -995,8 +996,10 @@ def _get_ordered_resources(self, package): return ordered_resources, unmatched_resources - def finalize(self, harvest_object, harvest_object_data): - context = {"model": model, "session": Session, "user": self._get_user_name()} + def finalize(self, harvest_object, harvest_object_data): # noqa: C901 + # TODO: Simplify this method. + user_name = self._get_user_name() + context = {"model": model, "session": Session, "user": user_name} stage = "Import" log.info("Running finalizing tasks:") @@ -1029,9 +1032,23 @@ def finalize(self, harvest_object, harvest_object_data): log.exception(message) self._save_object_error(message, harvest_object, "Import") - return True + return False - ordered_resources, unmatched_resources = self._get_ordered_resources(package) + try: + ordered_resources, unmatched_resources = self._get_ordered_resources( + package + ) + except NotFound: + self._save_object_error( + f"Error reordering resources for dataset " + f"{harvest_object_data['dataset']}. " + f"This could be due to a failed connection to the database. " + f"{traceback.format_exc()}", + harvest_object, + stage, + ) + + return False # ---------------------------------------------------------------------------- # delete old resources @@ -1048,9 +1065,16 @@ def finalize(self, harvest_object, harvest_object_data): for resource in ordered_resources[max_resources:]: try: - self._delete_version( - context, package, resource_filename(resource["url"]) - ) + # We need a new context each time: otherwise, if there is an + # exception deleting the resource, there will be auth data left in + # the context that won't get deleted. Then all subsequent calls to + # resource_delete will seem unauthorized and fail. + delete_context = { + "model": model, + "session": Session, + "user": user_name, + } + self._fully_delete_resource(delete_context, resource) except Exception as e: self._save_object_error( "Error deleting resource {} in finalizing tasks: {}".format( @@ -1080,15 +1104,27 @@ def finalize(self, harvest_object, harvest_object_data): }, ) - # reorder resources - # not matched resources come first in the list, then the ordered - get_action("package_resource_reorder")( - context, - { - "id": package["id"], - "order": [r["id"] for r in unmatched_resources + ordered_resources], - }, - ) + try: + # reorder resources + # not matched resources come first in the list, then the ordered + get_action("package_resource_reorder")( + context, + { + "id": package["id"], + "order": [r["id"] for r in unmatched_resources + ordered_resources], + }, + ) + except ValidationError: + self._save_object_error( + f"Error reordering resources for dataset " + f"{harvest_object_data['dataset']}. " + f"This could be due to a failed connection to the database. " + f"{traceback.format_exc()}", + harvest_object, + stage, + ) + + return False from ckanext.harvest.model import harvest_object_table @@ -1114,27 +1150,25 @@ def finalize(self, harvest_object, harvest_object_data): search.rebuild(package["id"]) - def _delete_version(self, context, package, filename): - """Fully delete the resource with the given filename""" - for resource in package["resources"]: - if resource_filename(resource["url"]) == filename: - log.debug( - "Deleting resource {} with filename {}".format( - resource["id"], filename - ) - ) - # delete the file from the filestore - path = uploader.ResourceUpload(resource).get_path(resource["id"]) - if os.path.exists(path): - os.remove(path) + def _fully_delete_resource(self, context, resource): + """Fully delete a resource and its file.""" + log.debug( + "Deleting resource {} with filename {}".format( + resource["id"], resource["url"] + ) + ) + # delete the file from the filestore + path = uploader.ResourceUpload(resource).get_path(resource["id"]) + if os.path.exists(path): + os.remove(path) - # delete the datastore table - try: - get_action("datastore_delete")( - context, {"resource_id": resource["id"], "force": True} - ) - except NotFound: - pass # Sometimes importing the data into the datastore fails + # delete the datastore table + try: + get_action("datastore_delete")( + context, {"resource_id": resource["id"], "force": True} + ) + except NotFound: + pass # Sometimes importing the data into the datastore fails - # delete the resource itself - get_action("resource_delete")(context, {"id": resource["id"]}) + # delete the resource itself + get_action("resource_delete")(context, {"id": resource["id"]})