Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Knative deployment support #206

Open
wants to merge 11 commits into
base: master
Choose a base branch
from
2 changes: 1 addition & 1 deletion benchmarks/000.microbenchmarks/010.sleep/input.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,5 @@
def buckets_count():
return (0, 0)

def generate_input(data_dir, size, input_buckets, output_buckets, upload_func):
def generate_input(data_dir, size, benchmarks_bucket, input_buckets, output_buckets, upload_func):
return { 'sleep': size_generators[size] }
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,5 @@
def buckets_count():
return (0, 1)

def generate_input(data_dir, size, input_buckets, output_buckets, upload_func):
def generate_input(data_dir, size, benchmarks_bucket, input_buckets, output_buckets, upload_func):
return {'output-bucket': output_buckets[0]}
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,5 @@
def buckets_count():
return (0, 1)

def generate_input(data_dir, size, input_buckets, output_buckets, upload_func):
def generate_input(data_dir, size, benchmarks_bucket, input_buckets, output_buckets, upload_func):
return {'output-bucket': output_buckets[0]}
2 changes: 1 addition & 1 deletion benchmarks/000.microbenchmarks/040.server-reply/input.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,5 @@
def buckets_count():
return (0, 0)

def generate_input(data_dir, size, input_buckets, output_buckets, upload_func):
def generate_input(data_dir, size, benchmarks_bucket, input_buckets, output_buckets, upload_func):
return { 'sleep': size_generators[size] }
61 changes: 61 additions & 0 deletions benchmarks/wrappers/knative/nodejs/index.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
const {
CloudEvent,
HTTP
} = require('cloudevents');
const path = require('path');
const fs = require('fs');
octonawish-akcodes marked this conversation as resolved.
Show resolved Hide resolved
const {
v4: uuidv4
} = require('uuid');

async function handle(context, event) {
const requestId = uuidv4();
octonawish-akcodes marked this conversation as resolved.
Show resolved Hide resolved

// Ensure event data is parsed correctly
const eventData = event ? event : context.body;
context.log.info(`Received event: ${JSON.stringify(eventData)}`);

const func = require('/function/function.js');
octonawish-akcodes marked this conversation as resolved.
Show resolved Hide resolved
mcopik marked this conversation as resolved.
Show resolved Hide resolved
const begin = Date.now() / 1000;
const start = process.hrtime();

try {
// Call the handler function with the event data
const ret = await func.handler(eventData);
const elapsed = process.hrtime(start);
const end = Date.now() / 1000;
const micro = elapsed[1] / 1e3 + elapsed[0] * 1e6;

let is_cold = false;
const fname = path.join('/tmp', 'cold_run');
if (!fs.existsSync(fname)) {
is_cold = true;
fs.closeSync(fs.openSync(fname, 'w'));
}
mcopik marked this conversation as resolved.
Show resolved Hide resolved

context.log.info(`Function result: ${JSON.stringify(ret)}`);

return {
begin: begin,
end: end,
compute_time: micro,
results_time: 0,
result: ret,
request_id: requestId,
is_cold: is_cold,
};
} catch (error) {
context.log.error(`Error - invocation failed! Reason: ${error.message}`);
return {
begin: begin,
end: Date.now() / 1000,
compute_time: process.hrtime(start),
results_time: 0,
result: `Error - invocation failed! Reason: ${error.message}`,
request_id: requestId,
is_cold: false,
};
octonawish-akcodes marked this conversation as resolved.
Show resolved Hide resolved
}
}

exports.handle = handle;
60 changes: 60 additions & 0 deletions benchmarks/wrappers/knative/nodejs/storage.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
const minio = require('minio'),
path = require('path'),
uuid = require('uuid'),
util = require('util'),
stream = require('stream'),
fs = require('fs');
mcopik marked this conversation as resolved.
Show resolved Hide resolved

