Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

function maybe_awaitable: coroutine object IncomingMessage. worker cannot recieve the massage from broker #373

Open
Tsovak opened this issue Nov 4, 2024 · 2 comments

Comments

@Tsovak
Copy link

Tsovak commented Nov 4, 2024

the worker reises an error. I guess the root cause came when the connection with RabbitMQ terminated or was lost.
No exception was found before or after this error.

Unfortunately, I don't know how to reproduce it.

My configuration:

result_expire_time_seconds = int(one_day_seconds)

retry = SimpleRetryMiddleware(default_retry_count=3)

result_backend = RedisAsyncResultBackend(
    redis_url=str(settings.redis.redis_url.with_path("/1")),
    result_ex_time=result_expire_time_seconds,
    serializer=JSONSerializer(),
)

broker = (
    AioPikaBroker(
        url=str(settings.rabbit.rabbit_url),
    )
    .with_result_backend(result_backend)
    .with_middlewares(retry)
)


Exception:

2024-11-04 12:25:26.767 | ERROR    | trace_id=0 | span_id=0 | logging:callHandlers:1706 - Task exception was never retrieved
future: <Task finished name='Task-427' coro=<Receiver.callback() done, defined at /usr/local/lib/python3.11/site-packages/taskiq/receiver/receiver.py:87> exception=ChannelInvalidStateError()>
Traceback (most recent call last):

> File "/usr/local/lib/python3.11/site-packages/taskiq/receiver/receiver.py", line 184, in callback
    await maybe_awaitable(message.ack())
          │               │       └ <bound method IncomingMessage.ack of IncomingMessage:{'app_id': None,
          │               │          'body_size': 274,
          │               │          'cluster_id': '',
          │               │          'consumer_tag': ...
          │               └ AckableMessage(data=b'{"task_id": "15fecea81c6048bb80764c2ebddf14e0", "task_name": "task_drive_fetch_files", "labels": {"asan...
          └ <function maybe_awaitable at 0x7f738b1df240>
  File "/usr/local/lib/python3.11/site-packages/taskiq/utils.py", line 23, in maybe_awaitable
    return await possible_coroutine
                 └ <coroutine object IncomingMessage.ack at 0x7f7313fcd460>
  File "/usr/local/lib/python3.11/site-packages/aio_pika/message.py", line 453, in ack
    await self.channel.basic_ack(
          │    └ <property object at 0x7f7388f0e160>
          └ IncomingMessage:{'app_id': None,
             'body_size': 274,
             'cluster_id': '',
             'consumer_tag': 'ctag1.0580faeeeaf941c7b8e9a7148f4bff...
  File "/usr/local/lib/python3.11/site-packages/aio_pika/message.py", line 391, in channel
    raise ChannelInvalidStateError
          └ <class 'aiormq.exceptions.ChannelInvalidStateError'>

aiormq.exceptions.ChannelInvalidStateError
@s3rius
Copy link
Member

s3rius commented Nov 5, 2024

Sadly, I'm not sure how to reproduce such case either.

But what happens next after this exception is raised? Workers hang or restart? In second case I guess it's fine to leave this implementation as is.

@Tsovak
Copy link
Author

Tsovak commented Nov 5, 2024

But what happens next after this exception is raised? Workers hang or restart? In second case I guess it's fine to leave this implementation as is.

I enabled debug logging and found out that

  • the connection was actually lost 4 seconds ago,
  • but could reestablish itself.
  • reises the error
  • continues work normally

but after I found a huge exception that maybe related to the issue. but I think it comes from the connection stability
the full log here https://pastebin.com/H1u7hFKm

2024-11-05 03:15:53.559 | WARNING  | trace_id=0 | span_id=0 | logging:callHandlers:1706 - Cannot parse message: b''. Skipping execution.
 Expecting value: line 1 column 1 (char 0)
Traceback (most recent call last):

  File "/usr/local/bin/taskiq", line 8, in <module>
    sys.exit(main())
    │   │    └ <function main at 0x7fd7adc498a0>
    │   └ <built-in function exit>
    └ <module 'sys' (built-in)>
  File "/usr/local/lib/python3.11/site-packages/taskiq/__main__.py", line 73, in main
    status = command.exec(sys.argv[1:])
             │       │    │   └ ['worker', 'backend.tkq:broker']
             │       │    └ <module 'sys' (built-in)>
             │       └ <function WorkerCMD.exec at 0x7fd7adb071a0>
             └ <taskiq.cli.worker.cmd.WorkerCMD object at 0x7fd7ade253d0>
  File "/usr/local/lib/python3.11/site-packages/taskiq/cli/worker/cmd.py", line 27, in exec
  
  ........
  
  
  
             └ <function model_validate at 0x7fd7add77d80>
  File "/usr/local/lib/python3.11/site-packages/taskiq/serializers/json_serializer.py", line 32, in loadb
    return loads(value.decode())
           │     │     └ <method 'decode' of 'bytes' objects>
           │     └ b''
           └ <function loads at 0x7fd7af056b60>
  File "/usr/local/lib/python3.11/json/__init__.py", line 346, in loads
    return _default_decoder.decode(s)
           │                │      └ ''
           │                └ <function JSONDecoder.decode at 0x7fd7af056480>
           └ <json.decoder.JSONDecoder object at 0x7fd7af0a2050>
  File "/usr/local/lib/python3.11/json/decoder.py", line 337, in decode
    obj, end = self.raw_decode(s, idx=_w(s, 0).end())
               │    │          │      │  └ ''
               │    │          │      └ <built-in method match of re.Pattern object at 0x7fd7aefef1d0>
               │    │          └ ''
               │    └ <function JSONDecoder.raw_decode at 0x7fd7af056520>
               └ <json.decoder.JSONDecoder object at 0x7fd7af0a2050>
  File "/usr/local/lib/python3.11/json/decoder.py", line 355, in raw_decode
    raise JSONDecodeError("Expecting value", s, err.value) from None
          │                                  └ ''
          └ <class 'json.decoder.JSONDecodeError'>

json.decoder.JSONDecodeError: Expecting value: line 1 column 1 (char 0)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants