Skip to content

Commit

Permalink
Bugfix // Fix race condition in Pop & PopN operation of ring buff…
Browse files Browse the repository at this point in the history
…er (#177)

* reproduce race condition

* double-checked locking to fix race condition

* lock immediately
  • Loading branch information
mapogolions authored Jan 14, 2025
1 parent 02bbf30 commit ea125b2
Show file tree
Hide file tree
Showing 2 changed files with 62 additions and 4 deletions.
10 changes: 6 additions & 4 deletions ringbuffer/ringbuffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,11 +56,12 @@ func (rb *RingBuffer[T]) Len() int64 {
}

func (rb *RingBuffer[T]) Pop() (T, bool) {
if rb.Len() == 0 {
rb.mu.Lock()
if rb.len == 0 {
rb.mu.Unlock()
var t T
return t, false
}
rb.mu.Lock()
rb.content.head = (rb.content.head + 1) % rb.content.mod
item := rb.content.items[rb.content.head]
var t T
Expand All @@ -71,10 +72,11 @@ func (rb *RingBuffer[T]) Pop() (T, bool) {
}

func (rb *RingBuffer[T]) PopN(n int64) ([]T, bool) {
if rb.Len() == 0 {
rb.mu.Lock()
if rb.len == 0 {
rb.mu.Unlock()
return nil, false
}
rb.mu.Lock()
content := rb.content

if n >= rb.len {
Expand Down
56 changes: 56 additions & 0 deletions ringbuffer/ringbuffer_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package ringbuffer

import (
"sync"
"sync/atomic"
"testing"
)

Expand Down Expand Up @@ -37,3 +39,57 @@ func TestPushPopN(t *testing.T) {
}
}
}

func TestPopThreadSafety(t *testing.T) {
t.Run("Pop should be thread-safe", func(t *testing.T) {
testCase := func() {
rb := New[int](4)
rb.Push(1)
wg := sync.WaitGroup{}
for i := 0; i < 2; i++ {
wg.Add(1)
go func() {
defer wg.Done()
rb.Pop()
}()
}
wg.Wait()
if rb.Len() == -1 {
t.Fatal("item popped twice")
}
}

// Increase the number of iterations to raise the likelihood of reproducing the race condition
for i := 0; i < 100_000; i++ {
testCase()
}
})

t.Run("PopN should be thread-safe", func(t *testing.T) {
testCase := func() {
rb := New[int](4)
rb.Push(1)
counter := atomic.Int32{}
wg := sync.WaitGroup{}
for i := 0; i < 2; i++ {
wg.Add(1)
go func() {
defer wg.Done()
_, ok := rb.PopN(1)
if ok {
counter.Add(1)
}
}()
}
wg.Wait()
if counter.Load() > 1 {
t.Fatal("false positive item removal")
}
}

// Increase the number of iterations to raise the likelihood of reproducing the race condition
for i := 0; i < 100_000; i++ {
testCase()
}
})
}

0 comments on commit ea125b2

Please sign in to comment.