Skip to content

Commit

Permalink
Improve RPC peer error handling (#7)
Browse files Browse the repository at this point in the history
Define a new error hierarchy for being unable to complete a method call.
Reject method call Promises with instances of the `MethodCallError`
subclasses `MethodCallTimeout` or `RPCStreamClosed` in response to the
respective events.

Define a new error class `UnexpectedResponse`, and emit that instead of
a plain `Error` when receiving an unexpected JSON-RPC response object.

Reject the method call Promise instead of throwing when calling
`callMethod` while the RPC stream is already closed, for consistency
with the RPC stream closing while the response is pending.

No longer assert that the Writable side of the Peer is open in the
`sendNotification` and `pushError` methods. This is consistent with the
behaviour of the `Readable.push` method, and allows Peer to be operated
in a simplex mode for sending notifications. This change also fixes a
bug with handling of a request handler's error response when the
Writable side is closed. Before, success responses were pushed to the
Readable side but errors resulted in an unhandled Promise rejection
within the library and no response being pushed. Now all responses are
pushed to the Readable side irrespective of the response type or state
of the Peer's Writable side.
  • Loading branch information
corhere authored Dec 13, 2018
1 parent 74cf015 commit d46f0e2
Show file tree
Hide file tree
Showing 3 changed files with 176 additions and 54 deletions.
4 changes: 2 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@fitbit/jsonrpc-ts",
"version": "1.0.2",
"version": "2.0.0",
"description": "A very flexible library for building JSON-RPC 2.0 endpoints.",
"files": [
"lib",
Expand Down Expand Up @@ -37,6 +37,6 @@
"error-subclass": "^2.2.0"
},
"peerDependencies": {
"io-ts": "1.4.1"
"io-ts": "^1.4.2"
}
}
125 changes: 107 additions & 18 deletions src/peer.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,61 @@ describe('InvalidParams', () => {
});
});

describe('MethodCallTimeout', () => {
it('constructs objects which are instances of MethodCallTimeout', () => {
const obj = new peer.MethodCallTimeout('foo');
expect(obj).toBeInstanceOf(Error);
expect(obj).toBeInstanceOf(peer.MethodCallError);
expect(obj).toBeInstanceOf(peer.MethodCallTimeout);
});

it('sets the message and method', () => {
const method = 'some.method.name';
expect(new peer.MethodCallTimeout(method)).toMatchObject({
method,
message: `No response received for RPC call to '${method}'`,
});
});
});

describe('RPCStreamClosed', () => {
it('constructs objects which are instances of RPCStreamClosed', () => {
const obj = new peer.RPCStreamClosed('foo');
expect(obj).toBeInstanceOf(Error);
expect(obj).toBeInstanceOf(peer.MethodCallError);
expect(obj).toBeInstanceOf(peer.RPCStreamClosed);
});

it('sets the error message and method', () => {
const method = 'some.method.name';
expect(new peer.RPCStreamClosed(method)).toMatchObject({
method,
message: `RPC call to '${method}' could not be completed as the RPC stream is closed`,
});
});
});

describe('UnexpectedResponse', () => {
it('constructs objects which are instances of UnexpectedResponse', () => {
const obj = new peer.UnexpectedResponse(0);
expect(obj).toBeInstanceOf(Error);
expect(obj).toBeInstanceOf(peer.UnexpectedResponse);
});

it('sets the error message, kind and id', () => {
expect(new peer.UnexpectedResponse('eye dee', 'error')).toMatchObject({
id: 'eye dee',
kind: 'error',
// tslint:disable-next-line:max-line-length
message: 'Received error with id \'"eye dee"\', which does not correspond to any outstanding RPC call',
});
});

it('defaults to kind "response"', () => {
expect(new peer.UnexpectedResponse(0)).toHaveProperty('kind', 'response');
});
});

