Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Followup on #2861 #2923

Merged
merged 6 commits into from
Nov 7, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 8 additions & 28 deletions connectors/agent/connector_record_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,10 @@
# or more contributor license agreements. Licensed under the Elastic License 2.0;
# you may not use this file except in compliance with the Elastic License 2.0.
#
import secrets
import string

from connectors.agent.logger import get_logger
from connectors.es.index import DocumentNotFoundError
from connectors.protocol import ConnectorIndex
from connectors.utils import generate_random_id

logger = get_logger("agent_connector_record_manager")

Expand All @@ -30,25 +28,27 @@ async def ensure_connector_records_exist(self, agent_config, connector_name=None
"""

if not self._agent_config_ready(agent_config):
logger.debug(
"Agent configuration is not ready to create a connector record."
)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we update the copy to provide more specific detail on what's going on? How can configuration be not ready? Is it gonna be happening a lot?

Copy link
Member Author

@jedrazb jedrazb Nov 4, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it gonna be happening a lot?

It should not be happening at all... just here to verify that the config is 100% what we expect this to be. We only call ensure_connector_records_exist once we have ES output from the agent protocol checkin.

return

# Initialize the ES client if it's not already initialized
if not self.connector_index:
self.connector_index = ConnectorIndex(agent_config.get("elasticsearch"))

for connector_config in self._get_connectors(agent_config):
for connector_config in agent_config.get("connectors"):
connector_id, service_type = (
connector_config["connector_id"],
connector_config["service_type"],
)

if not connector_name:
random_connector_name_id = self._generate_random_connector_name_id(
length=4
)
logger.debug("Connector name not provided, generating a random one.")
random_connector_name_id = generate_random_id(length=4)
connector_name = f"[Elastic-managed] {service_type} connector {random_connector_name_id}"

if not await self._connector_exists(connector_id):
if not await self.connector_index.connector_exists(connector_id):
try:
await self.connector_index.connector_put(
connector_id=connector_id,
Expand Down Expand Up @@ -82,23 +82,3 @@ def _agent_config_ready(self, agent_config):
return False

return True

async def _connector_exists(self, connector_id):
try:
doc = await self.connector_index.fetch_by_id(connector_id)
return doc is not None
except DocumentNotFoundError:
return False
except Exception as e:
logger.error(
f"Error while checking existence of connector '{connector_id}': {e}"
)
raise e

def _get_connectors(self, agent_config):
return agent_config.get("connectors")

def _generate_random_connector_name_id(self, length=4):
return "".join(
secrets.choice(string.ascii_letters + string.digits) for _ in range(length)
)
13 changes: 13 additions & 0 deletions connectors/protocol/connectors.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@

from connectors.es import ESDocument, ESIndex
from connectors.es.client import with_concurrency_control
from connectors.es.index import DocumentNotFoundError
from connectors.filtering.validation import (
FilteringValidationState,
InvalidFilteringError,
Expand Down Expand Up @@ -172,6 +173,18 @@ async def connector_put(
index_name=index_name,
)

async def connector_exists(self, connector_id):
try:
doc = await self.fetch_by_id(connector_id)
return doc is not None
artem-shelkovnikov marked this conversation as resolved.
Show resolved Hide resolved
except DocumentNotFoundError:
return False
except Exception as e:
logger.error(
f"Error while checking existence of connector '{connector_id}': {e}"
)
raise e

async def connector_update_scheduling(
self, connector_id, full=None, incremental=None, access_control=None
):
Expand Down
14 changes: 11 additions & 3 deletions connectors/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,10 @@
import os
import platform
import re
import secrets
import shutil
import ssl
import string
import subprocess # noqa S404
import time
import urllib.parse
Expand Down Expand Up @@ -754,10 +756,10 @@ def truncate_id(_id):

def has_duplicates(strings_list):
seen = set()
for string in strings_list:
if string in seen:
for s in strings_list:
if s in seen:
return True
seen.add(string)
seen.add(s)
return False


Expand Down Expand Up @@ -994,3 +996,9 @@ def get(self, key) -> int:

def to_dict(self):
return deepcopy(self._storage)


def generate_random_id(length=4):
return "".join(
secrets.choice(string.ascii_letters + string.digits) for _ in range(length)
)
88 changes: 33 additions & 55 deletions tests/agent/test_connector_record_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,13 @@
# or more contributor license agreements. Licensed under the Elastic License 2.0;
# you may not use this file except in compliance with the Elastic License 2.0.
#
from unittest.mock import AsyncMock, Mock, patch
from unittest.mock import AsyncMock, patch

import pytest

from connectors.agent.connector_record_manager import (
ConnectorRecordManager,
)
from connectors.es.index import DocumentNotFoundError
from connectors.protocol import ConnectorIndex


Expand All @@ -35,49 +34,53 @@ def connector_record_manager(mock_connector_index):


@pytest.mark.asyncio
@patch("connectors.protocol.ConnectorIndex", new_callable=AsyncMock)
async def test_ensure_connector_records_exist_creates_connectors_if_not_exist(
mock_connector_index, mock_agent_config
connector_record_manager, mock_agent_config
):
manager = ConnectorRecordManager()
manager.connector_index = mock_connector_index
mock_connector_index.fetch_by_id.side_effect = DocumentNotFoundError
mock_connector_index.connector_put = AsyncMock()
connector_ui_id = "1234"
manager._generate_random_connector_name_id = Mock(return_value=connector_ui_id)

await manager.ensure_connector_records_exist(mock_agent_config)
assert mock_connector_index.connector_put.call_count == 1
mock_connector_index.connector_put.assert_any_await(
connector_id="1",
service_type="service1",
connector_name=f"[Elastic-managed] service1 connector {connector_ui_id}",
)
random_connector_name_id = "1234"

with patch(
"connectors.agent.connector_record_manager.generate_random_id",
return_value=random_connector_name_id,
):
connector_record_manager.connector_index.connector_exists = AsyncMock(
return_value=False
)
connector_record_manager.connector_index.connector_put = AsyncMock()

await connector_record_manager.ensure_connector_records_exist(mock_agent_config)
assert connector_record_manager.connector_index.connector_put.call_count == 1
connector_record_manager.connector_index.connector_put.assert_any_await(
connector_id="1",
service_type="service1",
connector_name=f"[Elastic-managed] service1 connector {random_connector_name_id}",
)


@pytest.mark.asyncio
async def test_ensure_connector_records_exist_connector_already_exists(
connector_record_manager, mock_agent_config
):
connector_record_manager._connector_exists = AsyncMock(return_value=True)
connector_record_manager.connector_index.connector_exists = AsyncMock(
return_value=True
)
await connector_record_manager.ensure_connector_records_exist(mock_agent_config)
assert connector_record_manager.connector_index.connector_put.call_count == 0


@pytest.mark.asyncio
@patch("connectors.protocol.ConnectorIndex", new_callable=AsyncMock)
async def test_ensure_connector_records_raises_on_non_404_error(
mock_connector_index, mock_agent_config
connector_record_manager, mock_agent_config
):
manager = ConnectorRecordManager()
manager.connector_index = mock_connector_index
mock_connector_index.fetch_by_id.side_effect = Exception("Unexpected error")
mock_connector_index.connector_put = AsyncMock()
connector_record_manager.connector_index.connector_exists = AsyncMock(
side_effect=Exception("Unexpected error")
)
connector_record_manager.connector_index.connector_put = AsyncMock()

with pytest.raises(Exception, match="Unexpected error"):
await manager.ensure_connector_records_exist(mock_agent_config)
await connector_record_manager.ensure_connector_records_exist(mock_agent_config)

assert mock_connector_index.connector_put.call_count == 0
assert connector_record_manager.connector_index.connector_put.call_count == 0


@pytest.mark.asyncio
Expand All @@ -93,41 +96,16 @@ async def test_ensure_connector_records_exist_agent_config_not_ready(
async def test_ensure_connector_records_exist_exception_on_create(
connector_record_manager, mock_agent_config
):
connector_record_manager._connector_exists = AsyncMock(return_value=False)
connector_record_manager.connector_index.connector_exists = AsyncMock(
return_value=False
)
connector_record_manager.connector_index.connector_put = AsyncMock(
side_effect=Exception("Failed to create")
)
with pytest.raises(Exception, match="Failed to create"):
await connector_record_manager.ensure_connector_records_exist(mock_agent_config)


@pytest.mark.asyncio
async def test_connector_exists_returns_true_when_found(connector_record_manager):
connector_record_manager.connector_index.fetch_by_id = AsyncMock(
return_value={"id": "1"}
)
exists = await connector_record_manager._connector_exists("1")
assert exists is True


@pytest.mark.asyncio
async def test_connector_exists_returns_false_when_not_found(connector_record_manager):
connector_record_manager.connector_index.fetch_by_id = AsyncMock(
side_effect=DocumentNotFoundError
)
exists = await connector_record_manager._connector_exists("1")
assert exists is False


@pytest.mark.asyncio
async def test_connector_exists_raises_non_404_exception(connector_record_manager):
connector_record_manager.connector_index.fetch_by_id = AsyncMock(
side_effect=Exception("Fetch error")
)
with pytest.raises(Exception, match="Fetch error"):
await connector_record_manager._connector_exists("1")


def test_agent_config_ready_with_valid_config(
connector_record_manager, mock_agent_config
):
Expand Down
46 changes: 46 additions & 0 deletions tests/protocol/test_connectors.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
from elasticsearch import ApiError, ConflictError

from connectors.config import load_config
from connectors.es.index import DocumentNotFoundError
from connectors.filtering.validation import (
FilteringValidationResult,
FilteringValidationState,
Expand Down Expand Up @@ -1646,6 +1647,51 @@ async def test_connector_validate_filtering_valid_with_connector_api(set_env):
)


@pytest.mark.asyncio
async def test_connector_exists_returns_true_when_found():
config = {
"username": "elastic",
"password": "changeme",
"host": "http://nowhere.com:9200",
}

index = ConnectorIndex(config)
index.fetch_by_id = AsyncMock(return_value={"id": "1"})

exists = await index.connector_exists("1")
assert exists is True


@pytest.mark.asyncio
async def test_connector_exists_returns_false_when_not_found():
config = {
"username": "elastic",
"password": "changeme",
"host": "http://nowhere.com:9200",
}

index = ConnectorIndex(config)
index.fetch_by_id = AsyncMock(side_effect=DocumentNotFoundError)

exists = await index.connector_exists("1")
assert exists is False


@pytest.mark.asyncio
async def test_connector_exists_raises_non_404_exception():
config = {
"username": "elastic",
"password": "changeme",
"host": "http://nowhere.com:9200",
}

index = ConnectorIndex(config)
index.fetch_by_id = AsyncMock(side_effect=Exception("Fetch error"))

with pytest.raises(Exception, match="Fetch error"):
await index.connector_exists("1")


@pytest.mark.asyncio
async def test_document_count():
expected_count = 20
Expand Down