Skip to content

Commit

Permalink
multicast message send
Browse files Browse the repository at this point in the history
  • Loading branch information
wenzhel101 committed Feb 8, 2014
1 parent 9d81748 commit 9f4953f
Show file tree
Hide file tree
Showing 14 changed files with 336 additions and 13 deletions.
Binary file added bin/lab2/GroupsRpg.class
Binary file not shown.
Binary file modified bin/lab2/Message.class
Binary file not shown.
Binary file modified bin/lab2/MessagePasser$ClientThread.class
Binary file not shown.
Binary file modified bin/lab2/MessagePasser$ListenThread.class
Binary file not shown.
Binary file modified bin/lab2/MessagePasser.class
Binary file not shown.
Binary file added bin/lab2/MulticastMessage.class
Binary file not shown.
Binary file added bin/lab2/MulticastMessagePasser.class
Binary file not shown.
16 changes: 15 additions & 1 deletion config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
configuration :
- name : alice
ip : 127.0.0.1
port : 12344 # This is the incoming port on which MP will listen
port : 12347 # This is the incoming port on which MP will listen
# May be changed for further communication as usual
- name : bob
ip : 127.0.0.1
Expand All @@ -23,6 +23,20 @@ configuration :
ip : 127.0.0.1
port : 2004

groups :
- name : Group1
members :
- alice
- bob
- charlie

- name : Group2
members : # This is the incoming port on which MP will listen
- bob
- charlie
- daphnie # May be changed for further communication as usual


clock: logical

sendRules :
Expand Down
77 changes: 77 additions & 0 deletions src/lab2/GroupsRpg.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
package lab2;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Hashtable;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;

public class GroupsRpg {
public Hashtable<String, Hashtable <String, Integer>> groups = new Hashtable<String, Hashtable <String, Integer>> ();

public GroupsRpg(){}
public GroupsRpg(Object groups){
ArrayList<Object> groupList = (ArrayList<Object>) groups;
for(Object group : groupList){
String groupName = null;
ArrayList<String> groupMem = new ArrayList<String>();
Map<String, Object> groupInfo = (Map<String, Object>) group;
for (Map.Entry<String, Object> entry : groupInfo.entrySet()){
Object value = entry.getValue();
String key = entry.getKey();
if("name".equals(key)){
groupName = (String) value;
}
else if("members".equals(key)){
groupMem = (ArrayList<String>) value;
}
else{
System.out.println("ERROR IN GROUP FILE");
}

Hashtable <String, Integer> Rpg = new Hashtable<String, Integer>();
for(String tmp : groupMem){
Rpg.put(tmp, 0);
}

this.groups.put(groupName, Rpg);
}


}
}

public GroupsRpg deepCopy(){
GroupsRpg copy = new GroupsRpg();
for(String key : this.groups.keySet()){
copy.groups.put(key, new Hashtable<String, Integer> (this.groups.get(key)));
}
return copy;
}


public Hashtable <String, Integer> getGroupByName(String groupName){
return this.groups.get(groupName);
}


@Override
public String toString() {
String toString = "{\n";
for (Entry<String, Hashtable<String, Integer>> entry : this.groups.entrySet()) {
Hashtable<String, Integer> tmp = entry.getValue();
toString += entry.getKey() + ":";
Object [] arrMem = tmp.keySet().toArray();
Arrays.sort(arrMem);
for(int i = 0; i < arrMem.length; i++){
toString += "[" + arrMem[i] + ": " + entry.getValue().get(arrMem[i]) + "] ";
}
//toString += tmp.keySet().toString();
toString += "\n";
}
toString += "}\n";
return toString;
}
}
4 changes: 4 additions & 0 deletions src/lab2/Message.java
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,10 @@ public void setSource(String source){
this.source = source;
}

public void setDest(String dest){
this.dest = dest;
}

