forked from couchbase/gocbcore
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathconfigmanagement_component.go
207 lines (174 loc) · 5.89 KB
/
configmanagement_component.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
package gocbcore
import (
"sync"
"sync/atomic"
)
type configManagementComponent struct {
useSSL uint32
networkType string
noTLSSeedNode bool
currentConfig *routeConfig
cfgChangeWatchers []routeConfigWatcher
watchersLock sync.Mutex
srcServers []routeEndpoint
seenConfig bool
}
type configManagerProperties struct {
UseTLS bool
NoTLSSeedNode bool
NetworkType string
SrcMemdAddrs []routeEndpoint
SrcHTTPAddrs []routeEndpoint
}
type routeConfigWatcher interface {
OnNewRouteConfig(cfg *routeConfig)
}
type configManager interface {
AddConfigWatcher(watcher routeConfigWatcher)
RemoveConfigWatcher(watcher routeConfigWatcher)
}
func newConfigManager(props configManagerProperties) *configManagementComponent {
useSSL := uint32(0)
if props.UseTLS {
useSSL = 1
}
return &configManagementComponent{
useSSL: useSSL,
noTLSSeedNode: props.NoTLSSeedNode,
networkType: props.NetworkType,
srcServers: append(props.SrcMemdAddrs, props.SrcHTTPAddrs...),
currentConfig: &routeConfig{
revID: -1,
},
}
}
func (cm *configManagementComponent) UseTLS(use bool) {
useTLS := uint32(0)
if use {
useTLS = 1
}
atomic.StoreUint32(&cm.useSSL, useTLS)
}
func (cm *configManagementComponent) OnNewConfig(cfg *cfgBucket) {
var routeCfg *routeConfig
useSSL := atomic.LoadUint32(&cm.useSSL) == 1
if cm.seenConfig {
routeCfg = cfg.BuildRouteConfig(useSSL, cm.networkType, false, cm.noTLSSeedNode)
} else {
routeCfg = cm.buildFirstRouteConfig(cfg, useSSL)
logDebugf("Using network type %s for connections", cm.networkType)
}
if !routeCfg.IsValid() {
logDebugf("Routing data is not valid, skipping update: \n%s", routeCfg.DebugString())
return
}
// There's something wrong with this route config so don't send it to the watchers.
if !cm.updateRouteConfig(routeCfg) {
return
}
cm.currentConfig = routeCfg
cm.seenConfig = true
logDebugf("Sending out mux routing data (update)...")
logDebugf("New Routing Data:\n%s", routeCfg.DebugString())
// We can end up deadlocking if we iterate whilst in the lock and a watcher decides to remove itself.
cm.watchersLock.Lock()
watchers := make([]routeConfigWatcher, len(cm.cfgChangeWatchers))
copy(watchers, cm.cfgChangeWatchers)
cm.watchersLock.Unlock()
for _, watcher := range watchers {
watcher.OnNewRouteConfig(routeCfg)
}
}
func (cm *configManagementComponent) AddConfigWatcher(watcher routeConfigWatcher) {
cm.watchersLock.Lock()
cm.cfgChangeWatchers = append(cm.cfgChangeWatchers, watcher)
cm.watchersLock.Unlock()
}
func (cm *configManagementComponent) RemoveConfigWatcher(watcher routeConfigWatcher) {
var idx int
cm.watchersLock.Lock()
for i, w := range cm.cfgChangeWatchers {
if w == watcher {
idx = i
}
}
if idx == len(cm.cfgChangeWatchers) {
cm.cfgChangeWatchers = cm.cfgChangeWatchers[:idx]
} else {
cm.cfgChangeWatchers = append(cm.cfgChangeWatchers[:idx], cm.cfgChangeWatchers[idx+1:]...)
}
cm.watchersLock.Unlock()
}
// We should never be receiving concurrent updates and nothing should be accessing
// our internal route config so we shouldn't need to lock here.
func (cm *configManagementComponent) updateRouteConfig(cfg *routeConfig) bool {
oldCfg := cm.currentConfig
// Check some basic things to ensure consistency!
if oldCfg.revID > -1 {
if (cfg.vbMap == nil) != (oldCfg.vbMap == nil) {
logErrorf("Received a configuration with a different number of vbuckets. Ignoring.")
return false
}
if cfg.vbMap != nil && cfg.vbMap.NumVbuckets() != oldCfg.vbMap.NumVbuckets() {
logErrorf("Received a configuration with a different number of vbuckets. Ignoring.")
return false
}
}
// Check that the new config data is newer than the current one, in the case where we've done a select bucket
// against an existing connection then the revisions could be the same. In that case the configuration still
// needs to be applied.
// In the case where the rev epochs are the same then we need to compare rev IDs. If the new config epoch is lower
// than the old one then we ignore it, if it's newer then we apply the new config.
if cfg.bktType != oldCfg.bktType {
logDebugf("Configuration data changed bucket type, switching.")
} else if !cfg.IsNewerThan(oldCfg) {
return false
}
return true
}
func (cm *configManagementComponent) buildFirstRouteConfig(config *cfgBucket, useSSL bool) *routeConfig {
if cm.networkType != "" && cm.networkType != "auto" {
return config.BuildRouteConfig(useSSL, cm.networkType, true, cm.noTLSSeedNode)
}
defaultRouteConfig := config.BuildRouteConfig(useSSL, "default", true, cm.noTLSSeedNode)
var kvServerList []routeEndpoint
var mgmtEpList []routeEndpoint
if useSSL {
kvServerList = defaultRouteConfig.kvServerList.SSLEndpoints
mgmtEpList = defaultRouteConfig.mgmtEpList.SSLEndpoints
} else {
kvServerList = defaultRouteConfig.kvServerList.NonSSLEndpoints
mgmtEpList = defaultRouteConfig.mgmtEpList.NonSSLEndpoints
}
// Iterate over all the source servers and check if any addresses match as default or external network types
for _, srcServer := range cm.srcServers {
// First we check if the source server is from the defaults list
srcInDefaultConfig := false
for _, endpoint := range kvServerList {
if trimSchemePrefix(endpoint.Address) == srcServer.Address {
srcInDefaultConfig = true
}
}
for _, endpoint := range mgmtEpList {
if endpoint == srcServer {
srcInDefaultConfig = true
}
}
if srcInDefaultConfig {
cm.networkType = "default"
return defaultRouteConfig
}
}
// Next lets see if we have an external config, if so, default to that
externalRouteCfg := config.BuildRouteConfig(useSSL, "external", true, cm.noTLSSeedNode)
if externalRouteCfg.IsValid() {
cm.networkType = "external"
return externalRouteCfg
}
// If all else fails, default to the implicit default config
cm.networkType = "default"
return defaultRouteConfig
}
func (cm *configManagementComponent) NetworkType() string {
return cm.networkType
}