Skip to content

Commit

Permalink
Set Cluster Configuration to operate in memory by default
Browse files Browse the repository at this point in the history
  • Loading branch information
Bobbins228 authored and openshift-merge-bot[bot] committed Mar 19, 2024
1 parent 0c014f1 commit 171e91a
Show file tree
Hide file tree
Showing 8 changed files with 432 additions and 91 deletions.
202 changes: 119 additions & 83 deletions src/codeflare_sdk/cluster/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,9 +73,7 @@ def __init__(self, config: ClusterConfiguration):
self.config = config
self.app_wrapper_yaml = self.create_app_wrapper()
self._job_submission_client = None
self.app_wrapper_name = self.app_wrapper_yaml.replace(".yaml", "").split("/")[
-1
]
self.app_wrapper_name = self.config.name

@property
def _client_headers(self):
Expand Down Expand Up @@ -192,6 +190,7 @@ def create_app_wrapper(self):
dispatch_priority = self.config.dispatch_priority
ingress_domain = self.config.ingress_domain
ingress_options = self.config.ingress_options
write_to_file = self.config.write_to_file
return generate_appwrapper(
name=name,
namespace=namespace,
Expand All @@ -217,6 +216,7 @@ def create_app_wrapper(self):
openshift_oauth=self.config.openshift_oauth,
ingress_domain=ingress_domain,
ingress_options=ingress_options,
write_to_file=write_to_file,
)

