diff --git a/config/systems.json b/config/systems.json index bb21dcd9..12b4bb59 100644 --- a/config/systems.json +++ b/config/systems.json @@ -234,5 +234,47 @@ } } } + }, + "knative": { + "languages": { + "python": { + "base_images": { + "3.9": "python:3.9-slim", + "3.10": "python:3.10-slim" + }, + "images": [ + "build", + "run" + ], + "username": "docker_user", + "deployment": { + "files": [ + "handler.py", + "storage.py" + ], + "packages": { + "parliament-functions": "0.1.0" + } + } + }, + "nodejs": { + "base_images": { + "20": "node:20", + "18": "node:18" + }, + "images": [ + "build", + "run" + ], + "username": "docker_user", + "deployment": { + "files": [ + "handler.js", + "storage.js" + ], + "packages": [] + } + } + } } } diff --git a/sebs.py b/sebs.py index ff7f7769..b95e51d6 100755 --- a/sebs.py +++ b/sebs.py @@ -88,7 +88,7 @@ def common_params(func): @click.option( "--deployment", default=None, - type=click.Choice(["azure", "aws", "gcp", "local", "openwhisk"]), + type=click.Choice(["azure", "aws", "gcp", "local", "openwhisk", "knative"]), help="Cloud deployment to use.", ) @click.option( diff --git a/sebs/faas/config.py b/sebs/faas/config.py index 19c7d3ab..889a0102 100644 --- a/sebs/faas/config.py +++ b/sebs/faas/config.py @@ -204,6 +204,10 @@ def deserialize(config: dict, cache: Cache, handlers: LoggingHandlers) -> Config from sebs.openwhisk.config import OpenWhiskConfig implementations["openwhisk"] = OpenWhiskConfig.deserialize + if has_platform("knative"): + from sebs.knative.config import KnativeConfig + + implementations["knative"] = KnativeConfig.deserialize func = implementations.get(name) assert func, "Unknown config type!" return func(config[name] if name in config else config, cache, handlers) diff --git a/sebs/knative/config.py b/sebs/knative/config.py new file mode 100644 index 00000000..c9ddbb2c --- /dev/null +++ b/sebs/knative/config.py @@ -0,0 +1,196 @@ +from sebs.cache import Cache +from sebs.faas.config import Credentials, Resources, Config +from sebs.utils import LoggingHandlers +from sebs.storage.config import MinioConfig + +from typing import cast, Optional + + +class KnativeResources(Resources): + def __init__( + self, + registry: Optional[str] = None, + username: Optional[str] = None, + password: Optional[str] = None, + registry_updated: bool = False, + ): + super().__init__(name="knative") + self._docker_registry = registry if registry != "" else None + self._docker_username = username if username != "" else None + self._docker_password = password if password != "" else None + self._registry_updated = registry_updated + self._storage: Optional[MinioConfig] = None + self._storage_updated = False + + @staticmethod + def typename() -> str: + return "Knative.Resources" + + @property + def docker_registry(self) -> Optional[str]: + return self._docker_registry + + @property + def docker_username(self) -> Optional[str]: + return self._docker_username + + @property + def docker_password(self) -> Optional[str]: + return self._docker_password + + @property + def storage_config(self) -> Optional[MinioConfig]: + return self._storage + + @property + def storage_updated(self) -> bool: + return self._storage_updated + + @property + def registry_updated(self) -> bool: + return self._registry_updated + + @staticmethod + def initialize(res: Resources, dct: dict): + ret = cast(KnativeResources, res) + ret._docker_registry = dct["registry"] + ret._docker_username = dct["username"] + ret._docker_password = dct["password"] + + @staticmethod + def deserialize(config: dict, cache: Cache, handlers: LoggingHandlers) -> Resources: + + cached_config = cache.get_config("knative") + ret = KnativeResources() + if cached_config: + super(KnativeResources, KnativeResources).initialize( + ret, cached_config["resources"] + ) + + # Check for new config - overrides but check if it's different + if "docker_registry" in config: + + KnativeResources.initialize(ret, config["docker_registry"]) + ret.logging.info("Using user-provided Docker registry for Knative.") + ret.logging_handlers = handlers + + # check if there has been an update + if not ( + cached_config + and "resources" in cached_config + and "docker" in cached_config["resources"] + and cached_config["resources"]["docker"] == config["docker_registry"] + ): + ret._registry_updated = True + + # Load cached values + elif ( + cached_config + and "resources" in cached_config + and "docker" in cached_config["resources"] + ): + KnativeResources.initialize(ret, cached_config["resources"]["docker"]) + ret.logging_handlers = handlers + ret.logging.info("Using cached Docker registry for Knative") + else: + ret = KnativeResources() + ret.logging.info("Using default Docker registry for Knative.") + ret.logging_handlers = handlers + ret._registry_updated = True + + # Check for new config + if "storage" in config: + ret._storage = MinioConfig.deserialize(config["storage"]) + ret.logging.info("Using user-provided configuration of storage for Knative.") + + # check if there has been an update + if not ( + cached_config + and "resources" in cached_config + and "storage" in cached_config["resources"] + and cached_config["resources"]["storage"] == config["storage"] + ): + ret.logging.info( + "User-provided configuration is different from cached storage, " + "we will update existing Knative actions." + ) + ret._storage_updated = True + + # Load cached values + elif ( + cached_config + and "resources" in cached_config + and "storage" in cached_config["resources"] + ): + ret._storage = MinioConfig.deserialize(cached_config["resources"]["storage"]) + ret.logging.info("Using cached configuration of storage for Knative.") + + return ret + + def update_cache(self, cache: Cache): + super().update_cache(cache) + cache.update_config( + val=self.docker_registry, keys=["knative", "resources", "docker", "registry"] + ) + cache.update_config( + val=self.docker_username, keys=["knative", "resources", "docker", "username"] + ) + cache.update_config( + val=self.docker_password, keys=["knative", "resources", "docker", "password"] + ) + if self._storage: + self._storage.update_cache(["knative", "resources", "storage"], cache) + + def serialize(self) -> dict: + out: dict = { + **super().serialize(), + "docker_registry": self.docker_registry, + "docker_username": self.docker_username, + "docker_password": self.docker_password, + } + if self._storage: + out = {**out, "storage": self._storage.serialize()} + return out + + +class KnativeConfig(Config): + name: str + cache: Cache + + def __init__(self, config: dict, cache: Cache): + super().__init__(name="knative") + self._resources = KnativeResources() + self.knative_exec = config["knativeExec"] + self.cache = cache + + @property + def resources(self) -> KnativeResources: + return self._resources + + @staticmethod + def initialize(cfg: Config, dct: dict): + cfg._region = dct["region"] + + def serialize(self) -> dict: + return { + "name": self._name, + "region": self._region, + "knativeExec": self.knative_exec, + "resources": self._resources.serialize(), + } + + @staticmethod + def deserialize(config: dict, cache: Cache, handlers: LoggingHandlers) -> Config: + cached_config = cache.get_config("knative") + resources = cast( + KnativeResources, KnativeResources.deserialize(config, cache, handlers) + ) + + res = KnativeConfig(config, cached_config) + res.logging_handlers = handlers + res._resources = resources + return res + + def update_cache(self, cache: Cache): + cache.update_config(val=self.knative_exec, keys=["knative", "knativeExec"]) + self.resources.update_cache(cache) diff --git a/sebs/knative/function.py b/sebs/knative/function.py new file mode 100644 index 00000000..404f4da4 --- /dev/null +++ b/sebs/knative/function.py @@ -0,0 +1,67 @@ +from __future__ import annotations + +from typing import cast, Optional +from dataclasses import dataclass + +from sebs.benchmark import Benchmark +from sebs.faas.function import Function, FunctionConfig, Runtime +from sebs.storage.config import MinioConfig + +@dataclass +class KnativeFunctionConfig(FunctionConfig): + docker_image: str = "" + namespace: str = "default" + storage: Optional[MinioConfig] = None + url: str = "" + + @staticmethod + def deserialize(data: dict) -> KnativeFunctionConfig: + keys = list(KnativeFunctionConfig.__dataclass_fields__.keys()) + data = {k: v for k, v in data.items() if k in keys} + data["runtime"] = Runtime.deserialize(data["runtime"]) + data["storage"] = MinioConfig.deserialize(data["storage"]) + return KnativeFunctionConfig(**data) + + def serialize(self) -> dict: + return self.__dict__ + + @staticmethod + def from_benchmark(benchmark: Benchmark) -> KnativeFunctionConfig: + return super(KnativeFunctionConfig, KnativeFunctionConfig)._from_benchmark( + benchmark, KnativeFunctionConfig + ) + +class KnativeFunction(Function): + def __init__( + self, name: str, benchmark: str, code_package_hash: str, cfg: KnativeFunctionConfig + ): + super().__init__(benchmark, name, code_package_hash, cfg) + + @property + def config(self) -> KnativeFunctionConfig: + return cast(KnativeFunctionConfig, self._cfg) + + @staticmethod + def typename() -> str: + return "Knative.Function" + + def serialize(self) -> dict: + return {**super().serialize(), "config": self._cfg.serialize()} + + @staticmethod + def deserialize(cached_config: dict) -> KnativeFunction: + from sebs.faas.function import Trigger + from sebs.knative.triggers import KnativeLibraryTrigger, KnativeHTTPTrigger + + cfg = KnativeFunctionConfig.deserialize(cached_config["config"]) + ret = KnativeFunction( + cached_config["name"], cached_config["benchmark"], cached_config["hash"], cfg + ) + for trigger in cached_config["triggers"]: + trigger_type = cast( + Trigger, + {"Library": KnativeLibraryTrigger, "HTTP": KnativeHTTPTrigger}.get(trigger["type"]), + ) + assert trigger_type, "Unknown trigger type {}".format(trigger["type"]) + ret.add_trigger(trigger_type.deserialize(trigger)) + return ret diff --git a/sebs/knative/knative.py b/sebs/knative/knative.py new file mode 100644 index 00000000..41d7f055 --- /dev/null +++ b/sebs/knative/knative.py @@ -0,0 +1,240 @@ +from os import devnull +import subprocess +from flask import config +from sebs.faas.system import System +from sebs.faas.function import Function, Trigger, ExecutionResult +from sebs.faas.storage import PersistentStorage +from sebs.benchmark import Benchmark +from sebs.config import SeBSConfig +from sebs.cache import Cache +from sebs.utils import LoggingHandlers +from sebs.knative.storage import KnativeMinio +from sebs.knative.triggers import KnativeLibraryTrigger, KnativeHTTPTrigger +from sebs.faas.config import Resources +from typing import Dict, Tuple, Type, List, Optional +import docker +from .function import KnativeFunction, KnativeFunctionConfig +import uuid +from typing import cast + +from .config import KnativeConfig + +class KnativeSystem(System): + _config: KnativeConfig + + def __init__(self, system_config: SeBSConfig, config: KnativeConfig, cache_client: Cache, docker_client: docker.client, logger_handlers: LoggingHandlers): + super().__init__(system_config, cache_client, docker_client) + # Initialize any additional Knative-specific attributes here + self._config = config + self._logger_handlers = logger_handlers + + if self.config.resources.docker_username: + if self.config.resources.docker_registry: + docker_client.login( + username=self.config.resources.docker_username, + password=self.config.resources.docker_password, + registry=self.config.resources.docker_registry, + ) + else: + docker_client.login( + username=self.config.resources.docker_username, + password=self.config.resources.docker_password, + ) + + + @property + def config(self) -> KnativeConfig: + # Return the configuration specific to Knative + return self._config + + def get_knative_func_cmd(self) -> List[str]: + cmd = [self.config.knative_exec] + return cmd + + @staticmethod + def function_type() -> Type[Function]: + # Return the specific function type for Knative + return Function + + def get_storage(self, replace_existing: bool = False) -> PersistentStorage: + # Implementation of persistent storage retrieval for Knative + # This might involve creating a persistent volume or bucket in Knative's ecosystem + pass + + def package_code( + self, + directory: str, + language_name: str, + language_version: str, + benchmark: str, + is_cached: bool, + ) -> Tuple[str, int]: + """ + Package code for Knative platform by building a Docker image. + + Args: + - directory: Directory where the function code resides. + - language_name: Name of the programming language (e.g., Python). + - language_version: Version of the programming language. + - benchmark: Identifier for the benchmark or function. + - is_cached: Flag indicating if the code is cached. + + Returns: + - Tuple containing the Docker image name (tag) and its size. + """ + + # Generate a unique Docker image name/tag for this function + docker_image_name = f"{benchmark}:{language_version}" + + # Build Docker image from the specified directory + image, _ = self._docker_client.images.build(path=directory, tag=docker_image_name) + + # Retrieve size of the Docker image + image_size = image.attrs['Size'] + + # Return the Docker image name (tag) and its size + return docker_image_name, image_size + + + def create_function(self, code_package: Benchmark, func_name: str) -> "KnativeFunction": + self.logging.info("Creating Knative function.") + try: + # Check if the function already exists + knative_func_command = subprocess.run( + [*self.get_knative_func_cmd(), "list"], + stderr=subprocess.DEVNULL, + stdout=subprocess.PIPE, + ) + function_found = False + for line in knative_func_command.stdout.decode().split("\n"): + if line and func_name in line.split()[0]: + function_found = True + break + + if function_found: + self.logging.info(f"Function {func_name} already exists.") + # Logic for updating or handling existing function + # For now, just pass + pass + else: + try: + self.logging.info(f"Creating new Knative function {func_name}") + language = code_package.language_name + + # Create the function + subprocess.run( + ["func", "create", "-l", language, func_name], + stderr=subprocess.PIPE, + stdout=subprocess.PIPE, + check=True, + ) + + # Deploy the function + subprocess.run( + ["func", "deploy", "--path", func_name], + stderr=subprocess.PIPE, + stdout=subprocess.PIPE, + check=True, + ) + + # Retrieve the function URL + describe_command = [*self.get_knative_func_cmd(), "describe", func_name, "-o", "url"] + result = subprocess.run( + describe_command, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + check=True, + ) + function_url = result.stdout.decode().strip() + + # Create the KnativeFunctionConfig + function_cfg = KnativeFunctionConfig.from_benchmark(code_package) + function_cfg.storage = cast(KnativeMinio, self.get_storage()).config + function_cfg.url = function_url + + # Create the function object + res = KnativeFunction( + func_name, code_package.benchmark, code_package.hash, function_cfg + ) + + # Add HTTP trigger with the function URL + trigger = KnativeHTTPTrigger(func_name, function_url) + trigger.logging_handlers = self.logging_handlers + res.add_trigger(trigger) + + return res + + except subprocess.CalledProcessError as e: + self.logging.error(f"Error creating Knative function {func_name}.") + self.logging.error(f"Output: {e.stderr.decode('utf-8')}") + raise RuntimeError(e) + + except FileNotFoundError: + self.logging.error("Could not retrieve Knative functions - is path to func correct?") + raise RuntimeError("Failed to access func binary") + + + def update_function(self, function: Function, code_package: Benchmark): + self.logging.info(f"Updating an existing Knative function {function.name}.") + function = cast(KnativeFunction, function) + docker_image = self.system_config.benchmark_image_name( + self.name(), + code_package.benchmark, + code_package.language_name, + code_package.language_version, + ) + + try: + subprocess.run( + [ + "func", "deploy", + "--path", code_package.code_location, + "--image", docker_image, + "--name", function.name, + ], + stderr=subprocess.PIPE, + stdout=subprocess.PIPE, + check=True, + ) + function.config.docker_image = docker_image + + except FileNotFoundError as e: + self.logging.error("Could not update Knative function - is the 'func' CLI installed and configured correctly?") + raise RuntimeError(e) + except subprocess.CalledProcessError as e: + self.logging.error(f"Unknown error when running function update: {e}!") + self.logging.error("Ensure the SeBS cache is cleared if there are issues with Knative!") + self.logging.error(f"Output: {e.stderr.decode('utf-8')}") + raise RuntimeError(e) + + + def create_trigger(self, function: Function, trigger_type: Trigger.TriggerType) -> Trigger: + if trigger_type == Trigger.TriggerType.LIBRARY: + return function.triggers(Trigger.TriggerType.LIBRARY)[0] + elif trigger_type == Trigger.TriggerType.HTTP: + try: + response = subprocess.run( + ["func", "describe", function.name, "--output", "url"], + stdout=subprocess.PIPE, + stderr=subprocess.DEVNULL, + check=True, + ) + except FileNotFoundError as e: + self.logging.error( + "Could not retrieve Knative function configuration - is the 'func' CLI installed and configured correctly?" + ) + raise RuntimeError(e) + stdout = response.stdout.decode("utf-8") + url = stdout.strip() + trigger = KnativeHTTPTrigger(function.name, url) + trigger.logging_handlers = self.logging_handlers + function.add_trigger(trigger) + self.cache_client.update_function(function) + return trigger + else: + raise RuntimeError("Not supported!") + + + @staticmethod + def name() -> str: + return "Knative" diff --git a/sebs/knative/storage.py b/sebs/knative/storage.py new file mode 100644 index 00000000..0ef1c827 --- /dev/null +++ b/sebs/knative/storage.py @@ -0,0 +1,25 @@ +import docker +from sebs.faas.config import Resources +from sebs.storage import minio +from sebs.storage.config import MinioConfig +from sebs.cache import Cache + +class KnativeMinio(minio.Minio): + @staticmethod + def deployment_name() -> str: + return "knative" + + def __init__( + self, + docker_client: docker.client, + cache_client: Cache, + res: Resources, + replace_existing: bool, + ): + super().__init__(docker_client, cache_client, res, replace_existing) + + @staticmethod + def deserialize( + cached_config: MinioConfig, cache_client: Cache, resources: Resources + ) -> "KnativeMinio": + return super(KnativeMinio, KnativeMinio)._deserialize(cached_config, cache_client, resources, KnativeMinio) diff --git a/sebs/knative/triggers.py b/sebs/knative/triggers.py new file mode 100644 index 00000000..b1f01f05 --- /dev/null +++ b/sebs/knative/triggers.py @@ -0,0 +1,106 @@ +import concurrent.futures +import datetime +import json +import subprocess +from typing import Dict, List, Optional + +from sebs.faas.function import ExecutionResult, Trigger + +class KnativeLibraryTrigger(Trigger): + def __init__(self, fname: str, func_cmd: Optional[List[str]] = None): + super().__init__() + self.fname = fname + if func_cmd: + self._func_cmd = [*func_cmd, "invoke", "--target", "remote"] + + @staticmethod + def trigger_type() -> "Trigger.TriggerType": + return Trigger.TriggerType.LIBRARY + + @property + def func_cmd(self) -> List[str]: + assert self._func_cmd + return self._func_cmd + + @func_cmd.setter + def func_cmd(self, func_cmd: List[str]): + self._func_cmd = [*func_cmd, "invoke", "--target", "remote"] + + @staticmethod + def get_command(payload: dict) -> List[str]: + params = ["--data", json.dumps(payload)] + return params + + def sync_invoke(self, payload: dict) -> ExecutionResult: + command = self.func_cmd + self.get_command(payload) + error = None + try: + begin = datetime.datetime.now() + response = subprocess.run( + command, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + check=True, + ) + end = datetime.datetime.now() + parsed_response = response.stdout.decode("utf-8") + except (subprocess.CalledProcessError, FileNotFoundError) as e: + end = datetime.datetime.now() + error = e + + knative_result = ExecutionResult.from_times(begin, end) + if error is not None: + self.logging.error("Invocation of {} failed!".format(self.fname)) + knative_result.stats.failure = True + return knative_result + + return_content = json.loads(parsed_response) + knative_result.parse_benchmark_output(return_content) + return knative_result + + def async_invoke(self, payload: dict) -> concurrent.futures.Future: + pool = concurrent.futures.ThreadPoolExecutor() + fut = pool.submit(self.sync_invoke, payload) + return fut + + def serialize(self) -> dict: + return {"type": "Library", "name": self.fname} + + @staticmethod + def deserialize(obj: dict) -> Trigger: + return KnativeLibraryTrigger(obj["name"]) + + @staticmethod + def typename() -> str: + return "Knative.LibraryTrigger" + + +class KnativeHTTPTrigger(Trigger): + def __init__(self, fname: str, url: str): + super().__init__() + self.fname = fname + self.url = url + + @staticmethod + def typename() -> str: + return "Knative.HTTPTrigger" + + @staticmethod + def trigger_type() -> Trigger.TriggerType: + return Trigger.TriggerType.HTTP + + def sync_invoke(self, payload: dict) -> ExecutionResult: + self.logging.debug(f"Invoke function {self.url}") + return self._http_invoke(payload, self.url, False) + + def async_invoke(self, payload: dict) -> concurrent.futures.Future: + pool = concurrent.futures.ThreadPoolExecutor() + fut = pool.submit(self.sync_invoke, payload) + return fut + + def serialize(self) -> dict: + return {"type": "HTTP", "fname": self.fname, "url": self.url} + + @staticmethod + def deserialize(obj: dict) -> Trigger: + return KnativeHTTPTrigger(obj["fname"], obj["url"]) diff --git a/sebs/types.py b/sebs/types.py index 2f26117e..aafbac84 100644 --- a/sebs/types.py +++ b/sebs/types.py @@ -7,6 +7,7 @@ class Platforms(str, Enum): GCP = "gcp" LOCAL = "local" OPENWHISK = "openwhisk" + KNATIVE = "knative" class Storage(str, Enum):