diff --git a/elasticsearch/bulk/bulk.go b/elasticsearch/bulk/bulk.go index 260c387..e202bf4 100644 --- a/elasticsearch/bulk/bulk.go +++ b/elasticsearch/bulk/bulk.go @@ -55,7 +55,7 @@ type Metric struct { } type BatchItem struct { - Bytes []byte + Bytes *bytes.Buffer Action *document.ESActionDocument } @@ -145,7 +145,7 @@ func (b *Bulk) AddActions( key := getActionKey(action) if batchIndex, ok := b.batchKeys[key]; ok { - b.batchByteSize += len(value) - len(b.batch[batchIndex].Bytes) + b.batchByteSize += value.Len() - b.batch[batchIndex].Bytes.Len() b.batch[batchIndex] = BatchItem{ Action: &actions[i], Bytes: value, @@ -158,7 +158,7 @@ func (b *Bulk) AddActions( b.batchKeys[key] = b.batchIndex b.batchIndex++ b.batchSize++ - b.batchByteSize += len(value) + b.batchByteSize += value.Len() } } ctx.Ack() @@ -182,35 +182,43 @@ var ( var metaPool = sync.Pool{ New: func() interface{} { - return []byte{} + return &bytes.Buffer{} }, } -func getEsActionJSON(docID []byte, action document.EsAction, indexName string, routing *string, source []byte, typeName []byte) []byte { - meta := metaPool.Get().([]byte)[:0] +func getEsActionJSON( + docID []byte, + action document.EsAction, + indexName string, + routing *string, + source []byte, + typeName []byte, +) *bytes.Buffer { + meta := metaPool.Get().(*bytes.Buffer) + meta.Reset() if action == document.Index { - meta = append(meta, indexPrefix...) + meta.Write(indexPrefix) } else { - meta = append(meta, deletePrefix...) + meta.Write(deletePrefix) } - meta = append(meta, helper.Byte(indexName)...) - meta = append(meta, idPrefix...) - meta = append(meta, helper.EscapePredefinedBytes(docID)...) + meta.Write(helper.Byte(indexName)) + meta.Write(idPrefix) + meta.Write(helper.EscapePredefinedBytes(docID)) if routing != nil { - meta = append(meta, routingPrefix...) - meta = append(meta, helper.Byte(*routing)...) + meta.Write(routingPrefix) + meta.Write(helper.Byte(*routing)) } if typeName != nil { - meta = append(meta, typePrefix...) - meta = append(meta, typeName...) + meta.Write(typePrefix) + meta.Write(typeName) } - meta = append(meta, postFix...) + meta.Write(postFix) if action == document.Index { - meta = append(meta, '\n') - meta = append(meta, source...) + meta.WriteByte('\n') + meta.Write(source) } - meta = append(meta, '\n') + meta.WriteByte('\n') return meta } @@ -233,7 +241,6 @@ func (b *Bulk) flushMessages() { } b.batchTicker.Reset(b.batchTickerDuration) for _, batch := range b.batch { - //nolint:staticcheck metaPool.Put(batch.Bytes) } b.batch = b.batch[:0] @@ -387,7 +394,7 @@ func getActionKey(action document.ESActionDocument) string { func getBytes(batchItems []BatchItem) [][]byte { var batchBytes [][]byte for _, batchItem := range batchItems { - batchBytes = append(batchBytes, batchItem.Bytes) + batchBytes = append(batchBytes, batchItem.Bytes.Bytes()) } return batchBytes }