From 8fb584533a97939288eea6de3501c55db8da98c0 Mon Sep 17 00:00:00 2001 From: Pierre Lasorak Date: Thu, 24 Oct 2024 19:23:37 +0200 Subject: [PATCH 1/2] Enable the Process manager to run as a daemon, and the unified shell to connect to it --- src/drunc/process_manager/configuration.py | 26 +++- .../interface/process_manager.py | 4 +- src/drunc/unified_shell/shell.py | 125 +++++++++--------- 3 files changed, 91 insertions(+), 64 deletions(-) diff --git a/src/drunc/process_manager/configuration.py b/src/drunc/process_manager/configuration.py index 44c26a99..2707a468 100644 --- a/src/drunc/process_manager/configuration.py +++ b/src/drunc/process_manager/configuration.py @@ -79,4 +79,28 @@ def get_cla(db, session_uid, obj): from confmodel import daq_application_construct_commandline_parameters return daq_application_construct_commandline_parameters(db, session_uid, obj.id) - return obj.commandline_parameters \ No newline at end of file + return obj.commandline_parameters + + +def get_process_manager_configuration(process_manager): + import os + ## Make the configuration name finding easier + if os.path.splitext(process_manager)[1] != '.json': + process_manager += '.json' + ## If no scheme is provided, assume that it is an internal packaged configuration. + ## First check it's not an existing external file + if os.path.isfile(process_manager): + if urlparse(process_manager).scheme == '': + process_manager = 'file://' + process_manager + else: + ## Check if the file is in the list of packaged configurations + from importlib.resources import path + packaged_configurations = os.listdir(path('drunc.data.process_manager', '')) + if process_manager in packaged_configurations: + process_manager = 'file://' + str(path('drunc.data.process_manager', '')) + '/' + process_manager + else: + rprint(f"Configuration [red]{process_manager}[/red] not found, check filename spelling or use a packaged configuration as one of [green]{packaged_configurations}[/green]") + exit() + #from drunc.exceptions import DruncShellException + #raise DruncShellException(f"Configuration {process_manager} is not found in the package. The packaged configurations are {packaged_configurations}") + return process_manager diff --git a/src/drunc/process_manager/interface/process_manager.py b/src/drunc/process_manager/interface/process_manager.py index 11bb8d9a..e9a6a23f 100644 --- a/src/drunc/process_manager/interface/process_manager.py +++ b/src/drunc/process_manager/interface/process_manager.py @@ -61,7 +61,7 @@ async def server_shutdown(): # grace period, the server won't accept new connections and allow # existing RPCs to continue within the grace period. await server.stop(5) - pm.terminate() + pm._terminate_impl(None) _cleanup_coroutines.append(server_shutdown()) if ready_event is not None: @@ -95,4 +95,6 @@ async def server_shutdown(): help='Set the log level' ) def process_manager_cli(pm_conf:str, log_level): + from drunc.process_manager.configuration import get_process_manager_configuration + pm_conf = get_process_manager_configuration(pm_conf) run_pm(pm_conf, log_level) diff --git a/src/drunc/unified_shell/shell.py b/src/drunc/unified_shell/shell.py index 389ad966..ac73951b 100644 --- a/src/drunc/unified_shell/shell.py +++ b/src/drunc/unified_shell/shell.py @@ -5,47 +5,23 @@ from drunc.utils.utils import validate_command_facility import pathlib from drunc.process_manager.interface.cli_argument import validate_conf_string +from urllib.parse import urlparse +from rich import print as rprint +import logging @click_shell.shell(prompt='drunc-unified-shell > ', chain=True, hist_file=os.path.expanduser('~')+'/.drunc-unified-shell.history') @click.option('-l', '--log-level', type=click.Choice(log_levels.keys(), case_sensitive=False), default='INFO', help='Set the log level') -@click.argument('process-manager-configuration', type=str, nargs=1) +@click.argument('process-manager', type=str, nargs=1) @click.argument('boot-configuration', type=str, nargs=1) @click.argument('session-name', type=str, nargs=1) @click.pass_context def unified_shell( ctx, - process_manager_configuration:str, + process_manager:str, boot_configuration:str, session_name:str, log_level:str, ) -> None: - from drunc.utils.configuration import find_configuration - ctx.obj.boot_configuration = find_configuration(boot_configuration) - ctx.obj.session_name = session_name - - # Check if process_manager_configuration is a packaged config - from urllib.parse import urlparse - import os - ## Make the configuration name finding easier - if os.path.splitext(process_manager_configuration)[1] != '.json': - process_manager_configuration += '.json' - ## If no scheme is provided, assume that it is an internal packaged configuration. - ## First check it's not an existing external file - if os.path.isfile(process_manager_configuration): - if urlparse(process_manager_configuration).scheme == '': - process_manager_configuration = 'file://' + process_manager_configuration - else: - ## Check if the file is in the list of packaged configurations - from importlib.resources import path - packaged_configurations = os.listdir(path('drunc.data.process_manager', '')) - if process_manager_configuration in packaged_configurations: - process_manager_configuration = 'file://' + str(path('drunc.data.process_manager', '')) + '/' + process_manager_configuration - else: - from rich import print as rprint - rprint(f"Configuration [red]{process_manager_configuration}[/red] not found, check filename spelling or use a packaged configuration as one of [green]{packaged_configurations}[/green]") - exit() - #from drunc.exceptions import DruncShellException - #raise DruncShellException(f"Configuration {process_manager_configuration} is not found in the package. The packaged configurations are {packaged_configurations}") from drunc.utils.utils import update_log_level, pid_info_str, ignore_sigint_sighandler update_log_level(log_level) @@ -53,38 +29,54 @@ def unified_shell( logger = getLogger('unified_shell') logger.debug(pid_info_str()) - from drunc.process_manager.interface.process_manager import run_pm - import multiprocessing as mp - ready_event = mp.Event() - port = mp.Value('i', 0) - - ctx.obj.pm_process = mp.Process( - target = run_pm, - kwargs = { - "pm_conf": process_manager_configuration, - "log_level": log_level, - "ready_event": ready_event, - "signal_handler": ignore_sigint_sighandler, - # sigint gets sent to the PM, so we need to ignore it, otherwise everytime the user ctrl-c on the shell, the PM goes down - "generated_port": port, - }, - ) - ctx.obj.print(f'Starting process manager with configuration {process_manager_configuration}') - ctx.obj.pm_process.start() - from time import sleep - for _ in range(100): - if ready_event.is_set(): - break - sleep(0.1) + url_process_manager = urlparse(process_manager) + external_pm = True + + if url_process_manager.scheme != 'grpc': # slightly hacky to see if the process manager is an address + rprint(f"Spawning a process manager with configuration [green]{process_manager}[/green]") + external_pm = False + # Check if process_manager is a packaged config + from drunc.process_manager.configuration import get_process_manager_configuration + process_manager = get_process_manager_configuration(process_manager) + + from drunc.process_manager.interface.process_manager import run_pm + import multiprocessing as mp + ready_event = mp.Event() + port = mp.Value('i', 0) + + ctx.obj.pm_process = mp.Process( + target = run_pm, + kwargs = { + "pm_conf": process_manager, + "log_level": log_level, + "ready_event": ready_event, + "signal_handler": ignore_sigint_sighandler, + # sigint gets sent to the PM, so we need to ignore it, otherwise everytime the user ctrl-c on the shell, the PM goes down + "generated_port": port, + }, + ) + ctx.obj.print(f'Starting process manager with configuration {process_manager}') + ctx.obj.pm_process.start() + + + from time import sleep + for _ in range(100): + if ready_event.is_set(): + break + sleep(0.1) + + if not ready_event.is_set(): + from drunc.exceptions import DruncSetupException + raise DruncSetupException('Process manager did not start in time') - if not ready_event.is_set(): - from drunc.exceptions import DruncSetupException - raise DruncSetupException('Process manager did not start in time') + import socket + process_manager_address = f'localhost:{port.value}' - import socket - process_manager_address = f'localhost:{port.value}' + else: # user provided an address + rprint(f"Connecting to process manager at [green]{process_manager}[/green]") + process_manager_address = process_manager.replace('grpc://', '') # remove the grpc scheme ctx.obj.reset( address_pm = process_manager_address, @@ -100,10 +92,18 @@ def unified_shell( desc = desc.data except Exception as e: - ctx.obj.critical(f'Could not connect to the process manager') - if not ctx.obj.pm_process.is_alive(): + ctx.obj.critical(f'Could not connect to the process manager at the address [green]{process_manager_address}[/]', extra={'markup': True}) + if not external_pm and not ctx.obj.pm_process.is_alive(): ctx.obj.critical(f'The process manager is dead, exit code {ctx.obj.pm_process.exitcode}') - raise e + if logging.DEBUG == logging.root.level: + raise e + else: + exit() + + from drunc.utils.configuration import find_configuration + ctx.obj.boot_configuration = find_configuration(boot_configuration) + ctx.obj.session_name = session_name + ctx.obj.info(f'{process_manager_address} is \'{desc.name}.{desc.session}\' (name.session), starting listening...') if desc.HasField('broadcast'): @@ -113,8 +113,9 @@ def unified_shell( def cleanup(): ctx.obj.terminate() - ctx.obj.pm_process.terminate() - ctx.obj.pm_process.join() + if not external_pm: + ctx.obj.pm_process.terminate() + ctx.obj.pm_process.join() ctx.call_on_close(cleanup) From eacd4ce39fdcaf7379ded8bdf8237d9cf1bae16a Mon Sep 17 00:00:00 2001 From: Pierre Lasorak Date: Thu, 24 Oct 2024 19:49:21 +0200 Subject: [PATCH 2/2] remove the port from the configuration of the PM, and use the CLA instead. Also make the unified shell PM endpoint only listen localhost --- src/drunc/data/process_manager/k8s.json | 1 - src/drunc/data/process_manager/ssh-pocket-kafka.json | 1 - src/drunc/data/process_manager/ssh-standalone.json | 1 - src/drunc/process_manager/configuration.py | 2 -- src/drunc/process_manager/interface/process_manager.py | 9 +++++---- src/drunc/process_manager/process_manager.py | 4 ---- src/drunc/unified_shell/shell.py | 1 + 7 files changed, 6 insertions(+), 13 deletions(-) diff --git a/src/drunc/data/process_manager/k8s.json b/src/drunc/data/process_manager/k8s.json index fdd7eec4..485d6863 100644 --- a/src/drunc/data/process_manager/k8s.json +++ b/src/drunc/data/process_manager/k8s.json @@ -1,7 +1,6 @@ { "type": "k8s", "name": "K8sProcessManager", - "command_address": "0.0.0.0:0", "authoriser": { "type": "dummy" diff --git a/src/drunc/data/process_manager/ssh-pocket-kafka.json b/src/drunc/data/process_manager/ssh-pocket-kafka.json index 54fbe7b6..2433fb3f 100644 --- a/src/drunc/data/process_manager/ssh-pocket-kafka.json +++ b/src/drunc/data/process_manager/ssh-pocket-kafka.json @@ -1,7 +1,6 @@ { "type": "ssh", "name": "SSHProcessManager", - "command_address": "0.0.0.0:10054", "authoriser": { "type": "dummy" diff --git a/src/drunc/data/process_manager/ssh-standalone.json b/src/drunc/data/process_manager/ssh-standalone.json index 8eaf1c5e..c6958dda 100644 --- a/src/drunc/data/process_manager/ssh-standalone.json +++ b/src/drunc/data/process_manager/ssh-standalone.json @@ -1,7 +1,6 @@ { "type": "ssh", "name": "SSHProcessManager", - "command_address": "0.0.0.0:0", "authoriser": { "type": "dummy" diff --git a/src/drunc/process_manager/configuration.py b/src/drunc/process_manager/configuration.py index 2707a468..3c5454e4 100644 --- a/src/drunc/process_manager/configuration.py +++ b/src/drunc/process_manager/configuration.py @@ -36,8 +36,6 @@ def _parse_dict(self, data): from drunc.process_manager.exceptions import UnknownProcessManagerType raise UnknownProcessManagerType(data['type']) - new_data.command_address = data['command_address'] - return new_data def create_id(self, obj, segment=None, **kwargs): diff --git a/src/drunc/process_manager/interface/process_manager.py b/src/drunc/process_manager/interface/process_manager.py index e9a6a23f..8f81e852 100644 --- a/src/drunc/process_manager/interface/process_manager.py +++ b/src/drunc/process_manager/interface/process_manager.py @@ -7,7 +7,7 @@ _cleanup_coroutines = [] -def run_pm(pm_conf, log_level, ready_event=None, signal_handler=None, generated_port=None): +def run_pm(pm_conf, pm_address, log_level, ready_event=None, signal_handler=None, generated_port=None): if signal_handler is not None: signal_handler() @@ -72,7 +72,7 @@ async def server_shutdown(): try: loop.run_until_complete( - serve(pm.get_address()) + serve(pm_address) ) except Exception as e: import os @@ -84,6 +84,7 @@ async def server_shutdown(): @click.command() @click.argument('pm-conf', type=str) +@click.argument('pm-port', type=int) @click.option( '-l', '--log-level', @@ -94,7 +95,7 @@ async def server_shutdown(): default='INFO', help='Set the log level' ) -def process_manager_cli(pm_conf:str, log_level): +def process_manager_cli(pm_conf:str, pm_port:int, log_level): from drunc.process_manager.configuration import get_process_manager_configuration pm_conf = get_process_manager_configuration(pm_conf) - run_pm(pm_conf, log_level) + run_pm(pm_conf, f'0.0.0.0:{pm_port}', log_level) diff --git a/src/drunc/process_manager/process_manager.py b/src/drunc/process_manager/process_manager.py index fbbf8ac4..babc0a15 100644 --- a/src/drunc/process_manager/process_manager.py +++ b/src/drunc/process_manager/process_manager.py @@ -482,10 +482,6 @@ def _get_process_uid(self, query:ProcessQuery, in_boot_request:bool=False) -> [s return processes - - def get_address(self): - return self.configuration.data.command_address - @staticmethod def get(conf, **kwargs): from rich.console import Console diff --git a/src/drunc/unified_shell/shell.py b/src/drunc/unified_shell/shell.py index ac73951b..c77c24e5 100644 --- a/src/drunc/unified_shell/shell.py +++ b/src/drunc/unified_shell/shell.py @@ -50,6 +50,7 @@ def unified_shell( target = run_pm, kwargs = { "pm_conf": process_manager, + "pm_address": "localhost:0", "log_level": log_level, "ready_event": ready_event, "signal_handler": ignore_sigint_sighandler,