Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ISSUE-1720: flush all incoming AMQP messages on connection drop #1721

Merged
merged 1 commit into from
Jan 24, 2025
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 30 additions & 18 deletions src/adaptors/amqp/amqp_adaptor.c
Original file line number Diff line number Diff line change
Expand Up @@ -1202,41 +1202,50 @@ static int AMQP_link_flow_handler(qd_router_t *router, qd_link_t *link)
}


/**
* Link Detached Handler
/** Pull any pending incoming messages off the link and forward them
*
* (DISPATCH-1085): When Proton indicates that an incoming link has detached OR the parent connection/session of the link
* has abruptly closed (link has not cleanly detached) there may still be incoming messages buffered on the link that
* need to be processed. This helper function attempts to read all buffered messages and forward them.
*/
static int AMQP_link_detach_handler(qd_router_t *router, qd_link_t *link)
static void drain_link(qd_router_t *router, qd_link_t *qd_link)
{
assert(link);

pn_link_t *pn_link = qd_link_pn(link);
if (!pn_link)
return 0;

// DISPATCH-1085: If link is in the middle of receiving a message it is
// possible that the message is actually complete but the remaining message
// data is still in proton's buffers. (e.g. a large message is sent then
// the sender immediately detaches) Force a call to the rx_handler for the
// link in order to pull the buffered data into the message.
if (pn_link_is_receiver(pn_link)) {
pn_link_t *pn_link = qd_link_pn(qd_link);
if (!!pn_link && pn_link_is_receiver(pn_link)) {
pn_delivery_t *pnd = pn_link_current(pn_link);
if (pnd) {
qd_message_t *msg = qd_get_message_context(pnd);
if (msg) {
if (!qd_message_receive_complete(msg)) {
qd_link_set_q2_limit_unbounded(link, true);
qd_link_set_q2_limit_unbounded(qd_link, true);

// since this thread owns link we can call the
// rx_hander directly rather than schedule it via
// the unblock handler:
qd_message_clear_q2_unblocked_handler(msg);
qd_message_Q2_holdoff_disable(msg);
while (AMQP_rx_handler(router, link))
while (AMQP_rx_handler(router, qd_link))
;
}
}
}
}
}


/**
* Link Detached Handler
*/
static int AMQP_link_detach_handler(qd_router_t *router, qd_link_t *link)
{
assert(link);

pn_link_t *pn_link = qd_link_pn(link);
if (!pn_link)
return 0;

// attempt to forward any remaining pending messages
drain_link(router, link);

// Notify the core that a detach has been received.

Expand Down Expand Up @@ -1265,6 +1274,9 @@ static void AMQP_link_closed_handler(qd_router_t *router, qd_link_t *qd_link, bo
{
assert(qd_link);

// attempt to forward any remaining pending messages
drain_link(router, qd_link);

// Clean up all qdr_delivery/pn_delivery bindings for the link.

qd_link_ref_list_t *list = qd_link_get_ref_list(qd_link);
Expand All @@ -1277,7 +1289,7 @@ static void AMQP_link_closed_handler(qd_router_t *router, qd_link_t *qd_link, bo

// This will decrement the qdr_delivery_t reference count - do not access the dlv pointer after this call!
qdr_node_disconnect_deliveries(router->router_core, qd_link, dlv, pdlv);
ref = DEQ_HEAD(*list);
ref = DEQ_HEAD(*list); // disconnecting the delivery removes ref from the list and frees it
}

qdr_link_t *qdr_link = (qdr_link_t *) qd_link_get_context(qd_link);
Expand Down
Loading