Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Knative deployment support #206

Open
wants to merge 11 commits into
base: master
Choose a base branch
from
54 changes: 54 additions & 0 deletions benchmarks/wrappers/knative/nodejs/index.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
const { CloudEvent, HTTP } = require('cloudevents');
const handler = require('./function').handler;

async function handle(context, event) {
const startTime = new Date();

try {
// Ensure event data is parsed correctly
const eventData = event ? event : context.body;
context.log.info(`Received event: ${JSON.stringify(eventData)}`);

// Call the handler function with the event data
const result = await handler(eventData);
const endTime = new Date();

context.log.info(`Function result: ${JSON.stringify(result)}`);
const resultTime = (endTime - startTime) / 1000; // Time in seconds

// Create a response
const response = {
begin: startTime.toISOString(),
end: endTime.toISOString(),
results_time: resultTime,
result: result
};

// Return the response
return {
data: response,
headers: { 'Content-Type': 'application/json' },
statusCode: 200
};
} catch (error) {
const endTime = new Date();
const resultTime = (endTime - startTime) / 1000; // Time in seconds

context.log.error(`Error - invocation failed! Reason: ${error.message}`);
const response = {
begin: startTime.toISOString(),
end: endTime.toISOString(),
results_time: resultTime,
result: `Error - invocation failed! Reason: ${error.message}`
};

// Return the error response
return {
data: response,
headers: { 'Content-Type': 'application/json' },
statusCode: 500
};
}
}

module.exports = handle;
63 changes: 63 additions & 0 deletions benchmarks/wrappers/knative/nodejs/storage.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@

const minio = require('minio'),
path = require('path'),
uuid = require('uuid'),
util = require('util'),
stream = require('stream'),
fs = require('fs');

class minio_storage {

constructor() {
let address = process.env.MINIO_STORAGE_CONNECTION_URL;
let access_key = process.env.MINIO_STORAGE_ACCESS_KEY;
let secret_key = process.env.MINIO_STORAGE_SECRET_KEY;

this.client = new minio.Client(
{
endPoint: address.split(':')[0],
port: parseInt(address.split(':')[1], 10),
accessKey: access_key,
secretKey: secret_key,
useSSL: false
}
);
}

unique_name(file) {
let name = path.parse(file);
let uuid_name = uuid.v4().split('-')[0];
return path.join(name.dir, util.format('%s.%s%s', name.name, uuid_name, name.ext));
}

upload(bucket, file, filepath) {
let uniqueName = this.unique_name(file);
return [uniqueName, this.client.fPutObject(bucket, uniqueName, filepath)];
};

download(bucket, file, filepath) {
return this.client.fGetObject(bucket, file, filepath);
};

uploadStream(bucket, file) {
var write_stream = new stream.PassThrough();
let uniqueName = this.unique_name(file);
let promise = this.client.putObject(bucket, uniqueName, write_stream, write_stream.size);
return [write_stream, promise, uniqueName];
};

downloadStream(bucket, file) {
var read_stream = new stream.PassThrough();
return this.client.getObject(bucket, file);
};

static get_instance() {
if(!this.instance) {
this.instance = new storage();
}
return this.instance;
}
octonawish-akcodes marked this conversation as resolved.
Show resolved Hide resolved


};
exports.storage = minio_storage;
41 changes: 41 additions & 0 deletions benchmarks/wrappers/knative/python/func.py
octonawish-akcodes marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
import logging
import datetime
from flask import jsonify
from parliament import Context
from function import handler


def main(context: Context):
logging.getLogger().setLevel(logging.INFO)

try:
# Extract JSON data from the request
event = context.request.json

begin = datetime.datetime.now()
# Pass the extracted JSON data to the handler function
ret = handler(event)
end = datetime.datetime.now()
logging.info(f"Function result: {ret}")
results_time = (end - begin) / datetime.timedelta(microseconds=1)

response = {
"begin": begin.strftime("%s.%f"),
"end": end.strftime("%s.%f"),
"results_time": results_time,
"result": ret,
}

return jsonify(response), 200

except Exception as e:
octonawish-akcodes marked this conversation as resolved.
Show resolved Hide resolved
end = datetime.datetime.now()
results_time = (end - begin) / datetime.timedelta(microseconds=1)
logging.error(f"Error - invocation failed! Reason: {e}")
response = {
"begin": begin.strftime("%s.%f"),
"end": end.strftime("%s.%f"),
"results_time": results_time,
"result": f"Error - invocation failed! Reason: {e}",
octonawish-akcodes marked this conversation as resolved.
Show resolved Hide resolved
}
return jsonify(response), 500
80 changes: 80 additions & 0 deletions benchmarks/wrappers/knative/python/storage.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
import os
import uuid
import json
octonawish-akcodes marked this conversation as resolved.
Show resolved Hide resolved
import minio
import logging


