diff --git a/ckanext/datapusher_plus/jobs.py b/ckanext/datapusher_plus/jobs.py
index 98af5ed..293c06f 100644
--- a/ckanext/datapusher_plus/jobs.py
+++ b/ckanext/datapusher_plus/jobs.py
@@ -53,6 +53,7 @@
 import ckanext.datapusher_plus.helpers as dph
 from ckanext.datapusher_plus.config import config
 
+
 if locale.getdefaultlocale()[0]:
     lang, encoding = locale.getdefaultlocale()
     locale.setlocale(locale.LC_ALL, locale=(lang, encoding))
@@ -103,70 +104,39 @@ def default(self, obj):
         return json.JSONEncoder.default(self, obj)
 
 
-def delete_datastore_resource(resource_id, api_key, ckan_url):
+def delete_datastore_resource(resource_id):
     try:
-        delete_url = get_url("datastore_delete", ckan_url)
-        response = requests.post(
-            delete_url,
-            verify=SSL_VERIFY,
-            data=json.dumps({"id": resource_id, "force": True}),
-            headers={"Content-Type": "application/json", "Authorization": api_key},
-        )
-        utils.check_response(
-            response,
-            delete_url,
-            "CKAN",
-            good_status=(201, 200, 404),
-            ignore_no_success=True,
-        )
-    except requests.exceptions.RequestException:
+        tk.get_action("datastore_delete")(
+            {"ignore_auth": True}, {"resource_id": resource_id, "force": True})
+    except tk.ObjectNotFound:
         raise utils.JobError("Deleting existing datastore failed.")
 
 
 def delete_resource(resource_id, api_key, ckan_url):
+    if not tk.user:
+        raise utils.JobError("No user found.")
     try:
-        delete_url = get_url("resource_delete", ckan_url)
-        response = requests.post(
-            delete_url,
-            verify=SSL_VERIFY,
-            data=json.dumps({"id": resource_id, "force": True}),
-            headers={"Content-Type": "application/json", "Authorization": api_key},
-        )
-        utils.check_response(
-            response,
-            delete_url,
-            "CKAN",
-            good_status=(201, 200, 404),
-            ignore_no_success=True,
-        )
-    except requests.exceptions.RequestException:
+        tk.get_action("resource_delete")({"user": tk.user}, {"id": resource_id, "force": True})
+    except tk.ObjectNotFound:
         raise utils.JobError("Deleting existing resource failed.")
 
 
 def datastore_resource_exists(resource_id, api_key, ckan_url):
-    from ckanext.datapusher_plus.job_exceptions import HTTPError, JobError
+    from ckanext.datapusher_plus.job_exceptions import JobError
+
+    data_dict = {
+        "resource_id": resource_id,
+        "limit": 0,
+        "include_total": False,
+    }
+
+    context = {"ignore_auth": True}
     try:
-        search_url = get_url("datastore_search", ckan_url)
-        response = requests.post(
-            search_url,
-            verify=SSL_VERIFY,
-            data=json.dumps({"id": resource_id, "limit": 0}),
-            headers={"Content-Type": "application/json", "Authorization": api_key},
-        )
-        if response.status_code == 404:
-            return False
-        elif response.status_code == 200:
-            return response.json().get("result", {"fields": []})
-        else:
-            raise HTTPError(
-                "Error getting datastore resource.",
-                response.status_code,
-                search_url,
-                response,
-            )
-    except requests.exceptions.RequestException as e:
-        raise JobError("Error getting datastore resource ({!s}).".format(e))
+        result = tk.get_action("datastore_search")(context, data_dict)
+        return result
+    except tk.ObjectNotFound:
+        return False
 
 
 def send_resource_to_datastore(
@@ -202,60 +172,47 @@ def send_resource_to_datastore(
         "aliases": aliases,
         "calculate_record_count": calculate_record_count,
     }
+    try:
+        resource_dict = tk.get_action("datastore_create")({"ignore_auth": True}, request)
+        return resource_dict
+    except Exception as e:
+        raise utils.JobError("Error sending data to datastore ({!s}).".format(e))
 
-    url = get_url("datastore_create", ckan_url)
-    r = requests.post(
-        url,
-        verify=SSL_VERIFY,
-        data=json.dumps(request, cls=DatastoreEncoder),
-        headers={"Content-Type": "application/json", "Authorization": api_key},
-    )
-    utils.check_response(r, url, "CKAN DataStore")
-    return r.json()
-
-
-def update_resource(resource, ckan_url, api_key):
-    url = get_url("resource_update", ckan_url)
-    r = requests.post(
-        url,
-        verify=SSL_VERIFY,
-        data=json.dumps(resource),
-        headers={"Content-Type": "application/json", "Authorization": api_key},
-    )
-    utils.check_response(r, url, "CKAN")
+def update_resource(resource):
+    """
+    Updates resource metadata
+    """
+    site_user = tk.get_action("get_site_user")({"ignore_auth": True}, {})
+    context = {
+        "ignore_auth": True,
+        "user": site_user["name"],
+        "auth_user_obj": None,
+    }
+    try:
+        tk.get_action("resource_update")(context, resource)
+    except tk.ObjectNotFound:
+        raise utils.JobError("Updating existing resource failed.")
 
 
-def get_resource(resource_id, ckan_url, api_key):
+def get_resource(resource_id):
     """
     Gets available information about the resource from CKAN
     """
-    url = get_url("resource_show", ckan_url)
-    r = requests.post(
-        url,
-        verify=SSL_VERIFY,
-        data=json.dumps({"id": resource_id}),
-        headers={"Content-Type": "application/json", "Authorization": api_key},
-    )
-    utils.check_response(r, url, "CKAN")
+    resource_dict = tk.get_action("resource_show")({"ignore_auth": True},
+                                                   {"id": resource_id})
 
-    return r.json()["result"]
+    return resource_dict
 
 
-def get_package(package_id, ckan_url, api_key):
+def get_package(package_id):
     """
     Gets available information about a package from CKAN
     """
-    url = get_url("package_show", ckan_url)
-    r = requests.post(
-        url,
-        verify=SSL_VERIFY,
-        data=json.dumps({"id": package_id}),
-        headers={"Content-Type": "application/json", "Authorization": api_key},
-    )
-    utils.check_response(r, url, "CKAN")
+    dataset_dict = tk.get_action("package_show")({"ignore_auth": True},
+                                                 {"id": package_id})
 
-    return r.json()["result"]
+    return dataset_dict
 
 
 def validate_input(input):
@@ -359,9 +316,9 @@ def push_to_datastore(input, task_id, dry_run=False):
     with tempfile.TemporaryDirectory() as temp_dir:
         return _push_to_datastore(task_id, input, dry_run=dry_run, temp_dir=temp_dir)
 
-    
+
 def _push_to_datastore(task_id, input, dry_run=False, temp_dir=None):
-    #add job to dn (datapusher_plus_jobs table)
+    # add job to db (datapusher_plus_jobs table)
     try:
         dph.add_pending_job(task_id, **input)
     except sa.exc.IntegrityError:
@@ -395,8 +352,7 @@
         raise utils.JobError("qsv version check error: {}".format(e))
     qsv_version_info = str(qsv_version.stdout)
     qsv_semver = qsv_version_info[
-        qsv_version_info.find(" ") : qsv_version_info.find("-")
-    ].lstrip()
+        qsv_version_info.find(" "): qsv_version_info.find("-")].lstrip()
     try:
         if semver.compare(qsv_semver, MINIMUM_QSV_VERSION) < 0:
             raise utils.JobError(
@@ -416,11 +372,11 @@
     api_key = input.get("api_key")
 
     try:
-        resource = get_resource(resource_id, ckan_url, api_key)
+        resource = get_resource(resource_id)
     except utils.JobError:
         # try again in 5 seconds just incase CKAN is slow at adding resource
         time.sleep(5)
-        resource = get_resource(resource_id, ckan_url, api_key)
+        resource = get_resource(resource_id)
 
     # check if the resource url_type is a datastore
     if resource.get("url_type") == "datastore":
@@ -977,7 +933,7 @@
                 res_id=resource_id
             )
         )
-        delete_datastore_resource(resource_id, api_key, ckan_url)
+        delete_datastore_resource(resource_id)
 
     # 1st pass of building headers_dict
     # here we map inferred types to postgresql data types
@@ -1263,7 +1219,7 @@
 
         if pii_alias_result:
             existing_pii_alias_of = pii_alias_result[0]
 
-            delete_datastore_resource(existing_pii_alias_of, api_key, ckan_url)
+            delete_datastore_resource(existing_pii_alias_of)
             delete_resource(existing_pii_alias_of, api_key, ckan_url)
 
         pii_alias = [pii_resource_id]
@@ -1346,7 +1302,7 @@
         pii_resource["pii_preview"] = True
         pii_resource["pii_of_resource"] = resource_id
         pii_resource["total_record_count"] = pii_rows_with_matches
-        update_resource(pii_resource, ckan_url, api_key)
+        update_resource(pii_resource)
 
         pii_msg = (
             "{} PII candidate/s in {} row/s are available at {} for review".format(
@@ -1561,7 +1517,7 @@
 
         if stats_alias_result:
             existing_stats_alias_of = stats_alias_result[0]
 
-            delete_datastore_resource(existing_stats_alias_of, api_key, ckan_url)
+            delete_datastore_resource(existing_stats_alias_of)
             delete_resource(existing_stats_alias_of, api_key, ckan_url)
 
         stats_aliases = [stats_resource_id]
@@ -1591,9 +1547,7 @@
 
                 if result:
                     existing_stats_alias_of = result[0]
 
-                    delete_datastore_resource(
-                        existing_stats_alias_of, api_key, ckan_url
-                    )
+                    delete_datastore_resource(existing_stats_alias_of)
                     delete_resource(existing_stats_alias_of, api_key, ckan_url)
 
     # run stats on stats CSV to get header names and infer data types
@@ -1669,7 +1623,7 @@
         stats_resource["id"] = new_stats_resource_id
         stats_resource["summary_statistics"] = True
         stats_resource["summary_of_resource"] = resource_id
-        update_resource(stats_resource, ckan_url, api_key)
+        update_resource(stats_resource)
 
     cur.close()
     raw_connection.commit()
@@ -1683,7 +1637,7 @@
         resource["preview"] = False
         resource["preview_rows"] = None
         resource["partial_download"] = False
-        update_resource(resource, ckan_url, api_key)
+        update_resource(resource)
 
     # tell CKAN to calculate_record_count and set alias if set
     send_resource_to_datastore(
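
Reviewer note on the pattern this patch adopts: every former requests.post() round-trip to the CKAN API is replaced by an in-process call through ckan.plugins.toolkit (imported as tk in jobs.py), where an action is looked up by name and invoked with a (context, data_dict) pair, and failures surface as toolkit exceptions rather than HTTP status codes. Below is a minimal sketch of the two calling conventions used in the hunks above; the function names and placeholder id are illustrative only, not part of the patch:

    import ckan.plugins.toolkit as tk

    def fetch_resource_or_none(resource_id):
        # Convention 1: pass a context dict, then a data dict, and map
        # tk.ObjectNotFound to a sentinel instead of checking for a 404.
        try:
            return tk.get_action("resource_show")(
                {"ignore_auth": True}, {"id": resource_id}
            )
        except tk.ObjectNotFound:
            return None

    def update_resource_as_site_user(resource_dict):
        # Convention 2: actions that record the acting user (resource_update)
        # are run as the site user, mirroring update_resource() above.
        site_user = tk.get_action("get_site_user")({"ignore_auth": True}, {})
        context = {"ignore_auth": True, "user": site_user["name"]}
        tk.get_action("resource_update")(context, resource_dict)

This sketch only runs inside a CKAN process with the plugin machinery loaded. Toolkit exceptions such as tk.ObjectNotFound and tk.ValidationError take over the role of the old requests.exceptions.RequestException handling, which is why the refactored helpers catch tk.ObjectNotFound where the removed code checked response.status_code == 404.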