Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

(#2169) Pass choria federations to external discovery agent #2170

Merged
merged 1 commit into from
May 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions cmd/discover.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"encoding/json"
"fmt"
"os"
"strings"
"sync"

"github.com/choria-io/go-choria/internal/fs"
Expand All @@ -23,6 +24,8 @@ type discoverCommand struct {
verbose bool
silent bool
fo *discovery.StandardOptions

federations string
}

func (d *discoverCommand) Setup() error {
Expand All @@ -35,6 +38,8 @@ func (d *discoverCommand) Setup() error {
d.fo.AddSelectionFlags(d.cmd)
d.fo.AddFlatFileFlags(d.cmd)

d.cmd.Flag("federations", "Comma-separated list of federations to target").StringVar(&d.federations)

d.cmd.Flag("verbose", "Log verbosely").Default("false").Short('v').UnNegatableBoolVar(&d.verbose)
d.cmd.Flag("json", "Produce JSON output").Short('j').UnNegatableBoolVar(&d.jsonFormat)
d.cmd.Flag("silent", "Produce as little logging as possible").Hidden().UnNegatableBoolVar(&d.silent)
Expand All @@ -44,6 +49,12 @@ func (d *discoverCommand) Setup() error {

func (d *discoverCommand) Configure() error {
err = commonConfigure()

// If list of federations is specified on the CLI, mutate the configuration directly
if len(d.federations) > 0 {
cfg.Choria.FederationCollectives = strings.Split(d.federations, ",")
}

if err != nil {
return err
}
Expand Down
14 changes: 13 additions & 1 deletion cmd/ping.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"fmt"
"math"
"sort"
"strings"
"sync"
"time"

Expand All @@ -34,6 +35,8 @@ type pingCommand struct {

fo *discovery.StandardOptions

federations string

namesOnly bool

start time.Time
Expand All @@ -54,6 +57,8 @@ func (p *pingCommand) Setup() (err error) {
p.fo = discovery.NewStandardOptions()
p.fo.AddFilterFlags(p.cmd)

p.cmd.Flag("federations", "Comma-separated list of federations to target").StringVar(&p.federations)

p.cmd.Flag("expect", "Wait until this many replies were received or timeout").IntVar(&p.waitfor)
p.cmd.Flag("timeout", "How long to wait for responses").IntVar(&p.timeout)
p.cmd.Flag("graph", "Produce a graph of the result times").UnNegatableBoolVar(&p.graph)
Expand Down Expand Up @@ -195,7 +200,14 @@ func (p *pingCommand) createMessage(filter *protocol.Filter) (inter.Message, err
func (p *pingCommand) Configure() error {
protocol.ClientStrictValidation = false

return commonConfigure()
err := commonConfigure()

// If list of federations is specified on the CLI, mutate the configuration directly
if len(p.federations) > 0 {
cfg.Choria.FederationCollectives = strings.Split(p.federations, ",")
}

return err
}

// chart takes all the received time stamps and put them
Expand Down
9 changes: 9 additions & 0 deletions cmd/req.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ type reqCommand struct {

outputWriter *bufio.Writer
outputFileHandle *os.File

federations string
}

func (r *reqCommand) Setup() (err error) {
Expand Down Expand Up @@ -99,6 +101,8 @@ that match the filter.
r.fo.AddFlatFileFlags(r.cmd)
r.fo.AddSelectionFlags(r.cmd)

r.cmd.Flag("federations", "List of federations to search for collectives in, comma separated").StringVar(&r.federations)

r.cmd.Flag("limit", "Limits request to a set of targets eg 10 or 10%").StringVar(&r.limit)
r.cmd.Flag("limit-seed", "Seed value for deterministic random limits").PlaceHolder("SEED").Int64Var(&r.limitSeed)
r.cmd.Flag("batch", "Do requests in batches").PlaceHolder("SIZE").IntVar(&r.batch)
Expand Down Expand Up @@ -462,6 +466,11 @@ func (r *reqCommand) Configure() error {
return err
}

// If list of federations is specified on the CLI, mutate the configuration directly
if len(r.federations) > 0 {
cfg.Choria.FederationCollectives = strings.Split(r.federations, ",")
}

// we try not to spam things to stderr in these structured output formats
if (r.jsonLinesOnly || r.jsonOnly) && cfg.LogLevel != "debug" {
cfg.LogLevel = "fatal"
Expand Down
16 changes: 16 additions & 0 deletions inter/imocks/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,12 @@
}
}

func WithFederations(federations []string) fwMockOption {

Check notice on line 79 in inter/imocks/util.go

View check run for this annotation

codefactor.io / CodeFactor

inter/imocks/util.go#L79

Exported func WithFederations returns unexported type imock.fwMockOption, which can be annoying to use (unexported-return)
return func(o *fwMockOpts) {
o.cfg.Choria.FederationCollectives = federations
}
}

func NewFrameworkForTests(ctrl *gomock.Controller, logWriter io.Writer, opts ...fwMockOption) (*MockFramework, *config.Config) {
mopts := &fwMockOpts{
cfg: config.NewConfigForTests(),
Expand All @@ -96,6 +102,16 @@
fw.EXPECT().Configuration().Return(mopts.cfg).AnyTimes()
fw.EXPECT().Logger(gomock.AssignableToTypeOf("")).Return(logrus.NewEntry(logger)).AnyTimes()
fw.EXPECT().NewRequestID().Return(util.RandomHexString(), nil).AnyTimes()
fw.EXPECT().FederationCollectives().DoAndReturn(
func() []string {
if len(fw.Configuration().Choria.FederationCollectives) == 0 {
retval := strings.Split(os.Getenv("CHORIA_FED_COLLECTIVE"), ",")
if retval[0] == "" {
return []string{}
}
}
return fw.Configuration().Choria.FederationCollectives
}).AnyTimes()
fw.EXPECT().HasCollective(gomock.AssignableToTypeOf("")).DoAndReturn(func(c string) bool {
for _, collective := range fw.Configuration().Collectives {
if c == collective {
Expand Down
36 changes: 20 additions & 16 deletions providers/discovery/external/external.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,12 +40,13 @@ type Response struct {

// Request is the request sent to the external script on its STDIN
type Request struct {
Protocol string `json:"protocol"`
Collective string `json:"collective"`
Filter *protocol.Filter `json:"filter"`
Options map[string]string `json:"options"`
Schema string `json:"$schema"`
Timeout float64 `json:"timeout"`
Protocol string `json:"protocol"`
Collective string `json:"collective"`
Filter *protocol.Filter `json:"filter"`
Federations []string `json:"federations"`
ripienaar marked this conversation as resolved.
Show resolved Hide resolved
Options map[string]string `json:"options"`
Schema string `json:"$schema"`
Timeout float64 `json:"timeout"`
}

const (
Expand All @@ -67,10 +68,11 @@ func New(fw inter.Framework) *External {

func (e *External) Discover(ctx context.Context, opts ...DiscoverOption) (n []string, err error) {
dopts := &dOpts{
collective: e.fw.Configuration().MainCollective,
timeout: e.timeout,
command: e.fw.Configuration().Choria.ExternalDiscoveryCommand,
do: make(map[string]string),
collective: e.fw.Configuration().MainCollective,
timeout: e.timeout,
command: e.fw.Configuration().Choria.ExternalDiscoveryCommand,
federations: e.fw.FederationCollectives(),
do: make(map[string]string),
}

for _, opt := range opts {
Expand Down Expand Up @@ -100,12 +102,13 @@ func (e *External) Discover(ctx context.Context, opts ...DiscoverOption) (n []st
defer cancel()

idat := &Request{
Schema: RequestSchema,
Protocol: RequestProtocol,
Timeout: dopts.timeout.Seconds(),
Collective: dopts.collective,
Filter: dopts.filter,
Options: dopts.do,
Schema: RequestSchema,
Protocol: RequestProtocol,
Timeout: dopts.timeout.Seconds(),
Collective: dopts.collective,
Federations: dopts.federations,
Filter: dopts.filter,
Options: dopts.do,
}

req, err := json.Marshal(idat)
Expand Down Expand Up @@ -145,6 +148,7 @@ func (e *External) Discover(ctx context.Context, opts ...DiscoverOption) (n []st

cmd := exec.CommandContext(timeoutCtx, command, args[1:]...)
cmd.Dir = os.TempDir()

cmd.Env = []string{
"CHORIA_EXTERNAL_REQUEST=" + reqfile.Name(),
"CHORIA_EXTERNAL_REPLY=" + repfile.Name(),
Expand Down
139 changes: 93 additions & 46 deletions providers/discovery/external/external_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,64 +34,111 @@ var _ = Describe("External", func() {
e *External
)

BeforeEach(func() {
mockctl = gomock.NewController(GinkgoT())
fw, cfg = imock.NewFrameworkForTests(mockctl, GinkgoWriter)
cfg.Collectives = []string{"mcollective", "test"}
Context("command without federation", func() {
BeforeEach(func() {
mockctl = gomock.NewController(GinkgoT())
fw, cfg = imock.NewFrameworkForTests(mockctl, GinkgoWriter)
cfg.Collectives = []string{"mcollective", "test"}

e = New(fw)
})

AfterEach(func() {
mockctl.Finish()
})

Describe("New", func() {
It("Should initialize timeout to default", func() {
Expect(e.timeout).To(Equal(2 * time.Second))
cfg.DiscoveryTimeout = 100
e = New(fw)
Expect(e.timeout).To(Equal(100 * time.Second))
})
})

Describe("Discover", func() {
It("Should request and return discovered nodes", func() {
if runtime.GOOS == "windows" {
Skip("not tested on windows")
}
AfterEach(func() {
mockctl.Finish()
})

f := protocol.NewFilter()
f.AddAgentFilter("rpcutil")
f.AddFactFilter("country", "==", "mt")
Describe("New", func() {
It("Should initialize timeout to default", func() {
Expect(e.timeout).To(Equal(2 * time.Second))
cfg.DiscoveryTimeout = 100
e = New(fw)
Expect(e.timeout).To(Equal(100 * time.Second))
})
})

Describe("Discover", func() {
wd, _ := os.Getwd()
cfg.Choria.ExternalDiscoveryCommand = filepath.Join(wd, "testdata/good.rb")
nodes, err := e.Discover(context.Background(), Filter(f), DiscoveryOptions(map[string]string{"foo": "bar"}))
Expect(err).ToNot(HaveOccurred())
Expect(nodes).To(Equal([]string{"one", "two"}))

cfg.Choria.ExternalDiscoveryCommand = filepath.Join(wd, "testdata/good_with_argument.rb") + " discover --test"
nodes, err = e.Discover(context.Background(), Filter(f), DiscoveryOptions(map[string]string{"foo": "bar"}))
Expect(err).ToNot(HaveOccurred())
Expect(nodes).To(Equal([]string{"one", "two"}))
var f *protocol.Filter
BeforeEach(func() {
if runtime.GOOS == "windows" {
Skip("not tested on windows")
}

f = protocol.NewFilter()
f.AddAgentFilter("rpcutil")
err := f.AddFactFilter("country", "==", "mt")
Expect(err).ToNot(HaveOccurred())
})
It("Should request and return discovered nodes", func() {
cfg.Choria.ExternalDiscoveryCommand = filepath.Join(wd, "testdata/good.rb")
nodes, err := e.Discover(context.Background(), Filter(f), DiscoveryOptions(map[string]string{"foo": "bar"}))
Expect(err).ToNot(HaveOccurred())
Expect(nodes).To(Equal([]string{"one", "two"}))

cfg.Choria.ExternalDiscoveryCommand = filepath.Join(wd, "testdata/good_with_argument.rb") + " discover --test"
nodes, err = e.Discover(context.Background(), Filter(f), DiscoveryOptions(map[string]string{"foo": "bar"}))
Expect(err).ToNot(HaveOccurred())
Expect(nodes).To(Equal([]string{"one", "two"}))
})

It("Should support command overrides via options", func() {
if runtime.GOOS == "windows" {
Skip("not tested on windows")
}

cfg.Choria.ExternalDiscoveryCommand = filepath.Join(wd, "testdata/missing.rb")
cmd := filepath.Join(wd, "testdata/good_with_argument.rb") + " discover --test"
nodes, err := e.Discover(context.Background(), Filter(f), DiscoveryOptions(map[string]string{"command": cmd, "foo": "bar"}))
Expect(err).ToNot(HaveOccurred())
Expect(nodes).To(Equal([]string{"one", "two"}))
})
})
})
Context("With federation", func() {
BeforeEach(func() {
mockctl = gomock.NewController(GinkgoT())
fw, cfg = imock.NewFrameworkForTests(mockctl, GinkgoWriter, imock.WithFederations([]string{"alpha", "beta"}))
cfg.Collectives = []string{"mcollective", "test"}

It("Should support command overrides via options", func() {
if runtime.GOOS == "windows" {
Skip("not tested on windows")
}
e = New(fw)
})

f := protocol.NewFilter()
f.AddAgentFilter("rpcutil")
f.AddFactFilter("country", "==", "mt")
AfterEach(func() {
mockctl.Finish()
})

Describe("New", func() {
It("Should initialize timeout to default", func() {
Expect(e.timeout).To(Equal(2 * time.Second))
cfg.DiscoveryTimeout = 100
e = New(fw)
Expect(e.timeout).To(Equal(100 * time.Second))
})
})

Describe("Discover", func() {
wd, _ := os.Getwd()
cfg.Choria.ExternalDiscoveryCommand = filepath.Join(wd, "testdata/missing.rb")
cmd := filepath.Join(wd, "testdata/good_with_argument.rb") + " discover --test"
nodes, err := e.Discover(context.Background(), Filter(f), DiscoveryOptions(map[string]string{"command": cmd, "foo": "bar"}))
Expect(err).ToNot(HaveOccurred())
Expect(nodes).To(Equal([]string{"one", "two"}))
var f *protocol.Filter
var err error
BeforeEach(func() {
if runtime.GOOS == "windows" {
Skip("not tested on windows")
}
// err := os.Setenv("CHORIA_FED_COLLECTIVE", "alpha,beta")
// Expect(err).ToNot(HaveOccurred())

f = protocol.NewFilter()
f.AddAgentFilter("rpcutil")
err = f.AddFactFilter("country", "==", "mt")
Expect(err).ToNot(HaveOccurred())
})
It("Should request and return discovered nodes", func() {
cfg.Choria.ExternalDiscoveryCommand = filepath.Join(wd, "testdata/good_with_federation.rb")
nodes, err := e.Discover(context.Background(), Filter(f), DiscoveryOptions(map[string]string{"foo": "bar"}))
Expect(err).ToNot(HaveOccurred())
Expect(nodes).To(Equal([]string{"one", "two"}))
})
})

})
})
18 changes: 13 additions & 5 deletions providers/discovery/external/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,12 @@ import (
)

type dOpts struct {
filter *protocol.Filter
collective string
timeout time.Duration
command string
do map[string]string
filter *protocol.Filter
collective string
federations []string
timeout time.Duration
command string
do map[string]string
}

// DiscoverOption configures the broadcast discovery method
Expand All @@ -35,6 +36,13 @@ func Collective(c string) DiscoverOption {
}
}

// Federations sets the list of federated collectives to discover in
func Federations(f []string) DiscoverOption {
return func(o *dOpts) {
o.federations = f
}
}

// Timeout sets the discovery timeout, else the configured default is used
func Timeout(t time.Duration) DiscoverOption {
return func(o *dOpts) {
Expand Down
Loading
Loading