Skip to content

Commit

Permalink
Sample concurrency solutions
Browse files Browse the repository at this point in the history
  • Loading branch information
illicitonion committed Mar 31, 2024
1 parent 28f53c1 commit d7f15b2
Show file tree
Hide file tree
Showing 17 changed files with 971 additions and 0 deletions.
24 changes: 24 additions & 0 deletions projects/concurrency/atomics/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package main

import (
"fmt"
"sync"
"sync/atomic"
)

var x atomic.Int32

func increment(wg *sync.WaitGroup) {
x.Add(1)
wg.Done()
}

func main() {
var w sync.WaitGroup
for i := 0; i < 1000; i++ {
w.Add(1)
go increment(&w)
}
w.Wait()
fmt.Println("final value of x", x.Load())
}
88 changes: 88 additions & 0 deletions projects/concurrency/lru_cache_coarse_grained_generations/cache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
package cache

import (
"sync"
"sync/atomic"
"time"
)

func NewCache[K comparable, V any](targetSize uint64, garbageCollectionInterval time.Duration) *Cache[K, V] {
cache := &Cache[K, V]{
targetSize: targetSize,
values: make(map[K]*valueAndGeneration[V], targetSize),
}

go func() {
ticker := time.Tick(garbageCollectionInterval)
for range ticker {
currentGeneration := cache.currentGeneration.Load()
cache.currentGeneration.Add(1)

// Accumulate a keysToDelete slice so that we can collect the keys to delete under a read lock rather than holding a write lock for the entire GC cycle.
// This will use extra memory, and has a disadvantage that we may bump a generation from a Get but then still evict that value because we already decided to GC it.
var keysToDelete []K
cache.mu.RLock()
// If we have free space, don't garbage collect at all. This will probably lead to very spiky evictions.
if uint64(len(cache.values)) <= targetSize {
cache.mu.RUnlock()
continue
}
for k, v := range cache.values {
// This is a _very_ coarse-grained eviction policy. As soon as our cache becomes full, we may evict lots of entries.
// It may be more useful to treat different values of generation differently, e.g. always evict if v.generation < currentGeneration - 5, and only evict more recent entries if that didn't free up any space.
if v.generation.Load() != currentGeneration {
keysToDelete = append(keysToDelete, k)
}
}
cache.mu.RUnlock()
if len(keysToDelete) > 0 {
cache.mu.Lock()
for _, keyToDelete := range keysToDelete {
delete(cache.values, keyToDelete)
}
cache.mu.Unlock()
}
}
}()

return cache
}

// type Cache implements a roughly-LRU cache. It attempts to keep to a maximum of targetSize, but may contain more entries at points in time.
// When under size pressure, it garbage collects entries which haven't been read or written, with no strict eviction ordering guarantees.
type Cache[K comparable, V any] struct {
targetSize uint64

mu sync.RWMutex
// Every time we Get/Put a value, we store which generation it was last accessed.
// We have a garbage collection goroutine which will delete entries that haven't been recently accessed, if the cache is full.
currentGeneration atomic.Uint64
values map[K]*valueAndGeneration[V]
}

type valueAndGeneration[V any] struct {
value V
generation atomic.Uint64
}

func (c *Cache[K, V]) Put(key K, value V) bool {
c.mu.Lock()
defer c.mu.Unlock()
valueWrapper := &valueAndGeneration[V]{
value: value,
}
valueWrapper.generation.Store(c.currentGeneration.Load())
c.values[key] = valueWrapper
return false
}

