diff --git a/protollm_tools/sdk/protollm_sdk/object_interface/rabbit_mq_wrapper.py b/protollm_tools/sdk/protollm_sdk/object_interface/rabbit_mq_wrapper.py index 07e588e..a2ef3ca 100644 --- a/protollm_tools/sdk/protollm_sdk/object_interface/rabbit_mq_wrapper.py +++ b/protollm_tools/sdk/protollm_sdk/object_interface/rabbit_mq_wrapper.py @@ -39,26 +39,37 @@ def get_channel(self): channel.close() connection.close() - def publish_message(self, queue_name: str, message: dict): + def publish_message(self, queue_name: str, message: dict, priority: int = None): """ - Publish a message to a specified queue. + Publish a message to a specified queue with an optional priority. :param queue_name: Name of the queue to publish to :param message: Message to publish (dictionary will be serialized to JSON) + :param priority: Optional priority of the message (0-255) """ try: with self.get_channel() as channel: - channel.queue_declare(queue=queue_name, durable=True) + # Declare the queue with priority if specified + arguments = {} + if priority is not None: + arguments['x-max-priority'] = 10 # Set the maximum priority level + + channel.queue_declare(queue=queue_name, durable=True, arguments=arguments) + + # Publish the message with the specified priority + properties = pika.BasicProperties( + delivery_mode=2, # Make message persistent + priority=priority if priority is not None else 0 # Default to 0 if no priority + ) channel.basic_publish( exchange='', routing_key=queue_name, body=json.dumps(message), - properties=pika.BasicProperties( - delivery_mode=2 # Make message persistent - ) + properties=properties ) - logger.info(f"Message published to queue '{queue_name}'") + logger.info( + f"Message published to queue '{queue_name}' with priority {priority if priority is not None else 'None'}") except Exception as ex: logger.error(f"Failed to publish message to queue '{queue_name}'. Error: {ex}") raise Exception(f"Failed to publish message to queue '{queue_name}'. Error: {ex}") from ex diff --git a/protollm_tools/sdk/tests/protollm_sdk/object_interface/integration/test_rabbit_mq_wrapper.py b/protollm_tools/sdk/tests/protollm_sdk/object_interface/integration/test_rabbit_mq_wrapper.py index 1f78f8e..cbb01ca 100644 --- a/protollm_tools/sdk/tests/protollm_sdk/object_interface/integration/test_rabbit_mq_wrapper.py +++ b/protollm_tools/sdk/tests/protollm_sdk/object_interface/integration/test_rabbit_mq_wrapper.py @@ -44,6 +44,24 @@ def test_publish_message(rabbit_wrapper): assert method_frame is not None, "Message not found in the queue" assert json.loads(body) == message, "Message in the queue does not match the sent message" +@pytest.mark.local +def test_publish_message_with_priority(rabbit_wrapper): + """ + Tests successful message publishing to a queue with priority. + """ + queue_name = "test_priority_queue" + message = {"key": "value"} + priority = 5 + + rabbit_wrapper.publish_message(queue_name, message, priority=priority) + + with rabbit_wrapper.get_channel() as channel: + method_frame, header_frame, body = channel.basic_get(queue_name, auto_ack=True) + assert method_frame is not None, "Message not found in the queue" + assert json.loads(body) == message, "Message in the queue does not match the sent message" + assert header_frame.priority == priority, f"Expected priority {priority}, but got {header_frame.priority}" + + @pytest.mark.local def test_consume_message(rabbit_wrapper): """ diff --git a/protollm_tools/sdk/tests/protollm_sdk/object_interface/unit/test_rabbit_mq_wrapper.py b/protollm_tools/sdk/tests/protollm_sdk/object_interface/unit/test_rabbit_mq_wrapper.py index a50d101..03ffbf2 100644 --- a/protollm_tools/sdk/tests/protollm_sdk/object_interface/unit/test_rabbit_mq_wrapper.py +++ b/protollm_tools/sdk/tests/protollm_sdk/object_interface/unit/test_rabbit_mq_wrapper.py @@ -27,12 +27,41 @@ def test_publish_message(rabbit_wrapper, mock_pika): rabbit_wrapper.publish_message(queue_name, message) - mock_pika.queue_declare.assert_called_once_with(queue=queue_name, durable=True) + mock_pika.queue_declare.assert_called_once_with(queue=queue_name, durable=True, arguments={}) + mock_pika.basic_publish.assert_called_once_with( + exchange="", + routing_key=queue_name, + body=json.dumps(message), + properties=pika.BasicProperties(delivery_mode=2, priority=0), + ) + +@pytest.mark.ci +def test_publish_message_with_priority(rabbit_wrapper, mock_pika): + """ + Tests successful message publishing to a queue with priority. + """ + queue_name = "test_queue" + message = {"key": "value"} + priority = 5 + + rabbit_wrapper.publish_message(queue_name, message, priority=priority) + + # Проверяем, что очередь была объявлена с аргументом 'x-max-priority' + mock_pika.queue_declare.assert_called_once_with( + queue=queue_name, + durable=True, + arguments={"x-max-priority": 10} # Убедитесь, что максимальный приоритет соответствует вашему коду + ) + + # Проверяем, что сообщение было опубликовано с заданным приоритетом mock_pika.basic_publish.assert_called_once_with( exchange="", routing_key=queue_name, body=json.dumps(message), - properties=pika.BasicProperties(delivery_mode=2), + properties=pika.BasicProperties( + delivery_mode=2, # Сделать сообщение постоянным + priority=priority + ), ) @pytest.mark.ci