Skip to content

Commit

Permalink
Merge pull request #373 from ZIMkaRU/feature/add-interruption-ability…
Browse files Browse the repository at this point in the history
…-in-case-rate-limit

Add interruption ability in case rate limit
  • Loading branch information
ezewer authored Jun 21, 2024
2 parents 2929356 + 2303e7e commit aedfaa7
Show file tree
Hide file tree
Showing 7 changed files with 107 additions and 49 deletions.
2 changes: 1 addition & 1 deletion workers/loc.api/bfx.api.router/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ const { decorateInjectable } = require('../di/utils')

class BfxApiRouter {
// Method does an idle job to be overridden in framework mode
route (methodName, method) {
route (methodName, method, interrupter) {
return method()
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,8 @@ module.exports = (
const { res } = await getDataFromApi({
getData: rService[name].bind(rService),
args,
callerName: 'CSV_WRITER'
callerName: 'CSV_WRITER',
shouldNotInterrupt: true
})

wStream.setMaxListeners(50)
Expand Down
1 change: 1 addition & 0 deletions workers/loc.api/helpers/get-data-from-api.js
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ module.exports = (
* due to an internet connection issue
*/
_args.shouldNotBeLoggedToStdErrorStream = true
_args.interrupter = _interrupter

if (
typeof getData === 'string' &&
Expand Down
41 changes: 32 additions & 9 deletions workers/loc.api/helpers/get-rest.js
Original file line number Diff line number Diff line change
Expand Up @@ -41,18 +41,25 @@ const _bfxFactory = (conf) => {
})
}

const _route = (bfxApiRouter, methodName, args) => {
const _route = (bfxApiRouter, methodName, args, interrupter) => {
if (!(bfxApiRouter instanceof BfxApiRouter)) {
return Reflect.apply(...args)
}

return bfxApiRouter.route(
methodName,
() => Reflect.apply(...args)
() => Reflect.apply(...args),
interrupter
)
}

const _asyncApplyHook = async (bfxApiRouter, incomingRes, propKey, ...args) => {
const _asyncApplyHook = async (
bfxApiRouter,
incomingRes,
propKey,
args,
interrupter
) => {
let attemptsCount = 0
let caughtErr = null

Expand All @@ -70,7 +77,8 @@ const _asyncApplyHook = async (bfxApiRouter, incomingRes, propKey, ...args) => {
const res = await _route(
bfxApiRouter,
propKey,
args
args,
interrupter
)

return res
Expand Down Expand Up @@ -99,7 +107,11 @@ const _isNotPromiseOrBluebird = (instance) => (
)
)

const _getRestProxy = (rest, bfxApiRouter) => {
const _getRestProxy = (rest, deps) => {
const {
bfxApiRouter,
interrupter
} = deps
return new Proxy(rest, {
get (target, propKey) {
if (typeof target[propKey] !== 'function') {
Expand All @@ -121,14 +133,21 @@ const _getRestProxy = (rest, bfxApiRouter) => {
const res = _route(
bfxApiRouter,
propKey,
args
args,
interrupter
)

if (_isNotPromiseOrBluebird(res)) {
return res
}

return _asyncApplyHook(bfxApiRouter, res, propKey, ...args)
return _asyncApplyHook(
bfxApiRouter,
res,
propKey,
args,
interrupter
)
} catch (err) {
if (isNonceSmallError(err)) {
attemptsCount += 1
Expand Down Expand Up @@ -165,7 +184,8 @@ module.exports = (conf, bfxApiRouter) => {
authToken: _authToken = ''
} = auth
const {
timeout = 90000
timeout = 90000,
interrupter
} = opts ?? {}

/*
Expand All @@ -185,7 +205,10 @@ module.exports = (conf, bfxApiRouter) => {
}

const rest = bfxInstance.rest(2, restOpts)
const proxy = _getRestProxy(rest, bfxApiRouter)
const proxy = _getRestProxy(rest, {
bfxApiRouter,
interrupter
})

return proxy
}
Expand Down
8 changes: 5 additions & 3 deletions workers/loc.api/helpers/prepare-response/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,10 @@ const _requestToApi = (
getREST,
apiMethodName,
params,
auth
auth,
interrupter
) => {
const rest = getREST(auth)
const rest = getREST(auth, { interrupter })
const bfxApiMethodName = getBfxApiMethodName(apiMethodName)

const fn = rest[bfxApiMethodName].bind(rest)
Expand Down Expand Up @@ -55,7 +56,8 @@ const _getResAndParams = async (
getREST,
apiMethodName,
queryParams,
args.auth
args.auth,
args?.interrupter
)

return {
Expand Down
3 changes: 2 additions & 1 deletion workers/loc.api/queue/write-data-to-stream/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,8 @@ module.exports = (
const _res = await getDataFromApi({
getData,
args: currIterationArgs,
callerName: 'REPORT_FILE_WRITER'
callerName: 'REPORT_FILE_WRITER',
shouldNotInterrupt: true
})

const isGetWalletsMethod = method === 'getWallets'
Expand Down
98 changes: 64 additions & 34 deletions workers/loc.api/service.report.js
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,9 @@ class ReportService extends Api {
}

_generateToken (args, opts) {
const rest = this._getREST(args?.auth)
const rest = this._getREST(args?.auth, {
interrupter: args?.interrupter
})

return rest.generateToken({
ttl: opts?.ttl ?? 3600,
Expand All @@ -38,26 +40,34 @@ class ReportService extends Api {
}

_invalidateAuthToken (args) {
const rest = this._getREST(args?.auth)
const rest = this._getREST(args?.auth, {
interrupter: args?.interrupter
})
const { authToken } = args?.params ?? {}

return rest.invalidateAuthToken({ authToken })
}

_getUserInfo (args) {
const rest = this._getREST(args.auth)
const rest = this._getREST(args.auth, {
interrupter: args?.interrupter
})

return rest.userInfo()
}

_getSymbols () {
const rest = this._getREST({})
_getSymbols (args) {
const rest = this._getREST({}, {
interrupter: args?.interrupter
})

return rest.symbols()
}

_getFutures () {
const rest = this._getREST({})
_getFutures (args) {
const rest = this._getREST({}, {
interrupter: args?.interrupter
})

return rest.futures()
}
Expand All @@ -68,41 +78,46 @@ class ReportService extends Api {
return rest.currencies()
}

_getInactiveSymbols () {
const rest = this._getREST({})
_getInactiveSymbols (args) {
const rest = this._getREST({}, {
interrupter: args?.interrupter
})

return rest.inactiveSymbols()
}

async _getConf (opts) {
const { keys: _keys } = opts ?? {}
async _getConf (args) {
const { keys: _keys, interrupter } = args ?? {}
const keys = Array.isArray(_keys) ? _keys : [_keys]
const rest = this._getREST({})
const rest = this._getREST({}, { interrupter })

const res = await rest.conf({ keys })

return Array.isArray(res) ? res : []
}

async _getMapSymbols () {
async _getMapSymbols (args) {
const [res] = await this._getConf({
keys: 'pub:map:pair:sym'
keys: 'pub:map:pair:sym',
interrupter: args?.interrupter
})

return Array.isArray(res) ? res : []
}

async _getInactiveCurrencies () {
async _getInactiveCurrencies (args) {
const [res] = await this._getConf({
keys: 'pub:list:currency:inactive'
keys: 'pub:list:currency:inactive',
interrupter: args?.interrupter
})

return Array.isArray(res) ? res : []
}

async _getMarginCurrencyList () {
async _getMarginCurrencyList (args) {
const [res] = await this._getConf({
keys: 'pub:list:currency:margin'
keys: 'pub:list:currency:margin',
interrupter: args?.interrupter
})

return Array.isArray(res) ? res : []
Expand All @@ -111,7 +126,9 @@ class ReportService extends Api {
_getWeightedAveragesReportFromApi (args) {
const { auth, params } = args ?? {}

const rest = this._getREST(auth)
const rest = this._getREST(auth, {
interrupter: args?.interrupter
})

return rest.getWeightedAverages(params)
}
Expand Down Expand Up @@ -218,13 +235,13 @@ class ReportService extends Api {
inactiveCurrencies,
marginCurrencyList
] = await Promise.all([
this._getSymbols(),
this._getFutures(),
this._getCurrencies(),
this._getInactiveSymbols(),
this._getMapSymbols(),
this._getInactiveCurrencies(),
this._getMarginCurrencyList()
this._getSymbols(args),
this._getFutures(args),
this._getCurrencies(args),
this._getInactiveSymbols(args),
this._getMapSymbols(args),
this._getInactiveCurrencies(args),
this._getMarginCurrencyList(args)
])

const res = prepareSymbolResponse({
Expand All @@ -248,7 +265,9 @@ class ReportService extends Api {
const { auth, params } = args ?? {}
const { keys = [] } = params ?? {}

const rest = this._getREST(auth)
const rest = this._getREST(auth, {
interrupter: args?.interrupter
})

return rest.getSettings({ keys })
}, 'getSettings', args, cb)
Expand All @@ -259,7 +278,9 @@ class ReportService extends Api {
const { auth, params } = args ?? {}
const { settings = {} } = params ?? {}

const rest = this._getREST(auth)
const rest = this._getREST(auth, {
interrupter: args?.interrupter
})

return rest.updateSettings({ settings })
}, 'updateSettings', args, cb)
Expand Down Expand Up @@ -311,7 +332,9 @@ class ReportService extends Api {

getActivePositions (space, args, cb) {
return this._responder(async () => {
const rest = this._getREST(args.auth)
const rest = this._getREST(args.auth, {
interrupter: args?.interrupter
})
const positions = omitPrivateModelFields(
await rest.positions()
)
Expand Down Expand Up @@ -341,7 +364,9 @@ class ReportService extends Api {
return this._responder(async () => {
checkParams(args, 'paramsSchemaForWallets')

const rest = this._getREST(args.auth)
const rest = this._getREST(args.auth, {
interrupter: args?.interrupter
})

return omitPrivateModelFields(await rest.wallets())
}, 'getWallets', args, cb)
Expand Down Expand Up @@ -510,7 +535,9 @@ class ReportService extends Api {

getActiveOrders (space, args, cb) {
return this._responder(async () => {
const rest = this._getREST(args.auth)
const rest = this._getREST(args.auth, {
interrupter: args?.interrupter
})

const _res = omitPrivateModelFields(
await rest.activeOrders()
Expand All @@ -537,8 +564,8 @@ class ReportService extends Api {
return this._responder(async () => {
checkParams(args, 'paramsSchemaForMovementInfo', ['id'])

const { auth, params } = args ?? {}
const rest = this._getREST(auth)
const { auth, params, interrupter } = args ?? {}
const rest = this._getREST(auth, { interrupter })

const res = omitPrivateModelFields(
await rest.movementInfo({ id: params?.id })
Expand Down Expand Up @@ -604,7 +631,10 @@ class ReportService extends Api {
getAccountSummary (space, args, cb) {
return this._responder(async () => {
const { auth } = { ...args }
const rest = this._getREST(auth, { timeout: 30000 })
const rest = this._getREST(auth, {
timeout: 30000,
interrupter: args?.interrupter
})

const res = omitPrivateModelFields(
await rest.accountSummary()
Expand Down

0 comments on commit aedfaa7

Please sign in to comment.