Skip to content

Commit

Permalink
Add regx pattern file filter for distributed load
Browse files Browse the repository at this point in the history
Add regx pattern file filter for distributed load.

**Example:**
The following request allows us to load the files under `/test-load` directory with "hello" prefix:
`curl -X GET http://localhost:28080/v1/load?path=s3a://jiamingmai-test/test-load&opType=submit&verbose=true&fileFilterRegx=^hello.*`
			pr-link: #18311
			change-id: cid-4ec2bfe58bfba413f6d2925f5b3937bd6f5c2eb1
  • Loading branch information
JiamingMai authored Oct 26, 2023
1 parent 53c49f7 commit d0ad98f
Show file tree
Hide file tree
Showing 19 changed files with 316 additions and 31 deletions.
5 changes: 5 additions & 0 deletions cli/src/alluxio.org/cli/cmd/job/load.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ type LoadCommand struct {
partialListing bool
metadataOnly bool
skipIfExists bool
fileFilterRegx string
}

func (c *LoadCommand) Base() *env.BaseJavaCommand {
Expand Down Expand Up @@ -82,6 +83,7 @@ $ ./bin/alluxio job load --path /path --stop`,
cmd.Flags().BoolVar(&c.partialListing, "partial-listing", false, "[submit] Use partial directory listing, initializing load before reading the entire directory but cannot report on certain progress details")
cmd.Flags().BoolVar(&c.metadataOnly, "metadata-only", false, "[submit] Only load file metadata")
cmd.Flags().BoolVar(&c.skipIfExists, "skip-if-exists", false, "[submit] Skip existing fullly cached files")
cmd.Flags().StringVar(&c.fileFilterRegx, "file-filter-regx", "", "[submit] Skip files that match the regx pattern")
return cmd
}

Expand All @@ -104,5 +106,8 @@ func (c *LoadCommand) Run(_ []string) error {
if c.skipIfExists {
javaArgs = append(javaArgs, "--skip-if-exists")
}
if c.fileFilterRegx != "" {
javaArgs = append(javaArgs, "--file-filter-regx")
}
return c.Base().Run(javaArgs)
}
Original file line number Diff line number Diff line change
Expand Up @@ -608,7 +608,8 @@ message LoadJobPOptions {
optional bool partialListing = 3;
optional bool loadMetadataOnly = 4;
optional bool skipIfExists = 5;
optional int32 replicas = 6;
optional string fileFilterRegx = 6;
optional int32 replicas = 7;
}

message CopyJobPOptions {
Expand Down
3 changes: 2 additions & 1 deletion common/transport/src/main/proto/proto/journal/job.proto
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@ message LoadJobEntry {
optional int64 end_time = 8;
optional bool load_metadata_only = 9;
optional bool skip_if_exists = 10;
optional int32 replicas = 11;
optional string file_filter_regx = 11;
optional int32 replicas = 12;
}

// next available id: 13
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,8 @@ public class DoraLoadJob extends AbstractJob<DoraLoadJob.DoraLoadTask> {
private static final int RETRY_ATTEMPT_THRESHOLD = Configuration.getInt(
PropertyKey.MASTER_DORA_LOAD_JOB_RETRIES);
private final boolean mSkipIfExists;

private final Optional<String> mFileFilterRegx;
private final long mVirtualBlockSize = Configuration.getBytes(
PropertyKey.DORA_READ_VIRTUAL_BLOCK_SIZE);
private Iterator<LoadSubTask> mCurrentSubTaskIterator;
Expand All @@ -135,12 +137,14 @@ public class DoraLoadJob extends AbstractJob<DoraLoadJob.DoraLoadTask> {
* @param verificationEnabled whether to verify the job after loaded
* @param loadMetadataOnly if set to true, only metadata will be loaded without loading
* @param skipIfExists skip if exists
* @param fileFilterRegx the regx pattern string for file filter
* @param ufsStatusIterator ufsStatus iterable
* @param ufs under file system
*/
public DoraLoadJob(String path, Optional<String> user, String jobId, OptionalLong bandwidth,
boolean usePartialListing, boolean verificationEnabled, boolean loadMetadataOnly,
boolean skipIfExists, Iterator<UfsStatus> ufsStatusIterator, UnderFileSystem ufs) {
boolean skipIfExists, Optional<String> fileFilterRegx, Iterator<UfsStatus> ufsStatusIterator,
UnderFileSystem ufs) {
super(user, jobId, new HashBasedWorkerAssignPolicy());
mLoadPath = requireNonNull(path, "path is null");
Preconditions.checkArgument(
Expand All @@ -152,6 +156,7 @@ public DoraLoadJob(String path, Optional<String> user, String jobId, OptionalLon
mUfs = ufs;
mLoadMetadataOnly = loadMetadataOnly;
mSkipIfExists = skipIfExists;
mFileFilterRegx = fileFilterRegx;
mUfsStatusIterator = ufsStatusIterator;
LOG.info("DoraLoadJob for {} created.", path);
}
Expand Down Expand Up @@ -473,6 +478,7 @@ public Journal.JournalEntry toJournalEntry() {
.setVerify(mVerificationEnabled)
.setSkipIfExists(mSkipIfExists)
.setJobId(mJobId);
mFileFilterRegx.ifPresent(jobEntry::setFileFilterRegx);
mUser.ifPresent(jobEntry::setUser);
mBandwidth.ifPresent(jobEntry::setBandwidth);
mEndTime.ifPresent(jobEntry::setEndTime);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import alluxio.annotation.SuppressFBWarnings;
import alluxio.conf.Configuration;
import alluxio.master.file.DefaultFileSystemMaster;
import alluxio.master.predicate.FilePredicate;
import alluxio.scheduler.job.Job;
import alluxio.scheduler.job.JobFactory;
import alluxio.scheduler.job.JobState;
Expand All @@ -28,6 +29,7 @@

import java.util.Optional;
import java.util.OptionalLong;
import java.util.function.Predicate;

/**
* Factory for creating {@link LoadJob}s from journal entries.
Expand All @@ -54,15 +56,28 @@ public Job<?> create() {
String path = mJobEntry.getLoadPath();
UnderFileSystem ufs = mFsMaster.getUfsManager().getOrAdd(new AlluxioURI(path),
() -> UnderFileSystemConfiguration.defaults(Configuration.global()));
Predicate<UfsStatus> predicate = Predicates.alwaysTrue();
Optional<String> fileFilterRegx = Optional.empty();
if (mJobEntry.hasFileFilterRegx()) {
String regxPatternStr = mJobEntry.getFileFilterRegx();
if (regxPatternStr != null && !regxPatternStr.isEmpty()) {
alluxio.proto.journal.Job.FileFilter.Builder builder =
alluxio.proto.journal.Job.FileFilter.newBuilder()
.setName("fileNamePattern").setValue(regxPatternStr);
FilePredicate filePredicate = FilePredicate.create(builder.build());
predicate = filePredicate.getUfsStatusPredicate();
fileFilterRegx = Optional.of(regxPatternStr);
}
}
Iterable<UfsStatus> iterable = new UfsStatusIterable(ufs, path,
Optional.ofNullable(AuthenticatedClientUser.getOrNull()).map(User::getName),
Predicates.alwaysTrue());
predicate);
Optional<String> user =
mJobEntry.hasUser() ? Optional.of(mJobEntry.getUser()) : Optional.empty();
DoraLoadJob job = new DoraLoadJob(path, user, mJobEntry.getJobId(),
mJobEntry.hasBandwidth() ? OptionalLong.of(mJobEntry.getBandwidth()) : OptionalLong.empty(),
mJobEntry.getPartialListing(), mJobEntry.getVerify(), mJobEntry.getLoadMetadataOnly(),
mJobEntry.getSkipIfExists(), iterable.iterator(), ufs);
mJobEntry.getSkipIfExists(), fileFilterRegx, iterable.iterator(), ufs);
job.setJobState(JobState.fromProto(mJobEntry.getState()), false);
if (mJobEntry.hasEndTime()) {
job.setEndTime(mJobEntry.getEndTime());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import alluxio.grpc.LoadJobPOptions;
import alluxio.job.LoadJobRequest;
import alluxio.master.file.DefaultFileSystemMaster;
import alluxio.master.predicate.FilePredicate;
import alluxio.scheduler.job.Job;
import alluxio.scheduler.job.JobFactory;
import alluxio.security.User;
Expand All @@ -29,6 +30,7 @@
import java.util.Optional;
import java.util.OptionalLong;
import java.util.UUID;
import java.util.function.Predicate;

/**
* Factory for creating {@link LoadJob}s that get file infos from master.
Expand Down Expand Up @@ -60,14 +62,28 @@ public Job<?> create() {
.ofNullable(AuthenticatedClientUser.getOrNull())
.map(User::getName);

Predicate<UfsStatus> predicate = Predicates.alwaysTrue();
Optional<String> fileFilterRegx = Optional.empty();
if (options.hasFileFilterRegx()) {
String regxPatternStr = options.getFileFilterRegx();
if (regxPatternStr != null && !regxPatternStr.isEmpty()) {
alluxio.proto.journal.Job.FileFilter.Builder builder =
alluxio.proto.journal.Job.FileFilter.newBuilder()
.setName("fileNamePattern").setValue(regxPatternStr);
FilePredicate filePredicate = FilePredicate.create(builder.build());
predicate = filePredicate.getUfsStatusPredicate();
fileFilterRegx = Optional.of(regxPatternStr);
}
}

UnderFileSystem ufs = mFs.getUfsManager().getOrAdd(new AlluxioURI(path),
() -> UnderFileSystemConfiguration.defaults(Configuration.global()));
Iterable<UfsStatus> iterable = new UfsStatusIterable(ufs, path,
Optional.ofNullable(AuthenticatedClientUser.getOrNull()).map(User::getName),
Predicates.alwaysTrue());
predicate);
return new DoraLoadJob(path, user, UUID.randomUUID().toString(), bandwidth, partialListing,
verificationEnabled, options.getLoadMetadataOnly(), options.getSkipIfExists(),
iterable.iterator(), ufs);
fileFilterRegx, iterable.iterator(), ufs);
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import alluxio.exception.runtime.InvalidArgumentRuntimeException;
import alluxio.master.predicate.interval.Interval;
import alluxio.proto.journal.Job.FileFilter;
import alluxio.underfs.UfsStatus;
import alluxio.wire.FileInfo;

import org.slf4j.Logger;
Expand Down Expand Up @@ -141,4 +142,10 @@ public Predicate<FileInfo> get() {
}
};
}

@Override
public Predicate<UfsStatus> getUfsStatusPredicate() {
throw new UnsupportedOperationException(
"getUfsStatusPredicate() is unsupported in DatePredicate");
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
/*
* The Alluxio Open Foundation licenses this work under the Apache License, version 2.0
* (the "License"). You may not use this work except in compliance with the License, which is
* available at www.apache.org/licenses/LICENSE-2.0
*
* This software is distributed on an "AS IS" basis, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
* either express or implied, as more fully set forth in the License.
*
* See the NOTICE file distributed with this work for information regarding copyright ownership.
*/

package alluxio.master.predicate;

import alluxio.proto.journal.Job.FileFilter;
import alluxio.underfs.UfsStatus;
import alluxio.wire.FileInfo;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.function.Predicate;
import java.util.regex.Pattern;

/**
* A predicate related to date of the file.
*/
public class FileNamePatternPredicate implements FilePredicate {
private static final Logger LOG = LoggerFactory.getLogger(FileNamePatternPredicate.class);
private final String mFilterName;
private String mRegexPatternStr;

/**
* Factory for modification time predicate.
*/
public static class FileNamePatternFactory extends Factory {
@Override
public String getFilterName() {
return "fileNamePattern";
}
}

/**
* Factory for creating instances.
*/
public abstract static class Factory implements FilePredicateFactory {
/**
* @return filter name for the predicate
*/
public abstract String getFilterName();

/**
* Creates a {@link FilePredicate} from the string value.
*
* @param regexPatternStr the regex pattern string from the filter
* @return the created predicate
*/
public FilePredicate createFileNamePatternPredicate(String regexPatternStr) {
return new FileNamePatternPredicate(getFilterName(), regexPatternStr);
}

@Override
public FilePredicate create(FileFilter filter) {
try {
if (filter.hasName() && filter.getName().equals(getFilterName())) {
if (filter.hasValue()) {
return createFileNamePatternPredicate(filter.getValue());
}
}
} catch (Exception e) {
// fall through
}
return null;
}
}

/**
* Creates an instance.
*
* @param filterName the filter name
* @param regexPatternStr the regex pattern string for file filtering
*/
public FileNamePatternPredicate(String filterName, String regexPatternStr) {
mFilterName = filterName;
mRegexPatternStr = regexPatternStr;
}

@Override
public Predicate<FileInfo> get() {
return FileInfo -> {
try {
String fileName = FileInfo.getName();
return Pattern.matches(mRegexPatternStr, fileName);
} catch (RuntimeException e) {
LOG.debug("Failed to filter: ", e);
return false;
}
};
}

@Override
public Predicate<UfsStatus> getUfsStatusPredicate() {
return UfsStatus -> {
try {
String fileName = getFileName(UfsStatus);
return Pattern.matches(mRegexPatternStr, fileName);
} catch (RuntimeException e) {
LOG.debug("Failed to filter: ", e);
return false;
}
};
}

private String getFileName(UfsStatus ufsStatus) {
String name = ufsStatus.getName();
int index = name.lastIndexOf("/");
if (index == -1) {
return name;
}
name = name.substring(index + 1);
return name;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
package alluxio.master.predicate;

import alluxio.proto.journal.Job.FileFilter;
import alluxio.underfs.UfsStatus;
import alluxio.wire.FileInfo;

import com.google.common.cache.CacheBuilder;
Expand Down Expand Up @@ -49,6 +50,12 @@ public List<FilePredicateFactory> load(ClassLoader key) throws Exception {
*/
Predicate<FileInfo> get();

/**
* Get the predicate function from the file predicate.
* @return the predicate function
*/
Predicate<UfsStatus> getUfsStatusPredicate();

/**
* Creates a file predicate from a file filter.
* If the filter name is invalid, it will throw exception.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import alluxio.exception.runtime.AlluxioRuntimeException;
import alluxio.master.predicate.interval.Interval;
import alluxio.proto.journal.Job.FileFilter;
import alluxio.underfs.UfsStatus;
import alluxio.util.FormatUtils;
import alluxio.wire.FileInfo;

Expand Down Expand Up @@ -200,4 +201,10 @@ public Predicate<FileInfo> get() {
}
};
}

@Override
public Predicate<UfsStatus> getUfsStatusPredicate() {
throw new UnsupportedOperationException(
"getUfsStatusPredicate() is unsupported in TimePredicate");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
# See the NOTICE file distributed with this work for information regarding copyright ownership.
#

alluxio.master.predicate.FileNamePatternPredicate$FileNamePatternFactory
alluxio.master.predicate.TimePredicate$DateFromFileNameOlderThanFactory
alluxio.master.predicate.TimePredicate$UnmodifiedForFactory
alluxio.master.predicate.DatePredicate$LastModifiedDateFactory
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ public void testGetNextTaskWithVirtualBlocks() throws IOException {
Predicates.alwaysTrue()).iterator();
DoraLoadJob loadJob =
new DoraLoadJob(mLocalUfsRoot, Optional.of("user"), "1", OptionalLong.empty(), false, true,
false, false, iterator, mLocalUfs);
false, false, Optional.empty(), iterator, mLocalUfs);
Collection<WorkerInfo> workers = ImmutableList.of(
new WorkerInfo().setId(1).setAddress(
new WorkerNetAddress().setHost("worker1").setRpcPort(1234)));
Expand Down
Loading

0 comments on commit d0ad98f

Please sign in to comment.