-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathChannel.ts
53 lines (45 loc) · 1.38 KB
/
Channel.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
import { QueueService } from './QueueService';
import { container, TYPES } from "./Container";
/**
 * Contract for a queue channel: publish messages to a named queue, declare a
 * queue, deliver queued messages to a callback, and inspect a queue.
 */
export interface IChannel {
  /**
   * Sends `message` to the queue named `queue`.
   *
   * @param queue - Name of the target queue.
   * @param message - Message payload.
   * @param options - Optional implementation-specific settings.
   * @returns A promise that settles once the send has been attempted.
   */
  sendToQueue: (queue: string, message: Buffer, options?: {}) => Promise<any>;

  /**
   * Declares the queue named `queue`.
   *
   * @param queue - Name of the queue.
   * @param options - Implementation-specific settings.
   * @returns A promise that settles once the queue has been declared.
   */
  assertQueue: (queue: string, options: {}) => Promise<any>;

  /**
   * Delivers messages from the queue named `queue` to `callback`.
   *
   * @param queue - Name of the queue to read from.
   * @param callback - Invoked with each message wrapped as `{ content }`.
   */
  consume: (
    queue: string,
    callback: (data: { content: Buffer }) => void
  ) => void;

  /**
   * Returns the implementation's view of the queue named `queue`.
   * The shape of the result is implementation-defined.
   *
   * @param queue - Name of the queue to inspect.
   */
  getQueue: (queue: string) => any;
}
/**
 * `IChannel` implementation that delegates to the `QueueService` instance
 * resolved from the shared container.
 */
export class Channel implements IChannel {
  private readonly queueService: QueueService;

  /** Resolves the `QueueService` dependency from the container. */
  constructor() {
    this.queueService = container.get<QueueService>(TYPES.queueService);
  }

  /**
   * Publishes `message` to `queue` through the queue service.
   *
   * @param queue - Name of the target queue.
   * @param message - Message payload.
   * @param options - Accepted for interface compatibility; not used here.
   * @returns A promise that resolves once `publish` has returned.
   * @throws Rejects with whatever `QueueService.publish` throws.
   */
  async sendToQueue(queue: string, message: Buffer, options?: {}): Promise<void> {
    // An async method already turns a synchronous throw into a rejection, so
    // no explicit Promise wrapper is needed.
    this.queueService.publish(queue, message);
  }

  /**
   * No-op that satisfies the `IChannel` contract; both arguments are ignored.
   *
   * @param queue - Name of the queue (unused).
   * @param options - Queue settings (unused).
   * @returns A promise that resolves immediately.
   */
  async assertQueue(queue: string, options: {}): Promise<void> {
    // Intentionally empty.
  }

  /**
   * Hands every message currently returned by the queue service for `queue`
   * to `callback`, wrapped as `{ content }`.
   *
   * Each slot is cleared before its callback runs, so a message whose callback
   * throws is not delivered again from the same array.
   *
   * @param queue - Name of the queue to read from.
   * @param callback - Invoked once per pending message, in index order.
   */
  consume(queue: string, callback: (data: { content: Buffer }) => void): void {
    const pending: Buffer[] = this.queueService.consume(queue);
    if (!pending) {
      return;
    }

    // `length` is re-read on every pass so messages appended while a callback
    // is running are still delivered.
    for (let index = 0; index < pending.length; index++) {
      const content = pending[index];

      // `delete` below leaves holes behind. If the service returns the same
      // array on a later call, those slots must be skipped rather than handed
      // to the callback as `{ content: undefined }`.
      if (content === undefined) {
        continue;
      }

      // Clear the slot (array length is intentionally left unchanged).
      delete pending[index];
      callback({ content });
    }
  }

  /**
   * Returns the result of `QueueService.consume` called with
   * `{ readonly: true }`.
   * NOTE(review): presumably a non-destructive read — confirm in QueueService.
   *
   * @param queue - Name of the queue to inspect.
   */
  getQueue(queue: string) {
    return this.queueService.consume(queue, { readonly: true });
  }
}