Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Copy in #270

Open
wants to merge 22 commits into
base: ec2-new-implementation
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ scripts/*.sh
vmms/id_rsa*
courselabs/*
dockerTmp/*
id_ed25519
id_ed25519.pub
# config
config.py

Expand All @@ -27,3 +29,10 @@ pip-selfcheck.json

# Backup files
*.bak

# Tests
tests/my_tests/*
tests/sample_test/output
tests/sample_test/output/*
tests/sample_test/log.txt
tests/sample_test/summary.txt
14 changes: 12 additions & 2 deletions clients/tango-cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,10 @@
"--notifyURL",
help="Complete URL for Tango to give callback to once job is complete.",
)
parser.add_argument(
"--callbackURL",
help="Complete URL for Tango to give callback to once job is complete.",
)
parser.add_argument(
"--disableNetwork",
action="store_true",
Expand All @@ -127,7 +131,8 @@
parser.add_argument("--accessKeyId", default="", help="AWS account access key ID")
parser.add_argument("--accessKey", default="", help="AWS account access key content")
parser.add_argument("--instanceType", default="", help="AWS EC2 instance type")

parser.add_argument("--ec2", action="store_true", help="Enable ec2SSH VMMS")
parser.add_argument("--stopBefore", default="", help="Stops the worker before a function is executed")

def checkKey():
if args.key is None:
Expand Down Expand Up @@ -210,11 +215,11 @@ def tango_upload():
if res != 0:
raise Exception("Invalid usage: [upload] " + upload_help)

f = open(args.filename)
dirs = args.filename.split("/")
filename = dirs[len(dirs) - 1]
header = {"Filename": filename}

f = open(args.filename, 'rb')
response = requests.post(
"%s://%s:%d/upload/%s/%s/"
% (_tango_protocol, args.server, args.port, args.key, args.courselab),
Expand Down Expand Up @@ -257,10 +262,15 @@ def tango_addJob():
if args.notifyURL:
requestObj["notifyURL"] = args.notifyURL

if args.callbackURL:
requestObj["callback_url"] = args.callbackURL

requestObj["accessKeyId"] = args.accessKeyId
requestObj["accessKey"] = args.accessKey
requestObj["disable_network"] = args.disableNetwork
requestObj["instanceType"] = args.instanceType
requestObj["ec2Vmms"] = args.ec2
requestObj["stopBefore"] = args.stopBefore

response = requests.post(
"%s://%s:%d/addJob/%s/%s/"
Expand Down
Binary file added dump.rdb
Binary file not shown.
18 changes: 16 additions & 2 deletions jobManager.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,10 @@ def __manage(self):
if job.vm.ec2_vmms:
from vmms.ec2SSH import Ec2SSH

self.log.error('beforeec2ssh')
vmms = Ec2SSH(job.accessKeyId, job.accessKey)
self.log.error('afterec2ssh')

newVM = copy.deepcopy(job.vm)
newVM.id = self._getNextID()
try:
Expand All @@ -90,6 +93,7 @@ def __manage(self):
else:
# Try to find a vm on the free list and allocate it to
# the worker if successful.
self.log.error('huh?')
if Config.REUSE_VMS:
preVM = vm
else:
Expand All @@ -116,8 +120,11 @@ def __manage(self):
Worker(job, vmms, self.jobQueue, self.preallocator, preVM).start()

except Exception as err:
self.log.error("job failed during creation %d %s" % (job.id, str(err)))
self.jobQueue.makeDead(job.id, str(err))
if job is None:
self.log.info("job_manager: job is None")
else:
self.log.error("job failed during creation %d %s" % (job.id, str(err)))
self.jobQueue.makeDead(job.id, str(err))


if __name__ == "__main__":
Expand All @@ -133,6 +140,13 @@ def __manage(self):
tango.resetTango(tango.preallocator.vmms)
for key in tango.preallocator.machines.keys():
tango.preallocator.machines.set(key, [[], TangoQueue(key)])

# The call above resets the total pool to empty, but the free pool is a
# queue stored in redis and may still hold entries. When the job manager
# restarts, re-creating the queue under the same key does not clear its
# contents, so we explicitly empty the queue (i.e. the free pool) to keep
# it consistent with the total pool.
tango.preallocator.machines.get(key)[1].make_empty()
jobs = JobManager(tango.jobQueue)

print("Starting the stand-alone Tango JobManager")
Expand Down
1 change: 1 addition & 0 deletions preallocator.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ def update(self, vm, num):
self.lock.acquire()
if vm.name not in self.machines:
self.machines.set(vm.name, [[], TangoQueue(vm.name)])
self.machines.get(vm.name)[1].make_empty()
self.log.debug("Creating empty pool of %s instances" % (vm.name))
self.lock.release()

Expand Down
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,4 @@ urllib3==1.26.19
docker==5.0.3
backoff==2.2.1
pytz
pyyaml
7 changes: 6 additions & 1 deletion restful_tango/tangoREST.py
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,11 @@ def convertJobObj(self, dirName, jobObj):

ec2_vmms = False
if "ec2Vmms" in jobObj:
ec2_vmms = True
ec2_vmms = jobObj["ec2Vmms"]

stopBefore = ""
if "stopBefore" in jobObj:
stopBefore = jobObj["stopBefore"]

instance_type = None
if "instanceType" in jobObj and len(jobObj["instanceType"]) > 0:
Expand Down Expand Up @@ -198,6 +202,7 @@ def convertJobObj(self, dirName, jobObj):
accessKey=accessKey,
accessKeyId=accessKeyId,
disableNetwork=disableNetwork,
stopBefore=stopBefore
)

self.log.debug("inputFiles: %s" % [file.localFile for file in input])
Expand Down
8 changes: 8 additions & 0 deletions tangoObjects.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ def __init__(
accessKeyId=None,
accessKey=None,
disableNetwork=None,
stopBefore="",
):
self.assigned = False
self.retries = 0
Expand All @@ -120,6 +121,7 @@ def __init__(
self.accessKeyId = accessKeyId
self.accessKey = accessKey
self.disableNetwork = disableNetwork
self.stopBefore = "stopBefore"

def __repr__(self):
self.syncRemote()
Expand Down Expand Up @@ -319,6 +321,12 @@ def remove(self, item):
def _clean(self):
self.__db.delete(self.key)

def make_empty(self):
    """Discard every item currently stored under this queue's key."""
    # Removing the key in one call empties the queue atomically, instead of
    # issuing one lpop round trip per item (which can also chase items that
    # are pushed while the loop is still draining).
    self.__db.delete(self.key)


# This is an abstract class that decides on
# if we should initiate a TangoRemoteDictionary or TangoNativeDictionary
Expand Down
1 change: 1 addition & 0 deletions tests/sample_test/expected_output.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Hello world
2 changes: 2 additions & 0 deletions tests/sample_test/input/autograde-Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
autograde:
bash hello.sh
3 changes: 3 additions & 0 deletions tests/sample_test/input/hello.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
#!/bin/bash

echo "Hello world"
10 changes: 10 additions & 0 deletions tests/sample_test/sample_test.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
num_submissions: 5
submission_delay: 0.1
autograder_image: ec2_213_llvm_14_s25
output_file: log.txt
tango_port: 3001
cli_path: /home/snarita/Autolab/Tango/clients/tango-cli.py
instance_type: t2.micro
timeout: 180
ec2: True
expected_output: expected_output.txt
158 changes: 158 additions & 0 deletions tests/stressTest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
import argparse
import subprocess
import time
import os
import sys

import asyncio
import tornado.web

import yaml

test_dir = ""
sub_num = 0
finished_tests = dict()
start_time = time.time()
expected_output = ""

def printProgressBar(iteration, total, prefix='', suffix='', decimals=1,
                     length=100, fill='█', printEnd="\r"):
    """
    Call in a loop to create terminal progress bar
    @params:
        iteration   - Required  : current iteration (Int)
        total       - Required  : total iterations (Int)
        prefix      - Optional  : prefix string (Str)
        suffix      - Optional  : suffix string (Str)
        decimals    - Optional  : positive number of decimals in percent complete (Int)
        length      - Optional  : character length of bar (Int)
        fill        - Optional  : bar fill character (Str)
        printEnd    - Optional  : end character (e.g. "\\r", "\\r\\n") (Str)

    A total of 0 is rendered as a full, 100% bar instead of raising
    ZeroDivisionError.
    """
    if total:
        fraction = iteration / float(total)
        filledLength = int(length * iteration // total)
    else:
        # Nothing to do counts as already complete.
        fraction = 1.0
        filledLength = length
    percent = ("{0:." + str(decimals) + "f}").format(100 * fraction)
    bar = fill * filledLength + '-' * (length - filledLength)
    # The leading "\r" rewinds the cursor so each call overwrites the last bar.
    print(f'\r{prefix} |{bar}| {percent}% {suffix}', end=printEnd)
    # Print New Line on Complete
    if iteration == total:
        print()

def run_stress_test(num_submissions, submission_delay, autograder_image, output_file, tango_port, cli_path,
                    job_name, job_path, instance_type, timeout, ec2):
    """
    Submit num_submissions jobs by invoking the Tango CLI once per job.

    Each CLI invocation's stdout and stderr are appended to output_file, and
    a progress bar is drawn on the terminal as jobs are added.

    @params:
        num_submissions  - number of jobs to submit (Int)
        submission_delay - seconds to sleep after each submission; skipped when not > 0
        autograder_image - value passed to the CLI's --image option (Str)
        output_file      - path of the log file to append to (Str)
        tango_port       - value passed to the CLI's -P option
        cli_path         - path of the CLI script run with python3 (Str)
        job_name         - value passed to the CLI's -l option (Str)
        job_path         - value passed to the CLI's --runJob option (Str)
        instance_type    - value passed to the CLI's --instanceType option (Str)
        timeout          - value passed to the CLI's --timeout option
        ec2              - when truthy, the --ec2 flag is added
    """
    printProgressBar(0, num_submissions, prefix='Jobs Added:', suffix='Complete', length=50)
    with open(output_file, 'a') as f:
        f.write(f"Stress testing with {num_submissions} submissions\n")

        for i in range(1, num_submissions + 1):
            command = [
                'python3', cli_path,
                '-P', str(tango_port),
                '-k', 'test',
                '-l', job_name,
                '--runJob', job_path,
                '--image', autograder_image,
                '--instanceType', instance_type,
                '--timeout', str(timeout),
                # The job's id rides in the query string of the callback URL.
                '--callbackURL', ("http://localhost:8888/autograde_done?id=%d" % (i)),
            ]
            if ec2:
                command.append('--ec2')
            # The child writes straight to the shared file descriptor, so push
            # out our own buffered lines first to keep the log in order.
            f.flush()
            result = subprocess.run(command, stdout=f, stderr=f)
            if result.returncode == 0:
                f.write(f"Submission {i} completed\n")
            else:
                # Do not report a failed CLI invocation as a completed submission.
                f.write(f"Submission {i} failed with exit code {result.returncode}\n")
            printProgressBar(i, num_submissions, prefix='Jobs Added:', suffix='Complete', length=50)
            if submission_delay > 0:
                time.sleep(submission_delay)
    print()

class AutogradeDoneHandler(tornado.web.RequestHandler):
    """
    Handles the POST sent to /autograde_done when a submitted job finishes.

    The job number is read from the "id" query argument and the job output
    from the uploaded "file" part. The output is saved under
    <test_dir>/output/ and one line of it is recorded in finished_tests.
    Once sub_num jobs have reported, the summary is written and the process
    exits.
    """

    def post(self):
        job_id = self.get_query_argument("id")
        fileBody = self.request.files["file"][0]["body"].decode()
        lines = fileBody.split("\n")
        # With a trailing newline, [-2] is the last real line of the output
        # (presumably the score line -- TODO confirm). Fall back to the whole
        # body when there is no newline, instead of raising IndexError.
        scoreJson = lines[-2] if len(lines) >= 2 else fileBody
        with open(os.path.join(test_dir, "output", "output%s.txt" % job_id), 'w') as f:
            f.write(fileBody)
        finished_tests[str(job_id)] = scoreJson
        printProgressBar(len(finished_tests), sub_num, prefix='Tests Done:', suffix='Complete', length=50)
        if len(finished_tests) == sub_num:
            # finish() completes the response now; a plain write() only
            # buffers it, and the sys.exit() below would discard the buffer.
            self.finish("ok")
            print()
            create_summary()
            print("Test Summary in summary.txt")
            sys.exit()
        self.write("ok")

def create_summary():
    """
    Write <test_dir>/summary.txt with the outcome of the run.

    Reads the module-level sub_num, expected_output, finished_tests, test_dir
    and start_time. A job counts as succeeded when its recorded line equals
    the expected output; every other job is listed under "Failed Cases"
    with the line that was recorded for it.
    """
    # Recorded lines come from splitting on "\n" and can never contain one,
    # so drop the trailing newline the expected-output file usually ends with.
    expected = expected_output.rstrip("\n")
    success = []
    failed = []
    for i in range(1, sub_num + 1):
        if finished_tests[str(i)] == expected:
            success.append(i)
        else:
            failed.append(i)
    with open(os.path.join(test_dir, "summary.txt"), 'w') as f:
        f.write("Total Time: %d seconds\n" % (time.time() - start_time))
        f.write("Total Succeeded: %d / %d\n" % (len(success), sub_num))
        f.write("Total Failed: %d / %d\n" % (len(failed), sub_num))
        f.write("\n===========================================================\n")
        f.write("The expected value is:\n")
        f.write(expected_output)
        f.write("\n\n===========================================================\n")
        f.write("Failed Cases:\n")
        # Report the jobs that actually failed, by their own ids.
        for i in failed:
            f.write("Test Case #%d: %s\n" % (i, finished_tests[str(i)]))

def make_app():
    """Build the Tornado application that serves the /autograde_done route."""
    routes = [
        (r"/autograde_done", AutogradeDoneHandler),
    ]
    return tornado.web.Application(routes)

async def notifyServer():
    """Listen on port 8888 for job callbacks and then wait indefinitely."""
    make_app().listen(8888)
    printProgressBar(0, sub_num, prefix='Tests Done:', suffix='Complete', length=50)
    # Nothing in this function ever sets the event, so the wait below simply
    # keeps the coroutine (and with it the event loop) alive.
    never_set = asyncio.Event()
    await never_set.wait()

if __name__ == "__main__":
    # Entry point: load <test_dir>/<dirname>.yaml, reset the output directory,
    # submit the jobs, then serve callbacks until every job has reported.
    import shutil  # needed only by this entry point

    parser = argparse.ArgumentParser(description="Stress test script for Tango")
    parser.add_argument('--test_dir', type=str, required=True, help="Directory to run the test in")

    args = parser.parse_args()

    # normpath drops any trailing slash first; otherwise basename() returns ""
    # for "tests/sample_test/" and the config path would become ".yaml".
    dirname = os.path.basename(os.path.normpath(args.test_dir))

    test_dir = args.test_dir

    with open(os.path.join(args.test_dir, dirname + '.yaml'), 'r') as f:
        data = yaml.load(f, Loader=yaml.SafeLoader)

    with open(os.path.join(args.test_dir, data["expected_output"]), 'r') as f:
        expected_output = f.read()

    sub_num = data["num_submissions"]
    finished_tests = dict()
    start_time = time.time()

    # Start from an empty output directory. Doing this in-process avoids
    # passing an unquoted path through a shell.
    output_dir = os.path.join(args.test_dir, "output")
    shutil.rmtree(output_dir, ignore_errors=True)
    os.makedirs(output_dir, exist_ok=True)

    print()

    # NOTE(review): the callback server is only started after every job has
    # been submitted, so a job that finishes earlier may call back before
    # port 8888 is listening -- confirm whether such callbacks are retried.
    run_stress_test(
        data["num_submissions"],
        data["submission_delay"],
        data["autograder_image"],
        os.path.join(args.test_dir, data["output_file"]),
        data["tango_port"],
        data["cli_path"],
        dirname,
        os.path.join(args.test_dir, 'input'),
        data["instance_type"],
        data["timeout"],
        data["ec2"]
    )

    asyncio.run(notifyServer())
Loading