Skip to content

Commit

Permalink
Fixes #1720: flush all incoming AMQP messages on connection drop
Browse files Browse the repository at this point in the history
  • Loading branch information
kgiusti committed Jan 24, 2025
1 parent 2a0aaba commit bbbe971
Showing 1 changed file with 30 additions and 18 deletions.
48 changes: 30 additions & 18 deletions src/adaptors/amqp/amqp_adaptor.c
Original file line number Diff line number Diff line change
Expand Up @@ -1202,41 +1202,50 @@ static int AMQP_link_flow_handler(qd_router_t *router, qd_link_t *link)
}


/**
* Link Detached Handler
/** Pull any pending incoming messages off the link and forward them
*
* (DISPATCH-1085): When Proton indicates that an incoming link has detached OR the parent connection/session of the link
* has abruptly closed (link has not cleanly detached) there may still be incoming messages buffered on the link that
* need to be processed. This helper function attempts to read all buffered messages and forward them.
*/
static int AMQP_link_detach_handler(qd_router_t *router, qd_link_t *link)
static void drain_link(qd_router_t *router, qd_link_t *qd_link)
{
assert(link);

pn_link_t *pn_link = qd_link_pn(link);
if (!pn_link)
return 0;

// DISPATCH-1085: If link is in the middle of receiving a message it is
// possible that the message is actually complete but the remaining message
// data is still in proton's buffers. (e.g. a large message is sent then
// the sender immediately detaches) Force a call to the rx_handler for the
// link in order to pull the buffered data into the message.
if (pn_link_is_receiver(pn_link)) {
pn_link_t *pn_link = qd_link_pn(qd_link);
if (!!pn_link && pn_link_is_receiver(pn_link)) {
pn_delivery_t *pnd = pn_link_current(pn_link);
if (pnd) {
qd_message_t *msg = qd_get_message_context(pnd);
if (msg) {
if (!qd_message_receive_complete(msg)) {
qd_link_set_q2_limit_unbounded(link, true);
qd_link_set_q2_limit_unbounded(qd_link, true);

// since this thread owns link we can call the
// rx_hander directly rather than schedule it via
// the unblock handler:
qd_message_clear_q2_unblocked_handler(msg);
qd_message_Q2_holdoff_disable(msg);
while (AMQP_rx_handler(router, link))
while (AMQP_rx_handler(router, qd_link))
;
}
}
}
}
}


/**
* Link Detached Handler
*/
static int AMQP_link_detach_handler(qd_router_t *router, qd_link_t *link)
{
assert(link);

pn_link_t *pn_link = qd_link_pn(link);
if (!pn_link)
return 0;

// attempt to forward any remaining pending messages
drain_link(router, link);

// Notify the core that a detach has been received.

Expand Down Expand Up @@ -1265,6 +1274,9 @@ static void AMQP_link_closed_handler(qd_router_t *router, qd_link_t *qd_link, bo
{
assert(qd_link);

// attempt to forward any remaining pending messages
drain_link(router, qd_link);

// Clean up all qdr_delivery/pn_delivery bindings for the link.

qd_link_ref_list_t *list = qd_link_get_ref_list(qd_link);
Expand All @@ -1277,7 +1289,7 @@ static void AMQP_link_closed_handler(qd_router_t *router, qd_link_t *qd_link, bo

// This will decrement the qdr_delivery_t reference count - do not access the dlv pointer after this call!
qdr_node_disconnect_deliveries(router->router_core, qd_link, dlv, pdlv);
ref = DEQ_HEAD(*list);
ref = DEQ_HEAD(*list); // disconnecting the delivery removes ref from the list and frees it
}

qdr_link_t *qdr_link = (qdr_link_t *) qd_link_get_context(qd_link);
Expand Down

0 comments on commit bbbe971

Please sign in to comment.