Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Prioritize retry state over created for same singleton key in stately fetch #536

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 7 additions & 3 deletions src/plans.js
Original file line number Diff line number Diff line change
Expand Up @@ -504,15 +504,19 @@ function insertVersion (schema, version) {

function fetchNextJob (schema) {
return ({ includeMetadata, priority = true } = {}) => `
WITH next as (
SELECT id
WITH next_pool as (
SELECT id, singleton_key, name
FROM ${schema}.job
WHERE name = $1
AND state < '${JOB_STATES.active}'
AND start_after < now()
ORDER BY ${priority ? 'priority desc, ' : ''}created_on, id
ORDER BY ${priority ? 'priority desc, ' : ''}state desc, created_on, id
LIMIT $2
FOR UPDATE SKIP LOCKED
),
next as (
SELECT DISTINCT ON (CASE WHEN q.policy = 'stately' THEN np.singleton_key ELSE np.id::TEXT END) np.id
FROM next_pool np JOIN ${schema}.queue q ON np.name = q.name
)
UPDATE ${schema}.job j SET
state = '${JOB_STATES.active}',
Expand Down
41 changes: 41 additions & 0 deletions test/queueTest.js
Original file line number Diff line number Diff line change
Expand Up @@ -350,6 +350,47 @@ describe('queues', function () {
assert(!blockedSecondActive)
})

it('stately policy prioritizes job in retry state over created with the same singleton key', async function () {
const boss = this.test.boss = await helper.start({ ...this.test.bossConfig, noDefault: true })
const queue = this.test.bossConfig.schema

await boss.createQueue(queue, { policy: 'stately' })

const jobId1 = await boss.send(queue, null, { retryLimit: 1, singletonKey: 'single' })

let [job1] = await boss.fetch(queue)

await boss.fail(queue, job1.id)

job1 = await boss.getJobById(queue, jobId1)

assert.strictEqual(job1.state, 'retry')

const jobId2 = await boss.send(queue, null, { retryLimit: 1, singletonKey: 'single' })
const jobId3 = await boss.send(queue, null, { retryLimit: 1, singletonKey: 'single2' })

let job2 = await boss.getJobById(queue, jobId2)
let job3 = await boss.getJobById(queue, jobId3)

assert.strictEqual(job2.state, 'created')
assert.strictEqual(job3.state, 'created')

const jobs = await boss.fetch(queue, { batchSize: 3 })

job1 = await boss.getJobById(queue, jobId1)
job2 = await boss.getJobById(queue, jobId2)
job3 = await boss.getJobById(queue, jobId3)

const fetchedJobIds = jobs.map(j => j.id)

assert.strictEqual(jobs.length, 2)
assert(fetchedJobIds.includes(jobId1))
assert(fetchedJobIds.includes(jobId3))
assert.strictEqual(job1.state, 'active')
assert.strictEqual(job2.state, 'created')
assert.strictEqual(job3.state, 'active')
})

it('stately policy fails a job without retry when others are active', async function () {
const boss = this.test.boss = await helper.start({ ...this.test.bossConfig, noDefault: true })
const queue = this.test.bossConfig.schema
Expand Down