Skip to content

Commit

Permalink
feat(platform): add generic kv store
Browse files Browse the repository at this point in the history
Co-authored-by: Leonardo Di Donato <[email protected]>
Co-authored-by: Michael Desa <[email protected]>

feat(kv): add kv store interface for services

feat(bolt): add boltdb implementation of kv.Store

spike(platform): add kv backed user service

feat(kv): add static cursor

Note here that this operation cannot be transactionally done. This poses
a bit of issues that will need to be worked out.

fix(bolt): use error explicit error message

squash: play with interface a bit

fix(kv): remove commit and rollback from kv interface

feat(inmem): add inmem kv store

chore: add note for inmem transactions

fix(bolt): remove call to tx in kv store tests

feat(kv): add tests for static cursor

doc(kv): add comments to store and associated interfaces

doc(bolt): add comments to key value store

feat(testing): add kv store tests

test(testing): add conformance test for kv.Store

test(inmem): add kv.Store conformance tests

doc(inmem): add comments to key value store

feat(inmem): remove CreateBucketIfNotExists from Tx interface

feat(bolt): remove CreateBucketIfNotExists from Tx

feat(inmem): remove CreateBucketIfNotExists from Tx

doc(kv): add note to bucket interface about conditions methods can be called

feat(kv): add context methods to kv.Tx

feat(bolt): add context methods to bolt.Tx

feat(inmem): add context methods to inmem.Tx

test(kv): add contract tests for view/update transactions

feat(kv): ensure that static cursor is always valid

Co-authored-by: Leonardo Di Donato <[email protected]>
Co-authored-by: Michael Desa <[email protected]>

fix(kv): remove error from cursor methods

test(kv): remove want errors from cursor test

test(testing): add concurrent update test for kv.Store

feat(kv): make kv user service an example service

fix(testing): add concurrnent update test to the kv.Store contract tests

test(platform): fix example kv service tests

dep(platform): make platform tidy
  • Loading branch information
desa committed Dec 18, 2018
1 parent b033bd2 commit 28cea4d
Show file tree
Hide file tree
Showing 14 changed files with 2,357 additions and 12 deletions.
21 changes: 21 additions & 0 deletions bolt/bbolt_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,3 +64,24 @@ func TestClientOpen(t *testing.T) {
t.Fatalf("unable to close database %s: %v", boltFile, err)
}
}

func NewTestKVStore() (*bolt.KVStore, func(), error) {
f, err := ioutil.TempFile("", "influxdata-platform-bolt-")
if err != nil {
return nil, nil, errors.New("unable to open temporary boltdb file")
}
f.Close()

path := f.Name()
s := bolt.NewKVStore(path)
if err := s.Open(context.TODO()); err != nil {
return nil, nil, err
}

close := func() {
s.Close()
os.Remove(path)
}

return s, close, nil
}
219 changes: 219 additions & 0 deletions bolt/kv.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,219 @@
package bolt

import (
"context"
"fmt"
"os"
"path/filepath"
"time"

bolt "github.com/coreos/bbolt"
"github.com/influxdata/platform/kv"
"go.uber.org/zap"
)

// KVStore is a kv.Store backed by boltdb.
type KVStore struct {
path string
db *bolt.DB
logger *zap.Logger
}

// NewKVStore returns an instance of KVStore with the file at
// the provided path.
func NewKVStore(path string) *KVStore {
return &KVStore{
path: path,
logger: zap.NewNop(),
}
}

// Open creates boltDB file it doesn't exists and opens it otherwise.
func (s *KVStore) Open(ctx context.Context) error {
// Ensure the required directory structure exists.
if err := os.MkdirAll(filepath.Dir(s.path), 0700); err != nil {
return fmt.Errorf("unable to create directory %s: %v", s.path, err)
}

if _, err := os.Stat(s.path); err != nil && !os.IsNotExist(err) {
return err
}

// Open database file.
db, err := bolt.Open(s.path, 0600, &bolt.Options{Timeout: 1 * time.Second})
if err != nil {
return fmt.Errorf("unable to open boltdb file %v", err)
}
s.db = db

s.logger.Info("Resources opened", zap.String("path", s.path))
return nil
}

// Close the connection to the bolt database
func (s *KVStore) Close() error {
if s.db != nil {
return s.db.Close()
}
return nil
}

// WithLogger sets the logger on the store.
func (s *KVStore) WithLogger(l *zap.Logger) {
s.logger = l
}

