diff --git a/api/librdb-ext-api.h b/api/librdb-ext-api.h index 7b5d974..02e1472 100644 --- a/api/librdb-ext-api.h +++ b/api/librdb-ext-api.h @@ -43,7 +43,7 @@ typedef enum { RDBX_ERR_RESP_INVALID_TARGET_VERSION, RDBX_ERR_RESP_READ, RDBX_ERR_RESP2REDIS_CREATE_SOCKET, - RDBX_ERR_RESP2REDIS_CONF_NONBLOCK_SOCKET, + RDBX_ERR_RESP2REDIS_CONF_BLOCK_SOCKET, RDBX_ERR_RESP2REDIS_INVALID_ADDRESS, RDBX_ERR_RESP2REDIS_FAILED_CONNECT, RDBX_ERR_RESP2REDIS_FAILED_READ, @@ -51,6 +51,7 @@ typedef enum { RDBX_ERR_RESP2REDIS_CONN_CLOSE, RDBX_ERR_RESP2REDIS_MAX_RETRIES, RDBX_ERR_RESP2REDIS_SET_TIMEOUT, + RDBX_ERR_RESP2REDIS_AUTH_FAILED, } RdbxRes; /**************************************************************** diff --git a/src/cli/rdb-cli.c b/src/cli/rdb-cli.c index 777aac1..ad4fda3 100644 --- a/src/cli/rdb-cli.c +++ b/src/cli/rdb-cli.c @@ -77,8 +77,10 @@ static void logger(RdbLogLevel l, const char *msg) { [RDB_LOG_DBG] = "DEBUG :", }; - if (logfile != NULL) + if (logfile != NULL) { fprintf(logfile, "%s %s\n", logLevelStr[l], msg); + fflush(logfile); + } if (l == RDB_LOG_ERR) printf("%s %s\n", logLevelStr[l], msg); diff --git a/src/ext/respToRedisLoader.c b/src/ext/respToRedisLoader.c index fd84912..562bfd8 100644 --- a/src/ext/respToRedisLoader.c +++ b/src/ext/respToRedisLoader.c @@ -6,6 +6,7 @@ #include #include #include +#include #include "extCommon.h" #include "readerResp.h" @@ -14,16 +15,16 @@ #include #endif -#define PIPELINE_DEPTH_DEF 200 /* Default Number of pending cmds before waiting for response(s) */ -#define PIPELINE_DEPTH_MAX 1000 /* limit the max value allowed to configure for pipeline depth */ +#define PIPELINE_DEPTH_DEF 200 /* Default Number of pending cmds before waiting for response(s) */ +#define PIPELINE_DEPTH_MAX 1000 /* limit the max value allowed to configure for pipeline depth */ -#define NUM_RECORDED_CMDS 400 /* Number of commands to backlog, in a cyclic array */ -#define RECORDED_KEY_MAX_LEN 40 /* Maximum payload size from any command to record into cyclic array */ +#define NUM_RECORDED_CMDS 400 /* Number of commands to backlog, in a cyclic array */ +#define RECORDED_KEY_MAX_LEN 40 /* Maximum payload size from any command to record into cyclic array */ -#define REPLY_BUFF_SIZE 1024 /* reply buffer size */ - -#define MAX_EINTR_RETRY 3 +#define REPLY_BUFF_SIZE 1024 /* reply buffer size */ +#define MAX_EINTR_RETRY 5 +#define RECV_CMD_TIMEOUT_SEC 10 /* recv() command timeout in seconds */ struct RdbxRespToRedisLoader { @@ -86,22 +87,22 @@ static int onReadRepliesErrorCb(void *context, char *msg) { * * numToRead - minimum number of replies to read from the socket before * returning. - * sendError - if set, an error occurred while writing to the server. In + * sentError - if set, an error occurred while writing to the server. In * this case the function will try to read replies from the * server. Maybe one of the replies will contain an error message * that explains why write got failed. Whether error message is * received or not, the function will return to the original issue. * * Return 0 for success, 1 otherwise. */ -static int readReplies(RdbxRespToRedisLoader *ctx, int numToRead, int sendError) { - int noDataEv = 0; +static int readReplies(RdbxRespToRedisLoader *ctx, int numToRead, int sentError) { + int retries = 0; char buff[REPLY_BUFF_SIZE]; RespReaderCtx *respReader = &ctx->respReader; size_t countRepliesBefore = respReader->countReplies; size_t repliesExpected = respReader->countReplies + numToRead; - while ((respReader->countReplies < repliesExpected) || (sendError)) { + while ((respReader->countReplies < repliesExpected) || (sentError)) { int bytesReceived = recv(ctx->fd, buff, sizeof(buff), 0); if (bytesReceived > 0) { @@ -114,18 +115,25 @@ static int readReplies(RdbxRespToRedisLoader *ctx, int numToRead, int sendError) /* handle error */ - if (sendError) - return 0; /* Failed read error message from dst. Back to original issue. */ + if (sentError) + return 0; /* Done lookup for error message. Return to original issue */ if (bytesReceived == 0) { RDB_reportError(ctx->p, (RdbRes) RDBX_ERR_RESP2REDIS_CONN_CLOSE, "Connection closed by the remote side"); return 1; } else { - if ((!noDataEv) && (errno == EAGAIN || errno == EWOULDBLOCK)) { - noDataEv = 1; - RDB_log(ctx->p, RDB_LOG_WRN, "No data available from redis-server"); - continue; /* Try one more time (Timeout is 60sec) */ + if (errno == EAGAIN || errno == EWOULDBLOCK) { + retries++; + RDB_log(ctx->p, RDB_LOG_INF, + "No reply from redis-server for %d seconds", + retries * RECV_CMD_TIMEOUT_SEC); + + /* Parser got external error? Currently Used only for testing */ + if (RDB_getErrorCode(ctx->p) != RDB_OK) + return 1; + + continue; } RDB_reportError(ctx->p, @@ -209,7 +217,7 @@ static int redisLoaderWritev(void *context, struct iovec *iov, int iovCnt, /* Error occurred. Try to receive error msg from dst, which might explain why write got failed */ - readReplies(ctx, 0, 1/*sendError*/); + readReplies(ctx, 0, 1/*sentError*/); return 1; } @@ -324,18 +332,35 @@ _LIBRDB_API void RDBX_setPipelineDepth(RdbxRespToRedisLoader *r2r, int depth) { r2r->pendingCmds.pipelineDepth = (depth <= 0 || depth>PIPELINE_DEPTH_MAX) ? PIPELINE_DEPTH_DEF : depth; } +/* Create a loader from an existing file descriptor */ _LIBRDB_API RdbxRespToRedisLoader *RDBX_createRespToRedisFd(RdbParser *p, RdbxToResp *rdbToResp, RdbxRedisAuth *auth, int fd) { - RdbxRespToRedisLoader *ctx; - if ((ctx = RDB_alloc(p, sizeof(RdbxRespToRedisLoader))) == NULL) { - RDB_reportError(p, (RdbRes) RDBX_ERR_RESP_FAILED_ALLOC, - "Failed to allocate struct RdbxRespToRedisLoader"); + /* Ensure the socket is in blocking mode */ + int flags = fcntl(fd, F_GETFL, 0); + if (flags == -1 || fcntl(fd, F_SETFL, flags & ~O_NONBLOCK) == -1) { + RDB_reportError(p, (RdbRes) RDBX_ERR_RESP2REDIS_CONF_BLOCK_SOCKET, + "Failed to configure for blocking mode. errno=%d: %s", + errno, strerror(errno)); + return NULL; + } + + /* Set receive timeout (blocking, but with a limit) */ + struct timeval timeout = { .tv_sec = RECV_CMD_TIMEOUT_SEC, .tv_usec = 0 }; + if (setsockopt(fd, SOL_SOCKET, SO_RCVTIMEO, &timeout, sizeof(timeout)) < 0) { + RDB_reportError(p, (RdbRes) RDBX_ERR_RESP2REDIS_SET_TIMEOUT, + "Failed to configure for blocking mode. errno=%d: %s", + errno, strerror(errno)); + return NULL; + } + + RdbxRespToRedisLoader *ctx = RDB_alloc(p, sizeof(RdbxRespToRedisLoader)); + if (!ctx) { + RDB_reportError(p, (RdbRes) RDBX_ERR_RESP_FAILED_ALLOC, "Failed to allocate struct RdbxRespToRedisLoader"); return NULL; } - /* init RdbxRespToRedisLoader context */ memset(ctx, 0, sizeof(RdbxRespToRedisLoader)); ctx->p = p; ctx->fd = fd; @@ -345,15 +370,19 @@ _LIBRDB_API RdbxRespToRedisLoader *RDBX_createRespToRedisFd(RdbParser *p, readRespInit(&ctx->respReader); setErrorCb(&ctx->respReader, ctx, onReadRepliesErrorCb); - if (auth && (redisAuth(ctx, auth) != RDB_OK)) + if (auth && (redisAuth(ctx, auth) != RDB_OK)) { + RDB_reportError(p, (RdbRes) RDBX_ERR_RESP2REDIS_AUTH_FAILED, "Redis authentication failed."); + RDB_free(p, ctx); return NULL; + } - /* Set 'this' writer to rdbToResp */ + /* Set writer to rdbToResp */ RdbxRespWriter inst = {ctx, redisLoaderDelete, redisLoaderWritev, redisLoaderFlush}; RDBX_attachRespWriter(rdbToResp, &inst); return ctx; } +/* Create a loader and establish a TCP connection */ _LIBRDB_API RdbxRespToRedisLoader *RDBX_createRespToRedisTcp(RdbParser *p, RdbxToResp *rdbToResp, RdbxRedisAuth *auth, @@ -361,7 +390,7 @@ _LIBRDB_API RdbxRespToRedisLoader *RDBX_createRespToRedisTcp(RdbParser *p, int port) { int sockfd = socket(AF_INET, SOCK_STREAM, 0); if (sockfd == -1) { - RDB_reportError(p, (RdbRes) RDBX_ERR_RESP2REDIS_CREATE_SOCKET, "Failed to create tcp socket"); + RDB_reportError(p, (RdbRes) RDBX_ERR_RESP2REDIS_CREATE_SOCKET, "Failed to create TCP socket"); return NULL; } @@ -377,21 +406,12 @@ _LIBRDB_API RdbxRespToRedisLoader *RDBX_createRespToRedisTcp(RdbParser *p, } if (connect(sockfd, (struct sockaddr *) &server_addr, sizeof(server_addr)) == -1) { - RDB_reportError(p, (RdbRes) RDBX_ERR_RESP2REDIS_INVALID_ADDRESS, + RDB_reportError(p, (RdbRes) RDBX_ERR_RESP2REDIS_FAILED_CONNECT, "Failed to connect(hostname=%s, port=%d) => errno=%d", hostname, port, errno); goto createErr; } - /* Set the recv() timeout. Avoid blocking forever. */ - struct timeval timeout = { .tv_sec = 60, .tv_usec = 0 }; - if (setsockopt(sockfd, SOL_SOCKET, SO_RCVTIMEO, &timeout, sizeof(timeout)) < 0) { - RDB_reportError(p, (RdbRes) RDBX_ERR_RESP2REDIS_SET_TIMEOUT, - "Failed to set socket receive timeout. errno=%d: %s", - errno, strerror(errno)); - goto createErr; - } - RdbxRespToRedisLoader *res = RDBX_createRespToRedisFd(p, rdbToResp, auth, sockfd); if (!res) goto createErr; diff --git a/test/test_rdb_to_redis.c b/test/test_rdb_to_redis.c index fc5ac4b..ef65088 100644 --- a/test/test_rdb_to_redis.c +++ b/test/test_rdb_to_redis.c @@ -1,9 +1,9 @@ #include #include -#include #include -#include -#include +#include +#include +#include #include "test_common.h" int serverMajorVer, serverMinorVer; @@ -303,7 +303,7 @@ static void test_rdb_to_redis_function(void **state) { } /* test relied on rdbtest module within redis repo, if available */ -void test_rdb_to_redis_module(void **state) { +static void test_rdb_to_redis_module(void **state) { UNUSED(state); /* Skip test if testrdb is not loaded */ @@ -360,7 +360,7 @@ static void test_rdb_to_redis_module_aux_empty(void **state) { rdb_to_tcp(DUMP_FOLDER("module_aux_empty.rdb"), 1, 1, NULL); } -void test_rdb_to_redis_stream(void **state) { +static void test_rdb_to_redis_stream(void **state) { UNUSED(state); test_rdb_to_redis_common(DUMP_FOLDER("stream_v11.rdb"), 1, NULL, NULL); } @@ -523,6 +523,80 @@ static void test_rdb_to_redis_func_lib_replace_if_exist(void **state) { } } +/* Create dummy TCP server that doesn't respond to the client and verify that + * the parser retries after TIMEOUT_SECONDS. Not part of CI since it takes to long */ +int countdownRetries; +RdbParser *parser; +void dummyTcpTimeoutLogger(RdbLogLevel l, const char *msg) { + UNUSED(l); + if (strstr(msg, "No reply from redis-server for") != NULL) + if (--countdownRetries == 0) + RDB_reportError(parser, (RdbRes)12345678, "Inject error to end the test"); +} +void test_rdb_tcp_timeout(void **state) { + UNUSED(state); + const int RECV_TIMEOUT_SECONDS = 10; /* socket retry timeout */ + int test_retries = 3; /* limit test to finite number of retries */ + int server_fd, client_fd; + struct sockaddr_in server_addr, client_addr; + socklen_t client_len = sizeof(client_addr); + + /* expected to retry 3 times before ending the test */ + countdownRetries = test_retries; + + /* Dummy TCP server that only receives messages but does not respond */ + server_fd = socket(AF_INET, SOCK_STREAM, 0); + assert_true(server_fd >= 0); + + int opt = 1; + setsockopt(server_fd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)); + server_addr.sin_family = AF_INET; + server_addr.sin_addr.s_addr = INADDR_ANY; + server_addr.sin_port = htons(0); + assert_int_equal(bind(server_fd, (struct sockaddr *)&server_addr, sizeof(server_addr)), 0); + socklen_t addr_len = sizeof(server_addr); + assert_int_equal(getsockname(server_fd, (struct sockaddr *)&server_addr, &addr_len), 0); + int assigned_port = ntohs(server_addr.sin_port); + assert_int_equal(listen(server_fd, 1), 0); + printf("Dummy TCP server started, waiting for client to connect...\n"); + + parser = RDB_createParserRdb(NULL); + RDB_setLogLevel(parser, RDB_LOG_INF); + RDB_setLogger(parser, dummyTcpTimeoutLogger); + assert_non_null(RDBX_createReaderFile(parser, DUMP_FOLDER("single_key.rdb"))); + + RdbxToRespConf rdb2respConf = { + .supportRestore = 1, + .dstRedisVersion = getTargetRedisVersion(NULL, NULL), + .supportRestoreModuleAux = isSupportRestoreModuleAux() + }; + + RdbxToResp *rdbToResp; + assert_non_null(rdbToResp = RDBX_createHandlersToResp(parser, &rdb2respConf)); + assert_non_null(RDBX_createRespToRedisTcp(parser, rdbToResp, NULL, "127.0.0.1", assigned_port)); + + client_fd = accept(server_fd, (struct sockaddr *)&client_addr, &client_len); + assert_true(client_fd >= 0); + + /* Start the timer to measure timeout and run parser */ + time_t start_time = time(NULL); + RdbStatus status; + while ((status = RDB_parse(parser)) == RDB_STATUS_WAIT_MORE_DATA); + + /* Verify dummy error code */ + assert_int_equal(RDB_getErrorCode(parser), 12345678); + + /* Measure elapsed time and verify it's within the expected range */ + time_t elapsedTime = time(NULL) - start_time; + int expectedTime = RECV_TIMEOUT_SECONDS * test_retries; + printf("Elapsed time: %ld, expected time: %d\n", elapsedTime, expectedTime); + assert_in_range(elapsedTime, expectedTime - 2, expectedTime + 2); + + RDB_deleteParser(parser); + close(client_fd); + close(server_fd); +} + /*************************** group_rdb_to_redis *******************************/ int group_rdb_to_redis(void) { @@ -580,6 +654,7 @@ int group_rdb_to_redis(void) { cmocka_unit_test_setup(test_rdb_to_redis_multiple_dbs, setupTest), cmocka_unit_test_setup(test_rdb_to_redis_function, setupTest), cmocka_unit_test_setup(test_rdb_to_redis_func_lib_replace_if_exist, setupTest), + //cmocka_unit_test_setup(test_rdb_tcp_timeout, setupTest), /* too long to run */ }; int res = cmocka_run_group_tests(tests, NULL, NULL);