Skip to content

Commit

Permalink
fix(decode): catch possible errors thrown by decode function
Browse files Browse the repository at this point in the history
  • Loading branch information
kalitine committed Jun 18, 2018
1 parent beddfbf commit bb68e1f
Show file tree
Hide file tree
Showing 5 changed files with 107 additions and 80 deletions.
50 changes: 30 additions & 20 deletions src/Channel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,12 @@ export class Channel {
})
)
this.wsOrDc.onmessage = ({ data }: { data: ArrayBuffer }) => {
this.handleInitMessage(proto.Message.decode(new Uint8Array(data)))
try {
const msg = proto.Message.decode(new Uint8Array(data))
this.handleInitMessage(msg)
} catch (err) {
log.warn('Decode inner Channel message error: ', err)
}
}
}
}
Expand Down Expand Up @@ -188,27 +193,32 @@ export class Channel {
private initHandlers() {
// Configure handlers
this.wsOrDc.onmessage = ({ data }: { data: ArrayBuffer }) => {
const msg = Message.decode(new Uint8Array(data))

// 0: broadcast message
if (msg.recipientId === 0 || msg.recipientId === this.wc.myId) {
// User Message
if (msg.serviceId === UserMessage.SERVICE_ID) {
const userData = this.wc.userMsg.decodeUserMessage(msg.content, msg.senderId)
if (userData) {
this.wc.onMessage(msg.senderId as number, userData)
try {
const msg = Message.decode(new Uint8Array(data))

// 0: broadcast message or a message to me
if (msg.recipientId === 0 || msg.recipientId === this.wc.myId) {
// User Message
if (msg.serviceId === UserMessage.SERVICE_ID) {
const userData = this.wc.userMsg.decodeUserMessage(msg.content, msg.senderId)
if (userData) {
this.wc.onMessage(msg.senderId as number, userData)
}

// Heartbeat message
} else if (msg.serviceId === 0) {
this.missedHeartbeat = 0

// Service Message
} else {
this.wc.streamSubject.next(Object.assign({ channel: this }, msg))
}

// Heartbeat message
} else if (msg.serviceId === 0) {
this.missedHeartbeat = 0
// Service Message
} else {
this.wc.streamSubject.next(Object.assign({ channel: this }, msg))
}
}
if (msg.recipientId !== this.wc.myId) {
this.wc.topology.forward(msg)
if (msg.recipientId !== this.wc.myId) {
this.wc.topology.forward(msg)
}
} catch (err) {
log.warn('Decode general Channel message error: ', err)
}
}
this.wsOrDc.onclose = (evt: Event) => {
Expand Down
40 changes: 22 additions & 18 deletions src/Signaling.ts
Original file line number Diff line number Diff line change
Expand Up @@ -141,26 +141,30 @@ export class Signaling implements IStream<OutSigMsg, InSigMsg> {
}

private handleMessage(bytes: ArrayBuffer) {
const msg = proto.Message.decode(new Uint8Array(bytes))
switch (msg.type) {
case 'heartbeat':
this.missedHeartbeat = 0
break
case 'connected':
this.connected = msg.connected
this.setState(SignalingState.CHECKED)
if (!msg.connected) {
this.wc.channelBuilder.connectOverSignaling().catch(() => this.check())
try {
const msg = proto.Message.decode(new Uint8Array(bytes))
switch (msg.type) {
case 'heartbeat':
this.missedHeartbeat = 0
break
case 'connected':
this.connected = msg.connected
this.setState(SignalingState.CHECKED)
if (!msg.connected) {
this.wc.channelBuilder.connectOverSignaling().catch(() => this.check())
}
break
case 'content': {
const { data, id } = msg.content as proto.Content
const streamMessage = Message.decode(data)
streamMessage.senderId = id
log.signaling('StreamMessage RECEIVED: ', streamMessage)
this.streamSubject.next(streamMessage)
break
}
break
case 'content': {
const { data, id } = msg.content as proto.Content
const streamMessage = Message.decode(data)
streamMessage.senderId = id
log.signaling('StreamMessage RECEIVED: ', streamMessage)
this.streamSubject.next(streamMessage)
break
}
} catch (err) {
log.warn('Decode Signaling message error: ', err)
}
}

Expand Down
6 changes: 4 additions & 2 deletions src/service/Service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,8 @@ export abstract class Service<OutMsg, InMsg extends OutMsg> {
senderId,
recipientId,
msg: this.decode(content),
}))
})),
filter(({ msg }: any) => msg && msg.type)
),
send: (content: Uint8Array | OutMsg, id?: number) => {
wc.sendOverStream({
Expand All @@ -87,7 +88,8 @@ export abstract class Service<OutMsg, InMsg extends OutMsg> {
id: sig.STREAM_ID,
message: sig.messageFromStream.pipe(
filter(({ serviceId }) => serviceId === this.serviceId),
map(({ senderId, content }) => ({ senderId, msg: this.decode(content) }))
map(({ senderId, content }) => ({ senderId, msg: this.decode(content) })),
filter(({ msg }: any) => msg && msg.type)
),
send: (content: Uint8Array | OutMsg, id?: number) => {
sig.sendOverStream({
Expand Down
74 changes: 40 additions & 34 deletions src/service/UserMessage.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { env } from '../misc/env'
import { log } from '../misc/util'
import { userMessage as proto } from '../proto'
import { Service } from '../service/Service'

Expand Down Expand Up @@ -65,45 +66,50 @@ export class UserMessage extends Service<proto.IMessage, proto.Message> {
* Decode user message received from the network.
*/
decodeUserMessage(bytes: Uint8Array, senderId: number): UserDataType | undefined {
const { length, type, contentType, full, chunk } = super.decode(bytes) as {
length: number
type: proto.Message.Type
contentType: string
full: Uint8Array | undefined
chunk: proto.Message.Chunk
}
let result
switch (contentType) {
case 'full': {
result = full
break
try {
const { length, type, contentType, full, chunk } = super.decode(bytes) as {
length: number
type: proto.Message.Type
contentType: string
full: Uint8Array | undefined
chunk: proto.Message.Chunk
}
case 'chunk': {
let buffer = this.getBuffer(senderId, chunk.id)
if (buffer === undefined) {
buffer = new Buffer(length, chunk.content, chunk.nb)
this.setBuffer(senderId, chunk.id, buffer)
result = undefined
} else {
result = buffer.append(chunk.content, chunk.nb)
let result
switch (contentType) {
case 'full': {
result = full
break
}
case 'chunk': {
let buffer = this.getBuffer(senderId, chunk.id)
if (buffer === undefined) {
buffer = new Buffer(length, chunk.content, chunk.nb)
this.setBuffer(senderId, chunk.id, buffer)
result = undefined
} else {
result = buffer.append(chunk.content, chunk.nb)
}
break
}
default: {
throw new Error('Unknown message integrity')
}
break
}
default: {
throw new Error('Unknown message integrity')
}
}
if (result !== undefined) {
switch (type) {
case proto.Message.Type.U_INT_8_ARRAY:
return result
case proto.Message.Type.STRING:
return textDecoder.decode(result)
default:
throw new Error('Unknown message type')
if (result !== undefined) {
switch (type) {
case proto.Message.Type.U_INT_8_ARRAY:
return result
case proto.Message.Type.STRING:
return textDecoder.decode(result)
default:
throw new Error('Unknown message type')
}
}
return result
} catch (err) {
log.warn('Decode user message error: ', err)
return undefined
}
return result
}

/**
Expand Down
17 changes: 11 additions & 6 deletions src/service/topology/FullMesh.ts
Original file line number Diff line number Diff line change
Expand Up @@ -60,12 +60,17 @@ export class FullMesh extends Topology<proto.IMessage, proto.Message> implements

// Set onConnectionRequest callback for ChannelBuilder
this.wc.channelBuilder.onConnectionRequest = (streamId: number, data: Uint8Array) => {
const { id, adjacentIds } = proto.ConnectionRequest.decode(data)
if (streamId === this.wc.STREAM_ID) {
log.topology(`CONNECTION REQUEST from ${id}, where adjacent members are: `, adjacentIds)
return this.createOrUpdateDistantMember(id, adjacentIds)
} else {
return true
try {
const { id, adjacentIds } = proto.ConnectionRequest.decode(data)
if (streamId === this.wc.STREAM_ID) {
log.topology(`CONNECTION REQUEST from ${id}, where adjacent members are: `, adjacentIds)
return this.createOrUpdateDistantMember(id, adjacentIds)
} else {
return true
}
} catch (err) {
log.warn('Decode Fullmesh onConnectionRequest message error: ', err)
return false
}
}

Expand Down

0 comments on commit bb68e1f

Please sign in to comment.