Skip to content

Commit

Permalink
In-between commit for application support
Browse files Browse the repository at this point in the history
  • Loading branch information
nalply committed Oct 31, 2011
1 parent 773b9f1 commit 7b97145
Show file tree
Hide file tree
Showing 21 changed files with 350 additions and 68 deletions.
13 changes: 13 additions & 0 deletions docs/dialog.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
Handshake fertig.
Client: INVOKE [ 'connect', 1, {
app: 'flvplayback', flashVer: 'MAC 10,2,159,1', swfUrl: undefined,
tcUrl: 'rtmp://muse/flvplayback', fpad: false, capabilities: 239,
audioCodecs: 3191, videoCodecs: 252, videoFunction: 1,
pageUrl: undefined, objectEncoding: 3 } ]
Client: SERVER (windowSize = 0x2625a0)
Server: SERVER (windowSize = idem)
Server: BANDWIDTH (windowSize = idem, limit = 2)
Server: INVOKE [ '_result', 1, { fmsVer: 'FMS/3,0,1,123', capabilities: 31 },
{ level: 'status', code: 'NetConnection.Connect.Success',
description: 'Connection succeeded', objectEncoding: 3 } ]

63 changes: 63 additions & 0 deletions examples/connector.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
"use strict";

var net = require('net');
var mtrude = require('mtrude');
var rtmp = mtrude.rtmp;
var ChunkStream = rtmp.ChunkStream;
var MessageStream = rtmp.MessageStream;
var Application = rtmp.Application;
var utils = mtrude.utils;


function workWithChunkStream(chunkStream) {
var messageStream = new MessageStream(chunkStream);
var application = new Application(messageStream, {
connect: function(command) {
console.log('NetConnection.connect(): ', command[2]);
return true;
},
});
}

function main() {
var optimist = require('optimist')
.usage('Usage: $0 [--debug] [in [out]]')
.boolean('debug')
.boolean('help')
.alias('h', 'help')
.describe('debug', 'Set MessageStream.DBG = true')
.describe('nocolor', 'Don\'t use colors')
;
var argv = optimist.argv;

if (argv.help || argv._.length > 2) {
optimist.showHelp();
return;
}

if (argv.debug) MessageStream.DBG = true;

if (argv._.length == 0) {
var server = net.createServer();
server.on('connection', function(socket) {
console.log('CONN : ' + 'Connection from %s', socket.remoteAddress);
workWithChunkStream(new ChunkStream(socket));
});
server.listen(1935);
console.log('LISTN: Listening on port 1935');
}
else {
var iFile = argv._[0];
var time36 = new Date().getTime().toString(36)
var oFile = argv._[1] || 'out-' + time36 + '.raw';
var chunkStream = new ChunkStream(utils.asSocket(iFile, oFile
, function() { console.log('FILE : ' + 'Reading %s', iFile); }
, function() { console.log('FILE : ' + 'Writing %s', oFile); }
));

workWithChunkSteram(chunkStream);
}
}

if (require.main === module) main();

16 changes: 10 additions & 6 deletions examples/dumpAmf.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ var rtmp = mtrude.rtmp;
var ChunkStream = rtmp.ChunkStream;
var MessageStream = rtmp.MessageStream;
var AMF = rtmp.AMF;
var asSocket = mtrude.asSocket;
var dumpTools = require('./dumpTools');
var utils = mtrude.utils;
var asSocket = utils.asSocket;

