MLCOMPUTE-967 | changes for Spark drivers on k8s (#3679)
* MLCOMPUTE-967 | add default iam role for Spark drivers on k8s

* MLCOMPUTE-967 | move default to SystemPaastaConfig

* MLCOMPUTE-967 | add volume mounts for kubeconfig

* MLCOMPUTE-967 | add spark to the list of executor types

* MLCOMPUTE-967 | use correct k8s url as spark master argument

* MLCOMPUTE-1057 | add flow to create Spark executor service account dynamically

* MLCOMPUTE-967 | use same IAM role for Spark drivers and executors

* MLCOMPUTE-1068 | logic for using duplicate iam role with eks suffix

* remove use_k8s deprecated flag (cherry-picked from 59beb52)

* Use stable pool for Spark driver

* MLCOMPUTE-1160 | consolidate tron and paasta logic to spark_tools, utils and service config lib

* MLCOMPUTE-1163 | handle space separated Spark args

* MLCOMPUTE-1119 | Support extra flags from paasta spark-run in tron_tools

* MLCOMPUTE-967 | mount pod template file for Spark executor

* Add pod labels for monitoring metrics

* Parse additional tronfig args

* MLCOMPUTE-967 | use default pod template without compact bin packing

* MLCOMPUTE-967 | bump service-configuration-lib version

* MLCOMPUTE-967 | updated timeout_spark regex acc to bash timeout, removed redundant spark ports assignment

* Specify prometheus scrape path

* Update driver pod annotations for metrics scraping

* MLCOMPUTE-967 | deprecate obsolete args, add mrjob capability to tron tools

* MLCOMPUTE-967 | remove obsolete args, sanitise code

* MLCOMPUTE-967 | fix tests

* MLCOMPUTE-967 | use already defined time_delta

---------

Co-authored-by: Sameer Sharma <[email protected]>
Co-authored-by: Jon Lee <[email protected]>
Co-authored-by: Chi Chang <[email protected]>
4 people authored Mar 27, 2024
1 parent 5ab1808 commit 7135849
Showing 11 changed files with 877 additions and 925 deletions.
445 changes: 103 additions & 342 deletions paasta_tools/cli/cmds/spark_run.py

Large diffs are not rendered by default.

9 changes: 9 additions & 0 deletions paasta_tools/cli/schemas/tron_schema.json
@@ -316,6 +316,15 @@
"type": "string",
"pattern": "[-a-zA-z0-9_]"
},
"max_runtime": {
"$ref": "#definitions/time_delta"
},
"force_spark_resource_configs": {
"type": "boolean"
},
"mrjob": {
"type": "boolean"
},
"spark_args": {
"type": "object",
"additionalProperties": true,
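
For context, a tronfig action using the new fields might look like the following sketch, expressed as a Python dict purely for illustration; the action name, command, and values are made up, the surrounding tronfig structure is omitted, and any key not shown in the schema fragment above is an assumption:

# Hypothetical Tron Spark action exercising the new schema fields.
spark_action = {
    "executor": "spark",  # Spark was added to the list of executor types in this change
    "command": "spark-submit jobs/sessionize.py",  # made-up entry point
    "max_runtime": "12h",  # time_delta bounding how long the job may run
    "force_spark_resource_configs": False,
    "mrjob": False,
    "spark_args": {  # free-form object (additionalProperties: true)
        "spark.executor.cores": 4,
        "spark.executor.instances": 10,
    },
}
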
3 changes: 2 additions & 1 deletion paasta_tools/kubernetes_tools.py
@@ -4008,6 +4008,7 @@ def create_or_find_service_account_name(
iam_role: str,
namespace: str,
k8s_role: Optional[str] = None,
kubeconfig_file: Optional[str] = None,
dry_run: bool = False,
) -> str:
# the service account is expected to always be prefixed with paasta- as using the actual namespace
@@ -4039,7 +4040,7 @@
if dry_run:
return sa_name

kube_client = KubeClient()
kube_client = KubeClient(config_file=kubeconfig_file)
if not any(
sa.metadata and sa.metadata.name == sa_name
for sa in get_all_service_accounts(kube_client, namespace)
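
As a usage sketch (not part of the diff) of the new kubeconfig_file parameter; the role ARN, namespace, and kubeconfig path are placeholders, and dry_run=True returns the derived name without contacting a cluster:

from paasta_tools.kubernetes_tools import create_or_find_service_account_name

sa_name = create_or_find_service_account_name(
    iam_role="arn:aws:iam::123456789012:role/example-spark-driver",  # placeholder role
    namespace="paasta-spark",
    kubeconfig_file="/etc/kubernetes/spark.conf",  # placeholder kubeconfig path
    dry_run=True,  # skip the KubeClient call and just return the name
)
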
221 changes: 104 additions & 117 deletions paasta_tools/spark_tools.py
@@ -1,26 +1,28 @@
import copy
import logging
import re
import socket
import sys
from functools import lru_cache
from typing import Any
from typing import cast
from typing import Dict
from typing import List
from typing import Mapping
from typing import Optional
from typing import Set

import yaml
from mypy_extensions import TypedDict
from service_configuration_lib import spark_config
from service_configuration_lib.spark_config import DEFAULT_SPARK_RUN_CONFIG

from paasta_tools.utils import DockerVolume
from paasta_tools.utils import get_runtimeenv
from paasta_tools.utils import PaastaColors

KUBERNETES_NAMESPACE = "paasta-spark"
DEFAULT_SPARK_SERVICE = "spark"
DEFAULT_SPARK_RUNTIME_TIMEOUT = "12h"
SPARK_AWS_CREDS_PROVIDER = "com.amazonaws.auth.WebIdentityTokenCredentialsProvider"
SPARK_EXECUTOR_NAMESPACE = "paasta-spark"
SPARK_DRIVER_POOL = "stable"
SPARK_JOB_USER = "TRON"
SPARK_PROMETHEUS_SHARD = "ml-compute"
SPARK_DNS_POD_TEMPLATE = "/nail/srv/configs/spark_dns_pod_template.yaml"

log = logging.getLogger(__name__)

@@ -52,115 +54,10 @@ class SparkEnvironmentConfig(TypedDict):
)


@lru_cache(maxsize=1)
def get_default_spark_configuration() -> Optional[SparkEnvironmentConfig]:
"""
Read the globally distributed Spark configuration file and return the contents as a dictionary.
At the time this comment was written, the only bit of information that we care about from this file
is the default event log location on S3. See the TypedDict of the retval for the bits that we care about
"""
try:
with open(DEFAULT_SPARK_RUN_CONFIG, mode="r") as f:
return yaml.safe_load(f.read())
except OSError:
log.error(
f"Unable to open {DEFAULT_SPARK_RUN_CONFIG} and get default configuration values!"
)
except yaml.YAMLError:
log.error(
f"Unable to parse {DEFAULT_SPARK_RUN_CONFIG} and get default configuration values!"
)

return None


def get_webui_url(port: str) -> str:
return f"http://{socket.getfqdn()}:{port}"


def setup_event_log_configuration(spark_args: Dict[str, str]) -> Dict[str, str]:
"""
Adjusts user settings to provide a default event log storage path if event logging is
enabled but not configured.
If event logging is not enabled or is already fully configured, this function is effectively a no-op.
"""
# don't enable event logging if explicitly disabled
if spark_args.get("spark.eventLog.enabled", "true") != "true":
# Note: we provide an empty dict as our return value as the expected
# usage of this function is something like CONF.update(setup_event_log_configuration(...))
# and in this case, we don't want to update the existing config
return {}

# user set an explicit event log location - there's nothing else for us to
# do here
if spark_args.get("spark.eventLog.dir") is not None:
# so, same as above, we return an empty dict so that there are no updates
return {}

default_spark_conf = get_default_spark_configuration()
if default_spark_conf is None:
log.error(
"Unable to access default Spark configuration, event log will be disabled"
)
# Note: we don't return an empty dict here since we want to make sure that our
# caller will overwrite the enabled option with our return value (see the first
# `if` block in this function for more details)
return {"spark.eventLog.enabled": "false"}

environment_config = default_spark_conf.get("environments", {}).get(
get_runtimeenv()
)
if environment_config is None:
log.error(
f"{get_runtimeenv()} not found in {DEFAULT_SPARK_RUN_CONFIG}, event log will be disabled"
)
return {"spark.eventLog.enabled": "false"}

return {
"spark.eventLog.enabled": "true",
"spark.eventLog.dir": environment_config["default_event_log_dir"],
}
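
As a sketch (not part of the diff) of the CONF.update(...) pattern described in the comments above, assuming the helper is importable as shown; this commit consolidates these helpers, so the import path reflects the pre-change layout, and actually running it requires the globally distributed Spark config file:

from paasta_tools.spark_tools import setup_event_log_configuration

spark_conf = {"spark.eventLog.enabled": "true"}  # no explicit spark.eventLog.dir
# Merges in a default event log dir, or disables event logging if no default is found.
spark_conf.update(setup_event_log_configuration(spark_conf))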


def adjust_spark_resources(
spark_args: Dict[str, str], desired_pool: str
) -> Dict[str, str]:
"""
Wrapper around _adjust_spark_requested_resources from service_configuration_lib.
We have some code that will do some QoL translations from Mesos->K8s arguments as well
as set some more Yelpy defaults than what Spark uses.
"""
# TODO: would be nice if _adjust_spark_requested_resources only returned the stuff it
# modified
spark_conf_builder = spark_config.SparkConfBuilder()
return spark_conf_builder._adjust_spark_requested_resources(
# additionally, _adjust_spark_requested_resources modifies the dict you pass in
# so we make a copy to make things less confusing - consider dropping the
# service_configuration_lib dependency here so that we can do things in a slightly
# cleaner way
user_spark_opts=copy.copy(spark_args),
cluster_manager="kubernetes",
pool=desired_pool,
)


def setup_shuffle_partitions(spark_args: Dict[str, str]) -> Dict[str, str]:
"""
Wrapper around _append_sql_partitions_conf from service_configuration_lib.
For now, this really just sets a default number of partitions based on # of cores.
"""
# as above, this function also returns everything + mutates the passed in dictionary
# which is not ideal
spark_conf_builder = spark_config.SparkConfBuilder()
return spark_conf_builder._append_sql_partitions_conf(
spark_opts=copy.copy(spark_args),
)


def get_volumes_from_spark_mesos_configs(spark_conf: Mapping[str, str]) -> List[str]:
return (
spark_conf.get("spark.mesos.executor.docker.volumes", "").split(",")
@@ -253,10 +150,100 @@ def setup_volume_mounts(volumes: List[DockerVolume]) -> Dict[str, str]:
return conf


def create_spark_config_str(spark_config_dict: Dict[str, Any], is_mrjob: bool) -> str:
    conf_option = "--jobconf" if is_mrjob else "--conf"
    spark_config_entries = list()

    if is_mrjob:
        spark_master = spark_config_dict["spark.master"]
        spark_config_entries.append(f"--spark-master={spark_master}")
        spark_config_dict.pop("spark.master", None)

    for opt, val in spark_config_dict.items():
        # Quote Spark config values that contain spaces so each stays a single argument
        if isinstance(val, str) and " " in val:
            val = f"'{val}'"
        spark_config_entries.append(f"{conf_option} {opt}={val}")
    return " ".join(spark_config_entries)


def inject_spark_conf_str(original_cmd: str, spark_conf_str: str) -> str:
    for base_cmd in ("pyspark", "spark-shell", "spark-submit"):
        if base_cmd in original_cmd:
            return original_cmd.replace(base_cmd, base_cmd + " " + spark_conf_str, 1)
    return original_cmd
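
A quick example (not part of the diff): the conf string is spliced in right after the first recognized base command; the command and option are made up:

from paasta_tools.spark_tools import inject_spark_conf_str

inject_spark_conf_str("spark-submit --verbose jobs/example.py", "--conf spark.executor.cores=2")
# "spark-submit --conf spark.executor.cores=2 --verbose jobs/example.py"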


def auto_add_timeout_for_spark_job(cmd: str, timeout_job_runtime: str) -> str:
    # Timeout is only added for spark-submit commands
    # TODO: Add timeout for jobs using mrjob with spark-runner
    if "spark-submit" not in cmd:
        return cmd
    try:
        timeout_present = re.match(
            r"^.*timeout[\s]+[\d]+[\.]?[\d]*[m|h][\s]+spark-submit .*$", cmd
        )
        if not timeout_present:
            split_cmd = cmd.split("spark-submit")
            # split_cmd[0] will always be an empty string or end with a space
            cmd = f"{split_cmd[0]}timeout {timeout_job_runtime} spark-submit{split_cmd[1]}"
            log.info(
                PaastaColors.blue(
                    f"NOTE: Job will exit in given time {timeout_job_runtime}. "
                    f"Adjust timeout value using --timeout-job-runtime. "
                    f"New updated command with timeout: {cmd}"
                ),
            )
    except Exception as e:
        err_msg = (
            f"'timeout' could not be added to command: '{cmd}' due to error '{e}'. "
            "Please report to #spark."
        )
        log.warn(err_msg)
        print(PaastaColors.red(err_msg))
    return cmd
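
To illustrate (not part of the diff), a bare spark-submit command gets wrapped in a bash timeout, while a command that already carries one is left untouched; both commands are made up:

from paasta_tools.spark_tools import auto_add_timeout_for_spark_job

auto_add_timeout_for_spark_job("spark-submit jobs/example.py", "12h")
# "timeout 12h spark-submit jobs/example.py"
auto_add_timeout_for_spark_job("timeout 8h spark-submit jobs/example.py", "12h")
# unchanged: "timeout 8h spark-submit jobs/example.py"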


def build_spark_command(
    original_cmd: str,
    spark_config_dict: Dict[str, Any],
    is_mrjob: bool,
    timeout_job_runtime: str,
) -> str:
    command = f"{inject_spark_conf_str(original_cmd, create_spark_config_str(spark_config_dict, is_mrjob=is_mrjob))}"
    return auto_add_timeout_for_spark_job(command, timeout_job_runtime)
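
Putting the helpers above together end to end (not part of the diff; all values are made up):

from paasta_tools.spark_tools import build_spark_command

build_spark_command(
    original_cmd="spark-submit jobs/example.py --date 2024-03-27",
    spark_config_dict={"spark.executor.cores": "2"},
    is_mrjob=False,
    timeout_job_runtime="12h",
)
# "timeout 12h spark-submit --conf spark.executor.cores=2 jobs/example.py --date 2024-03-27"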


def get_spark_ports_from_config(spark_conf: Dict[str, str]) -> List[int]:
    ports = [int(v) for k, v in spark_conf.items() if k.endswith(".port")]
    return ports
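
For example (not part of the diff; port values are arbitrary):

from paasta_tools.spark_tools import get_spark_ports_from_config

get_spark_ports_from_config(
    {"spark.ui.port": "39091", "spark.driver.port": "39092", "spark.app.name": "example"}
)
# [39091, 39092]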


# TODO: Reuse by ad-hoc Spark-driver-on-k8s
def get_spark_driver_monitoring_annotations(
    spark_config: Dict[str, str],
) -> Dict[str, str]:
    """
    Returns Spark driver pod annotations - currently used for Prometheus metadata.
    """
    ui_port_str = str(spark_config.get("spark.ui.port", ""))
    annotations = {
        "prometheus.io/port": ui_port_str,
        "prometheus.io/path": "/metrics/prometheus",
    }
    return annotations


def get_spark_driver_monitoring_labels(
    spark_config: Dict[str, str],
) -> Dict[str, str]:
    """
    Returns Spark driver pod labels - generally for Prometheus metric relabeling.
    """
    ui_port_str = str(spark_config.get("spark.ui.port", ""))
    labels = {
        "paasta.yelp.com/prometheus_shard": SPARK_PROMETHEUS_SHARD,
        "spark.yelp.com/user": SPARK_JOB_USER,
        "spark.yelp.com/driver_ui_port": ui_port_str,
    }
    return labels
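
A final sketch (not part of the diff) of the pod metadata these two helpers produce for a driver, using a made-up UI port:

from paasta_tools.spark_tools import (
    get_spark_driver_monitoring_annotations,
    get_spark_driver_monitoring_labels,
)

conf = {"spark.ui.port": "39091"}  # placeholder port
get_spark_driver_monitoring_annotations(conf)
# {"prometheus.io/port": "39091", "prometheus.io/path": "/metrics/prometheus"}
get_spark_driver_monitoring_labels(conf)
# {"paasta.yelp.com/prometheus_shard": "ml-compute",
#  "spark.yelp.com/user": "TRON",
#  "spark.yelp.com/driver_ui_port": "39091"}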
