Skip to content

Commit

Permalink
updated
Browse files Browse the repository at this point in the history
  • Loading branch information
JiamingMai committed Oct 25, 2023
1 parent 1b8a977 commit 1409f44
Show file tree
Hide file tree
Showing 9 changed files with 81 additions and 19 deletions.
1 change: 1 addition & 0 deletions common/transport/src/main/proto/proto/journal/job.proto
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ message LoadJobEntry {
optional int64 end_time = 8;
optional bool load_metadata_only = 9;
optional bool skip_if_exists = 10;
optional string file_filter_regx = 11;
}

// next available id: 13
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,8 @@ public class DoraLoadJob extends AbstractJob<DoraLoadJob.DoraLoadTask> {
private static final int RETRY_ATTEMPT_THRESHOLD = Configuration.getInt(
PropertyKey.MASTER_DORA_LOAD_JOB_RETRIES);
private final boolean mSkipIfExists;

private final Optional<String> mFileFilterRegx;
private final long mVirtualBlockSize = Configuration.getBytes(
PropertyKey.DORA_READ_VIRTUAL_BLOCK_SIZE);
private Iterator<LoadSubTask> mCurrentSubTaskIterator;
Expand All @@ -135,12 +137,14 @@ public class DoraLoadJob extends AbstractJob<DoraLoadJob.DoraLoadTask> {
* @param verificationEnabled whether to verify the job after loaded
* @param loadMetadataOnly if set to true, only metadata will be loaded without loading
* @param skipIfExists skip if exists
* @param fileFilterRegx the regx pattern string for file filter
* @param ufsStatusIterator ufsStatus iterable
* @param ufs under file system
*/
public DoraLoadJob(String path, Optional<String> user, String jobId, OptionalLong bandwidth,
boolean usePartialListing, boolean verificationEnabled, boolean loadMetadataOnly,
boolean skipIfExists, Iterator<UfsStatus> ufsStatusIterator, UnderFileSystem ufs) {
boolean skipIfExists, Optional<String> fileFilterRegx, Iterator<UfsStatus> ufsStatusIterator,
UnderFileSystem ufs) {
super(user, jobId, new HashBasedWorkerAssignPolicy());
mLoadPath = requireNonNull(path, "path is null");
Preconditions.checkArgument(
Expand All @@ -152,6 +156,7 @@ public DoraLoadJob(String path, Optional<String> user, String jobId, OptionalLon
mUfs = ufs;
mLoadMetadataOnly = loadMetadataOnly;
mSkipIfExists = skipIfExists;
mFileFilterRegx = fileFilterRegx;
mUfsStatusIterator = ufsStatusIterator;
LOG.info("DoraLoadJob for {} created.", path);
}
Expand Down Expand Up @@ -473,6 +478,7 @@ public Journal.JournalEntry toJournalEntry() {
.setVerify(mVerificationEnabled)
.setSkipIfExists(mSkipIfExists)
.setJobId(mJobId);
mFileFilterRegx.ifPresent(jobEntry::setFileFilterRegx);
mUser.ifPresent(jobEntry::setUser);
mBandwidth.ifPresent(jobEntry::setBandwidth);
mEndTime.ifPresent(jobEntry::setEndTime);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import alluxio.annotation.SuppressFBWarnings;
import alluxio.conf.Configuration;
import alluxio.master.file.DefaultFileSystemMaster;
import alluxio.master.predicate.FilePredicate;
import alluxio.scheduler.job.Job;
import alluxio.scheduler.job.JobFactory;
import alluxio.scheduler.job.JobState;
Expand All @@ -28,6 +29,7 @@

import java.util.Optional;
import java.util.OptionalLong;
import java.util.function.Predicate;

/**
* Factory for creating {@link LoadJob}s from journal entries.
Expand All @@ -54,15 +56,30 @@ public Job<?> create() {
String path = mJobEntry.getLoadPath();
UnderFileSystem ufs = mFsMaster.getUfsManager().getOrAdd(new AlluxioURI(path),
UnderFileSystemConfiguration.defaults(Configuration.global()));

Predicate<UfsStatus> predicate = Predicates.alwaysTrue();
Optional<String> fileFilterRegx = Optional.empty();
if (mJobEntry.hasFileFilterRegx()) {
String regxPatternStr = mJobEntry.getFileFilterRegx();
if (regxPatternStr != null && !regxPatternStr.isEmpty()) {
alluxio.proto.journal.Job.FileFilter.Builder builder =
alluxio.proto.journal.Job.FileFilter.newBuilder()
.setName("fileNamePattern").setValue(regxPatternStr);
FilePredicate filePredicate = FilePredicate.create(builder.build());
predicate = filePredicate.getUfsStatusPredicate();
fileFilterRegx = Optional.of(regxPatternStr);
}
}

Iterable<UfsStatus> iterable = new UfsStatusIterable(ufs, path,
Optional.ofNullable(AuthenticatedClientUser.getOrNull()).map(User::getName),
Predicates.alwaysTrue());
predicate);
Optional<String> user =
mJobEntry.hasUser() ? Optional.of(mJobEntry.getUser()) : Optional.empty();
DoraLoadJob job = new DoraLoadJob(path, user, mJobEntry.getJobId(),
mJobEntry.hasBandwidth() ? OptionalLong.of(mJobEntry.getBandwidth()) : OptionalLong.empty(),
mJobEntry.getPartialListing(), mJobEntry.getVerify(), mJobEntry.getLoadMetadataOnly(),
mJobEntry.getSkipIfExists(), iterable.iterator(), ufs);
mJobEntry.getSkipIfExists(), fileFilterRegx, iterable.iterator(), ufs);
job.setJobState(JobState.fromProto(mJobEntry.getState()), false);
if (mJobEntry.hasEndTime()) {
job.setEndTime(mJobEntry.getEndTime());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ public Job<?> create() {
.map(User::getName);

Predicate<UfsStatus> predicate = Predicates.alwaysTrue();
Optional<String> fileFilterRegx = Optional.empty();
if (options.hasFileFilterRegx()) {
String regxPatternStr = options.getFileFilterRegx();
if (regxPatternStr != null && !regxPatternStr.isEmpty()) {
Expand All @@ -71,6 +72,7 @@ public Job<?> create() {
.setName("fileNamePattern").setValue(regxPatternStr);
FilePredicate filePredicate = FilePredicate.create(builder.build());
predicate = filePredicate.getUfsStatusPredicate();
fileFilterRegx = Optional.of(regxPatternStr);
}
}

Expand All @@ -81,7 +83,7 @@ public Job<?> create() {
predicate);
return new DoraLoadJob(path, user, UUID.randomUUID().toString(), bandwidth, partialListing,
verificationEnabled, options.getLoadMetadataOnly(), options.getSkipIfExists(),
iterable.iterator(), ufs);
fileFilterRegx, iterable.iterator(), ufs);
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ public void testGetNextTaskWithVirtualBlocks() throws IOException {
Predicates.alwaysTrue()).iterator();
DoraLoadJob loadJob =
new DoraLoadJob(mLocalUfsRoot, Optional.of("user"), "1", OptionalLong.empty(), false, true,
false, false, iterator, mLocalUfs);
false, false, Optional.empty(), iterator, mLocalUfs);
Collection<WorkerInfo> workers = ImmutableList.of(
new WorkerInfo().setId(1).setAddress(
new WorkerNetAddress().setHost("worker1").setRpcPort(1234)));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,8 +159,9 @@ public void testSubmit() throws Exception {
Optional<String> user = Optional.of("user");
UnderFileSystem ufs = mock(UnderFileSystem.class);
DoraLoadJob loadJob =
new DoraLoadJob(validLoadPath, Optional.of("user"), "1", OptionalLong.empty(), false, true,
false, false, Collections.emptyIterator(), ufs);
new DoraLoadJob(validLoadPath, Optional.of("user"), "1", OptionalLong.empty(),
false, true, false, false,
Optional.empty(), Collections.emptyIterator(), ufs);
assertTrue(scheduler.submitJob(loadJob));
assertTrue(jobMetaStore.get(loadJob.getJobId()).getJobState() == JobState.RUNNING);
assertEquals(1, scheduler.getJobs().size());
Expand All @@ -176,8 +177,10 @@ public void testSubmit() throws Exception {
DoraLoadJob job = (DoraLoadJob) scheduler.getJobs().get(loadJob.getDescription());
assertEquals(OptionalLong.empty(), job.getBandwidth());
loadJob =
new DoraLoadJob(validLoadPath, Optional.of("user"), "1", OptionalLong.of(1000), true, false,
false, false, Collections.emptyIterator(), ufs);
new DoraLoadJob(validLoadPath, Optional.of("user"), "1",
OptionalLong.of(1000), true, false,
false, false, Optional.empty(),
Collections.emptyIterator(), ufs);
assertFalse(scheduler.submitJob(loadJob));
assertEquals(1, scheduler.getJobs().size());
job = (DoraLoadJob) scheduler.getJobs().get(loadJob.getDescription());
Expand Down Expand Up @@ -205,8 +208,10 @@ public void testStop() {
Optional<String> user = Optional.of("user");
UnderFileSystem ufs = mock(UnderFileSystem.class);
DoraLoadJob job =
new DoraLoadJob(validLoadPath, Optional.of("user"), "1", OptionalLong.of(100), false, true,
false, false, Collections.emptyIterator(), ufs);
new DoraLoadJob(validLoadPath, Optional.of("user"), "1",
OptionalLong.of(100), false, true,
false, false, Optional.empty(),
Collections.emptyIterator(), ufs);
assertTrue(scheduler.submitJob(job));

assertTrue(scheduler.stopJob(job.getDescription()));
Expand Down Expand Up @@ -240,13 +245,15 @@ public void testSubmitExceedsCapacity() throws Exception {
i -> {
String path = String.format("/path/to/load/%d", i);
assertTrue(scheduler.submitJob(
new DoraLoadJob(path, Optional.of("user"), "1", OptionalLong.empty(), false, true,
false, false, Collections.emptyIterator(), ufs)
new DoraLoadJob(path, Optional.of("user"), "1", OptionalLong.empty(),
false, true, false, false,
Optional.empty(), Collections.emptyIterator(), ufs)
));
});
assertThrows(ResourceExhaustedRuntimeException.class, () -> scheduler.submitJob(
new DoraLoadJob("/path/to/load/101", Optional.of("user"), "1", OptionalLong.empty(), false,
true, false, false, Collections.emptyIterator(), ufs)));
new DoraLoadJob("/path/to/load/101", Optional.of("user"), "1",
OptionalLong.empty(), false, true, false,
false, Optional.empty(), Collections.emptyIterator(), ufs)));
}

@Ignore
Expand Down Expand Up @@ -504,7 +511,7 @@ public void testJobRetention() throws Exception {
UnderFileSystem ufs = mock(UnderFileSystem.class);
assertTrue(scheduler.submitJob(
new DoraLoadJob(path, Optional.of("user"), "1", OptionalLong.empty(), false, true, false,
false, Collections.emptyIterator(), ufs)));
false, Optional.empty(), Collections.emptyIterator(), ufs)));
});
assertEquals(5, scheduler.getJobs().size());
scheduler.getJobs().get(JobDescription.newBuilder().setPath("/load/1").setType("load").build())
Expand Down Expand Up @@ -539,7 +546,7 @@ public void testStopScheduler() {
UnderFileSystem ufs = mock(UnderFileSystem.class);
DoraLoadJob job =
new DoraLoadJob(path, Optional.of("user"), "5", OptionalLong.of(100), false, true, false,
false, Collections.emptyIterator(), ufs);
false, Optional.empty(), Collections.emptyIterator(), ufs);
scheduler.start();
scheduler.submitJob(job);
assertEquals(1, scheduler.getJobs().size());
Expand All @@ -548,7 +555,7 @@ public void testStopScheduler() {
assertEquals(1, metaStore.getJobs().size());
DoraLoadJob job2 =
new DoraLoadJob("new", Optional.of("user"), "6", OptionalLong.of(100), false, true, false,
false, Collections.emptyIterator(), ufs);
false, Optional.empty(), Collections.emptyIterator(), ufs);
metaStore.updateJob(job2);
assertEquals(0, scheduler.getJobs().size());
assertEquals(2, metaStore.getJobs().size());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ public final class LoadCommand extends AbstractFileSystemCommand {
private static final Option FILE_FILTER_REGX = Option.builder()
.longOpt("file-filter-regx")
.required(false)
.hasArg(false)
.hasArg(true)
.desc("If specified, skip files that doesn't match the regx pattern.")
.build();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,4 +83,32 @@ public void testCommand() throws Exception {
mFsShell.run("load", path, "--progress", "--format", "JSON", "--verbose");
assertTrue(mOutput.toString().contains("\"mVerbose\":true"));
}

@Test
public void testRegxPatternFileFilter() throws Exception {
File testRoot = mTestFolder.newFolder("testRoot");
mTestFolder.newFolder("testRoot/testDirectory");
String path = testRoot.getAbsolutePath();
createByteFileInUfs("/testRoot/testFileA", Constants.MB);
createByteFileInUfs("/testRoot/testFileB", Constants.MB);
createByteFileInUfs("/testRoot/testDirectory/testFileC", Constants.MB);

AlluxioURI uriA = new AlluxioURI("/testRoot/testFileA");
AlluxioURI uriB = new AlluxioURI("/testRoot/testFileB");
AlluxioURI uriC = new AlluxioURI("/testRoot/testDirectory/testFileC");

assertEquals(0, mFileSystem.getStatus(uriA).getInAlluxioPercentage());
assertEquals(0, mFileSystem.getStatus(uriB).getInAlluxioPercentage());
assertEquals(0, mFileSystem.getStatus(uriC).getInAlluxioPercentage());
// Testing loading of a directory

assertEquals(0, mFsShell.run("load", path, "--submit", "--verify",
"--file-filter-regx", ".*B"));
assertEquals(0, mFsShell.run("load", path, "--progress"));
while (!mOutput.toString().contains("SUCCEEDED")) {
assertEquals(0, mFsShell.run("load", path, "--progress"));
Thread.sleep(1000);
}
assertTrue(mOutput.toString().contains("Inodes Processed: 1"));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ public class WorkerHttpServerIntegrationTest {
public LocalAlluxioClusterResource mLocalAlluxioClusterResource =
new LocalAlluxioClusterResource.Builder()
.setProperty(PropertyKey.WORKER_HTTP_SERVER_PORT, mHttpServerPort)
.setProperty(PropertyKey.UNDERFS_XATTR_CHANGE_ENABLED, false)
.build();

private static final String TEST_CONTENT = "test-content";
Expand Down

0 comments on commit 1409f44

Please sign in to comment.