Skip to content

Commit

Permalink
Merge pull request #44 from ripienaar/43
Browse files Browse the repository at this point in the history
(#43) support packaging handlers into docker containers
  • Loading branch information
ripienaar authored Feb 8, 2022
2 parents 4ea9e69 + 904bbfc commit 783f133
Show file tree
Hide file tree
Showing 17 changed files with 384 additions and 25 deletions.
2 changes: 2 additions & 0 deletions Dockerfile.goreleaser
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,7 @@

FROM alpine:latest

RUN apk --no-cache add ca-certificates

ENTRYPOINT ["/usr/bin/ajc"]
COPY ajc /usr/bin/ajc
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ task, _ := asyncjobs.NewTask("email:new", newEmail())
client.EnqueueTask(ctx, task)
```

Tasks are processes by horizontally and vertically scalable. Typically, a Handler handles one type of Task. We have Prometheus
Tasks are processes by horizontally and vertically scalable processes. Typically, a Handler handles one type of Task. We have Prometheus
integration, concurrency and backoffs configured.

```go
Expand All @@ -64,7 +64,7 @@ client, _ := asyncjobs.NewClient(
asyncjobs.RetryBackoffPolicy(asyncjobs.RetryLinearTenMinutes))

router := asyncjobs.NewTaskRouter()
router.Handler("email:new", func(ctx context.Context, task *asyncjobs.Task) (interface{}, error) {
router.Handler("email:new", func(ctx context.Context, log asyncjobs.Logger, task *asyncjobs.Task) (interface{}, error) {
log.Printf("Processing task %s", task.ID)

// do work here using task.Payload
Expand Down
1 change: 1 addition & 0 deletions ajc/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ func main() {
configureInfoCommand(ajc)
configureTaskCommand(ajc)
configureQueueCommand(ajc)
configurePackagesCommand(ajc)

_, err := ajc.Parse(os.Args[1:])
if err != nil {
Expand Down
78 changes: 78 additions & 0 deletions ajc/package_command.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
// Copyright (c) 2022, R.I. Pienaar and the Project contributors
//
// SPDX-License-Identifier: Apache-2.0

package main

import (
"fmt"
"os"
"path/filepath"

"github.com/choria-io/asyncjobs/generators"
"gopkg.in/alecthomas/kingpin.v2"
"gopkg.in/yaml.v2"
)

type packageCommand struct {
file string
}

func configurePackagesCommand(app *kingpin.Application) {
c := &packageCommand{}

pkg := app.Command("package", "Creates packages hosting handlers").Alias("pkg")

pkg.Command("docker", "Creates a Docker Container hosting handlers based on handlers.yaml").Action(c.dockerAction)
pkg.Flag("file", "Use a specific configuration file rather than asyncjobs.yaml").Default("asyncjobs.yaml").ExistingFileVar(&c.file)
}

func (c *packageCommand) dockerAction(_ *kingpin.ParseContext) error {
createLogger()

_, err := os.Stat(c.file)
if os.IsNotExist(err) {
return fmt.Errorf("handlers.yaml does not exist")
}

hb, err := os.ReadFile(c.file)
if err != nil {
return err
}

h := &generators.Package{}
err = yaml.Unmarshal(hb, h)
if err != nil {
return fmt.Errorf("invalid handlers file: %v", err)
}

if h.AJVersion == "" {
h.AJVersion = version
}
if h.Name == "" {
h.Name = "choria.io/asyncjobs/handlers"
}

if len(h.TaskHandlers) == 0 {
return fmt.Errorf("no task handlers specified in %s", c.file)
}

generator, err := generators.NewGoContainer(h)
if err != nil {
return err
}

path, err := filepath.Abs(".")
if err != nil {
return err
}

err = generator.RenderToDirectory(path)
if err != nil {
return err
}

log.Printf("Run docker build to build your package\n")

return nil
}
2 changes: 1 addition & 1 deletion ajc/task_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ func (c *taskCommand) processAction(_ *kingpin.ParseContext) error {
}

router := asyncjobs.NewTaskRouter()
err = router.HandleFunc(c.ttype, func(ctx context.Context, task *asyncjobs.Task) (interface{}, error) {
err = router.HandleFunc(c.ttype, func(ctx context.Context, log asyncjobs.Logger, task *asyncjobs.Task) (interface{}, error) {
tj, err := json.Marshal(task)
if err != nil {
return nil, err
Expand Down
6 changes: 5 additions & 1 deletion ajc/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import (
"golang.org/x/term"
)

func prepare(copts ...asyncjobs.ClientOpt) error {
func createLogger() {
logger := logrus.New()
if debug {
logger.SetLevel(logrus.DebugLevel)
Expand All @@ -33,7 +33,11 @@ func prepare(copts ...asyncjobs.ClientOpt) error {
FullTimestamp: true,
TimestampFormat: "15:04:05",
})

log = logrus.NewEntry(logger)
}
func prepare(copts ...asyncjobs.ClientOpt) error {
createLogger()

if nctx == "" {
return fmt.Errorf("no NATS Context specified")
Expand Down
2 changes: 1 addition & 1 deletion client_examples_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func ExampleClient_consumer() {
panicIfErr(err)

router := NewTaskRouter()
err = router.HandleFunc("email:send", func(_ context.Context, t *Task) (interface{}, error) {
err = router.HandleFunc("email:send", func(_ context.Context, _ Logger, t *Task) (interface{}, error) {
log.Printf("Processing task: %s", t.ID)

// handle task.Payload which is a JSON encoded email
Expand Down
10 changes: 5 additions & 5 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,14 +138,14 @@ var _ = Describe("Client", func() {
handled := int32(0)

router := NewTaskRouter()
router.HandleFunc("test", func(ctx context.Context, t *Task) (interface{}, error) {
router.HandleFunc("test", func(ctx context.Context, log Logger, t *Task) (interface{}, error) {
if t.Tries > 1 {
log.Printf("Try %d for task %s", t.Tries, t.ID)
log.Infof("Try %d for task %s", t.Tries, t.ID)
}

done := atomic.AddInt32(&handled, 1)
if done == int32(testCount)+10 {
log.Printf("Processed all messages")
log.Infof("Processed all messages")
time.AfterFunc(50*time.Millisecond, func() {
cancel()
})
Expand Down Expand Up @@ -198,10 +198,10 @@ var _ = Describe("Client", func() {
var tries []time.Time

router := NewTaskRouter()
router.HandleFunc("ginkgo", func(ctx context.Context, t *Task) (interface{}, error) {
router.HandleFunc("ginkgo", func(ctx context.Context, log Logger, t *Task) (interface{}, error) {
tries = append(tries, time.Now())

log.Printf("Trying task %s on try %d\n", t.ID, t.Tries)
log.Infof("Trying task %s on try %d\n", t.ID, t.Tries)

if t.Tries < 2 {
return "fail", fmt.Errorf("simulated failure")
Expand Down
36 changes: 36 additions & 0 deletions generators/fs/godocker/Dockerfile.templ
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
FROM golang:latest AS builder

WORKDIR /usr/src/app

COPY main.go /usr/src/app/main.go

RUN go mod init "{{ .Package.Name }}" && \
go mod tidy -compat=1.17 && \
go get github.com/choria-io/asyncjobs@{{ .Package.AJVersion }} && \
go mod download

RUN go build -v -o /app -ldflags="-s -w"

FROM alpine:latest

RUN adduser -g "Choria Async Jobs" choria && \
mkdir /lib64 && \
ln -s /lib/libc.musl-x86_64.so.1 /lib64/ld-linux-x86-64.so.2 && \
apk --no-cache add ca-certificates && \
mkdir -p /handler/config

COPY --from=builder /app /handler/app

EXPOSE 8080/tcp

USER choria

ENV XDG_CONFIG_HOME "/handler/config"
ENV AJ_WORK_QUEUE "{{ .Package.WorkQueue }}"
{{- if .Package.ContextName }}
ENV AJ_NATS_CONTEXT "{{ .Package.ContextName }}"
{{- else }}
ENV AJ_NATS_CONTEXT "AJ"
{{- end }}

ENTRYPOINT ["/handler/app"]
115 changes: 115 additions & 0 deletions generators/fs/godocker/main.go.templ
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
// Copyright (c) 2022, R.I. Pienaar and the Project contributors
//
// SPDX-License-Identifier: Apache-2.0

package main

import (
"context"
"fmt"
"os"
"runtime"
"strconv"

"github.com/sirupsen/logrus"
aj "github.com/choria-io/asyncjobs"

{{ range $handler := .Package.TaskHandlers }}
{{ $handler.TaskType | TypeToPackageName }} "{{ $handler.Package }}"
{{- end }}
)

var usage = `
Choria Async Jobs Handler Failed: %v
This is a generated Handler for the Choria Async Jobs Project.
It hosts the following handlers:
{{ range $handler := .Package.TaskHandlers }}
- {{ $handler.TaskType }}: {{ $handler.Package }}.AsyncJobHandler
{{- end }}
To run this you need to prepare a NATS context using the nats
CLI and then mount it into the container on /handler/config/nats.
The following Environment variables are supported:
- AJ_WORK_QUEUE: The Work Queue to consume from, defaults to DEFAULT
- AJ_NATS_CONTEXT: The name of a NATS Context to use for connections
- AJ_CONCURRENCY: The number of concurrent handlers that can be run
Prometheus statistics are Exposed on port 8080 as /metrics
For further information see the project wiki at:
https://github.com/choria-io/asyncjobs
Build Information:
- Build Time: {{ .BuildTime }}
- Async Jobs Package Version: {{ .Package.AJVersion }}
`

func usageIfError(err error) {
if err == nil {
return
}

fmt.Printf(usage, err)
os.Exit(1)
}

func main() {
wq := os.Getenv("AJ_WORK_QUEUE")
if wq == "" {
usageIfError(fmt.Errorf("AJ_WORK_QUEUE is required"))
}

nctx := os.Getenv("AJ_NATS_CONTEXT")
if nctx == "" {
usageIfError(fmt.Errorf("AJ_NATS_CONTEXT is required"))
}

var err error
concurrencyS := os.Getenv("AJ_CONCURRENCY")
concurrency := runtime.NumCPU()
if concurrencyS != "" {
concurrency, err = strconv.Atoi(concurrencyS)
if err != nil {
usageIfError(fmt.Errorf("AJ_CONCURRENCY must be numeric"))
}
}

logger := logrus.New()
if os.Getenv("AJ_DEBUG") == "1" {
logger.SetLevel(logrus.DebugLevel)
} else {
logger.SetLevel(logrus.InfoLevel)
}
logger.SetFormatter(&logrus.TextFormatter{
FullTimestamp: true,
TimestampFormat: "15:04:05",
})

log := logrus.NewEntry(logger)

log.Printf("Connecting using Context %s consuming work queue %s with concurrency %d", nctx, wq, concurrency)

client, err := aj.NewClient(
aj.NatsContext(nctx),
aj.BindWorkQueue(wq),
aj.ClientConcurrency(concurrency),
aj.CustomLogger(log),
aj.PrometheusListenPort(8080))
usageIfError(err)

router := aj.NewTaskRouter()
{{ range $handler := .Package.TaskHandlers }}
err = router.HandleFunc("{{ $handler.TaskType }}", {{ $handler.TaskType | TypeToPackageName }}.AsyncJobHandler)
usageIfError(err)
{{- end }}

err = client.Run(context.Background(), router)
usageIfError(err)
}
Loading

0 comments on commit 783f133

Please sign in to comment.