Skip to content

Commit

Permalink
add: priority in rabbit wrapper
Browse files Browse the repository at this point in the history
  • Loading branch information
1martin1 committed Jan 22, 2025
1 parent a989347 commit 21f2c43
Show file tree
Hide file tree
Showing 3 changed files with 67 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,26 +39,37 @@ def get_channel(self):
channel.close()
connection.close()

def publish_message(self, queue_name: str, message: dict):
def publish_message(self, queue_name: str, message: dict, priority: int = None):
"""
Publish a message to a specified queue.
Publish a message to a specified queue with an optional priority.
:param queue_name: Name of the queue to publish to
:param message: Message to publish (dictionary will be serialized to JSON)
:param priority: Optional priority of the message (0-255)
"""
try:
with self.get_channel() as channel:
channel.queue_declare(queue=queue_name, durable=True)
# Declare the queue with priority if specified
arguments = {}
if priority is not None:
arguments['x-max-priority'] = 10 # Set the maximum priority level

channel.queue_declare(queue=queue_name, durable=True, arguments=arguments)

# Publish the message with the specified priority
properties = pika.BasicProperties(
delivery_mode=2, # Make message persistent
priority=priority if priority is not None else 0 # Default to 0 if no priority
)

channel.basic_publish(
exchange='',
routing_key=queue_name,
body=json.dumps(message),
properties=pika.BasicProperties(
delivery_mode=2 # Make message persistent
)
properties=properties
)
logger.info(f"Message published to queue '{queue_name}'")
logger.info(
f"Message published to queue '{queue_name}' with priority {priority if priority is not None else 'None'}")
except Exception as ex:
logger.error(f"Failed to publish message to queue '{queue_name}'. Error: {ex}")
raise Exception(f"Failed to publish message to queue '{queue_name}'. Error: {ex}") from ex
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,24 @@ def test_publish_message(rabbit_wrapper):
assert method_frame is not None, "Message not found in the queue"
assert json.loads(body) == message, "Message in the queue does not match the sent message"

@pytest.mark.local
def test_publish_message_with_priority(rabbit_wrapper):
"""
Tests successful message publishing to a queue with priority.
"""
queue_name = "test_priority_queue"
message = {"key": "value"}
priority = 5

rabbit_wrapper.publish_message(queue_name, message, priority=priority)

with rabbit_wrapper.get_channel() as channel:
method_frame, header_frame, body = channel.basic_get(queue_name, auto_ack=True)
assert method_frame is not None, "Message not found in the queue"
assert json.loads(body) == message, "Message in the queue does not match the sent message"
assert header_frame.priority == priority, f"Expected priority {priority}, but got {header_frame.priority}"


@pytest.mark.local
def test_consume_message(rabbit_wrapper):
"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,41 @@ def test_publish_message(rabbit_wrapper, mock_pika):

rabbit_wrapper.publish_message(queue_name, message)

mock_pika.queue_declare.assert_called_once_with(queue=queue_name, durable=True)
mock_pika.queue_declare.assert_called_once_with(queue=queue_name, durable=True, arguments={})
mock_pika.basic_publish.assert_called_once_with(
exchange="",
routing_key=queue_name,
body=json.dumps(message),
properties=pika.BasicProperties(delivery_mode=2, priority=0),
)

@pytest.mark.ci
def test_publish_message_with_priority(rabbit_wrapper, mock_pika):
"""
Tests successful message publishing to a queue with priority.
"""
queue_name = "test_queue"
message = {"key": "value"}
priority = 5

rabbit_wrapper.publish_message(queue_name, message, priority=priority)

# Проверяем, что очередь была объявлена с аргументом 'x-max-priority'
mock_pika.queue_declare.assert_called_once_with(
queue=queue_name,
durable=True,
arguments={"x-max-priority": 10} # Убедитесь, что максимальный приоритет соответствует вашему коду
)

# Проверяем, что сообщение было опубликовано с заданным приоритетом
mock_pika.basic_publish.assert_called_once_with(
exchange="",
routing_key=queue_name,
body=json.dumps(message),
properties=pika.BasicProperties(delivery_mode=2),
properties=pika.BasicProperties(
delivery_mode=2, # Сделать сообщение постоянным
priority=priority
),
)

@pytest.mark.ci
Expand Down

0 comments on commit 21f2c43

Please sign in to comment.