From 45a8227ad6a34e1026a7f0f28b1a3df564d139e7 Mon Sep 17 00:00:00 2001 From: WeblWabl Date: Fri, 6 Dec 2024 16:05:03 -0600 Subject: [PATCH] fix(influxd): update xxhash, avoid stringtoslicebyte in cache (#578) (#25622) (#25624) * fix(influxd): update xxhash, avoid stringtoslicebyte in cache (#578) * fix(influxd): update xxhash, avoid stringtoslicebyte in cache This commit does 3 things: * it updates xxhash from v1 to v2; v2 includes an assembly arm version of Sum64 * it changes the cache storer to write with a string key instead of a byte slice. The cache only reads the key, which WriteMulti already has as a string, so we can avoid a host of allocations when converting back and forth from immutable strings to mutable byte slices. This includes updating the cache ring and ring partition to write with a string key * it updates the xxhash for finding the cache ring partition to use Sum64String which uses unsafe pointers to directly use a string as a byte slice since it only reads the string. Note: this now uses an assembly version because of the v2 xxhash update. Go 1.22 included a new compiler ability to recognize calls of Method([]byte(myString)) and not make a copy, but from looking at the call sites, I'm not sure the compiler would recognize it, as the conversion to a byte slice was happening several calls earlier. That's what this change set does. If we are uncomfortable with any of these, we can do fewer of them (for example, not upgrade xxhash; and/or not use the specialized Sum64String, etc). For the performance issue in maz-rr, I see converting string keys to byte slices taking between 3-5% of cpu usage on both the primary and secondary. So while this pr doesn't directly address the increased cpu usage on the secondary, it makes cpu usage less on both which still feels like a win.
I believe these changes are easier to review than switching to a byte slice pool that is likely needed in other places as the compiler provides nearly all of the correctness checks we need (we are relying also on xxhash v2 being correct). * helps #550 * chore: fix tests/lint * chore: don't use assembly version; should inline This 2 line change causes xxhash to use a purego Sum64 implementation which allows the compiler to see that Sum64 only reads the byte slice input, which then means it can skip the string to byte slice allocation, and since it can skip that, it should inline all the calls to getPartitionStringKey and Sum64 avoiding 1 call to Sum64String which isn't inlined. * chore: update ci build file the ci build doesn't use the make file!!! * chore: revert "chore: update ci build file" This reverts commit 94be66fde03e0bbe18004aab25c0e19051406de2. * chore: revert "chore: don't use assembly version; should inline" This reverts commit 67d8d06c02e17e91ba643a2991e30a49308a5283. (cherry picked from commit 1d334c679ca025645ed93518b7832ae676499cd2) * feat: need to update go sum --------- Co-authored-by: Phil Bracikowski <13472206+philjb@users.noreply.github.com> (cherry picked from commit 06ab2245164ce6091e873334695b8e752e060f69) --- go.mod | 3 +-- go.sum | 7 ++----- pkg/bloom/bloom.go | 2 +- pkg/estimator/hll/hll.go | 2 +- pkg/rhh/rhh.go | 2 +- tsdb/engine/tsm1/cache.go | 8 ++++---- tsdb/engine/tsm1/cache_test.go | 9 ++++----- tsdb/engine/tsm1/ring.go | 20 +++++++++++++------- tsdb/engine/tsm1/ring_test.go | 6 +++--- tsdb/index/tsi1/index.go | 14 +++++++------- tsdb/series_file.go | 4 ++-- 11 files changed, 39 insertions(+), 38 deletions(-) diff --git a/go.mod b/go.mod index 54cf01d0309..29b01bde70f 100644 --- a/go.mod +++ b/go.mod @@ -8,7 +8,7 @@ require ( github.com/apache/arrow/go/arrow v0.0.0-20211112161151-bc219186db40 github.com/benbjohnson/tmpl v1.0.0 github.com/bmizerany/pat v0.0.0-20170815010413-6226ea591a40 - github.com/cespare/xxhash v1.1.0 + 
github.com/cespare/xxhash/v2 v2.3.0 github.com/davecgh/go-spew v1.1.1 github.com/dgryski/go-bitstream v0.0.0-20180413035011-3522498ce2c8 github.com/go-chi/chi v4.1.0+incompatible @@ -98,7 +98,6 @@ require ( github.com/benbjohnson/immutable v0.3.0 // indirect github.com/beorn7/perks v1.0.1 // indirect github.com/bonitoo-io/go-sql-bigquery v0.3.4-1.4.0 // indirect - github.com/cespare/xxhash/v2 v2.2.0 // indirect github.com/deepmap/oapi-codegen v1.6.0 // indirect github.com/denisenkom/go-mssqldb v0.10.0 // indirect github.com/dimchansky/utfbom v1.1.0 // indirect diff --git a/go.sum b/go.sum index 8fd9ef4765b..9bb9846a7e5 100644 --- a/go.sum +++ b/go.sum @@ -118,7 +118,6 @@ github.com/Masterminds/semver v1.4.2/go.mod h1:MB6lktGJrhw8PrUyiEoblNEGEQ+RzHPF0 github.com/Masterminds/sprig v2.16.0+incompatible h1:QZbMUPxRQ50EKAq3LFMnxddMu88/EUUG3qmxwtDmPsY= github.com/Masterminds/sprig v2.16.0+incompatible/go.mod h1:y6hNFY5UBTIWBxnzTeuNhlNS5hqE0NB0E6fgfo2Br3o= github.com/NYTimes/gziphandler v0.0.0-20170623195520-56545f4a5d46/go.mod h1:3wb06e3pkSAbeQ52E9H9iFoQsEEwGN64994WTCIhntQ= -github.com/OneOfOne/xxhash v1.2.2 h1:KMrpdQIwFcEqXDklaen+P1axHaj9BSKzvpUUfnHldSE= github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU= github.com/PuerkitoBio/purell v1.0.0/go.mod h1:c11w/QuzBsJSee3cPx9rAFu61PvFxuPbtSwDGJws/X0= github.com/PuerkitoBio/purell v1.1.0/go.mod h1:c11w/QuzBsJSee3cPx9rAFu61PvFxuPbtSwDGJws/X0= @@ -228,12 +227,11 @@ github.com/casbin/casbin/v2 v2.1.2/go.mod h1:YcPU1XXisHhLzuxH9coDNf2FbKpjGlbCg3n github.com/cenkalti/backoff v0.0.0-20181003080854-62661b46c409/go.mod h1:90ReRw6GdpyfrHakVjL/QHaoyV4aDUVVkXQJJJ3NXXM= github.com/cenkalti/backoff v2.2.1+incompatible/go.mod h1:90ReRw6GdpyfrHakVjL/QHaoyV4aDUVVkXQJJJ3NXXM= github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= -github.com/cespare/xxhash v1.1.0 h1:a6HrQnmkObjyL+Gs60czilIUGqrzKutQD6XZog3p+ko= github.com/cespare/xxhash v1.1.0/go.mod 
h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc= github.com/cespare/xxhash/v2 v2.1.0/go.mod h1:dgIUBU3pDso/gPgZ1osOZ0iQf77oPR28Tjxl5dIMyVM= github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= -github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44= -github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= +github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= +github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/chzyer/logex v1.1.10/go.mod h1:+Ywpsq7O8HXn0nuIou7OrIPyXbp3wmkHB+jjWRnGsAI= github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e/go.mod h1:nSuG5e5PlCu98SY8svDHJxuZscDgtXS6KTTbou5AhLI= github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMnBNeIyt5eFwwo7qiLfzFZmjNmxjkiQlU= @@ -941,7 +939,6 @@ github.com/smartystreets/goconvey v1.6.4 h1:fv0U8FUIMPNf1L9lnHLvLhgicrIVChEkdzIK github.com/smartystreets/goconvey v1.6.4/go.mod h1:syvi0/a8iFYH4r/RixwvyeAJjdLS9QV7WQ/tjFTllLA= github.com/soheilhy/cmux v0.1.4/go.mod h1:IM3LyeVVIOuxMH7sFAkER9+bJ4dT7Ms6E4xg4kGIyLM= github.com/sony/gobreaker v0.4.1/go.mod h1:ZKptC7FHNvhBz7dN2LGjPVBz2sZJmc0/PkyDJOjmxWY= -github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72 h1:qLC7fQah7D6K1B0ujays3HV9gkFtllcxhzImRR7ArPQ= github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA= github.com/spf13/afero v1.2.2/go.mod h1:9ZxEEn6pIJ8Rxe320qSDBk6AsU0r9pR7Q4OcevTdifk= github.com/spf13/cast v1.3.0 h1:oget//CVOEoFewqQxwr0Ej5yjygnqGkvggSE/gB35Q8= diff --git a/pkg/bloom/bloom.go b/pkg/bloom/bloom.go index b77ca71b946..0d81c3b7f14 100644 --- a/pkg/bloom/bloom.go +++ b/pkg/bloom/bloom.go @@ -12,7 +12,7 @@ import ( "fmt" "math" - "github.com/cespare/xxhash" + "github.com/cespare/xxhash/v2" ) // Filter represents a bloom filter. 
diff --git a/pkg/estimator/hll/hll.go b/pkg/estimator/hll/hll.go index 96414d59fe9..6661b6673d9 100644 --- a/pkg/estimator/hll/hll.go +++ b/pkg/estimator/hll/hll.go @@ -22,7 +22,7 @@ import ( "sort" "unsafe" - "github.com/cespare/xxhash" + "github.com/cespare/xxhash/v2" "github.com/influxdata/influxdb/pkg/estimator" ) diff --git a/pkg/rhh/rhh.go b/pkg/rhh/rhh.go index bb8db4be7cb..ce2fc733c56 100644 --- a/pkg/rhh/rhh.go +++ b/pkg/rhh/rhh.go @@ -5,7 +5,7 @@ import ( "encoding/binary" "sort" - "github.com/cespare/xxhash" + "github.com/cespare/xxhash/v2" ) // HashMap represents a hash map that implements Robin Hood Hashing. diff --git a/tsdb/engine/tsm1/cache.go b/tsdb/engine/tsm1/cache.go index a169cfe8bb6..c2c87617db8 100644 --- a/tsdb/engine/tsm1/cache.go +++ b/tsdb/engine/tsm1/cache.go @@ -165,7 +165,7 @@ const ( // storer is the interface that descibes a cache's store. type storer interface { entry(key []byte) *entry // Get an entry by its key. - write(key []byte, values Values) (bool, error) // Write an entry to the store. + write(key string, values Values) (bool, error) // Write an entry to the store. remove(key []byte) // Remove an entry from the store. keys(sorted bool) [][]byte // Return an optionally sorted slice of entry keys. apply(f func([]byte, *entry) error) error // Apply f to all entries in the store in parallel. @@ -294,7 +294,7 @@ func (c *Cache) Write(key []byte, values []Value) error { return ErrCacheMemorySizeLimitExceeded(n, limit) } - newKey, err := c.store.write(key, values) + newKey, err := c.store.write(string(key), values) if err != nil { atomic.AddInt64(&c.stats.WriteErr, 1) return err @@ -339,7 +339,7 @@ func (c *Cache) WriteMulti(values map[string][]Value) error { // We'll optimistially set size here, and then decrement it for write errors. 
c.increaseSize(addedSize) for k, v := range values { - newKey, err := store.write([]byte(k), v) + newKey, err := store.write(k, v) if err != nil { // The write failed, hold onto the error and adjust the size delta. werr = err @@ -820,7 +820,7 @@ func (c *Cache) updateSnapshots() { type emptyStore struct{} func (e emptyStore) entry(key []byte) *entry { return nil } -func (e emptyStore) write(key []byte, values Values) (bool, error) { return false, nil } +func (e emptyStore) write(key string, values Values) (bool, error) { return false, nil } func (e emptyStore) remove(key []byte) {} func (e emptyStore) keys(sorted bool) [][]byte { return nil } func (e emptyStore) apply(f func([]byte, *entry) error) error { return nil } diff --git a/tsdb/engine/tsm1/cache_test.go b/tsdb/engine/tsm1/cache_test.go index e011d590bb9..ec98f63a35e 100644 --- a/tsdb/engine/tsm1/cache_test.go +++ b/tsdb/engine/tsm1/cache_test.go @@ -1,7 +1,6 @@ package tsm1 import ( - "bytes" "errors" "fmt" "math" @@ -119,8 +118,8 @@ func TestCache_WriteMulti_Stats(t *testing.T) { c.init() c.store = ms - ms.writef = func(key []byte, v Values) (bool, error) { - if bytes.Equal(key, []byte("foo")) { + ms.writef = func(key string, v Values) (bool, error) { + if key == "foo" { return false, errors.New("write failed") } return true, nil @@ -871,7 +870,7 @@ func mustMarshalEntry(entry WALEntry) (WalEntryType, []byte) { // Cache's storer implememation. 
type TestStore struct { entryf func(key []byte) *entry - writef func(key []byte, values Values) (bool, error) + writef func(key string, values Values) (bool, error) removef func(key []byte) keysf func(sorted bool) [][]byte applyf func(f func([]byte, *entry) error) error @@ -883,7 +882,7 @@ type TestStore struct { func NewTestStore() *TestStore { return &TestStore{} } func (s *TestStore) entry(key []byte) *entry { return s.entryf(key) } -func (s *TestStore) write(key []byte, values Values) (bool, error) { return s.writef(key, values) } +func (s *TestStore) write(key string, values Values) (bool, error) { return s.writef(key, values) } func (s *TestStore) remove(key []byte) { s.removef(key) } func (s *TestStore) keys(sorted bool) [][]byte { return s.keysf(sorted) } func (s *TestStore) apply(f func([]byte, *entry) error) error { return s.applyf(f) } diff --git a/tsdb/engine/tsm1/ring.go b/tsdb/engine/tsm1/ring.go index 51740614d5d..e424b64cad2 100644 --- a/tsdb/engine/tsm1/ring.go +++ b/tsdb/engine/tsm1/ring.go @@ -4,7 +4,7 @@ import ( "fmt" "sync" - "github.com/cespare/xxhash" + "github.com/cespare/xxhash/v2" "github.com/influxdata/influxdb/pkg/bytesutil" ) @@ -80,6 +80,12 @@ func (r *ring) getPartition(key []byte) *partition { return r.partitions[int(xxhash.Sum64(key)%uint64(len(r.partitions)))] } +// getPartitionStringKey retrieves the hash ring partition associated with the provided +// string key, which can be faster if you already have a string as this is read only +func (r *ring) getPartitionStringKey(key string) *partition { + return r.partitions[int(xxhash.Sum64String(key)%uint64(len(r.partitions)))] +} + // entry returns the entry for the given key. // entry is safe for use by multiple goroutines. func (r *ring) entry(key []byte) *entry { @@ -89,8 +95,8 @@ func (r *ring) entry(key []byte) *entry { // write writes values to the entry in the ring's partition associated with key. // If no entry exists for the key then one will be created. 
// write is safe for use by multiple goroutines. -func (r *ring) write(key []byte, values Values) (bool, error) { - return r.getPartition(key).write(key, values) +func (r *ring) write(key string, values Values) (bool, error) { + return r.getPartitionStringKey(key).write(key, values) } // remove deletes the entry for the given key. @@ -218,9 +224,9 @@ func (p *partition) entry(key []byte) *entry { // write writes the values to the entry in the partition, creating the entry // if it does not exist. // write is safe for use by multiple goroutines. -func (p *partition) write(key []byte, values Values) (bool, error) { +func (p *partition) write(key string, values Values) (bool, error) { p.mu.RLock() - e := p.store[string(key)] + e := p.store[key] p.mu.RUnlock() if e != nil { // Hot path. @@ -231,7 +237,7 @@ func (p *partition) write(key []byte, values Values) (bool, error) { defer p.mu.Unlock() // Check again. - if e = p.store[string(key)]; e != nil { + if e = p.store[key]; e != nil { return false, e.add(values) } @@ -241,7 +247,7 @@ func (p *partition) write(key []byte, values Values) (bool, error) { return false, err } - p.store[string(key)] = e + p.store[key] = e return true, nil } diff --git a/tsdb/engine/tsm1/ring_test.go b/tsdb/engine/tsm1/ring_test.go index 172dccb5db8..c9449cddcf2 100644 --- a/tsdb/engine/tsm1/ring_test.go +++ b/tsdb/engine/tsm1/ring_test.go @@ -51,7 +51,7 @@ var strSliceRes [][]byte func benchmarkRingkeys(b *testing.B, r *ring, keys int) { // Add some keys for i := 0; i < keys; i++ { - r.write([]byte(fmt.Sprintf("cpu,host=server-%d value=1", i)), Values([]Value{ + r.write(fmt.Sprintf("cpu,host=server-%d value=1", i), Values([]Value{ IntegerValue{ unixnano: 1, value: int64(i), @@ -77,7 +77,7 @@ func benchmarkRingGetPartition(b *testing.B, r *ring, keys int) { // Add some keys for i := 0; i < keys; i++ { vals[i] = []byte(fmt.Sprintf("cpu,host=server-%d 
field1=value1,field2=value2,field4=value4,field5=value5,field6=value6,field7=value7,field8=value1,field9=value2,field10=value4,field11=value5,field12=value6,field13=value7", i)) - r.write([]byte(fmt.Sprintf("cpu,host=server-%d value=1", i)), Values([]Value{ + r.write(fmt.Sprintf("cpu,host=server-%d value=1", i), Values([]Value{ IntegerValue{ unixnano: 1, value: int64(i), @@ -109,7 +109,7 @@ func benchmarkRingWrite(b *testing.B, r *ring, n int) { go func() { defer wg.Done() for j := 0; j < n; j++ { - if _, err := r.write([]byte(fmt.Sprintf("cpu,host=server-%d value=1", j)), Values{}); err != nil { + if _, err := r.write(fmt.Sprintf("cpu,host=server-%d value=1", j), Values{}); err != nil { errC <- err } } diff --git a/tsdb/index/tsi1/index.go b/tsdb/index/tsi1/index.go index 44139412026..6bfa12d09a8 100644 --- a/tsdb/index/tsi1/index.go +++ b/tsdb/index/tsi1/index.go @@ -13,7 +13,7 @@ import ( "time" "unsafe" - "github.com/cespare/xxhash" + "github.com/cespare/xxhash/v2" "github.com/influxdata/influxdb/models" "github.com/influxdata/influxdb/pkg/estimator" "github.com/influxdata/influxdb/pkg/estimator/hll" @@ -141,7 +141,7 @@ type Index struct { path string // Root directory of the index partitions. disableCompactions bool // Initially disables compactions on the index. maxLogFileSize int64 // Maximum size of a LogFile before it's compacted. - maxLogFileAge time.Duration // Maximum age of a LogFile before it's compacted + maxLogFileAge time.Duration // Maximum age of a LogFile before it's compacted. logfileBufferSize int // The size of the buffer used by the LogFile. disableFsync bool // Disables flushing buffers and fsyning files. Used when working with indexes offline. logger *zap.Logger // Index's logger. 
@@ -398,9 +398,9 @@ func (i *Index) updateMeasurementSketches() error { for j := 0; j < int(i.PartitionN); j++ { if s, t, err := i.partitions[j].MeasurementsSketches(); err != nil { return err - } else if i.mSketch.Merge(s); err != nil { + } else if err := i.mSketch.Merge(s); err != nil { return err - } else if i.mTSketch.Merge(t); err != nil { + } else if err := i.mTSketch.Merge(t); err != nil { return err } } @@ -412,9 +412,9 @@ func (i *Index) updateSeriesSketches() error { for j := 0; j < int(i.PartitionN); j++ { if s, t, err := i.partitions[j].SeriesSketches(); err != nil { return err - } else if i.sSketch.Merge(s); err != nil { + } else if err := i.sSketch.Merge(s); err != nil { return err - } else if i.sTSketch.Merge(t); err != nil { + } else if err := i.sTSketch.Merge(t); err != nil { return err } } @@ -898,7 +898,7 @@ func (i *Index) DropSeriesList(seriesIDs []uint64, keys [][]byte, _ bool) error func (i *Index) DropSeriesGlobal(key []byte) error { return nil } // DropMeasurementIfSeriesNotExist drops a measurement only if there are no more -// series for the measurment. +// series for the measurement. func (i *Index) DropMeasurementIfSeriesNotExist(name []byte) (bool, error) { // Check if that was the last series for the measurement in the entire index. if ok, err := i.MeasurementHasSeries(name); err != nil { diff --git a/tsdb/series_file.go b/tsdb/series_file.go index 9c9059b1138..eca08cc3863 100644 --- a/tsdb/series_file.go +++ b/tsdb/series_file.go @@ -11,7 +11,7 @@ import ( "sort" "sync" - "github.com/cespare/xxhash" + "github.com/cespare/xxhash/v2" "github.com/influxdata/influxdb/models" "github.com/influxdata/influxdb/pkg/binaryutil" "github.com/influxdata/influxdb/pkg/limiter" @@ -401,7 +401,7 @@ func ParseSeriesKey(data []byte) (name []byte, tags models.Tags) { } // ParseSeriesKeyInto extracts the name and tags for data, parsing the tags into -// dstTags, which is then returened. +// dstTags, which is then returned. 
// // The returned dstTags may have a different length and capacity. func ParseSeriesKeyInto(data []byte, dstTags models.Tags) ([]byte, models.Tags) {