Skip to content

Commit

Permalink
Merge pull request #102 from jmetev1/chore(upgrade-toolings)
Browse files Browse the repository at this point in the history
Chore(upgrade toolings)
  • Loading branch information
jeremymeng authored Jun 9, 2023
2 parents ee528d1 + 3ee3f32 commit 25a8aee
Show file tree
Hide file tree
Showing 12 changed files with 167 additions and 235 deletions.
40 changes: 40 additions & 0 deletions .eslintrc.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
module.exports = {
"env": {
"browser": true,
"es6": true,
"node": true
},
"extends": [
"prettier"
],
"parser": "@typescript-eslint/parser",
"parserOptions": {
"project": "./tsconfig.json",
"sourceType": "module"
},
"plugins": [
"jsdoc",
"@typescript-eslint",
],
"root": true,
overrides: [
{
files:
[
'lib/**/*.ts',
'examples/**/*.ts',
'test/**/*.ts',
],
extends: ['eslint:recommended', 'plugin:@typescript-eslint/recommended'],
"rules": {
"jsdoc/check-alignment": "error",
"jsdoc/check-indentation": "error",
},
}
],
"ignorePatterns": [
"test/**",
"tsconfig.json",
"dist/**"
]
}
11 changes: 7 additions & 4 deletions lib/awaitableSender.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,7 @@ export declare interface AwaitableSender {
on(event: SenderEvents, listener: OnAmqpEvent): this;
}

export interface AwaitableSenderOptions extends BaseSenderOptions {
}
export type AwaitableSenderOptions = BaseSenderOptions

export interface AwaitableSendOptions {
/**
Expand Down Expand Up @@ -224,7 +223,9 @@ export class AwaitableSender extends BaseSender {
};

const removeAbortListener = () => {
if (abortSignal) { abortSignal.removeEventListener("abort", onAbort); }
if (abortSignal) {
abortSignal.removeEventListener("abort", onAbort);
}
};

const delivery = (this._link as RheaSender).send(msg, options.tag, options.format);
Expand All @@ -240,7 +241,9 @@ export class AwaitableSender extends BaseSender {
timer: timer
});

if (abortSignal) { abortSignal.addEventListener("abort", onAbort); }
if (abortSignal) {
abortSignal.addEventListener("abort", onAbort);
}
} else {
// Please send the message after some time.
const msg =
Expand Down
73 changes: 33 additions & 40 deletions lib/connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ export interface CreateSenderOptions extends SenderOptions {
/**
* A signal used to cancel the Connection.createSender() operation.
*/
abortSignal?: AbortSignalLike;
abortSignal?: AbortSignalLike;
}

/**
Expand All @@ -55,7 +55,7 @@ export interface CreateReceiverOptions extends ReceiverOptions {
/**
* A signal used to cancel the Connection.createReceiver() operation.
*/
abortSignal?: AbortSignalLike;
abortSignal?: AbortSignalLike;
}

/**
Expand All @@ -67,7 +67,7 @@ export interface CreateRequestResponseLinkOptions {
/**
* A signal used to cancel the Connection.createRequestResponseLink() operation.
*/
abortSignal?: AbortSignalLike;
abortSignal?: AbortSignalLike;
}

/**
Expand Down Expand Up @@ -134,12 +134,12 @@ export type ConnectionOptions = RheaConnectionOptions & {
* @property {string[]} {protocol} - Websocket SubProtocol to be passed to the function
* returned by rhea.websocket_connect()
*/
protocol: string[],
/***
protocol: string[];
/** *
* @property {any} {options} - Options to be passed to the function returned by
* rhea.websocket_connect()
*/
options?: any
options?: any;
};
};

