From aa76fc2e8293fe779d82209dac28cb141da7b742 Mon Sep 17 00:00:00 2001 From: Makar Shevchenko Date: Tue, 26 Apr 2022 19:19:25 +0300 Subject: [PATCH 1/2] fix modification of config by kafka utils in kafka main loop --- smart_kit/start_points/main_loop_kafka.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/smart_kit/start_points/main_loop_kafka.py b/smart_kit/start_points/main_loop_kafka.py index b8131ecd..f08168ac 100644 --- a/smart_kit/start_points/main_loop_kafka.py +++ b/smart_kit/start_points/main_loop_kafka.py @@ -15,6 +15,7 @@ from core.model.heapq.heapq_storage import HeapqKV from core.mq.kafka.kafka_consumer import KafkaConsumer from core.mq.kafka.kafka_publisher import KafkaPublisher +from core.utils.pickle_copy import pickle_deepcopy from core.utils.stats_timer import StatsTimer from core.basic_models.actions.command import Command from smart_kit.compatibility.commands import combine_commands @@ -56,9 +57,9 @@ def __init__(self, *args, **kwargs): ) for key, config in kafka_config.items(): if config.get("consumer"): - consumers.update({key: KafkaConsumer(kafka_config[key])}) + consumers.update({key: KafkaConsumer(pickle_deepcopy(kafka_config[key]))}) if config.get("publisher"): - publishers.update({key: KafkaPublisher(kafka_config[key])}) + publishers.update({key: KafkaPublisher(pickle_deepcopy(kafka_config[key]))}) log( "%(class_name)s FINISHED CONSUMERS/PUBLISHERS CREATE", params={"class_name": self.__class__.__name__}, level="WARNING" From e123ef1058c36bdf7772c261bc29522755321efd Mon Sep 17 00:00:00 2001 From: Makar Shevchenko Date: Tue, 26 Apr 2022 20:08:33 +0300 Subject: [PATCH 2/2] refactor fix modifieble config --- smart_kit/start_points/main_loop_kafka.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/smart_kit/start_points/main_loop_kafka.py b/smart_kit/start_points/main_loop_kafka.py index f08168ac..8abb59e1 100644 --- a/smart_kit/start_points/main_loop_kafka.py +++ b/smart_kit/start_points/main_loop_kafka.py @@ -55,11 +55,12 @@ def __init__(self, *args, **kwargs): "%(class_name)s START CONSUMERS/PUBLISHERS CREATE", params={"class_name": self.__class__.__name__}, level="WARNING" ) - for key, config in kafka_config.items(): + kafka_config_copy = pickle_deepcopy(kafka_config) + for key, config in kafka_config_copy.items(): if config.get("consumer"): - consumers.update({key: KafkaConsumer(pickle_deepcopy(kafka_config[key]))}) + consumers.update({key: KafkaConsumer(config)}) if config.get("publisher"): - publishers.update({key: KafkaPublisher(pickle_deepcopy(kafka_config[key]))}) + publishers.update({key: KafkaPublisher(config)}) log( "%(class_name)s FINISHED CONSUMERS/PUBLISHERS CREATE", params={"class_name": self.__class__.__name__}, level="WARNING"