Skip to content

Commit

Permalink
Merge pull request #69 from ripienaar/68
Browse files Browse the repository at this point in the history
(#68) make external command handlers usable via go api and asyncjobs file
  • Loading branch information
ripienaar authored Mar 4, 2022
2 parents d18ed3e + 6eeaa9c commit d860d41
Show file tree
Hide file tree
Showing 12 changed files with 122 additions and 50 deletions.
40 changes: 1 addition & 39 deletions ajc/task_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,7 @@ package main

import (
"context"
"encoding/json"
"fmt"
"os"
"os/exec"
"strings"
"time"

Expand Down Expand Up @@ -181,41 +178,6 @@ func (c *taskCommand) watchAction(_ *kingpin.ParseContext) error {
}
}

func (c *taskCommand) commandHandlerFunc(ctx context.Context, log aj.Logger, task *aj.Task) (interface{}, error) {
tj, err := json.Marshal(task)
if err != nil {
return nil, err
}

stdinFile, err := os.CreateTemp("", "asyncjob")
if err != nil {
return nil, err
}
defer os.Remove(stdinFile.Name())
defer stdinFile.Close()

_, err = stdinFile.Write(tj)
if err != nil {
return nil, err
}
stdinFile.Close()

start := time.Now()
log.Infof("Running task %s try %d", task.ID, task.Tries)

cmd := exec.CommandContext(ctx, c.command)
cmd.Env = append(cmd.Env, fmt.Sprintf("CHORIA_AJ_TASK=%s", stdinFile.Name()))
out, err := cmd.CombinedOutput()
if err != nil {
log.Errorf("Running %s failed: %q", c.command, out)
return nil, err
}

log.Infof("Task %s completed after %s and %d tries with %s payload", task.ID, time.Since(start), task.Tries, humanize.IBytes(uint64(len(out))))

return out, nil
}

func (c *taskCommand) processAction(_ *kingpin.ParseContext) error {
if c.command == "" && !c.remote {
return fmt.Errorf("either a command or --remote is required")
Expand Down Expand Up @@ -243,7 +205,7 @@ func (c *taskCommand) processAction(_ *kingpin.ParseContext) error {
if c.remote {
err = router.RequestReply(c.ttype, client)
} else {
err = router.HandleFunc(c.ttype, c.commandHandlerFunc)
err = router.ExternalProcess(c.ttype, c.command)
}
if err != nil {
return err
Expand Down
5 changes: 4 additions & 1 deletion errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,10 @@ var (
ErrInvalidQueueState = fmt.Errorf("invalid queue storage state")
// ErrDuplicateItem indicates that the Work Queue deduplication protection refused a message
ErrDuplicateItem = fmt.Errorf("duplicate work queue item")

// ErrExternalCommandNotFound indicates a command for an ExternalProcess handler was not found
ErrExternalCommandNotFound = fmt.Errorf("command not found")
// ErrExternalCommandFailed indicates a command for an ExternalProcess handler failed
ErrExternalCommandFailed = fmt.Errorf("execution failed")
// ErrUnknownEventType indicates that while parsing an event an unknown type of event was encountered
ErrUnknownEventType = fmt.Errorf("unknown event type")

Expand Down
11 changes: 8 additions & 3 deletions generators/fs/godocker/Dockerfile.templ
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,16 @@ WORKDIR /usr/src/app

RUN go mod init "{{ .Package.Name }}" && \
{{- range $handler := .Package.TaskHandlers }}
{{- if not $handler.RequestReply }}
{{- if $handler.Package }}
go get "{{ $handler.Package }}@{{ $handler.Version }}" && \
{{- end }}
{{- end }}
go get github.com/choria-io/asyncjobs@{{ .Package.AJVersion }}

COPY main.go /usr/src/app/main.go

RUN go mod tidy -compat=1.17 && \
go build -v -o /app -ldflags="-s -w -extldflags=-static"
RUN go mod tidy -compat=1.17
RUN go build -v -o /app -ldflags="-s -w -extldflags=-static"

FROM alpine:latest

Expand All @@ -25,6 +25,11 @@ RUN addgroup -g 2048 asyncjobs && \
mkdir -p /handler/config

COPY --from=builder /app /handler/app
{{- range $handler := .Package.TaskHandlers }}
{{- if $handler.Command }}
COPY ./commands/{{ $handler.Command }} /handler/commands/{{ $handler.Command }}
{{- end }}
{{- end }}

EXPOSE 8080/tcp

Expand Down
6 changes: 5 additions & 1 deletion generators/fs/godocker/main.go.templ
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import (
aj "github.com/choria-io/asyncjobs"

{{- range $handler := .Package.TaskHandlers }}
{{- if not $handler.RequestReply }}
{{- if $handler.Package }}
{{ $handler.TaskType | TypeToPackageName }} "{{ $handler.Package }}"
{{- end }}
{{- end }}
Expand All @@ -27,6 +27,8 @@ It hosts the following handlers:
{{ range $handler := .Package.TaskHandlers }}
{{- if $handler.RequestReply }}
- {{ $handler.TaskType}}: Remote Request-Reply Service
{{- else if $handler.Command }}
- {{ $handler.TaskType}}: Using command /handler/commands/{{ $handler.Command }}
{{- else }}
- {{ $handler.TaskType }}: {{ $handler.Package }}@{{ $handler.Version }}
{{- end }}
Expand Down Expand Up @@ -130,6 +132,8 @@ func main() {
{{ range $handler := .Package.TaskHandlers }}
{{- if $handler.RequestReply }}
err = router.RequestReply("{{ $handler.TaskType }}", client)
{{- else if $handler.Command }}
err = router.ExternalProcess("{{ $handler.TaskType }}", "/handler/commands/{{ $handler.Command }}")
{{- else }}
err = router.HandleFunc("{{ $handler.TaskType }}", {{ $handler.TaskType | TypeToPackageName }}.AsyncJobHandler)
{{- end }}
Expand Down
3 changes: 1 addition & 2 deletions generators/godocker.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func (g *GoContainer) RenderToDirectory(target string) error {
}

for _, p := range g.Package.TaskHandlers {
if p.RequestReply {
if p.RequestReply || p.Command != "" {
continue
}

Expand All @@ -61,7 +61,6 @@ func (g *GoContainer) RenderToDirectory(target string) error {
if p.Version == "" {
return fmt.Errorf("task handlers require a version")
}

}

funcs := map[string]interface{}{
Expand Down
2 changes: 2 additions & 0 deletions generators/package.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,4 +38,6 @@ type TaskHandler struct {
Version string `yaml:"version"`
// RequestReply indicates the handler is a callout to a remote service
RequestReply bool `yaml:"remote"`
// Command indicates the handler is a callout to a command in the given file
Command string `yaml:"command"`
}
4 changes: 4 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@ module github.com/choria-io/asyncjobs

go 1.17

// this should not be needed, but something somewhere is putting jwt v1 in go.sum
// triggering github security alerts, the dependency is unused
replace github.com/nats-io/jwt v1.2.2 => github.com/nats-io/jwt/v2 v2.2.0

require (
github.com/AlecAivazis/survey/v2 v2.3.2
github.com/dustin/go-humanize v1.0.0
Expand Down
6 changes: 2 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -193,8 +193,8 @@ github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRW
github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U=
github.com/nats-io/jsm.go v0.0.29 h1:5y4WaH5OkhknpU35/ej8ZGfWQ6FzugklvlUBGj6EJNo=
github.com/nats-io/jsm.go v0.0.29/go.mod h1:ez2gzt0p1YhQXJlzYDZkkoxAQpl6HHpnEI4/GBDzzQA=
github.com/nats-io/jwt v1.2.2 h1:w3GMTO969dFg+UOKTmmyuu7IGdusK+7Ytlt//OYH/uU=
github.com/nats-io/jwt v1.2.2/go.mod h1:/xX356yQA6LuXI9xWW7mZNpxgF2mBmGecH+Fj34sP5Q=
github.com/nats-io/jwt/v2 v2.2.0 h1:Yg/4WFK6vsqMudRg91eBb7Dh6XeVcDMPHycDE8CfltE=
github.com/nats-io/jwt/v2 v2.2.0/go.mod h1:0tqz9Hlu6bCBFLWAASKhE5vUA4c24L9KPUUgvwumE/k=
github.com/nats-io/jwt/v2 v2.2.1-0.20220113022732-58e87895b296/go.mod h1:0tqz9Hlu6bCBFLWAASKhE5vUA4c24L9KPUUgvwumE/k=
github.com/nats-io/jwt/v2 v2.2.1-0.20220216230343-0ebff70bb096 h1:/NsiAJ3QwYAJ1GcE46MZKdn924z1ki5JmLIOLIG7cMw=
github.com/nats-io/jwt/v2 v2.2.1-0.20220216230343-0ebff70bb096/go.mod h1:3dd7YxgBieCh8qT6R3lOGvKezVaDVtZ/YB6OTcLxu+Y=
Expand All @@ -204,7 +204,6 @@ github.com/nats-io/nats-server/v2 v2.7.3/go.mod h1:eJUrA5gm0ch6sJTEv85xmXIgQWsB0
github.com/nats-io/nats.go v1.13.1-0.20220121202836-972a071d373d/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w=
github.com/nats-io/nats.go v1.13.1-0.20220216000616-0096b1bfae8d h1:0aVg1U2lBfksxhhnff5DOWwJix9D2n4uBHGaHBroI6I=
github.com/nats-io/nats.go v1.13.1-0.20220216000616-0096b1bfae8d/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w=
github.com/nats-io/nkeys v0.2.0/go.mod h1:XdZpAbhgyyODYqjTawOnIOI7VlbKSarI9Gfy1tqEu/s=
github.com/nats-io/nkeys v0.3.0/go.mod h1:gvUNGjVcM2IPr5rCsRsC6Wb3Hr2CQAm08dsxtV6A5y4=
github.com/nats-io/nkeys v0.3.1-0.20220214171627-79ae42e4d898 h1:FoO4iS4qOKmNWMvv4T48tpwH9C/bs97vN2X9O47My8Y=
github.com/nats-io/nkeys v0.3.1-0.20220214171627-79ae42e4d898/go.mod h1:gvUNGjVcM2IPr5rCsRsC6Wb3Hr2CQAm08dsxtV6A5y4=
Expand Down Expand Up @@ -286,7 +285,6 @@ golang.org/x/crypto v0.0.0-20190510104115-cbcb75029529/go.mod h1:yigFU9vqHzYiE8U
golang.org/x/crypto v0.0.0-20190530122614-20be4c3c3ed5/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20190605123033-f99c8df09eb5/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20200323165209-0ec3e9974c59/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20210314154223-e6e6c4f2bb5b/go.mod h1:T9bdIzuCu7OtxOm1hfPfRQxPLYneinmdGuTeoZ9dtd4=
golang.org/x/crypto v0.0.0-20220112180741-5e0467b6c7ce/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
Expand Down
52 changes: 52 additions & 0 deletions mux.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,16 @@ package asyncjobs

import (
"context"
"encoding/json"
"fmt"
"os"
"os/exec"
"sort"
"strings"
"sync"
"time"

"github.com/dustin/go-humanize"
)

type entryHandler struct {
Expand Down Expand Up @@ -86,3 +92,49 @@ func (m *Mux) RequestReply(taskType string, client *Client) error {
h := newRequestReplyHandleFunc(client.opts.nc, taskType)
return m.HandleFunc(taskType, h)
}

// ExternalProcess sets up a delegated handler that calls an external command to handle the task.
//
// The task will be passed in JSON format on STDIN, any STDOUT/STDERR output will become the task
// result. Any non 0 exit code will be treated as a task failure.
func (m *Mux) ExternalProcess(taskType string, command string) error {
return m.HandleFunc(taskType, func(ctx context.Context, log Logger, task *Task) (interface{}, error) {
stat, err := os.Stat(command)
if err != nil || stat.IsDir() {
return nil, ErrExternalCommandNotFound
}

tj, err := json.Marshal(task)
if err != nil {
return nil, err
}

stdinFile, err := os.CreateTemp("", "asyncjobs-task")
if err != nil {
return nil, err
}
defer os.Remove(stdinFile.Name())
defer stdinFile.Close()

_, err = stdinFile.Write(tj)
if err != nil {
return nil, err
}
stdinFile.Close()

start := time.Now()
log.Infof("Running task %s try %d using %q", task.ID, task.Tries, command)

cmd := exec.CommandContext(ctx, command)
cmd.Env = append(cmd.Env, fmt.Sprintf("CHORIA_AJ_TASK=%s", stdinFile.Name()))
out, err := cmd.CombinedOutput()
if err != nil {
log.Errorf("Running %s failed: %q", command, out)
return nil, fmt.Errorf("%w: %v", ErrExternalCommandFailed, err)
}

log.Infof("Task %s completed using %q after %s and %d tries with %s payload", task.ID, command, time.Since(start), task.Tries, humanize.IBytes(uint64(len(out))))

return string(out), nil
})
}
35 changes: 35 additions & 0 deletions mux_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,41 @@ import (
)

var _ = Describe("Router", func() {
Describe("ExternalProcess", func() {
var (
task *Task
err error
router *Mux
)
BeforeEach(func() {
task, err = NewTask("email:new", nil)
Expect(err).ToNot(HaveOccurred())
router = NewTaskRouter()
})

It("Should handle missing commands", func() {
Expect(router.ExternalProcess("email:new", "testdata/missing.sh")).ToNot(HaveOccurred())
handler := router.Handler(task)
_, err = handler(context.Background(), &defaultLogger{}, task)
Expect(err).To(MatchError(ErrExternalCommandNotFound))
})

It("Should handle command failures", func() {
Expect(router.ExternalProcess("email:new", "testdata/failing-handler.sh")).ToNot(HaveOccurred())
handler := router.Handler(task)
_, err = handler(context.Background(), &defaultLogger{}, task)
Expect(err).To(MatchError(ErrExternalCommandFailed))
})

It("Should handle success", func() {
Expect(router.ExternalProcess("email:new", "testdata/passing-handler.sh")).ToNot(HaveOccurred())
handler := router.Handler(task)
payload, err := handler(context.Background(), &defaultLogger{}, task)
Expect(err).ToNot(HaveOccurred())
Expect(payload).To(Equal("success\n"))
})
})

Describe("Handler", func() {
It("Should support default handler", func() {
router := NewTaskRouter()
Expand Down
4 changes: 4 additions & 0 deletions testdata/failing-handler.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
#!/bin/sh

echo "simulated failure"
exit 1
4 changes: 4 additions & 0 deletions testdata/passing-handler.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
#!/bin/sh

echo "success"
exit 0

0 comments on commit d860d41

Please sign in to comment.