diff --git a/examples/ChunkStreamDumper.js b/examples/ChunkStreamDumper.js deleted file mode 100644 index 85ed401..0000000 --- a/examples/ChunkStreamDumper.js +++ /dev/null @@ -1,101 +0,0 @@ -"use strict"; - -require('colors'); -var net = require('net'); -var mtrude = require('mtrude'); -var ChunkStream = mtrude.rtmp.ChunkStream; -var asSocket = mtrude.asSocket; - -console.log('' - + 'NOTE! ChunkStream is not complete without MessageStream and might get\n' - + 'stuck because of missing coordination with peer. Dump format:\n' - + 'CHUNK: %s %s %s %s %s:%s %s:%s,%s\n', - 'data-1st-8-bytes'.blue, 'ascii-8c'.yellow, - 'tstamp'.blue, 'ti'.green, 'msg-id'.blue, 'chs-id'.blue, - 'msglen'.magenta, 'chklen'.blue, 'rest'.blue -); - -if (process.argv[2] == '--debug') { - ChunkStream.DBG = true; - process.argv.splice(2, 1); -} - -if (process.argv.length == 2) { - var server = net.createServer(); - server.on('connection', function(socket) { - console.log('Connection from %s', socket.remoteAddress); - dumpChunks(socket); - }); - server.listen(1935); - console.log('Listening on port 1935'); -} -else { - mockedChunkStream(); -} - -function mockedChunkStream() { - var inFile = process.argv[2]; - var outFile = process.argv[3] || - 'out-' + new Date().getTime().toString(36) + '.raw'; - dumpChunks(asSocket(inFile, outFile - , function() { console.log('reading file ' + inFile); } - , function() { console.log('writing file ' + outFile); } - )); -} - -function dumpChunks(socket) { - var chunkStream = new ChunkStream(socket); - - chunkStream.on('error', function(message) { - console.log(message); - chunkStream.close(); - }); - chunkStream.on('end', function(graceful) { - var msg = 'END : Ended ' + (graceful ? '' : 'not ') + 'gracefully'; - console.log(graceful ? msg.green : msg.red); - }); - chunkStream.on('handshake', function() { - console.log('HANDS: %s'.green, 'handshake completed successfully'); - }); - chunkStream.on('warn', function(message) { - console.log('WARN : %s'.red, message); - }); - chunkStream.on('chunk', function(chunk) { - console.log( - 'CHUNK: %s %s %s %s %s:%s %s:%s,%s', - dump8(chunk.data).blue, ascii8(chunk.data), - hex6(chunk.timestamp).blue, hex2(chunk.typeid).green, - hex6(chunk.csid).blue, hex6(chunk.msid).blue, - chunk.length.toString().magenta, chunk.data.length.toString().blue, - chunk.rest.toString().blue); - if (chunk.typeid == 1) { - var chunkSize = chunk.data.readUInt32LE(0); - chunkStream.warn('ChunkStreamDumper setting chunk size to ' + chunkSize); - chunkStream.chunkSize = chunkSize; - } - }); -} - -function dump8(data) { - function b(i) { return i < data.length ? hex2(data.readUInt8(i)) : ' '; } - - return b(0) + b(1) + b(2) + b(3) + b(4) + b(5) + b(6) + b(7); -} - -function hex2(byte) { - function hex1(nybble) { return "0123456789abcdef"[nybble & 0xf]; } - return hex1(byte >> 4) + hex1(byte); -} - -function hex6(int24) { - return hex2(int24 >> 16) + hex2(int24 >> 8) + hex2(int24); -} - -function ascii8(data) { - function asChar(b) { return String.fromCharCode(b); } - function ascii(b) { return b > 31 && b < 128 ? asChar(b).yellow : '·'.black; } - function b(i) { return i < data.length ? ascii(data.readUInt8(i)) : '·'.white; } - - return b(0) + b(1) + b(2) + b(3) + b(4) + b(5) + b(6) + b(7); -} - diff --git a/examples/DumpTools.js b/examples/DumpTools.js new file mode 100644 index 0000000..a2e7a63 --- /dev/null +++ b/examples/DumpTools.js @@ -0,0 +1,25 @@ +"use strict"; + +var dump8 = exports.dump8 = function(data) { + function b(i) { return i < data.length ? hex2(data.readUInt8(i)) : ' '; } + + return b(0) + b(1) + b(2) + b(3) + ' ' + b(4) + b(5) + b(6) + b(7); +} + +var hex2 = exports.hex2 = function(byte) { + function hex1(nybble) { return "0123456789abcdef"[nybble & 0xf]; } + return hex1(byte >> 4) + hex1(byte); +} + +var hex6 = exports.hex6 = function(int24) { + return hex2(int24 >> 16) + hex2(int24 >> 8) + hex2(int24); +} + +var ascii8 = exports.ascii8 = function(data) { + function asChar(b) { return String.fromCharCode(b); } + function ascii(b) { return b > 31 && b < 128 ? asChar(b).yellow : '·'.black; } + function b(i) { return i < data.length ? ascii(data.readUInt8(i)) : '·'.white; } + + return b(0) + b(1) + b(2) + b(3) + ' ' + b(4) + b(5) + b(6) + b(7); +} + diff --git a/examples/MessageStreamDumper.js b/examples/MessageStreamDumper.js deleted file mode 100644 index d3c86ce..0000000 --- a/examples/MessageStreamDumper.js +++ /dev/null @@ -1,55 +0,0 @@ -require('colors'); -var net = require('net'); -var mtrude = require('mtrude'); -var ChunkStream = mtrude.rtmp.ChunkStream; -var MessageStream = mtrude.rtmp.MessageStream; -var asSocket = mtrude.asSocket; - -console.log(); - -if (process.argv[2] == '--debug') { - MessageStream.DBG = true; - process.argv.splice(2, 1); -} - -if (process.argv.length == 2) { - var server = net.createServer(); - server.on('connection', function(socket) { - console.log('Connection from %s', socket.remoteAddress); - dumpMessages(socket); - }); - server.listen(1935); - console.log('Listening on port 1935'); -} -else { - mockedMessageStream(); -} - -function mockedMessageStream() { - var inFile = process.argv[2]; - var outFile = process.argv[3] || - 'out-' + new Date().getTime().toString(36) + '.raw'; - dumpMessages(asSocket(inFile, outFile - , function() { console.log('reading file ' + inFile); } - , function() { console.log('writing file ' + outFile); } - )); -} - - -function dumpMessages(socket) { - var messageStream = new MessageStream(new ChunkStream(socket)); - - messageStream.on('error', function(exception) { - console.log(exception.stack); - messageStream.close(); - }); - messageStream.on('end', function(graceful) { - console.log('END :', 'Ended', (graceful ? '' : 'not ') + 'gracefully'); - }); - messageStream.on('message', function(message) { - console.log( - 'MESSAGE: %s:%s typeid=%s data=%s'.blue, message.csid, - message.msid, message.typeid, message.data.length); - }); -} - diff --git a/examples/dumpChunkStream.js b/examples/dumpChunkStream.js new file mode 100644 index 0000000..1bd4115 --- /dev/null +++ b/examples/dumpChunkStream.js @@ -0,0 +1,98 @@ +"use strict"; + +require('colors'); +var net = require('net'); +var mtrude = require('mtrude'); +var ChunkStream = mtrude.rtmp.ChunkStream; +var asSocket = mtrude.asSocket; + +function main() { + var optimist = require('optimist') + .usage('Usage: $0 [--debug] [--debugchain] [in [out]]') + .boolean('debug') + .boolean('debugchain') + .boolean('help') + .alias('h', 'help') + .describe('debug', 'Set ChunkStream.DBG = true') + .describe('debugchain', 'Set BufferChain.DBG = true') + ; + var argv = optimist.argv; + + if (argv.help || argv._.length > 2) { + optimist.showHelp(); + return; + } + + console.log('' + + 'NOTE! ChunkStream is not complete without MessageStream and might get\n' + + 'stuck because of missing coordination with peer. Dump format:\n' + + 'CHUNK: %s %s %s %s %s:%s %s:%s,%s\n', + 'data-1st -8-bytes'.blue, 'asci i-8c'.yellow, + 'tstamp'.blue, 'ti'.green, 'msg-id'.blue, 'chs-id'.blue, + 'msglen'.magenta, 'chklen'.blue, 'rest'.blue + ); + + if (argv.debug) ChunkStream.DBG = true; + + if (argv.chain) mtrude.BufferChain.DBG = true; + + if (argv._.length == 0) { + var server = net.createServer(); + server.on('connection', function(socket) { + console.log('CONN : ' + 'Connection from %s'.cyan, socket.remoteAddress); + dumpChunkStream(new ChunkStream(socket)); + }); + server.listen(1935); + console.log('LISTN: ' + 'Listening on port 1935'.cyan); + } + else { + var iFile = argv._[0]; + var time36 = new Date().getTime().toString(36) + var oFile = argv._[1] || 'out-' + time36 + '.raw'; + dumpChunkStream(new ChunkStream(asSocket(iFile, oFile + , function() { console.log('FILE : ' + 'Reading %s'.cyan, iFile); } + , function() { console.log('FILE : ' + 'Writing %s'.cyan, oFile); } + ))); + } +} + + +var DumpTools = require('./DumpTools'); +var dump8 = DumpTools.dump8; +var hex2 = DumpTools.hex2; +var hex6 = DumpTools.hex6; +var ascii8 = DumpTools.ascii8; + +var dumpChunkStream = function(chunkStream) { + chunkStream.on('error', function(errorMessage) { + console.log('ERROR:', ('ChunkStream: ' + errorMessage).red); + chunkStream.close(); + }); + chunkStream.on('end', function(graceful) { + var msg = 'ChunkStream ' + (graceful ? '' : 'not ') + 'gracefully ended'; + console.log('END :', graceful ? msg.green : msg.red); + }); + chunkStream.on('handshake', function() { + console.log('HANDS: %s', 'Handshake completed successfully'.green); + }); + chunkStream.on('warn', function(message) { + console.log('WARN : %s', ('ChunkStream: ' + message).red); + }); + chunkStream.on('chunk', function(chunk) { + console.log( + 'CHUNK: %s %s %s %s %s:%s %s:%s,%s', + dump8(chunk.data).blue, ascii8(chunk.data), + hex6(chunk.timestamp).blue, hex2(chunk.typeid).green, + hex6(chunk.csid).blue, hex6(chunk.msid).blue, + chunk.length.toString().magenta, chunk.data.length.toString().blue, + chunk.rest.toString().blue); + if (chunk.typeid == 1) { + var chunkSize = chunk.data.readUInt32LE(0); + chunkStream.warn('Setting chunk size to ' + chunkSize); + chunkStream.chunkSize = chunkSize; + } + }); +} + +if (require.main === module) main(); +module.exports = dumpChunkStream; diff --git a/examples/dumpMessageStream.js b/examples/dumpMessageStream.js new file mode 100644 index 0000000..f13bfa7 --- /dev/null +++ b/examples/dumpMessageStream.js @@ -0,0 +1,94 @@ +"use strict"; + +require('colors'); +var net = require('net'); +var mtrude = require('mtrude'); +var rtmp = mtrude.rtmp; +var ChunkStream = rtmp.ChunkStream; +var MessageStream = rtmp.MessageStream; +var asSocket = mtrude.asSocket; + +function main() { + var optimist = require('optimist') + .usage('Usage: $0 [--debug] [--chunks] [in [out]]') + .boolean('debug') + .boolean('chunks') + .boolean('help') + .alias('h', 'help') + .describe('debug', 'Set MessageStream.DBG = true') + .describe('chunks', 'Also dump chunks') + ; + var argv = optimist.argv; + + if (argv.help || argv._.length > 2) { + optimist.showHelp(); + return; + } + + console.log('Dump format:\n' + + 'MESSAGE: ....\n'); + + if (argv.debug) MessageStream.DBG = true; + + if (argv._.length == 0) { + var server = net.createServer(); + server.on('connection', function(socket) { + console.log('CONN : ' + 'Connection from %s'.cyan, socket.remoteAddress); + var chunkStream = new ChunkStream(); + }); + server.listen(1935); + console.log('LISTN: ' + 'Listening on port 1935'.cyan); + } + else { + var iFile = argv._[0]; + var time36 = new Date().getTime().toString(36) + var oFile = argv._[1] || 'out-' + time36 + '.raw'; + var chunkStream = new ChunkStream(asSocket(iFile, oFile + , function() { console.log('FILE : ' + 'Reading %s'.cyan, iFile); } + , function() { console.log('FILE : ' + 'Writing %s'.cyan, oFile); } + )); + } + + if (argv.chunks) require('./dumpChunkStream')(chunkStream); + + var messageStream = new MessageStream(chunkStream); + dumpMessageStream(messageStream); +} + +var DumpTools = require('./DumpTools'); +var dump8 = DumpTools.dump8; +var hex2 = DumpTools.hex2; +var hex6 = DumpTools.hex6; +var ascii8 = DumpTools.ascii8; + +function dumpMessageStream(messageStream) { + messageStream.on('error', function(errorMessage) { + console.log('ERROR:', ('MessageStream: ' + errorMessage).red); + messageStream.close(); + }); + messageStream.on('warn', function(warnMessage) { + console.log('WARN :', ('MessageStream: ' + warnMessage).red); + }); + messageStream.on('end', function(graceful) { + var msg = 'MessageStream ' + (graceful ? '' : 'not ') + 'gracefully ended'; + console.log('END :', graceful ? msg.green : msg.red); + }); + messageStream.on('message', function(message) { + console.log('MSG : %s %s %s %s %s:%s %s', + dump8(message.data).blue, ascii8(message.data), + hex6(message.timestamp).blue, hex2(message.typeid).green, + hex6(message.csid).blue, hex6(message.msid).blue, + message.data.length.toString().magenta); + }); + messageStream.on('ping', function(ping) { + var id = ping.id == null ? '-' : ping.id; + var timestamp = ping.timestamp == null ? ping.buflen : ping.timestamp; + console.log('PING : %s (%s) %s %s', + hex2(ping.type).green, rtmp.pingNames[ping.type].green, + id.toString().blue, (timestamp || '-').toString().blue); + }); +} + +if (require.main === module) main(); +exports.dumpMessageStream = dumpMessageStream; + diff --git a/lib/rtmp/ChunkStream.js b/lib/rtmp/ChunkStream.js index b4b6e8b..81d4b88 100644 --- a/lib/rtmp/ChunkStream.js +++ b/lib/rtmp/ChunkStream.js @@ -241,7 +241,8 @@ s.handshake0Server = function() { // RTMCSP 5.2 Handshake C0, S0 and S1 this.socket.writeFilled(0xc5, 1528); // S1: Random data this.state = 'handshake1Server'; - if (version != 3) warn('RTMP version is not 3, but ' + version); + if (version > 32) this.error('Not RTMP'); + else if (version != 3) this.warn('RTMP version is not 3, but ' + version); return true; } @@ -291,7 +292,6 @@ s.chunk = function() { // RTMCSP 6.1.1 Chunk Basic Header 1 (csid > 1) else { this.state = 'chunkType' + chunks.fmt; } - dbg('chunks', chunks); return true; } @@ -301,7 +301,6 @@ s.chunk0 = function() { // RTMCSP 6.1.1 Chunk Basic Header 2 (csid == 0) this.chunks.csid = this.buffers.consumeUInt8() + 64; this.state = 'chunkType' + this.chunks.fmt; - dbg('chunks', this.chunks); return true; } @@ -311,7 +310,6 @@ s.chunk1 = function() { // RTMCSP 6.1.1 Chunk Basic Header 3 (csid == 1) this.chunks.csid = this.buffers.consumeUInt16BE() + 64; this.state = 'chunkType' + chunks.fmt; - dbg('chunks', this.chunks); return true; } diff --git a/lib/rtmp/MessageStream.js b/lib/rtmp/MessageStream.js index 853deb0..d19beec 100644 --- a/lib/rtmp/MessageStream.js +++ b/lib/rtmp/MessageStream.js @@ -20,19 +20,24 @@ function dbg() { * * MessageStream emits these events: * 'error': function(message) {} - * Emitted on networking or protocol errors + * Emitted on networking or protocol errors * 'warn': function(message) {} - * Emitted on potential problems + * Emitted on potential problems * 'end' function(graceful) {} - * Emitted on disconnect from other end (graceful or not) + * Emitted on disconnect from other end (graceful or not) * 'message' function(message) {} - * Emitted on received complete message. message has the fields - * csid: Chunk Stream ID - * msid: Message Stream ID - * timestamp: Message timestamp (milliseconds, increasing and wrapping) - * typeid: Message type ID - * data: Message payload - * + * Emitted on received complete message. message has the fields + * csid: Chunk Stream ID + * msid: Message Stream ID + * timestamp: Message timestamp (milliseconds, increasing and wrapping) + * typeid: Message type ID + * data: Message payload + * 'ping' function(ping) {} + * Emitted on ping messages (User control message). ping has the fields + * type: One of the fields of rtmp.ping + * id: Stream ID the ping is about (for BEGIN to RECORD) + * timestamp: The timestamp (for PING and PONG) + * buflen: The buffer length in ms (for BUFLEN) * MessageStream has these methods: * send(message) send a message * close() close the connection @@ -56,7 +61,7 @@ function MessageStream(chunkStream) { util.inherits(MessageStream, EventEmitter); - +MessageStream.DBG = false; // === Prototype == var p = MessageStream.prototype; @@ -95,14 +100,67 @@ p.handleChunk = function(chunk) { } } +p.handlePing = function(data) { + function tooShort(len) { + if (data && data.length >= len) return false; + it.warn('Ping request with not enough data. Ignored.'); + return true; + } + function emit(ping) { it.emit('ping', ping); } + var it = this; + + if (tooShort(2)) return; + + var ping = { type: data.readUInt16BE(0) }; + switch (ping.type) { + case rtmp.ping.BEGIN: + case rtmp.ping.EOF: + case rtmp.ping.DRY: + case rtmp.ping.RECORDED: + if (tooShort(6)) return; + ping.id = data.readUInt32BE(2); + emit(ping); + break; + case rtmp.ping.PING: + case rtmp.ping.PONG: + if (tooShort(6)) return; + ping.timestamp = data.readUInt32BE(2); + emit(ping); + break; + case rtmp.ping.BUFLEN: + if (tooShort(10)) return; + ping.id = data.readUInt32BE(2); + ping.buflen = data.readUInt32BE(6); + emit(ping); + break; + default: + it.warn('Unknown ping request with type #' + ping.type + '. Ignored.'); + break; + } +} + p.controlProtocol = function(chunk) { + function tooShort(len) { + if (chunk.data && chunk.data.length >= len) return false; + it.warn('Protocol Control request with not enough data. Ignored.'); + return true; + } + switch (chunk.typeid) { case rtmp.CHUNK_SIZE: + if (tooShort(4)) return; this.chunkStream.chunkSize = chunk.data.readInt32BE(0); break; + case rtmp.ABORT: + this.warn('ABORT Protocol Control received, closing ChunkStream'); + this.chunkStream.close(); + break; case rtmp.SERVER: this.chunkStream.send(chunk); break; + case rtmp.PING: + this.handlePing(chunk.data); + break; default: this.error('Protocol Control Message #' + chunk.typeid + ' not implemented'); } @@ -131,7 +189,7 @@ Object.defineProperties(p, { }); Object.freeze(MessageStream.prototype); -Object.freeze(MessageStream); +Object.seal(MessageStream); module.exports = MessageStream; diff --git a/lib/rtmp/RTMP.js b/lib/rtmp/RTMP.js index 1599dce..fbe3320 100644 --- a/lib/rtmp/RTMP.js +++ b/lib/rtmp/RTMP.js @@ -7,8 +7,9 @@ var rtmp = module.exports = { BANDWITH: 6, AUDIO: 8, VIDEO: 9, - FLEX_SO: 16, - FLEX: 17, + DATA: 15, // same as NOTIFY but with AMF3 + FLEX_SO: 16, // same as SO but with AMF 3 + FLEX: 17, // same as INVOKE but with AMF3 NOTIFY: 18, SO: 19, INVOKE: 20, @@ -17,3 +18,35 @@ var rtmp = module.exports = { }, } + + +// FLEX and INVOKE take as parameters: +// - command name +// - transaction id +// - command object (parameters) + +// DATA and NOTOFY ... (see RTMP Commands 3.2) + +// FLEX_SO and SO encode Shared Object events. For details see RTMP Commands +// 3.3. In short it's a bit confusing and not well explained. But SO are not +// important for Tikato and therefore mtrude will not support SO (or much +// later). + + + +// PING is named "User Control message" and has the form 16bit type then data +// (depends on the type). For details see: RTMP Commands 3.7, page 9. +// type what len source description +// 0 BEGIN 4 Server ID of stream to be sent now +// 1 EOF 4 Server ID of stream which has ended or will end +// 2 DRY 4 Server ID of stream which has momentarily no data +// 3 BUFLEN 8 Client Allocate a buffer (in ms) for stream ID +// 4 RECORDED 4 Server ID of stream which is recorded +// 6 PING 4 Server Are you there (with timestamp)? +// 7 PONG 4 Client I am here (send back same timestamp) +rtmp.ping = { + BEGIN: 0, EOF: 1, DRY: 2, BUFLEN: 3, RECORDED: 4, PING: 6, PONG: 7, +} + +rtmp.pingNames = 'BEGIN EOF DRY BUFLEN RECORDED - PING PONG'.split(' '); + diff --git a/package.json b/package.json index 865305e..036b41b 100644 --- a/package.json +++ b/package.json @@ -10,5 +10,8 @@ , "dependencies": { "colors": "0.5.1" } + , "devDependencies": { + "optimist": ">=0.2.8" + } } diff --git a/test/data/invalid-rtmp.raw b/test/data/invalid-rtmp.raw new file mode 100644 index 0000000..04a8adf Binary files /dev/null and b/test/data/invalid-rtmp.raw differ