function main() {
var optimist = require('optimist')
Expand Down Expand Up @@ -89,16 +89,20 @@ function dumpAmf(messageStream, ignoreErrors) {
messageStream.on('message', function(message) {
switch (message.typeid) {
case rtmp.types.INVOKE:
console.log('INVOK: %s', dumpZ(message.data, messageStream).magenta);
console.log('INVOK: %s:%s %s', message.msid, message.csid,
dumpZ(message.data, messageStream).magenta);
break;
case rtmp.types.SO:
console.log('SHOBJ: %s', dumpZ(message.data, messageStream).blue);
console.log('SHOBJ: %s:%s %s', message.msid, message.csid,
dumpZ(message.data, messageStream).yellow);
break;
case rtmp.types.NOTIFY:
console.log('NOTIF: %s', dumpZ(message.data, messageStream).blue);
console.log('NOTIF: %s:%s %s', message.msid, message.csid,
dumpZ(message.data, messageStream).blue);
break;
case rtmp.types.FLEX:
console.log('FLEX : %s', dump3(message.data, messageStream).cyan);
console.log('FLEX : %s:%s %s', message.msid, message.csid,
dump3(message.data, messageStream).cyan);
break;
}
});
Expand Down
24 changes: 12 additions & 12 deletions examples/dumpChunkStream.js → examples/dumpChunks.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,17 @@ var net = require('net');
var mtrude = require('mtrude');
var ChunkStream = mtrude.rtmp.ChunkStream;
var asSocket = mtrude.asSocket;
var dumpTools = require('./dumpTools');
var utils = mtrude.utils;

function main() {
var optimist = require('optimist')
.usage('Usage: $0 [--debug] [--debugchain] [--nocolor] [in [out]]')
.boolean('debug')
.usage('Usage: $0 [--debugchunk] [--debugchain] [--nocolor] [in [out]]')
.boolean('debugchunk')
.boolean('debugchain')
.boolean('nocolor')
.boolean('help')
.alias('h', 'help')
.describe('debug', 'Set ChunkStream.DBG = true')
.describe('debugchunk', 'Set ChunkStream.DBG = true')
.describe('debugchain', 'Set BufferChain.DBG = true')
.describe('nocolor', 'Don\'t use colors')
;
Expand All @@ -26,7 +26,7 @@ function main() {
return;
}

if (argv.nocolor) dumpTools.dontColor();
if (argv.nocolor) utils.dontColor();

console.log(''
+ 'NOTE! ChunkStream is not complete without MessageStream and might get\n'
Expand All @@ -37,9 +37,9 @@ function main() {
'msglen'.magenta, 'chklen'.blue, 'rest'.blue
);

if (argv.debug) ChunkStream.DBG = true;
ChunkStream.DBG = !!argv.debugchunk;

if (argv.chain) mtrude.BufferChain.DBG = true;
mtrude.BufferChain.DBG = !!argv.debugchain;

if (argv._.length == 0) {
var server = net.createServer();
Expand All @@ -54,18 +54,18 @@ function main() {
var iFile = argv._[0];
var time36 = new Date().getTime().toString(36)
var oFile = argv._[1] || 'out-' + time36 + '.raw';
dumpChunkStream(new ChunkStream(asSocket(iFile, oFile
dumpChunkStream(new ChunkStream(utils.asSocket(iFile, oFile
, function() { console.log('FILE : ' + 'Reading %s'.cyan, iFile); }
, function() { console.log('FILE : ' + 'Writing %s'.cyan, oFile); }
)));
}
}


var dump8 = dumpTools.dump8;
var hex2 = dumpTools.hex2;
var hex6 = dumpTools.hex6;
var ascii8 = dumpTools.ascii8;
var dump8 = utils.dump8;
var hex2 = utils.hex2;
var hex6 = utils.hex6;
var ascii8 = utils.ascii8;

var dumpChunkStream = function(chunkStream) {
chunkStream.on('error', function(errorMessage) {
Expand Down
4 changes: 2 additions & 2 deletions examples/dumpMessageStream.js → examples/dumpMessages.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ var mtrude = require('mtrude');
var rtmp = mtrude.rtmp;
var ChunkStream = rtmp.ChunkStream;
var MessageStream = rtmp.MessageStream;
var asSocket = mtrude.asSocket;
var dumpTools = require('./dumpTools');
var utils = mtrude.utils;
var asSocket = utils.asSocket;

function main() {
var optimist = require('optimist')
Expand Down
4 changes: 4 additions & 0 deletions lib/BufferChain.js
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,10 @@ consume: function(length) {
var oldRest = this.rest;
var oldPtr = this.ptr;
var value1 = this.chain[0][oldPtr];
if (value1 == null) {
dbg(' consume() value1 undefined');
value1 = 0;
}

this.ptr += length;
var length0 = this.chain[0].length;
Expand Down
34 changes: 0 additions & 34 deletions lib/asSocket.js

This file was deleted.

3 changes: 2 additions & 1 deletion lib/mtrude.js
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
exports.BufferChain = require('./BufferChain');
exports.OutgoingTools = require('./OutgoingTools');
exports.asSocket = require('./asSocket');
exports.utils = require('./utils');
exports.rtmp = require('./rtmp/RTMP');
exports.rtmp.ChunkStream = require('./rtmp/ChunkStream');
exports.rtmp.MessageStream = require('./rtmp/MessageStream');
exports.rtmp.Application = require('./rtmp/Application');
exports.rtmp.AMF = require('./rtmp/AMF');

122 changes: 119 additions & 3 deletions lib/rtmp/AMF.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
"use strict"

var mtrude = require('mtrude');
var BufferChain = mtrude.BufferChain;
var util = require('util');
var assert = require('assert');
var BufferChain = require('../BufferChain');

var AMF = module.exports = {};

Expand Down Expand Up @@ -42,8 +43,9 @@ AMF.amfZMarkers = {
AVMPLUS: 0x11, // switch to AMF3
}

var a0 = AMF.amfZMarkers;

function amfZAny(data) {
var a0 = AMF.amfZMarkers;
var marker = data.readUInt8(data.ptr++);
var ptr = data.ptr;

Expand Down Expand Up @@ -105,7 +107,121 @@ function amfZDate(data) {
data.ptr += 10; // including ignored time zone
}

AMF.serializeZ = function() {
var serBuffer = { buffer: new Buffer(200), ptr: 0 };
for (var i = 0; i < arguments.length; i++)
serZAny(serBuffer, arguments[i]);

dbg(hexdump(serBuffer.buffer));

return serBuffer.buffer;
}

function serZAny(serBuffer, value) {
dbgSerBuffer('Any', serBuffer);
if (value === null) return serZNull(serBuffer);
if (value === void 0) return serZUndefined(serBuffer);
if (value === true) return serZTrue(serBuffer);
if (value === false) return serZFalse(serBuffer);
if (typeof value == 'number') return serZNumber(serBuffer, value);
if (typeof value == 'string') return serZString(serBuffer, value);
if (Array.isArray(value)) return serZArray(serBuffer, value);
if (typeof value == 'object') return serZObject(serBuffer, value);
throw new Error('value type not supported');
}

function dbgSerBuffer(text, serBuffer, value) {
assert(serBuffer.buffer && serBuffer.buffer.constructor == Buffer);
function spaces(n) {
var s = '';
for (var i = 0; i < n; i++) s += ' ';
return s;
}
var at = ' @ ' + serBuffer.ptr + ' / ' + serBuffer.buffer.length;
console.log('serZ' + text + spaces(20 - text.length)
+ at + spaces(16 - at.length)
+ (value == null ? '' : util.inspect(value)));
}

function serZNull(serBuffer) {
dbgSerBuffer('Null', serBuffer);
maybeExpand(serBuffer, 1);
serBuffer.buffer.writeUInt8(a0.NULL, serBuffer.ptr++);
}

function serZUndefined(serBuffer) {
dbgSerBuffer('Undefined', serBuffer);
maybeExpand(serBuffer, 1);
serBuffer.buffer.writeUInt8(a0.UNDEFINED, serBuffer.ptr++);
}

function serZTrue(serBuffer) {
dbgSerBuffer('True', serBuffer);
maybeExpand(serBuffer, 2);
serBuffer.buffer.writeUInt8(a0.BOOLEAN, serBuffer.ptr++);
serBuffer.buffer.writeUInt8(1, serBuffer.ptr++);
}

function serZFalse(serBuffer) {
dbgSerBuffer('False', serBuffer);
maybeExpand(serBuffer, 2);
serBuffer.buffer.writeUInt8(a0.BOOLEAN, serBuffer.ptr++);
serBuffer.buffer.writeUInt8(0, serBuffer.ptr++);
}

function serZNumber(serBuffer, value) {
dbgSerBuffer('Number', serBuffer, value);
maybeExpand(serBuffer, 9);

serBuffer.buffer.writeUInt8(a0.NUMBER, serBuffer.ptr);
serBuffer.buffer.writeDoubleBE(value, serBuffer.ptr + 1);
serBuffer.ptr += 8;
}

function serZString(serBuffer, value, withMarker) {
dbgSerBuffer('String', serBuffer, value);
var len = Buffer.byteLength(value);
if (len > 65535) throw new Error('no more than 65535 bytes in a string');
maybeExpand(serBuffer, len + 2 + withMarker ? 1 : 0);

if (withMarker) serBuffer.buffer.writeUInt8(a0.STRING, serBuffer.ptr++);
serBuffer.buffer.writeUInt16BE(len, serBuffer.ptr);
serBuffer.buffer.write(value, serBuffer.ptr + 2);
assert(serBuffer.buffer.constructor == Buffer);
serBuffer.ptr += len + 2;
}

// Note: only serializes the array elements, not the additional associative
// elements.
function serZArray(serBuffer, value) {
dbgSerBuffer('Array', serBuffer, {length: value.length});
maybeExpand(serBuffer, 4);
serBuffer.buffer.writeUInt32BE(value.length, serBuffer.ptr);
serBuffer.ptr += 4;

for (var i = 0; i < value.length; i++) serZAny(serBuffer, value[i]);
}

function serZObject(serBuffer, value) {
dbgSerBuffer('Object', serBuffer, {keys: Object.keys(value).join(' ')});
for (var key in value) {
serZString(serBuffer, key, false);
serZAny(serBuffer, value[key]);
}
serZString(serBuffer, '', false);
maybeExpand(serBuffer, 1);
serBuffer.buffer.writeUInt8(a0.OBJECT_END, serBuffer.ptr++);
}

function maybeExpand(serBuffer, length) {
if (serBuffer.ptr + length < serBuffer.buffer.length) return;

length = Math.max(length, serBuffer.buffer.length);
var buffer = new Buffer(serBuffer.buffer.length + length);
serBuffer.buffer.copy(buffer);
serBuffer.buffer = buffer;
dbgSerBuffer(' - expanded', serBuffer);
}

AMF.deserialize3 = function(data, emitter) {
if (data.ptr == null) data.ptr = 0;
Expand Down
Loading

0 comments on commit 7b97145

Please sign in to comment.