Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: check socket error codes to avoid overcounting short sends (fixes #1756) #1759

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions aeron-client/src/main/c/aeron_socket.c
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ ssize_t aeron_sendmsg(aeron_socket_t fd, struct msghdr *msghdr, int flags)

if (result < 0)
{
if (EAGAIN == errno || EWOULDBLOCK == errno || ECONNREFUSED == errno || EINTR == errno)
if (aeron_is_acceptable_socket_error())
{
return 0;
}
Expand All @@ -140,7 +140,7 @@ ssize_t aeron_send(aeron_socket_t fd, const void *buf, size_t len, int flags)

if (result < 0)
{
if (EAGAIN == errno || EWOULDBLOCK == errno || ECONNREFUSED == errno || EINTR == errno)
if (aeron_is_acceptable_socket_error())
{
return 0;
}
Expand All @@ -160,7 +160,7 @@ ssize_t aeron_recvmsg(aeron_socket_t fd, struct msghdr *msghdr, int flags)

if (result < 0)
{
if (EAGAIN == errno || EWOULDBLOCK == errno || ECONNREFUSED == errno || EINTR == errno)
if (aeron_is_acceptable_socket_error())
{
return 0;
}
Expand Down
6 changes: 6 additions & 0 deletions aeron-client/src/main/c/util/aeron_netutil.c
Original file line number Diff line number Diff line change
Expand Up @@ -758,3 +758,9 @@ int aeron_sockaddr_storage_cmp(struct sockaddr_storage *a, struct sockaddr_stora

return 0;
}

bool aeron_is_acceptable_socket_error(void)
{
int err = errno;
return EAGAIN == err || EWOULDBLOCK == err || ECONNREFUSED == err || EINTR == err;
}
2 changes: 2 additions & 0 deletions aeron-client/src/main/c/util/aeron_netutil.h
Original file line number Diff line number Diff line change
Expand Up @@ -92,4 +92,6 @@ int aeron_netutil_get_so_buf_lengths(size_t *default_so_rcvbuf, size_t *default_

int aeron_sockaddr_storage_cmp(struct sockaddr_storage *a, struct sockaddr_storage *b, bool *result);

bool aeron_is_acceptable_socket_error(void);

#endif //AERON_NETUTIL_H
2 changes: 1 addition & 1 deletion aeron-driver/src/main/c/aeron_driver_name_resolver.c
Original file line number Diff line number Diff line change
Expand Up @@ -760,7 +760,7 @@ static int aeron_driver_name_resolver_do_send(
&resolver->data_paths, &resolver->transport, neighbor_address, &iov, 1, &bytes_sent);
if (0 <= send_result)
{
if (bytes_sent < (int64_t)iov.iov_len)
if (bytes_sent < (int64_t)iov.iov_len && !aeron_is_acceptable_socket_error())
{
aeron_counter_increment(resolver->short_sends_counter, 1);
}
Expand Down
13 changes: 8 additions & 5 deletions aeron-driver/src/main/c/aeron_network_publication.c
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
* limitations under the License.
*/

#include "util/aeron_netutil.h"
#if defined(__linux__)
#define _BSD_SOURCE
#define _GNU_SOURCE
Expand Down Expand Up @@ -470,7 +471,7 @@ int aeron_network_publication_setup_message_check(

if (0 <= (result = aeron_network_publication_do_send(publication, &iov, 1, &bytes_sent)))
{
if (bytes_sent < (int64_t)iov.iov_len)
if (bytes_sent < (int64_t)iov.iov_len && !aeron_is_acceptable_socket_error())
{
aeron_counter_increment(publication->short_sends_counter, 1);
}
Expand Down Expand Up @@ -526,7 +527,7 @@ int aeron_network_publication_heartbeat_message_check(
if (0 <= (result = aeron_network_publication_do_send(publication, &iov, 1, &bytes_sent)))
{
result = (int)bytes_sent;
if (bytes_sent < (int64_t)iov.iov_len)
if (bytes_sent < (int64_t)iov.iov_len && !aeron_is_acceptable_socket_error())
{
aeron_counter_increment(publication->short_sends_counter, 1);
}
Expand Down Expand Up @@ -602,7 +603,9 @@ int aeron_network_publication_send_data(
else if (result >= 0)
{
publication->current_messages_per_send = 1;
aeron_counter_increment(publication->short_sends_counter, 1);
if (!aeron_is_acceptable_socket_error()) {
aeron_counter_increment(publication->short_sends_counter, 1);
}
}
}
else if (publication->track_sender_limits && available_window <= 0)
Expand Down Expand Up @@ -720,7 +723,7 @@ int aeron_network_publication_resend(void *clientd, int32_t term_id, int32_t ter
int sendmsg_result = aeron_network_publication_do_send(publication, &iov, 1, &msg_bytes_sent);
if (0 <= sendmsg_result)
{
if (msg_bytes_sent < (int64_t)iov.iov_len)
if (msg_bytes_sent < (int64_t)iov.iov_len && !aeron_is_acceptable_socket_error())
{
aeron_counter_increment(publication->short_sends_counter, 1);
break;
Expand Down Expand Up @@ -924,7 +927,7 @@ void aeron_network_publication_on_rttm(

if (0 <= aeron_network_publication_do_send(publication, &iov, 1, &bytes_sent))
{
if (bytes_sent < (int64_t)iov.iov_len)
if (bytes_sent < (int64_t)iov.iov_len && !aeron_is_acceptable_socket_error())
{
aeron_counter_increment(publication->short_sends_counter, 1);
}
Expand Down
10 changes: 5 additions & 5 deletions aeron-driver/src/main/c/media/aeron_receive_channel_endpoint.c
Original file line number Diff line number Diff line change
Expand Up @@ -291,7 +291,7 @@ int aeron_receive_channel_endpoint_send_sm(
int bytes_sent = aeron_receive_channel_endpoint_send(endpoint, destination, control_addr, &iov);
if (bytes_sent != (int)iov.iov_len)
{
if (bytes_sent >= 0)
if (bytes_sent >= 0 && !aeron_is_acceptable_socket_error())
{
aeron_counter_increment(endpoint->short_sends_counter, 1);
}
Expand Down Expand Up @@ -330,7 +330,7 @@ int aeron_receive_channel_endpoint_send_nak(
int bytes_sent = aeron_receive_channel_endpoint_send(endpoint, destination, addr, &iov);
if (bytes_sent != (int)iov.iov_len)
{
if (bytes_sent >= 0)
if (bytes_sent >= 0 && !aeron_is_acceptable_socket_error())
{
aeron_counter_increment(endpoint->short_sends_counter, 1);
}
Expand Down Expand Up @@ -383,7 +383,7 @@ int aeron_receive_channel_endpoint_send_rttm(
int bytes_sent = aeron_receive_channel_endpoint_send(endpoint, destination, addr, &iov);
if (bytes_sent != (int)iov.iov_len)
{
if (bytes_sent >= 0)
if (bytes_sent >= 0 && !aeron_is_acceptable_socket_error())
{
aeron_counter_increment(endpoint->short_sends_counter, 1);
}
Expand Down Expand Up @@ -417,7 +417,7 @@ int aeron_receive_channel_endpoint_send_response_setup(
int bytes_sent = aeron_receive_channel_endpoint_send(endpoint, destination, addr, &iov);
if (bytes_sent != (int)iov.iov_len)
{
if (bytes_sent >= 0)
if (bytes_sent >= 0 && !aeron_is_acceptable_socket_error())
{
aeron_counter_increment(endpoint->short_sends_counter, 1);
}
Expand Down Expand Up @@ -458,7 +458,7 @@ int aeron_receiver_channel_endpoint_send_error_frame(
int bytes_sent = aeron_receive_channel_endpoint_send(channel_endpoint, destination, control_addr, &iov);
if (bytes_sent != (int)iov.iov_len)
{
if (bytes_sent >= 0)
if (bytes_sent >= 0 && !aeron_is_acceptable_socket_error())
{
aeron_counter_increment(channel_endpoint->short_sends_counter, 1);
}
Expand Down
4 changes: 2 additions & 2 deletions aeron-driver/src/main/c/media/aeron_udp_channel_transport.c
Original file line number Diff line number Diff line change
Expand Up @@ -463,7 +463,7 @@ int aeron_udp_channel_transport_recvmmsg(

// ECONNREFUSED can sometimes occur with connected UDP sockets if ICMP traffic is able to indicate that the
// remote end had closed on a previous send.
if (EINTR == err || EAGAIN == err || ECONNREFUSED == err)
if (aeron_is_acceptable_socket_error())
{
return 0;
}
Expand Down Expand Up @@ -601,7 +601,7 @@ static int aeron_udp_channel_transport_sendv(
int num_sent = sendmmsg(transport->fd, msg, msg_i, 0);
if (num_sent < 0)
{
if (EAGAIN == errno || EWOULDBLOCK == errno || ECONNREFUSED == errno || EINTR == errno)
if (aeron_is_acceptable_socket_error())
{
return 0;
}
Expand Down
Loading