Skip to content

Commit

Permalink
Merge pull request #43 from axiomhq/intmap
Browse files Browse the repository at this point in the history
Improve set operation speeds
  • Loading branch information
seiflotfy authored Dec 5, 2024
2 parents 53e9214 + 4528df8 commit 0cc8976
Show file tree
Hide file tree
Showing 5 changed files with 103 additions and 37 deletions.
5 changes: 3 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
module github.com/axiomhq/hyperloglog

go 1.21
go 1.23

toolchain go1.23.0
toolchain go1.23.4

require (
github.com/davecgh/go-spew v1.1.1
Expand All @@ -11,6 +11,7 @@ require (
)

require (
github.com/kamstrup/intmap v0.5.0 // indirect
github.com/kr/pretty v0.3.0 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/rogpeppe/go-internal v1.9.0 // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/dgryski/go-metro v0.0.0-20180109044635-280f6062b5bc h1:8WFBn63wegobsYAX0YjD+8suexZDga5CctH4CCTx2+8=
github.com/dgryski/go-metro v0.0.0-20180109044635-280f6062b5bc/go.mod h1:c9O8+fpSOX1DM8cPNSkX/qsBWdkD4yd2dpciOWQjpBw=
github.com/kamstrup/intmap v0.5.0 h1:WY7OJQeG7Ujc9zpPTO6PraDGSveG9js9wCPoI2q8wJQ=
github.com/kamstrup/intmap v0.5.0/go.mod h1:gWUVWHKzWj8xpJVFf5GC0O26bWmv3GqdnIX/LMT6Aq4=
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI=
github.com/kr/pretty v0.3.0 h1:WgNl7dwNpEZ6jJ9k1snq4pZsg7DOEN8hP9Xw0Tsjwk0=
Expand Down
34 changes: 16 additions & 18 deletions hyperloglog.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ type Sketch struct {
p uint8
m uint32
alpha float64
tmpSet set
tmpSet *set
sparseList *compressedList
regs []uint8
}
Expand All @@ -45,7 +45,7 @@ func NewSketch(precision uint8, sparse bool) (*Sketch, error) {
alpha: alpha(float64(m)),
}
if sparse {
s.tmpSet = set{}
s.tmpSet = newSet(0)
s.sparseList = newCompressedList(0)
} else {
s.regs = make([]uint8, m)
Expand All @@ -65,7 +65,7 @@ func (sk *Sketch) Clone() *Sketch {
}

func (sk *Sketch) maybeToNormal() {
if uint32(len(sk.tmpSet))*100 > sk.m {
if uint32(sk.tmpSet.Len())*100 > sk.m {
sk.mergeSparse()
if uint32(sk.sparseList.Len()) > sk.m {
sk.toNormal()
Expand All @@ -90,9 +90,7 @@ func (sk *Sketch) Merge(other *Sketch) error {
}

func (sk *Sketch) mergeSparseSketch(other *Sketch) {
for k := range other.tmpSet {
sk.tmpSet.add(k)
}
sk.tmpSet.Merge(other.tmpSet)
for iter := other.sparseList.Iter(); iter.HasNext(); {
sk.tmpSet.add(iter.Next())
}
Expand All @@ -105,10 +103,10 @@ func (sk *Sketch) mergeDenseSketch(other *Sketch) {
}

if other.sparse() {
for k := range other.tmpSet {
other.tmpSet.ForEach(func(k uint32) {
i, r := decodeHash(k, other.p, pp)
sk.insert(i, r)
}
})
for iter := other.sparseList.Iter(); iter.HasNext(); {
i, r := decodeHash(iter.Next(), other.p, pp)
sk.insert(i, r)
Expand All @@ -123,7 +121,7 @@ func (sk *Sketch) mergeDenseSketch(other *Sketch) {
}

func (sk *Sketch) toNormal() {
if len(sk.tmpSet) > 0 {
if sk.tmpSet.Len() > 0 {
sk.mergeSparse()
}

Expand Down Expand Up @@ -165,17 +163,17 @@ func (sk *Sketch) Estimate() uint64 {
}

func (sk *Sketch) mergeSparse() {
if len(sk.tmpSet) == 0 {
if sk.tmpSet.Len() == 0 {
return
}

keys := make(uint64Slice, 0, len(sk.tmpSet))
for k := range sk.tmpSet {
keys := make(uint64Slice, 0, sk.tmpSet.Len())
sk.tmpSet.ForEach(func(k uint32) {
keys = append(keys, k)
}
})
sort.Sort(keys)

newList := newCompressedList(4*len(sk.tmpSet) + len(sk.sparseList.b))
newList := newCompressedList(4*sk.tmpSet.Len() + sk.sparseList.Len())
for iter, i := sk.sparseList.Iter(), 0; iter.HasNext() || i < len(keys); {
if !iter.HasNext() {
newList.Append(keys[i])
Expand All @@ -201,7 +199,7 @@ func (sk *Sketch) mergeSparse() {
}

sk.sparseList = newList
sk.tmpSet = set{}
sk.tmpSet = newSet(0)
}

// MarshalBinary implements the encoding.BinaryMarshaler interface.
Expand Down Expand Up @@ -277,7 +275,7 @@ func (sk *Sketch) UnmarshalBinary(data []byte) error {
sparse := data[3] == byte(1)

// Make a newSketch Sketch if the precision doesn't match or if the Sketch was used
if sk.p != p || sk.regs != nil || len(sk.tmpSet) > 0 || (sk.sparseList != nil && sk.sparseList.Len() > 0) {
if sk.p != p || sk.regs != nil || sk.tmpSet.Len() > 0 || (sk.sparseList != nil && sk.sparseList.Len() > 0) {
newh, err := NewSketch(p, sparse)
if err != nil {
return err
Expand All @@ -292,14 +290,14 @@ func (sk *Sketch) UnmarshalBinary(data []byte) error {

// Unmarshal the tmp_set.
tssz := binary.BigEndian.Uint32(data[4:8])
sk.tmpSet = make(map[uint32]struct{}, tssz)
sk.tmpSet = newSet(int(tssz))

// We need to unmarshal tssz values in total, and each value requires us
// to read 4 bytes.
tsLastByte := int((tssz * 4) + 8)
for i := 8; i < tsLastByte; i += 4 {
k := binary.BigEndian.Uint32(data[i : i+4])
sk.tmpSet[k] = struct{}{}
sk.tmpSet.add(k)
}

// Unmarshal the sparse Sketch.
Expand Down
40 changes: 39 additions & 1 deletion hyperloglog_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -317,7 +317,9 @@ func TestHLL_Error(t *testing.T) {

func TestHLL_Marshal_Unmarshal_Sparse(t *testing.T) {
sk, _ := NewSketch(4, true)
sk.tmpSet = map[uint32]struct{}{26: {}, 40: {}}
sk.tmpSet = newSet(2)
sk.tmpSet.add(26)
sk.tmpSet.add(40)

// Add a bunch of values to the sparse representation.
for i := 0; i < 10; i++ {
Expand Down Expand Up @@ -811,3 +813,39 @@ func TestHLL_Add_Out_Of_Order(t *testing.T) {
})
}
}

func benchmarkMerge(b *testing.B, size1, size2 int) {
// Generate data for first sketch
sk1 := New14()
for i := 0; i < size1; i++ {
sk1.Insert([]byte(fmt.Sprintf("a%d", i)))
}

// Generate data for second sketch
sk2 := New14()
for i := 0; i < size2; i++ {
sk2.Insert([]byte(fmt.Sprintf("b%d", i)))
}

b.ResetTimer()
b.ReportAllocs()

for i := 0; i < b.N; i++ {
sk := New14()
sk.Merge(sk1)
sk.Merge(sk2)
}
}

func Benchmark_Merge(b *testing.B) {
sizes := []int{100, 10000, 1000000}

for _, size1 := range sizes {
for _, size2 := range sizes {
name := fmt.Sprintf("size1=%d/size2=%d", size1, size2)
b.Run(name, func(b *testing.B) {
benchmarkMerge(b, size1, size2)
})
}
}
}
59 changes: 43 additions & 16 deletions sparse.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package hyperloglog

import (
"math/bits"

"github.com/kamstrup/intmap"
)

func getIndex(k uint32, p, pp uint8) uint32 {
Expand Down Expand Up @@ -34,37 +36,61 @@ func decodeHash(k uint32, p, pp uint8) (uint32, uint8) {
return getIndex(k, p, pp), r
}

type set map[uint32]struct{}
type set struct {
m *intmap.Set[uint32]
}

func newSet(size int) *set {
return &set{m: intmap.NewSet[uint32](size)}
}

func (s *set) ForEach(fn func(v uint32)) {
s.m.ForEach(func(v uint32) bool {
fn(v)
return true
})
}

func (s *set) Merge(other *set) {
other.m.ForEach(func(v uint32) bool {
s.m.Add(v)
return true
})
}

func (s *set) Len() int {
return s.m.Len()
}

func (s set) add(v uint32) bool {
_, ok := s[v]
if ok {
func (s *set) add(v uint32) bool {
if s.m.Has(v) {
return false
}
s[v] = struct{}{}
s.m.Add(v)
return true
}

func (s set) Clone() set {
func (s *set) Clone() *set {
if s == nil {
return nil
}

newS := make(map[uint32]struct{}, len(s))
for k, v := range s {
newS[k] = v
}
return newS
newS := intmap.NewSet[uint32](s.m.Len())
s.m.ForEach(func(v uint32) bool {
newS.Add(v)
return true
})
return &set{m: newS}
}

func (s set) MarshalBinary() (data []byte, err error) {
func (s *set) MarshalBinary() (data []byte, err error) {
// 4 bytes for the size of the set, and 4 bytes for each key.
// list.
data = make([]byte, 0, 4+(4*len(s)))
data = make([]byte, 0, 4+(4*s.m.Len()))

// Length of the set. We only need 32 bits because the size of the set
// couldn't exceed that on 32 bit architectures.
sl := len(s)
sl := s.m.Len()
data = append(data, []byte{
byte(sl >> 24),
byte(sl >> 16),
Expand All @@ -73,14 +99,15 @@ func (s set) MarshalBinary() (data []byte, err error) {
}...)

// Marshal each element in the set.
for k := range s {
s.m.ForEach(func(k uint32) bool {
data = append(data, []byte{
byte(k >> 24),
byte(k >> 16),
byte(k >> 8),
byte(k),
}...)
}
return true
})

return data, nil
}
Expand Down

0 comments on commit 0cc8976

Please sign in to comment.