Skip to content
This repository has been archived by the owner on Sep 9, 2023. It is now read-only.

WIP: Decorators - Load Balance #204

Draft
wants to merge 2 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
87 changes: 87 additions & 0 deletions src/abstract-client.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
const { EventEmitter } = require('events');

const CHANNELS = [
'connecting',
'connected',
'disconnected',
'reconnecting',
'closing',
'closed',
'error',
'trade',
'ticker',
'candle',
'l2update',
'l2snapshot',
];

/**
* AbstractClient will expose all the methods that need to be populated by specific implementations.
* Also it have some utils and should had the repeated code across clients.
*/
class AbstractClient extends EventEmitter {
static relay(source, dest, channels = CHANNELS) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice! I like this pattern for forward event emission.

for (const channel of channels) {
source.on(channel, (...args) => dest.emit(channel, ...args));
}
}

static destroy(source, channels = CHANNELS) {
for (const channel of channels) {
source.removeAllListeners(channel);
}
}

subscribeTicker(market) {
throw new TypeError(`Not implemented. Market: ${JSON.stringify(market)}`);
}

unsubscribeTicker(market) {
throw new TypeError(`Not implemented. Market: ${JSON.stringify(market)}`);
}

subscribeCandles(market) {
throw new TypeError(`Not implemented. Market: ${JSON.stringify(market)}`);
}

unsubscribeCandles(market) {
throw new TypeError(`Not implemented. Market: ${JSON.stringify(market)}`);
}

subscribeTrades(market) {
throw new TypeError(`Not implemented. Market: ${JSON.stringify(market)}`);
}

unsubscribeTrades(market) {
throw new TypeError(`Not implemented. Market: ${JSON.stringify(market)}`);
}

subscribeLevel2Snapshots(market) {
throw new TypeError(`Not implemented. Market: ${JSON.stringify(market)}`);
}

unsubscribeLevel2Snapshots(market) {
throw new TypeError(`Not implemented. Market: ${JSON.stringify(market)}`);
}

subscribeLevel2Updates(market) {
throw new TypeError(`Not implemented. Market: ${JSON.stringify(market)}`);
}

unsubscribeLevel2Updates(market) {
throw new TypeError(`Not implemented. Market: ${JSON.stringify(market)}`);
}

subscribeLevel3Updates(market) {
throw new TypeError(`Not implemented. Market: ${JSON.stringify(market)}`);
}

unsubscribeLevel3Updates(market) {
throw new TypeError(`Not implemented. Market: ${JSON.stringify(market)}`);
}
}

module.exports = {
CHANNELS,
AbstractClient,
};
39 changes: 21 additions & 18 deletions src/basic-client.js
Original file line number Diff line number Diff line change
Expand Up @@ -58,37 +58,38 @@ class BasicTradeClient extends EventEmitter {
}

subscribeTicker(market) {
if (!this.hasTickers) return;
if (!this.hasTickers) return false;
return this._subscribe(market, this._tickerSubs, this._sendSubTicker.bind(this));
}

unsubscribeTicker(market) {
if (!this.hasTickers) return;
this._unsubscribe(market, this._tickerSubs, this._sendUnsubTicker.bind(this));
if (!this.hasTickers) return false;
return this._unsubscribe(market, this._tickerSubs, this._sendUnsubTicker.bind(this));
}

subscribeCandles(market) {
if (!this.hasCandles) return;
if (!this.hasCandles) return false;
return this._subscribe(market, this._candleSubs, this._sendSubCandles.bind(this));
}

unsubscribeCandles(market) {
if (!this.hasCandles) return;
this._unsubscribe(market, this._candleSubs, this._sendUnsubCandles.bind(this));
if (!this.hasCandles) return false;
return this._unsubscribe(market, this._candleSubs, this._sendUnsubCandles.bind(this));
}

subscribeTrades(market) {
if (!this.hasTrades) return;
console.log('Basic client', market);
if (!this.hasTrades) return false;
return this._subscribe(market, this._tradeSubs, this._sendSubTrades.bind(this));
}

