Skip to content

Commit

Permalink
Make overriding an optional on enqueue
Browse files Browse the repository at this point in the history
  • Loading branch information
pconstantinou committed Dec 22, 2023
1 parent 2344b13 commit d5bc4d1
Show file tree
Hide file tree
Showing 4 changed files with 21 additions and 9 deletions.
9 changes: 7 additions & 2 deletions backends/memory/memory_backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,12 @@ func Backend(_ context.Context, opts ...neoq.ConfigOption) (backend neoq.Neoq, e
}

// Enqueue queues jobs to be executed asynchronously
func (m *MemBackend) Enqueue(ctx context.Context, job *jobs.Job) (jobID string, err error) {
func (m *MemBackend) Enqueue(ctx context.Context, job *jobs.Job, jobOptions ...neoq.JobOption) (jobID string, err error) {
options := neoq.JobOptions{}
for _, opt := range jobOptions {
opt(&options)
}

var queueChan chan *jobs.Job
var qc any
var ok bool
Expand Down Expand Up @@ -102,7 +107,7 @@ func (m *MemBackend) Enqueue(ctx context.Context, job *jobs.Job) (jobID string,
}

// if the job fingerprint is already known, don't queue the job
if _, found := m.fingerprints.Load(job.Fingerprint); found && !job.Override {
if _, found := m.fingerprints.Load(job.Fingerprint); found && !options.Override {
return jobs.DuplicateJobID, nil
} else {
m.fingerprints.Store(job.Fingerprint, job)
Expand Down
13 changes: 8 additions & 5 deletions backends/postgres/postgres_backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -366,7 +366,11 @@ func (p *PgBackend) initializeDB() (err error) {
}

// Enqueue adds jobs to the specified queue
func (p *PgBackend) Enqueue(ctx context.Context, job *jobs.Job) (jobID string, err error) {
func (p *PgBackend) Enqueue(ctx context.Context, job *jobs.Job, jobOptions ...neoq.JobOption) (jobID string, err error) {
options := neoq.JobOptions{}
for _, opt := range jobOptions {
opt(&options)
}
if job.Queue == "" {
err = jobs.ErrNoQueueSpecified
return
Expand All @@ -392,7 +396,7 @@ func (p *PgBackend) Enqueue(ctx context.Context, job *jobs.Job) (jobID string, e
// Rollback is safe to call even if the tx is already closed, so if
// the tx commits successfully, this is a no-op
defer func(ctx context.Context) { _ = tx.Rollback(ctx) }(ctx) // rollback has no effect if the transaction has been committed
jobID, err = p.enqueueJob(ctx, tx, job)
jobID, err = p.enqueueJob(ctx, tx, job, options)
if err != nil {
var pgErr *pgconn.PgError
if errors.As(err, &pgErr) {
Expand Down Expand Up @@ -532,17 +536,16 @@ func (p *PgBackend) Shutdown(ctx context.Context) {
//
// Jobs that are not already fingerprinted are fingerprinted before being added
// Duplicate jobs are not added to the queue. Any two unprocessed jobs with the same fingerprint are duplicates
func (p *PgBackend) enqueueJob(ctx context.Context, tx pgx.Tx, j *jobs.Job) (jobID string, err error) {
func (p *PgBackend) enqueueJob(ctx context.Context, tx pgx.Tx, j *jobs.Job, options neoq.JobOptions) (jobID string, err error) {
err = jobs.FingerprintJob(j)
if err != nil {
return
}
p.logger.Debug("adding job to the queue", slog.String("queue", j.Queue))
if !j.Override {
if !options.Override {
err = tx.QueryRow(ctx, `INSERT INTO neoq_jobs(queue, fingerprint, payload, run_after, deadline, max_retries)
VALUES ($1, $2, $3, $4, $5, $6) RETURNING id`,
j.Queue, j.Fingerprint, j.Payload, j.RunAfter, j.Deadline, j.MaxRetries).Scan(&jobID)

} else {
err = tx.QueryRow(ctx, `INSERT INTO neoq_jobs(queue, fingerprint, payload, run_after, deadline, max_retries)
VALUES ($1, $2, $3, $4, $5, $6) ON CONFLICT (fingerprint) SET
Expand Down
7 changes: 6 additions & 1 deletion backends/redis/redis_backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,12 @@ func WithShutdownTimeout(timeout time.Duration) neoq.ConfigOption {
}

// Enqueue queues jobs to be executed asynchronously
func (b *RedisBackend) Enqueue(ctx context.Context, job *jobs.Job) (jobID string, err error) {
func (b *RedisBackend) Enqueue(ctx context.Context, job *jobs.Job, jobOptions ...neoq.JobOption) (jobID string, err error) {
options := neoq.JobOptions{}
for _, opt := range jobOptions {
opt(&options)
}

if job.Queue == "" {
err = jobs.ErrNoQueueSpecified
return
Expand Down
1 change: 0 additions & 1 deletion jobs/jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ type Job struct {
Retries int `db:"retries"` // The number of times the job has retried
MaxRetries *int `db:"max_retries"` // The maximum number of times the job can retry
CreatedAt time.Time `db:"created_at"` // The time the job was created
Override bool // If a matching Fingerprint should replace the existing one resetting the retrues and created at
}

// FingerprintJob fingerprints jobs as an md5 hash of its queue combined with its JSON-serialized payload
Expand Down

0 comments on commit d5bc4d1

Please sign in to comment.