Skip to content

Commit

Permalink
Fix retries in the event of handler panic (#48)
Browse files Browse the repository at this point in the history
  • Loading branch information
acaloiaro authored Apr 29, 2023
1 parent 71dd54b commit 106ab0f
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 18 deletions.
23 changes: 7 additions & 16 deletions backends/postgres/postgres_backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ type PgBackend struct {
config *config.Config
logger logging.Logger
cron *cron.Cron
mu *sync.Mutex // mutex to protect mutating state on a pgWorker
mu *sync.RWMutex // mutex to protect mutating state on a pgWorker
pool *pgxpool.Pool
futureJobs map[string]time.Time // map of future job IDs to their due time
handlers map[string]handler.Handler // a map of queue names to queue handlers
Expand Down Expand Up @@ -100,7 +100,7 @@ type PgBackend struct {
// postgres://worker:[email protected]:5432/mydb?sslmode=verify-ca&pool_max_conns=10
func Backend(ctx context.Context, opts ...config.Option) (pb types.Backend, err error) {
p := &PgBackend{
mu: &sync.Mutex{},
mu: &sync.RWMutex{},
config: config.New(),
handlers: make(map[string]handler.Handler),
futureJobs: make(map[string]time.Time),
Expand Down Expand Up @@ -443,6 +443,7 @@ func (p *PgBackend) SetLogger(logger logging.Logger) {
p.logger = logger
}

// Shutdown shuts this backend down
func (p *PgBackend) Shutdown(ctx context.Context) {
for queue := range p.handlers {
p.announceJob(ctx, queue, shutdownJobID)
Expand Down Expand Up @@ -551,13 +552,10 @@ func (p *PgBackend) updateJob(ctx context.Context, jobErr error) (err error) {
}

var runAfter time.Time
if job.Retries > 0 && status == internal.JobStatusFailed {
if status == internal.JobStatusFailed {
runAfter = internal.CalculateBackoff(job.Retries)
qstr := "UPDATE neoq_jobs SET ran_at = $1, error = $2, status = $3, retries = $4, run_after = $5 WHERE id = $6"
_, err = tx.Exec(ctx, qstr, time.Now(), errMsg, status, job.Retries, runAfter, job.ID)
} else if job.Retries > 0 && status != internal.JobStatusFailed {
qstr := "UPDATE neoq_jobs SET ran_at = $1, error = $2, status = $3, retries = $4 WHERE id = $5"
_, err = tx.Exec(ctx, qstr, time.Now(), errMsg, status, job.Retries, job.ID)
} else {
qstr := "UPDATE neoq_jobs SET ran_at = $1, error = $2, status = $3 WHERE id = $4"
_, err = tx.Exec(ctx, qstr, time.Now(), errMsg, status, job.ID)
Expand Down Expand Up @@ -629,15 +627,6 @@ func (p *PgBackend) start(ctx context.Context, queue string) (err error) {
return nil
}

// removeFutureJob removes a future job from the in-memory list of jobs that will execute in the future.
//
// The lock must be held for the entire operation: the previous implementation read
// p.futureJobs before acquiring p.mu, which races with the lock-guarded iteration and
// mutation of the same map elsewhere on the backend. The existence check is also
// unnecessary — the built-in delete is a no-op when the key is absent.
func (p *PgBackend) removeFutureJob(jobID string) {
	p.mu.Lock()
	delete(p.futureJobs, jobID)
	p.mu.Unlock()
}

// initFutureJobs is intended to be run once to initialize the list of future jobs that must be monitored for
// execution. it should be run only during system startup.
func (p *PgBackend) initFutureJobs(ctx context.Context, queue string) (err error) {
Expand Down Expand Up @@ -671,17 +660,19 @@ func (p *PgBackend) scheduleFutureJobs(ctx context.Context, queue string) {

for {
// loop over list of future jobs, scheduling goroutines to wait for jobs that are due within the next 30 seconds
p.mu.Lock()
for jobID, runAfter := range p.futureJobs {
timeUntillRunAfter := time.Until(runAfter)
if timeUntillRunAfter <= p.config.FutureJobWindow {
p.removeFutureJob(jobID)
delete(p.futureJobs, jobID)
go func(jid string) {
scheduleCh := time.After(timeUntillRunAfter)
<-scheduleCh
p.announceJob(ctx, queue, jid)
}(jobID)
}
}
p.mu.Unlock()

select {
case <-ticker.C:
Expand Down
27 changes: 25 additions & 2 deletions handler/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,10 @@ import (
"context"
"errors"
"fmt"
"log"
"runtime"
"runtime/debug"
"strings"
"time"
)

Expand Down Expand Up @@ -93,8 +96,28 @@ func Exec(ctx context.Context, handler Handler) (err error) {
var errCh = make(chan error, 1)
var done = make(chan bool)
go func(ctx context.Context) {
defer func() {
if x := recover(); x != nil {
log.Printf("recovering from a panic in the job handler:\n%s", string(debug.Stack()))
_, file, line, ok := runtime.Caller(1) // skip the first frame (panic itself)
if ok && strings.Contains(file, "runtime/") {
// The panic came from the runtime, most likely due to incorrect
// map/slice usage. The parent frame should have the real trigger.
_, file, line, ok = runtime.Caller(2) //nolint: gomnd
}

// Include the file and line number info in the error, if runtime.Caller returned ok.
if ok {
errCh <- fmt.Errorf("panic [%s:%d]: %v", file, line, x) // nolint: goerr113
} else {
errCh <- fmt.Errorf("panic: %v", x) // nolint: goerr113
}
}

done <- true
}()

errCh <- handler.Handle(ctx)
done <- true
}(ctx)

select {
Expand All @@ -115,5 +138,5 @@ func Exec(ctx context.Context, handler Handler) (err error) {
}
}

return
return err
}

0 comments on commit 106ab0f

Please sign in to comment.