From 848be64ed9766a2f3935561885e4af7bcfcc5cca Mon Sep 17 00:00:00 2001
From: Yannis Mantzouratos
Date: Wed, 23 May 2018 18:20:47 -0700
Subject: [PATCH] FIX respect pyspark_submit_args

SparklySession used to overwrite PYSPARK_SUBMIT_ARGS; instead we now
append to it.

Why: You want to customize your spark session in a way that depends on
the hardware specifications of your worker (or driver) machine(s), so
you'd rather define them close to where the actual machine specs are
requested / defined. Or you just want to test some new configuration
without having to change your code. In both cases, you can do so by
using the environment variable PYSPARK_SUBMIT_ARGS appropriately.

This commit also includes the following fix: the order in which options
were processed in the case of SparklySession.options and
additional_options specifying conflicting values was arbitrary, as the
option whose value was alphabetically greater was selected. We now
force the following precedence order:
PYSPARK_SUBMIT_ARGS (--conf) > additional_options > SparklySession.options
---
 CHANGELOG.md               |   2 +
 docs/source/session.rst    |  22 +++++++
 sparkly/session.py         |  40 +++++++-----
 tests/unit/test_session.py | 125 ++++++++++++++++++++++++++++++++-----
 4 files changed, 160 insertions(+), 29 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 6c34fa3..c3e2cb5 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,4 +1,6 @@
 ## 2.4.0
+* Respect PYSPARK_SUBMIT_ARGS if it is already set by appending SparklySession related options at the end instead of overwriting.
+* Fix additional_options to always override SparklySession.options when a session is initialized
 * Fix ujson dependency on environments where redis-py is already installed
 * Access or initialize SparklySession through get_or_create classmethod
 * Ammend `sparkly.functions.switch_case` to accept a user defined function for
diff --git a/docs/source/session.rst b/docs/source/session.rst
index de73956..215c256 100644
--- a/docs/source/session.rst
+++ b/docs/source/session.rst
@@ -123,6 +123,28 @@ Tuning options

     spark = MySession({'spark.sql.shuffle.partitions': 10})

+Tuning options through shell environment
+----------------------------------------
+
+**Why**: You want to customize your spark session in a way that depends on the
+hardware specifications of your worker (or driver) machine(s), so you'd rather
+define them close to where the actual machine specs are requested / defined.
+Or you just want to test some new configuration without having to change your
+code. In both cases, you can do so by using the environment variable
+``PYSPARK_SUBMIT_ARGS``. Note that any options defined this way will override
+any conflicting options from your Python code.
+
+**For example**:
+
+  - ``spark.executor.cores`` to tune the cores used by each executor;
+  - ``spark.executor.memory`` to tune the memory available to each executor.
+
+.. code-block:: sh
+
+    PYSPARK_SUBMIT_ARGS='--conf "spark.executor.cores=32" --conf "spark.executor.memory=160g"' \
+    ./my_spark_app.py
+
+
 Using UDFs
 ----------

diff --git a/sparkly/session.py b/sparkly/session.py
index 02cf8ae..3530b30 100755
--- a/sparkly/session.py
+++ b/sparkly/session.py
@@ -18,7 +18,7 @@
 import signal
 import sys

-from pyspark import SparkConf, SparkContext
+from pyspark import SparkContext
 from pyspark.sql import SparkSession

 from sparkly.catalog import SparklyCatalog
@@ -28,7 +28,7 @@


 class SparklySession(SparkSession):
-    """Wrapper around HiveContext to simplify definition of options, packages, JARs and UDFs.
+    """Wrapper around SparkSession to simplify definition of options, packages, JARs and UDFs.

     Example::

@@ -55,9 +55,11 @@ class MySession(sparkly.SparklySession):
         spark.read_ext.cassandra(...)

     Attributes:
-        options (dict[str,str]): Configuration options that are passed to SparkConf.
+        options (dict[str,str]): Configuration options that are passed to spark-submit.
             See `the list of possible options `_.
+            Note that any options set already through PYSPARK_SUBMIT_ARGS will override
+            these.
         repositories (list[str]): List of additional maven repositories for package lookup.
         packages (list[str]): Spark packages that should be installed.
             See https://spark-packages.org/
@@ -78,33 +80,32 @@ class MySession(sparkly.SparklySession):
     def __init__(self, additional_options=None):
         os.environ['PYSPARK_PYTHON'] = sys.executable
+
         submit_args = [
+            # options that were already defined through PYSPARK_SUBMIT_ARGS
+            # take precedence over SparklySession's
+            os.environ.get('PYSPARK_SUBMIT_ARGS', '').replace('pyspark-shell', ''),
             self._setup_repositories(),
             self._setup_packages(),
             self._setup_jars(),
+            self._setup_options(additional_options),
             'pyspark-shell',
         ]
         os.environ['PYSPARK_SUBMIT_ARGS'] = ' '.join(filter(None, submit_args))

-        def _create_spark_context():
-            spark_conf = SparkConf()
-            spark_conf.set('spark.sql.catalogImplementation', 'hive')
-            spark_conf.setAll(self._setup_options(additional_options))
-            return SparkContext(conf=spark_conf)
-
         # If we are in instant testing mode
         if InstantTesting.is_activated():
             spark_context = InstantTesting.get_context()

             # It's the first run, so we have to create context and demonise the process.
             if spark_context is None:
-                spark_context = _create_spark_context()
+                spark_context = SparkContext()
                 if os.fork() == 0:  # Detached process.
                     signal.pause()
                 else:
                     InstantTesting.set_context(spark_context)
         else:
-            spark_context = _create_spark_context()
+            spark_context = SparkContext()

         # Init HiveContext
         super(SparklySession, self).__init__(spark_context)

@@ -187,11 +188,22 @@ def _setup_jars(self):
         return ''

     def _setup_options(self, additional_options):
-        options = list(self.options.items())
+        options = {}
+
+        options.update(self.options)
+
         if additional_options:
-            options += list(additional_options.items())
+            options.update(additional_options)
+
+        if 'spark.sql.catalogImplementation' not in options:
+            options['spark.sql.catalogImplementation'] = 'hive'

-        return sorted(options)
+        # Here we massage conf properties with the intent to pass them to
+        # spark-submit; this is convenient as it is unified with the approach
+        # we take for repos, packages and jars, and it also handles precedence
+        # of conf properties already defined by the user in a very
+        # straightforward way (since we always append to PYSPARK_SUBMIT_ARGS)
+        return ' '.join('--conf "{}={}"'.format(*o) for o in sorted(options.items()))

     def _setup_udfs(self):
         for name, defn in self.udfs.items():
diff --git a/tests/unit/test_session.py b/tests/unit/test_session.py
index 646427e..5c14639 100644
--- a/tests/unit/test_session.py
+++ b/tests/unit/test_session.py
@@ -21,19 +21,20 @@
 except ImportError:
     import mock

-from pyspark import SparkConf, SparkContext
+from pyspark import SparkContext

 from sparkly import SparklySession


 class TestSparklySession(unittest.TestCase):
+
+    maxDiff = None
+
     def setUp(self):
         super(TestSparklySession, self).setUp()
-        self.spark_conf_mock = mock.Mock(spec=SparkConf)
         self.spark_context_mock = mock.Mock(spec=SparkContext)

         self.patches = [
-            mock.patch('sparkly.session.SparkConf', self.spark_conf_mock),
             mock.patch('sparkly.session.SparkContext', self.spark_context_mock),
         ]
         [p.start() for p in self.patches]
@@ -67,7 +68,11 @@ class _Session(SparklySession):

         self.assertEqual(os_mock.environ, {
             'PYSPARK_PYTHON': sys.executable,
-            'PYSPARK_SUBMIT_ARGS': '--packages package1,package2 pyspark-shell',
+            'PYSPARK_SUBMIT_ARGS': (
+                '--packages package1,package2 '
+                '--conf "spark.sql.catalogImplementation=hive" '
+                'pyspark-shell'
+            ),
         })

     @mock.patch('sparkly.session.os')
@@ -85,9 +90,12 @@ class _Session(SparklySession):

         self.assertEqual(os_mock.environ, {
             'PYSPARK_PYTHON': sys.executable,
-            'PYSPARK_SUBMIT_ARGS':
+            'PYSPARK_SUBMIT_ARGS': (
                 '--repositories http://my.maven.repo,http://another.maven.repo '
-                '--packages package1,package2 pyspark-shell',
+                '--packages package1,package2 '
+                '--conf "spark.sql.catalogImplementation=hive" '
+                'pyspark-shell'
+            ),
         })

     @mock.patch('sparkly.session.os')
@@ -101,23 +109,71 @@ class _Session(SparklySession):

         self.assertEqual(os_mock.environ, {
             'PYSPARK_PYTHON': sys.executable,
-            'PYSPARK_SUBMIT_ARGS': '--jars file_a.jar,file_b.jar pyspark-shell',
+            'PYSPARK_SUBMIT_ARGS': (
+                '--jars file_a.jar,file_b.jar '
+                '--conf "spark.sql.catalogImplementation=hive" '
+                'pyspark-shell'
+            ),
         })

-    def test_session_with_options(self):
+    @mock.patch('sparkly.session.os')
+    def test_session_with_options(self, os_mock):
+        os_mock.environ = {}
+
+        # test options attached to class definition
         class _Session(SparklySession):
             options = {
                 'spark.option.a': 'value_a',
                 'spark.option.b': 'value_b',
             }

-        _Session(additional_options={'spark.option.c': 'value_c'})
+        _Session()
+
+        self.assertEqual(os_mock.environ, {
+            'PYSPARK_PYTHON': sys.executable,
+            'PYSPARK_SUBMIT_ARGS': (
+                '--conf "spark.option.a=value_a" '
+                '--conf "spark.option.b=value_b" '
+                '--conf "spark.sql.catalogImplementation=hive" '
+                'pyspark-shell'
+            ),
+        })
+
+        # test additional_options override/extend options attached to class definition
+        os_mock.environ = {}
+
+        _Session(additional_options={
+            'spark.option.b': 'value_0',
+            'spark.option.c': 'value_c',
+        })

-        self.spark_conf_mock.return_value.setAll.assert_called_once_with([
-            ('spark.option.a', 'value_a'),
-            ('spark.option.b', 'value_b'),
-            ('spark.option.c', 'value_c'),
-        ])
+        self.assertEqual(os_mock.environ, {
+            'PYSPARK_PYTHON': sys.executable,
+            'PYSPARK_SUBMIT_ARGS': (
+                '--conf "spark.option.a=value_a" '
+                '--conf "spark.option.b=value_0" '
+                '--conf "spark.option.c=value_c" '
+                '--conf "spark.sql.catalogImplementation=hive" '
+                'pyspark-shell'
+            ),
+        })
+
+        # test catalog implementation is respected
+        os_mock.environ = {}
+
+        _Session.options = {
+            'spark.sql.catalogImplementation': 'my_fancy_catalog',
+        }
+
+        _Session()
+
+        self.assertEqual(os_mock.environ, {
+            'PYSPARK_PYTHON': sys.executable,
+            'PYSPARK_SUBMIT_ARGS': (
+                '--conf "spark.sql.catalogImplementation=my_fancy_catalog" '
+                'pyspark-shell'
+            ),
+        })

     @mock.patch('sparkly.session.os')
     def test_session_without_packages_jars_and_options(self, os_mock):
@@ -127,7 +183,46 @@ def test_session_without_packages_jars_and_options(self, os_mock):

         self.assertEqual(os_mock.environ, {
             'PYSPARK_PYTHON': sys.executable,
-            'PYSPARK_SUBMIT_ARGS': 'pyspark-shell',
+            'PYSPARK_SUBMIT_ARGS': '--conf "spark.sql.catalogImplementation=hive" pyspark-shell',
+        })
+
+    @mock.patch('sparkly.session.os')
+    def test_session_appends_to_pyspark_submit_args(self, os_mock):
+        os_mock.environ = {
+            'PYSPARK_SUBMIT_ARGS': '--conf "my.conf.here=5g" --and-other-properties',
+        }
+
+        SparklySession()
+
+        self.assertEqual(os_mock.environ, {
+            'PYSPARK_PYTHON': sys.executable,
+            'PYSPARK_SUBMIT_ARGS': (
+                '--conf "my.conf.here=5g" --and-other-properties '
+                '--conf "spark.sql.catalogImplementation=hive" '
+                'pyspark-shell'
+            ),
+        })
+
+        # test more complicated session
+        os_mock.environ = {
+            'PYSPARK_SUBMIT_ARGS': '--conf "my.conf.here=5g" --and-other-properties',
+        }
+
+        class _Session(SparklySession):
+            options = {'my.conf.here': '10g'}
+
+        _Session()
+
+        self.assertEqual(os_mock.environ, {
+            'PYSPARK_PYTHON': sys.executable,
+            'PYSPARK_SUBMIT_ARGS': (
+                '--conf "my.conf.here=5g" --and-other-properties '
+                # Note that spark honors the first conf it sees when multiple
+                # are defined
+                '--conf "my.conf.here=10g" '
+                '--conf "spark.sql.catalogImplementation=hive" '
+                'pyspark-shell'
+            ),
         })

     def test_broken_udf(self):
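For illustration, a minimal sketch (not part of the patch itself) of how the resulting precedence chain behaves once this change is applied. The class name MySession and the memory values are made up for the example, and actually constructing the session assumes pyspark, sparkly and a local Spark installation are available:

import os

from sparkly import SparklySession


class MySession(SparklySession):
    # Lowest precedence: defaults attached to the session class.
    options = {'spark.executor.memory': '4g'}


# Highest precedence: whatever is already in PYSPARK_SUBMIT_ARGS.
# SparklySession now appends its own --conf flags after this value
# (stripping any 'pyspark-shell' token first), and Spark keeps the
# first occurrence of a conflicting conf.
os.environ['PYSPARK_SUBMIT_ARGS'] = '--conf "spark.executor.memory=16g"'

# Middle precedence: additional_options passed at initialization time,
# which now always override the class-level options (so 4g never appears).
spark = MySession(additional_options={'spark.executor.memory': '8g'})

# os.environ['PYSPARK_SUBMIT_ARGS'] now reads roughly:
#   --conf "spark.executor.memory=16g" --conf "spark.executor.memory=8g"
#   --conf "spark.sql.catalogImplementation=hive" pyspark-shell
# so the executors end up with 16g, the value from the shell environment.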