Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

adds skipStartAfterNow option for fetch #547

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 8 additions & 4 deletions docs/api/jobs.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ Creates a new job and returns the job id.
* **priority**, int

optional priority. Higher numbers have, um, higher priority

* **id**, uuid

optional id. If not set, a uuid will automatically created
Expand Down Expand Up @@ -84,7 +84,7 @@ Available in constructor as a default, or overridden in send.
**Connection options**

* **db**, object

Instead of using pg-boss's default adapter, you can use your own, as long as it implements the following interface (the same as the pg module).

```ts
Expand Down Expand Up @@ -234,6 +234,10 @@ Returns an array of jobs from a queue

If `true`, all job metadata will be returned on the job object.

* `skipStartAfterNow`, bool, *default: false*

If `true`, jobs with a `startAfter` timestamp in the future will be fetched. Useful for fetching jobs immediately without waiting for a retry delay.

```js
interface JobWithMetadata<T = object> {
id: string;
Expand Down Expand Up @@ -284,7 +288,7 @@ await Promise.allSettled(jobs.map(async job => {

Deletes a job by id.

> Job deletion is offered if desired for a "fetch then delete" workflow similar to SQS. This is not the default behavior for workers so "everything just works" by default, including job throttling and debouncing, which requires jobs to exist to enforce a unique constraint. For example, if you are debouncing a queue to "only allow 1 job per hour", deleting jobs after processing would re-open that time slot, breaking your throttling policy.
> Job deletion is offered if desired for a "fetch then delete" workflow similar to SQS. This is not the default behavior for workers so "everything just works" by default, including job throttling and debouncing, which requires jobs to exist to enforce a unique constraint. For example, if you are debouncing a queue to "only allow 1 job per hour", deleting jobs after processing would re-open that time slot, breaking your throttling policy.

### `deleteJob(name, [ids], options)`

Expand All @@ -298,7 +302,7 @@ Cancels a pending or active job.

Cancels a set of pending or active jobs.

When passing an array of ids, it's possible that the operation may partially succeed based on the state of individual jobs requested. Consider this a best-effort attempt.
When passing an array of ids, it's possible that the operation may partially succeed based on the state of individual jobs requested. Consider this a best-effort attempt.

### `resume(name, id, options)`

Expand Down
1 change: 1 addition & 0 deletions src/attorney.js
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ function checkFetchArgs (name, options) {
assert(!('batchSize' in options) || (Number.isInteger(options.batchSize) && options.batchSize >= 1), 'batchSize must be an integer > 0')
assert(!('includeMetadata' in options) || typeof options.includeMetadata === 'boolean', 'includeMetadata must be a boolean')
assert(!('priority' in options) || typeof options.priority === 'boolean', 'priority must be a boolean')
assert(!('skipStartAfterNow' in options) || typeof options.skipStartAfterNow === 'boolean', 'skipStartAfterNow must be a boolean')

options.batchSize = options.batchSize || 1
}
Expand Down
26 changes: 13 additions & 13 deletions src/plans.js
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ function createTableQueue (schema) {
partition_name text,
created_on timestamp with time zone not null default now(),
updated_on timestamp with time zone not null default now(),
PRIMARY KEY (name)
PRIMARY KEY (name)
)
`
}
Expand Down Expand Up @@ -208,7 +208,7 @@ const allJobColumns = `${baseJobColumns},
retry_count as "retryCount",
retry_delay as "retryDelay",
retry_backoff as "retryBackoff",
start_after as "startAfter",
start_after as "startAfter",
started_on as "startedOn",
singleton_key as "singletonKey",
singleton_on as "singletonOn",
Expand Down Expand Up @@ -263,7 +263,7 @@ function createQueueFunction (schema) {
END IF;

EXECUTE format('CREATE TABLE ${schema}.%I (LIKE ${schema}.job INCLUDING DEFAULTS)', table_name);

EXECUTE format('${formatPartitionCommand(createPrimaryKeyJob(schema))}', table_name);
EXECUTE format('${formatPartitionCommand(createQueueForeignKeyJob(schema))}', table_name);
EXECUTE format('${formatPartitionCommand(createQueueForeignKeyJobDeadLetter(schema))}', table_name);
Expand Down Expand Up @@ -292,7 +292,7 @@ function deleteQueueFunction (schema) {
$$
DECLARE
table_name varchar;
BEGIN
BEGIN
WITH deleted as (
DELETE FROM ${schema}.queue
WHERE name = queue_name
Expand Down Expand Up @@ -400,7 +400,7 @@ function updateQueue (schema) {

function getQueues (schema) {
return `
SELECT
SELECT
name,
policy,
retry_limit as "retryLimit",
Expand Down Expand Up @@ -503,13 +503,13 @@ function insertVersion (schema, version) {
}

function fetchNextJob (schema) {
return ({ includeMetadata, priority = true } = {}) => `
return ({ includeMetadata, priority = true, skipStartAfterNow = false } = {}) => `
WITH next as (
SELECT id
FROM ${schema}.job
WHERE name = $1
AND state < '${JOB_STATES.active}'
AND start_after < now()
${skipStartAfterNow ? '' : 'AND start_after < now()'}
ORDER BY ${priority ? 'priority desc, ' : ''}created_on, id
LIMIT $2
FOR UPDATE SKIP LOCKED
Expand All @@ -520,7 +520,7 @@ function fetchNextJob (schema) {
retry_count = CASE WHEN started_on IS NOT NULL THEN retry_count + 1 ELSE retry_count END
FROM next
WHERE name = $1 AND j.id = next.id
RETURNING j.${includeMetadata ? allJobColumns : baseJobColumns}
RETURNING j.${includeMetadata ? allJobColumns : baseJobColumns}
`
}

Expand Down Expand Up @@ -615,7 +615,7 @@ function failJobs (schema, where, output) {
END as completed_on,
keep_until,
dead_letter,
policy,
policy,
${output}
FROM deleted_jobs
ON CONFLICT DO NOTHING
Expand Down Expand Up @@ -726,7 +726,7 @@ function deleteJobs (schema) {
with results as (
DELETE FROM ${schema}.job
WHERE name = $1
AND id IN (SELECT UNNEST($2::uuid[]))
AND id IN (SELECT UNNEST($2::uuid[]))
RETURNING 1
)
SELECT COUNT(*) from results
Expand Down Expand Up @@ -813,7 +813,7 @@ function insertJob (schema) {
function insertJobs (schema) {
return `
WITH defaults as (
SELECT
SELECT
$2 as expire_in,
$3 as keep_until,
$4::int as retry_limit,
Expand Down Expand Up @@ -863,7 +863,7 @@ function insertJobs (schema) {
WHEN COALESCE("retryBackoff", q.retry_backoff, defaults.retry_backoff, false)
THEN GREATEST(COALESCE("retryDelay", q.retry_delay, defaults.retry_delay), 1)
ELSE COALESCE("retryDelay", q.retry_delay, defaults.retry_delay, 0)
END as retry_delay,
END as retry_delay,
COALESCE("retryBackoff", q.retry_backoff, defaults.retry_backoff, false) as retry_backoff,
q.policy
FROM (
Expand All @@ -886,7 +886,7 @@ function insertJobs (schema) {
"expireInSeconds" integer,
"keepUntil" timestamp with time zone,
"deadLetter" text
)
)
) j
JOIN ${schema}.queue q ON j.name = q.name,
defaults
Expand Down
22 changes: 22 additions & 0 deletions test/fetchTest.js
Original file line number Diff line number Diff line change
Expand Up @@ -124,4 +124,26 @@ describe('fetch', function () {
assert(job.startedOn === undefined)
assert.strictEqual(calledCounter, 2)
})

it('should allow fetching jobs that have a start_after in the future', async function () {
const boss = this.test.boss = await helper.start(this.test.bossConfig)
const queue = this.test.bossConfig.schema

await boss.send(queue, { startAfter: new Date(Date.now() + 1000) })
const db = await helper.getDb()
const sqlStatements = []
const options = {
db: {
async executeSql (sql, values) {
sqlStatements.push(sql)
return db.pool.query(sql, values)
}
}
}

const jobs = await boss.fetch(queue, { ...options, skipStartAfterNow: true })
assert(jobs.length === 1)
assert(sqlStatements.length === 1)
assert(!sqlStatements[0].includes('start_after < now()'))
})
})