Skip to content

Commit

Permalink
fix: initialize newly created channels in WebRTCService
Browse files Browse the repository at this point in the history
Use ping-pong mechanism to initialize the dataChannel on both side and only after that they are
ready use.
  • Loading branch information
kalitine committed Apr 7, 2016
1 parent 635fca0 commit 16206d3
Show file tree
Hide file tree
Showing 9 changed files with 307 additions and 107 deletions.
131 changes: 99 additions & 32 deletions dist/netflux.es2015.js
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@
}
break
case JOIN_INIT:
console.log('JOIN_INIT my new id: ' + msg.id)
wc.topology = msg.manager
wc.myId = msg.id
ch.peerId = msg.intermediaryId
Expand Down Expand Up @@ -271,7 +272,7 @@
*/
const CONNECT_WITH = 1
const CONNECT_WITH_FEEDBACK = 2
const CONNECT_WITH_TIMEOUT = 4000
const CONNECT_WITH_TIMEOUT = 5000
const ADD_INTERMEDIARY_CHANNEL = 4

/**
Expand All @@ -284,17 +285,20 @@
let cBuilder = get(wc.settings.connector, wc.settings)
switch (msg.code) {
case CONNECT_WITH:
console.log('CONNECT_WITH received: ', msg)
msg.peers = this.reUseIntermediaryChannelIfPossible(wc, msg.jpId, msg.peers)
cBuilder
.connectMeToMany(wc, msg.peers)
.then(result => {
console.log('CONNECT_WITH result: ', result)
result.channels.forEach(c => {
wc.initChannel(c, c.peerId)
wc.getJoiningPeer(msg.jpId).toAddList(c)
c.send(wc.proxy.msg(THIS_CHANNEL_TO_JOINING_PEER,
{id: msg.jpId, toBeAdded: true}
))
})
console.log('CONNECT_WITH send feedback: ', {code: CONNECT_WITH_FEEDBACK, id: wc.myId, failed: result.failed})
wc.sendSrvMsg(this.name, msg.sender,
{code: CONNECT_WITH_FEEDBACK, id: wc.myId, failed: result.failed}
)
Expand All @@ -304,6 +308,7 @@
})
break
case CONNECT_WITH_FEEDBACK:
console.log('CONNECT_WITH_FEEDBACK received: ', msg)
wc.connectWithRequests.get(msg.id)(true)
break
case ADD_INTERMEDIARY_CHANNEL:
Expand All @@ -327,6 +332,7 @@
* @return {Promise} - Is resolved once some of the connections could be established. It is rejected when an error occured.
*/
connectWith (wc, id, jpId, peers) {
console.log('send CONNECT_WITH to: ' + id + ' JoiningPeerID: ' + jpId + ' with peers', peers)
wc.sendSrvMsg(this.name, id,
{code: CONNECT_WITH, jpId: jpId,
sender: wc.myId, peers}
Expand Down Expand Up @@ -626,7 +632,7 @@
window.webkitRTCSessionDescription
}

open (key, onChannel, options = {}) {
open (webChannel, key, onChannel, options = {}) {
let settings = Object.assign({}, this.settings, options)
// Connection array, because several connections may be establishing
// at the same time
Expand All @@ -640,6 +646,7 @@
socket.onopen = () => { socket.send(this.toStr({key})) }
socket.onmessage = evt => {
let msg = JSON.parse(evt.data)
console.log('NETFLUX: message: ', msg)
if (!Reflect.has(msg, 'id') || !Reflect.has(msg, 'data')) {
// throw new SignalingError(err.name + ': ' + err.message)
throw new Error('Incorrect message format from the signaling server.')
Expand All @@ -651,11 +658,15 @@
candidate => socket.send(this.toStr({id: msg.id, data: {candidate}})),
answer => socket.send(this.toStr({id: msg.id, data: {answer}})),
onChannel,
msg.data.offer
msg.data.offer,
webChannel
)
// On Ice Candidate
} else if (Reflect.has(msg.data, 'candidate')) {
connections[msg.id].addIceCandidate(this.createCandidate(msg.data.candidate))
console.log('NETFLUX adding candidate')
connections[msg.id].addIceCandidate(this.createCandidate(msg.data.candidate), () => {}, (e) => {
console.log('NETFLUX adding candidate failed: ', e)
})
}
}
socket.onerror = evt => {
Expand All @@ -676,44 +687,56 @@
}
}

join (key, options = {}) {
join (webChannel, key, options = {}) {
let settings = Object.assign({}, this.settings, options)
console.log('NETFLUX: joining: ' + key)
return new Promise((resolve, reject) => {
let connection

// Connect to the signaling server
let socket = new window.WebSocket(settings.signaling)
socket.onopen = () => {
console.log('NETFLUX: connection with Sigver has been established')
// Prepare and send offer
connection = this.createConnectionAndOffer(
candidate => socket.send(this.toStr({data: {candidate}})),
offer => socket.send(this.toStr({join: key, data: {offer}})),
channel => {
channel.connection = connection
console.log('NETFLUX: channel created')
resolve(channel)
},
key
key,
webChannel
)
}
socket.onmessage = (e) => {
let msg = JSON.parse(e.data)
console.log('NETFLUX: message: ', msg)

// Check message format
if (!Reflect.has(msg, 'data')) { reject() }

// If received an answer to the previously sent offer
if (Reflect.has(msg.data, 'answer')) {
let sd = this.createSDP(msg.data.answer)
connection.setRemoteDescription(sd, () => {}, reject)
console.log('NETFLUX adding answer')
connection.setRemoteDescription(sd, () => {}, (e) => {
console.log('NETFLUX adding answer failed: ', e)
reject()
})
// If received an Ice candidate
} else if (Reflect.has(msg.data, 'candidate')) {
connection.addIceCandidate(this.createCandidate(msg.data.candidate))
console.log('NETFLUX adding candidate')
connection.addIceCandidate(this.createCandidate(msg.data.candidate), () => {}, (e) => {
console.log('NETFLUX adding candidate failed: ', e)
})
} else { reject() }
}
socket.onerror = e => {
reject(`Signaling server socket error: ${e.message}`)
}
socket.onclose = e => {
console.log('Closing server: ', e)
if (e.code !== 1000) { reject(e.reason) }
}
})
Expand Down Expand Up @@ -756,11 +779,9 @@
webChannel.connections.set(id, connection)
webChannel.sendSrvMsg(this.name, id, {sender, offer})
},
channel => {
channel.connection = connection
channel.peerId = id
resolve(channel)
},
channel => resolve(channel),
id,
webChannel,
id
)
setTimeout(reject, CONNECTION_CREATION_TIMEOUT, 'Timeout')
Expand All @@ -778,10 +799,11 @@
answer => webChannel.sendSrvMsg(this.name, msg.sender,
{sender: webChannel.myId, answer}),
channel => {
webChannel.initChannel(channel, msg.sender)
webChannel.connections.delete(channel.peerId)
},
msg.offer
msg.offer,
webChannel,
msg.sender
)
)
console.log(msg.sender + ' create a NEW CONNECTION')
Expand All @@ -796,31 +818,74 @@
}
}

