diff --git a/paasta_tools/cli/cmds/spark_run.py b/paasta_tools/cli/cmds/spark_run.py index a7da6b4264..72ce0bf1ed 100644 --- a/paasta_tools/cli/cmds/spark_run.py +++ b/paasta_tools/cli/cmds/spark_run.py @@ -6,7 +6,6 @@ import shlex import socket import sys -import uuid from typing import Any from typing import Dict from typing import List @@ -15,7 +14,6 @@ from typing import Tuple from typing import Union -import yaml from service_configuration_lib import read_service_configuration from service_configuration_lib import read_yaml_file from service_configuration_lib import spark_config @@ -31,7 +29,9 @@ from paasta_tools.cli.utils import lazy_choices_completer from paasta_tools.cli.utils import list_instances from paasta_tools.clusterman import get_clusterman_metrics -from paasta_tools.kubernetes_tools import limit_size_with_hash +from paasta_tools.spark_tools import auto_add_timeout_for_spark_job +from paasta_tools.spark_tools import create_spark_config_str +from paasta_tools.spark_tools import DEFAULT_SPARK_RUNTIME_TIMEOUT from paasta_tools.spark_tools import DEFAULT_SPARK_SERVICE from paasta_tools.spark_tools import get_volumes_from_spark_k8s_configs from paasta_tools.spark_tools import get_webui_url @@ -39,6 +39,7 @@ from paasta_tools.utils import _run from paasta_tools.utils import DEFAULT_SOA_DIR from paasta_tools.utils import filter_templates_from_config +from paasta_tools.utils import get_k8s_url_for_cluster from paasta_tools.utils import get_possible_launched_by_user_variable_from_env from paasta_tools.utils import get_username from paasta_tools.utils import InstanceConfig @@ -50,7 +51,9 @@ from paasta_tools.utils import NoDockerImageError from paasta_tools.utils import PaastaColors from paasta_tools.utils import PaastaNotConfiguredError +from paasta_tools.utils import PoolsNotConfiguredError from paasta_tools.utils import SystemPaastaConfig +from paasta_tools.utils import validate_pool DEFAULT_AWS_REGION = "us-west-2" @@ -69,42 +72,11 @@ # Extra room for memory overhead and for any other running inside container DOCKER_RESOURCE_ADJUSTMENT_FACTOR = 2 -POD_TEMPLATE_DIR = "/nail/tmp" -POD_TEMPLATE_PATH = "/nail/tmp/spark-pt-{file_uuid}.yaml" -DEFAULT_RUNTIME_TIMEOUT = "12h" -DEFAILT_AWS_PROFILE = "default" - -POD_TEMPLATE = """ -apiVersion: v1 -kind: Pod -metadata: - labels: - spark: {spark_pod_label} -spec: - dnsPolicy: Default - affinity: - podAffinity: - preferredDuringSchedulingIgnoredDuringExecution: - - weight: 95 - podAffinityTerm: - labelSelector: - matchExpressions: - - key: spark - operator: In - values: - - {spark_pod_label} - topologyKey: topology.kubernetes.io/hostname -""" - -deprecated_opts = { +DEFAULT_AWS_PROFILE = "default" + +DEPRECATED_OPTS = { "j": "spark.jars", "jars": "spark.jars", - "max-cores": "spark.cores.max", - "executor-cores": "spark.executor.cores", - "executor-memory": "spark.executor.memory", - "driver-max-result-size": "spark.driver.maxResultSize", - "driver-cores": "spark.driver.cores", - "driver-memory": "spark.driver.memory", } SPARK_COMMANDS = {"pyspark", "spark-submit"} @@ -113,15 +85,20 @@ class DeprecatedAction(argparse.Action): + def __init__(self, option_strings, dest, nargs="?", **kwargs): + super().__init__(option_strings, dest, nargs=nargs, **kwargs) + def __call__(self, parser, namespace, values, option_string=None): print( PaastaColors.red( - "Use of {} is deprecated. Please use {}=value in --spark-args.".format( - option_string, deprecated_opts[option_string.strip("-")] + f"Use of {option_string} is deprecated. 
" + + ( + f"Please use {DEPRECATED_OPTS.get(option_string.strip('-'), '')}=value in --spark-args." + if option_string.strip("-") in DEPRECATED_OPTS + else "" ) ) ) - sys.exit(1) def add_subparser(subparsers): @@ -136,6 +113,44 @@ def add_subparser(subparsers): ), formatter_class=argparse.ArgumentDefaultsHelpFormatter, ) + # Deprecated args kept to avoid failures + # TODO: Remove these deprecated args later + list_parser.add_argument( + "--jars", + help=argparse.SUPPRESS, + action=DeprecatedAction, + ) + list_parser.add_argument( + "--executor-memory", + help=argparse.SUPPRESS, + action=DeprecatedAction, + ) + list_parser.add_argument( + "--executor-cores", + help=argparse.SUPPRESS, + action=DeprecatedAction, + ) + list_parser.add_argument( + "--max-cores", + help=argparse.SUPPRESS, + action=DeprecatedAction, + ) + list_parser.add_argument( + "-e", + "--enable-compact-bin-packing", + help=argparse.SUPPRESS, + action=DeprecatedAction, + ) + list_parser.add_argument( + "--enable-dra", + help=argparse.SUPPRESS, + action=DeprecatedAction, + ) + list_parser.add_argument( + "--force-use-eks", + help=argparse.SUPPRESS, + action=DeprecatedAction, + ) group = list_parser.add_mutually_exclusive_group() group.add_argument( @@ -150,26 +165,6 @@ def add_subparser(subparsers): "--image", help="Use the provided image to start the Spark driver and executors.", ) - - list_parser.add_argument( - "-e", - "--enable-compact-bin-packing", - help=( - "Enabling compact bin packing will try to ensure executors are scheduled on the same nodes. Requires --cluster-manager to be kubernetes." - " Always true by default, keep around for backward compability." - ), - action="store_true", - default=True, - ) - list_parser.add_argument( - "--disable-compact-bin-packing", - help=( - "Disable compact bin packing. Requires --cluster-manager to be kubernetes. Note: this option is only for advanced Spark configurations," - " don't use it unless you've been instructed to do so." - ), - action="store_true", - default=False, - ) list_parser.add_argument( "--docker-memory-limit", help=( @@ -288,9 +283,9 @@ def add_subparser(subparsers): list_parser.add_argument( "--timeout-job-runtime", type=str, - help="Timeout value which will be added before spark-submit. Job will exit if it doesn't " - "finishes in given runtime. Recommended value: 2 * expected runtime. Example: 1h, 30m {DEFAULT_RUNTIME_TIMEOUT}", - default=DEFAULT_RUNTIME_TIMEOUT, + help="Timeout value which will be added before spark-submit. Job will exit if it doesn't finish in given " + "runtime. Recommended value: 2 * expected runtime. Example: 1h, 30m", + default=DEFAULT_SPARK_RUNTIME_TIMEOUT, ) list_parser.add_argument( @@ -303,8 +298,9 @@ def add_subparser(subparsers): list_parser.add_argument( "--spark-args", - help="Spark configurations documented in https://spark.apache.org/docs/latest/configuration.html, separated by space. " - 'For example, --spark-args "spark.executor.cores=1 spark.executor.memory=7g spark.executor.instances=2".', + help="Spark configurations documented in https://spark.apache.org/docs/latest/configuration.html, " + 'separated by space. For example, --spark-args "spark.executor.cores=1 spark.executor.memory=7g ' + 'spark.executor.instances=2".', ) list_parser.add_argument( @@ -329,16 +325,6 @@ def add_subparser(subparsers): default=CLUSTER_MANAGER_K8S, ) - list_parser.add_argument( - "--enable-dra", - help=( - "[DEPRECATED] Enable Dynamic Resource Allocation (DRA) for the Spark job as documented in (y/spark-dra)." - "DRA is enabled by default now. 
This config is a no-op operation and recommended to be removed." - ), - action="store_true", - default=False, - ) - list_parser.add_argument( "--tronfig", help="Load the Tron config yaml. Use with --job-id.", @@ -353,54 +339,6 @@ def add_subparser(subparsers): default=None, ) - k8s_target_cluster_type_group = list_parser.add_mutually_exclusive_group() - k8s_target_cluster_type_group.add_argument( - "--force-use-eks", - help="Use the EKS version of the target cluster rather than the Yelp-managed target cluster", - action="store_true", - dest="use_eks_override", - # We'll take a boolean value to mean that we should honor what the user wants, and None as using - # the CI-provided default - default=None, - ) - k8s_target_cluster_type_group.add_argument( - "--force-no-use-eks", - help="Use the Yelp-managed version of the target cluster rather than the AWS-managed EKS target cluster", - action="store_false", - dest="use_eks_override", - # We'll take a boolean value to mean that we should honor what the user wants, and None as using - # the CI-provided default - default=None, - ) - - list_parser.add_argument( - "-j", "--jars", help=argparse.SUPPRESS, action=DeprecatedAction - ) - - list_parser.add_argument( - "--executor-memory", help=argparse.SUPPRESS, action=DeprecatedAction - ) - - list_parser.add_argument( - "--executor-cores", help=argparse.SUPPRESS, action=DeprecatedAction - ) - - list_parser.add_argument( - "--max-cores", help=argparse.SUPPRESS, action=DeprecatedAction - ) - - list_parser.add_argument( - "--driver-max-result-size", help=argparse.SUPPRESS, action=DeprecatedAction - ) - - list_parser.add_argument( - "--driver-memory", help=argparse.SUPPRESS, action=DeprecatedAction - ) - - list_parser.add_argument( - "--driver-cores", help=argparse.SUPPRESS, action=DeprecatedAction - ) - aws_group = list_parser.add_argument_group( title="AWS credentials options", description="If --aws-credentials-yaml is specified, it overrides all " @@ -423,15 +361,7 @@ def add_subparser(subparsers): "--aws-credentials-yaml is not specified and --service is either " "not specified or the service does not have credentials in " "/etc/boto_cfg", - default=DEFAILT_AWS_PROFILE, - ) - - aws_group.add_argument( - "--no-aws-credentials", - help="Do not load any AWS credentials; allow the Spark job to use its " - "own logic to load credentials", - action="store_true", - default=False, + default=DEFAULT_AWS_PROFILE, ) aws_group.add_argument( @@ -481,51 +411,11 @@ def add_subparser(subparsers): list_parser.set_defaults(command=paasta_spark_run) -def decide_final_eks_toggle_state(user_override: Optional[bool]) -> bool: - """ - This is slightly weird (hooray for tri-value logic!) - but basically: - we want to prioritize any user choice for using EKS (i.e., force - enable/disable using EKS) regardless of what the PaaSTA-supplied - default is. - - If a user hasn't set --force-use-eks or --force-no-use-eks, argparse - will leave args.use_eks_override (or, in this function, user_override) as None - - otherwise, there'll be an actual boolean there. 
- """ - if user_override is not None: - return user_override - - return load_system_paasta_config().get_spark_use_eks_default() - - def sanitize_container_name(container_name): # container_name only allows [a-zA-Z0-9][a-zA-Z0-9_.-] return re.sub("[^a-zA-Z0-9_.-]", "_", re.sub("^[^a-zA-Z0-9]+", "", container_name)) -def generate_pod_template_path(): - return POD_TEMPLATE_PATH.format(file_uuid=uuid.uuid4().hex) - - -def should_enable_compact_bin_packing(disable_compact_bin_packing, cluster_manager): - if disable_compact_bin_packing: - return False - - if cluster_manager != CLUSTER_MANAGER_K8S: - log.warn( - "enable_compact_bin_packing=True ignored as cluster manager is not kubernetes" - ) - return False - - if not os.access(POD_TEMPLATE_DIR, os.W_OK): - log.warn( - f"enable_compact_bin_packing=True ignored as {POD_TEMPLATE_DIR} is not usable" - ) - return False - - return True - - def get_docker_run_cmd( container_name, volumes, @@ -699,17 +589,13 @@ def get_spark_env( spark_env["SPARK_DAEMON_CLASSPATH"] = "/opt/spark/extra_jars/*" spark_env["SPARK_NO_DAEMONIZE"] = "true" - if decide_final_eks_toggle_state(args.use_eks_override): - spark_env["KUBECONFIG"] = system_paasta_config.get_spark_kubeconfig() + spark_env["KUBECONFIG"] = system_paasta_config.get_spark_kubeconfig() return spark_env def _parse_user_spark_args( - spark_args: Optional[str], - pod_template_path: str, - enable_compact_bin_packing: bool = False, - enable_spark_dra: bool = False, + spark_args: str, ) -> Dict[str, str]: user_spark_opts = {} @@ -726,48 +612,9 @@ def _parse_user_spark_args( sys.exit(1) user_spark_opts[fields[0]] = fields[1] - if enable_compact_bin_packing: - user_spark_opts["spark.kubernetes.executor.podTemplateFile"] = pod_template_path - - if enable_spark_dra: - if ( - "spark.dynamicAllocation.enabled" in user_spark_opts - and user_spark_opts["spark.dynamicAllocation.enabled"] == "false" - ): - print( - PaastaColors.red( - "Error: --enable-dra flag is provided while spark.dynamicAllocation.enabled " - "is explicitly set to false in --spark-args. If you want to enable DRA, please remove the " - "spark.dynamicAllocation.enabled=false config from spark-args. If you don't want to " - "enable DRA, please remove the --enable-dra flag." 
- ), - file=sys.stderr, - ) - sys.exit(1) - user_spark_opts["spark.dynamicAllocation.enabled"] = "true" - return user_spark_opts -def create_spark_config_str(spark_config_dict, is_mrjob): - conf_option = "--jobconf" if is_mrjob else "--conf" - spark_config_entries = list() - - if is_mrjob: - spark_master = spark_config_dict["spark.master"] - spark_config_entries.append(f"--spark-master={spark_master}") - - for opt, val in spark_config_dict.items(): - # mrjob use separate options to configure master - if is_mrjob and opt == "spark.master": - continue - # Process Spark configs with multiple space separated values to be in single quotes - if isinstance(val, str) and " " in val: - val = f"'{val}'" - spark_config_entries.append(f"{conf_option} {opt}={val}") - return " ".join(spark_config_entries) - - def run_docker_container( container_name, volumes, @@ -891,7 +738,7 @@ def configure_and_run_docker_container( docker_img: str, instance_config: InstanceConfig, system_paasta_config: SystemPaastaConfig, - spark_conf: Mapping[str, str], + spark_conf: Dict[str, str], aws_creds: Tuple[Optional[str], Optional[str], Optional[str]], cluster_manager: str, pod_template_path: str, @@ -916,13 +763,11 @@ def configure_and_run_docker_container( volumes.append("%s:rw" % args.work_dir) volumes.append("/nail/home:/nail/home:rw") - if args.enable_compact_bin_packing: - volumes.append(f"{pod_template_path}:{pod_template_path}:rw") + volumes.append(f"{pod_template_path}:{pod_template_path}:rw") - if decide_final_eks_toggle_state(args.use_eks_override): - volumes.append( - f"{system_paasta_config.get_spark_kubeconfig()}:{system_paasta_config.get_spark_kubeconfig()}:ro" - ) + volumes.append( + f"{system_paasta_config.get_spark_kubeconfig()}:{system_paasta_config.get_spark_kubeconfig()}:ro" + ) environment = instance_config.get_env_dictionary() # type: ignore spark_conf_str = create_spark_config_str(spark_conf, is_mrjob=args.mrjob) @@ -1129,82 +974,6 @@ def validate_work_dir(s): sys.exit(1) -def _auto_add_timeout_for_job(cmd, timeout_job_runtime): - # Timeout only to be added for spark-submit commands - # TODO: Add timeout for jobs using mrjob with spark-runner - if "spark-submit" not in cmd: - return cmd - try: - timeout_present = re.match( - r"^.*timeout[\s]+[\d]+[\.]?[\d]*[m|h][\s]+spark-submit .*$", cmd - ) - if not timeout_present: - split_cmd = cmd.split("spark-submit") - cmd = f"{split_cmd[0]}timeout {timeout_job_runtime} spark-submit{split_cmd[1]}" - print( - PaastaColors.blue( - f"NOTE: Job will exit in given time {timeout_job_runtime}. " - f"Adjust timeout value using --timeout-job-timeout. " - f"New Updated Command with timeout: {cmd}" - ), - ) - except Exception as e: - err_msg = ( - f"'timeout' could not be added to command: '{cmd}' due to error '{e}'. " - "Please report to #spark." - ) - log.warn(err_msg) - print(PaastaColors.red(err_msg)) - return cmd - - -def _validate_pool(args, system_paasta_config): - if args.pool: - valid_pools = system_paasta_config.get_cluster_pools().get(args.cluster, []) - if not valid_pools: - log.warning( - PaastaColors.yellow( - f"Could not fetch allowed_pools for `{args.cluster}`. Skipping pool validation.\n" - ) - ) - if valid_pools and args.pool not in valid_pools: - print( - PaastaColors.red( - f"Invalid --pool value. 
List of valid pools for cluster `{args.cluster}`: " - f"{valid_pools}" - ), - file=sys.stderr, - ) - return False - return True - - -def _get_k8s_url_for_cluster(cluster: str) -> Optional[str]: - """ - Annoyingly, there's two layers of aliases: one to figure out what - k8s server url to use (this one) and another to figure out what - soaconfigs filename to use ;_; - - This exists so that we can map something like `--cluster pnw-devc` - into spark-pnw-devc's k8s apiserver url without needing to update - any soaconfigs/alter folk's muscle memory. - - Ideally we can get rid of this entirely once spark-run reads soaconfigs - in a manner more closely aligned to what we do with other paasta workloads - (i.e., have it automatically determine where to run based on soaconfigs - filenames - and not rely on explicit config) - """ - realized_cluster = ( - load_system_paasta_config().get_eks_cluster_aliases().get(cluster, cluster) - ) - return ( - load_system_paasta_config() - .get_kube_clusters() - .get(realized_cluster, {}) - .get("server") - ) - - def parse_tronfig(tronfig_path: str, job_id: str) -> Optional[Dict[str, Any]]: splitted = job_id.split(".") if len(splitted) != 2: @@ -1225,6 +994,8 @@ def update_args_from_tronfig(args: argparse.Namespace) -> Optional[Dict[str, str - pool - iam_role - iam_role_provider + - force_spark_resource_configs + - max_runtime - command - env - spark_args @@ -1255,10 +1026,12 @@ def update_args_from_tronfig(args: argparse.Namespace) -> Optional[Dict[str, str ) return None - # Other args + # Other args: map Tronfig YAML fields to spark-run CLI args fields_to_args = { "pool": "pool", "iam_role": "assume_aws_role", + "force_spark_resource_configs": "force_spark_resource_configs", + "max_runtime": "timeout_job_runtime", "command": "cmd", "spark_args": "spark_args", } @@ -1271,11 +1044,11 @@ def update_args_from_tronfig(args: argparse.Namespace) -> Optional[Dict[str, str value = " ".join([f"{k}={v}" for k, v in dict(value).items()]) # Befutify for printing - arg_name_str = (f"--{arg_name.replace('_', '-')}").ljust(20, " ") - field_name_str = field_name.ljust(12) + arg_name_str = (f"--{arg_name.replace('_', '-')}").ljust(31, " ") + field_name_str = field_name.ljust(28) # Only load iam_role value if --aws-profile is not set - if field_name == "iam_role" and args.aws_profile != DEFAILT_AWS_PROFILE: + if field_name == "iam_role" and args.aws_profile != DEFAULT_AWS_PROFILE: print( PaastaColors.yellow( f"Overwriting args with Tronfig: {arg_name_str} => {field_name_str} : IGNORE, " @@ -1336,8 +1109,22 @@ def paasta_spark_run(args: argparse.Namespace) -> int: return 1 # validate pool - if not _validate_pool(args, system_paasta_config): - return 1 + try: + if not validate_pool(args.cluster, args.pool, system_paasta_config): + print( + PaastaColors.red( + f"Invalid --pool value. List of valid pools for cluster `{args.cluster}`: " + f"{system_paasta_config.get_pools_for_cluster(args.cluster)}" + ), + file=sys.stderr, + ) + return 1 + except PoolsNotConfiguredError: + log.warning( + PaastaColors.yellow( + f"Could not fetch allowed_pools for `{args.cluster}`. 
Skipping pool validation.\n" + ) + ) # annoyingly, there's two layers of aliases: one for the soaconfigs to read from # (that's this alias lookup) - and then another layer later when figuring out what @@ -1377,7 +1164,6 @@ def paasta_spark_run(args: argparse.Namespace) -> int: aws_creds = get_aws_credentials( service=args.service, - no_aws_credentials=args.no_aws_credentials, aws_credentials_yaml=args.aws_credentials_yaml, profile_name=args.aws_profile, assume_aws_role_arn=args.assume_aws_role, @@ -1387,30 +1173,12 @@ def paasta_spark_run(args: argparse.Namespace) -> int: if docker_image_digest is None: return 1 - pod_template_path = generate_pod_template_path() - args.enable_compact_bin_packing = should_enable_compact_bin_packing( - args.disable_compact_bin_packing, args.cluster_manager - ) - volumes = instance_config.get_volumes(system_paasta_config.get_volumes()) app_base_name = get_spark_app_name(args.cmd or instance_config.get_cmd()) - if args.enable_compact_bin_packing: - document = POD_TEMPLATE.format( - spark_pod_label=limit_size_with_hash(f"exec-{app_base_name}"), - ) - parsed_pod_template = yaml.safe_load(document) - with open(pod_template_path, "w") as f: - yaml.dump(parsed_pod_template, f) - - user_spark_opts = _parse_user_spark_args( - args.spark_args, - pod_template_path, - args.enable_compact_bin_packing, - args.enable_dra, - ) + user_spark_opts = _parse_user_spark_args(args.spark_args) - args.cmd = _auto_add_timeout_for_job(args.cmd, args.timeout_job_runtime) + args.cmd = auto_add_timeout_for_spark_job(args.cmd, args.timeout_job_runtime) # This is required if configs are provided as part of `spark-submit` # Other way to provide is with --spark-args @@ -1424,15 +1192,11 @@ def paasta_spark_run(args: argparse.Namespace) -> int: paasta_instance = get_smart_paasta_instance_name(args) - use_eks = decide_final_eks_toggle_state(args.use_eks_override) - k8s_server_address = _get_k8s_url_for_cluster(args.cluster) if use_eks else None - paasta_cluster = ( - args.cluster - if not use_eks - else load_system_paasta_config() - .get_eks_cluster_aliases() - .get(args.cluster, args.cluster) + k8s_server_address = get_k8s_url_for_cluster(args.cluster) + paasta_cluster = system_paasta_config.get_eks_cluster_aliases().get( + args.cluster, args.cluster ) + spark_conf_builder = spark_config.SparkConfBuilder() spark_conf = spark_conf_builder.get_spark_conf( cluster_manager=args.cluster_manager, @@ -1447,14 +1211,9 @@ def paasta_spark_run(args: argparse.Namespace) -> int: aws_creds=aws_creds, aws_region=args.aws_region, force_spark_resource_configs=args.force_spark_resource_configs, - use_eks=use_eks, + use_eks=True, k8s_server_address=k8s_server_address, ) - # TODO: Remove this after MLCOMPUTE-699 is complete - if "spark.kubernetes.decommission.script" not in spark_conf: - spark_conf[ - "spark.kubernetes.decommission.script" - ] = "/opt/spark/kubernetes/dockerfiles/spark/decom.sh" return configure_and_run_docker_container( args, @@ -1464,6 +1223,8 @@ def paasta_spark_run(args: argparse.Namespace) -> int: spark_conf=spark_conf, aws_creds=aws_creds, cluster_manager=args.cluster_manager, - pod_template_path=pod_template_path, + pod_template_path=spark_conf.get( + "spark.kubernetes.executor.podTemplateFile", "" + ), extra_driver_envs=driver_envs_from_tronfig, ) diff --git a/paasta_tools/cli/schemas/tron_schema.json b/paasta_tools/cli/schemas/tron_schema.json index 3f1ad3095c..b91c22786e 100644 --- a/paasta_tools/cli/schemas/tron_schema.json +++ b/paasta_tools/cli/schemas/tron_schema.json @@ -316,6 
+316,15 @@ "type": "string", "pattern": "[-a-zA-z0-9_]" }, + "max_runtime": { + "$ref": "#definitions/time_delta" + }, + "force_spark_resource_configs": { + "type": "boolean" + }, + "mrjob": { + "type": "boolean" + }, "spark_args": { "type": "object", "additionalProperties": true, diff --git a/paasta_tools/kubernetes_tools.py b/paasta_tools/kubernetes_tools.py index aa5cbf3bb8..ee8d43afa6 100644 --- a/paasta_tools/kubernetes_tools.py +++ b/paasta_tools/kubernetes_tools.py @@ -4008,6 +4008,7 @@ def create_or_find_service_account_name( iam_role: str, namespace: str, k8s_role: Optional[str] = None, + kubeconfig_file: Optional[str] = None, dry_run: bool = False, ) -> str: # the service account is expected to always be prefixed with paasta- as using the actual namespace @@ -4039,7 +4040,7 @@ def create_or_find_service_account_name( if dry_run: return sa_name - kube_client = KubeClient() + kube_client = KubeClient(config_file=kubeconfig_file) if not any( sa.metadata and sa.metadata.name == sa_name for sa in get_all_service_accounts(kube_client, namespace) diff --git a/paasta_tools/spark_tools.py b/paasta_tools/spark_tools.py index b1f8df744f..2bc7ade7ae 100644 --- a/paasta_tools/spark_tools.py +++ b/paasta_tools/spark_tools.py @@ -1,26 +1,28 @@ -import copy import logging import re import socket import sys -from functools import lru_cache +from typing import Any from typing import cast from typing import Dict from typing import List from typing import Mapping -from typing import Optional from typing import Set -import yaml from mypy_extensions import TypedDict -from service_configuration_lib import spark_config -from service_configuration_lib.spark_config import DEFAULT_SPARK_RUN_CONFIG from paasta_tools.utils import DockerVolume -from paasta_tools.utils import get_runtimeenv +from paasta_tools.utils import PaastaColors KUBERNETES_NAMESPACE = "paasta-spark" DEFAULT_SPARK_SERVICE = "spark" +DEFAULT_SPARK_RUNTIME_TIMEOUT = "12h" +SPARK_AWS_CREDS_PROVIDER = "com.amazonaws.auth.WebIdentityTokenCredentialsProvider" +SPARK_EXECUTOR_NAMESPACE = "paasta-spark" +SPARK_DRIVER_POOL = "stable" +SPARK_JOB_USER = "TRON" +SPARK_PROMETHEUS_SHARD = "ml-compute" +SPARK_DNS_POD_TEMPLATE = "/nail/srv/configs/spark_dns_pod_template.yaml" log = logging.getLogger(__name__) @@ -52,115 +54,10 @@ class SparkEnvironmentConfig(TypedDict): ) -@lru_cache(maxsize=1) -def get_default_spark_configuration() -> Optional[SparkEnvironmentConfig]: - """ - Read the globally distributed Spark configuration file and return the contents as a dictionary. - - At the time this comment was written, the only bit of information that we care about from this file - is the default event log location on S3. See the TypedDict of the retval for the bits that we care about - """ - try: - with open(DEFAULT_SPARK_RUN_CONFIG, mode="r") as f: - return yaml.safe_load(f.read()) - except OSError: - log.error( - f"Unable to open {DEFAULT_SPARK_RUN_CONFIG} and get default configuration values!" - ) - except yaml.YAMLError: - log.error( - f"Unable to parse {DEFAULT_SPARK_RUN_CONFIG} and get default configuration values!" - ) - - return None - - def get_webui_url(port: str) -> str: return f"http://{socket.getfqdn()}:{port}" -def setup_event_log_configuration(spark_args: Dict[str, str]) -> Dict[str, str]: - """ - Adjusts user settings to provide a default event log storage path if event logging is - enabled but not configured. - - If event logging is not enabled or is fully configured, this function will functionally noop. 
- """ - # don't enable event logging if explicitly disabled - if spark_args.get("spark.eventLog.enabled", "true") != "true": - # Note: we provide an empty dict as our return value as the expected - # usage of this function is something like CONF.update(setup_event_log_configuration(...)) - # in in this case, we don't want to update the existing config - return {} - - # user set an explicit event log location - there's nothing else for us to - # do here - if spark_args.get("spark.eventLog.dir") is not None: - # so, same as above, we return an empty dict so that there are no updates - return {} - - default_spark_conf = get_default_spark_configuration() - if default_spark_conf is None: - log.error( - "Unable to access default Spark configuration, event log will be disabled" - ) - # Note: we don't return an empty dict here since we want to make sure that our - # caller will overwrite the enabled option with our return value (see the first - # `if` block in this function for more details) - return {"spark.eventLog.enabled": "false"} - - environment_config = default_spark_conf.get("environments", {}).get( - get_runtimeenv() - ) - if environment_config is None: - log.error( - f"{get_runtimeenv()} not found in {DEFAULT_SPARK_RUN_CONFIG}, event log will be disabled" - ) - return {"spark.eventLog.enabled": "false"} - - return { - "spark.eventLog.enabled": "true", - "spark.eventLog.dir": environment_config["default_event_log_dir"], - } - - -def adjust_spark_resources( - spark_args: Dict[str, str], desired_pool: str -) -> Dict[str, str]: - """ - Wrapper around _adjust_spark_requested_resources from service_configuration_lib. - - We have some code that will do some QoL translations from Mesos->K8s arguments as well - as set some more Yelpy defaults than what Spark uses. - """ - # TODO: would be nice if _adjust_spark_requested_resources only returned the stuff it - # modified - spark_conf_builder = spark_config.SparkConfBuilder() - return spark_conf_builder._adjust_spark_requested_resources( - # additionally, _adjust_spark_requested_resources modifies the dict you pass in - # so we make a copy to make things less confusing - consider dropping the - # service_configuration_lib dependency here so that we can do things in a slightly - # cleaner way - user_spark_opts=copy.copy(spark_args), - cluster_manager="kubernetes", - pool=desired_pool, - ) - - -def setup_shuffle_partitions(spark_args: Dict[str, str]) -> Dict[str, str]: - """ - Wrapper around _append_sql_partitions_conf from service_configuration_lib. - - For now, this really just sets a default number of partitions based on # of cores. 
- """ - # as above, this function also returns everything + mutates the passed in dictionary - # which is not ideal - spark_conf_builder = spark_config.SparkConfBuilder() - return spark_conf_builder._append_sql_partitions_conf( - spark_opts=copy.copy(spark_args), - ) - - def get_volumes_from_spark_mesos_configs(spark_conf: Mapping[str, str]) -> List[str]: return ( spark_conf.get("spark.mesos.executor.docker.volumes", "").split(",") @@ -253,10 +150,100 @@ def setup_volume_mounts(volumes: List[DockerVolume]) -> Dict[str, str]: return conf -def inject_spark_conf_str(original_docker_cmd: str, spark_conf_str: str) -> str: +def create_spark_config_str(spark_config_dict: Dict[str, Any], is_mrjob: bool) -> str: + conf_option = "--jobconf" if is_mrjob else "--conf" + spark_config_entries = list() + + if is_mrjob: + spark_master = spark_config_dict["spark.master"] + spark_config_entries.append(f"--spark-master={spark_master}") + spark_config_dict.pop("spark.master", None) + + for opt, val in spark_config_dict.items(): + # Process Spark configs with multiple space separated values to be in single quotes + if isinstance(val, str) and " " in val: + val = f"'{val}'" + spark_config_entries.append(f"{conf_option} {opt}={val}") + return " ".join(spark_config_entries) + + +def inject_spark_conf_str(original_cmd: str, spark_conf_str: str) -> str: for base_cmd in ("pyspark", "spark-shell", "spark-submit"): - if base_cmd in original_docker_cmd: - return original_docker_cmd.replace( - base_cmd, base_cmd + " " + spark_conf_str, 1 + if base_cmd in original_cmd: + return original_cmd.replace(base_cmd, base_cmd + " " + spark_conf_str, 1) + return original_cmd + + +def auto_add_timeout_for_spark_job(cmd: str, timeout_job_runtime: str) -> str: + # Timeout only to be added for spark-submit commands + # TODO: Add timeout for jobs using mrjob with spark-runner + if "spark-submit" not in cmd: + return cmd + try: + timeout_present = re.match( + r"^.*timeout[\s]+[\d]+[\.]?[\d]*[m|h][\s]+spark-submit .*$", cmd + ) + if not timeout_present: + split_cmd = cmd.split("spark-submit") + # split_cmd[0] will always be an empty string or end with a space + cmd = f"{split_cmd[0]}timeout {timeout_job_runtime} spark-submit{split_cmd[1]}" + log.info( + PaastaColors.blue( + f"NOTE: Job will exit in given time {timeout_job_runtime}. " + f"Adjust timeout value using --timeout-job-timeout. " + f"New Updated Command with timeout: {cmd}" + ), ) - return original_docker_cmd + except Exception as e: + err_msg = ( + f"'timeout' could not be added to command: '{cmd}' due to error '{e}'. " + "Please report to #spark." + ) + log.warn(err_msg) + print(PaastaColors.red(err_msg)) + return cmd + + +def build_spark_command( + original_cmd: str, + spark_config_dict: Dict[str, Any], + is_mrjob: bool, + timeout_job_runtime: str, +) -> str: + command = f"{inject_spark_conf_str(original_cmd, create_spark_config_str(spark_config_dict, is_mrjob=is_mrjob))}" + return auto_add_timeout_for_spark_job(command, timeout_job_runtime) + + +def get_spark_ports_from_config(spark_conf: Dict[str, str]) -> List[int]: + ports = [int(v) for k, v in spark_conf.items() if k.endswith(".port")] + return ports + + +# TODO: Reuse by ad-hoc Spark-driver-on-k8s +def get_spark_driver_monitoring_annotations( + spark_config: Dict[str, str], +) -> Dict[str, str]: + """ + Returns Spark driver pod annotations - currently used for Prometheus metadata. 
+ """ + ui_port_str = str(spark_config.get("spark.ui.port", "")) + annotations = { + "prometheus.io/port": ui_port_str, + "prometheus.io/path": "/metrics/prometheus", + } + return annotations + + +def get_spark_driver_monitoring_labels( + spark_config: Dict[str, str], +) -> Dict[str, str]: + """ + Returns Spark driver pod labels - generally for Prometheus metric relabeling. + """ + ui_port_str = str(spark_config.get("spark.ui.port", "")) + labels = { + "paasta.yelp.com/prometheus_shard": SPARK_PROMETHEUS_SHARD, + "spark.yelp.com/user": SPARK_JOB_USER, + "spark.yelp.com/driver_ui_port": ui_port_str, + } + return labels diff --git a/paasta_tools/tron_tools.py b/paasta_tools/tron_tools.py index d550974150..1516a14e56 100644 --- a/paasta_tools/tron_tools.py +++ b/paasta_tools/tron_tools.py @@ -29,8 +29,7 @@ from mypy_extensions import TypedDict from service_configuration_lib import read_extra_service_information from service_configuration_lib import read_yaml_file -from service_configuration_lib.spark_config import _filter_user_spark_opts -from service_configuration_lib.spark_config import stringify_spark_env +from service_configuration_lib.spark_config import SparkConfBuilder from paasta_tools.mesos_tools import mesos_services_running_here @@ -53,6 +52,13 @@ from paasta_tools.utils import time_cache from paasta_tools.utils import filter_templates_from_config from paasta_tools.utils import TronSecretVolume +from paasta_tools.utils import get_k8s_url_for_cluster +from paasta_tools.utils import validate_pool +from paasta_tools.utils import PoolsNotConfiguredError +from paasta_tools.utils import DockerVolume + +from paasta_tools import spark_tools + from paasta_tools.kubernetes_tools import ( allowlist_denylist_to_requirements, create_or_find_service_account_name, @@ -60,20 +66,12 @@ raw_selectors_to_requirements, to_node_label, ) -from paasta_tools.spark_tools import ( - adjust_spark_resources, - inject_spark_conf_str, - setup_event_log_configuration, - setup_shuffle_partitions, - setup_volume_mounts, -) from paasta_tools.secret_tools import is_secret_ref from paasta_tools.secret_tools import is_shared_secret from paasta_tools.secret_tools import is_shared_secret_from_secret_name from paasta_tools.secret_tools import get_secret_name_from_ref from paasta_tools.kubernetes_tools import get_paasta_secret_name from paasta_tools.secret_tools import SHARED_SECRET_SERVICE -from paasta_tools.spark_tools import KUBERNETES_NAMESPACE as SPARK_KUBERNETES_NAMESPACE from paasta_tools import monitoring_tools from paasta_tools.monitoring_tools import list_teams @@ -98,11 +96,11 @@ DEFAULT_AWS_REGION = "us-west-2" EXECUTOR_TYPE_TO_NAMESPACE = { "paasta": "tron", - "spark": "paasta-spark", + "spark": "tron", } DEFAULT_TZ = "US/Pacific" clusterman_metrics, _ = get_clusterman_metrics() -EXECUTOR_TYPES = ["paasta", "ssh"] +EXECUTOR_TYPES = ["paasta", "ssh", "spark"] class FieldSelectorConfig(TypedDict): @@ -117,6 +115,10 @@ class InvalidTronConfig(Exception): pass +class InvalidPoolError(Exception): + pass + + class TronConfig(dict): """System-level configuration for Tron.""" @@ -240,24 +242,20 @@ def _use_suffixed_log_streams_k8s() -> bool: return load_system_paasta_config().get_tron_k8s_use_suffixed_log_streams_k8s() -def _get_spark_ports(system_paasta_config: SystemPaastaConfig) -> Dict[str, int]: - return { - "spark.ui.port": system_paasta_config.get_spark_ui_port(), - "spark.driver.port": system_paasta_config.get_spark_driver_port(), - "spark.blockManager.port": 
system_paasta_config.get_spark_blockmanager_port(), - "spark.driver.blockManager.port": system_paasta_config.get_spark_blockmanager_port(), - } - - class TronActionConfigDict(InstanceConfigDict, total=False): # this is kinda confusing: long-running stuff is currently using cmd # ...but tron are using command - this is going to require a little # maneuvering to unify command: str + service_account_name: str + # the values for this dict can be anything since it's whatever # spark accepts spark_args: Dict[str, Any] - service_account_name: str + force_spark_resource_configs: bool + # TODO: TRON-2145: use this to implement timeout for non-spark actions in tron + max_runtime: str + mrjob: bool class TronActionConfig(InstanceConfig): @@ -286,123 +284,91 @@ def __init__( # Indicate whether this config object is created for validation self.for_validation = for_validation - def _build_spark_config(self) -> Dict[str, str]: - # this service account will either be used solely for k8s access or for - # k8s + AWS access - the executor will have its own account though: and - # only if users opt into using pod identity - driver_service_account = create_or_find_service_account_name( - iam_role=self.get_iam_role(), - namespace=EXECUTOR_TYPE_TO_NAMESPACE[self.get_executor()], - k8s_role=_spark_k8s_role(), - dry_run=self.for_validation, - ) + def build_spark_config(self) -> Dict[str, str]: - # this is pretty similar to what's done in service_configuration_lib, but it doesn't seem - # worth refactoring things there when it's unlikely that other users of that library will - # have the k8s creds to deal with service account creation (or the role bindings for the - # driver service account as done above) - spark_args = self.config_dict.get("spark_args", {}) - truncated_service = limit_size_with_hash(self.get_service()) - truncated_instance = limit_size_with_hash(self.get_instance()) system_paasta_config = load_system_paasta_config() - conf = { - "spark.app.name": spark_args.get( - "spark.app.name", - f"tron_spark_{self.get_service()}_{self.get_instance()}", - ), - # TODO: figure out how to handle dedicated spark cluster here - "spark.master": f"k8s://https://k8s.{self.get_cluster()}.paasta:6443", - # TODO: add PAASTA_RESOURCE_* environment variables here - "spark.executorEnv.PAASTA_SERVICE": self.get_service(), - "spark.executorEnv.PAASTA_INSTANCE": self.get_instance(), - "spark.executorEnv.PAASTA_CLUSTER": self.get_cluster(), - "spark.executorEnv.PAASTA_INSTANCE_TYPE": "spark", - "spark.executorEnv.SPARK_EXECUTOR_DIRS": "/tmp", - # XXX: do we need to set this for the driver if the pod running the driver was created with - # this SA already? 
- "spark.kubernetes.authenticate.driver.serviceAccountName": driver_service_account, - "spark.kubernetes.pyspark.pythonVersion": "3", - "spark.kubernetes.container.image": self.get_docker_url( - system_paasta_config - ), - "spark.kubernetes.namespace": SPARK_KUBERNETES_NAMESPACE, - "spark.kubernetes.executor.label.yelp.com/paasta_service": truncated_service, - "spark.kubernetes.executor.label.yelp.com/paasta_instance": truncated_instance, - "spark.kubernetes.executor.label.yelp.com/paasta_cluster": self.get_cluster(), - "spark.kubernetes.executor.label.paasta.yelp.com/service": truncated_service, - "spark.kubernetes.executor.label.paasta.yelp.com/instance": truncated_instance, - "spark.kubernetes.executor.label.paasta.yelp.com/cluster": self.get_cluster(), - "spark.kubernetes.node.selector.yelp.com/pool": self.get_pool(), - "spark.kubernetes.executor.label.yelp.com/pool": self.get_pool(), - # this relies on the PaaSTA workload contract being followed - $PAASTA_POD_IP - # will be, drumroll please, the routable IP for the Pod - this allows us to - # not need to worry about DNS - "spark.driver.host": "$PAASTA_POD_IP", - } + resolved_cluster = system_paasta_config.get_eks_cluster_aliases().get( + self.get_cluster(), self.get_cluster() + ) + try: + if not validate_pool( + resolved_cluster, self.get_spark_executor_pool(), system_paasta_config + ): + raise InvalidPoolError + except PoolsNotConfiguredError: + log.warning( + f"Could not fetch allowed_pools for `{resolved_cluster}`. Skipping pool validation.\n" + ) + spark_args = self.config_dict.get("spark_args", {}) # most of the service_configuration_lib function expected string values only # so let's go ahead and convert the values now instead of once per-wrapper stringified_spark_args = { k: (str(v) if not isinstance(v, bool) else str(v).lower()) for k, v in spark_args.items() } - # now that we've added all the required stuff, we can add in all the stuff that users have added - # themselves - conf.update(_filter_user_spark_opts(user_spark_opts=stringified_spark_args)) - - if self.get_team() is not None: - conf["spark.kubernetes.executor.label.yelp.com/owner"] = self.get_team() - - conf.update(_get_spark_ports(system_paasta_config=system_paasta_config)) - - # Spark defaults to using the Service Account that the driver uses for executors, - # but that has more permissions than what we'd like to give the executors, so use - # the default service account instead - conf["spark.kubernetes.authenticate.executor.serviceAccountName"] = "default" - if self.get_iam_role() and self.get_iam_role_provider() == "aws": - conf[ - "spark.kubernetes.authenticate.executor.serviceAccountName" - ] = create_or_find_service_account_name( - iam_role=self.get_iam_role(), - namespace=EXECUTOR_TYPE_TO_NAMESPACE[self.get_executor()], - dry_run=self.for_validation, - ) - conf.update( - setup_volume_mounts( - volumes=self.get_volumes( - system_volumes=system_paasta_config.get_volumes(), - ), - ), + spark_app_name = stringified_spark_args.get( + "spark.app.name", + f"tron_spark_{self.get_service()}_{self.get_instance()}", ) - # Core ML has some translation logic to go from Mesos->k8s options for people living in the past - conf.update( - adjust_spark_resources( - spark_args=stringified_spark_args, desired_pool=self.get_pool() + docker_img_url = self.get_docker_url(system_paasta_config) + + spark_conf_builder = SparkConfBuilder() + spark_conf = spark_conf_builder.get_spark_conf( + cluster_manager="kubernetes", + spark_app_base_name=spark_app_name, + 
user_spark_opts=stringified_spark_args, + paasta_cluster=resolved_cluster, + paasta_pool=self.get_spark_executor_pool(), + paasta_service=self.get_service(), + paasta_instance=self.get_instance(), + docker_img=docker_img_url, + extra_volumes=self.get_volumes(system_paasta_config.get_volumes()), + use_eks=True, + k8s_server_address=get_k8s_url_for_cluster(self.get_cluster()), + force_spark_resource_configs=self.config_dict.get( + "force_spark_resource_configs", False + ), + user=spark_tools.SPARK_JOB_USER, + ) + # TODO: Remove this once dynamic pod template is generated inside the driver using spark-submit wrapper + if "spark.kubernetes.executor.podTemplateFile" in spark_conf: + print( + f"Replacing spark.kubernetes.executor.podTemplateFile=" + f"{spark_conf['spark.kubernetes.executor.podTemplateFile']} with " + f"spark.kubernetes.executor.podTemplateFile={spark_tools.SPARK_DNS_POD_TEMPLATE}" ) + spark_conf[ + "spark.kubernetes.executor.podTemplateFile" + ] = spark_tools.SPARK_DNS_POD_TEMPLATE + + spark_conf.update( + { + "spark.hadoop.fs.s3a.aws.credentials.provider": spark_tools.SPARK_AWS_CREDS_PROVIDER, + "spark.driver.host": "$PAASTA_POD_IP", + } + ) + spark_conf.setdefault( + "spark.kubernetes.executor.label.yelp.com/owner", self.get_team() ) - # as well as some QoL tweaks for other options - conf.update(setup_shuffle_partitions(spark_args=stringified_spark_args)) - conf.update(setup_event_log_configuration(spark_args=stringified_spark_args)) - return conf + # We need to make sure the Service Account used by the executors has been created. + # We are using the Service Account created using the provided or default IAM role. + spark_conf[ + "spark.kubernetes.authenticate.executor.serviceAccountName" + ] = create_or_find_service_account_name( + iam_role=self.get_spark_executor_iam_role(), + namespace=spark_tools.SPARK_EXECUTOR_NAMESPACE, + kubeconfig_file=system_paasta_config.get_spark_kubeconfig(), + dry_run=self.for_validation, + ) + + return spark_conf def get_cmd(self): command = self.config_dict.get("command") - - if self.get_executor() == "spark": - # until we switch drivers to use pod identity, we need to use the Yelp's legacy AWS credential system - # and ensure that the AWS access keys are part of the environment variables that the driver is started - # with - this is more secure than what we appear to have tried to do in the previous attempt and also - # still allows us to use the new CEP-1713 "private" boto_cfg system should the pod identity migration - # require more work/time than expected. - aws_credentials = self.config_dict.get("aws_credentials") - cmd_setup = ( - f". /etc/boto_cfg/{aws_credentials}.sh && " if aws_credentials else "" - ) - command = f"{cmd_setup}{inject_spark_conf_str(command, stringify_spark_env(self._build_spark_config()))}" - return command def get_job_name(self): @@ -411,9 +377,10 @@ def get_job_name(self): def get_action_name(self): return self.action - # mypy does not like the SecretVolume -> TronSecretVolume conversion, because TypedDict inheritence is broken. Until this is fixed, let's ignore this issue. + # mypy does not like the SecretVolume -> TronSecretVolume conversion, because TypedDict inheritence is broken. + # Until this is fixed, let's ignore this issue. 
def get_secret_volumes(self) -> List[TronSecretVolume]: # type: ignore - """Adds the secret_volume_name to the objet so tron/task_processing can load it downstream without replicating code.""" + """Adds the secret_volume_name to the object so tron/task_processing can load it downstream without replicating code.""" secret_volumes = super().get_secret_volumes() tron_secret_volumes = [] for secret_volume in secret_volumes: @@ -487,6 +454,20 @@ def get_env( return env + def get_iam_role(self) -> str: + iam_role = super().get_iam_role() + + if not iam_role and self.get_executor() == "spark": + iam_role = load_system_paasta_config().get_spark_driver_iam_role() + + return iam_role + + def get_spark_executor_iam_role(self) -> str: + return ( + self.get_iam_role() + or load_system_paasta_config().get_spark_executor_iam_role() + ) + def get_secret_env(self) -> Mapping[str, dict]: base_env = self.config_dict.get("env", {}) secret_env = {} @@ -631,6 +612,15 @@ def get_pool(self) -> str: override this value. To control this, we have an optional config item that we'll puppet onto Tron masters which this function will read. """ + return ( + self.config_dict.get( + "pool", load_system_paasta_config().get_tron_default_pool_override() + ) + if not self.get_executor() == "spark" + else spark_tools.SPARK_DRIVER_POOL + ) + + def get_spark_executor_pool(self) -> str: return self.config_dict.get( "pool", load_system_paasta_config().get_tron_default_pool_override() ) @@ -882,7 +872,7 @@ def format_master_config(master_config, default_volumes, dockercfg_location): def format_tron_action_dict(action_config: TronActionConfig): """Generate a dict of tronfig for an action, from the TronActionConfig. - :param job_config: TronActionConfig + :param action_config: TronActionConfig """ executor = action_config.get_executor() result = { @@ -898,7 +888,7 @@ def format_tron_action_dict(action_config: TronActionConfig): "triggered_by": action_config.get_triggered_by(), "on_upstream_rerun": action_config.get_on_upstream_rerun(), "trigger_timeout": action_config.get_trigger_timeout(), - # outside of Spark usescases, we also allow users to specify an expected-to-exist Service Account name + # outside of Spark use-cases, we also allow users to specify an expected-to-exist Service Account name # in the Tron namespace in case an action needs specific k8s permissions (e.g., a Jolt batch may need # k8s permissions to list Jolt pods in the jolt namespace to do scienceā„¢ to them). 
# if the provided Service Account does not exist, Tron should simply fail to create the Podspec and report @@ -971,7 +961,7 @@ def format_tron_action_dict(action_config: TronActionConfig): and action_config.get_iam_role() and not action_config.for_validation ): - # this service account will be used for normal Tron batches as well as for Spark executors + # this service account will be used for normal Tron batches as well as for Spark drivers result["service_account_name"] = create_or_find_service_account_name( iam_role=action_config.get_iam_role(), namespace=EXECUTOR_TYPE_TO_NAMESPACE[executor], @@ -979,24 +969,45 @@ def format_tron_action_dict(action_config: TronActionConfig): dry_run=action_config.for_validation, ) + extra_volumes = action_config.get_extra_volumes() if executor == "spark": - # this service account will only be used by Spark drivers since executors don't - # need Kubernetes access permissions - result["service_account_name"] = create_or_find_service_account_name( - iam_role=action_config.get_iam_role(), - namespace=EXECUTOR_TYPE_TO_NAMESPACE[executor], - k8s_role=_spark_k8s_role(), - dry_run=action_config.for_validation, + is_mrjob = action_config.config_dict.get("mrjob", False) + system_paasta_config = load_system_paasta_config() + # inject spark configs to the original spark-submit command + spark_config = action_config.build_spark_config() + result["command"] = spark_tools.build_spark_command( + result["command"], + spark_config, + is_mrjob, + action_config.config_dict.get( + "max_runtime", spark_tools.DEFAULT_SPARK_RUNTIME_TIMEOUT + ), ) - # spark, unlike normal batches, needs to expose several ports for things like the spark + result["env"]["KUBECONFIG"] = system_paasta_config.get_spark_kubeconfig() + # spark, unlike normal batches, needs to expose several ports for things like the spark # ui and for executor->driver communication result["ports"] = list( - set( - _get_spark_ports( - system_paasta_config=load_system_paasta_config() - ).values() + set(spark_tools.get_spark_ports_from_config(spark_config)) + ) + # mount KUBECONFIG file for Spark drivers to communicate with EKS cluster + extra_volumes.append( + DockerVolume( + { + "containerPath": system_paasta_config.get_spark_kubeconfig(), + "hostPath": system_paasta_config.get_spark_kubeconfig(), + "mode": "RO", + } ) ) + # Add pod annotations and labels for Spark monitoring metrics + monitoring_annotations = ( + spark_tools.get_spark_driver_monitoring_annotations(spark_config) + ) + monitoring_labels = spark_tools.get_spark_driver_monitoring_labels( + spark_config + ) + result["annotations"].update(monitoring_annotations) + result["labels"].update(monitoring_labels) elif executor in MESOS_EXECUTOR_NAMES: result["executor"] = "mesos" @@ -1017,7 +1028,7 @@ def format_tron_action_dict(action_config: TronActionConfig): result["cpus"] = action_config.get_cpus() result["mem"] = action_config.get_mem() result["disk"] = action_config.get_disk() - result["extra_volumes"] = format_volumes(action_config.get_extra_volumes()) + result["extra_volumes"] = format_volumes(extra_volumes) result["docker_image"] = action_config.get_docker_url() # Only pass non-None values, so Tron will use defaults for others diff --git a/paasta_tools/utils.py b/paasta_tools/utils.py index 5189cd1383..dcb9ba4fe7 100644 --- a/paasta_tools/utils.py +++ b/paasta_tools/utils.py @@ -1904,6 +1904,7 @@ class RemoteRunConfig(TypedDict, total=False): class SparkRunConfig(TypedDict, total=False): default_cluster: str default_pool: str + 
default_spark_driver_iam_role: str class PaastaNativeConfig(TypedDict, total=False): @@ -2123,6 +2124,22 @@ def parse_system_paasta_config( return SystemPaastaConfig(config, path) +class PoolsNotConfiguredError(Exception): + pass + + +def validate_pool( + cluster: str, pool: str, system_paasta_config: "SystemPaastaConfig" +) -> bool: + if pool: + valid_pools = system_paasta_config.get_pools_for_cluster(cluster) + if not valid_pools: + raise PoolsNotConfiguredError + # at this point, we can be sure that `valid_pools` is populated + return pool in valid_pools + return True + + class SystemPaastaConfig: def __init__(self, config: SystemPaastaConfigDict, directory: str) -> None: self.directory = directory @@ -2774,11 +2791,6 @@ def get_spark_k8s_role(self) -> str: def get_tron_k8s_use_suffixed_log_streams_k8s(self) -> bool: return self.config_dict.get("tron_use_suffixed_log_streams", False) - def get_spark_ui_port(self) -> int: - # 33000 was picked arbitrarily (it was the base port when we used to - # randomly reserve port numbers) - return self.config_dict.get("spark_ui_port", 33000) - def get_spark_driver_port(self) -> int: # default value is an arbitrary value return self.config_dict.get("spark_driver_port", 33001) @@ -2808,6 +2820,16 @@ def get_eks_cluster_aliases(self) -> Dict[str, str]: def get_cluster_pools(self) -> Dict[str, List[str]]: return self.config_dict.get("allowed_pools", {}) + def get_spark_driver_iam_role(self) -> str: + return self.get_spark_run_config().get("default_spark_driver_iam_role", "") + + def get_spark_executor_iam_role(self) -> str: + # use the same IAM role as the Spark driver + return self.get_spark_run_config().get("default_spark_driver_iam_role", "") + + def get_pools_for_cluster(self, cluster: str) -> List[str]: + return self.get_cluster_pools().get(cluster, []) + def get_hacheck_match_initial_delay(self) -> bool: return self.config_dict.get("hacheck_match_initial_delay", False) @@ -4267,16 +4289,30 @@ def _reorder_docker_volumes(volumes: List[DockerVolume]) -> List[DockerVolume]: return sort_dicts(deduped) -@lru_cache(maxsize=1) -def get_runtimeenv() -> str: - try: - with open("/nail/etc/runtimeenv", mode="r") as f: - return f.read() - except OSError: - log.error("Unable to read runtimeenv - this is not expected if inside Yelp.") - # we could also just crash or return None, but this seems a little easier to find - # should we somehow run into this at Yelp - return "unknown" +def get_k8s_url_for_cluster(cluster: str) -> Optional[str]: + """ + Annoyingly, there's two layers of aliases: one to figure out what + k8s server url to use (this one) and another to figure out what + soaconfigs filename to use ;_; + + This exists so that we can map something like `--cluster pnw-devc` + into spark-pnw-devc's k8s apiserver url without needing to update + any soaconfigs/alter folk's muscle memory. 
+ + Ideally we can get rid of this entirely once spark-run reads soaconfigs + in a manner more closely aligned to what we do with other paasta workloads + (i.e., have it automatically determine where to run based on soaconfigs + filenames - and not rely on explicit config) + """ + realized_cluster = ( + load_system_paasta_config().get_eks_cluster_aliases().get(cluster, cluster) + ) + return ( + load_system_paasta_config() + .get_kube_clusters() + .get(realized_cluster, {}) + .get("server") + ) @lru_cache(maxsize=1) diff --git a/requirements-minimal.txt b/requirements-minimal.txt index a42e3006c9..e2c6801a3e 100644 --- a/requirements-minimal.txt +++ b/requirements-minimal.txt @@ -54,7 +54,7 @@ requests-cache >= 0.4.10 retry ruamel.yaml sensu-plugin -service-configuration-lib >= 2.18.11 +service-configuration-lib >= 2.18.15 signalfx slackclient >= 1.2.1 sticht >= 1.1.0 diff --git a/requirements.txt b/requirements.txt index aa6201a9bb..d6c4aa4e18 100644 --- a/requirements.txt +++ b/requirements.txt @@ -90,7 +90,7 @@ rsa==4.7.2 ruamel.yaml==0.15.96 s3transfer==0.10.0 sensu-plugin==0.3.1 -service-configuration-lib==2.18.11 +service-configuration-lib==2.18.15 setuptools==39.0.1 signalfx==1.0.17 simplejson==3.10.0 diff --git a/tests/cli/test_cmds_spark_run.py b/tests/cli/test_cmds_spark_run.py index b56a7da587..68f5118e1d 100644 --- a/tests/cli/test_cmds_spark_run.py +++ b/tests/cli/test_cmds_spark_run.py @@ -15,14 +15,13 @@ import mock import pytest -from service_configuration_lib import spark_config +from paasta_tools import spark_tools +from paasta_tools import utils from paasta_tools.cli.cmds import spark_run from paasta_tools.cli.cmds.spark_run import _should_get_resource_requirements from paasta_tools.cli.cmds.spark_run import build_and_push_docker_image -from paasta_tools.cli.cmds.spark_run import CLUSTER_MANAGER_K8S from paasta_tools.cli.cmds.spark_run import configure_and_run_docker_container -from paasta_tools.cli.cmds.spark_run import decide_final_eks_toggle_state from paasta_tools.cli.cmds.spark_run import DEFAULT_DOCKER_SHM_SIZE from paasta_tools.cli.cmds.spark_run import DEFAULT_DRIVER_CORES_BY_SPARK from paasta_tools.cli.cmds.spark_run import DEFAULT_DRIVER_MEMORY_BY_SPARK @@ -30,9 +29,11 @@ from paasta_tools.cli.cmds.spark_run import get_smart_paasta_instance_name from paasta_tools.cli.cmds.spark_run import get_spark_app_name from paasta_tools.cli.cmds.spark_run import sanitize_container_name -from paasta_tools.cli.cmds.spark_run import should_enable_compact_bin_packing +from paasta_tools.utils import BranchDictV2 from paasta_tools.utils import InstanceConfig +from paasta_tools.utils import InstanceConfigDict from paasta_tools.utils import SystemPaastaConfig +from paasta_tools.utils import SystemPaastaConfigDict DUMMY_DOCKER_IMAGE_DIGEST = "MOCK-docker-dev.yelpcorp.com/paasta-spark-run-user@sha256:103ce91c65d42498ca61cdfe8d799fab8ab1c37dac58b743b49ced227bc7bc06" @@ -99,26 +100,6 @@ def test_sanitize_container_name(container_name, expected): assert sanitize_container_name(container_name) == expected -@pytest.mark.parametrize( - "disable_compact_bin_packing,cluster_manager,dir_access,expected", - [ - (False, CLUSTER_MANAGER_K8S, True, True), - (True, CLUSTER_MANAGER_K8S, True, False), - (True, CLUSTER_MANAGER_K8S, False, False), - ], -) -def test_should_enable_compact_bin_packing( - disable_compact_bin_packing, cluster_manager, dir_access, expected -): - with mock.patch("os.access", autospec=True, return_value=dir_access): - assert ( - should_enable_compact_bin_packing( - 
disable_compact_bin_packing, cluster_manager - ) - == expected - ) - - @pytest.fixture def mock_build_and_push_docker_image(): with mock.patch.object( @@ -202,12 +183,6 @@ def test_get_smart_paasta_instance_name_tron(): assert get_smart_paasta_instance_name(args) == "yelp-main.rm_rf_slash" -@pytest.fixture -def mock_create_spark_config_str(): - with mock.patch.object(spark_run, "create_spark_config_str") as m: - yield m - - @pytest.fixture def mock_get_possible_launced_by_user_variable_from_env(): with mock.patch.object( @@ -224,7 +199,6 @@ def mock_get_possible_launced_by_user_variable_from_env(): cmd="jupyter-lab", aws_region="test-region", mrjob=False, - use_eks_override=False, ), { "JUPYTER_RUNTIME_DIR": "/source/.jupyter", @@ -239,7 +213,6 @@ def mock_get_possible_launced_by_user_variable_from_env(): mrjob=False, spark_args="spark.history.fs.logDirectory=s3a://bucket", work_dir="/first:/second", - use_eks_override=False, ), { "SPARK_LOG_DIR": "/second", @@ -253,7 +226,6 @@ def mock_get_possible_launced_by_user_variable_from_env(): cmd="spark-submit job.py", aws_region="test-region", mrjob=True, - use_eks_override=False, ), {}, ), @@ -287,14 +259,15 @@ def test_get_spark_env( "PAASTA_LAUNCHED_BY": mock_get_possible_launced_by_user_variable_from_env.return_value, "PAASTA_INSTANCE_TYPE": "spark", "AWS_DEFAULT_REGION": "test-region", + "KUBECONFIG": "/etc/kubernetes/spark.conf", **extra_expected, **expected_aws, } - fake_system_paasta_config = ( - SystemPaastaConfig( - {"allowed_pools": {"test-cluster": ["test-pool", "fake-pool"]}}, - "fake_dir", + fake_system_paasta_config = SystemPaastaConfig( + SystemPaastaConfigDict( + {"allowed_pools": {"test-cluster": ["test-pool", "fake-pool"]}} ), + "fake_dir", ) assert ( spark_run.get_spark_env( @@ -309,91 +282,35 @@ def test_get_spark_env( @pytest.mark.parametrize( - "spark_args,enable_spark_dra,expected", + "spark_args,expected", [ ( "spark.cores.max=1 spark.executor.memory=24g", - False, {"spark.cores.max": "1", "spark.executor.memory": "24g"}, ), ( "spark.cores.max=1 spark.executor.memory=24g", - True, { "spark.cores.max": "1", "spark.executor.memory": "24g", - "spark.dynamicAllocation.enabled": "true", }, ), - ("spark.cores.max", False, None), - (None, False, {}), - (None, True, {"spark.dynamicAllocation.enabled": "true"}), + ("spark.cores.max", None), + (None, {}), ], ) -def test_parse_user_spark_args(spark_args, enable_spark_dra, expected, capsys): +def test_parse_user_spark_args(spark_args, expected, capsys): if expected is not None: - assert ( - spark_run._parse_user_spark_args( - spark_args, "unique-run", False, enable_spark_dra - ) - == expected - ) + assert spark_run._parse_user_spark_args(spark_args) == expected else: with pytest.raises(SystemExit): - spark_run._parse_user_spark_args( - spark_args, "unique-run", False, enable_spark_dra - ) + spark_run._parse_user_spark_args(spark_args) assert ( capsys.readouterr().err == "Spark option spark.cores.max is not in format option=value." 
) -@pytest.mark.parametrize( - "args,system_paasta_config,expected", - [ - ( - # allowed_pools key has test-cluster and test-pool - argparse.Namespace(cluster="test-cluster", pool="test-pool"), - SystemPaastaConfig( - {"allowed_pools": {"test-cluster": ["test-pool", "fake-pool"]}}, - "fake_dir", - ), - True, - ), - ( - # allowed_pools key has test-cluster but doesn't have test-pool - argparse.Namespace(cluster="test-cluster", pool="test-pool"), - SystemPaastaConfig( - {"allowed_pools": {"test-cluster": ["fail-test-pool", "fake-pool"]}}, - "fake_dir", - ), - False, - ), - ( - # allowed_pools key doesn't have test-cluster - argparse.Namespace(cluster="test-cluster", pool="test-pool"), - SystemPaastaConfig( - {"allowed_pools": {"fail-test-cluster": ["test-pool", "fake-pool"]}}, - "fake_dir", - ), - True, - ), - ( - # allowed_pools key is not present - argparse.Namespace(cluster="test-cluster", pool="test-pool"), - SystemPaastaConfig( - {"fail_allowed_pools": {"test-cluster": ["test-pool", "fake-pool"]}}, - "fake_dir", - ), - True, - ), - ], -) -def test__validate_pool(args, system_paasta_config, expected): - assert spark_run._validate_pool(args, system_paasta_config) == expected - - @pytest.mark.parametrize("is_mrjob", [True, False]) def test_create_spark_config_str(is_mrjob): spark_opts = { @@ -401,7 +318,7 @@ def test_create_spark_config_str(is_mrjob): "spark.executor.memory": "4g", "spark.max.cores": "10", } - output = spark_run.create_spark_config_str(spark_opts, is_mrjob) + output = spark_tools.create_spark_config_str(spark_opts, is_mrjob) if is_mrjob: assert output == ( "--spark-master=mesos://some-host:5050 " @@ -423,13 +340,6 @@ def mock_get_docker_run_cmd(): yield m -@pytest.fixture -def mock_generate_pod_template_path(): - with mock.patch.object(spark_run, "generate_pod_template_path") as m: - m.return_value = "unique-run" - yield m - - @pytest.fixture def mock_os_execlpe(): with mock.patch("os.execlpe", autospec=True) as m: @@ -491,33 +401,31 @@ def test_run_docker_container( @mock.patch("paasta_tools.cli.cmds.spark_run.get_username", autospec=True) @mock.patch("paasta_tools.cli.cmds.spark_run.run_docker_container", autospec=True) @mock.patch("paasta_tools.cli.cmds.spark_run.get_webui_url", autospec=True) -@mock.patch("paasta_tools.cli.cmds.spark_run.create_spark_config_str", autospec=True) @mock.patch("paasta_tools.cli.cmds.spark_run.get_docker_cmd", autospec=True) -@mock.patch.object(spark_config.SparkConfBuilder(), "get_history_url", autospec=True) +@mock.patch("paasta_tools.cli.cmds.spark_run.create_spark_config_str", autospec=True) class TestConfigureAndRunDockerContainer: instance_config = InstanceConfig( cluster="fake_cluster", instance="fake_instance", service="fake_service", - config_dict={ - "extra_volumes": [{"hostPath": "/h1", "containerPath": "/c1", "mode": "RO"}] - }, - branch_dict={"docker_image": "fake_service:fake_sha"}, + config_dict=InstanceConfigDict( + { + "extra_volumes": [ + {"hostPath": "/h1", "containerPath": "/c1", "mode": "RO"} + ] + } + ), + branch_dict=BranchDictV2({"docker_image": "fake_service:fake_sha"}), ) system_paasta_config = SystemPaastaConfig( - {"volumes": [{"hostPath": "/h2", "containerPath": "/c2", "mode": "RO"}]}, + SystemPaastaConfigDict( + {"volumes": [{"hostPath": "/h2", "containerPath": "/c2", "mode": "RO"}]} + ), "fake_dir", ) - @pytest.fixture - def mock_create_spark_config_str(self): - with mock.patch( - "paasta_tools.cli.cmds.spark_run.create_spark_config_str", autospec=True - ) as _mock_create_spark_config_str: - yield 
_mock_create_spark_config_str - @pytest.mark.parametrize( ["cluster_manager", "spark_args_volumes", "expected_volumes"], [ @@ -549,9 +457,8 @@ def mock_create_spark_config_str(self): ) def test_configure_and_run_docker_container( self, - mock_get_history_url, - mock_get_docker_cmd, mock_create_spark_config_str, + mock_get_docker_cmd, mock_get_webui_url, mock_run_docker_container, mock_get_username, @@ -565,6 +472,7 @@ def test_configure_and_run_docker_container( "spark.ui.port": "1234", **spark_args_volumes, } + mock_create_spark_config_str.return_value = "testing spark opts string" mock_run_docker_container.return_value = 0 args = mock.MagicMock() @@ -580,7 +488,6 @@ def test_configure_and_run_docker_container( args.docker_cpu_limit = False args.docker_memory_limit = False args.docker_shm_size = False - args.use_eks_override = False args.tronfig = None args.job_id = None with mock.patch.object( @@ -597,11 +504,20 @@ def test_configure_and_run_docker_container( pod_template_path="unique-run", ) assert retcode == 0 + mock_create_spark_config_str.assert_called_once_with( + spark_config_dict=spark_conf, + is_mrjob=args.mrjob, + ) mock_run_docker_container.assert_called_once_with( container_name="fake_app", volumes=( expected_volumes - + ["/fake_dir:/spark_driver:rw", "/nail/home:/nail/home:rw"] + + [ + "/fake_dir:/spark_driver:rw", + "/nail/home:/nail/home:rw", + "unique-run:unique-run:rw", + "/etc/kubernetes/spark.conf:/etc/kubernetes/spark.conf:ro", + ] ), environment={ "env1": "val1", @@ -609,10 +525,11 @@ def test_configure_and_run_docker_container( "AWS_SECRET_ACCESS_KEY": "secret", "AWS_SESSION_TOKEN": "token", "AWS_DEFAULT_REGION": "fake_region", - "SPARK_OPTS": mock_create_spark_config_str.return_value, + "SPARK_OPTS": "testing spark opts string", "SPARK_USER": "root", "PAASTA_INSTANCE_TYPE": "spark", "PAASTA_LAUNCHED_BY": mock.ANY, + "KUBECONFIG": "/etc/kubernetes/spark.conf", }, docker_img="fake-registry/fake-service", docker_cmd=mock_get_docker_cmd.return_value, @@ -654,9 +571,8 @@ def test_configure_and_run_docker_container( ) def test_configure_and_run_docker_driver_resource_limits_config( self, - mock_get_history_url, - mock_get_docker_cmd, mock_create_spark_config_str, + mock_get_docker_cmd, mock_get_webui_url, mock_run_docker_container, mock_get_username, @@ -672,6 +588,7 @@ def test_configure_and_run_docker_driver_resource_limits_config( "spark.driver.cores": "2", **spark_args_volumes, } + mock_create_spark_config_str.return_value = "testing spark opts string" mock_run_docker_container.return_value = 0 args = mock.MagicMock() @@ -687,7 +604,6 @@ def test_configure_and_run_docker_driver_resource_limits_config( args.docker_cpu_limit = 3 args.docker_memory_limit = "4g" args.docker_shm_size = "1g" - args.use_eks_override = False with mock.patch.object( self.instance_config, "get_env_dictionary", return_value={"env1": "val1"} ): @@ -702,11 +618,20 @@ def test_configure_and_run_docker_driver_resource_limits_config( pod_template_path="unique-run", ) assert retcode == 0 + mock_create_spark_config_str.assert_called_once_with( + spark_config_dict=spark_conf, + is_mrjob=args.mrjob, + ) mock_run_docker_container.assert_called_once_with( container_name="fake_app", volumes=( expected_volumes - + ["/fake_dir:/spark_driver:rw", "/nail/home:/nail/home:rw"] + + [ + "/fake_dir:/spark_driver:rw", + "/nail/home:/nail/home:rw", + "unique-run:unique-run:rw", + "/etc/kubernetes/spark.conf:/etc/kubernetes/spark.conf:ro", + ] ), environment={ "env1": "val1", @@ -714,10 +639,11 @@ def 
test_configure_and_run_docker_driver_resource_limits_config( "AWS_SECRET_ACCESS_KEY": "secret", "AWS_SESSION_TOKEN": "token", "AWS_DEFAULT_REGION": "fake_region", - "SPARK_OPTS": mock_create_spark_config_str.return_value, + "SPARK_OPTS": "testing spark opts string", "SPARK_USER": "root", "PAASTA_INSTANCE_TYPE": "spark", "PAASTA_LAUNCHED_BY": mock.ANY, + "KUBECONFIG": "/etc/kubernetes/spark.conf", }, docker_img="fake-registry/fake-service", docker_cmd=mock_get_docker_cmd.return_value, @@ -759,9 +685,8 @@ def test_configure_and_run_docker_driver_resource_limits_config( ) def test_configure_and_run_docker_driver_resource_limits( self, - mock_get_history_url, - mock_get_docker_cmd, mock_create_spark_config_str, + mock_get_docker_cmd, mock_get_webui_url, mock_run_docker_container, mock_get_username, @@ -777,6 +702,7 @@ def test_configure_and_run_docker_driver_resource_limits( "spark.driver.cores": "2", **spark_args_volumes, } + mock_create_spark_config_str.return_value = "testing spark opts string" mock_run_docker_container.return_value = 0 args = mock.MagicMock() @@ -792,7 +718,6 @@ def test_configure_and_run_docker_driver_resource_limits( args.docker_cpu_limit = False args.docker_memory_limit = False args.docker_shm_size = False - args.use_eks_override = False with mock.patch.object( self.instance_config, "get_env_dictionary", return_value={"env1": "val1"} ): @@ -806,12 +731,22 @@ def test_configure_and_run_docker_driver_resource_limits( cluster_manager=cluster_manager, pod_template_path="unique-run", ) + assert retcode == 0 + mock_create_spark_config_str.assert_called_once_with( + spark_config_dict=spark_conf, + is_mrjob=args.mrjob, + ) mock_run_docker_container.assert_called_once_with( container_name="fake_app", volumes=( expected_volumes - + ["/fake_dir:/spark_driver:rw", "/nail/home:/nail/home:rw"] + + [ + "/fake_dir:/spark_driver:rw", + "/nail/home:/nail/home:rw", + "unique-run:unique-run:rw", + "/etc/kubernetes/spark.conf:/etc/kubernetes/spark.conf:ro", + ] ), environment={ "env1": "val1", @@ -819,10 +754,11 @@ def test_configure_and_run_docker_driver_resource_limits( "AWS_SECRET_ACCESS_KEY": "secret", "AWS_SESSION_TOKEN": "token", "AWS_DEFAULT_REGION": "fake_region", - "SPARK_OPTS": mock_create_spark_config_str.return_value, + "SPARK_OPTS": "testing spark opts string", "SPARK_USER": "root", "PAASTA_INSTANCE_TYPE": "spark", "PAASTA_LAUNCHED_BY": mock.ANY, + "KUBECONFIG": "/etc/kubernetes/spark.conf", }, docker_img="fake-registry/fake-service", docker_cmd=mock_get_docker_cmd.return_value, @@ -835,9 +771,8 @@ def test_configure_and_run_docker_driver_resource_limits( def test_configure_and_run_docker_container_nvidia( self, - mock_get_history_url, - mock_get_docker_cmd, mock_create_spark_config_str, + mock_get_docker_cmd, mock_get_webui_url, mock_run_docker_container, mock_get_username, @@ -872,9 +807,8 @@ def test_configure_and_run_docker_container_nvidia( def test_configure_and_run_docker_container_mrjob( self, - mock_get_history_url, - mock_get_docker_cmd, mock_create_spark_config_str, + mock_get_docker_cmd, mock_get_webui_url, mock_run_docker_container, mock_get_username, @@ -909,9 +843,8 @@ def test_configure_and_run_docker_container_mrjob( def test_dont_emit_metrics_for_inappropriate_commands( self, - mock_get_history_url, - mock_get_docker_cmd, mock_create_spark_config_str, + mock_get_docker_cmd, mock_get_webui_url, mock_run_docker_container, mock_get_username, @@ -996,13 +929,13 @@ def test_get_docker_cmd(args, instance_config, spark_conf_str, expected): 
@mock.patch.object(spark_run, "validate_work_dir", autospec=True) +@mock.patch.object(utils, "load_system_paasta_config", autospec=True) @mock.patch.object(spark_run, "load_system_paasta_config", autospec=True) @mock.patch.object(spark_run, "get_instance_config", autospec=True) @mock.patch.object(spark_run, "get_aws_credentials", autospec=True) @mock.patch.object(spark_run, "get_docker_image", autospec=True) @mock.patch.object(spark_run, "get_spark_app_name", autospec=True) @mock.patch.object(spark_run, "_parse_user_spark_args", autospec=True) -@mock.patch.object(spark_run, "should_enable_compact_bin_packing", autospec=True) @mock.patch( "paasta_tools.cli.cmds.spark_run.spark_config.SparkConfBuilder", autospec=True ) @@ -1012,15 +945,14 @@ def test_paasta_spark_run_bash( mock_get_smart_paasta_instance_name, mock_configure_and_run_docker_container, mock_spark_conf_builder, - mock_should_enable_compact_bin_packing, mock_parse_user_spark_args, mock_get_spark_app_name, mock_get_docker_image, mock_get_aws_credentials, mock_get_instance_config, - mock_load_system_paasta_config, + mock_load_system_paasta_config_spark_run, + mock_load_system_paasta_config_utils, mock_validate_work_dir, - mock_generate_pod_template_path, ): args = argparse.Namespace( work_dir="/tmp/local", @@ -1045,17 +977,26 @@ def test_paasta_spark_run_bash( force_spark_resource_configs=False, assume_aws_role=None, aws_role_duration=3600, - use_eks_override=False, k8s_server_address=None, tronfig=None, job_id=None, ) - mock_load_system_paasta_config.return_value.get_cluster_aliases.return_value = {} - mock_load_system_paasta_config.return_value.get_cluster_pools.return_value = { - "test-cluster": ["test-pool"] + mock_load_system_paasta_config_utils.return_value.get_kube_clusters.return_value = ( + {} + ) + mock_load_system_paasta_config_spark_run.return_value.get_cluster_aliases.return_value = ( + {} + ) + mock_load_system_paasta_config_spark_run.return_value.get_pools_for_cluster.return_value = [ + "test-pool" + ] + mock_load_system_paasta_config_spark_run.return_value.get_eks_cluster_aliases.return_value = { + "test-cluster": "test-cluster" } - mock_should_enable_compact_bin_packing.return_value = True mock_get_docker_image.return_value = DUMMY_DOCKER_IMAGE_DIGEST + mock_spark_conf_builder.return_value.get_spark_conf.return_value = { + "spark.kubernetes.executor.podTemplateFile": "/test/pod-template.yaml", + } spark_run.paasta_spark_run(args) mock_validate_work_dir.assert_called_once_with("/tmp/local") assert args.cmd == "/bin/bash" @@ -1068,7 +1009,6 @@ def test_paasta_spark_run_bash( ) mock_get_aws_credentials.assert_called_once_with( service="test-service", - no_aws_credentials=False, aws_credentials_yaml="/path/to/creds", profile_name=None, assume_aws_role_arn=None, @@ -1079,7 +1019,7 @@ def test_paasta_spark_run_bash( ) mock_get_spark_app_name.assert_called_once_with("/bin/bash") mock_parse_user_spark_args.assert_called_once_with( - "spark.cores.max=100 spark.executor.cores=10", "unique-run", True, False + "spark.cores.max=100 spark.executor.cores=10" ) mock_spark_conf_builder.return_value.get_spark_conf.assert_called_once_with( cluster_manager=spark_run.CLUSTER_MANAGER_K8S, @@ -1094,7 +1034,7 @@ def test_paasta_spark_run_bash( aws_creds=mock_get_aws_credentials.return_value, aws_region="test-region", force_spark_resource_configs=False, - use_eks=False, + use_eks=True, k8s_server_address=None, ) mock_spark_conf = mock_spark_conf_builder.return_value.get_spark_conf.return_value @@ -1102,24 +1042,24 @@ def 
test_paasta_spark_run_bash( args, docker_img=DUMMY_DOCKER_IMAGE_DIGEST, instance_config=mock_get_instance_config.return_value, - system_paasta_config=mock_load_system_paasta_config.return_value, + system_paasta_config=mock_load_system_paasta_config_spark_run.return_value, spark_conf=mock_spark_conf, aws_creds=mock_get_aws_credentials.return_value, cluster_manager=spark_run.CLUSTER_MANAGER_K8S, - pod_template_path="unique-run", + pod_template_path="/test/pod-template.yaml", extra_driver_envs=dict(), ) - mock_generate_pod_template_path.assert_called_once() @mock.patch.object(spark_run, "validate_work_dir", autospec=True) +@mock.patch.object(utils, "load_system_paasta_config", autospec=True) @mock.patch.object(spark_run, "load_system_paasta_config", autospec=True) @mock.patch.object(spark_run, "get_instance_config", autospec=True) @mock.patch.object(spark_run, "get_aws_credentials", autospec=True) @mock.patch.object(spark_run, "get_docker_image", autospec=True) @mock.patch.object(spark_run, "get_spark_app_name", autospec=True) +@mock.patch.object(spark_run, "auto_add_timeout_for_spark_job", autospec=True) @mock.patch.object(spark_run, "_parse_user_spark_args", autospec=True) -@mock.patch.object(spark_run, "should_enable_compact_bin_packing", autospec=True) @mock.patch( "paasta_tools.cli.cmds.spark_run.spark_config.SparkConfBuilder", autospec=True ) @@ -1129,15 +1069,15 @@ def test_paasta_spark_run( mock_get_smart_paasta_instance_name, mock_configure_and_run_docker_container, mock_spark_conf_builder, - mock_should_enable_compact_bin_packing, mock_parse_user_spark_args, + mock_auto_add_timeout_for_spark_job, mock_get_spark_app_name, mock_get_docker_image, mock_get_aws_credentials, mock_get_instance_config, - mock_load_system_paasta_config, + mock_load_system_paasta_config_spark_run, + mock_load_system_paasta_config_utils, mock_validate_work_dir, - mock_generate_pod_template_path, ): args = argparse.Namespace( work_dir="/tmp/local", @@ -1162,17 +1102,29 @@ def test_paasta_spark_run( force_spark_resource_configs=False, assume_aws_role=None, aws_role_duration=3600, - use_eks_override=False, k8s_server_address=None, tronfig=None, job_id=None, ) - mock_load_system_paasta_config.return_value.get_cluster_aliases.return_value = {} - mock_load_system_paasta_config.return_value.get_cluster_pools.return_value = { - "test-cluster": ["test-pool"] + mock_load_system_paasta_config_utils.return_value.get_kube_clusters.return_value = ( + {} + ) + mock_load_system_paasta_config_spark_run.return_value.get_cluster_aliases.return_value = ( + {} + ) + mock_load_system_paasta_config_spark_run.return_value.get_pools_for_cluster.return_value = [ + "test-pool" + ] + mock_load_system_paasta_config_spark_run.return_value.get_eks_cluster_aliases.return_value = { + "test-cluster": "test-cluster" } - mock_should_enable_compact_bin_packing.return_value = True mock_get_docker_image.return_value = DUMMY_DOCKER_IMAGE_DIGEST + mock_auto_add_timeout_for_spark_job.return_value = ( + "USER=test timeout 1m spark-submit test.py" + ) + mock_spark_conf_builder.return_value.get_spark_conf.return_value = { + "spark.kubernetes.executor.podTemplateFile": "/test/pod-template.yaml", + } spark_run.paasta_spark_run(args) mock_validate_work_dir.assert_called_once_with("/tmp/local") assert args.cmd == "USER=test timeout 1m spark-submit test.py" @@ -1185,7 +1137,6 @@ def test_paasta_spark_run( ) mock_get_aws_credentials.assert_called_once_with( service="test-service", - no_aws_credentials=False, aws_credentials_yaml="/path/to/creds", 
profile_name=None, assume_aws_role_arn=None, @@ -1196,7 +1147,7 @@ def test_paasta_spark_run( ) mock_get_spark_app_name.assert_called_once_with("USER=test spark-submit test.py") mock_parse_user_spark_args.assert_called_once_with( - "spark.cores.max=100 spark.executor.cores=10", "unique-run", True, True + "spark.cores.max=100 spark.executor.cores=10" ) mock_spark_conf_builder.return_value.get_spark_conf.assert_called_once_with( cluster_manager=spark_run.CLUSTER_MANAGER_K8S, @@ -1211,31 +1162,30 @@ def test_paasta_spark_run( aws_creds=mock_get_aws_credentials.return_value, aws_region="test-region", force_spark_resource_configs=False, - use_eks=False, + use_eks=True, k8s_server_address=None, ) mock_configure_and_run_docker_container.assert_called_once_with( args, docker_img=DUMMY_DOCKER_IMAGE_DIGEST, instance_config=mock_get_instance_config.return_value, - system_paasta_config=mock_load_system_paasta_config.return_value, + system_paasta_config=mock_load_system_paasta_config_spark_run.return_value, spark_conf=mock_spark_conf_builder.return_value.get_spark_conf.return_value, aws_creds=mock_get_aws_credentials.return_value, cluster_manager=spark_run.CLUSTER_MANAGER_K8S, - pod_template_path="unique-run", + pod_template_path="/test/pod-template.yaml", extra_driver_envs=dict(), ) - mock_generate_pod_template_path.assert_called_once() @mock.patch.object(spark_run, "validate_work_dir", autospec=True) +@mock.patch.object(utils, "load_system_paasta_config", autospec=True) @mock.patch.object(spark_run, "load_system_paasta_config", autospec=True) @mock.patch.object(spark_run, "get_instance_config", autospec=True) @mock.patch.object(spark_run, "get_aws_credentials", autospec=True) @mock.patch.object(spark_run, "get_docker_image", autospec=True) @mock.patch.object(spark_run, "get_spark_app_name", autospec=True) @mock.patch.object(spark_run, "_parse_user_spark_args", autospec=True) -@mock.patch.object(spark_run, "should_enable_compact_bin_packing", autospec=True) @mock.patch( "paasta_tools.cli.cmds.spark_run.spark_config.SparkConfBuilder", autospec=True ) @@ -1245,15 +1195,14 @@ def test_paasta_spark_run_pyspark( mock_get_smart_paasta_instance_name, mock_configure_and_run_docker_container, mock_spark_conf_builder, - mock_should_enable_compact_bin_packing, mock_parse_user_spark_args, mock_get_spark_app_name, mock_get_docker_image, mock_get_aws_credentials, mock_get_instance_config, - mock_load_system_paasta_config, + mock_load_system_paasta_config_spark_run, + mock_load_system_paasta_config_utils, mock_validate_work_dir, - mock_generate_pod_template_path, ): args = argparse.Namespace( work_dir="/tmp/local", @@ -1278,20 +1227,31 @@ def test_paasta_spark_run_pyspark( force_spark_resource_configs=False, assume_aws_role=None, aws_role_duration=3600, - use_eks_override=False, k8s_server_address=None, tronfig=None, job_id=None, ) - mock_load_system_paasta_config.return_value.get_spark_use_eks_default.return_value = ( + mock_load_system_paasta_config_utils.return_value.get_kube_clusters.return_value = ( + {} + ) + mock_load_system_paasta_config_spark_run.return_value.get_spark_use_eks_default.return_value = ( False ) - mock_load_system_paasta_config.return_value.get_cluster_aliases.return_value = {} - mock_load_system_paasta_config.return_value.get_cluster_pools.return_value = { - "test-cluster": ["test-pool"] + mock_load_system_paasta_config_spark_run.return_value.get_cluster_aliases.return_value = ( + {} + ) + mock_load_system_paasta_config_spark_run.return_value.get_pools_for_cluster.return_value = [ + 
"test-pool" + ] + mock_load_system_paasta_config_spark_run.return_value.get_eks_cluster_aliases.return_value = { + "test-cluster": "test-cluster" } mock_get_docker_image.return_value = DUMMY_DOCKER_IMAGE_DIGEST + mock_spark_conf_builder.return_value.get_spark_conf.return_value = { + "spark.kubernetes.executor.podTemplateFile": "/test/pod-template.yaml", + } + spark_run.paasta_spark_run(args) mock_validate_work_dir.assert_called_once_with("/tmp/local") assert args.cmd == "pyspark" @@ -1304,7 +1264,6 @@ def test_paasta_spark_run_pyspark( ) mock_get_aws_credentials.assert_called_once_with( service="test-service", - no_aws_credentials=False, aws_credentials_yaml="/path/to/creds", profile_name=None, assume_aws_role_arn=None, @@ -1313,15 +1272,9 @@ def test_paasta_spark_run_pyspark( mock_get_docker_image.assert_called_once_with( args, mock_get_instance_config.return_value ) - mock_should_enable_compact_bin_packing.assert_called_once_with( - False, spark_run.CLUSTER_MANAGER_K8S - ) mock_get_spark_app_name.assert_called_once_with("pyspark") mock_parse_user_spark_args.assert_called_once_with( "spark.cores.max=70 spark.executor.cores=10", - "unique-run", - mock_should_enable_compact_bin_packing.return_value, - False, ) mock_spark_conf_builder.return_value.get_spark_conf.assert_called_once_with( cluster_manager=spark_run.CLUSTER_MANAGER_K8S, @@ -1336,21 +1289,20 @@ def test_paasta_spark_run_pyspark( aws_creds=mock_get_aws_credentials.return_value, aws_region="test-region", force_spark_resource_configs=False, - use_eks=False, + use_eks=True, k8s_server_address=None, ) mock_configure_and_run_docker_container.assert_called_once_with( args, docker_img=DUMMY_DOCKER_IMAGE_DIGEST, instance_config=mock_get_instance_config.return_value, - system_paasta_config=mock_load_system_paasta_config.return_value, + system_paasta_config=mock_load_system_paasta_config_spark_run.return_value, spark_conf=mock_spark_conf_builder.return_value.get_spark_conf.return_value, aws_creds=mock_get_aws_credentials.return_value, cluster_manager=spark_run.CLUSTER_MANAGER_K8S, - pod_template_path="unique-run", + pod_template_path="/test/pod-template.yaml", extra_driver_envs=dict(), ) - mock_generate_pod_template_path.assert_called_once() @pytest.mark.parametrize( @@ -1370,29 +1322,6 @@ def test__should_get_resource_requirements(docker_cmd, is_mrjob, expected): assert _should_get_resource_requirements(docker_cmd, is_mrjob) is expected -@pytest.mark.parametrize( - "override,default,expected", - ( - (True, True, True), - (False, True, False), - (None, True, True), - (True, False, True), - (False, False, False), - (None, False, False), - ), -) -def test_decide_final_eks_toggle_state(override, default, expected): - with mock.patch( - "paasta_tools.cli.cmds.spark_run.load_system_paasta_config", - autospec=True, - ) as mock_load_system_paasta_config: - mock_load_system_paasta_config.return_value.get_spark_use_eks_default.return_value = ( - default - ) - - assert decide_final_eks_toggle_state(override) is expected - - @mock.patch.object(spark_run, "makefile_responds_to", autospec=True) @mock.patch.object(spark_run, "paasta_cook_image", autospec=True) @mock.patch.object(spark_run, "get_username", autospec=True) diff --git a/tests/test_tron_tools.py b/tests/test_tron_tools.py index 1f93e82311..d549d056ea 100644 --- a/tests/test_tron_tools.py +++ b/tests/test_tron_tools.py @@ -12,30 +12,35 @@ from paasta_tools.secret_tools import SHARED_SECRET_SERVICE from paasta_tools.tron_tools import MASTER_NAMESPACE from paasta_tools.tron_tools import 
MESOS_EXECUTOR_NAMES +from paasta_tools.tron_tools import TronActionConfigDict from paasta_tools.utils import CAPS_DROP from paasta_tools.utils import InvalidInstanceConfig from paasta_tools.utils import NoDeploymentsAvailable MOCK_SYSTEM_PAASTA_CONFIG = utils.SystemPaastaConfig( - { - "docker_registry": "mock_registry", - "volumes": [], - "dockercfg_location": "/mock/dockercfg", - "spark_k8s_role": "spark", - }, + utils.SystemPaastaConfigDict( + { + "docker_registry": "mock_registry", + "volumes": [], + "dockercfg_location": "/mock/dockercfg", + "spark_k8s_role": "spark", + } + ), "/mock/system/configs", ) MOCK_SYSTEM_PAASTA_CONFIG_OVERRIDES = utils.SystemPaastaConfig( - { - "docker_registry": "mock_registry", - "volumes": [], - "dockercfg_location": "/mock/dockercfg", - "tron_default_pool_override": "big_pool", - "tron_k8s_cluster_overrides": { - "paasta-dev-test": "paasta-dev", - }, - }, + utils.SystemPaastaConfigDict( + { + "docker_registry": "mock_registry", + "volumes": [], + "dockercfg_location": "/mock/dockercfg", + "tron_default_pool_override": "big_pool", + "tron_k8s_cluster_overrides": { + "paasta-dev-test": "paasta-dev", + }, + } + ), "/mock/system/configs", ) @@ -283,11 +288,13 @@ def test_get_action_config( ) mock_paasta_system_config = utils.SystemPaastaConfig( - config={ - "tron_k8s_cluster_overrides": { - "paasta-dev-test": "paasta-dev", + config=utils.SystemPaastaConfigDict( + { + "tron_k8s_cluster_overrides": { + "paasta-dev-test": "paasta-dev", + } } - }, + ), directory="/mock/system/configs", ) with mock.patch( @@ -920,8 +927,7 @@ def test_format_tron_action_dict_paasta(self): assert result["docker_image"] == expected_docker assert result["env"]["SHELL"] == "/bin/bash" - @mock.patch("paasta_tools.spark_tools.spark_config.SparkConfBuilder", autospec=True) - def test_format_tron_action_dict_spark(self, mock_spark_conf_builder): + def test_format_tron_action_dict_spark(self): action_dict = { "iam_role_provider": "aws", "iam_role": "arn:aws:iam::000000000000:role/some_role", @@ -952,7 +958,7 @@ def test_format_tron_action_dict_spark(self, mock_spark_conf_builder): } ], "extra_volumes": [ - {"containerPath": "/nail/tmp", "hostPath": "/nail/tmp", "mode": "RW"} + {"containerPath": "/nail/tmp", "hostPath": "/nail/tmp", "mode": "RW"}, ], "trigger_downstreams": True, "triggered_by": ["foo.bar.{shortdate}"], @@ -967,8 +973,8 @@ def test_format_tron_action_dict_spark(self, mock_spark_conf_builder): action_config = tron_tools.TronActionConfig( service="my_service", instance=tron_tools.compose_instance("my_job", "do_something"), - config_dict=action_dict, - branch_dict=branch_dict, + config_dict=TronActionConfigDict(action_dict), + branch_dict=utils.BranchDictV2(branch_dict), cluster="test-cluster", ) @@ -992,47 +998,138 @@ def test_format_tron_action_dict_spark(self, mock_spark_conf_builder): autospec=True, return_value="spark", ), mock.patch( - "paasta_tools.spark_tools.get_default_spark_configuration", + "paasta_tools.tron_tools.load_system_paasta_config", autospec=True, - return_value={ - "environments": { - "testenv": { - "account_id": 1, - "default_event_log_dir": "s3a://test", - "history_server": "yelp.com", - }, - }, - }, + return_value=MOCK_SYSTEM_PAASTA_CONFIG, ), mock.patch( - "paasta_tools.spark_tools.get_runtimeenv", + "paasta_tools.tron_tools.get_k8s_url_for_cluster", autospec=True, - return_value="testenv", + return_value="https://k8s.test-cluster.paasta:6443", ), mock.patch( - "paasta_tools.tron_tools.load_system_paasta_config", + 
"service_configuration_lib.spark_config._get_k8s_docker_volumes_conf", autospec=True, - return_value=MOCK_SYSTEM_PAASTA_CONFIG, + return_value={ + "spark.kubernetes.executor.volumes.hostPath.0.mount.path": "/nail/tmp", + "spark.kubernetes.executor.volumes.hostPath.0.options.path": "/nail/tmp", + "spark.kubernetes.executor.volumes.hostPath.0.mount.readOnly": "false", + "spark.kubernetes.executor.volumes.hostPath.1.mount.path": "/etc/pki/spark", + "spark.kubernetes.executor.volumes.hostPath.1.options.path": "/etc/pki/spark", + "spark.kubernetes.executor.volumes.hostPath.1.mount.readOnly": "true", + "spark.kubernetes.executor.volumes.hostPath.2.mount.path": "/etc/passwd", + "spark.kubernetes.executor.volumes.hostPath.2.options.path": "/etc/passwd", + "spark.kubernetes.executor.volumes.hostPath.2.mount.readOnly": "true", + "spark.kubernetes.executor.volumes.hostPath.3.mount.path": "/etc/group", + "spark.kubernetes.executor.volumes.hostPath.3.options.path": "/etc/group", + "spark.kubernetes.executor.volumes.hostPath.3.mount.readOnly": "true", + }, + ), mock.patch( + "service_configuration_lib.spark_config.utils.load_spark_srv_conf", + autospec=True, + return_value=( + {}, + { + "target_mem_cpu_ratio": 7, + "resource_configs": { + "recommended": { + "cpu": 4, + "mem": 28, + }, + "medium": { + "cpu": 8, + "mem": 56, + }, + "max": { + "cpu": 12, + "mem": 110, + }, + }, + "cost_factor": { + "test-cluster": { + "test-pool": 100, + }, + "spark-pnw-prod": { + "batch": 0.041, + "stable_batch": 0.142, + }, + }, + "adjust_executor_res_ratio_thresh": 99999, + "default_resources_waiting_time_per_executor": 2, + "default_clusterman_observed_scaling_time": 15, + "high_cost_threshold_daily": 500, + "preferred_spark_ui_port_start": 39091, + "preferred_spark_ui_port_end": 39100, + "defaults": { + "spark.executor.cores": 4, + "spark.executor.instances": 2, + "spark.executor.memory": 28, + "spark.task.cpus": 1, + "spark.sql.shuffle.partitions": 128, + "spark.dynamicAllocation.executorAllocationRatio": 0.8, + "spark.dynamicAllocation.cachedExecutorIdleTimeout": "1500s", + "spark.yelp.dra.minExecutorRatio": 0.25, + }, + "mandatory_defaults": { + "spark.kubernetes.allocation.batch.size": 512, + "spark.kubernetes.decommission.script": "/opt/spark/kubernetes/dockerfiles/spark/decom.sh", + "spark.logConf": "true", + }, + }, + { + "spark.executor.cores": 4, + "spark.executor.instances": 2, + "spark.executor.memory": 28, + "spark.task.cpus": 1, + "spark.sql.shuffle.partitions": 128, + "spark.dynamicAllocation.executorAllocationRatio": 0.8, + "spark.dynamicAllocation.cachedExecutorIdleTimeout": "1500s", + "spark.yelp.dra.minExecutorRatio": 0.25, + }, + { + "spark.kubernetes.allocation.batch.size": 512, + "spark.kubernetes.decommission.script": "/opt/spark/kubernetes/dockerfiles/spark/decom.sh", + "spark.logConf": "true", + }, + { + "test-cluster": { + "test-pool": 100, + }, + "spark-pnw-prod": { + "batch": 0.041, + "stable_batch": 0.142, + }, + }, + ), ): - mock_spark_conf_builder.return_value._adjust_spark_requested_resources.return_value = { - "spark.executor.instances": "2", - "spark.kubernetes.executor.limit.cores": "2", - "spark.scheduler.maxRegisteredResourcesWaitingTime": "15min", - "spark.task.cpus": "1", - "spark.sql.shuffle.partitions": "12", - "spark.sql.files.minPartitionNum": "12", - "spark.default.parallelism": "12", - } result = tron_tools.format_tron_action_dict(action_config) - assert result == { - "command": "spark-submit " - "--conf spark.app.name=tron_spark_my_service_my_job.do_something " + confs = 
result["command"].split(" ") + spark_app_name = "" + spark_app_id = "" + for s in confs: + if s.startswith("spark.app.name"): + spark_app_name = s.split("=")[1] + if s.startswith("spark.app.id"): + spark_app_id = s.split("=")[1] + + expected = { + "command": "timeout 12h spark-submit " + "--conf spark.cores.max=4 " + "--conf spark.driver.memory=1g " + "--conf spark.executor.memory=1g " + "--conf spark.executor.cores=2 " + f"--conf spark.app.name={spark_app_name} " + f"--conf spark.app.id={spark_app_id} " + "--conf spark.ui.port=39091 " + "--conf spark.executor.instances=0 " + "--conf spark.kubernetes.executor.limit.cores=2 " + "--conf spark.scheduler.maxRegisteredResourcesWaitingTime=15min " + "--conf spark.task.cpus=1 " "--conf spark.master=k8s://https://k8s.test-cluster.paasta:6443 " "--conf spark.executorEnv.PAASTA_SERVICE=my_service " "--conf spark.executorEnv.PAASTA_INSTANCE=my_job.do_something " "--conf spark.executorEnv.PAASTA_CLUSTER=test-cluster " "--conf spark.executorEnv.PAASTA_INSTANCE_TYPE=spark " "--conf spark.executorEnv.SPARK_EXECUTOR_DIRS=/tmp " - "--conf spark.kubernetes.authenticate.driver.serviceAccountName=paasta--arn-aws-iam-000000000000-role-some-role--spark " "--conf spark.kubernetes.pyspark.pythonVersion=3 " "--conf spark.kubernetes.container.image=docker-registry.com:400/my_service:paasta-123abcde " "--conf spark.kubernetes.namespace=paasta-spark " @@ -1042,83 +1139,131 @@ def test_format_tron_action_dict_spark(self, mock_spark_conf_builder): "--conf spark.kubernetes.executor.label.paasta.yelp.com/service=my_service " "--conf spark.kubernetes.executor.label.paasta.yelp.com/instance=my_job.do_something " "--conf spark.kubernetes.executor.label.paasta.yelp.com/cluster=test-cluster " + "--conf spark.kubernetes.executor.label.spark.yelp.com/user=TRON " + "--conf spark.kubernetes.executor.label.spark.yelp.com/driver_ui_port=39091 " "--conf spark.kubernetes.node.selector.yelp.com/pool=special_pool " "--conf spark.kubernetes.executor.label.yelp.com/pool=special_pool " - "--conf spark.driver.host=$PAASTA_POD_IP " - # user args - "--conf spark.cores.max=4 " - "--conf spark.driver.memory=1g " - "--conf spark.executor.memory=1g " - "--conf spark.executor.cores=2 " - "--conf spark.ui.port=33000 " - "--conf spark.driver.port=33001 " - "--conf spark.blockManager.port=33002 " - "--conf spark.driver.blockManager.port=33002 " - "--conf spark.kubernetes.authenticate.executor.serviceAccountName=paasta--arn-aws-iam-000000000000-role-some-role " - # extra_volumes from config + "--conf spark.kubernetes.executor.label.paasta.yelp.com/pool=special_pool " + "--conf spark.kubernetes.executor.label.yelp.com/owner=core_ml " + "--conf spark.kubernetes.executor.podTemplateFile=/nail/srv/configs/spark_dns_pod_template.yaml " "--conf spark.kubernetes.executor.volumes.hostPath.0.mount.path=/nail/tmp " "--conf spark.kubernetes.executor.volumes.hostPath.0.options.path=/nail/tmp " "--conf spark.kubernetes.executor.volumes.hostPath.0.mount.readOnly=false " - # default spark volumes - "--conf spark.kubernetes.executor.volumes.hostPath.1.mount.path=/etc/passwd " - "--conf spark.kubernetes.executor.volumes.hostPath.1.options.path=/etc/passwd " + "--conf spark.kubernetes.executor.volumes.hostPath.1.mount.path=/etc/pki/spark " + "--conf spark.kubernetes.executor.volumes.hostPath.1.options.path=/etc/pki/spark " "--conf spark.kubernetes.executor.volumes.hostPath.1.mount.readOnly=true " - "--conf spark.kubernetes.executor.volumes.hostPath.2.mount.path=/etc/group " - "--conf 
spark.kubernetes.executor.volumes.hostPath.2.options.path=/etc/group " + "--conf spark.kubernetes.executor.volumes.hostPath.2.mount.path=/etc/passwd " + "--conf spark.kubernetes.executor.volumes.hostPath.2.options.path=/etc/passwd " "--conf spark.kubernetes.executor.volumes.hostPath.2.mount.readOnly=true " - # coreml adjustments - "--conf spark.executor.instances=2 " - "--conf spark.kubernetes.executor.limit.cores=2 " - "--conf spark.scheduler.maxRegisteredResourcesWaitingTime=15min " - "--conf spark.task.cpus=1 " + "--conf spark.kubernetes.executor.volumes.hostPath.3.mount.path=/etc/group " + "--conf spark.kubernetes.executor.volumes.hostPath.3.options.path=/etc/group " + "--conf spark.kubernetes.executor.volumes.hostPath.3.mount.readOnly=true " + "--conf spark.dynamicAllocation.enabled=true " + "--conf spark.dynamicAllocation.shuffleTracking.enabled=true " + "--conf spark.dynamicAllocation.executorAllocationRatio=0.8 " + "--conf spark.dynamicAllocation.cachedExecutorIdleTimeout=1500s " + "--conf spark.dynamicAllocation.minExecutors=0 " + "--conf spark.dynamicAllocation.maxExecutors=2 " + "--conf spark.ui.prometheus.enabled=true " + "--conf spark.metrics.conf.*.sink.prometheusServlet.class=org.apache.spark.metrics.sink.PrometheusServlet " + "--conf spark.metrics.conf.*.sink.prometheusServlet.path=/metrics/prometheus " + "--conf spark.eventLog.enabled=false " "--conf spark.sql.shuffle.partitions=12 " "--conf spark.sql.files.minPartitionNum=12 " "--conf spark.default.parallelism=12 " - "--conf spark.eventLog.enabled=true " - "--conf spark.eventLog.dir=s3a://test " - # actual script to run + "--conf spark.kubernetes.allocation.batch.size=512 " + "--conf spark.kubernetes.decommission.script=/opt/spark/kubernetes/dockerfiles/spark/decom.sh " + "--conf spark.logConf=true " + "--conf spark.hadoop.fs.s3a.aws.credentials.provider=com.amazonaws.auth.WebIdentityTokenCredentialsProvider " + "--conf spark.driver.host=$PAASTA_POD_IP " + "--conf spark.kubernetes.authenticate.executor.serviceAccountName=paasta--arn-aws-iam-000000000000-role-some-role " "file://this/is/a_test.py", + "executor": "spark", "requires": ["required_action"], "retries": 2, "retries_delay": "5m", - "docker_image": mock.ANY, - "executor": "spark", - "cpus": 2, - "mem": 1200, - "disk": 42, - "env": mock.ANY, "secret_volumes": [ { "secret_volume_name": "tron-secret-my--service-secret1", "secret_name": "secret1", "container_path": "/b/c", - "default_mode": "0644", "items": [{"key": "secret1", "path": "abc"}], + "default_mode": "0644", } ], - "extra_volumes": [ - {"container_path": "/nail/tmp", "host_path": "/nail/tmp", "mode": "RW"} - ], "trigger_downstreams": True, "triggered_by": ["foo.bar.{shortdate}"], "trigger_timeout": "5m", + "service_account_name": "paasta--arn-aws-iam-000000000000-role-some-role", "secret_env": {}, "field_selector_env": {"PAASTA_POD_IP": {"field_path": "status.podIP"}}, - "service_account_name": "paasta--arn-aws-iam-000000000000-role-some-role--spark", - "node_selectors": {"yelp.com/pool": "special_pool"}, - "ports": [33000, 33001, 33002], + "env": { + "PAASTA_SERVICE": "my_service", + "PAASTA_INSTANCE": "my_job.do_something", + "PAASTA_CLUSTER": "test-cluster", + "PAASTA_DEPLOY_GROUP": "prod", + "PAASTA_DOCKER_IMAGE": "my_service:paasta-123abcde", + "PAASTA_RESOURCE_CPUS": "2", + "PAASTA_RESOURCE_MEM": "1200", + "PAASTA_RESOURCE_DISK": "42", + "PAASTA_GIT_SHA": "123abcde", + "PAASTA_INSTANCE_TYPE": "spark", + "SHELL": "/bin/bash", + "SPARK_USER": "root", + "STREAM_SUFFIX_LOGSPOUT": "spark", + "KUBECONFIG": 
"/etc/kubernetes/spark.conf", + }, + "node_selectors": {"yelp.com/pool": "stable"}, + "cap_add": [], + "cap_drop": [ + "SETPCAP", + "MKNOD", + "AUDIT_WRITE", + "CHOWN", + "NET_RAW", + "DAC_OVERRIDE", + "FOWNER", + "FSETID", + "KILL", + "SETGID", + "SETUID", + "NET_BIND_SERVICE", + "SYS_CHROOT", + "SETFCAP", + ], "labels": { "paasta.yelp.com/cluster": "test-cluster", - "paasta.yelp.com/instance": "my_job.do_something", - "paasta.yelp.com/pool": "special_pool", + "paasta.yelp.com/pool": "stable", "paasta.yelp.com/service": "my_service", + "paasta.yelp.com/instance": "my_job.do_something", "yelp.com/owner": "compute_infra_platform_experience", "app.kubernetes.io/managed-by": "tron", + "paasta.yelp.com/prometheus_shard": "ml-compute", + "spark.yelp.com/user": "TRON", + "spark.yelp.com/driver_ui_port": "39091", }, - "annotations": {"paasta.yelp.com/routable_ip": "true"}, - "cap_drop": CAPS_DROP, - "cap_add": [], + "annotations": { + "paasta.yelp.com/routable_ip": "true", + "prometheus.io/port": "39091", + "prometheus.io/path": "/metrics/prometheus", + }, + "extra_volumes": [ + {"container_path": "/nail/tmp", "host_path": "/nail/tmp", "mode": "RW"}, + { + "container_path": "/etc/kubernetes/spark.conf", + "host_path": "/etc/kubernetes/spark.conf", + "mode": "RO", + }, + ], + "ports": [39091], + "cpus": 2, + "mem": 1200, + "disk": 42, + "docker_image": "docker-registry.com:400/my_service:paasta-123abcde", } + + assert result == expected + expected_docker = "{}/{}".format( "docker-registry.com:400", branch_dict["docker_image"] ) diff --git a/tests/test_utils.py b/tests/test_utils.py index adb2db718c..c603c429e9 100644 --- a/tests/test_utils.py +++ b/tests/test_utils.py @@ -28,6 +28,9 @@ from pytest import raises from paasta_tools import utils +from paasta_tools.utils import PoolsNotConfiguredError +from paasta_tools.utils import SystemPaastaConfig +from paasta_tools.utils import SystemPaastaConfigDict def test_get_git_url_provided_by_serviceyaml(): @@ -2842,3 +2845,73 @@ def test_is_secrets_for_teams_enabled(): # if not present mock_read_extra_service_information.return_value = {"description": "something"} assert not utils.is_secrets_for_teams_enabled(service) + + +@pytest.mark.parametrize( + "cluster,pool,system_paasta_config,expected", + [ + ( + # allowed_pools key has test-cluster and test-pool + "test-cluster", + "test-pool", + SystemPaastaConfig( + SystemPaastaConfigDict( + {"allowed_pools": {"test-cluster": ["test-pool", "fake-pool"]}} + ), + "fake_dir", + ), + True, + ), + ( + # allowed_pools key has test-cluster but doesn't have test-pool + "test-cluster", + "test-pool", + SystemPaastaConfig( + SystemPaastaConfigDict( + {"allowed_pools": {"test-cluster": ["fail-test-pool", "fake-pool"]}} + ), + "fake_dir", + ), + False, + ), + ], +) +def test_validate_pool(cluster, pool, system_paasta_config, expected): + assert utils.validate_pool(cluster, pool, system_paasta_config) == expected + + +@pytest.mark.parametrize( + "cluster,pool,system_paasta_config", + [ + ( + # allowed_pools key doesn't have test-cluster + "test-cluster", + "test-pool", + SystemPaastaConfig( + SystemPaastaConfigDict( + {"allowed_pools": {"fail-test-cluster": ["test-pool", "fake-pool"]}} + ), + "fake_dir", + ), + ), + ( + # allowed_pools key is not present + "test-cluster", + "test-pool", + SystemPaastaConfig( + SystemPaastaConfigDict( + {"fail_allowed_pools": {"test-cluster": ["test-pool", "fake-pool"]}} # type: ignore + ), + "fake_dir", + ), + ), + ], +) +def test_validate_pool_error(cluster, pool, system_paasta_config): + 
with pytest.raises(PoolsNotConfiguredError):
+        utils.validate_pool(cluster, pool, system_paasta_config)
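
Illustrative usage note (not part of the patch): the new validate_pool helper raises
PoolsNotConfiguredError when allowed_pools has no entry for the given cluster and
otherwise returns whether the pool is listed, so callers such as spark-run are expected
to wrap it roughly as sketched below. The check_pool_or_exit wrapper and its error
messages are assumptions for illustration only, not code from this diff.

import sys

from paasta_tools.utils import load_system_paasta_config
from paasta_tools.utils import PoolsNotConfiguredError
from paasta_tools.utils import validate_pool


def check_pool_or_exit(cluster: str, pool: str) -> None:
    # Hypothetical helper: fail fast on an unknown pool before building any Spark config.
    system_paasta_config = load_system_paasta_config()
    try:
        if not validate_pool(cluster, pool, system_paasta_config):
            print(f"Pool {pool!r} is not allowed for cluster {cluster!r}.", file=sys.stderr)
            sys.exit(1)
    except PoolsNotConfiguredError:
        # allowed_pools has no entry for this cluster; surface it as a configuration error.
        print(f"No allowed_pools configured for cluster {cluster!r}.", file=sys.stderr)
        sys.exit(1)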