diff --git a/cli/src/alluxio.org/cli/cmd/job/load.go b/cli/src/alluxio.org/cli/cmd/job/load.go index 5d087a7f0416..4ffe47650422 100644 --- a/cli/src/alluxio.org/cli/cmd/job/load.go +++ b/cli/src/alluxio.org/cli/cmd/job/load.go @@ -37,6 +37,7 @@ type LoadCommand struct { partialListing bool metadataOnly bool skipIfExists bool + fileFilterRegx string } func (c *LoadCommand) Base() *env.BaseJavaCommand { @@ -82,6 +83,7 @@ $ ./bin/alluxio job load --path /path --stop`, cmd.Flags().BoolVar(&c.partialListing, "partial-listing", false, "[submit] Use partial directory listing, initializing load before reading the entire directory but cannot report on certain progress details") cmd.Flags().BoolVar(&c.metadataOnly, "metadata-only", false, "[submit] Only load file metadata") cmd.Flags().BoolVar(&c.skipIfExists, "skip-if-exists", false, "[submit] Skip existing fullly cached files") + cmd.Flags().StringVar(&c.fileFilterRegx, "file-filter-regx", "", "[submit] Skip files that match the regx pattern") return cmd } @@ -104,5 +106,8 @@ func (c *LoadCommand) Run(_ []string) error { if c.skipIfExists { javaArgs = append(javaArgs, "--skip-if-exists") } + if c.fileFilterRegx != "" { + javaArgs = append(javaArgs, "--file-filter-regx") + } return c.Base().Run(javaArgs) } diff --git a/common/transport/src/main/proto/grpc/file_system_master.proto b/common/transport/src/main/proto/grpc/file_system_master.proto index eff3b7796b54..bb361a093bd2 100644 --- a/common/transport/src/main/proto/grpc/file_system_master.proto +++ b/common/transport/src/main/proto/grpc/file_system_master.proto @@ -608,7 +608,8 @@ message LoadJobPOptions { optional bool partialListing = 3; optional bool loadMetadataOnly = 4; optional bool skipIfExists = 5; - optional int32 replicas = 6; + optional string fileFilterRegx = 6; + optional int32 replicas = 7; } message CopyJobPOptions { diff --git a/common/transport/src/main/proto/proto/journal/job.proto b/common/transport/src/main/proto/proto/journal/job.proto index 919080a74a07..027c9e4c0d74 100644 --- a/common/transport/src/main/proto/proto/journal/job.proto +++ b/common/transport/src/main/proto/proto/journal/job.proto @@ -23,7 +23,8 @@ message LoadJobEntry { optional int64 end_time = 8; optional bool load_metadata_only = 9; optional bool skip_if_exists = 10; - optional int32 replicas = 11; + optional string file_filter_regx = 11; + optional int32 replicas = 12; } // next available id: 13 diff --git a/dora/core/server/master/src/main/java/alluxio/master/job/DoraLoadJob.java b/dora/core/server/master/src/main/java/alluxio/master/job/DoraLoadJob.java index 47b4af7741d2..0a120414ceae 100644 --- a/dora/core/server/master/src/main/java/alluxio/master/job/DoraLoadJob.java +++ b/dora/core/server/master/src/main/java/alluxio/master/job/DoraLoadJob.java @@ -120,6 +120,8 @@ public class DoraLoadJob extends AbstractJob { private static final int RETRY_ATTEMPT_THRESHOLD = Configuration.getInt( PropertyKey.MASTER_DORA_LOAD_JOB_RETRIES); private final boolean mSkipIfExists; + + private final Optional mFileFilterRegx; private final long mVirtualBlockSize = Configuration.getBytes( PropertyKey.DORA_READ_VIRTUAL_BLOCK_SIZE); private Iterator mCurrentSubTaskIterator; @@ -135,12 +137,14 @@ public class DoraLoadJob extends AbstractJob { * @param verificationEnabled whether to verify the job after loaded * @param loadMetadataOnly if set to true, only metadata will be loaded without loading * @param skipIfExists skip if exists + * @param fileFilterRegx the regx pattern string for file filter * @param ufsStatusIterator ufsStatus iterable * @param ufs under file system */ public DoraLoadJob(String path, Optional user, String jobId, OptionalLong bandwidth, boolean usePartialListing, boolean verificationEnabled, boolean loadMetadataOnly, - boolean skipIfExists, Iterator ufsStatusIterator, UnderFileSystem ufs) { + boolean skipIfExists, Optional fileFilterRegx, Iterator ufsStatusIterator, + UnderFileSystem ufs) { super(user, jobId, new HashBasedWorkerAssignPolicy()); mLoadPath = requireNonNull(path, "path is null"); Preconditions.checkArgument( @@ -152,6 +156,7 @@ public DoraLoadJob(String path, Optional user, String jobId, OptionalLon mUfs = ufs; mLoadMetadataOnly = loadMetadataOnly; mSkipIfExists = skipIfExists; + mFileFilterRegx = fileFilterRegx; mUfsStatusIterator = ufsStatusIterator; LOG.info("DoraLoadJob for {} created.", path); } @@ -473,6 +478,7 @@ public Journal.JournalEntry toJournalEntry() { .setVerify(mVerificationEnabled) .setSkipIfExists(mSkipIfExists) .setJobId(mJobId); + mFileFilterRegx.ifPresent(jobEntry::setFileFilterRegx); mUser.ifPresent(jobEntry::setUser); mBandwidth.ifPresent(jobEntry::setBandwidth); mEndTime.ifPresent(jobEntry::setEndTime); diff --git a/dora/core/server/master/src/main/java/alluxio/master/job/JournalLoadJobFactory.java b/dora/core/server/master/src/main/java/alluxio/master/job/JournalLoadJobFactory.java index eba989a9d823..cfed491a67c7 100644 --- a/dora/core/server/master/src/main/java/alluxio/master/job/JournalLoadJobFactory.java +++ b/dora/core/server/master/src/main/java/alluxio/master/job/JournalLoadJobFactory.java @@ -15,6 +15,7 @@ import alluxio.annotation.SuppressFBWarnings; import alluxio.conf.Configuration; import alluxio.master.file.DefaultFileSystemMaster; +import alluxio.master.predicate.FilePredicate; import alluxio.scheduler.job.Job; import alluxio.scheduler.job.JobFactory; import alluxio.scheduler.job.JobState; @@ -28,6 +29,7 @@ import java.util.Optional; import java.util.OptionalLong; +import java.util.function.Predicate; /** * Factory for creating {@link LoadJob}s from journal entries. @@ -54,15 +56,28 @@ public Job create() { String path = mJobEntry.getLoadPath(); UnderFileSystem ufs = mFsMaster.getUfsManager().getOrAdd(new AlluxioURI(path), () -> UnderFileSystemConfiguration.defaults(Configuration.global())); + Predicate predicate = Predicates.alwaysTrue(); + Optional fileFilterRegx = Optional.empty(); + if (mJobEntry.hasFileFilterRegx()) { + String regxPatternStr = mJobEntry.getFileFilterRegx(); + if (regxPatternStr != null && !regxPatternStr.isEmpty()) { + alluxio.proto.journal.Job.FileFilter.Builder builder = + alluxio.proto.journal.Job.FileFilter.newBuilder() + .setName("fileNamePattern").setValue(regxPatternStr); + FilePredicate filePredicate = FilePredicate.create(builder.build()); + predicate = filePredicate.getUfsStatusPredicate(); + fileFilterRegx = Optional.of(regxPatternStr); + } + } Iterable iterable = new UfsStatusIterable(ufs, path, Optional.ofNullable(AuthenticatedClientUser.getOrNull()).map(User::getName), - Predicates.alwaysTrue()); + predicate); Optional user = mJobEntry.hasUser() ? Optional.of(mJobEntry.getUser()) : Optional.empty(); DoraLoadJob job = new DoraLoadJob(path, user, mJobEntry.getJobId(), mJobEntry.hasBandwidth() ? OptionalLong.of(mJobEntry.getBandwidth()) : OptionalLong.empty(), mJobEntry.getPartialListing(), mJobEntry.getVerify(), mJobEntry.getLoadMetadataOnly(), - mJobEntry.getSkipIfExists(), iterable.iterator(), ufs); + mJobEntry.getSkipIfExists(), fileFilterRegx, iterable.iterator(), ufs); job.setJobState(JobState.fromProto(mJobEntry.getState()), false); if (mJobEntry.hasEndTime()) { job.setEndTime(mJobEntry.getEndTime()); diff --git a/dora/core/server/master/src/main/java/alluxio/master/job/LoadJobFactory.java b/dora/core/server/master/src/main/java/alluxio/master/job/LoadJobFactory.java index 66b8aa66b49d..262af7d962b0 100644 --- a/dora/core/server/master/src/main/java/alluxio/master/job/LoadJobFactory.java +++ b/dora/core/server/master/src/main/java/alluxio/master/job/LoadJobFactory.java @@ -16,6 +16,7 @@ import alluxio.grpc.LoadJobPOptions; import alluxio.job.LoadJobRequest; import alluxio.master.file.DefaultFileSystemMaster; +import alluxio.master.predicate.FilePredicate; import alluxio.scheduler.job.Job; import alluxio.scheduler.job.JobFactory; import alluxio.security.User; @@ -29,6 +30,7 @@ import java.util.Optional; import java.util.OptionalLong; import java.util.UUID; +import java.util.function.Predicate; /** * Factory for creating {@link LoadJob}s that get file infos from master. @@ -60,14 +62,28 @@ public Job create() { .ofNullable(AuthenticatedClientUser.getOrNull()) .map(User::getName); + Predicate predicate = Predicates.alwaysTrue(); + Optional fileFilterRegx = Optional.empty(); + if (options.hasFileFilterRegx()) { + String regxPatternStr = options.getFileFilterRegx(); + if (regxPatternStr != null && !regxPatternStr.isEmpty()) { + alluxio.proto.journal.Job.FileFilter.Builder builder = + alluxio.proto.journal.Job.FileFilter.newBuilder() + .setName("fileNamePattern").setValue(regxPatternStr); + FilePredicate filePredicate = FilePredicate.create(builder.build()); + predicate = filePredicate.getUfsStatusPredicate(); + fileFilterRegx = Optional.of(regxPatternStr); + } + } + UnderFileSystem ufs = mFs.getUfsManager().getOrAdd(new AlluxioURI(path), () -> UnderFileSystemConfiguration.defaults(Configuration.global())); Iterable iterable = new UfsStatusIterable(ufs, path, Optional.ofNullable(AuthenticatedClientUser.getOrNull()).map(User::getName), - Predicates.alwaysTrue()); + predicate); return new DoraLoadJob(path, user, UUID.randomUUID().toString(), bandwidth, partialListing, verificationEnabled, options.getLoadMetadataOnly(), options.getSkipIfExists(), - iterable.iterator(), ufs); + fileFilterRegx, iterable.iterator(), ufs); } } diff --git a/dora/core/server/master/src/main/java/alluxio/master/predicate/DatePredicate.java b/dora/core/server/master/src/main/java/alluxio/master/predicate/DatePredicate.java index 2453ab318898..933779708a0e 100644 --- a/dora/core/server/master/src/main/java/alluxio/master/predicate/DatePredicate.java +++ b/dora/core/server/master/src/main/java/alluxio/master/predicate/DatePredicate.java @@ -14,6 +14,7 @@ import alluxio.exception.runtime.InvalidArgumentRuntimeException; import alluxio.master.predicate.interval.Interval; import alluxio.proto.journal.Job.FileFilter; +import alluxio.underfs.UfsStatus; import alluxio.wire.FileInfo; import org.slf4j.Logger; @@ -141,4 +142,10 @@ public Predicate get() { } }; } + + @Override + public Predicate getUfsStatusPredicate() { + throw new UnsupportedOperationException( + "getUfsStatusPredicate() is unsupported in DatePredicate"); + } } diff --git a/dora/core/server/master/src/main/java/alluxio/master/predicate/FileNamePatternPredicate.java b/dora/core/server/master/src/main/java/alluxio/master/predicate/FileNamePatternPredicate.java new file mode 100644 index 000000000000..e5b20de16c7b --- /dev/null +++ b/dora/core/server/master/src/main/java/alluxio/master/predicate/FileNamePatternPredicate.java @@ -0,0 +1,122 @@ +/* + * The Alluxio Open Foundation licenses this work under the Apache License, version 2.0 + * (the "License"). You may not use this work except in compliance with the License, which is + * available at www.apache.org/licenses/LICENSE-2.0 + * + * This software is distributed on an "AS IS" basis, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied, as more fully set forth in the License. + * + * See the NOTICE file distributed with this work for information regarding copyright ownership. + */ + +package alluxio.master.predicate; + +import alluxio.proto.journal.Job.FileFilter; +import alluxio.underfs.UfsStatus; +import alluxio.wire.FileInfo; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.function.Predicate; +import java.util.regex.Pattern; + +/** + * A predicate related to date of the file. + */ +public class FileNamePatternPredicate implements FilePredicate { + private static final Logger LOG = LoggerFactory.getLogger(FileNamePatternPredicate.class); + private final String mFilterName; + private String mRegexPatternStr; + + /** + * Factory for modification time predicate. + */ + public static class FileNamePatternFactory extends Factory { + @Override + public String getFilterName() { + return "fileNamePattern"; + } + } + + /** + * Factory for creating instances. + */ + public abstract static class Factory implements FilePredicateFactory { + /** + * @return filter name for the predicate + */ + public abstract String getFilterName(); + + /** + * Creates a {@link FilePredicate} from the string value. + * + * @param regexPatternStr the regex pattern string from the filter + * @return the created predicate + */ + public FilePredicate createFileNamePatternPredicate(String regexPatternStr) { + return new FileNamePatternPredicate(getFilterName(), regexPatternStr); + } + + @Override + public FilePredicate create(FileFilter filter) { + try { + if (filter.hasName() && filter.getName().equals(getFilterName())) { + if (filter.hasValue()) { + return createFileNamePatternPredicate(filter.getValue()); + } + } + } catch (Exception e) { + // fall through + } + return null; + } + } + + /** + * Creates an instance. + * + * @param filterName the filter name + * @param regexPatternStr the regex pattern string for file filtering + */ + public FileNamePatternPredicate(String filterName, String regexPatternStr) { + mFilterName = filterName; + mRegexPatternStr = regexPatternStr; + } + + @Override + public Predicate get() { + return FileInfo -> { + try { + String fileName = FileInfo.getName(); + return Pattern.matches(mRegexPatternStr, fileName); + } catch (RuntimeException e) { + LOG.debug("Failed to filter: ", e); + return false; + } + }; + } + + @Override + public Predicate getUfsStatusPredicate() { + return UfsStatus -> { + try { + String fileName = getFileName(UfsStatus); + return Pattern.matches(mRegexPatternStr, fileName); + } catch (RuntimeException e) { + LOG.debug("Failed to filter: ", e); + return false; + } + }; + } + + private String getFileName(UfsStatus ufsStatus) { + String name = ufsStatus.getName(); + int index = name.lastIndexOf("/"); + if (index == -1) { + return name; + } + name = name.substring(index + 1); + return name; + } +} diff --git a/dora/core/server/master/src/main/java/alluxio/master/predicate/FilePredicate.java b/dora/core/server/master/src/main/java/alluxio/master/predicate/FilePredicate.java index 3c874c1e750c..1ce40320ac50 100644 --- a/dora/core/server/master/src/main/java/alluxio/master/predicate/FilePredicate.java +++ b/dora/core/server/master/src/main/java/alluxio/master/predicate/FilePredicate.java @@ -12,6 +12,7 @@ package alluxio.master.predicate; import alluxio.proto.journal.Job.FileFilter; +import alluxio.underfs.UfsStatus; import alluxio.wire.FileInfo; import com.google.common.cache.CacheBuilder; @@ -49,6 +50,12 @@ public List load(ClassLoader key) throws Exception { */ Predicate get(); + /** + * Get the predicate function from the file predicate. + * @return the predicate function + */ + Predicate getUfsStatusPredicate(); + /** * Creates a file predicate from a file filter. * If the filter name is invalid, it will throw exception. diff --git a/dora/core/server/master/src/main/java/alluxio/master/predicate/TimePredicate.java b/dora/core/server/master/src/main/java/alluxio/master/predicate/TimePredicate.java index 4e064bac6df4..7d9addb97710 100644 --- a/dora/core/server/master/src/main/java/alluxio/master/predicate/TimePredicate.java +++ b/dora/core/server/master/src/main/java/alluxio/master/predicate/TimePredicate.java @@ -14,6 +14,7 @@ import alluxio.exception.runtime.AlluxioRuntimeException; import alluxio.master.predicate.interval.Interval; import alluxio.proto.journal.Job.FileFilter; +import alluxio.underfs.UfsStatus; import alluxio.util.FormatUtils; import alluxio.wire.FileInfo; @@ -200,4 +201,10 @@ public Predicate get() { } }; } + + @Override + public Predicate getUfsStatusPredicate() { + throw new UnsupportedOperationException( + "getUfsStatusPredicate() is unsupported in TimePredicate"); + } } diff --git a/dora/core/server/master/src/main/resources/META-INF/services/alluxio.master.predicate.FilePredicateFactory b/dora/core/server/master/src/main/resources/META-INF/services/alluxio.master.predicate.FilePredicateFactory index b23d4de83ecc..1aac24dda82e 100644 --- a/dora/core/server/master/src/main/resources/META-INF/services/alluxio.master.predicate.FilePredicateFactory +++ b/dora/core/server/master/src/main/resources/META-INF/services/alluxio.master.predicate.FilePredicateFactory @@ -9,6 +9,7 @@ # See the NOTICE file distributed with this work for information regarding copyright ownership. # +alluxio.master.predicate.FileNamePatternPredicate$FileNamePatternFactory alluxio.master.predicate.TimePredicate$DateFromFileNameOlderThanFactory alluxio.master.predicate.TimePredicate$UnmodifiedForFactory alluxio.master.predicate.DatePredicate$LastModifiedDateFactory \ No newline at end of file diff --git a/dora/core/server/master/src/test/java/alluxio/master/file/scheduler/DoraLoadJobTest.java b/dora/core/server/master/src/test/java/alluxio/master/file/scheduler/DoraLoadJobTest.java index 240cc400d059..1ef089cd7f62 100644 --- a/dora/core/server/master/src/test/java/alluxio/master/file/scheduler/DoraLoadJobTest.java +++ b/dora/core/server/master/src/test/java/alluxio/master/file/scheduler/DoraLoadJobTest.java @@ -69,7 +69,7 @@ public void testGetNextTaskWithVirtualBlocks() throws IOException { Predicates.alwaysTrue()).iterator(); DoraLoadJob loadJob = new DoraLoadJob(mLocalUfsRoot, Optional.of("user"), "1", OptionalLong.empty(), false, true, - false, false, iterator, mLocalUfs); + false, false, Optional.empty(), iterator, mLocalUfs); Collection workers = ImmutableList.of( new WorkerInfo().setId(1).setAddress( new WorkerNetAddress().setHost("worker1").setRpcPort(1234))); diff --git a/dora/core/server/master/src/test/java/alluxio/master/file/scheduler/SchedulerTest.java b/dora/core/server/master/src/test/java/alluxio/master/file/scheduler/SchedulerTest.java index a3d3842aeb4c..cb48e0b6a33d 100644 --- a/dora/core/server/master/src/test/java/alluxio/master/file/scheduler/SchedulerTest.java +++ b/dora/core/server/master/src/test/java/alluxio/master/file/scheduler/SchedulerTest.java @@ -159,8 +159,9 @@ public void testSubmit() throws Exception { Optional user = Optional.of("user"); UnderFileSystem ufs = mock(UnderFileSystem.class); DoraLoadJob loadJob = - new DoraLoadJob(validLoadPath, Optional.of("user"), "1", OptionalLong.empty(), false, true, - false, false, Collections.emptyIterator(), ufs); + new DoraLoadJob(validLoadPath, Optional.of("user"), "1", OptionalLong.empty(), + false, true, false, false, + Optional.empty(), Collections.emptyIterator(), ufs); assertTrue(scheduler.submitJob(loadJob)); assertTrue(jobMetaStore.get(loadJob.getJobId()).getJobState() == JobState.RUNNING); assertEquals(1, scheduler.getJobs().size()); @@ -176,8 +177,10 @@ public void testSubmit() throws Exception { DoraLoadJob job = (DoraLoadJob) scheduler.getJobs().get(loadJob.getDescription()); assertEquals(OptionalLong.empty(), job.getBandwidth()); loadJob = - new DoraLoadJob(validLoadPath, Optional.of("user"), "1", OptionalLong.of(1000), true, false, - false, false, Collections.emptyIterator(), ufs); + new DoraLoadJob(validLoadPath, Optional.of("user"), "1", + OptionalLong.of(1000), true, false, + false, false, Optional.empty(), + Collections.emptyIterator(), ufs); assertFalse(scheduler.submitJob(loadJob)); assertEquals(1, scheduler.getJobs().size()); job = (DoraLoadJob) scheduler.getJobs().get(loadJob.getDescription()); @@ -205,8 +208,10 @@ public void testStop() { Optional user = Optional.of("user"); UnderFileSystem ufs = mock(UnderFileSystem.class); DoraLoadJob job = - new DoraLoadJob(validLoadPath, Optional.of("user"), "1", OptionalLong.of(100), false, true, - false, false, Collections.emptyIterator(), ufs); + new DoraLoadJob(validLoadPath, Optional.of("user"), "1", + OptionalLong.of(100), false, true, + false, false, Optional.empty(), + Collections.emptyIterator(), ufs); assertTrue(scheduler.submitJob(job)); assertTrue(scheduler.stopJob(job.getDescription())); @@ -240,13 +245,15 @@ public void testSubmitExceedsCapacity() throws Exception { i -> { String path = String.format("/path/to/load/%d", i); assertTrue(scheduler.submitJob( - new DoraLoadJob(path, Optional.of("user"), "1", OptionalLong.empty(), false, true, - false, false, Collections.emptyIterator(), ufs) + new DoraLoadJob(path, Optional.of("user"), "1", OptionalLong.empty(), + false, true, false, false, + Optional.empty(), Collections.emptyIterator(), ufs) )); }); assertThrows(ResourceExhaustedRuntimeException.class, () -> scheduler.submitJob( - new DoraLoadJob("/path/to/load/101", Optional.of("user"), "1", OptionalLong.empty(), false, - true, false, false, Collections.emptyIterator(), ufs))); + new DoraLoadJob("/path/to/load/101", Optional.of("user"), "1", + OptionalLong.empty(), false, true, false, + false, Optional.empty(), Collections.emptyIterator(), ufs))); } @Ignore @@ -504,7 +511,7 @@ public void testJobRetention() throws Exception { UnderFileSystem ufs = mock(UnderFileSystem.class); assertTrue(scheduler.submitJob( new DoraLoadJob(path, Optional.of("user"), "1", OptionalLong.empty(), false, true, false, - false, Collections.emptyIterator(), ufs))); + false, Optional.empty(), Collections.emptyIterator(), ufs))); }); assertEquals(5, scheduler.getJobs().size()); scheduler.getJobs().get(JobDescription.newBuilder().setPath("/load/1").setType("load").build()) @@ -539,7 +546,7 @@ public void testStopScheduler() { UnderFileSystem ufs = mock(UnderFileSystem.class); DoraLoadJob job = new DoraLoadJob(path, Optional.of("user"), "5", OptionalLong.of(100), false, true, false, - false, Collections.emptyIterator(), ufs); + false, Optional.empty(), Collections.emptyIterator(), ufs); scheduler.start(); scheduler.submitJob(job); assertEquals(1, scheduler.getJobs().size()); @@ -548,7 +555,7 @@ public void testStopScheduler() { assertEquals(1, metaStore.getJobs().size()); DoraLoadJob job2 = new DoraLoadJob("new", Optional.of("user"), "6", OptionalLong.of(100), false, true, false, - false, Collections.emptyIterator(), ufs); + false, Optional.empty(), Collections.emptyIterator(), ufs); metaStore.updateJob(job2); assertEquals(0, scheduler.getJobs().size()); assertEquals(2, metaStore.getJobs().size()); diff --git a/dora/core/server/worker/src/main/java/alluxio/worker/http/HttpLoadOptions.java b/dora/core/server/worker/src/main/java/alluxio/worker/http/HttpLoadOptions.java index 2df73580392d..475ba8659fb5 100644 --- a/dora/core/server/worker/src/main/java/alluxio/worker/http/HttpLoadOptions.java +++ b/dora/core/server/worker/src/main/java/alluxio/worker/http/HttpLoadOptions.java @@ -11,6 +11,7 @@ package alluxio.worker.http; +import java.util.Optional; import java.util.OptionalLong; /** @@ -36,6 +37,8 @@ static class Builder { private boolean mSkipIfExists; + private Optional mFileFilterRegx = Optional.empty(); + private Builder() { } @@ -103,6 +106,14 @@ public void setSkipIfExists(boolean skipIfExists) { mSkipIfExists = skipIfExists; } + /** + * Set the file filter regx pattern string. + * @param fileFilterRegx the file filter regx pattern string + */ + public void setFileFilterRegx(Optional fileFilterRegx) { + mFileFilterRegx = fileFilterRegx; + } + /** * Get the operation type {@link OpType}. * @return the operation type {@link OpType} @@ -167,13 +178,21 @@ public boolean isSkipIfExists() { return mSkipIfExists; } + /** + * Get the file filter regx pattern string. + * @return the file filter regx pattern string + */ + public Optional getFileFilterRegx() { + return mFileFilterRegx; + } + public static Builder newBuilder() { return new Builder(); } public HttpLoadOptions build() { return new HttpLoadOptions(mOpType, mPartialListing, mVerify, mBandwidth, - mProgressFormat, mVerbose, mLoadMetadataOnly, mSkipIfExists); + mProgressFormat, mVerbose, mLoadMetadataOnly, mSkipIfExists, mFileFilterRegx); } } @@ -212,6 +231,8 @@ public static OpType of(String type) { private final boolean mSkipIfExists; + private final Optional mFileFilterRegx; + /** * Create an object of {@link HttpLoadOptions}. A data model for the HTTP load options. * @param opType the operation type @@ -222,10 +243,12 @@ public static OpType of(String type) { * @param verbose if we want to print the verbose information * @param loadMetadataOnly if we load metadata only * @param skipIfExists skip if exists + * @param fileFilterRegx the file filter regx pattern string */ public HttpLoadOptions(OpType opType, boolean partialListing, boolean verify, OptionalLong bandwidth, String progressFormat, boolean verbose, - boolean loadMetadataOnly, boolean skipIfExists) { + boolean loadMetadataOnly, boolean skipIfExists, + Optional fileFilterRegx) { mOpType = opType; mPartialListing = partialListing; mVerify = verify; @@ -234,6 +257,7 @@ public HttpLoadOptions(OpType opType, boolean partialListing, boolean verify, mVerbose = verbose; mLoadMetadataOnly = loadMetadataOnly; mSkipIfExists = skipIfExists; + mFileFilterRegx = fileFilterRegx; } /** @@ -299,4 +323,12 @@ public boolean isLoadMetadataOnly() { public boolean isSkipIfExists() { return mSkipIfExists; } + + /** + * Get the file filter regx pattern string. + * @return the file filter regx pattern string + */ + public Optional getFileFilterRegx() { + return mFileFilterRegx; + } } diff --git a/dora/core/server/worker/src/main/java/alluxio/worker/http/HttpLoadService.java b/dora/core/server/worker/src/main/java/alluxio/worker/http/HttpLoadService.java index 05d46dc7ffd6..22c564adc1f0 100644 --- a/dora/core/server/worker/src/main/java/alluxio/worker/http/HttpLoadService.java +++ b/dora/core/server/worker/src/main/java/alluxio/worker/http/HttpLoadService.java @@ -104,6 +104,12 @@ private String submitLoad(AlluxioURI path, HttpLoadOptions loadOptions) { options.setBandwidth(bandWidth); } } + if (loadOptions.getFileFilterRegx().isPresent()) { + String fileFilterRegxPatternStr = loadOptions.getFileFilterRegx().get(); + if (fileFilterRegxPatternStr != null && !fileFilterRegxPatternStr.isEmpty()) { + options.setFileFilterRegx(fileFilterRegxPatternStr); + } + } LoadJobRequest job = new LoadJobRequest(path.toString(), options.build()); try { Optional jobId = mFileSystem.submitJob(job); diff --git a/dora/core/server/worker/src/main/java/alluxio/worker/http/HttpServerHandler.java b/dora/core/server/worker/src/main/java/alluxio/worker/http/HttpServerHandler.java index 9ddb1ba58550..03bedb6b4032 100644 --- a/dora/core/server/worker/src/main/java/alluxio/worker/http/HttpServerHandler.java +++ b/dora/core/server/worker/src/main/java/alluxio/worker/http/HttpServerHandler.java @@ -52,6 +52,7 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.Optional; /** * {@link HttpServerHandler} deals with HTTP requests received from Netty Channel. @@ -246,6 +247,10 @@ private HttpResponseContext doLoad(HttpRequest httpRequest, HttpRequestUri httpR if (skipIfExistsStr != null && !skipIfExistsStr.isEmpty()) { builder.setSkipIfExists(Boolean.parseBoolean(skipIfExistsStr)); } + String fileFilterRegxPattern = parameters.get("fileFilterRegx"); + if (fileFilterRegxPattern != null && !fileFilterRegxPattern.isEmpty()) { + builder.setFileFilterRegx(Optional.of(fileFilterRegxPattern)); + } String progressFormatStr = parameters.get("progressFormat"); if (progressFormatStr != null && !progressFormatStr.isEmpty()) { builder.setProgressFormat(progressFormatStr); diff --git a/dora/shell/src/main/java/alluxio/cli/fs/command/LoadCommand.java b/dora/shell/src/main/java/alluxio/cli/fs/command/LoadCommand.java index cfee7169d6e8..dfd192ff3848 100644 --- a/dora/shell/src/main/java/alluxio/cli/fs/command/LoadCommand.java +++ b/dora/shell/src/main/java/alluxio/cli/fs/command/LoadCommand.java @@ -116,6 +116,13 @@ public final class LoadCommand extends AbstractFileSystemCommand { .desc("If specified, skip files if they exist and are fully cached in alluxio.") .build(); + private static final Option FILE_FILTER_REGX = Option.builder() + .longOpt("file-filter-regx") + .required(false) + .hasArg(true) + .desc("If specified, skip files that doesn't match the regx pattern.") + .build(); + /** * Constructs a new instance to load a file or directory in Alluxio space. * @@ -142,7 +149,8 @@ public Options getOptions() { .addOption(PROGRESS_FORMAT) .addOption(PROGRESS_VERBOSE) .addOption(LOAD_METADATA_ONLY) - .addOption(SKIP_IF_EXISTS); + .addOption(SKIP_IF_EXISTS) + .addOption(FILE_FILTER_REGX); } @Override @@ -159,13 +167,18 @@ public int run(CommandLine cl) throws AlluxioException, IOException { bandwidth = OptionalLong.of(FormatUtils.parseSpaceSize( cl.getOptionValue(BANDWIDTH_OPTION.getLongOpt()))); } + Optional regxPatternStr = Optional.empty(); + if (cl.hasOption(FILE_FILTER_REGX.getLongOpt())) { + regxPatternStr = Optional.of(cl.getOptionValue(FILE_FILTER_REGX.getLongOpt())); + } return submitLoad( path, bandwidth, cl.hasOption(PARTIAL_LISTING_OPTION.getLongOpt()), cl.hasOption(VERIFY_OPTION.getLongOpt()), cl.hasOption(LOAD_METADATA_ONLY.getLongOpt()), - cl.hasOption(SKIP_IF_EXISTS.getLongOpt())); + cl.hasOption(SKIP_IF_EXISTS.getLongOpt()), + regxPatternStr); } if (cl.hasOption(STOP_OPTION.getLongOpt())) { @@ -186,7 +199,8 @@ public int run(CommandLine cl) throws AlluxioException, IOException { public String getUsage() { return "For distributed load:\n" + "\tload --submit " - + "[--bandwidth N] [--verify] [--partial-listing] [--metadata-only] [--skip-if-exists]\n" + + "[--bandwidth N] [--verify] [--partial-listing] [--metadata-only] [--skip-if-exists] " + + "[--file-filter-regx ]\n" + "\tload --stop\n" + "\tload --progress [--format TEXT|JSON] [--verbose]\n"; } @@ -215,7 +229,8 @@ public void validateArgs(CommandLine cl) throws InvalidArgumentException { } private int submitLoad(AlluxioURI path, OptionalLong bandwidth, - boolean usePartialListing, boolean verify, boolean loadMetadataOnly, boolean skipIfExists) { + boolean usePartialListing, boolean verify, boolean loadMetadataOnly, boolean skipIfExists, + Optional regxPatternStr) { LoadJobPOptions.Builder options = alluxio.grpc.LoadJobPOptions .newBuilder().setPartialListing(usePartialListing).setVerify(verify) .setLoadMetadataOnly(loadMetadataOnly) @@ -223,6 +238,9 @@ private int submitLoad(AlluxioURI path, OptionalLong bandwidth, if (bandwidth.isPresent()) { options.setBandwidth(bandwidth.getAsLong()); } + if (regxPatternStr.isPresent()) { + options.setFileFilterRegx(regxPatternStr.get()); + } LoadJobRequest job = new LoadJobRequest(path.toString(), options.build()); try { Optional jobId = mFileSystem.submitJob(job); diff --git a/dora/tests/integration/src/test/java/alluxio/client/cli/fs/command/DoraLoadCommandIntegrationTest.java b/dora/tests/integration/src/test/java/alluxio/client/cli/fs/command/DoraLoadCommandIntegrationTest.java index 5ec49b2842fa..da60c5aa89f3 100644 --- a/dora/tests/integration/src/test/java/alluxio/client/cli/fs/command/DoraLoadCommandIntegrationTest.java +++ b/dora/tests/integration/src/test/java/alluxio/client/cli/fs/command/DoraLoadCommandIntegrationTest.java @@ -82,5 +82,34 @@ public void testCommand() throws Exception { assertTrue(mOutput.toString().contains("\"mJobState\":\"SUCCEEDED\"")); mFsShell.run("load", path, "--progress", "--format", "JSON", "--verbose"); assertTrue(mOutput.toString().contains("\"mVerbose\":true")); + + // Test load with regx pattern file filter + createByteFileInUfs("/testRoot/testFileD", Constants.MB); + createByteFileInUfs("/testRoot/testDirectory/testFileE", Constants.MB); + createByteFileInUfs("/testRoot/testDirectory/testFileF", Constants.MB); + createByteFileInUfs("/testRoot/testDirectory/testFileG1", Constants.MB); + createByteFileInUfs("/testRoot/testDirectory/testFileG2", Constants.MB); + + AlluxioURI uriD = new AlluxioURI("/testRoot/testFileD"); + AlluxioURI uriE = new AlluxioURI("/testRoot/testDirectory/testFileE"); + AlluxioURI uriF = new AlluxioURI("/testRoot/testDirectory/testFileF"); + AlluxioURI uriG1 = new AlluxioURI("/testRoot/testDirectory/testFileG1"); + AlluxioURI uriG2 = new AlluxioURI("/testRoot/testDirectory/testFileG2"); + + mOutput.reset(); + assertEquals(0, mFsShell.run("load", path, "--submit", + "--file-filter-regx", ".*G[1|2]")); + assertEquals(0, mFsShell.run("load", path, "--progress")); + while (!mOutput.toString().contains("SUCCEEDED")) { + assertEquals(0, mFsShell.run("load", path, "--progress")); + Thread.sleep(1000); + } + assertTrue(mOutput.toString().contains("Inodes Processed: 2")); + assertEquals(0, mFileSystem.getStatus(uriD).getInAlluxioPercentage()); + assertEquals(0, mFileSystem.getStatus(uriE).getInAlluxioPercentage()); + assertEquals(0, mFileSystem.getStatus(uriF).getInAlluxioPercentage()); + assertEquals(0, mFileSystem.getStatus(uriD).getInAlluxioPercentage()); + assertEquals(100, mFileSystem.getStatus(uriG1).getInAlluxioPercentage()); + assertEquals(100, mFileSystem.getStatus(uriG2).getInAlluxioPercentage()); } } diff --git a/dora/tests/integration/src/test/java/alluxio/client/cli/fs/command/DoraLoadCommandWithVirtualBlockIntegrationTest.java b/dora/tests/integration/src/test/java/alluxio/client/cli/fs/command/DoraLoadCommandWithVirtualBlockIntegrationTest.java index 104c1a2dc692..308cf1298fe2 100644 --- a/dora/tests/integration/src/test/java/alluxio/client/cli/fs/command/DoraLoadCommandWithVirtualBlockIntegrationTest.java +++ b/dora/tests/integration/src/test/java/alluxio/client/cli/fs/command/DoraLoadCommandWithVirtualBlockIntegrationTest.java @@ -37,7 +37,7 @@ public DoraLoadCommandWithVirtualBlockIntegrationTest() throws IOException { @Override public void before() throws Exception { mLocalAlluxioClusterResource.setProperty(PropertyKey.MASTER_SCHEDULER_INITIAL_DELAY, "1s") - .setProperty(PropertyKey.DORA_READ_VIRTUAL_BLOCK_SIZE, "30MB") + .setProperty(PropertyKey.DORA_READ_VIRTUAL_BLOCK_SIZE, "2MB") .setProperty(PropertyKey.UNDERFS_XATTR_CHANGE_ENABLED, false); super.before(); } @@ -48,11 +48,11 @@ public void testCommand() throws Exception { String path = testRoot.getAbsolutePath(); mTestFolder.newFolder("testRoot/testDirectory"); - int lengthA = 16 * Constants.MB; + int lengthA = 1 * Constants.MB; createByteFileInUfs("/testRoot/testFileA", lengthA); - int lengthB = 32 * Constants.MB; + int lengthB = 3 * Constants.MB; createByteFileInUfs("/testRoot/testFileB", lengthB); - int lengthC = 64 * Constants.MB; + int lengthC = 5 * Constants.MB; createByteFileInUfs("/testRoot/testDirectory/testFileC", lengthC); AlluxioURI uriA = new AlluxioURI("/testRoot/testFileA");