Skip to content

Commit

Permalink
Merge pull request #228 from jkralik/eventBusKafka
Browse files Browse the repository at this point in the history
add eventbus for kafka
  • Loading branch information
maxekman authored Sep 24, 2018
2 parents bb58e35 + a1fe3f7 commit e0c5d00
Show file tree
Hide file tree
Showing 4 changed files with 13 additions and 7 deletions.
4 changes: 4 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,10 @@ Fully synchrounos. Useful for testing/experimentation.

Experimental driver.

### Kafka

https://github.com/Kistler-Group/eh-kafka

## Development

To develop Event Horizon you need to have Docker and Docker Compose installed.
Expand Down
10 changes: 5 additions & 5 deletions eventbus/acceptance_testing.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ import (
// eventbus.AcceptanceTest(t, bus1, bus2)
// }
//
func AcceptanceTest(t *testing.T, bus1, bus2 eh.EventBus) {
func AcceptanceTest(t *testing.T, bus1, bus2 eh.EventBus, timeout time.Duration) {
// Panic on nil matcher.
func() {
defer func() {
Expand Down Expand Up @@ -101,7 +101,7 @@ func AcceptanceTest(t *testing.T, bus1, bus2 eh.EventBus) {

// Check for correct event in handler 1 or 2.
expectedEvents := []eh.Event{event1}
if !(handlerBus1.Wait(time.Second) || handlerBus2.Wait(time.Second)) {
if !(handlerBus1.Wait(timeout) || handlerBus2.Wait(timeout)) {
t.Error("did not receive event in time")
}
if !(mocks.EqualEvents(handlerBus1.Events, expectedEvents) ||
Expand Down Expand Up @@ -133,7 +133,7 @@ func AcceptanceTest(t *testing.T, bus1, bus2 eh.EventBus) {
}

// Check the other handler.
if !anotherHandlerBus2.Wait(time.Second) {
if !anotherHandlerBus2.Wait(timeout) {
t.Error("did not receive event in time")
}
if !mocks.EqualEvents(anotherHandlerBus2.Events, expectedEvents) {
Expand All @@ -145,7 +145,7 @@ func AcceptanceTest(t *testing.T, bus1, bus2 eh.EventBus) {
}

// Check observer 1.
if !observerBus1.Wait(time.Second) {
if !observerBus1.Wait(timeout) {
t.Error("did not receive event in time")
}
for i, event := range observerBus1.Events {
Expand All @@ -158,7 +158,7 @@ func AcceptanceTest(t *testing.T, bus1, bus2 eh.EventBus) {
}

// Check observer 2.
if !observerBus2.Wait(time.Second) {
if !observerBus2.Wait(timeout) {
t.Error("did not receive event in time")
}
for i, event := range observerBus2.Events {
Expand Down
3 changes: 2 additions & 1 deletion eventbus/gcp/eventbus_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"encoding/hex"
"os"
"testing"
"time"

"github.com/looplab/eventhorizon/eventbus"
)
Expand Down Expand Up @@ -46,6 +47,6 @@ func TestEventBus(t *testing.T) {
t.Fatal("there should be no error:", err)
}

eventbus.AcceptanceTest(t, bus1, bus2)
eventbus.AcceptanceTest(t, bus1, bus2, time.Second)

}
3 changes: 2 additions & 1 deletion eventbus/local/eventbus_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package local

import (
"testing"
"time"

"github.com/looplab/eventhorizon/eventbus"
)
Expand All @@ -36,6 +37,6 @@ func TestEventBus(t *testing.T) {
t.Fatal("there should be a bus")
}

eventbus.AcceptanceTest(t, bus1, bus2)
eventbus.AcceptanceTest(t, bus1, bus2, time.Second)

}

0 comments on commit e0c5d00

Please sign in to comment.