-
Notifications
You must be signed in to change notification settings - Fork 0
/
test-bull-worker-reconnect.js
103 lines (86 loc) · 2.18 KB
/
test-bull-worker-reconnect.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
const { Queue, Worker } = require('bullmq');
const Redis = require('ioredis');
const queueName = 'test';
async function start() {
const queue = new Queue(queueName, {
connection: {
host: 'localhost',
port: 6379,
// a warning is thrown on redis startup if these aren't added
enableReadyCheck: false,
enableOfflineQueue: false,
maxRetriesPerRequest: null,
},
});
queue.on('error', (error) => {
console.error(error);
});
setInterval(() => {
queue
.getWorkers()
.then((workers) => {
console.warn(`Number of workers: ${workers.length}`);
})
.catch((error) => {
console.error(error);
});
queue
.getJobCounts()
.then((numberOfJobs) => {
console.warn(`Number of jobs: ${JSON.stringify(numberOfJobs)}`);
})
.catch((error) => {
console.error(error);
});
}, 10_000);
const job = await queue.add('__default__', null, {
jobId: queueName + '-cron-worker-job',
repeat: {
every: 3000, // every 3 seconds
},
data: {
foo: 'bar',
},
});
const processFn = async (job) => {
console.log(`Processing job ${job.id} with data ${job.data}`);
console.log(`-> ${job.id}`);
await new Promise((res) => setTimeout(res, 1000));
console.log(`\t<- ${job.id}`);
};
const workerConnection = new Redis({
host: 'localhost',
port: 6379,
// a warning is thrown on redis startup if these aren't added
enableReadyCheck: false,
maxRetriesPerRequest: null,
enableOfflineQueue: true,
});
const worker = new Worker(queueName, processFn, {
connection: workerConnection,
blockingConnection: true,
});
workerConnection.on('error', (err) => {
console.error(err);
});
workerConnection.on('connect', () => {
console.warn('Worker connected to redis again ...');
console.warn(`Paused: ${worker.isPaused()}, Running: ${worker.isRunning()}`);
});
worker.on('error', (err) => {
console.error(err);
});
worker.on('closed', () => {
console.warn('Worker closed');
});
worker.on('ready', () => {
console.warn('Worker is ready!');
});
worker.on('completed', (job) => {
console.log(`Job ${job.id} completed`);
});
worker.on('failed', (job, err) => {
console.error(`Job ${job.id} failed with ${err.message}`);
});
}
start();