Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat/handle error deleting old resource for main #139

Merged
merged 11 commits into from
Dec 4, 2024
4 changes: 2 additions & 2 deletions bin/install_test_requirements.sh
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ pip install -e /__w/ckanext-switzerland/ckanext-switzerland/
# Install ckanext dependencies
pip install -e git+https://github.com/ckan/ckanext-dcat.git@v1.5.1#egg=ckanext-dcat
pip install -r https://raw.githubusercontent.com/ckan/ckanext-dcat/v1.5.1/requirements.txt
pip install -e git+https://gitlab.liip.ch/odp_oev_schweiz/ckanext-harvest.git#egg=ckanext-harvest
pip install -r https://gitlab.liip.ch/odp_oev_schweiz/ckanext-harvest/-/raw/main/requirements.txt
pip install -e git+https://gitlab.liip.ch/odp_oev_schweiz/ckanext-harvest.git@v1.0.2#egg=ckanext-harvest
pip install -r https://gitlab.liip.ch/odp_oev_schweiz/ckanext-harvest/-/raw/v1.0.2/requirements.txt
pip install -e git+https://github.com/ckan/[email protected]#egg=ckanext-scheming
pip install -e git+https://github.com/ckan/ckanext-fluent.git#egg=ckanext-fluent
pip install -r https://raw.githubusercontent.com/ckan/ckanext-fluent/master/requirements.txt
Expand Down
130 changes: 82 additions & 48 deletions ckanext/switzerland/harvester/base_sbb_harvester.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@

from ckanext.harvest.harvesters.base import HarvesterBase
from ckanext.switzerland.harvester.storage_adapter_factory import StorageAdapterFactory
from ckanext.switzerland.helpers import resource_filename

log = logging.getLogger(__name__)

Expand Down Expand Up @@ -445,7 +444,7 @@ def fetch_stage(self, harvest_object):
)
return False

def _fetch_stage(self, harvest_object): # noqa
def _fetch_stage(self, harvest_object): # noqa: C901
"""
Fetching of resources. Runs once for each gathered resource.

Expand Down Expand Up @@ -603,7 +602,7 @@ def import_stage(self, harvest_object):
)
return False

def _import_stage(self, harvest_object): # noqa
def _import_stage(self, harvest_object): # noqa: C901
"""
Importing the fetched files into CKAN storage.
Runs once for each fetched resource.
Expand Down Expand Up @@ -939,15 +938,17 @@ def _import_stage(self, harvest_object): # noqa
if old_resource_id:
log.info("Deleting old resource: %s", old_resource_id)

# delete the datastore table
try:
get_action("datastore_delete")(
context, {"resource_id": old_resource_id, "force": True}
)
self._fully_delete_resource(context, old_resource_meta)
except NotFound:
pass # Sometimes importing the data into the datastore fails

get_action("resource_delete")(context, {"id": old_resource_id})
self._save_object_error(
f"Error deleting old resource {old_resource_id} for "
f"filename {file_name}. This could be due to a failed "
f"connection to the database. "
f"{traceback.format_exc()}",
harvest_object,
stage,
)

Session.commit()

Expand Down Expand Up @@ -995,8 +996,10 @@ def _get_ordered_resources(self, package):

return ordered_resources, unmatched_resources

def finalize(self, harvest_object, harvest_object_data):
context = {"model": model, "session": Session, "user": self._get_user_name()}
def finalize(self, harvest_object, harvest_object_data): # noqa: C901
# TODO: Simplify this method.
user_name = self._get_user_name()
context = {"model": model, "session": Session, "user": user_name}
stage = "Import"

log.info("Running finalizing tasks:")
Expand Down Expand Up @@ -1029,9 +1032,23 @@ def finalize(self, harvest_object, harvest_object_data):
log.exception(message)
self._save_object_error(message, harvest_object, "Import")

return True
return False

ordered_resources, unmatched_resources = self._get_ordered_resources(package)
try:
ordered_resources, unmatched_resources = self._get_ordered_resources(
package
)
except NotFound:
self._save_object_error(
f"Error reordering resources for dataset "
f"{harvest_object_data['dataset']}. "
f"This could be due to a failed connection to the database. "
f"{traceback.format_exc()}",
harvest_object,
stage,
)

return False

# ----------------------------------------------------------------------------
# delete old resources
Expand All @@ -1048,9 +1065,16 @@ def finalize(self, harvest_object, harvest_object_data):

for resource in ordered_resources[max_resources:]:
try:
self._delete_version(
context, package, resource_filename(resource["url"])
)
# We need a new context each time: otherwise, if there is an
# exception deleting the resource, there will be auth data left in
# the context that won't get deleted. Then all subsequent calls to
# resource_delete will seem unauthorized and fail.
delete_context = {
"model": model,
"session": Session,
"user": user_name,
}
self._fully_delete_resource(delete_context, resource)
except Exception as e:
self._save_object_error(
"Error deleting resource {} in finalizing tasks: {}".format(
Expand Down Expand Up @@ -1080,15 +1104,27 @@ def finalize(self, harvest_object, harvest_object_data):
},
)

# reorder resources
# not matched resources come first in the list, then the ordered
get_action("package_resource_reorder")(
context,
{
"id": package["id"],
"order": [r["id"] for r in unmatched_resources + ordered_resources],
},
)
try:
# reorder resources
# not matched resources come first in the list, then the ordered
get_action("package_resource_reorder")(
context,
{
"id": package["id"],
"order": [r["id"] for r in unmatched_resources + ordered_resources],
},
)
except ValidationError:
self._save_object_error(
f"Error reordering resources for dataset "
f"{harvest_object_data['dataset']}. "
f"This could be due to a failed connection to the database. "
f"{traceback.format_exc()}",
harvest_object,
stage,
)

return False

from ckanext.harvest.model import harvest_object_table

Expand All @@ -1114,27 +1150,25 @@ def finalize(self, harvest_object, harvest_object_data):

search.rebuild(package["id"])

def _delete_version(self, context, package, filename):
"""Fully delete the resource with the given filename"""
for resource in package["resources"]:
if resource_filename(resource["url"]) == filename:
log.debug(
"Deleting resource {} with filename {}".format(
resource["id"], filename
)
)
# delete the file from the filestore
path = uploader.ResourceUpload(resource).get_path(resource["id"])
if os.path.exists(path):
os.remove(path)
def _fully_delete_resource(self, context, resource):
"""Fully delete a resource and its file."""
log.debug(
"Deleting resource {} with filename {}".format(
resource["id"], resource["url"]
)
)
# delete the file from the filestore
path = uploader.ResourceUpload(resource).get_path(resource["id"])
if os.path.exists(path):
os.remove(path)

# delete the datastore table
try:
get_action("datastore_delete")(
context, {"resource_id": resource["id"], "force": True}
)
except NotFound:
pass # Sometimes importing the data into the datastore fails
# delete the datastore table
try:
get_action("datastore_delete")(
context, {"resource_id": resource["id"], "force": True}
)
except NotFound:
pass # Sometimes importing the data into the datastore fails

# delete the resource itself
get_action("resource_delete")(context, {"id": resource["id"]})
# delete the resource itself
get_action("resource_delete")(context, {"id": resource["id"]})
Loading