-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathpool.go
167 lines (142 loc) · 3.25 KB
/
pool.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
package easycall
import (
"errors"
"io"
"sync/atomic"
"time"
"github.com/starjiang/elog"
)
// Poolable is the contract a resource must satisfy to be managed by a Pool.
// It embeds io.Closer, so every resource also provides Close() error.
type Poolable interface {
io.Closer
// GetActiveTime returns the timestamp that Acquire adds the pool lifetime
// to when deciding whether the resource has expired.
// NOTE(review): presumably the time of last use — confirm with implementers.
GetActiveTime() time.Time
// IsClose reports whether the resource is already closed; Acquire discards
// resources for which this returns true.
IsClose() bool
}
// factory creates a new Poolable resource, or returns an error when one
// cannot be created. The pool calls it to pre-fill itself and to grow on demand.
type factory func() (Poolable, error)
// Pool describes a pool of reusable Poolable resources.
type Pool interface {
Acquire() (Poolable, error) // obtain a resource
Release(Poolable) error // return a resource to the pool
Close(Poolable) error // close a single resource
Shutdown() error // shut the pool down
}
// GenericPool is a channel-backed pool of Poolable resources. Idle resources
// wait in poolChan; new ones are created through connFactory.
// curSize is modified with sync/atomic by the pool's methods.
type GenericPool struct {
poolChan chan Poolable // idle resources waiting to be acquired
maxSize int32 // maximum number of resources in the pool
curSize int32 // current number of resources in the pool
minSize int32 // minimum number of resources in the pool
shutdown bool // whether the pool has been shut down
lifetime time.Duration // Acquire closes resources whose active time is older than this; <= 0 disables the check
connFactory factory // function used to create a new connection
}
// NewGenericPool builds a pool of resources created by connFactory and
// pre-creates minSize of them.
//
// A negative minSize is treated as 0. If maxSize is not positive or is smaller
// than minSize, it is replaced by minSize. lifetime is the age limit that
// Acquire applies to a resource's active time; a value <= 0 disables the check.
// Factory failures during pre-creation are logged and skipped, so the pool may
// start with fewer than minSize resources.
func NewGenericPool(minSize int32, maxSize int32, lifetime time.Duration, connFactory factory) *GenericPool {
	// A negative minSize would be copied into maxSize below and make the
	// channel allocation panic on a negative capacity.
	if minSize < 0 {
		minSize = 0
	}
	if maxSize <= 0 || minSize > maxSize {
		maxSize = minSize
	}

	pool := &GenericPool{
		maxSize:     maxSize,
		minSize:     minSize,
		lifetime:    lifetime,
		curSize:     0,
		connFactory: connFactory,
		// Widen before doubling so the multiplication is not done in int32.
		poolChan: make(chan Poolable, int(maxSize)*2),
	}

	for i := int32(0); i < minSize; i++ {
		conn, err := connFactory()
		if err != nil {
			// Best effort: a failed pre-creation is logged and the slot is
			// left to be filled lazily by Acquire.
			elog.Error(err)
			continue
		}
		atomic.AddInt32(&pool.curSize, 1)
		pool.poolChan <- conn
	}
	return pool
}
// Acquire returns a usable resource from the pool, creating one when allowed.
//
// A resource that reports IsClose is removed from the size count and skipped.
// When lifetime is positive, a resource whose active time plus lifetime lies
// in the past is closed through pool.Close and skipped as well. Errors from
// getOrCreate are returned unchanged, and an error is returned immediately if
// the pool has already been shut down.
func (pool *GenericPool) Acquire() (Poolable, error) {
	if pool.shutdown {
		return nil, errors.New("pool have been shutdown")
	}

	for {
		candidate, err := pool.getOrCreate()
		switch {
		case err != nil:
			return nil, err
		case candidate.IsClose():
			// The resource reports itself closed, so only the count is adjusted.
			atomic.AddInt32(&pool.curSize, -1)
		case pool.lifetime > 0 && candidate.GetActiveTime().Add(pool.lifetime).Before(time.Now()):
			// Expired: active time + lifetime is earlier than now.
			pool.Close(candidate)
		default:
			return candidate, nil
		}
	}
}
// getOrCreate returns an idle resource from the pool or creates a new one.
//
// Order of attempts:
//  1. take an idle resource without blocking;
//  2. if the pool is below maxSize, reserve a slot and create a resource with
//     connFactory;
//  3. otherwise wait up to POOL_MAX_WAIT_TIME seconds for a resource to be
//     released.
//
// It returns an error when the pool is shut down, when the wait times out, or
// when connFactory fails.
func (pool *GenericPool) getOrCreate() (Poolable, error) {
	if pool.shutdown {
		return nil, errors.New("pool have been shutdown")
	}

	// Fast path: an idle resource is already queued.
	select {
	case conn, ok := <-pool.poolChan:
		if !ok {
			// Shutdown closed the channel; a plain receive would hand back a
			// nil Poolable with a nil error.
			return nil, errors.New("pool have been shutdown")
		}
		return conn, nil
	default:
	}

	// Reserve a slot with compare-and-swap before creating, so concurrent
	// callers cannot all pass the size check and exceed maxSize together.
	reserved := false
	for !reserved {
		cur := atomic.LoadInt32(&pool.curSize)
		if cur >= pool.maxSize {
			break
		}
		reserved = atomic.CompareAndSwapInt32(&pool.curSize, cur, cur+1)
	}

	if !reserved {
		// Pool is full: wait a bounded time for a resource to be released.
		// An explicit timer is stopped on return instead of lingering until
		// it fires, as the one behind time.After would.
		timer := time.NewTimer(time.Second * POOL_MAX_WAIT_TIME)
		defer timer.Stop()

		select {
		case conn, ok := <-pool.poolChan:
			if !ok {
				return nil, errors.New("pool have been shutdown")
			}
			return conn, nil
		case <-timer.C:
			return nil, errors.New("no connection available")
		}
	}

	// Create a new connection for the reserved slot.
	conn, err := pool.connFactory()
	if err != nil {
		// Give the reserved slot back; nothing was created.
		atomic.AddInt32(&pool.curSize, -1)
		return nil, err
	}
	return conn, nil
}
// Release puts a single resource back into the pool so that a later Acquire
// can receive it.
//
// It returns an error when conn is nil or when the pool has been shut down;
// in the shutdown case conn is not closed here. The send blocks while the
// pool channel's buffer is full.
func (pool *GenericPool) Release(conn Poolable) error {
	if conn == nil {
		// A nil entry would later be received by Acquire and dereferenced.
		return errors.New("cannot release nil connection")
	}
	if pool.shutdown {
		return errors.New("pool have been shutdown")
	}
	pool.poolChan <- conn
	return nil
}
// Close closes a single resource and removes it from the pool's size count.
//
// The count is decremented whether or not closing succeeds, and the error
// returned by conn.Close is passed back to the caller. A nil conn is rejected
// with an error and leaves the count untouched.
func (pool *GenericPool) Close(conn Poolable) error {
	if conn == nil {
		return errors.New("cannot close nil connection")
	}
	err := conn.Close()
	atomic.AddInt32(&pool.curSize, -1)
	return err
}
// IsShutDown reports whether the pool's shutdown flag has been set.
func (pool *GenericPool) IsShutDown() bool {
	stopped := pool.shutdown
	return stopped
}
// CloseAll closes every resource that is currently idle in the pool channel
// and removes each from the size count. It does not block: it stops as soon as
// the channel has nothing ready. It does nothing when the pool has already
// been shut down.
func (pool *GenericPool) CloseAll() {
	if pool.shutdown {
		return
	}
	for {
		select {
		case conn, ok := <-pool.poolChan:
			if !ok {
				// The channel was closed by Shutdown after the check above;
				// a closed channel would otherwise keep yielding nil values.
				return
			}
			// Best effort: a close failure does not stop the drain.
			_ = conn.Close()
			atomic.AddInt32(&pool.curSize, -1)
		default:
			return
		}
	}
}
// Shutdown shuts the pool down and releases all queued resources: it sets the
// shutdown flag, closes the pool channel, then closes every resource still
// buffered in it, decrementing the size count for each.
//
// It returns an error if the pool has already been shut down.
func (pool *GenericPool) Shutdown() error {
	if pool.shutdown {
		return errors.New("pool have been shutdown")
	}

	pool.shutdown = true
	close(pool.poolChan)

	// A closed channel still delivers its buffered values; ok turns false
	// once the buffer is empty.
	for {
		idle, ok := <-pool.poolChan
		if !ok {
			break
		}
		idle.Close()
		atomic.AddInt32(&pool.curSize, -1)
	}
	return nil
}