Skip to content

Commit

Permalink
address comments
Browse files Browse the repository at this point in the history
  • Loading branch information
dbw9580 committed Nov 29, 2023
1 parent b5bd290 commit 8b22649
Show file tree
Hide file tree
Showing 6 changed files with 132 additions and 244 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@

import java.io.IOException;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.function.Supplier;
Expand Down Expand Up @@ -116,116 +116,66 @@ public void join(WorkerInfo workerInfo) throws IOException {

@Override
public WorkerClusterView getAllMembers() throws IOException {
return new AllWorkersClusterView();
Iterable<WorkerInfo> workerInfoIterable = parseWorkersFromEtcdKvPairs(
mAlluxioEtcdClient.getChildren(getRingPathPrefix()))
.map(w -> new WorkerInfo()
.setIdentity(w.getIdentity())
.setAddress(w.getWorkerNetAddress()))
::iterator;
return new WorkerClusterView(workerInfoIterable);
}

@Override
public WorkerClusterView getLiveMembers() throws IOException {
return new LiveWorkersClusterView();
Iterable<WorkerInfo> workerInfoIterable = parseWorkersFromEtcdKvPairs(
mAlluxioEtcdClient.mServiceDiscovery.getAllLiveServices())
.map(w -> new WorkerInfo()
.setIdentity(w.getIdentity())
.setAddress(w.getWorkerNetAddress()))
::iterator;
return new WorkerClusterView(workerInfoIterable);
}

@Override
public WorkerClusterView getFailedMembers() throws IOException {
return new FailedWorkersClusterView(new AllWorkersClusterView(), new LiveWorkersClusterView());
}

class AllWorkersClusterView extends EtcdWorkerClusterView {
@Override
protected Stream<WorkerServiceEntity> getWorkersInView() {
return mAlluxioEtcdClient.getChildren(getRingPathPrefix())
.stream()
.map(super::decode)
.filter(Optional::isPresent)
.map(Optional::get);
}
}

class LiveWorkersClusterView extends EtcdWorkerClusterView {
@Override
protected Stream<WorkerServiceEntity> getWorkersInView() {
return mAlluxioEtcdClient.mServiceDiscovery
.getAllLiveServices()
.stream()
.map(super::decode)
.filter(Optional::isPresent)
.map(Optional::get);
}
}

static class FailedWorkersClusterView extends EtcdWorkerClusterView {
private final AllWorkersClusterView mAllWorkers;
private final LiveWorkersClusterView mLiveWorkers;

private FailedWorkersClusterView(
AllWorkersClusterView allWorkers,
LiveWorkersClusterView liveWorkers) {
mAllWorkers = allWorkers;
mLiveWorkers = liveWorkers;
}

@Override
protected Stream<WorkerServiceEntity> getWorkersInView() {
Set<WorkerIdentity> liveWorkerIds = mLiveWorkers.getWorkersInView()
.map(WorkerServiceEntity::getIdentity)
.collect(Collectors.toSet());
return mAllWorkers.getWorkersInView()
.filter(w -> !liveWorkerIds.contains(w.getIdentity()));
}
}

abstract static class EtcdWorkerClusterView implements WorkerClusterView {
@Override
public Optional<WorkerInfo> getWorkerById(WorkerIdentity toFind) {
return getWorkersInView()
.filter(w -> toFind.equals(w.getIdentity()))
.findAny()
.map(w -> new WorkerInfo()
.setIdentity(w.getIdentity())
.setAddress(w.getWorkerNetAddress()));
}

@Override
public Iterator<WorkerInfo> iterator() {
return getWorkersInView()
.map(w -> new WorkerInfo()
.setIdentity(w.getIdentity())
.setAddress(w.getWorkerNetAddress()))
.iterator();
}

@Override
public int size() {
return (int) getWorkersInView().count();
}

@Override
public boolean isEmpty() {
return !getWorkersInView().findAny().isPresent();
}

private Optional<WorkerServiceEntity> decode(KeyValue etcdKvPair) {
try {
WorkerServiceEntity entity = new WorkerServiceEntity();
entity.deserialize(etcdKvPair.getValue().getBytes());
return Optional.of(entity);
} catch (JsonParseException ex) {
return Optional.empty();
}
Set<WorkerIdentity> liveWorkerIds = parseWorkersFromEtcdKvPairs(
mAlluxioEtcdClient.mServiceDiscovery.getAllLiveServices())
.map(WorkerServiceEntity::getIdentity)
.collect(Collectors.toSet());
Iterable<WorkerInfo> failedWorkerIterable = parseWorkersFromEtcdKvPairs(
mAlluxioEtcdClient.getChildren(getRingPathPrefix()))
.filter(w -> !liveWorkerIds.contains(w.getIdentity()))
.map(w -> new WorkerInfo()
.setIdentity(w.getIdentity())
.setAddress(w.getWorkerNetAddress()))
::iterator;
return new WorkerClusterView(failedWorkerIterable);
}

private Stream<WorkerServiceEntity> parseWorkersFromEtcdKvPairs(List<KeyValue> workerKvs) {
return workerKvs
.stream()
.map(this::parseWorkerServiceEntity)
.filter(Optional::isPresent)
.map(Optional::get);
}

private Optional<WorkerServiceEntity> parseWorkerServiceEntity(KeyValue etcdKvPair) {
try {
WorkerServiceEntity entity = new WorkerServiceEntity();
entity.deserialize(etcdKvPair.getValue().getBytes());
return Optional.of(entity);
} catch (JsonParseException ex) {
return Optional.empty();
}

/**
* @implSpec implementations should provide a stream of workers that's contained in the
* view they want to represent.
*/
protected abstract Stream<WorkerServiceEntity> getWorkersInView();
}

@Override
@VisibleForTesting
public String showAllMembers() {
try {
WorkerClusterView registeredWorkers = getAllMembers().snapshot();
WorkerClusterView liveWorkers = getLiveMembers().snapshot();
WorkerClusterView registeredWorkers = getAllMembers();
WorkerClusterView liveWorkers = getLiveMembers();
String printFormat = "%s\t%s\t%s%n";
StringBuilder sb = new StringBuilder(
String.format(printFormat, "WorkerId", "Address", "Status"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import org.apache.commons.lang3.StringUtils;

import java.io.IOException;
import java.util.Collections;

/**
* A bypass no-op membership manager to disable MembershipManager module
Expand All @@ -38,17 +39,17 @@ public void join(WorkerInfo worker) throws IOException {

@Override
public WorkerClusterView getAllMembers() throws IOException {
return WorkerClusterView.ofWorkers();
return new WorkerClusterView(Collections.emptyList());
}

@Override
public WorkerClusterView getLiveMembers() throws IOException {
return WorkerClusterView.ofWorkers();
return new WorkerClusterView(Collections.emptyList());
}

@Override
public WorkerClusterView getFailedMembers() throws IOException {
return WorkerClusterView.ofWorkers();
return new WorkerClusterView(Collections.emptyList());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.UUID;
Expand Down Expand Up @@ -130,18 +131,18 @@ public void join(WorkerInfo worker) throws IOException {

@Override
public WorkerClusterView getAllMembers() throws IOException {
return WorkerClusterView.ofWorkers(mMembers);
return new WorkerClusterView(mMembers);
}

@Override
public WorkerClusterView getLiveMembers() throws IOException {
// all workers are considered by the static membership manager to be always live
return WorkerClusterView.ofWorkers(mMembers);
return new WorkerClusterView(mMembers);
}

@Override
public WorkerClusterView getFailedMembers() throws IOException {
return WorkerClusterView.ofWorkers();
return new WorkerClusterView(Collections.emptyList());
}

@Override
Expand All @@ -150,7 +151,7 @@ public String showAllMembers() {
StringBuilder sb = new StringBuilder(
String.format(printFormat, "WorkerId", "Address", "Status"));
try {
for (WorkerInfo worker : getAllMembers().snapshot()) {
for (WorkerInfo worker : getAllMembers()) {
String entryLine = String.format(printFormat,
HashUtils.hashAsStringMD5(worker.getAddress().dumpMainInfo()),
worker.getAddress().getHost() + ":" + worker.getAddress().getRpcPort(),
Expand Down

This file was deleted.

Loading

0 comments on commit 8b22649

Please sign in to comment.