-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathgopolling.go
129 lines (105 loc) · 2.88 KB
/
gopolling.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
package gopolling
import (
"context"
"strings"
"time"
)
// Built-in key prefixes; DefaultOption uses them as its prefix values.
const (
// queuePrefix is the value DefaultOption assigns to Option.QueuePrefix.
queuePrefix = "gpqueue_"
// pubsubPrefix is the value DefaultOption assigns to Option.PubSubPrefix.
pubsubPrefix = "gppubsub_"
)
// Option carries the settings accepted by New. Any field the caller leaves
// unset (zero, nil, or a blank prefix) is filled in from DefaultOption by
// buildOption.
type Option struct {
// Retention is the TTL, in seconds, handed to the message buffer when a
// notification is saved. DefaultOption sets it to 600.
Retention int
// Timeout is the long-polling client timeout passed to the polling manager.
// DefaultOption sets it to 120s.
Timeout time.Duration
// Bus is the message bus used to publish notifications and shared with the
// polling and listener managers.
Bus MessageBus
// Buffer is the message buffer consulted before polling and written to on
// every Notify.
Buffer MessageBuffer
// Logger is the logging implementation installed on the bus and on both
// managers.
Logger Log
// PubSubPrefix is prepended to the channel name when a message is published.
// DefaultOption sets it to "gppubsub_".
PubSubPrefix string
// QueuePrefix is passed to the polling and listener managers.
// DefaultOption sets it to "gpqueue_".
QueuePrefix string
}
// DefaultOption is the baseline configuration that buildOption starts from;
// every field the caller does not override keeps the value given here.
//
// NOTE(review): Bus and Buffer are constructed once, when this variable is
// initialized, and buildOption copies the struct by value. Instances created
// without an explicit Bus/Buffer therefore appear to share these two values —
// confirm that sharing is intended.
var DefaultOption = Option{
Retention: 600,
Timeout: 120 * time.Second,
Bus: newGoroutineBus(),
Buffer: newMemoryBuffer(),
PubSubPrefix: pubsubPrefix,
QueuePrefix: queuePrefix,
Logger: &NoOpLog{},
}
// buildOption returns a copy of DefaultOption with every field that the caller
// actually set in option laid on top of it.
//
// A field counts as "set" when:
//   - Timeout is strictly positive;
//   - Retention is non-zero (negative values are taken as given);
//   - Bus, Buffer or Logger is non-nil;
//   - QueuePrefix or PubSubPrefix contains something other than whitespace
//     (the prefix is then used verbatim, without trimming).
func buildOption(option Option) Option {
	merged := DefaultOption

	// String settings: a blank or whitespace-only prefix keeps the default.
	if strings.TrimSpace(option.PubSubPrefix) != "" {
		merged.PubSubPrefix = option.PubSubPrefix
	}
	if strings.TrimSpace(option.QueuePrefix) != "" {
		merged.QueuePrefix = option.QueuePrefix
	}

	// Numeric settings.
	if option.Retention != 0 {
		merged.Retention = option.Retention
	}
	if option.Timeout > 0 {
		merged.Timeout = option.Timeout
	}

	// Pluggable components: nil means "use the default one".
	if option.Buffer != nil {
		merged.Buffer = option.Buffer
	}
	if option.Bus != nil {
		merged.Bus = option.Bus
	}
	if option.Logger != nil {
		merged.Logger = option.Logger
	}

	return merged
}
// New creates a GoPolling from opt. Settings missing from opt are filled in by
// buildOption. The polling manager and the listener manager are both built on
// the resolved bus and prefixes, and the resolved logger is installed on the
// two managers and on the bus before the instance is returned.
func New(opt Option) *GoPolling {
	cfg := buildOption(opt)

	gp := &GoPolling{
		bus:          cfg.Bus,
		buffer:       cfg.Buffer,
		retention:    cfg.Retention,
		pubsubPrefix: cfg.PubSubPrefix,
	}

	// Managers are created in the same order as before: polling first, then
	// listener.
	gp.pollingMgr = NewPollingManager(cfg.Bus, cfg.Timeout, cfg.QueuePrefix, cfg.PubSubPrefix)
	gp.listenerMgr = NewListenerManager(cfg.Bus, cfg.QueuePrefix, cfg.PubSubPrefix)

	// Wire the logger into every component that reports through it.
	gp.pollingMgr.log = cfg.Logger
	gp.listenerMgr.log = cfg.Logger
	gp.bus.SetLog(cfg.Logger)

	return gp
}
// GoPolling is the entry point returned by New. It ties together the message
// bus, the polling and listener managers, and the message buffer.
type GoPolling struct {
// bus is where Notify publishes messages.
bus MessageBus
// pollingMgr serves WaitForNotice calls that are not answered from the buffer.
pollingMgr PollingManager
// listenerMgr holds the listeners registered through SubscribeListener.
listenerMgr ListenerManager
// buffer stores notified messages; it is skipped when nil.
buffer MessageBuffer
// retention is the TTL passed to buffer.Save for each notified message.
retention int
// pubsubPrefix is prepended to the channel name when publishing.
pubsubPrefix string
}
// WaitForNotice is the selector-less form of WaitForSelectedNotice: it forwards
// ctx, channel and data together with an empty selector and returns whatever
// that call returns.
func (g *GoPolling) WaitForNotice(ctx context.Context, channel string, data interface{}) (interface{}, error) {
	emptySelector := S{}
	return g.WaitForSelectedNotice(ctx, channel, data, emptySelector)
}
// WaitForSelectedNotice returns the notice for channel that matches selector.
//
// When a buffer is configured and it holds an entry for the (channel, selector)
// pair, that entry's Data is returned immediately with a nil error. In every
// other case the call is handed to the polling manager and its result is
// returned unchanged.
func (g *GoPolling) WaitForSelectedNotice(ctx context.Context, channel string, data interface{}, selector S) (interface{}, error) {
	// No buffer configured: nothing to look up, go straight to polling.
	if g.buffer == nil {
		return g.pollingMgr.WaitForNotice(ctx, channel, data, selector)
	}

	// The lookup key is derived the same way Notify derives it when saving.
	if buffered, found := g.buffer.Find(getKeyHash(bufferKey{channel, selector})); found {
		return buffered.Data, nil
	}

	return g.pollingMgr.WaitForNotice(ctx, channel, data, selector)
}
// SubscribeListener registers lf with the listener manager under roomID.
func (g *GoPolling) SubscribeListener(roomID string, lf ListenerFunc) {
g.listenerMgr.Subscribe(roomID, lf)
}
// Notify publishes data for channel, tagged with selector.
//
// The message is addressed to the prefixed channel name (pubsubPrefix +
// channel). If a buffer is configured, the message is first saved there under
// the hash of the un-prefixed (channel, selector) pair with the configured
// retention. It is then published on the bus, and the bus's error is returned.
func (g *GoPolling) Notify(channel string, data interface{}, selector S) error {
	topic := g.pubsubPrefix + channel
	outgoing := Message{topic, data, selector}

	if g.buffer != nil {
		// Buffer keys use the raw channel name, matching the lookup in
		// WaitForSelectedNotice.
		g.buffer.Save(getKeyHash(bufferKey{channel, selector}), outgoing, g.retention)
	}

	return g.bus.Publish(topic, outgoing)
}