Skip to content

Commit

Permalink
add regx pattern file filter for distributed load
Browse files Browse the repository at this point in the history
format codestyle
  • Loading branch information
JiamingMai committed Oct 20, 2023
1 parent 47b7667 commit 3536ae5
Show file tree
Hide file tree
Showing 12 changed files with 289 additions and 64 deletions.
119 changes: 62 additions & 57 deletions cli/src/alluxio.org/cli/cmd/job/load.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,45 +12,46 @@
package job

import (
"github.com/palantir/stacktrace"
"github.com/spf13/cobra"
"github.com/palantir/stacktrace"
"github.com/spf13/cobra"

"alluxio.org/cli/cmd/names"
"alluxio.org/cli/env"
"alluxio.org/cli/cmd/names"
"alluxio.org/cli/env"
)

var Load = &LoadCommand{
BaseJobCommand: &BaseJobCommand{
BaseJavaCommand: &env.BaseJavaCommand{
CommandName: "load",
JavaClassName: names.FileSystemShellJavaClass,
},
},
BaseJobCommand: &BaseJobCommand{
BaseJavaCommand: &env.BaseJavaCommand{
CommandName: "load",
JavaClassName: names.FileSystemShellJavaClass,
},
},
}

type LoadCommand struct {
*BaseJobCommand
path string
*BaseJobCommand
path string

bandwidth string
verify bool
partialListing bool
metadataOnly bool
skipIfExists bool
bandwidth string
verify bool
partialListing bool
metadataOnly bool
skipIfExists bool
fileFilterRegx string
}

func (c *LoadCommand) Base() *env.BaseJavaCommand {
return c.BaseJavaCommand
return c.BaseJavaCommand
}

func (c *LoadCommand) ToCommand() *cobra.Command {
cmd := c.Base().InitRunJavaClassCmd(&cobra.Command{
Use: Load.CommandName,
Short: "Submit or manage load jobs",
Long: `The load command moves data from the under storage system into Alluxio storage.
cmd := c.Base().InitRunJavaClassCmd(&cobra.Command{
Use: Load.CommandName,
Short: "Submit or manage load jobs",
Long: `The load command moves data from the under storage system into Alluxio storage.
For example, load can be used to prefetch data for analytics jobs.
If load is run on a directory, files in the directory will be recursively loaded.`,
Example: `# Submit a load job
Example: `# Submit a load job
$ ./bin/alluxio job load --path /path --submit
# View the progress of a submitted job
Expand All @@ -67,42 +68,46 @@ Progress for loading path '/path':
# Stop a submitted job
$ ./bin/alluxio job load --path /path --stop`,
Args: cobra.NoArgs,
RunE: func(cmd *cobra.Command, args []string) error {
return c.Run(args)
},
})
const path = "path"
cmd.Flags().StringVar(&c.path, path, "", "[all] Source path of load operation")
cmd.MarkFlagRequired(path)
c.AttachOperationFlags(cmd)
Args: cobra.NoArgs,
RunE: func(cmd *cobra.Command, args []string) error {
return c.Run(args)
},
})
const path = "path"
cmd.Flags().StringVar(&c.path, path, "", "[all] Source path of load operation")
cmd.MarkFlagRequired(path)
c.AttachOperationFlags(cmd)

cmd.Flags().StringVar(&c.bandwidth, "bandwidth", "", "[submit] Single worker read bandwidth limit")
cmd.Flags().BoolVar(&c.verify, "verify", false, "[submit] Run verification when load finishes and load new files if any")
cmd.Flags().BoolVar(&c.partialListing, "partial-listing", false, "[submit] Use partial directory listing, initializing load before reading the entire directory but cannot report on certain progress details")
cmd.Flags().BoolVar(&c.metadataOnly, "metadata-only", false, "[submit] Only load file metadata")
cmd.Flags().BoolVar(&c.skipIfExists, "skip-if-exists", false, "[submit] Skip existing fullly cached files")
return cmd
cmd.Flags().StringVar(&c.bandwidth, "bandwidth", "", "[submit] Single worker read bandwidth limit")
cmd.Flags().BoolVar(&c.verify, "verify", false, "[submit] Run verification when load finishes and load new files if any")
cmd.Flags().BoolVar(&c.partialListing, "partial-listing", false, "[submit] Use partial directory listing, initializing load before reading the entire directory but cannot report on certain progress details")
cmd.Flags().BoolVar(&c.metadataOnly, "metadata-only", false, "[submit] Only load file metadata")
cmd.Flags().BoolVar(&c.skipIfExists, "skip-if-exists", false, "[submit] Skip existing fullly cached files")
cmd.Flags().StringVar(&c.fileFilterRegx, "file-filter-regx", "", "[submit] Skip files that match the regx pattern")
return cmd
}

