# Local Spark Cluster Repository
This repository provides a Docker-based setup for running a Spark cluster, specifically tailored for executing PySpark scripts. Here's a breakdown of its purpose:
- Containerized Spark Environment: the repository uses Docker to create a self-contained Spark environment, allowing users to run Spark scripts locally without installing Spark and its dependencies directly on their machine.
- Testing and Verification: the setup is intended for running small Spark scripts to test and verify their functionality, which is particularly useful for development and debugging before deploying the scripts to a production environment.
```
local-spark-cluster/
├── .github/                     # GitHub workflows and actions
│   └── workflows/
│       └── ci.yml               # Continuous Integration workflow
├── data/                        # Data storage
│   ├── input/                   # Input data
│   └── output/                  # Output data
├── src/                         # Source code for Python scripts
│   ├── compute/                 # Compute-related scripts and configurations
│   └── jobs/                    # Directory for user scripts
│       ├── script_template.py   # Template for new scripts
│       └── example_script.py    # Example Python script
├── vscode extension/            # VS Code extension for spark-submit
├── .gitignore                   # Git ignore file
├── docker-compose.yml           # Docker Compose configuration
├── README.md                    # Project documentation
├── spark_cluster.bat            # Batch script to manage the Spark cluster (Windows)
└── spark_cluster.sh             # Bash script to manage the Spark cluster (Linux/Mac)
```
- Docker and Docker Compose installed on your machine.
- Basic understanding of Apache Spark and Python.
Navigate to your local repository:
```bash
cd path/to/your_local_repository
```
Replace `path/to/your_local_repository` with your local repository path.
You can start the Docker containers using either the Docker Compose command or the provided deployment scripts:
Linux/Mac:
```bash
./spark_cluster.sh deploy
```
Windows:
```bat
.\spark_cluster.bat deploy
```
Or with Docker Compose directly:
```bash
docker compose up -d
```
To check that the containers are up and running:
Linux/Mac:
```bash
./spark_cluster.sh status
```
Windows:
```bat
.\spark_cluster.bat status
```
Or with Docker Compose directly:
```bash
docker compose ps
```
✅ If everything is working correctly, you should see the list of your running containers, showing the name, state (e.g., `Up`), and ports for each service.
For example:
```
Name                           Command                 State   Ports
---------------------------------------------------------------------------------------
spark-cluster-spark-master_1   /opt/bitnami/scripts/   Up      0.0.0.0:8080->8080/tcp
spark-cluster-spark-worker-1   /opt/bitnami/scripts/   Up
```
This means that both the Spark master and worker nodes are running as expected and are accessible on their respective ports.
❌ If there are any issues, the `State` column might show `Exited` or `Restarting`, indicating that one or more containers have failed to start or are repeatedly restarting.
For example:
```
Name                           Command                 State    Ports
---------------------------------------------------------------------------------------
spark-cluster-spark-master_1   /opt/bitnami/scripts/   Exited   0.0.0.0:8080->8080/tcp
spark-cluster-spark-worker-1   /opt/bitnami/scripts/   Up
```
You can create and run your own PySpark jobs by placing them in the ./src/jobs/ folder.
To do so, follow the process outlined below.
To initialize a Spark session, import the `session` function from the `compute` module:
- In the `session` function, you need to provide a unique name for your Spark application.
- This name will identify the application in the Spark UI and will also be used to define the input and output paths.
```python
from compute import session

# Get the Spark session and specify a name for your Spark application
spark_session = session("YourSparkApplicationName")
```
Place your input files in `./data/input/YourSparkApplicationName`.
To load your inputs, use the `read` functionality of the Spark DataFrame API.
Example:
```python
from pyspark.sql import DataFrame

# Read a CSV input dataset into a Spark DataFrame
df: DataFrame = spark_session.read.csv(
    "data/input/PysparkWindow/fake_data",
    header=True,
    inferSchema=True,
)
```
After initializing the Spark session, you can write your Spark logic as needed, such as reading data, performing transformations, or analyzing datasets.
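For illustration, a simple transformation between the read and write steps might look like the sketch below; the column names `category` and `amount` are assumptions, not columns of the repository's example data:

```python
from pyspark.sql import functions as F

# Hypothetical aggregation: total and average amount per category.
# Adapt the column names to your own input schema.
agg_df = (
    df.groupBy("category")
    .agg(
        F.sum("amount").alias("total_amount"),
        F.avg("amount").alias("avg_amount"),
    )
    .orderBy(F.desc("total_amount"))
)
agg_df.show(5)
```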
To write your DataFrame output, use the `write` functionality of the Spark DataFrame API.
Example:
```python
df.write.mode("overwrite") \
    .format("parquet") \
    .save("data/output/PysparkWindow/final")
```
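As an optional sanity check (not part of the repository's example), you can read the written dataset back and inspect a few rows:

```python
# Read the Parquet output back to verify it was written as expected
result = spark_session.read.parquet("data/output/PysparkWindow/final")
result.show(5)
result.printSchema()
```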
If you're unsure how to structure your script, a template is available at `./src/jobs/script_classic.py`.
This template includes the basic setup for initializing a Spark session, reading input data, processing it, and writing an output dataset.
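Putting the pieces together, a complete job might look roughly like the sketch below. This is only an illustration assembled from the snippets above, not the actual content of the template; the application name, paths, and transformation are placeholders:

```python
from compute import session
from pyspark.sql import DataFrame

# Initialize the Spark session with a unique application name
spark_session = session("YourSparkApplicationName")

# Read the input dataset
df: DataFrame = spark_session.read.csv(
    "data/input/YourSparkApplicationName/your_input",
    header=True,
    inferSchema=True,
)

# Replace this with your own transformation logic
processed = df.dropDuplicates()

# Write the output dataset
processed.write.mode("overwrite") \
    .format("parquet") \
    .save("data/output/YourSparkApplicationName/final")
```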
You can run your Spark jobs using either the provided deployment scripts, the VS Code extension, or `spark-submit` inside the master container:
Linux/Mac:
```bash
./spark_cluster.sh run <script_classic.py>
```
Windows:
```bat
.\spark_cluster.bat run <script_classic.py>
```
Or you can use this button directly after installing the VS Code extension:
![Run Script](vscode extension/source/button.png)
Replace `script_classic.py` with the name of your Python job file.
Alternatively, you can open a shell in the Spark master container:
```bash
docker exec -it spark-cluster-spark-master-1 /bin/bash
```
Then use `spark-submit` to execute your script:
```bash
spark-submit /src/jobs/script1.py
```
Replace `script1.py` with the name of your script.
Scripts are located in the `/src/jobs/` directory within the container.
You can access the Spark Master web UI to monitor your cluster and jobs by navigating to: http://localhost:8080.
This UI provides an overview of the Spark cluster, including the status of jobs, executors, and other resources.
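If you want your jobs to be easier to identify in the web UI, you can optionally tag them from your script. This uses standard PySpark functionality and is not specific to this repository; the group id and description are placeholders:

```python
# Group and describe subsequent jobs so they are easy to spot in the Spark UI
spark_session.sparkContext.setJobGroup(
    "pyspark-window",                     # placeholder group id
    "Aggregation step of PysparkWindow",  # placeholder description shown in the UI
)
```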
For more details, refer to the official documentation: Spark Monitoring and Instrumentation
To view the history of completed Spark applications, you can access the Spark History Server at: http://localhost:18080.
This interface allows you to review the details of past Spark jobs, including execution times and resource usage.
For more details, refer to the official documentation: Spark History Server
When you are finished using the Spark cluster, you can stop all running containers.
You can stop the Spark cluster using either the Docker Compose command or the provided deployment scripts:
Linux/Mac:
```bash
./spark_cluster.sh stop
```
Windows:
```bat
.\spark_cluster.bat stop
```
Or with Docker Compose directly:
```bash
docker compose down
```
These commands will stop and remove the containers defined in `docker-compose.yml`.
The `@compute` and `@cluster_config` decorators are used to manage and configure the computational environment for running Spark jobs. Here's a detailed explanation of why they were created:
The `@compute` decorator defines a function that performs a specific computation. In the context of a Spark environment, it is used for:
- Encapsulating Computation Logic: it encapsulates the logic of a Spark job, making the code more modular and easier to manage (see `src/jobs/script_compute.py`).
- Resource Management: it handles the setup and teardown of resources needed for the computation, such as initializing Spark contexts and ensuring they are properly closed after the job is done.
- Error Handling: it provides a standardized way to handle errors that occur during the computation, ensuring that resources are cleaned up properly even if an error occurs.
Here's a hypothetical example to illustrate how the `@compute` decorator might be used:
```python
from compute import cluster_config, compute, Input, Output

@compute(
    input1=Input("path_to_input"),
    output1=Output("path_to_output"),
)
# The decorated job function follows here (see src/jobs/script_compute.py).
```
The `@cluster_config` decorator is optional and is used to manage the configuration of the Spark cluster. This can include:
- Cluster Settings: it allows you to specify settings for the Spark cluster, such as the number of nodes, memory allocation, and other cluster-specific parameters.
- Environment Configuration: it can be used to set up environment variables and other configurations needed for the Spark jobs to run correctly.
- Reusability: by using a decorator, you can easily apply the same configuration to multiple Spark jobs without duplicating code.
Here's a hypothetical example to illustrate how both decorators might be used together:
```python
from compute import cluster_config, compute, Input, Output

spark_sql_config = {
    "spark.sql.shuffle.partitions": "200",                   # Number of partitions to use when shuffling data for joins or aggregations.
    "spark.sql.execution.arrow.enabled": "true",             # Enables Arrow optimization for Pandas UDFs.
    "spark.sql.sources.partitionOverwriteMode": "dynamic",   # Allows dynamic partition overwrite.
    "spark.sql.adaptive.enabled": "true",                    # Enables adaptive query execution for better performance.
    "spark.sql.catalogImplementation": "hive",               # Uses Hive for catalog implementation.
    "spark.sql.parquet.filterPushdown": "true",              # Enables pushdown of filters for Parquet files.
    "spark.sql.caseSensitive": "false",                      # Makes Spark SQL case insensitive.
    "spark.sql.join.preferSortMergeJoin": "true",            # Prefers sort-merge join strategy.
    "spark.sql.files.maxPartitionBytes": "128MB",            # Max size of a single file partition for input files.
}

@cluster_config(spark_sql_config)  # Configures the Spark cluster.
@compute(
    input1=Input("path_to_input"),
    output1=Output("path_to_output"),
)
```
In this example, `@cluster_config` applies the Spark SQL configuration to the cluster, and `@compute` handles the setup and teardown of the Spark session, as well as any error handling.
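For intuition only, here is a rough sketch of how a decorator like `@compute` could be structured. This is not the repository's actual implementation; it simply illustrates the setup/run/teardown pattern described above using plain PySpark:

```python
import functools
from pyspark.sql import SparkSession

def compute_sketch(**declared_io):
    """Illustrative stand-in for @compute: creates a Spark session, passes the
    declared inputs/outputs to the job, and guarantees cleanup."""
    def decorator(job):
        @functools.wraps(job)
        def wrapper():
            spark = SparkSession.builder.appName(job.__name__).getOrCreate()  # setup
            try:
                return job(spark, **declared_io)  # run the user's logic
            finally:
                spark.stop()  # teardown, even if an error occurred
        return wrapper
    return decorator
```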
These decorators help in making the code more readable, maintainable, and reusable by abstracting away the boilerplate code associated with setting up and configuring Spark jobs.
As shown in the example found in `src/jobs/script_compute.py`, both decorators indicate clearly what the Spark configuration is, what the inputs and outputs are, and where they will be stored.
At a glance, we can identify this information without digging into the code base.
This wrapper also supports multiple inputs and multiple outputs, as sketched below.
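A hypothetical declaration with several inputs and outputs could look like the fragment below; the argument names and paths are placeholders, and the exact signature expected for the decorated function should be checked against `src/jobs/script_compute.py`:

```python
from compute import compute, Input, Output

@compute(
    customers=Input("data/input/MyApp/customers"),         # first input
    orders=Input("data/input/MyApp/orders"),               # second input
    enriched_orders=Output("data/output/MyApp/enriched"),  # first output
    daily_summary=Output("data/output/MyApp/summary"),     # second output
)
# The decorated job function follows here, as in src/jobs/script_compute.py.
```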
The repository includes a comprehensive testing strategy to ensure the reliability and correctness of the Spark cluster setup and PySpark jobs. The testing strategy involves:
- Unit Tests:
  - Located in the `tests/` directory.
  - Ensure individual components and functions work as expected.
  - Use the `pytest` framework for writing and running tests (a minimal example sketch follows this list).
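For illustration only, a unit test for a small PySpark transformation could look like the sketch below; the fixture and the transformation under test are assumptions, not actual files in `tests/`:

```python
import pytest
from pyspark.sql import SparkSession

@pytest.fixture(scope="session")
def spark():
    # Local Spark session for tests, independent of the Docker cluster
    session = SparkSession.builder.master("local[1]").appName("unit-tests").getOrCreate()
    yield session
    session.stop()

def test_keep_adults_only(spark):
    # Hypothetical transformation under test: keep only rows with age >= 18
    df = spark.createDataFrame([(1, 17), (2, 21)], ["id", "age"])
    result = df.filter(df.age >= 18).collect()
    assert [row.id for row in result] == [2]
```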
The repository uses GitHub Actions for Continuous Integration (CI) to automate the testing process. The CI pipeline is defined in the `.github/workflows/ci.yml` file.
The main tools it runs are:
- ruff
- mypy
- pytest
- tox
This repository provides a Spark environment to run small PySpark jobs.
Feel free to add jobs to the `/src/jobs/` directory as your data processing needs evolve.
For any questions or issues, please refer to the following resources:
- Apache Spark Documentation
- Docker Documentation
- Bitnami Spark Docker Documentation
- PySpark Documentation
For further assistance, feel free to contact us directly at