Skip to content

Commit

Permalink
support mqtt3.1
Browse files Browse the repository at this point in the history
-)remain length support >1 byte
-)swicth protocol from 3 to 4
  • Loading branch information
于家鹏 committed Nov 24, 2015
1 parent 1c95cd9 commit a03db15
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 21 deletions.
43 changes: 36 additions & 7 deletions MQTTClient_AS3/src/com/godpaper/mqtt/as3/core/MQTT_Protocol.as
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ package com.godpaper.mqtt.as3.core
protected var qos:uint;
protected var retain:uint;
protected var remainingLength:uint;
protected var remainPosition:uint;

///* stores the will of the client {willFlag,willQos,willRetainFlag} */
public static var WILL:Array;
Expand Down Expand Up @@ -198,7 +199,20 @@ package com.godpaper.mqtt.as3.core

this.position = 0;
this.writeByte(value);
this.writeByte(remainingLength);
var le:uint = remainingLength;
var digit:uint = 0;
do {
digit = le % 128;
le = le / 128;
if(le > 0 ){
digit = digit| 0x80;
}
this.writeByte(digit);

}while(le > 0);
this.remainPosition = this.position;

//this.writeByte(remainingLength);
this.readBytes(fixHead);

type = value & 0xF0;
Expand All @@ -207,6 +221,7 @@ package com.godpaper.mqtt.as3.core
retain = value & 0x01;
}


public function writeMessageValue(value:*):void//Variable Head
{
this.position = 2;
Expand All @@ -220,11 +235,25 @@ package com.godpaper.mqtt.as3.core
this.position = 0;
this.writeType(input.readUnsignedByte());
//get VarHead and Payload use RemainingLength
remainingLength = input.readUnsignedByte();

//remainingLength = input.readUnsignedByte();

input.readBytes(this, 2, remainingLength);
serialize();
//input.readBytes(this, 2, remainingLength);

var multiplier :uint = 1;
var remainLength:uint = 0;
do
{
var le:uint = input.readUnsignedByte();
remainLength += (le & 127) * multiplier;
multiplier *= 128;

}
while ((le & 128) != 0);
remainingLength = remainLength;
writeMessageType( type + (dup << 3) + (qos << 1) + retain );
input.readBytes(this, this.remainPosition, remainingLength);
serialize();
}

public function readMessageType():ByteArray
Expand Down Expand Up @@ -256,9 +285,9 @@ package com.godpaper.mqtt.as3.core
payLoad = new ByteArray();

this.position = 0;
this.readBytes(fixHead, 0, 2);
this.readBytes(fixHead, 0, this.remainPosition);

this.position = 2;
this.position = this.remainPosition;
switch( type ){
case CONNECT://Remaining Length is the length of the variable header (12 bytes) and the length of the Payload
this.readBytes(varHead, 0 , 12);
Expand All @@ -268,7 +297,7 @@ package com.godpaper.mqtt.as3.core
break;
case PUBLISH://Remaining Length is the length of the variable header plus the length of the payload
var index:int = (this.readUnsignedByte() << 8) + this.readUnsignedByte();//the length of variable header
this.position = 2;
this.position = this.remainPosition;
this.readBytes(varHead, 0 , index + (qos?4:2));
this.readBytes(payLoad);

Expand Down
26 changes: 12 additions & 14 deletions MQTTClient_AS3/src/com/godpaper/mqtt/as3/impl/MQTTSocket.as
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ package com.godpaper.mqtt.as3.impl
*
*/
// public function MQTTSocket(host:String=null, port:int=1883, topicname:String=null, clientid:String=null, username:String=null, password:String=null,willRetain:Boolean=true,willQos:Boolean=true,willFlag:Boolean=true,cleanSession:Boolean=true)
public function MQTTSocket(host:String=null, port:int=1883,username:String=null, password:String=null, topicname:String=null, clientid:String=null, will:Boolean=true,cleanSession:Boolean=true)
public function MQTTSocket(host:String=null, port:int=1883,username:String=null, password:String=null, topicname:String=null, clientid:String=null, will:Boolean=false,cleanSession:Boolean=false)
{
//parameters store
if (host)
Expand Down Expand Up @@ -483,14 +483,12 @@ package com.godpaper.mqtt.as3.impl
this.connectMessage=new MQTT_Protocol();
var bytes:ByteArray=new ByteArray();
bytes.writeByte(0x00); //0
bytes.writeByte(0x06); //6
bytes.writeByte(0x04); //6
bytes.writeByte(0x4d); //M
bytes.writeByte(0x51); //Q
bytes.writeByte(0x49); //I
bytes.writeByte(0x73); //S
bytes.writeByte(0x64); //D
bytes.writeByte(0x70); //P
bytes.writeByte(0x03); //Protocol version = 3
bytes.writeByte(0x54); //T
bytes.writeByte(0x54); //T
bytes.writeByte(0x04); //Protocol version = 3
//Connect flags
var type:int=0;
if (cleanSession)
Expand Down Expand Up @@ -707,20 +705,20 @@ package com.godpaper.mqtt.as3.impl
//Variable header
//Payload
//Actions
var varHead:ByteArray=packet.readMessageValue();
var varHead:ByteArray = packet.readMessageValue();
var length:uint = (varHead.readUnsignedByte() << 8) + varHead.readUnsignedByte();
var topicName:String = varHead.readMultiByte(length, "utf");
if( packet.readQoS() ){
var messageId:uint = (varHead.readUnsignedByte() << 8) + varHead.readUnsignedByte();
LOG.info("Publish Message ID {0}", messageId);
}
var payLoad:ByteArray = packet.readPayLoad();
length = (payLoad.readUnsignedByte() << 8) + payLoad.readUnsignedByte();
if( length > payLoad.length ){
length = payLoad.length;
payLoad.position = 0;
}
var topicContent:String = payLoad.readMultiByte(length, "utf");
//length = (payLoad.readUnsignedByte() << 8) + payLoad.readUnsignedByte();
//if( length > payLoad.length ){
// length = payLoad.length;
// payLoad.position = 0;
//}
var topicContent:String = payLoad.readMultiByte(payLoad.length, "utf-8");

LOG.info("Publish TopicName {0}", topicName);
LOG.info("Publish TopicContent {0}", topicContent);
Expand Down

0 comments on commit a03db15

Please sign in to comment.