diff --git a/smart_kit/start_points/main_loop_kafka.py b/smart_kit/start_points/main_loop_kafka.py index b8131ecd..8abb59e1 100644 --- a/smart_kit/start_points/main_loop_kafka.py +++ b/smart_kit/start_points/main_loop_kafka.py @@ -15,6 +15,7 @@ from core.model.heapq.heapq_storage import HeapqKV from core.mq.kafka.kafka_consumer import KafkaConsumer from core.mq.kafka.kafka_publisher import KafkaPublisher +from core.utils.pickle_copy import pickle_deepcopy from core.utils.stats_timer import StatsTimer from core.basic_models.actions.command import Command from smart_kit.compatibility.commands import combine_commands @@ -54,11 +55,12 @@ def __init__(self, *args, **kwargs): "%(class_name)s START CONSUMERS/PUBLISHERS CREATE", params={"class_name": self.__class__.__name__}, level="WARNING" ) - for key, config in kafka_config.items(): + kafka_config_copy = pickle_deepcopy(kafka_config) + for key, config in kafka_config_copy.items(): if config.get("consumer"): - consumers.update({key: KafkaConsumer(kafka_config[key])}) + consumers.update({key: KafkaConsumer(config)}) if config.get("publisher"): - publishers.update({key: KafkaPublisher(kafka_config[key])}) + publishers.update({key: KafkaPublisher(config)}) log( "%(class_name)s FINISHED CONSUMERS/PUBLISHERS CREATE", params={"class_name": self.__class__.__name__}, level="WARNING"