Skip to content
This repository has been archived by the owner on Mar 30, 2023. It is now read-only.

Commit

Permalink
Merge pull request #42 from RSE-Cambridge/fix-watching
Browse files Browse the repository at this point in the history
Fix dacd watching
  • Loading branch information
JohnGarbutt authored Jan 3, 2019
2 parents 90b482f + 27ec3bc commit 6e41214
Show file tree
Hide file tree
Showing 22 changed files with 609 additions and 119 deletions.
4 changes: 2 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
# limitations under the License.


all: deps buildlocal test format
all: deps buildlocal format test

buildlocal:
mkdir -p `pwd`/bin
Expand All @@ -24,8 +24,8 @@ format:
test:
mkdir -p `pwd`/bin
./build/rebuild_mocks.sh
go test -cover -race -coverprofile=./bin/coverage.txt ./...
go vet ./...
go test -cover -race -coverprofile=./bin/coverage.txt ./...

test-func:
./build/func_test.sh
Expand Down
23 changes: 0 additions & 23 deletions cmd/dac-func-test/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,28 +54,6 @@ func testGetBricks(poolRegistry registry.PoolRegistry) {
}
}

func testAllocateBricks(poolRegistry registry.PoolRegistry) {
poolRegistry.WatchHostBrickAllocations("foo", func(old *registry.BrickAllocation,
new *registry.BrickAllocation) {
log.Printf("**Allocation update. Old: %+v New: %+v", old, new)
if new.DeallocateRequested {
log.Printf("requested clean of: %d:%s", new.AllocatedIndex, new.Device)
}
})
//allocations := []registry.BrickAllocation{
// {Hostname: "foo", Device: "vbdb1", AllocatedVolume: "vol1"},
// {Hostname: "foo", Device: "nvme3n1", AllocatedVolume: "vol1"},
//}
// TODO: create a volume to get the bricks allocated from?
//if err := poolRegistry.AllocateBricks(allocations); err != nil {
// log.Fatal(err)
//}
if err := poolRegistry.DeallocateBricks("vol1"); err != nil {
log.Fatal(err)
}
log.Println("asdf")
}

