diff --git a/.gitignore b/.gitignore index db0326e..0175673 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,4 @@ node_modules out +tmp diff --git a/examples/dumpAmf.js b/examples/dumpAmf.js index 8cba5a9..42f5940 100644 --- a/examples/dumpAmf.js +++ b/examples/dumpAmf.js @@ -1,37 +1,109 @@ +"use strict"; + require('colors'); -var net = require('net'); var util = require('util'); +var net = require('net'); var mtrude = require('mtrude'); -var ChunkStream = mtrude.rtmp.ChunkStream; -var MessageStream = mtrude.rtmp.MessageStream; -var AMF = mtrude.rtmp.AMF; +var rtmp = mtrude.rtmp; +var ChunkStream = rtmp.ChunkStream; +var MessageStream = rtmp.MessageStream; +var AMF = rtmp.AMF; +var asSocket = mtrude.asSocket; +var dumpTools = require('./dumpTools'); + +function main() { + var optimist = require('optimist') + .usage('Usage: $0 [--debug] [--messages] [--nocolor] [--ignore] [in [out]]') + .boolean('debug') + .boolean('nocolor') + .boolean('messages') + .boolean('ignore') + .boolean('help') + .alias('h', 'help') + .describe('debug', 'Set MessageStream.DBG = true') + .describe('messages', 'Also dump messages') + .describe('nocolor', 'Don\'t use colors') + .describe('ignore', 'Ignore errors') + ; + var argv = optimist.argv; + + if (argv.help || argv._.length > 2) { + var messageStream = new MessageStream(chunkStream); + if (argv.messages) require('./dumpMessageStream')(messageStream); + dumpAmf(messageStream); + optimist.showHelp(); + return; + } + + if (argv.nocolor) dumpTools.dontColor(); + + if (argv.debug) MessageStream.DBG = true; + + if (argv._.length == 0) { + var server = net.createServer(); + server.on('connection', function(socket) { + console.log('CONN : ' + 'Connection from %s'.grey, socket.remoteAddress); + var chunkStream = new ChunkStream(); + }); + server.listen(1935); + console.log('LISTN: ' + 'Listening on port 1935'.grey); + } + else { + var iFile = argv._[0]; + var time36 = new Date().getTime().toString(36) + var oFile = argv._[1] || 'out-' + time36 + '.raw'; + var chunkStream = new ChunkStream(asSocket(iFile, oFile + , function() { console.log('FILE : ' + 'Reading %s'.grey, iFile); } + , function() { console.log('FILE : ' + 'Writing %s'.grey, oFile); } + )); + } + + var chain = new mtrude.BufferChain(); + var messageStream = new MessageStream(chunkStream); + if (argv.messages) require('./dumpMessageStream')(messageStream); + dumpAmf(messageStream, argv.ignore); +} + +function dumpZ(buffer, messageStream) { + return util.inspect(AMF.deserializeZ(buffer, messageStream)); +} -var server = net.createServer(); -server.on('connection', function(socket) { - console.log('Connection from %s', socket.remoteAddress); - var messageStream = new MessageStream(new ChunkStream(socket)); +function dump3(buffer, messageStream) { + return util.inspect(AMF.deserialize3(buffer, messageStream)); +} +function dumpAmf(messageStream, ignoreErrors) { messageStream.on('error', function(msg) { - console.log('ERROR %s'.red, msg); - server.close(); + console.log('ERROR: %s', msg.red); + if (!ignoreErrors) messageStream.close(); }); messageStream.on('warn', function(msg) { - console.log('WARN %s'.magenta, msg); + console.log('WARN : %s', msg.magenta); + if (!ignoreErrors) messageStream.close(); }); messageStream.on('end', function(graceful) { - console.log('END :', 'Ended', (graceful ? '' : 'not ') + 'gracefully'); + var msg = 'dumpAmf MessageStream ' + (graceful ? '' : 'not ') + + 'gracefully ended'; + console.log('END :', graceful ? msg.green : msg.red); }); messageStream.on('message', function(message) { - console.log( - 'MESSAGE: %s:%s typeid=%s data=%s'.blue, message.csid, - message.msid, message.typeid, message.data.length); - if (message.typeid == 20) { - console.log('INVOKE'.green, util.inspect(AMF.deserialize(message.data))); + switch (message.typeid) { + case rtmp.types.INVOKE: + console.log('INVOK: %s', dumpZ(message.data, messageStream).magenta); + break; + case rtmp.types.SO: + console.log('SHOBJ: %s', dumpZ(message.data, messageStream).blue); + break; + case rtmp.types.NOTIFY: + console.log('NOTIF: %s', dumpZ(message.data, messageStream).blue); + break; + case rtmp.types.FLEX: + console.log('FLEX : %s', dump3(message.data, messageStream).cyan); + break; } }); -}); +} -server.listen(1935); -console.log(); -console.log('Listening on port 1935'); +if (require.main === module) main(); +exports.dumpAmf = dumpAmf; diff --git a/lib/BufferChain.js b/lib/BufferChain.js index e742d6e..730274b 100644 --- a/lib/BufferChain.js +++ b/lib/BufferChain.js @@ -31,7 +31,8 @@ push: function(buffer) { return true; } else if (buffer instanceof BufferChain) { - for (var i = 0; i < buffer.length; i++) this.push(buffer.chain[i]); + for (var i = 0; i < buffer.length; i++) + this.push(buffer.chain[i]); return true; } else { @@ -41,16 +42,30 @@ push: function(buffer) { _index: function(ptr) { var index = 0; - while (ptr >= this.chain[index].length) ptr -= this.chain[index++].length; + while (ptr >= this.chain[index].length) + ptr -= this.chain[index++].length; return [index, ptr]; }, toString: function(encoding, start, end) { var subChain = this.slice(start, end); - if (subChain.chain.length == 0) return ''; - if (subChain.chain.length == 1) return subChain.chain[0].toString(encoding); + if (subChain.chain.length == 0) + return ''; + + if (subChain.chain.length == 1) + return subChain.chain[0].toString(encoding); + + if (subChain.chain.length == 2) { + // Either copy both chains or handle utf8 multibyte code points correctly + // at the boundary between the first and the second buffer + // I choose copying. + var buffer = new Buffer(subChain.length); + subChain.chain[0].copy(buffer); + subChain.chain[1].copy(buffer, subChain.chain[0].length); + return buffer.toString(encoding); + } - throw new Error('toString() over several buffers not implemented'); + throw new Error('toString() over more than two buffers not implemented'); }, readDoubleBE: function(ptr) { @@ -77,14 +92,14 @@ readUInt32LE: function(ptr) { var index = this._index(ptr); return this.chain[index[0]].readUInt32LE(index[1]); - // todo cross-buffer read + // todo cross-buffer read, but don't worry node has an assert above }, readUInt32BE: function(ptr) { var index = this._index(ptr); return this.chain[index[0]].readUInt32BE(index[1]); - // todo cross-buffer read + // todo cross-buffer read, but don't worry, node has an assert above }, consumeInt32BE: function() { @@ -107,6 +122,8 @@ consumeUInt24BE: function() { readUInt16BE: function(ptr) { var index = this._index(ptr); return this.chain[index[0]].readUInt16BE(index[1]); + + // todo cross-buffer read, but don't worry, node has an assert above }, consumeUInt16BE: function() { @@ -144,6 +161,8 @@ consume: function(length) { length, 'rest', oldRest, '->', this.rest, 'ptr', oldPtr, '->', this.ptr); }, +// Todo TEST THIS CAREFULLY maybe the second loop does bufferLength wrong +// if the next chain has differing length and other corner cases. slice: function(start, end) { if (start < 0 || start >= this._length) throw new Error( 'start out of bounds ' + start + ', 0 to ' + this.length + ' exclusive'); @@ -183,7 +202,7 @@ slice: function(start, end) { for (var i = i0; i < i1 - 1; i++) chain.push(this.chain[i]); - chain.push(this.chain[i].slice(0, j1)); + chain.push(this.chain[i + 1].slice(0, j1)); return chain; }, diff --git a/lib/rtmp/AMF.js b/lib/rtmp/AMF.js index 248686d..b1affdf 100644 --- a/lib/rtmp/AMF.js +++ b/lib/rtmp/AMF.js @@ -5,22 +5,23 @@ var BufferChain = mtrude.BufferChain; var AMF = module.exports = {}; -AMF.deserialize = function(data) { +AMF.deserializeZ = function(data, emitter) { if (data.ptr == null) data.ptr = 0; - //if (data.amfReferences == null) data.amfReferences = []; var values = []; try { - while (data.ptr < data.length) values.push(amfZeroAny(data)); + while (data.ptr < data.length) values.push(amfZAny(data)); } catch (err) { - console.log('deserialize(): ' + err); + var message = 'AMF.deserialize(): ' + err; + if (emitter && emitter.emit) emitter.emit('error', message); + else console.log('ERROR: ' + message); } return values; } -AMF.amfZeroMarkers = { +AMF.amfZMarkers = { NUMBER: 0x00, BOOLEAN: 0x01, STRING: 0x02, @@ -41,65 +42,194 @@ AMF.amfZeroMarkers = { AVMPLUS: 0x11, // switch to AMF3 } -function amfZeroAny(data) { - var a0 = AMF.amfZeroMarkers; +function amfZAny(data) { + var a0 = AMF.amfZMarkers; var marker = data.readUInt8(data.ptr++); var ptr = data.ptr; switch (marker) { case a0.NUMBER: data.ptr += 8; return data.readDoubleBE(ptr); case a0.BOOLEAN: data.ptr++; return data.readUInt8(ptr) != 0; - case a0.STRING: return amfZeroString(data); - case a0.OBJECT: return amfZeroObject(data); + case a0.STRING: return amfZString(data); + case a0.OBJECT: return amfZObject(data); case a0.NULL: return null; case a0.UNDEFINED: return void 0; - case a0.ECMA_ARRAY: return amfZeroEcmaArray(data); - case a0.STRICT_ARRAY: return amfZeroStrictArray(data); - case a0.DATE: return amfZeroDate(data); - default: throw new Error('Unsupported marker 0x%s', marker.toString(16)); + case a0.ECMA_ARRAY: return amfZEcmaArray(data); + case a0.STRICT_ARRAY: return amfZStrictArray(data); + case a0.DATE: return amfZDate(data); + default: throw new Error('Unsupported AMF0 marker 0x' + + marker.toString(16)); } } -function amfZeroString(data) { +function amfZString(data) { var length = data.readUInt16BE(data.ptr); var ptr = data.ptr += 2; data.ptr += length; return data.toString('utf8', ptr, ptr + length); } -function amfZeroObject(data) { +function amfZObject(data) { var object = {}; while (true) { - var key = amfZeroString(data); + var key = amfZString(data); if (key == '') { var marker = data.readUInt8(data.ptr++); - if (marker != AMF.amfZeroMarkers.OBJECT_END) throw new Error( + if (marker != AMF.amfZMarkers.OBJECT_END) throw new Error( 'Bad end-of-object marker 0x%s != 0x9', marker.toString(16)); break; } - object[key] = amfZeroAny(data); + object[key] = amfZAny(data); } return object; } -function amfZeroEcmaArray(data) { +function amfZEcmaArray(data) { data.ptr += 4; /* ignore length */ - return amfZeroObject(data); + return amfZObject(data); } -function amfZeroStrictArray(data) { +function amfZStrictArray(data) { var array = [] var count = data.readUInt32BE(data.ptr); data.ptr += 4; - for (var i = 0; i < count; i++) array.push(amfZeroAny(data)); + for (var i = 0; i < count; i++) array.push(amfZAny(data)); return array; } -function amfZeroDate(data) { +function amfZDate(data) { var date = new Date(); date.setTime = data.readDoubleBE(data.ptr); data.ptr += 10; // including ignored time zone } + + +AMF.deserialize3 = function(data, emitter) { + if (data.ptr == null) data.ptr = 0; + + var type = data.readUInt8(data.ptr++); + if (type == 0) return AMF.deserializeZ(data, emitter); + try { + if (type == 3) return amf3Any(data); + } + catch (err) { + var message = 'AMF.deserialize3(): ' + err; + if (emitter && emitter.emit) emitter.emit('error', message); + else console.log('ERROR: ' + message); + } + throw new Error('Unknown AMF type #' + type); +} + + +AMF.amf3Markers = { + UNDEFINED: 0x00, + NULL: 0x01, + FALSE: 0x02, + TRUE: 0x03, + INTEGER: 0x04, + DOUBLE: 0x05, + STRING: 0x06, + XML_DOC: 0x07, + DATE: 0x08, + ARRAY: 0x09, + OBJECT: 0x0a, + XML: 0x0b, + BYTEARRAY: 0x0c, +}; + +function amf3Any(data) { + var a3 = AMF.amf3Markers; + var marker = data.readUInt8(data.ptr++); + var ptr = data.ptr; + + switch (marker) { + case a3.UNDEFINED: return void 0; + case a3.NULL: return null; + case a3.FALSE: return false; + case a3.TRUE: return true; + case a3.INTEGER: return amf3Integer(data); + case a3.DOUBLE: return data.ptr += 8; return data.readDoubleBE(ptr); + case a3.STRING: return amf3String(data); + case a3.DATE: return amf3Date(data); + case a3.ARRAY: return amf3Array(data); + case a3.OBJECT: return amf3Object(data); + default: throw new Error('Unsupported AMF3 marker 0x' + + marker.toString(16)); + } +} + +// Read unsigned 29-bit Integer Encoding (AMF 3 Specification 1.3.1) +function amf3Integer(data) { + var result = 0; + for (var i = 0; i < 4; i++) { + var x = data.readUInt8(data.ptr++); + if (!(x & 0x80)) return result << 8 | x; + result = result << 7 | (i == 3 ? x : x & 0x7f); + } + return result; +} + +// Read string or reference (AMF 3 Specification 1.3.2) +function amf3String(data) { + var length = amf3Integer(data); + if (length & 0x01) { + length = length >> 2; + data.ptr += length; + return data.toString('utf8', ptr, ptr + length); + } + else { + throw new Error('String references not supported'); + } +} + +function amf3Date(data) { + var reference = amf3Integer(data); + if (reference & 0x01) { + var date = new Date(); + date.setTime = data.readDoubleBE(data.ptr); + data.ptr += 8; + return date; + } + else { + throw new Error('Date referenes not supported'); + } +} + +function amf3Array(data) { + var denseLength = amf3Integer(data); + if (denseLength & 0x01) { + var array = []; + while (true) { + var key = amf3String(data); + if (key == '') break; + array[key] = amf3Any(data); + } + for (var i = 0; i < denseLength; i++) array[i] = amfAny(data); + return array; + } + else { + throw new Error('Array references not supported'); + } +} + +function amf3Object(data) { + var reference = amf3Integer(data); + switch (reference & 0x07) { + case 0x00: case 0x02: case 0x04: case 0x06: + throw new Error('Object references not supported'); + case 0x01: case 0x05: + throw new Error('Trait references not supported'); + case 0x03: + throw new Error('Traits not supported'); + case 0x07: + throw new Error('Externalizable classes not supported'); + } +} + + + + + diff --git a/test/TestMessageStream.js b/test/TestMessageStream.js index 444a48b..b7e00eb 100644 --- a/test/TestMessageStream.js +++ b/test/TestMessageStream.js @@ -33,7 +33,7 @@ module.exports = { assert.equal(message.data.length, [235][messageIndex]); if (messageIndex == 0) { - var invokeParameters = AMF.deserialize(message.data); + var invokeParameters = AMF.deserializeZ(message.data); assert.equal(invokeParameters.length, 3); assert.equal(invokeParameters[0], 'connect'); assert.equal(invokeParameters[1], 1);