Skip to content

Commit

Permalink
Fix recv() 120s Timeout on EAGAIN by Retrying Indefinitely
Browse files Browse the repository at this point in the history
  • Loading branch information
moticless committed Feb 27, 2025
1 parent 85c5539 commit 7cf71ca
Show file tree
Hide file tree
Showing 4 changed files with 141 additions and 43 deletions.
3 changes: 2 additions & 1 deletion api/librdb-ext-api.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,14 +43,15 @@ typedef enum {
RDBX_ERR_RESP_INVALID_TARGET_VERSION,
RDBX_ERR_RESP_READ,
RDBX_ERR_RESP2REDIS_CREATE_SOCKET,
RDBX_ERR_RESP2REDIS_CONF_NONBLOCK_SOCKET,
RDBX_ERR_RESP2REDIS_CONF_BLOCK_SOCKET,
RDBX_ERR_RESP2REDIS_INVALID_ADDRESS,
RDBX_ERR_RESP2REDIS_FAILED_CONNECT,
RDBX_ERR_RESP2REDIS_FAILED_READ,
RDBX_ERR_RESP2REDIS_FAILED_WRITE,
RDBX_ERR_RESP2REDIS_CONN_CLOSE,
RDBX_ERR_RESP2REDIS_MAX_RETRIES,
RDBX_ERR_RESP2REDIS_SET_TIMEOUT,
RDBX_ERR_RESP2REDIS_AUTH_FAILED,
} RdbxRes;

/****************************************************************
Expand Down
4 changes: 3 additions & 1 deletion src/cli/rdb-cli.c
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,10 @@ static void logger(RdbLogLevel l, const char *msg) {
[RDB_LOG_DBG] = "DEBUG :",
};

if (logfile != NULL)
if (logfile != NULL) {
fprintf(logfile, "%s %s\n", logLevelStr[l], msg);
fflush(logfile);
}

if (l == RDB_LOG_ERR)
printf("%s %s\n", logLevelStr[l], msg);
Expand Down
92 changes: 56 additions & 36 deletions src/ext/respToRedisLoader.c
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#include <netinet/in.h>
#include <arpa/inet.h>
#include <errno.h>
#include <fcntl.h>
#include "extCommon.h"
#include "readerResp.h"

Expand All @@ -14,16 +15,16 @@
#include <openssl/err.h>
#endif

#define PIPELINE_DEPTH_DEF 200 /* Default Number of pending cmds before waiting for response(s) */
#define PIPELINE_DEPTH_MAX 1000 /* limit the max value allowed to configure for pipeline depth */
#define PIPELINE_DEPTH_DEF 200 /* Default Number of pending cmds before waiting for response(s) */
#define PIPELINE_DEPTH_MAX 1000 /* limit the max value allowed to configure for pipeline depth */

#define NUM_RECORDED_CMDS 400 /* Number of commands to backlog, in a cyclic array */
#define RECORDED_KEY_MAX_LEN 40 /* Maximum payload size from any command to record into cyclic array */
#define NUM_RECORDED_CMDS 400 /* Number of commands to backlog, in a cyclic array */
#define RECORDED_KEY_MAX_LEN 40 /* Maximum payload size from any command to record into cyclic array */

#define REPLY_BUFF_SIZE 1024 /* reply buffer size */

#define MAX_EINTR_RETRY 3
#define REPLY_BUFF_SIZE 1024 /* reply buffer size */

#define MAX_EINTR_RETRY 5
#define RECV_CMD_TIMEOUT_SEC 10 /* recv() command timeout in seconds */

