Skip to content

Commit

Permalink
Merge pull request #48 from zhoushuguang/feat/es-conn-pool
Browse files Browse the repository at this point in the history
feat: es connection pool
  • Loading branch information
zhoushuguang authored Nov 24, 2024
2 parents c212d88 + 93e4ed7 commit f096205
Showing 1 changed file with 47 additions and 11 deletions.
58 changes: 47 additions & 11 deletions pkg/es/es.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
package es

import (
"net"
"net/http"
"strconv"
"strings"
"time"

"github.com/elastic/elastic-transport-go/v8/elastictransport"
es8 "github.com/elastic/go-elasticsearch/v8"
"github.com/zeromicro/go-zero/core/trace"
"go.opentelemetry.io/otel"
Expand All @@ -17,18 +19,23 @@ import (

type (
Config struct {
Addresses []string
Username string
Password string
MaxRetries int
Addresses []string
Username string
Password string
MaxRetries int
MaxIdleConns int // 全局最大空闲连接数
MaxConnsPerHost int // 每主机最大连接数
IdleConnTimeout time.Duration // 空闲连接超时时间
}

Es struct {
*es8.Client
}

// esTransport is a transport for elasticsearch client
esTransport struct{}
esTransport struct {
baseTransport *http.Transport
}
)

func (t *esTransport) RoundTrip(req *http.Request) (resp *http.Response, err error) {
Expand Down Expand Up @@ -56,7 +63,8 @@ func (t *esTransport) RoundTrip(req *http.Request) (resp *http.Response, err err
req = req.WithContext(ctx)
propagator.Inject(ctx, propagation.HeaderCarrier(req.Header))

resp, err = http.DefaultTransport.RoundTrip(req)
// 调用基础 Transport 执行请求
resp, err = t.baseTransport.RoundTrip(req)
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
Expand All @@ -71,12 +79,40 @@ func (t *esTransport) RoundTrip(req *http.Request) (resp *http.Response, err err
}

func NewEs(conf *Config) (*Es, error) {
transport := &http.Transport{
MaxIdleConns: conf.MaxIdleConns, // 全局最大空闲连接数
MaxIdleConnsPerHost: conf.MaxConnsPerHost, // 每主机最大空闲连接数
MaxConnsPerHost: conf.MaxConnsPerHost, // 每主机最大连接数
IdleConnTimeout: conf.IdleConnTimeout, // 空闲连接超时时间
DialContext: (&net.Dialer{
Timeout: 3 * time.Second, // 建立连接超时时间
KeepAlive: time.Hour, // 保持活动连接的时间
}).DialContext,
TLSHandshakeTimeout: 10 * time.Second, // TLS 握手超时时间
}

// 自定义连接池函数
// 作用
// 1. 多节点请求分发
// 2. 负载均衡
// 3. 故障节点管理
customConnectionPoolFunc := func(addrs []*elastictransport.Connection, selector elastictransport.Selector) elastictransport.ConnectionPool {
// 使用 RoundRobinConnectionPool(轮询连接池)
cp, err := elastictransport.NewConnectionPool(addrs, selector)
if err != nil {
panic(err)
}

return cp
}

c := es8.Config{
Addresses: conf.Addresses,
Username: conf.Username,
Password: conf.Password,
MaxRetries: conf.MaxRetries,
Transport: &esTransport{},
Addresses: conf.Addresses,
Username: conf.Username,
Password: conf.Password,
MaxRetries: conf.MaxRetries,
Transport: &esTransport{baseTransport: transport},
ConnectionPoolFunc: customConnectionPoolFunc,
}

client, err := es8.NewClient(c)
Expand Down

0 comments on commit f096205

Please sign in to comment.