createConnectionAndOffer (candidateCB, sdpCB, channelCB, key) {
createConnectionAndOffer (candidateCB, sdpCB, channelCB, key, webChannel, id = '') {
let connection = this.initConnection(candidateCB)
let dc = connection.createDataChannel(key)
dc.onopen = () => channelCB(dc)
console.log('NETFLUX: dataChannel created')
dc.onopen = () => {
console.log('NETFLUX: Channel opened')
dc.send('ping')
console.log('SEND PING')
}
window.dc = dc
dc.onmessage = msgEvt => {
if (msgEvt.data === 'pong') {
console.log('PONG Received')
dc.connection = connection
webChannel.initChannel(dc, id)
channelCB(dc)
}
}
dc.onerror = evt => {
console.log('NETFLUX: channel error: ', evt)
}
connection.createOffer(offer => {
connection.setLocalDescription(offer, () => {
sdpCB(connection.localDescription.toJSON())
}, (err) => { throw new Error(`Could not set local description: ${err}`) })
}, (err) => { throw new Error(`Could not create offer: ${err}`) })
}, (err) => {
console.log('NETFLUX: error 1: ', err)
throw new Error(`Could not set local description: ${err}`)
})
}, (err) => {
console.log('NETFLUX: error 2: ', err)
throw new Error(`Could not create offer: ${err}`)
})
return connection
}

