-
Notifications
You must be signed in to change notification settings - Fork 2.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Refactor PagedDoraWorker by injecting MetaManager and UfsManager #18181
Changes from 7 commits
6b6bdb8
a78c235
e604f92
28ad411
2cc9b50
53caade
555e55a
e377e92
6b49e8a
e334d2b
26518b5
e459a1e
d2259bd
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -149,18 +149,18 @@ public class PagedDoraWorker extends AbstractWorker implements DoraWorker { | |
// for now Dora Worker does not support Alluxio <-> UFS mapping, | ||
// and assumes all UFS paths belong to the same UFS. | ||
private static final int MOUNT_POINT = 1; | ||
private final Closer mResourceCloser = Closer.create(); | ||
protected final Closer mResourceCloser = Closer.create(); | ||
jiacheliu3 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
// TODO(lucy) change to string typed once membership manager got enabled by default | ||
private final AtomicReference<WorkerIdentity> mWorkerId; | ||
protected final CacheManager mCacheManager; | ||
protected final DoraUfsManager mUfsManager; | ||
private final DoraMetaManager mMetaManager; | ||
protected DoraMetaManager mMetaManager; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. it's necessary to change the scope here so a child class can see this field |
||
private final MembershipManager mMembershipManager; | ||
private final UfsInputStreamCache mUfsStreamCache; | ||
private final long mPageSize; | ||
protected final AlluxioConfiguration mConf; | ||
private final BlockMasterClientPool mBlockMasterClientPool; | ||
private FileSystemContext mFsContext; | ||
protected FileSystemContext mFsContext; | ||
private MkdirsOptions mMkdirsRecursive; | ||
private MkdirsOptions mMkdirsNonRecursive; | ||
|
||
|
@@ -188,23 +188,15 @@ public PagedDoraWorker( | |
AlluxioConfiguration conf, | ||
CacheManager cacheManager, | ||
MembershipManager membershipManager, | ||
BlockMasterClientPool blockMasterClientPool | ||
) { | ||
this(workerId, conf, cacheManager, membershipManager, blockMasterClientPool, | ||
FileSystemContext.create(conf)); | ||
} | ||
|
||
protected PagedDoraWorker( | ||
AtomicReference<WorkerIdentity> workerId, | ||
AlluxioConfiguration conf, | ||
CacheManager cacheManager, | ||
MembershipManager membershipManager, | ||
BlockMasterClientPool blockMasterClientPool, | ||
FileSystemContext fileSystemContext) { | ||
DoraUfsManager ufsManager, | ||
DoraMetaManager metaManager, | ||
FileSystemContext fileSystemContext | ||
) { | ||
super(ExecutorServiceFactories.fixedThreadPool("dora-worker-executor", 5)); | ||
mWorkerId = workerId; | ||
mConf = conf; | ||
mUfsManager = mResourceCloser.register(new DoraUfsManager()); | ||
mUfsManager = mResourceCloser.register(ufsManager); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm just creating the UfsManager outside this constructor and passing it in. The code logic does not change at all so this refactor should be effectively minor. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Guice advises against injecting closeable resources especially when it's a singleton, as it's hard to reason about who is responsible to close the resource. https://github.com/google/guice/wiki/Avoid-Injecting-Closable-Resources There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yea for best coding style we probably should refactor this a bit (together with our next big modularization efforts). |
||
String rootUFS = mConf.getString(PropertyKey.DORA_CLIENT_UFS_ROOT); | ||
mUfsManager.getOrAdd(new AlluxioURI(rootUFS), | ||
() -> UnderFileSystemConfiguration.defaults(mConf)); | ||
|
@@ -213,8 +205,7 @@ protected PagedDoraWorker( | |
mPageSize = mConf.getBytes(PropertyKey.WORKER_PAGE_STORE_PAGE_SIZE); | ||
mBlockMasterClientPool = blockMasterClientPool; | ||
mCacheManager = cacheManager; | ||
mMetaManager = mResourceCloser.register( | ||
new DoraMetaManager(mConf, this, mCacheManager, mUfsManager)); | ||
mMetaManager = mResourceCloser.register(metaManager); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Interestingly, I don't have something like There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
mMembershipManager = membershipManager; | ||
mOpenFileHandleContainer = new DoraOpenFileHandleContainer(); | ||
mMkdirsRecursive = MkdirsOptions.defaults(mConf).setCreateParent(true); | ||
|
@@ -458,83 +449,6 @@ protected int getCachedPercentage(alluxio.grpc.FileInfo fi, String ufsFullPath) | |
return cachedPercentage; | ||
} | ||
|
||
/** | ||
* Build FileInfo from UfsStatus and UFS full Path. | ||
* | ||
* @param status | ||
* @param ufsFullPath | ||
* @param xattrMap | ||
* @return a FileInfo | ||
*/ | ||
public alluxio.grpc.FileInfo buildFileInfoFromUfsStatus(UfsStatus status, String ufsFullPath, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. These two methods are just changed |
||
@Nullable Map<String, String> xattrMap) | ||
throws IOException { | ||
UnderFileSystem ufs = getUfsInstance(ufsFullPath); | ||
String filename = new AlluxioURI(ufsFullPath).getName(); | ||
|
||
alluxio.grpc.FileInfo.Builder infoBuilder = alluxio.grpc.FileInfo.newBuilder() | ||
.setUfsType(ufs.getUnderFSType()) | ||
.setFileId(ufsFullPath.hashCode()) | ||
.setName(filename) | ||
.setPath(ufsFullPath) | ||
.setUfsPath(ufsFullPath) | ||
.setMode(status.getMode()) | ||
.setFolder(status.isDirectory()) | ||
.setOwner(status.getOwner()) | ||
.setGroup(status.getGroup()) | ||
.setCompleted(true) | ||
.setPersisted(true); | ||
if (xattrMap != null) { | ||
for (Map.Entry<String, String> entry : xattrMap.entrySet()) { | ||
infoBuilder.putXattr(entry.getKey(), ByteString.copyFromUtf8(entry.getValue())); | ||
} | ||
} | ||
if (status instanceof UfsFileStatus) { | ||
UfsFileStatus fileStatus = (UfsFileStatus) status; | ||
infoBuilder.setLength(fileStatus.getContentLength()) | ||
.setLastModificationTimeMs(status.getLastModifiedTime()) | ||
.setBlockSizeBytes(fileStatus.getBlockSize()); | ||
String contentHash = ((UfsFileStatus) status).getContentHash(); | ||
if (contentHash != null) { | ||
infoBuilder.setContentHash(contentHash); | ||
} | ||
|
||
// get cached percentage | ||
String cacheManagerFileId = new AlluxioURI(ufsFullPath).hash(); | ||
final long bytesInCache = mCacheManager.getUsage() | ||
.flatMap(usage -> usage.partitionedBy(file(cacheManagerFileId))) | ||
.map(CacheUsage::used).orElse(0L); | ||
final long fileLength = fileStatus.getContentLength(); | ||
final int cachedPercentage; | ||
if (fileLength > 0) { | ||
cachedPercentage = (int) (bytesInCache * 100 / fileLength); | ||
} else { | ||
cachedPercentage = 100; | ||
} | ||
|
||
infoBuilder.setInAlluxioPercentage(cachedPercentage) | ||
.setInMemoryPercentage(cachedPercentage); | ||
} | ||
return infoBuilder.build(); | ||
} | ||
|
||
/** | ||
* Build FileStatus from UfsStatus and UFS full Path. | ||
* | ||
* @param status the ufs status | ||
* @param ufsFullPath the full ufs path | ||
* @param xattrMap the map of file xAttrs | ||
* @return the file status | ||
*/ | ||
public DoraMeta.FileStatus buildFileStatusFromUfsStatus(UfsStatus status, String ufsFullPath, | ||
@Nullable Map<String, String> xattrMap) | ||
throws IOException { | ||
return DoraMeta.FileStatus.newBuilder() | ||
.setFileInfo(buildFileInfoFromUfsStatus(status, ufsFullPath, xattrMap)) | ||
.setTs(System.nanoTime()) | ||
.build(); | ||
} | ||
|
||
@Override | ||
public BlockReader createFileReader(String fileId, long offset, boolean positionShort, | ||
Protocol.OpenUfsBlockOptions options) throws IOException, AccessControlException { | ||
|
@@ -727,7 +641,8 @@ private void loadMetadata(UfsStatus status, List<LoadFailure> errors) { | |
if (mXAttrWriteToUFSEnabled) { | ||
xattrMap = ufs.getAttributes(ufsFullPath); | ||
} | ||
DoraMeta.FileStatus fs = buildFileStatusFromUfsStatus(status, ufsFullPath, xattrMap); | ||
DoraMeta.FileStatus fs = buildFileStatusFromUfsStatus(getCacheUsage(), ufs.getUnderFSType(), | ||
status, ufsFullPath, xattrMap); | ||
mMetaManager.put(ufsFullPath, fs); | ||
} catch (Exception e) { | ||
LOG.error("Failed to put file status to meta manager", e); | ||
|
@@ -980,7 +895,7 @@ public OpenFileHandle createFile(String path, CreateFilePOptions options) | |
group, | ||
createOption.getMode().toShort(), | ||
DUMMY_BLOCK_SIZE); | ||
info = buildFileInfoFromUfsStatus(status, path, null); | ||
info = buildFileInfoFromUfsStatus(mCacheManager.getUsage(), getUfsInstance(path).getUnderFSType(), status, path, null); | ||
} catch (IOException e) { | ||
throw new RuntimeException(e); | ||
} | ||
|
@@ -1201,6 +1116,10 @@ DoraMetaManager getMetaManager() { | |
return mMetaManager; | ||
} | ||
|
||
public Optional<CacheUsage> getCacheUsage() { | ||
return mCacheManager.getUsage(); | ||
} | ||
|
||
protected void checkCopyPermission(String srcPath, String dstPath) | ||
throws AccessControlException, IOException { | ||
// No-op | ||
|
@@ -1219,4 +1138,85 @@ protected DoraOpenFileHandleContainer getOpenFileHandleContainer() { | |
public WorkerNetAddress getAddress() { | ||
return mAddress; | ||
} | ||
|
||
/** | ||
* Build FileInfo from UfsStatus and UFS full Path. | ||
* | ||
* @param cacheUsage | ||
* @param ufsType | ||
* @param status | ||
* @param ufsFullPath | ||
* @param xattrMap | ||
* @return a FileInfo | ||
*/ | ||
public static alluxio.grpc.FileInfo buildFileInfoFromUfsStatus( | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. there's probably a better place for these 2 static methods but I don't see it right now |
||
Optional<CacheUsage> cacheUsage, String ufsType, UfsStatus status, String ufsFullPath, | ||
@Nullable Map<String, String> xattrMap) { | ||
String filename = new AlluxioURI(ufsFullPath).getName(); | ||
|
||
alluxio.grpc.FileInfo.Builder infoBuilder = alluxio.grpc.FileInfo.newBuilder() | ||
.setUfsType(ufsType) | ||
.setFileId(ufsFullPath.hashCode()) | ||
.setName(filename) | ||
.setPath(ufsFullPath) | ||
.setUfsPath(ufsFullPath) | ||
.setMode(status.getMode()) | ||
.setFolder(status.isDirectory()) | ||
.setOwner(status.getOwner()) | ||
.setGroup(status.getGroup()) | ||
.setCompleted(true) | ||
.setPersisted(true); | ||
if (xattrMap != null) { | ||
for (Map.Entry<String, String> entry : xattrMap.entrySet()) { | ||
infoBuilder.putXattr(entry.getKey(), ByteString.copyFromUtf8(entry.getValue())); | ||
} | ||
} | ||
if (status instanceof UfsFileStatus) { | ||
UfsFileStatus fileStatus = (UfsFileStatus) status; | ||
infoBuilder.setLength(fileStatus.getContentLength()) | ||
.setLastModificationTimeMs(status.getLastModifiedTime()) | ||
.setBlockSizeBytes(fileStatus.getBlockSize()); | ||
String contentHash = ((UfsFileStatus) status).getContentHash(); | ||
if (contentHash != null) { | ||
infoBuilder.setContentHash(contentHash); | ||
} | ||
|
||
// get cached percentage | ||
String cacheManagerFileId = new AlluxioURI(ufsFullPath).hash(); | ||
final long bytesInCache = cacheUsage | ||
.flatMap(usage -> usage.partitionedBy(file(cacheManagerFileId))) | ||
.map(CacheUsage::used).orElse(0L); | ||
final long fileLength = fileStatus.getContentLength(); | ||
final int cachedPercentage; | ||
if (fileLength > 0) { | ||
cachedPercentage = (int) (bytesInCache * 100 / fileLength); | ||
} else { | ||
cachedPercentage = 100; | ||
} | ||
|
||
infoBuilder.setInAlluxioPercentage(cachedPercentage) | ||
.setInMemoryPercentage(cachedPercentage); | ||
} | ||
return infoBuilder.build(); | ||
} | ||
|
||
/** | ||
* Build FileStatus from UfsStatus and UFS full Path. | ||
* | ||
* @param cacheUsage | ||
* @param ufsType | ||
* @param status the ufs status | ||
* @param ufsFullPath the full ufs path | ||
* @param xattrMap the map of file xAttrs | ||
* @return the file status | ||
*/ | ||
public static DoraMeta.FileStatus buildFileStatusFromUfsStatus( | ||
Optional<CacheUsage> cacheUsage, String ufsType, UfsStatus status, String ufsFullPath, | ||
@Nullable Map<String, String> xattrMap) { | ||
return DoraMeta.FileStatus.newBuilder() | ||
.setFileInfo(buildFileInfoFromUfsStatus(cacheUsage, ufsType, status, ufsFullPath, xattrMap)) | ||
.setTs(System.nanoTime()) | ||
.build(); | ||
} | ||
|
||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -75,6 +75,10 @@ protected void configure() { | |
bind(UfsManager.class).to(DoraUfsManager.class).in(Scopes.SINGLETON); | ||
bind(AlluxioConfiguration.class).toProvider(() -> Configuration.global()); | ||
|
||
// Create FileSystemContext shared across all worker components | ||
FileSystemContext fileSystemContext = FileSystemContext.create(); | ||
bind(FileSystemContext.class).toInstance(fileSystemContext); | ||
Comment on lines
+80
to
+82
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Similar to the change in PagedDoraWorker constructor, I'm just moving some resource creation out of |
||
|
||
// Note that dora can only use Paged Store | ||
try { | ||
CacheManagerOptions cacheManagerOptions = | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -57,7 +57,7 @@ public void before() throws IOException { | |
PagedDoraWorker worker = mock(PagedDoraWorker.class); | ||
CacheManager cacheManager = mock(CacheManager.class); | ||
mDoraUfsManager = mock(DoraUfsManager.class); | ||
mManager = new DoraMetaManager(conf, worker, cacheManager, mDoraUfsManager); | ||
mManager = new DoraMetaManager(conf, cacheManager, mDoraUfsManager); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this line is one of the core change in this PR. The PagedDoraWorker no longer passes its |
||
} | ||
|
||
@After | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
DoraWorker
interface is missing a lot of method definitions so we need to downcast inDoraWorkerClientServiceHandler
after all. So I'm just casting to the real class to avoid the frequent casting.