// Run builds the argument list for the Java "load" shell command from the
// parsed flags and passes it to the underlying Java command runner.
//
// Positional arguments are ignored. The operation arguments come from
// OperationWithArgs; an error from it is propagated with added context.
// Optional flags are forwarded only when set to a non-zero value.
func (c *LoadCommand) Run(_ []string) error {
	opWithArgs, err := c.OperationWithArgs()
	if err != nil {
		return stacktrace.Propagate(err, "error parsing operation")
	}
	javaArgs := []string{"load", c.path}
	javaArgs = append(javaArgs, opWithArgs...)
	if c.bandwidth != "" {
		javaArgs = append(javaArgs, "--bandwidth", c.bandwidth)
	}
	// NOTE(review): c.verify is not forwarded to the Java command here —
	// confirm whether that is intentional or whether "--verify" should be appended.
	if c.partialListing {
		javaArgs = append(javaArgs, "--partial-listing")
	}
	if c.metadataOnly {
		javaArgs = append(javaArgs, "--metadata-only")
	}
	if c.skipIfExists {
		javaArgs = append(javaArgs, "--skip-if-exists")
	}
	if c.fileFilterRegx != "" {
		// The filter is a string-valued option: the pattern must follow the
		// flag name, otherwise the user's regex never reaches the Java side.
		javaArgs = append(javaArgs, "--file-filter-regx", c.fileFilterRegx)
	}
	return c.Base().Run(javaArgs)
}
Original file line number Diff line number Diff line change
Expand Up @@ -608,6 +608,7 @@ message LoadJobPOptions {
optional bool partialListing = 3;
optional bool loadMetadataOnly = 4;
optional bool skipIfExists = 5;
optional string fileFilterRegx = 6;
}

