From 6eeaa9c5f09b11def3e5fb064c5efd0996a52ef5 Mon Sep 17 00:00:00 2001 From: "R.I.Pienaar" Date: Thu, 3 Mar 2022 10:51:38 +0100 Subject: [PATCH] (#68) make external command handlers usable via go api and asyncjobs file Signed-off-by: R.I.Pienaar --- ajc/task_command.go | 40 +------------------ errors.go | 5 ++- generators/fs/godocker/Dockerfile.templ | 11 ++++-- generators/fs/godocker/main.go.templ | 6 ++- generators/godocker.go | 3 +- generators/package.go | 2 + go.mod | 4 ++ go.sum | 6 +-- mux.go | 52 +++++++++++++++++++++++++ mux_test.go | 35 +++++++++++++++++ testdata/failing-handler.sh | 4 ++ testdata/passing-handler.sh | 4 ++ 12 files changed, 122 insertions(+), 50 deletions(-) create mode 100755 testdata/failing-handler.sh create mode 100755 testdata/passing-handler.sh diff --git a/ajc/task_command.go b/ajc/task_command.go index 5759206..4319960 100644 --- a/ajc/task_command.go +++ b/ajc/task_command.go @@ -6,10 +6,7 @@ package main import ( "context" - "encoding/json" "fmt" - "os" - "os/exec" "strings" "time" @@ -181,41 +178,6 @@ func (c *taskCommand) watchAction(_ *kingpin.ParseContext) error { } } -func (c *taskCommand) commandHandlerFunc(ctx context.Context, log aj.Logger, task *aj.Task) (interface{}, error) { - tj, err := json.Marshal(task) - if err != nil { - return nil, err - } - - stdinFile, err := os.CreateTemp("", "asyncjob") - if err != nil { - return nil, err - } - defer os.Remove(stdinFile.Name()) - defer stdinFile.Close() - - _, err = stdinFile.Write(tj) - if err != nil { - return nil, err - } - stdinFile.Close() - - start := time.Now() - log.Infof("Running task %s try %d", task.ID, task.Tries) - - cmd := exec.CommandContext(ctx, c.command) - cmd.Env = append(cmd.Env, fmt.Sprintf("CHORIA_AJ_TASK=%s", stdinFile.Name())) - out, err := cmd.CombinedOutput() - if err != nil { - log.Errorf("Running %s failed: %q", c.command, out) - return nil, err - } - - log.Infof("Task %s completed after %s and %d tries with %s payload", task.ID, time.Since(start), task.Tries, humanize.IBytes(uint64(len(out)))) - - return out, nil -} - func (c *taskCommand) processAction(_ *kingpin.ParseContext) error { if c.command == "" && !c.remote { return fmt.Errorf("either a command or --remote is required") @@ -243,7 +205,7 @@ func (c *taskCommand) processAction(_ *kingpin.ParseContext) error { if c.remote { err = router.RequestReply(c.ttype, client) } else { - err = router.HandleFunc(c.ttype, c.commandHandlerFunc) + err = router.ExternalProcess(c.ttype, c.command) } if err != nil { return err diff --git a/errors.go b/errors.go index 3043566..7472632 100644 --- a/errors.go +++ b/errors.go @@ -67,7 +67,10 @@ var ( ErrInvalidQueueState = fmt.Errorf("invalid queue storage state") // ErrDuplicateItem indicates that the Work Queue deduplication protection refused a message ErrDuplicateItem = fmt.Errorf("duplicate work queue item") - + // ErrExternalCommandNotFound indicates a command for an ExternalProcess handler was not found + ErrExternalCommandNotFound = fmt.Errorf("command not found") + // ErrExternalCommandFailed indicates a command for an ExternalProcess handler failed + ErrExternalCommandFailed = fmt.Errorf("execution failed") // ErrUnknownEventType indicates that while parsing an event an unknown type of event was encountered ErrUnknownEventType = fmt.Errorf("unknown event type") diff --git a/generators/fs/godocker/Dockerfile.templ b/generators/fs/godocker/Dockerfile.templ index e567e10..12d08c7 100644 --- a/generators/fs/godocker/Dockerfile.templ +++ b/generators/fs/godocker/Dockerfile.templ @@ -4,7 +4,7 @@ WORKDIR /usr/src/app RUN go mod init "{{ .Package.Name }}" && \ {{- range $handler := .Package.TaskHandlers }} - {{- if not $handler.RequestReply }} + {{- if $handler.Package }} go get "{{ $handler.Package }}@{{ $handler.Version }}" && \ {{- end }} {{- end }} @@ -12,8 +12,8 @@ RUN go mod init "{{ .Package.Name }}" && \ COPY main.go /usr/src/app/main.go -RUN go mod tidy -compat=1.17 && \ - go build -v -o /app -ldflags="-s -w -extldflags=-static" +RUN go mod tidy -compat=1.17 +RUN go build -v -o /app -ldflags="-s -w -extldflags=-static" FROM alpine:latest @@ -25,6 +25,11 @@ RUN addgroup -g 2048 asyncjobs && \ mkdir -p /handler/config COPY --from=builder /app /handler/app +{{- range $handler := .Package.TaskHandlers }} + {{- if $handler.Command }} +COPY ./commands/{{ $handler.Command }} /handler/commands/{{ $handler.Command }} + {{- end }} +{{- end }} EXPOSE 8080/tcp diff --git a/generators/fs/godocker/main.go.templ b/generators/fs/godocker/main.go.templ index c5c9f60..843a8fd 100644 --- a/generators/fs/godocker/main.go.templ +++ b/generators/fs/godocker/main.go.templ @@ -15,7 +15,7 @@ import ( aj "github.com/choria-io/asyncjobs" {{- range $handler := .Package.TaskHandlers }} - {{- if not $handler.RequestReply }} + {{- if $handler.Package }} {{ $handler.TaskType | TypeToPackageName }} "{{ $handler.Package }}" {{- end }} {{- end }} @@ -27,6 +27,8 @@ It hosts the following handlers: {{ range $handler := .Package.TaskHandlers }} {{- if $handler.RequestReply }} - {{ $handler.TaskType}}: Remote Request-Reply Service + {{- else if $handler.Command }} + - {{ $handler.TaskType}}: Using command /handler/commands/{{ $handler.Command }} {{- else }} - {{ $handler.TaskType }}: {{ $handler.Package }}@{{ $handler.Version }} {{- end }} @@ -130,6 +132,8 @@ func main() { {{ range $handler := .Package.TaskHandlers }} {{- if $handler.RequestReply }} err = router.RequestReply("{{ $handler.TaskType }}", client) + {{- else if $handler.Command }} + err = router.ExternalProcess("{{ $handler.TaskType }}", "/handler/commands/{{ $handler.Command }}") {{- else }} err = router.HandleFunc("{{ $handler.TaskType }}", {{ $handler.TaskType | TypeToPackageName }}.AsyncJobHandler) {{- end }} diff --git a/generators/godocker.go b/generators/godocker.go index 701ba59..7000540 100644 --- a/generators/godocker.go +++ b/generators/godocker.go @@ -50,7 +50,7 @@ func (g *GoContainer) RenderToDirectory(target string) error { } for _, p := range g.Package.TaskHandlers { - if p.RequestReply { + if p.RequestReply || p.Command != "" { continue } @@ -61,7 +61,6 @@ func (g *GoContainer) RenderToDirectory(target string) error { if p.Version == "" { return fmt.Errorf("task handlers require a version") } - } funcs := map[string]interface{}{ diff --git a/generators/package.go b/generators/package.go index 7357d3a..ffc6daa 100644 --- a/generators/package.go +++ b/generators/package.go @@ -38,4 +38,6 @@ type TaskHandler struct { Version string `yaml:"version"` // RequestReply indicates the handler is a callout to a remote service RequestReply bool `yaml:"remote"` + // Command indicates the handler is a callout to a command in the given file + Command string `yaml:"command"` } diff --git a/go.mod b/go.mod index 5748b1e..a5357b8 100644 --- a/go.mod +++ b/go.mod @@ -2,6 +2,10 @@ module github.com/choria-io/asyncjobs go 1.17 +// this should not be needed, but something somewhere is putting jwt v1 in go.sum +// triggering github security alerts, the dependency is unused +replace github.com/nats-io/jwt v1.2.2 => github.com/nats-io/jwt/v2 v2.2.0 + require ( github.com/AlecAivazis/survey/v2 v2.3.2 github.com/dustin/go-humanize v1.0.0 diff --git a/go.sum b/go.sum index 1c258b1..e112892 100644 --- a/go.sum +++ b/go.sum @@ -193,8 +193,8 @@ github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRW github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= github.com/nats-io/jsm.go v0.0.29 h1:5y4WaH5OkhknpU35/ej8ZGfWQ6FzugklvlUBGj6EJNo= github.com/nats-io/jsm.go v0.0.29/go.mod h1:ez2gzt0p1YhQXJlzYDZkkoxAQpl6HHpnEI4/GBDzzQA= -github.com/nats-io/jwt v1.2.2 h1:w3GMTO969dFg+UOKTmmyuu7IGdusK+7Ytlt//OYH/uU= -github.com/nats-io/jwt v1.2.2/go.mod h1:/xX356yQA6LuXI9xWW7mZNpxgF2mBmGecH+Fj34sP5Q= +github.com/nats-io/jwt/v2 v2.2.0 h1:Yg/4WFK6vsqMudRg91eBb7Dh6XeVcDMPHycDE8CfltE= +github.com/nats-io/jwt/v2 v2.2.0/go.mod h1:0tqz9Hlu6bCBFLWAASKhE5vUA4c24L9KPUUgvwumE/k= github.com/nats-io/jwt/v2 v2.2.1-0.20220113022732-58e87895b296/go.mod h1:0tqz9Hlu6bCBFLWAASKhE5vUA4c24L9KPUUgvwumE/k= github.com/nats-io/jwt/v2 v2.2.1-0.20220216230343-0ebff70bb096 h1:/NsiAJ3QwYAJ1GcE46MZKdn924z1ki5JmLIOLIG7cMw= github.com/nats-io/jwt/v2 v2.2.1-0.20220216230343-0ebff70bb096/go.mod h1:3dd7YxgBieCh8qT6R3lOGvKezVaDVtZ/YB6OTcLxu+Y= @@ -204,7 +204,6 @@ github.com/nats-io/nats-server/v2 v2.7.3/go.mod h1:eJUrA5gm0ch6sJTEv85xmXIgQWsB0 github.com/nats-io/nats.go v1.13.1-0.20220121202836-972a071d373d/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w= github.com/nats-io/nats.go v1.13.1-0.20220216000616-0096b1bfae8d h1:0aVg1U2lBfksxhhnff5DOWwJix9D2n4uBHGaHBroI6I= github.com/nats-io/nats.go v1.13.1-0.20220216000616-0096b1bfae8d/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w= -github.com/nats-io/nkeys v0.2.0/go.mod h1:XdZpAbhgyyODYqjTawOnIOI7VlbKSarI9Gfy1tqEu/s= github.com/nats-io/nkeys v0.3.0/go.mod h1:gvUNGjVcM2IPr5rCsRsC6Wb3Hr2CQAm08dsxtV6A5y4= github.com/nats-io/nkeys v0.3.1-0.20220214171627-79ae42e4d898 h1:FoO4iS4qOKmNWMvv4T48tpwH9C/bs97vN2X9O47My8Y= github.com/nats-io/nkeys v0.3.1-0.20220214171627-79ae42e4d898/go.mod h1:gvUNGjVcM2IPr5rCsRsC6Wb3Hr2CQAm08dsxtV6A5y4= @@ -286,7 +285,6 @@ golang.org/x/crypto v0.0.0-20190510104115-cbcb75029529/go.mod h1:yigFU9vqHzYiE8U golang.org/x/crypto v0.0.0-20190530122614-20be4c3c3ed5/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20190605123033-f99c8df09eb5/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= -golang.org/x/crypto v0.0.0-20200323165209-0ec3e9974c59/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20210314154223-e6e6c4f2bb5b/go.mod h1:T9bdIzuCu7OtxOm1hfPfRQxPLYneinmdGuTeoZ9dtd4= golang.org/x/crypto v0.0.0-20220112180741-5e0467b6c7ce/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= diff --git a/mux.go b/mux.go index 4fd59c9..0113658 100644 --- a/mux.go +++ b/mux.go @@ -6,10 +6,16 @@ package asyncjobs import ( "context" + "encoding/json" "fmt" + "os" + "os/exec" "sort" "strings" "sync" + "time" + + "github.com/dustin/go-humanize" ) type entryHandler struct { @@ -86,3 +92,49 @@ func (m *Mux) RequestReply(taskType string, client *Client) error { h := newRequestReplyHandleFunc(client.opts.nc, taskType) return m.HandleFunc(taskType, h) } + +// ExternalProcess sets up a delegated handler that calls an external command to handle the task. +// +// The task will be passed in JSON format on STDIN, any STDOUT/STDERR output will become the task +// result. Any non 0 exit code will be treated as a task failure. +func (m *Mux) ExternalProcess(taskType string, command string) error { + return m.HandleFunc(taskType, func(ctx context.Context, log Logger, task *Task) (interface{}, error) { + stat, err := os.Stat(command) + if err != nil || stat.IsDir() { + return nil, ErrExternalCommandNotFound + } + + tj, err := json.Marshal(task) + if err != nil { + return nil, err + } + + stdinFile, err := os.CreateTemp("", "asyncjobs-task") + if err != nil { + return nil, err + } + defer os.Remove(stdinFile.Name()) + defer stdinFile.Close() + + _, err = stdinFile.Write(tj) + if err != nil { + return nil, err + } + stdinFile.Close() + + start := time.Now() + log.Infof("Running task %s try %d using %q", task.ID, task.Tries, command) + + cmd := exec.CommandContext(ctx, command) + cmd.Env = append(cmd.Env, fmt.Sprintf("CHORIA_AJ_TASK=%s", stdinFile.Name())) + out, err := cmd.CombinedOutput() + if err != nil { + log.Errorf("Running %s failed: %q", command, out) + return nil, fmt.Errorf("%w: %v", ErrExternalCommandFailed, err) + } + + log.Infof("Task %s completed using %q after %s and %d tries with %s payload", task.ID, command, time.Since(start), task.Tries, humanize.IBytes(uint64(len(out)))) + + return string(out), nil + }) +} diff --git a/mux_test.go b/mux_test.go index 22b7b40..3947e7b 100644 --- a/mux_test.go +++ b/mux_test.go @@ -12,6 +12,41 @@ import ( ) var _ = Describe("Router", func() { + Describe("ExternalProcess", func() { + var ( + task *Task + err error + router *Mux + ) + BeforeEach(func() { + task, err = NewTask("email:new", nil) + Expect(err).ToNot(HaveOccurred()) + router = NewTaskRouter() + }) + + It("Should handle missing commands", func() { + Expect(router.ExternalProcess("email:new", "testdata/missing.sh")).ToNot(HaveOccurred()) + handler := router.Handler(task) + _, err = handler(context.Background(), &defaultLogger{}, task) + Expect(err).To(MatchError(ErrExternalCommandNotFound)) + }) + + It("Should handle command failures", func() { + Expect(router.ExternalProcess("email:new", "testdata/failing-handler.sh")).ToNot(HaveOccurred()) + handler := router.Handler(task) + _, err = handler(context.Background(), &defaultLogger{}, task) + Expect(err).To(MatchError(ErrExternalCommandFailed)) + }) + + It("Should handle success", func() { + Expect(router.ExternalProcess("email:new", "testdata/passing-handler.sh")).ToNot(HaveOccurred()) + handler := router.Handler(task) + payload, err := handler(context.Background(), &defaultLogger{}, task) + Expect(err).ToNot(HaveOccurred()) + Expect(payload).To(Equal("success\n")) + }) + }) + Describe("Handler", func() { It("Should support default handler", func() { router := NewTaskRouter() diff --git a/testdata/failing-handler.sh b/testdata/failing-handler.sh new file mode 100755 index 0000000..29fdc60 --- /dev/null +++ b/testdata/failing-handler.sh @@ -0,0 +1,4 @@ +#!/bin/sh + +echo "simulated failure" +exit 1 diff --git a/testdata/passing-handler.sh b/testdata/passing-handler.sh new file mode 100755 index 0000000..f1c45d3 --- /dev/null +++ b/testdata/passing-handler.sh @@ -0,0 +1,4 @@ +#!/bin/sh + +echo "success" +exit 0