From 904bbfc7e7ee718cf7d0d374bd12155946ea0119 Mon Sep 17 00:00:00 2001 From: "R.I.Pienaar" Date: Tue, 8 Feb 2022 17:32:55 +0100 Subject: [PATCH] (#43) support packaging handlers into docker containers Also adjust the handler signature to include a Logger that will make creating reusable handlers easier and more portable Signed-off-by: R.I.Pienaar --- Dockerfile.goreleaser | 2 + README.md | 4 +- ajc/main.go | 1 + ajc/package_command.go | 78 ++++++++++++++++ ajc/task_command.go | 2 +- ajc/util.go | 6 +- client_examples_test.go | 2 +- client_test.go | 10 +-- generators/fs/godocker/Dockerfile.templ | 36 ++++++++ generators/fs/godocker/main.go.templ | 115 ++++++++++++++++++++++++ generators/godocker.go | 89 ++++++++++++++++++ generators/package.go | 33 +++++++ go.mod | 2 +- mux.go | 4 +- mux_test.go | 14 +-- processor.go | 2 +- processor_test.go | 9 +- 17 files changed, 384 insertions(+), 25 deletions(-) create mode 100644 ajc/package_command.go create mode 100644 generators/fs/godocker/Dockerfile.templ create mode 100644 generators/fs/godocker/main.go.templ create mode 100644 generators/godocker.go create mode 100644 generators/package.go diff --git a/Dockerfile.goreleaser b/Dockerfile.goreleaser index 57527b6..8cc65b8 100644 --- a/Dockerfile.goreleaser +++ b/Dockerfile.goreleaser @@ -2,5 +2,7 @@ FROM alpine:latest +RUN apk --no-cache add ca-certificates + ENTRYPOINT ["/usr/bin/ajc"] COPY ajc /usr/bin/ajc diff --git a/README.md b/README.md index 9048a45..cf76c15 100644 --- a/README.md +++ b/README.md @@ -50,7 +50,7 @@ task, _ := asyncjobs.NewTask("email:new", newEmail()) client.EnqueueTask(ctx, task) ``` -Tasks are processes by horizontally and vertically scalable. Typically, a Handler handles one type of Task. We have Prometheus +Tasks are processes by horizontally and vertically scalable processes. Typically, a Handler handles one type of Task. We have Prometheus integration, concurrency and backoffs configured. ```go @@ -64,7 +64,7 @@ client, _ := asyncjobs.NewClient( asyncjobs.RetryBackoffPolicy(asyncjobs.RetryLinearTenMinutes)) router := asyncjobs.NewTaskRouter() -router.Handler("email:new", func(ctx context.Context, task *asyncjobs.Task) (interface{}, error) { +router.Handler("email:new", func(ctx context.Context, log asyncjobs.Logger, task *asyncjobs.Task) (interface{}, error) { log.Printf("Processing task %s", task.ID) // do work here using task.Payload diff --git a/ajc/main.go b/ajc/main.go index 609723f..7910100 100644 --- a/ajc/main.go +++ b/ajc/main.go @@ -41,6 +41,7 @@ func main() { configureInfoCommand(ajc) configureTaskCommand(ajc) configureQueueCommand(ajc) + configurePackagesCommand(ajc) _, err := ajc.Parse(os.Args[1:]) if err != nil { diff --git a/ajc/package_command.go b/ajc/package_command.go new file mode 100644 index 0000000..6c46acd --- /dev/null +++ b/ajc/package_command.go @@ -0,0 +1,78 @@ +// Copyright (c) 2022, R.I. Pienaar and the Project contributors +// +// SPDX-License-Identifier: Apache-2.0 + +package main + +import ( + "fmt" + "os" + "path/filepath" + + "github.com/choria-io/asyncjobs/generators" + "gopkg.in/alecthomas/kingpin.v2" + "gopkg.in/yaml.v2" +) + +type packageCommand struct { + file string +} + +func configurePackagesCommand(app *kingpin.Application) { + c := &packageCommand{} + + pkg := app.Command("package", "Creates packages hosting handlers").Alias("pkg") + + pkg.Command("docker", "Creates a Docker Container hosting handlers based on handlers.yaml").Action(c.dockerAction) + pkg.Flag("file", "Use a specific configuration file rather than asyncjobs.yaml").Default("asyncjobs.yaml").ExistingFileVar(&c.file) +} + +func (c *packageCommand) dockerAction(_ *kingpin.ParseContext) error { + createLogger() + + _, err := os.Stat(c.file) + if os.IsNotExist(err) { + return fmt.Errorf("handlers.yaml does not exist") + } + + hb, err := os.ReadFile(c.file) + if err != nil { + return err + } + + h := &generators.Package{} + err = yaml.Unmarshal(hb, h) + if err != nil { + return fmt.Errorf("invalid handlers file: %v", err) + } + + if h.AJVersion == "" { + h.AJVersion = version + } + if h.Name == "" { + h.Name = "choria.io/asyncjobs/handlers" + } + + if len(h.TaskHandlers) == 0 { + return fmt.Errorf("no task handlers specified in %s", c.file) + } + + generator, err := generators.NewGoContainer(h) + if err != nil { + return err + } + + path, err := filepath.Abs(".") + if err != nil { + return err + } + + err = generator.RenderToDirectory(path) + if err != nil { + return err + } + + log.Printf("Run docker build to build your package\n") + + return nil +} diff --git a/ajc/task_command.go b/ajc/task_command.go index e5657d6..c234ac6 100644 --- a/ajc/task_command.go +++ b/ajc/task_command.go @@ -188,7 +188,7 @@ func (c *taskCommand) processAction(_ *kingpin.ParseContext) error { } router := asyncjobs.NewTaskRouter() - err = router.HandleFunc(c.ttype, func(ctx context.Context, task *asyncjobs.Task) (interface{}, error) { + err = router.HandleFunc(c.ttype, func(ctx context.Context, log asyncjobs.Logger, task *asyncjobs.Task) (interface{}, error) { tj, err := json.Marshal(task) if err != nil { return nil, err diff --git a/ajc/util.go b/ajc/util.go index 7d0415f..35b1894 100644 --- a/ajc/util.go +++ b/ajc/util.go @@ -21,7 +21,7 @@ import ( "golang.org/x/term" ) -func prepare(copts ...asyncjobs.ClientOpt) error { +func createLogger() { logger := logrus.New() if debug { logger.SetLevel(logrus.DebugLevel) @@ -33,7 +33,11 @@ func prepare(copts ...asyncjobs.ClientOpt) error { FullTimestamp: true, TimestampFormat: "15:04:05", }) + log = logrus.NewEntry(logger) +} +func prepare(copts ...asyncjobs.ClientOpt) error { + createLogger() if nctx == "" { return fmt.Errorf("no NATS Context specified") diff --git a/client_examples_test.go b/client_examples_test.go index acc451e..d30c52a 100644 --- a/client_examples_test.go +++ b/client_examples_test.go @@ -71,7 +71,7 @@ func ExampleClient_consumer() { panicIfErr(err) router := NewTaskRouter() - err = router.HandleFunc("email:send", func(_ context.Context, t *Task) (interface{}, error) { + err = router.HandleFunc("email:send", func(_ context.Context, _ Logger, t *Task) (interface{}, error) { log.Printf("Processing task: %s", t.ID) // handle task.Payload which is a JSON encoded email diff --git a/client_test.go b/client_test.go index bfd1f6f..4c3aa83 100644 --- a/client_test.go +++ b/client_test.go @@ -138,14 +138,14 @@ var _ = Describe("Client", func() { handled := int32(0) router := NewTaskRouter() - router.HandleFunc("test", func(ctx context.Context, t *Task) (interface{}, error) { + router.HandleFunc("test", func(ctx context.Context, log Logger, t *Task) (interface{}, error) { if t.Tries > 1 { - log.Printf("Try %d for task %s", t.Tries, t.ID) + log.Infof("Try %d for task %s", t.Tries, t.ID) } done := atomic.AddInt32(&handled, 1) if done == int32(testCount)+10 { - log.Printf("Processed all messages") + log.Infof("Processed all messages") time.AfterFunc(50*time.Millisecond, func() { cancel() }) @@ -198,10 +198,10 @@ var _ = Describe("Client", func() { var tries []time.Time router := NewTaskRouter() - router.HandleFunc("ginkgo", func(ctx context.Context, t *Task) (interface{}, error) { + router.HandleFunc("ginkgo", func(ctx context.Context, log Logger, t *Task) (interface{}, error) { tries = append(tries, time.Now()) - log.Printf("Trying task %s on try %d\n", t.ID, t.Tries) + log.Infof("Trying task %s on try %d\n", t.ID, t.Tries) if t.Tries < 2 { return "fail", fmt.Errorf("simulated failure") diff --git a/generators/fs/godocker/Dockerfile.templ b/generators/fs/godocker/Dockerfile.templ new file mode 100644 index 0000000..aa686ea --- /dev/null +++ b/generators/fs/godocker/Dockerfile.templ @@ -0,0 +1,36 @@ +FROM golang:latest AS builder + +WORKDIR /usr/src/app + +COPY main.go /usr/src/app/main.go + +RUN go mod init "{{ .Package.Name }}" && \ + go mod tidy -compat=1.17 && \ + go get github.com/choria-io/asyncjobs@{{ .Package.AJVersion }} && \ + go mod download + +RUN go build -v -o /app -ldflags="-s -w" + +FROM alpine:latest + +RUN adduser -g "Choria Async Jobs" choria && \ + mkdir /lib64 && \ + ln -s /lib/libc.musl-x86_64.so.1 /lib64/ld-linux-x86-64.so.2 && \ + apk --no-cache add ca-certificates && \ + mkdir -p /handler/config + +COPY --from=builder /app /handler/app + +EXPOSE 8080/tcp + +USER choria + +ENV XDG_CONFIG_HOME "/handler/config" +ENV AJ_WORK_QUEUE "{{ .Package.WorkQueue }}" +{{- if .Package.ContextName }} +ENV AJ_NATS_CONTEXT "{{ .Package.ContextName }}" +{{- else }} +ENV AJ_NATS_CONTEXT "AJ" +{{- end }} + +ENTRYPOINT ["/handler/app"] diff --git a/generators/fs/godocker/main.go.templ b/generators/fs/godocker/main.go.templ new file mode 100644 index 0000000..39b5b24 --- /dev/null +++ b/generators/fs/godocker/main.go.templ @@ -0,0 +1,115 @@ +// Copyright (c) 2022, R.I. Pienaar and the Project contributors +// +// SPDX-License-Identifier: Apache-2.0 + +package main + +import ( + "context" + "fmt" + "os" + "runtime" + "strconv" + + "github.com/sirupsen/logrus" + aj "github.com/choria-io/asyncjobs" + +{{ range $handler := .Package.TaskHandlers }} + {{ $handler.TaskType | TypeToPackageName }} "{{ $handler.Package }}" +{{- end }} +) + +var usage = ` +Choria Async Jobs Handler Failed: %v + +This is a generated Handler for the Choria Async Jobs Project. + +It hosts the following handlers: +{{ range $handler := .Package.TaskHandlers }} + - {{ $handler.TaskType }}: {{ $handler.Package }}.AsyncJobHandler +{{- end }} + +To run this you need to prepare a NATS context using the nats +CLI and then mount it into the container on /handler/config/nats. + +The following Environment variables are supported: + + - AJ_WORK_QUEUE: The Work Queue to consume from, defaults to DEFAULT + - AJ_NATS_CONTEXT: The name of a NATS Context to use for connections + - AJ_CONCURRENCY: The number of concurrent handlers that can be run + +Prometheus statistics are Exposed on port 8080 as /metrics + +For further information see the project wiki at: + + https://github.com/choria-io/asyncjobs + +Build Information: + + - Build Time: {{ .BuildTime }} + - Async Jobs Package Version: {{ .Package.AJVersion }} + +` + +func usageIfError(err error) { + if err == nil { + return + } + + fmt.Printf(usage, err) + os.Exit(1) +} + +func main() { + wq := os.Getenv("AJ_WORK_QUEUE") + if wq == "" { + usageIfError(fmt.Errorf("AJ_WORK_QUEUE is required")) + } + + nctx := os.Getenv("AJ_NATS_CONTEXT") + if nctx == "" { + usageIfError(fmt.Errorf("AJ_NATS_CONTEXT is required")) + } + + var err error + concurrencyS := os.Getenv("AJ_CONCURRENCY") + concurrency := runtime.NumCPU() + if concurrencyS != "" { + concurrency, err = strconv.Atoi(concurrencyS) + if err != nil { + usageIfError(fmt.Errorf("AJ_CONCURRENCY must be numeric")) + } + } + + logger := logrus.New() + if os.Getenv("AJ_DEBUG") == "1" { + logger.SetLevel(logrus.DebugLevel) + } else { + logger.SetLevel(logrus.InfoLevel) + } + logger.SetFormatter(&logrus.TextFormatter{ + FullTimestamp: true, + TimestampFormat: "15:04:05", + }) + + log := logrus.NewEntry(logger) + + log.Printf("Connecting using Context %s consuming work queue %s with concurrency %d", nctx, wq, concurrency) + + client, err := aj.NewClient( + aj.NatsContext(nctx), + aj.BindWorkQueue(wq), + aj.ClientConcurrency(concurrency), + aj.CustomLogger(log), + aj.PrometheusListenPort(8080)) + usageIfError(err) + + router := aj.NewTaskRouter() +{{ range $handler := .Package.TaskHandlers }} + err = router.HandleFunc("{{ $handler.TaskType }}", {{ $handler.TaskType | TypeToPackageName }}.AsyncJobHandler) + usageIfError(err) +{{- end }} + + err = client.Run(context.Background(), router) + usageIfError(err) +} diff --git a/generators/godocker.go b/generators/godocker.go new file mode 100644 index 0000000..2ad7610 --- /dev/null +++ b/generators/godocker.go @@ -0,0 +1,89 @@ +// Copyright (c) 2022, R.I. Pienaar and the Project contributors +// +// SPDX-License-Identifier: Apache-2.0 + +package generators + +import ( + "bytes" + "embed" + "os" + "path/filepath" + "strings" + "text/template" + "time" +) + +// GoContainer builds docker containers based on the package spec +type GoContainer struct { + // Package describes the package to build + Package *Package + // BuildTime is when the package is being built, set at runtime + BuildTime string +} + +var ( + //go:embed fs/godocker + godockerFS embed.FS +) + +// NewGoContainer create a new go container builder +func NewGoContainer(handlers *Package) (*GoContainer, error) { + return &GoContainer{ + Package: handlers, + BuildTime: time.Now().Format(time.RFC822), + }, nil +} + +// RenderToDirectory renders the container to a specific directory +func (g *GoContainer) RenderToDirectory(target string) error { + files, err := godockerFS.ReadDir("fs/godocker") + if err != nil { + return err + } + + funcs := map[string]interface{}{ + "TypeToPackageName": func(t string) string { + remove := []string{"_", "-", ":", "/", "\\"} + res := t + + for _, r := range remove { + res = strings.Replace(res, r, "", -1) + } + + return res + }, + } + + for _, f := range files { + if f.IsDir() { + continue + } + + t := template.New(f.Name()) + t.Funcs(funcs) + body, err := godockerFS.ReadFile(filepath.Join("fs/godocker", f.Name())) + if err != nil { + return err + } + + p, err := t.Parse(string(body)) + if err != nil { + return err + } + + buf := bytes.NewBuffer([]byte{}) + + err = p.Execute(buf, g) + if err != nil { + return err + } + + err = os.WriteFile(filepath.Join(target, strings.TrimSuffix(filepath.Base(f.Name()), ".templ")), buf.Bytes(), 0644) + if err != nil { + return err + } + } + + return nil +} diff --git a/generators/package.go b/generators/package.go new file mode 100644 index 0000000..0827de8 --- /dev/null +++ b/generators/package.go @@ -0,0 +1,33 @@ +// Copyright (c) 2022, R.I. Pienaar and the Project contributors +// +// SPDX-License-Identifier: Apache-2.0 + +package generators + +// Generator is the interfaces generators must implement +type Generator interface { + // RenderToDirectory renders the output to directory target + RenderToDirectory(target string) error +} + +// Package describe a configuration of a asyncjobs handler with multiple handlers loaded +type Package struct { + // ContextName is the optional NATS Context name to use when none is configured + ContextName string `yaml:"nats"` + // WorkQueue is the optional Work Queue name to bind to, else DEFAULT will be used + WorkQueue string `yaml:"queue"` + // TaskHandlers is a list of handlers for tasks + TaskHandlers []TaskHandler `yaml:"tasks"` + // Name is an optional name for the generated go package + Name string `yaml:"name"` + // AJVersion is an optional version to use for the choria-io/asyncjobs dependency + AJVersion string `yaml:"asyncjobs"` +} + +// TaskHandler is an individual Task Handler +type TaskHandler struct { + // TaskType is the type to handle like email:new + TaskType string `yaml:"type"` + // Package is a golang package name that has a AsyncJobHandler() implementing HandlerFunc + Package string `yaml:"package"` +} diff --git a/go.mod b/go.mod index 13175a1..07107b3 100644 --- a/go.mod +++ b/go.mod @@ -16,6 +16,7 @@ require ( github.com/xlab/tablewriter v0.0.0-20160610135559-80b567a11ad5 golang.org/x/term v0.0.0-20210503060354-a79de5458b56 gopkg.in/alecthomas/kingpin.v2 v2.2.6 + gopkg.in/yaml.v2 v2.4.0 ) require ( @@ -44,5 +45,4 @@ require ( golang.org/x/text v0.3.6 // indirect golang.org/x/time v0.0.0-20211116232009-f0f3c7e86c11 // indirect google.golang.org/protobuf v1.26.0 // indirect - gopkg.in/yaml.v2 v2.4.0 // indirect ) diff --git a/mux.go b/mux.go index 773850c..efa9229 100644 --- a/mux.go +++ b/mux.go @@ -24,7 +24,7 @@ type entryHandler struct { } // HandlerFunc handles a single task, the response bytes will be stored in the original task -type HandlerFunc func(ctx context.Context, t *Task) (interface{}, error) +type HandlerFunc func(ctx context.Context, log Logger, t *Task) (interface{}, error) // Mux routes messages // @@ -44,7 +44,7 @@ func NewTaskRouter() *Mux { } } -func notFoundHandler(_ context.Context, t *Task) (interface{}, error) { +func notFoundHandler(_ context.Context, _ Logger, t *Task) (interface{}, error) { return nil, fmt.Errorf("%w %q", ErrNoHandlerForTaskType, t.Type) } diff --git a/mux_test.go b/mux_test.go index 5aa4ee3..22b7b40 100644 --- a/mux_test.go +++ b/mux_test.go @@ -15,7 +15,7 @@ var _ = Describe("Router", func() { Describe("Handler", func() { It("Should support default handler", func() { router := NewTaskRouter() - router.HandleFunc("x", func(_ context.Context, _ *Task) (interface{}, error) { + router.HandleFunc("x", func(_ context.Context, _ Logger, _ *Task) (interface{}, error) { return "x", nil }) @@ -23,28 +23,28 @@ var _ = Describe("Router", func() { Expect(err).ToNot(HaveOccurred()) handler := router.Handler(task) - _, err = handler(nil, task) + _, err = handler(nil, &defaultLogger{}, task) Expect(err).To(MatchError(ErrNoHandlerForTaskType)) }) It("Should find the correct handler", func() { router := NewTaskRouter() - router.HandleFunc("", func(_ context.Context, _ *Task) (interface{}, error) { + router.HandleFunc("", func(_ context.Context, _ Logger, _ *Task) (interface{}, error) { return "custom default", nil }) - router.HandleFunc("things:", func(_ context.Context, _ *Task) (interface{}, error) { + router.HandleFunc("things:", func(_ context.Context, _ Logger, _ *Task) (interface{}, error) { return "things:", nil }) - router.HandleFunc("things:very:specific", func(_ context.Context, _ *Task) (interface{}, error) { + router.HandleFunc("things:very:specific", func(_ context.Context, _ Logger, _ *Task) (interface{}, error) { return "things:very:specific", nil }) - router.HandleFunc("things:specific", func(_ context.Context, _ *Task) (interface{}, error) { + router.HandleFunc("things:specific", func(_ context.Context, _ Logger, _ *Task) (interface{}, error) { return "things:specific", nil }) check := func(ttype string, expected string) { task := &Task{Type: ttype} - res, err := router.Handler(task)(context.Background(), task) + res, err := router.Handler(task)(context.Background(), &defaultLogger{}, task) Expect(err).ToNot(HaveOccurred()) Expect(res).To(Equal(expected)) } diff --git a/processor.go b/processor.go index d216f8b..a035e6d 100644 --- a/processor.go +++ b/processor.go @@ -211,7 +211,7 @@ func (p *processor) handle(ctx context.Context, t *Task, item *ProcessItem, to t t.Tries++ - payload, err := p.mux.Handler(t)(timeout, t) + payload, err := p.mux.Handler(t)(timeout, p.log, t) if err != nil { if errors.Is(err, ErrTerminateTask) { handlersErroredCounter.WithLabelValues(t.Queue, t.Type).Inc() diff --git a/processor_test.go b/processor_test.go index fce10b3..6ba0578 100644 --- a/processor_test.go +++ b/processor_test.go @@ -43,7 +43,7 @@ var _ = Describe("Processor", func() { wg.Add(1) router := NewTaskRouter() - router.HandleFunc("ginkgo", func(ctx context.Context, t *Task) (interface{}, error) { + router.HandleFunc("ginkgo", func(_ context.Context, _ Logger, t *Task) (interface{}, error) { wg.Done() return nil, fmt.Errorf("simulated failure: %w", ErrTerminateTask) }) @@ -88,7 +88,7 @@ var _ = Describe("Processor", func() { wg.Add(1) router := NewTaskRouter() - router.HandleFunc("ginkgo", func(ctx context.Context, t *Task) (interface{}, error) { + router.HandleFunc("ginkgo", func(_ context.Context, _ Logger, t *Task) (interface{}, error) { wg.Done() return nil, fmt.Errorf("simulated failure") }) @@ -129,7 +129,7 @@ var _ = Describe("Processor", func() { wg.Add(1) router := NewTaskRouter() - router.HandleFunc("ginkgo", func(ctx context.Context, t *Task) (interface{}, error) { + router.HandleFunc("ginkgo", func(_ context.Context, _ Logger, t *Task) (interface{}, error) { wg.Done() return "done", nil }) @@ -320,10 +320,11 @@ var _ = Describe("Processor", func() { wg.Add(1) router := NewTaskRouter() - router.HandleFunc("ginkgo", func(ctx context.Context, task *Task) (interface{}, error) { + router.HandleFunc("ginkgo", func(ctx context.Context, log Logger, task *Task) (interface{}, error) { t, err := client.LoadTaskByID(task.ID) Expect(err).ToNot(HaveOccurred()) Expect(t.State).To(Equal(TaskStateActive)) + Expect(log).ToNot(BeNil()) wg.Done() return "done", nil