From d75c0f7bb5eee0f5ec5055a9fa4ae2e94ac6e815 Mon Sep 17 00:00:00 2001 From: VeikoAunapuu Date: Fri, 21 Feb 2025 14:39:23 +0200 Subject: [PATCH] rabbit consumer + config from file --- emf/common/integrations/rabbit.py | 141 ++++++++++++++++++++++++++++ emf/task_generator/manual_worker.py | 20 ++-- emf/task_generator/worker.py | 30 ------ 3 files changed, 151 insertions(+), 40 deletions(-) diff --git a/emf/common/integrations/rabbit.py b/emf/common/integrations/rabbit.py index b15424c..f482b22 100644 --- a/emf/common/integrations/rabbit.py +++ b/emf/common/integrations/rabbit.py @@ -15,6 +15,147 @@ parse_app_properties(globals(), config.paths.integrations.rabbit) +class BlockingClient: + + def __init__(self, + host: str = RMQ_SERVER, + port: int = int(RMQ_PORT), + username: str = RMQ_USERNAME, + password: str = RMQ_PASSWORD, + message_converter: object | None = None, + message_handler: object | None = None, + ): + self.connection_params = { + 'host': host, + 'port': port, + 'credentials': pika.PlainCredentials(username, password) + } + self.message_converter = message_converter + self.message_handler = message_handler + self._connect() + self.consuming = False + + def _connect(self): + # Connect to RabbitMQ server + self.connection = pika.BlockingConnection( + pika.ConnectionParameters(**self.connection_params) + ) + self.publish_channel = self.connection.channel() + self.consume_channel = self.connection.channel() + + def publish(self, payload: str, exchange_name: str, headers: dict | None = None, routing_key: str = ''): + # Publish message + self.publish_channel.basic_publish( + exchange=exchange_name, + routing_key=routing_key, + body=payload, + properties=pika.BasicProperties( + headers=headers + ) + ) + + def get_single_message(self, queue: str, auto_ack: bool = True): + """ + Attempt to fetch a single message from the specified queue. + + :param queue: The name of the queue to fetch the message from. + :param auto_ack: Whether to automatically acknowledge the message. Defaults to True. + :return: The method frame, properties, and body of the message if available; otherwise, None. + """ + + # Stop previous consume + if self.consuming: + self.consume_stop() + + method_frame, properties, body = self.consume_channel.basic_get(queue, auto_ack=auto_ack) + + if method_frame: + logger.info(f"Received message from {queue}: {properties}") + + # Convert message + if self.message_converter: + try: + body, content_type = self.message_converter.convert(body) + properties.content_type = content_type + logger.info(f"Message converted") + except Exception as error: + logger.error(f"Message conversion failed: {error}") + return method_frame, properties, body + else: + logger.info(f"No message available in queue {queue}") + return None, None, None + + def consume_start(self, queue: str, callback: object | None = None, auto_ack: bool = True): + + # Stop previous consume + if self.consuming: + self.consume_stop() + + # Set up consumer + if not callback: + callback = lambda ch, method, properties, body: logger.info(f"Received message: {properties} (No callback processing)") + + self.consume_channel.basic_consume( + queue=queue, + on_message_callback=callback, + auto_ack=auto_ack + ) + + logger.info(f"Waiting for messages in {queue}. To exit press CTRL+C") + + try: + self.consume_channel.start_consuming() + self.consuming = True + except KeyboardInterrupt: + self.consume_stop() + + def shovel(self, + from_queue: str, + to_exchange: str, + callback: object | None = None, + headers: dict | None = None, + routing_key: str = ''): + + def internal_callback(ch, method, properties, body): + + if callback: + ch, method, properties, body = callback(ch, method, properties, body) + + new_headers = properties.headers if properties.headers else {} + # Add or update the 'shovelled' flag + new_headers['shovelled'] = True + + # If additional headers were provided, merge them with the existing ones + if headers: + new_headers.update(headers) + + self.publish(body, to_exchange, headers=new_headers, routing_key=routing_key) + + # Manually acknowledge the message to ensure it's only removed from the queue after successful processing + ch.basic_ack(delivery_tag=method.delivery_tag) + + # Start consuming with the internal callback. Set auto_ack=False for manual ack in the callback. + self.consume_start(from_queue, callback=internal_callback, auto_ack=False) + + def consume_stop(self): + self.consume_channel.stop_consuming() + self.consuming = False + + def close(self): + + # Stop consuming + if self.consuming: + self.consume_stop() + + # Close the connection + if self.connection.is_open: + self.connection.close() + + def __del__(self): + # Destructor to ensure the connection is closed properly + self.close() + + class RMQConsumer: """This is an example consumer that will handle unexpected interactions with RabbitMQ such as channel and connection closures. diff --git a/emf/task_generator/manual_worker.py b/emf/task_generator/manual_worker.py index 338496e..46f2035 100644 --- a/emf/task_generator/manual_worker.py +++ b/emf/task_generator/manual_worker.py @@ -52,16 +52,16 @@ process_config_json[0]['runs'][0]['properties']['included'] = [tso.strip() for tso in INCLUDED_TSO.split(',')] if INCLUDED_TSO else [] process_config_json[0]['runs'][0]['properties']['excluded'] = [tso.strip() for tso in EXCLUDED_TSO.split(',')] if EXCLUDED_TSO else [] process_config_json[0]['runs'][0]['properties']['local_import'] = [tso.strip() for tso in LOCAL_IMPORT.split(',')] if LOCAL_IMPORT else [] -process_config_json[0]['runs'][0]['properties']['version'] = TASK_VERSION -process_config_json[0]['runs'][0]['properties']['replacement'] = RUN_REPLACEMENT -process_config_json[0]['runs'][0]['properties']['replacement_local'] = RUN_REPLACEMENT_LOCAL -process_config_json[0]['runs'][0]['properties']['scaling'] = RUN_SCALING -process_config_json[0]['runs'][0]['properties']['upload_to_opdm'] = UPLOAD_TO_OPDM -process_config_json[0]['runs'][0]['properties']['upload_to_minio'] = UPLOAD_TO_MINIO -process_config_json[0]['runs'][0]['properties']['send_merge_report'] = SEND_MERGE_REPORT -process_config_json[0]['runs'][0]['properties']['pre_temp_fixes'] = PRE_TEMP_FIXES -process_config_json[0]['runs'][0]['properties']['post_temp_fixes'] = POST_TEMP_FIXES -process_config_json[0]['runs'][0]['properties']['force_outage_fix'] = FORCE_OUTAGE_FIX +process_config_json[0]['runs'][0]['properties']['version'] = os.environ(TASK_VERSION, process_config_json[0]['runs'][0]['properties']['version'] ) +process_config_json[0]['runs'][0]['properties']['replacement'] = os.environ(RUN_REPLACEMENT,process_config_json[0]['runs'][0]['properties']['replacement']) +process_config_json[0]['runs'][0]['properties']['replacement_local'] = os.environ(RUN_REPLACEMENT_LOCAL,process_config_json[0]['runs'][0]['properties']['replacement_local']) +process_config_json[0]['runs'][0]['properties']['scaling'] = os.environ(RUN_SCALING,process_config_json[0]['runs'][0]['properties']['scaling']) +process_config_json[0]['runs'][0]['properties']['upload_to_opdm'] = os.environ(UPLOAD_TO_OPDM,process_config_json[0]['runs'][0]['properties']['upload_to_opdm']) +process_config_json[0]['runs'][0]['properties']['upload_to_minio'] = os.environ(UPLOAD_TO_MINIO,process_config_json[0]['runs'][0]['properties']['upload_to_minio']) +process_config_json[0]['runs'][0]['properties']['send_merge_report'] = os.environ(SEND_MERGE_REPORT,process_config_json[0]['runs'][0]['properties']['send_merge_report']) +process_config_json[0]['runs'][0]['properties']['pre_temp_fixes'] = os.environ(PRE_TEMP_FIXES,process_config_json[0]['runs'][0]['properties']['pre_temp_fixes']) +process_config_json[0]['runs'][0]['properties']['post_temp_fixes'] = os.environ(POST_TEMP_FIXES,process_config_json[0]['runs'][0]['properties']['post_temp_fixes']) +process_config_json[0]['runs'][0]['properties']['force_outage_fix'] = os.environ(FORCE_OUTAGE_FIX,process_config_json[0]['runs'][0]['properties']['force_outage_fix']) if PROCESS_TIME_SHIFT: diff --git a/emf/task_generator/worker.py b/emf/task_generator/worker.py index cb90951..c101348 100644 --- a/emf/task_generator/worker.py +++ b/emf/task_generator/worker.py @@ -16,36 +16,6 @@ process_config_json = json.load(process_conf) -for runs in process_config_json[0]['runs']: - runs['properties']['included'] = [tso.strip() for tso in CGM_INCLUDED_TSO.split(',')] if CGM_INCLUDED_TSO else [] - runs['properties']['excluded'] = [tso.strip() for tso in CGM_EXCLUDED_TSO.split(',')] if CGM_EXCLUDED_TSO else [] - runs['properties']['local_import'] = [tso.strip() for tso in CGM_LOCAL_IMPORT.split(',')] if CGM_LOCAL_IMPORT else [] - runs['properties']['replacement'] = RUN_REPLACEMENT_CGM - runs['properties']['replacement_local'] = RUN_REPLACEMENT_LOCAL - runs['properties']['scaling'] = RUN_SCALING_CGM - runs['properties']['upload_to_opdm'] = UPLOAD_TO_OPDM_CGM - runs['properties']['upload_to_minio'] = UPLOAD_TO_MINIO_CGM - runs['properties']['send_merge_report'] = SEND_MERGE_REPORT_CGM - runs['properties']['pre_temp_fixes'] = PRE_TEMP_FIXES - runs['properties']['post_temp_fixes'] = POST_TEMP_FIXES - runs['properties']['force_outage_fix'] = FORCE_OUTAGE_FIX - -for runs in process_config_json[1]['runs']: - runs['properties']['included'] = [tso.strip() for tso in RMM_INCLUDED_TSO.split(',')] if RMM_INCLUDED_TSO else [] - runs['properties']['excluded'] = [tso.strip() for tso in RMM_EXCLUDED_TSO.split(',')] if RMM_EXCLUDED_TSO else [] - runs['properties']['local_import'] = [tso.strip() for tso in RMM_LOCAL_IMPORT.split(',')] if RMM_LOCAL_IMPORT else [] - runs['properties']['replacement'] = RUN_REPLACEMENT_RMM - runs['properties']['replacement_local'] = RUN_REPLACEMENT_LOCAL - runs['properties']['scaling'] = RUN_SCALING_RMM - runs['properties']['upload_to_opdm'] = UPLOAD_TO_OPDM_RMM - runs['properties']['upload_to_minio'] = UPLOAD_TO_MINIO_RMM - runs['properties']['send_merge_report'] = SEND_MERGE_REPORT_RMM - runs['properties']['pre_temp_fixes'] = PRE_TEMP_FIXES - runs['properties']['post_temp_fixes'] = POST_TEMP_FIXES - runs['properties']['force_outage_fix'] = FORCE_OUTAGE_FIX - -with open(process_conf, 'w') as file: - json.dump(process_config_json, file, indent=1) tasks = list(generate_tasks(TASK_WINDOW_DURATION, TASK_WINDOW_REFERENCE, process_conf, timeframe_conf, TIMETRAVEL))