Skip to content

Commit

Permalink
Support Various Kinds of Consistent Hash
Browse files Browse the repository at this point in the history
### What changes are proposed in this pull request?

Add Ketama Hashing, Jump Consistent Hashing, Maglev Hashing, and Multi Probe Hashing.

### Why are the changes needed?

Now alluxio's user worker selection policy is Consistent Hash Policy.  It bings too much time cost, and it is not enough uniform, and not strictly consistent.

Ketama: https://github.com/RJ/ketama
Jump Consistent Hashing: https://arxiv.org/pdf/1406.2294.pdf
Maglev Hashing: https://static.googleusercontent.com/media/research.google.com/zh-CN//pubs/archive/44824.pdf
Multi Probe Hasing: https://arxiv.org/pdf/1505.00062.pdf

We strongly recommend using Maglev Hashing for User Worker Selection Policy. Under most situation, it has the minimum time cost, and it is the most uniform and balanced hashing policy.

### Does this PR introduce any user facing changes?

`alluxio.user.worker.selection.policy` has the following values: `CONSISTENT`, `JUMP`, `KETAMA`, `MAGLEV`, `MULTI_PROBE`, `LOCAL`, `REMOTE_ONLY`, corresponding to consistent hash policy, maglev hash policy, ketama hash policy, maglev hash policy, multi-probe respectively hash policy, local worker policy, remote only policy.

The current default value is `CONSISTENT`.

We recommend using Maglev Hash, which has the best hash consistency and is the least time-consuming. That is to say, set the value of `alluxio.user.worker.selection.policy` to `MAGLEV`. We will also consider setting this as the default value in the future.

**Ketama Hasing**
`alluxio.user.ketama.hash.replicas`: This is the value of replicas in the ketama hashing algorithm. When workers changes, it will guarantee the hash table is changed only in a minimal. The value of replicas should be X times the physical nodes in the cluster, where X is a balance between efficiency and cost.

**Jump Consistent Hashing**
None.

**Maglev Hashing**
`alluxio.user.maglev.hash.lookup.size`: This is the size of the lookup table in the maglev hashing algorithm. It must be a prime number. In the maglev hashing, it will generate a lookup table for workers. The bigger the size of the lookup table, the smaller the variance of this hashing algorithm will be. But bigger look up table will consume more time and memory.

**Multi Probe Hashing**
`alluxio.user.multi.probe.hash.probe.num`: This is the number of probes in the multi-probe hashing algorithm. In the multi-probe hashing algorithm, the bigger the number of probes, the smaller the variance of this hashing algorithm will be. But more probes will consume more time and memory.


			pr-link: #17817
			change-id: cid-bad21c6e5ad83eb3da15a8960ba372b14c67b081
  • Loading branch information
