Skip to content

Commit

Permalink
[FIXED] GH-823 deadlock in js_MaybeFetchMore (#834)
Browse files Browse the repository at this point in the history
* Added a test to reproduce

* Added the fix

* PR feedback: use a malloc-ed buffer instead of shared
  • Loading branch information
levb authored Feb 3, 2025
1 parent c4565fc commit 7220e9f
Show file tree
Hide file tree
Showing 4 changed files with 102 additions and 13 deletions.
26 changes: 17 additions & 9 deletions src/js.c
Original file line number Diff line number Diff line change
Expand Up @@ -1814,7 +1814,7 @@ js_checkFetchedMsg(natsSubscription *sub, natsMsg *msg, uint64_t fetchID, bool c
}

static natsStatus
_sendPullRequest(natsConnection *nc, const char *subj, const char *rply,
_publishPullRequest(natsConnection *nc, const char *subj, const char *rply,
natsBuffer *buf, jsFetchRequest *req)
{
natsStatus s;
Expand Down Expand Up @@ -1990,7 +1990,7 @@ _fetch(natsMsgList *list, natsSubscription *sub, jsFetchRequest *req, bool simpl
req->Batch = req->Batch - count;
req->Expires = NATS_MILLIS_TO_NANOS(timeout);
req->NoWait = noWait;
s = _sendPullRequest(nc, subj, rply, &buf, req);
s = _publishPullRequest(nc, subj, rply, &buf, req);
}
IFOK(s, natsSub_nextMsg(&msg, sub, timeout, true));
if (s == NATS_OK)
Expand Down Expand Up @@ -2908,6 +2908,7 @@ natsStatus
js_maybeFetchMore(natsSubscription *sub, jsFetch *fetch)
{
jsFetchRequest req = {.Expires = 0};

if (fetch->opts.NextHandler == NULL)
return NATS_OK;

Expand All @@ -2922,27 +2923,34 @@ js_maybeFetchMore(natsSubscription *sub, jsFetch *fetch)
req.NoWait = fetch->opts.NoWait;
req.Heartbeat = fetch->opts.Heartbeat * 1000 * 1000; // ns, go time.Duration

size_t replySubjectSize = 1 + strlen(sub->subject) + 20;
char *replySubject = NATS_MALLOC(replySubjectSize);
if (replySubject == NULL)
return nats_setDefaultError(NATS_NO_MEMORY);

char buffer[128];
natsBuffer buf;
natsBuf_InitWithBackend(&buf, buffer, 0, sizeof(buffer));

nats_lockSubAndDispatcher(sub);

jsSub *jsi = sub->jsi;
jsi->inFetch = true;
jsi->fetchID++;
snprintf(fetch->replySubject, sizeof(fetch->replySubject), "%.*s%" PRIu64,
snprintf(replySubject, replySubjectSize, "%.*s%" PRIu64,
(int)strlen(sub->subject) - 1, sub->subject, // exclude the last '*'
jsi->fetchID);
natsStatus s = _sendPullRequest(sub->conn, jsi->nxtMsgSubj, fetch->replySubject, &buf, &req);
nats_unlockSubAndDispatcher(sub);

natsStatus s = _publishPullRequest(sub->conn, jsi->nxtMsgSubj, replySubject, &buf, &req);
if (s == NATS_OK)
{
nats_lockSubAndDispatcher(sub);
fetch->requestedMsgs += req.Batch;
nats_unlockSubAndDispatcher(sub);
}

nats_unlockSubAndDispatcher(sub);

natsBuf_Destroy(&buf);
NATS_FREE(replySubject);
return NATS_UPDATE_ERR_STACK(s);
}

Expand Down Expand Up @@ -3085,14 +3093,14 @@ js_PullSubscribeAsync(natsSubscription **newsub, jsCtx *js, const char *subject,
natsTimer_Reset(jsi->hbTimer, dur);
}

nats_unlockSubAndDispatcher(sub);

if (s == NATS_OK)
{
// Send the first fetch request.
s = js_maybeFetchMore(sub, fetch);
}

nats_unlockSubAndDispatcher(sub);

if (s != NATS_OK)
{
natsSubscription_Destroy(sub);
Expand Down
3 changes: 0 additions & 3 deletions src/natsp.h
Original file line number Diff line number Diff line change
Expand Up @@ -390,9 +390,6 @@ typedef struct __jsFetch
// Timer for the fetch expiration. We leverage the existing jsi->hbTimer for
// checking missed heartbeats.
natsTimer *expiresTimer;

// Matches jsi->fetchID
char replySubject[NATS_DEFAULT_INBOX_PRE_LEN + NUID_BUFFER_LEN + 32]; // big enough for {INBOX}.number
} jsFetch;

typedef struct __jsSub
Expand Down
3 changes: 2 additions & 1 deletion test/list_test.txt
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ _test(IPResolutionOrder)
_test(IsClosed)
_test(IsReconnectingAndStatus)
_test(IsValidSubscriber)
_test(JetStream_GH823)
_test(JetStreamBackOffRedeliveries)
_test(JetStreamContext)
_test(JetStreamContextDomain)
Expand Down Expand Up @@ -260,8 +261,8 @@ _test(SSLMultithreads)
_test(SSLReconnectWithAuthError)
_test(SSLServerNameIndication)
_test(SSLSkipServerVerification)
_test(SSLVerificationCallback)
_test(SSLSocketLeakWithEventLoop)
_test(SSLVerificationCallback)
_test(SSLVerify)
_test(SSLVerifyHostname)
_test(StaleConnection)
Expand Down
83 changes: 83 additions & 0 deletions test/test.c
Original file line number Diff line number Diff line change
Expand Up @@ -29089,6 +29089,13 @@ _recvPullAsync(natsConnection *nc, natsSubscription *sub, natsMsg *msg,
natsMutex_Unlock(arg->m);
}

static void
_recvPullAsyncNoop(natsConnection *nc, natsSubscription *sub, natsMsg *msg,
void *closure)
{
natsMsg_Destroy(msg);
}

static void
_completePullAsync(natsConnection *nc, natsSubscription *sub, natsStatus exitStatus,
void *closure)
Expand Down Expand Up @@ -29151,6 +29158,82 @@ _testBatchCompleted(struct threadArg *args, natsSubscription *sub, natsStatus ex
return result;
}


static bool _GH823_nextHandler(int *messages, int64_t *maxBytes, natsSubscription *sub, void *closure)
{
*messages = 5;
return true;
}

void test_JetStream_GH823(void)
{
natsStatus s = NATS_OK;
jsErrCode jerr = 0;
jsStreamConfig sc;
struct threadArg args;
const int numMsgs = 5000;
natsConnection *ncSub = NULL;
jsCtx *jsSub = NULL;

JS_SETUP(2, 9, 2);

s = _createDefaultThreadArgsForCbTests(&args);
if (s != NATS_OK)
FAIL("Unable to setup test");

test("Create stream for foo, bar: ");
jsStreamConfig_Init(&sc);
sc.Name = "TEST";
sc.Subjects = (const char *[2]){"foo","bar"};
sc.SubjectsLen = 2;
s = js_AddStream(NULL, js, &sc, NULL, &jerr);
testCond((s == NATS_OK) && (jerr == 0));

test("Publish thousands of test messages: ");
for (int i=0; i<numMsgs; i++)
{
char buf[64];
snprintf(buf, sizeof(buf), "hello-foo-%d", i);
s = js_Publish(NULL, js, "foo", buf, (int) strlen(buf), NULL, &jerr);
if (s != NATS_OK)
break;
snprintf(buf, sizeof(buf), "hello-bar-%d", i);
s = js_Publish(NULL, js, "bar", buf, (int) strlen(buf), NULL, &jerr);
if (s != NATS_OK)
break;
}
testCond(s == NATS_OK);

test("Make a separate connection for subscribers: ");
s = natsConnection_Connect(&ncSub, NULL);
if (s == NATS_OK)
s = natsConnection_JetStream(&jsSub, ncSub, NULL);
testCond(s == NATS_OK);

test("Create the first async pull subscriber and start receiving: ");
natsSubscription *sub1 = NULL;
jsOptions so;
jsOptions_Init(&so);
so.PullSubscribeAsync.NextHandler = _GH823_nextHandler;
s = js_PullSubscribeAsync(&sub1, jsSub, "foo", NULL, _recvPullAsyncNoop, &args, &so, NULL, &jerr);
testCond(s == NATS_OK);

test("Create the second async pull subscriber and start receiving: ");
natsSubscription *sub2 = NULL;
s = js_PullSubscribeAsync(&sub2, jsSub, "bar", NULL, _recvPullAsyncNoop, &args, &so, NULL, &jerr);
testCond(s == NATS_OK);

natsSubscription_Destroy(sub1);
natsSubscription_Destroy(sub2);
jsCtx_Destroy(jsSub);
natsConnection_Destroy(ncSub);

JS_TEARDOWN
_destroyDefaultThreadArgs(&args);
}



void test_JetStreamSubscribePullAsync(void)
{
natsStatus s;
Expand Down

0 comments on commit 7220e9f

Please sign in to comment.