class minio_storage {

constructor() {
let address = process.env.MINIO_STORAGE_CONNECTION_URL;
let access_key = process.env.MINIO_STORAGE_ACCESS_KEY;
let secret_key = process.env.MINIO_STORAGE_SECRET_KEY;

this.client = new minio.Client({
endPoint: address.split(':')[0],
port: parseInt(address.split(':')[1], 10),
accessKey: access_key,
secretKey: secret_key,
useSSL: false
});
}
octonawish-akcodes marked this conversation as resolved.
Show resolved Hide resolved

unique_name(file) {
let name = path.parse(file);
let uuid_name = uuid.v4().split('-')[0];
return path.join(name.dir, util.format('%s.%s%s', name.name, uuid_name, name.ext));
}

upload(bucket, file, filepath) {
let uniqueName = this.unique_name(file);
return [uniqueName, this.client.fPutObject(bucket, uniqueName, filepath)];
};

download(bucket, file, filepath) {
return this.client.fGetObject(bucket, file, filepath);
octonawish-akcodes marked this conversation as resolved.
Show resolved Hide resolved
};

uploadStream(bucket, file) {
var write_stream = new stream.PassThrough();
let uniqueName = this.unique_name(file);
let promise = this.client.putObject(bucket, uniqueName, write_stream, write_stream.size);
return [write_stream, promise, uniqueName];
};

downloadStream(bucket, file) {
var read_stream = new stream.PassThrough();
return this.client.getObject(bucket, file);
};

static get_instance() {
if (!this.instance) {
this.instance = new storage();
}
return this.instance;
}
octonawish-akcodes marked this conversation as resolved.
Show resolved Hide resolved


};
exports.storage = minio_storage;
58 changes: 58 additions & 0 deletions benchmarks/wrappers/knative/python/func.py
octonawish-akcodes marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
import logging
import datetime
import os
import uuid
from flask import jsonify
from parliament import Context
import minio
octonawish-akcodes marked this conversation as resolved.
Show resolved Hide resolved


def main(context: Context):
logging.getLogger().setLevel(logging.INFO)
begin = datetime.datetime.now() # Initialize begin outside the try block
octonawish-akcodes marked this conversation as resolved.
Show resolved Hide resolved

event = context.request.json
logging.info(f"Received event: {event}")

request_id = str(uuid.uuid4()) # Generate a unique request ID

try:
from function import function

# Update the timestamp after extracting JSON data
begin = datetime.datetime.now()
octonawish-akcodes marked this conversation as resolved.
Show resolved Hide resolved
# Pass the extracted JSON data to the handler function
ret = function.handler(event)
end = datetime.datetime.now()
logging.info("Function result: {}".format(ret))
log_data = {"result": ret["result"]}
if "measurement" in ret:
log_data["measurement"] = ret["measurement"]
results_time = (end - begin) / datetime.timedelta(microseconds=1)

is_cold = False
fname = "cold_run"
if not os.path.exists(fname):
is_cold = True
open(fname, "a").close()
octonawish-akcodes marked this conversation as resolved.
Show resolved Hide resolved

return {
"request_id": request_id,
"begin": begin.strftime("%s.%f"),
"end": end.strftime("%s.%f"),
"results_time": results_time,
"is_cold": is_cold,
"result": log_data,
}

except Exception as e:
octonawish-akcodes marked this conversation as resolved.
Show resolved Hide resolved
end = datetime.datetime.now()
results_time = (end - begin) / datetime.timedelta(microseconds=1)
logging.error(f"Error - invocation failed! Reason: {e}")
return {
"request_id": request_id,
"begin": begin.strftime("%s.%f"),
"end": end.strftime("%s.%f"),
"results_time": results_time,
"result": f"Error - invocation failed! Reason: {e}",
octonawish-akcodes marked this conversation as resolved.
Show resolved Hide resolved
}
octonawish-akcodes marked this conversation as resolved.
Show resolved Hide resolved
77 changes: 77 additions & 0 deletions benchmarks/wrappers/knative/python/storage.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
import os
import uuid
import json
octonawish-akcodes marked this conversation as resolved.
Show resolved Hide resolved
import minio
import logging


class storage:
instance = None
client = None

def __init__(self):
try:
"""
Minio does not allow another way of configuring timeout for connection.
The rest of configuration is copied from source code of Minio.
"""
import urllib3
from datetime import timedelta

timeout = timedelta(seconds=1).seconds

