From 88894ff058aea2014ef6d763465d62c85fe0dea4 Mon Sep 17 00:00:00 2001 From: Jedr Blaszyk Date: Fri, 25 Oct 2024 13:44:32 +0200 Subject: [PATCH 1/3] Followup on #2861 --- connectors/agent/connector_record_manager.py | 36 ++------ connectors/protocol/connectors.py | 13 +++ connectors/utils.py | 14 +++- tests/agent/test_connector_record_manager.py | 88 ++++++++------------ tests/protocol/test_connectors.py | 46 ++++++++++ 5 files changed, 111 insertions(+), 86 deletions(-) diff --git a/connectors/agent/connector_record_manager.py b/connectors/agent/connector_record_manager.py index 039784e62..0740e4d15 100644 --- a/connectors/agent/connector_record_manager.py +++ b/connectors/agent/connector_record_manager.py @@ -3,12 +3,10 @@ # or more contributor license agreements. Licensed under the Elastic License 2.0; # you may not use this file except in compliance with the Elastic License 2.0. # -import secrets -import string from connectors.agent.logger import get_logger -from connectors.es.index import DocumentNotFoundError from connectors.protocol import ConnectorIndex +from connectors.utils import generate_random_id logger = get_logger("agent_connector_record_manager") @@ -30,25 +28,27 @@ async def ensure_connector_records_exist(self, agent_config, connector_name=None """ if not self._agent_config_ready(agent_config): + logger.debug( + "Agent configuration is not ready to create a connector record." + ) return # Initialize the ES client if it's not already initialized if not self.connector_index: self.connector_index = ConnectorIndex(agent_config.get("elasticsearch")) - for connector_config in self._get_connectors(agent_config): + for connector_config in agent_config.get("connectors"): connector_id, service_type = ( connector_config["connector_id"], connector_config["service_type"], ) if not connector_name: - random_connector_name_id = self._generate_random_connector_name_id( - length=4 - ) + logger.debug("Connector name not provided, generating a random one.") + random_connector_name_id = generate_random_id(length=4) connector_name = f"[Elastic-managed] {service_type} connector {random_connector_name_id}" - if not await self._connector_exists(connector_id): + if not await self.connector_index.connector_exists(connector_id): try: await self.connector_index.connector_put( connector_id=connector_id, @@ -82,23 +82,3 @@ def _agent_config_ready(self, agent_config): return False return True - - async def _connector_exists(self, connector_id): - try: - doc = await self.connector_index.fetch_by_id(connector_id) - return doc is not None - except DocumentNotFoundError: - return False - except Exception as e: - logger.error( - f"Error while checking existence of connector '{connector_id}': {e}" - ) - raise e - - def _get_connectors(self, agent_config): - return agent_config.get("connectors") - - def _generate_random_connector_name_id(self, length=4): - return "".join( - secrets.choice(string.ascii_letters + string.digits) for _ in range(length) - ) diff --git a/connectors/protocol/connectors.py b/connectors/protocol/connectors.py index 0c33c1f6f..8fb303fee 100644 --- a/connectors/protocol/connectors.py +++ b/connectors/protocol/connectors.py @@ -23,6 +23,7 @@ from connectors.es import ESDocument, ESIndex from connectors.es.client import with_concurrency_control +from connectors.es.index import DocumentNotFoundError from connectors.filtering.validation import ( FilteringValidationState, InvalidFilteringError, @@ -172,6 +173,18 @@ async def connector_put( index_name=index_name, ) + async def connector_exists(self, connector_id): + try: + doc = await self.fetch_by_id(connector_id) + return doc is not None + except DocumentNotFoundError: + return False + except Exception as e: + logger.error( + f"Error while checking existence of connector '{connector_id}': {e}" + ) + raise e + async def connector_update_scheduling( self, connector_id, full=None, incremental=None, access_control=None ): diff --git a/connectors/utils.py b/connectors/utils.py index 178d58303..1a0bfdec4 100644 --- a/connectors/utils.py +++ b/connectors/utils.py @@ -11,8 +11,10 @@ import os import platform import re +import secrets import shutil import ssl +import string import subprocess # noqa S404 import time import urllib.parse @@ -754,10 +756,10 @@ def truncate_id(_id): def has_duplicates(strings_list): seen = set() - for string in strings_list: - if string in seen: + for s in strings_list: + if s in seen: return True - seen.add(string) + seen.add(s) return False @@ -994,3 +996,9 @@ def get(self, key) -> int: def to_dict(self): return deepcopy(self._storage) + + +def generate_random_id(length=4): + return "".join( + secrets.choice(string.ascii_letters + string.digits) for _ in range(length) + ) diff --git a/tests/agent/test_connector_record_manager.py b/tests/agent/test_connector_record_manager.py index 46ae9f87c..2f84a2b9d 100644 --- a/tests/agent/test_connector_record_manager.py +++ b/tests/agent/test_connector_record_manager.py @@ -3,14 +3,13 @@ # or more contributor license agreements. Licensed under the Elastic License 2.0; # you may not use this file except in compliance with the Elastic License 2.0. # -from unittest.mock import AsyncMock, Mock, patch +from unittest.mock import AsyncMock, patch import pytest from connectors.agent.connector_record_manager import ( ConnectorRecordManager, ) -from connectors.es.index import DocumentNotFoundError from connectors.protocol import ConnectorIndex @@ -35,49 +34,53 @@ def connector_record_manager(mock_connector_index): @pytest.mark.asyncio -@patch("connectors.protocol.ConnectorIndex", new_callable=AsyncMock) async def test_ensure_connector_records_exist_creates_connectors_if_not_exist( - mock_connector_index, mock_agent_config + connector_record_manager, mock_agent_config ): - manager = ConnectorRecordManager() - manager.connector_index = mock_connector_index - mock_connector_index.fetch_by_id.side_effect = DocumentNotFoundError - mock_connector_index.connector_put = AsyncMock() - connector_ui_id = "1234" - manager._generate_random_connector_name_id = Mock(return_value=connector_ui_id) - - await manager.ensure_connector_records_exist(mock_agent_config) - assert mock_connector_index.connector_put.call_count == 1 - mock_connector_index.connector_put.assert_any_await( - connector_id="1", - service_type="service1", - connector_name=f"[Elastic-managed] service1 connector {connector_ui_id}", - ) + random_connector_name_id = "1234" + + with patch( + "connectors.agent.connector_record_manager.generate_random_id", + return_value=random_connector_name_id, + ): + connector_record_manager.connector_index.connector_exists = AsyncMock( + return_value=False + ) + connector_record_manager.connector_index.connector_put = AsyncMock() + + await connector_record_manager.ensure_connector_records_exist(mock_agent_config) + assert connector_record_manager.connector_index.connector_put.call_count == 1 + connector_record_manager.connector_index.connector_put.assert_any_await( + connector_id="1", + service_type="service1", + connector_name=f"[Elastic-managed] service1 connector {random_connector_name_id}", + ) @pytest.mark.asyncio async def test_ensure_connector_records_exist_connector_already_exists( connector_record_manager, mock_agent_config ): - connector_record_manager._connector_exists = AsyncMock(return_value=True) + connector_record_manager.connector_index.connector_exists = AsyncMock( + return_value=True + ) await connector_record_manager.ensure_connector_records_exist(mock_agent_config) assert connector_record_manager.connector_index.connector_put.call_count == 0 @pytest.mark.asyncio -@patch("connectors.protocol.ConnectorIndex", new_callable=AsyncMock) async def test_ensure_connector_records_raises_on_non_404_error( - mock_connector_index, mock_agent_config + connector_record_manager, mock_agent_config ): - manager = ConnectorRecordManager() - manager.connector_index = mock_connector_index - mock_connector_index.fetch_by_id.side_effect = Exception("Unexpected error") - mock_connector_index.connector_put = AsyncMock() + connector_record_manager.connector_index.connector_exists = AsyncMock( + side_effect=Exception("Unexpected error") + ) + connector_record_manager.connector_index.connector_put = AsyncMock() with pytest.raises(Exception, match="Unexpected error"): - await manager.ensure_connector_records_exist(mock_agent_config) + await connector_record_manager.ensure_connector_records_exist(mock_agent_config) - assert mock_connector_index.connector_put.call_count == 0 + assert connector_record_manager.connector_index.connector_put.call_count == 0 @pytest.mark.asyncio @@ -93,7 +96,9 @@ async def test_ensure_connector_records_exist_agent_config_not_ready( async def test_ensure_connector_records_exist_exception_on_create( connector_record_manager, mock_agent_config ): - connector_record_manager._connector_exists = AsyncMock(return_value=False) + connector_record_manager.connector_index.connector_exists = AsyncMock( + return_value=False + ) connector_record_manager.connector_index.connector_put = AsyncMock( side_effect=Exception("Failed to create") ) @@ -101,33 +106,6 @@ async def test_ensure_connector_records_exist_exception_on_create( await connector_record_manager.ensure_connector_records_exist(mock_agent_config) -@pytest.mark.asyncio -async def test_connector_exists_returns_true_when_found(connector_record_manager): - connector_record_manager.connector_index.fetch_by_id = AsyncMock( - return_value={"id": "1"} - ) - exists = await connector_record_manager._connector_exists("1") - assert exists is True - - -@pytest.mark.asyncio -async def test_connector_exists_returns_false_when_not_found(connector_record_manager): - connector_record_manager.connector_index.fetch_by_id = AsyncMock( - side_effect=DocumentNotFoundError - ) - exists = await connector_record_manager._connector_exists("1") - assert exists is False - - -@pytest.mark.asyncio -async def test_connector_exists_raises_non_404_exception(connector_record_manager): - connector_record_manager.connector_index.fetch_by_id = AsyncMock( - side_effect=Exception("Fetch error") - ) - with pytest.raises(Exception, match="Fetch error"): - await connector_record_manager._connector_exists("1") - - def test_agent_config_ready_with_valid_config( connector_record_manager, mock_agent_config ): diff --git a/tests/protocol/test_connectors.py b/tests/protocol/test_connectors.py index 0289992e6..4de4646ed 100644 --- a/tests/protocol/test_connectors.py +++ b/tests/protocol/test_connectors.py @@ -13,6 +13,7 @@ from elasticsearch import ApiError, ConflictError from connectors.config import load_config +from connectors.es.index import DocumentNotFoundError from connectors.filtering.validation import ( FilteringValidationResult, FilteringValidationState, @@ -1646,6 +1647,51 @@ async def test_connector_validate_filtering_valid_with_connector_api(set_env): ) +@pytest.mark.asyncio +async def test_connector_exists_returns_true_when_found(): + config = { + "username": "elastic", + "password": "changeme", + "host": "http://nowhere.com:9200", + } + + index = ConnectorIndex(config) + index.fetch_by_id = AsyncMock(return_value={"id": "1"}) + + exists = await index.connector_exists("1") + assert exists is True + + +@pytest.mark.asyncio +async def test_connector_exists_returns_false_when_not_found(): + config = { + "username": "elastic", + "password": "changeme", + "host": "http://nowhere.com:9200", + } + + index = ConnectorIndex(config) + index.fetch_by_id = AsyncMock(side_effect=DocumentNotFoundError) + + exists = await index.connector_exists("1") + assert exists is False + + +@pytest.mark.asyncio +async def test_connector_exists_raises_non_404_exception(): + config = { + "username": "elastic", + "password": "changeme", + "host": "http://nowhere.com:9200", + } + + index = ConnectorIndex(config) + index.fetch_by_id = AsyncMock(side_effect=Exception("Fetch error")) + + with pytest.raises(Exception, match="Fetch error"): + await index.connector_exists("1") + + @pytest.mark.asyncio async def test_document_count(): expected_count = 20 From aae8c5f769add778b4db131053b3bc75f00ca9a6 Mon Sep 17 00:00:00 2001 From: Jedr Blaszyk Date: Mon, 4 Nov 2024 17:03:22 +0100 Subject: [PATCH 2/3] Add detailed message to config check --- connectors/agent/connector_record_manager.py | 43 +++++++++++++++----- tests/agent/test_connector_record_manager.py | 6 +-- 2 files changed, 35 insertions(+), 14 deletions(-) diff --git a/connectors/agent/connector_record_manager.py b/connectors/agent/connector_record_manager.py index 0740e4d15..141027e73 100644 --- a/connectors/agent/connector_record_manager.py +++ b/connectors/agent/connector_record_manager.py @@ -27,9 +27,11 @@ async def ensure_connector_records_exist(self, agent_config, connector_name=None If the connector record with a given ID doesn't exist, create a new one. """ - if not self._agent_config_ready(agent_config): + config_ready, msg = self._check_agent_config_ready(agent_config) + + if not config_ready: logger.debug( - "Agent configuration is not ready to create a connector record." + f"Agent configuration is not ready to create a connector record. Skipping. Reason: {msg} " ) return @@ -62,23 +64,42 @@ async def ensure_connector_records_exist(self, agent_config, connector_name=None ) raise e - def _agent_config_ready(self, agent_config): + def _check_agent_config_ready(self, agent_config): """ Validates the agent configuration to check if all info is present to create a connector record. + + Returns: + tuple: (bool, str or None) - True and None if valid, otherwise False and an error message. """ + connectors = agent_config.get("connectors") - if connectors is None or len(connectors) == 0: - return False + if connectors is None: + return False, "No 'connectors' key found in the service configuration." + + if len(connectors) == 0: + return False, "Empty connectors array found in the service configuration." for connector in connectors: - if "connector_id" not in connector or "service_type" not in connector: - return False + if "connector_id" not in connector: + return ( + False, + "No 'connector_id' key found in the connector object.", + ) + + if "service_type" not in connector: + return ( + False, + "No 'service_type' key found in the connector object.", + ) elasticsearch_config = agent_config.get("elasticsearch") if not elasticsearch_config: - return False + return False, "No 'elasticsearch' key found in the service configuration." + + if "host" not in elasticsearch_config: + return False, "No 'host' key found in the elasticsearch configuration." - if "host" not in elasticsearch_config or "api_key" not in elasticsearch_config: - return False + if "api_key" not in elasticsearch_config: + return False, "No 'api_key' key found in the elasticsearch configuration." - return True + return True, None diff --git a/tests/agent/test_connector_record_manager.py b/tests/agent/test_connector_record_manager.py index 2f84a2b9d..34818289d 100644 --- a/tests/agent/test_connector_record_manager.py +++ b/tests/agent/test_connector_record_manager.py @@ -109,7 +109,7 @@ async def test_ensure_connector_records_exist_exception_on_create( def test_agent_config_ready_with_valid_config( connector_record_manager, mock_agent_config ): - ready = connector_record_manager._agent_config_ready(mock_agent_config) + ready, _ = connector_record_manager._check_agent_config_ready(mock_agent_config) assert ready is True @@ -119,7 +119,7 @@ def test_agent_config_ready_with_invalid_config_missing_connectors( invalid_config = { "elasticsearch": {"host": "http://localhost:9200", "api_key": "dummy_key"} } - ready = connector_record_manager._agent_config_ready(invalid_config) + ready, _ = connector_record_manager._check_agent_config_ready(invalid_config) assert ready is False @@ -127,5 +127,5 @@ def test_agent_config_ready_with_invalid_config_missing_elasticsearch( connector_record_manager, ): invalid_config = {"connectors": [{"connector_id": "1", "service_type": "service1"}]} - ready = connector_record_manager._agent_config_ready(invalid_config) + ready, _ = connector_record_manager._check_agent_config_ready(invalid_config) assert ready is False From c59adb3c9f61ab1b4c15af6fa17a9ff53e47266f Mon Sep 17 00:00:00 2001 From: Jedr Blaszyk Date: Thu, 7 Nov 2024 10:01:34 +0100 Subject: [PATCH 3/3] Update connectors/agent/connector_record_manager.py Co-authored-by: Artem Shelkovnikov --- connectors/agent/connector_record_manager.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/connectors/agent/connector_record_manager.py b/connectors/agent/connector_record_manager.py index 141027e73..331a95fba 100644 --- a/connectors/agent/connector_record_manager.py +++ b/connectors/agent/connector_record_manager.py @@ -77,7 +77,7 @@ def _check_agent_config_ready(self, agent_config): return False, "No 'connectors' key found in the service configuration." if len(connectors) == 0: - return False, "Empty connectors array found in the service configuration." + return False, "Empty 'connectors' array found in the service configuration." for connector in connectors: if "connector_id" not in connector: