Skip to content

Commit

Permalink
Merge pull request #51 from bugwheels94/beta
Browse files Browse the repository at this point in the history
Beta
  • Loading branch information
bugwheels94 authored May 6, 2024
2 parents 3e4bdf8 + f93adec commit d98c03c
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 14 deletions.
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@
"husky": "^7.0.4",
"jest": "^29.6.4",
"lint-staged": "^12.3.7",
"nanoid": "^5.0.7",
"npm-run-all": "^4.1.5",
"prettier": "2.6.0",
"replace": "^1.2.1",
Expand Down
14 changes: 8 additions & 6 deletions src/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import EventEmitter from 'events';
import { AllowedType, DataMapping, JsonObject, Serialize } from './utils';
import { SoxtendServer } from '.';
import { GROUPS_BY_CONNECTION_ID, SERVERS_HAVING_GROUP } from './constants';

import { nanoid } from 'nanoid';
export type ClientResponse = {
_id: number;
status: HttpStatusCode;
Expand All @@ -33,7 +33,7 @@ export type ClientPromiseStore = Record<
>;

export class Socket<DataSentOverWire extends AllowedType = 'string'> extends EventEmitter {
public readonly id: string = crypto.randomUUID();
public readonly id: string;
private serialize: Serialize<DataMapping<DataSentOverWire>>;
storage: Record<string, any> = {};
mode?: string | Uint8Array;
Expand All @@ -49,12 +49,12 @@ export class Socket<DataSentOverWire extends AllowedType = 'string'> extends Eve
public async initialize() {
const id = this.id;
this.server.individualSocketConnectionStore.add(this);
return this.server.distributor.set(`i:${id}`, this.server.serverId);
return this.server.distributor.set(`i:${id}`, this.server.id);
}
public async clear() {
const id = this.id;
this.server.individualSocketConnectionStore.remove(id);
return this.server.distributor.set(`i:${id}`, this.server.serverId);
return this.server.distributor.set(`i:${id}`, this.server.id);
}
constructor(
socket: WebSocket,
Expand All @@ -74,6 +74,8 @@ export class Socket<DataSentOverWire extends AllowedType = 'string'> extends Eve
this.mode = mode;
this.rawSocket = socket;
this.server = server;
this.id = this.server.id + nanoid();

// this.store = store;
this.initialize();
}
Expand All @@ -83,14 +85,14 @@ export class Socket<DataSentOverWire extends AllowedType = 'string'> extends Eve
this.server.socketGroupStore.add(this, groupId);
return Promise.all([
this.server.distributor.addListItem(`${GROUPS_BY_CONNECTION_ID}${this.id}`, groupId),
this.server.distributor.addListItem(`${SERVERS_HAVING_GROUP}${groupId}`, this.server.serverId),
this.server.distributor.addListItem(`${SERVERS_HAVING_GROUP}${groupId}`, this.server.id),
]);
}

async joinGroups(groupdIds: Iterable<string>) {
for (let groupId of groupdIds) {
this.server.socketGroupStore.add(this, groupId);
this.server.distributor.addListItem(`${SERVERS_HAVING_GROUP}${groupId}`, this.server.serverId);
this.server.distributor.addListItem(`${SERVERS_HAVING_GROUP}${groupId}`, this.server.id);
}
this.server.distributor.addListItems(`${GROUPS_BY_CONNECTION_ID}${this.id}`, groupdIds);
}
Expand Down
17 changes: 9 additions & 8 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ const encoder = new TextEncoder();

const decoder = new TextDecoder();
export class SoxtendServer<MessageType extends AllowedType = 'string'> extends EventEmitter {
serverId: string;
id: string;
rawWebSocketServer: WebSocket.Server;
distributor: MessageDistributor<MessageType, any>;
eventStore: Record<
Expand All @@ -100,14 +100,14 @@ export class SoxtendServer<MessageType extends AllowedType = 'string'> extends E
}
if (!this.distributor) return;

const server = await this.distributor.get(`i:${id}`);
const serverId = id.slice(0, 21);
const groupArray = encoder.encode(id);
const messageWithGroupId = new Uint8Array(serializedMessage.length + groupArray.length + 1);
messageWithGroupId[0] = groupArray.length;
messageWithGroupId.set(groupArray, 1);
messageWithGroupId.set(serializedMessage, 1 + groupArray.length);
// @ts-ignore
this.distributor.enqueue(`${server}`, messageWithGroupId);
this.distributor.enqueue(`${serverId}`, messageWithGroupId);
}
private async sendMessageAsBufferToGroup(id: string, message: JsonObject) {
// this.socketGroupStore.find(id)?.forEach((socket) => {
Expand All @@ -132,11 +132,12 @@ export class SoxtendServer<MessageType extends AllowedType = 'string'> extends E
}
if (!this.distributor) return;

const server = await this.distributor.get(`i:${id}`);
const serverId = id.slice(0, 21);

const messageWithGroupId = id + ':' + serializedMessage;

//@ts-ignore
this.distributor.enqueue(`${server}`, messageWithGroupId);
this.distributor.enqueue(`${serverId}`, messageWithGroupId);
}
private async sendMessageAsStringToGroup(id: string, message: JsonObject) {
const serializedMessage = this.serialize(message) as string;
Expand Down Expand Up @@ -230,21 +231,21 @@ export class SoxtendServer<MessageType extends AllowedType = 'string'> extends E
this.sendToGroup = this.sendMessageAsStringToGroup;
}
this.distributor = distributor;
this.serverId = crypto.randomUUID();
this.id = crypto.randomUUID();
this.individualSocketConnectionStore = new IndividualSocketConnectionStore();
this.socketGroupStore = new SocketGroupStore<MessageType>();
this.rawWebSocketServer = new WebSocket.Server(options);

Promise.all([
this.distributor
? this.distributor.initialize(this.serverId, {
? this.distributor.initialize(this.id, {
messageType,
})
: undefined,
// options.messageStore ? options.messageStore.initialize(this.serverId) : undefined,
])
.then(() => {
this.listenToIndividualQueue(`${this.serverId}`);
this.listenToIndividualQueue(`${this.id}`);
this.listenToGroupQueue(`broadcast`);
this.emit('ready');
this.rawWebSocketServer.on('connection', (rawSocket: WebSocket) => {
Expand Down

0 comments on commit d98c03c

Please sign in to comment.