Zihao Zhao authored Jan 10, 2024
1 parent 9ef7552 commit b9de24c
Show file tree
Hide file tree
Showing 24 changed files with 2,480 additions and 45 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,25 +39,18 @@
*/
public class ConsistentHashPolicy implements WorkerLocationPolicy {
private static final Logger LOG = LoggerFactory.getLogger(ConsistentHashPolicy.class);
private final ConsistentHashProvider mHashProvider =
new ConsistentHashProvider(100, Constants.SECOND_MS);
/**
* This is the number of virtual nodes in the consistent hashing algorithm.
* In a consistent hashing algorithm, on membership changes, some virtual nodes are
* re-distributed instead of rebuilding the whole hash table.
* This guarantees the hash table is changed only in a minimal.
* In order to achieve that, the number of virtual nodes should be X times the physical nodes
* in the cluster, where X is a balance between redistribution granularity and size.
*/
private final int mNumVirtualNodes;
private final ConsistentHashProvider mHashProvider;

/**
* Constructs a new {@link ConsistentHashPolicy}.
*
* @param conf the configuration used by the policy
*/
public ConsistentHashPolicy(AlluxioConfiguration conf) {
mNumVirtualNodes = conf.getInt(PropertyKey.USER_CONSISTENT_HASH_VIRTUAL_NODE_COUNT_PER_WORKER);
LOG.debug("%s is chosen for user worker hash algorithm",
conf.getString(PropertyKey.USER_WORKER_SELECTION_POLICY));
mHashProvider = new ConsistentHashProvider(100, Constants.SECOND_MS,
conf.getInt(PropertyKey.USER_CONSISTENT_HASH_VIRTUAL_NODE_COUNT_PER_WORKER));
}

@Override
Expand All @@ -69,7 +62,7 @@ public List<BlockWorkerInfo> getPreferredWorkers(WorkerClusterView workerCluster
workerClusterView.size(), count));
}
Set<WorkerIdentity> workerIdentities = workerClusterView.workerIds();
mHashProvider.refresh(workerIdentities, mNumVirtualNodes);
mHashProvider.refresh(workerIdentities);
List<WorkerIdentity> workers = mHashProvider.getMultiple(fileId, count);
if (workers.size() != count) {
throw new ResourceExhaustedException(String.format(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,15 +91,27 @@ public class ConsistentHashProvider {
*/
private final Object mInitLock = new Object();

/**
* This is the number of virtual nodes in the consistent hashing algorithm.
* In a consistent hashing algorithm, on membership changes, some virtual nodes are
* re-distributed instead of rebuilding the whole hash table.
* This guarantees the hash table is changed only in a minimal.
* In order to achieve that, the number of virtual nodes should be X times the physical nodes
* in the cluster, where X is a balance between redistribution granularity and size.
*/
private final int mNumVirtualNodes;

/**
* Constructor.
*
* @param maxAttempts max attempts to rehash
* @param workerListTtlMs interval between retries
* @param numVirtualNodes number of virtual nodes
*/
public ConsistentHashProvider(int maxAttempts, long workerListTtlMs) {
public ConsistentHashProvider(int maxAttempts, long workerListTtlMs, int numVirtualNodes) {
mMaxAttempts = maxAttempts;
mWorkerInfoUpdateIntervalNs = workerListTtlMs * Constants.MS_NANO;
mNumVirtualNodes = numVirtualNodes;
}

/**
Expand Down Expand Up @@ -130,12 +142,11 @@ public List<WorkerIdentity> getMultiple(String key, int count) {
* others will not change the internal state of the hash provider.
*
* @param workers the up-to-date worker list
* @param numVirtualNodes the number of virtual nodes used by consistent hashing
*/
public void refresh(Set<WorkerIdentity> workers, int numVirtualNodes) {
public void refresh(Set<WorkerIdentity> workers) {
Preconditions.checkArgument(!workers.isEmpty(),
"cannot refresh hash provider with empty worker list");
maybeInitialize(workers, numVirtualNodes);
maybeInitialize(workers);
// check if the worker list has expired
if (shouldRebuildActiveNodesMapExclusively()) {
// thread safety is valid provided that build() takes less than
Expand All @@ -144,7 +155,7 @@ public void refresh(Set<WorkerIdentity> workers, int numVirtualNodes) {
Set<WorkerIdentity> lastWorkerIds = mLastWorkers.get();
if (!workers.equals(lastWorkerIds)) {
Set<WorkerIdentity> newWorkerIds = ImmutableSet.copyOf(workers);
NavigableMap<Integer, WorkerIdentity> nodes = build(newWorkerIds, numVirtualNodes);
NavigableMap<Integer, WorkerIdentity> nodes = build(newWorkerIds, mNumVirtualNodes);
mActiveNodesByConsistentHashing = nodes;
mLastWorkers.set(newWorkerIds);
mUpdateCount.increment();
Expand Down Expand Up @@ -176,14 +187,14 @@ private boolean shouldRebuildActiveNodesMapExclusively() {
* Only one caller gets to initialize the map while all others are blocked.
* After the initialization, the map must not be null.
*/
private void maybeInitialize(Set<WorkerIdentity> workers, int numVirtualNodes) {
private void maybeInitialize(Set<WorkerIdentity> workers) {
if (mActiveNodesByConsistentHashing == null) {
synchronized (mInitLock) {
// only one thread should reach here
// test again to skip re-initialization
if (mActiveNodesByConsistentHashing == null) {
Set<WorkerIdentity> workerIdentities = ImmutableSet.copyOf(workers);
mActiveNodesByConsistentHashing = build(workerIdentities, numVirtualNodes);
mActiveNodesByConsistentHashing = build(workerIdentities, mNumVirtualNodes);
mLastWorkers.set(workerIdentities);
mLastUpdatedTimestamp.set(System.nanoTime());
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
/*
* The Alluxio Open Foundation licenses this work under the Apache License, version 2.0
* (the "License"). You may not use this work except in compliance with the License, which is
* available at www.apache.org/licenses/LICENSE-2.0
*
* This software is distributed on an "AS IS" basis, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
* either express or implied, as more fully set forth in the License.
*
* See the NOTICE file distributed with this work for information regarding copyright ownership.
*/

package alluxio.client.file.dora;

import alluxio.Constants;
import alluxio.client.block.BlockWorkerInfo;
import alluxio.conf.AlluxioConfiguration;
import alluxio.conf.PropertyKey;
import alluxio.exception.status.ResourceExhaustedException;
import alluxio.membership.WorkerClusterView;
import alluxio.wire.WorkerIdentity;
import alluxio.wire.WorkerInfo;
import alluxio.wire.WorkerState;

import com.google.common.collect.ImmutableList;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;
import java.util.Optional;
import java.util.Set;

/**
* An impl of Jump Consistent Hash Policy.
*
* A policy where a file path is matched to worker(s) by Jump Consistent Hashing Algorithm.
* The algorithm is described in this paper:
* https://arxiv.org/pdf/1406.2294.pdf
*
* The disadvantage of this algorithm is that
* buckets can only be inserted and deleted at the head and tail of the worker list
* to maintain hash consistency,
* that is, nodes can only be added and deleted at the head and tail of the worker list.
*/
public class JumpHashPolicy implements WorkerLocationPolicy {
private static final Logger LOG = LoggerFactory.getLogger(JumpHashPolicy.class);
private final JumpHashProvider mHashProvider;

/**
* Constructs a new {@link JumpHashPolicy}.
*
* @param conf the configuration used by the policy
*/
public JumpHashPolicy(AlluxioConfiguration conf) {
LOG.debug("%s is chosen for user worker hash algorithm",
conf.getString(PropertyKey.USER_WORKER_SELECTION_POLICY));
mHashProvider = new JumpHashProvider(100, Constants.SECOND_MS);
}

@Override
public List<BlockWorkerInfo> getPreferredWorkers(WorkerClusterView workerClusterView,
String fileId, int count) throws ResourceExhaustedException {
if (workerClusterView.size() < count) {
throw new ResourceExhaustedException(String.format(
"Not enough workers in the cluster %d workers in the cluster but %d required",
workerClusterView.size(), count));
}
Set<WorkerIdentity> workerIdentities = workerClusterView.workerIds();
mHashProvider.refresh(workerIdentities);
List<WorkerIdentity> workers = mHashProvider.getMultiple(fileId, count);
if (workers.size() != count) {
throw new ResourceExhaustedException(String.format(
"Found %d workers from the hash ring but %d required", workers.size(), count));
}
ImmutableList.Builder<BlockWorkerInfo> builder = ImmutableList.builder();
for (WorkerIdentity worker : workers) {
Optional<WorkerInfo> optionalWorkerInfo = workerClusterView.getWorkerById(worker);
final WorkerInfo workerInfo;
if (optionalWorkerInfo.isPresent()) {
workerInfo = optionalWorkerInfo.get();
} else {
// the worker returned by the policy does not exist in the cluster view
// supplied by the client.
// this can happen when the membership changes and some callers fail to update
// to the latest worker cluster view.
// in this case, just skip this worker
LOG.debug("Inconsistency between caller's view of cluster and that of "
+ "the consistent hash policy's: worker {} selected by policy does not exist in "
+ "caller's view {}. Skipping this worker.",
worker, workerClusterView);
continue;
}

BlockWorkerInfo blockWorkerInfo = new BlockWorkerInfo(
worker, workerInfo.getAddress(), workerInfo.getCapacityBytes(),
workerInfo.getUsedBytes(), workerInfo.getState() == WorkerState.LIVE
);
builder.add(blockWorkerInfo);
}
List<BlockWorkerInfo> infos = builder.build();
return infos;
}
}
Loading

0 comments on commit b9de24c

Please sign in to comment.