Skip to content

Commit

Permalink
Merge pull request #124 from dathere/replace-http-requests-with-actions
Browse files Browse the repository at this point in the history
Replace http requests with actions
  • Loading branch information
tino097 authored May 17, 2024
2 parents 90a4868 + b5aca7b commit e059b8a
Showing 1 changed file with 61 additions and 107 deletions.
168 changes: 61 additions & 107 deletions ckanext/datapusher_plus/jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
import ckanext.datapusher_plus.helpers as dph
from ckanext.datapusher_plus.config import config


if locale.getdefaultlocale()[0]:
lang, encoding = locale.getdefaultlocale()
locale.setlocale(locale.LC_ALL, locale=(lang, encoding))
Expand Down Expand Up @@ -103,70 +104,39 @@ def default(self, obj):
return json.JSONEncoder.default(self, obj)


def delete_datastore_resource(resource_id):
    """
    Delete the existing DataStore table for *resource_id*.

    Calls the ``datastore_delete`` action in-process (with ``ignore_auth``)
    instead of making an HTTP request to the CKAN API.

    :param resource_id: id of the resource whose DataStore table is removed.
    :raises utils.JobError: if the action reports the resource as not found.
    """
    try:
        # force=True so the delete succeeds even for read-only resources
        tk.get_action("datastore_delete")(
            {"ignore_auth": True}, {"resource_id": resource_id, "force": True})
    except tk.ObjectNotFound:
        raise utils.JobError("Deleting existing datastore failed.")


def delete_resource(resource_id, api_key, ckan_url):
    """
    Delete the CKAN resource *resource_id* via the ``resource_delete`` action.

    ``api_key`` and ``ckan_url`` are retained only for backward compatibility
    with existing callers; they are unused now that the action API is invoked
    in-process instead of over HTTP.

    :raises utils.JobError: if no user is available in the toolkit context,
        or if the resource could not be deleted.
    """
    if not tk.user:
        raise utils.JobError("No user found.")
    try:
        # force=True so the delete succeeds even for read-only resources
        tk.get_action("resource_delete")(
            {"user": tk.user}, {"id": resource_id, "force": True})
    except tk.ObjectNotFound:
        raise utils.JobError("Deleting existing resource failed.")


def datastore_resource_exists(resource_id, api_key, ckan_url):
    """
    Check whether a DataStore table exists for *resource_id*.

    ``api_key`` and ``ckan_url`` are retained only for backward compatibility
    with existing callers; the check now calls the ``datastore_search`` action
    in-process instead of over HTTP.

    :returns: the ``datastore_search`` result dict (including ``fields``)
        when the resource exists in the DataStore, or ``False`` when it
        does not.
    """
    data_dict = {
        "resource_id": resource_id,
        "limit": 0,                # metadata only; no rows needed
        "include_total": False,    # skip the (potentially slow) row count
    }

    context = {"ignore_auth": True}

    try:
        result = tk.get_action("datastore_search")(context, data_dict)
        return result
    except tk.ObjectNotFound:
        return False


