Improve data directory functionality (#1509)
* Create services storage script

* Files list end point refactor (#1529)

* First refactorization for files

* Completed use cases for list end-point

* Renamed storage to file_storage

* Migrated files to file_storage

* Migrated get files to the new service

* Check if function exists first

* Separated end-points to the new provider

* Improved tests

* Restore original fixtures

* Solved the problem with the external if

* Added Literal type in the FileStorage service

* functions methods refactorization

* use self.username instead of the variable

* run path sanitization only one time

* function title is mandatory for file storage

* fix black

* Updated 403 to 404

* Updated swagger for list

* Makes use of enum with integers

* use is instead of equals

* unify path build

* get_function refactor

* Add support new files (#1546)

* new list files for the gateway refactor

* unify files url

* remove enum and linter

* remove provider parameter

* Check provider

* Download end point refactor (#1547)

* refactor of download end-points

* additional test to check non existing file

* renamed files test to v1_files

* added additional checks for the query

* included the checks in the list end-point

* include swagger documentation updated

* remove unneeded try except

* make use of regex instead of a manual parsing

* check not all instead of None

* rename file_extension_is_valid

* Update gateway/api/utils.py

Co-authored-by: Goyo <[email protected]>

---------

Co-authored-by: Goyo <[email protected]>

* add support to new download files endpoints (#1550)

* Gateway/delete end point (#1554)

* add support to new delete files endpoints

* fix get parameter

* fix tests

* delete programDelete fixture

* update file_storage/remove_file description

* Upload end point (#1555)

* Update upload end-point

* removed unused imports

* updated comment in upload end-point

* remove file extension limitation from files (#1559)

* Client - files upload refactor (#1557)

* add support to new upload file endpoints

* replace data with params

* Client - files delete refactor (#1556)

* add support to new delete files endpoints

* fix get parameter

* fix tests

* adapt files delete to the refactor

* replace data with params

* remove not used fixture

* Integration tests fix (#1561)

* add support to new delete files endpoints

* fix get parameter

* fix tests

* adapt files delete to the refactor

* replace data with params

* remove not used fixture

* tests fixed

* fix client integration

* fix context manager in files when download

* added a new test for provider end-points

* fix black

* migrated old tests

* Update gateway/api/services/file_storage.py

Co-authored-by: Goyo <[email protected]>

* remove additional line

---------

Co-authored-by: David <[email protected]>

* Trace decorator (#1553)

* create a trace decorator

* fix decorator

* change documentation

---------

Co-authored-by: David <[email protected]>

* Files refactor to include repositories logic (#1560)

* remove unneeded provider methods

* create access policies file

* refactor get_functions repository method

* refactor groups and repositories

* repository refactor from files

* fix some linter problems

* fixed a bug when the user retrieves a function

* fix lint

* refactor of get_function method

* remove artifact test file

* remove programs access policies

* refactor programs references to functions

* group repository refactor

* rename groups repository into user repository

* simplified get_function methods

* fix query

* adapt get_functions methods

* updated comments

* create path if it doesn't exist

* remove some unused code

* fix files client

* fix typos

* fixed the creation of the directory

* added a test for the provider end-points

* fix some typos from the provider end-points

* fix black on tests

* files documentation updated (#1565)

* Refactor storage in scheduler (#1563)

* refactor storage in scheduler

* fix typo in ray template

* remove unused node_image

* use sub-paths for the cluster template

* storage absolute path refactor

* docstrings updated in the client

---------

Co-authored-by: Goyo <[email protected]>
Tansito and korgan00 authored Jan 13, 2025
1 parent 502c54f commit ccd159a
Showing 27 changed files with 2,517 additions and 931 deletions.
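
The files end-point refactor described in the commit messages above replaces the optional provider string with an explicit Qiskit Function argument and splits every file operation into a user-scoped and a provider-scoped variant. A minimal usage sketch based on the client API shown in the diff below; the host, token, and function title are hypothetical placeholders, not values from this change:

from qiskit_serverless import ServerlessClient

# Hypothetical connection details; replace with a real gateway host and token.
client = ServerlessClient(host="http://localhost:8000", token="<token>")

# Every file operation is now scoped to a specific Qiskit Function.
function = client.function("my-function")  # hypothetical title

# User-scoped file operations for that function.
print(client.files(function))
client.file_upload("results.tar", function)
client.file_download("results.tar", function, download_location="./downloads")
client.file_delete("results.tar", function)

# Provider-scoped variants mirror the same operations.
print(client.provider_files(function))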
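
The "Trace decorator (#1553)" commits add trace_decorator_factory, which the diff below uses as @_trace_job and @_trace_functions to wrap client methods in OpenTelemetry spans instead of repeating the tracer boilerplate in every method. The real implementation lives in qiskit_serverless/core/decorators.py; the following is only a sketch of the pattern, covering both the bare form (@_trace_job) and the parametrized form (@_trace_job("list")) that appear in the diff:

from functools import wraps

from opentelemetry import trace


def trace_decorator_factory(traced_feature: str):
    """Build decorators that wrap a call in a span named '<feature>.<action>'."""

    def decorator_or_factory(arg):
        def build(action_name: str):
            def decorator(func):
                @wraps(func)
                def wrapper(*args, **kwargs):
                    tracer = trace.get_tracer("client.tracer")
                    with tracer.start_as_current_span(f"{traced_feature}.{action_name}"):
                        return func(*args, **kwargs)

                return wrapper

            return decorator

        if callable(arg):
            # Bare form (@_trace_job): the span name falls back to the method name.
            return build(arg.__name__)(arg)
        # Parametrized form (@_trace_job("list")).
        return build(arg)

    return decorator_or_factory


_trace_job = trace_decorator_factory("job")
_trace_functions = trace_decorator_factory("function")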
@@ -120,10 +120,10 @@ data:
{{- end }}
- mountPath: /data
name: user-storage
subPath: {{`{{ user_id }}`}}
subPath: {{`{{ user_data_folder }}`}}
- mountPath: /function_data
name: user-storage
subPath: {{`{{ function_data }}`}}
subPath: {{`{{ provider_data_folder }}`}}
env:
# Environment variables for Ray TLS authentication.
# See https://docs.ray.io/en/latest/ray-core/configure.html#tls-authentication for more details.
@@ -184,7 +184,7 @@ data:
{{- end }}
- mountPath: /data
name: user-storage
subPath: {{`{{ user_id }}`}}
subPath: {{`{{ user_data_folder }}`}}
env:
# Environment variables for Ray TLS authentication.
# See https://docs.ray.io/en/latest/ray-core/configure.html#tls-authentication for more details.
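In the cluster template above, the subPath values switch from the raw user id to dedicated user_data_folder and provider_data_folder sub-directories of the shared user-storage volume, matching the "use sub-paths for the cluster template" commit. A small illustration of what rendering those variables amounts to; both the jinja2 rendering call and the folder values are assumptions, and the real scheduler derives them from the job being scheduled:

from jinja2 import Template

# Tiny stand-in for the volumeMounts section of the real cluster template.
template = Template(
    "- mountPath: /data\n"
    "  subPath: {{ user_data_folder }}\n"
    "- mountPath: /function_data\n"
    "  subPath: {{ provider_data_folder }}\n"
)

# Hypothetical folder names; per-user and per-provider directories on the shared volume.
print(template.render(user_data_folder="user-123", provider_data_folder="provider-x/my-function"))
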
client/qiskit_serverless/core/clients/serverless_client.py (253 changes: 137 additions & 116 deletions)
@@ -49,6 +49,7 @@
MAX_ARTIFACT_FILE_SIZE_MB,
)
from qiskit_serverless.core.client import BaseClient
from qiskit_serverless.core.decorators import trace_decorator_factory
from qiskit_serverless.core.files import GatewayFilesClient
from qiskit_serverless.core.job import (
Job,
@@ -72,6 +73,9 @@
QiskitObjectsDecoder,
)

_trace_job = trace_decorator_factory("job")
_trace_functions = trace_decorator_factory("function")


class ServerlessClient(BaseClient):
"""
@@ -146,47 +150,45 @@ def _verify_token(self, token: str):
####### JOBS #######
####################

@_trace_job("list")
def jobs(self, **kwargs) -> List[Job]:
tracer = trace.get_tracer("client.tracer")
with tracer.start_as_current_span("job.list"):
limit = kwargs.get("limit", 10)
kwargs["limit"] = limit
offset = kwargs.get("offset", 0)
kwargs["offset"] = offset

response_data = safe_json_request_as_dict(
request=lambda: requests.get(
f"{self.host}/api/{self.version}/jobs",
params=kwargs,
headers={"Authorization": f"Bearer {self.token}"},
timeout=REQUESTS_TIMEOUT,
)
limit = kwargs.get("limit", 10)
kwargs["limit"] = limit
offset = kwargs.get("offset", 0)
kwargs["offset"] = offset

response_data = safe_json_request_as_dict(
request=lambda: requests.get(
f"{self.host}/api/{self.version}/jobs",
params=kwargs,
headers={"Authorization": f"Bearer {self.token}"},
timeout=REQUESTS_TIMEOUT,
)
)

return [
Job(job.get("id"), job_service=self, raw_data=job)
for job in response_data.get("results", [])
]

@_trace_job("get")
def job(self, job_id: str) -> Optional[Job]:
tracer = trace.get_tracer("client.tracer")
with tracer.start_as_current_span("job.get"):
url = f"{self.host}/api/{self.version}/jobs/{job_id}/"
response_data = safe_json_request_as_dict(
request=lambda: requests.get(
url,
headers={"Authorization": f"Bearer {self.token}"},
timeout=REQUESTS_TIMEOUT,
)
url = f"{self.host}/api/{self.version}/jobs/{job_id}/"
response_data = safe_json_request_as_dict(
request=lambda: requests.get(
url,
headers={"Authorization": f"Bearer {self.token}"},
timeout=REQUESTS_TIMEOUT,
)
)

job = None
job_id = response_data.get("id")
if job_id is not None:
job = Job(
job_id=job_id,
job_service=self,
)
job = None
job_id = response_data.get("id")
if job_id is not None:
job = Job(
job_id=job_id,
job_service=self,
)

return job

@@ -205,7 +207,7 @@ def run(

tracer = trace.get_tracer("client.tracer")
with tracer.start_as_current_span("job.run") as span:
span.set_attribute("program", title)
span.set_attribute("function", title)
span.set_attribute("provider", provider)
span.set_attribute("arguments", str(arguments))

@@ -234,66 +236,62 @@ def run(

return Job(job_id, job_service=self)

@_trace_job
def status(self, job_id: str):
tracer = trace.get_tracer("client.tracer")
with tracer.start_as_current_span("job.status"):
default_status = "Unknown"
response_data = safe_json_request_as_dict(
request=lambda: requests.get(
f"{self.host}/api/{self.version}/jobs/{job_id}/",
headers={"Authorization": f"Bearer {self.token}"},
timeout=REQUESTS_TIMEOUT,
)
default_status = "Unknown"
response_data = safe_json_request_as_dict(
request=lambda: requests.get(
f"{self.host}/api/{self.version}/jobs/{job_id}/",
headers={"Authorization": f"Bearer {self.token}"},
timeout=REQUESTS_TIMEOUT,
)
)

return response_data.get("status", default_status)

@_trace_job
def stop(self, job_id: str, service: Optional[QiskitRuntimeService] = None):
tracer = trace.get_tracer("client.tracer")
with tracer.start_as_current_span("job.stop"):
if service:
data = {
"service": json.dumps(service, cls=QiskitObjectsEncoder),
}
else:
data = {
"service": None,
}
response_data = safe_json_request_as_dict(
request=lambda: requests.post(
f"{self.host}/api/{self.version}/jobs/{job_id}/stop/",
headers={"Authorization": f"Bearer {self.token}"},
timeout=REQUESTS_TIMEOUT,
json=data,
)
if service:
data = {
"service": json.dumps(service, cls=QiskitObjectsEncoder),
}
else:
data = {
"service": None,
}
response_data = safe_json_request_as_dict(
request=lambda: requests.post(
f"{self.host}/api/{self.version}/jobs/{job_id}/stop/",
headers={"Authorization": f"Bearer {self.token}"},
timeout=REQUESTS_TIMEOUT,
json=data,
)
)

return response_data.get("message")

@_trace_job
def result(self, job_id: str):
tracer = trace.get_tracer("client.tracer")
with tracer.start_as_current_span("job.result"):
response_data = safe_json_request_as_dict(
request=lambda: requests.get(
f"{self.host}/api/{self.version}/jobs/{job_id}/",
headers={"Authorization": f"Bearer {self.token}"},
timeout=REQUESTS_TIMEOUT,
)
response_data = safe_json_request_as_dict(
request=lambda: requests.get(
f"{self.host}/api/{self.version}/jobs/{job_id}/",
headers={"Authorization": f"Bearer {self.token}"},
timeout=REQUESTS_TIMEOUT,
)
)
return json.loads(
response_data.get("result", "{}") or "{}", cls=QiskitObjectsDecoder
)

@_trace_job
def logs(self, job_id: str):
tracer = trace.get_tracer("client.tracer")
with tracer.start_as_current_span("job.logs"):
response_data = safe_json_request_as_dict(
request=lambda: requests.get(
f"{self.host}/api/{self.version}/jobs/{job_id}/logs/",
headers={"Authorization": f"Bearer {self.token}"},
timeout=REQUESTS_TIMEOUT,
)
response_data = safe_json_request_as_dict(
request=lambda: requests.get(
f"{self.host}/api/{self.version}/jobs/{job_id}/logs/",
headers={"Authorization": f"Bearer {self.token}"},
timeout=REQUESTS_TIMEOUT,
)
)
return response_data.get("logs")

def filtered_logs(self, job_id: str, **kwargs):
@@ -323,8 +321,8 @@ def filtered_logs(self, job_id: str, **kwargs):

def upload(self, program: QiskitFunction) -> Optional[RunnableQiskitFunction]:
tracer = trace.get_tracer("client.tracer")
with tracer.start_as_current_span("job.run") as span:
span.set_attribute("program", program.title)
with tracer.start_as_current_span("function.upload") as span:
span.set_attribute("function", program.title)
url = f"{self.host}/api/{self.version}/programs/upload/"

if program.image is not None:
@@ -344,18 +342,17 @@ def upload(self, program: QiskitFunction) -> Optional[RunnableQiskitFunction]:

return function_uploaded

@_trace_functions("list")
def functions(self, **kwargs) -> List[RunnableQiskitFunction]:
"""Returns list of available programs."""
tracer = trace.get_tracer("client.tracer")
with tracer.start_as_current_span("program.list"):
response_data = safe_json_request_as_list(
request=lambda: requests.get(
f"{self.host}/api/{self.version}/programs",
headers={"Authorization": f"Bearer {self.token}"},
params=kwargs,
timeout=REQUESTS_TIMEOUT,
)
"""Returns list of available functions."""
response_data = safe_json_request_as_list(
request=lambda: requests.get(
f"{self.host}/api/{self.version}/programs",
headers={"Authorization": f"Bearer {self.token}"},
params=kwargs,
timeout=REQUESTS_TIMEOUT,
)
)

return [
RunnableQiskitFunction(
@@ -368,6 +365,7 @@ def functions(self, **kwargs) -> List[RunnableQiskitFunction]:
for program in response_data
]

@_trace_functions("get_by_title")
def function(
self, title: str, provider: Optional[str] = None
) -> Optional[RunnableQiskitFunction]:
@@ -376,50 +374,73 @@ def function(
request_provider=provider, title=title
)

tracer = trace.get_tracer("client.tracer")
with tracer.start_as_current_span("program.get_by_title"):
response_data = safe_json_request_as_dict(
request=lambda: requests.get(
f"{self.host}/api/{self.version}/programs/get_by_title/{title}",
headers={"Authorization": f"Bearer {self.token}"},
params={"provider": provider},
timeout=REQUESTS_TIMEOUT,
)
)
return RunnableQiskitFunction(
client=self,
title=response_data.get("title"),
provider=response_data.get("provider", None),
raw_data=response_data,
response_data = safe_json_request_as_dict(
request=lambda: requests.get(
f"{self.host}/api/{self.version}/programs/get_by_title/{title}",
headers={"Authorization": f"Bearer {self.token}"},
params={"provider": provider},
timeout=REQUESTS_TIMEOUT,
)
)

return RunnableQiskitFunction(
client=self,
title=response_data.get("title"),
provider=response_data.get("provider", None),
raw_data=response_data,
)

#####################
####### FILES #######
#####################

def files(self, provider: Optional[str] = None) -> List[str]:
"""Returns list of available files produced by programs to download."""
return self._files_client.list(provider)
def files(self, function: QiskitFunction) -> List[str]:
"""Returns the list of files available for the user in the Qiskit Function folder."""
return self._files_client.list(function)

def provider_files(self, function: QiskitFunction) -> List[str]:
"""Returns the list of files available for the provider in the Qiskit Function folder."""
return self._files_client.provider_list(function)

def file_download(
self,
file: str,
function: QiskitFunction,
target_name: Optional[str] = None,
download_location: str = "./",
provider: Optional[str] = None,
):
"""Download file."""
"""Download a file available to the user for the specific Qiskit Function."""
return self._files_client.download(
file, download_location, target_name, provider
file, download_location, function, target_name
)

def provider_file_download(
self,
file: str,
function: QiskitFunction,
target_name: Optional[str] = None,
download_location: str = "./",
):
"""Download a file available to the provider for the specific Qiskit Function."""
return self._files_client.provider_download(
file, download_location, function, target_name
)

def file_delete(self, file: str, provider: Optional[str] = None):
"""Deletes file uploaded or produced by the programs,"""
return self._files_client.delete(file, provider)
def file_delete(self, file: str, function: QiskitFunction):
"""Deletes a file available to the user for the specific Qiskit Function."""
return self._files_client.delete(file, function)

def provider_file_delete(self, file: str, function: QiskitFunction):
"""Deletes a file available to the provider for the specific Qiskit Function."""
return self._files_client.provider_delete(file, function)

def file_upload(self, file: str, function: QiskitFunction):
"""Uploads a file in the specific user's Qiskit Function folder."""
return self._files_client.upload(file, function)

def file_upload(self, file: str, provider: Optional[str] = None):
"""Upload file."""
return self._files_client.upload(file, provider)
def provider_file_upload(self, file: str, function: QiskitFunction):
"""Uploads a file in the specific provider's Qiskit Function folder."""
return self._files_client.provider_upload(file, function)


class IBMServerlessClient(ServerlessClient):
@@ -520,8 +541,8 @@ def _upload_with_docker_image(
)
program_title = response_data.get("title", "na")
program_provider = response_data.get("provider", "na")
span.set_attribute("program.title", program_title)
span.set_attribute("program.provider", program_provider)
span.set_attribute("function.title", program_title)
span.set_attribute("function.provider", program_provider)
response_data["client"] = client
return RunnableQiskitFunction.from_json(response_data)

@@ -588,8 +609,8 @@ def _upload_with_artifact(
timeout=REQUESTS_TIMEOUT,
)
)
span.set_attribute("program.title", response_data.get("title", "na"))
span.set_attribute("program.provider", response_data.get("provider", "na"))
span.set_attribute("function.title", response_data.get("title", "na"))
span.set_attribute("function.provider", response_data.get("provider", "na"))
response_data["client"] = client
response_function = RunnableQiskitFunction.from_json(response_data)
except Exception as error: # pylint: disable=broad-exception-caught