Skip to content

Commit

Permalink
Added new cache builder that uses calling context
Browse files Browse the repository at this point in the history
  • Loading branch information
sksamuel committed Oct 8, 2023
1 parent 481db33 commit 94af337
Show file tree
Hide file tree
Showing 10 changed files with 455 additions and 249 deletions.
95 changes: 95 additions & 0 deletions aedile-core/src/main/kotlin/com/sksamuel/aedile/core/Builder.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
package com.sksamuel.aedile.core

import com.github.benmanes.caffeine.cache.AsyncCacheLoader
import com.github.benmanes.caffeine.cache.Caffeine
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.async
import kotlinx.coroutines.future.asCompletableFuture
import java.util.concurrent.CompletableFuture
import java.util.concurrent.Executor

class Builder<K, V>(
private val defaultScope: CoroutineScope,
private val useCallingContext: Boolean,
private val caffeine: Caffeine<Any, Any>,
) {

/**
* Returns a [Cache] which suspends when requesting values.
*
* If the key is not present in the cache, returns null, unless a compute function
* is provided with the key.
*
* If the suspendable computation throws or computes a null value then the
* entry will be automatically removed.
*/
fun build(): Cache<K, V> {
return Cache(defaultScope, useCallingContext, caffeine.buildAsync())
}

/**
* Returns a [Cache] which suspends when requesting values.
*
* If the key does not exist, then the suspendable [compute] function is invoked
* to compute a value, unless a specific compute has been provided with the key.
*
* If the suspendable computation throws or computes a null value then the
* entry will be automatically removed.
*
*/
fun build(compute: suspend (K) -> V): LoadingCache<K, V> {
return LoadingCache(
defaultScope,
useCallingContext,
caffeine.buildAsync { key, _ -> defaultScope.async { compute(key) }.asCompletableFuture() })
}

/**
* Returns a [Cache] which suspends when requesting values.
*
* If the key does not exist, then the suspendable [compute] function is invoked
* to compute a value, unless a specific compute has been provided with the key.
*
* If the suspendable computation throws or computes a null value then the
* entry will be automatically removed.
*
* The [reloadCompute] function is invoked to refresh an entry if refreshAfterWrite
* is enabled or refresh is invoked. See full docs [AsyncCacheLoader.asyncReload].
*
*/
fun build(compute: suspend (K) -> V, reloadCompute: suspend (K, V) -> V): LoadingCache<K, V> {
return LoadingCache(
defaultScope,
useCallingContext,
caffeine.buildAsync(object : AsyncCacheLoader<K, V> {
override fun asyncLoad(key: K, executor: Executor?): CompletableFuture<out V> {
return defaultScope.async { compute(key) }.asCompletableFuture()
}

override fun asyncReload(key: K, oldValue: V, executor: Executor?): CompletableFuture<out V> {
return defaultScope.async { reloadCompute(key, oldValue) }.asCompletableFuture()
}
})
)
}

/**
* Returns a [Cache] which suspends when requesting values.
*
* If a requested key does not exist, then the suspendable [compute] function is invoked
* to compute the required values.
*
* If the suspendable computation throws or computes a null value then the
* entry will be automatically removed.
*
*/
fun buildAll(compute: suspend (Set<K>) -> Map<K, V>): LoadingCache<K, V> {
return LoadingCache(
defaultScope,
useCallingContext,
caffeine.buildAsync(AsyncCacheLoader.bulk { keys, _ ->
defaultScope.async { compute(keys) }.asCompletableFuture()
})
)
}
}
19 changes: 14 additions & 5 deletions aedile-core/src/main/kotlin/com/sksamuel/aedile/core/Cache.kt
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package com.sksamuel.aedile.core

import com.github.benmanes.caffeine.cache.AsyncCache
import com.github.benmanes.caffeine.cache.Caffeine
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Deferred
import kotlinx.coroutines.async
Expand All @@ -10,8 +9,13 @@ import kotlinx.coroutines.future.asDeferred
import kotlinx.coroutines.future.await
import java.util.concurrent.CompletableFuture
import java.util.concurrent.Executor
import kotlin.coroutines.coroutineContext

