From d58353badd8e985e663573c780959db8ae3585bd Mon Sep 17 00:00:00 2001 From: Lev Brouk Date: Fri, 3 May 2024 05:55:52 -0700 Subject: [PATCH 1/6] wip --- src/conn.c | 90 +++++++++++++++++++++++++++++++++++------------------ src/nats.h | 12 +++++++ test/test.c | 5 +++ 3 files changed, 77 insertions(+), 30 deletions(-) diff --git a/src/conn.c b/src/conn.c index bbe5c2933..2387a4fb0 100644 --- a/src/conn.c +++ b/src/conn.c @@ -81,8 +81,14 @@ _processConnInit(natsConnection *nc); static void _close(natsConnection *nc, natsConnStatus status, bool fromPublicClose, bool doCBs); -static bool -_processOpError(natsConnection *nc, natsStatus s, bool initialConnect); +static natsStatus +_tryReconnect(natsConnection *nc, natsStatus s, bool forcedReconnect, bool *started); + +static void +_maybeReconnect(natsConnection *nc, natsStatus s) { _tryReconnect(nc, s, false, NULL); } + +static natsStatus +_forceReconnect(natsConnection *nc, natsStatus s, bool *started) { return _tryReconnect(nc, s, true, started); } static natsStatus _flushTimeout(natsConnection *nc, int64_t timeout); @@ -2096,13 +2102,16 @@ _connect(natsConnection *nc) } // If not connected and retry asynchronously on failed connect + printf("<>/<> nc->status: %d retry: %d, has CB:%d\n", nc->status, retryOnFailedConnect, hasConnectedCb); if ((nc->status != NATS_CONN_STATUS_CONNECTED) && retryOnFailedConnect && hasConnectedCb) { natsConn_Unlock(nc); - if (_processOpError(nc, retSts, true)) + bool reconnectStarted = false; + s = _forceReconnect(nc, retSts, &reconnectStarted); + if ((s == NATS_OK) && reconnectStarted) { nats_clearLastError(); return NATS_NOT_YET_CONNECTED; @@ -2134,29 +2143,33 @@ _evStopPolling(natsConnection *nc) return s; } -// _processOpError handles errors from reading or parsing the protocol. +// _tryReconnect handles errors from reading or parsing the protocol, or forced +// reconnection. It will fire off a doReconnect thread if needed. // The lock should not be held entering this function. 
-static bool -_processOpError(natsConnection *nc, natsStatus s, bool initialConnect) +static natsStatus +_tryReconnect(natsConnection *nc, natsStatus newErr, bool forcedReconnect, bool *started) { + natsStatus s = NATS_OK; + natsConn_Lock(nc); - if (!initialConnect) + printf("<>/<> tryReconnect 1: forced:%d\n", forcedReconnect); + if (!forcedReconnect) { if (_isConnecting(nc) || natsConn_isClosed(nc) || (nc->inReconnect > 0)) { + printf("<>/<> tryReconnect 2: nothing to do\n"); natsConn_Unlock(nc); - return false; + return NATS_OK; } } // Do reconnect only if allowed and we were actually connected // or if we are retrying on initial failed connect. - if (initialConnect || (nc->opts->allowReconnect && (nc->status == NATS_CONN_STATUS_CONNECTED))) + if (forcedReconnect || (nc->opts->allowReconnect && (nc->status == NATS_CONN_STATUS_CONNECTED))) { - natsStatus ls = NATS_OK; - + printf("<>/<> tryReconnect 3: try: %d %d\n", nc->opts->allowReconnect, nc->status); // Set our new status nc->status = NATS_CONN_STATUS_RECONNECTING; @@ -2176,7 +2189,7 @@ _processOpError(natsConnection *nc, natsStatus s, bool initialConnect) // on the socket since we are going to reconnect. if (nc->el.attached) { - ls = _evStopPolling(nc); + s = _evStopPolling(nc); natsSock_Close(nc->sockCtx.fd); nc->sockCtx.fd = NATS_SOCK_INVALID; @@ -2185,25 +2198,24 @@ _processOpError(natsConnection *nc, natsStatus s, bool initialConnect) } // Fail pending flush requests. - if (ls == NATS_OK) + if (s == NATS_OK) _clearPendingFlushRequests(nc); // If option set, also fail pending requests. - if ((ls == NATS_OK) && nc->opts->failRequestsOnDisconnect) + if ((s == NATS_OK) && nc->opts->failRequestsOnDisconnect) _clearPendingRequestCalls(nc, NATS_CONNECTION_DISCONNECTED); // Create the pending buffer to hold all write requests while we try // to reconnect. 
- if (ls == NATS_OK) - ls = natsBuf_Create(&(nc->pending), nc->opts->reconnectBufSize); - if (ls == NATS_OK) + IFOK (s, natsBuf_Create(&(nc->pending), nc->opts->reconnectBufSize)); + if (s == NATS_OK) { nc->usePending = true; // Start the reconnect thread - ls = natsThread_Create(&(nc->reconnectThread), + s = natsThread_Create(&(nc->reconnectThread), _doReconnect, (void*) nc); } - if (ls == NATS_OK) + if (s == NATS_OK) { // We created the reconnect thread successfully, so retain // the connection. @@ -2211,20 +2223,26 @@ _processOpError(natsConnection *nc, natsStatus s, bool initialConnect) nc->inReconnect++; natsConn_Unlock(nc); - return true; + if (started != NULL) + *started = true; + + printf("<>/<> tryReconnect 4: OK\n"); + return NATS_OK; } } // reconnect not allowed or we failed to setup the reconnect code. + if (started != NULL) + *started = false; nc->status = NATS_CONN_STATUS_DISCONNECTED; - nc->err = s; + nc->err = newErr; natsConn_Unlock(nc); _close(nc, NATS_CONN_STATUS_CLOSED, false, true); - return false; + return NATS_UPDATE_ERR_STACK(s); } static void @@ -2267,7 +2285,7 @@ _readLoop(void *arg) s = natsParser_Parse(nc, buffer, n); if (s != NATS_OK) - _processOpError(nc, s, false); + _maybeReconnect(nc, s); natsConn_Lock(nc); } @@ -2396,7 +2414,7 @@ _processPingTimer(natsTimer *timer, void *arg) if (++(nc->pout) > nc->opts->maxPingsOut) { natsConn_Unlock(nc); - _processOpError(nc, NATS_STALE_CONNECTION, false); + _maybeReconnect(nc, NATS_STALE_CONNECTION); return; } @@ -2921,7 +2939,7 @@ natsConn_processErr(natsConnection *nc, char *buf, int bufLen) if (strcasecmp(error, STALE_CONNECTION) == 0) { - _processOpError(nc, NATS_STALE_CONNECTION, false); + _maybeReconnect(nc, NATS_STALE_CONNECTION); } else if (nats_strcasestr(error, PERMISSIONS_ERR) != NULL) { @@ -3306,9 +3324,9 @@ natsConn_create(natsConnection **newConn, natsOptions *options) natsStatus natsConnection_Connect(natsConnection **newConn, natsOptions *options) { - natsStatus s = NATS_OK; - 
natsConnection *nc = NULL; - natsOptions *opts = NULL; + natsStatus s = NATS_OK; + natsConnection *nc = NULL; + natsOptions *opts = NULL; if (options == NULL) { @@ -3333,6 +3351,18 @@ natsConnection_Connect(natsConnection **newConn, natsOptions *options) return NATS_UPDATE_ERR_STACK(s); } +natsStatus +natsConnection_Reconnect(natsConnection *nc) +{ + natsStatus s = NATS_OK; + + if (natsConnection_IsClosed(nc)) + return nats_setDefaultError(NATS_INVALID_ARG); + + IFOK(s, _forceReconnect(nc, NATS_OK, NULL)); + return NATS_UPDATE_ERR_STACK(s); +} + static natsStatus _processUrlString(natsOptions *opts, const char *urls) { @@ -4117,7 +4147,7 @@ natsConnection_ProcessReadEvent(natsConnection *nc) s = natsParser_Parse(nc, buffer, n); if (s != NATS_OK) - _processOpError(nc, s, false); + _maybeReconnect(nc, s); natsConn_release(nc); } @@ -4166,7 +4196,7 @@ natsConnection_ProcessWriteEvent(natsConnection *nc) natsConn_Unlock(nc); if (s != NATS_OK) - _processOpError(nc, s, false); + _maybeReconnect(nc, s); (void) NATS_UPDATE_ERR_STACK(s); } diff --git a/src/nats.h b/src/nats.h index 9dd2c9b0e..dc331e08f 100644 --- a/src/nats.h +++ b/src/nats.h @@ -4033,6 +4033,18 @@ stanMsg_Destroy(stanMsg *msg); NATS_EXTERN natsStatus natsConnection_Connect(natsConnection **nc, natsOptions *options); +/** \brief Causes the client to drop the connection to the current server and + * perform the standard reconnection process. + * + * This means that all subscriptions and consumers should be resubscribed and + * their work resumed after a successful reconnect, with all reconnect options + * respected. + * + * @param nc the pointer to the #natsConnection object. + */ +natsStatus +natsConnection_Reconnect(natsConnection *nc); + /** \brief Process a read event when using external event loop.
* * When using an external event loop, and the callback indicating that diff --git a/test/test.c b/test/test.c index 070255638..71373709c 100644 --- a/test/test.c +++ b/test/test.c @@ -12047,12 +12047,17 @@ test_RequestTimeout(void) natsMsg *msg = NULL; natsPid serverPid = NATS_INVALID_PID; + printf("<>/<> 1\n"); serverPid = _startServer("nats://127.0.0.1:4222", NULL, true); CHECK_SERVER_STARTED(serverPid); test("Test Request should timeout: ") s = natsConnection_ConnectTo(&nc, NATS_DEFAULT_URL); + printf("<>/<> 2: %d\n", s); + IFOK(s, natsConnection_RequestString(&msg, nc, "foo", "bar", 500)); + printf("<>/<> 3: %d\n", s); + testCond(serverVersionAtLeast(2, 2, 0) ? (s == NATS_NO_RESPONDERS) : (s == NATS_TIMEOUT)); natsConnection_Destroy(nc); From 46236d818939eb3678ff701fad17be46f647f5dd Mon Sep 17 00:00:00 2001 From: Lev Brouk Date: Fri, 3 May 2024 06:18:50 -0700 Subject: [PATCH 2/6] removed debug log --- src/conn.c | 5 ----- 1 file changed, 5 deletions(-) diff --git a/src/conn.c b/src/conn.c index 2387a4fb0..1f861a6ff 100644 --- a/src/conn.c +++ b/src/conn.c @@ -2102,7 +2102,6 @@ _connect(natsConnection *nc) } // If not connected and retry asynchronously on failed connect - printf("<>/<> nc->status: %d retry: %d, has CB:%d\n", nc->status, retryOnFailedConnect, hasConnectedCb); if ((nc->status != NATS_CONN_STATUS_CONNECTED) && retryOnFailedConnect && hasConnectedCb) @@ -2153,12 +2152,10 @@ _tryReconnect(natsConnection *nc, natsStatus newErr, bool forcedReconnect, bool natsConn_Lock(nc); - printf("<>/<> tryReconnect 1: forced:%d\n", forcedReconnect); if (!forcedReconnect) { if (_isConnecting(nc) || natsConn_isClosed(nc) || (nc->inReconnect > 0)) { - printf("<>/<> tryReconnect 2: nothing to do\n"); natsConn_Unlock(nc); return NATS_OK; @@ -2169,7 +2166,6 @@ _tryReconnect(natsConnection *nc, natsStatus newErr, bool forcedReconnect, bool // or if we are retrying on initial failed connect. 
if (forcedReconnect || (nc->opts->allowReconnect && (nc->status == NATS_CONN_STATUS_CONNECTED))) { - printf("<>/<> tryReconnect 3: try: %d %d\n", nc->opts->allowReconnect, nc->status); // Set our new status nc->status = NATS_CONN_STATUS_RECONNECTING; @@ -2226,7 +2222,6 @@ _tryReconnect(natsConnection *nc, natsStatus newErr, bool forcedReconnect, bool if (started != NULL) *started = true; - printf("<>/<> tryReconnect 4: OK\n"); return NATS_OK; } } From a5e99d3892db9cbc86135901f85cb880fd1555dc Mon Sep 17 00:00:00 2001 From: Lev Brouk Date: Fri, 3 May 2024 06:22:58 -0700 Subject: [PATCH 3/6] removed whitespace --- src/conn.c | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/conn.c b/src/conn.c index 1f861a6ff..4a31d8921 100644 --- a/src/conn.c +++ b/src/conn.c @@ -3319,9 +3319,9 @@ natsConn_create(natsConnection **newConn, natsOptions *options) natsStatus natsConnection_Connect(natsConnection **newConn, natsOptions *options) { - natsStatus s = NATS_OK; - natsConnection *nc = NULL; - natsOptions *opts = NULL; + natsStatus s = NATS_OK; + natsConnection *nc = NULL; + natsOptions *opts = NULL; if (options == NULL) { From e5a31f3b30aa3704b9fc3c82216d7a7be033ee66 Mon Sep 17 00:00:00 2001 From: Lev Brouk Date: Fri, 3 May 2024 06:24:23 -0700 Subject: [PATCH 4/6] removed debug log --- test/test.c | 5 ----- 1 file changed, 5 deletions(-) diff --git a/test/test.c b/test/test.c index 71373709c..070255638 100644 --- a/test/test.c +++ b/test/test.c @@ -12047,17 +12047,12 @@ test_RequestTimeout(void) natsMsg *msg = NULL; natsPid serverPid = NATS_INVALID_PID; - printf("<>/<> 1\n"); serverPid = _startServer("nats://127.0.0.1:4222", NULL, true); CHECK_SERVER_STARTED(serverPid); test("Test Request should timeout: ") s = natsConnection_ConnectTo(&nc, NATS_DEFAULT_URL); - printf("<>/<> 2: %d\n", s); - IFOK(s, natsConnection_RequestString(&msg, nc, "foo", "bar", 500)); - printf("<>/<> 3: %d\n", s); - testCond(serverVersionAtLeast(2, 2, 0) ? 
(s == NATS_NO_RESPONDERS) : (s == NATS_TIMEOUT)); natsConnection_Destroy(nc); From 8227d5fe082c3e62efcd6c8934300f623e853a6b Mon Sep 17 00:00:00 2001 From: Lev Brouk Date: Fri, 3 May 2024 08:56:02 -0700 Subject: [PATCH 5/6] Added a test --- test/list.txt | 1 + test/test.c | 69 +++++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 70 insertions(+) diff --git a/test/list.txt b/test/list.txt index 60cee3e79..60be59288 100644 --- a/test/list.txt +++ b/test/list.txt @@ -86,6 +86,7 @@ ReconnectBufSize RetryOnFailedConnect NoPartialOnReconnect ReconnectFailsPendingRequests +ForcedReconnect ErrOnConnectAndDeadlock ErrOnMaxPayloadLimit Auth diff --git a/test/test.c b/test/test.c index 070255638..823350186 100644 --- a/test/test.c +++ b/test/test.c @@ -20202,6 +20202,74 @@ test_NoPartialOnReconnect(void) _stopServer(pid); } +static void +test_ForcedReconnect(void) +{ + natsStatus s; + struct threadArg arg; + natsOptions *opts = NULL; + natsConnection *nc = NULL; + natsSubscription *sub = NULL; + natsMsg *msg = NULL; + natsPid pid = NATS_INVALID_PID; + + s = _createDefaultThreadArgsForCbTests(&arg); + if (s != NATS_OK) + FAIL("unable to setup test"); + + test("Start server, connect, subscribe: "); + pid = _startServer("nats://127.0.0.1:4222", "-p 4222", true); + CHECK_SERVER_STARTED(pid); + IFOK(s, natsOptions_Create(&opts)); + IFOK(s, natsOptions_SetReconnectedCB(opts, _reconnectedCb, &arg)); + IFOK(s, natsConnection_Connect(&nc, opts)); + IFOK(s, natsConnection_SubscribeSync(&sub, nc, "foo")); + testCond(s == NATS_OK); + + test("Send a message to foo: "); + s = natsMsg_Create(&msg, "foo", NULL, "bar", 3); + IFOK(s, natsConnection_PublishMsg(nc, msg)); + testCond(s == NATS_OK); + natsMsg_Destroy(msg); + msg = NULL; + + test("Receive the message: "); + s = natsSubscription_NextMsg(&msg, sub, 1000); + testCond((s == NATS_OK) && (msg != NULL)); + natsMsg_Destroy(msg); + msg = NULL; + + test("Forced reconnect: "); + s = natsConnection_Reconnect(nc); + testCond(s == 
NATS_OK); + + test("Waiting for reconnect: "); + natsMutex_Lock(arg.m); + while ((s != NATS_TIMEOUT) && !arg.reconnected) + s = natsCondition_TimedWait(arg.c, arg.m, 5000); + arg.reconnected = false; + natsMutex_Unlock(arg.m); + testCond(s == NATS_OK); + + test("Send a message to foo: "); + s = natsMsg_Create(&msg, "foo", NULL, "bar", 3); + IFOK(s, natsConnection_PublishMsg(nc, msg)); + testCond(s == NATS_OK); + natsMsg_Destroy(msg); + msg = NULL; + + test("Receive the message: "); + s = natsSubscription_NextMsg(&msg, sub, 1000); + testCond((s == NATS_OK) && (msg != NULL)); + natsMsg_Destroy(msg); + msg = NULL; + + natsSubscription_Destroy(sub); + sub = NULL; + natsConnection_Destroy(nc); + nc = NULL; +} + static void _stopServerInThread(void *closure) { @@ -36210,6 +36278,7 @@ static testInfo allTests[] = {"RetryOnFailedConnect", test_RetryOnFailedConnect}, {"NoPartialOnReconnect", test_NoPartialOnReconnect}, {"ReconnectFailsPendingRequests", test_ReconnectFailsPendingRequest}, + {"ForcedReconnect", test_ForcedReconnect}, {"ErrOnConnectAndDeadlock", test_ErrOnConnectAndDeadlock}, {"ErrOnMaxPayloadLimit", test_ErrOnMaxPayloadLimit}, From 27fd5dfbcc81b0f7faf0a1b97ee3237bc92ce425 Mon Sep 17 00:00:00 2001 From: Lev Brouk Date: Fri, 3 May 2024 09:10:27 -0700 Subject: [PATCH 6/6] Oops forgot to free opts/threadargs from last edit --- test/test.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/test/test.c b/test/test.c index 823350186..c22f0fc87 100644 --- a/test/test.c +++ b/test/test.c @@ -20265,9 +20265,9 @@ test_ForcedReconnect(void) msg = NULL; natsSubscription_Destroy(sub); - sub = NULL; natsConnection_Destroy(nc); - nc = NULL; + natsOptions_Destroy(opts); + _destroyDefaultThreadArgs(&arg); } static void