public String getSource(){
return this.source;
}
Expand Down
74 changes: 62 additions & 12 deletions src/lab2/MessagePasser.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import java.net.SocketException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Hashtable;
import java.util.LinkedList;
import java.util.ArrayList;
import java.util.List;
Expand Down Expand Up @@ -81,7 +82,8 @@ public class MessagePasser {
* "MessagePasser.receive()"
*/
private List<Message> inputQueue = new LinkedList<Message>();



/**
* Reference number for each ClientThread. For debugging purpose.
*/
Expand All @@ -97,6 +99,18 @@ public class MessagePasser {
*/
private ArrayList<ClientThread> connectionThreads = new ArrayList<ClientThread>();



public GroupsRpg groups;
public Hashtable<String, List<Message>> holdBuffer = new Hashtable<String, List<Message>>();
public MulticastMessagePasser mmp;
/**
* Store messages delivered from "receiveBuffer" to be displayed by
* "MessagePasser.receive()"
*/
//private List<Message> holdBuffer = new LinkedList<Message>();


/**
* Receive 2 arguments for config filename and local name
*
Expand All @@ -113,17 +127,26 @@ public static void main(String[] argv) {
+ "c\t\tShow local TimeStamp\n"
+ "q\t\tQuit");
MessagePasser mp = new MessagePasser(argv[0], argv[1]);

mp.listen();
mp.showCommandPrompt();
}
}

public String getLocalName(){
return this.localName;
}

public Clock getTimeStamp(){
return this.hostTimeStamp;
}
/**
* MessagePasser constructor. Defined by the assignment.
*
* @param configuration_filename
* @param local_name
*/
public MessagePasser(){}
public MessagePasser(String configuration_filename, String local_name) {
/*
System.out.println("== MessagePasser [wkanchan + ytobioka] ==\n" + "Usage:\n"
Expand All @@ -135,6 +158,7 @@ public MessagePasser(String configuration_filename, String local_name) {
this.localName = local_name;
this.configFileName = configuration_filename;
this.readConfig();
this.mmp = new MulticastMessagePasser(this);
/*
listen();
showCommandPrompt();
Expand Down Expand Up @@ -172,6 +196,15 @@ synchronized private void readConfig() {
System.exit(1);
}
}
/**
* author : wenzheli
* add group
*/
else if("groups".equals(key)){
this.groups = new GroupsRpg(entry.getValue());

//System.out.println(this.groups.toString());
}
else if ("sendRules".equals(key)) {
this.sendRules = new Rules(entry.getValue());
} else if ("receiveRules".equals(key)) {
Expand Down Expand Up @@ -337,22 +370,38 @@ private void showCommandPrompt() {
BufferedReader in = new BufferedReader(new InputStreamReader(System.in));
System.out.print(localName + "# ");
String inputCommand;

try {
while ((inputCommand = in.readLine()) != null) {
String[] tokens = inputCommand.split(" ");
if (tokens.length > 0) {
if ("s".equals(tokens[0]) && tokens.length == 3) {
// Send: Create a new message and send
Message message = new Message(tokens[1], tokens[2], null);
message.setSource(localName);
message.setSeqNum(nextSeqNum++);
// Increment Local TimeStamp
this.hostTimeStamp.addClock();
// Add TimeStamp to Message
TimeStampedMessage t_message = new TimeStampedMessage(message,this.hostTimeStamp.deepCopy());
send(t_message);
/**
* send multicast message
*/
if(tokens[1].startsWith("Group")){
Message message = new Message(tokens[1], tokens[2], null);
message.setSource(localName);
this.hostTimeStamp.addClock();
this.mmp.send(tokens[1], message);
}
else{
// Send: Create a new message and send
Message message = new Message(tokens[1], tokens[2], null);
message.setSource(localName);
message.setSeqNum(nextSeqNum++);
// Increment Local TimeStamp
this.hostTimeStamp.addClock();
// Add TimeStamp to Message
TimeStampedMessage t_message = new TimeStampedMessage(message,this.hostTimeStamp.deepCopy());
send(t_message);
}
} else if ("r".equals(tokens[0])) {
// Receive

/**
* modified by wenzhe
*/
TimeStampedMessage message;
while ((message = (TimeStampedMessage)receive()) != null) {
System.out.println(message);
Expand Down Expand Up @@ -386,7 +435,7 @@ private void showCommandPrompt() {
* The message to send.
*/
/// void send(Message message) {
void send(TimeStampedMessage message) {
void send(Message message) {
// read configuration file to load Rules
this.readConfig();
// Check whether current connection is exists.
Expand Down Expand Up @@ -454,7 +503,8 @@ void send(TimeStampedMessage message) {
// Then, send delayed messages in sendBuffer
out.writeObject(message);
System.out.println(message + " :: Sent");
TimeStampedMessage duplicatedMessage = new TimeStampedMessage(message);
TimeStampedMessage duplicatedMessage = new TimeStampedMessage((TimeStampedMessage)(message));
//TimeStampedMessage duplicatedMessage = new TimeStampedMessage(message);
out.writeObject(duplicatedMessage);
System.out.println(duplicatedMessage + " :: Sent");
while (!sendBuffer.isEmpty()) {
Expand Down
108 changes: 108 additions & 0 deletions src/lab2/MulticastMessage.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
package lab2;

import java.io.Serializable;
import java.util.Hashtable;

public class MulticastMessage extends Message implements Serializable{
/**
*
*/
private static final long serialVersionUID = 403804568197137128L;
protected Clock timeStamp;
private boolean concurrent;
private String groupName;
protected Hashtable <String, Integer> acknowledgement = null;


public MulticastMessage(){};
public MulticastMessage(String dest, String kind, Object data, Clock timeStamp){
super(dest, kind, data);
this.timeStamp = timeStamp;
this.concurrent = false;
this.groupName = null;
}

public MulticastMessage(Message originalMessage, String gName, Clock timeStamp, Hashtable <String, Integer> Rpg){
this.source = originalMessage.source;
this.dest = originalMessage.dest;
this.kind = originalMessage.kind;
this.seqNum = originalMessage.seqNum;
this.duplicate = originalMessage.duplicate;
this.data = originalMessage.data;
this.timeStamp = timeStamp;
this.groupName = gName;
this.concurrent = false;
for(String tmpKey : Rpg.keySet()){
this.acknowledgement.put(tmpKey, Rpg.get(tmpKey));
}
}

public MulticastMessage(TimeStampedMessage originalMessage, Hashtable <String, Integer> Rpg) {
this.source = originalMessage.source;
this.dest = originalMessage.dest;
this.kind = originalMessage.kind;
this.seqNum = originalMessage.seqNum;
this.duplicate = originalMessage.duplicate; // * Important
this.data = originalMessage.data; // clone?
this.concurrent = false;
this.timeStamp = originalMessage.timeStamp;
for(String tmpKey : Rpg.keySet()){
this.acknowledgement.put(tmpKey, Rpg.get(tmpKey));
}
}

public MulticastMessage(MulticastMessage originalMessage) {
this.source = originalMessage.source;
this.dest = originalMessage.dest;
this.kind = originalMessage.kind;
this.seqNum = originalMessage.seqNum;
this.groupName = originalMessage.groupName;
this.duplicate = originalMessage.duplicate; // * Important
this.data = originalMessage.data; // clone?
this.concurrent = true;
this.timeStamp = originalMessage.timeStamp;
this.acknowledgement = originalMessage.acknowledgement;
}

public void setConcurrent(){
this.concurrent = true;
}

public boolean getConcurrent(){
return this.concurrent;
}

public void setGroupName(String gName){
this.groupName = gName;
}

public String getGroupName(){
return this.groupName;
}



public void setTimeStamp(Clock hostTimeStamp){
this.timeStamp = hostTimeStamp;
}

public Clock getTimeStamp(){
return this.timeStamp;
}

public Hashtable <String, Integer> getAcknowledgement(){
return this.acknowledgement;
}

public void setAcknowledgement(Hashtable <String, Integer> Rpg){
for(String key : Rpg.keySet()){
this.acknowledgement.put(key, Rpg.get(key));
}
}

@Override
public String toString() {
return "Message["+ source +"->"+ dest +" seqNum:"+ seqNum +" duplicate:"+ duplicate +" kind:"+ kind
+" data:"+ data +" \nTimeStamp:" + this.timeStamp + " \nAcknowledgement:" + this.acknowledgement + "]\n";
}
}
Loading

0 comments on commit 9f4953f

Please sign in to comment.