Skip to content

Commit

Permalink
LocalProvider
Browse files Browse the repository at this point in the history
Signed-off-by: Akihiko Kuroda <[email protected]>
  • Loading branch information
akihikokuroda committed Oct 28, 2023
1 parent c205da6 commit 8119f58
Show file tree
Hide file tree
Showing 4 changed files with 107 additions and 1 deletion.
1 change: 1 addition & 0 deletions client/quantum_serverless/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
ServerlessProvider,
IBMServerlessProvider,
RayProvider,
LocalProvider,
save_result,
)
from .quantum_serverless import (
Expand Down
11 changes: 10 additions & 1 deletion client/quantum_serverless/core/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
IBMServerlessProvider
BaseProvider
RayProvider
LocalProvider
ComputeResource
Job
GatewayJobClient
Expand Down Expand Up @@ -58,9 +59,17 @@
Provider,
ServerlessProvider,
IBMServerlessProvider,
LocalProvider,
RayProvider,
)
from .job import BaseJobClient, RayJobClient, GatewayJobClient, Job, save_result
from .job import (
BaseJobClient,
RayJobClient,
GatewayJobClient,
LocalJobClient,
Job,
save_result,
)
from .program import (
Program,
ProgramStorage,
Expand Down
71 changes: 71 additions & 0 deletions client/quantum_serverless/core/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,15 @@
import os
import tarfile
import time
import sys
from pathlib import Path
from typing import Dict, Any, Optional, List
from uuid import uuid4

import subprocess
from subprocess import Popen
import re

import ray.runtime_env
import requests
from ray.dashboard.modules.job.sdk import JobSubmissionClient
Expand Down Expand Up @@ -153,6 +158,71 @@ def run(self, program: Program, arguments: Optional[Dict[str, Any]] = None):
return Job(job_id=job_id, job_client=self)


class LocalJobClient(BaseJobClient):
"""LocalJobClient."""

def __init__(self):
"""Local job client.
Args:
"""
self._jobs = {}

def status(self, job_id: str):
return self._jobs[job_id]["status"]

def stop(self, job_id: str):
"""Stops job/program."""
return f"job:{job_id} has already stopped"

def logs(self, job_id: str):
return self._jobs[job_id]["logs"]

def result(self, job_id: str):
return self._jobs[job_id]["result"]

def get(self, job_id) -> Optional["Job"]:
return self._jobs[job_id]["job"]

def list(self, **kwargs) -> List["Job"]:
return [job["job"] for job in list(self._jobs.values())]

def run(self, program: Program, arguments: Optional[Dict[str, Any]] = None):
if program.dependencies:
for dependency in program.dependencies:
subprocess.check_call(
[sys.executable, "-m", "pip", "install", dependency]
)
arguments = arguments or {}
env_vars = {
**(program.env_vars or {}),
**{OT_PROGRAM_NAME: program.title},
**{"PATH": os.environ["PATH"]},
**{ENV_JOB_ARGUMENTS: json.dumps(arguments, cls=QiskitObjectsEncoder)},
}

with Popen(
["python", program.working_dir + program.entrypoint],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
universal_newlines=True,
env=env_vars,
) as pipe:
status = "SUCCEEDED"
if pipe.wait():
status = "FAILED"
output, _ = pipe.communicate()
results = re.search("\nSaved Result:(.+?):End Saved Result\n", output)
result = ""
if results:
result = results.group(1)

job = Job(job_id=uuid4(), job_client=self)
entry = {"status": status, "logs": output, "result": result, "job": job}
self._jobs[job.job_id] = entry
return job


class GatewayJobClient(BaseJobClient):
"""GatewayJobClient."""

Expand Down Expand Up @@ -403,6 +473,7 @@ def save_result(result: Dict[str, Any]):
"authorization token in the environment."
)
logging.info("Result: %s", result)
print(f"\nSaved Result:{result}:End Saved Result\n")
return False

if not is_jsonable(result, cls=QiskitObjectsEncoder):
Expand Down
25 changes: 25 additions & 0 deletions client/quantum_serverless/core/provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
Job,
RayJobClient,
GatewayJobClient,
LocalJobClient,
BaseJobClient,
)
from quantum_serverless.core.program import Program
Expand Down Expand Up @@ -501,3 +502,27 @@ def get_job_by_id(self, job_id: str) -> Optional[Job]:

def get_jobs(self, **kwargs) -> List[Job]:
return self.client.list()


class LocalProvider(BaseProvider):
"""RayProvider."""

def __init__(self):
"""Local provider
Args:
Example:
>>> local = LocalProvider())
"""
super().__init__("local-provier")
self.client = LocalJobClient()

def run(self, program: Program, arguments: Optional[Dict[str, Any]] = None) -> Job:
return self.client.run(program, arguments)

def get_job_by_id(self, job_id: str) -> Optional[Job]:
return self.client.get(job_id)

def get_jobs(self, **kwargs) -> List[Job]:
return self.client.list()

0 comments on commit 8119f58

Please sign in to comment.