From 6a9e7c05a6e7f55b030a66c5d8703c2d588ce8c4 Mon Sep 17 00:00:00 2001 From: Yaolong Liu Date: Wed, 13 Dec 2023 19:16:54 +0800 Subject: [PATCH] address review comment --- .../alluxio/client/file/BaseFileSystem.java | 22 ++++++++++++------- .../main/java/alluxio/conf/PropertyKey.java | 12 +++++----- 2 files changed, 20 insertions(+), 14 deletions(-) diff --git a/core/client/fs/src/main/java/alluxio/client/file/BaseFileSystem.java b/core/client/fs/src/main/java/alluxio/client/file/BaseFileSystem.java index 318e7189f30e..a36f628edfa7 100644 --- a/core/client/fs/src/main/java/alluxio/client/file/BaseFileSystem.java +++ b/core/client/fs/src/main/java/alluxio/client/file/BaseFileSystem.java @@ -288,14 +288,14 @@ public List getBlockLocations(URIStatus status) if (locations.isEmpty() && mFsContext.getPathConf(new AlluxioURI(status.getPath())) .getBoolean(PropertyKey.USER_UFS_BLOCK_LOCATION_ALL_FALLBACK_ENABLED)) { // Case 2: Fallback to add all workers to locations so some apps (Impala) won't panic. - List addresses = new ArrayList<>(getHostWorkerMap().values()); - Collections.shuffle(addresses); - - int count = mFsContext.getClusterConf().getInt( - PropertyKey.USER_UFS_BLOCK_LOCATION_RETURN_COUNT); - count = count >= 0 ? count : Integer.MAX_VALUE; - addresses = addresses.subList(0, Math.min(addresses.size(), count)); - locations.addAll(addresses); + PropertyKey locKey = PropertyKey.USER_UFS_BLOCK_LOCATION_RETURN_LIMIT; + int count = mFsContext.getClusterConf().getInt(locKey); + if (count < 0) { + throw new IllegalArgumentException("Property" + locKey.getName() + + " should not be set to a negative number"); + } + List addresses = getShuffleWorkerAddressList(); + locations.addAll(addresses.subList(0, Math.min(addresses.size(), count))); } } blockLocations.add(new BlockLocationInfo(fileBlockInfo, locations)); @@ -303,6 +303,12 @@ public List getBlockLocations(URIStatus status) return blockLocations; } + private List getShuffleWorkerAddressList() throws IOException { + List workers = mFsContext.getCachedWorkers(); + Collections.shuffle(workers); + return workers.stream().map(BlockWorkerInfo::getNetAddress).collect(toList()); + } + private Map getHostWorkerMap() throws IOException { List workers = mFsContext.getCachedWorkers(); return workers.stream().collect( diff --git a/core/common/src/main/java/alluxio/conf/PropertyKey.java b/core/common/src/main/java/alluxio/conf/PropertyKey.java index b5e4e2fd4997..18852bc6facf 100755 --- a/core/common/src/main/java/alluxio/conf/PropertyKey.java +++ b/core/common/src/main/java/alluxio/conf/PropertyKey.java @@ -6843,12 +6843,12 @@ public String toString() { .setConsistencyCheckLevel(ConsistencyCheckLevel.WARN) .setScope(Scope.CLIENT) .build(); - public static final PropertyKey USER_UFS_BLOCK_LOCATION_RETURN_COUNT = - intBuilder(Name.USER_UFS_BLOCK_LOCATION_RETURN_COUNT) - .setDefaultValue(-1) + public static final PropertyKey USER_UFS_BLOCK_LOCATION_RETURN_LIMIT = + intBuilder(Name.USER_UFS_BLOCK_LOCATION_RETURN_LIMIT) + .setDefaultValue(Integer.MAX_VALUE) .setDescription("The return count of workers as block location if ufs block locations " + "are not co-located with any Alluxio workers or is empty. This item should be " - + "greater than or equal to -1 and '-1' means return all workers") + + "greater than or equal to 0 and " + Integer.MAX_VALUE + " means return all workers") .setConsistencyCheckLevel(ConsistencyCheckLevel.WARN) .setScope(Scope.CLIENT) .build(); @@ -9091,8 +9091,8 @@ public static final class Name { public static final String USER_RPC_RETRY_MAX_SLEEP_MS = "alluxio.user.rpc.retry.max.sleep"; public static final String USER_UFS_BLOCK_LOCATION_ALL_FALLBACK_ENABLED = "alluxio.user.ufs.block.location.all.fallback.enabled"; - public static final String USER_UFS_BLOCK_LOCATION_RETURN_COUNT = - "alluxio.user.block.location.return.count"; + public static final String USER_UFS_BLOCK_LOCATION_RETURN_LIMIT = + "alluxio.user.block.location.return.limit"; public static final String USER_UFS_BLOCK_READ_LOCATION_POLICY = "alluxio.user.ufs.block.read.location.policy"; public static final String USER_UFS_BLOCK_READ_LOCATION_POLICY_DETERMINISTIC_HASH_SHARDS =