unsubscribeTrades(market) {
if (!this.hasTrades) return;
this._unsubscribe(market, this._tradeSubs, this._sendUnsubTrades.bind(this));
if (!this.hasTrades) return false;
return this._unsubscribe(market, this._tradeSubs, this._sendUnsubTrades.bind(this));
}

subscribeLevel2Snapshots(market) {
if (!this.hasLevel2Snapshots) return;
if (!this.hasLevel2Snapshots) return false;
return this._subscribe(
market,
this._level2SnapshotSubs,
Expand All @@ -97,28 +98,28 @@ class BasicTradeClient extends EventEmitter {
}

unsubscribeLevel2Snapshots(market) {
if (!this.hasLevel2Snapshots) return;
this._unsubscribe(market, this._level2SnapshotSubs, this._sendUnsubLevel2Snapshots.bind(this));
if (!this.hasLevel2Snapshots) return false;
return this._unsubscribe(market, this._level2SnapshotSubs, this._sendUnsubLevel2Snapshots.bind(this));
}

subscribeLevel2Updates(market) {
if (!this.hasLevel2Updates) return;
if (!this.hasLevel2Updates) return false;
return this._subscribe(market, this._level2UpdateSubs, this._sendSubLevel2Updates.bind(this));
}

unsubscribeLevel2Updates(market) {
if (!this.hasLevel2Updates) return;
this._unsubscribe(market, this._level2UpdateSubs, this._sendUnsubLevel2Updates.bind(this));
if (!this.hasLevel2Updates) return false;
return this._unsubscribe(market, this._level2UpdateSubs, this._sendUnsubLevel2Updates.bind(this));
}

subscribeLevel3Updates(market) {
if (!this.hasLevel3Updates) return;
if (!this.hasLevel3Updates) return false;
return this._subscribe(market, this._level3UpdateSubs, this._sendSubLevel3Updates.bind(this));
}

unsubscribeLevel3Updates(market) {
if (!this.hasLevel3Updates) return;
this._unsubscribe(market, this._level3UpdateSubs, this._sendUnsubLevel3Updates.bind(this));
if (!this.hasLevel3Updates) return false;
return this._unsubscribe(market, this._level3UpdateSubs, this._sendUnsubLevel3Updates.bind(this));
}

////////////////////////////////////////////
Expand Down Expand Up @@ -168,7 +169,9 @@ class BasicTradeClient extends EventEmitter {
if (this._wss.isConnected) {
sendFn(remote_id, market);
}
return true;
}
return false;
}

/**
Expand Down
85 changes: 85 additions & 0 deletions src/decorators/client-with-counter.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
const { AbstractClient } = require('../abstract-client');

const EnumDirection = {
ADD: 0x00,
SUB: 0x01,
};

class ClientWithCounter extends AbstractClient {
constructor(client) {
super();
this._client = client;
this._counter = 0;

// Relay events from original client
AbstractClient.relay(this._client, this);
}

_checkCounter(direction, isOk) {
if (isOk) {
const n = (direction === EnumDirection.ADD) ? 1 : -1;
this._counter = this._counter + n;
}
if (this._counter <= 0) {
// Remove relay listeners before close.
AbstractClient.destroy(this);
this._client.close();
}
}

getCounter() {
return this._counter;
}

subscribeTicker(market) {
return this._checkCounter(EnumDirection.ADD, this._client.subscribeTicker(market));
}

unsubscribeTicker(market) {
return this._checkCounter(EnumDirection.SUB, this._client.unsubscribeTicker(market));
}

subscribeCandles(market) {
return this._checkCounter(EnumDirection.ADD, this._client.subscribeCandles(market));
}

unsubscribeCandles(market) {
return this._checkCounter(EnumDirection.SUB, this._client.unsubscribeCandles(market));
}

subscribeTrades(market) {
return this._checkCounter(EnumDirection.ADD, this._client.subscribeTrades(market));
}

unsubscribeTrades(market) {
return this._checkCounter(EnumDirection.SUB, this._client.unsubscribeTrades(market));
}

subscribeLevel2Snapshots(market) {
return this._checkCounter(EnumDirection.ADD, this._client.subscribeLevel2Snapshots(market));
}

unsubscribeLevel2Snapshots(market) {
return this._checkCounter(EnumDirection.SUB, this._client.unsubscribeLevel2Snapshots(market));
}

subscribeLevel2Updates(market) {
return this._checkCounter(EnumDirection.ADD, this._client.subscribeLevel2Updates(market));
}

unsubscribeLevel2Updates(market) {
return this._checkCounter(EnumDirection.SUB, this._client.unsubscribeLevel2Updates(market));
}

subscribeLevel3Updates(market) {
return this._checkCounter(EnumDirection.ADD, this._client.subscribeLevel3Updates(market));
}

unsubscribeLevel3Updates(market) {
return this._checkCounter(EnumDirection.SUB, this._client.unsubscribeLevel3Updates(market));
}
}

module.exports = {
ClientWithCounter,
};
7 changes: 7 additions & 0 deletions src/decorators/index.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
const ClientWithCounter = require('./client-with-counter');
const balanced = require('./load-balance');

module.exports = {
...ClientWithCounter,
balanced,
};
7 changes: 7 additions & 0 deletions src/decorators/load-balance/index.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
const LoadBalanceClient = require('./load-balance-client');
const strategies = require('./strategies');

module.exports = {
...LoadBalanceClient,
strategies,
};
89 changes: 89 additions & 0 deletions src/decorators/load-balance/load-balance-client.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
const { AbstractClient } = require('../../abstract-client');
const { FillHolesStrategy } = require('./strategies');

const EnumAction = {
ADD: 0x00,
SUB: 0x01,
};

/**
* LoadBalanceClient create clients on-the-fly with a maximum subscriptions
*/
class LoadBalanceClient extends AbstractClient {
/**
* @param {{ clientFactory: (clientOptions) => Client}, maxSubscriptions: number, strategy: (clientFactory) => Strategy }} options
* @param {clientOptions: any} clientOptions
*/
constructor(options, clientArgs) {
super();
this._options = options;
this._options.clientArgs = clientArgs;
this._strategy = typeof options.strategy === 'function' ? options.strategy(this._options) : new FillHolesStrategy(this._options);

this._clients = new WeakMap();
}

static create(clientFactory, options) {
return class extends LoadBalanceClient {
constructor(...clientArgs) {
super({ ...options, clientFactory }, clientArgs);
}
};
}

_resolve(action, market, fn) {
const method = action === EnumAction.ADD ? 'take' : 'leave';
const client = this._strategy[method](market.id);

// Unsubscribe could be undefined if we doesnt subscribed yet.
if (!client) {
return false;
}

// On subscribe, if client is new, relay their events.
// Destroy must handled Strategy when leaves all channels.
if (action === EnumAction.ADD && !this._clients.has(client)) {
this._clients.set(client, true);
AbstractClient.relay(client, this);
}

return fn(client);
}

_subscribe(market, fn) {
return this._resolve(EnumAction.ADD, market, fn);
}

_unsubscribe(market, fn) {
return this._resolve(EnumAction.SUB, market, fn);
}

subscribeTicker(market) {
return this._subscribe(market, client => client.subscribeTicker(market));
}

unsubscribeTicker(market) {
return this._unsubscribe(market, client => client.unsubscribeTicker(market));
}

subscribeCandles(market) {
return this._subscribe(market, client => client.subscribeCandles(market));
}

unsubscribeCandles(market) {
return this._unsubscribe(market, client => client.unsubscribeCandles(market));
}

subscribeTrades(market) {
return this._subscribe(market, client => client.subscribeTrades(market));
}

unsubscribeTrades(market) {
return this._unsubscribe(market, client => client.unsubscribeTrades(market));
}
}


module.exports = {
LoadBalanceClient
};
18 changes: 18 additions & 0 deletions src/decorators/load-balance/strategies/abstract-strategy.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
class Strategy {
constructor(options) {
this._options = options;
this._clientFactory = options.clientFactory;
}

take() {
throw new Error('not implemented');
}

leave() {
throw new Error('not implemented');
}
}

module.exports = {
Strategy,
};
Loading