func testGetAllocations(poolRegistry registry.PoolRegistry) {
allocations, err := poolRegistry.GetAllocationsForHost("foo")
if err != nil {
Expand Down Expand Up @@ -132,7 +110,6 @@ func TestKeystorePoolRegistry(keystore keystoreregistry.Keystore) {
poolRegistry := keystoreregistry.NewPoolRegistry(keystore)
testUpdateHost(poolRegistry)
testGetBricks(poolRegistry)
testAllocateBricks(poolRegistry)
testGetAllocations(poolRegistry)
testDeleteAllocations(poolRegistry)
testKeepHostAlive(poolRegistry)
Expand Down
5 changes: 1 addition & 4 deletions cmd/dac-func-test/volume.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,7 @@ func TestKeystoreVolumeRegistry(keystore keystoreregistry.Keystore) {
}

func testVolumeCRUD(volRegistry registry.VolumeRegistry) {
volRegistry.WatchVolumeChanges("asdf", func(old *registry.Volume, new *registry.Volume) bool {
log.Printf("Volume update detected. old: %s new: %s", old.State, new.State)
return false
})
// TODO: test get volume changes?

volume := registry.Volume{Name: "asdf", State: registry.Registered, JobName: "foo", SizeBricks: 2, SizeGB: 200}
volume2 := registry.Volume{Name: "asdf2", JobName: "foo", SizeBricks: 3, SizeGB: 300}
Expand Down
3 changes: 3 additions & 0 deletions cmd/dacctl/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,9 @@ func (*stubKeystore) KeepAliveKey(key string) error {
func (*stubKeystore) NewMutex(lockKey string) (keystoreregistry.Mutex, error) {
panic("implement me")
}
func (*stubKeystore) Watch(ctxt context.Context, key string, withPrefix bool) keystoreregistry.KeyValueUpdateChan {
panic("implement me")
}

type stubDacctlActions struct{}

Expand Down
84 changes: 78 additions & 6 deletions internal/pkg/etcdregistry/keystore.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,11 +65,19 @@ func newEtcdClient() *clientv3.Client {

func NewKeystore() keystoreregistry.Keystore {
cli := newEtcdClient()
return &etcKeystore{cli}
return &etcKeystore{
Watcher: cli.Watcher,
KV: cli.KV,
Lease: cli.Lease,
Client: cli,
}
}

type etcKeystore struct {
*clientv3.Client
Watcher clientv3.Watcher
KV clientv3.KV
Lease clientv3.Lease
Client *clientv3.Client
}

func (client *etcKeystore) NewMutex(lockKey string) (keystoreregistry.Mutex, error) {
Expand Down Expand Up @@ -97,6 +105,10 @@ func handleError(err error) {
}
}

func (client *etcKeystore) Close() error {
return client.Client.Close()
}

func (client *etcKeystore) runTransaction(ifOps []clientv3.Cmp, thenOps []clientv3.Op) error {
kvc := clientv3.NewKV(client.Client)
kvc.Txn(context.Background())
Expand Down Expand Up @@ -195,7 +207,7 @@ func (client *etcKeystore) Get(key string) (keystoreregistry.KeyValueVersion, er

func (client *etcKeystore) WatchPrefix(prefix string,
onUpdate func(old *keystoreregistry.KeyValueVersion, new *keystoreregistry.KeyValueVersion)) {
rch := client.Watch(context.Background(), prefix, clientv3.WithPrefix(), clientv3.WithPrevKV())
rch := client.Client.Watch(context.Background(), prefix, clientv3.WithPrefix(), clientv3.WithPrevKV())
go func() {
for wresp := range rch {
for _, ev := range wresp.Events {
Expand All @@ -213,7 +225,7 @@ func (client *etcKeystore) WatchPrefix(prefix string,

func (client *etcKeystore) WatchKey(ctxt context.Context, key string,
onUpdate func(old *keystoreregistry.KeyValueVersion, new *keystoreregistry.KeyValueVersion)) {
rch := client.Watch(ctxt, key, clientv3.WithPrevKV())
rch := client.Client.Watch(ctxt, key, clientv3.WithPrevKV())
go func() {
for watchResponse := range rch {
for _, ev := range watchResponse.Events {
Expand All @@ -232,6 +244,66 @@ func (client *etcKeystore) WatchKey(ctxt context.Context, key string,
}()
}

// TODO: this needs fixing up
func (client *etcKeystore) WatchForCondition(ctxt context.Context, key string, fromRevision int64,
check func(update keystoreregistry.KeyValueUpdate) bool) (bool, error) {

// check key is present and find revision of the last update
initialValue, err := client.Get(key)
if err != nil {
return false, err
}
if fromRevision < initialValue.CreateRevision {
return false, errors.New("incorrect fromRevision")
}

// no deadline set, so add default timeout of 10 mins
var cancelFunc context.CancelFunc
_, ok := ctxt.Deadline()
if !ok {
ctxt, cancelFunc = context.WithTimeout(ctxt, time.Minute*10)
}

// open channel with etcd, starting with the last revision of the key from above
rch := client.Client.Watch(ctxt, key, clientv3.WithPrefix(), clientv3.WithRev(fromRevision))
if rch == nil {
cancelFunc()
return false, errors.New("no watcher returned from etcd")
}

conditionMet := false
go func() {
for watchResponse := range rch {
// TODO: this should instead use Watch from above!
for _, ev := range watchResponse.Events {
update := keystoreregistry.KeyValueUpdate{
New: getKeyValueVersion(ev.Kv),
Old: getKeyValueVersion(ev.PrevKv),
}

// show deleted by returning nil for new
isKeyDeleted := false
if ev.Type == clientv3.EventTypeDelete {
update.New = nil
isKeyDeleted = true
}

conditionMet := check(update)

// stop watching if the condition passed or key was deleted
if conditionMet || isKeyDeleted {
cancelFunc()
return
}
}
}
// Assuming we get here when the context is cancelled or hits its timeout
// i.e. there are no more events, so we close the channel
}()

return conditionMet, nil
}

func (client *etcKeystore) KeepAliveKey(key string) error {
kvc := clientv3.NewKV(client.Client)

Expand All @@ -243,7 +315,7 @@ func (client *etcKeystore) KeepAliveKey(key string) error {

// TODO what about configure timeout and ttl?
var ttl int64 = 10
grantResponse, err := client.Grant(context.Background(), ttl)
grantResponse, err := client.Client.Grant(context.Background(), ttl)
if err != nil {
log.Fatal(err)
}
Expand All @@ -258,7 +330,7 @@ func (client *etcKeystore) KeepAliveKey(key string) error {
return fmt.Errorf("unable to create keep-alive key: %s", key)
}

ch, err := client.KeepAlive(context.Background(), leaseID)
ch, err := client.Client.KeepAlive(context.Background(), leaseID)
if err != nil {
log.Fatal(err)
}
Expand Down
52 changes: 52 additions & 0 deletions internal/pkg/etcdregistry/watch.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package etcdregistry

import (
"context"
"github.com/RSE-Cambridge/data-acc/internal/pkg/keystoreregistry"
"github.com/coreos/etcd/clientv3"
)

func (client *etcKeystore) Watch(ctxt context.Context, key string, withPrefix bool) keystoreregistry.KeyValueUpdateChan {
options := []clientv3.OpOption{clientv3.WithPrevKV()}
if withPrefix {
options = append(options, clientv3.WithPrefix())
}
rch := client.Watcher.Watch(ctxt, key, options...)

c := make(chan keystoreregistry.KeyValueUpdate)

go processWatchEvents(rch, c)

return c
}

func processWatchEvents(watchChan clientv3.WatchChan, c chan keystoreregistry.KeyValueUpdate) {
for watchResponse := range watchChan {
// if error, send empty update with an error
err := watchResponse.Err()
if err != nil {
c <- keystoreregistry.KeyValueUpdate{Err: err}
}

// send all events in this watch response
for _, ev := range watchResponse.Events {
update := keystoreregistry.KeyValueUpdate{
IsCreate: ev.IsCreate(),
IsModify: ev.IsModify(),
IsDelete: ev.Type == clientv3.EventTypeDelete,
}
if update.IsCreate || update.IsModify {
update.New = getKeyValueVersion(ev.Kv)
}
if update.IsDelete || update.IsModify {
update.Old = getKeyValueVersion(ev.PrevKv)
}

c <- update
}
}

// Assuming we get here when the context is cancelled or hits its timeout
// i.e. there are no more events, so we close the channel
close(c)
}
119 changes: 119 additions & 0 deletions internal/pkg/etcdregistry/watch_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
package etcdregistry

import (
"context"
"github.com/coreos/etcd/clientv3"
"github.com/coreos/etcd/mvcc/mvccpb"
"github.com/stretchr/testify/assert"
"testing"
)

type fakeWatcher struct {
t *testing.T
ch clientv3.WatchChan
opts []clientv3.OpOption
}

func (fw fakeWatcher) Watch(ctx context.Context, key string, opts ...clientv3.OpOption) clientv3.WatchChan {
assert.Equal(fw.t, "key", key)
assert.EqualValues(fw.t, len(fw.opts), len(opts)) // TODO: how to assert this properly?
return fw.ch
}
func (fakeWatcher) Close() error {
panic("implement me")
}

func TestEtcKeystore_Watch_Nil(t *testing.T) {
keystore := etcKeystore{
Watcher: fakeWatcher{
t: t, ch: nil,
opts: []clientv3.OpOption{clientv3.WithPrevKV()},
},
}

response := keystore.Watch(context.TODO(), "key", false)

assert.Empty(t, response)
}

func TestEtcKeystore_Watch(t *testing.T) {
ch := make(chan clientv3.WatchResponse)

keystore := etcKeystore{
Watcher: fakeWatcher{
t: t, ch: ch,
opts: []clientv3.OpOption{clientv3.WithPrefix(), clientv3.WithPrevKV()},
},
}

go func() {
ch <- clientv3.WatchResponse{
Events: []*clientv3.Event{
{Type: clientv3.EventTypePut, Kv: &mvccpb.KeyValue{Key: []byte("key1")}},
{Type: clientv3.EventTypePut, Kv: &mvccpb.KeyValue{Key: []byte("key2")}},
}}
ch <- clientv3.WatchResponse{
Events: []*clientv3.Event{
{
Type: clientv3.EventTypePut,
Kv: &mvccpb.KeyValue{ModRevision: 1, Key: []byte("key2")},
PrevKv: &mvccpb.KeyValue{ModRevision: 1, Key: []byte("key2")},
},
}}
ch <- clientv3.WatchResponse{
Events: []*clientv3.Event{
{Type: clientv3.EventTypeDelete, PrevKv: &mvccpb.KeyValue{Key: []byte("key2")}},
{Type: clientv3.EventTypeDelete, PrevKv: &mvccpb.KeyValue{Key: []byte("key1")}},
}}
ch <- clientv3.WatchResponse{Canceled: true}
close(ch)
}()

response := keystore.Watch(context.TODO(), "key", true)

ev1 := <-response
assert.True(t, ev1.IsCreate)
assert.False(t, ev1.IsModify)
assert.False(t, ev1.IsDelete)
assert.Nil(t, ev1.Old)
assert.EqualValues(t, "key1", ev1.New.Key)

ev2 := <-response
assert.True(t, ev2.IsCreate)
assert.False(t, ev2.IsModify)
assert.False(t, ev2.IsDelete)
assert.Nil(t, ev2.Old)
assert.EqualValues(t, "key2", ev2.New.Key)

ev3 := <-response
assert.False(t, ev3.IsCreate)
assert.True(t, ev3.IsModify)
assert.False(t, ev3.IsDelete)
assert.EqualValues(t, "key2", ev3.New.Key)
assert.EqualValues(t, "key2", ev3.Old.Key)

ev4 := <-response
assert.False(t, ev4.IsCreate)
assert.False(t, ev4.IsModify)
assert.True(t, ev4.IsDelete)
assert.Nil(t, ev4.New)
assert.EqualValues(t, "key2", ev4.Old.Key)

ev5 := <-response
assert.False(t, ev5.IsCreate)
assert.False(t, ev5.IsModify)
assert.True(t, ev5.IsDelete)
assert.Nil(t, ev5.New)
assert.EqualValues(t, "key1", ev5.Old.Key)

ev6 := <-response
assert.Equal(t,
"etcdserver: mvcc: required revision is a future revision",
ev6.Err.Error())

// Check channels are closed
_, ok := <-response
assert.False(t, ok)
_, ok = <-ch
assert.False(t, ok)
}
Loading

0 comments on commit 6e41214

Please sign in to comment.