From bbbe9713cdc504da0a6a3780cad97dfe696f5170 Mon Sep 17 00:00:00 2001 From: Kenneth Giusti Date: Fri, 24 Jan 2025 09:13:27 -0500 Subject: [PATCH] Fixes #1720: flush all incoming AMQP messages on connection drop --- src/adaptors/amqp/amqp_adaptor.c | 48 ++++++++++++++++++++------------ 1 file changed, 30 insertions(+), 18 deletions(-) diff --git a/src/adaptors/amqp/amqp_adaptor.c b/src/adaptors/amqp/amqp_adaptor.c index 78e12592b..f3cb032a6 100644 --- a/src/adaptors/amqp/amqp_adaptor.c +++ b/src/adaptors/amqp/amqp_adaptor.c @@ -1202,41 +1202,50 @@ static int AMQP_link_flow_handler(qd_router_t *router, qd_link_t *link) } -/** - * Link Detached Handler +/** Pull any pending incoming messages off the link and forward them + * + * (DISPATCH-1085): When Proton indicates that an incoming link has detached OR the parent connection/session of the link + * has abruptly closed (link has not cleanly detached) there may still be incoming messages buffered on the link that + * need to be processed. This helper function attempts to read all buffered messages and forward them. */ -static int AMQP_link_detach_handler(qd_router_t *router, qd_link_t *link) +static void drain_link(qd_router_t *router, qd_link_t *qd_link) { - assert(link); - - pn_link_t *pn_link = qd_link_pn(link); - if (!pn_link) - return 0; - - // DISPATCH-1085: If link is in the middle of receiving a message it is - // possible that the message is actually complete but the remaining message - // data is still in proton's buffers. (e.g. a large message is sent then - // the sender immediately detaches) Force a call to the rx_handler for the - // link in order to pull the buffered data into the message. - if (pn_link_is_receiver(pn_link)) { + pn_link_t *pn_link = qd_link_pn(qd_link); + if (!!pn_link && pn_link_is_receiver(pn_link)) { pn_delivery_t *pnd = pn_link_current(pn_link); if (pnd) { qd_message_t *msg = qd_get_message_context(pnd); if (msg) { if (!qd_message_receive_complete(msg)) { - qd_link_set_q2_limit_unbounded(link, true); + qd_link_set_q2_limit_unbounded(qd_link, true); // since this thread owns link we can call the // rx_hander directly rather than schedule it via // the unblock handler: qd_message_clear_q2_unblocked_handler(msg); qd_message_Q2_holdoff_disable(msg); - while (AMQP_rx_handler(router, link)) + while (AMQP_rx_handler(router, qd_link)) ; } } } } +} + + +/** + * Link Detached Handler + */ +static int AMQP_link_detach_handler(qd_router_t *router, qd_link_t *link) +{ + assert(link); + + pn_link_t *pn_link = qd_link_pn(link); + if (!pn_link) + return 0; + + // attempt to forward any remaining pending messages + drain_link(router, link); // Notify the core that a detach has been received. @@ -1265,6 +1274,9 @@ static void AMQP_link_closed_handler(qd_router_t *router, qd_link_t *qd_link, bo { assert(qd_link); + // attempt to forward any remaining pending messages + drain_link(router, qd_link); + // Clean up all qdr_delivery/pn_delivery bindings for the link. qd_link_ref_list_t *list = qd_link_get_ref_list(qd_link); @@ -1277,7 +1289,7 @@ static void AMQP_link_closed_handler(qd_router_t *router, qd_link_t *qd_link, bo // This will decrement the qdr_delivery_t reference count - do not access the dlv pointer after this call! qdr_node_disconnect_deliveries(router->router_core, qd_link, dlv, pdlv); - ref = DEQ_HEAD(*list); + ref = DEQ_HEAD(*list); // disconnecting the delivery removes ref from the list and frees it } qdr_link_t *qdr_link = (qdr_link_t *) qd_link_get_context(qd_link);