conn_pool.go (forked from cyfdecyf/cow)
// Share server connections between different clients.
package main

import (
	"sync"
	"time"
)

// Maximum number of connections to a server.
const maxServerConnCnt = 5

// Store each server's connections in a separate channel so that getting
// connections for different servers can be done in parallel.
type ConnPool struct {
	idleConn map[string]chan *serverConn
	muxConn  chan *serverConn // connections that support multiplexing
	sync.RWMutex
}
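
// connPool is the shared connection pool. A rough usage sketch (dialing and
// error handling omitted; "example.com:80" is only a placeholder hostPort):
//
//	sv := connPool.Get("example.com:80", true)
//	if sv == nil {
//		// no idle connection for this host, dial a new one
//	}
//	// ... use sv for a request, then hand it back for reuse:
//	connPool.Put(sv)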
var connPool = &ConnPool{
	idleConn: map[string]chan *serverConn{},
	muxConn:  make(chan *serverConn, maxServerConnCnt*2),
}

const muxConnHostPort = "@muxConn"

func init() {
	// make sure hostPort here won't match any actual hostPort
	go closeStaleServerConn(connPool.muxConn, muxConnHostPort)
}
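
// getConnFromChan returns an idle connection from ch, closing and skipping
// any connection that may already have been closed by the server. It does
// not block: if ch is empty, it returns nil.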
func getConnFromChan(ch chan *serverConn) (sv *serverConn) {
	for {
		select {
		case sv = <-ch:
			if sv.mayBeClosed() {
				sv.Close()
				continue
			}
			return sv
		default:
			return nil
		}
	}
}
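
// putConnToChan puts sv into ch for later reuse; if the channel is full, the
// connection is closed instead. chname is used only for debug logging.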
func putConnToChan(sv *serverConn, ch chan *serverConn, chname string) {
	select {
	case ch <- sv:
		debug.Printf("connPool channel %s: put conn\n", chname)
		return
	default:
		// Simply close the connection if it can't be put into the channel
		// immediately. A better solution would remove an old connection from
		// the channel and add the new one, but that's more complicated and
		// this should happen rarely.
		debug.Printf("connPool channel %s: full\n", chname)
		sv.Close()
	}
}
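
// Get returns an idle connection to hostPort, or nil if none is available.
// For non-direct requests it may fall back to a multiplexing connection.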
func (cp *ConnPool) Get(hostPort string, direct bool) (sv *serverConn) {
	// Try site-specific connections first.
	// Direct connections are all site specific, so site-specific connections
	// must be tried first to avoid using a parent proxy for direct sites.
	cp.RLock()
	ch := cp.idleConn[hostPort]
	cp.RUnlock()
	if ch != nil {
		sv = getConnFromChan(ch)
	}
	if sv != nil {
		debug.Printf("connPool %s: get conn\n", hostPort)
		return sv
	}
	// All multiplexing connections are for blocked sites,
	// so for direct sites we should stop here.
	if direct {
		return nil
	}
	sv = getConnFromChan(cp.muxConn)
	if bool(debug) && sv != nil {
		debug.Println("connPool mux: get conn", hostPort)
	}
	return sv
}
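
// Put returns sv to the pool. Multiplexing connections go into the shared
// muxConn channel; other connections go into the channel for their hostPort,
// creating the channel and its cleanup goroutine on first use.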
func (cp *ConnPool) Put(sv *serverConn) {
	// Multiplexing connections.
	switch sv.Conn.(type) {
	case httpConn, meowConn:
		putConnToChan(sv, cp.muxConn, "muxConn")
		return
	}
	// Site specific connections.
	cp.RLock()
	ch := cp.idleConn[sv.hostPort]
	cp.RUnlock()
	if ch == nil {
		debug.Printf("connPool %s: new channel\n", sv.hostPort)
		ch = make(chan *serverConn, maxServerConnCnt)
		ch <- sv
		cp.Lock()
		cp.idleConn[sv.hostPort] = ch
		cp.Unlock()
		// start a new goroutine to close stale server connections
		go closeStaleServerConn(ch, sv.hostPort)
	} else {
		putConnToChan(sv, ch, sv.hostPort)
	}
}
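
// chanInPool pairs a hostPort with its idle-connection channel so that
// CloseAll can snapshot the map before closing connections.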
type chanInPool struct {
	hostPort string
	ch       chan *serverConn
}
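
// CloseAll closes all idle server connections in the pool, including the
// multiplexing connections.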
func (cp *ConnPool) CloseAll() {
	debug.Println("connPool: close all server connections")
	// Because closeServerConn may acquire connPool.Lock, we first collect
	// all channels, then close the server connections in each of them.
	var connCh []chanInPool
	cp.RLock()
	for hostPort, ch := range cp.idleConn {
		connCh = append(connCh, chanInPool{hostPort, ch})
	}
	cp.RUnlock()
	for _, hc := range connCh {
		closeServerConn(hc.ch, hc.hostPort, true)
	}
	closeServerConn(cp.muxConn, muxConnHostPort, true)
}
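
// closeServerConn drains up to len(ch) connections from ch, closing those
// that may already be closed (or all of them if force is true) and putting
// live ones back. It returns true once it finds the channel empty; at that
// point site-specific channels are also removed from the pool.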
func closeServerConn(ch chan *serverConn, hostPort string, force bool) (done bool) {
	// If force is true, close all idle connections even if they may still
	// be open.
	lcnt := len(ch)
	if lcnt == 0 {
		// Execute the loop at least once.
		lcnt = 1
	}
	for i := 0; i < lcnt; i++ {
		select {
		case sv := <-ch:
			if force || sv.mayBeClosed() {
				debug.Printf("connPool channel %s: close one conn\n", hostPort)
				sv.Close()
			} else {
				// Put it back and wait.
				debug.Printf("connPool channel %s: put back conn\n", hostPort)
				ch <- sv
			}
		default:
			if hostPort != muxConnHostPort {
				// No more connections in this channel, remove the channel
				// from the map.
				debug.Printf("connPool channel %s: remove\n", hostPort)
				connPool.Lock()
				delete(connPool.idleConn, hostPort)
				connPool.Unlock()
			}
			return true
		}
	}
	return false
}
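
// closeStaleServerConn periodically closes stale connections in ch. Once the
// channel is found empty (and removed from the pool for site-specific hosts),
// it waits a little longer, closes whatever connections are left, and exits.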
func closeStaleServerConn(ch chan *serverConn, hostPort string) {
	// Tricky here. When removing a channel from the map, there may be
	// goroutines doing Put and Get using that channel.
	// For Get, there's no problem because it will return immediately.
	// For Put, it's possible that a new connection is added to the
	// channel after the channel is no longer in the map.
	// So after removing the channel from the map, we wait for several
	// seconds and then close all connections left in it.
	// It's possible that Put adds a connection after the final wait, but
	// that should not happen in practice, and the worst result is just
	// losing some memory and an open fd.
	for {
		time.Sleep(5 * time.Second)
		if done := closeServerConn(ch, hostPort, false); done {
			break
		}
	}
	// Final wait, then close all remaining connections. In practice, there
	// should be no other goroutines holding a reference to the channel.
	time.Sleep(2 * time.Second)
	for {
		select {
		case sv := <-ch:
			debug.Printf("connPool channel %s: close conn after removed\n", hostPort)
			sv.Close()
		default:
			debug.Printf("connPool channel %s: cleanup done\n", hostPort)
			return
		}
	}
}