Skip to content

Commit

Permalink
Merge pull request #144 from flowbased/improve_webrtc
Browse files Browse the repository at this point in the history
Improve WebRTC signalling
  • Loading branch information
bergie authored Oct 1, 2020
2 parents c0566ff + e2bdf8d commit d9c1572
Show file tree
Hide file tree
Showing 9 changed files with 291 additions and 184 deletions.
2 changes: 1 addition & 1 deletion index.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ exports.transports = {
base: require('./src/lib/base'),
};
exports.connection = require('./src/helpers/connection');
exports.signaller = require('./src/helpers/signaller');
exports.Signaller = require('./src/helpers/signaller');

exports.getTransport = function getTransport(transport) {
return exports.transports[transport];
Expand Down
2 changes: 1 addition & 1 deletion karma.config.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ module.exports = (config) => {
files: [
'dist/*.js',
'spec/utils/inject.js',
'spec/webrtc.js',
'spec/*.js',
],
browsers: ['ChromeHeadless'],
logLevel: config.LOG_WARN,
Expand Down
1 change: 1 addition & 0 deletions spec/.eslintrc.json
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
"baseDir": false,
"chai": false,
"noflo": false,
"uuid": false,
"client": false,
"window": false
}
Expand Down
102 changes: 102 additions & 0 deletions spec/signaller.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
describe('Signaller', () => {
const connectSignaller = (signaller, callback) => {
if (signaller.isConnected()) {
callback();
return;
}
signaller.once('error', callback);
signaller.once('connected', () => {
signaller.removeListener('error', callback);
callback();
});
signaller.connect();
};
const disconnectSignaller = (signaller, callback) => {
if (!signaller.isConnected()) {
callback();
return;
}
signaller.once('error', callback);
signaller.once('disconnected', () => {
signaller.removeListener('error', callback);
callback();
});
signaller.disconnect();
};
[
{
name: 'with default server',
browser: true,
},
].forEach((variant) => {
if (noflo.isBrowser() && !variant.browser) {
return;
}

describe(`${variant.name}`, () => {
if (variant.before) {
before(variant.before);
}
if (variant.after) {
after(variant.after);
}
const getSignaller = () => {
if (variant.instantiate) {
return variant.instantiate();
}
return new client.Signaller(uuid());
};
const signaller1 = getSignaller();
const signaller2 = getSignaller();
it('should be able to connect', (done) => {
connectSignaller(signaller1, done);
});
describe('during session', () => {
const roomId = uuid();
before((done) => {
connectSignaller(signaller2, (err) => {
if (err) {
done(err);
return;
}
signaller2.join(roomId);
setTimeout(done, 100);
});
});
after((done) => {
disconnectSignaller(signaller2, done);
});
it('should be able to join a room', (done) => {
signaller2.once('join', (peer) => {
chai.expect(peer.id).to.equal(signaller1.id);
done();
});
signaller1.join(roomId);
});
it('should be able to receive a DM from a peer', (done) => {
signaller1.once('signal', (signal, peer) => {
chai.expect(signal.hello).to.equal('World');
chai.expect(peer.id).to.equal(signaller2.id);
done();
});
signaller2.signal(signaller1.id, {
hello: 'World',
});
});
it('should be able to send a DM to a peer', (done) => {
signaller2.once('signal', (signal, peer) => {
chai.expect(signal.hello).to.equal('NoFlo');
chai.expect(peer.id).to.equal(signaller1.id);
done();
});
signaller1.signal(signaller2.id, {
hello: 'NoFlo',
});
});
it('should be able to disconnect', (done) => {
disconnectSignaller(signaller1, done);
});
});
});
});
});
4 changes: 4 additions & 0 deletions spec/utils/inject.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,10 @@ if (typeof global !== 'undefined') {
// Node.js injections for Mocha tests
global.chai = require('chai');
global.client = require('../../index');
global.uuid = require('uuid').v4;
global.noflo = require('noflo');
} else {
window.client = require('fbp-protocol-client');
window.uuid = require('uuid').v4;
window.noflo = require('noflo');
}
46 changes: 31 additions & 15 deletions spec/webrtc.js
Original file line number Diff line number Diff line change
@@ -1,14 +1,11 @@
const noflo = require('noflo');
const Peer = require('simple-peer');
const { v4: uuid } = require('uuid');

const {
EventEmitter,
} = require('events');

const Base = client.getTransport('base');
const WebRtcRuntime = client.getTransport('webrtc');
const Signaller = client.signaller;

class FakeRuntime extends EventEmitter {
constructor(address) {
Expand All @@ -17,13 +14,13 @@ class FakeRuntime extends EventEmitter {
this.signaller = address.split('#')[0];
this.id = address.split('#')[1];
} else {
this.signaller = 'ws://api.flowhub.io/';
this.signaller = 'wss://api.flowhub.io/';
this.id = address;
}

const signaller = new Signaller(this.signaller, uuid());
const signaller = new client.Signaller(uuid(), 'runtime', this.signaller);

this.channel = null;
this.peers = {};
const options = {
channelName: this.id,
};
Expand All @@ -33,12 +30,18 @@ class FakeRuntime extends EventEmitter {
}
signaller.connect();
signaller.once('connected', () => {
signaller.announce(this.id);
this.peer = new Peer(options);
this.peer.on('signal', (data) => {
signaller.announce(this.id, data);
signaller.join(this.id);
});
signaller.on('join', (member) => {
if (this.peers[member.id]) {
return;
}
signaller.joinReply(member.id, this.id);
const peer = new Peer(options);
peer.on('signal', (data) => {
signaller.signal(member.id, data);
});
this.peer.on('data', (data) => {
peer.on('data', (data) => {
const msg = JSON.parse(data);
this.emit('message', msg);
if ((msg.protocol === 'runtime') && (msg.command === 'getruntime')) {
Expand All @@ -50,12 +53,23 @@ class FakeRuntime extends EventEmitter {
});
}
});
peer.on('close', () => {
delete this.peers[member.id];
});
this.peers[member.id] = peer;
});
signaller.on('signal', (data) => {
if (!this.peer && !this.peer.destroyed) {
signaller.on('signal', (data, member) => {
if (!this.peers[member.id]) {
// We don't know about this peer, ignore
console.log(`Unknown peer ${member.id}`);
return;
}
this.peer.signal(data);
if (this.peers[member.id].destroyed) {
console.log(`Destroyed peer ${member.id}`);
// Disconnected
return;
}
this.peers[member.id].signal(data);
});
}

Expand All @@ -66,7 +80,9 @@ class FakeRuntime extends EventEmitter {
payload,
};
const m = JSON.stringify(msg);
this.peer.send(m);
Object.keys(this.peers).forEach((peerId) => {
this.peers[peerId].send(m);
});
}
}

Expand Down
54 changes: 1 addition & 53 deletions src/helpers/platform.js
Original file line number Diff line number Diff line change
@@ -1,63 +1,11 @@
const debug = require('debug')('fbp-protocol-client:platform');

const {
EventEmitter,
} = require('events');

const isBrowser = () => !((typeof (process) !== 'undefined') && process.execPath && (process.execPath.indexOf('node') !== -1));

// Simple compatibility layer between node.js WebSocket client and native browser APIs
// Respects events: open, close, message, error
// Note: no data is passed with open and close events
class NodeWebSocketClient extends EventEmitter {
constructor(address, protocol) {
super();
const WebSocketClient = require('websocket').client;
this.client = new WebSocketClient(); // the real client
this.connection = null;

this.client.on('connectFailed', (error) => this.emit('error', error));
this.client.on('connect', (connection) => {
if (this.connection) { debug('WARNING: multiple connections for one NodeWebSocketClient'); }
this.connection = connection;
connection.on('error', (error) => {
this.connection = null;
this.emit('error', error);
});
connection.on('close', () => {
this.connection = null;
this.emit('close');
});
connection.on('message', (message) => {
this.emit('message', {
...message,
data: message.utf8Data,
});
});

this.emit('open');
});

this.client.connect(address, protocol);
}

addEventListener(event, listener) {
this.on(event, listener);
}

close() {
if (!this.connection) { return; }
this.connection.close();
this.connection = null;
}

send(msg) {
this.connection.sendUTF(msg);
}
}

module.exports = {
isBrowser,
EventEmitter,
WebSocket: isBrowser() ? window.WebSocket : NodeWebSocketClient,
WebSocket: isBrowser() ? window.WebSocket : require('websocket').w3cwebsocket,
};
Loading

0 comments on commit d9c1572

Please sign in to comment.