Refactor PagedDoraWorker by injecting MetaManager and UfsManager #18181

Merged: 13 commits, Nov 1, 2023
Original file line number Diff line number Diff line change
@@ -21,6 +21,7 @@
import alluxio.util.io.FileUtils;
import alluxio.util.io.PathUtils;
import alluxio.worker.dora.DoraWorker;
import alluxio.worker.dora.PagedDoraWorker;
import alluxio.worker.grpc.DoraWorkerClientServiceHandler;
import alluxio.worker.grpc.GrpcDataServer;

@@ -59,9 +60,9 @@ protected DataServerFactory(UfsManager ufsManager,
*/
public DataServer createRemoteGrpcDataServer(DataWorker dataWorker) {
BlockWorkerGrpc.BlockWorkerImplBase blockWorkerService;
if (dataWorker instanceof DoraWorker) {
if (dataWorker instanceof PagedDoraWorker) {
blockWorkerService =
new DoraWorkerClientServiceHandler((DoraWorker) dataWorker);
new DoraWorkerClientServiceHandler((PagedDoraWorker) dataWorker);
Contributor Author

The DoraWorker interface is missing many method definitions, so DoraWorkerClientServiceHandler has to downcast anyway. I'm casting to the concrete class once here to avoid casting repeatedly later.

} else {
throw new UnsupportedOperationException(dataWorker.getClass().getCanonicalName()
+ " is no longer supported in Alluxio 3.x");
@@ -83,9 +84,9 @@ public DataServer createDomainSocketDataServer(DataWorker worker) {
}
LOG.info("Domain socket data server is enabled at {}.", domainSocketPath);
BlockWorkerGrpc.BlockWorkerImplBase blockWorkerService;
if (worker instanceof DoraWorker) {
if (worker instanceof PagedDoraWorker) {
blockWorkerService =
new DoraWorkerClientServiceHandler((DoraWorker) worker);
new DoraWorkerClientServiceHandler((PagedDoraWorker) worker);
} else {
throw new UnsupportedOperationException(worker.getClass().getCanonicalName()
+ " is no longer supported in Alluxio 3.x");
Original file line number Diff line number Diff line change
@@ -36,6 +36,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.inject.Inject;

import java.io.Closeable;
import java.io.FileNotFoundException;
import java.io.IOException;
@@ -54,7 +56,6 @@ public class DoraMetaManager implements Closeable {
private final AlluxioConfiguration mConf;
private final DoraMetaStore mMetaStore;
private final CacheManager mCacheManager;
private final PagedDoraWorker mDoraWorker;
protected final DoraUfsManager mUfsManager;

private static final Logger SAMPLING_LOG = new SamplingLogger(
@@ -77,28 +78,27 @@ public class DoraMetaManager implements Closeable {
/**
* Creates a dora meta manager.
* @param conf configuration
* @param doraWorker the dora worker instance
* @param cacheManger the cache manager to manage the page cache
* @param ufsManager the ufs Manager
*/
@Inject
public DoraMetaManager(AlluxioConfiguration conf,
PagedDoraWorker doraWorker, CacheManager cacheManger,
CacheManager cacheManger,
DoraUfsManager ufsManager) {
mConf = conf;
String dbDir = mConf.getString(PropertyKey.DORA_WORKER_METASTORE_ROCKSDB_DIR);
Duration duration = mConf.getDuration(PropertyKey.DORA_WORKER_METASTORE_ROCKSDB_TTL);
long ttl = (duration.isNegative() || duration.isZero()) ? -1 : duration.getSeconds();
mMetaStore = new RocksDBDoraMetaStore(dbDir, ttl);
mCacheManager = cacheManger;
mDoraWorker = doraWorker;
mUfsManager = ufsManager;
}

protected UnderFileSystem getUfsInstance(String ufsUriStr) {
AlluxioURI ufsUriUri = new AlluxioURI(ufsUriStr);
try {
UnderFileSystem ufs = mUfsManager.getOrAdd(ufsUriUri,
() -> UnderFileSystemConfiguration.defaults(mConf));
() -> UnderFileSystemConfiguration.defaults(mConf));
return ufs;
} catch (Exception e) {
LOG.debug("failed to get UFS instance for URI {}", ufsUriStr, e);
@@ -120,7 +120,8 @@ public Optional<FileStatus> getFromUfs(String path) throws IOException {
if (mXAttrWriteToUFSEnabled) {
xattrMap = ufs.getAttributes(path);
}
DoraMeta.FileStatus fs = mDoraWorker.buildFileStatusFromUfsStatus(status, path, xattrMap);
DoraMeta.FileStatus fs = PagedDoraWorker.buildFileStatusFromUfsStatus(
mCacheManager.getUsage(), ufs.getUnderFSType(), status, path, xattrMap);
return Optional.ofNullable(fs);
} catch (FileNotFoundException e) {
return Optional.empty();
Original file line number Diff line number Diff line change
@@ -149,18 +149,18 @@ public class PagedDoraWorker extends AbstractWorker implements DoraWorker {
// for now Dora Worker does not support Alluxio <-> UFS mapping,
// and assumes all UFS paths belong to the same UFS.
private static final int MOUNT_POINT = 1;
private final Closer mResourceCloser = Closer.create();
protected final Closer mResourceCloser = Closer.create();
jiacheliu3 marked this conversation as resolved.
// TODO(lucy) change to string typed once membership manager got enabled by default
private final AtomicReference<WorkerIdentity> mWorkerId;
protected final CacheManager mCacheManager;
protected final DoraUfsManager mUfsManager;
private final DoraMetaManager mMetaManager;
protected DoraMetaManager mMetaManager;
Contributor Author

The visibility has to change here so that a child class can see this field.

private final MembershipManager mMembershipManager;
private final UfsInputStreamCache mUfsStreamCache;
private final long mPageSize;
protected final AlluxioConfiguration mConf;
private final BlockMasterClientPool mBlockMasterClientPool;
private FileSystemContext mFsContext;
protected FileSystemContext mFsContext;
private MkdirsOptions mMkdirsRecursive;
private MkdirsOptions mMkdirsNonRecursive;

@@ -188,23 +188,15 @@ public PagedDoraWorker(
AlluxioConfiguration conf,
CacheManager cacheManager,
MembershipManager membershipManager,
BlockMasterClientPool blockMasterClientPool
) {
this(workerId, conf, cacheManager, membershipManager, blockMasterClientPool,
FileSystemContext.create(conf));
}

protected PagedDoraWorker(
AtomicReference<WorkerIdentity> workerId,
AlluxioConfiguration conf,
CacheManager cacheManager,
MembershipManager membershipManager,
BlockMasterClientPool blockMasterClientPool,
FileSystemContext fileSystemContext) {
DoraUfsManager ufsManager,
DoraMetaManager metaManager,
FileSystemContext fileSystemContext
) {
super(ExecutorServiceFactories.fixedThreadPool("dora-worker-executor", 5));
mWorkerId = workerId;
mConf = conf;
mUfsManager = mResourceCloser.register(new DoraUfsManager());
mUfsManager = mResourceCloser.register(ufsManager);
Contributor Author

I'm just creating the UfsManager outside this constructor and passing it in. The code logic does not change at all so this refactor should be effectively minor.

Contributor

Guice advises against injecting closeable resources, especially singletons, because it is hard to reason about who is responsible for closing the resource. https://github.com/google/guice/wiki/Avoid-Injecting-Closable-Resources

Contributor Author

Yes, for the best coding style we should probably refactor this a bit (together with our next big modularization effort).
For the current code, PagedDoraWorker.close() marks the end of the worker process, so I figure it's fine to let the closer close that singleton.
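
The ownership question in this thread can be illustrated with a small, stdlib-only sketch. It is not the PR's code: the PR registers resources with mResourceCloser, while OwnerSketch and the resource names below are hypothetical stand-ins. The sketch assumes a single owner that accepts injected resources and closes them in reverse registration order when it shuts down.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Hypothetical single owner of shared resources (a stand-in for a closer). */
final class OwnerSketch implements AutoCloseable {
  private final Deque<AutoCloseable> mResources = new ArrayDeque<>();

  /** Takes ownership: from here on, only this object closes the resource. */
  void register(AutoCloseable resource) {
    mResources.push(resource);
  }

  /** Closes everything in reverse registration order, remembering failures. */
  @Override
  public void close() {
    IllegalStateException failure = null;
    while (!mResources.isEmpty()) {
      try {
        mResources.pop().close();
      } catch (Exception e) {
        if (failure == null) {
          failure = new IllegalStateException("failed to close resources", e);
        } else {
          failure.addSuppressed(e);
        }
      }
    }
    if (failure != null) {
      throw failure;
    }
  }

  /** Registers two named resources and returns the order they were closed in. */
  static List<String> demo() {
    List<String> closed = new ArrayList<>();
    try (OwnerSketch owner = new OwnerSketch()) {
      owner.register(() -> closed.add("ufsManager"));
      owner.register(() -> closed.add("metaManager"));
    }
    return closed;
  }

  public static void main(String[] args) {
    System.out.println(demo());
  }
}
```

With one owner, the answer to "who closes this?" is unambiguous as long as nothing else that receives the injected instance also calls close() on it.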

String rootUFS = mConf.getString(PropertyKey.DORA_CLIENT_UFS_ROOT);
mUfsManager.getOrAdd(new AlluxioURI(rootUFS),
() -> UnderFileSystemConfiguration.defaults(mConf));
@@ -213,8 +205,7 @@ protected PagedDoraWorker(
mPageSize = mConf.getBytes(PropertyKey.WORKER_PAGE_STORE_PAGE_SIZE);
mBlockMasterClientPool = blockMasterClientPool;
mCacheManager = cacheManager;
mMetaManager = mResourceCloser.register(
new DoraMetaManager(mConf, this, mCacheManager, mUfsManager));
mMetaManager = mResourceCloser.register(metaManager);
Contributor Author

Interestingly, I don't have anything like bind(MetaManager).to(DoraMetaManager.class) in DoraWorkerModule, yet Guice is still able to create the MetaManager instance for me. Is that okay? I figure we should always call bind() explicitly in code.

Contributor

DoraMetaManager is not an interface but a concrete class. There is no MetaManager.

mMembershipManager = membershipManager;
mOpenFileHandleContainer = new DoraOpenFileHandleContainer();
mMkdirsRecursive = MkdirsOptions.defaults(mConf).setCreateParent(true);
@@ -458,83 +449,6 @@ protected int getCachedPercentage(alluxio.grpc.FileInfo fi, String ufsFullPath)
return cachedPercentage;
}

/**
* Build FileInfo from UfsStatus and UFS full Path.
*
* @param status
* @param ufsFullPath
* @param xattrMap
* @return a FileInfo
*/
public alluxio.grpc.FileInfo buildFileInfoFromUfsStatus(UfsStatus status, String ufsFullPath,
Contributor Author

These two methods are just made static and moved to the end of this file.

@Nullable Map<String, String> xattrMap)
throws IOException {
UnderFileSystem ufs = getUfsInstance(ufsFullPath);
String filename = new AlluxioURI(ufsFullPath).getName();

alluxio.grpc.FileInfo.Builder infoBuilder = alluxio.grpc.FileInfo.newBuilder()
.setUfsType(ufs.getUnderFSType())
.setFileId(ufsFullPath.hashCode())
.setName(filename)
.setPath(ufsFullPath)
.setUfsPath(ufsFullPath)
.setMode(status.getMode())
.setFolder(status.isDirectory())
.setOwner(status.getOwner())
.setGroup(status.getGroup())
.setCompleted(true)
.setPersisted(true);
if (xattrMap != null) {
for (Map.Entry<String, String> entry : xattrMap.entrySet()) {
infoBuilder.putXattr(entry.getKey(), ByteString.copyFromUtf8(entry.getValue()));
}
}
if (status instanceof UfsFileStatus) {
UfsFileStatus fileStatus = (UfsFileStatus) status;
infoBuilder.setLength(fileStatus.getContentLength())
.setLastModificationTimeMs(status.getLastModifiedTime())
.setBlockSizeBytes(fileStatus.getBlockSize());
String contentHash = ((UfsFileStatus) status).getContentHash();
if (contentHash != null) {
infoBuilder.setContentHash(contentHash);
}

// get cached percentage
String cacheManagerFileId = new AlluxioURI(ufsFullPath).hash();
final long bytesInCache = mCacheManager.getUsage()
.flatMap(usage -> usage.partitionedBy(file(cacheManagerFileId)))
.map(CacheUsage::used).orElse(0L);
final long fileLength = fileStatus.getContentLength();
final int cachedPercentage;
if (fileLength > 0) {
cachedPercentage = (int) (bytesInCache * 100 / fileLength);
} else {
cachedPercentage = 100;
}

infoBuilder.setInAlluxioPercentage(cachedPercentage)
.setInMemoryPercentage(cachedPercentage);
}
return infoBuilder.build();
}

/**
* Build FileStatus from UfsStatus and UFS full Path.
*
* @param status the ufs status
* @param ufsFullPath the full ufs path
* @param xattrMap the map of file xAttrs
* @return the file status
*/
public DoraMeta.FileStatus buildFileStatusFromUfsStatus(UfsStatus status, String ufsFullPath,
@Nullable Map<String, String> xattrMap)
throws IOException {
return DoraMeta.FileStatus.newBuilder()
.setFileInfo(buildFileInfoFromUfsStatus(status, ufsFullPath, xattrMap))
.setTs(System.nanoTime())
.build();
}

@Override
public BlockReader createFileReader(String fileId, long offset, boolean positionShort,
Protocol.OpenUfsBlockOptions options) throws IOException, AccessControlException {
@@ -727,7 +641,8 @@ private void loadMetadata(UfsStatus status, List<LoadFailure> errors) {
if (mXAttrWriteToUFSEnabled) {
xattrMap = ufs.getAttributes(ufsFullPath);
}
DoraMeta.FileStatus fs = buildFileStatusFromUfsStatus(status, ufsFullPath, xattrMap);
DoraMeta.FileStatus fs = buildFileStatusFromUfsStatus(getCacheUsage(), ufs.getUnderFSType(),
status, ufsFullPath, xattrMap);
mMetaManager.put(ufsFullPath, fs);
} catch (Exception e) {
LOG.error("Failed to put file status to meta manager", e);
@@ -980,7 +895,7 @@ public OpenFileHandle createFile(String path, CreateFilePOptions options)
group,
createOption.getMode().toShort(),
DUMMY_BLOCK_SIZE);
info = buildFileInfoFromUfsStatus(status, path, null);
info = buildFileInfoFromUfsStatus(mCacheManager.getUsage(), getUfsInstance(path).getUnderFSType(), status, path, null);
} catch (IOException e) {
throw new RuntimeException(e);
}
@@ -1201,6 +1116,10 @@ DoraMetaManager getMetaManager() {
return mMetaManager;
}

public Optional<CacheUsage> getCacheUsage() {
return mCacheManager.getUsage();
}

protected void checkCopyPermission(String srcPath, String dstPath)
throws AccessControlException, IOException {
// No-op
@@ -1219,4 +1138,85 @@ protected DoraOpenFileHandleContainer getOpenFileHandleContainer() {
public WorkerNetAddress getAddress() {
return mAddress;
}

/**
* Build FileInfo from UfsStatus and UFS full Path.
*
* @param cacheUsage
* @param ufsType
* @param status
* @param ufsFullPath
* @param xattrMap
* @return a FileInfo
*/
public static alluxio.grpc.FileInfo buildFileInfoFromUfsStatus(
Contributor Author

There's probably a better place for these two static methods, but I don't see it right now.

Optional<CacheUsage> cacheUsage, String ufsType, UfsStatus status, String ufsFullPath,
@Nullable Map<String, String> xattrMap) {
String filename = new AlluxioURI(ufsFullPath).getName();

alluxio.grpc.FileInfo.Builder infoBuilder = alluxio.grpc.FileInfo.newBuilder()
.setUfsType(ufsType)
.setFileId(ufsFullPath.hashCode())
.setName(filename)
.setPath(ufsFullPath)
.setUfsPath(ufsFullPath)
.setMode(status.getMode())
.setFolder(status.isDirectory())
.setOwner(status.getOwner())
.setGroup(status.getGroup())
.setCompleted(true)
.setPersisted(true);
if (xattrMap != null) {
for (Map.Entry<String, String> entry : xattrMap.entrySet()) {
infoBuilder.putXattr(entry.getKey(), ByteString.copyFromUtf8(entry.getValue()));
}
}
if (status instanceof UfsFileStatus) {
UfsFileStatus fileStatus = (UfsFileStatus) status;
infoBuilder.setLength(fileStatus.getContentLength())
.setLastModificationTimeMs(status.getLastModifiedTime())
.setBlockSizeBytes(fileStatus.getBlockSize());
String contentHash = ((UfsFileStatus) status).getContentHash();
if (contentHash != null) {
infoBuilder.setContentHash(contentHash);
}

// get cached percentage
String cacheManagerFileId = new AlluxioURI(ufsFullPath).hash();
final long bytesInCache = cacheUsage
.flatMap(usage -> usage.partitionedBy(file(cacheManagerFileId)))
.map(CacheUsage::used).orElse(0L);
final long fileLength = fileStatus.getContentLength();
final int cachedPercentage;
if (fileLength > 0) {
cachedPercentage = (int) (bytesInCache * 100 / fileLength);
} else {
cachedPercentage = 100;
}

infoBuilder.setInAlluxioPercentage(cachedPercentage)
.setInMemoryPercentage(cachedPercentage);
}
return infoBuilder.build();
}

/**
* Build FileStatus from UfsStatus and UFS full Path.
*
* @param cacheUsage
* @param ufsType
* @param status the ufs status
* @param ufsFullPath the full ufs path
* @param xattrMap the map of file xAttrs
* @return the file status
*/
public static DoraMeta.FileStatus buildFileStatusFromUfsStatus(
Optional<CacheUsage> cacheUsage, String ufsType, UfsStatus status, String ufsFullPath,
@Nullable Map<String, String> xattrMap) {
return DoraMeta.FileStatus.newBuilder()
.setFileInfo(buildFileInfoFromUfsStatus(cacheUsage, ufsType, status, ufsFullPath, xattrMap))
.setTs(System.nanoTime())
.build();
}

}
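
Because the two builders are now static and receive the cache usage and UFS type as arguments, the cached-percentage arithmetic no longer depends on worker state. A minimal sketch of that arithmetic as a pure function follows. CachedPercentageSketch and the Optional<Long> parameter are hypothetical simplifications; the method in the diff takes an Optional<CacheUsage> and derives the byte count from it.

```java
import java.util.Optional;

/** Hypothetical, simplified version of the cached-percentage calculation. */
final class CachedPercentageSketch {
  /**
   * Mirrors the arithmetic in the diff: an empty usage counts as 0 cached
   * bytes, and a zero-length file is reported as fully cached.
   */
  static int cachedPercentage(Optional<Long> bytesInCache, long fileLength) {
    long cached = bytesInCache.orElse(0L);
    if (fileLength > 0) {
      return (int) (cached * 100 / fileLength);
    }
    return 100;
  }

  public static void main(String[] args) {
    System.out.println(cachedPercentage(Optional.of(50L), 200L));
    System.out.println(cachedPercentage(Optional.empty(), 200L));
    System.out.println(cachedPercentage(Optional.of(0L), 0L));
  }
}
```

A function of this shape can be unit-tested without mocking a worker or a cache manager, which is one practical benefit of passing the inputs in explicitly.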
Original file line number Diff line number Diff line change
@@ -88,14 +88,14 @@ public class DoraWorkerClientServiceHandler extends BlockWorkerGrpc.BlockWorkerI
Configuration.getInt(PropertyKey.MASTER_FILE_SYSTEM_LISTSTATUS_RESULTS_PER_MESSAGE);

private final ReadResponseMarshaller mReadResponseMarshaller = new ReadResponseMarshaller();
private final DoraWorker mWorker;
private final PagedDoraWorker mWorker;

/**
* Creates a new implementation of gRPC BlockWorker interface.
* @param doraWorker the DoraWorker object
*/
@Inject
public DoraWorkerClientServiceHandler(DoraWorker doraWorker) {
public DoraWorkerClientServiceHandler(PagedDoraWorker doraWorker) {
mWorker = requireNonNull(doraWorker);
}

@@ -233,7 +233,9 @@ public void listStatus(ListStatusPRequest request,
// the list status do not include xattr now. GetAttr will cause some additional overhead.
// And not every request requires the Xattr. Now only get file xattr in GetStatus.
alluxio.grpc.FileInfo fi =
((PagedDoraWorker) mWorker).buildFileInfoFromUfsStatus(status, ufsFullPath, null);
PagedDoraWorker.buildFileInfoFromUfsStatus(((PagedDoraWorker) mWorker).getCacheUsage(),
((PagedDoraWorker) mWorker).getUfsInstance(ufsFullPath).getUnderFSType(),
status, ufsFullPath, null);

builder.addFileInfos(fi);
if (builder.getFileInfosCount() == LIST_STATUS_BATCH_SIZE) {
Original file line number Diff line number Diff line change
@@ -75,6 +75,10 @@ protected void configure() {
bind(UfsManager.class).to(DoraUfsManager.class).in(Scopes.SINGLETON);
bind(AlluxioConfiguration.class).toProvider(() -> Configuration.global());

// Create FileSystemContext shared across all worker components
FileSystemContext fileSystemContext = FileSystemContext.create();
bind(FileSystemContext.class).toInstance(fileSystemContext);
Comment on lines +80 to +82
Contributor Author

Similar to the change in PagedDoraWorker constructor, I'm just moving some resource creation out of PagedDoraWorker() into this module definition, and injecting into the constructor. The code logic effectively doesn't change.


// Note that dora can only use Paged Store
try {
CacheManagerOptions cacheManagerOptions =
Original file line number Diff line number Diff line change
@@ -57,7 +57,7 @@ public void before() throws IOException {
PagedDoraWorker worker = mock(PagedDoraWorker.class);
CacheManager cacheManager = mock(CacheManager.class);
mDoraUfsManager = mock(DoraUfsManager.class);
mManager = new DoraMetaManager(conf, worker, cacheManager, mDoraUfsManager);
mManager = new DoraMetaManager(conf, cacheManager, mDoraUfsManager);
Contributor Author

This line is one of the core changes in this PR. PagedDoraWorker no longer passes its this reference out of its own constructor. Letting the this reference escape from a constructor is a bad practice we should avoid.
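
To make the hazard concrete, here is a small stdlib-only sketch of a this reference escaping from a constructor, next to the injected alternative. All class names are hypothetical and are not taken from Alluxio. The leaky variant lets a collaborator observe a final field before it is assigned, while the injected variant builds the collaborator first and passes it in.

```java
/** Hypothetical collaborator that inspects whatever is registered with it. */
final class RegistrySketch {
  boolean mSawNullName;

  void register(LeakyWorkerSketch worker) {
    // Called from inside the worker's constructor, before mName is assigned.
    mSawNullName = worker.name() == null;
  }
}

/** Anti-pattern: hands `this` to a collaborator from its own constructor. */
final class LeakyWorkerSketch {
  private final String mName;

  LeakyWorkerSketch(RegistrySketch registry) {
    registry.register(this); // `this` escapes while mName is still null
    mName = "worker";
  }

  String name() {
    return mName;
  }
}

/** Hypothetical collaborator that needs no reference to the worker. */
final class MetaSketch {
  private int mEntries;

  int put(String path) {
    return ++mEntries;
  }
}

/** Preferred shape: collaborators are built first and passed in. */
final class InjectedWorkerSketch {
  private final MetaSketch mMeta;

  InjectedWorkerSketch(MetaSketch meta) {
    mMeta = java.util.Objects.requireNonNull(meta);
  }

  int load(String path) {
    return mMeta.put(path);
  }
}

final class ThisEscapeDemo {
  /** Returns true if the registry saw the worker in a half-built state. */
  static boolean leakObserved() {
    RegistrySketch registry = new RegistrySketch();
    new LeakyWorkerSketch(registry);
    return registry.mSawNullName;
  }

  /** Loads two paths through an injected collaborator and returns the count. */
  static int injectedLoads() {
    InjectedWorkerSketch worker = new InjectedWorkerSketch(new MetaSketch());
    worker.load("/a");
    return worker.load("/b");
  }

  public static void main(String[] args) {
    System.out.println("leak observed: " + leakObserved());
    System.out.println("loads: " + injectedLoads());
  }
}
```

In the injected shape the collaborator never holds a reference to a half-constructed worker, which mirrors why DoraMetaManager no longer takes a PagedDoraWorker argument.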

}

@After