-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathindex.test.ts
162 lines (138 loc) · 4.3 KB
/
index.test.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
import { fail } from 'assert';
import SQS from 'aws-sdk/clients/sqs';
import { AWSError } from 'aws-sdk/lib/error';
import { expect } from 'chai';
import crypto from 'crypto';
import { SQSReader } from '../src';
const { AWS_SQS_QUEUE_URL } = process.env;
if (!AWS_SQS_QUEUE_URL) {
throw new Error('AWS_SQS_QUEUE_URL is not configured');
}
const sqs = new SQS({ apiVersion: '2012-11-05' });
function makeBarrier(count = 2) {
let calls = 0;
let resolve: (value: unknown) => void;
const promise = new Promise((r) => (resolve = r));
return function barrier() {
if (++calls === count) {
resolve(null);
}
return promise;
};
}
describe('SQSReader', function () {
it('receives a message', async function () {
const sentBody = crypto.randomBytes(64).toString('base64');
const barrier = makeBarrier();
let receivedId;
let receivedBody;
const queueReader = new SQSReader(AWS_SQS_QUEUE_URL, async (message) => {
receivedId = message.MessageId;
receivedBody = message.Body;
if (receivedBody === sentBody) {
await barrier();
} else {
// ignore unexpected messages in case there is pending junk in the queue
console.log(`Received unexpected message ${receivedId}: ${receivedBody}`);
}
});
queueReader.start();
const sendResult = await sqs
.sendMessage({
QueueUrl: AWS_SQS_QUEUE_URL,
MessageBody: sentBody,
})
.promise();
const sentId = sendResult.MessageId;
await barrier();
queueReader.stop();
await queueReader.join();
expect(receivedId).to.equal(sentId);
expect(receivedBody).to.equal(sentBody);
});
it('sleeps when idle', async function () {
// wait for two sleep cycles to cover next delay calculation
const barrier = makeBarrier(3);
const logger = Object.assign(Object.create(console), {
verbose(message: string): void {
console.log(message);
if (/sleep/i.test(message)) {
barrier();
}
},
});
const queueReader = new SQSReader(
AWS_SQS_QUEUE_URL,
async (message) => {
// ignore unexpected messages in case there is pending junk in the queue
console.log(`Received unexpected message ${message.MessageId}: ${message.Body}`);
},
{ waitTimeSeconds: 0, initialIdleDelaySeconds: 0, logger }
);
queueReader.start();
// test redundant start
queueReader.start();
await barrier();
queueReader.stop();
await queueReader.join();
});
it('resumes when idle', async function () {
const sleepBarrier = makeBarrier(2);
const logger = Object.assign(Object.create(console), {
verbose(message: string): void {
console.log(message);
if (/sleep/i.test(message)) {
sleepBarrier();
}
},
});
const sentBody = crypto.randomBytes(64).toString('base64');
const receivedBarrier = makeBarrier();
let receivedId;
let receivedBody;
const queueReader = new SQSReader(
AWS_SQS_QUEUE_URL,
async (message) => {
receivedId = message.MessageId;
receivedBody = message.Body;
if (receivedBody === sentBody) {
await receivedBarrier();
} else {
// ignore unexpected messages in case there is pending junk in the queue
console.log(`Received unexpected message ${receivedId}: ${receivedBody}`);
}
},
{ waitTimeSeconds: 0, initialIdleDelaySeconds: 30, logger }
);
queueReader.start();
// test resume when not sleeping
queueReader.resume();
await sleepBarrier();
const sendResult = await sqs
.sendMessage({
QueueUrl: AWS_SQS_QUEUE_URL,
MessageBody: sentBody,
})
.promise();
const sentId = sendResult.MessageId;
// test resume when sleeping
queueReader.resume();
await receivedBarrier();
queueReader.stop();
await queueReader.join();
expect(receivedId).to.equal(sentId);
expect(receivedBody).to.equal(sentBody);
});
it('exits on error', async function () {
const queueReader = new SQSReader('invalid', async () => {
fail('never called');
});
queueReader.start();
try {
await queueReader.join();
fail('exception expected');
} catch (e) {
expect((e as AWSError).code).to.equal('UnknownEndpoint');
}
});
});