Skip to content

Commit

Permalink
caduceus round robin loadbalancing (#69)
Browse files Browse the repository at this point in the history
* adding xresolver of caduceus

* adding configuration

* update changelog
  • Loading branch information
kcajmagic authored Feb 25, 2020
1 parent 223e2da commit dc95e3f
Show file tree
Hide file tree
Showing 5 changed files with 96 additions and 39 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/)
and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.html).

## [Unreleased]
- add caduceus round robin logic through consul [#69](https://github.com/xmidt-org/talaria/pull/69)

## [v0.2.2]
- bumped webpa-common to v1.6.3 to make sessionID a first class citizen [#120](https://github.com/xmidt-org/talaria/pull/120)
Expand Down
45 changes: 25 additions & 20 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import (
"github.com/xmidt-org/webpa-common/service/monitor"
"github.com/xmidt-org/webpa-common/service/servicecfg"
"github.com/xmidt-org/webpa-common/xmetrics"
"github.com/xmidt-org/webpa-common/xresolver/consul"
)

const (
Expand All @@ -52,28 +53,28 @@ var (
BuildTime = "undefined"
)

func newDeviceManager(logger log.Logger, r xmetrics.Registry, v *viper.Viper) (device.Manager, error) {
func newDeviceManager(logger log.Logger, r xmetrics.Registry, v *viper.Viper) (device.Manager, *consul.ConsulWatcher, error) {
deviceOptions, err := device.NewOptions(logger, v.Sub(device.DeviceManagerKey))
if err != nil {
return nil, err
return nil, nil, err
}

outbounder, err := NewOutbounder(logger, v.Sub(OutbounderKey))
outbounder, watcher, err := NewOutbounder(logger, v.Sub(OutbounderKey))
if err != nil {
return nil, err
return nil, nil, err
}

outboundListener, err := outbounder.Start(NewOutboundMeasures(r))
if err != nil {
return nil, err
return nil, nil, err
}

deviceOptions.MetricsProvider = r
deviceOptions.Listeners = []device.Listener{
outboundListener,
}

return device.NewManager(deviceOptions), nil
return device.NewManager(deviceOptions), watcher, nil
}

// talaria is the driver function for Talaria. It performs everything main() would do,
Expand Down Expand Up @@ -107,7 +108,7 @@ func talaria(arguments []string) int {
return 1
}

manager, err := newDeviceManager(logger, metricsRegistry, v)
manager, watcher, err := newDeviceManager(logger, metricsRegistry, v)
if err != nil {
logger.Log(level.Key(), level.ErrorValue(), logging.MessageKey(), "Unable to create device manager", logging.ErrorKey(), err)
return 2
Expand Down Expand Up @@ -150,23 +151,27 @@ func talaria(arguments []string) int {
logger.Log(level.Key(), level.InfoValue(), "configurationFile", v.ConfigFileUsed())
e.Register()

listeners := []monitor.Listener{
monitor.NewMetricsListener(metricsRegistry),
monitor.NewRegistrarListener(logger, e, true),
monitor.NewAccessorListener(e.AccessorFactory(), a.Update),
// this rehasher will handle device disconnects in response to service discovery events
rehasher.New(
manager,
rehasher.WithLogger(logger),
rehasher.WithIsRegistered(e.IsRegistered),
rehasher.WithMetricsProvider(metricsRegistry),
),
}
if watcher != nil {
listeners = append(listeners, watcher)
}

_, err = monitor.New(
monitor.WithLogger(logger),
monitor.WithFilter(monitor.NewNormalizeFilter(e.DefaultScheme())),
monitor.WithEnvironment(e),
monitor.WithListeners(
monitor.NewMetricsListener(metricsRegistry),
monitor.NewRegistrarListener(logger, e, true),
monitor.NewAccessorListener(e.AccessorFactory(), a.Update),

// this rehasher will handle device disconnects in response to service discovery events
rehasher.New(
manager,
rehasher.WithLogger(logger),
rehasher.WithIsRegistered(e.IsRegistered),
rehasher.WithMetricsProvider(metricsRegistry),
),
),
monitor.WithListeners(listeners...),
)

if err != nil {
Expand Down
45 changes: 30 additions & 15 deletions outbounder.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ import (
"github.com/xmidt-org/webpa-common/device"
"github.com/xmidt-org/webpa-common/event"
"github.com/xmidt-org/webpa-common/logging"
"github.com/xmidt-org/webpa-common/xresolver"
"github.com/xmidt-org/webpa-common/xresolver/consul"
)

const (
Expand Down Expand Up @@ -57,25 +59,31 @@ const (
// Outbounder encapsulates the configuration necessary for handling outbound traffic
// and grants the ability to start the outbounding infrastructure.
type Outbounder struct {
Method string `json:"method"`
Retries int `json:"retries"`
RequestTimeout time.Duration `json:"requestTimeout"`
DefaultScheme string `json:"defaultScheme"`
AllowedSchemes []string `json:"allowedSchemes"`
EventEndpoints map[string]interface{} `json:"eventEndpoints"`
OutboundQueueSize uint `json:"outboundQueueSize"`
WorkerPoolSize uint `json:"workerPoolSize"`
Source string `json:"source"`
Transport http.Transport `json:"transport"`
ClientTimeout time.Duration `json:"clientTimeout"`
AuthKey []string `json:"authKey"`
Logger log.Logger `json:"-"`
Method string `json:"method"`
Retries int `json:"retries"`
RequestTimeout time.Duration `json:"requestTimeout"`
DefaultScheme string `json:"defaultScheme"`
AllowedSchemes []string `json:"allowedSchemes"`
EventEndpoints map[string]interface{} `json:"eventEndpoints"`
EnableConsulRoundRobin bool `json:"enableConsulRoundRobin"`
OutboundQueueSize uint `json:"outboundQueueSize"`
WorkerPoolSize uint `json:"workerPoolSize"`
Source string `json:"source"`
Transport http.Transport `json:"transport"`
ClientTimeout time.Duration `json:"clientTimeout"`
AuthKey []string `json:"authKey"`
Logger log.Logger `json:"-"`
}

// NewOutbounder returns an Outbounder unmarshalled from a Viper environment.
// This function allows the Viper instance to be nil, in which case a default
// Outbounder is returned.
func NewOutbounder(logger log.Logger, v *viper.Viper) (o *Outbounder, err error) {
func NewOutbounder(logger log.Logger, v *viper.Viper) (o *Outbounder, watcher *consul.ConsulWatcher, err error) {
options := consul.Options{
Watch: make(map[string]string),
Logger: logger,
}

o = &Outbounder{
Method: DefaultMethod,
RequestTimeout: DefaultRequestTimeout,
Expand All @@ -95,7 +103,14 @@ func NewOutbounder(logger log.Logger, v *viper.Viper) (o *Outbounder, err error)
if v != nil {
err = v.Unmarshal(o)
}

if o.EnableConsulRoundRobin {
logging.Info(o.Logger).Log(logging.MessageKey(), "Using consul round robin on service discover", "service", "caduceus")
for k := range o.EventEndpoints {
options.Watch[k] = "caduceus"
}
watcher = consul.NewConsulWatcher(options)
o.Transport.DialContext = xresolver.NewResolver(xresolver.DefaultDialer, watcher).DialContext
}
return
}

Expand Down
8 changes: 4 additions & 4 deletions outbounder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ func ExampleOutbounder() {
return
}

o, err := NewOutbounder(logging.DefaultLogger(), v)
o, _, err := NewOutbounder(logging.DefaultLogger(), v)
if err != nil {
fmt.Println(err)
return
Expand Down Expand Up @@ -115,11 +115,11 @@ func ExampleOutbounder() {

func testOutbounderDefaults(t *testing.T) {
require := require.New(t)
nilViper, err := NewOutbounder(nil, nil)
nilViper, _, err := NewOutbounder(nil, nil)
require.NotNil(nilViper)
require.NoError(err)

withViper, err := NewOutbounder(nil, viper.New())
withViper, _, err := NewOutbounder(nil, viper.New())
require.NotNil(withViper)
require.NoError(err)

Expand Down Expand Up @@ -179,7 +179,7 @@ func testOutbounderConfiguration(t *testing.T) {
v.SetConfigType("json")
require.NoError(v.ReadConfig(bytes.NewReader(configuration)))

o, err := NewOutbounder(logger, v)
o, _, err := NewOutbounder(logger, v)
require.NotNil(o)
require.NoError(err)

Expand Down
36 changes: 36 additions & 0 deletions talaria.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,13 @@ device:
eventEndpoints:
default: http://caduceus:6000/api/v3/notify

# enableConsulRoundRobin will overwrite the eventEndpoints with using consul to discover the caduceus in the datacenter.
# NOTE: eventEndpoints still must be set, and in the service section of this config caduceus must be added to the list
# of services to watch.
# if no services are found, talaria will fail back to the defined endpoint.
# (Optional) defaults to false
enableConsulRoundRobin: true

# requestTimeout is how long an event will be held on to starting from when it is
# received till completing the http request.
# So if the event was in the queue for 124s and using the default value
Expand Down Expand Up @@ -430,3 +437,32 @@ service:
# # (Optional) defaults to empty struct
# # queryOptions:
# # useCache: true
# - # service name to watch for which caduceus to send events to.
# # NOTE: enableConsulRoundRobin must be set to true in order for this to work.
# service: "caduceus"
#
# # tags is a list of strings that must be attached to the services
# # being watched.
# # (Optional) defaults to empty list
# tags:
# - "dev"
# - "docker"
#
# # passingOnly determines if only services passing the consul check are returned.
# # (Optional) defaults to false
# passingOnly: true
#
# # allDatacenters determines if there is a watch for all datacenter changes.
# # change this to have the devices hash across all datacenters instead of
# # the single datacenter. The datacenter is known by the consul agent who is
# # aware of which datacenter it is in.
# # (Optional) defaults to false, aka only watch for services in the
# # current datacenter.
# allDatacenters: false
#
# # queryOptions are options for the consul query, used in conjunction
# # with passingOnly and allDatacenters.
# # defined by https://godoc.org/github.com/hashicorp/consul/api#QueryOptions
# # (Optional) defaults to empty struct
# # queryOptions:
# # useCache: true

0 comments on commit dc95e3f

Please sign in to comment.