Skip to content

Commit

Permalink
move job engine to a new worker pod, using redis as the backend
Browse files Browse the repository at this point in the history
  • Loading branch information
lindgrenj6 committed Apr 21, 2022
1 parent 527a42e commit 2b8b93b
Show file tree
Hide file tree
Showing 21 changed files with 290 additions and 91 deletions.
3 changes: 3 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@ inlinerun:
listener:
go run `ls *.go | grep -v test` -listener

backgroundworker:
go run `ls *.go | grep -v test` -background-worker

container:
docker build . -t sources-api-go

Expand Down
4 changes: 4 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ type SourcesApiConfig struct {
Psks []string
BypassRbac bool
StatusListener bool
BackgroundWorker bool
MigrationsSetup bool
MigrationsReset bool
SecretStore string
Expand Down Expand Up @@ -124,6 +125,7 @@ func Get() *SourcesApiConfig {
// Parse any Flags (using our own flag set to not conflict with the global flag)
fs := flag.NewFlagSet("runtime", flag.ContinueOnError)
availabilityListener := fs.Bool("listener", false, "run availability status listener")
backgroundWorker := fs.Bool("background-worker", false, "run background worker")
setUpDatabase := fs.Bool("setup", false, "create the database and exit")
resetDatabase := fs.Bool("reset", false, "drop the database, recreate it and exit")

Expand All @@ -133,6 +135,7 @@ func Get() *SourcesApiConfig {
}

options.SetDefault("StatusListener", *availabilityListener)
options.SetDefault("BackgroundWorker", *backgroundWorker)
options.SetDefault("MigrationsSetup", *setUpDatabase)
options.SetDefault("MigrationsReset", *resetDatabase)

Expand Down Expand Up @@ -177,6 +180,7 @@ func Get() *SourcesApiConfig {
Psks: options.GetStringSlice("psks"),
BypassRbac: options.GetBool("BypassRbac"),
StatusListener: options.GetBool("StatusListener"),
BackgroundWorker: options.GetBool("BackgroundWorker"),
MigrationsSetup: options.GetBool("MigrationsSetup"),
MigrationsReset: options.GetBool("MigrationsReset"),
SecretStore: options.GetString("SecretStore"),
Expand Down
2 changes: 1 addition & 1 deletion dao/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ func Init() {
Vault = vaultClient.Logical()

// we only want to seed the database when running the api pod - not the status listener
if !conf.StatusListener {
if !conf.StatusListener && !conf.BackgroundWorker {
err = seedDatabase()
if err != nil {
logging.Log.Fatalf("Failed to seed db: %v", err)
Expand Down
13 changes: 8 additions & 5 deletions db/migrations/migrations.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package migrations

import (
"context"
"fmt"
"time"

Expand All @@ -14,6 +15,8 @@ var migrationsCollection = []*gormigrate.Migration{
InitialSchema(),
}

var ctx = context.Background()

// redisLockKey is the key which will be used for the Redis lock when performing the migrations.
const redisLockKey = "sources-api-go-redis-lock"

Expand All @@ -33,7 +36,7 @@ func Migrate(db *gorm.DB) error {
}

// Before doing anything, check for the existence of the lock.
exists, err := redis.Client.Exists(redisLockKey).Result()
exists, err := redis.Client.Exists(ctx, redisLockKey).Result()
if err != nil {
return err
}
Expand All @@ -43,7 +46,7 @@ func Migrate(db *gorm.DB) error {
for lockExists {
time.Sleep(redisSleepTime)

exists, err = redis.Client.Exists(redisLockKey).Result()
exists, err = redis.Client.Exists(ctx, redisLockKey).Result()
if err != nil {
return err
}
Expand All @@ -52,7 +55,7 @@ func Migrate(db *gorm.DB) error {
}

// Set the migrations lock.
err = redis.Client.Set(redisLockKey, uuid.String(), redisLockExpirationTime).Err()
err = redis.Client.Set(ctx, redisLockKey, uuid.String(), redisLockExpirationTime).Err()
if err != nil {
return err
}
Expand All @@ -62,14 +65,14 @@ func Migrate(db *gorm.DB) error {
migrationErr := migrateTool.Migrate()

// Once the migrations have finished, get the lock's value to attempt to release it.
value, err := redis.Client.Get(redisLockKey).Result()
value, err := redis.Client.Get(ctx, redisLockKey).Result()
if err != nil {
return err
}

// The lock's value should coincide with the one we set above. If it doesn't something very wrong happened.
if value == uuid.String() {
err = redis.Client.Del(redisLockKey).Err()
err = redis.Client.Del(ctx, redisLockKey).Err()
if err != nil {
return err
}
Expand Down
21 changes: 21 additions & 0 deletions deploy/clowdapp.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,27 @@ objects:
spec:
envName: ${ENV_NAME}
deployments:
- name: background-worker
minReplicas: 1
podSpec:
args:
- -background-worker
image: ${IMAGE}:${IMAGE_TAG}
env:
- name: LOG_LEVEL
value: ${LOG_LEVEL}
- name: ENCRYPTION_KEY
valueFrom:
secretKeyRef:
name: sources-api-secrets
key: encryption-key
resources:
limits:
cpu: ${AVAILABILITY_LISTENER_CPU_LIMIT}
memory: ${AVAILABILITY_LISTENER_MEMORY_LIMIT}
requests:
cpu: ${AVAILABILITY_LISTENER_CPU_REQUEST}
memory: ${AVAILABILITY_LISTENER_MEMORY_REQUEST}
- name: availability-status-listener
minReplicas: ${{AVAILABILITY_MIN_REPLICAS}}
podSpec:
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ require (
github.com/aws/aws-sdk-go v1.42.22
github.com/gertd/go-pluralize v0.1.7
github.com/go-gormigrate/gormigrate/v2 v2.0.0
github.com/go-redis/redis v6.15.9+incompatible
github.com/go-redis/redis/v8 v8.11.5
github.com/golang-migrate/migrate/v4 v4.15.1
github.com/google/go-cmp v0.5.6
github.com/google/uuid v1.3.0
Expand Down
13 changes: 11 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -388,6 +388,8 @@ github.com/denisenkom/go-mssqldb v0.10.0/go.mod h1:xbL0rPBG9cCiLr28tMa8zpbdarY27
github.com/denverdino/aliyungo v0.0.0-20190125010748-a747050bb1ba/go.mod h1:dV8lFg6daOBZbT6/BDGIz6Y3WFGn8juu6G+CQ6LHtl0=
github.com/dgrijalva/jwt-go v0.0.0-20170104182250-a601269ab70c/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ=
github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ=
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78=
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc=
github.com/dgryski/go-sip13 v0.0.0-20181026042036-e10d5fee7954/go.mod h1:vAd38F8PWV+bWy6jNmig1y/TA+kYO4g3RSRF0IAv0no=
github.com/dgryski/trifles v0.0.0-20200323201526-dd97f9abfb48 h1:fRzb/w+pyskVMQ+UbP35JkH8yB7MYb4q/qhBarqZE6g=
github.com/dgryski/trifles v0.0.0-20200323201526-dd97f9abfb48/go.mod h1:if7Fbed8SFyPtHLHbg49SI7NAdJiC5WIA09pe59rfAA=
Expand Down Expand Up @@ -494,6 +496,8 @@ github.com/go-openapi/swag v0.19.2/go.mod h1:POnQmlKehdgb5mhVOsnJFsivZCEZ/vjK9gh
github.com/go-openapi/swag v0.19.5/go.mod h1:POnQmlKehdgb5mhVOsnJFsivZCEZ/vjK9gh66Z9tfKk=
github.com/go-redis/redis v6.15.9+incompatible h1:K0pv1D7EQUjfyoMql+r/jZqCLizCGKFlFgcHWWmHQjg=
github.com/go-redis/redis v6.15.9+incompatible/go.mod h1:NAIEuMOZ/fxfXJIrKDQDz8wamY7mA7PouImQ2Jvg6kA=
github.com/go-redis/redis/v8 v8.11.5 h1:AcZZR7igkdvfVmQTPnu9WE37LRrO/YrBH5zWyjDC0oI=
github.com/go-redis/redis/v8 v8.11.5/go.mod h1:gREzHqY1hg6oD9ngVRbLStwAWKhA0FEgq8Jd4h5lpwo=
github.com/go-sql-driver/mysql v1.4.0/go.mod h1:zAC/RDZ24gD3HViQzih4MyKcchzm+sOG5ZlKdlhCg5w=
github.com/go-sql-driver/mysql v1.5.0 h1:ozyZYNQW3x3HtqT1jira07DN2PArx2v7/mN66gGcHOs=
github.com/go-sql-driver/mysql v1.5.0/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LBy8hT2VhHyBg=
Expand Down Expand Up @@ -629,6 +633,7 @@ github.com/google/pprof v0.0.0-20201023163331-3e6fc7fc9c4c/go.mod h1:kpwsk12EmLe
github.com/google/pprof v0.0.0-20201203190320-1bf35d6f28c2/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE=
github.com/google/pprof v0.0.0-20210122040257-d980be63207e/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE=
github.com/google/pprof v0.0.0-20210226084205-cbba55b83ad5/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE=
github.com/google/pprof v0.0.0-20210407192527-94a9f03dee38/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE=
github.com/google/pprof v0.0.0-20210601050228-01bbb1931b22/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE=
github.com/google/pprof v0.0.0-20210609004039-a478d1d731e9/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE=
github.com/google/pprof v0.0.0-20210715191844-86eeefc3e471/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE=
Expand Down Expand Up @@ -1056,6 +1061,7 @@ github.com/onsi/ginkgo v1.16.2/go.mod h1:CObGmKUOKaSC0RjmoAK7tKyn4Azo5P2IWuoMnvw
github.com/onsi/ginkgo v1.16.4/go.mod h1:dX+/inL/fNMqNlz0e9LfyB9TswhZpCVdJM/Z6Vvnwo0=
github.com/onsi/ginkgo v1.16.5 h1:8xi0RTUf59SOSfEtZMvwTvXYMzG4gV23XVHOZiXNtnE=
github.com/onsi/ginkgo v1.16.5/go.mod h1:+E8gABHa3K6zRBolWtd+ROzc/U5bkGt0FwiG042wbpU=
github.com/onsi/ginkgo/v2 v2.0.0/go.mod h1:vw5CSIxN1JObi/U8gcbwft7ZxR2dgaR70JSE3/PpL4c=
github.com/onsi/gomega v0.0.0-20151007035656-2152b45fa28a/go.mod h1:C1qb7wdrVGGVU+Z6iS04AVkA3Q65CEZX59MT0QO5uiA=
github.com/onsi/gomega v0.0.0-20170829124025-dcabb60a477c/go.mod h1:C1qb7wdrVGGVU+Z6iS04AVkA3Q65CEZX59MT0QO5uiA=
github.com/onsi/gomega v1.4.3/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY=
Expand All @@ -1066,8 +1072,10 @@ github.com/onsi/gomega v1.9.0/go.mod h1:Ho0h+IUsWyvy1OpqCwxlQ/21gkhVunqlU8fDGcoT
github.com/onsi/gomega v1.10.1/go.mod h1:iN09h71vgCQne3DLsj+A5owkum+a2tYe+TOCB1ybHNo=
github.com/onsi/gomega v1.10.3/go.mod h1:V9xEwhxec5O8UDM77eCW8vLymOMltsqPVYWrpDsH8xc=
github.com/onsi/gomega v1.13.0/go.mod h1:lRk9szgn8TxENtWd0Tp4c3wjlRfMTMH27I+3Je41yGY=
github.com/onsi/gomega v1.16.0 h1:6gjqkI8iiRHMvdccRJM8rVKjCWk6ZIm6FTm3ddIe4/c=
github.com/onsi/gomega v1.16.0/go.mod h1:HnhC7FXeEQY45zxNK3PPoIUhzk/80Xly9PcubAlGdZY=
github.com/onsi/gomega v1.17.0/go.mod h1:HnhC7FXeEQY45zxNK3PPoIUhzk/80Xly9PcubAlGdZY=
github.com/onsi/gomega v1.18.1 h1:M1GfJqGRrBrrGGsbxzV5dqM2U2ApXefZCQpkukxYRLE=
github.com/onsi/gomega v1.18.1/go.mod h1:0q+aL8jAiMXy9hbwj2mr5GziHiwhAIQpFmmtT5hitRs=
github.com/op/go-logging v0.0.0-20160315200505-970db520ece7/go.mod h1:HzydrMdWErDVzsI23lYNej1Htcns9BCg93Dk0bBINWk=
github.com/opencontainers/go-digest v0.0.0-20170106003457-a6d0ee40d420/go.mod h1:cMLVZDEM3+U2I4VmLI6N8jQYUd2OVphdqWwCJHrFt2s=
github.com/opencontainers/go-digest v0.0.0-20180430190053-c9281466c8b2/go.mod h1:cMLVZDEM3+U2I4VmLI6N8jQYUd2OVphdqWwCJHrFt2s=
Expand Down Expand Up @@ -1704,8 +1712,9 @@ golang.org/x/sys v0.0.0-20211019181941-9d821ace8654/go.mod h1:oPkhp1MJrh7nUepCBc
golang.org/x/sys v0.0.0-20211103235746-7861aae1554b/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20211124211545-fe61309f8881/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20211205182925-97ca703d548d/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20211210111614-af8b64212486 h1:5hpz5aRr+W1erYCL5JRhSUBJRph7l9XkNveoExlrKYk=
golang.org/x/sys v0.0.0-20211210111614-af8b64212486/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e h1:fLOSk5Q00efkSvAm+4xcoXD+RRmLmmulPn5I3Y9F2EM=
golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
Expand Down
16 changes: 16 additions & 0 deletions internal/testutils/mocks/mock_sender.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package mocks

import "github.com/RedHatInsights/sources-api-go/kafka"

type MockSender struct {
Hit int
Headers []kafka.Header
Body string
}

func (m *MockSender) RaiseEvent(_ string, b []byte, headers []kafka.Header) error {
m.Headers = headers
m.Body = string(b)
m.Hit++
return nil
}
20 changes: 14 additions & 6 deletions jobs/async_destroy.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package jobs

import (
"encoding/json"
"fmt"
"strings"
"time"
Expand All @@ -10,11 +11,11 @@ import (
)

type AsyncDestroyJob struct {
Headers []kafka.Header
Tenant int64
WaitSeconds int
Model string
Id int64
Headers []kafka.Header `json:"headers"`
Tenant int64 `json:"tenant"`
WaitSeconds int `json:"wait_seconds"`
Model string `json:"model"`
Id int64 `json:"id"`
}

func (ad AsyncDestroyJob) Delay() time.Duration {
Expand All @@ -40,7 +41,6 @@ func (ad AsyncDestroyJob) Run() error {
if err != nil {
return err
}

case "application":
err := service.DeleteCascade(&ad.Tenant, "Application", ad.Id, ad.Headers)
if err != nil {
Expand All @@ -52,3 +52,11 @@ func (ad AsyncDestroyJob) Run() error {

return nil
}

func (ad AsyncDestroyJob) ToJSON() []byte {
bytes, err := json.Marshal(&ad)
if err != nil {
panic(err)
}
return bytes
}
67 changes: 67 additions & 0 deletions jobs/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package jobs

import (
"context"
"encoding/json"
"fmt"

l "github.com/RedHatInsights/sources-api-go/logger"
"github.com/RedHatInsights/sources-api-go/redis"
)

type JobRequest struct {
JobName string
JobRaw []byte
Job Job
}

// implementing binary mashaler/unmarshaler interfaces for redis encoding/decoding.
func (jr JobRequest) MarshalBinary() (data []byte, err error) {
return json.Marshal(&jr)
}
func (jr *JobRequest) UnmarshalBinary(data []byte) error {
return json.Unmarshal(data, jr)
}

func (jr *JobRequest) Parse() error {
switch jr.JobName {
case "SuperkeyDestroyJob":
sdj := SuperkeyDestroyJob{}
err := json.Unmarshal([]byte(jr.JobRaw), &sdj)
if err != nil {
return err
}

jr.Job = &sdj
case "AsyncDestroyJob":
adj := AsyncDestroyJob{}
err := json.Unmarshal(jr.JobRaw, &adj)
if err != nil {
return err
}

jr.Job = &adj
default:
l.Log.Warnf("Unsupported job: %v", jr.JobName)
return fmt.Errorf("unsupported job %v", jr.JobName)
}

l.Log.Debugf("Successfully parsed job %v, args %v", jr.Job.Name(), jr.Job.Arguments())

return nil
}

// Throws a `job` on the redis list to be picked up by the worker
func Enqueue(j Job) {
l.Log.Infof("Submitting job %v to redis with %v", j.Name(), j.Arguments())

req := JobRequest{
JobName: j.Name(),
JobRaw: j.ToJSON(),
}

err := redis.Client.RPush(context.Background(), workQueue, req).Err()
if err != nil {
l.Log.Warnf("Failed to submit job: %v", err)
}
}
19 changes: 14 additions & 5 deletions jobs/superkey.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package jobs

import (
"encoding/json"
"fmt"
"time"

Expand All @@ -13,11 +14,11 @@ import (
)

type SuperkeyDestroyJob struct {
Headers []kafka.Header
Identity string
Tenant int64
Model string
Id int64
Headers []kafka.Header `json:"headers"`
Identity string `json:"identity"`
Tenant int64 `json:"tenant"`
Model string `json:"model"`
Id int64 `json:"id"`
}

func (sk SuperkeyDestroyJob) Delay() time.Duration {
Expand Down Expand Up @@ -107,3 +108,11 @@ func (sk SuperkeyDestroyJob) sendForApplication(id int64) error {

return nil
}

func (sk SuperkeyDestroyJob) ToJSON() []byte {
bytes, err := json.Marshal(&sk)
if err != nil {
panic(err)
}
return bytes
}
4 changes: 4 additions & 0 deletions jobs/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ import "time"
3. Name, for pretty logging
4. Run, what do we do?
5. ToJSON, serialize the job into a byte array for sending off to redis
*/
type Job interface {
// how long to wait until performing (just do a sleep)
Expand All @@ -24,4 +26,6 @@ type Job interface {
Name() string
// run the job, using any args on the struct
Run() error
// serialize the job into JSON
ToJSON() []byte
}
Loading

0 comments on commit 2b8b93b

Please sign in to comment.