Skip to content

Commit

Permalink
Working as 2 sets on same machine
Browse files Browse the repository at this point in the history
  • Loading branch information
sakagg committed Nov 3, 2016
1 parent 0bf67ee commit f322054
Show file tree
Hide file tree
Showing 10 changed files with 941 additions and 145 deletions.
7 changes: 5 additions & 2 deletions config.properties
Original file line number Diff line number Diff line change
@@ -1,2 +1,5 @@
rmiserver.host = localhost
rmiserver.port = 1099
rmi.namenode.ip = localhost
rmi.namenode.port = 1099

rmi.datanode.ip = localhost
rmi.datanode.port = 1432
6 changes: 6 additions & 0 deletions hdfs.proto
Original file line number Diff line number Diff line change
Expand Up @@ -89,4 +89,10 @@ message HeartBeatRequest {

message HeartBeatResponse {
optional int32 status = 1;
}

message ReportIPRequest {
optional int32 id = 1;
optional string ip = 2;
optional int32 port = 3;
}
2 changes: 1 addition & 1 deletion run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ CLIENT_CODE_DIR="Client"
PROTO_CODE_DIR="Proto"
DEPENDENCIES="ExternalLibs/protobuf-java-3.0.0.jar"
COMPILED_DIR="Compiled"
NUM_DNS=3
NUM_DNS=2

kill $(ps aux | grep 'NameNode' | awk '{print $2}')
kill $(ps aux | grep 'DataNode' | awk '{print $2}')
Expand Down
120 changes: 59 additions & 61 deletions src/Client/Client.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,18 +8,18 @@
import NameNode.INameNode;
import DataNode.IDataNode;
import NameNode.NameNode;
import static NameNode.NameNode.log;
import Proto.Hdfs;
import Proto.ProtoMessage;
import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.net.MalformedURLException;
import java.rmi.Naming;
import java.util.ArrayList;
import java.rmi.NotBoundException;
import java.rmi.RemoteException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Properties;
import java.util.Random;
Expand All @@ -35,11 +35,9 @@ public class Client {
private static final String NN_NAME = "NameNode";
private static final String DN_PREFIX = "DataNode";
private static final Integer CHUNK_SIZE = 10;
private static Integer DN_COUNT = -1;
private static String rmiHost = "";
private static final Properties props = new Properties();

private INameNode nn = null;
private HashMap<Integer, IDataNode> dns = new HashMap<>();

public static void log(String s) {
String op = String.valueOf(System.currentTimeMillis()) + " ";
Expand All @@ -49,78 +47,50 @@ public static void log(String s) {
}

public static void main(String args[]) {
Properties props = new Properties();
try {
props.load(new BufferedReader(new FileReader("config.properties")));
} catch (IOException ex) {
Logger.getLogger(NameNode.class.getName()).log(Level.SEVERE, null, ex);
}

rmiHost = props.getProperty("rmiserver.host", "localhost")
+ ":" + props.getProperty("rmiserver.port", "1099");

DN_COUNT = Integer.parseInt(args[1]);
Client client = new Client();
client.findnn();
client.finddns(DN_COUNT);
client.mainloop();
}

public void findnn() {
while(nn == null)
{
try {
nn = (INameNode) Naming.lookup("rmi://" + rmiHost + "/" + NN_NAME);
nn = (INameNode) Naming.lookup("rmi://" + props.getProperty("rmi.namenode.ip")
+ ":" + props.getProperty("rmi.namenode.port") + "/" + NN_NAME);
log("Found Name Node");
} catch (Exception e) {}
} catch (NotBoundException | MalformedURLException | RemoteException ex) {
Logger.getLogger(Client.class.getName()).log(Level.SEVERE, null, ex);
}
if (nn == null)
try {
Thread.sleep(1000);
} catch (Exception e) {}
}
}

public void finddns(Integer numberDNs) {
HashSet<Integer> leftPeers = new HashSet<>();
for(int i=0; i<numberDNs; i++)
leftPeers.add(i);
for(;;) {
ArrayList<Integer> toDelete = new ArrayList<>();
for(Integer i: leftPeers) {
IDataNode dn;
try {
dn = (IDataNode) Naming.lookup("rmi://" + rmiHost + "/" + DN_PREFIX + i.toString());
} catch (Exception e) {
continue;
}
toDelete.add(i);
dns.put(i, dn);
log("Found Data Node " + i.toString());
}
toDelete.stream().forEach((i) -> {leftPeers.remove(i);});
if(leftPeers.isEmpty()) {
break;
}
try {
Thread.sleep(1000);
} catch (Exception E) {}
}
}

public void listFiles() {
byte[] inp = "*".getBytes();
try {
byte[] res = nn.list(inp);
Hdfs.ListFilesResponse listFilesResponse = Hdfs.ListFilesResponse.parseFrom(res);
if(listFilesResponse.getStatus() == 1) {
for(String fileName : listFilesResponse.getFileNamesList()) {
listFilesResponse.getFileNamesList().stream().forEach((fileName) -> {
System.out.println(fileName);
}
});
System.out.println("");
} else {
System.err.println("Some Error Occured During Listing File");
}
} catch (Exception e) { log(e.toString()); }
} catch (RemoteException | InvalidProtocolBufferException ex) {
Logger.getLogger(Client.class.getName()).log(Level.SEVERE, null, ex);
}

}

Expand All @@ -130,7 +100,9 @@ public Integer openFileForWrite(String filename) {
try {
byte[] openResponse = nn.openFile(openRequest);
handle = Hdfs.OpenFileResponse.parseFrom(openResponse).getHandle();
} catch (Exception e) {}
} catch (InvalidProtocolBufferException | RemoteException ex) {
Logger.getLogger(Client.class.getName()).log(Level.SEVERE, null, ex);
}
return handle;
}

