diff --git a/bin/lab2/GroupsRpg.class b/bin/lab2/GroupsRpg.class new file mode 100644 index 0000000..4cfb43b Binary files /dev/null and b/bin/lab2/GroupsRpg.class differ diff --git a/bin/lab2/Message.class b/bin/lab2/Message.class index 598d9ec..8a9276d 100644 Binary files a/bin/lab2/Message.class and b/bin/lab2/Message.class differ diff --git a/bin/lab2/MessagePasser$ClientThread.class b/bin/lab2/MessagePasser$ClientThread.class index 79ffc04..eb0734c 100644 Binary files a/bin/lab2/MessagePasser$ClientThread.class and b/bin/lab2/MessagePasser$ClientThread.class differ diff --git a/bin/lab2/MessagePasser$ListenThread.class b/bin/lab2/MessagePasser$ListenThread.class index 1780bbf..c63849e 100644 Binary files a/bin/lab2/MessagePasser$ListenThread.class and b/bin/lab2/MessagePasser$ListenThread.class differ diff --git a/bin/lab2/MessagePasser.class b/bin/lab2/MessagePasser.class index 180dfa9..320155f 100644 Binary files a/bin/lab2/MessagePasser.class and b/bin/lab2/MessagePasser.class differ diff --git a/bin/lab2/MulticastMessage.class b/bin/lab2/MulticastMessage.class new file mode 100644 index 0000000..4a86946 Binary files /dev/null and b/bin/lab2/MulticastMessage.class differ diff --git a/bin/lab2/MulticastMessagePasser.class b/bin/lab2/MulticastMessagePasser.class new file mode 100644 index 0000000..cb45e85 Binary files /dev/null and b/bin/lab2/MulticastMessagePasser.class differ diff --git a/config.yaml b/config.yaml index 2dc0f7e..2027950 100644 --- a/config.yaml +++ b/config.yaml @@ -8,7 +8,7 @@ configuration : - name : alice ip : 127.0.0.1 - port : 12344 # This is the incoming port on which MP will listen + port : 12347 # This is the incoming port on which MP will listen # May be changed for further communication as usual - name : bob ip : 127.0.0.1 @@ -23,6 +23,20 @@ configuration : ip : 127.0.0.1 port : 2004 +groups : +- name : Group1 + members : + - alice + - bob + - charlie + +- name : Group2 + members : # This is the incoming port on which MP will listen + - bob + - charlie + - daphnie # May be changed for further communication as usual + + clock: logical sendRules : diff --git a/src/lab2/GroupsRpg.java b/src/lab2/GroupsRpg.java new file mode 100644 index 0000000..723df76 --- /dev/null +++ b/src/lab2/GroupsRpg.java @@ -0,0 +1,77 @@ +package lab2; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.Hashtable; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; + +public class GroupsRpg { + public Hashtable> groups = new Hashtable> (); + + public GroupsRpg(){} + public GroupsRpg(Object groups){ + ArrayList groupList = (ArrayList) groups; + for(Object group : groupList){ + String groupName = null; + ArrayList groupMem = new ArrayList(); + Map groupInfo = (Map) group; + for (Map.Entry entry : groupInfo.entrySet()){ + Object value = entry.getValue(); + String key = entry.getKey(); + if("name".equals(key)){ + groupName = (String) value; + } + else if("members".equals(key)){ + groupMem = (ArrayList) value; + } + else{ + System.out.println("ERROR IN GROUP FILE"); + } + + Hashtable Rpg = new Hashtable(); + for(String tmp : groupMem){ + Rpg.put(tmp, 0); + } + + this.groups.put(groupName, Rpg); + } + + + } + } + + public GroupsRpg deepCopy(){ + GroupsRpg copy = new GroupsRpg(); + for(String key : this.groups.keySet()){ + copy.groups.put(key, new Hashtable (this.groups.get(key))); + } + return copy; + } + + + public Hashtable getGroupByName(String groupName){ + return this.groups.get(groupName); + } + + + @Override + public String toString() { + String toString = "{\n"; + for (Entry> entry : this.groups.entrySet()) { + Hashtable tmp = entry.getValue(); + toString += entry.getKey() + ":"; + Object [] arrMem = tmp.keySet().toArray(); + Arrays.sort(arrMem); + for(int i = 0; i < arrMem.length; i++){ + toString += "[" + arrMem[i] + ": " + entry.getValue().get(arrMem[i]) + "] "; + } + //toString += tmp.keySet().toString(); + toString += "\n"; + } + toString += "}\n"; + return toString; + } +} diff --git a/src/lab2/Message.java b/src/lab2/Message.java index a9b153c..aff1d25 100644 --- a/src/lab2/Message.java +++ b/src/lab2/Message.java @@ -43,6 +43,10 @@ public void setSource(String source){ this.source = source; } + public void setDest(String dest){ + this.dest = dest; + } + public String getSource(){ return this.source; } diff --git a/src/lab2/MessagePasser.java b/src/lab2/MessagePasser.java index 09774f1..81d8376 100644 --- a/src/lab2/MessagePasser.java +++ b/src/lab2/MessagePasser.java @@ -13,6 +13,7 @@ import java.net.SocketException; import java.util.Collections; import java.util.HashMap; +import java.util.Hashtable; import java.util.LinkedList; import java.util.ArrayList; import java.util.List; @@ -81,7 +82,8 @@ public class MessagePasser { * "MessagePasser.receive()" */ private List inputQueue = new LinkedList(); - + + /** * Reference number for each ClientThread. For debugging purpose. */ @@ -97,6 +99,18 @@ public class MessagePasser { */ private ArrayList connectionThreads = new ArrayList(); + + + public GroupsRpg groups; + public Hashtable> holdBuffer = new Hashtable>(); + public MulticastMessagePasser mmp; + /** + * Store messages delivered from "receiveBuffer" to be displayed by + * "MessagePasser.receive()" + */ + //private List holdBuffer = new LinkedList(); + + /** * Receive 2 arguments for config filename and local name * @@ -113,17 +127,26 @@ public static void main(String[] argv) { + "c\t\tShow local TimeStamp\n" + "q\t\tQuit"); MessagePasser mp = new MessagePasser(argv[0], argv[1]); + mp.listen(); mp.showCommandPrompt(); } } + public String getLocalName(){ + return this.localName; + } + + public Clock getTimeStamp(){ + return this.hostTimeStamp; + } /** * MessagePasser constructor. Defined by the assignment. * * @param configuration_filename * @param local_name */ + public MessagePasser(){} public MessagePasser(String configuration_filename, String local_name) { /* System.out.println("== MessagePasser [wkanchan + ytobioka] ==\n" + "Usage:\n" @@ -135,6 +158,7 @@ public MessagePasser(String configuration_filename, String local_name) { this.localName = local_name; this.configFileName = configuration_filename; this.readConfig(); + this.mmp = new MulticastMessagePasser(this); /* listen(); showCommandPrompt(); @@ -172,6 +196,15 @@ synchronized private void readConfig() { System.exit(1); } } + /** + * author : wenzheli + * add group + */ + else if("groups".equals(key)){ + this.groups = new GroupsRpg(entry.getValue()); + + //System.out.println(this.groups.toString()); + } else if ("sendRules".equals(key)) { this.sendRules = new Rules(entry.getValue()); } else if ("receiveRules".equals(key)) { @@ -337,22 +370,38 @@ private void showCommandPrompt() { BufferedReader in = new BufferedReader(new InputStreamReader(System.in)); System.out.print(localName + "# "); String inputCommand; + try { while ((inputCommand = in.readLine()) != null) { String[] tokens = inputCommand.split(" "); if (tokens.length > 0) { if ("s".equals(tokens[0]) && tokens.length == 3) { - // Send: Create a new message and send - Message message = new Message(tokens[1], tokens[2], null); - message.setSource(localName); - message.setSeqNum(nextSeqNum++); - // Increment Local TimeStamp - this.hostTimeStamp.addClock(); - // Add TimeStamp to Message - TimeStampedMessage t_message = new TimeStampedMessage(message,this.hostTimeStamp.deepCopy()); - send(t_message); + /** + * send multicast message + */ + if(tokens[1].startsWith("Group")){ + Message message = new Message(tokens[1], tokens[2], null); + message.setSource(localName); + this.hostTimeStamp.addClock(); + this.mmp.send(tokens[1], message); + } + else{ + // Send: Create a new message and send + Message message = new Message(tokens[1], tokens[2], null); + message.setSource(localName); + message.setSeqNum(nextSeqNum++); + // Increment Local TimeStamp + this.hostTimeStamp.addClock(); + // Add TimeStamp to Message + TimeStampedMessage t_message = new TimeStampedMessage(message,this.hostTimeStamp.deepCopy()); + send(t_message); + } } else if ("r".equals(tokens[0])) { // Receive + + /** + * modified by wenzhe + */ TimeStampedMessage message; while ((message = (TimeStampedMessage)receive()) != null) { System.out.println(message); @@ -386,7 +435,7 @@ private void showCommandPrompt() { * The message to send. */ /// void send(Message message) { - void send(TimeStampedMessage message) { + void send(Message message) { // read configuration file to load Rules this.readConfig(); // Check whether current connection is exists. @@ -454,7 +503,8 @@ void send(TimeStampedMessage message) { // Then, send delayed messages in sendBuffer out.writeObject(message); System.out.println(message + " :: Sent"); - TimeStampedMessage duplicatedMessage = new TimeStampedMessage(message); + TimeStampedMessage duplicatedMessage = new TimeStampedMessage((TimeStampedMessage)(message)); + //TimeStampedMessage duplicatedMessage = new TimeStampedMessage(message); out.writeObject(duplicatedMessage); System.out.println(duplicatedMessage + " :: Sent"); while (!sendBuffer.isEmpty()) { diff --git a/src/lab2/MulticastMessage.java b/src/lab2/MulticastMessage.java new file mode 100644 index 0000000..fa40dd3 --- /dev/null +++ b/src/lab2/MulticastMessage.java @@ -0,0 +1,108 @@ +package lab2; + +import java.io.Serializable; +import java.util.Hashtable; + +public class MulticastMessage extends Message implements Serializable{ + /** + * + */ + private static final long serialVersionUID = 403804568197137128L; + protected Clock timeStamp; + private boolean concurrent; + private String groupName; + protected Hashtable acknowledgement = null; + + + public MulticastMessage(){}; + public MulticastMessage(String dest, String kind, Object data, Clock timeStamp){ + super(dest, kind, data); + this.timeStamp = timeStamp; + this.concurrent = false; + this.groupName = null; + } + + public MulticastMessage(Message originalMessage, String gName, Clock timeStamp, Hashtable Rpg){ + this.source = originalMessage.source; + this.dest = originalMessage.dest; + this.kind = originalMessage.kind; + this.seqNum = originalMessage.seqNum; + this.duplicate = originalMessage.duplicate; + this.data = originalMessage.data; + this.timeStamp = timeStamp; + this.groupName = gName; + this.concurrent = false; + for(String tmpKey : Rpg.keySet()){ + this.acknowledgement.put(tmpKey, Rpg.get(tmpKey)); + } + } + + public MulticastMessage(TimeStampedMessage originalMessage, Hashtable Rpg) { + this.source = originalMessage.source; + this.dest = originalMessage.dest; + this.kind = originalMessage.kind; + this.seqNum = originalMessage.seqNum; + this.duplicate = originalMessage.duplicate; // * Important + this.data = originalMessage.data; // clone? + this.concurrent = false; + this.timeStamp = originalMessage.timeStamp; + for(String tmpKey : Rpg.keySet()){ + this.acknowledgement.put(tmpKey, Rpg.get(tmpKey)); + } + } + + public MulticastMessage(MulticastMessage originalMessage) { + this.source = originalMessage.source; + this.dest = originalMessage.dest; + this.kind = originalMessage.kind; + this.seqNum = originalMessage.seqNum; + this.groupName = originalMessage.groupName; + this.duplicate = originalMessage.duplicate; // * Important + this.data = originalMessage.data; // clone? + this.concurrent = true; + this.timeStamp = originalMessage.timeStamp; + this.acknowledgement = originalMessage.acknowledgement; + } + + public void setConcurrent(){ + this.concurrent = true; + } + + public boolean getConcurrent(){ + return this.concurrent; + } + + public void setGroupName(String gName){ + this.groupName = gName; + } + + public String getGroupName(){ + return this.groupName; + } + + + + public void setTimeStamp(Clock hostTimeStamp){ + this.timeStamp = hostTimeStamp; + } + + public Clock getTimeStamp(){ + return this.timeStamp; + } + + public Hashtable getAcknowledgement(){ + return this.acknowledgement; + } + + public void setAcknowledgement(Hashtable Rpg){ + for(String key : Rpg.keySet()){ + this.acknowledgement.put(key, Rpg.get(key)); + } + } + + @Override + public String toString() { + return "Message["+ source +"->"+ dest +" seqNum:"+ seqNum +" duplicate:"+ duplicate +" kind:"+ kind + +" data:"+ data +" \nTimeStamp:" + this.timeStamp + " \nAcknowledgement:" + this.acknowledgement + "]\n"; + } +} diff --git a/src/lab2/MulticastMessagePasser.java b/src/lab2/MulticastMessagePasser.java new file mode 100644 index 0000000..353fc1e --- /dev/null +++ b/src/lab2/MulticastMessagePasser.java @@ -0,0 +1,68 @@ +package lab2; + +import java.io.IOException; +import java.io.ObjectOutputStream; +import java.net.Socket; +import java.net.SocketException; +import java.util.Hashtable; + +public class MulticastMessagePasser{ + /** + * groupRpg stores the Rpg for each node in each group; + */ + public String localHostName; + public MessagePasser messagePasser; + public GroupsRpg groupRpg = null; + /** + * groupSpg stores the local Spg in each group; + */ + public Hashtable groupSpg = null; + public Hashtable sentQueue = new Hashtable(); + + + public MulticastMessagePasser(MessagePasser gMessagePasser){ + this.groupSpg = new Hashtable (); + this.localHostName = gMessagePasser.getLocalName(); + this.messagePasser = gMessagePasser; + for(String key : gMessagePasser.groups.groups.keySet()){ + this.groupSpg.put(key, 0); + } + this.groupRpg = gMessagePasser.groups; + + this.sentQueue = new Hashtable(); + + } + + + public void send(String groupName, Message message) { + + message.setSeqNum(this.groupSpg.get(groupName)); + this.groupSpg.put(groupName, this.groupSpg.get(groupName) + 1); + + for(String mem : this.groupRpg.groups.get(groupName).keySet()){ + if(mem.equals(this.localHostName)){ + message.setDest(mem); + MulticastMessage multicastMsg = new MulticastMessage(message, groupName, this.messagePasser.getTimeStamp(), + new Hashtable(this.groupRpg.groups.get(groupName))); + + this.messagePasser.send(multicastMsg); + } + } + } + + public void receive(){ + + } + + public adjustRpg(){ + + } + + + + + + + + +} diff --git a/src/lab2/TimeStampedMessage.java b/src/lab2/TimeStampedMessage.java index 03fb21d..c1d1f26 100644 --- a/src/lab2/TimeStampedMessage.java +++ b/src/lab2/TimeStampedMessage.java @@ -64,5 +64,7 @@ public Clock getTimeStamp(){ public String toString() { return "Message["+source+"->"+dest+" seqNum:"+seqNum+" duplicate:"+duplicate+" kind:"+kind+" data:"+data+" TimeStamp:" + this.timeStamp + "]"; } + + }