Skip to content

Commit

Permalink
feat(elasticsearch): support v8
Browse files Browse the repository at this point in the history
  • Loading branch information
NexZhu committed Feb 21, 2024
1 parent 1b450df commit fe4f80c
Show file tree
Hide file tree
Showing 3 changed files with 169 additions and 10 deletions.
11 changes: 9 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,12 @@ require (
github.com/aws/aws-sdk-go-v2/service/sqs v1.26.0
github.com/aws/aws-sdk-go-v2/service/timestreamwrite v1.9.0
github.com/cenkalti/backoff/v4 v4.0.2
github.com/elastic/go-elasticsearch/v8 v8.12.0
github.com/fatih/structs v1.1.0
github.com/gocraft/health v0.0.0-20170925182251-8675af27fef0
github.com/gofrs/uuid v4.0.0+incompatible
github.com/golang/protobuf v1.5.3
github.com/google/go-cmp v0.5.9
github.com/google/go-cmp v0.6.0
github.com/gorilla/mux v1.8.0
github.com/influxdata/influxdb v1.8.10
github.com/influxdata/influxdb-client-go/v2 v2.6.0
Expand Down Expand Up @@ -73,6 +74,9 @@ require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/deepmap/oapi-codegen v1.8.2 // indirect
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
github.com/elastic/elastic-transport-go/v8 v8.4.0 // indirect
github.com/go-logr/logr v1.3.0 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/go-ole/go-ole v1.2.4 // indirect
github.com/go-sql-driver/mysql v1.5.0 // indirect
github.com/golang/snappy v0.0.3 // indirect
Expand Down Expand Up @@ -119,10 +123,13 @@ require (
github.com/xtgo/uuid v0.0.0-20140804021211-a0b114877d4c // indirect
github.com/youmark/pkcs8 v0.0.0-20181117223130-1be2e3e5546d // indirect
go.mongodb.org/mongo-driver v1.13.1 // indirect
go.opentelemetry.io/otel v1.21.0 // indirect
go.opentelemetry.io/otel/metric v1.21.0 // indirect
go.opentelemetry.io/otel/trace v1.21.0 // indirect
go.uber.org/atomic v1.9.0 // indirect
golang.org/x/crypto v0.14.0 // indirect
golang.org/x/sync v0.2.0 // indirect
golang.org/x/sys v0.13.0 // indirect
golang.org/x/sys v0.14.0 // indirect
golang.org/x/text v0.13.0 // indirect
google.golang.org/appengine v1.6.7 // indirect
gopkg.in/mgo.v2 v2.0.0-20190816093944-a6b53ec6cb22 // indirect
Expand Down
25 changes: 21 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,10 @@ github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25Kn
github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21 h1:YEetp8/yCZMuEPMUDHG0CW/brkkEp8mzqk2+ODEitlw=
github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21/go.mod h1:+020luEh2TKB4/GOp8oxxtq0Daoen/Cii55CzbTV6DU=
github.com/eclipse/paho.mqtt.golang v1.2.0/go.mod h1:H9keYFcgq3Qr5OUJm/JZI/i6U7joQ8SYLhZwfeOo6Ts=
github.com/elastic/elastic-transport-go/v8 v8.4.0 h1:EKYiH8CHd33BmMna2Bos1rDNMM89+hdgcymI+KzJCGE=
github.com/elastic/elastic-transport-go/v8 v8.4.0/go.mod h1:YLHer5cj0csTzNFXoNQ8qhtGY1GTvSqPnKWKaqQE3Hk=
github.com/elastic/go-elasticsearch/v8 v8.12.0 h1:krkiCf4peJa7bZwGegy01b5xWWaYpik78wvisTeRO1U=
github.com/elastic/go-elasticsearch/v8 v8.12.0/go.mod h1:wSzJYrrKPZQ8qPuqAqc6KMR4HrBfHnZORvyL+FMFqq0=
github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98=
Expand All @@ -162,6 +166,11 @@ github.com/go-kit/log v0.1.0/go.mod h1:zbhenjAZHb184qTLMA9ZjW7ThYL0H2mk7Q6pNt4vb
github.com/go-logfmt/logfmt v0.3.0/go.mod h1:Qt1PoO58o5twSAckw1HlFXLmHsOX5/0LbT9GBnD5lWE=
github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V4qmtdjCk=
github.com/go-logfmt/logfmt v0.5.0/go.mod h1:wCYkCAKZfumFQihp8CzCvQ3paCTfi41vtzG1KdI/P7A=
github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
github.com/go-logr/logr v1.3.0 h1:2y3SDp0ZXuc6/cjLSZ+Q3ir+QB9T/iG5yYRXqsagWSY=
github.com/go-logr/logr v1.3.0/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY=
github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag=
github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE=
github.com/go-ole/go-ole v1.2.4 h1:nNBDSCOigTSiarFpYE9J/KtEA1IOW4CNeqT9TQDqCxI=
github.com/go-ole/go-ole v1.2.4/go.mod h1:XCwSNxSkXRo4vlyPy93sltvi/qJq0jqQhjqQNIwKuxM=
github.com/go-openapi/jsonpointer v0.19.5/go.mod h1:Pl9vOtqEWErmShwVjC8pYs9cog34VGT37dQOVbmoatg=
Expand Down Expand Up @@ -219,8 +228,8 @@ github.com/google/go-cmp v0.5.3/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/
github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.6/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38=
github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/martian v2.1.0+incompatible/go.mod h1:9I4somxYTbIHy5NJKHRl3wXiIaQGbYVAs8BPL6v8lEs=
github.com/google/pprof v0.0.0-20181206194817-3ea8567a2e57/go.mod h1:zfwlbNMJ+OItoe0UupaVj+oy1omPYYDuagoSzA8v9mc=
github.com/google/pprof v0.0.0-20190515194954-54271f7e092f/go.mod h1:zfwlbNMJ+OItoe0UupaVj+oy1omPYYDuagoSzA8v9mc=
Expand Down Expand Up @@ -543,6 +552,14 @@ go.opencensus.io v0.22.0/go.mod h1:+kGneAE2xo2IficOXnaByMWTGM9T73dGwxeWcUqIpI8=
go.opencensus.io v0.22.2/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw=
go.opencensus.io v0.22.3/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw=
go.opencensus.io v0.23.0/go.mod h1:XItmlyltB5F7CS4xOC1DcqMoFqwtC6OG2xF7mCv7P7E=
go.opentelemetry.io/otel v1.21.0 h1:hzLeKBZEL7Okw2mGzZ0cc4k/A7Fta0uoPgaJCr8fsFc=
go.opentelemetry.io/otel v1.21.0/go.mod h1:QZzNPQPm1zLX4gZK4cMi+71eaorMSGT3A4znnUvNNEo=
go.opentelemetry.io/otel/metric v1.21.0 h1:tlYWfeo+Bocx5kLEloTjbcDwBuELRrIFxwdQ36PlJu4=
go.opentelemetry.io/otel/metric v1.21.0/go.mod h1:o1p3CA8nNHW8j5yuQLdc1eeqEaPfzug24uvsyIEJRWM=
go.opentelemetry.io/otel/sdk v1.21.0 h1:FTt8qirL1EysG6sTQRZ5TokkU8d0ugCj8htOgThZXQ8=
go.opentelemetry.io/otel/sdk v1.21.0/go.mod h1:Nna6Yv7PWTdgJHVRD9hIYywQBRx7pbox6nwBnZIxl/E=
go.opentelemetry.io/otel/trace v1.21.0 h1:WD9i5gzvoUPuXIXH24ZNBudiarZDKuekPqi/E8fpfLc=
go.opentelemetry.io/otel/trace v1.21.0/go.mod h1:LGbsEB0f9LGjN+OZaQQ26sohbOmiMR+BaslueVtS/qQ=
go.uber.org/atomic v1.3.2/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
go.uber.org/atomic v1.5.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ=
Expand Down Expand Up @@ -681,8 +698,8 @@ golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.13.0 h1:Af8nKPmuFypiUBjVoU9V20FiaFXOcuZI21p0ycVYYGE=
golang.org/x/sys v0.13.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.14.0 h1:Vz7Qs629MkJkGyHxUlRHizWJRG2j8fbQKjELVSNhy7Q=
golang.org/x/sys v0.14.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
Expand Down
143 changes: 139 additions & 4 deletions pumps/elasticsearch.go
Original file line number Diff line number Diff line change
@@ -1,16 +1,20 @@
package pumps

import (
"bytes"
"context"
"crypto/tls"
"encoding/base64"
"encoding/json"
"errors"
"fmt"
"net/http"
"regexp"
"strings"
"time"

elasticv8 "github.com/elastic/go-elasticsearch/v8"
"github.com/elastic/go-elasticsearch/v8/esutil"
"github.com/mitchellh/mapstructure"
elasticv7 "github.com/olivere/elastic/v7"
elasticv3 "gopkg.in/olivere/elastic.v3"
Expand Down Expand Up @@ -126,6 +130,13 @@ type Elasticsearch7Operator struct {
log *logrus.Entry
}

type Elasticsearch8Operator struct {
conf *ElasticsearchConf
esClient *elasticv8.Client
bulkIndexer esutil.BulkIndexer
log *logrus.Entry
}

type ApiKeyTransport struct {
APIKey string
APIKeyID string
Expand All @@ -147,11 +158,13 @@ func (e *ElasticsearchPump) getOperator() (ElasticsearchOperator, error) {

urls := strings.Split(conf.ElasticsearchURL, ",")

var httpTransport http.RoundTripper = nil
httpClient := http.DefaultClient
if conf.AuthAPIKey != "" && conf.AuthAPIKeyID != "" {
conf.Username = ""
conf.Password = ""
httpClient = &http.Client{Transport: &ApiKeyTransport{APIKey: conf.AuthAPIKey, APIKeyID: conf.AuthAPIKeyID}}
httpTransport = &ApiKeyTransport{APIKey: conf.AuthAPIKey, APIKeyID: conf.AuthAPIKeyID}
httpClient = &http.Client{Transport: httpTransport}
}

if conf.UseSSL {
Expand All @@ -160,7 +173,8 @@ func (e *ElasticsearchPump) getOperator() (ElasticsearchOperator, error) {
e.log.WithError(err).Error("Failed to get TLS config")
return nil, err
}
httpClient = &http.Client{Transport: &http.Transport{TLSClientConfig: tlsConf}}
httpTransport = &http.Transport{TLSClientConfig: tlsConf}
httpClient = &http.Client{Transport: httpTransport}
}

switch conf.Version {
Expand Down Expand Up @@ -310,6 +324,36 @@ func (e *ElasticsearchPump) getOperator() (ElasticsearchOperator, error) {
}

op.bulkProcessor, err = p.Do(context.Background())
op.log = e.log
return op, err
case "8":
op := &Elasticsearch8Operator{
conf: &conf,
}

cfg := elasticv8.Config{
Addresses: urls,
}
if conf.Username != "" || conf.Password != "" {
cfg.Username = conf.Username
cfg.Password = conf.Password
}
if httpTransport != nil {
cfg.Transport = httpTransport
}

op.esClient, err = elasticv8.NewClient(cfg)

if err != nil {
return op, err
}

op.bulkIndexer, err = setupElasticsearch8BulkIndexer(op)

if err != nil {
return op, err
}

op.log = e.log
return op, err
default:
Expand All @@ -320,6 +364,28 @@ func (e *ElasticsearchPump) getOperator() (ElasticsearchOperator, error) {
return nil, err
}

func setupElasticsearch8BulkIndexer(op *Elasticsearch8Operator) (esutil.BulkIndexer, error) {
// Setup a bulk indexer
bulkCfg := esutil.BulkIndexerConfig{
Index: getIndexName(op.conf),
Client: op.esClient,
}

if op.conf.BulkConfig.Workers != 0 {
bulkCfg.NumWorkers = op.conf.BulkConfig.Workers
}

if op.conf.BulkConfig.FlushInterval != 0 {
bulkCfg.FlushInterval = time.Duration(op.conf.BulkConfig.FlushInterval) * time.Second
}

// op.conf.BulkConfig.BulkActions not supported

// op.conf.BulkConfig.BulkSize not supported

return esutil.NewBulkIndexer(bulkCfg)
}

func (e *ElasticsearchPump) New() Pump {
newPump := ElasticsearchPump{}
return &newPump
Expand Down Expand Up @@ -360,9 +426,9 @@ func (e *ElasticsearchPump) Init(config interface{}) error {
case "":
e.esConf.Version = "3"
log.Info("Version not specified, defaulting to 3. If you are importing to Elasticsearch 5, please specify \"version\" = \"5\"")
case "3", "5", "6", "7":
case "3", "5", "6", "7", "8":
default:
err := errors.New("Only 3, 5, 6, 7 are valid values for this field")
err := errors.New("Only 3, 5, 6, 7, 8 are valid values for this field")
e.log.Fatal("Invalid version: ", err)
}

Expand Down Expand Up @@ -643,6 +709,75 @@ func (e Elasticsearch7Operator) flushRecords() error {
return e.bulkProcessor.Flush()
}

func (e *Elasticsearch8Operator) processData(ctx context.Context, data []interface{}, esConf *ElasticsearchConf) error {
for dataIndex := range data {
if ctxErr := ctx.Err(); ctxErr != nil {
continue
}

d, ok := data[dataIndex].(analytics.AnalyticsRecord)
if !ok {
e.log.Error("Error while writing ", data[dataIndex], ": data not of type analytics.AnalyticsRecord")
continue
}

mapping, id := getMapping(d, esConf.ExtendedStatistics, esConf.GenerateID, esConf.DecodeBase64)
bs, err := json.Marshal(mapping)
if err != nil {
e.log.Error("Error while writing ", data[dataIndex], ": failed to marshal into JSON: ", err)
continue
}
body := bytes.NewReader(bs)

if !esConf.DisableBulk {
err = e.bulkIndexer.Add(
ctx,
esutil.BulkIndexerItem{
Action: "index",
Body: body,
Index: getIndexName(esConf),
DocumentID: id,

OnSuccess: func(ctx context.Context, item esutil.BulkIndexerItem, resp esutil.BulkIndexerResponseItem) {
e.log.Info("Purged 1 record...")
},
OnFailure: func(ctx context.Context, item esutil.BulkIndexerItem, resp esutil.BulkIndexerResponseItem, err error) {
e.log.Error("Error while writing ", data[dataIndex], err)
},
},
)
if err != nil {
e.log.Error("Error while adding ", data[dataIndex], " to BulkIndexer: ", err)
}
} else {
e.esClient.Index(
getIndexName(esConf),
body,
e.esClient.Index.WithDocumentID(id),
e.esClient.Index.WithContext(ctx),
)
if err != nil {
e.log.Error("Error while writing ", data[dataIndex], err)
}
}
}
if esConf.DisableBulk {
e.log.Info("Purged ", len(data), " records...")
}

return nil
}

func (e *Elasticsearch8Operator) flushRecords() error {
err := e.bulkIndexer.Close(context.Background())
if err != nil {
return err
}
e.log.Info("Purged ", e.bulkIndexer.Stats().NumFlushed, " records in this bulk...")
e.bulkIndexer, err = setupElasticsearch8BulkIndexer(e)
return err
}

// printPurgedBulkRecords print the purged records = bulk size when bulk is enabled
func printPurgedBulkRecords(bulkSize int, err error, logger *logrus.Entry) {
if err != nil {
Expand Down

0 comments on commit fe4f80c

Please sign in to comment.