diff --git a/src/plans.js b/src/plans.js index 8489ff8..07f2501 100644 --- a/src/plans.js +++ b/src/plans.js @@ -504,15 +504,19 @@ function insertVersion (schema, version) { function fetchNextJob (schema) { return ({ includeMetadata, priority = true } = {}) => ` - WITH next as ( - SELECT id + WITH next_pool as ( + SELECT id, singleton_key, name FROM ${schema}.job WHERE name = $1 AND state < '${JOB_STATES.active}' AND start_after < now() - ORDER BY ${priority ? 'priority desc, ' : ''}created_on, id + ORDER BY ${priority ? 'priority desc, ' : ''}state desc, created_on, id LIMIT $2 FOR UPDATE SKIP LOCKED + ), + next as ( + SELECT DISTINCT ON (CASE WHEN q.policy = 'stately' THEN np.singleton_key ELSE np.id::TEXT END) np.id + FROM next_pool np JOIN ${schema}.queue q ON np.name = q.name ) UPDATE ${schema}.job j SET state = '${JOB_STATES.active}', diff --git a/test/queueTest.js b/test/queueTest.js index ce1bdfa..5c5f280 100644 --- a/test/queueTest.js +++ b/test/queueTest.js @@ -350,6 +350,47 @@ describe('queues', function () { assert(!blockedSecondActive) }) + it('stately policy prioritizes job in retry state over created with the same singleton key', async function () { + const boss = this.test.boss = await helper.start({ ...this.test.bossConfig, noDefault: true }) + const queue = this.test.bossConfig.schema + + await boss.createQueue(queue, { policy: 'stately' }) + + const jobId1 = await boss.send(queue, null, { retryLimit: 1, singletonKey: 'single' }) + + let [job1] = await boss.fetch(queue) + + await boss.fail(queue, job1.id) + + job1 = await boss.getJobById(queue, jobId1) + + assert.strictEqual(job1.state, 'retry') + + const jobId2 = await boss.send(queue, null, { retryLimit: 1, singletonKey: 'single' }) + const jobId3 = await boss.send(queue, null, { retryLimit: 1, singletonKey: 'single2' }) + + let job2 = await boss.getJobById(queue, jobId2) + let job3 = await boss.getJobById(queue, jobId3) + + assert.strictEqual(job2.state, 'created') + assert.strictEqual(job3.state, 'created') + + const jobs = await boss.fetch(queue, { batchSize: 3 }) + + job1 = await boss.getJobById(queue, jobId1) + job2 = await boss.getJobById(queue, jobId2) + job3 = await boss.getJobById(queue, jobId3) + + const fetchedJobIds = jobs.map(j => j.id) + + assert.strictEqual(jobs.length, 2) + assert(fetchedJobIds.includes(jobId1)) + assert(fetchedJobIds.includes(jobId3)) + assert.strictEqual(job1.state, 'active') + assert.strictEqual(job2.state, 'created') + assert.strictEqual(job3.state, 'active') + }) + it('stately policy fails a job without retry when others are active', async function () { const boss = this.test.boss = await helper.start({ ...this.test.bossConfig, noDefault: true }) const queue = this.test.bossConfig.schema