Skip to content

Commit

Permalink
fix: migrate to streamx
Browse files Browse the repository at this point in the history
  • Loading branch information
ThaUnknown committed May 26, 2023
1 parent f1a492d commit 64b2288
Show file tree
Hide file tree
Showing 4 changed files with 60 additions and 70 deletions.
116 changes: 53 additions & 63 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
const debug = require('debug')('simple-peer')
const getBrowserRTC = require('get-browser-rtc')
const randombytes = require('randombytes')
const stream = require('readable-stream')
const { Duplex } = require('streamx')
const queueMicrotask = require('queue-microtask') // TODO: remove when Node 10 is not supported
const errCode = require('err-code')
const { Buffer } = require('buffer')
Expand All @@ -25,14 +25,16 @@ function warn (message) {
* Duplex stream.
* @param {Object} opts
*/
class Peer extends stream.Duplex {
class Peer extends Duplex {
constructor (opts) {
opts = Object.assign({
allowHalfOpen: false
}, opts)

super(opts)

this.__objectMode = !!opts.objectMode // streamx is objectMode by default, so implement readable's fuctionality

this._id = randombytes(4).toString('hex').slice(0, 7)
this._debug('new peer %o', opts)

Expand All @@ -52,8 +54,7 @@ class Peer extends stream.Duplex {
this.allowHalfTrickle = opts.allowHalfTrickle !== undefined ? opts.allowHalfTrickle : false
this.iceCompleteTimeout = opts.iceCompleteTimeout || ICECOMPLETE_TIMEOUT

this.destroyed = false
this.destroying = false
this._destroying = false
this._connected = false

this.remoteAddress = undefined
Expand Down Expand Up @@ -100,7 +101,7 @@ class Peer extends stream.Duplex {
try {
this._pc = new (this._wrtc.RTCPeerConnection)(this.config)
} catch (err) {
this.destroy(errCode(err, 'ERR_PC_CONSTRUCTOR'))
this.__destroy(errCode(err, 'ERR_PC_CONSTRUCTOR'))
return
}

Expand All @@ -127,7 +128,7 @@ class Peer extends stream.Duplex {
// HACK: Fix for odd Firefox behavior, see: https://github.com/feross/simple-peer/pull/783
if (typeof this._pc.peerIdentity === 'object') {
this._pc.peerIdentity.catch(err => {
this.destroy(errCode(err, 'ERR_PC_PEER_IDENTITY'))
this.__destroy(errCode(err, 'ERR_PC_PEER_IDENTITY'))
})
}

Expand Down Expand Up @@ -180,7 +181,7 @@ class Peer extends stream.Duplex {
}

signal (data) {
if (this.destroying) return
if (this._destroying) return
if (this.destroyed) throw errCode(new Error('cannot signal after peer is destroyed'), 'ERR_DESTROYED')
if (typeof data === 'string') {
try {
Expand Down Expand Up @@ -219,11 +220,11 @@ class Peer extends stream.Duplex {
if (this._pc.remoteDescription.type === 'offer') this._createAnswer()
})
.catch(err => {
this.destroy(errCode(err, 'ERR_SET_REMOTE_DESCRIPTION'))
this.__destroy(errCode(err, 'ERR_SET_REMOTE_DESCRIPTION'))
})
}
if (!data.sdp && !data.candidate && !data.renegotiate && !data.transceiverRequest) {
this.destroy(errCode(new Error('signal() called with invalid signal data'), 'ERR_SIGNALING'))
this.__destroy(errCode(new Error('signal() called with invalid signal data'), 'ERR_SIGNALING'))
}
}

Expand All @@ -234,7 +235,7 @@ class Peer extends stream.Duplex {
if (!iceCandidateObj.address || iceCandidateObj.address.endsWith('.local')) {
warn('Ignoring unsupported ICE candidate.')
} else {
this.destroy(errCode(err, 'ERR_ADD_ICE_CANDIDATE'))
this.__destroy(errCode(err, 'ERR_ADD_ICE_CANDIDATE'))
}
})
}
Expand All @@ -244,7 +245,7 @@ class Peer extends stream.Duplex {
* @param {ArrayBufferView|ArrayBuffer|Buffer|string|Blob} chunk
*/
send (chunk) {
if (this.destroying) return
if (this._destroying) return
if (this.destroyed) throw errCode(new Error('cannot send after peer is destroyed'), 'ERR_DESTROYED')
this._channel.send(chunk)
}
Expand All @@ -255,7 +256,7 @@ class Peer extends stream.Duplex {
* @param {Object} init
*/
addTransceiver (kind, init) {
if (this.destroying) return
if (this._destroying) return
if (this.destroyed) throw errCode(new Error('cannot addTransceiver after peer is destroyed'), 'ERR_DESTROYED')
this._debug('addTransceiver()')

Expand All @@ -264,7 +265,7 @@ class Peer extends stream.Duplex {
this._pc.addTransceiver(kind, init)
this._needsNegotiation()
} catch (err) {
this.destroy(errCode(err, 'ERR_ADD_TRANSCEIVER'))
this.__destroy(errCode(err, 'ERR_ADD_TRANSCEIVER'))
}
} else {
this.emit('signal', { // request initiator to renegotiate
Expand All @@ -279,7 +280,7 @@ class Peer extends stream.Duplex {
* @param {MediaStream} stream
*/
addStream (stream) {
if (this.destroying) return
if (this._destroying) return
if (this.destroyed) throw errCode(new Error('cannot addStream after peer is destroyed'), 'ERR_DESTROYED')
this._debug('addStream()')

Expand All @@ -294,7 +295,7 @@ class Peer extends stream.Duplex {
* @param {MediaStream} stream
*/
addTrack (track, stream) {
if (this.destroying) return
if (this._destroying) return
if (this.destroyed) throw errCode(new Error('cannot addTrack after peer is destroyed'), 'ERR_DESTROYED')
this._debug('addTrack()')

Expand All @@ -319,7 +320,7 @@ class Peer extends stream.Duplex {
* @param {MediaStream} stream
*/
replaceTrack (oldTrack, newTrack, stream) {
if (this.destroying) return
if (this._destroying) return
if (this.destroyed) throw errCode(new Error('cannot replaceTrack after peer is destroyed'), 'ERR_DESTROYED')
this._debug('replaceTrack()')

Expand All @@ -333,7 +334,7 @@ class Peer extends stream.Duplex {
if (sender.replaceTrack != null) {
sender.replaceTrack(newTrack)
} else {
this.destroy(errCode(new Error('replaceTrack is not supported in this browser'), 'ERR_UNSUPPORTED_REPLACETRACK'))
this.__destroy(errCode(new Error('replaceTrack is not supported in this browser'), 'ERR_UNSUPPORTED_REPLACETRACK'))
}
}

Expand All @@ -343,7 +344,7 @@ class Peer extends stream.Duplex {
* @param {MediaStream} stream
*/
removeTrack (track, stream) {
if (this.destroying) return
if (this._destroying) return
if (this.destroyed) throw errCode(new Error('cannot removeTrack after peer is destroyed'), 'ERR_DESTROYED')
this._debug('removeSender()')

Expand All @@ -359,7 +360,7 @@ class Peer extends stream.Duplex {
if (err.name === 'NS_ERROR_UNEXPECTED') {
this._sendersAwaitingStable.push(sender) // HACK: Firefox must wait until (signalingState === stable) https://bugzilla.mozilla.org/show_bug.cgi?id=1133874
} else {
this.destroy(errCode(err, 'ERR_REMOVE_TRACK'))
this.__destroy(errCode(err, 'ERR_REMOVE_TRACK'))
}
}
this._needsNegotiation()
Expand All @@ -370,7 +371,7 @@ class Peer extends stream.Duplex {
* @param {MediaStream} stream
*/
removeStream (stream) {
if (this.destroying) return
if (this._destroying) return
if (this.destroyed) throw errCode(new Error('cannot removeStream after peer is destroyed'), 'ERR_DESTROYED')
this._debug('removeSenders()')

Expand All @@ -396,7 +397,7 @@ class Peer extends stream.Duplex {
}

negotiate () {
if (this.destroying) return
if (this._destroying) return
if (this.destroyed) throw errCode(new Error('cannot negotiate after peer is destroyed'), 'ERR_DESTROYED')

if (this.initiator) {
Expand Down Expand Up @@ -424,30 +425,23 @@ class Peer extends stream.Duplex {
this._isNegotiating = true
}

// TODO: Delete this method once readable-stream is updated to contain a default
// implementation of destroy() that automatically calls _destroy()
// See: https://github.com/nodejs/readable-stream/issues/283
destroy (err) {
this._destroy(err, () => {})
_final (cb) {
if (!this._readableState.ended) this.push(null)
cb(null)
}

_destroy (err, cb) {
if (this.destroyed || this.destroying) return
this.destroying = true

this._debug('destroying (error: %s)', err && (err.message || err))

queueMicrotask(() => { // allow events concurrent with the call to _destroy() to fire (see #692)
this.destroyed = true
this.destroying = false

this._debug('destroy (error: %s)', err && (err.message || err))
__destroy (err) {
this.end()
this._destroy(() => {}, err)
}

this.readable = this.writable = false
_destroy (cb, err) {
if (this.destroyed || this._destroying) return
this._destroying = true

if (!this._readableState.ended) this.push(null)
if (!this._writableState.finished) this.end()
this._debug('destroying (error: %s)', err && (err.message || err))

setTimeout(() => { // allow events concurrent with the call to _destroy() to fire (see #692)
this._connected = false
this._pcReady = false
this._channelReady = false
Expand Down Expand Up @@ -492,19 +486,17 @@ class Peer extends stream.Duplex {
}
this._pc = null
this._channel = null

if (err) this.emit('error', err)
this.emit('close')
cb()
})
}, 0)
}

_setupData (event) {
if (!event.channel) {
// In some situations `pc.createDataChannel()` returns `undefined` (in wrtc),
// which is invalid behavior. Handle it gracefully.
// See: https://github.com/feross/simple-peer/issues/163
return this.destroy(errCode(new Error('Data channel event is missing `channel` property'), 'ERR_DATA_CHANNEL'))
return this.__destroy(errCode(new Error('Data channel event is missing `channel` property'), 'ERR_DATA_CHANNEL'))
}

this._channel = event.channel
Expand Down Expand Up @@ -532,7 +524,7 @@ class Peer extends stream.Duplex {
const err = event.error instanceof Error
? event.error
: new Error(`Datachannel error: ${event.message} ${event.filename}:${event.lineno}:${event.colno}`)
this.destroy(errCode(err, 'ERR_DATA_CHANNEL'))
this.__destroy(errCode(err, 'ERR_DATA_CHANNEL'))
}

// HACK: Chrome will sometimes get stuck in readyState "closing", let's check for this condition
Expand All @@ -548,16 +540,14 @@ class Peer extends stream.Duplex {
}, CHANNEL_CLOSING_TIMEOUT)
}

_read () {}

_write (chunk, encoding, cb) {
_write (chunk, cb) {
if (this.destroyed) return cb(errCode(new Error('cannot write after peer is destroyed'), 'ERR_DATA_CHANNEL'))

if (this._connected) {
try {
this.send(chunk)
} catch (err) {
return this.destroy(errCode(err, 'ERR_DATA_CHANNEL'))
return this.__destroy(errCode(err, 'ERR_DATA_CHANNEL'))
}
if (this._channel.bufferedAmount > MAX_BUFFERED_AMOUNT) {
this._debug('start backpressure: bufferedAmount %d', this._channel.bufferedAmount)
Expand All @@ -580,7 +570,7 @@ class Peer extends stream.Duplex {
// Wait a bit before destroying so the socket flushes.
// TODO: is there a more reliable way to accomplish this?
const destroySoon = () => {
setTimeout(() => this.destroy(), 1000)
setTimeout(() => this.__destroy(), 1000)
}

if (this._connected) {
Expand Down Expand Up @@ -631,15 +621,15 @@ class Peer extends stream.Duplex {
}

const onError = err => {
this.destroy(errCode(err, 'ERR_SET_LOCAL_DESCRIPTION'))
this.__destroy(errCode(err, 'ERR_SET_LOCAL_DESCRIPTION'))
}

this._pc.setLocalDescription(offer)
.then(onSuccess)
.catch(onError)
})
.catch(err => {
this.destroy(errCode(err, 'ERR_CREATE_OFFER'))
this.__destroy(errCode(err, 'ERR_CREATE_OFFER'))
})
}

Expand Down Expand Up @@ -681,22 +671,22 @@ class Peer extends stream.Duplex {
}

const onError = err => {
this.destroy(errCode(err, 'ERR_SET_LOCAL_DESCRIPTION'))
this.__destroy(errCode(err, 'ERR_SET_LOCAL_DESCRIPTION'))
}

this._pc.setLocalDescription(answer)
.then(onSuccess)
.catch(onError)
})
.catch(err => {
this.destroy(errCode(err, 'ERR_CREATE_ANSWER'))
this.__destroy(errCode(err, 'ERR_CREATE_ANSWER'))
})
}

_onConnectionStateChange () {
if (this.destroyed) return
if (this.destroyed || this._destroying) return
if (this._pc.connectionState === 'failed') {
this.destroy(errCode(new Error('Connection failed.'), 'ERR_CONNECTION_FAILURE'))
this.__destroy(errCode(new Error('Connection failed.'), 'ERR_CONNECTION_FAILURE'))
}
}

Expand All @@ -717,10 +707,10 @@ class Peer extends stream.Duplex {
this._maybeReady()
}
if (iceConnectionState === 'failed') {
this.destroy(errCode(new Error('Ice connection failed.'), 'ERR_ICE_CONNECTION_FAILURE'))
this.__destroy(errCode(new Error('Ice connection failed.'), 'ERR_ICE_CONNECTION_FAILURE'))
}
if (iceConnectionState === 'closed') {
this.destroy(errCode(new Error('Ice connection closed.'), 'ERR_ICE_CONNECTION_CLOSED'))
this.__destroy(errCode(new Error('Ice connection closed.'), 'ERR_ICE_CONNECTION_CLOSED'))
}
}

Expand Down Expand Up @@ -781,10 +771,10 @@ class Peer extends stream.Duplex {

// HACK: We can't rely on order here, for details see https://github.com/js-platform/node-webrtc/issues/339
const findCandidatePair = () => {
if (this.destroyed) return
if (this.destroyed || this._destroying) return

this.getStats((err, items) => {
if (this.destroyed) return
if (this.destroyed || this._destroying) return

// Treat getStats error as non-fatal. It's not essential.
if (err) items = []
Expand Down Expand Up @@ -889,7 +879,7 @@ class Peer extends stream.Duplex {
try {
this.send(this._chunk)
} catch (err) {
return this.destroy(errCode(err, 'ERR_DATA_CHANNEL'))
return this.__destroy(errCode(err, 'ERR_DATA_CHANNEL'))
}
this._chunk = null
this._debug('sent chunk from "write before connect"')
Expand Down Expand Up @@ -972,7 +962,7 @@ class Peer extends stream.Duplex {
_onChannelMessage (event) {
if (this.destroyed) return
let data = event.data
if (data instanceof ArrayBuffer) data = Buffer.from(data)
if (data instanceof ArrayBuffer || this.__objectMode === false) data = Buffer.from(data)
this.push(data)
}

Expand All @@ -994,7 +984,7 @@ class Peer extends stream.Duplex {
_onChannelClose () {
if (this.destroyed) return
this._debug('on channel close')
this.destroy()
this.__destroy()
}

_onTrack (event) {
Expand Down
Loading

0 comments on commit 64b2288

Please sign in to comment.