diff --git a/dora/core/common/src/main/java/alluxio/underfs/UfsManager.java b/dora/core/common/src/main/java/alluxio/underfs/UfsManager.java index e9f0f3b35472..04b7542c2ad2 100644 --- a/dora/core/common/src/main/java/alluxio/underfs/UfsManager.java +++ b/dora/core/common/src/main/java/alluxio/underfs/UfsManager.java @@ -26,6 +26,7 @@ import java.io.Closeable; import java.io.IOException; import java.net.URI; +import java.util.Optional; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Supplier; @@ -89,6 +90,17 @@ public AlluxioURI getUfsMountPointUri() { } } + /** + * Return a UFS instance if it already exists in the cache, otherwise, creates a new instance and + * return it. + * + * @param ufsUri the UFS path + * @param ufsConfSupplier supplier for UFS configuration + * @return the UFS instance + */ + UnderFileSystem getOrAdd(AlluxioURI ufsUri, + Supplier ufsConfSupplier); + /** * Keeps track of a mount id and maps it to its URI in Alluxio and configuration. This is an * Alluxio-only operation and no interaction to UFS will be made. @@ -131,6 +143,14 @@ void addMountWithRecorder(long mountId, AlluxioURI ufsUri, */ UfsClient get(long mountId) throws NotFoundException, UnavailableException; + /** + * Gets an instance for the given UFS URI and configuration, if such exists. + * + * @param ufsUri the URI of the UFS + * @return a UFS instance, or none if not registered + */ + Optional get(AlluxioURI ufsUri); + /** * @return the UFS client associated with root */ diff --git a/dora/core/server/common/src/main/java/alluxio/underfs/AbstractUfsManager.java b/dora/core/server/common/src/main/java/alluxio/underfs/AbstractUfsManager.java index 6f7be5298fab..60235b793f18 100644 --- a/dora/core/server/common/src/main/java/alluxio/underfs/AbstractUfsManager.java +++ b/dora/core/server/common/src/main/java/alluxio/underfs/AbstractUfsManager.java @@ -108,14 +108,7 @@ protected AbstractUfsManager() { mCloser = Closer.create(); } - /** - * Return a UFS instance if it already exists in the cache, otherwise, creates a new instance and - * return it. - * - * @param ufsUri the UFS path - * @param ufsConfSupplier supplier for UFS configuration - * @return the UFS instance - */ + @Override public UnderFileSystem getOrAdd(AlluxioURI ufsUri, Supplier ufsConfSupplier) { Key key = generateKey(ufsUri); @@ -227,12 +220,7 @@ public void removeMount(long mountId) { mMountIdToUfsInfoMap.remove(mountId); } - /** - * Gets an instance for the given UFS URI and configuration, if such exists. - * - * @param ufsUri the URI of the UFS - * @return a UFS instance, or none if not registered - */ + @Override public Optional get(AlluxioURI ufsUri) { Key key = generateKey(ufsUri); return get(key); diff --git a/dora/core/server/worker/src/main/java/alluxio/worker/DataServerFactory.java b/dora/core/server/worker/src/main/java/alluxio/worker/DataServerFactory.java index c0ddb514f29d..6adb1a1a50ba 100644 --- a/dora/core/server/worker/src/main/java/alluxio/worker/DataServerFactory.java +++ b/dora/core/server/worker/src/main/java/alluxio/worker/DataServerFactory.java @@ -20,7 +20,7 @@ import alluxio.underfs.UfsManager; import alluxio.util.io.FileUtils; import alluxio.util.io.PathUtils; -import alluxio.worker.dora.DoraWorker; +import alluxio.worker.dora.PagedDoraWorker; import alluxio.worker.grpc.DoraWorkerClientServiceHandler; import alluxio.worker.grpc.GrpcDataServer; @@ -59,9 +59,9 @@ protected DataServerFactory(UfsManager ufsManager, */ public DataServer createRemoteGrpcDataServer(DataWorker dataWorker) { BlockWorkerGrpc.BlockWorkerImplBase blockWorkerService; - if (dataWorker instanceof DoraWorker) { + if (dataWorker instanceof PagedDoraWorker) { blockWorkerService = - new DoraWorkerClientServiceHandler((DoraWorker) dataWorker); + new DoraWorkerClientServiceHandler((PagedDoraWorker) dataWorker); } else { throw new UnsupportedOperationException(dataWorker.getClass().getCanonicalName() + " is no longer supported in Alluxio 3.x"); @@ -83,9 +83,9 @@ public DataServer createDomainSocketDataServer(DataWorker worker) { } LOG.info("Domain socket data server is enabled at {}.", domainSocketPath); BlockWorkerGrpc.BlockWorkerImplBase blockWorkerService; - if (worker instanceof DoraWorker) { + if (worker instanceof PagedDoraWorker) { blockWorkerService = - new DoraWorkerClientServiceHandler((DoraWorker) worker); + new DoraWorkerClientServiceHandler((PagedDoraWorker) worker); } else { throw new UnsupportedOperationException(worker.getClass().getCanonicalName() + " is no longer supported in Alluxio 3.x"); diff --git a/dora/core/server/worker/src/main/java/alluxio/worker/dora/DoraMetaManager.java b/dora/core/server/worker/src/main/java/alluxio/worker/dora/DoraMetaManager.java index 77787114a8f8..711b2a928255 100644 --- a/dora/core/server/worker/src/main/java/alluxio/worker/dora/DoraMetaManager.java +++ b/dora/core/server/worker/src/main/java/alluxio/worker/dora/DoraMetaManager.java @@ -22,6 +22,7 @@ import alluxio.proto.meta.DoraMeta; import alluxio.proto.meta.DoraMeta.FileStatus; import alluxio.underfs.Fingerprint; +import alluxio.underfs.UfsManager; import alluxio.underfs.UfsStatus; import alluxio.underfs.UnderFileSystem; import alluxio.underfs.UnderFileSystemConfiguration; @@ -33,6 +34,7 @@ import com.github.benmanes.caffeine.cache.Caffeine; import com.google.common.base.Preconditions; import com.google.common.base.Strings; +import com.google.inject.Inject; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -54,8 +56,7 @@ public class DoraMetaManager implements Closeable { private final AlluxioConfiguration mConf; private final DoraMetaStore mMetaStore; private final CacheManager mCacheManager; - private final PagedDoraWorker mDoraWorker; - protected final DoraUfsManager mUfsManager; + protected final UfsManager mUfsManager; private static final Logger SAMPLING_LOG = new SamplingLogger( LoggerFactory.getLogger(DoraMetaManager.class), 1L * Constants.MINUTE_MS); @@ -77,20 +78,19 @@ public class DoraMetaManager implements Closeable { /** * Creates a dora meta manager. * @param conf configuration - * @param doraWorker the dora worker instance * @param cacheManger the cache manager to manage the page cache * @param ufsManager the ufs Manager */ + @Inject public DoraMetaManager(AlluxioConfiguration conf, - PagedDoraWorker doraWorker, CacheManager cacheManger, - DoraUfsManager ufsManager) { + CacheManager cacheManger, + UfsManager ufsManager) { mConf = conf; String dbDir = mConf.getString(PropertyKey.DORA_WORKER_METASTORE_ROCKSDB_DIR); Duration duration = mConf.getDuration(PropertyKey.DORA_WORKER_METASTORE_ROCKSDB_TTL); long ttl = (duration.isNegative() || duration.isZero()) ? -1 : duration.getSeconds(); mMetaStore = new RocksDBDoraMetaStore(dbDir, ttl); mCacheManager = cacheManger; - mDoraWorker = doraWorker; mUfsManager = ufsManager; } @@ -120,7 +120,8 @@ public Optional getFromUfs(String path) throws IOException { if (mXAttrWriteToUFSEnabled) { xattrMap = ufs.getAttributes(path); } - DoraMeta.FileStatus fs = mDoraWorker.buildFileStatusFromUfsStatus(status, path, xattrMap); + DoraMeta.FileStatus fs = PagedDoraWorker.buildFileStatusFromUfsStatus( + mCacheManager.getUsage(), ufs.getUnderFSType(), status, path, xattrMap); return Optional.ofNullable(fs); } catch (FileNotFoundException e) { return Optional.empty(); diff --git a/dora/core/server/worker/src/main/java/alluxio/worker/dora/PagedDoraWorker.java b/dora/core/server/worker/src/main/java/alluxio/worker/dora/PagedDoraWorker.java index 06483eb90c48..56871f9ef036 100644 --- a/dora/core/server/worker/src/main/java/alluxio/worker/dora/PagedDoraWorker.java +++ b/dora/core/server/worker/src/main/java/alluxio/worker/dora/PagedDoraWorker.java @@ -149,18 +149,18 @@ public class PagedDoraWorker extends AbstractWorker implements DoraWorker { // for now Dora Worker does not support Alluxio <-> UFS mapping, // and assumes all UFS paths belong to the same UFS. private static final int MOUNT_POINT = 1; - private final Closer mResourceCloser = Closer.create(); + protected final Closer mResourceCloser = Closer.create(); // TODO(lucy) change to string typed once membership manager got enabled by default private final AtomicReference mWorkerId; protected final CacheManager mCacheManager; - protected final DoraUfsManager mUfsManager; - private final DoraMetaManager mMetaManager; + protected final UfsManager mUfsManager; + protected final DoraMetaManager mMetaManager; private final MembershipManager mMembershipManager; private final UfsInputStreamCache mUfsStreamCache; private final long mPageSize; protected final AlluxioConfiguration mConf; private final BlockMasterClientPool mBlockMasterClientPool; - private FileSystemContext mFsContext; + protected final FileSystemContext mFsContext; private MkdirsOptions mMkdirsRecursive; private MkdirsOptions mMkdirsNonRecursive; @@ -181,6 +181,9 @@ public class PagedDoraWorker extends AbstractWorker implements DoraWorker { * @param cacheManager * @param membershipManager * @param blockMasterClientPool + * @param ufsManager + * @param metaManager + * @param fileSystemContext */ @Inject public PagedDoraWorker( @@ -188,23 +191,15 @@ public PagedDoraWorker( AlluxioConfiguration conf, CacheManager cacheManager, MembershipManager membershipManager, - BlockMasterClientPool blockMasterClientPool - ) { - this(workerId, conf, cacheManager, membershipManager, blockMasterClientPool, - FileSystemContext.create(conf)); - } - - protected PagedDoraWorker( - AtomicReference workerId, - AlluxioConfiguration conf, - CacheManager cacheManager, - MembershipManager membershipManager, BlockMasterClientPool blockMasterClientPool, - FileSystemContext fileSystemContext) { + UfsManager ufsManager, + DoraMetaManager metaManager, + FileSystemContext fileSystemContext + ) { super(ExecutorServiceFactories.fixedThreadPool("dora-worker-executor", 5)); mWorkerId = workerId; mConf = conf; - mUfsManager = mResourceCloser.register(new DoraUfsManager()); + mUfsManager = mResourceCloser.register(ufsManager); String rootUFS = mConf.getString(PropertyKey.DORA_CLIENT_UFS_ROOT); mUfsManager.getOrAdd(new AlluxioURI(rootUFS), () -> UnderFileSystemConfiguration.defaults(mConf)); @@ -213,8 +208,7 @@ protected PagedDoraWorker( mPageSize = mConf.getBytes(PropertyKey.WORKER_PAGE_STORE_PAGE_SIZE); mBlockMasterClientPool = blockMasterClientPool; mCacheManager = cacheManager; - mMetaManager = mResourceCloser.register( - new DoraMetaManager(mConf, this, mCacheManager, mUfsManager)); + mMetaManager = mResourceCloser.register(metaManager); mMembershipManager = membershipManager; mOpenFileHandleContainer = new DoraOpenFileHandleContainer(); mMkdirsRecursive = MkdirsOptions.defaults(mConf).setCreateParent(true); @@ -458,83 +452,6 @@ protected int getCachedPercentage(alluxio.grpc.FileInfo fi, String ufsFullPath) return cachedPercentage; } - /** - * Build FileInfo from UfsStatus and UFS full Path. - * - * @param status - * @param ufsFullPath - * @param xattrMap - * @return a FileInfo - */ - public alluxio.grpc.FileInfo buildFileInfoFromUfsStatus(UfsStatus status, String ufsFullPath, - @Nullable Map xattrMap) - throws IOException { - UnderFileSystem ufs = getUfsInstance(ufsFullPath); - String filename = new AlluxioURI(ufsFullPath).getName(); - - alluxio.grpc.FileInfo.Builder infoBuilder = alluxio.grpc.FileInfo.newBuilder() - .setUfsType(ufs.getUnderFSType()) - .setFileId(ufsFullPath.hashCode()) - .setName(filename) - .setPath(ufsFullPath) - .setUfsPath(ufsFullPath) - .setMode(status.getMode()) - .setFolder(status.isDirectory()) - .setOwner(status.getOwner()) - .setGroup(status.getGroup()) - .setCompleted(true) - .setPersisted(true); - if (xattrMap != null) { - for (Map.Entry entry : xattrMap.entrySet()) { - infoBuilder.putXattr(entry.getKey(), ByteString.copyFromUtf8(entry.getValue())); - } - } - if (status instanceof UfsFileStatus) { - UfsFileStatus fileStatus = (UfsFileStatus) status; - infoBuilder.setLength(fileStatus.getContentLength()) - .setLastModificationTimeMs(status.getLastModifiedTime()) - .setBlockSizeBytes(fileStatus.getBlockSize()); - String contentHash = ((UfsFileStatus) status).getContentHash(); - if (contentHash != null) { - infoBuilder.setContentHash(contentHash); - } - - // get cached percentage - String cacheManagerFileId = new AlluxioURI(ufsFullPath).hash(); - final long bytesInCache = mCacheManager.getUsage() - .flatMap(usage -> usage.partitionedBy(file(cacheManagerFileId))) - .map(CacheUsage::used).orElse(0L); - final long fileLength = fileStatus.getContentLength(); - final int cachedPercentage; - if (fileLength > 0) { - cachedPercentage = (int) (bytesInCache * 100 / fileLength); - } else { - cachedPercentage = 100; - } - - infoBuilder.setInAlluxioPercentage(cachedPercentage) - .setInMemoryPercentage(cachedPercentage); - } - return infoBuilder.build(); - } - - /** - * Build FileStatus from UfsStatus and UFS full Path. - * - * @param status the ufs status - * @param ufsFullPath the full ufs path - * @param xattrMap the map of file xAttrs - * @return the file status - */ - public DoraMeta.FileStatus buildFileStatusFromUfsStatus(UfsStatus status, String ufsFullPath, - @Nullable Map xattrMap) - throws IOException { - return DoraMeta.FileStatus.newBuilder() - .setFileInfo(buildFileInfoFromUfsStatus(status, ufsFullPath, xattrMap)) - .setTs(System.nanoTime()) - .build(); - } - @Override public BlockReader createFileReader(String fileId, long offset, boolean positionShort, Protocol.OpenUfsBlockOptions options) throws IOException, AccessControlException { @@ -728,7 +645,8 @@ private void loadMetadata(UfsStatus status, List errors) { if (mXAttrWriteToUFSEnabled) { xattrMap = ufs.getAttributes(ufsFullPath); } - DoraMeta.FileStatus fs = buildFileStatusFromUfsStatus(status, ufsFullPath, xattrMap); + DoraMeta.FileStatus fs = buildFileStatusFromUfsStatus(getCacheUsage(), ufs.getUnderFSType(), + status, ufsFullPath, xattrMap); mMetaManager.put(ufsFullPath, fs); } catch (Exception e) { LOG.error("Failed to put file status to meta manager", e); @@ -981,7 +899,8 @@ public OpenFileHandle createFile(String path, CreateFilePOptions options) group, createOption.getMode().toShort(), DUMMY_BLOCK_SIZE); - info = buildFileInfoFromUfsStatus(status, path, null); + info = buildFileInfoFromUfsStatus(mCacheManager.getUsage(), + getUfsInstance(path).getUnderFSType(), status, path, null); } catch (IOException e) { throw new RuntimeException(e); } @@ -1202,6 +1121,14 @@ DoraMetaManager getMetaManager() { return mMetaManager; } + /** + * Gets the current cache usage in worker. + * @return cache usage + */ + public Optional getCacheUsage() { + return mCacheManager.getUsage(); + } + protected void checkCopyPermission(String srcPath, String dstPath) throws AccessControlException, IOException { // No-op @@ -1220,4 +1147,84 @@ protected DoraOpenFileHandleContainer getOpenFileHandleContainer() { public WorkerNetAddress getAddress() { return mAddress; } + + /** + * Build FileInfo from UfsStatus and UFS full Path. + * + * @param cacheUsage cache usage + * @param ufsType type of the UFS + * @param status file status + * @param ufsFullPath full UFS path mapping to the file + * @param xattrMap extra attributes + * @return a FileInfo + */ + public static alluxio.grpc.FileInfo buildFileInfoFromUfsStatus( + Optional cacheUsage, String ufsType, UfsStatus status, String ufsFullPath, + @Nullable Map xattrMap) { + String filename = new AlluxioURI(ufsFullPath).getName(); + + alluxio.grpc.FileInfo.Builder infoBuilder = alluxio.grpc.FileInfo.newBuilder() + .setUfsType(ufsType) + .setFileId(ufsFullPath.hashCode()) + .setName(filename) + .setPath(ufsFullPath) + .setUfsPath(ufsFullPath) + .setMode(status.getMode()) + .setFolder(status.isDirectory()) + .setOwner(status.getOwner()) + .setGroup(status.getGroup()) + .setCompleted(true) + .setPersisted(true); + if (xattrMap != null) { + for (Map.Entry entry : xattrMap.entrySet()) { + infoBuilder.putXattr(entry.getKey(), ByteString.copyFromUtf8(entry.getValue())); + } + } + if (status instanceof UfsFileStatus) { + UfsFileStatus fileStatus = (UfsFileStatus) status; + infoBuilder.setLength(fileStatus.getContentLength()) + .setLastModificationTimeMs(status.getLastModifiedTime()) + .setBlockSizeBytes(fileStatus.getBlockSize()); + String contentHash = ((UfsFileStatus) status).getContentHash(); + if (contentHash != null) { + infoBuilder.setContentHash(contentHash); + } + + // get cached percentage + String cacheManagerFileId = new AlluxioURI(ufsFullPath).hash(); + final long bytesInCache = cacheUsage + .flatMap(usage -> usage.partitionedBy(file(cacheManagerFileId))) + .map(CacheUsage::used).orElse(0L); + final long fileLength = fileStatus.getContentLength(); + final int cachedPercentage; + if (fileLength > 0) { + cachedPercentage = (int) (bytesInCache * 100 / fileLength); + } else { + cachedPercentage = 100; + } + + infoBuilder.setInAlluxioPercentage(cachedPercentage) + .setInMemoryPercentage(cachedPercentage); + } + return infoBuilder.build(); + } + + /** + * Build FileStatus from UfsStatus and UFS full Path. + * + * @param cacheUsage cache usage + * @param ufsType type of the UFS + * @param status the ufs status + * @param ufsFullPath the full ufs path + * @param xattrMap the map of file xAttrs + * @return the file status + */ + public static DoraMeta.FileStatus buildFileStatusFromUfsStatus( + Optional cacheUsage, String ufsType, UfsStatus status, String ufsFullPath, + @Nullable Map xattrMap) { + return DoraMeta.FileStatus.newBuilder() + .setFileInfo(buildFileInfoFromUfsStatus(cacheUsage, ufsType, status, ufsFullPath, xattrMap)) + .setTs(System.nanoTime()) + .build(); + } } diff --git a/dora/core/server/worker/src/main/java/alluxio/worker/grpc/DoraWorkerClientServiceHandler.java b/dora/core/server/worker/src/main/java/alluxio/worker/grpc/DoraWorkerClientServiceHandler.java index b3a2a4a7978c..4a5ea8576223 100644 --- a/dora/core/server/worker/src/main/java/alluxio/worker/grpc/DoraWorkerClientServiceHandler.java +++ b/dora/core/server/worker/src/main/java/alluxio/worker/grpc/DoraWorkerClientServiceHandler.java @@ -55,7 +55,6 @@ import alluxio.grpc.TaskStatus; import alluxio.underfs.UfsStatus; import alluxio.util.io.PathUtils; -import alluxio.worker.dora.DoraWorker; import alluxio.worker.dora.OpenFileHandle; import alluxio.worker.dora.PagedDoraWorker; @@ -88,14 +87,14 @@ public class DoraWorkerClientServiceHandler extends BlockWorkerGrpc.BlockWorkerI Configuration.getInt(PropertyKey.MASTER_FILE_SYSTEM_LISTSTATUS_RESULTS_PER_MESSAGE); private final ReadResponseMarshaller mReadResponseMarshaller = new ReadResponseMarshaller(); - private final DoraWorker mWorker; + private final PagedDoraWorker mWorker; /** * Creates a new implementation of gRPC BlockWorker interface. * @param doraWorker the DoraWorker object */ @Inject - public DoraWorkerClientServiceHandler(DoraWorker doraWorker) { + public DoraWorkerClientServiceHandler(PagedDoraWorker doraWorker) { mWorker = requireNonNull(doraWorker); } @@ -233,7 +232,9 @@ public void listStatus(ListStatusPRequest request, // the list status do not include xattr now. GetAttr will cause some additional overhead. // And not every request requires the Xattr. Now only get file xattr in GetStatus. alluxio.grpc.FileInfo fi = - ((PagedDoraWorker) mWorker).buildFileInfoFromUfsStatus(status, ufsFullPath, null); + PagedDoraWorker.buildFileInfoFromUfsStatus(mWorker.getCacheUsage(), + mWorker.getUfsInstance(ufsFullPath).getUnderFSType(), + status, ufsFullPath, null); builder.addFileInfos(fi); if (builder.getFileInfosCount() == LIST_STATUS_BATCH_SIZE) { diff --git a/dora/core/server/worker/src/main/java/alluxio/worker/modules/DoraWorkerModule.java b/dora/core/server/worker/src/main/java/alluxio/worker/modules/DoraWorkerModule.java index 014adc0103c8..fca6e1a53207 100644 --- a/dora/core/server/worker/src/main/java/alluxio/worker/modules/DoraWorkerModule.java +++ b/dora/core/server/worker/src/main/java/alluxio/worker/modules/DoraWorkerModule.java @@ -26,6 +26,7 @@ import alluxio.wire.WorkerIdentity; import alluxio.worker.Worker; import alluxio.worker.block.BlockMasterClientPool; +import alluxio.worker.dora.DoraMetaManager; import alluxio.worker.dora.DoraUfsManager; import alluxio.worker.dora.DoraWorker; import alluxio.worker.dora.PagedDoraWorker; @@ -73,8 +74,13 @@ protected void configure() { .toProvider(BlockMasterClientPool::new) .in(Scopes.SINGLETON); bind(UfsManager.class).to(DoraUfsManager.class).in(Scopes.SINGLETON); + bind(DoraMetaManager.class).in(Scopes.SINGLETON); bind(AlluxioConfiguration.class).toProvider(() -> Configuration.global()); + // Create FileSystemContext shared across all worker components + FileSystemContext fileSystemContext = FileSystemContext.create(); + bind(FileSystemContext.class).toInstance(fileSystemContext); + // Note that dora can only use Paged Store try { CacheManagerOptions cacheManagerOptions = diff --git a/dora/core/server/worker/src/test/java/alluxio/worker/dora/DoraMetaManagerTest.java b/dora/core/server/worker/src/test/java/alluxio/worker/dora/DoraMetaManagerTest.java index 632d485d4967..f2c5ba7f3810 100644 --- a/dora/core/server/worker/src/test/java/alluxio/worker/dora/DoraMetaManagerTest.java +++ b/dora/core/server/worker/src/test/java/alluxio/worker/dora/DoraMetaManagerTest.java @@ -57,7 +57,7 @@ public void before() throws IOException { PagedDoraWorker worker = mock(PagedDoraWorker.class); CacheManager cacheManager = mock(CacheManager.class); mDoraUfsManager = mock(DoraUfsManager.class); - mManager = new DoraMetaManager(conf, worker, cacheManager, mDoraUfsManager); + mManager = new DoraMetaManager(conf, cacheManager, mDoraUfsManager); } @After diff --git a/dora/core/server/worker/src/test/java/alluxio/worker/dora/PagedDoraWorkerTest.java b/dora/core/server/worker/src/test/java/alluxio/worker/dora/PagedDoraWorkerTest.java index 9c6815417299..5e8432272437 100644 --- a/dora/core/server/worker/src/test/java/alluxio/worker/dora/PagedDoraWorkerTest.java +++ b/dora/core/server/worker/src/test/java/alluxio/worker/dora/PagedDoraWorkerTest.java @@ -20,6 +20,7 @@ import alluxio.AlluxioURI; import alluxio.PositionReader; +import alluxio.client.file.FileSystemContext; import alluxio.client.file.cache.CacheManager; import alluxio.client.file.cache.CacheManagerOptions; import alluxio.client.file.cache.PageId; @@ -104,9 +105,13 @@ public void before() throws Exception { CacheManager.Factory.create(Configuration.global(), cacheManagerOptions, pageMetaStore); mMembershipManager = MembershipManager.Factory.create(Configuration.global()); - mWorker = new PagedDoraWorker( - new AtomicReference<>(WorkerIdentity.ParserV0.INSTANCE.fromLong(1L)), - Configuration.global(), mCacheManager, mMembershipManager, new BlockMasterClientPool()); + DoraUfsManager ufsManager = new DoraUfsManager(); + DoraMetaManager metaManager = new DoraMetaManager(Configuration.global(), + mCacheManager, ufsManager); + mWorker = new PagedDoraWorker(new AtomicReference<>( + WorkerIdentity.ParserV0.INSTANCE.fromLong(1L)), + Configuration.global(), mCacheManager, mMembershipManager, + new BlockMasterClientPool(), ufsManager, metaManager, FileSystemContext.create()); } @After diff --git a/dora/core/server/worker/src/test/java/alluxio/worker/grpc/DoraWorkerClientServiceHandlerTest.java b/dora/core/server/worker/src/test/java/alluxio/worker/grpc/DoraWorkerClientServiceHandlerTest.java index 2c109f549200..5fe33c50d45a 100644 --- a/dora/core/server/worker/src/test/java/alluxio/worker/grpc/DoraWorkerClientServiceHandlerTest.java +++ b/dora/core/server/worker/src/test/java/alluxio/worker/grpc/DoraWorkerClientServiceHandlerTest.java @@ -14,6 +14,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertThrows; +import alluxio.client.file.FileSystemContext; import alluxio.client.file.cache.CacheManager; import alluxio.client.file.cache.CacheManagerOptions; import alluxio.client.file.cache.PageMetaStore; @@ -25,6 +26,8 @@ import alluxio.util.io.PathUtils; import alluxio.wire.WorkerIdentity; import alluxio.worker.block.BlockMasterClientPool; +import alluxio.worker.dora.DoraMetaManager; +import alluxio.worker.dora.DoraUfsManager; import alluxio.worker.dora.PagedDoraWorker; import io.grpc.stub.StreamObserver; @@ -67,9 +70,13 @@ public void before() throws Exception { CacheManager.Factory.create(Configuration.global(), cacheManagerOptions, pageMetaStore); mMembershipManager = MembershipManager.Factory.create(Configuration.global()); - mWorker = new PagedDoraWorker( - new AtomicReference<>(WorkerIdentity.ParserV0.INSTANCE.fromLong(1L)), - Configuration.global(), mCacheManager, mMembershipManager, new BlockMasterClientPool()); + DoraUfsManager ufsManager = new DoraUfsManager(); + DoraMetaManager metaManager = new DoraMetaManager(Configuration.global(), + mCacheManager, ufsManager); + mWorker = new PagedDoraWorker(new AtomicReference<>( + WorkerIdentity.ParserV0.INSTANCE.fromLong(1L)), + Configuration.global(), mCacheManager, mMembershipManager, new BlockMasterClientPool(), + ufsManager, metaManager, FileSystemContext.create()); mServiceHandler = new DoraWorkerClientServiceHandler(mWorker); }