From 93e4ed7a849a81f53cc4527b3b104299131bbc45 Mon Sep 17 00:00:00 2001 From: "light.zhou" Date: Sun, 24 Nov 2024 18:02:08 +0800 Subject: [PATCH] feat: es connection pool --- pkg/es/es.go | 58 ++++++++++++++++++++++++++++++++++++++++++---------- 1 file changed, 47 insertions(+), 11 deletions(-) diff --git a/pkg/es/es.go b/pkg/es/es.go index 4a53a99..36cb919 100644 --- a/pkg/es/es.go +++ b/pkg/es/es.go @@ -1,11 +1,13 @@ package es import ( + "net" "net/http" "strconv" "strings" "time" + "github.com/elastic/elastic-transport-go/v8/elastictransport" es8 "github.com/elastic/go-elasticsearch/v8" "github.com/zeromicro/go-zero/core/trace" "go.opentelemetry.io/otel" @@ -17,10 +19,13 @@ import ( type ( Config struct { - Addresses []string - Username string - Password string - MaxRetries int + Addresses []string + Username string + Password string + MaxRetries int + MaxIdleConns int // 全局最大空闲连接数 + MaxConnsPerHost int // 每主机最大连接数 + IdleConnTimeout time.Duration // 空闲连接超时时间 } Es struct { @@ -28,7 +33,9 @@ type ( } // esTransport is a transport for elasticsearch client - esTransport struct{} + esTransport struct { + baseTransport *http.Transport + } ) func (t *esTransport) RoundTrip(req *http.Request) (resp *http.Response, err error) { @@ -56,7 +63,8 @@ func (t *esTransport) RoundTrip(req *http.Request) (resp *http.Response, err err req = req.WithContext(ctx) propagator.Inject(ctx, propagation.HeaderCarrier(req.Header)) - resp, err = http.DefaultTransport.RoundTrip(req) + // 调用基础 Transport 执行请求 + resp, err = t.baseTransport.RoundTrip(req) if err != nil { span.RecordError(err) span.SetStatus(codes.Error, err.Error()) @@ -71,12 +79,40 @@ func (t *esTransport) RoundTrip(req *http.Request) (resp *http.Response, err err } func NewEs(conf *Config) (*Es, error) { + transport := &http.Transport{ + MaxIdleConns: conf.MaxIdleConns, // 全局最大空闲连接数 + MaxIdleConnsPerHost: conf.MaxConnsPerHost, // 每主机最大空闲连接数 + MaxConnsPerHost: conf.MaxConnsPerHost, // 每主机最大连接数 + IdleConnTimeout: conf.IdleConnTimeout, // 空闲连接超时时间 + DialContext: (&net.Dialer{ + Timeout: 3 * time.Second, // 建立连接超时时间 + KeepAlive: time.Hour, // 保持活动连接的时间 + }).DialContext, + TLSHandshakeTimeout: 10 * time.Second, // TLS 握手超时时间 + } + + // 自定义连接池函数 + // 作用 + // 1. 多节点请求分发 + // 2. 负载均衡 + // 3. 故障节点管理 + customConnectionPoolFunc := func(addrs []*elastictransport.Connection, selector elastictransport.Selector) elastictransport.ConnectionPool { + // 使用 RoundRobinConnectionPool(轮询连接池) + cp, err := elastictransport.NewConnectionPool(addrs, selector) + if err != nil { + panic(err) + } + + return cp + } + c := es8.Config{ - Addresses: conf.Addresses, - Username: conf.Username, - Password: conf.Password, - MaxRetries: conf.MaxRetries, - Transport: &esTransport{}, + Addresses: conf.Addresses, + Username: conf.Username, + Password: conf.Password, + MaxRetries: conf.MaxRetries, + Transport: &esTransport{baseTransport: transport}, + ConnectionPoolFunc: customConnectionPoolFunc, } client, err := es8.NewClient(c)