Skip to content

Commit

Permalink
[COST-5886] update kafka status check (#5494)
Browse files Browse the repository at this point in the history
  • Loading branch information
maskarb authored Feb 10, 2025
1 parent 5a87eb6 commit 961176e
Show file tree
Hide file tree
Showing 6 changed files with 190 additions and 190 deletions.
1 change: 0 additions & 1 deletion Pipfile
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ gunicorn = "*"
ibm-cloud-sdk-core = ">=3.5.2"
ibm-platform-services = ">=0.17.8"
jinjasql2 = "*"
kafka-python = ">=2.0.1"
kombu = "<5.3" # https://issues.redhat.com/browse/COST-3997
msrestazure = "*"
numpy = {version = "*", markers = "platform_machine == 'aarch64' or platform_machine == 'arm64' or platform_machine == 'x86_64'"}
Expand Down
311 changes: 149 additions & 162 deletions Pipfile.lock

Large diffs are not rendered by default.

24 changes: 18 additions & 6 deletions koku/kafka_utils/test/test_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,14 @@ class KafkaUtilsTest(TestCase):

def test_check_kafka_connection(self):
    """Test check_kafka_connection() against a mocked admin client.

    The stale patches of ``kafka.BrokerConnection`` belonged to the removed
    kafka-python implementation; only ``get_admin_client`` is patched now.
    """
    with patch("kafka_utils.utils.get_admin_client") as mock_client:
        # An empty topic collection means the broker was not reachable.
        mock_client.return_value.list_topics.return_value.topics = []
        result = utils.check_kafka_connection()
        self.assertFalse(result)
    with patch("kafka_utils.utils.get_admin_client") as mock_client:
        # Any non-empty topic collection counts as a successful connection.
        mock_client.return_value.list_topics.return_value.topics = [1]
        result = utils.check_kafka_connection()
        self.assertTrue(result)

@patch("time.sleep", side_effect=None)
@patch("kafka_utils.utils.check_kafka_connection", side_effect=[bool(0), bool(1)])
Expand Down Expand Up @@ -78,3 +78,15 @@ def test_producer_singleton(self):
p1 = utils.ProducerSingleton({})
p2 = utils.ProducerSingleton({})
self.assertEqual(id(p1), id(p2))


class AdminClientSingletonTest(TestCase):
    """Tests for the AdminClientSingleton class."""

    def test_admin_client_singleton(self):
        """Two AdminClientSingletons are the same object; two plain AdminClients are not.

        Renamed from the copy-pasted ``test_producer_singleton``: this test
        exercises AdminClient/AdminClientSingleton, not the Producer classes.
        """
        # An empty config (no bootstrap.servers) creates a client that never
        # connects to anything, which keeps this test offline.
        client_a = utils.AdminClient({})
        client_b = utils.AdminClient({})
        self.assertNotEqual(id(client_a), id(client_b))
        singleton_a = utils.AdminClientSingleton({})
        singleton_b = utils.AdminClientSingleton({})
        self.assertEqual(id(singleton_a), id(singleton_b))
25 changes: 14 additions & 11 deletions koku/kafka_utils/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,16 @@
"""Common utility functions for Kafka implementations."""
import logging
import random
import socket
import time

from confluent_kafka import Consumer
from confluent_kafka import Producer
from kafka import BrokerConnection
from confluent_kafka.admin import AdminClient

from koku.configurator import CONFIGURATOR
from masu.prometheus_stats import KAFKA_CONNECTION_ERRORS_COUNTER
from masu.util.common import SingletonMeta


LOG = logging.getLogger(__name__)
UPLOAD_TOPIC = CONFIGURATOR.get_kafka_topic("platform.upload.announce")
VALIDATION_TOPIC = CONFIGURATOR.get_kafka_topic("platform.upload.validation")
Expand All @@ -26,6 +24,10 @@
SOURCES_TOPIC = CONFIGURATOR.get_kafka_topic("platform.sources.event-stream")


class AdminClientSingleton(AdminClient, metaclass=SingletonMeta):
    """Creates a singleton instance of a Kafka AdminClient"""


class ProducerSingleton(Producer, metaclass=SingletonMeta):
    """Creates a singleton instance of a Kafka Producer.

    SingletonMeta guarantees every construction returns the same Producer.
    """

Expand Down Expand Up @@ -68,6 +70,11 @@ def get_consumer(conf_settings): # pragma: no cover
return Consumer(conf, logger=LOG)


def get_admin_client():  # pragma: no cover
    """Return the process-wide Kafka AdminClient.

    Builds the managed Kafka config and hands it to AdminClientSingleton,
    so repeated calls yield the same underlying client instance.
    """
    return AdminClientSingleton(_get_managed_kafka_config())


def _get_producer_config(conf_settings): # pragma: no cover
"""Return Kafka Producer config"""
producer_conf = {}
Expand Down Expand Up @@ -102,14 +109,10 @@ def backoff(interval, maximum=120):

def check_kafka_connection():
    """Check connectability of Kafka Broker.

    The old kafka-python ``BrokerConnection`` loop is gone (the dependency
    was removed); connectivity is now probed via the admin client.

    Returns:
        bool: True when the broker reports at least one topic, which is
        taken as proof of a successful connection; False otherwise.
    """
    client = get_admin_client()
    topics = client.list_topics().topics
    # if there is a list of topics, then we've successfully connected to kafka
    return len(topics) > 0


def is_kafka_connected():
Expand Down
3 changes: 0 additions & 3 deletions koku/sources/kafka_listener.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
from django.db import transaction
from django.db.models.signals import post_save
from django.dispatch import receiver
from kafka.errors import KafkaError
from rest_framework.exceptions import ValidationError

from api.provider.models import Sources
Expand Down Expand Up @@ -304,8 +303,6 @@ def listen_for_messages(kaf_msg, consumer, application_source_id): # noqa: C901
else:
consumer.commit()

except KafkaError as error:
LOG.error(f"[listen_for_messages] Kafka error encountered: {type(error).__name__}: {error}", exc_info=True)
except Exception as error:
LOG.error(f"[listen_for_messages] UNKNOWN error encountered: {type(error).__name__}: {error}", exc_info=True)

Expand Down
16 changes: 9 additions & 7 deletions koku/sources/test/test_kafka_listener.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,15 @@
from uuid import uuid4

import requests_mock
from confluent_kafka import KafkaError
from confluent_kafka import KafkaException
from django.db import IntegrityError
from django.db import InterfaceError
from django.db import OperationalError
from django.db.models.signals import post_save
from django.forms.models import model_to_dict
from django.test.utils import override_settings
from faker import Faker
from kafka.errors import KafkaError
from rest_framework.exceptions import ValidationError

import sources.kafka_listener as source_integration
Expand Down Expand Up @@ -60,7 +61,6 @@
from sources.test.test_sources_http_client import MOCK_PREFIX
from sources.test.test_sources_http_client import MOCK_URL


faker = Faker()
FAKE_AWS_ARN = "arn:aws:iam::111111111111:role/CostManagement"
FAKE_EXTERNAL_ID = str(uuid4())
Expand Down Expand Up @@ -106,7 +106,7 @@ def seek(self, topic_partition):
def getone(self):
    """Return the next preloaded message, or raise once the queue is drained.

    The duplicate ``raise KafkaError("Closing Mock Consumer")`` left over from
    the kafka-python implementation made the confluent-style raise unreachable;
    only the confluent_kafka exception is kept.
    """
    for msg in self.preloaded_messages:
        return msg
    # No messages left: signal end-of-stream the confluent_kafka way.
    raise KafkaException(KafkaError._PARTITION_EOF)

def __aiter__(self):
    """Return self so this mock consumer serves as its own async iterator."""
    return self
Expand Down Expand Up @@ -955,8 +955,9 @@ def test_storage_callback_update(self):
local_source = Sources(**self.aws_local_source, koku_uuid=uuid, pending_update=True)
local_source.save()

with patch("sources.kafka_listener.execute_process_queue"), patch(
"sources.storage.screen_and_build_provider_sync_create_event", return_value=False
with (
patch("sources.kafka_listener.execute_process_queue"),
patch("sources.storage.screen_and_build_provider_sync_create_event", return_value=False),
):
storage_callback("", local_source)
_, msg = PROCESS_QUEUE.get_nowait()
Expand All @@ -968,8 +969,9 @@ def test_storage_callback_update_and_delete(self):
local_source = Sources(**self.aws_local_source, koku_uuid=uuid, pending_update=True, pending_delete=True)
local_source.save()

with patch("sources.kafka_listener.execute_process_queue"), patch(
"sources.storage.screen_and_build_provider_sync_create_event", return_value=False
with (
patch("sources.kafka_listener.execute_process_queue"),
patch("sources.storage.screen_and_build_provider_sync_create_event", return_value=False),
):
storage_callback("", local_source)
_, msg = PROCESS_QUEUE.get_nowait()
Expand Down

0 comments on commit 961176e

Please sign in to comment.