From 9498bc0a284358705c78613205864bffb244bd1e Mon Sep 17 00:00:00 2001 From: Kelly Littlepage Date: Wed, 5 Mar 2025 15:41:20 -0500 Subject: [PATCH] fix: check socket error codes to avoid overcounting short sends (fixes #1756) Connected Aeron sockets can produce zero-sized reads and writes in non-exceptional situations, e.g., when performing a network publication setup check against a host that hasn't started listening. This commit adds a check for non-exceptional socket errors to prevent incrementing the short send counter in such situations. Fixes https://github.com/aeron-io/aeron/issues/1756 --- aeron-client/src/main/c/aeron_socket.c | 6 +++--- aeron-client/src/main/c/util/aeron_netutil.c | 6 ++++++ aeron-client/src/main/c/util/aeron_netutil.h | 2 ++ .../src/main/c/aeron_driver_name_resolver.c | 2 +- aeron-driver/src/main/c/aeron_network_publication.c | 13 ++++++++----- .../main/c/media/aeron_receive_channel_endpoint.c | 10 +++++----- .../src/main/c/media/aeron_udp_channel_transport.c | 4 ++-- 7 files changed, 27 insertions(+), 16 deletions(-) diff --git a/aeron-client/src/main/c/aeron_socket.c b/aeron-client/src/main/c/aeron_socket.c index 520b060028..25f75ee135 100644 --- a/aeron-client/src/main/c/aeron_socket.c +++ b/aeron-client/src/main/c/aeron_socket.c @@ -120,7 +120,7 @@ ssize_t aeron_sendmsg(aeron_socket_t fd, struct msghdr *msghdr, int flags) if (result < 0) { - if (EAGAIN == errno || EWOULDBLOCK == errno || ECONNREFUSED == errno || EINTR == errno) + if (aeron_is_acceptable_socket_error()) { return 0; } @@ -140,7 +140,7 @@ ssize_t aeron_send(aeron_socket_t fd, const void *buf, size_t len, int flags) if (result < 0) { - if (EAGAIN == errno || EWOULDBLOCK == errno || ECONNREFUSED == errno || EINTR == errno) + if (aeron_is_acceptable_socket_error()) { return 0; } @@ -160,7 +160,7 @@ ssize_t aeron_recvmsg(aeron_socket_t fd, struct msghdr *msghdr, int flags) if (result < 0) { - if (EAGAIN == errno || EWOULDBLOCK == errno || ECONNREFUSED == errno || EINTR == errno) + if (aeron_is_acceptable_socket_error()) { return 0; } diff --git a/aeron-client/src/main/c/util/aeron_netutil.c b/aeron-client/src/main/c/util/aeron_netutil.c index fd3ba7bb9f..9ad0119bb4 100644 --- a/aeron-client/src/main/c/util/aeron_netutil.c +++ b/aeron-client/src/main/c/util/aeron_netutil.c @@ -758,3 +758,9 @@ int aeron_sockaddr_storage_cmp(struct sockaddr_storage *a, struct sockaddr_stora return 0; } + +bool aeron_is_acceptable_socket_error(void) +{ + int err = errno; + return EAGAIN == err || EWOULDBLOCK == err || ECONNREFUSED == err || EINTR == err; +} diff --git a/aeron-client/src/main/c/util/aeron_netutil.h b/aeron-client/src/main/c/util/aeron_netutil.h index e2c6e79fd8..91024a295a 100644 --- a/aeron-client/src/main/c/util/aeron_netutil.h +++ b/aeron-client/src/main/c/util/aeron_netutil.h @@ -92,4 +92,6 @@ int aeron_netutil_get_so_buf_lengths(size_t *default_so_rcvbuf, size_t *default_ int aeron_sockaddr_storage_cmp(struct sockaddr_storage *a, struct sockaddr_storage *b, bool *result); +bool aeron_is_acceptable_socket_error(void); + #endif //AERON_NETUTIL_H diff --git a/aeron-driver/src/main/c/aeron_driver_name_resolver.c b/aeron-driver/src/main/c/aeron_driver_name_resolver.c index e1a08efa4e..c533a46426 100644 --- a/aeron-driver/src/main/c/aeron_driver_name_resolver.c +++ b/aeron-driver/src/main/c/aeron_driver_name_resolver.c @@ -760,7 +760,7 @@ static int aeron_driver_name_resolver_do_send( &resolver->data_paths, &resolver->transport, neighbor_address, &iov, 1, &bytes_sent); if (0 <= send_result) { - if (bytes_sent < (int64_t)iov.iov_len) + if (bytes_sent < (int64_t)iov.iov_len && !aeron_is_acceptable_socket_error()) { aeron_counter_increment(resolver->short_sends_counter, 1); } diff --git a/aeron-driver/src/main/c/aeron_network_publication.c b/aeron-driver/src/main/c/aeron_network_publication.c index c46b62123d..592868925d 100644 --- a/aeron-driver/src/main/c/aeron_network_publication.c +++ b/aeron-driver/src/main/c/aeron_network_publication.c @@ -14,6 +14,7 @@ * limitations under the License. */ +#include "util/aeron_netutil.h" #if defined(__linux__) #define _BSD_SOURCE #define _GNU_SOURCE @@ -470,7 +471,7 @@ int aeron_network_publication_setup_message_check( if (0 <= (result = aeron_network_publication_do_send(publication, &iov, 1, &bytes_sent))) { - if (bytes_sent < (int64_t)iov.iov_len) + if (bytes_sent < (int64_t)iov.iov_len && !aeron_is_acceptable_socket_error()) { aeron_counter_increment(publication->short_sends_counter, 1); } @@ -526,7 +527,7 @@ int aeron_network_publication_heartbeat_message_check( if (0 <= (result = aeron_network_publication_do_send(publication, &iov, 1, &bytes_sent))) { result = (int)bytes_sent; - if (bytes_sent < (int64_t)iov.iov_len) + if (bytes_sent < (int64_t)iov.iov_len && !aeron_is_acceptable_socket_error()) { aeron_counter_increment(publication->short_sends_counter, 1); } @@ -602,7 +603,9 @@ int aeron_network_publication_send_data( else if (result >= 0) { publication->current_messages_per_send = 1; - aeron_counter_increment(publication->short_sends_counter, 1); + if (!aeron_is_acceptable_socket_error()) { + aeron_counter_increment(publication->short_sends_counter, 1); + } } } else if (publication->track_sender_limits && available_window <= 0) @@ -720,7 +723,7 @@ int aeron_network_publication_resend(void *clientd, int32_t term_id, int32_t ter int sendmsg_result = aeron_network_publication_do_send(publication, &iov, 1, &msg_bytes_sent); if (0 <= sendmsg_result) { - if (msg_bytes_sent < (int64_t)iov.iov_len) + if (msg_bytes_sent < (int64_t)iov.iov_len && !aeron_is_acceptable_socket_error()) { aeron_counter_increment(publication->short_sends_counter, 1); break; @@ -924,7 +927,7 @@ void aeron_network_publication_on_rttm( if (0 <= aeron_network_publication_do_send(publication, &iov, 1, &bytes_sent)) { - if (bytes_sent < (int64_t)iov.iov_len) + if (bytes_sent < (int64_t)iov.iov_len && !aeron_is_acceptable_socket_error()) { aeron_counter_increment(publication->short_sends_counter, 1); } diff --git a/aeron-driver/src/main/c/media/aeron_receive_channel_endpoint.c b/aeron-driver/src/main/c/media/aeron_receive_channel_endpoint.c index ba5a67a593..d6a89c642f 100644 --- a/aeron-driver/src/main/c/media/aeron_receive_channel_endpoint.c +++ b/aeron-driver/src/main/c/media/aeron_receive_channel_endpoint.c @@ -291,7 +291,7 @@ int aeron_receive_channel_endpoint_send_sm( int bytes_sent = aeron_receive_channel_endpoint_send(endpoint, destination, control_addr, &iov); if (bytes_sent != (int)iov.iov_len) { - if (bytes_sent >= 0) + if (bytes_sent >= 0 && !aeron_is_acceptable_socket_error()) { aeron_counter_increment(endpoint->short_sends_counter, 1); } @@ -330,7 +330,7 @@ int aeron_receive_channel_endpoint_send_nak( int bytes_sent = aeron_receive_channel_endpoint_send(endpoint, destination, addr, &iov); if (bytes_sent != (int)iov.iov_len) { - if (bytes_sent >= 0) + if (bytes_sent >= 0 && !aeron_is_acceptable_socket_error()) { aeron_counter_increment(endpoint->short_sends_counter, 1); } @@ -383,7 +383,7 @@ int aeron_receive_channel_endpoint_send_rttm( int bytes_sent = aeron_receive_channel_endpoint_send(endpoint, destination, addr, &iov); if (bytes_sent != (int)iov.iov_len) { - if (bytes_sent >= 0) + if (bytes_sent >= 0 && !aeron_is_acceptable_socket_error()) { aeron_counter_increment(endpoint->short_sends_counter, 1); } @@ -417,7 +417,7 @@ int aeron_receive_channel_endpoint_send_response_setup( int bytes_sent = aeron_receive_channel_endpoint_send(endpoint, destination, addr, &iov); if (bytes_sent != (int)iov.iov_len) { - if (bytes_sent >= 0) + if (bytes_sent >= 0 && !aeron_is_acceptable_socket_error()) { aeron_counter_increment(endpoint->short_sends_counter, 1); } @@ -458,7 +458,7 @@ int aeron_receiver_channel_endpoint_send_error_frame( int bytes_sent = aeron_receive_channel_endpoint_send(channel_endpoint, destination, control_addr, &iov); if (bytes_sent != (int)iov.iov_len) { - if (bytes_sent >= 0) + if (bytes_sent >= 0 && !aeron_is_acceptable_socket_error()) { aeron_counter_increment(channel_endpoint->short_sends_counter, 1); } diff --git a/aeron-driver/src/main/c/media/aeron_udp_channel_transport.c b/aeron-driver/src/main/c/media/aeron_udp_channel_transport.c index ce970cf448..921101c6fb 100644 --- a/aeron-driver/src/main/c/media/aeron_udp_channel_transport.c +++ b/aeron-driver/src/main/c/media/aeron_udp_channel_transport.c @@ -463,7 +463,7 @@ int aeron_udp_channel_transport_recvmmsg( // ECONNREFUSED can sometimes occur with connected UDP sockets if ICMP traffic is able to indicate that the // remote end had closed on a previous send. - if (EINTR == err || EAGAIN == err || ECONNREFUSED == err) + if (aeron_is_acceptable_socket_error()) { return 0; } @@ -601,7 +601,7 @@ static int aeron_udp_channel_transport_sendv( int num_sent = sendmmsg(transport->fd, msg, msg_i, 0); if (num_sent < 0) { - if (EAGAIN == errno || EWOULDBLOCK == errno || ECONNREFUSED == errno || EINTR == errno) + if (aeron_is_acceptable_socket_error()) { return 0; }