Skip to content

Commit

Permalink
add missing coverage
Browse files Browse the repository at this point in the history
  • Loading branch information
reugn committed Aug 18, 2024
1 parent e9b9ef0 commit 3fa6be3
Show file tree
Hide file tree
Showing 6 changed files with 24 additions and 28 deletions.
1 change: 1 addition & 0 deletions flow/batch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ func TestBatch_Ptr(t *testing.T) {
go func() {
source.
Via(batch).
Via(flow.NewPassThrough()). // Via coverage
To(sink)
}()

Expand Down
20 changes: 12 additions & 8 deletions flow/reduce_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,19 +43,23 @@ func TestReduce(t *testing.T) {

if tt.ptr {
ingestSlice(ptrSlice(input), in)
} else {
ingestSlice(input, in)
}
close(in)
close(in)

source.
Via(tt.reduceFlow).
To(sink)
source.
Via(tt.reduceFlow).
To(sink)

if tt.ptr {
output := readSlicePtr[int](out)
assert.Equal(t, ptrSlice(expected), output)
} else {
ingestSlice(input, in)
close(in)

source.
Via(tt.reduceFlow).
Via(flow.NewPassThrough()). // Via coverage
To(sink)

output := readSlice[int](out)
assert.Equal(t, expected, output)
}
Expand Down
1 change: 1 addition & 0 deletions flow/session_window_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ func TestSessionWindow_Ptr(t *testing.T) {
go func() {
source.
Via(sessionWindow).
Via(flow.NewPassThrough()). // Via coverage
To(sink)
}()

Expand Down
1 change: 1 addition & 0 deletions flow/sliding_window_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,7 @@ func TestSlidingWindow_WithExtractorPtr(t *testing.T) {
go func() {
source.
Via(slidingWindow).
Via(flow.NewPassThrough()). // Via coverage
To(sink)
}()

Expand Down
8 changes: 4 additions & 4 deletions flow/throttler.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func NewThrottler(elements int, period time.Duration, bufferSize int, mode Throt
done: make(chan struct{}),
}
go throttler.resetQuotaCounterLoop()
go throttler.bufferize()
go throttler.buffer()

return throttler
}
Expand Down Expand Up @@ -95,9 +95,9 @@ func (th *Throttler) notifyQuotaReset() {
}
}

// bufferize starts buffering incoming elements.
// If an unsupported ThrottleMode was specified, bufferize will panic.
func (th *Throttler) bufferize() {
// buffer starts buffering incoming elements.
// If an unsupported ThrottleMode was specified, buffer will panic.
func (th *Throttler) buffer() {
switch th.mode {
case Discard:
for element := range th.in {
Expand Down
21 changes: 5 additions & 16 deletions flow/throttler_test.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package flow_test

import (
"fmt"
"testing"
"time"

Expand All @@ -10,7 +9,7 @@ import (
"github.com/reugn/go-streams/internal/assert"
)

func TestThrottlerWithBackpressure(t *testing.T) {
func TestThrottler_WithBackpressure(t *testing.T) {
in := make(chan any)
out := make(chan any)

Expand All @@ -30,27 +29,22 @@ func TestThrottlerWithBackpressure(t *testing.T) {

outputValues := readValues(interval/2, out)
assert.Equal(t, []any{"a", "b"}, outputValues)
fmt.Println(outputValues)

outputValues = readValues(interval, out)
fmt.Println(outputValues)
assert.Equal(t, []any{"c", "d"}, outputValues)

outputValues = readValues(interval, out)
fmt.Println(outputValues)
assert.Equal(t, []any{"e", "f"}, outputValues)

outputValues = readValues(interval, out)
fmt.Println(outputValues)
assert.Equal(t, []any{"g"}, outputValues)

outputValues = readValues(interval, out)
fmt.Println(outputValues)
var empty []any
assert.Equal(t, empty, outputValues)
}

func TestThrottlerWithDiscard(t *testing.T) {
func TestThrottler_WithDiscard(t *testing.T) {
in := make(chan any, 7)
out := make(chan any, 7)

Expand All @@ -69,18 +63,15 @@ func TestThrottlerWithDiscard(t *testing.T) {

outputValues := readValues(interval/2, out)
assert.Equal(t, []any{"a", "b"}, outputValues)
fmt.Println(outputValues)

outputValues = readValues(interval, out)
fmt.Println(outputValues)
_ = readValues(interval, out)

outputValues = readValues(interval, out)
fmt.Println(outputValues)
var empty []any
assert.Equal(t, empty, outputValues)
}

func TestThrottlerNonPositiveElements(t *testing.T) {
func TestThrottler_NonPositiveElements(t *testing.T) {
assert.Panics(t, func() {
flow.NewThrottler(0, time.Second, 1, flow.Discard)
})
Expand All @@ -89,7 +80,7 @@ func TestThrottlerNonPositiveElements(t *testing.T) {
})
}

func TestThrottlerNonPositiveBufferSize(t *testing.T) {
func TestThrottler_NonPositiveBufferSize(t *testing.T) {
assert.Panics(t, func() {
flow.NewThrottler(1, time.Second, 0, flow.Backpressure)
})
Expand All @@ -102,7 +93,6 @@ func writeValues(in chan any) {
inputValues := []string{"a", "b", "c", "d", "e", "f", "g"}
ingestSlice(inputValues, in)
close(in)
fmt.Println("Closed input channel")
}

func readValues(timeout time.Duration, out <-chan any) []any {
Expand All @@ -114,7 +104,6 @@ func readValues(timeout time.Duration, out <-chan any) []any {
if e != nil {
outputValues = append(outputValues, e)
} else {
fmt.Println("Got nil in output")
timer.Stop()
return outputValues
}
Expand Down

0 comments on commit 3fa6be3

Please sign in to comment.