diff --git a/python/python/glide/async_commands/cluster_commands.py b/python/python/glide/async_commands/cluster_commands.py index 467a93cc83..5892066452 100644 --- a/python/python/glide/async_commands/cluster_commands.py +++ b/python/python/glide/async_commands/cluster_commands.py @@ -593,22 +593,23 @@ async def publish(self, message: str, channel: str, sharded: bool = False) -> in """ Publish a message on pubsub channel. This command aggregates PUBLISH and SPUBLISH commands functionalities. - The mode is selected using the 'sharded' parameter + The mode is selected using the 'sharded' parameter. + For both sharded and non-sharded mode, request is routed using hashed channel as key. See https://valkey.io/commands/publish and https://valkey.io/commands/spublish for more details. Args: message (str): Message to publish channel (str): Channel to publish the message on. - sharded (bool): Use sharded pubsub mode. + sharded (bool): Use sharded pubsub mode. Available since Redis version 7.0. Returns: - int: Number of subscriptions in that shard that received the message. + int: Number of subscriptions in that node that received the message. Examples: >>> await client.publish("Hi all!", "global-channel", False) - 1 # Publishes "Hi all!" message on global-channel channel using non-sharded mode + 1 # Published 1 instance of "Hi all!" message on global-channel channel using non-sharded mode >>> await client.publish("Hi to sharded channel1!", "channel1, True) - 2 # Publishes "Hi to sharded channel1!" message on channel1 using sharded mode + 2 # Published 2 instances of "Hi to sharded channel1!" message on channel1 using sharded mode """ result = await self._execute_command( RequestType.SPublish if sharded else RequestType.Publish, [channel, message] diff --git a/python/python/glide/async_commands/standalone_commands.py b/python/python/glide/async_commands/standalone_commands.py index 00d969862a..4c0ee234aa 100644 --- a/python/python/glide/async_commands/standalone_commands.py +++ b/python/python/glide/async_commands/standalone_commands.py @@ -497,7 +497,7 @@ async def sort_store( result = await self._execute_command(RequestType.Sort, args) return cast(int, result) - async def publish(self, message: str, channel: str) -> TOK: + async def publish(self, message: str, channel: str) -> int: """ Publish a message on pubsub channel. See https://valkey.io/commands/publish for more details. @@ -507,14 +507,15 @@ async def publish(self, message: str, channel: str) -> TOK: channel (str): Channel to publish the message on. Returns: - TOK: a simple `OK` response. + int: Number of subscriptions in primary node that received the message. + Note that this value does not include subscriptions that configured on replicas. Examples: >>> await client.publish("Hi all!", "global-channel") - "OK" + 1 # This message was posted to 1 subscription which is configured on primary node """ - await self._execute_command(RequestType.Publish, [channel, message]) - return cast(TOK, OK) + result = await self._execute_command(RequestType.Publish, [channel, message]) + return cast(int, result) async def flushall(self, flush_mode: Optional[FlushMode] = None) -> TOK: """ diff --git a/python/python/glide/async_commands/transaction.py b/python/python/glide/async_commands/transaction.py index fdbd25dded..a570176325 100644 --- a/python/python/glide/async_commands/transaction.py +++ b/python/python/glide/async_commands/transaction.py @@ -4456,6 +4456,21 @@ def copy( return self.append_command(RequestType.Copy, args) + def publish(self: TTransaction, message: str, channel: str) -> TTransaction: + """ + Publish a message on pubsub channel. + See https://valkey.io/commands/publish for more details. + + Args: + message (str): Message to publish + channel (str): Channel to publish the message on. + + Returns: + TOK: a simple `OK` response. + + """ + return self.append_command(RequestType.Publish, [channel, message]) + class ClusterTransaction(BaseTransaction): """ @@ -4551,4 +4566,25 @@ def copy( return self.append_command(RequestType.Copy, args) + def publish( + self: TTransaction, message: str, channel: str, sharded: bool = False + ) -> TTransaction: + """ + Publish a message on pubsub channel. + This command aggregates PUBLISH and SPUBLISH commands functionalities. + The mode is selected using the 'sharded' parameter + See https://valkey.io/commands/publish and https://valkey.io/commands/spublish for more details. + + Args: + message (str): Message to publish + channel (str): Channel to publish the message on. + sharded (bool): Use sharded pubsub mode. Available since Redis version 7.0. + + Returns: + int: Number of subscriptions in that shard that received the message. + """ + return self.append_command( + RequestType.SPublish if sharded else RequestType.Publish, [channel, message] + ) + # TODO: add all CLUSTER commands diff --git a/python/python/glide/config.py b/python/python/glide/config.py index 8c6405e313..080d03c5c7 100644 --- a/python/python/glide/config.py +++ b/python/python/glide/config.py @@ -388,7 +388,7 @@ class PubSubChannelModes(IntEnum): Pattern = 1 """ Use channel name patterns """ Sharded = 2 - """ Use sharded pubsub """ + """ Use sharded pubsub. Available since Redis version 7.0. """ @dataclass class PubSubSubscriptions: diff --git a/python/python/glide/glide_client.py b/python/python/glide/glide_client.py index 9c111fef8d..7c1147ef8f 100644 --- a/python/python/glide/glide_client.py +++ b/python/python/glide/glide_client.py @@ -403,6 +403,8 @@ def _notification_to_pubsub_message_safe( or message_kind == "Subscribe" or message_kind == "SSubscribe" or message_kind == "Unsubscribe" + or message_kind == "PUnsubscribe" + or message_kind == "SUnsubscribe" ): pass else: diff --git a/python/python/tests/test_pubsub.py b/python/python/tests/test_pubsub.py index 8453a2d423..9a636a8494 100644 --- a/python/python/tests/test_pubsub.py +++ b/python/python/tests/test_pubsub.py @@ -169,7 +169,6 @@ async def test_pubsub_exact_happy_path( """ channel = get_random_string(10) message = get_random_string(5) - publish_response = 1 if cluster_mode else OK callback, context = None, None callback_messages: List[CoreCommands.PubSubMsg] = [] @@ -190,7 +189,9 @@ async def test_pubsub_exact_happy_path( ) try: - assert await publishing_client.publish(message, channel) == publish_response + result = await publishing_client.publish(message, channel) + if cluster_mode: + assert result == 1 # allow the message to propagate await asyncio.sleep(1) @@ -224,7 +225,6 @@ async def test_pubsub_exact_happy_path_coexistence( channel = get_random_string(10) message = get_random_string(5) message2 = get_random_string(7) - publish_response = 1 if cluster_mode else OK pub_sub = create_pubsub_subscription( cluster_mode, @@ -237,10 +237,11 @@ async def test_pubsub_exact_happy_path_coexistence( ) try: - assert await publishing_client.publish(message, channel) == publish_response - assert ( - await publishing_client.publish(message2, channel) == publish_response - ) + for msg in [message, message2]: + result = await publishing_client.publish(msg, channel) + if cluster_mode: + assert result == 1 + # allow the message to propagate await asyncio.sleep(1) @@ -288,7 +289,6 @@ async def test_pubsub_exact_happy_path_many_channels( """ NUM_CHANNELS = 256 shard_prefix = "{same-shard}" - publish_response = 1 if cluster_mode else OK # Create a map of channels to random messages with shard prefix channels_and_messages = { @@ -324,10 +324,9 @@ async def test_pubsub_exact_happy_path_many_channels( try: # Publish messages to each channel for channel, message in channels_and_messages.items(): - assert ( - await publishing_client.publish(message, channel) - == publish_response - ) + result = await publishing_client.publish(message, channel) + if cluster_mode: + assert result == 1 # Allow the messages to propagate await asyncio.sleep(1) @@ -371,7 +370,6 @@ async def test_pubsub_exact_happy_path_many_channels_co_existence( """ NUM_CHANNELS = 256 shard_prefix = "{same-shard}" - publish_response = 1 if cluster_mode else OK # Create a map of channels to random messages with shard prefix channels_and_messages = { @@ -400,10 +398,9 @@ async def test_pubsub_exact_happy_path_many_channels_co_existence( try: # Publish messages to each channel for channel, message in channels_and_messages.items(): - assert ( - await publishing_client.publish(message, channel) - == publish_response - ) + result = await publishing_client.publish(message, channel) + if cluster_mode: + assert result == 1 # Allow the messages to propagate await asyncio.sleep(1) @@ -684,7 +681,6 @@ async def test_pubsub_pattern( "{{{}}}:{}".format("channel", get_random_string(5)): get_random_string(5), "{{{}}}:{}".format("channel", get_random_string(5)): get_random_string(5), } - publish_response = 1 if cluster_mode else OK callback, context = None, None callback_messages: List[CoreCommands.PubSubMsg] = [] @@ -705,10 +701,9 @@ async def test_pubsub_pattern( try: for channel, message in channels.items(): - assert ( - await publishing_client.publish(message, channel) - == publish_response - ) + result = await publishing_client.publish(message, channel) + if cluster_mode: + assert result == 1 # allow the message to propagate await asyncio.sleep(1) @@ -749,7 +744,6 @@ async def test_pubsub_pattern_co_existence(self, request, cluster_mode: bool): "{{{}}}:{}".format("channel", get_random_string(5)): get_random_string(5), "{{{}}}:{}".format("channel", get_random_string(5)): get_random_string(5), } - publish_response = 1 if cluster_mode else OK pub_sub = create_pubsub_subscription( cluster_mode, @@ -763,10 +757,9 @@ async def test_pubsub_pattern_co_existence(self, request, cluster_mode: bool): try: for channel, message in channels.items(): - assert ( - await publishing_client.publish(message, channel) - == publish_response - ) + result = await publishing_client.publish(message, channel) + if cluster_mode: + assert result == 1 # allow the message to propagate await asyncio.sleep(1) @@ -817,7 +810,6 @@ async def test_pubsub_pattern_many_channels( "{{{}}}:{}".format("channel", get_random_string(5)): get_random_string(5) for _ in range(NUM_CHANNELS) } - publish_response = 1 if cluster_mode else OK callback, context = None, None callback_messages: List[CoreCommands.PubSubMsg] = [] @@ -838,10 +830,9 @@ async def test_pubsub_pattern_many_channels( try: for channel, message in channels.items(): - assert ( - await publishing_client.publish(message, channel) - == publish_response - ) + result = await publishing_client.publish(message, channel) + if cluster_mode: + assert result == 1 # allow the message to propagate await asyncio.sleep(1) @@ -904,8 +895,6 @@ async def test_pubsub_combined_exact_and_pattern_one_client( **pattern_channels_and_messages, } - publish_response = 1 if cluster_mode else OK - callback, context = None, None callback_messages: List[CoreCommands.PubSubMsg] = [] @@ -941,10 +930,9 @@ async def test_pubsub_combined_exact_and_pattern_one_client( try: # Publish messages to all channels for channel, message in all_channels_and_messages.items(): - assert ( - await publishing_client.publish(message, channel) - == publish_response - ) + result = await publishing_client.publish(message, channel) + if cluster_mode: + assert result == 1 # allow the message to propagate await asyncio.sleep(1) @@ -1018,8 +1006,6 @@ async def test_pubsub_combined_exact_and_pattern_multiple_clients( **pattern_channels_and_messages, } - publish_response = 1 if cluster_mode else OK - callback, context = None, None callback_messages: List[CoreCommands.PubSubMsg] = [] @@ -1071,10 +1057,9 @@ async def test_pubsub_combined_exact_and_pattern_multiple_clients( try: # Publish messages to all channels for channel, message in all_channels_and_messages.items(): - assert ( - await publishing_client.publish(message, channel) - == publish_response - ) + result = await publishing_client.publish(message, channel) + if cluster_mode: + assert result == 1 # allow the messages to propagate await asyncio.sleep(1) @@ -1638,7 +1623,6 @@ async def test_pubsub_two_publishing_clients_same_name( CHANNEL_NAME = "channel-name" MESSAGE_EXACT = get_random_string(10) MESSAGE_PATTERN = get_random_string(7) - publish_response = 2 if cluster_mode else OK callback, context_exact, context_pattern = None, None, None callback_messages_exact: List[CoreCommands.PubSubMsg] = [] callback_messages_pattern: List[CoreCommands.PubSubMsg] = [] @@ -1671,14 +1655,10 @@ async def test_pubsub_two_publishing_clients_same_name( try: # Publish messages to each channel - both clients publishing - assert ( - await client_pattern.publish(MESSAGE_EXACT, CHANNEL_NAME) - == publish_response - ) - assert ( - await client_exact.publish(MESSAGE_PATTERN, CHANNEL_NAME) - == publish_response - ) + for msg in [MESSAGE_EXACT, MESSAGE_PATTERN]: + result = await client_pattern.publish(msg, CHANNEL_NAME) + if cluster_mode: + assert result == 2 # allow the message to propagate await asyncio.sleep(1) diff --git a/python/python/tests/test_transaction.py b/python/python/tests/test_transaction.py index 635115fc22..39affff14c 100644 --- a/python/python/tests/test_transaction.py +++ b/python/python/tests/test_transaction.py @@ -760,6 +760,10 @@ async def test_cluster_transaction(self, redis_client: GlideClusterClient): keyslot = get_random_string(3) transaction = ClusterTransaction() transaction.info() + if await check_if_server_version_lt(redis_client, "7.0.0"): + transaction.publish("test_message", keyslot, False) + else: + transaction.publish("test_message", keyslot, True) expected = await transaction_test(transaction, keyslot, redis_client) result = await redis_client.exec(transaction) assert isinstance(result, list) @@ -768,7 +772,8 @@ async def test_cluster_transaction(self, redis_client: GlideClusterClient): assert isinstance(result[0], str) # Making sure the "info" command is indeed a return at position 0 assert "# Memory" in result[0] - assert result[1:] == expected + assert result[1] == 0 + assert result[2:] == expected @pytest.mark.parametrize("cluster_mode", [True, False]) @pytest.mark.parametrize("protocol", [ProtocolVersion.RESP2, ProtocolVersion.RESP3]) @@ -845,6 +850,7 @@ async def test_standalone_transaction(self, redis_client: GlideClient): ) transaction.select(0) transaction.get(key) + transaction.publish("test_message", "test_channel") expected = await transaction_test(transaction, keyslot, redis_client) result = await redis_client.exec(transaction) assert isinstance(result, list) @@ -853,8 +859,8 @@ async def test_standalone_transaction(self, redis_client: GlideClient): assert isinstance(result[0], str) assert "# Memory" in result[0] assert result[1:5] == [OK, False, OK, value.encode()] - assert result[5:12] == [2, 2, 2, [b"Bob", b"Alice"], 2, OK, None] - assert result[12:] == expected + assert result[5:13] == [2, 2, 2, [b"Bob", b"Alice"], 2, OK, None, 0] + assert result[13:] == expected def test_transaction_clear(self): transaction = Transaction()