def send_resource_to_datastore(
Expand Down Expand Up @@ -202,60 +172,47 @@ def send_resource_to_datastore(
"aliases": aliases,
"calculate_record_count": calculate_record_count,
}
try:
resource_dict = tk.get_action("datastore_create")({"ignore_auth": True}, request)
return resource_dict
except Exception as e:
raise utils.JobError("Error sending data to datastore ({!s}).".format(e))

url = get_url("datastore_create", ckan_url)
r = requests.post(
url,
verify=SSL_VERIFY,
data=json.dumps(request, cls=DatastoreEncoder),
headers={"Content-Type": "application/json", "Authorization": api_key},
)
utils.check_response(r, url, "CKAN DataStore")
return r.json()


def update_resource(resource):
    """
    Updates resource metadata via the ``resource_update`` action.

    The update runs as the site user so the background job has sufficient
    privileges regardless of which user triggered it.

    :param resource: the full resource dict to save (must contain ``id``).
    :raises utils.JobError: if the resource does not exist.
    """
    site_user = tk.get_action("get_site_user")({"ignore_auth": True}, {})
    context = {
        "ignore_auth": True,
        "user": site_user["name"],
        "auth_user_obj": None,
    }
    try:
        tk.get_action("resource_update")(context, resource)
    except tk.ObjectNotFound:
        raise utils.JobError("Updating existing resource failed.")


def get_resource(resource_id):
    """
    Gets available information about the resource from CKAN.

    Calls the ``resource_show`` action in-process (with ``ignore_auth``)
    instead of making an HTTP request to the CKAN API.

    :param resource_id: id of the resource to look up.
    :returns: the resource dict returned by ``resource_show``.
    """
    resource_dict = tk.get_action("resource_show")(
        {"ignore_auth": True}, {"id": resource_id})

    return resource_dict


def get_package(package_id):
    """
    Gets available information about a package from CKAN.

    Calls the ``package_show`` action in-process (with ``ignore_auth``)
    instead of making an HTTP request to the CKAN API.

    :param package_id: id of the dataset (package) to look up.
    :returns: the dataset dict returned by ``package_show``.
    """
    dataset_dict = tk.get_action("package_show")(
        {"ignore_auth": True}, {"id": package_id})

    return dataset_dict


def validate_input(input):
Expand Down Expand Up @@ -359,9 +316,9 @@ def push_to_datastore(input, task_id, dry_run=False):
with tempfile.TemporaryDirectory() as temp_dir:
return _push_to_datastore(task_id, input, dry_run=dry_run, temp_dir=temp_dir)


def _push_to_datastore(task_id, input, dry_run=False, temp_dir=None):
#add job to dn (datapusher_plus_jobs table)
# add job to dn (datapusher_plus_jobs table)
try:
dph.add_pending_job(task_id, **input)
except sa.exc.IntegrityError:
Expand Down Expand Up @@ -395,8 +352,7 @@ def _push_to_datastore(task_id, input, dry_run=False, temp_dir=None):
raise utils.JobError("qsv version check error: {}".format(e))
qsv_version_info = str(qsv_version.stdout)
qsv_semver = qsv_version_info[
qsv_version_info.find(" ") : qsv_version_info.find("-")
].lstrip()
qsv_version_info.find(" "): qsv_version_info.find("-")].lstrip()
try:
if semver.compare(qsv_semver, MINIMUM_QSV_VERSION) < 0:
raise utils.JobError(
Expand All @@ -416,11 +372,11 @@ def _push_to_datastore(task_id, input, dry_run=False, temp_dir=None):
api_key = input.get("api_key")

try:
resource = get_resource(resource_id, ckan_url, api_key)
resource = get_resource(resource_id)
except utils.JobError:
# try again in 5 seconds just incase CKAN is slow at adding resource
time.sleep(5)
resource = get_resource(resource_id, ckan_url, api_key)
resource = get_resource(resource_id)

# check if the resource url_type is a datastore
if resource.get("url_type") == "datastore":
Expand Down Expand Up @@ -977,7 +933,7 @@ def _push_to_datastore(task_id, input, dry_run=False, temp_dir=None):
res_id=resource_id
)
)
delete_datastore_resource(resource_id, api_key, ckan_url)
delete_datastore_resource(resource_id)

# 1st pass of building headers_dict
# here we map inferred types to postgresql data types
Expand Down Expand Up @@ -1263,7 +1219,7 @@ def _push_to_datastore(task_id, input, dry_run=False, temp_dir=None):
if pii_alias_result:
existing_pii_alias_of = pii_alias_result[0]

delete_datastore_resource(existing_pii_alias_of, api_key, ckan_url)
delete_datastore_resource(existing_pii_alias_of)
delete_resource(existing_pii_alias_of, api_key, ckan_url)

pii_alias = [pii_resource_id]
Expand Down Expand Up @@ -1346,7 +1302,7 @@ def _push_to_datastore(task_id, input, dry_run=False, temp_dir=None):
pii_resource["pii_preview"] = True
pii_resource["pii_of_resource"] = resource_id
pii_resource["total_record_count"] = pii_rows_with_matches
update_resource(pii_resource, ckan_url, api_key)
update_resource(pii_resource)

pii_msg = (
"{} PII candidate/s in {} row/s are available at {} for review".format(
Expand Down Expand Up @@ -1561,7 +1517,7 @@ def _push_to_datastore(task_id, input, dry_run=False, temp_dir=None):
if stats_alias_result:
existing_stats_alias_of = stats_alias_result[0]

delete_datastore_resource(existing_stats_alias_of, api_key, ckan_url)
delete_datastore_resource(existing_stats_alias_of)
delete_resource(existing_stats_alias_of, api_key, ckan_url)

stats_aliases = [stats_resource_id]
Expand Down Expand Up @@ -1591,9 +1547,7 @@ def _push_to_datastore(task_id, input, dry_run=False, temp_dir=None):
if result:
existing_stats_alias_of = result[0]

delete_datastore_resource(
existing_stats_alias_of, api_key, ckan_url
)
delete_datastore_resource(existing_stats_alias_of)
delete_resource(existing_stats_alias_of, api_key, ckan_url)

# run stats on stats CSV to get header names and infer data types
Expand Down Expand Up @@ -1669,7 +1623,7 @@ def _push_to_datastore(task_id, input, dry_run=False, temp_dir=None):
stats_resource["id"] = new_stats_resource_id
stats_resource["summary_statistics"] = True
stats_resource["summary_of_resource"] = resource_id
update_resource(stats_resource, ckan_url, api_key)
update_resource(stats_resource)

cur.close()
raw_connection.commit()
Expand All @@ -1683,7 +1637,7 @@ def _push_to_datastore(task_id, input, dry_run=False, temp_dir=None):
resource["preview"] = False
resource["preview_rows"] = None
resource["partial_download"] = False
update_resource(resource, ckan_url, api_key)
update_resource(resource)

# tell CKAN to calculate_record_count and set alias if set
send_resource_to_datastore(
Expand Down

0 comments on commit e059b8a

Please sign in to comment.