diff --git a/docker-compose.yml b/docker-compose.yml index 1d1e5a4..76cdcc5 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -2,7 +2,7 @@ version: '3.8' services: centrifugo: - image: centrifugo/centrifugo:v6.0.0 + image: centrifugo/centrifugo:v6 command: - centrifugo ports: @@ -13,4 +13,5 @@ services: - CENTRIFUGO_SSE_ENABLED=true - CENTRIFUGO_CHANNEL_WITHOUT_NAMESPACE_PRESENCE=true - CENTRIFUGO_CLIENT_CONCURRENCY=8 - - CENTRIFUGO_LOG_LEVEL=debug + - CENTRIFUGO_LOG_LEVEL=trace + - CENTRIFUGO_CLIENT_TOKEN_HMAC_SECRET_KEY=secret diff --git a/src/centrifuge.test.ts b/src/centrifuge.test.ts index 8b81a9c..6504289 100644 --- a/src/centrifuge.test.ts +++ b/src/centrifuge.test.ts @@ -552,3 +552,96 @@ test.each(websocketOnly)("%s: reconnect after close before transport open", asyn await disconnectedPromise; expect(c.state).toBe(State.Disconnected); }); + +test.each(transportCases)("%s: subscribes and unsubscribes from many subs", async (transport, endpoint) => { + const c = new Centrifuge([{ + transport: transport as TransportName, + endpoint: endpoint, + }], { + websocket: WebSocket, + fetch: fetch, + eventsource: EventSource, + readableStream: ReadableStream, + emulationEndpoint: 'http://localhost:8000/emulation', + // debug: true + }); + // Keep an array of promises so that we can wait for each subscription's 'unsubscribed' event. + const unsubscribedPromises: Promise[] = []; + + const channels = [ + 'test1', + 'test2', + 'test3', + 'test4', + 'test5', + ]; + + // Subscription tokens for anonymous users without ttl. Using an HMAC secret key used in tests ("secret"). + const testTokens = { + 'test1': "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpYXQiOjE3Mzc1MzIzNDgsImNoYW5uZWwiOiJ0ZXN0MSJ9.eqPQxbBtyYxL8Hvbkm-P6aH7chUsSG_EMWe-rTwF_HI", + 'test2': "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpYXQiOjE3Mzc1MzIzODcsImNoYW5uZWwiOiJ0ZXN0MiJ9.tTJB3uSa8XpEmCvfkmrSKclijofnJ5RkQk6L2SaGtUE", + 'test3': "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpYXQiOjE3Mzc1MzIzOTgsImNoYW5uZWwiOiJ0ZXN0MyJ9.nyLcMrIot441CszOKska7kQIjo2sEm8pSxV1XWfNCsI", + 'test4': "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpYXQiOjE3Mzc1MzI0MDksImNoYW5uZWwiOiJ0ZXN0NCJ9.wWAX2AhJX6Ep4HVexQWSVF3-cWytVhzY9Pm7QsMdCsI", + 'test5': "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpYXQiOjE3Mzc1MzI0MTgsImNoYW5uZWwiOiJ0ZXN0NSJ9.hCSfpHYws5TXLKkN0bW0DU6C-wgEUNuhGaIy8W1sT9o" + } + + c.connect(); + + const subscriptions: any[] = []; + + for (const channel of channels) { + const sub = c.newSubscription(channel, { + getToken: async function () { + // Sleep for a random time between 0 and 100 milliseconds to emulate network. + const sleep = (ms: any) => new Promise(resolve => setTimeout(resolve, ms)); + await sleep(Math.random() * 100); + return testTokens[channel]; + } + }); + + // Create a promise for the 'unsubscribed' event of this subscription. + const unsubPromise = new Promise((resolve) => { + sub.on("unsubscribed", (ctx) => { + resolve(ctx); + }); + }); + unsubscribedPromises.push(unsubPromise); + + // Actually subscribe. + sub.subscribe(); + subscriptions.push(sub); + } + + // Wait until all subscriptions are in the Subscribed state. + await Promise.all( + subscriptions.map(async (sub) => { + await sub.ready(5000); + expect(sub.state).toBe(SubscriptionState.Subscribed); + }) + ); + + // The client itself should be connected now. + expect(c.state).toBe(State.Connected); + + // Unsubscribe from all and then disconnect. + subscriptions.forEach((sub) => { + sub.unsubscribe(); + }); + c.disconnect(); + + // Wait until all 'unsubscribed' events are received. + const unsubscribedContexts = await Promise.all(unsubscribedPromises); + + // Confirm each subscription is now Unsubscribed. + subscriptions.forEach((sub) => { + expect(sub.state).toBe(SubscriptionState.Unsubscribed); + }); + + // The client should be disconnected. + expect(c.state).toBe(State.Disconnected); + + // Assert the correct unsubscribe code for each subscription. + unsubscribedContexts.forEach((ctx) => { + expect(ctx.code).toBe(unsubscribedCodes.unsubscribeCalled); + }); +}); \ No newline at end of file