Update pyspark_pi.py (#268)
SandraGH5 authored Jun 4, 2021
1 parent 1d16311 commit 45c019b
Showing 1 changed file with 95 additions and 7 deletions.
102 changes: 95 additions & 7 deletions cookbook/integrations/kubernetes/k8s_spark/pyspark_pi.py
@@ -1,10 +1,96 @@
"""
.. _intermediate_using_spark_tasks:
Creating spark tasks as part of your workflow OR running spark jobs
------------------------------------------------------------------------
Spark Tasks
------------
Flyte has an optional plugin that makes it possible to run `Apache Spark <https://spark.apache.org/>`_ jobs natively on your Kubernetes cluster. This plugin has been used extensively at Lyft and is battle-tested.
It makes it extremely easy to run your pyspark code (scala/java support is coming soon) as a task. The plugin dynamically creates a new virtual cluster for the spark execution, and Flyte manages the execution and auto-scaling
of the spark job.
.. NOTE::

   This has been tested at scale: more than 100k Spark jobs have run through Flyte at Lyft. It still requires a large Kubernetes capacity and careful configuration.
   We recommend using multi-cluster mode - :ref:`howto-multi-cluster` - and enabling :ref:`howto-resource-quota` for large and extremely frequent Spark jobs.
   For extremely short-running jobs, this is still not a recommended approach; it might be better to use a pre-spawned cluster.
Why Use K8s Spark?
===================
Managing Python dependencies is hard. Flyte makes it easy to version and manage dependencies using containers. The K8s Spark plugin brings all the benefits of containerization
to spark without needing to manage special spark clusters.
Pros:
------
#. Extremely easy to get started, with complete isolation between workloads
#. Every job runs in isolation and has its own virtual cluster - no more nightmarish dependency management
#. Flyte manages everything for you!
Cons:
-----
#. Short-running, bursty jobs are not a great fit because of the container overhead
#. No interactive spark capabilities are available with Flyte K8s spark, which is better suited for running adhoc and/or scheduled jobs
How to Enable Spark in the Flyte Backend
========================================
Flyte Spark uses the `Spark On K8s Operator <https://github.com/GoogleCloudPlatform/spark-on-k8s-operator>`_ and a custom-built `Flyte Spark Plugin <https://pkg.go.dev/github.com/flyteorg/[email protected]/go/tasks/plugins/k8s/spark>`_.
This is a backend plugin which has to be enabled in your deployment. To enable a plugin, follow the steps in :ref:`howto-enable-backend-plugins`.
You can optionally configure the plugin as per the `backend Config Structure <https://pkg.go.dev/github.com/flyteorg/[email protected]/go/tasks/plugins/k8s/spark#Config>`_. An example config is defined
`here <https://github.com/flyteorg/flyte/blob/master/kustomize/overlays/sandbox/config/propeller/plugins/spark.yaml>`_, which looks like:

.. rli:: https://raw.githubusercontent.com/flyteorg/flyte/master/kustomize/overlays/sandbox/config/propeller/plugins/spark.yaml
   :language: yaml
Spark in Flytekit
=================
For a more complete example, refer to the :std:ref:`User Guide <cookbook:sphx_glr_auto_integrations_kubernetes_k8s_spark>`.

#. Ensure you have ``flytekit>=0.16.0`` installed.
#. Enable Spark in the backend, following the previous section.
#. Install the `flytekit spark plugin <https://pypi.org/project/flytekitplugins-spark/>`_ ::

      pip install flytekitplugins-spark

#. Write regular pyspark code, with one change in the ``@task`` decorator. Refer to the example below:
   .. code-block:: python

       @task(
           task_config=Spark(
               # this configuration is applied to the spark cluster
               spark_conf={
                   "spark.driver.memory": "1000M",
                   "spark.executor.instances": "2",
                   "spark.driver.cores": "1",
               }
           ),
           cache_version="1",
           cache=True,
       )
       def hello_spark(partitions: int) -> float:
           ...
           sess = flytekit.current_context().spark_session
           # Regular PySpark code
           ...
#. Run it locally:

   .. code-block:: python

       hello_spark(partitions=10)
#. Use it in a workflow (check cookbook)
#. Run it on a remote cluster. To do this, you have to build the correct dockerfile, as explained in :std:ref:`spark-docker-image`. You can also use the `Standard Dockerfile recommended by Spark <https://github.com/apache/spark/blob/master/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/Dockerfile#L22>`_. A minimal end-to-end sketch combining these steps is shown below.
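
The sketch below puts the steps above together into one locally runnable example. It is an illustration only: the task and workflow names (``count_rows``, ``my_wf``) and the row count are assumptions, not part of this example file.

.. code-block:: python

    import flytekit
    from flytekit import task, workflow
    from flytekitplugins.spark import Spark


    @task(
        task_config=Spark(
            # applied to the per-execution spark cluster
            spark_conf={"spark.executor.instances": "2"}
        ),
    )
    def count_rows(partitions: int) -> int:
        # the plugin injects a spark session into the task context
        sess = flytekit.current_context().spark_session
        return sess.sparkContext.parallelize(range(1000), partitions).count()


    @workflow
    def my_wf(partitions: int = 10) -> int:
        # a spark task can be chained with any other task type in a workflow
        return count_rows(partitions=partitions)


    if __name__ == "__main__":
        # local execution uses a single-node, client-only spark session
        print(my_wf(partitions=10))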
Examples
========
How Flytekit Simplifies Usage of Pyspark in a User's Code
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
This example shows how flytekit simplifies usage of pyspark in a user's code.
The task ``hello_spark`` starts a new spark cluster: when run locally it runs a single-node, client-only cluster,
but when run remotely it spins up an arbitrarily sized cluster depending on the specified spark configuration. ``spark_conf``
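
As a rough, hypothetical sketch (not the exact contents of this file), a task like ``hello_spark`` could use the injected spark session for a Monte Carlo pi estimation:

.. code-block:: python

    import random

    import flytekit
    from flytekit import task
    from flytekitplugins.spark import Spark


    @task(task_config=Spark(spark_conf={"spark.executor.instances": "2"}))
    def hello_spark(partitions: int) -> float:
        # hypothetical body: sample random points and count those inside the unit circle
        sess = flytekit.current_context().spark_session
        n = 100000 * partitions

        def inside_unit_circle(_: int) -> int:
            x, y = random.random(), random.random()
            return 1 if x * x + y * y <= 1 else 0

        count = sess.sparkContext.parallelize(range(n), partitions).map(inside_unit_circle).sum()
        return 4.0 * count / n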
@@ -19,12 +105,14 @@
from flytekit import task, workflow

# %%
# The follow import is required to configure a Spark Server in Flyte.
# The following import is required to configure a Spark Server in Flyte:
from flytekitplugins.spark import Spark


# %%
# Spark Task sample. This example shows how a spark task can be written simply by adding a ``@task(task_config=Spark(...)...)`` decorator.
# Spark Task Sample
# ^^^^^^^^^^^^^^^^^^
# This example shows how a spark task can be written simply by adding a ``@task(task_config=Spark(...)...)`` decorator.
# Refer to :py:class:`flytekit.Spark` class to understand the various configuration options.
@task(
task_config=Spark(
@@ -67,7 +155,7 @@ def print_every_time(value_to_print: float, date_triggered: datetime.datetime) -


# %%
# The Workflow shows that a spark task and any python function (or any other task type) can be chained together as long as they match the parameter specifications
# The Workflow shows that a spark task and any python function (or any other task type) can be chained together as long as they match the parameter specifications.
@workflow
def my_spark(triggered_date: datetime.datetime) -> float:
"""
@@ -80,7 +168,7 @@ def my_spark(triggered_date: datetime.datetime) -> float:


# %%
# Workflows with spark tasks can be executed locally. Some aspects of spark, like links to plugins_hive metastores etc may not work, but these are limitations of using Spark and are not introduced by Flyte.
# Workflows with spark tasks can be executed locally. Some aspects of spark, like links to plugins_hive metastores may not work, but these are limitations of using Spark and are not introduced by Flyte.
if __name__ == "__main__":
"""
NOTE: To run a multi-image workflow locally, all dependencies of all the tasks should be installed, ignoring which
