Skip to content

Commit

Permalink
Partial implementation of AMF3 and Flex messages
Browse files Browse the repository at this point in the history
  • Loading branch information
nalply committed Oct 28, 2011
1 parent 5e7aa69 commit 9ed13a8
Show file tree
Hide file tree
Showing 5 changed files with 275 additions and 53 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
node_modules
out
tmp

114 changes: 93 additions & 21 deletions examples/dumpAmf.js
Original file line number Diff line number Diff line change
@@ -1,37 +1,109 @@
"use strict";

require('colors');
var net = require('net');
var util = require('util');
var net = require('net');
var mtrude = require('mtrude');
var ChunkStream = mtrude.rtmp.ChunkStream;
var MessageStream = mtrude.rtmp.MessageStream;
var AMF = mtrude.rtmp.AMF;
var rtmp = mtrude.rtmp;
var ChunkStream = rtmp.ChunkStream;
var MessageStream = rtmp.MessageStream;
var AMF = rtmp.AMF;
var asSocket = mtrude.asSocket;
var dumpTools = require('./dumpTools');

function main() {
var optimist = require('optimist')
.usage('Usage: $0 [--debug] [--messages] [--nocolor] [--ignore] [in [out]]')
.boolean('debug')
.boolean('nocolor')
.boolean('messages')
.boolean('ignore')
.boolean('help')
.alias('h', 'help')
.describe('debug', 'Set MessageStream.DBG = true')
.describe('messages', 'Also dump messages')
.describe('nocolor', 'Don\'t use colors')
.describe('ignore', 'Ignore errors')
;
var argv = optimist.argv;

if (argv.help || argv._.length > 2) {
var messageStream = new MessageStream(chunkStream);
if (argv.messages) require('./dumpMessageStream')(messageStream);
dumpAmf(messageStream);
optimist.showHelp();
return;
}

if (argv.nocolor) dumpTools.dontColor();

if (argv.debug) MessageStream.DBG = true;

if (argv._.length == 0) {
var server = net.createServer();
server.on('connection', function(socket) {
console.log('CONN : ' + 'Connection from %s'.grey, socket.remoteAddress);
var chunkStream = new ChunkStream();
});
server.listen(1935);
console.log('LISTN: ' + 'Listening on port 1935'.grey);
}
else {
var iFile = argv._[0];
var time36 = new Date().getTime().toString(36)
var oFile = argv._[1] || 'out-' + time36 + '.raw';
var chunkStream = new ChunkStream(asSocket(iFile, oFile
, function() { console.log('FILE : ' + 'Reading %s'.grey, iFile); }
, function() { console.log('FILE : ' + 'Writing %s'.grey, oFile); }
));
}

var chain = new mtrude.BufferChain();
var messageStream = new MessageStream(chunkStream);
if (argv.messages) require('./dumpMessageStream')(messageStream);
dumpAmf(messageStream, argv.ignore);
}

function dumpZ(buffer, messageStream) {
return util.inspect(AMF.deserializeZ(buffer, messageStream));
}

