Skip to content

Commit

Permalink
add remote alluxio cache
Browse files Browse the repository at this point in the history
  • Loading branch information
JiamingMai committed Jan 17, 2025
1 parent d949877 commit 71d6bdc
Show file tree
Hide file tree
Showing 10 changed files with 911 additions and 20 deletions.
28 changes: 28 additions & 0 deletions lib/trino-filesystem-cache-alluxio/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,11 @@
<artifactId>trino-filesystem</artifactId>
</dependency>

<dependency>
<groupId>io.trino</groupId>
<artifactId>trino-filesystem-alluxio</artifactId>
</dependency>

<dependency>
<groupId>io.trino</groupId>
<artifactId>trino-spi</artifactId>
Expand Down Expand Up @@ -94,6 +99,17 @@
</exclusions>
</dependency>

<dependency>
<groupId>org.alluxio</groupId>
<artifactId>alluxio-core-transport</artifactId>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-reload4j</artifactId>
</exclusion>
</exclusions>
</dependency>

<dependency>
<groupId>org.weakref</groupId>
<artifactId>jmxutils</artifactId>
Expand Down Expand Up @@ -161,5 +177,17 @@
<artifactId>junit-jupiter-engine</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>junit-jupiter</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>testcontainers</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,9 @@
package io.trino.filesystem.alluxio;

import alluxio.conf.AlluxioConfiguration;
import alluxio.conf.AlluxioProperties;
import alluxio.conf.Configuration;
import alluxio.conf.InstancedConfiguration;
import alluxio.conf.Source;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import io.airlift.units.DataSize;
Expand Down Expand Up @@ -58,17 +59,17 @@ public static AlluxioConfiguration create(AlluxioFileSystemCacheConfig config)
.map(directory -> totalSpace(Path.of(directory))).collect(toImmutableList()))
: config.getMaxCacheSizes();

AlluxioProperties alluxioProperties = new AlluxioProperties();
alluxioProperties.set(USER_CLIENT_CACHE_ENABLED, true);
alluxioProperties.set(USER_CLIENT_CACHE_DIRS, join(",", config.getCacheDirectories()));
alluxioProperties.set(USER_CLIENT_CACHE_SIZE, join(",", maxCacheSizes.stream().map(DataSize::toBytesValueString).toList()));
alluxioProperties.set(USER_CLIENT_CACHE_PAGE_SIZE, config.getCachePageSize().toBytesValueString());
InstancedConfiguration alluxioConf = Configuration.copyGlobal();
alluxioConf.set(USER_CLIENT_CACHE_ENABLED, true, Source.RUNTIME);
alluxioConf.set(USER_CLIENT_CACHE_DIRS, join(",", config.getCacheDirectories()), Source.RUNTIME);
alluxioConf.set(USER_CLIENT_CACHE_SIZE, join(",", maxCacheSizes.stream().map(DataSize::toBytesValueString).toList()), Source.RUNTIME);
alluxioConf.set(USER_CLIENT_CACHE_PAGE_SIZE, config.getCachePageSize().toBytesValueString(), Source.RUNTIME);
Optional<Duration> ttl = config.getCacheTTL();
if (ttl.isPresent()) {
alluxioProperties.set(USER_CLIENT_CACHE_TTL_THRESHOLD_SECONDS, ttl.orElseThrow().roundTo(TimeUnit.SECONDS));
alluxioProperties.set(USER_CLIENT_CACHE_TTL_ENABLED, true);
alluxioConf.set(USER_CLIENT_CACHE_TTL_THRESHOLD_SECONDS, ttl.orElseThrow().roundTo(TimeUnit.SECONDS));
alluxioConf.set(USER_CLIENT_CACHE_TTL_ENABLED, true);
}
return new InstancedConfiguration(alluxioProperties);
return alluxioConf;
}

private static void canWrite(Path path)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.filesystem.alluxio;

import alluxio.AlluxioURI;
import alluxio.client.file.FileInStream;
import alluxio.client.file.FileSystem;
import alluxio.client.file.FileSystemContext;
import alluxio.client.file.URIStatus;
import alluxio.client.file.cache.CacheManager;
import alluxio.conf.AlluxioConfiguration;
import alluxio.exception.AlluxioException;
import alluxio.grpc.OpenFilePOptions;
import alluxio.wire.FileInfo;
import com.google.common.annotations.VisibleForTesting;
import com.google.inject.Inject;
import io.airlift.log.Logger;
import io.airlift.units.DataSize;
import io.opentelemetry.api.trace.Tracer;
import io.trino.filesystem.Location;
import io.trino.filesystem.TrinoInput;
import io.trino.filesystem.TrinoInputFile;
import io.trino.filesystem.TrinoInputStream;
import io.trino.filesystem.cache.TrinoFileSystemCache;
import jakarta.annotation.PreDestroy;

import java.io.IOException;
import java.util.Collection;

import static java.util.Objects.requireNonNull;

