From 8b226496da0031143d6fc69529c99e9ed21893e2 Mon Sep 17 00:00:00 2001 From: Bowen Ding <6999708+dbw9580@users.noreply.github.com> Date: Wed, 29 Nov 2023 11:37:50 +0800 Subject: [PATCH] address comments --- .../membership/EtcdMembershipManager.java | 142 ++++++------------ .../membership/MasterMembershipManager.java | 7 +- .../membership/StaticMembershipManager.java | 9 +- .../membership/WorkerClusterSnapshot.java | 103 ------------- .../alluxio/membership/WorkerClusterView.java | 105 +++++++++---- .../membership/MembershipManagerTest.java | 10 +- 6 files changed, 132 insertions(+), 244 deletions(-) delete mode 100644 dora/core/common/src/main/java/alluxio/membership/WorkerClusterSnapshot.java diff --git a/dora/core/common/src/main/java/alluxio/membership/EtcdMembershipManager.java b/dora/core/common/src/main/java/alluxio/membership/EtcdMembershipManager.java index 859182a4dd89..e5cae8eeaea8 100644 --- a/dora/core/common/src/main/java/alluxio/membership/EtcdMembershipManager.java +++ b/dora/core/common/src/main/java/alluxio/membership/EtcdMembershipManager.java @@ -25,7 +25,7 @@ import java.io.IOException; import java.util.Arrays; -import java.util.Iterator; +import java.util.List; import java.util.Optional; import java.util.Set; import java.util.function.Supplier; @@ -116,116 +116,66 @@ public void join(WorkerInfo workerInfo) throws IOException { @Override public WorkerClusterView getAllMembers() throws IOException { - return new AllWorkersClusterView(); + Iterable workerInfoIterable = parseWorkersFromEtcdKvPairs( + mAlluxioEtcdClient.getChildren(getRingPathPrefix())) + .map(w -> new WorkerInfo() + .setIdentity(w.getIdentity()) + .setAddress(w.getWorkerNetAddress())) + ::iterator; + return new WorkerClusterView(workerInfoIterable); } @Override public WorkerClusterView getLiveMembers() throws IOException { - return new LiveWorkersClusterView(); + Iterable workerInfoIterable = parseWorkersFromEtcdKvPairs( + mAlluxioEtcdClient.mServiceDiscovery.getAllLiveServices()) + .map(w -> new WorkerInfo() + .setIdentity(w.getIdentity()) + .setAddress(w.getWorkerNetAddress())) + ::iterator; + return new WorkerClusterView(workerInfoIterable); } @Override public WorkerClusterView getFailedMembers() throws IOException { - return new FailedWorkersClusterView(new AllWorkersClusterView(), new LiveWorkersClusterView()); - } - - class AllWorkersClusterView extends EtcdWorkerClusterView { - @Override - protected Stream getWorkersInView() { - return mAlluxioEtcdClient.getChildren(getRingPathPrefix()) - .stream() - .map(super::decode) - .filter(Optional::isPresent) - .map(Optional::get); - } - } - - class LiveWorkersClusterView extends EtcdWorkerClusterView { - @Override - protected Stream getWorkersInView() { - return mAlluxioEtcdClient.mServiceDiscovery - .getAllLiveServices() - .stream() - .map(super::decode) - .filter(Optional::isPresent) - .map(Optional::get); - } - } - - static class FailedWorkersClusterView extends EtcdWorkerClusterView { - private final AllWorkersClusterView mAllWorkers; - private final LiveWorkersClusterView mLiveWorkers; - - private FailedWorkersClusterView( - AllWorkersClusterView allWorkers, - LiveWorkersClusterView liveWorkers) { - mAllWorkers = allWorkers; - mLiveWorkers = liveWorkers; - } - - @Override - protected Stream getWorkersInView() { - Set liveWorkerIds = mLiveWorkers.getWorkersInView() - .map(WorkerServiceEntity::getIdentity) - .collect(Collectors.toSet()); - return mAllWorkers.getWorkersInView() - .filter(w -> !liveWorkerIds.contains(w.getIdentity())); - } - } - - abstract static class EtcdWorkerClusterView implements WorkerClusterView { - @Override - public Optional getWorkerById(WorkerIdentity toFind) { - return getWorkersInView() - .filter(w -> toFind.equals(w.getIdentity())) - .findAny() - .map(w -> new WorkerInfo() - .setIdentity(w.getIdentity()) - .setAddress(w.getWorkerNetAddress())); - } - - @Override - public Iterator iterator() { - return getWorkersInView() - .map(w -> new WorkerInfo() - .setIdentity(w.getIdentity()) - .setAddress(w.getWorkerNetAddress())) - .iterator(); - } - - @Override - public int size() { - return (int) getWorkersInView().count(); - } - - @Override - public boolean isEmpty() { - return !getWorkersInView().findAny().isPresent(); - } - - private Optional decode(KeyValue etcdKvPair) { - try { - WorkerServiceEntity entity = new WorkerServiceEntity(); - entity.deserialize(etcdKvPair.getValue().getBytes()); - return Optional.of(entity); - } catch (JsonParseException ex) { - return Optional.empty(); - } + Set liveWorkerIds = parseWorkersFromEtcdKvPairs( + mAlluxioEtcdClient.mServiceDiscovery.getAllLiveServices()) + .map(WorkerServiceEntity::getIdentity) + .collect(Collectors.toSet()); + Iterable failedWorkerIterable = parseWorkersFromEtcdKvPairs( + mAlluxioEtcdClient.getChildren(getRingPathPrefix())) + .filter(w -> !liveWorkerIds.contains(w.getIdentity())) + .map(w -> new WorkerInfo() + .setIdentity(w.getIdentity()) + .setAddress(w.getWorkerNetAddress())) + ::iterator; + return new WorkerClusterView(failedWorkerIterable); + } + + private Stream parseWorkersFromEtcdKvPairs(List workerKvs) { + return workerKvs + .stream() + .map(this::parseWorkerServiceEntity) + .filter(Optional::isPresent) + .map(Optional::get); + } + + private Optional parseWorkerServiceEntity(KeyValue etcdKvPair) { + try { + WorkerServiceEntity entity = new WorkerServiceEntity(); + entity.deserialize(etcdKvPair.getValue().getBytes()); + return Optional.of(entity); + } catch (JsonParseException ex) { + return Optional.empty(); } - - /** - * @implSpec implementations should provide a stream of workers that's contained in the - * view they want to represent. - */ - protected abstract Stream getWorkersInView(); } @Override @VisibleForTesting public String showAllMembers() { try { - WorkerClusterView registeredWorkers = getAllMembers().snapshot(); - WorkerClusterView liveWorkers = getLiveMembers().snapshot(); + WorkerClusterView registeredWorkers = getAllMembers(); + WorkerClusterView liveWorkers = getLiveMembers(); String printFormat = "%s\t%s\t%s%n"; StringBuilder sb = new StringBuilder( String.format(printFormat, "WorkerId", "Address", "Status")); diff --git a/dora/core/common/src/main/java/alluxio/membership/MasterMembershipManager.java b/dora/core/common/src/main/java/alluxio/membership/MasterMembershipManager.java index 8039a0e9a752..afb78e32fe59 100644 --- a/dora/core/common/src/main/java/alluxio/membership/MasterMembershipManager.java +++ b/dora/core/common/src/main/java/alluxio/membership/MasterMembershipManager.java @@ -16,6 +16,7 @@ import org.apache.commons.lang3.StringUtils; import java.io.IOException; +import java.util.Collections; /** * A bypass no-op membership manager to disable MembershipManager module @@ -38,17 +39,17 @@ public void join(WorkerInfo worker) throws IOException { @Override public WorkerClusterView getAllMembers() throws IOException { - return WorkerClusterView.ofWorkers(); + return new WorkerClusterView(Collections.emptyList()); } @Override public WorkerClusterView getLiveMembers() throws IOException { - return WorkerClusterView.ofWorkers(); + return new WorkerClusterView(Collections.emptyList()); } @Override public WorkerClusterView getFailedMembers() throws IOException { - return WorkerClusterView.ofWorkers(); + return new WorkerClusterView(Collections.emptyList()); } @Override diff --git a/dora/core/common/src/main/java/alluxio/membership/StaticMembershipManager.java b/dora/core/common/src/main/java/alluxio/membership/StaticMembershipManager.java index f3485006b53d..c0b86e0421f3 100644 --- a/dora/core/common/src/main/java/alluxio/membership/StaticMembershipManager.java +++ b/dora/core/common/src/main/java/alluxio/membership/StaticMembershipManager.java @@ -28,6 +28,7 @@ import java.net.InetSocketAddress; import java.nio.charset.StandardCharsets; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.Set; import java.util.UUID; @@ -130,18 +131,18 @@ public void join(WorkerInfo worker) throws IOException { @Override public WorkerClusterView getAllMembers() throws IOException { - return WorkerClusterView.ofWorkers(mMembers); + return new WorkerClusterView(mMembers); } @Override public WorkerClusterView getLiveMembers() throws IOException { // all workers are considered by the static membership manager to be always live - return WorkerClusterView.ofWorkers(mMembers); + return new WorkerClusterView(mMembers); } @Override public WorkerClusterView getFailedMembers() throws IOException { - return WorkerClusterView.ofWorkers(); + return new WorkerClusterView(Collections.emptyList()); } @Override @@ -150,7 +151,7 @@ public String showAllMembers() { StringBuilder sb = new StringBuilder( String.format(printFormat, "WorkerId", "Address", "Status")); try { - for (WorkerInfo worker : getAllMembers().snapshot()) { + for (WorkerInfo worker : getAllMembers()) { String entryLine = String.format(printFormat, HashUtils.hashAsStringMD5(worker.getAddress().dumpMainInfo()), worker.getAddress().getHost() + ":" + worker.getAddress().getRpcPort(), diff --git a/dora/core/common/src/main/java/alluxio/membership/WorkerClusterSnapshot.java b/dora/core/common/src/main/java/alluxio/membership/WorkerClusterSnapshot.java deleted file mode 100644 index 35d53500d1ce..000000000000 --- a/dora/core/common/src/main/java/alluxio/membership/WorkerClusterSnapshot.java +++ /dev/null @@ -1,103 +0,0 @@ -/* - * The Alluxio Open Foundation licenses this work under the Apache License, version 2.0 - * (the "License"). You may not use this work except in compliance with the License, which is - * available at www.apache.org/licenses/LICENSE-2.0 - * - * This software is distributed on an "AS IS" basis, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, - * either express or implied, as more fully set forth in the License. - * - * See the NOTICE file distributed with this work for information regarding copyright ownership. - */ - -package alluxio.membership; - -import alluxio.collections.IndexDefinition; -import alluxio.collections.IndexedSet; -import alluxio.wire.WorkerIdentity; -import alluxio.wire.WorkerInfo; - -import com.google.common.collect.Iterators; - -import java.time.Instant; -import java.util.Iterator; -import java.util.Objects; -import java.util.Optional; -import javax.annotation.concurrent.Immutable; - -/** - * Snapshot for a cluster view. - */ -@Immutable -public final class WorkerClusterSnapshot implements WorkerClusterView { - private final Instant mInstantCreated; - private final IndexedSet mWorkers; - - private static final IndexDefinition INDEX_WORKER_ID = - IndexDefinition.ofUnique(WorkerInfo::getIdentity); - - WorkerClusterSnapshot(Iterable workers) { - mWorkers = new IndexedSet<>(INDEX_WORKER_ID); - for (WorkerInfo workerInfo : workers) { - mWorkers.add(workerInfo); - } - // Note Instant.now() uses the system clock and is NOT monotonic - // which is fine because we want to invalidate stale snapshots based on wall clock time - mInstantCreated = Instant.now(); - } - - @Override - public Optional getWorkerById(WorkerIdentity workerIdentity) { - return Optional.ofNullable( - mWorkers.getFirstByField(INDEX_WORKER_ID, workerIdentity)); - } - - @Override - public Iterator iterator() { - return Iterators.unmodifiableIterator(mWorkers.iterator()); - } - - @Override - public WorkerClusterSnapshot snapshot() { - return this; - } - - @Override - public int size() { - return mWorkers.size(); - } - - @Override - public boolean isEmpty() { - return mWorkers.isEmpty(); - } - - /** - * @return the time when this snapshot was created. - */ - public Instant getSnapshotTime() { - return mInstantCreated; - } - - /** - * Note that the equals implementation considers the creation timestamp to be part of the - * snapshot's identity. To compare two snapshots only by the contained workers, - * use {@link com.google.common.collect.Iterables#elementsEqual(Iterable, Iterable)}. - */ - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - WorkerClusterSnapshot that = (WorkerClusterSnapshot) o; - return Objects.equals(mInstantCreated, that.mInstantCreated) - && Objects.equals(mWorkers, that.mWorkers); - } - - @Override - public int hashCode() { - return Objects.hash(mInstantCreated, mWorkers); - } -} diff --git a/dora/core/common/src/main/java/alluxio/membership/WorkerClusterView.java b/dora/core/common/src/main/java/alluxio/membership/WorkerClusterView.java index 4fa27f397284..90f2b632c4dc 100644 --- a/dora/core/common/src/main/java/alluxio/membership/WorkerClusterView.java +++ b/dora/core/common/src/main/java/alluxio/membership/WorkerClusterView.java @@ -11,65 +11,106 @@ package alluxio.membership; +import alluxio.collections.IndexDefinition; +import alluxio.collections.IndexedSet; import alluxio.wire.WorkerIdentity; import alluxio.wire.WorkerInfo; -import java.util.Arrays; +import com.google.common.collect.Iterators; + +import java.time.Instant; +import java.util.Iterator; +import java.util.Objects; import java.util.Optional; +import java.util.stream.Stream; +import javax.annotation.concurrent.Immutable; /** - * Cluster view. A view may be live or a snapshot. + * Snapshot for a cluster view. */ -public interface WorkerClusterView extends Iterable { +@Immutable +public final class WorkerClusterView implements Iterable { + private final Instant mInstantCreated; + private final IndexedSet mWorkers; + + private static final IndexDefinition INDEX_WORKER_ID = + IndexDefinition.ofUnique(WorkerInfo::getIdentity); + /** - * Gets the information about a worker from this cluster view given its identity. + * Creates a cluster view with the give workers. * - * @param workerIdentity the worker's ID to query - * @return worker info or none if the cluster view does not contain the specified worker. + * @param workers workers in this view */ - Optional getWorkerById(WorkerIdentity workerIdentity); + public WorkerClusterView(Iterable workers) { + mWorkers = new IndexedSet<>(INDEX_WORKER_ID); + for (WorkerInfo workerInfo : workers) { + mWorkers.add(workerInfo); + } + // Note Instant.now() uses the system clock and is NOT monotonic + // which is fine because we want to invalidate stale snapshots based on wall clock time + mInstantCreated = Instant.now(); + } + + public Optional getWorkerById(WorkerIdentity workerIdentity) { + return Optional.ofNullable( + mWorkers.getFirstByField(INDEX_WORKER_ID, workerIdentity)); + } + + @Override + public Iterator iterator() { + return Iterators.unmodifiableIterator(mWorkers.iterator()); + } /** - * Gets the number of workers contained in this view. + * Converts to a stream of {@link WorkerInfo}. * - * @return number of workers + * @return stream of workers */ - int size(); + public Stream stream() { + return mWorkers.stream(); + } /** - * Checks if the view contain no workers. - * - * @return if the view is empty - * @implSpec this should return {@code true} iff {@code size() == 0}. + * @return number of workers contained in the cluster view */ - boolean isEmpty(); + public int size() { + return mWorkers.size(); + } /** - * Creates a snapshot of a live view. - * - * @return snapshot + * @return if the cluster contains no worker */ - default WorkerClusterSnapshot snapshot() { - return new WorkerClusterSnapshot(this); + public boolean isEmpty() { + return mWorkers.isEmpty(); } /** - * Creates a static view of the given workers. - * - * @param workers worker in the cluster - * @return a view of the given workers + * @return the time when this snapshot was created. */ - static WorkerClusterView ofWorkers(WorkerInfo... workers) { - return new WorkerClusterSnapshot(Arrays.stream(workers)::iterator); + public Instant getSnapshotTime() { + return mInstantCreated; } /** - * Creates a static view of the given workers. - * - * @param workers worker in the cluster - * @return a view of the given workers + * Note that the equals implementation considers the creation timestamp to be part of the + * snapshot's identity. To compare two snapshots only by the contained workers, + * use {@link com.google.common.collect.Iterables#elementsEqual(Iterable, Iterable)}. */ - static WorkerClusterView ofWorkers(Iterable workers) { - return new WorkerClusterSnapshot(workers); + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + WorkerClusterView that = (WorkerClusterView) o; + return Objects.equals(mInstantCreated, that.mInstantCreated) + && Objects.equals(mWorkers, that.mWorkers); + } + + @Override + public int hashCode() { + return Objects.hash(mInstantCreated, mWorkers); } } diff --git a/dora/tests/testcontainers/src/test/java/alluxio/membership/MembershipManagerTest.java b/dora/tests/testcontainers/src/test/java/alluxio/membership/MembershipManagerTest.java index cdfdaf4f61d1..8dcf347fd97b 100644 --- a/dora/tests/testcontainers/src/test/java/alluxio/membership/MembershipManagerTest.java +++ b/dora/tests/testcontainers/src/test/java/alluxio/membership/MembershipManagerTest.java @@ -20,7 +20,6 @@ import alluxio.wire.WorkerNetAddress; import com.google.common.collect.Lists; -import com.google.common.collect.Streams; import eu.rekawek.toxiproxy.model.ToxicDirection; import io.etcd.jetcd.Auth; import io.etcd.jetcd.ByteSequence; @@ -217,7 +216,7 @@ public void testEtcdMembership(MembershipManager membershipManager) throws Excep wkrs.add(wkr1); wkrs.add(wkr2); wkrs.add(wkr3); - List allMembers = Streams.stream(membershipManager.getAllMembers().snapshot()) + List allMembers = membershipManager.getAllMembers().stream() .sorted(Comparator.comparing(w -> w.getAddress().getHost())) .collect(Collectors.toList()); Assert.assertEquals(wkrs, allMembers); @@ -236,9 +235,8 @@ public void testEtcdMembership(MembershipManager membershipManager) throws Excep List expectedFailedList = new ArrayList<>(); expectedFailedList.add(wkr2); Assert.assertEquals(expectedFailedList, - Lists.newArrayList(membershipManager.getFailedMembers().snapshot())); - List actualLiveMembers = Streams.stream(membershipManager - .getLiveMembers().snapshot()) + Lists.newArrayList(membershipManager.getFailedMembers())); + List actualLiveMembers = membershipManager.getLiveMembers().stream() .sorted(Comparator.comparing(w -> w.getAddress().getHost())) .collect(Collectors.toList()); List expectedLiveMembers = new ArrayList<>(); @@ -345,7 +343,7 @@ public void testStaticMembership() throws Exception { wkrHosts.add(wkr2.getAddress().getHost()); wkrHosts.add(wkr3.getAddress().getHost()); // As for static membership mgr, only hostnames are provided in the static file - List allMemberHosts = Streams.stream(membershipManager.getAllMembers().snapshot()) + List allMemberHosts = membershipManager.getAllMembers().stream() .map(w -> w.getAddress().getHost()) .sorted() .collect(Collectors.toList());