Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add regx pattern file filter for distributed load #18311

Merged
merged 5 commits into from
Oct 26, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions cli/src/alluxio.org/cli/cmd/job/load.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ type LoadCommand struct {
partialListing bool
metadataOnly bool
skipIfExists bool
fileFilterRegx string
}

func (c *LoadCommand) Base() *env.BaseJavaCommand {
Expand Down Expand Up @@ -82,6 +83,7 @@ $ ./bin/alluxio job load --path /path --stop`,
cmd.Flags().BoolVar(&c.partialListing, "partial-listing", false, "[submit] Use partial directory listing, initializing load before reading the entire directory but cannot report on certain progress details")
cmd.Flags().BoolVar(&c.metadataOnly, "metadata-only", false, "[submit] Only load file metadata")
cmd.Flags().BoolVar(&c.skipIfExists, "skip-if-exists", false, "[submit] Skip existing fullly cached files")
cmd.Flags().StringVar(&c.fileFilterRegx, "file-filter-regx", "", "[submit] Skip files that match the regx pattern")
return cmd
}

Expand All @@ -104,5 +106,8 @@ func (c *LoadCommand) Run(_ []string) error {
if c.skipIfExists {
javaArgs = append(javaArgs, "--skip-if-exists")
}
if c.fileFilterRegx != "" {
javaArgs = append(javaArgs, "--file-filter-regx")
}
return c.Base().Run(javaArgs)
}
Original file line number Diff line number Diff line change
Expand Up @@ -608,6 +608,7 @@ message LoadJobPOptions {
optional bool partialListing = 3;
optional bool loadMetadataOnly = 4;
optional bool skipIfExists = 5;
optional string fileFilterRegx = 6;
}

message CopyJobPOptions {
Expand Down
1 change: 1 addition & 0 deletions common/transport/src/main/proto/proto/journal/job.proto
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ message LoadJobEntry {
optional int64 end_time = 8;
optional bool load_metadata_only = 9;
optional bool skip_if_exists = 10;
optional string file_filter_regx = 11;
}

// next available id: 13
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,8 @@ public class DoraLoadJob extends AbstractJob<DoraLoadJob.DoraLoadTask> {
private static final int RETRY_ATTEMPT_THRESHOLD = Configuration.getInt(
PropertyKey.MASTER_DORA_LOAD_JOB_RETRIES);
private final boolean mSkipIfExists;

private final Optional<String> mFileFilterRegx;
private final long mVirtualBlockSize = Configuration.getBytes(
PropertyKey.DORA_READ_VIRTUAL_BLOCK_SIZE);
private Iterator<LoadSubTask> mCurrentSubTaskIterator;
Expand All @@ -135,12 +137,14 @@ public class DoraLoadJob extends AbstractJob<DoraLoadJob.DoraLoadTask> {
* @param verificationEnabled whether to verify the job after loaded
* @param loadMetadataOnly if set to true, only metadata will be loaded without loading
* @param skipIfExists skip if exists
* @param fileFilterRegx the regx pattern string for file filter
* @param ufsStatusIterator ufsStatus iterable
* @param ufs under file system
*/
public DoraLoadJob(String path, Optional<String> user, String jobId, OptionalLong bandwidth,
boolean usePartialListing, boolean verificationEnabled, boolean loadMetadataOnly,
boolean skipIfExists, Iterator<UfsStatus> ufsStatusIterator, UnderFileSystem ufs) {
boolean skipIfExists, Optional<String> fileFilterRegx, Iterator<UfsStatus> ufsStatusIterator,
UnderFileSystem ufs) {
super(user, jobId, new HashBasedWorkerAssignPolicy());
mLoadPath = requireNonNull(path, "path is null");
Preconditions.checkArgument(
Expand All @@ -152,6 +156,7 @@ public DoraLoadJob(String path, Optional<String> user, String jobId, OptionalLon
mUfs = ufs;
mLoadMetadataOnly = loadMetadataOnly;
mSkipIfExists = skipIfExists;
mFileFilterRegx = fileFilterRegx;
mUfsStatusIterator = ufsStatusIterator;
LOG.info("DoraLoadJob for {} created.", path);
}
Expand Down Expand Up @@ -473,6 +478,7 @@ public Journal.JournalEntry toJournalEntry() {
.setVerify(mVerificationEnabled)
.setSkipIfExists(mSkipIfExists)
.setJobId(mJobId);
mFileFilterRegx.ifPresent(jobEntry::setFileFilterRegx);
mUser.ifPresent(jobEntry::setUser);
mBandwidth.ifPresent(jobEntry::setBandwidth);
mEndTime.ifPresent(jobEntry::setEndTime);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import alluxio.annotation.SuppressFBWarnings;
import alluxio.conf.Configuration;
import alluxio.master.file.DefaultFileSystemMaster;
import alluxio.master.predicate.FilePredicate;
import alluxio.scheduler.job.Job;
import alluxio.scheduler.job.JobFactory;
import alluxio.scheduler.job.JobState;
Expand All @@ -28,6 +29,7 @@

import java.util.Optional;
import java.util.OptionalLong;
import java.util.function.Predicate;

/**
* Factory for creating {@link LoadJob}s from journal entries.
Expand All @@ -54,15 +56,28 @@ public Job<?> create() {
String path = mJobEntry.getLoadPath();
UnderFileSystem ufs = mFsMaster.getUfsManager().getOrAdd(new AlluxioURI(path),
() -> UnderFileSystemConfiguration.defaults(Configuration.global()));
Predicate<UfsStatus> predicate = Predicates.alwaysTrue();
Optional<String> fileFilterRegx = Optional.empty();
if (mJobEntry.hasFileFilterRegx()) {
String regxPatternStr = mJobEntry.getFileFilterRegx();
if (regxPatternStr != null && !regxPatternStr.isEmpty()) {
alluxio.proto.journal.Job.FileFilter.Builder builder =
alluxio.proto.journal.Job.FileFilter.newBuilder()
.setName("fileNamePattern").setValue(regxPatternStr);
FilePredicate filePredicate = FilePredicate.create(builder.build());
predicate = filePredicate.getUfsStatusPredicate();
fileFilterRegx = Optional.of(regxPatternStr);
}
}
Iterable<UfsStatus> iterable = new UfsStatusIterable(ufs, path,
Optional.ofNullable(AuthenticatedClientUser.getOrNull()).map(User::getName),
Predicates.alwaysTrue());
predicate);
Optional<String> user =
mJobEntry.hasUser() ? Optional.of(mJobEntry.getUser()) : Optional.empty();
DoraLoadJob job = new DoraLoadJob(path, user, mJobEntry.getJobId(),
mJobEntry.hasBandwidth() ? OptionalLong.of(mJobEntry.getBandwidth()) : OptionalLong.empty(),
mJobEntry.getPartialListing(), mJobEntry.getVerify(), mJobEntry.getLoadMetadataOnly(),
mJobEntry.getSkipIfExists(), iterable.iterator(), ufs);
mJobEntry.getSkipIfExists(), fileFilterRegx, iterable.iterator(), ufs);
job.setJobState(JobState.fromProto(mJobEntry.getState()), false);
if (mJobEntry.hasEndTime()) {
job.setEndTime(mJobEntry.getEndTime());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import alluxio.grpc.LoadJobPOptions;
import alluxio.job.LoadJobRequest;
import alluxio.master.file.DefaultFileSystemMaster;
import alluxio.master.predicate.FilePredicate;
import alluxio.scheduler.job.Job;
import alluxio.scheduler.job.JobFactory;
import alluxio.security.User;
Expand All @@ -29,6 +30,7 @@
import java.util.Optional;
import java.util.OptionalLong;
import java.util.UUID;
import java.util.function.Predicate;

/**
* Factory for creating {@link LoadJob}s that get file infos from master.
Expand Down Expand Up @@ -60,14 +62,28 @@ public Job<?> create() {
.ofNullable(AuthenticatedClientUser.getOrNull())
.map(User::getName);

Predicate<UfsStatus> predicate = Predicates.alwaysTrue();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These logic should also appear at JournaledLoadJobFactory so we can recover from journal entry

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated. I put these logic into JournaledLoadJobFactory.

Optional<String> fileFilterRegx = Optional.empty();
if (options.hasFileFilterRegx()) {
String regxPatternStr = options.getFileFilterRegx();
if (regxPatternStr != null && !regxPatternStr.isEmpty()) {
alluxio.proto.journal.Job.FileFilter.Builder builder =
alluxio.proto.journal.Job.FileFilter.newBuilder()
.setName("fileNamePattern").setValue(regxPatternStr);
FilePredicate filePredicate = FilePredicate.create(builder.build());
predicate = filePredicate.getUfsStatusPredicate();
fileFilterRegx = Optional.of(regxPatternStr);
}
}

UnderFileSystem ufs = mFs.getUfsManager().getOrAdd(new AlluxioURI(path),
() -> UnderFileSystemConfiguration.defaults(Configuration.global()));
Iterable<UfsStatus> iterable = new UfsStatusIterable(ufs, path,
Optional.ofNullable(AuthenticatedClientUser.getOrNull()).map(User::getName),
Predicates.alwaysTrue());
predicate);
return new DoraLoadJob(path, user, UUID.randomUUID().toString(), bandwidth, partialListing,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We also want to put this filter information into DoraLoadJob so we can turn this job into a journal entry.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated.

verificationEnabled, options.getLoadMetadataOnly(), options.getSkipIfExists(),
iterable.iterator(), ufs);
fileFilterRegx, iterable.iterator(), ufs);
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import alluxio.exception.runtime.InvalidArgumentRuntimeException;
import alluxio.master.predicate.interval.Interval;
import alluxio.proto.journal.Job.FileFilter;
import alluxio.underfs.UfsStatus;
import alluxio.wire.FileInfo;

import org.slf4j.Logger;
Expand Down Expand Up @@ -141,4 +142,10 @@ public Predicate<FileInfo> get() {
}
};
}

@Override
public Predicate<UfsStatus> getUfsStatusPredicate() {
throw new UnsupportedOperationException(
"getUfsStatusPredicate() is unsupported in DatePredicate");
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
/*
* The Alluxio Open Foundation licenses this work under the Apache License, version 2.0
* (the "License"). You may not use this work except in compliance with the License, which is
* available at www.apache.org/licenses/LICENSE-2.0
*
* This software is distributed on an "AS IS" basis, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
* either express or implied, as more fully set forth in the License.
*
* See the NOTICE file distributed with this work for information regarding copyright ownership.
*/

package alluxio.master.predicate;

import alluxio.proto.journal.Job.FileFilter;
import alluxio.underfs.UfsStatus;
import alluxio.wire.FileInfo;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.function.Predicate;
import java.util.regex.Pattern;

/**
* A predicate related to date of the file.
*/
public class FileNamePatternPredicate implements FilePredicate {
private static final Logger LOG = LoggerFactory.getLogger(FileNamePatternPredicate.class);
private final String mFilterName;
private String mRegexPatternStr;

/**
* Factory for modification time predicate.
*/
public static class FileNamePatternFactory extends Factory {
@Override
public String getFilterName() {
return "fileNamePattern";
}
}

/**
* Factory for creating instances.
*/
public abstract static class Factory implements FilePredicateFactory {
/**
* @return filter name for the predicate
*/
public abstract String getFilterName();

/**
* Creates a {@link FilePredicate} from the string value.
*
* @param regexPatternStr the regex pattern string from the filter
* @return the created predicate
*/
public FilePredicate createFileNamePatternPredicate(String regexPatternStr) {
return new FileNamePatternPredicate(getFilterName(), regexPatternStr);
}

@Override
public FilePredicate create(FileFilter filter) {
try {
if (filter.hasName() && filter.getName().equals(getFilterName())) {
if (filter.hasValue()) {
return createFileNamePatternPredicate(filter.getValue());
}
}
} catch (Exception e) {
// fall through
}
return null;
}
}

/**
* Creates an instance.
*
* @param filterName the filter name
* @param regexPatternStr the regex pattern string for file filtering
*/
public FileNamePatternPredicate(String filterName, String regexPatternStr) {
mFilterName = filterName;
mRegexPatternStr = regexPatternStr;
}

@Override
public Predicate<FileInfo> get() {
return FileInfo -> {
try {
String fileName = FileInfo.getName();
return Pattern.matches(mRegexPatternStr, fileName);
} catch (RuntimeException e) {
LOG.debug("Failed to filter: ", e);
return false;
}
};
}

@Override
public Predicate<UfsStatus> getUfsStatusPredicate() {
return UfsStatus -> {
try {
String fileName = getFileName(UfsStatus);
return Pattern.matches(mRegexPatternStr, fileName);
} catch (RuntimeException e) {
LOG.debug("Failed to filter: ", e);
return false;
}
};
}

private String getFileName(UfsStatus ufsStatus) {
String name = ufsStatus.getName();
int index = name.lastIndexOf("/");
if (index == -1) {
return name;
}
name = name.substring(index + 1);
return name;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
package alluxio.master.predicate;

import alluxio.proto.journal.Job.FileFilter;
import alluxio.underfs.UfsStatus;
import alluxio.wire.FileInfo;

import com.google.common.cache.CacheBuilder;
Expand Down Expand Up @@ -49,6 +50,12 @@ public List<FilePredicateFactory> load(ClassLoader key) throws Exception {
*/
Predicate<FileInfo> get();

/**
* Get the predicate function from the file predicate.
* @return the predicate function
*/
Predicate<UfsStatus> getUfsStatusPredicate();

/**
* Creates a file predicate from a file filter.
* If the filter name is invalid, it will throw exception.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import alluxio.exception.runtime.AlluxioRuntimeException;
import alluxio.master.predicate.interval.Interval;
import alluxio.proto.journal.Job.FileFilter;
import alluxio.underfs.UfsStatus;
import alluxio.util.FormatUtils;
import alluxio.wire.FileInfo;

Expand Down Expand Up @@ -200,4 +201,10 @@ public Predicate<FileInfo> get() {
}
};
}

@Override
public Predicate<UfsStatus> getUfsStatusPredicate() {
throw new UnsupportedOperationException(
"getUfsStatusPredicate() is unsupported in TimePredicate");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
# See the NOTICE file distributed with this work for information regarding copyright ownership.
#

alluxio.master.predicate.FileNamePatternPredicate$FileNamePatternFactory
alluxio.master.predicate.TimePredicate$DateFromFileNameOlderThanFactory
alluxio.master.predicate.TimePredicate$UnmodifiedForFactory
alluxio.master.predicate.DatePredicate$LastModifiedDateFactory
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ public void testGetNextTaskWithVirtualBlocks() throws IOException {
Predicates.alwaysTrue()).iterator();
DoraLoadJob loadJob =
new DoraLoadJob(mLocalUfsRoot, Optional.of("user"), "1", OptionalLong.empty(), false, true,
false, false, iterator, mLocalUfs);
false, false, Optional.empty(), iterator, mLocalUfs);
Collection<WorkerInfo> workers = ImmutableList.of(
new WorkerInfo().setId(1).setAddress(
new WorkerNetAddress().setHost("worker1").setRpcPort(1234)));
Expand Down
Loading