mgr = urllib3.PoolManager(
timeout=urllib3.util.Timeout(connect=timeout, read=timeout),
maxsize=10,
retries=urllib3.Retry(
total=5, backoff_factor=0.2, status_forcelist=[500, 502, 503, 504]
),
)
self.client = minio.Minio(
os.getenv("MINIO_STORAGE_CONNECTION_URL"),
access_key=os.getenv("MINIO_STORAGE_ACCESS_KEY"),
secret_key=os.getenv("MINIO_STORAGE_SECRET_KEY"),
secure=False,
http_client=mgr,
)
except Exception as e:
logging.info(e)
raise e
octonawish-akcodes marked this conversation as resolved.
Show resolved Hide resolved

@staticmethod
def unique_name(name):
name, extension = os.path.splitext(name)
return "{name}.{random}{extension}".format(
name=name, extension=extension, random=str(uuid.uuid4()).split("-")[0]
)

def upload(self, bucket, file, filepath):
key_name = storage.unique_name(file)
self.client.fput_object(bucket, key_name, filepath)
return key_name
octonawish-akcodes marked this conversation as resolved.
Show resolved Hide resolved

def download(self, bucket, file, filepath):
self.client.fget_object(bucket, file, filepath)
octonawish-akcodes marked this conversation as resolved.
Show resolved Hide resolved

def download_directory(self, bucket, prefix, path):
objects = self.client.list_objects(bucket, prefix, recursive=True)
for obj in objects:
file_name = obj.object_name
self.download(bucket, file_name, os.path.join(path, file_name))
octonawish-akcodes marked this conversation as resolved.
Show resolved Hide resolved

def upload_stream(self, bucket, file, bytes_data):
key_name = storage.unique_name(file)
self.client.put_object(
bucket, key_name, bytes_data, bytes_data.getbuffer().nbytes
)
return key_name
octonawish-akcodes marked this conversation as resolved.
Show resolved Hide resolved

def download_stream(self, bucket, file):
data = self.client.get_object(bucket, file)
return data.read()
octonawish-akcodes marked this conversation as resolved.
Show resolved Hide resolved

@staticmethod
def get_instance():
if storage.instance is None:
storage.instance = storage()
return storage.instance
20 changes: 20 additions & 0 deletions config/example.json
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,26 @@
"output_buckets": [],
"type": "minio"
}
},
"knative": {
"shutdownStorage": false,
octonawish-akcodes marked this conversation as resolved.
Show resolved Hide resolved
"removeCluster": false,
"knativeExec": "func",
"docker_registry": {
"registry": "",
"username": "",
"password": ""
},
"storage": {
"address": "localhost:9000",
"mapped_port": 9000,
"access_key": "myaccesskey",
"secret_key": "mysecretkey",
"instance_id": "",
"input_buckets": [],
"output_buckets": [],
"type": "minio"
}
}
}
}
45 changes: 45 additions & 0 deletions config/systems.json
Original file line number Diff line number Diff line change
Expand Up @@ -234,5 +234,50 @@
}
}
}
},
"knative": {
"languages": {
"python": {
"base_images": {
"3.9": "python:3.9-slim",
octonawish-akcodes marked this conversation as resolved.
Show resolved Hide resolved
"3.10": "python:3.10-slim"
},
"images": [],
"username": "docker_user",
"deployment": {
"files": [
"func.py",
"storage.py"
],
"packages": {
"parliament-functions": "0.1.0",
"minio": "5.0.10",
"Pillow": "9.0.0",
octonawish-akcodes marked this conversation as resolved.
Show resolved Hide resolved
"python-igraph": "0.8.0",
"squiggle": "0.3.1"
}
}
},
"nodejs": {
"base_images": {
"20": "node:20",
"18": "node:18"
},
"images": [],
"username": "docker_user",
"deployment": {
"files": [
"index.js",
"storage.js"
],
"packages": {
"faas-js-runtime": "^2.2.2",
"minio": "7.0.16",
"mustache": "^3.2.1",
octonawish-akcodes marked this conversation as resolved.
Show resolved Hide resolved
"sharp": "^0.25"
}
}
}
}
}
}
Loading