Skip to content

Commit

Permalink
[pre-commit.ci] auto fixes from pre-commit.com hooks
Browse files Browse the repository at this point in the history
for more information, see https://pre-commit.ci
  • Loading branch information
pre-commit-ci[bot] committed Feb 28, 2024
1 parent ab46faa commit cf6769c
Show file tree
Hide file tree
Showing 7 changed files with 30 additions and 41 deletions.
1 change: 1 addition & 0 deletions examples/simple-consumer-redis.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
whether each consumer gets only the messages
which have not been obtained by others so far.
"""

import asyncio
import json
import logging
Expand Down
1 change: 1 addition & 0 deletions examples/simple-publisher-redis.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
simultaneously, so as to check whether messages from
multiple publishers are distributed among the consumers.
"""

import asyncio
import json
import logging
Expand Down
20 changes: 8 additions & 12 deletions examples/simple-server.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,18 +50,14 @@ async def handle_output(request):

async def handle_show_memory_stat(request):
global last_snapshot, scheduler
last_snapshot = last_snapshot.filter_traces(
(
tracemalloc.Filter(False, "<frozen importlib._bootstrap>"),
tracemalloc.Filter(False, tracemalloc.__file__),
)
)
new_snapshot = tracemalloc.take_snapshot().filter_traces(
(
tracemalloc.Filter(False, "<frozen importlib._bootstrap>"),
tracemalloc.Filter(False, tracemalloc.__file__),
)
)
last_snapshot = last_snapshot.filter_traces((
tracemalloc.Filter(False, "<frozen importlib._bootstrap>"),
tracemalloc.Filter(False, tracemalloc.__file__),
))
new_snapshot = tracemalloc.take_snapshot().filter_traces((
tracemalloc.Filter(False, "<frozen importlib._bootstrap>"),
tracemalloc.Filter(False, tracemalloc.__file__),
))
top_stats = new_snapshot.compare_to(last_snapshot, "lineno")
last_snapshot = new_snapshot
print("[ Top 10 differences ]")
Expand Down
1 change: 0 additions & 1 deletion src/callosum/lower/dispatch_redis.py
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,6 @@ async def __aexit__(self, exc_type, exc_obj, exc_tb):


class DispatchRedisTransport(BaseTransport):

"""
Implementation for unidirectional transport backend by Redis Streams.
"""
Expand Down
1 change: 0 additions & 1 deletion src/callosum/lower/rpc_redis.py
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,6 @@ async def __aexit__(self, exc_type, exc_obj, exc_tb):


class RPCRedisTransport(BaseTransport):

"""
Implementation for bidirectional transport backend by Redis Streams.
"""
Expand Down
41 changes: 17 additions & 24 deletions src/callosum/lower/zeromq.py
Original file line number Diff line number Diff line change
Expand Up @@ -227,14 +227,12 @@ async def recv_message(self) -> AsyncGenerator[Optional[RawHeaderBody], None]:
multipart_msg = await self.transport._sock.recv_multipart()
*pre, zmsg_type, raw_header, raw_body = multipart_msg
if zmsg_type == b"PING":
await self.transport._sock.send_multipart(
[
*pre,
b"PONG",
raw_header,
raw_body,
]
)
await self.transport._sock.send_multipart([
*pre,
b"PONG",
raw_header,
raw_body,
])
elif zmsg_type == b"UPPER":
if len(pre) > 0:
# server
Expand All @@ -250,23 +248,19 @@ async def send_message(self, raw_msg: RawHeaderBody) -> None:
peer_id = raw_msg.peer_id
if peer_id is not None:
# server
await self.transport._sock.send_multipart(
[
peer_id,
b"UPPER",
raw_msg.header,
raw_msg.body,
]
)
await self.transport._sock.send_multipart([
peer_id,
b"UPPER",
raw_msg.header,
raw_msg.body,
])
else:
# client
await self.transport._sock.send_multipart(
[
b"UPPER",
raw_msg.header,
raw_msg.body,
]
)
await self.transport._sock.send_multipart([
b"UPPER",
raw_msg.header,
raw_msg.body,
])


class ZeroMQMonitorMixin:
Expand Down Expand Up @@ -469,7 +463,6 @@ class ZeroMQSubConnector(ZeroMQBaseConnector):


class ZeroMQBaseTransport(BaseTransport):

"""
Implementation for the ZeorMQ-backed transport.
Expand Down
6 changes: 3 additions & 3 deletions src/callosum/rpc/channel.py
Original file line number Diff line number Diff line change
Expand Up @@ -167,9 +167,9 @@ async def _recv_loop(self) -> None:
client_request_id[1],
server_seq_id,
)
self._req_idmap[
(request.peer_id, client_request_id)
] = server_request_id
self._req_idmap[(request.peer_id, client_request_id)] = (
server_request_id
)
func_handler = self._lookup_func(request.method)
task = asyncio.create_task(
self._func_task(
Expand Down

0 comments on commit cf6769c

Please sign in to comment.