class storage:
instance = None
client = None

def __init__(self):
try:
"""
Minio does not allow another way of configuring timeout for connection.
The rest of configuration is copied from source code of Minio.
"""
import urllib3
from datetime import timedelta

timeout = timedelta(seconds=1).seconds

mgr = urllib3.PoolManager(
timeout=urllib3.util.Timeout(connect=timeout, read=timeout),
maxsize=10,
retries=urllib3.Retry(
total=5, backoff_factor=0.2, status_forcelist=[500, 502, 503, 504]
)
)
self.client = minio.Minio(
os.getenv("MINIO_STORAGE_CONNECTION_URL"),
access_key=os.getenv("MINIO_STORAGE_ACCESS_KEY"),
secret_key=os.getenv("MINIO_STORAGE_SECRET_KEY"),
secure=False,
http_client=mgr
)
except Exception as e:
logging.info(e)
raise e
octonawish-akcodes marked this conversation as resolved.
Show resolved Hide resolved
octonawish-akcodes marked this conversation as resolved.
Show resolved Hide resolved

@staticmethod
def unique_name(name):
name, extension = os.path.splitext(name)
return '{name}.{random}{extension}'.format(
name=name,
extension=extension,
random=str(uuid.uuid4()).split('-')[0]
)


def upload(self, bucket, file, filepath):
key_name = storage.unique_name(file)
self.client.fput_object(bucket, key_name, filepath)
return key_name
octonawish-akcodes marked this conversation as resolved.
Show resolved Hide resolved

def download(self, bucket, file, filepath):
self.client.fget_object(bucket, file, filepath)
octonawish-akcodes marked this conversation as resolved.
Show resolved Hide resolved

def download_directory(self, bucket, prefix, path):
objects = self.client.list_objects(bucket, prefix, recursive=True)
for obj in objects:
file_name = obj.object_name
self.download(bucket, file_name, os.path.join(path, file_name))
octonawish-akcodes marked this conversation as resolved.
Show resolved Hide resolved

def upload_stream(self, bucket, file, bytes_data):
key_name = storage.unique_name(file)
self.client.put_object(
bucket, key_name, bytes_data, bytes_data.getbuffer().nbytes
)
return key_name
octonawish-akcodes marked this conversation as resolved.
Show resolved Hide resolved

def download_stream(self, bucket, file):
data = self.client.get_object(bucket, file)
return data.read()
octonawish-akcodes marked this conversation as resolved.
Show resolved Hide resolved

@staticmethod
def get_instance():
if storage.instance is None:
storage.instance = storage()
return storage.instance
42 changes: 42 additions & 0 deletions config/systems.json
Original file line number Diff line number Diff line change
Expand Up @@ -234,5 +234,47 @@
}
}
}
},
"knative": {
"languages": {
"python": {
"base_images": {
"3.9": "python:3.9-slim",
octonawish-akcodes marked this conversation as resolved.
Show resolved Hide resolved
"3.10": "python:3.10-slim"
},
"images": [
"build",
"run"
],
"username": "docker_user",
"deployment": {
"files": [
"func.py",
"storage.py"
],
"packages": {
"parliament-functions": "0.1.0"
}
}
},
"nodejs": {
"base_images": {
"20": "node:20",
"18": "node:18"
},
"images": [
"build",
"run"
],
"username": "docker_user",
"deployment": {
"files": [
"index.js",
"storage.js"
],
"packages": []
}
}
}
}
}
2 changes: 1 addition & 1 deletion sebs.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ def common_params(func):
@click.option(
"--deployment",
default=None,
type=click.Choice(["azure", "aws", "gcp", "local", "openwhisk"]),
type=click.Choice(["azure", "aws", "gcp", "local", "openwhisk", "knative"]),
help="Cloud deployment to use.",
)
@click.option(
Expand Down
4 changes: 4 additions & 0 deletions sebs/faas/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,10 @@ def deserialize(config: dict, cache: Cache, handlers: LoggingHandlers) -> Config
from sebs.openwhisk.config import OpenWhiskConfig

implementations["openwhisk"] = OpenWhiskConfig.deserialize
if has_platform("knative"):
from sebs.knative.config import KnativeConfig

implementations["knative"] = KnativeConfig.deserialize
mcopik marked this conversation as resolved.
Show resolved Hide resolved
func = implementations.get(name)
assert func, "Unknown config type!"
return func(config[name] if name in config else config, cache, handlers)
Expand Down
2 changes: 2 additions & 0 deletions sebs/knative/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
from .knative import KnativeSystem # noqa
from .config import KnativeConfig # noqa
Loading