Skip to content

Commit

Permalink
Refactor PagedDoraWorker by injecting MetaManager and UfsManager
Browse files Browse the repository at this point in the history
### What changes are proposed in this pull request?

1. If an object is created inside `PagedDoraWorker` constructor, extract that creation to before the constructor and use dependency injection to inject it to the worker object. This doesn't change any creation logic, just a refactor to better adapt to dependency injection flavor.
2. There is a circular dependency between `MetaManager` and `PagedDoraWorker`. This change removes that cycle. Now we create one, then create the other. Before, we create one and in the construction, we let `this` ref escape and create the other. Some methods are either moved or changed to `static`.
3. By adapting to dependency injection, we rely on `UfsManager` interface instead of `DoraUfsManager` implementation. Some method signatures are extracted to the interface level.
4. A few other small refactors to get rid of some downcasts and variable scope changes. Reasons are attached in comments on this PR.

### Why are the changes needed?

Improve code quality and extensibility.

### Does this PR introduce any user facing changes?

No. All refactor changes are small and equivalent to existing code. So nothing should break.

			pr-link: #18181
			change-id: cid-4f9e9bc770b12253188bb541dd456ef3cd889c2b
  • Loading branch information
jiacheliu3 authored Nov 1, 2023
1 parent ccf20e4 commit 0e83207
Show file tree
Hide file tree
Showing 10 changed files with 170 additions and 135 deletions.
20 changes: 20 additions & 0 deletions dora/core/common/src/main/java/alluxio/underfs/UfsManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.io.Closeable;
import java.io.IOException;
import java.net.URI;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

Expand Down Expand Up @@ -89,6 +90,17 @@ public AlluxioURI getUfsMountPointUri() {
}
}

/**
* Return a UFS instance if it already exists in the cache, otherwise, creates a new instance and
* return it.
*
* @param ufsUri the UFS path
* @param ufsConfSupplier supplier for UFS configuration
* @return the UFS instance
*/
UnderFileSystem getOrAdd(AlluxioURI ufsUri,
Supplier<UnderFileSystemConfiguration> ufsConfSupplier);

/**
* Keeps track of a mount id and maps it to its URI in Alluxio and configuration. This is an
* Alluxio-only operation and no interaction to UFS will be made.
Expand Down Expand Up @@ -131,6 +143,14 @@ void addMountWithRecorder(long mountId, AlluxioURI ufsUri,
*/
UfsClient get(long mountId) throws NotFoundException, UnavailableException;

/**
* Gets an instance for the given UFS URI and configuration, if such exists.
*
* @param ufsUri the URI of the UFS
* @return a UFS instance, or none if not registered
*/
Optional<UnderFileSystem> get(AlluxioURI ufsUri);

