Skip to content

Commit

Permalink
address review comment
Browse files Browse the repository at this point in the history
  • Loading branch information
codings-dan committed Dec 13, 2023
1 parent 25294cb commit 940bbe6
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -288,21 +288,27 @@ public List<BlockLocationInfo> getBlockLocations(URIStatus status)
if (locations.isEmpty() && mFsContext.getPathConf(new AlluxioURI(status.getPath()))
.getBoolean(PropertyKey.USER_UFS_BLOCK_LOCATION_ALL_FALLBACK_ENABLED)) {
// Case 2: Fallback to add all workers to locations so some apps (Impala) won't panic.
List<WorkerNetAddress> addresses = new ArrayList<>(getHostWorkerMap().values());
Collections.shuffle(addresses);

int count = mFsContext.getClusterConf().getInt(
PropertyKey.USER_UFS_BLOCK_LOCATION_RETURN_COUNT);
count = count >= 0 ? count : Integer.MAX_VALUE;
addresses = addresses.subList(0, Math.min(addresses.size(), count));
locations.addAll(addresses);
PropertyKey locKey = PropertyKey.USER_UFS_BLOCK_LOCATION_RETURN_LIMIT;
int count = mFsContext.getClusterConf().getInt(locKey);
if (count < 0) {
throw new IllegalArgumentException("Property" + locKey.getName()
+ " should not be set to a negative number");
}
List<WorkerNetAddress> addresses = getShuffleWorkerAddressList();
locations.addAll(addresses.subList(0, Math.min(addresses.size(), count)));
}
}
blockLocations.add(new BlockLocationInfo(fileBlockInfo, locations));
}
return blockLocations;
}

private List<WorkerNetAddress> getShuffleWorkerAddressList() throws IOException {
List<BlockWorkerInfo> workers = mFsContext.getCachedWorkers();
Collections.shuffle(workers);
return workers.stream().map(BlockWorkerInfo::getNetAddress).collect(toList());
}

private Map<String, WorkerNetAddress> getHostWorkerMap() throws IOException {
List<BlockWorkerInfo> workers = mFsContext.getCachedWorkers();
return workers.stream().collect(
Expand Down
12 changes: 6 additions & 6 deletions core/common/src/main/java/alluxio/conf/PropertyKey.java
Original file line number Diff line number Diff line change
Expand Up @@ -6843,12 +6843,12 @@ public String toString() {
.setConsistencyCheckLevel(ConsistencyCheckLevel.WARN)
.setScope(Scope.CLIENT)
.build();
public static final PropertyKey USER_UFS_BLOCK_LOCATION_RETURN_COUNT =
intBuilder(Name.USER_UFS_BLOCK_LOCATION_RETURN_COUNT)
.setDefaultValue(-1)
public static final PropertyKey USER_UFS_BLOCK_LOCATION_RETURN_LIMIT =
intBuilder(Name.USER_UFS_BLOCK_LOCATION_RETURN_LIMIT)
.setDefaultValue(Integer.MAX_VALUE)
.setDescription("The return count of workers as block location if ufs block locations "
+ "are not co-located with any Alluxio workers or is empty. This item should be "
+ "greater than or equal to -1 and '-1' means return all workers")
+ "greater than or equal to 0 and " + Integer.MAX_VALUE + " means return all workers")
.setConsistencyCheckLevel(ConsistencyCheckLevel.WARN)
.setScope(Scope.CLIENT)
.build();
Expand Down Expand Up @@ -9091,8 +9091,8 @@ public static final class Name {
public static final String USER_RPC_RETRY_MAX_SLEEP_MS = "alluxio.user.rpc.retry.max.sleep";
public static final String USER_UFS_BLOCK_LOCATION_ALL_FALLBACK_ENABLED =
"alluxio.user.ufs.block.location.all.fallback.enabled";
public static final String USER_UFS_BLOCK_LOCATION_RETURN_COUNT =
"alluxio.user.block.location.return.count";
public static final String USER_UFS_BLOCK_LOCATION_RETURN_LIMIT =
"alluxio.user.block.location.return.limit";
public static final String USER_UFS_BLOCK_READ_LOCATION_POLICY =
"alluxio.user.ufs.block.read.location.policy";
public static final String USER_UFS_BLOCK_READ_LOCATION_POLICY_DETERMINISTIC_HASH_SHARDS =
Expand Down

0 comments on commit 940bbe6

Please sign in to comment.