struct RdbxRespToRedisLoader {

Expand Down Expand Up @@ -86,22 +87,22 @@ static int onReadRepliesErrorCb(void *context, char *msg) {
*
* numToRead - minimum number of replies to read from the socket before
* returning.
* sendError - if set, an error occurred while writing to the server. In
* sentError - if set, an error occurred while writing to the server. In
* this case the function will try to read replies from the
* server. Maybe one of the replies will contain an error message
* that explains why write got failed. Whether error message is
* received or not, the function will return to the original issue.
*
* Return 0 for success, 1 otherwise. */
static int readReplies(RdbxRespToRedisLoader *ctx, int numToRead, int sendError) {
int noDataEv = 0;
static int readReplies(RdbxRespToRedisLoader *ctx, int numToRead, int sentError) {
int retries = 0;
char buff[REPLY_BUFF_SIZE];

RespReaderCtx *respReader = &ctx->respReader;
size_t countRepliesBefore = respReader->countReplies;
size_t repliesExpected = respReader->countReplies + numToRead;

while ((respReader->countReplies < repliesExpected) || (sendError)) {
while ((respReader->countReplies < repliesExpected) || (sentError)) {
int bytesReceived = recv(ctx->fd, buff, sizeof(buff), 0);

if (bytesReceived > 0) {
Expand All @@ -114,18 +115,25 @@ static int readReplies(RdbxRespToRedisLoader *ctx, int numToRead, int sendError)

/* handle error */

if (sendError)
return 0; /* Failed read error message from dst. Back to original issue. */
if (sentError)
return 0; /* Done lookup for error message. Return to original issue */

if (bytesReceived == 0) {
RDB_reportError(ctx->p, (RdbRes) RDBX_ERR_RESP2REDIS_CONN_CLOSE,
"Connection closed by the remote side");
return 1;
} else {
if ((!noDataEv) && (errno == EAGAIN || errno == EWOULDBLOCK)) {
noDataEv = 1;
RDB_log(ctx->p, RDB_LOG_WRN, "No data available from redis-server");
continue; /* Try one more time (Timeout is 60sec) */
if (errno == EAGAIN || errno == EWOULDBLOCK) {
retries++;
RDB_log(ctx->p, RDB_LOG_INF,
"No reply from redis-server for %d seconds",
retries * RECV_CMD_TIMEOUT_SEC);

/* Parser got external error? Currently Used only for testing */
if (RDB_getErrorCode(ctx->p) != RDB_OK)
return 1;

continue;
}

RDB_reportError(ctx->p,
Expand Down Expand Up @@ -209,7 +217,7 @@ static int redisLoaderWritev(void *context, struct iovec *iov, int iovCnt,

/* Error occurred. Try to receive error msg from dst, which might explain
why write got failed */
readReplies(ctx, 0, 1/*sendError*/);
readReplies(ctx, 0, 1/*sentError*/);
return 1;
}

Expand Down Expand Up @@ -324,18 +332,35 @@ _LIBRDB_API void RDBX_setPipelineDepth(RdbxRespToRedisLoader *r2r, int depth) {
r2r->pendingCmds.pipelineDepth = (depth <= 0 || depth>PIPELINE_DEPTH_MAX) ? PIPELINE_DEPTH_DEF : depth;
}

/* Create a loader from an existing file descriptor */
_LIBRDB_API RdbxRespToRedisLoader *RDBX_createRespToRedisFd(RdbParser *p,
RdbxToResp *rdbToResp,
RdbxRedisAuth *auth,
int fd) {
RdbxRespToRedisLoader *ctx;
if ((ctx = RDB_alloc(p, sizeof(RdbxRespToRedisLoader))) == NULL) {
RDB_reportError(p, (RdbRes) RDBX_ERR_RESP_FAILED_ALLOC,
"Failed to allocate struct RdbxRespToRedisLoader");
/* Ensure the socket is in blocking mode */
int flags = fcntl(fd, F_GETFL, 0);
if (flags == -1 || fcntl(fd, F_SETFL, flags & ~O_NONBLOCK) == -1) {
RDB_reportError(p, (RdbRes) RDBX_ERR_RESP2REDIS_CONF_BLOCK_SOCKET,
"Failed to configure for blocking mode. errno=%d: %s",
errno, strerror(errno));
return NULL;
}

/* Set receive timeout (blocking, but with a limit) */
struct timeval timeout = { .tv_sec = RECV_CMD_TIMEOUT_SEC, .tv_usec = 0 };
if (setsockopt(fd, SOL_SOCKET, SO_RCVTIMEO, &timeout, sizeof(timeout)) < 0) {
RDB_reportError(p, (RdbRes) RDBX_ERR_RESP2REDIS_SET_TIMEOUT,
"Failed to configure for blocking mode. errno=%d: %s",
errno, strerror(errno));
return NULL;
}

RdbxRespToRedisLoader *ctx = RDB_alloc(p, sizeof(RdbxRespToRedisLoader));
if (!ctx) {
RDB_reportError(p, (RdbRes) RDBX_ERR_RESP_FAILED_ALLOC, "Failed to allocate struct RdbxRespToRedisLoader");
return NULL;
}

/* init RdbxRespToRedisLoader context */
memset(ctx, 0, sizeof(RdbxRespToRedisLoader));
ctx->p = p;
ctx->fd = fd;
Expand All @@ -345,23 +370,27 @@ _LIBRDB_API RdbxRespToRedisLoader *RDBX_createRespToRedisFd(RdbParser *p,
readRespInit(&ctx->respReader);
setErrorCb(&ctx->respReader, ctx, onReadRepliesErrorCb);

if (auth && (redisAuth(ctx, auth) != RDB_OK))
if (auth && (redisAuth(ctx, auth) != RDB_OK)) {
RDB_reportError(p, (RdbRes) RDBX_ERR_RESP2REDIS_AUTH_FAILED, "Redis authentication failed.");
RDB_free(p, ctx);
return NULL;
}

/* Set 'this' writer to rdbToResp */
/* Set writer to rdbToResp */
RdbxRespWriter inst = {ctx, redisLoaderDelete, redisLoaderWritev, redisLoaderFlush};
RDBX_attachRespWriter(rdbToResp, &inst);
return ctx;
}

/* Create a loader and establish a TCP connection */
_LIBRDB_API RdbxRespToRedisLoader *RDBX_createRespToRedisTcp(RdbParser *p,
RdbxToResp *rdbToResp,
RdbxRedisAuth *auth,
const char *hostname,
int port) {
int sockfd = socket(AF_INET, SOCK_STREAM, 0);
if (sockfd == -1) {
RDB_reportError(p, (RdbRes) RDBX_ERR_RESP2REDIS_CREATE_SOCKET, "Failed to create tcp socket");
RDB_reportError(p, (RdbRes) RDBX_ERR_RESP2REDIS_CREATE_SOCKET, "Failed to create TCP socket");
return NULL;
}

Expand All @@ -377,21 +406,12 @@ _LIBRDB_API RdbxRespToRedisLoader *RDBX_createRespToRedisTcp(RdbParser *p,
}

if (connect(sockfd, (struct sockaddr *) &server_addr, sizeof(server_addr)) == -1) {
RDB_reportError(p, (RdbRes) RDBX_ERR_RESP2REDIS_INVALID_ADDRESS,
RDB_reportError(p, (RdbRes) RDBX_ERR_RESP2REDIS_FAILED_CONNECT,
"Failed to connect(hostname=%s, port=%d) => errno=%d",
hostname, port, errno);
goto createErr;
}

/* Set the recv() timeout. Avoid blocking forever. */
struct timeval timeout = { .tv_sec = 60, .tv_usec = 0 };
if (setsockopt(sockfd, SOL_SOCKET, SO_RCVTIMEO, &timeout, sizeof(timeout)) < 0) {
RDB_reportError(p, (RdbRes) RDBX_ERR_RESP2REDIS_SET_TIMEOUT,
"Failed to set socket receive timeout. errno=%d: %s",
errno, strerror(errno));
goto createErr;
}

RdbxRespToRedisLoader *res = RDBX_createRespToRedisFd(p, rdbToResp, auth, sockfd);

if (!res) goto createErr;
Expand Down
85 changes: 80 additions & 5 deletions test/test_rdb_to_redis.c
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
#include <string.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <time.h>
#include <arpa/inet.h>
#include <assert.h>
#include "test_common.h"

int serverMajorVer, serverMinorVer;
Expand Down Expand Up @@ -303,7 +303,7 @@ static void test_rdb_to_redis_function(void **state) {
}

/* test relied on rdbtest module within redis repo, if available */
void test_rdb_to_redis_module(void **state) {
static void test_rdb_to_redis_module(void **state) {
UNUSED(state);

/* Skip test if testrdb is not loaded */
Expand Down Expand Up @@ -360,7 +360,7 @@ static void test_rdb_to_redis_module_aux_empty(void **state) {
rdb_to_tcp(DUMP_FOLDER("module_aux_empty.rdb"), 1, 1, NULL);
}

void test_rdb_to_redis_stream(void **state) {
static void test_rdb_to_redis_stream(void **state) {
UNUSED(state);
test_rdb_to_redis_common(DUMP_FOLDER("stream_v11.rdb"), 1, NULL, NULL);
}
Expand Down Expand Up @@ -523,6 +523,80 @@ static void test_rdb_to_redis_func_lib_replace_if_exist(void **state) {
}
}

/* Create dummy TCP server that doesn't respond to the client and verify that
* the parser retries after TIMEOUT_SECONDS. Not part of CI since it takes to long */
int countdownRetries;
RdbParser *parser;
void dummyTcpTimeoutLogger(RdbLogLevel l, const char *msg) {
UNUSED(l);
if (strstr(msg, "No reply from redis-server for") != NULL)
if (--countdownRetries == 0)
RDB_reportError(parser, (RdbRes)12345678, "Inject error to end the test");
}
void test_rdb_tcp_timeout(void **state) {
UNUSED(state);
const int RECV_TIMEOUT_SECONDS = 10; /* socket retry timeout */
int test_retries = 3; /* limit test to finite number of retries */
int server_fd, client_fd;
struct sockaddr_in server_addr, client_addr;
socklen_t client_len = sizeof(client_addr);

/* expected to retry 3 times before ending the test */
countdownRetries = test_retries;

/* Dummy TCP server that only receives messages but does not respond */
server_fd = socket(AF_INET, SOCK_STREAM, 0);
assert_true(server_fd >= 0);

int opt = 1;
setsockopt(server_fd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));
server_addr.sin_family = AF_INET;
server_addr.sin_addr.s_addr = INADDR_ANY;
server_addr.sin_port = htons(0);
assert_int_equal(bind(server_fd, (struct sockaddr *)&server_addr, sizeof(server_addr)), 0);
socklen_t addr_len = sizeof(server_addr);
assert_int_equal(getsockname(server_fd, (struct sockaddr *)&server_addr, &addr_len), 0);
int assigned_port = ntohs(server_addr.sin_port);
assert_int_equal(listen(server_fd, 1), 0);
printf("Dummy TCP server started, waiting for client to connect...\n");

parser = RDB_createParserRdb(NULL);
RDB_setLogLevel(parser, RDB_LOG_INF);
RDB_setLogger(parser, dummyTcpTimeoutLogger);
assert_non_null(RDBX_createReaderFile(parser, DUMP_FOLDER("single_key.rdb")));

RdbxToRespConf rdb2respConf = {
.supportRestore = 1,
.dstRedisVersion = getTargetRedisVersion(NULL, NULL),
.supportRestoreModuleAux = isSupportRestoreModuleAux()
};

RdbxToResp *rdbToResp;
assert_non_null(rdbToResp = RDBX_createHandlersToResp(parser, &rdb2respConf));
assert_non_null(RDBX_createRespToRedisTcp(parser, rdbToResp, NULL, "127.0.0.1", assigned_port));

client_fd = accept(server_fd, (struct sockaddr *)&client_addr, &client_len);
assert_true(client_fd >= 0);

/* Start the timer to measure timeout and run parser */
time_t start_time = time(NULL);
RdbStatus status;
while ((status = RDB_parse(parser)) == RDB_STATUS_WAIT_MORE_DATA);

/* Verify dummy error code */
assert_int_equal(RDB_getErrorCode(parser), 12345678);

/* Measure elapsed time and verify it's within the expected range */
time_t elapsedTime = time(NULL) - start_time;
int expectedTime = RECV_TIMEOUT_SECONDS * test_retries;
printf("Elapsed time: %ld, expected time: %d\n", elapsedTime, expectedTime);
assert_in_range(elapsedTime, expectedTime - 2, expectedTime + 2);

RDB_deleteParser(parser);
close(client_fd);
close(server_fd);
}

/*************************** group_rdb_to_redis *******************************/
int group_rdb_to_redis(void) {

Expand Down Expand Up @@ -580,6 +654,7 @@ int group_rdb_to_redis(void) {
cmocka_unit_test_setup(test_rdb_to_redis_multiple_dbs, setupTest),
cmocka_unit_test_setup(test_rdb_to_redis_function, setupTest),
cmocka_unit_test_setup(test_rdb_to_redis_func_lib_replace_if_exist, setupTest),
//cmocka_unit_test_setup(test_rdb_tcp_timeout, setupTest), /* too long to run */
};

int res = cmocka_run_group_tests(tests, NULL, NULL);
Expand Down

0 comments on commit 7cf71ca

Please sign in to comment.