From 0762816d8b8bdd76d121613dc285f9076cb2eaeb Mon Sep 17 00:00:00 2001 From: Maxim Masiutin Date: Sun, 19 Mar 2023 01:52:57 +0200 Subject: [PATCH 1/6] Server explains the reasons to the worker if no tasks are available --- server/fishtest/rundb.py | 28 +++++++++++++++++++++++++++- worker/worker.py | 22 +++++++++++++++++++++- 2 files changed, 48 insertions(+), 2 deletions(-) diff --git a/server/fishtest/rundb.py b/server/fishtest/rundb.py index 20d4da77c..6a98dfd86 100644 --- a/server/fishtest/rundb.py +++ b/server/fishtest/rundb.py @@ -766,6 +766,13 @@ def priority(run): # lower is better run_found = False + worker_low_memory = False + worker_too_many_threads = False + worker_too_few_threads = False + worker_no_binary = False + worker_tc_too_short = False + worker_core_limit_reached = False + for run in self.task_runs: if run["finished"]: continue @@ -774,9 +781,11 @@ def priority(run): # lower is better continue if run["args"]["threads"] > max_threads: + worker_too_many_threads = true continue if run["args"]["threads"] < min_threads: + worker_too_few_threads = true continue # Check if there aren't already enough workers @@ -811,6 +820,7 @@ def priority(run): # lower is better ) if need_base + need_tt > max_memory: + worker_low_memory = True continue # Github API limit... @@ -820,6 +830,7 @@ def priority(run): # lower is better and run["_id"] in self.worker_runs[unique_key] ) if not have_binary: + worker_no_binary = True continue # To avoid time losses in the case of large concurrency and short TC, @@ -837,6 +848,7 @@ def priority(run): # lower is better < 1.0 ) if tc_too_short: + worker_tc_too_short = True continue # Limit the number of cores. @@ -856,6 +868,7 @@ def priority(run): # lower is better break if core_limit_reached: + worker_core_limit_reached = True continue # If we make it here, it means we have found a run @@ -865,7 +878,20 @@ def priority(run): # lower is better # If there is no suitable run, tell the worker. if not run_found: - return {"task_waiting": False} + ret = {"task_waiting": False} + if worker_low_memory is True: + ret["worker_low_memory"] = True + if worker_too_many_threads is True: + ret["worker_too_many_threads"] = True + if worker_too_few_threads is True: + ret["worker_too_few_threads"] = True + if worker_no_binary is True: + ret["worker_no_binary"] = True + if worker_tc_too_short is True: + ret["worker_tc_too_short"] = True + if worker_core_limit_reached is True: + ret["worker_core_limit_reached"] = True + return ret # Now we create a new task for this run. opening_offset = 0 diff --git a/worker/worker.py b/worker/worker.py index c8863d862..9266bd898 100644 --- a/worker/worker.py +++ b/worker/worker.py @@ -717,6 +717,7 @@ def _get_help_string(self, action): "-t", "--min_threads", dest="min_threads", + metavar="MIN_THREADS", default=config.getint("parameters", "min_threads"), type=int, help="do not accept tasks with fewer threads than MIN_THREADS", @@ -1336,7 +1337,26 @@ def fetch_and_handle_task(worker_info, password, remote, lock_file, current_stat # No tasks ready for us yet, just wait... if "task_waiting" in req: - print("No tasks available at this time, waiting...") + reasons = [] + if "worker_low_memory" in req: + reasons.append("The MAX_MEMORY parameter is too low") + if "worker_too_many_threads" in req: + reasons.append("The CONCURRENCY parameter is too high") + if "worker_too_few_threads" in req: + reasons.append("The MIN_THREADS parameter is too low") + if "worker_no_binary" in req: + reasons.append("No binary or near Github API limit") + if "worker_tc_too_short" in req: + reasons.append("The TC is too short") + if "worker_core_limit_reached" in req: + reasons.append("Exceeded the limit of cores set by the server") + if len(reasons) == 0: + print("No tasks available at this time, waiting...") + else: + print("No suitable tasks available for the worker at this time, reason(s):") + for reason in reasons: + print(" - {}".format(reason)) + print("Waiting...") return False run, task_id = req["run"], req["task_id"] From 58d42bdf16ea6ecd0615c68f1b84a30030e0fbe8 Mon Sep 17 00:00:00 2001 From: Dubslow Date: Sun, 19 Mar 2023 01:42:59 -0500 Subject: [PATCH 2/6] Refactor maxim's request task error messages Use an Enum to greatly improve both maintainability and readability Untested --- server/fishtest/rundb.py | 57 +++++++++++++++++----------------------- worker/worker.py | 54 +++++++++++++++++++++---------------- 2 files changed, 55 insertions(+), 56 deletions(-) diff --git a/server/fishtest/rundb.py b/server/fishtest/rundb.py index 6a98dfd86..ae7f33ebe 100644 --- a/server/fishtest/rundb.py +++ b/server/fishtest/rundb.py @@ -1,5 +1,6 @@ import configparser import copy +import enum import math import os import random @@ -40,6 +41,16 @@ last_rundb = None +# This is duplicated in worker.py. Any changes here must be mirrored there. +class _RequestTaskErrors(enum.IntFlag): + MachineLimit = enum.auto() + LowThreads = enum.auto() + HighThreads = enum.auto() + LowMemory = enum.auto() + NoBinary = enum.auto() + SkipSTC = enum.auto() + ServerSide = enum.auto() + def get_port(): params = {} @@ -676,7 +687,7 @@ def request_task(self, worker_info): self.task_semaphore.release() else: print("request_task too busy", flush=True) - return {"task_waiting": False} + return {"errors": int(_RequestTaskErrors.ServerSide)} def sync_request_task(self, worker_info): unique_key = worker_info["unique_key"] @@ -755,23 +766,15 @@ def priority(run): # lower is better connections = connections + 1 if connections >= self.userdb.get_machine_limit(worker_info["username"]): - error = "Request_task: Machine limit reached for user {}".format( - worker_info["username"] - ) + error = f'Request_task: Machine limit reached for user {worker_info["username"]}' print(error, flush=True) - return {"task_waiting": False, "error": error} + return {"errors": int(_RequestTaskErrors.MachineLimit)} # Now go through the sorted list of unfinished runs. # We will add a task to the first run that is suitable. run_found = False - - worker_low_memory = False - worker_too_many_threads = False - worker_too_few_threads = False - worker_no_binary = False - worker_tc_too_short = False - worker_core_limit_reached = False + errors = _RequestTaskErrors(0) for run in self.task_runs: if run["finished"]: @@ -781,11 +784,11 @@ def priority(run): # lower is better continue if run["args"]["threads"] > max_threads: - worker_too_many_threads = true + errors |= _RequestTaskErrors.LowThreads continue if run["args"]["threads"] < min_threads: - worker_too_few_threads = true + errors |= _RequestTaskErrors.HighThreads continue # Check if there aren't already enough workers @@ -812,7 +815,7 @@ def priority(run): # lower is better need_tt += get_hash(run["args"]["new_options"]) need_tt += get_hash(run["args"]["base_options"]) need_tt *= max_threads // run["args"]["threads"] - # estime another 10MB per process, 30MB per thread, and 40MB for net as a base memory need besides hash + # estimate another 10MB per process, 30MB per thread, and 40MB for net as a base memory need besides hash need_base = ( 2 * (max_threads // run["args"]["threads"]) @@ -820,7 +823,7 @@ def priority(run): # lower is better ) if need_base + need_tt > max_memory: - worker_low_memory = True + errors |= _RequestTaskErrors.LowMemory continue # Github API limit... @@ -830,7 +833,7 @@ def priority(run): # lower is better and run["_id"] in self.worker_runs[unique_key] ) if not have_binary: - worker_no_binary = True + errors |= _RequestTaskErrors.NoBinary continue # To avoid time losses in the case of large concurrency and short TC, @@ -848,7 +851,7 @@ def priority(run): # lower is better < 1.0 ) if tc_too_short: - worker_tc_too_short = True + errors |= _RequestTaskErrors.SkipSTC continue # Limit the number of cores. @@ -868,7 +871,6 @@ def priority(run): # lower is better break if core_limit_reached: - worker_core_limit_reached = True continue # If we make it here, it means we have found a run @@ -878,20 +880,9 @@ def priority(run): # lower is better # If there is no suitable run, tell the worker. if not run_found: - ret = {"task_waiting": False} - if worker_low_memory is True: - ret["worker_low_memory"] = True - if worker_too_many_threads is True: - ret["worker_too_many_threads"] = True - if worker_too_few_threads is True: - ret["worker_too_few_threads"] = True - if worker_no_binary is True: - ret["worker_no_binary"] = True - if worker_tc_too_short is True: - ret["worker_tc_too_short"] = True - if worker_core_limit_reached is True: - ret["worker_core_limit_reached"] = True - return ret + if not errors: # No active tasks whatsoever, no fault of the worker + errors = _RequestTaskErrors.ServerSide + return {"errors": int(errors)} # Now we create a new task for this run. opening_offset = 0 diff --git a/worker/worker.py b/worker/worker.py index 9266bd898..2f11f88fa 100644 --- a/worker/worker.py +++ b/worker/worker.py @@ -2,6 +2,7 @@ import atexit import base64 import datetime +import enum import getpass import hashlib import json @@ -1285,6 +1286,30 @@ def verify_worker_version(remote, username, password): return True +# Duplicated from server's rundb.py. +# Ideally we would have a common/ folder next to server/ and worker/, to prevent this... +class _RequestTaskErrors(enum.IntFlag): + MachineLimit = enum.auto() + LowThreads = enum.auto() + HighThreads = enum.auto() + LowMemory = enum.auto() + NoBinary = enum.auto() + SkipSTC = enum.auto() + ServerSide = enum.auto() + + # __private_names are not enum-ized + __messages = {MachineLimit: "This user has reached the max machines limit.", + LowThreads: "An available run requires more than CONCURRENCY threads." + HighThreads: "An available run requires less than MIN_THREADS threads." + LowMemory: "An available run requires more than MAX_MEMORY memory." + NoBinary: "This worker has exceeded its GitHub API limit, and has no local binary for an availabe run." + SkipSTC: "An available run is at STC, requiring less than CONCURRENCY threads due to cutechess issues. Consider splitting this worker. See Discord." + ServerSide: "Server error or no active runs. Try again shortly." + } + + def __str__(self): + return self.__messages[self] + def fetch_and_handle_task(worker_info, password, remote, lock_file, current_state): # This function should normally not raise exceptions. # Unusual conditions are handled by returning False. @@ -1332,31 +1357,14 @@ def fetch_and_handle_task(worker_info, password, remote, lock_file, current_stat except WorkerException: return False # error message has already been printed - if "error" in req: - return False # likewise - # No tasks ready for us yet, just wait... - if "task_waiting" in req: - reasons = [] - if "worker_low_memory" in req: - reasons.append("The MAX_MEMORY parameter is too low") - if "worker_too_many_threads" in req: - reasons.append("The CONCURRENCY parameter is too high") - if "worker_too_few_threads" in req: - reasons.append("The MIN_THREADS parameter is too low") - if "worker_no_binary" in req: - reasons.append("No binary or near Github API limit") - if "worker_tc_too_short" in req: - reasons.append("The TC is too short") - if "worker_core_limit_reached" in req: - reasons.append("Exceeded the limit of cores set by the server") - if len(reasons) == 0: - print("No tasks available at this time, waiting...") + if "errors" in req and req["errors"]: + errors = _RequestTaskErrors(req["errors"]) + if _RequestTaskErrors.ServerSide in errors: + print(_RequestTaskErrors.ServerSide) else: - print("No suitable tasks available for the worker at this time, reason(s):") - for reason in reasons: - print(" - {}".format(reason)) - print("Waiting...") + print(f"No active tasks suitable for the worker at this time:\n - {'\n - '.join(str(e) for e in errors)}") + print("Waiting...") return False run, task_id = req["run"], req["task_id"] From 8f5b56717786b4acd25abce41d971472cc7e98c8 Mon Sep 17 00:00:00 2001 From: Dubslow Date: Sun, 19 Mar 2023 05:34:12 -0500 Subject: [PATCH 3/6] Remove all message interpretation from worker to server Per vdbergh's comments Still untested --- server/fishtest/rundb.py | 27 +++++++++++++++++++++------ worker/worker.py | 34 ++-------------------------------- 2 files changed, 23 insertions(+), 38 deletions(-) diff --git a/server/fishtest/rundb.py b/server/fishtest/rundb.py index ae7f33ebe..6a1eb9a35 100644 --- a/server/fishtest/rundb.py +++ b/server/fishtest/rundb.py @@ -41,8 +41,8 @@ last_rundb = None -# This is duplicated in worker.py. Any changes here must be mirrored there. -class _RequestTaskErrors(enum.IntFlag): +# Just a bit of low-maintenance, easy-to-read bookkeeping +class _RequestTaskErrors(enum.Flag): MachineLimit = enum.auto() LowThreads = enum.auto() HighThreads = enum.auto() @@ -51,6 +51,20 @@ class _RequestTaskErrors(enum.IntFlag): SkipSTC = enum.auto() ServerSide = enum.auto() + # __private_names are not enum-ized + # These messages refer to worker command line options, and so need to be updated as the worker is. + __messages = {MachineLimit: "This user has reached the max machines limit.", + LowThreads: "An available run requires more than CONCURRENCY threads." + HighThreads: "An available run requires less than MIN_THREADS threads." + LowMemory: "An available run requires more than MAX_MEMORY memory." + NoBinary: "This worker has exceeded its GitHub API limit, and has no local binary for an availabe run." + SkipSTC: "An available run is at STC, requiring less than CONCURRENCY threads due to cutechess issues. Consider splitting this worker. See Discord." + ServerSide: "Server error or no active runs. Try again shortly." + } + + def __str__(self): + return self.__messages[self] + def get_port(): params = {} @@ -687,10 +701,11 @@ def request_task(self, worker_info): self.task_semaphore.release() else: print("request_task too busy", flush=True) - return {"errors": int(_RequestTaskErrors.ServerSide)} + return {"error_msg": _msg_sep + str(_RequestTaskErrors.ServerSide)} def sync_request_task(self, worker_info): unique_key = worker_info["unique_key"] + _msg_sep = '\n - ' # We get the list of unfinished runs. # To limit db access the list is cached for @@ -768,13 +783,13 @@ def priority(run): # lower is better if connections >= self.userdb.get_machine_limit(worker_info["username"]): error = f'Request_task: Machine limit reached for user {worker_info["username"]}' print(error, flush=True) - return {"errors": int(_RequestTaskErrors.MachineLimit)} + return {"error_msg": _msg_sep + str(_RequestTaskErrors.MachineLimit)} # Now go through the sorted list of unfinished runs. # We will add a task to the first run that is suitable. run_found = False - errors = _RequestTaskErrors(0) + errors = _RequestTaskErrors(0) # Ignored when run_found for run in self.task_runs: if run["finished"]: @@ -882,7 +897,7 @@ def priority(run): # lower is better if not run_found: if not errors: # No active tasks whatsoever, no fault of the worker errors = _RequestTaskErrors.ServerSide - return {"errors": int(errors)} + return {"error_msg": _msg_sep + _msg_sep.join(str(e) for e in errors)} # Now we create a new task for this run. opening_offset = 0 diff --git a/worker/worker.py b/worker/worker.py index 2f11f88fa..7fda6f780 100644 --- a/worker/worker.py +++ b/worker/worker.py @@ -2,7 +2,6 @@ import atexit import base64 import datetime -import enum import getpass import hashlib import json @@ -1286,30 +1285,6 @@ def verify_worker_version(remote, username, password): return True -# Duplicated from server's rundb.py. -# Ideally we would have a common/ folder next to server/ and worker/, to prevent this... -class _RequestTaskErrors(enum.IntFlag): - MachineLimit = enum.auto() - LowThreads = enum.auto() - HighThreads = enum.auto() - LowMemory = enum.auto() - NoBinary = enum.auto() - SkipSTC = enum.auto() - ServerSide = enum.auto() - - # __private_names are not enum-ized - __messages = {MachineLimit: "This user has reached the max machines limit.", - LowThreads: "An available run requires more than CONCURRENCY threads." - HighThreads: "An available run requires less than MIN_THREADS threads." - LowMemory: "An available run requires more than MAX_MEMORY memory." - NoBinary: "This worker has exceeded its GitHub API limit, and has no local binary for an availabe run." - SkipSTC: "An available run is at STC, requiring less than CONCURRENCY threads due to cutechess issues. Consider splitting this worker. See Discord." - ServerSide: "Server error or no active runs. Try again shortly." - } - - def __str__(self): - return self.__messages[self] - def fetch_and_handle_task(worker_info, password, remote, lock_file, current_state): # This function should normally not raise exceptions. # Unusual conditions are handled by returning False. @@ -1358,13 +1333,8 @@ def fetch_and_handle_task(worker_info, password, remote, lock_file, current_stat return False # error message has already been printed # No tasks ready for us yet, just wait... - if "errors" in req and req["errors"]: - errors = _RequestTaskErrors(req["errors"]) - if _RequestTaskErrors.ServerSide in errors: - print(_RequestTaskErrors.ServerSide) - else: - print(f"No active tasks suitable for the worker at this time:\n - {'\n - '.join(str(e) for e in errors)}") - print("Waiting...") + if "error_msg" in req: + print(f"Request task failure:{req['error_msg']}\nWaiting...") return False run, task_id = req["run"], req["task_id"] From e829cd954fe3ccc74826ed07bd9b206694f59100 Mon Sep 17 00:00:00 2001 From: Maxim Masiutin Date: Sun, 9 Apr 2023 01:12:33 +0300 Subject: [PATCH 4/6] updated sri.txt hashes --- worker/sri.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/worker/sri.txt b/worker/sri.txt index 2689633b1..8a3dc8dc2 100644 --- a/worker/sri.txt +++ b/worker/sri.txt @@ -1 +1 @@ -{"__version": 197, "updater.py": "PHFUVXcoxBFiW2hTqFN5q5WdAw2pK8uzFC7hyMUC3cLY5bGZPhL8TBGThtqDmcXd", "worker.py": "/Lu4ip+T1nwF1Wi7yEIIfhUwn0VPz6P7Z/flcnJWIWLjxliLmyLFJAoYHTStaShc", "games.py": "U/LQtRw/dlRyIgOYz+ioiACLnn9Qu/laZOaenm61HHglhcAYpVysgNLlO9bvhFGD"} +{"__version": 197, "updater.py": "PHFUVXcoxBFiW2hTqFN5q5WdAw2pK8uzFC7hyMUC3cLY5bGZPhL8TBGThtqDmcXd", "worker.py": "43XRSVuU4ZaQa3SFxg20zjaKGD0xZMeGwuBBmWYryW6yhEN72cKEBjNNnZb+xGgq", "games.py": "U/LQtRw/dlRyIgOYz+ioiACLnn9Qu/laZOaenm61HHglhcAYpVysgNLlO9bvhFGD"} From 91cb772d3af31eef1ec5d8f71fd0cf56ae2b55d3 Mon Sep 17 00:00:00 2001 From: Maxim Masiutin Date: Sun, 9 Apr 2023 01:39:54 +0300 Subject: [PATCH 5/6] fixed syntax errors; used pylint --- server/fishtest/rundb.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/server/fishtest/rundb.py b/server/fishtest/rundb.py index 6a1eb9a35..4db8fc381 100644 --- a/server/fishtest/rundb.py +++ b/server/fishtest/rundb.py @@ -54,11 +54,11 @@ class _RequestTaskErrors(enum.Flag): # __private_names are not enum-ized # These messages refer to worker command line options, and so need to be updated as the worker is. __messages = {MachineLimit: "This user has reached the max machines limit.", - LowThreads: "An available run requires more than CONCURRENCY threads." - HighThreads: "An available run requires less than MIN_THREADS threads." - LowMemory: "An available run requires more than MAX_MEMORY memory." - NoBinary: "This worker has exceeded its GitHub API limit, and has no local binary for an availabe run." - SkipSTC: "An available run is at STC, requiring less than CONCURRENCY threads due to cutechess issues. Consider splitting this worker. See Discord." + LowThreads: "An available run requires more than CONCURRENCY threads.", + HighThreads: "An available run requires less than MIN_THREADS threads.", + LowMemory: "An available run requires more than MAX_MEMORY memory.", + NoBinary: "This worker has exceeded its GitHub API limit, and has no local binary for an availabe run.", + SkipSTC: "An available run is at STC, requiring less than CONCURRENCY threads due to cutechess issues. Consider splitting this worker. See Discord.", ServerSide: "Server error or no active runs. Try again shortly." } From fe52d55de9285122c75ceb3adea25be5954725f9 Mon Sep 17 00:00:00 2001 From: Maxim Masiutin Date: Sun, 9 Apr 2023 01:45:22 +0300 Subject: [PATCH 6/6] Used black --- server/fishtest/__init__.py | 3 +-- server/fishtest/rundb.py | 36 +++++++++++++++++++----------------- 2 files changed, 20 insertions(+), 19 deletions(-) diff --git a/server/fishtest/__init__.py b/server/fishtest/__init__.py index 3f31352a4..719850769 100644 --- a/server/fishtest/__init__.py +++ b/server/fishtest/__init__.py @@ -2,6 +2,7 @@ import hashlib from pathlib import Path +from fishtest import helpers from fishtest.rundb import RunDb from pyramid.authentication import AuthTktAuthenticationPolicy from pyramid.authorization import ACLAuthorizationPolicy @@ -9,8 +10,6 @@ from pyramid.events import BeforeRender, NewRequest from pyramid.session import SignedCookieSessionFactory -from fishtest import helpers - def main(global_config, **settings): """This function returns a Pyramid WSGI application.""" diff --git a/server/fishtest/rundb.py b/server/fishtest/rundb.py index 4db8fc381..f5ab6f05b 100644 --- a/server/fishtest/rundb.py +++ b/server/fishtest/rundb.py @@ -41,26 +41,28 @@ last_rundb = None + # Just a bit of low-maintenance, easy-to-read bookkeeping class _RequestTaskErrors(enum.Flag): MachineLimit = enum.auto() - LowThreads = enum.auto() - HighThreads = enum.auto() - LowMemory = enum.auto() - NoBinary = enum.auto() - SkipSTC = enum.auto() - ServerSide = enum.auto() + LowThreads = enum.auto() + HighThreads = enum.auto() + LowMemory = enum.auto() + NoBinary = enum.auto() + SkipSTC = enum.auto() + ServerSide = enum.auto() # __private_names are not enum-ized # These messages refer to worker command line options, and so need to be updated as the worker is. - __messages = {MachineLimit: "This user has reached the max machines limit.", - LowThreads: "An available run requires more than CONCURRENCY threads.", - HighThreads: "An available run requires less than MIN_THREADS threads.", - LowMemory: "An available run requires more than MAX_MEMORY memory.", - NoBinary: "This worker has exceeded its GitHub API limit, and has no local binary for an availabe run.", - SkipSTC: "An available run is at STC, requiring less than CONCURRENCY threads due to cutechess issues. Consider splitting this worker. See Discord.", - ServerSide: "Server error or no active runs. Try again shortly." - } + __messages = { + MachineLimit: "This user has reached the max machines limit.", + LowThreads: "An available run requires more than CONCURRENCY threads.", + HighThreads: "An available run requires less than MIN_THREADS threads.", + LowMemory: "An available run requires more than MAX_MEMORY memory.", + NoBinary: "This worker has exceeded its GitHub API limit, and has no local binary for an availabe run.", + SkipSTC: "An available run is at STC, requiring less than CONCURRENCY threads due to cutechess issues. Consider splitting this worker. See Discord.", + ServerSide: "Server error or no active runs. Try again shortly.", + } def __str__(self): return self.__messages[self] @@ -705,7 +707,7 @@ def request_task(self, worker_info): def sync_request_task(self, worker_info): unique_key = worker_info["unique_key"] - _msg_sep = '\n - ' + _msg_sep = "\n - " # We get the list of unfinished runs. # To limit db access the list is cached for @@ -789,7 +791,7 @@ def priority(run): # lower is better # We will add a task to the first run that is suitable. run_found = False - errors = _RequestTaskErrors(0) # Ignored when run_found + errors = _RequestTaskErrors(0) # Ignored when run_found for run in self.task_runs: if run["finished"]: @@ -895,7 +897,7 @@ def priority(run): # lower is better # If there is no suitable run, tell the worker. if not run_found: - if not errors: # No active tasks whatsoever, no fault of the worker + if not errors: # No active tasks whatsoever, no fault of the worker errors = _RequestTaskErrors.ServerSide return {"error_msg": _msg_sep + _msg_sep.join(str(e) for e in errors)}