Skip to content

Commit

Permalink
change of clock
Browse files Browse the repository at this point in the history
  • Loading branch information
wenzhel101 committed Feb 11, 2014
1 parent e834323 commit e28b445
Show file tree
Hide file tree
Showing 17 changed files with 35 additions and 9 deletions.
2 changes: 1 addition & 1 deletion .classpath
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@
<classpath>
<classpathentry kind="src" path="src"/>
<classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER/org.eclipse.jdt.internal.debug.ui.launcher.StandardVMType/JavaSE-1.6"/>
<classpathentry kind="lib" path="E:/18842/snakeyaml-1.13.jar"/>
<classpathentry kind="lib" path="/Users/wenzheli/Workspaces/lab2/snakeyaml-1.13.jar"/>
<classpathentry kind="output" path="bin"/>
</classpath>
Binary file modified bin/lab2/Clock.class
Binary file not shown.
Binary file modified bin/lab2/GroupsRpg.class
Binary file not shown.
Binary file modified bin/lab2/HostInfo.class
Binary file not shown.
Binary file modified bin/lab2/Hosts.class
Binary file not shown.
Binary file modified bin/lab2/Logger$LogClientThread.class
Binary file not shown.
Binary file modified bin/lab2/Logger.class
Binary file not shown.
Binary file modified bin/lab2/MessagePasser$ClientThread.class
Binary file not shown.
Binary file modified bin/lab2/MessagePasser.class
Binary file not shown.
Binary file modified bin/lab2/MulticastMessage.class
Binary file not shown.
Binary file modified bin/lab2/MulticastMessagePasser.class
Binary file not shown.
Binary file modified bin/lab2/Rules.class
Binary file not shown.
Binary file modified bin/lab2/VectorClock.class
Binary file not shown.
Binary file added lab2.jar
Binary file not shown.
23 changes: 17 additions & 6 deletions src/lab2/MessagePasser.java
Original file line number Diff line number Diff line change
Expand Up @@ -303,8 +303,8 @@ public void run() {
MulticastMessage receiveMessage= mmp.receive((MulticastMessage)(incomingMessage));
if(receiveMessage != null){
// update group clock
groupsRpg.getClockGroup().get(receiveMessage.getGroupName()
).adjustClock(receiveMessage.getClockForGroup());
/*groupsRpg.getClockGroup().get(receiveMessage.getGroupName()
).adjustClock(receiveMessage.getClockForGroup());*/

inputQueue.add(receiveMessage);

Expand Down Expand Up @@ -337,8 +337,8 @@ public void run() {
MulticastMessage dup = new MulticastMessage((MulticastMessage)receiveMessage);
dup.setDuplicate(true);
// update group clock
groupsRpg.getClockGroup().get(receiveMessage.getGroupName()
).adjustClock(receiveMessage.getClockForGroup());
/*groupsRpg.getClockGroup().get(receiveMessage.getGroupName()
).adjustClock(receiveMessage.getClockForGroup());*/
inputQueue.add(receiveMessage);
inputQueue.add(dup);
}
Expand Down Expand Up @@ -409,8 +409,8 @@ private void inputReceiveBuffer(){
MulticastMessage receiveMessage = mmp.receive((MulticastMessage)(delayMsg));
if(receiveMessage != null) {
// update group clock
groupsRpg.getClockGroup().get(receiveMessage.getGroupName()
).adjustClock(receiveMessage.getClockForGroup());
/*groupsRpg.getClockGroup().get(receiveMessage.getGroupName()
).adjustClock(receiveMessage.getClockForGroup());*/
inputQueue.add(receiveMessage);
}
}
Expand Down Expand Up @@ -471,6 +471,8 @@ else if(message instanceof MulticastMessage){
MulticastMessage mcMessage = (MulticastMessage) message;
System.out.println(mcMessage.toString());
this.hostTimeStamp.adjustClock(mcMessage.getTimeStamp());
groupsRpg.getClockGroup().get(mcMessage.getGroupName()
).adjustClock(mcMessage.getClockForGroup());
}
}

Expand All @@ -488,6 +490,15 @@ else if(message instanceof MulticastMessage){
} else if ("c".equals(tokens[0])){
System.out.println("CurrentClock:"+this.hostTimeStamp);
this.hostTimeStamp.addClock();
} else if ("gc".equals(tokens[0])){
if(tokens[1] != null){
System.out.println("CurrentGroupClock:"+this.groupsRpg.getClockGroup().get(tokens[1]));
this.hostTimeStamp.addClock();
}
else{
System.out.println("please input groupname");
System.out.print(localName + "# ");
}
} else {
// Invalid command
System.out.println("Couldn't recognize this command.");
Expand Down
4 changes: 2 additions & 2 deletions src/lab2/MulticastMessage.java
Original file line number Diff line number Diff line change
Expand Up @@ -122,8 +122,8 @@ public void setClockForGroup(VectorClock clockForGroup) {

@Override
public String toString() {
return "MESSAGE{"+ source +"->"+ dest +" seqNum:"+ seqNum +" duplicate:"+ duplicate +" kind:"+ kind
return "MESSAGE(MULTICASTMESSAGE)\n{"+ source +"->"+ dest +" seqNum:"+ seqNum +" duplicate:"+ duplicate +" kind:"+ kind
+" data:"+ data +" \nTIMESTAMP[:" + this.timeStamp + " ]\nACK:" + this.acknowledgement + "} "
+ " clockForGroup = {" + clockForGroup.toString() + "}";
+ "\nGROUPCLOCK = {" + clockForGroup.toString() + "}";
}
}
15 changes: 15 additions & 0 deletions src/lab2/MulticastMessagePasser.java
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ public void send(String groupName, Message message) {
this.groupRpg.groups.get(groupName).put(this.localHostName, this.groupSpg.get(groupName));
this.groupSpg.put(groupName, this.groupSpg.get(groupName) + 1);

//this.messagePasser.getTimeStamp().addClock();

// update local group vector clock
this.groupRpg.getClockGroup().get(groupName).addClock();

Expand All @@ -69,9 +71,14 @@ public MulticastMessage receive(MulticastMessage mMsg){
if(mMsg.getKind().equals("NACK")){
int lastSeq = mMsg.getSeqNum();
String groupName = mMsg.getGroupName();
//adjust local timestamp
this.messagePasser.getTimeStamp().adjustClock(mMsg.getTimeStamp());


System.out.println("RECEIVE NACK MESSAGE, START RESEND");
for(int i = lastSeq; i < this.groupSpg.get(groupName); i++){
MulticastMessage reSendMessage = new MulticastMessage(this.sentQueue.get(groupName).get(i));
this.messagePasser.getTimeStamp().addClock();
//change set
//if()
reSendMessage.setKind("normal");
Expand All @@ -90,8 +97,13 @@ public MulticastMessage receive(MulticastMessage mMsg){
//return mMsg;
this.groupRpg.groups.get(groupName).put(src, thisSeq);
}
//
else if(thisSeq > this.groupRpg.groups.get(groupName).get(src) + 1){ // send NACK ask for resend
Message NACKmsg = new Message(src, "NACK", null);

//add clock
this.messagePasser.getTimeStamp().addClock();

NACKmsg.setSource(this.localHostName);
NACKmsg.setSeqNum(this.groupRpg.groups.get(groupName).get(src) + 1);
MulticastMessage multicastNackMsg = new MulticastMessage(NACKmsg, groupName, this.messagePasser.getTimeStamp(),
Expand All @@ -112,6 +124,9 @@ else if(thisSeq > this.groupRpg.groups.get(groupName).get(src) + 1){ // send NAC
Message NACKmsg = new Message(memName, "NACK", null);
NACKmsg.setSource(this.localHostName);
NACKmsg.setSeqNum(this.groupRpg.groups.get(groupName).get(memName) + 1);
//add clock
this.messagePasser.getTimeStamp().addClock();

MulticastMessage multicastNackMsg = new MulticastMessage(NACKmsg, groupName, this.messagePasser.getTimeStamp(),
new Hashtable<String, Integer>(this.groupRpg.groups.get(groupName)),
groupRpg.getClockGroup().get(groupName));
Expand Down

0 comments on commit e28b445

Please sign in to comment.