describe('numeric request id iterator', () => {
it('does not repeat values', () => {
// Not quite true; values will repeat when it wraps around.
Expand Down Expand Up @@ -145,9 +200,14 @@ describe('Peer', () => {
});
});

it('handles an unexpected response by emitting an error', (done) => {
it('handles an unexpected response by emitting an UnexpectedResponse error', (done) => {
uut.once('error', (err: Error) => {
expect(err.message).toMatch(/Received response with id '55'/);
expect(err).toMatchObject({
message: expect.stringContaining("Received response with id \'55\'"),
kind: 'response',
id: 55,
});
expect(err).toBeInstanceOf(peer.UnexpectedResponse);
done();
});
uut.write(jrpc.response(55));
Expand All @@ -168,11 +228,16 @@ describe('Peer', () => {
});

it(
'handles an error with an id not matching any outstanding request ' +
'by emitting an error',
// tslint:disable-next-line:max-line-length
'handles an error with an id not matching any outstanding request by emitting an UnexpectedResponse error',
(done) => {
uut.once('error', (err: Error) => {
expect(err.message).toMatch(/Received error with id 'yellow'/);
expect(err).toMatchObject({
message: expect.stringContaining('Received error with id \'"yellow"\''),
kind: 'error',
id: 'yellow',
});
expect(err).toBeInstanceOf(peer.UnexpectedResponse);
done();
});
uut.write(jrpc.error({ id: 'yellow', message: '', code: 1 }));
Expand Down Expand Up @@ -361,6 +426,26 @@ describe('Peer', () => {
uut.write(jrpc.request('foo', '', [5, 4, 3]));
});

it('sends a response after the Writable side is closed', (done) => {
uut.onRequest = () => {
uut.end();
return new Promise(resolve => setImmediate(resolve));
};
uut.on('data', (value) => {
try {
const message = jrpc.parse(value);
expect(message).toMatchObject({
kind: 'response',
id: 'bar',
});
done();
} catch (e) {
done.fail(e);
}
});
uut.write(jrpc.request('bar', ''));
});

describe('sends an internal error response', () => {
function testInternalError(onRequest: peer.RequestHandler) {
uut.onRequest = onRequest;
Expand Down Expand Up @@ -433,6 +518,15 @@ describe('Peer', () => {
return Promise.reject(new peer.RPCError('You dun goofed', 5555));
});
});

test('after the Writable side is closed', (done) => {
testErrorResponse(done, () => {
uut.end();
return new Promise((resolve, reject) => {
setImmediate(() => reject(new peer.RPCError('You dun goofed', 5555)));
});
});
});
});

it('forwards a parse error from the deserializer to the remote peer', (done) => {
Expand Down Expand Up @@ -480,28 +574,23 @@ describe('Peer', () => {
const methodCalls = [uut.callMethod('foo'), uut.callMethod('bar')];
uut.end();
return Promise.all(methodCalls.map((call) => {
return expect(call).rejects.toEqual(
expect.objectContaining({
message: expect.stringMatching(/RPC stream closed/),
}),
);
return expect(call).rejects.toThrow(peer.RPCStreamClosed);
}));
});

describe('after the stream ends', () => {
beforeEach(() => uut.end());

it('throws an error when attempting to call a method', () => {
expect(() => uut.callMethod('foo')).toThrow(/RPC stream closed/);
it('rejects when attempting to call a method', () => {
return expect(uut.callMethod('foo')).rejects.toThrow(peer.RPCStreamClosed);
});

it('throws an error when attempting to send a notification', () => {
expect(() => uut.sendNotification('foo')).toThrow(/RPC stream closed/);
it('does not throw when attempting to send a notification', () => {
expect(() => uut.sendNotification('foo')).not.toThrow();
});

it('throws an error when attempting to push an error', () => {
expect(() => uut.pushError({ code: 0, message: 'foo' }))
.toThrow(/RPC stream closed/);
it('does not throw when attempting to push an error', () => {
expect(() => uut.pushError({ code: 0, message: 'foo' })).not.toThrow();
});
});

Expand Down Expand Up @@ -555,7 +644,7 @@ describe('Peer', () => {
beforeEach(() => uut.end());

it('rejects with an error', () => expect(methodCall).rejects.toThrow(
/RPC stream closed/,
peer.RPCStreamClosed,
));
it('clears the timeout timer', () => expect(clearTimeout).toBeCalled());
});
Expand Down
101 changes: 67 additions & 34 deletions src/peer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,13 +39,13 @@ export enum ErrorCodes {
*/
export class RPCError extends ErrorSubclass {
static displayName = 'RPCError';
readonly code: number;
readonly data: any;

constructor(message: string, code: number = ErrorCodes.INTERNAL_ERROR, data?: any) {
constructor(
message: string,
public readonly code: number = ErrorCodes.INTERNAL_ERROR,
public readonly data?: any,
) {
super(message);
this.code = code;
this.data = data;
}

toErrorObject(): jrpc.ErrorObject {
Expand Down Expand Up @@ -96,16 +96,52 @@ export class ParseError extends RPCError {
}
}

/**
* The method call could not be completed.
*/
export class MethodCallError extends ErrorSubclass {
static displayName = 'MethodCallError';

constructor(
public readonly method: string,
message = `RPC call to '${method}' could not be completed`,
) {
super(message);
}
}

/**
* No response to a method call was received in time.
*/
export class MethodCallTimeout extends ErrorSubclass {
export class MethodCallTimeout extends MethodCallError {
static displayName = 'MethodCallTimeout';
readonly method: string;

constructor(method: string) {
super(`${method} timed out`);
this.method = method;
super(method, `No response received for RPC call to '${method}'`);
}
}

/**
* The method call could not be completed as the Peer's writable stream
* has been closed.
*/
export class RPCStreamClosed extends MethodCallError {
static displayName = 'RPCStreamClosed';

constructor(method: string) {
super(method, `RPC call to '${method}' could not be completed as the RPC stream is closed`);
}
}

/**
* An unexpected JSON-RPC Response has been received.
*/
export class UnexpectedResponse extends ErrorSubclass {
static displayName = 'UnexpectedResponse';

constructor(public readonly id: jrpc.RPCID, public readonly kind = 'response') {
// tslint:disable-next-line:max-line-length
super(`Received ${kind} with id '${JSON.stringify(id)}', which does not correspond to any outstanding RPC call`);
}
}

Expand Down Expand Up @@ -155,6 +191,12 @@ export interface PeerOptions {
idIterator?: Iterator<jrpc.RPCID>;
}

interface PendingRequest {
method: string;
resolve: (value: any) => void;
reject: (reason: Error) => void;
}

/**
* A JSON-RPC Peer which reads and writes JSON-RPC objects as
* JavaScript objects.
Expand All @@ -175,8 +217,7 @@ export class Peer extends stream.Duplex {
onNotification?: NotificationHandler;
requestIdIterator: Iterator<jrpc.RPCID>;

private pendingRequests = new Map<
jrpc.RPCID, { resolve: (value: any) => void, reject: (reason: Error) => void }>();
private pendingRequests = new Map<jrpc.RPCID, PendingRequest>();

ended = false;

Expand All @@ -201,13 +242,10 @@ export class Peer extends stream.Duplex {

private onend() {
this.ended = true;
this.pendingRequests.forEach(({ reject }) => {
reject(new Error('RPC stream closed'));
this.pendingRequests.forEach(({ method, reject }) => {
reject(new RPCStreamClosed(method));
});
}

private assertNotEnded() {
if (this.ended) throw new Error('RPC stream closed');
this.pendingRequests.clear();
}

/**
Expand All @@ -223,7 +261,7 @@ export class Peer extends stream.Duplex {
params?: jrpc.RPCParams,
{ timeout = undefined as number | undefined } = {},
): Promise<any> {
this.assertNotEnded();
if (this.ended) return Promise.reject(new RPCStreamClosed(method));
const idResult = this.requestIdIterator.next();
if (idResult.done) {
throw new Error(
Expand All @@ -245,15 +283,14 @@ export class Peer extends stream.Duplex {

let timer: NodeJS.Timer | undefined;

const promise = new Promise(
(resolve: (value: any) => void, reject: (reason: any) => void) => {
this.push(jrpc.request(id, method, params));
this.pendingRequests.set(id, { resolve, reject });
const promise = new Promise<any>((resolve, reject) => {
this.push(jrpc.request(id, method, params));
this.pendingRequests.set(id, { method, resolve, reject });

if (timeout !== undefined) {
timer = setTimeout(() => reject(new MethodCallTimeout(method)), timeout);
}
});
if (timeout !== undefined) {
timer = setTimeout(() => reject(new MethodCallTimeout(method)), timeout);
}
});

if (timer !== undefined) {
const timerRef = timer;
Expand All @@ -274,14 +311,12 @@ export class Peer extends stream.Duplex {

/** Send an RPC Notification object to the remote peer. */
sendNotification(method: string, params?: jrpc.RPCParams) {
this.assertNotEnded();
this.push(jrpc.notification(method, params));
return this.push(jrpc.notification(method, params));
}

/** Push an RPC Error object to the remote peer. */
pushError(error: jrpc.ErrorObject) {
this.assertNotEnded();
this.push(jrpc.error(error));
return this.push(jrpc.error(error));
}

// tslint:disable-next-line:function-name
Expand Down Expand Up @@ -392,8 +427,7 @@ export class Peer extends stream.Duplex {
this.pendingRequests.delete(id);
rpcCall.resolve(result);
} else {
throw new Error(
`Received response with id '${id}', which does not correspond to any outstanding RPC call`);
throw new UnexpectedResponse(id);
}
}

Expand All @@ -413,8 +447,7 @@ export class Peer extends stream.Duplex {
this.pendingRequests.delete(id);
rpcCall.reject(rpcError);
} else {
throw new Error(
`Received error with id '${id}', which does not correspond to any outstanding RPC call`);
throw new UnexpectedResponse(id, 'error');
}
} else {
throw rpcError;
Expand Down

0 comments on commit d46f0e2

Please sign in to comment.