From d340d8789b573c77844bb7f073ccb68eafcbf7ed Mon Sep 17 00:00:00 2001 From: saipraneeth <2506664+msaipraneeth@users.noreply.github.com> Date: Wed, 23 Aug 2023 06:28:10 +0100 Subject: [PATCH 01/24] add new parameter databus_base_url --- cmem_plugin_databus/loader.py | 16 +++++++++++----- 1 file changed, 11 insertions(+), 5 deletions(-) diff --git a/cmem_plugin_databus/loader.py b/cmem_plugin_databus/loader.py index a7b1db2..655e962 100644 --- a/cmem_plugin_databus/loader.py +++ b/cmem_plugin_databus/loader.py @@ -23,16 +23,21 @@ """, parameters=[ PluginParameter( - name="target_graph", - label="Target Graph", - description="Graph name to save the response from the Databus.", - param_type=DatasetParameterType(dataset_type="eccencaDataPlatform"), + name="databus_base_url", + label="Databus Base URL", + description="The URL of the Databus server", ), PluginParameter( name="databus_file_id", label="Databus File ID", description="The Databus file id of the file to download", ), + PluginParameter( + name="target_graph", + label="Target Graph", + description="Graph name to save the response from the Databus.", + param_type=DatasetParameterType(dataset_type="eccencaDataPlatform"), + ), PluginParameter( name="chunk_size", label="Chunk Size", @@ -46,8 +51,9 @@ class SimpleDatabusLoadingPlugin(WorkflowPlugin): """Implementation of loading one file from the Databus into a given dataset""" def __init__( - self, databus_file_id: str, target_graph: str, chunk_size: int + self, databus_base_url, databus_file_id: str, target_graph: str, chunk_size: int ) -> None: + self.databus_url = databus_base_url self.databus_file_id = databus_file_id self.target_graph = target_graph self.chunk_size = chunk_size From a78167e4b41668a77a305aa7c0529c2f40d12438 Mon Sep 17 00:00:00 2001 From: saipraneeth <2506664+msaipraneeth@users.noreply.github.com> Date: Mon, 28 Aug 2023 07:00:48 +0100 Subject: [PATCH 02/24] add new parameter DatabusSearch type --- cmem_plugin_databus/loader.py | 64 ++++++++++++++++++++++++++++++++--- cmem_plugin_databus/utils.py | 11 +++--- 2 files changed, 67 insertions(+), 8 deletions(-) diff --git a/cmem_plugin_databus/loader.py b/cmem_plugin_databus/loader.py index 655e962..d98acba 100644 --- a/cmem_plugin_databus/loader.py +++ b/cmem_plugin_databus/loader.py @@ -1,18 +1,61 @@ """Plugin for loading one file from the databus and write it ino a dataset""" +from typing import Any import requests from cmem.cmempy.workspace.tasks import get_task -from cmem_plugin_base.dataintegration.context import ExecutionContext, ExecutionReport +from cmem_plugin_base.dataintegration.context import ExecutionContext, ExecutionReport, \ + PluginContext from cmem_plugin_base.dataintegration.description import Plugin, PluginParameter from cmem_plugin_base.dataintegration.parameter.dataset import DatasetParameterType from cmem_plugin_base.dataintegration.plugins import WorkflowPlugin +from cmem_plugin_base.dataintegration.types import StringParameterType, Autocompletion from cmem_plugin_base.dataintegration.utils import setup_cmempy_super_user_access +from cmem_plugin_databus.cmem_wrappers import post_streamed_bytes from cmem_plugin_databus.utils import ( byte_iterator_context_update, - get_clock, + get_clock, fetch_api_search_result, ) -from cmem_plugin_databus.cmem_wrappers import post_streamed_bytes + + +class DatabusSearch(StringParameterType): + """Databus Search Type""" + + autocompletion_depends_on_parameters: list[str] = ["databus_base_url"] + + # auto complete for values + allow_only_autocompleted_values: bool = True + # auto complete for labels + autocomplete_value_with_labels: bool = False + + def autocomplete( + self, + query_terms: list[str], + depend_on_parameter_values: list[Any], + context: PluginContext, + ) -> list[Autocompletion]: + + if not query_terms: + label = "Search for Databus artifacts" + return [Autocompletion(value="", label=f"{label}")] + + databus_base_url = depend_on_parameter_values[0] + result = fetch_api_search_result( + databus_base=databus_base_url, + url_parameters={ + "query": " ".join(query_terms), + "typeNameWeight": 0, + "minRelevance": 20, + "maxResults": 25, + "typeName": " Artifact" + } + ) + return [ + Autocompletion( + value=f"{_.resource}", + label=f"{_.label}", + )for _ in result + ] @Plugin( @@ -27,6 +70,14 @@ label="Databus Base URL", description="The URL of the Databus server", ), + PluginParameter( + name="databus_document", + label="Databus Document", + description="The name of databus artifact", + param_type=DatabusSearch(), + default_value="" + ), + PluginParameter( name="databus_file_id", label="Databus File ID", @@ -51,7 +102,12 @@ class SimpleDatabusLoadingPlugin(WorkflowPlugin): """Implementation of loading one file from the Databus into a given dataset""" def __init__( - self, databus_base_url, databus_file_id: str, target_graph: str, chunk_size: int + self, + databus_base_url: str, + databus_document: str, + databus_file_id: str, + target_graph: str, + chunk_size: int ) -> None: self.databus_url = databus_base_url self.databus_file_id = databus_file_id diff --git a/cmem_plugin_databus/utils.py b/cmem_plugin_databus/utils.py index b0d6327..a5b6aef 100644 --- a/cmem_plugin_databus/utils.py +++ b/cmem_plugin_databus/utils.py @@ -1,7 +1,7 @@ """Utils for handling the DBpedia Databus""" from dataclasses import dataclass from typing import Dict, Iterator, List, Optional, Any -from urllib.parse import quote +from urllib.parse import quote, urlencode import requests from cmem_plugin_base.dataintegration.context import ( @@ -72,12 +72,15 @@ def result_from_json_dict(json_dict: Dict[str, List[str]]) -> DatabusSearchResul def fetch_api_search_result( - databus_base: str, query_str: str + databus_base: str, + url_parameters: dict = None ) -> List[DatabusSearchResult]: """Fetches Search Results.""" - encoded_query_str = quote(query_str) + encoded_query_str = "" + if url_parameters: + encoded_query_str = urlencode(url_parameters) - request_uri = f"{databus_base}/api/search?query={encoded_query_str}" + request_uri = f"{databus_base}/api/search?{encoded_query_str}" json_resp = requests.get(request_uri, timeout=30).json() From 898d48cd6c7d1d6e994000856609d1b42ec0a2ed Mon Sep 17 00:00:00 2001 From: saipraneeth <2506664+msaipraneeth@users.noreply.github.com> Date: Mon, 28 Aug 2023 07:26:09 +0100 Subject: [PATCH 03/24] add FacetSearch parameter type --- cmem_plugin_databus/loader.py | 53 +++++++++++++++++++++++++++++++++-- cmem_plugin_databus/utils.py | 21 ++++++++++++++ 2 files changed, 72 insertions(+), 2 deletions(-) diff --git a/cmem_plugin_databus/loader.py b/cmem_plugin_databus/loader.py index d98acba..1e0918e 100644 --- a/cmem_plugin_databus/loader.py +++ b/cmem_plugin_databus/loader.py @@ -14,7 +14,7 @@ from cmem_plugin_databus.cmem_wrappers import post_streamed_bytes from cmem_plugin_databus.utils import ( byte_iterator_context_update, - get_clock, fetch_api_search_result, + get_clock, fetch_api_search_result, fetch_facets_options, ) @@ -58,6 +58,48 @@ def autocomplete( ] +class FacetSearch(StringParameterType): + """Facet Type""" + + autocompletion_depends_on_parameters: list[str] = [ + "databus_base_url", + "databus_document" + ] + + # auto complete for values + allow_only_autocompleted_values: bool = True + # auto complete for labels + autocomplete_value_with_labels: bool = False + # + + def __init__(self, facet_option: str): + self.facet_option = facet_option + + def autocomplete( + self, + query_terms: list[str], + depend_on_parameter_values: list[Any], + context: PluginContext, + ) -> list[Autocompletion]: + + databus_base_url = depend_on_parameter_values[0] + databus_document = depend_on_parameter_values[1] + result = fetch_facets_options( + databus_base=databus_base_url, + url_parameters={ + "type": "artifact", + "uri": databus_document + } + ) + _format = result[self.facet_option] + return [ + Autocompletion( + value=f"{_}", + label=f"{_}", + )for _ in _format + ] + + @Plugin( label="Simple Databus Loading Plugin", description="Loads a specfic file from the Databus to a local directory", @@ -77,7 +119,13 @@ def autocomplete( param_type=DatabusSearch(), default_value="" ), - + PluginParameter( + name="artifact_format", + label="Format", + description="The format of databus artifact", + param_type=FacetSearch(facet_option="format"), + default_value="" + ), PluginParameter( name="databus_file_id", label="Databus File ID", @@ -105,6 +153,7 @@ def __init__( self, databus_base_url: str, databus_document: str, + artifact_format: str, databus_file_id: str, target_graph: str, chunk_size: int diff --git a/cmem_plugin_databus/utils.py b/cmem_plugin_databus/utils.py index a5b6aef..3addaa7 100644 --- a/cmem_plugin_databus/utils.py +++ b/cmem_plugin_databus/utils.py @@ -365,3 +365,24 @@ def byte_iterator_context_update( ) ) yield chunk + + +def fetch_facets_options( + databus_base: str, + url_parameters: dict = None +): + encoded_query_str = "" + if url_parameters: + encoded_query_str = urlencode(url_parameters) + headers = { + "Content-Type": "application/json" + } + request_uri = f"{databus_base}/app/utils/facets?{encoded_query_str}" + json_resp = requests.get(request_uri, headers=headers, timeout=30).json() + + result = { + "version": json_resp["http://purl.org/dc/terms/hasVersion"]["values"], + "format": json_resp["https://dataid.dbpedia.org/databus#formatExtension"]["values"] + } + + return result From b72b6e2cf2d9adcda8a62136eb1849a60f8306ec Mon Sep 17 00:00:00 2001 From: saipraneeth <2506664+msaipraneeth@users.noreply.github.com> Date: Wed, 30 Aug 2023 05:46:57 +0100 Subject: [PATCH 04/24] add parameter artifact version --- cmem_plugin_databus/loader.py | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/cmem_plugin_databus/loader.py b/cmem_plugin_databus/loader.py index 1e0918e..c331e1a 100644 --- a/cmem_plugin_databus/loader.py +++ b/cmem_plugin_databus/loader.py @@ -126,6 +126,13 @@ def autocomplete( param_type=FacetSearch(facet_option="format"), default_value="" ), + PluginParameter( + name="artifact_version", + label="Version", + description="The version of databus artifact", + param_type=FacetSearch(facet_option="version"), + default_value="" + ), PluginParameter( name="databus_file_id", label="Databus File ID", @@ -154,6 +161,7 @@ def __init__( databus_base_url: str, databus_document: str, artifact_format: str, + artifact_version: str, databus_file_id: str, target_graph: str, chunk_size: int From a9c67fd22b1db485dbab45df19c4561a77467fc6 Mon Sep 17 00:00:00 2001 From: saipraneeth <2506664+msaipraneeth@users.noreply.github.com> Date: Wed, 30 Aug 2023 07:07:35 +0100 Subject: [PATCH 05/24] filter databus file based on format and version --- cmem_plugin_databus/loader.py | 40 +++++++++++++++++++++++++- cmem_plugin_databus/utils.py | 53 +++++++++++++++++++++++++++++++++++ 2 files changed, 92 insertions(+), 1 deletion(-) diff --git a/cmem_plugin_databus/loader.py b/cmem_plugin_databus/loader.py index c331e1a..7a7e8a2 100644 --- a/cmem_plugin_databus/loader.py +++ b/cmem_plugin_databus/loader.py @@ -14,7 +14,7 @@ from cmem_plugin_databus.cmem_wrappers import post_streamed_bytes from cmem_plugin_databus.utils import ( byte_iterator_context_update, - get_clock, fetch_api_search_result, fetch_facets_options, + get_clock, fetch_api_search_result, fetch_facets_options, fetch_databus_files, ) @@ -100,6 +100,43 @@ def autocomplete( ] +class DatabusFile(StringParameterType): + autocompletion_depends_on_parameters: list[str] = [ + "databus_base_url", + "databus_document", + "artifact_format", + "artifact_version" + ] + + # auto complete for values + allow_only_autocompleted_values: bool = True + # auto complete for labels + autocomplete_value_with_labels: bool = False + + def autocomplete( + self, + query_terms: list[str], + depend_on_parameter_values: list[Any], + context: PluginContext, + ) -> list[Autocompletion]: + databus_base_url = depend_on_parameter_values[0] + databus_document = depend_on_parameter_values[1] + artifact_format = depend_on_parameter_values[2] + artifact_version = depend_on_parameter_values[3] + result = fetch_databus_files( + endpoint=databus_base_url, + artifact=databus_document, + version=artifact_version, + file_format=artifact_format + ) + return [ + Autocompletion( + value=f"{_['file']['value']}", + label=f"{_['version']['value']}:{_['variant']['value']}:{_['format']['value']}:{_['compression']['value']}", + ) for _ in result + ] + + @Plugin( label="Simple Databus Loading Plugin", description="Loads a specfic file from the Databus to a local directory", @@ -137,6 +174,7 @@ def autocomplete( name="databus_file_id", label="Databus File ID", description="The Databus file id of the file to download", + param_type=DatabusFile() ), PluginParameter( name="target_graph", diff --git a/cmem_plugin_databus/utils.py b/cmem_plugin_databus/utils.py index 3addaa7..948a124 100644 --- a/cmem_plugin_databus/utils.py +++ b/cmem_plugin_databus/utils.py @@ -1,4 +1,5 @@ """Utils for handling the DBpedia Databus""" +import json from dataclasses import dataclass from typing import Dict, Iterator, List, Optional, Any from urllib.parse import quote, urlencode @@ -386,3 +387,55 @@ def fetch_facets_options( } return result + + +def fetch_databus_files(endpoint: str, artifact: str, version: str, file_format: str): + query = f"""PREFIX rdfs: +PREFIX rdf: +PREFIX dcat: +PREFIX dct: +PREFIX dcv: +PREFIX databus: +SELECT DISTINCT ?file ?version ?artifact ?license ?size ?format ?compression (GROUP_CONCAT(DISTINCT ?var; SEPARATOR=', ') AS ?variant) ?preview WHERE +{{ + GRAPH ?g + {{ + ?dataset databus:artifact <{artifact}> . + {{ + ?distribution dct:hasVersion ?version {{ + SELECT (?v as ?version) {{ + GRAPH ?g2 {{ + ?dataset databus:artifact <{artifact}> . + ?dataset dct:hasVersion ?v . + }} + }} ORDER BY DESC (STR(?version)) LIMIT 1 + }} + }} + UNION + {{ ?distribution '{version}' . }} + {{ + ?distribution ?c0 . + VALUES ?c0 {{ + '{file_format}' + }} + }} + ?dataset dcat:distribution ?distribution . + ?distribution databus:file ?file . + ?distribution databus:formatExtension ?format . + ?distribution databus:compression ?compression . + ?dataset dct:license ?license . + ?dataset dct:hasVersion ?version . + ?dataset databus:artifact ?artifact . + OPTIONAL {{ ?distribution ?p ?var. ?p rdfs:subPropertyOf databus:contentVariant . }} + OPTIONAL {{ ?distribution dcat:byteSize ?size . }} + }} +}} +GROUP BY ?file ?version ?artifact ?license ?size ?format ?compression ?preview""" + + endpoint = endpoint+"/sparql" + sparql_service = SPARQLWrapper(endpoint) + sparql_service.setQuery(query) + sparql_service.setReturnFormat(JSON) + + query_results = sparql_service.query().convert() + return query_results["results"]["bindings"] From edaae531e6a57df826b37934692ba0389bc338cf Mon Sep 17 00:00:00 2001 From: saipraneeth <2506664+msaipraneeth@users.noreply.github.com> Date: Wed, 30 Aug 2023 07:37:32 +0100 Subject: [PATCH 06/24] fix lint issues --- cmem_plugin_databus/loader.py | 18 +++++++++++++++--- cmem_plugin_databus/utils.py | 34 +++++++++++++++++++++------------- 2 files changed, 36 insertions(+), 16 deletions(-) diff --git a/cmem_plugin_databus/loader.py b/cmem_plugin_databus/loader.py index 7a7e8a2..e4e356f 100644 --- a/cmem_plugin_databus/loader.py +++ b/cmem_plugin_databus/loader.py @@ -3,8 +3,11 @@ import requests from cmem.cmempy.workspace.tasks import get_task -from cmem_plugin_base.dataintegration.context import ExecutionContext, ExecutionReport, \ +from cmem_plugin_base.dataintegration.context import ( + ExecutionContext, + ExecutionReport, PluginContext +) from cmem_plugin_base.dataintegration.description import Plugin, PluginParameter from cmem_plugin_base.dataintegration.parameter.dataset import DatasetParameterType from cmem_plugin_base.dataintegration.plugins import WorkflowPlugin @@ -26,7 +29,7 @@ class DatabusSearch(StringParameterType): # auto complete for values allow_only_autocompleted_values: bool = True # auto complete for labels - autocomplete_value_with_labels: bool = False + autocomplete_value_with_labels: bool = True def autocomplete( self, @@ -101,6 +104,7 @@ def autocomplete( class DatabusFile(StringParameterType): + """Class for DatabusFile""" autocompletion_depends_on_parameters: list[str] = [ "databus_base_url", "databus_document", @@ -132,7 +136,10 @@ def autocomplete( return [ Autocompletion( value=f"{_['file']['value']}", - label=f"{_['version']['value']}:{_['variant']['value']}:{_['format']['value']}:{_['compression']['value']}", + label=f"{_['version']['value']}:" + f"{_['variant']['value']}:" + f"{_['format']['value']}:" + f"{_['compression']['value']}", ) for _ in result ] @@ -194,6 +201,7 @@ def autocomplete( class SimpleDatabusLoadingPlugin(WorkflowPlugin): """Implementation of loading one file from the Databus into a given dataset""" + # pylint: disable=too-many-arguments def __init__( self, databus_base_url: str, @@ -208,6 +216,10 @@ def __init__( self.databus_file_id = databus_file_id self.target_graph = target_graph self.chunk_size = chunk_size + # to get rid of unused-argument + _ = databus_document + _ = artifact_format + _ = artifact_version def __get_graph_uri(self, context: ExecutionContext): task_info = get_task(project=context.task.project_id(), task=self.target_graph) diff --git a/cmem_plugin_databus/utils.py b/cmem_plugin_databus/utils.py index 948a124..748bfa6 100644 --- a/cmem_plugin_databus/utils.py +++ b/cmem_plugin_databus/utils.py @@ -1,8 +1,7 @@ """Utils for handling the DBpedia Databus""" -import json from dataclasses import dataclass from typing import Dict, Iterator, List, Optional, Any -from urllib.parse import quote, urlencode +from urllib.parse import urlencode import requests from cmem_plugin_base.dataintegration.context import ( @@ -74,7 +73,7 @@ def result_from_json_dict(json_dict: Dict[str, List[str]]) -> DatabusSearchResul def fetch_api_search_result( databus_base: str, - url_parameters: dict = None + url_parameters: Optional[dict] = None ) -> List[DatabusSearchResult]: """Fetches Search Results.""" encoded_query_str = "" @@ -370,8 +369,9 @@ def byte_iterator_context_update( def fetch_facets_options( databus_base: str, - url_parameters: dict = None + url_parameters: Optional[dict] = None ): + """Fetch facet options for a given document""" encoded_query_str = "" if url_parameters: encoded_query_str = urlencode(url_parameters) @@ -383,32 +383,37 @@ def fetch_facets_options( result = { "version": json_resp["http://purl.org/dc/terms/hasVersion"]["values"], - "format": json_resp["https://dataid.dbpedia.org/databus#formatExtension"]["values"] + "format": + json_resp["https://dataid.dbpedia.org/databus#formatExtension"]["values"] } return result def fetch_databus_files(endpoint: str, artifact: str, version: str, file_format: str): + """fetch databus file name based of artifact, version and format on a given + databus instance""" + query = f"""PREFIX rdfs: PREFIX rdf: PREFIX dcat: PREFIX dct: PREFIX dcv: PREFIX databus: -SELECT DISTINCT ?file ?version ?artifact ?license ?size ?format ?compression (GROUP_CONCAT(DISTINCT ?var; SEPARATOR=', ') AS ?variant) ?preview WHERE +SELECT DISTINCT ?file ?version ?artifact ?license ?size ?format ?compression + (GROUP_CONCAT(DISTINCT ?var; SEPARATOR=', ') AS ?variant) ?preview WHERE {{ GRAPH ?g {{ ?dataset databus:artifact <{artifact}> . {{ ?distribution dct:hasVersion ?version {{ - SELECT (?v as ?version) {{ - GRAPH ?g2 {{ - ?dataset databus:artifact <{artifact}> . - ?dataset dct:hasVersion ?v . + SELECT (?v as ?version) {{ + GRAPH ?g2 {{ + ?dataset databus:artifact <{artifact}> . + ?dataset dct:hasVersion ?v . }} - }} ORDER BY DESC (STR(?version)) LIMIT 1 + }} ORDER BY DESC (STR(?version)) LIMIT 1 }} }} UNION @@ -426,7 +431,8 @@ def fetch_databus_files(endpoint: str, artifact: str, version: str, file_format: ?dataset dct:license ?license . ?dataset dct:hasVersion ?version . ?dataset databus:artifact ?artifact . - OPTIONAL {{ ?distribution ?p ?var. ?p rdfs:subPropertyOf databus:contentVariant . }} + OPTIONAL + {{ ?distribution ?p ?var. ?p rdfs:subPropertyOf databus:contentVariant . }} OPTIONAL {{ ?distribution dcat:byteSize ?size . }} }} }} @@ -438,4 +444,6 @@ def fetch_databus_files(endpoint: str, artifact: str, version: str, file_format: sparql_service.setReturnFormat(JSON) query_results = sparql_service.query().convert() - return query_results["results"]["bindings"] + # just to make mypy stop complaining + assert isinstance(query_results, dict) # nosec + return query_results["results"]["bindings"] # nosec From 3bccaa76cebd6da8da4588d17a722a5b79721613 Mon Sep 17 00:00:00 2001 From: saipraneeth <2506664+msaipraneeth@users.noreply.github.com> Date: Wed, 30 Aug 2023 08:03:07 +0100 Subject: [PATCH 07/24] rename parameter name to databus artifact --- cmem_plugin_databus/loader.py | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/cmem_plugin_databus/loader.py b/cmem_plugin_databus/loader.py index e4e356f..0daa4e9 100644 --- a/cmem_plugin_databus/loader.py +++ b/cmem_plugin_databus/loader.py @@ -66,7 +66,7 @@ class FacetSearch(StringParameterType): autocompletion_depends_on_parameters: list[str] = [ "databus_base_url", - "databus_document" + "databus_artifact" ] # auto complete for values @@ -107,7 +107,7 @@ class DatabusFile(StringParameterType): """Class for DatabusFile""" autocompletion_depends_on_parameters: list[str] = [ "databus_base_url", - "databus_document", + "databus_artifact", "artifact_format", "artifact_version" ] @@ -154,11 +154,11 @@ def autocomplete( PluginParameter( name="databus_base_url", label="Databus Base URL", - description="The URL of the Databus server", + description="The URL of the databus server", ), PluginParameter( - name="databus_document", - label="Databus Document", + name="databus_artifact", + label="Artifact", description="The name of databus artifact", param_type=DatabusSearch(), default_value="" @@ -205,7 +205,7 @@ class SimpleDatabusLoadingPlugin(WorkflowPlugin): def __init__( self, databus_base_url: str, - databus_document: str, + databus_artifact: str, artifact_format: str, artifact_version: str, databus_file_id: str, @@ -217,7 +217,7 @@ def __init__( self.target_graph = target_graph self.chunk_size = chunk_size # to get rid of unused-argument - _ = databus_document + _ = databus_artifact _ = artifact_format _ = artifact_version From 9ec68fe0d95e365ebd38f41c3bffbfdf1d4e38b6 Mon Sep 17 00:00:00 2001 From: saipraneeth <2506664+msaipraneeth@users.noreply.github.com> Date: Wed, 30 Aug 2023 08:14:59 +0100 Subject: [PATCH 08/24] switch to user context instead of super user context --- cmem_plugin_databus/loader.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/cmem_plugin_databus/loader.py b/cmem_plugin_databus/loader.py index 0daa4e9..32fb68d 100644 --- a/cmem_plugin_databus/loader.py +++ b/cmem_plugin_databus/loader.py @@ -12,7 +12,7 @@ from cmem_plugin_base.dataintegration.parameter.dataset import DatasetParameterType from cmem_plugin_base.dataintegration.plugins import WorkflowPlugin from cmem_plugin_base.dataintegration.types import StringParameterType, Autocompletion -from cmem_plugin_base.dataintegration.utils import setup_cmempy_super_user_access +from cmem_plugin_base.dataintegration.utils import setup_cmempy_user_access from cmem_plugin_databus.cmem_wrappers import post_streamed_bytes from cmem_plugin_databus.utils import ( @@ -228,7 +228,7 @@ def __get_graph_uri(self, context: ExecutionContext): def execute( self, inputs=(), context: ExecutionContext = ExecutionContext() ) -> None: - setup_cmempy_super_user_access() + setup_cmempy_user_access(context.user) self.log.info(f"Loading file from {self.databus_file_id}") data: bytearray = bytearray() From 28fe6564bb5aeb1140c771ced63bd3399b004c70 Mon Sep 17 00:00:00 2001 From: saipraneeth <2506664+msaipraneeth@users.noreply.github.com> Date: Fri, 1 Sep 2023 07:02:01 +0100 Subject: [PATCH 09/24] update query to remove filtering of file from latest version --- cmem_plugin_databus/utils.py | 24 ++++-------------------- 1 file changed, 4 insertions(+), 20 deletions(-) diff --git a/cmem_plugin_databus/utils.py b/cmem_plugin_databus/utils.py index 748bfa6..d92695f 100644 --- a/cmem_plugin_databus/utils.py +++ b/cmem_plugin_databus/utils.py @@ -393,7 +393,6 @@ def fetch_facets_options( def fetch_databus_files(endpoint: str, artifact: str, version: str, file_format: str): """fetch databus file name based of artifact, version and format on a given databus instance""" - query = f"""PREFIX rdfs: PREFIX rdf: PREFIX dcat: @@ -401,29 +400,14 @@ def fetch_databus_files(endpoint: str, artifact: str, version: str, file_format: PREFIX dcv: PREFIX databus: SELECT DISTINCT ?file ?version ?artifact ?license ?size ?format ?compression - (GROUP_CONCAT(DISTINCT ?var; SEPARATOR=', ') AS ?variant) ?preview WHERE + (GROUP_CONCAT(DISTINCT ?var; SEPARATOR=', ') AS ?variant) ?preview WHERE {{ GRAPH ?g {{ ?dataset databus:artifact <{artifact}> . - {{ - ?distribution dct:hasVersion ?version {{ - SELECT (?v as ?version) {{ - GRAPH ?g2 {{ - ?dataset databus:artifact <{artifact}> . - ?dataset dct:hasVersion ?v . - }} - }} ORDER BY DESC (STR(?version)) LIMIT 1 - }} - }} - UNION {{ ?distribution '{version}' . }} - {{ - ?distribution ?c0 . - VALUES ?c0 {{ - '{file_format}' - }} - }} + {{ ?distribution + '{file_format}' . }} ?dataset dcat:distribution ?distribution . ?distribution databus:file ?file . ?distribution databus:formatExtension ?format . @@ -432,7 +416,7 @@ def fetch_databus_files(endpoint: str, artifact: str, version: str, file_format: ?dataset dct:hasVersion ?version . ?dataset databus:artifact ?artifact . OPTIONAL - {{ ?distribution ?p ?var. ?p rdfs:subPropertyOf databus:contentVariant . }} + {{ ?distribution ?p ?var. ?p rdfs:subPropertyOf databus:contentVariant . }} OPTIONAL {{ ?distribution dcat:byteSize ?size . }} }} }} From be5270648a71412ce6a8766edf693b6f00571a66 Mon Sep 17 00:00:00 2001 From: saipraneeth <2506664+msaipraneeth@users.noreply.github.com> Date: Sun, 3 Sep 2023 06:11:28 +0100 Subject: [PATCH 10/24] add new resource parameter type --- cmem_plugin_databus/loader.py | 45 ++++++++++++++++++++++++++++------- 1 file changed, 37 insertions(+), 8 deletions(-) diff --git a/cmem_plugin_databus/loader.py b/cmem_plugin_databus/loader.py index 32fb68d..f599641 100644 --- a/cmem_plugin_databus/loader.py +++ b/cmem_plugin_databus/loader.py @@ -1,7 +1,9 @@ """Plugin for loading one file from the databus and write it ino a dataset""" -from typing import Any +from typing import Any, Optional import requests +from cmem.cmempy.workspace.projects.resources import get_resources +from cmem.cmempy.workspace.projects.resources.resource import create_resource from cmem.cmempy.workspace.tasks import get_task from cmem_plugin_base.dataintegration.context import ( ExecutionContext, @@ -103,6 +105,33 @@ def autocomplete( ] +class ResourceParameterType(StringParameterType): + allow_only_autocompleted_values: bool = True + + autocomplete_value_with_labels: bool = True + + file_type: Optional[str] = None + + def __init__(self, file_type: Optional[str] = None): + """Dataset parameter type.""" + self.file_type = file_type + + def autocomplete( + self, + query_terms: list[str], + depend_on_parameter_values: list[Any], + context: PluginContext, + ) -> list[Autocompletion]: + setup_cmempy_user_access(context.user) + resources = get_resources(context.project_id) + return [ + Autocompletion( + value=f"{_['fullPath']}", + label=f"{_['name']}", + ) for _ in resources + ] + + class DatabusFile(StringParameterType): """Class for DatabusFile""" autocompletion_depends_on_parameters: list[str] = [ @@ -184,10 +213,10 @@ def autocomplete( param_type=DatabusFile() ), PluginParameter( - name="target_graph", - label="Target Graph", - description="Graph name to save the response from the Databus.", - param_type=DatasetParameterType(dataset_type="eccencaDataPlatform"), + name="target_file", + label="File", + description="File name to save the response from the Databus.", + param_type=ResourceParameterType(), ), PluginParameter( name="chunk_size", @@ -209,12 +238,12 @@ def __init__( artifact_format: str, artifact_version: str, databus_file_id: str, - target_graph: str, + target_file: str, chunk_size: int ) -> None: self.databus_url = databus_base_url self.databus_file_id = databus_file_id - self.target_graph = target_graph + self.target_file = target_file self.chunk_size = chunk_size # to get rid of unused-argument _ = databus_artifact @@ -222,7 +251,7 @@ def __init__( _ = artifact_version def __get_graph_uri(self, context: ExecutionContext): - task_info = get_task(project=context.task.project_id(), task=self.target_graph) + task_info = get_task(project=context.task.project_id(), task=self.target_file) return task_info["data"]["parameters"]["graph"]["value"] def execute( From aa95baa23c5ce4eba8c7d609d6ddbc1d513b47b5 Mon Sep 17 00:00:00 2001 From: saipraneeth <2506664+msaipraneeth@users.noreply.github.com> Date: Sun, 3 Sep 2023 07:02:32 +0100 Subject: [PATCH 11/24] use create resource to upload databus file to project --- cmem_plugin_databus/loader.py | 52 +++++++++++++++++++++-------------- 1 file changed, 32 insertions(+), 20 deletions(-) diff --git a/cmem_plugin_databus/loader.py b/cmem_plugin_databus/loader.py index f599641..e42959d 100644 --- a/cmem_plugin_databus/loader.py +++ b/cmem_plugin_databus/loader.py @@ -173,6 +173,28 @@ def autocomplete( ] +class ResponseStream: + """A Base class for producing messages from Dataset to a Kafka topic.""" + + def __enter__(self): + return self.read() + + def __exit__(self, exc_type, exc_val, exc_tb): + self.close() + + def __init__(self, response, chunk_size=1048576): + self.response = response + self.chunk_size = chunk_size + + def read(self): + for _ in self.response.iter_content(chunk_size=self.chunk_size): + yield _ + + def close(self): + pass + + + @Plugin( label="Simple Databus Loading Plugin", description="Loads a specfic file from the Databus to a local directory", @@ -276,28 +298,18 @@ def execute( ) return - with databus_file_resp as resp: - for _, chunk in enumerate(resp.iter_content(chunk_size=self.chunk_size)): - data += bytearray(chunk) - desc = f"Downloading File {get_clock(_)}" - context.report.update( - ExecutionReport( - entity_count=len(data) // 1000000, - operation="load", - operation_desc=desc, - ) - ) - graph_uri = self.__get_graph_uri(context) - post_resp = post_streamed_bytes( - str(graph_uri), - byte_iterator_context_update( - bytes(data), context, self.chunk_size, "Uploading File" - ), - replace=True, + upload_response = create_resource( + project_name=context.task.project_id(), + resource_name=self.target_file, + file_resource=ResponseStream(databus_file_resp), + replace=True ) - if post_resp.status_code < 400: + if upload_response.status_code < 400: context.report.update(ExecutionReport(operation_desc="Upload Successful ✓")) else: context.report.update( - ExecutionReport(operation_desc="Upload Failed ❌", error=post_resp.text) + ExecutionReport( + operation_desc="Upload Failed ❌", + error=upload_response.text + ) ) From 9303c2241d921fbf5af56128119543fc7c7c4c42 Mon Sep 17 00:00:00 2001 From: saipraneeth <2506664+msaipraneeth@users.noreply.github.com> Date: Sun, 3 Sep 2023 07:05:30 +0100 Subject: [PATCH 12/24] optimze imports --- cmem_plugin_databus/loader.py | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/cmem_plugin_databus/loader.py b/cmem_plugin_databus/loader.py index e42959d..edb95cb 100644 --- a/cmem_plugin_databus/loader.py +++ b/cmem_plugin_databus/loader.py @@ -11,15 +11,12 @@ PluginContext ) from cmem_plugin_base.dataintegration.description import Plugin, PluginParameter -from cmem_plugin_base.dataintegration.parameter.dataset import DatasetParameterType from cmem_plugin_base.dataintegration.plugins import WorkflowPlugin from cmem_plugin_base.dataintegration.types import StringParameterType, Autocompletion from cmem_plugin_base.dataintegration.utils import setup_cmempy_user_access -from cmem_plugin_databus.cmem_wrappers import post_streamed_bytes from cmem_plugin_databus.utils import ( - byte_iterator_context_update, - get_clock, fetch_api_search_result, fetch_facets_options, fetch_databus_files, + fetch_api_search_result, fetch_facets_options, fetch_databus_files, ) @@ -194,7 +191,6 @@ def close(self): pass - @Plugin( label="Simple Databus Loading Plugin", description="Loads a specfic file from the Databus to a local directory", @@ -282,7 +278,6 @@ def execute( setup_cmempy_user_access(context.user) self.log.info(f"Loading file from {self.databus_file_id}") - data: bytearray = bytearray() databus_file_resp = requests.get( self.databus_file_id, allow_redirects=True, From f532376064ef8c366e5bb2687aa4b9a9a9e9a95f Mon Sep 17 00:00:00 2001 From: saipraneeth <2506664+msaipraneeth@users.noreply.github.com> Date: Sun, 3 Sep 2023 07:25:44 +0100 Subject: [PATCH 13/24] update dependency --- cmem_plugin_databus/loader.py | 21 +++++++++------------ poetry.lock | 28 ++++++++++++++-------------- 2 files changed, 23 insertions(+), 26 deletions(-) diff --git a/cmem_plugin_databus/loader.py b/cmem_plugin_databus/loader.py index edb95cb..9a30191 100644 --- a/cmem_plugin_databus/loader.py +++ b/cmem_plugin_databus/loader.py @@ -4,7 +4,6 @@ import requests from cmem.cmempy.workspace.projects.resources import get_resources from cmem.cmempy.workspace.projects.resources.resource import create_resource -from cmem.cmempy.workspace.tasks import get_task from cmem_plugin_base.dataintegration.context import ( ExecutionContext, ExecutionReport, @@ -103,6 +102,7 @@ def autocomplete( class ResourceParameterType(StringParameterType): + """Resource parameter type.""" allow_only_autocompleted_values: bool = True autocomplete_value_with_labels: bool = True @@ -171,25 +171,26 @@ def autocomplete( class ResponseStream: - """A Base class for producing messages from Dataset to a Kafka topic.""" + """A context manager for streaming the content of an HTTP response in chunks. + + This class allows you to stream the content of an HTTP response in manageable chunks + without loading the entire response into memory at once. It provides an iterable + interface to read the response content piece by piece.""" def __enter__(self): - return self.read() + return self._read() def __exit__(self, exc_type, exc_val, exc_tb): - self.close() + pass def __init__(self, response, chunk_size=1048576): self.response = response self.chunk_size = chunk_size - def read(self): + def _read(self): for _ in self.response.iter_content(chunk_size=self.chunk_size): yield _ - def close(self): - pass - @Plugin( label="Simple Databus Loading Plugin", @@ -268,10 +269,6 @@ def __init__( _ = artifact_format _ = artifact_version - def __get_graph_uri(self, context: ExecutionContext): - task_info = get_task(project=context.task.project_id(), task=self.target_file) - return task_info["data"]["parameters"]["graph"]["value"] - def execute( self, inputs=(), context: ExecutionContext = ExecutionContext() ) -> None: diff --git a/poetry.lock b/poetry.lock index f31c537..8126b97 100644 --- a/poetry.lock +++ b/poetry.lock @@ -463,13 +463,13 @@ smmap = ">=3.0.1,<6" [[package]] name = "gitpython" -version = "3.1.32" +version = "3.1.34" description = "GitPython is a Python library used to interact with Git repositories" optional = false python-versions = ">=3.7" files = [ - {file = "GitPython-3.1.32-py3-none-any.whl", hash = "sha256:e3d59b1c2c6ebb9dfa7a184daf3b6dd4914237e7488a1730a6d8f6f5d0b4187f"}, - {file = "GitPython-3.1.32.tar.gz", hash = "sha256:8d9b8cb1e80b9735e8717c9362079d3ce4c6e5ddeebedd0361b228c3a67a62f6"}, + {file = "GitPython-3.1.34-py3-none-any.whl", hash = "sha256:5d3802b98a3bae1c2b8ae0e1ff2e4aa16bcdf02c145da34d092324f599f01395"}, + {file = "GitPython-3.1.34.tar.gz", hash = "sha256:85f7d365d1f6bf677ae51039c1ef67ca59091c7ebd5a3509aa399d4eda02d6dd"}, ] [package.dependencies] @@ -957,13 +957,13 @@ test = ["appdirs (==1.4.4)", "covdefaults (>=2.3)", "pytest (>=7.4)", "pytest-co [[package]] name = "pluggy" -version = "1.2.0" +version = "1.3.0" description = "plugin and hook calling mechanisms for python" optional = false -python-versions = ">=3.7" +python-versions = ">=3.8" files = [ - {file = "pluggy-1.2.0-py3-none-any.whl", hash = "sha256:c2fd55a7d7a3863cba1a013e4e2414658b1d07b6bc57b3919e0c63c9abb99849"}, - {file = "pluggy-1.2.0.tar.gz", hash = "sha256:d12f0c4b579b15f5e054301bb226ee85eeeba08ffec228092f8defbaa3a4c4b3"}, + {file = "pluggy-1.3.0-py3-none-any.whl", hash = "sha256:d89c696a773f8bd377d18e5ecda92b7a3793cbe66c87060a6fb58c7b6e1061f7"}, + {file = "pluggy-1.3.0.tar.gz", hash = "sha256:cf61ae8f126ac6f7c451172cf30e3e43d3ca77615509771b3a984a0730651e12"}, ] [package.extras] @@ -1072,13 +1072,13 @@ files = [ [[package]] name = "pytest" -version = "7.4.0" +version = "7.4.1" description = "pytest: simple powerful testing with Python" optional = false python-versions = ">=3.7" files = [ - {file = "pytest-7.4.0-py3-none-any.whl", hash = "sha256:78bf16451a2eb8c7a2ea98e32dc119fd2aa758f1d5d66dbf0a59d69a3969df32"}, - {file = "pytest-7.4.0.tar.gz", hash = "sha256:b4bf8c45bd59934ed84001ad51e11b4ee40d40a1229d2c79f9c592b0a3f6bd8a"}, + {file = "pytest-7.4.1-py3-none-any.whl", hash = "sha256:460c9a59b14e27c602eb5ece2e47bec99dc5fc5f6513cf924a7d03a578991b1f"}, + {file = "pytest-7.4.1.tar.gz", hash = "sha256:2f2301e797521b23e4d2585a0a3d7b5e50fdddaaf7e7d6773ea26ddb17c213ab"}, ] [package.dependencies] @@ -1110,13 +1110,13 @@ testing = ["fields", "hunter", "process-tests", "pytest-xdist", "six", "virtuale [[package]] name = "pytest-memray" -version = "1.4.1" +version = "1.5.0" description = "A simple plugin to use with pytest" optional = false python-versions = ">=3.8" files = [ - {file = "pytest_memray-1.4.1-py3-none-any.whl", hash = "sha256:54af95f6a454bb7c0849843ed59c52b67cfe51fc6eeb7e59a4f3d107e5100405"}, - {file = "pytest_memray-1.4.1.tar.gz", hash = "sha256:ea7e35a7e230f5d5244665d3e4daa0bb1bf3a0f89b0ea51734de531c4de87e56"}, + {file = "pytest_memray-1.5.0-py3-none-any.whl", hash = "sha256:120afb6d9f11e2d2fbc6989d750e97e8a127c5d0269a158d99372ee3e164ff5c"}, + {file = "pytest_memray-1.5.0.tar.gz", hash = "sha256:05ec37e23c14967e02994df116bfcca26ce64a017274080c4c4e8f29818bc78a"}, ] [package.dependencies] @@ -1125,7 +1125,7 @@ pytest = ">=7.2" [package.extras] docs = ["furo (>=2022.12.7)", "sphinx (>=6.1.3)", "sphinx-argparse (>=0.4)", "sphinx-inline-tabs (>=2022.1.2b11)", "sphinxcontrib-programoutput (>=0.17)", "towncrier (>=22.12)"] -lint = ["black (==22.12)", "flake8 (==6)", "isort (==5.11.4)", "mypy (==0.991)"] +lint = ["black (==22.12)", "isort (==5.11.4)", "mypy (==0.991)", "ruff (==0.0.272)"] test = ["covdefaults (>=2.2.2)", "coverage (>=7.0.5)", "flaky (>=3.7)", "pytest (>=7.2)", "pytest-xdist (>=3.1)"] [[package]] From 309a81d1b8ea35c696eb75c9a4bfcc8e76245b43 Mon Sep 17 00:00:00 2001 From: saipraneeth <2506664+msaipraneeth@users.noreply.github.com> Date: Mon, 4 Sep 2023 08:11:08 +0100 Subject: [PATCH 14/24] add query term to option, if that is not part of project files --- cmem_plugin_databus/loader.py | 17 ++++++++++++++++- 1 file changed, 16 insertions(+), 1 deletion(-) diff --git a/cmem_plugin_databus/loader.py b/cmem_plugin_databus/loader.py index 9a30191..0b6911d 100644 --- a/cmem_plugin_databus/loader.py +++ b/cmem_plugin_databus/loader.py @@ -121,12 +121,27 @@ def autocomplete( ) -> list[Autocompletion]: setup_cmempy_user_access(context.user) resources = get_resources(context.project_id) - return [ + result = [ Autocompletion( value=f"{_['fullPath']}", label=f"{_['name']}", ) for _ in resources ] + append_query_terms_to_list = True + if query_terms: + for resource in resources: + if resource['fullPath'].lower().startswith(query_terms[0].lower()): + append_query_terms_to_list = False + if append_query_terms_to_list and query_terms: + result.insert( + 0, + Autocompletion( + value=f"{query_terms[0]}", + label=f"{query_terms[0]} (New resource)" + ) + ) + + return result class DatabusFile(StringParameterType): From aae68e1d5b3b325e335f3debdd091bf18b680fea Mon Sep 17 00:00:00 2001 From: saipraneeth <2506664+msaipraneeth@users.noreply.github.com> Date: Mon, 4 Sep 2023 08:21:29 +0100 Subject: [PATCH 15/24] enable dynamic versioning --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index 67e8ff0..c234abf 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -45,7 +45,7 @@ requires = ["poetry-core>=1.0.0", "poetry-dynamic-versioning"] build-backend = "poetry_dynamic_versioning.backend" [tool.poetry-dynamic-versioning] -enable = false +enable = true vcs = "git" dirty = true From c982c073f964dcd531918e56ca3f02f6be1efa61 Mon Sep 17 00:00:00 2001 From: saipraneeth <2506664+msaipraneeth@users.noreply.github.com> Date: Mon, 4 Sep 2023 08:46:30 +0100 Subject: [PATCH 16/24] filter option based on query terms --- cmem_plugin_databus/loader.py | 28 ++++++++++++++-------------- 1 file changed, 14 insertions(+), 14 deletions(-) diff --git a/cmem_plugin_databus/loader.py b/cmem_plugin_databus/loader.py index 0b6911d..9ff5516 100644 --- a/cmem_plugin_databus/loader.py +++ b/cmem_plugin_databus/loader.py @@ -93,13 +93,16 @@ def autocomplete( } ) _format = result[self.facet_option] - return [ - Autocompletion( - value=f"{_}", - label=f"{_}", - )for _ in _format - ] + result = [ + Autocompletion( + value=f"{_}", + label=f"{_}", + ) for _ in _format + ] + if query_terms: + return [ _ for _ in result if _.value.find(query_terms[0]) > -1 ] + return result class ResourceParameterType(StringParameterType): """Resource parameter type.""" @@ -127,19 +130,16 @@ def autocomplete( label=f"{_['name']}", ) for _ in resources ] - append_query_terms_to_list = True if query_terms: - for resource in resources: - if resource['fullPath'].lower().startswith(query_terms[0].lower()): - append_query_terms_to_list = False - if append_query_terms_to_list and query_terms: - result.insert( - 0, + result = [ _ for _ in result if _.value.find(query_terms[0]) > -1 ] + + if not result and query_terms: + result = [ Autocompletion( value=f"{query_terms[0]}", label=f"{query_terms[0]} (New resource)" ) - ) + ] return result From 947e1ec74fa48be3b921b36956521e9af7f4dfdc Mon Sep 17 00:00:00 2001 From: saipraneeth <2506664+msaipraneeth@users.noreply.github.com> Date: Mon, 4 Sep 2023 10:53:23 +0100 Subject: [PATCH 17/24] fix linter issues --- cmem_plugin_databus/loader.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/cmem_plugin_databus/loader.py b/cmem_plugin_databus/loader.py index 9ff5516..dd84fe4 100644 --- a/cmem_plugin_databus/loader.py +++ b/cmem_plugin_databus/loader.py @@ -85,14 +85,14 @@ def autocomplete( databus_base_url = depend_on_parameter_values[0] databus_document = depend_on_parameter_values[1] - result = fetch_facets_options( + facet_options = fetch_facets_options( databus_base=databus_base_url, url_parameters={ "type": "artifact", "uri": databus_document } ) - _format = result[self.facet_option] + _format = facet_options[self.facet_option] result = [ Autocompletion( value=f"{_}", @@ -100,10 +100,11 @@ def autocomplete( ) for _ in _format ] if query_terms: - return [ _ for _ in result if _.value.find(query_terms[0]) > -1 ] + result = [_ for _ in result if _.value.find(query_terms[0]) > -1] return result + class ResourceParameterType(StringParameterType): """Resource parameter type.""" allow_only_autocompleted_values: bool = True @@ -131,7 +132,7 @@ def autocomplete( ) for _ in resources ] if query_terms: - result = [ _ for _ in result if _.value.find(query_terms[0]) > -1 ] + result = [_ for _ in result if _.value.find(query_terms[0]) > -1] if not result and query_terms: result = [ From 9e4a0011e8f4649457b8269b95d872b857dd0d8f Mon Sep 17 00:00:00 2001 From: saipraneeth <2506664+msaipraneeth@users.noreply.github.com> Date: Sun, 10 Sep 2023 23:19:08 +0100 Subject: [PATCH 18/24] default chunk_size --- cmem_plugin_databus/loader.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cmem_plugin_databus/loader.py b/cmem_plugin_databus/loader.py index dd84fe4..59af903 100644 --- a/cmem_plugin_databus/loader.py +++ b/cmem_plugin_databus/loader.py @@ -274,7 +274,7 @@ def __init__( artifact_version: str, databus_file_id: str, target_file: str, - chunk_size: int + chunk_size: int = 1048576 ) -> None: self.databus_url = databus_base_url self.databus_file_id = databus_file_id From b174ab97d9250aadc2f8d8c71e5e08076ee68e9a Mon Sep 17 00:00:00 2001 From: saipraneeth <2506664+msaipraneeth@users.noreply.github.com> Date: Mon, 11 Sep 2023 00:17:45 +0100 Subject: [PATCH 19/24] add test for loader --- tests/test_loader.py | 247 +++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 247 insertions(+) create mode 100644 tests/test_loader.py diff --git a/tests/test_loader.py b/tests/test_loader.py new file mode 100644 index 0000000..d3a89d4 --- /dev/null +++ b/tests/test_loader.py @@ -0,0 +1,247 @@ +import io +from dataclasses import dataclass + +import pytest +from cmem.cmempy.workspace.projects.project import make_new_project, delete_project +from cmem.cmempy.workspace.projects.resources.resource import resource_exist, \ + create_resource + +from cmem_plugin_databus.loader import SimpleDatabusLoadingPlugin, DatabusSearch, \ + ResourceParameterType, FacetSearch, DatabusFile +from .utils import needs_cmem, TestExecutionContext, TestPluginContext + +DATABUS_BASE_URL = "https://databus.dbpedia.org" +DATABUS_DOCUMENT = f"{DATABUS_BASE_URL}/cmempydeveloper/CorporateMemory/Documentation" +DOCUMENT_VERSION = "23.01" +DOCUMENT_FORMAT = "md" +DATABUS_FILE = ( + "https://databus.dbpedia.org/cmempydeveloper/CorporateMemory/" + f"Documentation/{DOCUMENT_VERSION}/Documentation.md" +) + + +def get_autocomplete_values( + parameter, + query_terms, + depend_on_parameter_values, + context +): + """get autocomplete values""" + if depend_on_parameter_values is None: + depend_on_parameter_values = [] + return [ + x.value + for x in parameter.autocomplete( + query_terms=query_terms, + depend_on_parameter_values=depend_on_parameter_values, + context=context + ) + ] + + +@pytest.fixture(name="project") +def project(): + """Provides the DI build project incl. assets.""" + project_name = 'databus_sample_project' + make_new_project(project_name) + yield project_name + delete_project(project_name) + + +@pytest.fixture(name="resource") +def resource(project): + """setup json resource""" + _resource_name = "sample_test.txt" + create_resource( + project_name=project, + resource_name=_resource_name, + file_resource=io.StringIO("SAMPLE CONTENT"), + replace=True + ) + + @dataclass + class FixtureDate: + """fixture dataclass""" + project_name = project + resource_name = _resource_name + + _ = FixtureDate() + yield _ + + +@needs_cmem +def test_databus_load(project): + resource_name = "upload_readme.md" + databus_load = SimpleDatabusLoadingPlugin( + databus_base_url=DATABUS_BASE_URL, + databus_artifact="", + artifact_format="", + artifact_version="", + databus_file_id="https://databus.dbpedia.org/cmempydeveloper/CorporateMemory" + "/Documentation/23.02/Documentation.md", + target_file=resource_name + ) + databus_load.execute( + inputs=(), + context=TestExecutionContext(project_id=project) + ) + assert resource_exist(project_name=project, resource_name=resource_name) + + +@needs_cmem +def test_databus_search_auto_complete(): + parameter = DatabusSearch() + assert '' in get_autocomplete_values( + parameter, + [], + depend_on_parameter_values=[], + context=TestPluginContext()) + + assert len(get_autocomplete_values( + parameter, + ['NOTFOUND'], + depend_on_parameter_values=[DATABUS_BASE_URL], + context=TestPluginContext())) == 0 + + +@needs_cmem +def test_resource_parameter_type_completion(resource): + """test resource parameter type completion""" + project_name = resource.project_name + resource_name = resource.resource_name + parameter = ResourceParameterType() + context = TestPluginContext(project_name) + assert resource_name in get_autocomplete_values( + parameter, + [], + depend_on_parameter_values=[], + context=context + ) + new_resource_name = "lkshfkdsjfhsd" + assert len( + get_autocomplete_values( + parameter, + [new_resource_name], + depend_on_parameter_values=[], + context=context + ) + ) == 1 + assert new_resource_name in get_autocomplete_values( + parameter, + [new_resource_name], + depend_on_parameter_values=[], + context=context + ) + + +@needs_cmem +def test_facet_search_auto_complete(): + parameter = FacetSearch(facet_option='format') + assert DOCUMENT_FORMAT in get_autocomplete_values( + parameter, + [], + depend_on_parameter_values=[ + DATABUS_BASE_URL, + DATABUS_DOCUMENT + ], + context=TestPluginContext()) + + assert len(get_autocomplete_values( + parameter, + ['NOTFOUND'], + depend_on_parameter_values=[ + DATABUS_BASE_URL, + DATABUS_DOCUMENT + ], + context=TestPluginContext())) == 0 + + parameter = FacetSearch(facet_option='version') + assert DOCUMENT_VERSION in get_autocomplete_values( + parameter, + [], + depend_on_parameter_values=[ + DATABUS_BASE_URL, + DATABUS_DOCUMENT + ], + context=TestPluginContext()) + assert DOCUMENT_VERSION not in get_autocomplete_values( + parameter, + ["23.02"], + depend_on_parameter_values=[ + DATABUS_BASE_URL, + DATABUS_DOCUMENT + ], + context=TestPluginContext()) + assert len( + get_autocomplete_values( + parameter, + [], + depend_on_parameter_values=[ + DATABUS_BASE_URL, + DATABUS_DOCUMENT + ], + context=TestPluginContext() + ) + ) == 2 + assert len(get_autocomplete_values( + parameter, + ['NOTFOUND'], + depend_on_parameter_values=[ + DATABUS_BASE_URL, + DATABUS_DOCUMENT + ], + context=TestPluginContext())) == 0 + + +@needs_cmem +def test_databus_file_auto_complete(): + parameter = DatabusFile() + assert DATABUS_FILE in get_autocomplete_values( + parameter, + [], + depend_on_parameter_values=[ + DATABUS_BASE_URL, + DATABUS_DOCUMENT, + DOCUMENT_FORMAT, + DOCUMENT_VERSION + ], + context=TestPluginContext()) + assert len( + get_autocomplete_values( + parameter, + [], + depend_on_parameter_values=[ + DATABUS_BASE_URL, + DATABUS_DOCUMENT, + DOCUMENT_FORMAT, + DOCUMENT_VERSION + ], + context=TestPluginContext() + ) + ) == 1 + assert len( + get_autocomplete_values( + parameter, + ["ADSLASD"], + depend_on_parameter_values=[ + DATABUS_BASE_URL, + DATABUS_DOCUMENT, + DOCUMENT_FORMAT, + DOCUMENT_VERSION + ], + context=TestPluginContext() + ) + ) == 1 + assert len( + get_autocomplete_values( + parameter, + [], + depend_on_parameter_values=[ + DATABUS_BASE_URL, + DATABUS_DOCUMENT, + "NOTFOUND", + DOCUMENT_VERSION + ], + context=TestPluginContext() + ) + ) == 0 From bba0f053d8275c16712008fb74db93e160389c46 Mon Sep 17 00:00:00 2001 From: saipraneeth <2506664+msaipraneeth@users.noreply.github.com> Date: Mon, 11 Sep 2023 22:10:14 +0100 Subject: [PATCH 20/24] fix label for Databus File ID parameter --- cmem_plugin_databus/loader.py | 25 ++++++++++++++++--------- 1 file changed, 16 insertions(+), 9 deletions(-) diff --git a/cmem_plugin_databus/loader.py b/cmem_plugin_databus/loader.py index 59af903..61eab4d 100644 --- a/cmem_plugin_databus/loader.py +++ b/cmem_plugin_databus/loader.py @@ -175,15 +175,22 @@ def autocomplete( version=artifact_version, file_format=artifact_format ) - return [ - Autocompletion( - value=f"{_['file']['value']}", - label=f"{_['version']['value']}:" - f"{_['variant']['value']}:" - f"{_['format']['value']}:" - f"{_['compression']['value']}", - ) for _ in result - ] + finalized_result = [] + for _ in result: + variant = _["variant"]["value"] \ + if not _["variant"]["value"].startswith(", ") \ + else _["variant"]["value"].replace(", ", "", 1) + finalized_result.append( + Autocompletion( + value=f"{_['file']['value']}", + label=f'Version={_["version"]["value"]}, ' + f'Variant={variant}, ' + f'Format={_["format"]["value"]}, ' + f'Compression={_["compression"]["value"]}, ' + f'Size={_["size"]["value"]} Bytes', + ) + ) + return finalized_result class ResponseStream: From 234cc57033a4ac62ecc837791774a957a336c456 Mon Sep 17 00:00:00 2001 From: saipraneeth <2506664+msaipraneeth@users.noreply.github.com> Date: Mon, 11 Sep 2023 22:19:20 +0100 Subject: [PATCH 21/24] minor modifications to test Constants --- tests/test_loader.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_loader.py b/tests/test_loader.py index d3a89d4..b223f7e 100644 --- a/tests/test_loader.py +++ b/tests/test_loader.py @@ -15,7 +15,7 @@ DOCUMENT_VERSION = "23.01" DOCUMENT_FORMAT = "md" DATABUS_FILE = ( - "https://databus.dbpedia.org/cmempydeveloper/CorporateMemory/" + f"{DATABUS_BASE_URL}/cmempydeveloper/CorporateMemory/" f"Documentation/{DOCUMENT_VERSION}/Documentation.md" ) From d4745340dd94678e1c22372c4b5a11e3e6757c7a Mon Sep 17 00:00:00 2001 From: Sebastian Tramp Date: Mon, 11 Sep 2023 09:57:46 +0200 Subject: [PATCH 22/24] extend documentation, sort auto-complete values, set default values --- cmem_plugin_databus/loader.py | 49 +++++++++++++++++++++-------------- 1 file changed, 29 insertions(+), 20 deletions(-) diff --git a/cmem_plugin_databus/loader.py b/cmem_plugin_databus/loader.py index 61eab4d..616dd57 100644 --- a/cmem_plugin_databus/loader.py +++ b/cmem_plugin_databus/loader.py @@ -93,6 +93,11 @@ def autocomplete( } ) _format = facet_options[self.facet_option] + if self.facet_option == "version": + _format = sorted(_format, reverse=True) + else: + _format = sorted(_format) + result = [ Autocompletion( value=f"{_}", @@ -216,71 +221,75 @@ def _read(self): @Plugin( - label="Simple Databus Loading Plugin", - description="Loads a specfic file from the Databus to a local directory", - documentation=""" -This CMEM task loads a file from the defined Databus to a RDF dataset. -""", + label="Download File from a DBpedia Databus", + description="Download a file artifact listed in a Databus Catalog.", + documentation="This workflow task allows for selecting and downloading a file" + " artifact from a DBpedia Databus to a Corporate Memory dataset" + " resource.", parameters=[ PluginParameter( name="databus_base_url", label="Databus Base URL", - description="The URL of the databus server", + description="The deployment URL of a Databus service," + " e.g. https://databus.dbpedia.org/", ), PluginParameter( name="databus_artifact", - label="Artifact", - description="The name of databus artifact", + label="Artifact URL", + description="The URL of the Databus artifact. You can search by name.", param_type=DatabusSearch(), default_value="" ), PluginParameter( name="artifact_format", label="Format", - description="The format of databus artifact", + description="The format of the Databus artifact. You can select the" + " format, after you have a proper Artifact URL selected.", param_type=FacetSearch(facet_option="format"), default_value="" ), PluginParameter( name="artifact_version", label="Version", - description="The version of databus artifact", + description="The version of Databus artifact. You can select the" + " version, after you have a proper Artifact URL selected.", param_type=FacetSearch(facet_option="version"), default_value="" ), PluginParameter( name="databus_file_id", label="Databus File ID", - description="The Databus file id of the file to download", + description="The Databus file ID of the file to download.", param_type=DatabusFile() ), PluginParameter( name="target_file", label="File", - description="File name to save the response from the Databus.", + description="File name where you want to save the dowloaded file" + " from the Databus.", param_type=ResourceParameterType(), ), PluginParameter( name="chunk_size", label="Chunk Size", - description="Chunksize during up/downloading the graph", + description="Chunksize during up/downloading the graph.", default_value=1048576, advanced=True, ), ], ) class SimpleDatabusLoadingPlugin(WorkflowPlugin): - """Implementation of loading one file from the Databus into a given dataset""" + """Implementation of downloading one file from the Databus to a dataset resource.""" # pylint: disable=too-many-arguments def __init__( self, - databus_base_url: str, - databus_artifact: str, - artifact_format: str, - artifact_version: str, - databus_file_id: str, - target_file: str, + databus_base_url: str = "https://databus.dbpedia.org/", + databus_artifact: str = "", + artifact_format: str = "", + artifact_version: str = "", + databus_file_id: str = "", + target_file: str = "", chunk_size: int = 1048576 ) -> None: self.databus_url = databus_base_url From 3ec9f1d22683d978f7592b8557b4cf9110c507c0 Mon Sep 17 00:00:00 2001 From: saipraneeth <2506664+msaipraneeth@users.noreply.github.com> Date: Tue, 12 Sep 2023 07:59:38 +0100 Subject: [PATCH 23/24] add plugin_id=cmem-plugin-databus-Download --- cmem_plugin_databus/loader.py | 1 + 1 file changed, 1 insertion(+) diff --git a/cmem_plugin_databus/loader.py b/cmem_plugin_databus/loader.py index 616dd57..ca0deaa 100644 --- a/cmem_plugin_databus/loader.py +++ b/cmem_plugin_databus/loader.py @@ -222,6 +222,7 @@ def _read(self): @Plugin( label="Download File from a DBpedia Databus", + plugin_id="cmem-plugin-databus-Download", description="Download a file artifact listed in a Databus Catalog.", documentation="This workflow task allows for selecting and downloading a file" " artifact from a DBpedia Databus to a Corporate Memory dataset" From cecf783d236b95e70295e23a68b2bfd1de8e48ca Mon Sep 17 00:00:00 2001 From: saipraneeth <2506664+msaipraneeth@users.noreply.github.com> Date: Tue, 12 Sep 2023 08:03:53 +0100 Subject: [PATCH 24/24] fix typo in plugin_id --- cmem_plugin_databus/loader.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cmem_plugin_databus/loader.py b/cmem_plugin_databus/loader.py index ca0deaa..1e5789d 100644 --- a/cmem_plugin_databus/loader.py +++ b/cmem_plugin_databus/loader.py @@ -222,7 +222,7 @@ def _read(self): @Plugin( label="Download File from a DBpedia Databus", - plugin_id="cmem-plugin-databus-Download", + plugin_id="cmem_plugin_databus-Download", description="Download a file artifact listed in a Databus Catalog.", documentation="This workflow task allows for selecting and downloading a file" " artifact from a DBpedia Databus to a Corporate Memory dataset"