message CopyJobPOptions {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import alluxio.grpc.LoadJobPOptions;
import alluxio.job.LoadJobRequest;
import alluxio.master.file.DefaultFileSystemMaster;
import alluxio.master.predicate.FilePredicate;
import alluxio.scheduler.job.Job;
import alluxio.scheduler.job.JobFactory;
import alluxio.security.User;
Expand All @@ -29,6 +30,7 @@
import java.util.Optional;
import java.util.OptionalLong;
import java.util.UUID;
import java.util.function.Predicate;

/**
* Factory for creating {@link LoadJob}s that get file infos from master.
Expand Down Expand Up @@ -60,11 +62,23 @@ public Job<?> create() {
.ofNullable(AuthenticatedClientUser.getOrNull())
.map(User::getName);

Predicate<UfsStatus> predicate = Predicates.alwaysTrue();
if (options.hasFileFilterRegx()) {
String regxPatternStr = options.getFileFilterRegx();
if (regxPatternStr != null && !regxPatternStr.isEmpty()) {
alluxio.proto.journal.Job.FileFilter.Builder builder =
alluxio.proto.journal.Job.FileFilter.newBuilder()
.setName("fileNamePattern").setValue(regxPatternStr);
FilePredicate filePredicate = FilePredicate.create(builder.build());
predicate = filePredicate.getUfsStatusPredicate();
}
}

UnderFileSystem ufs = mFs.getUfsManager().getOrAdd(new AlluxioURI(path),
UnderFileSystemConfiguration.defaults(Configuration.global()));
Iterable<UfsStatus> iterable = new UfsStatusIterable(ufs, path,
Optional.ofNullable(AuthenticatedClientUser.getOrNull()).map(User::getName),
Predicates.alwaysTrue());
predicate);
return new DoraLoadJob(path, user, UUID.randomUUID().toString(), bandwidth, partialListing,
verificationEnabled, options.getLoadMetadataOnly(), options.getSkipIfExists(),
iterable.iterator(), ufs);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import alluxio.exception.runtime.InvalidArgumentRuntimeException;
import alluxio.master.predicate.interval.Interval;
import alluxio.proto.journal.Job.FileFilter;
import alluxio.underfs.UfsStatus;
import alluxio.wire.FileInfo;

import org.slf4j.Logger;
Expand Down Expand Up @@ -141,4 +142,10 @@ public Predicate<FileInfo> get() {
}
};
}

@Override
public Predicate<UfsStatus> getUfsStatusPredicate() {
  // This predicate is evaluated through get() on FileInfo; it offers no UfsStatus variant.
  final String message = "getUfsStatusPredicate() is unsupported in DatePredicate";
  throw new UnsupportedOperationException(message);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
/*
* The Alluxio Open Foundation licenses this work under the Apache License, version 2.0
* (the "License"). You may not use this work except in compliance with the License, which is
* available at www.apache.org/licenses/LICENSE-2.0
*
* This software is distributed on an "AS IS" basis, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
* either express or implied, as more fully set forth in the License.
*
* See the NOTICE file distributed with this work for information regarding copyright ownership.
*/

package alluxio.master.predicate;

import alluxio.proto.journal.Job.FileFilter;
import alluxio.underfs.UfsStatus;
import alluxio.wire.FileInfo;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.function.Predicate;
import java.util.regex.Pattern;

/**
 * A predicate that accepts a file when its name fully matches a regular expression.
 *
 * <p>The pattern is compiled once at construction time. If the pattern string cannot be
 * compiled, a warning is logged and the resulting predicates reject every file.
 */
public class FileNamePatternPredicate implements FilePredicate {
  private static final Logger LOG = LoggerFactory.getLogger(FileNamePatternPredicate.class);
  private final String mFilterName;
  private final String mRegexPatternStr;
  /** Compiled form of the pattern, or null when the pattern string could not be compiled. */
  private final Pattern mPattern;

  /**
   * Factory for the file name pattern predicate, registered under the filter name
   * "fileNamePattern".
   */
  public static class FileNamePatternFactory extends Factory {
    @Override
    public String getFilterName() {
      return "fileNamePattern";
    }
  }

  /**
   * Factory for creating instances.
   */
  public abstract static class Factory implements FilePredicateFactory {
    /**
     * @return filter name for the predicate
     */
    public abstract String getFilterName();

    /**
     * Creates a {@link FilePredicate} from the string value.
     *
     * @param regexPatternStr the regex pattern string from the filter
     * @return the created predicate
     */
    public FilePredicate createFileNamePatternPredicate(String regexPatternStr) {
      return new FileNamePatternPredicate(getFilterName(), regexPatternStr);
    }

    /**
     * Creates a predicate when the filter's name equals {@link #getFilterName()} and the
     * filter carries a value.
     *
     * @param filter the file filter to convert
     * @return the created predicate, or null if the filter does not apply or creation failed
     */
    @Override
    public FilePredicate create(FileFilter filter) {
      try {
        if (filter.hasName() && filter.getName().equals(getFilterName())
            && filter.hasValue()) {
          return createFileNamePatternPredicate(filter.getValue());
        }
      } catch (Exception e) {
        // Returning null signals "not handled by this factory"; keep a trace of the reason.
        LOG.debug("Failed to create file name pattern predicate: ", e);
      }
      return null;
    }
  }

  /**
   * Creates an instance.
   *
   * @param filterName the filter name
   * @param regexPatternStr the regex pattern string for file filtering
   */
  public FileNamePatternPredicate(String filterName, String regexPatternStr) {
    mFilterName = filterName;
    mRegexPatternStr = regexPatternStr;
    mPattern = compileOrNull(regexPatternStr);
  }

  /**
   * @return a predicate that tests whether {@link FileInfo#getName()} fully matches the
   *         pattern; it returns false if the pattern is invalid or evaluation fails
   */
  @Override
  public Predicate<FileInfo> get() {
    return fileInfo -> {
      try {
        return matches(fileInfo.getName());
      } catch (RuntimeException e) {
        LOG.debug("Failed to filter: ", e);
        return false;
      }
    };
  }

  /**
   * @return a predicate that tests whether the last path component of the status name fully
   *         matches the pattern; it returns false if the pattern is invalid or evaluation fails
   */
  @Override
  public Predicate<UfsStatus> getUfsStatusPredicate() {
    return ufsStatus -> {
      try {
        return matches(getFileName(ufsStatus));
      } catch (RuntimeException e) {
        LOG.debug("Failed to filter: ", e);
        return false;
      }
    };
  }

  /**
   * Compiles the pattern once so that it is not recompiled for every file tested.
   *
   * @param regexPatternStr the regex pattern string
   * @return the compiled pattern, or null if the string is null or not a valid regex
   */
  private static Pattern compileOrNull(String regexPatternStr) {
    try {
      return Pattern.compile(regexPatternStr);
    } catch (RuntimeException e) {
      // Same outcome as before (nothing matches), but reported once and visibly.
      LOG.warn("Invalid file name pattern '{}', no file will match: {}",
          regexPatternStr, e.toString());
      return null;
    }
  }

  /**
   * @param fileName the file name to test
   * @return true if a valid pattern exists and the entire name matches it
   */
  private boolean matches(String fileName) {
    return mPattern != null && mPattern.matcher(fileName).matches();
  }

  /**
   * @param ufsStatus the status whose name may contain path separators
   * @return the part of the name after the last '/', or the whole name if there is none
   */
  private String getFileName(UfsStatus ufsStatus) {
    String name = ufsStatus.getName();
    // lastIndexOf returns -1 when there is no separator, so this yields the whole name.
    return name.substring(name.lastIndexOf('/') + 1);
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
package alluxio.master.predicate;

import alluxio.proto.journal.Job.FileFilter;
import alluxio.underfs.UfsStatus;
import alluxio.wire.FileInfo;

import com.google.common.cache.CacheBuilder;
Expand Down Expand Up @@ -49,6 +50,12 @@ public List<FilePredicateFactory> load(ClassLoader key) throws Exception {
*/
Predicate<FileInfo> get();

/**
 * Gets the predicate function that evaluates a {@link UfsStatus}, as opposed to
 * {@link #get()} which evaluates a {@link FileInfo}.
 * Implementations that have no UfsStatus variant throw
 * {@link UnsupportedOperationException}.
 *
 * @return the predicate function over UfsStatus
 */
Predicate<UfsStatus> getUfsStatusPredicate();

/**
* Creates a file predicate from a file filter.
* If the filter name is invalid, it will throw exception.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import alluxio.exception.runtime.AlluxioRuntimeException;
import alluxio.master.predicate.interval.Interval;
import alluxio.proto.journal.Job.FileFilter;
import alluxio.underfs.UfsStatus;
import alluxio.util.FormatUtils;
import alluxio.wire.FileInfo;

Expand Down Expand Up @@ -200,4 +201,10 @@ public Predicate<FileInfo> get() {
}
};
}

@Override
public Predicate<UfsStatus> getUfsStatusPredicate() {
  // This predicate is evaluated through get() on FileInfo; it offers no UfsStatus variant.
  final String message = "getUfsStatusPredicate() is unsupported in TimePredicate";
  throw new UnsupportedOperationException(message);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
# See the NOTICE file distributed with this work for information regarding copyright ownership.
#

alluxio.master.predicate.FileNamePatternPredicate$FileNamePatternFactory
alluxio.master.predicate.TimePredicate$DateFromFileNameOlderThanFactory
alluxio.master.predicate.TimePredicate$UnmodifiedForFactory
alluxio.master.predicate.DatePredicate$LastModifiedDateFactory
Loading

0 comments on commit 3536ae5

Please sign in to comment.