Skip to content

Commit

Permalink
[m3admin] Add topic client (m3db#190)
Browse files Browse the repository at this point in the history
Allows for automation of topics in the future.

- Update M3 dependency version.
- Support per-request headers in m3admin client (necessary to send topic
  name).
  • Loading branch information
schallert authored Jan 31, 2020
1 parent 1cc2f05 commit bf57144
Show file tree
Hide file tree
Showing 16 changed files with 808 additions and 18 deletions.
6 changes: 3 additions & 3 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -113,12 +113,12 @@ test-all: clean-all install-tools verify-gen lint metalint test-all-gen bins tes
.PHONY: test
test: install-tools test-base
@echo "--- $@"
@PATH=$(combined_bin_paths):$(PATH) gocov convert $(coverfile) | gocov report
@$(tools_bin_path)/gocov convert $(coverfile) | $(tools_bin_path)/gocov report

.PHONY: test-no-deps
test-no-deps: test-base
@echo "--- $@"
@PATH=$(combined_bin_paths):$(PATH) gocov convert $(coverfile) | gocov report
@$(tools_bin_path)/gocov convert $(coverfile) | $(tools_bin_path)/gocov report

.PHONY: test-e2e
test-e2e:
Expand All @@ -139,7 +139,7 @@ test-ci-unit: install-tools test-base verify-gen
.PHONY: install-tools
install-tools:
@echo "--- $@"
GOBIN=$(tools_bin_path) go install github.com/axw/gocov
GOBIN=$(tools_bin_path) go install github.com/axw/gocov/gocov
GOBIN=$(tools_bin_path) go install github.com/garethr/kubeval
GOBIN=$(tools_bin_path) go install github.com/golang/mock/mockgen
GOBIN=$(tools_bin_path) go install github.com/m3db/build-tools/linters/badtime
Expand Down
1 change: 1 addition & 0 deletions generated/mocks/generate.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
// mockgen rules for generating mocks using file mode
//go:generate sh -c "mockgen -package=placement -destination=$GOPATH/src/$PACKAGE/pkg/m3admin/placement/client_mock.go -source=$GOPATH/src/$PACKAGE/pkg/m3admin/placement/types.go"
//go:generate sh -c "mockgen -package=namespace -destination=$GOPATH/src/$PACKAGE/pkg/m3admin/namespace/client_mock.go -source=$GOPATH/src/$PACKAGE/pkg/m3admin/namespace/types.go"
//go:generate sh -c "mockgen -package=topic -destination=$GOPATH/src/$PACKAGE/pkg/m3admin/topic/client_mock.go -source=$GOPATH/src/$PACKAGE/pkg/m3admin/topic/types.go"
//go:generate sh -c "mockgen -package=m3admin -destination=$GOPATH/src/$PACKAGE/pkg/m3admin/client_mock.go -source=$GOPATH/src/$PACKAGE/pkg/m3admin/client.go"
//go:generate sh -c "mockgen -package=m3db -destination=$GOPATH/src/$PACKAGE/pkg/k8sops/m3db/k8sops_mock.go -source=$GOPATH/src/$PACKAGE/pkg/k8sops/m3db/types.go"
//go:generate sh -c "mockgen -package=podidentity -destination=$GOPATH/src/$PACKAGE/pkg/k8sops/podidentity/provider_mock.go -source=$GOPATH/src/$PACKAGE/pkg/k8sops/podidentity/provider.go"
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ require (
github.com/hashicorp/golang-lru v0.5.3 // indirect
github.com/imdario/mergo v0.3.7 // indirect
github.com/m3db/build-tools v0.0.0-20181013000606-edd1bdd1df8a
github.com/m3db/m3 v0.8.4
github.com/m3db/m3 v0.14.2
github.com/m3db/m3x v0.0.0-20190408051622-ebf3c7b94afd
github.com/m3db/prometheus_client_golang v0.8.1 // indirect
github.com/m3db/prometheus_client_model v0.0.0-20180517145114-8b2299a4bf7d // indirect
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -343,8 +343,8 @@ github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/logrusorgru/aurora v0.0.0-20181002194514-a7b3b318ed4e/go.mod h1:7rIyQOR62GCctdiQpZ/zOJlFyk6y+94wXzv6RNZgaR4=
github.com/m3db/build-tools v0.0.0-20181013000606-edd1bdd1df8a h1:CwsSHIJLeCESKdZ844jXg/3rQD3yA5azuVlJBp5w8U8=
github.com/m3db/build-tools v0.0.0-20181013000606-edd1bdd1df8a/go.mod h1:Pk9AtZeKuCO2xcAth0gxwzRNFv4lV26GPSx4I6A7DQ8=
github.com/m3db/m3 v0.8.4 h1:b7yOpeHsdr4WUt9ODYfnE/7pA8B/FwD6RQMJ/2dxMzQ=
github.com/m3db/m3 v0.8.4/go.mod h1:7izI0EeTws4qSNJ1mb1rF0c3PKbQbogUFDsfAgcH2jo=
github.com/m3db/m3 v0.14.2 h1:cdEsX+4f0eufXf/Cn7zibMLngK4ncZYHZlbGeSxIKDQ=
github.com/m3db/m3 v0.14.2/go.mod h1:7izI0EeTws4qSNJ1mb1rF0c3PKbQbogUFDsfAgcH2jo=
github.com/m3db/m3x v0.0.0-20190408051622-ebf3c7b94afd h1:wzLBtXzxZM9b6IXwLSRE5crynocLTCuRDpGDaOJzyuI=
github.com/m3db/m3x v0.0.0-20190408051622-ebf3c7b94afd/go.mod h1:zLbcVb352e3Jsg62A6zzEhZ1gumeFsiamTqDs9ZmZrs=
github.com/m3db/prometheus_client_golang v0.8.1 h1:t7w/tcFws81JL1j5sqmpqcOyQOpH4RDOmIe3A3fdN3w=
Expand Down
64 changes: 64 additions & 0 deletions pkg/controller/m3admin_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,12 @@ import (
"github.com/m3db/m3db-operator/pkg/m3admin"
"github.com/m3db/m3db-operator/pkg/m3admin/namespace"
"github.com/m3db/m3db-operator/pkg/m3admin/placement"
"github.com/m3db/m3db-operator/pkg/m3admin/topic"

"github.com/m3db/m3/src/cluster/generated/proto/placementpb"
m3placement "github.com/m3db/m3/src/cluster/placement"
"github.com/m3db/m3/src/msg/generated/proto/topicpb"
m3topic "github.com/m3db/m3/src/msg/topic"
"github.com/m3db/m3/src/query/generated/proto/admin"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand All @@ -44,9 +47,11 @@ type multiAdminClient struct {
mu sync.RWMutex
nsClients map[string]namespace.Client
plClients map[string]placement.Client
tpClients map[string]topic.Client

nsClientFn func(...namespace.Option) (namespace.Client, error)
plClientFn func(...placement.Option) (placement.Client, error)
tpClientFn func(...topic.Option) (topic.Client, error)

clusterKeyFn func(metav1.ObjectMetaAccessor, string) string
clusterURLFn func(metav1.ObjectMetaAccessor) string
Expand Down Expand Up @@ -89,8 +94,10 @@ func newMultiAdminClient(adminOpts []m3admin.Option, logger *zap.Logger) *multiA
return &multiAdminClient{
nsClients: make(map[string]namespace.Client),
plClients: make(map[string]placement.Client),
tpClients: make(map[string]topic.Client),
nsClientFn: namespace.NewClient,
plClientFn: placement.NewClient,
tpClientFn: topic.NewClient,
clusterKeyFn: clusterKey,
clusterURLFn: clusterURL,
adminClientFn: newAdminClient,
Expand Down Expand Up @@ -172,6 +179,39 @@ func (m *multiAdminClient) placementClientForCluster(cluster metav1.ObjectMetaAc
return client
}

func (m *multiAdminClient) topicClientForCluster(cluster metav1.ObjectMetaAccessor) topic.Client {
url := m.clusterURLFn(cluster)
key := m.clusterKeyFn(cluster, url)

m.mu.RLock()
client, ok := m.tpClients[key]
m.mu.RUnlock()
if ok {
return client
}

adminClient := m.adminClientForCluster(cluster)
client, err := m.tpClientFn(
topic.WithClient(adminClient),
topic.WithLogger(m.logger),
topic.WithURL(url),
)
if err != nil {
return newErrorTopicClient(err)
}

m.mu.Lock()
mapClient, ok := m.tpClients[key]
if ok {
client = mapClient
} else {
m.tpClients[key] = client
}
m.mu.Unlock()

return client
}

// errorNamespaceClient implements namespace.Client by returning an error that a
// specified cluster couldn't be found, enabling easier ergonomics for the
// common pattern of looking up a client and returning an error if one is
Expand Down Expand Up @@ -229,3 +269,27 @@ func (c errorPlacementClient) Remove(string) error {
func (c errorPlacementClient) Replace(string, placementpb.Instance) error {
return c.err
}

type errorTopicClient struct {
err error
}

func newErrorTopicClient(err error) topic.Client {
return errorTopicClient{err: err}
}

func (c errorTopicClient) Init(name string, req *admin.TopicInitRequest) error {
return c.err
}

func (c errorTopicClient) Get(topicName string) (m3topic.Topic, error) {
return nil, c.err
}

func (c errorTopicClient) Delete(topicName string) error {
return c.err
}

func (c errorTopicClient) Add(topicName string, consumerSvc *topicpb.ConsumerService) error {
return c.err
}
55 changes: 55 additions & 0 deletions pkg/controller/m3admin_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,10 @@ import (
"github.com/m3db/m3db-operator/pkg/m3admin"
"github.com/m3db/m3db-operator/pkg/m3admin/namespace"
"github.com/m3db/m3db-operator/pkg/m3admin/placement"
"github.com/m3db/m3db-operator/pkg/m3admin/topic"

"github.com/m3db/m3/src/cluster/generated/proto/placementpb"
"github.com/m3db/m3/src/msg/generated/proto/topicpb"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

Expand Down Expand Up @@ -168,6 +170,38 @@ func TestPlacementClientForCluster(t *testing.T) {
assert.Equal(t, testErr, cl3.Delete())
}

func TestTopicClientForCluster(t *testing.T) {
mc := gomock.NewController(t)
defer mc.Finish()

m3Client := m3admin.NewMockClient(mc)
tpClient := topic.NewMockClient(mc)

m := newTestAdminClient(m3Client, "http://foo")
m.tpClientFn = func(_ ...topic.Option) (topic.Client, error) {
return tpClient, nil
}

clusterA := newM3DBCluster("ns", "a")
clusterB := newM3DBCluster("ns", "b")
clusterC := newM3DBCluster("ns", "c")
testErr := errors.New("test")

cl := m.topicClientForCluster(clusterA)
assert.Equal(t, tpClient, cl)
_ = m.topicClientForCluster(clusterB)
assert.Equal(t, 2, len(m.tpClients))
m.tpClientFn = func(_ ...topic.Option) (topic.Client, error) {
return nil, testErr
}

cl2 := m.topicClientForCluster(clusterA)
assert.Equal(t, cl, cl2)

cl3 := m.topicClientForCluster(clusterC)
assert.Equal(t, testErr, cl3.Delete("topic"))
}

func TestErrorNamespaceClient(t *testing.T) {
clErr := errors.New("test")
cl := newErrorNamespaceClient(clErr)
Expand Down Expand Up @@ -206,3 +240,24 @@ func TestErrorPlacementClient(t *testing.T) {
err = cl.Replace("foo", placementpb.Instance{})
assert.Equal(t, clErr, err)
}

func TestErrorTopicClient(t *testing.T) {
clErr := errors.New("test")
cl := newErrorTopicClient(clErr)

err := cl.Init("topic", nil)
assert.Equal(t, clErr, err)

tp, err := cl.Get("topic")
assert.Nil(t, tp)
assert.Equal(t, clErr, err)

err = cl.Delete("topic")
assert.Equal(t, clErr, err)

err = cl.Add("topic", &topicpb.ConsumerService{})
assert.Equal(t, clErr, err)

err = cl.Delete("topic")
assert.Equal(t, clErr, err)
}
19 changes: 15 additions & 4 deletions pkg/m3admin/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,8 @@ var (

// Client is an m3admin client.
type Client interface {
DoHTTPRequest(action, url string, data *bytes.Buffer) (*http.Response, error)
DoHTTPJSONPBRequest(action, url string, request, response proto.Message) error
DoHTTPRequest(action, url string, data *bytes.Buffer, opts ...RequestOption) (*http.Response, error)
DoHTTPJSONPBRequest(action, url string, request, response proto.Message, opts ...RequestOption) error
}

type client struct {
Expand Down Expand Up @@ -95,9 +95,13 @@ func NewClient(clientOpts ...Option) Client {
func (c *client) DoHTTPRequest(
action, url string,
data *bytes.Buffer,
options ...RequestOption,
) (*http.Response, error) {

l := c.logger.With(zap.String("action", action), zap.String("url", url))
opts := &reqOptions{}
for _, o := range options {
o.execute(opts)
}

var request *retryhttp.Request
var err error
Expand All @@ -116,6 +120,12 @@ func (c *client) DoHTTPRequest(
}
}

if opts.headers != nil {
for k, v := range opts.headers {
request.Header.Add(k, v)
}
}

request.Header.Add("Content-Type", "application/json")
if c.environment != "" {
request.Header.Add(m3EnvironmentHeader, c.environment)
Expand Down Expand Up @@ -177,6 +187,7 @@ func (c *client) DoHTTPJSONPBRequest(
action, url string,
request proto.Message,
response proto.Message,
opts ...RequestOption,
) error {
var data *bytes.Buffer
if request != nil {
Expand All @@ -186,7 +197,7 @@ func (c *client) DoHTTPJSONPBRequest(
}
}

r, err := c.DoHTTPRequest(action, url, data)
r, err := c.DoHTTPRequest(action, url, data, opts...)
if err != nil {
return err
}
Expand Down
26 changes: 18 additions & 8 deletions pkg/m3admin/client_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

26 changes: 26 additions & 0 deletions pkg/m3admin/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,32 @@ func TestClient_DoHTTPRequest_Header(t *testing.T) {
assert.Equal(t, []byte("hello"), readAll(resp.Body))
}

func TestClient_DoHTTPRequest_PerRequestHeader(t *testing.T) {
s := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.Header.Get("test-header") != "test-val" {
w.WriteHeader(500)
return
}
w.Write([]byte("hello"))
}))
defer s.Close()

readAll := func(r io.Reader) []byte {
data, err := ioutil.ReadAll(r)
assert.NoError(t, err)
return data
}

cl := newTestClient(WithHTTPClient(devNullRetry()))
_, err := cl.DoHTTPRequest("GET", s.URL, nil)
assert.Error(t, err)

resp, err := cl.DoHTTPRequest("GET", s.URL, nil, WithHeader("test-header", "test-val"))
require.NoError(t, err)
assert.Equal(t, resp.StatusCode, http.StatusOK)
assert.Equal(t, []byte("hello"), readAll(resp.Body))
}

func TestClient_DoHTTPRequest_Err(t *testing.T) {
for _, test := range []struct {
code int
Expand Down
25 changes: 25 additions & 0 deletions pkg/m3admin/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,3 +66,28 @@ func WithEnvironment(e string) Option {
o.environment = e
})
}

// RequestOption defines a per-request option.
type RequestOption interface {
execute(*reqOptions)
}

type reqOptionFn func(o *reqOptions)

func (f reqOptionFn) execute(o *reqOptions) {
f(o)
}

type reqOptions struct {
headers map[string]string
}

// WithHeader adds a header to a request. It can be specified multiple times.
func WithHeader(name, value string) RequestOption {
return reqOptionFn(func(o *reqOptions) {
if o.headers == nil {
o.headers = make(map[string]string)
}
o.headers[name] = value
})
}
Loading

0 comments on commit bf57144

Please sign in to comment.