IMP add port argument to fixtures and update ES (#64)
- Add `port` argument to `CassandraFixture` and `MysqlFixture` (see the usage sketch below)
- Add `Content-Type` header to `ElasticFixture` to support Elasticsearch `6.x`
- Update `elasticsearch-hadoop` connector to `6.5.4`
- Update image tag for elasticsearch to `6.5.4` in docker-compose.yml
Oleksandr Kozlenko authored Jan 8, 2019
1 parent ecbc6b9 commit bcfa7e8
Showing 9 changed files with 24 additions and 14 deletions.
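
The headline change is the new `port` keyword on the database fixtures. Below is a
minimal usage sketch based on the signatures in the sparkly/testing.py diff further
down; the hosts, ports, and file paths are illustrative, and the setup/teardown
files are plain CQL/SQL scripts as before:

    from sparkly.testing import CassandraFixture, MysqlFixture

    # CassandraFixture now accepts a port (defaults to 9042).
    cassandra = CassandraFixture(
        'cassandra.docker',
        '/fixtures/setup.cql',       # illustrative path
        '/fixtures/teardown.cql',    # illustrative path
        port=9043,
    )

    # MysqlFixture now accepts a port (defaults to 3306).
    mysql = MysqlFixture(
        'mysql.docker',
        user='root',
        password='',                     # illustrative credentials
        data='/fixtures/data.sql',       # illustrative path
        teardown='/fixtures/clear.sql',  # illustrative path
        port=3307,
    )
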
CHANGELOG.md: 6 additions & 0 deletions
@@ -1,3 +1,9 @@
+## 2.5.0
+* Add `port` argument to `CassandraFixture` and `MysqlFixture`
+* Add `Content-Type` header to `ElasticFixture` to support Elasticsearch `6.x`
+* Update `elasticsearch-hadoop` connector to `6.5.4`
+* Update image tag for elasticsearch to `6.5.4`
+
## 2.4.1
* Fix `write_ext.kafka`: run `foreachPartition` instead of `mapPartitions`, because the returned value can exceed `spark.driver.maxResultSize`

Dockerfile: 1 addition & 1 deletion
@@ -40,7 +40,7 @@ COPY spark.log4j.properties /usr/local/spark/conf/log4j.properties
# Make integration tests faster
RUN /usr/local/spark/bin/spark-shell --repositories=http://packages.confluent.io/maven/ --packages=\
datastax:spark-cassandra-connector:2.0.0-M2-s_2.11,\
-org.elasticsearch:elasticsearch-spark-20_2.11:5.1.1,\
+org.elasticsearch:elasticsearch-spark-20_2.11:6.5.4,\
org.apache.spark:spark-streaming-kafka-0-8_2.11:2.1.0,\
mysql:mysql-connector-java:5.1.39,\
io.confluent:kafka-avro-serializer:3.0.1
README.rst: 1 addition & 1 deletion
@@ -47,7 +47,7 @@ and write its content to ElasticSearch index::
    class MySession(SparklySession):
        packages = [
            'datastax:spark-cassandra-connector:2.0.0-M2-s_2.11',
-            'org.elasticsearch:elasticsearch-spark-20_2.11:5.1.1',
+            'org.elasticsearch:elasticsearch-spark-20_2.11:6.5.4',
        ]

docker-compose.yml: 1 addition & 1 deletion
@@ -57,7 +57,7 @@ services:
      test: ps ax | grep cassandra

  elastic.docker:
-    image: elasticsearch:1.7.0
+    image: docker.elastic.co/elasticsearch/elasticsearch:6.5.4
    healthcheck:
      test: ps ax | grep elastic

docs/source/reader_and_writer.rst: 3 additions & 3 deletions
@@ -43,7 +43,7 @@ Sparkly relies on the official spark cassandra connector and was successfully te
Elastic
-------

-Sparkly relies on the official elastic spark connector and was successfully tested in production using version `5.1.1`.
+Sparkly relies on the official elastic spark connector and was successfully tested in production using version `6.5.4`.

+---------------+-----------------------------------------------------------------------------+
| Package       | https://spark-packages.org/package/elastic/elasticsearch-hadoop             |
@@ -58,7 +58,7 @@ Sparkly relies on the official elastic spark connector and was successfully test
    class MySession(SparklySession):
        # Feel free to play with other versions
-        packages = ['org.elasticsearch:elasticsearch-spark-20_2.11:5.1.1']
+        packages = ['org.elasticsearch:elasticsearch-spark-20_2.11:6.5.4']

    spark = MySession()
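
Once the connector package is on the session, reads and writes go through
sparkly's extended API. A hedged sketch, assuming the `read_ext.by_url` and
`write_ext.by_url` helpers described in sparkly's docs and an Elasticsearch
`6.5.4` node reachable as `elastic.docker`; the index and type names are
illustrative:

    spark = MySession()

    # Read an index into a DataFrame and write it back under another name.
    df = spark.read_ext.by_url('elastic://elastic.docker/my_index/my_type')
    df.write_ext.by_url('elastic://elastic.docker/my_index_copy/my_type')
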
@@ -226,7 +226,7 @@ To solve the problem, we decided to add the universal api to read/write `DataFra
        packages = [
            'datastax:spark-cassandra-connector:1.6.1-s_2.10',
            'com.databricks:spark-csv_2.10:1.4.0',
-            'org.elasticsearch:elasticsearch-spark_2.10:2.3.0',
+            'org.elasticsearch:elasticsearch-spark_2.10:6.5.4',
        ]

    hc = MyContext()
sparkly/__init__.py: 1 addition & 1 deletion
@@ -19,4 +19,4 @@
assert SparklySession


-__version__ = '2.4.1'
+__version__ = '2.5.0'
sparkly/testing.py: 8 additions & 4 deletions
@@ -583,16 +583,17 @@ class CassandraFixture(Fixture):
    ...
    """

-    def __init__(self, host, setup_file, teardown_file):
+    def __init__(self, host, setup_file, teardown_file, port=9042):
        if not CASSANDRA_FIXTURES_SUPPORT:
            raise NotImplementedError('cassandra-driver package isn\'t available. '
                                      'Use pip install sparkly[test] to fix it.')
        self.host = host
+        self.port = port
        self.setup_file = setup_file
        self.teardown_file = teardown_file

    def _execute(self, statements):
-        cluster = Cluster([self.host])
+        cluster = Cluster([self.host], port=self.port)
        session = cluster.connect()
        for statement in statements.split(';'):
            if bool(statement.strip()):
@@ -669,8 +670,9 @@ def teardown_data(self):
        )

    def _request(self, method, url, body=None):
+        headers = {'Content-Type': 'application/json'}
        connection = HTTPConnection(self.host, port=self.port)
-        connection.request(method, url, body)
+        connection.request(method, url, body, headers)
        response = connection.getresponse()
        if sys.version_info.major == 3:
            code = response.code
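
Context for this hunk: starting with `6.0`, Elasticsearch requires a
`Content-Type` header on REST requests that carry a body and rejects them with
`406 Not Acceptable` otherwise. A standalone sketch of the same pattern the
fixture now uses, assuming an Elasticsearch node on `localhost:9200` (the
index, type, and document are illustrative):

    import json
    from http.client import HTTPConnection

    connection = HTTPConnection('localhost', port=9200)
    body = json.dumps({'name': 'John', 'age': 31})
    headers = {'Content-Type': 'application/json'}  # omit this and ES 6.x answers 406
    connection.request('PUT', '/my_index/my_type/1', body, headers)
    response = connection.getresponse()
    print(response.status, response.read())
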
@@ -698,11 +700,12 @@ class MysqlFixture(Fixture):
    ...
    """

-    def __init__(self, host, user, password=None, data=None, teardown=None):
+    def __init__(self, host, user, password=None, data=None, teardown=None, port=3306):
        if not MYSQL_FIXTURES_SUPPORT:
            raise NotImplementedError('PyMySQL package isn\'t available. '
                                      'Use pip install sparkly[test] to fix it.')
        self.host = host
+        self.port = port
        self.user = user
        self.password = password
        self.data = data
@@ -713,6 +716,7 @@ def _execute(self, statements):
            user=self.user,
            password=self.password,
            host=self.host,
+            port=self.port,
        )
        cursor = ctx.cursor()
        cursor.execute(statements)
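
In a test case the fixtures are attached to the suite rather than driven by
hand. A hedged sketch, assuming the `fixtures` class attribute that sparkly's
testing utilities use for per-case setup and teardown (the session class is
illustrative):

    from sparkly.testing import SparklyTest

    class MyIntegrationTest(SparklyTest):
        session = MySession  # an illustrative SparklySession subclass

        # Fixtures like the `cassandra` and `mysql` objects sketched near the
        # top of this page are set up before the test and torn down after it.
        fixtures = [cassandra, mysql]
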
tests/integration/base.py: 1 addition & 1 deletion
@@ -23,7 +23,7 @@
class SparklyTestSession(SparklySession):
    packages = [
        'datastax:spark-cassandra-connector:2.0.0-M2-s_2.11',
-        'org.elasticsearch:elasticsearch-spark-20_2.11:5.1.1',
+        'org.elasticsearch:elasticsearch-spark-20_2.11:6.5.4',
        'org.apache.spark:spark-streaming-kafka-0-8_2.11:2.1.0',
        'mysql:mysql-connector-java:5.1.39',
        'io.confluent:kafka-avro-serializer:3.0.1',
tests/integration/resources/test_fixtures/mapping.json: 2 additions & 2 deletions
@@ -1,10 +1,10 @@
{
    "properties": {
        "name": {
-            "type": "string"
+            "type": "text"
        },
        "age": {
            "type": "integer"
        }
    }
}
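
For background: Elasticsearch `6.x` removed the `string` field type, splitting
it into `text` (analyzed, for full-text search) and `keyword` (not analyzed,
for exact matches, sorting, and aggregations); hence the mapping change above.
An illustrative comparison of the two replacements, as Python dicts:

    name_as_text = {'name': {'type': 'text'}}        # analyzed; this commit's choice
    name_as_keyword = {'name': {'type': 'keyword'}}  # exact values, sorting, aggregations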