Expand Down Expand Up @@ -327,36 +327,34 @@ export class Connection extends Entity {
return new Promise((resolve, reject) => {
if (!this.isOpen()) {

let onOpen: Func<RheaEventContext, void>;
let onClose: Func<RheaEventContext, void>;
let onAbort: Func<void, void>;
const abortSignal = options && options.abortSignal;
let waitTimer: any;

const removeListeners: Function = () => {
const removeListeners = () => {
clearTimeout(waitTimer);
this.actionInitiated--;
this._connection.removeListener(ConnectionEvents.connectionOpen, onOpen);
this._connection.removeListener(ConnectionEvents.connectionClose, onClose);
this._connection.removeListener(ConnectionEvents.disconnected, onClose);
if (abortSignal) { abortSignal.removeEventListener("abort", onAbort); }
if (abortSignal) {
abortSignal.removeEventListener("abort", onAbort);
}
};

onOpen = (context: RheaEventContext) => {
const onOpen = (context: RheaEventContext) => {
removeListeners();
log.connection("[%s] Resolving the promise with amqp connection.", this.id);
return resolve(this);
};

onClose = (context: RheaEventContext) => {
const onClose = (context: RheaEventContext) => {
removeListeners();
const err = context.error || context.connection.error || Error('Failed to connect');
log.error("[%s] Error occurred while establishing amqp connection: %O",
this.id, err);
return reject(err);
};

onAbort = () => {
const onAbort = () => {
removeListeners();
this._connection.close();
const err = createAbortError();
Expand All @@ -366,7 +364,7 @@ export class Connection extends Entity {

const actionAfterTimeout = () => {
removeListeners();
const msg: string = `Unable to open the amqp connection "${this.id}" due to operation timeout.`;
const msg = `Unable to open the amqp connection "${this.id}" due to operation timeout.`;
log.error("[%s] %s", this.id, msg);
return reject(new Error(msg));
};
Expand All @@ -375,7 +373,7 @@ export class Connection extends Entity {
this._connection.once(ConnectionEvents.connectionOpen, onOpen);
this._connection.once(ConnectionEvents.connectionClose, onClose);
this._connection.once(ConnectionEvents.disconnected, onClose);
waitTimer = setTimeout(actionAfterTimeout, this.options!.operationTimeoutInSeconds! * 1000);
const waitTimer = setTimeout(actionAfterTimeout, this.options!.operationTimeoutInSeconds! * 1000);
log.connection("[%s] Trying to create a new amqp connection.", this.id);
this._connection.connect();
this.actionInitiated++;
Expand Down Expand Up @@ -409,45 +407,42 @@ export class Connection extends Entity {
return new Promise<void>((resolve, reject) => {
log.error("[%s] The connection is open ? -> %s", this.id, this.isOpen());
if (this.isOpen()) {
let onClose: Func<RheaEventContext, void>;
let onError: Func<RheaEventContext, void>;
let onDisconnected: Func<RheaEventContext, void>;
let onAbort: Func<void, void>;
const abortSignal = options && options.abortSignal;
let waitTimer: any;

const removeListeners = () => {
clearTimeout(waitTimer);
this.actionInitiated--;
this._connection.removeListener(ConnectionEvents.connectionError, onError);
this._connection.removeListener(ConnectionEvents.connectionClose, onClose);
this._connection.removeListener(ConnectionEvents.disconnected, onDisconnected);
if (abortSignal) { abortSignal.removeEventListener("abort", onAbort); }
if (abortSignal) {
abortSignal.removeEventListener("abort", onAbort);
}
};

onClose = (context: RheaEventContext) => {
const onClose = (context: RheaEventContext) => {
removeListeners();
log.connection("[%s] Resolving the promise as the connection has been successfully closed.",
this.id);
return resolve();
};

onError = (context: RheaEventContext) => {
const onError = (context: RheaEventContext) => {
removeListeners();
log.error("[%s] Error occurred while closing amqp connection: %O.",
this.id, context.connection.error);
return reject(context.connection.error);
};

onDisconnected = (context: RheaEventContext) => {
const onDisconnected = (context: RheaEventContext) => {
removeListeners();
const error = context.connection && context.connection.error
? context.connection.error
: context.error;
log.error("[%s] Connection got disconnected while closing itself: %O.", this.id, error);
};

onAbort = () => {
const onAbort = () => {
removeListeners();
const err = createAbortError();
log.error("[%s] [%s]", this.id, err.message);
Expand All @@ -456,7 +451,7 @@ export class Connection extends Entity {

const actionAfterTimeout = () => {
removeListeners();
const msg: string = `Unable to close the amqp connection "${this.id}" due to operation timeout.`;
const msg = `Unable to close the amqp connection "${this.id}" due to operation timeout.`;
log.error("[%s] %s", this.id, msg);
return reject(new Error(msg));
};
Expand All @@ -465,7 +460,7 @@ export class Connection extends Entity {
this._connection.once(ConnectionEvents.connectionClose, onClose);
this._connection.once(ConnectionEvents.connectionError, onError);
this._connection.once(ConnectionEvents.disconnected, onDisconnected);
waitTimer = setTimeout(actionAfterTimeout, this.options!.operationTimeoutInSeconds! * 1000);
const waitTimer = setTimeout(actionAfterTimeout, this.options!.operationTimeoutInSeconds! * 1000);
this._connection.close();
this.actionInitiated++;

Expand All @@ -487,7 +482,7 @@ export class Connection extends Entity {
* @returns {boolean} result `true` - is open; `false` otherwise.
*/
isOpen(): boolean {
let result: boolean = false;
let result = false;
if (this._connection && this._connection.is_open && this._connection.is_open()) {
result = true;
}
Expand Down Expand Up @@ -596,34 +591,32 @@ export class Connection extends Entity {
const rheaSession = this._connection.create_session();
const session = new Session(this, rheaSession);
session.actionInitiated++;
let onOpen: Func<RheaEventContext, void>;
let onClose: Func<RheaEventContext, void>;
let onDisconnected: Func<RheaEventContext, void>;
let waitTimer: any;

const removeListeners = () => {
clearTimeout(waitTimer);
session.actionInitiated--;
rheaSession.removeListener(SessionEvents.sessionOpen, onOpen);
rheaSession.removeListener(SessionEvents.sessionClose, onClose);
rheaSession.connection.removeListener(ConnectionEvents.disconnected, onDisconnected);
if (abortSignal) { abortSignal.removeEventListener("abort", onAbort); }
if (abortSignal) {
abortSignal.removeEventListener("abort", onAbort);
}
};

onOpen = (context: RheaEventContext) => {
const onOpen = (context: RheaEventContext) => {
removeListeners();
log.session("[%s] Resolving the promise with amqp session '%s'.", this.id, session.id);
return resolve(session);
};

onClose = (context: RheaEventContext) => {
const onClose = (context: RheaEventContext) => {
removeListeners();
log.error("[%s] Error occurred while establishing a session over amqp connection: %O.",
this.id, context.session!.error);
return reject(context.session!.error);
};

onDisconnected = (context: RheaEventContext) => {
const onDisconnected = (context: RheaEventContext) => {
removeListeners();
const error = context.connection && context.connection.error
? context.connection.error
Expand All @@ -635,7 +628,7 @@ export class Connection extends Entity {

const actionAfterTimeout = () => {
removeListeners();
const msg: string = `Unable to create the amqp session due to operation timeout.`;
const msg = `Unable to create the amqp session due to operation timeout.`;
log.error("[%s] %s", this.id, msg);
return reject(new OperationTimeoutError(msg));
};
Expand All @@ -645,7 +638,7 @@ export class Connection extends Entity {
rheaSession.once(SessionEvents.sessionClose, onClose);
rheaSession.connection.once(ConnectionEvents.disconnected, onDisconnected);
log.session("[%s] Calling amqp session.begin().", this.id);
waitTimer = setTimeout(actionAfterTimeout, this.options!.operationTimeoutInSeconds! * 1000);
const waitTimer = setTimeout(actionAfterTimeout, this.options!.operationTimeoutInSeconds! * 1000);
rheaSession.begin();
});
}
Expand Down
2 changes: 1 addition & 1 deletion lib/entity.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ export abstract class Entity extends EventEmitter {
* Whenever the action completes (succeeds/fails) the count will be decremented by 1.
* Default value: `0`.
*/
actionInitiated: number = 0;
actionInitiated = 0;
constructor() {
super();
}
Expand Down
3 changes: 2 additions & 1 deletion lib/eventContext.ts
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,8 @@ export interface EventContext {
_context: RheaEventContext;
}

export module EventContext {
// eslint-disable-next-line @typescript-eslint/no-namespace
export namespace EventContext {
/**
* Translates rhea's EventContext into rhea-promise EventContext
* @param rheaContext The received context from rhea's event emitter
Expand Down
14 changes: 5 additions & 9 deletions lib/link.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import {
} from "rhea";
import { Session } from "./session";
import { Connection } from "./connection";
import { Func, emitEvent, EmitParameters } from './util/utils';
import { emitEvent, EmitParameters } from './util/utils';
import { Entity } from "./entity";
import { OperationTimeoutError } from "./errorDefinitions";

Expand Down Expand Up @@ -243,10 +243,6 @@ export abstract class Link extends Entity {
const closeEvent = this.type === LinkType.sender
? SenderEvents.senderClose
: ReceiverEvents.receiverClose;
let onError: Func<RheaEventContext, void>;
let onClose: Func<RheaEventContext, void>;
let onDisconnected: Func<RheaEventContext, void>;
let waitTimer: any;

const removeListeners = () => {
clearTimeout(waitTimer);
Expand All @@ -256,14 +252,14 @@ export abstract class Link extends Entity {
this._link.connection.removeListener(ConnectionEvents.disconnected, onDisconnected);
};

onClose = (context: RheaEventContext) => {
const onClose = (context: RheaEventContext) => {
removeListeners();
log[this.type]("[%s] Resolving the promise as the %s '%s' on amqp session '%s' " +
"has been closed.", this.connection.id, this.type, this.name, this.session.id);
return resolve();
};

onError = (context: RheaEventContext) => {
const onError = (context: RheaEventContext) => {
removeListeners();
let error = context.session!.error;
if (this.type === LinkType.sender && context.sender && context.sender.error) {
Expand All @@ -277,7 +273,7 @@ export abstract class Link extends Entity {
return reject(error);
};

onDisconnected = (context: RheaEventContext) => {
const onDisconnected = (context: RheaEventContext) => {
removeListeners();
const error = context.connection && context.connection.error
? context.connection.error
Expand All @@ -298,7 +294,7 @@ export abstract class Link extends Entity {
this._link.once(closeEvent, onClose);
this._link.once(errorEvent, onError);
this._link.connection.once(ConnectionEvents.disconnected, onDisconnected);
waitTimer = setTimeout(actionAfterTimeout,
const waitTimer = setTimeout(actionAfterTimeout,
this.connection.options!.operationTimeoutInSeconds! * 1000);
this._link.close();
this.actionInitiated++;
Expand Down
Loading

0 comments on commit 25a8aee

Please sign in to comment.