From 28cea4d957010e3a3c9dddc3041c54d61fbb6db3 Mon Sep 17 00:00:00 2001 From: Michael Desa Date: Tue, 18 Dec 2018 10:44:25 -0500 Subject: [PATCH] feat(platform): add generic kv store Co-authored-by: Leonardo Di Donato Co-authored-by: Michael Desa feat(kv): add kv store interface for services feat(bolt): add boltdb implementation of kv.Store spike(platform): add kv backed user service feat(kv): add static cursor Note here that this operation cannot be transactionally done. This poses a bit of issues that will need to be worked out. fix(bolt): use error explicit error message squash: play with interface a bit fix(kv): remove commit and rollback from kv interface feat(inmem): add inmem kv store chore: add note for inmem transactions fix(bolt): remove call to tx in kv store tests feat(kv): add tests for static cursor doc(kv): add comments to store and associated interfaces doc(bolt): add comments to key value store feat(testing): add kv store tests test(testing): add conformance test for kv.Store test(inmem): add kv.Store conformance tests doc(inmem): add comments to key value store feat(inmem): remove CreateBucketIfNotExists from Tx interface feat(bolt): remove CreateBucketIfNotExists from Tx feat(inmem): remove CreateBucketIfNotExists from Tx doc(kv): add note to bucket interface about conditions methods can be called feat(kv): add context methods to kv.Tx feat(bolt): add context methods to bolt.Tx feat(inmem): add context methods to inmem.Tx test(kv): add contract tests for view/update transactions feat(kv): ensure that static cursor is always valid Co-authored-by: Leonardo Di Donato Co-authored-by: Michael Desa fix(kv): remove error from cursor methods test(kv): remove want errors from cursor test test(testing): add concurrent update test for kv.Store feat(kv): make kv user service an example service fix(testing): add concurrnent update test to the kv.Store contract tests test(platform): fix example kv service tests dep(platform): make platform tidy --- bolt/bbolt_test.go | 21 + bolt/kv.go | 219 +++++++++++ bolt/kv_test.go | 92 +++++ bolt/user_test.go | 5 +- go.mod | 7 +- go.sum | 14 +- inmem/kv.go | 203 ++++++++++ inmem/kv_test.go | 64 ++++ kv/cursor.go | 80 ++++ kv/cursor_test.go | 244 ++++++++++++ kv/example.go | 436 +++++++++++++++++++++ kv/store.go | 52 +++ testing/kv.go | 928 +++++++++++++++++++++++++++++++++++++++++++++ user.go | 4 +- 14 files changed, 2357 insertions(+), 12 deletions(-) create mode 100644 bolt/kv.go create mode 100644 bolt/kv_test.go create mode 100644 inmem/kv.go create mode 100644 inmem/kv_test.go create mode 100644 kv/cursor.go create mode 100644 kv/cursor_test.go create mode 100644 kv/example.go create mode 100644 kv/store.go create mode 100644 testing/kv.go diff --git a/bolt/bbolt_test.go b/bolt/bbolt_test.go index 43e00f800ca..38af04ea0ee 100644 --- a/bolt/bbolt_test.go +++ b/bolt/bbolt_test.go @@ -64,3 +64,24 @@ func TestClientOpen(t *testing.T) { t.Fatalf("unable to close database %s: %v", boltFile, err) } } + +func NewTestKVStore() (*bolt.KVStore, func(), error) { + f, err := ioutil.TempFile("", "influxdata-platform-bolt-") + if err != nil { + return nil, nil, errors.New("unable to open temporary boltdb file") + } + f.Close() + + path := f.Name() + s := bolt.NewKVStore(path) + if err := s.Open(context.TODO()); err != nil { + return nil, nil, err + } + + close := func() { + s.Close() + os.Remove(path) + } + + return s, close, nil +} diff --git a/bolt/kv.go b/bolt/kv.go new file mode 100644 index 00000000000..01e2a78aaaf --- /dev/null +++ b/bolt/kv.go @@ -0,0 +1,219 @@ +package bolt + +import ( + "context" + "fmt" + "os" + "path/filepath" + "time" + + bolt "github.com/coreos/bbolt" + "github.com/influxdata/platform/kv" + "go.uber.org/zap" +) + +// KVStore is a kv.Store backed by boltdb. +type KVStore struct { + path string + db *bolt.DB + logger *zap.Logger +} + +// NewKVStore returns an instance of KVStore with the file at +// the provided path. +func NewKVStore(path string) *KVStore { + return &KVStore{ + path: path, + logger: zap.NewNop(), + } +} + +// Open creates boltDB file it doesn't exists and opens it otherwise. +func (s *KVStore) Open(ctx context.Context) error { + // Ensure the required directory structure exists. + if err := os.MkdirAll(filepath.Dir(s.path), 0700); err != nil { + return fmt.Errorf("unable to create directory %s: %v", s.path, err) + } + + if _, err := os.Stat(s.path); err != nil && !os.IsNotExist(err) { + return err + } + + // Open database file. + db, err := bolt.Open(s.path, 0600, &bolt.Options{Timeout: 1 * time.Second}) + if err != nil { + return fmt.Errorf("unable to open boltdb file %v", err) + } + s.db = db + + s.logger.Info("Resources opened", zap.String("path", s.path)) + return nil +} + +// Close the connection to the bolt database +func (s *KVStore) Close() error { + if s.db != nil { + return s.db.Close() + } + return nil +} + +// WithLogger sets the logger on the store. +func (s *KVStore) WithLogger(l *zap.Logger) { + s.logger = l +} + +// WithDB sets the boltdb on the store. +func (s *KVStore) WithDB(db *bolt.DB) { + s.db = db +} + +// View opens up a view transaction against the store. +func (s *KVStore) View(fn func(tx kv.Tx) error) error { + return s.db.View(func(tx *bolt.Tx) error { + return fn(&Tx{ + tx: tx, + ctx: context.Background(), + }) + }) +} + +// Update opens up an update transaction against the store. +func (s *KVStore) Update(fn func(tx kv.Tx) error) error { + return s.db.Update(func(tx *bolt.Tx) error { + return fn(&Tx{ + tx: tx, + ctx: context.Background(), + }) + }) +} + +// Tx is a light wrapper around a boltdb transaction. It implements kv.Tx. +type Tx struct { + tx *bolt.Tx + ctx context.Context +} + +// Context returns the context for the transaction. +func (tx *Tx) Context() context.Context { + return tx.ctx +} + +// WithContext sets the context for the transaction. +func (tx *Tx) WithContext(ctx context.Context) { + tx.ctx = ctx +} + +// createBucketIfNotExists creates a bucket with the provided byte slice. +func (tx *Tx) createBucketIfNotExists(b []byte) (*Bucket, error) { + bkt, err := tx.tx.CreateBucketIfNotExists(b) + if err != nil { + return nil, err + } + return &Bucket{ + bucket: bkt, + }, nil +} + +// Bucket retrieves the bucket named b. +func (tx *Tx) Bucket(b []byte) (kv.Bucket, error) { + bkt := tx.tx.Bucket(b) + if bkt == nil { + return tx.createBucketIfNotExists(b) + } + return &Bucket{ + bucket: bkt, + }, nil +} + +// Bucket implements kv.Bucket. +type Bucket struct { + bucket *bolt.Bucket +} + +// Get retrieves the value at the provided key. +func (b *Bucket) Get(key []byte) ([]byte, error) { + val := b.bucket.Get(key) + if len(val) == 0 { + return nil, kv.ErrKeyNotFound + } + + return val, nil +} + +// Put sets the value at the provided key. +func (b *Bucket) Put(key []byte, value []byte) error { + err := b.bucket.Put(key, value) + if err == bolt.ErrTxNotWritable { + return kv.ErrTxNotWritable + } + return err +} + +// Delete removes the provided key. +func (b *Bucket) Delete(key []byte) error { + err := b.bucket.Delete(key) + if err == bolt.ErrTxNotWritable { + return kv.ErrTxNotWritable + } + return err +} + +// Cursor retrieves a cursor for iterating through the entries +// in the key value store. +func (b *Bucket) Cursor() (kv.Cursor, error) { + return &Cursor{ + cursor: b.bucket.Cursor(), + }, nil +} + +// Cursor is a struct for iterating through the entries +// in the key value store. +type Cursor struct { + cursor *bolt.Cursor +} + +// Seek seeks for the first key that matches the prefix provided. +func (c *Cursor) Seek(prefix []byte) ([]byte, []byte) { + k, v := c.cursor.Seek(prefix) + if len(v) == 0 { + return nil, nil + } + return k, v +} + +// First retrieves the first key value pair in the bucket. +func (c *Cursor) First() ([]byte, []byte) { + k, v := c.cursor.First() + if len(v) == 0 { + return nil, nil + } + return k, v +} + +// Last retrieves the last key value pair in the bucket. +func (c *Cursor) Last() ([]byte, []byte) { + k, v := c.cursor.Last() + if len(v) == 0 { + return nil, nil + } + return k, v +} + +// Next retrieves the next key in the bucket. +func (c *Cursor) Next() ([]byte, []byte) { + k, v := c.cursor.Next() + if len(v) == 0 { + return nil, nil + } + return k, v +} + +// Prev retrieves the previous key in the bucket. +func (c *Cursor) Prev() ([]byte, []byte) { + k, v := c.cursor.Prev() + if len(v) == 0 { + return nil, nil + } + return k, v +} diff --git a/bolt/kv_test.go b/bolt/kv_test.go new file mode 100644 index 00000000000..8b083518b7f --- /dev/null +++ b/bolt/kv_test.go @@ -0,0 +1,92 @@ +package bolt_test + +import ( + "context" + "testing" + + "github.com/influxdata/platform" + "github.com/influxdata/platform/kv" + platformtesting "github.com/influxdata/platform/testing" +) + +func initKVStore(f platformtesting.KVStoreFields, t *testing.T) (kv.Store, func()) { + s, closeFn, err := NewTestKVStore() + if err != nil { + t.Fatalf("failed to create new kv store: %v", err) + } + + err = s.Update(func(tx kv.Tx) error { + b, err := tx.Bucket(f.Bucket) + if err != nil { + return err + } + + for _, p := range f.Pairs { + if err := b.Put(p.Key, p.Value); err != nil { + return err + } + } + + return nil + }) + if err != nil { + t.Fatalf("failed to put keys: %v", err) + } + return s, func() { + closeFn() + } +} + +func TestKVStore(t *testing.T) { + platformtesting.KVStore(initKVStore, t) +} + +func initExampleService(f platformtesting.UserFields, t *testing.T) (platform.UserService, string, func()) { + s, closeFn, err := NewTestKVStore() + if err != nil { + t.Fatalf("failed to create new kv store: %v", err) + } + svc := kv.NewExampleService(s, f.IDGenerator) + if err := svc.Initialize(); err != nil { + t.Fatalf("error initializing user service: %v", err) + } + + ctx := context.Background() + for _, u := range f.Users { + if err := svc.PutUser(ctx, u); err != nil { + t.Fatalf("failed to populate users") + } + } + return svc, "", func() { + defer closeFn() + for _, u := range f.Users { + if err := svc.DeleteUser(ctx, u.ID); err != nil { + t.Logf("failed to remove users: %v", err) + } + } + } +} + +func TestExampleService_CreateUser(t *testing.T) { + platformtesting.CreateUser(initExampleService, t) +} + +func TestExampleService_FindUserByID(t *testing.T) { + platformtesting.FindUserByID(initExampleService, t) +} + +func TestExampleService_FindUsers(t *testing.T) { + platformtesting.FindUsers(initExampleService, t) +} + +func TestExampleService_DeleteUser(t *testing.T) { + platformtesting.DeleteUser(initExampleService, t) +} + +func TestExampleService_FindUser(t *testing.T) { + platformtesting.FindUser(initExampleService, t) +} + +func TestExampleService_UpdateUser(t *testing.T) { + platformtesting.UpdateUser(initExampleService, t) +} diff --git a/bolt/user_test.go b/bolt/user_test.go index 6bc7840307b..b3c529cc5c4 100644 --- a/bolt/user_test.go +++ b/bolt/user_test.go @@ -5,16 +5,17 @@ import ( "testing" "github.com/influxdata/platform" - bolt "github.com/influxdata/platform/bolt" + "github.com/influxdata/platform/bolt" platformtesting "github.com/influxdata/platform/testing" ) func initUserService(f platformtesting.UserFields, t *testing.T) (platform.UserService, string, func()) { c, closeFn, err := NewTestClient() if err != nil { - t.Fatalf("failed to create new bolt client: %v", err) + t.Fatalf("failed to create new kv store: %v", err) } c.IDGenerator = f.IDGenerator + ctx := context.Background() for _, u := range f.Users { if err := c.PutUser(ctx, u); err != nil { diff --git a/go.mod b/go.mod index 04b1402e95a..33fec377e22 100644 --- a/go.mod +++ b/go.mod @@ -25,8 +25,8 @@ require ( github.com/campoy/unique v0.0.0-20180121183637-88950e537e7e // indirect github.com/cenkalti/backoff v2.0.0+incompatible // indirect github.com/cespare/xxhash v1.1.0 - github.com/circonus-labs/circonus-gometrics v2.2.4+incompatible // indirect - github.com/circonus-labs/circonusllhist v0.1.1 // indirect + github.com/circonus-labs/circonus-gometrics v2.2.5+incompatible // indirect + github.com/circonus-labs/circonusllhist v0.1.3 // indirect github.com/containerd/continuity v0.0.0-20181027224239-bea7585dbfac // indirect github.com/coreos/bbolt v1.3.1-coreos.6 github.com/davecgh/go-spew v1.1.1 @@ -50,6 +50,7 @@ require ( github.com/gocql/gocql v0.0.0-20181117210152-33c0e89ca93a // indirect github.com/gogo/protobuf v1.1.1 github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db + github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c github.com/google/go-cmp v0.2.0 github.com/google/go-github v17.0.0+incompatible github.com/google/go-querystring v1.0.0 // indirect @@ -91,7 +92,7 @@ require ( github.com/mattn/go-isatty v0.0.4 github.com/mattn/go-zglob v0.0.0-20180803001819-2ea3427bfa53 // indirect github.com/matttproud/golang_protobuf_extensions v1.0.1 - github.com/miekg/dns v1.0.15 // indirect + github.com/miekg/dns v1.1.1 // indirect github.com/mitchellh/copystructure v1.0.0 // indirect github.com/mitchellh/go-testing-interface v1.0.0 // indirect github.com/mitchellh/mapstructure v1.1.2 // indirect diff --git a/go.sum b/go.sum index 9932ecc2032..f9f72266f07 100644 --- a/go.sum +++ b/go.sum @@ -66,10 +66,10 @@ github.com/cenkalti/backoff v2.0.0+incompatible h1:5IIPUHhlnUZbcHQsQou5k1Tn58nJk github.com/cenkalti/backoff v2.0.0+incompatible/go.mod h1:90ReRw6GdpyfrHakVjL/QHaoyV4aDUVVkXQJJJ3NXXM= github.com/cespare/xxhash v1.1.0 h1:a6HrQnmkObjyL+Gs60czilIUGqrzKutQD6XZog3p+ko= github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc= -github.com/circonus-labs/circonus-gometrics v2.2.4+incompatible h1:+ZwGzyJGsOwSxIEDDOXzPagR167tQak/1P5wBwH+/dM= -github.com/circonus-labs/circonus-gometrics v2.2.4+incompatible/go.mod h1:nmEj6Dob7S7YxXgwXpfOuvO54S+tGdZdw9fuRZt25Ag= -github.com/circonus-labs/circonusllhist v0.1.1 h1:MNPpugofgAFpPY/hTULMZIRfN18c5EQc8B8+4oFBx+4= -github.com/circonus-labs/circonusllhist v0.1.1/go.mod h1:kMXHVDlOchFAehlya5ePtbp5jckzBHf4XRpQvBOLI+I= +github.com/circonus-labs/circonus-gometrics v2.2.5+incompatible h1:KsuY3ogbxgVv3FNhbLUoT+SE9znoWEUIuChSIT4HukI= +github.com/circonus-labs/circonus-gometrics v2.2.5+incompatible/go.mod h1:nmEj6Dob7S7YxXgwXpfOuvO54S+tGdZdw9fuRZt25Ag= +github.com/circonus-labs/circonusllhist v0.1.3 h1:TJH+oke8D16535+jHExHj4nQvzlZrj7ug5D7I/orNUA= +github.com/circonus-labs/circonusllhist v0.1.3/go.mod h1:kMXHVDlOchFAehlya5ePtbp5jckzBHf4XRpQvBOLI+I= github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= github.com/containerd/continuity v0.0.0-20181027224239-bea7585dbfac h1:PThQaO4yCvJzJBUW1XoFQxLotWRhvX2fgljJX8yrhFI= github.com/containerd/continuity v0.0.0-20181027224239-bea7585dbfac/go.mod h1:GL3xCUCBDV3CZiTSEKksMWbLE66hEyuu9qyDOOqM47Y= @@ -137,6 +137,8 @@ github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5y github.com/golang/snappy v0.0.0-20170215233205-553a64147049/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db h1:woRePGFeVFfLKN/pOkfl+p/TAqKOfFu+7KPlMVpok/w= github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= +github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c h1:964Od4U6p2jUkFxvCydnIczKteheJEzHRToSGK3Bnlw= +github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ= github.com/google/go-cmp v0.2.0 h1:+dTQ8DZQJz0Mb/HjFlkptS1FeQ4cWSnN941F8aEG4SQ= github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M= github.com/google/go-github v17.0.0+incompatible h1:N0LgJ1j65A7kfXrZnUDaYCs/Sf4rEjNlfyDHW9dolSY= @@ -278,8 +280,8 @@ github.com/mattn/go-zglob v0.0.0-20180803001819-2ea3427bfa53 h1:tGfIHhDghvEnneeR github.com/mattn/go-zglob v0.0.0-20180803001819-2ea3427bfa53/go.mod h1:9fxibJccNxU2cnpIKLRRFA7zX7qhkJIQWBb449FYHOo= github.com/matttproud/golang_protobuf_extensions v1.0.1 h1:4hp9jkHxhMHkqkrB3Ix0jegS5sx/RkqARlsWZ6pIwiU= github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0= -github.com/miekg/dns v1.0.15 h1:9+UupePBQCG6zf1q/bGmTO1vumoG13jsrbWOSX1W6Tw= -github.com/miekg/dns v1.0.15/go.mod h1:W1PPwlIAgtquWBMBEV9nkV9Cazfe8ScdGz/Lj7v3Nrg= +github.com/miekg/dns v1.1.1 h1:DVkblRdiScEnEr0LR9nTnEQqHYycjkXW9bOjd+2EL2o= +github.com/miekg/dns v1.1.1/go.mod h1:W1PPwlIAgtquWBMBEV9nkV9Cazfe8ScdGz/Lj7v3Nrg= github.com/mitchellh/copystructure v1.0.0 h1:Laisrj+bAB6b/yJwB5Bt3ITZhGJdqmxquMKeZ+mmkFQ= github.com/mitchellh/copystructure v1.0.0/go.mod h1:SNtv71yrdKgLRyLFxmLdkAbkKEFWgYaq1OVrnRcwhnw= github.com/mitchellh/go-homedir v1.0.0 h1:vKb8ShqSby24Yrqr/yDYkuFz8d0WUjys40rvnGC8aR0= diff --git a/inmem/kv.go b/inmem/kv.go new file mode 100644 index 00000000000..0a20b5af91a --- /dev/null +++ b/inmem/kv.go @@ -0,0 +1,203 @@ +package inmem + +import ( + "bytes" + "context" + "fmt" + "sync" + + "github.com/google/btree" + "github.com/influxdata/platform/kv" +) + +// KVStore is an in memory btree backed kv.Store. +type KVStore struct { + mu sync.RWMutex + buckets map[string]*Bucket +} + +// NewKVStore creates an instance of a KVStore. +func NewKVStore() *KVStore { + return &KVStore{ + buckets: map[string]*Bucket{}, + } +} + +// View opens up a transaction with a read lock. +func (s *KVStore) View(fn func(kv.Tx) error) error { + s.mu.RLock() + defer s.mu.RUnlock() + return fn(&Tx{ + kv: s, + writable: false, + ctx: context.Background(), + }) +} + +// Update opens up a transaction with a write lock. +func (s *KVStore) Update(fn func(kv.Tx) error) error { + s.mu.Lock() + defer s.mu.Unlock() + return fn(&Tx{ + kv: s, + writable: true, + ctx: context.Background(), + }) +} + +// Tx is an in memory transaction. +// TODO: make transactions actually transactional +type Tx struct { + kv *KVStore + writable bool + ctx context.Context +} + +// Context returns the context for the transaction. +func (t *Tx) Context() context.Context { + return t.ctx +} + +// WithContext sets the context for the transaction. +func (t *Tx) WithContext(ctx context.Context) { + t.ctx = ctx +} + +// createBucketIfNotExists creates a btree bucket at the provided key. +func (t *Tx) createBucketIfNotExists(b []byte) (kv.Bucket, error) { + if t.writable { + bkt, ok := t.kv.buckets[string(b)] + if !ok { + bkt = &Bucket{btree.New(2)} + t.kv.buckets[string(b)] = bkt + return &bucket{ + Bucket: bkt, + writable: t.writable, + }, nil + } + + return &bucket{ + Bucket: bkt, + writable: t.writable, + }, nil + } + + return nil, kv.ErrTxNotWritable +} + +// Bucket retrieves the bucket at the provided key. +func (t *Tx) Bucket(b []byte) (kv.Bucket, error) { + bkt, ok := t.kv.buckets[string(b)] + if !ok { + return t.createBucketIfNotExists(b) + } + + return &bucket{ + Bucket: bkt, + writable: t.writable, + }, nil +} + +// Bucket is a btree that implements kv.Bucket. +type Bucket struct { + btree *btree.BTree +} + +type bucket struct { + kv.Bucket + writable bool +} + +// Put wraps the put method of a kv bucket and ensures that the +// bucket is writable. +func (b *bucket) Put(key, value []byte) error { + if b.writable { + return b.Bucket.Put(key, value) + } + return kv.ErrTxNotWritable +} + +// Delete wraps the delete method of a kv bucket and ensures that the +// bucket is writable. +func (b *bucket) Delete(key []byte) error { + if b.writable { + return b.Bucket.Delete(key) + } + return kv.ErrTxNotWritable +} + +type item struct { + key []byte + value []byte +} + +// Less is used to implement btree.Item. +func (i *item) Less(b btree.Item) bool { + j, ok := b.(*item) + if !ok { + return false + } + + return bytes.Compare(i.key, j.key) < 0 +} + +// Get retrieves the value at the provided key. +func (b *Bucket) Get(key []byte) ([]byte, error) { + i := b.btree.Get(&item{key: key}) + + if i == nil { + return nil, kv.ErrKeyNotFound + } + + j, ok := i.(*item) + if !ok { + return nil, fmt.Errorf("error item is type %T not *item", i) + } + + return j.value, nil +} + +// Put sets the key value pair provided. +func (b *Bucket) Put(key []byte, value []byte) error { + _ = b.btree.ReplaceOrInsert(&item{key: key, value: value}) + return nil +} + +// Delete removes the key provided. +func (b *Bucket) Delete(key []byte) error { + _ = b.btree.Delete(&item{key: key}) + return nil +} + +// Cursor creates a static cursor from all entries in the database. +func (b *Bucket) Cursor() (kv.Cursor, error) { + // TODO we should do this by using the Ascend/Descend methods that + // the btree provides. + pairs, err := b.getAll() + if err != nil { + return nil, err + } + + return kv.NewStaticCursor(pairs), nil +} + +func (b *Bucket) getAll() ([]kv.Pair, error) { + pairs := []kv.Pair{} + var err error + b.btree.Ascend(func(i btree.Item) bool { + j, ok := i.(*item) + if !ok { + err = fmt.Errorf("error item is type %T not *item", i) + return false + } + + pairs = append(pairs, kv.Pair{Key: j.key, Value: j.value}) + return true + }) + + if err != nil { + return nil, err + } + + return pairs, nil +} diff --git a/inmem/kv_test.go b/inmem/kv_test.go new file mode 100644 index 00000000000..69eccf84450 --- /dev/null +++ b/inmem/kv_test.go @@ -0,0 +1,64 @@ +package inmem_test + +import ( + "context" + "testing" + + "github.com/influxdata/platform" + "github.com/influxdata/platform/inmem" + "github.com/influxdata/platform/kv" + platformtesting "github.com/influxdata/platform/testing" +) + +func initExampleService(f platformtesting.UserFields, t *testing.T) (platform.UserService, string, func()) { + s := inmem.NewKVStore() + svc := kv.NewExampleService(s, f.IDGenerator) + if err := svc.Initialize(); err != nil { + t.Fatalf("error initializing user service: %v", err) + } + + ctx := context.Background() + for _, u := range f.Users { + if err := svc.PutUser(ctx, u); err != nil { + t.Fatalf("failed to populate users") + } + } + return svc, "", func() { + for _, u := range f.Users { + if err := svc.DeleteUser(ctx, u.ID); err != nil { + t.Logf("failed to remove users: %v", err) + } + } + } +} + +func TestExampleService(t *testing.T) { + platformtesting.UserService(initExampleService, t) +} + +func initKVStore(f platformtesting.KVStoreFields, t *testing.T) (kv.Store, func()) { + s := inmem.NewKVStore() + + err := s.Update(func(tx kv.Tx) error { + b, err := tx.Bucket(f.Bucket) + if err != nil { + return err + } + + for _, p := range f.Pairs { + if err := b.Put(p.Key, p.Value); err != nil { + return err + } + } + + return nil + }) + if err != nil { + t.Fatalf("failed to put keys: %v", err) + } + return s, func() {} +} + +func TestKVStore(t *testing.T) { + platformtesting.KVStore(initKVStore, t) +} diff --git a/kv/cursor.go b/kv/cursor.go new file mode 100644 index 00000000000..a8a54e57a79 --- /dev/null +++ b/kv/cursor.go @@ -0,0 +1,80 @@ +package kv + +import ( + "bytes" + "sort" +) + +// staticCursor implements the Cursor interface for a slice of +// static key value pairs. +type staticCursor struct { + idx int + pairs []Pair +} + +// Pair is a struct for key value pairs. +type Pair struct { + Key []byte + Value []byte +} + +// NewStaticCursor returns an instance of a StaticCursor. It +// destructively sorts the provided pairs to be in key ascending order. +func NewStaticCursor(pairs []Pair) Cursor { + sort.Slice(pairs, func(i, j int) bool { + return bytes.Compare(pairs[i].Key, pairs[j].Key) < 0 + }) + return &staticCursor{ + pairs: pairs, + } +} + +// Seek searches the slice for the first key with the provided prefix. +func (c *staticCursor) Seek(prefix []byte) ([]byte, []byte) { + // TODO: do binary search for prefix since pairs are ordered. + for i, pair := range c.pairs { + if bytes.HasPrefix(pair.Key, prefix) { + c.idx = i + return pair.Key, pair.Value + } + } + + return nil, nil +} + +func (c *staticCursor) getValueAtIndex(delta int) ([]byte, []byte) { + idx := c.idx + delta + if idx < 0 { + return nil, nil + } + + if idx >= len(c.pairs) { + return nil, nil + } + + c.idx = idx + + pair := c.pairs[c.idx] + + return pair.Key, pair.Value +} + +// First retrieves the first element in the cursor. +func (c *staticCursor) First() ([]byte, []byte) { + return c.getValueAtIndex(-c.idx) +} + +// Last retrieves the last element in the cursor. +func (c *staticCursor) Last() ([]byte, []byte) { + return c.getValueAtIndex(len(c.pairs) - 1 - c.idx) +} + +// Next retrieves the next entry in the cursor. +func (c *staticCursor) Next() ([]byte, []byte) { + return c.getValueAtIndex(1) +} + +// Prev retrieves the previous entry in the cursor. +func (c *staticCursor) Prev() ([]byte, []byte) { + return c.getValueAtIndex(-1) +} diff --git a/kv/cursor_test.go b/kv/cursor_test.go new file mode 100644 index 00000000000..358cd0e5b24 --- /dev/null +++ b/kv/cursor_test.go @@ -0,0 +1,244 @@ +package kv_test + +import ( + "bytes" + "testing" + + "github.com/influxdata/platform/kv" +) + +func TestStaticCursor_First(t *testing.T) { + type args struct { + pairs []kv.Pair + } + type wants struct { + key []byte + val []byte + } + + tests := []struct { + name string + args args + wants wants + }{ + { + name: "nil pairs", + args: args{ + pairs: nil, + }, + wants: wants{}, + }, + { + name: "empty pairs", + args: args{ + pairs: []kv.Pair{}, + }, + wants: wants{}, + }, + { + name: "unsorted pairs", + args: args{ + pairs: []kv.Pair{ + { + Key: []byte("bcd"), + Value: []byte("yoyo"), + }, + { + Key: []byte("abc"), + Value: []byte("oyoy"), + }, + }, + }, + wants: wants{ + key: []byte("abc"), + val: []byte("oyoy"), + }, + }, + { + name: "sorted pairs", + args: args{ + pairs: []kv.Pair{ + { + Key: []byte("abc"), + Value: []byte("oyoy"), + }, + { + Key: []byte("bcd"), + Value: []byte("yoyo"), + }, + }, + }, + wants: wants{ + key: []byte("abc"), + val: []byte("oyoy"), + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + cur := kv.NewStaticCursor(tt.args.pairs) + + key, val := cur.First() + + if want, got := tt.wants.key, key; !bytes.Equal(want, got) { + t.Errorf("exptected to get key %s got %s", string(want), string(got)) + } + + if want, got := tt.wants.val, val; !bytes.Equal(want, got) { + t.Errorf("exptected to get value %s got %s", string(want), string(got)) + } + }) + } +} + +func TestStaticCursor_Last(t *testing.T) { + type args struct { + pairs []kv.Pair + } + type wants struct { + key []byte + val []byte + } + + tests := []struct { + name string + args args + wants wants + }{ + { + name: "nil pairs", + args: args{ + pairs: nil, + }, + wants: wants{}, + }, + { + name: "empty pairs", + args: args{ + pairs: []kv.Pair{}, + }, + wants: wants{}, + }, + { + name: "unsorted pairs", + args: args{ + pairs: []kv.Pair{ + { + Key: []byte("bcd"), + Value: []byte("yoyo"), + }, + { + Key: []byte("abc"), + Value: []byte("oyoy"), + }, + }, + }, + wants: wants{ + key: []byte("bcd"), + val: []byte("yoyo"), + }, + }, + { + name: "sorted pairs", + args: args{ + pairs: []kv.Pair{ + { + Key: []byte("abc"), + Value: []byte("oyoy"), + }, + { + Key: []byte("bcd"), + Value: []byte("yoyo"), + }, + }, + }, + wants: wants{ + key: []byte("bcd"), + val: []byte("yoyo"), + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + cur := kv.NewStaticCursor(tt.args.pairs) + + key, val := cur.Last() + + if want, got := tt.wants.key, key; !bytes.Equal(want, got) { + t.Errorf("exptected to get key %s got %s", string(want), string(got)) + } + + if want, got := tt.wants.val, val; !bytes.Equal(want, got) { + t.Errorf("exptected to get value %s got %s", string(want), string(got)) + } + }) + } +} + +func TestStaticCursor_Seek(t *testing.T) { + type args struct { + prefix []byte + pairs []kv.Pair + } + type wants struct { + key []byte + val []byte + } + + tests := []struct { + name string + args args + wants wants + }{ + { + name: "sorted pairs", + args: args{ + prefix: []byte("bc"), + pairs: []kv.Pair{ + { + Key: []byte("abc"), + Value: []byte("oyoy"), + }, + { + Key: []byte("abcd"), + Value: []byte("oyoy"), + }, + { + Key: []byte("bcd"), + Value: []byte("yoyo"), + }, + { + Key: []byte("bcde"), + Value: []byte("yoyo"), + }, + { + Key: []byte("cde"), + Value: []byte("yyoo"), + }, + }, + }, + wants: wants{ + key: []byte("bcd"), + val: []byte("yoyo"), + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + cur := kv.NewStaticCursor(tt.args.pairs) + + key, val := cur.Seek(tt.args.prefix) + + if want, got := tt.wants.key, key; !bytes.Equal(want, got) { + t.Errorf("exptected to get key %s got %s", string(want), string(got)) + } + + if want, got := tt.wants.val, val; !bytes.Equal(want, got) { + t.Errorf("exptected to get value %s got %s", string(want), string(got)) + } + }) + } +} diff --git a/kv/example.go b/kv/example.go new file mode 100644 index 00000000000..866b2d704e2 --- /dev/null +++ b/kv/example.go @@ -0,0 +1,436 @@ +// Note: this file is used as a proof of concept for having a generic +// keyvalue store backed by specific implementations of kv.Store. +package kv + +import ( + "context" + "encoding/json" + "fmt" + + "github.com/influxdata/platform" +) + +var ( + exampleBucket = []byte("examplesv1") + exampleIndex = []byte("exampleindexv1") +) + +// ExampleService is an example user like service built on a generic kv store. +type ExampleService struct { + kv Store + idGenerator platform.IDGenerator +} + +// NewExampleService creates an instance of an example service. +func NewExampleService(kv Store, idGen platform.IDGenerator) *ExampleService { + return &ExampleService{ + kv: kv, + idGenerator: idGen, + } +} + +// Initialize creates the buckets for the example service +func (c *ExampleService) Initialize() error { + return c.kv.Update(func(tx Tx) error { + if _, err := tx.Bucket([]byte(exampleBucket)); err != nil { + return err + } + if _, err := tx.Bucket([]byte(exampleIndex)); err != nil { + return err + } + return nil + }) +} + +// FindUserByID retrieves a example by id. +func (c *ExampleService) FindUserByID(ctx context.Context, id platform.ID) (*platform.User, error) { + var u *platform.User + + err := c.kv.View(func(tx Tx) error { + usr, err := c.findUserByID(ctx, tx, id) + if err != nil { + return err + } + u = usr + return nil + }) + + if err != nil { + return nil, &platform.Error{ + Op: platform.OpFindUserByID, + Err: err, + } + } + + return u, nil +} + +func (c *ExampleService) findUserByID(ctx context.Context, tx Tx, id platform.ID) (*platform.User, error) { + encodedID, err := id.Encode() + if err != nil { + return nil, err + } + + b, err := tx.Bucket(exampleBucket) + if err != nil { + return nil, err + } + + v, err := b.Get(encodedID) + if err == ErrKeyNotFound { + return nil, &platform.Error{ + Code: platform.ENotFound, + Msg: "user not found", + } + } + if err != nil { + return nil, err + } + + var u platform.User + if err := json.Unmarshal(v, &u); err != nil { + return nil, err + } + + return &u, nil +} + +// FindUserByName returns a example by name for a particular example. +func (c *ExampleService) FindUserByName(ctx context.Context, n string) (*platform.User, error) { + var u *platform.User + + err := c.kv.View(func(tx Tx) error { + usr, err := c.findUserByName(ctx, tx, n) + if err != nil { + return err + } + u = usr + return nil + }) + + return u, err +} + +func (c *ExampleService) findUserByName(ctx context.Context, tx Tx, n string) (*platform.User, error) { + b, err := tx.Bucket(exampleIndex) + if err != nil { + return nil, err + } + uid, err := b.Get(exampleIndexKey(n)) + if err == ErrKeyNotFound { + return nil, &platform.Error{ + Code: platform.ENotFound, + Msg: "user not found", + Op: platform.OpFindUser, + } + } + if err != nil { + return nil, err + } + + var id platform.ID + if err := id.Decode(uid); err != nil { + return nil, err + } + return c.findUserByID(ctx, tx, id) +} + +// FindUser retrives a example using an arbitrary example filter. +// Filters using ID, or Name should be efficient. +// Other filters will do a linear scan across examples until it finds a match. +func (c *ExampleService) FindUser(ctx context.Context, filter platform.UserFilter) (*platform.User, error) { + if filter.ID != nil { + return c.FindUserByID(ctx, *filter.ID) + } + + if filter.Name != nil { + return c.FindUserByName(ctx, *filter.Name) + } + + filterFn := filterExamplesFn(filter) + + var u *platform.User + err := c.kv.View(func(tx Tx) error { + return forEachExample(ctx, tx, func(usr *platform.User) bool { + if filterFn(usr) { + u = usr + return false + } + return true + }) + }) + + if err != nil { + return nil, err + } + + if u == nil { + return nil, &platform.Error{ + Code: platform.ENotFound, + Msg: "user not found", + } + } + + return u, nil +} + +func filterExamplesFn(filter platform.UserFilter) func(u *platform.User) bool { + if filter.ID != nil { + return func(u *platform.User) bool { + return u.ID.Valid() && u.ID == *filter.ID + } + } + + if filter.Name != nil { + return func(u *platform.User) bool { + return u.Name == *filter.Name + } + } + + return func(u *platform.User) bool { return true } +} + +// FindUsers retrives all examples that match an arbitrary example filter. +// Filters using ID, or Name should be efficient. +// Other filters will do a linear scan across all examples searching for a match. +func (c *ExampleService) FindUsers(ctx context.Context, filter platform.UserFilter, opt ...platform.FindOptions) ([]*platform.User, int, error) { + op := platform.OpFindUsers + if filter.ID != nil { + u, err := c.FindUserByID(ctx, *filter.ID) + if err != nil { + return nil, 0, &platform.Error{ + Err: err, + Op: op, + } + } + + return []*platform.User{u}, 1, nil + } + + if filter.Name != nil { + u, err := c.FindUserByName(ctx, *filter.Name) + if err != nil { + return nil, 0, &platform.Error{ + Err: err, + Op: op, + } + } + + return []*platform.User{u}, 1, nil + } + + us := []*platform.User{} + filterFn := filterExamplesFn(filter) + err := c.kv.View(func(tx Tx) error { + return forEachExample(ctx, tx, func(u *platform.User) bool { + if filterFn(u) { + us = append(us, u) + } + return true + }) + }) + + if err != nil { + return nil, 0, err + } + + return us, len(us), nil +} + +// CreateUser creates a platform example and sets b.ID. +func (c *ExampleService) CreateUser(ctx context.Context, u *platform.User) error { + err := c.kv.Update(func(tx Tx) error { + unique := c.uniqueExampleName(ctx, tx, u) + + if !unique { + // TODO: make standard error + return &platform.Error{ + Code: platform.EConflict, + Msg: fmt.Sprintf("user with name %s already exists", u.Name), + } + } + + u.ID = c.idGenerator.ID() + + return c.putUser(ctx, tx, u) + }) + + if err != nil { + return &platform.Error{ + Err: err, + Op: platform.OpCreateUser, + } + } + + return nil +} + +// PutUser will put a example without setting an ID. +func (c *ExampleService) PutUser(ctx context.Context, u *platform.User) error { + return c.kv.Update(func(tx Tx) error { + return c.putUser(ctx, tx, u) + }) +} + +func (c *ExampleService) putUser(ctx context.Context, tx Tx, u *platform.User) error { + v, err := json.Marshal(u) + if err != nil { + return err + } + encodedID, err := u.ID.Encode() + if err != nil { + return err + } + + idx, err := tx.Bucket(exampleIndex) + if err != nil { + return err + } + + if err := idx.Put(exampleIndexKey(u.Name), encodedID); err != nil { + return err + } + + b, err := tx.Bucket(exampleBucket) + if err != nil { + return err + } + + return b.Put(encodedID, v) +} + +func exampleIndexKey(n string) []byte { + return []byte(n) +} + +// forEachExample will iterate through all examples while fn returns true. +func forEachExample(ctx context.Context, tx Tx, fn func(*platform.User) bool) error { + b, err := tx.Bucket(exampleBucket) + if err != nil { + return err + } + + cur, err := b.Cursor() + if err != nil { + return err + } + + for k, v := cur.First(); k != nil; k, v = cur.Next() { + u := &platform.User{} + if err := json.Unmarshal(v, u); err != nil { + return err + } + if !fn(u) { + break + } + } + + return nil +} + +func (c *ExampleService) uniqueExampleName(ctx context.Context, tx Tx, u *platform.User) bool { + idx, err := tx.Bucket(exampleIndex) + if err != nil { + return false + } + + if _, err := idx.Get(exampleIndexKey(u.Name)); err == ErrKeyNotFound { + return true + } + return false +} + +// UpdateUser updates a example according the parameters set on upd. +func (c *ExampleService) UpdateUser(ctx context.Context, id platform.ID, upd platform.UserUpdate) (*platform.User, error) { + var u *platform.User + err := c.kv.Update(func(tx Tx) error { + usr, err := c.updateUser(ctx, tx, id, upd) + if err != nil { + return err + } + u = usr + return nil + }) + + if err != nil { + return nil, &platform.Error{ + Err: err, + Op: platform.OpUpdateUser, + } + } + + return u, nil +} + +func (c *ExampleService) updateUser(ctx context.Context, tx Tx, id platform.ID, upd platform.UserUpdate) (*platform.User, error) { + u, err := c.findUserByID(ctx, tx, id) + if err != nil { + return nil, err + } + + if upd.Name != nil { + // Examples are indexed by name and so the example index must be pruned + // when name is modified. + idx, err := tx.Bucket(exampleIndex) + if err != nil { + return nil, err + } + + if err := idx.Delete(exampleIndexKey(u.Name)); err != nil { + return nil, err + } + u.Name = *upd.Name + } + + if err := c.putUser(ctx, tx, u); err != nil { + return nil, err + } + + return u, nil +} + +// DeleteUser deletes a example and prunes it from the index. +func (c *ExampleService) DeleteUser(ctx context.Context, id platform.ID) error { + err := c.kv.Update(func(tx Tx) error { + return c.deleteUser(ctx, tx, id) + }) + + if err != nil { + return &platform.Error{ + Op: platform.OpDeleteUser, + Err: err, + } + } + + return nil +} + +func (c *ExampleService) deleteUser(ctx context.Context, tx Tx, id platform.ID) error { + u, err := c.findUserByID(ctx, tx, id) + if err != nil { + return err + } + encodedID, err := id.Encode() + if err != nil { + return err + } + + idx, err := tx.Bucket(exampleIndex) + if err != nil { + return err + } + + if err := idx.Delete(exampleIndexKey(u.Name)); err != nil { + return err + } + + b, err := tx.Bucket(exampleBucket) + if err != nil { + return err + } + if err := b.Delete(encodedID); err != nil { + return err + } + + return nil +} diff --git a/kv/store.go b/kv/store.go new file mode 100644 index 00000000000..cab2ce799e4 --- /dev/null +++ b/kv/store.go @@ -0,0 +1,52 @@ +package kv + +import ( + "context" + "errors" +) + +var ( + // ErrKeyNotFound is the error returned when the key requested is not found. + ErrKeyNotFound = errors.New("key not found") + // ErrTxNotWritable is the error returned when an mutable operation is called during + // a non-writable transaction. + ErrTxNotWritable = errors.New("transaction is not writable") +) + +// Store is an interface for a generic key value store. It is modeled after +// the boltdb database struct. +type Store interface { + // View opens up a transaction that will not write to any data. Implementing interfaces + // should take care to ensure that all view transactions do not mutate any data. + View(func(Tx) error) error + // Update opens up a transaction that will mutate data. + Update(func(Tx) error) error +} + +// Tx is a transaction in the store. +type Tx interface { + Bucket(b []byte) (Bucket, error) + Context() context.Context + WithContext(ctx context.Context) +} + +// Bucket is the abstraction used to perform get/put/delete/get-many operations +// in a key value store. +type Bucket interface { + Get(key []byte) ([]byte, error) + Cursor() (Cursor, error) + // Put should error if the transaction it was called in is not writable. + Put(key, value []byte) error + // Delete should error if the transaction it was called in is not writable. + Delete(key []byte) error +} + +// Cursor is an abstraction for iterating/ranging through data. A concrete implementation +// of a cursor can be found in cursor.go. +type Cursor interface { + Seek(prefix []byte) (k []byte, v []byte) + First() (k []byte, v []byte) + Last() (k []byte, v []byte) + Next() (k []byte, v []byte) + Prev() (k []byte, v []byte) +} diff --git a/testing/kv.go b/testing/kv.go new file mode 100644 index 00000000000..62565dd641a --- /dev/null +++ b/testing/kv.go @@ -0,0 +1,928 @@ +package testing + +import ( + "bytes" + "fmt" + "testing" + "time" + + "github.com/influxdata/platform/kv" +) + +// KVStoreFields are background data that has to be set before +// the test runs. +type KVStoreFields struct { + Bucket []byte + Pairs []kv.Pair +} + +// KVStore tests the key value store contract +func KVStore( + init func(KVStoreFields, *testing.T) (kv.Store, func()), + t *testing.T, +) { + tests := []struct { + name string + fn func( + init func(KVStoreFields, *testing.T) (kv.Store, func()), + t *testing.T, + ) + }{ + { + name: "Get", + fn: KVGet, + }, + { + name: "Put", + fn: KVPut, + }, + { + name: "Delete", + fn: KVDelete, + }, + { + name: "Cursor", + fn: KVCursor, + }, + { + name: "View", + fn: KVView, + }, + { + name: "Update", + fn: KVUpdate, + }, + { + name: "ConcurrentUpdate", + fn: KVConcurrentUpdate, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + tt.fn(init, t) + }) + } +} + +// KVGet tests the get method contract for the key value store. +func KVGet( + init func(KVStoreFields, *testing.T) (kv.Store, func()), + t *testing.T, +) { + type args struct { + bucket []byte + key []byte + } + type wants struct { + err error + val []byte + } + + tests := []struct { + name string + fields KVStoreFields + args args + wants wants + }{ + { + name: "get key", + fields: KVStoreFields{ + Bucket: []byte("bucket"), + Pairs: []kv.Pair{ + { + Key: []byte("hello"), + Value: []byte("world"), + }, + }, + }, + args: args{ + bucket: []byte("bucket"), + key: []byte("hello"), + }, + wants: wants{ + val: []byte("world"), + }, + }, + { + name: "get missing key", + fields: KVStoreFields{ + Bucket: []byte("bucket"), + Pairs: []kv.Pair{}, + }, + args: args{ + bucket: []byte("bucket"), + key: []byte("hello"), + }, + wants: wants{ + err: kv.ErrKeyNotFound, + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + s, close := init(tt.fields, t) + defer close() + + err := s.View(func(tx kv.Tx) error { + b, err := tx.Bucket(tt.args.bucket) + if err != nil { + t.Errorf("unexpected error retrieving bucket: %v", err) + return err + } + + val, err := b.Get(tt.args.key) + if (err != nil) != (tt.wants.err != nil) { + t.Errorf("expected error '%v' got '%v'", tt.wants.err, err) + return err + } + + if err != nil && tt.wants.err != nil { + if err.Error() != tt.wants.err.Error() { + t.Errorf("expected error messages to match '%v' got '%v'", tt.wants.err, err.Error()) + return err + } + } + + if want, got := tt.wants.val, val; !bytes.Equal(want, got) { + t.Errorf("exptected to get value %s got %s", string(want), string(got)) + return err + } + + return nil + }) + + if err != nil { + t.Fatalf("error during view transaction: %v", err) + } + }) + } +} + +// KVPut tests the get method contract for the key value store. +func KVPut( + init func(KVStoreFields, *testing.T) (kv.Store, func()), + t *testing.T, +) { + type args struct { + bucket []byte + key []byte + val []byte + } + type wants struct { + err error + } + + tests := []struct { + name string + fields KVStoreFields + args args + wants wants + }{ + { + name: "put pair", + fields: KVStoreFields{ + Bucket: []byte("bucket"), + Pairs: []kv.Pair{}, + }, + args: args{ + bucket: []byte("bucket"), + key: []byte("hello"), + val: []byte("world"), + }, + wants: wants{}, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + s, close := init(tt.fields, t) + defer close() + + err := s.Update(func(tx kv.Tx) error { + b, err := tx.Bucket(tt.args.bucket) + if err != nil { + t.Errorf("unexpected error retrieving bucket: %v", err) + return err + } + + { + err := b.Put(tt.args.key, tt.args.val) + if (err != nil) != (tt.wants.err != nil) { + t.Errorf("expected error '%v' got '%v'", tt.wants.err, err) + return err + } + + if err != nil && tt.wants.err != nil { + if err.Error() != tt.wants.err.Error() { + t.Errorf("expected error messages to match '%v' got '%v'", tt.wants.err, err.Error()) + return err + } + } + + val, err := b.Get(tt.args.key) + if err != nil { + t.Errorf("unexpected error retrieving value: %v", err) + return err + } + + if want, got := tt.args.val, val; !bytes.Equal(want, got) { + t.Errorf("exptected to get value %s got %s", string(want), string(got)) + return err + } + } + + return nil + }) + + if err != nil { + t.Fatalf("error during view transaction: %v", err) + } + }) + } +} + +// KVDelete tests the delete method contract for the key value store. +func KVDelete( + init func(KVStoreFields, *testing.T) (kv.Store, func()), + t *testing.T, +) { + type args struct { + bucket []byte + key []byte + } + type wants struct { + err error + } + + tests := []struct { + name string + fields KVStoreFields + args args + wants wants + }{ + { + name: "delete key", + fields: KVStoreFields{ + Bucket: []byte("bucket"), + Pairs: []kv.Pair{ + { + Key: []byte("hello"), + Value: []byte("world"), + }, + }, + }, + args: args{ + bucket: []byte("bucket"), + key: []byte("hello"), + }, + wants: wants{}, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + s, close := init(tt.fields, t) + defer close() + + err := s.Update(func(tx kv.Tx) error { + b, err := tx.Bucket(tt.args.bucket) + if err != nil { + t.Errorf("unexpected error retrieving bucket: %v", err) + return err + } + + { + err := b.Delete(tt.args.key) + if (err != nil) != (tt.wants.err != nil) { + t.Errorf("expected error '%v' got '%v'", tt.wants.err, err) + return err + } + + if err != nil && tt.wants.err != nil { + if err.Error() != tt.wants.err.Error() { + t.Errorf("expected error messages to match '%v' got '%v'", tt.wants.err, err.Error()) + return err + } + } + + if _, err := b.Get(tt.args.key); err != kv.ErrKeyNotFound { + t.Errorf("expected key not found error got %v", err) + return err + } + } + + return nil + }) + + if err != nil { + t.Fatalf("error during view transaction: %v", err) + } + }) + } +} + +// KVCursor tests the cursor contract for the key value store. +func KVCursor( + init func(KVStoreFields, *testing.T) (kv.Store, func()), + t *testing.T, +) { + type args struct { + bucket []byte + seek []byte + } + type wants struct { + err error + first kv.Pair + last kv.Pair + seek kv.Pair + next kv.Pair + prev kv.Pair + } + + tests := []struct { + name string + fields KVStoreFields + args args + wants wants + }{ + { + name: "basic cursor", + fields: KVStoreFields{ + Bucket: []byte("bucket"), + Pairs: []kv.Pair{ + { + Key: []byte("a"), + Value: []byte("1"), + }, + { + Key: []byte("ab"), + Value: []byte("2"), + }, + { + Key: []byte("abc"), + Value: []byte("3"), + }, + { + Key: []byte("abcd"), + Value: []byte("4"), + }, + { + Key: []byte("abcde"), + Value: []byte("5"), + }, + { + Key: []byte("bcd"), + Value: []byte("6"), + }, + { + Key: []byte("cd"), + Value: []byte("7"), + }, + }, + }, + args: args{ + bucket: []byte("bucket"), + seek: []byte("abc"), + }, + wants: wants{ + first: kv.Pair{ + Key: []byte("a"), + Value: []byte("1"), + }, + last: kv.Pair{ + Key: []byte("cd"), + Value: []byte("7"), + }, + seek: kv.Pair{ + Key: []byte("abc"), + Value: []byte("3"), + }, + next: kv.Pair{ + Key: []byte("abcd"), + Value: []byte("4"), + }, + prev: kv.Pair{ + Key: []byte("abc"), + Value: []byte("3"), + }, + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + s, close := init(tt.fields, t) + defer close() + + err := s.View(func(tx kv.Tx) error { + b, err := tx.Bucket(tt.args.bucket) + if err != nil { + t.Errorf("unexpected error retrieving bucket: %v", err) + return err + } + + cur, err := b.Cursor() + if (err != nil) != (tt.wants.err != nil) { + t.Errorf("expected error '%v' got '%v'", tt.wants.err, err) + return err + } + + if err != nil && tt.wants.err != nil { + if err.Error() != tt.wants.err.Error() { + t.Errorf("expected error messages to match '%v' got '%v'", tt.wants.err, err.Error()) + return err + } + } + + { + key, val := cur.First() + if want, got := tt.wants.first.Key, key; !bytes.Equal(want, got) { + t.Errorf("exptected to get key %s got %s", string(want), string(got)) + return err + } + + if want, got := tt.wants.first.Value, val; !bytes.Equal(want, got) { + t.Errorf("exptected to get value %s got %s", string(want), string(got)) + return err + } + } + + { + key, val := cur.Last() + if want, got := tt.wants.last.Key, key; !bytes.Equal(want, got) { + t.Errorf("exptected to get key %s got %s", string(want), string(got)) + return err + } + + if want, got := tt.wants.last.Value, val; !bytes.Equal(want, got) { + t.Errorf("exptected to get value %s got %s", string(want), string(got)) + return err + } + } + + { + key, val := cur.Seek(tt.args.seek) + if want, got := tt.wants.seek.Key, key; !bytes.Equal(want, got) { + t.Errorf("exptected to get key %s got %s", string(want), string(got)) + return err + } + + if want, got := tt.wants.seek.Value, val; !bytes.Equal(want, got) { + t.Errorf("exptected to get value %s got %s", string(want), string(got)) + return err + } + } + + { + key, val := cur.Next() + if want, got := tt.wants.next.Key, key; !bytes.Equal(want, got) { + t.Errorf("exptected to get key %s got %s", string(want), string(got)) + return err + } + + if want, got := tt.wants.next.Value, val; !bytes.Equal(want, got) { + t.Errorf("exptected to get value %s got %s", string(want), string(got)) + return err + } + } + + { + key, val := cur.Prev() + if want, got := tt.wants.prev.Key, key; !bytes.Equal(want, got) { + t.Errorf("exptected to get key %s got %s", string(want), string(got)) + return err + } + + if want, got := tt.wants.prev.Value, val; !bytes.Equal(want, got) { + t.Errorf("exptected to get value %s got %s", string(want), string(got)) + return err + } + } + + return nil + }) + + if err != nil { + t.Fatalf("error during view transaction: %v", err) + } + }) + } +} + +// KVView tests the view method contract for the key value store. +func KVView( + init func(KVStoreFields, *testing.T) (kv.Store, func()), + t *testing.T, +) { + type args struct { + bucket []byte + key []byte + // If len(value) == 0 the test will not attempt a put + value []byte + // If true, the test will attempt to delete the provided key + delete bool + } + type wants struct { + value []byte + } + + tests := []struct { + name string + fields KVStoreFields + args args + wants wants + }{ + { + name: "basic view", + fields: KVStoreFields{ + Bucket: []byte("bucket"), + Pairs: []kv.Pair{ + { + Key: []byte("hello"), + Value: []byte("cruel world"), + }, + }, + }, + args: args{ + bucket: []byte("bucket"), + key: []byte("hello"), + }, + wants: wants{ + value: []byte("cruel world"), + }, + }, + { + name: "basic view with delete", + fields: KVStoreFields{ + Bucket: []byte("bucket"), + Pairs: []kv.Pair{ + { + Key: []byte("hello"), + Value: []byte("cruel world"), + }, + }, + }, + args: args{ + bucket: []byte("bucket"), + key: []byte("hello"), + delete: true, + }, + wants: wants{ + value: []byte("cruel world"), + }, + }, + { + name: "basic view with put", + fields: KVStoreFields{ + Bucket: []byte("bucket"), + Pairs: []kv.Pair{ + { + Key: []byte("hello"), + Value: []byte("cruel world"), + }, + }, + }, + args: args{ + bucket: []byte("bucket"), + key: []byte("hello"), + value: []byte("world"), + delete: true, + }, + wants: wants{ + value: []byte("cruel world"), + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + s, close := init(tt.fields, t) + defer close() + + err := s.View(func(tx kv.Tx) error { + b, err := tx.Bucket(tt.args.bucket) + if err != nil { + t.Errorf("unexpected error retrieving bucket: %v", err) + return err + } + + if len(tt.args.value) != 0 { + err := b.Put(tt.args.key, tt.args.value) + if err == nil { + return fmt.Errorf("expected transaction to fail") + } + if err != kv.ErrTxNotWritable { + return err + } + return nil + } + + value, err := b.Get(tt.args.key) + if err != nil { + return err + } + + if want, got := tt.wants.value, value; !bytes.Equal(want, got) { + t.Errorf("exptected to get value %s got %s", string(want), string(got)) + return err + } + + if tt.args.delete { + err := b.Delete(tt.args.key) + if err == nil { + return fmt.Errorf("expected transaction to fail") + } + if err != kv.ErrTxNotWritable { + return err + } + return nil + } + + return nil + }) + + if err != nil { + t.Fatalf("error during view transaction: %v", err) + } + }) + } +} + +// KVUpdate tests the update method contract for the key value store. +func KVUpdate( + init func(KVStoreFields, *testing.T) (kv.Store, func()), + t *testing.T, +) { + type args struct { + bucket []byte + key []byte + value []byte + delete bool + } + type wants struct { + value []byte + } + + tests := []struct { + name string + fields KVStoreFields + args args + wants wants + }{ + { + name: "basic update", + fields: KVStoreFields{ + Bucket: []byte("bucket"), + Pairs: []kv.Pair{ + { + Key: []byte("hello"), + Value: []byte("cruel world"), + }, + }, + }, + args: args{ + bucket: []byte("bucket"), + key: []byte("hello"), + value: []byte("world"), + }, + wants: wants{ + value: []byte("world"), + }, + }, + { + name: "basic update with delete", + fields: KVStoreFields{ + Bucket: []byte("bucket"), + Pairs: []kv.Pair{ + { + Key: []byte("hello"), + Value: []byte("cruel world"), + }, + }, + }, + args: args{ + bucket: []byte("bucket"), + key: []byte("hello"), + value: []byte("world"), + delete: true, + }, + wants: wants{}, + }, + // TODO: add case with failed update transaction that doesn't apply all of the changes. + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + s, close := init(tt.fields, t) + defer close() + + { + err := s.Update(func(tx kv.Tx) error { + b, err := tx.Bucket(tt.args.bucket) + if err != nil { + t.Errorf("unexpected error retrieving bucket: %v", err) + return err + } + + if len(tt.args.value) != 0 { + err := b.Put(tt.args.key, tt.args.value) + if err != nil { + return err + } + } + + if tt.args.delete { + err := b.Delete(tt.args.key) + if err != nil { + return err + } + } + + value, err := b.Get(tt.args.key) + if tt.args.delete { + if err != kv.ErrKeyNotFound { + return fmt.Errorf("expected key not found") + } + return nil + } else if err != nil { + return err + } + + if want, got := tt.wants.value, value; !bytes.Equal(want, got) { + t.Errorf("exptected to get value %s got %s", string(want), string(got)) + return err + } + + return nil + }) + + if err != nil { + t.Fatalf("error during update transaction: %v", err) + } + } + + { + err := s.View(func(tx kv.Tx) error { + b, err := tx.Bucket(tt.args.bucket) + if err != nil { + t.Errorf("unexpected error retrieving bucket: %v", err) + return err + } + + value, err := b.Get(tt.args.key) + if tt.args.delete { + if err != kv.ErrKeyNotFound { + return fmt.Errorf("expected key not found") + } + } else if err != nil { + return err + } + + if want, got := tt.wants.value, value; !bytes.Equal(want, got) { + t.Errorf("exptected to get value %s got %s", string(want), string(got)) + return err + } + + return nil + }) + + if err != nil { + t.Fatalf("error during view transaction: %v", err) + } + } + }) + } +} + +// KVConcurrentUpdate tests concurrent calls to update. +func KVConcurrentUpdate( + init func(KVStoreFields, *testing.T) (kv.Store, func()), + t *testing.T, +) { + type args struct { + bucket []byte + key []byte + valueA []byte + valueB []byte + } + type wants struct { + value []byte + } + + tests := []struct { + name string + fields KVStoreFields + args args + wants wants + }{ + { + name: "basic concurrent update", + fields: KVStoreFields{ + Bucket: []byte("bucket"), + Pairs: []kv.Pair{ + { + Key: []byte("hello"), + Value: []byte("cruel world"), + }, + }, + }, + args: args{ + bucket: []byte("bucket"), + key: []byte("hello"), + valueA: []byte("world"), + valueB: []byte("darkness my new friend"), + }, + wants: wants{ + value: []byte("darkness my new friend"), + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + s, closeFn := init(tt.fields, t) + defer closeFn() + + errCh := make(chan error) + var fn = func(v []byte) { + err := s.Update(func(tx kv.Tx) error { + b, err := tx.Bucket(tt.args.bucket) + if err != nil { + return err + } + + if err := b.Put(tt.args.key, v); err != nil { + return err + } + + return nil + }) + + if err != nil { + errCh <- fmt.Errorf("error during update transaction: %v", err) + } else { + errCh <- nil + } + } + go fn(tt.args.valueA) + // To ensure that a is scheduled before b + time.Sleep(time.Millisecond) + go fn(tt.args.valueB) + + count := 0 + for err := range errCh { + count++ + if err != nil { + t.Fatal(err) + } + if count == 2 { + break + } + } + + close(errCh) + + { + err := s.View(func(tx kv.Tx) error { + b, err := tx.Bucket(tt.args.bucket) + if err != nil { + t.Errorf("unexpected error retrieving bucket: %v", err) + return err + } + + deadline := time.Now().Add(1 * time.Second) + var returnErr error + for { + if time.Now().After(deadline) { + break + } + + value, err := b.Get(tt.args.key) + if err != nil { + return err + } + + if want, got := tt.wants.value, value; !bytes.Equal(want, got) { + returnErr = fmt.Errorf("exptected to get value %s got %s", string(want), string(got)) + } else { + returnErr = nil + break + } + } + + if returnErr != nil { + return returnErr + } + + return nil + }) + + if err != nil { + t.Fatalf("error during view transaction: %v", err) + } + } + }) + } +} diff --git a/user.go b/user.go index a2acbef52c7..9a55f4d433c 100644 --- a/user.go +++ b/user.go @@ -1,6 +1,8 @@ package platform -import "context" +import ( + "context" +) // User is a user. 🎉 type User struct {