Skip to content

Commit

Permalink
Merge branch 'feature/WorkerMemoryOptimization' of https://github.com…
Browse files Browse the repository at this point in the history
…/thu-david/alluxio into feature/WorkerMemoryOptimization
  • Loading branch information
thu-david committed Oct 21, 2024
2 parents 76435ee + 56d54fd commit 4facc1f
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 53 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import alluxio.file.NettyBufTargetBuffer;
import alluxio.file.ReadTargetBuffer;
import alluxio.network.protocol.databuffer.DataFileChannel;
import alluxio.uri.UfsUrl;
import alluxio.util.CommonUtils;
import alluxio.util.WaitForOptions;
import alluxio.util.io.BufferUtils;
Expand All @@ -55,8 +56,10 @@
import io.netty.buffer.Unpooled;
import io.netty.channel.DefaultFileRegion;
import org.junit.After;
import org.junit.Assert;
import org.junit.Assume;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
Expand All @@ -65,6 +68,7 @@
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.file.Paths;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
Expand Down Expand Up @@ -1203,6 +1207,59 @@ public void getDataFileChannel() throws Exception {
assertArrayEquals(PAGE1, bytes);
}

@Ignore("Remove this annotation once OS worker supports the prefix search.")
@Test
public void searchPagesByPrefix() throws Exception {
mCacheManager = createLocalCacheManager();

String scheme = "file:///";
String prefix1 = "prefix1";
String prefix2 = "prefix2";
String prefix3 = "q";
String prefix4 = "~";

int fileNum = 10;
cachePrefixPages(prefix1, fileNum);
cachePrefixPages(prefix2, fileNum);
cachePrefixPages(prefix3, fileNum);
cachePrefixPages(prefix4, fileNum);

UfsUrl rootPrefix = UfsUrl.createInstance(scheme);

checkPrefixSearch(rootPrefix, rootPrefix.join(prefix1));
checkPrefixSearch(rootPrefix, rootPrefix.join(prefix2));
checkPrefixSearch(rootPrefix, rootPrefix.join(prefix3));
checkPrefixSearch(rootPrefix, rootPrefix.join(prefix4));
}

private void cachePrefixPages(String prefix, int fileNum) {
String scheme = "file:///";
for (int i = 0; i < fileNum; i++) {
String fileId = scheme + prefix + "_" + i;
PageId pageId = new PageId(fileId, 0);
mCacheManager.put(pageId, PAGE1);
}
}

private void checkPrefixSearch(UfsUrl rootUrl, UfsUrl prefixUrl) {
Collection<PageInfo> rootResult = mCacheManager.getPageInfoByPrefix(rootUrl);
Assert.assertFalse(rootResult.isEmpty());

Collection<PageInfo> prefixSearchResult = mCacheManager.getPageInfoByPrefix(prefixUrl);
Assert.assertFalse(prefixSearchResult.isEmpty());

for (PageInfo p : prefixSearchResult) {
Assert.assertTrue(p.getUfsUrl().toString().startsWith(prefixUrl.toString()));
}
for (PageInfo p : rootResult) {
if (prefixSearchResult.contains(p)) {
Assert.assertTrue(p.getUfsUrl().toString().startsWith(prefixUrl.toString()));
} else {
Assert.assertFalse(p.getUfsUrl().toString().startsWith(prefixUrl.toString()));
}
}
}

/**
* A PageStore where put can throw IOException on put or delete.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import alluxio.client.file.cache.CacheManager;
import alluxio.client.file.cache.CacheManagerOptions;
import alluxio.client.file.cache.PageId;
import alluxio.client.file.cache.PageInfo;
import alluxio.client.file.cache.PageMetaStore;
import alluxio.conf.Configuration;
import alluxio.conf.PropertyKey;
Expand Down Expand Up @@ -54,9 +53,7 @@
import alluxio.metrics.MetricsSystem;
import alluxio.security.authorization.Mode;
import alluxio.underfs.UfsStatus;
import alluxio.uri.UfsUrl;
import alluxio.util.io.BufferUtils;
import alluxio.util.io.PathUtils;
import alluxio.wire.WorkerIdentity;
import alluxio.worker.block.BlockMasterClientPool;

Expand All @@ -78,7 +75,6 @@
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
Expand Down Expand Up @@ -1012,53 +1008,4 @@ public int readInternal(long position, ReadTargetBuffer buffer, int length) {
return size;
}
}

@Test
public void searchPagesByPrefix() throws Exception {
String prefix1 = "prefix1";
String prefix2 = "prefix2";
String prefix3 = "q";
String prefix4 = "~";

UfsUrl rootPrefix = PathUtils.convertUfsPathToUfsUrl(
mTestFolder.getRoot().getAbsolutePath() + "/");

int fileNum = 10;
createFilesWithPrefix(prefix1, fileNum);
createFilesWithPrefix(prefix2, fileNum);
createFilesWithPrefix(prefix3, fileNum);
createFilesWithPrefix(prefix4, fileNum);

checkPrefixSearch(rootPrefix, rootPrefix.join(prefix1));
checkPrefixSearch(rootPrefix, rootPrefix.join(prefix2));
checkPrefixSearch(rootPrefix, rootPrefix.join(prefix3));
checkPrefixSearch(rootPrefix, rootPrefix.join(prefix4));
}

private void checkPrefixSearch(UfsUrl rootUrl, UfsUrl prefixUrl) {
Set<PageInfo> rootResultSet = mWorker.getPageInfoByPrefix(rootUrl);

Set<PageInfo> prefixSearchSet = mWorker.getPageInfoByPrefix(prefixUrl);

for (PageInfo p : prefixSearchSet) {
Assert.assertTrue(p.getUfsUrl().toString().startsWith(prefixUrl.toString()));
}
for (PageInfo p : rootResultSet) {
if (prefixSearchSet.contains(p)) {
Assert.assertTrue(p.getUfsUrl().toString().startsWith(prefixUrl.toString()));
} else {
Assert.assertFalse(p.getUfsUrl().toString().startsWith(prefixUrl.toString()));
}
}
}

private void createFilesWithPrefix(String prefix, int fileNum)
throws AccessControlException, IOException, ExecutionException, InterruptedException,
TimeoutException {
for (int i = 0; i < fileNum; i++) {
File f = mTestFolder.newFile(prefix + "_" + i);
Files.write(f.toPath(), mFileContent.getBytes());
loadFileData(f.getPath());
}
}
}

0 comments on commit 4facc1f

Please sign in to comment.