diff --git a/cli/src/alluxio.org/cli/cmd/job/load.go b/cli/src/alluxio.org/cli/cmd/job/load.go index 5d087a7f0416..39daf565a702 100644 --- a/cli/src/alluxio.org/cli/cmd/job/load.go +++ b/cli/src/alluxio.org/cli/cmd/job/load.go @@ -12,45 +12,46 @@ package job import ( - "github.com/palantir/stacktrace" - "github.com/spf13/cobra" + "github.com/palantir/stacktrace" + "github.com/spf13/cobra" - "alluxio.org/cli/cmd/names" - "alluxio.org/cli/env" + "alluxio.org/cli/cmd/names" + "alluxio.org/cli/env" ) var Load = &LoadCommand{ - BaseJobCommand: &BaseJobCommand{ - BaseJavaCommand: &env.BaseJavaCommand{ - CommandName: "load", - JavaClassName: names.FileSystemShellJavaClass, - }, - }, + BaseJobCommand: &BaseJobCommand{ + BaseJavaCommand: &env.BaseJavaCommand{ + CommandName: "load", + JavaClassName: names.FileSystemShellJavaClass, + }, + }, } type LoadCommand struct { - *BaseJobCommand - path string + *BaseJobCommand + path string - bandwidth string - verify bool - partialListing bool - metadataOnly bool - skipIfExists bool + bandwidth string + verify bool + partialListing bool + metadataOnly bool + skipIfExists bool + fileFilterRegx string } func (c *LoadCommand) Base() *env.BaseJavaCommand { - return c.BaseJavaCommand + return c.BaseJavaCommand } func (c *LoadCommand) ToCommand() *cobra.Command { - cmd := c.Base().InitRunJavaClassCmd(&cobra.Command{ - Use: Load.CommandName, - Short: "Submit or manage load jobs", - Long: `The load command moves data from the under storage system into Alluxio storage. + cmd := c.Base().InitRunJavaClassCmd(&cobra.Command{ + Use: Load.CommandName, + Short: "Submit or manage load jobs", + Long: `The load command moves data from the under storage system into Alluxio storage. For example, load can be used to prefetch data for analytics jobs. 
If load is run on a directory, files in the directory will be recursively loaded.`, - Example: `# Submit a load job + Example: `# Submit a load job $ ./bin/alluxio job load --path /path --submit # View the progress of a submitted job @@ -67,42 +68,46 @@ Progress for loading path '/path': # Stop a submitted job $ ./bin/alluxio job load --path /path --stop`, - Args: cobra.NoArgs, - RunE: func(cmd *cobra.Command, args []string) error { - return c.Run(args) - }, - }) - const path = "path" - cmd.Flags().StringVar(&c.path, path, "", "[all] Source path of load operation") - cmd.MarkFlagRequired(path) - c.AttachOperationFlags(cmd) + Args: cobra.NoArgs, + RunE: func(cmd *cobra.Command, args []string) error { + return c.Run(args) + }, + }) + const path = "path" + cmd.Flags().StringVar(&c.path, path, "", "[all] Source path of load operation") + cmd.MarkFlagRequired(path) + c.AttachOperationFlags(cmd) - cmd.Flags().StringVar(&c.bandwidth, "bandwidth", "", "[submit] Single worker read bandwidth limit") - cmd.Flags().BoolVar(&c.verify, "verify", false, "[submit] Run verification when load finishes and load new files if any") - cmd.Flags().BoolVar(&c.partialListing, "partial-listing", false, "[submit] Use partial directory listing, initializing load before reading the entire directory but cannot report on certain progress details") - cmd.Flags().BoolVar(&c.metadataOnly, "metadata-only", false, "[submit] Only load file metadata") - cmd.Flags().BoolVar(&c.skipIfExists, "skip-if-exists", false, "[submit] Skip existing fullly cached files") - return cmd + cmd.Flags().StringVar(&c.bandwidth, "bandwidth", "", "[submit] Single worker read bandwidth limit") + cmd.Flags().BoolVar(&c.verify, "verify", false, "[submit] Run verification when load finishes and load new files if any") + cmd.Flags().BoolVar(&c.partialListing, "partial-listing", false, "[submit] Use partial directory listing, initializing load before reading the entire directory but cannot report on certain progress details") + 
cmd.Flags().BoolVar(&c.metadataOnly, "metadata-only", false, "[submit] Only load file metadata") + cmd.Flags().BoolVar(&c.skipIfExists, "skip-if-exists", false, "[submit] Skip existing fullly cached files") + cmd.Flags().StringVar(&c.fileFilterRegx, "file-filter-regx", "", "[submit] Skip files that do not match the regx pattern") + return cmd } func (c *LoadCommand) Run(_ []string) error { - opWithArgs, err := c.OperationWithArgs() - if err != nil { - return stacktrace.Propagate(err, "error parsing operation") - } - javaArgs := []string{"load", c.path} - javaArgs = append(javaArgs, opWithArgs...) - if c.bandwidth != "" { - javaArgs = append(javaArgs, "--bandwidth", c.bandwidth) - } - if c.partialListing { - javaArgs = append(javaArgs, "--partial-listing") - } - if c.metadataOnly { - javaArgs = append(javaArgs, "--metadata-only") - } - if c.skipIfExists { - javaArgs = append(javaArgs, "--skip-if-exists") - } - return c.Base().Run(javaArgs) + opWithArgs, err := c.OperationWithArgs() + if err != nil { + return stacktrace.Propagate(err, "error parsing operation") + } + javaArgs := []string{"load", c.path} + javaArgs = append(javaArgs, opWithArgs...)
+ if c.bandwidth != "" { + javaArgs = append(javaArgs, "--bandwidth", c.bandwidth) + } + if c.partialListing { + javaArgs = append(javaArgs, "--partial-listing") + } + if c.metadataOnly { + javaArgs = append(javaArgs, "--metadata-only") + } + if c.skipIfExists { + javaArgs = append(javaArgs, "--skip-if-exists") + } + if c.fileFilterRegx != "" { + javaArgs = append(javaArgs, "--file-filter-regx", c.fileFilterRegx) + } + return c.Base().Run(javaArgs) } diff --git a/common/transport/src/main/proto/grpc/file_system_master.proto b/common/transport/src/main/proto/grpc/file_system_master.proto index f9a36378c6cf..37066b99ebd2 100644 --- a/common/transport/src/main/proto/grpc/file_system_master.proto +++ b/common/transport/src/main/proto/grpc/file_system_master.proto @@ -608,6 +608,7 @@ message LoadJobPOptions { optional bool partialListing = 3; optional bool loadMetadataOnly = 4; optional bool skipIfExists = 5; + optional string fileFilterRegx = 6; } message CopyJobPOptions { diff --git a/dora/core/server/master/src/main/java/alluxio/master/job/LoadJobFactory.java b/dora/core/server/master/src/main/java/alluxio/master/job/LoadJobFactory.java index 24c4344a08d8..e16c38b8bf8f 100644 --- a/dora/core/server/master/src/main/java/alluxio/master/job/LoadJobFactory.java +++ b/dora/core/server/master/src/main/java/alluxio/master/job/LoadJobFactory.java @@ -16,6 +16,7 @@ import alluxio.grpc.LoadJobPOptions; import alluxio.job.LoadJobRequest; import alluxio.master.file.DefaultFileSystemMaster; +import alluxio.master.predicate.FilePredicate; import alluxio.scheduler.job.Job; import alluxio.scheduler.job.JobFactory; import alluxio.security.User; @@ -29,6 +30,7 @@ import java.util.Optional; import java.util.OptionalLong; import java.util.UUID; +import java.util.function.Predicate; /** * Factory for creating {@link LoadJob}s that get file infos from master.
@@ -60,11 +62,23 @@ public Job create() { .ofNullable(AuthenticatedClientUser.getOrNull()) .map(User::getName); + Predicate predicate = Predicates.alwaysTrue(); + if (options.hasFileFilterRegx()) { + String regxPatternStr = options.getFileFilterRegx(); + if (regxPatternStr != null && !regxPatternStr.isEmpty()) { + alluxio.proto.journal.Job.FileFilter.Builder builder = + alluxio.proto.journal.Job.FileFilter.newBuilder() + .setName("fileNamePattern").setValue(regxPatternStr); + FilePredicate filePredicate = FilePredicate.create(builder.build()); + predicate = filePredicate.getUfsStatusPredicate(); + } + } + UnderFileSystem ufs = mFs.getUfsManager().getOrAdd(new AlluxioURI(path), UnderFileSystemConfiguration.defaults(Configuration.global())); Iterable iterable = new UfsStatusIterable(ufs, path, Optional.ofNullable(AuthenticatedClientUser.getOrNull()).map(User::getName), - Predicates.alwaysTrue()); + predicate); return new DoraLoadJob(path, user, UUID.randomUUID().toString(), bandwidth, partialListing, verificationEnabled, options.getLoadMetadataOnly(), options.getSkipIfExists(), iterable.iterator(), ufs); diff --git a/dora/core/server/master/src/main/java/alluxio/master/predicate/DatePredicate.java b/dora/core/server/master/src/main/java/alluxio/master/predicate/DatePredicate.java index 2453ab318898..933779708a0e 100644 --- a/dora/core/server/master/src/main/java/alluxio/master/predicate/DatePredicate.java +++ b/dora/core/server/master/src/main/java/alluxio/master/predicate/DatePredicate.java @@ -14,6 +14,7 @@ import alluxio.exception.runtime.InvalidArgumentRuntimeException; import alluxio.master.predicate.interval.Interval; import alluxio.proto.journal.Job.FileFilter; +import alluxio.underfs.UfsStatus; import alluxio.wire.FileInfo; import org.slf4j.Logger; @@ -141,4 +142,10 @@ public Predicate get() { } }; } + + @Override + public Predicate getUfsStatusPredicate() { + throw new UnsupportedOperationException( + "getUfsStatusPredicate() is unsupported in 
DatePredicate"); + } } diff --git a/dora/core/server/master/src/main/java/alluxio/master/predicate/FileNamePatternPredicate.java b/dora/core/server/master/src/main/java/alluxio/master/predicate/FileNamePatternPredicate.java new file mode 100644 index 000000000000..e5b20de16c7b --- /dev/null +++ b/dora/core/server/master/src/main/java/alluxio/master/predicate/FileNamePatternPredicate.java @@ -0,0 +1,122 @@ +/* + * The Alluxio Open Foundation licenses this work under the Apache License, version 2.0 + * (the "License"). You may not use this work except in compliance with the License, which is + * available at www.apache.org/licenses/LICENSE-2.0 + * + * This software is distributed on an "AS IS" basis, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied, as more fully set forth in the License. + * + * See the NOTICE file distributed with this work for information regarding copyright ownership. + */ + +package alluxio.master.predicate; + +import alluxio.proto.journal.Job.FileFilter; +import alluxio.underfs.UfsStatus; +import alluxio.wire.FileInfo; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.function.Predicate; +import java.util.regex.Pattern; + +/** + * A predicate that matches a file's name against a regex pattern. + */ +public class FileNamePatternPredicate implements FilePredicate { + private static final Logger LOG = LoggerFactory.getLogger(FileNamePatternPredicate.class); + private final String mFilterName; + private String mRegexPatternStr; + + /** + * Factory for the file name pattern predicate. + */ + public static class FileNamePatternFactory extends Factory { + @Override + public String getFilterName() { + return "fileNamePattern"; + } + } + + /** + * Factory for creating instances. + */ + public abstract static class Factory implements FilePredicateFactory { + /** + * @return filter name for the predicate + */ + public abstract String getFilterName(); + + /** + * Creates a {@link FilePredicate} from the string value.
+ * + * @param regexPatternStr the regex pattern string from the filter + * @return the created predicate + */ + public FilePredicate createFileNamePatternPredicate(String regexPatternStr) { + return new FileNamePatternPredicate(getFilterName(), regexPatternStr); + } + + @Override + public FilePredicate create(FileFilter filter) { + try { + if (filter.hasName() && filter.getName().equals(getFilterName())) { + if (filter.hasValue()) { + return createFileNamePatternPredicate(filter.getValue()); + } + } + } catch (Exception e) { + // fall through + } + return null; + } + } + + /** + * Creates an instance. + * + * @param filterName the filter name + * @param regexPatternStr the regex pattern string for file filtering + */ + public FileNamePatternPredicate(String filterName, String regexPatternStr) { + mFilterName = filterName; + mRegexPatternStr = regexPatternStr; + } + + @Override + public Predicate get() { + return FileInfo -> { + try { + String fileName = FileInfo.getName(); + return Pattern.matches(mRegexPatternStr, fileName); + } catch (RuntimeException e) { + LOG.debug("Failed to filter: ", e); + return false; + } + }; + } + + @Override + public Predicate getUfsStatusPredicate() { + return UfsStatus -> { + try { + String fileName = getFileName(UfsStatus); + return Pattern.matches(mRegexPatternStr, fileName); + } catch (RuntimeException e) { + LOG.debug("Failed to filter: ", e); + return false; + } + }; + } + + private String getFileName(UfsStatus ufsStatus) { + String name = ufsStatus.getName(); + int index = name.lastIndexOf("/"); + if (index == -1) { + return name; + } + name = name.substring(index + 1); + return name; + } +} diff --git a/dora/core/server/master/src/main/java/alluxio/master/predicate/FilePredicate.java b/dora/core/server/master/src/main/java/alluxio/master/predicate/FilePredicate.java index 3c874c1e750c..1ce40320ac50 100644 --- a/dora/core/server/master/src/main/java/alluxio/master/predicate/FilePredicate.java +++ 
b/dora/core/server/master/src/main/java/alluxio/master/predicate/FilePredicate.java @@ -12,6 +12,7 @@ package alluxio.master.predicate; import alluxio.proto.journal.Job.FileFilter; +import alluxio.underfs.UfsStatus; import alluxio.wire.FileInfo; import com.google.common.cache.CacheBuilder; @@ -49,6 +50,12 @@ public List load(ClassLoader key) throws Exception { */ Predicate get(); + /** + * Get the predicate function from the file predicate. + * @return the predicate function + */ + Predicate getUfsStatusPredicate(); + /** * Creates a file predicate from a file filter. * If the filter name is invalid, it will throw exception. diff --git a/dora/core/server/master/src/main/java/alluxio/master/predicate/TimePredicate.java b/dora/core/server/master/src/main/java/alluxio/master/predicate/TimePredicate.java index 4e064bac6df4..7d9addb97710 100644 --- a/dora/core/server/master/src/main/java/alluxio/master/predicate/TimePredicate.java +++ b/dora/core/server/master/src/main/java/alluxio/master/predicate/TimePredicate.java @@ -14,6 +14,7 @@ import alluxio.exception.runtime.AlluxioRuntimeException; import alluxio.master.predicate.interval.Interval; import alluxio.proto.journal.Job.FileFilter; +import alluxio.underfs.UfsStatus; import alluxio.util.FormatUtils; import alluxio.wire.FileInfo; @@ -200,4 +201,10 @@ public Predicate get() { } }; } + + @Override + public Predicate getUfsStatusPredicate() { + throw new UnsupportedOperationException( + "getUfsStatusPredicate() is unsupported in TimePredicate"); + } } diff --git a/dora/core/server/master/src/main/resources/META-INF/services/alluxio.master.predicate.FilePredicateFactory b/dora/core/server/master/src/main/resources/META-INF/services/alluxio.master.predicate.FilePredicateFactory index b23d4de83ecc..1aac24dda82e 100644 --- a/dora/core/server/master/src/main/resources/META-INF/services/alluxio.master.predicate.FilePredicateFactory +++ 
b/dora/core/server/master/src/main/resources/META-INF/services/alluxio.master.predicate.FilePredicateFactory @@ -9,6 +9,7 @@ # See the NOTICE file distributed with this work for information regarding copyright ownership. # +alluxio.master.predicate.FileNamePatternPredicate$FileNamePatternFactory alluxio.master.predicate.TimePredicate$DateFromFileNameOlderThanFactory alluxio.master.predicate.TimePredicate$UnmodifiedForFactory alluxio.master.predicate.DatePredicate$LastModifiedDateFactory \ No newline at end of file diff --git a/dora/core/server/worker/src/main/java/alluxio/worker/http/HttpLoadOptions.java b/dora/core/server/worker/src/main/java/alluxio/worker/http/HttpLoadOptions.java index 2df73580392d..475ba8659fb5 100644 --- a/dora/core/server/worker/src/main/java/alluxio/worker/http/HttpLoadOptions.java +++ b/dora/core/server/worker/src/main/java/alluxio/worker/http/HttpLoadOptions.java @@ -11,6 +11,7 @@ package alluxio.worker.http; +import java.util.Optional; import java.util.OptionalLong; /** @@ -36,6 +37,8 @@ static class Builder { private boolean mSkipIfExists; + private Optional mFileFilterRegx = Optional.empty(); + private Builder() { } @@ -103,6 +106,14 @@ public void setSkipIfExists(boolean skipIfExists) { mSkipIfExists = skipIfExists; } + /** + * Set the file filter regx pattern string. + * @param fileFilterRegx the file filter regx pattern string + */ + public void setFileFilterRegx(Optional fileFilterRegx) { + mFileFilterRegx = fileFilterRegx; + } + /** * Get the operation type {@link OpType}. * @return the operation type {@link OpType} @@ -167,13 +178,21 @@ public boolean isSkipIfExists() { return mSkipIfExists; } + /** + * Get the file filter regx pattern string. 
+ * @return the file filter regx pattern string + */ + public Optional getFileFilterRegx() { + return mFileFilterRegx; + } + public static Builder newBuilder() { return new Builder(); } public HttpLoadOptions build() { return new HttpLoadOptions(mOpType, mPartialListing, mVerify, mBandwidth, - mProgressFormat, mVerbose, mLoadMetadataOnly, mSkipIfExists); + mProgressFormat, mVerbose, mLoadMetadataOnly, mSkipIfExists, mFileFilterRegx); } } @@ -212,6 +231,8 @@ public static OpType of(String type) { private final boolean mSkipIfExists; + private final Optional mFileFilterRegx; + /** * Create an object of {@link HttpLoadOptions}. A data model for the HTTP load options. * @param opType the operation type @@ -222,10 +243,12 @@ public static OpType of(String type) { * @param verbose if we want to print the verbose information * @param loadMetadataOnly if we load metadata only * @param skipIfExists skip if exists + * @param fileFilterRegx the file filter regx pattern string */ public HttpLoadOptions(OpType opType, boolean partialListing, boolean verify, OptionalLong bandwidth, String progressFormat, boolean verbose, - boolean loadMetadataOnly, boolean skipIfExists) { + boolean loadMetadataOnly, boolean skipIfExists, + Optional fileFilterRegx) { mOpType = opType; mPartialListing = partialListing; mVerify = verify; @@ -234,6 +257,7 @@ public HttpLoadOptions(OpType opType, boolean partialListing, boolean verify, mVerbose = verbose; mLoadMetadataOnly = loadMetadataOnly; mSkipIfExists = skipIfExists; + mFileFilterRegx = fileFilterRegx; } /** @@ -299,4 +323,12 @@ public boolean isLoadMetadataOnly() { public boolean isSkipIfExists() { return mSkipIfExists; } + + /** + * Get the file filter regx pattern string. 
+ * @return the file filter regx pattern string + */ + public Optional getFileFilterRegx() { + return mFileFilterRegx; + } } diff --git a/dora/core/server/worker/src/main/java/alluxio/worker/http/HttpLoadService.java b/dora/core/server/worker/src/main/java/alluxio/worker/http/HttpLoadService.java index 05d46dc7ffd6..22c564adc1f0 100644 --- a/dora/core/server/worker/src/main/java/alluxio/worker/http/HttpLoadService.java +++ b/dora/core/server/worker/src/main/java/alluxio/worker/http/HttpLoadService.java @@ -104,6 +104,12 @@ private String submitLoad(AlluxioURI path, HttpLoadOptions loadOptions) { options.setBandwidth(bandWidth); } } + if (loadOptions.getFileFilterRegx().isPresent()) { + String fileFilterRegxPatternStr = loadOptions.getFileFilterRegx().get(); + if (fileFilterRegxPatternStr != null && !fileFilterRegxPatternStr.isEmpty()) { + options.setFileFilterRegx(fileFilterRegxPatternStr); + } + } LoadJobRequest job = new LoadJobRequest(path.toString(), options.build()); try { Optional jobId = mFileSystem.submitJob(job); diff --git a/dora/core/server/worker/src/main/java/alluxio/worker/http/HttpServerHandler.java b/dora/core/server/worker/src/main/java/alluxio/worker/http/HttpServerHandler.java index 477349f44d50..0498f3bcc72b 100644 --- a/dora/core/server/worker/src/main/java/alluxio/worker/http/HttpServerHandler.java +++ b/dora/core/server/worker/src/main/java/alluxio/worker/http/HttpServerHandler.java @@ -52,6 +52,7 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.Optional; /** * {@link HttpServerHandler} deals with HTTP requests received from Netty Channel. 
@@ -217,6 +218,10 @@ private HttpResponseContext doLoad(HttpRequest httpRequest, HttpRequestUri httpR if (skipIfExistsStr != null && !skipIfExistsStr.isEmpty()) { builder.setSkipIfExists(Boolean.parseBoolean(skipIfExistsStr)); } + String fileFilterRegxPattern = parameters.get("fileFilterRegx"); + if (fileFilterRegxPattern != null && !fileFilterRegxPattern.isEmpty()) { + builder.setFileFilterRegx(Optional.of(fileFilterRegxPattern)); + } String progressFormatStr = parameters.get("progressFormat"); if (progressFormatStr != null && !progressFormatStr.isEmpty()) { builder.setProgressFormat(progressFormatStr); diff --git a/dora/shell/src/main/java/alluxio/cli/fs/command/LoadCommand.java b/dora/shell/src/main/java/alluxio/cli/fs/command/LoadCommand.java index cfee7169d6e8..abba54d32b1a 100644 --- a/dora/shell/src/main/java/alluxio/cli/fs/command/LoadCommand.java +++ b/dora/shell/src/main/java/alluxio/cli/fs/command/LoadCommand.java @@ -116,6 +116,13 @@ public final class LoadCommand extends AbstractFileSystemCommand { .desc("If specified, skip files if they exist and are fully cached in alluxio.") .build(); + private static final Option FILE_FILTER_REGX = Option.builder() + .longOpt("file-filter-regx") + .required(false) + .hasArg(true) + .desc("If specified, skip files that doesn't match the regx pattern.") + .build(); + /** * Constructs a new instance to load a file or directory in Alluxio space.
* @@ -142,7 +149,8 @@ public Options getOptions() { .addOption(PROGRESS_FORMAT) .addOption(PROGRESS_VERBOSE) .addOption(LOAD_METADATA_ONLY) - .addOption(SKIP_IF_EXISTS); + .addOption(SKIP_IF_EXISTS) + .addOption(FILE_FILTER_REGX); } @Override @@ -159,13 +167,18 @@ public int run(CommandLine cl) throws AlluxioException, IOException { bandwidth = OptionalLong.of(FormatUtils.parseSpaceSize( cl.getOptionValue(BANDWIDTH_OPTION.getLongOpt()))); } + Optional regxPatternStr = Optional.empty(); + if (cl.hasOption(FILE_FILTER_REGX.getLongOpt())) { + regxPatternStr = Optional.of(cl.getOptionValue(FILE_FILTER_REGX.getLongOpt())); + } return submitLoad( path, bandwidth, cl.hasOption(PARTIAL_LISTING_OPTION.getLongOpt()), cl.hasOption(VERIFY_OPTION.getLongOpt()), cl.hasOption(LOAD_METADATA_ONLY.getLongOpt()), - cl.hasOption(SKIP_IF_EXISTS.getLongOpt())); + cl.hasOption(SKIP_IF_EXISTS.getLongOpt()), + regxPatternStr); } if (cl.hasOption(STOP_OPTION.getLongOpt())) { @@ -186,7 +199,8 @@ public int run(CommandLine cl) throws AlluxioException, IOException { public String getUsage() { return "For distributed load:\n" + "\tload --submit " - + "[--bandwidth N] [--verify] [--partial-listing] [--metadata-only] [--skip-if-exists]\n" + + "[--bandwidth N] [--verify] [--partial-listing] [--metadata-only] [--skip-if-exists] " + + "[--file-filter-regx ]\n" + "\tload --stop\n" + "\tload --progress [--format TEXT|JSON] [--verbose]\n"; } @@ -215,7 +229,8 @@ public void validateArgs(CommandLine cl) throws InvalidArgumentException { } private int submitLoad(AlluxioURI path, OptionalLong bandwidth, - boolean usePartialListing, boolean verify, boolean loadMetadataOnly, boolean skipIfExists) { + boolean usePartialListing, boolean verify, boolean loadMetadataOnly, boolean skipIfExists, + Optional regxPatternStr) { LoadJobPOptions.Builder options = alluxio.grpc.LoadJobPOptions .newBuilder().setPartialListing(usePartialListing).setVerify(verify) .setLoadMetadataOnly(loadMetadataOnly) @@ -223,6 +238,9 @@ 
private int submitLoad(AlluxioURI path, OptionalLong bandwidth, if (bandwidth.isPresent()) { options.setBandwidth(bandwidth.getAsLong()); } + if (regxPatternStr.isPresent()) { + options.setFileFilterRegx(regxPatternStr.get()); + } LoadJobRequest job = new LoadJobRequest(path.toString(), options.build()); try { Optional jobId = mFileSystem.submitJob(job);