createConnectionAndAnswer (candidateCB, sdpCB, channelCB, offer) {
createConnectionAndAnswer (candidateCB, sdpCB, channelCB, offer, webChannel, id = '') {
let connection = this.initConnection(candidateCB)
connection.ondatachannel = e => {
e.channel.connection = connection
e.channel.onopen = () => channelCB(e.channel)
e.channel.onmessage = msgEvt => {
if (msgEvt.data === 'ping') {
console.log('PING Received, send PONG')
e.channel.connection = connection
webChannel.initChannel(e.channel, id)
e.channel.send('pong')
channelCB(e.channel)
}
}
e.channel.onopen = () => {
console.log('NETFLUX: Channel opened')
}
}
console.log('NETFLUX adding offer')
connection.setRemoteDescription(this.createSDP(offer), () => {
connection.createAnswer(answer => {
connection.setLocalDescription(answer, () => {
sdpCB(connection.localDescription.toJSON())
}, (err) => { throw new Error(`Could not set local description: ${err}`) })
}, (err) => { throw new Error(`Could not create answer: ${err}`) })
}, (err) => { throw new Error(`Could not set remote description: ${err}`) })
}, (err) => {
console.log('NETFLUX: error: ', err)
throw new Error(`Could not set local description: ${err}`)
})
}, (err) => {
console.log('NETFLUX: error: ', err)
throw new Error(`Could not create answer: ${err}`)
})
}, (err) => {
console.log('NETFLUX: error: ', err)
throw new Error(`Could not set remote description: ${err}`)
})
return connection
}

Expand Down Expand Up @@ -1063,11 +1128,13 @@
let cBuilder = get(settings.connector, settings)
let key = this.id + this.myId
try {
let data = cBuilder.open(key, channel => {
this.initChannel(channel)
let data = cBuilder.open(this, key, channel => {
//this.initChannel(channel)
let jp = new JoiningPeer(channel.peerId, this.myId)
jp.intermediaryChannel = channel
this.joiningPeers.add(jp)
console.log('send JOIN_INIT his new id: ' + channel.peerId)
console.log('New channel: ' + channel.readyState)
channel.send(this.proxy.msg(JOIN_INIT,
{manager: this.settings.topology,
id: channel.peerId,
Expand All @@ -1080,7 +1147,7 @@
.then(() => {
channel.send(this.proxy.msg(JOIN_FINILIZE))
})
.catch((msg) => {
.catch(msg => {
console.log(`Adding peer ${channel.peerId} failed: ${msg}`)
this.manager.broadcast(this, this.proxy.msg(REMOVE_NEW_MEMBER,
{id: channel.peerId}
Expand Down Expand Up @@ -1117,9 +1184,9 @@
let cBuilder = get(settings.connector, settings)
return new Promise((resolve, reject) => {
cBuilder
.join(key)
.then((channel) => {
this.initChannel(channel)
.join(this, key)
.then(channel => {
//this.initChannel(channel)
console.log('JOIN channel established')
this.onJoin = () => { resolve(this) }
})
Expand Down
Loading

0 comments on commit 16206d3

Please sign in to comment.