From bb68e1f9ec83b5cace08a572ff483506a42ed811 Mon Sep 17 00:00:00 2001 From: Philippe Kalitine Date: Mon, 18 Jun 2018 10:41:43 +0200 Subject: [PATCH] fix(decode): catch possible errors thrown by decode function --- src/Channel.ts | 50 ++++++++++++--------- src/Signaling.ts | 40 +++++++++-------- src/service/Service.ts | 6 ++- src/service/UserMessage.ts | 74 +++++++++++++++++--------------- src/service/topology/FullMesh.ts | 17 +++++--- 5 files changed, 107 insertions(+), 80 deletions(-) diff --git a/src/Channel.ts b/src/Channel.ts index d8fa59dc..ea87bdbc 100644 --- a/src/Channel.ts +++ b/src/Channel.ts @@ -83,7 +83,12 @@ export class Channel { }) ) this.wsOrDc.onmessage = ({ data }: { data: ArrayBuffer }) => { - this.handleInitMessage(proto.Message.decode(new Uint8Array(data))) + try { + const msg = proto.Message.decode(new Uint8Array(data)) + this.handleInitMessage(msg) + } catch (err) { + log.warn('Decode inner Channel message error: ', err) + } } } } @@ -188,27 +193,32 @@ export class Channel { private initHandlers() { // Configure handlers this.wsOrDc.onmessage = ({ data }: { data: ArrayBuffer }) => { - const msg = Message.decode(new Uint8Array(data)) - - // 0: broadcast message - if (msg.recipientId === 0 || msg.recipientId === this.wc.myId) { - // User Message - if (msg.serviceId === UserMessage.SERVICE_ID) { - const userData = this.wc.userMsg.decodeUserMessage(msg.content, msg.senderId) - if (userData) { - this.wc.onMessage(msg.senderId as number, userData) + try { + const msg = Message.decode(new Uint8Array(data)) + + // 0: broadcast message or a message to me + if (msg.recipientId === 0 || msg.recipientId === this.wc.myId) { + // User Message + if (msg.serviceId === UserMessage.SERVICE_ID) { + const userData = this.wc.userMsg.decodeUserMessage(msg.content, msg.senderId) + if (userData) { + this.wc.onMessage(msg.senderId as number, userData) + } + + // Heartbeat message + } else if (msg.serviceId === 0) { + this.missedHeartbeat = 0 + + // Service Message + } else { + this.wc.streamSubject.next(Object.assign({ channel: this }, msg)) } - - // Heartbeat message - } else if (msg.serviceId === 0) { - this.missedHeartbeat = 0 - // Service Message - } else { - this.wc.streamSubject.next(Object.assign({ channel: this }, msg)) } - } - if (msg.recipientId !== this.wc.myId) { - this.wc.topology.forward(msg) + if (msg.recipientId !== this.wc.myId) { + this.wc.topology.forward(msg) + } + } catch (err) { + log.warn('Decode general Channel message error: ', err) } } this.wsOrDc.onclose = (evt: Event) => { diff --git a/src/Signaling.ts b/src/Signaling.ts index 671f9c65..865cf798 100644 --- a/src/Signaling.ts +++ b/src/Signaling.ts @@ -141,26 +141,30 @@ export class Signaling implements IStream { } private handleMessage(bytes: ArrayBuffer) { - const msg = proto.Message.decode(new Uint8Array(bytes)) - switch (msg.type) { - case 'heartbeat': - this.missedHeartbeat = 0 - break - case 'connected': - this.connected = msg.connected - this.setState(SignalingState.CHECKED) - if (!msg.connected) { - this.wc.channelBuilder.connectOverSignaling().catch(() => this.check()) + try { + const msg = proto.Message.decode(new Uint8Array(bytes)) + switch (msg.type) { + case 'heartbeat': + this.missedHeartbeat = 0 + break + case 'connected': + this.connected = msg.connected + this.setState(SignalingState.CHECKED) + if (!msg.connected) { + this.wc.channelBuilder.connectOverSignaling().catch(() => this.check()) + } + break + case 'content': { + const { data, id } = msg.content as proto.Content + const streamMessage = Message.decode(data) + streamMessage.senderId = id + log.signaling('StreamMessage RECEIVED: ', streamMessage) + this.streamSubject.next(streamMessage) + break } - break - case 'content': { - const { data, id } = msg.content as proto.Content - const streamMessage = Message.decode(data) - streamMessage.senderId = id - log.signaling('StreamMessage RECEIVED: ', streamMessage) - this.streamSubject.next(streamMessage) - break } + } catch (err) { + log.warn('Decode Signaling message error: ', err) } } diff --git a/src/service/Service.ts b/src/service/Service.ts index a49c75cb..6f302497 100644 --- a/src/service/Service.ts +++ b/src/service/Service.ts @@ -69,7 +69,8 @@ export abstract class Service { senderId, recipientId, msg: this.decode(content), - })) + })), + filter(({ msg }: any) => msg && msg.type) ), send: (content: Uint8Array | OutMsg, id?: number) => { wc.sendOverStream({ @@ -87,7 +88,8 @@ export abstract class Service { id: sig.STREAM_ID, message: sig.messageFromStream.pipe( filter(({ serviceId }) => serviceId === this.serviceId), - map(({ senderId, content }) => ({ senderId, msg: this.decode(content) })) + map(({ senderId, content }) => ({ senderId, msg: this.decode(content) })), + filter(({ msg }: any) => msg && msg.type) ), send: (content: Uint8Array | OutMsg, id?: number) => { sig.sendOverStream({ diff --git a/src/service/UserMessage.ts b/src/service/UserMessage.ts index c5884efe..d5ab124f 100644 --- a/src/service/UserMessage.ts +++ b/src/service/UserMessage.ts @@ -1,4 +1,5 @@ import { env } from '../misc/env' +import { log } from '../misc/util' import { userMessage as proto } from '../proto' import { Service } from '../service/Service' @@ -65,45 +66,50 @@ export class UserMessage extends Service { * Decode user message received from the network. */ decodeUserMessage(bytes: Uint8Array, senderId: number): UserDataType | undefined { - const { length, type, contentType, full, chunk } = super.decode(bytes) as { - length: number - type: proto.Message.Type - contentType: string - full: Uint8Array | undefined - chunk: proto.Message.Chunk - } - let result - switch (contentType) { - case 'full': { - result = full - break + try { + const { length, type, contentType, full, chunk } = super.decode(bytes) as { + length: number + type: proto.Message.Type + contentType: string + full: Uint8Array | undefined + chunk: proto.Message.Chunk } - case 'chunk': { - let buffer = this.getBuffer(senderId, chunk.id) - if (buffer === undefined) { - buffer = new Buffer(length, chunk.content, chunk.nb) - this.setBuffer(senderId, chunk.id, buffer) - result = undefined - } else { - result = buffer.append(chunk.content, chunk.nb) + let result + switch (contentType) { + case 'full': { + result = full + break + } + case 'chunk': { + let buffer = this.getBuffer(senderId, chunk.id) + if (buffer === undefined) { + buffer = new Buffer(length, chunk.content, chunk.nb) + this.setBuffer(senderId, chunk.id, buffer) + result = undefined + } else { + result = buffer.append(chunk.content, chunk.nb) + } + break + } + default: { + throw new Error('Unknown message integrity') } - break - } - default: { - throw new Error('Unknown message integrity') } - } - if (result !== undefined) { - switch (type) { - case proto.Message.Type.U_INT_8_ARRAY: - return result - case proto.Message.Type.STRING: - return textDecoder.decode(result) - default: - throw new Error('Unknown message type') + if (result !== undefined) { + switch (type) { + case proto.Message.Type.U_INT_8_ARRAY: + return result + case proto.Message.Type.STRING: + return textDecoder.decode(result) + default: + throw new Error('Unknown message type') + } } + return result + } catch (err) { + log.warn('Decode user message error: ', err) + return undefined } - return result } /** diff --git a/src/service/topology/FullMesh.ts b/src/service/topology/FullMesh.ts index 65833ebe..e8f5993e 100644 --- a/src/service/topology/FullMesh.ts +++ b/src/service/topology/FullMesh.ts @@ -60,12 +60,17 @@ export class FullMesh extends Topology implements // Set onConnectionRequest callback for ChannelBuilder this.wc.channelBuilder.onConnectionRequest = (streamId: number, data: Uint8Array) => { - const { id, adjacentIds } = proto.ConnectionRequest.decode(data) - if (streamId === this.wc.STREAM_ID) { - log.topology(`CONNECTION REQUEST from ${id}, where adjacent members are: `, adjacentIds) - return this.createOrUpdateDistantMember(id, adjacentIds) - } else { - return true + try { + const { id, adjacentIds } = proto.ConnectionRequest.decode(data) + if (streamId === this.wc.STREAM_ID) { + log.topology(`CONNECTION REQUEST from ${id}, where adjacent members are: `, adjacentIds) + return this.createOrUpdateDistantMember(id, adjacentIds) + } else { + return true + } + } catch (err) { + log.warn('Decode Fullmesh onConnectionRequest message error: ', err) + return false } }