diff --git a/backends/postgres/postgres_backend.go b/backends/postgres/postgres_backend.go index e153ca3..cb27911 100644 --- a/backends/postgres/postgres_backend.go +++ b/backends/postgres/postgres_backend.go @@ -67,7 +67,7 @@ type PgBackend struct { config *config.Config logger logging.Logger cron *cron.Cron - mu *sync.Mutex // mutex to protect mutating state on a pgWorker + mu *sync.RWMutex // mutex to protect mutating state on a pgWorker pool *pgxpool.Pool futureJobs map[string]time.Time // map of future job IDs to their due time handlers map[string]handler.Handler // a map of queue names to queue handlers @@ -100,7 +100,7 @@ type PgBackend struct { // postgres://worker:secret@workerdb.example.com:5432/mydb?sslmode=verify-ca&pool_max_conns=10 func Backend(ctx context.Context, opts ...config.Option) (pb types.Backend, err error) { p := &PgBackend{ - mu: &sync.Mutex{}, + mu: &sync.RWMutex{}, config: config.New(), handlers: make(map[string]handler.Handler), futureJobs: make(map[string]time.Time), @@ -443,6 +443,7 @@ func (p *PgBackend) SetLogger(logger logging.Logger) { p.logger = logger } +// Shutdown shuts this backend down func (p *PgBackend) Shutdown(ctx context.Context) { for queue := range p.handlers { p.announceJob(ctx, queue, shutdownJobID) @@ -551,13 +552,10 @@ func (p *PgBackend) updateJob(ctx context.Context, jobErr error) (err error) { } var runAfter time.Time - if job.Retries > 0 && status == internal.JobStatusFailed { + if status == internal.JobStatusFailed { runAfter = internal.CalculateBackoff(job.Retries) qstr := "UPDATE neoq_jobs SET ran_at = $1, error = $2, status = $3, retries = $4, run_after = $5 WHERE id = $6" _, err = tx.Exec(ctx, qstr, time.Now(), errMsg, status, job.Retries, runAfter, job.ID) - } else if job.Retries > 0 && status != internal.JobStatusFailed { - qstr := "UPDATE neoq_jobs SET ran_at = $1, error = $2, status = $3, retries = $4 WHERE id = $5" - _, err = tx.Exec(ctx, qstr, time.Now(), errMsg, status, job.Retries, job.ID) } else { qstr := "UPDATE neoq_jobs SET ran_at = $1, error = $2, status = $3 WHERE id = $4" _, err = tx.Exec(ctx, qstr, time.Now(), errMsg, status, job.ID) @@ -629,15 +627,6 @@ func (p *PgBackend) start(ctx context.Context, queue string) (err error) { return nil } -// removeFutureJob removes a future job from the in-memory list of jobs that will execute in the future -func (p *PgBackend) removeFutureJob(jobID string) { - if _, ok := p.futureJobs[jobID]; ok { - p.mu.Lock() - delete(p.futureJobs, jobID) - p.mu.Unlock() - } -} - // initFutureJobs is intended to be run once to initialize the list of future jobs that must be monitored for // execution. it should be run only during system startup. func (p *PgBackend) initFutureJobs(ctx context.Context, queue string) (err error) { @@ -671,10 +660,11 @@ func (p *PgBackend) scheduleFutureJobs(ctx context.Context, queue string) { for { // loop over list of future jobs, scheduling goroutines to wait for jobs that are due within the next 30 seconds + p.mu.Lock() for jobID, runAfter := range p.futureJobs { timeUntillRunAfter := time.Until(runAfter) if timeUntillRunAfter <= p.config.FutureJobWindow { - p.removeFutureJob(jobID) + delete(p.futureJobs, jobID) go func(jid string) { scheduleCh := time.After(timeUntillRunAfter) <-scheduleCh @@ -682,6 +672,7 @@ func (p *PgBackend) scheduleFutureJobs(ctx context.Context, queue string) { }(jobID) } } + p.mu.Unlock() select { case <-ticker.C: diff --git a/handler/handler.go b/handler/handler.go index a67eb95..d8c3bda 100644 --- a/handler/handler.go +++ b/handler/handler.go @@ -4,7 +4,10 @@ import ( "context" "errors" "fmt" + "log" "runtime" + "runtime/debug" + "strings" "time" ) @@ -93,8 +96,28 @@ func Exec(ctx context.Context, handler Handler) (err error) { var errCh = make(chan error, 1) var done = make(chan bool) go func(ctx context.Context) { + defer func() { + if x := recover(); x != nil { + log.Printf("recovering from a panic in the job handler:\n%s", string(debug.Stack())) + _, file, line, ok := runtime.Caller(1) // skip the first frame (panic itself) + if ok && strings.Contains(file, "runtime/") { + // The panic came from the runtime, most likely due to incorrect + // map/slice usage. The parent frame should have the real trigger. + _, file, line, ok = runtime.Caller(2) //nolint: gomnd + } + + // Include the file and line number info in the error, if runtime.Caller returned ok. + if ok { + errCh <- fmt.Errorf("panic [%s:%d]: %v", file, line, x) // nolint: goerr113 + } else { + errCh <- fmt.Errorf("panic: %v", x) // nolint: goerr113 + } + } + + done <- true + }() + errCh <- handler.Handle(ctx) - done <- true }(ctx) select { @@ -115,5 +138,5 @@ func Exec(ctx context.Context, handler Handler) (err error) { } } - return + return err }