Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor PagedDoraWorker by injecting MetaManager and UfsManager #18181

Merged
merged 13 commits into from
Nov 1, 2023
14 changes: 14 additions & 0 deletions dora/core/common/src/main/java/alluxio/underfs/UfsManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.io.Closeable;
import java.io.IOException;
import java.net.URI;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

Expand Down Expand Up @@ -89,6 +90,17 @@ public AlluxioURI getUfsMountPointUri() {
}
}

/**
* Return a UFS instance if it already exists in the cache, otherwise, creates a new instance and
* return it.
*
* @param ufsUri the UFS path
* @param ufsConfSupplier supplier for UFS configuration
* @return the UFS instance
*/
UnderFileSystem getOrAdd(AlluxioURI ufsUri,
Supplier<UnderFileSystemConfiguration> ufsConfSupplier);

/**
* Keeps track of a mount id and maps it to its URI in Alluxio and configuration. This is an
* Alluxio-only operation and no interaction to UFS will be made.
Expand Down Expand Up @@ -131,6 +143,8 @@ void addMountWithRecorder(long mountId, AlluxioURI ufsUri,
*/
UfsClient get(long mountId) throws NotFoundException, UnavailableException;

Optional<UnderFileSystem> get(AlluxioURI ufsUri);

/**
* @return the UFS client associated with root
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,14 +108,7 @@ protected AbstractUfsManager() {
mCloser = Closer.create();
}

/**
* Return a UFS instance if it already exists in the cache, otherwise, creates a new instance and
* return it.
*
* @param ufsUri the UFS path
* @param ufsConfSupplier supplier for UFS configuration
* @return the UFS instance
*/
@Override
public UnderFileSystem getOrAdd(AlluxioURI ufsUri,
Supplier<UnderFileSystemConfiguration> ufsConfSupplier) {
Key key = generateKey(ufsUri);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import alluxio.util.io.FileUtils;
import alluxio.util.io.PathUtils;
import alluxio.worker.dora.DoraWorker;
import alluxio.worker.dora.PagedDoraWorker;
import alluxio.worker.grpc.DoraWorkerClientServiceHandler;
import alluxio.worker.grpc.GrpcDataServer;

Expand Down Expand Up @@ -59,9 +60,9 @@ protected DataServerFactory(UfsManager ufsManager,
*/
public DataServer createRemoteGrpcDataServer(DataWorker dataWorker) {
BlockWorkerGrpc.BlockWorkerImplBase blockWorkerService;
if (dataWorker instanceof DoraWorker) {
if (dataWorker instanceof PagedDoraWorker) {
blockWorkerService =
new DoraWorkerClientServiceHandler((DoraWorker) dataWorker);
new DoraWorkerClientServiceHandler((PagedDoraWorker) dataWorker);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

DoraWorker interface is missing a lot of method definitions so we need to downcast in DoraWorkerClientServiceHandler after all. So I'm just casting to the real class to avoid the frequent casting.

} else {
throw new UnsupportedOperationException(dataWorker.getClass().getCanonicalName()
+ " is no longer supported in Alluxio 3.x");
Expand All @@ -83,9 +84,9 @@ public DataServer createDomainSocketDataServer(DataWorker worker) {
}
LOG.info("Domain socket data server is enabled at {}.", domainSocketPath);
BlockWorkerGrpc.BlockWorkerImplBase blockWorkerService;
if (worker instanceof DoraWorker) {
if (worker instanceof PagedDoraWorker) {
blockWorkerService =
new DoraWorkerClientServiceHandler((DoraWorker) worker);
new DoraWorkerClientServiceHandler((PagedDoraWorker) worker);
} else {
throw new UnsupportedOperationException(worker.getClass().getCanonicalName()
+ " is no longer supported in Alluxio 3.x");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import alluxio.proto.meta.DoraMeta;
import alluxio.proto.meta.DoraMeta.FileStatus;
import alluxio.underfs.Fingerprint;
import alluxio.underfs.UfsManager;
import alluxio.underfs.UfsStatus;
import alluxio.underfs.UnderFileSystem;
import alluxio.underfs.UnderFileSystemConfiguration;
Expand All @@ -36,6 +37,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.inject.Inject;

import java.io.Closeable;
import java.io.FileNotFoundException;
import java.io.IOException;
Expand All @@ -54,8 +57,7 @@ public class DoraMetaManager implements Closeable {
private final AlluxioConfiguration mConf;
private final DoraMetaStore mMetaStore;
private final CacheManager mCacheManager;
private final PagedDoraWorker mDoraWorker;
protected final DoraUfsManager mUfsManager;
protected final UfsManager mUfsManager;

private static final Logger SAMPLING_LOG = new SamplingLogger(
LoggerFactory.getLogger(DoraMetaManager.class), 1L * Constants.MINUTE_MS);
Expand All @@ -77,28 +79,27 @@ public class DoraMetaManager implements Closeable {
/**
* Creates a dora meta manager.
* @param conf configuration
* @param doraWorker the dora worker instance
* @param cacheManger the cache manager to manage the page cache
* @param ufsManager the ufs Manager
*/
@Inject
public DoraMetaManager(AlluxioConfiguration conf,
PagedDoraWorker doraWorker, CacheManager cacheManger,
DoraUfsManager ufsManager) {
CacheManager cacheManger,
UfsManager ufsManager) {
mConf = conf;
String dbDir = mConf.getString(PropertyKey.DORA_WORKER_METASTORE_ROCKSDB_DIR);
Duration duration = mConf.getDuration(PropertyKey.DORA_WORKER_METASTORE_ROCKSDB_TTL);
long ttl = (duration.isNegative() || duration.isZero()) ? -1 : duration.getSeconds();
mMetaStore = new RocksDBDoraMetaStore(dbDir, ttl);
mCacheManager = cacheManger;
mDoraWorker = doraWorker;
mUfsManager = ufsManager;
}

protected UnderFileSystem getUfsInstance(String ufsUriStr) {
AlluxioURI ufsUriUri = new AlluxioURI(ufsUriStr);
try {
UnderFileSystem ufs = mUfsManager.getOrAdd(ufsUriUri,
() -> UnderFileSystemConfiguration.defaults(mConf));
() -> UnderFileSystemConfiguration.defaults(mConf));
return ufs;
} catch (Exception e) {
LOG.debug("failed to get UFS instance for URI {}", ufsUriStr, e);
Expand All @@ -120,7 +121,8 @@ public Optional<FileStatus> getFromUfs(String path) throws IOException {
if (mXAttrWriteToUFSEnabled) {
xattrMap = ufs.getAttributes(path);
}
DoraMeta.FileStatus fs = mDoraWorker.buildFileStatusFromUfsStatus(status, path, xattrMap);
DoraMeta.FileStatus fs = PagedDoraWorker.buildFileStatusFromUfsStatus(
mCacheManager.getUsage(), ufs.getUnderFSType(), status, path, xattrMap);
return Optional.ofNullable(fs);
} catch (FileNotFoundException e) {
return Optional.empty();
Expand Down
Loading
Loading