Commit
rabbit consumer + config from file
VeikoAunapuu authored Feb 21, 2025
1 parent 866f4d0 commit d75c0f7
Showing 3 changed files with 151 additions and 40 deletions.
141 changes: 141 additions & 0 deletions emf/common/integrations/rabbit.py
@@ -15,6 +15,147 @@
parse_app_properties(globals(), config.paths.integrations.rabbit)


class BlockingClient:
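    """
    Blocking RabbitMQ client built on pika, with separate channels for
    publishing and consuming and optional message converter/handler hooks.
    """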

def __init__(self,
host: str = RMQ_SERVER,
port: int = int(RMQ_PORT),
username: str = RMQ_USERNAME,
password: str = RMQ_PASSWORD,
message_converter: object | None = None,
message_handler: object | None = None,
):
self.connection_params = {
'host': host,
'port': port,
'credentials': pika.PlainCredentials(username, password)
}
self.message_converter = message_converter
self.message_handler = message_handler
self._connect()
self.consuming = False

def _connect(self):
# Connect to RabbitMQ server
self.connection = pika.BlockingConnection(
pika.ConnectionParameters(**self.connection_params)
)
self.publish_channel = self.connection.channel()
self.consume_channel = self.connection.channel()

def publish(self, payload: str, exchange_name: str, headers: dict | None = None, routing_key: str = ''):
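        """
        Publish a payload to the given exchange; optional AMQP headers are
        attached via BasicProperties, and routing_key defaults to ''.
        """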
# Publish message
self.publish_channel.basic_publish(
exchange=exchange_name,
routing_key=routing_key,
body=payload,
properties=pika.BasicProperties(
headers=headers
)
)

def get_single_message(self, queue: str, auto_ack: bool = True):
"""
Attempt to fetch a single message from the specified queue.
:param queue: The name of the queue to fetch the message from.
:param auto_ack: Whether to automatically acknowledge the message. Defaults to True.
:return: The method frame, properties, and body of the message if available; otherwise, None.
"""

# Stop previous consume
if self.consuming:
self.consume_stop()

method_frame, properties, body = self.consume_channel.basic_get(queue, auto_ack=auto_ack)

if method_frame:
logger.info(f"Received message from {queue}: {properties}")

# Convert message
if self.message_converter:
try:
body, content_type = self.message_converter.convert(body)
properties.content_type = content_type
logger.info(f"Message converted")
except Exception as error:
logger.error(f"Message conversion failed: {error}")
return method_frame, properties, body
else:
logger.info(f"No message available in queue {queue}")
return None, None, None

def consume_start(self, queue: str, callback: object | None = None, auto_ack: bool = True):
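        """
        Start a blocking consume loop on the given queue. If no callback is
        given, received messages are only logged. Any previous consume on this
        client is stopped first.
        """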

# Stop previous consume
if self.consuming:
self.consume_stop()

# Set up consumer
if not callback:
callback = lambda ch, method, properties, body: logger.info(f"Received message: {properties} (No callback processing)")

self.consume_channel.basic_consume(
queue=queue,
on_message_callback=callback,
auto_ack=auto_ack
)

logger.info(f"Waiting for messages in {queue}. To exit press CTRL+C")

        try:
            # Mark the client as consuming before entering the blocking loop;
            # start_consuming() does not return until consumption stops.
            self.consuming = True
            self.consume_channel.start_consuming()
        except KeyboardInterrupt:
            self.consume_stop()

def shovel(self,
from_queue: str,
to_exchange: str,
callback: object | None = None,
headers: dict | None = None,
routing_key: str = ''):
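        """
        Consume messages from one queue and re-publish them to an exchange,
        tagging each message with a 'shovelled' header. An optional callback
        can transform (ch, method, properties, body) before re-publishing;
        messages are acknowledged manually after a successful publish.
        """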

def internal_callback(ch, method, properties, body):

if callback:
ch, method, properties, body = callback(ch, method, properties, body)

new_headers = properties.headers if properties.headers else {}
# Add or update the 'shovelled' flag
new_headers['shovelled'] = True

# If additional headers were provided, merge them with the existing ones
if headers:
new_headers.update(headers)

self.publish(body, to_exchange, headers=new_headers, routing_key=routing_key)

# Manually acknowledge the message to ensure it's only removed from the queue after successful processing
ch.basic_ack(delivery_tag=method.delivery_tag)

# Start consuming with the internal callback. Set auto_ack=False for manual ack in the callback.
self.consume_start(from_queue, callback=internal_callback, auto_ack=False)

def consume_stop(self):
self.consume_channel.stop_consuming()
self.consuming = False

def close(self):

# Stop consuming
if self.consuming:
self.consume_stop()

# Close the connection
if self.connection.is_open:
self.connection.close()

def __del__(self):
# Destructor to ensure the connection is closed properly
self.close()
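
A minimal usage sketch for BlockingClient (the host, credentials, queue, and exchange names below are illustrative assumptions, not values from this commit):

from emf.common.integrations.rabbit import BlockingClient

client = BlockingClient(host="localhost", port=5672,
                        username="guest", password="guest")

# Publish a payload with custom headers to a (hypothetical) exchange
client.publish(payload='{"status": "ok"}',
               exchange_name="demo-exchange",
               headers={"source": "example"})

# Fetch a single message without starting a blocking consume loop
method_frame, properties, body = client.get_single_message(queue="demo-queue")
if method_frame:
    print(body)

client.close()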


class RMQConsumer:
"""This is an example consumer that will handle unexpected interactions
with RabbitMQ such as channel and connection closures.
20 changes: 10 additions & 10 deletions emf/task_generator/manual_worker.py
@@ -52,16 +52,16 @@
process_config_json[0]['runs'][0]['properties']['included'] = [tso.strip() for tso in INCLUDED_TSO.split(',')] if INCLUDED_TSO else []
process_config_json[0]['runs'][0]['properties']['excluded'] = [tso.strip() for tso in EXCLUDED_TSO.split(',')] if EXCLUDED_TSO else []
process_config_json[0]['runs'][0]['properties']['local_import'] = [tso.strip() for tso in LOCAL_IMPORT.split(',')] if LOCAL_IMPORT else []
process_config_json[0]['runs'][0]['properties']['version'] = TASK_VERSION
process_config_json[0]['runs'][0]['properties']['replacement'] = RUN_REPLACEMENT
process_config_json[0]['runs'][0]['properties']['replacement_local'] = RUN_REPLACEMENT_LOCAL
process_config_json[0]['runs'][0]['properties']['scaling'] = RUN_SCALING
process_config_json[0]['runs'][0]['properties']['upload_to_opdm'] = UPLOAD_TO_OPDM
process_config_json[0]['runs'][0]['properties']['upload_to_minio'] = UPLOAD_TO_MINIO
process_config_json[0]['runs'][0]['properties']['send_merge_report'] = SEND_MERGE_REPORT
process_config_json[0]['runs'][0]['properties']['pre_temp_fixes'] = PRE_TEMP_FIXES
process_config_json[0]['runs'][0]['properties']['post_temp_fixes'] = POST_TEMP_FIXES
process_config_json[0]['runs'][0]['properties']['force_outage_fix'] = FORCE_OUTAGE_FIX
process_config_json[0]['runs'][0]['properties']['version'] = os.environ.get(TASK_VERSION, process_config_json[0]['runs'][0]['properties']['version'])
process_config_json[0]['runs'][0]['properties']['replacement'] = os.environ.get(RUN_REPLACEMENT, process_config_json[0]['runs'][0]['properties']['replacement'])
process_config_json[0]['runs'][0]['properties']['replacement_local'] = os.environ.get(RUN_REPLACEMENT_LOCAL, process_config_json[0]['runs'][0]['properties']['replacement_local'])
process_config_json[0]['runs'][0]['properties']['scaling'] = os.environ.get(RUN_SCALING, process_config_json[0]['runs'][0]['properties']['scaling'])
process_config_json[0]['runs'][0]['properties']['upload_to_opdm'] = os.environ.get(UPLOAD_TO_OPDM, process_config_json[0]['runs'][0]['properties']['upload_to_opdm'])
process_config_json[0]['runs'][0]['properties']['upload_to_minio'] = os.environ.get(UPLOAD_TO_MINIO, process_config_json[0]['runs'][0]['properties']['upload_to_minio'])
process_config_json[0]['runs'][0]['properties']['send_merge_report'] = os.environ.get(SEND_MERGE_REPORT, process_config_json[0]['runs'][0]['properties']['send_merge_report'])
process_config_json[0]['runs'][0]['properties']['pre_temp_fixes'] = os.environ.get(PRE_TEMP_FIXES, process_config_json[0]['runs'][0]['properties']['pre_temp_fixes'])
process_config_json[0]['runs'][0]['properties']['post_temp_fixes'] = os.environ.get(POST_TEMP_FIXES, process_config_json[0]['runs'][0]['properties']['post_temp_fixes'])
process_config_json[0]['runs'][0]['properties']['force_outage_fix'] = os.environ.get(FORCE_OUTAGE_FIX, process_config_json[0]['runs'][0]['properties']['force_outage_fix'])
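
The block above keeps each config-file value unless a matching environment variable is set. A standalone sketch of that pattern (the env_override helper and the 'TASK_VERSION' / 'RUN_SCALING' key names are hypothetical, for illustration only):

import os

def env_override(properties: dict, key: str, env_name: str) -> None:
    # Keep the value loaded from the config file unless the
    # environment variable env_name is set.
    properties[key] = os.environ.get(env_name, properties[key])

props = process_config_json[0]['runs'][0]['properties']
env_override(props, 'version', 'TASK_VERSION')
env_override(props, 'scaling', 'RUN_SCALING')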


if PROCESS_TIME_SHIFT:
30 changes: 0 additions & 30 deletions emf/task_generator/worker.py
@@ -16,36 +16,6 @@

process_config_json = json.load(process_conf)

for runs in process_config_json[0]['runs']:
runs['properties']['included'] = [tso.strip() for tso in CGM_INCLUDED_TSO.split(',')] if CGM_INCLUDED_TSO else []
runs['properties']['excluded'] = [tso.strip() for tso in CGM_EXCLUDED_TSO.split(',')] if CGM_EXCLUDED_TSO else []
runs['properties']['local_import'] = [tso.strip() for tso in CGM_LOCAL_IMPORT.split(',')] if CGM_LOCAL_IMPORT else []
runs['properties']['replacement'] = RUN_REPLACEMENT_CGM
runs['properties']['replacement_local'] = RUN_REPLACEMENT_LOCAL
runs['properties']['scaling'] = RUN_SCALING_CGM
runs['properties']['upload_to_opdm'] = UPLOAD_TO_OPDM_CGM
runs['properties']['upload_to_minio'] = UPLOAD_TO_MINIO_CGM
runs['properties']['send_merge_report'] = SEND_MERGE_REPORT_CGM
runs['properties']['pre_temp_fixes'] = PRE_TEMP_FIXES
runs['properties']['post_temp_fixes'] = POST_TEMP_FIXES
runs['properties']['force_outage_fix'] = FORCE_OUTAGE_FIX

for runs in process_config_json[1]['runs']:
runs['properties']['included'] = [tso.strip() for tso in RMM_INCLUDED_TSO.split(',')] if RMM_INCLUDED_TSO else []
runs['properties']['excluded'] = [tso.strip() for tso in RMM_EXCLUDED_TSO.split(',')] if RMM_EXCLUDED_TSO else []
runs['properties']['local_import'] = [tso.strip() for tso in RMM_LOCAL_IMPORT.split(',')] if RMM_LOCAL_IMPORT else []
runs['properties']['replacement'] = RUN_REPLACEMENT_RMM
runs['properties']['replacement_local'] = RUN_REPLACEMENT_LOCAL
runs['properties']['scaling'] = RUN_SCALING_RMM
runs['properties']['upload_to_opdm'] = UPLOAD_TO_OPDM_RMM
runs['properties']['upload_to_minio'] = UPLOAD_TO_MINIO_RMM
runs['properties']['send_merge_report'] = SEND_MERGE_REPORT_RMM
runs['properties']['pre_temp_fixes'] = PRE_TEMP_FIXES
runs['properties']['post_temp_fixes'] = POST_TEMP_FIXES
runs['properties']['force_outage_fix'] = FORCE_OUTAGE_FIX

with open(process_conf, 'w') as file:
json.dump(process_config_json, file, indent=1)

tasks = list(generate_tasks(TASK_WINDOW_DURATION, TASK_WINDOW_REFERENCE, process_conf, timeframe_conf, TIMETRAVEL))