func (c *Cache[K, V]) Get(key K) (*V, bool) {
c.mu.RLock()
defer c.mu.RUnlock()
valueWrapper, ok := c.values[key]
if !ok {
return nil, false
}
valueWrapper.generation.Store(c.currentGeneration.Load())
return &valueWrapper.value, true
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package cache

import (
"fmt"
"testing"
"time"

"github.com/stretchr/testify/require"
)

func TestPutThenGet(t *testing.T) {
cache := NewCache[string, string](10, 1*time.Millisecond)
previouslyExisted := cache.Put("greeting", "hello")
require.False(t, previouslyExisted)

time.Sleep(3 * time.Millisecond)

value, present := cache.Get("greeting")
require.True(t, present)
require.Equal(t, "hello", *value)
}

func TestGetMissing(t *testing.T) {
cache := NewCache[string, string](1, 1*time.Millisecond)
value, present := cache.Get("greeting")
require.False(t, present)
require.Nil(t, value)
}

func TestEviction_JustWrites(t *testing.T) {
cache := NewCache[string, string](10, 1*time.Millisecond)

for i := 0; i < 10; i++ {
cache.Put(fmt.Sprintf("entry-%d", i), "hello")
}

time.Sleep(3 * time.Millisecond)

_, present0 := cache.Get("entry-0")
require.True(t, present0)

_, present10 := cache.Get("entry-9")
require.True(t, present10)

cache.Put("entry-10", "hello")

time.Sleep(3 * time.Millisecond)

_, present1 := cache.Get("entry-1")
require.False(t, present1)
}
11 changes: 11 additions & 0 deletions projects/concurrency/lru_cache_coarse_grained_generations/go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
module github.com/CodeYourFuture/immersive-go-course/projects/concurrency/lru_cache_coarse_grained_generations

go 1.21.5

require github.com/stretchr/testify v1.8.4

require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
10 changes: 10 additions & 0 deletions projects/concurrency/lru_cache_coarse_grained_generations/go.sum
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk=
github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
131 changes: 131 additions & 0 deletions projects/concurrency/lru_cache_computing/cache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
package cache

import (
"container/list"
"sync"
)

// entryLimit and concurrentComputeLimit must both be non-zero.
// computer must never panic.
func NewCache[K comparable, V any](entryLimit uint64, concurrentComputeLimit uint64, computer func(K) V) *Cache[K, V] {
computeChannel := make(chan K, concurrentComputeLimit)

resultChannel := make(chan keyValuePair[K, V], concurrentComputeLimit)

for i := 0; i < int(concurrentComputeLimit); i++ {
go func() {
for key := range computeChannel {
value := computer(key)
resultChannel <- keyValuePair[K, V]{
key: key,
value: &value,
}
}
}()
}

cache := &Cache[K, V]{
entryLimit: entryLimit,
computeChannel: computeChannel,

computedEntries: make(map[K]cacheEntry[K, V], entryLimit),
pendingEntries: make(map[K]*channelList[K, V]),
evictionList: list.New(),
}

go func() {
for result := range resultChannel {
cache.mu.Lock()
pendingEntry := cache.pendingEntries[result.key]
delete(cache.pendingEntries, result.key)

if len(cache.computedEntries) == int(cache.entryLimit) {
keyToEvict := cache.evictionList.Remove(cache.evictionList.Back()).(K)
delete(cache.computedEntries, keyToEvict)
}

evictionListPointer := cache.evictionList.PushFront(result.key)

cache.computedEntries[result.key] = cacheEntry[K, V]{
evictionListPointer: evictionListPointer,
value: *result.value,
}
pendingEntry.mu.Lock()
pendingEntry.value = result.value
cache.mu.Unlock()
for _, ch := range pendingEntry.channels {
ch <- result
}
pendingEntry.mu.Unlock()
}
}()

return cache
}

type cacheEntry[K any, V any] struct {
evictionListPointer *list.Element
value V
}

type keyValuePair[K any, V any] struct {
key K
value *V
}

type channelList[K any, V any] struct {
mu sync.Mutex
channels []chan (keyValuePair[K, V])
value *V
}

type Cache[K comparable, V any] struct {
entryLimit uint64

computeChannel chan K

mu sync.Mutex
computedEntries map[K]cacheEntry[K, V]
pendingEntries map[K]*channelList[K, V]
// Front is most recently used, back is least recently used
evictionList *list.List
}

func (c *Cache[K, V]) Get(key K) (V, bool) {
c.mu.Lock()
computedEntry, isComputed := c.computedEntries[key]
pendingEntry, isPending := c.pendingEntries[key]
if isComputed {
c.evictionList.MoveToFront(computedEntry.evictionListPointer)
c.mu.Unlock()
return computedEntry.value, true
}
if !isPending {
pendingEntry = &channelList[K, V]{}
c.pendingEntries[key] = pendingEntry
}
c.mu.Unlock()
if !isPending {
c.computeChannel <- key
}

pendingEntry.mu.Lock()
// Maybe the value was computed but hasn't been transfered from pending to computed yet
if pendingEntry.value != nil {
pendingEntry.mu.Unlock()
return *pendingEntry.value, isPending
}
channel := make(chan keyValuePair[K, V], 1)
pendingEntry.channels = append(pendingEntry.channels, channel)
pendingEntry.mu.Unlock()
value := <-channel
return *value.value, isPending
}

// Only exists for testing. Doesn't count as a usage for LRU purposes.
func (c *Cache[K, V]) has(key K) bool {
c.mu.Lock()
defer c.mu.Unlock()
_, ok := c.computedEntries[key]
return ok
}
Loading

0 comments on commit d7f15b2

Please sign in to comment.