Expand All @@ -146,7 +118,9 @@ public List<Hdfs.BlockLocations> openFileForRead(String fileName) {
Hdfs.BlockLocationResponse response = Hdfs.BlockLocationResponse.parseFrom(blockLocationResponse);
blockLocations = response.getBlockLocationsList();
}
} catch (Exception e) { log(e.toString()); }
} catch (RemoteException | InvalidProtocolBufferException ex) {
Logger.getLogger(Client.class.getName()).log(Level.SEVERE, null, ex);
}
return blockLocations;
}

Expand All @@ -157,6 +131,18 @@ public void closeFile(Integer handle) {
} catch (Exception e) {}
}

IDataNode getDataNode(String ip, Integer port) {
IDataNode dn = null;
try {
dn = (IDataNode) Naming.lookup("rmi://" + ip
+ ":" + port
+ "/" + DN_PREFIX);
} catch (NotBoundException | MalformedURLException | RemoteException ex) {
Logger.getLogger(Client.class.getName()).log(Level.SEVERE, null, ex);
}
return dn;
}

public void writeFile(String fileName, byte[] data) {
Integer handle = openFileForWrite(fileName);
byte[] assignBlockRequest = ProtoMessage.assignBlockRequest(handle);
Expand All @@ -167,13 +153,17 @@ public void writeFile(String fileName, byte[] data) {
try {
byte[] response = nn.assignBlock(assignBlockRequest);
blockLocations = Hdfs.AssignBlockResponse.parseFrom(response).getNewBlock();
} catch (Exception e) {}
// NOTE: Assuming Data Node lacation's port represents its id.
IDataNode dn = dns.get(blockLocations.getLocations(0).getPort());
} catch (RemoteException | InvalidProtocolBufferException ex) {
Logger.getLogger(Client.class.getName()).log(Level.SEVERE, null, ex);
}

IDataNode dn = getDataNode(blockLocations.getLocations(0).getIp(), blockLocations.getLocations(0).getPort());
byte[] writeBlockRequest = ProtoMessage.writeBlockRequest(chunk_data, blockLocations);
try {
dn.writeBlock(writeBlockRequest);
} catch (Exception e) {}
} catch (RemoteException ex) {
Logger.getLogger(Client.class.getName()).log(Level.SEVERE, null, ex);
}
}
closeFile(handle);
}
Expand All @@ -186,14 +176,15 @@ public void readFile(String fileName) {
Random rand = new Random();
Integer dataNodeInd = rand.nextInt(block.getLocationsCount());
Hdfs.DataNodeLocation dnl = block.getLocations(dataNodeInd);
Integer dataNodeId = dnl.getPort();
log("Pulling data from DN " + dataNodeId.toString());
log("Pulling data from DN " + dnl.getIp() + ":" + dnl.getPort());
try {
byte[] request = ProtoMessage.readBlockRequest(block.getBlockNumber());
byte[] response = dns.get(dataNodeId).readBlock(request);
byte[] response = getDataNode(dnl.getIp(), dnl.getPort()).readBlock(request);
Hdfs.ReadBlockResponse readBlockResponse = Hdfs.ReadBlockResponse.parseFrom(response);
data = data.concat(readBlockResponse.getData(0));
} catch (Exception e) { log(e.toString()); }
} catch (RemoteException | InvalidProtocolBufferException ex) {
Logger.getLogger(Client.class.getName()).log(Level.SEVERE, null, ex);
}
}
System.out.println("File " + fileName + " has contents: " + data.toStringUtf8());
}
Expand All @@ -207,12 +198,19 @@ public void mainloop() {
for(;;) {
String line = in.nextLine();
String[] ip = line.split(" ");
if(ip[0].contentEquals("write"))
writeFile(ip[1], ip[2].getBytes());
else if(ip[0].contentEquals("read"))
readFile(ip[1]);
else if(ip[0].contentEquals("list"))
listFiles();
switch (ip[0]) {
case "write":
writeFile(ip[1], ip[2].getBytes());
break;
case "read":
readFile(ip[1]);
break;
case "list":
listFiles();
break;
default:
break;
}
}
}
}
Loading

0 comments on commit f322054

Please sign in to comment.