# creates a new cluster with the provided or default spec
Expand All @@ -235,15 +235,25 @@ def up(self):
config_check()
api_instance = client.CustomObjectsApi(api_config_handler())
if self.config.mcad:
with open(self.app_wrapper_yaml) as f:
aw = yaml.load(f, Loader=yaml.FullLoader)
api_instance.create_namespaced_custom_object(
group="workload.codeflare.dev",
version="v1beta1",
namespace=namespace,
plural="appwrappers",
body=aw,
)
if self.config.write_to_file:
with open(self.app_wrapper_yaml) as f:
aw = yaml.load(f, Loader=yaml.FullLoader)
api_instance.create_namespaced_custom_object(
group="workload.codeflare.dev",
version="v1beta1",
namespace=namespace,
plural="appwrappers",
body=aw,
)
else:
aw = yaml.safe_load(self.app_wrapper_yaml)
api_instance.create_namespaced_custom_object(
group="workload.codeflare.dev",
version="v1beta1",
namespace=namespace,
plural="appwrappers",
body=aw,
)
else:
self._component_resources_up(namespace, api_instance)
except Exception as e: # pragma: no cover
Expand Down Expand Up @@ -492,7 +502,9 @@ def torchx_config(
to_return["requirements"] = requirements
return to_return

def from_k8_cluster_object(rc, mcad=True, ingress_domain=None, ingress_options={}):
def from_k8_cluster_object(
rc, mcad=True, ingress_domain=None, ingress_options={}, write_to_file=False
):
machine_types = (
rc["metadata"]["labels"]["orderedinstance"].split("_")
if "orderedinstance" in rc["metadata"]["labels"]
Expand Down Expand Up @@ -538,6 +550,7 @@ def from_k8_cluster_object(rc, mcad=True, ingress_domain=None, ingress_options={
mcad=mcad,
ingress_domain=ingress_domain,
ingress_options=ingress_options,
write_to_file=write_to_file,
)
return Cluster(cluster_config)

Expand All @@ -551,79 +564,25 @@ def local_client_url(self):
def _component_resources_up(
self, namespace: str, api_instance: client.CustomObjectsApi
):
with open(self.app_wrapper_yaml) as f:
yamls = yaml.load_all(f, Loader=yaml.FullLoader)
for resource in yamls:
if resource["kind"] == "RayCluster":
api_instance.create_namespaced_custom_object(
group="ray.io",
version="v1",
namespace=namespace,
plural="rayclusters",
body=resource,
)
elif resource["kind"] == "Ingress":
api_instance.create_namespaced_custom_object(
group="networking.k8s.io",
version="v1",
namespace=namespace,
plural="ingresses",
body=resource,
)
elif resource["kind"] == "Route":
api_instance.create_namespaced_custom_object(
group="route.openshift.io",
version="v1",
namespace=namespace,
plural="routes",
body=resource,
)
elif resource["kind"] == "Secret":
secret_instance = client.CoreV1Api(api_config_handler())
secret_instance.create_namespaced_secret(
namespace=namespace,
body=resource,
)
if self.config.write_to_file:
with open(self.app_wrapper_yaml) as f:
yamls = yaml.load_all(f, Loader=yaml.FullLoader)
_create_resources(yamls, namespace, api_instance)
else:
yamls = yaml.load_all(self.app_wrapper_yaml, Loader=yaml.FullLoader)
_create_resources(yamls, namespace, api_instance)

def _component_resources_down(
self, namespace: str, api_instance: client.CustomObjectsApi
):
with open(self.app_wrapper_yaml) as f:
yamls = yaml.load_all(f, Loader=yaml.FullLoader)
for resource in yamls:
if resource["kind"] == "RayCluster":
api_instance.delete_namespaced_custom_object(
group="ray.io",
version="v1",
namespace=namespace,
plural="rayclusters",
name=self.app_wrapper_name,
)
elif resource["kind"] == "Ingress":
name = resource["metadata"]["name"]
api_instance.delete_namespaced_custom_object(
group="networking.k8s.io",
version="v1",
namespace=namespace,
plural="ingresses",
name=name,
)
elif resource["kind"] == "Route":
name = resource["metadata"]["name"]
api_instance.delete_namespaced_custom_object(
group="route.openshift.io",
version="v1",
namespace=namespace,
plural="routes",
name=name,
)
elif resource["kind"] == "Secret":
name = resource["metadata"]["name"]
secret_instance = client.CoreV1Api(api_config_handler())
secret_instance.delete_namespaced_secret(
namespace=namespace,
name=name,
)
cluster_name = self.config.name
if self.config.write_to_file:
with open(self.app_wrapper_yaml) as f:
yamls = yaml.load_all(f, Loader=yaml.FullLoader)
_delete_resources(yamls, namespace, api_instance, cluster_name)
else:
yamls = yaml.safe_load_all(self.app_wrapper_yaml)
_delete_resources(yamls, namespace, api_instance, cluster_name)


def list_all_clusters(namespace: str, print_to_console: bool = True):
Expand Down Expand Up @@ -675,7 +634,9 @@ def get_current_namespace(): # pragma: no cover
return None


def get_cluster(cluster_name: str, namespace: str = "default"):
def get_cluster(
cluster_name: str, namespace: str = "default", write_to_file: bool = False
):
try:
config_check()
api_instance = client.CustomObjectsApi(api_config_handler())
Expand Down Expand Up @@ -746,13 +707,88 @@ def get_cluster(cluster_name: str, namespace: str = "default"):
mcad=mcad,
ingress_domain=ingress_domain,
ingress_options=ingress_options,
write_to_file=write_to_file,
)
raise FileNotFoundError(
f"Cluster {cluster_name} is not found in {namespace} namespace"
)


# private methods
def _delete_resources(
yamls, namespace: str, api_instance: client.CustomObjectsApi, cluster_name: str
):
for resource in yamls:
if resource["kind"] == "RayCluster":
name = resource["metadata"]["name"]
api_instance.delete_namespaced_custom_object(
group="ray.io",
version="v1",
namespace=namespace,
plural="rayclusters",
name=name,
)
elif resource["kind"] == "Ingress":
name = resource["metadata"]["name"]
api_instance.delete_namespaced_custom_object(
group="networking.k8s.io",
version="v1",
namespace=namespace,
plural="ingresses",
name=name,
)
elif resource["kind"] == "Route":
name = resource["metadata"]["name"]
api_instance.delete_namespaced_custom_object(
group="route.openshift.io",
version="v1",
namespace=namespace,
plural="routes",
name=name,
)
elif resource["kind"] == "Secret":
name = resource["metadata"]["name"]
secret_instance = client.CoreV1Api(api_config_handler())
secret_instance.delete_namespaced_secret(
namespace=namespace,
name=name,
)


def _create_resources(yamls, namespace: str, api_instance: client.CustomObjectsApi):
for resource in yamls:
if resource["kind"] == "RayCluster":
api_instance.create_namespaced_custom_object(
group="ray.io",
version="v1",
namespace=namespace,
plural="rayclusters",
body=resource,
)
elif resource["kind"] == "Ingress":
api_instance.create_namespaced_custom_object(
group="networking.k8s.io",
version="v1",
namespace=namespace,
plural="ingresses",
body=resource,
)
elif resource["kind"] == "Route":
api_instance.create_namespaced_custom_object(
group="route.openshift.io",
version="v1",
namespace=namespace,
plural="routes",
body=resource,
)
elif resource["kind"] == "Secret":
secret_instance = client.CoreV1Api(api_config_handler())
secret_instance.create_namespaced_secret(
namespace=namespace,
body=resource,
)


def _check_aw_exists(name: str, namespace: str) -> bool:
try:
config_check()
Expand Down
1 change: 1 addition & 0 deletions src/codeflare_sdk/cluster/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,3 +55,4 @@ class ClusterConfiguration:
openshift_oauth: bool = False # NOTE: to use the user must have permission to create a RoleBinding for system:auth-delegator
ingress_options: dict = field(default_factory=dict)
ingress_domain: str = None
write_to_file: bool = False
38 changes: 34 additions & 4 deletions src/codeflare_sdk/utils/generate_yaml.py
Original file line number Diff line number Diff line change
Expand Up @@ -640,6 +640,27 @@ def write_components(user_yaml: dict, output_file_name: str):
print(f"Written to: {output_file_name}")


def load_components(user_yaml: dict, name: str):
component_list = []
components = user_yaml.get("spec", "resources")["resources"].get("GenericItems")
for component in components:
if "generictemplate" in component:
component_list.append(component["generictemplate"])

resources = "---\n" + "---\n".join(
[yaml.dump(component) for component in component_list]
)
user_yaml = resources
print(f"Yaml resources loaded for {name}")
return user_yaml


def load_appwrapper(user_yaml: dict, name: str):
user_yaml = yaml.dump(user_yaml)
print(f"Yaml resources loaded for {name}")
return user_yaml


def generate_appwrapper(
name: str,
namespace: str,
Expand All @@ -665,6 +686,7 @@ def generate_appwrapper(
openshift_oauth: bool,
ingress_domain: str,
ingress_options: dict,
write_to_file: bool,
):
user_yaml = read_template(template)
appwrapper_name, cluster_name = gen_names(name)
Expand Down Expand Up @@ -724,8 +746,16 @@ def generate_appwrapper(

directory_path = os.path.expanduser("~/.codeflare/appwrapper/")
outfile = os.path.join(directory_path, appwrapper_name + ".yaml")
if not mcad:
write_components(user_yaml, outfile)

if write_to_file:
if mcad:
write_user_appwrapper(user_yaml, outfile)
else:
write_components(user_yaml, outfile)
return outfile
else:
write_user_appwrapper(user_yaml, outfile)
return outfile
if mcad:
user_yaml = load_appwrapper(user_yaml, name)
else:
user_yaml = load_components(user_yaml, name)
return user_yaml
1 change: 1 addition & 0 deletions tests/e2e/mnist_raycluster_sdk_oauth_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ def run_mnist_raycluster_sdk_oauth(self):
instascale=False,
image=ray_image,
openshift_oauth=True,
write_to_file=True,
)
)

Expand Down
1 change: 1 addition & 0 deletions tests/e2e/mnist_raycluster_sdk_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ def run_mnist_raycluster_sdk(self):
instascale=False,
image=ray_image,
ingress_options=ingress_options,
write_to_file=True,
)
)

Expand Down
Loading

0 comments on commit 171e91a

Please sign in to comment.