class Cache<K, V>(private val scope: CoroutineScope, private val cache: AsyncCache<K, V>) {
class Cache<K, V>(
private val defaultScope: CoroutineScope,
private val useCallingContext: Boolean,
private val cache: AsyncCache<K, V>
) {

fun underlying(): AsyncCache<K, V> = cache

Expand Down Expand Up @@ -49,6 +53,7 @@ class Cache<K, V>(private val scope: CoroutineScope, private val cache: AsyncCac
*
*/
suspend fun get(key: K, compute: suspend (K) -> V): V {
val scope = scope()
return cache.get(key) { k, _ -> scope.async { compute(k) }.asCompletableFuture() }.await()
}

Expand Down Expand Up @@ -82,6 +87,7 @@ class Cache<K, V>(private val scope: CoroutineScope, private val cache: AsyncCac
* @param compute the suspendable function that generate the value.
*/
suspend fun put(key: K, compute: suspend () -> V) {
val scope = scope()
cache.put(key, scope.async { compute() }.asCompletableFuture())
}

Expand All @@ -103,9 +109,8 @@ class Cache<K, V>(private val scope: CoroutineScope, private val cache: AsyncCac
}

suspend fun getAll(keys: Collection<K>, compute: suspend (Collection<K>) -> Map<K, V>): Map<K, V> {
return cache.getAll(
keys
) { ks: Set<K>, _: Executor ->
val scope = scope()
return cache.getAll(keys) { ks: Set<K>, _: Executor ->
scope.async { compute(ks) }.asCompletableFuture()
}.await()
}
Expand All @@ -127,4 +132,8 @@ class Cache<K, V>(private val scope: CoroutineScope, private val cache: AsyncCac
fun invalidateAll() {
cache.synchronous().invalidateAll()
}

private suspend fun scope(): CoroutineScope {
return if (useCallingContext) CoroutineScope(coroutineContext) else defaultScope
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package com.sksamuel.aedile.core

import com.github.benmanes.caffeine.cache.Caffeine
import kotlinx.coroutines.CoroutineName
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.future.asCompletableFuture
import kotlinx.coroutines.launch
import kotlin.time.Duration.Companion.nanoseconds
import kotlin.time.toJavaDuration

/**
* Creates a [Builder] which by default uses the coroutine context from the caller for get/getall compute overrides.
* To delegate to a global dispatcher, set the dispatcher in the configuration.
*/
fun <K, V> cacheBuilder(configure: Configuration<K, V>.() -> Unit = {}): Builder<K, V> {

val c = Configuration<K, V>()
c.configure()
val caffeine = Caffeine.newBuilder()

val defaultScope = c.scope ?: CoroutineScope(c.dispatcher + CoroutineName("Aedile-Caffeine-Scope") + SupervisorJob())

c.evictionListener.let { listener ->
caffeine.evictionListener<K, V> { key, value, cause ->
defaultScope.launch {
listener.invoke(key, value, cause)
}
}
}

c.removalListener.let { listener ->
caffeine.removalListener<K, V> { key, value, cause ->
defaultScope.launch {
listener.invoke(key, value, cause)
}
}
}

c.initialCapacity?.let { caffeine.initialCapacity(it) }
c.ticker?.let { caffeine.ticker(it) }

c.maximumSize?.let { caffeine.maximumSize(it) }
c.maximumWeight?.let { caffeine.maximumWeight(it) }
c.weigher?.let { caffeine.weigher(it) }

c.expireAfterWrite?.let { caffeine.expireAfterWrite(it.toJavaDuration()) }
c.expireAfterAccess?.let { caffeine.expireAfterAccess(it.toJavaDuration()) }
c.expireAfter?.let { caffeine.expireAfter(it) }

c.refreshAfterWrite?.let { caffeine.refreshAfterWrite(it.toJavaDuration()) }
c.statsCounter?.let { counter -> caffeine.recordStats { counter } }

if (c.weakKeys == true) caffeine.weakKeys()
if (c.softValues == true) caffeine.softValues()

c.scheduler?.let { scheduler ->
caffeine.scheduler { _, command, delay, unit ->
scheduler.schedule(
{ command.run() },
unit.toNanos(delay).nanoseconds,
).asCompletableFuture()
}
}

return Builder(defaultScope, c.useCallingContext, caffeine)
}
115 changes: 115 additions & 0 deletions aedile-core/src/main/kotlin/com/sksamuel/aedile/core/Configuration.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
package com.sksamuel.aedile.core

import com.github.benmanes.caffeine.cache.Caffeine
import com.github.benmanes.caffeine.cache.Expiry
import com.github.benmanes.caffeine.cache.RemovalCause
import com.github.benmanes.caffeine.cache.stats.StatsCounter
import kotlinx.coroutines.CoroutineDispatcher
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlin.time.Duration

data class Configuration<K, V>(

/**
* Sets the [CoroutineDispatcher] that is used when executing default build functions.
*/
var dispatcher: CoroutineDispatcher = Dispatchers.IO,

/**
* When set to true, the default, compute function overrides (those provided when invoking get/getAll)
* will use the callers coroutine context. When set to false, will use the [dispatcher] value.
*/
var useCallingContext: Boolean = true,

/**
* The [CoroutineScope] that is used to create coroutines for loading functions and listeners.
* If null, one will be created using the specified [dispatcher].
*/
var scope: CoroutineScope? = null,

/**
* See full docs at [Caffeine.refreshAfterWrite].
*/
var refreshAfterWrite: Duration? = null,

/**
* See full docs at [Caffeine.expireAfterAccess].
*/
var expireAfterAccess: Duration? = null,

/**
* See full docs at [Caffeine.expireAfterWrite].
*/
var expireAfterWrite: Duration? = null,

/**
* Specifies that each key (not value) stored in the cache should be wrapped in a WeakReference.
* See full docs at [Caffeine.weakKeys].
*/
var weakKeys: Boolean? = null,

/**
* Specifies that each value (not key) stored in the cache should be wrapped in a SoftReference.
* See full docs at [Caffeine.softValues].
*/
var softValues: Boolean? = null,

/**
* See full docs at [Caffeine.maximumWeight].
*/
var maximumWeight: Long? = null,

/**
* See full docs at [Caffeine.maximumSize].
*/
var maximumSize: Long? = null,

var statsCounter: StatsCounter? = null,

/**
* See full docs at [Caffeine.expireAfter].
*/
var expireAfter: Expiry<K, V>? = null,

/**
* Specifies a nanosecond-precision time source for use in determining when entries
* should be expired or refreshed. By default, System.nanoTime is used.
*
* See full docs at [Caffeine.ticker].
*/
var ticker: (() -> Long)? = null,

/**
* Specifies a listener that is notified each time an entry is evicted.
* See full docs at [Caffeine.evictionListener].
*/
var evictionListener: suspend (K?, V?, RemovalCause) -> Unit = { _, _, _ -> },

/**
* Specifies a listener that is notified each time an entry is removed.
* See full docs at [Caffeine.removalListener].
*/
var removalListener: suspend (K?, V?, RemovalCause) -> Unit = { _, _, _ -> },

/**
* Sets the minimum total size for the internal data structures.
*
* Providing a large enough estimate at construction time avoids the
* need for expensive resizing operations later,
* but setting this value unnecessarily high wastes memory.
*
* See full docs at [Caffeine.initialCapacity].
*/
var initialCapacity: Int? = null,

/**
* Specifies the weigher to use in determining the weight of entries.
* Entry weight is taken into consideration by maximumWeight(long) when determining which entries to evict.
*
* See full docs at [Caffeine.weigher].
*/
var weigher: ((K, V) -> Int)? = null,

var scheduler: Scheduler? = null,
)
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,13 @@ import kotlinx.coroutines.future.asCompletableFuture
import kotlinx.coroutines.future.asDeferred
import kotlinx.coroutines.future.await
import java.util.concurrent.CompletableFuture
import kotlin.coroutines.coroutineContext

class LoadingCache<K, V>(private val scope: CoroutineScope, private val cache: AsyncLoadingCache<K, V>) {
class LoadingCache<K, V>(
private val defaultScope: CoroutineScope,
private val useCallingContext: Boolean,
private val cache: AsyncLoadingCache<K, V>
) {

fun underlying(): AsyncLoadingCache<K, V> = cache

Expand Down Expand Up @@ -63,6 +68,7 @@ class LoadingCache<K, V>(private val scope: CoroutineScope, private val cache: A
* See full docs at [AsyncCache.getAll].
*/
suspend fun getAll(keys: Collection<K>, compute: suspend (Set<K>) -> Map<K, V>): Map<K, V> {
val scope = scope()
return cache.getAll(keys) { k, _ -> scope.async { compute(k.toSet()) }.asCompletableFuture() }.await()
}

Expand All @@ -79,6 +85,7 @@ class LoadingCache<K, V>(private val scope: CoroutineScope, private val cache: A
* If the suspendable computation throws, the entry will be automatically removed from this cache.
*/
suspend fun get(key: K, compute: suspend (K) -> V): V {
val scope = scope()
return cache.get(key) { k, _ -> scope.async { compute(k) }.asCompletableFuture() }.await()
}

Expand All @@ -97,6 +104,7 @@ class LoadingCache<K, V>(private val scope: CoroutineScope, private val cache: A
* @param compute the suspendable function that generate the value.
*/
suspend fun put(key: K, compute: suspend () -> V) {
val scope = scope()
cache.put(key, scope.async { compute() }.asCompletableFuture())
}

Expand Down Expand Up @@ -149,4 +157,8 @@ class LoadingCache<K, V>(private val scope: CoroutineScope, private val cache: A
fun invalidateAll() {
cache.synchronous().invalidateAll()
}

private suspend fun scope(): CoroutineScope {
return if (useCallingContext) CoroutineScope(coroutineContext) else defaultScope
}
}
Loading

0 comments on commit 94af337

Please sign in to comment.