feat(GCS): Update CustomTime metadata field at cache entry hits #2337

Open · wants to merge 2 commits into base: `main`
10 changes: 10 additions & 0 deletions docs/Gcs.md
@@ -65,6 +65,16 @@ export SCCACHE_GCS_KEY_PATH=secret-gcp-storage.json
Cache location GCS, bucket: Bucket(name=<bucket name in GCP>), key_prefix: (none)
```

## Lifecycle management

Sccache updates the `CustomTime` metadata field of a cache object every time
there is a cache hit on it.
This can be used to implement automatic cleanup in GCS through the
["Custom time before"](https://cloud.google.com/storage/docs/lifecycle#customtimebefore)
or ["Days since custom time"](https://cloud.google.com/storage/docs/lifecycle#dayssincecustomtime)
lifecycle conditions on the bucket, in order to remove cache entries that
have not been actively used for a certain amount of time.
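
For example, a bucket lifecycle configuration along the following lines (a
sketch; the 30-day retention period is an assumption, adjust it to your needs)
deletes objects whose `CustomTime` has not been refreshed for 30 days:

```json
{
  "rule": [
    {
      "action": {"type": "Delete"},
      "condition": {"daysSinceCustomTime": 30}
    }
  ]
}
```

Such a configuration can be applied to the bucket with, e.g.,
`gcloud storage buckets update gs://<bucket name in GCP> --lifecycle-file=lifecycle.json`.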

## Deprecation

`SCCACHE_GCS_OAUTH_URL` has been deprecated and is no longer supported; please use `SCCACHE_GCS_SERVICE_ACCOUNT` instead.
179 changes: 172 additions & 7 deletions src/cache/cache.rs
@@ -16,7 +16,7 @@
use crate::cache::azure::AzureBlobCache;
use crate::cache::disk::DiskCache;
#[cfg(feature = "gcs")]
-use crate::cache::gcs::GCSCache;
+use crate::cache::gcs;
#[cfg(feature = "gha")]
use crate::cache::gha::GHACache;
#[cfg(feature = "memcached")]
@@ -51,7 +51,9 @@ use std::io::{self, Cursor, Read, Seek, Write};
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::time::Duration;
use std::time::Instant;
use tempfile::NamedTempFile;
use tokio::sync::RwLock as TokioRwLock;
use zip::write::FileOptions;
use zip::{CompressionMethod, ZipArchive, ZipWriter};

@@ -355,7 +357,8 @@ pub trait Storage: Send + Sync {
///
/// - `Ok(CacheMode::ReadOnly)` means cache can only be used to `get`
/// cache.
-/// - `Ok(CacheMode::ReadWrite)` means cache can do both `get` and `put`.
+/// - `Ok(CacheMode::ReadWrite)` means cache supports the `get`, `put`,
+///   and `timestamp_cache_hit` methods.
/// - `Err(err)` means cache is not setup correctly or not match with
/// users input (for example, user try to use `ReadWrite` but cache
/// is `ReadOnly`).
@@ -367,6 +370,21 @@
Ok(CacheMode::ReadWrite)
}

/// Stamp the custom "access time" or "custom time" record for an entry in
/// the cache, if present.
///
/// It is not always possible or practical to query this information
/// within sccache itself.
///
/// Returns a `Future` that will provide the result or error when the stamp
/// request finishes. If the operation is supported and completes
/// successfully, the result is `Ok(Some(Duration))`. If the operation can
/// not be performed for configuration reasons, `Ok(None)` is returned. If
/// the operation was expected to succeed but an error occurred, `Err` is
/// returned.
async fn timestamp_cache_hit(&self, key: &str) -> Result<Option<Duration>>;

/// Get the storage location.
fn location(&self) -> String;

@@ -402,6 +420,32 @@ pub trait Storage: Send + Sync {
}
}

/// An interface for updating the last-use (`atime`-like) timestamp of cache entries.
#[async_trait]
pub trait TimestampUpdater: Send + Sync {
/// Returns whether the current implementation can update the timestamp.
/// This might be `false` for configuration reasons, or due to missing
/// permissions.
fn can_update(&self) -> bool {
true
}

/// Returns whether the `TimestampUpdater` needs (re-)initialization.
/// `true` indicates that reinitialization is required, or that it can not
/// be determined whether it is required; `false` is returned only if it is
/// deterministically known that reinitialization can be skipped.
async fn needs_init(&self) -> Result<bool>;

/// (Re-)initializes the timestamp updater's runtime data, such as
/// authentication tokens.
async fn init(&mut self) -> Result<()>;

/// Updates the last-use timestamp (if applicable) of the cache entry
/// identified by `key` to the current time.
async fn update(&self, key: &str) -> Result<()>;
}
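
To illustrate the contract, a minimal hypothetical implementation (a sketch,
not part of this change) could be a stateless updater that never needs
initialization and only logs what a real updater would stamp:

```rust
/// Hypothetical example only: a no-op updater for illustration.
struct NoopTimestampUpdater;

#[async_trait]
impl TimestampUpdater for NoopTimestampUpdater {
    // `can_update` keeps its default implementation and returns `true`.

    async fn needs_init(&self) -> Result<bool> {
        // Stateless, so initialization can always be skipped.
        Ok(false)
    }

    async fn init(&mut self) -> Result<()> {
        Ok(())
    }

    async fn update(&self, key: &str) -> Result<()> {
        // A real implementation would stamp the entry's metadata here.
        debug!("would update the last-use timestamp of {key}");
        Ok(())
    }
}
```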

/// Configuration switches for preprocessor cache mode.
#[derive(Debug, Copy, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(deny_unknown_fields)]
@@ -453,10 +497,9 @@ impl PreprocessorCacheModeConfig {
}
}

-/// Implement storage for operator.
+/// Implement `Storage` for `opendal::Operator`.
#[cfg(any(
feature = "azure",
feature = "gcs",
feature = "gha",
feature = "memcached",
feature = "redis",
@@ -480,7 +523,7 @@ impl Storage for opendal::Operator {
}

async fn put(&self, key: &str, entry: CacheWrite) -> Result<Duration> {
-let start = std::time::Instant::now();
+let start = Instant::now();

self.write(&normalize_key(key), entry.finish()?).await?;

@@ -533,6 +576,29 @@ impl Storage for opendal::Operator {
Ok(mode)
}

#[cfg(any(
feature = "azure",
feature = "gha",
feature = "memcached",
feature = "redis",
feature = "s3",
feature = "webdav",
))]
async fn timestamp_cache_hit(&self, _key: &str) -> Result<Option<Duration>> {
let scheme = self.info().scheme();
match scheme {
#[allow(unreachable_patterns)]
// When sccache is built with `cargo build --no-default-features`, only
// the local disk cache is supported and there is no remote storage, so
// there are no schemes to match here. Lack of `timestamp_cache_hit`
// support is also not a problem for providers that are not implemented
// in this match.
_ => {
debug!("timestamp_cache_hit is not supported for {scheme}");
Err(anyhow!("Not implemented."))
}
}
}

fn location(&self) -> String {
let meta = self.info();
format!(
@@ -552,6 +618,92 @@ impl Storage for opendal::Operator {
}
}

/// Wrapper object for `Storage` implementations where a `TimestampUpdater`
/// implementation is also available.
#[derive(Debug)]
pub struct TimestampUpdatingStorage<S: Storage, U: TimestampUpdater> {
pub storage: S,
pub updater: Arc<TokioRwLock<U>>,
}

/// Implement `Storage` for an `opendal::Operator` that also retains a
/// `TimestampUpdater`.
///
/// This implementation normally delegates to the underlying `Storage`
/// trait methods.
#[cfg(feature = "gcs")]
#[async_trait]
impl<U: TimestampUpdater> Storage for TimestampUpdatingStorage<opendal::Operator, U> {
async fn get(&self, key: &str) -> Result<Cache> {
self.storage.get(key).await
}

async fn put(&self, key: &str, entry: CacheWrite) -> Result<Duration> {
self.storage.put(key, entry).await
}

async fn check(&self) -> Result<CacheMode> {
let as_storage: &dyn Storage = &self.storage;
as_storage.check().await
}

#[cfg(feature = "gcs")]
async fn timestamp_cache_hit(&self, key: &str) -> Result<Option<Duration>> {
let scheme = self.storage.info().scheme();
match scheme {
#[cfg(feature = "gcs")]
opendal::Scheme::Gcs => {
let start = Instant::now();
{
// Try to run the update without reinitialization if possible.
// This saves us from taking the exclusive write lock and speeds
// up overall performance when every caller can work with a
// shared read lock on `updater`.
let updater = self.updater.read().await;
if !updater.can_update() {
// The inability to update the cache, if it stems from a known
// and verifiable property, is not an error.
return Ok(None);
}
if !updater.needs_init().await? {
updater.update(key).await?;
return Ok(Some(start.elapsed()));
}
}

{
let mut updater = self.updater.write().await;
updater.init().await?;
updater.update(key).await?;

Ok(Some(start.elapsed()))
}
}
#[allow(unreachable_patterns)]
_ => {
self.storage.timestamp_cache_hit(key).await
}
}
}

fn location(&self) -> String {
self.storage.location()
}

async fn current_size(&self) -> Result<Option<u64>> {
self.storage.current_size().await
}

async fn max_size(&self) -> Result<Option<u64>> {
self.storage.max_size().await
}
}
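
For context, a hypothetical call site (not part of this diff) would consume
the three possible outcomes of `timestamp_cache_hit` like this:

```rust
// Sketch only: assumes `storage: &dyn Storage` and a cache hit on `key: &str`.
match storage.timestamp_cache_hit(key).await {
    // The stamp was updated; `elapsed` is how long the request took.
    Ok(Some(elapsed)) => debug!("updated CustomTime for {key} in {elapsed:?}"),
    // Updating is unsupported or disabled by configuration; not an error.
    Ok(None) => {}
    // The update was attempted but failed.
    Err(e) => warn!("failed to update CustomTime for {key}: {e}"),
}
```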


/// Normalize key `abcdef` into `a/b/c/abcdef`
pub(in crate::cache) fn normalize_key(key: &str) -> String {
format!("{}/{}/{}/{}", &key[0..1], &key[1..2], &key[2..3], &key)
@@ -587,7 +739,7 @@ pub fn storage_from_config(
}) => {
debug!("Init gcs cache with bucket {bucket}, key_prefix {key_prefix}");

-let storage = GCSCache::build(
+let storage = gcs::GCSCache::build(
bucket,
key_prefix,
cred_path.as_deref(),
@@ -596,8 +748,21 @@
credential_url.as_deref(),
)
.map_err(|err| anyhow!("create gcs cache failed: {err:?}"))?;
let updater = gcs::GCSCustomTimeUpdater::new(
bucket,
key_prefix,
cred_path.as_deref(),
service_account.as_deref(),
(*rw_mode).into(),
credential_url.as_deref(),
);

-return Ok(Arc::new(storage));
+let storage_with_updater = TimestampUpdatingStorage {
+storage,
+updater: Arc::new(TokioRwLock::new(updater)),
+};
+
+return Ok(Arc::new(storage_with_updater));
}
#[cfg(feature = "gha")]
CacheType::GHA(config::GHACacheConfig { ref version, .. }) => {
5 changes: 5 additions & 0 deletions src/cache/disk.rs
@@ -168,6 +168,11 @@ impl Storage for DiskCache {
Ok(self.rw_mode)
}

async fn timestamp_cache_hit(&self, _key: &str) -> Result<Option<Duration>> {
// Not supported.
Ok(None)
}

fn location(&self) -> String {
format!("Local disk: {:?}", self.lru.lock().unwrap().path())
}