diff --git a/projects/concurrency/atomics/main.go b/projects/concurrency/atomics/main.go new file mode 100644 index 000000000..2b6e144a9 --- /dev/null +++ b/projects/concurrency/atomics/main.go @@ -0,0 +1,24 @@ +package main + +import ( + "fmt" + "sync" + "sync/atomic" +) + +var x atomic.Int32 + +func increment(wg *sync.WaitGroup) { + x.Add(1) + wg.Done() +} + +func main() { + var w sync.WaitGroup + for i := 0; i < 1000; i++ { + w.Add(1) + go increment(&w) + } + w.Wait() + fmt.Println("final value of x", x.Load()) +} diff --git a/projects/concurrency/lru_cache_coarse_grained_generations/cache.go b/projects/concurrency/lru_cache_coarse_grained_generations/cache.go new file mode 100644 index 000000000..c03c2bb21 --- /dev/null +++ b/projects/concurrency/lru_cache_coarse_grained_generations/cache.go @@ -0,0 +1,88 @@ +package cache + +import ( + "sync" + "sync/atomic" + "time" +) + +func NewCache[K comparable, V any](targetSize uint64, garbageCollectionInterval time.Duration) *Cache[K, V] { + cache := &Cache[K, V]{ + targetSize: targetSize, + values: make(map[K]*valueAndGeneration[V], targetSize), + } + + go func() { + ticker := time.Tick(garbageCollectionInterval) + for range ticker { + currentGeneration := cache.currentGeneration.Load() + cache.currentGeneration.Add(1) + + // Accumulate a keysToDelete slice so that we can collect the keys to delete under a read lock rather than holding a write lock for the entire GC cycle. + // This will use extra memory, and has a disadvantage that we may bump a generation from a Get but then still evict that value because we already decided to GC it. + var keysToDelete []K + cache.mu.RLock() + // If we have free space, don't garbage collect at all. This will probably lead to very spiky evictions. + if uint64(len(cache.values)) <= targetSize { + cache.mu.RUnlock() + continue + } + for k, v := range cache.values { + // This is a _very_ coarse-grained eviction policy. As soon as our cache becomes full, we may evict lots of entries. + // It may be more useful to treat different values of generation differently, e.g. always evict if v.generation < currentGeneration - 5, and only evict more recent entries if that didn't free up any space. + if v.generation.Load() != currentGeneration { + keysToDelete = append(keysToDelete, k) + } + } + cache.mu.RUnlock() + if len(keysToDelete) > 0 { + cache.mu.Lock() + for _, keyToDelete := range keysToDelete { + delete(cache.values, keyToDelete) + } + cache.mu.Unlock() + } + } + }() + + return cache +} + +// type Cache implements a roughly-LRU cache. It attempts to keep to a maximum of targetSize, but may contain more entries at points in time. +// When under size pressure, it garbage collects entries which haven't been read or written, with no strict eviction ordering guarantees. +type Cache[K comparable, V any] struct { + targetSize uint64 + + mu sync.RWMutex + // Every time we Get/Put a value, we store which generation it was last accessed. + // We have a garbage collection goroutine which will delete entries that haven't been recently accessed, if the cache is full. 
+ currentGeneration atomic.Uint64 + values map[K]*valueAndGeneration[V] +} + +type valueAndGeneration[V any] struct { + value V + generation atomic.Uint64 +} + +func (c *Cache[K, V]) Put(key K, value V) bool { + c.mu.Lock() + defer c.mu.Unlock() + valueWrapper := &valueAndGeneration[V]{ + value: value, + } + valueWrapper.generation.Store(c.currentGeneration.Load()) + c.values[key] = valueWrapper + return false +} + +func (c *Cache[K, V]) Get(key K) (*V, bool) { + c.mu.RLock() + defer c.mu.RUnlock() + valueWrapper, ok := c.values[key] + if !ok { + return nil, false + } + valueWrapper.generation.Store(c.currentGeneration.Load()) + return &valueWrapper.value, true +} diff --git a/projects/concurrency/lru_cache_coarse_grained_generations/cache_test.go b/projects/concurrency/lru_cache_coarse_grained_generations/cache_test.go new file mode 100644 index 000000000..0a8b749d5 --- /dev/null +++ b/projects/concurrency/lru_cache_coarse_grained_generations/cache_test.go @@ -0,0 +1,51 @@ +package cache + +import ( + "fmt" + "testing" + "time" + + "github.com/stretchr/testify/require" +) + +func TestPutThenGet(t *testing.T) { + cache := NewCache[string, string](10, 1*time.Millisecond) + previouslyExisted := cache.Put("greeting", "hello") + require.False(t, previouslyExisted) + + time.Sleep(3 * time.Millisecond) + + value, present := cache.Get("greeting") + require.True(t, present) + require.Equal(t, "hello", *value) +} + +func TestGetMissing(t *testing.T) { + cache := NewCache[string, string](1, 1*time.Millisecond) + value, present := cache.Get("greeting") + require.False(t, present) + require.Nil(t, value) +} + +func TestEviction_JustWrites(t *testing.T) { + cache := NewCache[string, string](10, 1*time.Millisecond) + + for i := 0; i < 10; i++ { + cache.Put(fmt.Sprintf("entry-%d", i), "hello") + } + + time.Sleep(3 * time.Millisecond) + + _, present0 := cache.Get("entry-0") + require.True(t, present0) + + _, present10 := cache.Get("entry-9") + require.True(t, present10) + + cache.Put("entry-10", "hello") + + time.Sleep(3 * time.Millisecond) + + _, present1 := cache.Get("entry-1") + require.False(t, present1) +} diff --git a/projects/concurrency/lru_cache_coarse_grained_generations/go.mod b/projects/concurrency/lru_cache_coarse_grained_generations/go.mod new file mode 100644 index 000000000..6151042f7 --- /dev/null +++ b/projects/concurrency/lru_cache_coarse_grained_generations/go.mod @@ -0,0 +1,11 @@ +module github.com/CodeYourFuture/immersive-go-course/projects/concurrency/lru_cache_coarse_grained_generations + +go 1.21.5 + +require github.com/stretchr/testify v1.8.4 + +require ( + github.com/davecgh/go-spew v1.1.1 // indirect + github.com/pmezard/go-difflib v1.0.0 // indirect + gopkg.in/yaml.v3 v3.0.1 // indirect +) diff --git a/projects/concurrency/lru_cache_coarse_grained_generations/go.sum b/projects/concurrency/lru_cache_coarse_grained_generations/go.sum new file mode 100644 index 000000000..fa4b6e682 --- /dev/null +++ b/projects/concurrency/lru_cache_coarse_grained_generations/go.sum @@ -0,0 +1,10 @@ +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= +github.com/stretchr/testify v1.8.4/go.mod 
h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
+gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
+gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
+gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
+gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
diff --git a/projects/concurrency/lru_cache_computing/cache.go b/projects/concurrency/lru_cache_computing/cache.go
new file mode 100644
index 000000000..a5b8a0fef
--- /dev/null
+++ b/projects/concurrency/lru_cache_computing/cache.go
@@ -0,0 +1,131 @@
+package cache
+
+import (
+	"container/list"
+	"sync"
+)
+
+// entryLimit and concurrentComputeLimit must both be non-zero.
+// computer must never panic.
+func NewCache[K comparable, V any](entryLimit uint64, concurrentComputeLimit uint64, computer func(K) V) *Cache[K, V] {
+	computeChannel := make(chan K, concurrentComputeLimit)
+
+	resultChannel := make(chan keyValuePair[K, V], concurrentComputeLimit)
+
+	for i := 0; i < int(concurrentComputeLimit); i++ {
+		go func() {
+			for key := range computeChannel {
+				value := computer(key)
+				resultChannel <- keyValuePair[K, V]{
+					key:   key,
+					value: &value,
+				}
+			}
+		}()
+	}
+
+	cache := &Cache[K, V]{
+		entryLimit:     entryLimit,
+		computeChannel: computeChannel,
+
+		computedEntries: make(map[K]cacheEntry[K, V], entryLimit),
+		pendingEntries:  make(map[K]*channelList[K, V]),
+		evictionList:    list.New(),
+	}
+
+	go func() {
+		for result := range resultChannel {
+			cache.mu.Lock()
+			pendingEntry := cache.pendingEntries[result.key]
+			delete(cache.pendingEntries, result.key)
+
+			if len(cache.computedEntries) == int(cache.entryLimit) {
+				keyToEvict := cache.evictionList.Remove(cache.evictionList.Back()).(K)
+				delete(cache.computedEntries, keyToEvict)
+			}
+
+			evictionListPointer := cache.evictionList.PushFront(result.key)
+
+			cache.computedEntries[result.key] = cacheEntry[K, V]{
+				evictionListPointer: evictionListPointer,
+				value:               *result.value,
+			}
+			pendingEntry.mu.Lock()
+			pendingEntry.value = result.value
+			cache.mu.Unlock()
+			for _, ch := range pendingEntry.channels {
+				ch <- result
+			}
+			pendingEntry.mu.Unlock()
+		}
+	}()
+
+	return cache
+}
+
+type cacheEntry[K any, V any] struct {
+	evictionListPointer *list.Element
+	value               V
+}
+
+type keyValuePair[K any, V any] struct {
+	key   K
+	value *V
+}
+
+type channelList[K any, V any] struct {
+	mu       sync.Mutex
+	channels []chan (keyValuePair[K, V])
+	value    *V
+}
+
+type Cache[K comparable, V any] struct {
+	entryLimit uint64
+
+	computeChannel chan K
+
+	mu              sync.Mutex
+	computedEntries map[K]cacheEntry[K, V]
+	pendingEntries  map[K]*channelList[K, V]
+	// Front is most recently used, back is least recently used
+	evictionList *list.List
+}
+
+func (c *Cache[K, V]) Get(key K) (V, bool) {
+	c.mu.Lock()
+	computedEntry, isComputed := c.computedEntries[key]
+	pendingEntry, isPending := c.pendingEntries[key]
+	if isComputed {
+		c.evictionList.MoveToFront(computedEntry.evictionListPointer)
+		c.mu.Unlock()
+		return computedEntry.value, true
+	}
+	if !isPending {
+		pendingEntry = &channelList[K, V]{}
+		c.pendingEntries[key] = pendingEntry
+	}
+	c.mu.Unlock()
+	if !isPending {
+		c.computeChannel <- key
+	}
+
+	pendingEntry.mu.Lock()
+	// Maybe the value was computed but hasn't been transferred from pending to computed yet
+	if pendingEntry.value != nil {
+		pendingEntry.mu.Unlock()
+		return *pendingEntry.value, isPending
+	}
+	channel := make(chan keyValuePair[K, V], 1)
+	pendingEntry.channels = append(pendingEntry.channels, channel)
+	pendingEntry.mu.Unlock()
+	value := <-channel
+	return *value.value, isPending
+}
+
+// Only exists for testing. Doesn't count as a usage for LRU purposes.
+func (c *Cache[K, V]) has(key K) bool {
+	c.mu.Lock()
+	defer c.mu.Unlock()
+	_, ok := c.computedEntries[key]
+	return ok
+}
diff --git a/projects/concurrency/lru_cache_computing/cache_test.go b/projects/concurrency/lru_cache_computing/cache_test.go
new file mode 100644
index 000000000..6b35f112a
--- /dev/null
+++ b/projects/concurrency/lru_cache_computing/cache_test.go
@@ -0,0 +1,121 @@
+package cache
+
+import (
+	"sync"
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/require"
+)
+
+func TestGetTwice(t *testing.T) {
+	cache := NewCache[string, string](10, 1, func(string) string {
+		time.Sleep(10 * time.Millisecond)
+		return "hello"
+	})
+
+	timeBefore := time.Now()
+
+	value0, previouslyExisted0 := cache.Get("greeting")
+	require.False(t, previouslyExisted0)
+	require.Equal(t, "hello", value0)
+
+	value1, previouslyExisted1 := cache.Get("greeting")
+	require.True(t, previouslyExisted1)
+	require.Equal(t, "hello", value1)
+
+	elapsedTime := time.Since(timeBefore)
+
+	// Should have only been computed once
+	require.Less(t, elapsedTime, 20*time.Millisecond)
+}
+
+func TestConcurrencyLimit(t *testing.T) {
+	cache := NewCache[string, string](10, 1, func(string) string {
+		time.Sleep(10 * time.Millisecond)
+		return "hello"
+	})
+
+	var wg sync.WaitGroup
+	wg.Add(2)
+
+	timeBefore := time.Now()
+
+	go func() {
+		value0, previouslyExisted0 := cache.Get("greeting0")
+		require.False(t, previouslyExisted0)
+		require.Equal(t, "hello", value0)
+		wg.Done()
+	}()
+
+	go func() {
+		value1, previouslyExisted1 := cache.Get("greeting1")
+		require.False(t, previouslyExisted1)
+		require.Equal(t, "hello", value1)
+		wg.Done()
+	}()
+
+	wg.Wait()
+
+	elapsedTime := time.Since(timeBefore)
+
+	require.Greater(t, elapsedTime, 19*time.Millisecond)
+}
+
+func TestGetTwoDifferentValuesInParallel(t *testing.T) {
+	cache := NewCache[string, string](10, 2, func(string) string {
+		time.Sleep(10 * time.Millisecond)
+		return "hello"
+	})
+
+	var wg sync.WaitGroup
+	wg.Add(2)
+
+	timeBefore := time.Now()
+
+	go func() {
+		value0, previouslyExisted0 := cache.Get("greeting0")
+		require.False(t, previouslyExisted0)
+		require.Equal(t, "hello", value0)
+		wg.Done()
+	}()
+
+	go func() {
+		value1, previouslyExisted1 := cache.Get("greeting1")
+		require.False(t, previouslyExisted1)
+		require.Equal(t, "hello", value1)
+		wg.Done()
+	}()
+
+	wg.Wait()
+
+	elapsedTime := time.Since(timeBefore)
+
+	// The two values should have been computed in parallel
+	require.Less(t, elapsedTime, 20*time.Millisecond)
+}
+
+func TestEvictsInOrder(t *testing.T) {
+	cache := NewCache[string, string](2, 2, func(string) string {
+		return "hello"
+	})
+
+	value0, previouslyExisted0 := cache.Get("greeting0")
+	require.False(t, previouslyExisted0)
+	require.Equal(t, "hello", value0)
+
+	value1, previouslyExisted1 := cache.Get("greeting1")
+	require.False(t, previouslyExisted1)
+	require.Equal(t, "hello", value1)
+
+	require.True(t, cache.has("greeting0"))
+	require.True(t, cache.has("greeting1"))
+
+	value2, previouslyExisted2 := cache.Get("greeting2")
+	require.False(t, previouslyExisted2)
+	require.Equal(t, "hello", value2)
+
+	require.False(t, cache.has("greeting0"))
+	require.True(t, cache.has("greeting1"))
+	require.True(t, cache.has("greeting2"))
+}
diff --git a/projects/concurrency/lru_cache_computing/go.mod
b/projects/concurrency/lru_cache_computing/go.mod new file mode 100644 index 000000000..b5db741c7 --- /dev/null +++ b/projects/concurrency/lru_cache_computing/go.mod @@ -0,0 +1,11 @@ +module github.com/CodeYourFuture/immersive-go-course/projects/concurrency/lru_cache_computing + +go 1.21.5 + +require github.com/stretchr/testify v1.8.4 + +require ( + github.com/davecgh/go-spew v1.1.1 // indirect + github.com/pmezard/go-difflib v1.0.0 // indirect + gopkg.in/yaml.v3 v3.0.1 // indirect +) diff --git a/projects/concurrency/lru_cache_computing/go.sum b/projects/concurrency/lru_cache_computing/go.sum new file mode 100644 index 000000000..fa4b6e682 --- /dev/null +++ b/projects/concurrency/lru_cache_computing/go.sum @@ -0,0 +1,10 @@ +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= +github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/projects/concurrency/lru_cache_everything_in_buckets/cache.go b/projects/concurrency/lru_cache_everything_in_buckets/cache.go new file mode 100644 index 000000000..a8e56d24f --- /dev/null +++ b/projects/concurrency/lru_cache_everything_in_buckets/cache.go @@ -0,0 +1,161 @@ +package cache + +import ( + "sync" + "time" +) + +type Cache[K comparable, V any] struct { + entryLimit int + + mu sync.Mutex + entries map[K]*cacheEntry[V] + unsuccessfulReads uint64 + evictedSuccessfulReads uint64 + evicted uint64 + evictedNeverRead uint64 +} + +type cacheEntry[V any] struct { + value V + lastAccess time.Time + reads uint64 +} + +func NewCache[K comparable, V any](entryLimit int) *Cache[K, V] { + return &Cache[K, V]{ + entryLimit: entryLimit, + entries: make(map[K]*cacheEntry[V]), + } +} + +// Put adds the value to the cache, and returns a boolean to indicate whether a value already existed in the cache for that key. +// If there was previously a value, it replaces that value with this one. +// Any Put counts as a refresh in terms of LRU tracking. +func (c *Cache[K, V]) Put(key K, value V) bool { + c.mu.Lock() + defer c.mu.Unlock() + + if len(c.entries) == c.entryLimit { + c.evict_locked() + } + + entry, alreadyPresent := c.entries[key] + if !alreadyPresent { + entry = &cacheEntry[V]{ + value: value, + lastAccess: time.Now(), + } + } else { + entry.value = value + entry.lastAccess = time.Now() + } + + c.entries[key] = entry + + return alreadyPresent +} + +// evict_locked removes the oldest entry from the cache. +// We name this _locked to show that it assumes c.mu is held by the caller when this is called. +// +// This function is very expensive - we need to look through every element in the cache to decide whether it's the one to evict. +// This is O(n), which is pretty bad. Because we potentially perform an eviction for every write, this means that writing to the cache is O(n). 
+func (c *Cache[K, V]) evict_locked() {
+	if len(c.entries) == 0 {
+		return
+	}
+	isFirst := true
+	var oldestKey K
+	var oldestTimestamp time.Time
+	for k, v := range c.entries {
+		if isFirst || v.lastAccess.Before(oldestTimestamp) {
+			oldestKey = k
+			oldestTimestamp = v.lastAccess
+			isFirst = false
+		}
+	}
+	toEvict := c.entries[oldestKey]
+	c.evictedSuccessfulReads += toEvict.reads
+	if toEvict.reads == 0 {
+		c.evictedNeverRead++
+	}
+	c.evicted++
+	delete(c.entries, oldestKey)
+}
+
+// Get returns the value associated with the passed key, and a boolean to indicate whether a value was known or not. If not, nil is returned as the value.
+// Any Get counts as a refresh in terms of LRU tracking.
+func (c *Cache[K, V]) Get(key K) (*V, bool) {
+	c.mu.Lock()
+	defer c.mu.Unlock()
+	entry, present := c.entries[key]
+	if present {
+		entry.lastAccess = time.Now()
+		entry.reads++
+		return &entry.value, present
+	} else {
+		c.unsuccessfulReads++
+		return nil, false
+	}
+}
+
+func (c *Cache[K, V]) Stats() CacheStats {
+	c.mu.Lock()
+	defer c.mu.Unlock()
+
+	writtenNeverRead := c.evictedNeverRead
+
+	var readsCurrentValues uint64
+
+	for _, entry := range c.entries {
+		readsCurrentValues += entry.reads
+		if entry.reads == 0 {
+			writtenNeverRead++
+		}
+	}
+
+	currentSize := uint64(len(c.entries))
+
+	return CacheStats{
+		sucessfulReadsAllTime:    c.evictedSuccessfulReads + readsCurrentValues,
+		unsuccessfulReadsAllTime: c.unsuccessfulReads,
+		writtenNeverReadAllTime:  writtenNeverRead,
+		writesAllTime:            c.evicted + currentSize,
+		readsCurrentValues:       readsCurrentValues,
+		currentSize:              currentSize,
+	}
+}
+
+type CacheStats struct {
+	sucessfulReadsAllTime    uint64
+	unsuccessfulReadsAllTime uint64
+	writtenNeverReadAllTime  uint64
+	writesAllTime            uint64
+	readsCurrentValues       uint64
+	currentSize              uint64
+}
+
+func (c *CacheStats) HitRate() float64 {
+	return float64(c.sucessfulReadsAllTime) / (float64(c.sucessfulReadsAllTime) + float64(c.unsuccessfulReadsAllTime))
+}
+
+func (c *CacheStats) WrittenNeverRead() uint64 {
+	return c.writtenNeverReadAllTime
+}
+
+func (c *CacheStats) AverageReadCountForCurrentEntries() float64 {
+	return float64(c.readsCurrentValues) / float64(c.currentSize)
+}
+
+func (c *CacheStats) TotalReads() uint64 {
+	return c.sucessfulReadsAllTime + c.unsuccessfulReadsAllTime
+}
+
+func (c *CacheStats) TotalSuccessfulReads() uint64 {
+	return c.sucessfulReadsAllTime
+}
+
+func (c *CacheStats) TotalWrites() uint64 {
+	return c.writesAllTime
+}
diff --git a/projects/concurrency/lru_cache_everything_in_buckets/cache_test.go b/projects/concurrency/lru_cache_everything_in_buckets/cache_test.go
new file mode 100644
index 000000000..be17499fb
--- /dev/null
+++ b/projects/concurrency/lru_cache_everything_in_buckets/cache_test.go
@@ -0,0 +1,157 @@
+package cache
+
+import (
+	"fmt"
+	"sync"
+	"sync/atomic"
+	"testing"
+
+	"github.com/stretchr/testify/require"
+)
+
+func TestPutThenGet(t *testing.T) {
+	cache := NewCache[string, string](10)
+	previouslyExisted := cache.Put("greeting", "hello")
+	require.False(t, previouslyExisted)
+
+	value, present := cache.Get("greeting")
+	require.True(t, present)
+	require.Equal(t, "hello", *value)
+}
+
+func TestGetMissing(t *testing.T) {
+	cache := NewCache[string, string](10)
+	value, present := cache.Get("greeting")
+	require.False(t, present)
+	require.Nil(t, value)
+}
+
+func TestPutThenOverwriteSameValue(t *testing.T) {
+	cache := NewCache[string, string](10)
+	previouslyExisted1 := cache.Put("greeting", "hello")
+	require.False(t, previouslyExisted1)
+
+ 
previouslyExisted2 := cache.Put("greeting", "hello") + require.True(t, previouslyExisted2) + + value, present := cache.Get("greeting") + require.True(t, present) + require.Equal(t, "hello", *value) +} + +func TestPutThenOverwriteDifferentValue(t *testing.T) { + cache := NewCache[string, string](10) + previouslyExisted1 := cache.Put("greeting", "hello") + require.False(t, previouslyExisted1) + + previouslyExisted2 := cache.Put("greeting", "howdy") + require.True(t, previouslyExisted2) + + value, present := cache.Get("greeting") + require.True(t, present) + require.Equal(t, "howdy", *value) +} + +func TestEviction_JustWrites(t *testing.T) { + cache := NewCache[string, string](10) + + for i := 0; i <= 10; i++ { + cache.Put(fmt.Sprintf("entry-%d", i), "hello") + } + _, present0 := cache.Get("entry-0") + require.False(t, present0) + + _, present10 := cache.Get("entry-10") + require.True(t, present10) +} + +func TestEviction_ReadsAndWrites(t *testing.T) { + cache := NewCache[string, string](10) + + for i := 0; i < 10; i++ { + cache.Put(fmt.Sprintf("entry-%d", i), "hello") + } + _, present0 := cache.Get("entry-0") + require.True(t, present0) + + cache.Put("entry-10", "hello") + + _, present0 = cache.Get("entry-0") + require.True(t, present0) + + _, present1 := cache.Get("entry-1") + require.False(t, present1) + + _, present10 := cache.Get("entry-10") + require.True(t, present10) +} + +func TestConcurrentPuts(t *testing.T) { + cache := NewCache[string, string](10) + + var startWaitGroup sync.WaitGroup + var endWaitGroup sync.WaitGroup + + for i := 0; i < 1000; i++ { + startWaitGroup.Add(1) + endWaitGroup.Add(1) + go func(i int) { + startWaitGroup.Wait() + cache.Put(fmt.Sprintf("entry-%d", i), "hello") + endWaitGroup.Done() + }(i) + } + startWaitGroup.Add(-1000) + endWaitGroup.Wait() + + sawEntries := 0 + for i := 0; i < 1000; i++ { + _, saw := cache.Get(fmt.Sprintf("entry-%d", i)) + if saw { + sawEntries++ + } + } + require.Equal(t, 10, sawEntries) +} + +func TestStats(t *testing.T) { + cache := NewCache[string, string](10) + + var startWaitGroup sync.WaitGroup + var endWaitGroup sync.WaitGroup + + for i := 0; i < 1000; i++ { + startWaitGroup.Add(1) + endWaitGroup.Add(1) + go func(i int) { + startWaitGroup.Wait() + cache.Put(fmt.Sprintf("entry-%d", i), "hello") + endWaitGroup.Done() + }(i) + } + startWaitGroup.Add(-1000) + endWaitGroup.Wait() + + var readWaitGroup sync.WaitGroup + var sawEntries atomic.Uint64 + for i := 0; i < 1000; i++ { + readWaitGroup.Add(1) + go func(i int) { + _, saw := cache.Get(fmt.Sprintf("entry-%d", i)) + if saw { + sawEntries.Add(1) + } + readWaitGroup.Done() + }(i) + } + readWaitGroup.Wait() + require.Equal(t, uint64(10), sawEntries.Load()) + + stats := cache.Stats() + require.Equal(t, 0.01, stats.HitRate()) + require.Equal(t, uint64(990), stats.WrittenNeverRead()) + require.Equal(t, float64(1), stats.AverageReadCountForCurrentEntries()) + require.Equal(t, uint64(1000), stats.TotalReads()) + require.Equal(t, uint64(10), stats.TotalSuccessfulReads()) + require.Equal(t, uint64(1000), stats.TotalWrites()) +} diff --git a/projects/concurrency/lru_cache_everything_in_buckets/go.mod b/projects/concurrency/lru_cache_everything_in_buckets/go.mod new file mode 100644 index 000000000..f9c8291f4 --- /dev/null +++ b/projects/concurrency/lru_cache_everything_in_buckets/go.mod @@ -0,0 +1,11 @@ +module github.com/CodeYourFuture/immersive-go-course/projects/concurrency/lru_cache_everything_in_buckets + +go 1.21.5 + +require github.com/stretchr/testify v1.8.4 + +require ( + 
github.com/davecgh/go-spew v1.1.1 // indirect + github.com/pmezard/go-difflib v1.0.0 // indirect + gopkg.in/yaml.v3 v3.0.1 // indirect +) diff --git a/projects/concurrency/lru_cache_everything_in_buckets/go.sum b/projects/concurrency/lru_cache_everything_in_buckets/go.sum new file mode 100644 index 000000000..fa4b6e682 --- /dev/null +++ b/projects/concurrency/lru_cache_everything_in_buckets/go.sum @@ -0,0 +1,10 @@ +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= +github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/projects/concurrency/lru_cache_separate_eviction_list/cache.go b/projects/concurrency/lru_cache_separate_eviction_list/cache.go new file mode 100644 index 000000000..801fede99 --- /dev/null +++ b/projects/concurrency/lru_cache_separate_eviction_list/cache.go @@ -0,0 +1,153 @@ +package cache + +import ( + "container/list" + "sync" + "sync/atomic" +) + +type Cache[K comparable, V any] struct { + entryLimit int + + writes atomic.Uint64 + hits atomic.Uint64 + misses atomic.Uint64 + writtenNeverRead atomic.Int64 + + mu sync.Mutex + values map[K]*list.Element + // Front is most recent element, Back is next to be evicted. + // We use a *list.List here because moving elements within a *list.List, as well as adding a new element at one end, or finding the element at one end, is O(1), so all of the following are cheap: + // * Adding a new element to the eviction list. + // * Finding which element should be evicted next. + // * Moving an element from anywhere in the list to one end of it . + // The values inside each list element all have type *keyAndValueContainer[K, V]. + // Ideally this would be a generic type, but it pre-dates generics in the language, + // so we need to ourselves track what type we expect to put in here, and we need to assert the type when we get values out. + evictionList *list.List +} + +type keyAndValueContainer[K any, V any] struct { + key K + value V + readCount atomic.Uint64 +} + +func NewCache[K comparable, V any](entryLimit int) *Cache[K, V] { + evictionList := list.New() + return &Cache[K, V]{ + entryLimit: entryLimit, + // Pre-allocate the whole value map - this optimises for assuming our cache will be pretty full - if we expected it to be mostly empty, we may not pre-allocate here. + values: make(map[K]*list.Element, entryLimit), + evictionList: evictionList, + } +} + +// Put adds the value to the cache, and returns a boolean to indicate whether a value already existed in the cache for that key. +// If there was previously a value, it replaces that value with this one. +// Any Put counts as a refresh in terms of LRU tracking. 
+func (c *Cache[K, V]) Put(key K, value V) bool {
+	c.mu.Lock()
+	defer c.mu.Unlock()
+	oldElement, alreadyKnown := c.values[key]
+	if alreadyKnown {
+		// Remove the element holding the old value so the eviction list doesn't accumulate stale entries for this key.
+		c.evictionList.Remove(oldElement)
+	} else if len(c.values) == c.entryLimit {
+		c.evict_locked()
+	}
+	keyAndValue := &keyAndValueContainer[K, V]{
+		key:   key,
+		value: value,
+	}
+	c.writes.Add(1)
+	c.writtenNeverRead.Add(1)
+	c.values[key] = c.evictionList.PushFront(keyAndValue)
+	return alreadyKnown
+}
+
+// refresh_locked moves a particular key to be the last element to be evicted if it's known (or returns nil if not).
+// We name this _locked to show that it assumes c.mu is held by the caller when this is called.
+func (c *Cache[K, V]) refresh_locked(key K) *keyAndValueContainer[K, V] {
+	element, known := c.values[key]
+	if !known {
+		return nil
+	}
+	// MoveToFront keeps the element (and the *list.Element pointer stored in c.values) valid while refreshing its position in the eviction order.
+	c.evictionList.MoveToFront(element)
+	return element.Value.(*keyAndValueContainer[K, V])
+}
+
+// evict_locked removes the oldest entry from the cache.
+// We name this _locked to show that it assumes c.mu is held by the caller when this is called.
+func (c *Cache[K, V]) evict_locked() {
+	delete(c.values, c.evictionList.Remove(c.evictionList.Back()).(*keyAndValueContainer[K, V]).key)
+}
+
+// Get returns the value associated with the passed key, and a boolean to indicate whether a value was known or not. If not, nil is returned as the value.
+// Any Get counts as a refresh in terms of LRU tracking.
+func (c *Cache[K, V]) Get(key K) (*V, bool) {
+	c.mu.Lock()
+	defer c.mu.Unlock()
+	if keyAndValue := c.refresh_locked(key); keyAndValue != nil {
+		c.hits.Add(1)
+		if keyAndValue.readCount.Add(1) == 1 {
+			// If this is the first read of the value, this value has moved from being "written never read", to now having been read.
+			c.writtenNeverRead.Add(-1)
+		}
+		return &keyAndValue.value, true
+	}
+	c.misses.Add(1)
+	return nil, false
+}
+
+func (c *Cache[K, V]) Stats() CacheStats {
+	c.mu.Lock()
+	defer c.mu.Unlock()
+
+	var readsCurrentValues uint64
+	value := c.evictionList.Front()
+	for value != nil {
+		readsCurrentValues += value.Value.(*keyAndValueContainer[K, V]).readCount.Load()
+		value = value.Next()
+	}
+
+	return CacheStats{
+		sucessfulReadsAllTime:    c.hits.Load(),
+		unsuccessfulReadsAllTime: c.misses.Load(),
+		writtenNeverReadAllTime:  uint64(c.writtenNeverRead.Load()),
+		writesAllTime:            c.writes.Load(),
+		readsCurrentValues:       readsCurrentValues,
+		currentSize:              uint64(len(c.values)),
+	}
+}
+
+type CacheStats struct {
+	sucessfulReadsAllTime    uint64
+	unsuccessfulReadsAllTime uint64
+	writtenNeverReadAllTime  uint64
+	writesAllTime            uint64
+	readsCurrentValues       uint64
+	currentSize              uint64
+}
+
+func (c *CacheStats) HitRate() float64 {
+	return float64(c.sucessfulReadsAllTime) / (float64(c.sucessfulReadsAllTime) + float64(c.unsuccessfulReadsAllTime))
+}
+
+func (c *CacheStats) WrittenNeverRead() uint64 {
+	return c.writtenNeverReadAllTime
+}
+
+func (c *CacheStats) AverageReadCountForCurrentEntries() float64 {
+	return float64(c.readsCurrentValues) / float64(c.currentSize)
+}
+
+func (c *CacheStats) TotalReads() uint64 {
+	return c.sucessfulReadsAllTime + c.unsuccessfulReadsAllTime
+}
+
+func (c *CacheStats) TotalSuccessfulReads() uint64 {
+	return c.sucessfulReadsAllTime
+}
+
+func (c *CacheStats) TotalWrites() uint64 {
+	return c.writesAllTime
+}
diff --git a/projects/concurrency/lru_cache_separate_eviction_list/cache_test.go b/projects/concurrency/lru_cache_separate_eviction_list/cache_test.go
new file mode 120000
index 000000000..01ff8c149
--- /dev/null
+++ 
b/projects/concurrency/lru_cache_separate_eviction_list/cache_test.go @@ -0,0 +1 @@ +../lru_cache_everything_in_buckets/cache_test.go \ No newline at end of file diff --git a/projects/concurrency/lru_cache_separate_eviction_list/go.mod b/projects/concurrency/lru_cache_separate_eviction_list/go.mod new file mode 100644 index 000000000..94f1d3e39 --- /dev/null +++ b/projects/concurrency/lru_cache_separate_eviction_list/go.mod @@ -0,0 +1,11 @@ +module github.com/CodeYourFuture/immersive-go-course/projects/concurrency/lru_cache_separate_eviction_list + +go 1.21.5 + +require github.com/stretchr/testify v1.8.4 + +require ( + github.com/davecgh/go-spew v1.1.1 // indirect + github.com/pmezard/go-difflib v1.0.0 // indirect + gopkg.in/yaml.v3 v3.0.1 // indirect +) diff --git a/projects/concurrency/lru_cache_separate_eviction_list/go.sum b/projects/concurrency/lru_cache_separate_eviction_list/go.sum new file mode 100644 index 000000000..fa4b6e682 --- /dev/null +++ b/projects/concurrency/lru_cache_separate_eviction_list/go.sum @@ -0,0 +1,10 @@ +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= +github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
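For reference, a minimal usage sketch of the computing cache added in this diff. It is not part of the change itself: the import path is taken from the package's go.mod, but the example main package, the strings.ToUpper computer, and the assumption that the module path is resolvable (e.g. via a go.work or replace directive) are illustrative only.

package main

import (
	"fmt"
	"strings"

	cache "github.com/CodeYourFuture/immersive-go-course/projects/concurrency/lru_cache_computing"
)

func main() {
	// Keep at most 2 computed entries; allow at most 4 computations to run concurrently.
	// The computer function here is just strings.ToUpper.
	c := cache.NewCache[string, string](2, 4, strings.ToUpper)

	v, existed := c.Get("hello") // blocks while "HELLO" is computed; existed == false
	fmt.Println(v, existed)

	v, existed = c.Get("hello") // served from the cache without recomputing; existed == true
	fmt.Println(v, existed)
}

As the tests above rely on, concurrent Gets for the same pending key wait on the same in-flight computation rather than triggering a second one.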