// WithDB sets the boltdb on the store.
func (s *KVStore) WithDB(db *bolt.DB) {
s.db = db
}

// View opens up a view transaction against the store.
func (s *KVStore) View(fn func(tx kv.Tx) error) error {
return s.db.View(func(tx *bolt.Tx) error {
return fn(&Tx{
tx: tx,
ctx: context.Background(),
})
})
}

// Update opens up an update transaction against the store.
func (s *KVStore) Update(fn func(tx kv.Tx) error) error {
return s.db.Update(func(tx *bolt.Tx) error {
return fn(&Tx{
tx: tx,
ctx: context.Background(),
})
})
}

// Tx is a light wrapper around a boltdb transaction. It implements kv.Tx.
type Tx struct {
tx *bolt.Tx
ctx context.Context
}

// Context returns the context for the transaction.
func (tx *Tx) Context() context.Context {
return tx.ctx
}

// WithContext sets the context for the transaction.
func (tx *Tx) WithContext(ctx context.Context) {
tx.ctx = ctx
}

// createBucketIfNotExists creates a bucket with the provided byte slice.
func (tx *Tx) createBucketIfNotExists(b []byte) (*Bucket, error) {
bkt, err := tx.tx.CreateBucketIfNotExists(b)
if err != nil {
return nil, err
}
return &Bucket{
bucket: bkt,
}, nil
}

// Bucket retrieves the bucket named b.
func (tx *Tx) Bucket(b []byte) (kv.Bucket, error) {
bkt := tx.tx.Bucket(b)
if bkt == nil {
return tx.createBucketIfNotExists(b)
}
return &Bucket{
bucket: bkt,
}, nil
}

// Bucket implements kv.Bucket.
type Bucket struct {
bucket *bolt.Bucket
}

// Get retrieves the value at the provided key.
func (b *Bucket) Get(key []byte) ([]byte, error) {
val := b.bucket.Get(key)
if len(val) == 0 {
return nil, kv.ErrKeyNotFound
}

return val, nil
}

// Put sets the value at the provided key.
func (b *Bucket) Put(key []byte, value []byte) error {
err := b.bucket.Put(key, value)
if err == bolt.ErrTxNotWritable {
return kv.ErrTxNotWritable
}
return err
}

// Delete removes the provided key.
func (b *Bucket) Delete(key []byte) error {
err := b.bucket.Delete(key)
if err == bolt.ErrTxNotWritable {
return kv.ErrTxNotWritable
}
return err
}

// Cursor retrieves a cursor for iterating through the entries
// in the key value store.
func (b *Bucket) Cursor() (kv.Cursor, error) {
return &Cursor{
cursor: b.bucket.Cursor(),
}, nil
}

// Cursor is a struct for iterating through the entries
// in the key value store.
type Cursor struct {
cursor *bolt.Cursor
}

// Seek seeks for the first key that matches the prefix provided.
func (c *Cursor) Seek(prefix []byte) ([]byte, []byte) {
k, v := c.cursor.Seek(prefix)
if len(v) == 0 {
return nil, nil
}
return k, v
}

// First retrieves the first key value pair in the bucket.
func (c *Cursor) First() ([]byte, []byte) {
k, v := c.cursor.First()
if len(v) == 0 {
return nil, nil
}
return k, v
}

// Last retrieves the last key value pair in the bucket.
func (c *Cursor) Last() ([]byte, []byte) {
k, v := c.cursor.Last()
if len(v) == 0 {
return nil, nil
}
return k, v
}

// Next retrieves the next key in the bucket.
func (c *Cursor) Next() ([]byte, []byte) {
k, v := c.cursor.Next()
if len(v) == 0 {
return nil, nil
}
return k, v
}

// Prev retrieves the previous key in the bucket.
func (c *Cursor) Prev() ([]byte, []byte) {
k, v := c.cursor.Prev()
if len(v) == 0 {
return nil, nil
}
return k, v
}
92 changes: 92 additions & 0 deletions bolt/kv_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
package bolt_test

import (
"context"
"testing"

"github.com/influxdata/platform"
"github.com/influxdata/platform/kv"
platformtesting "github.com/influxdata/platform/testing"
)

