Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Daemon Process Manager #286

Merged
merged 2 commits into from
Oct 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion src/drunc/data/process_manager/k8s.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
{
"type": "k8s",
"name": "K8sProcessManager",
"command_address": "0.0.0.0:0",

"authoriser": {
"type": "dummy"
Expand Down
1 change: 0 additions & 1 deletion src/drunc/data/process_manager/ssh-pocket-kafka.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
{
"type": "ssh",
"name": "SSHProcessManager",
"command_address": "0.0.0.0:10054",

"authoriser": {
"type": "dummy"
Expand Down
1 change: 0 additions & 1 deletion src/drunc/data/process_manager/ssh-standalone.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
{
"type": "ssh",
"name": "SSHProcessManager",
"command_address": "0.0.0.0:0",

"authoriser": {
"type": "dummy"
Expand Down
28 changes: 25 additions & 3 deletions src/drunc/process_manager/configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,6 @@ def _parse_dict(self, data):
from drunc.process_manager.exceptions import UnknownProcessManagerType
raise UnknownProcessManagerType(data['type'])

new_data.command_address = data['command_address']

return new_data

def create_id(self, obj, segment=None, **kwargs):
Expand Down Expand Up @@ -79,4 +77,28 @@ def get_cla(db, session_uid, obj):
from confmodel import daq_application_construct_commandline_parameters
return daq_application_construct_commandline_parameters(db, session_uid, obj.id)

return obj.commandline_parameters
return obj.commandline_parameters


def get_process_manager_configuration(process_manager):
    """Resolve a process manager configuration name into a ``file://`` URL.

    Resolution order, after appending ``.json`` when the name does not
    already end with that extension:

    1. A name that already carries the ``file`` scheme is returned as is
       (its existence is not checked here).
    2. A path to an existing file is returned prefixed with ``file://``.
    3. A file name found in the ``drunc.data.process_manager`` package
       directory is returned as a ``file://`` URL pointing into it.

    Args:
        process_manager: Configuration name, file path or ``file://`` URL.

    Returns:
        The configuration location as a string.

    Raises:
        SystemExit: With status 1, after printing the list of packaged
            configurations, when the configuration cannot be found.
    """
    # Imported locally so the helper does not depend on what the enclosing
    # module happens to import at its top.
    import os
    import sys
    from urllib.parse import urlparse

    ## Make the configuration name finding easier
    if os.path.splitext(process_manager)[1] != '.json':
        process_manager += '.json'

    ## An explicit file:// URL is already in its final form; os.path.isfile
    ## would reject it and wrongly send it to the packaged lookup.
    if urlparse(process_manager).scheme == 'file':
        return process_manager

    ## If no scheme is provided, assume that it is an internal packaged configuration.
    ## First check it's not an existing external file
    if os.path.isfile(process_manager):
        if urlparse(process_manager).scheme == '':
            process_manager = 'file://' + process_manager
        return process_manager

    ## Check if the file is in the list of packaged configurations
    from importlib.resources import path
    # NOTE(review): this relies on path() returning a path-like object
    # directly rather than a context manager to be entered - confirm against
    # the Python versions the project supports.
    packaged_dir = path('drunc.data.process_manager', '')
    packaged_configurations = os.listdir(packaged_dir)
    if process_manager in packaged_configurations:
        return 'file://' + str(packaged_dir) + '/' + process_manager

    from rich import print as rprint
    rprint(f"Configuration [red]{process_manager}[/red] not found, check filename spelling or use a packaged configuration as one of [green]{packaged_configurations}[/green]")
    # sys.exit instead of the site-provided exit(), which is not guaranteed
    # to exist; a non-zero status marks the failure for calling scripts.
    sys.exit(1)
13 changes: 8 additions & 5 deletions src/drunc/process_manager/interface/process_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

_cleanup_coroutines = []

def run_pm(pm_conf, log_level, ready_event=None, signal_handler=None, generated_port=None):
def run_pm(pm_conf, pm_address, log_level, ready_event=None, signal_handler=None, generated_port=None):
if signal_handler is not None:
signal_handler()

Expand Down Expand Up @@ -61,7 +61,7 @@ async def server_shutdown():
# grace period, the server won't accept new connections and allow
# existing RPCs to continue within the grace period.
await server.stop(5)
pm.terminate()
pm._terminate_impl(None)

_cleanup_coroutines.append(server_shutdown())
if ready_event is not None:
Expand All @@ -72,7 +72,7 @@ async def server_shutdown():

try:
loop.run_until_complete(
serve(pm.get_address())
serve(pm_address)
)
except Exception as e:
import os
Expand All @@ -84,6 +84,7 @@ async def server_shutdown():

@click.command()
@click.argument('pm-conf', type=str)
@click.argument('pm-port', type=int)
@click.option(
'-l',
'--log-level',
Expand All @@ -94,5 +95,7 @@ async def server_shutdown():
default='INFO',
help='Set the log level'
)
def process_manager_cli(pm_conf:str, log_level):
run_pm(pm_conf, log_level)
def process_manager_cli(pm_conf:str, pm_port:int, log_level):
    """Command-line entry point: resolve the configuration and run the process manager.

    ``pm_conf`` is passed through ``get_process_manager_configuration`` and
    the result is handed to ``run_pm`` together with the address
    ``0.0.0.0:<pm_port>`` and the requested log level.
    """
    from drunc.process_manager.configuration import get_process_manager_configuration

    resolved_conf = get_process_manager_configuration(pm_conf)
    listen_address = f'0.0.0.0:{pm_port}'
    run_pm(resolved_conf, listen_address, log_level)
4 changes: 0 additions & 4 deletions src/drunc/process_manager/process_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -482,10 +482,6 @@ def _get_process_uid(self, query:ProcessQuery, in_boot_request:bool=False) -> [s

return processes


def get_address(self):
return self.configuration.data.command_address

@staticmethod
def get(conf, **kwargs):
from rich.console import Console
Expand Down
126 changes: 64 additions & 62 deletions src/drunc/unified_shell/shell.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,86 +5,79 @@
from drunc.utils.utils import validate_command_facility
import pathlib
from drunc.process_manager.interface.cli_argument import validate_conf_string
from urllib.parse import urlparse
from rich import print as rprint
import logging

@click_shell.shell(prompt='drunc-unified-shell > ', chain=True, hist_file=os.path.expanduser('~')+'/.drunc-unified-shell.history')
@click.option('-l', '--log-level', type=click.Choice(log_levels.keys(), case_sensitive=False), default='INFO', help='Set the log level')
@click.argument('process-manager-configuration', type=str, nargs=1)
@click.argument('process-manager', type=str, nargs=1)
@click.argument('boot-configuration', type=str, nargs=1)
@click.argument('session-name', type=str, nargs=1)
@click.pass_context
def unified_shell(
ctx,
process_manager_configuration:str,
process_manager:str,
boot_configuration:str,
session_name:str,
log_level:str,
) -> None:
from drunc.utils.configuration import find_configuration
ctx.obj.boot_configuration = find_configuration(boot_configuration)
ctx.obj.session_name = session_name

# Check if process_manager_configuration is a packaged config
from urllib.parse import urlparse
import os
## Make the configuration name finding easier
if os.path.splitext(process_manager_configuration)[1] != '.json':
process_manager_configuration += '.json'
## If no scheme is provided, assume that it is an internal packaged configuration.
## First check it's not an existing external file
if os.path.isfile(process_manager_configuration):
if urlparse(process_manager_configuration).scheme == '':
process_manager_configuration = 'file://' + process_manager_configuration
else:
## Check if the file is in the list of packaged configurations
from importlib.resources import path
packaged_configurations = os.listdir(path('drunc.data.process_manager', ''))
if process_manager_configuration in packaged_configurations:
process_manager_configuration = 'file://' + str(path('drunc.data.process_manager', '')) + '/' + process_manager_configuration
else:
from rich import print as rprint
rprint(f"Configuration [red]{process_manager_configuration}[/red] not found, check filename spelling or use a packaged configuration as one of [green]{packaged_configurations}[/green]")
exit()
#from drunc.exceptions import DruncShellException
#raise DruncShellException(f"Configuration {process_manager_configuration} is not found in the package. The packaged configurations are {packaged_configurations}")

from drunc.utils.utils import update_log_level, pid_info_str, ignore_sigint_sighandler
update_log_level(log_level)
from logging import getLogger
logger = getLogger('unified_shell')
logger.debug(pid_info_str())

from drunc.process_manager.interface.process_manager import run_pm
import multiprocessing as mp
ready_event = mp.Event()
port = mp.Value('i', 0)

ctx.obj.pm_process = mp.Process(
target = run_pm,
kwargs = {
"pm_conf": process_manager_configuration,
"log_level": log_level,
"ready_event": ready_event,
"signal_handler": ignore_sigint_sighandler,
# sigint gets sent to the PM, so we need to ignore it, otherwise everytime the user ctrl-c on the shell, the PM goes down
"generated_port": port,
},
)
ctx.obj.print(f'Starting process manager with configuration {process_manager_configuration}')
ctx.obj.pm_process.start()


from time import sleep
for _ in range(100):
if ready_event.is_set():
break
sleep(0.1)
url_process_manager = urlparse(process_manager)
external_pm = True

if url_process_manager.scheme != 'grpc': # slightly hacky to see if the process manager is an address
rprint(f"Spawning a process manager with configuration [green]{process_manager}[/green]")
external_pm = False
# Check if process_manager is a packaged config
from drunc.process_manager.configuration import get_process_manager_configuration
process_manager = get_process_manager_configuration(process_manager)

from drunc.process_manager.interface.process_manager import run_pm
import multiprocessing as mp
ready_event = mp.Event()
port = mp.Value('i', 0)

ctx.obj.pm_process = mp.Process(
target = run_pm,
kwargs = {
"pm_conf": process_manager,
"pm_address": "localhost:0",
"log_level": log_level,
"ready_event": ready_event,
"signal_handler": ignore_sigint_sighandler,
# SIGINT is also delivered to the PM, so the PM must ignore it; otherwise every time the user presses Ctrl-C in the shell, the PM goes down
"generated_port": port,
},
)
ctx.obj.print(f'Starting process manager with configuration {process_manager}')
ctx.obj.pm_process.start()


from time import sleep
for _ in range(100):
if ready_event.is_set():
break
sleep(0.1)

if not ready_event.is_set():
from drunc.exceptions import DruncSetupException
raise DruncSetupException('Process manager did not start in time')

if not ready_event.is_set():
from drunc.exceptions import DruncSetupException
raise DruncSetupException('Process manager did not start in time')
import socket
process_manager_address = f'localhost:{port.value}'

import socket
process_manager_address = f'localhost:{port.value}'
else: # user provided an address
rprint(f"Connecting to process manager at [green]{process_manager}[/green]")
process_manager_address = process_manager.replace('grpc://', '') # remove the grpc scheme

ctx.obj.reset(
address_pm = process_manager_address,
Expand All @@ -100,10 +93,18 @@ def unified_shell(
desc = desc.data

except Exception as e:
ctx.obj.critical(f'Could not connect to the process manager')
if not ctx.obj.pm_process.is_alive():
ctx.obj.critical(f'Could not connect to the process manager at the address [green]{process_manager_address}[/]', extra={'markup': True})
if not external_pm and not ctx.obj.pm_process.is_alive():
ctx.obj.critical(f'The process manager is dead, exit code {ctx.obj.pm_process.exitcode}')
raise e
if logging.DEBUG == logging.root.level:
raise e
else:
exit()

from drunc.utils.configuration import find_configuration
ctx.obj.boot_configuration = find_configuration(boot_configuration)
ctx.obj.session_name = session_name


ctx.obj.info(f'{process_manager_address} is \'{desc.name}.{desc.session}\' (name.session), starting listening...')
if desc.HasField('broadcast'):
Expand All @@ -113,8 +114,9 @@ def unified_shell(

def cleanup():
ctx.obj.terminate()
ctx.obj.pm_process.terminate()
ctx.obj.pm_process.join()
if not external_pm:
ctx.obj.pm_process.terminate()
ctx.obj.pm_process.join()

ctx.call_on_close(cleanup)

Expand Down