From bcfa7e81789a578844cb1eeb6a59f4187d1bccd1 Mon Sep 17 00:00:00 2001 From: Oleksandr Kozlenko Date: Tue, 8 Jan 2019 14:29:54 +0200 Subject: [PATCH] IMP add port argument to fixtures and update ES (#64) - Add port argument to `CassandraFixture` and `MysqlFixture` - Add `Content-Type` header to `ElasticFixture` to support ElasticSearch `6.x` - Update `elasticsearch-hadoop` connector to `6.5.4` - Update image tag for elasticsearch to `6.5.4` in docker-compose.yml --- CHANGELOG.md | 6 ++++++ Dockerfile | 2 +- README.rst | 2 +- docker-compose.yml | 2 +- docs/source/reader_and_writer.rst | 6 +++--- sparkly/__init__.py | 2 +- sparkly/testing.py | 12 ++++++++---- tests/integration/base.py | 2 +- .../integration/resources/test_fixtures/mapping.json | 4 ++-- 9 files changed, 24 insertions(+), 14 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 38b4147..a01b445 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,9 @@ +## 2.5.0 +* Add `port` argument to `CassandraFixture` and `MysqlFixture` +* Add `Content-Type` header to `ElasticFixture` to support ElasticSearch `6.x` +* Update `elasticsearch-hadoop` connector to `6.5.4` +* Update image tag for elasticsearch to `6.5.4` + ## 2.4.1 * Fix write_ext.kafka: run foreachPartition instead of mapPartitions because returned value can cause spark.driver.maxResultSize excess diff --git a/Dockerfile b/Dockerfile index 01ca909..a0f5248 100644 --- a/Dockerfile +++ b/Dockerfile @@ -40,7 +40,7 @@ COPY spark.log4j.properties /usr/local/spark/conf/log4j.properties # Make integration tests faster RUN /usr/local/spark/bin/spark-shell --repositories=http://packages.confluent.io/maven/ --packages=\ datastax:spark-cassandra-connector:2.0.0-M2-s_2.11,\ -org.elasticsearch:elasticsearch-spark-20_2.11:5.1.1,\ +org.elasticsearch:elasticsearch-spark-20_2.11:6.5.4,\ org.apache.spark:spark-streaming-kafka-0-8_2.11:2.1.0,\ mysql:mysql-connector-java:5.1.39,\ io.confluent:kafka-avro-serializer:3.0.1 diff --git a/README.rst b/README.rst index 
d23b9f7..c86234e 100644 --- a/README.rst +++ b/README.rst @@ -47,7 +47,7 @@ and write its content to ElasticSearch index:: class MySession(SparklySession): packages = [ 'datastax:spark-cassandra-connector:2.0.0-M2-s_2.11', - 'org.elasticsearch:elasticsearch-spark-20_2.11:5.1.1', + 'org.elasticsearch:elasticsearch-spark-20_2.11:6.5.4', ] diff --git a/docker-compose.yml b/docker-compose.yml index 27446cb..1ffbcdc 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -57,7 +57,7 @@ services: test: ps ax | grep cassandra elastic.docker: - image: elasticsearch:1.7.0 + image: docker.elastic.co/elasticsearch/elasticsearch:6.5.4 healthcheck: test: ps ax | grep elastic diff --git a/docs/source/reader_and_writer.rst b/docs/source/reader_and_writer.rst index 110ea6d..3f40f53 100644 --- a/docs/source/reader_and_writer.rst +++ b/docs/source/reader_and_writer.rst @@ -43,7 +43,7 @@ Sparkly relies on the official spark cassandra connector and was successfully te Elastic ------- -Sparkly relies on the official elastic spark connector and was successfully tested in production using version `5.1.1`. +Sparkly relies on the official elastic spark connector and was successfully tested in production using version `6.5.4`. 
+---------------+-----------------------------------------------------------------------------+ | Package | https://spark-packages.org/package/elastic/elasticsearch-hadoop | @@ -58,7 +58,7 @@ Sparkly relies on the official elastic spark connector and was successfully test class MySession(SparklySession): # Feel free to play with other versions - packages = ['org.elasticsearch:elasticsearch-spark-20_2.11:5.1.1'] + packages = ['org.elasticsearch:elasticsearch-spark-20_2.11:6.5.4'] spark = MySession() @@ -226,7 +226,7 @@ To solve the problem, we decided to add the universal api to read/write `DataFra packages = [ 'datastax:spark-cassandra-connector:1.6.1-s_2.10', 'com.databricks:spark-csv_2.10:1.4.0', - 'org.elasticsearch:elasticsearch-spark_2.10:2.3.0', + 'org.elasticsearch:elasticsearch-spark_2.10:6.5.4', ] hc = MyContext() diff --git a/sparkly/__init__.py b/sparkly/__init__.py index baa44e3..6aa8eb8 100644 --- a/sparkly/__init__.py +++ b/sparkly/__init__.py @@ -19,4 +19,4 @@ assert SparklySession -__version__ = '2.4.1' +__version__ = '2.5.0' diff --git a/sparkly/testing.py b/sparkly/testing.py index 8532e47..05c5075 100644 --- a/sparkly/testing.py +++ b/sparkly/testing.py @@ -583,16 +583,17 @@ class CassandraFixture(Fixture): ... """ - def __init__(self, host, setup_file, teardown_file): + def __init__(self, host, setup_file, teardown_file, port=9042): if not CASSANDRA_FIXTURES_SUPPORT: raise NotImplementedError('cassandra-driver package isn\'t available. 
' 'Use pip install sparkly[test] to fix it.') self.host = host + self.port = port self.setup_file = setup_file self.teardown_file = teardown_file def _execute(self, statements): - cluster = Cluster([self.host]) + cluster = Cluster([self.host], port=self.port) session = cluster.connect() for statement in statements.split(';'): if bool(statement.strip()): @@ -669,8 +670,9 @@ def teardown_data(self): ) def _request(self, method, url, body=None): + headers = {'Content-Type': 'application/json'} connection = HTTPConnection(self.host, port=self.port) - connection.request(method, url, body) + connection.request(method, url, body, headers) response = connection.getresponse() if sys.version_info.major == 3: code = response.code @@ -698,11 +700,12 @@ class MysqlFixture(Fixture): ... """ - def __init__(self, host, user, password=None, data=None, teardown=None): + def __init__(self, host, user, password=None, data=None, teardown=None, port=3306): if not MYSQL_FIXTURES_SUPPORT: raise NotImplementedError('PyMySQL package isn\'t available. 
' 'Use pip install sparkly[test] to fix it.') self.host = host + self.port = port self.user = user self.password = password self.data = data @@ -713,6 +716,7 @@ def _execute(self, statements): user=self.user, password=self.password, host=self.host, + port=self.port, ) cursor = ctx.cursor() cursor.execute(statements) diff --git a/tests/integration/base.py b/tests/integration/base.py index e57dd3f..0072741 100644 --- a/tests/integration/base.py +++ b/tests/integration/base.py @@ -23,7 +23,7 @@ class SparklyTestSession(SparklySession): packages = [ 'datastax:spark-cassandra-connector:2.0.0-M2-s_2.11', - 'org.elasticsearch:elasticsearch-spark-20_2.11:5.1.1', + 'org.elasticsearch:elasticsearch-spark-20_2.11:6.5.4', 'org.apache.spark:spark-streaming-kafka-0-8_2.11:2.1.0', 'mysql:mysql-connector-java:5.1.39', 'io.confluent:kafka-avro-serializer:3.0.1', diff --git a/tests/integration/resources/test_fixtures/mapping.json b/tests/integration/resources/test_fixtures/mapping.json index d991f16..b217396 100644 --- a/tests/integration/resources/test_fixtures/mapping.json +++ b/tests/integration/resources/test_fixtures/mapping.json @@ -1,10 +1,10 @@ { "properties": { "name": { - "type": "string" + "type": "text" }, "age": { "type": "integer" } } -} \ No newline at end of file +}