func initKVStore(f platformtesting.KVStoreFields, t *testing.T) (kv.Store, func()) {
s, closeFn, err := NewTestKVStore()
if err != nil {
t.Fatalf("failed to create new kv store: %v", err)
}

err = s.Update(func(tx kv.Tx) error {
b, err := tx.Bucket(f.Bucket)
if err != nil {
return err
}

for _, p := range f.Pairs {
if err := b.Put(p.Key, p.Value); err != nil {
return err
}
}

return nil
})
if err != nil {
t.Fatalf("failed to put keys: %v", err)
}
return s, func() {
closeFn()
}
}

func TestKVStore(t *testing.T) {
platformtesting.KVStore(initKVStore, t)
}

func initExampleService(f platformtesting.UserFields, t *testing.T) (platform.UserService, string, func()) {
s, closeFn, err := NewTestKVStore()
if err != nil {
t.Fatalf("failed to create new kv store: %v", err)
}
svc := kv.NewExampleService(s, f.IDGenerator)
if err := svc.Initialize(); err != nil {
t.Fatalf("error initializing user service: %v", err)
}

ctx := context.Background()
for _, u := range f.Users {
if err := svc.PutUser(ctx, u); err != nil {
t.Fatalf("failed to populate users")
}
}
return svc, "", func() {
defer closeFn()
for _, u := range f.Users {
if err := svc.DeleteUser(ctx, u.ID); err != nil {
t.Logf("failed to remove users: %v", err)
}
}
}
}

func TestExampleService_CreateUser(t *testing.T) {
platformtesting.CreateUser(initExampleService, t)
}

func TestExampleService_FindUserByID(t *testing.T) {
platformtesting.FindUserByID(initExampleService, t)
}

func TestExampleService_FindUsers(t *testing.T) {
platformtesting.FindUsers(initExampleService, t)
}

func TestExampleService_DeleteUser(t *testing.T) {
platformtesting.DeleteUser(initExampleService, t)
}

func TestExampleService_FindUser(t *testing.T) {
platformtesting.FindUser(initExampleService, t)
}

func TestExampleService_UpdateUser(t *testing.T) {
platformtesting.UpdateUser(initExampleService, t)
}
5 changes: 3 additions & 2 deletions bolt/user_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,17 @@ import (
"testing"

"github.com/influxdata/platform"
bolt "github.com/influxdata/platform/bolt"
"github.com/influxdata/platform/bolt"
platformtesting "github.com/influxdata/platform/testing"
)

func initUserService(f platformtesting.UserFields, t *testing.T) (platform.UserService, string, func()) {
c, closeFn, err := NewTestClient()
if err != nil {
t.Fatalf("failed to create new bolt client: %v", err)
t.Fatalf("failed to create new kv store: %v", err)
}
c.IDGenerator = f.IDGenerator

ctx := context.Background()
for _, u := range f.Users {
if err := c.PutUser(ctx, u); err != nil {
Expand Down
7 changes: 4 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ require (
github.com/campoy/unique v0.0.0-20180121183637-88950e537e7e // indirect
github.com/cenkalti/backoff v2.0.0+incompatible // indirect
github.com/cespare/xxhash v1.1.0
github.com/circonus-labs/circonus-gometrics v2.2.4+incompatible // indirect
github.com/circonus-labs/circonusllhist v0.1.1 // indirect
github.com/circonus-labs/circonus-gometrics v2.2.5+incompatible // indirect
github.com/circonus-labs/circonusllhist v0.1.3 // indirect
github.com/containerd/continuity v0.0.0-20181027224239-bea7585dbfac // indirect
github.com/coreos/bbolt v1.3.1-coreos.6
github.com/davecgh/go-spew v1.1.1
Expand All @@ -50,6 +50,7 @@ require (
github.com/gocql/gocql v0.0.0-20181117210152-33c0e89ca93a // indirect
github.com/gogo/protobuf v1.1.1
github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db
github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c
github.com/google/go-cmp v0.2.0
github.com/google/go-github v17.0.0+incompatible
github.com/google/go-querystring v1.0.0 // indirect
Expand Down Expand Up @@ -91,7 +92,7 @@ require (
github.com/mattn/go-isatty v0.0.4
github.com/mattn/go-zglob v0.0.0-20180803001819-2ea3427bfa53 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.1
github.com/miekg/dns v1.0.15 // indirect
github.com/miekg/dns v1.1.1 // indirect
github.com/mitchellh/copystructure v1.0.0 // indirect
github.com/mitchellh/go-testing-interface v1.0.0 // indirect
github.com/mitchellh/mapstructure v1.1.2 // indirect
Expand Down
Loading

0 comments on commit 28cea4d

Please sign in to comment.