IMP pyspark2.4.0 compatible
qfliu committed Mar 15, 2019
1 parent 3f30071 commit c681c7f
Showing 11 changed files with 71 additions and 42 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
@@ -1,3 +1,7 @@
+## 2.7.0
+* Migrate to spark 2.4.0
+* Fix testing.DataType to use new convention to get field type
+
 ## 2.6.0
 * Add argmax function to sparkly.functions

20 changes: 10 additions & 10 deletions Dockerfile
@@ -21,14 +21,10 @@ LABEL maintainer="[email protected]"
 # Install OpenJDK 8
 RUN apt-get update && apt-get install -y default-jre

-# Install Spark 1.6.2
+# Install Spark 2.4.0
 RUN apt-get update && apt-get install -y curl
-RUN curl -s http://d3kbcqa49mib13.cloudfront.net/spark-2.1.0-bin-hadoop2.7.tgz | tar -xz -C /usr/local/
-RUN cd /usr/local && ln -s spark-2.1.0-bin-hadoop2.7 spark
-
-ENV SPARK_HOME "/usr/local/spark/"
-ENV PYTHONPATH "/usr/local/spark/python/lib/pyspark.zip:/usr/local/spark/python/lib/py4j-0.10.4-src.zip:/opt/sparkly"
-ENV SPARK_TESTING true
+RUN curl -s https://archive.apache.org/dist/spark/spark-2.4.0/spark-2.4.0-bin-hadoop2.7.tgz | tar -xz -C /usr/local/
+RUN cd /usr/local && ln -s spark-2.4.0-bin-hadoop2.7 spark

 # Install Python development & testing utils
 RUN apt-get update && apt-get install -y python python-dev python3-pip
@@ -39,10 +35,10 @@ COPY spark.log4j.properties /usr/local/spark/conf/log4j.properties

 # Make integration tests faster
 RUN /usr/local/spark/bin/spark-shell --repositories=http://packages.confluent.io/maven/ --packages=\
-datastax:spark-cassandra-connector:2.0.0-M2-s_2.11,\
+datastax:spark-cassandra-connector:2.4.0-s_2.11,\
 org.elasticsearch:elasticsearch-spark-20_2.11:6.5.4,\
-org.apache.spark:spark-streaming-kafka-0-8_2.11:2.1.0,\
-mysql:mysql-connector-java:5.1.39,\
+org.apache.spark:spark-streaming-kafka-0-8_2.11:2.4.0,\
+mysql:mysql-connector-java:6.0.6,\
 io.confluent:kafka-avro-serializer:3.0.1

 # Python env
@@ -55,6 +51,10 @@ RUN python3 -m pip install -r /tmp/requirements.txt
 RUN python3 -m pip install -r /tmp/requirements_dev.txt
 RUN python3 -m pip install -r /tmp/requirements_extras.txt

+ENV SPARK_HOME "/usr/local/spark/"
+ENV PYTHONPATH "/usr/local/spark/python/lib/pyspark.zip:/usr/local/spark/python/lib/py4j-0.10.7-src.zip:/opt/sparkly"
+ENV SPARK_TESTING true
+
 # Provision Sparkly
 ADD . /opt/sparkly/
 WORKDIR /opt/sparkly/
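The PYTHONPATH entry has to point at the exact py4j zip bundled with the chosen Spark distribution. A small, illustrative check (not part of this commit) that the path stays in sync with what the tarball actually ships:

    # Illustrative sanity check: Spark 2.4.0 bundles py4j 0.10.7, so the zip named
    # in PYTHONPATH above must match what the unpacked distribution contains.
    import glob

    print(glob.glob('/usr/local/spark/python/lib/py4j-*-src.zip'))
    # expected: ['/usr/local/spark/python/lib/py4j-0.10.7-src.zip']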
1 change: 1 addition & 0 deletions Makefile
@@ -17,6 +17,7 @@
 dev:
 	docker-compose build dev
 	docker-compose run dev bash
+	docker-compose down -v ; exit $$retcode

 dist:
 	docker-compose build dev
12 changes: 7 additions & 5 deletions README.rst
@@ -75,11 +75,13 @@ Supported Spark Versions

 At the moment we support:

-+-------------------------------------------+
-| sparkly 2.x | Spark 2.0.x and Spark 2.1.x |
-+-------------------------------------------+
-| sparkly 1.x | Spark 1.6.x |
-+-------------------------------------------+
++----------------+---------------------------------------------+
+| sparkly >= 2.7 | Spark 2.4.x                                 |
++----------------+---------------------------------------------+
+| sparkly 2.x    | Spark 2.0.x and Spark 2.1.x and Spark 2.2.x |
++----------------+---------------------------------------------+
+| sparkly 1.x    | Spark 1.6.x                                 |
++----------------+---------------------------------------------+

 .. |Sparkly PyPi Version| image:: http://img.shields.io/pypi/v/sparkly.svg
    :target: https://pypi.python.org/pypi/sparkly
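The table above maps each sparkly release line to the Spark versions it targets. A quick runtime check of what is actually installed might look like this (illustrative only; it assumes sparkly exposes ``__version__``):

.. code-block:: python

    import pyspark
    import sparkly

    # Compare the installed pair against the table above,
    # e.g. sparkly >= 2.7 expects a Spark 2.4.x runtime.
    print(pyspark.__version__)   # e.g. '2.4.0'
    print(sparkly.__version__)   # e.g. '2.7.0'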
16 changes: 9 additions & 7 deletions docs/source/reader_and_writer.rst
@@ -13,22 +13,24 @@ so you can keep your code agnostic to the storages you use.
 Cassandra
 ---------

-Sparkly relies on the official spark cassandra connector and was successfully tested in production using version `2.0.0-M2`.
+Sparkly relies on the official spark cassandra connector and was successfully tested in production using version `2.4.0`.

 +---------------+---------------------------------------------------------------------------------------+
 | Package | https://spark-packages.org/package/datastax/spark-cassandra-connector |
 +---------------+---------------------------------------------------------------------------------------+
-| Configuration | https://github.com/datastax/spark-cassandra-connector/blob/v2.0.0-M2/doc/reference.md |
+| Configuration | https://github.com/datastax/spark-cassandra-connector/blob/v2.4.0/doc/reference.md |
 +---------------+---------------------------------------------------------------------------------------+

+To use overwrite mode, confirm.truncate must be set to true; otherwise, use append mode to update existing data.
+
 .. code-block:: python

     from sparkly import SparklySession

     class MySession(SparklySession):
         # Feel free to play with other versions
-        packages = ['datastax:spark-cassandra-connector:2.0.0-M2-s_2.11']
+        packages = ['datastax:spark-cassandra-connector:2.4.0-s_2.11']

     spark = MySession()
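As an illustration of the truncation flag (not part of this commit; the host, keyspace, and table names are placeholders, and it assumes the ``write_ext.cassandra`` writer exercised by the test suite):

.. code-block:: python

    # Illustrative only: overwrite truncates the target table first,
    # so Cassandra requires the explicit confirmation option.
    df.write_ext.cassandra(
        host='cassandra.host',                 # placeholder host
        keyspace='my_keyspace',                # placeholder keyspace
        table='my_table',                      # placeholder table
        consistency='ONE',
        mode='overwrite',
        options={'confirm.truncate': True},
    )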
@@ -78,9 +80,9 @@ The first one allows us to read data efficiently,
 the second covers a lack of writing functionality in the official distribution.

 +---------------+------------------------------------------------------------------------------------------+
-| Package | https://mvnrepository.com/artifact/org.apache.spark/spark-streaming-kafka-0-8_2.11/2.1.0 |
+| Package | https://mvnrepository.com/artifact/org.apache.spark/spark-streaming-kafka-0-8_2.11/2.4.0 |
 +---------------+------------------------------------------------------------------------------------------+
-| Configuration | http://spark.apache.org/docs/2.1.0/streaming-kafka-0-8-integration.html |
+| Configuration | http://spark.apache.org/docs/2.4.0/streaming-kafka-0-8-integration.html |
 +---------------+------------------------------------------------------------------------------------------+

 .. note::
@@ -99,7 +101,7 @@ the second covers a lack of writing functionality in the official distribution.
     class MySession(SparklySession):
         packages = [
-            'org.apache.spark:spark-streaming-kafka-0-8_2.11:2.1.0',
+            'org.apache.spark:spark-streaming-kafka-0-8_2.11:2.4.0',
         ]

     spark = MySession()
@@ -157,7 +159,7 @@ Basically, it's just a high level api on top of the native
     class MySession(SparklySession):
         # Feel free to play with other versions.
-        packages = ['mysql:mysql-connector-java:5.1.39']
+        packages = ['mysql:mysql-connector-java:6.0.6']

     spark = MySession()
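Because the connector is only a JDBC driver on the classpath, a plain Spark JDBC read works once the package is loaded. A rough sketch (connection details are placeholders; connector 6.x registers the ``com.mysql.cj.jdbc.Driver`` class):

.. code-block:: python

    # Illustrative only: read a table through the vanilla JDBC data source.
    df = spark.read.format('jdbc').options(
        url='jdbc:mysql://mysql.host:3306/my_database',  # placeholder host/database
        driver='com.mysql.cj.jdbc.Driver',
        dbtable='my_table',                              # placeholder table
        user='root',                                     # placeholder credentials
        password='',
    ).load()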
12 changes: 11 additions & 1 deletion sparkly/instant_testing.py
@@ -107,7 +107,16 @@ def set_context(cls, spark_context):
         session_pid = os.getpid()

         with open(cls.LOCK_FILE_PATH, 'w') as lock:
-            json.dump({'gateway_port': gateway_port, 'session_pid': session_pid}, lock)
+            json.dump(
+                {
+                    'gateway_port': gateway_port,
+                    'session_pid': session_pid,
+                    'gateway_secret': getattr(
+                        spark_context._gateway.gateway_parameters, 'auth_token', None,
+                    ),
+                },
+                lock,
+            )
         logger.info(
             'Successfully set spark context for the instant testing [pid=%s, gateway=%s]',
             session_pid, gateway_port
@@ -143,6 +152,7 @@ def get_context(cls):
         )

         os.environ['PYSPARK_GATEWAY_PORT'] = str(state['gateway_port'])
+        os.environ['PYSPARK_GATEWAY_SECRET'] = str(state['gateway_secret'])
         gateway = launch_gateway()
         java_import(gateway.jvm, 'org.apache.spark.SparkContext')
         jvm_spark_context = gateway.jvm.SparkContext.getOrCreate()
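Taken together, the lock file now carries everything a later process needs to reattach to the running JVM. A standalone sketch of that handshake (the lock file path here is an assumption; py4j 0.10.7, which Spark 2.4.0 bundles, authenticates connections with the secret):

    # Illustrative sketch, not part of the commit: reattach to an already-running
    # gateway using the state written by set_context() above.
    import json
    import os

    from pyspark.java_gateway import launch_gateway

    with open('/tmp/sparkly_instant_testing.lock') as lock:   # assumed lock file path
        state = json.load(lock)

    os.environ['PYSPARK_GATEWAY_PORT'] = str(state['gateway_port'])
    os.environ['PYSPARK_GATEWAY_SECRET'] = str(state['gateway_secret'])

    gateway = launch_gateway()   # connects to the existing JVM instead of starting a new one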
29 changes: 15 additions & 14 deletions sparkly/testing.py
@@ -307,29 +307,30 @@ def __init__(self, dt, ignore_order_depth=0):
         def _sort_structs(dt, ignore_order_depth):
             if ignore_order_depth == 0:
                 return dt
-            if dt.typeName() == 'array':
+
+            if isinstance(dt, T.StructField):
+                return T.StructField(
+                    dt.name,
+                    _sort_structs(dt.dataType, ignore_order_depth),
+                    nullable=ignore_nullability or dt.containsNull,
+                    metadata=dt.metadata,
+                )
+            elif dt.typeName() == 'struct':
+                return T.StructType([
+                    _sort_structs(f, ignore_order_depth - 1)
+                    for f in sorted(dt.fields, key=lambda f: f.name)
+                ])
+            elif dt.typeName() == 'array':
                 return T.ArrayType(
                     elementType=_sort_structs(dt.elementType, ignore_order_depth),
                     containsNull=ignore_nullability or dt.containsNull,
                 )
-            if dt.typeName() == 'map':
+            elif dt.typeName() == 'map':
                 return T.MapType(
                     keyType=_sort_structs(dt.keyType, ignore_order_depth),
                     valueType=_sort_structs(dt.valueType, ignore_order_depth),
                     valueContainsNull=ignore_nullability or dt.valueContainsNull,
                 )
-            if dt.typeName() == 'struct':
-                return T.StructType([
-                    _sort_structs(f, ignore_order_depth - 1)
-                    for f in sorted(dt.fields, key=lambda f: f.name)
-                ])
-            if dt.typeName() == 'structf':
-                return T.StructField(
-                    dt.name,
-                    _sort_structs(dt.dataType, ignore_order_depth),
-                    nullable=ignore_nullability or dt.nullable,
-                    metadata=dt.metadata,
-                )
             return dt

         self._dt = _sort_structs(dt, ignore_order_depth)
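The branches were reordered so that a StructField is matched with isinstance() rather than the truncated 'structf' string the removed branch expected from typeName(). A tiny illustration of the convention change, assuming PySpark 2.4 behavior:

    # Illustrative only: why fields are now dispatched on their class.
    import pyspark.sql.types as T

    field = T.StructField('amount', T.IntegerType())

    isinstance(field, T.StructField)    # True -- the check used above
    # field.typeName() no longer yields 'structf' here; recent PySpark releases
    # raise a TypeError and point you at field.dataType.typeName() instead.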
8 changes: 5 additions & 3 deletions tests/integration/base.py
@@ -14,6 +14,8 @@
 # limitations under the License.
 #

+import os
+
 from pyspark.sql.types import StringType

 from sparkly import SparklySession
@@ -22,10 +24,10 @@

 class SparklyTestSession(SparklySession):
     packages = [
-        'datastax:spark-cassandra-connector:2.0.0-M2-s_2.11',
+        'datastax:spark-cassandra-connector:2.4.0-s_2.11',
         'org.elasticsearch:elasticsearch-spark-20_2.11:6.5.4',
-        'org.apache.spark:spark-streaming-kafka-0-8_2.11:2.1.0',
-        'mysql:mysql-connector-java:5.1.39',
+        'org.apache.spark:spark-streaming-kafka-0-8_2.11:2.4.0',
+        'mysql:mysql-connector-java:6.0.6',
         'io.confluent:kafka-avro-serializer:3.0.1',
     ]

3 changes: 3 additions & 0 deletions tests/integration/test_instant_testing.py
@@ -52,6 +52,9 @@ def test_set_context(self):
             'gateway_port':
                 self.spark.sparkContext._gateway.java_gateway_server.getListeningPort(),
             'session_pid': os.getpid(),
+            'gateway_secret': getattr(
+                self.spark.sparkContext._gateway.gateway_parameters, 'auth_token', None,
+            ),
         })

     def test_get_context(self):
4 changes: 4 additions & 0 deletions tests/integration/test_writer.py
@@ -101,6 +101,10 @@ def test_write_cassandra(self):
             table='test_writer',
             consistency='ONE',
             mode='overwrite',
+            # Overwrite mode truncates the table first. Either switch to
+            # 'append' to update data already in the table, or confirm the
+            # truncation by setting confirm.truncate to true.
+            options={'confirm.truncate': True},
         )

         written_df = self.spark.read_ext.by_url(
4 changes: 2 additions & 2 deletions tox.ini
@@ -24,8 +24,8 @@ deps =
     -rrequirements_dev.txt
     -rrequirements_extras.txt
 passenv = *
-setenv =
-    PYTHONPATH = /usr/local/spark/python/lib/pyspark.zip:/usr/local/spark/python/lib/py4j-0.10.4-src.zip
+setenv =
+    PYTHONPATH = /usr/local/spark/python/lib/pyspark.zip:/usr/local/spark/python/lib/py4j-0.10.7-src.zip

 [testenv:no_extras]
 commands = py.test tests/no_extras
