Skip to content

Commit

Permalink
FIX read and write from url with custom port (#65)
Browse files Browse the repository at this point in the history
Python built-in `urlparse` return `netloc` attribute with the port.
Commit changes resolver behavior for reader/writer for MySQL
and Cassandra from using `netloc` to `hostname` attribute.

ElasticSearch reader and writer hasn't affected as supports nodes
hostname with port specified[1].
[1] https://www.elastic.co/guide/en/elasticsearch/hadoop/current/configuration.html#cfg-network
  • Loading branch information
Oleksandr Kozlenko authored Jan 9, 2019
1 parent bcfa7e8 commit d74b090
Show file tree
Hide file tree
Showing 6 changed files with 74 additions and 5 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
## 2.5.1
* Fix port issue with reading and writing `by_url`. `urlparse` return `netloc` with port, which breaks read and write from MySQL and Cassandra.

## 2.5.0
* Add `port` argument to `CassandraFixture` and `MysqlFixture`
* Add `Content-Type` header to `ElasticFixture` to support ElasticSearch `6.x`
Expand Down
2 changes: 1 addition & 1 deletion sparkly/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,4 @@
assert SparklySession


__version__ = '2.5.0'
__version__ = '2.5.1'
4 changes: 2 additions & 2 deletions sparkly/reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -296,7 +296,7 @@ def _basic_read(self, reader_options, additional_options, parallelism):

def _resolve_cassandra(self, parsed_url, parsed_qs):
return self.cassandra(
host=parsed_url.netloc,
host=parsed_url.hostname,
keyspace=parsed_url.path.split('/')[1],
table=parsed_url.path.split('/')[2],
consistency=parsed_qs.pop('consistency', None),
Expand Down Expand Up @@ -342,7 +342,7 @@ def _resolve_elastic(self, parsed_url, parsed_qs):

def _resolve_mysql(self, parsed_url, parsed_qs):
return self.mysql(
host=parsed_url.netloc,
host=parsed_url.hostname,
database=parsed_url.path.split('/')[1],
table=parsed_url.path.split('/')[2],
port=parsed_url.port,
Expand Down
4 changes: 2 additions & 2 deletions sparkly/writer.py
Original file line number Diff line number Diff line change
Expand Up @@ -485,7 +485,7 @@ def _basic_write(self, writer_options, additional_options, parallelism, mode):

def _resolve_cassandra(self, parsed_url, parsed_qs):
return self.cassandra(
host=parsed_url.netloc,
host=parsed_url.hostname,
keyspace=parsed_url.path.split('/')[1],
table=parsed_url.path.split('/')[2],
consistency=parsed_qs.pop('consistency', None),
Expand Down Expand Up @@ -524,7 +524,7 @@ def _resolve_elastic(self, parsed_url, parsed_qs):

def _resolve_mysql(self, parsed_url, parsed_qs):
return self.mysql(
host=parsed_url.netloc,
host=parsed_url.hostname,
database=parsed_url.path.split('/')[1],
table=parsed_url.path.split('/')[2],
port=parsed_url.port,
Expand Down
33 changes: 33 additions & 0 deletions tests/unit/test_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,23 @@ def test_cassandra(self):
options={'query.retry.count': '2'},
)

def test_cassandra_custom_port(self):
self.read_ext.cassandra = mock.Mock(return_value=self.fake_df)

df = self.read_ext.by_url('cassandra://localhost:19042/test_cf/test_table?'
'consistency=ONE&parallelism=8&query.retry.count=2')

self.assertEqual(df, self.fake_df)
self.read_ext.cassandra.assert_called_with(
host='localhost',
port=19042,
keyspace='test_cf',
table='test_table',
consistency='ONE',
parallelism=8,
options={'query.retry.count': '2'},
)

def test_mysql(self):
self.read_ext.mysql = mock.Mock(return_value=self.fake_df)

Expand All @@ -129,5 +146,21 @@ def test_mysql(self):
options={'user': 'root', 'password': 'pass'},
)

def test_mysql_custom_port(self):
self.read_ext.mysql = mock.Mock(return_value=self.fake_df)

df = self.read_ext.by_url('mysql://localhost:33306/test_database/test_table?'
'user=root&password=pass')

self.assertEqual(df, self.fake_df)
self.read_ext.mysql.assert_called_with(
host='localhost',
database='test_database',
table='test_table',
port=33306,
parallelism=None,
options={'user': 'root', 'password': 'pass'},
)

def test_unknown_format(self):
self.assertRaises(NotImplementedError, self.read_ext.by_url, 'fake://host')
33 changes: 33 additions & 0 deletions tests/unit/test_writer.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,24 @@ def test_cassandra(self):
options={},
)

def test_cassandra_custom_port(self):
self.write_ext.cassandra = mock.Mock()

self.write_ext.by_url(
'cassandra://host:19042/ks/cf?consistency=ONE&mode=overwrite&parallelism=10',
)

self.write_ext.cassandra.assert_called_once_with(
host='host',
keyspace='ks',
table='cf',
port=19042,
mode='overwrite',
consistency='ONE',
parallelism=10,
options={},
)

def test_elastic(self):
self.write_ext.elastic = mock.Mock()

Expand Down Expand Up @@ -106,3 +124,18 @@ def test_mysql(self):
parallelism=20,
options={},
)

def test_mysql_custom_port(self):
self.write_ext.mysql = mock.Mock()

self.write_ext.by_url('mysql://host:33306/db/table?parallelism=20')

self.write_ext.mysql.assert_called_with(
host='host',
database='db',
table='table',
port=33306,
mode=None,
parallelism=20,
options={},
)

0 comments on commit d74b090

Please sign in to comment.