From b8d92e2181844adc73173dc0324f5dcdd3132eb7 Mon Sep 17 00:00:00 2001 From: ankur22 Date: Thu, 29 Sep 2022 17:33:05 +0100 Subject: [PATCH 1/8] Add a queue per handler to keep events in order Events are sometimes out of order due to use using a goroutine to publish events to the handler concurrently, so there's no guarantee that the goroutine will run in the order they're created. To overcome this, a queue has been added per handler to synchronise all publishes of events to that handler. This should ensure that the ordering of events consumed by the handler matches what is emitted. We can't remove the goroutine in emitTo as this deadlocks due to handlers consuming and emitting events. Closes: https://github.com/grafana/xk6-browser/issues/553 --- common/event_emitter.go | 28 +++++++++++++++++++++++----- 1 file changed, 23 insertions(+), 5 deletions(-) diff --git a/common/event_emitter.go b/common/event_emitter.go index 02f9df8e8..9c9edaaf1 100644 --- a/common/event_emitter.go +++ b/common/event_emitter.go @@ -22,6 +22,7 @@ package common import ( "context" + "sync" ) // Ensure BaseEventEmitter implements the EventEmitter interface. @@ -97,8 +98,10 @@ type NavigationEvent struct { } type eventHandler struct { - ctx context.Context - ch chan Event + ctx context.Context + ch chan Event + queueMutex *sync.Mutex + queue []Event } // EventEmitter that all event emitters need to implement. @@ -166,8 +169,12 @@ func (e *BaseEventEmitter) sync(fn func()) { func (e *BaseEventEmitter) emit(event string, data interface{}) { emitEvent := func(eh eventHandler) { + eh.queueMutex.Lock() + defer eh.queueMutex.Unlock() + select { - case eh.ch <- Event{event, data}: + case eh.ch <- eh.queue[0]: + eh.queue = eh.queue[1:] case <-eh.ctx.Done(): // TODO: handle the error } @@ -180,6 +187,17 @@ func (e *BaseEventEmitter) emit(event string, data interface{}) { handlers = append(handlers[:i], handlers[i+1:]...) continue default: + // To ensure that goroutines don't break the ordering + // of the emitted events, we will need the goroutine to synchronize. + // This means that the events need to be stored in a queue per handler. The + // goroutine responsible for the the first popped element must complete + // publishing the event before the next goroutine can pop and work with + // the next event for that one handler. Each handler can process events concurrently still. + + handler.queueMutex.Lock() + handler.queue = append(handler.queue, Event{typ: event, data: data}) + handler.queueMutex.Unlock() + go emitEvent(handler) i++ } @@ -200,7 +218,7 @@ func (e *BaseEventEmitter) on(ctx context.Context, events []string, ch chan Even if !ok { e.handlers[event] = make([]eventHandler, 0) } - eh := eventHandler{ctx, ch} + eh := eventHandler{ctx, ch, &sync.Mutex{}, make([]Event, 0)} e.handlers[event] = append(e.handlers[event], eh) } }) @@ -209,6 +227,6 @@ func (e *BaseEventEmitter) on(ctx context.Context, events []string, ch chan Even // OnAll registers a handler for all events. func (e *BaseEventEmitter) onAll(ctx context.Context, ch chan Event) { e.sync(func() { - e.handlersAll = append(e.handlersAll, eventHandler{ctx, ch}) + e.handlersAll = append(e.handlersAll, eventHandler{ctx, ch, &sync.Mutex{}, make([]Event, 0)}) }) } From 4f740e893abbca586d0f2f96d7cd10b80bb8c136 Mon Sep 17 00:00:00 2001 From: ankur22 Date: Wed, 5 Oct 2022 11:04:28 +0100 Subject: [PATCH 2/8] Refactor handlers to be a pointer This will allow us to always work with the same handler instead of the copy of one. --- common/event_emitter.go | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/common/event_emitter.go b/common/event_emitter.go index 9c9edaaf1..8a7328ce9 100644 --- a/common/event_emitter.go +++ b/common/event_emitter.go @@ -117,8 +117,8 @@ type syncFunc func() (done chan struct{}) // BaseEventEmitter emits events to registered handlers. type BaseEventEmitter struct { - handlers map[string][]eventHandler - handlersAll []eventHandler + handlers map[string][]*eventHandler + handlersAll []*eventHandler syncCh chan syncFunc ctx context.Context @@ -127,7 +127,7 @@ type BaseEventEmitter struct { // NewBaseEventEmitter creates a new instance of a base event emitter. func NewBaseEventEmitter(ctx context.Context) BaseEventEmitter { bem := BaseEventEmitter{ - handlers: make(map[string][]eventHandler), + handlers: make(map[string][]*eventHandler), syncCh: make(chan syncFunc), ctx: ctx, } @@ -168,7 +168,7 @@ func (e *BaseEventEmitter) sync(fn func()) { } func (e *BaseEventEmitter) emit(event string, data interface{}) { - emitEvent := func(eh eventHandler) { + emitEvent := func(eh *eventHandler) { eh.queueMutex.Lock() defer eh.queueMutex.Unlock() @@ -179,7 +179,7 @@ func (e *BaseEventEmitter) emit(event string, data interface{}) { // TODO: handle the error } } - emitTo := func(handlers []eventHandler) (updated []eventHandler) { + emitTo := func(handlers []*eventHandler) (updated []*eventHandler) { for i := 0; i < len(handlers); { handler := handlers[i] select { @@ -216,10 +216,10 @@ func (e *BaseEventEmitter) on(ctx context.Context, events []string, ch chan Even for _, event := range events { _, ok := e.handlers[event] if !ok { - e.handlers[event] = make([]eventHandler, 0) + e.handlers[event] = make([]*eventHandler, 0) } eh := eventHandler{ctx, ch, &sync.Mutex{}, make([]Event, 0)} - e.handlers[event] = append(e.handlers[event], eh) + e.handlers[event] = append(e.handlers[event], &eh) } }) } @@ -227,6 +227,6 @@ func (e *BaseEventEmitter) on(ctx context.Context, events []string, ch chan Even // OnAll registers a handler for all events. func (e *BaseEventEmitter) onAll(ctx context.Context, ch chan Event) { e.sync(func() { - e.handlersAll = append(e.handlersAll, eventHandler{ctx, ch, &sync.Mutex{}, make([]Event, 0)}) + e.handlersAll = append(e.handlersAll, &eventHandler{ctx, ch, &sync.Mutex{}, make([]Event, 0)}) }) } From 9f65b6b0a29cd6d5ddc6a8626f7430a8ddf8c531 Mon Sep 17 00:00:00 2001 From: ankur22 Date: Wed, 5 Oct 2022 15:52:09 +0100 Subject: [PATCH 3/8] Add double queue for ordered thread safe system There are two criterias that need to be fulfilled. One is to be able to send and receive events in the same order that were emitted. The second criteria is that a handler must be able to receive and emit events. To enable this we've used a double queue system where by threads can be emitting events, which are queued up, and the emitEvent goroutines can read events from a different queue without deadlocking. When the queue that emitEvent goroutines use is depleted, the queues are swapped around so that the emitters are now filling in the empty queue and the emitEvent goroutines are reading from the filled queue. --- common/event_emitter.go | 57 ++++++++++----- common/event_emitter_test.go | 134 +++++++++++++++++++++++++++++++++++ 2 files changed, 174 insertions(+), 17 deletions(-) diff --git a/common/event_emitter.go b/common/event_emitter.go index 8a7328ce9..13726d8ac 100644 --- a/common/event_emitter.go +++ b/common/event_emitter.go @@ -98,10 +98,12 @@ type NavigationEvent struct { } type eventHandler struct { - ctx context.Context - ch chan Event - queueMutex *sync.Mutex - queue []Event + ctx context.Context + ch chan Event + queueMutex *sync.Mutex + queue []Event + curQueueMutex *sync.Mutex + curQueue []Event } // EventEmitter that all event emitters need to implement. @@ -169,12 +171,25 @@ func (e *BaseEventEmitter) sync(fn func()) { func (e *BaseEventEmitter) emit(event string, data interface{}) { emitEvent := func(eh *eventHandler) { - eh.queueMutex.Lock() - defer eh.queueMutex.Unlock() + eh.curQueueMutex.Lock() + defer eh.curQueueMutex.Unlock() + + // We try to read from the current queue (curQueue) + // If there isn't anything on curQueue then there must + // be something being populated by the synched emitTo + // func below. Swap around curQueue with queue. Queue + // is now being populated again by emitTo, and all + // emitEvent goroutines can continue to consume from + // curQueue until that is again depleted. + if len(eh.curQueue) == 0 { + eh.queueMutex.Lock() + eh.curQueue, eh.queue = eh.queue, eh.curQueue + eh.queueMutex.Unlock() + } select { - case eh.ch <- eh.queue[0]: - eh.queue = eh.queue[1:] + case eh.ch <- eh.curQueue[0]: + eh.curQueue = eh.curQueue[1:] case <-eh.ctx.Done(): // TODO: handle the error } @@ -187,13 +202,6 @@ func (e *BaseEventEmitter) emit(event string, data interface{}) { handlers = append(handlers[:i], handlers[i+1:]...) continue default: - // To ensure that goroutines don't break the ordering - // of the emitted events, we will need the goroutine to synchronize. - // This means that the events need to be stored in a queue per handler. The - // goroutine responsible for the the first popped element must complete - // publishing the event before the next goroutine can pop and work with - // the next event for that one handler. Each handler can process events concurrently still. - handler.queueMutex.Lock() handler.queue = append(handler.queue, Event{typ: event, data: data}) handler.queueMutex.Unlock() @@ -218,7 +226,14 @@ func (e *BaseEventEmitter) on(ctx context.Context, events []string, ch chan Even if !ok { e.handlers[event] = make([]*eventHandler, 0) } - eh := eventHandler{ctx, ch, &sync.Mutex{}, make([]Event, 0)} + eh := eventHandler{ + ctx, + ch, + &sync.Mutex{}, + make([]Event, 0), + &sync.Mutex{}, + make([]Event, 0), + } e.handlers[event] = append(e.handlers[event], &eh) } }) @@ -227,6 +242,14 @@ func (e *BaseEventEmitter) on(ctx context.Context, events []string, ch chan Even // OnAll registers a handler for all events. func (e *BaseEventEmitter) onAll(ctx context.Context, ch chan Event) { e.sync(func() { - e.handlersAll = append(e.handlersAll, &eventHandler{ctx, ch, &sync.Mutex{}, make([]Event, 0)}) + e.handlersAll = append(e.handlersAll, + &eventHandler{ + ctx, + ch, + &sync.Mutex{}, + make([]Event, 0), + &sync.Mutex{}, + make([]Event, 0), + }) }) } diff --git a/common/event_emitter_test.go b/common/event_emitter_test.go index 2e37a8d9d..d459037b8 100644 --- a/common/event_emitter_test.go +++ b/common/event_emitter_test.go @@ -22,9 +22,11 @@ package common import ( "context" + "sync" "testing" "github.com/chromedp/cdproto" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -125,3 +127,135 @@ func TestEventEmitterAllEvents(t *testing.T) { }) }) } + +func TestBaseEventEmitter(t *testing.T) { + t.Parallel() + + t.Run("order of emitted events kept", func(t *testing.T) { + // Test description + // + // 1. Emit many events from the emitWorker. + // 2. Handler receives the emitted events. + // + // Success criteria: Ensure that the ordering of events is + // received in the order they're emitted. + + t.Parallel() + + eventName := "AtomicIntEvent" + maxInt := 100 + + ctx, cancel := context.WithCancel(context.Background()) + emitter := NewBaseEventEmitter(ctx) + ch := make(chan Event) + emitter.on(ctx, []string{eventName}, ch) + + wg := sync.WaitGroup{} + + var expectedI int + wg.Add(1) + handler := func() { + defer wg.Done() + + for expectedI != maxInt { + e := <-ch + + i, ok := e.data.(int) + if !ok { + assert.Fail(t, "unexpected type read from channel", e.data) + } + + assert.Equal(t, eventName, e.typ) + assert.Equal(t, expectedI, i) + + expectedI++ + } + + cancel() + close(ch) + } + go handler() + + wg.Add(1) + emitWorker := func() { + defer wg.Done() + + for i := 0; i < maxInt; i++ { + i := i + emitter.emit(eventName, i) + } + } + go emitWorker() + + wg.Wait() + }) + + t.Run("handler can emit without deadlocking", func(t *testing.T) { + // Test description + // + // 1. Emit many events from the emitWorker. + // 2. Handler receives emitted events (AtomicIntEvent1). + // 3. Handler emits event as AtomicIntEvent2. + // 4. Handler received emitted events again (AtomicIntEvent2). + // + // Success criteria: No deadlock should occur between receiving, + // emitting, and receiving of events. + + t.Parallel() + + eventName1 := "AtomicIntEvent1" + eventName2 := "AtomicIntEvent2" + maxInt := 100 + + ctx, cancel := context.WithCancel(context.Background()) + emitter := NewBaseEventEmitter(ctx) + ch := make(chan Event) + emitter.on(ctx, []string{eventName1, eventName2}, ch) + + wg := sync.WaitGroup{} + + var expectedI2 int + wg.Add(1) + handler := func() { + defer wg.Done() + + for { + if expectedI2 == maxInt { + break + } + + e := <-ch + + switch e.typ { + case eventName1: + i, ok := e.data.(int) + if !ok { + assert.Fail(t, "unexpected type read from channel", e.data) + } + emitter.emit(eventName2, i) + case eventName2: + expectedI2++ + default: + assert.Fail(t, "unexpected event type received") + } + } + + cancel() + close(ch) + } + go handler() + + wg.Add(1) + emitWorker := func() { + defer wg.Done() + + for i := 0; i < maxInt; i++ { + i := i + emitter.emit(eventName1, i) + } + } + go emitWorker() + + wg.Wait() + }) +} From 3fdba694f220dab3053c07e1c3d22af31204cde9 Mon Sep 17 00:00:00 2001 From: Ankur Date: Thu, 6 Oct 2022 11:50:10 +0100 Subject: [PATCH 4/8] Apply suggestions from code review MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: İnanç Gümüş --- common/event_emitter.go | 28 ++++------------------------ 1 file changed, 4 insertions(+), 24 deletions(-) diff --git a/common/event_emitter.go b/common/event_emitter.go index 13726d8ac..a344329e4 100644 --- a/common/event_emitter.go +++ b/common/event_emitter.go @@ -100,9 +100,9 @@ type NavigationEvent struct { type eventHandler struct { ctx context.Context ch chan Event - queueMutex *sync.Mutex + queueMutex sync.Mutex queue []Event - curQueueMutex *sync.Mutex + curQueueMutex sync.Mutex curQueue []Event } @@ -222,19 +222,7 @@ func (e *BaseEventEmitter) emit(event string, data interface{}) { func (e *BaseEventEmitter) on(ctx context.Context, events []string, ch chan Event) { e.sync(func() { for _, event := range events { - _, ok := e.handlers[event] - if !ok { - e.handlers[event] = make([]*eventHandler, 0) - } - eh := eventHandler{ - ctx, - ch, - &sync.Mutex{}, - make([]Event, 0), - &sync.Mutex{}, - make([]Event, 0), - } - e.handlers[event] = append(e.handlers[event], &eh) + e.handlers[event] = append(e.handlers[event], &eventHandler{ctx: ctx, ch: ch}) } }) } @@ -242,14 +230,6 @@ func (e *BaseEventEmitter) on(ctx context.Context, events []string, ch chan Even // OnAll registers a handler for all events. func (e *BaseEventEmitter) onAll(ctx context.Context, ch chan Event) { e.sync(func() { - e.handlersAll = append(e.handlersAll, - &eventHandler{ - ctx, - ch, - &sync.Mutex{}, - make([]Event, 0), - &sync.Mutex{}, - make([]Event, 0), - }) + e.handlersAll = append(e.handlersAll, &eventHandler{ctx: ctx, ch: ch}) }) } From a48b555a99d0b64f006dfaceec47efb9b1add72c Mon Sep 17 00:00:00 2001 From: ankur22 Date: Thu, 6 Oct 2022 12:49:56 +0100 Subject: [PATCH 5/8] Add a timeout for the emitter unit tests This will help us to diagnose any deadlocks. The unit tests should pass within the 2 second limit. Resolves: https://github.com/grafana/xk6-browser/pull/555#discussion_r988830133 --- common/event_emitter_test.go | 44 ++++++++++++++---------------------- 1 file changed, 17 insertions(+), 27 deletions(-) diff --git a/common/event_emitter_test.go b/common/event_emitter_test.go index d459037b8..a09d3bd6a 100644 --- a/common/event_emitter_test.go +++ b/common/event_emitter_test.go @@ -22,8 +22,8 @@ package common import ( "context" - "sync" "testing" + "time" "github.com/chromedp/cdproto" "github.com/stretchr/testify/assert" @@ -150,19 +150,16 @@ func TestBaseEventEmitter(t *testing.T) { ch := make(chan Event) emitter.on(ctx, []string{eventName}, ch) - wg := sync.WaitGroup{} - var expectedI int - wg.Add(1) handler := func() { - defer wg.Done() + defer cancel() for expectedI != maxInt { e := <-ch i, ok := e.data.(int) if !ok { - assert.Fail(t, "unexpected type read from channel", e.data) + assert.FailNow(t, "unexpected type read from channel", e.data) } assert.Equal(t, eventName, e.typ) @@ -171,15 +168,11 @@ func TestBaseEventEmitter(t *testing.T) { expectedI++ } - cancel() close(ch) } go handler() - wg.Add(1) emitWorker := func() { - defer wg.Done() - for i := 0; i < maxInt; i++ { i := i emitter.emit(eventName, i) @@ -187,7 +180,11 @@ func TestBaseEventEmitter(t *testing.T) { } go emitWorker() - wg.Wait() + select { + case <-ctx.Done(): + case <-time.After(time.Second * 2): + assert.FailNow(t, "test timed out, deadlock?") + } }) t.Run("handler can emit without deadlocking", func(t *testing.T) { @@ -212,43 +209,32 @@ func TestBaseEventEmitter(t *testing.T) { ch := make(chan Event) emitter.on(ctx, []string{eventName1, eventName2}, ch) - wg := sync.WaitGroup{} - var expectedI2 int - wg.Add(1) handler := func() { - defer wg.Done() - - for { - if expectedI2 == maxInt { - break - } + defer cancel() + for expectedI2 != maxInt { e := <-ch switch e.typ { case eventName1: i, ok := e.data.(int) if !ok { - assert.Fail(t, "unexpected type read from channel", e.data) + assert.FailNow(t, "unexpected type read from channel", e.data) } emitter.emit(eventName2, i) case eventName2: expectedI2++ default: - assert.Fail(t, "unexpected event type received") + assert.FailNow(t, "unexpected event type received") } } - cancel() close(ch) } go handler() - wg.Add(1) emitWorker := func() { - defer wg.Done() - for i := 0; i < maxInt; i++ { i := i emitter.emit(eventName1, i) @@ -256,6 +242,10 @@ func TestBaseEventEmitter(t *testing.T) { } go emitWorker() - wg.Wait() + select { + case <-ctx.Done(): + case <-time.After(time.Second * 2): + assert.FailNow(t, "test timed out, deadlock?") + } }) } From 8421cbace6568c323108a53f43962c97a42d237e Mon Sep 17 00:00:00 2001 From: ankur22 Date: Fri, 7 Oct 2022 00:09:31 +0100 Subject: [PATCH 6/8] Refactor handler to only work with one queue Each handler must only work with a single queue, just as it shares the channel, regardless of how many events it has subscribed to. This will help ensure that the order of events is kept across all event types per handler. --- common/event_emitter.go | 57 ++++++++++++++++++++++----------- common/event_emitter_test.go | 62 ++++++++++++++++++++++++++++++++++++ 2 files changed, 100 insertions(+), 19 deletions(-) diff --git a/common/event_emitter.go b/common/event_emitter.go index a344329e4..bf6a8f46e 100644 --- a/common/event_emitter.go +++ b/common/event_emitter.go @@ -97,13 +97,17 @@ type NavigationEvent struct { err error } +type queue struct { + writeMutex sync.Mutex + write []Event + readMutex sync.Mutex + read []Event +} + type eventHandler struct { - ctx context.Context - ch chan Event - queueMutex sync.Mutex - queue []Event - curQueueMutex sync.Mutex - curQueue []Event + ctx context.Context + ch chan Event + queue *queue } // EventEmitter that all event emitters need to implement. @@ -122,6 +126,8 @@ type BaseEventEmitter struct { handlers map[string][]*eventHandler handlersAll []*eventHandler + queues map[chan Event]*queue + syncCh chan syncFunc ctx context.Context } @@ -132,6 +138,7 @@ func NewBaseEventEmitter(ctx context.Context) BaseEventEmitter { handlers: make(map[string][]*eventHandler), syncCh: make(chan syncFunc), ctx: ctx, + queues: make(map[chan Event]*queue), } go bem.syncAll(ctx) return bem @@ -171,8 +178,8 @@ func (e *BaseEventEmitter) sync(fn func()) { func (e *BaseEventEmitter) emit(event string, data interface{}) { emitEvent := func(eh *eventHandler) { - eh.curQueueMutex.Lock() - defer eh.curQueueMutex.Unlock() + eh.queue.readMutex.Lock() + defer eh.queue.readMutex.Unlock() // We try to read from the current queue (curQueue) // If there isn't anything on curQueue then there must @@ -181,15 +188,15 @@ func (e *BaseEventEmitter) emit(event string, data interface{}) { // is now being populated again by emitTo, and all // emitEvent goroutines can continue to consume from // curQueue until that is again depleted. - if len(eh.curQueue) == 0 { - eh.queueMutex.Lock() - eh.curQueue, eh.queue = eh.queue, eh.curQueue - eh.queueMutex.Unlock() + if len(eh.queue.read) == 0 { + eh.queue.writeMutex.Lock() + eh.queue.read, eh.queue.write = eh.queue.write, eh.queue.read + eh.queue.writeMutex.Unlock() } select { - case eh.ch <- eh.curQueue[0]: - eh.curQueue = eh.curQueue[1:] + case eh.ch <- eh.queue.read[0]: + eh.queue.read = eh.queue.read[1:] case <-eh.ctx.Done(): // TODO: handle the error } @@ -202,9 +209,9 @@ func (e *BaseEventEmitter) emit(event string, data interface{}) { handlers = append(handlers[:i], handlers[i+1:]...) continue default: - handler.queueMutex.Lock() - handler.queue = append(handler.queue, Event{typ: event, data: data}) - handler.queueMutex.Unlock() + handler.queue.writeMutex.Lock() + handler.queue.write = append(handler.queue.write, Event{typ: event, data: data}) + handler.queue.writeMutex.Unlock() go emitEvent(handler) i++ @@ -221,8 +228,14 @@ func (e *BaseEventEmitter) emit(event string, data interface{}) { // On registers a handler for a specific event. func (e *BaseEventEmitter) on(ctx context.Context, events []string, ch chan Event) { e.sync(func() { + q, ok := e.queues[ch] + if !ok { + q = &queue{} + e.queues[ch] = q + } + for _, event := range events { - e.handlers[event] = append(e.handlers[event], &eventHandler{ctx: ctx, ch: ch}) + e.handlers[event] = append(e.handlers[event], &eventHandler{ctx: ctx, ch: ch, queue: q}) } }) } @@ -230,6 +243,12 @@ func (e *BaseEventEmitter) on(ctx context.Context, events []string, ch chan Even // OnAll registers a handler for all events. func (e *BaseEventEmitter) onAll(ctx context.Context, ch chan Event) { e.sync(func() { - e.handlersAll = append(e.handlersAll, &eventHandler{ctx: ctx, ch: ch}) + q, ok := e.queues[ch] + if !ok { + q = &queue{} + e.queues[ch] = q + } + + e.handlersAll = append(e.handlersAll, &eventHandler{ctx: ctx, ch: ch, queue: q}) }) } diff --git a/common/event_emitter_test.go b/common/event_emitter_test.go index a09d3bd6a..0e8ad0a1d 100644 --- a/common/event_emitter_test.go +++ b/common/event_emitter_test.go @@ -128,6 +128,7 @@ func TestEventEmitterAllEvents(t *testing.T) { }) } +//nolint:gocognit func TestBaseEventEmitter(t *testing.T) { t.Parallel() @@ -187,6 +188,67 @@ func TestBaseEventEmitter(t *testing.T) { } }) + t.Run("order of emitted different event types kept", func(t *testing.T) { + // Test description + // + // 1. Emit many different event types from the emitWorker. + // 2. Handler receives the emitted events. + // + // Success criteria: Ensure that the ordering of events is + // received in the order they're emitted. + + t.Parallel() + + eventName1 := "AtomicIntEvent1" + eventName2 := "AtomicIntEvent2" + eventName3 := "AtomicIntEvent3" + eventName4 := "AtomicIntEvent4" + maxInt := 100 + + ctx, cancel := context.WithCancel(context.Background()) + emitter := NewBaseEventEmitter(ctx) + ch := make(chan Event) + emitter.on(ctx, []string{eventName1, eventName2, eventName3, eventName4}, ch) + + var expectedI int + handler := func() { + defer cancel() + + for expectedI != maxInt { + e := <-ch + + i, ok := e.data.(int) + if !ok { + assert.FailNow(t, "unexpected type read from channel", e.data) + } + + assert.Equal(t, expectedI, i) + + expectedI++ + } + + close(ch) + } + go handler() + + emitWorker := func() { + for i := 0; i < maxInt; i += 4 { + i := i + emitter.emit(eventName1, i) + emitter.emit(eventName2, i+1) + emitter.emit(eventName3, i+2) + emitter.emit(eventName4, i+3) + } + } + go emitWorker() + + select { + case <-ctx.Done(): + case <-time.After(time.Second * 2): + assert.FailNow(t, "test timed out, deadlock?") + } + }) + t.Run("handler can emit without deadlocking", func(t *testing.T) { // Test description // From 901eddf2055a143e3145773a9478fa462d2957d4 Mon Sep 17 00:00:00 2001 From: ankur22 Date: Fri, 7 Oct 2022 09:55:37 +0100 Subject: [PATCH 7/8] Add something --- common/event_emitter_test.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/common/event_emitter_test.go b/common/event_emitter_test.go index 0e8ad0a1d..8f52baf9b 100644 --- a/common/event_emitter_test.go +++ b/common/event_emitter_test.go @@ -208,7 +208,10 @@ func TestBaseEventEmitter(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) emitter := NewBaseEventEmitter(ctx) ch := make(chan Event) - emitter.on(ctx, []string{eventName1, eventName2, eventName3, eventName4}, ch) + // Calling on twice to ensure that the same queue is used + // internally for the same channel and handler. + emitter.on(ctx, []string{eventName1, eventName2}, ch) + emitter.on(ctx, []string{eventName3, eventName4}, ch) var expectedI int handler := func() { From e40feb81c88c899e833e5df783fd96a15d2284a7 Mon Sep 17 00:00:00 2001 From: Ankur Date: Fri, 7 Oct 2022 09:57:18 +0100 Subject: [PATCH 8/8] Apply suggestions from code review MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: İnanç Gümüş --- common/event_emitter.go | 11 ++++++----- common/event_emitter_test.go | 3 --- 2 files changed, 6 insertions(+), 8 deletions(-) diff --git a/common/event_emitter.go b/common/event_emitter.go index bf6a8f46e..9be567b19 100644 --- a/common/event_emitter.go +++ b/common/event_emitter.go @@ -181,13 +181,14 @@ func (e *BaseEventEmitter) emit(event string, data interface{}) { eh.queue.readMutex.Lock() defer eh.queue.readMutex.Unlock() - // We try to read from the current queue (curQueue) - // If there isn't anything on curQueue then there must + // We try to read from the read queue (queue.read). + // If there isn't anything on the read queue, then there must // be something being populated by the synched emitTo - // func below. Swap around curQueue with queue. Queue - // is now being populated again by emitTo, and all + // func below. + // Swap around the read queue with the write queue. + // Queue is now being populated again by emitTo, and all // emitEvent goroutines can continue to consume from - // curQueue until that is again depleted. + // the read queue until that is again depleted. if len(eh.queue.read) == 0 { eh.queue.writeMutex.Lock() eh.queue.read, eh.queue.write = eh.queue.write, eh.queue.read diff --git a/common/event_emitter_test.go b/common/event_emitter_test.go index 8f52baf9b..703832378 100644 --- a/common/event_emitter_test.go +++ b/common/event_emitter_test.go @@ -175,7 +175,6 @@ func TestBaseEventEmitter(t *testing.T) { emitWorker := func() { for i := 0; i < maxInt; i++ { - i := i emitter.emit(eventName, i) } } @@ -236,7 +235,6 @@ func TestBaseEventEmitter(t *testing.T) { emitWorker := func() { for i := 0; i < maxInt; i += 4 { - i := i emitter.emit(eventName1, i) emitter.emit(eventName2, i+1) emitter.emit(eventName3, i+2) @@ -301,7 +299,6 @@ func TestBaseEventEmitter(t *testing.T) { emitWorker := func() { for i := 0; i < maxInt; i++ { - i := i emitter.emit(eventName1, i) } }