From ae6be0cfbb873fb841e1f5e88f2af8309f6f2bc2 Mon Sep 17 00:00:00 2001 From: Chenhui Wang Date: Tue, 14 Nov 2023 22:30:18 +0800 Subject: [PATCH 1/2] Add meta methods to ConnectorIndex --- connectors/protocol/connectors.py | 10 +++++++ tests/protocol/test_connectors.py | 49 +++++++++++++++++++++++++++++++ 2 files changed, 59 insertions(+) diff --git a/connectors/protocol/connectors.py b/connectors/protocol/connectors.py index e4074adba..cf836a443 100644 --- a/connectors/protocol/connectors.py +++ b/connectors/protocol/connectors.py @@ -188,6 +188,16 @@ async def all_connectors(self): async for connector in self.get_all_docs(): yield connector + async def meta(self): + response = await self.client.indices.get_mapping(index=self.index_name) + latest_index = max(response.keys()) + return response[latest_index].get("mappings", {}).get("_meta") + + async def update_meta(self, meta): + await self.client.indices.put_mapping( + index=self.index_name, meta=meta, write_index_only=True + ) + def filter_ingestion_stats(ingestion_stats): if ingestion_stats is None: diff --git a/tests/protocol/test_connectors.py b/tests/protocol/test_connectors.py index 23c81f3d7..755e55237 100644 --- a/tests/protocol/test_connectors.py +++ b/tests/protocol/test_connectors.py @@ -316,6 +316,55 @@ async def test_all_connectors(mock_responses): assert len(conns) == 1 +@pytest.mark.asyncio +@pytest.mark.parametrize( + "mappings, expected_meta", + [ + ({".elastic-connectors-v1": {"mappings": {}}}, None), + ( + {".elastic-connectors-v1": {"mappings": {"_meta": {"foo": "bar"}}}}, + {"foo": "bar"}, + ), + ( + { + ".elastic-connectors-v1": {"mappings": {"_meta": {"foo": "bar"}}}, + ".elastic-connectors-v2": {"mappings": {"_meta": {"baz": "qux"}}}, + }, + {"baz": "qux"}, + ), + ], +) +async def test_connector_index_meta(mappings, expected_meta, mock_responses): + config = {"host": "http://nowhere.com:9200", "user": "tarek", "password": "blah"} + headers = {"X-Elastic-Product": "Elasticsearch"} + mock_responses.get( + "http://nowhere.com:9200/.elastic-connectors/_mapping", + payload=mappings, + headers=headers, + ) + + connectors = ConnectorIndex(config) + + meta = await connectors.meta() + assert meta == expected_meta + await connectors.close() + + +@pytest.mark.asyncio +async def test_connector_index_update_meta(mock_responses): + config = {"host": "http://nowhere.com:9200", "user": "tarek", "password": "blah"} + meta = {"foo": "bar"} + headers = {"X-Elastic-Product": "Elasticsearch"} + mock_responses.put( + "http://nowhere.com:9200/.elastic-connectors/_mapping?write_index_only=true", + payload={"_meta": meta}, + headers=headers, + ) + connectors = ConnectorIndex(config) + await connectors.update_meta(meta) + await connectors.close() + + @pytest.mark.asyncio async def test_connector_properties(): connector_src = { From 86346e572631b8c81a1cef27f79843d17ae2379c Mon Sep 17 00:00:00 2001 From: Chenhui Wang Date: Tue, 14 Nov 2023 22:43:16 +0800 Subject: [PATCH 2/2] move meta methods to ESIndex --- connectors/es/index.py | 11 +++++++ connectors/protocol/connectors.py | 10 ------- tests/es/test_index.py | 45 ++++++++++++++++++++++++++++ tests/protocol/test_connectors.py | 49 ------------------------------- 4 files changed, 56 insertions(+), 59 deletions(-) diff --git a/connectors/es/index.py b/connectors/es/index.py index d6875d3e9..740b155f4 100644 --- a/connectors/es/index.py +++ b/connectors/es/index.py @@ -127,3 +127,14 @@ async def get_all_docs(self, query=None, sort=None, page_size=DEFAULT_PAGE_SIZE) if count >= total: break offset += len(hits) + + async def meta(self): + response = await self.client.indices.get_mapping(index=self.index_name) + # Assume index name is the same pattern and the latest index is always the one with the largest suffix + latest_index = max(response.keys()) + return response[latest_index].get("mappings", {}).get("_meta") + + async def update_meta(self, meta): + await self.client.indices.put_mapping( + index=self.index_name, meta=meta, write_index_only=True + ) diff --git a/connectors/protocol/connectors.py b/connectors/protocol/connectors.py index cf836a443..e4074adba 100644 --- a/connectors/protocol/connectors.py +++ b/connectors/protocol/connectors.py @@ -188,16 +188,6 @@ async def all_connectors(self): async for connector in self.get_all_docs(): yield connector - async def meta(self): - response = await self.client.indices.get_mapping(index=self.index_name) - latest_index = max(response.keys()) - return response[latest_index].get("mappings", {}).get("_meta") - - async def update_meta(self, meta): - await self.client.indices.put_mapping( - index=self.index_name, meta=meta, write_index_only=True - ) - def filter_ingestion_stats(ingestion_stats): if ingestion_stats is None: diff --git a/tests/es/test_index.py b/tests/es/test_index.py index 08750c596..e91488923 100644 --- a/tests/es/test_index.py +++ b/tests/es/test_index.py @@ -249,3 +249,48 @@ async def test_get_all_docs(mock_responses): assert doc_count == total await index.close() + + +@pytest.mark.asyncio +@pytest.mark.parametrize( + "mappings, expected_meta", + [ + ({".elastic-connectors-v1": {"mappings": {}}}, None), + ( + {".elastic-connectors-v1": {"mappings": {"_meta": {"foo": "bar"}}}}, + {"foo": "bar"}, + ), + ( + { + ".elastic-connectors-v1": {"mappings": {"_meta": {"foo": "bar"}}}, + ".elastic-connectors-v2": {"mappings": {"_meta": {"baz": "qux"}}}, + }, + {"baz": "qux"}, + ), + ], +) +async def test__meta(mappings, expected_meta, mock_responses): + mock_responses.get( + "http://nowhere.com:9200/fake_index/_mapping", + payload=mappings, + headers=headers, + ) + + index = ESIndex(index_name, config) + + meta = await index.meta() + assert meta == expected_meta + await index.close() + + +@pytest.mark.asyncio +async def test_update_meta(mock_responses): + meta = {"foo": "bar"} + mock_responses.put( + "http://nowhere.com:9200/fake_index/_mapping?write_index_only=true", + payload={"_meta": meta}, + headers=headers, + ) + index = ESIndex(index_name, config) + await index.update_meta(meta) + await index.close() diff --git a/tests/protocol/test_connectors.py b/tests/protocol/test_connectors.py index 755e55237..23c81f3d7 100644 --- a/tests/protocol/test_connectors.py +++ b/tests/protocol/test_connectors.py @@ -316,55 +316,6 @@ async def test_all_connectors(mock_responses): assert len(conns) == 1 -@pytest.mark.asyncio -@pytest.mark.parametrize( - "mappings, expected_meta", - [ - ({".elastic-connectors-v1": {"mappings": {}}}, None), - ( - {".elastic-connectors-v1": {"mappings": {"_meta": {"foo": "bar"}}}}, - {"foo": "bar"}, - ), - ( - { - ".elastic-connectors-v1": {"mappings": {"_meta": {"foo": "bar"}}}, - ".elastic-connectors-v2": {"mappings": {"_meta": {"baz": "qux"}}}, - }, - {"baz": "qux"}, - ), - ], -) -async def test_connector_index_meta(mappings, expected_meta, mock_responses): - config = {"host": "http://nowhere.com:9200", "user": "tarek", "password": "blah"} - headers = {"X-Elastic-Product": "Elasticsearch"} - mock_responses.get( - "http://nowhere.com:9200/.elastic-connectors/_mapping", - payload=mappings, - headers=headers, - ) - - connectors = ConnectorIndex(config) - - meta = await connectors.meta() - assert meta == expected_meta - await connectors.close() - - -@pytest.mark.asyncio -async def test_connector_index_update_meta(mock_responses): - config = {"host": "http://nowhere.com:9200", "user": "tarek", "password": "blah"} - meta = {"foo": "bar"} - headers = {"X-Elastic-Product": "Elasticsearch"} - mock_responses.put( - "http://nowhere.com:9200/.elastic-connectors/_mapping?write_index_only=true", - payload={"_meta": meta}, - headers=headers, - ) - connectors = ConnectorIndex(config) - await connectors.update_meta(meta) - await connectors.close() - - @pytest.mark.asyncio async def test_connector_properties(): connector_src = {