Sometimes lost message from amq.rabbitmq.reply-to #422

Open
ikrivosheev opened this issue Dec 18, 2024 · 1 comment

@ikrivosheev

Hello! Thank you for the library! I have a very strange issue: sometimes I don't receive a message from the queue. My flow:

  1. I have a server and a client.
  2. Server flow: queue_declare -> queue_bind -> basic_consume
  3. Client flow: queue_declare(queue_name='amq.rabbitmq.reply-to', durable=false, exclusive=true) -> basic_consume

How it works:

  1. The client sends a request to the server with basic_publish.
  2. The server receives the message and then sends a response.
  3. The client receives the response from the server.
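
To make the expected round trip concrete, here is a minimal sketch of the request/response flow above. It is a model only: it uses `std::sync::mpsc` channels in place of the broker and does not call lapin or RabbitMQ. The names `Client`, `spawn_server`, `RpcError` and the `drop_replies` flag are hypothetical and exist only for illustration. The point is that a reply that never arrives shows up on the client side as a timeout, and that a correlation id lets the client skip stale replies.

```rust
use std::sync::mpsc::{self, Receiver, Sender};
use std::thread;
use std::time::{Duration, Instant};

/// Request as published by the client. `reply_to` stands in for the AMQP
/// `reply_to` property (hypothetical model, not a lapin type).
struct Request {
    correlation_id: u64,
    reply_to: Sender<Response>,
    body: String,
}

/// Response as published by the server to the `reply_to` address.
struct Response {
    correlation_id: u64,
    body: String,
}

#[derive(Debug, PartialEq)]
enum RpcError {
    /// The request could not be handed to the server at all.
    ServerGone,
    /// No matching reply arrived in time (what a "lost" reply looks like).
    Timeout,
}

/// Stand-in for the server: consume requests and publish one response each.
/// With `drop_replies = true` it consumes the request but never answers.
fn spawn_server(requests: Receiver<Request>, drop_replies: bool) -> thread::JoinHandle<()> {
    thread::spawn(move || {
        for req in requests {
            if drop_replies {
                continue; // simulate a reply that never reaches the client
            }
            let _ = req.reply_to.send(Response {
                correlation_id: req.correlation_id,
                body: format!("Hello, {}!", req.body),
            });
        }
    })
}

/// Stand-in for the client: one long-lived reply consumer shared by all calls.
struct Client {
    requests: Sender<Request>,
    reply_tx: Sender<Response>,
    reply_rx: Receiver<Response>,
    next_id: u64,
}

impl Client {
    fn new(requests: Sender<Request>) -> Self {
        let (reply_tx, reply_rx) = mpsc::channel();
        Client { requests, reply_tx, reply_rx, next_id: 1 }
    }

    /// Publish a request, then wait for the reply carrying the same id.
    fn call(&mut self, body: &str, timeout: Duration) -> Result<String, RpcError> {
        let id = self.next_id;
        self.next_id += 1;
        self.requests
            .send(Request {
                correlation_id: id,
                reply_to: self.reply_tx.clone(),
                body: body.to_string(),
            })
            .map_err(|_| RpcError::ServerGone)?;

        let deadline = Instant::now() + timeout;
        loop {
            let remaining = deadline.saturating_duration_since(Instant::now());
            match self.reply_rx.recv_timeout(remaining) {
                Ok(resp) if resp.correlation_id == id => return Ok(resp.body),
                Ok(_stale) => continue, // reply to an earlier, timed-out call
                Err(_) => return Err(RpcError::Timeout),
            }
        }
    }
}
```

To use it, create a channel with `mpsc::channel()`, pass the receiver to `spawn_server` and the sender to `Client::new`, then call `client.call("world", Duration::from_secs(1))`. Running the same call against a server started with `drop_replies = true` returns `Err(RpcError::Timeout)`, which is the symptom described in this issue as seen from the client.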

Here are the logs:

15:03:57.774 DEBUG test_service_error: ptms_test_utils::container: Start starting container container=ContainerBuilder { name: "ptms_rabbitmq_e3f07976-5022-4218-adae-bef40aac84e9", image: "rabbitmq:3.12.14-management" }
15:04:07.014  WARN test_service_error: ptms_test_utils::container::rabbitmq: RabbitMQ container container=Container { name: "ptms_rabbitmq_e3f07976-5022-4218-adae-bef40aac84e9", image: "rabbitmq:3.12.14-management" } ip_addr=172.17.0.3
15:04:07.036  INFO test_service_error: amqp-rpc: Start amqp-rpc server app_id=AppId("pid-7817@runner-bbf8mlut-project-2833-concurrent-0") rabbitmq_uri="amqp://172.17.0.3/test_02eec5c7-06ef-476f-968c-5d0338388cdf"
15:04:07.036 DEBUG test_service_error: lapin::channels: create channel id=0
15:04:07.036 TRACE test_service_error: lapin::channel: send_frame channel=0
15:04:07.036 TRACE test_service_error: lapin::channel: wake channel=0
15:04:07.039 TRACE lapin::io_loop: io_loop run
15:04:07.039 TRACE lapin::socket_state: Got event for socket event=Wake
15:04:07.039 TRACE lapin::socket_state: Got event for socket event=Wake
15:04:07.039 TRACE lapin::io_loop: io_loop do_run can_read=true can_write=true has_data=true
15:04:07.039 TRACE lapin::io_loop: will write to buffer next_msg=AMQPFrame::ProtocolHeader(0.9.1)
15:04:07.039 TRACE lapin::io_loop: wrote 8 bytes
15:04:07.039 TRACE lapin::io_loop: io_loop do_run done can_read=false can_write=true has_data=false status=Initial
15:04:07.039 TRACE lapin::io_loop: io_loop run
15:04:07.039 TRACE lapin::io_loop: io_loop do_run can_read=false can_write=true has_data=false
15:04:07.042 TRACE lapin::socket_state: Got event for socket event=Readable
15:04:07.042 TRACE lapin::io_loop: read 523 bytes
15:04:07.042 TRACE lapin::channels: will handle frame frame=Method(0, Connection(Start(Start { version_major: 0, version_minor: 9, server_properties: FieldTable({ShortString("capabilities"): FieldTable(FieldTable({ShortString("authentication_failure_close"): Boolean(true), ShortString("basic.nack"): Boolean(true), ShortString("connection.blocked"): Boolean(true), ShortString("consumer_cancel_notify"): Boolean(true), ShortString("consumer_priorities"): Boolean(true), ShortString("direct_reply_to"): Boolean(true), ShortString("exchange_exchange_bindings"): Boolean(true), ShortString("per_consumer_qos"): Boolean(true), ShortString("publisher_confirms"): Boolean(true)})), ShortString("cluster_name"): LongString(LongString([114, 97, 98, 98, 105, 116, 64, 55, 53, 54, 51, 54, 51, 49, 54, 98, 98, 49, 99])), ShortString("copyright"): LongString(LongString([67, 111, 112, 121, 114, 105, 103, 104, 116, 32, 40, 99, 41, 32, 50, 48, 48, 55, 45, 50, 48, 50, 52, 32, 66, 114, 111, 97, 100, 99, 111, 109, 32, 73, 110, 99, 32, 97, 110, 100, 47, 111, 114, 32, 105, 116, 115, 32, 115, 117, 98, 115, 105, 100, 105, 97, 114, 105, 101, 115])), ShortString("information"): LongString(LongString([76, 105, 99, 101, 110, 115, 101, 100, 32, 117, 110, 100, 101, 114, 32, 116, 104, 101, 32, 77, 80, 76, 32, 50, 46, 48, 46, 32, 87, 101, 98, 115, 105, 116, 101, 58, 32, 104, 116, 116, 112, 115, 58, 47, 47, 114, 97, 98, 98, 105, 116, 109, 113, 46, 99, 111, 109])), ShortString("platform"): LongString(LongString([69, 114, 108, 97, 110, 103, 47, 79, 84, 80, 32, 50, 53, 46, 51, 46, 50, 46, 49, 50])), ShortString("product"): LongString(LongString([82, 97, 98, 98, 105, 116, 77, 81])), ShortString("version"): LongString(LongString([51, 46, 49, 50, 46, 49, 52]))}), mechanisms: LongString([65, 77, 81, 80, 76, 65, 73, 78, 32, 80, 76, 65, 73, 78]), locales: LongString([101, 110, 95, 85, 83]) })))
15:04:07.042 TRACE lapin::channel: Server sent connection::Start method=Start { version_major: 0, version_minor: 9, server_properties: FieldTable({ShortString("capabilities"): FieldTable(FieldTable({ShortString("authentication_failure_close"): Boolean(true), ShortString("basic.nack"): Boolean(true), ShortString("connection.blocked"): Boolean(true), ShortString("consumer_cancel_notify"): Boolean(true), ShortString("consumer_priorities"): Boolean(true), ShortString("direct_reply_to"): Boolean(true), ShortString("exchange_exchange_bindings"): Boolean(true), ShortString("per_consumer_qos"): Boolean(true), ShortString("publisher_confirms"): Boolean(true)})), ShortString("cluster_name"): LongString(LongString([114, 97, 98, 98, 105, 116, 64, 55, 53, 54, 51, 54, 51, 49, 54, 98, 98, 49, 99])), ShortString("copyright"): LongString(LongString([67, 111, 112, 121, 114, 105, 103, 104, 116, 32, 40, 99, 41, 32, 50, 48, 48, 55, 45, 50, 48, 50, 52, 32, 66, 114, 111, 97, 100, 99, 111, 109, 32, 73, 110, 99, 32, 97, 110, 100, 47, 111, 114, 32, 105, 116, 115, 32, 115, 117, 98, 115, 105, 100, 105, 97, 114, 105, 101, 115])), ShortString("information"): LongString(LongString([76, 105, 99, 101, 110, 115, 101, 100, 32, 117, 110, 100, 101, 114, 32, 116, 104, 101, 32, 77, 80, 76, 32, 50, 46, 48, 46, 32, 87, 101, 98, 115, 105, 116, 101, 58, 32, 104, 116, 116, 112, 115, 58, 47, 47, 114, 97, 98, 98, 105, 116, 109, 113, 46, 99, 111, 109])), ShortString("platform"): LongString(LongString([69, 114, 108, 97, 110, 103, 47, 79, 84, 80, 32, 50, 53, 46, 51, 46, 50, 46, 49, 50])), ShortString("product"): LongString(LongString([82, 97, 98, 98, 105, 116, 77, 81])), ShortString("version"): LongString(LongString([51, 46, 49, 50, 46, 49, 52]))}), mechanisms: LongString([65, 77, 81, 80, 76, 65, 73, 78, 32, 80, 76, 65, 73, 78]), locales: LongString([101, 110, 95, 85, 83]) }
15:04:07.042 TRACE lapin::io_loop: io_loop do_run done can_read=false can_write=true has_data=false status=Initial
15:04:07.042 TRACE lapin::io_loop: io_loop run
15:04:07.042 TRACE lapin::io_loop: io_loop do_run can_read=false can_write=true has_data=false
15:04:07.042 TRACE lapin::channel: send_frame channel=0
15:04:07.042 TRACE lapin::channel: wake channel=0
15:04:07.042 TRACE lapin::socket_state: Got event for socket event=Wake
15:04:07.042 TRACE lapin::io_loop: will write to buffer next_msg=AMQPFrame::Method(Connection(StartOk(StartOk { client_properties: FieldTable({ShortString("capabilities"): FieldTable(FieldTable({ShortString("authentication_failure_close"): Boolean(true), ShortString("basic.nack"): Boolean(true), ShortString("connection.blocked"): Boolean(true), ShortString("consumer_cancel_notify"): Boolean(true), ShortString("consumer_priorities"): Boolean(true), ShortString("direct_reply_to"): Boolean(true), ShortString("exchange_exchange_bindings"): Boolean(true), ShortString("per_consumer_qos"): Boolean(true), ShortString("publisher_confirms"): Boolean(true)})), ShortString("platform"): LongString(LongString([114, 117, 115, 116])), ShortString("product"): LongString(LongString([108, 97, 112, 105, 110])), ShortString("version"): LongString(LongString([50, 46, 53, 46, 48]))}), mechanism: ShortString("PLAIN"), response: LongString([0, 103, 117, 101, 115, 116, 0, 103, 117, 101, 115, 116]), locale: ShortString("en_US") })))
15:04:07.042 TRACE lapin::io_loop: wrote 315 bytes
15:04:07.042 TRACE lapin::io_loop: io_loop do_run done can_read=false can_write=true has_data=false status=Initial
15:04:07.042 TRACE lapin::io_loop: io_loop run
15:04:07.042 TRACE lapin::io_loop: io_loop do_run can_read=false can_write=true has_data=false
15:04:07.042 TRACE lapin::socket_state: Got event for socket event=Readable
15:04:07.043 TRACE lapin::io_loop: read 20 bytes
15:04:07.043 TRACE lapin::channels: will handle frame frame=Method(0, Connection(Tune(Tune { channel_max: 2047, frame_max: 131072, heartbeat: 60 })))
15:04:07.043 TRACE lapin::channel: Server sent Connection::Tune method=Tune { channel_max: 2047, frame_max: 131072, heartbeat: 60 }
15:04:07.043 TRACE lapin::io_loop: io_loop do_run done can_read=false can_write=true has_data=false status=Initial
15:04:07.043 TRACE lapin::io_loop: io_loop run
15:04:07.043 TRACE lapin::channel: send_frame channel=0
15:04:07.043 TRACE lapin::io_loop: io_loop do_run can_read=false can_write=true has_data=false
15:04:07.043 TRACE lapin::channel: wake channel=0
15:04:07.043 TRACE lapin::socket_state: Got event for socket event=Wake
15:04:07.043 TRACE lapin::io_loop: will write to buffer next_msg=AMQPFrame::Method(Connection(TuneOk(TuneOk { channel_max: 2047, frame_max: 131072, heartbeat: 20 })))
15:04:07.043 TRACE lapin::io_loop: wrote 20 bytes
15:04:07.043 TRACE lapin::io_loop: io_loop do_run done can_read=false can_write=true has_data=false status=Initial
15:04:07.043 TRACE lapin::io_loop: io_loop run
15:04:07.043 TRACE lapin::io_loop: io_loop do_run can_read=false can_write=true has_data=false
15:04:07.043 TRACE lapin::channel: send_frame channel=0
15:04:07.043 TRACE lapin::frames: state is now waiting channel=0 expected_reply=ExpectedReply(ConnectionOpenOk(Pinky, Connection { configuration: Configuration { channel_max: 2047, frame_max: 131072, heartbeat: 20 }, status: ConnectionStatus { state: Connecting, vhost: "test_02eec5c7-06ef-476f-968c-5d0338388cdf", username: "guest", blocked: false }, channels: Channels { channels: [Channel { id: 0, configuration: Configuration { channel_max: 2047, frame_max: 131072, heartbeat: 20 }, status: ChannelStatus { state: Connected, receiver_state: ChannelReceiverStates([]), confirm: false, send_flow: true }, connection_status: ConnectionStatus { state: Connecting, vhost: "test_02eec5c7-06ef-476f-968c-5d0338388cdf", username: "guest", blocked: false }, acknowledgements: Acknowledgements { delivery_tag: IdSequence { allow_zero: false, max: None, id: 0 }, returned_messages: ReturnedMessages { waiting_messages: [], messages: [], non_confirm_messages: [] }, pending: [] }, consumers: Consumers({}), basic_get_delivery: BasicGetDelivery, returned_messages: ReturnedMessages { waiting_messages: [], messages: [], non_confirm_messages: [] }, frames: Frames }], channel_id: IdSequence { allow_zero: false, max: None, id: 0 }, configuration: Configuration { channel_max: 2047, frame_max: 131072, heartbeat: 20 }, frames: Frames, connection_status: ConnectionStatus { state: Connecting, vhost: "test_02eec5c7-06ef-476f-968c-5d0338388cdf", username: "guest", blocked: false }, error_handler: ErrorHandler } }))
15:04:07.043 TRACE lapin::channel: wake channel=0
15:04:07.043 TRACE lapin::socket_state: Got event for socket event=Wake
15:04:07.043 TRACE lapin::io_loop: will write to buffer next_msg=AMQPFrame::Method(Connection(Open(Open { virtual_host: ShortString("test_02eec5c7-06ef-476f-968c-5d0338388cdf") })))
15:04:07.043 TRACE lapin::io_loop: wrote 56 bytes
15:04:07.043 TRACE lapin::io_loop: io_loop do_run done can_read=false can_write=true has_data=false status=Initial
15:04:07.043 TRACE lapin::io_loop: io_loop run
15:04:07.043 TRACE lapin::io_loop: io_loop do_run can_read=false can_write=true has_data=false
15:04:07.043 TRACE lapin::socket_state: Got event for socket event=Readable
15:04:07.043 TRACE lapin::io_loop: read 13 bytes
15:04:07.043 TRACE lapin::channels: will handle frame frame=Method(0, Connection(OpenOk(OpenOk)))
15:04:07.044 TRACE lapin::io_loop: io_loop do_run done can_read=false can_write=true has_data=false status=Initial
15:04:07.044 TRACE lapin::io_loop: io_loop run
15:04:07.044 DEBUG test_service_error: lapin::channels: create channel
15:04:07.044 DEBUG test_service_error: lapin::channels: create channel id=1
15:04:07.044 TRACE test_service_error: lapin::channel: send_frame channel=1
15:04:07.044 TRACE test_service_error: lapin::frames: state is now waiting channel=1 expected_reply=ExpectedReply(ChannelOpenOk(Pinky, Channel { id: 1, configuration: Configuration { channel_max: 2047, frame_max: 131072, heartbeat: 20 }, status: ChannelStatus { state: Initial, receiver_state: ChannelReceiverStates([]), confirm: false, send_flow: true }, connection_status: ConnectionStatus { state: Connected, vhost: "test_02eec5c7-06ef-476f-968c-5d0338388cdf", username: "guest", blocked: false }, acknowledgements: Acknowledgements { delivery_tag: IdSequence { allow_zero: false, max: None, id: 0 }, returned_messages: ReturnedMessages { waiting_messages: [], messages: [], non_confirm_messages: [] }, pending: [] }, consumers: Consumers({}), basic_get_delivery: BasicGetDelivery, returned_messages: ReturnedMessages { waiting_messages: [], messages: [], non_confirm_messages: [] }, frames: Frames }))
15:04:07.044 TRACE test_service_error: lapin::channel: wake channel=1
15:04:07.045 TRACE lapin::io_loop: io_loop do_run can_read=false can_write=true has_data=true
15:04:07.045 TRACE lapin::socket_state: Got event for socket event=Wake
15:04:07.045 TRACE lapin::io_loop: will write to buffer next_msg=AMQPFrame::Method(Channel(Open(Open)))
15:04:07.045 TRACE lapin::io_loop: wrote 13 bytes
15:04:07.045 TRACE lapin::io_loop: io_loop do_run done can_read=false can_write=true has_data=false status=Connected
15:04:07.045 TRACE lapin::io_loop: io_loop run
15:04:07.045 TRACE lapin::io_loop: io_loop do_run can_read=false can_write=true has_data=false
15:04:07.046 TRACE lapin::socket_state: Got event for socket event=Readable
15:04:07.046 TRACE lapin::io_loop: read 16 bytes
15:04:07.046 TRACE lapin::channels: will handle frame frame=Method(1, Channel(OpenOk(OpenOk)))
15:04:07.046 TRACE lapin::io_loop: io_loop do_run done can_read=false can_write=true has_data=false status=Connected
15:04:07.046 TRACE lapin::io_loop: io_loop run
15:04:07.046 TRACE lapin::io_loop: io_loop do_run can_read=false can_write=true has_data=false
15:04:07.046 TRACE test_service_error: lapin::channel: send_frame channel=1
15:04:07.046 TRACE test_service_error: lapin::frames: state is now waiting channel=1 expected_reply=ExpectedReply(ExchangeDeclareOk(Pinky, ShortString("services"), Topic, ExchangeDeclareOptions { passive: false, durable: false, auto_delete: false, internal: false, nowait: false }, FieldTable({})))
15:04:07.046 TRACE test_service_error: lapin::channel: wake channel=1
15:04:07.046 TRACE lapin::socket_state: Got event for socket event=Wake
15:04:07.046 TRACE lapin::io_loop: will write to buffer next_msg=AMQPFrame::Method(Exchange(Declare(Declare { exchange: ShortString("services"), kind: ShortString("topic"), passive: false, durable: false, auto_delete: false, internal: false, nowait: false, arguments: FieldTable({}) })))
15:04:07.047 TRACE lapin::io_loop: wrote 34 bytes
15:04:07.047 TRACE lapin::io_loop: io_loop do_run done can_read=false can_write=true has_data=false status=Connected
15:04:07.047 TRACE lapin::io_loop: io_loop run
15:04:07.047 TRACE lapin::io_loop: io_loop do_run can_read=false can_write=true has_data=false
15:04:07.047 TRACE lapin::socket_state: Got event for socket event=Readable
15:04:07.047 TRACE lapin::io_loop: read 12 bytes
15:04:07.047 TRACE lapin::channels: will handle frame frame=Method(1, Exchange(DeclareOk(DeclareOk)))
15:04:07.047 TRACE lapin::io_loop: io_loop do_run done can_read=false can_write=true has_data=false status=Connected
15:04:07.047 TRACE lapin::io_loop: io_loop run
15:04:07.047 TRACE test_service_error: lapin::channel: send_frame channel=1
15:04:07.047 TRACE test_service_error: lapin::frames: state is now waiting channel=1 expected_reply=ExpectedReply(ChannelCloseOk(Pinky))
15:04:07.047 TRACE test_service_error: lapin::channel: wake channel=1
15:04:07.047 TRACE lapin::io_loop: io_loop do_run can_read=false can_write=true has_data=false
15:04:07.047 TRACE lapin::socket_state: Got event for socket event=Wake
15:04:07.047 TRACE lapin::io_loop: will write to buffer next_msg=AMQPFrame::Method(Channel(Close(Close { reply_code: 0, reply_text: ShortString("Normal Shutdown"), class_id: 0, method_id: 0 })))
15:04:07.047 TRACE lapin::io_loop: wrote 34 bytes
15:04:07.048 TRACE lapin::io_loop: io_loop do_run done can_read=false can_write=true has_data=false status=Connected
15:04:07.048 TRACE lapin::io_loop: io_loop run
15:04:07.048 TRACE lapin::io_loop: io_loop do_run can_read=false can_write=true has_data=false
15:04:07.048 TRACE lapin::socket_state: Got event for socket event=Readable
15:04:07.048 TRACE lapin::io_loop: read 12 bytes
15:04:07.048 TRACE lapin::channels: will handle frame frame=Method(1, Channel(CloseOk(CloseOk)))
15:04:07.048 TRACE lapin::internal_rpc: Queuing internal RPC command command=RemoveChannel(1, InvalidChannelState(Closed))
15:04:07.048 TRACE lapin::io_loop: io_loop do_run done can_read=false can_write=true has_data=false status=Connected
15:04:07.048 TRACE lapin::io_loop: io_loop run
15:04:07.048 TRACE lapin::socket_state: Got event for socket event=Wake
15:04:07.048 TRACE lapin::io_loop: io_loop do_run can_read=false can_write=true has_data=false
15:04:07.048 TRACE lapin::internal_rpc: Handling internal RPC command command=RemoveChannel(1, InvalidChannelState(Closed))
15:04:07.048 TRACE lapin::socket_state: Got event for socket event=Wake
15:04:07.048 TRACE lapin::io_loop: io_loop do_run done can_read=false can_write=true has_data=false status=Connected
15:04:07.048 TRACE lapin::io_loop: io_loop run
15:04:07.048 DEBUG test_service_error: amqp-rpc: Add service server=Application { app_id: AppId("pid-7817@runner-bbf8mlut-project-2833-concurrent-0"), services: [] } service="helloworld.HelloWorld"
15:04:07.048 TRACE lapin::io_loop: io_loop do_run can_read=false can_write=true has_data=false
15:04:07.048 DEBUG test_service_error: lapin::channels: create channel
15:04:07.048 DEBUG test_service_error: lapin::channels: create channel id=2
15:04:07.048 TRACE test_service_error: lapin::channel: send_frame channel=2
15:04:07.048 TRACE test_service_error: lapin::frames: state is now waiting channel=2 expected_reply=ExpectedReply(ChannelOpenOk(Pinky, Channel { id: 2, configuration: Configuration { channel_max: 2047, frame_max: 131072, heartbeat: 20 }, status: ChannelStatus { state: Initial, receiver_state: ChannelReceiverStates([]), confirm: false, send_flow: true }, connection_status: ConnectionStatus { state: Connected, vhost: "test_02eec5c7-06ef-476f-968c-5d0338388cdf", username: "guest", blocked: false }, acknowledgements: Acknowledgements { delivery_tag: IdSequence { allow_zero: false, max: None, id: 0 }, returned_messages: ReturnedMessages { waiting_messages: [], messages: [], non_confirm_messages: [] }, pending: [] }, consumers: Consumers({}), basic_get_delivery: BasicGetDelivery, returned_messages: ReturnedMessages { waiting_messages: [], messages: [], non_confirm_messages: [] }, frames: Frames }))
15:04:07.048 TRACE test_service_error: lapin::channel: wake channel=2
15:04:07.049 TRACE lapin::socket_state: Got event for socket event=Wake
15:04:07.049 TRACE lapin::io_loop: will write to buffer next_msg=AMQPFrame::Method(Channel(Open(Open)))
15:04:07.049 TRACE lapin::io_loop: wrote 13 bytes
15:04:07.049 TRACE lapin::io_loop: io_loop do_run done can_read=false can_write=true has_data=false status=Connected
15:04:07.049 TRACE lapin::io_loop: io_loop run
15:04:07.049 TRACE lapin::io_loop: io_loop do_run can_read=false can_write=true has_data=false
15:04:07.049 TRACE lapin::socket_state: Got event for socket event=Readable
15:04:07.050 TRACE lapin::io_loop: read 16 bytes
15:04:07.050 TRACE lapin::channels: will handle frame frame=Method(2, Channel(OpenOk(OpenOk)))
15:04:07.050 TRACE lapin::io_loop: io_loop do_run done can_read=false can_write=true has_data=false status=Connected
15:04:07.050 TRACE lapin::io_loop: io_loop run
15:04:07.050 TRACE lapin::io_loop: io_loop do_run can_read=false can_write=true has_data=false
15:04:07.050 TRACE test_service_error: lapin::channel: send_frame channel=2
15:04:07.050 TRACE test_service_error: lapin::frames: state is now waiting channel=2 expected_reply=ExpectedReply(BasicQosOk(Pinky))
15:04:07.050 TRACE test_service_error: lapin::channel: wake channel=2
15:04:07.050 TRACE lapin::socket_state: Got event for socket event=Wake
15:04:07.050 TRACE lapin::io_loop: will write to buffer next_msg=AMQPFrame::Method(Basic(Qos(Qos { prefetch_count: 5, global: false })))
15:04:07.050 TRACE lapin::io_loop: wrote 19 bytes
15:04:07.050 TRACE lapin::io_loop: io_loop do_run done can_read=false can_write=true has_data=false status=Connected
15:04:07.050 TRACE lapin::io_loop: io_loop run
15:04:07.050 TRACE lapin::io_loop: io_loop do_run can_read=false can_write=true has_data=false
15:04:07.050 TRACE lapin::socket_state: Got event for socket event=Readable
15:04:07.050 TRACE lapin::io_loop: read 12 bytes
15:04:07.050 TRACE lapin::channels: will handle frame frame=Method(2, Basic(QosOk(QosOk)))
15:04:07.050 TRACE lapin::io_loop: io_loop do_run done can_read=false can_write=true has_data=false status=Connected
15:04:07.050 TRACE lapin::io_loop: io_loop run
15:04:07.050 TRACE lapin::io_loop: io_loop do_run can_read=false can_write=true has_data=false
15:04:07.050 DEBUG test_service_error: amqp-rpc: Declare queue server=Application { app_id: AppId("pid-7817@runner-bbf8mlut-project-2833-concurrent-0"), services: [] } service="helloworld.HelloWorld" queue_name="services.helloworld.HelloWorld"
15:04:07.050 TRACE test_service_error: lapin::channel: send_frame channel=2
15:04:07.050 TRACE test_service_error: lapin::frames: state is now waiting channel=2 expected_reply=ExpectedReply(QueueDeclareOk(Pinky, QueueDeclareOptions { passive: false, durable: false, exclusive: false, auto_delete: false, nowait: false }, FieldTable({ShortString("x-max-priority"): LongInt(5)})))
15:04:07.050 TRACE test_service_error: lapin::channel: wake channel=2
15:04:07.050 TRACE lapin::socket_state: Got event for socket event=Wake
15:04:07.050 TRACE lapin::io_loop: will write to buffer next_msg=AMQPFrame::Method(Queue(Declare(Declare { queue: ShortString("services.helloworld.HelloWorld"), passive: false, durable: false, exclusive: false, auto_delete: false, nowait: false, arguments: FieldTable({ShortString("x-max-priority"): LongInt(5)}) })))
15:04:07.051 TRACE lapin::io_loop: wrote 70 bytes
15:04:07.051 TRACE lapin::io_loop: io_loop do_run done can_read=false can_write=true has_data=false status=Connected
15:04:07.051 TRACE lapin::io_loop: io_loop run
15:04:07.051 TRACE lapin::io_loop: io_loop do_run can_read=false can_write=true has_data=false
15:04:07.064 TRACE lapin::socket_state: Got event for socket event=Readable
15:04:07.064 TRACE lapin::io_loop: read 51 bytes
15:04:07.064 TRACE lapin::channels: will handle frame frame=Method(2, Queue(DeclareOk(DeclareOk { queue: ShortString("services.helloworld.HelloWorld"), message_count: 0, consumer_count: 0 })))
15:04:07.065 TRACE lapin::io_loop: io_loop do_run done can_read=false can_write=true has_data=false status=Connected
15:04:07.065 TRACE lapin::io_loop: io_loop run
15:04:07.065 TRACE lapin::io_loop: io_loop do_run can_read=false can_write=true has_data=false
15:04:07.065 DEBUG test_service_error: amqp-rpc: Bind queue server=Application { app_id: AppId("pid-7817@runner-bbf8mlut-project-2833-concurrent-0"), services: [] } service="helloworld.HelloWorld" queue_name="services.helloworld.HelloWorld"
15:04:07.065 TRACE test_service_error: lapin::channel: send_frame channel=2
15:04:07.065 TRACE test_service_error: lapin::frames: state is now waiting channel=2 expected_reply=ExpectedReply(QueueBindOk(Pinky, ShortString("services.helloworld.HelloWorld"), ShortString("services"), ShortString("helloworld.HelloWorld"), FieldTable({})))
15:04:07.065 TRACE test_service_error: lapin::channel: wake channel=2
15:04:07.065 TRACE lapin::socket_state: Got event for socket event=Wake
15:04:07.065 TRACE lapin::io_loop: will write to buffer next_msg=AMQPFrame::Method(Queue(Bind(Bind { queue: ShortString("services.helloworld.HelloWorld"), exchange: ShortString("services"), routing_key: ShortString("helloworld.HelloWorld"), nowait: false, arguments: FieldTable({}) })))
15:04:07.065 TRACE lapin::io_loop: wrote 81 bytes
15:04:07.065 TRACE lapin::io_loop: io_loop do_run done can_read=false can_write=true has_data=false status=Connected
15:04:07.065 TRACE lapin::io_loop: io_loop run
15:04:07.065 TRACE lapin::io_loop: io_loop do_run can_read=false can_write=true has_data=false
15:04:07.067 TRACE lapin::socket_state: Got event for socket event=Readable
15:04:07.067 TRACE lapin::io_loop: read 12 bytes
15:04:07.067 TRACE lapin::channels: will handle frame frame=Method(2, Queue(BindOk(BindOk)))
15:04:07.067 TRACE lapin::io_loop: io_loop do_run done can_read=false can_write=true has_data=false status=Connected
15:04:07.067 TRACE lapin::io_loop: io_loop run
15:04:07.067 TRACE lapin::io_loop: io_loop do_run can_read=false can_write=true has_data=false
15:04:07.067 DEBUG test_service_error: amqp-rpc: Consume queue server=Application { app_id: AppId("pid-7817@runner-bbf8mlut-project-2833-concurrent-0"), services: [] } service="helloworld.HelloWorld" consumer_tag="ctag2.3b2cd2ce-89c7-4447-a85c-5f9f8337c1ca"
15:04:07.067 TRACE test_service_error: lapin::channel: send_frame channel=2
15:04:07.067 TRACE test_service_error: lapin::frames: state is now waiting channel=2 expected_reply=ExpectedReply(BasicConsumeOk(Pinky, Some(ChannelCloser { id: 2, status: ChannelStatus { state: Connected, receiver_state: ChannelReceiverStates([]), confirm: false, send_flow: true } }), ShortString("services.helloworld.HelloWorld"), BasicConsumeOptions { no_local: false, no_ack: false, exclusive: false, nowait: false }, FieldTable({}), None))
15:04:07.067 TRACE test_service_error: lapin::channel: wake channel=2
15:04:07.067 TRACE lapin::socket_state: Got event for socket event=Wake
15:04:07.067 TRACE lapin::io_loop: will write to buffer next_msg=AMQPFrame::Method(Basic(Consume(Consume { queue: ShortString("services.helloworld.HelloWorld"), consumer_tag: ShortString("ctag2.3b2cd2ce-89c7-4447-a85c-5f9f8337c1ca"), no_local: false, no_ack: false, exclusive: false, nowait: false, arguments: FieldTable({}) })))
15:04:07.067 TRACE lapin::io_loop: wrote 93 bytes
15:04:07.067 TRACE lapin::io_loop: io_loop do_run done can_read=false can_write=true has_data=false status=Connected
15:04:07.067 TRACE lapin::io_loop: io_loop run
15:04:07.067 TRACE lapin::io_loop: io_loop do_run can_read=false can_write=true has_data=false
15:04:07.068 TRACE lapin::socket_state: Got event for socket event=Readable
15:04:07.068 TRACE lapin::io_loop: read 55 bytes
15:04:07.068 TRACE lapin::channels: will handle frame frame=Method(2, Basic(ConsumeOk(ConsumeOk { consumer_tag: ShortString("ctag2.3b2cd2ce-89c7-4447-a85c-5f9f8337c1ca") })))
15:04:07.068 TRACE lapin::io_loop: io_loop do_run done can_read=false can_write=true has_data=false status=Connected
15:04:07.068 TRACE lapin::io_loop: io_loop run
15:04:07.068 TRACE lapin::io_loop: io_loop do_run can_read=false can_write=true has_data=false
15:04:07.068 TRACE lapin::consumer: consumer poll_next
15:04:07.068 TRACE lapin::consumer: consumer poll; acquired inner lock consumer_tag=ctag2.3b2cd2ce-89c7-4447-a85c-5f9f8337c1ca
15:04:07.068 TRACE lapin::consumer: delivery; status=NotReady consumer_tag=ctag2.3b2cd2ce-89c7-4447-a85c-5f9f8337c1ca
15:04:07.068  INFO test_service_error: amqp-rpc: Start amqp-rpc client factory app_id=AppId("pid-7817@runner-bbf8mlut-project-2833-concurrent-0") rabbitmq_uri="amqp://172.17.0.3/test_02eec5c7-06ef-476f-968c-5d0338388cdf"
15:04:07.068 DEBUG test_service_error: lapin::channels: create channel id=0
15:04:07.068 TRACE test_service_error: lapin::channel: send_frame channel=0
15:04:07.068 TRACE test_service_error: lapin::channel: wake channel=0
15:04:07.070 TRACE lapin::io_loop: io_loop run
15:04:07.070 TRACE lapin::socket_state: Got event for socket event=Wake
15:04:07.070 TRACE lapin::socket_state: Got event for socket event=Wake
15:04:07.070 TRACE lapin::io_loop: io_loop do_run can_read=true can_write=true has_data=true
15:04:07.070 TRACE lapin::io_loop: will write to buffer next_msg=AMQPFrame::ProtocolHeader(0.9.1)
15:04:07.070 TRACE lapin::io_loop: wrote 8 bytes
15:04:07.070 TRACE lapin::io_loop: io_loop do_run done can_read=false can_write=true has_data=false status=Initial
15:04:07.070 TRACE lapin::io_loop: io_loop run
15:04:07.070 TRACE lapin::io_loop: io_loop do_run can_read=false can_write=true has_data=false
15:04:07.073 TRACE lapin::socket_state: Got event for socket event=Readable
15:04:07.073 TRACE lapin::io_loop: read 523 bytes
15:04:07.073 TRACE lapin::channels: will handle frame frame=Method(0, Connection(Start(Start { version_major: 0, version_minor: 9, server_properties: FieldTable({ShortString("capabilities"): FieldTable(FieldTable({ShortString("authentication_failure_close"): Boolean(true), ShortString("basic.nack"): Boolean(true), ShortString("connection.blocked"): Boolean(true), ShortString("consumer_cancel_notify"): Boolean(true), ShortString("consumer_priorities"): Boolean(true), ShortString("direct_reply_to"): Boolean(true), ShortString("exchange_exchange_bindings"): Boolean(true), ShortString("per_consumer_qos"): Boolean(true), ShortString("publisher_confirms"): Boolean(true)})), ShortString("cluster_name"): LongString(LongString([114, 97, 98, 98, 105, 116, 64, 55, 53, 54, 51, 54, 51, 49, 54, 98, 98, 49, 99])), ShortString("copyright"): LongString(LongString([67, 111, 112, 121, 114, 105, 103, 104, 116, 32, 40, 99, 41, 32, 50, 48, 48, 55, 45, 50, 48, 50, 52, 32, 66, 114, 111, 97, 100, 99, 111, 109, 32, 73, 110, 99, 32, 97, 110, 100, 47, 111, 114, 32, 105, 116, 115, 32, 115, 117, 98, 115, 105, 100, 105, 97, 114, 105, 101, 115])), ShortString("information"): LongString(LongString([76, 105, 99, 101, 110, 115, 101, 100, 32, 117, 110, 100, 101, 114, 32, 116, 104, 101, 32, 77, 80, 76, 32, 50, 46, 48, 46, 32, 87, 101, 98, 115, 105, 116, 101, 58, 32, 104, 116, 116, 112, 115, 58, 47, 47, 114, 97, 98, 98, 105, 116, 109, 113, 46, 99, 111, 109])), ShortString("platform"): LongString(LongString([69, 114, 108, 97, 110, 103, 47, 79, 84, 80, 32, 50, 53, 46, 51, 46, 50, 46, 49, 50])), ShortString("product"): LongString(LongString([82, 97, 98, 98, 105, 116, 77, 81])), ShortString("version"): LongString(LongString([51, 46, 49, 50, 46, 49, 52]))}), mechanisms: LongString([65, 77, 81, 80, 76, 65, 73, 78, 32, 80, 76, 65, 73, 78]), locales: LongString([101, 110, 95, 85, 83]) })))
15:04:07.073 TRACE lapin::channel: Server sent connection::Start method=Start { version_major: 0, version_minor: 9, server_properties: FieldTable({ShortString("capabilities"): FieldTable(FieldTable({ShortString("authentication_failure_close"): Boolean(true), ShortString("basic.nack"): Boolean(true), ShortString("connection.blocked"): Boolean(true), ShortString("consumer_cancel_notify"): Boolean(true), ShortString("consumer_priorities"): Boolean(true), ShortString("direct_reply_to"): Boolean(true), ShortString("exchange_exchange_bindings"): Boolean(true), ShortString("per_consumer_qos"): Boolean(true), ShortString("publisher_confirms"): Boolean(true)})), ShortString("cluster_name"): LongString(LongString([114, 97, 98, 98, 105, 116, 64, 55, 53, 54, 51, 54, 51, 49, 54, 98, 98, 49, 99])), ShortString("copyright"): LongString(LongString([67, 111, 112, 121, 114, 105, 103, 104, 116, 32, 40, 99, 41, 32, 50, 48, 48, 55, 45, 50, 48, 50, 52, 32, 66, 114, 111, 97, 100, 99, 111, 109, 32, 73, 110, 99, 32, 97, 110, 100, 47, 111, 114, 32, 105, 116, 115, 32, 115, 117, 98, 115, 105, 100, 105, 97, 114, 105, 101, 115])), ShortString("information"): LongString(LongString([76, 105, 99, 101, 110, 115, 101, 100, 32, 117, 110, 100, 101, 114, 32, 116, 104, 101, 32, 77, 80, 76, 32, 50, 46, 48, 46, 32, 87, 101, 98, 115, 105, 116, 101, 58, 32, 104, 116, 116, 112, 115, 58, 47, 47, 114, 97, 98, 98, 105, 116, 109, 113, 46, 99, 111, 109])), ShortString("platform"): LongString(LongString([69, 114, 108, 97, 110, 103, 47, 79, 84, 80, 32, 50, 53, 46, 51, 46, 50, 46, 49, 50])), ShortString("product"): LongString(LongString([82, 97, 98, 98, 105, 116, 77, 81])), ShortString("version"): LongString(LongString([51, 46, 49, 50, 46, 49, 52]))}), mechanisms: LongString([65, 77, 81, 80, 76, 65, 73, 78, 32, 80, 76, 65, 73, 78]), locales: LongString([101, 110, 95, 85, 83]) }
15:04:07.073 TRACE lapin::io_loop: io_loop do_run done can_read=false can_write=true has_data=false status=Initial
15:04:07.073 TRACE lapin::io_loop: io_loop run
15:04:07.073 TRACE lapin::io_loop: io_loop do_run can_read=false can_write=true has_data=false
15:04:07.073 TRACE lapin::channel: send_frame channel=0
15:04:07.073 TRACE lapin::channel: wake channel=0
15:04:07.073 TRACE lapin::socket_state: Got event for socket event=Wake
15:04:07.073 TRACE lapin::io_loop: will write to buffer next_msg=AMQPFrame::Method(Connection(StartOk(StartOk { client_properties: FieldTable({ShortString("capabilities"): FieldTable(FieldTable({ShortString("authentication_failure_close"): Boolean(true), ShortString("basic.nack"): Boolean(true), ShortString("connection.blocked"): Boolean(true), ShortString("consumer_cancel_notify"): Boolean(true), ShortString("consumer_priorities"): Boolean(true), ShortString("direct_reply_to"): Boolean(true), ShortString("exchange_exchange_bindings"): Boolean(true), ShortString("per_consumer_qos"): Boolean(true), ShortString("publisher_confirms"): Boolean(true)})), ShortString("platform"): LongString(LongString([114, 117, 115, 116])), ShortString("product"): LongString(LongString([108, 97, 112, 105, 110])), ShortString("version"): LongString(LongString([50, 46, 53, 46, 48]))}), mechanism: ShortString("PLAIN"), response: LongString([0, 103, 117, 101, 115, 116, 0, 103, 117, 101, 115, 116]), locale: ShortString("en_US") })))
15:04:07.073 TRACE lapin::io_loop: wrote 315 bytes
15:04:07.073 TRACE lapin::io_loop: io_loop do_run done can_read=false can_write=true has_data=false status=Initial
15:04:07.073 TRACE lapin::io_loop: io_loop run
15:04:07.073 TRACE lapin::io_loop: io_loop do_run can_read=false can_write=true has_data=false
15:04:07.073 TRACE lapin::socket_state: Got event for socket event=Readable
15:04:07.073 TRACE lapin::io_loop: read 20 bytes
15:04:07.074 TRACE lapin::channels: will handle frame frame=Method(0, Connection(Tune(Tune { channel_max: 2047, frame_max: 131072, heartbeat: 60 })))
15:04:07.074 TRACE lapin::channel: Server sent Connection::Tune method=Tune { channel_max: 2047, frame_max: 131072, heartbeat: 60 }
15:04:07.074 TRACE lapin::io_loop: io_loop do_run done can_read=false can_write=true has_data=false status=Initial
15:04:07.074 TRACE lapin::io_loop: io_loop run
15:04:07.074 TRACE lapin::io_loop: io_loop do_run can_read=false can_write=true has_data=false
15:04:07.074 TRACE lapin::channel: send_frame channel=0
15:04:07.074 TRACE lapin::channel: wake channel=0
15:04:07.074 TRACE lapin::socket_state: Got event for socket event=Wake
15:04:07.074 TRACE lapin::io_loop: will write to buffer next_msg=AMQPFrame::Method(Connection(TuneOk(TuneOk { channel_max: 2047, frame_max: 131072, heartbeat: 60 })))
15:04:07.074 TRACE lapin::io_loop: wrote 20 bytes
15:04:07.074 TRACE lapin::io_loop: io_loop do_run done can_read=false can_write=true has_data=false status=Initial
15:04:07.074 TRACE lapin::io_loop: io_loop run
15:04:07.074 TRACE lapin::io_loop: io_loop do_run can_read=false can_write=true has_data=false
15:04:07.074 TRACE lapin::channel: send_frame channel=0
15:04:07.074 TRACE lapin::frames: state is now waiting channel=0 expected_reply=ExpectedReply(ConnectionOpenOk(Pinky, Connection { configuration: Configuration { channel_max: 2047, frame_max: 131072, heartbeat: 60 }, status: ConnectionStatus { state: Connecting, vhost: "test_02eec5c7-06ef-476f-968c-5d0338388cdf", username: "guest", blocked: false }, channels: Channels { channels: [Channel { id: 0, configuration: Configuration { channel_max: 2047, frame_max: 131072, heartbeat: 60 }, status: ChannelStatus { state: Connected, receiver_state: ChannelReceiverStates([]), confirm: false, send_flow: true }, connection_status: ConnectionStatus { state: Connecting, vhost: "test_02eec5c7-06ef-476f-968c-5d0338388cdf", username: "guest", blocked: false }, acknowledgements: Acknowledgements { delivery_tag: IdSequence { allow_zero: false, max: None, id: 0 }, returned_messages: ReturnedMessages { waiting_messages: [], messages: [], non_confirm_messages: [] }, pending: [] }, consumers: Consumers({}), basic_get_delivery: BasicGetDelivery, returned_messages: ReturnedMessages { waiting_messages: [], messages: [], non_confirm_messages: [] }, frames: Frames }], channel_id: IdSequence { allow_zero: false, max: None, id: 0 }, configuration: Configuration { channel_max: 2047, frame_max: 131072, heartbeat: 60 }, frames: Frames, connection_status: ConnectionStatus { state: Connecting, vhost: "test_02eec5c7-06ef-476f-968c-5d0338388cdf", username: "guest", blocked: false }, error_handler: ErrorHandler } }))
15:04:07.074 TRACE lapin::channel: wake channel=0
15:04:07.074 TRACE lapin::socket_state: Got event for socket event=Wake
15:04:07.074 TRACE lapin::io_loop: will write to buffer next_msg=AMQPFrame::Method(Connection(Open(Open { virtual_host: ShortString("test_02eec5c7-06ef-476f-968c-5d0338388cdf") })))
15:04:07.074 TRACE lapin::io_loop: wrote 56 bytes
15:04:07.074 TRACE lapin::io_loop: io_loop do_run done can_read=false can_write=true has_data=false status=Initial
15:04:07.074 TRACE lapin::io_loop: io_loop run
15:04:07.074 TRACE lapin::io_loop: io_loop do_run can_read=false can_write=true has_data=false
15:04:07.075 TRACE lapin::socket_state: Got event for socket event=Readable
15:04:07.075 TRACE lapin::io_loop: read 13 bytes
15:04:07.075 TRACE lapin::channels: will handle frame frame=Method(0, Connection(OpenOk(OpenOk)))
15:04:07.075 TRACE lapin::io_loop: io_loop do_run done can_read=false can_write=true has_data=false status=Initial
15:04:07.075 TRACE lapin::io_loop: io_loop run
15:04:07.075 DEBUG test_service_error: lapin::channels: create channel
15:04:07.075 DEBUG test_service_error: lapin::channels: create channel id=1
15:04:07.075 TRACE test_service_error: lapin::channel: send_frame channel=1
15:04:07.075 TRACE test_service_error: lapin::frames: state is now waiting channel=1 expected_reply=ExpectedReply(ChannelOpenOk(Pinky, Channel { id: 1, configuration: Configuration { channel_max: 2047, frame_max: 131072, heartbeat: 60 }, status: ChannelStatus { state: Initial, receiver_state: ChannelReceiverStates([]), confirm: false, send_flow: true }, connection_status: ConnectionStatus { state: Connected, vhost: "test_02eec5c7-06ef-476f-968c-5d0338388cdf", username: "guest", blocked: false }, acknowledgements: Acknowledgements { delivery_tag: IdSequence { allow_zero: false, max: None, id: 0 }, returned_messages: ReturnedMessages { waiting_messages: [], messages: [], non_confirm_messages: [] }, pending: [] }, consumers: Consumers({}), basic_get_delivery: BasicGetDelivery, returned_messages: ReturnedMessages { waiting_messages: [], messages: [], non_confirm_messages: [] }, frames: Frames }))
15:04:07.075 TRACE test_service_error: lapin::channel: wake channel=1
15:04:07.078 TRACE lapin::io_loop: io_loop do_run can_read=false can_write=true has_data=true
15:04:07.078 TRACE lapin::socket_state: Got event for socket event=Wake
15:04:07.078 TRACE lapin::io_loop: will write to buffer next_msg=AMQPFrame::Method(Channel(Open(Open)))
15:04:07.078 TRACE lapin::io_loop: wrote 13 bytes
15:04:07.078 TRACE lapin::io_loop: io_loop do_run done can_read=false can_write=true has_data=false status=Connected
15:04:07.078 TRACE lapin::io_loop: io_loop run
15:04:07.078 TRACE lapin::io_loop: io_loop do_run can_read=false can_write=true has_data=false
15:04:07.079 TRACE lapin::socket_state: Got event for socket event=Readable
15:04:07.079 TRACE lapin::io_loop: read 16 bytes
15:04:07.079 TRACE lapin::channels: will handle frame frame=Method(1, Channel(OpenOk(OpenOk)))
15:04:07.079 TRACE lapin::io_loop: io_loop do_run done can_read=false can_write=true has_data=false status=Connected
15:04:07.079 TRACE lapin::io_loop: io_loop run
15:04:07.079 TRACE lapin::io_loop: io_loop do_run can_read=false can_write=true has_data=false
15:04:07.079 TRACE test_service_error: lapin::channel: send_frame channel=1
15:04:07.079 TRACE test_service_error: lapin::frames: state is now waiting channel=1 expected_reply=ExpectedReply(QueueDeclareOk(Pinky, QueueDeclareOptions { passive: false, durable: false, exclusive: true, auto_delete: false, nowait: false }, FieldTable({})))
15:04:07.079 TRACE test_service_error: lapin::channel: wake channel=1
15:04:07.079 TRACE lapin::socket_state: Got event for socket event=Wake
15:04:07.079 TRACE lapin::io_loop: will write to buffer next_msg=AMQPFrame::Method(Queue(Declare(Declare { queue: ShortString("amq.rabbitmq.reply-to"), passive: false, durable: false, exclusive: true, auto_delete: false, nowait: false, arguments: FieldTable({}) })))
15:04:07.079 TRACE lapin::io_loop: wrote 41 bytes
15:04:07.079 TRACE lapin::io_loop: io_loop do_run done can_read=false can_write=true has_data=false status=Connected
15:04:07.079 TRACE lapin::io_loop: io_loop run
15:04:07.079 TRACE lapin::io_loop: io_loop do_run can_read=false can_write=true has_data=false
15:04:07.080 TRACE lapin::socket_state: Got event for socket event=Readable
15:04:07.080 TRACE lapin::io_loop: read 42 bytes
15:04:07.080 TRACE lapin::channels: will handle frame frame=Method(1, Queue(DeclareOk(DeclareOk { queue: ShortString("amq.rabbitmq.reply-to"), message_count: 0, consumer_count: 1 })))
15:04:07.080 TRACE lapin::io_loop: io_loop do_run done can_read=false can_write=true has_data=false status=Connected
15:04:07.080 TRACE lapin::io_loop: io_loop run
15:04:07.080 TRACE test_service_error: lapin::channel: send_frame channel=1
15:04:07.080 TRACE test_service_error: lapin::frames: state is now waiting channel=1 expected_reply=ExpectedReply(BasicConsumeOk(Pinky, Some(ChannelCloser { id: 1, status: ChannelStatus { state: Connected, receiver_state: ChannelReceiverStates([]), confirm: false, send_flow: true } }), ShortString("amq.rabbitmq.reply-to"), BasicConsumeOptions { no_local: false, no_ack: true, exclusive: false, nowait: false }, FieldTable({}), None))
15:04:07.080 TRACE test_service_error: lapin::channel: wake channel=1
15:04:07.080 TRACE lapin::io_loop: io_loop do_run can_read=false can_write=true has_data=false
15:04:07.080 TRACE lapin::socket_state: Got event for socket event=Wake
15:04:07.080 TRACE lapin::io_loop: will write to buffer next_msg=AMQPFrame::Method(Basic(Consume(Consume { queue: ShortString("amq.rabbitmq.reply-to"), consumer_tag: ShortString("ctag1.4f481ab9-177b-40da-9b4a-076ddddc7d8f"), no_local: false, no_ack: true, exclusive: false, nowait: false, arguments: FieldTable({}) })))
15:04:07.080 TRACE lapin::io_loop: wrote 84 bytes
15:04:07.080 TRACE lapin::io_loop: io_loop do_run done can_read=false can_write=true has_data=false status=Connected
15:04:07.080 TRACE lapin::io_loop: io_loop run
15:04:07.080 TRACE lapin::io_loop: io_loop do_run can_read=false can_write=true has_data=false
15:04:07.081 TRACE lapin::socket_state: Got event for socket event=Readable
15:04:07.081 TRACE lapin::io_loop: read 55 bytes
15:04:07.081 TRACE lapin::channels: will handle frame frame=Method(1, Basic(ConsumeOk(ConsumeOk { consumer_tag: ShortString("ctag1.4f481ab9-177b-40da-9b4a-076ddddc7d8f") })))
15:04:07.081 TRACE lapin::io_loop: io_loop do_run done can_read=false can_write=true has_data=false status=Connected
15:04:07.081 TRACE lapin::io_loop: io_loop run
15:04:07.081 TRACE lapin::io_loop: io_loop do_run can_read=false can_write=true has_data=false
15:04:07.081 TRACE lapin::consumer: consumer poll_next
15:04:07.081 TRACE lapin::consumer: consumer poll; acquired inner lock consumer_tag=ctag1.4f481ab9-177b-40da-9b4a-076ddddc7d8f
15:04:07.081 TRACE lapin::consumer: delivery; status=NotReady consumer_tag=ctag1.4f481ab9-177b-40da-9b4a-076ddddc7d8f
15:04:07.081 DEBUG test_service_error: amqp-rpc: Send request request=Request { context: Context { id: Some(1), app_id: AppId("pid-7817@runner-bbf8mlut-project-2833-concurrent-0"), priority: 0, correlation_id: "53fc3ca5-bd51-11ef-ab3e-0682ef4da55f", user_id: None, message_kind: Service { method: "Hello", service: "helloworld.HelloWorld" }, reply_to: Some("amq.rabbitmq.reply-to"), self_created_time: 1734534247, request_created_time: 1734534247 }, timeout: 30s, server_timeout: None }
15:04:07.081 TRACE test_service_error: lapin::channel: send_frames channel=1
15:04:07.081 TRACE test_service_error: lapin::channel: wake channel=1
15:04:07.081 TRACE lapin::socket_state: Got event for socket event=Wake
15:04:07.081 TRACE lapin::io_loop: will write to buffer next_msg=AMQPFrame::Method(Basic(Publish(Publish { exchange: ShortString("services"), routing_key: ShortString("helloworld.HelloWorld"), mandatory: false, immediate: false })))
15:04:07.081 TRACE lapin::io_loop: will write to buffer next_msg=AMQPFrame::Header
15:04:07.081 TRACE lapin::io_loop: wrote 251 bytes
15:04:07.081 TRACE lapin::io_loop: io_loop do_run done can_read=false can_write=true has_data=false status=Connected
15:04:07.081 TRACE lapin::io_loop: io_loop run
15:04:07.081 TRACE lapin::io_loop: io_loop do_run can_read=false can_write=true has_data=false
15:04:07.082 TRACE lapin::socket_state: Got event for socket event=Readable
15:04:07.082 TRACE lapin::io_loop: read 370 bytes
15:04:07.082 TRACE lapin::channels: will handle frame frame=Method(2, Basic(Deliver(Deliver { consumer_tag: ShortString("ctag2.3b2cd2ce-89c7-4447-a85c-5f9f8337c1ca"), delivery_tag: 1, redelivered: false, exchange: ShortString("services"), routing_key: ShortString("helloworld.HelloWorld") })))
15:04:07.082 TRACE lapin::channels: will handle frame frame=Header(2, 60, AMQPContentHeader { class_id: 60, body_size: 0, properties: AMQPProperties { content_type: Some(ShortString("application/x-protobuf")), content_encoding: None, headers: Some(FieldTable({})), delivery_mode: Some(2), priority: Some(0), correlation_id: Some(ShortString("53fc3ca5-bd51-11ef-ab3e-0682ef4da55f")), reply_to: Some(ShortString("amq.rabbitmq.reply-to.g1h2AA5yZXBseUA5NDE0MzI1MQAAA0oAAAAAZ2LkYQ==.0zD8MF07KX/Ay+v4YOZWxw==")), expiration: Some(ShortString("30000")), message_id: Some(ShortString("1")), timestamp: Some(1734534247), kind: Some(ShortString("helloworld.HelloWorld:Hello")), user_id: None, app_id: Some(ShortString("pid-7817@runner-bbf8mlut-project-2833-concurrent-0")), cluster_id: None } })
15:04:07.082 TRACE lapin::consumer: new_delivery consumer_tag=ctag2.3b2cd2ce-89c7-4447-a85c-5f9f8337c1ca
15:04:07.082 TRACE lapin::io_loop: io_loop do_run done can_read=false can_write=true has_data=false status=Connected
15:04:07.082 TRACE lapin::io_loop: io_loop run
15:04:07.082 TRACE lapin::io_loop: io_loop do_run can_read=false can_write=true has_data=false
15:04:07.082 TRACE lapin::consumer: consumer poll_next
15:04:07.082 TRACE lapin::consumer: consumer poll; acquired inner lock consumer_tag=ctag2.3b2cd2ce-89c7-4447-a85c-5f9f8337c1ca
15:04:07.083 TRACE lapin::consumer: delivery consumer_tag=ctag2.3b2cd2ce-89c7-4447-a85c-5f9f8337c1ca delivery_tag=1
15:04:07.083 TRACE lapin::consumer: consumer poll_next
15:04:07.083 TRACE lapin::consumer: consumer poll; acquired inner lock consumer_tag=ctag2.3b2cd2ce-89c7-4447-a85c-5f9f8337c1ca
15:04:07.083 TRACE lapin::consumer: delivery; status=NotReady consumer_tag=ctag2.3b2cd2ce-89c7-4447-a85c-5f9f8337c1ca
15:04:07.083  INFO request{correlation_id=53fc3ca5-bd51-11ef-ab3e-0682ef4da55f user_id=None method=Service { method: "Hello", service: "helloworld.HelloWorld" }}: amqp-rpc: New request request=Request { context: Context { id: Some(1), app_id: AppId("pid-7817@runner-bbf8mlut-project-2833-concurrent-0"), priority: 0, correlation_id: "53fc3ca5-bd51-11ef-ab3e-0682ef4da55f", user_id: None, message_kind: Service { method: "Hello", service: "helloworld.HelloWorld" }, reply_to: Some("amq.rabbitmq.reply-to.g1h2AA5yZXBseUA5NDE0MzI1MQAAA0oAAAAAZ2LkYQ==.0zD8MF07KX/Ay+v4YOZWxw=="), self_created_time: 1734534247, request_created_time: 1734534247 }, timeout: 300s }
15:04:07.083 DEBUG amqp-rpc: Send response response=AMQPResponse { routing_key: "amq.rabbitmq.reply-to.g1h2AA5yZXBseUA5NDE0MzI1MQAAA0oAAAAAZ2LkYQ==.0zD8MF07KX/Ay+v4YOZWxw==", properties: AMQPProperties { content_type: Some(ShortString("application/x-protobuf")), content_encoding: None, headers: Some(FieldTable({ShortString("request_received_time"): LongString(LongString([49, 55, 51, 52, 53, 51, 52, 50, 52, 55])), ShortString("response_sent_time"): LongString(LongString([49, 55, 51, 52, 53, 51, 52, 50, 52, 55]))})), delivery_mode: Some(2), priority: Some(0), correlation_id: Some(ShortString("53fc3ca5-bd51-11ef-ab3e-0682ef4da55f")), reply_to: None, expiration: None, message_id: Some(ShortString("1")), timestamp: Some(1734534247), kind: Some(ShortString("helloworld.HelloWorld:Hello")), user_id: None, app_id: Some(ShortString("pid-7817@runner-bbf8mlut-project-2833-concurrent-0")), cluster_id: None }, payload: 15 }
15:04:07.083 TRACE lapin::channel: send_frames channel=2
15:04:07.083 TRACE lapin::channel: wake channel=2
15:04:07.083 TRACE lapin::socket_state: Got event for socket event=Wake
15:04:07.083 TRACE lapin::io_loop: will write to buffer next_msg=AMQPFrame::Method(Basic(Publish(Publish { exchange: ShortString(""), routing_key: ShortString("amq.rabbitmq.reply-to.g1h2AA5yZXBseUA5NDE0MzI1MQAAA0oAAAAAZ2LkYQ==.0zD8MF07KX/Ay+v4YOZWxw=="), mandatory: false, immediate: false })))
15:04:07.083 TRACE lapin::io_loop: will write to buffer next_msg=AMQPFrame::Header
15:04:07.083 TRACE lapin::io_loop: will write to buffer next_msg=AMQPFrame::Body
15:04:07.083 TRACE lapin::io_loop: wrote 379 bytes
15:04:07.083 TRACE lapin::io_loop: io_loop do_run done can_read=false can_write=true has_data=false status=Connected
15:04:07.083 TRACE lapin::io_loop: io_loop run
15:04:07.083 TRACE lapin::io_loop: io_loop do_run can_read=false can_write=true has_data=false
15:04:07.083 TRACE lapin::publisher_confirm: PublisherConfirm dropped without use, registering it for wait_for_confirms
15:04:07.083 TRACE lapin::returned_messages: Dropped PublisherConfirm was ready but didn't carry a message, discarding
15:04:07.083 TRACE lapin::channel: send_frame channel=2
15:04:07.083 TRACE lapin::channel: wake channel=2
15:04:07.083 TRACE lapin::socket_state: Got event for socket event=Wake
15:04:07.083 TRACE lapin::io_loop: will write to buffer next_msg=AMQPFrame::Method(Basic(Ack(Ack { delivery_tag: 1, multiple: false })))
15:04:07.083 TRACE lapin::io_loop: wrote 21 bytes
15:04:07.083 TRACE lapin::io_loop: io_loop do_run done can_read=false can_write=true has_data=false status=Connected
15:04:07.083 TRACE lapin::io_loop: io_loop run
15:04:07.083 TRACE lapin::io_loop: io_loop do_run can_read=false can_write=true has_data=false
15:04:07.083 TRACE lapin::consumer: consumer poll_next
15:04:07.083 TRACE lapin::consumer: consumer poll; acquired inner lock consumer_tag=ctag2.3b2cd2ce-89c7-4447-a85c-5f9f8337c1ca
15:04:07.083 TRACE lapin::consumer: delivery; status=NotReady consumer_tag=ctag2.3b2cd2ce-89c7-4447-a85c-5f9f8337c1ca
15:04:17.044 TRACE lapin::socket_state: Got event for socket event=Readable
15:04:17.044 TRACE lapin::io_loop: read 8 bytes
15:04:17.044 TRACE lapin::channels: will handle frame frame=Heartbeat(0)
15:04:17.044 DEBUG lapin::channels: received heartbeat from server
15:04:17.044 TRACE lapin::io_loop: io_loop do_run done can_read=false can_write=true has_data=false status=Connected
15:04:17.044 TRACE lapin::io_loop: io_loop run
15:04:17.044 TRACE lapin::io_loop: io_loop do_run can_read=false can_write=true has_data=false
15:04:17.084 DEBUG lapin::channels: send heartbeat
15:04:17.084 TRACE lapin::channel: send_frame channel=0
15:04:17.084 TRACE lapin::channel: wake channel=0
15:04:17.084 TRACE lapin::socket_state: Got event for socket event=Wake
15:04:17.084 TRACE lapin::io_loop: will write to buffer next_msg=AMQPFrame::Heartbeat
15:04:17.084 TRACE lapin::io_loop: wrote 8 bytes
15:04:17.084 TRACE lapin::io_loop: io_loop do_run done can_read=false can_write=true has_data=false status=Connected
15:04:17.084 TRACE lapin::io_loop: io_loop run
15:04:17.084 TRACE lapin::io_loop: io_loop do_run can_read=false can_write=true has_data=false
15:04:27.045 TRACE lapin::socket_state: Got event for socket event=Readable
15:04:27.045 TRACE lapin::io_loop: read 8 bytes
15:04:27.045 TRACE lapin::channels: will handle frame frame=Heartbeat(0)
15:04:27.045 DEBUG lapin::channels: received heartbeat from server
15:04:27.045 TRACE lapin::io_loop: io_loop do_run done can_read=false can_write=true has_data=false status=Connected
15:04:27.045 TRACE lapin::io_loop: io_loop run
15:04:27.045 TRACE lapin::io_loop: io_loop do_run can_read=false can_write=true has_data=false
15:04:27.086 DEBUG lapin::channels: send heartbeat
15:04:27.086 TRACE lapin::channel: send_frame channel=0
15:04:27.086 TRACE lapin::channel: wake channel=0
15:04:27.086 TRACE lapin::socket_state: Got event for socket event=Wake
15:04:27.086 TRACE lapin::io_loop: will write to buffer next_msg=AMQPFrame::Heartbeat
15:04:27.086 TRACE lapin::io_loop: wrote 8 bytes
15:04:27.086 TRACE lapin::io_loop: io_loop do_run done can_read=false can_write=true has_data=false status=Connected
15:04:27.086 TRACE lapin::io_loop: io_loop run
15:04:27.086 TRACE lapin::io_loop: io_loop do_run can_read=false can_write=true has_data=false
15:04:37.046 TRACE lapin::socket_state: Got event for socket event=Readable
15:04:37.046 TRACE lapin::io_loop: read 8 bytes
15:04:37.046 TRACE lapin::channels: will handle frame frame=Heartbeat(0)
15:04:37.046 DEBUG lapin::channels: received heartbeat from server
15:04:37.046 TRACE lapin::io_loop: io_loop do_run done can_read=false can_write=true has_data=false status=Connected
15:04:37.046 TRACE lapin::io_loop: io_loop run
15:04:37.046 TRACE lapin::io_loop: io_loop do_run can_read=false can_write=true has_data=false
15:04:37.075 TRACE lapin::socket_state: Got event for socket event=Readable
15:04:37.075 TRACE lapin::io_loop: read 8 bytes
15:04:37.075 TRACE lapin::channels: will handle frame frame=Heartbeat(0)
15:04:37.075 DEBUG lapin::channels: received heartbeat from server
15:04:37.075 TRACE lapin::io_loop: io_loop do_run done can_read=false can_write=true has_data=false status=Connected
15:04:37.075 TRACE lapin::io_loop: io_loop run
15:04:37.075 TRACE lapin::io_loop: io_loop do_run can_read=false can_write=true has_data=false
15:04:37.082 DEBUG lapin::channels: send heartbeat
15:04:37.082 TRACE lapin::channel: send_frame channel=0
15:04:37.082 TRACE lapin::channel: wake channel=0
15:04:37.082 TRACE lapin::socket_state: Got event for socket event=Wake
15:04:37.082 TRACE lapin::io_loop: will write to buffer next_msg=AMQPFrame::Heartbeat
thread 'test_service_error' panicked at ptms-amqp-rpc/tests/test_helloworld/tests/test_service.rs:90:10:
called `Result::unwrap()` on an `Err` value: ClientTimeout
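
For anyone reading the trace above: lapin prints `LongString` values as raw byte lists, which hides things like the broker and client versions. Below is a minimal sketch of a helper for decoding them, assuming the byte list is copied out of a log line by hand. `decode_long_string` is a hypothetical name used only for illustration and is not part of lapin.

```rust
/// Hypothetical helper (not part of lapin): decode the comma-separated byte
/// list printed inside `LongString([...])` in a trace line.
/// Returns `None` if any item is not a valid `u8`.
fn decode_long_string(list: &str) -> Option<String> {
    let bytes = list
        .split(',')
        .map(|item| item.trim().parse::<u8>().ok())
        .collect::<Option<Vec<u8>>>()?;
    // Lossy conversion so that non-UTF-8 bytes do not make decoding fail.
    Some(String::from_utf8_lossy(&bytes).into_owned())
}
```

For example, `decode_long_string("51, 46, 49, 50, 46, 49, 52")` gives `Some("3.12.14")`, the `version` server property in the `connection::Start` frame, and `decode_long_string("50, 46, 53, 46, 48")` gives `Some("2.5.0")`, the `version` client property sent in `StartOk`.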
@ikrivosheev changed the title from "Lost AMQP from amq.rabbitmq.reply-to" to "Lost message from amq.rabbitmq.reply-to" on Dec 18, 2024
@ikrivosheev changed the title from "Lost message from amq.rabbitmq.reply-to" to "Sometimes lost message from amq.rabbitmq.reply-to" on Dec 18, 2024
@michaelklishin

I don't see any evidence that this publisher uses publisher confirms. Publishing without them is not safe by definition.

Direct Reply-to relies on channel state, that is, a single Erlang process on a single node.
Again, it does not offer much in terms of data safety.

A library such as Lapin cannot possibly address either of those problems.

If you want higher data safety guarantees, use relevant data safety features and a replicated queue type (one per request-reply client).
