-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathpolling_manager.go
107 lines (92 loc) · 1.93 KB
/
polling_manager.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
package gopolling
import (
"context"
"errors"
"github.com/rs/xid"
"time"
)
// Sentinel errors and the fallback timeout used by the polling manager.
var (
// ErrTimeout is returned by WaitForNotice when no matching message arrives
// before the manager's timeout elapses.
ErrTimeout = errors.New("request timeout")
// ErrCancelled is a sentinel for a cancelled request. It is not referenced
// by the code visible in this file.
ErrCancelled = errors.New("request cancelled")
// defaultTimeout is the timeout NewPollingManager applies when the caller
// passes a zero duration.
defaultTimeout = 120 * time.Second
)
// NewPollingManager builds a PollingManager on top of the given message bus.
//
// adapter is the MessageBus used to subscribe to notices and enqueue events.
// t is the maximum time a single wait may last; a zero or negative value
// selects defaultTimeout. queuePrefix and pubsubPrefix are prepended to the
// channel name when enqueuing events and when subscribing, respectively.
//
// The returned manager logs to a NoOpLog.
func NewPollingManager(adapter MessageBus, t time.Duration, queuePrefix, pubsubPrefix string) PollingManager {
	// A non-positive duration cannot serve as a wait bound (time.NewTicker,
	// for one, panics on it), so only strictly positive values override the
	// default.
	timeout := defaultTimeout
	if t > 0 {
		timeout = t
	}

	return PollingManager{
		bus:          adapter,
		timeout:      timeout,
		log:          &NoOpLog{},
		queuePrefix:  queuePrefix,
		pubsubPrefix: pubsubPrefix,
	}
}
// PollingManager drives long-polling waits over a MessageBus: it subscribes
// to a channel, enqueues an event and waits for a matching message.
type PollingManager struct {
// bus is the message bus used to subscribe, unsubscribe and enqueue events.
bus MessageBus
// timeout bounds how long WaitForNotice waits for a matching message.
timeout time.Duration
// log receives error reports, e.g. when unsubscribing fails.
log Log
// queuePrefix is prepended to the channel name when enqueuing events.
queuePrefix string
// pubsubPrefix is prepended to the channel name when subscribing.
pubsubPrefix string
}
// idKey is the selector key under which WaitForNotice stores the generated
// event ID, so that an incoming message can be matched to the request that
// triggered it.
const idKey = "_gopolling_id"
// WaitForNotice subscribes to the pub/sub channel for channel, enqueues an
// event carrying data and sel on the matching queue, and then blocks until
// one of the following happens:
//
//   - a message tagged with this request's generated event ID arrives, or an
//     untagged message whose selector equals sel arrives: its Data is returned;
//   - ctx is done: context.Canceled is returned;
//   - the manager's timeout elapses: ErrTimeout is returned.
//
// A subscription error is returned as is. Messages tagged with a different
// event ID, or untagged messages with a different selector, are skipped.
// sel is never modified and may be nil.
func (m *PollingManager) WaitForNotice(ctx context.Context, channel string, data interface{}, sel S) (interface{}, error) {
	// Subscribe before enqueuing so a fast reply cannot be missed.
	sub, err := m.bus.Subscribe(m.pubsubPrefix + channel)
	if err != nil {
		return nil, err
	}
	defer func() {
		if err := m.bus.Unsubscribe(sub); err != nil {
			m.log.Errorf("unsubscribe unsuccessful, error: %v", err)
		}
	}()

	// The timeout fires once, so a one-shot timer is sufficient.
	timer := time.NewTimer(m.timeout)
	defer timer.Stop()

	// Generate the event ID used to correlate replies with this request.
	id := xid.New().String()

	// Give the event its own selector that carries the ID. Copying avoids
	// writing to the caller's map (which would panic when sel is nil) and
	// keeps the enqueued event from aliasing a map we go on to use below.
	eventSel := make(S, len(sel)+1)
	for k, v := range sel {
		eventSel[k] = v
	}
	eventSel[idKey] = id

	// Enqueue the event.
	// NOTE(review): the result of Enqueue, if any, is not checked here; the
	// MessageBus definition is outside this view — confirm whether it can fail.
	qChan := m.queuePrefix + channel
	m.bus.Enqueue(qChan, Event{
		Channel:  qChan,
		Data:     data,
		Selector: eventSel,
	})

	for {
		select {
		case <-ctx.Done():
			// NOTE(review): context.Canceled is reported even when ctx ended
			// by deadline; kept as is so existing callers' checks still match.
			return nil, context.Canceled
		case <-timer.C:
			return nil, ErrTimeout
		case msg := <-sub.Receive():
			// A message tagged with an event ID is matched on that ID alone.
			if val, ok := msg.Selector[idKey]; ok {
				if val == id {
					return msg.Data, nil
				}
				continue
			}
			// Untagged messages are matched on the full selector.
			if compareSelectors(msg.Selector, sel) {
				return msg.Data, nil
			}
		}
	}
}
// compareSelectors reports whether the two selectors hold exactly the same
// set of key/value pairs.
func compareSelectors(a, b S) bool {
	// Different sizes can never be equal; with equal sizes it is enough to
	// check that every entry of a is present in b with the same value.
	if len(a) != len(b) {
		return false
	}
	for key, want := range a {
		got, found := b[key]
		if !found || got != want {
			return false
		}
	}
	return true
}