/**
* @return the UFS client associated with root
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,14 +108,7 @@ protected AbstractUfsManager() {
mCloser = Closer.create();
}

/**
* Return a UFS instance if it already exists in the cache, otherwise, creates a new instance and
* return it.
*
* @param ufsUri the UFS path
* @param ufsConfSupplier supplier for UFS configuration
* @return the UFS instance
*/
@Override
public UnderFileSystem getOrAdd(AlluxioURI ufsUri,
Supplier<UnderFileSystemConfiguration> ufsConfSupplier) {
Key key = generateKey(ufsUri);
Expand Down Expand Up @@ -227,12 +220,7 @@ public void removeMount(long mountId) {
mMountIdToUfsInfoMap.remove(mountId);
}

/**
* Gets an instance for the given UFS URI and configuration, if such exists.
*
* @param ufsUri the URI of the UFS
* @return a UFS instance, or none if not registered
*/
@Override
public Optional<UnderFileSystem> get(AlluxioURI ufsUri) {
Key key = generateKey(ufsUri);
return get(key);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import alluxio.underfs.UfsManager;
import alluxio.util.io.FileUtils;
import alluxio.util.io.PathUtils;
import alluxio.worker.dora.DoraWorker;
import alluxio.worker.dora.PagedDoraWorker;
import alluxio.worker.grpc.DoraWorkerClientServiceHandler;
import alluxio.worker.grpc.GrpcDataServer;

Expand Down Expand Up @@ -59,9 +59,9 @@ protected DataServerFactory(UfsManager ufsManager,
*/
public DataServer createRemoteGrpcDataServer(DataWorker dataWorker) {
BlockWorkerGrpc.BlockWorkerImplBase blockWorkerService;
if (dataWorker instanceof DoraWorker) {
if (dataWorker instanceof PagedDoraWorker) {
blockWorkerService =
new DoraWorkerClientServiceHandler((DoraWorker) dataWorker);
new DoraWorkerClientServiceHandler((PagedDoraWorker) dataWorker);
} else {
throw new UnsupportedOperationException(dataWorker.getClass().getCanonicalName()
+ " is no longer supported in Alluxio 3.x");
Expand All @@ -83,9 +83,9 @@ public DataServer createDomainSocketDataServer(DataWorker worker) {
}
LOG.info("Domain socket data server is enabled at {}.", domainSocketPath);
BlockWorkerGrpc.BlockWorkerImplBase blockWorkerService;
if (worker instanceof DoraWorker) {
if (worker instanceof PagedDoraWorker) {
blockWorkerService =
new DoraWorkerClientServiceHandler((DoraWorker) worker);
new DoraWorkerClientServiceHandler((PagedDoraWorker) worker);
} else {
throw new UnsupportedOperationException(worker.getClass().getCanonicalName()
+ " is no longer supported in Alluxio 3.x");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import alluxio.proto.meta.DoraMeta;
import alluxio.proto.meta.DoraMeta.FileStatus;
import alluxio.underfs.Fingerprint;
import alluxio.underfs.UfsManager;
import alluxio.underfs.UfsStatus;
import alluxio.underfs.UnderFileSystem;
import alluxio.underfs.UnderFileSystemConfiguration;
Expand All @@ -33,6 +34,7 @@
import com.github.benmanes.caffeine.cache.Caffeine;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.inject.Inject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -54,8 +56,7 @@ public class DoraMetaManager implements Closeable {
private final AlluxioConfiguration mConf;
private final DoraMetaStore mMetaStore;
private final CacheManager mCacheManager;
private final PagedDoraWorker mDoraWorker;
protected final DoraUfsManager mUfsManager;
protected final UfsManager mUfsManager;

private static final Logger SAMPLING_LOG = new SamplingLogger(
LoggerFactory.getLogger(DoraMetaManager.class), 1L * Constants.MINUTE_MS);
Expand All @@ -77,20 +78,19 @@ public class DoraMetaManager implements Closeable {
/**
* Creates a dora meta manager.
* @param conf configuration
* @param doraWorker the dora worker instance
* @param cacheManger the cache manager to manage the page cache
* @param ufsManager the ufs Manager
*/
@Inject
public DoraMetaManager(AlluxioConfiguration conf,
PagedDoraWorker doraWorker, CacheManager cacheManger,
DoraUfsManager ufsManager) {
CacheManager cacheManger,
UfsManager ufsManager) {
mConf = conf;
String dbDir = mConf.getString(PropertyKey.DORA_WORKER_METASTORE_ROCKSDB_DIR);
Duration duration = mConf.getDuration(PropertyKey.DORA_WORKER_METASTORE_ROCKSDB_TTL);
long ttl = (duration.isNegative() || duration.isZero()) ? -1 : duration.getSeconds();
mMetaStore = new RocksDBDoraMetaStore(dbDir, ttl);
mCacheManager = cacheManger;
mDoraWorker = doraWorker;
mUfsManager = ufsManager;
}

Expand Down Expand Up @@ -120,7 +120,8 @@ public Optional<FileStatus> getFromUfs(String path) throws IOException {
if (mXAttrWriteToUFSEnabled) {
xattrMap = ufs.getAttributes(path);
}
DoraMeta.FileStatus fs = mDoraWorker.buildFileStatusFromUfsStatus(status, path, xattrMap);
DoraMeta.FileStatus fs = PagedDoraWorker.buildFileStatusFromUfsStatus(
mCacheManager.getUsage(), ufs.getUnderFSType(), status, path, xattrMap);
return Optional.ofNullable(fs);
} catch (FileNotFoundException e) {
return Optional.empty();
Expand Down
Loading

0 comments on commit 0e83207

Please sign in to comment.