Skip to content

Commit

Permalink
Fix race causing duplicate subscribe requests (#303)
Browse files Browse the repository at this point in the history
  • Loading branch information
FZambia authored Jan 23, 2025
1 parent 032ef72 commit 08c3551
Show file tree
Hide file tree
Showing 3 changed files with 129 additions and 10 deletions.
5 changes: 3 additions & 2 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ version: '3.8'

services:
centrifugo:
image: centrifugo/centrifugo:v6.0.0
image: centrifugo/centrifugo:v6
command:
- centrifugo
ports:
Expand All @@ -13,4 +13,5 @@ services:
- CENTRIFUGO_SSE_ENABLED=true
- CENTRIFUGO_CHANNEL_WITHOUT_NAMESPACE_PRESENCE=true
- CENTRIFUGO_CLIENT_CONCURRENCY=8
- CENTRIFUGO_LOG_LEVEL=debug
- CENTRIFUGO_LOG_LEVEL=trace
- CENTRIFUGO_CLIENT_TOKEN_HMAC_SECRET_KEY=secret
127 changes: 122 additions & 5 deletions src/centrifuge.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -368,16 +368,40 @@ test.each(transportCases)("%s: subscribe and unsubscribe loop", async (transport

sub.subscribe()
const presenceStats = await sub.presenceStats();
expect(presenceStats.numClients).toBe(1)
expect(presenceStats.numClients).toBe(1);
expect(presenceStats.numUsers).toBe(1);
const presence = await sub.presence();
expect(Object.keys(presence.clients).length).toBe(1)
await sub.unsubscribe()
const presenceStats2 = await c.presenceStats('test');
expect(presenceStats2.numClients).toBe(0)

const retryWithDelay = async (fn, validate, maxRetries, delay) => {
for (let i = 0; i < maxRetries; i++) {
const result = await fn();
if (validate(result)) {
return result;
}
await new Promise(resolve => setTimeout(resolve, delay));
}
throw new Error("Validation failed after retries");
};

const presenceStats2 = await retryWithDelay(
() => c.presenceStats('test'),
(stats: any) => stats.numClients === 0 && stats.numUsers === 0,
3,
2000
);

const presence2 = await retryWithDelay(
() => c.presence('test'),
(presence: any) => Object.keys(presence.clients).length === 0,
3,
2000
);

expect(presenceStats2.numClients).toBe(0);
expect(presenceStats2.numUsers).toBe(0);
const presence2 = await c.presence('test');
expect(Object.keys(presence2.clients).length).toBe(0)
expect(Object.keys(presence2.clients).length).toBe(0);

let disconnectCalled: any;
const disconnectedPromise = new Promise<DisconnectedContext>((resolve, _) => {
Expand Down Expand Up @@ -552,3 +576,96 @@ test.each(websocketOnly)("%s: reconnect after close before transport open", asyn
await disconnectedPromise;
expect(c.state).toBe(State.Disconnected);
});

test.each(transportCases)("%s: subscribes and unsubscribes from many subs", async (transport, endpoint) => {
const c = new Centrifuge([{
transport: transport as TransportName,
endpoint: endpoint,
}], {
websocket: WebSocket,
fetch: fetch,
eventsource: EventSource,
readableStream: ReadableStream,
emulationEndpoint: 'http://localhost:8000/emulation',
// debug: true
});
// Keep an array of promises so that we can wait for each subscription's 'unsubscribed' event.
const unsubscribedPromises: Promise<UnsubscribedContext>[] = [];

const channels = [
'test1',
'test2',
'test3',
'test4',
'test5',
];

// Subscription tokens for anonymous users without ttl. Using an HMAC secret key used in tests ("secret").
const testTokens = {
'test1': "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpYXQiOjE3Mzc1MzIzNDgsImNoYW5uZWwiOiJ0ZXN0MSJ9.eqPQxbBtyYxL8Hvbkm-P6aH7chUsSG_EMWe-rTwF_HI",
'test2': "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpYXQiOjE3Mzc1MzIzODcsImNoYW5uZWwiOiJ0ZXN0MiJ9.tTJB3uSa8XpEmCvfkmrSKclijofnJ5RkQk6L2SaGtUE",
'test3': "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpYXQiOjE3Mzc1MzIzOTgsImNoYW5uZWwiOiJ0ZXN0MyJ9.nyLcMrIot441CszOKska7kQIjo2sEm8pSxV1XWfNCsI",
'test4': "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpYXQiOjE3Mzc1MzI0MDksImNoYW5uZWwiOiJ0ZXN0NCJ9.wWAX2AhJX6Ep4HVexQWSVF3-cWytVhzY9Pm7QsMdCsI",
'test5': "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpYXQiOjE3Mzc1MzI0MTgsImNoYW5uZWwiOiJ0ZXN0NSJ9.hCSfpHYws5TXLKkN0bW0DU6C-wgEUNuhGaIy8W1sT9o"
}

c.connect();

const subscriptions: any[] = [];

for (const channel of channels) {
const sub = c.newSubscription(channel, {
getToken: async function () {
// Sleep for a random time between 0 and 100 milliseconds to emulate network.
const sleep = (ms: any) => new Promise(resolve => setTimeout(resolve, ms));
await sleep(Math.random() * 100);
return testTokens[channel];
}
});

// Create a promise for the 'unsubscribed' event of this subscription.
const unsubPromise = new Promise<UnsubscribedContext>((resolve) => {
sub.on("unsubscribed", (ctx) => {
resolve(ctx);
});
});
unsubscribedPromises.push(unsubPromise);

// Actually subscribe.
sub.subscribe();
subscriptions.push(sub);
}

// Wait until all subscriptions are in the Subscribed state.
await Promise.all(
subscriptions.map(async (sub) => {
await sub.ready(5000);
expect(sub.state).toBe(SubscriptionState.Subscribed);
})
);

// The client itself should be connected now.
expect(c.state).toBe(State.Connected);

// Unsubscribe from all and then disconnect.
subscriptions.forEach((sub) => {
sub.unsubscribe();
});
c.disconnect();

// Wait until all 'unsubscribed' events are received.
const unsubscribedContexts = await Promise.all(unsubscribedPromises);

// Confirm each subscription is now Unsubscribed.
subscriptions.forEach((sub) => {
expect(sub.state).toBe(SubscriptionState.Unsubscribed);
});

// The client should be disconnected.
expect(c.state).toBe(State.Disconnected);

// Assert the correct unsubscribe code for each subscription.
unsubscribedContexts.forEach((ctx) => {
expect(ctx.code).toBe(unsubscribedCodes.unsubscribeCalled);
});
});
7 changes: 4 additions & 3 deletions src/subscription.ts
Original file line number Diff line number Diff line change
Expand Up @@ -355,9 +355,11 @@ export class Subscription extends (EventEmitter as new () => TypedEventEmitter<S
// we also need to check for transport state before sending subscription
// because it may change for subscription with side effects (getData, getToken options)
// @ts-ignore – we are hiding some symbols from public API autocompletion.
if (!this._centrifuge._transportIsOpen) {
if (!this._centrifuge._transportIsOpen || this._inflight) {
return null;
}
this._inflight = true;

const channel = this.channel;

const req: any = {
Expand Down Expand Up @@ -402,8 +404,6 @@ export class Subscription extends (EventEmitter as new () => TypedEventEmitter<S

const cmd = { subscribe: req };

this._inflight = true;

// @ts-ignore – we are hiding some symbols from public API autocompletion.
this._centrifuge._call(cmd).then(resolveCtx => {
this._inflight = false;
Expand Down Expand Up @@ -464,6 +464,7 @@ export class Subscription extends (EventEmitter as new () => TypedEventEmitter<S
}
this._clearSubscribingState();
}
this._inflight = false;
if (this._setState(SubscriptionState.Unsubscribed)) {
this.emit('unsubscribed', { channel: this.channel, code: code, reason: reason });
}
Expand Down

0 comments on commit 08c3551

Please sign in to comment.