Skip to content

Commit

Permalink
Add -replacing flag
Browse files Browse the repository at this point in the history
Co-authored-by: tyranron <[email protected]>
  • Loading branch information
quite4work and tyranron committed Oct 8, 2021
1 parent bd3db51 commit f725b6a
Show file tree
Hide file tree
Showing 5 changed files with 106 additions and 19 deletions.
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,8 @@ You can optionally disable this behavior and allow overlapping instances of
your jobs by passing the `-overlapping` flag to Supercronic. Supercronic will
still warn about jobs falling behind, but will run duplicate instances of them.

If you pass `-replacing` flag and it's time for a new job iteration to run,
Supercronic will kill the previous job process if it hasn't finished yet.

## Reload crontab

Expand Down
37 changes: 30 additions & 7 deletions cron/cron.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func startReaderDrain(wg *sync.WaitGroup, readerLogger *logrus.Entry, reader io.
}()
}

func runJob(cronCtx *crontab.Context, command string, jobLogger *logrus.Entry, passthroughLogs bool) error {
func runJob(cronCtx *crontab.Context, command string, jobLogger *logrus.Entry, passthroughLogs bool, nextRun time.Time, replacing bool) error {
jobLogger.Info("starting")

cmd := exec.Command(cronCtx.Shell, "-c", command)
Expand Down Expand Up @@ -101,6 +101,22 @@ func runJob(cronCtx *crontab.Context, command string, jobLogger *logrus.Entry, p
return err
}

if replacing {
ctx, cancel := context.WithDeadline(context.Background(), nextRun)
defer cancel()
go func(pid int) {
// Kill command and its sub-processes once the deadline is exceeded.
<-ctx.Done()
if ctx.Err() == context.DeadlineExceeded {
// Negative number tells to kill the whole process group.
// By convention PGID of process group equals to the PID of the
// group leader, so the command process is the first member of
// the process group and is the group leader.
syscall.Kill(-pid, syscall.SIGKILL)
}
}(cmd.Process.Pid)
}

var wg sync.WaitGroup

if stdout != nil {
Expand All @@ -122,7 +138,7 @@ func runJob(cronCtx *crontab.Context, command string, jobLogger *logrus.Entry, p
return nil
}

func monitorJob(ctx context.Context, job *crontab.Job, t0 time.Time, jobLogger *logrus.Entry, overlapping bool, promMetrics *prometheus_metrics.PrometheusMetrics) {
func monitorJob(ctx context.Context, job *crontab.Job, t0 time.Time, jobLogger *logrus.Entry, overlapping bool, replacing bool, promMetrics *prometheus_metrics.PrometheusMetrics) {
t := t0

for {
Expand All @@ -134,6 +150,9 @@ func monitorJob(ctx context.Context, job *crontab.Job, t0 time.Time, jobLogger *
if overlapping {
m = "overlapping jobs"
}
if replacing {
m = "replacing job"
}

jobLogger.Warnf("%s: job is still running since %s (%s elapsed)", m, t0, t.Sub(t0))

Expand All @@ -149,9 +168,10 @@ func startFunc(
exitCtx context.Context,
logger *logrus.Entry,
overlapping bool,
replacing bool,
expression crontab.Expression,
timezone *time.Location,
fn func(time.Time, *logrus.Entry),
fn func(time.Time, *logrus.Entry, bool),
) {
wg.Add(1)

Expand Down Expand Up @@ -200,7 +220,7 @@ func startFunc(
"iteration": cronIteration,
})

fn(nextRun, jobLogger)
fn(nextRun, jobLogger, replacing)
}

if overlapping {
Expand All @@ -221,10 +241,11 @@ func StartJob(
exitCtx context.Context,
cronLogger *logrus.Entry,
overlapping bool,
replacing bool,
passthroughLogs bool,
promMetrics *prometheus_metrics.PrometheusMetrics,
) {
runThisJob := func(t0 time.Time, jobLogger *logrus.Entry) {
runThisJob := func(t0 time.Time, jobLogger *logrus.Entry, replacing bool) {
promMetrics.CronsCurrentlyRunningGauge.With(jobPromLabels(job)).Inc()

defer func() {
Expand All @@ -234,15 +255,16 @@ func StartJob(
monitorCtx, cancelMonitor := context.WithCancel(context.Background())
defer cancelMonitor()

go monitorJob(monitorCtx, job, t0, jobLogger, overlapping, promMetrics)
go monitorJob(monitorCtx, job, t0, jobLogger, overlapping, replacing, promMetrics)

timer := prometheus.NewTimer(prometheus.ObserverFunc(func(v float64) {
promMetrics.CronsExecutionTimeHistogram.With(jobPromLabels(job)).Observe(v)
}))

defer timer.ObserveDuration()

err := runJob(cronCtx, job.Command, jobLogger, passthroughLogs)
nextRun := job.Expression.Next(t0)
err := runJob(cronCtx, job.Command, jobLogger, passthroughLogs, nextRun, replacing)

promMetrics.CronsExecCounter.With(jobPromLabels(job)).Inc()

Expand All @@ -262,6 +284,7 @@ func StartJob(
exitCtx,
cronLogger,
overlapping,
replacing,
job.Expression,
cronCtx.Timezone,
runThisJob,
Expand Down
78 changes: 67 additions & 11 deletions cron/cron_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ func TestRunJob(t *testing.T) {
label := fmt.Sprintf("RunJob(%q)", tt.command)
logger, channel := newTestLogger()

err := runJob(tt.context, tt.command, logger, false)
err := runJob(tt.context, tt.command, logger, false, time.Now(), false)
if tt.success {
assert.Nil(t, err, label)
} else {
Expand Down Expand Up @@ -198,7 +198,7 @@ func TestStartJobExitsOnRequest(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
cancel()

StartJob(&wg, &basicContext, &job, ctx, logger, false, false, &PROM_METRICS)
StartJob(&wg, &basicContext, &job, ctx, logger, false, false, false, &PROM_METRICS)

wg.Wait()
}
Expand All @@ -218,7 +218,7 @@ func TestStartJobRunsJob(t *testing.T) {

logger, channel := newTestLogger()

StartJob(&wg, &basicContext, &job, ctx, logger, false, false, &PROM_METRICS)
StartJob(&wg, &basicContext, &job, ctx, logger, false, false, false, &PROM_METRICS)

select {
case entry := <-channel:
Expand Down Expand Up @@ -266,6 +266,62 @@ func TestStartJobRunsJob(t *testing.T) {
wg.Wait()
}

func TestStartJobReplacesPreviousJobs(t *testing.T) {
job := crontab.Job{
CrontabLine: crontab.CrontabLine{
Expression: &testExpression{2 * time.Second},
Schedule: "always!",
Command: "sleep 100",
},
Position: 1,
}

var wg sync.WaitGroup
ctx, cancel := context.WithCancel(context.Background())

logger, channel := newTestLogger()

StartJob(&wg, &basicContext, &job, ctx, logger, false, true, false, &PROM_METRICS)

select {
case entry := <-channel:
assert.Regexp(t, regexp.MustCompile("job will run next"), entry.Message)
case <-time.After(time.Second):
t.Fatalf("timed out waiting for schedule")
}

select {
case entry := <-channel:
assert.Regexp(t, regexp.MustCompile("starting"), entry.Message)
case <-time.After(3 * time.Second):
t.Fatalf("timed out waiting for start")
}

select {
case entry := <-channel:
assert.Regexp(t, regexp.MustCompile("replacing job"), entry.Message)
case <-time.After(3 * time.Second):
t.Fatalf("timed out waiting for job replace warning")
}

select {
case entry := <-channel:
assert.Regexp(t, regexp.MustCompile("killed"), entry.Message)
case <-time.After(time.Second):
t.Fatalf("timed out waiting for job kill")
}

select {
case entry := <-channel:
assert.Regexp(t, regexp.MustCompile("job will run next"), entry.Message)
case <-time.After(time.Second):
t.Fatalf("timed out waiting for schedule of the second job iteration")
}

cancel()
wg.Wait()
}

func TestStartFuncWaitsForCompletion(t *testing.T) {
// We use startFunc to start a function, wait for it to start, then
// tell the whole thing to exit, and verify that it waits for the
Expand All @@ -281,12 +337,12 @@ func TestStartFuncWaitsForCompletion(t *testing.T) {
ctxStep1, step1Done := context.WithCancel(context.Background())
ctxStep2, step2Done := context.WithCancel(context.Background())

testFn := func(t0 time.Time, jobLogger *logrus.Entry) {
testFn := func(t0 time.Time, jobLogger *logrus.Entry, replacing bool) {
step1Done()
<-ctxStep2.Done()
}

startFunc(&wg, ctxStartFunc, logger, false, expr, time.Local, testFn)
startFunc(&wg, ctxStartFunc, logger, false, false, expr, time.Local, testFn)
go func() {
wg.Wait()
allDone()
Expand Down Expand Up @@ -329,12 +385,12 @@ func TestStartFuncDoesNotRunOverlappingJobs(t *testing.T) {
ctxStartFunc, cancelStartFunc := context.WithCancel(context.Background())
ctxAllDone, allDone := context.WithCancel(context.Background())

testFn := func(t0 time.Time, jobLogger *logrus.Entry) {
testFn := func(t0 time.Time, jobLogger *logrus.Entry, replacing bool) {
testChan <- nil
<-ctxAllDone.Done()
}

startFunc(&wg, ctxStartFunc, logger, false, expr, time.Local, testFn)
startFunc(&wg, ctxStartFunc, logger, false, false, expr, time.Local, testFn)

select {
case <-testChan:
Expand Down Expand Up @@ -368,12 +424,12 @@ func TestStartFuncRunsOverlappingJobs(t *testing.T) {
ctxStartFunc, cancelStartFunc := context.WithCancel(context.Background())
ctxAllDone, allDone := context.WithCancel(context.Background())

testFn := func(t0 time.Time, jobLogger *logrus.Entry) {
testFn := func(t0 time.Time, jobLogger *logrus.Entry, replacing bool) {
testChan <- nil
<-ctxAllDone.Done()
}

startFunc(&wg, ctxStartFunc, logger, true, expr, time.Local, testFn)
startFunc(&wg, ctxStartFunc, logger, true, false, expr, time.Local, testFn)

for i := 0; i < 5; i++ {
select {
Expand Down Expand Up @@ -406,7 +462,7 @@ func TestStartFuncUsesTz(t *testing.T) {

it := 0

testFn := func(t0 time.Time, jobLogger *logrus.Entry) {
testFn := func(t0 time.Time, jobLogger *logrus.Entry, replacing bool) {
testChan <- t0.Location()
it += 1

Expand All @@ -422,7 +478,7 @@ func TestStartFuncUsesTz(t *testing.T) {
}
}

startFunc(&wg, ctxStartFunc, logger, false, expr, loc, testFn)
startFunc(&wg, ctxStartFunc, logger, false, false, expr, loc, testFn)

for i := 0; i < 5; i++ {
select {
Expand Down
5 changes: 5 additions & 0 deletions integration/test.bats
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,11 @@ wait_for() {
[[ "$n" -ge 4 ]]
}

@test "it runs replacing jobs" {
n="$(SUPERCRONIC_ARGS="-replacing" run_supercronic "${BATS_TEST_DIRNAME}/timeout.crontab" 5s | grep -iE "killed" | wc -l)"
[[ "$n" -ge 3 ]]
}

@test "it supports debug logging " {
SUPERCRONIC_ARGS="-debug" run_supercronic "${BATS_TEST_DIRNAME}/hello.crontab" | grep -iE "debug"
}
Expand Down
3 changes: 2 additions & 1 deletion main.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ func main() {
sentry := flag.String("sentry-dsn", "", "enable Sentry error logging, using provided DSN")
sentryAlias := flag.String("sentryDsn", "", "alias for sentry-dsn")
overlapping := flag.Bool("overlapping", false, "enable tasks overlapping")
replacing := flag.Bool("replacing", false, "enable tasks replacing")
flag.Parse()

var sentryDsn string
Expand Down Expand Up @@ -147,7 +148,7 @@ func main() {
"job.position": job.Position,
})

cron.StartJob(&wg, tab.Context, job, exitCtx, cronLogger, *overlapping, *passthroughLogs, &promMetrics)
cron.StartJob(&wg, tab.Context, job, exitCtx, cronLogger, *overlapping, *replacing, *passthroughLogs, &promMetrics)
}

termChan := make(chan os.Signal, 1)
Expand Down

0 comments on commit f725b6a

Please sign in to comment.