We read every piece of feedback, and take your input very seriously.
To see all available qualifiers, see our documentation.
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
amq.rabbitmq.reply-to
Hello! Thank you for the library! I have very strange issue: sometimes i don't get message from queue. My flow:
server
client
How it works:
basic_publish
Here is logs:
15:03:57.774 DEBUG test_service_error: ptms_test_utils::container: Start starting container container=ContainerBuilder { name: "ptms_rabbitmq_e3f07976-5022-4218-adae-bef40aac84e9", image: "rabbitmq:3.12.14-management" } 15:04:07.014 WARN test_service_error: ptms_test_utils::container::rabbitmq: RabbitMQ container container=Container { name: "ptms_rabbitmq_e3f07976-5022-4218-adae-bef40aac84e9", image: "rabbitmq:3.12.14-management" } ip_addr=172.17.0.3 15:04:07.036 INFO test_service_error: amqp-rpc: Start amqp-rpc server app_id=AppId("pid-7817@runner-bbf8mlut-project-2833-concurrent-0") rabbitmq_uri="amqp://172.17.0.3/test_02eec5c7-06ef-476f-968c-5d0338388cdf" 15:04:07.036 DEBUG test_service_error: lapin::channels: create channel id=0 15:04:07.036 TRACE test_service_error: lapin::channel: send_frame channel=0 15:04:07.036 TRACE test_service_error: lapin::channel: wake channel=0 15:04:07.039 TRACE lapin::io_loop: io_loop run 15:04:07.039 TRACE lapin::socket_state: Got event for socket event=Wake 15:04:07.039 TRACE lapin::socket_state: Got event for socket event=Wake 15:04:07.039 TRACE lapin::io_loop: io_loop do_run can_read=true can_write=true has_data=true 15:04:07.039 TRACE lapin::io_loop: will write to buffer next_msg=AMQPFrame::ProtocolHeader(0.9.1) 15:04:07.039 TRACE lapin::io_loop: wrote 8 bytes 15:04:07.039 TRACE lapin::io_loop: io_loop do_run done can_read=false can_write=true has_data=false status=Initial 15:04:07.039 TRACE lapin::io_loop: io_loop run 15:04:07.039 TRACE lapin::io_loop: io_loop do_run can_read=false can_write=true has_data=false 15:04:07.042 TRACE lapin::socket_state: Got event for socket event=Readable 15:04:07.042 TRACE lapin::io_loop: read 523 bytes 15:04:07.042 TRACE lapin::channels: will handle frame frame=Method(0, Connection(Start(Start { version_major: 0, version_minor: 9, server_properties: FieldTable({ShortString("capabilities"): FieldTable(FieldTable({ShortString("authentication_failure_close"): Boolean(true), ShortString("basic.nack"): Boolean(true), ShortString("connection.blocked"): Boolean(true), ShortString("consumer_cancel_notify"): Boolean(true), ShortString("consumer_priorities"): Boolean(true), ShortString("direct_reply_to"): Boolean(true), ShortString("exchange_exchange_bindings"): Boolean(true), ShortString("per_consumer_qos"): Boolean(true), ShortString("publisher_confirms"): Boolean(true)})), ShortString("cluster_name"): LongString(LongString([114, 97, 98, 98, 105, 116, 64, 55, 53, 54, 51, 54, 51, 49, 54, 98, 98, 49, 99])), ShortString("copyright"): LongString(LongString([67, 111, 112, 121, 114, 105, 103, 104, 116, 32, 40, 99, 41, 32, 50, 48, 48, 55, 45, 50, 48, 50, 52, 32, 66, 114, 111, 97, 100, 99, 111, 109, 32, 73, 110, 99, 32, 97, 110, 100, 47, 111, 114, 32, 105, 116, 115, 32, 115, 117, 98, 115, 105, 100, 105, 97, 114, 105, 101, 115])), ShortString("information"): LongString(LongString([76, 105, 99, 101, 110, 115, 101, 100, 32, 117, 110, 100, 101, 114, 32, 116, 104, 101, 32, 77, 80, 76, 32, 50, 46, 48, 46, 32, 87, 101, 98, 115, 105, 116, 101, 58, 32, 104, 116, 116, 112, 115, 58, 47, 47, 114, 97, 98, 98, 105, 116, 109, 113, 46, 99, 111, 109])), ShortString("platform"): LongString(LongString([69, 114, 108, 97, 110, 103, 47, 79, 84, 80, 32, 50, 53, 46, 51, 46, 50, 46, 49, 50])), ShortString("product"): LongString(LongString([82, 97, 98, 98, 105, 116, 77, 81])), ShortString("version"): LongString(LongString([51, 46, 49, 50, 46, 49, 52]))}), mechanisms: LongString([65, 77, 81, 80, 76, 65, 73, 78, 32, 80, 76, 65, 73, 78]), locales: LongString([101, 110, 95, 85, 83]) }))) 15:04:07.042 TRACE lapin::channel: Server sent connection::Start method=Start { version_major: 0, version_minor: 9, server_properties: FieldTable({ShortString("capabilities"): FieldTable(FieldTable({ShortString("authentication_failure_close"): Boolean(true), ShortString("basic.nack"): Boolean(true), ShortString("connection.blocked"): Boolean(true), ShortString("consumer_cancel_notify"): Boolean(true), ShortString("consumer_priorities"): Boolean(true), ShortString("direct_reply_to"): Boolean(true), ShortString("exchange_exchange_bindings"): Boolean(true), ShortString("per_consumer_qos"): Boolean(true), ShortString("publisher_confirms"): Boolean(true)})), ShortString("cluster_name"): LongString(LongString([114, 97, 98, 98, 105, 116, 64, 55, 53, 54, 51, 54, 51, 49, 54, 98, 98, 49, 99])), ShortString("copyright"): LongString(LongString([67, 111, 112, 121, 114, 105, 103, 104, 116, 32, 40, 99, 41, 32, 50, 48, 48, 55, 45, 50, 48, 50, 52, 32, 66, 114, 111, 97, 100, 99, 111, 109, 32, 73, 110, 99, 32, 97, 110, 100, 47, 111, 114, 32, 105, 116, 115, 32, 115, 117, 98, 115, 105, 100, 105, 97, 114, 105, 101, 115])), ShortString("information"): LongString(LongString([76, 105, 99, 101, 110, 115, 101, 100, 32, 117, 110, 100, 101, 114, 32, 116, 104, 101, 32, 77, 80, 76, 32, 50, 46, 48, 46, 32, 87, 101, 98, 115, 105, 116, 101, 58, 32, 104, 116, 116, 112, 115, 58, 47, 47, 114, 97, 98, 98, 105, 116, 109, 113, 46, 99, 111, 109])), ShortString("platform"): LongString(LongString([69, 114, 108, 97, 110, 103, 47, 79, 84, 80, 32, 50, 53, 46, 51, 46, 50, 46, 49, 50])), ShortString("product"): LongString(LongString([82, 97, 98, 98, 105, 116, 77, 81])), ShortString("version"): LongString(LongString([51, 46, 49, 50, 46, 49, 52]))}), mechanisms: LongString([65, 77, 81, 80, 76, 65, 73, 78, 32, 80, 76, 65, 73, 78]), locales: LongString([101, 110, 95, 85, 83]) } 15:04:07.042 TRACE lapin::io_loop: io_loop do_run done can_read=false can_write=true has_data=false status=Initial 15:04:07.042 TRACE lapin::io_loop: io_loop run 15:04:07.042 TRACE lapin::io_loop: io_loop do_run can_read=false can_write=true has_data=false 15:04:07.042 TRACE lapin::channel: send_frame channel=0 15:04:07.042 TRACE lapin::channel: wake channel=0 15:04:07.042 TRACE lapin::socket_state: Got event for socket event=Wake 15:04:07.042 TRACE lapin::io_loop: will write to buffer next_msg=AMQPFrame::Method(Connection(StartOk(StartOk { client_properties: FieldTable({ShortString("capabilities"): FieldTable(FieldTable({ShortString("authentication_failure_close"): Boolean(true), ShortString("basic.nack"): Boolean(true), ShortString("connection.blocked"): Boolean(true), ShortString("consumer_cancel_notify"): Boolean(true), ShortString("consumer_priorities"): Boolean(true), ShortString("direct_reply_to"): Boolean(true), ShortString("exchange_exchange_bindings"): Boolean(true), ShortString("per_consumer_qos"): Boolean(true), ShortString("publisher_confirms"): Boolean(true)})), ShortString("platform"): LongString(LongString([114, 117, 115, 116])), ShortString("product"): LongString(LongString([108, 97, 112, 105, 110])), ShortString("version"): LongString(LongString([50, 46, 53, 46, 48]))}), mechanism: ShortString("PLAIN"), response: LongString([0, 103, 117, 101, 115, 116, 0, 103, 117, 101, 115, 116]), locale: ShortString("en_US") }))) 15:04:07.042 TRACE lapin::io_loop: wrote 315 bytes 15:04:07.042 TRACE lapin::io_loop: io_loop do_run done can_read=false can_write=true has_data=false status=Initial 15:04:07.042 TRACE lapin::io_loop: io_loop run 15:04:07.042 TRACE lapin::io_loop: io_loop do_run can_read=false can_write=true has_data=false 15:04:07.042 TRACE lapin::socket_state: Got event for socket event=Readable 15:04:07.043 TRACE lapin::io_loop: read 20 bytes 15:04:07.043 TRACE lapin::channels: will handle frame frame=Method(0, Connection(Tune(Tune { channel_max: 2047, frame_max: 131072, heartbeat: 60 }))) 15:04:07.043 TRACE lapin::channel: Server sent Connection::Tune method=Tune { channel_max: 2047, frame_max: 131072, heartbeat: 60 } 15:04:07.043 TRACE lapin::io_loop: io_loop do_run done can_read=false can_write=true has_data=false status=Initial 15:04:07.043 TRACE lapin::io_loop: io_loop run 15:04:07.043 TRACE lapin::channel: send_frame channel=0 15:04:07.043 TRACE lapin::io_loop: io_loop do_run can_read=false can_write=true has_data=false 15:04:07.043 TRACE lapin::channel: wake channel=0 15:04:07.043 TRACE lapin::socket_state: Got event for socket event=Wake 15:04:07.043 TRACE lapin::io_loop: will write to buffer next_msg=AMQPFrame::Method(Connection(TuneOk(TuneOk { channel_max: 2047, frame_max: 131072, heartbeat: 20 }))) 15:04:07.043 TRACE lapin::io_loop: wrote 20 bytes 15:04:07.043 TRACE lapin::io_loop: io_loop do_run done can_read=false can_write=true has_data=false status=Initial 15:04:07.043 TRACE lapin::io_loop: io_loop run 15:04:07.043 TRACE lapin::io_loop: io_loop do_run can_read=false can_write=true has_data=false 15:04:07.043 TRACE lapin::channel: send_frame channel=0 15:04:07.043 TRACE lapin::frames: state is now waiting channel=0 expected_reply=ExpectedReply(ConnectionOpenOk(Pinky, Connection { configuration: Configuration { channel_max: 2047, frame_max: 131072, heartbeat: 20 }, status: ConnectionStatus { state: Connecting, vhost: "test_02eec5c7-06ef-476f-968c-5d0338388cdf", username: "guest", blocked: false }, channels: Channels { channels: [Channel { id: 0, configuration: Configuration { channel_max: 2047, frame_max: 131072, heartbeat: 20 }, status: ChannelStatus { state: Connected, receiver_state: ChannelReceiverStates([]), confirm: false, send_flow: true }, connection_status: ConnectionStatus { state: Connecting, vhost: "test_02eec5c7-06ef-476f-968c-5d0338388cdf", username: "guest", blocked: false }, acknowledgements: Acknowledgements { delivery_tag: IdSequence { allow_zero: false, max: None, id: 0 }, returned_messages: ReturnedMessages { waiting_messages: [], messages: [], non_confirm_messages: [] }, pending: [] }, consumers: Consumers({}), basic_get_delivery: BasicGetDelivery, returned_messages: ReturnedMessages { waiting_messages: [], messages: [], non_confirm_messages: [] }, frames: Frames }], channel_id: IdSequence { allow_zero: false, max: None, id: 0 }, configuration: Configuration { channel_max: 2047, frame_max: 131072, heartbeat: 20 }, frames: Frames, connection_status: ConnectionStatus { state: Connecting, vhost: "test_02eec5c7-06ef-476f-968c-5d0338388cdf", username: "guest", blocked: false }, error_handler: ErrorHandler } })) 15:04:07.043 TRACE lapin::channel: wake channel=0 15:04:07.043 TRACE lapin::socket_state: Got event for socket event=Wake 15:04:07.043 TRACE lapin::io_loop: will write to buffer next_msg=AMQPFrame::Method(Connection(Open(Open { virtual_host: ShortString("test_02eec5c7-06ef-476f-968c-5d0338388cdf") }))) 15:04:07.043 TRACE lapin::io_loop: wrote 56 bytes 15:04:07.043 TRACE lapin::io_loop: io_loop do_run done can_read=false can_write=true has_data=false status=Initial 15:04:07.043 TRACE lapin::io_loop: io_loop run 15:04:07.043 TRACE lapin::io_loop: io_loop do_run can_read=false can_write=true has_data=false 15:04:07.043 TRACE lapin::socket_state: Got event for socket event=Readable 15:04:07.043 TRACE lapin::io_loop: read 13 bytes 15:04:07.043 TRACE lapin::channels: will handle frame frame=Method(0, Connection(OpenOk(OpenOk))) 15:04:07.044 TRACE lapin::io_loop: io_loop do_run done can_read=false can_write=true has_data=false status=Initial 15:04:07.044 TRACE lapin::io_loop: io_loop run 15:04:07.044 DEBUG test_service_error: lapin::channels: create channel 15:04:07.044 DEBUG test_service_error: lapin::channels: create channel id=1 15:04:07.044 TRACE test_service_error: lapin::channel: send_frame channel=1 15:04:07.044 TRACE test_service_error: lapin::frames: state is now waiting channel=1 expected_reply=ExpectedReply(ChannelOpenOk(Pinky, Channel { id: 1, configuration: Configuration { channel_max: 2047, frame_max: 131072, heartbeat: 20 }, status: ChannelStatus { state: Initial, receiver_state: ChannelReceiverStates([]), confirm: false, send_flow: true }, connection_status: ConnectionStatus { state: Connected, vhost: "test_02eec5c7-06ef-476f-968c-5d0338388cdf", username: "guest", blocked: false }, acknowledgements: Acknowledgements { delivery_tag: IdSequence { allow_zero: false, max: None, id: 0 }, returned_messages: ReturnedMessages { waiting_messages: [], messages: [], non_confirm_messages: [] }, pending: [] }, consumers: Consumers({}), basic_get_delivery: BasicGetDelivery, returned_messages: ReturnedMessages { waiting_messages: [], messages: [], non_confirm_messages: [] }, frames: Frames })) 15:04:07.044 TRACE test_service_error: lapin::channel: wake channel=1 15:04:07.045 TRACE lapin::io_loop: io_loop do_run can_read=false can_write=true has_data=true 15:04:07.045 TRACE lapin::socket_state: Got event for socket event=Wake 15:04:07.045 TRACE lapin::io_loop: will write to buffer next_msg=AMQPFrame::Method(Channel(Open(Open))) 15:04:07.045 TRACE lapin::io_loop: wrote 13 bytes 15:04:07.045 TRACE lapin::io_loop: io_loop do_run done can_read=false can_write=true has_data=false status=Connected 15:04:07.045 TRACE lapin::io_loop: io_loop run 15:04:07.045 TRACE lapin::io_loop: io_loop do_run can_read=false can_write=true has_data=false 15:04:07.046 TRACE lapin::socket_state: Got event for socket event=Readable 15:04:07.046 TRACE lapin::io_loop: read 16 bytes 15:04:07.046 TRACE lapin::channels: will handle frame frame=Method(1, Channel(OpenOk(OpenOk))) 15:04:07.046 TRACE lapin::io_loop: io_loop do_run done can_read=false can_write=true has_data=false status=Connected 15:04:07.046 TRACE lapin::io_loop: io_loop run 15:04:07.046 TRACE lapin::io_loop: io_loop do_run can_read=false can_write=true has_data=false 15:04:07.046 TRACE test_service_error: lapin::channel: send_frame channel=1 15:04:07.046 TRACE test_service_error: lapin::frames: state is now waiting channel=1 expected_reply=ExpectedReply(ExchangeDeclareOk(Pinky, ShortString("services"), Topic, ExchangeDeclareOptions { passive: false, durable: false, auto_delete: false, internal: false, nowait: false }, FieldTable({}))) 15:04:07.046 TRACE test_service_error: lapin::channel: wake channel=1 15:04:07.046 TRACE lapin::socket_state: Got event for socket event=Wake 15:04:07.046 TRACE lapin::io_loop: will write to buffer next_msg=AMQPFrame::Method(Exchange(Declare(Declare { exchange: ShortString("services"), kind: ShortString("topic"), passive: false, durable: false, auto_delete: false, internal: false, nowait: false, arguments: FieldTable({}) }))) 15:04:07.047 TRACE lapin::io_loop: wrote 34 bytes 15:04:07.047 TRACE lapin::io_loop: io_loop do_run done can_read=false can_write=true has_data=false status=Connected 15:04:07.047 TRACE lapin::io_loop: io_loop run 15:04:07.047 TRACE lapin::io_loop: io_loop do_run can_read=false can_write=true has_data=false 15:04:07.047 TRACE lapin::socket_state: Got event for socket event=Readable 15:04:07.047 TRACE lapin::io_loop: read 12 bytes 15:04:07.047 TRACE lapin::channels: will handle frame frame=Method(1, Exchange(DeclareOk(DeclareOk))) 15:04:07.047 TRACE lapin::io_loop: io_loop do_run done can_read=false can_write=true has_data=false status=Connected 15:04:07.047 TRACE lapin::io_loop: io_loop run 15:04:07.047 TRACE test_service_error: lapin::channel: send_frame channel=1 15:04:07.047 TRACE test_service_error: lapin::frames: state is now waiting channel=1 expected_reply=ExpectedReply(ChannelCloseOk(Pinky)) 15:04:07.047 TRACE test_service_error: lapin::channel: wake channel=1 15:04:07.047 TRACE lapin::io_loop: io_loop do_run can_read=false can_write=true has_data=false 15:04:07.047 TRACE lapin::socket_state: Got event for socket event=Wake 15:04:07.047 TRACE lapin::io_loop: will write to buffer next_msg=AMQPFrame::Method(Channel(Close(Close { reply_code: 0, reply_text: ShortString("Normal Shutdown"), class_id: 0, method_id: 0 }))) 15:04:07.047 TRACE lapin::io_loop: wrote 34 bytes 15:04:07.048 TRACE lapin::io_loop: io_loop do_run done can_read=false can_write=true has_data=false status=Connected 15:04:07.048 TRACE lapin::io_loop: io_loop run 15:04:07.048 TRACE lapin::io_loop: io_loop do_run can_read=false can_write=true has_data=false 15:04:07.048 TRACE lapin::socket_state: Got event for socket event=Readable 15:04:07.048 TRACE lapin::io_loop: read 12 bytes 15:04:07.048 TRACE lapin::channels: will handle frame frame=Method(1, Channel(CloseOk(CloseOk))) 15:04:07.048 TRACE lapin::internal_rpc: Queuing internal RPC command command=RemoveChannel(1, InvalidChannelState(Closed)) 15:04:07.048 TRACE lapin::io_loop: io_loop do_run done can_read=false can_write=true has_data=false status=Connected 15:04:07.048 TRACE lapin::io_loop: io_loop run 15:04:07.048 TRACE lapin::socket_state: Got event for socket event=Wake 15:04:07.048 TRACE lapin::io_loop: io_loop do_run can_read=false can_write=true has_data=false 15:04:07.048 TRACE lapin::internal_rpc: Handling internal RPC command command=RemoveChannel(1, InvalidChannelState(Closed)) 15:04:07.048 TRACE lapin::socket_state: Got event for socket event=Wake 15:04:07.048 TRACE lapin::io_loop: io_loop do_run done can_read=false can_write=true has_data=false status=Connected 15:04:07.048 TRACE lapin::io_loop: io_loop run 15:04:07.048 DEBUG test_service_error: amqp-rpc: Add service server=Application { app_id: AppId("pid-7817@runner-bbf8mlut-project-2833-concurrent-0"), services: [] } service="helloworld.HelloWorld" 15:04:07.048 TRACE lapin::io_loop: io_loop do_run can_read=false can_write=true has_data=false 15:04:07.048 DEBUG test_service_error: lapin::channels: create channel 15:04:07.048 DEBUG test_service_error: lapin::channels: create channel id=2 15:04:07.048 TRACE test_service_error: lapin::channel: send_frame channel=2 15:04:07.048 TRACE test_service_error: lapin::frames: state is now waiting channel=2 expected_reply=ExpectedReply(ChannelOpenOk(Pinky, Channel { id: 2, configuration: Configuration { channel_max: 2047, frame_max: 131072, heartbeat: 20 }, status: ChannelStatus { state: Initial, receiver_state: ChannelReceiverStates([]), confirm: false, send_flow: true }, connection_status: ConnectionStatus { state: Connected, vhost: "test_02eec5c7-06ef-476f-968c-5d0338388cdf", username: "guest", blocked: false }, acknowledgements: Acknowledgements { delivery_tag: IdSequence { allow_zero: false, max: None, id: 0 }, returned_messages: ReturnedMessages { waiting_messages: [], messages: [], non_confirm_messages: [] }, pending: [] }, consumers: Consumers({}), basic_get_delivery: BasicGetDelivery, returned_messages: ReturnedMessages { waiting_messages: [], messages: [], non_confirm_messages: [] }, frames: Frames })) 15:04:07.048 TRACE test_service_error: lapin::channel: wake channel=2 15:04:07.049 TRACE lapin::socket_state: Got event for socket event=Wake 15:04:07.049 TRACE lapin::io_loop: will write to buffer next_msg=AMQPFrame::Method(Channel(Open(Open))) 15:04:07.049 TRACE lapin::io_loop: wrote 13 bytes 15:04:07.049 TRACE lapin::io_loop: io_loop do_run done can_read=false can_write=true has_data=false status=Connected 15:04:07.049 TRACE lapin::io_loop: io_loop run 15:04:07.049 TRACE lapin::io_loop: io_loop do_run can_read=false can_write=true has_data=false 15:04:07.049 TRACE lapin::socket_state: Got event for socket event=Readable 15:04:07.050 TRACE lapin::io_loop: read 16 bytes 15:04:07.050 TRACE lapin::channels: will handle frame frame=Method(2, Channel(OpenOk(OpenOk))) 15:04:07.050 TRACE lapin::io_loop: io_loop do_run done can_read=false can_write=true has_data=false status=Connected 15:04:07.050 TRACE lapin::io_loop: io_loop run 15:04:07.050 TRACE lapin::io_loop: io_loop do_run can_read=false can_write=true has_data=false 15:04:07.050 TRACE test_service_error: lapin::channel: send_frame channel=2 15:04:07.050 TRACE test_service_error: lapin::frames: state is now waiting channel=2 expected_reply=ExpectedReply(BasicQosOk(Pinky)) 15:04:07.050 TRACE test_service_error: lapin::channel: wake channel=2 15:04:07.050 TRACE lapin::socket_state: Got event for socket event=Wake 15:04:07.050 TRACE lapin::io_loop: will write to buffer next_msg=AMQPFrame::Method(Basic(Qos(Qos { prefetch_count: 5, global: false }))) 15:04:07.050 TRACE lapin::io_loop: wrote 19 bytes 15:04:07.050 TRACE lapin::io_loop: io_loop do_run done can_read=false can_write=true has_data=false status=Connected 15:04:07.050 TRACE lapin::io_loop: io_loop run 15:04:07.050 TRACE lapin::io_loop: io_loop do_run can_read=false can_write=true has_data=false 15:04:07.050 TRACE lapin::socket_state: Got event for socket event=Readable 15:04:07.050 TRACE lapin::io_loop: read 12 bytes 15:04:07.050 TRACE lapin::channels: will handle frame frame=Method(2, Basic(QosOk(QosOk))) 15:04:07.050 TRACE lapin::io_loop: io_loop do_run done can_read=false can_write=true has_data=false status=Connected 15:04:07.050 TRACE lapin::io_loop: io_loop run 15:04:07.050 TRACE lapin::io_loop: io_loop do_run can_read=false can_write=true has_data=false 15:04:07.050 DEBUG test_service_error: amqp-rpc: Declare queue server=Application { app_id: AppId("pid-7817@runner-bbf8mlut-project-2833-concurrent-0"), services: [] } service="helloworld.HelloWorld" queue_name="services.helloworld.HelloWorld" 15:04:07.050 TRACE test_service_error: lapin::channel: send_frame channel=2 15:04:07.050 TRACE test_service_error: lapin::frames: state is now waiting channel=2 expected_reply=ExpectedReply(QueueDeclareOk(Pinky, QueueDeclareOptions { passive: false, durable: false, exclusive: false, auto_delete: false, nowait: false }, FieldTable({ShortString("x-max-priority"): LongInt(5)}))) 15:04:07.050 TRACE test_service_error: lapin::channel: wake channel=2 15:04:07.050 TRACE lapin::socket_state: Got event for socket event=Wake 15:04:07.050 TRACE lapin::io_loop: will write to buffer next_msg=AMQPFrame::Method(Queue(Declare(Declare { queue: ShortString("services.helloworld.HelloWorld"), passive: false, durable: false, exclusive: false, auto_delete: false, nowait: false, arguments: FieldTable({ShortString("x-max-priority"): LongInt(5)}) }))) 15:04:07.051 TRACE lapin::io_loop: wrote 70 bytes 15:04:07.051 TRACE lapin::io_loop: io_loop do_run done can_read=false can_write=true has_data=false status=Connected 15:04:07.051 TRACE lapin::io_loop: io_loop run 15:04:07.051 TRACE lapin::io_loop: io_loop do_run can_read=false can_write=true has_data=false 15:04:07.064 TRACE lapin::socket_state: Got event for socket event=Readable 15:04:07.064 TRACE lapin::io_loop: read 51 bytes 15:04:07.064 TRACE lapin::channels: will handle frame frame=Method(2, Queue(DeclareOk(DeclareOk { queue: ShortString("services.helloworld.HelloWorld"), message_count: 0, consumer_count: 0 }))) 15:04:07.065 TRACE lapin::io_loop: io_loop do_run done can_read=false can_write=true has_data=false status=Connected 15:04:07.065 TRACE lapin::io_loop: io_loop run 15:04:07.065 TRACE lapin::io_loop: io_loop do_run can_read=false can_write=true has_data=false 15:04:07.065 DEBUG test_service_error: amqp-rpc: Bind queue server=Application { app_id: AppId("pid-7817@runner-bbf8mlut-project-2833-concurrent-0"), services: [] } service="helloworld.HelloWorld" queue_name="services.helloworld.HelloWorld" 15:04:07.065 TRACE test_service_error: lapin::channel: send_frame channel=2 15:04:07.065 TRACE test_service_error: lapin::frames: state is now waiting channel=2 expected_reply=ExpectedReply(QueueBindOk(Pinky, ShortString("services.helloworld.HelloWorld"), ShortString("services"), ShortString("helloworld.HelloWorld"), FieldTable({}))) 15:04:07.065 TRACE test_service_error: lapin::channel: wake channel=2 15:04:07.065 TRACE lapin::socket_state: Got event for socket event=Wake 15:04:07.065 TRACE lapin::io_loop: will write to buffer next_msg=AMQPFrame::Method(Queue(Bind(Bind { queue: ShortString("services.helloworld.HelloWorld"), exchange: ShortString("services"), routing_key: ShortString("helloworld.HelloWorld"), nowait: false, arguments: FieldTable({}) }))) 15:04:07.065 TRACE lapin::io_loop: wrote 81 bytes 15:04:07.065 TRACE lapin::io_loop: io_loop do_run done can_read=false can_write=true has_data=false status=Connected 15:04:07.065 TRACE lapin::io_loop: io_loop run 15:04:07.065 TRACE lapin::io_loop: io_loop do_run can_read=false can_write=true has_data=false 15:04:07.067 TRACE lapin::socket_state: Got event for socket event=Readable 15:04:07.067 TRACE lapin::io_loop: read 12 bytes 15:04:07.067 TRACE lapin::channels: will handle frame frame=Method(2, Queue(BindOk(BindOk))) 15:04:07.067 TRACE lapin::io_loop: io_loop do_run done can_read=false can_write=true has_data=false status=Connected 15:04:07.067 TRACE lapin::io_loop: io_loop run 15:04:07.067 TRACE lapin::io_loop: io_loop do_run can_read=false can_write=true has_data=false 15:04:07.067 DEBUG test_service_error: amqp-rpc: Consume queue server=Application { app_id: AppId("pid-7817@runner-bbf8mlut-project-2833-concurrent-0"), services: [] } service="helloworld.HelloWorld" consumer_tag="ctag2.3b2cd2ce-89c7-4447-a85c-5f9f8337c1ca" 15:04:07.067 TRACE test_service_error: lapin::channel: send_frame channel=2 15:04:07.067 TRACE test_service_error: lapin::frames: state is now waiting channel=2 expected_reply=ExpectedReply(BasicConsumeOk(Pinky, Some(ChannelCloser { id: 2, status: ChannelStatus { state: Connected, receiver_state: ChannelReceiverStates([]), confirm: false, send_flow: true } }), ShortString("services.helloworld.HelloWorld"), BasicConsumeOptions { no_local: false, no_ack: false, exclusive: false, nowait: false }, FieldTable({}), None)) 15:04:07.067 TRACE test_service_error: lapin::channel: wake channel=2 15:04:07.067 TRACE lapin::socket_state: Got event for socket event=Wake 15:04:07.067 TRACE lapin::io_loop: will write to buffer next_msg=AMQPFrame::Method(Basic(Consume(Consume { queue: ShortString("services.helloworld.HelloWorld"), consumer_tag: ShortString("ctag2.3b2cd2ce-89c7-4447-a85c-5f9f8337c1ca"), no_local: false, no_ack: false, exclusive: false, nowait: false, arguments: FieldTable({}) }))) 15:04:07.067 TRACE lapin::io_loop: wrote 93 bytes 15:04:07.067 TRACE lapin::io_loop: io_loop do_run done can_read=false can_write=true has_data=false status=Connected 15:04:07.067 TRACE lapin::io_loop: io_loop run 15:04:07.067 TRACE lapin::io_loop: io_loop do_run can_read=false can_write=true has_data=false 15:04:07.068 TRACE lapin::socket_state: Got event for socket event=Readable 15:04:07.068 TRACE lapin::io_loop: read 55 bytes 15:04:07.068 TRACE lapin::channels: will handle frame frame=Method(2, Basic(ConsumeOk(ConsumeOk { consumer_tag: ShortString("ctag2.3b2cd2ce-89c7-4447-a85c-5f9f8337c1ca") }))) 15:04:07.068 TRACE lapin::io_loop: io_loop do_run done can_read=false can_write=true has_data=false status=Connected 15:04:07.068 TRACE lapin::io_loop: io_loop run 15:04:07.068 TRACE lapin::io_loop: io_loop do_run can_read=false can_write=true has_data=false 15:04:07.068 TRACE lapin::consumer: consumer poll_next 15:04:07.068 TRACE lapin::consumer: consumer poll; acquired inner lock consumer_tag=ctag2.3b2cd2ce-89c7-4447-a85c-5f9f8337c1ca 15:04:07.068 TRACE lapin::consumer: delivery; status=NotReady consumer_tag=ctag2.3b2cd2ce-89c7-4447-a85c-5f9f8337c1ca 15:04:07.068 INFO test_service_error: amqp-rpc: Start amqp-rpc client factory app_id=AppId("pid-7817@runner-bbf8mlut-project-2833-concurrent-0") rabbitmq_uri="amqp://172.17.0.3/test_02eec5c7-06ef-476f-968c-5d0338388cdf" 15:04:07.068 DEBUG test_service_error: lapin::channels: create channel id=0 15:04:07.068 TRACE test_service_error: lapin::channel: send_frame channel=0 15:04:07.068 TRACE test_service_error: lapin::channel: wake channel=0 15:04:07.070 TRACE lapin::io_loop: io_loop run 15:04:07.070 TRACE lapin::socket_state: Got event for socket event=Wake 15:04:07.070 TRACE lapin::socket_state: Got event for socket event=Wake 15:04:07.070 TRACE lapin::io_loop: io_loop do_run can_read=true can_write=true has_data=true 15:04:07.070 TRACE lapin::io_loop: will write to buffer next_msg=AMQPFrame::ProtocolHeader(0.9.1) 15:04:07.070 TRACE lapin::io_loop: wrote 8 bytes 15:04:07.070 TRACE lapin::io_loop: io_loop do_run done can_read=false can_write=true has_data=false status=Initial 15:04:07.070 TRACE lapin::io_loop: io_loop run 15:04:07.070 TRACE lapin::io_loop: io_loop do_run can_read=false can_write=true has_data=false 15:04:07.073 TRACE lapin::socket_state: Got event for socket event=Readable 15:04:07.073 TRACE lapin::io_loop: read 523 bytes 15:04:07.073 TRACE lapin::channels: will handle frame frame=Method(0, Connection(Start(Start { version_major: 0, version_minor: 9, server_properties: FieldTable({ShortString("capabilities"): FieldTable(FieldTable({ShortString("authentication_failure_close"): Boolean(true), ShortString("basic.nack"): Boolean(true), ShortString("connection.blocked"): Boolean(true), ShortString("consumer_cancel_notify"): Boolean(true), ShortString("consumer_priorities"): Boolean(true), ShortString("direct_reply_to"): Boolean(true), ShortString("exchange_exchange_bindings"): Boolean(true), ShortString("per_consumer_qos"): Boolean(true), ShortString("publisher_confirms"): Boolean(true)})), ShortString("cluster_name"): LongString(LongString([114, 97, 98, 98, 105, 116, 64, 55, 53, 54, 51, 54, 51, 49, 54, 98, 98, 49, 99])), ShortString("copyright"): LongString(LongString([67, 111, 112, 121, 114, 105, 103, 104, 116, 32, 40, 99, 41, 32, 50, 48, 48, 55, 45, 50, 48, 50, 52, 32, 66, 114, 111, 97, 100, 99, 111, 109, 32, 73, 110, 99, 32, 97, 110, 100, 47, 111, 114, 32, 105, 116, 115, 32, 115, 117, 98, 115, 105, 100, 105, 97, 114, 105, 101, 115])), ShortString("information"): LongString(LongString([76, 105, 99, 101, 110, 115, 101, 100, 32, 117, 110, 100, 101, 114, 32, 116, 104, 101, 32, 77, 80, 76, 32, 50, 46, 48, 46, 32, 87, 101, 98, 115, 105, 116, 101, 58, 32, 104, 116, 116, 112, 115, 58, 47, 47, 114, 97, 98, 98, 105, 116, 109, 113, 46, 99, 111, 109])), ShortString("platform"): LongString(LongString([69, 114, 108, 97, 110, 103, 47, 79, 84, 80, 32, 50, 53, 46, 51, 46, 50, 46, 49, 50])), ShortString("product"): LongString(LongString([82, 97, 98, 98, 105, 116, 77, 81])), ShortString("version"): LongString(LongString([51, 46, 49, 50, 46, 49, 52]))}), mechanisms: LongString([65, 77, 81, 80, 76, 65, 73, 78, 32, 80, 76, 65, 73, 78]), locales: LongString([101, 110, 95, 85, 83]) }))) 15:04:07.073 TRACE lapin::channel: Server sent connection::Start method=Start { version_major: 0, version_minor: 9, server_properties: FieldTable({ShortString("capabilities"): FieldTable(FieldTable({ShortString("authentication_failure_close"): Boolean(true), ShortString("basic.nack"): Boolean(true), ShortString("connection.blocked"): Boolean(true), ShortString("consumer_cancel_notify"): Boolean(true), ShortString("consumer_priorities"): Boolean(true), ShortString("direct_reply_to"): Boolean(true), ShortString("exchange_exchange_bindings"): Boolean(true), ShortString("per_consumer_qos"): Boolean(true), ShortString("publisher_confirms"): Boolean(true)})), ShortString("cluster_name"): LongString(LongString([114, 97, 98, 98, 105, 116, 64, 55, 53, 54, 51, 54, 51, 49, 54, 98, 98, 49, 99])), ShortString("copyright"): LongString(LongString([67, 111, 112, 121, 114, 105, 103, 104, 116, 32, 40, 99, 41, 32, 50, 48, 48, 55, 45, 50, 48, 50, 52, 32, 66, 114, 111, 97, 100, 99, 111, 109, 32, 73, 110, 99, 32, 97, 110, 100, 47, 111, 114, 32, 105, 116, 115, 32, 115, 117, 98, 115, 105, 100, 105, 97, 114, 105, 101, 115])), ShortString("information"): LongString(LongString([76, 105, 99, 101, 110, 115, 101, 100, 32, 117, 110, 100, 101, 114, 32, 116, 104, 101, 32, 77, 80, 76, 32, 50, 46, 48, 46, 32, 87, 101, 98, 115, 105, 116, 101, 58, 32, 104, 116, 116, 112, 115, 58, 47, 47, 114, 97, 98, 98, 105, 116, 109, 113, 46, 99, 111, 109])), ShortString("platform"): LongString(LongString([69, 114, 108, 97, 110, 103, 47, 79, 84, 80, 32, 50, 53, 46, 51, 46, 50, 46, 49, 50])), ShortString("product"): LongString(LongString([82, 97, 98, 98, 105, 116, 77, 81])), ShortString("version"): LongString(LongString([51, 46, 49, 50, 46, 49, 52]))}), mechanisms: LongString([65, 77, 81, 80, 76, 65, 73, 78, 32, 80, 76, 65, 73, 78]), locales: LongString([101, 110, 95, 85, 83]) } 15:04:07.073 TRACE lapin::io_loop: io_loop do_run done can_read=false can_write=true has_data=false status=Initial 15:04:07.073 TRACE lapin::io_loop: io_loop run 15:04:07.073 TRACE lapin::io_loop: io_loop do_run can_read=false can_write=true has_data=false 15:04:07.073 TRACE lapin::channel: send_frame channel=0 15:04:07.073 TRACE lapin::channel: wake channel=0 15:04:07.073 TRACE lapin::socket_state: Got event for socket event=Wake 15:04:07.073 TRACE lapin::io_loop: will write to buffer next_msg=AMQPFrame::Method(Connection(StartOk(StartOk { client_properties: FieldTable({ShortString("capabilities"): FieldTable(FieldTable({ShortString("authentication_failure_close"): Boolean(true), ShortString("basic.nack"): Boolean(true), ShortString("connection.blocked"): Boolean(true), ShortString("consumer_cancel_notify"): Boolean(true), ShortString("consumer_priorities"): Boolean(true), ShortString("direct_reply_to"): Boolean(true), ShortString("exchange_exchange_bindings"): Boolean(true), ShortString("per_consumer_qos"): Boolean(true), ShortString("publisher_confirms"): Boolean(true)})), ShortString("platform"): LongString(LongString([114, 117, 115, 116])), ShortString("product"): LongString(LongString([108, 97, 112, 105, 110])), ShortString("version"): LongString(LongString([50, 46, 53, 46, 48]))}), mechanism: ShortString("PLAIN"), response: LongString([0, 103, 117, 101, 115, 116, 0, 103, 117, 101, 115, 116]), locale: ShortString("en_US") }))) 15:04:07.073 TRACE lapin::io_loop: wrote 315 bytes 15:04:07.073 TRACE lapin::io_loop: io_loop do_run done can_read=false can_write=true has_data=false status=Initial 15:04:07.073 TRACE lapin::io_loop: io_loop run 15:04:07.073 TRACE lapin::io_loop: io_loop do_run can_read=false can_write=true has_data=false 15:04:07.073 TRACE lapin::socket_state: Got event for socket event=Readable 15:04:07.073 TRACE lapin::io_loop: read 20 bytes 15:04:07.074 TRACE lapin::channels: will handle frame frame=Method(0, Connection(Tune(Tune { channel_max: 2047, frame_max: 131072, heartbeat: 60 }))) 15:04:07.074 TRACE lapin::channel: Server sent Connection::Tune method=Tune { channel_max: 2047, frame_max: 131072, heartbeat: 60 } 15:04:07.074 TRACE lapin::io_loop: io_loop do_run done can_read=false can_write=true has_data=false status=Initial 15:04:07.074 TRACE lapin::io_loop: io_loop run 15:04:07.074 TRACE lapin::io_loop: io_loop do_run can_read=false can_write=true has_data=false 15:04:07.074 TRACE lapin::channel: send_frame channel=0 15:04:07.074 TRACE lapin::channel: wake channel=0 15:04:07.074 TRACE lapin::socket_state: Got event for socket event=Wake 15:04:07.074 TRACE lapin::io_loop: will write to buffer next_msg=AMQPFrame::Method(Connection(TuneOk(TuneOk { channel_max: 2047, frame_max: 131072, heartbeat: 60 }))) 15:04:07.074 TRACE lapin::io_loop: wrote 20 bytes 15:04:07.074 TRACE lapin::io_loop: io_loop do_run done can_read=false can_write=true has_data=false status=Initial 15:04:07.074 TRACE lapin::io_loop: io_loop run 15:04:07.074 TRACE lapin::io_loop: io_loop do_run can_read=false can_write=true has_data=false 15:04:07.074 TRACE lapin::channel: send_frame channel=0 15:04:07.074 TRACE lapin::frames: state is now waiting channel=0 expected_reply=ExpectedReply(ConnectionOpenOk(Pinky, Connection { configuration: Configuration { channel_max: 2047, frame_max: 131072, heartbeat: 60 }, status: ConnectionStatus { state: Connecting, vhost: "test_02eec5c7-06ef-476f-968c-5d0338388cdf", username: "guest", blocked: false }, channels: Channels { channels: [Channel { id: 0, configuration: Configuration { channel_max: 2047, frame_max: 131072, heartbeat: 60 }, status: ChannelStatus { state: Connected, receiver_state: ChannelReceiverStates([]), confirm: false, send_flow: true }, connection_status: ConnectionStatus { state: Connecting, vhost: "test_02eec5c7-06ef-476f-968c-5d0338388cdf", username: "guest", blocked: false }, acknowledgements: Acknowledgements { delivery_tag: IdSequence { allow_zero: false, max: None, id: 0 }, returned_messages: ReturnedMessages { waiting_messages: [], messages: [], non_confirm_messages: [] }, pending: [] }, consumers: Consumers({}), basic_get_delivery: BasicGetDelivery, returned_messages: ReturnedMessages { waiting_messages: [], messages: [], non_confirm_messages: [] }, frames: Frames }], channel_id: IdSequence { allow_zero: false, max: None, id: 0 }, configuration: Configuration { channel_max: 2047, frame_max: 131072, heartbeat: 60 }, frames: Frames, connection_status: ConnectionStatus { state: Connecting, vhost: "test_02eec5c7-06ef-476f-968c-5d0338388cdf", username: "guest", blocked: false }, error_handler: ErrorHandler } })) 15:04:07.074 TRACE lapin::channel: wake channel=0 15:04:07.074 TRACE lapin::socket_state: Got event for socket event=Wake 15:04:07.074 TRACE lapin::io_loop: will write to buffer next_msg=AMQPFrame::Method(Connection(Open(Open { virtual_host: ShortString("test_02eec5c7-06ef-476f-968c-5d0338388cdf") }))) 15:04:07.074 TRACE lapin::io_loop: wrote 56 bytes 15:04:07.074 TRACE lapin::io_loop: io_loop do_run done can_read=false can_write=true has_data=false status=Initial 15:04:07.074 TRACE lapin::io_loop: io_loop run 15:04:07.074 TRACE lapin::io_loop: io_loop do_run can_read=false can_write=true has_data=false 15:04:07.075 TRACE lapin::socket_state: Got event for socket event=Readable 15:04:07.075 TRACE lapin::io_loop: read 13 bytes 15:04:07.075 TRACE lapin::channels: will handle frame frame=Method(0, Connection(OpenOk(OpenOk))) 15:04:07.075 TRACE lapin::io_loop: io_loop do_run done can_read=false can_write=true has_data=false status=Initial 15:04:07.075 TRACE lapin::io_loop: io_loop run 15:04:07.075 DEBUG test_service_error: lapin::channels: create channel 15:04:07.075 DEBUG test_service_error: lapin::channels: create channel id=1 15:04:07.075 TRACE test_service_error: lapin::channel: send_frame channel=1 15:04:07.075 TRACE test_service_error: lapin::frames: state is now waiting channel=1 expected_reply=ExpectedReply(ChannelOpenOk(Pinky, Channel { id: 1, configuration: Configuration { channel_max: 2047, frame_max: 131072, heartbeat: 60 }, status: ChannelStatus { state: Initial, receiver_state: ChannelReceiverStates([]), confirm: false, send_flow: true }, connection_status: ConnectionStatus { state: Connected, vhost: "test_02eec5c7-06ef-476f-968c-5d0338388cdf", username: "guest", blocked: false }, acknowledgements: Acknowledgements { delivery_tag: IdSequence { allow_zero: false, max: None, id: 0 }, returned_messages: ReturnedMessages { waiting_messages: [], messages: [], non_confirm_messages: [] }, pending: [] }, consumers: Consumers({}), basic_get_delivery: BasicGetDelivery, returned_messages: ReturnedMessages { waiting_messages: [], messages: [], non_confirm_messages: [] }, frames: Frames })) 15:04:07.075 TRACE test_service_error: lapin::channel: wake channel=1 15:04:07.078 TRACE lapin::io_loop: io_loop do_run can_read=false can_write=true has_data=true 15:04:07.078 TRACE lapin::socket_state: Got event for socket event=Wake 15:04:07.078 TRACE lapin::io_loop: will write to buffer next_msg=AMQPFrame::Method(Channel(Open(Open))) 15:04:07.078 TRACE lapin::io_loop: wrote 13 bytes 15:04:07.078 TRACE lapin::io_loop: io_loop do_run done can_read=false can_write=true has_data=false status=Connected 15:04:07.078 TRACE lapin::io_loop: io_loop run 15:04:07.078 TRACE lapin::io_loop: io_loop do_run can_read=false can_write=true has_data=false 15:04:07.079 TRACE lapin::socket_state: Got event for socket event=Readable 15:04:07.079 TRACE lapin::io_loop: read 16 bytes 15:04:07.079 TRACE lapin::channels: will handle frame frame=Method(1, Channel(OpenOk(OpenOk))) 15:04:07.079 TRACE lapin::io_loop: io_loop do_run done can_read=false can_write=true has_data=false status=Connected 15:04:07.079 TRACE lapin::io_loop: io_loop run 15:04:07.079 TRACE lapin::io_loop: io_loop do_run can_read=false can_write=true has_data=false 15:04:07.079 TRACE test_service_error: lapin::channel: send_frame channel=1 15:04:07.079 TRACE test_service_error: lapin::frames: state is now waiting channel=1 expected_reply=ExpectedReply(QueueDeclareOk(Pinky, QueueDeclareOptions { passive: false, durable: false, exclusive: true, auto_delete: false, nowait: false }, FieldTable({}))) 15:04:07.079 TRACE test_service_error: lapin::channel: wake channel=1 15:04:07.079 TRACE lapin::socket_state: Got event for socket event=Wake 15:04:07.079 TRACE lapin::io_loop: will write to buffer next_msg=AMQPFrame::Method(Queue(Declare(Declare { queue: ShortString("amq.rabbitmq.reply-to"), passive: false, durable: false, exclusive: true, auto_delete: false, nowait: false, arguments: FieldTable({}) }))) 15:04:07.079 TRACE lapin::io_loop: wrote 41 bytes 15:04:07.079 TRACE lapin::io_loop: io_loop do_run done can_read=false can_write=true has_data=false status=Connected 15:04:07.079 TRACE lapin::io_loop: io_loop run 15:04:07.079 TRACE lapin::io_loop: io_loop do_run can_read=false can_write=true has_data=false 15:04:07.080 TRACE lapin::socket_state: Got event for socket event=Readable 15:04:07.080 TRACE lapin::io_loop: read 42 bytes 15:04:07.080 TRACE lapin::channels: will handle frame frame=Method(1, Queue(DeclareOk(DeclareOk { queue: ShortString("amq.rabbitmq.reply-to"), message_count: 0, consumer_count: 1 }))) 15:04:07.080 TRACE lapin::io_loop: io_loop do_run done can_read=false can_write=true has_data=false status=Connected 15:04:07.080 TRACE lapin::io_loop: io_loop run 15:04:07.080 TRACE test_service_error: lapin::channel: send_frame channel=1 15:04:07.080 TRACE test_service_error: lapin::frames: state is now waiting channel=1 expected_reply=ExpectedReply(BasicConsumeOk(Pinky, Some(ChannelCloser { id: 1, status: ChannelStatus { state: Connected, receiver_state: ChannelReceiverStates([]), confirm: false, send_flow: true } }), ShortString("amq.rabbitmq.reply-to"), BasicConsumeOptions { no_local: false, no_ack: true, exclusive: false, nowait: false }, FieldTable({}), None)) 15:04:07.080 TRACE test_service_error: lapin::channel: wake channel=1 15:04:07.080 TRACE lapin::io_loop: io_loop do_run can_read=false can_write=true has_data=false 15:04:07.080 TRACE lapin::socket_state: Got event for socket event=Wake 15:04:07.080 TRACE lapin::io_loop: will write to buffer next_msg=AMQPFrame::Method(Basic(Consume(Consume { queue: ShortString("amq.rabbitmq.reply-to"), consumer_tag: ShortString("ctag1.4f481ab9-177b-40da-9b4a-076ddddc7d8f"), no_local: false, no_ack: true, exclusive: false, nowait: false, arguments: FieldTable({}) }))) 15:04:07.080 TRACE lapin::io_loop: wrote 84 bytes 15:04:07.080 TRACE lapin::io_loop: io_loop do_run done can_read=false can_write=true has_data=false status=Connected 15:04:07.080 TRACE lapin::io_loop: io_loop run 15:04:07.080 TRACE lapin::io_loop: io_loop do_run can_read=false can_write=true has_data=false 15:04:07.081 TRACE lapin::socket_state: Got event for socket event=Readable 15:04:07.081 TRACE lapin::io_loop: read 55 bytes 15:04:07.081 TRACE lapin::channels: will handle frame frame=Method(1, Basic(ConsumeOk(ConsumeOk { consumer_tag: ShortString("ctag1.4f481ab9-177b-40da-9b4a-076ddddc7d8f") }))) 15:04:07.081 TRACE lapin::io_loop: io_loop do_run done can_read=false can_write=true has_data=false status=Connected 15:04:07.081 TRACE lapin::io_loop: io_loop run 15:04:07.081 TRACE lapin::io_loop: io_loop do_run can_read=false can_write=true has_data=false 15:04:07.081 TRACE lapin::consumer: consumer poll_next 15:04:07.081 TRACE lapin::consumer: consumer poll; acquired inner lock consumer_tag=ctag1.4f481ab9-177b-40da-9b4a-076ddddc7d8f 15:04:07.081 TRACE lapin::consumer: delivery; status=NotReady consumer_tag=ctag1.4f481ab9-177b-40da-9b4a-076ddddc7d8f 15:04:07.081 DEBUG test_service_error: amqp-rpc: Send request request=Request { context: Context { id: Some(1), app_id: AppId("pid-7817@runner-bbf8mlut-project-2833-concurrent-0"), priority: 0, correlation_id: "53fc3ca5-bd51-11ef-ab3e-0682ef4da55f", user_id: None, message_kind: Service { method: "Hello", service: "helloworld.HelloWorld" }, reply_to: Some("amq.rabbitmq.reply-to"), self_created_time: 1734534247, request_created_time: 1734534247 }, timeout: 30s, server_timeout: None } 15:04:07.081 TRACE test_service_error: lapin::channel: send_frames channel=1 15:04:07.081 TRACE test_service_error: lapin::channel: wake channel=1 15:04:07.081 TRACE lapin::socket_state: Got event for socket event=Wake 15:04:07.081 TRACE lapin::io_loop: will write to buffer next_msg=AMQPFrame::Method(Basic(Publish(Publish { exchange: ShortString("services"), routing_key: ShortString("helloworld.HelloWorld"), mandatory: false, immediate: false }))) 15:04:07.081 TRACE lapin::io_loop: will write to buffer next_msg=AMQPFrame::Header 15:04:07.081 TRACE lapin::io_loop: wrote 251 bytes 15:04:07.081 TRACE lapin::io_loop: io_loop do_run done can_read=false can_write=true has_data=false status=Connected 15:04:07.081 TRACE lapin::io_loop: io_loop run 15:04:07.081 TRACE lapin::io_loop: io_loop do_run can_read=false can_write=true has_data=false 15:04:07.082 TRACE lapin::socket_state: Got event for socket event=Readable 15:04:07.082 TRACE lapin::io_loop: read 370 bytes 15:04:07.082 TRACE lapin::channels: will handle frame frame=Method(2, Basic(Deliver(Deliver { consumer_tag: ShortString("ctag2.3b2cd2ce-89c7-4447-a85c-5f9f8337c1ca"), delivery_tag: 1, redelivered: false, exchange: ShortString("services"), routing_key: ShortString("helloworld.HelloWorld") }))) 15:04:07.082 TRACE lapin::channels: will handle frame frame=Header(2, 60, AMQPContentHeader { class_id: 60, body_size: 0, properties: AMQPProperties { content_type: Some(ShortString("application/x-protobuf")), content_encoding: None, headers: Some(FieldTable({})), delivery_mode: Some(2), priority: Some(0), correlation_id: Some(ShortString("53fc3ca5-bd51-11ef-ab3e-0682ef4da55f")), reply_to: Some(ShortString("amq.rabbitmq.reply-to.g1h2AA5yZXBseUA5NDE0MzI1MQAAA0oAAAAAZ2LkYQ==.0zD8MF07KX/Ay+v4YOZWxw==")), expiration: Some(ShortString("30000")), message_id: Some(ShortString("1")), timestamp: Some(1734534247), kind: Some(ShortString("helloworld.HelloWorld:Hello")), user_id: None, app_id: Some(ShortString("pid-7817@runner-bbf8mlut-project-2833-concurrent-0")), cluster_id: None } }) 15:04:07.082 TRACE lapin::consumer: new_delivery consumer_tag=ctag2.3b2cd2ce-89c7-4447-a85c-5f9f8337c1ca 15:04:07.082 TRACE lapin::io_loop: io_loop do_run done can_read=false can_write=true has_data=false status=Connected 15:04:07.082 TRACE lapin::io_loop: io_loop run 15:04:07.082 TRACE lapin::io_loop: io_loop do_run can_read=false can_write=true has_data=false 15:04:07.082 TRACE lapin::consumer: consumer poll_next 15:04:07.082 TRACE lapin::consumer: consumer poll; acquired inner lock consumer_tag=ctag2.3b2cd2ce-89c7-4447-a85c-5f9f8337c1ca 15:04:07.083 TRACE lapin::consumer: delivery consumer_tag=ctag2.3b2cd2ce-89c7-4447-a85c-5f9f8337c1ca delivery_tag=1 15:04:07.083 TRACE lapin::consumer: consumer poll_next 15:04:07.083 TRACE lapin::consumer: consumer poll; acquired inner lock consumer_tag=ctag2.3b2cd2ce-89c7-4447-a85c-5f9f8337c1ca 15:04:07.083 TRACE lapin::consumer: delivery; status=NotReady consumer_tag=ctag2.3b2cd2ce-89c7-4447-a85c-5f9f8337c1ca 15:04:07.083 INFO request{correlation_id=53fc3ca5-bd51-11ef-ab3e-0682ef4da55f user_id=None method=Service { method: "Hello", service: "helloworld.HelloWorld" }}: amqp-rpc: New request request=Request { context: Context { id: Some(1), app_id: AppId("pid-7817@runner-bbf8mlut-project-2833-concurrent-0"), priority: 0, correlation_id: "53fc3ca5-bd51-11ef-ab3e-0682ef4da55f", user_id: None, message_kind: Service { method: "Hello", service: "helloworld.HelloWorld" }, reply_to: Some("amq.rabbitmq.reply-to.g1h2AA5yZXBseUA5NDE0MzI1MQAAA0oAAAAAZ2LkYQ==.0zD8MF07KX/Ay+v4YOZWxw=="), self_created_time: 1734534247, request_created_time: 1734534247 }, timeout: 300s } 15:04:07.083 DEBUG amqp-rpc: Send response response=AMQPResponse { routing_key: "amq.rabbitmq.reply-to.g1h2AA5yZXBseUA5NDE0MzI1MQAAA0oAAAAAZ2LkYQ==.0zD8MF07KX/Ay+v4YOZWxw==", properties: AMQPProperties { content_type: Some(ShortString("application/x-protobuf")), content_encoding: None, headers: Some(FieldTable({ShortString("request_received_time"): LongString(LongString([49, 55, 51, 52, 53, 51, 52, 50, 52, 55])), ShortString("response_sent_time"): LongString(LongString([49, 55, 51, 52, 53, 51, 52, 50, 52, 55]))})), delivery_mode: Some(2), priority: Some(0), correlation_id: Some(ShortString("53fc3ca5-bd51-11ef-ab3e-0682ef4da55f")), reply_to: None, expiration: None, message_id: Some(ShortString("1")), timestamp: Some(1734534247), kind: Some(ShortString("helloworld.HelloWorld:Hello")), user_id: None, app_id: Some(ShortString("pid-7817@runner-bbf8mlut-project-2833-concurrent-0")), cluster_id: None }, payload: 15 } 15:04:07.083 TRACE lapin::channel: send_frames channel=2 15:04:07.083 TRACE lapin::channel: wake channel=2 15:04:07.083 TRACE lapin::socket_state: Got event for socket event=Wake 15:04:07.083 TRACE lapin::io_loop: will write to buffer next_msg=AMQPFrame::Method(Basic(Publish(Publish { exchange: ShortString(""), routing_key: ShortString("amq.rabbitmq.reply-to.g1h2AA5yZXBseUA5NDE0MzI1MQAAA0oAAAAAZ2LkYQ==.0zD8MF07KX/Ay+v4YOZWxw=="), mandatory: false, immediate: false }))) 15:04:07.083 TRACE lapin::io_loop: will write to buffer next_msg=AMQPFrame::Header 15:04:07.083 TRACE lapin::io_loop: will write to buffer next_msg=AMQPFrame::Body 15:04:07.083 TRACE lapin::io_loop: wrote 379 bytes 15:04:07.083 TRACE lapin::io_loop: io_loop do_run done can_read=false can_write=true has_data=false status=Connected 15:04:07.083 TRACE lapin::io_loop: io_loop run 15:04:07.083 TRACE lapin::io_loop: io_loop do_run can_read=false can_write=true has_data=false 15:04:07.083 TRACE lapin::publisher_confirm: PublisherConfirm dropped without use, registering it for wait_for_confirms 15:04:07.083 TRACE lapin::returned_messages: Dropped PublisherConfirm was ready but didn't carry a message, discarding 15:04:07.083 TRACE lapin::channel: send_frame channel=2 15:04:07.083 TRACE lapin::channel: wake channel=2 15:04:07.083 TRACE lapin::socket_state: Got event for socket event=Wake 15:04:07.083 TRACE lapin::io_loop: will write to buffer next_msg=AMQPFrame::Method(Basic(Ack(Ack { delivery_tag: 1, multiple: false }))) 15:04:07.083 TRACE lapin::io_loop: wrote 21 bytes 15:04:07.083 TRACE lapin::io_loop: io_loop do_run done can_read=false can_write=true has_data=false status=Connected 15:04:07.083 TRACE lapin::io_loop: io_loop run 15:04:07.083 TRACE lapin::io_loop: io_loop do_run can_read=false can_write=true has_data=false 15:04:07.083 TRACE lapin::consumer: consumer poll_next 15:04:07.083 TRACE lapin::consumer: consumer poll; acquired inner lock consumer_tag=ctag2.3b2cd2ce-89c7-4447-a85c-5f9f8337c1ca 15:04:07.083 TRACE lapin::consumer: delivery; status=NotReady consumer_tag=ctag2.3b2cd2ce-89c7-4447-a85c-5f9f8337c1ca 15:04:17.044 TRACE lapin::socket_state: Got event for socket event=Readable 15:04:17.044 TRACE lapin::io_loop: read 8 bytes 15:04:17.044 TRACE lapin::channels: will handle frame frame=Heartbeat(0) 15:04:17.044 DEBUG lapin::channels: received heartbeat from server 15:04:17.044 TRACE lapin::io_loop: io_loop do_run done can_read=false can_write=true has_data=false status=Connected 15:04:17.044 TRACE lapin::io_loop: io_loop run 15:04:17.044 TRACE lapin::io_loop: io_loop do_run can_read=false can_write=true has_data=false 15:04:17.084 DEBUG lapin::channels: send heartbeat 15:04:17.084 TRACE lapin::channel: send_frame channel=0 15:04:17.084 TRACE lapin::channel: wake channel=0 15:04:17.084 TRACE lapin::socket_state: Got event for socket event=Wake 15:04:17.084 TRACE lapin::io_loop: will write to buffer next_msg=AMQPFrame::Heartbeat 15:04:17.084 TRACE lapin::io_loop: wrote 8 bytes 15:04:17.084 TRACE lapin::io_loop: io_loop do_run done can_read=false can_write=true has_data=false status=Connected 15:04:17.084 TRACE lapin::io_loop: io_loop run 15:04:17.084 TRACE lapin::io_loop: io_loop do_run can_read=false can_write=true has_data=false 15:04:27.045 TRACE lapin::socket_state: Got event for socket event=Readable 15:04:27.045 TRACE lapin::io_loop: read 8 bytes 15:04:27.045 TRACE lapin::channels: will handle frame frame=Heartbeat(0) 15:04:27.045 DEBUG lapin::channels: received heartbeat from server 15:04:27.045 TRACE lapin::io_loop: io_loop do_run done can_read=false can_write=true has_data=false status=Connected 15:04:27.045 TRACE lapin::io_loop: io_loop run 15:04:27.045 TRACE lapin::io_loop: io_loop do_run can_read=false can_write=true has_data=false 15:04:27.086 DEBUG lapin::channels: send heartbeat 15:04:27.086 TRACE lapin::channel: send_frame channel=0 15:04:27.086 TRACE lapin::channel: wake channel=0 15:04:27.086 TRACE lapin::socket_state: Got event for socket event=Wake 15:04:27.086 TRACE lapin::io_loop: will write to buffer next_msg=AMQPFrame::Heartbeat 15:04:27.086 TRACE lapin::io_loop: wrote 8 bytes 15:04:27.086 TRACE lapin::io_loop: io_loop do_run done can_read=false can_write=true has_data=false status=Connected 15:04:27.086 TRACE lapin::io_loop: io_loop run 15:04:27.086 TRACE lapin::io_loop: io_loop do_run can_read=false can_write=true has_data=false 15:04:37.046 TRACE lapin::socket_state: Got event for socket event=Readable 15:04:37.046 TRACE lapin::io_loop: read 8 bytes 15:04:37.046 TRACE lapin::channels: will handle frame frame=Heartbeat(0) 15:04:37.046 DEBUG lapin::channels: received heartbeat from server 15:04:37.046 TRACE lapin::io_loop: io_loop do_run done can_read=false can_write=true has_data=false status=Connected 15:04:37.046 TRACE lapin::io_loop: io_loop run 15:04:37.046 TRACE lapin::io_loop: io_loop do_run can_read=false can_write=true has_data=false 15:04:37.075 TRACE lapin::socket_state: Got event for socket event=Readable 15:04:37.075 TRACE lapin::io_loop: read 8 bytes 15:04:37.075 TRACE lapin::channels: will handle frame frame=Heartbeat(0) 15:04:37.075 DEBUG lapin::channels: received heartbeat from server 15:04:37.075 TRACE lapin::io_loop: io_loop do_run done can_read=false can_write=true has_data=false status=Connected 15:04:37.075 TRACE lapin::io_loop: io_loop run 15:04:37.075 TRACE lapin::io_loop: io_loop do_run can_read=false can_write=true has_data=false 15:04:37.082 DEBUG lapin::channels: send heartbeat 15:04:37.082 TRACE lapin::channel: send_frame channel=0 15:04:37.082 TRACE lapin::channel: wake channel=0 15:04:37.082 TRACE lapin::socket_state: Got event for socket event=Wake thread 'test_service_erro15:04:37.082 TRACE lapin::io_loop: will write to buffer next_msg=AMQPFrame::Heartbeat r' panicked at ptms-amqp-rpc/tests/test_helloworld/tests/test_service.rs:90:10: called `Result::unwrap()` on an `Err` value: ClientTimeout
The text was updated successfully, but these errors were encountered:
I don't see any evidence that this publisher uses publisher confirms. This approach to publisher is not safe by definition.
Direct Reply-to relies on channel state, that is, a single Erlang process on a single node. Again, it does not offer much in terms of data safety.
A library such as Lapin cannot possibly address either of those problems.
If you want higher data safety guarantees, use relevant data safety features and a replicated queue type (one per request-reply client).
Sorry, something went wrong.
No branches or pull requests
Hello! Thank you for the library! I have very strange issue: sometimes i don't get message from queue. My flow:
server
andclient
How it works:
basic_publish
request to the serverHere is logs:
The text was updated successfully, but these errors were encountered: