Skip to content

Commit

Permalink
Allow passing Kubernetes resource requirements to jobs
Browse files Browse the repository at this point in the history
  • Loading branch information
streamer45 committed Dec 22, 2023
1 parent d6080a0 commit 83178e7
Show file tree
Hide file tree
Showing 5 changed files with 63 additions and 17 deletions.
2 changes: 1 addition & 1 deletion config/config.sample.toml
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ failed_jobs_retention_time = "30d"
# Kubernetes API optionally supports definining resource limits and requests on
# a per job type basis. Example:
#[jobs.kubernetes]
#jobs_resource_requirements = '{"transcribing": {"limits": {"cpu": "2000m"}}, "recording": {"limits": {"cpu": "500"}}}'
#jobs_resource_requirements = '{"transcribing":{"limits":{"cpu":"4000m"},"requests":{"cpu":"2000m"}},"recording":{"limits":{"cpu":"2000m"},"requests":{"cpu":"1000m"}}}'

[logger]
# A boolean controlling whether to log to the console.
Expand Down
2 changes: 2 additions & 0 deletions public/job/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ const (
const (
MinSupportedRecorderVersion = "0.6.0"
MinSupportedTranscriberVersion = "0.1.0"
RecordingJobPrefix = "calls-recorder"
TranscribingJobPrefix = "calls-transcriber"
)

var (
Expand Down
54 changes: 54 additions & 0 deletions service/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,17 @@
package service

import (
"encoding/json"
"os"
"testing"
"time"

"github.com/mattermost/calls-offloader/public/job"
"github.com/mattermost/calls-offloader/service/kubernetes"

corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"

"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -97,4 +104,51 @@ func TestParseFromEnv(t *testing.T) {
require.NoError(t, err)
require.Equal(t, JobAPITypeDocker, cfg.Jobs.APIType)
})

t.Run("kubernetes.JobsResourceRequirements", func(t *testing.T) {
requirements := make(kubernetes.JobsResourceRequirements)

t.Run("empty", func(t *testing.T) {
js, err := json.Marshal(requirements)
require.NoError(t, err)

os.Setenv("JOBS_KUBERNETES_JOBSRESOURCEREQUIREMENTS", string(js))
defer os.Unsetenv("JOBS_KUBERNETES_JOBSRESOURCEREQUIREMENTS")

var cfg Config
err = cfg.ParseFromEnv()
require.NoError(t, err)
require.Equal(t, requirements, cfg.Jobs.Kubernetes.JobsResourceRequirements)
})

t.Run("defined", func(t *testing.T) {
requirements[job.TypeRecording] = corev1.ResourceRequirements{
Limits: corev1.ResourceList{
"cpu": resource.MustParse("1"),
},
Requests: corev1.ResourceList{
"cpu": resource.MustParse("1"),
},
}
requirements[job.TypeTranscribing] = corev1.ResourceRequirements{
Limits: corev1.ResourceList{
"cpu": resource.MustParse("1"),
},
Requests: corev1.ResourceList{
"cpu": resource.MustParse("1"),
},
}

js, err := json.Marshal(requirements)
require.NoError(t, err)

os.Setenv("JOBS_KUBERNETES_JOBSRESOURCEREQUIREMENTS", string(js))
defer os.Unsetenv("JOBS_KUBERNETES_JOBSRESOURCEREQUIREMENTS")

var cfg Config
err = cfg.ParseFromEnv()
require.NoError(t, err)
require.Equal(t, requirements, cfg.Jobs.Kubernetes.JobsResourceRequirements)
})
})
}
9 changes: 2 additions & 7 deletions service/docker/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,6 @@ const (
dockerVolumePath = "/data"
)

const (
recordingJobPrefix = "calls-recorder"
transcribingJobPrefix = "calls-transcriber"
)

var (
dockerStopTimeout = 5 * time.Minute
dockerRetentionJobInterval = time.Minute
Expand Down Expand Up @@ -292,13 +287,13 @@ func (s *JobService) CreateJob(cfg job.Config, onStopCb job.StopCb) (job.Job, er
var jobData recorder.RecorderConfig
jobData.FromMap(cfg.InputData)
jobData.SiteURL = getSiteURLForJob(jobData.SiteURL)
jobPrefix = recordingJobPrefix
jobPrefix = job.RecordingJobPrefix
env = append(env, jobData.ToEnv()...)
case job.TypeTranscribing:
var jobData transcriber.CallTranscriberConfig
jobData.FromMap(cfg.InputData)
jobData.SiteURL = getSiteURLForJob(jobData.SiteURL)
jobPrefix = transcribingJobPrefix
jobPrefix = job.TranscribingJobPrefix
env = append(env, jobData.ToEnv()...)
}

Expand Down
13 changes: 4 additions & 9 deletions service/kubernetes/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,16 +36,11 @@ const (
k8sVolumePath = "/data"
)

const (
recordingJobPrefix = "calls-recorder"
transcribingJobPrefix = "calls-transcriber"
)
// Type alias and custom decoders to support passing JSON from both TOML config and env
// variable.

type JobsResourceRequirements map[job.Type]corev1.ResourceRequirements

// Custom decoders to support passing JSON from both TOML config and env
// variable.

func (r *JobsResourceRequirements) Decode(data string) error {
return yaml.NewYAMLOrJSONDecoder(bytes.NewBuffer([]byte(data)), 0).Decode(r)
}
Expand Down Expand Up @@ -166,7 +161,7 @@ func (s *JobService) CreateJob(cfg job.Config, onStopCb job.StopCb) (job.Job, er
jobCfg.FromMap(cfg.InputData)
jobCfg.SetDefaults()
jobCfg.SiteURL = getSiteURLForJob(jobCfg.SiteURL)
jobPrefix = recordingJobPrefix
jobPrefix = job.RecordingJobPrefix
jobID = jobPrefix + "-job-" + random.NewID()
env = append(env, getEnvFromJobConfig(jobCfg)...)
initContainers = []corev1.Container{
Expand All @@ -191,7 +186,7 @@ func (s *JobService) CreateJob(cfg job.Config, onStopCb job.StopCb) (job.Job, er
jobCfg.FromMap(cfg.InputData)
jobCfg.SetDefaults()
jobCfg.SiteURL = getSiteURLForJob(jobCfg.SiteURL)
jobPrefix = transcribingJobPrefix
jobPrefix = job.TranscribingJobPrefix
jobID = jobPrefix + "-job-" + random.NewID()
env = append(env, getEnvFromJobConfig(jobCfg)...)
}
Expand Down

0 comments on commit 83178e7

Please sign in to comment.