-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmessage_bus.go
140 lines (116 loc) · 2.78 KB
/
message_bus.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
package gopolling
//go:generate mockgen -destination=message_bus_mock_test.go -package=gopolling -source=message_bus.go
import (
"errors"
"github.com/orcaman/concurrent-map"
"github.com/rs/xid"
)
var (
ErrNotSubscriber = errors.New("no subscriber")
)
// A shortcut to define string map
type S map[string]string
// Message struct defines the payload which being sent through message bus from notifier to the waiting client
type Message struct {
Channel string
Data interface{}
Selector S
}
// Event struct is the payload being sent from client to the listening subscriber
type Event struct {
Channel string
Data interface{}
Selector S
}
type Subscription interface {
Receive() <-chan Message
}
func NewDefaultSubscription(channel, id string, ch chan Message) *DefaultSubscription {
return &DefaultSubscription{
Channel: channel,
ID: id,
ch: ch,
}
}
type DefaultSubscription struct {
ch chan Message
Channel string
ID string
}
func (g *DefaultSubscription) Receive() <-chan Message {
return g.ch
}
type PubSub interface {
Publish(string, Message) error
Subscribe(string) (Subscription, error)
Unsubscribe(Subscription) error
}
type EventQueue interface {
Enqueue(string, Event)
// A blocking method wait til receive task
Dequeue(string) <-chan Event
}
type MessageBus interface {
PubSub
EventQueue
Loggable
}
func newGoroutineBus() MessageBus {
return &GoroutineBus{
subscribers: cmap.New(),
queue: cmap.New(),
}
}
type GoroutineBus struct {
subscribers cmap.ConcurrentMap
queue cmap.ConcurrentMap
log Log
}
func (g *GoroutineBus) Unsubscribe(sub Subscription) error {
gsub := sub.(*DefaultSubscription)
if val, ok := g.subscribers.Get(gsub.Channel); ok {
subq := val.(*cmap.ConcurrentMap)
subq.Remove(gsub.ID)
}
return nil
}
func (g *GoroutineBus) SetLog(l Log) {
g.log = l
}
func (g *GoroutineBus) Publish(channel string, msg Message) error {
if val, ok := g.subscribers.Get(channel); ok {
subq := val.(*cmap.ConcurrentMap)
subq.IterCb(func(id string, v interface{}) {
ch := v.(chan Message)
ch <- msg
})
} else {
return ErrNotSubscriber
}
return nil
}
func (g *GoroutineBus) Subscribe(channel string) (Subscription, error) {
ch := make(chan Message)
id := xid.New().String()
sub := NewDefaultSubscription(channel, id, ch)
if val, ok := g.subscribers.Get(channel); ok {
subMap := val.(*cmap.ConcurrentMap)
subMap.Set(id, ch)
} else {
subMap := cmap.New()
subMap.Set(id, ch)
g.subscribers.Set(channel, &subMap)
}
return sub, nil
}
func (g *GoroutineBus) Enqueue(channel string, t Event) {
if val, ok := g.queue.Get(channel); ok {
ch := val.(chan Event)
ch <- t
}
}
func (g *GoroutineBus) Dequeue(channel string) <-chan Event {
ch := make(chan Event)
g.queue.Set(channel, ch)
return ch
}