memdpipelineclient.go

package gocbcore

import (
	"errors"
	"io"
	"sync"
	"sync/atomic"
)
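
// clientWait carries the result of an asynchronous dial attempt: either a
// connected memdClient or the error that prevented the connection.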
type clientWait struct {
	client *memdClient
	err    error
}
type memdPipelineClient struct {
	parent         *memdPipeline
	address        string
	client         *memdClient
	consumer       *memdOpConsumer
	lock           sync.Mutex
	closedSig      chan struct{}
	clientTakenSig chan struct{}
	cancelDialSig  chan struct{}
	state          uint32
	connectError   error
}
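
// newMemdPipelineClient creates a pipeline client bound to the given parent
// pipeline and its address, starting in the disconnected state.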
func newMemdPipelineClient(parent *memdPipeline) *memdPipelineClient {
	return &memdPipelineClient{
		parent:         parent,
		address:        parent.address,
		closedSig:      make(chan struct{}),
		clientTakenSig: make(chan struct{}),
		cancelDialSig:  make(chan struct{}),
		state:          uint32(EndpointStateDisconnected),
	}
}
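
// State returns the current endpoint state of this pipeline client.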
func (pipecli *memdPipelineClient) State() EndpointState {
	return EndpointState(atomic.LoadUint32(&pipecli.state))
}
func (pipecli *memdPipelineClient) Error() error {
	pipecli.lock.Lock()
	defer pipecli.lock.Unlock()

	return pipecli.connectError
}
func (pipecli *memdPipelineClient) ReassignTo(parent *memdPipeline) {
	pipecli.lock.Lock()
	pipecli.parent = parent
	oldConsumer := pipecli.consumer
	pipecli.consumer = nil
	pipecli.lock.Unlock()

	if oldConsumer != nil {
		oldConsumer.Close()
	}
}
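
// ioLoop pumps requests from the parent pipeline's queue onto the given client
// until the client dies, the client is taken, or this pipeline client is shut
// down. It also starts a watcher goroutine that cleans up when the client closes.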
func (pipecli *memdPipelineClient) ioLoop(client *memdClient) {
	pipecli.lock.Lock()
	if pipecli.parent == nil {
		logDebugf("Pipeline client ioLoop started with no parent pipeline")
		pipecli.lock.Unlock()

		err := client.Close()
		if err != nil {
			logErrorf("Failed to close client for shut down ioLoop (%s)", err)
		}

		return
	}

	pipecli.client = client
	pipecli.lock.Unlock()

	killSig := make(chan struct{})

	// This goroutine is responsible for monitoring the client and handling
	// the cleanup whenever it shuts down. All cases of the client being
	// shut down flow through this goroutine, even cases where we may already
	// be aware, outside this scope, that the client has shut down.
	go func() {
		logDebugf("Pipeline client `%s/%p` client watcher starting...", pipecli.address, pipecli)

		select {
		case <-client.CloseNotify():
			logDebugf("Pipeline client `%s/%p` client died", pipecli.address, pipecli)
		case <-pipecli.clientTakenSig:
			logDebugf("Pipeline client `%s/%p` client taken", pipecli.address, pipecli)
		}

		pipecli.lock.Lock()
		pipecli.client = nil
		activeConsumer := pipecli.consumer
		pipecli.consumer = nil
		pipecli.lock.Unlock()

		logDebugf("Pipeline client `%s/%p` closing consumer %p", pipecli.address, pipecli, activeConsumer)

		// If we have a consumer, we need to close it to signal the loop below that
		// something has happened. If there is no consumer, we don't need to signal,
		// as the loop below will already be in the process of fetching a new one,
		// where it will inevitably detect the problem.
		if activeConsumer != nil {
			activeConsumer.Close()
		}

		killSig <- struct{}{}
	}()
logDebugf("Pipeline client `%s/%p` IO loop starting...", pipecli.address, pipecli)
var localConsumer *memdOpConsumer
for {
if localConsumer == nil {
logDebugf("Pipeline client `%s/%p` fetching new consumer", pipecli.address, pipecli)
pipecli.lock.Lock()
if pipecli.consumer != nil {
// If we still have an active consumer, lets close it to make room for the new one
pipecli.consumer.Close()
pipecli.consumer = nil
}
if pipecli.client == nil {
// The client has disconnected from the server, this only occurs AFTER the watcher
// goroutine running above has detected the client is closed and has cleaned it up.
pipecli.lock.Unlock()
break
}
if pipecli.parent == nil {
// This pipelineClient has been shut down
logDebugf("Pipeline client `%s/%p` found no parent pipeline", pipecli.address, pipecli)
pipecli.lock.Unlock()
break
}
// Fetch a new consumer to use for this iteration
localConsumer = pipecli.parent.queue.Consumer()
pipecli.consumer = localConsumer
pipecli.lock.Unlock()
}
req := localConsumer.Pop()
if req == nil {
// Set the local consumer to null, this will force our normal logic to run
// which will clean up the original consumer and then attempt to acquire a
// new one if we are not being cleaned up. This is a minor code-optimization
// to avoid having to do a lock/unlock just to lock above anyways. It does
// have the downside of not being able to detect where we've looped around
// in error though.
localConsumer = nil
continue
}

		err := client.SendRequest(req)
		if err != nil {
			logDebugf("Pipeline client `%s/%p` encountered a socket write error: %v", pipecli.address, pipecli, err)

			if !errors.Is(err, io.EOF) && !errors.Is(err, ErrMemdClientClosed) {
				// If the write errored and the client was not already closed,
				// let's go ahead and close it. This will trigger the shutdown
				// logic via the client watcher above. If the socket error was EOF
				// we already shut down, and the watcher should already be cleaning
				// up. If the error was ErrMemdClientClosed then the client either
				// already shut down or is gracefully shutting down.
				err := client.Close()
				if err != nil {
					logErrorf("Pipeline client `%s/%p` failed to shut down errored client socket (%s)", pipecli.address, pipecli, err)
				}
			}

			// Send this request upwards to be processed by the higher-level processor.
			shortCircuited, routeErr := client.postErrHandler(nil, req, err)
			if !shortCircuited {
				client.CancelRequest(req, err)
				req.tryCallback(nil, routeErr)
				break
			}

			// Stop looping.
			break
		}
	}

	atomic.StoreUint32(&pipecli.state, uint32(EndpointStateDisconnecting))

	// We must wait for the close-wait goroutine to die as well before we can continue.
	<-killSig

	logDebugf("Pipeline client `%s/%p` received client shutdown notification", pipecli.address, pipecli)
}
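
// Run is the main lifecycle loop of the pipeline client: it repeatedly dials a
// new memdClient and drives it via ioLoop until the pipeline client is shut
// down, then closes closedSig so that CloseAndTakeClient can return.
//
// A minimal usage sketch, assuming the owning pipeline runs each client on its
// own goroutine (this call site is illustrative, not taken from this file):
//
//	// pipeline is assumed to be an existing, configured *memdPipeline.
//	pipecli := newMemdPipelineClient(pipeline)
//	go pipecli.Run()
//	// ... later, during teardown ...
//	client := pipecli.CloseAndTakeClient()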
func (pipecli *memdPipelineClient) Run() {
	for {
		logDebugf("Pipeline Client `%s/%p` preparing for new client loop", pipecli.address, pipecli)

		atomic.StoreUint32(&pipecli.state, uint32(EndpointStateConnecting))

		pipecli.lock.Lock()
		pipeline := pipecli.parent
		pipecli.lock.Unlock()

		if pipeline == nil {
			// If our pipeline is nil, it indicates that we need to shut down.
			logDebugf("Pipeline Client `%s/%p` is shutting down", pipecli.address, pipecli)
			break
		}

		logDebugf("Pipeline Client `%s/%p` retrieving new client connection for parent %p", pipecli.address, pipecli, pipeline)
		wait := make(chan clientWait, 1)
		go func() {
			client, err := pipeline.getClientFn(pipecli.cancelDialSig)
			wait <- clientWait{
				client: client,
				err:    err,
			}
		}()

		cli := <-wait
		if cli.err != nil {
			atomic.StoreUint32(&pipecli.state, uint32(EndpointStateDisconnected))

			pipecli.lock.Lock()
			if pipecli.parent != nil {
				// Only log the error if we aren't shutting down; if we know that we're
				// shutting down then the failure isn't unexpected.
				logWarnf("Pipeline Client %p failed to bootstrap: %s", pipecli, cli.err)
			}
			pipecli.connectError = cli.err
			pipecli.lock.Unlock()

			continue
		}

		pipecli.lock.Lock()
		pipecli.connectError = nil
		pipecli.lock.Unlock()
		atomic.StoreUint32(&pipecli.state, uint32(EndpointStateConnected))

		// Runs until the connection has died (for whatever reason).
		logDebugf("Pipeline Client `%s/%p` starting new client loop for %p", pipecli.address, pipecli, cli.client)
		pipecli.ioLoop(cli.client)
	}

	// Let's notify anyone who is watching that we are now shut down.
	close(pipecli.closedSig)
}

// CloseAndTakeClient will close this pipeline client, yielding the memdClient. Note that this method will not wait for
// everything to be cleaned up before returning.
func (pipecli *memdPipelineClient) CloseAndTakeClient() *memdClient {
	logDebugf("Pipeline Client `%s/%p` received close request", pipecli.address, pipecli)
	atomic.StoreUint32(&pipecli.state, uint32(EndpointStateDisconnecting))

	// To shut down the client, we remove our reference to the parent. This
	// causes our ioLoop to see that we are being shut down and to perform
	// cleanup before exiting.
	pipecli.lock.Lock()
	pipecli.parent = nil
	activeConsumer := pipecli.consumer
	pipecli.consumer = nil
	client := pipecli.client
	pipecli.client = nil
	pipecli.lock.Unlock()
	close(pipecli.clientTakenSig)

	logDebugf("Pipeline client `%s/%p` closing consumer %p", pipecli.address, pipecli, activeConsumer)

	// If we have a consumer, we need to close it to signal the ioLoop that
	// something has happened. If there is no consumer, we don't need to signal,
	// as the ioLoop will already be in the process of fetching a new one,
	// where it will inevitably detect the problem.
	if activeConsumer != nil {
		activeConsumer.Close()
	}

	// We might currently be waiting for a new client to be dialled, in which case we need to abandon that wait
	// so that it does not block our shutdown.
	close(pipecli.cancelDialSig)

	// If we have an active consumer, we need to close it to cause the running
	// ioLoop to unpause and pick up that our parent has been removed. Note
	// that in some cases, we might not have an active consumer. This means
	// that the ioLoop is about to try and fetch one, finding the missing
	// parent in doing so.
	if activeConsumer != nil {
		activeConsumer.Close()
	}

	// Let's wait until the ioLoop has shut everything down before returning.
	<-pipecli.closedSig

	atomic.StoreUint32(&pipecli.state, uint32(EndpointStateDisconnected))

	logDebugf("Pipeline Client `%s/%p` has exited", pipecli.address, pipecli)

	return client
}