public class RemoteAlluxioFileSystemCache
implements TrinoFileSystemCache
{
private static final Logger log = Logger.get(TrinoFileSystemCache.class);
private final Tracer tracer;
private final FileSystem fileSystem;
private final AlluxioConfiguration config;
private final AlluxioCacheStats statistics;
private final CacheManager cacheManager;
private final DataSize pageSize;

@Inject
public RemoteAlluxioFileSystemCache(Tracer tracer, AlluxioFileSystemCacheConfig config, AlluxioCacheStats statistics)
throws IOException
{
this.tracer = requireNonNull(tracer, "tracer is null");
this.config = AlluxioConfigurationFactory.create(requireNonNull(config, "config is null"));
this.pageSize = config.getCachePageSize();
this.cacheManager = CacheManager.Factory.create(this.config);
FileSystemContext fsContext = FileSystemContext.create(this.config);
this.fileSystem = FileSystem.Factory.create(fsContext);
this.statistics = requireNonNull(statistics, "statistics is null");
}

@Override
public TrinoInput cacheInput(TrinoInputFile delegate, String key)
throws IOException
{
return new RemoteAlluxioInput(tracer, delegate, key, uriStatus(delegate), new TracingCacheManager(tracer, key, pageSize, cacheManager), config, statistics, openFile(delegate));
}

@Override
public TrinoInputStream cacheStream(TrinoInputFile delegate, String key)
throws IOException
{
return new RemoteAlluxioInputStream(tracer, delegate, key, uriStatus(delegate), new TracingCacheManager(tracer, key, pageSize, cacheManager), config, statistics, openFile(delegate));
}

@Override
public long cacheLength(TrinoInputFile delegate, String key)
throws IOException
{
return delegate.length();
}

@Override
public void expire(Location source)
throws IOException
{
try {
AlluxioURI sourceUri = new AlluxioURI(source.toString());
if (fileSystem.exists(sourceUri) && !sourceUri.isRoot()) {
fileSystem.free(sourceUri);
}
}
catch (AlluxioException e) {
log.error(e, "Failed to free the file %s", source);
}
}

@Override
public void expire(Collection<Location> locations)
throws IOException
{
for (Location location : locations) {
try {
AlluxioURI locationUri = new AlluxioURI(location.toString());
if (fileSystem.exists(locationUri) && !locationUri.isRoot()) {
fileSystem.free(locationUri);
}
}
catch (AlluxioException e) {
log.error(e, "Failed to free the file %s", location);
}
}
}

@PreDestroy
public void shutdown()
throws Exception
{
cacheManager.close();
fileSystem.close();
}

@VisibleForTesting
protected URIStatus uriStatus(TrinoInputFile file)
throws IOException
{
FileInfo info = new FileInfo()
.setPath(file.location().toString())
.setLength(file.length());
return new URIStatus(info);
}

private FileInStream openFile(TrinoInputFile file)
throws IOException
{
try {
return fileSystem.openFile(new AlluxioURI(file.location().toString()), OpenFilePOptions.getDefaultInstance());
}
catch (AlluxioException e) {
throw new IOException("fail to open remote cache file", e);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.filesystem.alluxio;

import alluxio.metrics.MetricsConfig;
import alluxio.metrics.MetricsSystem;
import com.google.inject.Binder;
import io.airlift.configuration.AbstractConfigurationAwareModule;
import io.trino.filesystem.cache.CachingHostAddressProvider;
import io.trino.filesystem.cache.ConsistentHashingHostAddressProvider;
import io.trino.filesystem.cache.ConsistentHashingHostAddressProviderConfig;
import io.trino.filesystem.cache.TrinoFileSystemCache;

import java.util.Properties;

import static com.google.inject.Scopes.SINGLETON;
import static com.google.inject.multibindings.OptionalBinder.newOptionalBinder;
import static io.airlift.configuration.ConfigBinder.configBinder;
import static org.weakref.jmx.guice.ExportBinder.newExporter;

public class RemoteAlluxioFileSystemCacheModule
extends AbstractConfigurationAwareModule
{
private final boolean isCoordinator;

public RemoteAlluxioFileSystemCacheModule(boolean isCoordinator)
{
this.isCoordinator = isCoordinator;
}

@Override
protected void setup(Binder binder)
{
configBinder(binder).bindConfig(AlluxioFileSystemCacheConfig.class);
configBinder(binder).bindConfig(ConsistentHashingHostAddressProviderConfig.class);
binder.bind(AlluxioCacheStats.class).in(SINGLETON);
newExporter(binder).export(AlluxioCacheStats.class).as(generator -> generator.generatedNameOf(AlluxioCacheStats.class));

if (isCoordinator) {
newOptionalBinder(binder, CachingHostAddressProvider.class).setBinding().to(ConsistentHashingHostAddressProvider.class).in(SINGLETON);
}
binder.bind(TrinoFileSystemCache.class).to(RemoteAlluxioFileSystemCache.class).in(SINGLETON);

Properties metricProps = new Properties();
metricProps.put("sink.jmx.class", "alluxio.metrics.sink.JmxSink");
metricProps.put("sink.jmx.domain", "org.alluxio");
MetricsSystem.startSinksFromConfig(new MetricsConfig(metricProps));
}
}
Loading

0 comments on commit 71d6bdc

Please sign in to comment.