-
Notifications
You must be signed in to change notification settings - Fork 23
/
Copy pathconcurrency_test.go
135 lines (124 loc) · 3.63 KB
/
concurrency_test.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
package quamina
import (
"fmt"
"math/rand"
"testing"
"time"
)
// updateTree adds one freshly generated pattern to m and then sends the value
// it generated on ch. TestConcurrency starts it with `go` so that the add runs
// while the test's main loop is still matching events.
//
// When use37 is true the pattern is on "geometry"/"coordinates" and the value
// is 37 plus a random fraction, formatted with %f. Otherwise the pattern is on
// "properties"/"STREET" and the value is a random integer wrapped in double
// quotes. In both cases the generated value is also passed to addPattern as
// the pattern's identifier.
//
// An addPattern failure is reported with t.Error; the value is sent on ch
// either way, so the receiver always gets exactly one message per call.
func updateTree(t *testing.T, m *coreMatcher, use37 bool, ch chan string) {
	t.Helper()

	var id, pattern string
	switch {
	case use37:
		//nolint:gosec
		id = fmt.Sprintf("%f", 37.0+rand.Float64())
		pattern = fmt.Sprintf(`{ "geometry": { "coordinates": [ %s ] } }`, id)
	default:
		//nolint:gosec
		id = fmt.Sprintf(`"%d"`, rand.Int())
		pattern = fmt.Sprintf(`{ "properties": { "STREET": [ %s ] } }`, id)
		/* TODO: alternate literal and shellstyle addition
		id = fmt.Sprintf(`"*%d"`, rand.Int())
		pattern = fmt.Sprintf(`{ "properties": { "STREET": [ {"shellstyle": %s } ] } }`, id)
		*/
	}

	if err := m.addPattern(id, pattern); err != nil {
		t.Error("Concurrent: " + err.Error())
	}
	ch <- id
}
// TestConcurrency runs the city-lots lines through a matcher while other
// goroutines keep adding patterns to that same matcher, exercising concurrent
// update and use of the automaton.
//
// The flow mirrors TestCityLots, with one addition: after every
// linesPerUpdate lines a goroutine is started (see updateTree) that adds a new
// pattern on one of the fields the seed patterns already use. Once all lines
// are processed the test checks two things:
//
//  1. the per-pattern match counts equal the expected totals, and
//  2. every pattern added in the background is present in the matcher, i.e. an
//     event built from its value matches exactly that one pattern.
//
// Original author's observation: adding 860 or so patterns while matching is
// running did not seem to reduce throughput, presumably because the updates
// run on another core and take nothing from the thread doing the Match calls.
func TestConcurrency(t *testing.T) {
	const linesPerUpdate = 250

	lines := getCityLotsLines(t)

	// Seed patterns, each paired with the identifier it is registered under.
	seeds := []struct {
		name    string
		pattern string
	}{
		{"CRANLEIGH", `{ "properties": { "STREET": [ "CRANLEIGH" ] } }`},
		{"shellstyle", `{ "properties": { "STREET": [ { "shellstyle": "B*K"} ] } }`},
		{"17TH Even", `{ "properties": { "STREET": [ "17TH" ], "ODD_EVEN": [ "E"] } }`},
		{"Geometry", `{ "geometry": { "coordinates": [ 37.807807921694092 ] } }`},
		{"0011008", `{ "properties": { "MAPBLKLOT": ["0011008"], "BLKLOT": ["0011008"]}, "geometry": { "coordinates": [ 37.807807921694092 ] } } `},
	}

	// Expected number of matches per seed pattern across all lines.
	wanted := map[X]int{
		"CRANLEIGH":  7,
		"shellstyle": 746,
		"17TH Even":  836,
		"Geometry":   2,
		"0011008":    1,
	}

	m := newCoreMatcher()
	for _, seed := range seeds {
		if err := m.addPattern(seed.name, seed.pattern); err != nil {
			t.Error("addPattern: " + err.Error())
		}
	}

	tally := make(map[X]int)
	use37 := true
	processed := 0
	started := time.Now()
	// Each background add reports its value here; the buffer lets the
	// goroutines finish without waiting for the drain loop further down.
	ch := make(chan string, 1000)
	launched := 0

	for _, line := range lines {
		matches, err := m.matchesForJSONEvent(line)
		if err != nil {
			t.Error("Matches4JSON: " + err.Error())
		}

		processed++
		if processed%linesPerUpdate == 0 {
			// Alternate between the coordinates and the STREET flavor of update.
			use37 = !use37
			launched++
			go updateTree(t, m, use37, ch)
		}

		for _, match := range matches {
			tally[match]++
		}
	}

	elapsedMillis := float64(time.Since(started).Milliseconds())
	perSecond := float64(processed) / (elapsedMillis / 1000.0)
	fmt.Printf("\n%.2f matches/second with updates\n\n", perSecond)

	if len(tally) != len(wanted) {
		t.Errorf("got %d results, wanted %d", len(tally), len(wanted))
	}
	for match, count := range tally {
		if expected := wanted[match]; count != expected {
			t.Errorf("For %s, wanted=%d, result=%d", match, expected, count)
		}
	}

	// Go back and make sure every background addPattern call actually made it
	// into the matcher. Receiving once per launched goroutine also guarantees
	// they have all finished before the channel is closed.
	for pending := launched; pending > 0; pending-- {
		val := <-ch

		// A leading double quote marks a STREET value; anything else is a
		// coordinate value.
		var event string
		switch val[0] {
		case '"':
			event = fmt.Sprintf(`{"properties": { "STREET": %s} }`, val)
		default:
			event = fmt.Sprintf(`{"geometry": { "coordinates": [ %s ] } }`, val)
		}

		matches, err := m.matchesForJSONEvent([]byte(event))
		if err != nil {
			t.Error("after concur: " + err.Error())
		}
		if !(len(matches) == 1 && matches[0] == val) {
			t.Error("problem with: " + val)
		}
	}
	close(ch)
}