var server = net.createServer();
server.on('connection', function(socket) {
console.log('Connection from %s', socket.remoteAddress);
var messageStream = new MessageStream(new ChunkStream(socket));
function dump3(buffer, messageStream) {
return util.inspect(AMF.deserialize3(buffer, messageStream));
}

function dumpAmf(messageStream, ignoreErrors) {
messageStream.on('error', function(msg) {
console.log('ERROR %s'.red, msg);
server.close();
console.log('ERROR: %s', msg.red);
if (!ignoreErrors) messageStream.close();
});
messageStream.on('warn', function(msg) {
console.log('WARN %s'.magenta, msg);
console.log('WARN : %s', msg.magenta);
if (!ignoreErrors) messageStream.close();
});
messageStream.on('end', function(graceful) {
console.log('END :', 'Ended', (graceful ? '' : 'not ') + 'gracefully');
var msg = 'dumpAmf MessageStream ' + (graceful ? '' : 'not ')
+ 'gracefully ended';
console.log('END :', graceful ? msg.green : msg.red);
});
messageStream.on('message', function(message) {
console.log(
'MESSAGE: %s:%s typeid=%s data=%s'.blue, message.csid,
message.msid, message.typeid, message.data.length);
if (message.typeid == 20) {
console.log('INVOKE'.green, util.inspect(AMF.deserialize(message.data)));
switch (message.typeid) {
case rtmp.types.INVOKE:
console.log('INVOK: %s', dumpZ(message.data, messageStream).magenta);
break;
case rtmp.types.SO:
console.log('SHOBJ: %s', dumpZ(message.data, messageStream).blue);
break;
case rtmp.types.NOTIFY:
console.log('NOTIF: %s', dumpZ(message.data, messageStream).blue);
break;
case rtmp.types.FLEX:
console.log('FLEX : %s', dump3(message.data, messageStream).cyan);
break;
}
});
});
}

server.listen(1935);
console.log();
console.log('Listening on port 1935');
if (require.main === module) main();
exports.dumpAmf = dumpAmf;

35 changes: 27 additions & 8 deletions lib/BufferChain.js
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@ push: function(buffer) {
return true;
}
else if (buffer instanceof BufferChain) {
for (var i = 0; i < buffer.length; i++) this.push(buffer.chain[i]);
for (var i = 0; i < buffer.length; i++)
this.push(buffer.chain[i]);
return true;
}
else {
Expand All @@ -41,16 +42,30 @@ push: function(buffer) {

_index: function(ptr) {
var index = 0;
while (ptr >= this.chain[index].length) ptr -= this.chain[index++].length;
while (ptr >= this.chain[index].length)
ptr -= this.chain[index++].length;
return [index, ptr];
},

toString: function(encoding, start, end) {
var subChain = this.slice(start, end);
if (subChain.chain.length == 0) return '';
if (subChain.chain.length == 1) return subChain.chain[0].toString(encoding);
if (subChain.chain.length == 0)
return '';

if (subChain.chain.length == 1)
return subChain.chain[0].toString(encoding);

if (subChain.chain.length == 2) {
// Either copy both chains or handle utf8 multibyte code points correctly
// at the boundary between the first and the second buffer
// I choose copying.
var buffer = new Buffer(subChain.length);
subChain.chain[0].copy(buffer);
subChain.chain[1].copy(buffer, subChain.chain[0].length);
return buffer.toString(encoding);
}

throw new Error('toString() over several buffers not implemented');
throw new Error('toString() over more than two buffers not implemented');
},

readDoubleBE: function(ptr) {
Expand All @@ -77,14 +92,14 @@ readUInt32LE: function(ptr) {
var index = this._index(ptr);
return this.chain[index[0]].readUInt32LE(index[1]);

// todo cross-buffer read
// todo cross-buffer read, but don't worry node has an assert above
},

readUInt32BE: function(ptr) {
var index = this._index(ptr);
return this.chain[index[0]].readUInt32BE(index[1]);

// todo cross-buffer read
// todo cross-buffer read, but don't worry, node has an assert above
},

consumeInt32BE: function() {
Expand All @@ -107,6 +122,8 @@ consumeUInt24BE: function() {
readUInt16BE: function(ptr) {
var index = this._index(ptr);
return this.chain[index[0]].readUInt16BE(index[1]);

// todo cross-buffer read, but don't worry, node has an assert above
},

consumeUInt16BE: function() {
Expand Down Expand Up @@ -144,6 +161,8 @@ consume: function(length) {
length, 'rest', oldRest, '->', this.rest, 'ptr', oldPtr, '->', this.ptr);
},

// Todo TEST THIS CAREFULLY maybe the second loop does bufferLength wrong
// if the next chain has differing length and other corner cases.
slice: function(start, end) {
if (start < 0 || start >= this._length) throw new Error(
'start out of bounds ' + start + ', 0 to ' + this.length + ' exclusive');
Expand Down Expand Up @@ -183,7 +202,7 @@ slice: function(start, end) {

for (var i = i0; i < i1 - 1; i++) chain.push(this.chain[i]);

chain.push(this.chain[i].slice(0, j1));
chain.push(this.chain[i + 1].slice(0, j1));
return chain;
},

Expand Down
Loading

0 comments on commit 9ed13a8

Please sign in to comment.