Skip to content

Commit

Permalink
feat: make pendings job processing resilient to connection loss
Browse files Browse the repository at this point in the history
Instead of passing a single connection into the pending job processor,
use the connection pool to pull a new connection on every loop, releasing
it after every use.
  • Loading branch information
acaloiaro committed Jan 26, 2025
1 parent 17c86fa commit d220a14
Showing 1 changed file with 14 additions and 9 deletions.
23 changes: 14 additions & 9 deletions backends/postgres/postgres_backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,11 +191,7 @@ func Backend(ctx context.Context, opts ...neoq.ConfigOption) (pb neoq.Neoq, err
go p.listenerManager(ctx)

// monitor queues for pending jobs, so neoq is resilient to LISTEN disconnects and reconnections
pendingJobsConn, err := p.pool.Acquire(ctx)
if err != nil {
return nil, fmt.Errorf("unable to get a database connection: %w", err)
}
p.processPendingJobs(ctx, pendingJobsConn)
p.processPendingJobs(ctx)

p.listenConnDown <- true

Expand Down Expand Up @@ -831,18 +827,27 @@ func (p *PgBackend) announceJob(ctx context.Context, queue, jobID string) {
//
// Past due jobs are fetched on the interval [neoq.DefaultPendingJobFetchInterval]
// nolint: cyclop
func (p *PgBackend) processPendingJobs(ctx context.Context, conn *pgxpool.Conn) {
func (p *PgBackend) processPendingJobs(ctx context.Context) {
go func(ctx context.Context) {
defer conn.Release()
var err error
var conn *pgxpool.Conn
var pendingJobs []*jobs.Job
ticker := time.NewTicker(neoq.DefaultPendingJobFetchInterval)

// check for pending jobs on an interval until the context is canceled
for {
pendingJobs, err := p.getPendingJobs(ctx, conn)
conn, err = p.acquire(ctx)
if err != nil {
p.logger.Error("[pending_jobs] unable to get database connection", slog.Any("error", err))
<-ticker.C
continue
}

pendingJobs, err = p.getPendingJobs(ctx, conn)
conn.Release()
if errors.Is(err, context.Canceled) {
return
}

if err != nil && !errors.Is(err, pgx.ErrNoRows) {
p.logger.Error(
"failed to fetch pending jobs",
Expand Down

0 comments on commit d220a14

Please sign in to comment.