From b9de24c44996152e42cbdb7a629afe506b865021 Mon Sep 17 00:00:00 2001 From: Zihao Zhao Date: Wed, 10 Jan 2024 16:27:30 +0800 Subject: [PATCH] Support Various Kinds of Consistent Hash ### What changes are proposed in this pull request? Add Ketama Hashing, Jump Consistent Hashing, Maglev Hashing, and Multi Probe Hashing. ### Why are the changes needed? Now alluxio's user worker selection policy is Consistent Hash Policy. It bings too much time cost, and it is not enough uniform, and not strictly consistent. Ketama: https://github.com/RJ/ketama Jump Consistent Hashing: https://arxiv.org/pdf/1406.2294.pdf Maglev Hashing: https://static.googleusercontent.com/media/research.google.com/zh-CN//pubs/archive/44824.pdf Multi Probe Hasing: https://arxiv.org/pdf/1505.00062.pdf We strongly recommend using Maglev Hashing for User Worker Selection Policy. Under most situation, it has the minimum time cost, and it is the most uniform and balanced hashing policy. ### Does this PR introduce any user facing changes? `alluxio.user.worker.selection.policy` has the following values: `CONSISTENT`, `JUMP`, `KETAMA`, `MAGLEV`, `MULTI_PROBE`, `LOCAL`, `REMOTE_ONLY`, corresponding to consistent hash policy, maglev hash policy, ketama hash policy, maglev hash policy, multi-probe respectively hash policy, local worker policy, remote only policy. The current default value is `CONSISTENT`. We recommend using Maglev Hash, which has the best hash consistency and is the least time-consuming. That is to say, set the value of `alluxio.user.worker.selection.policy` to `MAGLEV`. We will also consider setting this as the default value in the future. **Ketama Hasing** `alluxio.user.ketama.hash.replicas`: This is the value of replicas in the ketama hashing algorithm. When workers changes, it will guarantee the hash table is changed only in a minimal. The value of replicas should be X times the physical nodes in the cluster, where X is a balance between efficiency and cost. **Jump Consistent Hashing** None. **Maglev Hashing** `alluxio.user.maglev.hash.lookup.size`: This is the size of the lookup table in the maglev hashing algorithm. It must be a prime number. In the maglev hashing, it will generate a lookup table for workers. The bigger the size of the lookup table, the smaller the variance of this hashing algorithm will be. But bigger look up table will consume more time and memory. **Multi Probe Hashing** `alluxio.user.multi.probe.hash.probe.num`: This is the number of probes in the multi-probe hashing algorithm. In the multi-probe hashing algorithm, the bigger the number of probes, the smaller the variance of this hashing algorithm will be. But more probes will consume more time and memory. pr-link: Alluxio/alluxio#17817 change-id: cid-bad21c6e5ad83eb3da15a8960ba372b14c67b081 --- .../file/dora/ConsistentHashPolicy.java | 19 +- .../file/dora/ConsistentHashProvider.java | 25 +- .../client/file/dora/JumpHashPolicy.java | 102 +++++ .../client/file/dora/JumpHashProvider.java | 253 ++++++++++++ .../client/file/dora/KetamaHashPolicy.java | 107 +++++ .../client/file/dora/KetamaHashProvider.java | 243 ++++++++++++ .../client/file/dora/MaglevHashPolicy.java | 124 ++++++ .../client/file/dora/MaglevHashProvider.java | 370 ++++++++++++++++++ .../file/dora/MultiProbeHashPolicy.java | 108 +++++ .../file/dora/MultiProbeHashProvider.java | 337 ++++++++++++++++ .../file/dora/WorkerLocationPolicy.java | 14 +- .../file/dora/WorkerLocationPolicyEnum.java | 38 ++ .../file/dora/ConsistentHashPolicyTest.java | 2 +- .../file/dora/ConsistentHashProviderTest.java | 29 +- .../client/file/dora/JumpHashPolicyTest.java | 170 ++++++++ .../file/dora/KetamaHashPolicyTest.java | 169 ++++++++ .../file/dora/LocalWorkerPolicyTest.java | 2 +- .../file/dora/MaglevtHashPolicyTest.java | 169 ++++++++ .../file/dora/MultiProbeHashPolicyTest.java | 169 ++++++++ .../main/java/alluxio/conf/PropertyKey.java | 41 +- .../main/java/alluxio/cli/CheckCluster.java | 2 +- .../stress/worker/WorkerBenchMode.java | 6 +- .../stress/worker/WorkerBenchParameters.java | 2 +- .../stress/cli/worker/StressWorkerBench.java | 24 +- 24 files changed, 2480 insertions(+), 45 deletions(-) create mode 100644 dora/core/client/fs/src/main/java/alluxio/client/file/dora/JumpHashPolicy.java create mode 100644 dora/core/client/fs/src/main/java/alluxio/client/file/dora/JumpHashProvider.java create mode 100644 dora/core/client/fs/src/main/java/alluxio/client/file/dora/KetamaHashPolicy.java create mode 100644 dora/core/client/fs/src/main/java/alluxio/client/file/dora/KetamaHashProvider.java create mode 100644 dora/core/client/fs/src/main/java/alluxio/client/file/dora/MaglevHashPolicy.java create mode 100644 dora/core/client/fs/src/main/java/alluxio/client/file/dora/MaglevHashProvider.java create mode 100644 dora/core/client/fs/src/main/java/alluxio/client/file/dora/MultiProbeHashPolicy.java create mode 100644 dora/core/client/fs/src/main/java/alluxio/client/file/dora/MultiProbeHashProvider.java create mode 100644 dora/core/client/fs/src/main/java/alluxio/client/file/dora/WorkerLocationPolicyEnum.java create mode 100644 dora/core/client/fs/src/test/java/alluxio/client/file/dora/JumpHashPolicyTest.java create mode 100644 dora/core/client/fs/src/test/java/alluxio/client/file/dora/KetamaHashPolicyTest.java create mode 100644 dora/core/client/fs/src/test/java/alluxio/client/file/dora/MaglevtHashPolicyTest.java create mode 100644 dora/core/client/fs/src/test/java/alluxio/client/file/dora/MultiProbeHashPolicyTest.java diff --git a/dora/core/client/fs/src/main/java/alluxio/client/file/dora/ConsistentHashPolicy.java b/dora/core/client/fs/src/main/java/alluxio/client/file/dora/ConsistentHashPolicy.java index cfa3f50b0dc7..6e7bfc7477e2 100644 --- a/dora/core/client/fs/src/main/java/alluxio/client/file/dora/ConsistentHashPolicy.java +++ b/dora/core/client/fs/src/main/java/alluxio/client/file/dora/ConsistentHashPolicy.java @@ -39,17 +39,7 @@ */ public class ConsistentHashPolicy implements WorkerLocationPolicy { private static final Logger LOG = LoggerFactory.getLogger(ConsistentHashPolicy.class); - private final ConsistentHashProvider mHashProvider = - new ConsistentHashProvider(100, Constants.SECOND_MS); - /** - * This is the number of virtual nodes in the consistent hashing algorithm. - * In a consistent hashing algorithm, on membership changes, some virtual nodes are - * re-distributed instead of rebuilding the whole hash table. - * This guarantees the hash table is changed only in a minimal. - * In order to achieve that, the number of virtual nodes should be X times the physical nodes - * in the cluster, where X is a balance between redistribution granularity and size. - */ - private final int mNumVirtualNodes; + private final ConsistentHashProvider mHashProvider; /** * Constructs a new {@link ConsistentHashPolicy}. @@ -57,7 +47,10 @@ public class ConsistentHashPolicy implements WorkerLocationPolicy { * @param conf the configuration used by the policy */ public ConsistentHashPolicy(AlluxioConfiguration conf) { - mNumVirtualNodes = conf.getInt(PropertyKey.USER_CONSISTENT_HASH_VIRTUAL_NODE_COUNT_PER_WORKER); + LOG.debug("%s is chosen for user worker hash algorithm", + conf.getString(PropertyKey.USER_WORKER_SELECTION_POLICY)); + mHashProvider = new ConsistentHashProvider(100, Constants.SECOND_MS, + conf.getInt(PropertyKey.USER_CONSISTENT_HASH_VIRTUAL_NODE_COUNT_PER_WORKER)); } @Override @@ -69,7 +62,7 @@ public List getPreferredWorkers(WorkerClusterView workerCluster workerClusterView.size(), count)); } Set workerIdentities = workerClusterView.workerIds(); - mHashProvider.refresh(workerIdentities, mNumVirtualNodes); + mHashProvider.refresh(workerIdentities); List workers = mHashProvider.getMultiple(fileId, count); if (workers.size() != count) { throw new ResourceExhaustedException(String.format( diff --git a/dora/core/client/fs/src/main/java/alluxio/client/file/dora/ConsistentHashProvider.java b/dora/core/client/fs/src/main/java/alluxio/client/file/dora/ConsistentHashProvider.java index 933354bf5f1a..50156e37534a 100644 --- a/dora/core/client/fs/src/main/java/alluxio/client/file/dora/ConsistentHashProvider.java +++ b/dora/core/client/fs/src/main/java/alluxio/client/file/dora/ConsistentHashProvider.java @@ -91,15 +91,27 @@ public class ConsistentHashProvider { */ private final Object mInitLock = new Object(); + /** + * This is the number of virtual nodes in the consistent hashing algorithm. + * In a consistent hashing algorithm, on membership changes, some virtual nodes are + * re-distributed instead of rebuilding the whole hash table. + * This guarantees the hash table is changed only in a minimal. + * In order to achieve that, the number of virtual nodes should be X times the physical nodes + * in the cluster, where X is a balance between redistribution granularity and size. + */ + private final int mNumVirtualNodes; + /** * Constructor. * * @param maxAttempts max attempts to rehash * @param workerListTtlMs interval between retries + * @param numVirtualNodes number of virtual nodes */ - public ConsistentHashProvider(int maxAttempts, long workerListTtlMs) { + public ConsistentHashProvider(int maxAttempts, long workerListTtlMs, int numVirtualNodes) { mMaxAttempts = maxAttempts; mWorkerInfoUpdateIntervalNs = workerListTtlMs * Constants.MS_NANO; + mNumVirtualNodes = numVirtualNodes; } /** @@ -130,12 +142,11 @@ public List getMultiple(String key, int count) { * others will not change the internal state of the hash provider. * * @param workers the up-to-date worker list - * @param numVirtualNodes the number of virtual nodes used by consistent hashing */ - public void refresh(Set workers, int numVirtualNodes) { + public void refresh(Set workers) { Preconditions.checkArgument(!workers.isEmpty(), "cannot refresh hash provider with empty worker list"); - maybeInitialize(workers, numVirtualNodes); + maybeInitialize(workers); // check if the worker list has expired if (shouldRebuildActiveNodesMapExclusively()) { // thread safety is valid provided that build() takes less than @@ -144,7 +155,7 @@ public void refresh(Set workers, int numVirtualNodes) { Set lastWorkerIds = mLastWorkers.get(); if (!workers.equals(lastWorkerIds)) { Set newWorkerIds = ImmutableSet.copyOf(workers); - NavigableMap nodes = build(newWorkerIds, numVirtualNodes); + NavigableMap nodes = build(newWorkerIds, mNumVirtualNodes); mActiveNodesByConsistentHashing = nodes; mLastWorkers.set(newWorkerIds); mUpdateCount.increment(); @@ -176,14 +187,14 @@ private boolean shouldRebuildActiveNodesMapExclusively() { * Only one caller gets to initialize the map while all others are blocked. * After the initialization, the map must not be null. */ - private void maybeInitialize(Set workers, int numVirtualNodes) { + private void maybeInitialize(Set workers) { if (mActiveNodesByConsistentHashing == null) { synchronized (mInitLock) { // only one thread should reach here // test again to skip re-initialization if (mActiveNodesByConsistentHashing == null) { Set workerIdentities = ImmutableSet.copyOf(workers); - mActiveNodesByConsistentHashing = build(workerIdentities, numVirtualNodes); + mActiveNodesByConsistentHashing = build(workerIdentities, mNumVirtualNodes); mLastWorkers.set(workerIdentities); mLastUpdatedTimestamp.set(System.nanoTime()); } diff --git a/dora/core/client/fs/src/main/java/alluxio/client/file/dora/JumpHashPolicy.java b/dora/core/client/fs/src/main/java/alluxio/client/file/dora/JumpHashPolicy.java new file mode 100644 index 000000000000..7bbe03fb9102 --- /dev/null +++ b/dora/core/client/fs/src/main/java/alluxio/client/file/dora/JumpHashPolicy.java @@ -0,0 +1,102 @@ +/* + * The Alluxio Open Foundation licenses this work under the Apache License, version 2.0 + * (the "License"). You may not use this work except in compliance with the License, which is + * available at www.apache.org/licenses/LICENSE-2.0 + * + * This software is distributed on an "AS IS" basis, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied, as more fully set forth in the License. + * + * See the NOTICE file distributed with this work for information regarding copyright ownership. + */ + +package alluxio.client.file.dora; + +import alluxio.Constants; +import alluxio.client.block.BlockWorkerInfo; +import alluxio.conf.AlluxioConfiguration; +import alluxio.conf.PropertyKey; +import alluxio.exception.status.ResourceExhaustedException; +import alluxio.membership.WorkerClusterView; +import alluxio.wire.WorkerIdentity; +import alluxio.wire.WorkerInfo; +import alluxio.wire.WorkerState; + +import com.google.common.collect.ImmutableList; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; +import java.util.Optional; +import java.util.Set; + +/** + * An impl of Jump Consistent Hash Policy. + * + * A policy where a file path is matched to worker(s) by Jump Consistent Hashing Algorithm. + * The algorithm is described in this paper: + * https://arxiv.org/pdf/1406.2294.pdf + * + * The disadvantage of this algorithm is that + * buckets can only be inserted and deleted at the head and tail of the worker list + * to maintain hash consistency, + * that is, nodes can only be added and deleted at the head and tail of the worker list. + */ +public class JumpHashPolicy implements WorkerLocationPolicy { + private static final Logger LOG = LoggerFactory.getLogger(JumpHashPolicy.class); + private final JumpHashProvider mHashProvider; + + /** + * Constructs a new {@link JumpHashPolicy}. + * + * @param conf the configuration used by the policy + */ + public JumpHashPolicy(AlluxioConfiguration conf) { + LOG.debug("%s is chosen for user worker hash algorithm", + conf.getString(PropertyKey.USER_WORKER_SELECTION_POLICY)); + mHashProvider = new JumpHashProvider(100, Constants.SECOND_MS); + } + + @Override + public List getPreferredWorkers(WorkerClusterView workerClusterView, + String fileId, int count) throws ResourceExhaustedException { + if (workerClusterView.size() < count) { + throw new ResourceExhaustedException(String.format( + "Not enough workers in the cluster %d workers in the cluster but %d required", + workerClusterView.size(), count)); + } + Set workerIdentities = workerClusterView.workerIds(); + mHashProvider.refresh(workerIdentities); + List workers = mHashProvider.getMultiple(fileId, count); + if (workers.size() != count) { + throw new ResourceExhaustedException(String.format( + "Found %d workers from the hash ring but %d required", workers.size(), count)); + } + ImmutableList.Builder builder = ImmutableList.builder(); + for (WorkerIdentity worker : workers) { + Optional optionalWorkerInfo = workerClusterView.getWorkerById(worker); + final WorkerInfo workerInfo; + if (optionalWorkerInfo.isPresent()) { + workerInfo = optionalWorkerInfo.get(); + } else { + // the worker returned by the policy does not exist in the cluster view + // supplied by the client. + // this can happen when the membership changes and some callers fail to update + // to the latest worker cluster view. + // in this case, just skip this worker + LOG.debug("Inconsistency between caller's view of cluster and that of " + + "the consistent hash policy's: worker {} selected by policy does not exist in " + + "caller's view {}. Skipping this worker.", + worker, workerClusterView); + continue; + } + + BlockWorkerInfo blockWorkerInfo = new BlockWorkerInfo( + worker, workerInfo.getAddress(), workerInfo.getCapacityBytes(), + workerInfo.getUsedBytes(), workerInfo.getState() == WorkerState.LIVE + ); + builder.add(blockWorkerInfo); + } + List infos = builder.build(); + return infos; + } +} diff --git a/dora/core/client/fs/src/main/java/alluxio/client/file/dora/JumpHashProvider.java b/dora/core/client/fs/src/main/java/alluxio/client/file/dora/JumpHashProvider.java new file mode 100644 index 000000000000..92b135a58f35 --- /dev/null +++ b/dora/core/client/fs/src/main/java/alluxio/client/file/dora/JumpHashProvider.java @@ -0,0 +1,253 @@ +/* + * The Alluxio Open Foundation licenses this work under the Apache License, version 2.0 + * (the "License"). You may not use this work except in compliance with the License, which is + * available at www.apache.org/licenses/LICENSE-2.0 + * + * This software is distributed on an "AS IS" basis, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied, as more fully set forth in the License. + * + * See the NOTICE file distributed with this work for information regarding copyright ownership. + */ + +package alluxio.client.file.dora; + +import static com.google.common.hash.Hashing.murmur3_32_fixed; +import static java.lang.String.format; +import static java.nio.charset.StandardCharsets.UTF_8; + +import alluxio.Constants; +import alluxio.wire.WorkerIdentity; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; +import com.google.common.hash.HashFunction; + +import java.util.HashMap; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.atomic.LongAdder; +import javax.annotation.Nullable; + +/** + * An impl of JumpConsistentHash. + */ +public class JumpHashProvider { + private static final HashFunction HASH_FUNCTION = murmur3_32_fixed(); + private final int mMaxAttempts; + private final long mWorkerInfoUpdateIntervalNs; + private static final long UNSIGNED_MASK = 0x7fffffffffffffffL; + + private static final long JUMP = 1L << 31; + + private static final long CONSTANT = Long + .parseUnsignedLong("2862933555777941757"); + + /** + * Timestamp of the last update to {@link #mActiveNodesByConsistentHashing}. + * Must use System.nanoTime to ensure monotonic increment. Otherwise, earlier updates + * may overwrite the latest as the expiry based on TTL cannot be reliably determined. + */ + private final AtomicLong mLastUpdatedTimestamp = new AtomicLong(System.nanoTime()); + /** + * Counter for how many times the map has been updated. + */ + private final LongAdder mUpdateCount = new LongAdder(); + /** + * The worker list which the {@link #mActiveNodesByConsistentHashing} was built from. + * Must be kept in sync with {@link #mActiveNodesByConsistentHashing}. + * Used to compare with incoming worker list to skip the heavy build process if the worker + * list has not changed. + */ + private final AtomicReference> mLastWorkers = + new AtomicReference<>(ImmutableSet.of()); + /** + * Requirements for interacting with this map: + * 1. This bucket list is lazy initialized (cannot init in the constructor). + * Multiple threads may try to enter the init section, and it should be only initialized once. + * 2. This bucket list is timestamped. After the TTL expires, we need to compare the worker + * list with the current available workers and possibly rebuild the bucket list. + * 3. While the bucket list is being updated, readers should see a stale bucket list + * without blocking. + * + * Thread safety guarantees: + * 1. At lazy-init time, mutual exclusion is provided by `synchronized(mInitLock)` + * and double-checking. At this stage it is guarded by `mInitLock`. + * 2. After init, updating the bucket list is guarded by an optimistic lock using CAS(timestamp). + * There will be no blocking but a read may see a stale bucket list. + * At this stage it is guarded by `mLastUpdatedTimestamp`. + */ + @Nullable + private volatile HashMap mActiveNodesByConsistentHashing; + /** + * Lock to protect the lazy initialization of {@link #mActiveNodesByConsistentHashing}. + */ + private final Object mInitLock = new Object(); + + /** + * @param maxAttempts the max attempts + * @param workerListTtlMs interval between retries + */ + public JumpHashProvider(int maxAttempts, long workerListTtlMs) { + mMaxAttempts = maxAttempts; + mWorkerInfoUpdateIntervalNs = workerListTtlMs * Constants.MS_NANO; + } + + /** + * Finds multiple workers from the buckets. + * + * @param key the key to use for hashing + * @param count the expected number of workers + * @return a list of workers following the worker list + */ + public List getMultiple(String key, int count) { + Set workers = new LinkedHashSet<>(); + int attempts = 0; + while (workers.size() < count && attempts < mMaxAttempts) { + attempts++; + WorkerIdentity selectedWorker = get(key, attempts); + workers.add(selectedWorker); + } + return ImmutableList.copyOf(workers); + } + + /** + * Initializes or refreshes the worker list using the given list of workers. + *
+ * Thread safety: + * If called concurrently by two or more threads, only one of the callers will actually + * update the state of the hash provider using the worker list provided by that thread, and all + * others will not change the internal state of the hash provider. + * @param workers the up-to-date worker list + */ + public void refresh(Set workers) { + Preconditions.checkArgument(!workers.isEmpty(), + "cannot refresh hash provider with empty worker list"); + maybeInitialize(workers); + // check if the worker list has expired + if (shouldRebuildActiveNodesMapExclusively()) { + // thread safety is valid provided that build() takes less than + // WORKER_INFO_UPDATE_INTERVAL_NS, so that before next update the current update has been + // finished + Set lastWorkerIds = mLastWorkers.get(); + if (!workers.equals(lastWorkerIds)) { + mActiveNodesByConsistentHashing = build(workers); + mLastWorkers.set(workers); + mUpdateCount.increment(); + } + } + // otherwise, do nothing and proceed with stale worker list. on next access, the worker list + // will have been updated by another thread + } + + /** + * Check whether the current map has expired and needs update. + * If called by multiple threads concurrently, only one of the callers will get a return value + * of true, so that the map will be updated only once. The other threads will not try to + * update and use stale information instead. + */ + private boolean shouldRebuildActiveNodesMapExclusively() { + // check if the worker list has expired + long lastUpdateTs = mLastUpdatedTimestamp.get(); + long currentTs = System.nanoTime(); + if (currentTs - lastUpdateTs > mWorkerInfoUpdateIntervalNs) { + // use CAS to only allow one thread to actually update the timestamp + return mLastUpdatedTimestamp.compareAndSet(lastUpdateTs, currentTs); + } + return false; + } + + /** + * Lazily initializes the bucket list. + * Only one caller gets to initialize the map while all others are blocked. + * After the initialization, the map must not be null. + */ + private void maybeInitialize(Set workers) { + if (mActiveNodesByConsistentHashing == null) { + synchronized (mInitLock) { + // only one thread should reach here + // test again to skip re-initialization + if (mActiveNodesByConsistentHashing == null) { + mActiveNodesByConsistentHashing = build(workers); + mLastWorkers.set(workers); + mLastUpdatedTimestamp.set(System.nanoTime()); + } + } + } + } + + WorkerIdentity get(String key, int index) { + HashMap hashMap = mActiveNodesByConsistentHashing; + Preconditions.checkState(hashMap != null, "Hash provider is not properly initialized"); + return get(hashMap, key, index); + } + + @VisibleForTesting + static WorkerIdentity get(HashMap hashMap, String key, int index) { + int hashKey = HASH_FUNCTION.hashString(format("%s%d", key, index), UTF_8).asInt(); + int workerId = jumpConsistentHash(hashKey, hashMap.size()); + return hashMap.get(workerId); + } + + /** + * Accepts "a 64-bit key and the number of buckets. It outputs a number in + * the range [0, buckets]." + * + * @param key + * key to store + * @param buckets + * number of available buckets + * @return the hash of the key + */ + private static int jumpConsistentHash(final int key, final int buckets) { + long hashValue = -1; + long k = key; + long j = 0; + while (j < buckets) { + hashValue = j; + k = k * CONSTANT + 1L; + j = (long) ((hashValue + 1L) * (JUMP / toDouble((k >>> 33) + 1L))); + } + return (int) hashValue; + } + + private static double toDouble(final long n) { + double d = n & UNSIGNED_MASK; + if (n < 0) { + d += 0x1.0p63; + } + return d; + } + + @VisibleForTesting + Set getLastWorkers() { + return mLastWorkers.get(); + } + + @VisibleForTesting + HashMap getActiveNodesMap() { + return mActiveNodesByConsistentHashing; + } + + @VisibleForTesting + long getUpdateCount() { + return mUpdateCount.sum(); + } + + @VisibleForTesting + static HashMap build( + Set workers) { + Preconditions.checkArgument(!workers.isEmpty(), "worker list is empty"); + HashMap activeNodesByJumpConsistentHashing = new HashMap<>(); + int workerIndex = 0; + for (WorkerIdentity worker : workers) { + activeNodesByJumpConsistentHashing.put(workerIndex, worker); + workerIndex++; + } + return activeNodesByJumpConsistentHashing; + } +} diff --git a/dora/core/client/fs/src/main/java/alluxio/client/file/dora/KetamaHashPolicy.java b/dora/core/client/fs/src/main/java/alluxio/client/file/dora/KetamaHashPolicy.java new file mode 100644 index 000000000000..d0ed5e8ce5e6 --- /dev/null +++ b/dora/core/client/fs/src/main/java/alluxio/client/file/dora/KetamaHashPolicy.java @@ -0,0 +1,107 @@ +/* + * The Alluxio Open Foundation licenses this work under the Apache License, version 2.0 + * (the "License"). You may not use this work except in compliance with the License, which is + * available at www.apache.org/licenses/LICENSE-2.0 + * + * This software is distributed on an "AS IS" basis, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied, as more fully set forth in the License. + * + * See the NOTICE file distributed with this work for information regarding copyright ownership. + */ + +package alluxio.client.file.dora; + +import alluxio.Constants; +import alluxio.client.block.BlockWorkerInfo; +import alluxio.conf.AlluxioConfiguration; +import alluxio.conf.PropertyKey; +import alluxio.exception.status.ResourceExhaustedException; +import alluxio.membership.WorkerClusterView; +import alluxio.wire.WorkerIdentity; +import alluxio.wire.WorkerInfo; +import alluxio.wire.WorkerState; + +import com.google.common.collect.ImmutableList; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; +import java.util.Optional; +import java.util.Set; + +/** + * An impl of Ketama Hash Policy. + * + * A policy where a file path is matched to worker(s) by a consistenct hashing algorithm. + * The hash algorithm makes sure the same path maps to the same worker sequence. + * On top of that, consistent hashing makes sure worker membership changes incur minimal + * hash changes. + * + * Relevant article: + * https://www.metabrew.com/article/libketama-consistent-hashing-algo-memcached-clients + * + * The difference with ConsistentHashPolicy is that whenever the Worker changes, + * ConsistentHashPolicy will re-establish the hash ring and re-establish virtual nodes, + * while Ketama directly modifies the hash ring, adds and deletes virtual nodes. + * + */ +public class KetamaHashPolicy implements WorkerLocationPolicy { + private static final Logger LOG = LoggerFactory.getLogger(KetamaHashPolicy.class); + private final KetamaHashProvider mHashProvider; + + /** + * Constructs a new {@link KetamaHashPolicy}. + * + * @param conf the configuration used by the policy + */ + public KetamaHashPolicy(AlluxioConfiguration conf) { + LOG.debug("%s is chosen for user worker hash algorithm", + conf.getString(PropertyKey.USER_WORKER_SELECTION_POLICY)); + mHashProvider = new KetamaHashProvider(100, Constants.SECOND_MS, + conf.getInt(PropertyKey.USER_KETAMA_HASH_REPLICAS)); + } + + @Override + public List getPreferredWorkers(WorkerClusterView workerClusterView, + String fileId, int count) throws ResourceExhaustedException { + if (workerClusterView.size() < count) { + throw new ResourceExhaustedException(String.format( + "Not enough workers in the cluster %d workers in the cluster but %d required", + workerClusterView.size(), count)); + } + Set workerIdentities = workerClusterView.workerIds(); + mHashProvider.refresh(workerIdentities); + List workers = mHashProvider.getMultiple(fileId, count); + if (workers.size() != count) { + throw new ResourceExhaustedException(String.format( + "Found %d workers from the hash ring but %d required", workers.size(), count)); + } + ImmutableList.Builder builder = ImmutableList.builder(); + for (WorkerIdentity worker : workers) { + Optional optionalWorkerInfo = workerClusterView.getWorkerById(worker); + final WorkerInfo workerInfo; + if (optionalWorkerInfo.isPresent()) { + workerInfo = optionalWorkerInfo.get(); + } else { + // the worker returned by the policy does not exist in the cluster view + // supplied by the client. + // this can happen when the membership changes and some callers fail to update + // to the latest worker cluster view. + // in this case, just skip this worker + LOG.debug("Inconsistency between caller's view of cluster and that of " + + "the consistent hash policy's: worker {} selected by policy does not exist in " + + "caller's view {}. Skipping this worker.", + worker, workerClusterView); + continue; + } + + BlockWorkerInfo blockWorkerInfo = new BlockWorkerInfo( + worker, workerInfo.getAddress(), workerInfo.getCapacityBytes(), + workerInfo.getUsedBytes(), workerInfo.getState() == WorkerState.LIVE + ); + builder.add(blockWorkerInfo); + } + List infos = builder.build(); + return infos; + } +} diff --git a/dora/core/client/fs/src/main/java/alluxio/client/file/dora/KetamaHashProvider.java b/dora/core/client/fs/src/main/java/alluxio/client/file/dora/KetamaHashProvider.java new file mode 100644 index 000000000000..b3993f09d21b --- /dev/null +++ b/dora/core/client/fs/src/main/java/alluxio/client/file/dora/KetamaHashProvider.java @@ -0,0 +1,243 @@ +/* + * The Alluxio Open Foundation licenses this work under the Apache License, version 2.0 + * (the "License"). You may not use this work except in compliance with the License, which is + * available at www.apache.org/licenses/LICENSE-2.0 + * + * This software is distributed on an "AS IS" basis, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied, as more fully set forth in the License. + * + * See the NOTICE file distributed with this work for information regarding copyright ownership. + */ + +package alluxio.client.file.dora; + +import static com.google.common.hash.Hashing.murmur3_32_fixed; +import static java.nio.charset.StandardCharsets.UTF_8; + +import alluxio.Constants; +import alluxio.wire.WorkerIdentity; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; +import com.google.common.hash.HashCode; +import com.google.common.hash.HashFunction; + +import java.util.HashSet; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Set; +import java.util.SortedMap; +import java.util.TreeMap; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.atomic.LongAdder; +import javax.annotation.Nullable; +import javax.annotation.concurrent.ThreadSafe; + +/** + * A consistent hashing algorithm implementation. + * + * This implementation is thread safe in lazy init and in refreshing the worker list. + * See inline comments for thread safety guarantees and semantics. + */ +@VisibleForTesting +@ThreadSafe +public class KetamaHashProvider { + private final int mReplicas; + private final int mMaxAttempts; + private final long mWorkerInfoUpdateIntervalNs; + private static final HashFunction HASH_FUNCTION = murmur3_32_fixed(); + + private final AtomicLong mLastUpdatedTimestamp = new AtomicLong(System.nanoTime()); + /** + * Counter for how many times the map has been updated. + */ + private final LongAdder mUpdateCount = new LongAdder(); + + private final AtomicReference> mLastWorkers = + new AtomicReference<>(ImmutableSet.of()); + + @Nullable + private volatile SortedMap mActiveNodes; + /** + * Lock to protect the lazy initialization of {@link #mActiveNodes}. + */ + private final Object mInitLock = new Object(); + + /** + * Constructor. + * + * @param maxAttempts max attempts to rehash + * @param workerListTtlMs interval between retries + * @param replicas the number of replicas of workers + */ + public KetamaHashProvider(int maxAttempts, long workerListTtlMs, int replicas) { + mMaxAttempts = maxAttempts; + mWorkerInfoUpdateIntervalNs = workerListTtlMs * Constants.MS_NANO; + mReplicas = replicas; + } + + /** + * Finds multiple workers from the hash ring. + * + * @param key the key to hash on + * @param count the expected number of workers + * @return a list of workers following the hash ring + */ + public List getMultiple(String key, int count) { + Set workers = new LinkedHashSet<>(); + int attempts = 0; + while (workers.size() < count && attempts < mMaxAttempts) { + attempts++; + WorkerIdentity selectedWorker = get(key, attempts); + workers.add(selectedWorker); + } + return ImmutableList.copyOf(workers); + } + + /** + * Initializes or refreshes the worker list using the given list of workers. + * @param workers the up-to-date worker list + */ + public void refresh(Set workers) { + Preconditions.checkArgument(!workers.isEmpty(), + "cannot refresh hash provider with empty worker list"); + maybeInitialize(workers); + // check if the worker list has expired + if (shouldRebuildActiveNodesMapExclusively()) { + // thread safety is valid provided that build() takes less than + // WORKER_INFO_UPDATE_INTERVAL_NS, so that before next update the current update has been + // finished + Set lastWorkerIds = mLastWorkers.get(); + if (!workers.equals(lastWorkerIds)) { + updateActiveNodes(workers, mLastWorkers.get()); + mLastWorkers.set(workers); + mUpdateCount.increment(); + } + } + // otherwise, do nothing and proceed with stale worker list. on next access, the worker list + // will have been updated by another thread + } + + /** + * Check whether the current map has expired and needs update. + * If called by multiple threads concurrently, only one of the callers will get a return value + * of true, so that the map will be updated only once. The other threads will not try to + * update and use stale information instead. + */ + private boolean shouldRebuildActiveNodesMapExclusively() { + // check if the worker list has expired + long lastUpdateTs = mLastUpdatedTimestamp.get(); + long currentTs = System.nanoTime(); + if (currentTs - lastUpdateTs > mWorkerInfoUpdateIntervalNs) { + // use CAS to only allow one thread to actually update the timestamp + return mLastUpdatedTimestamp.compareAndSet(lastUpdateTs, currentTs); + } + return false; + } + + /** + * Lazily initializes the hash ring. + * Only one caller gets to initialize the map while all others are blocked. + * After the initialization, the map must not be null. + */ + private void maybeInitialize(Set workers) { + if (mActiveNodes == null) { + synchronized (mInitLock) { + // only one thread should reach here + // test again to skip re-initialization + if (mActiveNodes == null) { + build(workers); + mLastWorkers.set(workers); + mLastUpdatedTimestamp.set(System.nanoTime()); + } + } + } + } + + /** + * Update the active nodes. + * @param workers + * @param lastWorkers + */ + private void updateActiveNodes(Set workers, + Set lastWorkers) { + HashSet workerSet = new HashSet<>(workers); + HashSet lastWorkerSet = new HashSet<>(lastWorkers); + // remove the workers that are no longer active + for (WorkerIdentity worker : lastWorkerSet) { + if (!workerSet.contains(worker)) { + remove(worker); + } + } + // add the new workers + for (WorkerIdentity worker : workerSet) { + if (!lastWorkerSet.contains(worker)) { + add(worker); + } + } + } + + @VisibleForTesting + WorkerIdentity get(String key, int index) { + Preconditions.checkState(mActiveNodes != null, "Hash provider is not properly initialized"); + if (mActiveNodes.isEmpty()) { + return null; + } + int hash = hash(String.format("%s%d", key, index)); + if (!mActiveNodes.containsKey(hash)) { + SortedMap tailMap = mActiveNodes.tailMap(hash); + hash = tailMap.isEmpty() ? mActiveNodes.firstKey() : tailMap.firstKey(); + } + return mActiveNodes.get(hash); + } + + @VisibleForTesting + Set getLastWorkers() { + return mLastWorkers.get(); + } + + @VisibleForTesting + SortedMap getActiveNodesMap() { + return mActiveNodes; + } + + @VisibleForTesting + long getUpdateCount() { + return mUpdateCount.sum(); + } + + @VisibleForTesting + private void build( + Set workers) { + Preconditions.checkArgument(!workers.isEmpty(), "worker list is empty"); + mActiveNodes = new TreeMap<>(); + for (WorkerIdentity worker : workers) { + add(worker); + } + } + + private void add(WorkerIdentity node) { + Preconditions.checkState(mActiveNodes != null, "Hash provider is not properly initialized"); + final HashCode hashCode = HASH_FUNCTION.newHasher() + .putObject(node, WorkerIdentity.HashFunnel.INSTANCE).hash(); + for (int i = 0; i < mReplicas; i++) { + mActiveNodes.put(hashCode.asInt() + i, node); + } + } + + private void remove(WorkerIdentity node) { + Preconditions.checkState(mActiveNodes != null, "Hash provider is not properly initialized"); + final HashCode hashCode = HASH_FUNCTION.newHasher() + .putObject(node, WorkerIdentity.HashFunnel.INSTANCE).hash(); + for (int i = 0; i < mReplicas; i++) { + mActiveNodes.remove(hashCode.asInt() + i); + } + } + + private int hash(String key) { + return HASH_FUNCTION.hashString(key, UTF_8).asInt(); + } +} diff --git a/dora/core/client/fs/src/main/java/alluxio/client/file/dora/MaglevHashPolicy.java b/dora/core/client/fs/src/main/java/alluxio/client/file/dora/MaglevHashPolicy.java new file mode 100644 index 000000000000..981d2559f625 --- /dev/null +++ b/dora/core/client/fs/src/main/java/alluxio/client/file/dora/MaglevHashPolicy.java @@ -0,0 +1,124 @@ +/* + * The Alluxio Open Foundation licenses this work under the Apache License, version 2.0 + * (the "License"). You may not use this work except in compliance with the License, which is + * available at www.apache.org/licenses/LICENSE-2.0 + * + * This software is distributed on an "AS IS" basis, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied, as more fully set forth in the License. + * + * See the NOTICE file distributed with this work for information regarding copyright ownership. + */ + +package alluxio.client.file.dora; + +import alluxio.Constants; +import alluxio.client.block.BlockWorkerInfo; +import alluxio.conf.AlluxioConfiguration; +import alluxio.conf.PropertyKey; +import alluxio.exception.status.ResourceExhaustedException; +import alluxio.membership.WorkerClusterView; +import alluxio.wire.WorkerIdentity; +import alluxio.wire.WorkerInfo; +import alluxio.wire.WorkerState; + +import com.google.common.collect.ImmutableList; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; +import java.util.Optional; +import java.util.Set; + +/** + * An impl of Maglev Hash policy. + * + * A policy where a file path is matched to worker(s) by Jump Consistent Hashing Algorithm. + * The algorithm is described in this paper: + * https://static.googleusercontent.com/media/research.google.com/zh-CN//pubs/archive/44824.pdf + * + * One thing to note about Maglev hashing is that alluxio.user.maglev.hash.lookup.size + * needs to be set to a prime number. + * The bigger the size of the lookup table, + * the smaller the variance of this hashing algorithm will be. + * But bigger look up table will consume more time and memory. + * + * We strongly recommend using Maglev Hashing for User Worker Selection Policy. + * Under most situation, it has the minimum time cost, + * and it is the most uniform and balanced hashing policy. + * + */ +public class MaglevHashPolicy implements WorkerLocationPolicy { + private static final Logger LOG = LoggerFactory.getLogger(MaglevHashPolicy.class); + private final MaglevHashProvider mHashProvider; + + /** + * Constructor. + * @param conf Alluxio Configuration + */ + public MaglevHashPolicy(AlluxioConfiguration conf) { + LOG.debug("%s is chosen for user worker hash algorithm", + conf.getString(PropertyKey.USER_WORKER_SELECTION_POLICY)); + int lookupSize = conf.getInt(PropertyKey.USER_MAGLEV_HASH_LOOKUP_SIZE); + mHashProvider = new MaglevHashProvider(100, Constants.SECOND_MS, lookupSize); + if (!isPrime(lookupSize)) { + System.out.println("The number of alluxio.user.maglev.hash.lookup.size " + + "must be a prime number!"); + } + } + + @Override + public List getPreferredWorkers(WorkerClusterView workerClusterView, + String fileId, int count) throws ResourceExhaustedException { + if (workerClusterView.size() < count) { + throw new ResourceExhaustedException(String.format( + "Not enough workers in the cluster %d workers in the cluster but %d required", + workerClusterView.size(), count)); + } + Set workerIdentities = workerClusterView.workerIds(); + mHashProvider.refresh(workerIdentities); + List workers = mHashProvider.getMultiple(fileId, count); + if (workers.size() != count) { + throw new ResourceExhaustedException(String.format( + "Found %d workers from the hash ring but %d required", workers.size(), count)); + } + ImmutableList.Builder builder = ImmutableList.builder(); + for (WorkerIdentity worker : workers) { + Optional optionalWorkerInfo = workerClusterView.getWorkerById(worker); + final WorkerInfo workerInfo; + if (optionalWorkerInfo.isPresent()) { + workerInfo = optionalWorkerInfo.get(); + } else { + // the worker returned by the policy does not exist in the cluster view + // supplied by the client. + // this can happen when the membership changes and some callers fail to update + // to the latest worker cluster view. + // in this case, just skip this worker + LOG.debug("Inconsistency between caller's view of cluster and that of " + + "the consistent hash policy's: worker {} selected by policy does not exist in " + + "caller's view {}. Skipping this worker.", + worker, workerClusterView); + continue; + } + + BlockWorkerInfo blockWorkerInfo = new BlockWorkerInfo( + worker, workerInfo.getAddress(), workerInfo.getCapacityBytes(), + workerInfo.getUsedBytes(), workerInfo.getState() == WorkerState.LIVE + ); + builder.add(blockWorkerInfo); + } + List infos = builder.build(); + return infos; + } + + private boolean isPrime(int n) { + if (n <= 1) { + return false; + } + for (int i = 2; i * i <= n; ++i) { + if (n % i == 0) { + return false; + } + } + return true; + } +} diff --git a/dora/core/client/fs/src/main/java/alluxio/client/file/dora/MaglevHashProvider.java b/dora/core/client/fs/src/main/java/alluxio/client/file/dora/MaglevHashProvider.java new file mode 100644 index 000000000000..ac3c226878dd --- /dev/null +++ b/dora/core/client/fs/src/main/java/alluxio/client/file/dora/MaglevHashProvider.java @@ -0,0 +1,370 @@ +/* + * The Alluxio Open Foundation licenses this work under the Apache License, version 2.0 + * (the "License"). You may not use this work except in compliance with the License, which is + * available at www.apache.org/licenses/LICENSE-2.0 + * + * This software is distributed on an "AS IS" basis, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied, as more fully set forth in the License. + * + * See the NOTICE file distributed with this work for information regarding copyright ownership. + */ + +package alluxio.client.file.dora; + +import static com.google.common.hash.Hashing.murmur3_32_fixed; +import static java.nio.charset.StandardCharsets.UTF_8; + +import alluxio.Constants; +import alluxio.wire.WorkerIdentity; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; +import com.google.common.hash.HashCode; +import com.google.common.hash.HashFunction; +import org.apache.curator.shaded.com.google.common.hash.Hashing; + +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.atomic.LongAdder; +import javax.annotation.concurrent.ThreadSafe; + +/** + * A consistent hashing algorithm implementation. + * + * This implementation is thread safe in lazy init and in refreshing the worker list. + * See inline comments for thread safety guarantees and semantics. + */ +@VisibleForTesting +@ThreadSafe +public class MaglevHashProvider { + private final int mMaxAttempts; + private final long mWorkerInfoUpdateIntervalNs; + private static final HashFunction HASH_FUNCTION = murmur3_32_fixed(); + + /** + * Must use System.nanoTime to ensure monotonic increment. Otherwise, earlier updates + * may overwrite the latest as the expiry based on TTL cannot be reliably determined. + */ + private final AtomicLong mLastUpdatedTimestamp = new AtomicLong(System.nanoTime()); + + /** + * Counter for how many times the map has been updated. + */ + private final LongAdder mUpdateCount = new LongAdder(); + + private final AtomicReference> mLastWorkers = + new AtomicReference<>(ImmutableSet.of()); + + /** + * Lock to protect the lazy initialization of {@link #mLookup}. + */ + private final Object mInitLock = new Object(); + + /** Seed used to compute the lookup index. */ + private static final int INDEX_SEED = 0xDEADBEEF; + + /** + * The lookup table size should be a prime number and should be much bigger + * than the number of nodes (lookupSize >> maxNodes). + */ + private final int mLookupSize; + + /** + * The lookup table. + */ + private WorkerIdentity[] mLookup; + + /** Maps each backend to the related permutation. */ + private Map mPermutations; + + /** + * Constructor. + * + * @param maxAttempts max attempts to rehash + * @param workerListTtlMs interval between retries + * @param lookupSize the size of the lookup table + */ + public MaglevHashProvider(int maxAttempts, long workerListTtlMs, int lookupSize) { + mMaxAttempts = maxAttempts; + mWorkerInfoUpdateIntervalNs = workerListTtlMs * Constants.MS_NANO; + mLookupSize = lookupSize; + mPermutations = new HashMap<>(); + } + + /** + * Finds multiple workers from the lookup table. + * + * @param key the key to hash on + * @param count the expected number of workers + * @return a list of workers to be mapped + */ + public List getMultiple(String key, int count) { + Set workers = new LinkedHashSet<>(); + int attempts = 0; + while (workers.size() < count && attempts < mMaxAttempts) { + attempts++; + WorkerIdentity selectedWorker = get(key, attempts); + workers.add(selectedWorker); + } + return ImmutableList.copyOf(workers); + } + + /** + * Initializes or refreshes the worker list using the given list of workers. + * @param workers the up-to-date worker list + */ + public void refresh(Set workers) { + Preconditions.checkArgument(!workers.isEmpty(), + "cannot refresh hash provider with empty worker list"); + maybeInitialize(workers); + // check if the worker list has expired + if (shouldRebuildActiveNodesMapExclusively()) { + // thread safety is valid provided that build() takes less than + // WORKER_INFO_UPDATE_INTERVAL_NS, so that before next update the current update has been + // finished + Set lastWorkerIds = mLastWorkers.get(); + if (!workers.equals(lastWorkerIds)) { + updateActiveNodes(workers, mLastWorkers.get()); + mLastWorkers.set(workers); + mUpdateCount.increment(); + } + } + // otherwise, do nothing and proceed with stale worker list. on next access, the worker list + // will have been updated by another thread + } + + /** + * Check whether the current map has expired and needs update. + * If called by multiple threads concurrently, only one of the callers will get a return value + * of true, so that the map will be updated only once. The other threads will not try to + * update and use stale information instead. + */ + private boolean shouldRebuildActiveNodesMapExclusively() { + // check if the worker list has expired + long lastUpdateTs = mLastUpdatedTimestamp.get(); + long currentTs = System.nanoTime(); + if (currentTs - lastUpdateTs > mWorkerInfoUpdateIntervalNs) { + // use CAS to only allow one thread to actually update the timestamp + return mLastUpdatedTimestamp.compareAndSet(lastUpdateTs, currentTs); + } + return false; + } + + /** + * Lazily initializes the hash map. + * Only one caller gets to initialize the map while all others are blocked. + * After the initialization, the map must not be null. + */ + private void maybeInitialize(Set workers) { + if (mLookup == null) { + synchronized (mInitLock) { + // only one thread should reach here + // test again to skip re-initialization + if (mLookup == null) { + build(workers); + mLastWorkers.set(workers); + mLastUpdatedTimestamp.set(System.nanoTime()); + } + } + } + } + + /** + * Update the active nodes. + * @param workers + * @param lastWorkers + */ + private void updateActiveNodes(Set workers, + Set lastWorkers) { + HashSet workerSet = new HashSet<>(workers); + HashSet lastWorkerSet = new HashSet<>(lastWorkers); + HashSet toRemove = new HashSet<>(); + HashSet toAdd = new HashSet<>(); + // remove the workers that are no longer active + for (WorkerIdentity worker : lastWorkerSet) { + if (!workerSet.contains(worker)) { + toRemove.add(worker); + } + } + // add the new workers + for (WorkerIdentity worker : workerSet) { + if (!lastWorkerSet.contains(worker)) { + toAdd.add(worker); + } + } + // remove the workers that are no longer active + remove(toRemove); + // add the new workers + add(toAdd); + } + + @VisibleForTesting + WorkerIdentity get(String key, int index) { + Preconditions.checkState(mLookup != null, "Hash provider is not properly initialized"); + if (mLookup.length == 0) { + return null; + } + final int id = Math.abs(hash(String.format("%s%d%d", key, index, INDEX_SEED)) % mLookup.length); + return mLookup[id]; + } + + @VisibleForTesting + long getUpdateCount() { + return mUpdateCount.sum(); + } + + @VisibleForTesting + private void build( + Set workers) { + Preconditions.checkArgument(!workers.isEmpty(), "worker list is empty"); + mLookup = new WorkerIdentity[0]; + add(workers); + } + + private void add(Set toAdd) { + mPermutations.values().forEach(Permutation::reset); + for (WorkerIdentity backend : toAdd) { + mPermutations.put(backend, newPermutation(backend)); + } + mLookup = newLookup(); + } + + private void remove(Collection toRemove) { + toRemove.forEach(mPermutations::remove); + mPermutations.values().forEach(Permutation::reset); + mLookup = newLookup(); + } + + int hash(String key) { + return HASH_FUNCTION.hashString(key, UTF_8).asInt(); + } + + /** + * Creates a new permutation for the given backend. + * + * @param backend the source of the permutation + * @return a new permutation + */ + private Permutation newPermutation(WorkerIdentity backend) { + return new Permutation(backend, mLookupSize); + } + + /** + * Creates a new lookup table. + * + * @return the new lookup table + */ + private WorkerIdentity[] newLookup() { + final WorkerIdentity[] lookup = new WorkerIdentity[mLookupSize]; + final AtomicInteger filled = new AtomicInteger(); + do { + mPermutations.values().forEach(permutation -> { + final int pos = permutation.next(); + if (lookup[pos] == null) { + lookup[pos] = permutation.backend(); + } + }); + } while (filled.incrementAndGet() < mLookupSize); + return lookup; + } + + class Permutation { + /** + * Seed used to compute the state offset. + */ + private static final int OFFSET_SEED = 0xDEADBABE; + + /** + * Seed used to compute the state skip. + */ + private static final int SKIP_SEED = 0xDEADDEAD; + + /** + * The backend associated to the permutation. + */ + private final WorkerIdentity mBackend; + + /** + * The size of the lookup table. + */ + private final int mSize; + + /** + * Position where to start. + */ + private final int mOffset; + + /** + * Positions to skip. + */ + private final int mSkip; + + /** + * The current value of the permutation. + */ + private int mCurrent; + + int hash1(String key) { + return HASH_FUNCTION.hashString(key, UTF_8).asInt(); + } + + int hash2(String key) { + // use XXHash + return Hashing.crc32c().hashString(key, UTF_8).asInt(); + } + + /** + * Constructor with parameters. + * + * @param backend the backend to wrap + * @param size size of the lookup table + */ + Permutation(WorkerIdentity backend, int size) { + mSize = size; + mBackend = backend; + final HashCode hashCode = HASH_FUNCTION.newHasher() + .putObject(backend, WorkerIdentity.HashFunnel.INSTANCE).hash(); + mOffset = hash1(String.format("%d%d", + hashCode.asInt(), OFFSET_SEED)) % size; + mSkip = hash2(String.format("%d%d", + hashCode.asInt(), SKIP_SEED)) % (size - 1) + 1; + mCurrent = mOffset; + } + + /** + * Returns the backend related to the current permutation. + * + * @return the backend related to the current permutation + */ + WorkerIdentity backend() { + return mBackend; + } + + /** + * Returns the next value in the permutation. + * + * @return the next value + */ + int next() { + mCurrent = (mCurrent + mSkip) % mSize; + return Math.abs(mCurrent); + } + + /** + * Resets the permutation for the new lookup size. + */ + void reset() { + mCurrent = mOffset; + } + } +} diff --git a/dora/core/client/fs/src/main/java/alluxio/client/file/dora/MultiProbeHashPolicy.java b/dora/core/client/fs/src/main/java/alluxio/client/file/dora/MultiProbeHashPolicy.java new file mode 100644 index 000000000000..d463dd4885a2 --- /dev/null +++ b/dora/core/client/fs/src/main/java/alluxio/client/file/dora/MultiProbeHashPolicy.java @@ -0,0 +1,108 @@ +/* + * The Alluxio Open Foundation licenses this work under the Apache License, version 2.0 + * (the "License"). You may not use this work except in compliance with the License, which is + * available at www.apache.org/licenses/LICENSE-2.0 + * + * This software is distributed on an "AS IS" basis, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied, as more fully set forth in the License. + * + * See the NOTICE file distributed with this work for information regarding copyright ownership. + */ + +package alluxio.client.file.dora; + +import alluxio.Constants; +import alluxio.client.block.BlockWorkerInfo; +import alluxio.conf.AlluxioConfiguration; +import alluxio.conf.PropertyKey; +import alluxio.exception.status.ResourceExhaustedException; +import alluxio.membership.WorkerClusterView; +import alluxio.wire.WorkerIdentity; +import alluxio.wire.WorkerInfo; +import alluxio.wire.WorkerState; + +import com.google.common.collect.ImmutableList; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; +import java.util.Optional; +import java.util.Set; + +/** + * An impl of Multi Probe Hash Policy. + * + * A policy where a file path is matched to worker(s) by the multi-probe algorithm. + * The hash algorithm makes sure the same path maps to the same worker sequence. + * On top of that, this hashing algorithm makes sure worker membership changes incur minimal + * hash changes. + * + * alluxio.user.multi.probe.hash.probe.num: + * This is the number of probes in the multi-probe hashing algorithm. + * In the multi-probe hashing algorithm, the bigger the number of probes, + * the smaller the variance of this hashing algorithm will be. + * But more probes will consume more time and memory. + * + * Relevant paper: + * https://arxiv.org/pdf/1505.00062.pdf + */ +public class MultiProbeHashPolicy implements WorkerLocationPolicy { + private static final Logger LOG = LoggerFactory.getLogger(MultiProbeHashPolicy.class); + private final MultiProbeHashProvider mHashProvider; + + /** + * Constructs a new {@link MultiProbeHashPolicy}. + * + * @param conf the configuration used by the policy + */ + public MultiProbeHashPolicy(AlluxioConfiguration conf) { + LOG.debug("%s is chosen for user worker hash algorithm", + conf.getString(PropertyKey.USER_WORKER_SELECTION_POLICY)); + mHashProvider = new MultiProbeHashProvider(100, Constants.SECOND_MS, + conf.getInt(PropertyKey.USER_MULTI_PROBE_HASH_PROBE_NUM)); + } + + @Override + public List getPreferredWorkers(WorkerClusterView workerClusterView, + String fileId, int count) throws ResourceExhaustedException { + if (workerClusterView.size() < count) { + throw new ResourceExhaustedException(String.format( + "Not enough workers in the cluster %d workers in the cluster but %d required", + workerClusterView.size(), count)); + } + Set workerIdentities = workerClusterView.workerIds(); + mHashProvider.refresh(workerIdentities); + List workers = mHashProvider.getMultiple(fileId, count); + if (workers.size() != count) { + throw new ResourceExhaustedException(String.format( + "Found %d workers from the hash ring but %d required", workers.size(), count)); + } + ImmutableList.Builder builder = ImmutableList.builder(); + for (WorkerIdentity worker : workers) { + Optional optionalWorkerInfo = workerClusterView.getWorkerById(worker); + final WorkerInfo workerInfo; + if (optionalWorkerInfo.isPresent()) { + workerInfo = optionalWorkerInfo.get(); + } else { + // the worker returned by the policy does not exist in the cluster view + // supplied by the client. + // this can happen when the membership changes and some callers fail to update + // to the latest worker cluster view. + // in this case, just skip this worker + LOG.debug("Inconsistency between caller's view of cluster and that of " + + "the consistent hash policy's: worker {} selected by policy does not exist in " + + "caller's view {}. Skipping this worker.", + worker, workerClusterView); + continue; + } + + BlockWorkerInfo blockWorkerInfo = new BlockWorkerInfo( + worker, workerInfo.getAddress(), workerInfo.getCapacityBytes(), + workerInfo.getUsedBytes(), workerInfo.getState() == WorkerState.LIVE + ); + builder.add(blockWorkerInfo); + } + List infos = builder.build(); + return infos; + } +} diff --git a/dora/core/client/fs/src/main/java/alluxio/client/file/dora/MultiProbeHashProvider.java b/dora/core/client/fs/src/main/java/alluxio/client/file/dora/MultiProbeHashProvider.java new file mode 100644 index 000000000000..0842d014b1fb --- /dev/null +++ b/dora/core/client/fs/src/main/java/alluxio/client/file/dora/MultiProbeHashProvider.java @@ -0,0 +1,337 @@ +/* + * The Alluxio Open Foundation licenses this work under the Apache License, version 2.0 + * (the "License"). You may not use this work except in compliance with the License, which is + * available at www.apache.org/licenses/LICENSE-2.0 + * + * This software is distributed on an "AS IS" basis, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied, as more fully set forth in the License. + * + * See the NOTICE file distributed with this work for information regarding copyright ownership. + */ + +package alluxio.client.file.dora; + +import static com.google.common.hash.Hashing.murmur3_32_fixed; +import static java.nio.charset.StandardCharsets.UTF_8; + +import alluxio.Constants; +import alluxio.wire.WorkerIdentity; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; +import com.google.common.hash.HashCode; +import com.google.common.hash.HashFunction; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Objects; +import java.util.Set; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.atomic.LongAdder; +import javax.annotation.Nullable; +import javax.annotation.concurrent.ThreadSafe; + +/** + * A multi probe hashing algorithm implementation. + */ +@VisibleForTesting +@ThreadSafe +public class MultiProbeHashProvider { + private final int mProbes; + private final int mMaxAttempts; + private final long mWorkerInfoUpdateIntervalNs; + private static final HashFunction HASH_FUNCTION = murmur3_32_fixed(); + + private final AtomicLong mLastUpdatedTimestamp = new AtomicLong(System.nanoTime()); + /** + * Counter for how many times the map has been updated. + */ + private final LongAdder mUpdateCount = new LongAdder(); + + private final AtomicReference> mLastWorkers = + new AtomicReference<>(ImmutableSet.of()); + + /** Common default seed to use during hashing of the nodes. */ + private static final int SEED = 0xDEADBEEF; + + /** Internal representation of the consistent hashing key ring. */ + @Nullable + private List mRing; + + /** + * Lock to protect the lazy initialization of {@link #mRing}. + */ + private final Object mInitLock = new Object(); + + /** + * Constructor. + * + * @param maxAttempts max attempts to rehash + * @param workerListTtlMs interval between retries + * @param probes number of probes to use + */ + public MultiProbeHashProvider(int maxAttempts, long workerListTtlMs, int probes) { + mMaxAttempts = maxAttempts; + mWorkerInfoUpdateIntervalNs = workerListTtlMs * Constants.MS_NANO; + mProbes = probes; + } + + /** + * Finds multiple workers from the hash ring. + * + * @param key the key to hash on + * @param count the expected number of workers + * @return a list of workers following the hash ring + */ + public List getMultiple(String key, int count) { + Set workers = new LinkedHashSet<>(); + int attempts = 0; + while (workers.size() < count && attempts < mMaxAttempts) { + attempts++; + WorkerIdentity selectedWorker = get(key, attempts); + workers.add(selectedWorker); + } + return ImmutableList.copyOf(workers); + } + + /** + * Initializes or refreshes the worker list using the given list of workers. + * @param workers the up-to-date worker list + */ + public void refresh(Set workers) { + Preconditions.checkArgument(!workers.isEmpty(), + "cannot refresh hash provider with empty worker list"); + maybeInitialize(workers); + // check if the worker list has expired + if (shouldRebuildActiveNodesMapExclusively()) { + // thread safety is valid provided that build() takes less than + // WORKER_INFO_UPDATE_INTERVAL_NS, so that before next update the current update has been + // finished + Set lastWorkerIds = mLastWorkers.get(); + if (!workers.equals(lastWorkerIds)) { + updateActiveNodes(workers, mLastWorkers.get()); + mLastWorkers.set(workers); + mUpdateCount.increment(); + } + } + // otherwise, do nothing and proceed with stale worker list. on next access, the worker list + // will have been updated by another thread + } + + /** + * Check whether the current map has expired and needs update. + * If called by multiple threads concurrently, only one of the callers will get a return value + * of true, so that the map will be updated only once. The other threads will not try to + * update and use stale information instead. + */ + private boolean shouldRebuildActiveNodesMapExclusively() { + // check if the worker list has expired + long lastUpdateTs = mLastUpdatedTimestamp.get(); + long currentTs = System.nanoTime(); + if (currentTs - lastUpdateTs > mWorkerInfoUpdateIntervalNs) { + // use CAS to only allow one thread to actually update the timestamp + return mLastUpdatedTimestamp.compareAndSet(lastUpdateTs, currentTs); + } + return false; + } + + /** + * Lazily initializes the hash ring. + * Only one caller gets to initialize the map while all others are blocked. + * After the initialization, the map must not be null. + */ + private void maybeInitialize(Set workers) { + if (mRing == null) { + synchronized (mInitLock) { + // only one thread should reach here + // test again to skip re-initialization + if (mRing == null) { + build(workers); + mLastWorkers.set(workers); + mLastUpdatedTimestamp.set(System.nanoTime()); + } + } + } + } + + /** + * Update the active nodes. + * @param workers + * @param lastWorkers + */ + private void updateActiveNodes(Set workers, + Set lastWorkers) { + HashSet workerSet = new HashSet<>(workers); + HashSet lastWorkerSet = new HashSet<>(lastWorkers); + // remove the workers that are no longer active + for (WorkerIdentity worker : lastWorkerSet) { + if (!workerSet.contains(worker)) { + remove(worker); + } + } + // add the new workers + for (WorkerIdentity worker : workerSet) { + if (!lastWorkerSet.contains(worker)) { + add(worker); + } + } + } + + @VisibleForTesting + WorkerIdentity get(String key, int index) { + Preconditions.checkState(mRing != null, "Hash provider is not properly initialized"); + if (mRing.isEmpty()) { + return null; + } + + final int id = getIndex(String.format("%s%d", key, index)); + return mRing.get(id).mResource; + } + + @VisibleForTesting + Set getLastWorkers() { + return mLastWorkers.get(); + } + + @VisibleForTesting + long getUpdateCount() { + return mUpdateCount.sum(); + } + + @VisibleForTesting + private void build( + Set workers) { + Preconditions.checkArgument(!workers.isEmpty(), "worker list is empty"); + mRing = new ArrayList<>(); + for (WorkerIdentity worker : workers) { + add(worker); + } + } + + private void add(WorkerIdentity node) { + Preconditions.checkState(mRing != null, "Hash provider is not properly initialized"); + final Point bucket = wrap(node); + final int pos = Collections.binarySearch(mRing, bucket); + final int index = -(pos + 1); + mRing.add(index, bucket); + } + + private void remove(WorkerIdentity node) { + Preconditions.checkState(mRing != null, "Hash provider is not properly initialized"); + final Point bucket = wrap(node); + final int pos = Collections.binarySearch(mRing, bucket); + mRing.remove(pos); + } + + /** + * Wraps the given resource into a point in the ring. + * + * @param resource the resource to wrap + * @return the related point in the ring + */ + private Point wrap(WorkerIdentity resource) { + final HashCode hashCode = HASH_FUNCTION.newHasher() + .putObject(resource, WorkerIdentity.HashFunnel.INSTANCE).hash(); + final int hash = hash(String.format("%d%d", hashCode.asInt(), SEED)); + return new Point(resource, hash); + } + + /** + * Computes the index of the point related to the given key. + * + * @param key key to search + * @return index of the related point + */ + private int hash(String key) { + return HASH_FUNCTION.hashString(key, UTF_8).asInt(); + } + + private int getIndex(String key) { + int index = 0; + int minDistance = Integer.MAX_VALUE; + for (int i = 0; i < mProbes; i++) { + final int hashValue = hash(String.format("%s%d", key, i)); + int low = 0; + int high = mRing.size(); + while (low < high) { + final int mid = (low + high) >>> 1; + if (mRing.get(mid).mHash > hashValue) { + high = mid; + } else { + low = mid + 1; + } + } + + /* + * This check implements the concept of ring. + * If we exceed the last we start over. + */ + if (low >= mRing.size()) { + low = 0; + } + + final int distance = mRing.get(low).distance(hashValue); + if (distance < minDistance) { + minDistance = distance; + index = low; + } + } + return index; + } + + class Point implements Comparable { + /** The resource to store. */ + final WorkerIdentity mResource; + + /** The position in the consistent hash ring. */ + final int mHash; + + /** + * Constructor with parameters. + * @param resource the resource to store + * @param hashValue the position in the consistent hash ring + */ + Point(WorkerIdentity resource, int hashValue) { + mResource = resource; + mHash = hashValue; + } + + /** + * Returns the distance between the given hash + * and the hash of the current bucket. + * + * @param hash the hash to test + * @return the related distance + */ + int distance(int hash) { + return Math.abs(mHash - hash); + } + + /** + * {@inheritDoc} + */ + @Override + public int compareTo(Point other) { + return Integer.compare(mHash, other.mHash); + } + + @Override + public boolean equals(Object other) { + if (other == null || !(other instanceof Point)) { + return false; + } + return Objects.equals(this, other); + } + + @Override + public int hashCode() { + return mHash; + } + } +} diff --git a/dora/core/client/fs/src/main/java/alluxio/client/file/dora/WorkerLocationPolicy.java b/dora/core/client/fs/src/main/java/alluxio/client/file/dora/WorkerLocationPolicy.java index edab6f97b515..2a4c43b0a70c 100644 --- a/dora/core/client/fs/src/main/java/alluxio/client/file/dora/WorkerLocationPolicy.java +++ b/dora/core/client/fs/src/main/java/alluxio/client/file/dora/WorkerLocationPolicy.java @@ -69,14 +69,22 @@ private Factory() {} // prevent instantiation */ public static WorkerLocationPolicy create(AlluxioConfiguration conf) { try { - WorkerLocationPolicy workerLocationPolicy = CommonUtils.createNewClassInstance( - conf.getClass(PropertyKey.USER_WORKER_SELECTION_POLICY), - new Class[] {AlluxioConfiguration.class}, new Object[] {conf}); + // Find the name of the class corresponding to the enumerated hash algorithm + String policyName = (String) conf.get(PropertyKey.USER_WORKER_SELECTION_POLICY); + WorkerLocationPolicyEnum policyEnum = + WorkerLocationPolicyEnum.valueOf(policyName.toUpperCase()); + Class policyClass = Class.forName(policyEnum.getPolicyName()); + + WorkerLocationPolicy workerLocationPolicy = + (WorkerLocationPolicy) CommonUtils.createNewClassInstance( + policyClass, new Class[] {AlluxioConfiguration.class}, new Object[] {conf}); LOG.debug("Using worker location policy: {}", workerLocationPolicy.getClass().getSimpleName()); return workerLocationPolicy; } catch (ClassCastException e) { throw new RuntimeException(e); + } catch (ClassNotFoundException e) { + throw new RuntimeException(e); } } } diff --git a/dora/core/client/fs/src/main/java/alluxio/client/file/dora/WorkerLocationPolicyEnum.java b/dora/core/client/fs/src/main/java/alluxio/client/file/dora/WorkerLocationPolicyEnum.java new file mode 100644 index 000000000000..ce7ab69c0da6 --- /dev/null +++ b/dora/core/client/fs/src/main/java/alluxio/client/file/dora/WorkerLocationPolicyEnum.java @@ -0,0 +1,38 @@ +/* + * The Alluxio Open Foundation licenses this work under the Apache License, version 2.0 + * (the "License"). You may not use this work except in compliance with the License, which is + * available at www.apache.org/licenses/LICENSE-2.0 + * + * This software is distributed on an "AS IS" basis, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied, as more fully set forth in the License. + * + * See the NOTICE file distributed with this work for information regarding copyright ownership. + */ + +package alluxio.client.file.dora; + +/** + * The enum of worker location policy. + */ +public enum WorkerLocationPolicyEnum { + CONSISTENT("alluxio.client.file.dora.ConsistentHashPolicy"), + JUMP("alluxio.client.file.dora.JumpHashPolicy"), + KETAMA("alluxio.client.file.dora.KetamaHashPolicy"), + MAGLEV("alluxio.client.file.dora.MaglevHashPolicy"), + MULTI_PROBE("alluxio.client.file.dora.MultiProbeHashPolicy"), + LOCAL("alluxio.client.file.dora.LocalWorkerPolicy"), + REMOTE_ONLY("alluxio.client.file.dora.RemoteOnlyPolicy"); + + private final String mPolicyName; + + WorkerLocationPolicyEnum(String policyName) { + mPolicyName = policyName; + } + + /** + * @return the hash policy name + */ + public String getPolicyName() { + return mPolicyName; + } +} diff --git a/dora/core/client/fs/src/test/java/alluxio/client/file/dora/ConsistentHashPolicyTest.java b/dora/core/client/fs/src/test/java/alluxio/client/file/dora/ConsistentHashPolicyTest.java index 0d4e99b51ba8..51622f707719 100644 --- a/dora/core/client/fs/src/test/java/alluxio/client/file/dora/ConsistentHashPolicyTest.java +++ b/dora/core/client/fs/src/test/java/alluxio/client/file/dora/ConsistentHashPolicyTest.java @@ -42,7 +42,7 @@ public class ConsistentHashPolicyTest { public void setup() { mConf = new InstancedConfiguration(Configuration.copyProperties()); mConf.set(PropertyKey.USER_WORKER_SELECTION_POLICY, - "alluxio.client.file.dora.ConsistentHashPolicy"); + "CONSISTENT"); } @Test diff --git a/dora/core/client/fs/src/test/java/alluxio/client/file/dora/ConsistentHashProviderTest.java b/dora/core/client/fs/src/test/java/alluxio/client/file/dora/ConsistentHashProviderTest.java index 960f363268e4..895c4cdaa1e2 100644 --- a/dora/core/client/fs/src/test/java/alluxio/client/file/dora/ConsistentHashProviderTest.java +++ b/dora/core/client/fs/src/test/java/alluxio/client/file/dora/ConsistentHashProviderTest.java @@ -45,7 +45,8 @@ public class ConsistentHashProviderTest { @Test public void uninitializedThrowsException() { - ConsistentHashProvider provider = new ConsistentHashProvider(1, WORKER_LIST_TTL_MS); + ConsistentHashProvider provider = new ConsistentHashProvider( + 1, WORKER_LIST_TTL_MS, NUM_VIRTUAL_NODES); assertThrows(IllegalStateException.class, () -> provider.get(OBJECT_KEY, 0)); } @@ -60,10 +61,11 @@ public void uninitializedThrowsException() { * the bound is likely going to change. */ public void virtualNodeDistribution() { - ConsistentHashProvider provider = new ConsistentHashProvider(1, WORKER_LIST_TTL_MS); + ConsistentHashProvider provider = new ConsistentHashProvider( + 1, WORKER_LIST_TTL_MS, NUM_VIRTUAL_NODES); Set workerList = generateRandomWorkerList(50); // set initial state - provider.refresh(workerList, 2000); + provider.refresh(workerList); NavigableMap map = provider.getActiveNodesMap(); Map count = new HashMap<>(); long last = Integer.MIN_VALUE; @@ -90,7 +92,8 @@ private double calcSDoverMean(Collection list) { @Test public void concurrentInitialization() { - ConsistentHashProvider provider = new ConsistentHashProvider(1, WORKER_LIST_TTL_MS); + ConsistentHashProvider provider = new ConsistentHashProvider( + 1, WORKER_LIST_TTL_MS, NUM_VIRTUAL_NODES); final int numThreads = 16; CountDownLatch startSignal = new CountDownLatch(numThreads); ExecutorService executorService = Executors.newFixedThreadPool(numThreads); @@ -107,7 +110,7 @@ public void concurrentInitialization() { } catch (InterruptedException e) { fail("interrupted"); } - provider.refresh(list, NUM_VIRTUAL_NODES); + provider.refresh(list); return provider.getActiveNodesMap(); }); }) @@ -145,8 +148,9 @@ public void concurrentRefresh() throws Exception { ExecutorService executorService = Executors.newFixedThreadPool(numThreads); for (int repeat = 0; repeat < 100; repeat++) { - ConsistentHashProvider provider = new ConsistentHashProvider(1, WORKER_LIST_TTL_MS); - provider.refresh(generateRandomWorkerList(50), NUM_VIRTUAL_NODES); + ConsistentHashProvider provider = new ConsistentHashProvider( + 1, WORKER_LIST_TTL_MS, NUM_VIRTUAL_NODES); + provider.refresh(generateRandomWorkerList(50)); long initialCount = provider.getUpdateCount(); Thread.sleep(WORKER_LIST_TTL_MS); @@ -166,7 +170,7 @@ public void concurrentRefresh() throws Exception { } catch (InterruptedException e) { fail("interrupted"); } - provider.refresh(list, NUM_VIRTUAL_NODES); + provider.refresh(list); }); }) .collect(Collectors.toList()); @@ -192,10 +196,11 @@ public void concurrentRefresh() throws Exception { @Test public void workerListTtl() throws Exception { - ConsistentHashProvider provider = new ConsistentHashProvider(1, WORKER_LIST_TTL_MS); + ConsistentHashProvider provider = new ConsistentHashProvider( + 1, WORKER_LIST_TTL_MS, NUM_VIRTUAL_NODES); Set workerList = generateRandomWorkerList(5); // set initial state - provider.refresh(workerList, NUM_VIRTUAL_NODES); + provider.refresh(workerList); long initialUpdateCount = provider.getUpdateCount(); assertEquals(workerList, provider.getLastWorkers()); assertEquals( @@ -204,7 +209,7 @@ public void workerListTtl() throws Exception { // before TTL is up, refresh does not change the internal states of the provider Set newList = generateRandomWorkerList(5); - provider.refresh(newList, NUM_VIRTUAL_NODES); + provider.refresh(newList); assertEquals(0, provider.getUpdateCount() - initialUpdateCount); assertNotEquals(newList, workerList); assertEquals(workerList, provider.getLastWorkers()); @@ -214,7 +219,7 @@ public void workerListTtl() throws Exception { // after TTL expires, refresh should change the worker list and the active nodes map Thread.sleep(WORKER_LIST_TTL_MS); - provider.refresh(newList, NUM_VIRTUAL_NODES); + provider.refresh(newList); assertEquals(1, provider.getUpdateCount() - initialUpdateCount); assertEquals(newList, provider.getLastWorkers()); assertEquals( diff --git a/dora/core/client/fs/src/test/java/alluxio/client/file/dora/JumpHashPolicyTest.java b/dora/core/client/fs/src/test/java/alluxio/client/file/dora/JumpHashPolicyTest.java new file mode 100644 index 000000000000..56f629d6dc95 --- /dev/null +++ b/dora/core/client/fs/src/test/java/alluxio/client/file/dora/JumpHashPolicyTest.java @@ -0,0 +1,170 @@ +/* + * The Alluxio Open Foundation licenses this work under the Apache License, version 2.0 + * (the "License"). You may not use this work except in compliance with the License, which is + * available at www.apache.org/licenses/LICENSE-2.0 + * + * This software is distributed on an "AS IS" basis, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied, as more fully set forth in the License. + * + * See the NOTICE file distributed with this work for information regarding copyright ownership. + */ + +package alluxio.client.file.dora; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.assertThrows; +import static org.junit.Assert.assertTrue; + +import alluxio.client.block.BlockWorkerInfo; +import alluxio.conf.Configuration; +import alluxio.conf.InstancedConfiguration; +import alluxio.conf.PropertyKey; +import alluxio.exception.status.ResourceExhaustedException; +import alluxio.membership.WorkerClusterView; +import alluxio.wire.WorkerIdentityTestUtils; +import alluxio.wire.WorkerInfo; +import alluxio.wire.WorkerNetAddress; +import alluxio.wire.WorkerState; + +import com.google.common.collect.ImmutableList; +import org.junit.Before; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.stream.Collectors; + +public class JumpHashPolicyTest { + InstancedConfiguration mConf; + + @Before + public void setup() { + mConf = new InstancedConfiguration(Configuration.copyProperties()); + mConf.set(PropertyKey.USER_WORKER_SELECTION_POLICY, + "JUMP"); + } + + @Test + public void getOneWorker() throws Exception { + WorkerLocationPolicy policy = WorkerLocationPolicy.Factory.create(mConf); + assertTrue(policy instanceof JumpHashPolicy); + // Prepare a worker list + WorkerClusterView workers = new WorkerClusterView(Arrays.asList( + new WorkerInfo() + .setIdentity(WorkerIdentityTestUtils.ofLegacyId(1)) + .setAddress(new WorkerNetAddress() + .setHost("master1").setRpcPort(29998).setDataPort(29999).setWebPort(30000)) + .setCapacityBytes(1024) + .setUsedBytes(0), + new WorkerInfo() + .setIdentity(WorkerIdentityTestUtils.ofLegacyId(2)) + .setAddress(new WorkerNetAddress() + .setHost("master2").setRpcPort(29998).setDataPort(29999).setWebPort(30000)) + .setCapacityBytes(1024) + .setUsedBytes(0))); + + List assignedWorkers = policy.getPreferredWorkers(workers, "hdfs://a/b/c", 1); + assertEquals(1, assignedWorkers.size()); + assertTrue(contains(workers, assignedWorkers.get(0))); + + assertThrows(ResourceExhaustedException.class, () -> { + // Getting 1 out of no workers will result in an error + policy.getPreferredWorkers(new WorkerClusterView(ImmutableList.of()), "hdfs://a/b/c", 1); + }); + } + + @Test + public void getMultipleWorkers() throws Exception { + WorkerLocationPolicy policy = WorkerLocationPolicy.Factory.create(mConf); + assertTrue(policy instanceof JumpHashPolicy); + // Prepare a worker list + WorkerClusterView workers = new WorkerClusterView(Arrays.asList( + new WorkerInfo() + .setIdentity(WorkerIdentityTestUtils.ofLegacyId(1)) + .setAddress(new WorkerNetAddress() + .setHost("master1").setRpcPort(29998).setDataPort(29999).setWebPort(30000)) + .setCapacityBytes(1024) + .setUsedBytes(0), + new WorkerInfo() + .setIdentity(WorkerIdentityTestUtils.ofLegacyId(2)) + .setAddress(new WorkerNetAddress() + .setHost("master2").setRpcPort(29998).setDataPort(29999).setWebPort(30000)) + .setCapacityBytes(1024) + .setUsedBytes(0))); + + List assignedWorkers = policy.getPreferredWorkers(workers, "hdfs://a/b/c", 2); + assertEquals(2, assignedWorkers.size()); + assertTrue(assignedWorkers.stream().allMatch(w -> contains(workers, w))); + // The two workers should be different + assertNotEquals(assignedWorkers.get(0).getNetAddress().getHost(), + assignedWorkers.get(1).getNetAddress().getHost()); + assertThrows(ResourceExhaustedException.class, () -> { + // Getting 2 out of 1 worker will result in an error + policy.getPreferredWorkers( + new WorkerClusterView(Arrays.asList( + new WorkerInfo() + .setIdentity(WorkerIdentityTestUtils.ofLegacyId(1)) + .setAddress(new WorkerNetAddress() + .setHost("master1").setRpcPort(29998).setDataPort(29999).setWebPort(30000)) + .setCapacityBytes(1024) + .setUsedBytes(0))), + "hdfs://a/b/c", 2); + }); + } + + /** + * Tests that the policy returns latest worker address even though the worker's ID + * has stayed the same during the refresh interval. + */ + @Test + public void workerAddrUpdateWithIdUnchanged() throws Exception { + JumpHashPolicy policy = new JumpHashPolicy(mConf); + List workers = new ArrayList<>(); + workers.add(new WorkerInfo().setIdentity(WorkerIdentityTestUtils.ofLegacyId(1L)) + .setAddress(new WorkerNetAddress().setHost("host1")) + .setCapacityBytes(0) + .setUsedBytes(0) + .setState(WorkerState.LIVE)); + workers.add(new WorkerInfo().setIdentity(WorkerIdentityTestUtils.ofLegacyId(2L)) + .setAddress(new WorkerNetAddress().setHost("host2")) + .setCapacityBytes(0) + .setUsedBytes(0) + .setState(WorkerState.LIVE)); + List selectedWorkers = + policy.getPreferredWorkers(new WorkerClusterView(workers), "fileId", 2); + assertEquals("host1", + selectedWorkers.stream() + .filter(w -> w.getIdentity().equals(WorkerIdentityTestUtils.ofLegacyId(1L))) + .findFirst() + .get() + .getNetAddress() + .getHost()); + + // now the worker 1 has migrated to host 3 + workers.set(0, new WorkerInfo().setIdentity(WorkerIdentityTestUtils.ofLegacyId(1L)) + .setAddress(new WorkerNetAddress().setHost("host3")) + .setCapacityBytes(0) + .setUsedBytes(0) + .setState(WorkerState.LIVE)); + List updatedWorkers = + policy.getPreferredWorkers(new WorkerClusterView(workers), "fileId", 2); + assertEquals( + selectedWorkers.stream().map(BlockWorkerInfo::getIdentity).collect(Collectors.toList()), + updatedWorkers.stream().map(BlockWorkerInfo::getIdentity).collect(Collectors.toList())); + assertEquals("host3", + updatedWorkers.stream() + .filter(w -> w.getIdentity().equals(WorkerIdentityTestUtils.ofLegacyId(1L))) + .findFirst() + .get() + .getNetAddress() + .getHost()); + } + + private boolean contains(WorkerClusterView workers, BlockWorkerInfo targetWorker) { + // BlockWorkerInfo's equality is delegated to the WorkerNetAddress + return workers.stream().anyMatch(w -> + w.getAddress().equals(targetWorker.getNetAddress())); + } +} diff --git a/dora/core/client/fs/src/test/java/alluxio/client/file/dora/KetamaHashPolicyTest.java b/dora/core/client/fs/src/test/java/alluxio/client/file/dora/KetamaHashPolicyTest.java new file mode 100644 index 000000000000..0fed459706c9 --- /dev/null +++ b/dora/core/client/fs/src/test/java/alluxio/client/file/dora/KetamaHashPolicyTest.java @@ -0,0 +1,169 @@ +/* + * The Alluxio Open Foundation licenses this work under the Apache License, version 2.0 + * (the "License"). You may not use this work except in compliance with the License, which is + * available at www.apache.org/licenses/LICENSE-2.0 + * + * This software is distributed on an "AS IS" basis, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied, as more fully set forth in the License. + * + * See the NOTICE file distributed with this work for information regarding copyright ownership. + */ + +package alluxio.client.file.dora; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThrows; +import static org.junit.Assert.assertTrue; + +import alluxio.client.block.BlockWorkerInfo; +import alluxio.conf.Configuration; +import alluxio.conf.InstancedConfiguration; +import alluxio.conf.PropertyKey; +import alluxio.exception.status.ResourceExhaustedException; +import alluxio.membership.WorkerClusterView; +import alluxio.wire.WorkerIdentityTestUtils; +import alluxio.wire.WorkerInfo; +import alluxio.wire.WorkerNetAddress; +import alluxio.wire.WorkerState; + +import com.google.common.collect.ImmutableList; +import org.junit.Before; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.stream.Collectors; + +public class KetamaHashPolicyTest { + InstancedConfiguration mConf; + + @Before + public void setup() { + mConf = new InstancedConfiguration(Configuration.copyProperties()); + mConf.set(PropertyKey.USER_WORKER_SELECTION_POLICY, + "KETAMA"); + } + + @Test + public void getOneWorker() throws Exception { + WorkerLocationPolicy policy = WorkerLocationPolicy.Factory.create(mConf); + assertTrue(policy instanceof KetamaHashPolicy); + // Prepare a worker list + WorkerClusterView workers = new WorkerClusterView(Arrays.asList( + new WorkerInfo() + .setIdentity(WorkerIdentityTestUtils.ofLegacyId(1)) + .setAddress(new WorkerNetAddress() + .setHost("master1").setRpcPort(29998).setDataPort(29999).setWebPort(30000)) + .setCapacityBytes(1024) + .setUsedBytes(0), + new WorkerInfo() + .setIdentity(WorkerIdentityTestUtils.ofLegacyId(2)) + .setAddress(new WorkerNetAddress() + .setHost("master2").setRpcPort(29998).setDataPort(29999).setWebPort(30000)) + .setCapacityBytes(1024) + .setUsedBytes(0))); + + List assignedWorkers = policy.getPreferredWorkers(workers, "hdfs://a/b/c", 1); + assertEquals(1, assignedWorkers.size()); + assertTrue(contains(workers, assignedWorkers.get(0))); + + assertThrows(ResourceExhaustedException.class, () -> { + // Getting 1 out of no workers will result in an error + policy.getPreferredWorkers(new WorkerClusterView(ImmutableList.of()), "hdfs://a/b/c", 1); + }); + } + + @Test + public void getMultipleWorkers() throws Exception { + WorkerLocationPolicy policy = WorkerLocationPolicy.Factory.create(mConf); + assertTrue(policy instanceof KetamaHashPolicy); + // Prepare a worker list + WorkerClusterView workers = new WorkerClusterView(Arrays.asList( + new WorkerInfo() + .setIdentity(WorkerIdentityTestUtils.ofLegacyId(1)) + .setAddress(new WorkerNetAddress() + .setHost("master1").setRpcPort(29998).setDataPort(29999).setWebPort(30000)) + .setCapacityBytes(1024) + .setUsedBytes(0), + new WorkerInfo() + .setIdentity(WorkerIdentityTestUtils.ofLegacyId(2)) + .setAddress(new WorkerNetAddress() + .setHost("master2").setRpcPort(29998).setDataPort(29999).setWebPort(30000)) + .setCapacityBytes(1024) + .setUsedBytes(0))); + + List assignedWorkers = policy.getPreferredWorkers(workers, "hdfs://a/b/c", 2); + assertEquals(2, assignedWorkers.size()); + assertTrue(assignedWorkers.stream().allMatch(w -> contains(workers, w))); + // The order of the workers should be consistent + assertEquals(assignedWorkers.get(0).getNetAddress().getHost(), "master1"); + assertEquals(assignedWorkers.get(1).getNetAddress().getHost(), "master2"); + assertThrows(ResourceExhaustedException.class, () -> { + // Getting 2 out of 1 worker will result in an error + policy.getPreferredWorkers( + new WorkerClusterView(Arrays.asList( + new WorkerInfo() + .setIdentity(WorkerIdentityTestUtils.ofLegacyId(1)) + .setAddress(new WorkerNetAddress() + .setHost("master1").setRpcPort(29998).setDataPort(29999).setWebPort(30000)) + .setCapacityBytes(1024) + .setUsedBytes(0))), + "hdfs://a/b/c", 2); + }); + } + + /** + * Tests that the policy returns latest worker address even though the worker's ID + * has stayed the same during the refresh interval. + */ + @Test + public void workerAddrUpdateWithIdUnchanged() throws Exception { + KetamaHashPolicy policy = new KetamaHashPolicy(mConf); + List workers = new ArrayList<>(); + workers.add(new WorkerInfo().setIdentity(WorkerIdentityTestUtils.ofLegacyId(1L)) + .setAddress(new WorkerNetAddress().setHost("host1")) + .setCapacityBytes(0) + .setUsedBytes(0) + .setState(WorkerState.LIVE)); + workers.add(new WorkerInfo().setIdentity(WorkerIdentityTestUtils.ofLegacyId(2L)) + .setAddress(new WorkerNetAddress().setHost("host2")) + .setCapacityBytes(0) + .setUsedBytes(0) + .setState(WorkerState.LIVE)); + List selectedWorkers = + policy.getPreferredWorkers(new WorkerClusterView(workers), "fileId", 2); + assertEquals("host1", + selectedWorkers.stream() + .filter(w -> w.getIdentity().equals(WorkerIdentityTestUtils.ofLegacyId(1L))) + .findFirst() + .get() + .getNetAddress() + .getHost()); + + // now the worker 1 has migrated to host 3 + workers.set(0, new WorkerInfo().setIdentity(WorkerIdentityTestUtils.ofLegacyId(1L)) + .setAddress(new WorkerNetAddress().setHost("host3")) + .setCapacityBytes(0) + .setUsedBytes(0) + .setState(WorkerState.LIVE)); + List updatedWorkers = + policy.getPreferredWorkers(new WorkerClusterView(workers), "fileId", 2); + assertEquals( + selectedWorkers.stream().map(BlockWorkerInfo::getIdentity).collect(Collectors.toList()), + updatedWorkers.stream().map(BlockWorkerInfo::getIdentity).collect(Collectors.toList())); + assertEquals("host3", + updatedWorkers.stream() + .filter(w -> w.getIdentity().equals(WorkerIdentityTestUtils.ofLegacyId(1L))) + .findFirst() + .get() + .getNetAddress() + .getHost()); + } + + private boolean contains(WorkerClusterView workers, BlockWorkerInfo targetWorker) { + // BlockWorkerInfo's equality is delegated to the WorkerNetAddress + return workers.stream().anyMatch(w -> + w.getAddress().equals(targetWorker.getNetAddress())); + } +} diff --git a/dora/core/client/fs/src/test/java/alluxio/client/file/dora/LocalWorkerPolicyTest.java b/dora/core/client/fs/src/test/java/alluxio/client/file/dora/LocalWorkerPolicyTest.java index 67c0a0aa7fdd..770d6efc0b55 100644 --- a/dora/core/client/fs/src/test/java/alluxio/client/file/dora/LocalWorkerPolicyTest.java +++ b/dora/core/client/fs/src/test/java/alluxio/client/file/dora/LocalWorkerPolicyTest.java @@ -46,7 +46,7 @@ public class LocalWorkerPolicyTest { public void setup() { mConf = new InstancedConfiguration(Configuration.copyProperties()); mConf.set(PropertyKey.USER_WORKER_SELECTION_POLICY, - "alluxio.client.file.dora.LocalWorkerPolicy"); + "LOCAL"); mConf.set(PropertyKey.USER_HOSTNAME, LOCAL_HOSTNAME); } diff --git a/dora/core/client/fs/src/test/java/alluxio/client/file/dora/MaglevtHashPolicyTest.java b/dora/core/client/fs/src/test/java/alluxio/client/file/dora/MaglevtHashPolicyTest.java new file mode 100644 index 000000000000..f5b7387f6f80 --- /dev/null +++ b/dora/core/client/fs/src/test/java/alluxio/client/file/dora/MaglevtHashPolicyTest.java @@ -0,0 +1,169 @@ +/* + * The Alluxio Open Foundation licenses this work under the Apache License, version 2.0 + * (the "License"). You may not use this work except in compliance with the License, which is + * available at www.apache.org/licenses/LICENSE-2.0 + * + * This software is distributed on an "AS IS" basis, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied, as more fully set forth in the License. + * + * See the NOTICE file distributed with this work for information regarding copyright ownership. + */ + +package alluxio.client.file.dora; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThrows; +import static org.junit.Assert.assertTrue; + +import alluxio.client.block.BlockWorkerInfo; +import alluxio.conf.Configuration; +import alluxio.conf.InstancedConfiguration; +import alluxio.conf.PropertyKey; +import alluxio.exception.status.ResourceExhaustedException; +import alluxio.membership.WorkerClusterView; +import alluxio.wire.WorkerIdentityTestUtils; +import alluxio.wire.WorkerInfo; +import alluxio.wire.WorkerNetAddress; +import alluxio.wire.WorkerState; + +import com.google.common.collect.ImmutableList; +import org.junit.Before; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.stream.Collectors; + +public class MaglevtHashPolicyTest { + InstancedConfiguration mConf; + + @Before + public void setup() { + mConf = new InstancedConfiguration(Configuration.copyProperties()); + mConf.set(PropertyKey.USER_WORKER_SELECTION_POLICY, + "MAGLEV"); + } + + @Test + public void getOneWorker() throws Exception { + WorkerLocationPolicy policy = WorkerLocationPolicy.Factory.create(mConf); + assertTrue(policy instanceof MaglevHashPolicy); + // Prepare a worker list + WorkerClusterView workers = new WorkerClusterView(Arrays.asList( + new WorkerInfo() + .setIdentity(WorkerIdentityTestUtils.ofLegacyId(1)) + .setAddress(new WorkerNetAddress() + .setHost("master1").setRpcPort(29998).setDataPort(29999).setWebPort(30000)) + .setCapacityBytes(1024) + .setUsedBytes(0), + new WorkerInfo() + .setIdentity(WorkerIdentityTestUtils.ofLegacyId(2)) + .setAddress(new WorkerNetAddress() + .setHost("master2").setRpcPort(29998).setDataPort(29999).setWebPort(30000)) + .setCapacityBytes(1024) + .setUsedBytes(0))); + + List assignedWorkers = policy.getPreferredWorkers(workers, "hdfs://a/b/c", 1); + assertEquals(1, assignedWorkers.size()); + assertTrue(contains(workers, assignedWorkers.get(0))); + + assertThrows(ResourceExhaustedException.class, () -> { + // Getting 1 out of no workers will result in an error + policy.getPreferredWorkers(new WorkerClusterView(ImmutableList.of()), "hdfs://a/b/c", 1); + }); + } + + @Test + public void getMultipleWorkers() throws Exception { + WorkerLocationPolicy policy = WorkerLocationPolicy.Factory.create(mConf); + assertTrue(policy instanceof MaglevHashPolicy); + // Prepare a worker list + WorkerClusterView workers = new WorkerClusterView(Arrays.asList( + new WorkerInfo() + .setIdentity(WorkerIdentityTestUtils.ofLegacyId(1)) + .setAddress(new WorkerNetAddress() + .setHost("master1").setRpcPort(29998).setDataPort(29999).setWebPort(30000)) + .setCapacityBytes(1024) + .setUsedBytes(0), + new WorkerInfo() + .setIdentity(WorkerIdentityTestUtils.ofLegacyId(2)) + .setAddress(new WorkerNetAddress() + .setHost("master2").setRpcPort(29998).setDataPort(29999).setWebPort(30000)) + .setCapacityBytes(1024) + .setUsedBytes(0))); + + List assignedWorkers = policy.getPreferredWorkers(workers, "hdfs://a/b/c", 2); + assertEquals(2, assignedWorkers.size()); + assertTrue(assignedWorkers.stream().allMatch(w -> contains(workers, w))); + // The order of the workers should be consistent + assertEquals(assignedWorkers.get(0).getNetAddress().getHost(), "master1"); + assertEquals(assignedWorkers.get(1).getNetAddress().getHost(), "master2"); + assertThrows(ResourceExhaustedException.class, () -> { + // Getting 2 out of 1 worker will result in an error + policy.getPreferredWorkers( + new WorkerClusterView(Arrays.asList( + new WorkerInfo() + .setIdentity(WorkerIdentityTestUtils.ofLegacyId(1)) + .setAddress(new WorkerNetAddress() + .setHost("master1").setRpcPort(29998).setDataPort(29999).setWebPort(30000)) + .setCapacityBytes(1024) + .setUsedBytes(0))), + "hdfs://a/b/c", 2); + }); + } + + /** + * Tests that the policy returns latest worker address even though the worker's ID + * has stayed the same during the refresh interval. + */ + @Test + public void workerAddrUpdateWithIdUnchanged() throws Exception { + MaglevHashPolicy policy = new MaglevHashPolicy(mConf); + List workers = new ArrayList<>(); + workers.add(new WorkerInfo().setIdentity(WorkerIdentityTestUtils.ofLegacyId(1L)) + .setAddress(new WorkerNetAddress().setHost("host1")) + .setCapacityBytes(0) + .setUsedBytes(0) + .setState(WorkerState.LIVE)); + workers.add(new WorkerInfo().setIdentity(WorkerIdentityTestUtils.ofLegacyId(2L)) + .setAddress(new WorkerNetAddress().setHost("host2")) + .setCapacityBytes(0) + .setUsedBytes(0) + .setState(WorkerState.LIVE)); + List selectedWorkers = + policy.getPreferredWorkers(new WorkerClusterView(workers), "fileId", 2); + assertEquals("host1", + selectedWorkers.stream() + .filter(w -> w.getIdentity().equals(WorkerIdentityTestUtils.ofLegacyId(1L))) + .findFirst() + .get() + .getNetAddress() + .getHost()); + + // now the worker 1 has migrated to host 3 + workers.set(0, new WorkerInfo().setIdentity(WorkerIdentityTestUtils.ofLegacyId(1L)) + .setAddress(new WorkerNetAddress().setHost("host3")) + .setCapacityBytes(0) + .setUsedBytes(0) + .setState(WorkerState.LIVE)); + List updatedWorkers = + policy.getPreferredWorkers(new WorkerClusterView(workers), "fileId", 2); + assertEquals( + selectedWorkers.stream().map(BlockWorkerInfo::getIdentity).collect(Collectors.toList()), + updatedWorkers.stream().map(BlockWorkerInfo::getIdentity).collect(Collectors.toList())); + assertEquals("host3", + updatedWorkers.stream() + .filter(w -> w.getIdentity().equals(WorkerIdentityTestUtils.ofLegacyId(1L))) + .findFirst() + .get() + .getNetAddress() + .getHost()); + } + + private boolean contains(WorkerClusterView workers, BlockWorkerInfo targetWorker) { + // BlockWorkerInfo's equality is delegated to the WorkerNetAddress + return workers.stream().anyMatch(w -> + w.getAddress().equals(targetWorker.getNetAddress())); + } +} diff --git a/dora/core/client/fs/src/test/java/alluxio/client/file/dora/MultiProbeHashPolicyTest.java b/dora/core/client/fs/src/test/java/alluxio/client/file/dora/MultiProbeHashPolicyTest.java new file mode 100644 index 000000000000..604e7829406b --- /dev/null +++ b/dora/core/client/fs/src/test/java/alluxio/client/file/dora/MultiProbeHashPolicyTest.java @@ -0,0 +1,169 @@ +/* + * The Alluxio Open Foundation licenses this work under the Apache License, version 2.0 + * (the "License"). You may not use this work except in compliance with the License, which is + * available at www.apache.org/licenses/LICENSE-2.0 + * + * This software is distributed on an "AS IS" basis, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied, as more fully set forth in the License. + * + * See the NOTICE file distributed with this work for information regarding copyright ownership. + */ + +package alluxio.client.file.dora; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThrows; +import static org.junit.Assert.assertTrue; + +import alluxio.client.block.BlockWorkerInfo; +import alluxio.conf.Configuration; +import alluxio.conf.InstancedConfiguration; +import alluxio.conf.PropertyKey; +import alluxio.exception.status.ResourceExhaustedException; +import alluxio.membership.WorkerClusterView; +import alluxio.wire.WorkerIdentityTestUtils; +import alluxio.wire.WorkerInfo; +import alluxio.wire.WorkerNetAddress; +import alluxio.wire.WorkerState; + +import com.google.common.collect.ImmutableList; +import org.junit.Before; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.stream.Collectors; + +public class MultiProbeHashPolicyTest { + InstancedConfiguration mConf; + + @Before + public void setup() { + mConf = new InstancedConfiguration(Configuration.copyProperties()); + mConf.set(PropertyKey.USER_WORKER_SELECTION_POLICY, + "MULTI_PROBE"); + } + + @Test + public void getOneWorker() throws Exception { + WorkerLocationPolicy policy = WorkerLocationPolicy.Factory.create(mConf); + assertTrue(policy instanceof MultiProbeHashPolicy); + // Prepare a worker list + WorkerClusterView workers = new WorkerClusterView(Arrays.asList( + new WorkerInfo() + .setIdentity(WorkerIdentityTestUtils.ofLegacyId(1)) + .setAddress(new WorkerNetAddress() + .setHost("master1").setRpcPort(29998).setDataPort(29999).setWebPort(30000)) + .setCapacityBytes(1024) + .setUsedBytes(0), + new WorkerInfo() + .setIdentity(WorkerIdentityTestUtils.ofLegacyId(2)) + .setAddress(new WorkerNetAddress() + .setHost("master2").setRpcPort(29998).setDataPort(29999).setWebPort(30000)) + .setCapacityBytes(1024) + .setUsedBytes(0))); + + List assignedWorkers = policy.getPreferredWorkers(workers, "hdfs://a/b/c", 1); + assertEquals(1, assignedWorkers.size()); + assertTrue(contains(workers, assignedWorkers.get(0))); + + assertThrows(ResourceExhaustedException.class, () -> { + // Getting 1 out of no workers will result in an error + policy.getPreferredWorkers(new WorkerClusterView(ImmutableList.of()), "hdfs://a/b/c", 1); + }); + } + + @Test + public void getMultipleWorkers() throws Exception { + WorkerLocationPolicy policy = WorkerLocationPolicy.Factory.create(mConf); + assertTrue(policy instanceof MultiProbeHashPolicy); + // Prepare a worker list + WorkerClusterView workers = new WorkerClusterView(Arrays.asList( + new WorkerInfo() + .setIdentity(WorkerIdentityTestUtils.ofLegacyId(1)) + .setAddress(new WorkerNetAddress() + .setHost("master1").setRpcPort(29998).setDataPort(29999).setWebPort(30000)) + .setCapacityBytes(1024) + .setUsedBytes(0), + new WorkerInfo() + .setIdentity(WorkerIdentityTestUtils.ofLegacyId(2)) + .setAddress(new WorkerNetAddress() + .setHost("master2").setRpcPort(29998).setDataPort(29999).setWebPort(30000)) + .setCapacityBytes(1024) + .setUsedBytes(0))); + + List assignedWorkers = policy.getPreferredWorkers(workers, "hdfs://a/b/c", 2); + assertEquals(2, assignedWorkers.size()); + assertTrue(assignedWorkers.stream().allMatch(w -> contains(workers, w))); + // The order of the workers should be consistent + assertEquals(assignedWorkers.get(0).getNetAddress().getHost(), "master1"); + assertEquals(assignedWorkers.get(1).getNetAddress().getHost(), "master2"); + assertThrows(ResourceExhaustedException.class, () -> { + // Getting 2 out of 1 worker will result in an error + policy.getPreferredWorkers( + new WorkerClusterView(Arrays.asList( + new WorkerInfo() + .setIdentity(WorkerIdentityTestUtils.ofLegacyId(1)) + .setAddress(new WorkerNetAddress() + .setHost("master1").setRpcPort(29998).setDataPort(29999).setWebPort(30000)) + .setCapacityBytes(1024) + .setUsedBytes(0))), + "hdfs://a/b/c", 2); + }); + } + + /** + * Tests that the policy returns latest worker address even though the worker's ID + * has stayed the same during the refresh interval. + */ + @Test + public void workerAddrUpdateWithIdUnchanged() throws Exception { + MultiProbeHashPolicy policy = new MultiProbeHashPolicy(mConf); + List workers = new ArrayList<>(); + workers.add(new WorkerInfo().setIdentity(WorkerIdentityTestUtils.ofLegacyId(1L)) + .setAddress(new WorkerNetAddress().setHost("host1")) + .setCapacityBytes(0) + .setUsedBytes(0) + .setState(WorkerState.LIVE)); + workers.add(new WorkerInfo().setIdentity(WorkerIdentityTestUtils.ofLegacyId(2L)) + .setAddress(new WorkerNetAddress().setHost("host2")) + .setCapacityBytes(0) + .setUsedBytes(0) + .setState(WorkerState.LIVE)); + List selectedWorkers = + policy.getPreferredWorkers(new WorkerClusterView(workers), "fileId", 2); + assertEquals("host1", + selectedWorkers.stream() + .filter(w -> w.getIdentity().equals(WorkerIdentityTestUtils.ofLegacyId(1L))) + .findFirst() + .get() + .getNetAddress() + .getHost()); + + // now the worker 1 has migrated to host 3 + workers.set(0, new WorkerInfo().setIdentity(WorkerIdentityTestUtils.ofLegacyId(1L)) + .setAddress(new WorkerNetAddress().setHost("host3")) + .setCapacityBytes(0) + .setUsedBytes(0) + .setState(WorkerState.LIVE)); + List updatedWorkers = + policy.getPreferredWorkers(new WorkerClusterView(workers), "fileId", 2); + assertEquals( + selectedWorkers.stream().map(BlockWorkerInfo::getIdentity).collect(Collectors.toList()), + updatedWorkers.stream().map(BlockWorkerInfo::getIdentity).collect(Collectors.toList())); + assertEquals("host3", + updatedWorkers.stream() + .filter(w -> w.getIdentity().equals(WorkerIdentityTestUtils.ofLegacyId(1L))) + .findFirst() + .get() + .getNetAddress() + .getHost()); + } + + private boolean contains(WorkerClusterView workers, BlockWorkerInfo targetWorker) { + // BlockWorkerInfo's equality is delegated to the WorkerNetAddress + return workers.stream().anyMatch(w -> + w.getAddress().equals(targetWorker.getNetAddress())); + } +} diff --git a/dora/core/common/src/main/java/alluxio/conf/PropertyKey.java b/dora/core/common/src/main/java/alluxio/conf/PropertyKey.java index c94664e2f16a..ebf7fc115f21 100755 --- a/dora/core/common/src/main/java/alluxio/conf/PropertyKey.java +++ b/dora/core/common/src/main/java/alluxio/conf/PropertyKey.java @@ -5691,6 +5691,39 @@ public String toString() { .setConsistencyCheckLevel(ConsistencyCheckLevel.WARN) .setScope(Scope.CLIENT) .build(); + public static final PropertyKey USER_KETAMA_HASH_REPLICAS = + intBuilder(Name.USER_KETAMA_HASH_REPLICAS) + .setDefaultValue(200) + .setDescription("This is the value of replicas in the ketama hashing " + + "algorithm. When workers changes, it will guarantee the hash table is " + + "changed only in a minimal. The value of replicas should be X times " + + "the physical nodes in the cluster, where X is a balance between " + + "efficiency and cost.") + .setConsistencyCheckLevel(ConsistencyCheckLevel.WARN) + .setScope(Scope.CLIENT) + .build(); + public static final PropertyKey USER_MAGLEV_HASH_LOOKUP_SIZE = + intBuilder(Name.USER_MAGLEV_HASH_LOOKUP_SIZE) + .setDefaultValue(65537) + .setDescription("This is the size of the lookup table in the maglev hashing " + + "algorithm. It must be a prime number. In the maglev hashing, " + + "it will generate a lookup table for workers. " + + "The bigger the size of the lookup table, " + + "the smaller the variance of this hashing algorithm will be. " + + "But bigger look up table will consume more time and memory") + .setConsistencyCheckLevel(ConsistencyCheckLevel.WARN) + .setScope(Scope.CLIENT) + .build(); + public static final PropertyKey USER_MULTI_PROBE_HASH_PROBE_NUM = + intBuilder(Name.USER_MULTI_PROBE_HASH_PROBE_NUM) + .setDefaultValue(21) + .setDescription("This is the number of probes in the multi-probe hashing " + + "algorithm. In the multi-probe hashing algorithm, the bigger the number of probes, " + + "the smaller the variance of this hashing algorithm will be. But more probes " + + "will consume more time and memory") + .setConsistencyCheckLevel(ConsistencyCheckLevel.WARN) + .setScope(Scope.CLIENT) + .build(); public static final PropertyKey USER_FILE_WRITE_TYPE_DEFAULT = enumBuilder(Name.USER_FILE_WRITE_TYPE_DEFAULT, WriteType.class) .setDefaultValue(WriteType.CACHE_THROUGH) @@ -6277,7 +6310,7 @@ public String toString() { .build(); public static final PropertyKey USER_WORKER_SELECTION_POLICY = classBuilder(Name.USER_WORKER_SELECTION_POLICY) - .setDefaultValue("alluxio.client.file.dora.ConsistentHashPolicy") + .setDefaultValue("CONSISTENT") .setDescription("The policy a client uses to map a file path to a worker address. " + "The only option is `alluxio.client.file.dora.ConsistentHashPolicy`. " + "Other options are for internal tests only and not for real deployments.") @@ -8342,6 +8375,12 @@ public static final class Name { "alluxio.user.client.cache.include.mtime"; public static final String USER_CLIENT_REPORT_VERSION_ENABLED = "alluxio.user.client.report.version.enabled"; + public static final String USER_KETAMA_HASH_REPLICAS = + "alluxio.user.ketama.hash.replicas"; + public static final String USER_MAGLEV_HASH_LOOKUP_SIZE = + "alluxio.user.maglev.hash.lookup.size"; + public static final String USER_MULTI_PROBE_HASH_PROBE_NUM = + "alluxio.user.multi.probe.hash.probe.num"; public static final String USER_CONSISTENT_HASH_VIRTUAL_NODE_COUNT_PER_WORKER = "alluxio.user.consistent.hash.virtual.node.count.per.worker"; public static final String USER_CONF_CLUSTER_DEFAULT_ENABLED = diff --git a/dora/shell/src/main/java/alluxio/cli/CheckCluster.java b/dora/shell/src/main/java/alluxio/cli/CheckCluster.java index 056dfa667913..ff9aa0b048de 100644 --- a/dora/shell/src/main/java/alluxio/cli/CheckCluster.java +++ b/dora/shell/src/main/java/alluxio/cli/CheckCluster.java @@ -113,7 +113,7 @@ private static void handleException(Exception e) { private static void setupConfiguration() { Configuration.set(PropertyKey.USER_WORKER_SELECTION_POLICY, - "alluxio.client.file.dora.ConsistentHashPolicy"); + "CONSISTENT"); Configuration.set(PropertyKey.DORA_CLIENT_UFS_FALLBACK_ENABLED, false); } diff --git a/dora/stress/common/src/main/java/alluxio/stress/worker/WorkerBenchMode.java b/dora/stress/common/src/main/java/alluxio/stress/worker/WorkerBenchMode.java index 2bed5c724d43..8c64c70c143c 100644 --- a/dora/stress/common/src/main/java/alluxio/stress/worker/WorkerBenchMode.java +++ b/dora/stress/common/src/main/java/alluxio/stress/worker/WorkerBenchMode.java @@ -15,7 +15,11 @@ * WorkerBenchMode, HASH or LOCAL_ONLY. */ public enum WorkerBenchMode { - HASH("HASH"), + CONSISTENT("CONSISTENT"), + JUMP("JUMP"), + KETAMA("KETAMA"), + MAGLEV("MAGLEV"), + MULTI_PROBE("MULTI_PROBE"), LOCAL_ONLY("LOCAL_ONLY"), REMOTE_ONLY("REMOTE_ONLY"); diff --git a/dora/stress/common/src/main/java/alluxio/stress/worker/WorkerBenchParameters.java b/dora/stress/common/src/main/java/alluxio/stress/worker/WorkerBenchParameters.java index 65efe80fae44..cfe00da2537d 100644 --- a/dora/stress/common/src/main/java/alluxio/stress/worker/WorkerBenchParameters.java +++ b/dora/stress/common/src/main/java/alluxio/stress/worker/WorkerBenchParameters.java @@ -104,7 +104,7 @@ public final class WorkerBenchParameters extends FileSystemParameters { + "LOCAL_ONLY -> alluxio.client.file.dora.LocalWorkerPolicy" + "REMOTE_ONLY -> alluxio.client.file.dora.RemoteOnlyPolicy" + "The default is HASH.") - public WorkerBenchMode mMode = WorkerBenchMode.HASH; + public WorkerBenchMode mMode = WorkerBenchMode.CONSISTENT; @DynamicParameter(names = "--conf", description = "HDFS client configuration. Can be repeated.") public Map mConf = new HashMap<>(); diff --git a/dora/stress/shell/src/main/java/alluxio/stress/cli/worker/StressWorkerBench.java b/dora/stress/shell/src/main/java/alluxio/stress/cli/worker/StressWorkerBench.java index 37bf82771267..cc775ac6ef19 100644 --- a/dora/stress/shell/src/main/java/alluxio/stress/cli/worker/StressWorkerBench.java +++ b/dora/stress/shell/src/main/java/alluxio/stress/cli/worker/StressWorkerBench.java @@ -193,13 +193,29 @@ public void prepare() throws Exception { // default mode value: hash, using consistent hash switch (mParameters.mMode) { - case HASH: + case CONSISTENT: hdfsConf.set(PropertyKey.Name.USER_WORKER_SELECTION_POLICY, - "alluxio.client.file.dora.ConsistentHashPolicy"); + "CONSISTENT"); + break; + case JUMP: + hdfsConf.set(PropertyKey.Name.USER_WORKER_SELECTION_POLICY, + "JUMP"); + break; + case KETAMA: + hdfsConf.set(PropertyKey.Name.USER_WORKER_SELECTION_POLICY, + "KETAMA"); + break; + case MAGLEV: + hdfsConf.set(PropertyKey.Name.USER_WORKER_SELECTION_POLICY, + "MAGLEV"); + break; + case MULTI_PROBE: + hdfsConf.set(PropertyKey.Name.USER_WORKER_SELECTION_POLICY, + "MULTI_PROBE"); break; case LOCAL_ONLY: hdfsConf.set(PropertyKey.Name.USER_WORKER_SELECTION_POLICY, - "alluxio.client.file.dora.LocalWorkerPolicy"); + "LOCAL"); break; case REMOTE_ONLY: // if is cluster run and cluster size = 1, REMOTE_ONLY is not supported. @@ -207,7 +223,7 @@ public void prepare() throws Exception { throw new IllegalArgumentException("Cluster size is 1. REMOTE_ONLY mode not supported."); } hdfsConf.set(PropertyKey.Name.USER_WORKER_SELECTION_POLICY, - "alluxio.client.file.dora.RemoteOnlyPolicy"); + "REMOTE"); break; default: throw new IllegalArgumentException("Unrecognized mode" + mParameters.mMode);